diff --git a/CHANGELOG.md b/CHANGELOG.md index 7897fab72..ff3e3b242 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ - [BREAKING] Removed `--wallet-filepath` / `--counter-filepath` flags and the `MIDEN_MONITOR_WALLET_FILEPATH` / `MIDEN_MONITOR_COUNTER_FILEPATH` env vars from the network monitor. The monitor now keeps wallet and counter accounts fully in memory and regenerates them on every startup; the dashboard's counter value resets to zero on restart. - Added `--counter-pending-unhealthy-threshold` (env `MIDEN_MONITOR_COUNTER_PENDING_UNHEALTHY_THRESHOLD`, default `5`) to the network monitor: the Network Transactions card now flips unhealthy when the gap between expected and observed counter values stays above the threshold for three consecutive polls. - Allowed network transaction submission conditionally via the gRPC `SubmitProvenTx` and `SubmitProvenTxBatch` endpoints: the NTX builder can now send a key in the `x-miden-network-tx-auth` header that enables submitting network transactions ([#2131](https://github.com/0xMiden/node/issues/2131)). +- Added `--tx-expiration-delta` (env `MIDEN_NODE_NTX_BUILDER_TX_EXPIRATION_DELTA`, default `30`) to the network transaction builder: submitted network transactions now expire on-chain after this many blocks, and the builder reuses the same delta as the local window before resubmitting a transaction that has not landed ([#2148](https://github.com/0xMiden/node/pull/2148)). ## v0.14.11 (TBD) diff --git a/bin/ntx-builder/src/actor/execute.rs b/bin/ntx-builder/src/actor/execute.rs index 063cd19f5..f4b858908 100644 --- a/bin/ntx-builder/src/actor/execute.rs +++ b/bin/ntx-builder/src/actor/execute.rs @@ -32,6 +32,7 @@ use miden_protocol::transaction::{ TransactionArgs, TransactionId, TransactionInputs, + TransactionScript, }; use miden_protocol::vm::FutureMaybeSend; use miden_remote_prover_client::RemoteTransactionProver; @@ -153,18 +154,27 @@ pub struct NtxContext { /// Maximum number of VM execution cycles for network transactions. max_cycles: u32, + /// Pre-compiled transaction script that sets the network tx's on-chain expiration delta. Cloned + /// into the [`TransactionArgs`] of the executed transaction. + expiration_script: TransactionScript, + /// [`ExponentialBuilder`] used to back off retries on transient request failures. request_backoff: ExponentialBuilder, } impl NtxContext { /// Creates a new [`NtxContext`] instance. + #[expect( + clippy::too_many_arguments, + reason = "execution context aggregates actor resources" + )] pub fn new( prover: Option, rpc: RpcClient, script_cache: LruCache, db: Db, max_cycles: u32, + expiration_script: TransactionScript, request_backoff_initial: Duration, request_backoff_max: Duration, ) -> Self { @@ -175,6 +185,7 @@ impl NtxContext { script_cache, db, max_cycles, + expiration_script, request_backoff, } } @@ -369,11 +380,15 @@ impl NtxContext { ) -> NtxResult { let executor = self.create_executor(data_store); + // Attach the pre-compiled expiration script so the submitted tx is rejected on-chain if it + // does not land within the configured block delta. + let tx_args = TransactionArgs::default().with_tx_script(self.expiration_script.clone()); + Box::pin(executor.execute_transaction( data_store.account.id(), data_store.reference_block.block_num(), notes, - TransactionArgs::default(), + tx_args, )) .await .map_err(NtxError::Execution) diff --git a/bin/ntx-builder/src/actor/mod.rs b/bin/ntx-builder/src/actor/mod.rs index affc33214..98a03bca3 100644 --- a/bin/ntx-builder/src/actor/mod.rs +++ b/bin/ntx-builder/src/actor/mod.rs @@ -9,14 +9,16 @@ use std::time::Duration; use allowlist::{NoteScriptNotAllowlisted, partition_by_allowlist}; use anyhow::Context; use candidate::TransactionCandidate; +use futures::FutureExt; use miden_node_utils::ErrorReport; use miden_node_utils::lru_cache::LruCache; use miden_protocol::Word; use miden_protocol::account::AccountId; use miden_protocol::block::BlockNumber; use miden_protocol::note::{NoteScript, Nullifier}; -use miden_protocol::transaction::TransactionId; +use miden_protocol::transaction::TransactionScript; use miden_remote_prover_client::RemoteTransactionProver; +use miden_standards::code_builder::CodeBuilder; use miden_tx::FailedNote; use tokio::sync::{Notify, Semaphore, mpsc}; @@ -25,6 +27,24 @@ use crate::chain_state::{ChainState, SharedChainState}; use crate::clients::RpcClient; use crate::db::Db; +/// Compiles the standalone transaction script that sets the on-chain expiration of a network +/// transaction to `delta` blocks. The script is account-independent, so the builder compiles it +/// once at startup and shares the resulting [`TransactionScript`] across all actors. +/// +/// ```masm +/// begin +/// push.{delta} exec.::miden::protocol::tx::update_expiration_block_delta +/// end +/// ``` +pub(crate) fn expiration_tx_script(delta: u16) -> anyhow::Result { + let source = format!( + "begin\n push.{delta} exec.::miden::protocol::tx::update_expiration_block_delta\nend" + ); + CodeBuilder::new() + .compile_tx_script(source) + .context("failed to compile network-tx expiration script") +} + // ACTOR REQUESTS // ================================================================================================ @@ -66,6 +86,9 @@ pub struct State { pub chain: Arc, /// Shared LRU cache for storing retrieved note scripts to avoid repeated RPC calls. pub script_cache: LruCache, + /// Pre-compiled transaction script that sets each network tx's on-chain expiration delta. + /// Shared into every executed transaction. + pub expiration_script: TransactionScript, } /// Per-actor configuration knobs. @@ -79,6 +102,9 @@ pub struct ActorConfig { pub idle_timeout: Duration, /// Maximum number of VM execution cycles for network transactions. pub max_cycles: u32, + /// Number of blocks after which a submitted transaction expires. Set as the on-chain expiration + /// delta and reused as the `WaitForBlock` retry timeout. + pub tx_expiration_delta: u16, /// Initial sleep applied between per-request retries on transient infrastructure failures /// (prover unreachable, RPC transport error, RPC gRPC hiccup). Doubles each retry up to /// [`Self::request_backoff_max`]. @@ -135,12 +161,15 @@ impl AccountActorContext { db: db.clone(), chain: chain_state, script_cache: LruCache::new(NonZeroUsize::new(1).unwrap()), + expiration_script: expiration_tx_script(30) + .expect("expiration script should compile"), }, config: ActorConfig { max_notes_per_tx: NonZeroUsize::new(1).unwrap(), max_note_attempts: 1, idle_timeout: Duration::from_secs(60), max_cycles: 1 << 18, + tx_expiration_delta: 30, request_backoff_initial: Duration::from_millis(1), request_backoff_max: Duration::from_millis(10), }, @@ -155,9 +184,22 @@ impl AccountActorContext { /// The mode of operation that the account actor is currently performing. #[derive(Debug)] enum ActorMode { + /// No notes targeting this account are currently available. The actor sleeps on the idle + /// timeout and awaits a coordinator notification to re-evaluate. NoViableNotes, + /// Notes are available for consumption. The actor acquires a transaction permit and submits a + /// candidate. NotesAvailable, - TransactionInflight(TransactionId), + /// A network transaction has been submitted; the actor waits for it to land in a committed + /// block. Landing is detected from the local DB: `apply_committed_block` marks each consumed + /// nullifier with `committed_at` so no RPC roundtrip is needed. + WaitForBlock { + /// Nullifiers of the network notes consumed by the submitted transaction. + submitted_nullifiers: Vec, + /// Chain tip block number at submission. With [`ActorConfig::tx_expiration_delta`] this + /// bounds how long the actor waits before retrying. + submitted_at: BlockNumber, + }, } // ACCOUNT ACTOR @@ -231,28 +273,63 @@ impl AccountActor { /// The return value signals the shutdown category to the coordinator: /// /// - `Ok(())`: intentional shutdown (idle timeout or account not committed in time). - /// - `Err(_)`: crash (database error or any other bug). - pub async fn run(self, _semaphore: Arc) -> anyhow::Result<()> { + /// - `Err(_)`: crash (database error, semaphore failure, or any other bug). + pub async fn run(self, semaphore: Arc) -> anyhow::Result<()> { let account_id = self.account_id; // Wait for the account to be committed to the DB. For newly created accounts, the creation - // transaction must be committed before the actor becomes active. + // transaction must be committed before we start processing notes. if !self.wait_for_committed_account(account_id).await? { return Ok(()); } + // Determine initial mode by checking the DB for available notes. + let block_num = self.state.chain.chain_tip_block_number(); + let has_notes = self + .state + .db + .has_available_notes(account_id, block_num, self.config.max_note_attempts) + .await + .context("failed to check for available notes")?; + let mut mode = if has_notes { + ActorMode::NotesAvailable + } else { + ActorMode::NoViableNotes + }; + loop { + // Acquire an execution permit only when there are notes to process. + let tx_permit_acquisition = match mode { + ActorMode::NoViableNotes | ActorMode::WaitForBlock { .. } => { + std::future::pending().boxed() + }, + ActorMode::NotesAvailable => semaphore.acquire().boxed(), + }; + + // The idle timer only ticks while there is nothing to do. + let idle_timeout_sleep = match mode { + ActorMode::NoViableNotes => tokio::time::sleep(self.config.idle_timeout).boxed(), + _ => std::future::pending().boxed(), + }; + tokio::select! { - // A committed block touched this account (or the coordinator woke everyone). PR 3 - // reconnects transaction execution here. + // A committed block touched this account (or the coordinator woke everyone). _ = self.notify.notified() => { - tracing::debug!( - %account_id, - "actor notified; transaction execution reconnects in PR 3", - ); + mode = self.reevaluate_mode(account_id, mode).await?; + }, + // Execute a transaction once a permit is available. + permit = tx_permit_acquisition => { + let _permit = permit.context("semaphore closed")?; + let chain_state = self.state.chain.get_cloned(); + let tx_candidate = + self.select_candidate_from_db(account_id, chain_state).await?; + mode = match tx_candidate { + Some(candidate) => self.execute_transactions(account_id, candidate).await, + None => ActorMode::NoViableNotes, + }; } // Idle timeout: actor has been idle too long, deactivate. - () = tokio::time::sleep(self.config.idle_timeout) => { + () = idle_timeout_sleep => { tracing::info!(%account_id, "Account actor deactivated due to idle timeout"); return Ok(()); } @@ -260,6 +337,51 @@ impl AccountActor { } } + /// Decides the actor's next mode after a coordinator notification. + /// + /// - In `NoViableNotes`/`NotesAvailable`, a wake means the DB may now have new work; advance to + /// `NotesAvailable` and let the next `select_candidate` decide whether a real candidate + /// exists. + /// - In `WaitForBlock`, query whether the submitted transaction's nullifiers have all been + /// consumed by a committed block (the tx landed). If so, return to `NotesAvailable`. Else, if + /// `tx_expiration_delta` blocks have passed since submission, give up waiting and resume + /// candidate selection; otherwise stay in `WaitForBlock`. + async fn reevaluate_mode( + &self, + account_id: AccountId, + mode: ActorMode, + ) -> anyhow::Result { + match mode { + ActorMode::WaitForBlock { submitted_nullifiers, submitted_at } => { + let landed = self + .state + .db + .submitted_tx_landed(account_id, submitted_nullifiers.clone()) + .await + .context("failed to check submitted tx landing")?; + if landed { + return Ok(ActorMode::NotesAvailable); + } + + let chain_tip = self.state.chain.chain_tip_block_number(); + let elapsed = chain_tip.checked_sub(submitted_at.as_u32()).unwrap_or_default(); + if elapsed.as_u32() >= u32::from(self.config.tx_expiration_delta) { + tracing::info!( + %account_id, + %submitted_at, + current_tip = %chain_tip, + delta = self.config.tx_expiration_delta, + "submitted tx not landed within expiration delta; retrying", + ); + return Ok(ActorMode::NotesAvailable); + } + + Ok(ActorMode::WaitForBlock { submitted_nullifiers, submitted_at }) + }, + _ => Ok(ActorMode::NotesAvailable), + } + } + /// Selects a transaction candidate by querying the DB. async fn select_candidate_from_db( &self, @@ -382,6 +504,7 @@ impl AccountActor { self.state.script_cache.clone(), self.state.db.clone(), self.config.max_cycles, + self.state.expiration_script.clone(), self.config.request_backoff_initial, self.config.request_backoff_max, ); @@ -406,11 +529,31 @@ impl AccountActor { "network transaction executed with some failed notes", ); self.cache_note_scripts(scripts_to_cache).await; + + // The nullifiers that actually went into the submitted tx are the candidate notes + // minus those rejected during consumability filtering. + let failed_nullifiers: std::collections::HashSet = + failed.iter().map(|f| f.note().nullifier()).collect(); + let submitted_nullifiers: Vec = notes + .iter() + .map(|n| n.as_note().nullifier()) + .filter(|nullifier| !failed_nullifiers.contains(nullifier)) + .collect(); + if !failed.is_empty() { let failed_notes = log_failed_notes(failed); self.mark_notes_failed(&failed_notes, block_num).await; } - ActorMode::TransactionInflight(tx_id) + + if submitted_nullifiers.is_empty() { + // Every input note was filtered out before submission. + ActorMode::NoViableNotes + } else { + ActorMode::WaitForBlock { + submitted_nullifiers, + submitted_at: block_num, + } + } }, // Transaction execution failed. Err(err) => { @@ -508,123 +651,18 @@ fn log_failed_notes(failed: Vec) -> Vec<(Nullifier, NoteError)> { #[cfg(test)] mod tests { - #[tokio::test] - #[ignore = "wip refactor"] - async fn select_candidate_keeps_allowlisted_notes() { - // let (db, _dir) = Db::test_setup().await; - // let account_id = mock_network_account_id(); - // let note = mock_single_target_note(account_id, 10); - // let account = mock_account_with_auth_component( - // AuthNetworkAccount::with_allowlist(BTreeSet::from_iter([note - // .as_note() - // .script() - // .root()])) - // .expect("non-empty allowlist should construct"), - // ); - - // db.sync_account_from_store(account_id, account, vec![note.clone()]) - // .await - // .expect("fixtures should sync"); - - // let (actor, context) = actor_with_request_handler(&db, account_id); let chain_state = - // context.state.chain.get_cloned(); - - // let candidate = actor - // .select_candidate_from_db(account_id, chain_state) - // .await - // .expect("selection should succeed") - // .expect("allowed note should produce a candidate"); - - // assert_eq!(candidate.notes.len(), 1); - // assert_eq!(candidate.notes[0].as_note().nullifier(), note.as_note().nullifier()); - } - - #[tokio::test] - #[ignore = "wip refactor"] - async fn select_candidate_marks_non_allowlisted_notes_failed() { - // let (db, _dir) = Db::test_setup().await; - // let account_id = mock_network_account_id(); - // let allowed_note = mock_single_target_note(account_id, 10); - // let rejected_note = - // mock_single_target_note_with_code(account_id, 20, Some(OTHER_NOTE_SCRIPT)); - // let account = mock_account_with_auth_component( - // AuthNetworkAccount::with_allowlist(BTreeSet::from_iter([allowed_note - // .as_note() - // .script() - // .root()])) - // .expect("non-empty allowlist should construct"), - // ); - - // db.sync_account_from_store(account_id, account, vec![rejected_note.clone()]) - // .await - // .expect("fixtures should sync"); - - // let (actor, context) = actor_with_request_handler(&db, account_id); let chain_state = - // context.state.chain.get_cloned(); - - // let candidate = actor - // .select_candidate_from_db(account_id, chain_state) - // .await - // .expect("selection should succeed"); - - // assert!(candidate.is_none()); - - // let status = db - // .get_note_status(rejected_note.as_note().id()) - // .await - // .expect("status query should succeed") - // .expect("note should exist"); - // assert_eq!(status.attempt_count, 1); - // assert!( - // status - // .last_error - // .as_deref() - // .expect("rejected note should record an error") - // .contains("not allowlisted") - // ); - } - - #[tokio::test] - #[ignore = "wip refactor"] - async fn select_candidate_executes_allowed_notes_and_marks_rejected_notes_failed() { - // let (db, _dir) = Db::test_setup().await; - // let account_id = mock_network_account_id(); - // let allowed_note = mock_single_target_note(account_id, 10); - // let rejected_note = - // mock_single_target_note_with_code(account_id, 20, Some(OTHER_NOTE_SCRIPT)); - // let account = mock_account_with_auth_component( - // AuthNetworkAccount::with_allowlist(BTreeSet::from_iter([allowed_note - // .as_note() - // .script() - // .root()])) - // .expect("non-empty allowlist should construct"), - // ); - - // db.sync_account_from_store( - // account_id, - // account, - // vec![allowed_note.clone(), rejected_note.clone()], - // ) - // .await - // .expect("fixtures should sync"); - - // let (actor, context) = actor_with_request_handler(&db, account_id); let chain_state = - // context.state.chain.get_cloned(); - - // let candidate = actor - // .select_candidate_from_db(account_id, chain_state) - // .await - // .expect("selection should succeed") - // .expect("allowed note should remain"); - - // assert_eq!(candidate.notes.len(), 1); - // assert_eq!(candidate.notes[0].as_note().nullifier(), allowed_note.as_note().nullifier()); - - // let rejected_status = db - // .get_note_status(rejected_note.as_note().id()) - // .await - // .expect("status query should succeed") - // .expect("rejected note should exist"); - // assert_eq!(rejected_status.attempt_count, 1); + use super::expiration_tx_script; + + /// The expiration script must compile for the full valid delta range, and the delta must be + /// baked into the script (distinct deltas → distinct script roots), proving the on-chain + /// expiration value is actually carried rather than ignored. + #[test] + fn expiration_script_compiles_and_encodes_delta() { + let one = expiration_tx_script(1).expect("delta 1 should compile"); + let thirty = expiration_tx_script(30).expect("delta 30 should compile"); + let max = expiration_tx_script(u16::MAX).expect("delta u16::MAX should compile"); + + assert_ne!(one.root(), thirty.root(), "distinct deltas must yield distinct scripts"); + assert_ne!(thirty.root(), max.root(), "distinct deltas must yield distinct scripts"); } } diff --git a/bin/ntx-builder/src/chain_state.rs b/bin/ntx-builder/src/chain_state.rs index 49025c0a2..68608164e 100644 --- a/bin/ntx-builder/src/chain_state.rs +++ b/bin/ntx-builder/src/chain_state.rs @@ -89,8 +89,6 @@ impl SharedChainState { Self(RwLock::new(ChainState::new(chain_tip_header, chain_mmr))) } - // Read by the actor execution path, which is unwired until PR 3. - #[expect(dead_code)] pub(crate) fn chain_tip_block_number(&self) -> BlockNumber { self.0.read().expect("chain state lock poisoned").chain_tip_header.block_num() } @@ -108,8 +106,6 @@ impl SharedChainState { .update_chain_tip(tip, max_block_count); } - // Read by the actor execution path (candidate selection), which is unwired until PR 3. - #[expect(dead_code)] pub(crate) fn get_cloned(&self) -> ChainState { self.0.read().expect("chain state lock poisoned").clone() } diff --git a/bin/ntx-builder/src/commands/mod.rs b/bin/ntx-builder/src/commands/mod.rs index a6ff76d58..e808efec1 100644 --- a/bin/ntx-builder/src/commands/mod.rs +++ b/bin/ntx-builder/src/commands/mod.rs @@ -18,11 +18,13 @@ const ENV_RPC_AUTH_HEADER_VALUE: &str = "MIDEN_NODE_NTX_BUILDER_RPC_AUTH_HEADER_ const ENV_TX_PROVER_URL: &str = "MIDEN_NODE_NTX_BUILDER_NTX_PROVER_URL"; const ENV_SCRIPT_CACHE_SIZE: &str = "MIDEN_NODE_NTX_BUILDER_SCRIPT_CACHE_SIZE"; const ENV_MAX_CYCLES: &str = "MIDEN_NODE_NTX_BUILDER_MAX_CYCLES"; +const ENV_TX_EXPIRATION_DELTA: &str = "MIDEN_NODE_NTX_BUILDER_TX_EXPIRATION_DELTA"; const ENV_SQLITE_CONNECTION_POOL_SIZE: &str = "MIDEN_NODE_NTX_BUILDER_SQLITE_CONNECTION_POOL_SIZE"; const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(5 * 60); const DEFAULT_SCRIPT_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).unwrap(); const DEFAULT_MAX_CYCLES: u32 = 1 << 18; +const DEFAULT_TX_EXPIRATION_DELTA: u16 = 30; #[derive(Parser)] #[command(version, about, long_about = None)] @@ -91,6 +93,19 @@ pub enum NtxBuilderCommand { )] max_tx_cycles: u32, + /// Number of blocks after which a submitted network transaction expires. + /// + /// Set as the on-chain transaction expiration delta and reused as the actor's local retry + /// timeout. Must be between 1 and 65535. + #[arg( + long = "tx-expiration-delta", + env = ENV_TX_EXPIRATION_DELTA, + default_value_t = DEFAULT_TX_EXPIRATION_DELTA, + value_parser = clap::value_parser!(u16).range(1..), + value_name = "NUM", + )] + tx_expiration_delta: u16, + /// Maximum number of SQLite connections in the ntx-builder database connection pool. #[arg( long = "sqlite.connection_pool_size", @@ -124,6 +139,7 @@ impl NtxBuilderCommand { idle_timeout, max_account_crashes, max_tx_cycles, + tx_expiration_delta, sqlite_connection_pool_size, data_directory, enable_otel: _, @@ -141,6 +157,7 @@ impl NtxBuilderCommand { .with_idle_timeout(idle_timeout) .with_max_account_crashes(max_account_crashes) .with_max_cycles(max_tx_cycles) + .with_tx_expiration_delta(tx_expiration_delta) .with_sqlite_connection_pool_size(sqlite_connection_pool_size); let config = match rpc_auth_header_value { Some(value) => config.with_rpc_auth_header(value), diff --git a/bin/ntx-builder/src/db/mod.rs b/bin/ntx-builder/src/db/mod.rs index cac8fb2e4..00c8f7de4 100644 --- a/bin/ntx-builder/src/db/mod.rs +++ b/bin/ntx-builder/src/db/mod.rs @@ -149,6 +149,20 @@ impl Db { .await } + /// Returns `true` when all the supplied nullifiers for `account_id` have been marked consumed + /// in a committed block. + pub async fn submitted_tx_landed( + &self, + account_id: AccountId, + nullifiers: Vec, + ) -> Result { + self.inner + .query("submitted_tx_landed", move |conn| { + queries::submitted_tx_landed(conn, account_id, &nullifiers) + }) + .await + } + /// Marks notes as failed by incrementing `attempt_count`, setting `last_attempt`, and storing /// the latest error message. pub async fn notes_failed( diff --git a/bin/ntx-builder/src/db/models/queries/notes.rs b/bin/ntx-builder/src/db/models/queries/notes.rs index a344e25a0..3394d3e24 100644 --- a/bin/ntx-builder/src/db/models/queries/notes.rs +++ b/bin/ntx-builder/src/db/models/queries/notes.rs @@ -193,6 +193,32 @@ pub fn accounts_with_pending_notes( .collect() } +/// Returns `true` if every one of `nullifiers` for `account_id` has been marked consumed +/// (`committed_at IS NOT NULL`), i.e. the actor's submitted transaction has landed in a committed +/// block. An empty `nullifiers` slice trivially returns `true`. +pub fn submitted_tx_landed( + conn: &mut SqliteConnection, + account_id: AccountId, + nullifiers: &[Nullifier], +) -> Result { + if nullifiers.is_empty() { + return Ok(true); + } + + let account_id_bytes = conversions::account_id_to_bytes(account_id); + let nullifier_bytes: Vec> = + nullifiers.iter().map(conversions::nullifier_to_bytes).collect(); + + let still_pending: i64 = schema::notes::table + .filter(schema::notes::account_id.eq(&account_id_bytes)) + .filter(schema::notes::nullifier.eq_any(&nullifier_bytes)) + .filter(schema::notes::committed_at.is_null()) + .count() + .get_result(conn)?; + + Ok(still_pending == 0) +} + // HELPERS // ================================================================================================ diff --git a/bin/ntx-builder/src/db/models/queries/tests.rs b/bin/ntx-builder/src/db/models/queries/tests.rs index fcc2c07d5..2e9a05541 100644 --- a/bin/ntx-builder/src/db/models/queries/tests.rs +++ b/bin/ntx-builder/src/db/models/queries/tests.rs @@ -261,6 +261,38 @@ fn accounts_with_pending_notes_distinct_and_filters_consumed_and_capped() { assert_eq!(pending[0], alice); } +// SUBMITTED-TX LANDING +// ================================================================================================ + +#[test] +fn submitted_tx_landed_detects_full_landing() { + let (conn, _dir) = &mut test_conn(); + let account_id = mock_network_account_id(); + let note_a = mock_single_target_note(account_id, 7); + let note_b = mock_single_target_note(account_id, 8); + insert_network_notes(conn, &[note_a.clone(), note_b.clone()]).unwrap(); + + let nullifiers = vec![note_a.as_note().nullifier(), note_b.as_note().nullifier()]; + + // Nothing consumed yet. + assert!(!submitted_tx_landed(conn, account_id, &nullifiers).unwrap()); + + // Only one consumed. + mark_notes_consumed(conn, &[note_a.as_note().nullifier()], BlockNumber::from(3)).unwrap(); + assert!(!submitted_tx_landed(conn, account_id, &nullifiers).unwrap()); + + // Both consumed. + mark_notes_consumed(conn, &[note_b.as_note().nullifier()], BlockNumber::from(4)).unwrap(); + assert!(submitted_tx_landed(conn, account_id, &nullifiers).unwrap()); +} + +#[test] +fn submitted_tx_landed_empty_nullifiers_is_trivially_true() { + let (conn, _dir) = &mut test_conn(); + let account_id = mock_network_account_id(); + assert!(submitted_tx_landed(conn, account_id, &[]).unwrap()); +} + #[test] fn notes_failed_increments_attempt_and_records_error() { let (conn, _dir) = &mut test_conn(); diff --git a/bin/ntx-builder/src/lib.rs b/bin/ntx-builder/src/lib.rs index d20f0c6dd..422eaee51 100644 --- a/bin/ntx-builder/src/lib.rs +++ b/bin/ntx-builder/src/lib.rs @@ -24,9 +24,6 @@ use crate::coordinator::Coordinator; pub(crate) type NoteError = Arc; -// PR 2 spawns actors and runs their lifecycle (wait-for-account + notify/idle), but the transaction -// execution path (candidate selection, proving, submission) stays unwired until PR 3 reconnects it. -#[expect(dead_code)] mod actor; mod builder; mod chain_state; @@ -89,6 +86,12 @@ const DEFAULT_REQUEST_BACKOFF_MAX: Duration = Duration::from_secs(30); /// `1 << 29` but network transactions should be much cheaper. const DEFAULT_MAX_TX_CYCLES: u32 = 1 << 19; +/// Default number of blocks after which a submitted network transaction expires. +/// +/// Used both as the on-chain transaction expiration delta and as the local retry timeout an actor +/// waits in `WaitForBlock` before resubmitting. Must be within the kernel's `1..=u16::MAX` range. +const DEFAULT_TX_EXPIRATION_DELTA: u16 = 30; + // CONFIGURATION // ================================================================================================= @@ -144,6 +147,11 @@ pub struct NtxBuilderConfig { /// Defaults to 2^18 cycles. pub max_cycles: u32, + /// Number of blocks after which a submitted network transaction expires. Set as the on-chain + /// transaction expiration delta and reused as the local `WaitForBlock` retry timeout. Must be + /// within `1..=u16::MAX` (enforced by the transaction kernel). + pub tx_expiration_delta: u16, + /// Initial sleep applied between per-request retries on transient infrastructure failures (e.g. /// prover unreachable, RPC crash, transport error, RPC gRPC hiccup). Doubles on each retry up /// to [`Self::request_backoff_max`]. Per-note `attempt_count` is *not* advanced while retries @@ -175,6 +183,7 @@ impl NtxBuilderConfig { idle_timeout: DEFAULT_IDLE_TIMEOUT, max_account_crashes: DEFAULT_MAX_ACCOUNT_CRASHES, max_cycles: DEFAULT_MAX_TX_CYCLES, + tx_expiration_delta: DEFAULT_TX_EXPIRATION_DELTA, request_backoff_initial: DEFAULT_REQUEST_BACKOFF_INITIAL, request_backoff_max: DEFAULT_REQUEST_BACKOFF_MAX, database_filepath, @@ -273,6 +282,14 @@ impl NtxBuilderConfig { self } + /// Sets the transaction expiration delta (in blocks). Also bounds the actor's `WaitForBlock` + /// retry timeout. + #[must_use] + pub fn with_tx_expiration_delta(mut self, delta: u16) -> Self { + self.tx_expiration_delta = delta; + self + } + /// Sets the per-request retry backoff bounds (initial sleep and cap) used when retrying /// transient infrastructure failures inside a single transaction attempt. #[must_use] @@ -384,28 +401,53 @@ impl NtxBuilderConfig { }; let chain = Arc::new(chain); - // Wire the actor context + coordinator. The actor request channel is owned by the builder - // (receiver) and cloned into every spawned actor (sender) so all DB writes from actors - // serialize through the builder's event loop. + let (coordinator, actor_request_rx) = + self.build_coordinator(rpc, db.clone(), chain.clone())?; + + Ok(NetworkTransactionBuilder::new( + self, + db, + block_stream, + last_applied_block, + chain, + coordinator, + actor_request_rx, + )) + } + + /// Builds the actor [`Coordinator`] and the channel over which spawned actors send their DB + /// writes back to the builder's event loop. + /// + /// The receiver is owned by the builder loop; the sender is cloned into every spawned actor so + /// all actor-side DB writes serialize through the loop. + fn build_coordinator( + &self, + rpc: RpcClient, + db: Db, + chain: Arc, + ) -> anyhow::Result<(Coordinator, mpsc::Receiver)> { let (request_tx, actor_request_rx) = mpsc::channel(self.account_channel_capacity); let actor_context = AccountActorContext { clients: GrpcClients { - rpc: rpc.clone(), + rpc, prover: self .tx_prover_url .clone() .map(|url| RemoteTransactionProver::new(url.as_str())), }, state: State { - db: db.clone(), - chain: chain.clone(), + db, + chain, script_cache: LruCache::new(self.script_cache_size), + expiration_script: actor::expiration_tx_script(self.tx_expiration_delta) + .context("failed to compile network-tx expiration script")?, }, config: ActorConfig { max_notes_per_tx: self.max_notes_per_tx, max_note_attempts: self.max_note_attempts, idle_timeout: self.idle_timeout, max_cycles: self.max_cycles, + tx_expiration_delta: self.tx_expiration_delta, request_backoff_initial: self.request_backoff_initial, request_backoff_max: self.request_backoff_max, }, @@ -414,15 +456,7 @@ impl NtxBuilderConfig { let coordinator = Coordinator::new(self.max_concurrent_txs, self.max_account_crashes, actor_context); - Ok(NetworkTransactionBuilder::new( - self, - db, - block_stream, - last_applied_block, - chain, - coordinator, - actor_request_rx, - )) + Ok((coordinator, actor_request_rx)) } }