From ba074c021b57a358b159f7f3103f10414f6571d1 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Fri, 6 Mar 2026 18:31:56 -0300 Subject: [PATCH] Migrate P2P module from tokio::select! event loop to spawned-concurrency actor Replace the hand-rolled event loop in start_p2p() with a proper GenServer actor (P2PServer) using the spawned-concurrency framework, matching how BlockChainServer already works. Key changes: - New ethlambda-network-api crate with shared message types (PublishBlock, FetchBlock, NewBlock, etc.) to break the dependency cycle between blockchain and P2P crates - SwarmAdapter: thin I/O bridge that owns the libp2p Swarm and communicates via channels (SwarmCommand/SwarmEvent), keeping select! isolated to infra - P2P actor receives swarm events via spawn_listener, network-api messages via manual Handler impls, and retry scheduling via send_after - BlockChain protocol simplified: network-api messages use manual Handler impls instead of protocol-wrapped types, enabling Recipient across actor boundaries - Actors wired at startup via InitP2P/InitBlockChain init messages carrying Recipient handles, replacing mpsc channels --- Cargo.lock | 15 +- Cargo.toml | 2 + bin/ethlambda/Cargo.toml | 1 + bin/ethlambda/src/main.rs | 52 +-- crates/blockchain/Cargo.toml | 1 + crates/blockchain/src/lib.rs | 175 +++++----- crates/net/api/Cargo.toml | 14 + crates/net/api/src/lib.rs | 50 +++ crates/net/p2p/Cargo.toml | 6 +- crates/net/p2p/src/gossipsub/handler.rs | 118 +++---- crates/net/p2p/src/lib.rs | 403 ++++++++++++++---------- crates/net/p2p/src/req_resp/handlers.rs | 72 +++-- crates/net/p2p/src/swarm_adapter.rs | 159 ++++++++++ 13 files changed, 691 insertions(+), 377 deletions(-) create mode 100644 crates/net/api/Cargo.toml create mode 100644 crates/net/api/src/lib.rs create mode 100644 crates/net/p2p/src/swarm_adapter.rs diff --git a/Cargo.lock b/Cargo.lock index 85d759f8..dec29435 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2023,6 +2023,7 @@ version = "0.1.0" dependencies = [ "clap", "ethlambda-blockchain", + "ethlambda-network-api", "ethlambda-p2p", "ethlambda-rpc", "ethlambda-storage", @@ -2050,6 +2051,7 @@ dependencies = [ "ethlambda-crypto", "ethlambda-fork-choice", "ethlambda-metrics", + "ethlambda-network-api", "ethlambda-state-transition", "ethlambda-storage", "ethlambda-test-fixtures", @@ -2093,6 +2095,14 @@ dependencies = [ "thiserror 2.0.17", ] +[[package]] +name = "ethlambda-network-api" +version = "0.1.0" +dependencies = [ + "ethlambda-types", + "spawned-concurrency 0.4.5 (git+https://github.com/lambdaclass/spawned.git?tag=v0.5.0-rc)", +] + [[package]] name = "ethlambda-p2p" version = "0.1.0" @@ -2100,20 +2110,23 @@ dependencies = [ "async-trait", "ethereum_ssz", "ethereum_ssz_derive", - "ethlambda-blockchain", "ethlambda-metrics", + "ethlambda-network-api", "ethlambda-storage", "ethlambda-types", "ethrex-common", "ethrex-p2p", "ethrex-rlp", + "futures", "hex", "libp2p", "rand 0.8.5", "sha2", "snap", + "spawned-concurrency 0.4.5 (git+https://github.com/lambdaclass/spawned.git?tag=v0.5.0-rc)", "ssz_types", "tokio", + "tokio-stream", "tracing", "tree_hash", "tree_hash_derive", diff --git a/Cargo.toml b/Cargo.toml index 46fdc657..0d1fb902 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "crates/common/metrics", "crates/common/test-fixtures", "crates/common/types", + "crates/net/api", "crates/net/p2p", "crates/net/rpc", "crates/storage", @@ -33,6 +34,7 @@ ethlambda-crypto = { path = "crates/common/crypto" } ethlambda-metrics = { path = "crates/common/metrics" } ethlambda-test-fixtures = { path = "crates/common/test-fixtures" } ethlambda-types = { path = "crates/common/types" } +ethlambda-network-api = { path = "crates/net/api" } ethlambda-p2p = { path = "crates/net/p2p" } ethlambda-rpc = { path = "crates/net/rpc" } ethlambda-storage = { path = "crates/storage" } diff --git a/bin/ethlambda/Cargo.toml b/bin/ethlambda/Cargo.toml index 7b46286f..27bc3d76 100644 --- a/bin/ethlambda/Cargo.toml +++ b/bin/ethlambda/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true [dependencies] ethlambda-blockchain.workspace = true +ethlambda-network-api.workspace = true ethlambda-p2p.workspace = true ethlambda-types.workspace = true ethlambda-rpc.workspace = true diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 984b20f2..faa6b671 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -18,7 +18,8 @@ use std::{ }; use clap::Parser; -use ethlambda_p2p::{Bootnode, parse_enrs, start_p2p}; +use ethlambda_network_api::{InitBlockChain, InitP2P}; +use ethlambda_p2p::{Bootnode, P2P, SwarmConfig, build_swarm, parse_enrs}; use ethlambda_types::primitives::H256; use ethlambda_types::{ genesis::GenesisConfig, @@ -132,23 +133,37 @@ async fn main() -> eyre::Result<()> { .await .inspect_err(|err| error!(%err, "Failed to initialize state"))?; - let (p2p_tx, p2p_rx) = tokio::sync::mpsc::unbounded_channel(); // Use first validator ID for subnet subscription let first_validator_id = validator_keys.keys().min().copied(); - let blockchain = - BlockChain::spawn(store.clone(), p2p_tx, validator_keys, options.is_aggregator); + let blockchain = BlockChain::spawn(store.clone(), validator_keys, options.is_aggregator); - let p2p_handle = tokio::spawn(start_p2p( - node_p2p_key, + let built = build_swarm(SwarmConfig { + node_key: node_p2p_key, bootnodes, - p2p_socket, - blockchain, - p2p_rx, - store.clone(), - first_validator_id, - options.attestation_committee_count, - options.is_aggregator, - )); + listening_socket: p2p_socket, + validator_id: first_validator_id, + attestation_committee_count: options.attestation_committee_count, + is_aggregator: options.is_aggregator, + }) + .expect("failed to build swarm"); + + let p2p = P2P::spawn(built, store.clone()); + + // Wire actors together via init messages + let _ = blockchain.actor_ref().recipient::().send(InitP2P { + publish_block: p2p.actor_ref().recipient(), + publish_attestation: p2p.actor_ref().recipient(), + publish_aggregated: p2p.actor_ref().recipient(), + fetch_block: p2p.actor_ref().recipient(), + }); + let _ = p2p + .actor_ref() + .recipient::() + .send(InitBlockChain { + new_block: blockchain.actor_ref().recipient(), + new_attestation: blockchain.actor_ref().recipient(), + new_aggregated: blockchain.actor_ref().recipient(), + }); ethlambda_rpc::start_rpc_server(metrics_socket, store) .await @@ -156,14 +171,7 @@ async fn main() -> eyre::Result<()> { info!("Node initialized"); - tokio::select! { - result = p2p_handle => { - panic!("P2P node task has exited unexpectedly: {result:?}"); - } - _ = tokio::signal::ctrl_c() => { - // Ctrl-C received, shutting down - } - } + tokio::signal::ctrl_c().await.ok(); println!("Shutting down..."); Ok(()) diff --git a/crates/blockchain/Cargo.toml b/crates/blockchain/Cargo.toml index 535b010c..5cd60fdb 100644 --- a/crates/blockchain/Cargo.toml +++ b/crates/blockchain/Cargo.toml @@ -11,6 +11,7 @@ version.workspace = true autotests = false [dependencies] +ethlambda-network-api.workspace = true ethlambda-storage.workspace = true ethlambda-state-transition.workspace = true ethlambda-fork-choice.workspace = true diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 095d6189..6e807275 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -1,6 +1,9 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::time::{Duration, SystemTime}; +use ethlambda_network_api::{ + FetchBlock, InitP2P, PublishAggregatedAttestation, PublishAttestation, PublishBlock, +}; use ethlambda_state_transition::is_proposer; use ethlambda_storage::Store; use ethlambda_types::{ @@ -14,8 +17,9 @@ use ethlambda_types::{ use spawned_concurrency::actor; use spawned_concurrency::error::ActorError; use spawned_concurrency::protocol; -use spawned_concurrency::tasks::{Actor, ActorRef, ActorStart, Context, Handler, send_after}; -use tokio::sync::mpsc; +use spawned_concurrency::tasks::{ + Actor, ActorRef, ActorStart, Context, Handler, Recipient, send_after, +}; use tracing::{error, info, trace, warn}; use crate::store::StoreError; @@ -25,19 +29,6 @@ pub mod key_manager; pub mod metrics; pub mod store; -/// Messages sent from the blockchain to the P2P layer. -#[derive(Clone, Debug)] -pub enum P2PMessage { - /// Publish an attestation to the gossip network. - PublishAttestation(SignedAttestation), - /// Publish a block to the gossip network. - PublishBlock(SignedBlockWithAttestation), - /// Publish an aggregated attestation to the gossip network. - PublishAggregatedAttestation(SignedAggregatedAttestation), - /// Fetch a block by its root hash. - FetchBlock(H256), -} - pub struct BlockChain { handle: ActorRef, } @@ -51,7 +42,6 @@ pub const MILLISECONDS_PER_SLOT: u64 = MILLISECONDS_PER_INTERVAL * INTERVALS_PER impl BlockChain { pub fn spawn( store: Store, - p2p_tx: mpsc::UnboundedSender, validator_keys: HashMap, is_aggregator: bool, ) -> BlockChain { @@ -59,7 +49,10 @@ impl BlockChain { let key_manager = key_manager::KeyManager::new(validator_keys); let handle = BlockChainServer { store, - p2p_tx, + publish_block: None, + publish_attestation: None, + publish_aggregated: None, + fetch_block: None, key_manager, pending_blocks: HashMap::new(), is_aggregator, @@ -77,30 +70,8 @@ impl BlockChain { BlockChain { handle } } - /// Sends a block to the BlockChain for processing. - pub fn notify_new_block(&self, block: SignedBlockWithAttestation) { - let _ = self - .handle - .new_block(block) - .inspect_err(|err| error!(%err, "Failed to notify BlockChain of new block")); - } - - /// Sends an attestation to the BlockChain for processing. - pub fn notify_new_attestation(&self, attestation: SignedAttestation) { - let _ = self - .handle - .new_attestation(attestation) - .inspect_err(|err| error!(%err, "Failed to notify BlockChain of new attestation")); - } - - /// Sends an aggregated attestation to the BlockChain for processing. - pub fn notify_new_aggregated_attestation(&self, attestation: SignedAggregatedAttestation) { - let _ = self - .handle - .new_aggregated_attestation(attestation) - .inspect_err( - |err| error!(%err, "Failed to notify BlockChain of new aggregated attestation"), - ); + pub fn actor_ref(&self) -> &ActorRef { + &self.handle } } @@ -110,9 +81,15 @@ impl BlockChain { /// Right now it also handles block processing, but in the future /// those updates might be done in parallel with only writes being /// processed by this server. -struct BlockChainServer { +pub struct BlockChainServer { store: Store, - p2p_tx: mpsc::UnboundedSender, + + // P2P actor recipients (set via InitP2P message) + publish_block: Option>, + publish_attestation: Option>, + publish_aggregated: Option>, + fetch_block: Option>, + key_manager: key_manager::KeyManager, // Pending block roots waiting for their parent (block data stored in DB) @@ -153,11 +130,14 @@ impl BlockChainServer { self.is_aggregator, ); - for aggregate in new_aggregates { - let _ = self - .p2p_tx - .send(P2PMessage::PublishAggregatedAttestation(aggregate)) - .inspect_err(|err| error!(%err, "Failed to publish aggregated attestation")); + if let Some(ref recipient) = self.publish_aggregated { + for aggregate in new_aggregates { + let _ = recipient + .send(PublishAggregatedAttestation { + attestation: aggregate, + }) + .inspect_err(|err| error!(%err, "Failed to publish aggregated attestation")); + } } // Now build and publish the block (after attestations have been accepted) @@ -223,16 +203,16 @@ impl BlockChainServer { }; // Publish to gossip network - let Ok(_) = self - .p2p_tx - .send(P2PMessage::PublishAttestation(signed_attestation)) - .inspect_err( - |err| error!(%slot, %validator_id, %err, "Failed to publish attestation"), - ) - else { - continue; - }; - info!(%slot, %validator_id, "Published attestation"); + if let Some(ref recipient) = self.publish_attestation { + let _ = recipient + .send(PublishAttestation { + attestation: signed_attestation, + }) + .inspect_err( + |err| error!(%slot, %validator_id, %err, "Failed to publish attestation"), + ); + info!(%slot, %validator_id, "Published attestation"); + } } } @@ -294,13 +274,13 @@ impl BlockChainServer { }; // Publish to gossip network - let Ok(()) = self - .p2p_tx - .send(P2PMessage::PublishBlock(signed_block)) - .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to publish block")) - else { - return; - }; + if let Some(ref recipient) = self.publish_block { + let _ = recipient + .send(PublishBlock { + block: signed_block, + }) + .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to publish block")); + } info!(%slot, %validator_id, "Published block"); } @@ -426,13 +406,14 @@ impl BlockChainServer { fn request_missing_block(&mut self, block_root: H256) { // Send request to P2P layer (deduplication handled by P2P module) - let _ = self - .p2p_tx - .send(P2PMessage::FetchBlock(block_root)) - .inspect(|_| info!(%block_root, "Requested missing block from network")) - .inspect_err( - |err| error!(%block_root, %err, "Failed to send FetchBlock message to P2P"), - ); + if let Some(ref recipient) = self.fetch_block { + let _ = recipient + .send(FetchBlock { root: block_root }) + .inspect(|_| info!(%block_root, "Requested missing block from network")) + .inspect_err( + |err| error!(%block_root, %err, "Failed to send FetchBlock message to P2P"), + ); + } } /// Move pending children of `parent_root` into the work queue for iterative @@ -484,14 +465,11 @@ impl BlockChainServer { } } +// Protocol trait for internal messages only (tick scheduling). +// Network-api messages are handled via manual Handler impls to allow +// Recipient to work across actor boundaries. #[protocol] pub(crate) trait BlockChainProtocol: Send + Sync { - fn new_block(&self, block: SignedBlockWithAttestation) -> Result<(), ActorError>; - fn new_attestation(&self, attestation: SignedAttestation) -> Result<(), ActorError>; - fn new_aggregated_attestation( - &self, - attestation: SignedAggregatedAttestation, - ) -> Result<(), ActorError>; #[allow(dead_code)] // invoked via send_after(Tick), not called directly fn tick(&self) -> Result<(), ActorError>; } @@ -514,31 +492,36 @@ impl BlockChainServer { block_chain_protocol::Tick, ); } +} - #[send_handler] - async fn handle_new_block( - &mut self, - msg: block_chain_protocol::NewBlock, - _ctx: &Context, - ) { +// --- Manual Handler impls for network-api messages (used via Recipient) --- + +use ethlambda_network_api::{NewAggregatedAttestation, NewAttestation, NewBlock}; + +impl Handler for BlockChainServer { + async fn handle(&mut self, msg: InitP2P, _ctx: &Context) { + self.publish_block = Some(msg.publish_block); + self.publish_attestation = Some(msg.publish_attestation); + self.publish_aggregated = Some(msg.publish_aggregated); + self.fetch_block = Some(msg.fetch_block); + info!("P2P recipients initialized"); + } +} + +impl Handler for BlockChainServer { + async fn handle(&mut self, msg: NewBlock, _ctx: &Context) { self.on_block(msg.block); } +} - #[send_handler] - async fn handle_new_attestation( - &mut self, - msg: block_chain_protocol::NewAttestation, - _ctx: &Context, - ) { +impl Handler for BlockChainServer { + async fn handle(&mut self, msg: NewAttestation, _ctx: &Context) { self.on_gossip_attestation(msg.attestation); } +} - #[send_handler] - async fn handle_new_aggregated_attestation( - &mut self, - msg: block_chain_protocol::NewAggregatedAttestation, - _ctx: &Context, - ) { +impl Handler for BlockChainServer { + async fn handle(&mut self, msg: NewAggregatedAttestation, _ctx: &Context) { self.on_gossip_aggregated_attestation(msg.attestation); } } diff --git a/crates/net/api/Cargo.toml b/crates/net/api/Cargo.toml new file mode 100644 index 00000000..fad001ba --- /dev/null +++ b/crates/net/api/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "ethlambda-network-api" +authors.workspace = true +edition.workspace = true +keywords.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[dependencies] +ethlambda-types.workspace = true +spawned-concurrency.workspace = true diff --git a/crates/net/api/src/lib.rs b/crates/net/api/src/lib.rs new file mode 100644 index 00000000..b46d399a --- /dev/null +++ b/crates/net/api/src/lib.rs @@ -0,0 +1,50 @@ +use ethlambda_types::{ + attestation::{SignedAggregatedAttestation, SignedAttestation}, + block::SignedBlockWithAttestation, + primitives::H256, +}; +use spawned_concurrency::tasks::Recipient; + +// --- Messages: BlockChain -> P2P --- + +spawned_concurrency::send_messages! { + PublishBlock { block: SignedBlockWithAttestation }; + PublishAttestation { attestation: SignedAttestation }; + PublishAggregatedAttestation { attestation: SignedAggregatedAttestation }; + FetchBlock { root: H256 } +} + +// --- Messages: P2P -> BlockChain --- + +spawned_concurrency::send_messages! { + NewBlock { block: SignedBlockWithAttestation }; + NewAttestation { attestation: SignedAttestation }; + NewAggregatedAttestation { attestation: SignedAggregatedAttestation } +} + +// --- Init messages --- +// Defined manually because #[protocol] requires Clone, and send_messages! +// doesn't derive it. + +use spawned_concurrency::message::Message; + +#[derive(Clone)] +pub struct InitP2P { + pub publish_block: Recipient, + pub publish_attestation: Recipient, + pub publish_aggregated: Recipient, + pub fetch_block: Recipient, +} +impl Message for InitP2P { + type Result = (); +} + +#[derive(Clone)] +pub struct InitBlockChain { + pub new_block: Recipient, + pub new_attestation: Recipient, + pub new_aggregated: Recipient, +} +impl Message for InitBlockChain { + type Result = (); +} diff --git a/crates/net/p2p/Cargo.toml b/crates/net/p2p/Cargo.toml index 9c26e343..d91643cd 100644 --- a/crates/net/p2p/Cargo.toml +++ b/crates/net/p2p/Cargo.toml @@ -10,12 +10,16 @@ rust-version.workspace = true version.workspace = true [dependencies] -ethlambda-blockchain.workspace = true +ethlambda-network-api.workspace = true ethlambda-storage.workspace = true ethlambda-metrics.workspace = true ethlambda-types.workspace = true +spawned-concurrency.workspace = true + async-trait = "0.1" +futures = "0.3" +tokio-stream = "0.1" # Fork with request-response feature for outbound protocol selection libp2p = { git = "https://github.com/lambdaclass/rust-libp2p.git", rev = "cd6cc3b1e5db2c5e23e133c2201c23b063fc4895", features = [ diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs index 8f156a22..6cea46aa 100644 --- a/crates/net/p2p/src/gossipsub/handler.rs +++ b/crates/net/p2p/src/gossipsub/handler.rs @@ -1,3 +1,4 @@ +use ethlambda_network_api::{NewAggregatedAttestation, NewAttestation, NewBlock}; use ethlambda_types::{ ShortRoot, attestation::{SignedAggregatedAttestation, SignedAttestation}, @@ -20,7 +21,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { message, } = event else { - unreachable!("we already matched on event_loop"); + unreachable!("we already matched on Message variant in handle_swarm_event"); }; let topic_kind = message.topic.as_str().split("/").nth(3); match topic_kind { @@ -49,7 +50,13 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { attestation_count, "Received block from gossip" ); - server.blockchain.notify_new_block(signed_block); + if let Some(ref recipient) = server.new_block { + let _ = recipient + .send(NewBlock { + block: signed_block, + }) + .inspect_err(|err| error!(%err, "Failed to forward block to blockchain")); + } } Some(AGGREGATION_TOPIC_KIND) => { let Ok(uncompressed_data) = decompress_message(&message.data) @@ -72,9 +79,15 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { source_root = %ShortRoot(&aggregation.data.source.root.0), "Received aggregated attestation from gossip" ); - server - .blockchain - .notify_new_aggregated_attestation(aggregation); + if let Some(ref recipient) = server.new_aggregated { + let _ = recipient + .send(NewAggregatedAttestation { + attestation: aggregation, + }) + .inspect_err( + |err| error!(%err, "Failed to forward aggregated attestation to blockchain"), + ); + } } Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => { let Ok(uncompressed_data) = decompress_message(&message.data) @@ -100,7 +113,13 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { source_root = %ShortRoot(&signed_attestation.data.source.root.0), "Received attestation from gossip" ); - server.blockchain.notify_new_attestation(signed_attestation); + if let Some(ref recipient) = server.new_attestation { + let _ = recipient + .send(NewAttestation { + attestation: signed_attestation, + }) + .inspect_err(|err| error!(%err, "Failed to forward attestation to blockchain")); + } } _ => { trace!("Received message on unknown topic: {}", message.topic); @@ -119,23 +138,18 @@ pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAtte let compressed = compress_message(&ssz_bytes); // Publish to the attestation subnet topic - let _ = server - .swarm - .behaviour_mut() - .gossipsub - .publish(server.attestation_topic.clone(), compressed) - .inspect(|_| info!( - %slot, - validator, - target_slot = attestation.data.target.slot, - target_root = %ShortRoot(&attestation.data.target.root.0), - source_slot = attestation.data.source.slot, - source_root = %ShortRoot(&attestation.data.source.root.0), - "Published attestation to gossipsub" - )) - .inspect_err(|err| { - tracing::warn!(%slot, %validator, %err, "Failed to publish attestation to gossipsub") - }); + server + .swarm_handle + .publish(server.attestation_topic.clone(), compressed); + info!( + %slot, + validator, + target_slot = attestation.data.target.slot, + target_root = %ShortRoot(&attestation.data.target.root.0), + source_slot = attestation.data.source.slot, + source_root = %ShortRoot(&attestation.data.source.root.0), + "Published attestation to gossipsub" + ); } pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlockWithAttestation) { @@ -152,24 +166,17 @@ pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlockWith let compressed = compress_message(&ssz_bytes); // Publish to gossipsub - let _ = server - .swarm - .behaviour_mut() - .gossipsub - .publish(server.block_topic.clone(), compressed) - .inspect(|_| { - info!( - %slot, - proposer, - block_root = %ShortRoot(&block_root.0), - parent_root = %ShortRoot(&parent_root.0), - attestation_count, - "Published block to gossipsub" - ) - }) - .inspect_err( - |err| tracing::warn!(%slot, %proposer, %err, "Failed to publish block to gossipsub"), - ); + server + .swarm_handle + .publish(server.block_topic.clone(), compressed); + info!( + %slot, + proposer, + block_root = %ShortRoot(&block_root.0), + parent_root = %ShortRoot(&parent_root.0), + attestation_count, + "Published block to gossipsub" + ); } pub async fn publish_aggregated_attestation( @@ -185,22 +192,15 @@ pub async fn publish_aggregated_attestation( let compressed = compress_message(&ssz_bytes); // Publish to the aggregation topic - let _ = server - .swarm - .behaviour_mut() - .gossipsub - .publish(server.aggregation_topic.clone(), compressed) - .inspect(|_| { - info!( - %slot, - target_slot = attestation.data.target.slot, - target_root = %ShortRoot(&attestation.data.target.root.0), - source_slot = attestation.data.source.slot, - source_root = %ShortRoot(&attestation.data.source.root.0), - "Published aggregated attestation to gossipsub" - ) - }) - .inspect_err(|err| { - tracing::warn!(%slot, %err, "Failed to publish aggregated attestation to gossipsub") - }); + server + .swarm_handle + .publish(server.aggregation_topic.clone(), compressed); + info!( + %slot, + target_slot = attestation.data.target.slot, + target_root = %ShortRoot(&attestation.data.target.root.0), + source_slot = attestation.data.source.slot, + source_root = %ShortRoot(&attestation.data.source.root.0), + "Published aggregated attestation to gossipsub" + ); } diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 2ebec1d3..1d4b6767 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -4,15 +4,18 @@ use std::{ time::Duration, }; -use ethlambda_blockchain::{BlockChain, P2PMessage}; +use ethlambda_network_api::{ + FetchBlock, InitBlockChain, NewAggregatedAttestation, NewAttestation, NewBlock, + PublishAggregatedAttestation, PublishAttestation, PublishBlock, +}; use ethlambda_storage::Store; use ethlambda_types::primitives::H256; use ethrex_common::H264; use ethrex_p2p::types::NodeRecord; use ethrex_rlp::decode::RLPDecode; +use futures::StreamExt; use libp2p::{ Multiaddr, PeerId, StreamProtocol, - futures::StreamExt, gossipsub::{MessageAuthenticity, ValidationMode}, identity::{PublicKey, secp256k1}, multiaddr::Protocol, @@ -20,7 +23,13 @@ use libp2p::{ swarm::{NetworkBehaviour, SwarmEvent}, }; use sha2::Digest; -use tokio::sync::mpsc; +use spawned_concurrency::actor; +use spawned_concurrency::error::ActorError; +use spawned_concurrency::message::Message; +use spawned_concurrency::protocol; +use spawned_concurrency::tasks::{ + Actor, ActorRef, ActorStart, Context, Handler, Recipient, send_after, spawn_listener, +}; use tracing::{info, trace, warn}; use crate::{ @@ -32,11 +41,13 @@ use crate::{ BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status, fetch_block_from_peer, }, + swarm_adapter::SwarmHandle, }; mod gossipsub; pub mod metrics; mod req_resp; +pub(crate) mod swarm_adapter; pub use metrics::populate_name_registry; @@ -46,29 +57,44 @@ const INITIAL_BACKOFF_MS: u64 = 5; const BACKOFF_MULTIPLIER: u64 = 2; const PEER_REDIAL_INTERVAL_SECS: u64 = 12; -enum RetryMessage { - BlockFetch(H256), - PeerRedial(PeerId), -} - pub(crate) struct PendingRequest { pub(crate) attempts: u32, pub(crate) last_peer: Option, } -#[allow(clippy::too_many_arguments)] -pub async fn start_p2p( - node_key: Vec, - bootnodes: Vec, - listening_socket: SocketAddr, - blockchain: BlockChain, - p2p_rx: mpsc::UnboundedReceiver, - store: Store, - validator_id: Option, - attestation_committee_count: u64, - is_aggregator: bool, -) -> Result<(), libp2p::gossipsub::SubscriptionError> { - let config = libp2p::gossipsub::ConfigBuilder::default() +// --- Swarm construction --- + +/// [libp2p Behaviour](libp2p::swarm::NetworkBehaviour) combining Gossipsub and Request-Response Behaviours +#[derive(NetworkBehaviour)] +pub(crate) struct Behaviour { + gossipsub: libp2p::gossipsub::Behaviour, + req_resp: request_response::Behaviour, +} + +/// Configuration for building the libp2p swarm. +pub struct SwarmConfig { + pub node_key: Vec, + pub bootnodes: Vec, + pub listening_socket: SocketAddr, + pub validator_id: Option, + pub attestation_committee_count: u64, + pub is_aggregator: bool, +} + +/// Result of building the swarm — contains all pieces needed to start the P2P actor. +pub struct BuiltSwarm { + pub(crate) swarm: libp2p::Swarm, + pub(crate) attestation_topic: libp2p::gossipsub::IdentTopic, + pub(crate) block_topic: libp2p::gossipsub::IdentTopic, + pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic, + pub(crate) bootnode_addrs: HashMap, +} + +/// Build and configure the libp2p swarm, dial bootnodes, subscribe to topics. +pub fn build_swarm( + config: SwarmConfig, +) -> Result { + let gossipsub_config = libp2p::gossipsub::ConfigBuilder::default() // d .mesh_n(8) // d_low @@ -93,8 +119,9 @@ pub async fn start_p2p( .build() .expect("invalid gossipsub config"); - let gossipsub = libp2p::gossipsub::Behaviour::new(MessageAuthenticity::Anonymous, config) - .expect("failed to initiate behaviour"); + let gossipsub = + libp2p::gossipsub::Behaviour::new(MessageAuthenticity::Anonymous, gossipsub_config) + .expect("failed to initiate behaviour"); let req_resp = request_response::Behaviour::new( vec![ @@ -117,24 +144,23 @@ pub async fn start_p2p( // TODO: set peer scoring params - let secret_key = secp256k1::SecretKey::try_from_bytes(node_key).expect("invalid node key"); + let secret_key = + secp256k1::SecretKey::try_from_bytes(config.node_key).expect("invalid node key"); let identity = libp2p::identity::Keypair::from(secp256k1::Keypair::from(secret_key)); - // TODO: implement Executor with spawned? - // libp2p::swarm::Config::with_executor(executor) let mut swarm = libp2p::SwarmBuilder::with_existing_identity(identity) .with_tokio() .with_quic() .with_behaviour(|_| behavior) .expect("failed to add behaviour to swarm") - .with_swarm_config(|config| { + .with_swarm_config(|c| { // Disable idle connection timeout - config.with_idle_connection_timeout(Duration::from_secs(u64::MAX)) + c.with_idle_connection_timeout(Duration::from_secs(u64::MAX)) }) .build(); let local_peer_id = *swarm.local_peer_id(); let mut bootnode_addrs = HashMap::new(); - for bootnode in bootnodes { + for bootnode in config.bootnodes { let peer_id = PeerId::from_public_key(&bootnode.public_key); if peer_id == local_peer_id { continue; @@ -149,8 +175,8 @@ pub async fn start_p2p( swarm.dial(addr).unwrap(); } let addr = Multiaddr::empty() - .with(listening_socket.ip().into()) - .with(Protocol::Udp(listening_socket.port())) + .with(config.listening_socket.ip().into()) + .with(Protocol::Udp(config.listening_socket.port())) .with(Protocol::QuicV1); swarm .listen_on(addr) @@ -179,7 +205,9 @@ pub async fn start_p2p( // Build attestation subnet topic (needed for publishing even without subscribing) // attestation_committee_count is validated to be >= 1 by clap at CLI parse time. - let subnet_id = validator_id.map(|vid| vid % attestation_committee_count); + let subnet_id = config + .validator_id + .map(|vid| vid % config.attestation_committee_count); let attestation_topic_kind = match subnet_id { Some(id) => format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_{id}"), // Non-validators use subnet 0 for publishing @@ -191,7 +219,7 @@ pub async fn start_p2p( // Only aggregators subscribe to attestation subnets; non-aggregators // publish via gossipsub's fanout mechanism without subscribing. - if is_aggregator { + if config.is_aggregator { swarm .behaviour_mut() .gossipsub @@ -199,88 +227,191 @@ pub async fn start_p2p( info!(%attestation_topic_kind, "Subscribed to attestation subnet"); } - info!(socket=%listening_socket, "P2P node started"); - - let (retry_tx, retry_rx) = mpsc::unbounded_channel(); + info!(socket=%config.listening_socket, "P2P node started"); - let server = P2PServer { + Ok(BuiltSwarm { swarm, - blockchain, - store, - p2p_rx, attestation_topic, block_topic, aggregation_topic, - connected_peers: HashSet::new(), - pending_requests: HashMap::new(), - request_id_map: HashMap::new(), bootnode_addrs, - retry_tx, - retry_rx, - }; + }) +} + +// --- P2P Actor --- - event_loop(server).await; - Ok(()) +/// Public handle to the P2P actor. +pub struct P2P { + handle: ActorRef, } -/// [libp2p Behaviour](libp2p::swarm::NetworkBehaviour) combining Gossipsub and Request-Response Behaviours -#[derive(NetworkBehaviour)] -pub(crate) struct Behaviour { - gossipsub: libp2p::gossipsub::Behaviour, - req_resp: request_response::Behaviour, +impl P2P { + /// Build swarm, start I/O adapter, spawn actor, and wire the swarm event stream. + pub fn spawn(built: BuiltSwarm, store: Store) -> P2P { + let (swarm_stream, swarm_handle) = swarm_adapter::start_swarm_adapter(built.swarm); + + let server = P2PServer { + swarm_handle, + store, + new_block: None, + new_attestation: None, + new_aggregated: None, + attestation_topic: built.attestation_topic, + block_topic: built.block_topic, + aggregation_topic: built.aggregation_topic, + connected_peers: HashSet::new(), + pending_requests: HashMap::new(), + request_id_map: HashMap::new(), + bootnode_addrs: built.bootnode_addrs, + }; + let handle = server.start(); + spawn_listener(handle.context(), swarm_stream.map(WrappedSwarmEvent)); + P2P { handle } + } + + pub fn actor_ref(&self) -> &ActorRef { + &self.handle + } } -pub(crate) struct P2PServer { - pub(crate) swarm: libp2p::Swarm, - pub(crate) blockchain: BlockChain, +/// Message wrapper for swarm events. Not part of the protocol because +/// `SwarmEvent` contains non-Clone types (e.g. `ResponseChannel`). +pub(crate) struct WrappedSwarmEvent(SwarmEvent); +impl Message for WrappedSwarmEvent { + type Result = (); +} + +/// P2P actor state. +pub struct P2PServer { + pub(crate) swarm_handle: SwarmHandle, pub(crate) store: Store, - pub(crate) p2p_rx: mpsc::UnboundedReceiver, + + // BlockChain actor recipients (set via InitBlockChain message) + pub(crate) new_block: Option>, + pub(crate) new_attestation: Option>, + pub(crate) new_aggregated: Option>, + pub(crate) attestation_topic: libp2p::gossipsub::IdentTopic, pub(crate) block_topic: libp2p::gossipsub::IdentTopic, pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic, + pub(crate) connected_peers: HashSet, - pub(crate) pending_requests: HashMap, - pub(crate) request_id_map: HashMap, - /// Bootnode addresses for redialing when disconnected + pub(crate) pending_requests: HashMap, + pub(crate) request_id_map: HashMap, bootnode_addrs: HashMap, - /// Channel for scheduling retries (block fetches and peer redials) - pub(crate) retry_tx: mpsc::UnboundedSender, - retry_rx: mpsc::UnboundedReceiver, } -/// Event loop for the P2P crate. -/// Processes swarm events, incoming requests, responses, gossip, and outgoing messages from blockchain. -async fn event_loop(mut server: P2PServer) { - loop { - tokio::select! { - biased; - - message = server.p2p_rx.recv() => { - let Some(message) = message else { - break; - }; - handle_p2p_message(&mut server, message).await; - } - event = server.swarm.next() => { - let Some(event) = event else { - break; - }; - handle_swarm_event(&mut server, event).await; - } - Some(msg) = server.retry_rx.recv() => { - match msg { - RetryMessage::BlockFetch(root) => handle_retry(&mut server, root).await, - RetryMessage::PeerRedial(peer_id) => handle_peer_redial(&mut server, peer_id).await, - } - } +// Protocol trait for internal messages only (retry scheduling). +// Network-api messages and swarm events are handled via manual Handler impls. +#[protocol] +pub(crate) trait P2PProtocol: Send + Sync { + #[allow(dead_code)] // invoked via send_after, not called directly + fn retry_block_fetch(&self, root: H256) -> Result<(), ActorError>; + #[allow(dead_code)] // invoked via send_after, not called directly + fn retry_peer_redial(&self, peer_id: PeerId) -> Result<(), ActorError>; +} + +#[actor(protocol = P2PProtocol)] +impl P2PServer { + #[send_handler] + async fn handle_retry_block_fetch( + &mut self, + msg: p2_p_protocol::RetryBlockFetch, + _ctx: &Context, + ) { + let root = msg.root; + // Check if still pending (might have succeeded during backoff) + if !self.pending_requests.contains_key(&root) { + trace!(%root, "Block fetch completed during backoff, skipping retry"); + return; + } + + info!(%root, "Retrying block fetch after backoff"); + + if !fetch_block_from_peer(self, root).await { + tracing::error!(%root, "Failed to retry block fetch, giving up"); + self.pending_requests.remove(&root); + } + } + + #[send_handler] + async fn handle_retry_peer_redial( + &mut self, + msg: p2_p_protocol::RetryPeerRedial, + _ctx: &Context, + ) { + let peer_id = msg.peer_id; + + // Skip if already reconnected + if self.connected_peers.contains(&peer_id) { + trace!(%peer_id, "Bootnode reconnected during redial delay, skipping"); + return; + } + + if let Some(addr) = self.bootnode_addrs.get(&peer_id) { + info!(%peer_id, "Redialing disconnected bootnode"); + self.swarm_handle.dial(addr.clone()); } } } -async fn handle_swarm_event(server: &mut P2PServer, event: SwarmEvent) { +// --- Manual Handler impls for network-api messages --- + +impl Handler for P2PServer { + async fn handle(&mut self, msg: InitBlockChain, _ctx: &Context) { + self.new_block = Some(msg.new_block); + self.new_attestation = Some(msg.new_attestation); + self.new_aggregated = Some(msg.new_aggregated); + info!("BlockChain recipients initialized"); + } +} + +impl Handler for P2PServer { + async fn handle(&mut self, msg: PublishBlock, _ctx: &Context) { + publish_block(self, msg.block).await; + } +} + +impl Handler for P2PServer { + async fn handle(&mut self, msg: PublishAttestation, _ctx: &Context) { + publish_attestation(self, msg.attestation).await; + } +} + +impl Handler for P2PServer { + async fn handle(&mut self, msg: PublishAggregatedAttestation, _ctx: &Context) { + publish_aggregated_attestation(self, msg.attestation).await; + } +} + +impl Handler for P2PServer { + async fn handle(&mut self, msg: FetchBlock, _ctx: &Context) { + let root = msg.root; + // Deduplicate - if already pending, ignore + if self.pending_requests.contains_key(&root) { + trace!(%root, "Block fetch already in progress, ignoring duplicate"); + return; + } + fetch_block_from_peer(self, root).await; + } +} + +// --- Manual Handler for swarm events --- + +impl Handler for P2PServer { + async fn handle(&mut self, msg: WrappedSwarmEvent, ctx: &Context) { + handle_swarm_event(self, msg.0, ctx).await; + } +} + +async fn handle_swarm_event( + server: &mut P2PServer, + event: SwarmEvent, + ctx: &Context, +) { match event { SwarmEvent::Behaviour(BehaviourEvent::ReqResp(req_resp_event)) => { - req_resp::handle_req_resp_message(server, req_resp_event).await; + req_resp::handle_req_resp_message(server, req_resp_event, ctx).await; } SwarmEvent::Behaviour(BehaviourEvent::Gossipsub( message @ libp2p::gossipsub::Event::Message { .. }, @@ -311,14 +442,13 @@ async fn handle_swarm_event(server: &mut P2PServer, event: SwarmEvent { - publish_attestation(server, attestation).await; - } - P2PMessage::PublishBlock(signed_block) => { - publish_block(server, signed_block).await; - } - P2PMessage::PublishAggregatedAttestation(attestation) => { - publish_aggregated_attestation(server, attestation).await; - } - P2PMessage::FetchBlock(root) => { - // Deduplicate - if already pending, ignore - if server.pending_requests.contains_key(&root) { - trace!(%root, "Block fetch already in progress, ignoring duplicate"); - return; - } - - // Send request and track it (tracking handled internally by fetch_block_from_peer) - fetch_block_from_peer(server, root).await; - } - } -} - -async fn handle_retry(server: &mut P2PServer, root: H256) { - // Check if still pending (might have succeeded during backoff) - if !server.pending_requests.contains_key(&root) { - trace!(%root, "Block fetch completed during backoff, skipping retry"); - return; - } - - info!(%root, "Retrying block fetch after backoff"); - - // Retry the fetch (tracking handled internally by fetch_block_from_peer) - if !fetch_block_from_peer(server, root).await { - tracing::error!(%root, "Failed to retry block fetch, giving up"); - server.pending_requests.remove(&root); - } -} - -async fn handle_peer_redial(server: &mut P2PServer, peer_id: PeerId) { - // Skip if already reconnected - if server.connected_peers.contains(&peer_id) { - trace!(%peer_id, "Bootnode reconnected during redial delay, skipping"); - return; - } - - if let Some(addr) = server.bootnode_addrs.get(&peer_id) { - info!(%peer_id, "Redialing disconnected bootnode"); - // NOTE: this dial does some checks and adds a pending outbound connection attempt. - // It does NOT block. If the dial fails, we'll later get an OutgoingConnectionError event. - let _ = server.swarm.dial(addr.clone()).inspect_err(|e| { - warn!(%peer_id, %e, "Failed to redial bootnode, will retry"); - // Schedule another redial attempt - schedule_peer_redial(server.retry_tx.clone(), peer_id); - }); - } -} - -/// Schedules a peer redial after the configured delay interval. -pub(crate) fn schedule_peer_redial(retry_tx: mpsc::UnboundedSender, peer_id: PeerId) { - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(PEER_REDIAL_INTERVAL_SECS)).await; - let _ = retry_tx.send(RetryMessage::PeerRedial(peer_id)); - }); -} +// --- Bootnode parsing --- pub struct Bootnode { pub(crate) ip: IpAddr, @@ -524,6 +597,8 @@ pub fn parse_enrs(enrs: Vec) -> Vec { bootnodes } +// --- Utility functions --- + fn connection_direction(endpoint: &libp2p::core::ConnectedPoint) -> &'static str { if endpoint.is_dialer() { "outbound" diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index e08f4357..6e3623f4 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -1,7 +1,9 @@ +use ethlambda_network_api::NewBlock; use ethlambda_storage::Store; use libp2p::{PeerId, request_response}; use rand::seq::SliceRandom; -use tokio::time::Duration; +use spawned_concurrency::tasks::{Context, send_after}; +use std::time::Duration; use tracing::{debug, error, info, warn}; use ethlambda_types::checkpoint::Checkpoint; @@ -13,12 +15,13 @@ use super::{ }; use crate::{ BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest, - RetryMessage, req_resp::RequestedBlockRoots, + p2_p_protocol, req_resp::RequestedBlockRoots, }; pub async fn handle_req_resp_message( server: &mut P2PServer, event: request_response::Event, + ctx: &Context, ) { match event { request_response::Event::Message { peer, message, .. } => match message { @@ -41,7 +44,7 @@ pub async fn handle_req_resp_message( handle_status_response(status, peer).await; } ResponsePayload::BlocksByRoot(blocks) => { - handle_blocks_by_root_response(server, blocks, peer, request_id).await; + handle_blocks_by_root_response(server, blocks, peer, request_id, ctx).await; } }, Response::Error { code, message } => { @@ -60,7 +63,7 @@ pub async fn handle_req_resp_message( // Check if this was a block fetch request if let Some(root) = server.request_id_map.remove(&request_id) { - handle_fetch_failure(server, root, peer).await; + handle_fetch_failure(server, root, peer, ctx).await; } } request_response::Event::InboundFailure { @@ -87,15 +90,8 @@ async fn handle_status_request( ) { info!(finalized_slot=%request.finalized.slot, head_slot=%request.head.slot, "Received status request from peer {peer}"); let our_status = build_status(&server.store); - server - .swarm - .behaviour_mut() - .req_resp - .send_response( - channel, - Response::success(ResponsePayload::Status(our_status)), - ) - .unwrap(); + let response = Response::success(ResponsePayload::Status(our_status)); + server.swarm_handle.send_response(channel, response); } async fn handle_status_response(status: Status, peer: PeerId) { @@ -123,12 +119,7 @@ async fn handle_blocks_by_root_request( info!(%peer, num_roots, found, "Responding to BlocksByRoot request"); let response = Response::success(ResponsePayload::BlocksByRoot(blocks)); - let _ = server - .swarm - .behaviour_mut() - .req_resp - .send_response(channel, response) - .inspect_err(|err| warn!(%peer, ?err, "Failed to send BlocksByRoot response")); + server.swarm_handle.send_response(channel, response); } async fn handle_blocks_by_root_response( @@ -136,6 +127,7 @@ async fn handle_blocks_by_root_response( blocks: Vec, peer: PeerId, request_id: request_response::OutboundRequestId, + ctx: &Context, ) { info!(%peer, count = blocks.len(), "Received BlocksByRoot response"); @@ -148,7 +140,7 @@ async fn handle_blocks_by_root_response( if blocks.is_empty() { server.request_id_map.insert(request_id, requested_root); warn!(%peer, "Received empty BlocksByRoot response"); - handle_fetch_failure(server, requested_root, peer).await; + handle_fetch_failure(server, requested_root, peer, ctx).await; return; } @@ -169,7 +161,11 @@ async fn handle_blocks_by_root_response( // Clean up tracking for this root server.pending_requests.remove(&root); - server.blockchain.notify_new_block(block); + if let Some(ref recipient) = server.new_block { + let _ = recipient + .send(NewBlock { block }) + .inspect_err(|err| error!(%err, "Failed to forward fetched block to blockchain")); + } } } @@ -217,15 +213,18 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { let request = BlocksByRootRequest { roots }; info!(%peer, %root, "Sending BlocksByRoot request for missing block"); - let request_id = server - .swarm - .behaviour_mut() - .req_resp - .send_request_with_protocol( - &peer, + let Some(request_id) = server + .swarm_handle + .send_request( + peer, Request::BlocksByRoot(request), libp2p::StreamProtocol::new(BLOCKS_BY_ROOT_PROTOCOL_V1), - ); + ) + .await + else { + warn!(%root, "Failed to send BlocksByRoot request (swarm adapter closed)"); + return false; + }; // Track the request if not already tracked (new request) let pending = server @@ -245,7 +244,12 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { true } -async fn handle_fetch_failure(server: &mut P2PServer, root: H256, peer: PeerId) { +async fn handle_fetch_failure( + server: &mut P2PServer, + root: H256, + peer: PeerId, + ctx: &Context, +) { let Some(pending) = server.pending_requests.get_mut(&root) else { return; }; @@ -264,9 +268,9 @@ async fn handle_fetch_failure(server: &mut P2PServer, root: H256, peer: PeerId) pending.attempts += 1; - let retry_tx = server.retry_tx.clone(); - tokio::spawn(async move { - tokio::time::sleep(backoff).await; - let _ = retry_tx.send(RetryMessage::BlockFetch(root)); - }); + send_after( + backoff, + ctx.clone(), + p2_p_protocol::RetryBlockFetch { root }, + ); } diff --git a/crates/net/p2p/src/swarm_adapter.rs b/crates/net/p2p/src/swarm_adapter.rs new file mode 100644 index 00000000..c0f3bb1b --- /dev/null +++ b/crates/net/p2p/src/swarm_adapter.rs @@ -0,0 +1,159 @@ +use libp2p::{ + Multiaddr, PeerId, StreamProtocol, + futures::StreamExt, + request_response::{self, OutboundRequestId}, + swarm::SwarmEvent, +}; +use tokio::sync::mpsc; +use tracing::warn; + +use crate::{Behaviour, BehaviourEvent, req_resp::Request, req_resp::Response}; + +pub enum SwarmCommand { + Publish { + topic: libp2p::gossipsub::IdentTopic, + data: Vec, + }, + Dial(Multiaddr), + SendRequest { + peer: PeerId, + request: Request, + protocol: StreamProtocol, + /// Callback to report the assigned OutboundRequestId. + request_id_tx: Option>, + }, + SendResponse { + channel: request_response::ResponseChannel, + response: Response, + }, +} + +#[derive(Clone)] +pub struct SwarmHandle { + cmd_tx: mpsc::UnboundedSender, +} + +impl SwarmHandle { + pub fn publish(&self, topic: libp2p::gossipsub::IdentTopic, data: Vec) { + let _ = self + .cmd_tx + .send(SwarmCommand::Publish { topic, data }) + .inspect_err(|_| warn!("Swarm adapter closed, cannot publish")); + } + + pub fn dial(&self, addr: Multiaddr) { + let _ = self + .cmd_tx + .send(SwarmCommand::Dial(addr)) + .inspect_err(|_| warn!("Swarm adapter closed, cannot dial")); + } + + /// Send a request and return the assigned OutboundRequestId. + /// Must be called from an async context (actor handlers are async). + pub async fn send_request( + &self, + peer: PeerId, + request: Request, + protocol: StreamProtocol, + ) -> Option { + let (tx, rx) = tokio::sync::oneshot::channel(); + if self + .cmd_tx + .send(SwarmCommand::SendRequest { + peer, + request, + protocol, + request_id_tx: Some(tx), + }) + .is_err() + { + warn!("Swarm adapter closed, cannot send request"); + return None; + } + rx.await.ok() + } + + pub fn send_response( + &self, + channel: request_response::ResponseChannel, + response: Response, + ) { + let _ = self + .cmd_tx + .send(SwarmCommand::SendResponse { channel, response }) + .inspect_err(|_| warn!("Swarm adapter closed, cannot send response")); + } +} + +pub fn start_swarm_adapter( + swarm: libp2p::Swarm, +) -> ( + impl futures::Stream>, + SwarmHandle, +) { + let (event_tx, event_rx) = mpsc::unbounded_channel(); + let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); + + tokio::spawn(swarm_loop(swarm, event_tx, cmd_rx)); + + let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(event_rx); + let handle = SwarmHandle { cmd_tx }; + (stream, handle) +} + +async fn swarm_loop( + mut swarm: libp2p::Swarm, + event_tx: mpsc::UnboundedSender>, + mut cmd_rx: mpsc::UnboundedReceiver, +) { + loop { + tokio::select! { + event = swarm.next() => { + let Some(event) = event else { break }; + if event_tx.send(event).is_err() { break } + } + cmd = cmd_rx.recv() => { + let Some(cmd) = cmd else { break }; + execute_command(&mut swarm, cmd); + } + } + } +} + +fn execute_command(swarm: &mut libp2p::Swarm, cmd: SwarmCommand) { + match cmd { + SwarmCommand::Publish { topic, data } => { + let _ = swarm + .behaviour_mut() + .gossipsub + .publish(topic, data) + .inspect_err(|err| warn!(%err, "Swarm adapter: publish failed")); + } + SwarmCommand::Dial(addr) => { + let _ = swarm + .dial(addr) + .inspect_err(|err| warn!(%err, "Swarm adapter: dial failed")); + } + SwarmCommand::SendRequest { + peer, + request, + protocol, + request_id_tx, + } => { + let request_id = swarm + .behaviour_mut() + .req_resp + .send_request_with_protocol(&peer, request, protocol); + if let Some(tx) = request_id_tx { + let _ = tx.send(request_id); + } + } + SwarmCommand::SendResponse { channel, response } => { + let _ = swarm + .behaviour_mut() + .req_resp + .send_response(channel, response) + .inspect_err(|err| warn!(?err, "Swarm adapter: send_response failed")); + } + } +}