Add max_blocked: option to limits_concurrency #4

Closed
lucasmeira-wealthbox wants to merge 6 commits into batch-rc-2-on-v1.4.0 from lucasmeira/aia-733-max-blocked

@lucasmeira-wealthbox

Summary

Adds max_blocked: N to limits_concurrency. When set, enqueues that would push the blocked count above N are dropped (drop-newer semantics) instead of accumulating unbounded.

  • No schema change — serializes via FOR UPDATE on the existing semaphore row
  • max_blocked: nil (default) preserves today's unbounded :block behavior
  • Instruments dropped jobs via solid_queue.max_blocked_dropped
  • Adds 4 integration tests covering: cap enforcement, drop-newer semantics, nil/unbounded regression, and multi-key isolation

Motivation

OpenSearch::IndexDocumentJob and OpenSearch::BulkIndexAccountJob use limits_concurrency to: 1 keyed per record/account. Rapid updates can accumulate hundreds of redundant blocked jobs. max_blocked: 1 gives us 1 running + 1 pending per key, dropping all further enqueues until the running job finishes.

Changes

  • lib/active_job/concurrency_controls.rb: class_attribute :concurrency_max_blocked; max_blocked: kwarg on limits_concurrency
  • app/models/solid_queue/job/concurrency_controls.rb: delegate + block rewrite
  • test/dummy/app/jobs/max_blocked_update_result_job.rb: test job with max_blocked: 1
  • test/integration/concurrency_controls_test.rb: 4 new tests
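
For readers who want to see the shape of the new option, here is a minimal plain-Ruby sketch of a class-level DSL that stores a max_blocked: value next to the concurrency limit and key. It is not the code from this PR: ConcurrencyControlsSketch, the two job classes, and the key format are hypothetical stand-ins, and singleton attr_reader is used in place of class_attribute so the sketch runs without Rails.

```ruby
# Hypothetical stand-in for the DSL; NOT Solid Queue's implementation.
module ConcurrencyControlsSketch
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    attr_reader :concurrency_limit, :concurrency_key, :concurrency_max_blocked

    # key:         callable that derives the concurrency key from the job arguments
    # to:          how many jobs per key may run at once
    # max_blocked: cap on blocked jobs per key; nil keeps the unbounded behavior
    def limits_concurrency(key:, to: 1, max_blocked: nil)
      @concurrency_key = key
      @concurrency_limit = to
      @concurrency_max_blocked = max_blocked
    end
  end
end

# Assumed declaration for a per-record indexing job: 1 running + 1 pending per key.
class IndexDocumentJobSketch
  include ConcurrencyControlsSketch
  limits_concurrency to: 1, key: ->(record_id) { "document/#{record_id}" }, max_blocked: 1
end

# Omitting max_blocked leaves the cap at nil (unbounded blocking).
class UnboundedJobSketch
  include ConcurrencyControlsSketch
  limits_concurrency to: 1, key: ->(record_id) { "document/#{record_id}" }
end
```

With max_blocked: 1 and to: 1, each key can hold one running job and one blocked job; further enqueues for that key are dropped until the running job finishes.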

Test plan

cd solid_queue
TARGET_DB=postgres bin/setup
TARGET_DB=postgres bin/rails test test/integration/concurrency_controls_test.rb

Adds a generic `max_blocked: N` integer option to `limits_concurrency`.
When set, enqueues that would exceed the cap are dropped (drop-newer
semantics) rather than accumulating an unbounded blocked queue.

Serializes the count check via a FOR UPDATE on the existing semaphore
row — no schema change required. Instruments dropped jobs via
solid_queue.max_blocked_dropped.

`max_blocked: nil` (default) preserves today's unbounded block behavior.
@lucasmeira-wealthbox
Author

Review findings (architecture / security / performance)

One confirmed HIGH finding, one overstated.


HIGH — destroy should be destroy! in block method

File: app/models/solid_queue/job/concurrency_controls.rb

# current
destroy

# should be
destroy!

If destroy returns false (a before_destroy callback halting the chain with throw :abort; a DB error raises rather than returning false), the job is silently orphaned: no blocked execution, no ready execution, no max_blocked_dropped instrumentation event. The job remains in solid_queue_jobs indefinitely and is never cleaned up.

destroy! is correct: failure raises an exception, which propagates out of the after_create callback and rolls back job creation entirely — the same semantics as the existing :discard on_conflict path.
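
To make the difference concrete, the sketch below models only the return-value contract of the two methods in plain Ruby. JobSketch and RecordNotDestroyedSketch are hypothetical stand-ins (in Active Record the bang variant raises ActiveRecord::RecordNotDestroyed), and the abort_destroy flag simulates a callback that halts destruction.

```ruby
# Hypothetical stand-in for an Active Record model; only the
# destroy / destroy! contract is modelled here.
class RecordNotDestroyedSketch < StandardError; end

class JobSketch
  attr_reader :destroyed

  def initialize(abort_destroy: false)
    @abort_destroy = abort_destroy # simulates a callback that halts the destroy
    @destroyed = false
  end

  # Non-bang contract: returns false when destruction is halted.
  def destroy
    return false if @abort_destroy

    @destroyed = true
    self
  end

  # Bang contract: raises instead of returning false.
  def destroy!
    destroy || raise(RecordNotDestroyedSketch, "failed to destroy the record")
  end
end

stuck = JobSketch.new(abort_destroy: true)

# The caller gets false back and, unless it checks, carries on as if nothing happened.
silent_result = stuck.destroy

# The bang variant turns the same failure into an exception the caller cannot miss.
raised = begin
  stuck.destroy!
  false
rescue RecordNotDestroyedSketch
  true
end
```

Inside a callback that runs within the creating transaction, an exception like this is what lets the failure roll the whole creation back rather than leaving a half-handled row behind.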


Disputed — TOCTOU on blocked count

Multiple reviewers flagged the SELECT ... FOR UPDATE + COUNT pattern as racy. This appears overstated: FOR UPDATE on the semaphore row does serialize concurrent block calls for the same key — T2 blocks at Semaphore.lock.find_by until T1 commits, so T2's count reflects T1's insert. The only real gap is the documented §4.5 edge case (semaphore deleted by maintenance between wait and block), which is explicitly accepted as a soft cap in the design.
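
The serialization argument can be sketched with an in-memory model, assuming a Mutex stands in for the row lock taken by SELECT ... FOR UPDATE and an Array stands in for the blocked executions of one key. The storm helper and its parameters are hypothetical; the sleep only widens the window between the count and the insert so the unserialized case is easy to observe.

```ruby
# In-memory model only: no database is involved.
def storm(enqueues:, max_blocked:, serialized:)
  row_lock = Mutex.new     # stands in for the lock on the semaphore row
  bookkeeping = Mutex.new  # protects the arrays themselves in both modes
  blocked = []
  dropped = []

  check_and_insert = lambda do |job_id|
    under_cap = bookkeeping.synchronize { blocked.size < max_blocked }
    sleep 0.01 # widen the gap between "count" and "insert"
    bookkeeping.synchronize { (under_cap ? blocked : dropped) << job_id }
  end

  threads = enqueues.times.map do |job_id|
    Thread.new do
      if serialized
        # Count and insert happen while holding the lock, like inside the transaction.
        row_lock.synchronize { check_and_insert.call(job_id) }
      else
        check_and_insert.call(job_id)
      end
    end
  end
  threads.each(&:join)

  { blocked: blocked.size, dropped: dropped.size }
end

serialized_result = storm(enqueues: 5, max_blocked: 1, serialized: true)
unserialized_result = storm(enqueues: 5, max_blocked: 1, serialized: false)
```

In the serialized run, each thread's count sees every earlier insert, so exactly one job is blocked and the rest are dropped. In the unserialized run, several threads can read the count before any of them inserts, so the number of blocked jobs typically exceeds the cap.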

lucasmeira-wealthbox marked this pull request as draft May 21, 2026 16:38
- destroy! instead of destroy so a failed destroy propagates as an
  exception and rolls back job creation, matching the :discard path
- Remove provider_job_id and BlockedExecution.first assertions from the
  drop-newer test; they were order-dependent and tied to an impl detail.
  The count assertion is sufficient to prove the cap is honored.
Wrap the three storm enqueues in assert_difference and extend J1's pause
to 2s so the semaphore is guaranteed held during the count check.
lucasmeira-wealthbox marked this pull request as ready for review May 21, 2026 17:28
@lucasmeira-wealthbox
Author

How max_blocked fits into the SolidQueue concurrency flow

perform_later(job)
       │
       ▼
  Job created in DB
  after_create → prepare_for_execution → dispatch
       │
       ▼
  acquire_concurrency_lock → Semaphore.wait(job)
  ┌─────────────────────────────────────────────────────────────────┐
  │  First enqueue for this key?                                    │
  │    yes → create semaphore row (value = limit - 1 = 0) → true   │
  │    no  → SELECT ... FOR UPDATE on existing semaphore row        │
  │          value > 0? → decrement → true                         │
  │          value = 0? → false                                     │
  └─────────────────────────────────────────────────────────────────┘
       │ true                              │ false
       ▼                                   ▼
  ReadyExecution created          handle_concurrency_conflict
  → worker picks up                        │
                                           ├── on_conflict: :discard → job.destroy
                                           │
                                           └── default: block
                                                    │
                                                    ▼
                                         max_blocked nil?
                                                    │
                                         yes → BlockedExecution.create  (unbounded)
                                                    │
                                         no  [THIS PR]
                                                    │
                                                    ▼
                                         Job.transaction do
                                           SELECT ... FOR UPDATE on semaphore row
                                           ← serializes concurrent storm enqueues
                                           BlockedExecution.where(key).count < max_blocked?
                                             yes → BlockedExecution.create  (queued)
                                             no  → instrument(:max_blocked_dropped)
                                                   job.destroy!  (dropped, gone)
                                         end


════════════ Job finishes ════════════

ClaimedExecution#perform
  ensure → unblock_next_job → job.unblock_next_blocked_job
       │
       ├── Semaphore.signal → increment value (releases the lock)
       │
       └── BlockedExecution.release_one(concurrency_key)
                │
                ▼
           Lock oldest blocked_execution row
           → BlockedExecution#release
               re-acquire semaphore (Semaphore.wait)
               → promote_to_ready (ReadyExecution.create!)
               → destroy! blocked_execution

Key invariant max_blocked relies on: by the time block is called, the semaphore row already exists (created by the first Semaphore.wait). The SELECT ... FOR UPDATE on that row serializes two concurrent storm enqueues — without it, two threads could both read count < max_blocked as true and both insert, breaking the cap.
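
The enqueue side of the flow can also be read as a small single-threaded model. This is a sketch under stated assumptions, not the PR's code: ConcurrencyFlowSketch and its method names are hypothetical, a Hash stands in for the semaphore table, and locking is omitted because nothing runs concurrently here.

```ruby
# In-memory model of the enqueue path; class and method names are hypothetical.
class ConcurrencyFlowSketch
  attr_reader :ready, :blocked, :dropped

  def initialize(limit:, max_blocked: nil)
    @limit = limit
    @max_blocked = max_blocked
    @semaphores = {}                                  # key => remaining permits
    @ready = []
    @blocked = Hash.new { |hash, key| hash[key] = [] } # key => blocked ids, oldest first
    @dropped = []
  end

  # Returns :ready, :blocked or :dropped for the given job and concurrency key.
  def enqueue(job_id, key)
    if acquire(key)
      @ready << job_id
      :ready
    elsif @max_blocked.nil? || @blocked[key].size < @max_blocked
      @blocked[key] << job_id
      :blocked
    else
      @dropped << job_id # drop-newer: the incoming job is the one discarded
      :dropped
    end
  end

  private

  # First enqueue creates the counter at limit - 1; later ones decrement while permits remain.
  def acquire(key)
    if !@semaphores.key?(key)
      @semaphores[key] = @limit - 1
      true
    elsif @semaphores[key] > 0
      @semaphores[key] -= 1
      true
    else
      false
    end
  end
end

flow = ConcurrencyFlowSketch.new(limit: 1, max_blocked: 1)
outcomes = [
  flow.enqueue(:j1, "account/1"), # takes the only permit
  flow.enqueue(:j2, "account/1"), # fills the single blocked slot
  flow.enqueue(:j3, "account/1"), # over the cap, dropped
  flow.enqueue(:j4, "account/2")  # different key, unaffected
]
```

The last enqueue illustrates the per-key isolation covered by the tests: the cap on "account/1" has no effect on "account/2".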

@lucasmeira-wealthbox
Author

SolidQueue: high-level overview

SolidQueue is a database-backed Active Job adapter. All state lives in a set of solid_queue_* tables — there is no separate broker process.

Job lifecycle

When perform_later is called, a solid_queue_jobs row is created and an after_create callback immediately dispatches it. If the job is due now it goes into solid_queue_ready_executions; if it is scheduled for the future it goes into solid_queue_scheduled_executions until a dispatcher promotes it.

Workers and dispatchers

A supervisor process forks workers and dispatchers. Workers poll solid_queue_ready_executions, claim a batch of rows (moving them into solid_queue_claimed_executions atomically), and run the jobs. Dispatchers handle two separate concerns: promoting scheduled jobs that are now due, and unblocking stuck concurrency-limited jobs whose semaphores have expired.

Concurrency controls

Jobs that declare limits_concurrency go through a semaphore check on enqueue. The semaphore is a single row in solid_queue_semaphores keyed by the concurrency key. The first enqueue for a key creates that row; subsequent enqueues lock it with SELECT ... FOR UPDATE and decrement the value. If the value is already zero, the job cannot run yet and is either blocked or discarded depending on on_conflict.

Blocked jobs sit in solid_queue_blocked_executions. When a running job finishes, it signals the semaphore (increments the value) and calls BlockedExecution.release_one, which locks the oldest blocked row, re-acquires the semaphore, and promotes that job to ready.
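
As a rough illustration of the finish side, the sketch below models a single concurrency key in memory. KeyStateSketch is hypothetical, the counter is initialized up front rather than created on the first enqueue, and "admitted" lumps together ready and running jobs for brevity.

```ruby
# In-memory model of signal + release for ONE concurrency key; names are hypothetical.
class KeyStateSketch
  attr_reader :value, :blocked, :admitted

  def initialize(limit:)
    @value = limit  # free permits on the semaphore
    @blocked = []   # blocked job ids, oldest first
    @admitted = []  # jobs that hold a permit (ready or running)
  end

  # Enqueue: take a permit if one is free, otherwise block.
  def enqueue(job_id)
    if wait
      @admitted << job_id
    else
      @blocked << job_id
    end
  end

  # A running job finished: signal the semaphore, then try to release one blocked job.
  # Returns the released job id, or nil if nothing was released.
  def finish(job_id)
    @admitted.delete(job_id)
    signal
    release_one
  end

  private

  def wait
    return false if @value.zero?

    @value -= 1
    true
  end

  def signal
    @value += 1
  end

  # Promote the oldest blocked job, but only if the permit can be re-acquired.
  def release_one
    return nil if @blocked.empty?
    return nil unless wait

    job_id = @blocked.shift
    @admitted << job_id
    job_id
  end
end

state = KeyStateSketch.new(limit: 1)
%i[a b c].each { |job_id| state.enqueue(job_id) }

first_released = state.finish(:a)  # oldest blocked job is promoted first
second_released = state.finish(:b)
```

Because the released job immediately re-acquires the permit, the counter returns to zero after each release, which is why new enqueues for the same key keep landing in the blocked set while older work is still draining.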

Failure and retry

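If a job raises and the exception is not handled at the Active Job layer, the failure is recorded in solid_queue_failed_executions. Retry logic (backoff, max attempts) is handled at the Active Job layer: each retry re-enqueues the job through the adapter, so SolidQueue sees it as a fresh enqueue (ready if it is due now, scheduled if the retry has a wait).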

Reliability

Because all state is in the database, jobs survive process crashes. The dispatcher's concurrency maintenance pass periodically expires stale semaphores and unblocks any executions that were orphaned by a worker that died without releasing its lock.

@lucasmeira-wealthbox
Author

Blocker: need to better understand versioning + deployment strategy for this report

@lucasmeira-wealthbox
Author

TARGET_DB=sqlite bin/rails test
Run options: --seed 54121

# Running:

............................................................................SS.....................................................................................................................S......................................................

Finished in 169.296754s, 1.4767 runs/s, 8.1986 assertions/s.
250 runs, 1388 assertions, 0 failures, 0 errors, 3 skips

lucasmeira-wealthbox and others added 2 commits May 21, 2026 15:08
C's pause was 1.5s — on a loaded CI machine D–H (0.05s each) could
outlast it, breaking the final assertion. Raise C to 3s and the
sleep guard to 0.1s, following the same pattern used in prior
flakiness fixes (74b12c8, ac912dd, d330516).
Cover the gating logic at the Job model layer, complementing the
existing integration tests with fast, deterministic cases that don't
require workers:

- block up to max_blocked, then discard further conflicts
- the cap reopens after the blocked job is released
- the cap is enforced per concurrency key
- max_blocked respects concurrency_limit > 1 (e.g. 2 ready + 2 blocked)
- max_blocked is a no-op when on_conflict is :discard
BlockedExecution.create_or_find_by!(job_id: id)
else
SolidQueue.instrument(:max_blocked_dropped, concurrency_key: concurrency_key, dropped_job_id: id)
destroy!

I think we should change this back to destroy as that's what is used on line 55.

@lucasmeira-wealthbox
Author

Duplicate PR: #5


2 participants