diff --git a/Cargo.lock b/Cargo.lock index f52111e4..ce422024 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3701,6 +3701,17 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "eventsource-stream" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab" +dependencies = [ + "futures-core", + "nom", + "pin-project-lite", +] + [[package]] name = "eyre" version = "0.6.12" @@ -4835,9 +4846,9 @@ dependencies = [ [[package]] name = "init4-bin-base" -version = "0.18.0-rc.13" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "054d4b927194b1e73c85945a93254b19772fee2c627b190c9ae7c72da0b40eaf" +checksum = "f94c64278d765503d752783c7a1f1052f0d41cff51cd1fd258958d94a9a03010" dependencies = [ "alloy", "async-trait", @@ -10888,9 +10899,9 @@ dependencies = [ [[package]] name = "signet-bundle" -version = "0.16.0-rc.16" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e9bd62b011781950089d31f742d90b3d62ba00f374db0dfe0d3ba1f1358b45c" +checksum = "9d4800df338d00c21ada5e79bb68c3a4dc092410339954a9fab5b40efefd4f38" dependencies = [ "alloy", "serde", @@ -10904,9 +10915,9 @@ dependencies = [ [[package]] name = "signet-constants" -version = "0.16.0-rc.16" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9a4bb305179fc7461eb5cbee914344067ef03dccc267b20dfc1eaea9db1459" +checksum = "39cc8ae6e3190db31ba9b9124c82981228a60b7702694b6b01d165f93e75a6f3" dependencies = [ "alloy", "serde", @@ -10940,9 +10951,9 @@ dependencies = [ [[package]] name = "signet-evm" -version = "0.16.0-rc.16" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55caa1277f82e8854b9daf275d20deb8a78d57fa1babdcdeed8dd2976dfca18c" +checksum = "8073dfb7077dee75e9a42cd0cc368530e211f749af7d43d55cfd73ddbf3e0d08" dependencies = [ "alloy", "bitflags 2.11.0", @@ -10957,9 +10968,9 @@ dependencies = [ [[package]] name = "signet-extract" -version = "0.16.0-rc.16" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a1e5c76e9c86d5e3cc8d321e41022cf2a6658e6346119fd4c29f894d8513a1c" +checksum = "829cc061f00fd022db2a894d0eab480a9d9964fe0baa4a5fde644664b1155cfb" dependencies = [ "alloy", "signet-types", @@ -10982,9 +10993,9 @@ dependencies = [ [[package]] name = "signet-journal" -version = "0.16.0-rc.16" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89519bce0c6d3143593cdab0501852741f66b641d048d65ec87cb095a803bd0a" +checksum = "01aff7a1ddad90c45fd01c803ad67df8a425000ea3ac3c3376aab456571c519d" dependencies = [ "alloy", "futures-util", @@ -11010,9 +11021,9 @@ dependencies = [ [[package]] name = "signet-sim" -version = "0.16.0-rc.16" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f6fe279a15c70233025257536f405c1f4c502525ef533c852f918933e0b5084" +checksum = "12996e1a9fb1c9922bba5295fe8a864eab720bb07506b652c892d0ec137b04dc" dependencies = [ "alloy", "lru", @@ -11029,14 +11040,16 @@ dependencies = [ [[package]] name = "signet-tx-cache" -version = "0.16.0-rc.16" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d95529a7f5a351bf5daee91a4c25a317fcb815b0fc4a26aa7099629d619eb51b" +checksum = "d5e69dbf74a07824c91899c67c2ef51547095a310b44732b908899eb4b28c3e8" dependencies = [ "alloy", + "eventsource-stream", "futures-util", "reqwest", "serde", + "serde_json", "signet-bundle", "signet-constants", "signet-types", @@ -11048,9 +11061,9 @@ dependencies = [ [[package]] name = "signet-types" -version = "0.16.0-rc.16" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e10dff727bb17387f043999bca9dafef6c0ba8e10e8d7c5f5fd8e53b708ea959" +checksum = "81933b0d48c5e2f5d4b7a669b20cfa170535e25b854a28ce842f81e24d4e426b" dependencies = [ "alloy", "chrono", @@ -11062,9 +11075,9 @@ dependencies = [ [[package]] name = "signet-zenith" -version = "0.16.0-rc.16" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7c96ff57c3b9d32a452d96cbf0bd852c987688721c84877ec6218e1ceed87ed" +checksum = "23da99308da387b30d16e3fd62d6aef5ead4d961a72b65962a30b0f9658cbba7" dependencies = [ "alloy", "alloy-core", diff --git a/Cargo.toml b/Cargo.toml index 408cedcf..ca6e9a57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,13 +18,13 @@ name = "zenith-builder-example" path = "bin/builder.rs" [dependencies] -init4-bin-base = { version = "0.18.0-rc.13", features = ["perms", "aws", "pylon"] } +init4-bin-base = { version = "0.18.0", features = ["perms", "aws", "pylon", "sse"] } -signet-constants = { version = "0.16.0-rc.16" } -signet-sim = { version = "0.16.0-rc.16" } -signet-tx-cache = { version = "0.16.0-rc.16" } -signet-types = { version = "0.16.0-rc.16" } -signet-zenith = { version = "0.16.0-rc.16" } +signet-constants = { version = "0.16.0" } +signet-sim = { version = "0.16.0" } +signet-tx-cache = { version = "0.16.0" } +signet-types = { version = "0.16.0" } +signet-zenith = { version = "0.16.0" } signet-block-processor = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.10" } signet-genesis = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.10" } diff --git a/src/tasks/cache/system.rs b/src/tasks/cache/system.rs index 81223a36..89698a4d 100644 --- a/src/tasks/cache/system.rs +++ b/src/tasks/cache/system.rs @@ -23,7 +23,7 @@ impl CacheTasks { /// [`CacheTask`], [`TxPoller`], and [`BundlePoller`] internally and yields their [`JoinHandle`]s. pub fn spawn(&self) -> CacheSystem { // Tx Poller pulls transactions from the cache - let tx_poller = TxPoller::new(); + let tx_poller = TxPoller::new(self.block_env.clone()); let (tx_receiver, tx_poller) = tx_poller.spawn(); // Bundle Poller pulls bundles from the cache diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index def47090..2bf02458 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -1,60 +1,50 @@ //! Transaction service responsible for fetching and sending transactions to the simulator. -use crate::config::BuilderConfig; +use crate::{config::BuilderConfig, tasks::env::SimEnv}; use alloy::{ consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable}, providers::Provider, }; -use futures_util::{TryFutureExt, TryStreamExt}; +use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; use init4_bin_base::deps::metrics::{counter, histogram}; use signet_tx_cache::{TxCache, TxCacheError}; -use std::time::Duration; -use tokio::{sync::mpsc, task::JoinHandle, time}; -use tracing::{Instrument, debug, debug_span, trace, trace_span}; +use std::{pin::Pin, time::Duration}; +use tokio::{ + sync::{mpsc, watch}, + task::JoinHandle, + time, +}; +use tracing::{Instrument, debug, debug_span, trace, trace_span, warn}; -/// Poll interval for the transaction poller in milliseconds. -const POLL_INTERVAL_MS: u64 = 1000; +type SseStream = Pin> + Send>>; /// Implements a poller for the block builder to pull transactions from the /// transaction pool. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct TxPoller { /// Config values from the Builder. config: &'static BuilderConfig, /// Client for the tx cache. tx_cache: TxCache, - /// Defines the interval at which the service should poll the cache. - poll_interval_ms: u64, -} - -impl Default for TxPoller { - fn default() -> Self { - Self::new() - } + /// Receiver for block environment updates, used to trigger refetches. + envs: watch::Receiver>, } -/// [`TxPoller`] implements a poller task that fetches transactions from the transaction pool -/// and sends them into the provided channel sender. +/// [`TxPoller`] fetches transactions from the transaction pool on startup +/// and on each block environment change, and subscribes to an SSE stream +/// for real-time delivery of new transactions in between. impl TxPoller { - /// Returns a new [`TxPoller`] with the given config. - /// * Defaults to 1000ms poll interval (1s). - pub fn new() -> Self { - Self::new_with_poll_interval_ms(POLL_INTERVAL_MS) - } + const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1); + const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30); - /// Returns a new [`TxPoller`] with the given config and cache polling interval in milliseconds. - pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self { + /// Returns a new [`TxPoller`] with the given block environment receiver. + pub fn new(envs: watch::Receiver>) -> Self { let config = crate::config(); let tx_cache = TxCache::new(config.tx_pool_url.clone()); - Self { config, tx_cache, poll_interval_ms } + Self { config, tx_cache, envs } } - /// Returns the poll duration as a [`Duration`]. - const fn poll_duration(&self) -> Duration { - Duration::from_millis(self.poll_interval_ms) - } - - // Spawn a tokio task to check the nonce of a transaction before sending - // it to the cachetask via the outbound channel. + /// Spawn a tokio task to check the nonce of a transaction before sending + /// it to the cachetask via the outbound channel. fn spawn_check_nonce(&self, tx: TxEnvelope, outbound: mpsc::UnboundedSender) { tokio::spawn(async move { let span = debug_span!("check_nonce", tx_id = %tx.tx_hash()); @@ -96,46 +86,104 @@ impl TxPoller { }); } - /// Polls the transaction cache for transactions, paginating through all available pages. - pub async fn check_tx_cache(&self) -> Result, TxCacheError> { - self.tx_cache.stream_transactions().try_collect().await + /// Fetches all transactions from the cache, forwarding each to nonce + /// checking before it reaches the cache task. + async fn full_fetch(&self, outbound: &mpsc::UnboundedSender) { + let span = trace_span!("TxPoller::full_fetch", url = %self.config.tx_pool_url); + + counter!("signet.builder.cache.tx_poll_count").increment(1); + if let Ok(transactions) = self + .tx_cache + .stream_transactions() + .try_collect::>() + .inspect_err(|error| { + counter!("signet.builder.cache.tx_poll_errors").increment(1); + debug!(%error, "Error fetching transactions"); + }) + .instrument(span.clone()) + .await + { + let _guard = span.entered(); + histogram!("signet.builder.cache.txs_fetched").record(transactions.len() as f64); + trace!(count = transactions.len(), "found transactions"); + for tx in transactions { + self.spawn_check_nonce(tx, outbound.clone()); + } + } } - async fn task_future(self, outbound: mpsc::UnboundedSender) { - loop { - let span = trace_span!("TxPoller::loop", url = %self.config.tx_pool_url); - - // Check this here to avoid making the web request if we know - // we don't need the results. - if outbound.is_closed() { - span.in_scope(|| trace!("No receivers left, shutting down")); - break; + /// Opens an SSE subscription to the transaction feed. Returns an empty + /// stream on connection failure so the caller can handle reconnection + /// uniformly. + async fn subscribe(&self) -> SseStream { + match self.tx_cache.subscribe_transactions().await { + Ok(stream) => { + debug!(url = %self.config.tx_pool_url, "SSE transaction subscription established"); + Box::pin(stream) } + Err(error) => { + warn!(%error, "Failed to open SSE transaction subscription"); + Box::pin(futures_util::stream::empty()) + } + } + } - counter!("signet.builder.cache.tx_poll_count").increment(1); - if let Ok(transactions) = self - .check_tx_cache() - .inspect_err(|error| { - counter!("signet.builder.cache.tx_poll_errors").increment(1); - debug!(%error, "Error fetching transactions"); - }) - .instrument(span.clone()) - .await - { - let _guard = span.entered(); - histogram!("signet.builder.cache.txs_fetched").record(transactions.len() as f64); - trace!(count = transactions.len(), "found transactions"); - for tx in transactions.into_iter() { + /// Reconnects the SSE stream with backoff. Performs a full refetch to + /// cover any items missed while disconnected. + async fn reconnect( + &self, + outbound: &mpsc::UnboundedSender, + backoff: &mut Duration, + ) -> SseStream { + time::sleep(*backoff).await; + *backoff = (*backoff * 2).min(Self::MAX_RECONNECT_BACKOFF); + self.full_fetch(outbound).await; + self.subscribe().await + } + + async fn task_future(mut self, outbound: mpsc::UnboundedSender) { + // Initial full fetch of all transactions currently in the cache. + self.full_fetch(&outbound).await; + + // Open the SSE stream for real-time delivery of new transactions. + let mut sse_stream = self.subscribe().await; + let mut backoff = Self::INITIAL_RECONNECT_BACKOFF; + + loop { + tokio::select! { + item = sse_stream.next() => { + let Some(result) = item else { + warn!("SSE transaction stream ended, reconnecting"); + sse_stream = self.reconnect(&outbound, &mut backoff).await; + continue; + }; + let Ok(tx) = result else { + warn!(error = %result.unwrap_err(), "SSE transaction stream error, reconnecting"); + sse_stream = self.reconnect(&outbound, &mut backoff).await; + continue; + }; + backoff = Self::INITIAL_RECONNECT_BACKOFF; + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + break; + } self.spawn_check_nonce(tx, outbound.clone()); } + res = self.envs.changed() => { + if res.is_err() { + debug!("Block env channel closed, shutting down"); + break; + } + trace!("Block env changed, refetching all transactions"); + self.full_fetch(&outbound).await; + } } - - time::sleep(self.poll_duration()).await; } } - /// Spawns a task that continuously polls the cache for transactions and sends any it finds to - /// its sender. + /// Spawns a task that fetches all current transactions, then subscribes + /// to the SSE feed for real-time updates, refetching on each new block + /// environment. pub fn spawn(self) -> (mpsc::UnboundedReceiver, JoinHandle<()>) { let (outbound, inbound) = mpsc::unbounded_channel(); let jh = tokio::spawn(self.task_future(outbound)); diff --git a/tests/tx_poller_test.rs b/tests/tx_poller_test.rs index bca37e3c..09730d76 100644 --- a/tests/tx_poller_test.rs +++ b/tests/tx_poller_test.rs @@ -1,9 +1,8 @@ use alloy::{primitives::U256, signers::local::PrivateKeySigner}; -use builder::{ - tasks::cache::TxPoller, - test_utils::{new_signed_tx, setup_logging, setup_test_config}, -}; +use builder::test_utils::{new_signed_tx, setup_logging, setup_test_config}; use eyre::{Ok, Result}; +use futures_util::TryStreamExt; +use signet_tx_cache::TxCache; #[ignore = "integration test"] #[tokio::test] @@ -14,11 +13,9 @@ async fn test_tx_roundtrip() -> Result<()> { // Post a transaction to the cache post_tx().await?; - // Create a new poller - let poller = TxPoller::new(); - // Fetch transactions from the pool - let transactions = poller.check_tx_cache().await?; + let tx_cache = TxCache::new(builder::config().tx_pool_url.clone()); + let transactions: Vec<_> = tx_cache.stream_transactions().try_collect().await?; // Ensure at least one transaction exists assert!(!transactions.is_empty());