diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
new file mode 100644
index 0000000..6d39d91
--- /dev/null
+++ b/.github/workflows/test.yml
@@ -0,0 +1,45 @@
+on:
+  push:
+    branches:
+      - master
+  pull_request:
+    branches:
+      - master
+
+env:
+  CARGO_TERM_COLOR: always
+
+jobs:
+  test:
+    runs-on: ubuntu-24.04
+    steps:
+      - uses: actions/checkout@v6
+
+      - name: Install Rust
+        uses: actions-rust-lang/setup-rust-toolchain@v1
+
+      - name: Install build requirements
+        run: |
+          sudo apt-get update
+          sudo apt-get install -y \
+            libcurl4-openssl-dev \
+            gnupg \
+            libudev-dev \
+            libsasl2-dev \
+            libssl-dev \
+            libzstd-dev
+          sudo apt-get satisfy -f -y "protobuf-compiler (>=3.15)"
+
+      - uses: Swatinem/rust-cache@v2
+
+      - name: cargo fmt
+        run: cargo fmt --all -- --check
+
+      - name: cargo clippy
+        run: cargo clippy --workspace --all-targets --no-deps -- -D warnings
+
+      - name: cargo build
+        run: cargo build --workspace
+
+      - name: cargo test
+        run: cargo test --workspace -- --test-threads=16
diff --git a/Makefile b/Makefile
index 8106757..9911f0d 100644
--- a/Makefile
+++ b/Makefile
@@ -21,7 +21,7 @@ CLIENT_REST ?= http://127.0.0.1:3030
 help:
 	@echo "Available targets:"
 	@echo "  build        - Build all Rust workspace packages"
-	@echo "  check        - Build the whole workspace"
+	@echo "  check        - Run workspace fmt, clippy, build, and test"
 	@echo "  kafka-up     - Start the Kafka/ksqlDB stack"
 	@echo "  kafka-down   - Stop and remove the Kafka/ksqlDB stack"
 	@echo "  kafka-ready  - Start the stack and initialize stream/table/schema"
@@ -41,7 +41,10 @@ build:
 	cargo build -p solana-accountsdb-plugin-kafka
 
 check:
+	cargo fmt --all -- --check
+	cargo clippy --workspace --all-targets --no-deps -- -D warnings
 	cargo build --workspace
+	cargo test --workspace -- --test-threads=16
 
 kafka-up:
 	$(MAKE) -C kafka-setup up
diff --git a/README.md b/README.md
index df6a397..90f8067 100644
--- a/README.md
+++ b/README.md
@@ -15,6 +15,18 @@ This repo contains the MagicBlock account update pipeline:
 - `kafka-setup/`: minimal Kafka/ksqlDB local environment
 - `Makefile`: top-level operator entrypoint
 
+## CI Contract
+
+The repository-level CI in `.github/workflows/test.yml` runs the same
+workspace checks on every push and pull request targeting `master`:
+
+```sh
+cargo fmt --all -- --check
+cargo clippy --workspace --all-targets --no-deps -- -D warnings
+cargo build --workspace
+cargo test --workspace -- --test-threads=16
+```
+
 ## Common Root Workflows
 
 - `make build`
diff --git a/event-proto/build.rs b/event-proto/build.rs
index 456c41d..a2c3106 100644
--- a/event-proto/build.rs
+++ b/event-proto/build.rs
@@ -3,7 +3,9 @@ fn main() {
     println!("cargo:rerun-if-changed=proto/event.proto");
 
     let mut config = prost_build::Config::new();
-    config.boxed(".blockdaemon.solana.accountsdb_plugin_kafka.types.MessageWrapper");
+    config.boxed(
+        ".blockdaemon.solana.accountsdb_plugin_kafka.types.MessageWrapper",
+    );
     config.protoc_arg("--experimental_allow_proto3_optional");
     config
         .compile_protos(&["proto/event.proto"], &["proto/"])
diff --git a/geyser-plugin/.github/dependabot.yml b/geyser-plugin/.github/dependabot.yml
deleted file mode 100644
index 334bc82..0000000
--- a/geyser-plugin/.github/dependabot.yml
+++ /dev/null
@@ -1,66 +0,0 @@
-version: 2
-updates:
-  - package-ecosystem: "github-actions"
-    directory: "/"
-    schedule:
-      interval: "daily"
-
-  - package-ecosystem: "github-actions"
-    directory: "/"
-    schedule:
-      interval: "daily"
-    target-branch: "v3.1"
-
-  # Monitor main branch for development dependencies (allows 3.x upgrades)
-  - package-ecosystem: "cargo"
-    directory: "/"
-    schedule:
-      interval: "daily"
-    # Main branch can accept all updates including major versions
-    groups:
-      # Agave + Solana stack in one PR
-      agave-solana-stack:
-        patterns:
-          - "agave-*"
-          - "solana-transaction-*"
-      other-updates:
-        exclude-patterns:
-          - "agave-*"
-          - "solana-transaction-*"
-
-  # Monitor v3.1 branch for production dependencies (restricts to 3.x)
-  - package-ecosystem: "cargo"
-    directory: "/"
-    schedule:
-      interval: "daily"
-    target-branch: "v3.1"
-    # Allow updates within 3.x range, prevent 4.x upgrades
-    allow:
-      # Allow updates within major version 3.x for Agave ecosystem
-      - dependency-name: "agave-geyser-plugin-interface"
-      # Allow updates within major version 3.x for Solana ecosystem
-      - dependency-name: "solana-*"
-      # Allow all other dependency updates
-      - dependency-name: "*"
-    # Prevent major version upgrades that could break compatibility
-    ignore:
-      # Block Agave 4.x upgrades (would break ABI compatibility)
-      - dependency-name: "agave-geyser-plugin-interface"
-        versions: [">=4.0.0"]
-      # Block Solana 4.x upgrades (would break compatibility)
-      - dependency-name: "solana-*"
-        versions: [">=4.0.0"]
-      # Pinned for Rust 1.86 on v3.1; vergen 9.1.0+ and time 0.3.45+ require rustc 1.88
-      #- dependency-name: "vergen"
-      #  versions: [">=9.0.7"]
-      #- dependency-name: "time"
-      #  versions: [">=0.3.45"]
-    groups:
-      agave-solana-stack:
-        patterns:
-          - "agave-*"
-          - "solana-transaction-*"
-      other-updates:
-        exclude-patterns:
-          - "agave-*"
-          - "solana-transaction-*"
diff --git a/geyser-plugin/.github/workflows/release.yml b/geyser-plugin/.github/workflows/release.yml
deleted file mode 100644
index 9f50ae1..0000000
--- a/geyser-plugin/.github/workflows/release.yml
+++ /dev/null
@@ -1,62 +0,0 @@
-# yaml-language-server: $schema=https://json.schemastore.org/github-workflow.json
-on:
-  push:
-    tags:
-      - 'v*'
-  pull_request:
-    paths:
-      - '.github/workflows/release.yml'
-
-env:
-  CARGO_TERM_COLOR: always
-
-jobs:
-  release:
-    runs-on: ubuntu-24.04
-    steps:
-      - uses: actions/checkout@v6
-
-      - name: Install Rust
-        id: rust
-        uses: actions-rust-lang/setup-rust-toolchain@v1
-
-      - name: Capture Rust version
-        id: rust-version
-        run: echo "version=$(rustc --version | awk '{print $2}')" >> "$GITHUB_OUTPUT"
-
-      - name: Install build requirements
-        if: runner.os == 'Linux'
-        run: |
-          sudo apt-get update
-          sudo apt-get install -y \
-            gnupg \
-            libudev-dev \
-            libsasl2-dev \
-            libssl-dev \
-            libzstd-dev
-          sudo apt-get satisfy -f -y "protobuf-compiler (>=3.15)"
-
-      - name: Check Solana version
-        id: solana-version
-        run: |
-          echo "CI_TAG=${GITHUB_REF#refs/*/}" >> "$GITHUB_ENV"
-          echo "CI_OS_NAME=linux" >> "$GITHUB_ENV"
-
-          SOLANA_VERSION="$(./ci/solana-version.sh)"
-          SOLANA_VERSION="v${SOLANA_VERSION#=}"
-          echo "solana_version=$SOLANA_VERSION" >> "$GITHUB_OUTPUT"
-          echo "SOLANA_VERSION=$SOLANA_VERSION" >> "$GITHUB_ENV"
-
-      - name: Build release tarball
-        run: ./ci/create-tarball.sh
-
-      - name: Release
-        uses: softprops/action-gh-release@v2
-        if: startsWith(github.ref, 'refs/tags/')
-        with:
-          body: |
-            solana-accountsdb-plugin-kafka ${{ github.ref_name }}
-            solana ${{ steps.solana-version.outputs.solana_version }}
-            rust ${{ steps.rust-version.outputs.version }}
-          files: |
-            solana-accountsdb-plugin-kafka-release*
diff --git a/geyser-plugin/.github/workflows/test.yml b/geyser-plugin/.github/workflows/test.yml
deleted file mode 100644
index 67811c5..0000000
--- a/geyser-plugin/.github/workflows/test.yml
+++ /dev/null
@@ -1,55 +0,0 @@
-# Source:
-# https://github.com/solana-labs/solana-accountsdb-plugin-postgres/blob/master/.github/workflows/test.yml
-
-on:
-  push:
-  pull_request:
-
-env:
-  CARGO_TERM_COLOR: always
-
-jobs:
-  test:
-    runs-on: ubuntu-24.04
-    steps:
-      - uses: actions/checkout@v6
-
-      - name: Install Rust
-        uses: actions-rust-lang/setup-rust-toolchain@v1
-
-      - name: Install build requirements
-        if: runner.os == 'Linux'
-        run: |
-          sudo apt-get update
-          sudo apt-get install -y \
-            gnupg \
-            libudev-dev \
-            libsasl2-dev \
-            libssl-dev \
-            libzstd-dev
-          sudo apt-get satisfy -f -y "protobuf-compiler (>=3.15)"
-
-      - name: Install cargo-cache
-        # Run without --locked to avoid yanked transitive deps (e.g. crossbeam-channel 0.5.6)
-        run: cargo install cargo-cache
-
-      - uses: actions/cache@v5
-        with:
-          path: |
-            ~/.cargo/registry
-            ~/.cargo/git
-          key: ${{ runner.os }}-cargo-build-${{ hashFiles('**/Cargo.lock') }}
-          restore-keys: |
-            ${{ runner.os }}-cargo-build-
-
-      - name: Clean cargo cache
-        run: cargo cache --autoclean
-
-      - name: cargo fmt
-        run: cargo fmt --all -- --check
-
-      - name: cargo clippy
-        run: cargo clippy --workspace --all-targets -- --deny=warnings
-
-      - name: Build
-        run: ./ci/cargo-build-test.sh
diff --git a/geyser-plugin/src/account_update_publisher.rs b/geyser-plugin/src/account_update_publisher.rs
index b06a443..d82ae13 100644
--- a/geyser-plugin/src/account_update_publisher.rs
+++ b/geyser-plugin/src/account_update_publisher.rs
@@ -1,7 +1,8 @@
 use {
     crate::{
-        initial_account_backfill::InitialAccountBackfillHandle, publisher::Publisher,
-        server::subscriptions::AccountSubscriptions, wire::UpdateAccountEvent,
+        initial_account_backfill::InitialAccountBackfillHandle,
+        publisher::Publisher, server::subscriptions::AccountSubscriptions,
+        wire::UpdateAccountEvent,
     },
     agave_geyser_plugin_interface::geyser_plugin_interface::{
         GeyserPluginError as PluginError, Result as PluginResult,
@@ -50,7 +51,8 @@ pub fn publish_backfill_account_update(
     live_update_seen: bool,
     event: UpdateAccountEvent,
 ) -> PluginResult<AccountUpdatePublishOutcome> {
-    let decision = should_publish_backfill_account(subs, pubkey, live_update_seen);
+    let decision =
+        should_publish_backfill_account(subs, pubkey, live_update_seen);
     match decision {
         AccountUpdatePublishOutcome::Published => {
             publish_raw_account_update(publisher, topic, event)?;
@@ -117,7 +119,10 @@ fn should_publish_backfill_account(
     AccountUpdatePublishOutcome::Published
 }
 
-fn should_publish_subscribed_account(subs: &AccountSubscriptions, pubkey: &[u8]) -> bool {
+fn should_publish_subscribed_account(
+    subs: &AccountSubscriptions,
+    pubkey: &[u8],
+) -> bool {
     match <&[u8; 32]>::try_from(pubkey) {
         Ok(key) => subs.contains_sync(key),
         Err(_) => false,
@@ -145,7 +150,9 @@ mod tests {
         AccountUpdatePublishOutcome, should_publish_backfill_account,
         should_publish_confirmed_account,
     };
-    use crate::{server::subscriptions::AccountSubscriptions, wire::UpdateAccountEvent};
+    use crate::{
+        server::subscriptions::AccountSubscriptions, wire::UpdateAccountEvent,
+    };
 
     fn sample_event(is_startup: bool) -> UpdateAccountEvent {
         UpdateAccountEvent {
@@ -167,7 +174,10 @@ mod tests {
     #[test]
     fn test_confirmed_startup_replay_updates_are_suppressed() {
         assert!(matches!(
-            should_publish_confirmed_account(&AccountSubscriptions::new(), &sample_event(true)),
+            should_publish_confirmed_account(
+                &AccountSubscriptions::new(),
+                &sample_event(true)
+            ),
             AccountUpdatePublishOutcome::SkippedStartupReplay
         ));
     }
 
@@ -175,7 +185,11 @@ mod tests {
     #[test]
     fn test_backfill_snapshots_are_suppressed_after_live_updates() {
         assert!(matches!(
-
should_publish_backfill_account(&AccountSubscriptions::new(), [7; 32], true), + should_publish_backfill_account( + &AccountSubscriptions::new(), + [7; 32], + true + ), AccountUpdatePublishOutcome::SkippedLiveUpdateWon )); } diff --git a/geyser-plugin/src/config.rs b/geyser-plugin/src/config.rs index 3e3098c..9582ba2 100644 --- a/geyser-plugin/src/config.rs +++ b/geyser-plugin/src/config.rs @@ -18,7 +18,9 @@ use { }, reqwest::Url, serde::Deserialize, - std::{collections::BTreeMap, fs::File, io::Read, net::SocketAddr, path::Path}, + std::{ + collections::BTreeMap, fs::File, io::Read, net::SocketAddr, path::Path, + }, }; /// Plugin config. @@ -114,8 +116,9 @@ impl Config { let mut file = File::open(config_path)?; let mut contents = String::new(); file.read_to_string(&mut contents)?; - let mut this: Self = toml::from_str(&contents) - .map_err(|e| GeyserPluginError::ConfigFileReadError { msg: e.to_string() })?; + let mut this: Self = toml::from_str(&contents).map_err(|e| { + GeyserPluginError::ConfigFileReadError { msg: e.to_string() } + })?; this.fill_defaults(); this.validate()?; Ok(this) @@ -137,7 +140,8 @@ impl Config { fn validate(&self) -> PluginResult<()> { if self.kafka.bootstrap_servers.trim().is_empty() { return Err(GeyserPluginError::ConfigFileReadError { - msg: "missing required config field `kafka.bootstrap_servers`".to_owned(), + msg: "missing required config field `kafka.bootstrap_servers`" + .to_owned(), }); } @@ -149,7 +153,8 @@ impl Config { if self.plugin.local_rpc_url.trim().is_empty() { return Err(GeyserPluginError::ConfigFileReadError { - msg: "missing required config field `plugin.local_rpc_url`".to_owned(), + msg: "missing required config field `plugin.local_rpc_url`" + .to_owned(), }); } @@ -163,14 +168,17 @@ impl Config { let trimmed = url.trim(); if trimmed.is_empty() { return Err(GeyserPluginError::ConfigFileReadError { - msg: "invalid config field `ksql.url`: URL must not be empty".to_owned(), + msg: + "invalid config field `ksql.url`: URL must not be empty" + .to_owned(), }); } - let parsed = - Url::parse(trimmed).map_err(|error| GeyserPluginError::ConfigFileReadError { + let parsed = Url::parse(trimmed).map_err(|error| { + GeyserPluginError::ConfigFileReadError { msg: format!("invalid config field `ksql.url`: {error}"), - })?; + } + })?; match parsed.scheme() { "http" | "https" => {} @@ -185,7 +193,8 @@ impl Config { if !parsed.has_host() { return Err(GeyserPluginError::ConfigFileReadError { - msg: "invalid config field `ksql.url`: host is required".to_owned(), + msg: "invalid config field `ksql.url`: host is required" + .to_owned(), }); } @@ -205,7 +214,8 @@ mod tests { use super::Config; fn parse_config(toml: &str) -> Result { - let mut config: Config = toml::from_str(toml).map_err(|error| error.to_string())?; + let mut config: Config = + toml::from_str(toml).map_err(|error| error.to_string())?; config.fill_defaults(); config.validate().map_err(|error| format!("{error:?}"))?; Ok(config) @@ -287,7 +297,9 @@ admin = "127.0.0.1:8080" ) .unwrap_err(); - assert!(error.contains("missing required config field `kafka.bootstrap_servers`")); + assert!(error.contains( + "missing required config field `kafka.bootstrap_servers`" + )); } #[test] @@ -461,7 +473,9 @@ admin = "127.0.0.1:8080" ) .unwrap_err(); - assert!(error.contains("empty host") || error.contains("host is required")); + assert!( + error.contains("empty host") || error.contains("host is required") + ); } #[test] diff --git a/geyser-plugin/src/confirmation_buffer.rs b/geyser-plugin/src/confirmation_buffer.rs 
index 3bd8652..0fd9684 100644 --- a/geyser-plugin/src/confirmation_buffer.rs +++ b/geyser-plugin/src/confirmation_buffer.rs @@ -104,7 +104,9 @@ impl ConfirmedAccounts { ) -> SlotTransitionResult { self.update_highest_observed_slot(slot); - if self.dead_slots.contains(&slot) && !matches!(status, InternalSlotStatus::Dead) { + if self.dead_slots.contains(&slot) + && !matches!(status, InternalSlotStatus::Dead) + { return SlotTransitionResult::default(); } @@ -273,7 +275,8 @@ impl ConfirmedAccounts { return Vec::new(); } - let cutoff = self.highest_observed_slot - STALE_UNCONFIRMED_SLOT_RETENTION; + let cutoff = + self.highest_observed_slot - STALE_UNCONFIRMED_SLOT_RETENTION; let victims: Vec = self .slots .iter() @@ -358,7 +361,11 @@ mod tests { slots } - fn account_event(slot: u64, pubkey_byte: u8, write_version: u64) -> UpdateAccountEvent { + fn account_event( + slot: u64, + pubkey_byte: u8, + write_version: u64, + ) -> UpdateAccountEvent { UpdateAccountEvent { slot, pubkey: vec![pubkey_byte; 32], @@ -383,11 +390,17 @@ mod tests { confirmed.record_account(account_event(10, 1, 2)); assert_eq!(confirmed.pending_count_for(10), 1); - let result = confirmed.record_slot_status(10, None, InternalSlotStatus::Confirmed); + let result = confirmed.record_slot_status( + 10, + None, + InternalSlotStatus::Confirmed, + ); assert_eq!(result.newly_confirmed_slots, vec![10]); assert_eq!(result.confirmed_updates.len(), 1); assert_eq!( - sorted_slots(result.confirmed_updates.iter().map(|event| event.slot)), + sorted_slots( + result.confirmed_updates.iter().map(|event| event.slot) + ), vec![10] ); assert!(result.dead_slots_cleaned.is_empty()); @@ -402,13 +415,23 @@ mod tests { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(11, 2, 1)); - let first = confirmed.record_slot_status(11, None, InternalSlotStatus::Confirmed); - let second = confirmed.record_slot_status(11, None, InternalSlotStatus::Confirmed); + let first = confirmed.record_slot_status( + 11, + None, + InternalSlotStatus::Confirmed, + ); + let second = confirmed.record_slot_status( + 11, + None, + InternalSlotStatus::Confirmed, + ); assert_eq!(first.newly_confirmed_slots, vec![11]); assert_eq!(first.confirmed_updates.len(), 1); assert_eq!( - sorted_slots(first.confirmed_updates.iter().map(|event| event.slot)), + sorted_slots( + first.confirmed_updates.iter().map(|event| event.slot) + ), vec![11] ); assert!(first.dead_slots_cleaned.is_empty()); @@ -427,12 +450,15 @@ mod tests { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(12, 3, 1)); - let result = confirmed.record_slot_status(12, None, InternalSlotStatus::Rooted); + let result = + confirmed.record_slot_status(12, None, InternalSlotStatus::Rooted); assert_eq!(result.newly_confirmed_slots, vec![12]); assert_eq!(result.confirmed_updates.len(), 1); assert_eq!( - sorted_slots(result.confirmed_updates.iter().map(|event| event.slot)), + sorted_slots( + result.confirmed_updates.iter().map(|event| event.slot) + ), vec![12] ); assert!(result.dead_slots_cleaned.is_empty()); @@ -447,15 +473,29 @@ mod tests { confirmed.record_account(account_event(20, 4, 1)); confirmed.record_account(account_event(21, 5, 1)); confirmed.record_account(account_event(22, 6, 1)); - confirmed.record_slot_status(21, Some(20), InternalSlotStatus::Processed); - confirmed.record_slot_status(22, Some(21), InternalSlotStatus::Processed); + confirmed.record_slot_status( + 21, + Some(20), + InternalSlotStatus::Processed, + ); + confirmed.record_slot_status( + 22, + 
Some(21), + InternalSlotStatus::Processed, + ); - let result = confirmed.record_slot_status(22, None, InternalSlotStatus::Confirmed); + let result = confirmed.record_slot_status( + 22, + None, + InternalSlotStatus::Confirmed, + ); assert_eq!(result.newly_confirmed_slots, vec![22, 21, 20]); assert_eq!(result.confirmed_updates.len(), 3); assert_eq!( - sorted_slots(result.confirmed_updates.iter().map(|event| event.slot)), + sorted_slots( + result.confirmed_updates.iter().map(|event| event.slot) + ), vec![20, 21, 22] ); assert!(result.dead_slots_cleaned.is_empty()); @@ -473,12 +513,18 @@ mod tests { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(30, 7, 1)); - let result = confirmed.record_slot_status(30, None, InternalSlotStatus::Confirmed); + let result = confirmed.record_slot_status( + 30, + None, + InternalSlotStatus::Confirmed, + ); assert_eq!(result.newly_confirmed_slots, vec![30]); assert_eq!(result.confirmed_updates.len(), 1); assert_eq!( - sorted_slots(result.confirmed_updates.iter().map(|event| event.slot)), + sorted_slots( + result.confirmed_updates.iter().map(|event| event.slot) + ), vec![30] ); assert!(result.dead_slots_cleaned.is_empty()); @@ -492,14 +538,24 @@ mod tests { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(40, 8, 1)); confirmed.record_account(account_event(41, 9, 1)); - confirmed.record_slot_status(41, Some(40), InternalSlotStatus::Processed); + confirmed.record_slot_status( + 41, + Some(40), + InternalSlotStatus::Processed, + ); - let result = confirmed.record_slot_status(41, None, InternalSlotStatus::Confirmed); + let result = confirmed.record_slot_status( + 41, + None, + InternalSlotStatus::Confirmed, + ); assert_eq!(result.newly_confirmed_slots, vec![41, 40]); assert_eq!(result.confirmed_updates.len(), 2); assert_eq!( - sorted_slots(result.confirmed_updates.iter().map(|event| event.slot)), + sorted_slots( + result.confirmed_updates.iter().map(|event| event.slot) + ), vec![40, 41] ); assert!(result.dead_slots_cleaned.is_empty()); @@ -515,7 +571,8 @@ mod tests { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(50, 10, 1)); - let result = confirmed.record_slot_status(50, None, InternalSlotStatus::Dead); + let result = + confirmed.record_slot_status(50, None, InternalSlotStatus::Dead); assert_eq!(result.dead_slots_cleaned, vec![50]); assert!(result.confirmed_updates.is_empty()); @@ -531,9 +588,14 @@ mod tests { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(60, 11, 1)); confirmed.record_account(account_event(61, 12, 1)); - confirmed.record_slot_status(61, Some(60), InternalSlotStatus::Processed); + confirmed.record_slot_status( + 61, + Some(60), + InternalSlotStatus::Processed, + ); - let result = confirmed.record_slot_status(60, None, InternalSlotStatus::Dead); + let result = + confirmed.record_slot_status(60, None, InternalSlotStatus::Dead); assert_eq!(sorted_slots(result.dead_slots_cleaned), vec![60, 61]); assert!(result.confirmed_updates.is_empty()); @@ -553,7 +615,11 @@ mod tests { confirmed.record_account(account_event(70, 13, 1)); confirmed.record_slot_status(70, None, InternalSlotStatus::Dead); - let result = confirmed.record_slot_status(70, None, InternalSlotStatus::Confirmed); + let result = confirmed.record_slot_status( + 70, + None, + InternalSlotStatus::Confirmed, + ); assert!(result.confirmed_updates.is_empty()); assert!(result.newly_confirmed_slots.is_empty()); @@ -569,8 +635,13 @@ mod tests { let mut 
confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(80, 14, 1)); - let _ = confirmed.record_slot_status(80, None, InternalSlotStatus::Confirmed); - let result = confirmed.record_slot_status(80, None, InternalSlotStatus::Rooted); + let _ = confirmed.record_slot_status( + 80, + None, + InternalSlotStatus::Confirmed, + ); + let result = + confirmed.record_slot_status(80, None, InternalSlotStatus::Rooted); assert!(result.confirmed_updates.is_empty()); assert!(result.newly_confirmed_slots.is_empty()); @@ -604,7 +675,11 @@ mod tests { fn test_stale_fallback_does_not_evict_confirmed_slots() { let mut confirmed = ConfirmedAccounts::new(); confirmed.record_account(account_event(2, 16, 1)); - let _ = confirmed.record_slot_status(2, None, InternalSlotStatus::Confirmed); + let _ = confirmed.record_slot_status( + 2, + None, + InternalSlotStatus::Confirmed, + ); let result = confirmed.record_slot_status( STALE_UNCONFIRMED_SLOT_RETENTION + 10, diff --git a/geyser-plugin/src/initial_account_backfill/mod.rs b/geyser-plugin/src/initial_account_backfill/mod.rs index 7d4751a..888abb0 100644 --- a/geyser-plugin/src/initial_account_backfill/mod.rs +++ b/geyser-plugin/src/initial_account_backfill/mod.rs @@ -1,9 +1,13 @@ use { crate::{ - account_update_publisher::{AccountUpdatePublishOutcome, publish_backfill_account_update}, + account_update_publisher::{ + AccountUpdatePublishOutcome, publish_backfill_account_update, + }, metrics::{ - INITIAL_BACKFILL_IN_FLIGHT, INITIAL_BACKFILL_PUBKEYS_ENQUEUED_TOTAL, - INITIAL_BACKFILL_REQUESTS_ENQUEUED_TOTAL, INITIAL_BACKFILL_RPC_FAILURES_TOTAL, + INITIAL_BACKFILL_IN_FLIGHT, + INITIAL_BACKFILL_PUBKEYS_ENQUEUED_TOTAL, + INITIAL_BACKFILL_REQUESTS_ENQUEUED_TOTAL, + INITIAL_BACKFILL_RPC_FAILURES_TOTAL, INITIAL_BACKFILL_SNAPSHOTS_TOTAL, }, publisher::Publisher, @@ -50,7 +54,10 @@ impl InitialAccountBackfill { publisher: Some(publisher), update_account_topic, subscriptions, - client: RpcClient::new_with_commitment(local_rpc_url, CommitmentConfig::confirmed()), + client: RpcClient::new_with_commitment( + local_rpc_url, + CommitmentConfig::confirmed(), + ), }); let handle = InitialAccountBackfillHandle { inner: inner.clone(), @@ -85,7 +92,10 @@ impl InitialAccountBackfill { publisher: None, update_account_topic: Arc::new(String::new()), subscriptions: AccountSubscriptions::new(), - client: RpcClient::new_with_commitment(String::new(), CommitmentConfig::confirmed()), + client: RpcClient::new_with_commitment( + String::new(), + CommitmentConfig::confirmed(), + ), }); Self { handle: InitialAccountBackfillHandle { inner }, @@ -137,7 +147,8 @@ impl InitialAccountBackfillHandle { INITIAL_BACKFILL_PUBKEYS_ENQUEUED_TOTAL .with_label_values(&["accepted"]) .inc_by(request_pubkey_count as u64); - INITIAL_BACKFILL_IN_FLIGHT.set(self.inner.in_flight.len() as i64); + INITIAL_BACKFILL_IN_FLIGHT + .set(self.inner.in_flight.len() as i64); info!( "Enqueued initial account backfill request for {} pubkeys, in_flight={}", request_pubkey_count, @@ -247,7 +258,8 @@ struct InitialAccountBackfillInner { impl InitialAccountBackfillInner { async fn process_request(&self, pubkeys: &[[u8; 32]]) { - match rpc::fetch_account_events_for_request(&self.client, pubkeys).await { + match rpc::fetch_account_events_for_request(&self.client, pubkeys).await + { Ok(events) => { info!( "Initial account backfill RPC request succeeded for {} pubkeys", @@ -278,7 +290,9 @@ impl InitialAccountBackfillInner { INITIAL_BACKFILL_SNAPSHOTS_TOTAL .with_label_values(&["publish_failed"]) .inc(); - error!("Failed 
to publish initial account backfill snapshot: {error:?}"); + error!( + "Failed to publish initial account backfill snapshot: {error:?}" + ); } } } @@ -293,7 +307,8 @@ impl InitialAccountBackfillInner { fn complete_backfill_event( &self, event: UpdateAccountEvent, - ) -> agave_geyser_plugin_interface::geyser_plugin_interface::Result<()> { + ) -> agave_geyser_plugin_interface::geyser_plugin_interface::Result<()> + { let pubkey = match <[u8; 32]>::try_from(event.pubkey.as_slice()) { Ok(pubkey) => pubkey, Err(_) => { @@ -325,9 +340,15 @@ impl InitialAccountBackfillInner { )?; Self::record_snapshot(match outcome { AccountUpdatePublishOutcome::Published => "published", - AccountUpdatePublishOutcome::SkippedStartupReplay => "skipped_startup_replay", - AccountUpdatePublishOutcome::SkippedUnsubscribed => "skipped_unsubscribed", - AccountUpdatePublishOutcome::SkippedLiveUpdateWon => "suppressed_live_seen", + AccountUpdatePublishOutcome::SkippedStartupReplay => { + "skipped_startup_replay" + } + AccountUpdatePublishOutcome::SkippedUnsubscribed => { + "skipped_unsubscribed" + } + AccountUpdatePublishOutcome::SkippedLiveUpdateWon => { + "suppressed_live_seen" + } }); if matches!(outcome, AccountUpdatePublishOutcome::Published) { info!( @@ -352,8 +373,8 @@ impl InitialAccountBackfillInner { #[cfg(test)] mod tests { use { - super::*, crate::server::subscriptions::AccountSubscriptions, solana_account::Account, - tokio::sync::mpsc, + super::*, crate::server::subscriptions::AccountSubscriptions, + solana_account::Account, tokio::sync::mpsc, }; fn pk(byte: u8) -> [u8; 32] { @@ -471,7 +492,8 @@ mod tests { let _ = handle.enqueue(vec![pubkey]); handle.mark_live_update_seen(&pubkey); - let result = inner.complete_backfill_event(rpc::map_missing_account(1, pubkey)); + let result = + inner.complete_backfill_event(rpc::map_missing_account(1, pubkey)); assert!(result.is_ok()); assert!(!inner.in_flight.contains_key(&pubkey)); diff --git a/geyser-plugin/src/initial_account_backfill/rpc.rs b/geyser-plugin/src/initial_account_backfill/rpc.rs index bdab3b7..2477ecc 100644 --- a/geyser-plugin/src/initial_account_backfill/rpc.rs +++ b/geyser-plugin/src/initial_account_backfill/rpc.rs @@ -2,9 +2,13 @@ use { crate::{ initial_account_backfill::{ INITIAL_BACKFILL_INITIAL_BACKOFF_MS, INITIAL_BACKFILL_MAX_ATTEMPTS, - INITIAL_BACKFILL_MAX_BACKOFF_MS, INITIAL_BACKFILL_MAX_RPC_KEYS_PER_REQUEST, + INITIAL_BACKFILL_MAX_BACKOFF_MS, + INITIAL_BACKFILL_MAX_RPC_KEYS_PER_REQUEST, + }, + metrics::{ + INITIAL_BACKFILL_RPC_ATTEMPTS_TOTAL, + INITIAL_BACKFILL_RPC_FAILURES_TOTAL, }, - metrics::{INITIAL_BACKFILL_RPC_ATTEMPTS_TOTAL, INITIAL_BACKFILL_RPC_FAILURES_TOTAL}, wire::UpdateAccountEvent, }, log::*, @@ -52,7 +56,10 @@ async fn fetch_account_events_for_chunk( ); match client - .get_multiple_accounts_with_commitment(&keys, CommitmentConfig::confirmed()) + .get_multiple_accounts_with_commitment( + &keys, + CommitmentConfig::confirmed(), + ) .await { Ok(response) => { @@ -83,10 +90,14 @@ async fn fetch_account_events_for_chunk( .iter() .zip(response.value.into_iter()) .map(|(pubkey, maybe_account)| match maybe_account { - Some(account) => { - map_existing_account(account, response.context.slot, *pubkey) + Some(account) => map_existing_account( + account, + response.context.slot, + *pubkey, + ), + None => { + map_missing_account(response.context.slot, *pubkey) } - None => map_missing_account(response.context.slot, *pubkey), }) .collect()); } @@ -105,7 +116,8 @@ async fn fetch_account_events_for_chunk( if attempt < INITIAL_BACKFILL_MAX_ATTEMPTS 
{ sleep(Duration::from_millis(backoff_ms)).await; - backoff_ms = (backoff_ms * 2).min(INITIAL_BACKFILL_MAX_BACKOFF_MS); + backoff_ms = + (backoff_ms * 2).min(INITIAL_BACKFILL_MAX_BACKOFF_MS); } } } @@ -114,7 +126,8 @@ async fn fetch_account_events_for_chunk( Err(io::Error::other(last_error.unwrap())) } -pub(crate) const SYSTEM_PROGRAM_ID: Pubkey = pubkey!("11111111111111111111111111111111"); +pub(crate) const SYSTEM_PROGRAM_ID: Pubkey = + pubkey!("11111111111111111111111111111111"); pub(crate) fn map_existing_account( account: Account, @@ -137,7 +150,10 @@ pub(crate) fn map_existing_account( } } -pub(crate) fn map_missing_account(slot: u64, pubkey: [u8; 32]) -> UpdateAccountEvent { +pub(crate) fn map_missing_account( + slot: u64, + pubkey: [u8; 32], +) -> UpdateAccountEvent { UpdateAccountEvent { slot, pubkey: pubkey.to_vec(), diff --git a/geyser-plugin/src/ksql.rs b/geyser-plugin/src/ksql.rs index 990dacf..865cd46 100644 --- a/geyser-plugin/src/ksql.rs +++ b/geyser-plugin/src/ksql.rs @@ -16,12 +16,13 @@ pub(crate) struct KsqlPubkeyRestoreClient { impl KsqlPubkeyRestoreClient { pub(crate) fn new(base_url: &str, table: &str) -> io::Result { - let parsed = Url::parse(base_url) - .map_err(|error| io::Error::other(format!("invalid ksql base URL: {error}")))?; + let parsed = Url::parse(base_url).map_err(|error| { + io::Error::other(format!("invalid ksql base URL: {error}")) + })?; let normalized = parsed.as_str().trim_end_matches('/').to_owned(); - let client = Client::builder() - .build() - .map_err(|error| io::Error::other(format!("failed to build ksql client: {error}")))?; + let client = Client::builder().build().map_err(|error| { + io::Error::other(format!("failed to build ksql client: {error}")) + })?; Ok(Self { client, @@ -46,9 +47,13 @@ impl KsqlPubkeyRestoreClient { "sql": sql, })) .send() - .map_err(|error| io::Error::other(format!("failed to query ksqlDB: {error}")))? + .map_err(|error| { + io::Error::other(format!("failed to query ksqlDB: {error}")) + })? 
.error_for_status() - .map_err(|error| io::Error::other(format!("ksqlDB query failed: {error}")))?; + .map_err(|error| { + io::Error::other(format!("ksqlDB query failed: {error}")) + })?; let reader = BufReader::new(response); let pubkeys = parse_pubkeys_stream(reader)?; @@ -60,7 +65,9 @@ impl KsqlPubkeyRestoreClient { } } -pub(crate) fn parse_pubkeys_stream(reader: impl BufRead) -> io::Result> { +pub(crate) fn parse_pubkeys_stream( + reader: impl BufRead, +) -> io::Result> { let mut pubkeys = Vec::new(); for line_result in reader.lines() { @@ -70,7 +77,9 @@ pub(crate) fn parse_pubkeys_stream(reader: impl BufRead) -> io::Result io::Result| { + io::Error::other(format!( + "expected 32 decoded PUBKEY bytes, got {}", + bytes.len() + )) })?; - let pubkey: [u8; 32] = decoded.try_into().map_err(|bytes: Vec| { - io::Error::other(format!( - "expected 32 decoded PUBKEY bytes, got {}", - bytes.len() - )) - })?; pubkeys.push(pubkey); } _ => { @@ -161,9 +173,11 @@ mod tests { #[test] fn test_rejects_ksql_error_rows() { - let error = parse_pubkeys_stream("{\"@type\":\"error\",\"message\":\"boom\"}\n".as_bytes()) - .unwrap_err() - .to_string(); + let error = parse_pubkeys_stream( + "{\"@type\":\"error\",\"message\":\"boom\"}\n".as_bytes(), + ) + .unwrap_err() + .to_string(); assert!(error.contains("ksql error response")); } diff --git a/geyser-plugin/src/metrics.rs b/geyser-plugin/src/metrics.rs index 465c23e..7cdcfbf 100644 --- a/geyser-plugin/src/metrics.rs +++ b/geyser-plugin/src/metrics.rs @@ -101,7 +101,10 @@ impl ClientContext for StatsThreadedProducerContext { macro_rules! set_value { ($name:expr, $value:expr) => { KAFKA_STATS - .with_label_values(&[&name.to_string(), &$name.to_string()]) + .with_label_values(&[ + &name.to_string(), + &$name.to_string(), + ]) .set($value as f64); }; } diff --git a/geyser-plugin/src/plugin/dispatch.rs b/geyser-plugin/src/plugin/dispatch.rs index a68c0ec..5f455eb 100644 --- a/geyser-plugin/src/plugin/dispatch.rs +++ b/geyser-plugin/src/plugin/dispatch.rs @@ -1,8 +1,9 @@ use { crate::{ account_update_publisher::publish_confirmed_account_update, - initial_account_backfill::InitialAccountBackfillHandle, publisher::Publisher, - server::subscriptions::AccountSubscriptions, wire::UpdateAccountEvent, + initial_account_backfill::InitialAccountBackfillHandle, + publisher::Publisher, server::subscriptions::AccountSubscriptions, + wire::UpdateAccountEvent, }, agave_geyser_plugin_interface::geyser_plugin_interface::Result as PluginResult, }; diff --git a/geyser-plugin/src/plugin/mod.rs b/geyser-plugin/src/plugin/mod.rs index 9673c5a..fa6a980 100644 --- a/geyser-plugin/src/plugin/mod.rs +++ b/geyser-plugin/src/plugin/mod.rs @@ -30,8 +30,9 @@ use { }, }, agave_geyser_plugin_interface::geyser_plugin_interface::{ - GeyserPlugin, GeyserPluginError as PluginError, ReplicaAccountInfoVersions, - Result as PluginResult, SlotStatus as PluginSlotStatus, + GeyserPlugin, GeyserPluginError as PluginError, + ReplicaAccountInfoVersions, Result as PluginResult, + SlotStatus as PluginSlotStatus, }, log::{error, info, warn}, rdkafka::util::get_rdkafka_version, @@ -96,15 +97,17 @@ impl GeyserPlugin for KafkaPlugin { for (key, value) in &config.kafka.client { producer_config.set(key, value); } - producer_config.set("bootstrap.servers", &config.kafka.bootstrap_servers); - let producer = rdkafka::producer::ThreadedProducer::from_config_and_context( - &producer_config, - StatsThreadedProducerContext, - ) - .map_err(|error| { - error!("Failed to create kafka producer: {error:?}"); - 
PluginError::Custom(Box::new(error)) - })?; + producer_config + .set("bootstrap.servers", &config.kafka.bootstrap_servers); + let producer = + rdkafka::producer::ThreadedProducer::from_config_and_context( + &producer_config, + StatsThreadedProducerContext, + ) + .map_err(|error| { + error!("Failed to create kafka producer: {error:?}"); + PluginError::Custom(Box::new(error)) + })?; let publisher = Arc::new(Publisher::new( producer, Duration::from_millis(config.plugin.shutdown_timeout_ms), @@ -195,7 +198,9 @@ impl KafkaPlugin { Default::default() } - fn lock_confirmed_accounts(&self) -> PluginResult> { + fn lock_confirmed_accounts( + &self, + ) -> PluginResult> { self.confirmed_accounts.lock().map_err(|error| { PluginError::Custom(Box::new(std::io::Error::other(format!( "confirmed_accounts mutex poisoned: {error}" @@ -357,8 +362,10 @@ mod tests { use { super::{KafkaPlugin, restore_pubkeys_in_chunks}, crate::{ - config::Config, initial_account_backfill::InitialAccountBackfillHandle, - ksql::INIT_TRACKING_RESTORE_CHUNK_SIZE, server::subscriptions::AccountSubscriptions, + config::Config, + initial_account_backfill::InitialAccountBackfillHandle, + ksql::INIT_TRACKING_RESTORE_CHUNK_SIZE, + server::subscriptions::AccountSubscriptions, }, }; @@ -373,8 +380,11 @@ mod tests { let initial_account_backfill = crate::initial_account_backfill::InitialAccountBackfill::default(); - let result = - KafkaPlugin::restore_tracking_from_ksql(&config, &subs, &initial_account_backfill); + let result = KafkaPlugin::restore_tracking_from_ksql( + &config, + &subs, + &initial_account_backfill, + ); assert!(result.is_ok()); assert!(!subs.contains_sync(&pk(1))); @@ -385,9 +395,12 @@ mod tests { let subs = AccountSubscriptions::new(); let test_handle = InitialAccountBackfillHandle::new_test(8); - let summary = - restore_pubkeys_in_chunks(&subs, &test_handle, vec![pk(1), pk(1), pk(2), pk(2), pk(3)]) - .unwrap(); + let summary = restore_pubkeys_in_chunks( + &subs, + &test_handle, + vec![pk(1), pk(1), pk(2), pk(2), pk(3)], + ) + .unwrap(); assert_eq!(summary.deduplicated_count, 3); assert_eq!(summary.accepted_count, 3); @@ -411,14 +424,18 @@ mod tests { }) .collect::>(); - let summary = restore_pubkeys_in_chunks(&subs, &test_handle, pubkeys).unwrap(); + let summary = + restore_pubkeys_in_chunks(&subs, &test_handle, pubkeys).unwrap(); assert_eq!( summary.deduplicated_count, INIT_TRACKING_RESTORE_CHUNK_SIZE + 1 ); assert_eq!(summary.chunk_count, 2); - assert_eq!(summary.accepted_count, INIT_TRACKING_RESTORE_CHUNK_SIZE + 1); + assert_eq!( + summary.accepted_count, + INIT_TRACKING_RESTORE_CHUNK_SIZE + 1 + ); } #[test] @@ -427,7 +444,8 @@ mod tests { let test_handle = InitialAccountBackfillHandle::new_test(1); test_handle.prefill_queue_for_test(vec![pk(9)]); - let error = restore_pubkeys_in_chunks(&subs, &test_handle, vec![pk(7), pk(8)]); + let error = + restore_pubkeys_in_chunks(&subs, &test_handle, vec![pk(7), pk(8)]); assert!(error.is_err()); assert_eq!(subs.needs_backfill_count(), 2); diff --git a/geyser-plugin/src/publisher.rs b/geyser-plugin/src/publisher.rs index af0d1c3..093a70a 100644 --- a/geyser-plugin/src/publisher.rs +++ b/geyser-plugin/src/publisher.rs @@ -44,10 +44,15 @@ impl Publisher { } } - pub fn update_account(&self, ev: UpdateAccountEvent, topic: &str) -> Result<(), KafkaError> { + pub fn update_account( + &self, + ev: UpdateAccountEvent, + topic: &str, + ) -> Result<(), KafkaError> { let log_pubkey = Pubkey::try_from(ev.pubkey.as_slice()).ok(); let (key, buf) = Self::encode_account_update(ev); - let record = 
BaseRecord::, _>::to(topic).key(&key).payload(&buf); + let record = + BaseRecord::, _>::to(topic).key(&key).payload(&buf); let result = self.producer.send(record).map(|_| ()).map_err(|(e, _)| e); match &result { Ok(()) => match log_pubkey { @@ -69,7 +74,11 @@ impl Publisher { }, } UPLOAD_ACCOUNTS_TOTAL - .with_label_values(&[if result.is_ok() { "success" } else { "failed" }]) + .with_label_values(&[if result.is_ok() { + "success" + } else { + "failed" + }]) .inc(); result } @@ -97,7 +106,9 @@ impl Drop for Publisher { #[cfg(test)] mod tests { use super::Publisher; - use crate::{MessageWrapper, UpdateAccountEvent, message_wrapper::EventMessage}; + use crate::{ + MessageWrapper, UpdateAccountEvent, message_wrapper::EventMessage, + }; use prost::Message; fn sample_event() -> UpdateAccountEvent { @@ -136,7 +147,9 @@ mod tests { let wrapper = MessageWrapper::decode(payload.as_slice()).unwrap(); match wrapper.event_message { - Some(EventMessage::Account(account)) => assert_eq!(*account, expected), + Some(EventMessage::Account(account)) => { + assert_eq!(*account, expected) + } other => panic!("unexpected wrapper payload: {other:?}"), } } diff --git a/geyser-plugin/src/server/accounts.rs b/geyser-plugin/src/server/accounts.rs index 4d40274..c13878a 100644 --- a/geyser-plugin/src/server/accounts.rs +++ b/geyser-plugin/src/server/accounts.rs @@ -138,7 +138,10 @@ pub async fn handle_post_accounts( initial_account_backfill: InitialAccountBackfillHandle, ) -> Response> { use http_body_util::BodyExt; - let body_bytes = match Limited::new(req.into_body(), MAX_BODY_SIZE).collect().await { + let body_bytes = match Limited::new(req.into_body(), MAX_BODY_SIZE) + .collect() + .await + { Ok(collected) => collected.to_bytes(), Err(e) => { return if e @@ -150,14 +153,22 @@ pub async fn handle_post_accounts( &format!("body exceeds max size of {MAX_BODY_SIZE} bytes"), ) } else { - error_response(StatusCode::BAD_REQUEST, &format!("body read error: {e}")) + error_response( + StatusCode::BAD_REQUEST, + &format!("body read error: {e}"), + ) }; } }; let parsed: AddAccountsRequest = match serde_json::from_slice(&body_bytes) { Ok(v) => v, - Err(e) => return error_response(StatusCode::BAD_REQUEST, &format!("invalid JSON: {e}")), + Err(e) => { + return error_response( + StatusCode::BAD_REQUEST, + &format!("invalid JSON: {e}"), + ); + } }; let mut keys = Vec::with_capacity(parsed.pubkeys.len()); @@ -174,7 +185,9 @@ pub async fn handle_post_accounts( } match add_accounts(&subs, &initial_account_backfill, keys) { - Ok(outcome) => json_response(StatusCode::OK, &AccountsResponse::from(outcome)), + Ok(outcome) => { + json_response(StatusCode::OK, &AccountsResponse::from(outcome)) + } Err(AddAccountsError::QueueFull(outcome)) => json_response( StatusCode::SERVICE_UNAVAILABLE, &AccountsResponse::from(outcome), @@ -199,7 +212,10 @@ impl From for AccountsResponse { } } -fn json_response(status: StatusCode, body: &T) -> Response> { +fn json_response( + status: StatusCode, + body: &T, +) -> Response> { let json = match serde_json::to_vec(body) { Ok(j) => j, Err(e) => { @@ -236,7 +252,8 @@ fn error_500() -> Response> { .body(Full::new(Bytes::new())) .unwrap_or_else(|_| { // Final fallback: bare 500 response - let (mut parts, _) = Response::new(Full::new(Bytes::new())).into_parts(); + let (mut parts, _) = + Response::new(Full::new(Bytes::new())).into_parts(); parts.status = StatusCode::INTERNAL_SERVER_ERROR; Response::from_parts(parts, Full::new(Bytes::new())) }) @@ -270,7 +287,8 @@ mod tests { let subs = AccountSubscriptions::new(); 
let test_handle = InitialAccountBackfillHandle::new_test(4); - let outcome = add_accounts(&subs, &test_handle, vec![pk(1), pk(2)]).unwrap(); + let outcome = + add_accounts(&subs, &test_handle, vec![pk(1), pk(2)]).unwrap(); assert_eq!(outcome.accepted_count, 2); assert_eq!(outcome.newly_added_count, 2); @@ -285,7 +303,9 @@ mod tests { let test_handle = InitialAccountBackfillHandle::new_test(4); let _ = add_accounts(&subs, &test_handle, vec![pk(3)]).unwrap(); - let outcome = add_accounts(&subs, &test_handle, vec![pk(3), pk(4), pk(4)]).unwrap(); + let outcome = + add_accounts(&subs, &test_handle, vec![pk(3), pk(4), pk(4)]) + .unwrap(); assert_eq!(outcome.accepted_count, 3); assert_eq!(outcome.newly_added_count, 1); @@ -311,7 +331,8 @@ mod tests { let test_handle = InitialAccountBackfillHandle::new_test(1); test_handle.prefill_queue_for_test(vec![pk(50)]); - let error = add_accounts(&subs, &test_handle, vec![pk(51), pk(52)]).unwrap_err(); + let error = add_accounts(&subs, &test_handle, vec![pk(51), pk(52)]) + .unwrap_err(); match error { AddAccountsError::QueueFull(outcome) => { diff --git a/geyser-plugin/src/server/mod.rs b/geyser-plugin/src/server/mod.rs index 573e931..4eaf21f 100644 --- a/geyser-plugin/src/server/mod.rs +++ b/geyser-plugin/src/server/mod.rs @@ -3,7 +3,10 @@ pub mod prom; pub mod subscriptions; use { - crate::{initial_account_backfill::InitialAccountBackfillHandle, metrics::register_metrics}, + crate::{ + initial_account_backfill::InitialAccountBackfillHandle, + metrics::register_metrics, + }, bytes::Bytes, http::StatusCode, http_body_util::Full, @@ -52,10 +55,16 @@ impl HttpService { let service = service_fn(move |req: Request| { let subs = subs.clone(); - let initial_account_backfill = initial_account_backfill.clone(); + let initial_account_backfill = + initial_account_backfill.clone(); async move { - let response = - route(req, subs, initial_account_backfill, metrics_enabled).await; + let response = route( + req, + subs, + initial_account_backfill, + metrics_enabled, + ) + .await; Ok::<_, hyper::Error>(response) } }); @@ -87,7 +96,8 @@ async fn route( match (req.method(), req.uri().path()) { (&Method::GET, "/metrics") if metrics_enabled => metrics_handler(), (&Method::POST, "/filters/accounts") => { - accounts::handle_post_accounts(req, subs, initial_account_backfill).await + accounts::handle_post_accounts(req, subs, initial_account_backfill) + .await } _ => not_found(), } @@ -105,7 +115,8 @@ fn not_found() -> Response> { .status(StatusCode::NOT_FOUND) .body(Full::new(Bytes::new())) .unwrap_or_else(|_| { - let (mut parts, _) = Response::new(Full::new(Bytes::new())).into_parts(); + let (mut parts, _) = + Response::new(Full::new(Bytes::new())).into_parts(); parts.status = StatusCode::NOT_FOUND; Response::from_parts(parts, Full::new(Bytes::new())) }) diff --git a/geyser-plugin/src/server/prom.rs b/geyser-plugin/src/server/prom.rs index e0665d1..06bb0ae 100644 --- a/geyser-plugin/src/server/prom.rs +++ b/geyser-plugin/src/server/prom.rs @@ -1,6 +1,6 @@ use { - crate::metrics::REGISTRY, bytes::Bytes, http_body_util::Full, hyper::Response, log::*, - prometheus::TextEncoder, + crate::metrics::REGISTRY, bytes::Bytes, http_body_util::Full, + hyper::Response, log::*, prometheus::TextEncoder, }; pub fn metrics_handler() -> Response> { diff --git a/geyser-plugin/src/server/subscriptions.rs b/geyser-plugin/src/server/subscriptions.rs index 6029488..f11bc61 100644 --- a/geyser-plugin/src/server/subscriptions.rs +++ b/geyser-plugin/src/server/subscriptions.rs @@ -39,7 +39,10 @@ impl 
AccountSubscriptions { } /// Add pubkeys and report how many were newly inserted versus duplicates. - pub fn add>(&self, pubkeys: I) -> AddAccountsResult { + pub fn add>( + &self, + pubkeys: I, + ) -> AddAccountsResult { let mut newly_added = Vec::new(); let mut request_seen = HashSet::new(); let mut duplicate_count = 0; diff --git a/grpc-service/rustfmt.toml b/rustfmt.toml similarity index 100% rename from grpc-service/rustfmt.toml rename to rustfmt.toml