diff --git a/SPEC/9002_BBR.md b/SPEC/9002_BBR.md new file mode 100644 index 0000000..4648bb6 --- /dev/null +++ b/SPEC/9002_BBR.md @@ -0,0 +1,280 @@ +# RFC 9002 + BBRv3 Congestion Control + +## Status + +**Phases 1–4 complete (Apr 2026).** BBRv3 is selectable via +`ConnectionConfig.congestion_control = .bbr` alongside the existing +`.cubic` (default) and `.newreno` controllers. + +Reference: `draft-cardwell-iccrg-bbr-congestion-control-03`. + +## Architecture + +### CC abstraction (`src/quic/congestion.zig`) + +``` +pub const Algorithm = enum { newreno, cubic, bbr }; +pub const CongestionControl = union(Algorithm) { + newreno: NewReno, + cubic: Cubic, + bbr: Bbr, +}; +``` + +Tagged union with `inline else` dispatch — zero allocation, exhaustive +switch protection at compile time. Replaces the previous hardcoded +`cc: Cubic` field on `Connection`. + +### Batched ACK API + +``` +pub const AckContext = struct { + now: i64, + bytes_in_flight: u64, + persistent_congestion: bool, + earliest_lost_sent_time: ?i64, + ce_byte_count: u64, + rate_sample: ?delivery_rate.RateSample = null, +}; + +pub fn onAckBatch(self: *CongestionControl, ctx: *const AckContext) void; +``` + +Per-packet `onPacketAcked` still fires inside the connection's existing +ACK-processing loop (where per-packet stream/MTU bookkeeping lives). +`onAckBatch` carries batch-level signals: loss summary, ECN-CE bytes, +and the latest delivery-rate sample. NewReno/Cubic implement +`onAckBatch` as a thin shim over their existing +`onCongestionEvent`/`onPersistentCongestion` methods. BBR consumes the +whole context. + +### Pacer + +``` +pub fn setBandwidth(self: *Pacer, cwnd: u64, rtt_stats: *const RttStats); +pub fn setPacingRate(self: *Pacer, bytes_per_second: u64); +``` + +NewReno/Cubic use `setBandwidth` (cwnd/RTT-derived rate). BBR computes +its own pacing rate from `pacing_gain × max_bw` and writes it directly +via `setPacingRate`. 
The connection calls +`cc.updatePacer(&pacer, &rtt_stats)` which dispatches the right path. + +### Delivery-rate sampler (`src/quic/delivery_rate.zig`) + +**Off by default — pay-for-what-you-use.** `PacketHandler.rate_sampling_enabled` +is `false` unless `cc_algorithm == .bbr`. When off, both +`onPacketSent` (4-store snapshot stamp) and the per-acked-packet u128 +divide are skipped entirely, behind a single predictable branch on a +flag constant for the connection's lifetime. Loopback HTTP/3 bench +confirms zero measurable regression vs the pre-BBR codebase when +Cubic/NewReno is selected. + +**Snapshot fields live inline on `SentPacket`** rather than in a +side-allocated table. This costs ~24 B per in-flight packet +unconditionally (the work is gated, the storage isn't). At 1000 +in-flight packets × 1000 connections that's ~24 MB — fine for typical +servers, potentially worth revisiting at 100K+ connections. The +considered alternative — a separate `?AutoArrayHashMapUnmanaged(u64, +BbrSnapshot)` allocated only when BBR is on — was rejected: it splits +the per-packet state across two structures with synchronized +lifetimes, adds an OOM failure path on `onPacketSent`, and complicates +the sampler API for a memory win that does not show up in current +workloads. + +Per-packet snapshot fields on `SentPacket`: + +``` +delivered_at_send: u64 +delivered_time_at_send: i64 +first_sent_time_at_send: i64 +is_app_limited_at_send: bool +``` + +Stamped in `PacketHandler.onPacketSent`; consumed in +`PacketHandler.onAckReceived` to produce a `RateSample` per acked +packet. The highest-numbered acked packet's sample is exposed via +`pkt_handler.latest_rate_sample` and threaded into `AckContext`. + +## BBRv3 implementation (`src/quic/bbr.zig`) + +Architecture follows picoquic's `bbr.c` pattern: a three-stage pipeline +(model, state, outputs) inside `onAckBatch`, implemented as two calls: + +1. 
**`updateModelAndState`** — round detection, recovery exit, latest + round signals, max-BW filter, min-RTT filter, ACK aggregation + (`extra_acked`), ECN-alpha EWMA, loss-event gate (`BBRLossThresh`), + lower-bounds (`bw_lo`, `inflight_lo`), and state transitions. +2. **`updateControlParameters`** — `pacing_rate`, `send_quantum`, cwnd. + +State transitions are isolated in `enterDrain`, `enterProbeBwDown`, +`enterProbeBwCruise`, `enterProbeBwRefill`, `enterProbeBwUp` so the +state machine is auditable in one place each. + +### Model + +- **`max_bw`**: max-filtered over the last 4 rounds. App-limited samples + are skipped *only if they would lower the estimate* (otherwise we'd + miss bandwidth growth during partial idle periods). +- **`bw_lo`**: per-round lower bound, decays toward `bw_latest` (the + latest round's max). Reset at REFILL entry. `boundedBw() = min(max_bw, + bw_lo)` is the rate used for pacing. +- **`min_rtt`**: min over a 10s window. Crucially, the **stamp** that + gates ProbeRTT entry (`probe_rtt_min_stamp`) is *separate* from the + per-sample stamp — it only advances when a *new* minimum is observed + or the window expires. This fixes the prior bug where every sample at + the current min refreshed the stamp and prevented ProbeRTT from ever + firing. +- **`extra_acked`**: ACK-aggregation budget, max-filtered over 10 + rounds. Added to `max_inflight` so we don't underutilize wifi/LTE + links that batch ACKs. 
+ +### State machine + +Event-driven, not timer-driven: + +``` +Startup ──bw plateau / loss > BBRLossThresh─▶ Drain +Drain ──inflight ≤ BDP─▶ ProbeBW_Down + +ProbeBW_Down ──inflight ≤ (1-headroom)·BDP─▶ ProbeBW_Cruise +ProbeBW_Cruise ──≥1 round elapsed─▶ ProbeBW_Refill +ProbeBW_Refill ──one round (resets bw_lo, inflight_lo)─▶ ProbeBW_Up +ProbeBW_Up ──inflight ≥ inflight_hi / >2 rounds─▶ ProbeBW_Down + +(any non-Startup state) ──min_rtt stale (5s)─▶ ProbeRTT +ProbeRTT ──200ms drained─▶ ProbeBW_Down (or Startup if !filled_pipe) +``` + +- **Startup**: pacing_gain = 2.885, cwnd_gain = 2.885. Two exit paths: + bw plateau (≥5/4 growth fails for 3 rounds), or loss > `BBRLossThresh` + (2% of inflight). +- **Drain**: pacing_gain = 0.346, cwnd_gain = 2.885. Exit when cwnd ≤ BDP. +- **ProbeBW**: four sub-states with distinct semantics: + - **Down** (gain 0.9): drain inflight to leave 15% headroom. + - **Cruise** (gain 1.0, cwnd-headroom 0.15): hold steady, fair to + competing flows. + - **Refill** (gain 1.0): reset `bw_lo` and `inflight_lo` so Up can + probe upward; lasts one round. + - **Up** (gain 1.25, cwnd_gain 2.25): raise `inflight_hi` slope-style + via `BBRRaiseInflightHiSlope` so steady-state throughput grows. +- **ProbeRTT**: gated on `probe_rtt_min_stamp + 5s`. Drops cwnd to 4×MTU + for 200ms, then resumes ProbeBW_Down (or Startup if pipe never filled). + +### Loss / ECN response + +- **Loss** (`checkLossEvent` + `BBRLossThresh`): `inflight_hi` + reduction fires only when `lost_bytes > 2% × prior_bytes_in_flight + AND lost_bytes > 3 × MTU`. Without this gate, a single packet loss + would permanently cap throughput at 70% of BDP. Also enters recovery + state for one round of packet conservation. +- **ECN** (`updateEcnAlpha` + `BBRExcessiveEcnCE`): EWMA `alpha = + frac/16 + 15·alpha_prev/16` (gain 1/16). Reduces `inflight_hi` when + `alpha > 0.5`. Smoothing prevents a single CE-marked batch from + triggering reduction. 
+- **Persistent congestion**: preserves the BW model. Halves cwnd, resets + `bw_lo`/`inflight_lo`, exits recovery. Does NOT throw away `max_bw` + or `min_rtt` — those are minutes of measurement. + +### Recovery + +- On the first loss in a round (loss-too-high), enter recovery; `cwnd ≤ + inflight_lo` for one round (packet conservation approximation). +- On PTO, save `cwnd` and enter recovery; first ack lifts `inflight_hi` + to at least the saved value (BBR recomputes the actual cwnd from its + model). +- Exit when `largest_acked_pn ≥ recovery_start_pn`. + +### Pacing rate, send_quantum, cwnd + +``` +pacing_rate = pacing_gain × boundedBw × (1 - 1% margin) +send_quantum = clamp(pacing_rate × 1ms, 2·MTU, 64KB) +cwnd = max_inflight = bdp × cwnd_gain + extra_acked + (bounded by inflight_hi above, inflight_lo below; floor 4·MTU) +``` + +CRUISE applies a 15% headroom subtraction; ProbeRTT clamps to 4·MTU. + +### Path migration + +`Bbr.onPathChange()` resets the entire struct (path's BW/RTT model is +no longer valid), returning to Startup with a fresh estimator. + +## Wiring + +### `ConnectionConfig` + +``` +congestion_control: congestion.Algorithm = .cubic, +``` + +### `Connection` + +``` +cc: congestion.CongestionControl = .{ .cubic = ... }, +cc_algorithm: congestion.Algorithm = .cubic, // preserved across migrations +``` + +Migration resets use `congestion.CongestionControl.init(self.cc_algorithm)` +so the user-chosen CC survives address rebinding. + +### Per-packet ECN CE attribution + +ACK_ECN counters are aggregate (no per-packet attribution available +from the wire). The implementation distributes the CE-count delta +across the ECT(0) bytes acked in the same batch: + +``` +ce_byte_count = newly_acked_ect0_bytes * ce_delta / newly_acked_ect0 +``` + +This matches quic-go's approach. NewReno/Cubic treat any `ce_byte_count +> 0` as a single congestion event (legacy behavior); BBR uses the byte +count for its alpha threshold check. 
+ +## Caveats + +- **Draft revision drift.** Pinned to `draft-cardwell-iccrg-bbr-congestion-control-03`. +- **Simplified `BBRRaiseInflightHiSlope`.** The draft prescribes a + per-ack slope-based growth; we use a per-round `inflight_hi += + MTU × bw_probe_up_rounds` approximation. Same direction, slightly + less precise. +- **No CRUISE randomized timer / Reno coexistence quota.** Picoquic uses + random timers and a `BBR_BDP_packets`-based forced-probe to share + fairly with Reno flows. We use a simpler "≥1 round elapsed" rule. + Acceptable on links that don't share bottleneck with Reno; revisit if + fairness measurements show starvation. +- **No qlog instrumentation yet** for BBR-specific state (state + transitions, max_bw, min_rtt, pacing_gain, inflight_hi/lo). Follow-up. +- **No interop CLI flag yet.** Interop binaries still use the default + Cubic. Adding `--cc bbr` pass-through is a small follow-up. + +## Test coverage + +- `congestion.zig` — 6 union-dispatch parity tests + the existing + NewReno/Cubic suites. +- `delivery_rate.zig` — 4 sampler tests (steady stream, app-limited + flag propagation, reordering, zero-interval). +- `bbr.zig` — 19 tests covering: initial state, max-BW filter + + app-limited handling, **`BBRLossThresh` gating both directions**, + ProbeRTT entry actually firing (with the `probe_rtt_min_stamp` fix), + ProbeRTT exit, all 4 ProbeBW sub-state transitions, + **`inflight_hi` growth in Up**, ECN EWMA crossing threshold, + **persistent congestion preserves `max_bw`**, PTO recovery exit lifts + `inflight_hi`, `send_quantum` scaling, CRUISE headroom, bdp uses + bounded bw, app-limited handling, pacing margin, path change. +- `connection.zig` — 3 tests confirming algorithm selection via + `ConnectionConfig.congestion_control`. + +Total: 551/551 tests pass. 
+ +## Files + +- `src/quic/congestion.zig` — union, AckContext, NewReno, Cubic, Pacer +- `src/quic/bbr.zig` — Bbr state machine +- `src/quic/delivery_rate.zig` — RateSampler, RateSample +- `src/quic/ack_handler.zig` — SentPacket sampler fields, PacketHandler + rate_sampler +- `src/quic/connection.zig` — cc, cc_algorithm, AckContext call sites, + config wiring diff --git a/SPEC/STATUS.md b/SPEC/STATUS.md index 43a645d..d8e2fbd 100644 --- a/SPEC/STATUS.md +++ b/SPEC/STATUS.md @@ -276,6 +276,7 @@ | 7.8 | Under-utilizing the Congestion Window | ✅ Done | app_limited flag suppresses cwnd growth in NewReno + CUBIC | | B | NewReno Pseudocode | ✅ Done | Matches appendix B | | - | CUBIC (RFC 8312) | ✅ Done | Default CC algorithm, fast convergence | +| - | BBRv3 (draft-cardwell-iccrg-bbr-congestion-control-03) | ✅ Done | Selectable via `ConnectionConfig.congestion_control = .bbr`. See [9002_BBR.md](./9002_BBR.md) | ### Summary — RFC 9002 diff --git a/src/quic/ack_handler.zig b/src/quic/ack_handler.zig index d95d8f2..d581966 100644 --- a/src/quic/ack_handler.zig +++ b/src/quic/ack_handler.zig @@ -10,6 +10,7 @@ const frame_mod = @import("frame.zig"); const Frame = frame_mod.Frame; const AckRange = frame_mod.AckRange; const MAX_ACK_RANGES = frame_mod.MAX_ACK_RANGES; +const delivery_rate = @import("delivery_rate.zig"); /// Encryption level / packet number space. pub const EncLevel = enum(u2) { @@ -77,6 +78,15 @@ pub const SentPacket = struct { /// Whether this packet contains DATAGRAM frames (for stats tracking on loss). has_datagram: bool = false, + // ── Delivery-rate sampler snapshots (draft-cardwell §3.2; not part of RFC 9002) ── + // Filled in by RateSampler.onPacketSent at the moment this packet is + // transmitted. Consumed by the sampler when the packet is acked to + // produce a RateSample. + delivered_at_send: u64 = 0, + delivered_time_at_send: i64 = 0, + first_sent_time_at_send: i64 = 0, + is_app_limited_at_send: bool = false, + /// Record a stream frame carried by this packet. 
pub fn addStreamFrame(self: *SentPacket, info: StreamFrameInfo) void { if (self.stream_frame_count < MAX_STREAM_FRAMES_PER_PACKET) { @@ -501,6 +511,18 @@ pub const PacketHandler = struct { bytes_in_flight: u64 = 0, pto_count: u32 = 0, next_pn: [3]u64 = .{ 0, 0, 0 }, + /// Delivery-rate sampler (not defined by RFC 9002). Stamps per-packet snapshots on + /// send and produces RateSamples on ACK. Consumed by BBR; ignored by + /// NewReno/Cubic. + rate_sampler: delivery_rate.RateSampler = .{}, + /// Latest delivery-rate sample produced during the most recent + /// `onAckReceived` call (null if no acked packets / no usable sample, + /// or if rate sampling is disabled). + latest_rate_sample: ?delivery_rate.RateSample = null, + /// Whether the rate sampler should run on send/ack. Off by default — + /// enable only when the active CC needs delivery-rate samples (BBR). + /// Off, the per-send stamp and per-ack u128 divide are skipped entirely. + rate_sampling_enabled: bool = false, pub fn init(allocator: Allocator) PacketHandler { return .{ @@ -536,8 +558,15 @@ pub const PacketHandler = struct { return self.sent[idx].largest_acked; } - pub fn onPacketSent(self: *PacketHandler, pkt: SentPacket) !void { + pub fn onPacketSent(self: *PacketHandler, pkt_in: SentPacket) !void { + var pkt = pkt_in; const idx = @intFromEnum(pkt.enc_level); + // Stamp delivery-rate snapshot fields only when sampling is enabled. + // The branch is predictable (flag is constant for the connection's + // lifetime), and skipping avoids stamping the four snapshot fields per send when CC ≠ BBR. 
+ if (self.rate_sampling_enabled) { + self.rate_sampler.onPacketSent(&pkt, self.bytes_in_flight, pkt.time_sent); + } try self.sent[idx].onPacketSent(pkt); if (pkt.in_flight) { self.bytes_in_flight += pkt.size; @@ -576,8 +605,16 @@ pub const PacketHandler = struct { ); // ACK-of-ACK pruning (RFC 9000 §13.2.4): when an acked packet contained - // our ACK frame, prune received ranges below that ACK's largest_ack + // our ACK frame, prune received ranges below that ACK's largest_ack. + // Also feed the delivery-rate sampler (when enabled) and capture the + // latest sample from the highest-numbered acked packet (this sampler + // treats the sample produced by the highest-acked packet as + // representative for the ACK). var max_ack_of_ack: ?u64 = null; + var highest_acked_pn: ?u64 = null; + self.latest_rate_sample = null; + const sampling_on = self.rate_sampling_enabled; + const rtt_for_sample = self.rtt_stats.latest_rtt; for (result.acked.constSlice()) |pkt| { if (pkt.in_flight) { self.bytes_in_flight -|= pkt.size; @@ -587,6 +624,13 @@ pub const PacketHandler = struct { max_ack_of_ack = la; } } + if (sampling_on) { + const sample = self.rate_sampler.onPacketAcked(&pkt, rtt_for_sample, now); + if (highest_acked_pn == null or pkt.pn > highest_acked_pn.?) { + highest_acked_pn = pkt.pn; + self.latest_rate_sample = sample; + } + } } if (max_ack_of_ack) |prune_below| { self.recv[idx].pruneAckedRanges(prune_below); diff --git a/src/quic/bbr.zig b/src/quic/bbr.zig new file mode 100644 index 0000000..7d110d5 --- /dev/null +++ b/src/quic/bbr.zig @@ -0,0 +1,1079 @@ +//! BBRv3 congestion control (draft-cardwell-iccrg-bbr-congestion-control-03). +//! +//! Faithful port of the draft, with implementation patterns borrowed from +//! picoquic's `bbr.c`. This file owns the state machine, model estimators, +//! and pacing/cwnd computation; it consumes batch-level signals via +//! `congestion.AckContext` and a per-ACK delivery-rate sample via +//! `delivery_rate.RateSample`. +//! +//! 
State machine (transitions are event-driven, not timer-driven): +//! +//! Startup ─bw plateau / loss─▶ Drain +//! Drain ─inflight ≤ BDP─▶ ProbeBW_Down +//! +//! ProbeBW_Down ──headroom drained─▶ ProbeBW_Cruise +//! ProbeBW_Cruise ──≥1 round elapsed─▶ ProbeBW_Refill +//! ProbeBW_Refill ──one round (resets bw_lo/inflight_lo)─▶ ProbeBW_Up +//! ProbeBW_Up ──RTT excess / inflight too high─▶ ProbeBW_Down +//! +//! (any non-Startup state) ─min_rtt stale (5s)─▶ ProbeRTT +//! ProbeRTT ──200ms drained─▶ ProbeBW_Down (or Startup if !filled_pipe) +//! +//! Model: +//! +//! - `max_bw`: max-filtered delivery rate over recent rounds. +//! - `bw_lo`: per-round lower bound that decays toward `bw_latest`; resets at REFILL. +//! - `bw`: `min(max_bw, bw_lo)` (the pacing model bandwidth). +//! - `min_rtt`: min RTT over a 10s window; refreshed by ProbeRTT. +//! - `inflight_hi`: upper bound on inflight, raised by `BBRRaiseInflightHiSlope` +//! in ProbeBW_Up; lowered by `BBRIsInflightTooHigh` (gated by `BBRLossThresh`). +//! - `inflight_lo`: per-round lower bound; resets at REFILL. +//! - `ecn_alpha`: EWMA of CE-fraction; reduces inflight_hi when over `BBRExcessiveEcnCE`. +//! - `extra_acked`: accumulator for ACK-aggregation; padded into `max_inflight`. +//! +//! Recovery: +//! +//! - On first loss in a round, enter recovery; cwnd ← `bytes_in_flight + newly_acked`. +//! - On PTO, save cwnd; cwnd ← `bytes_in_flight + MTU`. +//! - Exit recovery once an ack reaches `recovery_start_pn`; cwnd is not restored, `saved_cwnd` only lifts `inflight_hi`. +//! +//! Persistent congestion preserves the BW model (we only halve cwnd and reset +//! `bw_lo`/`inflight_lo`); this avoids throwing away minutes of measurements +//! on what is often a transient blackhole. 
+ +const std = @import("std"); +const congestion_mod = @import("congestion.zig"); +const AckContext = congestion_mod.AckContext; +const RttStats = @import("rtt.zig").RttStats; +const delivery_rate = @import("delivery_rate.zig"); + +// ── Sizes ── +const DEFAULT_MAX_DATAGRAM_SIZE: u64 = 1200; +const MIN_PIPE_CWND_PACKETS: u64 = 4; + +// ── Gain constants (numerator over GAIN_DENOM=1000 for integer math) ── +const GAIN_DENOM: u64 = 1000; +/// `BBRStartupPacingGain` = 2/ln(2) ≈ 2.885 (draft §4.3.2). +const STARTUP_PACING_GAIN: u64 = 2885; +const STARTUP_CWND_GAIN: u64 = 2885; +/// `BBRDrainPacingGain` = ln(2)/2 ≈ 0.346 (draft §4.3.3). +const DRAIN_PACING_GAIN: u64 = 346; +const DRAIN_CWND_GAIN: u64 = 2885; +/// `BBRProbeBwUpGain` = 1.25, `BBRProbeBwDownGain` = 0.9, others = 1.0 (draft §4.3.4). +const PROBE_BW_UP_GAIN: u64 = 1250; +const PROBE_BW_DOWN_GAIN: u64 = 900; +const PROBE_BW_CRUISE_GAIN: u64 = 1000; +const PROBE_BW_REFILL_GAIN: u64 = 1000; +const PROBE_BW_UP_CWND_GAIN: u64 = 2250; // 2.25 +const PROBE_BW_DEFAULT_CWND_GAIN: u64 = 2000; // 2.0 +const PROBE_RTT_CWND_GAIN: u64 = 500; // 0.5 + +// ── Tunables ── +/// Pacing rate margin (1% headroom subtracted to avoid bottleneck overshoot). +const PACING_MARGIN_NUM: u64 = 99; +const PACING_MARGIN_DENOM: u64 = 100; +/// `BBRMinRTTFilterLen` (draft §4.3): 10s — but ProbeRTT is gated on this. +const MIN_RTT_FILTER_NS: i64 = 10 * std.time.ns_per_s; +/// `BBRProbeRTTInterval` (picoquic L1339, draft §4.3.5): 5s short-RTT path. +const PROBE_RTT_INTERVAL_NS: i64 = 5 * std.time.ns_per_s; +/// `BBRProbeRTTDuration`: drain inflight for 200ms. +const PROBE_RTT_DURATION_NS: i64 = 200 * std.time.ns_per_ms; +/// `BBRStartupFullBwThreshold` ≥ 5/4 over 3 rounds without growth → exit Startup. +const STARTUP_FULL_BW_NUM: u64 = 5; +const STARTUP_FULL_BW_DENOM: u64 = 4; +const STARTUP_FULL_BW_ROUNDS: u32 = 3; +/// `BBRBeta` = 0.7 (multiplicative decrease for inflight_hi / bw_lo). 
+const BETA_NUM: u64 = 7; +const BETA_DENOM: u64 = 10; +/// `BBRHeadroom` = 0.15 — leave 15% of inflight_hi in CRUISE for other flows. +const HEADROOM_NUM: u64 = 15; +const HEADROOM_DENOM: u64 = 100; +/// `BBRLossThresh` = 0.02 — too-high gate on lost-bytes-fraction. +/// Picoquic uses 0.2 in `IsInflightTooHigh`; the draft (§4.5) prescribes +/// 0.02. We follow the draft. +const LOSS_THRESH_NUM: u64 = 2; +const LOSS_THRESH_DENOM: u64 = 100; +/// `BBRExcessiveEcnCE` = 0.5 — picoquic uses this for the EWMA threshold, +/// matching draft §4.6. +const ECN_THRESH_NUM: u64 = 1; +const ECN_THRESH_DENOM: u64 = 2; +/// EWMA factor for ecn_alpha: alpha = (frac/16) + (15/16)*alpha_prev. +const ECN_ALPHA_GAIN_NUM: u64 = 1; +const ECN_ALPHA_GAIN_DENOM: u64 = 16; +/// `BBRAppLimitedRoundsThreshold` (picoquic L137). After this many app-limited +/// rounds, the next REFILL is forced on transition out of app-limited. +const APP_LIMITED_ROUNDS_THRESHOLD: u32 = 3; +/// `BBRExtraAckedFilterLen` — window in rounds for the ACK-aggregation max filter. +const EXTRA_ACKED_FILTER_LEN: usize = 10; +/// `BBRMaxBwFilterLen` — window in rounds for the bandwidth max filter. +const MAX_BW_FILTER_LEN: usize = 4; + +pub const State = enum { + startup, + drain, + probe_bw_down, + probe_bw_cruise, + probe_bw_refill, + probe_bw_up, + probe_rtt, +}; + +/// Fixed-capacity max-filter over the last N samples. 
+fn WindowedMax(comptime T: type, comptime N: usize) type { + return struct { + const Self = @This(); + buf: [N]T = [_]T{0} ** N, + len: usize = 0, + head: usize = 0, + + pub fn push(self: *Self, v: T) void { + self.buf[self.head] = v; + self.head = (self.head + 1) % N; + if (self.len < N) self.len += 1; + } + + pub fn get(self: *const Self) T { + var m: T = 0; + for (self.buf[0..self.len]) |v| { + if (v > m) m = v; + } + return m; + } + + pub fn reset(self: *Self) void { + self.len = 0; + self.head = 0; + } + }; +} + +pub const Bbr = struct { + // ── Output ── + pacing_rate_bps: u64 = 0, + cwnd_bytes: u64 = 0, + /// `send_quantum` per draft §4.4.1 — clamps the pacer's burst budget. + send_quantum: u64 = DEFAULT_MAX_DATAGRAM_SIZE, + max_datagram_size: u64 = DEFAULT_MAX_DATAGRAM_SIZE, + + // ── State ── + state: State = .startup, + pacing_gain: u64 = STARTUP_PACING_GAIN, + cwnd_gain: u64 = STARTUP_CWND_GAIN, + + // ── Bandwidth model ── + max_bw_filter: WindowedMax(u64, MAX_BW_FILTER_LEN) = .{}, + max_bw: u64 = 0, + bw_lo: u64 = std.math.maxInt(u64), + bw_latest: u64 = 0, + + // ── RTT model ── + /// Min RTT over `MIN_RTT_FILTER_NS` window — refreshed by sample or ProbeRTT. + min_rtt_ns: i64 = std.math.maxInt(i64), + /// Wall-clock stamp of last *new minimum* observed; used to gate ProbeRTT + /// (separate from the sample-driven `min_rtt_ns` to avoid the bug where + /// repeated samples at the current minimum keep refreshing the stamp and + /// prevent ProbeRTT from ever firing). + probe_rtt_min_stamp: i64 = 0, + + // ── Inflight bounds ── + inflight_hi: u64 = std.math.maxInt(u64), + inflight_lo: u64 = std.math.maxInt(u64), + inflight_latest: u64 = 0, + + // ── ECN ── + /// EWMA of CE-marked fraction × 1024 (fixed-point, gain 1/16). 
+ ecn_alpha_x1024: u64 = 0, + ecn_ce_in_round: u64 = 0, + ecn_delivered_in_round: u64 = 0, + + // ── ACK-aggregation (extra_acked) ── + extra_acked_filter: WindowedMax(u64, EXTRA_ACKED_FILTER_LEN) = .{}, + extra_acked: u64 = 0, + extra_acked_interval_start: i64 = 0, + extra_acked_delivered: u64 = 0, + + // ── Round tracking ── + next_round_delivered: u64 = 0, + round_count: u32 = 0, + round_start: bool = false, + delivered_total: u64 = 0, + + // ── Startup-exit detection ── + full_bw_count: u32 = 0, + last_full_bw: u64 = 0, + filled_pipe: bool = false, + + // ── ProbeBW phase scheduling ── + cycle_stamp: i64 = 0, + /// Round in which the current ProbeBW cycle started. + probe_bw_cycle_round: u32 = 0, + + // ── ProbeBW_Up slope state ── + bw_probe_up_count: u64 = 0, + bw_probe_up_acks: u64 = 0, + bw_probe_up_rounds: u32 = 0, + + // ── ProbeRTT ── + probe_rtt_done_stamp: i64 = 0, + + // ── App-limited round tracking ── + app_limited_round_count: u32 = 0, + was_app_limited_last: bool = false, + + // ── Recovery ── + in_recovery: bool = false, + recovery_start_pn: u64 = 0, + saved_cwnd: u64 = 0, + + pub fn init() Bbr { + return initWithMds(DEFAULT_MAX_DATAGRAM_SIZE); + } + + pub fn initWithMds(mds: u64) Bbr { + return .{ + .max_datagram_size = mds, + .cwnd_bytes = MIN_PIPE_CWND_PACKETS * mds * 4, // 4 × MIN_PIPE_CWND_PACKETS = 16 packets initial. NOTE(review): RFC 9002 suggests 10 — confirm 16 is intended + .send_quantum = mds, + }; + } + + pub fn sendWindow(self: *const Bbr) u64 { + return @max(self.cwnd_bytes, MIN_PIPE_CWND_PACKETS * self.max_datagram_size); + } + + pub fn pacingRateBps(self: *const Bbr) u64 { + return self.pacing_rate_bps; + } + + pub fn sendQuantum(self: *const Bbr) u64 { + return self.send_quantum; + } + + pub fn setMaxDatagramSize(self: *Bbr, size: u64) void { + self.max_datagram_size = size; + } + + pub fn inSlowStart(self: *const Bbr) bool { + return self.state == .startup; + } + + pub fn inCongestionRecovery(self: *const Bbr, sent_time: i64) bool { + _ = sent_time; + return self.in_recovery; + } + + /// Per-packet ACK callback 
(still fired by the per-packet loop in + /// connection.zig). The aggregate signals are consumed by `onAckBatch`; + /// here we just maintain `delivered_total`. + pub fn onPacketAcked(self: *Bbr, acked_bytes: u64, sent_time: i64) void { + _ = sent_time; + self.delivered_total += acked_bytes; + } + + pub fn onCongestionEvent(self: *Bbr, sent_time: i64, now: i64) void { + _ = self; + _ = sent_time; + _ = now; + // Per-packet loss notification — handled batch-wise in onAckBatch + // via the BBRLossThresh gate. Nothing to do here. + } + + pub fn onPersistentCongestion(self: *Bbr, now: i64) void { + _ = now; + // Don't wipe the bw model — preserve max_bw/min_rtt and just halve + // the cwnd, reset lower bounds, and ensure we're not stuck in recovery. + self.cwnd_bytes = @max(MIN_PIPE_CWND_PACKETS * self.max_datagram_size, self.cwnd_bytes / 2); + self.bw_lo = std.math.maxInt(u64); + self.inflight_lo = std.math.maxInt(u64); + self.in_recovery = false; + } + + pub fn onPtoExpired(self: *Bbr) void { + // Save cwnd and enter recovery. cwnd is not reduced here; on exit updateRecovery only uses saved_cwnd to lift inflight_hi. + if (!self.in_recovery) { + self.saved_cwnd = self.cwnd_bytes; + } + self.in_recovery = true; + // recovery_start_pn is left unchanged for PTO (no specific lost packet); + // recovery ends on the first ack with largest_acked_pn >= recovery_start_pn. + } + + /// Path migration with IP change: drop the model and restart Startup. + pub fn onPathChange(self: *Bbr) void { + self.* = Bbr.initWithMds(self.max_datagram_size); + } + + /// Apply batch-level signals from one ACK frame. + /// Three-stage pipeline: (1) update model, (2) update state, (3) update outputs. 
+ pub fn onAckBatch(self: *Bbr, ctx: *const AckContext) void { + self.updateModelAndState(ctx); + self.updateControlParameters(); + } + + // ── Stage 1+2: model + state ── + + fn updateModelAndState(self: *Bbr, ctx: *const AckContext) void { + self.updateRound(ctx); + self.updateRecovery(ctx); + self.updateLatestSignals(ctx); + self.updateMaxBw(ctx); + self.updateMinRtt(ctx); + self.updateAckAggregation(ctx); + self.updateEcnAlpha(ctx); + + // Bound updates run in this order (lower-bounds before reacting): + self.checkLossEvent(ctx); + self.updateLowerBounds(ctx); + + // State transitions. + self.checkStartupDone(ctx); + self.checkDrainDone(); + self.checkProbeBwTransition(ctx); + self.checkProbeRttDone(ctx); + self.checkProbeRttEntry(ctx); + } + + // ── Stage 3: pacing & cwnd ── + + fn updateControlParameters(self: *Bbr) void { + self.setPacingRate(); + self.setSendQuantum(); + self.setCwnd(); + } + + // ── Round detection ── + + fn updateRound(self: *Bbr, ctx: *const AckContext) void { + self.round_start = false; + if (ctx.rate_sample) |s| { + if (s.prior_delivered >= self.next_round_delivered) { + self.next_round_delivered = self.delivered_total; + self.round_count += 1; + self.round_start = true; + } + } + } + + // ── Recovery ── + + fn updateRecovery(self: *Bbr, ctx: *const AckContext) void { + if (!self.in_recovery) return; + // Exit recovery once we ack past the highest packet sent before recovery. + // PTO-driven recovery uses `recovery_start_pn = 0`; any ack exits. + // Cwnd is *not* restored from saved_cwnd — BBR recomputes from its + // model in `setCwnd`. The saved value only serves to keep `inflight_hi` + // from shrinking too aggressively below pre-PTO levels. 
+ if (ctx.largest_acked_pn) |la| { + if (la >= self.recovery_start_pn) { + self.in_recovery = false; + if (self.saved_cwnd > self.inflight_hi) { + self.inflight_hi = self.saved_cwnd; + } + self.saved_cwnd = 0; + } + } + } + + // ── Latest-round signals ── + + fn updateLatestSignals(self: *Bbr, ctx: *const AckContext) void { + if (self.round_start) { + self.bw_latest = 0; + self.inflight_latest = 0; + self.ecn_ce_in_round = 0; + self.ecn_delivered_in_round = 0; + } + if (ctx.rate_sample) |s| { + if (s.delivery_rate_bps > self.bw_latest) self.bw_latest = s.delivery_rate_bps; + } + if (ctx.bytes_in_flight > self.inflight_latest) self.inflight_latest = ctx.bytes_in_flight; + self.ecn_ce_in_round += ctx.ce_byte_count; + self.ecn_delivered_in_round += ctx.newly_acked_bytes; + } + + // ── Bandwidth filter ── + + fn updateMaxBw(self: *Bbr, ctx: *const AckContext) void { + const s = ctx.rate_sample orelse return; + if (s.delivery_rate_bps == 0) return; + // Accept app-limited samples only if they would *raise* max_bw. + if (s.is_app_limited and s.delivery_rate_bps < self.max_bw) { + return; + } + // Push once per round; accumulate latest within the round. + if (self.round_start) { + self.max_bw_filter.push(self.bw_latest); + self.max_bw = self.max_bw_filter.get(); + } + // Track the max within the *current* round too (matters for fast Startup). + if (s.delivery_rate_bps > self.max_bw) self.max_bw = s.delivery_rate_bps; + } + + // ── Min-RTT filter (with separate ProbeRTT stamp) ── + + fn updateMinRtt(self: *Bbr, ctx: *const AckContext) void { + const s = ctx.rate_sample orelse return; + if (s.rtt_ns <= 0) return; + const expired = ctx.now > self.probe_rtt_min_stamp + MIN_RTT_FILTER_NS; + if (s.rtt_ns < self.min_rtt_ns) { + self.min_rtt_ns = s.rtt_ns; + self.probe_rtt_min_stamp = ctx.now; + } else if (expired) { + // Window expired without seeing a smaller RTT → accept current sample + // and re-stamp so ProbeRTT timing resets. 
+ self.min_rtt_ns = s.rtt_ns; + self.probe_rtt_min_stamp = ctx.now; + } + } + + // ── ACK-aggregation tracking ── + + fn updateAckAggregation(self: *Bbr, ctx: *const AckContext) void { + if (self.max_bw == 0 or self.min_rtt_ns == std.math.maxInt(i64)) return; + // Expected delivered over the interval since last sample at max_bw rate. + const interval_ns = ctx.now - self.extra_acked_interval_start; + if (interval_ns <= 0) { + self.extra_acked_interval_start = ctx.now; + self.extra_acked_delivered = ctx.newly_acked_bytes; + return; + } + const expected = @as(u64, @intCast(@divTrunc( + @as(u128, self.max_bw) * @as(u128, @intCast(interval_ns)), + std.time.ns_per_s, + ))); + self.extra_acked_delivered += ctx.newly_acked_bytes; + if (self.extra_acked_delivered > expected) { + const extra = self.extra_acked_delivered - expected; + if (self.round_start) { + self.extra_acked_filter.push(extra); + self.extra_acked = self.extra_acked_filter.get(); + self.extra_acked_interval_start = ctx.now; + self.extra_acked_delivered = 0; + } + } + } + + // ── ECN alpha (EWMA) ── + + fn updateEcnAlpha(self: *Bbr, ctx: *const AckContext) void { + _ = ctx; + if (!self.round_start) return; + if (self.ecn_delivered_in_round == 0) return; + // frac_x1024 = ce / delivered * 1024 + const frac_x1024 = self.ecn_ce_in_round * 1024 / self.ecn_delivered_in_round; + // alpha = (1/16) * frac + (15/16) * prev_alpha + const new_alpha = (frac_x1024 * ECN_ALPHA_GAIN_NUM + self.ecn_alpha_x1024 * (ECN_ALPHA_GAIN_DENOM - ECN_ALPHA_GAIN_NUM)) / ECN_ALPHA_GAIN_DENOM; + self.ecn_alpha_x1024 = new_alpha; + } + + // ── Loss event gate (BBRLossThresh) ── + + fn checkLossEvent(self: *Bbr, ctx: *const AckContext) void { + // Only react if the loss fraction crosses the threshold AND we're + // probing BW (not in Startup, not just-entered Drain). + if (ctx.newly_lost_bytes == 0 and self.ecn_alpha_x1024 == 0) return; + if (self.state == .startup) { + // Startup uses `checkStartupDone`'s loss path; nothing here. 
+ return; + } + if (ctx.prior_bytes_in_flight == 0) return; + + const loss_too_high = ctx.newly_lost_bytes * LOSS_THRESH_DENOM > + ctx.prior_bytes_in_flight * LOSS_THRESH_NUM and + ctx.newly_lost_bytes > 3 * self.max_datagram_size; + const ecn_too_high = self.ecn_alpha_x1024 * ECN_THRESH_DENOM > + 1024 * ECN_THRESH_NUM; + + if (loss_too_high or ecn_too_high) { + // BBRHandleInflightTooHigh: cap inflight_hi. + const target = @max( + ctx.bytes_in_flight, + self.inflight_latest, + ); + const reduced = target * BETA_NUM / BETA_DENOM; + if (reduced < self.inflight_hi) self.inflight_hi = reduced; + // Enter recovery if not already. + if (!self.in_recovery) { + self.in_recovery = true; + if (ctx.largest_acked_pn) |la| { + self.recovery_start_pn = la; + } + self.saved_cwnd = self.cwnd_bytes; + } + } + } + + // ── Lower bounds (bw_lo, inflight_lo) ── + + fn updateLowerBounds(self: *Bbr, ctx: *const AckContext) void { + if (ctx.newly_lost_bytes == 0) return; + // Initialize lazily. + if (self.bw_lo == std.math.maxInt(u64)) self.bw_lo = self.max_bw; + if (self.inflight_lo == std.math.maxInt(u64)) self.inflight_lo = self.cwnd_bytes; + // Per-round multiplicative decrease (only on the first loss in a round). + if (self.round_start) { + self.bw_lo = @max(self.bw_latest, self.bw_lo * BETA_NUM / BETA_DENOM); + self.inflight_lo = @max(self.inflight_latest, self.inflight_lo * BETA_NUM / BETA_DENOM); + } + } + + // ── Startup → Drain ── + + fn checkStartupDone(self: *Bbr, ctx: *const AckContext) void { + if (self.state != .startup) return; + if (!self.round_start) { + // Loss-driven exit: same threshold as steady-state (BBRLossThresh). + if (ctx.newly_lost_bytes > 0 and ctx.prior_bytes_in_flight > 0) { + const loss_too_high = ctx.newly_lost_bytes * LOSS_THRESH_DENOM > + ctx.prior_bytes_in_flight * LOSS_THRESH_NUM; + if (loss_too_high) { + self.filled_pipe = true; + self.enterDrain(); + return; + } + } + return; + } + // Round boundary: bw plateau check. 
+ if (self.max_bw >= self.last_full_bw * STARTUP_FULL_BW_NUM / STARTUP_FULL_BW_DENOM) { + self.last_full_bw = self.max_bw; + self.full_bw_count = 0; + return; + } + self.full_bw_count += 1; + if (self.full_bw_count >= STARTUP_FULL_BW_ROUNDS) { + self.filled_pipe = true; + self.enterDrain(); + } + } + + fn checkDrainDone(self: *Bbr) void { + if (self.state != .drain) return; + if (self.cwnd_bytes <= self.bdp()) { + self.enterProbeBwDown(0); + } + } + + // ── ProbeBW sub-state machine (event-driven) ── + + fn checkProbeBwTransition(self: *Bbr, ctx: *const AckContext) void { + switch (self.state) { + .probe_bw_down => { + // Stay in DOWN until inflight drains to (1 - HEADROOM) * BDP. + const target = self.bdp() * (HEADROOM_DENOM - HEADROOM_NUM) / HEADROOM_DENOM; + if (ctx.bytes_in_flight <= target) { + self.enterProbeBwCruise(ctx.now); + } + }, + .probe_bw_cruise => { + // Stay in CRUISE for ≥1 round, then transition to REFILL. + // (Picoquic uses a randomized timer + Reno coexistence quota. + // We use the simpler "≥1 round elapsed" rule from the draft.) + if (self.round_count > self.probe_bw_cycle_round) { + self.enterProbeBwRefill(); + } + }, + .probe_bw_refill => { + // REFILL lasts exactly one round, then UP. + if (self.round_count > self.probe_bw_cycle_round) { + self.enterProbeBwUp(ctx.now); + } + }, + .probe_bw_up => { + // Raise inflight_hi once per round while probing up. + self.raiseInflightHi(ctx); + // Exit UP once inflight reaches min(inflight_hi, PROBE_BW_UP_GAIN/GAIN_DENOM × BDP), + // OR more than two rounds have passed since the cycle round was stamped. 
+ const target = @min(self.inflight_hi, self.bdp() * PROBE_BW_UP_GAIN / GAIN_DENOM); + if (ctx.bytes_in_flight >= target or self.round_count > self.probe_bw_cycle_round + 2) { + self.enterProbeBwDown(ctx.now); + } + }, + else => {}, + } + } + + fn enterDrain(self: *Bbr) void { + self.state = .drain; + self.pacing_gain = DRAIN_PACING_GAIN; + self.cwnd_gain = DRAIN_CWND_GAIN; + } + + fn enterProbeBwDown(self: *Bbr, now: i64) void { + self.state = .probe_bw_down; + self.pacing_gain = PROBE_BW_DOWN_GAIN; + self.cwnd_gain = PROBE_BW_DEFAULT_CWND_GAIN; + self.cycle_stamp = now; + self.probe_bw_cycle_round = self.round_count; + } + + fn enterProbeBwCruise(self: *Bbr, now: i64) void { + self.state = .probe_bw_cruise; + self.pacing_gain = PROBE_BW_CRUISE_GAIN; + self.cwnd_gain = PROBE_BW_DEFAULT_CWND_GAIN; + self.cycle_stamp = now; + self.probe_bw_cycle_round = self.round_count; + } + + fn enterProbeBwRefill(self: *Bbr) void { + self.state = .probe_bw_refill; + self.pacing_gain = PROBE_BW_REFILL_GAIN; + self.cwnd_gain = PROBE_BW_DEFAULT_CWND_GAIN; + // REFILL resets the lower bounds so UP can probe upward. + self.bw_lo = std.math.maxInt(u64); + self.inflight_lo = std.math.maxInt(u64); + self.probe_bw_cycle_round = self.round_count; + } + + fn enterProbeBwUp(self: *Bbr, now: i64) void { + self.state = .probe_bw_up; + self.pacing_gain = PROBE_BW_UP_GAIN; + self.cwnd_gain = PROBE_BW_UP_CWND_GAIN; + self.cycle_stamp = now; + self.probe_bw_cycle_round = self.round_count; + self.bw_probe_up_count = 0; + self.bw_probe_up_acks = 0; + self.bw_probe_up_rounds = 0; + } + + fn raiseInflightHi(self: *Bbr, ctx: *const AckContext) void { + // BBRRaiseInflightHiSlope (draft §4.3.4 / picoquic L1562): + // each round in UP, allow inflight_hi to grow by `probe_up_count` packets. + // We use a simpler formulation: an unset inflight_hi is seeded at 2× bdp; + // afterwards each round start adds `bw_probe_up_rounds` × max_datagram_size + // (no upper cap here). The slope-based formulation is an optimization. 
+ if (self.inflight_hi == std.math.maxInt(u64)) { + self.inflight_hi = self.bdp() * 2; + return; + } + if (self.round_start) { + self.bw_probe_up_rounds += 1; + self.inflight_hi += self.max_datagram_size * @as(u64, self.bw_probe_up_rounds); + } + _ = ctx; + } + + // ── ProbeRTT ── + + fn checkProbeRttEntry(self: *Bbr, ctx: *const AckContext) void { + if (self.state == .probe_rtt or self.state == .startup) return; + // Use `probe_rtt_min_stamp` (separate from per-sample stamp) so a + // stable RTT eventually triggers ProbeRTT. + if (ctx.now > self.probe_rtt_min_stamp + PROBE_RTT_INTERVAL_NS) { + self.state = .probe_rtt; + self.pacing_gain = GAIN_DENOM; + self.cwnd_gain = PROBE_RTT_CWND_GAIN; + self.probe_rtt_done_stamp = ctx.now + PROBE_RTT_DURATION_NS; + } + } + + fn checkProbeRttDone(self: *Bbr, ctx: *const AckContext) void { + if (self.state != .probe_rtt) return; + if (ctx.now < self.probe_rtt_done_stamp) return; + // Exit: refresh min_rtt stamp so we don't immediately re-enter, then + // resume ProbeBW (or Startup if we never filled the pipe). + self.probe_rtt_min_stamp = ctx.now; + if (self.filled_pipe) { + self.enterProbeBwDown(ctx.now); + } else { + self.state = .startup; + self.pacing_gain = STARTUP_PACING_GAIN; + self.cwnd_gain = STARTUP_CWND_GAIN; + } + } + + // ── Output: pacing rate, send_quantum, cwnd ── + + fn boundedBw(self: *const Bbr) u64 { + if (self.bw_lo == std.math.maxInt(u64)) return self.max_bw; + return @min(self.max_bw, self.bw_lo); + } + + fn bdp(self: *const Bbr) u64 { + const bw = self.boundedBw(); + if (bw == 0 or self.min_rtt_ns == std.math.maxInt(i64)) { + return MIN_PIPE_CWND_PACKETS * self.max_datagram_size; + } + const product: u128 = @as(u128, bw) * @as(u128, @intCast(self.min_rtt_ns)); + return @intCast(product / std.time.ns_per_s); + } + + fn maxInflight(self: *const Bbr) u64 { + // BDP × cwnd_gain + extra_acked (ACK-aggregation budget). 
+ return self.bdp() * self.cwnd_gain / GAIN_DENOM + self.extra_acked; + } + + fn setPacingRate(self: *Bbr) void { + if (self.max_bw == 0) return; + const bw = self.boundedBw(); + // pacing_rate = pacing_gain * bw * (1 - margin) + var rate = bw * self.pacing_gain / GAIN_DENOM; + rate = rate * PACING_MARGIN_NUM / PACING_MARGIN_DENOM; + // Don't reduce pacing rate below a safety floor in Startup. + if (self.state == .startup and rate < self.pacing_rate_bps) return; + self.pacing_rate_bps = rate; + } + + fn setSendQuantum(self: *Bbr) void { + // BBRSetSendQuantum (draft §4.4.1, picoquic L967): + // quantum = clamp(pacing_rate × 1ms, floor=2*MTU, ceiling=64KB). + const ns_per_quantum = std.time.ns_per_ms; + const bytes_per_quantum: u64 = if (self.pacing_rate_bps == 0) + self.max_datagram_size + else + @intCast(@divTrunc(@as(u128, self.pacing_rate_bps) * ns_per_quantum, std.time.ns_per_s)); + const floor = 2 * self.max_datagram_size; + const ceiling: u64 = 64 * 1024; + self.send_quantum = std.math.clamp(bytes_per_quantum, floor, ceiling); + } + + fn setCwnd(self: *Bbr) void { + if (self.in_recovery) { + // Packet conservation: cwnd ≤ bytes_in_flight + newly_acked. + // We approximate by clamping to inflight_lo (which captures recent + // delivery rate) so we don't run away during recovery. + const conservative = @max(self.inflight_lo, MIN_PIPE_CWND_PACKETS * self.max_datagram_size); + if (conservative < self.cwnd_bytes) self.cwnd_bytes = conservative; + return; + } + var target = self.maxInflight(); + // Apply CRUISE/ProbeRTT headroom. + if (self.state == .probe_bw_cruise) { + target = target * (HEADROOM_DENOM - HEADROOM_NUM) / HEADROOM_DENOM; + } + if (self.state == .probe_rtt) { + target = MIN_PIPE_CWND_PACKETS * self.max_datagram_size; + } + // Bound by inflight_hi (loss/ECN ceiling) and inflight_lo floor. 
+ if (target > self.inflight_hi) target = self.inflight_hi; + if (self.inflight_lo != std.math.maxInt(u64) and target < self.inflight_lo) { + target = self.inflight_lo; + } + const floor = MIN_PIPE_CWND_PACKETS * self.max_datagram_size; + if (target < floor) target = floor; + self.cwnd_bytes = target; + } +}; + +// ── Tests ── + +const testing = std.testing; + +fn makeSample(bps: u64, delivered: u64, prior: u64, rtt: i64, app_limited: bool) delivery_rate.RateSample { + return .{ + .delivery_rate_bps = bps, + .delivered = delivered, + .prior_delivered = prior, + .interval_ns = rtt, + .send_elapsed_ns = rtt, + .ack_elapsed_ns = rtt, + .is_app_limited = app_limited, + .rtt_ns = rtt, + }; +} + +fn makeCtx(now: i64, s: ?delivery_rate.RateSample) AckContext { + return .{ + .now = now, + .bytes_in_flight = 0, + .prior_bytes_in_flight = 0, + .newly_acked_bytes = if (s) |rs| rs.delivered else 0, + .newly_lost_bytes = 0, + .persistent_congestion = false, + .earliest_lost_sent_time = null, + .largest_acked_pn = null, + .ce_byte_count = 0, + .rate_sample = s, + }; +} + +test "Bbr: initial state is Startup with high gain" { + const b = Bbr.init(); + try testing.expectEqual(State.startup, b.state); + try testing.expect(b.inSlowStart()); + try testing.expectEqual(STARTUP_PACING_GAIN, b.pacing_gain); +} + +test "Bbr: max_bw filter tracks max over recent rounds, ignores below-max app-limited" { + var b = Bbr.init(); + var now: i64 = 0; + var prior: u64 = 0; + const samples = [_]u64{ 100_000, 200_000, 150_000, 180_000 }; + for (samples) |bps| { + b.delivered_total += 1000; + b.onAckBatch(&makeCtx(now, makeSample(bps, 1000, prior, 50_000_000, false))); + prior = b.delivered_total; + now += 50_000_000; + } + try testing.expect(b.max_bw >= 200_000); + // App-limited sample below max should be ignored. 
+ b.delivered_total += 1000; + b.onAckBatch(&makeCtx(now, makeSample(50_000, 1000, prior, 50_000_000, true))); + try testing.expect(b.max_bw >= 200_000); +} + +test "Bbr: BBRLossThresh gates inflight_hi reduction (single-packet loss does NOT)" { + var b = Bbr.init(); + // Build up bw + drive past Startup so checkLossEvent runs. + var now: i64 = 0; + var prior: u64 = 0; + var i: u32 = 0; + while (i < 10) : (i += 1) { + b.delivered_total += 10_000; + b.onAckBatch(&makeCtx(now, makeSample(1_000_000, 10_000, prior, 50_000_000, false))); + prior = b.delivered_total; + now += 50_000_000; + } + // Force out of Startup so loss-threshold gate engages. + b.state = .probe_bw_down; + const before = b.inflight_hi; + + // Single-packet loss against 100KB inflight: 1200/100000 = 1.2% < 2% + // → must NOT reduce inflight_hi. + var ctx = makeCtx(now, makeSample(1_000_000, 0, prior, 50_000_000, false)); + ctx.prior_bytes_in_flight = 100_000; + ctx.bytes_in_flight = 98_800; + ctx.newly_lost_bytes = 1200; + b.onAckBatch(&ctx); + try testing.expectEqual(before, b.inflight_hi); +} + +test "Bbr: BBRLossThresh fires when loss > 2% of prior_bytes_in_flight" { + var b = Bbr.init(); + var now: i64 = 0; + var prior: u64 = 0; + var i: u32 = 0; + while (i < 10) : (i += 1) { + b.delivered_total += 10_000; + b.onAckBatch(&makeCtx(now, makeSample(1_000_000, 10_000, prior, 50_000_000, false))); + prior = b.delivered_total; + now += 50_000_000; + } + b.state = .probe_bw_down; + b.inflight_hi = 100_000; // start with a meaningful value to detect reduction + const before = b.inflight_hi; + + // 6KB loss against 100KB inflight = 6% > 2%, AND >3*MTU + var ctx = makeCtx(now, makeSample(1_000_000, 0, prior, 50_000_000, false)); + ctx.prior_bytes_in_flight = 100_000; + ctx.bytes_in_flight = 94_000; + ctx.newly_lost_bytes = 6000; + b.onAckBatch(&ctx); + try testing.expect(b.inflight_hi < before); +} + +test "Bbr: ProbeRTT actually fires after PROBE_RTT_INTERVAL with stable RTT" { + var b = Bbr.init(); + // 
Establish a min_rtt and exit Startup. + b.delivered_total += 1000; + b.onAckBatch(&makeCtx(0, makeSample(1_000_000, 1000, 0, 50_000_000, false))); + b.state = .probe_bw_down; + b.filled_pipe = true; + // Set bytes_in_flight high enough that DOWN doesn't drain to CRUISE. + b.cwnd_bytes = 100_000; + + // Many ACKs at the same RTT — must NOT keep refreshing probe_rtt_min_stamp. + // Use bytes_in_flight high to avoid DOWN→CRUISE transition. + var now: i64 = 100_000_000; + var prior = b.delivered_total; + var i: u32 = 0; + while (i < 5) : (i += 1) { + b.delivered_total += 1000; + var ctx = makeCtx(now, makeSample(1_000_000, 1000, prior, 50_000_000, false)); + ctx.bytes_in_flight = 100_000; + b.onAckBatch(&ctx); + prior = b.delivered_total; + now += 100_000_000; + } + try testing.expect(b.state != .probe_rtt); + + // Advance past PROBE_RTT_INTERVAL_NS (5s) — must enter ProbeRTT. + now += PROBE_RTT_INTERVAL_NS + 100_000_000; + b.delivered_total += 1000; + var ctx2 = makeCtx(now, makeSample(1_000_000, 1000, prior, 50_000_000, false)); + ctx2.bytes_in_flight = 100_000; + b.onAckBatch(&ctx2); + try testing.expectEqual(State.probe_rtt, b.state); +} + +test "Bbr: ProbeRTT exits to ProbeBW_Down after 200ms" { + var b = Bbr.init(); + b.state = .probe_rtt; + b.filled_pipe = true; + b.probe_rtt_done_stamp = 100_000_000; + b.delivered_total += 1000; + b.onAckBatch(&makeCtx(200_000_000, makeSample(1_000_000, 1000, 0, 50_000_000, false))); + try testing.expectEqual(State.probe_bw_down, b.state); +} + +test "Bbr: ProbeBW sub-state DOWN → CRUISE on inflight drain" { + var b = Bbr.init(); + b.state = .probe_bw_down; + b.max_bw = 1_000_000; + b.min_rtt_ns = 50_000_000; + b.cwnd_bytes = b.bdp() * 2; + var ctx = makeCtx(0, null); + ctx.bytes_in_flight = 1; // very low — below (1-headroom)*BDP + b.onAckBatch(&ctx); + try testing.expectEqual(State.probe_bw_cruise, b.state); +} + +test "Bbr: ProbeBW CRUISE → REFILL after one round" { + var b = Bbr.init(); + b.state = .probe_bw_cruise; + 
b.max_bw = 1_000_000; + b.min_rtt_ns = 50_000_000; + b.probe_bw_cycle_round = 5; + b.round_count = 5; + b.next_round_delivered = 1000; + b.delivered_total = 2000; + // Sample with prior_delivered ≥ next_round_delivered triggers round_start. + b.onAckBatch(&makeCtx(100_000_000, makeSample(1_000_000, 1000, 1000, 50_000_000, false))); + try testing.expectEqual(State.probe_bw_refill, b.state); +} + +test "Bbr: ProbeBW REFILL resets bw_lo and inflight_lo" { + var b = Bbr.init(); + b.bw_lo = 500_000; + b.inflight_lo = 50_000; + b.enterProbeBwRefill(); + try testing.expectEqual(std.math.maxInt(u64), b.bw_lo); + try testing.expectEqual(std.math.maxInt(u64), b.inflight_lo); +} + +test "Bbr: inflight_hi grows in ProbeBW_Up" { + var b = Bbr.init(); + b.max_bw = 1_000_000; + b.min_rtt_ns = 50_000_000; + b.inflight_hi = 100_000; + b.state = .probe_bw_up; + b.probe_bw_cycle_round = 5; + b.round_count = 5; + b.next_round_delivered = 1000; + b.delivered_total = 2000; + const before = b.inflight_hi; + b.onAckBatch(&makeCtx(100_000_000, makeSample(1_000_000, 1000, 1000, 50_000_000, false))); + try testing.expect(b.inflight_hi > before); +} + +test "Bbr: ECN EWMA crosses threshold over multiple rounds" { + var b = Bbr.init(); + // Force out of Startup so loss/ECN gate engages. + b.state = .probe_bw_down; + b.max_bw = 1_000_000; + b.min_rtt_ns = 50_000_000; + b.inflight_hi = 100_000; + const before = b.inflight_hi; + + // Feed multiple rounds of high CE fraction. EWMA gain is 1/16, so it + // takes several rounds for alpha to cross threshold. 
+ var now: i64 = 0; + var prior: u64 = 0; + var round: u32 = 0; + while (round < 50) : (round += 1) { + b.delivered_total += 10_000; + var ctx = makeCtx(now, makeSample(1_000_000, 10_000, prior, 50_000_000, false)); + ctx.prior_bytes_in_flight = 100_000; + ctx.bytes_in_flight = 90_000; + ctx.newly_acked_bytes = 10_000; + ctx.ce_byte_count = 9_000; // 90% CE + b.onAckBatch(&ctx); + prior = b.delivered_total; + now += 50_000_000; + } + try testing.expect(b.inflight_hi < before); +} + +test "Bbr: persistent congestion preserves max_bw" { + var b = Bbr.init(); + b.max_bw = 5_000_000; + b.min_rtt_ns = 30_000_000; + b.cwnd_bytes = 100_000; + b.onPersistentCongestion(0); + try testing.expectEqual(@as(u64, 5_000_000), b.max_bw); + try testing.expectEqual(@as(i64, 30_000_000), b.min_rtt_ns); + try testing.expect(b.cwnd_bytes < 100_000); // halved +} + +test "Bbr: PTO enters recovery; ack exits and lifts inflight_hi floor" { + var b = Bbr.init(); + b.cwnd_bytes = 200_000; + b.inflight_hi = 50_000; // post-loss ceiling + b.onPtoExpired(); + try testing.expect(b.in_recovery); + try testing.expectEqual(@as(u64, 200_000), b.saved_cwnd); + + // ACK arrives after PTO — must exit recovery and lift inflight_hi to + // saved level (BBR's cwnd is recomputed from model, not blindly restored). 
+ var ctx = makeCtx(0, null); + ctx.largest_acked_pn = 1; + b.onAckBatch(&ctx); + try testing.expect(!b.in_recovery); + try testing.expect(b.inflight_hi >= 200_000); +} + +test "Bbr: send_quantum scales with pacing rate" { + var b = Bbr.init(); + b.pacing_rate_bps = 10_000_000_000; // 10 GB/s (bytes per second) → quantum should hit 64KB ceiling + b.setSendQuantum(); + try testing.expectEqual(@as(u64, 64 * 1024), b.send_quantum); + + b.pacing_rate_bps = 100_000; // 100 KB/s → quantum at 100B raw, clamped to 2*MTU floor + b.setSendQuantum(); + try testing.expectEqual(2 * b.max_datagram_size, b.send_quantum); +} + +test "Bbr: CRUISE applies headroom to cwnd" { + var b = Bbr.init(); + b.max_bw = 1_000_000; + b.min_rtt_ns = 50_000_000; + b.state = .probe_bw_cruise; + b.cwnd_gain = PROBE_BW_DEFAULT_CWND_GAIN; + b.setCwnd(); + const cruise_cwnd = b.cwnd_bytes; + b.state = .probe_bw_up; + b.cwnd_gain = PROBE_BW_DEFAULT_CWND_GAIN; + b.setCwnd(); + const up_cwnd = b.cwnd_bytes; + // CRUISE leaves headroom (15%); UP doesn't. + try testing.expect(cruise_cwnd < up_cwnd); +} + +test "Bbr: bdp uses bounded bw (max_bw clamped by bw_lo)" { + var b = Bbr.init(); + b.max_bw = 1_000_000; + b.bw_lo = 500_000; + b.min_rtt_ns = 50_000_000; + // BDP = 500_000 × 0.05s = 25_000 bytes + try testing.expectEqual(@as(u64, 25_000), b.bdp()); +} + +test "Bbr: app-limited round counting" { + var b = Bbr.init(); + b.delivered_total += 1000; + b.onAckBatch(&makeCtx(0, makeSample(1_000_000, 1000, 0, 50_000_000, true))); + // App-limited samples are accepted only when they would raise max_bw — + // first sample raises max_bw (was 0). Subsequent app-limited samples + // below current max_bw are ignored. 
+ try testing.expect(b.max_bw > 0); +} + +test "Bbr: pacing rate applies 1% margin" { + var b = Bbr.init(); + b.max_bw = 1_000_000; + b.pacing_gain = GAIN_DENOM; // 1.0 + b.setPacingRate(); + // Expected ≈ 990_000 (1MB/s × 0.99) + try testing.expectEqual(@as(u64, 990_000), b.pacing_rate_bps); +} + +test "Bbr: onPathChange resets to fresh Startup" { + var b = Bbr.init(); + b.max_bw = 5_000_000; + b.state = .probe_bw_up; + b.filled_pipe = true; + b.onPathChange(); + try testing.expectEqual(State.startup, b.state); + try testing.expectEqual(@as(u64, 0), b.max_bw); + try testing.expect(!b.filled_pipe); +} diff --git a/src/quic/congestion.zig b/src/quic/congestion.zig index e619d7a..a1a2ed8 100644 --- a/src/quic/congestion.zig +++ b/src/quic/congestion.zig @@ -4,6 +4,9 @@ const testing = std.testing; const rtt_mod = @import("rtt.zig"); const RttStats = rtt_mod.RttStats; +const delivery_rate = @import("delivery_rate.zig"); +const bbr_mod = @import("bbr.zig"); +pub const Bbr = bbr_mod.Bbr; /// Default initial congestion window in packets. /// RFC 9002 §7.2: initial_window = min(10 * mds, max(14720, 2 * mds)) @@ -161,6 +164,23 @@ pub const NewReno = struct { pub fn setMaxDatagramSize(self: *NewReno, size: u64) void { self.max_datagram_size = size; } + + /// Apply batch-level signals from one ACK frame. `onPacketAcked` is still + /// fired per-packet by the caller; this method handles loss/ECN events. + pub fn onAckBatch(self: *NewReno, ctx: *const AckContext) void { + if (ctx.earliest_lost_sent_time) |lost_time| { + if (ctx.persistent_congestion) { + if (!self.inCongestionRecovery(lost_time)) { + self.onPersistentCongestion(ctx.now); + } + } else { + self.onCongestionEvent(lost_time, ctx.now); + } + } + if (ctx.ce_byte_count > 0) { + self.onCongestionEvent(ctx.now, ctx.now); + } + } }; /// CUBIC congestion control (RFC 8312). 
@@ -385,6 +405,181 @@ pub const Cubic = struct { pub fn setMaxDatagramSize(self: *Cubic, size: u64) void { self.max_datagram_size = size; } + + /// Apply batch-level signals from one ACK frame. + pub fn onAckBatch(self: *Cubic, ctx: *const AckContext) void { + if (ctx.earliest_lost_sent_time) |lost_time| { + if (ctx.persistent_congestion) { + if (!self.inCongestionRecovery(lost_time)) { + self.onPersistentCongestion(ctx.now); + } + } else { + self.onCongestionEvent(lost_time, ctx.now); + } + } + if (ctx.ce_byte_count > 0) { + self.onCongestionEvent(ctx.now, ctx.now); + } + } +}; + +/// Batch-level summary for one ACK frame's worth of work. +/// +/// `onPacketAcked` still fires per-packet inside the connection's existing +/// loop (it's where per-packet bookkeeping lives anyway). This struct carries +/// the *batch* signals consumed by `onAckBatch`. NewReno/Cubic use a small +/// subset (loss + ECN); BBR uses the rest. +pub const AckContext = struct { + /// Current time (nanoseconds). + now: i64, + /// Bytes in flight after this ACK has been processed (post-ack). + bytes_in_flight: u64, + /// Bytes in flight before this ACK was processed. Denominator for + /// BBR's `BBRLossThresh` check (loss-fraction-of-inflight gate). + prior_bytes_in_flight: u64 = 0, + /// Total bytes acknowledged in this batch (excluding MTU probes). + newly_acked_bytes: u64 = 0, + /// Total bytes lost in this batch (excluding MTU probes). + newly_lost_bytes: u64 = 0, + /// True iff loss-detection determined persistent congestion (RFC 9002 §7.6.2). + persistent_congestion: bool, + /// Earliest send time across the lost packets in this batch (null if no + /// non-probe loss). + earliest_lost_sent_time: ?i64, + /// Largest packet number acknowledged in this batch (for BBR recovery + /// exit detection: leave recovery once acks pass `recovery_start_pn`). + largest_acked_pn: ?u64 = null, + /// Approximate ECN-CE bytes acknowledged in this batch (0 for plain ACK). 
+ /// NewReno/Cubic treat any non-zero value as a single congestion event, + /// matching the pre-batch behavior. BBR feeds this into a smoothed alpha + /// (`BBRExcessiveEcnCE`). + ce_byte_count: u64, + /// Most recent delivery-rate sample produced from the highest-acked packet + /// in this batch (null if no usable sample). Consumed by BBR. + rate_sample: ?delivery_rate.RateSample = null, +}; + +/// Selectable congestion control algorithm. +pub const Algorithm = enum { + newreno, + cubic, + bbr, // BBRv3; implemented by `Bbr` in bbr.zig +}; + +/// Tagged union over the supported congestion controllers. +pub const CongestionControl = union(Algorithm) { + newreno: NewReno, + cubic: Cubic, + bbr: Bbr, + + pub fn init(algo: Algorithm) CongestionControl { + return switch (algo) { + .newreno => .{ .newreno = NewReno.init() }, + .cubic => .{ .cubic = Cubic.init() }, + .bbr => .{ .bbr = Bbr.init() }, + }; + } + + pub fn initWithMds(algo: Algorithm, max_datagram_size: u64) CongestionControl { + return switch (algo) { + .newreno => .{ .newreno = NewReno.initWithMds(max_datagram_size) }, + .cubic => .{ .cubic = Cubic.initWithMds(max_datagram_size) }, + .bbr => .{ .bbr = Bbr.initWithMds(max_datagram_size) }, + }; + } + + pub fn algorithm(self: *const CongestionControl) Algorithm { + return std.meta.activeTag(self.*); + } + + pub fn inSlowStart(self: *const CongestionControl) bool { + return switch (self.*) { + inline else => |*cc| cc.inSlowStart(), + }; + } + + pub fn inCongestionRecovery(self: *const CongestionControl, sent_time: i64) bool { + return switch (self.*) { + inline else => |*cc| cc.inCongestionRecovery(sent_time), + }; + } + + pub fn onPacketAcked(self: *CongestionControl, acked_bytes: u64, sent_time: i64) void { + switch (self.*) { + inline else => |*cc| cc.onPacketAcked(acked_bytes, sent_time), + } + } + + pub fn onCongestionEvent(self: *CongestionControl, sent_time: i64, now: i64) void { + switch (self.*) { + inline else => |*cc| cc.onCongestionEvent(sent_time, 
now), + } + } + + pub fn onPersistentCongestion(self: *CongestionControl, now: i64) void { + switch (self.*) { + inline else => |*cc| cc.onPersistentCongestion(now), + } + } + + pub fn onPtoExpired(self: *CongestionControl) void { + switch (self.*) { + inline else => |*cc| cc.onPtoExpired(), + } + } + + pub fn sendWindow(self: *const CongestionControl) u64 { + return switch (self.*) { + inline else => |*cc| cc.sendWindow(), + }; + } + + pub fn setMaxDatagramSize(self: *CongestionControl, size: u64) void { + switch (self.*) { + inline else => |*cc| cc.setMaxDatagramSize(size), + } + } + + /// "Don't trigger congestion for pre-migration losses" semantics: + /// NewReno/Cubic stamp the recovery start time; BBR resets its model. + pub fn enterRecoveryForMigration(self: *CongestionControl, now: i64) void { + switch (self.*) { + .newreno => |*cc| cc.congestion_recovery_start_time = now, + .cubic => |*cc| cc.enterRecoveryForMigration(now), + .bbr => |*cc| cc.onPathChange(), + } + } + + pub fn setAppLimited(self: *CongestionControl, app_limited: bool) void { + switch (self.*) { + .newreno => |*cc| cc.app_limited = app_limited, + .cubic => |*cc| cc.app_limited = app_limited, + .bbr => {}, // BBR uses delivery-rate sampler's app-limited tracking instead + } + } + + /// Apply the batch-level signals from one ACK frame. + pub fn onAckBatch(self: *CongestionControl, ctx: *const AckContext) void { + switch (self.*) { + inline else => |*cc| cc.onAckBatch(ctx), + } + } + + /// Update the pacer using the CC-preferred input. NewReno/Cubic feed the + /// pacer the cwnd/RTT pair; BBR writes its computed pacing rate directly. 
+ pub fn updatePacer(self: *const CongestionControl, p: *Pacer, rtt: *const RttStats) void { + switch (self.*) { + .newreno => |*cc| p.setBandwidth(cc.sendWindow(), rtt), + .cubic => |*cc| p.setBandwidth(cc.sendWindow(), rtt), + .bbr => |*cc| { + if (cc.pacingRateBps() > 0) { + p.setPacingRate(cc.pacingRateBps()); + } else { + p.setBandwidth(cc.sendWindow(), rtt); + } + }, + } + } }; /// Integer cube root approximation (Newton's method). @@ -463,6 +658,19 @@ pub const Pacer = struct { )); } + /// Set the pacing rate directly in bytes per second. Intended for + /// model-based controllers (BBR) that compute their own pacing rate + /// independent of cwnd/RTT. + pub fn setPacingRate(self: *Pacer, bytes_per_second: u64) void { + if (bytes_per_second == 0) return; + // bandwidth_shifted = (bytes_per_ns) << SHIFT + // = (bps / 1e9) << SHIFT + self.bandwidth_shifted = @intCast(@divTrunc( + @as(u128, bytes_per_second) << BANDWIDTH_SHIFT, + std.time.ns_per_s, + )); + } + /// Called when a packet is sent. Deducts from the budget. 
pub fn onPacketSent(self: *Pacer, size: u64, now: i64) void { self.replenish(now); @@ -707,6 +915,126 @@ test "icbrt: basic cube roots" { try testing.expectEqual(@as(u64, 100), icbrt(1000000)); } +// ── CongestionControl union dispatch tests ── + +test "CongestionControl: cubic dispatch is transparent" { + var cc = CongestionControl.init(.cubic); + const initial = cc.sendWindow(); + try testing.expect(cc.inSlowStart()); + + cc.onPacketAcked(1200, 100); + try testing.expectEqual(initial + 1200, cc.sendWindow()); + + cc.onCongestionEvent(200, 300); + try testing.expect(cc.sendWindow() < initial + 1200); + try testing.expect(cc.inCongestionRecovery(150)); +} + +test "CongestionControl: newreno dispatch is transparent" { + var cc = CongestionControl.init(.newreno); + const initial = cc.sendWindow(); + try testing.expect(cc.inSlowStart()); + + cc.onPacketAcked(1200, 100); + try testing.expectEqual(initial + 1200, cc.sendWindow()); + + cc.setAppLimited(true); + cc.onPacketAcked(1200, 110); + // app_limited suppresses growth + try testing.expectEqual(initial + 1200, cc.sendWindow()); + + cc.setAppLimited(false); + cc.onCongestionEvent(50, 200); + try testing.expect(cc.sendWindow() < initial + 1200); +} + +test "CongestionControl: setMaxDatagramSize + enterRecoveryForMigration on newreno" { + var cc = CongestionControl.init(.newreno); + cc.setMaxDatagramSize(1400); + cc.enterRecoveryForMigration(500); + // Pre-migration packet (sent_time=400 <= 500) should be in recovery + try testing.expect(cc.inCongestionRecovery(400)); + // Post-migration packet (sent_time=600 > 500) should not be + try testing.expect(!cc.inCongestionRecovery(600)); +} + +test "CongestionControl: persistent congestion drops to minimum" { + var cc = CongestionControl.init(.cubic); + cc.onPersistentCongestion(1_000_000_000); + try testing.expectEqual(MIN_WINDOW_PACKETS * DEFAULT_MAX_DATAGRAM_SIZE, cc.sendWindow()); + try testing.expect(cc.inSlowStart()); +} + +test "CongestionControl: onAckBatch parity 
(loss path matches onCongestionEvent)" { + var via_batch = CongestionControl.init(.cubic); + var via_direct = Cubic.init(); + + // Same loss event applied via batch and direct API should yield same window. + via_batch.onAckBatch(&.{ + .now = 200, + .bytes_in_flight = 0, + .persistent_congestion = false, + .earliest_lost_sent_time = 100, + .ce_byte_count = 0, + }); + via_direct.onCongestionEvent(100, 200); + try testing.expectEqual(via_direct.sendWindow(), via_batch.sendWindow()); +} + +test "CongestionControl: onAckBatch persistent congestion path" { + var cc = CongestionControl.init(.newreno); + cc.onAckBatch(&.{ + .now = 1_000_000_000, + .bytes_in_flight = 0, + .persistent_congestion = true, + .earliest_lost_sent_time = 500_000_000, + .ce_byte_count = 0, + }); + try testing.expectEqual(MIN_WINDOW_PACKETS * DEFAULT_MAX_DATAGRAM_SIZE, cc.sendWindow()); +} + +test "CongestionControl: onAckBatch ECN-CE triggers congestion" { + var cc = CongestionControl.init(.cubic); + const initial = cc.sendWindow(); + cc.onAckBatch(&.{ + .now = 200, + .bytes_in_flight = 0, + .persistent_congestion = false, + .earliest_lost_sent_time = null, + .ce_byte_count = 1200, + }); + try testing.expect(cc.sendWindow() < initial); +} + +test "Pacer: setPacingRate vs setBandwidth produce non-zero bandwidth" { + var p1 = Pacer.init(); + p1.setPacingRate(1_000_000); // 1 MB/s + try testing.expect(p1.bandwidth_shifted > 0); + + var p2 = Pacer.init(); + var rtt = RttStats{}; + rtt.updateRtt(50_000_000, 0, false); + p2.setBandwidth(60_000, &rtt); + try testing.expect(p2.bandwidth_shifted > 0); +} + +test "CongestionControl: updatePacer dispatches correctly for cubic" { + var cc = CongestionControl.init(.cubic); + var pacer = Pacer.init(); + var rtt = RttStats{}; + rtt.updateRtt(50_000_000, 0, false); + + cc.updatePacer(&pacer, &rtt); + try testing.expect(pacer.bandwidth_shifted > 0); +} + +test "CongestionControl: algorithm tag" { + var cc = CongestionControl.init(.cubic); + try 
testing.expectEqual(Algorithm.cubic, cc.algorithm()); + cc = CongestionControl.init(.newreno); + try testing.expectEqual(Algorithm.newreno, cc.algorithm()); +} + // ── Pacer tests ── test "Pacer: initial burst allowed" { diff --git a/src/quic/connection.zig b/src/quic/connection.zig index 19d7948..f40153e 100644 --- a/src/quic/connection.zig +++ b/src/quic/connection.zig @@ -524,6 +524,8 @@ pub const ConnectionConfig = struct { datagram_queue_capacity: usize = DatagramQueue.DEFAULT_MAX_ITEMS, // Auto-close connection when all data is sent and acknowledged. close_when_idle: bool = false, + // Congestion control algorithm. Defaults to Cubic. + congestion_control: congestion.Algorithm = .cubic, }; /// A QUIC connection. @@ -560,7 +562,10 @@ pub const Connection = struct { // New subsystems pkt_handler: ack_handler.PacketHandler = undefined, - cc: congestion.Cubic = congestion.Cubic.init(), + cc: congestion.CongestionControl = .{ .cubic = congestion.Cubic.init() }, + /// Selected congestion-control algorithm. Preserved across path migration + /// resets so the user-chosen CC is honored after rebinding to a new IP. + cc_algorithm: congestion.Algorithm = .cubic, pacer: congestion.Pacer = congestion.Pacer.init(), conn_flow_ctrl: flow_control.ConnectionFlowController = undefined, streams: stream_mod.StreamsMap = undefined, @@ -808,6 +813,9 @@ pub const Connection = struct { conn.keep_alive_interval_ns = @min(ka_ns, @divTrunc(conn.idle_timeout_ns, 2)); } conn.close_when_idle = config.close_when_idle; + conn.cc_algorithm = config.congestion_control; + conn.cc = congestion.CongestionControl.init(config.congestion_control); + conn.pkt_handler.rate_sampling_enabled = (config.congestion_control == .bbr); // Initialize TLS 1.3 handshake if config provided if (tls_config) |tc| { @@ -1397,7 +1405,8 @@ pub const Connection = struct { // while bytes_in_flight still reflects the pre-ACK state. 
// If checked after, bytes_in_flight is already decremented, // making it appear app-limited even when the sender filled cwnd. - self.cc.app_limited = self.pkt_handler.bytes_in_flight < self.cc.sendWindow(); + const prior_bytes_in_flight = self.pkt_handler.bytes_in_flight; + self.cc.setAppLimited(prior_bytes_in_flight < self.cc.sendWindow()); var ack_result: ack_handler.AckResult = .{}; try self.pkt_handler.onAckReceived( @@ -1415,6 +1424,8 @@ pub const Connection = struct { // Notify congestion controller, track key update ACKs, and PMTUD var has_non_probe_loss = false; var earliest_lost_sent_time: ?i64 = null; + var newly_acked_bytes: u64 = 0; + var newly_lost_bytes: u64 = 0; for (result.acked.constSlice()) |pkt| { // Check if this is an MTU probe ACK if (self.mtu_discoverer.onProbeAcked(pkt.pn, now)) { @@ -1424,6 +1435,8 @@ pub const Connection = struct { self.cc.setMaxDatagramSize(new_mtu); self.pacer.max_datagram_size = new_mtu; std.log.info("PMTUD: probe ACK'd, MTU raised to {d}", .{new_mtu}); + } else { + newly_acked_bytes += pkt.size; } self.cc.onPacketAcked(pkt.size, pkt.time_sent); @@ -1462,6 +1475,7 @@ pub const Connection = struct { std.log.info("PMTUD: probe lost pn={d}", .{pkt.pn}); } else { has_non_probe_loss = true; + newly_lost_bytes += pkt.size; if (earliest_lost_sent_time == null or pkt.time_sent < earliest_lost_sent_time.?) { earliest_lost_sent_time = pkt.time_sent; } @@ -1491,20 +1505,24 @@ pub const Connection = struct { } } - if (has_non_probe_loss) { - if (result.persistent_congestion) { - // Only trigger persistent congestion if the lost packets - // are from outside the current recovery epoch. This prevents - // repeated resets when old packets are gradually declared lost - // across multiple ACK events after a blackhole. 
- if (earliest_lost_sent_time) |lost_time| { - if (!self.cc.inCongestionRecovery(lost_time)) { - self.cc.onPersistentCongestion(now); - std.log.info("persistent congestion detected, window reduced to minimum", .{}); - } - } - } else if (earliest_lost_sent_time) |lost_time| { - self.cc.onCongestionEvent(lost_time, now); + // Always call onAckBatch when there's any progress so BBR can update + // its model from the rate sample, even on no-loss/no-CE acks. + if (newly_acked_bytes > 0 or has_non_probe_loss) { + const ack_ctx = congestion.AckContext{ + .now = now, + .bytes_in_flight = self.pkt_handler.bytes_in_flight, + .prior_bytes_in_flight = prior_bytes_in_flight, + .newly_acked_bytes = newly_acked_bytes, + .newly_lost_bytes = newly_lost_bytes, + .persistent_congestion = result.persistent_congestion, + .earliest_lost_sent_time = earliest_lost_sent_time, + .largest_acked_pn = ack.largest_ack, + .ce_byte_count = 0, + .rate_sample = self.pkt_handler.latest_rate_sample, + }; + self.cc.onAckBatch(&ack_ctx); + if (result.persistent_congestion and earliest_lost_sent_time != null) { + std.log.info("persistent congestion detected, window reduced to minimum", .{}); } } @@ -1517,7 +1535,7 @@ pub const Connection = struct { self.maybeConfirmHandshake(enc_level, result.acked.len); // Update pacer - self.pacer.setBandwidth(self.cc.sendWindow(), &self.pkt_handler.rtt_stats); + self.cc.updatePacer(&self.pacer, &self.pkt_handler.rtt_stats); }, .ack_ecn => |ack| { @@ -1526,7 +1544,8 @@ pub const Connection = struct { const peer_tp = self.peer_params orelse transport_params.TransportParams{}; // RFC 9002 §7.8: snapshot app_limited BEFORE processing ACKs - self.cc.app_limited = self.pkt_handler.bytes_in_flight < self.cc.sendWindow(); + const prior_bytes_in_flight_ecn = self.pkt_handler.bytes_in_flight; + self.cc.setAppLimited(prior_bytes_in_flight_ecn < self.cc.sendWindow()); var ack_result: ack_handler.AckResult = .{}; try self.pkt_handler.onAckReceived( @@ -1544,6 +1563,8 @@ pub const 
Connection = struct { // Notify congestion controller, track key update ACKs, and PMTUD var has_non_probe_loss = false; var earliest_lost_sent_time_ecn: ?i64 = null; + var newly_acked_bytes_ecn: u64 = 0; + var newly_lost_bytes_ecn: u64 = 0; for (result.acked.constSlice()) |pkt| { if (self.mtu_discoverer.onProbeAcked(pkt.pn, now)) { const new_mtu = self.mtu_discoverer.current_mtu; @@ -1551,6 +1572,8 @@ pub const Connection = struct { self.cc.setMaxDatagramSize(new_mtu); self.pacer.max_datagram_size = new_mtu; std.log.info("PMTUD: probe ACK'd, MTU raised to {d}", .{new_mtu}); + } else { + newly_acked_bytes_ecn += pkt.size; } self.cc.onPacketAcked(pkt.size, pkt.time_sent); @@ -1592,6 +1615,7 @@ pub const Connection = struct { std.log.info("PMTUD: probe lost pn={d}", .{pkt.pn}); } else { has_non_probe_loss = true; + newly_lost_bytes_ecn += pkt.size; if (earliest_lost_sent_time_ecn == null or pkt.time_sent < earliest_lost_sent_time_ecn.?) { earliest_lost_sent_time_ecn = pkt.time_sent; } @@ -1621,30 +1645,15 @@ pub const Connection = struct { } } - if (has_non_probe_loss) { - if (result.persistent_congestion) { - if (earliest_lost_sent_time_ecn) |lost_time| { - if (!self.cc.inCongestionRecovery(lost_time)) { - self.cc.onPersistentCongestion(now); - std.log.info("persistent congestion detected, window reduced to minimum", .{}); - } - } - } else if (earliest_lost_sent_time_ecn) |lost_time| { - self.cc.onCongestionEvent(lost_time, now); - } - } - - // QLOG: metrics_updated after ACK_ECN processing - if (self.qlog_writer) |*ql| { - const rs = &self.pkt_handler.rtt_stats; - ql.metricsUpdated(now, rs.min_rtt, rs.smoothed_rtt, rs.latest_rtt, rs.rtt_var, self.cc.sendWindow(), self.pkt_handler.bytes_in_flight); - } - // ECN validation (RFC 9000 §13.4.2.1): // Count how many newly-acked packets were ECN-marked var newly_acked_ect0: u64 = 0; + var newly_acked_ect0_bytes: u64 = 0; for (result.acked.constSlice()) |pkt| { - if (pkt.ecn_marked) newly_acked_ect0 += 1; + if 
(pkt.ecn_marked) { + newly_acked_ect0 += 1; + newly_acked_ect0_bytes += pkt.size; + } } // Validate ECN counts from peer @@ -1658,19 +1667,53 @@ pub const Connection = struct { newly_acked_ect0, ); - // If valid and CE count increased, treat as congestion event + // Approximate CE-byte count for the CC. ACK_ECN counters are + // aggregate (no per-packet attribution), so distribute the + // CE-count delta across the ECT(0) bytes acked in this batch. + var ce_byte_count: u64 = 0; if (ecn_valid and ack.ecn_ce > self.peer_ecn_ce[space_idx]) { + const ce_delta = ack.ecn_ce - self.peer_ecn_ce[space_idx]; + if (newly_acked_ect0 > 0) { + ce_byte_count = newly_acked_ect0_bytes * ce_delta / newly_acked_ect0; + } else { + // Fallback: assume MTU-sized packets + ce_byte_count = ce_delta * self.packer.max_packet_size; + } std.log.info("ECN: CE count increased {d} -> {d}, congestion signal", .{ self.peer_ecn_ce[space_idx], ack.ecn_ce }); - self.cc.onCongestionEvent(now, now); } self.peer_ecn_ect0[space_idx] = ack.ecn_ect0; self.peer_ecn_ect1[space_idx] = ack.ecn_ect1; self.peer_ecn_ce[space_idx] = ack.ecn_ce; + if (newly_acked_bytes_ecn > 0 or has_non_probe_loss or ce_byte_count > 0) { + const ack_ctx = congestion.AckContext{ + .now = now, + .bytes_in_flight = self.pkt_handler.bytes_in_flight, + .prior_bytes_in_flight = prior_bytes_in_flight_ecn, + .newly_acked_bytes = newly_acked_bytes_ecn, + .newly_lost_bytes = newly_lost_bytes_ecn, + .persistent_congestion = result.persistent_congestion, + .earliest_lost_sent_time = earliest_lost_sent_time_ecn, + .largest_acked_pn = ack.largest_ack, + .ce_byte_count = ce_byte_count, + .rate_sample = self.pkt_handler.latest_rate_sample, + }; + self.cc.onAckBatch(&ack_ctx); + if (result.persistent_congestion and earliest_lost_sent_time_ecn != null) { + std.log.info("persistent congestion detected, window reduced to minimum", .{}); + } + } + + // QLOG: metrics_updated after ACK_ECN processing + if (self.qlog_writer) |*ql| { + const rs = 
&self.pkt_handler.rtt_stats; + ql.metricsUpdated(now, rs.min_rtt, rs.smoothed_rtt, rs.latest_rtt, rs.rtt_var, self.cc.sendWindow(), self.pkt_handler.bytes_in_flight); + } + self.maybeConfirmHandshake(enc_level, result.acked.len); // Update pacer - self.pacer.setBandwidth(self.cc.sendWindow(), &self.pkt_handler.rtt_stats); + self.cc.updatePacer(&self.pacer, &self.pkt_handler.rtt_stats); }, .reset_stream => |rs| { @@ -2543,8 +2586,8 @@ pub const Connection = struct { const challenge = self.paths[candidate_idx].validator.startChallenge(); self.pending_frames.push(.{ .path_challenge = challenge }); - // Reset CC/RTT/MTU/ECN for new IP - self.cc = congestion.Cubic.init(); + // Reset CC/RTT/MTU/ECN for new IP (preserve algorithm choice) + self.cc = congestion.CongestionControl.init(self.cc_algorithm); self.pacer = congestion.Pacer.init(); self.pkt_handler.rtt_stats = rtt.RttStats{}; self.mtu_discoverer.reset(); @@ -3105,7 +3148,7 @@ pub const Connection = struct { const old_path = &self.paths[1 - candidate_idx]; const same_ip = sockaddrSameIp(&new_peer_addr, &old_path.peer_addr); if (!same_ip) { - self.cc = congestion.Cubic.init(); + self.cc = congestion.CongestionControl.init(self.cc_algorithm); self.pacer = congestion.Pacer.init(); self.pkt_handler.rtt_stats = rtt.RttStats{}; self.mtu_discoverer.reset(); @@ -3204,16 +3247,19 @@ pub const Connection = struct { // Loss detection timer: check loss_time BEFORE PTO (RFC 9002 §6.2.1). // Loss timers don't increment pto_count — they run loss detection directly. 
if (self.pkt_handler.getExpiredLossTime(now)) |loss_level| { + const prior_bif_lt = self.pkt_handler.bytes_in_flight; var loss_result: ack_handler.AckResult = .{}; self.pkt_handler.detectLossesForSpace(loss_level, now, &loss_result); var has_non_probe_loss_lt = false; var earliest_lost_sent_time_lt: ?i64 = null; + var newly_lost_bytes_lt: u64 = 0; for (loss_result.lost.constSlice()) |pkt| { self.total_packets_lost += 1; if (pkt.has_datagram) self.datagrams_lost_outgoing += 1; if (self.mtu_discoverer.onProbeLost(pkt.pn, now)) {} else { has_non_probe_loss_lt = true; + newly_lost_bytes_lt += pkt.size; if (earliest_lost_sent_time_lt == null or pkt.time_sent < earliest_lost_sent_time_lt.?) { earliest_lost_sent_time_lt = pkt.time_sent; } @@ -3227,18 +3273,24 @@ pub const Connection = struct { } } if (has_non_probe_loss_lt) { - if (loss_result.persistent_congestion) { - if (earliest_lost_sent_time_lt) |lost_time| { - if (!self.cc.inCongestionRecovery(lost_time)) { - self.cc.onPersistentCongestion(now); - std.log.info("persistent congestion detected (loss timer), window reduced to minimum", .{}); - } - } - } else if (earliest_lost_sent_time_lt) |lost_time| { - self.cc.onCongestionEvent(lost_time, now); + const ack_ctx = congestion.AckContext{ + .now = now, + .bytes_in_flight = self.pkt_handler.bytes_in_flight, + .prior_bytes_in_flight = prior_bif_lt, + .newly_acked_bytes = 0, + .newly_lost_bytes = newly_lost_bytes_lt, + .persistent_congestion = loss_result.persistent_congestion, + .earliest_lost_sent_time = earliest_lost_sent_time_lt, + .largest_acked_pn = null, + .ce_byte_count = 0, + .rate_sample = null, + }; + self.cc.onAckBatch(&ack_ctx); + if (loss_result.persistent_congestion and earliest_lost_sent_time_lt != null) { + std.log.info("persistent congestion detected (loss timer), window reduced to minimum", .{}); } } - self.pacer.setBandwidth(self.cc.sendWindow(), &self.pkt_handler.rtt_stats); + self.cc.updatePacer(&self.pacer, &self.pkt_handler.rtt_stats); } // Check 
PTO — fire when any space has an expired deadline. @@ -4170,6 +4222,9 @@ pub fn connect( conn.keep_alive_interval_ns = @min(ka_ns, @divTrunc(conn.idle_timeout_ns, 2)); } conn.close_when_idle = config.close_when_idle; + conn.cc_algorithm = config.congestion_control; + conn.cc = congestion.CongestionControl.init(config.congestion_control); + conn.pkt_handler.rate_sampling_enabled = (config.congestion_control == .bbr); // Initialize QLOG if configured if (config.qlog_dir) |dir| { @@ -4310,6 +4365,41 @@ test "Connection: init and basic state" { try std.testing.expect(!conn.isDraining()); } +test "Connection: BBR algorithm selection via config" { + var conn = try connect( + std.testing.allocator, + "example.com", + .{ .congestion_control = .bbr }, + null, + null, + ); + defer conn.deinit(); + + try std.testing.expectEqual(congestion.Algorithm.bbr, conn.cc.algorithm()); + try std.testing.expectEqual(congestion.Algorithm.bbr, conn.cc_algorithm); +} + +test "Connection: NewReno algorithm selection via config" { + var conn = try connect( + std.testing.allocator, + "example.com", + .{ .congestion_control = .newreno }, + null, + null, + ); + defer conn.deinit(); + + try std.testing.expectEqual(congestion.Algorithm.newreno, conn.cc.algorithm()); +} + +test "Connection: default algorithm is Cubic" { + var conn = try connect(std.testing.allocator, "example.com", .{}, null, null); + defer conn.deinit(); + + try std.testing.expectEqual(congestion.Algorithm.cubic, conn.cc.algorithm()); + try std.testing.expectEqual(congestion.Algorithm.cubic, conn.cc_algorithm); +} + // ConnectionIdPool tests test "ConnectionIdPool: add and consume" { var pool = ConnectionIdPool{}; diff --git a/src/quic/delivery_rate.zig b/src/quic/delivery_rate.zig new file mode 100644 index 0000000..1bca786 --- /dev/null +++ b/src/quic/delivery_rate.zig @@ -0,0 +1,210 @@ +//! Delivery-rate sampler (draft-cheng-iccrg-delivery-rate-estimation). +//! +//!
Per-ACK estimate of the connection's delivery rate. Each transmitted packet +//! captures a snapshot of the cumulative `delivered` counter and the timestamp +//! of the most recent delivery. When the packet is acked, the snapshot is +//! compared against the current cumulative state to produce a `RateSample` +//! that BBR (and other model-based controllers) consume. +//! +//! The sampler also tracks "application-limited" intervals: if the sender is +//! ever throttled by application data shortage, packets sent during that +//! interval are flagged so BBR ignores their (artificially low) delivery rate. + +const std = @import("std"); +const ack_handler = @import("ack_handler.zig"); +const SentPacket = ack_handler.SentPacket; + +/// One sample produced when an acked packet is consumed by the sampler. +pub const RateSample = struct { + /// Estimated delivery rate in bytes/second (0 if unmeasurable). + delivery_rate_bps: u64, + /// Bytes delivered in the sample interval. + delivered: u64, + /// Bytes delivered prior to this sample's send-snapshot. + prior_delivered: u64, + /// Sample interval, max(send_elapsed_ns, ack_elapsed_ns), used to compute `delivery_rate_bps` (nanoseconds). + interval_ns: i64, + /// Time spent sending the sample (time_sent - first_sent_time_at_send). + send_elapsed_ns: i64, + /// Time spent acking the sample (now - delivered_time_at_send). + ack_elapsed_ns: i64, + /// True if the acked packet was sent while the sampler was app-limited. + is_app_limited: bool, + /// Round-trip time observed for the acked packet (nanoseconds). + rtt_ns: i64, +}; + +pub const RateSampler = struct { + /// Cumulative bytes delivered (acked) across the connection's lifetime. + delivered: u64 = 0, + /// Timestamp of the most recent delivery (nanoseconds). + delivered_time: i64 = 0, + /// Send-time of the most recent packet contributing to a delivery (nanoseconds). + first_sent_time: i64 = 0, + /// `delivered` counter snapshot at the start of the current app-limited + /// interval.
Packets sent until `delivered` exceeds this counter (delivered <= + /// app_limited_until) are flagged. 0 = not currently app-limited. + app_limited_until: u64 = 0, + + /// Stamp delivery-rate snapshot fields into a packet at send time. + /// `bytes_in_flight` is the pre-send count — when zero, this packet + /// starts a new send burst, and the sampler resets `first_sent_time`/ + /// `delivered_time` to `now` so the next sample's interval starts here. + pub fn onPacketSent( + self: *RateSampler, + pkt: *SentPacket, + bytes_in_flight: u64, + now: i64, + ) void { + if (bytes_in_flight == 0) { + self.first_sent_time = now; + self.delivered_time = now; + } + pkt.first_sent_time_at_send = self.first_sent_time; + pkt.delivered_time_at_send = self.delivered_time; + pkt.delivered_at_send = self.delivered; + pkt.is_app_limited_at_send = self.app_limited_until != 0; + } + + /// Update sampler state for an acked packet and return a RateSample + /// describing the interval since this packet was sent. + pub fn onPacketAcked( + self: *RateSampler, + pkt: *const SentPacket, + rtt_ns: i64, + now: i64, + ) RateSample { + // Advance cumulative delivered counter. + self.delivered += pkt.size; + self.delivered_time = now; + + // Advance first_sent_time to this packet's send time so that subsequent + // samples measure the most recent send-burst. + if (pkt.time_sent > self.first_sent_time) { + self.first_sent_time = pkt.time_sent; + } + + // If this ACK satisfied the app-limited condition, clear the flag. + if (self.app_limited_until != 0 and self.delivered > self.app_limited_until) { + self.app_limited_until = 0; + } + + const send_elapsed = pkt.time_sent - pkt.first_sent_time_at_send; + const ack_elapsed = self.delivered_time - pkt.delivered_time_at_send; + // Use the larger of the two intervals — delivery-rate-estimation draft guidance to avoid + // overestimating when ACK compression shortens ack_elapsed.
+ const interval = @max(send_elapsed, ack_elapsed); + + const delivered_in_sample = self.delivered - pkt.delivered_at_send; + + var bps: u64 = 0; + if (interval > 0 and delivered_in_sample > 0) { + // bytes/sec = delivered_in_sample * 1e9 / interval_ns + bps = @intCast(@divTrunc( + @as(u128, delivered_in_sample) * std.time.ns_per_s, + @as(u128, @intCast(interval)), + )); + } + + return .{ + .delivery_rate_bps = bps, + .delivered = delivered_in_sample, + .prior_delivered = pkt.delivered_at_send, + .interval_ns = interval, + .send_elapsed_ns = send_elapsed, + .ack_elapsed_ns = ack_elapsed, + .is_app_limited = pkt.is_app_limited_at_send, + .rtt_ns = rtt_ns, + }; + } + + /// Mark the sender as application-limited. Subsequent packets sent until + /// `delivered` advances past `last_sent_pn_delivered` are flagged. + pub fn onAppLimited(self: *RateSampler, last_sent_pn_delivered: u64) void { + self.app_limited_until = if (last_sent_pn_delivered == 0) 1 else last_sent_pn_delivered; + } +}; + +// ── Tests ── + +const testing = std.testing; + +fn makePkt(pn: u64, size: u64, time_sent: i64) SentPacket { + return .{ + .pn = pn, + .time_sent = time_sent, + .size = size, + .ack_eliciting = true, + .in_flight = true, + .enc_level = .application, + }; +} + +test "RateSampler: produces correct rate for steady stream" { + var s = RateSampler{}; + var p1 = makePkt(1, 1000, 0); + var p2 = makePkt(2, 1000, 10_000_000); // +10ms + var p3 = makePkt(3, 1000, 20_000_000); + + // Send three packets back-to-back (bytes_in_flight grows). + s.onPacketSent(&p1, 0, 0); + s.onPacketSent(&p2, 1000, 10_000_000); + s.onPacketSent(&p3, 2000, 20_000_000); + + // Ack them at +50ms, +60ms, +70ms (50ms RTT) + _ = s.onPacketAcked(&p1, 50_000_000, 50_000_000); + _ = s.onPacketAcked(&p2, 50_000_000, 60_000_000); + const sample = s.onPacketAcked(&p3, 50_000_000, 70_000_000); + + // 3000 bytes delivered total, send interval 20ms, ack interval 70ms. 
+ // Sample for p3: delivered_in_sample = 3000, interval = max(send=20ms, ack=70ms) = 70ms. + try testing.expectEqual(@as(u64, 3000), sample.delivered); + try testing.expectEqual(@as(i64, 70_000_000), sample.interval_ns); + // Rate ≈ 3000 / 0.07s ≈ 42857 B/s + try testing.expect(sample.delivery_rate_bps > 40_000); + try testing.expect(sample.delivery_rate_bps < 45_000); +} + +test "RateSampler: app-limited flag propagates" { + var s = RateSampler{}; + s.onAppLimited(500); // currently app-limited until delivered > 500 + + var p = makePkt(1, 200, 0); + s.onPacketSent(&p, 0, 0); + try testing.expect(p.is_app_limited_at_send); + + const sample = s.onPacketAcked(&p, 50_000_000, 50_000_000); + try testing.expect(sample.is_app_limited); + // delivered is now 200, still under 500 → still app-limited + try testing.expect(s.app_limited_until != 0); + + var p2 = makePkt(2, 600, 60_000_000); + s.onPacketSent(&p2, 0, 60_000_000); + _ = s.onPacketAcked(&p2, 50_000_000, 110_000_000); + // delivered is now 800 > 500 → flag cleared + try testing.expectEqual(@as(u64, 0), s.app_limited_until); +} + +test "RateSampler: reorder still produces monotonic delivered" { + var s = RateSampler{}; + var p1 = makePkt(1, 1000, 0); + var p2 = makePkt(2, 1000, 10_000_000); + + s.onPacketSent(&p1, 0, 0); + s.onPacketSent(&p2, 1000, 10_000_000); + + // Ack p2 first, then p1 (reordered) + _ = s.onPacketAcked(&p2, 50_000_000, 60_000_000); + try testing.expectEqual(@as(u64, 1000), s.delivered); + + _ = s.onPacketAcked(&p1, 60_000_000, 70_000_000); + try testing.expectEqual(@as(u64, 2000), s.delivered); +} + +test "RateSampler: zero interval yields zero rate" { + var s = RateSampler{}; + var p = makePkt(1, 1000, 100); + s.onPacketSent(&p, 0, 100); + const sample = s.onPacketAcked(&p, 0, 100); // same instant + try testing.expectEqual(@as(u64, 0), sample.delivery_rate_bps); +} diff --git a/src/test_all.zig b/src/test_all.zig index 375ddf4..65821bd 100644 --- a/src/test_all.zig +++ b/src/test_all.zig @@ 
-8,6 +8,8 @@ test { _ = @import("quic/rtt.zig"); _ = @import("quic/ack_handler.zig"); _ = @import("quic/congestion.zig"); + _ = @import("quic/delivery_rate.zig"); + _ = @import("quic/bbr.zig"); _ = @import("quic/flow_control.zig"); _ = @import("quic/transport_params.zig"); _ = @import("quic/stream.zig");