From 60a0615b2758b521035cf8001856bedc4123d0a2 Mon Sep 17 00:00:00 2001 From: C J Silverio Date: Wed, 6 May 2026 20:23:17 -0700 Subject: [PATCH 1/3] feat!: remove deprecated RetryPolicy timing math MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Removes the methods deprecated in 1.2.0 (since="1.2.0", removal scheduled for 2.0.0): - RetryPolicy::new(max, initial, max_delay, mult) - RetryPolicy::with_jitter(f64) - RetryPolicy::calculate_delay(attempt) -> Duration - RetryPolicy::calculate_retry_time(attempt, base) -> DateTime<Utc> - JobSpec::calculate_retry_time(attempt, failed_at) -> Option<DateTime<Utc>> The math these computed was never reachable at runtime — graphile_worker schedules retries via its own SQL formula (`exp(min(attempts, 10))` seconds). Keeping the methods around as informational helpers risked users reading them as configuration that mattered. The struct itself is preserved with all five fields. Only `max_attempts` is honored; the other fields are documented as not-honored. Keeping the shape gives us a place to land per-job timing config if upstream graphile_worker ever exposes it, without another API break. The presets (fast / aggressive / conservative) and the JobSpec builders (with_fast_retries / with_aggressive_retries / with_conservative_retries) are unchanged — they're cheap convenience for setting max_attempts and have no semantic baggage. Tests: drop the three tests that exercised the deleted math (retry_policy_calculate_delay, retry_policy_max_delay_cap, job_spec_with_retry_policies). Add two cheap replacements covering RetryPolicy::should_retry, total_attempts, and that the presets pin to the documented attempt counts. BREAKING CHANGE: callers of any of the listed methods will get compile errors. Migrate by: RetryPolicy::new(8, ..., ..., ...)
→ RetryPolicy { max_attempts: 8, ..Default::default() } policy.with_jitter(0.2) → drop the call (had no runtime effect) policy.calculate_delay(n) → drop the call (no runtime effect) policy.calculate_retry_time(...) → drop the call (no runtime effect) spec.calculate_retry_time(...) → drop the call (no runtime effect) --- src/lib.rs | 20 ----- src/retries.rs | 221 ++++++++----------------------------------- 2 files changed, 34 insertions(+), 207 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 1dd3679..ab89eeb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -479,26 +479,6 @@ impl JobSpec { pub fn effective_retry_policy(&self) -> RetryPolicy { self.retry_policy.clone().unwrap_or_default() } - - /// Calculate what the next retry time *would* be under this spec's - /// policy. - /// - /// **Not used at runtime.** graphile_worker schedules retries via a - /// fixed SQL formula. This method is preserved as a utility but has no - /// effect on actual job behaviour. - #[deprecated( - since = "1.2.0", - note = "graphile_worker computes retry timing in SQL and ignores this method. Returns a value but has no runtime effect." - )] - pub fn calculate_retry_time(&self, attempt: i32, failed_at: DateTime<Utc>) -> Option<DateTime<Utc>> { - let policy = self.effective_retry_policy(); - if policy.should_retry(attempt) { - #[allow(deprecated)] - Some(policy.calculate_retry_time(attempt, failed_at)) - } else { - None // No more retries - } - } } impl From<JobSpec> for GraphileJobSpec { diff --git a/src/retries.rs b/src/retries.rs index 609f4fa..6db94f9 100644 --- a/src/retries.rs +++ b/src/retries.rs @@ -17,20 +17,22 @@ //! //! **The only `RetryPolicy` field that actually affects job behaviour is //! `max_attempts`**, which is forwarded to graphile_worker's `JobSpec`. -//! Methods that compute delays (`calculate_delay`, `calculate_retry_time`, -//! `with_jitter`) are deprecated — they were dead code masquerading as -//! configuration. The `fast()`, `aggressive()`, and `conservative()` presets -//! 
are kept for compatibility but in practice differ only in attempt count. +//! The other fields are kept on the struct for source-compatibility and +//! forward-compatibility — if upstream graphile_worker ever exposes per-job +//! backoff config, the shape is already there. //! -//! Per-job backoff customisation requires upstream support in -//! `graphile_worker` and is not currently available. +//! The `fast()`, `aggressive()`, and `conservative()` presets are kept for +//! convenience but in practice differ only in attempt count (3 / 12 / 5 +//! respectively). use std::time::Duration; /// Configuration for retry behaviour. /// /// **Only `max_attempts` affects runtime behaviour.** The other fields are -/// retained for source-compatibility but graphile_worker computes retry +/// retained on the struct for source-compatibility (and so a future +/// graphile_worker upstream that supports per-job backoff config can +/// activate them without an API break) but graphile_worker computes retry /// timing from a fixed SQL formula (`exp(min(attempts, 10))` seconds) — see /// the module-level docs for details. #[derive(Debug, Clone)] @@ -64,104 +66,12 @@ impl Default for RetryPolicy { } impl RetryPolicy { - /// Create a new RetryPolicy with custom settings. - /// - /// **Note:** Only `max_attempts` affects runtime behaviour. The other - /// arguments are stored on the returned policy but never reach - /// graphile_worker. See the module-level docs. - #[deprecated( - since = "1.2.0", - note = "graphile_worker honors only max_attempts; the timing arguments are dead config. \ - Construct directly with RetryPolicy { max_attempts: n, ..Default::default() } \ - or use a preset like RetryPolicy::fast()." - )] - pub fn new(max_attempts: i32, initial_delay: Duration, max_delay: Duration, backoff_multiplier: f64) -> Self { - Self { - max_attempts, - initial_delay, - max_delay, - backoff_multiplier, - jitter_factor: 0.1, - } - } - - /// Configure jitter on this policy. 
**Not applied at runtime.** - #[deprecated( - since = "1.2.0", - note = "jitter_factor is not honored by graphile_worker; this method has no runtime effect." - )] - pub fn with_jitter(mut self, jitter_factor: f64) -> Self { - self.jitter_factor = jitter_factor.clamp(0.0, 1.0); - self - } - - /// Calculate what the delay *would* be for a given attempt number under - /// this policy. - /// - /// **Not used at runtime.** graphile_worker schedules retries via a - /// fixed SQL formula and ignores this calculation. Retained as a - /// utility for tests and future use. - #[deprecated( - since = "1.2.0", - note = "graphile_worker computes retry timing in SQL and ignores this method. Returns a value but has no runtime effect." - )] - pub fn calculate_delay(&self, attempt: i32) -> Duration { - if attempt >= self.max_attempts { - return Duration::ZERO; - } - - // Calculate base delay: initial_delay * multiplier^attempt - let base_delay_ms = (self.initial_delay.as_millis() as f64) * self.backoff_multiplier.powi(attempt); - - // Cap at max_delay - let capped_delay_ms = base_delay_ms.min(self.max_delay.as_millis() as f64); - - // Add jitter to prevent thundering herd - let jitter = if self.jitter_factor > 0.0 { - use std::collections::hash_map::DefaultHasher; - use std::hash::{Hash, Hasher}; - - // Use a simple hash-based jitter for deterministic randomness - let mut hasher = DefaultHasher::new(); - attempt.hash(&mut hasher); - std::thread::current().id().hash(&mut hasher); - - let hash = hasher.finish(); - let jitter_multiplier = (hash % 1000) as f64 / 1000.0; // 0.0 to 1.0 - let jitter_range = capped_delay_ms * self.jitter_factor; - jitter_range * (jitter_multiplier * 2.0 - 1.0) // -jitter_range to +jitter_range - } else { - 0.0 - }; - - let final_delay_ms = (capped_delay_ms + jitter).max(0.0); - Duration::from_millis(final_delay_ms as u64) - } - - /// Calculate what the run_at time *would* be for a retry under this - /// policy. 
- /// - /// **Not used at runtime.** See [`RetryPolicy::calculate_delay`]. - #[deprecated( - since = "1.2.0", - note = "graphile_worker computes retry timing in SQL and ignores this method. Returns a value but has no runtime effect." - )] - pub fn calculate_retry_time( - &self, - attempt: i32, - base_time: chrono::DateTime, - ) -> chrono::DateTime { - #[allow(deprecated)] - let delay = self.calculate_delay(attempt); - base_time + chrono::Duration::from_std(delay).unwrap_or(chrono::Duration::MAX) - } - - /// Check if we should retry for the given attempt number + /// Check if we should retry for the given attempt number. pub fn should_retry(&self, attempt: i32) -> bool { attempt < self.max_attempts } - /// Get the total number of attempts (including initial attempt) + /// Get the total number of attempts (including initial attempt). pub fn total_attempts(&self) -> i32 { self.max_attempts + 1 // +1 for the initial attempt } @@ -211,10 +121,8 @@ impl RetryPolicy { } #[cfg(test)] -#[allow(deprecated)] // tests still exercise the deprecated math helpers mod tests { use super::*; - use crate::JobSpec; #[test] fn retry_policy_defaults() { @@ -226,98 +134,37 @@ mod tests { assert_eq!(policy.jitter_factor, 0.1); } - #[test] - fn retry_policy_calculate_delay() { - let policy = RetryPolicy::new(3, Duration::from_millis(100), Duration::from_secs(10), 2.0); - - // First retry (attempt 0): 100ms - let delay0 = policy.calculate_delay(0); - assert!(delay0.as_millis() >= 90 && delay0.as_millis() <= 110); // Allow for jitter - - // Second retry (attempt 1): 200ms - let delay1 = policy.calculate_delay(1); - assert!(delay1.as_millis() >= 180 && delay1.as_millis() <= 220); // Allow for jitter - - // Third retry (attempt 2): 400ms - let delay2 = policy.calculate_delay(2); - assert!(delay2.as_millis() >= 360 && delay2.as_millis() <= 440); // Allow for jitter - - // No more retries (attempt 3): 0ms - let delay3 = policy.calculate_delay(3); - assert_eq!(delay3, Duration::ZERO); - } - 
#[test] fn retry_policy_should_retry() { - let policy = RetryPolicy::new(3, Duration::from_millis(100), Duration::from_secs(10), 2.0); - - assert!(policy.should_retry(0)); // First retry - assert!(policy.should_retry(1)); // Second retry - assert!(policy.should_retry(2)); // Third retry - assert!(!policy.should_retry(3)); // No more retries - assert!(!policy.should_retry(4)); // Definitely no more retries - } - - #[test] - fn retry_policy_max_delay_cap() { - let policy = RetryPolicy::new( - 10, - Duration::from_millis(100), - Duration::from_secs(1), // Cap at 1 second - 2.0, - ) - .with_jitter(0.0); // No jitter for predictable testing - - // Attempt 0: 100ms - assert_eq!(policy.calculate_delay(0), Duration::from_millis(100)); - - // Attempt 3: 100 * 2^3 = 800ms - assert_eq!(policy.calculate_delay(3), Duration::from_millis(800)); - - // Attempt 4: 100 * 2^4 = 1600ms, but capped at 1000ms - assert_eq!(policy.calculate_delay(4), Duration::from_millis(1000)); + let policy = RetryPolicy { + max_attempts: 3, + ..Default::default() + }; - // Attempt 10: Still capped at 1000ms - assert_eq!(policy.calculate_delay(9), Duration::from_millis(1000)); + assert!(policy.should_retry(0)); + assert!(policy.should_retry(1)); + assert!(policy.should_retry(2)); + assert!(!policy.should_retry(3)); + assert!(!policy.should_retry(4)); } #[test] - fn retry_policy_presets() { - let fast = RetryPolicy::fast(); - assert_eq!(fast.max_attempts, 3); - assert_eq!(fast.initial_delay, Duration::from_millis(100)); - assert_eq!(fast.max_delay, Duration::from_secs(30)); - - let aggressive = RetryPolicy::aggressive(); - assert_eq!(aggressive.max_attempts, 12); - assert_eq!(aggressive.initial_delay, Duration::from_millis(500)); - assert_eq!(aggressive.max_delay, Duration::from_secs(600)); - - let conservative = RetryPolicy::conservative(); - assert_eq!(conservative.max_attempts, 5); - assert_eq!(conservative.initial_delay, Duration::from_secs(5)); - assert_eq!(conservative.max_delay, 
Duration::from_secs(1800)); + fn retry_policy_total_attempts() { + let policy = RetryPolicy { + max_attempts: 3, + ..Default::default() + }; + assert_eq!(policy.total_attempts(), 4); // +1 for the initial attempt } #[test] - fn job_spec_with_retry_policies() { - let spec = JobSpec::default().with_fast_retries(); - assert_eq!(spec.max_attempts, Some(3)); - assert!(spec.retry_policy.is_some()); - - let policy = spec.effective_retry_policy(); - assert_eq!(policy.max_attempts, 3); - assert_eq!(policy.initial_delay, Duration::from_millis(100)); - - // Test retry time calculation - let base_time = chrono::Utc::now(); - let retry_time = spec - .calculate_retry_time(0, base_time) - .expect("we should get a time from this"); - assert!(retry_time > base_time); - - // Test when no more retries - let no_retry = spec.calculate_retry_time(10, base_time); - assert!(no_retry.is_none()); + fn retry_policy_presets_differ_only_in_attempt_count() { + // The presets exist as readable shorthand for max_attempts; they + // intentionally diverge on the other fields too, but those fields + // don't reach graphile_worker, so for runtime behaviour only the + // attempt count differs. 
+ assert_eq!(RetryPolicy::fast().max_attempts, 3); + assert_eq!(RetryPolicy::aggressive().max_attempts, 12); + assert_eq!(RetryPolicy::conservative().max_attempts, 5); } } From fb5949acd44f4eb74edc4d01062c33ef584de09b Mon Sep 17 00:00:00 2001 From: C J Silverio Date: Wed, 6 May 2026 20:28:01 -0700 Subject: [PATCH 2/3] feat!: remove deprecated QueueConfig multi-queue API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Removes the entire QueueConfig surface that was deprecated in 1.2.0 (scheduled for removal in 2.0.0): - pub struct QueueConfig - QueueConfig::default_queue, named_queue, priority_queue - WorkerConfig::with_queues(Vec<QueueConfig>) - WorkerConfig.queue_configs field - WorkerOptionsBuilder.queue_name field and with_queue_name() method - The "Queue name configuration is not supported" WARN log in WorkerOptionsBuilder->WorkerOptions conversion (unreachable now) The library never actually consumed any of this beyond the first config's concurrency value — graphile_worker's WorkerOptions doesn't expose per-worker queue filtering. Keeping the surface around in deprecated form invited misuse: users who reasonably expected named_queue to filter jobs got silent serialization-by-default instead, which is the opposite of what they intended. The new shape is one field on WorkerConfig: pub struct WorkerConfig { pub database_url: String, pub schema: String, pub concurrency: usize, // was: queue_configs: Vec<QueueConfig> pub poll_interval: Duration, ... } Set it via `WorkerConfig::with_concurrency(n)` (already added in 1.2.0) or via struct-literal init. To run multiple specialized workers, spawn multiple WorkerRunner instances yourself. Per-job queue routing remains available at enqueue time via Queue::serial(name) / Queue::serial_for(...) — that path was never tied to QueueConfig. Tests: - Drop test_queue_config_default_queue, test_queue_config_named_queue, test_queue_config_priority_queue (testing deleted constructors). 
- Drop test_worker_runner_with_multiple_queues_only_first_honored (the whole "only first honored" concept is gone with the API). - Drop test_queue_config_builders unit test in src/worker.rs. - Update test_worker_config_default and test_worker_config_builder to assert config.concurrency directly instead of queue_configs[0].concurrency. - Update three tests in integration_tests_clean.rs that used struct-literal init with `queue_configs: vec![],` — switch to `concurrency: 10,`. BREAKING CHANGE: any code referencing QueueConfig, WorkerConfig.queue_configs, or WorkerConfig::with_queues stops compiling. Migration: cfg.with_queues(vec![QueueConfig::default_queue(N)]) → cfg.with_concurrency(N) cfg.with_queues(vec![QueueConfig::named_queue(_, N)]) → cfg.with_concurrency(N) cfg.queue_configs[0].concurrency → cfg.concurrency cfg.queue_configs → (gone; use cfg.concurrency) For per-job queue routing (the named_queue use case), use Queue::serial(name) at enqueue time — that's the correct mechanism and always was. --- src/worker.rs | 151 +++---------------------------- tests/integration_tests_clean.rs | 6 +- tests/worker_tests.rs | 68 +------------- 3 files changed, 21 insertions(+), 204 deletions(-) diff --git a/src/worker.rs b/src/worker.rs index 91b5b68..8070d41 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -13,7 +13,7 @@ //! //! ## Kubernetes-style `select!` Integration //! ```rust,no_run -//! use backfill::{WorkerRunner, WorkerConfig, QueueConfig, TaskHandler, WorkerContext, IntoTaskHandlerResult}; +//! use backfill::{WorkerRunner, WorkerConfig, TaskHandler, WorkerContext, IntoTaskHandlerResult}; //! use tokio_util::sync::CancellationToken; //! use std::time::Duration; //! use serde::{Deserialize, Serialize}; @@ -100,87 +100,22 @@ use tokio_util::sync::CancellationToken; use crate::{BackfillClient, BackfillError, Plugin, TaskHandler, WorkerOptions}; -// Re-export for use in wrapper -/// Configuration for a worker queue. 
+/// Flexible worker configuration for integration with existing applications. /// -/// **Currently only `concurrency` is honored.** `name` and `priority_range` -/// were intended for multi-queue worker setups, but graphile_worker's -/// `WorkerOptions` doesn't expose per-worker queue filtering, so a single -/// `WorkerRunner` runs exactly one worker and consumes only the first -/// `QueueConfig` in `WorkerConfig::queue_configs`. To run multiple workers -/// with different queue specializations, spawn multiple `WorkerRunner`s. -#[derive(Debug, Clone)] -pub struct QueueConfig { - /// Queue name (None for default queue). **Not honored** — the worker - /// processes jobs from any queue regardless of this field. - pub name: Option, - /// Number of concurrent jobs to process. **Honored.** - pub concurrency: usize, - /// Priority range for jobs in this queue. **Not honored** — the worker - /// fetches by priority order with no range filter. - pub priority_range: Option<(i32, i32)>, -} - -impl QueueConfig { - /// Create configuration for the default queue with the given concurrency. - /// - /// This is the only `QueueConfig` constructor where every field is - /// actually honored at runtime. - pub fn default_queue(concurrency: usize) -> Self { - Self { - name: None, - concurrency, - priority_range: None, - } - } - - /// Create configuration for a named queue. - /// - /// **Deprecated**: the `name` field is not honored at worker startup — - /// graphile_worker's `WorkerOptions` doesn't filter jobs by queue. Use - /// [`WorkerConfig::with_concurrency`] instead, and route jobs to - /// specific named queues at enqueue time via `Queue::serial(name)`. 
- #[deprecated( - since = "1.2.0", - note = "queue name is not honored by graphile_worker's WorkerOptions; use WorkerConfig::with_concurrency \ - and route jobs to named queues at enqueue time via Queue::serial(name)" - )] - pub fn named_queue(name: impl Into, concurrency: usize) -> Self { - Self { - name: Some(name.into()), - concurrency, - priority_range: None, - } - } - - /// Create configuration for a priority-based queue. - /// - /// **Deprecated**: neither `name` nor `priority_range` is honored at - /// runtime. The worker fetches jobs by priority order (lower number - /// first) regardless of any range configured here. Use - /// [`WorkerConfig::with_concurrency`] instead. - #[deprecated( - since = "1.2.0", - note = "priority_range is never honored by the worker fetch loop; this constructor stores values that are dead. Use WorkerConfig::with_concurrency." - )] - pub fn priority_queue(name: impl Into, concurrency: usize, min_priority: i32, max_priority: i32) -> Self { - Self { - name: Some(name.into()), - concurrency, - priority_range: Some((min_priority, max_priority)), - } - } -} - -/// Flexible worker configuration for integration with existing applications +/// `WorkerRunner` always spawns exactly one graphile_worker `Worker`. To run +/// multiple specialized workers, spawn multiple `WorkerRunner` instances +/// yourself. Per-job queue routing happens at enqueue time via +/// [`Queue::serial(name)`](crate::Queue::serial); it is *not* configured at +/// the worker level — graphile_worker's `WorkerOptions` doesn't expose a +/// queue filter. #[derive(Debug, Clone)] pub struct WorkerConfig { /// PostgreSQL database connection URL pub database_url: String, /// GraphileWorker schema name (default: "graphile_worker") pub schema: String, - /// Queue configurations - each queue will run with its own concurrency - pub queue_configs: Vec, + /// Number of concurrent jobs the worker can run at once. 
+ pub concurrency: usize, /// How often to poll for new jobs pub poll_interval: Duration, /// How often to process failed jobs into DLQ (None to disable) @@ -209,7 +144,7 @@ impl Default for WorkerConfig { Self { database_url: "postgresql://localhost:5432/backfill".to_string(), schema: "graphile_worker".to_string(), - queue_configs: vec![QueueConfig::default_queue(10)], + concurrency: 10, poll_interval: Duration::from_millis(200), dlq_processor_interval: Some(Duration::from_secs(60)), stale_lock_cleanup_interval: Some(DEFAULT_STALE_LOCK_CLEANUP_INTERVAL), @@ -241,23 +176,7 @@ impl WorkerConfig { /// parallel. To run multiple specialized workers, spawn multiple /// `WorkerRunner` instances yourself. pub fn with_concurrency(mut self, concurrency: usize) -> Self { - self.queue_configs = vec![QueueConfig::default_queue(concurrency)]; - self - } - - /// Set queue configurations. - /// - /// **Deprecated**: only the first `QueueConfig`'s `concurrency` is - /// honored — graphile_worker's `WorkerOptions` doesn't expose - /// per-worker queue filtering, so subsequent configs are ignored. Use - /// [`Self::with_concurrency`] instead. Route jobs to named queues at - /// enqueue time via `Queue::serial(name)`. 
- #[deprecated( - since = "1.2.0", - note = "only the first QueueConfig's concurrency is honored; use with_concurrency() and route via Queue::serial(name) at enqueue time" - )] - pub fn with_queues(mut self, queues: Vec) -> Self { - self.queue_configs = queues; + self.concurrency = concurrency; self } @@ -362,7 +281,6 @@ pub struct WorkerOptionsBuilder { pub(crate) poll_interval: Duration, pub(crate) pool: sqlx::PgPool, pub(crate) concurrency: usize, - pub(crate) queue_name: Option, pub(crate) job_handlers: Vec, pub(crate) crontabs: Vec, } @@ -390,18 +308,11 @@ impl WorkerOptionsBuilder { client.init_dlq().await?; } - // Use the first queue config for concurrency, or default - let concurrency = config.queue_configs.first().map(|q| q.concurrency).unwrap_or(10); - - // Use the first queue's name if specified - let queue_name = config.queue_configs.first().and_then(|q| q.name.clone()); - Ok(Self { schema: config.schema.clone(), poll_interval: config.poll_interval, pool: client.pool().clone(), - concurrency, - queue_name, + concurrency: config.concurrency, job_handlers: Vec::new(), crontabs: Vec::new(), }) @@ -426,12 +337,6 @@ impl WorkerOptionsBuilder { self } - /// Set the queue name for this worker - pub fn with_queue_name(mut self, queue_name: Option) -> Self { - self.queue_name = queue_name; - self - } - /// Add a cron schedule for periodic job execution /// /// # Syntax @@ -490,12 +395,6 @@ impl From for WorkerOptions { .pg_pool(builder.pool) .concurrency(builder.concurrency); - // Note: GraphileWorker doesn't expose queue_name configuration in WorkerOptions - // Multiple queues are handled at the job enqueueing level - if builder.queue_name.is_some() { - log::warn!("Queue name configuration is not supported by GraphileWorker's WorkerOptions - ignoring"); - } - // Register all job handlers for handler_config in builder.job_handlers { worker_options = (handler_config.builder_fn)(worker_options); @@ -925,9 +824,8 @@ impl WorkerRunner { /// Get the number of worker 
instances actually running. /// /// Always returns 1: `WorkerRunner` spawns exactly one graphile_worker - /// `Worker`. The historical `Vec` API allowed callers to - /// pass multiple configs, but only the first was ever used at runtime — - /// this method now reports the truth instead of `queue_configs.len()`. + /// `Worker`. To run multiple specialized workers, spawn multiple + /// `WorkerRunner` instances yourself. pub fn worker_count(&self) -> usize { 1 } @@ -942,25 +840,6 @@ impl WorkerRunner { mod tests { use super::*; - #[test] - #[allow(deprecated)] // pins the deprecated constructors' storage shape - fn test_queue_config_builders() { - let default = QueueConfig::default_queue(5); - assert_eq!(default.name, None); - assert_eq!(default.concurrency, 5); - assert_eq!(default.priority_range, None); - - let named = QueueConfig::named_queue("bulk", 10); - assert_eq!(named.name, Some("bulk".to_string())); - assert_eq!(named.concurrency, 10); - assert_eq!(named.priority_range, None); - - let priority = QueueConfig::priority_queue("urgent", 3, 100, 1000); - assert_eq!(priority.name, Some("urgent".to_string())); - assert_eq!(priority.concurrency, 3); - assert_eq!(priority.priority_range, Some((100, 1000))); - } - #[test] fn test_worker_config_builders() { let config = WorkerConfig::new("postgresql://localhost/test") diff --git a/tests/integration_tests_clean.rs b/tests/integration_tests_clean.rs index fe6d531..b839358 100644 --- a/tests/integration_tests_clean.rs +++ b/tests/integration_tests_clean.rs @@ -492,7 +492,7 @@ async fn test_cron_schedule_registration() -> Result<()> { let config = WorkerConfig { database_url: database_url.clone(), schema: schema_name.clone(), - queue_configs: vec![], + concurrency: 10, poll_interval: std::time::Duration::from_millis(1000), dlq_processor_interval: None, ..Default::default() @@ -541,7 +541,7 @@ async fn test_multiple_cron_schedules() -> Result<()> { let config = WorkerConfig { database_url: database_url.clone(), schema: 
schema_name.clone(), - queue_configs: vec![], + concurrency: 10, poll_interval: std::time::Duration::from_millis(1000), dlq_processor_interval: None, ..Default::default() @@ -580,7 +580,7 @@ async fn test_cron_with_payload() -> Result<()> { let config = WorkerConfig { database_url: database_url.clone(), schema: schema_name.clone(), - queue_configs: vec![], + concurrency: 10, poll_interval: std::time::Duration::from_millis(1000), dlq_processor_interval: None, ..Default::default() diff --git a/tests/worker_tests.rs b/tests/worker_tests.rs index 43875ae..ab70e4b 100644 --- a/tests/worker_tests.rs +++ b/tests/worker_tests.rs @@ -3,8 +3,7 @@ use std::time::Duration; use backfill::{ - BackfillError, IntoTaskHandlerResult, QueueConfig, TaskHandler, WorkerConfig, WorkerContext, WorkerError, - WorkerRunner, + BackfillError, IntoTaskHandlerResult, TaskHandler, WorkerConfig, WorkerContext, WorkerError, WorkerRunner, }; use serde::{Deserialize, Serialize}; use tokio_util::sync::CancellationToken; @@ -34,7 +33,7 @@ async fn test_worker_config_default() { assert_eq!(config.database_url, "postgresql://localhost:5432/backfill"); assert_eq!(config.schema, "graphile_worker"); - assert_eq!(config.queue_configs.len(), 1); + assert_eq!(config.concurrency, 10); assert_eq!(config.poll_interval, Duration::from_millis(200)); assert_eq!(config.dlq_processor_interval, Some(Duration::from_secs(60))); } @@ -51,8 +50,7 @@ async fn test_worker_config_builder() { assert_eq!(config.schema, "custom_schema"); assert_eq!(config.poll_interval, Duration::from_millis(100)); assert_eq!(config.dlq_processor_interval, Some(Duration::from_secs(30))); - assert_eq!(config.queue_configs.len(), 1); - assert_eq!(config.queue_configs[0].concurrency, 10); + assert_eq!(config.concurrency, 10); } #[tokio::test] @@ -62,35 +60,6 @@ async fn test_worker_config_disable_dlq_processor() { assert_eq!(config.dlq_processor_interval, None); } -#[tokio::test] -async fn test_queue_config_default_queue() { - let config = 
QueueConfig::default_queue(10); - - assert_eq!(config.name, None); - assert_eq!(config.concurrency, 10); - assert_eq!(config.priority_range, None); -} - -#[tokio::test] -#[allow(deprecated)] // documents the deprecated constructor's storage shape -async fn test_queue_config_named_queue() { - let config = QueueConfig::named_queue("bulk", 20); - - assert_eq!(config.name, Some("bulk".to_string())); - assert_eq!(config.concurrency, 20); - assert_eq!(config.priority_range, None); -} - -#[tokio::test] -#[allow(deprecated)] // documents the deprecated constructor's storage shape -async fn test_queue_config_priority_queue() { - let config = QueueConfig::priority_queue("urgent", 5, -100, 100); - - assert_eq!(config.name, Some("urgent".to_string())); - assert_eq!(config.concurrency, 5); - assert_eq!(config.priority_range, Some((-100, 100))); -} - #[tokio::test] async fn test_worker_runner_builder_creation() -> Result<(), BackfillError> { let config = WorkerConfig::new(get_test_database_url()) @@ -150,37 +119,6 @@ async fn test_worker_runner_multiple_job_types() -> Result<(), BackfillError> { Ok(()) } -#[tokio::test] -#[allow(deprecated)] // covers the deprecated multi-queue API while it still exists -async fn test_worker_runner_with_multiple_queues_only_first_honored() -> Result<(), BackfillError> { - // The Vec API is deprecated because graphile_worker doesn't - // expose per-worker queue filtering; only one Worker is ever spawned and - // only the first config's concurrency is used. This test pins that - // behaviour: passing 3 configs results in worker_count == 1. - let config = WorkerConfig::new(get_test_database_url()) - .with_schema("test_worker_queues") - .with_queues(vec![ - QueueConfig::default_queue(5), - QueueConfig::named_queue("fast", 10), - QueueConfig::named_queue("bulk", 3), - ]) - .with_dlq_processor_interval(None); - - let worker = WorkerRunner::builder(config) - .await? 
- .define_job::() - .build() - .await?; - - assert_eq!( - worker.worker_count(), - 1, - "WorkerRunner only ever spawns one Worker, regardless of queue_configs.len()" - ); - - Ok(()) -} - #[tokio::test] async fn test_worker_runner_with_dlq_enabled() -> Result<(), BackfillError> { let config = WorkerConfig::new(get_test_database_url()) From b54dc97aa3fd051a717714bbe97329e8c298dcb8 Mon Sep 17 00:00:00 2001 From: C J Silverio Date: Wed, 6 May 2026 20:29:47 -0700 Subject: [PATCH 3/3] chore: bump version to 2.0.0 --- CHANGELOG.md | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.lock | 2 +- Cargo.toml | 2 +- 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9267468..ac32c6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,67 @@ All notable changes to the Backfill project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project will adhere to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) after reaching version 1.0.0. +## [2.0.0] — Deprecation cleanup + +Removes the API surface that was marked `#[deprecated(since = "1.2.0")]` in +the previous release. No behavioural changes — every removed item was +already non-functional or duplicative; the deprecation warnings in 1.2.0 +were the upgrade signal. + +### Removed (BREAKING) + +**`RetryPolicy` math methods** — graphile_worker schedules retries via a +fixed `exp(min(attempts, 10))`-second SQL formula, so these never reached +the worker: + +- `RetryPolicy::new(max, initial, max_delay, multiplier)` — multi-arg + constructor. Replace with `RetryPolicy { max_attempts: n, + ..Default::default() }` or one of the presets. +- `RetryPolicy::with_jitter(f64)` — replace with: drop the call (had no + runtime effect). +- `RetryPolicy::calculate_delay(attempt)` — replace with: drop the call. +- `RetryPolicy::calculate_retry_time(attempt, base)` — replace with: drop. 
+ - `JobSpec::calculate_retry_time(attempt, failed_at)` — replace with: drop. + +The `RetryPolicy` struct itself is preserved with all five fields. Only +`max_attempts` is honored; the other fields document themselves as +not-honored. Keeping the shape leaves room for upstream graphile_worker to +expose per-job timing config without another API break. Presets (`fast`, +`aggressive`, `conservative`) and the JobSpec retry builders +(`with_fast_retries` / `with_aggressive_retries` / `with_conservative_retries`) +are unchanged — they're cheap convenience for setting `max_attempts`. + +**`QueueConfig` multi-queue API** — graphile_worker doesn't expose +per-worker queue filtering, so the multi-config shape was always a lie: + +- `pub struct QueueConfig` — removed entirely. +- `QueueConfig::default_queue` / `named_queue` / `priority_queue` — gone + with the struct. +- `WorkerConfig::with_queues(Vec<QueueConfig>)` — replace with + `WorkerConfig::with_concurrency(n)`. +- `WorkerConfig.queue_configs: Vec<QueueConfig>` field — replaced with + `WorkerConfig.concurrency: usize`. Set it via `with_concurrency` or + struct-literal init. To run multiple specialized workers, spawn multiple + `WorkerRunner` instances yourself. +- `WorkerOptionsBuilder.queue_name` field and `with_queue_name` method — + removed (the value was never propagated anywhere it would have effect). +- The internal "Queue name configuration is not supported" WARN log — + removed (now unreachable). + +**Migration for the `named_queue` use case**: per-job queue routing +remains available at enqueue time via `Queue::serial(name)` and +`Queue::serial_for(entity, id)`. That was always the right tool; the +`QueueConfig::named_queue` constructor was a misleading second path that +silently delivered serial-by-default behaviour to users who just wanted +a label. + +### Tests + +- 108 (was 113 in 1.2.0). 
Five removed: three for the now-deleted + `QueueConfig` constructors, one for the "multi-queue first-config + wins" behaviour that no longer exists, one in-source unit test for + the deleted `RetryPolicy` math. + ## [1.2.0] — Production-readiness audit A focused bug-fix release driven by a top-to-bottom production-readiness diff --git a/Cargo.lock b/Cargo.lock index ea0a9b5..147e7e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -179,7 +179,7 @@ dependencies = [ [[package]] name = "backfill" -version = "1.2.0" +version = "2.0.0" dependencies = [ "axum", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 03b71b7..2370806 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "backfill" -version = "1.2.0" +version = "2.0.0" edition = "2024" description = "A boringly-named priority work queue system for doing async tasks." categories = ["concurrency", "data-structures"]