From e8f297f7b093ced99ef40b89a83874cf4b5cda23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kate=C5=99ina=20Churanov=C3=A1?= Date: Thu, 26 Mar 2026 10:57:07 +0100 Subject: [PATCH] fix(uniflight): eliminate race condition in panic propagation tests Replace the sleep-based synchronization between the spawned leader and follower tasks with a Notify handshake protocol that coordinates them deterministically. The original tests relied on sleep(10ms) to ensure the follower task was polled before the leader completed. In tokio's single-threaded runtime, the leader could finish and clean up the DashMap entry before the follower task was ever polled, causing the follower to become a new leader and return Ok instead of Err(LeaderPanicked). The fix uses two Notify signals to enforce strict ordering: 1. Leader signals after calling execute() (cell registered) 2. Follower signals after calling execute() (cell joined) 3. Leader waits for follower's signal before panicking This guarantees the follower is actively waiting on the leader's cell when the panic occurs, regardless of whether execute() registers cells eagerly or lazily. In the lazy case the ordering still holds because each task awaits the returned future immediately after signalling, with no yield point in between, so on the single-threaded test runtime no other task can run before the cell is registered or joined. Also simplifies panic expressions by using panic!() as the tail expression (returns !) instead of dead code with #[expect(unreachable_code)]. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- crates/uniflight/Cargo.toml | 1 + crates/uniflight/tests/work.rs | 149 ++++++++++++++++----------------- 2 files changed, 73 insertions(+), 77 deletions(-) diff --git a/crates/uniflight/Cargo.toml b/crates/uniflight/Cargo.toml index 1c0bfe743..4bb96a671 100644 --- a/crates/uniflight/Cargo.toml +++ b/crates/uniflight/Cargo.toml @@ -38,6 +38,7 @@ tick = { workspace = true, features = ["tokio"] } tokio = { workspace = true, features = [ "macros", "rt", + "sync", "time", "rt-multi-thread", ] } diff --git a/crates/uniflight/tests/work.rs b/crates/uniflight/tests/work.rs index d5a5e718a..952d91eaf 100644 --- a/crates/uniflight/tests/work.rs +++ b/crates/uniflight/tests/work.rs @@ -10,12 +10,19 @@ use std::time::Duration; use futures_util::StreamExt; use futures_util::stream::FuturesUnordered; +use tokio::sync::Notify; use uniflight::Merger; fn unreachable_future() -> std::future::Pending<String> { std::future::pending() } +/// Waits for a [`Notify`] signal with a generous timeout, panicking with `msg` +/// if the signal is not received. Prevents test hangs if synchronization breaks. 
+async fn await_notify(notify: &Notify, msg: &str) { + tokio::time::timeout(Duration::from_secs(5), notify.notified()).await.expect(msg); +} + #[cfg_attr(miri, ignore)] #[tokio::test] async fn direct_call() { @@ -152,46 +159,48 @@ async fn cancel() { #[tokio::test] async fn leader_panic_returns_error_to_all() { let group: Arc> = Arc::new(Merger::new()); - - // First task will panic (caught by catch_unwind) - let group_clone = Arc::clone(&group); - let leader_handle = tokio::spawn(async move { - group_clone - .execute("key", || async { - tokio::time::sleep(Duration::from_millis(50)).await; - panic!("leader panicked"); - #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] - "never".to_string() - }) - .await + let leader_registered = Arc::new(Notify::new()); + let follower_registered = Arc::new(Notify::new()); + + // Leader: registers cell, signals readiness, waits for follower, then panics. + let leader_handle = tokio::spawn({ + let group = Arc::clone(&group); + let leader_registered = Arc::clone(&leader_registered); + let follower_registered = Arc::clone(&follower_registered); + async move { + let fut = group.execute("key", || async move { + // Wait until follower is actively waiting on our cell before panicking + await_notify(&follower_registered, "follower should register before timeout").await; + panic!("leader panicked") + }); + leader_registered.notify_one(); + fut.await + } }); - // Give time for the spawned task to register and start - tokio::time::sleep(Duration::from_millis(10)).await; + await_notify(&leader_registered, "leader should register before timeout").await; - // Second task joins as a follower - let group_clone = Arc::clone(&group); - let follower_handle = tokio::spawn(async move { - group_clone - .execute("key", || async { + // Follower: finds leader's cell, signals readiness, then awaits the result. 
+ let follower_handle = tokio::spawn({ + let group = Arc::clone(&group); + let follower_registered = Arc::clone(&follower_registered); + async move { + let fut = group.execute("key", || async { // This should never run - we're a follower "follower result".to_string() - }) - .await + }); + // Cell is joined. Signal the leader to proceed. + follower_registered.notify_one(); + fut.await + } }); // Leader gets LeaderPanicked error (panic is caught, not propagated) - let leader_result = leader_handle.await.expect("task should not panic - panic is caught"); - let Err(leader_err) = leader_result else { - panic!("expected Err, got Ok"); - }; + let leader_err = leader_handle.await.expect("task should not panic - panic is caught").unwrap_err(); assert_eq!(leader_err.message(), "leader panicked"); // Follower also gets LeaderPanicked error with same message - let follower_result = follower_handle.await.expect("follower task should not panic"); - let Err(follower_err) = follower_result else { - panic!("expected Err, got Ok"); - }; + let follower_err = follower_handle.await.expect("follower task should not panic").unwrap_err(); assert_eq!(follower_err.message(), "leader panicked"); } @@ -277,13 +286,7 @@ async fn clone_shares_state() { async fn leader_panicked_error_traits() { // Create an error by triggering a panic let group: Merger = Merger::new(); - let result = group - .execute("key", || async { - panic!("test message"); - #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] - "never".to_string() - }) - .await; + let result = group.execute("key", || async { panic!("test message") }).await; let Err(error) = result else { panic!("expected Err"); }; @@ -318,13 +321,7 @@ async fn retry_after_panic_succeeds() { let group: Merger = Merger::new(); // First call panics - let result = group - .execute("key", || async { - panic!("intentional panic"); - #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] - 
"never".to_string() - }) - .await; + let result = group.execute("key", || async { panic!("intentional panic") }).await; let Err(err) = result else { panic!("expected Err"); }; @@ -357,9 +354,7 @@ async fn mixed_panic_and_success() { // Start multiple keys concurrently - some panic, some succeed let panic_fut = group.execute("panic_key", || async { tokio::time::sleep(Duration::from_millis(10)).await; - panic!("intentional panic"); - #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] - "never".to_string() + panic!("intentional panic") }); let success_fut = group.execute("success_key", || async { @@ -384,45 +379,45 @@ async fn mixed_panic_and_success() { async fn follower_closure_not_called_on_panic() { let group: Arc> = Arc::new(Merger::new()); let follower_called = Arc::new(AtomicUsize::new(0)); - - // Leader will panic - let group_clone = Arc::clone(&group); - let leader_handle = tokio::spawn(async move { - group_clone - .execute("key", || async { - tokio::time::sleep(Duration::from_millis(50)).await; - panic!("leader panic"); - #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] - "never".to_string() - }) - .await + let leader_registered = Arc::new(Notify::new()); + let follower_registered = Arc::new(Notify::new()); + + // Leader: registers cell, signals readiness, waits for follower, then panics. 
+ let leader_handle = tokio::spawn({ + let group = Arc::clone(&group); + let leader_registered = Arc::clone(&leader_registered); + let follower_registered = Arc::clone(&follower_registered); + async move { + let fut = group.execute("key", || async move { + await_notify(&follower_registered, "follower should register before timeout").await; + panic!("leader panic") + }); + leader_registered.notify_one(); + fut.await + } }); - // Give leader time to start - tokio::time::sleep(Duration::from_millis(10)).await; + await_notify(&leader_registered, "leader should register before timeout").await; - // Follower joins - its closure should NOT be called - let group_clone = Arc::clone(&group); - let follower_called_clone = Arc::clone(&follower_called); - let follower_handle = tokio::spawn(async move { - group_clone - .execute("key", || async { - follower_called_clone.fetch_add(1, Acquire); + // Follower: finds leader's cell, signals readiness, then awaits. + let follower_handle = tokio::spawn({ + let group = Arc::clone(&group); + let follower_called = Arc::clone(&follower_called); + let follower_registered = Arc::clone(&follower_registered); + async move { + let fut = group.execute("key", || async { + follower_called.fetch_add(1, AcqRel); "follower result".to_string() - }) - .await + }); + follower_registered.notify_one(); + fut.await + } }); - let (leader_result, follower_result) = tokio::join!(leader_handle, follower_handle); - - let Err(leader_err) = leader_result.expect("task join") else { - panic!("expected Err"); - }; + let leader_err = leader_handle.await.expect("task join").unwrap_err(); assert_eq!(leader_err.message(), "leader panic"); - let Err(follower_err) = follower_result.expect("task join") else { - panic!("expected Err"); - }; + let follower_err = follower_handle.await.expect("task join").unwrap_err(); assert_eq!(follower_err.message(), "leader panic"); // Follower's closure was never called