Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 45 additions & 6 deletions noq/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,43 @@ impl Future for ConnectionDriver {
}
}

/// Options for opening a new path on a [`Connection`].
///
/// Defaults to [`PathStatus::Available`] and no preferred local IP.
///
/// # Examples
///
/// ```ignore
/// let opts = OpenPathOpts::default()
/// .initial_status(PathStatus::Backup)
/// .local_ip(Some(local_ip));
/// connection.open_path(remote_addr, opts);
/// ```
#[derive(Debug, Clone, Default)]
pub struct OpenPathOpts {
initial_status: PathStatus,
local_ip: Option<IpAddr>,
}

impl OpenPathOpts {
/// Sets the initial [`PathStatus`] for the new path.
#[must_use]
pub fn initial_status(mut self, initial_status: PathStatus) -> Self {
self.initial_status = initial_status;
self
}

/// Sets the local IP address to use for the new path.
///
/// When `Some`, the path is bound to the given local IP. When `None`, the local IP is
/// left unspecified and the operating system selects a source address.
#[must_use]
pub fn local_ip(mut self, local_ip: Option<IpAddr>) -> Self {
self.local_ip = local_ip;
self
}
}

/// A QUIC connection.
///
/// If all references to a connection (including every clone of the `Connection` handle, streams of
Expand Down Expand Up @@ -383,7 +420,7 @@ impl Connection {
/// Otherwise behaves exactly as [`open_path`].
///
/// [`open_path`]: Self::open_path
pub fn open_path_ensure(&self, addr: SocketAddr, initial_status: PathStatus) -> OpenPath {
pub fn open_path_ensure(&self, addr: SocketAddr, opts: OpenPathOpts) -> OpenPath {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my mind I was expecting to change addr: SocketAddr into network_path: FourTuple. That would mean making FourTuple public I guess but is kind of consistent with how things are named, at least with the internals.

Wonder what @matheus23 's thinks.

let mut state = self.0.lock_and_wake("open_path");

// If endpoint::State::ipv6 is true we want to keep all our IP addresses as IPv6.
Expand Down Expand Up @@ -415,8 +452,10 @@ impl Connection {
let now = state.runtime.now();
// TODO(matheus23): For now this means it's impossible to make use of short-circuiting path validation currently.
// However, changing that would mean changing the API.
let addrs = FourTuple::from_remote(addr);
let open_res = state.inner.open_path_ensure(addrs, initial_status, now);
let addrs = FourTuple::new(addr, opts.local_ip);
let open_res = state
.inner
.open_path_ensure(addrs, opts.initial_status, now);
match open_res {
Ok((path_id, existed)) if existed => {
let recv = state.open_path.get(&path_id).map(|tx| tx.subscribe());
Expand Down Expand Up @@ -450,7 +489,7 @@ impl Connection {
/// future, or at a later time. If the failure is immediate [`OpenPath::path_id`] will
/// return `None` and the future will be ready immediately. If the failure happens
/// later, a [`PathEvent`] will be emitted.
pub fn open_path(&self, addr: SocketAddr, initial_status: PathStatus) -> OpenPath {
pub fn open_path(&self, addr: SocketAddr, opts: OpenPathOpts) -> OpenPath {
let mut state = self.0.lock_and_wake("open_path");

// If endpoint::State::ipv6 is true we want to keep all our IP addresses as IPv6.
Expand Down Expand Up @@ -483,8 +522,8 @@ impl Connection {
let now = state.runtime.now();
// TODO(matheus23): For now this means it's impossible to make use of short-circuiting path validation currently.
// However, changing that would mean changing the API.
let addrs = FourTuple::from_remote(addr);
let open_res = state.inner.open_path(addrs, initial_status, now);
let addrs = FourTuple::new(addr, opts.local_ip);
let open_res = state.inner.open_path(addrs, opts.initial_status, now);
match open_res {
Ok(path_id) => {
state.open_path.insert(path_id, on_open_path_send);
Expand Down
4 changes: 2 additions & 2 deletions noq/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ pub use rustls;
pub use udp;

pub use crate::connection::{
AcceptBi, AcceptUni, Closed, Connecting, Connection, OnClosed, OpenBi, OpenUni, ReadDatagram,
SendDatagram, SendDatagramError, WeakConnectionHandle, ZeroRttAccepted,
AcceptBi, AcceptUni, Closed, Connecting, Connection, OnClosed, OpenBi, OpenPathOpts, OpenUni,
ReadDatagram, SendDatagram, SendDatagramError, WeakConnectionHandle, ZeroRttAccepted,
};
pub use crate::endpoint::{Accept, Endpoint, EndpointStats};
pub use crate::event_stream::{Lagged, NatTraversalUpdates, ObservedExternalAddr, PathEvents};
Expand Down
21 changes: 6 additions & 15 deletions noq/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
};

use crate::runtime::TokioRuntime;
use crate::{Duration, Instant};
use crate::{Duration, Instant, OpenPathOpts};
use bytes::Bytes;
use proto::{ConnectionError, PathId, RandomConnectionIdGenerator, crypto::rustls::QuicClientConfig};
use rand::{Rng, SeedableRng, rngs::StdRng};
Expand Down Expand Up @@ -1024,7 +1024,7 @@ async fn test_open_path_ensure_existing_path() {

// Re-ensuring the already-established path (PathId::ZERO) takes the
// `existed` branch in `open_path_ensure`.
let fut = conn.open_path_ensure(server_addr, proto::PathStatus::Available);
let fut = conn.open_path_ensure(server_addr, OpenPathOpts::default());
let expected_path_id = fut
.path_id()
.expect("open_path_ensure should allocate or reuse a path id");
Expand Down Expand Up @@ -1078,7 +1078,7 @@ async fn test_multipath_observed_address() {
// this sleep after the poll_transmit unwraps have been addressed
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
let path = conn
.open_path(server_addr, proto::PathStatus::Available)
.open_path(server_addr, OpenPathOpts::default())
.await
.unwrap();
let mut reports = path.observed_external_addr().unwrap();
Expand Down Expand Up @@ -1292,10 +1292,7 @@ async fn path_clone_stats_after_abandon() {
// Open a second path, while giving the remote some time to issue cids.
let path = tokio::time::timeout(Duration::from_secs(1), async {
loop {
match conn
.open_path(server_addr, proto::PathStatus::Available)
.await
{
match conn.open_path(server_addr, OpenPathOpts::default()).await {
Ok(path) => break path,
Err(proto::PathError::RemoteCidsExhausted) => {
tokio::time::sleep(Duration::from_millis(20)).await;
Expand Down Expand Up @@ -1379,10 +1376,7 @@ async fn closed_includes_path_stats_for_all_known_paths() -> TestResult {

// Open a second path.
let path2 = loop {
match conn
.open_path(server_addr, proto::PathStatus::Available)
.await
{
match conn.open_path(server_addr, OpenPathOpts::default()).await {
Ok(p) => break p,
Err(proto::PathError::RemoteCidsExhausted) => {
tokio::time::sleep(Duration::from_millis(20)).await;
Expand Down Expand Up @@ -1493,10 +1487,7 @@ async fn close_path() -> TestResult {

// Open a second path, retrying until remote CIDs are available
let path = loop {
match conn
.open_path(server_addr, proto::PathStatus::Available)
.await
{
match conn.open_path(server_addr, OpenPathOpts::default()).await {
Ok(path) => break path,
Err(proto::PathError::RemoteCidsExhausted) => {
tokio::time::sleep(Duration::from_millis(20)).await;
Expand Down
Loading