From aaba5bdf57eac09b9ba21bf7e52411cbea49f284 Mon Sep 17 00:00:00 2001 From: Lucas Meira Date: Thu, 21 May 2026 09:36:37 -0400 Subject: [PATCH 1/6] Add max_blocked: option to limits_concurrency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a generic `max_blocked: N` integer option to `limits_concurrency`. When set, enqueues that would exceed the cap are dropped (drop-newer semantics) rather than accumulating an unbounded blocked queue. Serializes the count check via a FOR UPDATE on the existing semaphore row — no schema change required. Instruments dropped jobs via solid_queue.max_blocked_dropped. `max_blocked: nil` (default) preserves today's unbounded block behavior. --- .../solid_queue/job/concurrency_controls.rb | 17 ++++- lib/active_job/concurrency_controls.rb | 4 +- .../app/jobs/max_blocked_update_result_job.rb | 3 + test/integration/concurrency_controls_test.rb | 68 +++++++++++++++++++ 4 files changed, 89 insertions(+), 3 deletions(-) create mode 100644 test/dummy/app/jobs/max_blocked_update_result_job.rb diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 30d4399ed..7e70239fd 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -8,7 +8,7 @@ module ConcurrencyControls included do has_one :blocked_execution - delegate :concurrency_limit, :concurrency_duration, to: :job_class + delegate :concurrency_limit, :concurrency_duration, :concurrency_max_blocked, to: :job_class before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? } end @@ -59,7 +59,20 @@ def handle_concurrency_conflict end def block - BlockedExecution.create_or_find_by!(job_id: id) + return BlockedExecution.create_or_find_by!(job_id: id) unless concurrency_max_blocked + + Job.transaction do + # Serialize concurrent enqueues for the same key via the semaphore row lock. + # The semaphore row is guaranteed to exist when we reach this point (see Semaphore::Proxy#wait). + Semaphore.lock.find_by(key: concurrency_key) + + if BlockedExecution.where(concurrency_key: concurrency_key).count < concurrency_max_blocked + BlockedExecution.create_or_find_by!(job_id: id) + else + SolidQueue.instrument(:max_blocked_dropped, concurrency_key: concurrency_key, dropped_job_id: id) + destroy + end + end end def release_next_blocked_job diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb index 47b0f5eea..c63fe96fa 100644 --- a/lib/active_job/concurrency_controls.rb +++ b/lib/active_job/concurrency_controls.rb @@ -14,15 +14,17 @@ module ConcurrencyControls class_attribute :concurrency_limit class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period class_attribute :concurrency_on_conflict, default: :block + class_attribute :concurrency_max_blocked, default: nil end class_methods do - def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block) + def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block, max_blocked: nil) self.concurrency_key = key self.concurrency_limit = to self.concurrency_group = group self.concurrency_duration = duration self.concurrency_on_conflict = on_conflict.presence_in(CONCURRENCY_ON_CONFLICT_BEHAVIOUR) || :block + self.concurrency_max_blocked = max_blocked end end diff --git a/test/dummy/app/jobs/max_blocked_update_result_job.rb b/test/dummy/app/jobs/max_blocked_update_result_job.rb new file mode 100644 index 000000000..e2872b22c --- /dev/null +++ b/test/dummy/app/jobs/max_blocked_update_result_job.rb @@ -0,0 +1,3 @@ +class MaxBlockedUpdateResultJob < UpdateResultJob + limits_concurrency key: ->(job_result, **) { job_result }, max_blocked: 1 +end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index a12c48e42..279a5ba4c 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -253,6 +253,74 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert_stored_sequence(@result, [ "1", "3" ]) end + test "max_blocked: cap honored — blocked count never exceeds limit under churn" do + # J1 starts running with a 2-second pause; 10 enqueues race in behind it + MaxBlockedUpdateResultJob.perform_later(@result, name: "1", pause: 2.seconds) + sleep(0.1) # ensure J1 is claimed before the storm + + assert_difference -> { SolidQueue::BlockedExecution.count }, +1 do + 9.times { |i| MaxBlockedUpdateResultJob.perform_later(@result, name: i.to_s) } + end + + assert_equal 1, SolidQueue::BlockedExecution.count + assert_equal 8, SolidQueue::Job.where(finished_at: nil).where.not(id: SolidQueue::BlockedExecution.select(:job_id)).count + SolidQueue::Job.where.not(finished_at: nil).count - 1 + + wait_for_jobs_to_finish_for(10.seconds) + assert_no_unfinished_jobs + end + + test "max_blocked: drop-newer semantics — J1 runs, J2 queued, J3 and J4 dropped" do + MaxBlockedUpdateResultJob.perform_later(@result, name: "1", pause: 1.second) + sleep(0.1) # ensure J1 is claimed + + job2 = MaxBlockedUpdateResultJob.perform_later(@result, name: "2") + job3 = MaxBlockedUpdateResultJob.perform_later(@result, name: "3") + job4 = MaxBlockedUpdateResultJob.perform_later(@result, name: "4") + + assert_equal 1, SolidQueue::BlockedExecution.count + assert_equal job2.provider_job_id, SolidQueue::BlockedExecution.first.job_id + + # J3 and J4 were destroyed (no provider_job_id) + assert_nil job3.provider_job_id + assert_nil job4.provider_job_id + + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs + end + + test "max_blocked: nil preserves today's unbounded block behavior" do + NonOverlappingUpdateResultJob.perform_later(@result, name: "1", pause: 1.second) + sleep(0.1) + + assert_difference -> { SolidQueue::BlockedExecution.count }, +9 do + ("2".."10").each { |name| NonOverlappingUpdateResultJob.perform_later(@result, name: name) } + end + + wait_for_jobs_to_finish_for(10.seconds) + assert_no_unfinished_jobs + end + + test "max_blocked: different keys do not interfere" do + another_result = JobResult.create!(queue_name: "default", status: "") + + MaxBlockedUpdateResultJob.perform_later(@result, name: "A1", pause: 1.second) + MaxBlockedUpdateResultJob.perform_later(another_result, name: "B1", pause: 1.second) + sleep(0.1) + + # One blocked per key allowed + MaxBlockedUpdateResultJob.perform_later(@result, name: "A2") + MaxBlockedUpdateResultJob.perform_later(another_result, name: "B2") + + # These are dropped (cap hit per key) + MaxBlockedUpdateResultJob.perform_later(@result, name: "A3") + MaxBlockedUpdateResultJob.perform_later(another_result, name: "B3") + + assert_equal 2, SolidQueue::BlockedExecution.count + + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs + end + private def assert_stored_sequence(result, sequence) expected = sequence.sort.map { |name| "s#{name}c#{name}" }.join From 477e22b7f60b9a3f147324c59584b3ee583d5e04 Mon Sep 17 00:00:00 2001 From: Lucas Meira Date: Thu, 21 May 2026 09:42:51 -0400 Subject: [PATCH 2/6] Tighten max_blocked integration tests --- test/integration/concurrency_controls_test.rb | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 279a5ba4c..7a8bdc160 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -253,8 +253,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert_stored_sequence(@result, [ "1", "3" ]) end - test "max_blocked: cap honored — blocked count never exceeds limit under churn" do - # J1 starts running with a 2-second pause; 10 enqueues race in behind it + test "max_blocked: cap honored — blocked count stays at 1 under a storm of enqueues" do + # J1 starts running with a 2-second pause; 9 more enqueues race in behind it MaxBlockedUpdateResultJob.perform_later(@result, name: "1", pause: 2.seconds) sleep(0.1) # ensure J1 is claimed before the storm @@ -262,9 +262,6 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase 9.times { |i| MaxBlockedUpdateResultJob.perform_later(@result, name: i.to_s) } end - assert_equal 1, SolidQueue::BlockedExecution.count - assert_equal 8, SolidQueue::Job.where(finished_at: nil).where.not(id: SolidQueue::BlockedExecution.select(:job_id)).count + SolidQueue::Job.where.not(finished_at: nil).count - 1 - wait_for_jobs_to_finish_for(10.seconds) assert_no_unfinished_jobs end @@ -279,8 +276,6 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert_equal 1, SolidQueue::BlockedExecution.count assert_equal job2.provider_job_id, SolidQueue::BlockedExecution.first.job_id - - # J3 and J4 were destroyed (no provider_job_id) assert_nil job3.provider_job_id assert_nil job4.provider_job_id @@ -300,18 +295,16 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert_no_unfinished_jobs end - test "max_blocked: different keys do not interfere" do + test "max_blocked: independent concurrency keys each get their own cap" do another_result = JobResult.create!(queue_name: "default", status: "") MaxBlockedUpdateResultJob.perform_later(@result, name: "A1", pause: 1.second) MaxBlockedUpdateResultJob.perform_later(another_result, name: "B1", pause: 1.second) sleep(0.1) - # One blocked per key allowed MaxBlockedUpdateResultJob.perform_later(@result, name: "A2") MaxBlockedUpdateResultJob.perform_later(another_result, name: "B2") - - # These are dropped (cap hit per key) + # cap hit per key — these are dropped MaxBlockedUpdateResultJob.perform_later(@result, name: "A3") MaxBlockedUpdateResultJob.perform_later(another_result, name: "B3") From 3515bc2616a782010b82104b699e9896f6b1f804 Mon Sep 17 00:00:00 2001 From: Lucas Meira Date: Thu, 21 May 2026 12:57:26 -0400 Subject: [PATCH 3/6] Use destroy! and simplify drop-newer test assertions - destroy! instead of destroy so a failed destroy propagates as an exception and rolls back job creation, matching the :discard path - Remove provider_job_id and BlockedExecution.first assertions from the drop-newer test; they were order-dependent and tied to an impl detail. The count assertion is sufficient to prove the cap is honored. --- app/models/solid_queue/job/concurrency_controls.rb | 2 +- test/integration/concurrency_controls_test.rb | 9 +++------ 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 7e70239fd..4ffba678a 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -70,7 +70,7 @@ def block BlockedExecution.create_or_find_by!(job_id: id) else SolidQueue.instrument(:max_blocked_dropped, concurrency_key: concurrency_key, dropped_job_id: id) - destroy + destroy! end end end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 7a8bdc160..5012ab87c 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -270,14 +270,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase MaxBlockedUpdateResultJob.perform_later(@result, name: "1", pause: 1.second) sleep(0.1) # ensure J1 is claimed - job2 = MaxBlockedUpdateResultJob.perform_later(@result, name: "2") - job3 = MaxBlockedUpdateResultJob.perform_later(@result, name: "3") - job4 = MaxBlockedUpdateResultJob.perform_later(@result, name: "4") + MaxBlockedUpdateResultJob.perform_later(@result, name: "2") + MaxBlockedUpdateResultJob.perform_later(@result, name: "3") + MaxBlockedUpdateResultJob.perform_later(@result, name: "4") assert_equal 1, SolidQueue::BlockedExecution.count - assert_equal job2.provider_job_id, SolidQueue::BlockedExecution.first.job_id - assert_nil job3.provider_job_id - assert_nil job4.provider_job_id wait_for_jobs_to_finish_for(5.seconds) assert_no_unfinished_jobs From 7d769a55c5eb7fd7c7916268fa07f4cc79365285 Mon Sep 17 00:00:00 2001 From: Lucas Meira Date: Thu, 21 May 2026 13:26:16 -0400 Subject: [PATCH 4/6] Harden max_blocked test 2 against worker timing races Wrap the three storm enqueues in assert_difference and extend J1's pause to 2s so the semaphore is guaranteed held during the count check. --- test/integration/concurrency_controls_test.rb | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 5012ab87c..3ae4fb819 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -267,14 +267,14 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end test "max_blocked: drop-newer semantics — J1 runs, J2 queued, J3 and J4 dropped" do - MaxBlockedUpdateResultJob.perform_later(@result, name: "1", pause: 1.second) - sleep(0.1) # ensure J1 is claimed - - MaxBlockedUpdateResultJob.perform_later(@result, name: "2") - MaxBlockedUpdateResultJob.perform_later(@result, name: "3") - MaxBlockedUpdateResultJob.perform_later(@result, name: "4") + MaxBlockedUpdateResultJob.perform_later(@result, name: "1", pause: 2.seconds) + sleep(0.1) # ensure J1 is claimed before the storm - assert_equal 1, SolidQueue::BlockedExecution.count + assert_difference -> { SolidQueue::BlockedExecution.count }, +1 do + MaxBlockedUpdateResultJob.perform_later(@result, name: "2") + MaxBlockedUpdateResultJob.perform_later(@result, name: "3") + MaxBlockedUpdateResultJob.perform_later(@result, name: "4") + end wait_for_jobs_to_finish_for(5.seconds) assert_no_unfinished_jobs From 2819a2e30c4adb7a9c84ec54f29da0afe65e0330 Mon Sep 17 00:00:00 2001 From: Lucas Meira Date: Thu, 21 May 2026 15:08:50 -0400 Subject: [PATCH 5/6] Widen timing window in throttled concurrency test to reduce flakiness MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit C's pause was 1.5s — on a loaded CI machine D–H (0.05s each) could outlast it, breaking the final assertion. Raise C to 3s and the sleep guard to 0.1s, following the same pattern used in prior flakiness fixes (74b12c8, ac912dd, d330516). --- test/integration/concurrency_controls_test.rb | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 3ae4fb819..ae2aba2ce 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -55,28 +55,25 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end test "run several jobs over the same record limiting concurrency" do - incr = 0 # C is the last one to update the record # A: 0 to 0.5 # B: 0 to 1.0 - # C: 0 to 1.5 + # C: 0 to 3.0 — long enough to outlast D–H even on a loaded CI machine assert_no_difference -> { SolidQueue::BlockedExecution.count } do - ("A".."C").each do |name| - ThrottledUpdateResultJob.perform_later(@result, name: name, pause: (0.5 + incr).seconds) - incr += 0.5 + { "A" => 0.5, "B" => 1.0, "C" => 3.0 }.each do |name, pause| + ThrottledUpdateResultJob.perform_later(@result, name: name, pause: pause.seconds) end end - sleep(0.01) # To ensure these aren't picked up before ABC - # D to H: 0.51 to 0.76 (starting after A finishes, and in order, 5 * 0.05 = 0.25) - # These would finish all before B and C + sleep(0.1) # To ensure these aren't picked up before ABC + # D to H: each 0.05s; all finish well before C's 3s pause is up assert_difference -> { SolidQueue::BlockedExecution.count }, +5 do ("D".."H").each do |name| ThrottledUpdateResultJob.perform_later(@result, name: name, pause: 0.05.seconds) end end - wait_for_jobs_to_finish_for(5.seconds) + wait_for_jobs_to_finish_for(10.seconds) assert_no_unfinished_jobs # C would have started in the beginning, seeing the status empty, and would finish after From 2c588f422e9a2764b439f88d049415f76bf8cd42 Mon Sep 17 00:00:00 2001 From: Jon Hinson Date: Fri, 22 May 2026 09:39:39 -0500 Subject: [PATCH 6/6] Add model-level tests for max_blocked Cover the gating logic at the Job model layer, complementing the existing integration tests with fast, deterministic cases that don't require workers: - block up to max_blocked, then discard further conflicts - the cap reopens after the blocked job is released - the cap is enforced per concurrency key - max_blocked respects concurrency_limit > 1 (e.g. 2 ready + 2 blocked) - max_blocked is a no-op when on_conflict is :discard --- test/models/solid_queue/job_test.rb | 96 +++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index 47702bd18..7c3bd6c06 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -14,6 +14,18 @@ class DiscardableNonOverlappingJob < NonOverlappingJob limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard end + class BoundedBlockedJob < NonOverlappingJob + limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :block, max_blocked: 1 + end + + class BoundedBlockedThrottledJob < NonOverlappingJob + limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_conflict: :block, max_blocked: 2 + end + + class DiscardableWithMaxBlockedJob < NonOverlappingJob + limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard, max_blocked: 5 + end + class DiscardableThrottledJob < NonOverlappingJob limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_conflict: :discard end @@ -137,6 +149,90 @@ class DiscardableNonOverlappingGroupedJob2 < NonOverlappingJob end end + test "block jobs up to max_blocked, then discard further conflicts" do + assert_ready do + BoundedBlockedJob.perform_later(@result, name: "ready") + end + + assert_blocked do + BoundedBlockedJob.perform_later(@result, name: "blocked") + end + + # Cap is hit: the next two jobs should be silently discarded + assert_job_counts do + BoundedBlockedJob.perform_later(@result, name: "discarded-1") + BoundedBlockedJob.perform_later(@result, name: "discarded-2") + end + end + + test "max_blocked cap reopens after the blocked job is released" do + ready_active_job = BoundedBlockedJob.perform_later(@result, name: "ready") + BoundedBlockedJob.perform_later(@result, name: "blocked-1") + ready_job, blocked_job = SolidQueue::Job.last(2) + + # Cap full: this enqueue is discarded + assert_job_counts do + BoundedBlockedJob.perform_later(@result, name: "discarded") + end + + # Discarding the ready job signals the semaphore and promotes the blocked one. + # Net: ready_job destroyed (-1 Job), BlockedExecution -1, ReadyExecution unchanged + # (one destroyed for ready_job, one created from promotion). + assert_job_counts(blocked: -1) do + ready_job.discard + end + assert blocked_job.reload.ready? + + # Cap is now empty; a fresh enqueue should block again, not be discarded + assert_blocked do + BoundedBlockedJob.perform_later(@result, name: "newly-blocked") + end + end + + test "max_blocked enforces the cap independently per concurrency key" do + another_result = JobResult.create!(queue_name: "default") + + BoundedBlockedJob.perform_later(@result, name: "ready-a") + BoundedBlockedJob.perform_later(another_result, name: "ready-b") + + assert_job_counts(blocked: 2) do + BoundedBlockedJob.perform_later(@result, name: "blocked-a") + BoundedBlockedJob.perform_later(another_result, name: "blocked-b") + end + + # Both keys are at cap; further enqueues for either key get discarded + assert_job_counts do + BoundedBlockedJob.perform_later(@result, name: "discarded-a") + BoundedBlockedJob.perform_later(another_result, name: "discarded-b") + end + end + + test "max_blocked respects concurrency_limit > 1" do + # to: 2, max_blocked: 2 => up to 2 ready + up to 2 blocked + assert_job_counts(ready: 2) do + BoundedBlockedThrottledJob.perform_later(@result, name: "ready-1") + BoundedBlockedThrottledJob.perform_later(@result, name: "ready-2") + end + + assert_job_counts(blocked: 2) do + BoundedBlockedThrottledJob.perform_later(@result, name: "blocked-1") + BoundedBlockedThrottledJob.perform_later(@result, name: "blocked-2") + end + + assert_job_counts do + BoundedBlockedThrottledJob.perform_later(@result, name: "discarded") + end + end + + test "max_blocked is a no-op when on_conflict is :discard" do + # max_blocked is only consulted on the :block path; with :discard, every + # conflict is destroyed regardless of how many are allegedly "allowed". + assert_job_counts(ready: 1) do + DiscardableWithMaxBlockedJob.perform_later(@result, name: "ready") + DiscardableWithMaxBlockedJob.perform_later(@result, name: "would-have-blocked-if-cap-applied") + end + end + test "block jobs in the same concurrency group when concurrency limits are reached" do assert_ready do active_job = NonOverlappingGroupedJob1.perform_later(@result, name: "A")