Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions crates/catalyst-cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,62 @@ pub struct NetworkConfig {

/// Network timeouts
pub timeouts: NetworkTimeouts,

/// P2P safety limits (DoS bounding) applied by the networking layer.
#[serde(default)]
pub safety_limits: P2pSafetyLimits,

/// Relay cache bounds (used for multi-hop rebroadcast + dedup).
#[serde(default)]
pub relay_cache: RelayCacheConfig,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct P2pSafetyLimits {
pub max_gossip_message_bytes: usize,
pub per_peer_max_msgs_per_sec: u32,
pub per_peer_max_bytes_per_sec: usize,
pub max_tcp_frame_bytes: usize,
pub per_conn_max_msgs_per_sec: u32,
pub per_conn_max_bytes_per_sec: usize,
pub max_hops: u8,
pub dedup_cache_max_entries: usize,
pub dial_jitter_max_ms: u64,
pub dial_backoff_max_ms: u64,
}

impl Default for P2pSafetyLimits {
fn default() -> Self {
Self {
max_gossip_message_bytes: 8 * 1024 * 1024,
per_peer_max_msgs_per_sec: 200,
per_peer_max_bytes_per_sec: 8 * 1024 * 1024,
max_tcp_frame_bytes: 8 * 1024 * 1024,
per_conn_max_msgs_per_sec: 200,
per_conn_max_bytes_per_sec: 8 * 1024 * 1024,
max_hops: 10,
dedup_cache_max_entries: 20_000,
dial_jitter_max_ms: 250,
dial_backoff_max_ms: 60_000,
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RelayCacheConfig {
pub max_entries: usize,
pub target_entries: usize,
pub retention_seconds: u64,
}

impl Default for RelayCacheConfig {
fn default() -> Self {
Self {
max_entries: 5000,
target_entries: 4000,
retention_seconds: 10 * 60,
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -458,6 +514,8 @@ impl Default for NodeConfig {
request_timeout: 10,
keep_alive_interval: 30,
},
safety_limits: P2pSafetyLimits::default(),
relay_cache: RelayCacheConfig::default(),
},
storage: StorageConfig {
data_dir: PathBuf::from("data"),
Expand Down Expand Up @@ -669,6 +727,51 @@ impl NodeConfig {
if self.network.listen_addresses.is_empty() {
return Err(anyhow::anyhow!("At least one listen address must be specified"));
}

// Validate P2P safety limits
{
let sl = &self.network.safety_limits;
if sl.max_gossip_message_bytes == 0
|| sl.max_tcp_frame_bytes == 0
|| sl.per_peer_max_msgs_per_sec == 0
|| sl.per_conn_max_msgs_per_sec == 0
|| sl.per_peer_max_bytes_per_sec == 0
|| sl.per_conn_max_bytes_per_sec == 0
{
return Err(anyhow::anyhow!("network.safety_limits values must be > 0"));
}
if sl.max_hops == 0 {
return Err(anyhow::anyhow!("network.safety_limits.max_hops must be > 0"));
}
if sl.dedup_cache_max_entries == 0 {
return Err(anyhow::anyhow!(
"network.safety_limits.dedup_cache_max_entries must be > 0"
));
}
if sl.dial_backoff_max_ms == 0 {
return Err(anyhow::anyhow!(
"network.safety_limits.dial_backoff_max_ms must be > 0"
));
}
}

// Validate relay cache bounds
{
let rc = &self.network.relay_cache;
if rc.max_entries == 0 || rc.target_entries == 0 {
return Err(anyhow::anyhow!("network.relay_cache max/target must be > 0"));
}
if rc.max_entries < rc.target_entries {
return Err(anyhow::anyhow!(
"network.relay_cache.max_entries must be >= target_entries"
));
}
if rc.retention_seconds == 0 {
return Err(anyhow::anyhow!(
"network.relay_cache.retention_seconds must be > 0"
));
}
}

// Validate consensus configuration
if self.consensus.cycle_duration_seconds < 10 {
Expand Down
56 changes: 49 additions & 7 deletions crates/catalyst-cli/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,37 @@ async fn persist_lsu_history(
.await;
}

#[derive(Debug, Default)]
#[derive(Debug, Clone)]
struct RelayCacheCfg {
max_entries: usize,
target_entries: usize,
retention_ms: u64,
}

impl Default for RelayCacheCfg {
fn default() -> Self {
Self {
max_entries: 5000,
target_entries: 4000,
retention_ms: 10 * 60 * 1000,
}
}
}

#[derive(Debug)]
struct RelayCache {
cfg: RelayCacheCfg,
seen: std::collections::HashMap<String, u64>,
}

impl RelayCache {
fn new(cfg: RelayCacheCfg) -> Self {
Self {
cfg,
seen: std::collections::HashMap::new(),
}
}

fn should_relay(&mut self, env: &MessageEnvelope, now_ms: u64) -> bool {
if env.is_expired() {
return false;
Expand All @@ -154,20 +179,18 @@ impl RelayCache {
self.seen.insert(env.id.clone(), now_ms);

// Prune old ids (best-effort).
let keep_after = now_ms.saturating_sub(10 * 60 * 1000);
let keep_after = now_ms.saturating_sub(self.cfg.retention_ms);
self.seen.retain(|_, ts| *ts >= keep_after);

// Cap size to prevent unbounded growth under attack.
const MAX: usize = 5000;
const TARGET: usize = 4000;
if self.seen.len() > MAX {
if self.seen.len() > self.cfg.max_entries {
let mut v: Vec<(String, u64)> = self
.seen
.iter()
.map(|(k, ts)| (k.clone(), *ts))
.collect();
v.sort_by_key(|(_, ts)| *ts);
let drop_n = v.len().saturating_sub(TARGET);
let drop_n = v.len().saturating_sub(self.cfg.target_entries);
for (k, _) in v.into_iter().take(drop_n) {
self.seen.remove(&k);
}
Expand Down Expand Up @@ -1946,6 +1969,20 @@ impl CatalystNode {

// Put keypair in node dir (even if unused by simple transport).
net_cfg.peer.keypair_path = Some(self.config.storage.data_dir.join("p2p_keypair"));
net_cfg.peer.max_peers = self.config.network.max_peers as usize;
net_cfg.peer.min_peers = self.config.network.min_peers as usize;

// Wire safety limits from node config.
net_cfg.safety_limits.max_gossip_message_bytes = self.config.network.safety_limits.max_gossip_message_bytes;
net_cfg.safety_limits.per_peer_max_msgs_per_sec = self.config.network.safety_limits.per_peer_max_msgs_per_sec;
net_cfg.safety_limits.per_peer_max_bytes_per_sec = self.config.network.safety_limits.per_peer_max_bytes_per_sec;
net_cfg.safety_limits.max_tcp_frame_bytes = self.config.network.safety_limits.max_tcp_frame_bytes;
net_cfg.safety_limits.per_conn_max_msgs_per_sec = self.config.network.safety_limits.per_conn_max_msgs_per_sec;
net_cfg.safety_limits.per_conn_max_bytes_per_sec = self.config.network.safety_limits.per_conn_max_bytes_per_sec;
net_cfg.safety_limits.max_hops = self.config.network.safety_limits.max_hops;
net_cfg.safety_limits.dedup_cache_max_entries = self.config.network.safety_limits.dedup_cache_max_entries;
net_cfg.safety_limits.dial_jitter_max_ms = self.config.network.safety_limits.dial_jitter_max_ms;
net_cfg.safety_limits.dial_backoff_max_ms = self.config.network.safety_limits.dial_backoff_max_ms;
net_cfg.peer.bootstrap_peers = Vec::new();

let network = Arc::new(P2pService::new(net_cfg).await?);
Expand Down Expand Up @@ -2431,8 +2468,13 @@ impl CatalystNode {

let last_lsu: Arc<tokio::sync::RwLock<std::collections::HashMap<u64, catalyst_consensus::types::LedgerStateUpdate>>> =
Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new()));
let relay_cfg = RelayCacheCfg {
max_entries: self.config.network.relay_cache.max_entries,
target_entries: self.config.network.relay_cache.target_entries,
retention_ms: self.config.network.relay_cache.retention_seconds.saturating_mul(1000),
};
let relay_cache: Arc<tokio::sync::Mutex<RelayCache>> =
Arc::new(tokio::sync::Mutex::new(RelayCache::default()));
Arc::new(tokio::sync::Mutex::new(RelayCache::new(relay_cfg)));
#[derive(Debug, Default)]
struct CatchupState {
observed_head_cycle: u64,
Expand Down
88 changes: 88 additions & 0 deletions crates/catalyst-network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ pub struct NetworkConfig {

/// Gossip protocol configuration
pub gossip: GossipConfig,

/// Safety limits (DoS bounding) applied by transports.
pub safety_limits: SafetyLimitsConfig,

/// Discovery configuration
pub discovery: DiscoveryConfig,
Expand All @@ -120,6 +123,40 @@ pub struct NetworkConfig {
pub monitoring: MonitoringConfig,
}

/// Transport-agnostic DoS safety limits.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SafetyLimitsConfig {
/// Maximum gossipsub message bytes accepted by the libp2p transport.
pub max_gossip_message_bytes: usize,

/// Per-peer message rate limit (msgs/sec) for libp2p transport.
pub per_peer_max_msgs_per_sec: u32,

/// Per-peer bandwidth cap (bytes/sec) for libp2p transport.
pub per_peer_max_bytes_per_sec: usize,

/// Maximum TCP frame bytes accepted by the simple transport.
pub max_tcp_frame_bytes: usize,

/// Per-connection message rate limit (msgs/sec) for simple transport.
pub per_conn_max_msgs_per_sec: u32,

/// Per-connection bandwidth cap (bytes/sec) for simple transport.
pub per_conn_max_bytes_per_sec: usize,

/// Maximum hops for multi-hop rebroadcast.
pub max_hops: u8,

/// Maximum number of recently seen envelope ids stored for deduplication.
pub dedup_cache_max_entries: usize,

/// Maximum dial jitter (milliseconds) applied to backoff scheduling.
pub dial_jitter_max_ms: u64,

/// Maximum backoff cap (milliseconds) applied to exponential dial backoff.
pub dial_backoff_max_ms: u64,
}

/// Peer-specific configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerConfig {
Expand Down Expand Up @@ -1727,6 +1764,7 @@ impl Default for NetworkConfig {
peer: PeerConfig::default(),
transport: TransportConfig::default(),
gossip: GossipConfig::default(),
safety_limits: SafetyLimitsConfig::default(),
discovery: DiscoveryConfig::default(),
security: SecurityConfig::default(),
bandwidth: BandwidthConfig::default(),
Expand All @@ -1736,6 +1774,24 @@ impl Default for NetworkConfig {
}
}

impl Default for SafetyLimitsConfig {
fn default() -> Self {
Self {
// Match the previously hard-coded defaults in the transports.
max_gossip_message_bytes: 8 * 1024 * 1024,
per_peer_max_msgs_per_sec: 200,
per_peer_max_bytes_per_sec: 8 * 1024 * 1024,
max_tcp_frame_bytes: 8 * 1024 * 1024,
per_conn_max_msgs_per_sec: 200,
per_conn_max_bytes_per_sec: 8 * 1024 * 1024,
max_hops: 10,
dedup_cache_max_entries: 20_000,
dial_jitter_max_ms: 250,
dial_backoff_max_ms: 60_000,
}
}
}

impl Default for PeerConfig {
fn default() -> Self {
Self {
Expand Down Expand Up @@ -2537,6 +2593,38 @@ impl NetworkConfig {
));
}

// Validate safety limits
let sl = &self.safety_limits;
if sl.max_gossip_message_bytes == 0 || sl.max_tcp_frame_bytes == 0 {
return Err(NetworkError::ConfigError(
"safety_limits max message/frame bytes must be > 0".to_string(),
));
}
if sl.per_peer_max_msgs_per_sec == 0
|| sl.per_conn_max_msgs_per_sec == 0
|| sl.per_peer_max_bytes_per_sec == 0
|| sl.per_conn_max_bytes_per_sec == 0
{
return Err(NetworkError::ConfigError(
"safety_limits per-peer/per-conn budgets must be > 0".to_string(),
));
}
if sl.max_hops == 0 {
return Err(NetworkError::ConfigError(
"safety_limits.max_hops must be > 0".to_string(),
));
}
if sl.dedup_cache_max_entries == 0 {
return Err(NetworkError::ConfigError(
"safety_limits.dedup_cache_max_entries must be > 0".to_string(),
));
}
if sl.dial_backoff_max_ms == 0 {
return Err(NetworkError::ConfigError(
"safety_limits.dial_backoff_max_ms must be > 0".to_string(),
));
}

// Validate port ranges
if self.transport.tcp_port_range.0 >= self.transport.tcp_port_range.1 {
return Err(NetworkError::ConfigError(
Expand Down
Loading