Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions app/models/solid_queue/job/concurrency_controls.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module ConcurrencyControls
included do
has_one :blocked_execution

delegate :concurrency_limit, :concurrency_duration, to: :job_class
delegate :concurrency_limit, :concurrency_duration, :concurrency_max_blocked, to: :job_class

before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? }
end
Expand Down Expand Up @@ -59,7 +59,20 @@ def handle_concurrency_conflict
end

def block
BlockedExecution.create_or_find_by!(job_id: id)
return BlockedExecution.create_or_find_by!(job_id: id) unless concurrency_max_blocked

Job.transaction do
# Serialize concurrent enqueues for the same key via the semaphore row lock.
# The semaphore row is guaranteed to exist when we reach this point (see Semaphore::Proxy#wait).
Semaphore.lock.find_by(key: concurrency_key)

if BlockedExecution.where(concurrency_key: concurrency_key).count < concurrency_max_blocked
BlockedExecution.create_or_find_by!(job_id: id)
else
SolidQueue.instrument(:max_blocked_dropped, concurrency_key: concurrency_key, dropped_job_id: id)
destroy!
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should change this back to destroy as that's what is used on line 55.

end
end
end

def release_next_blocked_job
Expand Down
4 changes: 3 additions & 1 deletion lib/active_job/concurrency_controls.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ module ConcurrencyControls
class_attribute :concurrency_limit
class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period
class_attribute :concurrency_on_conflict, default: :block
class_attribute :concurrency_max_blocked, default: nil
end

class_methods do
def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block)
def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block, max_blocked: nil)
self.concurrency_key = key
self.concurrency_limit = to
self.concurrency_group = group
self.concurrency_duration = duration
self.concurrency_on_conflict = on_conflict.presence_in(CONCURRENCY_ON_CONFLICT_BEHAVIOUR) || :block
self.concurrency_max_blocked = max_blocked
end
end

Expand Down
3 changes: 3 additions & 0 deletions test/dummy/app/jobs/max_blocked_update_result_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class MaxBlockedUpdateResultJob < UpdateResultJob
limits_concurrency key: ->(job_result, **) { job_result }, max_blocked: 1
end
73 changes: 64 additions & 9 deletions test/integration/concurrency_controls_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,25 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
end

test "run several jobs over the same record limiting concurrency" do
incr = 0
# C is the last one to update the record
# A: 0 to 0.5
# B: 0 to 1.0
# C: 0 to 1.5
# C: 0 to 3.0 — long enough to outlast D–H even on a loaded CI machine
assert_no_difference -> { SolidQueue::BlockedExecution.count } do
("A".."C").each do |name|
ThrottledUpdateResultJob.perform_later(@result, name: name, pause: (0.5 + incr).seconds)
incr += 0.5
{ "A" => 0.5, "B" => 1.0, "C" => 3.0 }.each do |name, pause|
ThrottledUpdateResultJob.perform_later(@result, name: name, pause: pause.seconds)
end
end

sleep(0.01) # To ensure these aren't picked up before ABC
# D to H: 0.51 to 0.76 (starting after A finishes, and in order, 5 * 0.05 = 0.25)
# These would finish all before B and C
sleep(0.1) # To ensure these aren't picked up before ABC
# D to H: each 0.05s; all finish well before C's 3s pause is up
assert_difference -> { SolidQueue::BlockedExecution.count }, +5 do
("D".."H").each do |name|
ThrottledUpdateResultJob.perform_later(@result, name: name, pause: 0.05.seconds)
end
end

wait_for_jobs_to_finish_for(5.seconds)
wait_for_jobs_to_finish_for(10.seconds)
assert_no_unfinished_jobs

# C would have started in the beginning, seeing the status empty, and would finish after
Expand Down Expand Up @@ -253,6 +250,64 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
assert_stored_sequence(@result, [ "1", "3" ])
end

test "max_blocked: cap honored — blocked count stays at 1 under a storm of enqueues" do
# J1 starts running with a 2-second pause; 9 more enqueues race in behind it
MaxBlockedUpdateResultJob.perform_later(@result, name: "1", pause: 2.seconds)
sleep(0.1) # ensure J1 is claimed before the storm

assert_difference -> { SolidQueue::BlockedExecution.count }, +1 do
9.times { |i| MaxBlockedUpdateResultJob.perform_later(@result, name: i.to_s) }
end

wait_for_jobs_to_finish_for(10.seconds)
assert_no_unfinished_jobs
end

test "max_blocked: drop-newer semantics — J1 runs, J2 queued, J3 and J4 dropped" do
MaxBlockedUpdateResultJob.perform_later(@result, name: "1", pause: 2.seconds)
sleep(0.1) # ensure J1 is claimed before the storm

assert_difference -> { SolidQueue::BlockedExecution.count }, +1 do
MaxBlockedUpdateResultJob.perform_later(@result, name: "2")
MaxBlockedUpdateResultJob.perform_later(@result, name: "3")
MaxBlockedUpdateResultJob.perform_later(@result, name: "4")
end

wait_for_jobs_to_finish_for(5.seconds)
assert_no_unfinished_jobs
end

test "max_blocked: nil preserves today's unbounded block behavior" do
NonOverlappingUpdateResultJob.perform_later(@result, name: "1", pause: 1.second)
sleep(0.1)

assert_difference -> { SolidQueue::BlockedExecution.count }, +9 do
("2".."10").each { |name| NonOverlappingUpdateResultJob.perform_later(@result, name: name) }
end

wait_for_jobs_to_finish_for(10.seconds)
assert_no_unfinished_jobs
end

test "max_blocked: independent concurrency keys each get their own cap" do
another_result = JobResult.create!(queue_name: "default", status: "")

MaxBlockedUpdateResultJob.perform_later(@result, name: "A1", pause: 1.second)
MaxBlockedUpdateResultJob.perform_later(another_result, name: "B1", pause: 1.second)
sleep(0.1)

MaxBlockedUpdateResultJob.perform_later(@result, name: "A2")
MaxBlockedUpdateResultJob.perform_later(another_result, name: "B2")
# cap hit per key — these are dropped
MaxBlockedUpdateResultJob.perform_later(@result, name: "A3")
MaxBlockedUpdateResultJob.perform_later(another_result, name: "B3")

assert_equal 2, SolidQueue::BlockedExecution.count

wait_for_jobs_to_finish_for(5.seconds)
assert_no_unfinished_jobs
end

private
def assert_stored_sequence(result, sequence)
expected = sequence.sort.map { |name| "s#{name}c#{name}" }.join
Expand Down
96 changes: 96 additions & 0 deletions test/models/solid_queue/job_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ class DiscardableNonOverlappingJob < NonOverlappingJob
limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard
end

class BoundedBlockedJob < NonOverlappingJob
limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :block, max_blocked: 1
end

class BoundedBlockedThrottledJob < NonOverlappingJob
limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_conflict: :block, max_blocked: 2
end

class DiscardableWithMaxBlockedJob < NonOverlappingJob
limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard, max_blocked: 5
end

class DiscardableThrottledJob < NonOverlappingJob
limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_conflict: :discard
end
Expand Down Expand Up @@ -137,6 +149,90 @@ class DiscardableNonOverlappingGroupedJob2 < NonOverlappingJob
end
end

test "block jobs up to max_blocked, then discard further conflicts" do
assert_ready do
BoundedBlockedJob.perform_later(@result, name: "ready")
end

assert_blocked do
BoundedBlockedJob.perform_later(@result, name: "blocked")
end

# Cap is hit: the next two jobs should be silently discarded
assert_job_counts do
BoundedBlockedJob.perform_later(@result, name: "discarded-1")
BoundedBlockedJob.perform_later(@result, name: "discarded-2")
end
end

test "max_blocked cap reopens after the blocked job is released" do
ready_active_job = BoundedBlockedJob.perform_later(@result, name: "ready")
BoundedBlockedJob.perform_later(@result, name: "blocked-1")
ready_job, blocked_job = SolidQueue::Job.last(2)

# Cap full: this enqueue is discarded
assert_job_counts do
BoundedBlockedJob.perform_later(@result, name: "discarded")
end

# Discarding the ready job signals the semaphore and promotes the blocked one.
# Net: ready_job destroyed (-1 Job), BlockedExecution -1, ReadyExecution unchanged
# (one destroyed for ready_job, one created from promotion).
assert_job_counts(blocked: -1) do
ready_job.discard
end
assert blocked_job.reload.ready?

# Cap is now empty; a fresh enqueue should block again, not be discarded
assert_blocked do
BoundedBlockedJob.perform_later(@result, name: "newly-blocked")
end
end

test "max_blocked enforces the cap independently per concurrency key" do
another_result = JobResult.create!(queue_name: "default")

BoundedBlockedJob.perform_later(@result, name: "ready-a")
BoundedBlockedJob.perform_later(another_result, name: "ready-b")

assert_job_counts(blocked: 2) do
BoundedBlockedJob.perform_later(@result, name: "blocked-a")
BoundedBlockedJob.perform_later(another_result, name: "blocked-b")
end

# Both keys are at cap; further enqueues for either key get discarded
assert_job_counts do
BoundedBlockedJob.perform_later(@result, name: "discarded-a")
BoundedBlockedJob.perform_later(another_result, name: "discarded-b")
end
end

test "max_blocked respects concurrency_limit > 1" do
# to: 2, max_blocked: 2 => up to 2 ready + up to 2 blocked
assert_job_counts(ready: 2) do
BoundedBlockedThrottledJob.perform_later(@result, name: "ready-1")
BoundedBlockedThrottledJob.perform_later(@result, name: "ready-2")
end

assert_job_counts(blocked: 2) do
BoundedBlockedThrottledJob.perform_later(@result, name: "blocked-1")
BoundedBlockedThrottledJob.perform_later(@result, name: "blocked-2")
end

assert_job_counts do
BoundedBlockedThrottledJob.perform_later(@result, name: "discarded")
end
end

test "max_blocked is a no-op when on_conflict is :discard" do
# max_blocked is only consulted on the :block path; with :discard, every
# conflict is destroyed regardless of how many are allegedly "allowed".
assert_job_counts(ready: 1) do
DiscardableWithMaxBlockedJob.perform_later(@result, name: "ready")
DiscardableWithMaxBlockedJob.perform_later(@result, name: "would-have-blocked-if-cap-applied")
end
end

test "block jobs in the same concurrency group when concurrency limits are reached" do
assert_ready do
active_job = NonOverlappingGroupedJob1.perform_later(@result, name: "A")
Expand Down
Loading