From 38b967481b05d26349e242a34173f8e07a98fa7c Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Fri, 6 Mar 2026 15:10:51 -0300 Subject: [PATCH] Convert P2P layer from manual tokio::select! event loop to spawned-concurrency actor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split the monolithic P2PServer into two components for better separation of concerns and improved swarm liveness: - SwarmDriver (plain tokio task): Owns the libp2p Swarm, polls it continuously, decodes gossip/req-resp events, and forwards typed protocol messages to the actor. Executes SwarmCommands (publish, dial, send_request, send_response) from the actor. - P2PServer (spawned actor): All P2P business logic — peer tracking, request correlation, retries via send_after, gossip publishing. Receives decoded events from SwarmDriver and P2PMessages from BlockChain via a bridge task. This ensures the swarm continues to be polled even while the actor is busy processing messages, matching the actor pattern already used by BlockChain. 
--- Cargo.lock | 1 + bin/ethlambda/src/main.rs | 29 +- crates/net/p2p/Cargo.toml | 1 + crates/net/p2p/src/gossipsub/handler.rs | 206 ----- crates/net/p2p/src/gossipsub/mod.rs | 6 +- crates/net/p2p/src/lib.rs | 1051 +++++++++++++++++------ crates/net/p2p/src/req_resp/handlers.rs | 253 +----- crates/net/p2p/src/req_resp/mod.rs | 4 +- crates/net/p2p/src/swarm_driver.rs | 340 ++++++++ 9 files changed, 1142 insertions(+), 749 deletions(-) delete mode 100644 crates/net/p2p/src/gossipsub/handler.rs create mode 100644 crates/net/p2p/src/swarm_driver.rs diff --git a/Cargo.lock b/Cargo.lock index 85d759f8..071e13c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2112,6 +2112,7 @@ dependencies = [ "rand 0.8.5", "sha2", "snap", + "spawned-concurrency 0.4.5 (git+https://github.com/lambdaclass/spawned.git?tag=v0.5.0-rc)", "ssz_types", "tokio", "tracing", diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 984b20f2..fc181cf8 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -18,6 +18,7 @@ use std::{ }; use clap::Parser; +use ethlambda_blockchain::P2PMessage; use ethlambda_p2p::{Bootnode, parse_enrs, start_p2p}; use ethlambda_types::primitives::H256; use ethlambda_types::{ @@ -132,23 +133,39 @@ async fn main() -> eyre::Result<()> { .await .inspect_err(|err| error!(%err, "Failed to initialize state"))?; - let (p2p_tx, p2p_rx) = tokio::sync::mpsc::unbounded_channel(); + let (p2p_tx, mut p2p_rx) = tokio::sync::mpsc::unbounded_channel(); // Use first validator ID for subnet subscription let first_validator_id = validator_keys.keys().min().copied(); let blockchain = BlockChain::spawn(store.clone(), p2p_tx, validator_keys, options.is_aggregator); - let p2p_handle = tokio::spawn(start_p2p( + let (p2p, driver_handle) = start_p2p( node_p2p_key, bootnodes, p2p_socket, blockchain, - p2p_rx, store.clone(), first_validator_id, options.attestation_committee_count, options.is_aggregator, - )); + ) + .await + .expect("Failed to start P2P"); + + // Bridge: 
forward P2PMessages from blockchain to the P2P actor + let p2p_bridge = p2p.clone(); + tokio::spawn(async move { + while let Some(msg) = p2p_rx.recv().await { + match msg { + P2PMessage::PublishAttestation(a) => p2p_bridge.publish_attestation(a), + P2PMessage::PublishBlock(b) => p2p_bridge.publish_block(b), + P2PMessage::PublishAggregatedAttestation(a) => { + p2p_bridge.publish_aggregated_attestation(a) + } + P2PMessage::FetchBlock(root) => p2p_bridge.fetch_block(root), + } + } + }); ethlambda_rpc::start_rpc_server(metrics_socket, store) .await @@ -157,8 +174,8 @@ async fn main() -> eyre::Result<()> { info!("Node initialized"); tokio::select! { - result = p2p_handle => { - panic!("P2P node task has exited unexpectedly: {result:?}"); + result = driver_handle => { + panic!("P2P SwarmDriver has exited unexpectedly: {result:?}"); } _ = tokio::signal::ctrl_c() => { // Ctrl-C received, shutting down diff --git a/crates/net/p2p/Cargo.toml b/crates/net/p2p/Cargo.toml index 9c26e343..d5063bd9 100644 --- a/crates/net/p2p/Cargo.toml +++ b/crates/net/p2p/Cargo.toml @@ -16,6 +16,7 @@ ethlambda-metrics.workspace = true ethlambda-types.workspace = true async-trait = "0.1" +spawned-concurrency.workspace = true # Fork with request-response feature for outbound protocol selection libp2p = { git = "https://github.com/lambdaclass/rust-libp2p.git", rev = "cd6cc3b1e5db2c5e23e133c2201c23b063fc4895", features = [ diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs deleted file mode 100644 index 8f156a22..00000000 --- a/crates/net/p2p/src/gossipsub/handler.rs +++ /dev/null @@ -1,206 +0,0 @@ -use ethlambda_types::{ - ShortRoot, - attestation::{SignedAggregatedAttestation, SignedAttestation}, - block::SignedBlockWithAttestation, - primitives::ssz::{Decode, Encode, TreeHash}, -}; -use libp2p::gossipsub::Event; -use tracing::{error, info, trace}; - -use super::{ - encoding::{compress_message, decompress_message}, - messages::{AGGREGATION_TOPIC_KIND, 
ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND}, -}; -use crate::P2PServer; - -pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { - let Event::Message { - propagation_source: _, - message_id: _, - message, - } = event - else { - unreachable!("we already matched on event_loop"); - }; - let topic_kind = message.topic.as_str().split("/").nth(3); - match topic_kind { - Some(BLOCK_TOPIC_KIND) => { - let Ok(uncompressed_data) = decompress_message(&message.data) - .inspect_err(|err| error!(%err, "Failed to decompress gossipped block")) - else { - return; - }; - - let Ok(signed_block) = SignedBlockWithAttestation::from_ssz_bytes(&uncompressed_data) - .inspect_err(|err| error!(?err, "Failed to decode gossipped block")) - else { - return; - }; - let slot = signed_block.message.block.slot; - let block_root = signed_block.message.block.tree_hash_root(); - let proposer = signed_block.message.block.proposer_index; - let parent_root = signed_block.message.block.parent_root; - let attestation_count = signed_block.message.block.body.attestations.len(); - info!( - %slot, - proposer, - block_root = %ShortRoot(&block_root.0), - parent_root = %ShortRoot(&parent_root.0), - attestation_count, - "Received block from gossip" - ); - server.blockchain.notify_new_block(signed_block); - } - Some(AGGREGATION_TOPIC_KIND) => { - let Ok(uncompressed_data) = decompress_message(&message.data) - .inspect_err(|err| error!(%err, "Failed to decompress gossipped aggregation")) - else { - return; - }; - - let Ok(aggregation) = SignedAggregatedAttestation::from_ssz_bytes(&uncompressed_data) - .inspect_err(|err| error!(?err, "Failed to decode gossipped aggregation")) - else { - return; - }; - let slot = aggregation.data.slot; - info!( - %slot, - target_slot = aggregation.data.target.slot, - target_root = %ShortRoot(&aggregation.data.target.root.0), - source_slot = aggregation.data.source.slot, - source_root = %ShortRoot(&aggregation.data.source.root.0), - "Received aggregated 
attestation from gossip" - ); - server - .blockchain - .notify_new_aggregated_attestation(aggregation); - } - Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => { - let Ok(uncompressed_data) = decompress_message(&message.data) - .inspect_err(|err| error!(%err, "Failed to decompress gossipped attestation")) - else { - return; - }; - - let Ok(signed_attestation) = SignedAttestation::from_ssz_bytes(&uncompressed_data) - .inspect_err(|err| error!(?err, "Failed to decode gossipped attestation")) - else { - return; - }; - let slot = signed_attestation.data.slot; - let validator = signed_attestation.validator_id; - info!( - %slot, - validator, - head_root = %ShortRoot(&signed_attestation.data.head.root.0), - target_slot = signed_attestation.data.target.slot, - target_root = %ShortRoot(&signed_attestation.data.target.root.0), - source_slot = signed_attestation.data.source.slot, - source_root = %ShortRoot(&signed_attestation.data.source.root.0), - "Received attestation from gossip" - ); - server.blockchain.notify_new_attestation(signed_attestation); - } - _ => { - trace!("Received message on unknown topic: {}", message.topic); - } - } -} - -pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAttestation) { - let slot = attestation.data.slot; - let validator = attestation.validator_id; - - // Encode to SSZ - let ssz_bytes = attestation.as_ssz_bytes(); - - // Compress with raw snappy - let compressed = compress_message(&ssz_bytes); - - // Publish to the attestation subnet topic - let _ = server - .swarm - .behaviour_mut() - .gossipsub - .publish(server.attestation_topic.clone(), compressed) - .inspect(|_| info!( - %slot, - validator, - target_slot = attestation.data.target.slot, - target_root = %ShortRoot(&attestation.data.target.root.0), - source_slot = attestation.data.source.slot, - source_root = %ShortRoot(&attestation.data.source.root.0), - "Published attestation to gossipsub" - )) - .inspect_err(|err| { - tracing::warn!(%slot, 
%validator, %err, "Failed to publish attestation to gossipsub") - }); -} - -pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlockWithAttestation) { - let slot = signed_block.message.block.slot; - let proposer = signed_block.message.block.proposer_index; - let block_root = signed_block.message.block.tree_hash_root(); - let parent_root = signed_block.message.block.parent_root; - let attestation_count = signed_block.message.block.body.attestations.len(); - - // Encode to SSZ - let ssz_bytes = signed_block.as_ssz_bytes(); - - // Compress with raw snappy - let compressed = compress_message(&ssz_bytes); - - // Publish to gossipsub - let _ = server - .swarm - .behaviour_mut() - .gossipsub - .publish(server.block_topic.clone(), compressed) - .inspect(|_| { - info!( - %slot, - proposer, - block_root = %ShortRoot(&block_root.0), - parent_root = %ShortRoot(&parent_root.0), - attestation_count, - "Published block to gossipsub" - ) - }) - .inspect_err( - |err| tracing::warn!(%slot, %proposer, %err, "Failed to publish block to gossipsub"), - ); -} - -pub async fn publish_aggregated_attestation( - server: &mut P2PServer, - attestation: SignedAggregatedAttestation, -) { - let slot = attestation.data.slot; - - // Encode to SSZ - let ssz_bytes = attestation.as_ssz_bytes(); - - // Compress with raw snappy - let compressed = compress_message(&ssz_bytes); - - // Publish to the aggregation topic - let _ = server - .swarm - .behaviour_mut() - .gossipsub - .publish(server.aggregation_topic.clone(), compressed) - .inspect(|_| { - info!( - %slot, - target_slot = attestation.data.target.slot, - target_root = %ShortRoot(&attestation.data.target.root.0), - source_slot = attestation.data.source.slot, - source_root = %ShortRoot(&attestation.data.source.root.0), - "Published aggregated attestation to gossipsub" - ) - }) - .inspect_err(|err| { - tracing::warn!(%slot, %err, "Failed to publish aggregated attestation to gossipsub") - }); -} diff --git 
a/crates/net/p2p/src/gossipsub/mod.rs b/crates/net/p2p/src/gossipsub/mod.rs index a6924286..2e807e0e 100644 --- a/crates/net/p2p/src/gossipsub/mod.rs +++ b/crates/net/p2p/src/gossipsub/mod.rs @@ -1,8 +1,4 @@ -mod encoding; -mod handler; +pub(crate) mod encoding; mod messages; -pub use handler::{ - handle_gossipsub_message, publish_aggregated_attestation, publish_attestation, publish_block, -}; pub use messages::{AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND}; diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 2ebec1d3..00fe5fca 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -1,42 +1,58 @@ use std::{ collections::{HashMap, HashSet}, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, + sync::{Arc, Mutex}, time::Duration, }; -use ethlambda_blockchain::{BlockChain, P2PMessage}; +use ethlambda_blockchain::BlockChain; use ethlambda_storage::Store; -use ethlambda_types::primitives::H256; +use ethlambda_types::{ + ShortRoot, + attestation::{SignedAggregatedAttestation, SignedAttestation}, + block::SignedBlockWithAttestation, + primitives::{ + H256, + ssz::{Encode, TreeHash}, + }, +}; use ethrex_common::H264; use ethrex_p2p::types::NodeRecord; use ethrex_rlp::decode::RLPDecode; use libp2p::{ Multiaddr, PeerId, StreamProtocol, - futures::StreamExt, - gossipsub::{MessageAuthenticity, ValidationMode}, + gossipsub::{self as libp2p_gossipsub, MessageAuthenticity, ValidationMode}, identity::{PublicKey, secp256k1}, multiaddr::Protocol, - request_response::{self, OutboundRequestId}, - swarm::{NetworkBehaviour, SwarmEvent}, + request_response, + swarm::NetworkBehaviour, }; +use rand::seq::SliceRandom; use sha2::Digest; +use spawned_concurrency::actor; +use spawned_concurrency::error::ActorError; +use spawned_concurrency::protocol; +use spawned_concurrency::tasks::{Actor, ActorRef, ActorStart, Context, Handler, send_after}; use tokio::sync::mpsc; -use tracing::{info, trace, warn}; +use tracing::{error, info, 
trace, warn}; use crate::{ gossipsub::{ AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND, - publish_aggregated_attestation, publish_attestation, publish_block, + encoding::compress_message, }, req_resp::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, MAX_COMPRESSED_PAYLOAD_SIZE, Request, - STATUS_PROTOCOL_V1, build_status, fetch_block_from_peer, + BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Codec, MAX_COMPRESSED_PAYLOAD_SIZE, + Request, RequestedBlockRoots, Response, ResponsePayload, STATUS_PROTOCOL_V1, Status, + build_status, }, + swarm_driver::{SwarmCommand, SwarmDriver}, }; mod gossipsub; pub mod metrics; mod req_resp; +mod swarm_driver; pub use metrics::populate_name_registry; @@ -46,14 +62,741 @@ const INITIAL_BACKOFF_MS: u64 = 5; const BACKOFF_MULTIPLIER: u64 = 2; const PEER_REDIAL_INTERVAL_SECS: u64 = 12; -enum RetryMessage { - BlockFetch(H256), - PeerRedial(PeerId), -} - pub(crate) struct PendingRequest { pub(crate) attempts: u32, - pub(crate) last_peer: Option, +} + +/// Wrapper for ResponseChannel that implements Clone via Arc>>. +/// +/// The spawned-concurrency `#[protocol]` macro derives Clone on all generated +/// message structs, but libp2p's ResponseChannel is not Clone. This wrapper +/// allows the channel to be sent through the actor's mailbox. The inner Option +/// is taken when the response is sent, ensuring single-use semantics. 
+#[derive(Clone)] +pub(crate) struct ResponseChannelWrapper( + Arc>>>, +); + +impl ResponseChannelWrapper { + pub(crate) fn new(channel: request_response::ResponseChannel) -> Self { + Self(Arc::new(Mutex::new(Some(channel)))) + } + + pub(crate) fn take(&self) -> Option> { + self.0.lock().unwrap().take() + } +} + +// --- P2P Protocol --- + +#[protocol] +pub(crate) trait P2pProtocol: Send + Sync { + // From SwarmDriver — gossip + fn on_gossip_block(&self, block: SignedBlockWithAttestation) -> Result<(), ActorError>; + fn on_gossip_attestation(&self, attestation: SignedAttestation) -> Result<(), ActorError>; + fn on_gossip_aggregated_attestation( + &self, + attestation: SignedAggregatedAttestation, + ) -> Result<(), ActorError>; + + // From SwarmDriver — req/resp + fn on_status_request( + &self, + status: Status, + channel: ResponseChannelWrapper, + peer: PeerId, + ) -> Result<(), ActorError>; + fn on_blocks_by_root_request( + &self, + request: BlocksByRootRequest, + channel: ResponseChannelWrapper, + peer: PeerId, + ) -> Result<(), ActorError>; + fn on_status_response(&self, status: Status, peer: PeerId) -> Result<(), ActorError>; + fn on_blocks_by_root_response( + &self, + blocks: Vec, + peer: PeerId, + correlation_id: u64, + ) -> Result<(), ActorError>; + fn on_req_resp_failure( + &self, + peer: PeerId, + correlation_id: u64, + error: String, + ) -> Result<(), ActorError>; + + // From SwarmDriver — connections + fn on_peer_connected( + &self, + peer_id: PeerId, + direction: String, + first_connection: bool, + ) -> Result<(), ActorError>; + fn on_peer_disconnected( + &self, + peer_id: PeerId, + direction: String, + reason: String, + last_connection: bool, + ) -> Result<(), ActorError>; + fn on_outgoing_connection_error( + &self, + peer_id: Option, + error: String, + ) -> Result<(), ActorError>; + + // From BlockChain (via P2PMessage bridge) + fn publish_attestation(&self, attestation: SignedAttestation) -> Result<(), ActorError>; + fn publish_block(&self, 
signed_block: SignedBlockWithAttestation) -> Result<(), ActorError>; + fn publish_aggregated_attestation( + &self, + attestation: SignedAggregatedAttestation, + ) -> Result<(), ActorError>; + fn fetch_block(&self, root: H256) -> Result<(), ActorError>; + + // Self-scheduled retries + #[allow(dead_code)] + fn retry_block_fetch(&self, root: H256) -> Result<(), ActorError>; + #[allow(dead_code)] + fn retry_peer_redial(&self, peer_id: PeerId) -> Result<(), ActorError>; +} + +// --- P2PServer (actor state) --- + +pub(crate) struct P2PServer { + swarm_tx: mpsc::UnboundedSender, + blockchain: BlockChain, + store: Store, + attestation_topic: libp2p_gossipsub::IdentTopic, + block_topic: libp2p_gossipsub::IdentTopic, + aggregation_topic: libp2p_gossipsub::IdentTopic, + connected_peers: HashSet, + pending_requests: HashMap, + /// Maps correlation IDs to block roots for tracking outbound BlocksByRoot requests. + correlation_id_map: HashMap, + next_correlation_id: u64, + /// Bootnode addresses for redialing when disconnected. + bootnode_addrs: HashMap, +} + +impl P2PServer { + fn next_correlation_id(&mut self) -> u64 { + let id = self.next_correlation_id; + self.next_correlation_id += 1; + id + } + + /// Fetch a missing block from a random connected peer. 
+ fn fetch_block_from_peer(&mut self, root: H256) -> bool { + if self.connected_peers.is_empty() { + warn!(%root, "Cannot fetch block: no connected peers"); + return false; + } + + let peers: Vec<_> = self.connected_peers.iter().copied().collect(); + let peer = match peers.choose(&mut rand::thread_rng()) { + Some(&p) => p, + None => { + warn!(%root, "Failed to select random peer"); + return false; + } + }; + + let mut roots = RequestedBlockRoots::empty(); + if let Err(err) = roots.push(root) { + error!(%root, ?err, "Failed to create BlocksByRoot request"); + return false; + } + let request = BlocksByRootRequest { roots }; + + info!(%peer, %root, "Sending BlocksByRoot request for missing block"); + + let correlation_id = self.next_correlation_id(); + let _ = self.swarm_tx.send(SwarmCommand::SendRequest { + correlation_id, + peer_id: peer, + request: Request::BlocksByRoot(request), + protocol: StreamProtocol::new(BLOCKS_BY_ROOT_PROTOCOL_V1), + }); + + self.pending_requests + .entry(root) + .or_insert(PendingRequest { attempts: 1 }); + + self.correlation_id_map.insert(correlation_id, root); + + true + } + + /// Handle a fetch failure by scheduling a retry with exponential backoff. 
+ fn handle_fetch_failure(&mut self, root: H256, peer: PeerId, ctx: &Context) { + let Some(pending) = self.pending_requests.get_mut(&root) else { + return; + }; + + if pending.attempts >= MAX_FETCH_RETRIES { + error!( + %root, %peer, attempts = %pending.attempts, + "Block fetch failed after max retries, giving up" + ); + self.pending_requests.remove(&root); + return; + } + + let backoff_ms = INITIAL_BACKOFF_MS * BACKOFF_MULTIPLIER.pow(pending.attempts - 1); + let backoff = Duration::from_millis(backoff_ms); + + warn!( + %root, %peer, attempts = %pending.attempts, ?backoff, + "Block fetch failed, scheduling retry" + ); + + pending.attempts += 1; + + send_after(backoff, ctx.clone(), p2p_protocol::RetryBlockFetch { root }); + } +} + +// --- Actor implementation --- + +#[actor(protocol = P2pProtocol)] +impl P2PServer { + // --- Gossip handlers --- + + #[send_handler] + async fn handle_on_gossip_block( + &mut self, + msg: p2p_protocol::OnGossipBlock, + _ctx: &Context, + ) { + let signed_block = msg.block; + let slot = signed_block.message.block.slot; + let block_root = signed_block.message.block.tree_hash_root(); + let proposer = signed_block.message.block.proposer_index; + let parent_root = signed_block.message.block.parent_root; + let attestation_count = signed_block.message.block.body.attestations.len(); + info!( + %slot, + proposer, + block_root = %ShortRoot(&block_root.0), + parent_root = %ShortRoot(&parent_root.0), + attestation_count, + "Received block from gossip" + ); + self.blockchain.notify_new_block(signed_block); + } + + #[send_handler] + async fn handle_on_gossip_attestation( + &mut self, + msg: p2p_protocol::OnGossipAttestation, + _ctx: &Context, + ) { + let attestation = msg.attestation; + let slot = attestation.data.slot; + let validator = attestation.validator_id; + info!( + %slot, + validator, + head_root = %ShortRoot(&attestation.data.head.root.0), + target_slot = attestation.data.target.slot, + target_root = 
%ShortRoot(&attestation.data.target.root.0), + source_slot = attestation.data.source.slot, + source_root = %ShortRoot(&attestation.data.source.root.0), + "Received attestation from gossip" + ); + self.blockchain.notify_new_attestation(attestation); + } + + #[send_handler] + async fn handle_on_gossip_aggregated_attestation( + &mut self, + msg: p2p_protocol::OnGossipAggregatedAttestation, + _ctx: &Context, + ) { + let attestation = msg.attestation; + let slot = attestation.data.slot; + info!( + %slot, + target_slot = attestation.data.target.slot, + target_root = %ShortRoot(&attestation.data.target.root.0), + source_slot = attestation.data.source.slot, + source_root = %ShortRoot(&attestation.data.source.root.0), + "Received aggregated attestation from gossip" + ); + self.blockchain + .notify_new_aggregated_attestation(attestation); + } + + // --- Req/resp handlers --- + + #[send_handler] + async fn handle_on_status_request( + &mut self, + msg: p2p_protocol::OnStatusRequest, + _ctx: &Context, + ) { + let request = msg.status; + let peer = msg.peer; + info!( + finalized_slot = %request.finalized.slot, + head_slot = %request.head.slot, + "Received status request from peer {peer}" + ); + let our_status = build_status(&self.store); + if let Some(channel) = msg.channel.take() { + let response = Response::success(ResponsePayload::Status(our_status)); + let _ = self + .swarm_tx + .send(SwarmCommand::SendResponse { channel, response }); + } + } + + #[send_handler] + async fn handle_on_blocks_by_root_request( + &mut self, + msg: p2p_protocol::OnBlocksByRootRequest, + _ctx: &Context, + ) { + let request = msg.request; + let peer = msg.peer; + let num_roots = request.roots.len(); + info!(%peer, num_roots, "Received BlocksByRoot request"); + + let mut blocks = Vec::new(); + for root in request.roots.iter() { + if let Some(signed_block) = self.store.get_signed_block(root) { + blocks.push(signed_block); + } + // Missing blocks are silently skipped (per spec) + } + + let found = 
blocks.len(); + info!(%peer, num_roots, found, "Responding to BlocksByRoot request"); + + if let Some(channel) = msg.channel.take() { + let response = Response::success(ResponsePayload::BlocksByRoot(blocks)); + let _ = self + .swarm_tx + .send(SwarmCommand::SendResponse { channel, response }) + .inspect_err( + |err| warn!(%peer, %err, "Failed to send BlocksByRoot response command"), + ); + } + } + + #[send_handler] + async fn handle_on_status_response( + &mut self, + msg: p2p_protocol::OnStatusResponse, + _ctx: &Context, + ) { + info!( + finalized_slot = %msg.status.finalized.slot, + head_slot = %msg.status.head.slot, + "Received status response from peer {}", + msg.peer + ); + } + + #[send_handler] + async fn handle_on_blocks_by_root_response( + &mut self, + msg: p2p_protocol::OnBlocksByRootResponse, + ctx: &Context, + ) { + let blocks = msg.blocks; + let peer = msg.peer; + let correlation_id = msg.correlation_id; + info!(%peer, count = blocks.len(), "Received BlocksByRoot response"); + + let Some(requested_root) = self.correlation_id_map.remove(&correlation_id) else { + warn!(%peer, correlation_id, "Received response for unknown correlation_id"); + return; + }; + + if blocks.is_empty() { + self.correlation_id_map + .insert(correlation_id, requested_root); + warn!(%peer, "Received empty BlocksByRoot response"); + self.handle_fetch_failure(requested_root, peer, ctx); + return; + } + + for block in blocks { + let root = block.message.block.tree_hash_root(); + if root != requested_root { + warn!( + %peer, + received_root = %ShortRoot(&root.0), + expected_root = %ShortRoot(&requested_root.0), + "Received block with mismatched root, ignoring" + ); + continue; + } + self.pending_requests.remove(&root); + self.blockchain.notify_new_block(block); + } + } + + #[send_handler] + async fn handle_on_req_resp_failure( + &mut self, + msg: p2p_protocol::OnReqRespFailure, + ctx: &Context, + ) { + if let Some(root) = self.correlation_id_map.remove(&msg.correlation_id) { + 
self.handle_fetch_failure(root, msg.peer, ctx); + } + } + + // --- Connection handlers --- + + #[send_handler] + async fn handle_on_peer_connected( + &mut self, + msg: p2p_protocol::OnPeerConnected, + _ctx: &Context, + ) { + let peer_id = msg.peer_id; + let direction = &msg.direction; + + if msg.first_connection { + self.connected_peers.insert(peer_id); + let peer_count = self.connected_peers.len(); + metrics::notify_peer_connected(&Some(peer_id), direction, "success"); + + let our_status = build_status(&self.store); + let our_finalized_slot = our_status.finalized.slot; + let our_head_slot = our_status.head.slot; + info!( + %peer_id, + %direction, + peer_count, + our_finalized_slot, + our_head_slot, + "Peer connected" + ); + + // Send status request on first connection + let correlation_id = self.next_correlation_id(); + let _ = self.swarm_tx.send(SwarmCommand::SendRequest { + correlation_id, + peer_id, + request: Request::Status(our_status), + protocol: StreamProtocol::new(STATUS_PROTOCOL_V1), + }); + } else { + info!(%peer_id, %direction, "Added peer connection"); + } + } + + #[send_handler] + async fn handle_on_peer_disconnected( + &mut self, + msg: p2p_protocol::OnPeerDisconnected, + ctx: &Context, + ) { + let peer_id = msg.peer_id; + let direction = &msg.direction; + let reason = &msg.reason; + + if msg.last_connection { + self.connected_peers.remove(&peer_id); + let peer_count = self.connected_peers.len(); + metrics::notify_peer_disconnected(&Some(peer_id), direction, reason); + + info!( + %peer_id, + %direction, + %reason, + peer_count, + "Peer disconnected" + ); + + if self.bootnode_addrs.contains_key(&peer_id) { + send_after( + Duration::from_secs(PEER_REDIAL_INTERVAL_SECS), + ctx.clone(), + p2p_protocol::RetryPeerRedial { peer_id }, + ); + info!( + %peer_id, + "Scheduled bootnode redial in {}s", + PEER_REDIAL_INTERVAL_SECS + ); + } + } else { + info!( + %peer_id, %direction, %reason, + "Peer connection closed but other connections remain" + ); + } + } + + 
#[send_handler] + async fn handle_on_outgoing_connection_error( + &mut self, + msg: p2p_protocol::OnOutgoingConnectionError, + ctx: &Context, + ) { + let peer_id = msg.peer_id; + let error = &msg.error; + let error_lower = error.to_lowercase(); + let result = if error_lower.contains("timeout") + || error_lower.contains("timedout") + || error_lower.contains("timed out") + { + "timeout" + } else { + "error" + }; + metrics::notify_peer_connected(&peer_id, "outbound", result); + warn!(?peer_id, %error, "Outgoing connection error"); + + if let Some(pid) = peer_id + && self.bootnode_addrs.contains_key(&pid) + && !self.connected_peers.contains(&pid) + { + send_after( + Duration::from_secs(PEER_REDIAL_INTERVAL_SECS), + ctx.clone(), + p2p_protocol::RetryPeerRedial { peer_id: pid }, + ); + info!(%pid, "Scheduled bootnode redial after connection error"); + } + } + + // --- Publish handlers (from BlockChain via bridge) --- + + #[send_handler] + async fn handle_publish_attestation( + &mut self, + msg: p2p_protocol::PublishAttestation, + _ctx: &Context, + ) { + let attestation = msg.attestation; + let slot = attestation.data.slot; + let validator = attestation.validator_id; + + let ssz_bytes = attestation.as_ssz_bytes(); + let compressed = compress_message(&ssz_bytes); + + let _ = self + .swarm_tx + .send(SwarmCommand::GossipPublish { + topic: self.attestation_topic.clone(), + data: compressed, + }) + .inspect(|_| { + info!( + %slot, + validator, + target_slot = attestation.data.target.slot, + target_root = %ShortRoot(&attestation.data.target.root.0), + source_slot = attestation.data.source.slot, + source_root = %ShortRoot(&attestation.data.source.root.0), + "Published attestation to gossipsub" + ) + }) + .inspect_err(|err| { + warn!( + %slot, %validator, %err, + "Failed to publish attestation to gossipsub" + ) + }); + } + + #[send_handler] + async fn handle_publish_block( + &mut self, + msg: p2p_protocol::PublishBlock, + _ctx: &Context, + ) { + let signed_block = 
msg.signed_block; + let slot = signed_block.message.block.slot; + let proposer = signed_block.message.block.proposer_index; + let block_root = signed_block.message.block.tree_hash_root(); + let parent_root = signed_block.message.block.parent_root; + let attestation_count = signed_block.message.block.body.attestations.len(); + + let ssz_bytes = signed_block.as_ssz_bytes(); + let compressed = compress_message(&ssz_bytes); + + let _ = self + .swarm_tx + .send(SwarmCommand::GossipPublish { + topic: self.block_topic.clone(), + data: compressed, + }) + .inspect(|_| { + info!( + %slot, + proposer, + block_root = %ShortRoot(&block_root.0), + parent_root = %ShortRoot(&parent_root.0), + attestation_count, + "Published block to gossipsub" + ) + }) + .inspect_err(|err| { + warn!( + %slot, %proposer, %err, + "Failed to publish block to gossipsub" + ) + }); + } + + #[send_handler] + async fn handle_publish_aggregated_attestation( + &mut self, + msg: p2p_protocol::PublishAggregatedAttestation, + _ctx: &Context, + ) { + let attestation = msg.attestation; + let slot = attestation.data.slot; + + let ssz_bytes = attestation.as_ssz_bytes(); + let compressed = compress_message(&ssz_bytes); + + let _ = self + .swarm_tx + .send(SwarmCommand::GossipPublish { + topic: self.aggregation_topic.clone(), + data: compressed, + }) + .inspect(|_| { + info!( + %slot, + target_slot = attestation.data.target.slot, + target_root = %ShortRoot(&attestation.data.target.root.0), + source_slot = attestation.data.source.slot, + source_root = %ShortRoot(&attestation.data.source.root.0), + "Published aggregated attestation to gossipsub" + ) + }) + .inspect_err(|err| { + warn!( + %slot, %err, + "Failed to publish aggregated attestation to gossipsub" + ) + }); + } + + // --- Fetch and retry handlers --- + + #[send_handler] + async fn handle_fetch_block(&mut self, msg: p2p_protocol::FetchBlock, _ctx: &Context) { + let root = msg.root; + + // Deduplicate — if already pending, ignore + if 
self.pending_requests.contains_key(&root) { + trace!(%root, "Block fetch already in progress, ignoring duplicate"); + return; + } + + self.fetch_block_from_peer(root); + } + + #[send_handler] + async fn handle_retry_block_fetch( + &mut self, + msg: p2p_protocol::RetryBlockFetch, + _ctx: &Context, + ) { + let root = msg.root; + + if !self.pending_requests.contains_key(&root) { + trace!(%root, "Block fetch completed during backoff, skipping retry"); + return; + } + + info!(%root, "Retrying block fetch after backoff"); + + if !self.fetch_block_from_peer(root) { + error!(%root, "Failed to retry block fetch, giving up"); + self.pending_requests.remove(&root); + } + } + + #[send_handler] + async fn handle_retry_peer_redial( + &mut self, + msg: p2p_protocol::RetryPeerRedial, + ctx: &Context, + ) { + let peer_id = msg.peer_id; + + if self.connected_peers.contains(&peer_id) { + trace!(%peer_id, "Bootnode reconnected during redial delay, skipping"); + return; + } + + if let Some(addr) = self.bootnode_addrs.get(&peer_id) { + info!(%peer_id, "Redialing disconnected bootnode"); + let _ = self + .swarm_tx + .send(SwarmCommand::Dial(addr.clone())) + .inspect_err(|e| { + warn!(%peer_id, %e, "Failed to send redial command, will retry"); + send_after( + Duration::from_secs(PEER_REDIAL_INTERVAL_SECS), + ctx.clone(), + p2p_protocol::RetryPeerRedial { peer_id }, + ); + }); + } + } +} + +// --- Public API --- + +/// Handle to the P2P actor. 
+#[derive(Clone)] +pub struct P2P { + handle: ActorRef, +} + +impl P2P { + pub fn publish_attestation(&self, attestation: SignedAttestation) { + let _ = self + .handle + .publish_attestation(attestation) + .inspect_err(|err| error!(%err, "Failed to publish attestation to P2P actor")); + } + + pub fn publish_block(&self, signed_block: SignedBlockWithAttestation) { + let _ = self + .handle + .publish_block(signed_block) + .inspect_err(|err| error!(%err, "Failed to publish block to P2P actor")); + } + + pub fn publish_aggregated_attestation(&self, attestation: SignedAggregatedAttestation) { + let _ = self + .handle + .publish_aggregated_attestation(attestation) + .inspect_err( + |err| error!(%err, "Failed to publish aggregated attestation to P2P actor"), + ); + } + + pub fn fetch_block(&self, root: H256) { + let _ = self + .handle + .fetch_block(root) + .inspect_err(|err| error!(%err, "Failed to fetch block via P2P actor")); + } +} + +// --- Initialization --- + +/// [libp2p Behaviour](libp2p::swarm::NetworkBehaviour) combining Gossipsub and Request-Response Behaviours +#[derive(NetworkBehaviour)] +pub(crate) struct Behaviour { + gossipsub: libp2p::gossipsub::Behaviour, + req_resp: request_response::Behaviour, } #[allow(clippy::too_many_arguments)] @@ -62,12 +805,11 @@ pub async fn start_p2p( bootnodes: Vec, listening_socket: SocketAddr, blockchain: BlockChain, - p2p_rx: mpsc::UnboundedReceiver, store: Store, validator_id: Option, attestation_committee_count: u64, is_aggregator: bool, -) -> Result<(), libp2p::gossipsub::SubscriptionError> { +) -> Result<(P2P, tokio::task::JoinHandle<()>), libp2p::gossipsub::SubscriptionError> { let config = libp2p::gossipsub::ConfigBuilder::default() // d .mesh_n(8) @@ -120,8 +862,6 @@ pub async fn start_p2p( let secret_key = secp256k1::SecretKey::try_from_bytes(node_key).expect("invalid node key"); let identity = libp2p::identity::Keypair::from(secp256k1::Keypair::from(secret_key)); - // TODO: implement Executor with spawned? 
- // libp2p::swarm::Config::with_executor(executor) let mut swarm = libp2p::SwarmBuilder::with_existing_identity(identity) .with_tokio() .with_quic() @@ -160,7 +900,7 @@ pub async fn start_p2p( // Subscribe to block topic (all nodes) let block_topic_str = format!("/leanconsensus/{network}/{BLOCK_TOPIC_KIND}/ssz_snappy"); - let block_topic = libp2p::gossipsub::IdentTopic::new(block_topic_str); + let block_topic = libp2p_gossipsub::IdentTopic::new(block_topic_str); swarm .behaviour_mut() .gossipsub @@ -170,7 +910,7 @@ pub async fn start_p2p( // Subscribe to aggregation topic (all validators) let aggregation_topic_str = format!("/leanconsensus/{network}/{AGGREGATION_TOPIC_KIND}/ssz_snappy"); - let aggregation_topic = libp2p::gossipsub::IdentTopic::new(aggregation_topic_str); + let aggregation_topic = libp2p_gossipsub::IdentTopic::new(aggregation_topic_str); swarm .behaviour_mut() .gossipsub @@ -187,7 +927,7 @@ pub async fn start_p2p( }; let attestation_topic_str = format!("/leanconsensus/{network}/{attestation_topic_kind}/ssz_snappy"); - let attestation_topic = libp2p::gossipsub::IdentTopic::new(attestation_topic_str); + let attestation_topic = libp2p_gossipsub::IdentTopic::new(attestation_topic_str); // Only aggregators subscribe to attestation subnets; non-aggregators // publish via gossipsub's fanout mechanism without subscribing. 
@@ -201,269 +941,32 @@ pub async fn start_p2p( info!(socket=%listening_socket, "P2P node started"); - let (retry_tx, retry_rx) = mpsc::unbounded_channel(); + // Create channels between actor and swarm driver + let (swarm_cmd_tx, swarm_cmd_rx) = mpsc::unbounded_channel(); let server = P2PServer { - swarm, + swarm_tx: swarm_cmd_tx, blockchain, store, - p2p_rx, attestation_topic, block_topic, aggregation_topic, connected_peers: HashSet::new(), pending_requests: HashMap::new(), - request_id_map: HashMap::new(), + correlation_id_map: HashMap::new(), + next_correlation_id: 0, bootnode_addrs, - retry_tx, - retry_rx, }; - event_loop(server).await; - Ok(()) -} - -/// [libp2p Behaviour](libp2p::swarm::NetworkBehaviour) combining Gossipsub and Request-Response Behaviours -#[derive(NetworkBehaviour)] -pub(crate) struct Behaviour { - gossipsub: libp2p::gossipsub::Behaviour, - req_resp: request_response::Behaviour, -} - -pub(crate) struct P2PServer { - pub(crate) swarm: libp2p::Swarm, - pub(crate) blockchain: BlockChain, - pub(crate) store: Store, - pub(crate) p2p_rx: mpsc::UnboundedReceiver, - pub(crate) attestation_topic: libp2p::gossipsub::IdentTopic, - pub(crate) block_topic: libp2p::gossipsub::IdentTopic, - pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic, - pub(crate) connected_peers: HashSet, - pub(crate) pending_requests: HashMap, - pub(crate) request_id_map: HashMap, - /// Bootnode addresses for redialing when disconnected - bootnode_addrs: HashMap, - /// Channel for scheduling retries (block fetches and peer redials) - pub(crate) retry_tx: mpsc::UnboundedSender, - retry_rx: mpsc::UnboundedReceiver, -} + let actor_ref = server.start(); -/// Event loop for the P2P crate. -/// Processes swarm events, incoming requests, responses, gossip, and outgoing messages from blockchain. -async fn event_loop(mut server: P2PServer) { - loop { - tokio::select! 
{ - biased; - - message = server.p2p_rx.recv() => { - let Some(message) = message else { - break; - }; - handle_p2p_message(&mut server, message).await; - } - event = server.swarm.next() => { - let Some(event) = event else { - break; - }; - handle_swarm_event(&mut server, event).await; - } - Some(msg) = server.retry_rx.recv() => { - match msg { - RetryMessage::BlockFetch(root) => handle_retry(&mut server, root).await, - RetryMessage::PeerRedial(peer_id) => handle_peer_redial(&mut server, peer_id).await, - } - } - } - } -} + let driver = SwarmDriver::new(swarm, swarm_cmd_rx, actor_ref.clone()); + let driver_handle = tokio::spawn(driver.run()); -async fn handle_swarm_event(server: &mut P2PServer, event: SwarmEvent) { - match event { - SwarmEvent::Behaviour(BehaviourEvent::ReqResp(req_resp_event)) => { - req_resp::handle_req_resp_message(server, req_resp_event).await; - } - SwarmEvent::Behaviour(BehaviourEvent::Gossipsub( - message @ libp2p::gossipsub::Event::Message { .. }, - )) => { - gossipsub::handle_gossipsub_message(server, message).await; - } - SwarmEvent::ConnectionEstablished { - peer_id, - endpoint, - num_established, - .. 
- } => { - let direction = connection_direction(&endpoint); - if num_established.get() == 1 { - server.connected_peers.insert(peer_id); - let peer_count = server.connected_peers.len(); - metrics::notify_peer_connected(&Some(peer_id), direction, "success"); - // Send status request on first connection to this peer - let our_status = build_status(&server.store); - let our_finalized_slot = our_status.finalized.slot; - let our_head_slot = our_status.head.slot; - info!( - %peer_id, - %direction, - peer_count, - our_finalized_slot, - our_head_slot, - "Peer connected" - ); - server - .swarm - .behaviour_mut() - .req_resp - .send_request_with_protocol( - &peer_id, - Request::Status(our_status), - libp2p::StreamProtocol::new(STATUS_PROTOCOL_V1), - ); - } else { - info!(%peer_id, %direction, "Added peer connection"); - } - } - SwarmEvent::ConnectionClosed { - peer_id, - endpoint, - num_established, - cause, - .. - } => { - let direction = connection_direction(&endpoint); - let reason = match cause { - None => "remote_close", - Some(err) => { - // Categorize disconnection reasons - let err_str = err.to_string().to_lowercase(); - if err_str.contains("timeout") - || err_str.contains("timedout") - || err_str.contains("keepalive") - { - "timeout" - } else if err_str.contains("reset") || err_str.contains("connectionreset") { - "remote_close" - } else { - "error" - } - } - }; - if num_established == 0 { - server.connected_peers.remove(&peer_id); - let peer_count = server.connected_peers.len(); - metrics::notify_peer_disconnected(&Some(peer_id), direction, reason); - - info!( - %peer_id, - %direction, - %reason, - peer_count, - "Peer disconnected" - ); - - // Schedule redial if this is a bootnode - if server.bootnode_addrs.contains_key(&peer_id) { - schedule_peer_redial(server.retry_tx.clone(), peer_id); - info!(%peer_id, "Scheduled bootnode redial in {}s", PEER_REDIAL_INTERVAL_SECS); - } - } else { - info!(%peer_id, %direction, %reason, "Peer connection closed but other connections 
remain"); - } - } - SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { - let result = if error.to_string().to_lowercase().contains("timed out") { - "timeout" - } else { - "error" - }; - metrics::notify_peer_connected(&peer_id, "outbound", result); - warn!(?peer_id, %error, "Outgoing connection error"); - - // Schedule redial if this was a bootnode - if let Some(pid) = peer_id - && server.bootnode_addrs.contains_key(&pid) - && !server.connected_peers.contains(&pid) - { - schedule_peer_redial(server.retry_tx.clone(), pid); - info!(%pid, "Scheduled bootnode redial after connection error"); - } - } - SwarmEvent::IncomingConnectionError { peer_id, error, .. } => { - metrics::notify_peer_connected(&peer_id, "inbound", "error"); - warn!(%error, "Incoming connection error"); - } - _ => { - trace!(?event, "Ignored swarm event"); - } - } -} - -async fn handle_p2p_message(server: &mut P2PServer, message: P2PMessage) { - match message { - P2PMessage::PublishAttestation(attestation) => { - publish_attestation(server, attestation).await; - } - P2PMessage::PublishBlock(signed_block) => { - publish_block(server, signed_block).await; - } - P2PMessage::PublishAggregatedAttestation(attestation) => { - publish_aggregated_attestation(server, attestation).await; - } - P2PMessage::FetchBlock(root) => { - // Deduplicate - if already pending, ignore - if server.pending_requests.contains_key(&root) { - trace!(%root, "Block fetch already in progress, ignoring duplicate"); - return; - } - - // Send request and track it (tracking handled internally by fetch_block_from_peer) - fetch_block_from_peer(server, root).await; - } - } -} - -async fn handle_retry(server: &mut P2PServer, root: H256) { - // Check if still pending (might have succeeded during backoff) - if !server.pending_requests.contains_key(&root) { - trace!(%root, "Block fetch completed during backoff, skipping retry"); - return; - } - - info!(%root, "Retrying block fetch after backoff"); - - // Retry the fetch (tracking 
handled internally by fetch_block_from_peer) - if !fetch_block_from_peer(server, root).await { - tracing::error!(%root, "Failed to retry block fetch, giving up"); - server.pending_requests.remove(&root); - } -} - -async fn handle_peer_redial(server: &mut P2PServer, peer_id: PeerId) { - // Skip if already reconnected - if server.connected_peers.contains(&peer_id) { - trace!(%peer_id, "Bootnode reconnected during redial delay, skipping"); - return; - } - - if let Some(addr) = server.bootnode_addrs.get(&peer_id) { - info!(%peer_id, "Redialing disconnected bootnode"); - // NOTE: this dial does some checks and adds a pending outbound connection attempt. - // It does NOT block. If the dial fails, we'll later get an OutgoingConnectionError event. - let _ = server.swarm.dial(addr.clone()).inspect_err(|e| { - warn!(%peer_id, %e, "Failed to redial bootnode, will retry"); - // Schedule another redial attempt - schedule_peer_redial(server.retry_tx.clone(), peer_id); - }); - } + Ok((P2P { handle: actor_ref }, driver_handle)) } -/// Schedules a peer redial after the configured delay interval. 
-pub(crate) fn schedule_peer_redial(retry_tx: mpsc::UnboundedSender, peer_id: PeerId) { - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(PEER_REDIAL_INTERVAL_SECS)).await; - let _ = retry_tx.send(RetryMessage::PeerRedial(peer_id)); - }); -} +// --- Types and utilities --- pub struct Bootnode { pub(crate) ip: IpAddr, @@ -524,14 +1027,6 @@ pub fn parse_enrs(enrs: Vec) -> Vec { bootnodes } -fn connection_direction(endpoint: &libp2p::core::ConnectedPoint) -> &'static str { - if endpoint.is_dialer() { - "outbound" - } else { - "inbound" - } -} - fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub::MessageId { const MESSAGE_DOMAIN_INVALID_SNAPPY: [u8; 4] = [0x00, 0x00, 0x00, 0x00]; const MESSAGE_DOMAIN_VALID_SNAPPY: [u8; 4] = [0x01, 0x00, 0x00, 0x00]; diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index e08f4357..769e9446 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -1,177 +1,7 @@ use ethlambda_storage::Store; -use libp2p::{PeerId, request_response}; -use rand::seq::SliceRandom; -use tokio::time::Duration; -use tracing::{debug, error, info, warn}; - use ethlambda_types::checkpoint::Checkpoint; -use ethlambda_types::primitives::ssz::TreeHash; -use ethlambda_types::{block::SignedBlockWithAttestation, primitives::H256}; - -use super::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, ResponsePayload, Status, -}; -use crate::{ - BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest, - RetryMessage, req_resp::RequestedBlockRoots, -}; - -pub async fn handle_req_resp_message( - server: &mut P2PServer, - event: request_response::Event, -) { - match event { - request_response::Event::Message { peer, message, .. } => match message { - request_response::Message::Request { - request, channel, .. 
- } => match request { - Request::Status(status) => { - handle_status_request(server, status, channel, peer).await; - } - Request::BlocksByRoot(request) => { - handle_blocks_by_root_request(server, request, channel, peer).await; - } - }, - request_response::Message::Response { - request_id, - response, - } => match response { - Response::Success { payload } => match payload { - ResponsePayload::Status(status) => { - handle_status_response(status, peer).await; - } - ResponsePayload::BlocksByRoot(blocks) => { - handle_blocks_by_root_response(server, blocks, peer, request_id).await; - } - }, - Response::Error { code, message } => { - let error_str = String::from_utf8_lossy(&message); - warn!(%peer, ?code, %error_str, "Received error response"); - } - }, - }, - request_response::Event::OutboundFailure { - peer, - request_id, - error, - .. - } => { - warn!(%peer, ?request_id, %error, "Outbound request failed"); - - // Check if this was a block fetch request - if let Some(root) = server.request_id_map.remove(&request_id) { - handle_fetch_failure(server, root, peer).await; - } - } - request_response::Event::InboundFailure { - peer, - request_id, - error, - .. - } => { - warn!(%peer, ?request_id, %error, "Inbound request failed"); - } - request_response::Event::ResponseSent { - peer, request_id, .. 
- } => { - debug!(%peer, ?request_id, "Response sent successfully"); - } - } -} - -async fn handle_status_request( - server: &mut P2PServer, - request: Status, - channel: request_response::ResponseChannel, - peer: PeerId, -) { - info!(finalized_slot=%request.finalized.slot, head_slot=%request.head.slot, "Received status request from peer {peer}"); - let our_status = build_status(&server.store); - server - .swarm - .behaviour_mut() - .req_resp - .send_response( - channel, - Response::success(ResponsePayload::Status(our_status)), - ) - .unwrap(); -} - -async fn handle_status_response(status: Status, peer: PeerId) { - info!(finalized_slot=%status.finalized.slot, head_slot=%status.head.slot, "Received status response from peer {peer}"); -} - -async fn handle_blocks_by_root_request( - server: &mut P2PServer, - request: BlocksByRootRequest, - channel: request_response::ResponseChannel, - peer: PeerId, -) { - let num_roots = request.roots.len(); - info!(%peer, num_roots, "Received BlocksByRoot request"); - - let mut blocks = Vec::new(); - for root in request.roots.iter() { - if let Some(signed_block) = server.store.get_signed_block(root) { - blocks.push(signed_block); - } - // Missing blocks are silently skipped (per spec) - } - - let found = blocks.len(); - info!(%peer, num_roots, found, "Responding to BlocksByRoot request"); - - let response = Response::success(ResponsePayload::BlocksByRoot(blocks)); - let _ = server - .swarm - .behaviour_mut() - .req_resp - .send_response(channel, response) - .inspect_err(|err| warn!(%peer, ?err, "Failed to send BlocksByRoot response")); -} - -async fn handle_blocks_by_root_response( - server: &mut P2PServer, - blocks: Vec, - peer: PeerId, - request_id: request_response::OutboundRequestId, -) { - info!(%peer, count = blocks.len(), "Received BlocksByRoot response"); - - // Look up which root was requested for this specific request - let Some(requested_root) = server.request_id_map.remove(&request_id) else { - warn!(%peer, ?request_id, 
"Received response for unknown request_id"); - return; - }; - - if blocks.is_empty() { - server.request_id_map.insert(request_id, requested_root); - warn!(%peer, "Received empty BlocksByRoot response"); - handle_fetch_failure(server, requested_root, peer).await; - return; - } - - for block in blocks { - let root = block.message.block.tree_hash_root(); - - // Validate that this block matches what we requested - if root != requested_root { - warn!( - %peer, - received_root = %ethlambda_types::ShortRoot(&root.0), - expected_root = %ethlambda_types::ShortRoot(&requested_root.0), - "Received block with mismatched root, ignoring" - ); - continue; - } - // Clean up tracking for this root - server.pending_requests.remove(&root); - - server.blockchain.notify_new_block(block); - } -} +use super::Status; /// Build a Status message from the current Store state. pub fn build_status(store: &Store) -> Status { @@ -189,84 +19,3 @@ pub fn build_status(store: &Store) -> Status { }, } } - -/// Fetch a missing block from a random connected peer. -/// Handles tracking in both pending_requests and request_id_map. 
-pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { - if server.connected_peers.is_empty() { - warn!(%root, "Cannot fetch block: no connected peers"); - return false; - } - - // Select random peer - let peers: Vec<_> = server.connected_peers.iter().copied().collect(); - let peer = match peers.choose(&mut rand::thread_rng()) { - Some(&p) => p, - None => { - warn!(%root, "Failed to select random peer"); - return false; - } - }; - - // Create BlocksByRoot request with single root - let mut roots = RequestedBlockRoots::empty(); - if let Err(err) = roots.push(root) { - error!(%root, ?err, "Failed to create BlocksByRoot request"); - return false; - } - let request = BlocksByRootRequest { roots }; - - info!(%peer, %root, "Sending BlocksByRoot request for missing block"); - let request_id = server - .swarm - .behaviour_mut() - .req_resp - .send_request_with_protocol( - &peer, - Request::BlocksByRoot(request), - libp2p::StreamProtocol::new(BLOCKS_BY_ROOT_PROTOCOL_V1), - ); - - // Track the request if not already tracked (new request) - let pending = server - .pending_requests - .entry(root) - .or_insert(PendingRequest { - attempts: 1, - last_peer: None, - }); - - // Update last_peer - pending.last_peer = Some(peer); - - // Map request_id to root for failure handling - server.request_id_map.insert(request_id, root); - - true -} - -async fn handle_fetch_failure(server: &mut P2PServer, root: H256, peer: PeerId) { - let Some(pending) = server.pending_requests.get_mut(&root) else { - return; - }; - - if pending.attempts >= MAX_FETCH_RETRIES { - error!(%root, %peer, attempts=%pending.attempts, - "Block fetch failed after max retries, giving up"); - server.pending_requests.remove(&root); - return; - } - - let backoff_ms = INITIAL_BACKOFF_MS * BACKOFF_MULTIPLIER.pow(pending.attempts - 1); - let backoff = Duration::from_millis(backoff_ms); - - warn!(%root, %peer, attempts=%pending.attempts, ?backoff, "Block fetch failed, scheduling retry"); - - 
pending.attempts += 1; - - let retry_tx = server.retry_tx.clone(); - tokio::spawn(async move { - tokio::time::sleep(backoff).await; - let _ = retry_tx.send(RetryMessage::BlockFetch(root)); - }); -} diff --git a/crates/net/p2p/src/req_resp/mod.rs b/crates/net/p2p/src/req_resp/mod.rs index 6b574daf..1207b76a 100644 --- a/crates/net/p2p/src/req_resp/mod.rs +++ b/crates/net/p2p/src/req_resp/mod.rs @@ -1,11 +1,11 @@ mod codec; mod encoding; -pub mod handlers; +mod handlers; mod messages; pub use codec::Codec; pub use encoding::MAX_COMPRESSED_PAYLOAD_SIZE; -pub use handlers::{build_status, fetch_block_from_peer, handle_req_resp_message}; +pub use handlers::build_status; pub use messages::{ BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, RequestedBlockRoots, Response, ResponsePayload, STATUS_PROTOCOL_V1, Status, diff --git a/crates/net/p2p/src/swarm_driver.rs b/crates/net/p2p/src/swarm_driver.rs new file mode 100644 index 00000000..d5069d35 --- /dev/null +++ b/crates/net/p2p/src/swarm_driver.rs @@ -0,0 +1,340 @@ +use std::collections::HashMap; + +use libp2p::{ + Multiaddr, PeerId, StreamProtocol, + futures::StreamExt, + gossipsub as libp2p_gossipsub, + request_response::{self, OutboundRequestId}, + swarm::SwarmEvent, +}; +use spawned_concurrency::tasks::ActorRef; +use tokio::sync::mpsc; +use tracing::{debug, error, trace, warn}; + +use ethlambda_types::{ + attestation::{SignedAggregatedAttestation, SignedAttestation}, + block::SignedBlockWithAttestation, + primitives::ssz::Decode, +}; + +use crate::{ + Behaviour, BehaviourEvent, P2PServer, P2pProtocol, ResponseChannelWrapper, + gossipsub::{ + AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND, + encoding::decompress_message, + }, + req_resp::{Request, Response, ResponsePayload}, +}; + +/// Commands sent from the P2PServer actor to the SwarmDriver. 
+pub(crate) enum SwarmCommand { + GossipPublish { + topic: libp2p_gossipsub::IdentTopic, + data: Vec, + }, + SendRequest { + correlation_id: u64, + peer_id: PeerId, + request: Request, + protocol: StreamProtocol, + }, + SendResponse { + channel: request_response::ResponseChannel, + response: Response, + }, + Dial(Multiaddr), +} + +/// Owns the libp2p Swarm and polls it continuously. +/// +/// Forwards decoded network events to the P2PServer actor and executes +/// SwarmCommands received from it. Runs as a plain tokio task so the +/// swarm keeps being polled even when the actor is busy processing messages. +pub(crate) struct SwarmDriver { + swarm: libp2p::Swarm, + command_rx: mpsc::UnboundedReceiver, + p2p_actor: ActorRef, + /// Maps libp2p OutboundRequestId → our correlation ID + outbound_request_map: HashMap, +} + +impl SwarmDriver { + pub(crate) fn new( + swarm: libp2p::Swarm, + command_rx: mpsc::UnboundedReceiver, + p2p_actor: ActorRef, + ) -> Self { + Self { + swarm, + command_rx, + p2p_actor, + outbound_request_map: HashMap::new(), + } + } + + pub(crate) async fn run(mut self) { + loop { + tokio::select! 
{ + Some(command) = self.command_rx.recv() => { + self.handle_command(command); + } + Some(event) = self.swarm.next() => { + self.handle_swarm_event(event); + } + } + } + } + + fn handle_command(&mut self, command: SwarmCommand) { + match command { + SwarmCommand::GossipPublish { topic, data } => { + let _ = self + .swarm + .behaviour_mut() + .gossipsub + .publish(topic, data) + .inspect_err(|err| warn!(%err, "Failed to publish gossip message")); + } + SwarmCommand::SendRequest { + correlation_id, + peer_id, + request, + protocol, + } => { + let outbound_id = self + .swarm + .behaviour_mut() + .req_resp + .send_request_with_protocol(&peer_id, request, protocol); + self.outbound_request_map + .insert(outbound_id, correlation_id); + } + SwarmCommand::SendResponse { channel, response } => { + let _ = self + .swarm + .behaviour_mut() + .req_resp + .send_response(channel, response) + .inspect_err(|err| warn!(?err, "Failed to send response")); + } + SwarmCommand::Dial(addr) => { + let _ = self + .swarm + .dial(addr) + .inspect_err(|err| warn!(%err, "Failed to dial")); + } + } + } + + fn handle_swarm_event(&mut self, event: SwarmEvent) { + match event { + SwarmEvent::Behaviour(BehaviourEvent::ReqResp(req_resp_event)) => { + self.handle_req_resp_event(req_resp_event); + } + SwarmEvent::Behaviour(BehaviourEvent::Gossipsub( + libp2p_gossipsub::Event::Message { message, .. }, + )) => { + self.handle_gossip_message(message); + } + SwarmEvent::ConnectionEstablished { + peer_id, + endpoint, + num_established, + .. + } => { + let direction = connection_direction(&endpoint).to_string(); + let first_connection = num_established.get() == 1; + let _ = self + .p2p_actor + .on_peer_connected(peer_id, direction, first_connection); + } + SwarmEvent::ConnectionClosed { + peer_id, + endpoint, + num_established, + cause, + .. 
+ } => { + let direction = connection_direction(&endpoint).to_string(); + let reason = categorize_disconnection(&cause).to_string(); + let last_connection = num_established == 0; + let _ = self.p2p_actor.on_peer_disconnected( + peer_id, + direction, + reason, + last_connection, + ); + } + SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { + let _ = self + .p2p_actor + .on_outgoing_connection_error(peer_id, error.to_string()); + } + SwarmEvent::IncomingConnectionError { peer_id, error, .. } => { + crate::metrics::notify_peer_connected(&peer_id, "inbound", "error"); + warn!(%error, "Incoming connection error"); + } + _ => { + trace!(?event, "Ignored swarm event"); + } + } + } + + fn handle_gossip_message(&mut self, message: libp2p_gossipsub::Message) { + let topic_kind = message.topic.as_str().split("/").nth(3); + match topic_kind { + Some(BLOCK_TOPIC_KIND) => { + let Ok(uncompressed) = decompress_message(&message.data) + .inspect_err(|err| error!(%err, "Failed to decompress gossipped block")) + else { + return; + }; + let Ok(signed_block) = SignedBlockWithAttestation::from_ssz_bytes(&uncompressed) + .inspect_err(|err| error!(?err, "Failed to decode gossipped block")) + else { + return; + }; + let _ = self.p2p_actor.on_gossip_block(signed_block); + } + Some(AGGREGATION_TOPIC_KIND) => { + let Ok(uncompressed) = decompress_message(&message.data) + .inspect_err(|err| error!(%err, "Failed to decompress gossipped aggregation")) + else { + return; + }; + let Ok(aggregation) = SignedAggregatedAttestation::from_ssz_bytes(&uncompressed) + .inspect_err(|err| error!(?err, "Failed to decode gossipped aggregation")) + else { + return; + }; + let _ = self.p2p_actor.on_gossip_aggregated_attestation(aggregation); + } + Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => { + let Ok(uncompressed) = decompress_message(&message.data) + .inspect_err(|err| error!(%err, "Failed to decompress gossipped attestation")) + else { + return; + }; + let 
Ok(signed_attestation) = SignedAttestation::from_ssz_bytes(&uncompressed) + .inspect_err(|err| error!(?err, "Failed to decode gossipped attestation")) + else { + return; + }; + let _ = self.p2p_actor.on_gossip_attestation(signed_attestation); + } + _ => { + trace!("Received message on unknown topic: {}", message.topic); + } + } + } + + fn handle_req_resp_event(&mut self, event: request_response::Event) { + match event { + request_response::Event::Message { peer, message, .. } => match message { + request_response::Message::Request { + request, channel, .. + } => match request { + Request::Status(status) => { + let _ = self.p2p_actor.on_status_request( + status, + ResponseChannelWrapper::new(channel), + peer, + ); + } + Request::BlocksByRoot(request) => { + let _ = self.p2p_actor.on_blocks_by_root_request( + request, + ResponseChannelWrapper::new(channel), + peer, + ); + } + }, + request_response::Message::Response { + request_id, + response, + } => match response { + Response::Success { payload } => match payload { + ResponsePayload::Status(status) => { + self.outbound_request_map.remove(&request_id); + let _ = self.p2p_actor.on_status_response(status, peer); + } + ResponsePayload::BlocksByRoot(blocks) => { + if let Some(correlation_id) = + self.outbound_request_map.remove(&request_id) + { + let _ = self.p2p_actor.on_blocks_by_root_response( + blocks, + peer, + correlation_id, + ); + } else { + warn!( + %peer, + ?request_id, + "Received BlocksByRoot response for unknown request_id" + ); + } + } + }, + Response::Error { code, message } => { + let error_str = String::from_utf8_lossy(&message); + warn!(%peer, ?code, %error_str, "Received error response"); + } + }, + }, + request_response::Event::OutboundFailure { + peer, + request_id, + error, + .. 
+ } => { + warn!(%peer, ?request_id, %error, "Outbound request failed"); + if let Some(correlation_id) = self.outbound_request_map.remove(&request_id) { + let _ = + self.p2p_actor + .on_req_resp_failure(peer, correlation_id, error.to_string()); + } + } + request_response::Event::InboundFailure { + peer, + request_id, + error, + .. + } => { + warn!(%peer, ?request_id, %error, "Inbound request failed"); + } + request_response::Event::ResponseSent { + peer, request_id, .. + } => { + debug!(%peer, ?request_id, "Response sent successfully"); + } + } + } +} + +fn connection_direction(endpoint: &libp2p::core::ConnectedPoint) -> &'static str { + if endpoint.is_dialer() { + "outbound" + } else { + "inbound" + } +} + +fn categorize_disconnection(cause: &Option) -> &'static str { + match cause { + None => "remote_close", + Some(err) => { + let err_str = err.to_string().to_lowercase(); + if err_str.contains("timeout") + || err_str.contains("timedout") + || err_str.contains("keepalive") + { + "timeout" + } else if err_str.contains("reset") || err_str.contains("connectionreset") { + "remote_close" + } else { + "error" + } + } + } +}