diff --git a/crates/core/src/qbft/callbacks.rs b/crates/core/src/qbft/callbacks.rs new file mode 100644 index 00000000..5cba3815 --- /dev/null +++ b/crates/core/src/qbft/callbacks.rs @@ -0,0 +1,121 @@ +use super::{MessageType, Msg, QbftTypes, Result, UponRule}; +use cancellation::CancellationToken; +use crossbeam::channel as mpmc; +use std::time; + +pub(super) type BroadcastFn = + dyn for<'a> Fn(BroadcastRequest<'a, T>) -> Result<()> + Send + Sync; +pub(super) type CompareFn = dyn for<'a> Fn(CompareRequest<'a, T>) + Send + Sync + 'static; +pub(super) type UponRuleLoggerFn = dyn for<'a> Fn(UponRuleLog<'a, T>) + Send + Sync; +pub(super) type RoundChangeLoggerFn = dyn for<'a> Fn(RoundChangeLog<'a, T>) + Send + Sync; +pub(super) type UnjustLoggerFn = dyn for<'a> Fn(UnjustLog<'a, T>) + Send + Sync; +pub(super) type LeaderFn = dyn for<'a> Fn(LeaderRequest<'a, T>) -> bool + Send + Sync; +pub(super) type DecideFn = dyn for<'a> Fn(DecideRequest<'a, T>) + Send + Sync; + +/// Input passed to `Transport::broadcast`. +pub struct BroadcastRequest<'a, T: QbftTypes> { + /// Parent cancellation token. + pub ct: &'a CancellationToken, + /// Message type to broadcast. + pub type_: MessageType, + /// Consensus instance identifier. + pub instance: &'a T::Instance, + /// Sending process. + pub source: i64, + /// Message round. + pub round: i64, + /// Proposal value. + pub value: &'a T::Value, + /// Prepared round carried by ROUND-CHANGE messages. + pub prepared_round: i64, + /// Prepared value carried by ROUND-CHANGE messages. + pub prepared_value: &'a T::Value, + /// Optional justification piggybacked on the message. + pub justification: Option<&'a Vec>>, +} + +/// Input passed to `Definition::compare`. +pub struct CompareRequest<'a, T: QbftTypes> { + /// Compare-scoped cancellation token. + pub ct: &'a CancellationToken, + /// Proposed commit quorum message. + pub qcommit: &'a Msg, + /// Channel carrying the local compare value if it was not cached yet. + pub input_value_source_ch: &'a mpmc::Receiver, + /// Cached local compare value. + pub input_value_source: &'a T::Compare, + /// Channel used by the callback to return compare status. + pub return_err: &'a mpmc::Sender>, + /// Channel used by the callback to cache the local compare value. + pub return_value: &'a mpmc::Sender, +} + +/// Timer returned by `Definition::new_timer`. +pub struct Timer { + /// Channel that fires when the timer expires. + pub receive: mpmc::Receiver, + /// Stops the timer. + pub stop: Box, +} + +/// Input passed to `Definition::is_leader`. +pub struct LeaderRequest<'a, T: QbftTypes> { + /// Consensus instance identifier. + pub instance: &'a T::Instance, + /// Round being evaluated. + pub round: i64, + /// Process being evaluated. + pub process: i64, +} + +/// Input passed to `Definition::decide`. +pub struct DecideRequest<'a, T: QbftTypes> { + /// Parent cancellation token. + pub ct: &'a CancellationToken, + /// Consensus instance identifier. + pub instance: &'a T::Instance, + /// Decided value. + pub value: &'a T::Value, + /// Commit quorum justifying the decision. + pub qcommit: &'a Vec>, +} + +/// Input passed to `QbftLogger::upon_rule`. +pub struct UponRuleLog<'a, T: QbftTypes> { + /// Consensus instance identifier. + pub instance: &'a T::Instance, + /// Local process. + pub process: i64, + /// Current local round. + pub round: i64, + /// Message that triggered classification. + pub msg: &'a Msg, + /// Rule that fired. + pub upon_rule: UponRule, +} + +/// Input passed to `QbftLogger::round_change`. +pub struct RoundChangeLog<'a, T: QbftTypes> { + /// Consensus instance identifier. + pub instance: &'a T::Instance, + /// Local process. + pub process: i64, + /// Previous local round. + pub round: i64, + /// New local round. + pub new_round: i64, + /// Rule that caused the round change. + pub upon_rule: UponRule, + /// Messages from the previous round. + pub msgs: &'a Vec>, +} + +/// Input passed to `QbftLogger::unjust`. +pub struct UnjustLog<'a, T: QbftTypes> { + /// Consensus instance identifier. + pub instance: &'a T::Instance, + /// Local process. + pub process: i64, + /// Rejected message. + pub msg: Msg, +} diff --git a/crates/core/src/qbft/fake_clock.rs b/crates/core/src/qbft/fake_clock.rs index 6bc32495..c1891fb5 100644 --- a/crates/core/src/qbft/fake_clock.rs +++ b/crates/core/src/qbft/fake_clock.rs @@ -1,3 +1,5 @@ +#![allow(clippy::arithmetic_side_effects)] + use crossbeam::channel as mpmc; use std::{ collections::BTreeMap, @@ -28,7 +30,13 @@ struct FakeClockInner { now: Instant, last_id: usize, cancelled: bool, - clients: BTreeMap, Instant, TimerPriority)>, + clients: BTreeMap, +} + +struct ClientRecord { + sender: mpmc::Sender, + deadline: Instant, + priority: TimerPriority, } impl FakeClock { @@ -80,7 +88,14 @@ impl FakeClock { let deadline = inner.now + duration; inner.last_id += 1; - inner.clients.insert(id, (tx, deadline, priority)); + inner.clients.insert( + id, + ClientRecord { + sender: tx, + deadline, + priority, + }, + ); id }; @@ -124,9 +139,9 @@ impl FakeClock { inner.now += duration; let now = inner.now; - for (&id, (ch, deadline, priority)) in &inner.clients { - if *deadline <= now { - expired.push((id, *deadline, *priority, ch.clone())); + for (&id, record) in &inner.clients { + if record.deadline <= now { + expired.push((id, record.deadline, record.priority, record.sender.clone())); } } diff --git a/crates/core/src/qbft/internal_test.rs b/crates/core/src/qbft/internal_test.rs index 167f2c95..a3113927 100644 --- a/crates/core/src/qbft/internal_test.rs +++ b/crates/core/src/qbft/internal_test.rs @@ -1,3 +1,12 @@ +#![allow( + clippy::arithmetic_side_effects, + clippy::cast_possible_truncation, + clippy::cast_possible_wrap, + clippy::cast_precision_loss, + clippy::cast_sign_loss, + clippy::collapsible_if +)] + use crate::qbft::{ self, fake_clock::{FakeClock, TimerPriority}, @@ -6,7 +15,7 @@ use crate::qbft::{ use cancellation::CancellationTokenSource; use crossbeam::channel as mpmc; use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, HashMap, VecDeque}, fmt::Write as _, panic::{self, AssertUnwindSafe}, sync::{ @@ -31,13 +40,22 @@ const TEST_STREAM_MSG_ROUND: u64 = 11; const TEST_STREAM_MSG_VALUE: u64 = 12; const TEST_STREAM_MSG_PREPARED_ROUND: u64 = 13; const TEST_STREAM_MSG_PREPARED_VALUE: u64 = 14; +const TRACE_DUMP_LIMIT: usize = 200; const TEST_WAIT_TIMEOUT: Duration = Duration::from_secs(1); // Wall-clock guard catches lack of harness progress. Fake time still controls // protocol progress, so slow-but-progressing parallel runs should not fail. const TEST_STALL_TIMEOUT: Duration = Duration::from_secs(20); type RunOutcome = std::thread::Result>; -type TestMsgRef = Msg; +type TestMsgRef = Msg; + +struct TestQbft; + +impl QbftTypes for TestQbft { + type Compare = i64; + type Instance = i64; + type Value = i64; +} struct PendingCompareGuard { pending_compares: Arc, @@ -114,16 +132,11 @@ fn test_qbft(test: Test) { // Keep peer iteration deterministic. These fake-clock tests assert exact // rounds, and broadcast fanout order affects which node observes quorums // first when tests run in parallel. - let mut receives = BTreeMap::< - i64, - ( - mpmc::Sender>, - mpmc::Receiver>, - ), - >::new(); + let mut receives = + BTreeMap::>, mpmc::Receiver>)>::new(); let (broadcast_tx, broadcast_rx) = mpmc::unbounded::(); let (unjust_tx, unjust_rx) = mpmc::unbounded::(); - let (result_chan_tx, result_chan_rx) = mpmc::bounded::>>(N); + let (result_chan_tx, result_chan_rx) = mpmc::bounded::>>(N); let (run_chan_tx, run_chan_rx) = mpmc::bounded::<(i64, RunOutcome)>(N); let expected_initial_timers = N + test .value_delay @@ -146,84 +159,89 @@ fn test_qbft(test: Test) { Duration::from_secs(u64::pow(2, (round as u32) - 1)) }; - clock.new_timer(d) + let (receive, stop) = clock.new_timer(d); + Timer { receive, stop } }) }, decide: { let result_chan_tx = result_chan_tx.clone(); - Box::new(move |_, _, _, q_commit| { - result_chan_tx.send(q_commit.clone()).expect(WRITE_CHAN_ERR); + Box::new(move |req| { + result_chan_tx + .send(req.qcommit.clone()) + .expect(WRITE_CHAN_ERR); }) }, compare: { let pending_compares = pending_compares.clone(); - Arc::new(move |_, _, _, _, return_err, _| { + Arc::new(move |req| { let _guard = PendingCompareGuard { pending_compares: pending_compares.clone(), }; - return_err.send(Ok(())).expect(WRITE_CHAN_ERR); + req.return_err.send(Ok(())).expect(WRITE_CHAN_ERR); }) }, nodes: N as i64, fifo_limit: FIFO_LIMIT as i64, - log_round_change: { - let clock = clock.clone(); - let trace = trace.clone(); - let pending_timer_actions = pending_timer_actions.clone(); - - Box::new(move |_, process, round, new_round, upon_rule, _| { - if upon_rule == UPON_ROUND_TIMEOUT { - complete_timer_action(&pending_timer_actions); - } + logger: QbftLogger { + round_change: { + let clock = clock.clone(); + let trace = trace.clone(); + let pending_timer_actions = pending_timer_actions.clone(); + + Box::new(move |req| { + if req.upon_rule == UPON_ROUND_TIMEOUT { + complete_timer_action(&pending_timer_actions); + } - trace.push(format!( - "{:?} - {}@{} change to {} ~= {}", - clock.elapsed(), - process, - round, - new_round, - upon_rule, - )); - }) - }, - log_unjust: { - let trace = trace.clone(); - let unjust_tx = unjust_tx.clone(); - let fuzz = test.fuzz; - Box::new(move |_, process, msg| { - let line = format!("Unjust: process={} msg={:?}", process, msg); - trace.push(line.clone()); - if !fuzz { - unjust_tx.send(line).expect(WRITE_CHAN_ERR); - } - }) - }, - log_upon_rule: { - let clock = clock.clone(); - let trace = trace.clone(); - let pending_compares = pending_compares.clone(); - Box::new(move |_, process, round, msg, upon_rule| { - if upon_rule == UPON_JUSTIFIED_PRE_PREPARE { - pending_compares.fetch_add(1, Ordering::SeqCst); - } + trace.push(format!( + "{:?} - {}@{} change to {} ~= {}", + clock.elapsed(), + req.process, + req.round, + req.new_round, + req.upon_rule, + )); + }) + }, + unjust: { + let trace = trace.clone(); + let unjust_tx = unjust_tx.clone(); + let fuzz = test.fuzz; + Box::new(move |req| { + let line = format!("Unjust: process={} msg={:?}", req.process, req.msg); + trace.push(line.clone()); + if !fuzz { + unjust_tx.send(line).expect(WRITE_CHAN_ERR); + } + }) + }, + upon_rule: { + let clock = clock.clone(); + let trace = trace.clone(); + let pending_compares = pending_compares.clone(); + Box::new(move |req| { + if req.upon_rule == UPON_JUSTIFIED_PRE_PREPARE { + pending_compares.fetch_add(1, Ordering::SeqCst); + } - trace.push(format!( - "{:?} {} => {}@{} -> {}@{} ~= {}", - clock.elapsed(), - msg.source(), - msg.type_(), - msg.round(), - process, - round, - upon_rule, - )); - }) + trace.push(format!( + "{:?} {} => {}@{} -> {}@{} ~= {}", + clock.elapsed(), + req.msg.source(), + req.msg.type_(), + req.msg.round(), + req.process, + req.round, + req.upon_rule, + )); + }) + }, }, }); thread::scope(|s| { for i in 1..=N as i64 { - let (sender, receiver) = mpmc::bounded::>(1000); + let (sender, receiver) = mpmc::bounded::>(1000); let broadcast_tx = broadcast_tx.clone(); receives.insert(i, (sender.clone(), receiver.clone())); @@ -232,55 +250,53 @@ fn test_qbft(test: Test) { let clock = clock.clone(); let trace = trace.clone(); - Box::new( - move |_, type_, instance, source, round, value, pr, pv, justification| { - if round > MAX_ROUND as i64 { - return Err(QbftError::MaxRoundReached); - } - - if type_ == MSG_COMMIT && round <= test.commits_after.into() { - trace.push(format!( - "{:?} {} dropping commit for round {}", - clock.elapsed(), - source, - round - )); - return Ok(()); - } + Box::new(move |req| { + if req.round > MAX_ROUND as i64 { + return Err(QbftError::MaxRoundReached); + } + if req.type_ == MSG_COMMIT && req.round <= test.commits_after.into() { trace.push(format!( - "{:?} {} => {}@{}", + "{:?} {} dropping commit for round {}", clock.elapsed(), - source, - type_, - round + req.source, + req.round )); + return Ok(()); + } - let msg = new_msg( - type_, - *instance, - source, - round, - *value, - *value, - pr, - *pv, - justification, - ); - sender.send(msg.clone()).expect(WRITE_CHAN_ERR); - - bcast( - broadcast_tx.clone(), - msg.clone(), - test.bcast_jitter_ms, - clock.clone(), - trace.clone(), - seed, - ); + trace.push(format!( + "{:?} {} => {}@{}", + clock.elapsed(), + req.source, + req.type_, + req.round + )); - Ok(()) - }, - ) + let msg = new_msg( + req.type_, + *req.instance, + req.source, + req.round, + *req.value, + *req.value, + req.prepared_round, + *req.prepared_value, + req.justification, + ); + sender.send(msg.clone()).expect(WRITE_CHAN_ERR); + + bcast( + broadcast_tx.clone(), + msg.clone(), + test.bcast_jitter_ms, + clock.clone(), + trace.clone(), + seed, + ); + + Ok(()) + }) }, receive: receiver.clone(), }; @@ -358,7 +374,11 @@ fn test_qbft(test: Test) { s.spawn(move || { _ = v_chan_tx_send.send(i); }); - } else if is_leader(&test.instance, 1, i) { + } else if is_leader(LeaderRequest { + instance: &test.instance, + round: 1, + process: i, + }) { let v_chan_tx_send = keep_value_sender .as_ref() .expect("value sender kept until run returns") @@ -403,7 +423,7 @@ fn test_qbft(test: Test) { } } - let mut results = BTreeMap::>::new(); + let mut results = BTreeMap::>::new(); let mut count = 0; let mut decided = false; let mut done = 0; @@ -541,7 +561,11 @@ fn test_qbft(test: Test) { ); } } else { // Otherwise check that leader value was used. - if !is_leader(&test.instance, commit.round(), commit.value()) { + if !is_leader(LeaderRequest { + instance: &test.instance, + round: commit.round(), + process: commit.value(), + }) { cts.cancel(); clock.cancel(); panic!( @@ -658,7 +682,7 @@ fn test_qbft(test: Test) { } #[derive(Clone, Default)] -struct Trace(Arc>>); +struct Trace(Arc>>); impl Trace { fn new() -> Self { @@ -666,14 +690,17 @@ impl Trace { } fn push(&self, line: String) { - self.0.lock().unwrap().push(line); + let mut lines = self.0.lock().unwrap(); + if lines.len() == TRACE_DUMP_LIMIT { + lines.pop_front(); + } + lines.push_back(line); } fn dump(&self) -> String { let lines = self.0.lock().unwrap(); - let start = lines.len().saturating_sub(200); let mut out = String::new(); - for line in &lines[start..] { + for line in lines.iter() { let _ = writeln!(out, "{line}"); } out @@ -700,6 +727,10 @@ fn outcome_is_error(outcome: &RunOutcome, expected: fn(&QbftError) -> bool) -> b matches!(outcome, Ok(Err(err)) if expected(err)) } +fn assert_upon_rule(expected: UponRule, actual: UponRule) { + assert!(actual == expected, "want {expected}, got {actual}"); +} + fn test_seed(test: &Test) -> u64 { let mut seed = seed_from_label(TEST_SEED_LABEL); seed ^= test.instance as u64; @@ -722,10 +753,8 @@ fn seed_from_label(label: &str) -> u64 { } /// Construct a leader election function. -fn make_is_leader(n: i64) -> impl Fn(&i64, i64, i64) -> bool + Clone { - move |instance: &i64, round: i64, process: i64| -> bool { - (instance + round).rem_euclid(n) == process - } +fn make_is_leader(n: i64) -> impl for<'a> Fn(LeaderRequest<'a, TestQbft>) -> bool + Clone { + move |req| (*req.instance + req.round).rem_euclid(n) == req.process } /// Returns a new message to be broadcast. @@ -739,8 +768,8 @@ fn new_msg( value_source: i64, pr: i64, pv: i64, - justify: Option<&Vec>>, -) -> Msg { + justify: Option<&Vec>>, +) -> Msg { let msgs = match justify { None => vec![], Some(justify) => justify @@ -790,7 +819,7 @@ fn new_round_change_quorum(round: i64, pr: i64, pv: i64) -> Vec { // messages. fn bcast( broadcast: mpmc::Sender, - msg: Msg, + msg: Msg, jitter_ms: i32, clock: FakeClock, trace: Trace, @@ -837,10 +866,9 @@ fn deliver_ready_broadcasts( .iter() .take_while(|delayed| delayed.deliver_at <= clock.elapsed()) .count(); - let ready = pending.drain(..ready_count).collect::>(); - ready - .into_iter() + pending + .drain(..ready_count) .map(|delayed| fanout_broadcast(receives, drop_prob, seed, trace, clock, delayed.msg)) .sum() } @@ -893,7 +921,7 @@ fn fanout_broadcast( broadcasts } -fn random_msg(instance: i64, peer_idx: i64, seed: u64, counter: u64) -> Msg { +fn random_msg(instance: i64, peer_idx: i64, seed: u64, counter: u64) -> Msg { let message_types = [ MSG_PRE_PREPARE, MSG_PREPARE, @@ -915,12 +943,12 @@ fn random_msg(instance: i64, peer_idx: i64, seed: u64, counter: u64) -> Msg, target: i64, stream_id: u64) -> f64 { +fn deterministic_unit(seed: u64, msg: &Msg, target: i64, stream_id: u64) -> f64 { let value = deterministic_msg_u64(seed, msg, target, stream_id) >> 11; value as f64 / ((1_u64 << 53) as f64) } -fn deterministic_msg_u64(seed: u64, msg: &Msg, target: i64, stream_id: u64) -> u64 { +fn deterministic_msg_u64(seed: u64, msg: &Msg, target: i64, stream_id: u64) -> u64 { let mut value = splitmix64(seed ^ stream_id); value = splitmix64(value ^ i64_to_u64(msg.type_().0)); value = splitmix64(value ^ i64_to_u64(msg.instance())); @@ -967,7 +995,7 @@ struct TestMsg { justify: Option>, } -impl SomeMsg for TestMsg { +impl SomeMsg for TestMsg { fn type_(&self) -> MessageType { self.msg_type } @@ -1000,12 +1028,12 @@ impl SomeMsg for TestMsg { self.pv } - fn justification(&self) -> Vec> { + fn justification(&self) -> Vec> { match self.justify { None => vec![], Some(ref j) => j .iter() - .map(|j| Arc::new(j.clone()) as Msg) + .map(|j| Arc::new(j.clone()) as Msg) .collect(), } } @@ -1206,27 +1234,177 @@ fn fuzzed(start_delay_secs: Option, decide_round: i32, random_round: bool) }); } -fn noop_definition() -> Definition { +fn noop_definition() -> Definition { Definition { - is_leader: Box::new(|_, _, _| false), - new_timer: Box::new(|_| (mpmc::never(), Box::new(|| {}))), - decide: Box::new(|_, _, _, _| {}), - compare: Arc::new(|_, _, _, _, _, _| {}), + is_leader: Box::new(|_| false), + new_timer: Box::new(|_| Timer { + receive: mpmc::never(), + stop: Box::new(|| {}), + }), + decide: Box::new(|_| {}), + compare: Arc::new(|_| {}), nodes: 0, fifo_limit: 0, - log_round_change: Box::new(|_, _, _, _, _, _| {}), - log_unjust: Box::new(|_, _, _| {}), - log_upon_rule: Box::new(|_, _, _, _, _| {}), + logger: QbftLogger { + round_change: Box::new(|_| {}), + unjust: Box::new(|_| {}), + upon_rule: Box::new(|_| {}), + }, } } -fn noop_transport() -> Transport { +fn noop_transport() -> Transport { Transport { - broadcast: Box::new(|_, _, _, _, _, _, _, _, _| Ok(())), + broadcast: Box::new(|_| Ok(())), receive: mpmc::never(), } } +#[derive(Debug, PartialEq, Eq)] +struct BroadcastRecord { + canceled: bool, + type_: MessageType, + instance: i64, + source: i64, + round: i64, + value: i64, + prepared_round: i64, + prepared_value: i64, + justification_len: usize, +} + +#[test] +fn broadcast_request_maps_protocol_fields() { + let (receive_tx, receive_rx) = mpmc::bounded::>(4); + receive_tx + .send(new_msg(MSG_PRE_PREPARE, 0, 1, 1, 7, 7, 0, 0, None)) + .expect(WRITE_CHAN_ERR); + for source in 1..=3 { + receive_tx + .send(new_msg(MSG_PREPARE, 0, source, 1, 7, 7, 0, 0, None)) + .expect(WRITE_CHAN_ERR); + } + + let cts = CancellationTokenSource::new(); + let token = cts.token().clone(); + let (record_tx, record_rx) = mpmc::unbounded(); + let transport = Transport { + broadcast: Box::new(move |req| { + record_tx + .send(BroadcastRecord { + canceled: req.ct.is_canceled(), + type_: req.type_, + instance: *req.instance, + source: req.source, + round: req.round, + value: *req.value, + prepared_round: req.prepared_round, + prepared_value: *req.prepared_value, + justification_len: req.justification.map_or(0, Vec::len), + }) + .expect(WRITE_CHAN_ERR); + if req.type_ == MSG_COMMIT { + cts.cancel(); + } + Ok(()) + }), + receive: receive_rx, + }; + let mut def = noop_definition(); + def.nodes = 4; + def.fifo_limit = 100; + def.is_leader = Box::new(|req| req.process == 1); + def.compare = Arc::new(|req| req.return_err.send(Ok(())).expect(WRITE_CHAN_ERR)); + + let (_input_tx, input_rx) = mpmc::bounded::(1); + let (_source_tx, source_rx) = mpmc::bounded::(1); + assert!(matches!( + qbft::run(&token, &def, &transport, &0, 2, input_rx, source_rx), + Err(QbftError::ContextCanceled) + )); + assert_eq!( + record_rx.try_iter().collect::>(), + vec![ + BroadcastRecord { + canceled: false, + type_: MSG_PREPARE, + instance: 0, + source: 2, + round: 1, + value: 7, + prepared_round: 0, + prepared_value: 0, + justification_len: 0, + }, + BroadcastRecord { + canceled: false, + type_: MSG_COMMIT, + instance: 0, + source: 2, + round: 1, + value: 7, + prepared_round: 0, + prepared_value: 0, + justification_len: 0, + }, + ] + ); + + let (timer_tx, timer_rx) = mpmc::bounded(1); + timer_tx.send(time::Instant::now()).expect(WRITE_CHAN_ERR); + let cts = CancellationTokenSource::new(); + let token = cts.token().clone(); + let (record_tx, record_rx) = mpmc::unbounded(); + let transport = Transport { + broadcast: Box::new(move |req| { + record_tx + .send(BroadcastRecord { + canceled: req.ct.is_canceled(), + type_: req.type_, + instance: *req.instance, + source: req.source, + round: req.round, + value: *req.value, + prepared_round: req.prepared_round, + prepared_value: *req.prepared_value, + justification_len: req.justification.map_or(0, Vec::len), + }) + .expect(WRITE_CHAN_ERR); + cts.cancel(); + Ok(()) + }), + receive: mpmc::never(), + }; + let mut def = noop_definition(); + def.nodes = 4; + def.fifo_limit = 100; + def.new_timer = Box::new(move |_| Timer { + receive: timer_rx.clone(), + stop: Box::new(|| {}), + }); + + let (_input_tx, input_rx) = mpmc::bounded::(1); + let (_source_tx, source_rx) = mpmc::bounded::(1); + assert!(matches!( + qbft::run(&token, &def, &transport, &0, 2, input_rx, source_rx), + Err(QbftError::ContextCanceled) + )); + assert_eq!( + record_rx.try_iter().collect::>(), + vec![BroadcastRecord { + canceled: false, + type_: MSG_ROUND_CHANGE, + instance: 0, + source: 2, + round: 2, + value: 0, + prepared_round: 0, + prepared_value: 0, + justification_len: 0, + }] + ); +} + // Tests quorum/faulty formulas across node counts. // Expect quorum and tolerated-fault counts to match the Charon formula. #[test_case(1, 1, 0 ; "n1")] @@ -1252,7 +1430,7 @@ fn noop_transport() -> Transport { #[test_case(21, 14, 6 ; "n21")] #[test_case(22, 15, 7 ; "n22")] fn formulas(n: i64, q: i64, f: i64) { - let d = Definition:: { + let d = Definition:: { nodes: n, ..noop_definition() }; @@ -1301,7 +1479,7 @@ fn duplicate_pre_prepare_rules() { const NO_LEADER: i64 = 1; const LEADER: i64 = 2; - let new_preprepare = |round: i64| -> Msg { + let new_preprepare = |round: i64| -> Msg { new_msg( MSG_PRE_PREPARE, 0, @@ -1319,33 +1497,37 @@ fn duplicate_pre_prepare_rules() { let mut def = noop_definition(); def.nodes = 4; def.fifo_limit = 100; - def.is_leader = Box::new(|_, _, process| process == LEADER); - def.log_upon_rule = Box::new(move |_, _, round, msg, upon_rule| { - println!("UponRule: rule={} round={} ", upon_rule, msg.round()); + def.is_leader = Box::new(|req| req.process == LEADER); + def.logger.upon_rule = Box::new(move |req| { + println!( + "UponRule: rule={} round={} ", + req.upon_rule, + req.msg.round() + ); - assert!(upon_rule == UPON_JUSTIFIED_PRE_PREPARE); + assert!(req.upon_rule == UPON_JUSTIFIED_PRE_PREPARE); - if msg.round() == 1 { + if req.msg.round() == 1 { return; } - if msg.round() == 2 { + if req.msg.round() == 2 { cts.cancel(); return; } - panic!("unexpected round {}", round); + panic!("unexpected round {}", req.round); }); - def.compare = Arc::new(|_, msg, _, _, return_err, _| { - let result = if msg.round() == 1 { + def.compare = Arc::new(|req| { + let result = if req.qcommit.round() == 1 { Err(QbftError::CompareError) } else { Ok(()) }; - return_err.send(result).expect(WRITE_CHAN_ERR); + req.return_err.send(result).expect(WRITE_CHAN_ERR); }); - let (r_chan_tx, r_chan_rx) = mpmc::bounded::>(2); + let (r_chan_tx, r_chan_rx) = mpmc::bounded::>(2); r_chan_tx.send(new_preprepare(1)).expect(WRITE_CHAN_ERR); r_chan_tx.send(new_preprepare(2)).expect(WRITE_CHAN_ERR); @@ -1407,29 +1589,41 @@ fn idle_run_returns_when_cancelled() { )); } -// Tests definition validation at the `run` boundary. -// Expect invalid node count and FIFO limit to return typed errors. -#[test_case(0, 1, true ; "invalid_nodes")] -#[test_case(4, 0, false ; "invalid_fifo_limit")] -fn invalid_definition_rejected(nodes: i64, fifo_limit: i64, invalid_nodes: bool) { +fn run_with_definition(def: &Definition) -> Result<()> { let cts = CancellationTokenSource::new(); let transport = noop_transport(); let (_input_tx, input_rx) = mpmc::bounded::(1); let (_source_tx, source_rx) = mpmc::bounded::(1); + qbft::run(cts.token(), def, &transport, &0, 1, input_rx, source_rx) +} + +// Tests definition validation at the `run` boundary. +// Expect invalid node count to return a typed error. +#[test] +fn invalid_nodes_rejected() { let mut def = noop_definition(); - def.nodes = nodes; - def.fifo_limit = fifo_limit; - - let result = qbft::run(cts.token(), &def, &transport, &0, 1, input_rx, source_rx); - if invalid_nodes { - assert!(matches!(result, Err(QbftError::InvalidNodes { nodes: 0 }))); - } else { - assert!(matches!( - result, - Err(QbftError::InvalidFifoLimit { fifo_limit: 0 }) - )); - } + def.nodes = 0; + def.fifo_limit = 1; + + assert!(matches!( + run_with_definition(&def), + Err(QbftError::InvalidNodes { nodes: 0 }) + )); +} + +// Tests definition validation at the `run` boundary. +// Expect invalid FIFO limit to return a typed error. +#[test] +fn invalid_fifo_limit_rejected() { + let mut def = noop_definition(); + def.nodes = 4; + def.fifo_limit = 0; + + assert!(matches!( + run_with_definition(&def), + Err(QbftError::InvalidFifoLimit { fifo_limit: 0 }) + )); } // Tests cancellation under a continuously hot receive channel. @@ -1442,7 +1636,7 @@ fn run_cancels_under_hot_receive_stream() { def.nodes = 4; def.fifo_limit = 100; - let (receive_tx, receive_rx) = mpmc::bounded::>(1024); + let (receive_tx, receive_rx) = mpmc::bounded::>(1024); let transport = Transport { receive: receive_rx, ..noop_transport() @@ -1499,15 +1693,17 @@ fn classify_rules() { def.is_leader = Box::new(make_is_leader(4)); let preprepare = new_msg(MSG_PRE_PREPARE, 0, 1, 1, 1, 0, 0, 0, None); - assert!(classify(&def, &0, 1, 2, &HashMap::new(), &preprepare).0 == UPON_JUSTIFIED_PRE_PREPARE); + assert_upon_rule( + UPON_JUSTIFIED_PRE_PREPARE, + classify(&def, &0, 1, 2, &HashMap::new(), &preprepare).0, + ); - let prepares = vec![ - new_msg(MSG_PREPARE, 0, 1, 1, 2, 0, 0, 0, None), - new_msg(MSG_PREPARE, 0, 2, 1, 2, 0, 0, 0, None), - new_msg(MSG_PREPARE, 0, 3, 1, 2, 0, 0, 0, None), - ]; + let prepares = new_prepare_quorum(1, 2); let buffer = buffer_by_source(&prepares); - assert!(classify(&def, &0, 1, 2, &buffer, &prepares[2]).0 == UPON_QUORUM_PREPARES); + assert_upon_rule( + UPON_QUORUM_PREPARES, + classify(&def, &0, 1, 2, &buffer, &prepares[2]).0, + ); let commits = vec![ new_msg(MSG_COMMIT, 0, 1, 1, 2, 0, 0, 0, None), @@ -1515,7 +1711,10 @@ fn classify_rules() { new_msg(MSG_COMMIT, 0, 3, 1, 2, 0, 0, 0, None), ]; let buffer = buffer_by_source(&commits); - assert!(classify(&def, &0, 1, 2, &buffer, &commits[2]).0 == UPON_QUORUM_COMMITS); + assert_upon_rule( + UPON_QUORUM_COMMITS, + classify(&def, &0, 1, 2, &buffer, &commits[2]).0, + ); let future_round_changes = vec![ new_msg(MSG_ROUND_CHANGE, 0, 1, 3, 0, 0, 0, 0, None), @@ -1526,15 +1725,11 @@ fn classify_rules() { classify(&def, &0, 1, 2, &buffer, &future_round_changes[1]).0 == UPON_F_PLUS1_ROUND_CHANGES ); - let unjust_round_changes = vec![ - new_msg(MSG_ROUND_CHANGE, 0, 1, 1, 0, 0, 2, 9, None), - new_msg(MSG_ROUND_CHANGE, 0, 2, 1, 0, 0, 2, 9, None), - new_msg(MSG_ROUND_CHANGE, 0, 3, 1, 0, 0, 2, 9, None), - ]; + let unjust_round_changes = new_round_change_quorum(1, 2, 9); let buffer = buffer_by_source(&unjust_round_changes); - assert!( - classify(&def, &0, 1, 2, &buffer, &unjust_round_changes[2]).0 - == UPON_UNJUST_QUORUM_ROUND_CHANGES + assert_upon_rule( + UPON_UNJUST_QUORUM_ROUND_CHANGES, + classify(&def, &0, 1, 2, &buffer, &unjust_round_changes[2]).0, ); } @@ -1545,33 +1740,21 @@ fn classify_rules() { fn justified_qrc_j1_and_j2() { let mut def = noop_definition(); def.nodes = 4; - let j1 = vec![ - new_msg(MSG_ROUND_CHANGE, 0, 1, 2, 0, 0, 0, 0, None), - new_msg(MSG_ROUND_CHANGE, 0, 2, 2, 0, 0, 0, 0, None), - new_msg(MSG_ROUND_CHANGE, 0, 3, 2, 0, 0, 0, 0, None), - ]; + let j1 = new_round_change_quorum(2, 0, 0); assert_eq!(Some(0), contains_justified_qrc(&def, &j1, 2)); assert_eq!(3, get_justified_qrc(&def, &j1, 2).unwrap().len()); - let j2 = vec![ + let mut j2 = vec![ new_msg(MSG_ROUND_CHANGE, 0, 1, 2, 0, 0, 1, 7, None), new_msg(MSG_ROUND_CHANGE, 0, 2, 2, 0, 0, 1, 7, None), new_msg(MSG_ROUND_CHANGE, 0, 3, 2, 0, 0, 0, 0, None), - new_msg(MSG_PREPARE, 0, 1, 1, 7, 0, 0, 0, None), - new_msg(MSG_PREPARE, 0, 2, 1, 7, 0, 0, 0, None), - new_msg(MSG_PREPARE, 0, 3, 1, 7, 0, 0, 0, None), ]; + j2.extend(new_prepare_quorum(1, 7)); assert_eq!(Some(7), contains_justified_qrc(&def, &j2, 2)); assert!(get_justified_qrc(&def, &j2, 2).unwrap().len() >= 6); - let invalid_pr = vec![ - new_msg(MSG_ROUND_CHANGE, 0, 1, 2, 0, 0, 2, 7, None), - new_msg(MSG_ROUND_CHANGE, 0, 2, 2, 0, 0, 2, 7, None), - new_msg(MSG_ROUND_CHANGE, 0, 3, 2, 0, 0, 2, 7, None), - new_msg(MSG_PREPARE, 0, 1, 2, 7, 0, 0, 0, None), - new_msg(MSG_PREPARE, 0, 2, 2, 7, 0, 0, 0, None), - new_msg(MSG_PREPARE, 0, 3, 2, 7, 0, 0, 0, None), - ]; + let mut invalid_pr = new_round_change_quorum(2, 2, 7); + invalid_pr.extend(new_prepare_quorum(2, 7)); assert_eq!(None, contains_justified_qrc(&def, &invalid_pr, 2)); assert!(get_justified_qrc(&def, &invalid_pr, 2).is_none()); } @@ -1692,8 +1875,8 @@ fn compare_success_error_cached_value_source_and_timeout() { let (_vs_tx, vs_rx) = mpmc::bounded::(1); let timer = mpmc::never(); let mut def = noop_definition(); - def.compare = Arc::new(|_, _, _, _, return_err, _| { - return_err.send(Ok(())).expect(WRITE_CHAN_ERR); + def.compare = Arc::new(|req| { + req.return_err.send(Ok(())).expect(WRITE_CHAN_ERR); }); assert!(matches!( compare(cts.token(), &def, &msg, &vs_rx, 0, &timer), @@ -1701,8 +1884,8 @@ fn compare_success_error_cached_value_source_and_timeout() { )); let mut def = noop_definition(); - def.compare = Arc::new(|_, _, _, _, return_err, _| { - let return_err = return_err.clone(); + def.compare = Arc::new(|req| { + let return_err = req.return_err.clone(); thread::spawn(move || { thread::sleep(Duration::from_millis(10)); return_err.send(Ok(())).expect(WRITE_CHAN_ERR); @@ -1714,8 +1897,8 @@ fn compare_success_error_cached_value_source_and_timeout() { )); let mut def = noop_definition(); - def.compare = Arc::new(|_, _, _, _, return_err, _| { - return_err + def.compare = Arc::new(|req| { + req.return_err .send(Err(QbftError::CompareError)) .expect(WRITE_CHAN_ERR); }); @@ -1727,19 +1910,17 @@ fn compare_success_error_cached_value_source_and_timeout() { let (vs_tx, vs_rx) = mpmc::bounded::(1); vs_tx.send(42).expect(WRITE_CHAN_ERR); let mut def = noop_definition(); - def.compare = Arc::new( - |_, _, input_value_source_ch, input_value_source, return_err, return_value| { - let cached = if *input_value_source == 0 { - let value = input_value_source_ch.recv().expect(READ_CHAN_ERR); - return_value.send(value).expect(WRITE_CHAN_ERR); - value - } else { - *input_value_source - }; - assert_eq!(42, cached); - return_err.send(Ok(())).expect(WRITE_CHAN_ERR); - }, - ); + def.compare = Arc::new(|req| { + let cached = if *req.input_value_source == 0 { + let value = req.input_value_source_ch.recv().expect(READ_CHAN_ERR); + req.return_value.send(value).expect(WRITE_CHAN_ERR); + value + } else { + *req.input_value_source + }; + assert_eq!(42, cached); + req.return_err.send(Ok(())).expect(WRITE_CHAN_ERR); + }); assert!(matches!( compare(cts.token(), &def, &msg, &vs_rx, 0, &timer), (42, Ok(())) @@ -1748,21 +1929,19 @@ fn compare_success_error_cached_value_source_and_timeout() { let (vs_tx, vs_rx) = mpmc::bounded::(1); vs_tx.send(43).expect(WRITE_CHAN_ERR); let mut def = noop_definition(); - def.compare = Arc::new( - |_, _, input_value_source_ch, input_value_source, return_err, return_value| { - let cached = if *input_value_source == 0 { - let value = input_value_source_ch.recv().expect(READ_CHAN_ERR); - return_value.send(value).expect(WRITE_CHAN_ERR); - value - } else { - *input_value_source - }; - assert_eq!(43, cached); - return_err - .send(Err(QbftError::CompareError)) - .expect(WRITE_CHAN_ERR); - }, - ); + def.compare = Arc::new(|req| { + let cached = if *req.input_value_source == 0 { + let value = req.input_value_source_ch.recv().expect(READ_CHAN_ERR); + req.return_value.send(value).expect(WRITE_CHAN_ERR); + value + } else { + *req.input_value_source + }; + assert_eq!(43, cached); + req.return_err + .send(Err(QbftError::CompareError)) + .expect(WRITE_CHAN_ERR); + }); assert!(matches!( compare(cts.token(), &def, &msg, &vs_rx, 0, &timer), (43, Err(QbftError::CompareError)) @@ -1771,9 +1950,9 @@ fn compare_success_error_cached_value_source_and_timeout() { let (timer_tx, timer_rx) = mpmc::bounded(1); timer_tx.send(time::Instant::now()).expect(WRITE_CHAN_ERR); let mut def = noop_definition(); - def.compare = Arc::new(|_, _, _, _, return_err, _| { + def.compare = Arc::new(|req| { thread::sleep(Duration::from_millis(20)); - let _ = return_err.send(Ok(())); + let _ = req.return_err.send(Ok(())); }); assert!(matches!( compare(cts.token(), &def, &msg, &vs_rx, 44, &timer_rx), @@ -1792,11 +1971,11 @@ fn compare_timeout_does_not_wait_for_blocked_callback() { timer_tx.send(time::Instant::now()).expect(WRITE_CHAN_ERR); let mut def = noop_definition(); - def.compare = Arc::new(|ct, _, _, _, return_err, _| { - while !ct.is_canceled() { + def.compare = Arc::new(|req| { + while !req.ct.is_canceled() { thread::sleep(Duration::from_millis(1)); } - let _ = return_err.send(Ok(())); + let _ = req.return_err.send(Ok(())); }); let (result_tx, result_rx) = mpmc::bounded(1); @@ -1826,7 +2005,7 @@ fn compare_callback_exit_without_status_waits_for_timer() { let (callback_done_tx, callback_done_rx) = mpmc::bounded(1); let mut def = noop_definition(); - def.compare = Arc::new(move |_, _, _, _, _, _| { + def.compare = Arc::new(move |_| { callback_done_tx.send(()).expect(WRITE_CHAN_ERR); }); @@ -1868,13 +2047,13 @@ fn compare_parent_cancel_cancels_callback_token() { let (token_cancelled_tx, token_cancelled_rx) = mpmc::bounded(1); let mut def = noop_definition(); - def.compare = Arc::new(move |ct, _, _, _, return_err, _| { + def.compare = Arc::new(move |req| { compare_started_tx.send(()).expect(WRITE_CHAN_ERR); - while !ct.is_canceled() { + while !req.ct.is_canceled() { thread::sleep(Duration::from_millis(1)); } token_cancelled_tx.send(()).expect(WRITE_CHAN_ERR); - let _ = return_err.send(Ok(())); + let _ = req.return_err.send(Ok(())); }); let (result_tx, result_rx) = mpmc::bounded(1); @@ -1919,20 +2098,20 @@ fn run_parent_cancel_during_compare_does_not_prepare() { let mut def = noop_definition(); def.nodes = 4; def.fifo_limit = 100; - def.is_leader = Box::new(|_, _, process| process == LEADER); - def.compare = Arc::new(move |ct, _, _, _, return_err, _| { + def.is_leader = Box::new(|req| req.process == LEADER); + def.compare = Arc::new(move |req| { compare_started_tx.send(()).expect(WRITE_CHAN_ERR); - while !ct.is_canceled() { + while !req.ct.is_canceled() { thread::sleep(Duration::from_millis(1)); } compare_cancelled_tx.send(()).expect(WRITE_CHAN_ERR); - let _ = return_err.send(Ok(())); + let _ = req.return_err.send(Ok(())); }); let (broadcast_tx, broadcast_rx) = mpmc::bounded(1); let transport = Transport { - broadcast: Box::new(move |_, type_, _, _, _, _, _, _, _| { - broadcast_tx.send(type_).expect(WRITE_CHAN_ERR); + broadcast: Box::new(move |req| { + broadcast_tx.send(req.type_).expect(WRITE_CHAN_ERR); Ok(()) }), receive: receive_rx, @@ -1969,7 +2148,7 @@ fn run_parent_cancel_during_compare_does_not_prepare() { ); } -fn buffer_by_source(msgs: &[Msg]) -> HashMap>> { +fn buffer_by_source(msgs: &[Msg]) -> HashMap>> { let mut buffer = HashMap::new(); for msg in msgs { buffer @@ -2026,15 +2205,10 @@ fn test_qbft_chain_split(test: ChainSplitTest) { // Keep peer iteration deterministic. These fake-clock tests assert exact // rounds, and broadcast fanout order affects which node observes quorums // first when tests run in parallel. - let mut receives = BTreeMap::< - i64, - ( - mpmc::Sender>, - mpmc::Receiver>, - ), - >::new(); - let (broadcast_tx, broadcast_rx) = mpmc::unbounded::>(); - let (result_chan_tx, result_chan_rx) = mpmc::bounded::>>(N); + let mut receives = + BTreeMap::>, mpmc::Receiver>)>::new(); + let (broadcast_tx, broadcast_rx) = mpmc::unbounded::>(); + let (result_chan_tx, result_chan_rx) = mpmc::bounded::>>(N); let (run_chan_tx, run_chan_rx) = mpmc::bounded::<(i64, RunOutcome)>(N); let instance = 0; @@ -2043,135 +2217,132 @@ fn test_qbft_chain_split(test: ChainSplitTest) { new_timer: { let clock = clock.clone(); Box::new(move |round| { - clock.new_timer(Duration::from_secs(u64::pow(2, (round as u32) - 1))) + let (receive, stop) = + clock.new_timer(Duration::from_secs(u64::pow(2, (round as u32) - 1))); + Timer { receive, stop } }) }, decide: { let result_chan_tx = result_chan_tx.clone(); - Box::new(move |_, _, _, q_commit| { - result_chan_tx.send(q_commit.clone()).expect(WRITE_CHAN_ERR); + Box::new(move |req| { + result_chan_tx + .send(req.qcommit.clone()) + .expect(WRITE_CHAN_ERR); }) }, compare: { let pending_compares = pending_compares.clone(); - Arc::new( - move |_, - qcommit, - input_value_source_ch, - input_value_source, - return_err, - return_value| { - let _guard = PendingCompareGuard { - pending_compares: pending_compares.clone(), - }; - let leader_value_source = qcommit.value_source().expect("value source"); - let local = if *input_value_source == 0 { - let value = input_value_source_ch.recv().expect(READ_CHAN_ERR); - return_value.send(value).expect(WRITE_CHAN_ERR); - value - } else { - *input_value_source - }; - - if leader_value_source != local { - return_err - .send(Err(QbftError::CompareError)) - .expect(WRITE_CHAN_ERR); - return; - } + Arc::new(move |req| { + let _guard = PendingCompareGuard { + pending_compares: pending_compares.clone(), + }; + let leader_value_source = req.qcommit.value_source().expect("value source"); + let local = if *req.input_value_source == 0 { + let value = req.input_value_source_ch.recv().expect(READ_CHAN_ERR); + req.return_value.send(value).expect(WRITE_CHAN_ERR); + value + } else { + *req.input_value_source + }; - return_err.send(Ok(())).expect(WRITE_CHAN_ERR); - }, - ) - }, - nodes: N as i64, - fifo_limit: FIFO_LIMIT, - log_round_change: { - let clock = clock.clone(); - let trace = trace.clone(); - let pending_timer_actions = pending_timer_actions.clone(); - Box::new(move |_, process, round, new_round, upon_rule, _| { - if upon_rule == UPON_ROUND_TIMEOUT { - complete_timer_action(&pending_timer_actions); + if leader_value_source != local { + req.return_err + .send(Err(QbftError::CompareError)) + .expect(WRITE_CHAN_ERR); + return; } - trace.push(format!( - "{:?} - {}@{} change to {} ~= {}", - clock.elapsed(), - process, - round, - new_round, - upon_rule - )); + req.return_err.send(Ok(())).expect(WRITE_CHAN_ERR); }) }, - log_unjust: { - let trace = trace.clone(); - Box::new(move |_, process, msg| { - trace.push(format!("Unjust: process={} msg={:?}", process, msg)) - }) - }, - log_upon_rule: { - let clock = clock.clone(); - let trace = trace.clone(); - let pending_compares = pending_compares.clone(); - Box::new(move |_, process, round, msg, upon_rule| { - if upon_rule == UPON_JUSTIFIED_PRE_PREPARE { - pending_compares.fetch_add(1, Ordering::SeqCst); - } + nodes: N as i64, + fifo_limit: FIFO_LIMIT, + logger: QbftLogger { + round_change: { + let clock = clock.clone(); + let trace = trace.clone(); + let pending_timer_actions = pending_timer_actions.clone(); + Box::new(move |req| { + if req.upon_rule == UPON_ROUND_TIMEOUT { + complete_timer_action(&pending_timer_actions); + } - trace.push(format!( - "{:?} {} => {}@{} -> {}@{} ~= {}", - clock.elapsed(), - msg.source(), - msg.type_(), - msg.round(), - process, - round, - upon_rule - )); - }) + trace.push(format!( + "{:?} - {}@{} change to {} ~= {}", + clock.elapsed(), + req.process, + req.round, + req.new_round, + req.upon_rule + )); + }) + }, + unjust: { + let trace = trace.clone(); + Box::new(move |req| { + trace.push(format!("Unjust: process={} msg={:?}", req.process, req.msg)) + }) + }, + upon_rule: { + let clock = clock.clone(); + let trace = trace.clone(); + let pending_compares = pending_compares.clone(); + Box::new(move |req| { + if req.upon_rule == UPON_JUSTIFIED_PRE_PREPARE { + pending_compares.fetch_add(1, Ordering::SeqCst); + } + + trace.push(format!( + "{:?} {} => {}@{} -> {}@{} ~= {}", + clock.elapsed(), + req.msg.source(), + req.msg.type_(), + req.msg.round(), + req.process, + req.round, + req.upon_rule + )); + }) + }, }, }); thread::scope(|s| { for i in 1..=N as i64 { - let (sender, receiver) = mpmc::bounded::>(1000); + let (sender, receiver) = mpmc::bounded::>(1000); receives.insert(i, (sender.clone(), receiver.clone())); let broadcast_tx = broadcast_tx.clone(); let trace = trace.clone(); let clock = clock.clone(); let transport = Transport { - broadcast: Box::new( - move |_, type_, instance, source, round, value, pr, pv, justification| { - if round > MAX_ROUND { - return Err(QbftError::MaxRoundReached); - } + broadcast: Box::new(move |req| { + if req.round > MAX_ROUND { + return Err(QbftError::MaxRoundReached); + } - trace.push(format!( - "{:?} {} => {}@{}", - clock.elapsed(), - source, - type_, - round - )); - let msg = new_msg( - type_, - *instance, - source, - round, - *value, - *value, - pr, - *pv, - justification, - ); - sender.send(msg.clone()).expect(WRITE_CHAN_ERR); - broadcast_tx.send(msg).expect(WRITE_CHAN_ERR); - Ok(()) - }, - ), + trace.push(format!( + "{:?} {} => {}@{}", + clock.elapsed(), + req.source, + req.type_, + req.round + )); + let msg = new_msg( + req.type_, + *req.instance, + req.source, + req.round, + *req.value, + *req.value, + req.prepared_round, + *req.prepared_value, + req.justification, + ); + sender.send(msg.clone()).expect(WRITE_CHAN_ERR); + broadcast_tx.send(msg).expect(WRITE_CHAN_ERR); + Ok(()) + }), receive: receiver, }; @@ -2208,11 +2379,33 @@ fn test_qbft_chain_split(test: ChainSplitTest) { } } - let mut results = BTreeMap::>::new(); + let mut results = BTreeMap::>::new(); let mut count = 0; let mut decided = false; let mut done = 0; let mut last_progress = time::Instant::now(); + let chain_split_seed = seed_from_label(CHAIN_SPLIT_SEED_LABEL); + // The no-consensus halt case must reach round 11; using Go's 1ms tick + // here makes this Rust harness exceed its real-time guard, so only that + // halt path fast-forwards fake time. + let tick = if test.should_halt { + Duration::from_millis(100) + } else { + Duration::from_millis(1) + }; + let timeout_limit = if test.should_halt { + let max_round = u32::try_from(MAX_ROUND).expect("MAX_ROUND fits u32"); + let seconds = 1_u64 + .checked_shl( + max_round + .checked_add(1) + .expect("MAX_ROUND permits timeout limit"), + ) + .expect("MAX_ROUND permits timeout limit"); + Duration::from_secs(seconds) + } else { + Duration::from_secs(60) + }; loop { mpmc::select! { @@ -2224,13 +2417,7 @@ fn test_qbft_chain_split(test: ChainSplitTest) { continue; } out_tx.send(msg.clone()).expect(WRITE_CHAN_ERR); - if deterministic_unit( - seed_from_label(CHAIN_SPLIT_SEED_LABEL), - &msg, - *target, - TEST_STREAM_DUPLICATE, - ) < 0.1 - { + if deterministic_unit(chain_split_seed, &msg, *target, TEST_STREAM_DUPLICATE) < 0.1 { out_tx.send(msg.clone()).expect(WRITE_CHAN_ERR); } } @@ -2249,7 +2436,7 @@ fn test_qbft_chain_split(test: ChainSplitTest) { ); } - for commit in q_commit.clone() { + for commit in &q_commit { for previous in results.values() { if previous.value() != commit.value() { cts.cancel(); @@ -2289,7 +2476,7 @@ fn test_qbft_chain_split(test: ChainSplitTest) { ); } } - results.insert(commit.source(), commit); + results.insert(commit.source(), commit.clone()); } count += 1; if count == N { @@ -2347,26 +2534,9 @@ fn test_qbft_chain_split(test: ChainSplitTest) { // Matches the Go harness throttle; ordering correctness comes // from the pending-work barriers, not this duration. thread::sleep(Duration::from_micros(1)); - // The no-consensus halt case must reach round 11; using Go's - // 1ms tick here makes this Rust harness exceed its real-time - // guard, so only that halt path fast-forwards fake time. - let tick = if test.should_halt { - Duration::from_millis(100) - } else { - Duration::from_millis(1) - }; clock.advance_and_wait(tick, &pending_timer_actions); last_progress = time::Instant::now(); - let limit = if test.should_halt { - let max_round = u32::try_from(MAX_ROUND).expect("MAX_ROUND fits u32"); - let seconds = 1_u64 - .checked_shl(max_round.checked_add(1).expect("MAX_ROUND permits timeout limit")) - .expect("MAX_ROUND permits timeout limit"); - Duration::from_secs(seconds) - } else { - Duration::from_secs(60) - }; - if clock.elapsed() > limit { + if clock.elapsed() > timeout_limit { cts.cancel(); clock.cancel(); panic!("chain split hang: decided={decided} done={done} count={count} elapsed={:?}\n{}", clock.elapsed(), trace.dump()); diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index 956f17e8..554fcfb6 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -10,16 +10,6 @@ //! - No domain-specific dependencies. //! - Explicit justifications. -// TODO: Remove these checks -#![allow(missing_docs)] -#![allow(clippy::type_complexity)] -#![allow(clippy::collapsible_if)] -#![allow(clippy::cast_sign_loss)] -#![allow(clippy::cast_precision_loss)] -#![allow(clippy::cast_possible_wrap)] -#![allow(clippy::cast_possible_truncation)] -#![allow(clippy::arithmetic_side_effects)] - use cancellation::{CancellationToken, CancellationTokenSource}; use crossbeam::channel as mpmc; use std::{ @@ -31,6 +21,16 @@ use std::{ sync, thread, time, }; +mod callbacks; +use callbacks::{ + BroadcastFn, CompareFn, DecideFn, LeaderFn, RoundChangeLoggerFn, UnjustLoggerFn, + UponRuleLoggerFn, +}; +pub use callbacks::{ + BroadcastRequest, CompareRequest, DecideRequest, LeaderRequest, RoundChangeLog, Timer, + UnjustLog, UponRuleLog, +}; + type Result = std::result::Result; // The `cancellation` crate is callback-based, not channel-based, so it cannot @@ -38,139 +38,105 @@ type Result = std::result::Result; // does not need sub-millisecond latency, and idle instances should stay cheap. const CANCELLATION_POLL_INTERVAL: time::Duration = time::Duration::from_millis(50); -type CompareFn = dyn Fn( - /* ct */ &CancellationToken, - /* qcommit */ &Msg, - /* input_value_source_ch */ &mpmc::Receiver, - /* input_value_source */ &C, - /* return_err */ &mpmc::Sender>, - /* return_value */ &mpmc::Sender, - ) + Send - + Sync - + 'static; +/// Associated types used by a QBFT instance. +pub trait QbftTypes: 'static { + /// Consensus instance identifier. + type Instance: Send + Sync + 'static; + /// Consensus value. + type Value: Eq + Hash + Default + 'static; + /// Application value used by the compare callback. + type Compare: Clone + Send + Sync + Default + 'static; +} +/// Errors returned by the QBFT core. #[derive(Debug, thiserror::Error)] pub enum QbftError { + /// Round timer expired before compare completed. #[error("Timeout")] TimeoutError, + /// Leader proposal failed application-level comparison. #[error("Compare leader value with local value failed")] CompareError, + /// Compare returned an error variant that core does not expect. #[error("bug: expected only comparison or timeout error, got {0}")] UnexpectedCompareError(Box), + /// Parent cancellation token was canceled. #[error("context canceled")] ContextCanceled, + /// Test or caller configured maximum round was reached. #[error("Maximum round reached")] MaxRoundReached, + /// Own input value was the null/default value. #[error("Zero input value not supported")] ZeroInputValue, + /// Node count must be positive. #[error("invalid node count: must be greater than zero, got {nodes}")] - InvalidNodes { nodes: i64 }, + InvalidNodes { + /// Configured node count. + nodes: i64, + }, + /// Per-source FIFO limit must be positive. #[error("invalid FIFO limit: must be greater than zero, got {fifo_limit}")] - InvalidFifoLimit { fifo_limit: i64 }, + InvalidFifoLimit { + /// Configured FIFO limit. + fifo_limit: i64, + }, + /// Receive channel closed unexpectedly. #[error("Failed to read from channel: {0}")] ChannelError(#[from] mpmc::RecvError), } /// Abstracts the transport layer between processes in the consensus system. -pub struct Transport -where - V: PartialEq, -{ +pub struct Transport { /// Broadcast sends a message with the provided fields to all other /// processes in the system (including this process). /// /// Note that an error exits the algorithm. - pub broadcast: Box< - dyn Fn( - /* ct */ &CancellationToken, - /* type_ */ MessageType, - /* instance */ &I, - /* source */ i64, - /* round */ i64, - /* value */ &V, - /* pr */ i64, - /* pv */ &V, - /* justification */ Option<&Vec>>, - ) -> Result<()> - + Send - + Sync, - >, + pub broadcast: Box>, /// Receive returns a stream of messages received /// from other processes in the system (including this process). - pub receive: mpmc::Receiver>, + pub receive: mpmc::Receiver>, +} + +/// Debug hooks for QBFT state transitions and rejected messages. +pub struct QbftLogger { + /// Called when an upon-rule fires. + pub upon_rule: Box>, + /// Called when the local process changes round. + pub round_change: Box>, + /// Called when an unjustified message is rejected. + pub unjust: Box>, } /// Defines the consensus system parameters that are external to the qbft /// algorithm. This remains constant across multiple instances of consensus /// (calls to `run`). -pub struct Definition -where - V: PartialEq, -{ +pub struct Definition { /// A deterministic leader election function. - pub is_leader: - Box bool + Send + Sync>, + pub is_leader: Box>, /// Returns a new timer channel and stop function for the round - pub new_timer: Box< - dyn Fn(/* round */ i64) -> (mpmc::Receiver, Box) - + Send - + Sync, - >, + pub new_timer: Box Timer + Send + Sync>, /// Charon parity hook called when the leader proposes a value. The core /// algorithm only runs this callback and reacts to its result; any /// value-source comparison policy belongs to the caller. - pub compare: sync::Arc>, + pub compare: sync::Arc>, /// Called when consensus has been reached on a value. - pub decide: Box< - dyn Fn( - /* ct */ &CancellationToken, - /* instance */ &I, - /* value */ &V, - /* qcommit */ &Vec>, - ) + Send - + Sync, - >, - - /// Allows debug logging of triggered upon rules on message receipt. - /// It includes the rule that triggered it and all received round messages. - pub log_upon_rule: Box< - dyn Fn( - /* instance */ &I, - /* process */ i64, - /* round */ i64, - /* msg */ &Msg, - /* upon_rule */ UponRule, - ) + Send - + Sync, - >, - /// Allows debug logging of round changes. - pub log_round_change: Box< - dyn Fn( - /* instance */ &I, - /* process */ i64, - /* round */ i64, - /* new_round */ i64, - /* upon_rule */ UponRule, - /* msgs */ &Vec>, - ) + Send - + Sync, - >, - - /// Allows debug logging of unjust messages. - pub log_unjust: - Box) + Send + Sync>, + pub decide: Box>, + + /// Debug logging callbacks. + pub logger: QbftLogger, /// Total number of nodes/processes participating in consensus. pub nodes: i64, @@ -179,20 +145,36 @@ where pub fifo_limit: i64, } -impl Definition -where - V: PartialEq, -{ +impl Definition { /// Quorum count for the system. /// See IBFT 2.0 paper for correct formula: pub fn quorum(&self) -> i64 { - (self.nodes as u64 * 2).div_ceil(3) as i64 + self.nodes + .checked_mul(2) + .and_then(|nodes| nodes.checked_add(2)) + .and_then(|nodes| nodes.checked_div(3)) + .expect("node count permits quorum calculation") } /// Maximum number of faulty/byzantine nodes supported in the system. /// See IBFT 2.0 paper for correct formula: pub fn faulty(&self) -> i64 { - (self.nodes - 1) / 3 + self.nodes + .checked_sub(1) + .and_then(|nodes| nodes.checked_div(3)) + .expect("node count permits faulty-node calculation") + } + + fn quorum_count(&self) -> usize { + usize::try_from(self.quorum()).expect("quorum fits usize") + } + + fn faulty_plus_one_count(&self) -> usize { + let threshold = self + .faulty() + .checked_add(1) + .expect("faulty-node count permits threshold calculation"); + usize::try_from(threshold).expect("faulty-node threshold fits usize") } } @@ -202,11 +184,17 @@ pub struct MessageType(i64); // NOTE: message type ordering MUST not change, since it breaks backwards // compatibility. +/// Unknown message type. pub const MSG_UNKNOWN: MessageType = MessageType(0); +/// PRE-PREPARE message type. pub const MSG_PRE_PREPARE: MessageType = MessageType(1); +/// PREPARE message type. pub const MSG_PREPARE: MessageType = MessageType(2); +/// COMMIT message type. pub const MSG_COMMIT: MessageType = MessageType(3); +/// ROUND-CHANGE message type. pub const MSG_ROUND_CHANGE: MessageType = MessageType(4); +/// DECIDED catch-up message type. pub const MSG_DECIDED: MessageType = MessageType(5); const MSG_SENTINEL: MessageType = MessageType(6); // intentionally not public @@ -235,48 +223,54 @@ impl Display for MessageType { } /// Defines the inter process messages. -pub trait SomeMsg: Send + Sync + fmt::Debug -where - V: PartialEq, -{ +pub trait SomeMsg: Send + Sync + fmt::Debug { /// Type of the message. fn type_(&self) -> MessageType; /// Consensus instance. - fn instance(&self) -> I; + fn instance(&self) -> T::Instance; /// Process that sent the message. fn source(&self) -> i64; /// The round the message pertains to. fn round(&self) -> i64; /// The value being proposed, usually a hash. - fn value(&self) -> V; + fn value(&self) -> T::Value; /// Usually the value that was hashed and is returned in `value`. - fn value_source(&self) -> Result; + fn value_source(&self) -> Result; /// The justified prepared round. fn prepared_round(&self) -> i64; /// The justified prepared value. - fn prepared_value(&self) -> V; + fn prepared_value(&self) -> T::Value; /// Set of messages that explicitly justifies this message. - fn justification(&self) -> Vec>; + fn justification(&self) -> Vec>; /// Cast as `Any` to allow downcasting. fn as_any(&self) -> &dyn any::Any; } /// Alias for any `Msg` implementation tracked by reference counting. -pub type Msg = sync::Arc>; +pub type Msg = sync::Arc>; /// Defines the event based rules that are triggered when messages are received. #[derive(PartialEq, Eq, Hash, Clone, Copy)] pub struct UponRule(i64); +/// No upon-rule fired. pub const UPON_NOTHING: UponRule = UponRule(0); +/// PRE-PREPARE was justified. pub const UPON_JUSTIFIED_PRE_PREPARE: UponRule = UponRule(1); +/// Quorum PREPARE messages was received. pub const UPON_QUORUM_PREPARES: UponRule = UponRule(2); +/// Quorum COMMIT messages was received. pub const UPON_QUORUM_COMMITS: UponRule = UponRule(3); +/// Quorum ROUND-CHANGE messages was received but not justified. pub const UPON_UNJUST_QUORUM_ROUND_CHANGES: UponRule = UponRule(4); +/// F+1 future ROUND-CHANGE messages was received. pub const UPON_F_PLUS1_ROUND_CHANGES: UponRule = UponRule(5); +/// Quorum ROUND-CHANGE messages was received. pub const UPON_QUORUM_ROUND_CHANGES: UponRule = UponRule(6); +/// DECIDED message was justified. pub const UPON_JUSTIFIED_DECIDED: UponRule = UponRule(7); +/// Round timer expired. pub const UPON_ROUND_TIMEOUT: UponRule = UponRule(8); // This is not triggered by a message, but by a timer. impl Display for UponRule { @@ -311,76 +305,75 @@ struct DedupKey { /// remains active so it can answer later `ROUND_CHANGE` messages with `DECIDED` /// catch-up messages. /// -/// `I` identifies the consensus instance, `V` is the comparable proposed value, -/// and `C` is the application value used by `Definition::compare` to compare a -/// leader proposal with the local input source. -pub fn run( +/// `T::Instance` identifies the consensus instance, `T::Value` is the +/// comparable proposed value, and `T::Compare` is the application value used by +/// `Definition::compare` to compare a leader proposal with the local input +/// source. +pub fn run( ct: &CancellationToken, - d: &Definition, - t: &Transport, - instance: &I, + d: &Definition, + t: &Transport, + instance: &T::Instance, process: i64, - mut input_value_ch: mpmc::Receiver, - input_value_source_ch: mpmc::Receiver, -) -> Result<()> -where - I: Send + Sync + 'static, - V: PartialEq + Eq + Hash + Default + 'static, - C: Clone + Send + Sync + Default + 'static, -{ + mut input_value_ch: mpmc::Receiver, + input_value_source_ch: mpmc::Receiver, +) -> Result<()> { validate_definition(d)?; + let fifo_limit = usize::try_from(d.fifo_limit).expect("validated FIFO limit fits usize"); // === State === let round: Cell = Cell::new(1); - let input_value: RefCell = RefCell::new(Default::default()); - let mut input_value_source: C = Default::default(); - let ppj_cache: RefCell>>> = RefCell::new(None); // Cached pre-prepare justification for the current round (`None` value is unset). + let input_value: RefCell = RefCell::new(Default::default()); + let mut input_value_source: T::Compare = Default::default(); + let ppj_cache: RefCell>>> = RefCell::new(None); // Cached pre-prepare justification for the current round (`None` value is unset). let prepared_round: Cell = Cell::new(0); - let prepared_value: RefCell = RefCell::new(Default::default()); + let prepared_value: RefCell = RefCell::new(Default::default()); let mut compare_failure_round: i64 = 0; - let prepared_justification: RefCell>>> = RefCell::new(None); - let mut q_commit: Option>> = None; - let buffer: RefCell>>> = RefCell::new(HashMap::new()); - let dedup_rules: RefCell> = RefCell::new(HashMap::new()); + let prepared_justification: RefCell>>> = RefCell::new(None); + let mut q_commit: Option>> = None; + let buffer: RefCell>>> = RefCell::new(HashMap::new()); + let dedup_rules: RefCell> = RefCell::new(HashSet::new()); let mut timer_chan: mpmc::Receiver; - let mut stop_timer: Box; + let mut stop_timer: Box; // === Helpers == // Broadcasts a non-ROUND-CHANGE message for current round. let broadcast_msg = - |type_: MessageType, value: &V, justification: Option<&Vec>>| { - (t.broadcast)( + |type_: MessageType, value: &T::Value, justification: Option<&Vec>>| { + let default_value = T::Value::default(); + (t.broadcast)(BroadcastRequest { ct, type_, instance, - process, - round.get(), + source: process, + round: round.get(), value, - 0, - &Default::default(), + prepared_round: 0, + prepared_value: &default_value, justification, - ) + }) }; // Broadcasts a ROUND-CHANGE message with current state. let broadcast_round_change = || { - (t.broadcast)( + let default_value = T::Value::default(); + (t.broadcast)(BroadcastRequest { ct, - MSG_ROUND_CHANGE, + type_: MSG_ROUND_CHANGE, instance, - process, - round.get(), - &Default::default(), - prepared_round.get(), - &prepared_value.borrow(), - prepared_justification.borrow().as_ref(), - ) + source: process, + round: round.get(), + value: &default_value, + prepared_round: prepared_round.get(), + prepared_value: &prepared_value.borrow(), + justification: prepared_justification.borrow().as_ref(), + }) }; // Broadcasts a PRE-PREPARE message with current state // and our own input value if present, otherwise it caches the justification // to be used when the input value becomes available. - let broadcast_own_pre_prepare = |justification: Vec>| { + let broadcast_own_pre_prepare = |justification: Vec>| { if ppj_cache.borrow().is_some() { panic!("bug: justification cache must be none") } @@ -395,13 +388,17 @@ where }; // Adds a message to each process' FIFO queue - let buffer_msg = |msg: &Msg| { + let buffer_msg = |msg: &Msg| { let mut b = buffer.borrow_mut(); let fifo = b.entry(msg.source()).or_default(); fifo.push(msg.clone()); - if fifo.len() as i64 > d.fifo_limit { - fifo.drain(0..(fifo.len() - d.fifo_limit as usize)); + if fifo.len() > fifo_limit { + let expired = fifo + .len() + .checked_sub(fifo_limit) + .expect("FIFO length exceeds limit"); + fifo.drain(0..expired); } }; @@ -409,7 +406,7 @@ where // change. let is_duplicated_rule = |upon_rule: UponRule, round: i64| { let k = DedupKey { upon_rule, round }; - dedup_rules.borrow_mut().insert(k, true).is_some() + !dedup_rules.borrow_mut().insert(k) }; // Updates round and clears the rule dedup state. @@ -418,28 +415,34 @@ where return; } - (d.log_round_change)( + (d.logger.round_change)(RoundChangeLog { instance, process, - round.get(), + round: round.get(), new_round, - rule, - &extract_round_messages(&buffer.borrow(), round.get()), - ); + upon_rule: rule, + msgs: &extract_round_messages(&buffer.borrow(), round.get()), + }); round.set(new_round); - dedup_rules.replace(HashMap::new()); + dedup_rules.replace(HashSet::new()); ppj_cache.replace(None); }; // Algorithm 1:11 { - if (d.is_leader)(instance, round.get(), process) { + if (d.is_leader)(LeaderRequest { + instance, + round: round.get(), + process, + }) { // Note round==1 at this point. broadcast_own_pre_prepare(vec![])?; // Empty justification since round==1 } - (timer_chan, stop_timer) = (d.new_timer)(round.get()); + let timer = (d.new_timer)(round.get()); + timer_chan = timer.receive; + stop_timer = timer.stop; } loop { @@ -468,20 +471,24 @@ where recv(t.receive) -> result => { let msg = result?; - if let Some(v) = q_commit.as_ref() { - if !v.is_empty() { - if msg.source() != process && msg.type_() == MSG_ROUND_CHANGE { - // Algorithm 3:17 - broadcast_msg(MSG_DECIDED, &v[0].value(), Some(v))?; - } - - continue; + if let Some(v) = q_commit.as_ref() + && !v.is_empty() + { + if msg.source() != process && msg.type_() == MSG_ROUND_CHANGE { + // Algorithm 3:17 + broadcast_msg(MSG_DECIDED, &v[0].value(), Some(v))?; } + + continue; } // Drop unjust messages if !is_justified(d, instance, &msg, compare_failure_round) { - (d.log_unjust)(instance, process, msg); + (d.logger.unjust)(UnjustLog { + instance, + process, + msg, + }); continue; } @@ -494,7 +501,13 @@ where continue; } - (d.log_upon_rule)(instance, process, round.get(), &msg, rule); + (d.logger.upon_rule)(UponRuleLog { + instance, + process, + round: round.get(), + msg: &msg, + upon_rule: rule, + }); match rule { // Algorithm 2:1 @@ -502,7 +515,9 @@ where change_round(msg.round(), rule); stop_timer(); - (timer_chan, stop_timer) = (d.new_timer)(round.get()); + let timer = (d.new_timer)(round.get()); + timer_chan = timer.receive; + stop_timer = timer.stop; let (new_input_value_source, compare_result) = compare( ct, @@ -526,10 +541,16 @@ where // might timeout in the meantime. If // this happens, we trigger round change. // Algorithm 3:1 - change_round(round.get() + 1, UPON_ROUND_TIMEOUT); + let next_round = round + .get() + .checked_add(1) + .expect("round permits increment"); + change_round(next_round, UPON_ROUND_TIMEOUT); stop_timer(); - (timer_chan, stop_timer) = (d.new_timer)(round.get()); + let timer = (d.new_timer)(round.get()); + timer_chan = timer.receive; + stop_timer = timer.stop; broadcast_round_change()?; } @@ -562,7 +583,12 @@ where let justification = q_commit.as_ref() .expect("Rules `UPON_QUORUM_COMMITS` and `UPON_JUSTIFIED_DECIDED` always include a justification"); - (d.decide)(ct, instance, &msg.value(), justification); + (d.decide)(DecideRequest { + ct, + instance, + value: &msg.value(), + qcommit: justification, + }); } UPON_F_PLUS1_ROUND_CHANGES => { // Algorithm 3:5 @@ -578,7 +604,9 @@ where ); stop_timer(); - (timer_chan, stop_timer) = (d.new_timer)(round.get()); + let timer = (d.new_timer)(round.get()); + timer_chan = timer.receive; + stop_timer = timer.stop; broadcast_round_change()?; } @@ -606,10 +634,16 @@ where recv(timer_chan) -> result => { result?; - change_round(round.get() + 1, UPON_ROUND_TIMEOUT); + let next_round = round + .get() + .checked_add(1) + .expect("round permits increment"); + change_round(next_round, UPON_ROUND_TIMEOUT); stop_timer(); - (timer_chan, stop_timer) = (d.new_timer)(round.get()); + let timer = (d.new_timer)(round.get()); + timer_chan = timer.receive; + stop_timer = timer.stop; broadcast_round_change()?; } @@ -623,10 +657,7 @@ where } } -fn validate_definition(d: &Definition) -> Result<()> -where - V: PartialEq, -{ +fn validate_definition(d: &Definition) -> Result<()> { if d.nodes <= 0 { return Err(QbftError::InvalidNodes { nodes: d.nodes }); } @@ -643,21 +674,16 @@ where /// The callback may cache the local input source and return success/failure. /// This helper only preserves that callback result and lets the round timer win /// if the callback blocks. -fn compare( +fn compare( ct: &CancellationToken, - d: &Definition, - msg: &Msg, - input_value_source_ch: &mpmc::Receiver, - input_value_source: C, + d: &Definition, + msg: &Msg, + input_value_source_ch: &mpmc::Receiver, + input_value_source: T::Compare, timer_chan: &mpmc::Receiver, -) -> (C, Result<()>) -where - I: Send + Sync + 'static, - V: PartialEq + 'static, - C: Clone + Send + Sync + 'static, -{ +) -> (T::Compare, Result<()>) { let (compare_err_tx, mut compare_err_rx) = mpmc::bounded::>(1); - let (compare_value_tx, mut compare_value_rx) = mpmc::bounded::(1); + let (compare_value_tx, mut compare_value_rx) = mpmc::bounded::(1); // d.Compare has 2 roles: // 1. Read from the `input_value_source_ch` (if `input_value_source` is empty). @@ -680,14 +706,14 @@ where // caller-provided compare callback ignores cancellation and never reports, // it may outlive this call. thread::spawn(move || { - (compare)( - &compare_ct, - &msg, - &input_value_source_ch, - &input_value_source, - &compare_err_tx, - &compare_value_tx, - ); + (compare)(CompareRequest { + ct: &compare_ct, + qcommit: &msg, + input_value_source_ch: &input_value_source_ch, + input_value_source: &input_value_source, + return_err: &compare_err_tx, + return_value: &compare_value_tx, + }); }); loop { @@ -748,13 +774,10 @@ where } /// Returns all messages from the provided round. -fn extract_round_messages( - buffer: &HashMap>>, +fn extract_round_messages( + buffer: &HashMap>>, round: i64, -) -> Vec> -where - V: PartialEq, -{ +) -> Vec> { let mut resp = vec![]; for msgs in buffer.values() { @@ -770,17 +793,14 @@ where /// Returns the rule triggered upon receipt of the last message and its /// justifications. -fn classify( - d: &Definition, - instance: &I, +fn classify( + d: &Definition, + instance: &T::Instance, round: i64, process: i64, - buffer: &HashMap>>, - msg: &Msg, -) -> (UponRule, Option>>) -where - V: Eq + Hash + Default, -{ + buffer: &HashMap>>, + msg: &Msg, +) -> (UponRule, Option>>) { match msg.type_() { MSG_DECIDED => (UPON_JUSTIFIED_DECIDED, Some(msg.justification())), MSG_PRE_PREPARE => { @@ -799,7 +819,7 @@ where let prepares = filter_by_round_and_value(&flatten(buffer), MSG_PREPARE, msg.round(), msg.value()); - if prepares.len() as i64 >= d.quorum() { + if prepares.len() >= d.quorum_count() { (UPON_QUORUM_PREPARES, Some(prepares)) } else { (UPON_NOTHING, None) @@ -813,7 +833,7 @@ where let commits = filter_by_round_and_value(&flatten(buffer), MSG_COMMIT, msg.round(), msg.value()); - if commits.len() as i64 >= d.quorum() { + if commits.len() >= d.quorum_count() { (UPON_QUORUM_COMMITS, Some(commits)) } else { (UPON_NOTHING, None) @@ -839,7 +859,7 @@ where /* else msg.round() == round */ let qrc = filter_round_change(&all, msg.round()); - if (qrc.len() as i64) < d.quorum() { + if qrc.len() < d.quorum_count() { return (UPON_NOTHING, None); } @@ -847,7 +867,11 @@ where return (UPON_UNJUST_QUORUM_ROUND_CHANGES, None); }; - if !(d.is_leader)(instance, msg.round(), process) { + if !(d.is_leader)(LeaderRequest { + instance, + round: msg.round(), + process, + }) { return (UPON_NOTHING, None); } @@ -861,12 +885,9 @@ where /// Implements algorithm 3:6 and returns the next minimum round from received /// round change messages. -fn next_min_round(d: &Definition, frc: &Vec>, round: i64) -> i64 -where - V: PartialEq, -{ +fn next_min_round(d: &Definition, frc: &Vec>, round: i64) -> i64 { // Get all RoundChange messages with round (rj) higher than current round (ri) - if (frc.len() as i64) < d.faulty() + 1 { + if frc.len() < d.faulty_plus_one_count() { panic!("bug: Frc too short"); } @@ -889,15 +910,12 @@ where } /// Returns true if message is justified or if it does not need justification. -fn is_justified( - d: &Definition, - instance: &I, - msg: &Msg, +fn is_justified( + d: &Definition, + instance: &T::Instance, + msg: &Msg, compare_failure_round: i64, -) -> bool -where - V: Eq + Hash + Default, -{ +) -> bool { match msg.type_() { MSG_PRE_PREPARE => is_justified_pre_prepare(d, instance, msg, compare_failure_round), MSG_PREPARE => true, @@ -910,10 +928,7 @@ where /// Returns true if the ROUND_CHANGE message's prepared round and value is /// justified. -fn is_justified_round_change(d: &Definition, msg: &Msg) -> bool -where - V: PartialEq + Default, -{ +fn is_justified_round_change(d: &Definition, msg: &Msg) -> bool { if msg.type_() != MSG_ROUND_CHANGE { panic!("bug: not a round change message"); } @@ -938,11 +953,11 @@ where // No need to check for all possible combinations, since justified should only // contain a one. - if (prepares.len() as i64) < d.quorum() { + if prepares.len() < d.quorum_count() { return false; } - let mut uniq = uniq_source::(vec![]); + let mut uniq = uniq_source(); for prepare in prepares { if !uniq(&prepare) { return false; @@ -964,20 +979,14 @@ where true } -fn valid_round_change_prepared_round(msg: &Msg) -> bool -where - V: PartialEq, -{ +fn valid_round_change_prepared_round(msg: &Msg) -> bool { let pr = msg.prepared_round(); pr >= 0 && pr < msg.round() } /// Returns true if the decided message is justified by quorum COMMIT messages /// of identical round and value. -fn is_justified_decided(d: &Definition, msg: &Msg) -> bool -where - V: PartialEq, -{ +fn is_justified_decided(d: &Definition, msg: &Msg) -> bool { if msg.type_() != MSG_DECIDED { panic!("bug: not a decided message"); } @@ -992,30 +1001,34 @@ where None, ); - (commits.len() as i64) >= d.quorum() + commits.len() >= d.quorum_count() } /// Returns true if the PRE-PREPARE message is justified. -fn is_justified_pre_prepare( - d: &Definition, - instance: &I, - msg: &Msg, +fn is_justified_pre_prepare( + d: &Definition, + instance: &T::Instance, + msg: &Msg, compare_failure_round: i64, -) -> bool -where - V: Eq + Hash + Default, -{ +) -> bool { if msg.type_() != MSG_PRE_PREPARE { panic!("bug: not a preprepare message"); } - if !(d.is_leader)(instance, msg.round(), msg.source()) { + if !(d.is_leader)(LeaderRequest { + instance, + round: msg.round(), + process: msg.source(), + }) { return false; } // Justified if PrePrepare is the first round OR if comparison failed previous // round. - if msg.round() == 1 || (msg.round() == compare_failure_round + 1) { + let next_compare_round = compare_failure_round + .checked_add(1) + .expect("compare failure round permits increment"); + if msg.round() == 1 || (msg.round() == next_compare_round) { return true; } @@ -1032,19 +1045,16 @@ where /// Implements algorithm 4:1 and returns true and pv if the messages contains a /// justified quorum ROUND_CHANGEs (Qrc). -fn contains_justified_qrc( - d: &Definition, - justification: &Vec>, +fn contains_justified_qrc( + d: &Definition, + justification: &Vec>, round: i64, -) -> Option -where - V: Eq + Hash + Default, -{ +) -> Option { let qrc = filter_round_change(justification, round) .into_iter() .filter(valid_round_change_prepared_round) .collect::>(); - if (qrc.len() as i64) < d.quorum() { + if qrc.len() < d.quorum_count() { return None; } @@ -1089,17 +1099,14 @@ where /// Extracts the single justified Pr and Pv from quorum PREPARES in list of /// messages. It expects only one possible combination. -fn get_single_justified_pr_pv( - d: &Definition, - msgs: &Vec>, -) -> Option<(i64, V)> -where - V: Eq + Hash + Default, -{ +fn get_single_justified_pr_pv( + d: &Definition, + msgs: &Vec>, +) -> Option<(i64, T::Value)> { let mut pr: i64 = 0; - let mut pv: V = Default::default(); - let mut count: i64 = 0; - let mut uniq = uniq_source::(vec![]); + let mut pv: T::Value = Default::default(); + let mut count: usize = 0; + let mut uniq = uniq_source(); for msg in msgs { if msg.type_() != MSG_PREPARE { @@ -1117,10 +1124,12 @@ where return None; } - count += 1; + count = count + .checked_add(1) + .expect("prepare count permits increment"); } - if count >= d.quorum() { + if count >= d.quorum_count() { Some((pr, pv)) } else { None @@ -1128,14 +1137,11 @@ where } /// Implements algorithm 4:1 and returns a justified quorum ROUND_CHANGEs (Qrc) -fn get_justified_qrc( - d: &Definition, - all: &Vec>, +fn get_justified_qrc( + d: &Definition, + all: &Vec>, round: i64, -) -> Option>> -where - V: Eq + Hash + Default, -{ +) -> Option>> { if let (qrc, true) = quorum_null_prepared(d, all, round) { // Return any quorum null pv ROUND_CHANGE messages as Qrc. return Some(qrc); @@ -1149,11 +1155,11 @@ where for prepares in get_prepare_quorums(d, all) { // See if we have quorum ROUND-CHANGE with HIGHEST_PREPARED(qrc) == // prepares.Round. - let mut qrc: Vec> = vec![]; + let mut qrc: Vec> = vec![]; let mut has_highest_prepared = false; let pr = prepares[0].round(); let pv = prepares[0].value(); - let mut uniq = uniq_source::(vec![]); + let mut uniq = uniq_source(); for rc in round_changes.iter() { if rc.prepared_round() > pr { @@ -1171,7 +1177,7 @@ where qrc.push(rc.clone()); } - if (qrc.len() as i64) >= d.quorum() && has_highest_prepared { + if qrc.len() >= d.quorum_count() && has_highest_prepared { qrc.extend(prepares); return Some(qrc); } @@ -1183,15 +1189,12 @@ where /// Returns true and Faulty+1 ROUND-CHANGE messages (Frc) with the rounds higher /// than the provided round. It returns the highest round per process in order /// to jump furthest. -fn get_fplus1_round_changes( - d: &Definition, - all: &Vec>, +fn get_fplus1_round_changes( + d: &Definition, + all: &Vec>, round: i64, -) -> Option>> -where - V: PartialEq, -{ - let mut highest_by_source = HashMap::>::new(); +) -> Option>> { + let mut highest_by_source = HashMap::>::new(); for msg in all { if msg.type_() != MSG_ROUND_CHANGE { @@ -1202,20 +1205,20 @@ where continue; } - if let Some(highest) = highest_by_source.get(&msg.source()) { - if highest.round() > msg.round() { - continue; - } + if let Some(highest) = highest_by_source.get(&msg.source()) + && highest.round() > msg.round() + { + continue; } highest_by_source.insert(msg.source(), msg.clone()); - if (highest_by_source.len() as i64) == d.faulty() + 1 { + if highest_by_source.len() == d.faulty_plus_one_count() { break; } } - if (highest_by_source.len() as i64) < d.faulty() + 1 { + if highest_by_source.len() < d.faulty_plus_one_count() { return None; } @@ -1236,14 +1239,8 @@ where /// Returns all unique-source PREPARE quorums grouped by identical round and /// value. -fn get_prepare_quorums( - d: &Definition, - all: &Vec>, -) -> Vec>> -where - V: Eq + Hash, -{ - let mut sets = HashMap::, HashMap>>::new(); +fn get_prepare_quorums(d: &Definition, all: &Vec>) -> Vec>> { + let mut sets = HashMap::, HashMap>>::new(); for msg in all { if msg.type_() != MSG_PREPARE { @@ -1263,16 +1260,11 @@ where let mut quorums = vec![]; for (_, msgs) in sets { - if (msgs.len() as i64) < d.quorum() { + if msgs.len() < d.quorum_count() { continue; } - let mut quorum = vec![]; - for (_, msg) in msgs { - quorum.push(msg); - } - - quorums.push(quorum); + quorums.push(msgs.into_values().collect()); } quorums @@ -1281,61 +1273,47 @@ where /// Implements condition J1 and returns Qrc and true if a quorum /// of round changes messages (Qrc) for the round have null prepared round and /// value. -fn quorum_null_prepared( - d: &Definition, - all: &Vec>, +fn quorum_null_prepared( + d: &Definition, + all: &Vec>, round: i64, -) -> (Vec>, bool) -where - V: PartialEq + Default, -{ +) -> (Vec>, bool) { let null_pr = Default::default(); let null_pv = Some(&Default::default()); let justification = filter_msgs(all, MSG_ROUND_CHANGE, round, None, Some(null_pr), null_pv); + let has_quorum = justification.len() >= d.quorum_count(); - ( - justification.clone(), - justification.len() as i64 >= d.quorum(), - ) + (justification, has_quorum) } /// Returns the messages matching the type and value. -fn filter_by_round_and_value( - msgs: &Vec>, +fn filter_by_round_and_value( + msgs: &Vec>, message_type: MessageType, round: i64, - value: V, -) -> Vec> -where - V: PartialEq, -{ + value: T::Value, +) -> Vec> { filter_msgs(msgs, message_type, round, Some(&value), None, None) } /// Returns all round change messages for the provided round. -fn filter_round_change(msgs: &Vec>, round: i64) -> Vec> -where - V: PartialEq, -{ - filter_msgs::(msgs, MSG_ROUND_CHANGE, round, None, None, None) +fn filter_round_change(msgs: &Vec>, round: i64) -> Vec> { + filter_msgs::(msgs, MSG_ROUND_CHANGE, round, None, None, None) } /// Returns one message per process matching the provided type and round and /// optional value, pr, pv. -fn filter_msgs( - msgs: &Vec>, +fn filter_msgs( + msgs: &Vec>, message_type: MessageType, round: i64, - value: Option<&V>, + value: Option<&T::Value>, pr: Option, - pv: Option<&V>, -) -> Vec> -where - V: PartialEq, -{ + pv: Option<&T::Value>, +) -> Vec> { let mut resp = Vec::new(); - let mut uniq = uniq_source::(vec![]); + let mut uniq = uniq_source(); for msg in msgs { if message_type != msg.type_() { @@ -1374,11 +1352,8 @@ where /// Produce a vector containing all the buffered messages as well as all their /// justifications. -fn flatten(buffer: &HashMap>>) -> Vec> -where - V: PartialEq, -{ - let mut resp: Vec> = Vec::new(); +fn flatten(buffer: &HashMap>>) -> Vec> { + let mut resp: Vec> = Vec::new(); for msgs in buffer.values() { for msg in msgs { @@ -1397,20 +1372,9 @@ where /// Construct a function that returns true if the message is from a unique /// source. -fn uniq_source(vec: Vec>) -> Box) -> bool> -where - V: PartialEq, -{ - let mut s = vec.iter().map(|msg| msg.source()).collect::>(); - Box::new(move |msg: &Msg| { - let source = msg.source(); - if s.contains(&source) { - false - } else { - s.insert(source); - true - } - }) +fn uniq_source() -> impl FnMut(&Msg) -> bool { + let mut sources = HashSet::new(); + move |msg: &Msg| sources.insert(msg.source()) } #[cfg(test)]