diff --git a/homeguard-iot/Dockerfile b/homeguard-iot/Dockerfile new file mode 100644 index 0000000..3b2dace --- /dev/null +++ b/homeguard-iot/Dockerfile @@ -0,0 +1,23 @@ +# Two-stage build. Produces a small image containing both binaries; the +# docker-compose file decides which one to run via `command:`. + +FROM golang:1.22-alpine AS build +WORKDIR /src + +COPY go.mod go.sum* ./ +RUN go mod download || true + +COPY . . + +RUN CGO_ENABLED=0 go build -o /out/server ./cmd/server +RUN CGO_ENABLED=0 go build -o /out/simulator ./cmd/simulator + +FROM gcr.io/distroless/static:nonroot +WORKDIR /app +COPY --from=build /out/server /app/server +COPY --from=build /out/simulator /app/simulator +USER nonroot:nonroot + +# Use CMD (not ENTRYPOINT) so docker-compose's per-service `command:` fully +# REPLACES this rather than being appended as argv to the default binary. +CMD ["/app/server"] diff --git a/homeguard-iot/README.md b/homeguard-iot/README.md new file mode 100644 index 0000000..a03dbb5 --- /dev/null +++ b/homeguard-iot/README.md @@ -0,0 +1,385 @@ +# HomeGuard IoT — CedarDB Operator Console + +![Screenshot of demo running on 192 cores, 384 GB RAM](./view_of_app_on_192_cores.jpg) + +A small Go project that simulates an alarm-monitoring company's IoT +backplane and showcases CedarDB serving both the OLTP-style operator +console *and* the OLAP-style ingest-rate analytics off the same table at +the same time. + +The pitch: a customer running ~3-4 TB/day of incoming sensor events is +almost certainly running two stacks today — BigQuery for offline analytics +plus some streaming pipeline (Pub/Sub + Dataflow + a KV store) for the +monitoring center operators. This demo collapses both onto a single +CedarDB instance, with a multi-table normalized schema that lets the +dashboard run real joins instead of operating over pre-denormalized flat +tables. + +## Talking points for the demo + +- This is one database. 
The operator queue, the live event stream, the + drill-down, the footer ingest counters, and the storage growth gauge + all read against the same `events` table while the simulator is + writing to it at hundreds of thousands of rows/sec — the `-rate` knob + goes from 2,500 (gentle demo) to 1,500,000+ (driving toward the + 3 TB/day target on real hardware). + +- Today you'd run BigQuery for the aggregates and Pub/Sub → Dataflow → + Bigtable for the operator console. Two pipelines, two SLAs, one ETL + step in between with replication lag measured in seconds-to-minutes. + +- Notice the joins — Plan tier determines SLA, dispatch center comes + from Region, device code comes from the catalog. BigQuery prefers + denormalized flat tables; CedarDB does these joins on hot data without + flinching. + +## What's inside + +``` +schema.sql -- canonical reference (see internal/db/) +docker-compose.yml -- CedarDB + simulator + dashboard +Dockerfile -- builds both Go binaries (CMD, not ENTRYPOINT!) +cmd/simulator/main.go -- drives the event stream +cmd/server/main.go -- serves the operator dashboard +internal/db/ -- connection pool + embedded schema bootstrap +internal/sim/ -- catalog data, fleet synthesis, alert rules +internal/web/ -- HTTP server, SSE + HTMX, embedded UI assets +``` + +## Schema (8 tables, real joins) + +``` +plans (dimension) monthly_price, sla_seconds +regions (dimension) dispatch center + timezone +device_types (dimension) MOTION, DOOR, SMOKE, CO, WATER, ... 
+households (dimension) plan_id + region_id; armed/disarmed +devices (dimension) one row per sensor; FK to household + type +events (HOT) 100 K+ rows/sec sustained; the firehose +alerts (HOT, small) derived from triggered events via rules +storage_samples (telemetry) (sampled_at, uncompressed_bytes) per + HG_STORAGE_SAMPLER_INTERVAL; drives the + dashboard growth gauge +``` + +`events.event_id` is a plain `BIGINT` (not `BIGSERIAL`) — the simulator +allocates IDs from an in-process `atomic.Int64` so it can keep the hot +write path on `pgx.CopyFrom`. CedarDB rejects the binary `COPY` frame +when the destination column carries a sequence default ("unable to +cast from void to bigint"), and we never wanted the round trip a +server-side sequence would imply. + +Indexes are sized for the join-heavy reads: `alerts (status, raised_at +DESC)`, `events (household_id, ts DESC)`, `events (kind, ts DESC)`. +See `internal/db/schema.sql` for the full list. + +## Run it (Docker) + +``` +docker compose up --build +# open http://localhost:8080 +``` + +Three containers come up: `hg-cedardb` (the database), `hg-simulator` (the +data generator), and `hg-server` (the dashboard). The simulator embeds +`schema.sql` and applies it automatically on first run. 
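Review note on the schema bootstrap described above: a minimal sketch of what "applies it automatically" could look like, assuming the embedded script is split naively on semicolons and executed one statement at a time. `splitStatements` and `progressPrefix` are hypothetical names for illustration, not the functions in `internal/db/`, and the naive split would break on semicolons inside string literals or comments.

```go
package main

import (
	"fmt"
	"strings"
)

// splitStatements breaks a SQL script into individual statements.
// Assumption: statements are separated by ';' and the script has no
// semicolons inside string literals or comments.
func splitStatements(script string) []string {
	var stmts []string
	for _, part := range strings.Split(script, ";") {
		// Drop surrounding whitespace and skip empty trailing fragments.
		if s := strings.TrimSpace(part); s != "" {
			stmts = append(stmts, s)
		}
	}
	return stmts
}

// progressPrefix formats a log prefix such as "[ 1/15]" for statement i
// (1-based) out of n, padding the index to two characters.
func progressPrefix(i, n int) string {
	return fmt.Sprintf("[%2d/%d]", i, n)
}
```

Each returned statement could then be passed to the pgx pool's `Exec(ctx, stmt)` in order, logging `progressPrefix(i+1, len(stmts))` before each call; failing fast on the first error keeps a half-applied schema easy to spot.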
+ +The simulator logs every DDL statement it runs and prints its target +event rate; you should see something like: + +``` +hg-simulator | … connected to CedarDB +hg-simulator | … schema-presence probe: households table missing +hg-simulator | … applying schema: 15 statements +hg-simulator | … [ 1/15] CREATE TABLE IF NOT EXISTS plans … — ok +… +hg-simulator | … synthesized fleet: 30000 households · 297218 devices +hg-simulator | … event_id counter seeded at 0 +hg-simulator | … storage sampler: interval=5s +hg-simulator | … ingestor: batchSize=10000 flushInterval=50ms +hg-simulator | … simulator running: writers=8 tickHz=10 target=2500 ev/s hb/tick/writer=31 fleet=297218 devices · ingestors=1 batch=10000 flush=50ms +hg-simulator | … heartbeat: queue=0/64 eventID=24320 delta=2432 rows/s lastCopy=12ms ago copyFails=0 +hg-server | … dashboard listening on :8080 +``` + +If you need a clean slate (drop all tables and re-create), the simulator +takes `-reset-schema`: + +``` +docker compose run --rm simulator /app/simulator -reset-schema +docker compose up +``` + +## Dashboard layout + +``` +┌────────────────────────────────────────────────────────────────────────────┐ +│ ACTIVE ALERTS · SLA-aware refresh HG_ALERTS_REFRESH (1s) │ +│ ┌────────────────────────────────────────────────────────────────────────┐ │ +│ │ SEV · HH · PLAN · REGION / DC · DETAIL · AGE · SLA REMAINING │ │ +│ │ 5 #1024131 Premium Atlanta DC SMOKE detected 12 s 18 s │ │ +│ │ 4 #1009823 Plus Boston DC GLASS_BREAK 03 s 57 s │ │ +│ │ … │ │ +│ └────────────────────────────────────────────────────────────────────────┘ │ +├──────────────────────┬──────────────────────┬──────────────────────────────┤ +│ LIVE EVENT STREAM │ CUSTOMER DRILL-DOWN │ STORAGE GROWTH │ +│ SSE HG_SSE_INTERVAL │ HG_DRILLDOWN_REFRESH │ HG_STORAGE_REFRESH (1s) │ +│ (200 ms) │ (2s · auto-rotates) │ │ +│ │ │ ╭───────────╮ │ +│ 15:42:08 SMOKE kit. 
│ #1019823 Premium │ │ 25.3 │ MB/s │ +│ 15:42:07 MOTION hall │ ● ARMED │ ╰───────────╯ 1m avg │ +│ 15:42:06 DOOR front│ ──────────────────── │ target 34.7 MB/s · 3 TB/day │ +│ … │ 15:42:08 SMOKE kit. │ total uncompressed 62.0 GB │ +│ │ 15:42:01 MOTION hall │ 1m 25.3 · 5m 23.1 · 15m … │ +└──────────────────────┴──────────────────────┴──────────────────────────────┘ +INSERTs: 312,049 ev/sec · 1,251,514,277 total · 1,270,594 active / 2,022,994 alerts + one table, five concurrent reads · CedarDB · SQL queries↗ +``` + +Each panel is driven by a different query against the same `events`, +`alerts`, and `storage_samples` tables the simulator is still writing +into. Cadences are configurable per-panel — see *Tuning at runtime* +below. + +## The queries that matter + +Hit `http://localhost:8080/static/queries.html` once the dashboard is up +for a syntax-highlighted reference of every SQL statement the app runs, +where it lives in the code, and how often it fires. The three to draw +attention to during a demo: + +**Active-alerts queue (joins 4 tables, refreshes every `HG_ALERTS_REFRESH`):** + +```sql +SELECT a.alert_id, a.severity, a.detail, + EXTRACT(EPOCH FROM (now() - a.raised_at))::int AS age_s, + p.sla_seconds, + p.sla_seconds - EXTRACT(EPOCH FROM (now() - a.raised_at))::int AS sla_remaining, + h.household_id, h.address_hash, + p.name AS plan_name, + r.name AS region_name, r.dispatch_center +FROM alerts a +JOIN households h ON h.household_id = a.household_id +JOIN plans p ON p.plan_id = h.plan_id +JOIN regions r ON r.region_id = h.region_id +WHERE a.status = 'active' +ORDER BY a.severity DESC, a.raised_at ASC +LIMIT 25 +``` + +**Live event stream (joins 5 tables, server-pushed every `HG_SSE_INTERVAL`):** + +```sql +SELECT e.event_id, e.ts, e.household_id, h.address_hash, + dt.code, d.location, e.kind, e.severity, + COALESCE(e.battery_pct, -1), r.name +FROM events e +JOIN devices d ON d.device_id = e.device_id +JOIN device_types dt ON dt.device_type_id = d.device_type_id 
+JOIN households h ON h.household_id = e.household_id +JOIN regions r ON r.region_id = h.region_id +WHERE e.kind > 0 +ORDER BY e.ts DESC +LIMIT 25 +``` + +**Live ingest rate (the meta-query, refreshes every `HG_STATS_REFRESH`):** + +The footer's events/sec and total-events counters come from +`storage_samples` — *not* from a `COUNT(*)` over a billion-row events +table. The simulator writes `(now(), 48 × eventID)` into +`storage_samples` every `HG_STORAGE_SAMPLER_INTERVAL` from its +in-process atomic counter, so the two most recent rows give both the +size and the rate without ever scanning the hot table. + +```sql +SELECT + (SELECT uncompressed_bytes FROM storage_samples + ORDER BY sampled_at DESC LIMIT 1) AS latest_bytes, + (SELECT sampled_at FROM storage_samples + ORDER BY sampled_at DESC LIMIT 1) AS latest_ts, + (SELECT uncompressed_bytes FROM storage_samples + WHERE sampled_at < (SELECT MAX(sampled_at) FROM storage_samples) + ORDER BY sampled_at DESC LIMIT 1) AS prior_bytes, + (SELECT sampled_at FROM storage_samples + WHERE sampled_at < (SELECT MAX(sampled_at) FROM storage_samples) + ORDER BY sampled_at DESC LIMIT 1) AS prior_ts; +-- total_events = latest_bytes / 48 +-- rows_per_sec = (latest_bytes - prior_bytes) / dt / 48 +``` + +The published `rows_per_sec` is therefore an average over the sampler +interval (default 5 s), not a strict 1-second window. + +## Knobs + +``` +docker compose run --rm simulator /app/simulator \ + -households 50000 \ + -devices-per-household 12 \ + -rate 5000 \ + -hz 20 \ + -writers 8 +``` + +`-rate` sets the sustained events/sec target (heartbeats + triggered). +`-hz` sets the simulator's tick rate; higher Hz = smoother bursts but +more network round-trips. `-writers` is the number of producer +goroutines that partition the device fleet and generate rows in +parallel; they hand off to a single ingestor goroutine that runs +`CopyFrom` against CedarDB. 
The fleet sizing knobs control how many +`households` and `devices` rows get synthesized on startup. + +## Tuning at runtime + +Most dashboard cadences and a few pipeline knobs are configurable via +`HG_*` environment variables in `docker-compose.yml`. Defaults match +the original hardcoded values, so leaving everything unset preserves +the standard demo behaviour. All durations are Go duration strings +(`200ms`, `1s`, `30s`, `1m30s`). + +### Dashboard refresh cadences (server container) + +| Env var | Default | UI region | +|---|---|---| +| `HG_SSE_INTERVAL` | `200ms` | **Live Event Stream** panel (bottom-left). Server-side SSE push rate — the browser holds one long-lived connection and the server pushes a frame at this interval. The "last frame" timestamp in the bottom-right of the footer tracks this one. | +| `HG_ALERTS_REFRESH` | `1s` | **Active Alerts** queue (top, full-width). htmx polls `/api/alerts`. | +| `HG_DRILLDOWN_REFRESH` | `2s` | **Customer Drill-Down** (bottom-middle). htmx polls `/api/drilldown`. This runs three SQL queries per tick (pick highest-severity household → fetch its plan/region header → fetch its last 20 events), so it's the biggest cost-per-tick and the most useful one to dial down if dashboard load is competing with ingest. | +| `HG_STATS_REFRESH` | `1s` | **Footer counters** *and* the **header counters** ("Active alerts N · Events ingested N"). JS polls `/api/stats`. The only env var that updates two regions at once, so slowing it down has the most visible effect. | +| `HG_STORAGE_REFRESH` | `1s` | **Storage Growth** gauge (bottom-right): the arc, the 1m/5m/15m rate table, the projected-daily figure. The cheapest poll of the bunch — it just reads `storage_samples`, a few hundred rows — so there's rarely a reason to raise this. | + +### Pipeline knobs (simulator container) + +| Env var | Default | Effect | +|---|---|---| +| `HG_STORAGE_SAMPLER_INTERVAL` | `5s` | How often the simulator writes a row into `storage_samples`. 
Doesn't drive any UI poll directly, but sets the minimum window over which the gauge can compute a rate — set this to `30s` and the dashboard's 1m/5m/15m rates become 30-second moving averages. Cheap to leave at `5s`. | +| `HG_INGEST_BATCH` | `10000` | Rows the ingestor accumulates before firing one `pgx.CopyFrom`. Bigger → better per-COPY amortization; smaller → lower latency to the live event stream. Watch the heartbeat `lastCopy` and `delta` numbers when tuning. | +| `HG_INGESTORS` | `1` | Number of ingestor goroutines running `CopyFrom` in parallel. Default `1` is safe everywhere — older CedarDB rejected overlapping COPYs with `SQLSTATE 40P01`; newer versions accept concurrent COPYs, in which case `2`, `4`, `8` may give a meaningful throughput bump. If the heartbeat's `queue=CAP/CAP` ceiling drops as you raise this, the single ingestor was the bottleneck and CedarDB can absorb more parallelism. If the queue stays pegged, CedarDB is serializing the work server-side and adding more ingestors won't help. | +| `HG_RESOLVE_INTERVAL` | `2s` | How often the background alert resolver fires. | +| `HG_RESOLVE_AUTOTUNE` | `true` | When on, the resolver picks each tick's LIMIT as `max(floor, ceil(deltaFired × 1.2 + backlog × 0.01))`, capped at 100 K. `deltaFired` and `backlog` are tracked via in-process atomic counters — no `COUNT(*)` on the alerts table needed. Set to `false` to pin the limits at their floors (handy when you want to demo what happens to AGE/SLA as a backlog grows). | +| `HG_RESOLVE_LOW_LIMIT` | `2000` | **Floor** for the per-tick low-severity (1-2) resolver, which marks alerts `false_alarm`. Auto-tune can push higher; without auto-tune this is just the limit. | +| `HG_RESOLVE_HIGH_LIMIT` | `600` | **Floor** for the per-tick high-severity (3+) resolver, which marks alerts `resolved`. High-severity alerts must also have aged at least 20 seconds to be eligible, so they sit in the operator queue long enough to look like real triage work. 
| + +### Reading the simulator heartbeat + +Once per second the simulator logs a one-line status report you can use +to triage ingest behaviour. Tail it with `docker logs hg-simulator`: + +``` +heartbeat: queue=12/64 eventID=1483920475 delta=748520 rows/s lastCopy=12ms ago copyFails=0 +``` + +- **`queue=N/CAP`** — depth of the producer→ingestor channel. Near `CAP` means CedarDB is the bottleneck. +- **`eventID`** — monotonic atomic counter; each row generated bumps it. Doubles as a precise total-rows figure. +- **`delta=N rows/s`** — row generation rate computed from the eventID counter. +- **`lastCopy=Xms ago`** — wall time since the most recent successful `CopyFrom`. Should be under one second when ingest is healthy. +- **`copyFails=N`** — cumulative `CopyFrom` errors since startup. Non-zero means CedarDB is rejecting; the error text appears on the line above the heartbeat. + +Three patterns worth recognising: + +- `delta=0 queue=0` → producers stopped. Look for goroutine panics in the simulator log. +- `delta=N queue=CAP lastCopy growing` → CedarDB stopped accepting writes. Check `hg-cedardb` logs, disk space, and the compactor. +- `delta=N queue=CAP lastCopy<1s copyFails=0` → healthy steady-state at the write-path cap. To push past it, try larger `HG_INGEST_BATCH` and/or more `HG_INGESTORS`, or move to bigger hardware. + +The resolver emits its own status line once every 10 seconds: + +``` +resolver: drain low=42/2000 high=3187/3825 · backlog low=0 high=14 +``` + +`drain low=X/Y` is "Y rows of low-severity LIMITed, X actually resolved" — when X < Y the queue is empty for that tier; when X = Y the resolver is at the cap. `backlog` is the in-process `(fired − resolved)` estimate; if it climbs steadily, auto-tune is falling behind (rare — the controller's 1% backlog decay is normally enough to keep up). + +## Scaling up + +The defaults in `docker-compose.yml` are sized for a developer laptop +(~10 cores, 16–32 GB RAM). 
On real demo hardware, for example a +192-core x86 box with 384 GB RAM, there's a lot of headroom that the +laptop config simply can't use. A reasonable starting point on a box +like that: + +```yaml +simulator: + command: ["/app/simulator", + "-households=200000", + "-devices-per-household=12", + "-rate=2000000", # 2 M ev/s ≈ 96 MB/s ≈ 8 TB/day uncompressed + "-hz=20", + "-writers=64"] + environment: + HG_INGESTORS: "8" # parallel CopyFroms (requires recent CedarDB) + HG_INGEST_BATCH: "50000" # bigger batches amortise the COPY round trip + HG_STORAGE_SAMPLER_INTERVAL: "5s" + # Alert generation scales with -writers, but the resolver auto-tunes + # its LIMITs each tick from the in-process generation rate and + # backlog, so no manual sizing is needed when you bump -writers. + # The defaults stay fine. If you'd rather see the queue grow (to + # demo SLA breach), set HG_RESOLVE_AUTOTUNE=false. +``` + +The pool's `MaxConns=64` in `internal/db/db.go` will need to grow if +the sum of `-writers`, `HG_INGESTORS`, and the dashboard's concurrent +reads exceeds it. As a rule of thumb, set it to `writers + ingestors + 16`. + +The heartbeat is your scoreboard while you push the dial up: + +- **`delta` rises and `queue` no longer pegs at CAP** as you raise + `HG_INGESTORS` → the single ingestor was the bottleneck and CedarDB + can absorb more parallel COPYs. +- **`delta` is flat regardless of `HG_INGESTORS`** → CedarDB is + serializing the work internally; the next move is `HG_INGEST_BATCH` + or CedarDB-side tuning. +- **`copyFails > 0`** → CedarDB is rejecting. The error message on the + preceding log line tells you why (most likely you've raised + `HG_INGESTORS` past what your CedarDB version allows and landed back + in `40P01` territory). 
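Review note on the sizing figures in this section: the arithmetic behind the `-rate` comment and the pool rule of thumb can be sketched as below. The 48-byte row size is the per-event estimate the README uses for `storage_samples`; the function names are hypothetical and do not come from the repository.

```go
package main

// bytesPerEvent is the uncompressed per-row estimate used by the README
// (storage_samples stores 48 × eventID).
const bytesPerEvent = 48

// bytesPerSecond returns the uncompressed ingest volume for a given -rate.
func bytesPerSecond(eventsPerSec int64) int64 {
	return eventsPerSec * bytesPerEvent
}

// bytesPerDay scales the per-second volume to a 24-hour day.
func bytesPerDay(eventsPerSec int64) int64 {
	return bytesPerSecond(eventsPerSec) * 86400
}

// suggestedMaxConns applies the README's rule of thumb:
// writers + ingestors + 16 connections of headroom for reads and jobs.
func suggestedMaxConns(writers, ingestors int) int {
	return writers + ingestors + 16
}
```

With `-rate=2000000` this gives 96,000,000 bytes/s (96 MB/s) and 8,294,400,000,000 bytes/day (about 8.3 TB/day), in line with the "≈ 96 MB/s ≈ 8 TB/day" comment. For `-writers=64` and `HG_INGESTORS=8` the rule of thumb yields 88, which is above the current `MaxConns=64`.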
+ +For the dashboard side at higher rates, keep the read cadences honest +about what the queries cost: + +```yaml +server: + environment: + HG_SSE_INTERVAL: "200ms" # joins 5 tables, kind > 0 filter + HG_ALERTS_REFRESH: "1s" # joins 4 tables on the small alerts table + HG_DRILLDOWN_REFRESH: "2s" # three queries per tick; the costliest + HG_STATS_REFRESH: "1s" # reads storage_samples only, cheap + HG_STORAGE_REFRESH: "1s" # reads storage_samples only, cheap +``` + +On a 192-core box these are easily affordable; on a laptop you'll +want to lengthen these intervals to give CedarDB more headroom for the +write path. + +### Which indexes to keep + +There's a real tension between write throughput and dashboard read +latency, but it isn't symmetric across tables: + +| Table | Indexes? | Why | +|---|---|---| +| `alerts` | **Keep them.** | The table is in the millions, never billions; index maintenance is trivial. *Every read path* that touches this table (queue, drilldown via household, resolver inner SELECT) needs them. Without a `(status, raised_at DESC)` index the active-alerts query becomes a multi-million-row scan that the resolver fights with every 2 s, and you'll see the panel intermittently render "no active alerts" because the iteration timed out mid-scan. | +| `events` | **Optional.** Drop if you want to maximise write rate. | One index entry per inserted row at ~30 K/s is real write cost; the dashboard's events queries are all small-LIMIT scans of the recent tail and CedarDB's column store handles them respectably even without the index. The trade-off is that the SSE event-stream panel and the drill-down's per-household scan will be slower at very large table sizes; usually still tolerable. 
| + +If you dropped the alerts indexes during a write-throughput experiment, +put them back before treating the dashboard as canonical: + +```sql +CREATE INDEX alerts_status_raised_idx ON alerts (status, raised_at DESC); +CREATE INDEX alerts_household_raised_idx ON alerts (household_id, raised_at DESC); +``` + +### When a panel intermittently shows empty + +The dashboard handlers now log `rows.Err()` after each iteration, so a +mid-scan context cancellation no longer looks identical to an empty +result. If you see "no active alerts" in the panel, check the server +container's logs: + +``` +docker logs hg-server 2>&1 | grep "rows.Err" +``` + +A non-empty stream of `context canceled` or `deadline exceeded` lines +means the underlying SQL is taking long enough that requests are +aborting before it returns. The fix is almost always either to add the +missing index for that query or to lower the polling frequency. + diff --git a/homeguard-iot/build.sh b/homeguard-iot/build.sh new file mode 100755 index 0000000..7e63092 --- /dev/null +++ b/homeguard-iot/build.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +CGO_ENABLED=0 go build -o ./out/server ./cmd/server +CGO_ENABLED=0 go build -o ./out/simulator ./cmd/simulator + diff --git a/homeguard-iot/clean.sh b/homeguard-iot/clean.sh new file mode 100755 index 0000000..dd0277e --- /dev/null +++ b/homeguard-iot/clean.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +rm -f ./out/server +rm -f ./out/simulator + diff --git a/homeguard-iot/cmd/server/main.go b/homeguard-iot/cmd/server/main.go new file mode 100644 index 0000000..de2a764 --- /dev/null +++ b/homeguard-iot/cmd/server/main.go @@ -0,0 +1,52 @@ +// Command server runs the HomeGuard IoT operator dashboard. 
+package main + +import ( + "context" + "flag" + "log" + "net/http" + "os/signal" + "syscall" + "time" + + "github.com/cedardb-demo/homeguard-iot/internal/db" + "github.com/cedardb-demo/homeguard-iot/internal/web" +) + +func main() { + addr := flag.String("addr", ":8080", "http listen address") + flag.Parse() + + ctx, cancel := signal.NotifyContext(context.Background(), + syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + pool, err := db.Connect(ctx) + if err != nil { + log.Fatalf("db: %v", err) + } + defer pool.Close() + + srv, err := web.NewServer(pool) + if err != nil { + log.Fatalf("server: %v", err) + } + + httpSrv := &http.Server{ + Addr: *addr, + Handler: srv.Routes(), + ReadHeaderTimeout: 5 * time.Second, + } + go func() { + <-ctx.Done() + shutdown, c := context.WithTimeout(context.Background(), 5*time.Second) + defer c() + _ = httpSrv.Shutdown(shutdown) + }() + + log.Printf("dashboard listening on %s", *addr) + if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("listen: %v", err) + } +} diff --git a/homeguard-iot/cmd/simulator/main.go b/homeguard-iot/cmd/simulator/main.go new file mode 100644 index 0000000..ff3571b --- /dev/null +++ b/homeguard-iot/cmd/simulator/main.go @@ -0,0 +1,70 @@ +// Command simulator generates the IoT event stream and writes it to +// CedarDB. Run it once per demo session; run cmd/server in parallel for +// the dashboard. 
+package main + +import ( + "context" + "flag" + "log" + "os" + "os/signal" + "syscall" + + "github.com/cedardb-demo/homeguard-iot/internal/db" + "github.com/cedardb-demo/homeguard-iot/internal/sim" +) + +func main() { + householdCount := flag.Int("households", 30000, "synthesized household fleet size") + devicesPerHH := flag.Int("devices-per-household", 10, "avg devices per household") + tickHz := flag.Int("hz", 10, "simulator tick rate (events flush each tick)") + rate := flag.Int("rate", 2500, "sustained events/sec target (heartbeats + triggered)") + writers := flag.Int("writers", 8, + "number of parallel writer goroutines; each owns a slice of the device fleet "+ + "and its own pgx conn. Bump alongside -rate to scale ingest toward 3 TB/day.") + resetSchema := flag.Bool("reset-schema", false, + "drop & recreate all tables on startup — destroys all data") + flag.Parse() + + ctx, cancel := signal.NotifyContext(context.Background(), + os.Interrupt, syscall.SIGTERM) + defer cancel() + + pool, err := db.Connect(ctx) + if err != nil { + log.Fatalf("db: %v", err) + } + defer pool.Close() + log.Printf("connected to CedarDB") + + if *resetSchema { + log.Printf("-reset-schema set: wiping all data and re-creating tables") + if err := db.ResetSchema(ctx, pool); err != nil { + log.Fatalf("reset schema: %v", err) + } + } else { + present, err := db.SchemaPresent(ctx, pool) + switch { + case err != nil: + log.Printf("schema-presence probe failed (continuing anyway): %v", err) + case present: + log.Printf("schema-presence probe: households table already present") + default: + log.Printf("schema-presence probe: households table missing") + if err := db.ApplySchema(ctx, pool); err != nil { + log.Fatalf("apply schema: %v", err) + } + } + } + + s, err := sim.New(ctx, pool, *householdCount, *devicesPerHH, *tickHz, *rate, *writers) + if err != nil { + log.Fatalf("simulator setup: %v", err) + } + + if err := s.Run(ctx); err != nil && ctx.Err() == nil { + log.Fatalf("simulator run: 
%v", err) + } + log.Printf("simulator shut down cleanly") +} diff --git a/homeguard-iot/db_error.txt b/homeguard-iot/db_error.txt new file mode 100644 index 0000000..ee2a2a2 --- /dev/null +++ b/homeguard-iot/db_error.txt @@ -0,0 +1,25 @@ +hg-cedardb | 2026-05-19 19:58:46.272801386 UTC DEBUG1: connection 1084378400 terminated +hg-simulator | 2026/05/19 19:58:48 simulator running: tickHz=10 target=2500 ev/s per-tick=250 fleet=285028 devices +hg-simulator | 2026/05/19 19:58:49 len(rows): 254 +hg-cedardb | 2026-05-19 19:58:49.038975971 UTC ERROR: unable to cast from void to bigint +hg-cedardb | Input data does not match the expected type or input format. Docs: https://cedardb.com/docs/references/datatypes/ +hg-cedardb | 2026-05-19 19:58:49.039580971 UTC ERROR: invalid message in simple query mode +hg-cedardb | 2026-05-19 19:58:49.039624013 UTC ERROR: invalid message in simple query mode +hg-simulator | 2026/05/19 19:58:49 copy events: ERROR: unable to cast from void to bigint (SQLSTATE 42804) +hg-simulator | 2026/05/19 19:58:49 insert alert: ERROR: invalid message in simple query mode (SQLSTATE 08P01) +hg-simulator | 2026/05/19 19:58:49 insert alert: ERROR: invalid message in simple query mode (SQLSTATE 08P01) +hg-simulator | 2026/05/19 19:58:49 len(rows): 256 +hg-cedardb | 2026-05-19 19:58:49.149095304 UTC ERROR: unable to cast from void to bigint +hg-cedardb | Input data does not match the expected type or input format. 
Docs: https://cedardb.com/docs/references/datatypes/ +hg-simulator | panic: runtime error: index out of range [0] with length 0 +hg-simulator | +hg-simulator | goroutine 441 [running]: +hg-simulator | github.com/jackc/pgx/v5.(*copyFrom).buildCopyBuf(0x40002030e0, {0x400009c400?, 0x0?, 0x0?}, 0x400243e280) +hg-simulator | /go/pkg/mod/github.com/jackc/pgx/v5@v5.6.0/copy_from.go:235 +0x2f4 +hg-simulator | github.com/jackc/pgx/v5.(*copyFrom).run.func1() +hg-simulator | /go/pkg/mod/github.com/jackc/pgx/v5@v5.6.0/copy_from.go:177 +0x1c4 +hg-simulator | created by github.com/jackc/pgx/v5.(*copyFrom).run in goroutine 1 +hg-simulator | /go/pkg/mod/github.com/jackc/pgx/v5@v5.6.0/copy_from.go:164 +0x3c0 +hg-cedardb | 2026-05-19 19:58:49.158704471 UTC LOG: recv: Connection reset by peer +hg-cedardb | 2026-05-19 19:58:49.158726721 UTC ERROR: unable to read from client + diff --git a/homeguard-iot/docker-compose.yml b/homeguard-iot/docker-compose.yml new file mode 100644 index 0000000..9763aa2 --- /dev/null +++ b/homeguard-iot/docker-compose.yml @@ -0,0 +1,106 @@ +# Brings up CedarDB alongside the Go simulator and operator dashboard. +# +# IMPORTANT: the CedarDB image name and env vars below mirror the Postgres +# image conventions. If your CedarDB image differs, adjust `image:` and the +# DATABASE_URL accordingly — the rest of the demo is pure Postgres wire +# protocol and should not need to change. + +services: + cedardb: + image: cedardb/cedardb:latest + container_name: hg-cedardb + ports: + - "5432:5432" + environment: + CEDAR_PASSWORD: postgres + VERBOSITY: DEBUG1 + LICENSE_KEY: + # Schema bootstrap is handled by the simulator (it embeds schema.sql via + # //go:embed and applies it on first run), so we do not mount any init + # scripts here. + volumes: + - cedar-data:/var/lib/cedardb + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres -d homeguard || exit 1"] + interval: 3s + timeout: 2s + retries: 30 + + simulator: + build: . 
+ container_name: hg-simulator + #command: ["/app/simulator", "-households=100000", "-devices-per-household=10", "-rate=1750000", "-hz=20", "-writers=32"] + #command: ["/app/simulator", "-households=100000", "-devices-per-household=10", "-rate=500000", "-hz=20", "-writers=16"] + command: ["/app/simulator", "-households=100000", "-devices-per-household=10", "-rate=600000", "-hz=20", "-writers=16"] + #command: ["/app/simulator", "-households=30000", "-devices-per-household=10", "-rate=2500", "-hz=10", "-writers=4"] + + depends_on: + cedardb: + condition: service_healthy + environment: + DATABASE_URL: "postgresql://postgres:postgres@cedardb:5432/postgres?sslmode=require" + + # How often the storage sampler writes a row into storage_samples. + # Lower values → fresher gauge, more INSERTs into storage_samples. + # Value is any Go duration string (e.g. "5s", "30s", "1m"). + HG_STORAGE_SAMPLER_INTERVAL: "5s" + + # How many rows the ingestor accumulates before firing one + # pgx.CopyFrom. Bigger → better per-COPY amortization and + # (sometimes) higher throughput; smaller → lower end-to-end + # latency on the live event stream. Watch the heartbeat + # `lastCopy` and `delta` numbers when tuning. + HG_INGEST_BATCH: "25000" + + # How many ingestor goroutines run CopyFrom in parallel. Default + # 1 is safe everywhere. Older CedarDB rejected overlapping COPYs + # with SQLSTATE 40P01; newer versions accept concurrent COPYs, + # in which case cranking this up (try 2, 4, 8) may give a + # meaningful throughput bump. If the heartbeat's queue=64/64 + # ceiling disappears as you raise this, the previous bottleneck + # was the single ingestor (and CedarDB can take more); if queue + # stays pegged, CedarDB is serializing the work internally and + # adding ingestors won't help. + HG_INGESTORS: "4" + + # Alert resolver. The background resolveAlertsLoop picks up the + # oldest active alerts every HG_RESOLVE_INTERVAL and marks them + # resolved/false_alarm. 
+ # + # By default the resolver auto-tunes its per-tick LIMITs from + # the in-process alert generation rate and backlog (no DB-side + # COUNT(*) required), so the queue stays bounded as -writers + # and -rate scale. The two LIMIT vars are now FLOORS — auto-tune + # can push higher but never below them. Set HG_RESOLVE_AUTOTUNE + # to false to disable auto-tune (useful if you want to demo what + # happens to AGE/SLA when the queue grows on purpose). + # HG_RESOLVE_INTERVAL: "2s" + # HG_RESOLVE_LOW_LIMIT: "2000" # floor for severity 1-2 → false_alarm + # HG_RESOLVE_HIGH_LIMIT: "600" # floor for severity 3+ → resolved + # HG_RESOLVE_AUTOTUNE: "true" # auto-grow LIMITs to match gen+backlog + restart: on-failure + + server: + build: . + container_name: hg-server + command: ["/app/server", "-addr=:8080"] + depends_on: + cedardb: + condition: service_healthy + environment: + DATABASE_URL: "postgresql://postgres:postgres@cedardb:5432/postgres?sslmode=require" + # Dashboard polling cadences — all Go duration strings. Crank up + # when you want to ease the read load CedarDB has to serve next to + # the simulator's writes. 
+ HG_SSE_INTERVAL: "1s" # live event stream push rate + HG_ALERTS_REFRESH: "5s" # active-alerts queue panel + HG_DRILLDOWN_REFRESH: "5s" # customer drill-down panel + HG_STATS_REFRESH: "1s" # footer ingest counters + HG_STORAGE_REFRESH: "1s" # storage growth gauge + ports: + - "8080:8080" + restart: on-failure + +volumes: + cedar-data: + diff --git a/homeguard-iot/docker_compose_run.sh b/homeguard-iot/docker_compose_run.sh new file mode 100755 index 0000000..9ed1daa --- /dev/null +++ b/homeguard-iot/docker_compose_run.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +docker compose up --build + diff --git a/homeguard-iot/docker_rm_images.sh b/homeguard-iot/docker_rm_images.sh new file mode 100755 index 0000000..84acf4a --- /dev/null +++ b/homeguard-iot/docker_rm_images.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +docker image rm homeguard-iot-server:latest homeguard-iot-simulator:latest + diff --git a/homeguard-iot/docker_rm_volume.sh b/homeguard-iot/docker_rm_volume.sh new file mode 100755 index 0000000..f045902 --- /dev/null +++ b/homeguard-iot/docker_rm_volume.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +docker compose down -v + diff --git a/homeguard-iot/go.mod b/homeguard-iot/go.mod new file mode 100644 index 0000000..e29acf4 --- /dev/null +++ b/homeguard-iot/go.mod @@ -0,0 +1,14 @@ +module github.com/cedardb-demo/homeguard-iot + +go 1.22 + +require github.com/jackc/pgx/v5 v5.6.0 + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect + golang.org/x/crypto v0.27.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/text v0.18.0 // indirect +) diff --git a/homeguard-iot/go.sum b/homeguard-iot/go.sum new file mode 100644 index 0000000..b9bb4b2 --- /dev/null +++ b/homeguard-iot/go.sum @@ -0,0 +1,28 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= +github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= 
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/homeguard-iot/internal/db/db.go b/homeguard-iot/internal/db/db.go new file mode 100644 index 0000000..00dbf99 --- /dev/null +++ b/homeguard-iot/internal/db/db.go @@ -0,0 +1,62 @@ +// Package db is a thin wrapper around the pgx connection pool to CedarDB. +// +// CedarDB speaks the Postgres wire protocol, so pgx works unmodified — only +// the DSN is supplied differently. The pool is sized generously so the +// simulator's writer goroutines (each holding a conn for the duration of +// a CopyFrom) don't compete with the dashboard's concurrent reads or with +// the alert resolution / armed-state shuffle background jobs. +package db + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +// Connect dials CedarDB using DATABASE_URL from the environment. +// +// postgres://USER:PASS@HOST:5432/DB?sslmode=disable +func Connect(ctx context.Context) (*pgxpool.Pool, error) { + dsn := os.Getenv("DATABASE_URL") + if dsn == "" { + return nil, fmt.Errorf("DATABASE_URL is not set") + } + + cfg, err := pgxpool.ParseConfig(dsn) + if err != nil { + return nil, fmt.Errorf("parse DATABASE_URL: %w", err) + } + // Sized for the high-rate demo: up to ~16 simulator writer goroutines + // each holding a long-lived pgx conn during CopyFrom, plus the alert + // resolver, the armed-state shuffler, the inline alert inserters, and + // the dashboard's concurrent read paths. 
+ cfg.MaxConns = 64 + cfg.MinConns = 4 + cfg.MaxConnLifetime = 30 * time.Minute + + // CedarDB doesn't currently support the Postgres extended query protocol + // (Parse / Bind / Execute), and signals SQLSTATE 08P01 + // "invalid message in simple query mode" when pgx tries to use it. + // Forcing simple-protocol mode makes pgx inline parameters as text + // literals and send each query as a single `Q` message — the only + // protocol path CedarDB accepts. COPY is unaffected (it uses its own + // dedicated protocol regardless of this setting). + cfg.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol + + pool, err := pgxpool.NewWithConfig(ctx, cfg) + if err != nil { + return nil, fmt.Errorf("dial CedarDB: %w", err) + } + + pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := pool.Ping(pingCtx); err != nil { + pool.Close() + return nil, fmt.Errorf("ping CedarDB: %w", err) + } + return pool, nil +} diff --git a/homeguard-iot/internal/db/schema.go b/homeguard-iot/internal/db/schema.go new file mode 100644 index 0000000..d060c10 --- /dev/null +++ b/homeguard-iot/internal/db/schema.go @@ -0,0 +1,113 @@ +package db + +import ( + "context" + _ "embed" + "fmt" + "log" + "strings" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +// SchemaSQL is the canonical schema, embedded at build time. Edit +// schema.sql and `go build` re-bakes it. +// +//go:embed schema.sql +var SchemaSQL string + +// SchemaPresent reports whether the `households` table exists. Used for a +// diagnostic log line on startup — we always call ApplySchema anyway since +// the file is idempotent (CREATE TABLE IF NOT EXISTS). 
+func SchemaPresent(ctx context.Context, pool *pgxpool.Pool) (bool, error) { + var present bool + if err := pool.QueryRow(ctx, ` + SELECT (SELECT COUNT(*) FROM information_schema.tables + WHERE table_name = 'households') = 1 + `).Scan(&present); err != nil { + return false, fmt.Errorf("schema presence check: %w", err) + } + return present, nil +} + +// ApplySchema runs the embedded schema.sql one statement at a time using +// the Postgres simple-query protocol (pgx.QueryExecModeSimpleProtocol). +// CedarDB doesn't accept DDL through the extended Parse/Bind/Execute path +// the same way vanilla Postgres does, so prepared-statement mode silently +// fails to apply CREATE TABLE; simple-protocol bypasses that. +func ApplySchema(ctx context.Context, pool *pgxpool.Pool) error { + statements := splitSQLStatements(SchemaSQL) + log.Printf("applying schema: %d statements", len(statements)) + for i, stmt := range statements { + if _, err := pool.Exec(ctx, stmt, pgx.QueryExecModeSimpleProtocol); err != nil { + return fmt.Errorf( + "apply schema (statement %d of %d failed): %w\n--- failing statement ---\n%s\n", + i+1, len(statements), err, stmt, + ) + } + log.Printf(" [%2d/%d] %s — ok", i+1, len(statements), firstLine(stmt)) + } + log.Printf("schema applied") + return nil +} + +// ResetSchema drops everything and re-applies — destructive. Wired behind +// the simulator's -reset-schema flag. 
+func ResetSchema(ctx context.Context, pool *pgxpool.Pool) error { + drops := []string{ + "DROP TABLE IF EXISTS storage_samples CASCADE", + "DROP TABLE IF EXISTS alerts CASCADE", + "DROP TABLE IF EXISTS events CASCADE", + "DROP TABLE IF EXISTS devices CASCADE", + "DROP TABLE IF EXISTS households CASCADE", + "DROP TABLE IF EXISTS device_types CASCADE", + "DROP TABLE IF EXISTS regions CASCADE", + "DROP TABLE IF EXISTS plans CASCADE", + } + log.Printf("resetting schema: dropping %d tables", len(drops)) + for i, stmt := range drops { + if _, err := pool.Exec(ctx, stmt, pgx.QueryExecModeSimpleProtocol); err != nil { + return fmt.Errorf("drop %d: %w", i, err) + } + log.Printf(" [%d/%d] %s — ok", i+1, len(drops), stmt) + } + return ApplySchema(ctx, pool) +} + +// splitSQLStatements strips `--` line comments and splits the script on `;`. +// Sufficient for our DDL (no string literals or function bodies with +// embedded semicolons). +func splitSQLStatements(sql string) []string { + var clean strings.Builder + clean.Grow(len(sql)) + for _, line := range strings.Split(sql, "\n") { + if idx := strings.Index(line, "--"); idx >= 0 { + line = line[:idx] + } + clean.WriteString(line) + clean.WriteByte('\n') + } + out := make([]string, 0, 16) + for _, raw := range strings.Split(clean.String(), ";") { + stmt := strings.TrimSpace(raw) + if stmt != "" { + out = append(out, stmt) + } + } + return out +} + +func firstLine(stmt string) string { + for _, line := range strings.Split(stmt, "\n") { + line = strings.TrimSpace(line) + if line == "" { + continue + } + if len(line) > 70 { + return line[:67] + "..." + } + return line + } + return "(empty)" +} diff --git a/homeguard-iot/internal/db/schema.sql b/homeguard-iot/internal/db/schema.sql new file mode 100644 index 0000000..88cfece --- /dev/null +++ b/homeguard-iot/internal/db/schema.sql @@ -0,0 +1,106 @@ +-- HomeGuard IoT demo schema for CedarDB (Postgres dialect). 
+-- +-- The story: a home-security operator running ~3-4 TB/day of incoming IoT +-- events from millions of devices. Currently they pipe everything into +-- BigQuery for offline analytics and run a parallel real-time stack +-- (Pub/Sub + Dataflow + a KV store) for the monitoring center. +-- +-- The CedarDB pitch is "collapse those two stacks: same table, OLTP-style +-- writes and OLAP-style reads, no replication lag, real joins across the +-- normalized dimension model." +-- +-- Tables: +-- plans - subscription tiers; SLA in seconds per tier +-- regions - service regions / dispatch centers +-- device_types - sensor catalog (motion, door, smoke, etc.) +-- households - customers; FK to plan + region +-- devices - sensors; FK to household + device_type +-- events - HOT TABLE; up to ~500K rows/sec in the high-rate demo +-- alerts - rules-derived alerts; smaller volume; the operator queue +-- +-- This file is idempotent (CREATE TABLE IF NOT EXISTS on everything) so the +-- simulator can safely apply it on every cold start. For a destructive reset, +-- pass -reset-schema to the simulator. 
+ +CREATE TABLE IF NOT EXISTS plans ( + plan_id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + monthly_price_usd NUMERIC(8, 2) NOT NULL, + sla_seconds INTEGER NOT NULL -- monitoring response SLA +); + +CREATE TABLE IF NOT EXISTS regions ( + region_id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + dispatch_center TEXT NOT NULL, + timezone TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS device_types ( + device_type_id INTEGER PRIMARY KEY, + code TEXT NOT NULL, -- MOTION, DOOR, WINDOW, GLASS_BREAK, SMOKE, CO, WATER, TEMP, DOORBELL, KEYPAD + name TEXT NOT NULL, + default_severity SMALLINT NOT NULL +); + +CREATE TABLE IF NOT EXISTS households ( + household_id BIGINT PRIMARY KEY, + plan_id INTEGER NOT NULL, + region_id INTEGER NOT NULL, + address_hash TEXT NOT NULL, -- hashed for privacy + enrolled_at TIMESTAMPTZ NOT NULL DEFAULT now(), + armed BOOLEAN NOT NULL DEFAULT false -- current armed state +); + +CREATE TABLE IF NOT EXISTS devices ( + device_id BIGINT PRIMARY KEY, + household_id BIGINT NOT NULL, + device_type_id INTEGER NOT NULL, + location TEXT NOT NULL, -- 'front_door', 'kitchen', 'master_bedroom', ... + installed_at TIMESTAMPTZ NOT NULL DEFAULT now(), + last_battery_pct SMALLINT +); + +-- Hot table. ~500K rows/sec in the high-rate demo. Billions of rows over +-- time. event_id is plain BIGINT (not BIGSERIAL) — the simulator generates +-- ids from a client-side atomic counter so it can use the binary COPY path, +-- which CedarDB rejects for BIGSERIAL defaults ("unable to cast from void +-- to bigint"). Multiple writer goroutines share the same id space via the +-- atomic counter. 
+CREATE TABLE IF NOT EXISTS events ( + event_id BIGINT NOT NULL, + device_id BIGINT NOT NULL, + household_id BIGINT NOT NULL, -- denormalized to skip a join on the hot path + ts TIMESTAMPTZ NOT NULL DEFAULT now(), + kind SMALLINT NOT NULL, -- 0=heartbeat, 1=triggered, 2=battery_low, 3=offline, 4=tamper + severity SMALLINT NOT NULL, -- 0=normal, 1..5 escalating + value DOUBLE PRECISION, -- sensor reading (temp °C, motion confidence, etc.) + battery_pct SMALLINT, + rssi_dbm SMALLINT -- wireless signal strength +); + +CREATE TABLE IF NOT EXISTS alerts ( + alert_id BIGSERIAL PRIMARY KEY, + household_id BIGINT NOT NULL, + triggered_event_id BIGINT NOT NULL, + raised_at TIMESTAMPTZ NOT NULL DEFAULT now(), + severity SMALLINT NOT NULL, + status TEXT NOT NULL, -- 'active', 'dispatched', 'resolved', 'false_alarm' + detail TEXT, + resolved_at TIMESTAMPTZ, + resolution_ms INTEGER +); + +-- Periodic storage-growth samples: one row per ~5s, sourced from +-- CedarDB's cedardb_compression_info system view. The dashboard's +-- /api/storage endpoint derives 1m/5m/15m ingest rates from this table. +-- sampled_at uses now()'s microsecond precision so it is unique at the +-- sampler's 5-second cadence. +CREATE TABLE IF NOT EXISTS storage_samples ( + sampled_at TIMESTAMPTZ PRIMARY KEY, + uncompressed_bytes BIGINT NOT NULL +); + +CREATE INDEX alerts_status_raised_idx ON alerts (status, raised_at DESC); +CREATE INDEX alerts_household_raised_idx ON alerts (household_id, raised_at DESC); + diff --git a/homeguard-iot/internal/sim/catalog.go b/homeguard-iot/internal/sim/catalog.go new file mode 100644 index 0000000..6881879 --- /dev/null +++ b/homeguard-iot/internal/sim/catalog.go @@ -0,0 +1,75 @@ +package sim + +// Static catalog data: plans, regions, device types. These get upserted on +// simulator startup and rarely change. Keeping them in code (vs a seed file) +// means the demo is self-contained and reproducible — no external CSVs to +// keep in sync. 
+ +// Plan describes one subscription tier with its monitoring SLA. +type Plan struct { + ID int + Name string + PriceUSD float64 + SLASeconds int +} + +var Plans = []Plan{ + {ID: 1, Name: "Basic", PriceUSD: 19.99, SLASeconds: 300}, // 5 min + {ID: 2, Name: "Plus", PriceUSD: 39.99, SLASeconds: 120}, // 2 min + {ID: 3, Name: "Premium", PriceUSD: 59.99, SLASeconds: 60}, // 1 min + {ID: 4, Name: "Concierge", PriceUSD: 99.99, SLASeconds: 30}, // 30 s +} + +// Region groups households for dispatch + timezone purposes. +type Region struct { + ID int + Name string + DispatchCenter string + Timezone string +} + +var Regions = []Region{ + {1, "Northeast", "Boston DC", "America/New_York"}, + {2, "Mid-Atlantic", "Newark DC", "America/New_York"}, + {3, "Southeast", "Atlanta DC", "America/New_York"}, + {4, "Midwest", "Chicago DC", "America/Chicago"}, + {5, "South-Central", "Dallas DC", "America/Chicago"}, + {6, "Mountain", "Denver DC", "America/Denver"}, + {7, "Pacific", "Phoenix DC", "America/Phoenix"}, + {8, "Pacific-NW", "Seattle DC", "America/Los_Angeles"}, + {9, "California", "San Jose DC", "America/Los_Angeles"}, + {10, "Canada-East", "Toronto DC", "America/Toronto"}, +} + +// DeviceType is the sensor catalog. `DefaultSeverity` is the baseline +// alert severity if a device of this type fires a triggered event with the +// household armed — alert rules can promote or demote per-event. 
+type DeviceType struct { + ID int + Code string + Name string + DefaultSeverity int +} + +var DeviceTypes = []DeviceType{ + {ID: 1, Code: "MOTION", Name: "Motion sensor", DefaultSeverity: 3}, + {ID: 2, Code: "DOOR", Name: "Door contact", DefaultSeverity: 4}, + {ID: 3, Code: "WINDOW", Name: "Window contact", DefaultSeverity: 4}, + {ID: 4, Code: "GLASS_BREAK", Name: "Glass-break detector", DefaultSeverity: 4}, + {ID: 5, Code: "SMOKE", Name: "Smoke detector", DefaultSeverity: 5}, + {ID: 6, Code: "CO", Name: "Carbon monoxide detector", DefaultSeverity: 5}, + {ID: 7, Code: "WATER", Name: "Water leak sensor", DefaultSeverity: 3}, + {ID: 8, Code: "TEMP", Name: "Temperature sensor", DefaultSeverity: 1}, + {ID: 9, Code: "DOORBELL", Name: "Smart doorbell", DefaultSeverity: 1}, + {ID: 10, Code: "KEYPAD", Name: "Entry keypad", DefaultSeverity: 1}, +} + +// Locations is the pool of in-home placements used when allocating devices. +// Not every device type lives in every location, but the simulator picks +// per-type plausibly (see allocateDevices in simulator.go). +var Locations = []string{ + "front_door", "back_door", "garage_door", "side_door", + "living_room", "kitchen", "dining_room", + "master_bedroom", "bedroom_2", "bedroom_3", + "basement", "attic", "utility_room", "hallway", +} diff --git a/homeguard-iot/internal/sim/rules.go b/homeguard-iot/internal/sim/rules.go new file mode 100644 index 0000000..cfe6e85 --- /dev/null +++ b/homeguard-iot/internal/sim/rules.go @@ -0,0 +1,61 @@ +package sim + +// Alert rules: deterministically promote certain triggered events into +// rows in the alerts table. The mapping is intentionally simple so the +// query that joins alerts × households × plans × device_types tells a +// readable story on the dashboard. 
+// +// kind values mirror the SMALLINT column in events: +// +// 0 = heartbeat (never alerts) +// 1 = triggered (the alerter; consult device_type + armed state) +// 2 = battery_low (never alerts; EvaluateAlert returns nil) +// 3 = offline (skipped for now; could become an alert if device +// was previously online for a long time) +// 4 = tamper (always alerts at severity 4) + +// AlertDecision describes the alert to raise for a given event and its +// severity. EvaluateAlert returns nil when no alert should be raised. +type AlertDecision struct { + Severity int + Detail string +} + +// EvaluateAlert applies the demo's alert rules to a candidate event. +// Inputs are the device type code (e.g. "SMOKE"), event kind, and whether +// the household was armed at the moment the event fired. +func EvaluateAlert(deviceCode string, kind int, armed bool) *AlertDecision { + switch kind { + case 4: // tamper: always alert, anywhere + return &AlertDecision{Severity: 4, Detail: "Device tamper detected"} + case 1: // triggered: depends on device type and armed state + switch deviceCode { + case "SMOKE": + return &AlertDecision{Severity: 5, Detail: "Smoke detected"} + case "CO": + return &AlertDecision{Severity: 5, Detail: "Carbon monoxide detected"} + case "GLASS_BREAK": + return &AlertDecision{Severity: 4, Detail: "Glass break detected"} + case "WATER": + return &AlertDecision{Severity: 3, Detail: "Water leak detected"} + case "DOOR", "WINDOW": + if armed { + return &AlertDecision{Severity: 4, Detail: deviceCode + " opened while armed"} + } + // Door/window events with the system disarmed are routine. + return nil + case "MOTION": + if armed { + return &AlertDecision{Severity: 3, Detail: "Motion detected while armed"} + } + return nil + case "DOORBELL": + // Informational only. 
+ return nil + default: + return nil + } + default: + return nil + } +} diff --git a/homeguard-iot/internal/sim/simulator.go b/homeguard-iot/internal/sim/simulator.go new file mode 100644 index 0000000..927c460 --- /dev/null +++ b/homeguard-iot/internal/sim/simulator.go @@ -0,0 +1,926 @@ +// Package sim generates the IoT telemetry stream and writes it to CedarDB. +// +// On startup the simulator upserts the dimension data (plans, regions, +// device_types) and synthesizes a fleet of households with ~10 devices +// each. At runtime the work splits into two layers: +// +// 1. N row-producer goroutines ("writers"). Each owns a slice of the +// device fleet, ticks at TickHz, and produces its share of the per- +// tick budget: heartbeats rotated round-robin over its partition, +// a handful of triggered events, and very occasional battery_low / +// tamper / offline events. Producers do NOT talk to the DB on the +// hot path; they push completed row batches onto eventCh. +// +// 2. One ingestor goroutine. Drains eventCh, coalesces batches up to +// ingestBatch rows (or every flushInterval, whichever comes first), +// and writes them via a single pgx.CopyFrom. Because there is only +// one CopyFrom in flight at a time, CedarDB never trips the +// "cannot start bulk operation until previous bulk operation has +// become globally visible" (SQLSTATE 40P01) error that fires when +// two writer goroutines try to COPY in parallel. +// +// All producers share an atomic event_id counter so they don't collide +// on the BIGINT primary key. (event_id is plain BIGINT rather than +// BIGSERIAL — CedarDB rejects the COPY frame when the column has a +// sequence default; client-side ids dodge the problem entirely.) +// +// Triggered events are run through the alert rules (rules.go) and matching +// alerts are inserted into the alerts table. Background goroutines resolve +// old active alerts and shuffle households' armed state so the dashboard +// stays dynamic. 
+package sim + +import ( + "context" + "crypto/sha1" + "encoding/hex" + "fmt" + "log" + "math/rand" + "os" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +// envDuration reads a Go duration string (e.g. "5s", "200ms", "1m30s") +// from the named environment variable, falling back to the supplied +// default if the variable is unset or unparseable. Used to externalise +// polling intervals so they can be tuned from docker-compose without a +// rebuild. +func envDuration(name string, fallback time.Duration) time.Duration { + v := os.Getenv(name) + if v == "" { + return fallback + } + d, err := time.ParseDuration(v) + if err != nil { + log.Printf("envDuration: invalid %s=%q (%v); using default %s", name, v, err, fallback) + return fallback + } + return d +} + +// envInt reads a positive integer from the named environment variable, +// falling back to the default if unset, unparseable, or <= 0. +func envInt(name string, fallback int) int { + v := os.Getenv(name) + if v == "" { + return fallback + } + n, err := strconv.Atoi(v) + if err != nil || n <= 0 { + log.Printf("envInt: invalid %s=%q; using default %d", name, v, fallback) + return fallback + } + return n +} + +// envBool reads a boolean (strconv.ParseBool grammar: 1/t/T/TRUE/true/True +// or 0/f/F/FALSE/false/False) from the named environment variable, +// falling back to the supplied default if unset or unparseable. +func envBool(name string, fallback bool) bool { + v := os.Getenv(name) + if v == "" { + return fallback + } + b, err := strconv.ParseBool(v) + if err != nil { + log.Printf("envBool: invalid %s=%q; using default %v", name, v, fallback) + return fallback + } + return b +} + +// Tuning constants for the fan-in COPY drainer. +const ( + // ingestBatch is the row count at which the ingestor flushes a COPY. + // Larger = better amortization of CopyFrom overhead; smaller = lower + // latency and tighter alert-to-event ordering. 
+ ingestBatch = 10000 + + // flushInterval caps how long pending rows can sit before a partial + // COPY is forced. Keeps the dashboard's live event stream fresh even + // when -rate is well below ingestBatch/flushInterval. + flushInterval = 50 * time.Millisecond + + // eventChCapacity is the buffer size of eventCh, the queue of completed + // row batches awaiting ingest. Sized for ~2 batches per writer at the + // highest -writers we'd expect, so producers rarely block on send. + eventChCapacity = 64 + + // Auto-tune knobs for the alert resolver. The per-tick LIMIT is + // max(env_floor, deltaFired * autoTuneHeadroom + backlog * backlogDecay) + // then capped at maxAutoLimit. + autoTuneHeadroom = 1.2 // drain 20% faster than current generation + backlogDecay = 0.01 // chip 1% off the standing backlog per tick + maxAutoLimit = 100000 // ceiling per tick so a runaway UPDATE can't bury CedarDB +) + +// Device is one physical sensor placed in a household. +type Device struct { + ID int64 + Household int64 + TypeID int + Code string // device type code; cached so we don't re-join per tick + Location string + BatteryPct int +} + +// Household is one customer. +type Household struct { + ID int64 + PlanID int + RegionID int + AddressHash string + Armed bool +} + +// Simulator holds the shared state. It is read-only on the hot path after +// New() returns: writers hold pointers into it but mutate nothing except +// eventID's atomic counter and the per-device BatteryPct (writers partition +// the device slice so no two writers ever touch the same Device). +// +// One exception: Household.Armed is read by writers (when evaluating +// alert rules on triggered events) and written by shuffleArmedLoop in a +// separate goroutine. armedMu protects that access: writers take it as +// a reader, the shuffler as a writer. +// +// eventCh is the producer→ingestor handoff. Writers push completed row batches +// onto it; the ingestor goroutines (HG_INGESTORS, default 1) drain it and run CopyFrom. 
+type Simulator struct { + Pool *pgxpool.Pool + Households []Household + Devices []Device + TickHz int + TargetRate int // events per second total target + Writers int // number of writer goroutines + eventID atomic.Int64 // shared id generator; seeded in Run() + armedMu sync.RWMutex // guards Household.Armed reads/writes + eventCh chan [][]any // producer→ingestor batch queue + lastCopyUnixNano atomic.Int64 // UnixNano of last successful CopyFrom (0 if none yet) + copyFailures atomic.Int64 // total CopyFrom errors since startup + + // Resolver instrumentation. fireAlert bumps the appropriate fired + // counter; the resolver bumps the resolved counters with the UPDATE's + // CommandTag.RowsAffected. (fired - resolved) ≈ backlog in CedarDB, + // computed in-process so we don't need a COUNT(*) on the alerts table. + alertsFiredLow atomic.Int64 + alertsFiredHigh atomic.Int64 + alertsResolvedLow atomic.Int64 + alertsResolvedHigh atomic.Int64 +} + +// New builds the simulator state: upserts dimensions, synthesizes the +// household & device fleet, but does not begin the loop. Call Run to +// start ticking. 
+func New(ctx context.Context, pool *pgxpool.Pool, householdCount, devicesPerHousehold, tickHz, targetRate, writers int) (*Simulator, error) { + if writers < 1 { + writers = 1 + } + if err := upsertDimensions(ctx, pool); err != nil { + return nil, fmt.Errorf("upsert dimensions: %w", err) + } + + rng := rand.New(rand.NewSource(0xC1D4A12)) // deterministic fleet, demo-friendly + + hh, devs := generateFleet(rng, householdCount, devicesPerHousehold) + log.Printf("synthesized fleet: %d households · %d devices", len(hh), len(devs)) + + if err := upsertHouseholds(ctx, pool, hh); err != nil { + return nil, fmt.Errorf("upsert households: %w", err) + } + if err := upsertDevices(ctx, pool, devs); err != nil { + return nil, fmt.Errorf("upsert devices: %w", err) + } + + return &Simulator{ + Pool: pool, + Households: hh, + Devices: devs, + TickHz: tickHz, + TargetRate: targetRate, + Writers: writers, + }, nil +} + +// Run seeds the event_id counter, launches the background loops (alert +// resolution + armed-state shuffle + storage sampler), spawns the ingestor +// goroutines that own CopyFrom (HG_INGESTORS, default 1), and fans out s.Writers +// producer goroutines over partitions of the device fleet. Producers +// build row batches and push them onto s.eventCh; the ingestors drain. +func (s *Simulator) Run(ctx context.Context) error { + var maxID int64 + if err := s.Pool.QueryRow(ctx, + `SELECT COALESCE(MAX(event_id), 0) FROM events`).Scan(&maxID); err != nil { + return fmt.Errorf("seed event_id: %w", err) + } + s.eventID.Store(maxID) + log.Printf("event_id counter seeded at %d", maxID) + + s.eventCh = make(chan [][]any, eventChCapacity) + + go s.resolveAlertsLoop(ctx) + go s.shuffleArmedLoop(ctx) + go s.storageSamplerLoop(ctx) + + // Ingestor pool: N goroutines all drain eventCh and run CopyFrom in + // parallel. 
Default N=1 is the historical safe behaviour (CedarDB + // used to reject overlapping COPYs with SQLSTATE 40P01); newer + // CedarDB versions accept concurrent COPYs, in which case N > 1 may + // give a meaningful throughput bump. Tune with HG_INGESTORS. + numIngestors := envInt("HG_INGESTORS", 1) + var ingestorWG sync.WaitGroup + for i := 0; i < numIngestors; i++ { + ingestorWG.Add(1) + go func(id int) { + defer ingestorWG.Done() + s.ingestorLoop(ctx, id) + }(i) + } + + // Heartbeat: one line per second describing pipeline health. Lets us + // distinguish "producers stopped" (delta=0) from "ingestor stuck" + // (queue high, lastCopy ago growing) from "CedarDB rejecting" + // (failures climbing). + go s.heartbeatLoop(ctx) + + // Per-writer per-tick budget. The total target rate divides across + // writers; each writer then divides its share across ticks. + hbPerTick := s.TargetRate / s.TickHz / s.Writers + if hbPerTick < 1 { + hbPerTick = 1 + } + log.Printf("simulator running: writers=%d tickHz=%d target=%d ev/s hb/tick/writer=%d fleet=%d devices · ingestors=%d batch=%d flush=%s", + s.Writers, s.TickHz, s.TargetRate, hbPerTick, len(s.Devices), + numIngestors, ingestBatch, flushInterval) + + var wg sync.WaitGroup + for i := 0; i < s.Writers; i++ { + w := &writer{ + id: i, + sim: s, + devices: devicePartition(s.Devices, i, s.Writers), + rng: rand.New(rand.NewSource(time.Now().UnixNano() ^ int64(i)*0xC0DE)), + hbPerTick: hbPerTick, + } + wg.Add(1) + go func() { + defer wg.Done() + w.run(ctx) + }() + } + wg.Wait() + // Producers are done; let the ingestors drain anything still pending. + // Closing the channel is broadcast to every receiver, so all N + // ingestors observe (zero, false) and exit cleanly. + close(s.eventCh) + ingestorWG.Wait() + return ctx.Err() +} + +// devicePartition returns the i-th slice of d, split into n contiguous +// chunks. The last partition absorbs any remainder. 
+func devicePartition(d []Device, i, n int) []Device { + chunk := len(d) / n + lo := i * chunk + hi := lo + chunk + if i == n-1 { + hi = len(d) + } + return d[lo:hi] +} + +// writer is one row-producer goroutine. It owns a slice of the device +// fleet, keeps its own deviceCursor and rng, allocates fresh event_ids +// from sim.eventID, and hands each tick's rows to the ingestor via +// sim.eventCh. Writers never share any mutable state on the hot path +// except the atomic id counter. +type writer struct { + id int + sim *Simulator + devices []Device + rng *rand.Rand + deviceCursor int + hbPerTick int +} + +func (w *writer) run(ctx context.Context) { + interval := time.Second / time.Duration(w.sim.TickHz) + tick := time.NewTicker(interval) + defer tick.Stop() + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + w.emitTick(ctx) + } + } +} + +// emitTick generates one tick's worth of events from this writer's +// device partition and hands the batch off to the ingestor via the +// simulator's eventCh. The only DB I/O here is fireAlert's alert INSERT. +func (w *writer) emitTick(ctx context.Context) { + if len(w.devices) == 0 { + return + } + now := time.Now() + rows := make([][]any, 0, w.hbPerTick+8) + + // 1) Heartbeats: rotate through this writer's partition. + for i := 0; i < w.hbPerTick; i++ { + d := &w.devices[w.deviceCursor%len(w.devices)] + w.deviceCursor++ + // Battery slowly drains over the demo; clamp at 5%. + if w.rng.Intn(10000) < 3 && d.BatteryPct > 5 { + d.BatteryPct-- + } + rssi := -50 - w.rng.Intn(40) // -50 to -89 dBm + var val float64 + if d.Code == "TEMP" { + val = 18.0 + w.rng.Float64()*10 + } + rows = append(rows, w.row(d, now, 0 /*heartbeat*/, 0, val, rssi)) + } + + // 2) Triggered events: a handful per tick from random devices in + // this writer's partition. 
+ triggers := 3 + w.rng.Intn(5) + for i := 0; i < triggers; i++ { + d := &w.devices[w.rng.Intn(len(w.devices))] + hh := &w.sim.Households[d.Household%int64(len(w.sim.Households))] + rssi := -50 - w.rng.Intn(40) + rows = append(rows, w.row(d, now, 1 /*triggered*/, int(severityFor(d.Code)), 1.0, rssi)) + w.sim.armedMu.RLock() + armed := hh.Armed + w.sim.armedMu.RUnlock() + if dec := EvaluateAlert(d.Code, 1, armed); dec != nil { + w.sim.fireAlert(ctx, d.Household, dec.Severity, dec.Detail) + } + } + + // 3) Occasional battery_low / tamper / offline. + if w.rng.Intn(100) < 5 { + d := &w.devices[w.rng.Intn(len(w.devices))] + kind := []int{2, 4, 3}[w.rng.Intn(3)] + sev := 1 + if kind == 4 { + sev = 4 + } + rows = append(rows, w.row(d, now, kind, sev, 0.0, -60-w.rng.Intn(30))) + if kind == 4 { + if dec := EvaluateAlert(d.Code, 4, false); dec != nil { + w.sim.fireAlert(ctx, d.Household, dec.Severity, dec.Detail) + } + } + } + + // 4) Hand the batch to the ingestor. The send blocks if the channel + // is full — which is the right backpressure: it means the + // ingestor (CedarDB) can't keep up, so we shouldn't generate more + // rows until it can. ctx.Done unblocks for clean shutdown. + if len(rows) == 0 { + return + } + select { + case w.sim.eventCh <- rows: + case <-ctx.Done(): + } +} + +// row builds one events row with a freshly-allocated event_id. The +// shared atomic counter means writers never collide on the BIGINT PK. +func (w *writer) row(d *Device, ts time.Time, kind, sev int, val float64, rssi int) []any { + return []any{ + w.sim.eventID.Add(1), + d.ID, d.Household, ts, + int16(kind), int16(sev), val, + int16(d.BatteryPct), int16(rssi), + } +} + +// ingestorLoop is the single CopyFrom point. Coalesces batches off of +// eventCh up to ingestBatch rows (or every flushInterval, whichever +// comes first) and runs one pgx.CopyFrom per flush. 
Because there's +// only ever one CopyFrom in flight, CedarDB never trips the +// "previous bulk operation must become globally visible" (SQLSTATE +// 40P01) error that fires when multiple goroutines COPY concurrently. +// +// Shutdown contract: Run() closes eventCh after all producers have +// exited, the ingestor drains everything still in flight, runs one +// final flush, and returns. +func (s *Simulator) ingestorLoop(ctx context.Context, id int) { + batchSize := envInt("HG_INGEST_BATCH", ingestBatch) + if id == 0 { + log.Printf("ingestor: batchSize=%d flushInterval=%s", batchSize, flushInterval) + } + + pending := make([][]any, 0, batchSize*2) + cols := []string{ + "event_id", "device_id", "household_id", "ts", + "kind", "severity", "value", "battery_pct", "rssi_dbm", + } + flush := func() { + if len(pending) == 0 { + return + } + if _, err := s.Pool.CopyFrom(ctx, + pgx.Identifier{"events"}, cols, pgx.CopyFromRows(pending), + ); err != nil { + s.copyFailures.Add(1) + log.Printf("ingestor %d: copy events: %v (rows=%d)", id, err, len(pending)) + } else { + s.lastCopyUnixNano.Store(time.Now().UnixNano()) + } + pending = pending[:0] + } + + flushTicker := time.NewTicker(flushInterval) + defer flushTicker.Stop() + + for { + select { + case <-ctx.Done(): + flush() + return + case batch, ok := <-s.eventCh: + if !ok { + flush() + return + } + pending = append(pending, batch...) + if len(pending) >= batchSize { + flush() + } + case <-flushTicker.C: + flush() + } + } +} + +// heartbeatLoop emits a one-line status report once a second so we can +// tell from the logs whether the pipeline is healthy or stuck, and where +// the stall is if it isn't: +// +// queue=N/CAP — eventCh depth. Near CAP means ingestor (CedarDB) is the bottleneck. +// delta=N rows/s — row generation rate observed via the atomic counter. +// lastCopy=Xs ago — wall time since the last successful CopyFrom. Should be < 1s under load. 
+// copyFails=N — cumulative CopyFrom errors; non-zero means CedarDB is rejecting. +// +// 0 ev/s in the dashboard footer + queue=CAP + lastCopy growing → CedarDB stopped accepting writes. +// 0 ev/s + queue=0 → producers stopped (look for goroutine panics). +func (s *Simulator) heartbeatLoop(ctx context.Context) { + var prev int64 + tick := time.NewTicker(1 * time.Second) + defer tick.Stop() + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + cur := s.eventID.Load() + delta := cur - prev + prev = cur + ago := time.Duration(-1) + if last := s.lastCopyUnixNano.Load(); last > 0 { + ago = time.Since(time.Unix(0, last)).Round(time.Millisecond) + } + log.Printf("heartbeat: queue=%d/%d eventID=%d delta=%d rows/s lastCopy=%v ago copyFails=%d", + len(s.eventCh), eventChCapacity, cur, delta, ago, s.copyFailures.Load()) + } + } +} + +// fireAlert inserts a row into alerts. Status starts 'active'; the +// background resolveAlertsLoop will resolve it after a short delay. +// We also bump the in-process generation counter the resolver uses to +// auto-tune its drain rate — we don't wait for the DB INSERT to settle +// because the resolver's controller wants the request rate, not the +// committed rate (they differ only by transient pool / fsync latency). +func (s *Simulator) fireAlert(ctx context.Context, householdID int64, severity int, detail string) { + if severity <= 2 { + s.alertsFiredLow.Add(1) + } else { + s.alertsFiredHigh.Add(1) + } + _, err := s.Pool.Exec(ctx, ` + INSERT INTO alerts (household_id, triggered_event_id, raised_at, severity, status, detail) + VALUES ($1, 0, now(), $2, 'active', $3) + `, householdID, int16(severity), detail) + if err != nil { + log.Printf("insert alert: %v", err) + } +} + +// resolveAlertsLoop runs in the background; every HG_RESOLVE_INTERVAL it +// resolves the oldest active alerts so the operator queue doesn't grow +// without bound. 
The mix of severities resolved varies (high-severity +// stick around longer, low-severity get auto-cleared faster) to mimic +// real triage. +// +// When HG_RESOLVE_AUTOTUNE is on (default), the per-tick LIMITs are +// chosen at runtime as +// +// limit = max(env_floor, +// ceil(deltaFired * autoTuneHeadroom + backlog * backlogDecay)) +// +// where deltaFired is alerts inserted since the previous tick, and +// backlog ≈ alertsFired − alertsResolved tracked in two atomic counters +// per severity tier. Both inputs come from in-process state so we never +// have to COUNT(*) the alerts table to size the next tick. +// +// HG_RESOLVE_LOW_LIMIT / HG_RESOLVE_HIGH_LIMIT remain the floors — +// auto-tune can push higher but never below them. Set +// HG_RESOLVE_AUTOTUNE=false to pin the limits at the floors (useful when +// you want to demo what happens to AGE/SLA as a backlog grows). +func (s *Simulator) resolveAlertsLoop(ctx context.Context) { + interval := envDuration("HG_RESOLVE_INTERVAL", 2*time.Second) + lowFloor := envInt("HG_RESOLVE_LOW_LIMIT", 2000) + highFloor := envInt("HG_RESOLVE_HIGH_LIMIT", 600) + autoTune := envBool("HG_RESOLVE_AUTOTUNE", true) + log.Printf("alert resolver: interval=%s lowFloor=%d highFloor=%d autoTune=%v", + interval, lowFloor, highFloor, autoTune) + + var lastFiredLow, lastFiredHigh int64 + var lastLogAt time.Time + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + curFiredLow := s.alertsFiredLow.Load() + curFiredHigh := s.alertsFiredHigh.Load() + deltaLow := curFiredLow - lastFiredLow + deltaHigh := curFiredHigh - lastFiredHigh + lastFiredLow, lastFiredHigh = curFiredLow, curFiredHigh + + backlogLow := curFiredLow - s.alertsResolvedLow.Load() + backlogHigh := curFiredHigh - s.alertsResolvedHigh.Load() + if backlogLow < 0 { + backlogLow = 0 + } + if backlogHigh < 0 { + backlogHigh = 0 + } + + lowLimit, highLimit := lowFloor, highFloor + if autoTune { + autoLow := 
int(float64(deltaLow)*autoTuneHeadroom + float64(backlogLow)*backlogDecay) + autoHigh := int(float64(deltaHigh)*autoTuneHeadroom + float64(backlogHigh)*backlogDecay) + if autoLow > lowLimit { + lowLimit = autoLow + } + if autoHigh > highLimit { + highLimit = autoHigh + } + } + if lowLimit > maxAutoLimit { + lowLimit = maxAutoLimit + } + if highLimit > maxAutoLimit { + highLimit = maxAutoLimit + } + + // Resolve low-severity (info / sev 1-2) → false_alarm. + lowTag, _ := s.Pool.Exec(ctx, ` + UPDATE alerts + SET status = 'false_alarm', + resolved_at = now(), + resolution_ms = (EXTRACT(EPOCH FROM (now() - raised_at)) * 1000)::int + WHERE alert_id IN ( + SELECT alert_id FROM alerts + WHERE status = 'active' AND severity <= 2 + ORDER BY raised_at ASC LIMIT $1 + ) + `, lowLimit) + // Dispatch then resolve a sample of higher-severity older alerts. + highTag, _ := s.Pool.Exec(ctx, ` + UPDATE alerts + SET status = 'resolved', + resolved_at = now(), + resolution_ms = (EXTRACT(EPOCH FROM (now() - raised_at)) * 1000)::int + WHERE alert_id IN ( + SELECT alert_id FROM alerts + WHERE status = 'active' AND severity >= 3 + AND raised_at < now() - interval '20 seconds' + ORDER BY raised_at ASC LIMIT $1 + ) + `, highLimit) + s.alertsResolvedLow.Add(lowTag.RowsAffected()) + s.alertsResolvedHigh.Add(highTag.RowsAffected()) + + // Once every 10 s, log what we drained and how big the queue is. + if time.Since(lastLogAt) >= 10*time.Second { + log.Printf("resolver: drain low=%d/%d high=%d/%d · backlog low=%d high=%d", + lowTag.RowsAffected(), lowLimit, + highTag.RowsAffected(), highLimit, + backlogLow, backlogHigh) + lastLogAt = time.Now() + } + } + } +} + +// shuffleArmedLoop runs as its own goroutine (replaces the +// armedTimer-driven path in the old emitTick). Every 10 seconds it +// flips ~5% of households' armed state so the rules engine outputs +// stay varied over the demo. 
+func (s *Simulator) shuffleArmedLoop(ctx context.Context) { + rng := rand.New(rand.NewSource(time.Now().UnixNano() ^ 0xA1ED)) + tick := time.NewTicker(10 * time.Second) + defer tick.Stop() + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + s.shuffleArmedState(ctx, rng) + } + } +} + +func (s *Simulator) shuffleArmedState(ctx context.Context, rng *rand.Rand) { + // Flip ~5% of households' armed state in memory; push the new value + // in batches. armedMu guards the in-memory mutation against + // concurrent reads from the writer goroutines. + batch := &pgx.Batch{} + flipped := 0 + s.armedMu.Lock() + for i := range s.Households { + if rng.Intn(20) == 0 { + s.Households[i].Armed = !s.Households[i].Armed + batch.Queue(`UPDATE households SET armed = $1 WHERE household_id = $2`, + s.Households[i].Armed, s.Households[i].ID) + flipped++ + } + } + s.armedMu.Unlock() + if flipped == 0 { + return + } + if err := s.Pool.SendBatch(ctx, batch).Close(); err != nil { + log.Printf("armed-state shuffle: %v", err) + } +} + +// eventsRowBytes is the per-row uncompressed footprint of the events +// table, derived directly from the schema column widths: +// +// event_id BIGINT(8) + device_id BIGINT(8) + household_id BIGINT(8) +// + ts TIMESTAMPTZ(8) + kind SMALLINT(2) + severity SMALLINT(2) +// + value DOUBLE(8) + battery_pct SMALLINT(2) + rssi_dbm SMALLINT(2) +// = 48 bytes +// +// We use this rather than CedarDB's cedardb_compression_info view +// because the view only updates once written data lands in column-store +// blocks — at high ingest rates that lags reality by many seconds. +// COUNT(*) on events tracks live row counts in real time. +const eventsRowBytes = 48 + +// storageSamplerLoop records a (now(), 48 * eventID) sample into +// storage_samples every 5 seconds. The dashboard's /api/storage endpoint +// derives 1m/5m/15m ingest rates from this table. 
+// +// We compute bytes from the in-process atomic counter (s.eventID, which +// is seeded from MAX(event_id) at startup and incremented per generated +// row) rather than running COUNT(*) on the events table. Two reasons: +// +// 1. Cost. At demo scale the events table can reach billions of rows +// in well under an hour. COUNT(*) becomes a multi-second full scan +// and may hold a snapshot that pressures the write path — a likely +// cause of the 30-minute ingest stall we saw. +// 2. Currency. The atomic counter is updated at the moment a row is +// generated, ahead of the CopyFrom that actually persists it. The +// gauge can therefore appear up to (channel queue + in-flight +// batch) rows ahead of what's truly in the table — at most ~160 K +// rows ≈ 8 MB at the demo's default backpressure ceiling, which is +// well under one second of headway and a fine trade-off given the +// scan cost we avoid. +// +// The number is "uncompressed-bytes-equivalent" — what the data would +// weigh as a flat file. CedarDB's actual on-disk footprint after column +// encoding (truncate, frame-of-reference, dictionary, etc.) is 5–10× +// smaller. +func (s *Simulator) storageSamplerLoop(ctx context.Context) { + interval := envDuration("HG_STORAGE_SAMPLER_INTERVAL", 5*time.Second) + log.Printf("storage sampler: interval=%s", interval) + tick := time.NewTicker(interval) + defer tick.Stop() + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + bytes := int64(eventsRowBytes) * s.eventID.Load() + if _, err := s.Pool.Exec(ctx, ` + INSERT INTO storage_samples (sampled_at, uncompressed_bytes) + VALUES (now(), $1) + `, bytes); err != nil { + log.Printf("storage sampler: insert: %v", err) + } + } + } +} + +// severityFor maps a device type code to its baseline severity for +// triggered events. Used when writing the event row itself; the alert +// decision (rules.go) does the more nuanced household-armed logic. 
+func severityFor(code string) int16 { + for _, dt := range DeviceTypes { + if dt.Code == code { + return int16(dt.DefaultSeverity) + } + } + return 1 +} + +// --- fleet synthesis ------------------------------------------------------ + +func generateFleet(rng *rand.Rand, householdCount, devicesPerHousehold int) ([]Household, []Device) { + hh := make([]Household, 0, householdCount) + devs := make([]Device, 0, householdCount*devicesPerHousehold) + + for i := 0; i < householdCount; i++ { + id := int64(1000000 + i) // start at a friendly-looking ID + plan := pickWeighted(rng, []int{40, 35, 18, 7}) // basic-heavy + region := rng.Intn(len(Regions)) + 1 + armed := rng.Intn(100) < 35 // ~35% armed at any time + hh = append(hh, Household{ + ID: id, + PlanID: plan + 1, + RegionID: region, + AddressHash: hashAddr(id), + Armed: armed, + }) + + // Each household gets between (devicesPerHousehold/2) and + // (devicesPerHousehold*3/2) devices, distributed across plausible + // types and locations. + n := devicesPerHousehold/2 + rng.Intn(devicesPerHousehold) + for j := 0; j < n; j++ { + dt := pickDeviceType(rng) + loc := pickLocation(rng, dt.Code) + devs = append(devs, Device{ + ID: int64(len(devs)) + 100000, + Household: id, + TypeID: dt.ID, + Code: dt.Code, + Location: loc, + BatteryPct: 70 + rng.Intn(30), // 70..99 + }) + } + } + return hh, devs +} + +func pickWeighted(rng *rand.Rand, weights []int) int { + total := 0 + for _, w := range weights { + total += w + } + r := rng.Intn(total) + for i, w := range weights { + if r < w { + return i + } + r -= w + } + return len(weights) - 1 +} + +func pickDeviceType(rng *rand.Rand) DeviceType { + // Reasonable distribution: motion + door/window common; smoke/CO/water + // less common but always at least one. Doorbell + keypad moderate. 
+ weights := []int{30, 22, 15, 5, 6, 4, 6, 4, 5, 3} + idx := pickWeighted(rng, weights) + return DeviceTypes[idx] +} + +func pickLocation(rng *rand.Rand, code string) string { + // Map device types to plausible locations. + switch code { + case "DOOR": + return []string{"front_door", "back_door", "garage_door", "side_door"}[rng.Intn(4)] + case "WINDOW": + return []string{"living_room", "kitchen", "dining_room", "master_bedroom", "bedroom_2", "bedroom_3"}[rng.Intn(6)] + case "GLASS_BREAK": + return []string{"living_room", "dining_room", "kitchen"}[rng.Intn(3)] + case "SMOKE", "CO": + return []string{"kitchen", "hallway", "master_bedroom", "bedroom_2", "basement"}[rng.Intn(5)] + case "WATER": + return []string{"kitchen", "basement", "utility_room"}[rng.Intn(3)] + case "TEMP": + return []string{"living_room", "master_bedroom", "basement", "attic"}[rng.Intn(4)] + case "DOORBELL": + return "front_door" + case "KEYPAD": + return "front_door" + case "MOTION": + return Locations[rng.Intn(len(Locations))] + } + return Locations[rng.Intn(len(Locations))] +} + +func hashAddr(id int64) string { + h := sha1.Sum([]byte("addr-" + strconv.FormatInt(id, 10))) + return hex.EncodeToString(h[:6]) // short, opaque +} + +// --- dimension + fleet upserts ------------------------------------------- + +func upsertDimensions(ctx context.Context, pool *pgxpool.Pool) error { + batch := &pgx.Batch{} + for _, p := range Plans { + batch.Queue(` + INSERT INTO plans (plan_id, name, monthly_price_usd, sla_seconds) + VALUES ($1, $2, $3, $4) + ON CONFLICT (plan_id) DO UPDATE SET + name = EXCLUDED.name, + monthly_price_usd = EXCLUDED.monthly_price_usd, + sla_seconds = EXCLUDED.sla_seconds + `, p.ID, p.Name, p.PriceUSD, p.SLASeconds) + } + for _, r := range Regions { + batch.Queue(` + INSERT INTO regions (region_id, name, dispatch_center, timezone) + VALUES ($1, $2, $3, $4) + ON CONFLICT (region_id) DO UPDATE SET + name = EXCLUDED.name, + dispatch_center = EXCLUDED.dispatch_center, + timezone = 
EXCLUDED.timezone + `, r.ID, r.Name, r.DispatchCenter, r.Timezone) + } + for _, dt := range DeviceTypes { + batch.Queue(` + INSERT INTO device_types (device_type_id, code, name, default_severity) + VALUES ($1, $2, $3, $4) + ON CONFLICT (device_type_id) DO UPDATE SET + code = EXCLUDED.code, + name = EXCLUDED.name, + default_severity = EXCLUDED.default_severity + `, dt.ID, dt.Code, dt.Name, int16(dt.DefaultSeverity)) + } + return pool.SendBatch(ctx, batch).Close() +} + +func upsertHouseholds(ctx context.Context, pool *pgxpool.Pool, hh []Household) error { + // CopyFrom into a temp staging approach would be faster, but a Batch of + // INSERTs with ON CONFLICT is simple, idempotent, and fast enough for + // ~30 000 rows once at startup. + const batchSize = 1000 + for start := 0; start < len(hh); start += batchSize { + end := start + batchSize + if end > len(hh) { + end = len(hh) + } + batch := &pgx.Batch{} + for _, h := range hh[start:end] { + batch.Queue(` + INSERT INTO households (household_id, plan_id, region_id, address_hash, armed) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (household_id) DO UPDATE SET + plan_id = EXCLUDED.plan_id, + region_id = EXCLUDED.region_id, + armed = EXCLUDED.armed + `, h.ID, h.PlanID, h.RegionID, h.AddressHash, h.Armed) + } + if err := pool.SendBatch(ctx, batch).Close(); err != nil { + return err + } + } + return nil +} + +func upsertDevices(ctx context.Context, pool *pgxpool.Pool, devs []Device) error { + const batchSize = 1000 + for start := 0; start < len(devs); start += batchSize { + end := start + batchSize + if end > len(devs) { + end = len(devs) + } + batch := &pgx.Batch{} + for _, d := range devs[start:end] { + batch.Queue(` + INSERT INTO devices (device_id, household_id, device_type_id, location, last_battery_pct) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (device_id) DO UPDATE SET + household_id = EXCLUDED.household_id, + device_type_id = EXCLUDED.device_type_id, + location = EXCLUDED.location, + last_battery_pct = 
EXCLUDED.last_battery_pct + `, d.ID, d.Household, d.TypeID, d.Location, int16(d.BatteryPct)) + } + if err := pool.SendBatch(ctx, batch).Close(); err != nil { + return err + } + } + return nil +} diff --git a/homeguard-iot/internal/web/server.go b/homeguard-iot/internal/web/server.go new file mode 100644 index 0000000..960b5d8 --- /dev/null +++ b/homeguard-iot/internal/web/server.go @@ -0,0 +1,613 @@ +// Package web is the operator-facing dashboard. +// +// Layout (top → bottom): active-alerts queue, live event stream, customer +// drill-down, ingest-rate footer. Each panel is driven by a different +// query against the same CedarDB instance the simulator is INSERTing into, +// at refresh rates from 200 ms to 2 s — that's the whole story this demo +// is selling. +package web + +import ( + "context" + "database/sql" + "embed" + "encoding/json" + "fmt" + "html/template" + "io/fs" + "log" + "net/http" + "os" + "strconv" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +// envDuration reads a Go duration string from the named environment +// variable, falling back to the supplied default. Mirrors the helper in +// the sim package so neither has to import the other. 
+func envDuration(name string, fallback time.Duration) time.Duration { + v := os.Getenv(name) + if v == "" { + return fallback + } + d, err := time.ParseDuration(v) + if err != nil { + log.Printf("envDuration: invalid %s=%q (%v); using default %s", name, v, err, fallback) + return fallback + } + return d +} + +//go:embed templates/*.html +var templatesFS embed.FS + +//go:embed static/* +var staticFS embed.FS + +type Server struct { + Pool *pgxpool.Pool + tpl *template.Template + static http.Handler +} + +func NewServer(pool *pgxpool.Pool) (*Server, error) { + tpl, err := template.ParseFS(templatesFS, "templates/*.html") + if err != nil { + return nil, fmt.Errorf("parse templates: %w", err) + } + sub, err := fs.Sub(staticFS, "static") + if err != nil { + return nil, fmt.Errorf("static sub: %w", err) + } + return &Server{ + Pool: pool, + tpl: tpl, + static: http.FileServer(http.FS(sub)), + }, nil +} + +func (s *Server) Routes() http.Handler { + mux := http.NewServeMux() + mux.HandleFunc("/", s.handleIndex) + mux.HandleFunc("/sse/events", s.handleSSE) + mux.HandleFunc("/api/alerts", s.handleAlerts) + mux.HandleFunc("/api/drilldown", s.handleDrilldown) + mux.HandleFunc("/api/stats", s.handleStats) + mux.HandleFunc("/api/storage", s.handleStorage) + mux.Handle("/static/", http.StripPrefix("/static/", s.static)) + return mux +} + +// --------------------------------------------------------------------- index + +// indexData carries the dashboard's polling cadences into the index +// template. The four refresh intervals are millisecond integers so the +// JS setInterval() calls and htmx "every Nms" triggers can use them +// verbatim. Defaults match the original hardcoded values. 
+type indexData struct { + AlertsRefreshMs int64 + DrilldownRefreshMs int64 + StatsRefreshMs int64 + StorageRefreshMs int64 +} + +func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/" { + http.NotFound(w, r) + return + } + data := indexData{ + AlertsRefreshMs: envDuration("HG_ALERTS_REFRESH", 1*time.Second).Milliseconds(), + DrilldownRefreshMs: envDuration("HG_DRILLDOWN_REFRESH", 2*time.Second).Milliseconds(), + StatsRefreshMs: envDuration("HG_STATS_REFRESH", 1*time.Second).Milliseconds(), + StorageRefreshMs: envDuration("HG_STORAGE_REFRESH", 1*time.Second).Milliseconds(), + } + if err := s.tpl.ExecuteTemplate(w, "index.html", data); err != nil { + log.Printf("template: %v", err) + } +} + +// --------------------------------------------------------------------- alerts + +// handleAlerts returns the operator's active-alerts queue as an HTML +// fragment. Joins alerts × households × plans so we can compute the +// per-alert SLA countdown (plan.sla_seconds - age) directly in the query. 
+func (s *Server) handleAlerts(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + rows, err := s.Pool.Query(ctx, ` + SELECT a.alert_id, a.severity, a.detail, + EXTRACT(EPOCH FROM (now() - a.raised_at))::int AS age_s, + p.sla_seconds, + p.sla_seconds - EXTRACT(EPOCH FROM (now() - a.raised_at))::int AS sla_remaining, + h.household_id, h.address_hash, + p.name AS plan_name, + r.name AS region_name, + r.dispatch_center + FROM alerts a + JOIN households h ON h.household_id = a.household_id + JOIN plans p ON p.plan_id = h.plan_id + JOIN regions r ON r.region_id = h.region_id + WHERE a.status = 'active' + ORDER BY a.severity DESC, a.raised_at ASC + LIMIT 25 + `) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer rows.Close() + + type row struct { + AlertID int64 + Severity int + Detail string + AgeSec int + SLASec int + SLARemaining int + HouseholdID int64 + AddressHash string + PlanName string + RegionName string + Dispatch string + Breached bool + AgeFmt string + SLAFmt string + } + var out []row + for rows.Next() { + var rr row + if err := rows.Scan( + &rr.AlertID, &rr.Severity, &rr.Detail, &rr.AgeSec, + &rr.SLASec, &rr.SLARemaining, &rr.HouseholdID, &rr.AddressHash, + &rr.PlanName, &rr.RegionName, &rr.Dispatch, + ); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + rr.Breached = rr.SLARemaining < 0 + rr.AgeFmt = fmtDuration(rr.AgeSec) + rr.SLAFmt = fmtDuration(absInt(rr.SLARemaining)) + out = append(out, rr) + } + // If the iteration ended because of an error (most commonly a + // context-cancelled mid-scan on an unindexed query), surface it. + // Without this, a half-completed scan looks identical to "no rows" + // and the dashboard silently renders "no active alerts." 
+ if err := rows.Err(); err != nil { + log.Printf("alerts query: rows.Err: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "text/html; charset=utf-8") + if err := s.tpl.ExecuteTemplate(w, "alerts.html", out); err != nil { + log.Printf("alerts tpl: %v", err) + } +} + +// --------------------------------------------------------------------- SSE + +// handleSSE streams a JSON snapshot of the most-recent non-heartbeat +// events every 200 ms. Heartbeats are filtered out server-side so the +// operator panel stays readable; the analytical queries elsewhere +// continue to count every row. +func (s *Server) handleSSE(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming not supported", http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + + ctx := r.Context() + tick := time.NewTicker(envDuration("HG_SSE_INTERVAL", 200*time.Millisecond)) + defer tick.Stop() + + if err := s.pushEventStream(ctx, w, flusher); err != nil { + return + } + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + if err := s.pushEventStream(ctx, w, flusher); err != nil { + return + } + } + } +} + +type eventRow struct { + EventID int64 `json:"event_id"` + Ts string `json:"ts"` + HouseholdID int64 `json:"household_id"` + AddressHash string `json:"address_hash"` + DeviceCode string `json:"device_code"` + Location string `json:"location"` + Kind int `json:"kind"` + Severity int `json:"severity"` + BatteryPct int `json:"battery_pct"` + Region string `json:"region"` +} + +func (s *Server) pushEventStream(ctx context.Context, w http.ResponseWriter, f http.Flusher) error { + // Newest 25 non-heartbeat events, joined with device + device_type + + // household + region for human-readable 
rendering. The (kind > 0) + // filter uses the events_kind_ts_idx index efficiently. + rows, err := s.Pool.Query(ctx, ` + SELECT e.event_id, e.ts, e.household_id, h.address_hash, + dt.code, d.location, e.kind, e.severity, + COALESCE(e.battery_pct, -1), r.name + FROM events e + JOIN devices d ON d.device_id = e.device_id + JOIN device_types dt ON dt.device_type_id = d.device_type_id + JOIN households h ON h.household_id = e.household_id + JOIN regions r ON r.region_id = h.region_id + WHERE e.kind > 0 + ORDER BY e.ts DESC + LIMIT 25 + `) + if err != nil { + log.Printf("event stream query: %v", err) + return nil + } + defer rows.Close() + + out := make([]eventRow, 0, 25) + for rows.Next() { + var er eventRow + var ts time.Time + var bp int + if err := rows.Scan(&er.EventID, &ts, &er.HouseholdID, &er.AddressHash, + &er.DeviceCode, &er.Location, &er.Kind, &er.Severity, + &bp, &er.Region); err != nil { + log.Printf("event stream scan: %v", err) + return nil + } + er.Ts = ts.Format("15:04:05") + er.BatteryPct = bp + out = append(out, er) + } + if err := rows.Err(); err != nil { + log.Printf("event stream: rows.Err: %v", err) + // Don't propagate as a stream error — just skip this frame and + // let the next tick try again. + return nil + } + buf, err := json.Marshal(map[string]any{ + "events": out, + "stamp_ms": time.Now().UnixMilli(), + }) + if err != nil { + return err + } + if _, err := fmt.Fprintf(w, "data: %s\n\n", buf); err != nil { + return err + } + f.Flush() + return nil +} + +// --------------------------------------------------------------------- drilldown + +// handleDrilldown returns the "what's going on at this household right +// now" panel. We pick the household behind the highest-severity currently- +// active alert (or the most-recently-active household if none) and show +// its last 20 events with device + type joined in. +// +// This is the operator-flow query: alert fires → click household → see +// context to decide dispatch. 
+func (s *Server) handleDrilldown(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + var householdID int64 + err := s.Pool.QueryRow(ctx, ` + SELECT a.household_id + FROM alerts a + WHERE a.status = 'active' + ORDER BY a.severity DESC, a.raised_at ASC + LIMIT 1 + `).Scan(&householdID) + if err != nil { + // Fallback: pick the household with the most events in the last + // 5 minutes — keeps the panel populated even when there are no + // active alerts. + _ = s.Pool.QueryRow(ctx, ` + SELECT household_id + FROM events + WHERE ts > now() - interval '5 minutes' + GROUP BY household_id + ORDER BY COUNT(*) DESC + LIMIT 1 + `).Scan(&householdID) + } + if householdID == 0 { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + _, _ = w.Write([]byte(`

waiting for events…

`)) + return + } + + // Household metadata for the header. + var ( + addr, planName, regionName, dispatch string + armed bool + slaSec int + ) + if err := s.Pool.QueryRow(ctx, ` + SELECT h.address_hash, h.armed, p.name, p.sla_seconds, r.name, r.dispatch_center + FROM households h + JOIN plans p ON p.plan_id = h.plan_id + JOIN regions r ON r.region_id = h.region_id + WHERE h.household_id = $1 + `, householdID).Scan(&addr, &armed, &planName, &slaSec, &regionName, &dispatch); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Last 20 events for this household, any kind. + rows, err := s.Pool.Query(ctx, ` + SELECT e.ts, dt.code, d.location, e.kind, e.severity, + COALESCE(e.battery_pct, -1) + FROM events e + JOIN devices d ON d.device_id = e.device_id + JOIN device_types dt ON dt.device_type_id = d.device_type_id + WHERE e.household_id = $1 + ORDER BY e.ts DESC + LIMIT 20 + `, householdID) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer rows.Close() + + type evRow struct { + Ts string + Code string + Location string + Kind int + Severity int + BatteryPct int + } + var events []evRow + for rows.Next() { + var ( + er evRow + ts time.Time + ) + if err := rows.Scan(&ts, &er.Code, &er.Location, &er.Kind, &er.Severity, &er.BatteryPct); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + er.Ts = ts.Format("15:04:05") + events = append(events, er) + } + if err := rows.Err(); err != nil { + log.Printf("drilldown query: rows.Err: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/html; charset=utf-8") + if err := s.tpl.ExecuteTemplate(w, "drilldown.html", map[string]any{ + "HouseholdID": householdID, + "Address": addr, + "Armed": armed, + "Plan": planName, + "SLASec": slaSec, + "Region": regionName, + "Dispatch": dispatch, + "Events": events, + }); err != nil { + 
log.Printf("drilldown tpl: %v", err) + } +} + +// --------------------------------------------------------------------- stats + +// handleStats is the meta-query for the dashboard footer: how fast are +// we ingesting right now, how many rows have landed total, how many +// alerts are open. The dashboard footer reads this every +// HG_STATS_REFRESH (default 1s). +// +// The event-rate side reads from storage_samples (which the simulator +// populates every HG_STORAGE_SAMPLER_INTERVAL from its atomic eventID +// counter), not from events itself. At billion-row scale, scanning +// events for COUNT(*) — especially with no useful index — takes long +// enough to make the footer effectively frozen. storage_samples is a +// few hundred rows at most and answers instantly. The published +// rows_per_sec is therefore the average over the last sample interval +// (default 5s, the user may have set it longer in HG_STORAGE_SAMPLER_INTERVAL), +// not a strict one-second window. +func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + var ( + rowsPerSec int64 + totalEv int64 + activeAlts int64 + totalAlts int64 + ) + // Event metrics — derived from the two most recent storage_samples + // rows. total_events = latest_bytes / 48. rate = bytes-delta over the + // elapsed time divided by 48. 
+ var ( + latestBytes, priorBytes sql.NullInt64 + latestTs, priorTs sql.NullTime + ) + if err := s.Pool.QueryRow(ctx, ` + SELECT + (SELECT uncompressed_bytes FROM storage_samples + ORDER BY sampled_at DESC LIMIT 1) AS latest_bytes, + (SELECT sampled_at FROM storage_samples + ORDER BY sampled_at DESC LIMIT 1) AS latest_ts, + (SELECT uncompressed_bytes FROM storage_samples + WHERE sampled_at < (SELECT MAX(sampled_at) FROM storage_samples) + ORDER BY sampled_at DESC LIMIT 1) AS prior_bytes, + (SELECT sampled_at FROM storage_samples + WHERE sampled_at < (SELECT MAX(sampled_at) FROM storage_samples) + ORDER BY sampled_at DESC LIMIT 1) AS prior_ts + `).Scan(&latestBytes, &latestTs, &priorBytes, &priorTs); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if latestBytes.Valid { + totalEv = latestBytes.Int64 / int64(eventsRowBytes) + } + if latestBytes.Valid && priorBytes.Valid && latestTs.Valid && priorTs.Valid { + dt := latestTs.Time.Sub(priorTs.Time).Seconds() + if dt > 0 { + delta := latestBytes.Int64 - priorBytes.Int64 + if delta < 0 { + delta = 0 + } + rowsPerSec = int64(float64(delta) / dt / float64(eventsRowBytes)) + } + } + // Alert metrics. + if err := s.Pool.QueryRow(ctx, ` + SELECT + COUNT(*) FILTER (WHERE status = 'active') AS active, + COUNT(*) AS total + FROM alerts + `).Scan(&activeAlts, &totalAlts); err != nil { + // Non-fatal; alerts table may be empty very early. + activeAlts = 0 + totalAlts = 0 + } + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "rows_per_sec": rowsPerSec, + "total_events": totalEv, + "active_alerts": activeAlts, + "total_alerts": totalAlts, + }) +} + +// --------------------------------------------------------------------- storage + +// targetBytesPerSec: 3 TB/day uncompressed = 3 * 10^12 / 86 400 s ≈ +// 34.722 MB/s. The dashboard renders gauges against this target. 
+const targetBytesPerSec = 34_722_222 + +// eventsRowBytes: per-row uncompressed footprint of the events table, +// derived from the schema column widths (matches the constant of the +// same name in the sim package). Used to convert storage_samples +// uncompressed_bytes back into row counts for the stats footer. +const eventsRowBytes = 48 + +// handleStorage returns the most recent uncompressed-size sample plus +// rates computed across the last 1, 5, and 15 minutes. Rates are +// (latest_bytes - bytes_at_window_start) / window_seconds — so they +// reflect the real growth of stored data, not the wire-rate of inserts. +func (s *Server) handleStorage(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + type windowResult struct { + LatestBytes sql.NullInt64 + LatestTs sql.NullTime + Bytes1mAgo sql.NullInt64 + Ts1mAgo sql.NullTime + Bytes5mAgo sql.NullInt64 + Ts5mAgo sql.NullTime + Bytes15mAgo sql.NullInt64 + Ts15mAgo sql.NullTime + } + var rr windowResult + if err := s.Pool.QueryRow(ctx, ` + SELECT + (SELECT uncompressed_bytes FROM storage_samples + ORDER BY sampled_at DESC LIMIT 1) AS latest_bytes, + (SELECT sampled_at FROM storage_samples + ORDER BY sampled_at DESC LIMIT 1) AS latest_ts, + (SELECT uncompressed_bytes FROM storage_samples + WHERE sampled_at <= now() - interval '1 minute' + ORDER BY sampled_at DESC LIMIT 1) AS bytes_1m, + (SELECT sampled_at FROM storage_samples + WHERE sampled_at <= now() - interval '1 minute' + ORDER BY sampled_at DESC LIMIT 1) AS ts_1m, + (SELECT uncompressed_bytes FROM storage_samples + WHERE sampled_at <= now() - interval '5 minutes' + ORDER BY sampled_at DESC LIMIT 1) AS bytes_5m, + (SELECT sampled_at FROM storage_samples + WHERE sampled_at <= now() - interval '5 minutes' + ORDER BY sampled_at DESC LIMIT 1) AS ts_5m, + (SELECT uncompressed_bytes FROM storage_samples + WHERE sampled_at <= now() - interval '15 minutes' + ORDER BY sampled_at DESC LIMIT 1) AS bytes_15m, + (SELECT sampled_at FROM storage_samples + 
WHERE sampled_at <= now() - interval '15 minutes' + ORDER BY sampled_at DESC LIMIT 1) AS ts_15m + `).Scan( + &rr.LatestBytes, &rr.LatestTs, + &rr.Bytes1mAgo, &rr.Ts1mAgo, + &rr.Bytes5mAgo, &rr.Ts5mAgo, + &rr.Bytes15mAgo, &rr.Ts15mAgo, + ); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + rate := func(latest, prev sql.NullInt64, latestTs, prevTs sql.NullTime) (float64, bool) { + if !latest.Valid || !prev.Valid || !latestTs.Valid || !prevTs.Valid { + return 0, false + } + dt := latestTs.Time.Sub(prevTs.Time).Seconds() + if dt <= 0 { + return 0, false + } + db := float64(latest.Int64 - prev.Int64) + if db < 0 { + db = 0 // can happen across CedarDB compactions + } + return db / dt, true + } + + r1, ok1 := rate(rr.LatestBytes, rr.Bytes1mAgo, rr.LatestTs, rr.Ts1mAgo) + r5, ok5 := rate(rr.LatestBytes, rr.Bytes5mAgo, rr.LatestTs, rr.Ts5mAgo) + r15, ok15 := rate(rr.LatestBytes, rr.Bytes15mAgo, rr.LatestTs, rr.Ts15mAgo) + + out := map[string]any{ + "target_bytes_per_sec": targetBytesPerSec, + } + if rr.LatestBytes.Valid { + out["latest_bytes"] = rr.LatestBytes.Int64 + } + if rr.LatestTs.Valid { + out["latest_ts"] = rr.LatestTs.Time.Format(time.RFC3339) + } + if ok1 { + out["rate_1m_bps"] = r1 + } + if ok5 { + out["rate_5m_bps"] = r5 + } + if ok15 { + out["rate_15m_bps"] = r15 + } + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(out) +} + +// --------------------------------------------------------------------- utils + +func fmtDuration(sec int) string { + if sec < 60 { + return strconv.Itoa(sec) + "s" + } + return strconv.Itoa(sec/60) + "m" + strconv.Itoa(sec%60) + "s" +} + +func absInt(x int) int { + if x < 0 { + return -x + } + return x +} diff --git a/homeguard-iot/internal/web/static/cedardb-logo.svg b/homeguard-iot/internal/web/static/cedardb-logo.svg new file mode 100644 index 0000000..192623c --- /dev/null +++ b/homeguard-iot/internal/web/static/cedardb-logo.svg @@ -0,0 +1,10 @@ + + + + + 
+ + + + + diff --git a/homeguard-iot/internal/web/static/queries.html b/homeguard-iot/internal/web/static/queries.html new file mode 100644 index 0000000..a3dceca --- /dev/null +++ b/homeguard-iot/internal/web/static/queries.html @@ -0,0 +1,865 @@ + + + + +HomeGuard IoT — SQL queries reference + + + + + +
+

+ +   + HOMEGUARD IoT · SQL QUERIES +

+ ← back to operator console +
+ +
+ +

+ Every SQL statement the simulator and dashboard run against CedarDB, in + one place. The interesting bit isn't any individual query; it's that + all of these run concurrently against the same instance: + N producer goroutines feed a single ingestor that CopyFroms into events + at up to ~500 K events/sec (≈ 2 TB/day uncompressed at 48 bytes/row, toward the 3 TB/day target) while four + different read paths (active-alerts queue, live event stream, customer + drill-down, ingest stats) are joining across the same hot table and the + normalized dimensions at 200 ms – 2 s refresh cadences. The whole pitch + is collapsing the BigQuery + Pub/Sub-plus-KV split onto one database + with real joins and no replication lag. +

+

+ Tuning the cadences. Every "refresh N s" pill below is + an env-var-driven default — set HG_ALERTS_REFRESH, + HG_DRILLDOWN_REFRESH, HG_STATS_REFRESH, + HG_STORAGE_REFRESH, HG_SSE_INTERVAL + (on the server container), or HG_STORAGE_SAMPLER_INTERVAL + (on the simulator container) to any Go duration string (e.g. + "500ms", "2s", "30s") to dial + the read pressure CedarDB has to serve next to the ingest stream. +

+ +
+ hot · every tick (10 Hz) + fast · 200 ms – 1 s + medium · 1 – 2 s + slow · 2 s+ background + once · startup +
+ + + + +

1. Hot write path · 2 queries · drives the firehose

+ +
+
+

COPY events (fan-in, single ingestor)

+
+ ~20 flushes/s · ≤ 10K rows each + up to ~500 K ev/s + internal/sim/simulator.go · ingestorLoop() +
+
+

+ The firehose. -writers producer goroutines partition + the device fleet, build row batches, and push them onto a buffered + channel. A single ingestor goroutine drains the + channel, coalesces up to 10 000 rows (or every 50 ms, whichever + comes first), and runs one binary CopyFrom. This + sidesteps CedarDB's "cannot start bulk operation until previous + bulk operation has become globally visible" error (SQLSTATE + 40P01) that fires when more than one goroutine COPYs in parallel, + while still keeping the binary protocol's ~3× wire and parsing + advantage over multi-row INSERT under simple-query mode. + event_id is an atomic.Int64 seeded from + MAX(event_id) at startup, so producers share the id + space without locking — the column is plain BIGINT, + not BIGSERIAL (CedarDB rejects COPY for sequence- + defaulted columns with "unable to cast from void to bigint"). +

+
-- One round trip per ingestor flush (≤ 10K rows, or every 50ms).
+COPY events (
+    event_id, device_id, household_id, ts,
+    kind, severity, value, battery_pct, rssi_dbm
+) FROM STDIN BINARY;
+-- ...one binary tuple per buffered event...
+

+ On startup, the simulator runs one read to seed the id counter: +

+
SELECT COALESCE(MAX(event_id), 0) FROM events;
+
+ +
+
+

Insert one alert

+
+ ~10–30/s + 1 row/call + internal/sim/simulator.go · fireAlert() +
+
+

+ Called inline from emitTick whenever EvaluateAlert + in rules.go promotes a triggered/tamper event to an + operator-actionable alert. status starts 'active'; the + background resolveAlertsLoop later marks it resolved or + false_alarm. +

+
INSERT INTO alerts (
+    household_id,
+    triggered_event_id,
+    raised_at,
+    severity,
+    status,
+    detail
+)
+VALUES ($1, 0, now(), $2, 'active', $3);
+
+ + +

2. Dashboard reads · 9 queries · the demo story

+ +
+
+

Active-alerts queue (SLA-aware)

+
+ every 1 s + joins 4 tables + internal/web/server.go · handleAlerts() +
+
+

+ The top panel of the dashboard. Joins alerts × households × plans + × regions so the SLA countdown + (plan.sla_seconds − age) and the dispatch-center attribution + are computed by the database, not the app. Ordered by severity DESC + then raised_at ASC — escalations first, then oldest-of-equal-severity. +

+
SELECT a.alert_id,
+       a.severity,
+       a.detail,
+       EXTRACT(EPOCH FROM (now() - a.raised_at))::int AS age_s,
+       p.sla_seconds,
+       p.sla_seconds
+         - EXTRACT(EPOCH FROM (now() - a.raised_at))::int AS sla_remaining,
+       h.household_id,
+       h.address_hash,
+       p.name AS plan_name,
+       r.name AS region_name,
+       r.dispatch_center
+FROM   alerts     a
+JOIN   households h ON h.household_id = a.household_id
+JOIN   plans      p ON p.plan_id      = h.plan_id
+JOIN   regions    r ON r.region_id    = h.region_id
+WHERE  a.status = 'active'
+ORDER  BY a.severity DESC, a.raised_at ASC
+LIMIT  25;
+
+ +
+
+

Live event stream (non-heartbeat)

+
+ SSE every 200 ms + joins 5 tables + internal/web/server.go · pushEventStream() +
+
+

+ Bottom-left panel, the ticker tape. Joins events × devices × + device_types × households × regions so each row renders with a + human-readable device code, location and region. The + WHERE e.kind > 0 filter drops heartbeats and + rides the events_kind_ts_idx index. +

+
SELECT e.event_id,
+       e.ts,
+       e.household_id,
+       h.address_hash,
+       dt.code,
+       d.location,
+       e.kind,
+       e.severity,
+       COALESCE(e.battery_pct, -1),
+       r.name
+FROM   events       e
+JOIN   devices      d  ON d.device_id      = e.device_id
+JOIN   device_types dt ON dt.device_type_id = d.device_type_id
+JOIN   households   h  ON h.household_id   = e.household_id
+JOIN   regions      r  ON r.region_id      = h.region_id
+WHERE  e.kind > 0
+ORDER  BY e.ts DESC
+LIMIT  25;
+
+ +
+
+

Drill-down: pick the household

+
+ every 2 s + first step of 3 + internal/web/server.go · handleDrilldown() +
+
+

+ Bottom-right panel auto-rotates to whichever household has the + highest-severity currently-active alert. Severity DESC then raised_at + ASC mirrors the operator's actual triage order. +

+
SELECT a.household_id
+FROM   alerts a
+WHERE  a.status = 'active'
+ORDER  BY a.severity DESC, a.raised_at ASC
+LIMIT  1;
+
+ +
+
+

Drill-down: fallback household

+
+ when no active alerts + GROUP BY on hot table + internal/web/server.go · handleDrilldown() +
+
+

+ Runs only when the first query found no active alerts — picks the + chattiest household in the last 5 minutes so the drill-down panel never + sits empty during a demo. A small but real aggregate over the hot + table. +

+
SELECT household_id
+FROM   events
+WHERE  ts > now() - interval '5 minutes'
+GROUP  BY household_id
+ORDER  BY COUNT(*) DESC
+LIMIT  1;
+
+ +
+
+

Drill-down: household header

+
+ every 2 s + 3-way join + internal/web/server.go · handleDrilldown() +
+
+

+ Pulls the household's plan + region + dispatch center to render the + drill-down panel header (plan name, SLA seconds, armed state, region + DC). +

+
SELECT h.address_hash,
+       h.armed,
+       p.name,
+       p.sla_seconds,
+       r.name,
+       r.dispatch_center
+FROM   households h
+JOIN   plans      p ON p.plan_id   = h.plan_id
+JOIN   regions    r ON r.region_id = h.region_id
+WHERE  h.household_id = $1;
+
+ +
+
+

Drill-down: last 20 events

+
+ every 2 s + uses events_household_ts_idx + internal/web/server.go · handleDrilldown() +
+
+

+ The "what's going on at this household right now" panel. Indexed on + events (household_id, ts DESC) so the LIMIT 20 returns + instantly even with the table at billion-row scale. +

+
SELECT e.ts,
+       dt.code,
+       d.location,
+       e.kind,
+       e.severity,
+       COALESCE(e.battery_pct, -1)
+FROM   events       e
+JOIN   devices      d  ON d.device_id       = e.device_id
+JOIN   device_types dt ON dt.device_type_id = d.device_type_id
+WHERE  e.household_id = $1
+ORDER  BY e.ts DESC
+LIMIT  20;
+
+ +
+
+

Footer: ingest rate + total events

+
+ every HG_STATS_REFRESH + storage_samples · ≤ a few hundred rows + internal/web/server.go · handleStats() +
+
+

+ The two most recent storage_samples rows answer both + "how many rows are in events" and "how fast are they growing": the + table is just (sampled_at, uncompressed_bytes) pairs + the simulator writes every HG_STORAGE_SAMPLER_INTERVAL + from its in-process atomic counter, so the byte delta divided by the elapsed time and by the + 48-byte row width gives events/sec without ever touching events. Earlier + iterations of this query ran COUNT(*) FILTER (WHERE ts > + now() - interval '1 second') on events directly, but + without an events (ts DESC) index that is a full table + scan, which at billion-row scale takes long enough to make the + footer freeze. (The published rows_per_sec is a + sample-interval average rather than a strict 1-second window.) +

+
SELECT
+    (SELECT uncompressed_bytes FROM storage_samples
+         ORDER BY sampled_at DESC LIMIT 1) AS latest_bytes,
+    (SELECT sampled_at         FROM storage_samples
+         ORDER BY sampled_at DESC LIMIT 1) AS latest_ts,
+    (SELECT uncompressed_bytes FROM storage_samples
+         WHERE sampled_at < (SELECT MAX(sampled_at) FROM storage_samples)
+         ORDER BY sampled_at DESC LIMIT 1) AS prior_bytes,
+    (SELECT sampled_at         FROM storage_samples
+         WHERE sampled_at < (SELECT MAX(sampled_at) FROM storage_samples)
+         ORDER BY sampled_at DESC LIMIT 1) AS prior_ts;
+-- total_events  = latest_bytes / 48
+-- rows_per_sec  = (latest_bytes - prior_bytes) / dt / 48
+
+ +
+
+

Footer: active / total alerts

+
+ every 1 s + small table + internal/web/server.go · handleStats() +
+
+

+ A single filtered COUNT(*) pass over the alerts table, which stays small enough to scan directly on every refresh. +

+
SELECT
+    COUNT(*) FILTER (WHERE status = 'active') AS active,
+    COUNT(*)                                  AS total
+FROM alerts;
+
+ +
+
+

Storage gauge: latest size + 1m / 5m / 15m rates

+
+ every 1 s + 8 subselects · one round trip + internal/web/server.go · handleStorage() +
+
+

+ Feeds the Storage Growth gauge. Picks the freshest sample and, for + each window, the freshest sample at or before the + window-start cutoff. This handles uneven sampler timing without lying + about the elapsed seconds (the rate is computed in Go from the two + timestamps the query returns, not assumed). Reads only the + storage_samples table, which the sampler fills every + HG_STORAGE_SAMPLER_INTERVAL (default 5 s) from its in-process atomic counter; no compression view or events scan is involved. +

+
SELECT
+    (SELECT uncompressed_bytes FROM storage_samples
+         ORDER BY sampled_at DESC LIMIT 1) AS latest_bytes,
+    (SELECT sampled_at         FROM storage_samples
+         ORDER BY sampled_at DESC LIMIT 1) AS latest_ts,
+
+    (SELECT uncompressed_bytes FROM storage_samples
+         WHERE sampled_at <= now() - interval '1 minute'
+         ORDER BY sampled_at DESC LIMIT 1) AS bytes_1m,
+    (SELECT sampled_at         FROM storage_samples
+         WHERE sampled_at <= now() - interval '1 minute'
+         ORDER BY sampled_at DESC LIMIT 1) AS ts_1m,
+
+    (SELECT uncompressed_bytes FROM storage_samples
+         WHERE sampled_at <= now() - interval '5 minutes'
+         ORDER BY sampled_at DESC LIMIT 1) AS bytes_5m,
+    (SELECT sampled_at         FROM storage_samples
+         WHERE sampled_at <= now() - interval '5 minutes'
+         ORDER BY sampled_at DESC LIMIT 1) AS ts_5m,
+
+    (SELECT uncompressed_bytes FROM storage_samples
+         WHERE sampled_at <= now() - interval '15 minutes'
+         ORDER BY sampled_at DESC LIMIT 1) AS bytes_15m,
+    (SELECT sampled_at         FROM storage_samples
+         WHERE sampled_at <= now() - interval '15 minutes'
+         ORDER BY sampled_at DESC LIMIT 1) AS ts_15m;
+
+ + +

3. Background maintenance · 4 queries · keep the demo dynamic

+ +
+
+

Resolve low-severity alerts (false alarm)

+
+ every 2 s + LIMIT 2000 per tick + internal/sim/simulator.go · resolveAlertsLoop() +
+
+

+ Severity 1–2 alerts age out as false_alarm. Without this the operator + queue would just grow forever during a long-running demo. +

+
UPDATE alerts
+SET status        = 'false_alarm',
+    resolved_at   = now(),
+    resolution_ms = (EXTRACT(EPOCH FROM (now() - raised_at)) * 1000)::int
+WHERE alert_id IN (
+    SELECT alert_id
+    FROM   alerts
+    WHERE  status = 'active' AND severity <= 2
+    ORDER  BY raised_at ASC
+    LIMIT  2000
+);
+
+ +
+
+

Resolve high-severity alerts (dispatch)

+
+ every 2 s + LIMIT 600 per tick + internal/sim/simulator.go · resolveAlertsLoop() +
+
+

+ Severity 3+ alerts only get cleared after they've sat in the queue + for > 20 seconds — gives the operator panel something to look at + before the simulated dispatch fires. +

+
UPDATE alerts
+SET status        = 'resolved',
+    resolved_at   = now(),
+    resolution_ms = (EXTRACT(EPOCH FROM (now() - raised_at)) * 1000)::int
+WHERE alert_id IN (
+    SELECT alert_id
+    FROM   alerts
+    WHERE  status = 'active' AND severity >= 3
+      AND  raised_at < now() - interval '20 seconds'
+    ORDER  BY raised_at ASC
+    LIMIT  600
+);
+
+ +
+
+

Shuffle households' armed state

+
+ every ~10 s · batched + PK update · ~1 500 rows/round + internal/sim/simulator.go · shuffleArmedState() +
+
+

+ Flips ~5 % of households armed/disarmed each round, sent as a single + pgx.Batch. Keeps the rules engine outputs varying — door + opens flip between "routine" and "alarmable" without restarting the + sim. +

+
UPDATE households
+SET    armed = $1
+WHERE  household_id = $2;
+
+ +
+
+

Sampler: record table-size sample

+
+ every 5 s + 1 row/call · no read needed + internal/sim/simulator.go · storageSamplerLoop() +
+
+

+ The size value is 48 × s.eventID.Load(): the in-process + atomic counter the producers increment per generated row, so no DB + query is needed. Earlier iterations polled + cedardb_compression_info (laggy because it only + reflects rows packed into column-store blocks) and then + COUNT(*) FROM events (exact but increasingly expensive + as the table grows past a billion rows, and a likely cause of write + stalls because the long scan held an MVCC snapshot). The atomic + counter is exact for rows that producers tried to write; it + diverges from the table count only by the channel queue plus the + in-flight COPY batch (≤ ~160 K rows ≈ 8 MB at the configured + backpressure ceiling). sampled_at is the table's PK; + now() has microsecond resolution, so values stay unique at the 5 s sampler cadence. +

+
INSERT INTO storage_samples (sampled_at, uncompressed_bytes)
+VALUES (now(), $1);
+
+ + +

4. Startup & bootstrap · 5 queries · once per cold start

+ +
+
+

Probe: is the schema present?

+
+ once + information_schema + internal/db/schema.go · SchemaPresent() +
+
+

+ Diagnostic log only — we always call ApplySchema anyway + because the file is idempotent (CREATE TABLE IF NOT EXISTS on + everything). +

+
SELECT (
+    SELECT COUNT(*) FROM information_schema.tables
+    WHERE  table_name = 'households'
+) = 1;
+
+ +
+
+

Apply schema (DDL, one stmt at a time)

+
+ once + ~14 statements + internal/db/schema.go · ApplySchema() · schema.sql +
+
+

+ The embedded schema.sql is split on ; and each + statement is run individually under simple-query protocol — CedarDB + accepts DDL through that path but silently no-ops some of it under + extended Parse/Bind/Execute, so we force the simpler path. +

+
CREATE TABLE IF NOT EXISTS plans (
+    plan_id           INTEGER PRIMARY KEY,
+    name              TEXT NOT NULL,
+    monthly_price_usd NUMERIC(8, 2) NOT NULL,
+    sla_seconds       INTEGER NOT NULL
+);
+-- ... regions, device_types, households, devices, events, alerts, storage_samples ...
+-- ... plus the four supporting indexes (trimmed for the high-rate demo —
+--     every secondary index on events is write amplification at 500 K/s):
+CREATE INDEX IF NOT EXISTS events_household_ts_idx ON events (household_id, ts DESC);
+CREATE INDEX IF NOT EXISTS events_kind_ts_idx      ON events (kind, ts DESC);
+CREATE INDEX IF NOT EXISTS alerts_status_raised_idx
+    ON alerts (status, raised_at DESC);
+CREATE INDEX IF NOT EXISTS alerts_household_raised_idx
+    ON alerts (household_id, raised_at DESC);
+
+ +
+
+

Upsert dimensions (plans, regions, device_types)

+
+ once · single batch + ~25 rows total + internal/sim/simulator.go · upsertDimensions() +
+
+

+ The three small static dimension tables are populated from + catalog.go using INSERT ... ON CONFLICT DO UPDATE + so the simulator is safe to restart against a hot database. All three + upserts ride a single pgx.Batch. +

+
-- plans
+INSERT INTO plans (plan_id, name, monthly_price_usd, sla_seconds)
+VALUES ($1, $2, $3, $4)
+ON CONFLICT (plan_id) DO UPDATE SET
+    name              = EXCLUDED.name,
+    monthly_price_usd = EXCLUDED.monthly_price_usd,
+    sla_seconds       = EXCLUDED.sla_seconds;
+
+-- regions
+INSERT INTO regions (region_id, name, dispatch_center, timezone)
+VALUES ($1, $2, $3, $4)
+ON CONFLICT (region_id) DO UPDATE SET
+    name            = EXCLUDED.name,
+    dispatch_center = EXCLUDED.dispatch_center,
+    timezone        = EXCLUDED.timezone;
+
+-- device_types
+INSERT INTO device_types (device_type_id, code, name, default_severity)
+VALUES ($1, $2, $3, $4)
+ON CONFLICT (device_type_id) DO UPDATE SET
+    code             = EXCLUDED.code,
+    name             = EXCLUDED.name,
+    default_severity = EXCLUDED.default_severity;
+
+ +
+
+

Upsert household fleet

+
+ once · batched 1 000/round + ~30 000 rows default + internal/sim/simulator.go · upsertHouseholds() +
+
+

+ Synthesised fleet from generateFleet(). Run as batches of + 1 000 INSERTs per pgx.Batch.SendBatch — simpler and almost + as fast as a staging-table COPY at this scale, idempotent on restart. +

+
INSERT INTO households (
+    household_id, plan_id, region_id, address_hash, armed
+)
+VALUES ($1, $2, $3, $4, $5)
+ON CONFLICT (household_id) DO UPDATE SET
+    plan_id   = EXCLUDED.plan_id,
+    region_id = EXCLUDED.region_id,
+    armed     = EXCLUDED.armed;
+
+ +
+
+

Upsert device fleet

+
+ once · batched 1 000/round + ~300 000 rows default + internal/sim/simulator.go · upsertDevices() +
+
+

+ Same shape as the households upsert. ~10 devices per household by + default; total fleet is whatever -households × + -devices-per-household lands on. +

+
INSERT INTO devices (
+    device_id, household_id, device_type_id, location, last_battery_pct
+)
+VALUES ($1, $2, $3, $4, $5)
+ON CONFLICT (device_id) DO UPDATE SET
+    household_id     = EXCLUDED.household_id,
+    device_type_id   = EXCLUDED.device_type_id,
+    location         = EXCLUDED.location,
+    last_battery_pct = EXCLUDED.last_battery_pct;
+
+ + +

5. Schema reset · 1 query · destructive · behind a flag

+ +
+
+

DROP everything (reverse FK order)

+
+ on -reset-schema + 8 tables + internal/db/schema.go · ResetSchema() +
+
+

+ Only runs when the simulator is started with -reset-schema. + Dropped in reverse-FK order so the CASCADE never has anything real to + do, then ApplySchema rebuilds. +

+
DROP TABLE IF EXISTS storage_samples CASCADE;
+DROP TABLE IF EXISTS alerts          CASCADE;
+DROP TABLE IF EXISTS events          CASCADE;
+DROP TABLE IF EXISTS devices         CASCADE;
+DROP TABLE IF EXISTS households      CASCADE;
+DROP TABLE IF EXISTS device_types    CASCADE;
+DROP TABLE IF EXISTS regions         CASCADE;
+DROP TABLE IF EXISTS plans           CASCADE;
+
+ +
+ + + + + diff --git a/homeguard-iot/internal/web/templates/alerts.html b/homeguard-iot/internal/web/templates/alerts.html new file mode 100644 index 0000000..8125cd3 --- /dev/null +++ b/homeguard-iot/internal/web/templates/alerts.html @@ -0,0 +1,32 @@ +{{if . }} + + + + + + + + + + + + + + {{range .}} + + + + + + + + + + {{end}} + +
Sev | Household | Plan | Region · Dispatch | Detail | Age | SLA
{{.Severity}} | #{{.HouseholdID}} | {{.PlanName}} | {{.RegionName}} · {{.Dispatch}} | {{.Detail}} | {{.AgeFmt}} | + {{if .Breached}}+{{.SLAFmt}} OVER{{else}}{{.SLAFmt}} left{{end}} +
+{{else}} +

no active alerts right now — system clean.

+{{end}} diff --git a/homeguard-iot/internal/web/templates/drilldown.html b/homeguard-iot/internal/web/templates/drilldown.html new file mode 100644 index 0000000..5f09804 --- /dev/null +++ b/homeguard-iot/internal/web/templates/drilldown.html @@ -0,0 +1,25 @@ +
+ #{{.HouseholdID}} + addr {{.Address}} + {{.Plan}} (SLA {{.SLASec}}s) + {{.Region}} · {{.Dispatch}} + + {{if .Armed}}● ARMED{{else}}○ disarmed{{end}} + +
+{{if .Events }} + + + {{range .Events}} + + + + + + + {{end}} + +
{{.Ts}} | {{.Code}} | {{.Location}} | {{if eq .Kind 0}}heartbeat{{else if eq .Kind 1}}TRIGGERED{{else if eq .Kind 2}}battery low{{else if eq .Kind 3}}offline{{else if eq .Kind 4}}TAMPER{{else}}?{{end}}{{if gt .Severity 0}} · sev {{.Severity}}{{end}}
+{{else}} +

no recent events for this household.

+{{end}} diff --git a/homeguard-iot/internal/web/templates/index.html b/homeguard-iot/internal/web/templates/index.html new file mode 100644 index 0000000..f76fe4e --- /dev/null +++ b/homeguard-iot/internal/web/templates/index.html @@ -0,0 +1,467 @@ + + + + +HomeGuard IoT — CedarDB Operator Console + + + + + +
+

+ +   + HOMEGUARD IoT · OPERATOR CONSOLE +

+
+ Active alerts 0 · + Events ingested 0 · + connecting… +
+
+ +
+
+
+

Active Alerts · SLA-aware · refresh {{.AlertsRefreshMs}} ms · joins alerts × households × plans × regions

+
+

waiting for first alert…

+
+
+
+ +
+
+

Live Event Stream · non-heartbeat · SSE · joins events × devices × device_types × households × regions

+
    +
  • connecting to stream…
  • +
+
+
+

Customer Drill-Down · highest-severity household · refresh {{.DrilldownRefreshMs}} ms

+
+

selecting household…

+
+
+
+

Storage Growth · uncompressed · storage_samples · refresh 1 s

+
+
+ + + + + + + + MB/s · 1m avg + TARGET 34.7 MB/s · 3 TB/day + +
collecting samples…
+
+ + + + + + + + + + + + + + + + + + + + + +
Total uncompressed
1-minute rate
5-minute rate
15-minute rate
+
+ Projected daily (current 1m rate): + +
+
+
+
+
+ + + + + + diff --git a/homeguard-iot/out/server b/homeguard-iot/out/server new file mode 100755 index 0000000..0957fbb Binary files /dev/null and b/homeguard-iot/out/server differ diff --git a/homeguard-iot/out/simulator b/homeguard-iot/out/simulator new file mode 100755 index 0000000..37f5f53 Binary files /dev/null and b/homeguard-iot/out/simulator differ diff --git a/homeguard-iot/run.sh b/homeguard-iot/run.sh new file mode 100755 index 0000000..56f01b9 --- /dev/null +++ b/homeguard-iot/run.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +# Using a random port which isn't likely to be taken by someone else +export DATABASE_URL="postgresql://postgres:postgres@localhost:26257/postgres?sslmode=require" + +# Simulator +export HG_STORAGE_SAMPLER_INTERVAL="5s" +export HG_INGEST_BATCH="50000" +export HG_INGESTORS="10" + +# Server +export HG_SSE_INTERVAL="1s" +export HG_ALERTS_REFRESH="5s" +export HG_DRILLDOWN_REFRESH="5s" +export HG_STATS_REFRESH="1s" +export HG_STORAGE_REFRESH="2s" + +# Small run, for a little laptop: +#nohup ./out/simulator -households=30000 -devices-per-household=10 -rate=2500 -hz=10 -writers=4 >> simulator.log 2>&1 > simulator.log 2>&1 > server.log 2>&1