State management with real-time network access.
ooo provides a fast, zero-configuration In-memory layer for storing and synchronizing application state or settings. It uses an embedded storage engine with optional persistence via ko, and delivers changes to subscribers using JSON Patch for efficient updates.
- Application state/settings that need real-time sync across clients
- Prototyping real-time features quickly
- Small to medium datasets where speed matters more than scale
For large-scale data storage (millions of records, complex queries), use a dedicated database like nopog. You can combine both: ooo for real-time state, nopog for bulk data.
| Package | Description |
|---|---|
| ooo | Core server - in-memory state with WebSocket/REST API |
| ko | Persistent storage adapter (LevelDB) |
| ooo-client | JavaScript client with reconnecting WebSocket |
| auth | JWT authentication middleware |
| mono | Full-stack boilerplate (Go + React) |
| nopog | PostgreSQL adapter for large-scale storage |
| pivot | Multi-instance synchronization (AP distributed) |
- Dynamic routing with glob patterns for collections
- Real-time subscriptions via WebSocket
- JSON Patch updates for efficient sync
- Version checking (no data sent on version match while reconnecting)
- RESTful CRUD reflected to subscribers
- Filtering and audit middleware
- Auto-managed timestamps (created, updated) with a monotonic clock for consistency on ntp/ptp synchronizations
- Built-in web UI for data management
There's a js client.
with go installed get the library
go get github.com/benitogf/ooocreate a file main.go
package main
import "github.com/benitogf/ooo"
func main() {
server := ooo.Server{}
server.Start("0.0.0.0:8800")
server.WaitClose()
}run the service:
go run main.go| Method | Description | URL |
|---|---|---|
| GET | Web interface | http://{host}:{port}/ |
| GET | List all keys (paginated) | http://{host}:{port}/?api=keys |
| GET | Server info | http://{host}:{port}/?api=info |
| GET | Filter paths | http://{host}:{port}/?api=filters |
| GET | Connection state | http://{host}:{port}/?api=state |
| Parameter | Description | Default |
|---|---|---|
page |
Page number (1-indexed) | 1 |
limit |
Items per page (max 500) | 50 |
filter |
Filter by prefix or glob pattern | (none) |
| Method | Description | URL |
|---|---|---|
| POST | Create/Update | http://{host}:{port}/{key} |
| GET | Read | http://{host}:{port}/{key} |
| PATCH | Partial update (JSON Patch) | http://{host}:{port}/{key} |
| DELETE | Delete | http://{host}:{port}/{key} |
| Method | Description | URL |
|---|---|---|
| WS | Server clock | ws://{host}:{port} |
| WS | Subscribe to path | ws://{host}:{port}/{key} |
Activating this flag will limit the server to process requests defined by filters
server := ooo.Server{}
server.Static = trueFilters control access and transform data. When Static mode is enabled, only filtered routes are available.
Paths support glob patterns (*) and multi-level globs like users/*/posts/*.
| Filter | Description |
|---|---|
OpenFilter |
Enable route (required in static mode) |
WriteFilter |
Transform/validate before write |
AfterWriteFilter |
Callback after write completes |
ReadObjectFilter |
Transform single object on read |
ReadListFilter |
Transform list items on read |
DeleteFilter |
Control delete operations |
LimitFilter |
Maintain max entries in a list (auto-cleanup) |
// Enable a route (required when Static=true)
server.OpenFilter("books/*")// Validate/transform before write
server.WriteFilter("books/*", func(index string, data json.RawMessage) (json.RawMessage, error) {
// return error to deny, or modified data
return data, nil
})// Callback after write completes
server.AfterWriteFilter("books/*", func(index string) {
log.Println("wrote:", index)
})// Transform single object on read
server.ReadObjectFilter("books/special", func(index string, data meta.Object) (meta.Object, error) {
return data, nil
})// Transform list items on read
server.ReadListFilter("books/*", func(index string, items []meta.Object) ([]meta.Object, error) {
return items, nil
})// Control delete (return error to prevent)
server.DeleteFilter("books/protected", func(key string) error {
return errors.New("cannot delete")
})LimitFilter is implemented using a ReadListFilter (to limit visible items), a noop WriteFilter (to allow writes), a DeleteFilter (to allow deletes), and an AfterWriteFilter (to trigger cleanup). This means it includes open read and write access.
Supports count-based limits, time-based retention, or both combined. At least one constraint must be provided.
// Count-only: keep N most recent entries (auto-deletes oldest)
server.LimitFilter("logs/*", ooo.LimitFilterConfig{Limit: 100})
// Time-only: keep entries younger than MaxAge (retention policy)
server.LimitFilter("events/*", ooo.LimitFilterConfig{
MaxAge: 24 * time.Hour,
})
// Combined: both count and time constraints (stricter wins)
server.LimitFilter("metrics/*", ooo.LimitFilterConfig{
Limit: 1000,
MaxAge: 7 * 24 * time.Hour,
})
// Dynamic limit based on runtime state
server.LimitFilter("games/*", ooo.LimitFilterConfig{
LimitFunc: func() int { return getDeviceCap() },
Order: ooo.OrderAsc,
})
// Dynamic max age from external config
server.LimitFilter("audit/*", ooo.LimitFilterConfig{
MaxAgeFunc: func() time.Duration { return getRetentionPolicy() },
})
// With periodic background cleanup
server.LimitFilter("telemetry/*", ooo.LimitFilterConfig{
MaxAge: 30 * 24 * time.Hour,
Cleanup: ooo.CleanupConfig{
Enabled: true,
Interval: 10 * time.Minute, // default: 10min, minimum: 1min
},
})LimitFilterConfig options:
Limit- Maximum number of entriesLimitFunc- Dynamic limit function (func() int)MaxAge- Maximum age of entries (time.Duration)MaxAgeFunc- Dynamic max age function (func() time.Duration)Order- Sort order:OrderDesc(default, most recent first) orOrderAsc(oldest first)Cleanup- Periodic background cleanup config (CleanupConfig{Enabled, Interval})Description- Human-readable description for the explorer UISchema- JSON schema struct for UI display
server.Audit = func(r *http.Request) bool {
// return true to allow, false to deny (401)
return r.Header.Get("X-API-Key") == "secret"
}Register custom HTTP endpoints with typed schemas visible in the UI.
server.Endpoint(ooo.EndpointConfig{
Path: "/policies/{id}",
Description: "Manage access control policies",
// Vars are route variables (mandatory) - auto-extracted from {id} in path
Vars: ooo.Vars{"id": "Policy ID"},
Methods: ooo.Methods{
"GET": ooo.MethodSpec{
Response: PolicyResponse{},
// Params are query parameters (optional) - per method
Params: ooo.Params{"filter": "Optional filter value"},
},
"PUT": ooo.MethodSpec{
Request: Policy{},
Response: PolicyResponse{},
},
},
Handler: func(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"] // Route variable (mandatory)
filter := r.URL.Query().Get("filter") // Query param (optional)
// ... handle request
},
})Forward filters from remote ooo servers with path remapping.
// Proxy /settings/{deviceID} → /settings on remote
proxy.Route(server, "settings/*", proxy.Config{
Resolve: func(localPath string) (address, remotePath string, err error) {
return "localhost:8800", "settings", nil
},
})
// Proxy list routes: /items/{deviceID}/* → /items/* on remote
proxy.RouteList(server, "items/*/*", proxy.Config{
Resolve: func(localPath string) (address, remotePath string, err error) {
parts := strings.SplitN(localPath, "/", 3)
if len(parts) == 3 {
return "localhost:8800", "items/" + parts[2], nil
}
return "localhost:8800", "items/*", nil
},
})These functions handle JSON serialization/deserialization and provide a more convenient way to work with your data structures than using storage api directly.
// Get retrieves a single item from the specified path
item, err := ooo.Get[YourType](server, "path/to/item")
if err != nil {
log.Fatal(err)
}
fmt.Printf("Item: %+v\n", item.Data)// GetList retrieves all items from a list path (ends with "/*")
items, err := ooo.GetList[YourType](server, "path/to/items/*")
if err != nil {
log.Fatal(err)
}
for _, item := range items {
fmt.Printf("Item: %+v (created: %v)\n", item.Data, item.Created)
}// Set creates or updates an item at the specified path
err := ooo.Set(server, "path/to/item", YourType{
Field1: "value1",
Field2: "value2",
})
if err != nil {
log.Fatal(err)
}// Push adds an item to a list (path must end with "/*")
index, err := ooo.Push(server, "path/to/items/*", YourType{
Field1: "new item",
Field2: "another value",
})
if err != nil {
log.Fatal(err)
}
fmt.Println("Created at:", index)// Delete removes item(s) at the specified path
// For single item: ooo.Delete(server, "path/to/item")
// For glob pattern: ooo.Delete(server, "items/*") removes all matching items
err := ooo.Delete(server, "path/to/item")
if err != nil {
log.Fatal(err)
}Perform operations on remote ooo servers using the io package.
cfg := io.RemoteConfig{
Client: &http.Client{Timeout: 10 * time.Second},
Host: "localhost:8800",
SSL: false, // set to true for HTTPS
}item, err := io.RemoteGet[YourType](cfg, "path/to/item")err := io.RemoteSet(cfg, "path/to/item", YourType{Field1: "value"})err := io.RemotePush(cfg, "path/to/items/*", YourType{Field1: "new item"})items, err := io.RemoteGetList[YourType](cfg, "path/to/items/*")err := io.RemoteDelete(cfg, "path/to/item")Use the Go WebSocket client to subscribe to real-time updates.
go client.SubscribeList(client.SubscribeConfig{
Ctx: ctx,
Server: client.Server{Protocol: "ws", Host: "localhost:8800"},
}, "items/*", client.SubscribeListEvents[Item]{
OnMessage: func(items []client.Meta[Item]) { /* handle updates */ },
OnError: func(err error) { /* handle error */ },
})go client.Subscribe(client.SubscribeConfig{
Ctx: ctx,
Server: client.Server{Protocol: "ws", Host: "localhost:8800"},
}, "config", client.SubscribeEvents[Config]{
OnMessage: func(item client.Meta[Config]) { /* handle updates */ },
OnError: func(err error) { /* handle error */ },
})For JavaScript, use ooo-client.
ooo includes a built-in web-based ui to manage and monitor your data. The ui is automatically available at the root path (/) when the server starts.
- Storage Browser - Browse all registered filters and their data
- Live Mode - Real-time WebSocket subscriptions with automatic updates
- Static Mode - Traditional CRUD operations with JSON editor
- State Monitor - View active WebSocket connections and subscriptions
- Filter Management - Visual representation of filter types (open, read-only, write-only, custom, limit)
server.Close(sig) runs a bounded graceful shutdown: it drains in-flight HTTP requests up to Server.Deadline, closes WebSocket connections, closes the storage backend, and runs any user-supplied teardown callbacks. server.WaitClose() is the convenience wrapper that blocks on SIGINT / SIGTERM / SIGHUP and then calls Close with the received signal.
POSIX requires SIGKILL (and SIGSTOP) to be delivered by the kernel with no opportunity for user code to run. When SIGKILL arrives:
- No deferred functions execute.
Closedoes not run.OnClosedoes not run. - Any queued broadcasts, watcher events, and writes that the storage backend has not yet flushed to durable media are lost.
- Connected subscribers receive a TCP RST and re-fetch state on reconnect.
Durability across hard kill is the responsibility of the storage backend, not of Server.Close. Pick a backend whose contract you trust and budget your shutdown window so SIGKILL never has to fire under normal operation.
The orchestrator should send SIGTERM first and grant a grace window at least as long as Server.Deadline plus the worst-case runtime of every user-supplied callback (preClose, proxy, OnClose). SIGKILL should be the orchestrator's stuck-teardown fallback, not the primary shutdown path.
On Kubernetes, set terminationGracePeriodSeconds accordingly. The default of 30s is enough for most configurations; raise it if you've increased Server.Deadline or if your callbacks flush sizable state.
Server exposes three places to register teardown code, each running at a different point in Close:
| Hook | When it runs | Storage / stream / HTTP state | Typical use |
|---|---|---|---|
RegisterPreClose(fn) |
First, before any tear-down | All up | Flush in-memory state to storage, broadcast a "shutting down" message |
RegisterProxyCleanup(fn) |
After preClose, before the stream closes | Stream still up | Unsubscribe from upstream proxy servers |
OnClose (field) |
Last, after everything else is closed | All torn down | Close user-owned resources (DB pools, log handles) |
Callbacks run synchronously and Close blocks until each returns. Today there is no aggregate timeout on user callbacks — a callback that hangs hangs Close, which can push the orchestrator past terminationGracePeriodSeconds and trigger SIGKILL. Keep callbacks short, or make them respect their own timeouts.
HTTP handlers registered via server.Endpoint are not wrapped in a request timeout. During shutdown they receive a cancellation through r.Context().Done() once Server.Deadline elapses, but a handler that ignores its context will keep running and Go cannot kill it. Always check r.Context().Done() in long-running custom handlers if you want them to exit cleanly during shutdown.
