diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 2ebec1d3..ffaec47e 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -53,7 +53,7 @@ enum RetryMessage { pub(crate) struct PendingRequest { pub(crate) attempts: u32, - pub(crate) last_peer: Option, + pub(crate) failed_peers: HashSet, } #[allow(clippy::too_many_arguments)] diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 9e50ea66..3deba876 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use ethlambda_storage::Store; use libp2p::{PeerId, request_response}; use rand::seq::SliceRandom; @@ -198,9 +200,34 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { return false; } - // Select random peer - let peers: Vec<_> = server.connected_peers.iter().copied().collect(); - let peer = match peers.choose(&mut rand::thread_rng()) { + // Exclude peers that already returned empty responses for this root + let failed = server.pending_requests.get(&root).map(|p| &p.failed_peers); + let pool: Vec<_> = if failed.is_none_or(|f| f.is_empty()) { + server.connected_peers.iter().copied().collect() + } else { + let failed = failed.unwrap(); + server + .connected_peers + .iter() + .copied() + .filter(|p| !failed.contains(p)) + .collect() + }; + + // Fall back to full set if all peers have failed (new peers may have connected, + // or previously-failing peers may have caught up). Clear failed_peers so subsequent + // retries start a fresh round of elimination. + let pool = if pool.is_empty() { + warn!(%root, "All peers failed for this block, retrying with full peer set"); + if let Some(pending) = server.pending_requests.get_mut(&root) { + pending.failed_peers.clear(); + } + server.connected_peers.iter().copied().collect() + } else { + pool + }; + + let peer = match pool.choose(&mut rand::thread_rng()) { Some(&p) => p, None => { warn!(%root, "Failed to select random peer"); @@ -216,7 +243,8 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { } let request = BlocksByRootRequest { roots }; - info!(%peer, %root, "Sending BlocksByRoot request for missing block"); + let excluded = server.connected_peers.len() - pool.len(); + info!(%peer, %root, excluded, "Sending BlocksByRoot request for missing block"); let request_id = server .swarm .behaviour_mut() @@ -228,17 +256,14 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { ); // Track the request if not already tracked (new request) - let pending = server + server .pending_requests .entry(root) .or_insert(PendingRequest { attempts: 1, - last_peer: None, + failed_peers: HashSet::new(), }); - // Update last_peer - pending.last_peer = Some(peer); - // Map request_id to root for failure handling server.request_id_map.insert(request_id, root); @@ -250,6 +275,8 @@ async fn handle_fetch_failure(server: &mut P2PServer, root: H256, peer: PeerId) return; }; + pending.failed_peers.insert(peer); + if pending.attempts >= MAX_FETCH_RETRIES { error!(%root, %peer, attempts=%pending.attempts, "Block fetch failed after max retries, giving up");