From 90763f40a28a9969bd1b84439f7fe44827b1a5a5 Mon Sep 17 00:00:00 2001 From: dylan Date: Wed, 1 Apr 2026 00:48:48 -0600 Subject: [PATCH] feat: fan out bundle submission to multiple MEV relay endpoints Replaces the current single FLASHBOTS_ENDPOINT with SUBMIT_ENDPOINTS, a comma-separated list of MEV relay/builder RPC URLs. The submit task sends each prepared bundle to all configured endpoints concurrently via join_all with a slot-defined deadline, so that a single slow or unresponsive reply doesn't delay the bundle broadcast. --- src/config.rs | 54 ++++++-- src/tasks/submit/flashbots.rs | 223 +++++++++++++++++++++------------- src/test_utils.rs | 2 +- 3 files changed, 181 insertions(+), 98 deletions(-) diff --git a/src/config.rs b/src/config.rs index 8d338f2f..5faf5b45 100644 --- a/src/config.rs +++ b/src/config.rs @@ -53,8 +53,8 @@ pub type HostProvider = FillProvider< RootProvider, >; -/// The provider type used to submit bundles to a Flashbots relay. -pub type FlashbotsProvider = FillProvider< +/// The provider type used to submit bundles to an MEV relay. +pub type RelayProvider = FillProvider< JoinFill< JoinFill< Identity, @@ -91,14 +91,18 @@ pub struct BuilderConfig { #[from_env(var = "TX_POOL_URL", desc = "URL of the tx pool to poll for incoming transactions")] pub tx_pool_url: url::Url, - /// Configuration for the Flashbots provider to submit - /// SignetBundles and Rollup blocks to the Host chain - /// as private MEV bundles via Flashbots. + /// Comma-separated list of MEV relay/builder endpoints for bundle + /// submission. The builder fans out each bundle to all endpoints + /// concurrently for maximum inclusion probability. + /// + /// At least one endpoint must be configured. Parsed from raw strings + /// into [`url::Url`] during [`sanitize`](Self::sanitize). #[from_env( - var = "FLASHBOTS_ENDPOINT", - desc = "Flashbots endpoint for privately submitting Signet bundles" + var = "SUBMIT_ENDPOINTS", + desc = "Comma-separated list of MEV relay/builder RPC endpoints for bundle submission (at least one required)", + infallible )] - pub flashbots_endpoint: url::Url, + pub submit_endpoints: Vec, /// URL for remote Quincey Sequencer server to sign blocks. /// NB: Disregarded if a sequencer_signer is configured. @@ -220,6 +224,19 @@ impl BuilderConfig { if !self.tx_pool_url.path().ends_with('/') { self.tx_pool_url.set_path(&format!("{}/", self.tx_pool_url.path())); } + + assert!( + !self.submit_endpoints.is_empty(), + "SUBMIT_ENDPOINTS must contain at least one URL" + ); + + // Validate that every submit endpoint is a parseable URL. Fail fast + // on startup rather than at first block submission. + for raw in &self.submit_endpoints { + url::Url::parse(raw) + .unwrap_or_else(|e| panic!("invalid URL in SUBMIT_ENDPOINTS \"{raw}\": {e}")); + } + self } @@ -274,11 +291,22 @@ impl BuilderConfig { .connect_provider(provider?)) } - /// Connect to a Flashbots bundle provider. - pub async fn connect_flashbots(&self) -> Result { - self.connect_builder_signer().await.map(|signer| { - ProviderBuilder::new().wallet(signer).connect_http(self.flashbots_endpoint.clone()) - }) + /// Connect to all configured MEV relay/builder endpoints. + /// + /// Returns one [`RelayProvider`] per URL in `submit_endpoints`. + /// URLs were already validated during [`sanitize`](Self::sanitize). + pub async fn connect_relays(&self) -> Result> { + let signer = self.connect_builder_signer().await?; + Ok(self + .submit_endpoints + .iter() + .map(|raw| { + let url: url::Url = raw.parse().expect("validated in sanitize"); + let provider = + ProviderBuilder::new().wallet(signer.clone()).connect_http(url.clone()); + (url, provider) + }) + .collect()) } /// Connect to the Zenith instance, using the specified provider. diff --git a/src/tasks/submit/flashbots.rs b/src/tasks/submit/flashbots.rs index d9a5d1bd..de52af43 100644 --- a/src/tasks/submit/flashbots.rs +++ b/src/tasks/submit/flashbots.rs @@ -1,7 +1,24 @@ -//! Flashbots Task receives simulated blocks from an upstream channel and -//! submits them to the Flashbots relay as bundles. +//! Submit Task receives simulated blocks from an upstream channel and +//! submits them to configured MEV relay/builder endpoints as bundles. +//! +//! # Metrics +//! +//! | Name | Type | Description | +//! |------|------|-------------| +//! | `signet.builder.submit.transactions_prepared` | counter | Signed rollup block transactions ready for submission | +//! | `signet.builder.submit.empty_blocks` | counter | Empty blocks skipped | +//! | `signet.builder.submit.bundle_prep_failures` | counter | Bundle preparation errors | +//! | `signet.builder.submit.relay_submissions` | counter | Per-relay submission attempts | +//! | `signet.builder.submit.relay_successes` | counter | Per-relay successful submissions | +//! | `signet.builder.submit.relay_failures` | counter | Per-relay failed submissions | +//! | `signet.builder.submit.all_relays_failed` | counter | No relay accepted the bundle | +//! | `signet.builder.submit.bundles_submitted` | counter | At least one relay accepted | +//! | `signet.builder.submit.deadline_met` | counter | Bundle submitted within slot deadline | +//! | `signet.builder.submit.deadline_missed` | counter | Bundle submitted after slot deadline | +//! | `signet.builder.pylon.submission_failures` | counter | Pylon sidecar submission errors | +//! | `signet.builder.pylon.sidecars_submitted` | counter | Successful Pylon sidecar submissions | use crate::{ - config::{BuilderConfig, FlashbotsProvider, HostProvider, PylonClient, ZenithInstance}, + config::{BuilderConfig, HostProvider, PylonClient, RelayProvider, ZenithInstance}, quincey::Quincey, tasks::{block::sim::SimResult, submit::SubmitPrep}, }; @@ -10,12 +27,18 @@ use alloy::{ rpc::types::mev::EthSendBundle, }; use init4_bin_base::{deps::metrics::counter, utils::signer::LocalOrAws}; -use std::time::{Duration, Instant}; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; use tokio::{sync::mpsc, task::JoinHandle}; use tracing::{Instrument, debug, debug_span, error, info, instrument, warn}; -/// Handles preparation and submission of simulated rollup blocks to the -/// Flashbots relay as MEV bundles. +/// Handles preparation and submission of simulated rollup blocks to +/// configured MEV relay/builder endpoints as bundles. +/// +/// Fans out each prepared bundle to all relays concurrently and submits +/// the blob sidecar to Pylon regardless of relay outcome. #[derive(Debug)] pub struct FlashbotsTask { /// Builder configuration for the task. @@ -24,9 +47,10 @@ pub struct FlashbotsTask { quincey: Quincey, /// Zenith instance. zenith: ZenithInstance, - /// Provides access to a Flashbots-compatible bundle API. - flashbots: FlashbotsProvider, - /// The key used to sign requests to the Flashbots relay. + /// MEV relay/builder providers for bundle fan-out. Wrapped in [`Arc`] + /// for cheap cloning into spawned submission tasks. + relays: Arc>, + /// The key used to sign requests to MEV relays. signer: LocalOrAws, /// Channel for sending hashes of outbound transactions. outbound: mpsc::UnboundedSender, @@ -36,21 +60,31 @@ pub struct FlashbotsTask { impl FlashbotsTask { /// Returns a new `FlashbotsTask` instance that receives `SimResult` types from the given - /// channel and handles their preparation, submission to the Flashbots network. + /// channel and handles their preparation and submission to MEV relay/builder endpoints. pub async fn new(outbound: mpsc::UnboundedSender) -> eyre::Result { let config = crate::config(); - let (quincey, host_provider, flashbots, builder_key) = tokio::try_join!( + let (quincey, host_provider, relays, builder_key) = tokio::try_join!( config.connect_quincey(), config.connect_host_provider(), - config.connect_flashbots(), + config.connect_relays(), config.connect_builder_signer() )?; + info!(n_relays = relays.len(), "connected to MEV relay/builder endpoints"); + let zenith = config.connect_zenith(host_provider); let pylon = config.connect_pylon(); - Ok(Self { config, quincey, zenith, flashbots, signer: builder_key, outbound, pylon }) + Ok(Self { + config, + quincey, + zenith, + relays: Arc::new(relays), + signer: builder_key, + outbound, + pylon, + }) } /// Prepares a MEV bundle from a simulation result. @@ -119,109 +153,144 @@ impl FlashbotsTask { } /// Tracks the outbound transaction hash and increments submission metrics. - /// - /// Sends the transaction hash to the outbound channel for monitoring. - /// Logs a debug message if the channel is closed. fn track_outbound_tx(&self, envelope: &TxEnvelope) { - counter!("signet.builder.flashbots.").increment(1); + counter!("signet.builder.submit.transactions_prepared").increment(1); let hash = *envelope.tx_hash(); if self.outbound.send(hash).is_err() { debug!("outbound channel closed, could not track tx hash"); } } - /// Main task loop that processes simulation results and submits bundles to Flashbots. + /// Main task loop that processes simulation results and submits bundles + /// to all configured MEV relay/builder endpoints. /// - /// Receives `SimResult`s from the inbound channel, prepares MEV bundles, and submits - /// them to the Flashbots relay. Skips empty blocks and continues processing on errors. + /// For each `SimResult`: + /// 1. Prepares the MEV bundle (once per block). + /// 2. Fans out `send_bundle` to all relays concurrently with a + /// deadline timeout. + /// 3. Submits the blob sidecar to Pylon unconditionally — even if all + /// relays fail or the deadline fires — so the sidecar is always + /// available for the host chain. async fn task_future(self, mut inbound: mpsc::UnboundedReceiver) { - debug!("starting flashbots task"); + debug!("starting submit task with {} relay(s)", self.relays.len()); loop { - // Wait for a sim result to come in let Some(sim_result) = inbound.recv().await else { - debug!("upstream task gone - exiting flashbots task"); + debug!("upstream task gone - exiting submit task"); break; }; let span = sim_result.clone_span(); - - // Calculate the submission deadline for this block let deadline = self.calculate_submit_deadline(); - // Don't submit empty blocks if sim_result.block.is_empty() { - counter!("signet.builder.flashbots.empty_block").increment(1); + counter!("signet.builder.submit.empty_blocks").increment(1); span_debug!(span, "received empty block - skipping"); continue; } - span_debug!(span, "flashbots task received block"); + span_debug!(span, "submit task received block"); - // Prepare a MEV bundle with the configured call type from the sim result let result = self.prepare(&sim_result).instrument(span.clone()).await; let bundle = match result { Ok(bundle) => bundle, Err(error) => { - counter!("signet.builder.flashbots.bundle_prep_failures").increment(1); + counter!("signet.builder.submit.bundle_prep_failures").increment(1); span_debug!(span, %error, "bundle preparation failed"); continue; } }; - // Due to the way the bundle is built, the block transaction is the last transaction in the bundle, and will always exist. - // We'll use this to forward the tx to pylon, which will preload the sidecar. + // The block transaction is always last in the bundle. let block_tx = bundle.txs.last().unwrap().clone(); - // Make a child span to cover submission, or use the current span - // if debug is not enabled. let _guard = span.enter(); - let submit_span = debug_span!("flashbots.submit",).or_current(); + let submit_span = debug_span!("submit.fan_out",).or_current(); - // Send the bundle to Flashbots, instrumenting the send future so - // all events inside the async send are attributed to the submit - // span. If Flashbots accepts it, submit the envelope to Pylon. - let flashbots = self.flashbots().to_owned(); + let relays = Arc::clone(&self.relays); let signer = self.signer.clone(); let pylon = self.pylon.clone(); tokio::spawn( async move { - let resp = match flashbots - .send_bundle(bundle) - .with_auth(signer.clone()) - .into_future() + let n_relays = relays.len(); + + // Build one future per relay + let futs: Vec<_> = relays + .iter() + .map(|(url, provider)| { + let bundle = bundle.clone(); + let signer = signer.clone(); + let url = url.clone(); + async move { + let result = provider + .send_bundle(bundle) + .with_auth(signer) + .into_future() + .await; + (url, result) + } + }) + .collect(); + + // Apply deadline timeout to the fan-out + let deadline_dur = deadline + .saturating_duration_since(Instant::now()) + .max(Duration::from_secs(1)); + + let (mut successes, mut failures) = (0u32, 0u32); + + match tokio::time::timeout(deadline_dur, futures_util::future::join_all(futs)) .await { - Ok(resp) => resp, - Err(err) => { - counter!("signet.builder.flashbots.submission_failures").increment(1); - if Instant::now() > deadline { - counter!("signet.builder.flashbots.deadline_missed").increment(1); - error!(%err, "MEV bundle submission failed AFTER deadline - error returned"); - } else { - error!(%err, "MEV bundle submission failed - error returned"); + Ok(relay_results) => { + for (url, result) in &relay_results { + let host = url.host_str().unwrap_or("unknown"); + counter!("signet.builder.submit.relay_submissions").increment(1); + match result { + Ok(_) => { + counter!("signet.builder.submit.relay_successes") + .increment(1); + debug!(relay = host, "bundle accepted"); + successes += 1; + } + Err(err) => { + counter!("signet.builder.submit.relay_failures") + .increment(1); + warn!(relay = host, %err, "bundle rejected"); + failures += 1; + } + } } - return; } - }; - - // Check if we met the submission deadline - counter!("signet.builder.flashbots.bundles_submitted").increment(1); - if Instant::now() > deadline { - counter!("signet.builder.flashbots.deadline_missed").increment(1); - warn!( - ?resp, - "Submitted MEV bundle to Flashbots AFTER deadline - submission may be too late" - ); - return; + Err(_) => { + counter!("signet.builder.submit.deadline_missed").increment(1); + warn!("relay fan-out timed out - some relays may not have responded"); + } } - counter!("signet.builder.flashbots.deadline_met").increment(1); - info!( - hash = resp.as_ref().map(|r| r.bundle_hash.to_string()), - "Submitted MEV bundle to Flashbots within deadline" - ); + if successes == 0 { + counter!("signet.builder.submit.all_relays_failed").increment(1); + error!( + failures, + n_relays, "all relay submissions failed - bundle may not land" + ); + } else { + counter!("signet.builder.submit.bundles_submitted").increment(1); + if Instant::now() > deadline { + counter!("signet.builder.submit.deadline_missed").increment(1); + warn!(successes, failures, "bundle submitted to relays AFTER deadline"); + } else { + counter!("signet.builder.submit.deadline_met").increment(1); + info!( + successes, + failures, n_relays, "bundle submitted to relays within deadline" + ); + } + } + // Always submit sidecar to Pylon, regardless of relay + // outcome. The sidecar must be available on the host chain + // even if relay submission failed or timed out. if let Err(err) = pylon.post_blob_tx(block_tx).await { counter!("signet.builder.pylon.submission_failures").increment(1); warn!(%err, "pylon submission failed"); @@ -241,25 +310,16 @@ impl FlashbotsTask { /// The deadline is calculated as the time remaining in the current slot, /// minus the configured submit deadline buffer. Submissions completing /// after this deadline will be logged as warnings. - /// - /// # Returns - /// - /// An `Instant` representing the submission deadline. fn calculate_submit_deadline(&self) -> Instant { let slot_calculator = &self.config.slot_calculator; - - // Get the current number of milliseconds into the slot. let timepoint_ms = slot_calculator.current_point_within_slot_ms().expect("host chain has started"); - let slot_duration = slot_calculator.slot_duration() * 1000; // convert to milliseconds + let slot_duration = slot_calculator.slot_duration() * 1000; let submit_buffer = self.config.submit_deadline_buffer.into_inner(); - // To find the remaining slot time, subtract the timepoint from the slot duration. - // Then subtract the submit deadline buffer to give us margin before slot ends. let remaining = slot_duration.saturating_sub(timepoint_ms).saturating_sub(submit_buffer); - // The deadline is calculated by adding the remaining time to the current instant. let deadline = Instant::now() + Duration::from_millis(remaining); deadline.max(Instant::now()) } @@ -269,12 +329,7 @@ impl FlashbotsTask { self.zenith.provider().clone() } - /// Returns a reference to the Flashbots provider. - const fn flashbots(&self) -> &FlashbotsProvider { - &self.flashbots - } - - /// Spawns the Flashbots task in a new Tokio task. + /// Spawns the submit task in a new Tokio task. /// /// Returns a channel sender for submitting `SimResult`s and a join handle for the task. pub fn spawn(self) -> (mpsc::UnboundedSender, JoinHandle<()>) { diff --git a/src/test_utils.rs b/src/test_utils.rs index 733d6e32..1a2c9e53 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -37,7 +37,7 @@ pub fn setup_test_config() -> &'static BuilderConfig { .unwrap() .try_into() .unwrap(), - flashbots_endpoint: "https://relay-sepolia.flashbots.net:443".parse().unwrap(), + submit_endpoints: vec!["https://relay-sepolia.flashbots.net:443".to_string()], quincey_url: "http://localhost:8080".into(), sequencer_key: None, builder_key: env::var("SEPOLIA_ETH_PRIV_KEY")