From b53e7cdfcb3c8337b4d02675354651489960bc30 Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Wed, 25 Feb 2026 17:32:37 +0100 Subject: [PATCH 1/8] Create submissions_cancelled table --- .../20260225164600_submissions_cancelled.sql | 10 ++++++++++ opsqueue/opsqueue_example_database_schema.db | Bin 94208 -> 102400 bytes 2 files changed, 10 insertions(+) create mode 100644 opsqueue/migrations/20260225164600_submissions_cancelled.sql diff --git a/opsqueue/migrations/20260225164600_submissions_cancelled.sql b/opsqueue/migrations/20260225164600_submissions_cancelled.sql new file mode 100644 index 0000000..56de6c9 --- /dev/null +++ b/opsqueue/migrations/20260225164600_submissions_cancelled.sql @@ -0,0 +1,10 @@ +CREATE TABLE submissions_cancelled +( + id BIGINT PRIMARY KEY NOT NULL, + prefix TEXT, + chunks_total INTEGER NOT NULL DEFAULT 0, + chunks_done INTEGER NOT NULL DEFAULT 0, + metadata BLOB, + cancelled_at DATETIME NOT NULL -- Unix Timestamp +); + diff --git a/opsqueue/opsqueue_example_database_schema.db b/opsqueue/opsqueue_example_database_schema.db index bced270be72bff4c21d3460188c2ca31890d4c69..63eb0a6fe6789cdac88cf616b5ddc15cf2a2dfa8 100644 GIT binary patch delta 1019 zcmZp8z}m2YO(!_eC$l6~AuYcsH?c&)m_dMnk&(ecL4kpRK^TZ7ffxpqC+Zk83QtTB zkzmQ=Wnl8*+0C_=Q<{AfyAs=OHf`4BOfH)h1$HoUunDk%wQOF>sl>$9WX8%aZf?%l z!o6Kyj8Tt~QFF5oQzSc^CmWl1toh_XUX{tZJe-q%aZ7CW=H1C8#>o@Fz`uasmG3TJ z9`7&SeqJq}T|5Cmlg)VQGdS5<&5b#8n1ynU+f*%zOOtXli;FY!^NJOc6Z4W&b8=Es zj0}v-bPbGjjZ75`&8!S8tW3<#KY6l@%izl%8*!=9-NlZpI(*tn{Nk?vvRom%pMiVX zSc+Be_*3Ne^D>9k$7c^(Hlt25Z9Y{s|0dXiF0N>Ym^pOulQM>qVm+I?1| z{m<5BarZb}-}boTzsfPGD29VyZn55dS^PRl+E?hxy+w;(JdJfvy**ts;nAwP~p_c{jwSY z=x&*OT-Jcud)3;>zhn(DC9UNQn0?abPR^6l!IWGDlyr;#Jo&DiDyF26yn(h)kw_#H zJF6tv-5JY&MI?e`I`xT##gG{VV0kAeRi|7-pS z{8#x;@$cu~!oQM#-eyIG7JhCnW>!W>G;?lVsh>81QGBzY!F&GA|LqwC7CC^>CI?0V zb*{Y(oXr0j_^$9x=A6On&Ed_j9Q<=Htu>Oc$7Zm|Qk13hZFyU=v^iYuWxvi?Nn*vkLni*3CXl zN=%b@y*KBvSTb(*=H1C8!ojnHfqwzNE8ktdJl znS5AArQTH6z(Uu+Si#WT%GA`#&~RCwc;8L7oHP}Sp9iM6dl^oeQfuuOEyy@BojNUB+%W=LK(|{MI?ef% z9Q-R8_`mVL=6}F{mH!m~e*P`|D>n-oOyu9ZQa^11qZp9=o`3Uydq#mp4j{D2fl)x6 zJ&A#n`91^R6~4(lYP{ZTqTI)s?{g+_Zp4Uu6{hLHIM3TID#obF2qK%AnK+kj-!H}J z$i%}V#K0qNXvo-}S(3Q@zc8a1C_#wuHZh2Zo0~JXaF-+|<)jvuCgo;spCJj-?=8!? zjY$^hYHp^*4194sU%CHr=W)GaFJs%x63dgy+{m;TnBZEOwl7v<%+O^8B~@Oq2PXfw W=MfbHaz%ky1c-&9SP&@vfFA&5TF#^Z From 6b34ce913bcb0bd55e37fc2c6e245829903d750c Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Wed, 25 Feb 2026 17:32:55 +0100 Subject: [PATCH 2/8] Add rust cancel_submission function --- opsqueue/src/common/submission.rs | 52 +++++++++++++++++++++++++++++++ opsqueue/src/prometheus.rs | 2 ++ 2 files changed, 54 insertions(+) diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index 497d77d..897f69d 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -634,6 +634,58 @@ pub mod db { .await } + #[tracing::instrument(skip(conn))] + pub async fn cancel_submission( + id: SubmissionId, + mut conn: impl WriterConnection, + ) -> sqlx::Result<()> { + conn.transaction(move |mut tx| { + Box::pin( + async move { cancel_submission_notx(id, &mut tx).await }, + ) + }) + .await + } + + /// Do not call directly! Must be called inside a transaction. + pub async fn cancel_submission_notx( + id: SubmissionId, + mut conn: impl WriterConnection, + ) -> sqlx::Result<()> { + cancel_submission_raw(id, &mut conn).await?; + super::chunk::db::skip_remaining_chunks(id, conn).await?; + Ok(()) + } + + #[tracing::instrument(skip(conn))] + pub(super) async fn cancel_submission_raw( + id: SubmissionId, + mut conn: impl WriterConnection, + ) -> sqlx::Result<()> { + let now = chrono::prelude::Utc::now(); + + query!( + " + INSERT INTO submissions_cancelled + (id, chunks_total, prefix, metadata, cancelled_at, chunks_done) + SELECT id, chunks_total, prefix, metadata, julianday($1), chunks_done FROM submissions WHERE id = $3; + + DELETE FROM submissions WHERE id = $4 RETURNING *; + ", + now, + id, + id, + ) + .fetch_one(conn.get_inner()) + .await?; + counter!(crate::prometheus::SUBMISSIONS_CANCELLED_COUNTER).increment(1); + histogram!(crate::prometheus::SUBMISSIONS_DURATION_CANCEL_HISTOGRAM).record( + crate::prometheus::time_delta_as_f64(Utc::now() - id.timestamp()), + ); + + Ok(()) + } + #[tracing::instrument(skip(conn))] /// Do not call directly! MUST be called inside a transaction. pub(super) async fn complete_submission_raw( diff --git a/opsqueue/src/prometheus.rs b/opsqueue/src/prometheus.rs index 74d7514..a74860c 100644 --- a/opsqueue/src/prometheus.rs +++ b/opsqueue/src/prometheus.rs @@ -17,8 +17,10 @@ use crate::db::DBPools; pub const SUBMISSIONS_TOTAL_COUNTER: &str = "submissions_total_count"; pub const SUBMISSIONS_COMPLETED_COUNTER: &str = "submissions_completed_count"; pub const SUBMISSIONS_FAILED_COUNTER: &str = "submissions_failed_count"; +pub const SUBMISSIONS_CANCELLED_COUNTER: &str = "submissions_failed_count"; pub const SUBMISSIONS_DURATION_COMPLETE_HISTOGRAM: &str = "submissions_complete_duration_seconds"; pub const SUBMISSIONS_DURATION_FAIL_HISTOGRAM: &str = "submissions_fail_duration_seconds"; +pub const SUBMISSIONS_DURATION_CANCEL_HISTOGRAM: &str = "submissions_cancel_duration_seconds"; pub const CHUNKS_COMPLETED_COUNTER: &str = "chunks_completed_count"; pub const CHUNKS_FAILED_COUNTER: &str = "chunks_failed_count"; From 085bd84d2c286caa96386b4595e74e532752cee9 Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Thu, 26 Feb 2026 11:58:54 +0100 Subject: [PATCH 3/8] Add endpoint to cancel a submission --- .../python/opsqueue/producer.py | 9 ++++++++ libs/opsqueue_python/src/producer.rs | 19 +++++++++++++++ opsqueue/src/common/submission.rs | 4 ++-- opsqueue/src/producer/client.rs | 23 +++++++++++++++++++ opsqueue/src/producer/server.rs | 13 +++++++++++ 5 files changed, 66 insertions(+), 2 deletions(-) diff --git a/libs/opsqueue_python/python/opsqueue/producer.py b/libs/opsqueue_python/python/opsqueue/producer.py index 5b8eabb..1edcab9 100644 --- a/libs/opsqueue_python/python/opsqueue/producer.py +++ b/libs/opsqueue_python/python/opsqueue/producer.py @@ -314,6 +314,15 @@ def count_submissions(self) -> int: """ return self.inner.count_submissions() # type: ignore[no-any-return] + def cancel_submission(self, submission_id: SubmissionId) -> None: + """ + Cancel a specific submission if it's in progress. + + Raises: + - `InternalProducerClientError` if there is a low-level internal error. + """ + return self.inner.cancel_submission(submission_id) + def get_submission_status( self, submission_id: SubmissionId ) -> SubmissionStatus | None: diff --git a/libs/opsqueue_python/src/producer.rs b/libs/opsqueue_python/src/producer.rs index 2ea6078..a8347a5 100644 --- a/libs/opsqueue_python/src/producer.rs +++ b/libs/opsqueue_python/src/producer.rs @@ -118,6 +118,25 @@ impl ProducerClient { }) } + /// TODO docstring + pub fn cancel_submission( + &self, + py: Python<'_>, + id: SubmissionId, + ) -> CPyResult<(), E> + { + py.allow_threads(|| { + self.block_unless_interrupted(async { + self.producer_client + .cancel_submission(id.into()) + .await + .map_err(|e| CError(R(e))) + }) + // .map(|opt| opt.map(Into::into)) + // .map_err(|e| ProducerClientError::new_err(e.to_string())) + }) + } + /// Retrieve the status (in progress, completed or failed) of a specific submission. /// /// The returned SubmissionStatus object also includes the number of chunks finished so far, diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index 897f69d..edc5b3b 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -668,9 +668,9 @@ pub mod db { " INSERT INTO submissions_cancelled (id, chunks_total, prefix, metadata, cancelled_at, chunks_done) - SELECT id, chunks_total, prefix, metadata, julianday($1), chunks_done FROM submissions WHERE id = $3; + SELECT id, chunks_total, prefix, metadata, julianday($1), chunks_done FROM submissions WHERE id = $2; - DELETE FROM submissions WHERE id = $4 RETURNING *; + DELETE FROM submissions WHERE id = $3 RETURNING *; ", now, id, diff --git a/opsqueue/src/producer/client.rs b/opsqueue/src/producer/client.rs index 0b94b66..2790949 100644 --- a/opsqueue/src/producer/client.rs +++ b/opsqueue/src/producer/client.rs @@ -101,6 +101,29 @@ impl Client { }) .retry(retry_policy()) .when(InternalProducerClientError::is_ephemeral) + .notify(|err, dur| { + tracing::debug!("retrying error {err:?} with sleeping {dur:?}");}) .await + } + + /// TODO docstring + pub async fn cancel_submission( + &self, + submission_id: SubmissionId, + ) -> Result<(), InternalProducerClientError> { + (|| async { + let base_url = &self.base_url; + self + .http_client + .post(format!("{base_url}/submissions/cancel/{submission_id}")) + .send() + .await? + .error_for_status()?; + // let bytes = resp.bytes().await?; + // let body = serde_json::from_slice(&bytes)?; + Ok(()) + }) + .retry(retry_policy()) + .when(InternalProducerClientError::is_ephemeral) .notify(|err, dur| { tracing::debug!("retrying error {err:?} with sleeping {dur:?}"); }) diff --git a/opsqueue/src/producer/server.rs b/opsqueue/src/producer/server.rs index d32fee1..053f31f 100644 --- a/opsqueue/src/producer/server.rs +++ b/opsqueue/src/producer/server.rs @@ -46,6 +46,10 @@ impl ServerState { pub fn build_router(self) -> Router<()> { Router::new() .route("/submissions", post(insert_submission)) + .route( + "/submissions/cancel/{submission_id}", + post(cancel_submission), + ) .route( "/submissions/count_completed", get(submissions_count_completed), @@ -85,6 +89,15 @@ where } } +async fn cancel_submission( + State(state): State, + Path(submission_id): Path, +) -> Result<(), ServerError> { + let mut conn = state.pool.writer_conn().await?; + submission::db::cancel_submission(submission_id, &mut conn).await?; + Ok(()) +} + async fn submission_status( State(state): State, Path(submission_id): Path, From fa223da3a5376aebd12413715fc8f084016364ae Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Thu, 26 Feb 2026 14:12:56 +0100 Subject: [PATCH 4/8] Add SubmissionCancelled to SubmissionStatus --- .../python/opsqueue/producer.py | 4 +- libs/opsqueue_python/src/common.rs | 38 ++++++++ libs/opsqueue_python/src/lib.rs | 2 + libs/opsqueue_python/src/producer.rs | 3 +- .../20260225164600_submissions_cancelled.sql | 1 - opsqueue/src/common/submission.rs | 94 ++++++++++++++++++- opsqueue/src/producer/client.rs | 44 +++++++-- opsqueue/src/prometheus.rs | 7 ++ 8 files changed, 176 insertions(+), 17 deletions(-) diff --git a/libs/opsqueue_python/python/opsqueue/producer.py b/libs/opsqueue_python/python/opsqueue/producer.py index 1edcab9..feee330 100644 --- a/libs/opsqueue_python/python/opsqueue/producer.py +++ b/libs/opsqueue_python/python/opsqueue/producer.py @@ -316,7 +316,9 @@ def count_submissions(self) -> int: def cancel_submission(self, submission_id: SubmissionId) -> None: """ - Cancel a specific submission if it's in progress. + Cancel a specific submission that is in progress. + + Returns None if the submission was successfully cancelled. Raises: - `InternalProducerClientError` if there is a low-level internal error. diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index caa5128..1ccbdf5 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -318,6 +318,19 @@ impl From for SubmissionFailed { } } +impl From for SubmissionCancelled { + fn from(value: opsqueue::common::submission::SubmissionCancelled) -> Self { + Self { + id: value.id.into(), + chunks_total: value.chunks_total.into(), + chunks_done: value.chunks_done.into(), + metadata: value.metadata, + strategic_metadata: value.strategic_metadata, + cancelled_at: value.cancelled_at, + } + } +} + #[pyclass(frozen, module = "opsqueue")] #[derive(Debug, Clone, PartialEq, Eq)] pub enum SubmissionStatus { @@ -331,6 +344,9 @@ pub enum SubmissionStatus { submission: SubmissionFailed, chunk: ChunkFailed, }, + Cancelled { + submission: SubmissionCancelled, + }, } impl From for SubmissionStatus { @@ -348,6 +364,9 @@ impl From for SubmissionStatus { let submission = s.into(); SubmissionStatus::Failed { submission, chunk } } + Cancelled(s) => SubmissionStatus::Cancelled { + submission: s.into(), + }, } } } @@ -414,6 +433,14 @@ impl SubmissionFailed { } } +#[pymethods] +impl SubmissionCancelled { + fn __repr__(&self) -> String { + format!("SubmissionCancelled(id={0}, chunks_total={1}, chunks_done={2}, metadata={3:?}, strategic_metadata={4:?}, cancelled_at={5})", + self.id.__repr__(), self.chunks_total, self.chunks_done, self.metadata, self.strategic_metadata, self.cancelled_at) + } +} + #[pyclass(frozen, get_all, module = "opsqueue")] #[derive(Debug, Clone, PartialEq, Eq)] pub struct SubmissionCompleted { @@ -435,6 +462,17 @@ pub struct SubmissionFailed { pub failed_chunk_id: u64, } +#[pyclass(frozen, get_all, module = "opsqueue")] +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SubmissionCancelled { + pub id: SubmissionId, + pub chunks_total: u64, + pub chunks_done: u64, + pub metadata: Option, + pub strategic_metadata: Option, + pub cancelled_at: DateTime, +} + pub async fn run_unless_interrupted( future: impl IntoFuture>, ) -> Result diff --git a/libs/opsqueue_python/src/lib.rs b/libs/opsqueue_python/src/lib.rs index 8800502..2c07889 100644 --- a/libs/opsqueue_python/src/lib.rs +++ b/libs/opsqueue_python/src/lib.rs @@ -23,6 +23,8 @@ fn opsqueue_internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/libs/opsqueue_python/src/producer.rs b/libs/opsqueue_python/src/producer.rs index a8347a5..97f7bc2 100644 --- a/libs/opsqueue_python/src/producer.rs +++ b/libs/opsqueue_python/src/producer.rs @@ -123,8 +123,7 @@ impl ProducerClient { &self, py: Python<'_>, id: SubmissionId, - ) -> CPyResult<(), E> - { + ) -> CPyResult<(), E> { py.allow_threads(|| { self.block_unless_interrupted(async { self.producer_client diff --git a/opsqueue/migrations/20260225164600_submissions_cancelled.sql b/opsqueue/migrations/20260225164600_submissions_cancelled.sql index 56de6c9..34de8a7 100644 --- a/opsqueue/migrations/20260225164600_submissions_cancelled.sql +++ b/opsqueue/migrations/20260225164600_submissions_cancelled.sql @@ -7,4 +7,3 @@ CREATE TABLE submissions_cancelled metadata BLOB, cancelled_at DATETIME NOT NULL -- Unix Timestamp ); - diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index edc5b3b..e564b04 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -183,11 +183,27 @@ pub struct SubmissionFailed { pub otel_trace_carrier: String, } +/// A submission that has been cancelled. +/// +/// Once a submission is cancelled, it gets moved to the `submissions_cancelled` +/// table, and its old `submissions` record gets deleted. +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct SubmissionCancelled { + pub id: SubmissionId, + pub prefix: Option, + pub chunks_total: ChunkCount, + pub chunks_done: ChunkCount, + pub metadata: Option, + pub strategic_metadata: Option, + pub cancelled_at: DateTime, +} + #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub enum SubmissionStatus { InProgress(Submission), Completed(SubmissionCompleted), Failed(SubmissionFailed, ChunkFailed), + Cancelled(SubmissionCancelled), } impl Default for Submission { @@ -607,6 +623,39 @@ pub mod db { failed_chunk, ))); } + + let cancelled_row_opt = query!( + r#" + SELECT + id AS "id: SubmissionId" + , prefix + , chunks_total AS "chunks_total: ChunkCount" + , chunks_done AS "chunks_done: ChunkCount" + , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions_cancelled.id + ) AS "strategic_metadata: sqlx::types::Json" + , cancelled_at AS "cancelled_at: DateTime" + FROM submissions_cancelled WHERE id = $1 + "#, + id + ) + .fetch_optional(conn.get_inner()) + .await?; + if let Some(row) = cancelled_row_opt { + let cancelled_submission = SubmissionCancelled { + id: row.id, + prefix: row.prefix, + chunks_total: row.chunks_total, + chunks_done: row.chunks_done, + metadata: row.metadata, + strategic_metadata: row.strategic_metadata.map(|json| json.0), + cancelled_at: row.cancelled_at, + }; + return Ok(Some(SubmissionStatus::Cancelled(cancelled_submission))); + } + Ok(None) } @@ -640,9 +689,32 @@ pub mod db { mut conn: impl WriterConnection, ) -> sqlx::Result<()> { conn.transaction(move |mut tx| { - Box::pin( - async move { cancel_submission_notx(id, &mut tx).await }, - ) + Box::pin(async move { + match cancel_submission_notx(id, &mut tx).await { + Ok(()) => Ok(()), + Err(E::L(db_err)) => Err(E::L(db_err)), + Err(E::R(not_found_err)) => { + // Submission could not be found, let's check the status + // in order to return a more informative error. + match submission_status(id, &mut tx).await { + Ok(None) => Err(E::R(E::L(not_found_err))), + Ok(Some(SubmissionStatus::InProgress(submission))) => { + panic!("Failed to cancel in progress submission {:?}", submission) + } + Ok(Some(SubmissionStatus::Completed(submission))) => { + Err(E::R(E::R(SubmissionNotCancellable::Completed(submission)))) + } + Ok(Some(SubmissionStatus::Failed(submission, chunk))) => Err(E::R( + E::R(SubmissionNotCancellable::Failed(submission, chunk)), + )), + Ok(Some(SubmissionStatus::Cancelled(submission))) => { + Err(E::R(E::R(SubmissionNotCancellable::Cancelled(submission)))) + } + Err(db_err) => Err(E::L(db_err)), + } + } + } + }) }) .await } @@ -826,7 +898,7 @@ pub mod db { // Clean up old submissions_metadata query!( "DELETE FROM submissions_metadata - WHERE submission_id = ( + WHERE submission_id IN ( SELECT id FROM submissions_completed WHERE completed_at < julianday($1) );", older_than @@ -835,13 +907,25 @@ pub mod db { .await?; query!( "DELETE FROM submissions_metadata - WHERE submission_id = ( + WHERE submission_id IN ( SELECT id FROM submissions_failed WHERE failed_at < julianday($1) );", older_than ) .execute(tx.get_inner()) .await?; +<<<<<<< HEAD +======= + query!( + "DELETE FROM submissions_metadata + WHERE submission_id IN ( + SELECT id FROM submissions_cancelled WHERE cancelled_at < julianday($1) + );", + older_than + ) + .execute(tx.get_inner()) + .await?; +>>>>>>> a783c86 (fixup! Add SubmissionCancelled to SubmissionStatus) // Clean up old submissions: let n_submissions_completed = query!( diff --git a/opsqueue/src/producer/client.rs b/opsqueue/src/producer/client.rs index 2790949..e81fef5 100644 --- a/opsqueue/src/producer/client.rs +++ b/opsqueue/src/producer/client.rs @@ -102,7 +102,9 @@ impl Client { .retry(retry_policy()) .when(InternalProducerClientError::is_ephemeral) .notify(|err, dur| { - tracing::debug!("retrying error {err:?} with sleeping {dur:?}");}) .await + tracing::debug!("retrying error {err:?} with sleeping {dur:?}"); + }) + .await } /// TODO docstring @@ -112,15 +114,41 @@ impl Client { ) -> Result<(), InternalProducerClientError> { (|| async { let base_url = &self.base_url; - self - .http_client + self.http_client .post(format!("{base_url}/submissions/cancel/{submission_id}")) .send() - .await? - .error_for_status()?; - // let bytes = resp.bytes().await?; - // let body = serde_json::from_slice(&bytes)?; - Ok(()) + .await + .map_err(|e| E::R(E::R(e.into())))?; + let status = response.status(); + // 200, the submission was successfully cancelled. + if status.is_success() { + return Ok(()); + } + // 404, the submission could not be found. + if status == StatusCode::NOT_FOUND { + let not_found_err = response + .json::() + .await + .map_err(|e| E::R(E::R(e.into())))?; + return Err(E::<_, E<_, InternalProducerClientError>>::L(not_found_err)); + } + // 409, the submission could not be cancelled. + if status == StatusCode::CONFLICT { + let not_cancellable_err = response + .json::() + .await + .map_err(|e| E::R(E::R(e.into())))?; + return Err(E::<_, E<_, InternalProducerClientError>>::R(E::L( + not_cancellable_err, + ))); + } + response + .error_for_status() + .map_err(|e| E::R(E::R(e.into())))?; + panic!( + "Unexpected {:?} from Opsqueue when cancelling a submission", + status + ) }) .retry(retry_policy()) .when(InternalProducerClientError::is_ephemeral) diff --git a/opsqueue/src/prometheus.rs b/opsqueue/src/prometheus.rs index a74860c..2cda3c3 100644 --- a/opsqueue/src/prometheus.rs +++ b/opsqueue/src/prometheus.rs @@ -56,7 +56,14 @@ pub fn describe_metrics() { Unit::Count, "Number of submissions failed permanently" ); + describe_counter!( + SUBMISSIONS_CANCELLED_COUNTER, + Unit::Count, + "Number of submissions cancelled permanently" + ); describe_histogram!(SUBMISSIONS_DURATION_COMPLETE_HISTOGRAM, Unit::Seconds, "Time between a submission entering the system and its final chunk being completed. Does not count failed submissions."); + describe_histogram!(SUBMISSIONS_DURATION_FAIL_HISTOGRAM, Unit::Seconds, "Time between a submission entering the system and its first chunk being failed."); + describe_histogram!(SUBMISSIONS_DURATION_CANCEL_HISTOGRAM, Unit::Seconds, "Time between a submission entering the system and its first chunk being canceled."); describe_counter!( CHUNKS_COMPLETED_COUNTER, From 9b3f119ce49d70aa7ba3f8f2e9ca7d38a438b4c9 Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Thu, 26 Feb 2026 15:50:48 +0100 Subject: [PATCH 5/8] Cleanup cancelled submissions/chunks --- opsqueue/src/common/submission.rs | 14 +++++++++----- opsqueue/src/producer/client.rs | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index e564b04..5f4adff 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -914,8 +914,6 @@ pub mod db { ) .execute(tx.get_inner()) .await?; -<<<<<<< HEAD -======= query!( "DELETE FROM submissions_metadata WHERE submission_id IN ( @@ -925,7 +923,6 @@ pub mod db { ) .execute(tx.get_inner()) .await?; ->>>>>>> a783c86 (fixup! Add SubmissionCancelled to SubmissionStatus) // Clean up old submissions: let n_submissions_completed = query!( @@ -940,6 +937,12 @@ pub mod db { ) .execute(tx.get_inner()) .await?.rows_affected(); + let n_submissions_cancelled = query!( + "DELETE FROM submissions_cancelled WHERE cancelled_at < julianday($1);", + older_than + ) + .execute(tx.get_inner()) + .await?.rows_affected(); let n_chunks_completed = query!( "DELETE FROM chunks_completed WHERE completed_at < julianday($1);", @@ -954,8 +957,9 @@ pub mod db { .execute(tx.get_inner()) .await?.rows_affected(); - tracing::info!("Deleted {n_submissions_completed} completed submissions (with {n_chunks_completed} chunks)"); - tracing::info!("Deleted {n_submissions_failed} failed submissions (with {n_chunks_failed} chunks)"); + tracing::info!("Deleted {n_submissions_completed} completed submissions (with {n_chunks_completed} chunks completed)"); + tracing::info!("Deleted {n_submissions_failed} failed submissions (with {n_chunks_failed} chunks failed)"); + tracing::info!("Deleted {n_submissions_cancelled} cancelled submissions (with {n_chunks_completed} chunks completed and {n_chunks_failed} chunks failed)"); Ok(()) }) }) diff --git a/opsqueue/src/producer/client.rs b/opsqueue/src/producer/client.rs index e81fef5..df02b5b 100644 --- a/opsqueue/src/producer/client.rs +++ b/opsqueue/src/producer/client.rs @@ -394,7 +394,7 @@ mod tests { .expect("Should be OK") .expect("Should be Some"); match status { - SubmissionStatus::Completed(_) | SubmissionStatus::Failed(_, _) => { + SubmissionStatus::Completed(_) | SubmissionStatus::Failed(_, _) | SubmissionStatus::Cancelled(_) => { panic!("Expected a SubmissionStatus that is still Inprogress, got: {status:?}"); } SubmissionStatus::InProgress(submission) => { From 744cb952d4860f3110b02aa5a1915514cbe356b5 Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Thu, 26 Feb 2026 17:40:14 +0100 Subject: [PATCH 6/8] Error handling when cancelling submission --- .../python/opsqueue/exceptions.py | 45 +++++++- .../python/opsqueue/producer.py | 18 ++- libs/opsqueue_python/src/common.rs | 58 ++++++++-- libs/opsqueue_python/src/errors.rs | 22 +++- libs/opsqueue_python/src/lib.rs | 2 +- libs/opsqueue_python/src/producer.rs | 15 ++- libs/opsqueue_python/tests/test_roundtrip.py | 106 +++++++++++++++++- opsqueue/opsqueue_example_database_schema.db | Bin 102400 -> 102400 bytes opsqueue/src/common/errors.rs | 17 ++- opsqueue/src/common/submission.rs | 36 +++--- opsqueue/src/producer/client.rs | 86 ++++++++------ opsqueue/src/producer/server.rs | 25 ++++- opsqueue/src/prometheus.rs | 14 ++- 13 files changed, 367 insertions(+), 77 deletions(-) diff --git a/libs/opsqueue_python/python/opsqueue/exceptions.py b/libs/opsqueue_python/python/opsqueue/exceptions.py index 14603b3..97602fd 100644 --- a/libs/opsqueue_python/python/opsqueue/exceptions.py +++ b/libs/opsqueue_python/python/opsqueue/exceptions.py @@ -1,6 +1,7 @@ ## Expected errors: from . import opsqueue_internal +from typing import Optional class SubmissionFailedError(Exception): @@ -38,6 +39,35 @@ def __repr__(self) -> str: return str(self) +class SubmissionNotCancellableError(Exception): + __slots__ = ["submission", "chunk"] + """Raised when a submission could not be cancelled due to already being + completed, failed or cancelled. + + """ + + def __init__( + self, + submission: opsqueue_internal.SubmissionNotCancellable, + chunk: Optional[opsqueue_internal.ChunkFailed]=None, + ): + super().__init__() + self.submission = submission + self.chunk = chunk + + def __str__(self) -> str: + chunk_str = f"\n{self.chunk}" + return f""" + Submission {self.submission.submission.id} was not cancelled because: + + {self.submission} + {"" if self.chunk is None else chunk_str} + """ + + def __repr__(self) -> str: + return str(self) + + ## Usage errors: @@ -76,7 +106,20 @@ class SubmissionNotFoundError(IncorrectUsageError): but the submission doesn't exist within the Opsqueue. """ - pass + __slots = ["submission_id"] + + def __init__( + self, + submission_id: int, + ): + super().__init__() + self.submission_id = submission_id + + def __str__(self) -> str: + return f"Submission {self.submission_id} could not be found" + + def __repr__(self) -> str: + return str(self) class ChunkCountIsZeroError(IncorrectUsageError): diff --git a/libs/opsqueue_python/python/opsqueue/producer.py b/libs/opsqueue_python/python/opsqueue/producer.py index feee330..fb75fdf 100644 --- a/libs/opsqueue_python/python/opsqueue/producer.py +++ b/libs/opsqueue_python/python/opsqueue/producer.py @@ -14,13 +14,18 @@ ) from . import opsqueue_internal from . import tracing -from opsqueue.exceptions import SubmissionFailedError +from opsqueue.exceptions import ( + SubmissionFailedError, + SubmissionNotCancellableError, + SubmissionNotFoundError, +) from .opsqueue_internal import ( # type: ignore[import-not-found] SubmissionId, SubmissionStatus, SubmissionCompleted, SubmissionFailed, ChunkFailed, + SubmissionNotCancellable, ) __all__ = [ @@ -30,6 +35,9 @@ "SubmissionCompleted", "SubmissionFailedError", "SubmissionFailed", + "SubmissionNotCancellable", + "SubmissionNotCancellableError", + "SubmissionNotFoundError", "ChunkFailed", ] @@ -318,12 +326,14 @@ def cancel_submission(self, submission_id: SubmissionId) -> None: """ Cancel a specific submission that is in progress. - Returns None if the submission was successfully cancelled. + Returns None if the submission was succesfully cancelled. Raises: - - `InternalProducerClientError` if there is a low-level internal error. + - `SubmissionNotCancellableError` if the submission could not be + cancelled because it was already completed, failed or cancelled. + - `SubmissionNotFoundError` if the submission could not be found. """ - return self.inner.cancel_submission(submission_id) + self.inner.cancel_submission(submission_id) def get_submission_status( self, submission_id: SubmissionId diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index 1ccbdf5..4a09b9a 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -371,6 +371,13 @@ impl From for SubmissionStatus { } } +#[pymethods] +impl SubmissionStatus { + fn __repr__(&self) -> String { + format!("{self:?}") + } +} + #[pyclass(frozen, get_all, module = "opsqueue")] #[derive(Debug, Clone, PartialEq, Eq)] pub struct Submission { @@ -404,13 +411,6 @@ impl Submission { } } -#[pymethods] -impl SubmissionStatus { - fn __repr__(&self) -> String { - format!("{self:?}") - } -} - #[pymethods] impl SubmissionCompleted { fn __repr__(&self) -> String { @@ -473,6 +473,50 @@ pub struct SubmissionCancelled { pub cancelled_at: DateTime, } +/// Submission could not be cancelled because it was already completed, failed +/// or cancelled. +#[pyclass(frozen, module = "opsqueue")] +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum SubmissionNotCancellable { + Completed { + submission: SubmissionCompleted, + }, + Failed { + submission: SubmissionFailed, + chunk: ChunkFailed, + }, + Cancelled { + submission: SubmissionCancelled, + }, +} + +impl From for SubmissionNotCancellable { + fn from(value: opsqueue::common::errors::SubmissionNotCancellable) -> Self { + use opsqueue::common::errors::SubmissionNotCancellable::*; + match value { + Completed(s) => SubmissionNotCancellable::Completed { + submission: s.into(), + }, + Failed(s, c) => { + let chunk = ChunkFailed::from_internal(c, &s); + SubmissionNotCancellable::Failed { + submission: s.into(), chunk + } + }, + Cancelled(s) => SubmissionNotCancellable::Cancelled { + submission: s.into(), + }, + } + } +} + +#[pymethods] +impl SubmissionNotCancellable { + fn __repr__(&self) -> String { + format!("{self:?}") + } +} + pub async fn run_unless_interrupted( future: impl IntoFuture>, ) -> Result diff --git a/libs/opsqueue_python/src/errors.rs b/libs/opsqueue_python/src/errors.rs index 0c99f63..a3481d6 100644 --- a/libs/opsqueue_python/src/errors.rs +++ b/libs/opsqueue_python/src/errors.rs @@ -1,14 +1,16 @@ -/// NOTE: We defne the potentially raisable errors/exceptions in Python +/// NOTE: We define the potentially raisable errors/exceptions in Python /// so we have nice IDE support for docs-on-hover and for 'go to definition'. use std::error::Error; use opsqueue::common::chunk::ChunkId; use opsqueue::common::errors::{ - ChunkNotFound, IncorrectUsage, SubmissionNotFound, UnexpectedOpsqueueConsumerServerResponse, E, + ChunkNotFound, IncorrectUsage, SubmissionNotFound, SubmissionNotCancellable, + UnexpectedOpsqueueConsumerServerResponse, E, }; use pyo3::exceptions::PyBaseException; use pyo3::{import_exception, Bound, PyErr, Python}; +use crate::common; use crate::common::{ChunkIndex, SubmissionId}; // Expected errors: @@ -19,6 +21,7 @@ import_exception!(opsqueue.exceptions, IncorrectUsageError); import_exception!(opsqueue.exceptions, TryFromIntError); import_exception!(opsqueue.exceptions, ChunkNotFoundError); import_exception!(opsqueue.exceptions, SubmissionNotFoundError); +import_exception!(opsqueue.exceptions, SubmissionNotCancellableError); import_exception!(opsqueue.exceptions, NewObjectStoreClientError); import_exception!(opsqueue.exceptions, SubmissionNotCompletedYetError); @@ -123,10 +126,23 @@ impl From>> for PyErr { } } +impl From> for PyErr { + fn from(value: CError) -> Self { + let c: Option = match &value.0 { + opsqueue::common::errors::SubmissionNotCancellable::Failed(submission, chunk) => Some( + common::ChunkFailed::from_internal(chunk.clone(), submission), + ), + _ => None, + }; + let s: common::SubmissionNotCancellable = value.0.into(); + SubmissionNotCancellableError::new_err((s, c)) + } +} + impl From> for PyErr { fn from(value: CError) -> Self { let submission_id = value.0 .0; - SubmissionNotFoundError::new_err((value.0.to_string(), SubmissionId::from(submission_id))) + SubmissionNotFoundError::new_err(u64::from(submission_id)) } } diff --git a/libs/opsqueue_python/src/lib.rs b/libs/opsqueue_python/src/lib.rs index 2c07889..b5f804f 100644 --- a/libs/opsqueue_python/src/lib.rs +++ b/libs/opsqueue_python/src/lib.rs @@ -21,9 +21,9 @@ fn opsqueue_internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/libs/opsqueue_python/src/producer.rs b/libs/opsqueue_python/src/producer.rs index 97f7bc2..dc1a909 100644 --- a/libs/opsqueue_python/src/producer.rs +++ b/libs/opsqueue_python/src/producer.rs @@ -10,6 +10,7 @@ use pyo3::{ use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use opsqueue::{ common::errors::E::{self, L, R}, + common::errors::{SubmissionNotCancellable, SubmissionNotFound}, object_store::{ChunksStorageError, NewObjectStoreClientError}, producer::client::{Client as ActualClient, InternalProducerClientError}, }; @@ -118,12 +119,21 @@ impl ProducerClient { }) } - /// TODO docstring + /// Cancel a submission. + /// + /// Will return an error if the submission is already complete, failed, or + /// cancelled, or if the submission could not be found. pub fn cancel_submission( &self, py: Python<'_>, id: SubmissionId, - ) -> CPyResult<(), E> { + ) -> CPyResult< + (), + E< + FatalPythonException, + E>, + >, + > { py.allow_threads(|| { self.block_unless_interrupted(async { self.producer_client @@ -131,6 +141,7 @@ impl ProducerClient { .await .map_err(|e| CError(R(e))) }) + // TODO ? // .map(|opt| opt.map(Into::into)) // .map_err(|e| ProducerClientError::new_err(e.to_string())) }) diff --git a/libs/opsqueue_python/tests/test_roundtrip.py b/libs/opsqueue_python/tests/test_roundtrip.py index e13bcac..143f688 100644 --- a/libs/opsqueue_python/tests/test_roundtrip.py +++ b/libs/opsqueue_python/tests/test_roundtrip.py @@ -4,11 +4,16 @@ from collections.abc import Iterator, Sequence from opsqueue.producer import ( + SubmissionId, ProducerClient, SubmissionCompleted, SubmissionFailed, ChunkFailed, + SubmissionStatus, SubmissionFailedError, + SubmissionNotFoundError, + SubmissionNotCancellable, + SubmissionNotCancellableError, ) from opsqueue.consumer import ConsumerClient, Chunk from opsqueue.common import SerializationFormat @@ -302,7 +307,7 @@ def run_consumer() -> None: strategy = strategy_from_description(any_consumer_strategy) consumer_client.run_each_op(lambda x: x, strategy=strategy) - with background_process(run_consumer) as _consumer: + with background_process(run_consumer): # Wait for the submission to complete. producer_client.blocking_stream_completed_submission(submission_id) submission = producer_client.get_submission_status(submission_id) @@ -339,7 +344,7 @@ def consume(x: int) -> None: strategy = strategy_from_description(any_consumer_strategy) consumer_client.run_each_op(consume, strategy=strategy) - with background_process(run_consumer) as _consumer: + with background_process(run_consumer): def assert_submission_failed_has_metadata(x: SubmissionFailed) -> None: assert isinstance(x, SubmissionFailed) @@ -354,3 +359,100 @@ def assert_submission_failed_has_metadata(x: SubmissionFailed) -> None: submission = producer_client.get_submission_status(submission_id) assert submission is not None assert_submission_failed_has_metadata(submission.submission) + +def test_cancel_submission_not_found( + opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription +) -> None: + """Attempting to cancel a submission that doesn't exist raises a + SubmissionNotFoundError. + + """ + url = "file:///tmp/opsqueue/test_cancel_submission_not_found" + producer_client = ProducerClient(f"localhost:{opsqueue.port}", url) + submission_id = 0 + with pytest.raises(SubmissionNotFoundError) as exc_info: + producer_client.cancel_submission(SubmissionId(0)) + assert exc_info.value.submission_id == submission_id + +def test_cancel_in_progress_and_already_cancelled_submissions( + opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription +) -> None: + """Cancelling a submission that is in progress succeeds and returns none. + Attempting to cancel a submission that is already cancelled should raise a + SubmissionNotCancellableError. + + """ + + url = "file:///tmp/opsqueue/test_cancel_in_progress_and_already_cancelled_submissions" + producer_client = ProducerClient(f"localhost:{opsqueue.port}", url) + submission_id = producer_client.insert_submission((1, 2, 3), chunk_size=1) + status = producer_client.get_submission_status(submission_id) + # Sanity check submission is in progress before proceeding to cancel. + assert isinstance(status, SubmissionStatus.InProgress) + # Cancelling an in progress submission should change submission status to + # cancelled. + producer_client.cancel_submission(submission_id) + assert isinstance( + producer_client.get_submission_status(submission_id), SubmissionStatus.Cancelled + ) + # Cancelling an already cancelled submission should raise an exception. + with pytest.raises(SubmissionNotCancellableError) as exc_info: + producer_client.cancel_submission(submission_id) + assert isinstance(exc_info.value.submission, SubmissionNotCancellable.Cancelled) + +def test_cancel_complete_submission( + opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription +) -> None: + """Attempting to cancel a submission that has already completed should raise + a SubmissionNotCancellableError. + + """ + url = "file:///tmp/opsqueue/test_cancel_complete_submission" + producer_client = ProducerClient(f"localhost:{opsqueue.port}", url) + submission_id = producer_client.insert_submission((1, 2, 3), chunk_size=1) + + def run_consumer() -> None: + consumer_client = ConsumerClient(f"localhost:{opsqueue.port}", url) + strategy = strategy_from_description(any_consumer_strategy) + consumer_client.run_each_op(lambda x: x, strategy=strategy) + + with background_process(run_consumer): + # Wait for the submission to complete. + producer_client.blocking_stream_completed_submission(submission_id) + submission = producer_client.get_submission_status(submission_id) + assert submission is not None + assert isinstance(submission.submission, SubmissionCompleted) + # Cancelling the already completed submission should fail. + with pytest.raises(SubmissionNotCancellableError) as exc_info: + producer_client.cancel_submission(submission_id) + assert isinstance(exc_info.value.submission, SubmissionNotCancellable.Completed) + assert exc_info.value.chunk is None + +def test_cancel_failed_submission( + opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription +) -> None: + """Attempting to cancel a submission that has failed should raise a + SubmissionNotCancellableError. + + """ + url = "file:///tmp/opsqueue/test_cancel_failed_submission" + producer_client = ProducerClient(f"localhost:{opsqueue.port}", url) + submission_id = producer_client.insert_submission((1, 2, 3), chunk_size=1) + + def run_consumer() -> None: + consumer_client = ConsumerClient(f"localhost:{opsqueue.port}", url) + + def consume(x: int) -> None: + raise ValueError(f"Couldn't consume {x}") + + strategy = strategy_from_description(any_consumer_strategy) + consumer_client.run_each_op(consume, strategy=strategy) + + with background_process(run_consumer): + with pytest.raises(SubmissionFailedError): + producer_client.blocking_stream_completed_submission(submission_id) + # Cancelling the failed submission should fail. + with pytest.raises(SubmissionNotCancellableError) as exc_info: + producer_client.cancel_submission(submission_id) + assert isinstance(exc_info.value.submission, SubmissionNotCancellable.Failed) + assert isinstance(exc_info.value.chunk, ChunkFailed) diff --git a/opsqueue/opsqueue_example_database_schema.db b/opsqueue/opsqueue_example_database_schema.db index 63eb0a6fe6789cdac88cf616b5ddc15cf2a2dfa8..4d6ad39aac49256a3bb2c0107d27d7f13b0ed702 100644 GIT binary patch delta 262 zcmZozz}B#UZ9}b$t(k(Msg<#bm5Ev5$C$^j{P(^8eV*Ayg!g!mP}o1!!r8a2Kk9d9 zeqC^ir$SFs6EUp`uCu_-Sz{MhEjhTZA zr%vvd)q{&20g8FAT08ldtT|lFLe7}kCvEQJJUJt{*m9tlTm0wAcjdI>~*$$@&b8fxB;htVxDnR Ln-rEQFcts+r9o66 diff --git a/opsqueue/src/common/errors.rs b/opsqueue/src/common/errors.rs index 38ee007..f30436e 100644 --- a/opsqueue/src/common/errors.rs +++ b/opsqueue/src/common/errors.rs @@ -11,7 +11,10 @@ use thiserror::Error; use crate::consumer::common::SyncServerToClientResponse; -use super::{chunk::ChunkId, submission::SubmissionId}; +use super::{ + chunk::{ChunkFailed, ChunkId}, + submission::{SubmissionCancelled, SubmissionCompleted, SubmissionFailed, SubmissionId}, +}; // #[derive(Error, Debug, Clone, Serialize, Deserialize)] // #[error("Low-level database error: {0:?}")] @@ -32,10 +35,20 @@ impl From for E { #[error("Chunk not found for ID {0:?}")] pub struct ChunkNotFound(pub ChunkId); -#[derive(Error, Debug)] +#[derive(Error, Debug, Deserialize, Serialize)] #[error("Submission not found for ID {0:?}")] pub struct SubmissionNotFound(pub SubmissionId); +/// Submission could not be cancelled because it was already completed, failed +/// or cancelled. +#[derive(Error, Debug, Deserialize, Serialize)] +#[error("Submission could not be cancelled {0:?}")] +pub enum SubmissionNotCancellable { + Completed(SubmissionCompleted), + Failed(SubmissionFailed, ChunkFailed), + Cancelled(SubmissionCancelled), +} + #[derive(Error, Debug)] #[error("Unexpected opsqueue consumer server response. This indicates an error inside Opsqueue itself: {0:?}")] pub struct UnexpectedOpsqueueConsumerServerResponse(pub SyncServerToClientResponse); diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index 5f4adff..9979d10 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -260,7 +260,7 @@ impl Submission { pub mod db { use crate::{ common::{ - errors::{DatabaseError, SubmissionNotFound, E}, + errors::{DatabaseError, SubmissionNotCancellable, SubmissionNotFound, E}, StrategicMetadataMap, }, db::{Connection, True, WriterConnection, WriterPool}, @@ -687,15 +687,15 @@ pub mod db { pub async fn cancel_submission( id: SubmissionId, mut conn: impl WriterConnection, - ) -> sqlx::Result<()> { + ) -> Result<(), E>> { conn.transaction(move |mut tx| { Box::pin(async move { match cancel_submission_notx(id, &mut tx).await { Ok(()) => Ok(()), Err(E::L(db_err)) => Err(E::L(db_err)), Err(E::R(not_found_err)) => { - // Submission could not be found, let's check the status - // in order to return a more informative error. + // Submission was not found in the 'submissions' table, + // but it could still be in one of the other tables. match submission_status(id, &mut tx).await { Ok(None) => Err(E::R(E::L(not_found_err))), Ok(Some(SubmissionStatus::InProgress(submission))) => { @@ -723,7 +723,7 @@ pub mod db { pub async fn cancel_submission_notx( id: SubmissionId, mut conn: impl WriterConnection, - ) -> sqlx::Result<()> { + ) -> Result<(), E> { cancel_submission_raw(id, &mut conn).await?; super::chunk::db::skip_remaining_chunks(id, conn).await?; Ok(()) @@ -733,10 +733,10 @@ pub mod db { pub(super) async fn cancel_submission_raw( id: SubmissionId, mut conn: impl WriterConnection, - ) -> sqlx::Result<()> { + ) -> Result<(), E> { let now = chrono::prelude::Utc::now(); - query!( + let submission_opt = query!( " INSERT INTO submissions_cancelled (id, chunks_total, prefix, metadata, cancelled_at, chunks_done) @@ -748,14 +748,20 @@ pub mod db { id, id, ) - .fetch_one(conn.get_inner()) + .fetch_optional(conn.get_inner()) .await?; - counter!(crate::prometheus::SUBMISSIONS_CANCELLED_COUNTER).increment(1); - histogram!(crate::prometheus::SUBMISSIONS_DURATION_CANCEL_HISTOGRAM).record( - crate::prometheus::time_delta_as_f64(Utc::now() - id.timestamp()), - ); - - Ok(()) + match submission_opt { + None => { + histogram!(crate::prometheus::SUBMISSIONS_DURATION_CANCEL_HISTOGRAM).record( + crate::prometheus::time_delta_as_f64(Utc::now() - id.timestamp()), + ); + Err(E::R(SubmissionNotFound(id))) + } + Some(_) => { + counter!(crate::prometheus::SUBMISSIONS_CANCELLED_COUNTER).increment(1); + Ok(()) + } + } } #[tracing::instrument(skip(conn))] @@ -959,7 +965,7 @@ pub mod db { tracing::info!("Deleted {n_submissions_completed} completed submissions (with {n_chunks_completed} chunks completed)"); tracing::info!("Deleted {n_submissions_failed} failed submissions (with {n_chunks_failed} chunks failed)"); - tracing::info!("Deleted {n_submissions_cancelled} cancelled submissions (with {n_chunks_completed} chunks completed and {n_chunks_failed} chunks failed)"); + tracing::info!("Deleted {n_submissions_cancelled} cancelled submissions"); Ok(()) }) }) diff --git a/opsqueue/src/producer/client.rs b/opsqueue/src/producer/client.rs index df02b5b..8530fee 100644 --- a/opsqueue/src/producer/client.rs +++ b/opsqueue/src/producer/client.rs @@ -3,7 +3,10 @@ use std::time::Duration; use backon::BackoffBuilder; use backon::FibonacciBuilder; use backon::Retryable; +use http::StatusCode; +use crate::common::errors; +use crate::common::errors::E; use crate::common::submission::{SubmissionId, SubmissionStatus}; use crate::tracing::CarrierMap; @@ -107,51 +110,61 @@ impl Client { .await } - /// TODO docstring + /// Send a HTTP request to the OpsQueue server to cancel a submission. + /// + /// Will return an error if the submission is already complete, failed, or + /// cancelled, or if the submission could not be found. pub async fn cancel_submission( &self, submission_id: SubmissionId, - ) -> Result<(), InternalProducerClientError> { + ) -> Result< + (), + E< + errors::SubmissionNotFound, + E, + >, + > { (|| async { let base_url = &self.base_url; - self.http_client + let response = self + .http_client .post(format!("{base_url}/submissions/cancel/{submission_id}")) .send() .await .map_err(|e| E::R(E::R(e.into())))?; let status = response.status(); - // 200, the submission was successfully cancelled. - if status.is_success() { - return Ok(()); - } - // 404, the submission could not be found. - if status == StatusCode::NOT_FOUND { - let not_found_err = response - .json::() - .await - .map_err(|e| E::R(E::R(e.into())))?; - return Err(E::<_, E<_, InternalProducerClientError>>::L(not_found_err)); - } - // 409, the submission could not be cancelled. - if status == StatusCode::CONFLICT { - let not_cancellable_err = response - .json::() - .await - .map_err(|e| E::R(E::R(e.into())))?; - return Err(E::<_, E<_, InternalProducerClientError>>::R(E::L( - not_cancellable_err, - ))); + match status { + // 200, the submission was successfully cancelled. + StatusCode::OK => Ok(()), + // 404, the submission could not be found. + StatusCode::NOT_FOUND => { + let not_found_err = response + .json::() + .await + .map_err(|e| E::R(E::R(e.into())))?; + Err(E::<_, E<_, InternalProducerClientError>>::L(not_found_err)) + } + // 409, the submission could not be cancelled. + StatusCode::CONFLICT => { + let not_cancellable_err = response + .json::() + .await + .map_err(|e| E::R(E::R(e.into())))?; + Err(E::<_, E<_, InternalProducerClientError>>::R(E::L( + not_cancellable_err, + ))) + } + _ => Err(E::R(E::R(InternalProducerClientError::UnexpectedStatus( + status, + )))), } - response - .error_for_status() - .map_err(|e| E::R(E::R(e.into())))?; - panic!( - "Unexpected {:?} from Opsqueue when cancelling a submission", - status - ) }) .retry(retry_policy()) - .when(InternalProducerClientError::is_ephemeral) + .when(|e| match e { + E::L(_) => false, + E::R(E::L(_)) => false, + E::R(E::R(client_err)) => client_err.is_ephemeral(), + }) .notify(|err, dur| { tracing::debug!("retrying error {err:?} with sleeping {dur:?}"); }) @@ -236,11 +249,16 @@ pub enum InternalProducerClientError { HTTPClientError(#[from] reqwest::Error), #[error("Error decoding JSON response: {0}")] ResponseDecodingError(#[from] serde_json::Error), + #[error("Internal client received unexpected status: {0}")] + UnexpectedStatus(StatusCode), } impl InternalProducerClientError { pub fn is_ephemeral(&self) -> bool { match self { + // In the case of an unexpected status error, developer intervention + // will be required. + Self::UnexpectedStatus(_) => false, // In the case of an ungraceful restart, this case might theoretically trigger. // So even cleaner would be a tiny retry loop for this special case. // However, we certainly **do not** want to wait multiple minutes before returning. @@ -394,7 +412,9 @@ mod tests { .expect("Should be OK") .expect("Should be Some"); match status { - SubmissionStatus::Completed(_) | SubmissionStatus::Failed(_, _) | SubmissionStatus::Cancelled(_) => { + SubmissionStatus::Completed(_) + | SubmissionStatus::Failed(_, _) + | SubmissionStatus::Cancelled(_) => { panic!("Expected a SubmissionStatus that is still Inprogress, got: {status:?}"); } SubmissionStatus::InProgress(submission) => { diff --git a/opsqueue/src/producer/server.rs b/opsqueue/src/producer/server.rs index 053f31f..ad144e0 100644 --- a/opsqueue/src/producer/server.rs +++ b/opsqueue/src/producer/server.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use crate::common::errors::E; use crate::common::submission::{self, SubmissionId}; use crate::db::DBPools; use axum::extract::{Path, State}; @@ -89,13 +90,29 @@ where } } +// 200 if the submission was successfully cancelled. +// 404 if the submission could not be found. +// 409 if the submission could not be cancelled. +// 500 if a DatabaseError occurred. async fn cancel_submission( State(state): State, Path(submission_id): Path, -) -> Result<(), ServerError> { - let mut conn = state.pool.writer_conn().await?; - submission::db::cancel_submission(submission_id, &mut conn).await?; - Ok(()) +) -> Result<(), Response> { + let mut conn = state + .pool + .writer_conn() + .await + .map_err(|e| ServerError(e.into()).into_response())?; + match submission::db::cancel_submission(submission_id, &mut conn).await { + Ok(_) => Ok(()), + Err(E::L(db_err)) => Err(ServerError(db_err.into()).into_response()), + Err(E::R(E::L(not_found_err))) => { + Err((StatusCode::NOT_FOUND, Json(not_found_err)).into_response()) + } + Err(E::R(E::R(not_cancellable_err))) => { + Err((StatusCode::CONFLICT, Json(not_cancellable_err)).into_response()) + } + } } async fn submission_status( diff --git a/opsqueue/src/prometheus.rs b/opsqueue/src/prometheus.rs index 2cda3c3..0c0a3e5 100644 --- a/opsqueue/src/prometheus.rs +++ b/opsqueue/src/prometheus.rs @@ -17,7 +17,7 @@ use crate::db::DBPools; pub const SUBMISSIONS_TOTAL_COUNTER: &str = "submissions_total_count"; pub const SUBMISSIONS_COMPLETED_COUNTER: &str = "submissions_completed_count"; pub const SUBMISSIONS_FAILED_COUNTER: &str = "submissions_failed_count"; -pub const SUBMISSIONS_CANCELLED_COUNTER: &str = "submissions_failed_count"; +pub const SUBMISSIONS_CANCELLED_COUNTER: &str = "submissions_cancelled_count"; pub const SUBMISSIONS_DURATION_COMPLETE_HISTOGRAM: &str = "submissions_complete_duration_seconds"; pub const SUBMISSIONS_DURATION_FAIL_HISTOGRAM: &str = "submissions_fail_duration_seconds"; pub const SUBMISSIONS_DURATION_CANCEL_HISTOGRAM: &str = "submissions_cancel_duration_seconds"; @@ -62,8 +62,16 @@ pub fn describe_metrics() { "Number of submissions cancelled permanently" ); describe_histogram!(SUBMISSIONS_DURATION_COMPLETE_HISTOGRAM, Unit::Seconds, "Time between a submission entering the system and its final chunk being completed. Does not count failed submissions."); - describe_histogram!(SUBMISSIONS_DURATION_FAIL_HISTOGRAM, Unit::Seconds, "Time between a submission entering the system and its first chunk being failed."); - describe_histogram!(SUBMISSIONS_DURATION_CANCEL_HISTOGRAM, Unit::Seconds, "Time between a submission entering the system and its first chunk being canceled."); + describe_histogram!( + SUBMISSIONS_DURATION_FAIL_HISTOGRAM, + Unit::Seconds, + "Time between a submission entering the system and its first chunk being failed." + ); + describe_histogram!( + SUBMISSIONS_DURATION_CANCEL_HISTOGRAM, + Unit::Seconds, + "Time between a submission entering the system and its first chunk being canceled." + ); describe_counter!( CHUNKS_COMPLETED_COUNTER, From 7bb5cf129d64c998f96f5240cc3c32b4323d40c4 Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Fri, 27 Feb 2026 15:35:01 +0100 Subject: [PATCH 7/8] Bump version to 0.35.0 --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- libs/opsqueue_python/python/opsqueue/exceptions.py | 2 +- libs/opsqueue_python/src/common.rs | 5 +++-- libs/opsqueue_python/src/errors.rs | 2 +- libs/opsqueue_python/tests/test_roundtrip.py | 8 +++++++- 6 files changed, 15 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3e5d698..9c5a8ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2085,7 +2085,7 @@ dependencies = [ [[package]] name = "opsqueue" -version = "0.34.0" +version = "0.35.0" dependencies = [ "anyhow", "arc-swap", @@ -2139,7 +2139,7 @@ dependencies = [ [[package]] name = "opsqueue_python" -version = "0.34.0" +version = "0.35.0" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 061bc81..804d2cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ members = [ ] [workspace.package] -version = "0.34.0" +version = "0.35.0" [workspace.lints.clippy] diff --git a/libs/opsqueue_python/python/opsqueue/exceptions.py b/libs/opsqueue_python/python/opsqueue/exceptions.py index 97602fd..f54c46b 100644 --- a/libs/opsqueue_python/python/opsqueue/exceptions.py +++ b/libs/opsqueue_python/python/opsqueue/exceptions.py @@ -49,7 +49,7 @@ class SubmissionNotCancellableError(Exception): def __init__( self, submission: opsqueue_internal.SubmissionNotCancellable, - chunk: Optional[opsqueue_internal.ChunkFailed]=None, + chunk: Optional[opsqueue_internal.ChunkFailed] = None, ): super().__init__() self.submission = submission diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index 4a09b9a..5a1dabb 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -500,9 +500,10 @@ impl From for SubmissionNotC Failed(s, c) => { let chunk = ChunkFailed::from_internal(c, &s); SubmissionNotCancellable::Failed { - submission: s.into(), chunk + submission: s.into(), + chunk, } - }, + } Cancelled(s) => SubmissionNotCancellable::Cancelled { submission: s.into(), }, diff --git a/libs/opsqueue_python/src/errors.rs b/libs/opsqueue_python/src/errors.rs index a3481d6..f0c6850 100644 --- a/libs/opsqueue_python/src/errors.rs +++ b/libs/opsqueue_python/src/errors.rs @@ -4,7 +4,7 @@ use std::error::Error; use opsqueue::common::chunk::ChunkId; use opsqueue::common::errors::{ - ChunkNotFound, IncorrectUsage, SubmissionNotFound, SubmissionNotCancellable, + ChunkNotFound, IncorrectUsage, SubmissionNotCancellable, SubmissionNotFound, UnexpectedOpsqueueConsumerServerResponse, E, }; use pyo3::exceptions::PyBaseException; diff --git a/libs/opsqueue_python/tests/test_roundtrip.py b/libs/opsqueue_python/tests/test_roundtrip.py index 143f688..a00ab17 100644 --- a/libs/opsqueue_python/tests/test_roundtrip.py +++ b/libs/opsqueue_python/tests/test_roundtrip.py @@ -360,6 +360,7 @@ def assert_submission_failed_has_metadata(x: SubmissionFailed) -> None: assert submission is not None assert_submission_failed_has_metadata(submission.submission) + def test_cancel_submission_not_found( opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription ) -> None: @@ -374,6 +375,7 @@ def test_cancel_submission_not_found( producer_client.cancel_submission(SubmissionId(0)) assert exc_info.value.submission_id == submission_id + def test_cancel_in_progress_and_already_cancelled_submissions( opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription ) -> None: @@ -383,7 +385,9 @@ def test_cancel_in_progress_and_already_cancelled_submissions( """ - url = "file:///tmp/opsqueue/test_cancel_in_progress_and_already_cancelled_submissions" + url = ( + "file:///tmp/opsqueue/test_cancel_in_progress_and_already_cancelled_submissions" + ) producer_client = ProducerClient(f"localhost:{opsqueue.port}", url) submission_id = producer_client.insert_submission((1, 2, 3), chunk_size=1) status = producer_client.get_submission_status(submission_id) @@ -400,6 +404,7 @@ def test_cancel_in_progress_and_already_cancelled_submissions( producer_client.cancel_submission(submission_id) assert isinstance(exc_info.value.submission, SubmissionNotCancellable.Cancelled) + def test_cancel_complete_submission( opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription ) -> None: @@ -428,6 +433,7 @@ def run_consumer() -> None: assert isinstance(exc_info.value.submission, SubmissionNotCancellable.Completed) assert exc_info.value.chunk is None + def test_cancel_failed_submission( opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription ) -> None: From 0977184bab5c580eeec2bc3ac12b3938b49948d3 Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Tue, 17 Mar 2026 16:46:01 +0100 Subject: [PATCH 8/8] fixup! Error handling when cancelling submission PR cleanup --- libs/opsqueue_python/python/opsqueue/exceptions.py | 5 +++-- libs/opsqueue_python/python/opsqueue/producer.py | 2 +- opsqueue/src/common/submission.rs | 8 +++----- opsqueue/src/prometheus.rs | 2 +- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/libs/opsqueue_python/python/opsqueue/exceptions.py b/libs/opsqueue_python/python/opsqueue/exceptions.py index f54c46b..e1ae2bb 100644 --- a/libs/opsqueue_python/python/opsqueue/exceptions.py +++ b/libs/opsqueue_python/python/opsqueue/exceptions.py @@ -40,12 +40,13 @@ def __repr__(self) -> str: class SubmissionNotCancellableError(Exception): - __slots__ = ["submission", "chunk"] """Raised when a submission could not be cancelled due to already being completed, failed or cancelled. """ + __slots__ = ["submission", "chunk"] + def __init__( self, submission: opsqueue_internal.SubmissionNotCancellable, @@ -106,7 +107,7 @@ class SubmissionNotFoundError(IncorrectUsageError): but the submission doesn't exist within the Opsqueue. """ - __slots = ["submission_id"] + __slots__ = ["submission_id"] def __init__( self, diff --git a/libs/opsqueue_python/python/opsqueue/producer.py b/libs/opsqueue_python/python/opsqueue/producer.py index fb75fdf..5c6a7f0 100644 --- a/libs/opsqueue_python/python/opsqueue/producer.py +++ b/libs/opsqueue_python/python/opsqueue/producer.py @@ -326,7 +326,7 @@ def cancel_submission(self, submission_id: SubmissionId) -> None: """ Cancel a specific submission that is in progress. - Returns None if the submission was succesfully cancelled. + Returns None if the submission was successfully cancelled. Raises: - `SubmissionNotCancellableError` if the submission could not be diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index 9979d10..7c5986d 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -751,14 +751,12 @@ pub mod db { .fetch_optional(conn.get_inner()) .await?; match submission_opt { - None => { + None => Err(E::R(SubmissionNotFound(id))), + Some(_) => { + counter!(crate::prometheus::SUBMISSIONS_CANCELLED_COUNTER).increment(1); histogram!(crate::prometheus::SUBMISSIONS_DURATION_CANCEL_HISTOGRAM).record( crate::prometheus::time_delta_as_f64(Utc::now() - id.timestamp()), ); - Err(E::R(SubmissionNotFound(id))) - } - Some(_) => { - counter!(crate::prometheus::SUBMISSIONS_CANCELLED_COUNTER).increment(1); Ok(()) } } diff --git a/opsqueue/src/prometheus.rs b/opsqueue/src/prometheus.rs index 0c0a3e5..fc49359 100644 --- a/opsqueue/src/prometheus.rs +++ b/opsqueue/src/prometheus.rs @@ -70,7 +70,7 @@ pub fn describe_metrics() { describe_histogram!( SUBMISSIONS_DURATION_CANCEL_HISTOGRAM, Unit::Seconds, - "Time between a submission entering the system and its first chunk being canceled." + "Time between a submission entering the system and it being cancelled." ); describe_counter!(