Add durable background jobs for memory and scanner work #180
Code Review
This pull request introduces a durable background job system using MongoDB to handle long-running tasks in the memory and scanner modules. It adds a new job status endpoint, a worker for asynchronous execution, and relevant configuration settings. The review feedback primarily highlights the need to offload synchronous database operations to worker threads using asyncio.to_thread to avoid blocking the FastAPI event loop. Additionally, improvements were suggested for the stability of idempotency keys, the completeness of dead-letter logs, and the refactoring of duplicated user identification logic.
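A minimal sketch of the `asyncio.to_thread` pattern the review refers to. `blocking_find_job` is a hypothetical stand-in for a synchronous database call, not a function from this PR; the point is only that the blocking work runs in a worker thread while the event loop stays free.

```python
import asyncio
import time


def blocking_find_job(job_id: str) -> dict:
    """Hypothetical stand-in for a synchronous database lookup."""
    time.sleep(0.05)  # simulates blocking I/O
    return {"job_id": job_id, "status": "pending"}


async def get_job_status(job_id: str) -> dict:
    # Run the blocking call in a worker thread so the event loop is not blocked.
    return await asyncio.to_thread(blocking_find_job, job_id)


async def main() -> list:
    # The two lookups overlap instead of running back to back.
    return await asyncio.gather(get_job_status("a"), get_job_status("b"))


results = asyncio.run(main())
```

`asyncio.to_thread` requires Python 3.9 or later; on older versions `loop.run_in_executor` plays the same role.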
|
Hi @massy-o, thank you for the contribution. The PR looks good to me. My main concern is whether we should make these changes in the /v1/memory routes, or bump the version and make them in /v2/memory routes, leaving /v1/memory as it is. What do you think? Also, let me know your thoughts on Celery and Redis. Did you try the ingest endpoint after the job-tracking change and check the latency? Has it increased? |
|
Thanks @ishaanxgupta, that is a fair concern.

On the API versioning question: I agree that changing the response contract of

On Celery + Redis: I think that is a good production direction, especially once we want multiple worker processes, clearer operational controls, scheduling, and mature retry/dead-letter behavior. I kept this PR on the existing MongoDB dependency to make the first step smaller and avoid introducing Redis/Celery as new required infrastructure. The job store/worker boundary should also make it possible to swap the backend later without changing the route-level API much.

On ingest latency: I have not run a production-like benchmark yet, so I do not want to overstate the numbers. The intended effect is that the request path only persists the job record and returns the status URL, while the expensive ingest pipeline runs out of band. So the interactive request latency should generally decrease versus synchronous ingest, with the tradeoff that completion is now observed via polling. There is a small extra cost for the Mongo job insert/status tracking, but that should be much smaller than the embedding/judge/weaver work. If useful, I can add a lightweight timing note/test or run a before/after local measurement as part of the PR update.

So my proposed next step is to move the async job-tracked ingest behavior to |
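To make the enqueue-then-poll flow concrete, here is a rough in-memory sketch. `InMemoryJobStore` and its field names are assumptions for illustration and stand in for the MongoDB-backed store; only the `/v1/jobs/{job_id}` status URL, owner scoping, and idempotency behavior come from the discussion.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class InMemoryJobStore:
    """Hypothetical stand-in for the MongoDB-backed job store."""

    jobs: dict = field(default_factory=dict)
    by_key: dict = field(default_factory=dict)

    def enqueue(self, job_type, owner_id, payload, idempotency_key=None):
        # Scope the key by owner and job type so different users cannot collide.
        scoped = (owner_id, job_type, idempotency_key)
        if idempotency_key is not None and scoped in self.by_key:
            return self.jobs[self.by_key[scoped]]
        job_id = uuid.uuid4().hex
        job = {
            "job_id": job_id,
            "job_type": job_type,
            "owner_id": owner_id,
            "status": "pending",
            "payload": payload,
            "status_url": f"/v1/jobs/{job_id}",
        }
        self.jobs[job_id] = job
        if idempotency_key is not None:
            self.by_key[scoped] = job_id
        return job

    def get(self, job_id, owner_id):
        # Only the owner may read a job, and the payload is never returned.
        job = self.jobs.get(job_id)
        if job is None or job["owner_id"] != owner_id:
            return None
        return {k: v for k, v in job.items() if k != "payload"}


store = InMemoryJobStore()
first = store.enqueue("memory_ingest", "alice", {"text": "hi"}, "key-1")
again = store.enqueue("memory_ingest", "alice", {"text": "hi"}, "key-1")
```

The request handler would return `first["status_url"]` immediately; the client then polls that URL until the status leaves `pending`.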
|
@massy-o yes, that would be great. Let's make these changes in /v2/memory and keep /v1/memory as it was. We can do the Celery or Redis integration in the next PR. |
@massy-o thanks for the update. I would hold off on merging this until the async job-tracked memory ingest path is moved to /v2/memory and /v1/memory/ingest + /v1/memory/batch-ingest remain backward-compatible.
One more concern: scanner jobs use a longer timeout, but stale lease recovery still uses the default lease window. With the current defaults, a long scanner job could be picked up again by another worker while it is still running. Can we align the lease with the job timeout or add lease renewal/heartbeat handling?
After that, please add tests for the v1/v2 behavior, job status/idempotency, retry/dead-letter flow, and scanner enqueue path, plus the before/after latency numbers requested above.
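One way to address the lease concern is to derive the lease from the job's own timeout, so a running job cannot look stale before its handler could have timed out. The sketch below assumes that approach; the function names, the 60-second default, and the grace period are illustrative and not taken from the PR.

```python
from datetime import datetime, timedelta, timezone

DEFAULT_LEASE_SECONDS = 60  # assumed default, not the PR's actual setting


def lease_expiry(now, job_timeout_seconds,
                 default_lease_seconds=DEFAULT_LEASE_SECONDS,
                 grace_seconds=30):
    """Return a lease deadline that always outlives the handler timeout."""
    lease = max(default_lease_seconds, job_timeout_seconds + grace_seconds)
    return now + timedelta(seconds=lease)


def is_stale(job, now):
    """A running job is reclaimable only after its lease has expired."""
    return job["status"] == "running" and job["lease_expires_at"] <= now


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
scanner_job = {
    "status": "running",
    "lease_expires_at": lease_expiry(now, job_timeout_seconds=900),
}
```

A heartbeat that periodically extends `lease_expires_at` while the handler runs is the alternative; it tolerates jobs with no known upper bound but needs an extra background task per running job.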
|
Thanks @ishaanxgupta and @Ankit-Kotnala, I pushed the requested update in commit What changed:
Validation run locally:
Before/after local measurement: Results over 12 requests each:
So in this local synthetic measurement, the request return path is about 37.3x faster for the v2 enqueue path. This is not a production benchmark, but it confirms the intended behavior: v1 keeps the old synchronous contract, while v2 returns quickly after recording the job and leaves the expensive ingest work to the worker. |
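For reference, a before/after comparison of this kind can be sketched as below. The two workloads are synthetic stand-ins (a short sleep for the synchronous pipeline, a trivial return for the enqueue path), so the resulting ratio says nothing about the numbers reported above; it only illustrates the method of comparing medians over 12 runs.

```python
import statistics
import time


def median_latency(fn, runs=12):
    """Median wall-clock time of fn over a fixed number of runs."""
    samples = []
    for _ in range(runs):
        start = time.perf_counter()
        fn()
        samples.append(time.perf_counter() - start)
    return statistics.median(samples)


def sync_ingest():
    time.sleep(0.005)  # stand-in for the synchronous ingest pipeline


def enqueue_ingest():
    return {"status": "pending"}  # stand-in for persisting a job record


sync_median = median_latency(sync_ingest)
# Guard against a zero reading on coarse timers before dividing.
enqueue_median = max(median_latency(enqueue_ingest), 1e-9)
speedup = sync_median / enqueue_median
```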
|
| Filename | Overview |
|---|---|
| src/jobs.py | New MongoDB-backed job queue and async worker; broad exception catch in enqueue can mask original errors, and stale-lease recovery fetches all running jobs client-side on every poll tick. |
| src/api/routes/scanner.py | Scanner start/resume routed through durable queue, but the GitHub PAT is stored in plain text in the job payload and propagates to dead-letter records on failure. |
| src/api/routes/memory.py | New v2 enqueue endpoints added alongside existing v1 sync routes; run_ingest_job hard-codes 120s timeout and batch timeout has no upper cap. |
| src/api/routes/jobs.py | New /v1/jobs/{job_id} status endpoint; correctly scopes lookups to the authenticated owner and strips the payload from the response. |
| src/api/schemas.py | Adds JobEnqueueResponse and JobStatusResponse Pydantic models; clean and complete. |
| src/config/settings.py | Adds six new job-worker settings with sensible defaults; no issues found. |
| src/api/app.py | Registers job handlers and routes in lifespan; init_jobs is correctly called before yield and shutdown_jobs before boot_task cleanup. |
| src/api/dependencies.py | Adds get_owner_id() helper to centralise the username/name/id fallback chain; clean refactor. |
| tests/api/test_durable_jobs_routes.py | Covers v1/v2 memory enqueue, job status ownership scoping, idempotency, dead-letter, and per-job lease tests; good coverage for the new paths. |
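The plain-text token finding for `src/api/routes/scanner.py` could be mitigated by redacting sensitive fields before a payload is written to a dead-letter record. A minimal sketch, assuming the secret lives under one of a few well-known keys; the key names and the `redact_payload` helper are hypothetical.

```python
SENSITIVE_KEYS = {"github_token", "token", "pat", "password"}  # assumed key names


def redact_payload(value):
    """Recursively replace values stored under sensitive keys with a mask."""
    if isinstance(value, dict):
        return {
            key: "***"
            if isinstance(key, str) and key.lower() in SENSITIVE_KEYS
            else redact_payload(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact_payload(item) for item in value]
    return value


redacted = redact_payload(
    {"repo": "a/b", "github_token": "secret", "nested": [{"token": "t"}]}
)
```

Redaction only protects logs and dead-letter records; keeping the token out of the job payload entirely (for example by storing a reference to a secret) would be the stronger fix.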
Sequence Diagram
sequenceDiagram
    participant Client
    participant API as FastAPI Route
    participant Store as JobStore (MongoDB)
    participant Worker as JobWorker (async poll)
    participant Handler as Job Handler
    Client->>API: POST /v2/memory/ingest
    API->>Store: enqueue(job_type, owner_id, payload)
    Store-->>API: "{job_id, status: pending}"
    API-->>Client: "200 {job_id, status_url}"
    loop every job_poll_interval_seconds
        Worker->>Worker: is_ready()?
        Worker->>Store: claim_next(worker_id)
        Store-->>Worker: job doc (or stale lease recovery)
        Worker->>Handler: await handler(payload) [timeout]
        alt success
            Handler-->>Worker: result
            Worker->>Store: succeed(job_id, result)
        else failure
            Worker->>Store: fail_or_retry(job, error)
            Store-->>Store: retry (PENDING + backoff) or DEAD_LETTER
        end
    end
    Client->>API: "GET /v1/jobs/{job_id}"
    API->>Store: get(job_id, owner_id)
    Store-->>API: job doc (payload stripped)
    API-->>Client: JobStatusResponse
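The failure branch of the diagram (retry with backoff, or dead-letter) can be sketched as a pure function. The field names, the attempt limit, and the exponential backoff formula are assumptions for illustration; the PR's actual `fail_or_retry` may differ.

```python
from datetime import datetime, timedelta, timezone


def fail_or_retry(job, error, now, max_attempts=3, base_backoff_seconds=5):
    """Return an updated copy of the job after a failed attempt."""
    attempts = job["attempts"] + 1
    updated = {**job, "attempts": attempts, "last_error": error}
    if attempts >= max_attempts:
        # Out of attempts: park the job for manual inspection.
        updated["status"] = "dead_letter"
        return updated
    # Exponential backoff: 5s, 10s, 20s, ... between attempts.
    delay = base_backoff_seconds * 2 ** (attempts - 1)
    updated["status"] = "pending"
    updated["run_after"] = now + timedelta(seconds=delay)
    return updated


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
job = {"job_id": "j1", "attempts": 0, "status": "running"}
first = fail_or_retry(job, "boom", now)
second = fail_or_retry(first, "boom", now)
third = fail_or_retry(second, "boom", now)
```

Returning a new dict keeps the sketch side-effect free; a real store would apply the same transition as a single atomic update on the job document.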
Reviews (1): Last reviewed commit: "Move async memory ingest to v2"
|
Pushed commit 7dac71c addressing the latest Greptile findings:
Validation run locally: |
|
Greptile encountered an error while reviewing this PR. Please reach out to support@greptile.com for assistance. |
Refs #162
Summary
/v1/memory/ingest and /v1/memory/batch-ingest work and expose /v1/jobs/{job_id} for job status/results
This is a focused Phase 1 implementation for the task-queue/status foundation described in the issue discussion.
Validation
python3 -m py_compile src/jobs.py src/api/routes/jobs.py src/api/routes/memory.py src/api/routes/scanner.py src/api/app.py src/api/schemas.py
git diff --check
uv run --with pytest --with pytest-asyncio --with fastapi --with pydantic --with pydantic-settings --with python-jose --with pymongo --with httpx --with beautifulsoup4 pytest tests/api/test_dependencies_and_routes.py -q -> 4 passed