Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,67 @@ All notable changes to the Backfill project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project will adhere to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) after reaching version 1.0.0.

## [2.0.0] — Deprecation cleanup

Removes the API surface that was marked `#[deprecated(since = "1.2.0")]` in
the previous release. No behavioural changes — every removed item was
already non-functional or duplicative; the deprecation warnings in 1.2.0
were the upgrade signal.

### Removed (BREAKING)

**`RetryPolicy` math methods** — graphile_worker schedules retries via a
fixed `exp(min(attempts, 10))`-second SQL formula, so these never reached
the worker:

- `RetryPolicy::new(max, initial, max_delay, multiplier)` — multi-arg
constructor. Replace with `RetryPolicy { max_attempts: n,
..Default::default() }` or one of the presets.
- `RetryPolicy::with_jitter(f64)` — replace with: drop the call (had no
runtime effect).
- `RetryPolicy::calculate_delay(attempt)` — replace with: drop the call.
- `RetryPolicy::calculate_retry_time(attempt, base)` — replace with: drop.
- `JobSpec::calculate_retry_time(attempt, failed_at)` — replace with: drop.

The `RetryPolicy` struct itself is preserved with all five fields. Only
`max_attempts` is honored; the other fields document themselves as
not-honored. Keeping the shape leaves room for upstream graphile_worker to
expose per-job timing config without another API break. Presets (`fast`,
`aggressive`, `conservative`) and the JobSpec retry builders
(`with_fast_retries` / `with_aggressive_retries` / `with_conservative_retries`)
are unchanged — they're cheap convenience for setting `max_attempts`.

**`QueueConfig` multi-queue API** — graphile_worker doesn't expose
per-worker queue filtering, so the multi-config shape was always a lie:

- `pub struct QueueConfig` — removed entirely.
- `QueueConfig::default_queue` / `named_queue` / `priority_queue` — gone
with the struct.
- `WorkerConfig::with_queues(Vec<QueueConfig>)` — replace with
`WorkerConfig::with_concurrency(n)`.
- `WorkerConfig.queue_configs: Vec<QueueConfig>` field — replaced with
`WorkerConfig.concurrency: usize`. Set it via `with_concurrency` or
struct-literal init. To run multiple specialized workers, spawn multiple
`WorkerRunner` instances yourself.
- `WorkerOptionsBuilder.queue_name` field and `with_queue_name` method —
removed (the value was never propagated anywhere it would have effect).
- The internal "Queue name configuration is not supported" WARN log —
removed (now unreachable).

**Migration for the `named_queue` use case**: per-job queue routing
remains available at enqueue time via `Queue::serial(name)` and
`Queue::serial_for(entity, id)`. That was always the right tool; the
`QueueConfig::named_queue` constructor was a misleading second path that
silently delivered serial-by-default behaviour to users who just wanted
a label.

### Tests

- 108 (was 113 in 1.2.0). Five removed: three for the now-deleted
`QueueConfig` constructors, one for the "multi-queue first-config
wins" behaviour that no longer exists, one in-source unit test for
the deleted `RetryPolicy` math.

## [1.2.0] — Production-readiness audit

A focused bug-fix release driven by a top-to-bottom production-readiness
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "backfill"
version = "1.2.0"
version = "2.0.0"
edition = "2024"
description = "A boringly-named priority work queue system for doing async tasks."
categories = ["concurrency", "data-structures"]
Expand Down
20 changes: 0 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,26 +479,6 @@ impl JobSpec {
pub fn effective_retry_policy(&self) -> RetryPolicy {
self.retry_policy.clone().unwrap_or_default()
}

/// Calculate what the next retry time *would* be under this spec's
/// policy.
///
/// **Not used at runtime.** graphile_worker schedules retries via a
/// fixed SQL formula. This method is preserved as a utility but has no
/// effect on actual job behaviour.
#[deprecated(
since = "1.2.0",
note = "graphile_worker computes retry timing in SQL and ignores this method. Returns a value but has no runtime effect."
)]
pub fn calculate_retry_time(&self, attempt: i32, failed_at: DateTime<Utc>) -> Option<DateTime<Utc>> {
let policy = self.effective_retry_policy();
if policy.should_retry(attempt) {
#[allow(deprecated)]
Some(policy.calculate_retry_time(attempt, failed_at))
} else {
None // No more retries
}
}
}

impl From<JobSpec> for GraphileJobSpec {
Expand Down
221 changes: 34 additions & 187 deletions src/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,22 @@
//!
//! **The only `RetryPolicy` field that actually affects job behaviour is
//! `max_attempts`**, which is forwarded to graphile_worker's `JobSpec`.
//! Methods that compute delays (`calculate_delay`, `calculate_retry_time`,
//! `with_jitter`) are deprecated — they were dead code masquerading as
//! configuration. The `fast()`, `aggressive()`, and `conservative()` presets
//! are kept for compatibility but in practice differ only in attempt count.
//! The other fields are kept on the struct for source-compatibility and
//! forward-compatibility — if upstream graphile_worker ever exposes per-job
//! backoff config, the shape is already there.
//!
//! Per-job backoff customisation requires upstream support in
//! `graphile_worker` and is not currently available.
//! The `fast()`, `aggressive()`, and `conservative()` presets are kept for
//! convenience but in practice differ only in attempt count (3 / 12 / 5
//! respectively).

use std::time::Duration;

/// Configuration for retry behaviour.
///
/// **Only `max_attempts` affects runtime behaviour.** The other fields are
/// retained for source-compatibility but graphile_worker computes retry
/// retained on the struct for source-compatibility (and so a future
/// graphile_worker upstream that supports per-job backoff config can
/// activate them without an API break) but graphile_worker computes retry
/// timing from a fixed SQL formula (`exp(min(attempts, 10))` seconds) — see
/// the module-level docs for details.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -64,104 +66,12 @@ impl Default for RetryPolicy {
}

impl RetryPolicy {
/// Create a new RetryPolicy with custom settings.
///
/// **Note:** Only `max_attempts` affects runtime behaviour. The other
/// arguments are stored on the returned policy but never reach
/// graphile_worker. See the module-level docs.
#[deprecated(
since = "1.2.0",
note = "graphile_worker honors only max_attempts; the timing arguments are dead config. \
Construct directly with RetryPolicy { max_attempts: n, ..Default::default() } \
or use a preset like RetryPolicy::fast()."
)]
pub fn new(max_attempts: i32, initial_delay: Duration, max_delay: Duration, backoff_multiplier: f64) -> Self {
Self {
max_attempts,
initial_delay,
max_delay,
backoff_multiplier,
jitter_factor: 0.1,
}
}

/// Configure jitter on this policy. **Not applied at runtime.**
#[deprecated(
since = "1.2.0",
note = "jitter_factor is not honored by graphile_worker; this method has no runtime effect."
)]
pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
self.jitter_factor = jitter_factor.clamp(0.0, 1.0);
self
}

/// Calculate what the delay *would* be for a given attempt number under
/// this policy.
///
/// **Not used at runtime.** graphile_worker schedules retries via a
/// fixed SQL formula and ignores this calculation. Retained as a
/// utility for tests and future use.
#[deprecated(
since = "1.2.0",
note = "graphile_worker computes retry timing in SQL and ignores this method. Returns a value but has no runtime effect."
)]
pub fn calculate_delay(&self, attempt: i32) -> Duration {
if attempt >= self.max_attempts {
return Duration::ZERO;
}

// Calculate base delay: initial_delay * multiplier^attempt
let base_delay_ms = (self.initial_delay.as_millis() as f64) * self.backoff_multiplier.powi(attempt);

// Cap at max_delay
let capped_delay_ms = base_delay_ms.min(self.max_delay.as_millis() as f64);

// Add jitter to prevent thundering herd
let jitter = if self.jitter_factor > 0.0 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Use a simple hash-based jitter for deterministic randomness
let mut hasher = DefaultHasher::new();
attempt.hash(&mut hasher);
std::thread::current().id().hash(&mut hasher);

let hash = hasher.finish();
let jitter_multiplier = (hash % 1000) as f64 / 1000.0; // 0.0 to 1.0
let jitter_range = capped_delay_ms * self.jitter_factor;
jitter_range * (jitter_multiplier * 2.0 - 1.0) // -jitter_range to +jitter_range
} else {
0.0
};

let final_delay_ms = (capped_delay_ms + jitter).max(0.0);
Duration::from_millis(final_delay_ms as u64)
}

/// Calculate what the run_at time *would* be for a retry under this
/// policy.
///
/// **Not used at runtime.** See [`RetryPolicy::calculate_delay`].
#[deprecated(
since = "1.2.0",
note = "graphile_worker computes retry timing in SQL and ignores this method. Returns a value but has no runtime effect."
)]
pub fn calculate_retry_time(
&self,
attempt: i32,
base_time: chrono::DateTime<chrono::Utc>,
) -> chrono::DateTime<chrono::Utc> {
#[allow(deprecated)]
let delay = self.calculate_delay(attempt);
base_time + chrono::Duration::from_std(delay).unwrap_or(chrono::Duration::MAX)
}

/// Check if we should retry for the given attempt number
/// Check if we should retry for the given attempt number.
pub fn should_retry(&self, attempt: i32) -> bool {
attempt < self.max_attempts
}

/// Get the total number of attempts (including initial attempt)
/// Get the total number of attempts (including initial attempt).
pub fn total_attempts(&self) -> i32 {
self.max_attempts + 1 // +1 for the initial attempt
}
Expand Down Expand Up @@ -211,10 +121,8 @@ impl RetryPolicy {
}

#[cfg(test)]
#[allow(deprecated)] // tests still exercise the deprecated math helpers
mod tests {
use super::*;
use crate::JobSpec;

#[test]
fn retry_policy_defaults() {
Expand All @@ -226,98 +134,37 @@ mod tests {
assert_eq!(policy.jitter_factor, 0.1);
}

#[test]
fn retry_policy_calculate_delay() {
let policy = RetryPolicy::new(3, Duration::from_millis(100), Duration::from_secs(10), 2.0);

// First retry (attempt 0): 100ms
let delay0 = policy.calculate_delay(0);
assert!(delay0.as_millis() >= 90 && delay0.as_millis() <= 110); // Allow for jitter

// Second retry (attempt 1): 200ms
let delay1 = policy.calculate_delay(1);
assert!(delay1.as_millis() >= 180 && delay1.as_millis() <= 220); // Allow for jitter

// Third retry (attempt 2): 400ms
let delay2 = policy.calculate_delay(2);
assert!(delay2.as_millis() >= 360 && delay2.as_millis() <= 440); // Allow for jitter

// No more retries (attempt 3): 0ms
let delay3 = policy.calculate_delay(3);
assert_eq!(delay3, Duration::ZERO);
}

#[test]
fn retry_policy_should_retry() {
let policy = RetryPolicy::new(3, Duration::from_millis(100), Duration::from_secs(10), 2.0);

assert!(policy.should_retry(0)); // First retry
assert!(policy.should_retry(1)); // Second retry
assert!(policy.should_retry(2)); // Third retry
assert!(!policy.should_retry(3)); // No more retries
assert!(!policy.should_retry(4)); // Definitely no more retries
}

#[test]
fn retry_policy_max_delay_cap() {
let policy = RetryPolicy::new(
10,
Duration::from_millis(100),
Duration::from_secs(1), // Cap at 1 second
2.0,
)
.with_jitter(0.0); // No jitter for predictable testing

// Attempt 0: 100ms
assert_eq!(policy.calculate_delay(0), Duration::from_millis(100));

// Attempt 3: 100 * 2^3 = 800ms
assert_eq!(policy.calculate_delay(3), Duration::from_millis(800));

// Attempt 4: 100 * 2^4 = 1600ms, but capped at 1000ms
assert_eq!(policy.calculate_delay(4), Duration::from_millis(1000));
let policy = RetryPolicy {
max_attempts: 3,
..Default::default()
};

// Attempt 10: Still capped at 1000ms
assert_eq!(policy.calculate_delay(9), Duration::from_millis(1000));
assert!(policy.should_retry(0));
assert!(policy.should_retry(1));
assert!(policy.should_retry(2));
assert!(!policy.should_retry(3));
assert!(!policy.should_retry(4));
}

#[test]
fn retry_policy_presets() {
let fast = RetryPolicy::fast();
assert_eq!(fast.max_attempts, 3);
assert_eq!(fast.initial_delay, Duration::from_millis(100));
assert_eq!(fast.max_delay, Duration::from_secs(30));

let aggressive = RetryPolicy::aggressive();
assert_eq!(aggressive.max_attempts, 12);
assert_eq!(aggressive.initial_delay, Duration::from_millis(500));
assert_eq!(aggressive.max_delay, Duration::from_secs(600));

let conservative = RetryPolicy::conservative();
assert_eq!(conservative.max_attempts, 5);
assert_eq!(conservative.initial_delay, Duration::from_secs(5));
assert_eq!(conservative.max_delay, Duration::from_secs(1800));
fn retry_policy_total_attempts() {
let policy = RetryPolicy {
max_attempts: 3,
..Default::default()
};
assert_eq!(policy.total_attempts(), 4); // +1 for the initial attempt
}

#[test]
fn job_spec_with_retry_policies() {
let spec = JobSpec::default().with_fast_retries();
assert_eq!(spec.max_attempts, Some(3));
assert!(spec.retry_policy.is_some());

let policy = spec.effective_retry_policy();
assert_eq!(policy.max_attempts, 3);
assert_eq!(policy.initial_delay, Duration::from_millis(100));

// Test retry time calculation
let base_time = chrono::Utc::now();
let retry_time = spec
.calculate_retry_time(0, base_time)
.expect("we should get a time from this");
assert!(retry_time > base_time);

// Test when no more retries
let no_retry = spec.calculate_retry_time(10, base_time);
assert!(no_retry.is_none());
fn retry_policy_presets_differ_only_in_attempt_count() {
// The presets exist as readable shorthand for max_attempts; they
// intentionally diverge on the other fields too, but those fields
// don't reach graphile_worker, so for runtime behaviour only the
// attempt count differs.
assert_eq!(RetryPolicy::fast().max_attempts, 3);
assert_eq!(RetryPolicy::aggressive().max_attempts, 12);
assert_eq!(RetryPolicy::conservative().max_attempts, 5);
}
}
Loading