Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 118 additions & 63 deletions src/quic/ack_handler.zig
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ const MAX_PTO: i64 = 60_000_000_000;
/// Caps backoff to ensure timely retransmission under extreme packet loss.
const MAX_HANDSHAKE_PTO: i64 = 3_000_000_000;

/// Maximum number of packets tracked per ACK result.
const MAX_ACK_RESULT: usize = 256;

/// Maximum number of stream frame records per sent packet.
/// Must be large enough to track all stream frames in a packet (e.g., many small 0-RTT streams).
// Cap inline stream-frame records per sent packet. Each is 32 bytes and lives
Expand Down Expand Up @@ -91,37 +88,51 @@ pub const SentPacket = struct {
}
};

/// Fixed-capacity list of SentPackets for ACK results.
/// Heap-backed list of SentPackets for ACK results.
///
/// Bulk benchmark rows can acknowledge or declare lost thousands of packets in
/// a single ACK/loss pass after ACK compression or loss bursts. A fixed small
/// result buffer silently dropped the excess packets, which left
/// bytes_in_flight and stream ack_offset accounting stale even though packets
/// had been removed from the sent-packet map.
pub const SentPacketList = struct {
buf: [MAX_ACK_RESULT]SentPacket = undefined,
len: usize = 0,
items: std.ArrayListUnmanaged(SentPacket) = .empty,

pub fn append(self: *SentPacketList, item: SentPacket) void {
if (self.len < MAX_ACK_RESULT) {
self.buf[self.len] = item;
self.len += 1;
}
pub fn deinit(self: *SentPacketList, allocator: Allocator) void {
self.items.deinit(allocator);
}

pub fn clearRetainingCapacity(self: *SentPacketList) void {
self.items.clearRetainingCapacity();
}

pub fn append(self: *SentPacketList, allocator: Allocator, item: SentPacket) !void {
try self.items.append(allocator, item);
}

pub fn constSlice(self: *const SentPacketList) []const SentPacket {
return self.buf[0..self.len];
return self.items.items;
}

pub fn count(self: *const SentPacketList) usize {
return self.items.items.len;
}
};

/// Fixed-capacity list of u64 for tracking packet numbers.
/// Heap-backed list of u64 for tracking packet numbers.
const PnList = struct {
buf: [MAX_ACK_RESULT]u64 = undefined,
len: usize = 0,
items: std.ArrayListUnmanaged(u64) = .empty,

pub fn append(self: *PnList, item: u64) void {
if (self.len < MAX_ACK_RESULT) {
self.buf[self.len] = item;
self.len += 1;
}
pub fn deinit(self: *PnList, allocator: Allocator) void {
self.items.deinit(allocator);
}

pub fn append(self: *PnList, allocator: Allocator, item: u64) !void {
try self.items.append(allocator, item);
}

pub fn constSlice(self: *const PnList) []const u64 {
return self.buf[0..self.len];
return self.items.items;
}
};

Expand All @@ -130,6 +141,17 @@ pub const AckResult = struct {
acked: SentPacketList = .{},
lost: SentPacketList = .{},
persistent_congestion: bool = false,

pub fn deinit(self: *AckResult, allocator: Allocator) void {
self.acked.deinit(allocator);
self.lost.deinit(allocator);
}

pub fn reset(self: *AckResult) void {
self.acked.clearRetainingCapacity();
self.lost.clearRetainingCapacity();
self.persistent_congestion = false;
}
};

/// Tracks sent packets and handles loss detection for a single packet number space.
Expand Down Expand Up @@ -183,59 +205,58 @@ pub const SentPacketTracker = struct {
now: i64,
result: *AckResult,
) !void {
result.* = .{};
result.reset();

if (self.largest_acked == null or largest_ack > self.largest_acked.?) {
self.largest_acked = largest_ack;
}

// Process the first ACK range: [largest_ack - first_ack_range, largest_ack]
{
const range_start = largest_ack -| first_ack_range;
var pn = range_start;
while (pn <= largest_ack) : (pn += 1) {
if (self.sent_packets.fetchSwapRemove(pn)) |kv| {
const pkt = kv.value;
if (pkt.ack_eliciting) {
self.ack_eliciting_in_flight -|= 1;
}

if (pkt.pn == largest_ack) {
const send_delta = now - pkt.time_sent;
rtt_stats.updateRtt(send_delta, ack_delay_ns, true);
// ACK ranges can span hundreds of thousands of packet numbers after ACK
// compression. Iterate the packets still in flight instead of walking
// every packet number in the encoded ranges.
const first_range_start = largest_ack -| first_ack_range;
var i: usize = 0;
while (i < self.sent_packets.count()) {
const pn = self.sent_packets.keys()[i];
var acked = pn >= first_range_start and pn <= largest_ack;
if (!acked) {
for (ack_ranges) |range| {
if (pn >= range.start and pn <= range.end) {
acked = true;
break;
}

result.acked.append(pkt);
}
if (pn == largest_ack) break;
}
}
if (!acked) {
i += 1;
continue;
}

// Process additional ACK ranges
for (ack_ranges) |range| {
var pn = range.start;
while (pn <= range.end) : (pn += 1) {
if (self.sent_packets.fetchSwapRemove(pn)) |kv| {
const pkt = kv.value;
if (pkt.ack_eliciting) {
self.ack_eliciting_in_flight -|= 1;
}
result.acked.append(pkt);
}
if (pn == range.end) break;
const kv = self.sent_packets.fetchSwapRemove(pn).?;
const pkt = kv.value;
if (pkt.ack_eliciting) {
self.ack_eliciting_in_flight -|= 1;
}

if (pkt.pn == largest_ack) {
const send_delta = now - pkt.time_sent;
rtt_stats.updateRtt(send_delta, ack_delay_ns, true);
}

try result.acked.append(self.allocator, pkt);
}

// Detect lost packets
self.detectLostPackets(rtt_stats, now, result);
try self.detectLostPackets(rtt_stats, now, result);
}

fn detectLostPackets(self: *SentPacketTracker, rtt_stats: *RttStats, now: i64, result: *AckResult) void {
fn detectLostPackets(self: *SentPacketTracker, rtt_stats: *RttStats, now: i64, result: *AckResult) !void {
self.loss_time = null;
const loss_delay = rtt_stats.lossDelay();
const lost_send_time = now - loss_delay;

var to_remove: PnList = .{};
defer to_remove.deinit(self.allocator);

// Track earliest and latest send times of lost ack-eliciting packets
// for persistent congestion detection (RFC 9002 §7.6.2)
Expand All @@ -250,8 +271,8 @@ pub const SentPacketTracker = struct {
}

if (pkt.time_sent <= lost_send_time) {
result.lost.append(pkt);
to_remove.append(pkt.pn);
try result.lost.append(self.allocator, pkt);
try to_remove.append(self.allocator, pkt.pn);
if (pkt.ack_eliciting) {
if (earliest_lost_time == null or pkt.time_sent < earliest_lost_time.?) {
earliest_lost_time = pkt.time_sent;
Expand All @@ -266,8 +287,8 @@ pub const SentPacketTracker = struct {
if (self.largest_acked.? >= PACKET_THRESHOLD and
pkt.pn <= self.largest_acked.? - PACKET_THRESHOLD)
{
result.lost.append(pkt);
to_remove.append(pkt.pn);
try result.lost.append(self.allocator, pkt);
try to_remove.append(self.allocator, pkt.pn);
if (pkt.ack_eliciting) {
if (earliest_lost_time == null or pkt.time_sent < earliest_lost_time.?) {
earliest_lost_time = pkt.time_sent;
Expand Down Expand Up @@ -695,10 +716,10 @@ pub const PacketHandler = struct {

/// Run loss detection for a specific packet number space (called when loss_time fires).
/// Returns the lost packets for congestion control processing.
pub fn detectLossesForSpace(self: *PacketHandler, level: EncLevel, now: i64, result: *AckResult) void {
pub fn detectLossesForSpace(self: *PacketHandler, level: EncLevel, now: i64, result: *AckResult) !void {
const idx = @intFromEnum(level);
result.* = .{};
self.sent[idx].detectLostPackets(&self.rtt_stats, now, result);
result.reset();
try self.sent[idx].detectLostPackets(&self.rtt_stats, now, result);
for (result.lost.constSlice()) |pkt| {
if (pkt.in_flight) {
self.bytes_in_flight -|= pkt.size;
Expand Down Expand Up @@ -762,9 +783,10 @@ test "SentPacketTracker: basic send and ack" {

const ack_time = now + 50_000_000;
var result: AckResult = .{};
defer result.deinit(testing.allocator);
try tracker.onAckReceived(0, 0, &.{}, 0, &rtt_stats, ack_time, &result);

try testing.expectEqual(@as(usize, 1), result.acked.len);
try testing.expectEqual(@as(usize, 1), result.acked.count());
try testing.expectEqual(@as(u64, 0), result.acked.constSlice()[0].pn);
try testing.expectEqual(@as(u32, 0), tracker.ack_eliciting_in_flight);
try testing.expect(rtt_stats.has_measurement);
Expand All @@ -782,6 +804,39 @@ test "ReceivedPacketTracker: immediate ACK on reorder" {
try testing.expect(tracker.ack_queued);
}

test "SentPacketTracker: large ACK range scans in-flight packets" {
var tracker = SentPacketTracker.init(testing.allocator);
defer tracker.deinit();

var rtt_stats = RttStats{};
const now: i64 = 1_000_000_000;

try tracker.onPacketSent(.{
.pn = 10,
.time_sent = now,
.size = 1200,
.ack_eliciting = true,
.in_flight = true,
.enc_level = .application,
});
try tracker.onPacketSent(.{
.pn = 1_000_000,
.time_sent = now + 1_000,
.size = 1200,
.ack_eliciting = true,
.in_flight = true,
.enc_level = .application,
});

var result: AckResult = .{};
defer result.deinit(testing.allocator);
try tracker.onAckReceived(1_000_000, 0, &.{}, 999_990, &rtt_stats, now + 50_000_000, &result);

try testing.expectEqual(@as(usize, 2), result.acked.count());
try testing.expectEqual(@as(usize, 0), tracker.sent_packets.count());
try testing.expectEqual(@as(u32, 0), tracker.ack_eliciting_in_flight);
}

test "PacketHandler: integration" {
var handler = PacketHandler.init(testing.allocator);
defer handler.deinit();
Expand All @@ -804,6 +859,7 @@ test "PacketHandler: integration" {

const ack_time = now + 50_000_000;
var result: AckResult = .{};
defer result.deinit(testing.allocator);
try handler.onAckReceived(.initial, 0, 0, 3, &.{}, 0, ack_time, &result);

try testing.expectEqual(@as(u64, 0), handler.bytes_in_flight);
Expand Down Expand Up @@ -947,4 +1003,3 @@ test "NewReno: app_limited suppresses cwnd growth" {
cc.onPacketAcked(1200, 300);
try testing.expect(cc.congestion_window > after_ack);
}

Loading
Loading