Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
606d5bc
test: update qbft test
iamquang95 May 14, 2026
0ec38de
fix: compare run on retached thread
iamquang95 May 18, 2026
b752caa
fix: removed hard coded salt in tests
iamquang95 May 18, 2026
ed912ee
fix: return error on Context cancelled
iamquang95 May 18, 2026
e01f64f
fix: hash from string, not magic number
iamquang95 May 18, 2026
b22fb41
fix: one shot cancel when parent is cancelled
iamquang95 May 18, 2026
081d8a4
fix: fix make_is_leader test
iamquang95 May 18, 2026
1a0a337
fix: minors naming and comments
iamquang95 May 18, 2026
183f48e
Merge remote-tracking branch 'origin/main' into iamquang95/qbft
iamquang95 May 18, 2026
44b9a13
fix: linter
iamquang95 May 18, 2026
cc59baf
fix: early cancel on the loop
iamquang95 May 18, 2026
ed82b6e
fix: context cancel in compare
iamquang95 May 18, 2026
7911c23
fix: validate definition
iamquang95 May 18, 2026
829ba5f
fix: add check pr < r
iamquang95 May 18, 2026
d834463
fix: test and document on run
iamquang95 May 18, 2026
739e3b3
fix: compare callback failed should timeout
iamquang95 May 19, 2026
27744cf
fix: test use timeout channel instead of sleep
iamquang95 May 19, 2026
0bd6a4f
fix: add comment on cancellation poll interval
iamquang95 May 19, 2026
e988c35
fix: use enum for invalid defnintion error
iamquang95 May 19, 2026
6c65b32
fix: add more test on check valid round
iamquang95 May 19, 2026
b30e616
fix: using test-case
iamquang95 May 19, 2026
4a364df
fix: small fixes
iamquang95 May 19, 2026
e759a81
fix: linter
iamquang95 May 19, 2026
2d9ef3c
Merge remote-tracking branch 'origin/main' into iamquang95/qbft
iamquang95 May 19, 2026
998b9ce
fix: remove unnecessary filter
iamquang95 May 19, 2026
0451f20
fix: make_is_leader now 0-based
iamquang95 May 19, 2026
e0e51f7
fix: add cancelled state for fake clock
iamquang95 May 19, 2026
6cc22da
fix: flanky tests by ordering and add small settle window time
iamquang95 May 19, 2026
7e48b56
fix: harden fake-clock to avoid flanky scheduling
iamquang95 May 20, 2026
024bff7
fix: lint
iamquang95 May 20, 2026
1b3664b
refactor: introduce QbftTypes
iamquang95 May 19, 2026
132bc4f
refactor: group QBFT callbacks into typed request/logger structs
iamquang95 May 20, 2026
1038890
refactor: add ClientRecord
iamquang95 May 20, 2026
8500d01
refactor: simplify internal_tests
iamquang95 May 21, 2026
42a4913
refactor: simplify qbft mod.rs
iamquang95 May 21, 2026
70e3ce9
refactor: remove skip clippy on qbft mod
iamquang95 May 21, 2026
8170a5d
Merge remote-tracking branch 'origin/main' into iamquang95/refactor-qbft
iamquang95 May 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions crates/core/src/qbft/callbacks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use super::{MessageType, Msg, QbftTypes, Result, UponRule};
use cancellation::CancellationToken;
use crossbeam::channel as mpmc;
use std::time;

pub(super) type BroadcastFn<T> =
dyn for<'a> Fn(BroadcastRequest<'a, T>) -> Result<()> + Send + Sync;
pub(super) type CompareFn<T> = dyn for<'a> Fn(CompareRequest<'a, T>) + Send + Sync + 'static;
pub(super) type UponRuleLoggerFn<T> = dyn for<'a> Fn(UponRuleLog<'a, T>) + Send + Sync;
pub(super) type RoundChangeLoggerFn<T> = dyn for<'a> Fn(RoundChangeLog<'a, T>) + Send + Sync;
pub(super) type UnjustLoggerFn<T> = dyn for<'a> Fn(UnjustLog<'a, T>) + Send + Sync;
pub(super) type LeaderFn<T> = dyn for<'a> Fn(LeaderRequest<'a, T>) -> bool + Send + Sync;
pub(super) type DecideFn<T> = dyn for<'a> Fn(DecideRequest<'a, T>) + Send + Sync;

/// Input passed to `Transport::broadcast`.
pub struct BroadcastRequest<'a, T: QbftTypes> {
/// Parent cancellation token.
pub ct: &'a CancellationToken,
/// Message type to broadcast.
pub type_: MessageType,
/// Consensus instance identifier.
pub instance: &'a T::Instance,
/// Sending process.
pub source: i64,
/// Message round.
pub round: i64,
/// Proposal value.
pub value: &'a T::Value,
/// Prepared round carried by ROUND-CHANGE messages.
pub prepared_round: i64,
/// Prepared value carried by ROUND-CHANGE messages.
pub prepared_value: &'a T::Value,
/// Optional justification piggybacked on the message.
pub justification: Option<&'a Vec<Msg<T>>>,
}

/// Input passed to `Definition::compare`.
pub struct CompareRequest<'a, T: QbftTypes> {
/// Compare-scoped cancellation token.
pub ct: &'a CancellationToken,
/// Proposed commit quorum message.
pub qcommit: &'a Msg<T>,
/// Channel carrying the local compare value if it was not cached yet.
pub input_value_source_ch: &'a mpmc::Receiver<T::Compare>,
/// Cached local compare value.
pub input_value_source: &'a T::Compare,
/// Channel used by the callback to return compare status.
pub return_err: &'a mpmc::Sender<Result<()>>,
/// Channel used by the callback to cache the local compare value.
pub return_value: &'a mpmc::Sender<T::Compare>,
}

/// Timer returned by `Definition::new_timer`.
pub struct Timer {
/// Channel that fires when the timer expires.
pub receive: mpmc::Receiver<time::Instant>,
/// Stops the timer.
pub stop: Box<dyn Fn() + Send + Sync>,
}

/// Input passed to `Definition::is_leader`.
pub struct LeaderRequest<'a, T: QbftTypes> {
/// Consensus instance identifier.
pub instance: &'a T::Instance,
/// Round being evaluated.
pub round: i64,
/// Process being evaluated.
pub process: i64,
}

/// Input passed to `Definition::decide`.
pub struct DecideRequest<'a, T: QbftTypes> {
/// Parent cancellation token.
pub ct: &'a CancellationToken,
/// Consensus instance identifier.
pub instance: &'a T::Instance,
/// Decided value.
pub value: &'a T::Value,
/// Commit quorum justifying the decision.
pub qcommit: &'a Vec<Msg<T>>,
}

/// Input passed to `QbftLogger::upon_rule`.
pub struct UponRuleLog<'a, T: QbftTypes> {
/// Consensus instance identifier.
pub instance: &'a T::Instance,
/// Local process.
pub process: i64,
/// Current local round.
pub round: i64,
/// Message that triggered classification.
pub msg: &'a Msg<T>,
/// Rule that fired.
pub upon_rule: UponRule,
}

/// Input passed to `QbftLogger::round_change`.
pub struct RoundChangeLog<'a, T: QbftTypes> {
/// Consensus instance identifier.
pub instance: &'a T::Instance,
/// Local process.
pub process: i64,
/// Previous local round.
pub round: i64,
/// New local round.
pub new_round: i64,
/// Rule that caused the round change.
pub upon_rule: UponRule,
/// Messages from the previous round.
pub msgs: &'a Vec<Msg<T>>,
}

/// Input passed to `QbftLogger::unjust`.
pub struct UnjustLog<'a, T: QbftTypes> {
/// Consensus instance identifier.
pub instance: &'a T::Instance,
/// Local process.
pub process: i64,
/// Rejected message.
pub msg: Msg<T>,
}
25 changes: 20 additions & 5 deletions crates/core/src/qbft/fake_clock.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(clippy::arithmetic_side_effects)]

use crossbeam::channel as mpmc;
use std::{
collections::BTreeMap,
Expand Down Expand Up @@ -28,7 +30,13 @@ struct FakeClockInner {
now: Instant,
last_id: usize,
cancelled: bool,
clients: BTreeMap<usize, (mpmc::Sender<Instant>, Instant, TimerPriority)>,
clients: BTreeMap<usize, ClientRecord>,
}

struct ClientRecord {
sender: mpmc::Sender<Instant>,
deadline: Instant,
priority: TimerPriority,
}

impl FakeClock {
Expand Down Expand Up @@ -80,7 +88,14 @@ impl FakeClock {
let deadline = inner.now + duration;

inner.last_id += 1;
inner.clients.insert(id, (tx, deadline, priority));
inner.clients.insert(
id,
ClientRecord {
sender: tx,
deadline,
priority,
},
);

id
};
Expand Down Expand Up @@ -124,9 +139,9 @@ impl FakeClock {
inner.now += duration;
let now = inner.now;

for (&id, (ch, deadline, priority)) in &inner.clients {
if *deadline <= now {
expired.push((id, *deadline, *priority, ch.clone()));
for (&id, record) in &inner.clients {
if record.deadline <= now {
expired.push((id, record.deadline, record.priority, record.sender.clone()));
}
}

Expand Down
Loading
Loading