From 32bffcf937bbc4ab04eca6e67e9b32555c13cabf Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Thu, 7 May 2026 16:26:22 +0200 Subject: [PATCH 01/12] Add the QbftTypes trait --- crates/core/src/qbft/mod.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index 41d0a2d5..95a957e8 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -33,6 +33,20 @@ use std::{ type Result = std::result::Result; +pub trait QbftTypes { + type Instance; + type Value: PartialEq; + type Compare; +} + +pub struct I64Qbft; + +impl QbftTypes for I64Qbft { + type Compare = i64; + type Instance = i64; + type Value = i64; +} + #[derive(Debug, thiserror::Error)] pub enum QbftError { #[error("Timeout")] From e2706b5ff5dfc855bca5461410b8125cb47b1f76 Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Thu, 7 May 2026 18:06:11 +0200 Subject: [PATCH 02/12] Use I64Qbft type --- crates/core/src/qbft/internal_test.rs | 39 ++-- crates/core/src/qbft/mod.rs | 306 +++++++++++++------------- 2 files changed, 175 insertions(+), 170 deletions(-) diff --git a/crates/core/src/qbft/internal_test.rs b/crates/core/src/qbft/internal_test.rs index b7770251..b703d656 100644 --- a/crates/core/src/qbft/internal_test.rs +++ b/crates/core/src/qbft/internal_test.rs @@ -39,15 +39,10 @@ fn test_qbft(test: Test) { let clock = FakeClock::new(start_time); let cts = CancellationTokenSource::new(); - let mut receives = HashMap::< - i64, - ( - mpmc::Sender>, - mpmc::Receiver>, - ), - >::new(); - let (broadcast_tx, broadcast_rx) = mpmc::unbounded::>(); - let (result_chan_tx, result_chan_rx) = mpmc::bounded::>>(N); + let mut receives = + HashMap::>, mpmc::Receiver>)>::new(); + let (broadcast_tx, broadcast_rx) = mpmc::unbounded::>(); + let (result_chan_tx, result_chan_rx) = mpmc::bounded::>>(N); let (run_chan_tx, run_chan_rx) = mpmc::bounded::>(N); let is_leader = Box::new(make_is_leader(N as i64)); @@ -115,7 +110,7 @@ fn test_qbft(test: Test) { thread::scope(|s| { for i in 1..=N as i64 { - let (sender, receiver) = mpmc::bounded::>(1000); + let (sender, receiver) = mpmc::bounded::>(1000); let broadcast_tx = broadcast_tx.clone(); receives.insert(i, (sender.clone(), receiver.clone())); @@ -226,7 +221,7 @@ fn test_qbft(test: Test) { }); } - let mut results = HashMap::>::new(); + let mut results = HashMap::>::new(); let mut count = 0; let mut decided = false; let mut done = 0; @@ -331,8 +326,8 @@ fn new_msg( value_source: i64, pr: i64, pv: i64, - justify: Option<&Vec>>, -) -> Msg { + justify: Option<&Vec>>, +) -> Msg { let msgs = match justify { None => vec![], Some(justify) => justify @@ -365,8 +360,8 @@ fn new_msg( // Delays the message broadcast by between 1x and 2x jitter_ms and drops // messages. fn bcast( - broadcast: mpmc::Sender>, - msg: Msg, + broadcast: mpmc::Sender>, + msg: Msg, jitter_ms: i32, clock: FakeClock, ) { @@ -406,7 +401,7 @@ struct TestMsg { justify: Option>, } -impl SomeMsg for TestMsg { +impl SomeMsg for TestMsg { fn type_(&self) -> MessageType { self.msg_type } @@ -439,12 +434,12 @@ impl SomeMsg for TestMsg { self.pv } - fn justification(&self) -> Vec> { + fn justification(&self) -> Vec> { match self.justify { None => vec![], Some(ref j) => j .iter() - .map(|j| Arc::new(j.clone()) as Msg) + .map(|j| Arc::new(j.clone()) as Msg) .collect(), } } @@ -697,7 +692,7 @@ fn drop_30_percent_const() { }); } -fn noop_definition() -> Definition { +fn noop_definition() -> Definition { Definition { is_leader: Box::new(|_, _, _| false), new_timer: Box::new(|_| (mpmc::never(), Box::new(|| {}))), @@ -711,7 +706,7 @@ fn noop_definition() -> Definition { } } -fn noop_transport() -> Transport { +fn noop_transport() -> Transport { Transport { broadcast: Box::new(|_, _, _, _, _, _, _, _, _| Ok(())), receive: mpmc::never(), @@ -727,7 +722,7 @@ fn duplicate_pre_prepare_rules() { const NO_LEADER: i64 = 1; const LEADER: i64 = 2; - let new_preprepare = |round: i64| -> Msg { + let new_preprepare = |round: i64| -> Msg { new_msg( MSG_PRE_PREPARE, 0, @@ -764,7 +759,7 @@ fn duplicate_pre_prepare_rules() { _ = return_err.send(Ok(())); }); - let (r_chan_tx, r_chan_rx) = mpmc::bounded::>(2); + let (r_chan_tx, r_chan_rx) = mpmc::bounded::>(2); r_chan_tx.send(new_preprepare(1)).expect(WRITE_CHAN_ERR); r_chan_tx.send(new_preprepare(2)).expect(WRITE_CHAN_ERR); diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index 95a957e8..8cd958ff 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -66,9 +66,9 @@ pub enum QbftError { } /// Abstracts the transport layer between processes in the consensus system. -pub struct Transport +pub struct Transport where - V: PartialEq, + T: QbftTypes, { /// Broadcast sends a message with the provided fields to all other /// processes in the system (including this process). @@ -78,13 +78,13 @@ where dyn Fn( /* ct */ &CancellationToken, /* type_ */ MessageType, - /* instance */ &I, + /* instance */ &T::Instance, /* source */ i64, /* round */ i64, - /* value */ &V, + /* value */ &T::Value, /* pr */ i64, - /* pv */ &V, - /* justification */ Option<&Vec>>, + /* pv */ &T::Value, + /* justification */ Option<&Vec>>, ) -> Result<()> + Send + Sync, @@ -92,19 +92,26 @@ where /// Receive returns a stream of messages received /// from other processes in the system (including this process). - pub receive: mpmc::Receiver>, + pub receive: mpmc::Receiver>, } /// Defines the consensus system parameters that are external to the qbft /// algorithm. This remains constant across multiple instances of consensus /// (calls to `run`). -pub struct Definition +pub struct Definition where - V: PartialEq, + T: QbftTypes, { /// A deterministic leader election function. - pub is_leader: - Box bool + Send + Sync>, + pub is_leader: Box< + dyn Fn( + /* instance */ &T::Instance, + /* round */ i64, + /* process */ i64, + ) -> bool + + Send + + Sync, + >, /// Returns a new timer channel and stop function for the round pub new_timer: Box< @@ -119,11 +126,11 @@ where pub compare: Box< dyn Fn( /* ct */ &CancellationToken, - /* qcommit */ &Msg, - /* input_value_source_ch */ &mpmc::Receiver, - /* input_value_source */ &C, + /* qcommit */ &Msg, + /* input_value_source_ch */ &mpmc::Receiver, + /* input_value_source */ &T::Compare, /* return_err */ &mpmc::Sender>, - /* return_value */ &mpmc::Sender, + /* return_value */ &mpmc::Sender, ) + Send + Sync, >, @@ -132,9 +139,9 @@ where pub decide: Box< dyn Fn( /* ct */ &CancellationToken, - /* instance */ &I, - /* value */ &V, - /* qcommit */ &Vec>, + /* instance */ &T::Instance, + /* value */ &T::Value, + /* qcommit */ &Vec>, ) + Send + Sync, >, @@ -143,10 +150,10 @@ where /// It includes the rule that triggered it and all received round messages. pub log_upon_rule: Box< dyn Fn( - /* instance */ &I, + /* instance */ &T::Instance, /* process */ i64, /* round */ i64, - /* msg */ &Msg, + /* msg */ &Msg, /* upon_rule */ UponRule, ) + Send + Sync, @@ -154,19 +161,19 @@ where /// Allows debug logging of round changes. pub log_round_change: Box< dyn Fn( - /* instance */ &I, + /* instance */ &T::Instance, /* process */ i64, /* round */ i64, /* new_round */ i64, /* upon_rule */ UponRule, - /* msgs */ &Vec>, + /* msgs */ &Vec>, ) + Send + Sync, >, /// Allows debug logging of unjust messages. pub log_unjust: - Box) + Send + Sync>, + Box) + Send + Sync>, /// Total number of nodes/processes participating in consensus. pub nodes: i64, @@ -175,9 +182,9 @@ where pub fifo_limit: i64, } -impl Definition +impl Definition where - V: PartialEq, + T: QbftTypes, { /// Quorum count for the system. /// See IBFT 2.0 paper for correct formula: @@ -229,35 +236,35 @@ impl Display for MessageType { } /// Defines the inter process messages. -pub trait SomeMsg: Send + Sync + fmt::Debug +pub trait SomeMsg: Send + Sync + fmt::Debug where - V: PartialEq, + T: QbftTypes, { /// Type of the message. fn type_(&self) -> MessageType; /// Consensus instance. - fn instance(&self) -> I; + fn instance(&self) -> T::Instance; /// Process that sent the message. fn source(&self) -> i64; /// The round the message pertains to. fn round(&self) -> i64; /// The value being proposed, usually a hash. - fn value(&self) -> V; + fn value(&self) -> T::Value; /// Usually the value that was hashed and is returned in `value`. - fn value_source(&self) -> Result; + fn value_source(&self) -> Result; /// The justified prepared round. fn prepared_round(&self) -> i64; /// The justified prepared value. - fn prepared_value(&self) -> V; + fn prepared_value(&self) -> T::Value; /// Set of messages that explicitly justifies this message. - fn justification(&self) -> Vec>; + fn justification(&self) -> Vec>; /// Cast as `Any` to allow downcasting. fn as_any(&self) -> &dyn any::Any; } /// Alias for any `Msg` implementation tracked by reference counting. -pub type Msg = sync::Arc>; +pub type Msg = sync::Arc>; /// Defines the event based rules that are triggered when messages are received. #[derive(PartialEq, Eq, Hash, Clone, Copy)] @@ -303,30 +310,31 @@ struct DedupKey { /// The generic type `V` is the arbitrary data value being proposed; it only /// requires an Equal method. The generic type `C` is the compare value, used to /// compare leader's proposed value with local value and can be anything. -pub fn run( +pub fn run( ct: &CancellationToken, - d: &Definition, - t: &Transport, - instance: &I, + d: &Definition, + t: &Transport, + instance: &T::Instance, process: i64, - mut input_value_ch: mpmc::Receiver, - input_value_source_ch: mpmc::Receiver, + mut input_value_ch: mpmc::Receiver, + input_value_source_ch: mpmc::Receiver, ) -> Result<()> where - V: PartialEq + Eq + Hash + Default, - C: Clone + Send + Sync + Default, + T: QbftTypes, + T::Value: PartialEq + Eq + Hash + Default, + T::Compare: Clone + Send + Sync + Default, { // === State === let round: Cell = Cell::new(1); - let input_value: RefCell = RefCell::new(Default::default()); - let mut input_value_source: C = Default::default(); - let ppj_cache: RefCell>>> = RefCell::new(None); // Cached pre-prepare justification for the current round (`None` value is unset). + let input_value: RefCell = RefCell::new(Default::default()); + let mut input_value_source: T::Compare = Default::default(); + let ppj_cache: RefCell>>> = RefCell::new(None); // Cached pre-prepare justification for the current round (`None` value is unset). let prepared_round: Cell = Cell::new(0); - let prepared_value: RefCell = RefCell::new(Default::default()); + let prepared_value: RefCell = RefCell::new(Default::default()); let mut compare_failure_round: i64 = 0; - let prepared_justification: RefCell>>> = RefCell::new(None); - let mut q_commit: Option>> = None; - let buffer: RefCell>>> = RefCell::new(HashMap::new()); + let prepared_justification: RefCell>>> = RefCell::new(None); + let mut q_commit: Option>> = None; + let buffer: RefCell>>> = RefCell::new(HashMap::new()); let dedup_rules: RefCell> = RefCell::new(HashMap::new()); let mut timer_chan: mpmc::Receiver; let mut stop_timer: Box; @@ -335,7 +343,7 @@ where // Broadcasts a non-ROUND-CHANGE message for current round. let broadcast_msg = - |type_: MessageType, value: &V, justification: Option<&Vec>>| { + |type_: MessageType, value: &T::Value, justification: Option<&Vec>>| { (t.broadcast)( ct, type_, @@ -366,7 +374,7 @@ where // Broadcasts a PRE-PREPARE message with current state // and our own input value if present, otherwise it caches the justification // to be used when the input value becomes available. - let broadcast_own_pre_prepare = |justification: Vec>| { + let broadcast_own_pre_prepare = |justification: Vec>| { if ppj_cache.borrow().is_some() { panic!("bug: justification cache must be none") } @@ -381,7 +389,7 @@ where }; // Adds a message to each process' FIFO queue - let buffer_msg = |msg: &Msg| { + let buffer_msg = |msg: &Msg| { let mut b = buffer.borrow_mut(); let fifo = b.entry(msg.source()).or_default(); @@ -604,20 +612,21 @@ where Ok(()) } -fn compare( +fn compare( ct: &CancellationToken, - d: &Definition, - msg: &Msg, - input_value_source_ch: &mpmc::Receiver, - input_value_source: C, + d: &Definition, + msg: &Msg, + input_value_source_ch: &mpmc::Receiver, + input_value_source: T::Compare, timer_chan: &mpmc::Receiver, -) -> Result +) -> Result where - V: PartialEq, - C: Clone + Send + Sync, + T: QbftTypes, + T::Value: PartialEq, + T::Compare: Clone + Send + Sync, { let (compare_err_tx, compare_err_rx) = mpmc::bounded::>(1); - let (compare_value_tx, compare_value_rx) = mpmc::bounded::(1); + let (compare_value_tx, compare_value_rx) = mpmc::bounded::(1); // d.Compare has 2 roles: // 1. Read from the `input_value_source_ch` (if `input_value_source` is empty). @@ -672,12 +681,10 @@ where } /// Returns all messages from the provided round. -fn extract_round_messages( - buffer: &HashMap>>, - round: i64, -) -> Vec> +fn extract_round_messages(buffer: &HashMap>>, round: i64) -> Vec> where - V: PartialEq, + T: QbftTypes, + T::Value: PartialEq, { let mut resp = vec![]; @@ -694,16 +701,17 @@ where /// Returns the rule triggered upon receipt of the last message and its /// justifications. -fn classify( - d: &Definition, - instance: &I, +fn classify( + d: &Definition, + instance: &T::Instance, round: i64, process: i64, - buffer: &HashMap>>, - msg: &Msg, -) -> (UponRule, Option>>) + buffer: &HashMap>>, + msg: &Msg, +) -> (UponRule, Option>>) where - V: Eq + Hash + Default, + T: QbftTypes, + T::Value: Eq + Hash + Default, { match msg.type_() { MSG_DECIDED => (UPON_JUSTIFIED_DECIDED, Some(msg.justification())), @@ -785,9 +793,10 @@ where /// Implements algorithm 3:6 and returns the next minimum round from received /// round change messages. -fn next_min_round(d: &Definition, frc: &Vec>, round: i64) -> i64 +fn next_min_round(d: &Definition, frc: &Vec>, round: i64) -> i64 where - V: PartialEq, + T: QbftTypes, + T::Value: PartialEq, { // Get all RoundChange messages with round (rj) higher than current round (ri) if (frc.len() as i64) < d.faulty() + 1 { @@ -813,14 +822,15 @@ where } /// Returns true if message is justified or if it does not need justification. -fn is_justified( - d: &Definition, - instance: &I, - msg: &Msg, +fn is_justified( + d: &Definition, + instance: &T::Instance, + msg: &Msg, compare_failure_round: i64, ) -> bool where - V: Eq + Hash + Default, + T: QbftTypes, + T::Value: Eq + Hash + Default, { match msg.type_() { MSG_PRE_PREPARE => is_justified_pre_prepare(d, instance, msg, compare_failure_round), @@ -834,9 +844,10 @@ where /// Returns true if the ROUND_CHANGE message's prepared round and value is /// justified. -fn is_justified_round_change(d: &Definition, msg: &Msg) -> bool +fn is_justified_round_change(d: &Definition, msg: &Msg) -> bool where - V: PartialEq + Default, + T: QbftTypes, + T::Value: PartialEq + Default, { if msg.type_() != MSG_ROUND_CHANGE { panic!("bug: not a round change message"); @@ -859,7 +870,7 @@ where return false; } - let mut uniq = uniq_source::(vec![]); + let mut uniq = uniq_source::(vec![]); for prepare in prepares { if !uniq(&prepare) { return false; @@ -883,9 +894,10 @@ where /// Returns true if the decided message is justified by quorum COMMIT messages /// of identical round and value. -fn is_justified_decided(d: &Definition, msg: &Msg) -> bool +fn is_justified_decided(d: &Definition, msg: &Msg) -> bool where - V: PartialEq, + T: QbftTypes, + T::Value: PartialEq, { if msg.type_() != MSG_DECIDED { panic!("bug: not a decided message"); @@ -905,14 +917,15 @@ where } /// Returns true if the PRE-PREPARE message is justified. -fn is_justified_pre_prepare( - d: &Definition, - instance: &I, - msg: &Msg, +fn is_justified_pre_prepare( + d: &Definition, + instance: &T::Instance, + msg: &Msg, compare_failure_round: i64, ) -> bool where - V: Eq + Hash + Default, + T: QbftTypes, + T::Value: Eq + Hash + Default, { if msg.type_() != MSG_PRE_PREPARE { panic!("bug: not a preprepare message"); @@ -941,13 +954,14 @@ where /// Implements algorithm 4:1 and returns true and pv if the messages contains a /// justified quorum ROUND_CHANGEs (Qrc). -fn contains_justified_qrc( - d: &Definition, - justification: &Vec>, +fn contains_justified_qrc( + d: &Definition, + justification: &Vec>, round: i64, -) -> Option +) -> Option where - V: Eq + Hash + Default, + T: QbftTypes, + T::Value: Eq + Hash + Default, { let qrc = filter_round_change(justification, round); if (qrc.len() as i64) < d.quorum() { @@ -994,17 +1008,15 @@ where /// Extracts the single justified Pr and Pv from quorum PREPARES in list of /// messages. It expects only one possible combination. -fn get_single_justified_pr_pv( - d: &Definition, - msgs: &Vec>, -) -> Option<(i64, V)> +fn get_single_justified_pr_pv(d: &Definition, msgs: &Vec>) -> Option<(i64, T::Value)> where - V: Eq + Hash + Default, + T: QbftTypes, + T::Value: Eq + Hash + Default, { let mut pr: i64 = 0; - let mut pv: V = Default::default(); + let mut pv: T::Value = Default::default(); let mut count: i64 = 0; - let mut uniq = uniq_source::(vec![]); + let mut uniq = uniq_source::(vec![]); for msg in msgs { if msg.type_() != MSG_PREPARE { @@ -1033,13 +1045,10 @@ where } /// Implements algorithm 4:1 and returns a justified quorum ROUND_CHANGEs (Qrc) -fn get_justified_qrc( - d: &Definition, - all: &Vec>, - round: i64, -) -> Option>> +fn get_justified_qrc(d: &Definition, all: &Vec>, round: i64) -> Option>> where - V: Eq + Hash + Default, + T: QbftTypes, + T::Value: Eq + Hash + Default, { if let (qrc, true) = quorum_null_prepared(d, all, round) { // Return any quorum null pv ROUND_CHANGE messages as Qrc. @@ -1051,11 +1060,11 @@ where for prepares in get_prepare_quorums(d, all) { // See if we have quorum ROUND-CHANGE with HIGHEST_PREPARED(qrc) == // prepares.Round. - let mut qrc: Vec> = vec![]; + let mut qrc: Vec> = vec![]; let mut has_highest_prepared = false; let pr = prepares[0].round(); let pv = prepares[0].value(); - let mut uniq = uniq_source::(vec![]); + let mut uniq = uniq_source::(vec![]); for rc in round_changes.iter() { if rc.prepared_round() > pr { @@ -1085,15 +1094,16 @@ where /// Returns true and Faulty+1 ROUND-CHANGE messages (Frc) with the rounds higher /// than the provided round. It returns the highest round per process in order /// to jump furthest. -fn get_fplus1_round_changes( - d: &Definition, - all: &Vec>, +fn get_fplus1_round_changes( + d: &Definition, + all: &Vec>, round: i64, -) -> Option>> +) -> Option>> where - V: PartialEq, + T: QbftTypes, + T::Value: PartialEq, { - let mut highest_by_source = HashMap::>::new(); + let mut highest_by_source = HashMap::>::new(); for msg in all { if msg.type_() != MSG_ROUND_CHANGE { @@ -1136,14 +1146,12 @@ where value: V, } -fn get_prepare_quorums( - d: &Definition, - all: &Vec>, -) -> Vec>> +fn get_prepare_quorums(d: &Definition, all: &Vec>) -> Vec>> where - V: Eq + Hash, + T: QbftTypes, + T::Value: Eq + Hash, { - let mut sets = HashMap::, HashMap>>::new(); + let mut sets = HashMap::, HashMap>>::new(); for msg in all { if msg.type_() != MSG_PREPARE { @@ -1181,13 +1189,10 @@ where /// Implements condition J1 and returns Qrc and true if a quorum /// of round changes messages (Qrc) for the round have null prepared round and /// value. -fn quorum_null_prepared( - d: &Definition, - all: &Vec>, - round: i64, -) -> (Vec>, bool) +fn quorum_null_prepared(d: &Definition, all: &Vec>, round: i64) -> (Vec>, bool) where - V: PartialEq + Default, + T: QbftTypes, + T::Value: PartialEq + Default, { let null_pr = Default::default(); let null_pv = Some(&Default::default()); @@ -1201,41 +1206,44 @@ where } /// Returns the messages matching the type and value. -fn filter_by_round_and_value( - msgs: &Vec>, +fn filter_by_round_and_value( + msgs: &Vec>, message_type: MessageType, round: i64, - value: V, -) -> Vec> + value: T::Value, +) -> Vec> where - V: PartialEq, + T: QbftTypes, + T::Value: PartialEq, { filter_msgs(msgs, message_type, round, Some(&value), None, None) } /// Returns all round change messages for the provided round. -fn filter_round_change(msgs: &Vec>, round: i64) -> Vec> +fn filter_round_change(msgs: &Vec>, round: i64) -> Vec> where - V: PartialEq, + T: QbftTypes, + T::Value: PartialEq, { - filter_msgs::(msgs, MSG_ROUND_CHANGE, round, None, None, None) + filter_msgs::(msgs, MSG_ROUND_CHANGE, round, None, None, None) } /// Returns one message per process matching the provided type and round and /// optional value, pr, pv. -fn filter_msgs( - msgs: &Vec>, +fn filter_msgs( + msgs: &Vec>, message_type: MessageType, round: i64, - value: Option<&V>, + value: Option<&T::Value>, pr: Option, - pv: Option<&V>, -) -> Vec> + pv: Option<&T::Value>, +) -> Vec> where - V: PartialEq, + T: QbftTypes, + T::Value: PartialEq, { let mut resp = Vec::new(); - let mut uniq = uniq_source::(vec![]); + let mut uniq = uniq_source::(vec![]); for msg in msgs { if message_type != msg.type_() { @@ -1274,11 +1282,12 @@ where /// Produce a vector containing all the buffered messages as well as all their /// justifications. -fn flatten(buffer: &HashMap>>) -> Vec> +fn flatten(buffer: &HashMap>>) -> Vec> where - V: PartialEq, + T: QbftTypes, + T::Value: PartialEq, { - let mut resp: Vec> = Vec::new(); + let mut resp: Vec> = Vec::new(); for msgs in buffer.values() { for msg in msgs { @@ -1297,12 +1306,13 @@ where /// Construct a function that returns true if the message is from a unique /// source. -fn uniq_source(vec: Vec>) -> Box) -> bool> +fn uniq_source(vec: Vec>) -> Box) -> bool> where - V: PartialEq, + T: QbftTypes, + T::Value: PartialEq, { let mut s = vec.iter().map(|msg| msg.source()).collect::>(); - Box::new(move |msg: &Msg| { + Box::new(move |msg: &Msg| { let source = msg.source(); if s.contains(&source) { false From 34e4344364c63d02231778f0b748d8fd10a0ee35 Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Thu, 7 May 2026 18:13:40 +0200 Subject: [PATCH 03/12] Add the Decider trait --- crates/core/src/qbft/internal_test.rs | 6 +-- crates/core/src/qbft/mod.rs | 65 ++++++++++++++++++++++----- 2 files changed, 58 insertions(+), 13 deletions(-) diff --git a/crates/core/src/qbft/internal_test.rs b/crates/core/src/qbft/internal_test.rs index b703d656..b4092c64 100644 --- a/crates/core/src/qbft/internal_test.rs +++ b/crates/core/src/qbft/internal_test.rs @@ -65,9 +65,9 @@ fn test_qbft(test: Test) { }, decide: { let result_chan_tx = result_chan_tx.clone(); - Box::new(move |_, _, _, q_commit| { + Box::new(FnDecider::new(Box::new(move |_, _, _, q_commit| { result_chan_tx.send(q_commit.clone()).expect(WRITE_CHAN_ERR); - }) + }))) }, compare: Box::new(|_, _, _, _, return_err, _| { return_err.send(Ok(())).expect(WRITE_CHAN_ERR); @@ -696,7 +696,7 @@ fn noop_definition() -> Definition { Definition { is_leader: Box::new(|_, _, _| false), new_timer: Box::new(|_| (mpmc::never(), Box::new(|| {}))), - decide: Box::new(|_, _, _, _| {}), + decide: Box::new(FnDecider::new(Box::new(|_, _, _, _| {}))), compare: Box::new(|_, _, _, _, _, _| {}), nodes: 0, fifo_limit: 0, diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index 8cd958ff..5164ecb2 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -28,6 +28,7 @@ use std::{ collections::{HashMap, HashSet}, fmt::{self, Display}, hash::Hash, + marker::PhantomData, sync, thread, time, }; @@ -47,6 +48,58 @@ impl QbftTypes for I64Qbft { type Value = i64; } +pub trait Decider: Send + Sync +where + T: QbftTypes, +{ + fn decide( + &self, + ct: &CancellationToken, + instance: &T::Instance, + value: &T::Value, + qcommit: &Vec>, + ); +} + +pub struct FnDecider +where + T: QbftTypes, +{ + decide: Box>) + Send + Sync>, + _marker: PhantomData T>, +} + +impl FnDecider +where + T: QbftTypes, +{ + pub fn new( + decide: Box< + dyn Fn(&CancellationToken, &T::Instance, &T::Value, &Vec>) + Send + Sync, + >, + ) -> Self { + Self { + decide, + _marker: PhantomData, + } + } +} + +impl Decider for FnDecider +where + T: QbftTypes, +{ + fn decide( + &self, + ct: &CancellationToken, + instance: &T::Instance, + value: &T::Value, + qcommit: &Vec>, + ) { + (self.decide)(ct, instance, value, qcommit); + } +} + #[derive(Debug, thiserror::Error)] pub enum QbftError { #[error("Timeout")] @@ -136,15 +189,7 @@ where >, /// Called when consensus has been reached on a value. - pub decide: Box< - dyn Fn( - /* ct */ &CancellationToken, - /* instance */ &T::Instance, - /* value */ &T::Value, - /* qcommit */ &Vec>, - ) + Send - + Sync, - >, + pub decide: Box>, /// Allows debug logging of triggered upon rules on message receipt. /// It includes the rule that triggered it and all received round messages. @@ -549,7 +594,7 @@ where let justification = q_commit.as_ref() .expect("Rules `UPON_QUORUM_COMMITS` and `UPON_JUSTIFIED_DECIDED` always include a justification"); - (d.decide)(ct, instance, &msg.value(), justification); + d.decide.decide(ct, instance, &msg.value(), justification); } UPON_F_PLUS1_ROUND_CHANGES => { // Algorithm 3:5 From 89b806969f3ef37b0e9e6cf17b5a5032a310f128 Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Thu, 7 May 2026 18:22:40 +0200 Subject: [PATCH 04/12] Add DeciderFn --- crates/core/src/qbft/internal_test.rs | 6 +++--- crates/core/src/qbft/mod.rs | 17 ++++++++--------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/crates/core/src/qbft/internal_test.rs b/crates/core/src/qbft/internal_test.rs index b4092c64..c544238e 100644 --- a/crates/core/src/qbft/internal_test.rs +++ b/crates/core/src/qbft/internal_test.rs @@ -65,9 +65,9 @@ fn test_qbft(test: Test) { }, decide: { let result_chan_tx = result_chan_tx.clone(); - Box::new(FnDecider::new(Box::new(move |_, _, _, q_commit| { + Box::new(DeciderFn::new(move |_, _, _, q_commit| { result_chan_tx.send(q_commit.clone()).expect(WRITE_CHAN_ERR); - }))) + })) }, compare: Box::new(|_, _, _, _, return_err, _| { return_err.send(Ok(())).expect(WRITE_CHAN_ERR); @@ -696,7 +696,7 @@ fn noop_definition() -> Definition { Definition { is_leader: Box::new(|_, _, _| false), new_timer: Box::new(|_| (mpmc::never(), Box::new(|| {}))), - decide: Box::new(FnDecider::new(Box::new(|_, _, _, _| {}))), + decide: Box::new(DeciderFn::new(|_, _, _, _| {})), compare: Box::new(|_, _, _, _, _, _| {}), nodes: 0, fifo_limit: 0, diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index 5164ecb2..39b71cad 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -61,7 +61,7 @@ where ); } -pub struct FnDecider +pub struct DeciderFn where T: QbftTypes, { @@ -69,23 +69,22 @@ where _marker: PhantomData T>, } -impl FnDecider +impl DeciderFn where T: QbftTypes, { - pub fn new( - decide: Box< - dyn Fn(&CancellationToken, &T::Instance, &T::Value, &Vec>) + Send + Sync, - >, - ) -> Self { + pub fn new(decide: F) -> Self + where + F: Fn(&CancellationToken, &T::Instance, &T::Value, &Vec>) + Send + Sync + 'static, + { Self { - decide, + decide: Box::new(decide), _marker: PhantomData, } } } -impl Decider for FnDecider +impl Decider for DeciderFn where T: QbftTypes, { From dd7cad5970080510a24e0ed218f8912342f1cbd2 Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Thu, 7 May 2026 18:28:19 +0200 Subject: [PATCH 05/12] Add the leader selector trait --- crates/core/src/qbft/internal_test.rs | 8 ++-- crates/core/src/qbft/mod.rs | 55 +++++++++++++++++++++------ 2 files changed, 47 insertions(+), 16 deletions(-) diff --git a/crates/core/src/qbft/internal_test.rs b/crates/core/src/qbft/internal_test.rs index c544238e..93b1f6b5 100644 --- a/crates/core/src/qbft/internal_test.rs +++ b/crates/core/src/qbft/internal_test.rs @@ -45,10 +45,10 @@ fn test_qbft(test: Test) { let (result_chan_tx, result_chan_rx) = mpmc::bounded::>>(N); let (run_chan_tx, run_chan_rx) = mpmc::bounded::>(N); - let is_leader = Box::new(make_is_leader(N as i64)); + let is_leader = make_is_leader(N as i64); let defs = Arc::new(Definition { - is_leader: is_leader.clone(), + is_leader: Box::new(LeaderSelectorFn::new(make_is_leader(N as i64))), new_timer: { let clock = clock.clone(); @@ -694,7 +694,7 @@ fn drop_30_percent_const() { fn noop_definition() -> Definition { Definition { - is_leader: Box::new(|_, _, _| false), + is_leader: Box::new(LeaderSelectorFn::new(|_, _, _| false)), new_timer: Box::new(|_| (mpmc::never(), Box::new(|| {}))), decide: Box::new(DeciderFn::new(|_, _, _, _| {})), compare: Box::new(|_, _, _, _, _, _| {}), @@ -738,7 +738,7 @@ fn duplicate_pre_prepare_rules() { }; let mut def = noop_definition(); - def.is_leader = Box::new(|_, _, process| process == LEADER); + def.is_leader = Box::new(LeaderSelectorFn::new(|_, _, process| process == LEADER)); def.log_upon_rule = Box::new(move |_, _, round, msg, upon_rule| { println!("UponRule: rule={} round={} ", upon_rule, msg.round()); diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index 39b71cad..4875e8fb 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -99,6 +99,45 @@ where } } +pub trait LeaderSelector: Send + Sync +where + T: QbftTypes, +{ + fn is_leader(&self, instance: &T::Instance, round: i64, process: i64) -> bool; +} + +pub struct LeaderSelectorFn +where + T: QbftTypes, +{ + is_leader: Box bool + Send + Sync>, + _marker: PhantomData T>, +} + +impl LeaderSelectorFn +where + T: QbftTypes, +{ + pub fn new(is_leader: F) -> Self + where + F: Fn(&T::Instance, i64, i64) -> bool + Send + Sync + 'static, + { + Self { + is_leader: Box::new(is_leader), + _marker: PhantomData, + } + } +} + +impl LeaderSelector for LeaderSelectorFn +where + T: QbftTypes, +{ + fn is_leader(&self, instance: &T::Instance, round: i64, process: i64) -> bool { + (self.is_leader)(instance, round, process) + } +} + #[derive(Debug, thiserror::Error)] pub enum QbftError { #[error("Timeout")] @@ -155,15 +194,7 @@ where T: QbftTypes, { /// A deterministic leader election function. - pub is_leader: Box< - dyn Fn( - /* instance */ &T::Instance, - /* round */ i64, - /* process */ i64, - ) -> bool - + Send - + Sync, - >, + pub is_leader: Box>, /// Returns a new timer channel and stop function for the round pub new_timer: Box< @@ -472,7 +503,7 @@ where // Algorithm 1:11 { - if (d.is_leader)(instance, round.get(), process) { + if d.is_leader.is_leader(instance, round.get(), process) { // Note round==1 at this point. broadcast_own_pre_prepare(vec![])?; // Empty justification since round==1 } @@ -823,7 +854,7 @@ where return (UPON_UNJUST_QUORUM_ROUND_CHANGES, None); }; - if !(d.is_leader)(instance, msg.round(), process) { + if !d.is_leader.is_leader(instance, msg.round(), process) { return (UPON_NOTHING, None); } @@ -975,7 +1006,7 @@ where panic!("bug: not a preprepare message"); } - if !(d.is_leader)(instance, msg.round(), msg.source()) { + if !d.is_leader.is_leader(instance, msg.round(), msg.source()) { return false; } From f4db23d81382bfa04a5ad74edd2600ff3f67582c Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Thu, 7 May 2026 18:33:59 +0200 Subject: [PATCH 06/12] Add QbftLogger --- crates/core/src/qbft/internal_test.rs | 103 ++++++++++---------- crates/core/src/qbft/mod.rs | 129 +++++++++++++++++++------- 2 files changed, 154 insertions(+), 78 deletions(-) diff --git a/crates/core/src/qbft/internal_test.rs b/crates/core/src/qbft/internal_test.rs index 93b1f6b5..a3347cf6 100644 --- a/crates/core/src/qbft/internal_test.rs +++ b/crates/core/src/qbft/internal_test.rs @@ -74,38 +74,41 @@ fn test_qbft(test: Test) { }), nodes: N as i64, fifo_limit: FIFO_LIMIT as i64, - log_round_change: { - let clock = clock.clone(); - - Box::new(move |_, process, round, new_round, upon_rule, _| { - println!( - "{:?} - {}@{} change to {} ~= {}", - clock.elapsed(), - process, - round, - new_round, - upon_rule, - ); - }) - }, - log_unjust: Box::new(|_, _, msg| { - println!("Unjust: {:?}", msg); - }), - log_upon_rule: { - let clock = clock.clone(); - Box::new(move |_, process, round, msg, upon_rule| { - println!( - "{:?} {} => {}@{} -> {}@{} ~= {}", - clock.elapsed(), - msg.source(), - msg.type_(), - msg.round(), - process, - round, - upon_rule, - ); - }) - }, + logger: Box::new(QbftLoggerFn::new( + { + let clock = clock.clone(); + + move |_, process, round, msg, upon_rule| { + println!( + "{:?} {} => {}@{} -> {}@{} ~= {}", + clock.elapsed(), + msg.source(), + msg.type_(), + msg.round(), + process, + round, + upon_rule, + ); + } + }, + { + let clock = clock.clone(); + + move |_, process, round, new_round, upon_rule, _| { + println!( + "{:?} - {}@{} change to {} ~= {}", + clock.elapsed(), + process, + round, + new_round, + upon_rule, + ); + } + }, + |_, _, msg| { + println!("Unjust: {:?}", msg); + }, + )), }); thread::scope(|s| { @@ -700,9 +703,11 @@ fn noop_definition() -> Definition { compare: Box::new(|_, _, _, _, _, _| {}), nodes: 0, fifo_limit: 0, - log_round_change: Box::new(|_, _, _, _, _, _| {}), - log_unjust: Box::new(|_, _, _| {}), - log_upon_rule: Box::new(|_, _, _, _, _| {}), + logger: Box::new(QbftLoggerFn::new( + |_, _, _, _, _| {}, + |_, _, _, _, _, _| {}, + |_, _, _| {}, + )), } } @@ -739,22 +744,26 @@ fn duplicate_pre_prepare_rules() { let mut def = noop_definition(); def.is_leader = Box::new(LeaderSelectorFn::new(|_, _, process| process == LEADER)); - def.log_upon_rule = Box::new(move |_, _, round, msg, upon_rule| { - println!("UponRule: rule={} round={} ", upon_rule, msg.round()); + def.logger = Box::new(QbftLoggerFn::new( + move |_, _, round, msg, upon_rule| { + println!("UponRule: rule={} round={} ", upon_rule, msg.round()); - assert!(upon_rule == UPON_JUSTIFIED_PRE_PREPARE); + assert!(upon_rule == UPON_JUSTIFIED_PRE_PREPARE); - if msg.round() == 1 { - return; - } + if msg.round() == 1 { + return; + } - if msg.round() == 2 { - cts.cancel(); - return; - } + if msg.round() == 2 { + cts.cancel(); + return; + } - panic!("unexpected round {}", round); - }); + panic!("unexpected round {}", round); + }, + |_, _, _, _, _, _| {}, + |_, _, _| {}, + )); def.compare = Box::new(|_, _, _, _, return_err, _| { _ = return_err.send(Ok(())); }); diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index 4875e8fb..b472ee22 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -138,6 +138,99 @@ where } } +pub trait QbftLogger: Send + Sync +where + T: QbftTypes, +{ + fn log_upon_rule( + &self, + instance: &T::Instance, + process: i64, + round: i64, + msg: &Msg, + upon_rule: UponRule, + ); + + fn log_round_change( + &self, + instance: &T::Instance, + process: i64, + round: i64, + new_round: i64, + upon_rule: UponRule, + msgs: &Vec>, + ); + + fn log_unjust(&self, instance: &T::Instance, process: i64, msg: Msg); +} + +type LogUponRuleFn = + Box::Instance, i64, i64, &Msg, UponRule) + Send + Sync>; +type LogRoundChangeFn = + Box::Instance, i64, i64, i64, UponRule, &Vec>) + Send + Sync>; +type LogUnjustFn = Box::Instance, i64, Msg) + Send + Sync>; + +pub struct QbftLoggerFn +where + T: QbftTypes, +{ + log_upon_rule: LogUponRuleFn, + log_round_change: LogRoundChangeFn, + log_unjust: LogUnjustFn, + _marker: PhantomData T>, +} + +impl QbftLoggerFn +where + T: QbftTypes, +{ + pub fn new(log_upon_rule: F1, log_round_change: F2, log_unjust: F3) -> Self + where + F1: Fn(&T::Instance, i64, i64, &Msg, UponRule) + Send + Sync + 'static, + F2: Fn(&T::Instance, i64, i64, i64, UponRule, &Vec>) + Send + Sync + 'static, + F3: Fn(&T::Instance, i64, Msg) + Send + Sync + 'static, + { + Self { + log_upon_rule: Box::new(log_upon_rule), + log_round_change: Box::new(log_round_change), + log_unjust: Box::new(log_unjust), + _marker: PhantomData, + } + } +} + +impl QbftLogger for QbftLoggerFn +where + T: QbftTypes, +{ + fn log_upon_rule( + &self, + instance: &T::Instance, + process: i64, + round: i64, + msg: &Msg, + upon_rule: UponRule, + ) { + (self.log_upon_rule)(instance, process, round, msg, upon_rule); + } + + fn log_round_change( + &self, + instance: &T::Instance, + process: i64, + round: i64, + new_round: i64, + upon_rule: UponRule, + msgs: &Vec>, + ) { + (self.log_round_change)(instance, process, round, new_round, upon_rule, msgs); + } + + fn log_unjust(&self, instance: &T::Instance, process: i64, msg: Msg) { + (self.log_unjust)(instance, process, msg); + } +} + #[derive(Debug, thiserror::Error)] pub enum QbftError { #[error("Timeout")] @@ -221,34 +314,8 @@ where /// Called when consensus has been reached on a value. pub decide: Box>, - /// Allows debug logging of triggered upon rules on message receipt. - /// It includes the rule that triggered it and all received round messages. - pub log_upon_rule: Box< - dyn Fn( - /* instance */ &T::Instance, - /* process */ i64, - /* round */ i64, - /* msg */ &Msg, - /* upon_rule */ UponRule, - ) + Send - + Sync, - >, - /// Allows debug logging of round changes. - pub log_round_change: Box< - dyn Fn( - /* instance */ &T::Instance, - /* process */ i64, - /* round */ i64, - /* new_round */ i64, - /* upon_rule */ UponRule, - /* msgs */ &Vec>, - ) + Send - + Sync, - >, - - /// Allows debug logging of unjust messages. - pub log_unjust: - Box) + Send + Sync>, + /// Allows debug logging of QBFT events. + pub logger: Box>, /// Total number of nodes/processes participating in consensus. pub nodes: i64, @@ -487,7 +554,7 @@ where return; } - (d.log_round_change)( + d.logger.log_round_change( instance, process, round.get(), @@ -546,7 +613,7 @@ where // Drop unjust messages if !is_justified(d, instance, &msg, compare_failure_round) { - (d.log_unjust)(instance, process, msg); + d.logger.log_unjust(instance, process, msg); continue; } @@ -559,7 +626,7 @@ where continue; } - (d.log_upon_rule)(instance, process, round.get(), &msg, rule); + d.logger.log_upon_rule(instance, process, round.get(), &msg, rule); match rule { // Algorithm 2:1 From 88326af93379757fa2c09b4d0bfac760d6471912 Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Thu, 7 May 2026 18:39:09 +0200 Subject: [PATCH 07/12] Add the Brodcaster trait --- crates/core/src/qbft/internal_test.rs | 6 +- crates/core/src/qbft/mod.rs | 121 ++++++++++++++++++++++---- 2 files changed, 107 insertions(+), 20 deletions(-) diff --git a/crates/core/src/qbft/internal_test.rs b/crates/core/src/qbft/internal_test.rs index a3347cf6..1d231115 100644 --- a/crates/core/src/qbft/internal_test.rs +++ b/crates/core/src/qbft/internal_test.rs @@ -121,7 +121,7 @@ fn test_qbft(test: Test) { broadcast: { let clock = clock.clone(); - Box::new( + Box::new(BroadcasterFn::new( move |_, type_, instance, source, round, value, pr, pv, justification| { if round > MAX_ROUND as i64 { return Err(QbftError::MaxRoundReached); @@ -161,7 +161,7 @@ fn test_qbft(test: Test) { Ok(()) }, - ) + )) }, receive: receiver.clone(), }; @@ -713,7 +713,7 @@ fn noop_definition() -> Definition { fn noop_transport() -> Transport { Transport { - broadcast: Box::new(|_, _, _, _, _, _, _, _, _| Ok(())), + broadcast: Box::new(BroadcasterFn::new(|_, _, _, _, _, _, _, _, _| Ok(()))), receive: mpmc::never(), } } diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index b472ee22..e7238e13 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -231,6 +231,107 @@ where } } +pub trait Broadcaster: Send + Sync +where + T: QbftTypes, +{ + #[allow(clippy::too_many_arguments)] + fn broadcast( + &self, + ct: &CancellationToken, + type_: MessageType, + instance: &T::Instance, + source: i64, + round: i64, + value: &T::Value, + pr: i64, + pv: &T::Value, + justification: Option<&Vec>>, + ) -> Result<()>; +} + +type BroadcastFn = Box< + dyn Fn( + &CancellationToken, + MessageType, + &::Instance, + i64, + i64, + &::Value, + i64, + &::Value, + Option<&Vec>>, + ) -> Result<()> + + Send + + Sync, +>; + +pub struct BroadcasterFn +where + T: QbftTypes, +{ + broadcast: BroadcastFn, + _marker: PhantomData T>, +} + +impl BroadcasterFn +where + T: QbftTypes, +{ + pub fn new(broadcast: F) -> Self + where + F: Fn( + &CancellationToken, + MessageType, + &T::Instance, + i64, + i64, + &T::Value, + i64, + &T::Value, + Option<&Vec>>, + ) -> Result<()> + + Send + + Sync + + 'static, + { + Self { + broadcast: Box::new(broadcast), + _marker: PhantomData, + } + } +} + +impl Broadcaster for BroadcasterFn +where + T: QbftTypes, +{ + fn broadcast( + &self, + ct: &CancellationToken, + type_: MessageType, + instance: &T::Instance, + source: i64, + round: i64, + value: &T::Value, + pr: i64, + pv: &T::Value, + justification: Option<&Vec>>, + ) -> Result<()> { + (self.broadcast)( + ct, + type_, + instance, + source, + round, + value, + pr, + pv, + justification, + ) + } +} + #[derive(Debug, thiserror::Error)] pub enum QbftError { #[error("Timeout")] @@ -258,21 +359,7 @@ where /// processes in the system (including this process). /// /// Note that an error exits the algorithm. - pub broadcast: Box< - dyn Fn( - /* ct */ &CancellationToken, - /* type_ */ MessageType, - /* instance */ &T::Instance, - /* source */ i64, - /* round */ i64, - /* value */ &T::Value, - /* pr */ i64, - /* pv */ &T::Value, - /* justification */ Option<&Vec>>, - ) -> Result<()> - + Send - + Sync, - >, + pub broadcast: Box>, /// Receive returns a stream of messages received /// from other processes in the system (including this process). @@ -486,7 +573,7 @@ where // Broadcasts a non-ROUND-CHANGE message for current round. let broadcast_msg = |type_: MessageType, value: &T::Value, justification: Option<&Vec>>| { - (t.broadcast)( + t.broadcast.broadcast( ct, type_, instance, @@ -500,7 +587,7 @@ where }; // Broadcasts a ROUND-CHANGE message with current state. let broadcast_round_change = || { - (t.broadcast)( + t.broadcast.broadcast( ct, MSG_ROUND_CHANGE, instance, From ebc8422cf2ce11f5142b75b89f3eebac5078be62 Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Thu, 7 May 2026 18:47:02 +0200 Subject: [PATCH 08/12] Add BroadcastRequest --- crates/core/src/qbft/internal_test.rs | 83 ++++++++++--------- crates/core/src/qbft/mod.rs | 115 ++++++++------------------ 2 files changed, 79 insertions(+), 119 deletions(-) diff --git a/crates/core/src/qbft/internal_test.rs b/crates/core/src/qbft/internal_test.rs index 1d231115..52c9fc73 100644 --- a/crates/core/src/qbft/internal_test.rs +++ b/crates/core/src/qbft/internal_test.rs @@ -121,47 +121,52 @@ fn test_qbft(test: Test) { broadcast: { let clock = clock.clone(); - Box::new(BroadcasterFn::new( - move |_, type_, instance, source, round, value, pr, pv, justification| { - if round > MAX_ROUND as i64 { - return Err(QbftError::MaxRoundReached); - } - - if type_ == MSG_COMMIT && round <= test.commits_after.into() { - println!( - "{:?} {} dropping commit for round {}", - clock.elapsed(), - source, - round - ); - return Ok(()); - } + Box::new(BroadcasterFn::new(move |request| { + if request.round > MAX_ROUND as i64 { + return Err(QbftError::MaxRoundReached); + } - println!("{:?} {} => {}@{}", clock.elapsed(), source, type_, round); - - let msg = new_msg( - type_, - *instance, - source, - round, - *value, - *value, - pr, - *pv, - justification, + if request.type_ == MSG_COMMIT && request.round <= test.commits_after.into() + { + println!( + "{:?} {} dropping commit for round {}", + clock.elapsed(), + request.source, + request.round ); - sender.send(msg.clone()).expect(WRITE_CHAN_ERR); - - bcast( - broadcast_tx.clone(), - msg.clone(), - test.bcast_jitter_ms, - clock.clone(), - ); // TODO: Add clock + return Ok(()); + } - Ok(()) - }, - )) + println!( + "{:?} {} => {}@{}", + clock.elapsed(), + request.source, + request.type_, + request.round + ); + + let msg = new_msg( + request.type_, + *request.instance, + request.source, + request.round, + *request.value, + *request.value, + request.prepared_round, + *request.prepared_value, + request.justification, + ); + sender.send(msg.clone()).expect(WRITE_CHAN_ERR); + + bcast( + broadcast_tx.clone(), + msg.clone(), + test.bcast_jitter_ms, + clock.clone(), + ); // TODO: Add clock + + Ok(()) + })) }, receive: receiver.clone(), }; @@ -713,7 +718,7 @@ fn noop_definition() -> Definition { fn noop_transport() -> Transport { Transport { - broadcast: Box::new(BroadcasterFn::new(|_, _, _, _, _, _, _, _, _| Ok(()))), + broadcast: Box::new(BroadcasterFn::new(|_| Ok(()))), receive: mpmc::never(), } } diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index e7238e13..431c44a6 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -231,40 +231,29 @@ where } } +pub struct BroadcastRequest<'a, T> +where + T: QbftTypes, +{ + pub ct: &'a CancellationToken, + pub type_: MessageType, + pub instance: &'a T::Instance, + pub source: i64, + pub round: i64, + pub value: &'a T::Value, + pub prepared_round: i64, + pub prepared_value: &'a T::Value, + pub justification: Option<&'a Vec>>, +} + pub trait Broadcaster: Send + Sync where T: QbftTypes, { - #[allow(clippy::too_many_arguments)] - fn broadcast( - &self, - ct: &CancellationToken, - type_: MessageType, - instance: &T::Instance, - source: i64, - round: i64, - value: &T::Value, - pr: i64, - pv: &T::Value, - justification: Option<&Vec>>, - ) -> Result<()>; + fn broadcast(&self, request: BroadcastRequest<'_, T>) -> Result<()>; } -type BroadcastFn = Box< - dyn Fn( - &CancellationToken, - MessageType, - &::Instance, - i64, - i64, - &::Value, - i64, - &::Value, - Option<&Vec>>, - ) -> Result<()> - + Send - + Sync, ->; +type BroadcastFn = Box) -> Result<()> + Send + Sync>; pub struct BroadcasterFn where @@ -280,20 +269,7 @@ where { pub fn new(broadcast: F) -> Self where - F: Fn( - &CancellationToken, - MessageType, - &T::Instance, - i64, - i64, - &T::Value, - i64, - &T::Value, - Option<&Vec>>, - ) -> Result<()> - + Send - + Sync - + 'static, + F: Fn(BroadcastRequest<'_, T>) -> Result<()> + Send + Sync + 'static, { Self { broadcast: Box::new(broadcast), @@ -306,29 +282,8 @@ impl Broadcaster for BroadcasterFn where T: QbftTypes, { - fn broadcast( - &self, - ct: &CancellationToken, - type_: MessageType, - instance: &T::Instance, - source: i64, - round: i64, - value: &T::Value, - pr: i64, - pv: &T::Value, - justification: Option<&Vec>>, - ) -> Result<()> { - (self.broadcast)( - ct, - type_, - instance, - source, - round, - value, - pr, - pv, - justification, - ) + fn broadcast(&self, request: BroadcastRequest<'_, T>) -> Result<()> { + (self.broadcast)(request) } } @@ -573,31 +528,31 @@ where // Broadcasts a non-ROUND-CHANGE message for current round. let broadcast_msg = |type_: MessageType, value: &T::Value, justification: Option<&Vec>>| { - t.broadcast.broadcast( + t.broadcast.broadcast(BroadcastRequest { ct, type_, instance, - process, - round.get(), + source: process, + round: round.get(), value, - 0, - &Default::default(), + prepared_round: 0, + prepared_value: &Default::default(), justification, - ) + }) }; // Broadcasts a ROUND-CHANGE message with current state. let broadcast_round_change = || { - t.broadcast.broadcast( + t.broadcast.broadcast(BroadcastRequest { ct, - MSG_ROUND_CHANGE, + type_: MSG_ROUND_CHANGE, instance, - process, - round.get(), - &Default::default(), - prepared_round.get(), - &prepared_value.borrow(), - prepared_justification.borrow().as_ref(), - ) + source: process, + round: round.get(), + value: &Default::default(), + prepared_round: prepared_round.get(), + prepared_value: &prepared_value.borrow(), + justification: prepared_justification.borrow().as_ref(), + }) }; // Broadcasts a PRE-PREPARE message with current state From 3caf3eb4fdb6ddbd973819b9139d88229e196846 Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Thu, 7 May 2026 18:51:04 +0200 Subject: [PATCH 09/12] Add TimerFactory --- crates/core/src/qbft/internal_test.rs | 10 +++--- crates/core/src/qbft/mod.rs | 51 +++++++++++++++++++++------ 2 files changed, 46 insertions(+), 15 deletions(-) diff --git a/crates/core/src/qbft/internal_test.rs b/crates/core/src/qbft/internal_test.rs index 52c9fc73..603f7495 100644 --- a/crates/core/src/qbft/internal_test.rs +++ b/crates/core/src/qbft/internal_test.rs @@ -49,10 +49,10 @@ fn test_qbft(test: Test) { let defs = Arc::new(Definition { is_leader: Box::new(LeaderSelectorFn::new(make_is_leader(N as i64))), - new_timer: { + new_timer: Box::new(TimerFactoryFn::new({ let clock = clock.clone(); - Box::new(move |round| { + move |round| { let d: Duration = if test.const_period { Duration::from_secs(1) } else { @@ -61,8 +61,8 @@ fn test_qbft(test: Test) { }; clock.new_timer(d) - }) - }, + } + })), decide: { let result_chan_tx = result_chan_tx.clone(); Box::new(DeciderFn::new(move |_, _, _, q_commit| { @@ -703,7 +703,7 @@ fn drop_30_percent_const() { fn noop_definition() -> Definition { Definition { is_leader: Box::new(LeaderSelectorFn::new(|_, _, _| false)), - new_timer: Box::new(|_| (mpmc::never(), Box::new(|| {}))), + new_timer: Box::new(TimerFactoryFn::new(|_| (mpmc::never(), Box::new(|| {})))), decide: Box::new(DeciderFn::new(|_, _, _, _| {})), compare: Box::new(|_, _, _, _, _, _| {}), nodes: 0, diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index 431c44a6..1a0e2eb2 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -287,6 +287,41 @@ where } } +pub trait TimerFactory: Send + Sync { + fn new_timer(&self, round: i64) + -> (mpmc::Receiver, Box); +} + +type NewTimerFn = + Box (mpmc::Receiver, Box) + Send + Sync>; + +pub struct TimerFactoryFn { + new_timer: NewTimerFn, +} + +impl TimerFactoryFn { + pub fn new(new_timer: F) -> Self + where + F: Fn(i64) -> (mpmc::Receiver, Box) + + Send + + Sync + + 'static, + { + Self { + new_timer: Box::new(new_timer), + } + } +} + +impl TimerFactory for TimerFactoryFn { + fn new_timer( + &self, + round: i64, + ) -> (mpmc::Receiver, Box) { + (self.new_timer)(round) + } +} + #[derive(Debug, thiserror::Error)] pub enum QbftError { #[error("Timeout")] @@ -332,11 +367,7 @@ where pub is_leader: Box>, /// Returns a new timer channel and stop function for the round - pub new_timer: Box< - dyn Fn(/* round */ i64) -> (mpmc::Receiver, Box) - + Send - + Sync, - >, + pub new_timer: Box, /// Called when leader proposes value and we compare it with our local /// value. It's an opt-in feature that should instantly return `None` on @@ -617,7 +648,7 @@ where broadcast_own_pre_prepare(vec![])?; // Empty justification since round==1 } - (timer_chan, stop_timer) = (d.new_timer)(round.get()); + (timer_chan, stop_timer) = d.new_timer.new_timer(round.get()); } while !ct.is_canceled() { @@ -676,7 +707,7 @@ where change_round(msg.round(), rule); stop_timer(); - (timer_chan, stop_timer) = (d.new_timer)(round.get()); + (timer_chan, stop_timer) = d.new_timer.new_timer(round.get()); let compare_result = compare( ct, @@ -705,7 +736,7 @@ where change_round(round.get() + 1, UPON_ROUND_TIMEOUT); stop_timer(); - (timer_chan, stop_timer) = (d.new_timer)(round.get()); + (timer_chan, stop_timer) = d.new_timer.new_timer(round.get()); broadcast_round_change()?; } @@ -749,7 +780,7 @@ where ); stop_timer(); - (timer_chan, stop_timer) = (d.new_timer)(round.get()); + (timer_chan, stop_timer) = d.new_timer.new_timer(round.get()); broadcast_round_change()?; } @@ -780,7 +811,7 @@ where change_round(round.get() + 1, UPON_ROUND_TIMEOUT); stop_timer(); - (timer_chan, stop_timer) = (d.new_timer)(round.get()); + (timer_chan, stop_timer) = d.new_timer.new_timer(round.get()); broadcast_round_change()?; } From 1ce1cc01115bd112fc85e04cb0ea943f0bda5669 Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Thu, 7 May 2026 18:55:48 +0200 Subject: [PATCH 10/12] Add TimerStopper --- crates/core/src/qbft/fake_clock.rs | 10 +++--- crates/core/src/qbft/internal_test.rs | 6 ++-- crates/core/src/qbft/mod.rs | 49 +++++++++++++++++++-------- 3 files changed, 43 insertions(+), 22 deletions(-) diff --git a/crates/core/src/qbft/fake_clock.rs b/crates/core/src/qbft/fake_clock.rs index 04e143f5..4694b889 100644 --- a/crates/core/src/qbft/fake_clock.rs +++ b/crates/core/src/qbft/fake_clock.rs @@ -1,3 +1,4 @@ +use super::{TimerStopper, TimerStopperFn}; use crossbeam::channel as mpmc; use std::{ collections::HashMap, @@ -33,10 +34,7 @@ impl FakeClock { pub fn new_timer( &self, duration: Duration, - ) -> ( - mpmc::Receiver, - Box, - ) { + ) -> (mpmc::Receiver, Box) { let (tx, rx) = mpmc::bounded::(1); let client_id = { @@ -51,10 +49,10 @@ impl FakeClock { }; let inner = Arc::clone(&self.inner); - let cancel = Box::new(move || { + let cancel = Box::new(TimerStopperFn::new(move || { let mut inner = inner.lock().unwrap(); inner.clients.remove(&client_id); - }); + })); (rx, cancel) } diff --git a/crates/core/src/qbft/internal_test.rs b/crates/core/src/qbft/internal_test.rs index 603f7495..eb09b77e 100644 --- a/crates/core/src/qbft/internal_test.rs +++ b/crates/core/src/qbft/internal_test.rs @@ -203,7 +203,7 @@ fn test_qbft(test: Test) { _ = delay_ch.recv(); _ = v_chan_tx.send(i); - cancel(); + cancel.stop(); }); } else if decide_round != 1 { s.spawn(move || { @@ -703,7 +703,9 @@ fn drop_30_percent_const() { fn noop_definition() -> Definition { Definition { is_leader: Box::new(LeaderSelectorFn::new(|_, _, _| false)), - new_timer: Box::new(TimerFactoryFn::new(|_| (mpmc::never(), Box::new(|| {})))), + new_timer: Box::new(TimerFactoryFn::new(|_| { + (mpmc::never(), Box::new(TimerStopperFn::new(|| {}))) + })), decide: Box::new(DeciderFn::new(|_, _, _, _| {})), compare: Box::new(|_, _, _, _, _, _| {}), nodes: 0, diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index 1a0e2eb2..a1387d59 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -287,13 +287,37 @@ where } } +pub trait TimerStopper: Send + Sync { + fn stop(&self); +} + +pub struct TimerStopperFn { + stop: Box, +} + +impl TimerStopperFn { + pub fn new(stop: F) -> Self + where + F: Fn() + Send + Sync + 'static, + { + Self { + stop: Box::new(stop), + } + } +} + +impl TimerStopper for TimerStopperFn { + fn stop(&self) { + (self.stop)(); + } +} + pub trait TimerFactory: Send + Sync { - fn new_timer(&self, round: i64) - -> (mpmc::Receiver, Box); + fn new_timer(&self, round: i64) -> (mpmc::Receiver, Box); } type NewTimerFn = - Box (mpmc::Receiver, Box) + Send + Sync>; + Box (mpmc::Receiver, Box) + Send + Sync>; pub struct TimerFactoryFn { new_timer: NewTimerFn, @@ -302,7 +326,7 @@ pub struct TimerFactoryFn { impl TimerFactoryFn { pub fn new(new_timer: F) -> Self where - F: Fn(i64) -> (mpmc::Receiver, Box) + F: Fn(i64) -> (mpmc::Receiver, Box) + Send + Sync + 'static, @@ -314,10 +338,7 @@ impl TimerFactoryFn { } impl TimerFactory for TimerFactoryFn { - fn new_timer( - &self, - round: i64, - ) -> (mpmc::Receiver, Box) { + fn new_timer(&self, round: i64) -> (mpmc::Receiver, Box) { (self.new_timer)(round) } } @@ -552,7 +573,7 @@ where let buffer: RefCell>>> = RefCell::new(HashMap::new()); let dedup_rules: RefCell> = RefCell::new(HashMap::new()); let mut timer_chan: mpmc::Receiver; - let mut stop_timer: Box; + let mut stop_timer: Box; // === Helpers == @@ -706,7 +727,7 @@ where UPON_JUSTIFIED_PRE_PREPARE => { change_round(msg.round(), rule); - stop_timer(); + stop_timer.stop(); (timer_chan, stop_timer) = d.new_timer.new_timer(round.get()); let compare_result = compare( @@ -734,7 +755,7 @@ where // this happens, we trigger round change. // Algorithm 3:1 change_round(round.get() + 1, UPON_ROUND_TIMEOUT); - stop_timer(); + stop_timer.stop(); (timer_chan, stop_timer) = d.new_timer.new_timer(round.get()); @@ -758,7 +779,7 @@ where // Algorithm 2:8 change_round(msg.round(), rule); q_commit = justification; - stop_timer(); + stop_timer.stop(); timer_chan = mpmc::never(); @@ -779,7 +800,7 @@ where rule, ); - stop_timer(); + stop_timer.stop(); (timer_chan, stop_timer) = d.new_timer.new_timer(round.get()); broadcast_round_change()?; @@ -809,7 +830,7 @@ where result?; change_round(round.get() + 1, UPON_ROUND_TIMEOUT); - stop_timer(); + stop_timer.stop(); (timer_chan, stop_timer) = d.new_timer.new_timer(round.get()); From 64f92fc7926577f57c01d34a6326a3385e6f082b Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Thu, 7 May 2026 19:00:17 +0200 Subject: [PATCH 11/12] Add the comparator --- crates/core/src/qbft/internal_test.rs | 14 ++--- crates/core/src/qbft/mod.rs | 73 ++++++++++++++++++++------- 2 files changed, 63 insertions(+), 24 deletions(-) diff --git a/crates/core/src/qbft/internal_test.rs b/crates/core/src/qbft/internal_test.rs index eb09b77e..fd5c17b5 100644 --- a/crates/core/src/qbft/internal_test.rs +++ b/crates/core/src/qbft/internal_test.rs @@ -69,9 +69,9 @@ fn test_qbft(test: Test) { result_chan_tx.send(q_commit.clone()).expect(WRITE_CHAN_ERR); })) }, - compare: Box::new(|_, _, _, _, return_err, _| { - return_err.send(Ok(())).expect(WRITE_CHAN_ERR); - }), + compare: Box::new(ComparatorFn::new(|request| { + request.return_err.send(Ok(())).expect(WRITE_CHAN_ERR); + })), nodes: N as i64, fifo_limit: FIFO_LIMIT as i64, logger: Box::new(QbftLoggerFn::new( @@ -707,7 +707,7 @@ fn noop_definition() -> Definition { (mpmc::never(), Box::new(TimerStopperFn::new(|| {}))) })), decide: Box::new(DeciderFn::new(|_, _, _, _| {})), - compare: Box::new(|_, _, _, _, _, _| {}), + compare: Box::new(ComparatorFn::new(|_| {})), nodes: 0, fifo_limit: 0, logger: Box::new(QbftLoggerFn::new( @@ -771,9 +771,9 @@ fn duplicate_pre_prepare_rules() { |_, _, _, _, _, _| {}, |_, _, _| {}, )); - def.compare = Box::new(|_, _, _, _, return_err, _| { - _ = return_err.send(Ok(())); - }); + def.compare = Box::new(ComparatorFn::new(|request| { + _ = request.return_err.send(Ok(())); + })); let (r_chan_tx, r_chan_rx) = mpmc::bounded::>(2); r_chan_tx.send(new_preprepare(1)).expect(WRITE_CHAN_ERR); diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index a1387d59..70fb338f 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -343,6 +343,55 @@ impl TimerFactory for TimerFactoryFn { } } +pub struct CompareRequest<'a, T> +where + T: QbftTypes, +{ + pub ct: &'a CancellationToken, + pub qcommit: &'a Msg, + pub input_value_source_ch: &'a mpmc::Receiver, + pub input_value_source: &'a T::Compare, + pub return_err: &'a mpmc::Sender>, + pub return_value: &'a mpmc::Sender, +} + +pub trait Comparator: Send + Sync +where + T: QbftTypes, +{ + fn compare(&self, request: CompareRequest<'_, T>); +} + +pub struct ComparatorFn +where + T: QbftTypes, +{ + compare: Box) + Send + Sync>, +} + +impl ComparatorFn +where + T: QbftTypes, +{ + pub fn new(compare: F) -> Self + where + F: Fn(CompareRequest<'_, T>) + Send + Sync + 'static, + { + Self { + compare: Box::new(compare), + } + } +} + +impl Comparator for ComparatorFn +where + T: QbftTypes, +{ + fn compare(&self, request: CompareRequest<'_, T>) { + (self.compare)(request); + } +} + #[derive(Debug, thiserror::Error)] pub enum QbftError { #[error("Timeout")] @@ -393,17 +442,7 @@ where /// Called when leader proposes value and we compare it with our local /// value. It's an opt-in feature that should instantly return `None` on /// `return_err` channel if it is not turned on. - pub compare: Box< - dyn Fn( - /* ct */ &CancellationToken, - /* qcommit */ &Msg, - /* input_value_source_ch */ &mpmc::Receiver, - /* input_value_source */ &T::Compare, - /* return_err */ &mpmc::Sender>, - /* return_value */ &mpmc::Sender, - ) + Send - + Sync, - >, + pub compare: Box>, /// Called when consensus has been reached on a value. pub decide: Box>, @@ -879,14 +918,14 @@ where let compare = &d.compare; s.spawn(move || { - (compare)( + compare.compare(CompareRequest { ct, - msg, + qcommit: msg, input_value_source_ch, - &input_value_source, - &compare_err_tx, - &compare_value_tx, - ); + input_value_source: &input_value_source, + return_err: &compare_err_tx, + return_value: &compare_value_tx, + }); }); loop { From 6bf39f602f821417a2890037729e0c49789a52b8 Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Thu, 7 May 2026 19:11:32 +0200 Subject: [PATCH 12/12] Add the callbacks module --- crates/core/src/qbft/callbacks.rs | 348 +++++++++++++++++++++++++++++ crates/core/src/qbft/mod.rs | 353 +----------------------------- 2 files changed, 356 insertions(+), 345 deletions(-) create mode 100644 crates/core/src/qbft/callbacks.rs diff --git a/crates/core/src/qbft/callbacks.rs b/crates/core/src/qbft/callbacks.rs new file mode 100644 index 00000000..4b23e49e --- /dev/null +++ b/crates/core/src/qbft/callbacks.rs @@ -0,0 +1,348 @@ +use super::{MessageType, Msg, QbftTypes, Result, UponRule}; +use cancellation::CancellationToken; +use crossbeam::channel as mpmc; +use std::{marker::PhantomData, time}; + +pub trait Decider: Send + Sync +where + T: QbftTypes, +{ + fn decide( + &self, + ct: &CancellationToken, + instance: &T::Instance, + value: &T::Value, + qcommit: &Vec>, + ); +} + +pub struct DeciderFn +where + T: QbftTypes, +{ + decide: Box>) + Send + Sync>, + _marker: PhantomData T>, +} + +impl DeciderFn +where + T: QbftTypes, +{ + pub fn new(decide: F) -> Self + where + F: Fn(&CancellationToken, &T::Instance, &T::Value, &Vec>) + Send + Sync + 'static, + { + Self { + decide: Box::new(decide), + _marker: PhantomData, + } + } +} + +impl Decider for DeciderFn +where + T: QbftTypes, +{ + fn decide( + &self, + ct: &CancellationToken, + instance: &T::Instance, + value: &T::Value, + qcommit: &Vec>, + ) { + (self.decide)(ct, instance, value, qcommit); + } +} + +pub trait LeaderSelector: Send + Sync +where + T: QbftTypes, +{ + fn is_leader(&self, instance: &T::Instance, round: i64, process: i64) -> bool; +} + +pub struct LeaderSelectorFn +where + T: QbftTypes, +{ + is_leader: Box bool + Send + Sync>, + _marker: PhantomData T>, +} + +impl LeaderSelectorFn +where + T: QbftTypes, +{ + pub fn new(is_leader: F) -> Self + where + F: Fn(&T::Instance, i64, i64) -> bool + Send + Sync + 'static, + { + Self { + is_leader: Box::new(is_leader), + _marker: PhantomData, + } + } +} + +impl LeaderSelector for LeaderSelectorFn +where + T: QbftTypes, +{ + fn is_leader(&self, instance: &T::Instance, round: i64, process: i64) -> bool { + (self.is_leader)(instance, round, process) + } +} + +pub trait QbftLogger: Send + Sync +where + T: QbftTypes, +{ + fn log_upon_rule( + &self, + instance: &T::Instance, + process: i64, + round: i64, + msg: &Msg, + upon_rule: UponRule, + ); + + fn log_round_change( + &self, + instance: &T::Instance, + process: i64, + round: i64, + new_round: i64, + upon_rule: UponRule, + msgs: &Vec>, + ); + + fn log_unjust(&self, instance: &T::Instance, process: i64, msg: Msg); +} + +type LogUponRuleFn = + Box::Instance, i64, i64, &Msg, UponRule) + Send + Sync>; +type LogRoundChangeFn = + Box::Instance, i64, i64, i64, UponRule, &Vec>) + Send + Sync>; +type LogUnjustFn = Box::Instance, i64, Msg) + Send + Sync>; + +pub struct QbftLoggerFn +where + T: QbftTypes, +{ + log_upon_rule: LogUponRuleFn, + log_round_change: LogRoundChangeFn, + log_unjust: LogUnjustFn, + _marker: PhantomData T>, +} + +impl QbftLoggerFn +where + T: QbftTypes, +{ + pub fn new(log_upon_rule: F1, log_round_change: F2, log_unjust: F3) -> Self + where + F1: Fn(&T::Instance, i64, i64, &Msg, UponRule) + Send + Sync + 'static, + F2: Fn(&T::Instance, i64, i64, i64, UponRule, &Vec>) + Send + Sync + 'static, + F3: Fn(&T::Instance, i64, Msg) + Send + Sync + 'static, + { + Self { + log_upon_rule: Box::new(log_upon_rule), + log_round_change: Box::new(log_round_change), + log_unjust: Box::new(log_unjust), + _marker: PhantomData, + } + } +} + +impl QbftLogger for QbftLoggerFn +where + T: QbftTypes, +{ + fn log_upon_rule( + &self, + instance: &T::Instance, + process: i64, + round: i64, + msg: &Msg, + upon_rule: UponRule, + ) { + (self.log_upon_rule)(instance, process, round, msg, upon_rule); + } + + fn log_round_change( + &self, + instance: &T::Instance, + process: i64, + round: i64, + new_round: i64, + upon_rule: UponRule, + msgs: &Vec>, + ) { + (self.log_round_change)(instance, process, round, new_round, upon_rule, msgs); + } + + fn log_unjust(&self, instance: &T::Instance, process: i64, msg: Msg) { + (self.log_unjust)(instance, process, msg); + } +} + +pub struct BroadcastRequest<'a, T> +where + T: QbftTypes, +{ + pub ct: &'a CancellationToken, + pub type_: MessageType, + pub instance: &'a T::Instance, + pub source: i64, + pub round: i64, + pub value: &'a T::Value, + pub prepared_round: i64, + pub prepared_value: &'a T::Value, + pub justification: Option<&'a Vec>>, +} + +pub trait Broadcaster: Send + Sync +where + T: QbftTypes, +{ + fn broadcast(&self, request: BroadcastRequest<'_, T>) -> Result<()>; +} + +type BroadcastFn = Box) -> Result<()> + Send + Sync>; + +pub struct BroadcasterFn +where + T: QbftTypes, +{ + broadcast: BroadcastFn, + _marker: PhantomData T>, +} + +impl BroadcasterFn +where + T: QbftTypes, +{ + pub fn new(broadcast: F) -> Self + where + F: Fn(BroadcastRequest<'_, T>) -> Result<()> + Send + Sync + 'static, + { + Self { + broadcast: Box::new(broadcast), + _marker: PhantomData, + } + } +} + +impl Broadcaster for BroadcasterFn +where + T: QbftTypes, +{ + fn broadcast(&self, request: BroadcastRequest<'_, T>) -> Result<()> { + (self.broadcast)(request) + } +} + +pub trait TimerStopper: Send + Sync { + fn stop(&self); +} + +pub struct TimerStopperFn { + stop: Box, +} + +impl TimerStopperFn { + pub fn new(stop: F) -> Self + where + F: Fn() + Send + Sync + 'static, + { + Self { + stop: Box::new(stop), + } + } +} + +impl TimerStopper for TimerStopperFn { + fn stop(&self) { + (self.stop)(); + } +} + +pub trait TimerFactory: Send + Sync { + fn new_timer(&self, round: i64) -> (mpmc::Receiver, Box); +} + +type NewTimerFn = + Box (mpmc::Receiver, Box) + Send + Sync>; + +pub struct TimerFactoryFn { + new_timer: NewTimerFn, +} + +impl TimerFactoryFn { + pub fn new(new_timer: F) -> Self + where + F: Fn(i64) -> (mpmc::Receiver, Box) + + Send + + Sync + + 'static, + { + Self { + new_timer: Box::new(new_timer), + } + } +} + +impl TimerFactory for TimerFactoryFn { + fn new_timer(&self, round: i64) -> (mpmc::Receiver, Box) { + (self.new_timer)(round) + } +} + +pub struct CompareRequest<'a, T> +where + T: QbftTypes, +{ + pub ct: &'a CancellationToken, + pub qcommit: &'a Msg, + pub input_value_source_ch: &'a mpmc::Receiver, + pub input_value_source: &'a T::Compare, + pub return_err: &'a mpmc::Sender>, + pub return_value: &'a mpmc::Sender, +} + +pub trait Comparator: Send + Sync +where + T: QbftTypes, +{ + fn compare(&self, request: CompareRequest<'_, T>); +} + +pub struct ComparatorFn +where + T: QbftTypes, +{ + compare: Box) + Send + Sync>, +} + +impl ComparatorFn +where + T: QbftTypes, +{ + pub fn new(compare: F) -> Self + where + F: Fn(CompareRequest<'_, T>) + Send + Sync + 'static, + { + Self { + compare: Box::new(compare), + } + } +} + +impl Comparator for ComparatorFn +where + T: QbftTypes, +{ + fn compare(&self, request: CompareRequest<'_, T>) { + (self.compare)(request); + } +} diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index 70fb338f..28962220 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -28,10 +28,17 @@ use std::{ collections::{HashMap, HashSet}, fmt::{self, Display}, hash::Hash, - marker::PhantomData, sync, thread, time, }; +mod callbacks; + +pub use callbacks::{ + BroadcastRequest, Broadcaster, BroadcasterFn, Comparator, ComparatorFn, CompareRequest, + Decider, DeciderFn, LeaderSelector, LeaderSelectorFn, QbftLogger, QbftLoggerFn, TimerFactory, + TimerFactoryFn, TimerStopper, TimerStopperFn, +}; + type Result = std::result::Result; pub trait QbftTypes { @@ -48,350 +55,6 @@ impl QbftTypes for I64Qbft { type Value = i64; } -pub trait Decider: Send + Sync -where - T: QbftTypes, -{ - fn decide( - &self, - ct: &CancellationToken, - instance: &T::Instance, - value: &T::Value, - qcommit: &Vec>, - ); -} - -pub struct DeciderFn -where - T: QbftTypes, -{ - decide: Box>) + Send + Sync>, - _marker: PhantomData T>, -} - -impl DeciderFn -where - T: QbftTypes, -{ - pub fn new(decide: F) -> Self - where - F: Fn(&CancellationToken, &T::Instance, &T::Value, &Vec>) + Send + Sync + 'static, - { - Self { - decide: Box::new(decide), - _marker: PhantomData, - } - } -} - -impl Decider for DeciderFn -where - T: QbftTypes, -{ - fn decide( - &self, - ct: &CancellationToken, - instance: &T::Instance, - value: &T::Value, - qcommit: &Vec>, - ) { - (self.decide)(ct, instance, value, qcommit); - } -} - -pub trait LeaderSelector: Send + Sync -where - T: QbftTypes, -{ - fn is_leader(&self, instance: &T::Instance, round: i64, process: i64) -> bool; -} - -pub struct LeaderSelectorFn -where - T: QbftTypes, -{ - is_leader: Box bool + Send + Sync>, - _marker: PhantomData T>, -} - -impl LeaderSelectorFn -where - T: QbftTypes, -{ - pub fn new(is_leader: F) -> Self - where - F: Fn(&T::Instance, i64, i64) -> bool + Send + Sync + 'static, - { - Self { - is_leader: Box::new(is_leader), - _marker: PhantomData, - } - } -} - -impl LeaderSelector for LeaderSelectorFn -where - T: QbftTypes, -{ - fn is_leader(&self, instance: &T::Instance, round: i64, process: i64) -> bool { - (self.is_leader)(instance, round, process) - } -} - -pub trait QbftLogger: Send + Sync -where - T: QbftTypes, -{ - fn log_upon_rule( - &self, - instance: &T::Instance, - process: i64, - round: i64, - msg: &Msg, - upon_rule: UponRule, - ); - - fn log_round_change( - &self, - instance: &T::Instance, - process: i64, - round: i64, - new_round: i64, - upon_rule: UponRule, - msgs: &Vec>, - ); - - fn log_unjust(&self, instance: &T::Instance, process: i64, msg: Msg); -} - -type LogUponRuleFn = - Box::Instance, i64, i64, &Msg, UponRule) + Send + Sync>; -type LogRoundChangeFn = - Box::Instance, i64, i64, i64, UponRule, &Vec>) + Send + Sync>; -type LogUnjustFn = Box::Instance, i64, Msg) + Send + Sync>; - -pub struct QbftLoggerFn -where - T: QbftTypes, -{ - log_upon_rule: LogUponRuleFn, - log_round_change: LogRoundChangeFn, - log_unjust: LogUnjustFn, - _marker: PhantomData T>, -} - -impl QbftLoggerFn -where - T: QbftTypes, -{ - pub fn new(log_upon_rule: F1, log_round_change: F2, log_unjust: F3) -> Self - where - F1: Fn(&T::Instance, i64, i64, &Msg, UponRule) + Send + Sync + 'static, - F2: Fn(&T::Instance, i64, i64, i64, UponRule, &Vec>) + Send + Sync + 'static, - F3: Fn(&T::Instance, i64, Msg) + Send + Sync + 'static, - { - Self { - log_upon_rule: Box::new(log_upon_rule), - log_round_change: Box::new(log_round_change), - log_unjust: Box::new(log_unjust), - _marker: PhantomData, - } - } -} - -impl QbftLogger for QbftLoggerFn -where - T: QbftTypes, -{ - fn log_upon_rule( - &self, - instance: &T::Instance, - process: i64, - round: i64, - msg: &Msg, - upon_rule: UponRule, - ) { - (self.log_upon_rule)(instance, process, round, msg, upon_rule); - } - - fn log_round_change( - &self, - instance: &T::Instance, - process: i64, - round: i64, - new_round: i64, - upon_rule: UponRule, - msgs: &Vec>, - ) { - (self.log_round_change)(instance, process, round, new_round, upon_rule, msgs); - } - - fn log_unjust(&self, instance: &T::Instance, process: i64, msg: Msg) { - (self.log_unjust)(instance, process, msg); - } -} - -pub struct BroadcastRequest<'a, T> -where - T: QbftTypes, -{ - pub ct: &'a CancellationToken, - pub type_: MessageType, - pub instance: &'a T::Instance, - pub source: i64, - pub round: i64, - pub value: &'a T::Value, - pub prepared_round: i64, - pub prepared_value: &'a T::Value, - pub justification: Option<&'a Vec>>, -} - -pub trait Broadcaster: Send + Sync -where - T: QbftTypes, -{ - fn broadcast(&self, request: BroadcastRequest<'_, T>) -> Result<()>; -} - -type BroadcastFn = Box) -> Result<()> + Send + Sync>; - -pub struct BroadcasterFn -where - T: QbftTypes, -{ - broadcast: BroadcastFn, - _marker: PhantomData T>, -} - -impl BroadcasterFn -where - T: QbftTypes, -{ - pub fn new(broadcast: F) -> Self - where - F: Fn(BroadcastRequest<'_, T>) -> Result<()> + Send + Sync + 'static, - { - Self { - broadcast: Box::new(broadcast), - _marker: PhantomData, - } - } -} - -impl Broadcaster for BroadcasterFn -where - T: QbftTypes, -{ - fn broadcast(&self, request: BroadcastRequest<'_, T>) -> Result<()> { - (self.broadcast)(request) - } -} - -pub trait TimerStopper: Send + Sync { - fn stop(&self); -} - -pub struct TimerStopperFn { - stop: Box, -} - -impl TimerStopperFn { - pub fn new(stop: F) -> Self - where - F: Fn() + Send + Sync + 'static, - { - Self { - stop: Box::new(stop), - } - } -} - -impl TimerStopper for TimerStopperFn { - fn stop(&self) { - (self.stop)(); - } -} - -pub trait TimerFactory: Send + Sync { - fn new_timer(&self, round: i64) -> (mpmc::Receiver, Box); -} - -type NewTimerFn = - Box (mpmc::Receiver, Box) + Send + Sync>; - -pub struct TimerFactoryFn { - new_timer: NewTimerFn, -} - -impl TimerFactoryFn { - pub fn new(new_timer: F) -> Self - where - F: Fn(i64) -> (mpmc::Receiver, Box) - + Send - + Sync - + 'static, - { - Self { - new_timer: Box::new(new_timer), - } - } -} - -impl TimerFactory for TimerFactoryFn { - fn new_timer(&self, round: i64) -> (mpmc::Receiver, Box) { - (self.new_timer)(round) - } -} - -pub struct CompareRequest<'a, T> -where - T: QbftTypes, -{ - pub ct: &'a CancellationToken, - pub qcommit: &'a Msg, - pub input_value_source_ch: &'a mpmc::Receiver, - pub input_value_source: &'a T::Compare, - pub return_err: &'a mpmc::Sender>, - pub return_value: &'a mpmc::Sender, -} - -pub trait Comparator: Send + Sync -where - T: QbftTypes, -{ - fn compare(&self, request: CompareRequest<'_, T>); -} - -pub struct ComparatorFn -where - T: QbftTypes, -{ - compare: Box) + Send + Sync>, -} - -impl ComparatorFn -where - T: QbftTypes, -{ - pub fn new(compare: F) -> Self - where - F: Fn(CompareRequest<'_, T>) + Send + Sync + 'static, - { - Self { - compare: Box::new(compare), - } - } -} - -impl Comparator for ComparatorFn -where - T: QbftTypes, -{ - fn compare(&self, request: CompareRequest<'_, T>) { - (self.compare)(request); - } -} - #[derive(Debug, thiserror::Error)] pub enum QbftError { #[error("Timeout")]