diff --git a/noq-proto/src/connection/mod.rs b/noq-proto/src/connection/mod.rs index c36751c51..5d4a28ddd 100644 --- a/noq-proto/src/connection/mod.rs +++ b/noq-proto/src/connection/mod.rs @@ -1493,7 +1493,7 @@ impl Connection { self.path_data_mut(path_id).update_pacer(now); let congestion_blocked = self.path_congestion_check(space_id, path_id, transmit, &can_send); - if congestion_blocked != PathBlocked::No { + if congestion_blocked.is_blocked() { if let PathBlocked::Pacing(delay) = congestion_blocked { let resume_time = now + delay; self.timers.set_if_earlier( @@ -1788,20 +1788,57 @@ impl Connection { } } + /// Send an MTU probe on the path if necessary. + /// + /// We MTU probe all paths for which all of the following is true: + /// - We have an active destination CID for the path. + /// - The remote address *and* path are validated. + /// - The path is not abandoned. + /// - The MTU Discovery subsystem wants to probe the path. + /// - We have congestion-control window for the probe. fn poll_transmit_mtu_probe( &mut self, now: Instant, buf: &mut Vec, path_id: PathId, ) -> Option { - let (active_cid, probe_size) = self.get_mtu_probe_data(now, path_id)?; + let active_cid = self.remote_cids.get(&path_id).map(CidQueue::active)?; + let is_eligible = self.path_data(path_id).validated + && !self.path_data(path_id).is_validating_path() + && !self.abandoned_paths.contains(&path_id); + if !is_eligible { + return None; + } - // We are definitely sending a DPLPMTUD probe. + let probe_size = self.path_data_mut(path_id).mtud.next_probe(now)?; let mut transmit = TransmitBuf::new(buf, NonZeroUsize::MIN, probe_size as usize); - transmit.start_new_datagram_with_size(probe_size as usize); + let can_send = SendableFrames { + acks: false, + close: false, + space_specific: true, + other: false, + }; + let congestion_blocked = + self.path_congestion_check(SpaceId::Data, path_id, &transmit, &can_send); + if congestion_blocked.is_blocked() { + if let PathBlocked::Pacing(delay) = congestion_blocked { + let resume_time = now + delay; + self.timers.set_if_earlier( + Timer::PerPath(path_id, PathTimer::Pacing), + resume_time, + self.qlog.with_time(now), + ); + } + return None; + } + // We are definitely sending a DPLPMTUD probe. + transmit.start_new_datagram_with_size(probe_size as usize); let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, active_cid, &mut transmit, self)?; + self.path_data_mut(path_id) + .mtud + .probe_in_flight(builder.packet_number, probe_size); // We implement MTU probes as ping packets padded up to the probe size trace!(?probe_size, "writing MTUD probe"); @@ -1822,33 +1859,6 @@ impl Connection { Some(self.build_transmit(path_id, transmit)) } - /// Returns the CID and probe size if a DPLPMTUD probe is needed. - /// - /// We MTU probe all paths for which all of the following is true: - /// - We have an active destination CID for the path. - /// - The remote address *and* path are validated. - /// - The path is not abandoned. - /// - The MTU Discovery subsystem wants to probe the path. - fn get_mtu_probe_data(&mut self, now: Instant, path_id: PathId) -> Option<(ConnectionId, u16)> { - let active_cid = self.remote_cids.get(&path_id).map(CidQueue::active)?; - let is_eligible = self.path_data(path_id).validated - && !self.path_data(path_id).is_validating_path() - && !self.abandoned_paths.contains(&path_id); - - if !is_eligible { - return None; - } - let next_pn = self.spaces[SpaceId::Data] - .for_path(path_id) - .peek_tx_number(); - let probe_size = self - .path_data_mut(path_id) - .mtud - .poll_transmit(now, next_pn)?; - - Some((active_cid, probe_size)) - } - /// Returns true if there is a further packet to send on [`PathId::ZERO`]. /// /// In other words this is predicting whether the next call to @@ -1926,7 +1936,7 @@ impl Connection { let bytes_to_send = transmit.segment_size() as u64; let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0; - if can_send.other && !need_loss_probe && !can_send.close { + if !need_loss_probe && can_send.is_congestion_controlled() { let path = self.path_data(path_id); if path.in_flight.bytes + bytes_to_send >= path.congestion.window() { trace!( @@ -7256,6 +7266,12 @@ enum PathBlocked { Pacing(Duration), } +impl PathBlocked { + fn is_blocked(&self) -> bool { + *self != Self::No + } +} + /// Fields of `Connection` specific to it being client-side or server-side enum ConnectionSide { Client { diff --git a/noq-proto/src/connection/mtud.rs b/noq-proto/src/connection/mtud.rs index a2cf4350d..be5d0c720 100644 --- a/noq-proto/src/connection/mtud.rs +++ b/noq-proto/src/connection/mtud.rs @@ -65,11 +65,20 @@ impl MtuDiscovery { /// Returns the amount of bytes that should be sent as an MTU probe, if any. /// /// Returns [`None`] if MTUD discovery is disabled. Otherwise delegates to - /// [`EnabledMtuDiscovery::poll_transmit`]. - pub(crate) fn poll_transmit(&mut self, now: Instant, next_pn: u64) -> Option { + /// [`EnabledMtuDiscovery::next_probe`]. + /// + /// If `Some` is returned the probe **must** be sent, the state of the MTU prober + /// assumes it has been sent. + pub(crate) fn next_probe(&mut self, now: Instant) -> Option { self.state .as_mut() - .and_then(|state| state.poll_transmit(now, self.current_mtu, next_pn)) + .and_then(|state| state.next_probe(now, self.current_mtu)) + } + + pub(crate) fn probe_in_flight(&mut self, pn: u64, size: u16) { + if let Some(state) = self.state.as_mut() { + state.probe_in_flight(pn, size); + } } /// Notifies the [`MtuDiscovery`] that the peer's `max_udp_payload_size` transport parameter has @@ -187,7 +196,9 @@ impl EnabledMtuDiscovery { /// - A search for a new MTU is in progress. /// - The MTU discovery was completed but the [`MtuDiscoveryConfig::interval`] expired, /// this re-starts a n MTU search. - fn poll_transmit(&mut self, now: Instant, current_mtu: u16, next_pn: u64) -> Option { + /// + /// Once a probe has been sent, [`Self::probe_in_flight`] needs to be called. + fn next_probe(&mut self, now: Instant, current_mtu: u16) -> Option { if let Phase::Initial = &self.phase { // Start the first search self.phase = Phase::Searching(SearchState::new( @@ -216,7 +227,6 @@ impl EnabledMtuDiscovery { // Retransmit lost probes, if any if 0 < state.lost_probe_count && state.lost_probe_count < MAX_PROBE_RETRANSMITS { - state.in_flight_probe = Some(next_pn); return Some(state.last_probed_mtu); } @@ -229,8 +239,6 @@ impl EnabledMtuDiscovery { } if let Some(probe_udp_payload_size) = state.next_mtu_to_probe(last_probe_succeeded) { - state.in_flight_probe = Some(next_pn); - state.last_probed_mtu = probe_udp_payload_size; return Some(probe_udp_payload_size); } else { let next_mtud_activation = now + self.config.interval; @@ -242,6 +250,14 @@ impl EnabledMtuDiscovery { None } + /// Mark an MTU probe as sent. + fn probe_in_flight(&mut self, pn: u64, size: u16) { + if let Phase::Searching(state) = &mut self.phase { + state.in_flight_probe = Some(pn); + state.last_probed_mtu = size; + } + } + /// Called when a packet is acknowledged in [`SpaceId::Data`] /// /// Returns the new `current_mtu` if the packet number corresponds to the in-flight MTU probe @@ -548,7 +564,10 @@ mod tests { ) -> Vec { let mut probed_sizes = Vec::new(); for probe_pn in 1..100 { - let result = mtud.poll_transmit(now, probe_pn); + let result = mtud.next_probe(now); + if let Some(probe_size) = result { + mtud.probe_in_flight(probe_pn, probe_size); + } if completed(mtud) { break; @@ -605,7 +624,7 @@ mod tests { #[test] fn mtu_discovery_disabled_does_nothing() { let mut mtud = MtuDiscovery::disabled(1_200, 1_200); - let probe_size = mtud.poll_transmit(Instant::now(), 0); + let probe_size = mtud.next_probe(Instant::now()); assert_eq!(probe_size, None); } @@ -663,15 +682,12 @@ mod tests { drive_to_completion(&mut mtud, now, 1_500); // Polling right after completion does not cause new packets to be sent - assert_eq!(mtud.poll_transmit(now, 42), None); + assert_eq!(mtud.next_probe(now), None); assert!(completed(&mtud)); assert_eq!(mtud.current_mtu, 1_471); // Polling after the interval has passed does (taking the current mtu as lower bound) - assert_eq!( - mtud.poll_transmit(now + Duration::from_secs(600), 43), - Some(5235) - ); + assert_eq!(mtud.next_probe(now + Duration::from_secs(600)), Some(5235)); match mtud.state.unwrap().phase { Phase::Searching(state) => { @@ -689,11 +705,13 @@ mod tests { let mut mtud = default_mtud(); let mut probe_sizes = (0..4).map(|i| { - let probe_size = mtud.poll_transmit(Instant::now(), i); - assert!(probe_size.is_some(), "no probe returned for packet {i}"); + let probe_size = mtud + .next_probe(Instant::now()) + .expect("no probe returned for packet {i}"); + mtud.probe_in_flight(i, probe_size); mtud.on_probe_lost(); - probe_size.unwrap() + probe_size }); // After the first probe is lost, it gets retransmitted twice @@ -744,7 +762,7 @@ mod tests { #[should_panic(expected = "Transport parameters received after MTU probing started")] fn mtu_discovery_with_peer_max_udp_payload_size_during_search_panics() { let mut mtud = default_mtud(); - assert!(mtud.poll_transmit(Instant::now(), 0).is_some()); + assert!(mtud.next_probe(Instant::now()).is_some()); assert!(matches!( mtud.state.as_ref().unwrap().phase, Phase::Searching(_) @@ -805,9 +823,11 @@ mod tests { iterations += 1; let probe_pn = i * 2 - 1; - let other_pn = i * 2; - let result = mtud.poll_transmit(Instant::now(), probe_pn); + let result = mtud.next_probe(Instant::now()); + if let Some(size) = result { + mtud.probe_in_flight(probe_pn, size); + } if completed(&mtud) { break; @@ -818,7 +838,7 @@ mod tests { assert!(mtud.in_flight_mtu_probe().is_some()); // Nothing else to send while the probe is in-flight - assert_matches!(mtud.poll_transmit(now, other_pn), None); + assert_matches!(mtud.next_probe(now), None); if i % 2 == 0 { // ACK probe and ensure it results in an increase of current_mtu @@ -870,6 +890,22 @@ mod tests { assert_eq!(state.upper_bound, 1450); } + #[test] + fn next_probe_should_be_idempotent() { + let mut config = MtuDiscoveryConfig::default(); + config.upper_bound(MAX_UDP_PAYLOAD); + let mut mtud = MtuDiscovery::new(1200, 1200, None, config); + + let now = Instant::now(); + let first_probe = mtud.next_probe(now); + let second_probe = mtud.next_probe(now); + assert_eq!(first_probe, second_probe); + + mtud.probe_in_flight(1, first_probe.unwrap()); + let third_probe = mtud.next_probe(now); + assert_ne!(first_probe, third_probe); + } + // Loss of packets larger than have been acknowledged should indicate a black hole #[test] fn simple_black_hole_detection() { diff --git a/noq-proto/src/connection/spaces.rs b/noq-proto/src/connection/spaces.rs index 8cb3f89a1..e42e124bf 100644 --- a/noq-proto/src/connection/spaces.rs +++ b/noq-proto/src/connection/spaces.rs @@ -967,6 +967,25 @@ impl SendableFrames { } = *self; !acks && !close && !space_specific && !other } + + /// Whether the packet will be congestion controlled. + /// + /// Packets with only ACK and CONNECTION_CLOSE frames are not congestion controlled. + pub(super) fn is_congestion_controlled(&self) -> bool { + let Self { + acks, + close, + space_specific, + other, + } = *self; + // ACK-only packets are not congestion controlled. + // TODO(flub): We do not congestion-control CONNECTION_CLOSE frames currently. It is + // unclear how correct this is. I suspect this wrong for the first + // CONNECTION_CLOSE being sent, but correct for any further CONNECTION_CLOSE that + // is sent in response to more incoming packets. + let is_ack_only = acks && !(space_specific || other); + !is_ack_only && !close + } } impl ::std::ops::BitOrAssign for SendableFrames {