Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/uniflight/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ tick = { workspace = true, features = ["tokio"] }
tokio = { workspace = true, features = [
"macros",
"rt",
"sync",
"time",
"rt-multi-thread",
] }
Expand Down
149 changes: 72 additions & 77 deletions crates/uniflight/tests/work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@ use std::time::Duration;

use futures_util::StreamExt;
use futures_util::stream::FuturesUnordered;
use tokio::sync::Notify;
use uniflight::Merger;

fn unreachable_future() -> std::future::Pending<String> {
std::future::pending()
}

/// Waits for a [`Notify`] signal with a generous timeout, panicking with `msg`
/// if the signal is not received. Prevents test hangs if synchronization breaks.
async fn await_notify(notify: &Notify, msg: &str) {
tokio::time::timeout(Duration::from_secs(5), notify.notified()).await.expect(msg);
}

#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn direct_call() {
Expand Down Expand Up @@ -152,46 +159,48 @@ async fn cancel() {
#[tokio::test]
async fn leader_panic_returns_error_to_all() {
let group: Arc<Merger<String, String>> = Arc::new(Merger::new());

// First task will panic (caught by catch_unwind)
let group_clone = Arc::clone(&group);
let leader_handle = tokio::spawn(async move {
group_clone
.execute("key", || async {
tokio::time::sleep(Duration::from_millis(50)).await;
panic!("leader panicked");
#[expect(unreachable_code, reason = "Required to satisfy return type after panic")]
"never".to_string()
})
.await
let leader_registered = Arc::new(Notify::new());
let follower_registered = Arc::new(Notify::new());

// Leader: registers cell, signals readiness, waits for follower, then panics.
let leader_handle = tokio::spawn({
let group = Arc::clone(&group);
let leader_registered = Arc::clone(&leader_registered);
let follower_registered = Arc::clone(&follower_registered);
async move {
let fut = group.execute("key", || async move {
// Wait until follower is actively waiting on our cell before panicking
await_notify(&follower_registered, "follower should register before timeout").await;
panic!("leader panicked")
});
leader_registered.notify_one();
fut.await
}
});

// Give time for the spawned task to register and start
tokio::time::sleep(Duration::from_millis(10)).await;
await_notify(&leader_registered, "leader should register before timeout").await;

// Second task joins as a follower
let group_clone = Arc::clone(&group);
let follower_handle = tokio::spawn(async move {
group_clone
.execute("key", || async {
// Follower: finds leader's cell, signals readiness, then awaits the result.
let follower_handle = tokio::spawn({
let group = Arc::clone(&group);
let follower_registered = Arc::clone(&follower_registered);
async move {
let fut = group.execute("key", || async {
// This should never run - we're a follower
"follower result".to_string()
})
.await
});
// Cell is joined. Signal the leader to proceed.
follower_registered.notify_one();
fut.await
}
});

// Leader gets LeaderPanicked error (panic is caught, not propagated)
let leader_result = leader_handle.await.expect("task should not panic - panic is caught");
let Err(leader_err) = leader_result else {
panic!("expected Err, got Ok");
};
let leader_err = leader_handle.await.expect("task should not panic - panic is caught").unwrap_err();
assert_eq!(leader_err.message(), "leader panicked");

// Follower also gets LeaderPanicked error with same message
let follower_result = follower_handle.await.expect("follower task should not panic");
let Err(follower_err) = follower_result else {
panic!("expected Err, got Ok");
};
let follower_err = follower_handle.await.expect("follower task should not panic").unwrap_err();
assert_eq!(follower_err.message(), "leader panicked");
}

Expand Down Expand Up @@ -277,13 +286,7 @@ async fn clone_shares_state() {
async fn leader_panicked_error_traits() {
// Create an error by triggering a panic
let group: Merger<String, String> = Merger::new();
let result = group
.execute("key", || async {
panic!("test message");
#[expect(unreachable_code, reason = "Required to satisfy return type after panic")]
"never".to_string()
})
.await;
let result = group.execute("key", || async { panic!("test message") }).await;
let Err(error) = result else {
panic!("expected Err");
};
Expand Down Expand Up @@ -318,13 +321,7 @@ async fn retry_after_panic_succeeds() {
let group: Merger<String, String> = Merger::new();

// First call panics
let result = group
.execute("key", || async {
panic!("intentional panic");
#[expect(unreachable_code, reason = "Required to satisfy return type after panic")]
"never".to_string()
})
.await;
let result = group.execute("key", || async { panic!("intentional panic") }).await;
let Err(err) = result else {
panic!("expected Err");
};
Expand Down Expand Up @@ -357,9 +354,7 @@ async fn mixed_panic_and_success() {
// Start multiple keys concurrently - some panic, some succeed
let panic_fut = group.execute("panic_key", || async {
tokio::time::sleep(Duration::from_millis(10)).await;
panic!("intentional panic");
#[expect(unreachable_code, reason = "Required to satisfy return type after panic")]
"never".to_string()
panic!("intentional panic")
});

let success_fut = group.execute("success_key", || async {
Expand All @@ -384,45 +379,45 @@ async fn mixed_panic_and_success() {
async fn follower_closure_not_called_on_panic() {
let group: Arc<Merger<String, String>> = Arc::new(Merger::new());
let follower_called = Arc::new(AtomicUsize::new(0));

// Leader will panic
let group_clone = Arc::clone(&group);
let leader_handle = tokio::spawn(async move {
group_clone
.execute("key", || async {
tokio::time::sleep(Duration::from_millis(50)).await;
panic!("leader panic");
#[expect(unreachable_code, reason = "Required to satisfy return type after panic")]
"never".to_string()
})
.await
let leader_registered = Arc::new(Notify::new());
let follower_registered = Arc::new(Notify::new());

// Leader: registers cell, signals readiness, waits for follower, then panics.
let leader_handle = tokio::spawn({
let group = Arc::clone(&group);
let leader_registered = Arc::clone(&leader_registered);
let follower_registered = Arc::clone(&follower_registered);
async move {
let fut = group.execute("key", || async move {
await_notify(&follower_registered, "follower should register before timeout").await;
panic!("leader panic")
});
leader_registered.notify_one();
fut.await
}
});

// Give leader time to start
tokio::time::sleep(Duration::from_millis(10)).await;
await_notify(&leader_registered, "leader should register before timeout").await;

// Follower joins - its closure should NOT be called
let group_clone = Arc::clone(&group);
let follower_called_clone = Arc::clone(&follower_called);
let follower_handle = tokio::spawn(async move {
group_clone
.execute("key", || async {
follower_called_clone.fetch_add(1, Acquire);
// Follower: finds leader's cell, signals readiness, then awaits.
let follower_handle = tokio::spawn({
let group = Arc::clone(&group);
let follower_called = Arc::clone(&follower_called);
let follower_registered = Arc::clone(&follower_registered);
async move {
let fut = group.execute("key", || async {
follower_called.fetch_add(1, AcqRel);
"follower result".to_string()
})
.await
});
follower_registered.notify_one();
fut.await
}
});

let (leader_result, follower_result) = tokio::join!(leader_handle, follower_handle);

let Err(leader_err) = leader_result.expect("task join") else {
panic!("expected Err");
};
let leader_err = leader_handle.await.expect("task join").unwrap_err();
assert_eq!(leader_err.message(), "leader panic");

let Err(follower_err) = follower_result.expect("task join") else {
panic!("expected Err");
};
let follower_err = follower_handle.await.expect("task join").unwrap_err();
assert_eq!(follower_err.message(), "leader panic");

// Follower's closure was never called
Expand Down
Loading