diff --git a/Cargo.lock b/Cargo.lock index 5dd2b47..42acee2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -853,7 +853,7 @@ dependencies = [ "async-stream", "axum", "bytes", - "evmlib", + "evmlib 0.8.0 (git+https://github.com/WithAutonomi/evmlib?rev=a3be57fcb3bd4982bc93ad0b58116255d509db28)", "flate2", "fs2", "futures", @@ -902,7 +902,7 @@ dependencies = [ [[package]] name = "ant-node" version = "0.10.0" -source = "git+https://github.com/WithAutonomi/ant-node?rev=44358c1b4e35cc9aff5841a746bb2b1fcb6dcbee#44358c1b4e35cc9aff5841a746bb2b1fcb6dcbee" +source = "git+https://github.com/WithAutonomi/ant-node?rev=ba5fe8500c5726cbafa43b3df83c7d77ca0d1827#ba5fe8500c5726cbafa43b3df83c7d77ca0d1827" dependencies = [ "aes-gcm-siv", "blake3", @@ -911,7 +911,7 @@ dependencies = [ "clap", "color-eyre", "directories", - "evmlib", + "evmlib 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "flate2", "fs2", "futures", @@ -2436,6 +2436,26 @@ dependencies = [ "xor_name", ] +[[package]] +name = "evmlib" +version = "0.8.0" +source = "git+https://github.com/WithAutonomi/evmlib?rev=a3be57fcb3bd4982bc93ad0b58116255d509db28#a3be57fcb3bd4982bc93ad0b58116255d509db28" +dependencies = [ + "alloy", + "ant-merkle", + "exponential-backoff", + "hex", + "rand 0.8.5", + "rmp-serde", + "serde", + "serde_with", + "thiserror 1.0.69", + "tiny-keccak", + "tokio", + "tracing", + "xor_name", +] + [[package]] name = "exponential-backoff" version = "2.1.0" @@ -4985,9 +5005,9 @@ dependencies = [ [[package]] name = "saorsa-core" -version = "0.21.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19de0950a0c9c9a3f97681c539e513c52c909866caddef5d825fe2dcbd1b0173" +checksum = "ad51589091bdbfbf8c42b5db37428a1cad0febd0af616ef3fe3a0f38dc921b24" dependencies = [ "anyhow", "async-trait", @@ -5099,9 +5119,9 @@ dependencies = [ [[package]] name = "saorsa-transport" -version = "0.30.1" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbe38ad2be12e4be34e203e79a33eab1664c23a4b65a1ff4b08973ae9af01f01" +checksum = "0e33412e61b0cb630410981a629f967a858b1663be8f71b75eadb66c9588ef26" dependencies = [ "anyhow", "async-trait", diff --git a/ant-core/Cargo.toml b/ant-core/Cargo.toml index a9dbdc9..4f3936e 100644 --- a/ant-core/Cargo.toml +++ b/ant-core/Cargo.toml @@ -27,7 +27,7 @@ zip = "2" tower-http = { version = "0.6.8", features = ["cors"] } # Data operations -evmlib = "0.8" +evmlib = { git = "https://github.com/WithAutonomi/evmlib", rev = "a3be57fcb3bd4982bc93ad0b58116255d509db28" } xor_name = "5" self_encryption = "0.35.0" futures = "0.3" @@ -38,7 +38,7 @@ tracing = "0.1" bytes = "1" lru = "0.16" rand = "0.8" -ant-node = { git = "https://github.com/WithAutonomi/ant-node", rev = "44358c1b4e35cc9aff5841a746bb2b1fcb6dcbee" } +ant-node = { git = "https://github.com/WithAutonomi/ant-node", rev = "ba5fe8500c5726cbafa43b3df83c7d77ca0d1827" } saorsa-pqc = "0.5" tracing-subscriber = { version = "0.3", features = ["env-filter"] } @@ -101,3 +101,7 @@ path = "tests/e2e_huge_file.rs" [[test]] name = "e2e_upload_costs" path = "tests/e2e_upload_costs.rs" + +[[test]] +name = "e2e_close_group_quorum" +path = "tests/e2e_close_group_quorum.rs" diff --git a/ant-core/src/data/client/quote.rs b/ant-core/src/data/client/quote.rs index 045f586..b529b35 100644 --- a/ant-core/src/data/client/quote.rs +++ b/ant-core/src/data/client/quote.rs @@ -14,6 +14,7 @@ use ant_node::{CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE}; use evmlib::common::Amount; use evmlib::PaymentQuote; use futures::stream::{FuturesUnordered, StreamExt}; +use std::collections::HashSet; use std::time::Duration; use tracing::{debug, info, warn}; @@ -124,9 +125,10 @@ impl Client { } match rmp_serde::from_slice::("e) { Ok(payment_quote) => { + let close_group = payment_quote.close_group.clone(); let price = payment_quote.price; debug!("Received quote from {peer_id_clone}: price = {price}"); - Some(Ok((payment_quote, price))) + Some(Ok((payment_quote, price, close_group))) } Err(e) => Some(Err(Error::Serialization(format!( "Failed to deserialize quote from {peer_id_clone}: {e}" @@ -156,6 +158,8 @@ impl Client { // Collect all responses with an overall timeout to prevent indefinite stalls. // Over-query means we have 2x peers, so we can tolerate failures. let mut quotes = Vec::with_capacity(over_query_count); + let mut close_group_views: Vec<(PeerId, Vec<[u8; 32]>)> = + Vec::with_capacity(over_query_count); let mut already_stored_peers: Vec<(PeerId, [u8; 32])> = Vec::new(); let mut failures: Vec = Vec::new(); @@ -163,7 +167,8 @@ impl Client { tokio::time::timeout(overall_timeout, async { while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await { match quote_result { - Ok((quote, price)) => { + Ok((quote, price, close_group)) => { + close_group_views.push((peer_id, close_group)); quotes.push((peer_id, addrs, quote, price)); } Err(Error::AlreadyStored) => { @@ -221,29 +226,499 @@ impl Client { } } - if quotes.len() >= CLOSE_GROUP_SIZE { - let total_responses = quotes.len() + failures.len() + already_stored_peers.len(); + if quotes.len() < CLOSE_GROUP_SIZE { + return Err(Error::InsufficientPeers(format!( + "Got {} quotes, need {CLOSE_GROUP_SIZE}. Failures: [{}]", + quotes.len(), + failures.join("; ") + ))); + } + + // Sort by XOR distance to target, keep the closest CLOSE_GROUP_SIZE. + // We over-queried, so trim to the true close group before validation. + quotes.sort_by(|a, b| { + let dist_a = xor_distance(&a.0, address); + let dist_b = xor_distance(&b.0, address); + dist_a.cmp(&dist_b) + }); + quotes.truncate(CLOSE_GROUP_SIZE); + + // Also trim close_group_views to match the kept quotes. + let kept_peers: HashSet<[u8; 32]> = quotes + .iter() + .map(|(pid, _, _, _)| *pid.as_bytes()) + .collect(); + close_group_views.retain(|(pid, _)| kept_peers.contains(pid.as_bytes())); + + // Validate close-group quorum: each responding peer should recognize + // most of the other queried peers in its own close-group view. + let quorum_indices = Self::validate_close_group_quorum("es, &close_group_views)?; + + // Reorder quotes so quorum members come first. These peers mutually + // recognize each other and are most likely to accept payment proofs, + // so chunk_put_to_close_group should target them as the initial set. + let quorum_set: HashSet = quorum_indices.iter().copied().collect(); + let (quorum, non_quorum): (Vec<_>, Vec<_>) = quotes + .into_iter() + .enumerate() + .partition(|(i, _)| quorum_set.contains(i)); + let reordered: Vec<_> = quorum + .into_iter() + .chain(non_quorum) + .map(|(_, q)| q) + .collect(); + + info!( + "Collected {} quotes for address {} (close-group quorum verified, {} quorum members prioritized)", + reordered.len(), + hex::encode(address), + quorum_indices.len(), + ); + Ok(reordered) + } + + /// Validate close-group quorum by finding the largest subset of queried + /// peers that mutually recognize each other. + /// + /// "Mutual recognition" means: for every pair (P, Q) in the subset, + /// Q appears in P's close-group view. This matches the server-side + /// `CLOSE_GROUP_MAJORITY` threshold that nodes enforce during payment + /// verification. + /// + /// Fails if no mutually-recognizing subset of size `CLOSE_GROUP_MAJORITY` + /// exists. Returns the indices of the quorum members on success so + /// callers can prioritize those peers for uploads. + fn validate_close_group_quorum( + quotes: &[(PeerId, Vec, PaymentQuote, Amount)], + close_group_views: &[(PeerId, Vec<[u8; 32]>)], + ) -> Result> { + let peer_ids: Vec<[u8; 32]> = quotes + .iter() + .map(|(peer_id, _, _, _)| *peer_id.as_bytes()) + .collect(); + + // Build a lookup: peer_bytes → set of peers it recognizes + let views: Vec<([u8; 32], HashSet<[u8; 32]>)> = close_group_views + .iter() + .map(|(peer_id, view)| (*peer_id.as_bytes(), view.iter().copied().collect())) + .collect(); - // Sort by XOR distance to target, keep the closest CLOSE_GROUP_SIZE. - quotes.sort_by(|a, b| { - let dist_a = xor_distance(&a.0, address); - let dist_b = xor_distance(&b.0, address); - dist_a.cmp(&dist_b) - }); - quotes.truncate(CLOSE_GROUP_SIZE); + // Check subsets from largest to smallest (CLOSE_GROUP_SIZE down to + // CLOSE_GROUP_MAJORITY). For CLOSE_GROUP_SIZE=5 this is at most + // C(5,5) + C(5,4) + C(5,3) = 1 + 5 + 10 = 16 checks. + let quorum_indices = Self::find_largest_mutual_subset(&peer_ids, &views); + if quorum_indices.len() >= CLOSE_GROUP_MAJORITY { info!( - "Collected {} quotes for address {} (from {total_responses} responses)", - quotes.len(), - hex::encode(address), + "Close-group quorum passed: {}/{} peers mutually recognize each other", + quorum_indices.len(), + peer_ids.len() ); - return Ok(quotes); + Ok(quorum_indices) + } else { + Err(Error::CloseGroupQuorumFailure(format!( + "Largest mutually-recognizing subset is {} peers (need {CLOSE_GROUP_MAJORITY})", + quorum_indices.len() + ))) + } + } + + /// Find the largest subset of `peer_ids` where every peer in the subset + /// appears in every other peer's close-group view. + /// + /// Returns the indices of the members in the largest mutual subset, + /// or an empty vec if no subset of size `CLOSE_GROUP_MAJORITY` exists. + fn find_largest_mutual_subset( + peer_ids: &[[u8; 32]], + views: &[([u8; 32], HashSet<[u8; 32]>)], + ) -> Vec { + let n = peer_ids.len(); + + // Try subset sizes from largest to smallest. + for size in (CLOSE_GROUP_MAJORITY..=n).rev() { + // Iterate all index combinations of the given size. + let mut indices: Vec = (0..size).collect(); + loop { + if Self::is_mutual_subset(peer_ids, views, &indices) { + return indices; + } + if !Self::next_combination(&mut indices, n) { + break; + } + } + } + + vec![] + } + + /// Check whether the peers at the given indices mutually recognize each other. + fn is_mutual_subset( + peer_ids: &[[u8; 32]], + views: &[([u8; 32], HashSet<[u8; 32]>)], + indices: &[usize], + ) -> bool { + for &i in indices { + // Find this peer's view + let peer_bytes = peer_ids[i]; + let view = views + .iter() + .find(|(id, _)| *id == peer_bytes) + .map(|(_, v)| v); + + let Some(view) = view else { + return false; + }; + + // Every OTHER peer in the subset must appear in this peer's view + for &j in indices { + if i == j { + continue; + } + if !view.contains(&peer_ids[j]) { + return false; + } + } + } + true + } + + /// Advance an index combination to the next one in lexicographic order. + /// Returns false when all combinations have been exhausted. + fn next_combination(indices: &mut [usize], n: usize) -> bool { + let k = indices.len(); + // Find the rightmost index that can be incremented + let mut i = k; + while i > 0 { + i -= 1; + if indices[i] < n - k + i { + indices[i] += 1; + // Reset all subsequent indices + for j in (i + 1)..k { + indices[j] = indices[j - 1] + 1; + } + return true; + } + } + false + } +} + +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used)] +mod tests { + use super::*; + use ant_node::core::{MultiAddr, PeerId}; + use ant_node::CLOSE_GROUP_SIZE; + use evmlib::common::Amount; + use evmlib::{PaymentQuote, RewardsAddress}; + use std::collections::HashSet; + use std::net::{Ipv4Addr, SocketAddr}; + use std::time::SystemTime; + + /// Create a deterministic peer ID from an index byte. + fn peer(id: u8) -> [u8; 32] { + let mut bytes = [0u8; 32]; + bytes[0] = id; + bytes + } + + /// Build a views array where each peer's view contains the given neighbors. + fn make_views(entries: &[([u8; 32], &[[u8; 32]])]) -> Vec<([u8; 32], HashSet<[u8; 32]>)> { + entries + .iter() + .map(|(id, neighbors)| (*id, neighbors.iter().copied().collect())) + .collect() + } + + // ─── next_combination ────────────────────────────────────────────── + + #[test] + fn next_combination_enumerates_all_c_5_3() { + let n = 5; + let k = 3; + let mut indices: Vec = (0..k).collect(); + let mut count = 1; // first combination is [0,1,2] + while Client::next_combination(&mut indices, n) { + count += 1; + } + // C(5,3) = 10 + assert_eq!(count, 10); + } + + #[test] + fn next_combination_enumerates_all_c_5_5() { + let n = 5; + let k = 5; + let mut indices: Vec = (0..k).collect(); + let mut count = 1; + while Client::next_combination(&mut indices, n) { + count += 1; + } + // C(5,5) = 1 + assert_eq!(count, 1); + } + + #[test] + fn next_combination_single_element() { + let n = 5; + let k = 1; + let mut indices: Vec = (0..k).collect(); + let mut count = 1; + while Client::next_combination(&mut indices, n) { + count += 1; } + // C(5,1) = 5 + assert_eq!(count, 5); + } + + #[test] + fn next_combination_empty_returns_false() { + let mut indices: Vec = vec![]; + assert!(!Client::next_combination(&mut indices, 5)); + } + + // ─── is_mutual_subset ────────────────────────────────────────────── + + #[test] + fn is_mutual_subset_two_peers_recognize_each_other() { + let a = peer(1); + let b = peer(2); + let peer_ids = vec![a, b]; + let views = make_views(&[(a, &[b]), (b, &[a])]); + + assert!(Client::is_mutual_subset(&peer_ids, &views, &[0, 1])); + } + + #[test] + fn is_mutual_subset_asymmetric_fails() { + let a = peer(1); + let b = peer(2); + let peer_ids = vec![a, b]; + // A sees B, but B does NOT see A + let views = make_views(&[(a, &[b]), (b, &[])]); + + assert!(!Client::is_mutual_subset(&peer_ids, &views, &[0, 1])); + } + + #[test] + fn is_mutual_subset_missing_view_returns_false() { + let a = peer(1); + let b = peer(2); + let peer_ids = vec![a, b]; + // Only A has a view entry; B is missing entirely + let views = make_views(&[(a, &[b])]); + + assert!(!Client::is_mutual_subset(&peer_ids, &views, &[0, 1])); + } + + // ─── find_largest_mutual_subset ──────────────────────────────────── + + #[test] + fn all_five_peers_mutually_recognize() { + let peers: Vec<[u8; 32]> = (1..=5).map(peer).collect(); + let views = make_views(&[ + (peers[0], &[peers[1], peers[2], peers[3], peers[4]]), + (peers[1], &[peers[0], peers[2], peers[3], peers[4]]), + (peers[2], &[peers[0], peers[1], peers[3], peers[4]]), + (peers[3], &[peers[0], peers[1], peers[2], peers[4]]), + (peers[4], &[peers[0], peers[1], peers[2], peers[3]]), + ]); + + let result = Client::find_largest_mutual_subset(&peers, &views); + assert_eq!(result.len(), 5); + assert_eq!(result, vec![0, 1, 2, 3, 4]); + } + + #[test] + fn three_of_five_mutually_recognize() { + let peers: Vec<[u8; 32]> = (1..=5).map(peer).collect(); + // Peers 0,1,2 see each other; peers 3,4 have empty views + let views = make_views(&[ + (peers[0], &[peers[1], peers[2]]), + (peers[1], &[peers[0], peers[2]]), + (peers[2], &[peers[0], peers[1]]), + (peers[3], &[]), + (peers[4], &[]), + ]); + + let result = Client::find_largest_mutual_subset(&peers, &views); + assert_eq!(result.len(), 3); + assert_eq!(result, vec![0, 1, 2]); + } + + #[test] + fn two_of_five_below_majority() { + let peers: Vec<[u8; 32]> = (1..=5).map(peer).collect(); + // Only peers 0 and 1 see each other + let views = make_views(&[ + (peers[0], &[peers[1]]), + (peers[1], &[peers[0]]), + (peers[2], &[]), + (peers[3], &[]), + (peers[4], &[]), + ]); + + // Largest mutual subset is 2, below CLOSE_GROUP_MAJORITY (3) + assert!(Client::find_largest_mutual_subset(&peers, &views).is_empty()); + } + + #[test] + fn empty_views_returns_zero() { + let peers: Vec<[u8; 32]> = (1..=5).map(peer).collect(); + let views = make_views(&[ + (peers[0], &[]), + (peers[1], &[]), + (peers[2], &[]), + (peers[3], &[]), + (peers[4], &[]), + ]); + + assert!(Client::find_largest_mutual_subset(&peers, &views).is_empty()); + } + + #[test] + fn four_of_five_one_rogue_peer() { + let peers: Vec<[u8; 32]> = (1..=5).map(peer).collect(); + // Peer 4 doesn't recognize anyone, but the other 4 form a clique + let views = make_views(&[ + (peers[0], &[peers[1], peers[2], peers[3]]), + (peers[1], &[peers[0], peers[2], peers[3]]), + (peers[2], &[peers[0], peers[1], peers[3]]), + (peers[3], &[peers[0], peers[1], peers[2]]), + (peers[4], &[]), + ]); + + let result = Client::find_largest_mutual_subset(&peers, &views); + assert_eq!(result.len(), 4); + assert_eq!(result, vec![0, 1, 2, 3]); + } + + #[test] + fn asymmetric_recognition_reduces_clique() { + let peers: Vec<[u8; 32]> = (1..=5).map(peer).collect(); + // All see each other except: peer 3 does NOT see peer 0 + let views = make_views(&[ + (peers[0], &[peers[1], peers[2], peers[3], peers[4]]), + (peers[1], &[peers[0], peers[2], peers[3], peers[4]]), + (peers[2], &[peers[0], peers[1], peers[3], peers[4]]), + (peers[3], &[peers[1], peers[2], peers[4]]), // missing peers[0] + (peers[4], &[peers[0], peers[1], peers[2], peers[3]]), + ]); + + // {0,1,2,3,4} fails (3 doesn't see 0). Both {0,1,2,4} and {1,2,3,4} + // are valid 4-peer cliques; the algorithm returns the first found in + // lexicographic order. + let result = Client::find_largest_mutual_subset(&peers, &views); + assert_eq!(result.len(), 4); + assert_eq!(result, vec![0, 1, 2, 4]); + } + + // ─── validate_close_group_quorum (integration) ───────────────────── + + fn make_test_quote() -> PaymentQuote { + PaymentQuote { + content: xor_name::XorName([0u8; 32]), + timestamp: SystemTime::now(), + price: Amount::ZERO, + rewards_address: RewardsAddress::new([0u8; 20]), + close_group: vec![], + pub_key: vec![], + signature: vec![], + } + } + + fn make_dummy_addr() -> Vec { + let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 12345); + vec![MultiAddr::quic(addr)] + } + + type Quotes = Vec<(PeerId, Vec, PaymentQuote, Amount)>; + type CloseGroupViews = Vec<(PeerId, Vec<[u8; 32]>)>; + + fn make_quotes_and_views( + peer_ids: &[PeerId], + neighbor_map: &[Vec<[u8; 32]>], + ) -> (Quotes, CloseGroupViews) { + let quotes: Quotes = peer_ids + .iter() + .map(|pid| (*pid, make_dummy_addr(), make_test_quote(), Amount::ZERO)) + .collect(); + + let views: CloseGroupViews = peer_ids + .iter() + .zip(neighbor_map.iter()) + .map(|(pid, neighbors)| (*pid, neighbors.clone())) + .collect(); + + (quotes, views) + } + + #[test] + fn validate_quorum_all_mutual_passes() { + let peer_ids: Vec = (1..=CLOSE_GROUP_SIZE) + .map(|i| PeerId::from_bytes(peer(i as u8))) + .collect(); + + let neighbor_map: Vec> = peer_ids + .iter() + .map(|me| { + peer_ids + .iter() + .filter(|p| p != &me) + .map(|p| *p.as_bytes()) + .collect() + }) + .collect(); + + let (quotes, views) = make_quotes_and_views(&peer_ids, &neighbor_map); + let indices = Client::validate_close_group_quorum("es, &views).unwrap(); + assert_eq!(indices.len(), CLOSE_GROUP_SIZE); + } + + #[test] + fn validate_quorum_empty_views_fails() { + let peer_ids: Vec = (1..=CLOSE_GROUP_SIZE) + .map(|i| PeerId::from_bytes(peer(i as u8))) + .collect(); + + let neighbor_map: Vec> = vec![vec![]; CLOSE_GROUP_SIZE]; + + let (quotes, views) = make_quotes_and_views(&peer_ids, &neighbor_map); + let result = Client::validate_close_group_quorum("es, &views); + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + Error::CloseGroupQuorumFailure(_) + )); + } + + #[test] + fn validate_quorum_exactly_majority_passes() { + let peer_ids: Vec = (1..=CLOSE_GROUP_SIZE) + .map(|i| PeerId::from_bytes(peer(i as u8))) + .collect(); + + // Only first CLOSE_GROUP_MAJORITY peers see each other + let majority = &peer_ids[..CLOSE_GROUP_MAJORITY]; + let neighbor_map: Vec> = peer_ids + .iter() + .map(|me| { + if majority.contains(me) { + majority + .iter() + .filter(|p| p != &me) + .map(|p| *p.as_bytes()) + .collect() + } else { + vec![] + } + }) + .collect(); - Err(Error::InsufficientPeers(format!( - "Got {} quotes, need {CLOSE_GROUP_SIZE}. Failures: [{}]", - quotes.len(), - failures.join("; ") - ))) + let (quotes, views) = make_quotes_and_views(&peer_ids, &neighbor_map); + let indices = Client::validate_close_group_quorum("es, &views).unwrap(); + assert_eq!(indices.len(), CLOSE_GROUP_MAJORITY); + // Only the first 3 peers (indices 0,1,2) form the quorum + assert_eq!(indices, vec![0, 1, 2]); } } diff --git a/ant-core/src/data/error.rs b/ant-core/src/data/error.rs index e571fa5..9c23bd2 100644 --- a/ant-core/src/data/error.rs +++ b/ant-core/src/data/error.rs @@ -84,6 +84,12 @@ pub enum Error { /// Root cause description. reason: String, }, + + /// Close group quorum check failed — the queried peers do not mutually + /// recognize each other as close group members, so payment would not + /// result in durable storage. + #[error("close group quorum failure: {0}")] + CloseGroupQuorumFailure(String), } impl From for Error { diff --git a/ant-core/tests/e2e_close_group_quorum.rs b/ant-core/tests/e2e_close_group_quorum.rs new file mode 100644 index 0000000..676ed2d --- /dev/null +++ b/ant-core/tests/e2e_close_group_quorum.rs @@ -0,0 +1,279 @@ +//! E2E tests for close group quorum validation with signed quotes. +//! +//! Verifies the full flow: quotes contain a signed close_group field, +//! the client validates quorum using those views, payment succeeds, +//! and nodes accept the proof (including close group membership checks). + +#![allow(clippy::unwrap_used, clippy::expect_used)] + +mod support; + +use ant_core::data::{compute_address, Client, ClientConfig}; +use ant_node::{CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE}; +use bytes::Bytes; +use evmlib::PaymentQuote; +use serial_test::serial; +use std::collections::HashSet; +use std::sync::Arc; +use support::MiniTestnet; + +const CHUNK_DATA_TYPE: u32 = 0; +/// More nodes than DEFAULT_NODE_COUNT to tolerate the 2x over-query. +/// With only CLOSE_GROUP_SIZE+1 nodes, a single peer timeout causes +/// failure since there are zero spare peers. +const QUORUM_TEST_NODE_COUNT: usize = 10; + +async fn setup() -> (Client, MiniTestnet) { + let testnet = MiniTestnet::start(QUORUM_TEST_NODE_COUNT).await; + let node = testnet.node(3).expect("Node 3 should exist"); + let client = Client::from_node(Arc::clone(&node), ClientConfig::default()) + .with_wallet(testnet.wallet().clone()); + (client, testnet) +} + +// ─── Test 1: Quotes contain non-empty signed close_group ─────────────────── + +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn test_quotes_contain_signed_close_group() { + let (client, testnet) = setup().await; + + let content = Bytes::from("close group quorum test payload"); + let address = compute_address(&content); + let data_size = content.len() as u64; + + let quotes = client + .get_store_quotes(&address, data_size, CHUNK_DATA_TYPE) + .await + .expect("get_store_quotes should succeed"); + + assert_eq!( + quotes.len(), + CLOSE_GROUP_SIZE, + "should get exactly CLOSE_GROUP_SIZE quotes" + ); + + // Every quote must have a non-empty close_group field + for (peer_id, _addrs, quote, _price) in "es { + assert!( + !quote.close_group.is_empty(), + "Quote from {peer_id} should have non-empty close_group" + ); + } + + drop(client); + testnet.teardown().await; +} + +// ─── Test 2: Close group views have mutual recognition (quorum) ──────────── + +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn test_close_group_mutual_recognition() { + let (client, testnet) = setup().await; + + let content = Bytes::from("mutual recognition quorum test"); + let address = compute_address(&content); + let data_size = content.len() as u64; + + let quotes = client + .get_store_quotes(&address, data_size, CHUNK_DATA_TYPE) + .await + .expect("get_store_quotes should succeed"); + + // Build peer_id -> close_group_view map + let peer_ids: Vec<[u8; 32]> = quotes + .iter() + .map(|(pid, _, _, _)| *pid.as_bytes()) + .collect(); + + let views: Vec> = quotes + .iter() + .map(|(_, _, quote, _)| quote.close_group.iter().copied().collect()) + .collect(); + + // Count how many peers mutually recognize each other. + // For each pair (i, j), both i's view must contain j AND j's view must contain i. + let mut mutual_pairs = 0usize; + for i in 0..peer_ids.len() { + for j in (i + 1)..peer_ids.len() { + if views[i].contains(&peer_ids[j]) && views[j].contains(&peer_ids[i]) { + mutual_pairs += 1; + } + } + } + + // A quorum of CLOSE_GROUP_MAJORITY mutually-recognizing peers requires + // at least C(CLOSE_GROUP_MAJORITY, 2) mutual pairs. + let min_pairs = CLOSE_GROUP_MAJORITY * (CLOSE_GROUP_MAJORITY - 1) / 2; + assert!( + mutual_pairs >= min_pairs, + "Expected at least {min_pairs} mutual recognition pairs, got {mutual_pairs}" + ); + + drop(client); + testnet.teardown().await; +} + +// ─── Test 3: Quorum members are prioritized (appear first in quotes) ─────── + +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn test_quorum_members_prioritized_in_quotes() { + let (client, testnet) = setup().await; + + let content = Bytes::from("quorum prioritization test"); + let address = compute_address(&content); + let data_size = content.len() as u64; + + let quotes = client + .get_store_quotes(&address, data_size, CHUNK_DATA_TYPE) + .await + .expect("get_store_quotes should succeed"); + + // The first CLOSE_GROUP_MAJORITY peers should form a mutual clique + // (get_store_quotes reorders so quorum members come first). + let first_majority: Vec<[u8; 32]> = quotes[..CLOSE_GROUP_MAJORITY] + .iter() + .map(|(pid, _, _, _)| *pid.as_bytes()) + .collect(); + + let first_views: Vec> = quotes[..CLOSE_GROUP_MAJORITY] + .iter() + .map(|(_, _, quote, _)| quote.close_group.iter().copied().collect()) + .collect(); + + // Verify mutual recognition within the first CLOSE_GROUP_MAJORITY peers + for (i, peer_i) in first_majority.iter().enumerate() { + for (j, peer_j) in first_majority.iter().enumerate() { + if i == j { + continue; + } + assert!( + first_views[i].contains(peer_j), + "Quorum member {} should recognize quorum member {} in its close group view", + hex::encode(peer_i), + hex::encode(peer_j), + ); + } + } + + drop(client); + testnet.teardown().await; +} + +// ─── Test 4: Full payment flow with quorum validation ────────────────────── + +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn test_paid_upload_with_close_group_quorum() { + let (client, testnet) = setup().await; + + let content = Bytes::from("full quorum payment flow test"); + let address = client + .chunk_put(content.clone()) + .await + .expect("chunk_put should succeed with quorum validation"); + + let expected_address = compute_address(&content); + assert_eq!(address, expected_address); + + // Verify the chunk is retrievable + let retrieved = client + .chunk_get(&address) + .await + .expect("chunk_get should succeed"); + let chunk = retrieved.expect("Chunk should be found after quorum-validated upload"); + assert_eq!(chunk.content.as_ref(), content.as_ref()); + + drop(client); + testnet.teardown().await; +} + +// ─── Test 5: Quote close_group is bound by signature ─────────────────────── + +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn test_quote_close_group_is_signature_bound() { + let (client, testnet) = setup().await; + + let content = Bytes::from("signature binding test payload"); + let address = compute_address(&content); + let data_size = content.len() as u64; + + let quotes = client + .get_store_quotes(&address, data_size, CHUNK_DATA_TYPE) + .await + .expect("get_store_quotes should succeed"); + + // Pick the first quote and verify its signature covers the close_group + let (_peer_id, _addrs, quote, _price) = "es[0]; + + // Reconstruct signing bytes and verify they include close_group + let signing_bytes = PaymentQuote::bytes_for_signing( + quote.content, + quote.timestamp, + "e.price, + "e.rewards_address, + "e.close_group, + ); + + // The self-computed signing bytes should match + let self_bytes = quote.bytes_for_sig(); + assert_eq!( + signing_bytes, self_bytes, + "bytes_for_signing with close_group should match bytes_for_sig" + ); + + // Verify that changing the close_group produces different signing bytes + let tampered_bytes = PaymentQuote::bytes_for_signing( + quote.content, + quote.timestamp, + "e.price, + "e.rewards_address, + &[[0xFFu8; 32]], // fake close group + ); + assert_ne!( + signing_bytes, tampered_bytes, + "Tampered close_group should produce different signing bytes" + ); + + drop(client); + testnet.teardown().await; +} + +// ─── Test 6: Multiple uploads reuse quorum validation ────────────────────── + +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn test_multiple_uploads_with_quorum() { + let (client, testnet) = setup().await; + + let payloads = [ + Bytes::from("quorum multi-upload chunk A"), + Bytes::from("quorum multi-upload chunk B"), + Bytes::from("quorum multi-upload chunk C"), + ]; + + let mut addresses = Vec::new(); + for payload in &payloads { + let addr = client + .chunk_put(payload.clone()) + .await + .expect("chunk_put should succeed"); + addresses.push(addr); + } + + // Verify all chunks are retrievable + for (addr, payload) in addresses.iter().zip(payloads.iter()) { + let chunk = client + .chunk_get(addr) + .await + .expect("chunk_get should succeed") + .expect("chunk should exist"); + assert_eq!(chunk.content.as_ref(), payload.as_ref()); + } + + drop(client); + testnet.teardown().await; +} diff --git a/ant-core/tests/support/mod.rs b/ant-core/tests/support/mod.rs index 166a130..f3676f7 100644 --- a/ant-core/tests/support/mod.rs +++ b/ant-core/tests/support/mod.rs @@ -246,6 +246,7 @@ impl MiniTestnet { }, cache_capacity: 1000, local_rewards_address: rewards_address, + local_peer_id: *identity.peer_id().as_bytes(), }; let payment_verifier = Arc::new(PaymentVerifier::new(payment_config)); let metrics_tracker = QuotingMetricsTracker::new(TEST_MAX_RECORDS); @@ -271,6 +272,7 @@ impl MiniTestnet { storage, payment_verifier, Arc::new(quote_generator), + Some(Arc::clone(&node)), )); // Start message handler loop