From 069576b8a244967d003df667b3f0c9e3eeb62072 Mon Sep 17 00:00:00 2001 From: init4samwise Date: Tue, 3 Mar 2026 04:48:07 +0000 Subject: [PATCH 1/5] feat(bundle): filter bundles with stale host tx nonces before SimCache Adds nonce checking for host transactions in BundlePoller, similar to the existing TxPoller pattern. Bundles with stale host tx nonces are dropped before entering SimCache to prevent: - Wasted simulation cycles on bundles that will fail - ERROR log spam from nonce-too-low failures - Re-ingestion churn (~1s poll cycle) Each host transaction's nonce is compared against the sender's current nonce from the host provider. If any host tx has a stale nonce, the entire bundle is dropped with DEBUG-level logging. Closes ENG-1937 --- src/tasks/cache/bundle.rs | 89 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 84 insertions(+), 5 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index 79050821..94128e52 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -8,12 +8,19 @@ use signet_tx_cache::{ TxCacheError, types::{BundleKey, CachedBundle}, }; +use alloy::{ + consensus::{Transaction, transaction::SignerRecoverable}, + eips::Decodable2718, + primitives::Bytes, + providers::Provider, + rlp::Buf, +}; use tokio::{ sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, task::JoinHandle, time::{self, Duration}, }; -use tracing::{Instrument, trace, trace_span, warn}; +use tracing::{Instrument, debug, debug_span, warn, trace, trace_span}; /// Poll interval for the bundle poller in milliseconds. const POLL_INTERVAL_MS: u64 = 1000; @@ -87,6 +94,76 @@ impl BundlePoller { Ok(all_bundles) } + /// Spawns a tokio task to check the nonces of all host transactions in a bundle + /// before sending it to the cache task via the outbound channel. + /// + /// Bundles with stale host transaction nonces are dropped to prevent them from + /// entering the SimCache, failing simulation, and being re-ingested on the next poll. + fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender) { + tokio::spawn(async move { + let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id); + + // If no host transactions, forward directly + if bundle.bundle.host_txs.is_empty() { + if outbound.send(bundle).is_err() { + span_debug!(span, "Outbound channel closed, stopping nonce check task"); + } + return; + } + + let Ok(host_provider) = + crate::config().connect_host_provider().instrument(span.clone()).await + else { + span_debug!(span, "Failed to connect to host provider, stopping nonce check task"); + return; + }; + + // Check each host transaction's nonce + for (idx, host_tx_bytes) in bundle.bundle.host_txs.iter().enumerate() { + let host_tx = match decode_tx(host_tx_bytes) { + Some(tx) => tx, + None => { + span_debug!(span, idx, "Failed to decode host transaction, dropping bundle"); + return; + } + }; + + let sender = match host_tx.recover_signer() { + Ok(s) => s, + Err(_) => { + span_debug!(span, idx, "Failed to recover sender from host tx, dropping bundle"); + return; + } + }; + + let tx_count = match host_provider.get_transaction_count(sender).await { + Ok(count) => count, + Err(_) => { + span_debug!(span, idx, %sender, "Failed to fetch nonce for sender, dropping bundle"); + return; + } + }; + + if host_tx.nonce() < tx_count { + debug!( + parent: &span, + %sender, + tx_nonce = %host_tx.nonce(), + host_nonce = %tx_count, + idx, + "Dropping bundle with stale host tx nonce" + ); + return; + } + } + + // All host txs have valid nonces, forward the bundle + if outbound.send(bundle).is_err() { + span_debug!(span, "Outbound channel closed, stopping nonce check task"); + } + }); + } + async fn task_future(self, outbound: UnboundedSender) { loop { let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url); @@ -106,10 +183,7 @@ impl BundlePoller { counter!("signet.builder.cache.bundle_poll_count").increment(1); if let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await { for bundle in bundles.into_iter() { - if let Err(err) = outbound.send(bundle) { - span_debug!(span, ?err, "Failed to send bundle - channel is dropped"); - break; - } + Self::spawn_check_bundle_nonces(bundle, outbound.clone()); } } @@ -126,3 +200,8 @@ impl BundlePoller { (inbound, jh) } } + +/// Decodes a transaction from RLP-encoded bytes. +fn decode_tx(bytes: &Bytes) -> Option { + alloy::consensus::TxEnvelope::decode_2718(&mut bytes.chunk()).ok() +} From 5c62ce6aad5813d31bed253cfb29d92b9c295aab Mon Sep 17 00:00:00 2001 From: init4samwise Date: Thu, 5 Mar 2026 00:12:52 +0000 Subject: [PATCH 2/5] style: run cargo fmt --- src/tasks/cache/bundle.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index 94128e52..0a6bac89 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -123,7 +123,11 @@ impl BundlePoller { let host_tx = match decode_tx(host_tx_bytes) { Some(tx) => tx, None => { - span_debug!(span, idx, "Failed to decode host transaction, dropping bundle"); + span_debug!( + span, + idx, + "Failed to decode host transaction, dropping bundle" + ); return; } }; @@ -131,7 +135,11 @@ impl BundlePoller { let sender = match host_tx.recover_signer() { Ok(s) => s, Err(_) => { - span_debug!(span, idx, "Failed to recover sender from host tx, dropping bundle"); + span_debug!( + span, + idx, + "Failed to recover sender from host tx, dropping bundle" + ); return; } }; From 6747eaf14f7f29705a6705e747aa1b2249f673a1 Mon Sep 17 00:00:00 2001 From: init4samwise Date: Mon, 9 Mar 2026 23:55:31 +0000 Subject: [PATCH 3/5] refactor: use FuturesUnordered and reuse validity checks - Refactored bundle processing to use FuturesUnordered for concurrent execution - Added cancellation on first failure - Reused validity checks from crates/sim/src/cache/item.rs Addresses PR review feedback from prestwich --- src/tasks/cache/bundle.rs | 124 +++++++++++++++++++------------------- 1 file changed, 61 insertions(+), 63 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index 0a6bac89..b46db277 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,5 +1,7 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; +use alloy::providers::Provider; +use futures_util::{TryStreamExt, StreamExt, stream}; use init4_bin_base::{ deps::metrics::{counter, histogram}, perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}, @@ -8,19 +10,12 @@ use signet_tx_cache::{ TxCacheError, types::{BundleKey, CachedBundle}, }; -use alloy::{ - consensus::{Transaction, transaction::SignerRecoverable}, - eips::Decodable2718, - primitives::Bytes, - providers::Provider, - rlp::Buf, -}; use tokio::{ sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, task::JoinHandle, time::{self, Duration}, }; -use tracing::{Instrument, debug, debug_span, warn, trace, trace_span}; +use tracing::{Instrument, debug, debug_span, trace, trace_span, warn}; /// Poll interval for the bundle poller in milliseconds. const POLL_INTERVAL_MS: u64 = 1000; @@ -97,14 +92,28 @@ impl BundlePoller { /// Spawns a tokio task to check the nonces of all host transactions in a bundle /// before sending it to the cache task via the outbound channel. /// - /// Bundles with stale host transaction nonces are dropped to prevent them from - /// entering the SimCache, failing simulation, and being re-ingested on the next poll. + /// Uses the bundle's `host_tx_reqs()` to extract signer/nonce requirements + /// (reusing the existing validity check pattern from `signet-sim`), then checks + /// all host tx nonces concurrently via [`FuturesUnordered`], cancelling early + /// on the first stale or failed nonce. + /// + /// [`FuturesUnordered`]: futures_util::stream::FuturesUnordered fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender) { tokio::spawn(async move { let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id); + // Recover the bundle to get typed host tx requirements instead of + // manually decoding and recovering signers. + let recovered = match bundle.bundle.try_to_recovered() { + Ok(r) => r, + Err(e) => { + span_debug!(span, ?e, "Failed to recover bundle, dropping"); + return; + } + }; + // If no host transactions, forward directly - if bundle.bundle.host_txs.is_empty() { + if recovered.host_txs().is_empty() { if outbound.send(bundle).is_err() { span_debug!(span, "Outbound channel closed, stopping nonce check task"); } @@ -118,56 +127,50 @@ impl BundlePoller { return; }; - // Check each host transaction's nonce - for (idx, host_tx_bytes) in bundle.bundle.host_txs.iter().enumerate() { - let host_tx = match decode_tx(host_tx_bytes) { - Some(tx) => tx, - None => { - span_debug!( - span, - idx, - "Failed to decode host transaction, dropping bundle" - ); - return; - } - }; - - let sender = match host_tx.recover_signer() { - Ok(s) => s, - Err(_) => { - span_debug!( - span, - idx, - "Failed to recover sender from host tx, dropping bundle" - ); - return; + // Collect host tx requirements (signer + nonce) from the recovered bundle + let reqs: Vec<_> = recovered.host_tx_reqs().enumerate().collect(); + + // Check all host tx nonces concurrently, cancelling on first failure. + let result = stream::iter(reqs) + .map(Ok) + .try_for_each_concurrent(None, |(idx, req)| { + let host_provider = &host_provider; + let span = &span; + async move { + let tx_count = host_provider + .get_transaction_count(req.signer) + .await + .map_err(|_| { + span_debug!( + span, + idx, + sender = %req.signer, + "Failed to fetch nonce for sender, dropping bundle" + ); + })?; + + if req.nonce < tx_count { + debug!( + parent: span, + sender = %req.signer, + tx_nonce = %req.nonce, + host_nonce = %tx_count, + idx, + "Dropping bundle with stale host tx nonce" + ); + return Err(()); + } + + Ok(()) } - }; - - let tx_count = match host_provider.get_transaction_count(sender).await { - Ok(count) => count, - Err(_) => { - span_debug!(span, idx, %sender, "Failed to fetch nonce for sender, dropping bundle"); - return; - } - }; - - if host_tx.nonce() < tx_count { - debug!( - parent: &span, - %sender, - tx_nonce = %host_tx.nonce(), - host_nonce = %tx_count, - idx, - "Dropping bundle with stale host tx nonce" - ); - return; - } - } + }) + .await; // All host txs have valid nonces, forward the bundle - if outbound.send(bundle).is_err() { - span_debug!(span, "Outbound channel closed, stopping nonce check task"); + if result.is_ok() { + if outbound.send(bundle).is_err() { + span_debug!(span, "Outbound channel closed, stopping nonce check task"); + } } }); } @@ -208,8 +211,3 @@ impl BundlePoller { (inbound, jh) } } - -/// Decodes a transaction from RLP-encoded bytes. -fn decode_tx(bytes: &Bytes) -> Option { - alloy::consensus::TxEnvelope::decode_2718(&mut bytes.chunk()).ok() -} From 945b00909b7f776d6f3a0a26c9cfdb5980e2f670 Mon Sep 17 00:00:00 2001 From: Fraser Hutchison <190532+Fraser999@users.noreply.github.com> Date: Wed, 25 Mar 2026 17:54:08 +0000 Subject: [PATCH 4/5] address review feedback --- src/tasks/cache/bundle.rs | 109 ++++++++++++++++++++------------------ 1 file changed, 57 insertions(+), 52 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index b46db277..19e21fea 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,7 +1,7 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; use alloy::providers::Provider; -use futures_util::{TryStreamExt, StreamExt, stream}; +use futures_util::future::try_join_all; use init4_bin_base::{ deps::metrics::{counter, histogram}, perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}, @@ -10,12 +10,13 @@ use signet_tx_cache::{ TxCacheError, types::{BundleKey, CachedBundle}, }; +use std::collections::{BTreeMap, BTreeSet}; use tokio::{ sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, task::JoinHandle, time::{self, Duration}, }; -use tracing::{Instrument, debug, debug_span, trace, trace_span, warn}; +use tracing::{Instrument, debug_span, trace, trace_span, warn}; /// Poll interval for the bundle poller in milliseconds. const POLL_INTERVAL_MS: u64 = 1000; @@ -92,22 +93,19 @@ impl BundlePoller { /// Spawns a tokio task to check the nonces of all host transactions in a bundle /// before sending it to the cache task via the outbound channel. /// - /// Uses the bundle's `host_tx_reqs()` to extract signer/nonce requirements - /// (reusing the existing validity check pattern from `signet-sim`), then checks - /// all host tx nonces concurrently via [`FuturesUnordered`], cancelling early - /// on the first stale or failed nonce. - /// - /// [`FuturesUnordered`]: futures_util::stream::FuturesUnordered + /// Fetches on-chain nonces concurrently for each unique signer, then validates + /// sequentially with a local nonce cache — mirroring the SDK's + /// `check_bundle_tx_list` pattern. Drops bundles where any host tx has a stale + /// or future nonce. fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender) { + let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id); tokio::spawn(async move { - let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id); - // Recover the bundle to get typed host tx requirements instead of // manually decoding and recovering signers. let recovered = match bundle.bundle.try_to_recovered() { Ok(r) => r, - Err(e) => { - span_debug!(span, ?e, "Failed to recover bundle, dropping"); + Err(error) => { + span_debug!(span, ?error, "Failed to recover bundle, dropping"); return; } }; @@ -128,49 +126,56 @@ impl BundlePoller { }; // Collect host tx requirements (signer + nonce) from the recovered bundle - let reqs: Vec<_> = recovered.host_tx_reqs().enumerate().collect(); - - // Check all host tx nonces concurrently, cancelling on first failure. - let result = stream::iter(reqs) - .map(Ok) - .try_for_each_concurrent(None, |(idx, req)| { - let host_provider = &host_provider; - let span = &span; - async move { - let tx_count = host_provider - .get_transaction_count(req.signer) - .await - .map_err(|_| { - span_debug!( - span, - idx, - sender = %req.signer, - "Failed to fetch nonce for sender, dropping bundle" - ); - })?; - - if req.nonce < tx_count { - debug!( - parent: span, - sender = %req.signer, - tx_nonce = %req.nonce, - host_nonce = %tx_count, - idx, - "Dropping bundle with stale host tx nonce" + let reqs: Vec<_> = recovered.host_tx_reqs().collect(); + + // Fetch on-chain nonces concurrently for each unique signer + let unique_signers: BTreeSet<_> = reqs.iter().map(|req| req.signer).collect(); + let nonce_fetches = unique_signers.into_iter().map(|signer| { + let host_provider = &host_provider; + let span = &span; + async move { + host_provider + .get_transaction_count(signer) + .await + .map(|nonce| (signer, nonce)) + .inspect_err(|error| { + span_debug!( + span, + ?error, + sender = %signer, + "Failed to fetch nonce for sender, dropping bundle" ); - return Err(()); - } - - Ok(()) - } - }) - .await; + }) + } + }); - // All host txs have valid nonces, forward the bundle - if result.is_ok() { - if outbound.send(bundle).is_err() { - span_debug!(span, "Outbound channel closed, stopping nonce check task"); + let Ok(fetched) = try_join_all(nonce_fetches).await else { + return; + }; + let mut nonce_cache: BTreeMap<_, _> = fetched.into_iter().collect(); + + // Validate sequentially, checking exact nonce match and incrementing for + // same-signer sequential txs (mirroring check_bundle_tx_list in signet-sim). + for (idx, req) in reqs.iter().enumerate() { + let expected = nonce_cache.get(&req.signer).copied().expect("nonce must be cached"); + + if req.nonce != expected { + span_debug!( + span, + sender = %req.signer, + tx_nonce = req.nonce, + expected_nonce = expected, + idx, + "Dropping bundle: host tx nonce mismatch" + ); + return; } + + nonce_cache.entry(req.signer).and_modify(|nonce| *nonce += 1); + } + + if outbound.send(bundle).is_err() { + span_debug!(span, "Outbound channel closed, stopping nonce check task"); } }); } From cf2a29cdd63918d5f09b6452ecbb085497f4ce41 Mon Sep 17 00:00:00 2001 From: Fraser Hutchison <190532+Fraser999@users.noreply.github.com> Date: Thu, 26 Mar 2026 12:06:04 +0000 Subject: [PATCH 5/5] use check_bundle_tx_list from sdk --- Cargo.lock | 69 ++++++++++++++------------------ Cargo.toml | 10 ++--- src/tasks/cache/bundle.rs | 83 +++++++++------------------------------ 3 files changed, 54 insertions(+), 108 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3bb337c0..b26eba77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -126,9 +126,9 @@ dependencies = [ [[package]] name = "alloy-chains" -version = "0.2.32" +version = "0.2.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9247f0a399ef71aeb68f497b2b8fb348014f742b50d3b83b1e00dfe1b7d64b3d" +checksum = "f4e9e31d834fe25fe991b8884e4b9f0e59db4a97d86e05d1464d6899c013cd62" dependencies = [ "alloy-primitives", "alloy-rlp", @@ -4951,14 +4951,15 @@ dependencies = [ [[package]] name = "ipconfig" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d72a21f6a71a6c4c3160e095e8925861f5119dd26ef71acee1b9146f74f76c8" +checksum = "4d40460c0ce33d6ce4b0630ad68ff63d6661961c48b6dba35e5a4d81cfb48222" dependencies = [ "socket2", "widestring", + "windows-registry", + "windows-result", "windows-sys 0.61.2", - "winreg", ] [[package]] @@ -5410,9 +5411,9 @@ dependencies = [ [[package]] name = "libredox" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1744e39d1d6a9948f4f388969627434e31128196de472883b39f148769bfe30a" +checksum = "7ddbf48fd451246b1f8c2610bd3b4ac0cc6e149d89832867093ab69a17194f08" dependencies = [ "bitflags 2.11.0", "libc", @@ -5941,9 +5942,9 @@ dependencies = [ [[package]] name = "num-conv" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" +checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967" [[package]] name = "num-integer" @@ -10888,9 +10889,9 @@ dependencies = [ [[package]] name = "signet-bundle" -version = "0.16.0-rc.16" +version = "0.16.0-rc.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e9bd62b011781950089d31f742d90b3d62ba00f374db0dfe0d3ba1f1358b45c" +checksum = "68186af7d8d64cb0f797958f2dc7f804d43e4aee173a7f47da7306e334ff4944" dependencies = [ "alloy", "serde", @@ -10904,9 +10905,9 @@ dependencies = [ [[package]] name = "signet-constants" -version = "0.16.0-rc.16" +version = "0.16.0-rc.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9a4bb305179fc7461eb5cbee914344067ef03dccc267b20dfc1eaea9db1459" +checksum = "2a56e37afb65ccc49d4a3fa755d784541b3935ec6c2893a1a3ced9f70b33b19b" dependencies = [ "alloy", "serde", @@ -10940,9 +10941,9 @@ dependencies = [ [[package]] name = "signet-evm" -version = "0.16.0-rc.16" +version = "0.16.0-rc.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55caa1277f82e8854b9daf275d20deb8a78d57fa1babdcdeed8dd2976dfca18c" +checksum = "ae5ec198bdc5cefedd4f8032eb8364fda2dd4244046146cb98809e5efed4b570" dependencies = [ "alloy", "bitflags 2.11.0", @@ -10957,9 +10958,9 @@ dependencies = [ [[package]] name = "signet-extract" -version = "0.16.0-rc.16" +version = "0.16.0-rc.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a1e5c76e9c86d5e3cc8d321e41022cf2a6658e6346119fd4c29f894d8513a1c" +checksum = "ad44bbdefd2e8e48b3949ecd4b6ec30bf839a9c42d8f6577ce0a134834f0bdda" dependencies = [ "alloy", "signet-types", @@ -10982,9 +10983,9 @@ dependencies = [ [[package]] name = "signet-journal" -version = "0.16.0-rc.16" +version = "0.16.0-rc.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89519bce0c6d3143593cdab0501852741f66b641d048d65ec87cb095a803bd0a" +checksum = "230f5b7e72f1e09da3bc5800cb54b9020dd6c33a4918cf3ef8b5f87e0253b544" dependencies = [ "alloy", "futures-util", @@ -11010,9 +11011,9 @@ dependencies = [ [[package]] name = "signet-sim" -version = "0.16.0-rc.16" +version = "0.16.0-rc.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f6fe279a15c70233025257536f405c1f4c502525ef533c852f918933e0b5084" +checksum = "d086b820ddde81a34b6c3d9aa65fe0de5e4604d18f89feab75e4670dea4093cf" dependencies = [ "alloy", "lru", @@ -11029,9 +11030,9 @@ dependencies = [ [[package]] name = "signet-tx-cache" -version = "0.16.0-rc.16" +version = "0.16.0-rc.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d95529a7f5a351bf5daee91a4c25a317fcb815b0fc4a26aa7099629d619eb51b" +checksum = "8f5735d9800f2e03b09ab38e7ca5934e8b83772d874bf6e0accecfe5fe00f48f" dependencies = [ "alloy", "futures-util", @@ -11048,9 +11049,9 @@ dependencies = [ [[package]] name = "signet-types" -version = "0.16.0-rc.16" +version = "0.16.0-rc.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e10dff727bb17387f043999bca9dafef6c0ba8e10e8d7c5f5fd8e53b708ea959" +checksum = "d3fa53bcfd58d863ef7fe209138228ec3d9aa63e856e2fc70a7a3b03881fbc82" dependencies = [ "alloy", "chrono", @@ -11062,9 +11063,9 @@ dependencies = [ [[package]] name = "signet-zenith" -version = "0.16.0-rc.16" +version = "0.16.0-rc.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7c96ff57c3b9d32a452d96cbf0bd852c987688721c84877ec6218e1ceed87ed" +checksum = "a7110722fbf7e006b48cd22db11338958af1afb0b5373027de8d3a4b6b1ff3e5" dependencies = [ "alloy", "alloy-core", @@ -12106,9 +12107,9 @@ checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" [[package]] name = "unicode-segmentation" -version = "1.12.0" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" +checksum = "da36089a805484bcccfffe0739803392c8298778a2d2f09febf76fac5ad9025b" [[package]] name = "unicode-truncate" @@ -12910,16 +12911,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "winreg" -version = "0.55.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb5a765337c50e9ec252c2069be9bf91c7df47afb103b642ba3a53bf8101be97" -dependencies = [ - "cfg-if", - "windows-sys 0.59.0", -] - [[package]] name = "wit-bindgen" version = "0.51.0" diff --git a/Cargo.toml b/Cargo.toml index 01a41d8e..f6b98e9b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,11 +20,11 @@ path = "bin/builder.rs" [dependencies] init4-bin-base = { version = "0.18.0-rc.13", features = ["perms", "aws", "pylon"] } -signet-constants = { version = "0.16.0-rc.16" } -signet-sim = { version = "0.16.0-rc.16" } -signet-tx-cache = { version = "0.16.0-rc.16" } -signet-types = { version = "0.16.0-rc.16" } -signet-zenith = { version = "0.16.0-rc.16" } +signet-constants = { version = "0.16.0-rc.17" } +signet-sim = { version = "0.16.0-rc.17" } +signet-tx-cache = { version = "0.16.0-rc.17" } +signet-types = { version = "0.16.0-rc.17" } +signet-zenith = { version = "0.16.0-rc.17" } signet-block-processor = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.10" } signet-genesis = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.10" } diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index 19e21fea..b5addda0 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,16 +1,14 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; -use alloy::providers::Provider; -use futures_util::future::try_join_all; use init4_bin_base::{ deps::metrics::{counter, histogram}, perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}, }; +use signet_sim::{ProviderStateSource, SimItemValidity, check_bundle_tx_list}; use signet_tx_cache::{ TxCacheError, types::{BundleKey, CachedBundle}, }; -use std::collections::{BTreeMap, BTreeSet}; use tokio::{ sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, task::JoinHandle, @@ -90,30 +88,25 @@ impl BundlePoller { Ok(all_bundles) } - /// Spawns a tokio task to check the nonces of all host transactions in a bundle - /// before sending it to the cache task via the outbound channel. + /// Spawns a tokio task to check the validity of all host transactions in a + /// bundle before sending it to the cache task via the outbound channel. /// - /// Fetches on-chain nonces concurrently for each unique signer, then validates - /// sequentially with a local nonce cache — mirroring the SDK's - /// `check_bundle_tx_list` pattern. Drops bundles where any host tx has a stale - /// or future nonce. + /// Uses [`check_bundle_tx_list`] from `signet-sim` to validate host tx nonces + /// and balance against the host chain. Drops bundles that are not currently valid. fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender) { let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id); tokio::spawn(async move { - // Recover the bundle to get typed host tx requirements instead of - // manually decoding and recovering signers. let recovered = match bundle.bundle.try_to_recovered() { - Ok(r) => r, + Ok(recovered) => recovered, Err(error) => { span_debug!(span, ?error, "Failed to recover bundle, dropping"); return; } }; - // If no host transactions, forward directly if recovered.host_txs().is_empty() { if outbound.send(bundle).is_err() { - span_debug!(span, "Outbound channel closed, stopping nonce check task"); + span_debug!(span, "Outbound channel closed"); } return; } @@ -121,61 +114,23 @@ impl BundlePoller { let Ok(host_provider) = crate::config().connect_host_provider().instrument(span.clone()).await else { - span_debug!(span, "Failed to connect to host provider, stopping nonce check task"); + span_debug!(span, "Failed to connect to host provider, dropping bundle"); return; }; - // Collect host tx requirements (signer + nonce) from the recovered bundle - let reqs: Vec<_> = recovered.host_tx_reqs().collect(); - - // Fetch on-chain nonces concurrently for each unique signer - let unique_signers: BTreeSet<_> = reqs.iter().map(|req| req.signer).collect(); - let nonce_fetches = unique_signers.into_iter().map(|signer| { - let host_provider = &host_provider; - let span = &span; - async move { - host_provider - .get_transaction_count(signer) - .await - .map(|nonce| (signer, nonce)) - .inspect_err(|error| { - span_debug!( - span, - ?error, - sender = %signer, - "Failed to fetch nonce for sender, dropping bundle" - ); - }) + let source = ProviderStateSource(host_provider); + match check_bundle_tx_list(recovered.host_tx_reqs(), &source).await { + Ok(SimItemValidity::Now) | Ok(SimItemValidity::Future) => { + if outbound.send(bundle).is_err() { + span_debug!(span, "Outbound channel closed"); + } } - }); - - let Ok(fetched) = try_join_all(nonce_fetches).await else { - return; - }; - let mut nonce_cache: BTreeMap<_, _> = fetched.into_iter().collect(); - - // Validate sequentially, checking exact nonce match and incrementing for - // same-signer sequential txs (mirroring check_bundle_tx_list in signet-sim). - for (idx, req) in reqs.iter().enumerate() { - let expected = nonce_cache.get(&req.signer).copied().expect("nonce must be cached"); - - if req.nonce != expected { - span_debug!( - span, - sender = %req.signer, - tx_nonce = req.nonce, - expected_nonce = expected, - idx, - "Dropping bundle: host tx nonce mismatch" - ); - return; + Ok(SimItemValidity::Never) => { + span_debug!(span, "Dropping bundle: host txs will never be valid"); + } + Err(error) => { + span_debug!(span, %error, "Failed to check bundle validity, dropping"); } - - nonce_cache.entry(req.signer).and_modify(|nonce| *nonce += 1); - } - - if outbound.send(bundle).is_err() { - span_debug!(span, "Outbound channel closed, stopping nonce check task"); } }); }