From 946eff0dbe030a50fc1ffbcedb603e7c643c541c Mon Sep 17 00:00:00 2001 From: TheNewAutonomy Date: Thu, 26 Feb 2026 23:48:01 +0000 Subject: [PATCH] p2p: make safety limits configurable Add configurable safety limits for message/frame caps, per-peer/per-conn budgets, hop limits, and dial jitter/backoff caps. Wire these from node config into the networking layer and make the relay dedup cache bounds configurable as well. Made-with: Cursor --- crates/catalyst-cli/src/config.rs | 103 +++++++++++++++++++++++++ crates/catalyst-cli/src/node.rs | 56 ++++++++++++-- crates/catalyst-network/src/config.rs | 88 +++++++++++++++++++++ crates/catalyst-network/src/service.rs | 35 +++++---- crates/catalyst-network/src/simple.rs | 46 ++++++----- 5 files changed, 288 insertions(+), 40 deletions(-) diff --git a/crates/catalyst-cli/src/config.rs b/crates/catalyst-cli/src/config.rs index c50a419..d7acb07 100644 --- a/crates/catalyst-cli/src/config.rs +++ b/crates/catalyst-cli/src/config.rs @@ -141,6 +141,62 @@ pub struct NetworkConfig { /// Network timeouts pub timeouts: NetworkTimeouts, + + /// P2P safety limits (DoS bounding) applied by the networking layer. + #[serde(default)] + pub safety_limits: P2pSafetyLimits, + + /// Relay cache bounds (used for multi-hop rebroadcast + dedup). + #[serde(default)] + pub relay_cache: RelayCacheConfig, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct P2pSafetyLimits { + pub max_gossip_message_bytes: usize, + pub per_peer_max_msgs_per_sec: u32, + pub per_peer_max_bytes_per_sec: usize, + pub max_tcp_frame_bytes: usize, + pub per_conn_max_msgs_per_sec: u32, + pub per_conn_max_bytes_per_sec: usize, + pub max_hops: u8, + pub dedup_cache_max_entries: usize, + pub dial_jitter_max_ms: u64, + pub dial_backoff_max_ms: u64, +} + +impl Default for P2pSafetyLimits { + fn default() -> Self { + Self { + max_gossip_message_bytes: 8 * 1024 * 1024, + per_peer_max_msgs_per_sec: 200, + per_peer_max_bytes_per_sec: 8 * 1024 * 1024, + max_tcp_frame_bytes: 8 * 1024 * 1024, + per_conn_max_msgs_per_sec: 200, + per_conn_max_bytes_per_sec: 8 * 1024 * 1024, + max_hops: 10, + dedup_cache_max_entries: 20_000, + dial_jitter_max_ms: 250, + dial_backoff_max_ms: 60_000, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RelayCacheConfig { + pub max_entries: usize, + pub target_entries: usize, + pub retention_seconds: u64, +} + +impl Default for RelayCacheConfig { + fn default() -> Self { + Self { + max_entries: 5000, + target_entries: 4000, + retention_seconds: 10 * 60, + } + } } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -458,6 +514,8 @@ impl Default for NodeConfig { request_timeout: 10, keep_alive_interval: 30, }, + safety_limits: P2pSafetyLimits::default(), + relay_cache: RelayCacheConfig::default(), }, storage: StorageConfig { data_dir: PathBuf::from("data"), @@ -669,6 +727,51 @@ impl NodeConfig { if self.network.listen_addresses.is_empty() { return Err(anyhow::anyhow!("At least one listen address must be specified")); } + + // Validate P2P safety limits + { + let sl = &self.network.safety_limits; + if sl.max_gossip_message_bytes == 0 + || sl.max_tcp_frame_bytes == 0 + || sl.per_peer_max_msgs_per_sec == 0 + || sl.per_conn_max_msgs_per_sec == 0 + || sl.per_peer_max_bytes_per_sec == 0 + || sl.per_conn_max_bytes_per_sec == 0 + { + return Err(anyhow::anyhow!("network.safety_limits values must be > 0")); + } + if sl.max_hops == 0 { + return Err(anyhow::anyhow!("network.safety_limits.max_hops must be > 0")); + } + if sl.dedup_cache_max_entries == 0 { + return Err(anyhow::anyhow!( + "network.safety_limits.dedup_cache_max_entries must be > 0" + )); + } + if sl.dial_backoff_max_ms == 0 { + return Err(anyhow::anyhow!( + "network.safety_limits.dial_backoff_max_ms must be > 0" + )); + } + } + + // Validate relay cache bounds + { + let rc = &self.network.relay_cache; + if rc.max_entries == 0 || rc.target_entries == 0 { + return Err(anyhow::anyhow!("network.relay_cache max/target must be > 0")); + } + if rc.max_entries < rc.target_entries { + return Err(anyhow::anyhow!( + "network.relay_cache.max_entries must be >= target_entries" + )); + } + if rc.retention_seconds == 0 { + return Err(anyhow::anyhow!( + "network.relay_cache.retention_seconds must be > 0" + )); + } + } // Validate consensus configuration if self.consensus.cycle_duration_seconds < 10 { diff --git a/crates/catalyst-cli/src/node.rs b/crates/catalyst-cli/src/node.rs index 8f81b9c..ee08111 100644 --- a/crates/catalyst-cli/src/node.rs +++ b/crates/catalyst-cli/src/node.rs @@ -133,12 +133,37 @@ async fn persist_lsu_history( .await; } -#[derive(Debug, Default)] +#[derive(Debug, Clone)] +struct RelayCacheCfg { + max_entries: usize, + target_entries: usize, + retention_ms: u64, +} + +impl Default for RelayCacheCfg { + fn default() -> Self { + Self { + max_entries: 5000, + target_entries: 4000, + retention_ms: 10 * 60 * 1000, + } + } +} + +#[derive(Debug)] struct RelayCache { + cfg: RelayCacheCfg, seen: std::collections::HashMap, } impl RelayCache { + fn new(cfg: RelayCacheCfg) -> Self { + Self { + cfg, + seen: std::collections::HashMap::new(), + } + } + fn should_relay(&mut self, env: &MessageEnvelope, now_ms: u64) -> bool { if env.is_expired() { return false; @@ -154,20 +179,18 @@ impl RelayCache { self.seen.insert(env.id.clone(), now_ms); // Prune old ids (best-effort). - let keep_after = now_ms.saturating_sub(10 * 60 * 1000); + let keep_after = now_ms.saturating_sub(self.cfg.retention_ms); self.seen.retain(|_, ts| *ts >= keep_after); // Cap size to prevent unbounded growth under attack. - const MAX: usize = 5000; - const TARGET: usize = 4000; - if self.seen.len() > MAX { + if self.seen.len() > self.cfg.max_entries { let mut v: Vec<(String, u64)> = self .seen .iter() .map(|(k, ts)| (k.clone(), *ts)) .collect(); v.sort_by_key(|(_, ts)| *ts); - let drop_n = v.len().saturating_sub(TARGET); + let drop_n = v.len().saturating_sub(self.cfg.target_entries); for (k, _) in v.into_iter().take(drop_n) { self.seen.remove(&k); } @@ -1946,6 +1969,20 @@ impl CatalystNode { // Put keypair in node dir (even if unused by simple transport). net_cfg.peer.keypair_path = Some(self.config.storage.data_dir.join("p2p_keypair")); + net_cfg.peer.max_peers = self.config.network.max_peers as usize; + net_cfg.peer.min_peers = self.config.network.min_peers as usize; + + // Wire safety limits from node config. + net_cfg.safety_limits.max_gossip_message_bytes = self.config.network.safety_limits.max_gossip_message_bytes; + net_cfg.safety_limits.per_peer_max_msgs_per_sec = self.config.network.safety_limits.per_peer_max_msgs_per_sec; + net_cfg.safety_limits.per_peer_max_bytes_per_sec = self.config.network.safety_limits.per_peer_max_bytes_per_sec; + net_cfg.safety_limits.max_tcp_frame_bytes = self.config.network.safety_limits.max_tcp_frame_bytes; + net_cfg.safety_limits.per_conn_max_msgs_per_sec = self.config.network.safety_limits.per_conn_max_msgs_per_sec; + net_cfg.safety_limits.per_conn_max_bytes_per_sec = self.config.network.safety_limits.per_conn_max_bytes_per_sec; + net_cfg.safety_limits.max_hops = self.config.network.safety_limits.max_hops; + net_cfg.safety_limits.dedup_cache_max_entries = self.config.network.safety_limits.dedup_cache_max_entries; + net_cfg.safety_limits.dial_jitter_max_ms = self.config.network.safety_limits.dial_jitter_max_ms; + net_cfg.safety_limits.dial_backoff_max_ms = self.config.network.safety_limits.dial_backoff_max_ms; net_cfg.peer.bootstrap_peers = Vec::new(); let network = Arc::new(P2pService::new(net_cfg).await?); @@ -2431,8 +2468,13 @@ impl CatalystNode { let last_lsu: Arc>> = Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())); + let relay_cfg = RelayCacheCfg { + max_entries: self.config.network.relay_cache.max_entries, + target_entries: self.config.network.relay_cache.target_entries, + retention_ms: self.config.network.relay_cache.retention_seconds.saturating_mul(1000), + }; let relay_cache: Arc> = - Arc::new(tokio::sync::Mutex::new(RelayCache::default())); + Arc::new(tokio::sync::Mutex::new(RelayCache::new(relay_cfg))); #[derive(Debug, Default)] struct CatchupState { observed_head_cycle: u64, diff --git a/crates/catalyst-network/src/config.rs b/crates/catalyst-network/src/config.rs index 9c815e1..d801387 100644 --- a/crates/catalyst-network/src/config.rs +++ b/crates/catalyst-network/src/config.rs @@ -103,6 +103,9 @@ pub struct NetworkConfig { /// Gossip protocol configuration pub gossip: GossipConfig, + + /// Safety limits (DoS bounding) applied by transports. + pub safety_limits: SafetyLimitsConfig, /// Discovery configuration pub discovery: DiscoveryConfig, @@ -120,6 +123,40 @@ pub struct NetworkConfig { pub monitoring: MonitoringConfig, } +/// Transport-agnostic DoS safety limits. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SafetyLimitsConfig { + /// Maximum gossipsub message bytes accepted by the libp2p transport. + pub max_gossip_message_bytes: usize, + + /// Per-peer message rate limit (msgs/sec) for libp2p transport. + pub per_peer_max_msgs_per_sec: u32, + + /// Per-peer bandwidth cap (bytes/sec) for libp2p transport. + pub per_peer_max_bytes_per_sec: usize, + + /// Maximum TCP frame bytes accepted by the simple transport. + pub max_tcp_frame_bytes: usize, + + /// Per-connection message rate limit (msgs/sec) for simple transport. + pub per_conn_max_msgs_per_sec: u32, + + /// Per-connection bandwidth cap (bytes/sec) for simple transport. + pub per_conn_max_bytes_per_sec: usize, + + /// Maximum hops for multi-hop rebroadcast. + pub max_hops: u8, + + /// Maximum number of recently seen envelope ids stored for deduplication. + pub dedup_cache_max_entries: usize, + + /// Maximum dial jitter (milliseconds) applied to backoff scheduling. + pub dial_jitter_max_ms: u64, + + /// Maximum backoff cap (milliseconds) applied to exponential dial backoff. + pub dial_backoff_max_ms: u64, +} + /// Peer-specific configuration #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PeerConfig { @@ -1727,6 +1764,7 @@ impl Default for NetworkConfig { peer: PeerConfig::default(), transport: TransportConfig::default(), gossip: GossipConfig::default(), + safety_limits: SafetyLimitsConfig::default(), discovery: DiscoveryConfig::default(), security: SecurityConfig::default(), bandwidth: BandwidthConfig::default(), @@ -1736,6 +1774,24 @@ impl Default for NetworkConfig { } } +impl Default for SafetyLimitsConfig { + fn default() -> Self { + Self { + // Match the previously hard-coded defaults in the transports. + max_gossip_message_bytes: 8 * 1024 * 1024, + per_peer_max_msgs_per_sec: 200, + per_peer_max_bytes_per_sec: 8 * 1024 * 1024, + max_tcp_frame_bytes: 8 * 1024 * 1024, + per_conn_max_msgs_per_sec: 200, + per_conn_max_bytes_per_sec: 8 * 1024 * 1024, + max_hops: 10, + dedup_cache_max_entries: 20_000, + dial_jitter_max_ms: 250, + dial_backoff_max_ms: 60_000, + } + } +} + impl Default for PeerConfig { fn default() -> Self { Self { @@ -2537,6 +2593,38 @@ impl NetworkConfig { )); } + // Validate safety limits + let sl = &self.safety_limits; + if sl.max_gossip_message_bytes == 0 || sl.max_tcp_frame_bytes == 0 { + return Err(NetworkError::ConfigError( + "safety_limits max message/frame bytes must be > 0".to_string(), + )); + } + if sl.per_peer_max_msgs_per_sec == 0 + || sl.per_conn_max_msgs_per_sec == 0 + || sl.per_peer_max_bytes_per_sec == 0 + || sl.per_conn_max_bytes_per_sec == 0 + { + return Err(NetworkError::ConfigError( + "safety_limits per-peer/per-conn budgets must be > 0".to_string(), + )); + } + if sl.max_hops == 0 { + return Err(NetworkError::ConfigError( + "safety_limits.max_hops must be > 0".to_string(), + )); + } + if sl.dedup_cache_max_entries == 0 { + return Err(NetworkError::ConfigError( + "safety_limits.dedup_cache_max_entries must be > 0".to_string(), + )); + } + if sl.dial_backoff_max_ms == 0 { + return Err(NetworkError::ConfigError( + "safety_limits.dial_backoff_max_ms must be > 0".to_string(), + )); + } + // Validate port ranges if self.transport.tcp_port_range.0 >= self.transport.tcp_port_range.1 { return Err(NetworkError::ConfigError( diff --git a/crates/catalyst-network/src/service.rs b/crates/catalyst-network/src/service.rs index 2d0a18d..b8cf261 100644 --- a/crates/catalyst-network/src/service.rs +++ b/crates/catalyst-network/src/service.rs @@ -35,11 +35,6 @@ use std::{ }; use tokio::sync::{mpsc, Mutex, RwLock}; -// --- Safety limits (anti-DoS) --- -// These are conservative defaults for public testnets. -const MAX_GOSSIP_MESSAGE_BYTES: usize = 8 * 1024 * 1024; // 8 MiB hard cap per message -const PER_PEER_MAX_MSGS_PER_SEC: u32 = 200; -const PER_PEER_MAX_BYTES_PER_SEC: usize = 8 * 1024 * 1024; // 8 MiB/s per peer const IDENTIFY_PROTOCOL_VERSION: &str = "catalyst/1"; #[derive(Debug, Clone)] @@ -62,7 +57,7 @@ struct PeerBudget { } impl PeerBudget { - fn allow(&mut self, now: Instant, size: usize) -> bool { + fn allow(&mut self, now: Instant, size: usize, max_msgs: u32, max_bytes: usize) -> bool { if now.duration_since(self.window_start) >= Duration::from_secs(1) { self.window_start = now; self.msgs = 0; @@ -70,7 +65,7 @@ impl PeerBudget { } self.msgs = self.msgs.saturating_add(1); self.bytes = self.bytes.saturating_add(size); - self.msgs <= PER_PEER_MAX_MSGS_PER_SEC && self.bytes <= PER_PEER_MAX_BYTES_PER_SEC + self.msgs <= max_msgs && self.bytes <= max_bytes } } @@ -259,6 +254,7 @@ impl NetworkService { let stats = self.stats.clone(); let peer_conns = self.peer_conns.clone(); let topic = self.topic.clone(); + let limits = self.config.safety_limits.clone(); // Bootstrap dial manager (WAN-hardening): retry with backoff+jitter until we meet `min_peers`. let bootstrap: Vec<(PeerId, Multiaddr)> = self.config.peer.bootstrap_peers.clone(); @@ -296,8 +292,9 @@ impl NetworkService { // Schedule next attempt before dialing to avoid tight loops. let attempts = dial_backoff.get(pid).map(|s| s.attempts).unwrap_or(0) + 1; - let backoff = compute_backoff(base_backoff, attempts).unwrap_or(base_backoff); - let jitter = jitter_ms(pid, attempts); + let backoff = compute_backoff(base_backoff, attempts, limits.dial_backoff_max_ms) + .unwrap_or(base_backoff); + let jitter = jitter_ms(pid, attempts, limits.dial_jitter_max_ms); dial_backoff.insert(*pid, DialBackoff { attempts, next_at: now + backoff + Duration::from_millis(jitter), @@ -327,7 +324,7 @@ impl NetworkService { }, SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(e)) => { if let gossipsub::Event::Message { message, propagation_source, .. } = e { - if message.data.len() > MAX_GOSSIP_MESSAGE_BYTES { + if message.data.len() > limits.max_gossip_message_bytes { continue; } let now = Instant::now(); @@ -336,7 +333,12 @@ impl NetworkService { msgs: 0, bytes: 0, }); - if !b.allow(now, message.data.len()) { + if !b.allow( + now, + message.data.len(), + limits.per_peer_max_msgs_per_sec, + limits.per_peer_max_bytes_per_sec, + ) { continue; } let env: MessageEnvelope = match decode_envelope_wire(&message.data) { @@ -471,21 +473,24 @@ impl NetworkService { } } -fn compute_backoff(base: Duration, attempts: u32) -> Option { +fn compute_backoff(base: Duration, attempts: u32, max_ms: u64) -> Option { let pow = attempts.saturating_sub(1).min(10); let mult = 1u64.checked_shl(pow)?; let ms = base.as_millis().saturating_mul(mult as u128); - let ms = ms.min(60_000); + let ms = ms.min(max_ms as u128); Some(Duration::from_millis(ms as u64)) } -fn jitter_ms(peer_id: &PeerId, attempts: u32) -> u64 { +fn jitter_ms(peer_id: &PeerId, attempts: u32, max_ms: u64) -> u64 { + if max_ms == 0 { + return 0; + } use std::hash::{Hash, Hasher}; let mut h = std::collections::hash_map::DefaultHasher::new(); peer_id.hash(&mut h); attempts.hash(&mut h); let v = h.finish(); - (v % 250) as u64 + (v % (max_ms + 1)) as u64 } fn load_or_generate_keypair(path: &Path) -> NetworkResult { diff --git a/crates/catalyst-network/src/simple.rs b/crates/catalyst-network/src/simple.rs index 9b27374..a62a211 100644 --- a/crates/catalyst-network/src/simple.rs +++ b/crates/catalyst-network/src/simple.rs @@ -25,17 +25,13 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{mpsc, Mutex, RwLock}; use tokio_util::codec::{Framed, LengthDelimitedCodec}; -// --- Safety limits (anti-DoS) --- -// These are conservative defaults for public testnets. -const MAX_FRAME_BYTES: usize = 8 * 1024 * 1024; // 8 MiB hard cap per frame -const PER_CONN_MAX_MSGS_PER_SEC: u32 = 200; -const PER_CONN_MAX_BYTES_PER_SEC: usize = 8 * 1024 * 1024; // 8 MiB/s per connection - #[derive(Debug, Clone)] struct ConnBudget { window_start: std::time::Instant, msgs: u32, bytes: usize, + max_msgs: u32, + max_bytes: usize, } impl ConnBudget { @@ -47,7 +43,7 @@ impl ConnBudget { } self.msgs = self.msgs.saturating_add(1); self.bytes = self.bytes.saturating_add(size); - self.msgs <= PER_CONN_MAX_MSGS_PER_SEC && self.bytes <= PER_CONN_MAX_BYTES_PER_SEC + self.msgs <= self.max_msgs && self.bytes <= self.max_bytes } } @@ -196,6 +192,7 @@ impl NetworkService { let local_id = self.local_id.clone(); let seen = self.seen.clone(); let dedup_window = self.config.gossip.duplicate_detection_window; + let limits = self.config.safety_limits.clone(); let (out_tx, mut out_rx) = mpsc::unbounded_channel::>(); { @@ -221,9 +218,11 @@ impl NetworkService { window_start: std::time::Instant::now(), msgs: 0, bytes: 0, + max_msgs: limits.per_conn_max_msgs_per_sec, + max_bytes: limits.per_conn_max_bytes_per_sec, }; while let Some(Ok(bytes)) = stream.next().await { - if bytes.len() > MAX_FRAME_BYTES { + if bytes.len() > limits.max_tcp_frame_bytes { continue; } let now = std::time::Instant::now(); @@ -249,6 +248,9 @@ impl NetworkService { { let mut s = seen.lock().await; s.retain(|_, t| now.duration_since(*t) <= dedup_window); + if s.len() > limits.dedup_cache_max_entries { + s.clear(); + } if s.contains_key(&env.id) { continue; } @@ -270,7 +272,7 @@ impl NetworkService { // Multi-hop rebroadcast: forward broadcast envelopes to all peers except sender // with hop/loop limits. if env.target.is_none() && should_forward(&env, &local_id) { - if let Some(fwd) = forwarded(env, &local_id) { + if let Some(fwd) = forwarded(env, &local_id, limits.max_hops) { let bytes = match encode_envelope_wire(&fwd) { Ok(b) => b, Err(_) => continue, @@ -316,6 +318,7 @@ impl NetworkService { let mut incompatible: HashSet = HashSet::new(); let base = self.config.peer.retry_backoff; + let limits = self.config.safety_limits.clone(); let max_attempts = self.config.peer.max_retry_attempts; let mut tick = tokio::time::interval(self.config.discovery.bootstrap_interval); tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); @@ -345,8 +348,8 @@ impl NetworkService { let socket = *addr; let svc = self.clone(); let attempts_next = attempts.saturating_add(1); - let delay = compute_backoff(base, attempts_next).unwrap_or(base); - let jitter = jitter_ms(socket, attempts_next); + let delay = compute_backoff(base, attempts_next, limits.dial_backoff_max_ms).unwrap_or(base); + let jitter = jitter_ms(socket, attempts_next, limits.dial_jitter_max_ms); backoff.insert( *addr, ( @@ -409,11 +412,15 @@ fn should_forward(env: &MessageEnvelope, local_id: &str) -> bool { true } -fn forwarded(mut env: MessageEnvelope, local_id: &str) -> Option { +fn forwarded(mut env: MessageEnvelope, local_id: &str, default_max_hops: u8) -> Option { if env.is_expired() { return None; } - let mut r = env.routing_info.take().unwrap_or_else(|| RoutingInfo::new(10)); + // Preserve configured max_hops if the sender already attached routing info. + let mut r = env + .routing_info + .take() + .unwrap_or_else(|| RoutingInfo::new(default_max_hops)); if r.hop_count >= r.max_hops { return None; } @@ -426,22 +433,25 @@ fn forwarded(mut env: MessageEnvelope, local_id: &str) -> Option Option { - // base * 2^(attempts-1), clamped to 60s +fn compute_backoff(base: std::time::Duration, attempts: u32, max_ms: u64) -> Option { + // base * 2^(attempts-1), clamped to max_ms let pow = attempts.saturating_sub(1).min(10); // 2^10 = 1024x let mult = 1u64.checked_shl(pow)?; let ms = base.as_millis().saturating_mul(mult as u128); - let ms = ms.min(60_000); + let ms = ms.min(max_ms as u128); Some(std::time::Duration::from_millis(ms as u64)) } -fn jitter_ms(addr: SocketAddr, attempts: u32) -> u64 { +fn jitter_ms(addr: SocketAddr, attempts: u32, max_ms: u64) -> u64 { + if max_ms == 0 { + return 0; + } use std::hash::{Hash, Hasher}; let mut h = std::collections::hash_map::DefaultHasher::new(); addr.hash(&mut h); attempts.hash(&mut h); let v = h.finish(); - (v % 250) as u64 + (v % (max_ms + 1)) as u64 } fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Option {