diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 30d4399e..4ffba678 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -8,7 +8,7 @@ module ConcurrencyControls included do has_one :blocked_execution - delegate :concurrency_limit, :concurrency_duration, to: :job_class + delegate :concurrency_limit, :concurrency_duration, :concurrency_max_blocked, to: :job_class before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? } end @@ -59,7 +59,20 @@ def handle_concurrency_conflict end def block - BlockedExecution.create_or_find_by!(job_id: id) + return BlockedExecution.create_or_find_by!(job_id: id) unless concurrency_max_blocked + + Job.transaction do + # Serialize concurrent enqueues for the same key via the semaphore row lock. + # The semaphore row is guaranteed to exist when we reach this point (see Semaphore::Proxy#wait). + Semaphore.lock.find_by(key: concurrency_key) + + if BlockedExecution.where(concurrency_key: concurrency_key).count < concurrency_max_blocked + BlockedExecution.create_or_find_by!(job_id: id) + else + SolidQueue.instrument(:max_blocked_dropped, concurrency_key: concurrency_key, dropped_job_id: id) + destroy! + end + end end def release_next_blocked_job diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb index 47b0f5ee..c63fe96f 100644 --- a/lib/active_job/concurrency_controls.rb +++ b/lib/active_job/concurrency_controls.rb @@ -14,15 +14,17 @@ module ConcurrencyControls class_attribute :concurrency_limit class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period class_attribute :concurrency_on_conflict, default: :block + class_attribute :concurrency_max_blocked, default: nil end class_methods do - def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block) + def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block, max_blocked: nil) self.concurrency_key = key self.concurrency_limit = to self.concurrency_group = group self.concurrency_duration = duration self.concurrency_on_conflict = on_conflict.presence_in(CONCURRENCY_ON_CONFLICT_BEHAVIOUR) || :block + self.concurrency_max_blocked = max_blocked end end diff --git a/test/dummy/app/jobs/max_blocked_update_result_job.rb b/test/dummy/app/jobs/max_blocked_update_result_job.rb new file mode 100644 index 00000000..e2872b22 --- /dev/null +++ b/test/dummy/app/jobs/max_blocked_update_result_job.rb @@ -0,0 +1,3 @@ +class MaxBlockedUpdateResultJob < UpdateResultJob + limits_concurrency key: ->(job_result, **) { job_result }, max_blocked: 1 +end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index a12c48e4..ae2aba2c 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -55,28 +55,25 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end test "run several jobs over the same record limiting concurrency" do - incr = 0 # C is the last one to update the record # A: 0 to 0.5 # B: 0 to 1.0 - # C: 0 to 1.5 + # C: 0 to 3.0 — long enough to outlast D–H even on a loaded CI machine assert_no_difference -> { SolidQueue::BlockedExecution.count } do - ("A".."C").each do |name| - ThrottledUpdateResultJob.perform_later(@result, name: name, pause: (0.5 + incr).seconds) - incr += 0.5 + { "A" => 0.5, "B" => 1.0, "C" => 3.0 }.each do |name, pause| + ThrottledUpdateResultJob.perform_later(@result, name: name, pause: pause.seconds) end end - sleep(0.01) # To ensure these aren't picked up before ABC - # D to H: 0.51 to 0.76 (starting after A finishes, and in order, 5 * 0.05 = 0.25) - # These would finish all before B and C + sleep(0.1) # To ensure these aren't picked up before ABC + # D to H: each 0.05s; all finish well before C's 3s pause is up assert_difference -> { SolidQueue::BlockedExecution.count }, +5 do ("D".."H").each do |name| ThrottledUpdateResultJob.perform_later(@result, name: name, pause: 0.05.seconds) end end - wait_for_jobs_to_finish_for(5.seconds) + wait_for_jobs_to_finish_for(10.seconds) assert_no_unfinished_jobs # C would have started in the beginning, seeing the status empty, and would finish after @@ -253,6 +250,64 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert_stored_sequence(@result, [ "1", "3" ]) end + test "max_blocked: cap honored — blocked count stays at 1 under a storm of enqueues" do + # J1 starts running with a 2-second pause; 9 more enqueues race in behind it + MaxBlockedUpdateResultJob.perform_later(@result, name: "1", pause: 2.seconds) + sleep(0.1) # ensure J1 is claimed before the storm + + assert_difference -> { SolidQueue::BlockedExecution.count }, +1 do + 9.times { |i| MaxBlockedUpdateResultJob.perform_later(@result, name: i.to_s) } + end + + wait_for_jobs_to_finish_for(10.seconds) + assert_no_unfinished_jobs + end + + test "max_blocked: drop-newer semantics — J1 runs, J2 queued, J3 and J4 dropped" do + MaxBlockedUpdateResultJob.perform_later(@result, name: "1", pause: 2.seconds) + sleep(0.1) # ensure J1 is claimed before the storm + + assert_difference -> { SolidQueue::BlockedExecution.count }, +1 do + MaxBlockedUpdateResultJob.perform_later(@result, name: "2") + MaxBlockedUpdateResultJob.perform_later(@result, name: "3") + MaxBlockedUpdateResultJob.perform_later(@result, name: "4") + end + + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs + end + + test "max_blocked: nil preserves today's unbounded block behavior" do + NonOverlappingUpdateResultJob.perform_later(@result, name: "1", pause: 1.second) + sleep(0.1) + + assert_difference -> { SolidQueue::BlockedExecution.count }, +9 do + ("2".."10").each { |name| NonOverlappingUpdateResultJob.perform_later(@result, name: name) } + end + + wait_for_jobs_to_finish_for(10.seconds) + assert_no_unfinished_jobs + end + + test "max_blocked: independent concurrency keys each get their own cap" do + another_result = JobResult.create!(queue_name: "default", status: "") + + MaxBlockedUpdateResultJob.perform_later(@result, name: "A1", pause: 1.second) + MaxBlockedUpdateResultJob.perform_later(another_result, name: "B1", pause: 1.second) + sleep(0.1) + + MaxBlockedUpdateResultJob.perform_later(@result, name: "A2") + MaxBlockedUpdateResultJob.perform_later(another_result, name: "B2") + # cap hit per key — these are dropped + MaxBlockedUpdateResultJob.perform_later(@result, name: "A3") + MaxBlockedUpdateResultJob.perform_later(another_result, name: "B3") + + assert_equal 2, SolidQueue::BlockedExecution.count + + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs + end + private def assert_stored_sequence(result, sequence) expected = sequence.sort.map { |name| "s#{name}c#{name}" }.join diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index 47702bd1..7c3bd6c0 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -14,6 +14,18 @@ class DiscardableNonOverlappingJob < NonOverlappingJob limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard end + class BoundedBlockedJob < NonOverlappingJob + limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :block, max_blocked: 1 + end + + class BoundedBlockedThrottledJob < NonOverlappingJob + limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_conflict: :block, max_blocked: 2 + end + + class DiscardableWithMaxBlockedJob < NonOverlappingJob + limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard, max_blocked: 5 + end + class DiscardableThrottledJob < NonOverlappingJob limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_conflict: :discard end @@ -137,6 +149,90 @@ class DiscardableNonOverlappingGroupedJob2 < NonOverlappingJob end end + test "block jobs up to max_blocked, then discard further conflicts" do + assert_ready do + BoundedBlockedJob.perform_later(@result, name: "ready") + end + + assert_blocked do + BoundedBlockedJob.perform_later(@result, name: "blocked") + end + + # Cap is hit: the next two jobs should be silently discarded + assert_job_counts do + BoundedBlockedJob.perform_later(@result, name: "discarded-1") + BoundedBlockedJob.perform_later(@result, name: "discarded-2") + end + end + + test "max_blocked cap reopens after the blocked job is released" do + ready_active_job = BoundedBlockedJob.perform_later(@result, name: "ready") + BoundedBlockedJob.perform_later(@result, name: "blocked-1") + ready_job, blocked_job = SolidQueue::Job.last(2) + + # Cap full: this enqueue is discarded + assert_job_counts do + BoundedBlockedJob.perform_later(@result, name: "discarded") + end + + # Discarding the ready job signals the semaphore and promotes the blocked one. + # Net: ready_job destroyed (-1 Job), BlockedExecution -1, ReadyExecution unchanged + # (one destroyed for ready_job, one created from promotion). + assert_job_counts(blocked: -1) do + ready_job.discard + end + assert blocked_job.reload.ready? + + # Cap is now empty; a fresh enqueue should block again, not be discarded + assert_blocked do + BoundedBlockedJob.perform_later(@result, name: "newly-blocked") + end + end + + test "max_blocked enforces the cap independently per concurrency key" do + another_result = JobResult.create!(queue_name: "default") + + BoundedBlockedJob.perform_later(@result, name: "ready-a") + BoundedBlockedJob.perform_later(another_result, name: "ready-b") + + assert_job_counts(blocked: 2) do + BoundedBlockedJob.perform_later(@result, name: "blocked-a") + BoundedBlockedJob.perform_later(another_result, name: "blocked-b") + end + + # Both keys are at cap; further enqueues for either key get discarded + assert_job_counts do + BoundedBlockedJob.perform_later(@result, name: "discarded-a") + BoundedBlockedJob.perform_later(another_result, name: "discarded-b") + end + end + + test "max_blocked respects concurrency_limit > 1" do + # to: 2, max_blocked: 2 => up to 2 ready + up to 2 blocked + assert_job_counts(ready: 2) do + BoundedBlockedThrottledJob.perform_later(@result, name: "ready-1") + BoundedBlockedThrottledJob.perform_later(@result, name: "ready-2") + end + + assert_job_counts(blocked: 2) do + BoundedBlockedThrottledJob.perform_later(@result, name: "blocked-1") + BoundedBlockedThrottledJob.perform_later(@result, name: "blocked-2") + end + + assert_job_counts do + BoundedBlockedThrottledJob.perform_later(@result, name: "discarded") + end + end + + test "max_blocked is a no-op when on_conflict is :discard" do + # max_blocked is only consulted on the :block path; with :discard, every + # conflict is destroyed regardless of how many are allegedly "allowed". + assert_job_counts(ready: 1) do + DiscardableWithMaxBlockedJob.perform_later(@result, name: "ready") + DiscardableWithMaxBlockedJob.perform_later(@result, name: "would-have-blocked-if-cap-applied") + end + end + test "block jobs in the same concurrency group when concurrency limits are reached" do assert_ready do active_job = NonOverlappingGroupedJob1.perform_later(@result, name: "A")