Add service health endpoints + realtime transcription monitoring#6033
atlas-agent-omi[bot] wants to merge 5 commits into main
Conversation
New endpoints (all unauthenticated, 5s timeout):
- GET /v1/health/chat — pings Anthropic API
- GET /v1/health/transcription — pings Deepgram API
- GET /v1/health/ai — pings OpenAI API
- GET /v1/health/storage — pings Firestore
- GET /v1/health/search — pings Typesense
- GET /v1/health/services — aggregate check (all above)
Returns 200 + {status: ok} when healthy, 503 + {status: down, error: ...} when not.
Designed for external monitoring (Instatus/BetterStack/etc).
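The contract above (ping a provider under a 5-second timeout, return 200 + `{status: ok}` or 503 + `{status: down, error: ...}`) can be sketched with a small helper. This is illustrative only; the function name and sample checks are not the PR's actual code:

```python
import asyncio

TIMEOUT = 5  # seconds, matching the per-check timeout described above

async def run_health_check(check, timeout=TIMEOUT):
    """Run an async connectivity check and map the outcome to the
    (status_code, body) shape described above. `check` is any coroutine
    function that raises (or times out) when the provider is unhealthy."""
    try:
        await asyncio.wait_for(check(), timeout=timeout)
        return 200, {"status": "ok"}
    except Exception as e:
        # Covers both provider errors and the timeout
        return 503, {"status": "down", "error": str(e)[:200]}

async def _demo():
    async def healthy():
        return True

    async def hanging():
        await asyncio.sleep(10)  # simulates an unresponsive provider

    print(await run_health_check(healthy))
    print(await run_health_check(hanging, timeout=0.05))

if __name__ == "__main__":
    asyncio.run(_demo())
```

In this shape each provider check (`_check_anthropic`, `_check_deepgram`, etc.) is just another coroutine passed to the same wrapper, so the timeout and error-truncation policy live in one place.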
- GET /v1/health/listen — tracks Deepgram WebSocket keepalive failures in a rolling 5-minute window; returns ok/degraded/down based on failure-count thresholds
- Adds a `DG_KEEPALIVE_FAILURES` Prometheus counter + rolling failure tracker to `utils/metrics.py`
- Instruments `safe_socket.py` to record keepalive failures
- Includes listen health in the `/v1/health/services` aggregate
- Also includes all Cloud Run service health endpoints (chat, transcription, ai, storage, search) from PR #6032
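The ok/degraded/down mapping can be sketched as a pure function. The thresholds (0–5 ok, 6–20 degraded, 21+ down) come from the PR description below; the function name is hypothetical:

```python
def listen_status(failures):
    """Map keepalive failures in the rolling 5-minute window to an
    HTTP status code and health label, per the PR's thresholds."""
    if failures <= 5:    # normal background noise
        return 200, "ok"
    if failures <= 20:   # elevated, but transcription still usable
        return 200, "degraded"
    return 503, "down"   # sustained keepalive failures
```

Keeping the mapping in a pure function like this makes the thresholds trivially unit-testable, independent of FastAPI or Prometheus.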
Greptile Summary: This PR adds a suite of unauthenticated health-check endpooints under `/v1/health/`. The core idea is sound and addresses a real observability gap. A few issues need attention before merging:
Confidence Score: 3/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant Monitor as Instatus / External Monitor
    participant FastAPI as FastAPI (backend)
    participant Health as health.py router
    participant Metrics as utils/metrics.py
    participant SafeSocket as SafeDeepgramSocket
    participant DG as Deepgram WebSocket
    Note over SafeSocket,DG: Continuous background thread (dg-keepalive)
    SafeSocket->>DG: keep_alive()
    DG-->>SafeSocket: False / Exception
    SafeSocket->>Metrics: DG_KEEPALIVE_FAILURES.inc()
    SafeSocket->>Metrics: dg_failure_tracker.record(timestamp)
    Note over Monitor,FastAPI: Health poll (unauthenticated)
    Monitor->>FastAPI: GET /v1/health/listen
    FastAPI->>Health: health_listen()
    Health->>Metrics: dg_failure_tracker.count()
    Metrics-->>Health: failures (rolling 5-min window)
    Health-->>Monitor: 200 ok / 200 degraded / 503 down
    Monitor->>FastAPI: GET /v1/health/services
    FastAPI->>Health: health_services()
    par Concurrent checks
        Health->>Health: _check_anthropic()
        Health->>Health: _check_deepgram()
        Health->>Health: _check_openai()
        Health->>Health: _check_firestore()
        Health->>Health: _check_typesense()
    end
    Health-->>Monitor: 200 ok / 200 degraded / 503 down (aggregate)
```
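The `dg_failure_tracker` in the diagram is described as a rolling 5-minute failure window. A minimal thread-safe sketch matching that description (class and method names assumed, not the PR's actual implementation):

```python
import threading
import time
from collections import deque

class RollingFailureTracker:
    """Record failure timestamps and count those within a sliding window."""

    def __init__(self, window_seconds=300.0):
        self.window = window_seconds
        self._events = deque()
        self._lock = threading.Lock()

    def record(self, timestamp=None):
        """Append a failure timestamp (defaults to now)."""
        with self._lock:
            self._events.append(time.time() if timestamp is None else timestamp)

    def count(self, now=None):
        """Count failures within the window, pruning expired entries."""
        now = time.time() if now is None else now
        with self._lock:
            # Drop events older than the window before counting
            while self._events and self._events[0] < now - self.window:
                self._events.popleft()
            return len(self._events)
```

The lock matters here because `record()` runs on the dg-keepalive background thread while `count()` is called from request handlers; the deque is pruned lazily on read, so memory stays bounded by the failure rate.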
Reviews (1): Last reviewed commit: "Add realtime transcription health endpoi..."
```python
async def _check_firestore() -> dict:
    """Check Firestore connectivity with a minimal read."""
    try:
        from database._client import db
        # Read a nonexistent doc — fast, just checks connectivity
        doc = db.collection('_health_check').document('ping').get()
        return {"status": "ok"}
    except Exception as e:
        return {"status": "down", "error": str(e)[:200]}
```
Blocking sync Firestore call inside async function
db.collection(...).document(...).get() is a synchronous, blocking call from the google-cloud-firestore sync client (firestore.Client()). Calling it directly inside an async def function blocks the entire event loop thread for as long as Firestore takes to respond — up to the 5-second TIMEOUT. During that window every other concurrent async handler in the FastAPI process is stalled.
Wrap the call with asyncio.get_event_loop().run_in_executor so it runs on a thread-pool thread:
```python
async def _check_firestore() -> dict:
    """Check Firestore connectivity with a minimal read."""
    try:
        import asyncio
        from database._client import db
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(
            None,
            lambda: db.collection('_health_check').document('ping').get()
        )
        return {"status": "ok"}
    except Exception as e:
        return {"status": "down", "error": str(e)[:200]}
```

(Note: importing inside the function is used here only for illustration — see the separate comment about in-function imports.)
```python
async def _check_firestore() -> dict:
    """Check Firestore connectivity with a minimal read."""
    try:
        from database._client import db
```
In-function imports violate backend import rules
The project's explicit backend import rule states: "Keep imports at module top level only; no in-function imports." This file has three in-function imports:
- `health.py:82` — `from database._client import db` inside `_check_firestore`
- `health.py:162` — `from utils.metrics import dg_failure_tracker, BACKEND_LISTEN_ACTIVE_WS_CONNECTIONS` inside `health_listen`
- `health.py:200` — `from utils.metrics import dg_failure_tracker` inside `health_services`
Additionally, safe_socket.py lines 93 and 105 add in-function imports (from utils.metrics import DG_KEEPALIVE_FAILURES, dg_failure_tracker), also violating this rule. While the module docstring says it's intentionally lightweight for tests, the right solution is a conditional top-level import or a separate adapter rather than in-function imports.
All of these should be moved to module-level imports at the top of their respective files.
Context Used: Backend Python import rules - no in-function impor... (source)
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
```python
from utils.metrics import dg_failure_tracker, BACKEND_LISTEN_ACTIVE_WS_CONNECTIONS
failures = dg_failure_tracker.count()
try:
    active_connections = int(BACKEND_LISTEN_ACTIVE_WS_CONNECTIONS._value.get())
```
Accessing private prometheus_client internal API
BACKEND_LISTEN_ACTIVE_WS_CONNECTIONS._value.get() relies on the private _value attribute of prometheus_client's Gauge class. This is an undocumented internal implementation detail that can change without notice in any minor release.
Consider using prometheus_client's collect() method to read the current value, or simply track a separate plain Python int / threading.Event counter for the connection count that you can read safely without touching private attributes:
```python
try:
    samples = list(BACKEND_LISTEN_ACTIVE_WS_CONNECTIONS.collect()[0].samples)
    active_connections = int(samples[0].value) if samples else -1
except Exception:
    active_connections = -1
```

```python
# Rolling window for health check (last 5 minutes)
import time
import threading
```
Standard library imports placed mid-file
import time and import threading are placed after the Prometheus counter definitions instead of at the top of the file with the other imports. Python convention (and most linters) require all imports to appear at the top of the module before any other statements.
```diff
- # Rolling window for health check (last 5 minutes)
- import time
- import threading
+ import threading
+ import time
+ from prometheus_client import Counter, Gauge, generate_latest, CONTENT_TYPE_LATEST
+ from fastapi import Response
```
(The import time/import threading lines at lines 40-41 should be removed from their current location.)
```python
DG_CONNECTION_CLOSES = Counter(
    'dg_connection_closes_total',
    'Total Deepgram WebSocket connection closes (1011 errors)',
)
```
DG_CONNECTION_CLOSES counter is defined but never incremented
The DG_CONNECTION_CLOSES counter is added to metrics.py but there is no corresponding call to DG_CONNECTION_CLOSES.inc() anywhere in the PR (or in the changed code in safe_socket.py). The counter will always read 0 in Prometheus, which can mislead operators into thinking no 1011 errors have occurred.
Either wire it up to the relevant close-event code path (e.g. when the socket is closed with a 1011 status), or remove the counter until the instrumentation point is ready.
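Wiring the counter to the close path could look roughly like the following sketch. A trivial stand-in replaces `prometheus_client.Counter` so the snippet is self-contained, and the close-handler name is hypothetical; in the real code this would be a `DG_CONNECTION_CLOSES.inc()` call inside `safe_socket.py`'s close handling:

```python
class Counter:
    """Stand-in for prometheus_client.Counter, for illustration only."""
    def __init__(self):
        self.value = 0

    def inc(self, amount=1):
        self.value += amount

DG_CONNECTION_CLOSES = Counter()

def on_socket_close(close_code):
    # Hypothetical close handler: count only 1011 (internal error)
    # closes, matching the counter's documented purpose.
    if close_code == 1011:
        DG_CONNECTION_CLOSES.inc()

on_socket_close(1000)  # normal close, not counted
on_socket_close(1011)  # internal error, counted
```

Until an instrumentation point like this exists, the counter reads 0 forever, which is exactly the misleading signal the review warns about.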
Follow-up commits:
- "…DG_CONNECTION_CLOSES counter"
- "…ecutor, use public Gauge API"
Closing — we went with a different approach. Instead of health endpoints inside the backend, we wired Grafana alerting directly to Instatus via native webhook integration:
No new code, no new services — just Grafana config + Instatus config. Alerts fire → Instatus incident created → alert resolves → incident auto-resolves.
Hey @atlas-agent-omi[bot] 👋 Thank you so much for taking the time to contribute to Omi! We truly appreciate you putting in the effort to submit this pull request. After careful review, we've decided not to merge this particular PR. Please don't take this personally — we genuinely try to merge as many contributions as possible, but sometimes we have to make tough calls.
Your contribution is still valuable to us, and we'd love to see you contribute again in the future! If you'd like feedback on how to improve this PR or want to discuss alternative approaches, please don't hesitate to reach out. Thank you for being part of the Omi community! 💜
What
Adds health check endpoints for all critical services, including realtime transcription monitoring via Deepgram keepalive failure tracking.
Endpoints
- GET /v1/health/chat
- GET /v1/health/transcription
- GET /v1/health/listen
- GET /v1/health/ai
- GET /v1/health/storage
- GET /v1/health/search
- GET /v1/health/services

How
How `/v1/health/listen` works:
- Instruments `SafeDeepgramSocket.keep_alive()` to record failures to a rolling 5-minute window counter
- 0–5 failures → 200 ok (normal background noise)
- 6–20 failures → 200 degraded
- 21+ failures → 503 down
- Exposes an `active_connections` count and a Prometheus counter `dg_keepalive_failures_total`

Why
The transcription outage on Mar 24 wasn't visible on Cloud Run metrics because realtime transcription runs on GKE (`backend-listen` pods). This endpoint lets Instatus (or any external monitor) detect when Deepgram WebSocket connections are dying.

All endpoints are unauthenticated, have a 5s timeout, and return JSON with `Cache-Control: no-cache`.

Files changed
- `backend/routers/health.py` (new) — all health endpoints
- `backend/utils/metrics.py` — adds `DG_KEEPALIVE_FAILURES` counter + `dg_failure_tracker`
- `backend/utils/stt/safe_socket.py` — instruments keepalive failure recording
- `backend/main.py` — registers health router

Supersedes #6032.