From 207b4ebabbbb23466a797f464ba9a8dbcf622549 Mon Sep 17 00:00:00 2001 From: SantiagoPittella Date: Tue, 26 May 2026 17:22:46 -0300 Subject: [PATCH 1/2] chore: spawn actors after sync --- bin/ntx-builder/src/actor/mod.rs | 137 +----- bin/ntx-builder/src/builder.rs | 177 ++++++-- bin/ntx-builder/src/chain_state.rs | 15 +- bin/ntx-builder/src/coordinator.rs | 391 +++++++----------- bin/ntx-builder/src/db/mod.rs | 59 +-- .../src/db/models/account_effect.rs | 18 +- bin/ntx-builder/src/db/models/conv.rs | 7 - .../src/db/models/queries/notes.rs | 20 + .../src/db/models/queries/tests.rs | 39 ++ bin/ntx-builder/src/lib.rs | 53 ++- bin/ntx-builder/src/test_utils.rs | 11 - 11 files changed, 426 insertions(+), 501 deletions(-) diff --git a/bin/ntx-builder/src/actor/mod.rs b/bin/ntx-builder/src/actor/mod.rs index 3a79f4294..d4ebd79f2 100644 --- a/bin/ntx-builder/src/actor/mod.rs +++ b/bin/ntx-builder/src/actor/mod.rs @@ -9,7 +9,6 @@ use std::time::Duration; use allowlist::{NoteScriptNotAllowlisted, partition_by_allowlist}; use anyhow::Context; use candidate::TransactionCandidate; -use futures::FutureExt; use miden_node_proto::domain::account::NetworkAccountId; use miden_node_utils::ErrorReport; use miden_node_utils::lru_cache::LruCache; @@ -227,97 +226,33 @@ impl AccountActor { } } - /// Runs the account actor, processing events and managing state until shutdown. + /// Runs the account actor until shutdown. /// /// The return value signals the shutdown category to the coordinator: /// - /// - `Ok(())`: intentional shutdown (idle timeout or account removal). - /// - `Err(_)`: crash (database error, semaphore failure, or any other bug). - pub async fn run(self, semaphore: Arc) -> anyhow::Result<()> { + /// - `Ok(())`: intentional shutdown (idle timeout or account not committed in time). + /// - `Err(_)`: crash (database error or any other bug). + pub async fn run(self, _semaphore: Arc) -> anyhow::Result<()> { let account_id = self.account_id; // Wait for the account to be committed to the DB. For newly created accounts, the creation - // transaction must be committed before we start processing notes. + // transaction must be committed before the actor becomes active. if !self.wait_for_committed_account(account_id).await? { return Ok(()); } - // Determine initial mode by checking DB for available notes. - let block_num = self.state.chain.chain_tip_block_number(); - let has_notes = self - .state - .db - .has_available_notes(account_id, block_num, self.config.max_note_attempts) - .await - .context("failed to check for available notes")?; - - let mut mode = if has_notes { - ActorMode::NotesAvailable - } else { - ActorMode::NoViableNotes - }; - loop { - // Enable or disable transaction execution based on actor mode. - let tx_permit_acquisition = match mode { - // Disable transaction execution. - ActorMode::NoViableNotes | ActorMode::TransactionInflight(_) => { - std::future::pending().boxed() - }, - // Enable transaction execution. - ActorMode::NotesAvailable => semaphore.acquire().boxed(), - }; - - // Idle timeout timer: only ticks when in NoViableNotes mode. Mode changes cause the - // next loop iteration to create a fresh sleep or pending. - let idle_timeout_sleep = match mode { - ActorMode::NoViableNotes => tokio::time::sleep(self.config.idle_timeout).boxed(), - _ => std::future::pending().boxed(), - }; - tokio::select! { - // Handle coordinator notifications. On notification, re-evaluate state from DB. + // A committed block touched this account (or the coordinator woke everyone). PR 3 + // reconnects transaction execution here. _ = self.notify.notified() => { - match mode { - ActorMode::TransactionInflight(awaited_id) => { - // Check DB: is the inflight tx still pending? - let exists = self - .state - .db - .transaction_exists(awaited_id) - .await - .context("failed to check transaction status")?; - if exists { - mode = ActorMode::NotesAvailable; - } - }, - _ => { - mode = ActorMode::NotesAvailable; - } - } - }, - // Execute transactions. - permit = tx_permit_acquisition => { - let _permit = permit.context("semaphore closed")?; - - // Read the chain state. - let chain_state = self.state.chain.get_cloned(); - - // Query DB for latest account and available notes. - let tx_candidate = self.select_candidate_from_db( - account_id, - chain_state, - ).await?; - - if let Some(tx_candidate) = tx_candidate { - mode = self.execute_transactions(account_id, tx_candidate).await; - } else { - // No transactions to execute, wait for events. - mode = ActorMode::NoViableNotes; - } + tracing::debug!( + %account_id, + "actor notified; transaction execution reconnects in PR 3", + ); } - // Idle timeout: actor has been idle too long, deactivate account. - _ = idle_timeout_sleep => { + // Idle timeout: actor has been idle too long, deactivate. + () = tokio::time::sleep(self.config.idle_timeout) => { tracing::info!(%account_id, "Account actor deactivated due to idle timeout"); return Ok(()); } @@ -385,9 +320,8 @@ impl AccountActor { /// For accounts that are being created by an inflight transaction, this will idle /// until the transaction is committed. Returns `true` when the account is ready, or /// `false` if no commit arrived within [`ActorConfig::idle_timeout`] — in which case - /// the coordinator will respawn a new actor when the account reappears through - /// [`Coordinator::send_targeted`](crate::coordinator::Coordinator::send_targeted) or the - /// account loader. + /// the coordinator will respawn a new actor when a later committed block targets the + /// account again. async fn wait_for_committed_account( &self, account_id: NetworkAccountId, @@ -577,47 +511,6 @@ fn log_failed_notes(failed: Vec) -> Vec<(Nullifier, NoteError)> { #[cfg(test)] mod tests { - - use std::sync::Arc; - - use tokio::sync::{Notify, mpsc}; - - use super::*; - - const OTHER_NOTE_SCRIPT: &str = "\ -@note_script -pub proc main - push.1 drop -end"; - - async fn ack_actor_requests(mut rx: mpsc::Receiver, db: Db) { - while let Some(request) = rx.recv().await { - match request { - ActorRequest::NotesFailed { failed_notes, block_num, ack_tx } => { - db.notes_failed(failed_notes, block_num) - .await - .expect("test DB write should succeed"); - let _ = ack_tx.send(()); - }, - ActorRequest::CacheNoteScript { .. } => {}, - } - } - } - - fn actor_with_request_handler( - db: &Db, - account_id: NetworkAccountId, - ) -> (AccountActor, AccountActorContext) { - let (request_tx, request_rx) = mpsc::channel(8); - let mut context = AccountActorContext::test(db); - context.request_tx = request_tx; - tokio::spawn(ack_actor_requests(request_rx, db.clone())); - - let actor = AccountActor::new(account_id, &context, Arc::new(Notify::new())); - - (actor, context) - } - #[tokio::test] #[ignore = "wip refactor"] async fn select_candidate_keeps_allowlisted_notes() { diff --git a/bin/ntx-builder/src/builder.rs b/bin/ntx-builder/src/builder.rs index 08a153fe2..da654b6b9 100644 --- a/bin/ntx-builder/src/builder.rs +++ b/bin/ntx-builder/src/builder.rs @@ -1,19 +1,32 @@ use std::pin::Pin; +use std::sync::Arc; use anyhow::Context; use futures::Stream; use miden_protocol::block::{BlockNumber, SignedBlock}; use tokio::net::TcpListener; +use tokio::sync::mpsc; use tokio::task::JoinSet; use tokio_stream::StreamExt; use crate::NtxBuilderConfig; -use crate::chain_state::ChainState; +use crate::actor::ActorRequest; +use crate::chain_state::SharedChainState; use crate::clients::RpcError; use crate::committed_block::CommittedBlockEffects; +use crate::coordinator::Coordinator; use crate::db::Db; use crate::server::NtxBuilderRpcServer; +/// Discriminator returned by the steady-state `select!` so the dispatch can run on a fully-owned +/// `&mut self` instead of three concurrent borrows. The `Block` variant is boxed since a +/// `SignedBlock` dwarfs the other two payloads. +enum SteadyStateAction { + Block(Box>>), + Request(Option), + Respawn(Option), +} + // NETWORK TRANSACTION BUILDER // ================================================================================================ @@ -25,12 +38,18 @@ use crate::server::NtxBuilderRpcServer; pub(crate) type BlockStream = Pin> + Send>>; -/// Network transaction builder component (PR 1: subscription-driven sync only). +/// Network transaction builder component. /// -/// The builder consumes the RPC committed-block subscription and applies each block's -/// network-relevant effects to its local database. The actor execution path is wired back in a -/// subsequent PR; in this PR the binary stays up and keeps the local DB caught up to the live -/// chain tip without scheduling any network transactions. +/// Runs in three phases: +/// 1. **Catch-up**: drain the committed-block subscription, applying each block to the local DB +/// and in-memory chain, until the local tip matches the node-reported `committed_chain_tip` +/// (signaled by `is_synced` flipping to `true`). No actors run. +/// 2. **Boundary**: query the DB for accounts with carry-over pending notes (e.g. from a previous +/// process) and spawn an actor for each. +/// 3. **Steady-state**: on every subsequent committed block, apply the effects, advance the chain, +/// and have the coordinator spawn-if-missing for newly-targeted accounts then wake every active +/// actor. Concurrently drain actor requests (`NotesFailed`, `CacheNoteScript`) so the actors' +/// DB writes happen serialized through the builder. pub struct NetworkTransactionBuilder { /// Configuration for the builder. config: NtxBuilderConfig, @@ -40,13 +59,15 @@ pub struct NetworkTransactionBuilder { block_stream: BlockStream, /// Highest block number applied to the DB so far. last_applied_block: BlockNumber, - /// In-memory partial chain (tip header + chain MMR + tracked recent headers). Persisted - /// alongside each block in the DB so the builder can resume without replaying genesis on - /// restart. - chain: ChainState, + /// In-memory partial chain shared with every spawned actor through the coordinator. + chain: Arc, + /// Lifecycle owner for `AccountActor` instances. + coordinator: Coordinator, + /// Channel receiving DB-side requests (note-failed bookkeeping, script-cache persistence) from + /// spawned actors. Drained in the steady-state loop so writes happen through the builder. + actor_request_rx: mpsc::Receiver, /// `false` until the first applied block whose `committed_chain_tip` matches the just-applied - /// block number. Stays `true` afterwards. Exposed so the gRPC status surface and PR 2's actor - /// spawn gating can read it. + /// block number. Stays `true` afterwards. is_synced: bool, } @@ -56,7 +77,9 @@ impl NetworkTransactionBuilder { db: Db, block_stream: BlockStream, last_applied_block: BlockNumber, - chain: ChainState, + chain: Arc, + coordinator: Coordinator, + actor_request_rx: mpsc::Receiver, ) -> Self { Self { config, @@ -64,6 +87,8 @@ impl NetworkTransactionBuilder { block_stream, last_applied_block, chain, + coordinator, + actor_request_rx, is_synced: false, } } @@ -75,10 +100,6 @@ impl NetworkTransactionBuilder { } /// Runs the network transaction builder event loop until a fatal error occurs. - /// - /// 1. Starts the gRPC server for note status queries. - /// 2. Continuously drains the committed-block subscription, applying each block's effects to - /// the local DB. pub async fn run(self, listener: TcpListener) -> anyhow::Result<()> { let mut join_set = JoinSet::new(); @@ -100,14 +121,9 @@ impl NetworkTransactionBuilder { } async fn run_event_loop(mut self) -> anyhow::Result<()> { - // First sync up to the chain tip. + // Phase 1: catch-up. loop { - let (block, committed_tip) = self - .block_stream - .next() - .await - .context("block stream ended")? - .context("block stream failed")?; + let (block, committed_tip) = self.next_block().await?; let local_tip = block.header().block_num(); self.apply_committed_block(block, committed_tip).await?; @@ -118,31 +134,96 @@ impl NetworkTransactionBuilder { } } - // Spawn and handle network account actors, and apply new blocks. + // Phase 2: spawn an actor for every account with carry-over pending notes. + let pending_accounts = self + .db + .accounts_with_pending_notes(self.config.max_note_attempts) + .await + .context("failed to load accounts with pending notes at catch-up")?; + tracing::info!( + num_accounts = pending_accounts.len(), + "spawning actors for accounts with carry-over pending notes", + ); + for account_id in pending_accounts { + self.coordinator.spawn_actor(account_id); + } + + // Phase 3: drive actors per committed block, plus serialize their DB writes. loop { - let (block, committed_tip) = self - .block_stream - .next() - .await - .context("block stream ended")? - .context("block stream failed")?; - self.apply_committed_block(block, committed_tip).await?; + // Split `&mut self` into disjoint borrows so each `select!` arm holds only the one + // field it polls. The action is materialised and self is released before the body + // dispatches the work via the regular `&mut self` methods. + let action = { + let block_stream = &mut self.block_stream; + let actor_request_rx = &mut self.actor_request_rx; + let coordinator = &mut self.coordinator; + + tokio::select! { + block = block_stream.next() => SteadyStateAction::Block(Box::new(block)), + request = actor_request_rx.recv() => SteadyStateAction::Request(request), + respawn = coordinator.next() => SteadyStateAction::Respawn(respawn?), + } + }; + + match action { + SteadyStateAction::Block(block) => { + let (block, committed_tip) = + (*block).context("block stream ended")?.context("block stream failed")?; + let effects = + self.apply_committed_block_with_effects(block, committed_tip).await?; + self.coordinator.handle_committed_block(&effects); + }, + SteadyStateAction::Request(request) => { + let Some(request) = request else { + anyhow::bail!("actor request channel closed unexpectedly"); + }; + handle_actor_request(&self.db, request).await?; + }, + SteadyStateAction::Respawn(respawn) => { + if let Some(account_id) = respawn { + tracing::info!( + account.id = %account_id, + "respawning actor that shut down with a pending notification", + ); + self.coordinator.spawn_actor(account_id); + } + }, + } } } - /// Applies a single committed block's effects to the DB, advances the in-memory partial chain, - /// persists the updated chain MMR atomically with the effects, and flips `is_synced` the first - /// time the applied block matches the node-reported committed tip. + /// Pulls the next `(block, committed_tip)` pair from the subscription, surfacing both the + /// "stream ended" and per-item RPC errors as `anyhow::Error`. + async fn next_block(&mut self) -> anyhow::Result<(SignedBlock, BlockNumber)> { + self.block_stream + .next() + .await + .context("block stream ended")? + .context("block stream failed") + } + + /// Applies a committed block without surfacing the computed effects. + async fn apply_committed_block( + &mut self, + block: SignedBlock, + committed_tip: BlockNumber, + ) -> anyhow::Result<()> { + self.apply_committed_block_with_effects(block, committed_tip).await.map(drop) + } + + /// Applies a committed block and returns the computed `CommittedBlockEffects` so the + /// steady-state loop can hand them to the coordinator without re-deriving from the signed + /// block. #[tracing::instrument( name = "ntx.builder.apply_committed_block", skip(self, block), fields(block_num = %block.header().block_num(), %committed_tip), )] - async fn apply_committed_block( + async fn apply_committed_block_with_effects( &mut self, block: SignedBlock, committed_tip: BlockNumber, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { let header = block.header().clone(); let block_num = header.block_num(); @@ -154,12 +235,30 @@ impl NetworkTransactionBuilder { let next_mmr = self.chain.current_mmr(); self.db - .apply_committed_block(effects, next_mmr) + .apply_committed_block(effects.clone(), next_mmr) .await .context("failed to apply committed block to DB")?; self.last_applied_block = block_num; - Ok(()) + Ok(effects) + } +} + +/// Handles a single actor request then acknowledges the actor. +async fn handle_actor_request(db: &Db, request: ActorRequest) -> anyhow::Result<()> { + match request { + ActorRequest::NotesFailed { failed_notes, block_num, ack_tx } => { + db.notes_failed(failed_notes, block_num) + .await + .context("failed to persist note failure")?; + let _ = ack_tx.send(()); + }, + ActorRequest::CacheNoteScript { script_root, script } => { + db.insert_note_script(script_root, &script) + .await + .context("failed to cache note script")?; + }, } + Ok(()) } diff --git a/bin/ntx-builder/src/chain_state.rs b/bin/ntx-builder/src/chain_state.rs index 033695c47..f8942131d 100644 --- a/bin/ntx-builder/src/chain_state.rs +++ b/bin/ntx-builder/src/chain_state.rs @@ -47,11 +47,6 @@ impl ChainState { (self.chain_tip_header, self.chain_mmr) } - /// Returns the current chain tip header. - pub(crate) fn chain_tip_header(&self) -> &BlockHeader { - &self.chain_tip_header - } - /// Returns a clone of the current partial chain MMR. pub(crate) fn current_mmr(&self) -> PartialMmr { self.chain_mmr.mmr().clone() @@ -96,10 +91,18 @@ impl SharedChainState { Self(RwLock::new(ChainState::new(chain_tip_header, chain_mmr))) } + // Read by the actor execution path, which is unwired until PR 3. + #[expect(dead_code)] pub(crate) fn chain_tip_block_number(&self) -> BlockNumber { self.0.read().expect("chain state lock poisoned").chain_tip_header.block_num() } + /// Returns a clone of the current partial chain MMR. Cheap enough for per-block persistence + /// since the MMR is bounded by `max_block_count` headers. + pub(crate) fn current_mmr(&self) -> PartialMmr { + self.0.read().expect("chain state lock poisoned").current_mmr() + } + pub(crate) fn update_chain_tip(&self, tip: BlockHeader, max_block_count: usize) { self.0 .write() @@ -107,6 +110,8 @@ impl SharedChainState { .update_chain_tip(tip, max_block_count); } + // Read by the actor execution path (candidate selection), which is unwired until PR 3. + #[expect(dead_code)] pub(crate) fn get_cloned(&self) -> ChainState { self.0.read().expect("chain state lock poisoned").clone() } diff --git a/bin/ntx-builder/src/coordinator.rs b/bin/ntx-builder/src/coordinator.rs index 8705c2fb3..8ecdfb7ce 100644 --- a/bin/ntx-builder/src/coordinator.rs +++ b/bin/ntx-builder/src/coordinator.rs @@ -1,24 +1,12 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use miden_node_db::DatabaseError; use miden_node_proto::domain::account::NetworkAccountId; -use miden_node_proto::domain::mempool::MempoolEvent; -use miden_protocol::account::delta::AccountUpdateDetails; use tokio::sync::{Notify, Semaphore}; use tokio::task::JoinSet; use crate::actor::{AccountActor, AccountActorContext}; -use crate::db::Db; - -// WRITE EVENT RESULT -// ================================================================================================ - -/// Result of writing a mempool event to the database. -pub struct WriteEventResult { - /// Accounts that should be notified of state changes. - pub accounts_to_notify: Vec, -} +use crate::committed_block::CommittedBlockEffects; // ACTOR HANDLE // ================================================================================================ @@ -62,68 +50,34 @@ impl ActorHandle { // COORDINATOR // ================================================================================================ -/// Coordinator for managing [`AccountActor`] instances, tasks, and notifications. -/// -/// The `Coordinator` is the central orchestrator of the network transaction builder system. -/// It manages the lifecycle of account actors. Each actor is responsible for handling transactions -/// for a specific network account. The coordinator provides the following core -/// functionality: +/// Lifecycle owner for [`AccountActor`] instances driven by committed blocks. /// -/// ## Actor Management -/// - Spawns new [`AccountActor`] instances for network accounts as needed. -/// - Maintains a registry of active actors with their notification handles. -/// - Gracefully handles actor shutdown and cleanup when actors complete or fail. -/// - Monitors actor tasks through a join set to detect completion or errors. +/// The coordinator owns the actor-side context (gRPC clients, shared chain state, script cache, +/// per-actor config), the actor task join set, and a registry mapping each network account to a +/// notify handle. The builder calls into the coordinator at two moments: /// -/// ## Event Notification -/// - Notifies actors via a shared [`Notify`] when state may have changed. -/// - The DB is the source of truth: actors re-evaluate their state from DB on notification. -/// - Notifications are coalesced: [`Notify`] stores at most one permit, so multiple notifications -/// while an actor is busy result in a single wake-up. +/// 1. At the catch-up boundary, to spawn one actor per account returned by +/// `Db::accounts_with_pending_notes()`. +/// 2. On every committed block in steady state, via [`Coordinator::handle_committed_block`], which +/// spawns missing actors for accounts that just received new network notes and wakes every +/// active actor so it can re-evaluate its state from the DB. /// -/// ## Resource Management -/// - Controls transaction concurrency across all network accounts using a semaphore. -/// - Prevents resource exhaustion by limiting simultaneous transaction processing. -/// -/// ## Actor Lifecycle -/// - Actors that have been idle for longer than the idle timeout deactivate themselves. -/// - When an actor deactivates, the coordinator checks if a notification arrived just as the actor -/// timed out. If so, the actor is respawned immediately. -/// - Deactivated actors are re-spawned when [`Coordinator::send_targeted`] detects notes targeting -/// an account without an active actor. -/// -/// The coordinator operates in an event-driven manner: -/// 1. Network accounts are registered and actors spawned as needed. -/// 2. Mempool events are written to DB, then actors are notified. -/// 3. Actor completion/failure events are monitored and handled. -/// 4. Failed or completed actors are cleaned up from the registry. +/// Notifications are coalesced through [`Notify`]: multiple wakes while an actor is busy +/// collapse into one. Actors that crash repeatedly are deactivated after `max_account_crashes` +/// failures. pub struct Coordinator { /// Mapping of network account IDs to their notification handles. - /// - /// This registry serves as the primary directory for notifying active account actors. - /// When actors are spawned, they register their notification handle here. When events need - /// to be broadcast, this registry is used to locate the appropriate actors. The registry is - /// automatically cleaned up when actors complete their execution. actor_registry: HashMap, - /// Join set for managing actor tasks and monitoring their completion status. - /// - /// This join set allows the coordinator to wait for actor task completion and handle - /// different shutdown scenarios. When an actor task completes (either successfully or - /// due to an error), the corresponding entry is removed from the actor registry. + /// Join set tracking each spawned actor task; used to detect intentional shutdowns vs. crashes. actor_join_set: JoinSet<(NetworkAccountId, anyhow::Result<()>)>, - /// Semaphore for controlling the maximum number of concurrent transactions across all network - /// accounts. - /// - /// This shared semaphore prevents the system from becoming overwhelmed by limiting the total - /// number of transactions that can be processed simultaneously across all account actors. - /// Each actor must acquire a permit from this semaphore before processing a transaction, - /// ensuring fair resource allocation and system stability under load. + /// Shared transaction-execution semaphore handed to each spawned actor. semaphore: Arc, - /// Database for persistent state. - db: Db, + /// Shared resources needed to spawn an actor. Stored on the coordinator so spawns at runtime + /// don't need the builder to plumb context through every call site. + actor_context: AccountActorContext, /// Tracks the number of crashes per account actor. /// @@ -137,14 +91,18 @@ pub struct Coordinator { } impl Coordinator { - /// Creates a new coordinator with the specified maximum number of inflight transactions and the - /// crash threshold for account deactivation. - pub fn new(max_inflight_transactions: usize, max_account_crashes: usize, db: Db) -> Self { + /// Creates a new coordinator with the specified transaction concurrency limit and the per- + /// account crash threshold. + pub fn new( + max_inflight_transactions: usize, + max_account_crashes: usize, + actor_context: AccountActorContext, + ) -> Self { Self { actor_registry: HashMap::new(), actor_join_set: JoinSet::new(), semaphore: Arc::new(Semaphore::new(max_inflight_transactions)), - db, + actor_context, crash_counts: HashMap::new(), max_account_crashes, } @@ -155,71 +113,64 @@ impl Coordinator { /// This method creates a new [`AccountActor`] instance for the specified account origin /// and adds it to the coordinator's management system. The actor will be responsible for /// processing transactions and managing state for the network account. - #[tracing::instrument(name = "ntx.builder.spawn_actor", skip(self, actor_context))] - pub fn spawn_actor( - &mut self, - account_id: NetworkAccountId, - actor_context: &AccountActorContext, - ) { - // Skip spawning if the account has been deactivated due to repeated crashes. - if let Some(&count) = self.crash_counts.get(&account_id) { - if count >= self.max_account_crashes { - tracing::warn!( - account.id = %account_id, - crash_count = count, - "Account deactivated due to repeated crashes, skipping actor spawn" - ); - return; - } + #[tracing::instrument(name = "ntx.builder.spawn_actor", skip(self))] + pub fn spawn_actor(&mut self, account_id: NetworkAccountId) { + if let Some(&count) = self.crash_counts.get(&account_id) + && count >= self.max_account_crashes + { + tracing::warn!( + account.id = %account_id, + crash_count = count, + "Account deactivated due to repeated crashes, skipping actor spawn" + ); + return; } - // If an actor already exists for this account ID, something has gone wrong. Reject the - // spawn rather than replacing. if self.actor_registry.contains_key(&account_id) { tracing::error!( - account_id = %account_id, - "Account actor already exists" + account.id = %account_id, + "Account actor already exists", ); return; } let notify = Arc::new(Notify::new()); - let actor = AccountActor::new(account_id, actor_context, notify.clone()); + let actor = AccountActor::new(account_id, &self.actor_context, notify.clone()); let handle = ActorHandle::new(notify); - // Run the actor. Actor reads state from DB on startup. let semaphore = self.semaphore.clone(); self.actor_join_set .spawn(Box::pin(async move { (account_id, actor.run(semaphore).await) })); self.actor_registry.insert(account_id, handle); - tracing::info!(account_id = %account_id, "Created actor for account prefix"); + tracing::info!(account.id = %account_id, "Created actor for account"); } - /// Notifies specific account actors that state may have changed. + /// Reacts to a committed block: spawns actors for any newly-targeted network accounts and + /// wakes every active actor so it can re-evaluate its state. /// - /// Only actors that are currently active are notified. Each actor will re-evaluate its state - /// from the DB on the next iteration of its run loop. Notifications are coalesced: multiple - /// notifications while an actor is busy result in a single wake-up. - pub fn notify_accounts(&self, account_ids: &[NetworkAccountId]) { - for account_id in account_ids { - if let Some(handle) = self.actor_registry.get(account_id) { - handle.notify(); + pub fn handle_committed_block(&mut self, effects: &CommittedBlockEffects) { + let mut targeted: HashSet = HashSet::new(); + for note in &effects.network_notes { + targeted.insert(NetworkAccountId::new_unchecked(note.target_account_id())); + } + + for account_id in &targeted { + if !self.actor_registry.contains_key(account_id) { + self.spawn_actor(*account_id); } } + + for handle in self.actor_registry.values() { + handle.notify(); + } } /// Waits for the next actor to complete and handles the outcome. /// - /// This method monitors the join set for actor task completion and handles - /// different shutdown scenarios appropriately. It's designed to be called - /// in a loop to continuously monitor and manage actor lifecycles. - /// - /// If no actors are currently running, this method will wait indefinitely until - /// new actors are spawned. This prevents busy-waiting when the coordinator is idle. - /// - /// Returns `Some(account_id)` if an actor should be respawned (because a - /// notification arrived just as it shut down), or `None` otherwise. + /// Returns `Some(account_id)` if an actor should be respawned (because a notification arrived + /// just as it shut down on idle timeout), or `None` otherwise. If no actors are currently + /// running, this method waits indefinitely until new actors are spawned. pub async fn next(&mut self) -> anyhow::Result> { let actor_result = self.actor_join_set.join_next().await; match actor_result { @@ -235,7 +186,6 @@ impl Coordinator { Ok(should_respawn.then_some(account_id)) }, Some(Ok((account_id, Err(err)))) => { - // Actor crashed. Increment crash counter. let count = self.crash_counts.entry(account_id).or_insert(0); *count += 1; tracing::error!( @@ -255,117 +205,30 @@ impl Coordinator { }, } } - - /// Notifies account actors that are affected by a `TransactionAdded` event. - /// - /// Only actors that are currently active are notified. Since event effects are already - /// persisted in the DB by `write_event()`, actors that spawn later read their state from the - /// DB and do not need predating events. - /// - /// Returns account IDs of note targets that do not have active actors (e.g. previously - /// deactivated due to sterility). The caller can use this to re-activate actors for those - /// accounts. - pub fn send_targeted(&self, event: &MempoolEvent) -> Vec { - let mut target_account_ids = HashSet::new(); - let mut inactive_targets = Vec::new(); - - if let MempoolEvent::TransactionAdded { network_notes, account_delta, .. } = event { - // We need to inform the account if it was updated. This lets it know that its own - // transaction has been applied, and in the future also resolves race conditions with - // external network transactions (once these are allowed). - if let Some(AccountUpdateDetails::Delta(delta)) = account_delta { - // The actor registry only contains accounts the builder has already classified as - // network. Wrap the id unconditionally and let the registry lookup filter for us; - // unknown accounts simply won't match. - let network_account_id = NetworkAccountId::new_unchecked(delta.id()); - if self.actor_registry.contains_key(&network_account_id) { - target_account_ids.insert(network_account_id); - } - } - - // Determine target actors for each note. - for note in network_notes { - let account = note.target_account_id(); - let account = NetworkAccountId::new_unchecked(account); - - if self.actor_registry.contains_key(&account) { - target_account_ids.insert(account); - } else { - inactive_targets.push(account); - } - } - } - // Notify target actors. - for account_id in &target_account_ids { - if let Some(handle) = self.actor_registry.get(account_id) { - handle.notify(); - } - } - - inactive_targets - } - - /// Writes mempool event effects to the database. - /// - /// This must be called BEFORE sending notifications to actors. Returns a [`WriteEventResult`] - /// with the accounts to notify and cancel. - pub async fn write_event( - &self, - event: &MempoolEvent, - ) -> Result { - match event { - MempoolEvent::TransactionAdded { - id, - nullifiers, - network_notes, - account_delta, - } => { - self.db - .handle_transaction_added( - *id, - account_delta.clone(), - network_notes.clone(), - nullifiers.clone(), - ) - .await?; - Ok(WriteEventResult { accounts_to_notify: Vec::new() }) - }, - MempoolEvent::BlockCommitted { header, txs } => { - let affected_accounts = self - .db - .handle_block_committed( - txs.clone(), - header.block_num(), - header.as_ref().clone(), - ) - .await?; - Ok(WriteEventResult { accounts_to_notify: affected_accounts }) - }, - MempoolEvent::TransactionsReverted(tx_ids) => { - let affected_accounts = - self.db.handle_transactions_reverted(tx_ids.iter().copied().collect()).await?; - Ok(WriteEventResult { accounts_to_notify: affected_accounts }) - }, - } - } } #[cfg(test)] impl Coordinator { - /// Creates a coordinator with default settings backed by a temp DB. - pub async fn test() -> (Self, tempfile::TempDir) { + /// Creates a coordinator with default settings backed by a temp DB. Returns the coordinator, + /// the temp dir holding the DB file, and the actor request receiver (drop it to discard, or + /// drive it from the test to inspect actor requests). + pub async fn test() + -> (Self, tempfile::TempDir, tokio::sync::mpsc::Receiver) { + use crate::db::Db; + let (db, dir) = Db::test_setup().await; - (Self::new(4, 10, db), dir) + let (tx, rx) = tokio::sync::mpsc::channel(8); + let mut actor_context = AccountActorContext::test(&db); + actor_context.request_tx = tx; + (Self::new(4, 10, actor_context), dir, rx) } } #[cfg(test)] mod tests { - use miden_node_proto::domain::mempool::MempoolEvent; + use futures::FutureExt; use super::*; - use crate::actor::AccountActorContext; - use crate::db::Db; use crate::test_utils::*; /// Registers a dummy actor handle (no real actor task) in the coordinator's registry. @@ -374,75 +237,109 @@ mod tests { coordinator.actor_registry.insert(account_id, ActorHandle::new(notify)); } - // SEND TARGETED TESTS - // ============================================================================================ - #[tokio::test] - async fn send_targeted_returns_inactive_targets() { - let (mut coordinator, _dir) = Coordinator::test().await; + async fn handle_committed_block_spawns_for_unknown_note_target() { + let (mut coordinator, _dir, _rx) = Coordinator::test().await; + + let unknown_id = mock_network_account_id(); + let note = mock_single_target_note(unknown_id, 10); + let effects = CommittedBlockEffects { + header: mock_block_header(1_u32.into()), + network_notes: vec![note], + nullifiers: vec![], + network_account_updates: vec![], + }; - let active_id = mock_network_account_id(); - let inactive_id = mock_network_account_id_seeded(42); + coordinator.handle_committed_block(&effects); - // Only register the active account. - register_dummy_actor(&mut coordinator, active_id); + assert!( + coordinator.actor_registry.contains_key(&unknown_id), + "previously-untouched account targeted by a note should get a fresh actor", + ); + } - let note_active = mock_single_target_note(active_id, 10); - let note_inactive = mock_single_target_note(inactive_id, 20); + #[tokio::test] + async fn handle_committed_block_does_not_spawn_for_account_update_only() { + let (mut coordinator, _dir, _rx) = Coordinator::test().await; - let event = MempoolEvent::TransactionAdded { - id: mock_tx_id(1), + let updated_id = mock_network_account_id(); + let effects = CommittedBlockEffects { + header: mock_block_header(1_u32.into()), + network_notes: vec![], nullifiers: vec![], - network_notes: vec![note_active, note_inactive], - account_delta: None, + network_account_updates: vec![( + updated_id, + miden_protocol::account::delta::AccountUpdateDetails::Private, + )], }; - let inactive_targets = coordinator.send_targeted(&event); + coordinator.handle_committed_block(&effects); - assert_eq!(inactive_targets.len(), 1); - assert_eq!(inactive_targets[0], inactive_id); + assert!( + !coordinator.actor_registry.contains_key(&updated_id), + "an account update without a new note should not trigger an actor spawn", + ); } - // DEACTIVATED ACCOUNTS - // ============================================================================================ - #[tokio::test] async fn spawn_actor_skips_deactivated_account() { - let (db, _dir) = Db::test_setup().await; - let max_crashes = 3; - let mut coordinator = Coordinator::new(4, max_crashes, db.clone()); - let actor_context = AccountActorContext::test(&db); + let (mut coordinator, _dir, _rx) = Coordinator::test().await; let account_id = mock_network_account_id(); + coordinator.crash_counts.insert(account_id, coordinator.max_account_crashes); - // Simulate the account having reached the crash threshold. - coordinator.crash_counts.insert(account_id, max_crashes); - - coordinator.spawn_actor(account_id, &actor_context); + coordinator.spawn_actor(account_id); assert!( !coordinator.actor_registry.contains_key(&account_id), - "Deactivated account should not have an actor in the registry" + "deactivated account should not have an actor in the registry", ); } #[tokio::test] async fn spawn_actor_allows_below_threshold() { - let (db, _dir) = Db::test_setup().await; - let max_crashes = 3; - let mut coordinator = Coordinator::new(4, max_crashes, db.clone()); - let actor_context = AccountActorContext::test(&db); + let (mut coordinator, _dir, _rx) = Coordinator::test().await; let account_id = mock_network_account_id(); + coordinator + .crash_counts + .insert(account_id, coordinator.max_account_crashes.saturating_sub(1)); - // Set crash count below the threshold. - coordinator.crash_counts.insert(account_id, max_crashes - 1); - - coordinator.spawn_actor(account_id, &actor_context); + coordinator.spawn_actor(account_id); assert!( coordinator.actor_registry.contains_key(&account_id), - "Account below crash threshold should have an actor in the registry" + "account below crash threshold should have an actor in the registry", + ); + } + + #[tokio::test] + async fn handle_committed_block_notifies_existing_actors() { + let (mut coordinator, _dir, _rx) = Coordinator::test().await; + + let bystander = mock_network_account_id(); + register_dummy_actor(&mut coordinator, bystander); + let bystander_notify = coordinator.actor_registry.get(&bystander).unwrap().notify.clone(); + + let target = mock_network_account_id_seeded(42); + let note = mock_single_target_note(target, 10); + let effects = CommittedBlockEffects { + header: mock_block_header(1_u32.into()), + network_notes: vec![note], + nullifiers: vec![], + network_account_updates: vec![], + }; + + coordinator.handle_committed_block(&effects); + + assert!( + bystander_notify.notified().now_or_never().is_some(), + "every registered actor should be notified on a committed block", + ); + + assert!( + coordinator.actor_registry.contains_key(&target), + "freshly-targeted account should get an actor", ); } } diff --git a/bin/ntx-builder/src/db/mod.rs b/bin/ntx-builder/src/db/mod.rs index 4c85efba0..209c9f856 100644 --- a/bin/ntx-builder/src/db/mod.rs +++ b/bin/ntx-builder/src/db/mod.rs @@ -139,6 +139,19 @@ impl Db { .await } + /// Returns the distinct set of network accounts that currently have at least one pending + /// (unconsumed, within attempt budget) note. + pub async fn accounts_with_pending_notes( + &self, + max_attempts: usize, + ) -> Result> { + self.inner + .query("accounts_with_pending_notes", move |conn| { + queries::accounts_with_pending_notes(conn, max_attempts) + }) + .await + } + /// Marks notes as failed by incrementing `attempt_count`, setting `last_attempt`, and storing /// the latest error message. pub async fn notes_failed( @@ -183,52 +196,6 @@ impl Db { .await } - // DEAD-CODE STUBS - // ============================================================================================ - // - // These methods exist to keep the dead actor/coordinator modules compiling in PR 1. They are - // never reached because `NetworkTransactionBuilder` does not spawn the actor path. PR 2 - // replaces them with their new committed-block-driven equivalents. - - #[expect(clippy::unused_async)] - pub async fn transaction_exists( - &self, - _tx_id: miden_protocol::transaction::TransactionId, - ) -> Result { - unimplemented!("transaction_exists is rewired in PR 2 of the ntx-builder refactor") - } - - #[expect(clippy::unused_async)] - pub async fn handle_transaction_added( - &self, - _tx_id: miden_protocol::transaction::TransactionId, - _account_delta: Option, - _notes: Vec, - _nullifiers: Vec, - ) -> Result<()> { - unimplemented!("handle_transaction_added is rewired in PR 2 of the ntx-builder refactor") - } - - #[expect(clippy::unused_async)] - pub async fn handle_block_committed( - &self, - _txs: Vec, - _block_num: BlockNumber, - _header: BlockHeader, - ) -> Result> { - unimplemented!("handle_block_committed is rewired in PR 2 of the ntx-builder refactor") - } - - #[expect(clippy::unused_async)] - pub async fn handle_transactions_reverted( - &self, - _tx_ids: Vec, - ) -> Result> { - unimplemented!( - "handle_transactions_reverted is rewired in PR 2 of the ntx-builder refactor" - ) - } - /// Creates a file-backed SQLite test connection with migrations applied. #[cfg(test)] pub fn test_conn() -> (diesel::SqliteConnection, tempfile::TempDir) { diff --git a/bin/ntx-builder/src/db/models/account_effect.rs b/bin/ntx-builder/src/db/models/account_effect.rs index 6b01158bc..c586db8f8 100644 --- a/bin/ntx-builder/src/db/models/account_effect.rs +++ b/bin/ntx-builder/src/db/models/account_effect.rs @@ -1,6 +1,5 @@ -use miden_node_proto::domain::account::NetworkAccountId; use miden_protocol::account::delta::AccountUpdateDetails; -use miden_protocol::account::{Account, AccountDelta, AccountId}; +use miden_protocol::account::{Account, AccountDelta}; use miden_standards::account::auth::NetworkAccount; // NETWORK ACCOUNT EFFECT @@ -33,19 +32,4 @@ impl NetworkAccountEffect { }, } } - - #[expect(dead_code)] - pub fn network_account_id(&self) -> NetworkAccountId { - // Trusted: constructors only produce this enum for accounts already classified as network - // (via the allowlist check above) or for updates that the caller filters through the actor - // registry. - NetworkAccountId::new_unchecked(self.protocol_account_id()) - } - - fn protocol_account_id(&self) -> AccountId { - match self { - NetworkAccountEffect::Created(acc) => acc.id(), - NetworkAccountEffect::Updated(delta) => delta.id(), - } - } } diff --git a/bin/ntx-builder/src/db/models/conv.rs b/bin/ntx-builder/src/db/models/conv.rs index 5ec6e24f2..67ed94fbb 100644 --- a/bin/ntx-builder/src/db/models/conv.rs +++ b/bin/ntx-builder/src/db/models/conv.rs @@ -6,7 +6,6 @@ use miden_protocol::Word; use miden_protocol::account::{Account, AccountId}; use miden_protocol::block::{BlockHeader, BlockNumber}; use miden_protocol::note::{NoteId, NoteScript, Nullifier}; -use miden_protocol::transaction::TransactionId; use miden_protocol::utils::serde::{Deserializable, Serializable}; // SERIALIZATION (domain → DB) @@ -24,11 +23,6 @@ pub fn network_account_id_to_bytes(id: NetworkAccountId) -> Vec { id.inner().to_bytes() } -#[expect(dead_code)] -pub fn transaction_id_to_bytes(id: &TransactionId) -> Vec { - id.to_bytes() -} - pub fn nullifier_to_bytes(nullifier: &Nullifier) -> Vec { nullifier.to_bytes() } @@ -57,7 +51,6 @@ pub fn account_id_from_bytes(bytes: &[u8]) -> Result { AccountId::read_from_bytes(bytes).map_err(|e| DatabaseError::deserialization("account id", e)) } -#[expect(dead_code)] pub fn network_account_id_from_bytes(bytes: &[u8]) -> Result { let account_id = account_id_from_bytes(bytes)?; Ok(NetworkAccountId::new_unchecked(account_id)) diff --git a/bin/ntx-builder/src/db/models/queries/notes.rs b/bin/ntx-builder/src/db/models/queries/notes.rs index 385c2587d..99702c58d 100644 --- a/bin/ntx-builder/src/db/models/queries/notes.rs +++ b/bin/ntx-builder/src/db/models/queries/notes.rs @@ -174,6 +174,26 @@ pub fn get_note_status( .map_err(Into::into) } +/// Returns the distinct set of network accounts that currently have at least one pending note +/// (unconsumed and within the per-note attempt budget). +#[expect(clippy::cast_possible_wrap)] +pub fn accounts_with_pending_notes( + conn: &mut SqliteConnection, + max_attempts: usize, +) -> Result, DatabaseError> { + let account_id_blobs: Vec> = schema::notes::table + .filter(schema::notes::committed_at.is_null()) + .filter(schema::notes::attempt_count.lt(max_attempts as i32)) + .select(schema::notes::account_id) + .distinct() + .load(conn)?; + + account_id_blobs + .iter() + .map(|bytes| conversions::network_account_id_from_bytes(bytes)) + .collect() +} + // HELPERS // ================================================================================================ diff --git a/bin/ntx-builder/src/db/models/queries/tests.rs b/bin/ntx-builder/src/db/models/queries/tests.rs index 7768d5e02..fcc2c07d5 100644 --- a/bin/ntx-builder/src/db/models/queries/tests.rs +++ b/bin/ntx-builder/src/db/models/queries/tests.rs @@ -222,6 +222,45 @@ fn note_script_cache_roundtrip() { // NOTE STATUS // ================================================================================================ +// ACCOUNTS WITH PENDING NOTES +// ================================================================================================ + +#[test] +fn accounts_with_pending_notes_distinct_and_filters_consumed_and_capped() { + let (conn, _dir) = &mut test_conn(); + let alice = mock_network_account_id(); + let bob = mock_network_account_id_seeded(42); + let carol = mock_network_account_id_seeded(99); + + let alice_note_1 = mock_single_target_note(alice, 1); + let alice_note_2 = mock_single_target_note(alice, 2); + let bob_note = mock_single_target_note(bob, 3); + let carol_note = mock_single_target_note(carol, 4); + + insert_network_notes( + conn, + &[alice_note_1.clone(), alice_note_2, bob_note.clone(), carol_note.clone()], + ) + .unwrap(); + + // Alice has two notes — must still appear exactly once (DISTINCT). Bob's only note is already + // consumed — exclude. + mark_notes_consumed(conn, &[bob_note.as_note().nullifier()], BlockNumber::from(7)).unwrap(); + // Carol's note has hit the attempt cap — exclude. + for _ in 0..30 { + notes_failed( + conn, + &[(carol_note.as_note().nullifier(), test_note_error("boom"))], + BlockNumber::from(5), + ) + .unwrap(); + } + + let pending = accounts_with_pending_notes(conn, 30).unwrap(); + assert_eq!(pending.len(), 1, "only alice should remain pending"); + assert_eq!(pending[0], alice); +} + #[test] fn notes_failed_increments_attempt_and_records_error() { let (conn, _dir) = &mut test_conn(); diff --git a/bin/ntx-builder/src/lib.rs b/bin/ntx-builder/src/lib.rs index a9658562d..6779affdb 100644 --- a/bin/ntx-builder/src/lib.rs +++ b/bin/ntx-builder/src/lib.rs @@ -5,29 +5,32 @@ use std::time::Duration; use anyhow::Context; use builder::BlockStream; -use chain_state::ChainState; +use chain_state::SharedChainState; use clients::RpcClient; use db::Db; use futures::StreamExt; use miden_node_utils::ErrorReport; +use miden_node_utils::lru_cache::LruCache; use miden_protocol::block::BlockNumber; use miden_protocol::crypto::merkle::mmr::PartialMmr; +use miden_remote_prover_client::RemoteTransactionProver; +use tokio::sync::mpsc; use url::Url; +use crate::actor::{AccountActorContext, ActorConfig, GrpcClients, State}; use crate::committed_block::CommittedBlockEffects; +use crate::coordinator::Coordinator; pub(crate) type NoteError = Arc; -// PR 1 of the block-subscription refactor leaves the actor execution path in tree but unwired. It -// is restored by PR 2 +// PR 2 spawns actors and runs their lifecycle (wait-for-account + notify/idle), but the transaction +// execution path (candidate selection, proving, submission) stays unwired until PR 3 reconnects it. #[expect(dead_code)] mod actor; mod builder; -#[expect(dead_code)] mod chain_state; mod clients; mod committed_block; -#[expect(dead_code)] mod coordinator; pub(crate) mod db; pub mod server; @@ -325,7 +328,7 @@ impl NtxBuilderConfig { let (chain, last_applied_block) = if let Some((block_num, header, mmr)) = stored_chain_state { - (ChainState::new(header, mmr), block_num) + (SharedChainState::new(header, mmr), block_num) } else { // Fresh DB: consume the genesis block inline so the in-memory chain state is non- empty // before the steady-state loop runs. @@ -346,8 +349,42 @@ impl NtxBuilderConfig { .await .context("failed to apply genesis block during bootstrap")?; - (ChainState::new(genesis_header, PartialMmr::default()), BlockNumber::GENESIS) + ( + SharedChainState::new(genesis_header, PartialMmr::default()), + BlockNumber::GENESIS, + ) + }; + let chain = Arc::new(chain); + + // Wire the actor context + coordinator. The actor request channel is owned by the builder + // (receiver) and cloned into every spawned actor (sender) so all DB writes from actors + // serialize through the builder's event loop. + let (request_tx, actor_request_rx) = mpsc::channel(self.account_channel_capacity); + let actor_context = AccountActorContext { + clients: GrpcClients { + rpc: rpc.clone(), + prover: self + .tx_prover_url + .clone() + .map(|url| RemoteTransactionProver::new(url.as_str())), + }, + state: State { + db: db.clone(), + chain: chain.clone(), + script_cache: LruCache::new(self.script_cache_size), + }, + config: ActorConfig { + max_notes_per_tx: self.max_notes_per_tx, + max_note_attempts: self.max_note_attempts, + idle_timeout: self.idle_timeout, + max_cycles: self.max_cycles, + request_backoff_initial: self.request_backoff_initial, + request_backoff_max: self.request_backoff_max, + }, + request_tx, }; + let coordinator = + Coordinator::new(self.max_concurrent_txs, self.max_account_crashes, actor_context); Ok(NetworkTransactionBuilder::new( self, @@ -355,6 +392,8 @@ impl NtxBuilderConfig { block_stream, last_applied_block, chain, + coordinator, + actor_request_rx, )) } } diff --git a/bin/ntx-builder/src/test_utils.rs b/bin/ntx-builder/src/test_utils.rs index 1ebabe66f..6f14ee1ef 100644 --- a/bin/ntx-builder/src/test_utils.rs +++ b/bin/ntx-builder/src/test_utils.rs @@ -8,7 +8,6 @@ use miden_protocol::testing::account_id::{ ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE, AccountIdBuilder, }; -use miden_protocol::transaction::TransactionId; use miden_standards::note::{AccountTargetNetworkNote, NetworkAccountTarget, NoteExecutionHint}; use miden_standards::testing::note::NoteBuilder; use rand_chacha::ChaCha20Rng; @@ -29,16 +28,6 @@ pub fn mock_network_account_id_seeded(seed: u8) -> NetworkAccountId { NetworkAccountId::new_unchecked(account_id) } -/// Creates a unique `TransactionId` from a seed value. -pub fn mock_tx_id(seed: u64) -> TransactionId { - use miden_protocol::testing::account_id::ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET; - - let w = |n: u64| Word::try_from([n, 0, 0, 0]).unwrap(); - let faucet_id = AccountId::try_from(ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET).unwrap(); - let fee = miden_protocol::asset::FungibleAsset::new(faucet_id, 0).unwrap(); - TransactionId::new(w(seed), w(seed + 1), w(seed + 2), w(seed + 3), fee) -} - /// Creates a `AccountTargetNetworkNote` targeting the given network account. pub fn mock_single_target_note( network_account_id: NetworkAccountId, From 7b28d597a456eb573109b234eaeaf0e1c9c53131 Mon Sep 17 00:00:00 2001 From: SantiagoPittella Date: Wed, 27 May 2026 11:31:52 -0300 Subject: [PATCH 2/2] review: use pinned connection --- bin/ntx-builder/src/builder.rs | 43 +++++++++++----- bin/ntx-builder/src/db/mod.rs | 67 ++++++++++++++++++++++++ bin/ntx-builder/src/lib.rs | 8 +++ crates/db/src/lib.rs | 93 +++++++++++++++++++++++++--------- 4 files changed, 172 insertions(+), 39 deletions(-) diff --git a/bin/ntx-builder/src/builder.rs b/bin/ntx-builder/src/builder.rs index da654b6b9..a26a247b8 100644 --- a/bin/ntx-builder/src/builder.rs +++ b/bin/ntx-builder/src/builder.rs @@ -15,7 +15,7 @@ use crate::chain_state::SharedChainState; use crate::clients::RpcError; use crate::committed_block::CommittedBlockEffects; use crate::coordinator::Coordinator; -use crate::db::Db; +use crate::db::{Db, LoopDb}; use crate::server::NtxBuilderRpcServer; /// Discriminator returned by the steady-state `select!` so the dispatch can run on a fully-owned @@ -121,11 +121,19 @@ impl NetworkTransactionBuilder { } async fn run_event_loop(mut self) -> anyhow::Result<()> { + // Pin a dedicated connection for the loop's DB writes so block application is never starved + // by the account actors competing for the shared pool. + let loop_db = self + .db + .pin_loop_connection() + .await + .context("failed to pin a database connection for the ntx-builder event loop")?; + // Phase 1: catch-up. loop { let (block, committed_tip) = self.next_block().await?; let local_tip = block.header().block_num(); - self.apply_committed_block(block, committed_tip).await?; + self.apply_committed_block(&loop_db, block, committed_tip).await?; if local_tip == committed_tip { self.is_synced = true; @@ -135,8 +143,7 @@ impl NetworkTransactionBuilder { } // Phase 2: spawn an actor for every account with carry-over pending notes. - let pending_accounts = self - .db + let pending_accounts = loop_db .accounts_with_pending_notes(self.config.max_note_attempts) .await .context("failed to load accounts with pending notes at catch-up")?; @@ -169,15 +176,16 @@ impl NetworkTransactionBuilder { SteadyStateAction::Block(block) => { let (block, committed_tip) = (*block).context("block stream ended")?.context("block stream failed")?; - let effects = - self.apply_committed_block_with_effects(block, committed_tip).await?; + let effects = self + .apply_committed_block_with_effects(&loop_db, block, committed_tip) + .await?; self.coordinator.handle_committed_block(&effects); }, SteadyStateAction::Request(request) => { let Some(request) = request else { anyhow::bail!("actor request channel closed unexpectedly"); }; - handle_actor_request(&self.db, request).await?; + handle_actor_request(&loop_db, request).await?; }, SteadyStateAction::Respawn(respawn) => { if let Some(account_id) = respawn { @@ -205,10 +213,13 @@ impl NetworkTransactionBuilder { /// Applies a committed block without surfacing the computed effects. async fn apply_committed_block( &mut self, + loop_db: &LoopDb, block: SignedBlock, committed_tip: BlockNumber, ) -> anyhow::Result<()> { - self.apply_committed_block_with_effects(block, committed_tip).await.map(drop) + self.apply_committed_block_with_effects(loop_db, block, committed_tip) + .await + .map(drop) } /// Applies a committed block and returns the computed `CommittedBlockEffects` so the @@ -216,11 +227,12 @@ impl NetworkTransactionBuilder { /// block. #[tracing::instrument( name = "ntx.builder.apply_committed_block", - skip(self, block), + skip(self, loop_db, block), fields(block_num = %block.header().block_num(), %committed_tip), )] async fn apply_committed_block_with_effects( &mut self, + loop_db: &LoopDb, block: SignedBlock, committed_tip: BlockNumber, ) -> anyhow::Result { @@ -234,7 +246,7 @@ impl NetworkTransactionBuilder { self.chain.update_chain_tip(header, self.config.max_block_count); let next_mmr = self.chain.current_mmr(); - self.db + loop_db .apply_committed_block(effects.clone(), next_mmr) .await .context("failed to apply committed block to DB")?; @@ -245,17 +257,20 @@ impl NetworkTransactionBuilder { } } -/// Handles a single actor request then acknowledges the actor. -async fn handle_actor_request(db: &Db, request: ActorRequest) -> anyhow::Result<()> { +/// Handles a single actor request then acknowledges the actor. Runs on the pinned loop connection +/// so the actors' shared pool cannot starve these writes. +async fn handle_actor_request(loop_db: &LoopDb, request: ActorRequest) -> anyhow::Result<()> { match request { ActorRequest::NotesFailed { failed_notes, block_num, ack_tx } => { - db.notes_failed(failed_notes, block_num) + loop_db + .notes_failed(failed_notes, block_num) .await .context("failed to persist note failure")?; let _ = ack_tx.send(()); }, ActorRequest::CacheNoteScript { script_root, script } => { - db.insert_note_script(script_root, &script) + loop_db + .insert_note_script(script_root, &script) .await .context("failed to cache note script")?; }, diff --git a/bin/ntx-builder/src/db/mod.rs b/bin/ntx-builder/src/db/mod.rs index 209c9f856..42a54eb6f 100644 --- a/bin/ntx-builder/src/db/mod.rs +++ b/bin/ntx-builder/src/db/mod.rs @@ -196,6 +196,13 @@ impl Db { .await } + /// Pins a dedicated connection for the builder's event loop, returning a [`LoopDb`]. + pub async fn pin_loop_connection(&self) -> Result { + Ok(LoopDb { + conn: self.inner.pinned_connection().await?, + }) + } + /// Creates a file-backed SQLite test connection with migrations applied. #[cfg(test)] pub fn test_conn() -> (diesel::SqliteConnection, tempfile::TempDir) { @@ -222,3 +229,63 @@ impl Db { (db, dir) } } + +/// The subset of write operations the builder's event loop performs, bound to a connection pinned +/// out of [`Db`]'s pool. Routing the loop's writes here keeps block application off the shared pool +/// that the account actors hammer, so the loop is never starved of a connection. +pub struct LoopDb { + conn: miden_node_db::PinnedConnection, +} + +impl LoopDb { + /// Applies a committed block's effects (see [`Db::apply_committed_block`]) on the pinned + /// connection. + pub async fn apply_committed_block( + &self, + effects: CommittedBlockEffects, + chain_mmr: PartialMmr, + ) -> Result> { + self.conn + .transact("apply_committed_block", move |conn| { + queries::apply_committed_block(conn, &effects, &chain_mmr) + }) + .await + } + + /// Returns the network accounts with carry-over pending notes (see + /// [`Db::accounts_with_pending_notes`]) on the pinned connection. + pub async fn accounts_with_pending_notes( + &self, + max_attempts: usize, + ) -> Result> { + self.conn + .query("accounts_with_pending_notes", move |conn| { + queries::accounts_with_pending_notes(conn, max_attempts) + }) + .await + } + + /// Marks notes as failed (see [`Db::notes_failed`]) on the pinned connection. + pub async fn notes_failed( + &self, + failed_notes: Vec<(Nullifier, NoteError)>, + block_num: BlockNumber, + ) -> Result<()> { + self.conn + .transact("notes_failed", move |conn| { + queries::notes_failed(conn, &failed_notes, block_num) + }) + .await + } + + /// Persists a note script to the local cache (see [`Db::insert_note_script`]) on the pinned + /// connection. + pub async fn insert_note_script(&self, script_root: Word, script: &NoteScript) -> Result<()> { + let script = script.clone(); + self.conn + .transact("insert_note_script", move |conn| { + queries::insert_note_script(conn, &script_root, &script) + }) + .await + } +} diff --git a/bin/ntx-builder/src/lib.rs b/bin/ntx-builder/src/lib.rs index 6779affdb..adf8a5c87 100644 --- a/bin/ntx-builder/src/lib.rs +++ b/bin/ntx-builder/src/lib.rs @@ -291,6 +291,14 @@ impl NtxBuilderConfig { /// - The RPC connection fails (after retries) /// - The genesis block cannot be read from the subscription on a fresh start pub async fn build(self) -> anyhow::Result { + // The event loop pins one connection for itself (so block application is never starved by + // the account actors), leaving the rest of the pool for actors and the gRPC server. That + // requires at least two connections. + anyhow::ensure!( + self.sqlite_connection_pool_size.get() >= 2, + "sqlite connection pool size must be at least 2 (the event loop pins one connection)", + ); + // Set up the database (bootstrap + connection pool). let db = Db::setup_with_pool_size( self.database_filepath.clone(), diff --git a/crates/db/src/lib.rs b/crates/db/src/lib.rs index d1a3b81b3..2da3b5eb6 100644 --- a/crates/db/src/lib.rs +++ b/crates/db/src/lib.rs @@ -49,6 +49,21 @@ impl Db { Ok(Self { pool }) } + /// Checks out a connection from the pool and pins it for the caller's exclusive, long-lived + /// use. See [`PinnedConnection`]. + /// + /// This removes one connection from the shared pool for the lifetime of the returned handle, + /// so the pool must be sized to leave at least one connection for other users. + pub async fn pinned_connection(&self) -> Result { + let conn = self + .pool + .get() + .in_current_span() + .await + .map_err(|e| DatabaseError::ConnectionPoolObtainError(Box::new(e)))?; + Ok(PinnedConnection { conn }) + } + /// Create and commit a transaction with the queries added in the provided closure pub async fn transact(&self, msg: M, query: Q) -> std::result::Result where @@ -61,20 +76,7 @@ impl Db { E: From, E: std::error::Error + Send + Sync + 'static, { - let conn = self - .pool - .get() - .in_current_span() - .await - .map_err(|e| DatabaseError::ConnectionPoolObtainError(Box::new(e)))?; - - let span = tracing::Span::current(); - conn.interact(move |conn| { - let _guard = span.enter(); - <_ as diesel::Connection>::transaction::(conn, query) - }) - .await - .map_err(|err| E::from(DatabaseError::interact(&msg.to_string(), &err)))? + self.pinned_connection().await.map_err(E::from)?.transact(msg, query).await } /// Run the query _without_ a transaction @@ -86,19 +88,60 @@ impl Db { E: From, E: std::error::Error + Send + Sync + 'static, { - let conn = self - .pool - .get() + self.pinned_connection().await.map_err(E::from)?.query(msg, query).await + } +} + +/// A connection checked out of [`Db`]'s pool and held for the caller's exclusive, long-lived use. +/// +/// A hot event loop can pin a connection so its queries never wait on the shared pool even when +/// many concurrent tasks are saturating it. `transact`/`query` mirror [`Db`]'s, but run on the +/// pinned connection rather than acquiring one per call. The connection is returned to the pool +/// when the `PinnedConnection` is dropped. +pub struct PinnedConnection { + conn: deadpool::managed::Object, +} + +impl PinnedConnection { + /// Create and commit a transaction with the queries added in the provided closure, running on + /// the pinned connection. + pub async fn transact(&self, msg: M, query: Q) -> std::result::Result + where + Q: Send + + for<'a, 't> FnOnce(&'a mut SqliteConnection) -> std::result::Result + + 'static, + R: Send + 'static, + M: Send + ToString, + E: From, + E: From, + E: std::error::Error + Send + Sync + 'static, + { + let span = tracing::Span::current(); + self.conn + .interact(move |conn| { + let _guard = span.enter(); + <_ as diesel::Connection>::transaction::(conn, query) + }) .await - .map_err(|e| DatabaseError::ConnectionPoolObtainError(Box::new(e)))?; + .map_err(|err| E::from(DatabaseError::interact(&msg.to_string(), &err)))? + } + /// Run the query _without_ a transaction on the pinned connection. + pub async fn query(&self, msg: M, query: Q) -> std::result::Result + where + Q: Send + FnOnce(&mut SqliteConnection) -> std::result::Result + 'static, + R: Send + 'static, + M: Send + ToString, + E: From, + E: std::error::Error + Send + Sync + 'static, + { let span = tracing::Span::current(); - conn.interact(move |conn| { - let _guard = span.enter(); - let r = query(conn)?; - Ok(r) - }) - .await - .map_err(|err| E::from(DatabaseError::interact(&msg.to_string(), &err)))? + self.conn + .interact(move |conn| { + let _guard = span.enter(); + query(conn) + }) + .await + .map_err(|err| E::from(DatabaseError::interact(&msg.to_string(), &err)))? } }