From aca950bb484f34c2eecf5add7b59644fbdc88f6d Mon Sep 17 00:00:00 2001 From: Jon Hinson Date: Thu, 21 May 2026 09:04:49 -0500 Subject: [PATCH] Cap the number of blocked executions per concurrency key MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a `max_blocked:` option to `limits_concurrency` that bounds how many jobs can sit in the blocked queue for a given concurrency key. Once the cap is reached, further conflicting jobs are silently discarded instead of joining an ever-growing blocked queue. The default is `nil`, preserving the existing unbounded behaviour. The option is only consulted on the `on_conflict: :block` path; with `:discard`, every conflict is destroyed regardless, so `max_blocked` is a no-op there. The cap is enforced under the existing per-key semaphore row lock, so concurrent enqueues racing to fill the last slot serialize against each other and the cap is never blown. When a finishing job releases the semaphore and promotes a blocked execution to ready, the slot reopens for the next enqueue — no dispatcher or release-path changes needed. --- .../solid_queue/job/concurrency_controls.rb | 15 ++- lib/active_job/concurrency_controls.rb | 4 +- .../jobs/bounded_blocked_update_result_job.rb | 3 + test/integration/concurrency_controls_test.rb | 22 +++++ test/models/solid_queue/job_test.rb | 96 +++++++++++++++++++ 5 files changed, 137 insertions(+), 3 deletions(-) create mode 100644 test/dummy/app/jobs/bounded_blocked_update_result_job.rb diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 30d4399ed..86394ef56 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -8,7 +8,7 @@ module ConcurrencyControls included do has_one :blocked_execution - delegate :concurrency_limit, :concurrency_duration, to: :job_class + delegate :concurrency_limit, :concurrency_duration, :concurrency_max_blocked, to: :job_class before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? } end @@ -59,7 +59,18 @@ def handle_concurrency_conflict end def block - BlockedExecution.create_or_find_by!(job_id: id) + return BlockedExecution.create_or_find_by!(job_id: id) unless concurrency_max_blocked + + transaction do + # Lock the semaphore row to serialize concurrent enqueues racing to fill the cap. + Semaphore.lock.find_by(key: concurrency_key) + + if BlockedExecution.where(concurrency_key: concurrency_key).count < concurrency_max_blocked + BlockedExecution.create_or_find_by!(job_id: id) + else + destroy + end + end end def release_next_blocked_job diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb index 47b0f5eea..08d0f6048 100644 --- a/lib/active_job/concurrency_controls.rb +++ b/lib/active_job/concurrency_controls.rb @@ -14,15 +14,17 @@ module ConcurrencyControls class_attribute :concurrency_limit class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period class_attribute :concurrency_on_conflict, default: :block + class_attribute :concurrency_max_blocked end class_methods do - def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block) + def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block, max_blocked: nil) self.concurrency_key = key self.concurrency_limit = to self.concurrency_group = group self.concurrency_duration = duration self.concurrency_on_conflict = on_conflict.presence_in(CONCURRENCY_ON_CONFLICT_BEHAVIOUR) || :block + self.concurrency_max_blocked = max_blocked end end diff --git a/test/dummy/app/jobs/bounded_blocked_update_result_job.rb b/test/dummy/app/jobs/bounded_blocked_update_result_job.rb new file mode 100644 index 000000000..81b4da8dd --- /dev/null +++ b/test/dummy/app/jobs/bounded_blocked_update_result_job.rb @@ -0,0 +1,3 @@ +class BoundedBlockedUpdateResultJob < UpdateResultJob + limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :block, max_blocked: 1 +end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index a12c48e42..f5cd1d366 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -235,6 +235,28 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert_stored_sequence(another_result, [ "2" ]) end + test "block up to max_blocked and discard the rest" do + # to: 1, max_blocked: 1 — so 1 runs, 1 blocks, the rest get discarded + job1 = BoundedBlockedUpdateResultJob.perform_later(@result, name: "1", pause: 0.5) + sleep(0.1) # ensure "1" is claimed first + + job2 = BoundedBlockedUpdateResultJob.perform_later(@result, name: "2") # blocks under cap + job3 = BoundedBlockedUpdateResultJob.perform_later(@result, name: "3") # cap full, discarded + job4 = BoundedBlockedUpdateResultJob.perform_later(@result, name: "4") # cap full, discarded + + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs + + # 1 ran, 2 promoted after 1 finished and ran; 3 and 4 never ran + assert_stored_sequence(@result, [ "1", "2" ]) + + jobs = SolidQueue::Job.where(active_job_id: [ job1, job2, job3, job4 ].map(&:job_id)) + assert_equal 2, jobs.count + assert_equal [ job1.provider_job_id, job2.provider_job_id ].sort, jobs.pluck(:id).sort + assert_nil job3.provider_job_id + assert_nil job4.provider_job_id + end + test "discard on conflict and release semaphore" do DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 0.1) # will be discarded diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index 47702bd18..7c3bd6c06 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -14,6 +14,18 @@ class DiscardableNonOverlappingJob < NonOverlappingJob limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard end + class BoundedBlockedJob < NonOverlappingJob + limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :block, max_blocked: 1 + end + + class BoundedBlockedThrottledJob < NonOverlappingJob + limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_conflict: :block, max_blocked: 2 + end + + class DiscardableWithMaxBlockedJob < NonOverlappingJob + limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard, max_blocked: 5 + end + class DiscardableThrottledJob < NonOverlappingJob limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_conflict: :discard end @@ -137,6 +149,90 @@ class DiscardableNonOverlappingGroupedJob2 < NonOverlappingJob end end + test "block jobs up to max_blocked, then discard further conflicts" do + assert_ready do + BoundedBlockedJob.perform_later(@result, name: "ready") + end + + assert_blocked do + BoundedBlockedJob.perform_later(@result, name: "blocked") + end + + # Cap is hit: the next two jobs should be silently discarded + assert_job_counts do + BoundedBlockedJob.perform_later(@result, name: "discarded-1") + BoundedBlockedJob.perform_later(@result, name: "discarded-2") + end + end + + test "max_blocked cap reopens after the blocked job is released" do + ready_active_job = BoundedBlockedJob.perform_later(@result, name: "ready") + BoundedBlockedJob.perform_later(@result, name: "blocked-1") + ready_job, blocked_job = SolidQueue::Job.last(2) + + # Cap full: this enqueue is discarded + assert_job_counts do + BoundedBlockedJob.perform_later(@result, name: "discarded") + end + + # Discarding the ready job signals the semaphore and promotes the blocked one. + # Net: ready_job destroyed (-1 Job), BlockedExecution -1, ReadyExecution unchanged + # (one destroyed for ready_job, one created from promotion). + assert_job_counts(blocked: -1) do + ready_job.discard + end + assert blocked_job.reload.ready? + + # Cap is now empty; a fresh enqueue should block again, not be discarded + assert_blocked do + BoundedBlockedJob.perform_later(@result, name: "newly-blocked") + end + end + + test "max_blocked enforces the cap independently per concurrency key" do + another_result = JobResult.create!(queue_name: "default") + + BoundedBlockedJob.perform_later(@result, name: "ready-a") + BoundedBlockedJob.perform_later(another_result, name: "ready-b") + + assert_job_counts(blocked: 2) do + BoundedBlockedJob.perform_later(@result, name: "blocked-a") + BoundedBlockedJob.perform_later(another_result, name: "blocked-b") + end + + # Both keys are at cap; further enqueues for either key get discarded + assert_job_counts do + BoundedBlockedJob.perform_later(@result, name: "discarded-a") + BoundedBlockedJob.perform_later(another_result, name: "discarded-b") + end + end + + test "max_blocked respects concurrency_limit > 1" do + # to: 2, max_blocked: 2 => up to 2 ready + up to 2 blocked + assert_job_counts(ready: 2) do + BoundedBlockedThrottledJob.perform_later(@result, name: "ready-1") + BoundedBlockedThrottledJob.perform_later(@result, name: "ready-2") + end + + assert_job_counts(blocked: 2) do + BoundedBlockedThrottledJob.perform_later(@result, name: "blocked-1") + BoundedBlockedThrottledJob.perform_later(@result, name: "blocked-2") + end + + assert_job_counts do + BoundedBlockedThrottledJob.perform_later(@result, name: "discarded") + end + end + + test "max_blocked is a no-op when on_conflict is :discard" do + # max_blocked is only consulted on the :block path; with :discard, every + # conflict is destroyed regardless of how many are allegedly "allowed". + assert_job_counts(ready: 1) do + DiscardableWithMaxBlockedJob.perform_later(@result, name: "ready") + DiscardableWithMaxBlockedJob.perform_later(@result, name: "would-have-blocked-if-cap-applied") + end + end + test "block jobs in the same concurrency group when concurrency limits are reached" do assert_ready do active_job = NonOverlappingGroupedJob1.perform_later(@result, name: "A")