From 3736d8be309b7b87089d75a5937c808f255d2926 Mon Sep 17 00:00:00 2001 From: "liquan.eth" Date: Sat, 9 May 2026 16:42:17 +0800 Subject: [PATCH 1/2] add tip_buffer cli args --- bin/stateless-validator/src/app.rs | 11 ++- bin/stateless-validator/tests/integration.rs | 72 +++++++++----------- crates/stateless-core/src/lib.rs | 5 +- crates/stateless-core/src/pipeline/config.rs | 4 ++ crates/stateless-core/src/pipeline/mod.rs | 2 +- 5 files changed, 49 insertions(+), 45 deletions(-) diff --git a/bin/stateless-validator/src/app.rs b/bin/stateless-validator/src/app.rs index 6b1a1b63..75e53608 100644 --- a/bin/stateless-validator/src/app.rs +++ b/bin/stateless-validator/src/app.rs @@ -116,6 +116,13 @@ pub struct CommandLineArgs { #[clap(long, env = "STATELESS_VALIDATOR_ERROR_RESTART_DELAY_MS")] pub error_restart_delay_ms: Option, + /// Safety margin below the remote tip: the fetcher will not spawn fetches for blocks + /// `> chain_latest - tip_buffer`. Gives the upstream witness generator headroom to + /// finish the very block we'd otherwise race it for. `0` disables the buffer. Defaults + /// to `DEFAULT_TIP_BUFFER` (see `stateless_core::DEFAULT_TIP_BUFFER`). + #[clap(long, env = "STATELESS_VALIDATOR_TIP_BUFFER")] + pub tip_buffer: Option, + /// Initial round-level RPC retry backoff (milliseconds). Applied after every provider in a /// round has failed; doubles each round up to `--rpc-max-backoff-ms`. #[clap(long, env = "STATELESS_VALIDATOR_RPC_INITIAL_BACKOFF_MS")] @@ -260,9 +267,7 @@ pub async fn run() -> Result<()> { override_ms(args.poll_interval_ms, pipeline_config.poll_interval); pipeline_config.error_restart_delay = override_ms(args.error_restart_delay_ms, pipeline_config.error_restart_delay); - // Stay 3 blocks behind the remote tip so the upstream witness generator has headroom - // to finish the block we'd otherwise race it for. - pipeline_config.tip_buffer = 3; + pipeline_config.tip_buffer = args.tip_buffer.unwrap_or(stateless_core::DEFAULT_TIP_BUFFER); let result = workers::run_with_signals( client, diff --git a/bin/stateless-validator/tests/integration.rs b/bin/stateless-validator/tests/integration.rs index 3e105e41..cf665c4e 100644 --- a/bin/stateless-validator/tests/integration.rs +++ b/bin/stateless-validator/tests/integration.rs @@ -31,6 +31,18 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, info}; use tracing_subscriber::EnvFilter; +/// Argv prefix for tests that exercise an *optional* flag — both required endpoints are +/// already supplied so the parse only depends on the flag under test. +const BASE_ARGS: &[&str] = &[ + "stateless-validator", + "--data-dir", + "/tmp/x", + "--rpc-endpoint", + "http://rpc", + "--witness-endpoint", + "http://w", +]; + /// Verifies that an endpoint flag accepts repeated flags, CSV values, and env var — /// ensuring container deployments configured purely via env are not silently limited /// to one endpoint (clap's `value_delimiter` applies to env-var values too). @@ -76,77 +88,59 @@ fn rpc_endpoint_accepts_multiple_forms() { ); } -/// Verifies that a concurrency cap flag parses as `Some(n)` via CLI and env var, -/// and omission leaves the value `None` (unbounded). -fn assert_concurrency_flag( +/// Verifies that an optional numeric flag parses as `Some(n)` via CLI and env var, +/// and omission leaves the value `None`. +fn assert_optional_numeric_flag( flag: &str, env: &str, - base: &[&str], - extract: impl Fn(CommandLineArgs) -> Option, -) { + extract: impl Fn(CommandLineArgs) -> Option, +) where + T: std::str::FromStr + std::fmt::Debug + PartialEq, + T::Err: std::fmt::Debug, +{ let guard = stateless_test_utils::env::env_lock(); let parse = |extra: &[&str]| { - extract(CommandLineArgs::try_parse_from(base.iter().chain(extra)).unwrap()) + extract(CommandLineArgs::try_parse_from(BASE_ARGS.iter().chain(extra)).unwrap()) }; assert_eq!(parse(&[]), None); - assert_eq!(parse(&[flag, "7"]), Some(7)); + assert_eq!(parse(&[flag, "7"]), Some("7".parse().unwrap())); let from_env = stateless_test_utils::env::with_env_var(&guard, env, "12", || parse(&[])); - assert_eq!(from_env, Some(12)); + assert_eq!(from_env, Some("12".parse().unwrap())); } #[test] fn data_max_concurrent_requests_flag_and_env() { - assert_concurrency_flag( + assert_optional_numeric_flag::( "--data-max-concurrent-requests", "STATELESS_VALIDATOR_DATA_MAX_CONCURRENT_REQUESTS", - &[ - "stateless-validator", - "--data-dir", - "/tmp/x", - "--rpc-endpoint", - "http://rpc", - "--witness-endpoint", - "http://w", - ], |a| a.data_max_concurrent_requests, ); } #[test] fn witness_max_concurrent_requests_flag_and_env() { - assert_concurrency_flag( + assert_optional_numeric_flag::( "--witness-max-concurrent-requests", "STATELESS_VALIDATOR_WITNESS_MAX_CONCURRENT_REQUESTS", - &[ - "stateless-validator", - "--data-dir", - "/tmp/x", - "--rpc-endpoint", - "http://rpc", - "--witness-endpoint", - "http://w", - ], |a| a.witness_max_concurrent_requests, ); } +#[test] +fn tip_buffer_flag_and_env() { + assert_optional_numeric_flag::("--tip-buffer", "STATELESS_VALIDATOR_TIP_BUFFER", |a| { + a.tip_buffer + }); +} + /// `canonical_chain_max_length` must reject 0 at parse time. A value of 0 would make /// `advance_chain` prune the entire canonical chain on every successful advance, /// rolling the pipeline back to the anchor each round and looping forever. #[test] fn canonical_chain_max_length_rejects_zero() { - let base = [ - "stateless-validator", - "--data-dir", - "/tmp/x", - "--rpc-endpoint", - "http://rpc", - "--witness-endpoint", - "http://w", - ]; - let parse = |extra: &[&str]| CommandLineArgs::try_parse_from(base.iter().chain(extra)); + let parse = |extra: &[&str]| CommandLineArgs::try_parse_from(BASE_ARGS.iter().chain(extra)); assert_eq!(parse(&[]).unwrap().canonical_chain_max_length, None); assert_eq!( diff --git a/crates/stateless-core/src/lib.rs b/crates/stateless-core/src/lib.rs index c0c3dc3d..ae1d1c9a 100644 --- a/crates/stateless-core/src/lib.rs +++ b/crates/stateless-core/src/lib.rs @@ -39,7 +39,8 @@ pub use executor::{ValidationError, ValidationStats, replay_block, validate_bloc pub mod pipeline; #[cfg(feature = "std")] pub use pipeline::{ - BlockFetcher, BlockProcessor, ErrorAction, PipelineConfig, PipelineHooks, PipelineOutcome, - ProcessedBlock, ReorgEvent, block_fetcher, find_divergence_point, run_pipeline, + BlockFetcher, BlockProcessor, DEFAULT_TIP_BUFFER, ErrorAction, PipelineConfig, PipelineHooks, + PipelineOutcome, ProcessedBlock, ReorgEvent, block_fetcher, find_divergence_point, + run_pipeline, }; pub mod withdrawals; diff --git a/crates/stateless-core/src/pipeline/config.rs b/crates/stateless-core/src/pipeline/config.rs index 1f0830ab..9110dadd 100644 --- a/crates/stateless-core/src/pipeline/config.rs +++ b/crates/stateless-core/src/pipeline/config.rs @@ -4,6 +4,10 @@ use std::time::Duration; use alloy_primitives::{BlockHash, BlockNumber}; +/// Default `tip_buffer` for callers that race the upstream witness generator. See the +/// field comment on [`PipelineConfig::tip_buffer`]. +pub const DEFAULT_TIP_BUFFER: u64 = 3; + /// Configuration for the chain sync pipeline. /// /// Binary-specific settings (metrics, reporting, pruner) live in binary CLI args. diff --git a/crates/stateless-core/src/pipeline/mod.rs b/crates/stateless-core/src/pipeline/mod.rs index d1c6ecaa..7fc81bff 100644 --- a/crates/stateless-core/src/pipeline/mod.rs +++ b/crates/stateless-core/src/pipeline/mod.rs @@ -25,7 +25,7 @@ use std::{sync::Arc, time::Duration}; use advancer::chain_advancer; use config::WorkerResult; -pub use config::{ErrorAction, PipelineConfig, PipelineOutcome, ReorgEvent}; +pub use config::{DEFAULT_TIP_BUFFER, ErrorAction, PipelineConfig, PipelineOutcome, ReorgEvent}; pub use divergence::{DivergenceError, DivergenceLookups, find_divergence_point}; use eyre::{Result, anyhow}; pub use fetcher::block_fetcher; From 518680811426540417947762db8bbfc95b17adb5 Mon Sep 17 00:00:00 2001 From: "liquan.eth" Date: Sat, 9 May 2026 16:51:25 +0800 Subject: [PATCH 2/2] move the DEFAULT_TIP_BUFFER to the bin --- bin/stateless-validator/src/app.rs | 9 +++++++-- crates/stateless-core/src/lib.rs | 5 ++--- crates/stateless-core/src/pipeline/config.rs | 4 ---- crates/stateless-core/src/pipeline/mod.rs | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/bin/stateless-validator/src/app.rs b/bin/stateless-validator/src/app.rs index 75e53608..981ab4fd 100644 --- a/bin/stateless-validator/src/app.rs +++ b/bin/stateless-validator/src/app.rs @@ -19,6 +19,11 @@ use crate::{metrics, validator_db::ValidatorDB, workers}; /// Database filename for the validator. pub const VALIDATOR_DB_FILENAME: &str = "validator.redb"; +/// Default `--tip-buffer`. The validator races the upstream witness generator, so it stays +/// 3 blocks behind by default. Core's `PipelineConfig::tip_buffer` defaults to `0` because +/// the buffer is opt-in per binary; the validator opts in here. +const DEFAULT_TIP_BUFFER: u64 = 3; + /// Loads or creates a ChainSpec from the database or a genesis file. pub fn load_or_create_chain_spec( validator_db: &ValidatorDB, @@ -119,7 +124,7 @@ pub struct CommandLineArgs { /// Safety margin below the remote tip: the fetcher will not spawn fetches for blocks /// `> chain_latest - tip_buffer`. Gives the upstream witness generator headroom to /// finish the very block we'd otherwise race it for. `0` disables the buffer. Defaults - /// to `DEFAULT_TIP_BUFFER` (see `stateless_core::DEFAULT_TIP_BUFFER`). + /// to `DEFAULT_TIP_BUFFER`. #[clap(long, env = "STATELESS_VALIDATOR_TIP_BUFFER")] pub tip_buffer: Option, @@ -267,7 +272,7 @@ pub async fn run() -> Result<()> { override_ms(args.poll_interval_ms, pipeline_config.poll_interval); pipeline_config.error_restart_delay = override_ms(args.error_restart_delay_ms, pipeline_config.error_restart_delay); - pipeline_config.tip_buffer = args.tip_buffer.unwrap_or(stateless_core::DEFAULT_TIP_BUFFER); + pipeline_config.tip_buffer = args.tip_buffer.unwrap_or(DEFAULT_TIP_BUFFER); let result = workers::run_with_signals( client, diff --git a/crates/stateless-core/src/lib.rs b/crates/stateless-core/src/lib.rs index ae1d1c9a..c0c3dc3d 100644 --- a/crates/stateless-core/src/lib.rs +++ b/crates/stateless-core/src/lib.rs @@ -39,8 +39,7 @@ pub use executor::{ValidationError, ValidationStats, replay_block, validate_bloc pub mod pipeline; #[cfg(feature = "std")] pub use pipeline::{ - BlockFetcher, BlockProcessor, DEFAULT_TIP_BUFFER, ErrorAction, PipelineConfig, PipelineHooks, - PipelineOutcome, ProcessedBlock, ReorgEvent, block_fetcher, find_divergence_point, - run_pipeline, + BlockFetcher, BlockProcessor, ErrorAction, PipelineConfig, PipelineHooks, PipelineOutcome, + ProcessedBlock, ReorgEvent, block_fetcher, find_divergence_point, run_pipeline, }; pub mod withdrawals; diff --git a/crates/stateless-core/src/pipeline/config.rs b/crates/stateless-core/src/pipeline/config.rs index 9110dadd..1f0830ab 100644 --- a/crates/stateless-core/src/pipeline/config.rs +++ b/crates/stateless-core/src/pipeline/config.rs @@ -4,10 +4,6 @@ use std::time::Duration; use alloy_primitives::{BlockHash, BlockNumber}; -/// Default `tip_buffer` for callers that race the upstream witness generator. See the -/// field comment on [`PipelineConfig::tip_buffer`]. -pub const DEFAULT_TIP_BUFFER: u64 = 3; - /// Configuration for the chain sync pipeline. /// /// Binary-specific settings (metrics, reporting, pruner) live in binary CLI args. diff --git a/crates/stateless-core/src/pipeline/mod.rs b/crates/stateless-core/src/pipeline/mod.rs index 7fc81bff..d1c6ecaa 100644 --- a/crates/stateless-core/src/pipeline/mod.rs +++ b/crates/stateless-core/src/pipeline/mod.rs @@ -25,7 +25,7 @@ use std::{sync::Arc, time::Duration}; use advancer::chain_advancer; use config::WorkerResult; -pub use config::{DEFAULT_TIP_BUFFER, ErrorAction, PipelineConfig, PipelineOutcome, ReorgEvent}; +pub use config::{ErrorAction, PipelineConfig, PipelineOutcome, ReorgEvent}; pub use divergence::{DivergenceError, DivergenceLookups, find_divergence_point}; use eyre::{Result, anyhow}; pub use fetcher::block_fetcher;