diff --git a/noq-proto/src/connection/mod.rs b/noq-proto/src/connection/mod.rs index c708aa71f..7892c6561 100644 --- a/noq-proto/src/connection/mod.rs +++ b/noq-proto/src/connection/mod.rs @@ -74,7 +74,8 @@ pub(crate) use packet_crypto::EncryptionLevel; mod paths; pub use paths::{ - ClosedPath, PathAbandonReason, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError, + ClosedPath, OpenPathOpts, PathAbandonReason, PathEvent, PathId, PathStatus, RttEstimator, + SetPathStatusError, }; use paths::{PathData, PathState}; @@ -518,47 +519,19 @@ impl Connection { } } - /// Opens a new path only if no path on the same network path currently exists. - /// - /// This comparison will use [`FourTuple::is_probably_same_path`] on the given `network_path` - /// and pass it existing path's network paths. - /// - /// This means that you can pass `local_ip: None` to make the comparison only compare - /// remote addresses. - /// - /// This avoids having to guess which local interface will be used to communicate with the - /// remote, should it not be known yet. We assume that if we already have a path to the remote, - /// the OS is likely to use the same interface to talk to said remote. - /// - /// See also [`open_path`]. Returns `(path_id, true)` if the path already existed. `(path_id, - /// false)` if was opened. - /// - /// [`open_path`]: Connection::open_path - pub fn open_path_ensure( - &mut self, - network_path: FourTuple, - initial_status: PathStatus, - now: Instant, - ) -> Result<(PathId, bool), PathError> { - let existing_open_path = self.paths.iter().find(|(id, path)| { - network_path.is_probably_same_path(&path.data.network_path) - && !self.abandoned_paths.contains(*id) - }); - match existing_open_path { - Some((path_id, _state)) => Ok((*path_id, true)), - None => Ok((self.open_path(network_path, initial_status, now)?, false)), - } - } - /// Opens a new path. /// + /// By default, if a path with `network_path` already exists, + /// [`PathError::PathExistsForNetworkPath`] is returned. Set + /// [`OpenPathOpts::allow_duplicate`] to `true` to open a new path regardless. + /// /// Further errors might occur and they will be emitted in [`PathEvent::Abandoned`] /// events with this path id. Once the path is opened and can carry application data it /// will be reported using a [`PathEvent::Established`] event. pub fn open_path( &mut self, network_path: FourTuple, - initial_status: PathStatus, + opts: OpenPathOpts, now: Instant, ) -> Result { if !self.is_multipath_negotiated() { @@ -568,6 +541,17 @@ impl Connection { return Err(PathError::ServerSideNotAllowed); } + if !opts.allow_duplicate { + let existing_open_path = self.paths.iter().find(|(id, path)| { + network_path.is_probably_same_path(&path.data.network_path) + && !self.abandoned_paths.contains(*id) + }); + + if let Some((path_id, _state)) = existing_open_path { + return Err(PathError::PathExistsForNetworkPath(*path_id)); + } + } + let max_abandoned = self.abandoned_paths.iter().max().copied(); let max_used = self.paths.keys().last().copied(); let path_id = max_abandoned @@ -596,7 +580,7 @@ impl Connection { } let path = self.ensure_path(path_id, network_path, now, None); - path.status.local_update(initial_status); + path.status.local_update(opts.initial_status); Ok(path_id) } @@ -5640,16 +5624,23 @@ impl Connection { .ok() .and_then(|s| s.pop_pending_path_open()) { - match self.open_path_ensure(network_path, PathStatus::Backup, now) { - Ok((path_id, already_existed)) => { + let opts = OpenPathOpts::default().initial_status(PathStatus::Backup); + match self.open_path(network_path, opts, now) { + Ok(path_id) => { debug!( %path_id, ?network_path, - new_path = !already_existed, "Opened NAT traversal path", ); } Err(err) => match err { + PathError::PathExistsForNetworkPath(path_id) => { + debug!( + %path_id, + ?network_path, + "NAT traversal path already exists", + ); + } PathError::MultipathNotNegotiated | PathError::ServerSideNotAllowed | PathError::ValidationFailed @@ -5857,7 +5848,13 @@ impl Connection { local_ip: None, /* allow the local ip to be discovered */ }; - if open_first && let Err(e) = self.open_path(network_path, status, now) { + // We set `allow_duplicate` to true: the loop above cleared `local_ip` on every open + // path, so any other path with the same remote would otherwise fail to open. + let opts = OpenPathOpts::default() + .initial_status(status) + .allow_duplicate(true); + + if open_first && let Err(e) = self.open_path(network_path, opts, now) { if self.side().is_client() { debug!(%e, "Failed to open new path for network change"); } @@ -5874,7 +5871,7 @@ impl Connection { continue; } - if !open_first && let Err(e) = self.open_path(network_path, status, now) { + if !open_first && let Err(e) = self.open_path(network_path, opts, now) { // Path has already been closed if we got here. Since the path was not recoverable, // this might be desirable in any case, because other paths exist (!open_first) and // this was is considered non recoverable @@ -7447,6 +7444,9 @@ pub enum PathError { /// The remote address for the path is not supported by the endpoint #[error("invalid remote address")] InvalidRemoteAddress(SocketAddr), + /// A path with the same network 4-tuple already exists. + #[error("a path with this network 4-tuple already exists (path id {_0})")] + PathExistsForNetworkPath(PathId), } /// Errors triggered when abandoning a path diff --git a/noq-proto/src/connection/paths.rs b/noq-proto/src/connection/paths.rs index 15910c6fd..c0c7e20db 100644 --- a/noq-proto/src/connection/paths.rs +++ b/noq-proto/src/connection/paths.rs @@ -1039,6 +1039,41 @@ pub enum PathStatus { Backup, } +/// Options for opening a new path via [`Connection::open_path`]. +/// +/// Defaults to [`PathStatus::Available`] and `allow_duplicate: false`. +/// +/// [`Connection::open_path`]: crate::Connection::open_path +#[derive(Debug, Copy, Clone, Default)] +pub struct OpenPathOpts { + pub(crate) initial_status: PathStatus, + pub(crate) allow_duplicate: bool, +} + +impl OpenPathOpts { + /// Sets the initial [`PathStatus`] for the new path. + #[must_use] + pub fn initial_status(mut self, initial_status: PathStatus) -> Self { + self.initial_status = initial_status; + self + } + + /// Whether to open a new path even if another open path with the same network 4-tuple + /// already exists. + /// + /// When `false` (the default), [`Connection::open_path`] returns + /// [`PathError::PathExistsForNetworkPath`] if an open path with the same network 4-tuple + /// already exists. When `true`, a new path is opened regardless. + /// + /// [`Connection::open_path`]: crate::Connection::open_path + /// [`PathError::PathExistsForNetworkPath`]: crate::PathError::PathExistsForNetworkPath + #[must_use] + pub fn allow_duplicate(mut self, allow_duplicate: bool) -> Self { + self.allow_duplicate = allow_duplicate; + self + } +} + /// Application events about paths #[derive(Debug, Clone, PartialEq, Eq)] pub enum PathEvent { diff --git a/noq-proto/src/lib.rs b/noq-proto/src/lib.rs index e100f281c..8aaa022b0 100644 --- a/noq-proto/src/lib.rs +++ b/noq-proto/src/lib.rs @@ -45,8 +45,8 @@ pub(crate) mod connection; pub use crate::connection::{ Chunk, Chunks, ClosePathError, ClosedPath, ClosedStream, Connection, ConnectionError, ConnectionStats, Datagrams, Event, FinishError, FrameStats, MultipathNotNegotiated, - NetworkChangeHint, PathAbandonReason, PathError, PathEvent, PathId, PathStats, PathStatus, - ReadError, ReadableError, RecvStream, RttEstimator, SendDatagramError, SendStream, + NetworkChangeHint, OpenPathOpts, PathAbandonReason, PathError, PathEvent, PathId, PathStats, + PathStatus, ReadError, ReadableError, RecvStream, RttEstimator, SendDatagramError, SendStream, SetPathStatusError, ShouldTransmit, StreamEvent, Streams, UdpStats, WriteError, }; #[cfg(test)] diff --git a/noq-proto/src/tests/multipath.rs b/noq-proto/src/tests/multipath.rs index 5735e3847..46b7677ee 100644 --- a/noq-proto/src/tests/multipath.rs +++ b/noq-proto/src/tests/multipath.rs @@ -605,11 +605,9 @@ fn open_path_ensure_after_abandon() -> TestResult { ); info!("opening path 2"); - let (path_id, existed) = pair.open_path_ensure(Client, second_path, PathStatus::Available)?; + let path_id = pair.open_path(Client, second_path, PathStatus::Available)?; pair.drive(); - assert!(!existed); - // The path should have been opened: assert_matches!( pair.poll(Client), diff --git a/noq-proto/src/tests/random_interaction.rs b/noq-proto/src/tests/random_interaction.rs index 8ecf34c14..8cdfbbb0d 100644 --- a/noq-proto/src/tests/random_interaction.rs +++ b/noq-proto/src/tests/random_interaction.rs @@ -5,7 +5,8 @@ use test_strategy::Arbitrary; use tracing::{debug, error, info, trace}; use crate::{ - ClientConfig, Connection, ConnectionHandle, Dir, FourTuple, PathId, PathStatus, Side, StreamId, + ClientConfig, Connection, ConnectionHandle, Dir, FourTuple, OpenPathOpts, PathId, PathStatus, + Side, StreamId, tests::{Pair, TestEndpoint}, }; @@ -166,7 +167,8 @@ impl TestOp { remote, local_ip: None, }; - conn.open_path(network_path, status, now) + let opts = OpenPathOpts::default().initial_status(status); + conn.open_path(network_path, opts, now) .inspect_err(|err| error!(?err, "OpenPath failed")) .ok(); } diff --git a/noq-proto/src/tests/util.rs b/noq-proto/src/tests/util.rs index b36514e64..ac99a3087 100644 --- a/noq-proto/src/tests/util.rs +++ b/noq-proto/src/tests/util.rs @@ -500,17 +500,6 @@ impl ConnPair { self.conn_mut(side).send_stream(id) } - pub(super) fn open_path_ensure( - &mut self, - side: Side, - network_path: FourTuple, - initial_status: PathStatus, - ) -> Result<(PathId, bool), PathError> { - let now = self.pair.time; - self.conn_mut(side) - .open_path_ensure(network_path, initial_status, now) - } - pub(super) fn open_path( &mut self, side: Side, @@ -518,8 +507,10 @@ impl ConnPair { initial_status: PathStatus, ) -> Result { let now = self.pair.time; - self.conn_mut(side) - .open_path(network_path, initial_status, now) + let opts = OpenPathOpts::default() + .initial_status(initial_status) + .allow_duplicate(true); + self.conn_mut(side).open_path(network_path, opts, now) } pub(super) fn close_path( diff --git a/noq/src/connection.rs b/noq/src/connection.rs index ee7f88b60..426c0a311 100644 --- a/noq/src/connection.rs +++ b/noq/src/connection.rs @@ -31,9 +31,9 @@ use crate::{ udp_transmit, }; use proto::{ - ConnectionError, ConnectionHandle, ConnectionStats, Dir, EndpointEvent, FourTuple, PathError, - PathEvent, PathId, PathStats, PathStatus, Side, StreamEvent, StreamId, TransportError, - TransportErrorCode, congestion::Controller, n0_nat_traversal, + ConnectionError, ConnectionHandle, ConnectionStats, Dir, EndpointEvent, FourTuple, + OpenPathOpts, PathError, PathEvent, PathId, PathStats, Side, StreamEvent, StreamId, + TransportError, TransportErrorCode, congestion::Controller, n0_nat_traversal, }; /// In-progress connection attempt future @@ -378,79 +378,26 @@ impl Connection { } } - /// Opens a new path if no path exists yet for the remote address. - /// - /// Otherwise behaves exactly as [`open_path`]. - /// - /// [`open_path`]: Self::open_path - pub fn open_path_ensure(&self, addr: SocketAddr, initial_status: PathStatus) -> OpenPath { - let mut state = self.0.lock_and_wake("open_path"); - - // If endpoint::State::ipv6 is true we want to keep all our IP addresses as IPv6. - // If not, we do not support IPv6. We can not access endpoint::State from here - // however, but either all our paths use an IPv6 address, or all our paths use an - // IPv4 address. So we can use that information. - let ipv6 = state - .inner - .paths() - .iter() - .filter_map(|id| { - state - .inner - .network_path(*id) - .map(|addrs| addrs.remote().is_ipv6()) - .ok() - }) - .next() - .unwrap_or_default(); - if addr.is_ipv6() && !ipv6 { - return OpenPath::rejected(PathError::InvalidRemoteAddress(addr)); - } - let addr = if ipv6 { - SocketAddr::V6(ensure_ipv6(addr)) - } else { - addr - }; - - let now = state.runtime.now(); - // TODO(matheus23): For now this means it's impossible to make use of short-circuiting path validation currently. - // However, changing that would mean changing the API. - let addrs = FourTuple::from_remote(addr); - let open_res = state.inner.open_path_ensure(addrs, initial_status, now); - match open_res { - Ok((path_id, existed)) if existed => { - let recv = state.open_path.get(&path_id).map(|tx| tx.subscribe()); - drop(state); - match recv { - Some(recv) => OpenPath::new(path_id, recv, self.0.clone()), - None => OpenPath::ready(path_id, self.0.clone()), - } - } - Ok((path_id, _)) => { - let (tx, rx) = watch::channel(Ok(())); - state.open_path.insert(path_id, tx); - drop(state); - OpenPath::new(path_id, rx, self.0.clone()) - } - Err(err) => OpenPath::rejected(err), - } - } - - /// Opens an additional path if the multipath extension is negotiated. + /// Opens an additional path via `network_path` if the multipath extension is negotiated. /// /// The returned future completes once the path is either fully opened and ready to /// carry application data, or if there was an error. /// - /// Dropping the returned future does not cancel the opening of the path, the - /// [`PathEvent::Established`] event will still be emitted from [`Self::path_events`] if - /// the path opens. The [`PathId`] for the events can be extracted from - /// [`OpenPath::path_id`]. + /// Returns `Err` immediately if the path cannot be opened (for example, multipath + /// is not negotiated, the maximum path id is reached or if no connection ids are available + /// for the new path). + /// + /// If a path with `network_path` already exists, and [`OpenPathOpts::allow_duplicate`] + /// is not set to `true`, then [`PathError::PathExistsForNetworkPath`] is returned. Set + /// [`OpenPathOpts::allow_duplicate`] to `true` to open a new path regardless. /// - /// Failure to open a path can either occur immediately, before polling the returned - /// future, or at a later time. If the failure is immediate [`OpenPath::path_id`] will - /// return `None` and the future will be ready immediately. If the failure happens - /// later, a [`PathEvent`] will be emitted. - pub fn open_path(&self, addr: SocketAddr, initial_status: PathStatus) -> OpenPath { + /// Dropping the returned future does not cancel the opening of the path. The + /// [`PathEvent::Established`] event will still be emitted from [`Self::path_events`]. + pub fn open_path( + &self, + network_path: FourTuple, + opts: OpenPathOpts, + ) -> Result { let mut state = self.0.lock_and_wake("open_path"); // If endpoint::State::ipv6 is true we want to keep all our IP addresses as IPv6. @@ -470,29 +417,22 @@ impl Connection { }) .next() .unwrap_or_default(); - if addr.is_ipv6() && !ipv6 { - return OpenPath::rejected(PathError::InvalidRemoteAddress(addr)); + let remote = network_path.remote(); + if remote.is_ipv6() && !ipv6 { + return Err(PathError::InvalidRemoteAddress(remote)); } - let addr = if ipv6 { - SocketAddr::V6(ensure_ipv6(addr)) + let network_path = if ipv6 { + FourTuple::new(SocketAddr::V6(ensure_ipv6(remote)), network_path.local_ip()) } else { - addr + network_path }; - let (on_open_path_send, on_open_path_recv) = watch::channel(Ok(())); + let (on_open_path_send, on_open_path_recv) = oneshot::channel(); let now = state.runtime.now(); - // TODO(matheus23): For now this means it's impossible to make use of short-circuiting path validation currently. - // However, changing that would mean changing the API. - let addrs = FourTuple::from_remote(addr); - let open_res = state.inner.open_path(addrs, initial_status, now); - match open_res { - Ok(path_id) => { - state.open_path.insert(path_id, on_open_path_send); - drop(state); - OpenPath::new(path_id, on_open_path_recv, self.0.clone()) - } - Err(err) => OpenPath::rejected(err), - } + let path_id = state.inner.open_path(network_path, opts, now)?; + state.open_path.insert(path_id, on_open_path_send); + drop(state); + Ok(OpenPath::new(path_id, on_open_path_recv, self.0.clone())) } /// Returns the [`Path`] structure of an open path @@ -1410,7 +1350,7 @@ pub(crate) struct State { /// Always set to Some before the connection becomes drained pub(crate) error: Option, /// Tracks paths being opened - open_path: FxHashMap>>, + open_path: FxHashMap>>, /// Tracks reference counts for paths. /// /// I.e. how many [`Path`] and [`WeakPathHandle`] structs are alive for a path. @@ -1642,7 +1582,7 @@ impl State { Path(ref evt @ PathEvent::Established { id }) => { self.path_events.send(evt.clone()).ok(); if let Some(sender) = self.open_path.remove(&id) { - sender.send_modify(|value| *value = Ok(())); + sender.send(Ok(())).ok(); } } Path(ref evt @ PathEvent::Discarded { id, ref path_stats }) => { @@ -1661,7 +1601,7 @@ impl State { // The previous iteration of this code had another event `PathEvent::LocallyClosed` which // contained a `PathError`, but that was only ever set to `ValidationFailed`. let error = PathError::ValidationFailed; - sender.send_modify(|value| *value = Err(error)); + sender.send(Err(error)).ok(); } // this will happen also for already opened paths self.path_events.send(evt.clone()).ok(); diff --git a/noq/src/lib.rs b/noq/src/lib.rs index e4b351eef..0316ad2a0 100644 --- a/noq/src/lib.rs +++ b/noq/src/lib.rs @@ -64,11 +64,12 @@ pub use proto::{ AckFrequencyConfig, ApplicationClose, Chunk, ClientConfig, ClosePathError, ClosedPath, ClosedStream, ConfigError, ConnectError, ConnectionClose, ConnectionError, ConnectionId, ConnectionIdGenerator, ConnectionStats, DecryptedInitial, Dir, EcnCodepoint, EndpointConfig, - FrameStats, FrameType, IdleTimeout, InvalidCid, MtuDiscoveryConfig, NetworkChangeHint, - NoneTokenLog, NoneTokenStore, PathError, PathEvent, PathId, PathStats, PathStatus, - ServerConfig, SetPathStatusError, Side, StdSystemTime, StreamId, TimeSource, TokenLog, - TokenMemoryCache, TokenReuseError, TokenStore, Transmit, TransportConfig, TransportErrorCode, - UdpStats, ValidationTokenConfig, VarInt, VarIntBoundsExceeded, congestion, crypto, + FourTuple, FrameStats, FrameType, IdleTimeout, InvalidCid, MtuDiscoveryConfig, + NetworkChangeHint, NoneTokenLog, NoneTokenStore, OpenPathOpts, PathError, PathEvent, PathId, + PathStats, PathStatus, ServerConfig, SetPathStatusError, Side, StdSystemTime, StreamId, + TimeSource, TokenLog, TokenMemoryCache, TokenReuseError, TokenStore, Transmit, TransportConfig, + TransportErrorCode, UdpStats, ValidationTokenConfig, VarInt, VarIntBoundsExceeded, congestion, + crypto, }; #[cfg(feature = "qlog")] pub use proto::{QlogConfig, QlogFactory, QlogFileFactory}; diff --git a/noq/src/path.rs b/noq/src/path.rs index 780d6cb73..b764e6a54 100644 --- a/noq/src/path.rs +++ b/noq/src/path.rs @@ -7,99 +7,54 @@ use std::task::{Context, Poll, ready}; use std::time::Duration; use proto::{ - ClosePathError, ClosedPath, PathError, PathEvent, PathId, PathStats, PathStatus, + ClosePathError, ClosedPath, FourTuple, PathError, PathEvent, PathId, PathStats, PathStatus, SetPathStatusError, TransportErrorCode, }; -use tokio::sync::watch; +use tokio::sync::{oneshot, watch}; use tokio_stream::{Stream, wrappers::WatchStream}; use crate::connection::ConnectionRef; use crate::{Runtime, WeakConnectionHandle}; /// Future produced by [`crate::Connection::open_path`] -pub struct OpenPath(OpenPathInner); - -enum OpenPathInner { - /// Opening a path in underway - /// - /// This might fail later on. - Ongoing { - opened: WatchStream>, - path_id: PathId, - conn: ConnectionRef, - }, - /// Opening a path failed immediately - Rejected { - /// The error that occurred - err: PathError, - }, - /// The path is already open - Ready { - path_id: PathId, - conn: ConnectionRef, - }, +pub struct OpenPath { + opened: oneshot::Receiver>, + path_id: PathId, + conn: ConnectionRef, } impl OpenPath { pub(crate) fn new( path_id: PathId, - opened: watch::Receiver>, + opened: oneshot::Receiver>, conn: ConnectionRef, ) -> Self { - Self(OpenPathInner::Ongoing { - opened: WatchStream::from_changes(opened), + Self { + opened, path_id, conn, - }) - } - - pub(crate) fn ready(path_id: PathId, conn: ConnectionRef) -> Self { - Self(OpenPathInner::Ready { path_id, conn }) - } - - pub(crate) fn rejected(err: PathError) -> Self { - Self(OpenPathInner::Rejected { err }) + } } /// Returns the path ID of the new path being opened. - /// - /// If an error occurred before a path ID was allocated, `None` is returned. In this - /// case the future is ready and polling it will immediately yield the error. - /// - /// The returned value remains the same for the entire lifetime of this future. - pub fn path_id(&self) -> Option { - match self.0 { - OpenPathInner::Ongoing { path_id, .. } => Some(path_id), - OpenPathInner::Rejected { .. } => None, - OpenPathInner::Ready { path_id, .. } => Some(path_id), - } + pub fn path_id(&self) -> PathId { + self.path_id } } impl Future for OpenPath { type Output = Result; - fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { - match self.get_mut().0 { - OpenPathInner::Ongoing { - ref mut opened, - path_id, - ref mut conn, - } => match ready!(Pin::new(opened).poll_next(ctx)) { - Some(value) => { - Poll::Ready(value.map(|_| Path::new_unchecked(conn.clone(), path_id))) - } - None => { - // This only happens if receiving a notification change failed, this means the - // sender was dropped. This generally should not happen so we use a transient - // error - Poll::Ready(Err(PathError::ValidationFailed)) - } - }, - OpenPathInner::Ready { - path_id, - ref mut conn, - } => Poll::Ready(Ok(Path::new_unchecked(conn.clone(), path_id))), - OpenPathInner::Rejected { err } => Poll::Ready(Err(err)), + fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { + match ready!(Pin::new(&mut self.opened).poll(ctx)) { + Ok(value) => { + Poll::Ready(value.map(|_| Path::new_unchecked(self.conn.clone(), self.path_id))) + } + Err(_sender_dropped) => { + // This only happens if receiving a notification change failed, this means the + // sender was dropped. This generally should not happen so we use a transient + // error + Poll::Ready(Err(PathError::ValidationFailed)) + } } } } @@ -269,6 +224,14 @@ impl Path { Ok(state.inner.network_path(self.id())?.local_ip()) } + /// The network path used for this path, if known. + /// + /// Returns a [`FourTuple`], combining [`Self::remote_address`] and [`Self::local_ip`]. + pub fn network_path(&self) -> Result { + let state = self.conn.lock_without_waking("per_path_local_ip"); + state.inner.network_path(self.id()) + } + /// Ping the remote endpoint over this path. pub fn ping(&self) -> Result<(), ClosedPath> { let mut state = self.conn.lock_and_wake("ping"); diff --git a/noq/src/tests.rs b/noq/src/tests.rs index c6629e90f..24124f2f8 100755 --- a/noq/src/tests.rs +++ b/noq/src/tests.rs @@ -23,7 +23,10 @@ use std::{ use crate::runtime::TokioRuntime; use crate::{Duration, Instant}; use bytes::Bytes; -use proto::{ConnectionError, PathId, RandomConnectionIdGenerator, crypto::rustls::QuicClientConfig}; +use proto::{ + ConnectionError, FourTuple, OpenPathOpts, PathError, PathId, RandomConnectionIdGenerator, + crypto::rustls::QuicClientConfig, +}; use rand::{Rng, SeedableRng, rngs::StdRng}; use rustls::{ RootCertStore, @@ -1024,17 +1027,8 @@ async fn test_open_path_ensure_existing_path() { // Re-ensuring the already-established path (PathId::ZERO) takes the // `existed` branch in `open_path_ensure`. - let fut = conn.open_path_ensure(server_addr, proto::PathStatus::Available); - let expected_path_id = fut - .path_id() - .expect("open_path_ensure should allocate or reuse a path id"); - - let path = tokio::time::timeout(Duration::from_millis(200), fut) - .await - .expect("open_path_ensure(existing path) timed out") - .expect("open_path_ensure(existing path) failed"); - assert_eq!(path.id(), expected_path_id); - assert_eq!(path.remote_address().unwrap(), server_addr); + let res = conn.open_path(FourTuple::from_remote(server_addr), OpenPathOpts::default()); + assert!(matches!(res, Err(PathError::PathExistsForNetworkPath(id)) if id == PathId::ZERO)); } .instrument(info_span!("client")); @@ -1078,7 +1072,11 @@ async fn test_multipath_observed_address() { // this sleep after the poll_transmit unwraps have been addressed tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; let path = conn - .open_path(server_addr, proto::PathStatus::Available) + .open_path( + FourTuple::from_remote(server_addr), + OpenPathOpts::default().allow_duplicate(true), + ) + .expect("open_path") .await .unwrap(); let mut reports = path.observed_external_addr().unwrap(); @@ -1292,11 +1290,11 @@ async fn path_clone_stats_after_abandon() { // Open a second path, while giving the remote some time to issue cids. let path = tokio::time::timeout(Duration::from_secs(1), async { loop { - match conn - .open_path(server_addr, proto::PathStatus::Available) - .await - { - Ok(path) => break path, + match conn.open_path( + FourTuple::from_remote(server_addr), + OpenPathOpts::default().allow_duplicate(true), + ) { + Ok(fut) => break fut.await.expect("path open failed"), Err(proto::PathError::RemoteCidsExhausted) => { tokio::time::sleep(Duration::from_millis(20)).await; } @@ -1379,15 +1377,15 @@ async fn closed_includes_path_stats_for_all_known_paths() -> TestResult { // Open a second path. let path2 = loop { - match conn - .open_path(server_addr, proto::PathStatus::Available) - .await - { - Ok(p) => break p, + match conn.open_path( + FourTuple::from_remote(server_addr), + OpenPathOpts::default().allow_duplicate(true), + ) { Err(proto::PathError::RemoteCidsExhausted) => { tokio::time::sleep(Duration::from_millis(20)).await; } Err(err) => Err(err)?, + Ok(fut) => break fut.await?, } }; let path2_id = path2.id(); @@ -1493,11 +1491,11 @@ async fn close_path() -> TestResult { // Open a second path, retrying until remote CIDs are available let path = loop { - match conn - .open_path(server_addr, proto::PathStatus::Available) - .await - { - Ok(path) => break path, + match conn.open_path( + FourTuple::from_remote(server_addr), + OpenPathOpts::default().allow_duplicate(true), + ) { + Ok(fut) => break fut.await?, Err(proto::PathError::RemoteCidsExhausted) => { tokio::time::sleep(Duration::from_millis(20)).await; }