Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion noq-proto/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
fmt,
net::{SocketAddrV4, SocketAddrV6},
num::TryFromIntError,
num::{NonZeroUsize, TryFromIntError},
sync::Arc,
};

Expand Down Expand Up @@ -33,6 +33,8 @@ mod transport;
pub use qlog::{QlogConfig, QlogFactory, QlogFileFactory};
pub use transport::{AckFrequencyConfig, IdleTimeout, MtuDiscoveryConfig, TransportConfig};

const DEFAULT_MAX_TRANSMIT_DATAGRAMS: NonZeroUsize = NonZeroUsize::new(80).expect("nonzero");

/// Global configuration for the endpoint, affecting all connections
///
/// Default values should be suitable for most internet applications.
Expand All @@ -51,6 +53,7 @@ pub struct EndpointConfig {
pub(crate) min_reset_interval: Duration,
/// Optional seed to be used internally for random number generation
pub(crate) rng_seed: Option<[u8; 32]>,
pub(crate) max_transmit_datagrams: NonZeroUsize,
}

impl EndpointConfig {
Expand All @@ -66,6 +69,7 @@ impl EndpointConfig {
grease_quic_bit: true,
min_reset_interval: Duration::from_millis(20),
rng_seed: None,
max_transmit_datagrams: DEFAULT_MAX_TRANSMIT_DATAGRAMS,
}
}

Expand Down Expand Up @@ -162,6 +166,25 @@ impl EndpointConfig {
self.rng_seed = seed;
self
}

/// The maximum amount of datagrams which will be produced in a single
/// transmit drive iteration.
///
/// This limits the amount of CPU resources consumed by datagram generation, and
/// allows other tasks (like receiving ACKs) to run in between.
///
/// Kept in lockstep with [`TransportConfig::max_transmit_segments`] so a single
/// GSO super-segment can hold a couple of transmit drives back-to-back without
/// bouncing to the scheduler.
pub fn max_transmit_datagrams(&mut self, value: NonZeroUsize) -> &mut Self {
self.max_transmit_datagrams = value;
self
}

/// Get the current value of [`max_transmit_datagrams`](Self::max_transmit_datagrams).
pub fn get_max_transmit_datagrams(&self) -> NonZeroUsize {
self.max_transmit_datagrams
}
}

impl fmt::Debug for EndpointConfig {
Expand All @@ -173,6 +196,7 @@ impl fmt::Debug for EndpointConfig {
.field("supported_versions", &self.supported_versions)
.field("grease_quic_bit", &self.grease_quic_bit)
.field("rng_seed", &self.rng_seed)
.field("max_transmit_datagrams", &self.max_transmit_datagrams)
.finish_non_exhaustive()
}
}
Expand Down
23 changes: 22 additions & 1 deletion noq-proto/src/config/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::path::Path;
use std::{
fmt,
net::SocketAddr,
num::{NonZeroU8, NonZeroU32},
num::{NonZeroU8, NonZeroU32, NonZeroUsize},
sync::Arc,
};

Expand All @@ -14,6 +14,8 @@ use crate::{
#[cfg(feature = "qlog")]
use crate::{QlogFactory, QlogFileFactory};

const DEFAULT_MAX_TRANSMIT_SEGMENTS: NonZeroUsize = NonZeroUsize::new(40).expect("nonzero");

/// Parameters governing the core QUIC state machine
///
/// Default values should be suitable for most internet applications. Applications protocols which
Expand Down Expand Up @@ -58,6 +60,7 @@ pub struct TransportConfig {
pub(crate) congestion_controller_factory: Arc<dyn congestion::ControllerFactory + Send + Sync>,

pub(crate) enable_segmentation_offload: bool,
pub(crate) max_transmit_segments: NonZeroUsize,

pub(crate) address_discovery_role: address_discovery::Role,

Expand Down Expand Up @@ -364,6 +367,21 @@ impl TransportConfig {
self
}

/// The maximum amount of datagrams that are sent in a single transmit.
///
/// This can be lower than the maximum platform capabilities, to avoid excessive
/// memory allocations when calling `poll_transmit()`. Benchmarks have shown that
/// numbers around 10 are a good compromise on small payloads, but with
/// `UDP_SEGMENT` (GSO) on Linux it pays off to batch more aggressively: at MTU
/// 1280 this means 51.2 KB per `sendmsg()` call instead of 12.8 KB. The Linux
/// kernel hard limit `UDP_MAX_SEGMENTS` is 64, so 40 stays comfortably within
/// bounds. On uplink-saturated benchmarks this gives a meaningful throughput
/// improvement at the cost of slightly more work per drive call.
pub fn max_transmit_segments(&mut self, value: NonZeroUsize) -> &mut Self {
self.max_transmit_segments = value;
self
}

/// Whether to send observed address reports to peers.
///
/// This will aid peers in inferring their reachable address, which in most NATd networks
Expand Down Expand Up @@ -562,6 +580,7 @@ impl Default for TransportConfig {
congestion_controller_factory: Arc::new(congestion::CubicConfig::default()),

enable_segmentation_offload: true,
max_transmit_segments: DEFAULT_MAX_TRANSMIT_SEGMENTS,

address_discovery_role: address_discovery::Role::default(),

Expand Down Expand Up @@ -608,6 +627,7 @@ impl fmt::Debug for TransportConfig {
deterministic_packet_numbers: _,
congestion_controller_factory: _,
enable_segmentation_offload,
max_transmit_segments,
address_discovery_role,
max_concurrent_multipath_paths,
default_path_max_idle_timeout,
Expand Down Expand Up @@ -648,6 +668,7 @@ impl fmt::Debug for TransportConfig {
.field("datagram_send_buffer_size", datagram_send_buffer_size)
// congestion_controller_factory not debug
.field("enable_segmentation_offload", enable_segmentation_offload)
.field("max_transmit_segments", max_transmit_segments)
.field("address_discovery_role", address_discovery_role)
.field(
"max_concurrent_multipath_paths",
Expand Down
2 changes: 1 addition & 1 deletion noq-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,7 @@ impl Connection {
) -> Option<Transmit> {
let max_datagrams = match self.config.enable_segmentation_offload {
false => NonZeroUsize::MIN,
true => max_datagrams,
true => max_datagrams.min(self.config.max_transmit_segments),
};

// Each call to poll_transmit can only send datagrams to one destination, because
Expand Down
35 changes: 35 additions & 0 deletions noq-proto/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{
convert::TryInto,
mem,
net::{Ipv4Addr, Ipv6Addr, SocketAddr},
num::NonZeroUsize,
sync::{Arc, Mutex},
};

Expand Down Expand Up @@ -2191,6 +2192,40 @@ fn tail_loss_respect_max_datagrams() {
assert_eq!(client_stats.udp_tx.ios, client_stats.udp_tx.datagrams);
}

#[test]
fn max_transmit_segments_setter_caps_batch_size() {
let _guard = subscribe();
const CAP: usize = 4;
let client_config = {
let mut c_config = client_config();
let mut t_config = TransportConfig::default();
t_config.max_transmit_segments(NonZeroUsize::new(CAP).expect("nonzero"));
c_config.transport_config(t_config.into());
c_config
};
let mut pair = Pair::default();
let (client_ch, _) = pair.connect_with(client_config);
let before = pair.client_conn_mut(client_ch).stats().udp_tx;

let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
pair.client_send(client_ch, s)
.write(&[42u8; 64 * 1024])
.unwrap();
pair.drive();

let after = pair.client_conn_mut(client_ch).stats().udp_tx;
let ios = after.ios - before.ios;
let datagrams = after.datagrams - before.datagrams;
assert!(
ios > 0,
"expected the stream transfer to produce at least one UDP I/O"
);
assert!(
datagrams <= ios * CAP as u64,
"max_transmit_segments({CAP}) should cap each batch, but {datagrams} datagrams across {ios} ios exceeds {CAP} per io",
);
Comment thread
poka-IT marked this conversation as resolved.
}

#[test]
fn datagram_send_recv() {
let _guard = subscribe();
Expand Down
27 changes: 9 additions & 18 deletions noq/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ pub struct Connecting {
}

impl Connecting {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
handle: ConnectionHandle,
conn: proto::Connection,
max_transmit_datagrams: NonZeroUsize,
endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
sender: Pin<Box<dyn UdpSender>>,
Expand All @@ -66,6 +68,7 @@ impl Connecting {
on_connected_send,
sender,
runtime.clone(),
max_transmit_datagrams,
)),
shared: Shared::default(),
})));
Expand Down Expand Up @@ -1428,6 +1431,7 @@ pub(crate) struct State {
pub(crate) path_events: tokio::sync::broadcast::Sender<PathEvent>,
sender: Pin<Box<dyn UdpSender>>,
pub(crate) runtime: Arc<dyn Runtime>,
max_transmit_datagrams: usize,
send_buffer: Vec<u8>,
/// We buffer a transmit when the underlying I/O would block
buffered_transmit: Option<proto::Transmit>,
Expand All @@ -1449,6 +1453,7 @@ impl State {
on_connected: oneshot::Sender<bool>,
sender: Pin<Box<dyn UdpSender>>,
runtime: Arc<dyn Runtime>,
max_transmit_datagrams: NonZeroUsize,
) -> Self {
Self {
inner,
Expand All @@ -1469,6 +1474,7 @@ impl State {
error: None,
sender,
runtime,
max_transmit_datagrams: max_transmit_datagrams.get(),
send_buffer: Vec::new(),
buffered_transmit: None,
path_events: tokio::sync::broadcast::channel(32).0,
Expand All @@ -1484,10 +1490,8 @@ impl State {
let now = self.runtime.now();
let mut transmits = 0;

let max_datagrams = self
.sender
.max_transmit_segments()
.min(MAX_TRANSMIT_SEGMENTS);
let max_datagrams = self.sender.max_transmit_segments();
let max_transmit_datagrams = self.max_transmit_datagrams;

loop {
// Retry the last transmit, or get a new one.
Expand Down Expand Up @@ -1525,7 +1529,7 @@ impl State {
Poll::Ready(Ok(())) => {}
}

if transmits >= MAX_TRANSMIT_DATAGRAMS {
if transmits >= max_transmit_datagrams {
Comment thread
poka-IT marked this conversation as resolved.
// TODO: What isn't ideal here yet is that if we don't poll all
// datagrams that could be sent we don't go into the `app_limited`
// state and CWND continues to grow until we get here the next time.
Expand Down Expand Up @@ -1875,16 +1879,3 @@ pub enum SendDatagramError {
#[error("connection lost")]
ConnectionLost(#[from] ConnectionError),
}

/// The maximum amount of datagrams which will be produced in a single `drive_transmit` call
///
/// This limits the amount of CPU resources consumed by datagram generation,
/// and allows other tasks (like receiving ACKs) to run in between.
const MAX_TRANSMIT_DATAGRAMS: usize = 20;

/// The maximum amount of datagrams that are sent in a single transmit
///
/// This can be lower than the maximum platform capabilities, to avoid excessive
/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
/// that numbers around 10 are a good compromise.
const MAX_TRANSMIT_SEGMENTS: NonZeroUsize = NonZeroUsize::new(10).expect("known");
35 changes: 26 additions & 9 deletions noq/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,16 +251,20 @@ impl Endpoint {
addr
};

let max_transmit_datagrams = endpoint.inner.config().get_max_transmit_datagrams();
let (ch, conn) = endpoint
.inner
.connect(self.runtime.now(), config, addr, server_name)?;

let sender = endpoint.socket.create_sender();
endpoint.stats.outgoing_handshakes += 1;
Ok(endpoint
.recv_state
.connections
.insert(ch, conn, sender, self.runtime.clone()))
Ok(endpoint.recv_state.connections.insert(
ch,
conn,
max_transmit_datagrams,
sender,
self.runtime.clone(),
))
}

/// Switch to a new UDP socket
Expand Down Expand Up @@ -485,6 +489,7 @@ impl EndpointInner {
let mut state = self.state.lock().unwrap();
let mut response_buffer = Vec::new();
let now = state.runtime.now();
let max_transmit_datagrams = state.inner.config().get_max_transmit_datagrams();
match state
.inner
.accept(incoming, now, &mut response_buffer, server_config)
Expand All @@ -493,10 +498,13 @@ impl EndpointInner {
state.stats.accepted_handshakes += 1;
let sender = state.socket.create_sender();
let runtime = state.runtime.clone();
Ok(state
.recv_state
.connections
.insert(handle, conn, sender, runtime))
Ok(state.recv_state.connections.insert(
handle,
conn,
max_transmit_datagrams,
sender,
runtime,
))
}
Err(error) => {
if let Some(transmit) = error.response {
Expand Down Expand Up @@ -707,6 +715,7 @@ impl ConnectionSet {
&mut self,
handle: ConnectionHandle,
conn: proto::Connection,
max_transmit_datagrams: NonZeroUsize,
sender: Pin<Box<dyn UdpSender>>,
runtime: Arc<dyn Runtime>,
) -> Connecting {
Expand All @@ -719,7 +728,15 @@ impl ConnectionSet {
.unwrap();
}
self.senders.insert(handle, send);
Connecting::new(handle, conn, self.sender.clone(), recv, sender, runtime)
Connecting::new(
handle,
conn,
max_transmit_datagrams,
self.sender.clone(),
recv,
sender,
runtime,
)
}

fn is_empty(&self) -> bool {
Expand Down
Loading