Add max_blocked: option to limits_concurrency#4
Conversation
Adds a generic `max_blocked: N` integer option to `limits_concurrency`. When set, enqueues that would exceed the cap are dropped (drop-newer semantics) rather than accumulating an unbounded blocked queue. Serializes the count check via a FOR UPDATE on the existing semaphore row — no schema change required. Instruments dropped jobs via solid_queue.max_blocked_dropped. `max_blocked: nil` (default) preserves today's unbounded block behavior.
Review findings (architecture / security / performance)One confirmed HIGH finding, one overstated. HIGH —
|
- destroy! instead of destroy so a failed destroy propagates as an exception and rolls back job creation, matching the :discard path - Remove provider_job_id and BlockedExecution.first assertions from the drop-newer test; they were order-dependent and tied to an impl detail. The count assertion is sufficient to prove the cap is honored.
Wrap the three storm enqueues in assert_difference and extend J1's pause to 2s so the semaphore is guaranteed held during the count check.
How
|
SolidQueue: high-level overviewSolidQueue is a database-backed Active Job adapter. All state lives in a set of Job lifecycle When Workers and dispatchers A supervisor process forks workers and dispatchers. Workers poll Concurrency controls Jobs that declare Blocked jobs sit in Failure and retry If a job raises, the claimed execution is moved to Reliability Because all state is in the database, jobs survive process crashes. The dispatcher's concurrency maintenance pass periodically expires stale semaphores and unblocks any executions that were orphaned by a worker that died without releasing its lock. |
|
Blocker: need to better understand versioning + deployment strategy for this report |
|
Cover the gating logic at the Job model layer, complementing the existing integration tests with fast, deterministic cases that don't require workers: - block up to max_blocked, then discard further conflicts - the cap reopens after the blocked job is released - the cap is enforced per concurrency key - max_blocked respects concurrency_limit > 1 (e.g. 2 ready + 2 blocked) - max_blocked is a no-op when on_conflict is :discard
| BlockedExecution.create_or_find_by!(job_id: id) | ||
| else | ||
| SolidQueue.instrument(:max_blocked_dropped, concurrency_key: concurrency_key, dropped_job_id: id) | ||
| destroy! |
There was a problem hiding this comment.
I think we should change this back to destroy as that's what is used on line 55.
|
Duplicate PR: #5 |
Summary
Adds
max_blocked: Ntolimits_concurrency. When set, enqueues that would push the blocked count aboveNare dropped (drop-newer semantics) instead of accumulating unbounded.FOR UPDATEon the existing semaphore rowmax_blocked: nil(default) preserves today's unbounded:blockbehaviorsolid_queue.max_blocked_droppedMotivation
OpenSearch::IndexDocumentJobandOpenSearch::BulkIndexAccountJobuselimits_concurrency to: 1keyed per record/account. Rapid updates can accumulate hundreds of redundant blocked jobs.max_blocked: 1gives us 1 running + 1 pending per key, dropping all further enqueues until the running job finishes.Changes
lib/active_job/concurrency_controls.rb—class_attribute :concurrency_max_blocked;max_blocked:kwarg onlimits_concurrencyapp/models/solid_queue/job/concurrency_controls.rb— delegate +blockrewritetest/dummy/app/jobs/max_blocked_update_result_job.rb— test job withmax_blocked: 1test/integration/concurrency_controls_test.rb— 4 new testsTest plan