From e09e838a5bce459d072dddc6aa6bd42f188679a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 13 May 2026 18:43:21 +0200 Subject: [PATCH 01/10] feat: Only wait for the start of the draining period in `Endpoint::wait_idle` Instead of waiting for draining to have finished. This allows dropping the `Endpoint` once all connections became inactive and there is no need to wait for all connections to have drained. --- noq-proto/src/connection/mod.rs | 4 ++++ noq-proto/src/endpoint.rs | 3 +++ noq-proto/src/shared.rs | 7 ++++++ noq/src/endpoint.rs | 39 +++++++++++++++++++++++++++++++-- noq/src/tests.rs | 8 ++++--- 5 files changed, 56 insertions(+), 5 deletions(-) diff --git a/noq-proto/src/connection/mod.rs b/noq-proto/src/connection/mod.rs index c708aa71f..95b1a582b 100644 --- a/noq-proto/src/connection/mod.rs +++ b/noq-proto/src/connection/mod.rs @@ -4381,6 +4381,7 @@ impl Connection { } ConnectionError::VersionMismatch => { self.state.move_to_draining(Some(conn_err)); + self.endpoint_events.push_back(EndpointEventInner::Draining); } ConnectionError::LocallyClosed => { unreachable!("LocallyClosed isn't generated by packet processing"); @@ -4498,6 +4499,7 @@ impl Connection { if let Frame::Close(_error) = frame { self.state.move_to_draining(None); + self.endpoint_events.push_back(EndpointEventInner::Draining); break; } } @@ -4827,6 +4829,7 @@ impl Connection { } Frame::Close(reason) => { self.state.move_to_draining(Some(reason.into())); + self.endpoint_events.push_back(EndpointEventInner::Draining); return Ok(()); } _ => { @@ -5578,6 +5581,7 @@ impl Connection { if let Some(reason) = close { self.state.move_to_draining(Some(reason.into())); + self.endpoint_events.push_back(EndpointEventInner::Draining); self.connection_close_pending = true; } diff --git a/noq-proto/src/endpoint.rs b/noq-proto/src/endpoint.rs index 17c7c9555..bb885d5b6 100644 --- a/noq-proto/src/endpoint.rs +++ b/noq-proto/src/endpoint.rs @@ -129,6 +129,9 @@ impl Endpoint { } } } + Draining => { + // Nothing to do. + } Drained => { if let Some(conn) = self.connections.try_remove(ch.0) { self.index.remove(&conn); diff --git a/noq-proto/src/shared.rs b/noq-proto/src/shared.rs index 4097af02d..703ee9cf7 100644 --- a/noq-proto/src/shared.rs +++ b/noq-proto/src/shared.rs @@ -48,10 +48,17 @@ impl EndpointEvent { pub fn is_drained(&self) -> bool { self.0 == EndpointEventInner::Drained } + + /// Whether this is the event is the event indicating the start of the draining period. + pub fn is_draining(&self) -> bool { + self.0 == EndpointEventInner::Draining + } } #[derive(Clone, Debug, Eq, PartialEq)] pub(crate) enum EndpointEventInner { + /// The connection started draining + Draining, /// The connection has been drained Drained, /// The connection has a new active reset token diff --git a/noq/src/endpoint.rs b/noq/src/endpoint.rs index d4b20d63c..09fec3c9f 100644 --- a/noq/src/endpoint.rs +++ b/noq/src/endpoint.rs @@ -376,7 +376,7 @@ impl Endpoint { /// rejected. Consider calling [`close()`] if that is desired. /// /// [`close()`]: Endpoint::close - pub async fn wait_idle(&self) { + pub async fn wait_drained(&self) { loop { { let endpoint = &mut *self.inner.state.lock().unwrap(); @@ -389,6 +389,21 @@ impl Endpoint { .await; } } + + /// TODO(matheus23): docs + pub async fn wait_idle(&self) { + loop { + { + let endpoint = &mut *self.inner.state.lock().unwrap(); + if endpoint.recv_state.connections.active_connections == 0 { + break; + } + // Construct future while lock is held to avoid race + self.inner.shared.draining.notified() + } + .await; + } + } } /// Statistics on [Endpoint] activity @@ -467,6 +482,7 @@ impl Drop for EndpointDriver { // Drop all outgoing channels, signaling the termination of the endpoint to the associated // connections. endpoint.recv_state.connections.senders.clear(); + endpoint.recv_state.connections.active_connections = 0; } } @@ -551,6 +567,7 @@ pub(crate) struct State { #[derive(Debug)] pub(crate) struct Shared { incoming: Notify, + draining: Notify, idle: Notify, /// Number of live handles that can be used to initiate or handle I/O; excludes the driver ref_count: AtomicUsize, @@ -602,7 +619,17 @@ impl State { } }; - if event.is_drained() { + if event.is_draining() { + self.recv_state.connections.active_connections -= 1; + tracing::error!( + active_connections = self.recv_state.connections.active_connections, + "active_connections -= 1" + ); + if self.recv_state.connections.active_connections == 0 { + tracing::error!("active_connections == 0, draining"); + shared.draining.notify_waiters(); + } + } else if event.is_drained() { self.recv_state.connections.senders.remove(&ch); if self.recv_state.connections.is_empty() { shared.idle.notify_waiters(); @@ -700,6 +727,8 @@ struct ConnectionSet { sender: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>, /// Set if the endpoint has been manually closed close: Option<(VarInt, Bytes)>, + /// Counter for all active (non-draining/drained) connections. + active_connections: u64, } impl ConnectionSet { @@ -719,6 +748,10 @@ impl ConnectionSet { .unwrap(); } self.senders.insert(handle, send); + if self.close.is_none() { + self.active_connections += 1; + tracing::error!(self.active_connections, "active_connections += 1"); + } Connecting::new(handle, conn, self.sender.clone(), recv, sender, runtime) } @@ -789,6 +822,7 @@ impl EndpointRef { Self(Arc::new(EndpointInner { shared: Shared { incoming: Notify::new(), + draining: Notify::new(), idle: Notify::new(), ref_count: AtomicUsize::new(0), }, @@ -864,6 +898,7 @@ impl RecvState { senders: FxHashMap::default(), sender, close: None, + active_connections: 0, }, incoming: VecDeque::new(), recv_buf: recv_buf.into(), diff --git a/noq/src/tests.rs b/noq/src/tests.rs index c6629e90f..1fedf1e24 100755 --- a/noq/src/tests.rs +++ b/noq/src/tests.rs @@ -23,7 +23,9 @@ use std::{ use crate::runtime::TokioRuntime; use crate::{Duration, Instant}; use bytes::Bytes; -use proto::{ConnectionError, PathId, RandomConnectionIdGenerator, crypto::rustls::QuicClientConfig}; +use proto::{ + ConnectionError, PathId, RandomConnectionIdGenerator, crypto::rustls::QuicClientConfig, +}; use rand::{Rng, SeedableRng, rngs::StdRng}; use rustls::{ RootCertStore, @@ -1207,7 +1209,7 @@ async fn weak_connection_handle() { assert!(weak.is_alive()); drop(conn); // wait to ensure the connection is fully cleaned up - endpoint2.wait_idle().await; + endpoint2.wait_drained().await; assert!(!weak.is_alive()); }); let client_task = tokio::spawn(async move { @@ -1343,7 +1345,7 @@ async fn path_clone_stats_after_abandon() { // After dropping the path, upgrading fails after the endpoint cleared the connection. drop(path_clone); - client.wait_idle().await; + client.wait_drained().await; assert!(weak_path.upgrade().is_none()); } .instrument(info_span!("client")); From 188306b6a7260d8233bdb81b75592bf53b2f9933 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 13 May 2026 19:14:09 +0200 Subject: [PATCH 02/10] Try sth --- noq-proto/src/connection/mod.rs | 15 ++++++++++++--- noq-proto/src/connection/state.rs | 29 +++++++++++++++++++---------- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/noq-proto/src/connection/mod.rs b/noq-proto/src/connection/mod.rs index 95b1a582b..6c00649aa 100644 --- a/noq-proto/src/connection/mod.rs +++ b/noq-proto/src/connection/mod.rs @@ -2386,7 +2386,10 @@ impl Connection { match timer { Timer::Conn(timer) => match timer { ConnTimer::Close => { - self.state.move_to_drained(None); + let was_draining = self.state.move_to_drained(None); + if !was_draining { + self.endpoint_events.push_back(EndpointEventInner::Draining); + } // move_to_drained checks that we weren't in drained before. // Adding events to endpoint_events is only legal if `Drained` was never queued before. self.endpoint_events.push_back(EndpointEventInner::Drained); @@ -4370,7 +4373,10 @@ impl Connection { code: TransportErrorCode::AEAD_LIMIT_REACHED, .. }) => { - self.state.move_to_drained(Some(conn_err)); + let was_draining = self.state.move_to_drained(Some(conn_err)); + if !was_draining { + self.endpoint_events.push_back(EndpointEventInner::Draining); + } } ConnectionError::TimedOut => { unreachable!("timeouts aren't generated by packet processing"); @@ -6950,7 +6956,10 @@ impl Connection { /// Terminate the connection instantly, without sending a close packet fn kill(&mut self, reason: ConnectionError) { self.close_common(); - self.state.move_to_drained(Some(reason)); + let was_draining = self.state.move_to_drained(Some(reason)); + if !was_draining { + self.endpoint_events.push_back(EndpointEventInner::Draining); + } // move_to_drained checks that we were never in drained before, so we // never sent a `Drained` event before (it's illegal to send more events after drained). self.endpoint_events.push_back(EndpointEventInner::Drained); diff --git a/noq-proto/src/connection/state.rs b/noq-proto/src/connection/state.rs index 3e6ce2975..c5dc18e56 100644 --- a/noq-proto/src/connection/state.rs +++ b/noq-proto/src/connection/state.rs @@ -2,7 +2,9 @@ use bytes::Bytes; use tracing::trace; use crate::frame::Close; -use crate::{ApplicationClose, ConnectionClose, ConnectionError, TransportError, TransportErrorCode}; +use crate::{ + ApplicationClose, ConnectionClose, ConnectionError, TransportError, TransportErrorCode, +}; #[allow(unreachable_pub)] // fuzzing only #[derive(Debug, Clone)] @@ -66,14 +68,20 @@ impl State { /// Moves to the drained state. /// /// Panics if the state was already drained. - pub(super) fn move_to_drained(&mut self, error: Option) { - let (error, is_local) = if let Some(error) = error { - (Some(error), false) + /// + /// Returns whether we were in the draining state before. + pub(super) fn move_to_drained(&mut self, error: Option) -> bool { + let (error, is_local, was_draining) = if let Some(error) = error { + ( + Some(error), + false, + matches!(self.inner, InnerState::Draining { .. }), + ) } else { - let error = match &mut self.inner { - InnerState::Draining { error, .. } => error.take(), + let (error, was_draining) = match &mut self.inner { + InnerState::Draining { error, .. } => (error.take(), true), InnerState::Drained { .. } => panic!("invalid state transition drained -> drained"), - InnerState::Closed { error_read, .. } if *error_read => None, + InnerState::Closed { error_read, .. } if *error_read => (None, false), InnerState::Closed { remote_reason, .. } => { let error = match remote_reason.clone().into() { ConnectionError::ConnectionClosed(close) => { @@ -89,14 +97,15 @@ impl State { } e => e, }; - Some(error) + (Some(error), false) } - InnerState::Handshake(_) | InnerState::Established => None, + InnerState::Handshake(_) | InnerState::Established => (None, false), }; - (error, self.is_local_close()) + (error, self.is_local_close(), was_draining) }; self.inner = InnerState::Drained { error, is_local }; trace!("connection state: drained"); + was_draining } /// Moves to a draining state. From 6ad2fa983df1409a785cb47ec0912390f690e0af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Mon, 18 May 2026 15:29:22 +0200 Subject: [PATCH 03/10] Add documentation and test for the draining state. Also: - Remove `drained_connections`, it was unused - Add `draining_connections`, so we can test this property of connections - Add `without_time` to the logging setup. We never need timestamps - Add a log line for printing the frames that are processed in the "closed" state --- noq-proto/src/connection/mod.rs | 2 ++ noq-proto/src/tests/mod.rs | 51 +++++++++++++++++++++++++++++++++ noq-proto/src/tests/util.rs | 12 ++++---- noq/src/endpoint.rs | 32 +++++++++++++++------ 4 files changed, 82 insertions(+), 15 deletions(-) diff --git a/noq-proto/src/connection/mod.rs b/noq-proto/src/connection/mod.rs index 6c00649aa..912948d9f 100644 --- a/noq-proto/src/connection/mod.rs +++ b/noq-proto/src/connection/mod.rs @@ -4498,6 +4498,8 @@ impl Connection { continue; }; + debug!(?frame, "processing frame in closed state"); + self.path_stats .for_path(path_id) .frame_rx diff --git a/noq-proto/src/tests/mod.rs b/noq-proto/src/tests/mod.rs index 2a088b048..b9bdd3bde 100644 --- a/noq-proto/src/tests/mod.rs +++ b/noq-proto/src/tests/mod.rs @@ -4320,3 +4320,54 @@ fn regression_close_without_connection_event() { Some(Event::ConnectionLost { .. }) ); } + +/// Ensures that the draining delay for the server is exactly 0.5 RTT and 1 RTT for the client. +/// +/// The draining delay is the time between the connection being closed and the connection +/// entering the "draining" state (either on the same or on the other side). +/// +/// We expect the side that *receives* the CONNECTION_CLOSE to immediately enter the draining +/// state. However in absolute terms, it'll be delayed by 0.5 RTT (exactly the latency) compared +/// to when `connection.close()` was called. +/// On the side that called `connection.close()` we first enter the "closed" state, and only +/// enter the "draining" state once we *receive* a "reciprocal" CONNECTION_CLOSE from the other +/// side. In the normal case this will be exactly 1 RTT after calling `connection.close()` to +/// account for the latency of CONNECTION_CLOSE going one way and then coming back. +/// +/// The "draining" state from noq-proto is observed by noq to enable `wait_idle` waiting the +/// ideal amount of time before allowing us to close the socket. +#[test] +fn timely_graceful_close() { + let mut pair = Pair::default(); + pair.latency = Duration::from_millis(100); + let (client_ch, server_ch) = pair.connect(); + + let _guard = subscribe(); + let start = pair.time; + let now = pair.time; + pair.client_conn_mut(client_ch) + .close(now, 0u32.into(), Bytes::from_static(b"done!")); + + assert!(!pair.client.draining_connections.contains(&client_ch)); + assert!(!pair.server.draining_connections.contains(&server_ch)); + + pair.drive_client(); + pair.advance_time(); + let now = pair.time; + pair.drive_server(); + + assert!(pair.server.draining_connections.contains(&server_ch)); + let server_draining_delay = now.saturating_duration_since(start); + info!(?server_draining_delay); + assert_eq!(server_draining_delay, Duration::from_millis(100)); + + // already drove server + pair.advance_time(); + let now = pair.time; + pair.drive_client(); + + assert!(pair.client.draining_connections.contains(&client_ch)); + let client_draining_delay = now.saturating_duration_since(start); + info!(?client_draining_delay); + assert_eq!(client_draining_delay, Duration::from_millis(200)); +} diff --git a/noq-proto/src/tests/util.rs b/noq-proto/src/tests/util.rs index b36514e64..b049788b6 100644 --- a/noq-proto/src/tests/util.rs +++ b/noq-proto/src/tests/util.rs @@ -829,7 +829,7 @@ pub(super) struct TestEndpoint { pub(super) inbound: VecDeque, pub(super) accepted: Option>, pub(super) connections: HashMap, - drained_connections: HashSet, + pub(super) draining_connections: HashSet, conn_events: HashMap>, pub(super) captured_packets: Vec>, pub(super) capture_inbound_packets: bool, @@ -872,7 +872,7 @@ impl TestEndpoint { inbound: VecDeque::new(), accepted: None, connections: HashMap::default(), - drained_connections: HashSet::default(), + draining_connections: HashSet::default(), conn_events: HashMap::default(), captured_packets: Vec::new(), capture_inbound_packets: false, @@ -975,8 +975,8 @@ impl TestEndpoint { } for (ch, event) in endpoint_events { - if event.is_drained() { - self.drained_connections.insert(ch); + if event.is_draining() { + self.draining_connections.insert(ch); } if let Some(event) = self.handle_event(ch, event) && let Some(conn) = self.connections.get_mut(&ch) @@ -1084,11 +1084,9 @@ pub(crate) fn subscribe() -> tracing::subscriber::DefaultGuard { .with_default_directive(tracing::Level::TRACE.into()) .from_env_lossy(), ) + .without_time() .with_line_number(true) .with_writer(|| TestWriter); - // tracing uses std::time to trace time, which panics in wasm. - #[cfg(all(target_family = "wasm", target_os = "unknown"))] - let builder = builder.without_time(); tracing::subscriber::set_default(builder.finish()) } diff --git a/noq/src/endpoint.rs b/noq/src/endpoint.rs index 09fec3c9f..c44e1c582 100644 --- a/noq/src/endpoint.rs +++ b/noq/src/endpoint.rs @@ -366,16 +366,16 @@ impl Endpoint { } } - /// Wait for all connections on the endpoint to be cleanly shut down + /// Waits for all connections on the endpoint to be cleanly shut down and drained. /// - /// Waiting for this condition before exiting ensures that a good-faith effort is made to notify - /// peers of recent connection closes, whereas exiting immediately could force them to wait out - /// the idle timeout period. + /// This is equivalent to [`wait_idle()`] with additionally waiting for the connections to be + /// drained. Please see its documentation for more information. /// - /// Does not proactively close existing connections or cause incoming connections to be - /// rejected. Consider calling [`close()`] if that is desired. + /// Use `wait_drained()` in favor of `wait_idle()` if you care about waiting for the + /// [`Connection`] structs to be dropped. /// - /// [`close()`]: Endpoint::close + /// [`wait_idle()`]: Self::wait_idle + /// [`Connection`]: crate::Connection pub async fn wait_drained(&self) { loop { { @@ -390,7 +390,23 @@ impl Endpoint { } } - /// TODO(matheus23): docs + /// Waits for all connections on the endpoint to be ready for shutting down. + /// + /// Waiting for this condition before exiting ensures that a good-faith effort is made to notify + /// peers of recent connection closes, whereas exiting immediately could force them to wait out + /// the idle timeout period. + /// + /// Does not proactively close existing connections or cause incoming connections to be + /// rejected. Consider calling [`close()`] if that is desired. + /// + /// Unlike [`wait_drained()`], this doesn't wait for the full draining period, so it can't be + /// used to wait for all now-idle [`Connection`]s to be dropped. + /// + /// See also this section in the QUIC RFC: + /// + /// [`close()`]: Self::close + /// [`wait_drained()`]: Self::wait_drained + /// [`Connection`]: crate::Connection pub async fn wait_idle(&self) { loop { { From 259ef2bc1d97c4e5b4d855b90726c81706463fad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Mon, 18 May 2026 15:39:10 +0200 Subject: [PATCH 04/10] `cargo make format` --- noq-proto/src/connection/state.rs | 4 +--- noq/src/tests.rs | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/noq-proto/src/connection/state.rs b/noq-proto/src/connection/state.rs index c5dc18e56..06b593711 100644 --- a/noq-proto/src/connection/state.rs +++ b/noq-proto/src/connection/state.rs @@ -2,9 +2,7 @@ use bytes::Bytes; use tracing::trace; use crate::frame::Close; -use crate::{ - ApplicationClose, ConnectionClose, ConnectionError, TransportError, TransportErrorCode, -}; +use crate::{ApplicationClose, ConnectionClose, ConnectionError, TransportError, TransportErrorCode}; #[allow(unreachable_pub)] // fuzzing only #[derive(Debug, Clone)] diff --git a/noq/src/tests.rs b/noq/src/tests.rs index 1fedf1e24..730e9dc05 100755 --- a/noq/src/tests.rs +++ b/noq/src/tests.rs @@ -23,9 +23,7 @@ use std::{ use crate::runtime::TokioRuntime; use crate::{Duration, Instant}; use bytes::Bytes; -use proto::{ - ConnectionError, PathId, RandomConnectionIdGenerator, crypto::rustls::QuicClientConfig, -}; +use proto::{ConnectionError, PathId, RandomConnectionIdGenerator, crypto::rustls::QuicClientConfig}; use rand::{Rng, SeedableRng, rngs::StdRng}; use rustls::{ RootCertStore, From 466f1ea55e460f7fb23aedcd826087c90d00a404 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 19 May 2026 08:38:32 +0200 Subject: [PATCH 05/10] cr: Rewrite and improve `timely_graceful_close` using `ConnPair` --- noq-proto/src/tests/mod.rs | 28 ++++++++++++++++------------ noq-proto/src/tests/util.rs | 7 +++++++ 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/noq-proto/src/tests/mod.rs b/noq-proto/src/tests/mod.rs index b9bdd3bde..76c1d82e9 100644 --- a/noq-proto/src/tests/mod.rs +++ b/noq-proto/src/tests/mod.rs @@ -4338,36 +4338,40 @@ fn regression_close_without_connection_event() { /// ideal amount of time before allowing us to close the socket. #[test] fn timely_graceful_close() { - let mut pair = Pair::default(); - pair.latency = Duration::from_millis(100); - let (client_ch, server_ch) = pair.connect(); + const ONE_WAY_LATENCY: Duration = Duration::from_millis(100); let _guard = subscribe(); + let mut pair = ConnPair::default(); + pair.latency = ONE_WAY_LATENCY; + let start = pair.time; - let now = pair.time; - pair.client_conn_mut(client_ch) - .close(now, 0u32.into(), Bytes::from_static(b"done!")); + pair.close(Client, 0u32.into(), b"done!"); - assert!(!pair.client.draining_connections.contains(&client_ch)); - assert!(!pair.server.draining_connections.contains(&server_ch)); + assert!(!pair.is_draining(Client)); + assert!(!pair.is_draining(Server)); + // The client now sends CONNECTION_CLOSE to the server and it processes it. + // When the server receives CONNECTION_CLOSE, it responds with one of its own + // and enters the draining state. pair.drive_client(); pair.advance_time(); let now = pair.time; pair.drive_server(); - assert!(pair.server.draining_connections.contains(&server_ch)); + assert!(pair.is_draining(Server)); let server_draining_delay = now.saturating_duration_since(start); info!(?server_draining_delay); - assert_eq!(server_draining_delay, Duration::from_millis(100)); + assert_eq!(server_draining_delay, ONE_WAY_LATENCY); + // The server has now sent a CONNECTION_CLOSE back in response and the client processes it. + // The client then enters the draining state once it processed the response. // already drove server pair.advance_time(); let now = pair.time; pair.drive_client(); - assert!(pair.client.draining_connections.contains(&client_ch)); + assert!(pair.is_draining(Client)); let client_draining_delay = now.saturating_duration_since(start); info!(?client_draining_delay); - assert_eq!(client_draining_delay, Duration::from_millis(200)); + assert_eq!(client_draining_delay, ONE_WAY_LATENCY * 2); } diff --git a/noq-proto/src/tests/util.rs b/noq-proto/src/tests/util.rs index b049788b6..ac1a2592d 100644 --- a/noq-proto/src/tests/util.rs +++ b/noq-proto/src/tests/util.rs @@ -812,6 +812,13 @@ impl ConnPair { let now = self.pair.time; self.conn_mut(side).handle_network_change(hint, now); } + + pub(super) fn is_draining(&self, side: Side) -> bool { + match side { + Client => self.client.draining_connections.contains(&self.client_ch), + Server => self.server.draining_connections.contains(&self.server_ch), + } + } } impl Default for Pair { From 4ed7ef6430d6c4c37f6b2c40049a0b64c1b0b6b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 19 May 2026 08:46:59 +0200 Subject: [PATCH 06/10] cr: Remove debug logs & add more explanatory comments --- noq/src/endpoint.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/noq/src/endpoint.rs b/noq/src/endpoint.rs index c44e1c582..8cde0847e 100644 --- a/noq/src/endpoint.rs +++ b/noq/src/endpoint.rs @@ -582,8 +582,17 @@ pub(crate) struct State { #[derive(Debug)] pub(crate) struct Shared { + /// Notifies subscribers of new incoming connections. + /// + /// This enables the `Endpoint::accept` API. incoming: Notify, + /// Notifies subscribers when *all* connections have entered the draining state. + /// + /// This powers the `Endpoint::wait_idle` API. draining: Notify, + /// Notifies subscribesr when *all* connections have been dropped. + /// + /// This powers the `Endpoint::wait_drained` API. idle: Notify, /// Number of live handles that can be used to initiate or handle I/O; excludes the driver ref_count: AtomicUsize, @@ -637,12 +646,7 @@ impl State { if event.is_draining() { self.recv_state.connections.active_connections -= 1; - tracing::error!( - active_connections = self.recv_state.connections.active_connections, - "active_connections -= 1" - ); if self.recv_state.connections.active_connections == 0 { - tracing::error!("active_connections == 0, draining"); shared.draining.notify_waiters(); } } else if event.is_drained() { @@ -744,6 +748,14 @@ struct ConnectionSet { /// Set if the endpoint has been manually closed close: Option<(VarInt, Bytes)>, /// Counter for all active (non-draining/drained) connections. + /// + /// This is directly related to the QUIC connection states "Initial", "Handshake", + /// "Established", "Closed", "Draining" and "Drained" (see also `proto/src/connection/state.rs`). + /// + /// Any connection state that is not "Draining" or "Drained" is considered active. + /// + /// This counter is updated when new connections are added ([`ConnectionSet::insert`]) and when + /// a connection informs us about entering the draining state ([`State::handle_events`]). active_connections: u64, } @@ -766,7 +778,6 @@ impl ConnectionSet { self.senders.insert(handle, send); if self.close.is_none() { self.active_connections += 1; - tracing::error!(self.active_connections, "active_connections += 1"); } Connecting::new(handle, conn, self.sender.clone(), recv, sender, runtime) } From 9780772e0ab3108777c11da5ac2b97ab87952880 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 19 May 2026 08:56:32 +0200 Subject: [PATCH 07/10] test: Ensure latency is set before connecting --- noq-proto/src/tests/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/noq-proto/src/tests/mod.rs b/noq-proto/src/tests/mod.rs index 76c1d82e9..a4a2cb7ca 100644 --- a/noq-proto/src/tests/mod.rs +++ b/noq-proto/src/tests/mod.rs @@ -4341,8 +4341,9 @@ fn timely_graceful_close() { const ONE_WAY_LATENCY: Duration = Duration::from_millis(100); let _guard = subscribe(); - let mut pair = ConnPair::default(); + let mut pair = Pair::default(); pair.latency = ONE_WAY_LATENCY; + let mut pair = ConnPair::connect_with(pair, client_config()); let start = pair.time; pair.close(Client, 0u32.into(), b"done!"); From c7a5f7262f7f92954c3e953471a493afb8fa3721 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 19 May 2026 09:06:00 +0200 Subject: [PATCH 08/10] make clippy happy --- noq-proto/src/tests/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/noq-proto/src/tests/mod.rs b/noq-proto/src/tests/mod.rs index a4a2cb7ca..5455cda67 100644 --- a/noq-proto/src/tests/mod.rs +++ b/noq-proto/src/tests/mod.rs @@ -4346,7 +4346,7 @@ fn timely_graceful_close() { let mut pair = ConnPair::connect_with(pair, client_config()); let start = pair.time; - pair.close(Client, 0u32.into(), b"done!"); + pair.close(Client, 0, b"done!"); assert!(!pair.is_draining(Client)); assert!(!pair.is_draining(Server)); From 9d809b76343355f55c39d2c08beda934a1cb819a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 19 May 2026 11:54:05 +0200 Subject: [PATCH 09/10] Avoid changing the behavior of `wait_idle` and introduce `wait_all_draining` instead. --- bench/src/bin/bulk.rs | 2 +- noq/examples/client.rs | 2 +- noq/examples/connection.rs | 2 +- noq/examples/insecure_connection.rs | 2 +- noq/examples/single_socket.rs | 2 +- noq/src/endpoint.rs | 22 +++++++++++----------- noq/src/tests.rs | 16 ++++++++-------- noq/tests/post_quantum.rs | 2 +- perf/src/client.rs | 2 +- 9 files changed, 26 insertions(+), 26 deletions(-) diff --git a/bench/src/bin/bulk.rs b/bench/src/bin/bulk.rs index 0c268990b..605fd6a68 100644 --- a/bench/src/bin/bulk.rs +++ b/bench/src/bin/bulk.rs @@ -162,7 +162,7 @@ async fn client( // to `Arc`ing them connection.close(0u32.into(), b"Benchmark done"); - endpoint.wait_idle().await; + endpoint.wait_all_draining().await; if opt.stats { println!("\nClient connection stats:\n{:#?}", connection.stats()); diff --git a/noq/examples/client.rs b/noq/examples/client.rs index 2b3bc79c7..0afbca3c8 100644 --- a/noq/examples/client.rs +++ b/noq/examples/client.rs @@ -162,7 +162,7 @@ async fn run(options: Opt) -> Result<()> { conn.close(0u32.into(), b"done"); // Give the server a fair chance to receive the close packet - endpoint.wait_idle().await; + endpoint.wait_all_draining().await; Ok(()) } diff --git a/noq/examples/connection.rs b/noq/examples/connection.rs index 958ca0a8c..ae9a2f07f 100644 --- a/noq/examples/connection.rs +++ b/noq/examples/connection.rs @@ -49,7 +49,7 @@ async fn main() -> Result<(), Box> { let _ = connection.accept_uni().await; // Make sure the server has a chance to clean up - endpoint.wait_idle().await; + endpoint.wait_all_draining().await; Ok(()) } diff --git a/noq/examples/insecure_connection.rs b/noq/examples/insecure_connection.rs index 359a84294..d7fdc0a25 100644 --- a/noq/examples/insecure_connection.rs +++ b/noq/examples/insecure_connection.rs @@ -66,7 +66,7 @@ async fn run_client(server_addr: SocketAddr) -> Result<(), Box Result<(), Box> { ); // Make sure the server has a chance to clean up - client.wait_idle().await; + client.wait_all_draining().await; Ok(()) } diff --git a/noq/src/endpoint.rs b/noq/src/endpoint.rs index 8cde0847e..4caae8ea0 100644 --- a/noq/src/endpoint.rs +++ b/noq/src/endpoint.rs @@ -368,15 +368,15 @@ impl Endpoint { /// Waits for all connections on the endpoint to be cleanly shut down and drained. /// - /// This is equivalent to [`wait_idle()`] with additionally waiting for the connections to be + /// This is equivalent to [`wait_all_draining()`] with additionally waiting for the connections to be /// drained. Please see its documentation for more information. /// - /// Use `wait_drained()` in favor of `wait_idle()` if you care about waiting for the + /// Use `wait_idle()` in favor of `wait_all_draining()` if you care about waiting for the /// [`Connection`] structs to be dropped. /// - /// [`wait_idle()`]: Self::wait_idle + /// [`wait_all_draining()`]: Self::wait_all_draining /// [`Connection`]: crate::Connection - pub async fn wait_drained(&self) { + pub async fn wait_idle(&self) { loop { { let endpoint = &mut *self.inner.state.lock().unwrap(); @@ -399,15 +399,15 @@ impl Endpoint { /// Does not proactively close existing connections or cause incoming connections to be /// rejected. Consider calling [`close()`] if that is desired. /// - /// Unlike [`wait_drained()`], this doesn't wait for the full draining period, so it can't be + /// Unlike [`wait_idle()`], this doesn't wait for the full draining period, so it can't be /// used to wait for all now-idle [`Connection`]s to be dropped. /// /// See also this section in the QUIC RFC: /// /// [`close()`]: Self::close - /// [`wait_drained()`]: Self::wait_drained + /// [`wait_idle()`]: Self::wait_idle /// [`Connection`]: crate::Connection - pub async fn wait_idle(&self) { + pub async fn wait_all_draining(&self) { loop { { let endpoint = &mut *self.inner.state.lock().unwrap(); @@ -415,7 +415,7 @@ impl Endpoint { break; } // Construct future while lock is held to avoid race - self.inner.shared.draining.notified() + self.inner.shared.all_draining.notified() } .await; } @@ -589,7 +589,7 @@ pub(crate) struct Shared { /// Notifies subscribers when *all* connections have entered the draining state. /// /// This powers the `Endpoint::wait_idle` API. - draining: Notify, + all_draining: Notify, /// Notifies subscribesr when *all* connections have been dropped. /// /// This powers the `Endpoint::wait_drained` API. @@ -647,7 +647,7 @@ impl State { if event.is_draining() { self.recv_state.connections.active_connections -= 1; if self.recv_state.connections.active_connections == 0 { - shared.draining.notify_waiters(); + shared.all_draining.notify_waiters(); } } else if event.is_drained() { self.recv_state.connections.senders.remove(&ch); @@ -849,7 +849,7 @@ impl EndpointRef { Self(Arc::new(EndpointInner { shared: Shared { incoming: Notify::new(), - draining: Notify::new(), + all_draining: Notify::new(), idle: Notify::new(), ref_count: AtomicUsize::new(0), }, diff --git a/noq/src/tests.rs b/noq/src/tests.rs index 730e9dc05..bb600819f 100755 --- a/noq/src/tests.rs +++ b/noq/src/tests.rs @@ -408,7 +408,7 @@ async fn zero_rtt() { drop((stream, connection)); - endpoint.wait_idle().await; + endpoint.wait_all_draining().await; } #[test] @@ -583,7 +583,7 @@ fn run_echo(args: EchoArgs) { tokio::spawn(echo(stream)); } }); - server.wait_idle().await; + server.wait_all_draining().await; }); info!("connecting from {} to {}", args.client_addr, server_addr); @@ -614,7 +614,7 @@ fn run_echo(args: EchoArgs) { assert_eq!(data[..], msg[..], "Data mismatch"); } new_conn.close(0u32.into(), b"done"); - client.wait_idle().await; + client.wait_all_draining().await; } .instrument(error_span!("client")), ); @@ -1207,7 +1207,7 @@ async fn weak_connection_handle() { assert!(weak.is_alive()); drop(conn); // wait to ensure the connection is fully cleaned up - endpoint2.wait_drained().await; + endpoint2.wait_idle().await; assert!(!weak.is_alive()); }); let client_task = tokio::spawn(async move { @@ -1257,7 +1257,7 @@ async fn dropped_connection_cleans_up() { }, async { endpoint.accept().await.unwrap().await.unwrap() } ); - endpoint.wait_idle().await; + endpoint.wait_all_draining().await; } /// Test that accessing stats from `Path` works as expected. @@ -1343,7 +1343,7 @@ async fn path_clone_stats_after_abandon() { // After dropping the path, upgrading fails after the endpoint cleared the connection. drop(path_clone); - client.wait_drained().await; + client.wait_idle().await; assert!(weak_path.upgrade().is_none()); } .instrument(info_span!("client")); @@ -1477,7 +1477,7 @@ async fn close_path() -> TestResult { test_done_tx.send(()).expect("not dropped"); - server.wait_idle().await; + server.wait_all_draining().await; TestResult::Ok(()) } @@ -1523,7 +1523,7 @@ async fn close_path() -> TestResult { test_done_rx.await.expect("not dropped"); client.close(0u8.into(), b"test finished"); - client.wait_idle().await; + client.wait_all_draining().await; TestResult::Ok(()) } diff --git a/noq/tests/post_quantum.rs b/noq/tests/post_quantum.rs index 21896e5d6..cfa608f62 100644 --- a/noq/tests/post_quantum.rs +++ b/noq/tests/post_quantum.rs @@ -79,7 +79,7 @@ async fn check_post_quantum_key_exchange(min_mtu: u16) { let _ = connection.accept_uni().await; // Make sure the server has a chance to clean up - endpoint.wait_idle().await; + endpoint.wait_all_draining().await; jh.await.unwrap(); } diff --git a/perf/src/client.rs b/perf/src/client.rs index 92c1318cf..3835c44a2 100644 --- a/perf/src/client.rs +++ b/perf/src/client.rs @@ -201,7 +201,7 @@ pub async fn run(opt: Opt) -> Result<()> { } } - endpoint.wait_idle().await; + endpoint.wait_all_draining().await; #[cfg(feature = "json-output")] if let Some(path) = opt.json { From e948a24c0ce088f6fa0ddf076f728bc1af4e9a33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 19 May 2026 13:44:40 +0200 Subject: [PATCH 10/10] Demote `debug!` to `trace!` --- noq-proto/src/connection/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/noq-proto/src/connection/mod.rs b/noq-proto/src/connection/mod.rs index 912948d9f..892d293b7 100644 --- a/noq-proto/src/connection/mod.rs +++ b/noq-proto/src/connection/mod.rs @@ -4498,7 +4498,7 @@ impl Connection { continue; }; - debug!(?frame, "processing frame in closed state"); + trace!(?frame, "processing frame in closed state"); self.path_stats .for_path(path_id)