diff --git a/Cargo.lock b/Cargo.lock index f9337a348..fffa5a7c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1679,7 +1679,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.53.4", ] [[package]] @@ -2481,9 +2481,9 @@ checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" [[package]] name = "rustls-webpki" -version = "0.103.10" +version = "0.103.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef" +checksum = "8279bb85272c9f10811ae6a6c547ff594d6a7f3c6c6b02ee9726d1d0dcfcdd06" dependencies = [ "aws-lc-rs", "ring", diff --git a/noq-proto/src/connection/mod.rs b/noq-proto/src/connection/mod.rs index f22291b88..c36751c51 100644 --- a/noq-proto/src/connection/mod.rs +++ b/noq-proto/src/connection/mod.rs @@ -1488,9 +1488,20 @@ impl Connection { // if we will need to start a new datagram. If we are coalescing into an already // started datagram we do not need to check congestion control again. if transmit.datagram_remaining_mut() == 0 { + // We need to update the pacer once to make sure it actually has update its available tokens in case + // we need them in `path_congestion_check`. + self.path_data_mut(path_id).update_pacer(now); let congestion_blocked = - self.path_congestion_check(space_id, path_id, transmit, &can_send, now); + self.path_congestion_check(space_id, path_id, transmit, &can_send); if congestion_blocked != PathBlocked::No { + if let PathBlocked::Pacing(delay) = congestion_blocked { + let resume_time = now + delay; + self.timers.set_if_earlier( + Timer::PerPath(path_id, PathTimer::Pacing), + resume_time, + self.qlog.with_time(now), + ); + } // Previous iterations of this loop may have built packets already. return match last_packet_number { Some(pn) => PollPathSpaceStatus::WrotePacket { @@ -1879,14 +1890,22 @@ impl Connection { false } - /// Checks if creating a new datagram would be blocked by congestion control + /// Checks if creating a new datagram would be blocked by congestion control or pacing. + /// + /// This is a pure function - it doesn't modify state, so neither does it update the pacer + /// nor does it set the pacing timer. Callers need to handle that: + /// + /// - To make new pacing tokens available, the pacer needs to be update before a call to + /// this function using [`PathData::update_pacer`]. + /// - When this function returns [`PathBlocked::Pacing`], and you decide to actually send + /// the given packet, call [`TimerTable::set_if_earlier`] with [`PathTimer::Pacing`] + /// and given delay relative to the time used for `update_pacer`. fn path_congestion_check( &mut self, space_id: SpaceId, path_id: PathId, transmit: &TransmitBuf<'_>, can_send: &SendableFrames, - now: Instant, ) -> PathBlocked { // Anti-amplification is only based on `total_sent`, which gets updated after // the transmit is sent. Therefore we pass the amount of bytes for datagrams @@ -1922,17 +1941,11 @@ impl Connection { } // Pacing check. - if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) { - let resume_time = now + delay; - self.timers.set( - Timer::PerPath(path_id, PathTimer::Pacing), - resume_time, - self.qlog.with_time(now), - ); + if let Some(delay) = self.path_data(path_id).pacing_delay(bytes_to_send) { // Loss probes and CONNECTION_CLOSE should be subject to pacing, even though // they are not congestion controlled. trace!(?space_id, %path_id, ?delay, "blocked by pacing"); - return PathBlocked::Pacing; + return PathBlocked::Pacing(delay); } PathBlocked::No @@ -7240,7 +7253,7 @@ enum PathBlocked { No, AntiAmplification, Congestion, - Pacing, + Pacing(Duration), } /// Fields of `Connection` specific to it being client-side or server-side diff --git a/noq-proto/src/connection/pacing.rs b/noq-proto/src/connection/pacing.rs index 794b8070c..7f19396d6 100644 --- a/noq-proto/src/connection/pacing.rs +++ b/noq-proto/src/connection/pacing.rs @@ -48,13 +48,41 @@ impl Pacer { /// The 5/4 ratio used here comes from the suggestion that N = 1.25 in the draft IETF /// RFC for QUIC. pub(super) fn delay( - &mut self, + &self, smoothed_rtt: Duration, bytes_to_send: u64, + window: u64, + ) -> Option { + // if we can already send a packet, there is no need for delay + if self.tokens >= bytes_to_send { + return None; + } + + if smoothed_rtt.as_nanos() == 0 { + return None; + } + + // we disable pacing for extremely large windows + let window = u32::try_from(window).ok()?; + + let unscaled_delay = smoothed_rtt + .checked_mul((bytes_to_send.max(self.capacity) - self.tokens) as _) + .unwrap_or(Duration::MAX) + / window; + + // divisions come before multiplications to prevent overflow + // this is the time at which the pacing window becomes empty + Some((unscaled_delay / 5) * 4) + } + + /// Updates this pacer, potentially filling it with more tokens for transmission. + pub(super) fn update( + &mut self, + smoothed_rtt: Duration, mtu: u16, window: u64, now: Instant, - ) -> Option { + ) -> &mut Self { debug_assert_ne!( window, 0, "zero-sized congestion control window is nonsense" @@ -69,17 +97,10 @@ impl Pacer { self.last_mtu = mtu; } - // if we can already send a packet, there is no need for delay - if self.tokens >= bytes_to_send { - return None; - } - // we disable pacing for extremely large windows - if window > u64::from(u32::MAX) { - return None; - } - - let window = window as u32; + let Ok(window) = u32::try_from(window) else { + return self; + }; let time_elapsed = now.checked_duration_since(self.prev).unwrap_or_else(|| { warn!("received a timestamp early than a previous recorded time, ignoring"); @@ -87,7 +108,7 @@ impl Pacer { }); if smoothed_rtt.as_nanos() == 0 { - return None; + return self; } let elapsed_rtts = time_elapsed.as_secs_f64() / smoothed_rtt.as_secs_f64(); @@ -100,19 +121,7 @@ impl Pacer { self.prev = now; } - // if we can already send a packet, there is no need for delay - if self.tokens >= bytes_to_send { - return None; - } - - let unscaled_delay = smoothed_rtt - .checked_mul((bytes_to_send.max(self.capacity) - self.tokens) as _) - .unwrap_or(Duration::MAX) - / window; - - // divisions come before multiplications to prevent overflow - // this is the time at which the pacing window becomes empty - Some((unscaled_delay / 5) * 4) + self } } @@ -176,17 +185,20 @@ mod tests { assert!( Pacer::new(rtt, 30000, 1500, new_instant) - .delay(Duration::from_micros(0), 0, 1500, 1, old_instant) + .update(Duration::from_micros(0), 1500, 1, old_instant) + .delay(Duration::from_micros(0), 0, 1500) .is_none() ); assert!( Pacer::new(rtt, 30000, 1500, new_instant) - .delay(Duration::from_micros(0), 1600, 1500, 1, old_instant) + .update(Duration::from_micros(0), 1500, 1, old_instant) + .delay(Duration::from_micros(0), 1600, 1500) .is_none() ); assert!( Pacer::new(rtt, 30000, 1500, new_instant) - .delay(Duration::from_micros(0), 1500, 1500, 3000, old_instant) + .update(Duration::from_micros(0), 1500, 3000, old_instant) + .delay(Duration::from_micros(0), 1500, 1500) .is_none() ); } @@ -229,27 +241,35 @@ mod tests { assert_eq!(pacer.tokens, pacer.capacity); let initial_tokens = pacer.tokens; - pacer.delay(rtt, mtu as u64, mtu, window * 2, now); + pacer + .update(rtt, mtu, window * 2, now) + .delay(rtt, mtu as u64, window); assert_eq!( pacer.capacity, (2 * window as u128 * TARGET_BURST_INTERVAL.as_nanos() / rtt.as_nanos()) as u64 ); assert_eq!(pacer.tokens, initial_tokens); - pacer.delay(rtt, mtu as u64, mtu, window / 2, now); + pacer + .update(rtt, mtu, window / 2, now) + .delay(rtt, mtu as u64, window); assert_eq!( pacer.capacity, (window as u128 / 2 * TARGET_BURST_INTERVAL.as_nanos() / rtt.as_nanos()) as u64 ); assert_eq!(pacer.tokens, initial_tokens / 2); - pacer.delay(rtt, mtu as u64, mtu * 2, window, now); + pacer + .update(rtt, mtu * 2, window, now) + .delay(rtt, mtu as u64, window); assert_eq!( pacer.capacity, (window as u128 * TARGET_BURST_INTERVAL.as_nanos() / rtt.as_nanos()) as u64 ); - pacer.delay(rtt, mtu as u64, 20_000, window, now); + pacer + .update(rtt, 20_000, window, now) + .delay(rtt, mtu as u64, window); assert_eq!(pacer.capacity, 20_000_u64 * MIN_BURST_SIZE); } @@ -265,7 +285,9 @@ mod tests { for _ in 0..packet_capacity { assert_eq!( - pacer.delay(rtt, mtu as u64, mtu, window, old_instant), + pacer + .update(rtt, mtu, window, old_instant) + .delay(rtt, mtu as u64, window), None, "When capacity is available packets should be sent immediately" ); @@ -276,7 +298,8 @@ mod tests { let pace_duration = Duration::from_nanos((TARGET_BURST_INTERVAL.as_nanos() * 4 / 5) as u64); let actual_delay = pacer - .delay(rtt, mtu as u64, mtu, window, old_instant) + .update(rtt, mtu, window, old_instant) + .delay(rtt, mtu as u64, window) .expect("Send must be delayed"); let diff = actual_delay.abs_diff(pace_duration); @@ -288,20 +311,18 @@ mod tests { ); // Refill half of the tokens assert_eq!( - pacer.delay( - rtt, - mtu as u64, - mtu, - window, - old_instant + pace_duration / 2 - ), + pacer + .update(rtt, mtu, window, old_instant + pace_duration / 2) + .delay(rtt, mtu as u64, window), None ); assert_eq!(pacer.tokens, pacer.capacity / 2); for _ in 0..packet_capacity / 2 { assert_eq!( - pacer.delay(rtt, mtu as u64, mtu, window, old_instant), + pacer + .update(rtt, mtu, window, old_instant) + .delay(rtt, mtu as u64, window), None, "When capacity is available packets should be sent immediately" ); @@ -311,13 +332,9 @@ mod tests { // Refill all capacity by waiting more than the expected duration assert_eq!( - pacer.delay( - rtt, - mtu as u64, - mtu, - window, - old_instant + pace_duration * 3 / 2 - ), + pacer + .update(rtt, mtu, window, old_instant + pace_duration * 3 / 2) + .delay(rtt, mtu as u64, window), None ); assert_eq!(pacer.tokens, pacer.capacity); diff --git a/noq-proto/src/connection/paths.rs b/noq-proto/src/connection/paths.rs index ceeefc006..d24bf3304 100644 --- a/noq-proto/src/connection/paths.rs +++ b/noq-proto/src/connection/paths.rs @@ -618,18 +618,22 @@ impl PathData { event } - /// Return how long we need to wait before sending `bytes_to_send` - /// - /// See [`Pacer::delay`]. - pub(super) fn pacing_delay(&mut self, bytes_to_send: u64, now: Instant) -> Option { - let smoothed_rtt = self.rtt.get(); - self.pacing.delay( - smoothed_rtt, - bytes_to_send, + /// Updates the pacer, potentially giving it more tokens for transmission again. + pub(super) fn update_pacer(&mut self, now: Instant) { + self.pacing.update( + self.rtt.get(), self.current_mtu(), self.congestion.window(), now, - ) + ); + } + + /// Return how long we need to wait before sending `bytes_to_send` + /// + /// See [`Pacer::delay`]. + pub(super) fn pacing_delay(&self, bytes_to_send: u64) -> Option { + self.pacing + .delay(self.rtt.get(), bytes_to_send, self.congestion.window()) } /// Updates the last observed address report received on this path. diff --git a/noq-proto/src/connection/timer.rs b/noq-proto/src/connection/timer.rs index 4b2a47ee9..10e285415 100644 --- a/noq-proto/src/connection/timer.rs +++ b/noq-proto/src/connection/timer.rs @@ -292,6 +292,20 @@ impl TimerTable { qlog.emit_timer_set(timer, time); } + /// Sets the timer, but only if the timer was unset or was set to a later value before. + pub(super) fn set_if_earlier( + &mut self, + timer: Timer, + time: Instant, + qlog: QlogSinkWithTime<'_>, + ) { + match self.get(timer) { + Some(already_set) if already_set <= time => {} + None => {} + Some(_) => self.set(timer, time, qlog), + } + } + pub(super) fn get(&self, timer: Timer) -> Option { match timer { Timer::Conn(timer) => self.generic[timer as usize],