Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 33 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ name = "zenith-builder-example"
path = "bin/builder.rs"

[dependencies]
init4-bin-base = { version = "0.18.0-rc.13", features = ["perms", "aws", "pylon"] }
init4-bin-base = { version = "0.18.0", features = ["perms", "aws", "pylon", "sse"] }

signet-constants = { version = "0.16.0-rc.16" }
signet-sim = { version = "0.16.0-rc.16" }
signet-tx-cache = { version = "0.16.0-rc.16" }
signet-types = { version = "0.16.0-rc.16" }
signet-zenith = { version = "0.16.0-rc.16" }
signet-constants = { version = "0.16.0" }
signet-sim = { version = "0.16.0" }
signet-tx-cache = { version = "0.16.0" }
signet-types = { version = "0.16.0" }
signet-zenith = { version = "0.16.0" }
signet-block-processor = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.10" }
signet-genesis = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.10" }

Expand Down
2 changes: 1 addition & 1 deletion src/tasks/cache/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl CacheTasks {
/// [`CacheTask`], [`TxPoller`], and [`BundlePoller`] internally and yields their [`JoinHandle`]s.
pub fn spawn(&self) -> CacheSystem {
// Tx Poller pulls transactions from the cache
let tx_poller = TxPoller::new();
let tx_poller = TxPoller::new(self.block_env.clone());
let (tx_receiver, tx_poller) = tx_poller.spawn();

// Bundle Poller pulls bundles from the cache
Expand Down
174 changes: 111 additions & 63 deletions src/tasks/cache/tx.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,50 @@
//! Transaction service responsible for fetching and sending transactions to the simulator.
use crate::config::BuilderConfig;
use crate::{config::BuilderConfig, tasks::env::SimEnv};
use alloy::{
consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable},
providers::Provider,
};
use futures_util::{TryFutureExt, TryStreamExt};
use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt};
use init4_bin_base::deps::metrics::{counter, histogram};
use signet_tx_cache::{TxCache, TxCacheError};
use std::time::Duration;
use tokio::{sync::mpsc, task::JoinHandle, time};
use tracing::{Instrument, debug, debug_span, trace, trace_span};
use std::{pin::Pin, time::Duration};
use tokio::{
sync::{mpsc, watch},
task::JoinHandle,
time,
};
use tracing::{Instrument, debug, debug_span, trace, trace_span, warn};

/// Poll interval for the transaction poller in milliseconds.
const POLL_INTERVAL_MS: u64 = 1000;
type SseStream = Pin<Box<dyn Stream<Item = Result<TxEnvelope, TxCacheError>> + Send>>;

/// Implements a poller for the block builder to pull transactions from the
/// transaction pool.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct TxPoller {
/// Config values from the Builder.
config: &'static BuilderConfig,
/// Client for the tx cache.
tx_cache: TxCache,
/// Defines the interval at which the service should poll the cache.
poll_interval_ms: u64,
}

impl Default for TxPoller {
fn default() -> Self {
Self::new()
}
/// Receiver for block environment updates, used to trigger refetches.
envs: watch::Receiver<Option<SimEnv>>,
}

/// [`TxPoller`] implements a poller task that fetches transactions from the transaction pool
/// and sends them into the provided channel sender.
/// [`TxPoller`] fetches transactions from the transaction pool on startup
/// and on each block environment change, and subscribes to an SSE stream
/// for real-time delivery of new transactions in between.
impl TxPoller {
/// Returns a new [`TxPoller`] with the given config.
/// * Defaults to 1000ms poll interval (1s).
pub fn new() -> Self {
Self::new_with_poll_interval_ms(POLL_INTERVAL_MS)
}
const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1);
const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30);

/// Returns a new [`TxPoller`] with the given config and cache polling interval in milliseconds.
pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self {
/// Returns a new [`TxPoller`] with the given block environment receiver.
pub fn new(envs: watch::Receiver<Option<SimEnv>>) -> Self {
let config = crate::config();
let tx_cache = TxCache::new(config.tx_pool_url.clone());
Self { config, tx_cache, poll_interval_ms }
Self { config, tx_cache, envs }
}

/// Returns the poll duration as a [`Duration`].
const fn poll_duration(&self) -> Duration {
Duration::from_millis(self.poll_interval_ms)
}

// Spawn a tokio task to check the nonce of a transaction before sending
// it to the cachetask via the outbound channel.
/// Spawn a tokio task to check the nonce of a transaction before sending
/// it to the cachetask via the outbound channel.
fn spawn_check_nonce(&self, tx: TxEnvelope, outbound: mpsc::UnboundedSender<ReceivedTx>) {
tokio::spawn(async move {
let span = debug_span!("check_nonce", tx_id = %tx.tx_hash());
Expand Down Expand Up @@ -96,46 +86,104 @@ impl TxPoller {
});
}

/// Polls the transaction cache for transactions, paginating through all available pages.
pub async fn check_tx_cache(&self) -> Result<Vec<TxEnvelope>, TxCacheError> {
self.tx_cache.stream_transactions().try_collect().await
/// Fetches all transactions from the cache, forwarding each to nonce
/// checking before it reaches the cache task.
async fn full_fetch(&self, outbound: &mpsc::UnboundedSender<ReceivedTx>) {
let span = trace_span!("TxPoller::full_fetch", url = %self.config.tx_pool_url);

counter!("signet.builder.cache.tx_poll_count").increment(1);
if let Ok(transactions) = self
.tx_cache
.stream_transactions()
.try_collect::<Vec<_>>()
.inspect_err(|error| {
counter!("signet.builder.cache.tx_poll_errors").increment(1);
debug!(%error, "Error fetching transactions");
})
.instrument(span.clone())
.await
{
let _guard = span.entered();
histogram!("signet.builder.cache.txs_fetched").record(transactions.len() as f64);
trace!(count = transactions.len(), "found transactions");
for tx in transactions {
self.spawn_check_nonce(tx, outbound.clone());
}
}
}

async fn task_future(self, outbound: mpsc::UnboundedSender<ReceivedTx>) {
loop {
let span = trace_span!("TxPoller::loop", url = %self.config.tx_pool_url);

// Check this here to avoid making the web request if we know
// we don't need the results.
if outbound.is_closed() {
span.in_scope(|| trace!("No receivers left, shutting down"));
break;
/// Opens an SSE subscription to the transaction feed. Returns an empty
/// stream on connection failure so the caller can handle reconnection
/// uniformly.
async fn subscribe(&self) -> SseStream {
match self.tx_cache.subscribe_transactions().await {
Ok(stream) => {
debug!(url = %self.config.tx_pool_url, "SSE transaction subscription established");
Box::pin(stream)
}
Err(error) => {
warn!(%error, "Failed to open SSE transaction subscription");
Box::pin(futures_util::stream::empty())
}
}
}

counter!("signet.builder.cache.tx_poll_count").increment(1);
if let Ok(transactions) = self
.check_tx_cache()
.inspect_err(|error| {
counter!("signet.builder.cache.tx_poll_errors").increment(1);
debug!(%error, "Error fetching transactions");
})
.instrument(span.clone())
.await
{
let _guard = span.entered();
histogram!("signet.builder.cache.txs_fetched").record(transactions.len() as f64);
trace!(count = transactions.len(), "found transactions");
for tx in transactions.into_iter() {
/// Reconnects the SSE stream with backoff. Performs a full refetch to
/// cover any items missed while disconnected.
async fn reconnect(
&self,
outbound: &mpsc::UnboundedSender<ReceivedTx>,
backoff: &mut Duration,
) -> SseStream {
time::sleep(*backoff).await;
*backoff = (*backoff * 2).min(Self::MAX_RECONNECT_BACKOFF);
self.full_fetch(outbound).await;
self.subscribe().await
}

async fn task_future(mut self, outbound: mpsc::UnboundedSender<ReceivedTx>) {
// Initial full fetch of all transactions currently in the cache.
self.full_fetch(&outbound).await;

// Open the SSE stream for real-time delivery of new transactions.
let mut sse_stream = self.subscribe().await;
let mut backoff = Self::INITIAL_RECONNECT_BACKOFF;

loop {
tokio::select! {
item = sse_stream.next() => {
let Some(result) = item else {
warn!("SSE transaction stream ended, reconnecting");
sse_stream = self.reconnect(&outbound, &mut backoff).await;
continue;
};
let Ok(tx) = result else {
warn!(error = %result.unwrap_err(), "SSE transaction stream error, reconnecting");
sse_stream = self.reconnect(&outbound, &mut backoff).await;
continue;
};
backoff = Self::INITIAL_RECONNECT_BACKOFF;
if outbound.is_closed() {
trace!("No receivers left, shutting down");
break;
}
self.spawn_check_nonce(tx, outbound.clone());
}
res = self.envs.changed() => {
if res.is_err() {
debug!("Block env channel closed, shutting down");
break;
}
trace!("Block env changed, refetching all transactions");
self.full_fetch(&outbound).await;
}
}

time::sleep(self.poll_duration()).await;
}
}

/// Spawns a task that continuously polls the cache for transactions and sends any it finds to
/// its sender.
/// Spawns a task that fetches all current transactions, then subscribes
/// to the SSE feed for real-time updates, refetching on each new block
/// environment.
pub fn spawn(self) -> (mpsc::UnboundedReceiver<ReceivedTx>, JoinHandle<()>) {
let (outbound, inbound) = mpsc::unbounded_channel();
let jh = tokio::spawn(self.task_future(outbound));
Expand Down
Loading