Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions app/models/solid_queue/job/concurrency_controls.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module ConcurrencyControls
included do
has_one :blocked_execution

delegate :concurrency_limit, :concurrency_duration, to: :job_class
delegate :concurrency_limit, :concurrency_duration, :concurrency_max_blocked, to: :job_class

before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? }
end
Expand Down Expand Up @@ -59,7 +59,18 @@ def handle_concurrency_conflict
end

def block
BlockedExecution.create_or_find_by!(job_id: id)
return BlockedExecution.create_or_find_by!(job_id: id) unless concurrency_max_blocked

transaction do
# Lock the semaphore row to serialize concurrent enqueues racing to fill the cap.
Semaphore.lock.find_by(key: concurrency_key)

if BlockedExecution.where(concurrency_key: concurrency_key).count < concurrency_max_blocked
BlockedExecution.create_or_find_by!(job_id: id)
else
destroy
end
end
end

def release_next_blocked_job
Expand Down
4 changes: 3 additions & 1 deletion lib/active_job/concurrency_controls.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ module ConcurrencyControls
class_attribute :concurrency_limit
class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period
class_attribute :concurrency_on_conflict, default: :block
class_attribute :concurrency_max_blocked
end

class_methods do
def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block)
def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block, max_blocked: nil)
self.concurrency_key = key
self.concurrency_limit = to
self.concurrency_group = group
self.concurrency_duration = duration
self.concurrency_on_conflict = on_conflict.presence_in(CONCURRENCY_ON_CONFLICT_BEHAVIOUR) || :block
self.concurrency_max_blocked = max_blocked
end
end

Expand Down
3 changes: 3 additions & 0 deletions test/dummy/app/jobs/bounded_blocked_update_result_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class BoundedBlockedUpdateResultJob < UpdateResultJob
limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :block, max_blocked: 1
end
22 changes: 22 additions & 0 deletions test/integration/concurrency_controls_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,28 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
assert_stored_sequence(another_result, [ "2" ])
end

test "block up to max_blocked and discard the rest" do
# to: 1, max_blocked: 1 — so 1 runs, 1 blocks, the rest get discarded
job1 = BoundedBlockedUpdateResultJob.perform_later(@result, name: "1", pause: 0.5)
sleep(0.1) # ensure "1" is claimed first

job2 = BoundedBlockedUpdateResultJob.perform_later(@result, name: "2") # blocks under cap
job3 = BoundedBlockedUpdateResultJob.perform_later(@result, name: "3") # cap full, discarded
job4 = BoundedBlockedUpdateResultJob.perform_later(@result, name: "4") # cap full, discarded

wait_for_jobs_to_finish_for(5.seconds)
assert_no_unfinished_jobs

# 1 ran, 2 promoted after 1 finished and ran; 3 and 4 never ran
assert_stored_sequence(@result, [ "1", "2" ])

jobs = SolidQueue::Job.where(active_job_id: [ job1, job2, job3, job4 ].map(&:job_id))
assert_equal 2, jobs.count
assert_equal [ job1.provider_job_id, job2.provider_job_id ].sort, jobs.pluck(:id).sort
assert_nil job3.provider_job_id
assert_nil job4.provider_job_id
end

test "discard on conflict and release semaphore" do
DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 0.1)
# will be discarded
Expand Down
96 changes: 96 additions & 0 deletions test/models/solid_queue/job_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ class DiscardableNonOverlappingJob < NonOverlappingJob
limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard
end

class BoundedBlockedJob < NonOverlappingJob
limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :block, max_blocked: 1
end

class BoundedBlockedThrottledJob < NonOverlappingJob
limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_conflict: :block, max_blocked: 2
end

class DiscardableWithMaxBlockedJob < NonOverlappingJob
limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard, max_blocked: 5
end

class DiscardableThrottledJob < NonOverlappingJob
limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_conflict: :discard
end
Expand Down Expand Up @@ -137,6 +149,90 @@ class DiscardableNonOverlappingGroupedJob2 < NonOverlappingJob
end
end

test "block jobs up to max_blocked, then discard further conflicts" do
assert_ready do
BoundedBlockedJob.perform_later(@result, name: "ready")
end

assert_blocked do
BoundedBlockedJob.perform_later(@result, name: "blocked")
end

# Cap is hit: the next two jobs should be silently discarded
assert_job_counts do
BoundedBlockedJob.perform_later(@result, name: "discarded-1")
BoundedBlockedJob.perform_later(@result, name: "discarded-2")
end
end

test "max_blocked cap reopens after the blocked job is released" do
ready_active_job = BoundedBlockedJob.perform_later(@result, name: "ready")
BoundedBlockedJob.perform_later(@result, name: "blocked-1")
ready_job, blocked_job = SolidQueue::Job.last(2)

# Cap full: this enqueue is discarded
assert_job_counts do
BoundedBlockedJob.perform_later(@result, name: "discarded")
end

# Discarding the ready job signals the semaphore and promotes the blocked one.
# Net: ready_job destroyed (-1 Job), BlockedExecution -1, ReadyExecution unchanged
# (one destroyed for ready_job, one created from promotion).
assert_job_counts(blocked: -1) do
ready_job.discard
end
assert blocked_job.reload.ready?

# Cap is now empty; a fresh enqueue should block again, not be discarded
assert_blocked do
BoundedBlockedJob.perform_later(@result, name: "newly-blocked")
end
end

test "max_blocked enforces the cap independently per concurrency key" do
another_result = JobResult.create!(queue_name: "default")

BoundedBlockedJob.perform_later(@result, name: "ready-a")
BoundedBlockedJob.perform_later(another_result, name: "ready-b")

assert_job_counts(blocked: 2) do
BoundedBlockedJob.perform_later(@result, name: "blocked-a")
BoundedBlockedJob.perform_later(another_result, name: "blocked-b")
end

# Both keys are at cap; further enqueues for either key get discarded
assert_job_counts do
BoundedBlockedJob.perform_later(@result, name: "discarded-a")
BoundedBlockedJob.perform_later(another_result, name: "discarded-b")
end
end

test "max_blocked respects concurrency_limit > 1" do
# to: 2, max_blocked: 2 => up to 2 ready + up to 2 blocked
assert_job_counts(ready: 2) do
BoundedBlockedThrottledJob.perform_later(@result, name: "ready-1")
BoundedBlockedThrottledJob.perform_later(@result, name: "ready-2")
end

assert_job_counts(blocked: 2) do
BoundedBlockedThrottledJob.perform_later(@result, name: "blocked-1")
BoundedBlockedThrottledJob.perform_later(@result, name: "blocked-2")
end

assert_job_counts do
BoundedBlockedThrottledJob.perform_later(@result, name: "discarded")
end
end

test "max_blocked is a no-op when on_conflict is :discard" do
# max_blocked is only consulted on the :block path; with :discard, every
# conflict is destroyed regardless of how many are allegedly "allowed".
assert_job_counts(ready: 1) do
DiscardableWithMaxBlockedJob.perform_later(@result, name: "ready")
DiscardableWithMaxBlockedJob.perform_later(@result, name: "would-have-blocked-if-cap-applied")
end
end

test "block jobs in the same concurrency group when concurrency limits are reached" do
assert_ready do
active_job = NonOverlappingGroupedJob1.perform_later(@result, name: "A")
Expand Down
Loading