Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 30 additions & 39 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ path = "bin/builder.rs"
[dependencies]
init4-bin-base = { version = "0.18.0-rc.13", features = ["perms", "aws", "pylon"] }

signet-constants = { version = "0.16.0-rc.16" }
signet-sim = { version = "0.16.0-rc.16" }
signet-tx-cache = { version = "0.16.0-rc.16" }
signet-types = { version = "0.16.0-rc.16" }
signet-zenith = { version = "0.16.0-rc.16" }
signet-constants = { version = "0.16.0-rc.17" }
signet-sim = { version = "0.16.0-rc.17" }
signet-tx-cache = { version = "0.16.0-rc.17" }
signet-types = { version = "0.16.0-rc.17" }
signet-zenith = { version = "0.16.0-rc.17" }
signet-block-processor = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.10" }
signet-genesis = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.10" }

Expand Down
55 changes: 50 additions & 5 deletions src/tasks/cache/bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use init4_bin_base::{
deps::metrics::{counter, histogram},
perms::tx_cache::{BuilderTxCache, BuilderTxCacheError},
};
use signet_sim::{ProviderStateSource, SimItemValidity, check_bundle_tx_list};
use signet_tx_cache::{
TxCacheError,
types::{BundleKey, CachedBundle},
Expand All @@ -13,7 +14,7 @@ use tokio::{
task::JoinHandle,
time::{self, Duration},
};
use tracing::{Instrument, trace, trace_span, warn};
use tracing::{Instrument, debug_span, trace, trace_span, warn};

/// Poll interval for the bundle poller in milliseconds.
const POLL_INTERVAL_MS: u64 = 1000;
Expand Down Expand Up @@ -87,6 +88,53 @@ impl BundlePoller {
Ok(all_bundles)
}

/// Spawns a tokio task to check the validity of all host transactions in a
/// bundle before sending it to the cache task via the outbound channel.
///
/// Uses [`check_bundle_tx_list`] from `signet-sim` to validate host tx nonces
/// and balance against the host chain. Drops bundles that are not currently valid.
fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender<CachedBundle>) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should re-use existing validity checks from crates/sim/src/cache/item.rs

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. The implementation now mirrors the check_bundle_tx_list pattern from signet-sim:

  • Uses recovered.host_tx_reqs() to extract signer/nonce requirements (same as the SDK)
  • Maintains a BTreeMap nonce cache
  • Validates sequentially with exact nonce matching
  • Increments cached nonces for same-signer sequential txs

The only difference is that we pre-fetch all on-chain nonces concurrently (since we're calling an async provider over the network), then run the validation loop — which is the appropriate adaptation for this context.

let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id);
tokio::spawn(async move {
let recovered = match bundle.bundle.try_to_recovered() {
Ok(recovered) => recovered,
Err(error) => {
span_debug!(span, ?error, "Failed to recover bundle, dropping");
return;
}
};

if recovered.host_txs().is_empty() {
if outbound.send(bundle).is_err() {
span_debug!(span, "Outbound channel closed");
}
return;
}

let Ok(host_provider) =
crate::config().connect_host_provider().instrument(span.clone()).await
else {
span_debug!(span, "Failed to connect to host provider, dropping bundle");
return;
};

let source = ProviderStateSource(host_provider);
match check_bundle_tx_list(recovered.host_tx_reqs(), &source).await {
Ok(SimItemValidity::Now) | Ok(SimItemValidity::Future) => {
if outbound.send(bundle).is_err() {
span_debug!(span, "Outbound channel closed");
}
}
Ok(SimItemValidity::Never) => {
span_debug!(span, "Dropping bundle: host txs will never be valid");
}
Err(error) => {
span_debug!(span, %error, "Failed to check bundle validity, dropping");
}
}
});
}

async fn task_future(self, outbound: UnboundedSender<CachedBundle>) {
loop {
let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url);
Expand All @@ -106,10 +154,7 @@ impl BundlePoller {
counter!("signet.builder.cache.bundle_poll_count").increment(1);
if let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await {
for bundle in bundles.into_iter() {
if let Err(err) = outbound.send(bundle) {
span_debug!(span, ?err, "Failed to send bundle - channel is dropped");
break;
}
Self::spawn_check_bundle_nonces(bundle, outbound.clone());
}
}

Expand Down