diff --git a/Cargo.lock b/Cargo.lock index 7f98bc1..7658a3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5217,6 +5217,8 @@ dependencies = [ [[package]] name = "saorsa-core" version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d3d05b97f789b0e0b7d54b2fe05f05edfafb94f72d065482fc20ce1e9fab69e" dependencies = [ "anyhow", "async-trait", diff --git a/src/ant_protocol/chunk.rs b/src/ant_protocol/chunk.rs index 0cbba46..5113f33 100644 --- a/src/ant_protocol/chunk.rs +++ b/src/ant_protocol/chunk.rs @@ -236,6 +236,12 @@ pub enum ChunkQuoteResponse { quote: Vec, /// `true` when the chunk already exists on this node (skip payment). already_stored: bool, + /// Up to `CLOSE_GROUP_SIZE` peer IDs (raw 32-byte BLAKE3 hashes) this + /// node considers closest to the content address, **excluding itself** + /// (the local node is filtered out by the DHT query). Clients combine + /// these views from multiple nodes to verify close-group quorum before + /// paying. + close_group: Vec<[u8; 32]>, }, /// Quote generation failed. Error(ProtocolError), diff --git a/src/devnet.rs b/src/devnet.rs index f61fc40..47fd452 100644 --- a/src/devnet.rs +++ b/src/devnet.rs @@ -588,6 +588,7 @@ impl Devnet { evm: evm_config, cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY, local_rewards_address: rewards_address, + local_peer_id: *identity.peer_id().as_bytes(), }; let payment_verifier = PaymentVerifier::new(payment_config); let metrics_tracker = @@ -602,6 +603,7 @@ impl Devnet { Arc::new(storage), Arc::new(payment_verifier), Arc::new(quote_generator), + None, )) } @@ -643,6 +645,10 @@ impl Devnet { *node.state.write().await = NodeState::Running; if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) { + // Inject P2P node into protocol handler for close-group lookups. + if protocol.set_p2p_node(Arc::clone(p2p)).is_err() { + warn!("P2P node already set on protocol handler for devnet node {index}"); + } let mut events = p2p.subscribe_events(); let p2p_clone = Arc::clone(p2p); let protocol_clone = Arc::clone(protocol); diff --git a/src/node.rs b/src/node.rs index 2726f9d..f9bc76a 100644 --- a/src/node.rs +++ b/src/node.rs @@ -123,10 +123,14 @@ impl NodeBuilder { None }; + // Wrap P2P node in Arc early so it can be shared with the protocol handler. + let p2p_node = Arc::new(p2p_node); + // Initialize ANT protocol handler for chunk storage let ant_protocol = if self.config.storage.enabled { Some(Arc::new( - Self::build_ant_protocol(&self.config, &identity).await?, + Self::build_ant_protocol(&self.config, &identity, Some(Arc::clone(&p2p_node))) + .await?, )) } else { info!("Chunk storage disabled"); @@ -135,7 +139,7 @@ impl NodeBuilder { let node = RunningNode { config: self.config, - p2p_node: Arc::new(p2p_node), + p2p_node, shutdown, events_tx, events_rx: Some(events_rx), @@ -330,6 +334,7 @@ impl NodeBuilder { async fn build_ant_protocol( config: &NodeConfig, identity: &NodeIdentity, + p2p_node: Option>, ) -> Result { // Create LMDB storage let storage_config = LmdbStorageConfig { @@ -363,6 +368,7 @@ impl NodeBuilder { }, cache_capacity: config.payment.cache_capacity, local_rewards_address: rewards_address, + local_peer_id: *identity.peer_id().as_bytes(), }; let payment_verifier = PaymentVerifier::new(payment_config); // Safe: 5GB fits in usize on all supported 64-bit platforms. @@ -379,6 +385,7 @@ impl NodeBuilder { Arc::new(storage), Arc::new(payment_verifier), Arc::new(quote_generator), + p2p_node, ); info!( diff --git a/src/payment/verifier.rs b/src/payment/verifier.rs index c36e5b8..2c672fa 100644 --- a/src/payment/verifier.rs +++ b/src/payment/verifier.rs @@ -18,6 +18,7 @@ use evmlib::Network as EvmNetwork; use lru::LruCache; use parking_lot::Mutex; use saorsa_core::identity::node_identity::peer_id_from_public_key_bytes; +use std::collections::HashSet; use std::num::NonZeroUsize; use std::time::SystemTime; use tracing::{debug, info}; @@ -75,6 +76,10 @@ pub struct PaymentVerifierConfig { /// Local node's rewards address. /// The verifier rejects payments that don't include this node as a recipient. pub local_rewards_address: RewardsAddress, + /// Local node's peer ID (32-byte BLAKE3 hash of ML-DSA-65 public key). + /// Used to build the full close group view (self + DHT peers) during + /// payment proof validation. + pub local_peer_id: [u8; 32], } /// Status returned by payment verification. @@ -199,6 +204,7 @@ impl PaymentVerifier { &self, xorname: &XorName, payment_proof: Option<&[u8]>, + local_close_group: &[[u8; 32]], ) -> Result { // First check if payment is required let status = self.check_payment_required(xorname); @@ -237,7 +243,8 @@ impl PaymentVerifier { debug!("Proof includes {} transaction hash(es)", tx_hashes.len()); } - self.verify_evm_payment(xorname, &payment).await?; + self.verify_evm_payment(xorname, &payment, local_close_group) + .await?; } None => { let tag = proof.first().copied().unwrap_or(0); @@ -300,7 +307,12 @@ impl PaymentVerifier { /// For unit tests that don't need on-chain verification, pre-populate /// the cache so `verify_payment` returns `CachedAsVerified` before /// reaching this method. - async fn verify_evm_payment(&self, xorname: &XorName, payment: &ProofOfPayment) -> Result<()> { + async fn verify_evm_payment( + &self, + xorname: &XorName, + payment: &ProofOfPayment, + local_close_group: &[[u8; 32]], + ) -> Result<()> { if tracing::enabled!(tracing::Level::DEBUG) { let xorname_hex = hex::encode(xorname); let quote_count = payment.peer_quotes.len(); @@ -312,6 +324,7 @@ impl PaymentVerifier { Self::validate_quote_timestamps(payment)?; Self::validate_peer_bindings(payment)?; self.validate_local_recipient(payment)?; + self.validate_close_group_membership(payment, local_close_group)?; // Verify quote signatures (CPU-bound, run off async runtime) let peer_quotes = payment.peer_quotes.clone(); @@ -692,12 +705,56 @@ impl PaymentVerifier { } Ok(()) } + + /// Verify that **every** peer in the payment proof is a known close group member. + /// + /// Builds the known set from the current DHT close group plus this node + /// itself, then checks that each proof peer (derived via `BLAKE3(pub_key)`) + /// appears in that set. Rejects the proof if ANY peer is unrecognized. + /// + /// Skipped when `local_close_group` is empty (unit tests without DHT). + fn validate_close_group_membership( + &self, + payment: &ProofOfPayment, + local_close_group: &[[u8; 32]], + ) -> Result<()> { + if local_close_group.is_empty() { + return Ok(()); + } + + // Build the known peer set: current DHT close group + this node. + let mut known_peers: HashSet<[u8; 32]> = local_close_group.iter().copied().collect(); + known_peers.insert(self.config.local_peer_id); + + // Every proof peer must be in the known set. + for (_encoded_peer_id, quote) in &payment.peer_quotes { + let peer_id = peer_id_from_public_key_bytes("e.pub_key).map_err(|e| { + Error::Payment(format!("Invalid ML-DSA pub_key in proof quote: {e}")) + })?; + + if !known_peers.contains(peer_id.as_bytes()) { + return Err(Error::Payment(format!( + "Proof peer {} is not in the current close group", + peer_id.to_hex() + ))); + } + } + + debug!( + "Close group membership validated: all {} proof peers recognized", + payment.peer_quotes.len() + ); + Ok(()) + } } #[cfg(test)] #[allow(clippy::expect_used)] mod tests { use super::*; + use ant_evm::EncodedPeerId; + use saorsa_core::MlDsa65; + use saorsa_pqc::pqc::MlDsaOperations; /// Create a verifier for unit tests. EVM is always on, but tests can /// pre-populate the cache to bypass on-chain verification. @@ -706,6 +763,7 @@ mod tests { evm: EvmVerifierConfig::default(), cache_capacity: 100, local_rewards_address: RewardsAddress::new([1u8; 20]), + local_peer_id: [1u8; 32], }; PaymentVerifier::new(config) } @@ -739,7 +797,7 @@ mod tests { let xorname = [1u8; 32]; // No proof provided => should return an error (EVM is always on) - let result = verifier.verify_payment(&xorname, None).await; + let result = verifier.verify_payment(&xorname, None, &[]).await; assert!( result.is_err(), "Expected Err without proof, got: {result:?}" @@ -755,7 +813,7 @@ mod tests { verifier.cache.insert(xorname); // Should succeed without payment (cached) - let result = verifier.verify_payment(&xorname, None).await; + let result = verifier.verify_payment(&xorname, None, &[]).await; assert!(result.is_ok()); assert_eq!(result.expect("cached"), PaymentStatus::CachedAsVerified); } @@ -802,7 +860,9 @@ mod tests { // Proof smaller than MIN_PAYMENT_PROOF_SIZE_BYTES let small_proof = vec![0u8; MIN_PAYMENT_PROOF_SIZE_BYTES - 1]; - let result = verifier.verify_payment(&xorname, Some(&small_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&small_proof), &[]) + .await; assert!(result.is_err()); let err_msg = format!("{}", result.expect_err("should fail")); assert!( @@ -818,7 +878,9 @@ mod tests { // Proof larger than MAX_PAYMENT_PROOF_SIZE_BYTES let large_proof = vec![0u8; MAX_PAYMENT_PROOF_SIZE_BYTES + 1]; - let result = verifier.verify_payment(&xorname, Some(&large_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&large_proof), &[]) + .await; assert!(result.is_err()); let err_msg = format!("{}", result.expect_err("should fail")); assert!( @@ -835,7 +897,7 @@ mod tests { // Exactly MIN_PAYMENT_PROOF_SIZE_BYTES with unknown tag — rejected let boundary_proof = vec![0xFFu8; MIN_PAYMENT_PROOF_SIZE_BYTES]; let result = verifier - .verify_payment(&xorname, Some(&boundary_proof)) + .verify_payment(&xorname, Some(&boundary_proof), &[]) .await; assert!(result.is_err()); let err_msg = format!("{}", result.expect_err("should fail")); @@ -853,7 +915,7 @@ mod tests { // Exactly MAX_PAYMENT_PROOF_SIZE_BYTES with unknown tag — rejected let boundary_proof = vec![0xFFu8; MAX_PAYMENT_PROOF_SIZE_BYTES]; let result = verifier - .verify_payment(&xorname, Some(&boundary_proof)) + .verify_payment(&xorname, Some(&boundary_proof), &[]) .await; assert!(result.is_err()); let err_msg = format!("{}", result.expect_err("should fail")); @@ -871,7 +933,7 @@ mod tests { // Valid tag (0x01) but garbage payload — should fail deserialization let mut garbage = vec![crate::ant_protocol::PROOF_TAG_SINGLE_NODE]; garbage.extend_from_slice(&[0xAB; 63]); - let result = verifier.verify_payment(&xorname, Some(&garbage)).await; + let result = verifier.verify_payment(&xorname, Some(&garbage), &[]).await; assert!(result.is_err()); let err_msg = format!("{}", result.expect_err("should fail")); assert!( @@ -926,7 +988,7 @@ mod tests { let v = verifier.clone(); handles.push(tokio::spawn(async move { let xorname = [i; 32]; - v.verify_payment(&xorname, None).await + v.verify_payment(&xorname, None, &[]).await })); } @@ -1055,7 +1117,7 @@ mod tests { let proof_bytes = serialize_single_node_proof(&proof).expect("serialize proof"); let result = verifier - .verify_payment(&target_xorname, Some(&proof_bytes)) + .verify_payment(&target_xorname, Some(&proof_bytes), &[]) .await; assert!(result.is_err(), "Should reject mismatched content address"); @@ -1128,7 +1190,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; assert!(result.is_err(), "Should reject expired quote"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1159,7 +1223,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; assert!(result.is_err(), "Should reject future-timestamped quote"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1190,7 +1256,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; // Should NOT fail at timestamp check (will fail later at pub_key binding) let err_msg = format!("{}", result.expect_err("should fail at later check")); @@ -1221,7 +1289,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; assert!( result.is_err(), @@ -1255,7 +1325,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; // Should NOT fail at timestamp check (will fail later at pub_key binding) let err_msg = format!("{}", result.expect_err("should fail at later check")); @@ -1294,6 +1366,7 @@ mod tests { }, cache_capacity: 100, local_rewards_address: local_addr, + local_peer_id: [0xAAu8; 32], }; let verifier = PaymentVerifier::new(config); @@ -1316,7 +1389,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; assert!(result.is_err(), "Should reject payment not addressed to us"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1355,7 +1430,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; assert!(result.is_err(), "Should reject wrong peer binding"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1383,7 +1460,7 @@ mod tests { merkle_garbage.extend_from_slice(&[0xAB; 63]); let result = verifier - .verify_payment(&xorname, Some(&merkle_garbage)) + .verify_payment(&xorname, Some(&merkle_garbage), &[]) .await; assert!( @@ -1433,7 +1510,9 @@ mod tests { // verify_payment should process it through the single-node path. // It will fail at quote validation (fake pub_key), but we verify // it passes the deserialization stage by checking the error type. - let result = verifier.verify_payment(&xorname, Some(&tagged_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&tagged_bytes), &[]) + .await; assert!(result.is_err(), "Should fail at quote validation stage"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1608,7 +1687,7 @@ mod tests { let wrong_xorname = [0xFFu8; 32]; let result = verifier - .verify_payment(&wrong_xorname, Some(&tagged_proof)) + .verify_payment(&wrong_xorname, Some(&tagged_proof), &[]) .await; assert!( @@ -1636,7 +1715,9 @@ mod tests { bad_proof.push(0x00); } - let result = verifier.verify_payment(&xorname, Some(&bad_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&bad_proof), &[]) + .await; assert!(result.is_err(), "Should reject malformed merkle body"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1688,6 +1769,7 @@ mod tests { evm: EvmVerifierConfig::default(), cache_capacity: 100, local_rewards_address: RewardsAddress::new([1u8; 20]), + local_peer_id: [1u8; 32], }; let verifier = PaymentVerifier::new(config); @@ -1803,7 +1885,7 @@ mod tests { let tagged = crate::payment::proof::serialize_merkle_proof(&merkle_proof).expect("serialize"); - let result = verifier.verify_payment(&xorname, Some(&tagged)).await; + let result = verifier.verify_payment(&xorname, Some(&tagged), &[]).await; assert!( result.is_err(), @@ -1833,7 +1915,7 @@ mod tests { verifier.pool_cache.lock().put(pool_hash, info); } - let result = verifier.verify_payment(&xorname, Some(&tagged)).await; + let result = verifier.verify_payment(&xorname, Some(&tagged), &[]).await; assert!( result.is_err(), @@ -1868,7 +1950,9 @@ mod tests { verifier.pool_cache.lock().put(pool_hash, info); } - let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&tagged_proof), &[]) + .await; assert!( result.is_err(), @@ -1902,7 +1986,9 @@ mod tests { verifier.pool_cache.lock().put(pool_hash, info); } - let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&tagged_proof), &[]) + .await; assert!(result.is_err(), "Should reject paid node address mismatch"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1928,7 +2014,9 @@ mod tests { verifier.pool_cache.lock().put(pool_hash, info); } - let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&tagged_proof), &[]) + .await; assert!( result.is_err(), @@ -1941,4 +2029,164 @@ mod tests { "Error should mention depth/count mismatch: {err_msg}" ); } + + // ========================================================================= + // Close-group membership validation tests + // ========================================================================= + + #[test] + fn test_close_group_all_peers_recognised_accepted() { + let ml_dsa = MlDsa65::new(); + let mut peer_quotes = Vec::new(); + let mut close_group_ids: Vec<[u8; 32]> = Vec::new(); + + // Generate CLOSE_GROUP_SIZE peers with real ML-DSA keys. + for _ in 0..CLOSE_GROUP_SIZE { + let (public_key, _) = ml_dsa.generate_keypair().expect("keygen"); + let pub_key_bytes = public_key.as_bytes().to_vec(); + let ant_peer_id = + peer_id_from_public_key_bytes(&pub_key_bytes).expect("peer id from pub key"); + close_group_ids.push(*ant_peer_id.as_bytes()); + + let encoded = encoded_peer_id_for_pub_key(&pub_key_bytes); + let mut quote = make_fake_quote( + [0xAA; 32], + SystemTime::now(), + RewardsAddress::new([1u8; 20]), + ); + quote.pub_key = pub_key_bytes; + peer_quotes.push((encoded, quote)); + } + + let payment = ProofOfPayment { peer_quotes }; + + // Verifier whose local_peer_id is NOT one of the proof peers (but that's + // fine — it only needs to be in the known set, and we insert it). + let config = PaymentVerifierConfig { + evm: EvmVerifierConfig::default(), + cache_capacity: 100, + local_rewards_address: RewardsAddress::new([1u8; 20]), + local_peer_id: [0xBBu8; 32], + }; + let verifier = PaymentVerifier::new(config); + + let result = verifier.validate_close_group_membership(&payment, &close_group_ids); + assert!( + result.is_ok(), + "All proof peers are in close group — should accept: {result:?}" + ); + } + + #[test] + fn test_close_group_unknown_peer_rejected() { + let ml_dsa = MlDsa65::new(); + let mut peer_quotes = Vec::new(); + let mut close_group_ids: Vec<[u8; 32]> = Vec::new(); + + // Generate CLOSE_GROUP_SIZE peers; include all but the last in the + // close group so one peer is "unknown". + for i in 0..CLOSE_GROUP_SIZE { + let (public_key, _) = ml_dsa.generate_keypair().expect("keygen"); + let pub_key_bytes = public_key.as_bytes().to_vec(); + let ant_peer_id = + peer_id_from_public_key_bytes(&pub_key_bytes).expect("peer id from pub key"); + + // Only add the first N-1 peers to the close group. + if i < CLOSE_GROUP_SIZE - 1 { + close_group_ids.push(*ant_peer_id.as_bytes()); + } + + let encoded = encoded_peer_id_for_pub_key(&pub_key_bytes); + let mut quote = make_fake_quote( + [0xAA; 32], + SystemTime::now(), + RewardsAddress::new([1u8; 20]), + ); + quote.pub_key = pub_key_bytes; + peer_quotes.push((encoded, quote)); + } + + let payment = ProofOfPayment { peer_quotes }; + + let config = PaymentVerifierConfig { + evm: EvmVerifierConfig::default(), + cache_capacity: 100, + local_rewards_address: RewardsAddress::new([1u8; 20]), + local_peer_id: [0xBBu8; 32], + }; + let verifier = PaymentVerifier::new(config); + + let result = verifier.validate_close_group_membership(&payment, &close_group_ids); + assert!(result.is_err(), "One unknown peer — should reject"); + let err_msg = format!("{}", result.expect_err("should fail")); + assert!( + err_msg.contains("not in the current close group"), + "Error should mention close group: {err_msg}" + ); + } + + #[test] + fn test_close_group_empty_skips_validation() { + // With an empty close group (unit test / no DHT), validation is skipped. + let verifier = create_test_verifier(); + + let quote = make_fake_quote( + [0xAA; 32], + SystemTime::now(), + RewardsAddress::new([1u8; 20]), + ); + let keypair = libp2p::identity::Keypair::generate_ed25519(); + let peer_id = libp2p::PeerId::from_public_key(&keypair.public()); + let peer_quotes = vec![(EncodedPeerId::from(peer_id), quote)]; + + let payment = ProofOfPayment { peer_quotes }; + + let result = verifier.validate_close_group_membership(&payment, &[]); + assert!( + result.is_ok(), + "Empty close group should skip validation: {result:?}" + ); + } + + #[test] + fn test_close_group_local_peer_is_implicitly_known() { + let ml_dsa = MlDsa65::new(); + + // Generate a single peer whose BLAKE3 ID we'll set as local_peer_id. + let (public_key, _) = ml_dsa.generate_keypair().expect("keygen"); + let pub_key_bytes = public_key.as_bytes().to_vec(); + let ant_peer_id = + peer_id_from_public_key_bytes(&pub_key_bytes).expect("peer id from pub key"); + + let encoded = encoded_peer_id_for_pub_key(&pub_key_bytes); + let mut quote = make_fake_quote( + [0xAA; 32], + SystemTime::now(), + RewardsAddress::new([1u8; 20]), + ); + quote.pub_key = pub_key_bytes; + + let payment = ProofOfPayment { + peer_quotes: vec![(encoded, quote)], + }; + + // The local_peer_id matches the proof peer, and the close group + // contains at least one entry (so validation isn't skipped) but + // does NOT contain the proof peer — only local_peer_id does. + let config = PaymentVerifierConfig { + evm: EvmVerifierConfig::default(), + cache_capacity: 100, + local_rewards_address: RewardsAddress::new([1u8; 20]), + local_peer_id: *ant_peer_id.as_bytes(), + }; + let verifier = PaymentVerifier::new(config); + + // Close group has a dummy entry so validation isn't skipped. + let dummy_peer = [0xFFu8; 32]; + let result = verifier.validate_close_group_membership(&payment, &[dummy_peer]); + assert!( + result.is_ok(), + "Proof peer matches local_peer_id — should accept: {result:?}" + ); + } } diff --git a/src/storage/handler.rs b/src/storage/handler.rs index 12e5449..99779b2 100644 --- a/src/storage/handler.rs +++ b/src/storage/handler.rs @@ -30,15 +30,17 @@ use crate::ant_protocol::{ ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest, - MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, DATA_TYPE_CHUNK, - MAX_CHUNK_SIZE, + MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, CLOSE_GROUP_SIZE, + DATA_TYPE_CHUNK, MAX_CHUNK_SIZE, }; use crate::client::compute_address; use crate::error::{Error, Result}; use crate::payment::{PaymentVerifier, QuoteGenerator}; use crate::storage::lmdb::LmdbStorage; use bytes::Bytes; +use saorsa_core::P2PNode; use std::sync::Arc; +use std::sync::OnceLock; use tracing::{debug, info, warn}; /// ANT protocol handler. @@ -53,6 +55,10 @@ pub struct AntProtocol { /// Quote generator for creating storage quotes. /// Also handles merkle candidate quote signing via ML-DSA-65. quote_generator: Arc, + /// P2P node for local close-group lookups during quote and payment + /// validation. Initialised via the constructor or [`set_p2p_node`] when + /// the P2P layer starts after the protocol handler (devnet / test nodes). + p2p_node: OnceLock>, } impl AntProtocol { @@ -63,16 +69,60 @@ impl AntProtocol { /// * `storage` - LMDB storage for chunk persistence /// * `payment_verifier` - Payment verifier for validating payments /// * `quote_generator` - Quote generator for creating storage quotes + /// * `p2p_node` - P2P node for local close-group lookups (`None` in unit tests + /// or when the P2P layer is not yet started — see [`set_p2p_node`]) #[must_use] pub fn new( storage: Arc, payment_verifier: Arc, quote_generator: Arc, + p2p_node: Option>, ) -> Self { + let lock = OnceLock::new(); + if let Some(node) = p2p_node { + // Fresh OnceLock — set cannot fail. + let _ = lock.set(node); + } Self { storage, payment_verifier, quote_generator, + p2p_node: lock, + } + } + + /// Inject the P2P node after construction. + /// + /// Used by devnet and test harnesses where the `P2PNode` is created after + /// the `AntProtocol` handler. + /// + /// # Errors + /// + /// Returns the rejected `Arc` if a node was already set. + pub fn set_p2p_node(&self, node: Arc) -> std::result::Result<(), Arc> { + self.p2p_node.set(node) + } + + /// Query the local routing table for the closest peers to `address`. + /// + /// Returns up to `CLOSE_GROUP_SIZE` peer IDs **excluding this node**. + /// The local node is intentionally omitted because `find_closest_nodes_local` + /// filters out self — the caller adds `local_peer_id` separately when + /// building the full close-group set for validation. + /// + /// We request `CLOSE_GROUP_SIZE` (not `CLOSE_GROUP_SIZE - 1`) because this + /// node may not be in the actual close group for the target address — asking + /// for fewer peers could exclude a legitimate member. + async fn local_close_group(&self, address: &[u8; 32]) -> Vec<[u8; 32]> { + match self.p2p_node.get() { + Some(p2p) => p2p + .dht() + .find_closest_nodes_local(address, CLOSE_GROUP_SIZE) + .await + .iter() + .map(|node| *node.peer_id.as_bytes()) + .collect(), + None => Vec::new(), } } @@ -106,7 +156,7 @@ impl AntProtocol { ChunkMessageBody::GetResponse(self.handle_get(req).await) } ChunkMessageBody::QuoteRequest(ref req) => { - ChunkMessageBody::QuoteResponse(self.handle_quote(req)) + ChunkMessageBody::QuoteResponse(self.handle_quote(req).await) } ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => { ChunkMessageBody::MerkleCandidateQuoteResponse( @@ -171,10 +221,17 @@ impl AntProtocol { Ok(false) => {} } - // 4. Verify payment + // 4. Look up local close group for this content address. + let local_close_group = self.local_close_group(&address).await; + + // 5. Verify payment (including close group membership check) let payment_result = self .payment_verifier - .verify_payment(&address, request.payment_proof.as_deref()) + .verify_payment( + &address, + request.payment_proof.as_deref(), + &local_close_group, + ) .await; match payment_result { @@ -191,7 +248,7 @@ impl AntProtocol { } } - // 5. Store chunk + // 6. Store chunk match self.storage.put(&address, &request.content).await { Ok(_) => { let content_len = request.content.len(); @@ -232,7 +289,7 @@ impl AntProtocol { } /// Handle a quote request. - fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse { + async fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse { let addr_hex = hex::encode(request.address); let data_size = request.data_size; debug!("Handling quote request for {addr_hex} (size: {data_size})"); @@ -265,6 +322,8 @@ impl AntProtocol { }); } + let close_group = self.local_close_group(&request.address).await; + match self .quote_generator .create_quote(request.address, data_size_usize, request.data_type) @@ -275,6 +334,7 @@ impl AntProtocol { Ok(quote_bytes) => ChunkQuoteResponse::Success { quote: quote_bytes, already_stored, + close_group, }, Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!( "Failed to serialize quote: {e}" @@ -416,6 +476,7 @@ mod tests { evm: EvmVerifierConfig::default(), cache_capacity: 100_000, local_rewards_address: rewards_address, + local_peer_id: [1u8; 32], }; let payment_verifier = Arc::new(PaymentVerifier::new(payment_config)); let metrics_tracker = QuotingMetricsTracker::new(1000, 100); @@ -434,7 +495,7 @@ mod tests { .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec()) }); - let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator)); + let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator), None); (protocol, temp_dir) } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 949fff6..c9ad10c 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -38,7 +38,7 @@ //! let storage = Arc::new(LmdbStorage::new(config).await?); //! //! // Create protocol handler -//! let protocol = AntProtocol::new(storage, Arc::new(payment_verifier), Arc::new(quote_generator)); +//! let protocol = AntProtocol::new(storage, Arc::new(payment_verifier), Arc::new(quote_generator), None); //! //! // Register with saorsa-core //! listener.register_protocol(protocol).await?; diff --git a/tests/e2e/data_types/chunk.rs b/tests/e2e/data_types/chunk.rs index 3822c41..e6a038d 100644 --- a/tests/e2e/data_types/chunk.rs +++ b/tests/e2e/data_types/chunk.rs @@ -440,6 +440,7 @@ mod tests { evm: EvmVerifierConfig { network }, cache_capacity: 100, local_rewards_address: rewards_address, + local_peer_id: [0x01; 32], }); let metrics_tracker = QuotingMetricsTracker::new(1000, 100); let quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker); @@ -448,6 +449,7 @@ mod tests { Arc::new(storage), Arc::new(payment_verifier), Arc::new(quote_generator), + None, ); Ok((protocol, temp_dir, testnet)) diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index e61c1a2..9468e66 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -1077,6 +1077,7 @@ impl TestNetwork { }, cache_capacity: TEST_PAYMENT_CACHE_CAPACITY, local_rewards_address: rewards_address, + local_peer_id: *identity.peer_id().as_bytes(), }; let payment_verifier = PaymentVerifier::new(payment_config); @@ -1111,6 +1112,7 @@ impl TestNetwork { Arc::new(storage), Arc::new(payment_verifier), Arc::new(quote_generator), + None, )) } @@ -1153,6 +1155,13 @@ impl TestNetwork { // Start protocol handler that routes incoming P2P messages to AntProtocol if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) { + // Inject P2P node into protocol handler for close-group lookups. + protocol.set_p2p_node(Arc::clone(p2p)).map_err(|_| { + TestnetError::Startup(format!( + "P2P node already set on protocol handler for node {}", + node.index, + )) + })?; let mut events = p2p.subscribe_events(); let p2p_clone = Arc::clone(p2p); let protocol_clone = Arc::clone(protocol);