diff --git a/crates/uniflight/Cargo.toml b/crates/uniflight/Cargo.toml index 1c0bfe74..4bb96a67 100644 --- a/crates/uniflight/Cargo.toml +++ b/crates/uniflight/Cargo.toml @@ -38,6 +38,7 @@ tick = { workspace = true, features = ["tokio"] } tokio = { workspace = true, features = [ "macros", "rt", + "sync", "time", "rt-multi-thread", ] } diff --git a/crates/uniflight/tests/work.rs b/crates/uniflight/tests/work.rs index d5a5e718..952d91ea 100644 --- a/crates/uniflight/tests/work.rs +++ b/crates/uniflight/tests/work.rs @@ -10,12 +10,19 @@ use std::time::Duration; use futures_util::StreamExt; use futures_util::stream::FuturesUnordered; +use tokio::sync::Notify; use uniflight::Merger; fn unreachable_future() -> std::future::Pending { std::future::pending() } +/// Waits for a [`Notify`] signal with a generous timeout, panicking with `msg` +/// if the signal is not received. Prevents test hangs if synchronization breaks. +async fn await_notify(notify: &Notify, msg: &str) { + tokio::time::timeout(Duration::from_secs(5), notify.notified()).await.expect(msg); +} + #[cfg_attr(miri, ignore)] #[tokio::test] async fn direct_call() { @@ -152,46 +159,48 @@ async fn cancel() { #[tokio::test] async fn leader_panic_returns_error_to_all() { let group: Arc> = Arc::new(Merger::new()); - - // First task will panic (caught by catch_unwind) - let group_clone = Arc::clone(&group); - let leader_handle = tokio::spawn(async move { - group_clone - .execute("key", || async { - tokio::time::sleep(Duration::from_millis(50)).await; - panic!("leader panicked"); - #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] - "never".to_string() - }) - .await + let leader_registered = Arc::new(Notify::new()); + let follower_registered = Arc::new(Notify::new()); + + // Leader: registers cell, signals readiness, waits for follower, then panics. + let leader_handle = tokio::spawn({ + let group = Arc::clone(&group); + let leader_registered = Arc::clone(&leader_registered); + let follower_registered = Arc::clone(&follower_registered); + async move { + let fut = group.execute("key", || async move { + // Wait until follower is actively waiting on our cell before panicking + await_notify(&follower_registered, "follower should register before timeout").await; + panic!("leader panicked") + }); + leader_registered.notify_one(); + fut.await + } }); - // Give time for the spawned task to register and start - tokio::time::sleep(Duration::from_millis(10)).await; + await_notify(&leader_registered, "leader should register before timeout").await; - // Second task joins as a follower - let group_clone = Arc::clone(&group); - let follower_handle = tokio::spawn(async move { - group_clone - .execute("key", || async { + // Follower: finds leader's cell, signals readiness, then awaits the result. + let follower_handle = tokio::spawn({ + let group = Arc::clone(&group); + let follower_registered = Arc::clone(&follower_registered); + async move { + let fut = group.execute("key", || async { // This should never run - we're a follower "follower result".to_string() - }) - .await + }); + // Cell is joined. Signal the leader to proceed. + follower_registered.notify_one(); + fut.await + } }); // Leader gets LeaderPanicked error (panic is caught, not propagated) - let leader_result = leader_handle.await.expect("task should not panic - panic is caught"); - let Err(leader_err) = leader_result else { - panic!("expected Err, got Ok"); - }; + let leader_err = leader_handle.await.expect("task should not panic - panic is caught").unwrap_err(); assert_eq!(leader_err.message(), "leader panicked"); // Follower also gets LeaderPanicked error with same message - let follower_result = follower_handle.await.expect("follower task should not panic"); - let Err(follower_err) = follower_result else { - panic!("expected Err, got Ok"); - }; + let follower_err = follower_handle.await.expect("follower task should not panic").unwrap_err(); assert_eq!(follower_err.message(), "leader panicked"); } @@ -277,13 +286,7 @@ async fn clone_shares_state() { async fn leader_panicked_error_traits() { // Create an error by triggering a panic let group: Merger = Merger::new(); - let result = group - .execute("key", || async { - panic!("test message"); - #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] - "never".to_string() - }) - .await; + let result = group.execute("key", || async { panic!("test message") }).await; let Err(error) = result else { panic!("expected Err"); }; @@ -318,13 +321,7 @@ async fn retry_after_panic_succeeds() { let group: Merger = Merger::new(); // First call panics - let result = group - .execute("key", || async { - panic!("intentional panic"); - #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] - "never".to_string() - }) - .await; + let result = group.execute("key", || async { panic!("intentional panic") }).await; let Err(err) = result else { panic!("expected Err"); }; @@ -357,9 +354,7 @@ async fn mixed_panic_and_success() { // Start multiple keys concurrently - some panic, some succeed let panic_fut = group.execute("panic_key", || async { tokio::time::sleep(Duration::from_millis(10)).await; - panic!("intentional panic"); - #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] - "never".to_string() + panic!("intentional panic") }); let success_fut = group.execute("success_key", || async { @@ -384,45 +379,45 @@ async fn mixed_panic_and_success() { async fn follower_closure_not_called_on_panic() { let group: Arc> = Arc::new(Merger::new()); let follower_called = Arc::new(AtomicUsize::new(0)); - - // Leader will panic - let group_clone = Arc::clone(&group); - let leader_handle = tokio::spawn(async move { - group_clone - .execute("key", || async { - tokio::time::sleep(Duration::from_millis(50)).await; - panic!("leader panic"); - #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] - "never".to_string() - }) - .await + let leader_registered = Arc::new(Notify::new()); + let follower_registered = Arc::new(Notify::new()); + + // Leader: registers cell, signals readiness, waits for follower, then panics. + let leader_handle = tokio::spawn({ + let group = Arc::clone(&group); + let leader_registered = Arc::clone(&leader_registered); + let follower_registered = Arc::clone(&follower_registered); + async move { + let fut = group.execute("key", || async move { + await_notify(&follower_registered, "follower should register before timeout").await; + panic!("leader panic") + }); + leader_registered.notify_one(); + fut.await + } }); - // Give leader time to start - tokio::time::sleep(Duration::from_millis(10)).await; + await_notify(&leader_registered, "leader should register before timeout").await; - // Follower joins - its closure should NOT be called - let group_clone = Arc::clone(&group); - let follower_called_clone = Arc::clone(&follower_called); - let follower_handle = tokio::spawn(async move { - group_clone - .execute("key", || async { - follower_called_clone.fetch_add(1, Acquire); + // Follower: finds leader's cell, signals readiness, then awaits. + let follower_handle = tokio::spawn({ + let group = Arc::clone(&group); + let follower_called = Arc::clone(&follower_called); + let follower_registered = Arc::clone(&follower_registered); + async move { + let fut = group.execute("key", || async { + follower_called.fetch_add(1, AcqRel); "follower result".to_string() - }) - .await + }); + follower_registered.notify_one(); + fut.await + } }); - let (leader_result, follower_result) = tokio::join!(leader_handle, follower_handle); - - let Err(leader_err) = leader_result.expect("task join") else { - panic!("expected Err"); - }; + let leader_err = leader_handle.await.expect("task join").unwrap_err(); assert_eq!(leader_err.message(), "leader panic"); - let Err(follower_err) = follower_result.expect("task join") else { - panic!("expected Err"); - }; + let follower_err = follower_handle.await.expect("task join").unwrap_err(); assert_eq!(follower_err.message(), "leader panic"); // Follower's closure was never called