Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
606d5bc
test: update qbft test
iamquang95 May 14, 2026
0ec38de
fix: compare run on retached thread
iamquang95 May 18, 2026
b752caa
fix: removed hard coded salt in tests
iamquang95 May 18, 2026
ed912ee
fix: return error on Context cancelled
iamquang95 May 18, 2026
e01f64f
fix: hash from string, not magic number
iamquang95 May 18, 2026
b22fb41
fix: one shot cancel when parent is cancelled
iamquang95 May 18, 2026
081d8a4
fix: fix make_is_leader test
iamquang95 May 18, 2026
1a0a337
fix: minors naming and comments
iamquang95 May 18, 2026
183f48e
Merge remote-tracking branch 'origin/main' into iamquang95/qbft
iamquang95 May 18, 2026
44b9a13
fix: linter
iamquang95 May 18, 2026
cc59baf
fix: early cancel on the loop
iamquang95 May 18, 2026
ed82b6e
fix: context cancel in compare
iamquang95 May 18, 2026
7911c23
fix: validate definition
iamquang95 May 18, 2026
829ba5f
fix: add check pr < r
iamquang95 May 18, 2026
d834463
fix: test and document on run
iamquang95 May 18, 2026
739e3b3
fix: compare callback failed should timeout
iamquang95 May 19, 2026
27744cf
fix: test use timeout channel instead of sleep
iamquang95 May 19, 2026
0bd6a4f
fix: add comment on cancellation poll interval
iamquang95 May 19, 2026
e988c35
fix: use enum for invalid defnintion error
iamquang95 May 19, 2026
6c65b32
fix: add more test on check valid round
iamquang95 May 19, 2026
b30e616
fix: using test-case
iamquang95 May 19, 2026
4a364df
fix: small fixes
iamquang95 May 19, 2026
e759a81
fix: linter
iamquang95 May 19, 2026
2d9ef3c
Merge remote-tracking branch 'origin/main' into iamquang95/qbft
iamquang95 May 19, 2026
998b9ce
fix: remove unnecessary filter
iamquang95 May 19, 2026
0451f20
fix: make_is_leader now 0-based
iamquang95 May 19, 2026
e0e51f7
fix: add cancelled state for fake clock
iamquang95 May 19, 2026
6cc22da
fix: flanky tests by ordering and add small settle window time
iamquang95 May 19, 2026
7e48b56
fix: harden fake-clock to avoid flanky scheduling
iamquang95 May 20, 2026
024bff7
fix: lint
iamquang95 May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 174 additions & 25 deletions crates/core/src/qbft/fake_clock.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crossbeam::channel as mpmc;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
collections::BTreeMap,
sync::{
Arc, Mutex,
atomic::{AtomicIsize, Ordering},
},
thread,
time::{Duration, Instant},
};
Expand All @@ -11,41 +14,73 @@ pub struct FakeClock {
inner: Arc<Mutex<FakeClockInner>>,
}

#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd)]
pub enum TimerPriority {
// Match the Go fake-clock harness at equal deadlines: delayed nodes start
// before protocol timers, while delayed input values arrive after them.
StartDelay,
Protocol,
InputValue,
}

struct FakeClockInner {
start: Instant,
now: Instant,
last_id: usize,
clients: HashMap<usize, (mpmc::Sender<Instant>, Instant)>,
cancelled: bool,
clients: BTreeMap<usize, (mpmc::Sender<Instant>, Instant, TimerPriority)>,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a new ClientRecord struct instead of the tuple.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do that on the refactoring PR later 👍

}

impl FakeClock {
/// Create a fake clock pinned to an initial instant.
pub fn new(now: Instant) -> Self {
Self {
inner: Arc::new(Mutex::new(FakeClockInner {
start: now,
now,
last_id: 1,
cancelled: false,
clients: Default::default(),
})),
}
}

/// Register a protocol timer with the default protocol priority.
pub fn new_timer(
&self,
duration: Duration,
) -> (
mpmc::Receiver<Instant>,
Box<dyn Fn() + Send + Sync + 'static>,
) {
let (tx, rx) = mpmc::bounded::<Instant>(1);
self.new_timer_with_priority(duration, TimerPriority::Protocol)
}

/// Register a timer with explicit same-deadline ordering priority.
pub fn new_timer_with_priority(
&self,
duration: Duration,
priority: TimerPriority,
) -> (
mpmc::Receiver<Instant>,
Box<dyn Fn() + Send + Sync + 'static>,
) {
// Synchronous expiry handoff: advancing fake time must wait until the
// timer owner observes the tick, otherwise exact-round QBFT tests race
// worker scheduling.
let (tx, rx) = mpmc::bounded::<Instant>(0);

let client_id = {
let mut inner = self.inner.lock().unwrap();
if inner.cancelled {
return (rx, Box::new(|| {}));
}

let id = inner.last_id;
let deadline = inner.now + duration;

inner.last_id += 1;
inner.clients.insert(id, (tx, deadline));
inner.clients.insert(id, (tx, deadline, priority));

id
};
Expand All @@ -59,7 +94,27 @@ impl FakeClock {
(rx, cancel)
}

pub fn advance(&self, duration: Duration) {
/// Advance fake time and deliver all timers expired by the new time.
pub fn advance(&self, duration: Duration) -> usize {
self.advance_inner(duration, None)
}

/// Advance fake time and wait for each delivered timer action to complete.
pub fn advance_and_wait(
&self,
duration: Duration,
pending_timer_actions: &AtomicIsize,
) -> usize {
self.advance_inner(duration, Some(pending_timer_actions))
}

/// Shared advance path; optionally synchronizes timer delivery with the
/// test harness.
fn advance_inner(
&self,
duration: Duration,
pending_timer_actions: Option<&AtomicIsize>,
) -> usize {
// Advance time and collect expired senders under lock, but perform sends
// without holding lock.
let mut expired = vec![];
Expand All @@ -69,85 +124,179 @@ impl FakeClock {
inner.now += duration;
let now = inner.now;

for (&id, (ch, deadline)) in inner.clients.iter() {
for (&id, (ch, deadline, priority)) in &inner.clients {
if *deadline <= now {
expired.push((id, ch.clone()));
expired.push((id, *deadline, *priority, ch.clone()));
}
}

for (id, _) in expired.iter() {
for (id, ..) in expired.iter() {
inner.clients.remove(id);
}

now
};

for (_, ch) in expired {
let _ = ch.send(now);
// Equal-deadline order is part of the test harness contract: these
// tests assert exact rounds, and Go's fake-clock scheduling is stable.
expired.sort_by_key(|(id, deadline, priority, _)| (*deadline, *priority, *id));

let mut delivered = 0;
for (_, _, _, ch) in expired {
if let Some(pending_timer_actions) = pending_timer_actions {
pending_timer_actions.fetch_add(1, Ordering::SeqCst);
}

if ch.send(now).is_ok() {
delivered += 1;
if let Some(pending_timer_actions) = pending_timer_actions {
while pending_timer_actions.load(Ordering::SeqCst) > 0 {
thread::yield_now();
}
}
} else if let Some(pending_timer_actions) = pending_timer_actions {
pending_timer_actions.fetch_sub(1, Ordering::SeqCst);
}
}

delivered
}

/// Return fake time elapsed since clock creation.
pub fn elapsed(&self) -> Duration {
let inner = self.inner.lock().unwrap();
inner.now - inner.start
}

/// Return currently registered timers.
pub fn timer_count(&self) -> usize {
self.inner.lock().unwrap().clients.len()
}

/// Explicit terminal cleanup; do not reintroduce `Drop`, since dropping one
/// clone must not cancel timers owned by other clones.
pub fn cancel(&self) {
let mut inner = self.inner.lock().unwrap();
inner.cancelled = true;
inner.clients.clear();
}
}
Comment thread
iamquang95 marked this conversation as resolved.

impl Drop for FakeClock {
fn drop(&mut self) {
self.cancel();
}
}

#[test]
/// Timers registered by different threads fire after fake time passes
/// deadlines.
fn multiple_threads_timers() {
let clock = FakeClock::new(Instant::now());
let (done_tx, done_rx) = mpmc::bounded(2);

let start = Instant::now();
thread::scope(|s| {
let c1 = clock.clone();
let (ch_1, _) = c1.new_timer(Duration::from_secs(5));
let done_tx_1 = done_tx.clone();
s.spawn(move || {
let _ = ch_1.recv();
done_tx_1.send(ch_1.recv().is_ok()).unwrap();
});

let c2 = clock.clone();
let (ch_2, _) = c2.new_timer(Duration::from_secs(5));
let done_tx_2 = done_tx.clone();
s.spawn(move || {
let _ = ch_2.recv();
done_tx_2.send(ch_2.recv().is_ok()).unwrap();
});

clock.advance(Duration::from_secs(4));
assert!(done_rx.try_recv().is_err());
clock.advance(Duration::from_secs(6));
});

println!("start={:?}, clock={:?}", start.elapsed(), clock.elapsed());
let done = done_rx.try_iter().collect::<Vec<_>>();
assert_eq!(2, done.len());
assert!(done.into_iter().all(|done| done));
assert_eq!(Duration::from_secs(10), clock.elapsed());
}

#[test]
/// Cancelling the clock closes outstanding timers without advancing fake time.
fn multiple_threads_cancellation() {
let clock = FakeClock::new(Instant::now());
let (done_tx, done_rx) = mpmc::bounded(2);

let start = Instant::now();
thread::scope(|s| {
let c1 = clock.clone();
let (ch_1, _) = c1.new_timer(Duration::from_secs(5));
let done_tx_1 = done_tx.clone();
s.spawn(move || {
let _ = ch_1.recv();
done_tx_1.send(ch_1.recv().is_err()).unwrap();
});

let c2 = clock.clone();
let (ch_2, _) = c2.new_timer(Duration::from_secs(5));
let done_tx_2 = done_tx.clone();
s.spawn(move || {
let _ = ch_2.recv();
done_tx_2.send(ch_2.recv().is_err()).unwrap();
});

clock.cancel();
});

println!("start={:?}, clock={:?}", start.elapsed(), clock.elapsed());
let done = done_rx.try_iter().collect::<Vec<_>>();
assert_eq!(2, done.len());
assert!(done.into_iter().all(|done| done));
assert_eq!(Duration::ZERO, clock.elapsed());
}

#[test]
/// A timer created after clock cancellation is immediately closed.
fn timer_created_after_cancel_is_closed() {
let clock = FakeClock::new(Instant::now());
clock.cancel();

let (ch, cancel) = clock.new_timer(Duration::from_secs(5));

assert!(matches!(
ch.try_recv(),
Err(mpmc::TryRecvError::Disconnected)
));
cancel();
}

#[test]
/// Cancelling one timer does not affect other timers with the same deadline.
fn cancel_one_timer_only() {
let clock = FakeClock::new(Instant::now());
let (ch_1, cancel_1) = clock.new_timer(Duration::from_secs(5));
let (ch_2, _) = clock.new_timer(Duration::from_secs(5));
let (done_tx, done_rx) = mpmc::bounded(1);

cancel_1();
thread::scope(|s| {
s.spawn(move || {
done_tx.send(ch_2.recv().is_ok()).unwrap();
});

clock.advance(Duration::from_secs(5));
});

assert!(ch_1.try_recv().is_err());
assert_eq!(Ok(true), done_rx.try_recv());
}

#[test]
/// An expired timer is delivered once and removed from the clock.
fn expired_timer_delivers_once() {
let clock = FakeClock::new(Instant::now());
let (ch, _) = clock.new_timer(Duration::from_secs(5));
let (done_tx, done_rx) = mpmc::bounded(1);

thread::scope(|s| {
s.spawn(move || {
done_tx.send(ch.recv().is_ok()).unwrap();
});

clock.advance(Duration::from_secs(5));
});

assert_eq!(Ok(true), done_rx.try_recv());
clock.advance(Duration::from_secs(5));
assert!(done_rx.try_recv().is_err());
}
Loading
Loading