diff --git a/homeguard-iot/Dockerfile b/homeguard-iot/Dockerfile
new file mode 100644
index 0000000..3b2dace
--- /dev/null
+++ b/homeguard-iot/Dockerfile
@@ -0,0 +1,23 @@
+# Two-stage build. Produces a small image containing both binaries; the
+# docker-compose file decides which one to run via `command:`.
+
+FROM golang:1.22-alpine AS build
+WORKDIR /src
+
+COPY go.mod go.sum* ./
+RUN go mod download || true
+
+COPY . .
+
+RUN CGO_ENABLED=0 go build -o /out/server ./cmd/server
+RUN CGO_ENABLED=0 go build -o /out/simulator ./cmd/simulator
+
+FROM gcr.io/distroless/static:nonroot
+WORKDIR /app
+COPY --from=build /out/server /app/server
+COPY --from=build /out/simulator /app/simulator
+USER nonroot:nonroot
+
+# Use CMD (not ENTRYPOINT) so docker-compose's per-service `command:` fully
+# REPLACES this rather than being appended as argv to the default binary.
+CMD ["/app/server"]
diff --git a/homeguard-iot/README.md b/homeguard-iot/README.md
new file mode 100644
index 0000000..a03dbb5
--- /dev/null
+++ b/homeguard-iot/README.md
@@ -0,0 +1,385 @@
+# HomeGuard IoT — CedarDB Operator Console
+
+
+
+A small Go project that simulates an alarm-monitoring company's IoT
+backplane and showcases CedarDB serving both the OLTP-style operator
+console *and* the OLAP-style ingest-rate analytics off the same table at
+the same time.
+
+The pitch: a customer running ~3-4 TB/day of incoming sensor events is
+almost certainly running two stacks today — BigQuery for offline analytics
+plus some streaming pipeline (Pub/Sub + Dataflow + a KV store) for the
+monitoring center operators. This demo collapses both onto a single
+CedarDB instance, with a multi-table normalized schema that lets the
+dashboard run real joins instead of operating over pre-denormalized flat
+tables.
+
+## Talking points for the demo
+
+- This is one database. The operator queue, the live event stream, the
+ drill-down, the footer ingest counters, and the storage growth gauge
+ all read against the same `events` table while the simulator is
+ writing to it at hundreds of thousands of rows/sec — the `-rate` knob
+ goes from 2,500 (gentle demo) to 1,500,000+ (driving toward the
+ 3 TB/day target on real hardware).
+
+- Today you'd run BigQuery for the aggregates and Pub/Sub → Dataflow →
+ Bigtable for the operator console. Two pipelines, two SLAs, one ETL
+ step in between with replication lag measured in seconds-to-minutes.
+
+- Notice the joins — Plan tier determines SLA, dispatch center comes
+ from Region, device code comes from the catalog. BigQuery prefers
+ denormalized flat tables; CedarDB does these joins on hot data without
+ flinching.
+
+## What's inside
+
+```
+schema.sql -- canonical reference (see internal/db/)
+docker-compose.yml -- CedarDB + simulator + dashboard
+Dockerfile -- builds both Go binaries (CMD, not ENTRYPOINT!)
+cmd/simulator/main.go -- drives the event stream
+cmd/server/main.go -- serves the operator dashboard
+internal/db/ -- connection pool + embedded schema bootstrap
+internal/sim/ -- catalog data, fleet synthesis, alert rules
+internal/web/ -- HTTP server, SSE + HTMX, embedded UI assets
+```
+
+## Schema (8 tables, real joins)
+
+```
+plans (dimension) monthly_price, sla_seconds
+regions (dimension) dispatch center + timezone
+device_types (dimension) MOTION, DOOR, SMOKE, CO, WATER, ...
+households (dimension) plan_id + region_id; armed/disarmed
+devices (dimension) one row per sensor; FK to household + type
+events (HOT) 100 K+ rows/sec sustained; the firehose
+alerts (HOT, small) derived from triggered events via rules
+storage_samples (telemetry) (sampled_at, uncompressed_bytes) per
+ HG_STORAGE_SAMPLER_INTERVAL; drives the
+ dashboard growth gauge
+```
+
+`events.event_id` is a plain `BIGINT` (not `BIGSERIAL`) — the simulator
+allocates IDs from an in-process `atomic.Int64` so it can keep the hot
+write path on `pgx.CopyFrom`. CedarDB rejects the binary `COPY` frame
+when the destination column carries a sequence default ("unable to
+cast from void to bigint"), and we never wanted the round trip a
+server-side sequence would imply.
+
+Indexes are sized for the join-heavy reads: `alerts (status, raised_at
+DESC)`, `events (household_id, ts DESC)`, `events (kind, ts DESC)`.
+See `internal/db/schema.sql` for the full list.
+
+## Run it (Docker)
+
+```
+docker compose up --build
+# open http://localhost:8080
+```
+
+Three containers come up: `hg-cedardb` (the database), `hg-simulator` (the
+data generator), and `hg-server` (the dashboard). The simulator embeds
+`schema.sql` and applies it automatically on first run.
+
+The simulator logs every DDL statement it runs and prints its target
+event rate; you should see something like:
+
+```
+hg-simulator | … connected to CedarDB
+hg-simulator | … schema-presence probe: households table missing
+hg-simulator | … applying schema: 15 statements
+hg-simulator | … [ 1/15] CREATE TABLE IF NOT EXISTS plans … — ok
+…
+hg-simulator | … synthesized fleet: 30000 households · 297218 devices
+hg-simulator | … event_id counter seeded at 0
+hg-simulator | … storage sampler: interval=5s
+hg-simulator | … ingestor: batchSize=10000 flushInterval=50ms
+hg-simulator | … simulator running: writers=8 tickHz=10 target=2500 ev/s hb/tick/writer=31 fleet=297218 devices · ingestors=1 batch=10000 flush=50ms
+hg-simulator | … heartbeat: queue=0/64 eventID=24320 delta=2432 rows/s lastCopy=12ms ago copyFails=0
+hg-server | … dashboard listening on :8080
+```
+
+If you need a clean slate (drop all tables and re-create), the simulator
+takes `-reset-schema`:
+
+```
+docker compose run --rm simulator /app/simulator -reset-schema
+docker compose up
+```
+
+## Dashboard layout
+
+```
+┌────────────────────────────────────────────────────────────────────────────┐
+│ ACTIVE ALERTS · SLA-aware refresh HG_ALERTS_REFRESH (1s) │
+│ ┌────────────────────────────────────────────────────────────────────────┐ │
+│ │ SEV · HH · PLAN · REGION / DC · DETAIL · AGE · SLA REMAINING │ │
+│ │ 5 #1024131 Premium Atlanta DC SMOKE detected 12 s 18 s │ │
+│ │ 4 #1009823 Plus Boston DC GLASS_BREAK 03 s 57 s │ │
+│ │ … │ │
+│ └────────────────────────────────────────────────────────────────────────┘ │
+├──────────────────────┬──────────────────────┬──────────────────────────────┤
+│ LIVE EVENT STREAM │ CUSTOMER DRILL-DOWN │ STORAGE GROWTH │
+│ SSE HG_SSE_INTERVAL │ HG_DRILLDOWN_REFRESH │ HG_STORAGE_REFRESH (1s) │
+│ (200 ms) │ (2s · auto-rotates) │ │
+│ │ │ ╭───────────╮ │
+│ 15:42:08 SMOKE kit. │ #1019823 Premium │ │ 25.3 │ MB/s │
+│ 15:42:07 MOTION hall │ ● ARMED │ ╰───────────╯ 1m avg │
+│ 15:42:06 DOOR front│ ──────────────────── │ target 34.7 MB/s · 3 TB/day │
+│ … │ 15:42:08 SMOKE kit. │ total uncompressed 62.0 GB │
+│ │ 15:42:01 MOTION hall │ 1m 25.3 · 5m 23.1 · 15m … │
+└──────────────────────┴──────────────────────┴──────────────────────────────┘
+INSERTs: 312,049 ev/sec · 1,251,514,277 total · 1,270,594 active / 2,022,994 alerts
+ one table, five concurrent reads · CedarDB · SQL queries↗
+```
+
+Each panel is driven by a different query against the same `events`,
+`alerts`, and `storage_samples` tables the simulator is still writing
+into. Cadences are configurable per-panel — see *Tuning at runtime*
+below.
+
+## The queries that matter
+
+Hit `http://localhost:8080/static/queries.html` once the dashboard is up
+for a syntax-highlighted reference of every SQL statement the app runs,
+where it lives in the code, and how often it fires. The three to draw
+attention to during a demo:
+
+**Active-alerts queue (joins 4 tables, refreshes every `HG_ALERTS_REFRESH`):**
+
+```sql
+SELECT a.alert_id, a.severity, a.detail,
+ EXTRACT(EPOCH FROM (now() - a.raised_at))::int AS age_s,
+ p.sla_seconds,
+ p.sla_seconds - EXTRACT(EPOCH FROM (now() - a.raised_at))::int AS sla_remaining,
+ h.household_id, h.address_hash,
+ p.name AS plan_name,
+ r.name AS region_name, r.dispatch_center
+FROM alerts a
+JOIN households h ON h.household_id = a.household_id
+JOIN plans p ON p.plan_id = h.plan_id
+JOIN regions r ON r.region_id = h.region_id
+WHERE a.status = 'active'
+ORDER BY a.severity DESC, a.raised_at ASC
+LIMIT 25
+```
+
+**Live event stream (joins 5 tables, server-pushed every `HG_SSE_INTERVAL`):**
+
+```sql
+SELECT e.event_id, e.ts, e.household_id, h.address_hash,
+ dt.code, d.location, e.kind, e.severity,
+ COALESCE(e.battery_pct, -1), r.name
+FROM events e
+JOIN devices d ON d.device_id = e.device_id
+JOIN device_types dt ON dt.device_type_id = d.device_type_id
+JOIN households h ON h.household_id = e.household_id
+JOIN regions r ON r.region_id = h.region_id
+WHERE e.kind > 0
+ORDER BY e.ts DESC
+LIMIT 25
+```
+
+**Live ingest rate (the meta-query, refreshes every `HG_STATS_REFRESH`):**
+
+The footer's events/sec and total-events counters come from
+`storage_samples` — *not* from a `COUNT(*)` over a billion-row events
+table. The simulator writes `(now(), 48 × eventID)` into
+`storage_samples` every `HG_STORAGE_SAMPLER_INTERVAL` from its
+in-process atomic counter, so the two most recent rows give both the
+size and the rate without ever scanning the hot table.
+
+```sql
+SELECT
+ (SELECT uncompressed_bytes FROM storage_samples
+ ORDER BY sampled_at DESC LIMIT 1) AS latest_bytes,
+ (SELECT sampled_at FROM storage_samples
+ ORDER BY sampled_at DESC LIMIT 1) AS latest_ts,
+ (SELECT uncompressed_bytes FROM storage_samples
+ WHERE sampled_at < (SELECT MAX(sampled_at) FROM storage_samples)
+ ORDER BY sampled_at DESC LIMIT 1) AS prior_bytes,
+ (SELECT sampled_at FROM storage_samples
+ WHERE sampled_at < (SELECT MAX(sampled_at) FROM storage_samples)
+ ORDER BY sampled_at DESC LIMIT 1) AS prior_ts;
+-- total_events = latest_bytes / 48
+-- rows_per_sec = (latest_bytes - prior_bytes) / dt / 48
+```
+
+The published `rows_per_sec` is therefore an average over the sampler
+interval (default 5 s), not a strict 1-second window.
+
+## Knobs
+
+```
+docker compose run --rm simulator /app/simulator \
+ -households 50000 \
+ -devices-per-household 12 \
+ -rate 5000 \
+ -hz 20 \
+ -writers 8
+```
+
+`-rate` sets the sustained events/sec target (heartbeats + triggered).
+`-hz` sets the simulator's tick rate; higher Hz = smoother bursts but
+more network round-trips. `-writers` is the number of producer
+goroutines that partition the device fleet and generate rows in
+parallel; they hand off to a single ingestor goroutine that runs
+`CopyFrom` against CedarDB. The fleet sizing knobs control how many
+`households` and `devices` rows get synthesized on startup.
+
+## Tuning at runtime
+
+Most dashboard cadences and a few pipeline knobs are configurable via
+`HG_*` environment variables in `docker-compose.yml`. Defaults match
+the original hardcoded values, so leaving everything unset preserves
+the standard demo behaviour. All durations are Go duration strings
+(`200ms`, `1s`, `30s`, `1m30s`).
+
+### Dashboard refresh cadences (server container)
+
+| Env var | Default | UI region |
+|---|---|---|
+| `HG_SSE_INTERVAL` | `200ms` | **Live Event Stream** panel (bottom-left). Server-side SSE push rate — the browser holds one long-lived connection and the server pushes a frame at this interval. The "last frame" timestamp in the bottom-right of the footer tracks this one. |
+| `HG_ALERTS_REFRESH` | `1s` | **Active Alerts** queue (top, full-width). htmx polls `/api/alerts`. |
+| `HG_DRILLDOWN_REFRESH` | `2s` | **Customer Drill-Down** (bottom-middle). htmx polls `/api/drilldown`. This runs three SQL queries per tick (pick highest-severity household → fetch its plan/region header → fetch its last 20 events), so it's the biggest cost-per-tick and the most useful one to dial down if dashboard load is competing with ingest. |
+| `HG_STATS_REFRESH` | `1s` | **Footer counters** *and* the **header counters** ("Active alerts N · Events ingested N"). JS polls `/api/stats`. The only env var that updates two regions at once, so slowing it down has the most visible effect. |
+| `HG_STORAGE_REFRESH` | `1s` | **Storage Growth** gauge (bottom-right): the arc, the 1m/5m/15m rate table, the projected-daily figure. The cheapest poll of the bunch — it just reads `storage_samples`, a few hundred rows — so there's rarely a reason to raise this. |
+
+### Pipeline knobs (simulator container)
+
+| Env var | Default | Effect |
+|---|---|---|
+| `HG_STORAGE_SAMPLER_INTERVAL` | `5s` | How often the simulator writes a row into `storage_samples`. Doesn't drive any UI poll directly, but sets the minimum window over which the gauge can compute a rate — set this to `30s` and the dashboard's 1m/5m/15m rates become 30-second moving averages. Cheap to leave at `5s`. |
+| `HG_INGEST_BATCH` | `10000` | Rows the ingestor accumulates before firing one `pgx.CopyFrom`. Bigger → better per-COPY amortization; smaller → lower latency to the live event stream. Watch the heartbeat `lastCopy` and `delta` numbers when tuning. |
+| `HG_INGESTORS` | `1` | Number of ingestor goroutines running `CopyFrom` in parallel. Default `1` is safe everywhere — older CedarDB rejected overlapping COPYs with `SQLSTATE 40P01`; newer versions accept concurrent COPYs, in which case `2`, `4`, `8` may give a meaningful throughput bump. If the heartbeat's `queue=CAP/CAP` ceiling drops as you raise this, the single ingestor was the bottleneck and CedarDB can absorb more parallelism. If the queue stays pegged, CedarDB is serializing the work server-side and adding more ingestors won't help. |
+| `HG_RESOLVE_INTERVAL` | `2s` | How often the background alert resolver fires. |
+| `HG_RESOLVE_AUTOTUNE` | `true` | When on, the resolver picks each tick's LIMIT as `max(floor, ceil(deltaFired × 1.2 + backlog × 0.01))`, capped at 100 K. `deltaFired` and `backlog` are tracked via in-process atomic counters — no `COUNT(*)` on the alerts table needed. Set to `false` to pin the limits at their floors (handy when you want to demo what happens to AGE/SLA as a backlog grows). |
+| `HG_RESOLVE_LOW_LIMIT` | `2000` | **Floor** for the per-tick low-severity (1-2) resolver, which marks alerts `false_alarm`. Auto-tune can push higher; without auto-tune this is just the limit. |
+| `HG_RESOLVE_HIGH_LIMIT` | `600` | **Floor** for the per-tick high-severity (3+) resolver, which marks alerts `resolved`. High-severity alerts must also have aged at least 20 seconds to be eligible, so they sit in the operator queue long enough to look like real triage work. |
+
+### Reading the simulator heartbeat
+
+Once per second the simulator logs a one-line status report you can use
+to triage ingest behaviour. Tail it with `docker logs hg-simulator`:
+
+```
+heartbeat: queue=12/64 eventID=1483920475 delta=748520 rows/s lastCopy=12ms ago copyFails=0
+```
+
+- **`queue=N/CAP`** — depth of the producer→ingestor channel. Near `CAP` means CedarDB is the bottleneck.
+- **`eventID`** — monotonic atomic counter; each row generated bumps it. Doubles as a precise total-rows figure.
+- **`delta=N rows/s`** — row generation rate computed from the eventID counter.
+- **`lastCopy=Xms ago`** — wall time since the most recent successful `CopyFrom`. Should be under one second when ingest is healthy.
+- **`copyFails=N`** — cumulative `CopyFrom` errors since startup. Non-zero means CedarDB is rejecting; the error text appears on the line above the heartbeat.
+
+Three patterns worth recognising:
+
+- `delta=0 queue=0` → producers stopped. Look for goroutine panics in the simulator log.
+- `delta=N queue=CAP lastCopy growing` → CedarDB stopped accepting writes. Check `hg-cedardb` logs, disk space, and the compactor.
+- `delta=N queue=CAP lastCopy<1s copyFails=0` → healthy steady-state at the write-path cap. To push past it, try larger `HG_INGEST_BATCH` and/or more `HG_INGESTORS`, or move to bigger hardware.
+
+The resolver emits its own status line once every 10 seconds:
+
+```
+resolver: drain low=42/2000 high=3187/3825 · backlog low=0 high=14
+```
+
+`drain low=X/Y` is "Y rows of low-severity LIMITed, X actually resolved" — when X < Y the queue is empty for that tier; when X = Y the resolver is at the cap. `backlog` is the in-process `(fired − resolved)` estimate; if it climbs steadily, auto-tune is falling behind (rare — the controller's 1% backlog decay is normally enough to keep up).
+
+## Scaling up
+
+The defaults in `docker-compose.yml` are sized for a developer laptop
+(~10 cores, 16–32 GB RAM). On real demo hardware — for example a
+192-core x86 box with 384 GB RAM — there's a lot of headroom that the
+laptop config simply can't use. A reasonable starting point on a box
+like that:
+
+```yaml
+simulator:
+ command: ["/app/simulator",
+ "-households=200000",
+ "-devices-per-household=12",
+ "-rate=2000000", # 2 M ev/s ≈ 96 MB/s ≈ 8 TB/day uncompressed
+ "-hz=20",
+ "-writers=64"]
+ environment:
+ HG_INGESTORS: "8" # parallel CopyFroms (requires recent CedarDB)
+ HG_INGEST_BATCH: "50000" # bigger batches amortise the COPY round trip
+ HG_STORAGE_SAMPLER_INTERVAL: "5s"
+ # Alert generation scales with -writers, but the resolver auto-tunes
+ # its LIMITs each tick from the in-process generation rate and
+ # backlog, so no manual sizing is needed when you bump -writers.
+ # The defaults stay fine. If you'd rather see the queue grow (to
+ # demo SLA breach), set HG_RESOLVE_AUTOTUNE=false.
+```
+
+The pool's `MaxConns=64` in `internal/db/db.go` will need to grow if
+`-writers` × `HG_INGESTORS` plus the dashboard's concurrent reads
+exceed it — roughly speaking, set it to `writers + ingestors + 16`.
+
+The heartbeat is your scoreboard while you push the dial up:
+
+- **`delta` rises and `queue` no longer pegs at CAP** as you raise
+ `HG_INGESTORS` → the single ingestor was the bottleneck and CedarDB
+ can absorb more parallel COPYs.
+- **`delta` is flat regardless of `HG_INGESTORS`** → CedarDB is
+ serializing the work internally; the next move is `HG_INGEST_BATCH`
+ or CedarDB-side tuning.
+- **`copyFails > 0`** → CedarDB is rejecting. The error message on the
+ preceding log line tells you why (most likely you've raised
+ `HG_INGESTORS` past what your CedarDB version allows and gone back
+ to the 40P01 territory).
+
+For the dashboard side at higher rates, keep the read cadences honest
+about what the queries cost:
+
+```yaml
+server:
+ environment:
+ HG_SSE_INTERVAL: "200ms" # joins 5 tables, kind > 0 filter
+ HG_ALERTS_REFRESH: "1s" # joins 4 tables on the small alerts table
+ HG_DRILLDOWN_REFRESH: "2s" # three queries per tick; the costliest
+ HG_STATS_REFRESH: "1s" # reads storage_samples only — cheap
+ HG_STORAGE_REFRESH: "1s" # reads storage_samples only — cheap
+```
+
+On a 192-core box these are easily affordable; on a laptop you'll
+want to crank them up to give CedarDB more headroom for the write
+path.
+
+### Which indexes to keep
+
+There's a real tension between write throughput and dashboard read
+latency, but it isn't symmetric across tables:
+
+| Table | Indexes? | Why |
+|---|---|---|
+| `alerts` | **Keep them.** | The table is in the millions, never billions; index maintenance is trivial. *All four read paths* on the dashboard (queue, drilldown via household, resolver inner SELECT) need them. Without an `(status, raised_at DESC)` index the active-alerts query becomes a multi-million-row scan that the resolver fights with every 2 s, and you'll see the panel intermittently render "no active alerts" because the iteration timed out mid-scan. |
+| `events` | **Optional.** Drop if you want to maximise write rate. | One index entry per inserted row at ~30 K/s is real write cost; the dashboard's events queries are all small-LIMIT scans of the recent tail and CedarDB's column store handles them respectably even without the index. The trade-off is that the SSE event-stream panel and the drill-down's per-household scan will be slower at very large table sizes — usually still tolerable. |
+
+If you dropped the alerts indexes during a write-throughput experiment,
+put them back before treating the dashboard as canonical:
+
+```sql
+CREATE INDEX alerts_status_raised_idx ON alerts (status, raised_at DESC);
+CREATE INDEX alerts_household_raised_idx ON alerts (household_id, raised_at DESC);
+```
+
+### When a panel intermittently shows empty
+
+The dashboard handlers now log `rows.Err()` after each iteration, so a
+mid-scan context cancellation no longer looks identical to an empty
+result. If you see "no active alerts" in the panel, check the server
+container's logs:
+
+```
+docker logs hg-server 2>&1 | grep "rows.Err"
+```
+
+A non-empty stream of `context canceled` or `deadline exceeded` lines
+means the underlying SQL is taking long enough that requests are
+aborting before it returns. The fix is almost always either to add the
+missing index for that query or to lower the polling frequency.
+
diff --git a/homeguard-iot/build.sh b/homeguard-iot/build.sh
new file mode 100755
index 0000000..7e63092
--- /dev/null
+++ b/homeguard-iot/build.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+CGO_ENABLED=0 go build -o ./out/server ./cmd/server
+CGO_ENABLED=0 go build -o ./out/simulator ./cmd/simulator
+
diff --git a/homeguard-iot/clean.sh b/homeguard-iot/clean.sh
new file mode 100755
index 0000000..dd0277e
--- /dev/null
+++ b/homeguard-iot/clean.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+rm -f ./out/server
+rm -f ./out/simulator
+
diff --git a/homeguard-iot/cmd/server/main.go b/homeguard-iot/cmd/server/main.go
new file mode 100644
index 0000000..de2a764
--- /dev/null
+++ b/homeguard-iot/cmd/server/main.go
@@ -0,0 +1,52 @@
+// Command server runs the HomeGuard IoT operator dashboard.
+package main
+
+import (
+ "context"
+ "flag"
+ "log"
+ "net/http"
+ "os/signal"
+ "syscall"
+ "time"
+
+ "github.com/cedardb-demo/homeguard-iot/internal/db"
+ "github.com/cedardb-demo/homeguard-iot/internal/web"
+)
+
+func main() {
+ addr := flag.String("addr", ":8080", "http listen address")
+ flag.Parse()
+
+ ctx, cancel := signal.NotifyContext(context.Background(),
+ syscall.SIGINT, syscall.SIGTERM)
+ defer cancel()
+
+ pool, err := db.Connect(ctx)
+ if err != nil {
+ log.Fatalf("db: %v", err)
+ }
+ defer pool.Close()
+
+ srv, err := web.NewServer(pool)
+ if err != nil {
+ log.Fatalf("server: %v", err)
+ }
+
+ httpSrv := &http.Server{
+ Addr: *addr,
+ Handler: srv.Routes(),
+ ReadHeaderTimeout: 5 * time.Second,
+ }
+ go func() {
+ <-ctx.Done()
+ shutdown, c := context.WithTimeout(context.Background(), 5*time.Second)
+ defer c()
+ _ = httpSrv.Shutdown(shutdown)
+ }()
+
+ log.Printf("dashboard listening on %s", *addr)
+ if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+ log.Fatalf("listen: %v", err)
+ }
+}
diff --git a/homeguard-iot/cmd/simulator/main.go b/homeguard-iot/cmd/simulator/main.go
new file mode 100644
index 0000000..ff3571b
--- /dev/null
+++ b/homeguard-iot/cmd/simulator/main.go
@@ -0,0 +1,70 @@
+// Command simulator generates the IoT event stream and writes it to
+// CedarDB. Run it once per demo session; run cmd/server in parallel for
+// the dashboard.
+package main
+
+import (
+ "context"
+ "flag"
+ "log"
+ "os"
+ "os/signal"
+ "syscall"
+
+ "github.com/cedardb-demo/homeguard-iot/internal/db"
+ "github.com/cedardb-demo/homeguard-iot/internal/sim"
+)
+
+func main() {
+ householdCount := flag.Int("households", 30000, "synthesized household fleet size")
+ devicesPerHH := flag.Int("devices-per-household", 10, "avg devices per household")
+ tickHz := flag.Int("hz", 10, "simulator tick rate (events flush each tick)")
+ rate := flag.Int("rate", 2500, "sustained events/sec target (heartbeats + triggered)")
+ writers := flag.Int("writers", 8,
+ "number of parallel writer goroutines; each owns a slice of the device fleet "+
+ "and its own pgx conn. Bump alongside -rate to scale ingest toward 3 TB/day.")
+ resetSchema := flag.Bool("reset-schema", false,
+ "drop & recreate all tables on startup — destroys all data")
+ flag.Parse()
+
+ ctx, cancel := signal.NotifyContext(context.Background(),
+ os.Interrupt, syscall.SIGTERM)
+ defer cancel()
+
+ pool, err := db.Connect(ctx)
+ if err != nil {
+ log.Fatalf("db: %v", err)
+ }
+ defer pool.Close()
+ log.Printf("connected to CedarDB")
+
+ if *resetSchema {
+ log.Printf("-reset-schema set: wiping all data and re-creating tables")
+ if err := db.ResetSchema(ctx, pool); err != nil {
+ log.Fatalf("reset schema: %v", err)
+ }
+ } else {
+ present, err := db.SchemaPresent(ctx, pool)
+ switch {
+ case err != nil:
+ log.Printf("schema-presence probe failed (continuing anyway): %v", err)
+ case present:
+ log.Printf("schema-presence probe: households table already present")
+ default:
+ log.Printf("schema-presence probe: households table missing")
+ if err := db.ApplySchema(ctx, pool); err != nil {
+ log.Fatalf("apply schema: %v", err)
+ }
+ }
+ }
+
+ s, err := sim.New(ctx, pool, *householdCount, *devicesPerHH, *tickHz, *rate, *writers)
+ if err != nil {
+ log.Fatalf("simulator setup: %v", err)
+ }
+
+ if err := s.Run(ctx); err != nil && ctx.Err() == nil {
+ log.Fatalf("simulator run: %v", err)
+ }
+ log.Printf("simulator shut down cleanly")
+}
diff --git a/homeguard-iot/db_error.txt b/homeguard-iot/db_error.txt
new file mode 100644
index 0000000..ee2a2a2
--- /dev/null
+++ b/homeguard-iot/db_error.txt
@@ -0,0 +1,25 @@
+hg-cedardb | 2026-05-19 19:58:46.272801386 UTC DEBUG1: connection 1084378400 terminated
+hg-simulator | 2026/05/19 19:58:48 simulator running: tickHz=10 target=2500 ev/s per-tick=250 fleet=285028 devices
+hg-simulator | 2026/05/19 19:58:49 len(rows): 254
+hg-cedardb | 2026-05-19 19:58:49.038975971 UTC ERROR: unable to cast from void to bigint
+hg-cedardb | Input data does not match the expected type or input format. Docs: https://cedardb.com/docs/references/datatypes/
+hg-cedardb | 2026-05-19 19:58:49.039580971 UTC ERROR: invalid message in simple query mode
+hg-cedardb | 2026-05-19 19:58:49.039624013 UTC ERROR: invalid message in simple query mode
+hg-simulator | 2026/05/19 19:58:49 copy events: ERROR: unable to cast from void to bigint (SQLSTATE 42804)
+hg-simulator | 2026/05/19 19:58:49 insert alert: ERROR: invalid message in simple query mode (SQLSTATE 08P01)
+hg-simulator | 2026/05/19 19:58:49 insert alert: ERROR: invalid message in simple query mode (SQLSTATE 08P01)
+hg-simulator | 2026/05/19 19:58:49 len(rows): 256
+hg-cedardb | 2026-05-19 19:58:49.149095304 UTC ERROR: unable to cast from void to bigint
+hg-cedardb | Input data does not match the expected type or input format. Docs: https://cedardb.com/docs/references/datatypes/
+hg-simulator | panic: runtime error: index out of range [0] with length 0
+hg-simulator |
+hg-simulator | goroutine 441 [running]:
+hg-simulator | github.com/jackc/pgx/v5.(*copyFrom).buildCopyBuf(0x40002030e0, {0x400009c400?, 0x0?, 0x0?}, 0x400243e280)
+hg-simulator | /go/pkg/mod/github.com/jackc/pgx/v5@v5.6.0/copy_from.go:235 +0x2f4
+hg-simulator | github.com/jackc/pgx/v5.(*copyFrom).run.func1()
+hg-simulator | /go/pkg/mod/github.com/jackc/pgx/v5@v5.6.0/copy_from.go:177 +0x1c4
+hg-simulator | created by github.com/jackc/pgx/v5.(*copyFrom).run in goroutine 1
+hg-simulator | /go/pkg/mod/github.com/jackc/pgx/v5@v5.6.0/copy_from.go:164 +0x3c0
+hg-cedardb | 2026-05-19 19:58:49.158704471 UTC LOG: recv: Connection reset by peer
+hg-cedardb | 2026-05-19 19:58:49.158726721 UTC ERROR: unable to read from client
+
diff --git a/homeguard-iot/docker-compose.yml b/homeguard-iot/docker-compose.yml
new file mode 100644
index 0000000..9763aa2
--- /dev/null
+++ b/homeguard-iot/docker-compose.yml
@@ -0,0 +1,106 @@
+# Brings up CedarDB alongside the Go simulator and operator dashboard.
+#
+# IMPORTANT: the CedarDB image name and env vars below mirror the Postgres
+# image conventions. If your CedarDB image differs, adjust `image:` and the
+# DATABASE_URL accordingly — the rest of the demo is pure Postgres wire
+# protocol and should not need to change.
+
+services:
+ cedardb:
+ image: cedardb/cedardb:latest
+ container_name: hg-cedardb
+ ports:
+ - "5432:5432"
+ environment:
+ CEDAR_PASSWORD: postgres
+ VERBOSITY: DEBUG1
+ LICENSE_KEY:
+ # Schema bootstrap is handled by the simulator (it embeds schema.sql via
+ # //go:embed and applies it on first run), so we do not mount any init
+ # scripts here.
+ volumes:
+ - cedar-data:/var/lib/cedardb
+ healthcheck:
+ test: ["CMD-SHELL", "pg_isready -U postgres -d homeguard || exit 1"]
+ interval: 3s
+ timeout: 2s
+ retries: 30
+
+ simulator:
+ build: .
+ container_name: hg-simulator
+ #command: ["/app/simulator", "-households=100000", "-devices-per-household=10", "-rate=1750000", "-hz=20", "-writers=32"]
+ #command: ["/app/simulator", "-households=100000", "-devices-per-household=10", "-rate=500000", "-hz=20", "-writers=16"]
+ command: ["/app/simulator", "-households=100000", "-devices-per-household=10", "-rate=600000", "-hz=20", "-writers=16"]
+ #command: ["/app/simulator", "-households=30000", "-devices-per-household=10", "-rate=2500", "-hz=10", "-writers=4"]
+
+ depends_on:
+ cedardb:
+ condition: service_healthy
+ environment:
+ DATABASE_URL: "postgresql://postgres:postgres@cedardb:5432/postgres?sslmode=require"
+
+ # How often the storage sampler writes a row into storage_samples.
+ # Lower values → fresher gauge, more INSERTs into storage_samples.
+ # Value is any Go duration string (e.g. "5s", "30s", "1m").
+ HG_STORAGE_SAMPLER_INTERVAL: "5s"
+
+ # How many rows the ingestor accumulates before firing one
+ # pgx.CopyFrom. Bigger → better per-COPY amortization and
+ # (sometimes) higher throughput; smaller → lower end-to-end
+ # latency on the live event stream. Watch the heartbeat
+ # `lastCopy` and `delta` numbers when tuning.
+ HG_INGEST_BATCH: "25000"
+
+ # How many ingestor goroutines run CopyFrom in parallel. Default
+ # 1 is safe everywhere. Older CedarDB rejected overlapping COPYs
+ # with SQLSTATE 40P01; newer versions accept concurrent COPYs,
+ # in which case cranking this up (try 2, 4, 8) may give a
+ # meaningful throughput bump. If the heartbeat's queue=64/64
+ # ceiling disappears as you raise this, the previous bottleneck
+ # was the single ingestor (and CedarDB can take more); if queue
+ # stays pegged, CedarDB is serializing the work internally and
+ # adding ingestors won't help.
+ HG_INGESTORS: "4"
+
+ # Alert resolver. The background resolveAlertsLoop picks up the
+ # oldest active alerts every HG_RESOLVE_INTERVAL and marks them
+ # resolved/false_alarm.
+ #
+ # By default the resolver auto-tunes its per-tick LIMITs from
+ # the in-process alert generation rate and backlog (no DB-side
+ # COUNT(*) required), so the queue stays bounded as -writers
+ # and -rate scale. The two LIMIT vars are now FLOORS — auto-tune
+ # can push higher but never below them. Set HG_RESOLVE_AUTOTUNE
+ # to false to disable auto-tune (useful if you want to demo what
+ # happens to AGE/SLA when the queue grows on purpose).
+ # HG_RESOLVE_INTERVAL: "2s"
+ # HG_RESOLVE_LOW_LIMIT: "2000" # floor for severity 1-2 → false_alarm
+ # HG_RESOLVE_HIGH_LIMIT: "600" # floor for severity 3+ → resolved
+ # HG_RESOLVE_AUTOTUNE: "true" # auto-grow LIMITs to match gen+backlog
+ restart: on-failure
+
+ server:
+ build: .
+ container_name: hg-server
+ command: ["/app/server", "-addr=:8080"]
+ depends_on:
+ cedardb:
+ condition: service_healthy
+ environment:
+ DATABASE_URL: "postgresql://postgres:postgres@cedardb:5432/postgres?sslmode=require"
+ # Dashboard polling cadences — all Go duration strings. Crank up
+ # when you want to ease the read load CedarDB has to serve next to
+ # the simulator's writes.
+ HG_SSE_INTERVAL: "1s" # live event stream push rate
+ HG_ALERTS_REFRESH: "5s" # active-alerts queue panel
+ HG_DRILLDOWN_REFRESH: "5s" # customer drill-down panel
+ HG_STATS_REFRESH: "1s" # footer ingest counters
+ HG_STORAGE_REFRESH: "1s" # storage growth gauge
+ ports:
+ - "8080:8080"
+ restart: on-failure
+
+volumes:
+ cedar-data:
+
diff --git a/homeguard-iot/docker_compose_run.sh b/homeguard-iot/docker_compose_run.sh
new file mode 100755
index 0000000..9ed1daa
--- /dev/null
+++ b/homeguard-iot/docker_compose_run.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+docker compose up --build
+
diff --git a/homeguard-iot/docker_rm_images.sh b/homeguard-iot/docker_rm_images.sh
new file mode 100755
index 0000000..84acf4a
--- /dev/null
+++ b/homeguard-iot/docker_rm_images.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+docker image rm homeguard-iot-server:latest homeguard-iot-simulator:latest
+
diff --git a/homeguard-iot/docker_rm_volume.sh b/homeguard-iot/docker_rm_volume.sh
new file mode 100755
index 0000000..f045902
--- /dev/null
+++ b/homeguard-iot/docker_rm_volume.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+docker compose down -v
+
diff --git a/homeguard-iot/go.mod b/homeguard-iot/go.mod
new file mode 100644
index 0000000..e29acf4
--- /dev/null
+++ b/homeguard-iot/go.mod
@@ -0,0 +1,14 @@
+module github.com/cedardb-demo/homeguard-iot
+
+go 1.22
+
+require github.com/jackc/pgx/v5 v5.6.0
+
+require (
+ github.com/jackc/pgpassfile v1.0.0 // indirect
+ github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
+ github.com/jackc/puddle/v2 v2.2.1 // indirect
+ golang.org/x/crypto v0.27.0 // indirect
+ golang.org/x/sync v0.8.0 // indirect
+ golang.org/x/text v0.18.0 // indirect
+)
diff --git a/homeguard-iot/go.sum b/homeguard-iot/go.sum
new file mode 100644
index 0000000..b9bb4b2
--- /dev/null
+++ b/homeguard-iot/go.sum
@@ -0,0 +1,28 @@
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
+github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
+github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
+github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
+github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
+github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
+github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
+github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
+github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
+golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
+golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
+golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
+golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/homeguard-iot/internal/db/db.go b/homeguard-iot/internal/db/db.go
new file mode 100644
index 0000000..00dbf99
--- /dev/null
+++ b/homeguard-iot/internal/db/db.go
@@ -0,0 +1,62 @@
+// Package db is a thin wrapper around the pgx connection pool to CedarDB.
+//
+// CedarDB speaks the Postgres wire protocol, so pgx works unmodified — only
+// the DSN is supplied differently. The pool is sized generously so the
+// simulator's writer goroutines (each holding a conn for the duration of
+// a CopyFrom) don't compete with the dashboard's concurrent reads or with
+// the alert resolution / armed-state shuffle background jobs.
+package db
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/jackc/pgx/v5"
+ "github.com/jackc/pgx/v5/pgxpool"
+)
+
+// Connect dials CedarDB using DATABASE_URL from the environment.
+//
+// postgres://USER:PASS@HOST:5432/DB?sslmode=disable
+func Connect(ctx context.Context) (*pgxpool.Pool, error) {
+ dsn := os.Getenv("DATABASE_URL")
+ if dsn == "" {
+ return nil, fmt.Errorf("DATABASE_URL is not set")
+ }
+
+ cfg, err := pgxpool.ParseConfig(dsn)
+ if err != nil {
+ return nil, fmt.Errorf("parse DATABASE_URL: %w", err)
+ }
+ // Sized for the high-rate demo: up to ~16 simulator writer goroutines
+ // each holding a long-lived pgx conn during CopyFrom, plus the alert
+ // resolver, the armed-state shuffler, the inline alert inserters, and
+ // the dashboard's concurrent read paths.
+ cfg.MaxConns = 64
+ cfg.MinConns = 4
+ cfg.MaxConnLifetime = 30 * time.Minute
+
+ // CedarDB doesn't currently support the Postgres extended query protocol
+ // (Parse / Bind / Execute), and signals SQLSTATE 08P01
+ // "invalid message in simple query mode" when pgx tries to use it.
+ // Forcing simple-protocol mode makes pgx inline parameters as text
+ // literals and send each query as a single `Q` message — the only
+ // protocol path CedarDB accepts. COPY is unaffected (it uses its own
+ // dedicated protocol regardless of this setting).
+ cfg.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol
+
+ pool, err := pgxpool.NewWithConfig(ctx, cfg)
+ if err != nil {
+ return nil, fmt.Errorf("dial CedarDB: %w", err)
+ }
+
+ pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+ defer cancel()
+ if err := pool.Ping(pingCtx); err != nil {
+ pool.Close()
+ return nil, fmt.Errorf("ping CedarDB: %w", err)
+ }
+ return pool, nil
+}
diff --git a/homeguard-iot/internal/db/schema.go b/homeguard-iot/internal/db/schema.go
new file mode 100644
index 0000000..d060c10
--- /dev/null
+++ b/homeguard-iot/internal/db/schema.go
@@ -0,0 +1,113 @@
+package db
+
+import (
+ "context"
+ _ "embed"
+ "fmt"
+ "log"
+ "strings"
+
+ "github.com/jackc/pgx/v5"
+ "github.com/jackc/pgx/v5/pgxpool"
+)
+
+// SchemaSQL is the canonical schema, embedded at build time. Edit
+// schema.sql and `go build` re-bakes it.
+//
+//go:embed schema.sql
+var SchemaSQL string
+
+// SchemaPresent reports whether the `households` table exists. Used for a
+// diagnostic log line on startup — we always call ApplySchema anyway since
+// the file is idempotent (CREATE TABLE IF NOT EXISTS).
+func SchemaPresent(ctx context.Context, pool *pgxpool.Pool) (bool, error) {
+ var present bool
+ if err := pool.QueryRow(ctx, `
+ SELECT (SELECT COUNT(*) FROM information_schema.tables
+ WHERE table_name = 'households') = 1
+ `).Scan(&present); err != nil {
+ return false, fmt.Errorf("schema presence check: %w", err)
+ }
+ return present, nil
+}
+
+// ApplySchema runs the embedded schema.sql one statement at a time using
+// the Postgres simple-query protocol (pgx.QueryExecModeSimpleProtocol).
+// CedarDB doesn't accept DDL through the extended Parse/Bind/Execute path
+// the same way vanilla Postgres does, so prepared-statement mode silently
+// fails to apply CREATE TABLE; simple-protocol bypasses that.
+func ApplySchema(ctx context.Context, pool *pgxpool.Pool) error {
+ statements := splitSQLStatements(SchemaSQL)
+ log.Printf("applying schema: %d statements", len(statements))
+ for i, stmt := range statements {
+ if _, err := pool.Exec(ctx, stmt, pgx.QueryExecModeSimpleProtocol); err != nil {
+ return fmt.Errorf(
+ "apply schema (statement %d of %d failed): %w\n--- failing statement ---\n%s\n",
+ i+1, len(statements), err, stmt,
+ )
+ }
+ log.Printf(" [%2d/%d] %s — ok", i+1, len(statements), firstLine(stmt))
+ }
+ log.Printf("schema applied")
+ return nil
+}
+
+// ResetSchema drops everything and re-applies — destructive. Wired behind
+// the simulator's -reset-schema flag.
+func ResetSchema(ctx context.Context, pool *pgxpool.Pool) error {
+ drops := []string{
+ "DROP TABLE IF EXISTS storage_samples CASCADE",
+ "DROP TABLE IF EXISTS alerts CASCADE",
+ "DROP TABLE IF EXISTS events CASCADE",
+ "DROP TABLE IF EXISTS devices CASCADE",
+ "DROP TABLE IF EXISTS households CASCADE",
+ "DROP TABLE IF EXISTS device_types CASCADE",
+ "DROP TABLE IF EXISTS regions CASCADE",
+ "DROP TABLE IF EXISTS plans CASCADE",
+ }
+ log.Printf("resetting schema: dropping %d tables", len(drops))
+ for i, stmt := range drops {
+ if _, err := pool.Exec(ctx, stmt, pgx.QueryExecModeSimpleProtocol); err != nil {
+ return fmt.Errorf("drop %d: %w", i, err)
+ }
+ log.Printf(" [%d/%d] %s — ok", i+1, len(drops), stmt)
+ }
+ return ApplySchema(ctx, pool)
+}
+
+// splitSQLStatements strips `--` line comments and splits the script on `;`.
+// Sufficient for our DDL (no string literals or function bodies with
+// embedded semicolons).
+func splitSQLStatements(sql string) []string {
+ var clean strings.Builder
+ clean.Grow(len(sql))
+ for _, line := range strings.Split(sql, "\n") {
+ if idx := strings.Index(line, "--"); idx >= 0 {
+ line = line[:idx]
+ }
+ clean.WriteString(line)
+ clean.WriteByte('\n')
+ }
+ out := make([]string, 0, 16)
+ for _, raw := range strings.Split(clean.String(), ";") {
+ stmt := strings.TrimSpace(raw)
+ if stmt != "" {
+ out = append(out, stmt)
+ }
+ }
+ return out
+}
+
+func firstLine(stmt string) string {
+ for _, line := range strings.Split(stmt, "\n") {
+ line = strings.TrimSpace(line)
+ if line == "" {
+ continue
+ }
+ if len(line) > 70 {
+ return line[:67] + "..."
+ }
+ return line
+ }
+ return "(empty)"
+}
diff --git a/homeguard-iot/internal/db/schema.sql b/homeguard-iot/internal/db/schema.sql
new file mode 100644
index 0000000..88cfece
--- /dev/null
+++ b/homeguard-iot/internal/db/schema.sql
@@ -0,0 +1,106 @@
+-- HomeGuard IoT demo schema for CedarDB (Postgres dialect).
+--
+-- The story: a home-security operator running ~3-4 TB/day of incoming IoT
+-- events from millions of devices. Currently they pipe everything into
+-- BigQuery for offline analytics and run a parallel real-time stack
+-- (Pub/Sub + Dataflow + a KV store) for the monitoring center.
+--
+-- The CedarDB pitch is "collapse those two stacks: same table, OLTP-style
+-- writes and OLAP-style reads, no replication lag, real joins across the
+-- normalized dimension model."
+--
+-- Tables:
+-- plans - subscription tiers; SLA in seconds per tier
+-- regions - service regions / dispatch centers
+-- device_types - sensor catalog (motion, door, smoke, etc.)
+-- households -- customers; FK to plan + region
+-- devices -- sensors; FK to household + device_type
+-- events - HOT TABLE; up to ~500K rows/sec in the high-rate demo
+-- alerts - rules-derived alerts; smaller volume; the operator queue
+--
+-- This file is idempotent (CREATE TABLE IF NOT EXISTS on everything) so the
+-- simulator can safely apply it on every cold start. For destructive reset
+-- pass -reset-schema to the simulator.
+
+CREATE TABLE IF NOT EXISTS plans (
+ plan_id INTEGER PRIMARY KEY,
+ name TEXT NOT NULL,
+ monthly_price_usd NUMERIC(8, 2) NOT NULL,
+ sla_seconds INTEGER NOT NULL -- monitoring response SLA
+);
+
+CREATE TABLE IF NOT EXISTS regions (
+ region_id INTEGER PRIMARY KEY,
+ name TEXT NOT NULL,
+ dispatch_center TEXT NOT NULL,
+ timezone TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS device_types (
+ device_type_id INTEGER PRIMARY KEY,
+ code TEXT NOT NULL, -- MOTION, DOOR, WINDOW, GLASS_BREAK, SMOKE, CO, WATER, TEMP, DOORBELL, KEYPAD
+ name TEXT NOT NULL,
+ default_severity SMALLINT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS households (
+ household_id BIGINT PRIMARY KEY,
+ plan_id INTEGER NOT NULL,
+ region_id INTEGER NOT NULL,
+ address_hash TEXT NOT NULL, -- hashed for privacy
+ enrolled_at TIMESTAMPTZ NOT NULL DEFAULT now(),
+ armed BOOLEAN NOT NULL DEFAULT false -- current armed state
+);
+
+CREATE TABLE IF NOT EXISTS devices (
+ device_id BIGINT PRIMARY KEY,
+ household_id BIGINT NOT NULL,
+ device_type_id INTEGER NOT NULL,
+ location TEXT NOT NULL, -- 'front_door', 'kitchen', 'master_bedroom', ...
+ installed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
+ last_battery_pct SMALLINT
+);
+
+-- Hot table. ~500K rows/sec in the high-rate demo. Billions of rows over
+-- time. event_id is plain BIGINT (not BIGSERIAL) — the simulator generates
+-- ids from a client-side atomic counter so it can use the binary COPY path,
+-- which CedarDB rejects for BIGSERIAL defaults ("unable to cast from void
+-- to bigint"). Multiple writer goroutines share the same id space via the
+-- atomic counter.
+CREATE TABLE IF NOT EXISTS events (
+ event_id BIGINT NOT NULL,
+ device_id BIGINT NOT NULL,
+ household_id BIGINT NOT NULL, -- denormalized to skip a join on the hot path
+ ts TIMESTAMPTZ NOT NULL DEFAULT now(),
+ kind SMALLINT NOT NULL, -- 0=heartbeat, 1=triggered, 2=battery_low, 3=offline, 4=tamper
+ severity SMALLINT NOT NULL, -- 0=normal, 1..5 escalating
+ value DOUBLE PRECISION, -- sensor reading (temp °C, motion confidence, etc.)
+ battery_pct SMALLINT,
+ rssi_dbm SMALLINT -- wireless signal strength
+);
+
+CREATE TABLE IF NOT EXISTS alerts (
+ alert_id BIGSERIAL PRIMARY KEY,
+ household_id BIGINT NOT NULL,
+ triggered_event_id BIGINT NOT NULL,
+ raised_at TIMESTAMPTZ NOT NULL DEFAULT now(),
+ severity SMALLINT NOT NULL,
+ status TEXT NOT NULL, -- 'active', 'dispatched', 'resolved', 'false_alarm'
+ detail TEXT,
+ resolved_at TIMESTAMPTZ,
+ resolution_ms INTEGER
+);
+
+-- Periodic storage-growth samples: one row per ~5s, sourced from
+-- CedarDB's cedardb_compression_info system view. The dashboard's
+-- /api/storage endpoint derives 1m/5m/15m ingest rates from this table.
+-- sampled_at uses now()'s microsecond precision so it is unique at the
+-- sampler's 5-second cadence.
+CREATE TABLE IF NOT EXISTS storage_samples (
+ sampled_at TIMESTAMPTZ PRIMARY KEY,
+ uncompressed_bytes BIGINT NOT NULL
+);
+
+CREATE INDEX alerts_status_raised_idx ON alerts (status, raised_at DESC);
+CREATE INDEX alerts_household_raised_idx ON alerts (household_id, raised_at DESC);
+
diff --git a/homeguard-iot/internal/sim/catalog.go b/homeguard-iot/internal/sim/catalog.go
new file mode 100644
index 0000000..6881879
--- /dev/null
+++ b/homeguard-iot/internal/sim/catalog.go
@@ -0,0 +1,75 @@
+package sim
+
+// Static catalog data: plans, regions, device types. These get upserted on
+// simulator startup and rarely change. Keeping them in code (vs a seed file)
+// means the demo is self-contained and reproducible — no external CSVs to
+// keep in sync.
+
+// Plan describes one subscription tier with its monitoring SLA.
+type Plan struct {
+ ID int
+ Name string
+ PriceUSD float64
+ SLASeconds int
+}
+
+var Plans = []Plan{
+ {ID: 1, Name: "Basic", PriceUSD: 19.99, SLASeconds: 300}, // 5 min
+ {ID: 2, Name: "Plus", PriceUSD: 39.99, SLASeconds: 120}, // 2 min
+ {ID: 3, Name: "Premium", PriceUSD: 59.99, SLASeconds: 60}, // 1 min
+ {ID: 4, Name: "Concierge", PriceUSD: 99.99, SLASeconds: 30}, // 30 s
+}
+
+// Region groups households for dispatch + timezone purposes.
+type Region struct {
+ ID int
+ Name string
+ DispatchCenter string
+ Timezone string
+}
+
+var Regions = []Region{
+ {1, "Northeast", "Boston DC", "America/New_York"},
+ {2, "Mid-Atlantic", "Newark DC", "America/New_York"},
+ {3, "Southeast", "Atlanta DC", "America/New_York"},
+ {4, "Midwest", "Chicago DC", "America/Chicago"},
+ {5, "South-Central", "Dallas DC", "America/Chicago"},
+ {6, "Mountain", "Denver DC", "America/Denver"},
+ {7, "Pacific", "Phoenix DC", "America/Phoenix"},
+ {8, "Pacific-NW", "Seattle DC", "America/Los_Angeles"},
+ {9, "California", "San Jose DC", "America/Los_Angeles"},
+ {10, "Canada-East", "Toronto DC", "America/Toronto"},
+}
+
+// DeviceType is the sensor catalog. `DefaultSeverity` is the baseline
+// alert severity if a device of this type fires a triggered event with the
+// household armed — alert rules can promote or demote per-event.
+type DeviceType struct {
+ ID int
+ Code string
+ Name string
+ DefaultSeverity int
+}
+
+var DeviceTypes = []DeviceType{
+ {ID: 1, Code: "MOTION", Name: "Motion sensor", DefaultSeverity: 3},
+ {ID: 2, Code: "DOOR", Name: "Door contact", DefaultSeverity: 4},
+ {ID: 3, Code: "WINDOW", Name: "Window contact", DefaultSeverity: 4},
+ {ID: 4, Code: "GLASS_BREAK", Name: "Glass-break detector", DefaultSeverity: 4},
+ {ID: 5, Code: "SMOKE", Name: "Smoke detector", DefaultSeverity: 5},
+ {ID: 6, Code: "CO", Name: "Carbon monoxide detector", DefaultSeverity: 5},
+ {ID: 7, Code: "WATER", Name: "Water leak sensor", DefaultSeverity: 3},
+ {ID: 8, Code: "TEMP", Name: "Temperature sensor", DefaultSeverity: 1},
+ {ID: 9, Code: "DOORBELL", Name: "Smart doorbell", DefaultSeverity: 1},
+ {ID: 10, Code: "KEYPAD", Name: "Entry keypad", DefaultSeverity: 1},
+}
+
+// Locations is the pool of in-home placements used when allocating devices.
+// Not every device type lives in every location, but the simulator picks
+// per-type plausibly (see allocateDevices in simulator.go).
+var Locations = []string{
+ "front_door", "back_door", "garage_door", "side_door",
+ "living_room", "kitchen", "dining_room",
+ "master_bedroom", "bedroom_2", "bedroom_3",
+ "basement", "attic", "utility_room", "hallway",
+}
diff --git a/homeguard-iot/internal/sim/rules.go b/homeguard-iot/internal/sim/rules.go
new file mode 100644
index 0000000..cfe6e85
--- /dev/null
+++ b/homeguard-iot/internal/sim/rules.go
@@ -0,0 +1,61 @@
+package sim
+
+// Alert rules: deterministically promote certain triggered events into
+// rows in the alerts table. The mapping is intentionally simple so the
+// query that joins alerts × households × plans × device_types tells a
+// readable story on the dashboard.
+//
+// kind values mirror the SMALLINT column in events:
+//
+// 0 = heartbeat (never alerts)
+// 1 = triggered (the alerter; consult device_type + armed state)
+// 2 = battery_low (info-level alert if severity >= 4 plan SLA)
+// 3 = offline (skipped for now; could become an alert if device
+// was previously online for a long time)
+// 4 = tamper (always alerts at severity 4)
+
+// AlertDecision says whether to raise an alert for a given event, and at
+// what severity. nil = don't raise.
+type AlertDecision struct {
+ Severity int
+ Detail string
+}
+
+// EvaluateAlert applies the demo's alert rules to a candidate event.
+// Inputs are the device type code (e.g. "SMOKE"), event kind, and whether
+// the household was armed at the moment the event fired.
+func EvaluateAlert(deviceCode string, kind int, armed bool) *AlertDecision {
+ switch kind {
+ case 4: // tamper — always alert, anywhere
+ return &AlertDecision{Severity: 4, Detail: "Device tamper detected"}
+ case 1: // triggered — depends on device type and armed state
+ switch deviceCode {
+ case "SMOKE":
+ return &AlertDecision{Severity: 5, Detail: "Smoke detected"}
+ case "CO":
+ return &AlertDecision{Severity: 5, Detail: "Carbon monoxide detected"}
+ case "GLASS_BREAK":
+ return &AlertDecision{Severity: 4, Detail: "Glass break detected"}
+ case "WATER":
+ return &AlertDecision{Severity: 3, Detail: "Water leak detected"}
+ case "DOOR", "WINDOW":
+ if armed {
+ return &AlertDecision{Severity: 4, Detail: deviceCode + " opened while armed"}
+ }
+ // Door/window events with the system disarmed are routine.
+ return nil
+ case "MOTION":
+ if armed {
+ return &AlertDecision{Severity: 3, Detail: "Motion detected while armed"}
+ }
+ return nil
+ case "DOORBELL":
+ // Informational only.
+ return nil
+ default:
+ return nil
+ }
+ default:
+ return nil
+ }
+}
diff --git a/homeguard-iot/internal/sim/simulator.go b/homeguard-iot/internal/sim/simulator.go
new file mode 100644
index 0000000..927c460
--- /dev/null
+++ b/homeguard-iot/internal/sim/simulator.go
@@ -0,0 +1,926 @@
+// Package sim generates the IoT telemetry stream and writes it to CedarDB.
+//
+// On startup the simulator upserts the dimension data (plans, regions,
+// device_types) and synthesizes a fleet of households with ~10 devices
+// each. At runtime the work splits into two layers:
+//
+// 1. N row-producer goroutines ("writers"). Each owns a slice of the
+// device fleet, ticks at TickHz, and produces its share of the per-
+// tick budget: heartbeats rotated round-robin over its partition,
+// a handful of triggered events, and very occasional battery_low /
+// tamper / offline events. Producers do NOT talk to the DB on the
+// hot path; they push completed row batches onto eventCh.
+//
+// 2. One ingestor goroutine. Drains eventCh, coalesces batches up to
+// ingestBatch rows (or every flushInterval, whichever comes first),
+// and writes them via a single pgx.CopyFrom. Because there is only
+// one CopyFrom in flight at a time, CedarDB never trips the
+// "cannot start bulk operation until previous bulk operation has
+// become globally visible" (SQLSTATE 40P01) error that fires when
+// two writer goroutines try to COPY in parallel.
+//
+// All producers share an atomic event_id counter so they don't collide
+// on the BIGINT primary key. (event_id is plain BIGINT rather than
+// BIGSERIAL — CedarDB rejects the COPY frame when the column has a
+// sequence default; client-side ids dodge the problem entirely.)
+//
+// Triggered events are run through the alert rules (rules.go) and matching
+// alerts are inserted into the alerts table. Background goroutines resolve
+// old active alerts and shuffle households' armed state so the dashboard
+// stays dynamic.
+package sim
+
+import (
+ "context"
+ "crypto/sha1"
+ "encoding/hex"
+ "fmt"
+ "log"
+ "math/rand"
+ "os"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/jackc/pgx/v5"
+ "github.com/jackc/pgx/v5/pgxpool"
+)
+
+// envDuration reads a Go duration string (e.g. "5s", "200ms", "1m30s")
+// from the named environment variable, falling back to the supplied
+// default if the variable is unset or unparseable. Used to externalise
+// polling intervals so they can be tuned from docker-compose without a
+// rebuild.
+func envDuration(name string, fallback time.Duration) time.Duration {
+ v := os.Getenv(name)
+ if v == "" {
+ return fallback
+ }
+ d, err := time.ParseDuration(v)
+ if err != nil {
+ log.Printf("envDuration: invalid %s=%q (%v); using default %s", name, v, err, fallback)
+ return fallback
+ }
+ return d
+}
+
+// envInt reads a positive integer from the named environment variable,
+// falling back to the default if unset, unparseable, or <= 0.
+func envInt(name string, fallback int) int {
+ v := os.Getenv(name)
+ if v == "" {
+ return fallback
+ }
+ n, err := strconv.Atoi(v)
+ if err != nil || n <= 0 {
+ log.Printf("envInt: invalid %s=%q; using default %d", name, v, fallback)
+ return fallback
+ }
+ return n
+}
+
+// envBool reads a boolean (strconv.ParseBool grammar: 1/t/T/TRUE/true/True
+// or 0/f/F/FALSE/false/False) from the named environment variable,
+// falling back to the supplied default if unset or unparseable.
+func envBool(name string, fallback bool) bool {
+ v := os.Getenv(name)
+ if v == "" {
+ return fallback
+ }
+ b, err := strconv.ParseBool(v)
+ if err != nil {
+ log.Printf("envBool: invalid %s=%q; using default %v", name, v, fallback)
+ return fallback
+ }
+ return b
+}
+
+// Tuning constants for the fan-in COPY drainer.
+const (
+ // ingestBatch is the row count at which the ingestor flushes a COPY.
+ // Larger = better amortization of CopyFrom overhead; smaller = lower
+ // latency and tighter alert-to-event ordering.
+ ingestBatch = 10000
+
+ // flushInterval caps how long pending rows can sit before a partial
+ // COPY is forced. Keeps the dashboard's live event stream fresh even
+ // when -rate is well below ingestBatch/flushInterval.
+ flushInterval = 50 * time.Millisecond
+
+ // eventChCapacity holds completed row batches awaiting ingest. Sized
+ // for ~2 batches per writer at the highest -writers we'd expect, so
+ // producers rarely block on send.
+ eventChCapacity = 64
+
+ // Auto-tune knobs for the alert resolver. The per-tick LIMIT is
+ // max(env_floor, deltaFired * autoTuneHeadroom + backlog * backlogDecay)
+ // then capped at maxAutoLimit.
+ autoTuneHeadroom = 1.2 // drain 20% faster than current generation
+ backlogDecay = 0.01 // chip 1% off the standing backlog per tick
+ maxAutoLimit = 100000 // ceiling per tick so a runaway UPDATE can't bury CedarDB
+)
+
+// Device is one physical sensor placed in a household.
+type Device struct {
+ ID int64
+ Household int64
+ TypeID int
+ Code string // device type code; cached so we don't re-join per tick
+ Location string
+ BatteryPct int
+}
+
+// Household is one customer.
+type Household struct {
+ ID int64
+ PlanID int
+ RegionID int
+ AddressHash string
+ Armed bool
+}
+
+// Simulator state. Read-only on the hot path after New() returns: writers
+// hold pointers in but don't mutate any field except via eventID's atomic
+// counter and the per-device BatteryPct (writers partition the device
+// slice so no two writers ever touch the same Device).
+//
+// One exception: Household.Armed is read by writers (when evaluating
+// alert rules on triggered events) and written by shuffleArmedLoop in a
+// separate goroutine. armedMu protects that access — writers take it as
+// a reader, the shuffler as a writer.
+//
+// eventCh is the producer→ingestor handoff. Writers push completed row
+// batches onto it; one ingestor goroutine drains it and runs CopyFrom.
+type Simulator struct {
+ Pool *pgxpool.Pool
+ Households []Household
+ Devices []Device
+ TickHz int
+ TargetRate int // events per second total target
+ Writers int // number of writer goroutines
+ eventID atomic.Int64 // shared id generator; seeded in Run()
+ armedMu sync.RWMutex // guards Household.Armed reads/writes
+ eventCh chan [][]any // producer→ingestor batch queue
+ lastCopyUnixNano atomic.Int64 // UnixNano of last successful CopyFrom (0 if none yet)
+ copyFailures atomic.Int64 // total CopyFrom errors since startup
+
+ // Resolver instrumentation. fireAlert bumps the appropriate fired
+ // counter; the resolver bumps the resolved counters with the UPDATE's
+ // CommandTag.RowsAffected. (fired - resolved) ≈ backlog in CedarDB,
+ // computed in-process so we don't need a COUNT(*) on the alerts table.
+ alertsFiredLow atomic.Int64
+ alertsFiredHigh atomic.Int64
+ alertsResolvedLow atomic.Int64
+ alertsResolvedHigh atomic.Int64
+}
+
+// New builds the simulator state: upserts dimensions, synthesizes the
+// household & device fleet, but does not begin the loop. Call Run to
+// start ticking.
+func New(ctx context.Context, pool *pgxpool.Pool, householdCount, devicesPerHousehold, tickHz, targetRate, writers int) (*Simulator, error) {
+ if writers < 1 {
+ writers = 1
+ }
+ if err := upsertDimensions(ctx, pool); err != nil {
+ return nil, fmt.Errorf("upsert dimensions: %w", err)
+ }
+
+ rng := rand.New(rand.NewSource(0xC1D4A12)) // deterministic fleet, demo-friendly
+
+ hh, devs := generateFleet(rng, householdCount, devicesPerHousehold)
+ log.Printf("synthesized fleet: %d households · %d devices", len(hh), len(devs))
+
+ if err := upsertHouseholds(ctx, pool, hh); err != nil {
+ return nil, fmt.Errorf("upsert households: %w", err)
+ }
+ if err := upsertDevices(ctx, pool, devs); err != nil {
+ return nil, fmt.Errorf("upsert devices: %w", err)
+ }
+
+ return &Simulator{
+ Pool: pool,
+ Households: hh,
+ Devices: devs,
+ TickHz: tickHz,
+ TargetRate: targetRate,
+ Writers: writers,
+ }, nil
+}
+
+// Run seeds the event_id counter, launches the background loops (alert
+// resolution + armed-state shuffle + storage sampler), spawns the single
+// ingestor goroutine that owns CopyFrom, and fans out s.Writers
+// producer goroutines over partitions of the device fleet. Producers
+// build row batches and push them onto s.eventCh; the ingestor drains.
+func (s *Simulator) Run(ctx context.Context) error {
+ var maxID int64
+ if err := s.Pool.QueryRow(ctx,
+ `SELECT COALESCE(MAX(event_id), 0) FROM events`).Scan(&maxID); err != nil {
+ return fmt.Errorf("seed event_id: %w", err)
+ }
+ s.eventID.Store(maxID)
+ log.Printf("event_id counter seeded at %d", maxID)
+
+ s.eventCh = make(chan [][]any, eventChCapacity)
+
+ go s.resolveAlertsLoop(ctx)
+ go s.shuffleArmedLoop(ctx)
+ go s.storageSamplerLoop(ctx)
+
+ // Ingestor pool — N goroutines all drain eventCh and run CopyFrom in
+ // parallel. Default N=1 is the historical safe behaviour (CedarDB
+ // used to reject overlapping COPYs with SQLSTATE 40P01); newer
+ // CedarDB versions accept concurrent COPYs, in which case N > 1 may
+ // give a meaningful throughput bump. Tune with HG_INGESTORS.
+ numIngestors := envInt("HG_INGESTORS", 1)
+ var ingestorWG sync.WaitGroup
+ for i := 0; i < numIngestors; i++ {
+ ingestorWG.Add(1)
+ go func(id int) {
+ defer ingestorWG.Done()
+ s.ingestorLoop(ctx, id)
+ }(i)
+ }
+
+ // Heartbeat: one line per second describing pipeline health. Lets us
+ // distinguish "producers stopped" (delta=0) from "ingestor stuck"
+ // (queue high, lastCopy ago growing) from "CedarDB rejecting"
+ // (failures climbing).
+ go s.heartbeatLoop(ctx)
+
+ // Per-writer per-tick budget. The total target rate divides across
+ // writers; each writer then divides its share across ticks.
+ hbPerTick := s.TargetRate / s.TickHz / s.Writers
+ if hbPerTick < 1 {
+ hbPerTick = 1
+ }
+ log.Printf("simulator running: writers=%d tickHz=%d target=%d ev/s hb/tick/writer=%d fleet=%d devices · ingestors=%d batch=%d flush=%s",
+ s.Writers, s.TickHz, s.TargetRate, hbPerTick, len(s.Devices),
+ numIngestors, ingestBatch, flushInterval)
+
+ var wg sync.WaitGroup
+ for i := 0; i < s.Writers; i++ {
+ w := &writer{
+ id: i,
+ sim: s,
+ devices: devicePartition(s.Devices, i, s.Writers),
+ rng: rand.New(rand.NewSource(time.Now().UnixNano() ^ int64(i)*0xC0DE)),
+ hbPerTick: hbPerTick,
+ }
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ w.run(ctx)
+ }()
+ }
+ wg.Wait()
+ // Producers are done; let the ingestors drain anything still pending.
+ // Closing the channel is broadcast to every receiver, so all N
+ // ingestors observe (zero, false) and exit cleanly.
+ close(s.eventCh)
+ ingestorWG.Wait()
+ return ctx.Err()
+}
+
+// devicePartition returns the i-th slice of d, split into n contiguous
+// chunks. The last partition absorbs any remainder.
+func devicePartition(d []Device, i, n int) []Device {
+ chunk := len(d) / n
+ lo := i * chunk
+ hi := lo + chunk
+ if i == n-1 {
+ hi = len(d)
+ }
+ return d[lo:hi]
+}
+
+// writer is one ingest goroutine. Owns a slice of the device fleet,
+// keeps its own deviceCursor and rng, allocates fresh event_ids from
+// sim.eventID, and writes each tick via pgx.CopyFrom on its own pool
+// connection. Writers never share any mutable state on the hot path
+// except the atomic id counter.
+type writer struct {
+ id int
+ sim *Simulator
+ devices []Device
+ rng *rand.Rand
+ deviceCursor int
+ hbPerTick int
+}
+
+func (w *writer) run(ctx context.Context) {
+ interval := time.Second / time.Duration(w.sim.TickHz)
+ tick := time.NewTicker(interval)
+ defer tick.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-tick.C:
+ w.emitTick(ctx)
+ }
+ }
+}
+
+// emitTick generates one tick's worth of events from this writer's
+// device partition and hands the batch off to the ingestor goroutine
+// via the simulator's eventCh. No DB I/O happens here.
+func (w *writer) emitTick(ctx context.Context) {
+ if len(w.devices) == 0 {
+ return
+ }
+ now := time.Now()
+ rows := make([][]any, 0, w.hbPerTick+8)
+
+ // 1) Heartbeats: rotate through this writer's partition.
+ for i := 0; i < w.hbPerTick; i++ {
+ d := &w.devices[w.deviceCursor%len(w.devices)]
+ w.deviceCursor++
+ // Battery slowly drains over the demo; clamp at 5%.
+ if w.rng.Intn(10000) < 3 && d.BatteryPct > 5 {
+ d.BatteryPct--
+ }
+ rssi := -50 - w.rng.Intn(40) // -50 to -90 dBm
+ var val float64
+ if d.Code == "TEMP" {
+ val = 18.0 + w.rng.Float64()*10
+ }
+ rows = append(rows, w.row(d, now, 0 /*heartbeat*/, 0, val, rssi))
+ }
+
+ // 2) Triggered events: a handful per tick from random devices in
+ // this writer's partition.
+ triggers := 3 + w.rng.Intn(5)
+ for i := 0; i < triggers; i++ {
+ d := &w.devices[w.rng.Intn(len(w.devices))]
+ hh := &w.sim.Households[d.Household%int64(len(w.sim.Households))]
+ rssi := -50 - w.rng.Intn(40)
+ rows = append(rows, w.row(d, now, 1 /*triggered*/, int(severityFor(d.Code)), 1.0, rssi))
+ w.sim.armedMu.RLock()
+ armed := hh.Armed
+ w.sim.armedMu.RUnlock()
+ if dec := EvaluateAlert(d.Code, 1, armed); dec != nil {
+ w.sim.fireAlert(ctx, d.Household, dec.Severity, dec.Detail)
+ }
+ }
+
+ // 3) Occasional battery_low / tamper / offline.
+ if w.rng.Intn(100) < 5 {
+ d := &w.devices[w.rng.Intn(len(w.devices))]
+ kind := []int{2, 4, 3}[w.rng.Intn(3)]
+ sev := 1
+ if kind == 4 {
+ sev = 4
+ }
+ rows = append(rows, w.row(d, now, kind, sev, 0.0, -60-w.rng.Intn(30)))
+ if kind == 4 {
+ if dec := EvaluateAlert(d.Code, 4, false); dec != nil {
+ w.sim.fireAlert(ctx, d.Household, dec.Severity, dec.Detail)
+ }
+ }
+ }
+
+ // 4) Hand the batch to the ingestor. The send blocks if the channel
+ // is full — which is the right backpressure: it means the
+ // ingestor (CedarDB) can't keep up, so we shouldn't generate more
+ // rows until it can. ctx.Done unblocks for clean shutdown.
+ if len(rows) == 0 {
+ return
+ }
+ select {
+ case w.sim.eventCh <- rows:
+ case <-ctx.Done():
+ }
+}
+
+// row builds one events row with a freshly-allocated event_id. The
+// shared atomic counter means writers never collide on the BIGINT PK.
+func (w *writer) row(d *Device, ts time.Time, kind, sev int, val float64, rssi int) []any {
+ return []any{
+ w.sim.eventID.Add(1),
+ d.ID, d.Household, ts,
+ int16(kind), int16(sev), val,
+ int16(d.BatteryPct), int16(rssi),
+ }
+}
+
+// ingestorLoop is the single CopyFrom point. Coalesces batches off of
+// eventCh up to ingestBatch rows (or every flushInterval, whichever
+// comes first) and runs one pgx.CopyFrom per flush. Because there's
+// only ever one CopyFrom in flight, CedarDB never trips the
+// "previous bulk operation must become globally visible" (SQLSTATE
+// 40P01) error that fires when multiple goroutines COPY concurrently.
+//
+// Shutdown contract: Run() closes eventCh after all producers have
+// exited, the ingestor drains everything still in flight, runs one
+// final flush, and returns.
+func (s *Simulator) ingestorLoop(ctx context.Context, id int) {
+ batchSize := envInt("HG_INGEST_BATCH", ingestBatch)
+ if id == 0 {
+ log.Printf("ingestor: batchSize=%d flushInterval=%s", batchSize, flushInterval)
+ }
+
+ pending := make([][]any, 0, batchSize*2)
+ cols := []string{
+ "event_id", "device_id", "household_id", "ts",
+ "kind", "severity", "value", "battery_pct", "rssi_dbm",
+ }
+ flush := func() {
+ if len(pending) == 0 {
+ return
+ }
+ if _, err := s.Pool.CopyFrom(ctx,
+ pgx.Identifier{"events"}, cols, pgx.CopyFromRows(pending),
+ ); err != nil {
+ s.copyFailures.Add(1)
+ log.Printf("ingestor %d: copy events: %v (rows=%d)", id, err, len(pending))
+ } else {
+ s.lastCopyUnixNano.Store(time.Now().UnixNano())
+ }
+ pending = pending[:0]
+ }
+
+ flushTicker := time.NewTicker(flushInterval)
+ defer flushTicker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ flush()
+ return
+ case batch, ok := <-s.eventCh:
+ if !ok {
+ flush()
+ return
+ }
+ pending = append(pending, batch...)
+ if len(pending) >= batchSize {
+ flush()
+ }
+ case <-flushTicker.C:
+ flush()
+ }
+ }
+}
+
+// heartbeatLoop emits a one-line status report once a second so we can
+// tell from the logs whether the pipeline is healthy or stuck, and where
+// the stall is if it isn't:
+//
+// queue=N/CAP — eventCh depth. Near CAP means ingestor (CedarDB) is the bottleneck.
+// delta=N rows/s — row generation rate observed via the atomic counter.
+// lastCopy=Xs ago — wall time since the last successful CopyFrom. Should be < 1s under load.
+// copyFails=N — cumulative CopyFrom errors; non-zero means CedarDB is rejecting.
+//
+// 0 ev/s in the dashboard footer + queue=CAP + lastCopy growing → CedarDB stopped accepting writes.
+// 0 ev/s + queue=0 → producers stopped (look for goroutine panics).
+func (s *Simulator) heartbeatLoop(ctx context.Context) {
+ var prev int64
+ tick := time.NewTicker(1 * time.Second)
+ defer tick.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-tick.C:
+ cur := s.eventID.Load()
+ delta := cur - prev
+ prev = cur
+ ago := time.Duration(-1)
+ if last := s.lastCopyUnixNano.Load(); last > 0 {
+ ago = time.Since(time.Unix(0, last)).Round(time.Millisecond)
+ }
+ log.Printf("heartbeat: queue=%d/%d eventID=%d delta=%d rows/s lastCopy=%v ago copyFails=%d",
+ len(s.eventCh), eventChCapacity, cur, delta, ago, s.copyFailures.Load())
+ }
+ }
+}
+
+// fireAlert inserts a row into alerts. Status starts 'active'; the
+// background resolveAlertsLoop will resolve it after a short delay.
+// We also bump the in-process generation counter the resolver uses to
+// auto-tune its drain rate — we don't wait for the DB INSERT to settle
+// because the resolver's controller wants the request rate, not the
+// committed rate (they differ only by transient pool / fsync latency).
+func (s *Simulator) fireAlert(ctx context.Context, householdID int64, severity int, detail string) {
+ if severity <= 2 {
+ s.alertsFiredLow.Add(1)
+ } else {
+ s.alertsFiredHigh.Add(1)
+ }
+ _, err := s.Pool.Exec(ctx, `
+ INSERT INTO alerts (household_id, triggered_event_id, raised_at, severity, status, detail)
+ VALUES ($1, 0, now(), $2, 'active', $3)
+ `, householdID, int16(severity), detail)
+ if err != nil {
+ log.Printf("insert alert: %v", err)
+ }
+}
+
+// resolveAlertsLoop runs in the background; every HG_RESOLVE_INTERVAL it
+// resolves the oldest active alerts so the operator queue doesn't grow
+// without bound. The mix of severities resolved varies (high-severity
+// stick around longer, low-severity get auto-cleared faster) to mimic
+// real triage.
+//
+// When HG_RESOLVE_AUTOTUNE is on (default), the per-tick LIMITs are
+// chosen at runtime as
+//
+// limit = max(env_floor,
+// ceil(deltaFired * autoTuneHeadroom + backlog * backlogDecay))
+//
+// where deltaFired is alerts inserted since the previous tick, and
+// backlog ≈ alertsFired − alertsResolved tracked in two atomic counters
+// per severity tier. Both inputs come from in-process state so we never
+// have to COUNT(*) the alerts table to size the next tick.
+//
+// HG_RESOLVE_LOW_LIMIT / HG_RESOLVE_HIGH_LIMIT remain the floors —
+// auto-tune can push higher but never below them. Set
+// HG_RESOLVE_AUTOTUNE=false to pin the limits at the floors (useful when
+// you want to demo what happens to AGE/SLA as a backlog grows).
+func (s *Simulator) resolveAlertsLoop(ctx context.Context) {
+ interval := envDuration("HG_RESOLVE_INTERVAL", 2*time.Second)
+ lowFloor := envInt("HG_RESOLVE_LOW_LIMIT", 2000)
+ highFloor := envInt("HG_RESOLVE_HIGH_LIMIT", 600)
+ autoTune := envBool("HG_RESOLVE_AUTOTUNE", true)
+ log.Printf("alert resolver: interval=%s lowFloor=%d highFloor=%d autoTune=%v",
+ interval, lowFloor, highFloor, autoTune)
+
+ var lastFiredLow, lastFiredHigh int64
+ var lastLogAt time.Time
+ ticker := time.NewTicker(interval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ curFiredLow := s.alertsFiredLow.Load()
+ curFiredHigh := s.alertsFiredHigh.Load()
+ deltaLow := curFiredLow - lastFiredLow
+ deltaHigh := curFiredHigh - lastFiredHigh
+ lastFiredLow, lastFiredHigh = curFiredLow, curFiredHigh
+
+ backlogLow := curFiredLow - s.alertsResolvedLow.Load()
+ backlogHigh := curFiredHigh - s.alertsResolvedHigh.Load()
+ if backlogLow < 0 {
+ backlogLow = 0
+ }
+ if backlogHigh < 0 {
+ backlogHigh = 0
+ }
+
+ lowLimit, highLimit := lowFloor, highFloor
+ if autoTune {
+ autoLow := int(float64(deltaLow)*autoTuneHeadroom + float64(backlogLow)*backlogDecay)
+ autoHigh := int(float64(deltaHigh)*autoTuneHeadroom + float64(backlogHigh)*backlogDecay)
+ if autoLow > lowLimit {
+ lowLimit = autoLow
+ }
+ if autoHigh > highLimit {
+ highLimit = autoHigh
+ }
+ }
+ if lowLimit > maxAutoLimit {
+ lowLimit = maxAutoLimit
+ }
+ if highLimit > maxAutoLimit {
+ highLimit = maxAutoLimit
+ }
+
+ // Resolve low-severity (info / sev 1-2) → false_alarm.
+ lowTag, _ := s.Pool.Exec(ctx, `
+ UPDATE alerts
+ SET status = 'false_alarm',
+ resolved_at = now(),
+ resolution_ms = (EXTRACT(EPOCH FROM (now() - raised_at)) * 1000)::int
+ WHERE alert_id IN (
+ SELECT alert_id FROM alerts
+ WHERE status = 'active' AND severity <= 2
+ ORDER BY raised_at ASC LIMIT $1
+ )
+ `, lowLimit)
+ // Dispatch then resolve a sample of higher-severity older alerts.
+ highTag, _ := s.Pool.Exec(ctx, `
+ UPDATE alerts
+ SET status = 'resolved',
+ resolved_at = now(),
+ resolution_ms = (EXTRACT(EPOCH FROM (now() - raised_at)) * 1000)::int
+ WHERE alert_id IN (
+ SELECT alert_id FROM alerts
+ WHERE status = 'active' AND severity >= 3
+ AND raised_at < now() - interval '20 seconds'
+ ORDER BY raised_at ASC LIMIT $1
+ )
+ `, highLimit)
+ s.alertsResolvedLow.Add(lowTag.RowsAffected())
+ s.alertsResolvedHigh.Add(highTag.RowsAffected())
+
+ // Once every 10 s, log what we drained and how big the queue is.
+ if time.Since(lastLogAt) >= 10*time.Second {
+ log.Printf("resolver: drain low=%d/%d high=%d/%d · backlog low=%d high=%d",
+ lowTag.RowsAffected(), lowLimit,
+ highTag.RowsAffected(), highLimit,
+ backlogLow, backlogHigh)
+ lastLogAt = time.Now()
+ }
+ }
+ }
+}
+
+// shuffleArmedLoop runs as its own goroutine (replaces the
+// armedTimer-driven path in the old emitTick). Every 10 seconds it
+// flips ~5% of households' armed state so the rules engine outputs
+// stay varied over the demo.
+func (s *Simulator) shuffleArmedLoop(ctx context.Context) {
+ rng := rand.New(rand.NewSource(time.Now().UnixNano() ^ 0xA1ED))
+ tick := time.NewTicker(10 * time.Second)
+ defer tick.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-tick.C:
+ s.shuffleArmedState(ctx, rng)
+ }
+ }
+}
+
+func (s *Simulator) shuffleArmedState(ctx context.Context, rng *rand.Rand) {
+ // Flip ~5% of households' armed state in memory; push the new value
+ // in batches. armedMu guards the in-memory mutation against
+ // concurrent reads from the writer goroutines.
+ batch := &pgx.Batch{}
+ flipped := 0
+ s.armedMu.Lock()
+ for i := range s.Households {
+ if rng.Intn(20) == 0 {
+ s.Households[i].Armed = !s.Households[i].Armed
+ batch.Queue(`UPDATE households SET armed = $1 WHERE household_id = $2`,
+ s.Households[i].Armed, s.Households[i].ID)
+ flipped++
+ }
+ }
+ s.armedMu.Unlock()
+ if flipped == 0 {
+ return
+ }
+ if err := s.Pool.SendBatch(ctx, batch).Close(); err != nil {
+ log.Printf("armed-state shuffle: %v", err)
+ }
+}
+
+// eventsRowBytes is the per-row uncompressed footprint of the events
+// table, derived directly from the schema column widths:
+//
+// event_id BIGINT(8) + device_id BIGINT(8) + household_id BIGINT(8)
+// + ts TIMESTAMPTZ(8) + kind SMALLINT(2) + severity SMALLINT(2)
+// + value DOUBLE(8) + battery_pct SMALLINT(2) + rssi_dbm SMALLINT(2)
+// = 48 bytes
+//
+// We use this rather than CedarDB's cedardb_compression_info view
+// because the view only updates once written data lands in column-store
+// blocks — at high ingest rates that lags reality by many seconds.
+// COUNT(*) on events tracks live row counts in real time.
+const eventsRowBytes = 48
+
+// storageSamplerLoop records a (now(), 48 * eventID) sample into
+// storage_samples every 5 seconds. The dashboard's /api/storage endpoint
+// derives 1m/5m/15m ingest rates from this table.
+//
+// We compute bytes from the in-process atomic counter (s.eventID, which
+// is seeded from MAX(event_id) at startup and incremented per generated
+// row) rather than running COUNT(*) on the events table. Two reasons:
+//
+// 1. Cost. At demo scale the events table can reach billions of rows
+// in well under an hour. COUNT(*) becomes a multi-second full scan
+// and may hold a snapshot that pressures the write path — a likely
+// cause of the 30-minute ingest stall we saw.
+// 2. Currency. The atomic counter is updated at the moment a row is
+// generated, ahead of the CopyFrom that actually persists it. The
+// gauge can therefore appear up to (channel queue + in-flight
+// batch) rows ahead of what's truly in the table — at most ~160 K
+// rows ≈ 8 MB at the demo's default backpressure ceiling, which is
+// well under one second of headway and a fine trade-off given the
+// scan cost we avoid.
+//
+// The number is "uncompressed-bytes-equivalent" — what the data would
+// weigh as a flat file. CedarDB's actual on-disk footprint after column
+// encoding (truncate, frame-of-reference, dictionary, etc.) is 5–10×
+// smaller.
+func (s *Simulator) storageSamplerLoop(ctx context.Context) {
+ interval := envDuration("HG_STORAGE_SAMPLER_INTERVAL", 5*time.Second)
+ log.Printf("storage sampler: interval=%s", interval)
+ tick := time.NewTicker(interval)
+ defer tick.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-tick.C:
+ bytes := int64(eventsRowBytes) * s.eventID.Load()
+ if _, err := s.Pool.Exec(ctx, `
+ INSERT INTO storage_samples (sampled_at, uncompressed_bytes)
+ VALUES (now(), $1)
+ `, bytes); err != nil {
+ log.Printf("storage sampler: insert: %v", err)
+ }
+ }
+ }
+}
+
+// severityFor maps a device type code to its baseline severity for
+// triggered events. Used when writing the event row itself; the alert
+// decision (rules.go) does the more nuanced household-armed logic.
+func severityFor(code string) int16 {
+ for _, dt := range DeviceTypes {
+ if dt.Code == code {
+ return int16(dt.DefaultSeverity)
+ }
+ }
+ return 1
+}
+
+// --- fleet synthesis ------------------------------------------------------
+
+func generateFleet(rng *rand.Rand, householdCount, devicesPerHousehold int) ([]Household, []Device) {
+ hh := make([]Household, 0, householdCount)
+ devs := make([]Device, 0, householdCount*devicesPerHousehold)
+
+ for i := 0; i < householdCount; i++ {
+ id := int64(1000000 + i) // start at a friendly-looking ID
+ plan := pickWeighted(rng, []int{40, 35, 18, 7}) // basic-heavy
+ region := rng.Intn(len(Regions)) + 1
+ armed := rng.Intn(100) < 35 // ~35% armed at any time
+ hh = append(hh, Household{
+ ID: id,
+ PlanID: plan + 1,
+ RegionID: region,
+ AddressHash: hashAddr(id),
+ Armed: armed,
+ })
+
+ // Each household gets between (devicesPerHousehold/2) and
+ // (devicesPerHousehold*3/2) devices, distributed across plausible
+ // types and locations.
+ n := devicesPerHousehold/2 + rng.Intn(devicesPerHousehold)
+ for j := 0; j < n; j++ {
+ dt := pickDeviceType(rng)
+ loc := pickLocation(rng, dt.Code)
+ devs = append(devs, Device{
+ ID: int64(len(devs)) + 100000,
+ Household: id,
+ TypeID: dt.ID,
+ Code: dt.Code,
+ Location: loc,
+ BatteryPct: 70 + rng.Intn(30), // 70..99
+ })
+ }
+ }
+ return hh, devs
+}
+
+func pickWeighted(rng *rand.Rand, weights []int) int {
+ total := 0
+ for _, w := range weights {
+ total += w
+ }
+ r := rng.Intn(total)
+ for i, w := range weights {
+ if r < w {
+ return i
+ }
+ r -= w
+ }
+ return len(weights) - 1
+}
+
+func pickDeviceType(rng *rand.Rand) DeviceType {
+ // Reasonable distribution: motion + door/window common; smoke/CO/water
+ // less common but always at least one. Doorbell + keypad moderate.
+ weights := []int{30, 22, 15, 5, 6, 4, 6, 4, 5, 3}
+ idx := pickWeighted(rng, weights)
+ return DeviceTypes[idx]
+}
+
+func pickLocation(rng *rand.Rand, code string) string {
+ // Map device types to plausible locations.
+ switch code {
+ case "DOOR":
+ return []string{"front_door", "back_door", "garage_door", "side_door"}[rng.Intn(4)]
+ case "WINDOW":
+ return []string{"living_room", "kitchen", "dining_room", "master_bedroom", "bedroom_2", "bedroom_3"}[rng.Intn(6)]
+ case "GLASS_BREAK":
+ return []string{"living_room", "dining_room", "kitchen"}[rng.Intn(3)]
+ case "SMOKE", "CO":
+ return []string{"kitchen", "hallway", "master_bedroom", "bedroom_2", "basement"}[rng.Intn(5)]
+ case "WATER":
+ return []string{"kitchen", "basement", "utility_room"}[rng.Intn(3)]
+ case "TEMP":
+ return []string{"living_room", "master_bedroom", "basement", "attic"}[rng.Intn(4)]
+ case "DOORBELL":
+ return "front_door"
+ case "KEYPAD":
+ return "front_door"
+ case "MOTION":
+ return Locations[rng.Intn(len(Locations))]
+ }
+ return Locations[rng.Intn(len(Locations))]
+}
+
+func hashAddr(id int64) string {
+ h := sha1.Sum([]byte("addr-" + strconv.FormatInt(id, 10)))
+ return hex.EncodeToString(h[:6]) // short, opaque
+}
+
+// --- dimension + fleet upserts -------------------------------------------
+
+func upsertDimensions(ctx context.Context, pool *pgxpool.Pool) error {
+ batch := &pgx.Batch{}
+ for _, p := range Plans {
+ batch.Queue(`
+ INSERT INTO plans (plan_id, name, monthly_price_usd, sla_seconds)
+ VALUES ($1, $2, $3, $4)
+ ON CONFLICT (plan_id) DO UPDATE SET
+ name = EXCLUDED.name,
+ monthly_price_usd = EXCLUDED.monthly_price_usd,
+ sla_seconds = EXCLUDED.sla_seconds
+ `, p.ID, p.Name, p.PriceUSD, p.SLASeconds)
+ }
+ for _, r := range Regions {
+ batch.Queue(`
+ INSERT INTO regions (region_id, name, dispatch_center, timezone)
+ VALUES ($1, $2, $3, $4)
+ ON CONFLICT (region_id) DO UPDATE SET
+ name = EXCLUDED.name,
+ dispatch_center = EXCLUDED.dispatch_center,
+ timezone = EXCLUDED.timezone
+ `, r.ID, r.Name, r.DispatchCenter, r.Timezone)
+ }
+ for _, dt := range DeviceTypes {
+ batch.Queue(`
+ INSERT INTO device_types (device_type_id, code, name, default_severity)
+ VALUES ($1, $2, $3, $4)
+ ON CONFLICT (device_type_id) DO UPDATE SET
+ code = EXCLUDED.code,
+ name = EXCLUDED.name,
+ default_severity = EXCLUDED.default_severity
+ `, dt.ID, dt.Code, dt.Name, int16(dt.DefaultSeverity))
+ }
+ return pool.SendBatch(ctx, batch).Close()
+}
+
+func upsertHouseholds(ctx context.Context, pool *pgxpool.Pool, hh []Household) error {
+ // CopyFrom into a temp staging approach would be faster, but a Batch of
+ // INSERTs with ON CONFLICT is simple, idempotent, and fast enough for
+ // ~30 000 rows once at startup.
+ const batchSize = 1000
+ for start := 0; start < len(hh); start += batchSize {
+ end := start + batchSize
+ if end > len(hh) {
+ end = len(hh)
+ }
+ batch := &pgx.Batch{}
+ for _, h := range hh[start:end] {
+ batch.Queue(`
+ INSERT INTO households (household_id, plan_id, region_id, address_hash, armed)
+ VALUES ($1, $2, $3, $4, $5)
+ ON CONFLICT (household_id) DO UPDATE SET
+ plan_id = EXCLUDED.plan_id,
+ region_id = EXCLUDED.region_id,
+ armed = EXCLUDED.armed
+ `, h.ID, h.PlanID, h.RegionID, h.AddressHash, h.Armed)
+ }
+ if err := pool.SendBatch(ctx, batch).Close(); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func upsertDevices(ctx context.Context, pool *pgxpool.Pool, devs []Device) error {
+ const batchSize = 1000
+ for start := 0; start < len(devs); start += batchSize {
+ end := start + batchSize
+ if end > len(devs) {
+ end = len(devs)
+ }
+ batch := &pgx.Batch{}
+ for _, d := range devs[start:end] {
+ batch.Queue(`
+ INSERT INTO devices (device_id, household_id, device_type_id, location, last_battery_pct)
+ VALUES ($1, $2, $3, $4, $5)
+ ON CONFLICT (device_id) DO UPDATE SET
+ household_id = EXCLUDED.household_id,
+ device_type_id = EXCLUDED.device_type_id,
+ location = EXCLUDED.location,
+ last_battery_pct = EXCLUDED.last_battery_pct
+ `, d.ID, d.Household, d.TypeID, d.Location, int16(d.BatteryPct))
+ }
+ if err := pool.SendBatch(ctx, batch).Close(); err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/homeguard-iot/internal/web/server.go b/homeguard-iot/internal/web/server.go
new file mode 100644
index 0000000..960b5d8
--- /dev/null
+++ b/homeguard-iot/internal/web/server.go
@@ -0,0 +1,613 @@
+// Package web is the operator-facing dashboard.
+//
+// Layout (top → bottom): active-alerts queue, live event stream, customer
+// drill-down, ingest-rate footer. Each panel is driven by a different
+// query against the same CedarDB instance the simulator is INSERTing into,
+// at refresh rates from 200 ms to 2 s — that's the whole story this demo
+// is selling.
+package web
+
+import (
+ "context"
+ "database/sql"
+ "embed"
+ "encoding/json"
+ "fmt"
+ "html/template"
+ "io/fs"
+ "log"
+ "net/http"
+ "os"
+ "strconv"
+ "time"
+
+ "github.com/jackc/pgx/v5/pgxpool"
+)
+
+// envDuration reads a Go duration string from the named environment
+// variable, falling back to the supplied default. Mirrors the helper in
+// the sim package so neither has to import the other.
+func envDuration(name string, fallback time.Duration) time.Duration {
+ v := os.Getenv(name)
+ if v == "" {
+ return fallback
+ }
+ d, err := time.ParseDuration(v)
+ if err != nil {
+ log.Printf("envDuration: invalid %s=%q (%v); using default %s", name, v, err, fallback)
+ return fallback
+ }
+ return d
+}
+
+//go:embed templates/*.html
+var templatesFS embed.FS
+
+//go:embed static/*
+var staticFS embed.FS
+
+type Server struct {
+ Pool *pgxpool.Pool
+ tpl *template.Template
+ static http.Handler
+}
+
+func NewServer(pool *pgxpool.Pool) (*Server, error) {
+ tpl, err := template.ParseFS(templatesFS, "templates/*.html")
+ if err != nil {
+ return nil, fmt.Errorf("parse templates: %w", err)
+ }
+ sub, err := fs.Sub(staticFS, "static")
+ if err != nil {
+ return nil, fmt.Errorf("static sub: %w", err)
+ }
+ return &Server{
+ Pool: pool,
+ tpl: tpl,
+ static: http.FileServer(http.FS(sub)),
+ }, nil
+}
+
+func (s *Server) Routes() http.Handler {
+ mux := http.NewServeMux()
+ mux.HandleFunc("/", s.handleIndex)
+ mux.HandleFunc("/sse/events", s.handleSSE)
+ mux.HandleFunc("/api/alerts", s.handleAlerts)
+ mux.HandleFunc("/api/drilldown", s.handleDrilldown)
+ mux.HandleFunc("/api/stats", s.handleStats)
+ mux.HandleFunc("/api/storage", s.handleStorage)
+ mux.Handle("/static/", http.StripPrefix("/static/", s.static))
+ return mux
+}
+
+// --------------------------------------------------------------------- index
+
+// indexData carries the dashboard's polling cadences into the index
+// template. The four refresh intervals are millisecond integers so the
+// JS setInterval() calls and htmx "every Nms" triggers can use them
+// verbatim. Defaults match the original hardcoded values.
+type indexData struct {
+ AlertsRefreshMs int64
+ DrilldownRefreshMs int64
+ StatsRefreshMs int64
+ StorageRefreshMs int64
+}
+
+func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path != "/" {
+ http.NotFound(w, r)
+ return
+ }
+ data := indexData{
+ AlertsRefreshMs: envDuration("HG_ALERTS_REFRESH", 1*time.Second).Milliseconds(),
+ DrilldownRefreshMs: envDuration("HG_DRILLDOWN_REFRESH", 2*time.Second).Milliseconds(),
+ StatsRefreshMs: envDuration("HG_STATS_REFRESH", 1*time.Second).Milliseconds(),
+ StorageRefreshMs: envDuration("HG_STORAGE_REFRESH", 1*time.Second).Milliseconds(),
+ }
+ if err := s.tpl.ExecuteTemplate(w, "index.html", data); err != nil {
+ log.Printf("template: %v", err)
+ }
+}
+
+// --------------------------------------------------------------------- alerts
+
+// handleAlerts returns the operator's active-alerts queue as an HTML
+// fragment. Joins alerts × households × plans so we can compute the
+// per-alert SLA countdown (plan.sla_seconds - age) directly in the query.
+func (s *Server) handleAlerts(w http.ResponseWriter, r *http.Request) {
+ ctx := r.Context()
+ rows, err := s.Pool.Query(ctx, `
+ SELECT a.alert_id, a.severity, a.detail,
+ EXTRACT(EPOCH FROM (now() - a.raised_at))::int AS age_s,
+ p.sla_seconds,
+ p.sla_seconds - EXTRACT(EPOCH FROM (now() - a.raised_at))::int AS sla_remaining,
+ h.household_id, h.address_hash,
+ p.name AS plan_name,
+ r.name AS region_name,
+ r.dispatch_center
+ FROM alerts a
+ JOIN households h ON h.household_id = a.household_id
+ JOIN plans p ON p.plan_id = h.plan_id
+ JOIN regions r ON r.region_id = h.region_id
+ WHERE a.status = 'active'
+ ORDER BY a.severity DESC, a.raised_at ASC
+ LIMIT 25
+ `)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ defer rows.Close()
+
+ type row struct {
+ AlertID int64
+ Severity int
+ Detail string
+ AgeSec int
+ SLASec int
+ SLARemaining int
+ HouseholdID int64
+ AddressHash string
+ PlanName string
+ RegionName string
+ Dispatch string
+ Breached bool
+ AgeFmt string
+ SLAFmt string
+ }
+ var out []row
+ for rows.Next() {
+ var rr row
+ if err := rows.Scan(
+ &rr.AlertID, &rr.Severity, &rr.Detail, &rr.AgeSec,
+ &rr.SLASec, &rr.SLARemaining, &rr.HouseholdID, &rr.AddressHash,
+ &rr.PlanName, &rr.RegionName, &rr.Dispatch,
+ ); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ rr.Breached = rr.SLARemaining < 0
+ rr.AgeFmt = fmtDuration(rr.AgeSec)
+ rr.SLAFmt = fmtDuration(absInt(rr.SLARemaining))
+ out = append(out, rr)
+ }
+ // If the iteration ended because of an error (most commonly a
+ // context-cancelled mid-scan on an unindexed query), surface it.
+ // Without this, a half-completed scan looks identical to "no rows"
+ // and the dashboard silently renders "no active alerts."
+ if err := rows.Err(); err != nil {
+ log.Printf("alerts query: rows.Err: %v", err)
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ w.Header().Set("Content-Type", "text/html; charset=utf-8")
+ if err := s.tpl.ExecuteTemplate(w, "alerts.html", out); err != nil {
+ log.Printf("alerts tpl: %v", err)
+ }
+}
+
+// --------------------------------------------------------------------- SSE
+
+// handleSSE streams a JSON snapshot of the most-recent non-heartbeat
+// events every 200 ms. Heartbeats are filtered out server-side so the
+// operator panel stays readable; the analytical queries elsewhere
+// continue to count every row.
+func (s *Server) handleSSE(w http.ResponseWriter, r *http.Request) {
+ flusher, ok := w.(http.Flusher)
+ if !ok {
+ http.Error(w, "streaming not supported", http.StatusInternalServerError)
+ return
+ }
+ w.Header().Set("Content-Type", "text/event-stream")
+ w.Header().Set("Cache-Control", "no-cache")
+ w.Header().Set("Connection", "keep-alive")
+ w.Header().Set("X-Accel-Buffering", "no")
+
+ ctx := r.Context()
+ tick := time.NewTicker(envDuration("HG_SSE_INTERVAL", 200*time.Millisecond))
+ defer tick.Stop()
+
+ if err := s.pushEventStream(ctx, w, flusher); err != nil {
+ return
+ }
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-tick.C:
+ if err := s.pushEventStream(ctx, w, flusher); err != nil {
+ return
+ }
+ }
+ }
+}
+
+type eventRow struct {
+ EventID int64 `json:"event_id"`
+ Ts string `json:"ts"`
+ HouseholdID int64 `json:"household_id"`
+ AddressHash string `json:"address_hash"`
+ DeviceCode string `json:"device_code"`
+ Location string `json:"location"`
+ Kind int `json:"kind"`
+ Severity int `json:"severity"`
+ BatteryPct int `json:"battery_pct"`
+ Region string `json:"region"`
+}
+
+func (s *Server) pushEventStream(ctx context.Context, w http.ResponseWriter, f http.Flusher) error {
+ // Newest 25 non-heartbeat events, joined with device + device_type +
+ // household + region for human-readable rendering. The (kind > 0)
+ // filter uses the events_kind_ts_idx index efficiently.
+ rows, err := s.Pool.Query(ctx, `
+ SELECT e.event_id, e.ts, e.household_id, h.address_hash,
+ dt.code, d.location, e.kind, e.severity,
+ COALESCE(e.battery_pct, -1), r.name
+ FROM events e
+ JOIN devices d ON d.device_id = e.device_id
+ JOIN device_types dt ON dt.device_type_id = d.device_type_id
+ JOIN households h ON h.household_id = e.household_id
+ JOIN regions r ON r.region_id = h.region_id
+ WHERE e.kind > 0
+ ORDER BY e.ts DESC
+ LIMIT 25
+ `)
+ if err != nil {
+ log.Printf("event stream query: %v", err)
+ return nil
+ }
+ defer rows.Close()
+
+ out := make([]eventRow, 0, 25)
+ for rows.Next() {
+ var er eventRow
+ var ts time.Time
+ var bp int
+ if err := rows.Scan(&er.EventID, &ts, &er.HouseholdID, &er.AddressHash,
+ &er.DeviceCode, &er.Location, &er.Kind, &er.Severity,
+ &bp, &er.Region); err != nil {
+ log.Printf("event stream scan: %v", err)
+ return nil
+ }
+ er.Ts = ts.Format("15:04:05")
+ er.BatteryPct = bp
+ out = append(out, er)
+ }
+ if err := rows.Err(); err != nil {
+ log.Printf("event stream: rows.Err: %v", err)
+ // Don't propagate as a stream error — just skip this frame and
+ // let the next tick try again.
+ return nil
+ }
+ buf, err := json.Marshal(map[string]any{
+ "events": out,
+ "stamp_ms": time.Now().UnixMilli(),
+ })
+ if err != nil {
+ return err
+ }
+ if _, err := fmt.Fprintf(w, "data: %s\n\n", buf); err != nil {
+ return err
+ }
+ f.Flush()
+ return nil
+}
+
+// --------------------------------------------------------------------- drilldown
+
+// handleDrilldown returns the "what's going on at this household right
+// now" panel. We pick the household behind the highest-severity currently-
+// active alert (or the most-recently-active household if none) and show
+// its last 20 events with device + type joined in.
+//
+// This is the operator-flow query: alert fires → click household → see
+// context to decide dispatch.
+func (s *Server) handleDrilldown(w http.ResponseWriter, r *http.Request) {
+ ctx := r.Context()
+
+ var householdID int64
+ err := s.Pool.QueryRow(ctx, `
+ SELECT a.household_id
+ FROM alerts a
+ WHERE a.status = 'active'
+ ORDER BY a.severity DESC, a.raised_at ASC
+ LIMIT 1
+ `).Scan(&householdID)
+ if err != nil {
+ // Fallback: pick the household with the most events in the last
+ // 5 minutes — keeps the panel populated even when there are no
+ // active alerts.
+ _ = s.Pool.QueryRow(ctx, `
+ SELECT household_id
+ FROM events
+ WHERE ts > now() - interval '5 minutes'
+ GROUP BY household_id
+ ORDER BY COUNT(*) DESC
+ LIMIT 1
+ `).Scan(&householdID)
+ }
+ if householdID == 0 {
+ w.Header().Set("Content-Type", "text/html; charset=utf-8")
+ _, _ = w.Write([]byte(`
waiting for events…
`))
+ return
+ }
+
+ // Household metadata for the header.
+ var (
+ addr, planName, regionName, dispatch string
+ armed bool
+ slaSec int
+ )
+ if err := s.Pool.QueryRow(ctx, `
+ SELECT h.address_hash, h.armed, p.name, p.sla_seconds, r.name, r.dispatch_center
+ FROM households h
+ JOIN plans p ON p.plan_id = h.plan_id
+ JOIN regions r ON r.region_id = h.region_id
+ WHERE h.household_id = $1
+ `, householdID).Scan(&addr, &armed, &planName, &slaSec, ®ionName, &dispatch); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ // Last 20 events for this household, any kind.
+ rows, err := s.Pool.Query(ctx, `
+ SELECT e.ts, dt.code, d.location, e.kind, e.severity,
+ COALESCE(e.battery_pct, -1)
+ FROM events e
+ JOIN devices d ON d.device_id = e.device_id
+ JOIN device_types dt ON dt.device_type_id = d.device_type_id
+ WHERE e.household_id = $1
+ ORDER BY e.ts DESC
+ LIMIT 20
+ `, householdID)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ defer rows.Close()
+
+ type evRow struct {
+ Ts string
+ Code string
+ Location string
+ Kind int
+ Severity int
+ BatteryPct int
+ }
+ var events []evRow
+ for rows.Next() {
+ var (
+ er evRow
+ ts time.Time
+ )
+ if err := rows.Scan(&ts, &er.Code, &er.Location, &er.Kind, &er.Severity, &er.BatteryPct); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ er.Ts = ts.Format("15:04:05")
+ events = append(events, er)
+ }
+ if err := rows.Err(); err != nil {
+ log.Printf("drilldown query: rows.Err: %v", err)
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ w.Header().Set("Content-Type", "text/html; charset=utf-8")
+ if err := s.tpl.ExecuteTemplate(w, "drilldown.html", map[string]any{
+ "HouseholdID": householdID,
+ "Address": addr,
+ "Armed": armed,
+ "Plan": planName,
+ "SLASec": slaSec,
+ "Region": regionName,
+ "Dispatch": dispatch,
+ "Events": events,
+ }); err != nil {
+ log.Printf("drilldown tpl: %v", err)
+ }
+}
+
+// --------------------------------------------------------------------- stats
+
+// handleStats is the meta-query for the dashboard footer: how fast are
+// we ingesting right now, how many rows have landed total, how many
+// alerts are open. The dashboard footer reads this every
+// HG_STATS_REFRESH (default 1s).
+//
+// The event-rate side reads from storage_samples (which the simulator
+// populates every HG_STORAGE_SAMPLER_INTERVAL from its atomic eventID
+// counter), not from events itself. At billion-row scale, scanning
+// events for COUNT(*) — especially with no useful index — takes long
+// enough to make the footer effectively frozen. storage_samples is a
+// few hundred rows at most and answers instantly. The published
+// rows_per_sec is therefore the average over the last sample interval
+// (default 5s, the user may have set it longer in HG_STORAGE_SAMPLER_INTERVAL),
+// not a strict one-second window.
+func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
+ ctx := r.Context()
+
+ var (
+ rowsPerSec int64
+ totalEv int64
+ activeAlts int64
+ totalAlts int64
+ )
+ // Event metrics — derived from the two most recent storage_samples
+ // rows. total_events = latest_bytes / 48. rate = bytes-delta over the
+ // elapsed time divided by 48.
+ var (
+ latestBytes, priorBytes sql.NullInt64
+ latestTs, priorTs sql.NullTime
+ )
+ if err := s.Pool.QueryRow(ctx, `
+ SELECT
+ (SELECT uncompressed_bytes FROM storage_samples
+ ORDER BY sampled_at DESC LIMIT 1) AS latest_bytes,
+ (SELECT sampled_at FROM storage_samples
+ ORDER BY sampled_at DESC LIMIT 1) AS latest_ts,
+ (SELECT uncompressed_bytes FROM storage_samples
+ WHERE sampled_at < (SELECT MAX(sampled_at) FROM storage_samples)
+ ORDER BY sampled_at DESC LIMIT 1) AS prior_bytes,
+ (SELECT sampled_at FROM storage_samples
+ WHERE sampled_at < (SELECT MAX(sampled_at) FROM storage_samples)
+ ORDER BY sampled_at DESC LIMIT 1) AS prior_ts
+ `).Scan(&latestBytes, &latestTs, &priorBytes, &priorTs); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ if latestBytes.Valid {
+ totalEv = latestBytes.Int64 / int64(eventsRowBytes)
+ }
+ if latestBytes.Valid && priorBytes.Valid && latestTs.Valid && priorTs.Valid {
+ dt := latestTs.Time.Sub(priorTs.Time).Seconds()
+ if dt > 0 {
+ delta := latestBytes.Int64 - priorBytes.Int64
+ if delta < 0 {
+ delta = 0
+ }
+ rowsPerSec = int64(float64(delta) / dt / float64(eventsRowBytes))
+ }
+ }
+ // Alert metrics.
+ if err := s.Pool.QueryRow(ctx, `
+ SELECT
+ COUNT(*) FILTER (WHERE status = 'active') AS active,
+ COUNT(*) AS total
+ FROM alerts
+ `).Scan(&activeAlts, &totalAlts); err != nil {
+ // Non-fatal; alerts table may be empty very early.
+ activeAlts = 0
+ totalAlts = 0
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ _ = json.NewEncoder(w).Encode(map[string]any{
+ "rows_per_sec": rowsPerSec,
+ "total_events": totalEv,
+ "active_alerts": activeAlts,
+ "total_alerts": totalAlts,
+ })
+}
+
+// --------------------------------------------------------------------- storage
+
+// targetBytesPerSec: 3 TB/day uncompressed = 3 * 10^12 / 86 400 s ≈
+// 34.722 MB/s. The dashboard renders gauges against this target.
+const targetBytesPerSec = 34_722_222
+
+// eventsRowBytes: per-row uncompressed footprint of the events table,
+// derived from the schema column widths (matches the constant of the
+// same name in the sim package). Used to convert storage_samples
+// uncompressed_bytes back into row counts for the stats footer.
+const eventsRowBytes = 48
+
+// handleStorage returns the most recent uncompressed-size sample plus
+// rates computed across the last 1, 5, and 15 minutes. Rates are
+// (latest_bytes - bytes_at_window_start) / window_seconds — so they
+// reflect the real growth of stored data, not the wire-rate of inserts.
+func (s *Server) handleStorage(w http.ResponseWriter, r *http.Request) {
+ ctx := r.Context()
+
+ type windowResult struct {
+ LatestBytes sql.NullInt64
+ LatestTs sql.NullTime
+ Bytes1mAgo sql.NullInt64
+ Ts1mAgo sql.NullTime
+ Bytes5mAgo sql.NullInt64
+ Ts5mAgo sql.NullTime
+ Bytes15mAgo sql.NullInt64
+ Ts15mAgo sql.NullTime
+ }
+ var rr windowResult
+ if err := s.Pool.QueryRow(ctx, `
+ SELECT
+ (SELECT uncompressed_bytes FROM storage_samples
+ ORDER BY sampled_at DESC LIMIT 1) AS latest_bytes,
+ (SELECT sampled_at FROM storage_samples
+ ORDER BY sampled_at DESC LIMIT 1) AS latest_ts,
+ (SELECT uncompressed_bytes FROM storage_samples
+ WHERE sampled_at <= now() - interval '1 minute'
+ ORDER BY sampled_at DESC LIMIT 1) AS bytes_1m,
+ (SELECT sampled_at FROM storage_samples
+ WHERE sampled_at <= now() - interval '1 minute'
+ ORDER BY sampled_at DESC LIMIT 1) AS ts_1m,
+ (SELECT uncompressed_bytes FROM storage_samples
+ WHERE sampled_at <= now() - interval '5 minutes'
+ ORDER BY sampled_at DESC LIMIT 1) AS bytes_5m,
+ (SELECT sampled_at FROM storage_samples
+ WHERE sampled_at <= now() - interval '5 minutes'
+ ORDER BY sampled_at DESC LIMIT 1) AS ts_5m,
+ (SELECT uncompressed_bytes FROM storage_samples
+ WHERE sampled_at <= now() - interval '15 minutes'
+ ORDER BY sampled_at DESC LIMIT 1) AS bytes_15m,
+ (SELECT sampled_at FROM storage_samples
+ WHERE sampled_at <= now() - interval '15 minutes'
+ ORDER BY sampled_at DESC LIMIT 1) AS ts_15m
+ `).Scan(
+ &rr.LatestBytes, &rr.LatestTs,
+ &rr.Bytes1mAgo, &rr.Ts1mAgo,
+ &rr.Bytes5mAgo, &rr.Ts5mAgo,
+ &rr.Bytes15mAgo, &rr.Ts15mAgo,
+ ); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ rate := func(latest, prev sql.NullInt64, latestTs, prevTs sql.NullTime) (float64, bool) {
+ if !latest.Valid || !prev.Valid || !latestTs.Valid || !prevTs.Valid {
+ return 0, false
+ }
+ dt := latestTs.Time.Sub(prevTs.Time).Seconds()
+ if dt <= 0 {
+ return 0, false
+ }
+ db := float64(latest.Int64 - prev.Int64)
+ if db < 0 {
+ db = 0 // can happen across CedarDB compactions
+ }
+ return db / dt, true
+ }
+
+ r1, ok1 := rate(rr.LatestBytes, rr.Bytes1mAgo, rr.LatestTs, rr.Ts1mAgo)
+ r5, ok5 := rate(rr.LatestBytes, rr.Bytes5mAgo, rr.LatestTs, rr.Ts5mAgo)
+ r15, ok15 := rate(rr.LatestBytes, rr.Bytes15mAgo, rr.LatestTs, rr.Ts15mAgo)
+
+ out := map[string]any{
+ "target_bytes_per_sec": targetBytesPerSec,
+ }
+ if rr.LatestBytes.Valid {
+ out["latest_bytes"] = rr.LatestBytes.Int64
+ }
+ if rr.LatestTs.Valid {
+ out["latest_ts"] = rr.LatestTs.Time.Format(time.RFC3339)
+ }
+ if ok1 {
+ out["rate_1m_bps"] = r1
+ }
+ if ok5 {
+ out["rate_5m_bps"] = r5
+ }
+ if ok15 {
+ out["rate_15m_bps"] = r15
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ _ = json.NewEncoder(w).Encode(out)
+}
+
+// --------------------------------------------------------------------- utils
+
+func fmtDuration(sec int) string {
+ if sec < 60 {
+ return strconv.Itoa(sec) + "s"
+ }
+ return strconv.Itoa(sec/60) + "m" + strconv.Itoa(sec%60) + "s"
+}
+
+func absInt(x int) int {
+ if x < 0 {
+ return -x
+ }
+ return x
+}
diff --git a/homeguard-iot/internal/web/static/cedardb-logo.svg b/homeguard-iot/internal/web/static/cedardb-logo.svg
new file mode 100644
index 0000000..192623c
--- /dev/null
+++ b/homeguard-iot/internal/web/static/cedardb-logo.svg
@@ -0,0 +1,10 @@
+
diff --git a/homeguard-iot/internal/web/static/queries.html b/homeguard-iot/internal/web/static/queries.html
new file mode 100644
index 0000000..a3dceca
--- /dev/null
+++ b/homeguard-iot/internal/web/static/queries.html
@@ -0,0 +1,865 @@
+
+
+
+
+HomeGuard IoT — SQL queries reference
+
+
+
+
+
+
+
+ Every SQL statement the simulator and dashboard run against CedarDB, in
+ one place. The interesting bit isn't any individual query — it's that
+ all of these run concurrently against the same instance:
+ N writer goroutines are CopyFrom-ing into events
+ at up to ~500 K events/sec (≈ 3 TB/day uncompressed) while four
+ different read paths (active-alerts queue, live event stream, customer
+ drill-down, ingest stats) are joining across the same hot table and the
+ normalized dimensions at 200 ms – 2 s refresh cadences. The whole pitch
+ is collapsing the BigQuery + Pub/Sub-plus-KV split onto one database
+ with real joins and no replication lag.
+
+
+ Tuning the cadences. Every "refresh N s" pill below is
+ an env-var-driven default — set HG_ALERTS_REFRESH,
+ HG_DRILLDOWN_REFRESH, HG_STATS_REFRESH,
+ HG_STORAGE_REFRESH, HG_SSE_INTERVAL
+ (on the server container), or HG_STORAGE_SAMPLER_INTERVAL
+ (on the simulator container) to any Go duration string (e.g.
+ "500ms", "2s", "30s") to dial
+ the read pressure CedarDB has to serve next to the ingest stream.
+
+
+
+ hot · every tick (10 Hz)
+ fast · 200 ms – 1 s
+ medium · 1 – 2 s
+ slow · 2 s+ background
+ once · startup
+
+ ~20 flushes/s · ≤ 10K rows each
+ up to ~500 K ev/s
+ internal/sim/simulator.go · ingestorLoop()
+
+
+
+ The firehose. -writers producer goroutines partition
+ the device fleet, build row batches, and push them onto a buffered
+ channel. A single ingestor goroutine drains the
+ channel, coalesces up to 10 000 rows (or every 50 ms, whichever
+ comes first), and runs one binary CopyFrom. This
+ sidesteps CedarDB's "cannot start bulk operation until previous
+ bulk operation has become globally visible" error (SQLSTATE
+ 40P01) that fires when more than one goroutine COPYs in parallel,
+ while still keeping the binary protocol's ~3× wire and parsing
+ advantage over multi-row INSERT under simple-query mode.
+ event_id is an atomic.Int64 seeded from
+ MAX(event_id) at startup, so producers share the id
+ space without locking — the column is plain BIGINT,
+ not BIGSERIAL (CedarDB rejects COPY for sequence-
+ defaulted columns with "unable to cast from void to bigint").
+
+
-- One round trip per ingestor flush (≤ 10K rows, or every 50ms).
+COPY events (
+ event_id, device_id, household_id, ts,
+ kind, severity, value, battery_pct, rssi_dbm
+) FROM STDIN BINARY;
+-- ...one binary tuple per buffered event...
+
+ On startup, the simulator runs one read to seed the id counter:
+
+ Called inline from emitTick whenever EvaluateAlert
+ in rules.go promotes a triggered/tamper event to an
+ operator-actionable alert. status starts 'active'; the
+ background resolveAlertsLoop later marks it resolved or
+ false_alarm.
+
+ every 1 s
+ joins 4 tables
+ internal/web/server.go · handleAlerts()
+
+
+
+ The top panel of the dashboard. Joins alerts × households × plans
+ × regions so the SLA countdown
+ (plan.sla_seconds − age) and the dispatch-center attribution
+ are computed by the database, not the app. Ordered by severity DESC
+ then raised_at ASC — escalations first, then oldest-of-equal-severity.
+
+
SELECT a.alert_id,
+ a.severity,
+ a.detail,
+ EXTRACT(EPOCH FROM (now() - a.raised_at))::int AS age_s,
+ p.sla_seconds,
+ p.sla_seconds
+ - EXTRACT(EPOCH FROM (now() - a.raised_at))::int AS sla_remaining,
+ h.household_id,
+ h.address_hash,
+ p.name AS plan_name,
+ r.name AS region_name,
+ r.dispatch_center
+FROM alerts a
+JOIN households h ON h.household_id = a.household_id
+JOIN plans p ON p.plan_id = h.plan_id
+JOIN regions r ON r.region_id = h.region_id
+WHERE a.status = 'active'
+ORDER BY a.severity DESC, a.raised_at ASC
+LIMIT 25;
+ SSE every 200 ms
+ joins 5 tables
+ internal/web/server.go · pushEventStream()
+
+
+
+ Bottom-left panel, the ticker tape. Joins events × devices ×
+ device_types × households × regions so each row renders with a
+ human-readable device code, location and region. The
+ WHERE e.kind > 0 filter drops heartbeats and
+ rides the events_kind_ts_idx index.
+
+
SELECT e.event_id,
+ e.ts,
+ e.household_id,
+ h.address_hash,
+ dt.code,
+ d.location,
+ e.kind,
+ e.severity,
+ COALESCE(e.battery_pct, -1),
+ r.name
+FROM events e
+JOIN devices d ON d.device_id = e.device_id
+JOIN device_types dt ON dt.device_type_id = d.device_type_id
+JOIN households h ON h.household_id = e.household_id
+JOIN regions r ON r.region_id = h.region_id
+WHERE e.kind > 0
+ORDER BY e.ts DESC
+LIMIT 25;
+ every 2 s
+ first step of 3
+ internal/web/server.go · handleDrilldown()
+
+
+
+ Bottom-right panel auto-rotates to whichever household has the
+ highest-severity currently-active alert. Severity DESC then raised_at
+ ASC mirrors the operator's actual triage order.
+
+
SELECT a.household_id
+FROM alerts a
+WHERE a.status = 'active'
+ORDER BY a.severity DESC, a.raised_at ASC
+LIMIT 1;
+ when no active alerts
+ GROUP BY on hot table
+ internal/web/server.go · handleDrilldown()
+
+
+
+ Runs only when the first query found no active alerts — picks the
+ chattiest household in the last 5 minutes so the drill-down panel never
+ sits empty during a demo. A small but real aggregate over the hot
+ table.
+
+ every 2 s
+ uses events_household_ts_idx
+ internal/web/server.go · handleDrilldown()
+
+
+
+ The "what's going on at this household right now" panel. Indexed on
+ events (household_id, ts DESC) so the LIMIT 20 returns
+ instantly even with the table at billion-row scale.
+
+
SELECT e.ts,
+ dt.code,
+ d.location,
+ e.kind,
+ e.severity,
+ COALESCE(e.battery_pct, -1)
+FROM events e
+JOIN devices d ON d.device_id = e.device_id
+JOIN device_types dt ON dt.device_type_id = d.device_type_id
+WHERE e.household_id = $1
+ORDER BY e.ts DESC
+LIMIT 20;
+ every HG_STATS_REFRESH
+ storage_samples · ≤ a few hundred rows
+ internal/web/server.go · handleStats()
+
+
+
+ The two most recent storage_samples rows answer both
+ "how many rows are in events" and "how fast are they growing": the
+ table is just (sampled_at, uncompressed_bytes) pairs
+ the simulator writes every HG_STORAGE_SAMPLER_INTERVAL
+ from its in-process atomic counter, so the delta-over-time gives
+ you events/sec without ever touching events. Original
+ iterations of this query did COUNT(*) FILTER (WHERE ts >
+ now() - interval '1 second') on events directly — but
+ without an events (ts DESC) index that's a full table
+ scan, which at billion-row scale takes long enough to make the
+ footer freeze. (The published rows_per_sec is a
+ sample-interval average rather than a strict 1-second window.)
+
+
SELECT
+ (SELECT uncompressed_bytes FROM storage_samples
+ ORDER BY sampled_at DESC LIMIT 1) AS latest_bytes,
+ (SELECT sampled_at FROM storage_samples
+ ORDER BY sampled_at DESC LIMIT 1) AS latest_ts,
+ (SELECT uncompressed_bytes FROM storage_samples
+ WHERE sampled_at < (SELECT MAX(sampled_at) FROM storage_samples)
+ ORDER BY sampled_at DESC LIMIT 1) AS prior_bytes,
+ (SELECT sampled_at FROM storage_samples
+ WHERE sampled_at < (SELECT MAX(sampled_at) FROM storage_samples)
+ ORDER BY sampled_at DESC LIMIT 1) AS prior_ts;
+-- total_events = latest_bytes / 48
+-- rows_per_sec = (latest_bytes - prior_bytes) / dt / 48
+ every 1 s
+ 8 subselects · one round trip
+ internal/web/server.go · handleStorage()
+
+
+
+ Feeds the Storage Growth gauge. Picks the freshest sample and, for
+ each window, the freshest sample at or before the
+ window-start cutoff — handles uneven sampler timing without lying
+ about the elapsed seconds (the rate is computed in Go from the two
+ timestamps the query returns, not assumed). Reads only the
+ storage_samples table; the underlying compression view
+ is hit once every 5 s by the sampler, not here.
+
+
SELECT
+ (SELECT uncompressed_bytes FROM storage_samples
+ ORDER BY sampled_at DESC LIMIT 1) AS latest_bytes,
+ (SELECT sampled_at FROM storage_samples
+ ORDER BY sampled_at DESC LIMIT 1) AS latest_ts,
+
+ (SELECT uncompressed_bytes FROM storage_samples
+ WHERE sampled_at <= now() - interval '1 minute'
+ ORDER BY sampled_at DESC LIMIT 1) AS bytes_1m,
+ (SELECT sampled_at FROM storage_samples
+ WHERE sampled_at <= now() - interval '1 minute'
+ ORDER BY sampled_at DESC LIMIT 1) AS ts_1m,
+
+ (SELECT uncompressed_bytes FROM storage_samples
+ WHERE sampled_at <= now() - interval '5 minutes'
+ ORDER BY sampled_at DESC LIMIT 1) AS bytes_5m,
+ (SELECT sampled_at FROM storage_samples
+ WHERE sampled_at <= now() - interval '5 minutes'
+ ORDER BY sampled_at DESC LIMIT 1) AS ts_5m,
+
+ (SELECT uncompressed_bytes FROM storage_samples
+ WHERE sampled_at <= now() - interval '15 minutes'
+ ORDER BY sampled_at DESC LIMIT 1) AS bytes_15m,
+ (SELECT sampled_at FROM storage_samples
+ WHERE sampled_at <= now() - interval '15 minutes'
+ ORDER BY sampled_at DESC LIMIT 1) AS ts_15m;
+
+
+
+
3. Background maintenance4 queries — keep the demo dynamic
+ every 2 s
+ LIMIT 600 per tick
+ internal/sim/simulator.go · resolveAlertsLoop()
+
+
+
+ Severity 3+ alerts only get cleared after they've sat in the queue
+ for > 20 seconds — gives the operator panel something to look at
+ before the simulated dispatch fires.
+
+
UPDATE alerts
+SET status = 'resolved',
+ resolved_at = now(),
+ resolution_ms = (EXTRACT(EPOCH FROM (now() - raised_at)) * 1000)::int
+WHERE alert_id IN (
+ SELECT alert_id
+ FROM alerts
+ WHERE status = 'active' AND severity >= 3
+ AND raised_at < now() - interval '20 seconds'
+ ORDER BY raised_at ASC
+ LIMIT 600
+);
+ every ~10 s · batched
+ PK update · ~1 500 rows/round
+ internal/sim/simulator.go · shuffleArmedState()
+
+
+
+ Flips ~5 % of households armed/disarmed each round, sent as a single
+ pgx.Batch. Keeps the rules engine outputs varying — door
+ opens flip between "routine" and "alarmable" without restarting the
+ sim.
+
+ every 5 s
+ 1 row/call · no read needed
+ internal/sim/simulator.go · storageSamplerLoop()
+
+
+
+ The size value is 48 × s.eventID.Load() — the in-process
+ atomic counter the producers increment per generated row, no DB
+ query needed. Earlier iterations polled
+ cedardb_compression_info (laggy because it only
+ reflects rows packed into column-store blocks) and then
+ COUNT(*) FROM events (exact but increasingly expensive
+ as the table grows past a billion rows, and a likely cause of write
+ stalls because the long scan held an MVCC snapshot). The atomic
+ counter is exact for rows that producers tried to write —
+ diverges from the table count only by the channel queue plus the
+ in-flight COPY batch (≤ ~160 K rows ≈ 8 MB at the configured
+ backpressure ceiling). sampled_at is the table's PK
+ and is unique at the 5 s sampler cadence (microsecond resolution).
+
+
INSERT INTO storage_samples (sampled_at, uncompressed_bytes)
+VALUES (now(), $1);
+
+
+
+
4. Startup & bootstrap5 queries — once per cold start
+ The embedded schema.sql is split on ; and each
+ statement is run individually under simple-query protocol — CedarDB
+ accepts DDL through that path but silently no-ops some of it under
+ extended Parse/Bind/Execute, so we force the simpler path.
+
+
CREATE TABLE IF NOT EXISTS plans (
+ plan_id INTEGER PRIMARY KEY,
+ name TEXT NOT NULL,
+ monthly_price_usd NUMERIC(8, 2) NOT NULL,
+ sla_seconds INTEGER NOT NULL
+);
+-- ... regions, device_types, households, devices, events, alerts ...
+-- ... plus the four supporting indexes (trimmed for the high-rate demo —
+-- every secondary index on events is write amplification at 500 K/s):
+CREATE INDEX IF NOT EXISTS events_household_ts_idx ON events (household_id, ts DESC);
+CREATE INDEX IF NOT EXISTS events_kind_ts_idx ON events (kind, ts DESC);
+CREATE INDEX IF NOT EXISTS alerts_status_raised_idx
+ ON alerts (status, raised_at DESC);
+CREATE INDEX IF NOT EXISTS alerts_household_raised_idx
+ ON alerts (household_id, raised_at DESC);
+ once · single batch
+ ~25 rows total
+ internal/sim/simulator.go · upsertDimensions()
+
+
+
+ The three small static dimension tables are populated from
+ catalog.go using INSERT ... ON CONFLICT DO UPDATE
+ so the simulator is safe to restart against a hot database. All three
+ upserts ride a single pgx.Batch.
+
+
-- plans
+INSERT INTO plans (plan_id, name, monthly_price_usd, sla_seconds)
+VALUES ($1, $2, $3, $4)
+ON CONFLICT (plan_id) DO UPDATE SET
+ name = EXCLUDED.name,
+ monthly_price_usd = EXCLUDED.monthly_price_usd,
+ sla_seconds = EXCLUDED.sla_seconds;
+
+-- regions
+INSERT INTO regions (region_id, name, dispatch_center, timezone)
+VALUES ($1, $2, $3, $4)
+ON CONFLICT (region_id) DO UPDATE SET
+ name = EXCLUDED.name,
+ dispatch_center = EXCLUDED.dispatch_center,
+ timezone = EXCLUDED.timezone;
+
+-- device_types
+INSERT INTO device_types (device_type_id, code, name, default_severity)
+VALUES ($1, $2, $3, $4)
+ON CONFLICT (device_type_id) DO UPDATE SET
+ code = EXCLUDED.code,
+ name = EXCLUDED.name,
+ default_severity = EXCLUDED.default_severity;
+ Synthesised fleet from generateFleet(). Run as batches of
+ 1 000 INSERTs per pgx.Batch.SendBatch — simpler and almost
+ as fast as a staging-table COPY at this scale, idempotent on restart.
+
+ Same shape as the households upsert. ~10 devices per household by
+ default; total fleet is whatever -households ×
+ -devices-per-household lands on.
+
+ Only runs when the simulator is started with -reset-schema.
+ Dropped in reverse-FK order so the CASCADE never has anything real to
+ do, then ApplySchema rebuilds.
+
+
DROP TABLE IF EXISTS storage_samples CASCADE;
+DROP TABLE IF EXISTS alerts CASCADE;
+DROP TABLE IF EXISTS events CASCADE;
+DROP TABLE IF EXISTS devices CASCADE;
+DROP TABLE IF EXISTS households CASCADE;
+DROP TABLE IF EXISTS device_types CASCADE;
+DROP TABLE IF EXISTS regions CASCADE;
+DROP TABLE IF EXISTS plans CASCADE;