46 commits
89b7286
Batch job POC
jpcamara Feb 2, 2024
d965ff9
Use ActiveSupport::IsolatedExecutionState to honor user isolation lev…
jpcamara Feb 5, 2024
5ea8b34
Ability to retrieve batch from a job
jpcamara Feb 5, 2024
5ebdfb9
Allow batch jobs to be instances
jpcamara Feb 8, 2024
fc251ae
Use text so the jobs store properly on mysql
jpcamara Mar 23, 2024
9503cad
Handle on_failure and on_success
jpcamara Sep 24, 2024
63e6915
Allow enqueueing into a batch instance
jpcamara Sep 24, 2024
3767273
Block enqueueing if the batch is finished
jpcamara Sep 24, 2024
1df6881
Migration to allow nesting batches
jpcamara Sep 24, 2024
8393c8c
Expanded batch readme
jpcamara Sep 26, 2024
8fa7c26
Force an initial batch check
jpcamara Sep 26, 2024
40db7c8
Initial batch lifecycle tests
jpcamara Sep 26, 2024
4df413f
Add job batches to queue_schema.rb as well
jpcamara Nov 22, 2024
5f46a34
Refactor internals and api namespace of batches
jpcamara Aug 29, 2025
619cac2
Move away from a batch_processed_at to batch_execution model
jpcamara Sep 5, 2025
f9d8729
Reduce complexity of batches implementation
jpcamara Sep 8, 2025
ddcb105
Test updates
jpcamara Sep 8, 2025
c1c8bb8
Create batch executions alongside ready and scheduled executions
jpcamara Sep 9, 2025
da28aaf
Leftover from previous implementation
jpcamara Sep 10, 2025
a515003
Move batch completion checks to job
jpcamara Sep 11, 2025
ac6b109
Support rails versions that don't have after_all_transactions_commit
jpcamara Sep 11, 2025
bff9d7e
Remove support for nested batches for now
jpcamara Sep 13, 2025
6a6c39f
Fix starting batch in rails 7.1
jpcamara Sep 13, 2025
c68a72c
Helper status method
jpcamara Sep 15, 2025
4c34c35
Remove parent/child batch relationship, which simplifies the logic
jpcamara Sep 15, 2025
2e7e08d
Performance improvements
jpcamara Sep 16, 2025
3e62618
We no longer need to keep jobs
jpcamara Sep 16, 2025
3b8895a
Removing pending_jobs column
jpcamara Sep 16, 2025
7794a84
Update doc to reflect current feature state
jpcamara Sep 16, 2025
71a8d06
We always save the batch first now, so we don't need to upsert
jpcamara Sep 16, 2025
1eff4c6
Rubocop
jpcamara Sep 16, 2025
07ef6c1
Accidental claude.md
jpcamara Sep 16, 2025
e09f6af
Allow omitting a block, which will just enqueue an empty job
jpcamara Sep 17, 2025
2bd2854
Switch batch_id to active_job_batch_id
jpcamara Oct 11, 2025
9479cb3
Make it so metadata is more ergonomic to include
jpcamara Oct 11, 2025
dca6ac4
Bad query field
jpcamara Oct 11, 2025
2008134
Update metadata interface
jpcamara Oct 11, 2025
ff3c70e
Give more breathing room for CI test runs
jpcamara Oct 11, 2025
e8f6aff
Simplify code for how callbacks are serialized/deserialized
jpcamara Jan 17, 2026
1393c00
.presence || nil is redundant
jpcamara Jan 17, 2026
3f54ff4
Move class << self to match rest of codebase
jpcamara Jan 17, 2026
d8304df
Add description to batches
jpcamara Jan 21, 2026
2f264ee
Switch callback job batch interface
jpcamara Jan 24, 2026
c2cf4a5
Bump rack-session to 2.1.2 (CVE-2026-39324)
treiff May 5, 2026
aca950b
Cap the number of blocked executions per concurrency key
jonhinson May 21, 2026
85eefc6
Merge pull request #5 from starburstlabs/feature/concurrency_max_blocked
jonhinson May 22, 2026
4 changes: 2 additions & 2 deletions Gemfile.lock
@@ -113,8 +113,8 @@ GEM
nio4r (~> 2.0)
raabro (1.4.0)
racc (1.8.1)
- rack (3.2.3)
- rack-session (2.1.1)
+ rack (3.2.6)
+ rack-session (2.1.2)
base64 (>= 0.1.0)
rack (>= 3.0.0)
rack-test (2.2.0)
61 changes: 61 additions & 0 deletions README.md
@@ -29,6 +29,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite,
- [Performance considerations](#performance-considerations)
- [Failed jobs and retries](#failed-jobs-and-retries)
- [Error reporting on jobs](#error-reporting-on-jobs)
- [Batch jobs](#batch-jobs)
- [Puma plugin](#puma-plugin)
- [Jobs and transactional integrity](#jobs-and-transactional-integrity)
- [Recurring tasks](#recurring-tasks)
@@ -616,6 +617,66 @@ class ApplicationMailer < ActionMailer::Base
Rails.error.report(exception)
raise exception
end
```

## Batch jobs

Solid Queue supports batching jobs. A batch lets you track the progress of a set of jobs
and optionally trigger callbacks based on their outcome. It supports the following:

- Relating jobs to a batch, to track their status
- Three callbacks, each fired at most once per batch:
  - `on_finish`: Fired when all jobs have finished, including retries. Fires even when some jobs have failed.
  - `on_success`: Fired when all jobs have succeeded, including retries. Will not fire if any jobs have failed, but will fire if jobs have been discarded using `discard_on`.
  - `on_failure`: Fired when all jobs have finished, including retries. Will only fire if one or more jobs have failed.
- Enqueueing more jobs into a batch from a job that is already part of it, using `batch#enqueue`
- Attaching arbitrary metadata to a batch

```rb
class SleepyJob < ApplicationJob
  def perform(seconds_to_sleep)
    Rails.logger.info "Feeling #{seconds_to_sleep} seconds sleepy..."
    sleep seconds_to_sleep
  end
end

class BatchFinishJob < ApplicationJob
  def perform(batch) # batch is always the default first argument
    Rails.logger.info "Good job finishing all jobs"
  end
end

class BatchSuccessJob < ApplicationJob
  def perform(batch) # batch is always the default first argument
    Rails.logger.info "Good job finishing all jobs, and all of them worked!"
  end
end

class BatchFailureJob < ApplicationJob
  def perform(batch) # batch is always the default first argument
    Rails.logger.info "At least one job failed, sorry!"
  end
end

SolidQueue::Batch.enqueue(
  on_finish: BatchFinishJob,
  on_success: BatchSuccessJob,
  on_failure: BatchFailureJob,
  user_id: 123 # any other keyword arguments are stored as batch metadata
) do
  5.times.map { |i| SleepyJob.perform_later(i) }
end
```
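
The callback rules listed above can be summarized as a small decision function. This is only a plain-Ruby sketch to make the rules concrete; `callbacks_to_fire` is a hypothetical helper and not part of Solid Queue's API. It assumes the batch has already finished and that discarded jobs are not counted as failed.

```ruby
# Hypothetical helper, not part of Solid Queue: given how many jobs in a
# finished batch ended up failed, list the callbacks that would fire.
def callbacks_to_fire(failed_jobs:)
  outcome = failed_jobs.positive? ? :on_failure : :on_success
  [outcome, :on_finish] # on_finish fires regardless of the outcome
end

p callbacks_to_fire(failed_jobs: 0) # on_success, then on_finish
p callbacks_to_fire(failed_jobs: 2) # on_failure, then on_finish
```

`on_success` and `on_failure` are mutually exclusive, while `on_finish` accompanies either of them.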

### Batch options

If a batch is empty (no block is given, or the block enqueues no jobs), a `SolidQueue::Batch::EmptyJob` is enqueued so that the batch can still complete.

By default, this job runs on the `default` queue. You can specify an alternative queue for it in an initializer:

```rb
Rails.application.config.after_initialize do # or to_prepare
  SolidQueue::Batch::EmptyJob.queue_as "my_batch_queue"
end
```

12 changes: 12 additions & 0 deletions app/jobs/solid_queue/batch/empty_job.rb
@@ -0,0 +1,12 @@
# frozen_string_literal: true

module SolidQueue
  class Batch
    class EmptyJob < (defined?(ApplicationJob) ? ApplicationJob : ActiveJob::Base)
      def perform
        # This job does nothing - it just exists to trigger batch completion
        # The batch completion will be handled by the normal job_finished! flow
      end
    end
  end
end
145 changes: 145 additions & 0 deletions app/models/solid_queue/batch.rb
@@ -0,0 +1,145 @@
# frozen_string_literal: true

module SolidQueue
  class Batch < Record
    class AlreadyFinished < StandardError; end

    include Trackable, Clearable

    has_many :jobs
    has_many :batch_executions, class_name: "SolidQueue::BatchExecution", dependent: :destroy

    serialize :metadata, coder: JSON
    %w[ finish success failure ].each do |callback_type|
      serialize "on_#{callback_type}", coder: JSON

      define_method("on_#{callback_type}=") do |callback|
        super serialize_callback(callback)
      end
    end

    after_initialize :set_active_job_batch_id
    after_commit :start_batch, on: :create, unless: -> { ActiveRecord.respond_to?(:after_all_transactions_commit) }

    class << self
      def enqueue(description: nil, on_success: nil, on_failure: nil, on_finish: nil, **metadata, &block)
        new.tap do |batch|
          batch.assign_attributes(
            description: description,
            on_success: on_success,
            on_failure: on_failure,
            on_finish: on_finish,
            metadata: metadata
          )

          batch.enqueue(&block)
        end
      end

      def current_batch_id
        ActiveSupport::IsolatedExecutionState[:current_batch_id]
      end

      def wrap_in_batch_context(batch_id)
        previous_batch_id = current_batch_id.presence
        ActiveSupport::IsolatedExecutionState[:current_batch_id] = batch_id
        yield
      ensure
        ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id
      end
    end

    def enqueue(&block)
      raise AlreadyFinished, "You cannot enqueue a batch that is already finished" if finished?

      transaction do
        save! if new_record?

        Batch.wrap_in_batch_context(id) do
          block&.call(self)
        end

        if ActiveRecord.respond_to?(:after_all_transactions_commit)
          ActiveRecord.after_all_transactions_commit do
            start_batch
          end
        end
      end
    end

    def metadata
      (super || {}).with_indifferent_access
    end

    def check_completion
      return if finished? || !enqueued?
      return if batch_executions.any?
      rows = Batch
        .where(id: id)
        .unfinished
        .empty_executions
        .update_all(finished_at: Time.current)

      return if rows.zero?

      with_lock do
        failed = jobs.joins(:failed_execution).count
        finished_attributes = {}
        if failed > 0
          finished_attributes[:failed_at] = Time.current
          finished_attributes[:failed_jobs] = failed
        end
        finished_attributes[:completed_jobs] = total_jobs - failed

        update!(finished_attributes)
        enqueue_callback_jobs
      end
    end

    private

      def set_active_job_batch_id
        self.active_job_batch_id ||= SecureRandom.uuid
      end

      def as_active_job(active_job_klass)
        active_job_klass.is_a?(ActiveJob::Base) ? active_job_klass : active_job_klass.new
      end

      def serialize_callback(value)
        if value.present?
          active_job = value.is_a?(ActiveJob::Base) ? value : value.new
          # We can pick up batch ids from context, but callbacks should never be considered a part of the batch
          active_job.batch_id = nil
          active_job.serialize
        end
      end

      def enqueue_callback_job(callback_name)
        active_job = ActiveJob::Base.deserialize(send(callback_name))
        active_job.callback_batch_id = id
        active_job.enqueue
      end

      def enqueue_callback_jobs
        if failed_at?
          enqueue_callback_job(:on_failure) if on_failure.present?
        else
          enqueue_callback_job(:on_success) if on_success.present?
        end

        enqueue_callback_job(:on_finish) if on_finish.present?
      end

      def enqueue_empty_job
        Batch.wrap_in_batch_context(id) do
          EmptyJob.perform_later
        end
      end

      def start_batch
        enqueue_empty_job if reload.total_jobs == 0
        update!(enqueued_at: Time.current)
      end
  end
end
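
The save-and-restore pattern used by `wrap_in_batch_context` can be tried outside Rails. The sketch below is an illustration under stated assumptions, not the PR's code: `Thread.current` stands in for `ActiveSupport::IsolatedExecutionState`, and `BatchContext` is a hypothetical module name.

```ruby
# Sketch only: Thread.current stands in for ActiveSupport::IsolatedExecutionState
# so the example runs without Rails. BatchContext is a hypothetical name.
module BatchContext
  def self.current_batch_id
    Thread.current[:current_batch_id]
  end

  def self.wrap(batch_id)
    previous = current_batch_id
    Thread.current[:current_batch_id] = batch_id
    yield
  ensure
    # Restore the outer value even if the block raises.
    Thread.current[:current_batch_id] = previous
  end
end

seen = []
BatchContext.wrap(1) do
  seen << BatchContext.current_batch_id
  BatchContext.wrap(2) { seen << BatchContext.current_batch_id }
  seen << BatchContext.current_batch_id
end
seen << BatchContext.current_batch_id
p seen # => [1, 2, 1, nil]
```

Because the previous value is restored in `ensure`, an inner context never leaks into the outer one, and no batch id remains set once the outermost block returns.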
23 changes: 23 additions & 0 deletions app/models/solid_queue/batch/clearable.rb
@@ -0,0 +1,23 @@
# frozen_string_literal: true

module SolidQueue
  class Batch
    module Clearable
      extend ActiveSupport::Concern

      included do
        scope :clearable, ->(finished_before: SolidQueue.clear_finished_jobs_after.ago) { where.not(finished_at: nil).where(finished_at: ...finished_before).where(failed_at: nil) }
      end

      class_methods do
        def clear_finished_in_batches(batch_size: 500, finished_before: SolidQueue.clear_finished_jobs_after.ago, sleep_between_batches: 0)
          loop do
            records_deleted = clearable(finished_before: finished_before).limit(batch_size).delete_all
            sleep(sleep_between_batches) if sleep_between_batches > 0
            break if records_deleted == 0
          end
        end
      end
    end
  end
end
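
The loop in `clear_finished_in_batches` keeps deleting until a pass removes nothing. A minimal in-memory sketch of that control flow follows; an `Array` stands in for the database table, and `clear_in_batches` is a hypothetical name used only for this illustration.

```ruby
# Sketch only: an Array stands in for the table, and Array#shift stands in for
# `limit(batch_size).delete_all`. Returns the number of passes made.
def clear_in_batches(records, batch_size:)
  passes = 0
  loop do
    deleted = records.shift(batch_size).size
    passes += 1
    break if deleted.zero? # stop after the first pass that deletes nothing
  end
  passes
end

records = (1..1200).to_a
p clear_in_batches(records, batch_size: 500) # => 4
```

With 1,200 records and a batch size of 500, the passes delete 500, 500, 200, and 0 records, so the loop always runs one final empty pass before exiting.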
68 changes: 68 additions & 0 deletions app/models/solid_queue/batch/trackable.rb
@@ -0,0 +1,68 @@
# frozen_string_literal: true

module SolidQueue
  class Batch
    module Trackable
      extend ActiveSupport::Concern

      included do
        scope :finished, -> { where.not(finished_at: nil) }
        scope :succeeded, -> { finished.where(failed_at: nil) }
        scope :unfinished, -> { where(finished_at: nil) }
        scope :failed, -> { where.not(failed_at: nil) }
        scope :empty_executions, -> {
          where(<<~SQL)
            NOT EXISTS (
              SELECT 1 FROM solid_queue_batch_executions
              WHERE solid_queue_batch_executions.batch_id = solid_queue_batches.id
              LIMIT 1
            )
          SQL
        }
      end

      def status
        if finished?
          failed? ? "failed" : "completed"
        elsif enqueued?
          "enqueued"
        else
          "pending"
        end
      end

      def failed?
        failed_at.present?
      end

      def succeeded?
        finished? && !failed?
      end

      def finished?
        finished_at.present?
      end

      def enqueued?
        enqueued_at.present?
      end

      def completed_jobs
        finished? ? self[:completed_jobs] : total_jobs - batch_executions.count
      end

      def failed_jobs
        finished? ? self[:failed_jobs] : jobs.joins(:failed_execution).count
      end

      def pending_jobs
        finished? ? 0 : batch_executions.count
      end

      def progress_percentage
        return 0 if total_jobs == 0
        ((completed_jobs + failed_jobs) * 100.0 / total_jobs).round(2)
      end
    end
  end
end
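
The `status` helper derives a label purely from three timestamps. The following sketch reproduces that mapping with a plain `Struct` so it can be run without a database; `BatchSnapshot` is a hypothetical stand-in for a batch record, not a class from this PR.

```ruby
# Sketch only: BatchSnapshot is a hypothetical stand-in for a batch record,
# holding just the three timestamps that `status` inspects.
BatchSnapshot = Struct.new(:enqueued_at, :finished_at, :failed_at, keyword_init: true) do
  def status
    if finished_at
      failed_at ? "failed" : "completed"
    elsif enqueued_at
      "enqueued"
    else
      "pending"
    end
  end
end

now = Time.now
puts BatchSnapshot.new.status                                                 # pending
puts BatchSnapshot.new(enqueued_at: now).status                               # enqueued
puts BatchSnapshot.new(enqueued_at: now, finished_at: now).status             # completed
puts BatchSnapshot.new(enqueued_at: now, finished_at: now, failed_at: now).status # failed
```

Note that `failed_at` only affects the label once `finished_at` is set, which matches the order of the checks in `status`.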
32 changes: 32 additions & 0 deletions app/models/solid_queue/batch_execution.rb
@@ -0,0 +1,32 @@
# frozen_string_literal: true

module SolidQueue
  class BatchExecution < Record
    belongs_to :job, optional: true
    belongs_to :batch

    after_commit :check_completion, on: :destroy

    private
      def check_completion
        batch = Batch.find_by(id: batch_id)
        batch.check_completion if batch.present?
      end

    class << self
      def create_all_from_jobs(jobs)
        batch_jobs = jobs.select { |job| job.batch_id.present? }
        return if batch_jobs.empty?

        batch_jobs.group_by(&:batch_id).each do |batch_id, jobs|
          BatchExecution.insert_all!(jobs.map { |job|
            { batch_id:, job_id: job.respond_to?(:provider_job_id) ? job.provider_job_id : job.id }
          })

          total = jobs.size
          SolidQueue::Batch.where(id: batch_id).update_all([ "total_jobs = total_jobs + ?", total ])
        end
      end
    end
  end
end
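
`create_all_from_jobs` groups the incoming jobs by batch and bumps each batch's `total_jobs` by the size of its group. The grouping step on its own can be sketched in plain Ruby; `JobStub` and `total_job_increments` are hypothetical names for this illustration, and no database writes are involved.

```ruby
# Sketch only: JobStub is a hypothetical stand-in for an enqueued job.
JobStub = Struct.new(:id, :batch_id)

# Returns a hash of batch_id => number of jobs to add to that batch's total.
# Jobs without a batch_id are ignored.
def total_job_increments(jobs)
  jobs.reject { |job| job.batch_id.nil? }
      .group_by(&:batch_id)
      .transform_values(&:size)
end

jobs = [
  JobStub.new(1, 10),
  JobStub.new(2, 10),
  JobStub.new(3, nil), # not part of any batch
  JobStub.new(4, 11)
]

# Batch 10 gains two jobs and batch 11 gains one.
p total_job_increments(jobs)
```

Grouping first means each batch needs a single counter update, however many of its jobs are enqueued together.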
23 changes: 23 additions & 0 deletions app/models/solid_queue/execution/batchable.rb
@@ -0,0 +1,23 @@
# frozen_string_literal: true

module SolidQueue
  class Execution
    module Batchable
      extend ActiveSupport::Concern

      included do
        after_create :update_batch_progress, if: -> { job.batch_id? }
      end

      private
        def update_batch_progress
          if is_a?(FailedExecution)
            # FailedExecutions are only created when the job is done retrying
            job.batch_execution&.destroy!
          end
        rescue => e
          Rails.logger.error "[SolidQueue] Failed to notify batch #{job.batch_id} about job #{job.id} failure: #{e.message}"
        end
    end
  end
end