diff --git a/noq-proto/src/config/mod.rs b/noq-proto/src/config/mod.rs index 32b944223..c27a0a717 100644 --- a/noq-proto/src/config/mod.rs +++ b/noq-proto/src/config/mod.rs @@ -1,7 +1,7 @@ use std::{ fmt, net::{SocketAddrV4, SocketAddrV6}, - num::TryFromIntError, + num::{NonZeroUsize, TryFromIntError}, sync::Arc, }; @@ -33,6 +33,8 @@ mod transport; pub use qlog::{QlogConfig, QlogFactory, QlogFileFactory}; pub use transport::{AckFrequencyConfig, IdleTimeout, MtuDiscoveryConfig, TransportConfig}; +const DEFAULT_MAX_TRANSMIT_DATAGRAMS: NonZeroUsize = NonZeroUsize::new(80).expect("nonzero"); + /// Global configuration for the endpoint, affecting all connections /// /// Default values should be suitable for most internet applications. @@ -51,6 +53,7 @@ pub struct EndpointConfig { pub(crate) min_reset_interval: Duration, /// Optional seed to be used internally for random number generation pub(crate) rng_seed: Option<[u8; 32]>, + pub(crate) max_transmit_datagrams: NonZeroUsize, } impl EndpointConfig { @@ -66,6 +69,7 @@ impl EndpointConfig { grease_quic_bit: true, min_reset_interval: Duration::from_millis(20), rng_seed: None, + max_transmit_datagrams: DEFAULT_MAX_TRANSMIT_DATAGRAMS, } } @@ -162,6 +166,25 @@ impl EndpointConfig { self.rng_seed = seed; self } + + /// The maximum number of datagrams which will be produced in a single + /// transmit drive iteration. + /// + /// This limits the amount of CPU resources consumed by datagram generation, and + /// allows other tasks (like receiving ACKs) to run in between. + /// + /// This is independent of [`TransportConfig::max_transmit_segments`], which caps a single + /// GSO batch; with the defaults (80 and 40) one drive iteration covers about two full + /// batches before yielding to the scheduler. + pub fn max_transmit_datagrams(&mut self, value: NonZeroUsize) -> &mut Self { + self.max_transmit_datagrams = value; + self + } + + /// Get the current value of [`max_transmit_datagrams`](Self::max_transmit_datagrams). 
+ pub fn get_max_transmit_datagrams(&self) -> NonZeroUsize { + self.max_transmit_datagrams + } } impl fmt::Debug for EndpointConfig { @@ -173,6 +196,7 @@ impl fmt::Debug for EndpointConfig { .field("supported_versions", &self.supported_versions) .field("grease_quic_bit", &self.grease_quic_bit) .field("rng_seed", &self.rng_seed) + .field("max_transmit_datagrams", &self.max_transmit_datagrams) .finish_non_exhaustive() } } diff --git a/noq-proto/src/config/transport.rs b/noq-proto/src/config/transport.rs index 223976836..2bf058082 100644 --- a/noq-proto/src/config/transport.rs +++ b/noq-proto/src/config/transport.rs @@ -3,7 +3,7 @@ use std::path::Path; use std::{ fmt, net::SocketAddr, - num::{NonZeroU8, NonZeroU32}, + num::{NonZeroU8, NonZeroU32, NonZeroUsize}, sync::Arc, }; @@ -14,6 +14,8 @@ use crate::{ #[cfg(feature = "qlog")] use crate::{QlogFactory, QlogFileFactory}; +const DEFAULT_MAX_TRANSMIT_SEGMENTS: NonZeroUsize = NonZeroUsize::new(40).expect("nonzero"); + /// Parameters governing the core QUIC state machine /// /// Default values should be suitable for most internet applications. Applications protocols which @@ -58,6 +60,7 @@ pub struct TransportConfig { pub(crate) congestion_controller_factory: Arc, pub(crate) enable_segmentation_offload: bool, + pub(crate) max_transmit_segments: NonZeroUsize, pub(crate) address_discovery_role: address_discovery::Role, @@ -364,6 +367,21 @@ impl TransportConfig { self } + /// The maximum amount of datagrams that are sent in a single transmit. + /// + /// This can be lower than the maximum platform capabilities, to avoid excessive + /// memory allocations when calling `poll_transmit()`. Benchmarks have shown that + /// numbers around 10 are a good compromise on small payloads, but with + /// `UDP_SEGMENT` (GSO) on Linux it pays off to batch more aggressively: at MTU + /// 1280 this means 51.2 KB per `sendmsg()` call instead of 12.8 KB. 
The Linux + /// kernel hard limit `UDP_MAX_SEGMENTS` is 64, so 40 stays comfortably within + /// bounds. On uplink-saturated benchmarks this gives a meaningful throughput + /// improvement at the cost of slightly more work per drive call. + pub fn max_transmit_segments(&mut self, value: NonZeroUsize) -> &mut Self { + self.max_transmit_segments = value; + self + } + /// Whether to send observed address reports to peers. /// /// This will aid peers in inferring their reachable address, which in most NATd networks @@ -562,6 +580,7 @@ impl Default for TransportConfig { congestion_controller_factory: Arc::new(congestion::CubicConfig::default()), enable_segmentation_offload: true, + max_transmit_segments: DEFAULT_MAX_TRANSMIT_SEGMENTS, address_discovery_role: address_discovery::Role::default(), @@ -608,6 +627,7 @@ impl fmt::Debug for TransportConfig { deterministic_packet_numbers: _, congestion_controller_factory: _, enable_segmentation_offload, + max_transmit_segments, address_discovery_role, max_concurrent_multipath_paths, default_path_max_idle_timeout, @@ -648,6 +668,7 @@ impl fmt::Debug for TransportConfig { .field("datagram_send_buffer_size", datagram_send_buffer_size) // congestion_controller_factory not debug .field("enable_segmentation_offload", enable_segmentation_offload) + .field("max_transmit_segments", max_transmit_segments) .field("address_discovery_role", address_discovery_role) .field( "max_concurrent_multipath_paths", diff --git a/noq-proto/src/connection/mod.rs b/noq-proto/src/connection/mod.rs index 32a0a2df5..dbc38e8cf 100644 --- a/noq-proto/src/connection/mod.rs +++ b/noq-proto/src/connection/mod.rs @@ -1025,7 +1025,7 @@ impl Connection { ) -> Option { let max_datagrams = match self.config.enable_segmentation_offload { false => NonZeroUsize::MIN, - true => max_datagrams, + true => max_datagrams.min(self.config.max_transmit_segments), }; // Each call to poll_transmit can only send datagrams to one destination, because diff --git 
a/noq-proto/src/tests/mod.rs b/noq-proto/src/tests/mod.rs index 2a088b048..cc8c413d0 100644 --- a/noq-proto/src/tests/mod.rs +++ b/noq-proto/src/tests/mod.rs @@ -2,6 +2,7 @@ use std::{ convert::TryInto, mem, net::{Ipv4Addr, Ipv6Addr, SocketAddr}, + num::NonZeroUsize, sync::{Arc, Mutex}, }; @@ -2191,6 +2192,40 @@ fn tail_loss_respect_max_datagrams() { assert_eq!(client_stats.udp_tx.ios, client_stats.udp_tx.datagrams); } +#[test] +fn max_transmit_segments_setter_caps_batch_size() { + let _guard = subscribe(); + const CAP: usize = 4; + let client_config = { + let mut c_config = client_config(); + let mut t_config = TransportConfig::default(); + t_config.max_transmit_segments(NonZeroUsize::new(CAP).expect("nonzero")); + c_config.transport_config(t_config.into()); + c_config + }; + let mut pair = Pair::default(); + let (client_ch, _) = pair.connect_with(client_config); + let before = pair.client_conn_mut(client_ch).stats().udp_tx; + + let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap(); + pair.client_send(client_ch, s) + .write(&[42u8; 64 * 1024]) + .unwrap(); + pair.drive(); + + let after = pair.client_conn_mut(client_ch).stats().udp_tx; + let ios = after.ios - before.ios; + let datagrams = after.datagrams - before.datagrams; + assert!( + ios > 0, + "expected the stream transfer to produce at least one UDP I/O" + ); + assert!( + datagrams <= ios * CAP as u64, + "max_transmit_segments({CAP}) should cap each batch, but {datagrams} datagrams across {ios} ios exceeds {CAP} per io", + ); +} + #[test] fn datagram_send_recv() { let _guard = subscribe(); diff --git a/noq/src/connection.rs b/noq/src/connection.rs index 6cca0fff2..8abe8b812 100644 --- a/noq/src/connection.rs +++ b/noq/src/connection.rs @@ -45,9 +45,11 @@ pub struct Connecting { } impl Connecting { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( handle: ConnectionHandle, conn: proto::Connection, + max_transmit_datagrams: NonZeroUsize, endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, 
EndpointEvent)>, conn_events: mpsc::UnboundedReceiver, sender: Pin>, @@ -66,6 +68,7 @@ impl Connecting { on_connected_send, sender, runtime.clone(), + max_transmit_datagrams, )), shared: Shared::default(), }))); @@ -1428,6 +1431,7 @@ pub(crate) struct State { pub(crate) path_events: tokio::sync::broadcast::Sender, sender: Pin>, pub(crate) runtime: Arc, + max_transmit_datagrams: usize, send_buffer: Vec, /// We buffer a transmit when the underlying I/O would block buffered_transmit: Option, @@ -1449,6 +1453,7 @@ impl State { on_connected: oneshot::Sender, sender: Pin>, runtime: Arc, + max_transmit_datagrams: NonZeroUsize, ) -> Self { Self { inner, @@ -1469,6 +1474,7 @@ impl State { error: None, sender, runtime, + max_transmit_datagrams: max_transmit_datagrams.get(), send_buffer: Vec::new(), buffered_transmit: None, path_events: tokio::sync::broadcast::channel(32).0, @@ -1484,10 +1490,8 @@ impl State { let now = self.runtime.now(); let mut transmits = 0; - let max_datagrams = self - .sender - .max_transmit_segments() - .min(MAX_TRANSMIT_SEGMENTS); + let max_datagrams = self.sender.max_transmit_segments(); + let max_transmit_datagrams = self.max_transmit_datagrams; loop { // Retry the last transmit, or get a new one. @@ -1525,7 +1529,7 @@ impl State { Poll::Ready(Ok(())) => {} } - if transmits >= MAX_TRANSMIT_DATAGRAMS { + if transmits >= max_transmit_datagrams { // TODO: What isn't ideal here yet is that if we don't poll all // datagrams that could be sent we don't go into the `app_limited` // state and CWND continues to grow until we get here the next time. @@ -1875,16 +1879,3 @@ pub enum SendDatagramError { #[error("connection lost")] ConnectionLost(#[from] ConnectionError), } - -/// The maximum amount of datagrams which will be produced in a single `drive_transmit` call -/// -/// This limits the amount of CPU resources consumed by datagram generation, -/// and allows other tasks (like receiving ACKs) to run in between. 
-const MAX_TRANSMIT_DATAGRAMS: usize = 20; - -/// The maximum amount of datagrams that are sent in a single transmit -/// -/// This can be lower than the maximum platform capabilities, to avoid excessive -/// memory allocations when calling `poll_transmit()`. Benchmarks have shown -/// that numbers around 10 are a good compromise. -const MAX_TRANSMIT_SEGMENTS: NonZeroUsize = NonZeroUsize::new(10).expect("known"); diff --git a/noq/src/endpoint.rs b/noq/src/endpoint.rs index d4b20d63c..4372a6444 100644 --- a/noq/src/endpoint.rs +++ b/noq/src/endpoint.rs @@ -251,16 +251,20 @@ impl Endpoint { addr }; + let max_transmit_datagrams = endpoint.inner.config().get_max_transmit_datagrams(); let (ch, conn) = endpoint .inner .connect(self.runtime.now(), config, addr, server_name)?; let sender = endpoint.socket.create_sender(); endpoint.stats.outgoing_handshakes += 1; - Ok(endpoint - .recv_state - .connections - .insert(ch, conn, sender, self.runtime.clone())) + Ok(endpoint.recv_state.connections.insert( + ch, + conn, + max_transmit_datagrams, + sender, + self.runtime.clone(), + )) } /// Switch to a new UDP socket @@ -485,6 +489,7 @@ impl EndpointInner { let mut state = self.state.lock().unwrap(); let mut response_buffer = Vec::new(); let now = state.runtime.now(); + let max_transmit_datagrams = state.inner.config().get_max_transmit_datagrams(); match state .inner .accept(incoming, now, &mut response_buffer, server_config) @@ -493,10 +498,13 @@ impl EndpointInner { state.stats.accepted_handshakes += 1; let sender = state.socket.create_sender(); let runtime = state.runtime.clone(); - Ok(state - .recv_state - .connections - .insert(handle, conn, sender, runtime)) + Ok(state.recv_state.connections.insert( + handle, + conn, + max_transmit_datagrams, + sender, + runtime, + )) } Err(error) => { if let Some(transmit) = error.response { @@ -707,6 +715,7 @@ impl ConnectionSet { &mut self, handle: ConnectionHandle, conn: proto::Connection, + max_transmit_datagrams: NonZeroUsize, sender: 
Pin>, runtime: Arc, ) -> Connecting { @@ -719,7 +728,15 @@ impl ConnectionSet { .unwrap(); } self.senders.insert(handle, send); - Connecting::new(handle, conn, self.sender.clone(), recv, sender, runtime) + Connecting::new( + handle, + conn, + max_transmit_datagrams, + self.sender.clone(), + recv, + sender, + runtime, + ) } fn is_empty(&self) -> bool {