From 068218ca6f721c8b3c1f6789521da4080576860a Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 9 Apr 2026 11:23:52 +0100 Subject: [PATCH 1/3] feat(proto): MTU discovery probes should be congestion controlled Previously we sent MTU probes without checking congestion control limits. But at the same time they are accounted as bytes-in-flight, which means you can easily go over the congestion control window if you lose connectivity during MTU probing. Which in turn makes recovering from that congestion slower because more bytes are considered to be in-flight that need to arrive or be declared lost before the window grows enough again. --- noq-proto/src/connection/mod.rs | 80 ++++++++++++++++++------------ noq-proto/src/connection/mtud.rs | 63 +++++++++++++++-------- noq-proto/src/connection/spaces.rs | 19 +++++++ 3 files changed, 109 insertions(+), 53 deletions(-) diff --git a/noq-proto/src/connection/mod.rs b/noq-proto/src/connection/mod.rs index c36751c51..5d4a28ddd 100644 --- a/noq-proto/src/connection/mod.rs +++ b/noq-proto/src/connection/mod.rs @@ -1493,7 +1493,7 @@ impl Connection { self.path_data_mut(path_id).update_pacer(now); let congestion_blocked = self.path_congestion_check(space_id, path_id, transmit, &can_send); - if congestion_blocked != PathBlocked::No { + if congestion_blocked.is_blocked() { if let PathBlocked::Pacing(delay) = congestion_blocked { let resume_time = now + delay; self.timers.set_if_earlier( @@ -1788,20 +1788,57 @@ impl Connection { } } + /// Send an MTU probe on the path if necessary. + /// + /// We MTU probe all paths for which all of the following is true: + /// - We have an active destination CID for the path. + /// - The remote address *and* path are validated. + /// - The path is not abandoned. + /// - The MTU Discovery subsystem wants to probe the path. + /// - We have congestion-control window for the probe. fn poll_transmit_mtu_probe( &mut self, now: Instant, buf: &mut Vec, path_id: PathId, ) -> Option { - let (active_cid, probe_size) = self.get_mtu_probe_data(now, path_id)?; + let active_cid = self.remote_cids.get(&path_id).map(CidQueue::active)?; + let is_eligible = self.path_data(path_id).validated + && !self.path_data(path_id).is_validating_path() + && !self.abandoned_paths.contains(&path_id); + if !is_eligible { + return None; + } - // We are definitely sending a DPLPMTUD probe. + let probe_size = self.path_data_mut(path_id).mtud.next_probe(now)?; let mut transmit = TransmitBuf::new(buf, NonZeroUsize::MIN, probe_size as usize); - transmit.start_new_datagram_with_size(probe_size as usize); + let can_send = SendableFrames { + acks: false, + close: false, + space_specific: true, + other: false, + }; + let congestion_blocked = + self.path_congestion_check(SpaceId::Data, path_id, &transmit, &can_send); + if congestion_blocked.is_blocked() { + if let PathBlocked::Pacing(delay) = congestion_blocked { + let resume_time = now + delay; + self.timers.set_if_earlier( + Timer::PerPath(path_id, PathTimer::Pacing), + resume_time, + self.qlog.with_time(now), + ); + } + return None; + } + // We are definitely sending a DPLPMTUD probe. + transmit.start_new_datagram_with_size(probe_size as usize); let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, active_cid, &mut transmit, self)?; + self.path_data_mut(path_id) + .mtud + .probe_in_flight(builder.packet_number, probe_size); // We implement MTU probes as ping packets padded up to the probe size trace!(?probe_size, "writing MTUD probe"); @@ -1822,33 +1859,6 @@ impl Connection { Some(self.build_transmit(path_id, transmit)) } - /// Returns the CID and probe size if a DPLPMTUD probe is needed. - /// - /// We MTU probe all paths for which all of the following is true: - /// - We have an active destination CID for the path. - /// - The remote address *and* path are validated. - /// - The path is not abandoned. - /// - The MTU Discovery subsystem wants to probe the path. - fn get_mtu_probe_data(&mut self, now: Instant, path_id: PathId) -> Option<(ConnectionId, u16)> { - let active_cid = self.remote_cids.get(&path_id).map(CidQueue::active)?; - let is_eligible = self.path_data(path_id).validated - && !self.path_data(path_id).is_validating_path() - && !self.abandoned_paths.contains(&path_id); - - if !is_eligible { - return None; - } - let next_pn = self.spaces[SpaceId::Data] - .for_path(path_id) - .peek_tx_number(); - let probe_size = self - .path_data_mut(path_id) - .mtud - .poll_transmit(now, next_pn)?; - - Some((active_cid, probe_size)) - } - /// Returns true if there is a further packet to send on [`PathId::ZERO`]. /// /// In other words this is predicting whether the next call to @@ -1926,7 +1936,7 @@ impl Connection { let bytes_to_send = transmit.segment_size() as u64; let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0; - if can_send.other && !need_loss_probe && !can_send.close { + if !need_loss_probe && can_send.is_congestion_controlled() { let path = self.path_data(path_id); if path.in_flight.bytes + bytes_to_send >= path.congestion.window() { trace!( @@ -7256,6 +7266,12 @@ enum PathBlocked { Pacing(Duration), } +impl PathBlocked { + fn is_blocked(&self) -> bool { + *self != Self::No + } +} + /// Fields of `Connection` specific to it being client-side or server-side enum ConnectionSide { Client { diff --git a/noq-proto/src/connection/mtud.rs b/noq-proto/src/connection/mtud.rs index a2cf4350d..537ac6c32 100644 --- a/noq-proto/src/connection/mtud.rs +++ b/noq-proto/src/connection/mtud.rs @@ -65,11 +65,21 @@ impl MtuDiscovery { /// Returns the amount of bytes that should be sent as an MTU probe, if any. /// /// Returns [`None`] if MTUD discovery is disabled. Otherwise delegates to - /// [`EnabledMtuDiscovery::poll_transmit`]. - pub(crate) fn poll_transmit(&mut self, now: Instant, next_pn: u64) -> Option { + /// [`EnabledMtuDiscovery::next_probe`]. + /// + /// If `Some` is returned the probe **must** be sent, the state of the MTU prober + /// assumes it has been sent. + pub(crate) fn next_probe(&mut self, now: Instant) -> Option { self.state .as_mut() - .and_then(|state| state.poll_transmit(now, self.current_mtu, next_pn)) + .and_then(|state| state.next_probe(now, self.current_mtu)) + } + + pub(crate) fn probe_in_flight(&mut self, pn: u64, size: u16) { + match self.state.as_mut() { + Some(state) => state.probe_in_flight(pn, size), + None => (), + } } /// Notifies the [`MtuDiscovery`] that the peer's `max_udp_payload_size` transport parameter has @@ -187,7 +197,9 @@ impl EnabledMtuDiscovery { /// - A search for a new MTU is in progress. /// - The MTU discovery was completed but the [`MtuDiscoveryConfig::interval`] expired, /// this re-starts a n MTU search. - fn poll_transmit(&mut self, now: Instant, current_mtu: u16, next_pn: u64) -> Option { + /// + /// Once a probe has been sent, [`Self::probe_in_flight`] needs to be called. + fn next_probe(&mut self, now: Instant, current_mtu: u16) -> Option { if let Phase::Initial = &self.phase { // Start the first search self.phase = Phase::Searching(SearchState::new( @@ -216,7 +228,6 @@ impl EnabledMtuDiscovery { // Retransmit lost probes, if any if 0 < state.lost_probe_count && state.lost_probe_count < MAX_PROBE_RETRANSMITS { - state.in_flight_probe = Some(next_pn); return Some(state.last_probed_mtu); } @@ -229,8 +240,6 @@ impl EnabledMtuDiscovery { } if let Some(probe_udp_payload_size) = state.next_mtu_to_probe(last_probe_succeeded) { - state.in_flight_probe = Some(next_pn); - state.last_probed_mtu = probe_udp_payload_size; return Some(probe_udp_payload_size); } else { let next_mtud_activation = now + self.config.interval; @@ -242,6 +251,14 @@ impl EnabledMtuDiscovery { None } + /// Mark an MTU probe as sent. + fn probe_in_flight(&mut self, pn: u64, size: u16) { + if let Phase::Searching(state) = &mut self.phase { + state.in_flight_probe = Some(pn); + state.last_probed_mtu = size; + } + } + /// Called when a packet is acknowledged in [`SpaceId::Data`] /// /// Returns the new `current_mtu` if the packet number corresponds to the in-flight MTU probe @@ -548,7 +565,10 @@ mod tests { ) -> Vec { let mut probed_sizes = Vec::new(); for probe_pn in 1..100 { - let result = mtud.poll_transmit(now, probe_pn); + let result = mtud.next_probe(now); + if let Some(probe_size) = result { + mtud.probe_in_flight(probe_pn, probe_size); + } if completed(mtud) { break; @@ -605,7 +625,7 @@ mod tests { #[test] fn mtu_discovery_disabled_does_nothing() { let mut mtud = MtuDiscovery::disabled(1_200, 1_200); - let probe_size = mtud.poll_transmit(Instant::now(), 0); + let probe_size = mtud.next_probe(Instant::now()); assert_eq!(probe_size, None); } @@ -663,15 +683,12 @@ mod tests { drive_to_completion(&mut mtud, now, 1_500); // Polling right after completion does not cause new packets to be sent - assert_eq!(mtud.poll_transmit(now, 42), None); + assert_eq!(mtud.next_probe(now), None); assert!(completed(&mtud)); assert_eq!(mtud.current_mtu, 1_471); // Polling after the interval has passed does (taking the current mtu as lower bound) - assert_eq!( - mtud.poll_transmit(now + Duration::from_secs(600), 43), - Some(5235) - ); + assert_eq!(mtud.next_probe(now + Duration::from_secs(600)), Some(5235)); match mtud.state.unwrap().phase { Phase::Searching(state) => { @@ -689,11 +706,13 @@ mod tests { let mut mtud = default_mtud(); let mut probe_sizes = (0..4).map(|i| { - let probe_size = mtud.poll_transmit(Instant::now(), i); - assert!(probe_size.is_some(), "no probe returned for packet {i}"); + let probe_size = mtud + .next_probe(Instant::now()) + .expect("no probe returned for packet {i}"); + mtud.probe_in_flight(i, probe_size); mtud.on_probe_lost(); - probe_size.unwrap() + probe_size }); // After the first probe is lost, it gets retransmitted twice @@ -744,7 +763,7 @@ mod tests { #[should_panic(expected = "Transport parameters received after MTU probing started")] fn mtu_discovery_with_peer_max_udp_payload_size_during_search_panics() { let mut mtud = default_mtud(); - assert!(mtud.poll_transmit(Instant::now(), 0).is_some()); + assert!(mtud.next_probe(Instant::now()).is_some()); assert!(matches!( mtud.state.as_ref().unwrap().phase, Phase::Searching(_) @@ -805,9 +824,11 @@ mod tests { iterations += 1; let probe_pn = i * 2 - 1; - let other_pn = i * 2; - let result = mtud.poll_transmit(Instant::now(), probe_pn); + let result = mtud.next_probe(Instant::now()); + if let Some(size) = result { + mtud.probe_in_flight(probe_pn, size); + } if completed(&mtud) { break; @@ -818,7 +839,7 @@ mod tests { assert!(mtud.in_flight_mtu_probe().is_some()); // Nothing else to send while the probe is in-flight - assert_matches!(mtud.poll_transmit(now, other_pn), None); + assert_matches!(mtud.next_probe(now), None); if i % 2 == 0 { // ACK probe and ensure it results in an increase of current_mtu diff --git a/noq-proto/src/connection/spaces.rs b/noq-proto/src/connection/spaces.rs index 8cb3f89a1..e42e124bf 100644 --- a/noq-proto/src/connection/spaces.rs +++ b/noq-proto/src/connection/spaces.rs @@ -967,6 +967,25 @@ impl SendableFrames { } = *self; !acks && !close && !space_specific && !other } + + /// Whether the packet will be congestion controlled. + /// + /// Packets with only ACK and CONNECTION_CLOSE frames are not congestion controlled. + pub(super) fn is_congestion_controlled(&self) -> bool { + let Self { + acks, + close, + space_specific, + other, + } = *self; + // ACK-only packets are not congestion controlled. + // TODO(flub): We do not congestion-control CONNECTION_CLOSE frames currently. It is + // unclear how correct this is. I suspect this wrong for the first + // CONNECTION_CLOSE being sent, but correct for any further CONNECTION_CLOSE that + // is sent in response to more incoming packets. + let is_ack_only = acks && !(space_specific || other); + !is_ack_only && !close + } } impl ::std::ops::BitOrAssign for SendableFrames { From b5943aa52538c03534fa8275cfb23ea4cc1d09bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 16 Apr 2026 08:59:04 +0200 Subject: [PATCH 2/3] Write a test verifying that `next_probe` is idempotent (As long as `probe_in_flight` hasn't been called yet) --- noq-proto/src/connection/mtud.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/noq-proto/src/connection/mtud.rs b/noq-proto/src/connection/mtud.rs index 537ac6c32..2aee3e903 100644 --- a/noq-proto/src/connection/mtud.rs +++ b/noq-proto/src/connection/mtud.rs @@ -891,6 +891,22 @@ mod tests { assert_eq!(state.upper_bound, 1450); } + #[test] + fn next_probe_should_be_idempotent() { + let mut config = MtuDiscoveryConfig::default(); + config.upper_bound(MAX_UDP_PAYLOAD); + let mut mtud = MtuDiscovery::new(1200, 1200, None, config); + + let now = Instant::now(); + let first_probe = mtud.next_probe(now); + let second_probe = mtud.next_probe(now); + assert_eq!(first_probe, second_probe); + + mtud.probe_in_flight(1, first_probe.unwrap()); + let third_probe = mtud.next_probe(now); + assert_ne!(first_probe, third_probe); + } + // Loss of packets larger than have been acknowledged should indicate a black hole #[test] fn simple_black_hole_detection() { From a39d6eece38699e5360f676ac6b3ea8512242559 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 16 Apr 2026 10:47:52 +0200 Subject: [PATCH 3/3] Make clippy happy --- noq-proto/src/connection/mtud.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/noq-proto/src/connection/mtud.rs b/noq-proto/src/connection/mtud.rs index 2aee3e903..be5d0c720 100644 --- a/noq-proto/src/connection/mtud.rs +++ b/noq-proto/src/connection/mtud.rs @@ -76,9 +76,8 @@ impl MtuDiscovery { } pub(crate) fn probe_in_flight(&mut self, pn: u64, size: u16) { - match self.state.as_mut() { - Some(state) => state.probe_in_flight(pn, size), - None => (), + if let Some(state) = self.state.as_mut() { + state.probe_in_flight(pn, size); } }