From fce53514c48923ce53aa5e6e58929e869d8b948c Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 13 May 2026 20:13:27 +0200 Subject: [PATCH] feat(noq)!: open path by four tuple --- noq/src/connection.rs | 51 ++++++++++++++++++++++++++++++++++++++----- noq/src/lib.rs | 4 ++-- noq/src/tests.rs | 21 +++++------------- 3 files changed, 53 insertions(+), 23 deletions(-) diff --git a/noq/src/connection.rs b/noq/src/connection.rs index ee7f88b60..987c023d9 100644 --- a/noq/src/connection.rs +++ b/noq/src/connection.rs @@ -292,6 +292,43 @@ impl Future for ConnectionDriver { } } +/// Options for opening a new path on a [`Connection`]. +/// +/// Defaults to [`PathStatus::Available`] and no preferred local IP. +/// +/// # Examples +/// +/// ```ignore +/// let opts = OpenPathOpts::default() +/// .initial_status(PathStatus::Backup) +/// .local_ip(Some(local_ip)); +/// connection.open_path(remote_addr, opts); +/// ``` +#[derive(Debug, Clone, Default)] +pub struct OpenPathOpts { + initial_status: PathStatus, + local_ip: Option, +} + +impl OpenPathOpts { + /// Sets the initial [`PathStatus`] for the new path. + #[must_use] + pub fn initial_status(mut self, initial_status: PathStatus) -> Self { + self.initial_status = initial_status; + self + } + + /// Sets the local IP address to use for the new path. + /// + /// When `Some`, the path is bound to the given local IP. When `None`, the local IP is + /// left unspecified and the operating system selects a source address. + #[must_use] + pub fn local_ip(mut self, local_ip: Option) -> Self { + self.local_ip = local_ip; + self + } +} + /// A QUIC connection. /// /// If all references to a connection (including every clone of the `Connection` handle, streams of @@ -383,7 +420,7 @@ impl Connection { /// Otherwise behaves exactly as [`open_path`]. /// /// [`open_path`]: Self::open_path - pub fn open_path_ensure(&self, addr: SocketAddr, initial_status: PathStatus) -> OpenPath { + pub fn open_path_ensure(&self, addr: SocketAddr, opts: OpenPathOpts) -> OpenPath { let mut state = self.0.lock_and_wake("open_path"); // If endpoint::State::ipv6 is true we want to keep all our IP addresses as IPv6. @@ -415,8 +452,10 @@ impl Connection { let now = state.runtime.now(); // TODO(matheus23): For now this means it's impossible to make use of short-circuiting path validation currently. // However, changing that would mean changing the API. - let addrs = FourTuple::from_remote(addr); - let open_res = state.inner.open_path_ensure(addrs, initial_status, now); + let addrs = FourTuple::new(addr, opts.local_ip); + let open_res = state + .inner + .open_path_ensure(addrs, opts.initial_status, now); match open_res { Ok((path_id, existed)) if existed => { let recv = state.open_path.get(&path_id).map(|tx| tx.subscribe()); @@ -450,7 +489,7 @@ impl Connection { /// future, or at a later time. If the failure is immediate [`OpenPath::path_id`] will /// return `None` and the future will be ready immediately. If the failure happens /// later, a [`PathEvent`] will be emitted. - pub fn open_path(&self, addr: SocketAddr, initial_status: PathStatus) -> OpenPath { + pub fn open_path(&self, addr: SocketAddr, opts: OpenPathOpts) -> OpenPath { let mut state = self.0.lock_and_wake("open_path"); // If endpoint::State::ipv6 is true we want to keep all our IP addresses as IPv6. @@ -483,8 +522,8 @@ impl Connection { let now = state.runtime.now(); // TODO(matheus23): For now this means it's impossible to make use of short-circuiting path validation currently. // However, changing that would mean changing the API. - let addrs = FourTuple::from_remote(addr); - let open_res = state.inner.open_path(addrs, initial_status, now); + let addrs = FourTuple::new(addr, opts.local_ip); + let open_res = state.inner.open_path(addrs, opts.initial_status, now); match open_res { Ok(path_id) => { state.open_path.insert(path_id, on_open_path_send); diff --git a/noq/src/lib.rs b/noq/src/lib.rs index e4b351eef..8000b3485 100644 --- a/noq/src/lib.rs +++ b/noq/src/lib.rs @@ -77,8 +77,8 @@ pub use rustls; pub use udp; pub use crate::connection::{ - AcceptBi, AcceptUni, Closed, Connecting, Connection, OnClosed, OpenBi, OpenUni, ReadDatagram, - SendDatagram, SendDatagramError, WeakConnectionHandle, ZeroRttAccepted, + AcceptBi, AcceptUni, Closed, Connecting, Connection, OnClosed, OpenBi, OpenPathOpts, OpenUni, + ReadDatagram, SendDatagram, SendDatagramError, WeakConnectionHandle, ZeroRttAccepted, }; pub use crate::endpoint::{Accept, Endpoint, EndpointStats}; pub use crate::event_stream::{Lagged, NatTraversalUpdates, ObservedExternalAddr, PathEvents}; diff --git a/noq/src/tests.rs b/noq/src/tests.rs index c6629e90f..63d36d6f1 100755 --- a/noq/src/tests.rs +++ b/noq/src/tests.rs @@ -21,7 +21,7 @@ use std::{ }; use crate::runtime::TokioRuntime; -use crate::{Duration, Instant}; +use crate::{Duration, Instant, OpenPathOpts}; use bytes::Bytes; use proto::{ConnectionError, PathId, RandomConnectionIdGenerator, crypto::rustls::QuicClientConfig}; use rand::{Rng, SeedableRng, rngs::StdRng}; @@ -1024,7 +1024,7 @@ async fn test_open_path_ensure_existing_path() { // Re-ensuring the already-established path (PathId::ZERO) takes the // `existed` branch in `open_path_ensure`. - let fut = conn.open_path_ensure(server_addr, proto::PathStatus::Available); + let fut = conn.open_path_ensure(server_addr, OpenPathOpts::default()); let expected_path_id = fut .path_id() .expect("open_path_ensure should allocate or reuse a path id"); @@ -1078,7 +1078,7 @@ async fn test_multipath_observed_address() { // this sleep after the poll_transmit unwraps have been addressed tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; let path = conn - .open_path(server_addr, proto::PathStatus::Available) + .open_path(server_addr, OpenPathOpts::default()) .await .unwrap(); let mut reports = path.observed_external_addr().unwrap(); @@ -1292,10 +1292,7 @@ async fn path_clone_stats_after_abandon() { // Open a second path, while giving the remote some time to issue cids. let path = tokio::time::timeout(Duration::from_secs(1), async { loop { - match conn - .open_path(server_addr, proto::PathStatus::Available) - .await - { + match conn.open_path(server_addr, OpenPathOpts::default()).await { Ok(path) => break path, Err(proto::PathError::RemoteCidsExhausted) => { tokio::time::sleep(Duration::from_millis(20)).await; @@ -1379,10 +1376,7 @@ async fn closed_includes_path_stats_for_all_known_paths() -> TestResult { // Open a second path. let path2 = loop { - match conn - .open_path(server_addr, proto::PathStatus::Available) - .await - { + match conn.open_path(server_addr, OpenPathOpts::default()).await { Ok(p) => break p, Err(proto::PathError::RemoteCidsExhausted) => { tokio::time::sleep(Duration::from_millis(20)).await; @@ -1493,10 +1487,7 @@ async fn close_path() -> TestResult { // Open a second path, retrying until remote CIDs are available let path = loop { - match conn - .open_path(server_addr, proto::PathStatus::Available) - .await - { + match conn.open_path(server_addr, OpenPathOpts::default()).await { Ok(path) => break path, Err(proto::PathError::RemoteCidsExhausted) => { tokio::time::sleep(Duration::from_millis(20)).await;