Skip to content
Merged
2 changes: 1 addition & 1 deletion bench/src/bin/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ async fn client(
// to `Arc`ing them
connection.close(0u32.into(), b"Benchmark done");

endpoint.wait_idle().await;
endpoint.wait_all_draining().await;

if opt.stats {
println!("\nClient connection stats:\n{:#?}", connection.stats());
Expand Down
21 changes: 18 additions & 3 deletions noq-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2386,7 +2386,10 @@ impl Connection {
match timer {
Timer::Conn(timer) => match timer {
ConnTimer::Close => {
self.state.move_to_drained(None);
let was_draining = self.state.move_to_drained(None);
if !was_draining {
self.endpoint_events.push_back(EndpointEventInner::Draining);
}
// move_to_drained checks that we weren't in drained before.
// Adding events to endpoint_events is only legal if `Drained` was never queued before.
self.endpoint_events.push_back(EndpointEventInner::Drained);
Expand Down Expand Up @@ -4370,7 +4373,10 @@ impl Connection {
code: TransportErrorCode::AEAD_LIMIT_REACHED,
..
}) => {
self.state.move_to_drained(Some(conn_err));
let was_draining = self.state.move_to_drained(Some(conn_err));
if !was_draining {
self.endpoint_events.push_back(EndpointEventInner::Draining);
}
}
ConnectionError::TimedOut => {
unreachable!("timeouts aren't generated by packet processing");
Expand All @@ -4381,6 +4387,7 @@ impl Connection {
}
ConnectionError::VersionMismatch => {
self.state.move_to_draining(Some(conn_err));
self.endpoint_events.push_back(EndpointEventInner::Draining);
}
ConnectionError::LocallyClosed => {
unreachable!("LocallyClosed isn't generated by packet processing");
Expand Down Expand Up @@ -4491,13 +4498,16 @@ impl Connection {
continue;
};

trace!(?frame, "processing frame in closed state");

self.path_stats
.for_path(path_id)
.frame_rx
.record(frame.ty());

if let Frame::Close(_error) = frame {
self.state.move_to_draining(None);
self.endpoint_events.push_back(EndpointEventInner::Draining);
Comment thread
matheus23 marked this conversation as resolved.
break;
}
}
Expand Down Expand Up @@ -4827,6 +4837,7 @@ impl Connection {
}
Frame::Close(reason) => {
self.state.move_to_draining(Some(reason.into()));
self.endpoint_events.push_back(EndpointEventInner::Draining);
return Ok(());
}
_ => {
Expand Down Expand Up @@ -5578,6 +5589,7 @@ impl Connection {

if let Some(reason) = close {
self.state.move_to_draining(Some(reason.into()));
self.endpoint_events.push_back(EndpointEventInner::Draining);
self.connection_close_pending = true;
}

Expand Down Expand Up @@ -6946,7 +6958,10 @@ impl Connection {
/// Terminate the connection instantly, without sending a close packet
fn kill(&mut self, reason: ConnectionError) {
self.close_common();
self.state.move_to_drained(Some(reason));
let was_draining = self.state.move_to_drained(Some(reason));
if !was_draining {
self.endpoint_events.push_back(EndpointEventInner::Draining);
}
// move_to_drained checks that we were never in drained before, so we
// never sent a `Drained` event before (it's illegal to send more events after drained).
self.endpoint_events.push_back(EndpointEventInner::Drained);
Expand Down
25 changes: 16 additions & 9 deletions noq-proto/src/connection/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,20 @@ impl State {
/// Moves to the drained state.
///
/// Panics if the state was already drained.
pub(super) fn move_to_drained(&mut self, error: Option<ConnectionError>) {
let (error, is_local) = if let Some(error) = error {
(Some(error), false)
///
/// Returns whether we were in the draining state before.
pub(super) fn move_to_drained(&mut self, error: Option<ConnectionError>) -> bool {
let (error, is_local, was_draining) = if let Some(error) = error {
(
Some(error),
false,
matches!(self.inner, InnerState::Draining { .. }),
)
} else {
let error = match &mut self.inner {
InnerState::Draining { error, .. } => error.take(),
let (error, was_draining) = match &mut self.inner {
InnerState::Draining { error, .. } => (error.take(), true),
InnerState::Drained { .. } => panic!("invalid state transition drained -> drained"),
InnerState::Closed { error_read, .. } if *error_read => None,
InnerState::Closed { error_read, .. } if *error_read => (None, false),
InnerState::Closed { remote_reason, .. } => {
let error = match remote_reason.clone().into() {
ConnectionError::ConnectionClosed(close) => {
Expand All @@ -89,14 +95,15 @@ impl State {
}
e => e,
};
Some(error)
(Some(error), false)
}
InnerState::Handshake(_) | InnerState::Established => None,
InnerState::Handshake(_) | InnerState::Established => (None, false),
};
(error, self.is_local_close())
(error, self.is_local_close(), was_draining)
};
self.inner = InnerState::Drained { error, is_local };
trace!("connection state: drained");
was_draining
}

/// Moves to a draining state.
Expand Down
3 changes: 3 additions & 0 deletions noq-proto/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ impl Endpoint {
}
}
}
Draining => {
// Nothing to do.
}
Drained => {
if let Some(conn) = self.connections.try_remove(ch.0) {
self.index.remove(&conn);
Expand Down
7 changes: 7 additions & 0 deletions noq-proto/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,17 @@ impl EndpointEvent {
pub fn is_drained(&self) -> bool {
    // True only when the wrapped inner event is the `Drained` variant.
    matches!(self.0, EndpointEventInner::Drained)
}

/// Whether this event marks the start of the connection's draining period.
pub fn is_draining(&self) -> bool {
    matches!(self.0, EndpointEventInner::Draining)
}
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum EndpointEventInner {
/// The connection started draining
Draining,
/// The connection has been drained
Drained,
/// The connection has a new active reset token
Expand Down
56 changes: 56 additions & 0 deletions noq-proto/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4320,3 +4320,59 @@ fn regression_close_without_connection_event() {
Some(Event::ConnectionLost { .. })
);
}

/// Ensures that the draining delay is exactly 0.5 RTT for the server and 1 RTT for the client.
///
/// The draining delay is the time between the connection being closed and the connection
/// entering the "draining" state (either on the same or on the other side).
///
/// We expect the side that *receives* the CONNECTION_CLOSE to immediately enter the draining
/// state. However in absolute terms, it'll be delayed by 0.5 RTT (exactly the latency) compared
/// to when `connection.close()` was called.
/// On the side that called `connection.close()` we first enter the "closed" state, and only
/// enter the "draining" state once we *receive* a "reciprocal" CONNECTION_CLOSE from the other
/// side. In the normal case this will be exactly 1 RTT after calling `connection.close()` to
/// account for the latency of CONNECTION_CLOSE going one way and then coming back.
///
/// The "draining" state from noq-proto is observed by noq to enable `wait_idle` waiting the
/// ideal amount of time before allowing us to close the socket.
#[test]
fn timely_graceful_close() {
const ONE_WAY_LATENCY: Duration = Duration::from_millis(100);

let _guard = subscribe();
let mut pair = Pair::default();
pair.latency = ONE_WAY_LATENCY;
let mut pair = ConnPair::connect_with(pair, client_config());

// Reference point for both measured delays: the moment the client calls `close`.
let start = pair.time;
pair.close(Client, 0, b"done!");

// Calling `close` alone must not mark either side as draining.
assert!(!pair.is_draining(Client));
assert!(!pair.is_draining(Server));

// The client now sends CONNECTION_CLOSE to the server and it processes it.
// When the server receives CONNECTION_CLOSE, it responds with one of its own
// and enters the draining state.
pair.drive_client();
pair.advance_time();
// Sample the clock after advancing but before driving the server, so the measured
// delay is the time at which the server gets to process the packet.
let now = pair.time;
pair.drive_server();

assert!(pair.is_draining(Server));
let server_draining_delay = now.saturating_duration_since(start);
info!(?server_draining_delay);
assert_eq!(server_draining_delay, ONE_WAY_LATENCY);

// The server has now sent a CONNECTION_CLOSE back in response and the client processes it.
// The client then enters the draining state once it processed the response.
// The server was already driven above, so there is no `drive_server` call here:
// only the clock needs to advance before the client is driven.
pair.advance_time();
let now = pair.time;
pair.drive_client();

assert!(pair.is_draining(Client));
let client_draining_delay = now.saturating_duration_since(start);
info!(?client_draining_delay);
assert_eq!(client_draining_delay, ONE_WAY_LATENCY * 2);
}
19 changes: 12 additions & 7 deletions noq-proto/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,13 @@ impl ConnPair {
let now = self.pair.time;
self.conn_mut(side).handle_network_change(hint, now);
}

/// Reports whether the connection on `side` has been recorded in that endpoint's
/// `draining_connections` set.
pub(super) fn is_draining(&self, side: Side) -> bool {
    // Pick the endpoint and the matching connection handle first, then do one lookup.
    let (endpoint, handle) = match side {
        Client => (&self.client, &self.client_ch),
        Server => (&self.server, &self.server_ch),
    };
    endpoint.draining_connections.contains(handle)
}
}

impl Default for Pair {
Expand All @@ -829,7 +836,7 @@ pub(super) struct TestEndpoint {
pub(super) inbound: VecDeque<Inbound>,
pub(super) accepted: Option<Result<ConnectionHandle, ConnectionError>>,
pub(super) connections: HashMap<ConnectionHandle, Connection>,
drained_connections: HashSet<ConnectionHandle>,
pub(super) draining_connections: HashSet<ConnectionHandle>,
conn_events: HashMap<ConnectionHandle, VecDeque<ConnectionEvent>>,
pub(super) captured_packets: Vec<Vec<u8>>,
pub(super) capture_inbound_packets: bool,
Expand Down Expand Up @@ -872,7 +879,7 @@ impl TestEndpoint {
inbound: VecDeque::new(),
accepted: None,
connections: HashMap::default(),
drained_connections: HashSet::default(),
draining_connections: HashSet::default(),
conn_events: HashMap::default(),
captured_packets: Vec::new(),
capture_inbound_packets: false,
Expand Down Expand Up @@ -975,8 +982,8 @@ impl TestEndpoint {
}

for (ch, event) in endpoint_events {
if event.is_drained() {
self.drained_connections.insert(ch);
if event.is_draining() {
self.draining_connections.insert(ch);
}
if let Some(event) = self.handle_event(ch, event)
&& let Some(conn) = self.connections.get_mut(&ch)
Expand Down Expand Up @@ -1084,11 +1091,9 @@ pub(crate) fn subscribe() -> tracing::subscriber::DefaultGuard {
.with_default_directive(tracing::Level::TRACE.into())
.from_env_lossy(),
)
.without_time()
.with_line_number(true)
.with_writer(|| TestWriter);
// tracing uses std::time to trace time, which panics in wasm.
#[cfg(all(target_family = "wasm", target_os = "unknown"))]
let builder = builder.without_time();
tracing::subscriber::set_default(builder.finish())
}

Expand Down
2 changes: 1 addition & 1 deletion noq/examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ async fn run(options: Opt) -> Result<()> {
conn.close(0u32.into(), b"done");

// Give the server a fair chance to receive the close packet
endpoint.wait_idle().await;
endpoint.wait_all_draining().await;

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion noq/examples/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let _ = connection.accept_uni().await;

// Make sure the server has a chance to clean up
endpoint.wait_idle().await;
endpoint.wait_all_draining().await;

Ok(())
}
2 changes: 1 addition & 1 deletion noq/examples/insecure_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async fn run_client(server_addr: SocketAddr) -> Result<(), Box<dyn Error + Send
// Dropping handles allows the corresponding objects to automatically shut down
drop(connection);
// Make sure the server has a chance to clean up
endpoint.wait_idle().await;
endpoint.wait_all_draining().await;

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion noq/examples/single_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
);

// Make sure the server has a chance to clean up
client.wait_idle().await;
client.wait_all_draining().await;

Ok(())
}
Expand Down
Loading
Loading