diff --git a/crates/core/src/qbft/callbacks.rs b/crates/core/src/qbft/callbacks.rs new file mode 100644 index 00000000..4b23e49e --- /dev/null +++ b/crates/core/src/qbft/callbacks.rs @@ -0,0 +1,348 @@ +use super::{MessageType, Msg, QbftTypes, Result, UponRule}; +use cancellation::CancellationToken; +use crossbeam::channel as mpmc; +use std::{marker::PhantomData, time}; + +pub trait Decider: Send + Sync +where + T: QbftTypes, +{ + fn decide( + &self, + ct: &CancellationToken, + instance: &T::Instance, + value: &T::Value, + qcommit: &Vec>, + ); +} + +pub struct DeciderFn +where + T: QbftTypes, +{ + decide: Box>) + Send + Sync>, + _marker: PhantomData T>, +} + +impl DeciderFn +where + T: QbftTypes, +{ + pub fn new(decide: F) -> Self + where + F: Fn(&CancellationToken, &T::Instance, &T::Value, &Vec>) + Send + Sync + 'static, + { + Self { + decide: Box::new(decide), + _marker: PhantomData, + } + } +} + +impl Decider for DeciderFn +where + T: QbftTypes, +{ + fn decide( + &self, + ct: &CancellationToken, + instance: &T::Instance, + value: &T::Value, + qcommit: &Vec>, + ) { + (self.decide)(ct, instance, value, qcommit); + } +} + +pub trait LeaderSelector: Send + Sync +where + T: QbftTypes, +{ + fn is_leader(&self, instance: &T::Instance, round: i64, process: i64) -> bool; +} + +pub struct LeaderSelectorFn +where + T: QbftTypes, +{ + is_leader: Box bool + Send + Sync>, + _marker: PhantomData T>, +} + +impl LeaderSelectorFn +where + T: QbftTypes, +{ + pub fn new(is_leader: F) -> Self + where + F: Fn(&T::Instance, i64, i64) -> bool + Send + Sync + 'static, + { + Self { + is_leader: Box::new(is_leader), + _marker: PhantomData, + } + } +} + +impl LeaderSelector for LeaderSelectorFn +where + T: QbftTypes, +{ + fn is_leader(&self, instance: &T::Instance, round: i64, process: i64) -> bool { + (self.is_leader)(instance, round, process) + } +} + +pub trait QbftLogger: Send + Sync +where + T: QbftTypes, +{ + fn log_upon_rule( + &self, + instance: &T::Instance, + process: i64, + round: i64, + msg: &Msg, + upon_rule: UponRule, + ); + + fn log_round_change( + &self, + instance: &T::Instance, + process: i64, + round: i64, + new_round: i64, + upon_rule: UponRule, + msgs: &Vec>, + ); + + fn log_unjust(&self, instance: &T::Instance, process: i64, msg: Msg); +} + +type LogUponRuleFn = + Box::Instance, i64, i64, &Msg, UponRule) + Send + Sync>; +type LogRoundChangeFn = + Box::Instance, i64, i64, i64, UponRule, &Vec>) + Send + Sync>; +type LogUnjustFn = Box::Instance, i64, Msg) + Send + Sync>; + +pub struct QbftLoggerFn +where + T: QbftTypes, +{ + log_upon_rule: LogUponRuleFn, + log_round_change: LogRoundChangeFn, + log_unjust: LogUnjustFn, + _marker: PhantomData T>, +} + +impl QbftLoggerFn +where + T: QbftTypes, +{ + pub fn new(log_upon_rule: F1, log_round_change: F2, log_unjust: F3) -> Self + where + F1: Fn(&T::Instance, i64, i64, &Msg, UponRule) + Send + Sync + 'static, + F2: Fn(&T::Instance, i64, i64, i64, UponRule, &Vec>) + Send + Sync + 'static, + F3: Fn(&T::Instance, i64, Msg) + Send + Sync + 'static, + { + Self { + log_upon_rule: Box::new(log_upon_rule), + log_round_change: Box::new(log_round_change), + log_unjust: Box::new(log_unjust), + _marker: PhantomData, + } + } +} + +impl QbftLogger for QbftLoggerFn +where + T: QbftTypes, +{ + fn log_upon_rule( + &self, + instance: &T::Instance, + process: i64, + round: i64, + msg: &Msg, + upon_rule: UponRule, + ) { + (self.log_upon_rule)(instance, process, round, msg, upon_rule); + } + + fn log_round_change( + &self, + instance: &T::Instance, + process: i64, + round: i64, + new_round: i64, + upon_rule: UponRule, + msgs: &Vec>, + ) { + (self.log_round_change)(instance, process, round, new_round, upon_rule, msgs); + } + + fn log_unjust(&self, instance: &T::Instance, process: i64, msg: Msg) { + (self.log_unjust)(instance, process, msg); + } +} + +pub struct BroadcastRequest<'a, T> +where + T: QbftTypes, +{ + pub ct: &'a CancellationToken, + pub type_: MessageType, + pub instance: &'a T::Instance, + pub source: i64, + pub round: i64, + pub value: &'a T::Value, + pub prepared_round: i64, + pub prepared_value: &'a T::Value, + pub justification: Option<&'a Vec>>, +} + +pub trait Broadcaster: Send + Sync +where + T: QbftTypes, +{ + fn broadcast(&self, request: BroadcastRequest<'_, T>) -> Result<()>; +} + +type BroadcastFn = Box) -> Result<()> + Send + Sync>; + +pub struct BroadcasterFn +where + T: QbftTypes, +{ + broadcast: BroadcastFn, + _marker: PhantomData T>, +} + +impl BroadcasterFn +where + T: QbftTypes, +{ + pub fn new(broadcast: F) -> Self + where + F: Fn(BroadcastRequest<'_, T>) -> Result<()> + Send + Sync + 'static, + { + Self { + broadcast: Box::new(broadcast), + _marker: PhantomData, + } + } +} + +impl Broadcaster for BroadcasterFn +where + T: QbftTypes, +{ + fn broadcast(&self, request: BroadcastRequest<'_, T>) -> Result<()> { + (self.broadcast)(request) + } +} + +pub trait TimerStopper: Send + Sync { + fn stop(&self); +} + +pub struct TimerStopperFn { + stop: Box, +} + +impl TimerStopperFn { + pub fn new(stop: F) -> Self + where + F: Fn() + Send + Sync + 'static, + { + Self { + stop: Box::new(stop), + } + } +} + +impl TimerStopper for TimerStopperFn { + fn stop(&self) { + (self.stop)(); + } +} + +pub trait TimerFactory: Send + Sync { + fn new_timer(&self, round: i64) -> (mpmc::Receiver, Box); +} + +type NewTimerFn = + Box (mpmc::Receiver, Box) + Send + Sync>; + +pub struct TimerFactoryFn { + new_timer: NewTimerFn, +} + +impl TimerFactoryFn { + pub fn new(new_timer: F) -> Self + where + F: Fn(i64) -> (mpmc::Receiver, Box) + + Send + + Sync + + 'static, + { + Self { + new_timer: Box::new(new_timer), + } + } +} + +impl TimerFactory for TimerFactoryFn { + fn new_timer(&self, round: i64) -> (mpmc::Receiver, Box) { + (self.new_timer)(round) + } +} + +pub struct CompareRequest<'a, T> +where + T: QbftTypes, +{ + pub ct: &'a CancellationToken, + pub qcommit: &'a Msg, + pub input_value_source_ch: &'a mpmc::Receiver, + pub input_value_source: &'a T::Compare, + pub return_err: &'a mpmc::Sender>, + pub return_value: &'a mpmc::Sender, +} + +pub trait Comparator: Send + Sync +where + T: QbftTypes, +{ + fn compare(&self, request: CompareRequest<'_, T>); +} + +pub struct ComparatorFn +where + T: QbftTypes, +{ + compare: Box) + Send + Sync>, +} + +impl ComparatorFn +where + T: QbftTypes, +{ + pub fn new(compare: F) -> Self + where + F: Fn(CompareRequest<'_, T>) + Send + Sync + 'static, + { + Self { + compare: Box::new(compare), + } + } +} + +impl Comparator for ComparatorFn +where + T: QbftTypes, +{ + fn compare(&self, request: CompareRequest<'_, T>) { + (self.compare)(request); + } +} diff --git a/crates/core/src/qbft/fake_clock.rs b/crates/core/src/qbft/fake_clock.rs index 04e143f5..4694b889 100644 --- a/crates/core/src/qbft/fake_clock.rs +++ b/crates/core/src/qbft/fake_clock.rs @@ -1,3 +1,4 @@ +use super::{TimerStopper, TimerStopperFn}; use crossbeam::channel as mpmc; use std::{ collections::HashMap, @@ -33,10 +34,7 @@ impl FakeClock { pub fn new_timer( &self, duration: Duration, - ) -> ( - mpmc::Receiver, - Box, - ) { + ) -> (mpmc::Receiver, Box) { let (tx, rx) = mpmc::bounded::(1); let client_id = { @@ -51,10 +49,10 @@ impl FakeClock { }; let inner = Arc::clone(&self.inner); - let cancel = Box::new(move || { + let cancel = Box::new(TimerStopperFn::new(move || { let mut inner = inner.lock().unwrap(); inner.clients.remove(&client_id); - }); + })); (rx, cancel) } diff --git a/crates/core/src/qbft/internal_test.rs b/crates/core/src/qbft/internal_test.rs index b7770251..fd5c17b5 100644 --- a/crates/core/src/qbft/internal_test.rs +++ b/crates/core/src/qbft/internal_test.rs @@ -39,25 +39,20 @@ fn test_qbft(test: Test) { let clock = FakeClock::new(start_time); let cts = CancellationTokenSource::new(); - let mut receives = HashMap::< - i64, - ( - mpmc::Sender>, - mpmc::Receiver>, - ), - >::new(); - let (broadcast_tx, broadcast_rx) = mpmc::unbounded::>(); - let (result_chan_tx, result_chan_rx) = mpmc::bounded::>>(N); + let mut receives = + HashMap::>, mpmc::Receiver>)>::new(); + let (broadcast_tx, broadcast_rx) = mpmc::unbounded::>(); + let (result_chan_tx, result_chan_rx) = mpmc::bounded::>>(N); let (run_chan_tx, run_chan_rx) = mpmc::bounded::>(N); - let is_leader = Box::new(make_is_leader(N as i64)); + let is_leader = make_is_leader(N as i64); let defs = Arc::new(Definition { - is_leader: is_leader.clone(), - new_timer: { + is_leader: Box::new(LeaderSelectorFn::new(make_is_leader(N as i64))), + new_timer: Box::new(TimerFactoryFn::new({ let clock = clock.clone(); - Box::new(move |round| { + move |round| { let d: Duration = if test.const_period { Duration::from_secs(1) } else { @@ -66,56 +61,59 @@ fn test_qbft(test: Test) { }; clock.new_timer(d) - }) - }, + } + })), decide: { let result_chan_tx = result_chan_tx.clone(); - Box::new(move |_, _, _, q_commit| { + Box::new(DeciderFn::new(move |_, _, _, q_commit| { result_chan_tx.send(q_commit.clone()).expect(WRITE_CHAN_ERR); - }) + })) }, - compare: Box::new(|_, _, _, _, return_err, _| { - return_err.send(Ok(())).expect(WRITE_CHAN_ERR); - }), + compare: Box::new(ComparatorFn::new(|request| { + request.return_err.send(Ok(())).expect(WRITE_CHAN_ERR); + })), nodes: N as i64, fifo_limit: FIFO_LIMIT as i64, - log_round_change: { - let clock = clock.clone(); - - Box::new(move |_, process, round, new_round, upon_rule, _| { - println!( - "{:?} - {}@{} change to {} ~= {}", - clock.elapsed(), - process, - round, - new_round, - upon_rule, - ); - }) - }, - log_unjust: Box::new(|_, _, msg| { - println!("Unjust: {:?}", msg); - }), - log_upon_rule: { - let clock = clock.clone(); - Box::new(move |_, process, round, msg, upon_rule| { - println!( - "{:?} {} => {}@{} -> {}@{} ~= {}", - clock.elapsed(), - msg.source(), - msg.type_(), - msg.round(), - process, - round, - upon_rule, - ); - }) - }, + logger: Box::new(QbftLoggerFn::new( + { + let clock = clock.clone(); + + move |_, process, round, msg, upon_rule| { + println!( + "{:?} {} => {}@{} -> {}@{} ~= {}", + clock.elapsed(), + msg.source(), + msg.type_(), + msg.round(), + process, + round, + upon_rule, + ); + } + }, + { + let clock = clock.clone(); + + move |_, process, round, new_round, upon_rule, _| { + println!( + "{:?} - {}@{} change to {} ~= {}", + clock.elapsed(), + process, + round, + new_round, + upon_rule, + ); + } + }, + |_, _, msg| { + println!("Unjust: {:?}", msg); + }, + )), }); thread::scope(|s| { for i in 1..=N as i64 { - let (sender, receiver) = mpmc::bounded::>(1000); + let (sender, receiver) = mpmc::bounded::>(1000); let broadcast_tx = broadcast_tx.clone(); receives.insert(i, (sender.clone(), receiver.clone())); @@ -123,47 +121,52 @@ fn test_qbft(test: Test) { broadcast: { let clock = clock.clone(); - Box::new( - move |_, type_, instance, source, round, value, pr, pv, justification| { - if round > MAX_ROUND as i64 { - return Err(QbftError::MaxRoundReached); - } - - if type_ == MSG_COMMIT && round <= test.commits_after.into() { - println!( - "{:?} {} dropping commit for round {}", - clock.elapsed(), - source, - round - ); - return Ok(()); - } + Box::new(BroadcasterFn::new(move |request| { + if request.round > MAX_ROUND as i64 { + return Err(QbftError::MaxRoundReached); + } - println!("{:?} {} => {}@{}", clock.elapsed(), source, type_, round); - - let msg = new_msg( - type_, - *instance, - source, - round, - *value, - *value, - pr, - *pv, - justification, + if request.type_ == MSG_COMMIT && request.round <= test.commits_after.into() + { + println!( + "{:?} {} dropping commit for round {}", + clock.elapsed(), + request.source, + request.round ); - sender.send(msg.clone()).expect(WRITE_CHAN_ERR); - - bcast( - broadcast_tx.clone(), - msg.clone(), - test.bcast_jitter_ms, - clock.clone(), - ); // TODO: Add clock - - Ok(()) - }, - ) + return Ok(()); + } + + println!( + "{:?} {} => {}@{}", + clock.elapsed(), + request.source, + request.type_, + request.round + ); + + let msg = new_msg( + request.type_, + *request.instance, + request.source, + request.round, + *request.value, + *request.value, + request.prepared_round, + *request.prepared_value, + request.justification, + ); + sender.send(msg.clone()).expect(WRITE_CHAN_ERR); + + bcast( + broadcast_tx.clone(), + msg.clone(), + test.bcast_jitter_ms, + clock.clone(), + ); // TODO: Add clock + + Ok(()) + })) }, receive: receiver.clone(), }; @@ -200,7 +203,7 @@ fn test_qbft(test: Test) { _ = delay_ch.recv(); _ = v_chan_tx.send(i); - cancel(); + cancel.stop(); }); } else if decide_round != 1 { s.spawn(move || { @@ -226,7 +229,7 @@ fn test_qbft(test: Test) { }); } - let mut results = HashMap::>::new(); + let mut results = HashMap::>::new(); let mut count = 0; let mut decided = false; let mut done = 0; @@ -331,8 +334,8 @@ fn new_msg( value_source: i64, pr: i64, pv: i64, - justify: Option<&Vec>>, -) -> Msg { + justify: Option<&Vec>>, +) -> Msg { let msgs = match justify { None => vec![], Some(justify) => justify @@ -365,8 +368,8 @@ fn new_msg( // Delays the message broadcast by between 1x and 2x jitter_ms and drops // messages. fn bcast( - broadcast: mpmc::Sender>, - msg: Msg, + broadcast: mpmc::Sender>, + msg: Msg, jitter_ms: i32, clock: FakeClock, ) { @@ -406,7 +409,7 @@ struct TestMsg { justify: Option>, } -impl SomeMsg for TestMsg { +impl SomeMsg for TestMsg { fn type_(&self) -> MessageType { self.msg_type } @@ -439,12 +442,12 @@ impl SomeMsg for TestMsg { self.pv } - fn justification(&self) -> Vec> { + fn justification(&self) -> Vec> { match self.justify { None => vec![], Some(ref j) => j .iter() - .map(|j| Arc::new(j.clone()) as Msg) + .map(|j| Arc::new(j.clone()) as Msg) .collect(), } } @@ -697,23 +700,27 @@ fn drop_30_percent_const() { }); } -fn noop_definition() -> Definition { +fn noop_definition() -> Definition { Definition { - is_leader: Box::new(|_, _, _| false), - new_timer: Box::new(|_| (mpmc::never(), Box::new(|| {}))), - decide: Box::new(|_, _, _, _| {}), - compare: Box::new(|_, _, _, _, _, _| {}), + is_leader: Box::new(LeaderSelectorFn::new(|_, _, _| false)), + new_timer: Box::new(TimerFactoryFn::new(|_| { + (mpmc::never(), Box::new(TimerStopperFn::new(|| {}))) + })), + decide: Box::new(DeciderFn::new(|_, _, _, _| {})), + compare: Box::new(ComparatorFn::new(|_| {})), nodes: 0, fifo_limit: 0, - log_round_change: Box::new(|_, _, _, _, _, _| {}), - log_unjust: Box::new(|_, _, _| {}), - log_upon_rule: Box::new(|_, _, _, _, _| {}), + logger: Box::new(QbftLoggerFn::new( + |_, _, _, _, _| {}, + |_, _, _, _, _, _| {}, + |_, _, _| {}, + )), } } -fn noop_transport() -> Transport { +fn noop_transport() -> Transport { Transport { - broadcast: Box::new(|_, _, _, _, _, _, _, _, _| Ok(())), + broadcast: Box::new(BroadcasterFn::new(|_| Ok(()))), receive: mpmc::never(), } } @@ -727,7 +734,7 @@ fn duplicate_pre_prepare_rules() { const NO_LEADER: i64 = 1; const LEADER: i64 = 2; - let new_preprepare = |round: i64| -> Msg { + let new_preprepare = |round: i64| -> Msg { new_msg( MSG_PRE_PREPARE, 0, @@ -743,28 +750,32 @@ fn duplicate_pre_prepare_rules() { }; let mut def = noop_definition(); - def.is_leader = Box::new(|_, _, process| process == LEADER); - def.log_upon_rule = Box::new(move |_, _, round, msg, upon_rule| { - println!("UponRule: rule={} round={} ", upon_rule, msg.round()); - - assert!(upon_rule == UPON_JUSTIFIED_PRE_PREPARE); + def.is_leader = Box::new(LeaderSelectorFn::new(|_, _, process| process == LEADER)); + def.logger = Box::new(QbftLoggerFn::new( + move |_, _, round, msg, upon_rule| { + println!("UponRule: rule={} round={} ", upon_rule, msg.round()); - if msg.round() == 1 { - return; - } + assert!(upon_rule == UPON_JUSTIFIED_PRE_PREPARE); - if msg.round() == 2 { - cts.cancel(); - return; - } + if msg.round() == 1 { + return; + } - panic!("unexpected round {}", round); - }); - def.compare = Box::new(|_, _, _, _, return_err, _| { - _ = return_err.send(Ok(())); - }); + if msg.round() == 2 { + cts.cancel(); + return; + } - let (r_chan_tx, r_chan_rx) = mpmc::bounded::>(2); + panic!("unexpected round {}", round); + }, + |_, _, _, _, _, _| {}, + |_, _, _| {}, + )); + def.compare = Box::new(ComparatorFn::new(|request| { + _ = request.return_err.send(Ok(())); + })); + + let (r_chan_tx, r_chan_rx) = mpmc::bounded::>(2); r_chan_tx.send(new_preprepare(1)).expect(WRITE_CHAN_ERR); r_chan_tx.send(new_preprepare(2)).expect(WRITE_CHAN_ERR); diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index 41d0a2d5..28962220 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -31,8 +31,30 @@ use std::{ sync, thread, time, }; +mod callbacks; + +pub use callbacks::{ + BroadcastRequest, Broadcaster, BroadcasterFn, Comparator, ComparatorFn, CompareRequest, + Decider, DeciderFn, LeaderSelector, LeaderSelectorFn, QbftLogger, QbftLoggerFn, TimerFactory, + TimerFactoryFn, TimerStopper, TimerStopperFn, +}; + type Result = std::result::Result; +pub trait QbftTypes { + type Instance; + type Value: PartialEq; + type Compare; +} + +pub struct I64Qbft; + +impl QbftTypes for I64Qbft { + type Compare = i64; + type Instance = i64; + type Value = i64; +} + #[derive(Debug, thiserror::Error)] pub enum QbftError { #[error("Timeout")] @@ -52,107 +74,44 @@ pub enum QbftError { } /// Abstracts the transport layer between processes in the consensus system. -pub struct Transport +pub struct Transport where - V: PartialEq, + T: QbftTypes, { /// Broadcast sends a message with the provided fields to all other /// processes in the system (including this process). /// /// Note that an error exits the algorithm. - pub broadcast: Box< - dyn Fn( - /* ct */ &CancellationToken, - /* type_ */ MessageType, - /* instance */ &I, - /* source */ i64, - /* round */ i64, - /* value */ &V, - /* pr */ i64, - /* pv */ &V, - /* justification */ Option<&Vec>>, - ) -> Result<()> - + Send - + Sync, - >, + pub broadcast: Box>, /// Receive returns a stream of messages received /// from other processes in the system (including this process). - pub receive: mpmc::Receiver>, + pub receive: mpmc::Receiver>, } /// Defines the consensus system parameters that are external to the qbft /// algorithm. This remains constant across multiple instances of consensus /// (calls to `run`). -pub struct Definition +pub struct Definition where - V: PartialEq, + T: QbftTypes, { /// A deterministic leader election function. - pub is_leader: - Box bool + Send + Sync>, + pub is_leader: Box>, /// Returns a new timer channel and stop function for the round - pub new_timer: Box< - dyn Fn(/* round */ i64) -> (mpmc::Receiver, Box) - + Send - + Sync, - >, + pub new_timer: Box, /// Called when leader proposes value and we compare it with our local /// value. It's an opt-in feature that should instantly return `None` on /// `return_err` channel if it is not turned on. - pub compare: Box< - dyn Fn( - /* ct */ &CancellationToken, - /* qcommit */ &Msg, - /* input_value_source_ch */ &mpmc::Receiver, - /* input_value_source */ &C, - /* return_err */ &mpmc::Sender>, - /* return_value */ &mpmc::Sender, - ) + Send - + Sync, - >, + pub compare: Box>, /// Called when consensus has been reached on a value. - pub decide: Box< - dyn Fn( - /* ct */ &CancellationToken, - /* instance */ &I, - /* value */ &V, - /* qcommit */ &Vec>, - ) + Send - + Sync, - >, - - /// Allows debug logging of triggered upon rules on message receipt. - /// It includes the rule that triggered it and all received round messages. - pub log_upon_rule: Box< - dyn Fn( - /* instance */ &I, - /* process */ i64, - /* round */ i64, - /* msg */ &Msg, - /* upon_rule */ UponRule, - ) + Send - + Sync, - >, - /// Allows debug logging of round changes. - pub log_round_change: Box< - dyn Fn( - /* instance */ &I, - /* process */ i64, - /* round */ i64, - /* new_round */ i64, - /* upon_rule */ UponRule, - /* msgs */ &Vec>, - ) + Send - + Sync, - >, - - /// Allows debug logging of unjust messages. - pub log_unjust: - Box) + Send + Sync>, + pub decide: Box>, + + /// Allows debug logging of QBFT events. + pub logger: Box>, /// Total number of nodes/processes participating in consensus. pub nodes: i64, @@ -161,9 +120,9 @@ where pub fifo_limit: i64, } -impl Definition +impl Definition where - V: PartialEq, + T: QbftTypes, { /// Quorum count for the system. /// See IBFT 2.0 paper for correct formula: @@ -215,35 +174,35 @@ impl Display for MessageType { } /// Defines the inter process messages. -pub trait SomeMsg: Send + Sync + fmt::Debug +pub trait SomeMsg: Send + Sync + fmt::Debug where - V: PartialEq, + T: QbftTypes, { /// Type of the message. fn type_(&self) -> MessageType; /// Consensus instance. - fn instance(&self) -> I; + fn instance(&self) -> T::Instance; /// Process that sent the message. fn source(&self) -> i64; /// The round the message pertains to. fn round(&self) -> i64; /// The value being proposed, usually a hash. - fn value(&self) -> V; + fn value(&self) -> T::Value; /// Usually the value that was hashed and is returned in `value`. - fn value_source(&self) -> Result; + fn value_source(&self) -> Result; /// The justified prepared round. fn prepared_round(&self) -> i64; /// The justified prepared value. - fn prepared_value(&self) -> V; + fn prepared_value(&self) -> T::Value; /// Set of messages that explicitly justifies this message. - fn justification(&self) -> Vec>; + fn justification(&self) -> Vec>; /// Cast as `Any` to allow downcasting. fn as_any(&self) -> &dyn any::Any; } /// Alias for any `Msg` implementation tracked by reference counting. -pub type Msg = sync::Arc>; +pub type Msg = sync::Arc>; /// Defines the event based rules that are triggered when messages are received. #[derive(PartialEq, Eq, Hash, Clone, Copy)] @@ -289,70 +248,71 @@ struct DedupKey { /// The generic type `V` is the arbitrary data value being proposed; it only /// requires an Equal method. The generic type `C` is the compare value, used to /// compare leader's proposed value with local value and can be anything. -pub fn run( +pub fn run( ct: &CancellationToken, - d: &Definition, - t: &Transport, - instance: &I, + d: &Definition, + t: &Transport, + instance: &T::Instance, process: i64, - mut input_value_ch: mpmc::Receiver, - input_value_source_ch: mpmc::Receiver, + mut input_value_ch: mpmc::Receiver, + input_value_source_ch: mpmc::Receiver, ) -> Result<()> where - V: PartialEq + Eq + Hash + Default, - C: Clone + Send + Sync + Default, + T: QbftTypes, + T::Value: PartialEq + Eq + Hash + Default, + T::Compare: Clone + Send + Sync + Default, { // === State === let round: Cell = Cell::new(1); - let input_value: RefCell = RefCell::new(Default::default()); - let mut input_value_source: C = Default::default(); - let ppj_cache: RefCell>>> = RefCell::new(None); // Cached pre-prepare justification for the current round (`None` value is unset). + let input_value: RefCell = RefCell::new(Default::default()); + let mut input_value_source: T::Compare = Default::default(); + let ppj_cache: RefCell>>> = RefCell::new(None); // Cached pre-prepare justification for the current round (`None` value is unset). let prepared_round: Cell = Cell::new(0); - let prepared_value: RefCell = RefCell::new(Default::default()); + let prepared_value: RefCell = RefCell::new(Default::default()); let mut compare_failure_round: i64 = 0; - let prepared_justification: RefCell>>> = RefCell::new(None); - let mut q_commit: Option>> = None; - let buffer: RefCell>>> = RefCell::new(HashMap::new()); + let prepared_justification: RefCell>>> = RefCell::new(None); + let mut q_commit: Option>> = None; + let buffer: RefCell>>> = RefCell::new(HashMap::new()); let dedup_rules: RefCell> = RefCell::new(HashMap::new()); let mut timer_chan: mpmc::Receiver; - let mut stop_timer: Box; + let mut stop_timer: Box; // === Helpers == // Broadcasts a non-ROUND-CHANGE message for current round. let broadcast_msg = - |type_: MessageType, value: &V, justification: Option<&Vec>>| { - (t.broadcast)( + |type_: MessageType, value: &T::Value, justification: Option<&Vec>>| { + t.broadcast.broadcast(BroadcastRequest { ct, type_, instance, - process, - round.get(), + source: process, + round: round.get(), value, - 0, - &Default::default(), + prepared_round: 0, + prepared_value: &Default::default(), justification, - ) + }) }; // Broadcasts a ROUND-CHANGE message with current state. let broadcast_round_change = || { - (t.broadcast)( + t.broadcast.broadcast(BroadcastRequest { ct, - MSG_ROUND_CHANGE, + type_: MSG_ROUND_CHANGE, instance, - process, - round.get(), - &Default::default(), - prepared_round.get(), - &prepared_value.borrow(), - prepared_justification.borrow().as_ref(), - ) + source: process, + round: round.get(), + value: &Default::default(), + prepared_round: prepared_round.get(), + prepared_value: &prepared_value.borrow(), + justification: prepared_justification.borrow().as_ref(), + }) }; // Broadcasts a PRE-PREPARE message with current state // and our own input value if present, otherwise it caches the justification // to be used when the input value becomes available. - let broadcast_own_pre_prepare = |justification: Vec>| { + let broadcast_own_pre_prepare = |justification: Vec>| { if ppj_cache.borrow().is_some() { panic!("bug: justification cache must be none") } @@ -367,7 +327,7 @@ where }; // Adds a message to each process' FIFO queue - let buffer_msg = |msg: &Msg| { + let buffer_msg = |msg: &Msg| { let mut b = buffer.borrow_mut(); let fifo = b.entry(msg.source()).or_default(); @@ -390,7 +350,7 @@ where return; } - (d.log_round_change)( + d.logger.log_round_change( instance, process, round.get(), @@ -406,12 +366,12 @@ where // Algorithm 1:11 { - if (d.is_leader)(instance, round.get(), process) { + if d.is_leader.is_leader(instance, round.get(), process) { // Note round==1 at this point. broadcast_own_pre_prepare(vec![])?; // Empty justification since round==1 } - (timer_chan, stop_timer) = (d.new_timer)(round.get()); + (timer_chan, stop_timer) = d.new_timer.new_timer(round.get()); } while !ct.is_canceled() { @@ -449,7 +409,7 @@ where // Drop unjust messages if !is_justified(d, instance, &msg, compare_failure_round) { - (d.log_unjust)(instance, process, msg); + d.logger.log_unjust(instance, process, msg); continue; } @@ -462,15 +422,15 @@ where continue; } - (d.log_upon_rule)(instance, process, round.get(), &msg, rule); + d.logger.log_upon_rule(instance, process, round.get(), &msg, rule); match rule { // Algorithm 2:1 UPON_JUSTIFIED_PRE_PREPARE => { change_round(msg.round(), rule); - stop_timer(); - (timer_chan, stop_timer) = (d.new_timer)(round.get()); + stop_timer.stop(); + (timer_chan, stop_timer) = d.new_timer.new_timer(round.get()); let compare_result = compare( ct, @@ -497,9 +457,9 @@ where // this happens, we trigger round change. // Algorithm 3:1 change_round(round.get() + 1, UPON_ROUND_TIMEOUT); - stop_timer(); + stop_timer.stop(); - (timer_chan, stop_timer) = (d.new_timer)(round.get()); + (timer_chan, stop_timer) = d.new_timer.new_timer(round.get()); broadcast_round_change()?; } @@ -521,13 +481,13 @@ where // Algorithm 2:8 change_round(msg.round(), rule); q_commit = justification; - stop_timer(); + stop_timer.stop(); timer_chan = mpmc::never(); let justification = q_commit.as_ref() .expect("Rules `UPON_QUORUM_COMMITS` and `UPON_JUSTIFIED_DECIDED` always include a justification"); - (d.decide)(ct, instance, &msg.value(), justification); + d.decide.decide(ct, instance, &msg.value(), justification); } UPON_F_PLUS1_ROUND_CHANGES => { // Algorithm 3:5 @@ -542,8 +502,8 @@ where rule, ); - stop_timer(); - (timer_chan, stop_timer) = (d.new_timer)(round.get()); + stop_timer.stop(); + (timer_chan, stop_timer) = d.new_timer.new_timer(round.get()); broadcast_round_change()?; } @@ -572,9 +532,9 @@ where result?; change_round(round.get() + 1, UPON_ROUND_TIMEOUT); - stop_timer(); + stop_timer.stop(); - (timer_chan, stop_timer) = (d.new_timer)(round.get()); + (timer_chan, stop_timer) = d.new_timer.new_timer(round.get()); broadcast_round_change()?; } @@ -590,20 +550,21 @@ where Ok(()) } -fn compare( +fn compare( ct: &CancellationToken, - d: &Definition, - msg: &Msg, - input_value_source_ch: &mpmc::Receiver, - input_value_source: C, + d: &Definition, + msg: &Msg, + input_value_source_ch: &mpmc::Receiver, + input_value_source: T::Compare, timer_chan: &mpmc::Receiver, -) -> Result +) -> Result where - V: PartialEq, - C: Clone + Send + Sync, + T: QbftTypes, + T::Value: PartialEq, + T::Compare: Clone + Send + Sync, { let (compare_err_tx, compare_err_rx) = mpmc::bounded::>(1); - let (compare_value_tx, compare_value_rx) = mpmc::bounded::(1); + let (compare_value_tx, compare_value_rx) = mpmc::bounded::(1); // d.Compare has 2 roles: // 1. Read from the `input_value_source_ch` (if `input_value_source` is empty). @@ -620,14 +581,14 @@ where let compare = &d.compare; s.spawn(move || { - (compare)( + compare.compare(CompareRequest { ct, - msg, + qcommit: msg, input_value_source_ch, - &input_value_source, - &compare_err_tx, - &compare_value_tx, - ); + input_value_source: &input_value_source, + return_err: &compare_err_tx, + return_value: &compare_value_tx, + }); }); loop { @@ -658,12 +619,10 @@ where } /// Returns all messages from the provided round. -fn extract_round_messages( - buffer: &HashMap>>, - round: i64, -) -> Vec> +fn extract_round_messages(buffer: &HashMap>>, round: i64) -> Vec> where - V: PartialEq, + T: QbftTypes, + T::Value: PartialEq, { let mut resp = vec![]; @@ -680,16 +639,17 @@ where /// Returns the rule triggered upon receipt of the last message and its /// justifications. -fn classify( - d: &Definition, - instance: &I, +fn classify( + d: &Definition, + instance: &T::Instance, round: i64, process: i64, - buffer: &HashMap>>, - msg: &Msg, -) -> (UponRule, Option>>) + buffer: &HashMap>>, + msg: &Msg, +) -> (UponRule, Option>>) where - V: Eq + Hash + Default, + T: QbftTypes, + T::Value: Eq + Hash + Default, { match msg.type_() { MSG_DECIDED => (UPON_JUSTIFIED_DECIDED, Some(msg.justification())), @@ -757,7 +717,7 @@ where return (UPON_UNJUST_QUORUM_ROUND_CHANGES, None); }; - if !(d.is_leader)(instance, msg.round(), process) { + if !d.is_leader.is_leader(instance, msg.round(), process) { return (UPON_NOTHING, None); } @@ -771,9 +731,10 @@ where /// Implements algorithm 3:6 and returns the next minimum round from received /// round change messages. -fn next_min_round(d: &Definition, frc: &Vec>, round: i64) -> i64 +fn next_min_round(d: &Definition, frc: &Vec>, round: i64) -> i64 where - V: PartialEq, + T: QbftTypes, + T::Value: PartialEq, { // Get all RoundChange messages with round (rj) higher than current round (ri) if (frc.len() as i64) < d.faulty() + 1 { @@ -799,14 +760,15 @@ where } /// Returns true if message is justified or if it does not need justification. -fn is_justified( - d: &Definition, - instance: &I, - msg: &Msg, +fn is_justified( + d: &Definition, + instance: &T::Instance, + msg: &Msg, compare_failure_round: i64, ) -> bool where - V: Eq + Hash + Default, + T: QbftTypes, + T::Value: Eq + Hash + Default, { match msg.type_() { MSG_PRE_PREPARE => is_justified_pre_prepare(d, instance, msg, compare_failure_round), @@ -820,9 +782,10 @@ where /// Returns true if the ROUND_CHANGE message's prepared round and value is /// justified. -fn is_justified_round_change(d: &Definition, msg: &Msg) -> bool +fn is_justified_round_change(d: &Definition, msg: &Msg) -> bool where - V: PartialEq + Default, + T: QbftTypes, + T::Value: PartialEq + Default, { if msg.type_() != MSG_ROUND_CHANGE { panic!("bug: not a round change message"); @@ -845,7 +808,7 @@ where return false; } - let mut uniq = uniq_source::(vec![]); + let mut uniq = uniq_source::(vec![]); for prepare in prepares { if !uniq(&prepare) { return false; @@ -869,9 +832,10 @@ where /// Returns true if the decided message is justified by quorum COMMIT messages /// of identical round and value. -fn is_justified_decided(d: &Definition, msg: &Msg) -> bool +fn is_justified_decided(d: &Definition, msg: &Msg) -> bool where - V: PartialEq, + T: QbftTypes, + T::Value: PartialEq, { if msg.type_() != MSG_DECIDED { panic!("bug: not a decided message"); @@ -891,20 +855,21 @@ where } /// Returns true if the PRE-PREPARE message is justified. -fn is_justified_pre_prepare( - d: &Definition, - instance: &I, - msg: &Msg, +fn is_justified_pre_prepare( + d: &Definition, + instance: &T::Instance, + msg: &Msg, compare_failure_round: i64, ) -> bool where - V: Eq + Hash + Default, + T: QbftTypes, + T::Value: Eq + Hash + Default, { if msg.type_() != MSG_PRE_PREPARE { panic!("bug: not a preprepare message"); } - if !(d.is_leader)(instance, msg.round(), msg.source()) { + if !d.is_leader.is_leader(instance, msg.round(), msg.source()) { return false; } @@ -927,13 +892,14 @@ where /// Implements algorithm 4:1 and returns true and pv if the messages contains a /// justified quorum ROUND_CHANGEs (Qrc). -fn contains_justified_qrc( - d: &Definition, - justification: &Vec>, +fn contains_justified_qrc( + d: &Definition, + justification: &Vec>, round: i64, -) -> Option +) -> Option where - V: Eq + Hash + Default, + T: QbftTypes, + T::Value: Eq + Hash + Default, { let qrc = filter_round_change(justification, round); if (qrc.len() as i64) < d.quorum() { @@ -980,17 +946,15 @@ where /// Extracts the single justified Pr and Pv from quorum PREPARES in list of /// messages. It expects only one possible combination. -fn get_single_justified_pr_pv( - d: &Definition, - msgs: &Vec>, -) -> Option<(i64, V)> +fn get_single_justified_pr_pv(d: &Definition, msgs: &Vec>) -> Option<(i64, T::Value)> where - V: Eq + Hash + Default, + T: QbftTypes, + T::Value: Eq + Hash + Default, { let mut pr: i64 = 0; - let mut pv: V = Default::default(); + let mut pv: T::Value = Default::default(); let mut count: i64 = 0; - let mut uniq = uniq_source::(vec![]); + let mut uniq = uniq_source::(vec![]); for msg in msgs { if msg.type_() != MSG_PREPARE { @@ -1019,13 +983,10 @@ where } /// Implements algorithm 4:1 and returns a justified quorum ROUND_CHANGEs (Qrc) -fn get_justified_qrc( - d: &Definition, - all: &Vec>, - round: i64, -) -> Option>> +fn get_justified_qrc(d: &Definition, all: &Vec>, round: i64) -> Option>> where - V: Eq + Hash + Default, + T: QbftTypes, + T::Value: Eq + Hash + Default, { if let (qrc, true) = quorum_null_prepared(d, all, round) { // Return any quorum null pv ROUND_CHANGE messages as Qrc. @@ -1037,11 +998,11 @@ where for prepares in get_prepare_quorums(d, all) { // See if we have quorum ROUND-CHANGE with HIGHEST_PREPARED(qrc) == // prepares.Round. - let mut qrc: Vec> = vec![]; + let mut qrc: Vec> = vec![]; let mut has_highest_prepared = false; let pr = prepares[0].round(); let pv = prepares[0].value(); - let mut uniq = uniq_source::(vec![]); + let mut uniq = uniq_source::(vec![]); for rc in round_changes.iter() { if rc.prepared_round() > pr { @@ -1071,15 +1032,16 @@ where /// Returns true and Faulty+1 ROUND-CHANGE messages (Frc) with the rounds higher /// than the provided round. It returns the highest round per process in order /// to jump furthest. -fn get_fplus1_round_changes( - d: &Definition, - all: &Vec>, +fn get_fplus1_round_changes( + d: &Definition, + all: &Vec>, round: i64, -) -> Option>> +) -> Option>> where - V: PartialEq, + T: QbftTypes, + T::Value: PartialEq, { - let mut highest_by_source = HashMap::>::new(); + let mut highest_by_source = HashMap::>::new(); for msg in all { if msg.type_() != MSG_ROUND_CHANGE { @@ -1122,14 +1084,12 @@ where value: V, } -fn get_prepare_quorums( - d: &Definition, - all: &Vec>, -) -> Vec>> +fn get_prepare_quorums(d: &Definition, all: &Vec>) -> Vec>> where - V: Eq + Hash, + T: QbftTypes, + T::Value: Eq + Hash, { - let mut sets = HashMap::, HashMap>>::new(); + let mut sets = HashMap::, HashMap>>::new(); for msg in all { if msg.type_() != MSG_PREPARE { @@ -1167,13 +1127,10 @@ where /// Implements condition J1 and returns Qrc and true if a quorum /// of round changes messages (Qrc) for the round have null prepared round and /// value. -fn quorum_null_prepared( - d: &Definition, - all: &Vec>, - round: i64, -) -> (Vec>, bool) +fn quorum_null_prepared(d: &Definition, all: &Vec>, round: i64) -> (Vec>, bool) where - V: PartialEq + Default, + T: QbftTypes, + T::Value: PartialEq + Default, { let null_pr = Default::default(); let null_pv = Some(&Default::default()); @@ -1187,41 +1144,44 @@ where } /// Returns the messages matching the type and value. -fn filter_by_round_and_value( - msgs: &Vec>, +fn filter_by_round_and_value( + msgs: &Vec>, message_type: MessageType, round: i64, - value: V, -) -> Vec> + value: T::Value, +) -> Vec> where - V: PartialEq, + T: QbftTypes, + T::Value: PartialEq, { filter_msgs(msgs, message_type, round, Some(&value), None, None) } /// Returns all round change messages for the provided round. -fn filter_round_change(msgs: &Vec>, round: i64) -> Vec> +fn filter_round_change(msgs: &Vec>, round: i64) -> Vec> where - V: PartialEq, + T: QbftTypes, + T::Value: PartialEq, { - filter_msgs::(msgs, MSG_ROUND_CHANGE, round, None, None, None) + filter_msgs::(msgs, MSG_ROUND_CHANGE, round, None, None, None) } /// Returns one message per process matching the provided type and round and /// optional value, pr, pv. -fn filter_msgs( - msgs: &Vec>, +fn filter_msgs( + msgs: &Vec>, message_type: MessageType, round: i64, - value: Option<&V>, + value: Option<&T::Value>, pr: Option, - pv: Option<&V>, -) -> Vec> + pv: Option<&T::Value>, +) -> Vec> where - V: PartialEq, + T: QbftTypes, + T::Value: PartialEq, { let mut resp = Vec::new(); - let mut uniq = uniq_source::(vec![]); + let mut uniq = uniq_source::(vec![]); for msg in msgs { if message_type != msg.type_() { @@ -1260,11 +1220,12 @@ where /// Produce a vector containing all the buffered messages as well as all their /// justifications. -fn flatten(buffer: &HashMap>>) -> Vec> +fn flatten(buffer: &HashMap>>) -> Vec> where - V: PartialEq, + T: QbftTypes, + T::Value: PartialEq, { - let mut resp: Vec> = Vec::new(); + let mut resp: Vec> = Vec::new(); for msgs in buffer.values() { for msg in msgs { @@ -1283,12 +1244,13 @@ where /// Construct a function that returns true if the message is from a unique /// source. -fn uniq_source(vec: Vec>) -> Box) -> bool> +fn uniq_source(vec: Vec>) -> Box) -> bool> where - V: PartialEq, + T: QbftTypes, + T::Value: PartialEq, { let mut s = vec.iter().map(|msg| msg.source()).collect::>(); - Box::new(move |msg: &Msg| { + Box::new(move |msg: &Msg| { let source = msg.source(); if s.contains(&source) { false