diff --git a/Cargo.toml b/Cargo.toml index 82b5583..8d3a006 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ name = "init4-bin-base" description = "Internal utilities for binaries produced by the init4 team" keywords = ["init4", "bin", "base"] -version = "0.18.0-rc.13" +version = "0.18.0" edition = "2021" rust-version = "1.85" authors = ["init4", "James Prestwich", "evalir"] @@ -15,8 +15,8 @@ repository = "https://github.com/init4tech/bin-base" init4-from-env-derive = { version = "0.2.0", path = "from-env-derive" } # Signet -signet-constants = { version = "0.16.0-rc.11" } -signet-tx-cache = { version = "0.16.0-rc.11", optional = true } +signet-constants = { version = "0.16.0" } +signet-tx-cache = { version = "0.16.0", optional = true } # alloy alloy = { version = "1.0.35", optional = true, default-features = false, features = ["std", "signer-local", "consensus", "network"] } @@ -76,6 +76,7 @@ default = ["alloy", "rustls"] alloy = ["dep:alloy"] aws = ["alloy", "alloy?/signer-aws", "dep:async-trait", "dep:aws-config", "dep:aws-sdk-kms"] perms = ["dep:eyre", "dep:oauth2", "dep:tokio", "dep:reqwest", "dep:signet-tx-cache", "dep:futures-util"] +sse = ["perms", "signet-tx-cache/sse"] pylon = ["perms", "alloy/kzg"] block_watcher = ["dep:tokio"] rustls = ["dep:rustls", "rustls/aws-lc-rs"] diff --git a/src/perms/tx_cache.rs b/src/perms/tx_cache.rs index 0b57db1..b398f97 100644 --- a/src/perms/tx_cache.rs +++ b/src/perms/tx_cache.rs @@ -39,6 +39,8 @@ impl From for BuilderTxCacheError { } const BUNDLES: &str = "bundles"; +#[cfg(feature = "sse")] +const BUNDLES_FEED: &str = "bundles/feed"; /// A client for interacting with the transaction cache, a thin wrapper around /// the [`TxCache`] and [`SharedToken`] that implements the necessary methods @@ -188,3 +190,24 @@ impl BuilderTxCache { .map_err(Into::into) } } + +#[cfg(feature = "sse")] +impl BuilderTxCache { + /// Subscribe to real-time bundle events via SSE. + /// + /// Connects to the `/bundles/feed` endpoint with bearer auth and + /// returns a [`Stream`] that yields each [`CachedBundle`] as it + /// arrives. The stream terminates on the first error, which is + /// yielded as the final item. + #[instrument(skip_all)] + pub async fn subscribe_bundles( + &self, + ) -> Result> + Send> { + let secret = self.token.secret().await?; + let stream = self + .tx_cache + .subscribe_inner::(BUNDLES_FEED, Some(&secret)) + .await?; + Ok(stream.map(|r| r.map_err(Into::into))) + } +}