Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions bin/stateless-validator/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ use crate::{metrics, validator_db::ValidatorDB, workers};
/// Database filename for the validator.
pub const VALIDATOR_DB_FILENAME: &str = "validator.redb";

/// Default `--tip-buffer`. The validator races the upstream witness generator, so it stays
/// 3 blocks behind by default. Core's `PipelineConfig::tip_buffer` defaults to `0` because
/// the buffer is opt-in per binary; the validator opts in here.
const DEFAULT_TIP_BUFFER: u64 = 3;
Comment thread
flyq marked this conversation as resolved.

/// Loads or creates a ChainSpec from the database or a genesis file.
pub fn load_or_create_chain_spec(
validator_db: &ValidatorDB,
Expand Down Expand Up @@ -116,6 +121,13 @@ pub struct CommandLineArgs {
#[clap(long, env = "STATELESS_VALIDATOR_ERROR_RESTART_DELAY_MS")]
pub error_restart_delay_ms: Option<u64>,

/// Safety margin below the remote tip: the fetcher will not spawn fetches for blocks
/// `> chain_latest - tip_buffer`. Gives the upstream witness generator headroom to
/// finish the very block we'd otherwise race it for. `0` disables the buffer. Defaults
/// to `DEFAULT_TIP_BUFFER`.
#[clap(long, env = "STATELESS_VALIDATOR_TIP_BUFFER")]
pub tip_buffer: Option<u64>,

/// Initial round-level RPC retry backoff (milliseconds). Applied after every provider in a
/// round has failed; doubles each round up to `--rpc-max-backoff-ms`.
#[clap(long, env = "STATELESS_VALIDATOR_RPC_INITIAL_BACKOFF_MS")]
Expand Down Expand Up @@ -260,9 +272,7 @@ pub async fn run() -> Result<()> {
override_ms(args.poll_interval_ms, pipeline_config.poll_interval);
pipeline_config.error_restart_delay =
override_ms(args.error_restart_delay_ms, pipeline_config.error_restart_delay);
// Stay 3 blocks behind the remote tip so the upstream witness generator has headroom
// to finish the block we'd otherwise race it for.
pipeline_config.tip_buffer = 3;
pipeline_config.tip_buffer = args.tip_buffer.unwrap_or(DEFAULT_TIP_BUFFER);

let result = workers::run_with_signals(
client,
Expand Down
72 changes: 33 additions & 39 deletions bin/stateless-validator/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, info};
use tracing_subscriber::EnvFilter;

/// Argv prefix for tests that exercise an *optional* flag — both required endpoints are
/// already supplied so the parse only depends on the flag under test.
const BASE_ARGS: &[&str] = &[
"stateless-validator",
"--data-dir",
"/tmp/x",
"--rpc-endpoint",
"http://rpc",
"--witness-endpoint",
"http://w",
];

/// Verifies that an endpoint flag accepts repeated flags, CSV values, and env var —
/// ensuring container deployments configured purely via env are not silently limited
/// to one endpoint (clap's `value_delimiter` applies to env-var values too).
Expand Down Expand Up @@ -76,77 +88,59 @@ fn rpc_endpoint_accepts_multiple_forms() {
);
}

/// Verifies that a concurrency cap flag parses as `Some(n)` via CLI and env var,
/// and omission leaves the value `None` (unbounded).
fn assert_concurrency_flag(
/// Verifies that an optional numeric flag parses as `Some(n)` via CLI and env var,
/// and omission leaves the value `None`.
fn assert_optional_numeric_flag<T>(
flag: &str,
env: &str,
base: &[&str],
extract: impl Fn(CommandLineArgs) -> Option<usize>,
) {
extract: impl Fn(CommandLineArgs) -> Option<T>,
) where
T: std::str::FromStr + std::fmt::Debug + PartialEq,
T::Err: std::fmt::Debug,
{
let guard = stateless_test_utils::env::env_lock();
let parse = |extra: &[&str]| {
extract(CommandLineArgs::try_parse_from(base.iter().chain(extra)).unwrap())
extract(CommandLineArgs::try_parse_from(BASE_ARGS.iter().chain(extra)).unwrap())
};

assert_eq!(parse(&[]), None);
assert_eq!(parse(&[flag, "7"]), Some(7));
assert_eq!(parse(&[flag, "7"]), Some("7".parse().unwrap()));

let from_env = stateless_test_utils::env::with_env_var(&guard, env, "12", || parse(&[]));
assert_eq!(from_env, Some(12));
assert_eq!(from_env, Some("12".parse().unwrap()));
}

#[test]
fn data_max_concurrent_requests_flag_and_env() {
assert_concurrency_flag(
assert_optional_numeric_flag::<usize>(
"--data-max-concurrent-requests",
"STATELESS_VALIDATOR_DATA_MAX_CONCURRENT_REQUESTS",
&[
"stateless-validator",
"--data-dir",
"/tmp/x",
"--rpc-endpoint",
"http://rpc",
"--witness-endpoint",
"http://w",
],
|a| a.data_max_concurrent_requests,
);
}

#[test]
fn witness_max_concurrent_requests_flag_and_env() {
assert_concurrency_flag(
assert_optional_numeric_flag::<usize>(
"--witness-max-concurrent-requests",
"STATELESS_VALIDATOR_WITNESS_MAX_CONCURRENT_REQUESTS",
&[
"stateless-validator",
"--data-dir",
"/tmp/x",
"--rpc-endpoint",
"http://rpc",
"--witness-endpoint",
"http://w",
],
|a| a.witness_max_concurrent_requests,
);
}

#[test]
fn tip_buffer_flag_and_env() {
assert_optional_numeric_flag::<u64>("--tip-buffer", "STATELESS_VALIDATOR_TIP_BUFFER", |a| {
a.tip_buffer
});
}

/// `canonical_chain_max_length` must reject 0 at parse time. A value of 0 would make
/// `advance_chain` prune the entire canonical chain on every successful advance,
/// rolling the pipeline back to the anchor each round and looping forever.
#[test]
fn canonical_chain_max_length_rejects_zero() {
let base = [
"stateless-validator",
"--data-dir",
"/tmp/x",
"--rpc-endpoint",
"http://rpc",
"--witness-endpoint",
"http://w",
];
let parse = |extra: &[&str]| CommandLineArgs::try_parse_from(base.iter().chain(extra));
let parse = |extra: &[&str]| CommandLineArgs::try_parse_from(BASE_ARGS.iter().chain(extra));

assert_eq!(parse(&[]).unwrap().canonical_chain_max_length, None);
assert_eq!(
Expand Down
Loading