diff --git a/noq-proto/proptest-regressions/tests/proptests.txt b/noq-proto/proptest-regressions/tests/proptests.txt index 7bc939ff5..7e8ce21bb 100644 --- a/noq-proto/proptest-regressions/tests/proptests.txt +++ b/noq-proto/proptest-regressions/tests/proptests.txt @@ -32,3 +32,4 @@ cc 91184c7b6b718961d2dc03365f02098a18ac0035ca85b95654fbafa430d93664 # shrinks to cc ec5baef3027436b012a332a97d46814ed157f259337c3f651bbb7a3233bd9c7f # shrinks to input = _RandomInteractionWithMultipathSimpleRoutingArgs { seed: [121, 74, 209, 215, 123, 149, 7, 227, 67, 200, 91, 12, 216, 81, 208, 77, 83, 181, 39, 2, 207, 186, 233, 211, 254, 178, 230, 22, 100, 197, 215, 43], interactions: [Drive { side: Client }, ClosePath { side: Client, path_idx: 0, error_code: 0 }] } cc b1429b84bf576bb9000e8d0d6d53cff4c93efacf033c9df898aeb9856d1b03fe # shrinks to input = _RandomInteractionWithMultipathSimpleRoutingArgs { seed: [159, 14, 107, 252, 130, 4, 190, 131, 86, 208, 127, 29, 140, 30, 55, 65, 242, 192, 2, 158, 40, 51, 110, 116, 46, 139, 156, 165, 64, 109, 33, 62], interactions: [PassiveMigration { side: Server, addr_idx: 0 }, OpenPath { side: Client, status: Available, addr_idx: 0 }] } cc 4f717acb71d562f33601dfc8c7fbcca89b13c8259676a20ffc764a92e3ea07a1 # shrinks to input = _RandomInteractionWithMultipathComplexRoutingArgs { seed: [84, 97, 201, 172, 244, 139, 252, 60, 222, 107, 135, 245, 103, 45, 188, 138, 26, 198, 1, 97, 144, 22, 42, 228, 19, 154, 45, 135, 222, 137, 231, 16], interactions: [PassiveMigration { side: Server, addr_idx: 0 }, OpenPath { side: Client, status: Available, addr_idx: 0 }, ClosePath { side: Client, path_idx: 0, error_code: 0 }, PathSetStatus { side: Server, path_idx: 0, status: Backup }], routes: RoutingTable { client_routes: [([::ffff:1.1.1.0]:44433, 0)], server_routes: [([::ffff:2.2.2.0]:4433, 0)] } } +cc e5068a25dcc49304a9afb3ff2bb0b62f9c6f6ca4370dae0b6bfa79e109592345 # shrinks to input = _RandomInteractionArgs { setup: PairSetup { seed: Zeroes, extensions: MultipathOnly, mtud_enabled: true, routing_setup: SimpleSymmetric }, establishment: Full, interactions: [PassiveMigration { side: Server, addr_idx: 0 }, PathSetStatus { side: Client, path_idx: 0, status: Backup }, DriveBothToIdle, OpenPath { side: Client, status: Backup, addr_idx: 0 }] } diff --git a/noq-proto/src/connection/mod.rs b/noq-proto/src/connection/mod.rs index c708aa71f..74655a109 100644 --- a/noq-proto/src/connection/mod.rs +++ b/noq-proto/src/connection/mod.rs @@ -28,7 +28,7 @@ use crate::{ qlog::{QlogRecvPacket, QlogSink}, spaces::LostPacket, stats::PathStatsMap, - timer::{ConnTimer, PathTimer}, + timer::{ConnTimer, PathTimer, Timer, TimerTable}, }, crypto::{self, Keys}, frame::{ @@ -102,8 +102,7 @@ pub use streams::{ ShouldTransmit, StreamEvent, Streams, WriteError, }; -mod timer; -use timer::{Timer, TimerTable}; +pub(crate) mod timer; mod transmit_buf; use transmit_buf::TransmitBuf; @@ -458,6 +457,12 @@ impl Connection { self.timers.peek() } + /// Returns the unhandled timers that would expire at given instant. + #[cfg(test)] + pub(crate) fn peek_expiring_timers(&self, now: Instant) -> Vec { + self.timers.timers_before(now) + } + /// Returns application-facing events /// /// Connections should be polled for events after: @@ -3804,6 +3809,7 @@ impl Connection { .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now)); } else { let dt = cmp::max(timeout, 3 * self.max_pto_for_space(space)); + trace!(?dt, "setting idle timer"); self.timers.set( Timer::Conn(ConnTimer::Idle), now + dt, @@ -6810,9 +6816,13 @@ impl Connection { .saturating_sub(path.in_flight.bytes) } - /// Whether no timers but keepalive, idle, rtt, pushnewcid, and key discard are running + /// Which timer keeps this connection from being idle right now. + /// + /// Idle in this case means the next thing that would happen on this + /// connection (unless other traffic is received) would be the connection just sending + /// more keep alives, CIDs, key updates or outright closing due to idle. #[cfg(test)] - pub(crate) fn is_idle(&self) -> bool { + pub(crate) fn non_idle_timer(&self) -> Option { let current_timers = self.timers.values(); current_timers .into_iter() @@ -6826,12 +6836,13 @@ impl Connection { ) }) .min_by_key(|(_, time)| *time) - .is_none_or(|(timer, _)| { - matches!( - timer, + .filter(|(next_timer, _)| { + !matches!( + next_timer, Timer::Conn(ConnTimer::Idle) | Timer::PerPath(_, PathTimer::PathIdle) ) }) + .map(|(timer, _time)| timer) } /// Whether explicit congestion notification is in use on outgoing packets. diff --git a/noq-proto/src/connection/timer.rs b/noq-proto/src/connection/timer.rs index 4b2a47ee9..320254c43 100644 --- a/noq-proto/src/connection/timer.rs +++ b/noq-proto/src/connection/timer.rs @@ -419,6 +419,31 @@ impl TimerTable { values } + + #[cfg(test)] + pub(crate) fn timers_before(&self, now: Instant) -> Vec { + let mut values = Vec::new(); + + for timer in ConnTimer::VALUES { + if let Some(time) = self.generic[timer as usize] + && time <= now + { + values.push(Timer::Conn(timer)); + } + } + + for timer in PathTimer::VALUES { + for (path_id, timers) in self.path_timers.iter() { + if let Some(time) = timers.timers[timer as usize] + && time <= now + { + values.push(Timer::PerPath(*path_id, timer)); + } + } + } + + values + } } #[cfg(test)] diff --git a/noq-proto/src/tests/proptests.rs b/noq-proto/src/tests/proptests.rs index 5251b9de5..3fbaae1f8 100644 --- a/noq-proto/src/tests/proptests.rs +++ b/noq-proto/src/tests/proptests.rs @@ -13,11 +13,11 @@ use test_strategy::proptest; use tracing::error; use crate::{ - ClientConfig, Connection, ConnectionClose, ConnectionError, Event, PathStatus, Side, - TransportConfig, TransportErrorCode, + ClientConfig, Connection, ConnectionClose, ConnectionError, Event, MtuDiscoveryConfig, + PathStatus, Side, TransportConfig, TransportErrorCode, tests::{ Pair, RoutingTable, client_config, - random_interaction::{TestOp, run_random_interaction}, + random_interaction::{Establishment, TestOp, run_random_interaction}, server_config, subscribe, }, }; @@ -72,6 +72,7 @@ const SERVER_ADDRS: [SocketAddr; 3] = [ struct PairSetup { seed: Seed, extensions: Extensions, + mtud_enabled: bool, routing_setup: RoutingSetup, } @@ -143,6 +144,9 @@ impl PairSetup { transport.max_remote_nat_traversal_addresses(MAX_QNT_ADDRS); } + // enable/disable MTUD + transport.mtu_discovery_config(self.mtud_enabled.then_some(MtuDiscoveryConfig::default())); + // Initialize the server config let mut server_cfg = server_config(); @@ -201,18 +205,22 @@ impl Seed { #[proptest(cases = 256)] fn random_interaction( setup: PairSetup, + establishment: Establishment, #[strategy(vec(any::(), 0..100))] interactions: Vec, ) { let (mut pair, client_config) = setup.run("random_interaction"); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, establishment); prop_assert!(!pair.drive_bounded(1000), "connection never became idle"); prop_assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); - prop_assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) - ))); + if let Some(server_ch) = server_ch { + prop_assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } fn routing_table() -> impl Strategy { @@ -307,6 +315,7 @@ fn regression_unset_packet_acked() { 33, 89, 203, 28, 107, 123, 117, 6, 54, 215, 244, 47, 1, ]), extensions: Extensions::MultipathOnly, + mtud_enabled: true, routing_setup: RoutingSetup::Complex(old_routing_table()), }; let interactions = vec![ @@ -328,15 +337,18 @@ fn regression_unset_packet_acked() { let _guard = subscribe(); let (mut pair, client_config) = setup.run(prefix); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); assert!(!pair.drive_bounded(1000), "connection never became idle"); assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); - assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) - ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } #[test] @@ -348,6 +360,7 @@ fn regression_invalid_key() { 130, 117, 84, 250, 190, 50, 237, 14, 167, 60, 5, 140, 149, ]), extensions: Extensions::MultipathOnly, + mtud_enabled: true, routing_setup: RoutingSetup::Complex(old_routing_table()), }; let interactions = vec![ @@ -367,15 +380,18 @@ fn regression_invalid_key() { let _guard = subscribe(); let (mut pair, client_config) = setup.run(prefix); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); assert!(!pair.drive_bounded(1000), "connection never became idle"); assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); - assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) - ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } /// Regression test for the "invalid key" panic in `noq-proto::Endpoint::handle_event`. @@ -395,6 +411,7 @@ fn regression_invalid_key2() { let setup = PairSetup { seed: Seed::Zeroes, extensions: Extensions::MultipathOnly, + mtud_enabled: true, routing_setup: RoutingSetup::SimpleSymmetric, }; let interactions = vec![ @@ -418,15 +435,18 @@ fn regression_invalid_key2() { let _guard = subscribe(); let (mut pair, client_config) = setup.run(prefix); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); assert!(!pair.drive_bounded(1000), "connection never became idle"); assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); - assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) - ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } #[test] @@ -438,6 +458,7 @@ fn regression_key_update_error() { 187, 208, 54, 158, 239, 190, 82, 198, 62, 91, 51, 53, 226, ]), extensions: Extensions::MultipathOnly, + mtud_enabled: true, routing_setup: RoutingSetup::Complex(old_routing_table()), }; let interactions = vec![ @@ -452,15 +473,18 @@ fn regression_key_update_error() { let _guard = subscribe(); let (mut pair, client_config) = setup.run(prefix); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); assert!(!pair.drive_bounded(1000), "connection never became idle"); assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); - assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) - ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } #[test] @@ -469,6 +493,7 @@ fn regression_never_idle() { let setup = PairSetup { seed: Seed::Zeroes, extensions: Extensions::MultipathOnly, + mtud_enabled: true, routing_setup: RoutingSetup::Complex(old_routing_table()), }; let interactions = vec![ @@ -491,15 +516,18 @@ fn regression_never_idle() { let _guard = subscribe(); let (mut pair, client_config) = setup.run(prefix); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); assert!(!pair.drive_bounded(1000), "connection never became idle"); assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); - assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) - ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } #[test] @@ -508,6 +536,7 @@ fn regression_never_idle2() { let setup = PairSetup { seed: Seed::Zeroes, extensions: Extensions::MultipathOnly, + mtud_enabled: true, routing_setup: RoutingSetup::Complex(old_routing_table()), }; let interactions = vec![ @@ -532,16 +561,19 @@ fn regression_never_idle2() { let _guard = subscribe(); let (mut pair, client_config) = setup.run(prefix); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); // We needed to increase the bounds. It eventually times out. assert!(!pair.drive_bounded(1000), "connection never became idle"); assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); - assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) - ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } #[test] @@ -550,6 +582,7 @@ fn regression_packet_number_space_missing() { let setup = PairSetup { seed: Seed::Zeroes, extensions: Extensions::MultipathOnly, + mtud_enabled: true, routing_setup: RoutingSetup::SimpleSymmetric, }; let interactions = vec![ @@ -574,15 +607,18 @@ fn regression_packet_number_space_missing() { let _guard = subscribe(); let (mut pair, client_config) = setup.run(prefix); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); assert!(!pair.drive_bounded(1000), "connection never became idle"); assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); - assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) - ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } #[test] @@ -591,6 +627,7 @@ fn regression_peer_failed_to_respond_with_path_abandon() { let setup = PairSetup { seed: Seed::Zeroes, extensions: Extensions::MultipathOnly, + mtud_enabled: true, routing_setup: RoutingSetup::Complex(old_routing_table()), }; let interactions = vec![ @@ -608,15 +645,18 @@ fn regression_peer_failed_to_respond_with_path_abandon() { let _guard = subscribe(); let (mut pair, client_config) = setup.run(prefix); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); assert!(!pair.drive_bounded(1000), "connection never became idle"); assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); - assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) - ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } #[test] @@ -625,6 +665,7 @@ fn regression_peer_failed_to_respond_with_path_abandon2() { let setup = PairSetup { seed: Seed::Zeroes, extensions: Extensions::MultipathOnly, + mtud_enabled: true, routing_setup: RoutingSetup::SimpleSymmetric, }; let interactions = vec![ @@ -652,15 +693,18 @@ fn regression_peer_failed_to_respond_with_path_abandon2() { let _guard = subscribe(); let (mut pair, client_config) = setup.run(prefix); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); assert!(!pair.drive_bounded(1000), "connection never became idle"); assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); - assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) - ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } /// This test sets up two addresses for the server side: @@ -700,6 +744,7 @@ fn regression_path_validation() { let setup = PairSetup { seed: Seed::Zeroes, extensions: Extensions::MultipathOnly, + mtud_enabled: true, routing_setup: RoutingSetup::Complex(RoutingTable::from_routes( vec![("[::ffff:1.1.1.0]:44433".parse().unwrap(), 0)], vec![ @@ -731,15 +776,18 @@ fn regression_path_validation() { let _guard = subscribe(); let (mut pair, client_config) = setup.run(prefix); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); assert!(!pair.drive_bounded(1000), "connection never became idle"); assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); - assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) - ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } /// This regression test used to fail with the client never becoming idle. @@ -764,6 +812,7 @@ fn regression_never_idle3() { let setup = PairSetup { seed: Seed::Zeroes, extensions: Extensions::MultipathOnly, + mtud_enabled: true, routing_setup: RoutingSetup::SimpleSymmetric, }; let interactions = vec![ @@ -788,15 +837,18 @@ fn regression_never_idle3() { let _guard = subscribe(); let (mut pair, client_config) = setup.run(prefix); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); assert!(!pair.drive_bounded(1000), "connection never became idle"); assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); - assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) - ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } #[test] @@ -805,6 +857,7 @@ fn regression_frame_encoding_error() { let setup = PairSetup { seed: Seed::Zeroes, extensions: Extensions::MultipathOnly, + mtud_enabled: true, routing_setup: RoutingSetup::SimpleSymmetric, }; let interactions = vec![ @@ -827,15 +880,18 @@ fn regression_frame_encoding_error() { let _guard = subscribe(); let (mut pair, client_config) = setup.run(prefix); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); assert!(!pair.drive_bounded(1000), "connection never became idle"); assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); - assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) - ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } #[test] @@ -844,6 +900,7 @@ fn regression_there_should_be_at_least_one_path() { let setup = PairSetup { seed: Seed::Zeroes, extensions: Extensions::MultipathOnly, + mtud_enabled: true, routing_setup: RoutingSetup::SimpleSymmetric, }; let interactions = vec![ @@ -859,15 +916,18 @@ fn regression_there_should_be_at_least_one_path() { let _guard = subscribe(); let (mut pair, client_config) = setup.run(prefix); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); assert!(!pair.drive_bounded(1000), "connection never became idle"); assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); - assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) - ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } /// This test will loop forever, unless the loss detection timer is allowed to back off @@ -895,6 +955,7 @@ fn regression_conn_never_idle5() { let setup = PairSetup { seed: Seed::Zeroes, extensions: Extensions::MultipathOnly, + mtud_enabled: true, routing_setup: RoutingSetup::SimpleSymmetric, }; let interactions = vec![ @@ -911,15 +972,18 @@ fn regression_conn_never_idle5() { let _guard = subscribe(); let (mut pair, client_config) = setup.run(prefix); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); assert!(!pair.drive_bounded(1000), "connection never became idle"); assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); - assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) - ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } /// Yet another regression with PATH_ABANDON "not being answered" by our peer. @@ -950,6 +1014,7 @@ fn regression_peer_ignored_path_abandon() { let setup = PairSetup { seed: Seed::Zeroes, extensions: Extensions::MultipathOnly, + mtud_enabled: true, routing_setup: RoutingSetup::SimpleSymmetric, }; let interactions = vec![ @@ -979,15 +1044,18 @@ fn regression_peer_ignored_path_abandon() { let _guard = subscribe(); let (mut pair, client_config) = setup.run(prefix); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); assert!(!pair.drive_bounded(1000), "connection never became idle"); assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); - assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) - ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } /// A regression test that used to put noq into a state of sending PATH_CHALLENGE @@ -1019,6 +1087,7 @@ fn regression_never_idle4() { let setup = PairSetup { seed: Seed::Zeroes, extensions: Extensions::MultipathOnly, + mtud_enabled: true, routing_setup: RoutingSetup::Complex(RoutingTable::from_routes( vec![ ("[::ffff:1.1.1.0]:44433".parse().unwrap(), 0), @@ -1068,15 +1137,18 @@ fn regression_never_idle4() { let _guard = subscribe(); let (mut pair, client_config) = setup.run(prefix); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); assert!(!pair.drive_bounded(1000), "connection never became idle"); assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); - assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) - ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } /// This test reproduced an infinite loop in loss detection. @@ -1109,6 +1181,7 @@ fn regression_infinite_loop() { let setup = PairSetup { seed: Seed::Zeroes, extensions: Extensions::MultipathOnly, + mtud_enabled: true, routing_setup: RoutingSetup::SimpleSymmetric, }; let interactions = vec![ @@ -1130,7 +1203,8 @@ fn regression_infinite_loop() { let _guard = subscribe(); let (mut pair, client_config) = setup.run(prefix); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); // This bug originally occurred at exactly 4540 iterations. // At 4539 it still finishes (but fails the assertion). @@ -1139,9 +1213,11 @@ fn regression_infinite_loop() { assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); - assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) - ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } /// This test reproduced a situation in which a QNT-enabled connection sends path challenges indefinitely. @@ -1168,6 +1244,7 @@ fn regression_qnt_revalidating_path_forever() { let setup = PairSetup { seed: Seed::Zeroes, extensions: Extensions::QntAndMultipath, + mtud_enabled: true, routing_setup: RoutingSetup::SimpleSymmetric, }; let interactions = vec![ @@ -1191,13 +1268,154 @@ fn regression_qnt_revalidating_path_forever() { let _guard = subscribe(); let (mut pair, client_config) = setup.run(prefix); - let (client_ch, server_ch) = run_random_interaction(&mut pair, interactions, client_config); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); assert!(!pair.drive_bounded(1000), "connection never became idle"); assert!(allowed_error(poll_to_close( pair.client_conn_mut(client_ch) ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } +} + +#[test] +fn regression_1() { + let prefix = "regression_1"; + let setup = PairSetup { + seed: Seed::Zeroes, + extensions: Extensions::MultipathOnly, + mtud_enabled: true, + routing_setup: RoutingSetup::SimpleSymmetric, + }; + let establishment = Establishment::Full; + let interactions = vec![ + TestOp::PassiveMigration { + side: Side::Server, + addr_idx: 0, + }, + TestOp::SendDatagram { + side: Side::Client, + size: 0, + drop: false, + }, + TestOp::DriveBothToIdle, + TestOp::OpenPath { + side: Side::Client, + status: PathStatus::Available, + addr_idx: 0, + }, + ]; + + let _guard = subscribe(); + let (mut pair, client_config) = setup.run(prefix); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, establishment); + + assert!(!pair.drive_bounded(1000), "connection never became idle"); + assert!(allowed_error(poll_to_close( + pair.client_conn_mut(client_ch) + ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } +} + +#[test] +fn regression_2() { + let prefix = "regression_2"; + let setup = PairSetup { + seed: Seed::Zeroes, + extensions: Extensions::MultipathOnly, + mtud_enabled: true, + routing_setup: RoutingSetup::SimpleSymmetric, + }; + let interactions = vec![ + TestOp::SendDatagram { + side: Side::Client, + size: 0, + drop: false, + }, + TestOp::PassiveMigration { + side: Side::Server, + addr_idx: 0, + }, + TestOp::DriveBothToIdle, + TestOp::FinishConnect, + TestOp::DriveBothToIdle, + TestOp::OpenPath { + side: Side::Client, + status: PathStatus::Available, + addr_idx: 0, + }, + ]; + + let _guard = subscribe(); + let (mut pair, client_config) = setup.run(prefix); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, Establishment::Full); + + assert!(!pair.drive_bounded(1000), "connection never became idle"); assert!(allowed_error(poll_to_close( - pair.server_conn_mut(server_ch) + pair.client_conn_mut(client_ch) ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } +} + +#[test] +fn regression_3() { + let prefix = "regression_3"; + let setup = PairSetup { + seed: Seed::Zeroes, + extensions: Extensions::MultipathOnly, + mtud_enabled: true, + routing_setup: RoutingSetup::SimpleSymmetric, + }; + let establishment = Establishment::BeforeHandshake; + let interactions = vec![ + TestOp::DriveBothToIdle, + TestOp::PassiveMigration { + side: Side::Server, + addr_idx: 0, + }, + TestOp::OpenPath { + side: Side::Client, + status: PathStatus::Available, + addr_idx: 2, + }, + TestOp::PassiveMigration { + side: Side::Server, + addr_idx: 2, + }, + TestOp::DriveBothToIdle, + TestOp::OpenPath { + side: Side::Client, + status: PathStatus::Available, + addr_idx: 0, + }, + ]; + + let _guard = subscribe(); + let (mut pair, client_config) = setup.run(prefix); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, client_config, establishment); + + assert!(!pair.drive_bounded(1000), "connection never became idle"); + assert!(allowed_error(poll_to_close( + pair.client_conn_mut(client_ch) + ))); + if let Some(server_ch) = server_ch { + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); + } } diff --git a/noq-proto/src/tests/random_interaction.rs b/noq-proto/src/tests/random_interaction.rs index 8ecf34c14..08150eda8 100644 --- a/noq-proto/src/tests/random_interaction.rs +++ b/noq-proto/src/tests/random_interaction.rs @@ -1,4 +1,7 @@ -use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4}; +use std::{ + fmt::Debug, + net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4}, +}; use bytes::Bytes; use test_strategy::Arbitrary; @@ -13,6 +16,10 @@ use super::RoutingTable; #[derive(Debug, Clone, Copy, Arbitrary)] pub(super) enum TestOp { + /// Finish the started connection attempt by accepting it on the server side. + FinishConnect, + /// Drive both sides until the connection is idle. + DriveBothToIdle, /// Drive the endpoint on the given `side`, processing all pending I/O. Drive { side: Side }, /// Advance the simulated time forward, unless both endpoints are idle. @@ -60,6 +67,15 @@ pub(super) enum TestOp { }, /// Perform a stream-level operation on the connection belonging to `side`. StreamOp { side: Side, stream_op: StreamOp }, + /// Send a datagram. + SendDatagram { + side: Side, + #[strategy(0..2000usize)] + size: usize, + drop: bool, + }, + /// Read all datagrams on given `side`. + ReadDatagrams { side: Side }, /// Close the connection belonging to `side`. CloseConn { side: Side, @@ -96,39 +112,94 @@ pub(super) enum StreamOp { Stop(#[strategy(0..3usize)] usize, u32), } +/// The type of connection establishment to generate. +#[derive(Debug, Clone, Copy, Arbitrary)] +pub(super) enum Establishment { + /// Fully establish the connection before test operations. + Full, + /// Start running test operations before the handshake is finished. + BeforeHandshake, +} + pub(super) struct State { send_streams: Vec, recv_streams: Vec, - handle: ConnectionHandle, + handle: Option, side: Side, } +/// Possible reasons a [`TestOp`] might not apply cleanly. +#[derive(Debug)] +pub(super) enum Error { + /// Running the operation has no effect. + /// + /// E.g. the operation requires some stream to be active or some path to be opened, + /// which isn't opened at this point in time, and thus the operation is skipped. + NoEffect, + /// Running the operation returned an error. + /// + /// E.g. we attempted to run multipath operations before we negotiated the multipath extension. + ApiError(Box), +} + +impl Error { + fn api(api_error: impl Debug + 'static) -> Self { + Self::ApiError(Box::new(api_error)) + } +} + impl TestOp { - fn run(self, pair: &mut Pair, client: &mut State, server: &mut State) -> Option<()> { + fn run(self, pair: &mut Pair, client: &mut State, server: &mut State) -> Result<(), Error> { let now = pair.time; match self { + Self::FinishConnect => { + let accept = pair + .server + .accepted + .take() + .ok_or(Error::NoEffect)? + .map_err(Error::api)?; + server.handle = Some(accept); + } + Self::DriveBothToIdle => { + if pair.drive_bounded(100) { + error!("DriveBothToIdle exceeded 100 steps"); + } + } Self::Drive { side: Side::Client } => pair.drive_client(), Self::Drive { side: Side::Server } => pair.drive_server(), Self::AdvanceTime => { + let before = pair.time; // If we advance during idle, we just immediately hit the idle timeout if !pair.client.is_idle() || !pair.server.is_idle() { pair.advance_time(); } + if before == pair.time { + return Err(Error::NoEffect); + } } Self::DropInbound { side: Side::Client } => { - debug!(len = pair.client.inbound.len(), "dropping inbound"); + let len = pair.client.inbound.len(); + debug!(len, "dropping inbound"); pair.client.inbound.clear(); + if len == 0 { + return Err(Error::NoEffect); + } } Self::DropInbound { side: Side::Server } => { - debug!(len = pair.server.inbound.len(), "dropping inbound"); + let len = pair.server.inbound.len(); + debug!(len, "dropping inbound"); pair.server.inbound.clear(); + if len == 0 { + return Err(Error::NoEffect); + } } Self::ReorderInbound { side: Side::Client } => { - let item = pair.client.inbound.pop_front()?; + let item = pair.client.inbound.pop_front().ok_or(Error::NoEffect)?; pair.client.inbound.push_back(item); } Self::ReorderInbound { side: Side::Server } => { - let item = pair.server.inbound.pop_front()?; + let item = pair.server.inbound.pop_front().ok_or(Error::NoEffect)?; pair.server.inbound.push_back(item); } Self::ForceKeyUpdate { side: Side::Client } => client.conn(pair)?.force_key_update(), @@ -137,14 +208,18 @@ impl TestOp { side: Side::Client, addr_idx, } => { - let routes = pair.downcast_routes_mut::()?; + let routes = pair + .downcast_routes_mut::() + .ok_or(Error::NoEffect)?; routes.sim_client_migration(addr_idx, inc_last_addr_octet); } Self::PassiveMigration { side: Side::Server, addr_idx, } => { - let routes = pair.downcast_routes_mut::()?; + let routes = pair + .downcast_routes_mut::() + .ok_or(Error::NoEffect)?; routes.sim_server_migration(addr_idx, inc_last_addr_octet); } Self::OpenPath { @@ -152,10 +227,12 @@ impl TestOp { status, addr_idx, } => { - let routes = pair.downcast_routes_ref::()?; + let routes = pair + .downcast_routes_ref::() + .ok_or(Error::NoEffect)?; let remote = match side { - Side::Client => routes.server_addr(addr_idx)?, - Side::Server => routes.client_addr(addr_idx)?, + Side::Client => routes.server_addr(addr_idx).ok_or(Error::NoEffect)?, + Side::Server => routes.client_addr(addr_idx).ok_or(Error::NoEffect)?, }; let state = match side { Side::Client => client, @@ -167,8 +244,7 @@ impl TestOp { local_ip: None, }; conn.open_path(network_path, status, now) - .inspect_err(|err| error!(?err, "OpenPath failed")) - .ok(); + .map_err(Error::api)?; } Self::ClosePath { side, @@ -182,8 +258,7 @@ impl TestOp { let conn = state.conn(pair)?; let path_id = get_path_id(conn, path_idx)?; conn.close_path(now, path_id, error_code.into()) - .inspect_err(|err| error!(?err, "ClosePath failed")) - .ok(); + .map_err(Error::api)?; } Self::PathSetStatus { side, @@ -196,16 +271,35 @@ impl TestOp { }; let conn = state.conn(pair)?; let path_id = get_path_id(conn, path_idx)?; - conn.set_path_status(path_id, status) - .inspect_err(|err| error!(?err, "PathSetStatus failed")) - .ok(); + conn.set_path_status(path_id, status).map_err(Error::api)?; } Self::StreamOp { side, stream_op } => { let state = match side { Side::Client => client, Side::Server => server, }; - stream_op.run(pair, state); + stream_op.run(pair, state)?; + } + Self::SendDatagram { side, size, drop } => { + let state = match side { + Side::Client => client, + Side::Server => server, + }; + let data = vec![42u8; size]; + state + .conn(pair)? + .datagrams() + .send(data.into(), drop) + .map_err(Error::api)?; + } + Self::ReadDatagrams { side } => { + let state = match side { + Side::Client => client, + Side::Server => server, + }; + while let Some(data) = state.conn(pair)?.datagrams().recv() { + trace!(len = data.len(), "ReadDatagrams read a datagram"); + } } Self::CloseConn { side, error_code } => { let state = match side { @@ -216,10 +310,12 @@ impl TestOp { conn.close(now, error_code.into(), Bytes::new()); } Self::AddHpAddr { side, addr_idx } => { - let routes = pair.downcast_routes_ref::()?; + let routes = pair + .downcast_routes_ref::() + .ok_or(Error::NoEffect)?; let address = match side { - Side::Client => routes.client_addr(addr_idx)?, - Side::Server => routes.server_addr(addr_idx)?, + Side::Client => routes.client_addr(addr_idx).ok_or(Error::NoEffect)?, + Side::Server => routes.server_addr(addr_idx).ok_or(Error::NoEffect)?, }; let state = match side { Side::Client => client, @@ -227,8 +323,7 @@ impl TestOp { }; let conn = state.conn(pair)?; conn.add_nat_traversal_address(address) - .inspect_err(|err| error!(?err, "AddHpAddr failed")) - .ok(); + .map_err(Error::api)?; } Self::InitiateHpRound { side } => { let state = match side { @@ -236,60 +331,67 @@ impl TestOp { Side::Server => server, }; let conn = state.conn(pair)?; - let addrs = conn - .initiate_nat_traversal_round(now) - .inspect_err(|err| error!(?err, "InitiateHpRound failed")) - .ok()?; + let addrs = conn.initiate_nat_traversal_round(now).map_err(Error::api)?; trace!(?addrs, "initiating NAT Traversal"); } } - Some(()) + Ok(()) } } impl StreamOp { - fn run(self, pair: &mut Pair, state: &mut State) -> Option<()> { + fn run(self, pair: &mut Pair, state: &mut State) -> Result<(), Error> { let conn = state.conn(pair)?; // We generally ignore application-level errors. It's legal to call these APIs, so we do. We don't expect them to work all the time. match self { Self::Open(kind) => state.send_streams.extend(conn.streams().open(kind)), Self::Send { stream, num_bytes } => { - let stream_id = state.send_streams.get(stream)?; + let stream_id = state.send_streams.get(stream).ok_or(Error::NoEffect)?; let data = vec![0; num_bytes]; - let bytes = conn.send_stream(*stream_id).write(&data).ok()?; + let bytes = conn + .send_stream(*stream_id) + .write(&data) + .map_err(Error::api)?; trace!(attempted_write = %num_bytes, actually_written = %bytes, "random interaction: Wrote stream bytes"); } Self::Finish(stream) => { - let stream_id = state.send_streams.get(stream)?; - conn.send_stream(*stream_id).finish().ok(); + let stream_id = state.send_streams.get(stream).ok_or(Error::NoEffect)?; + conn.send_stream(*stream_id).finish().map_err(Error::api)?; } Self::Reset(stream, code) => { - let stream_id = state.send_streams.get(stream)?; - conn.send_stream(*stream_id).reset(code.into()).ok(); + let stream_id = state.send_streams.get(stream).ok_or(Error::NoEffect)?; + conn.send_stream(*stream_id) + .reset(code.into()) + .map_err(Error::api)?; } Self::Accept(kind) => state.recv_streams.extend(conn.streams().accept(kind)), Self::Receive(stream, ordered) => { - let stream_id = state.recv_streams.get(stream)?; + let stream_id = state.recv_streams.get(stream).ok_or(Error::NoEffect)?; let mut recv_stream = conn.recv_stream(*stream_id); - let mut chunks = recv_stream.read(ordered).ok()?; - let chunk = chunks.next(usize::MAX).ok()??; + let mut chunks = recv_stream.read(ordered).map_err(Error::api)?; + let chunk = chunks + .next(usize::MAX) + .map_err(Error::api)? + .ok_or(Error::NoEffect)?; trace!(chunk_len = %chunk.bytes.len(), offset = %chunk.offset, "read from stream"); } Self::Stop(stream, code) => { - let stream_id = state.recv_streams.get(stream)?; - conn.recv_stream(*stream_id).stop(code.into()).ok(); + let stream_id = state.recv_streams.get(stream).ok_or(Error::NoEffect)?; + conn.recv_stream(*stream_id) + .stop(code.into()) + .map_err(Error::api)?; } }; - Some(()) + Ok(()) } } impl State { - fn new(side: Side, handle: ConnectionHandle) -> Self { + fn new(side: Side) -> Self { Self { send_streams: Vec::new(), recv_streams: Vec::new(), - handle, + handle: None, side, } } @@ -301,16 +403,20 @@ impl State { } } - fn conn<'a>(&self, pair: &'a mut Pair) -> Option<&'a mut Connection> { - self.endpoint(pair).connections.get_mut(&self.handle) + fn conn<'a>(&self, pair: &'a mut Pair) -> Result<&'a mut Connection, Error> { + self.endpoint(pair) + .connections + .get_mut(&self.handle.ok_or(Error::NoEffect)?) + .ok_or(Error::NoEffect) } } -fn get_path_id(conn: &mut Connection, idx: usize) -> Option { +fn get_path_id(conn: &mut Connection, idx: usize) -> Result { let paths = conn.paths(); paths .get(idx.clamp(0, paths.len().saturating_sub(1))) .copied() + .ok_or(Error::NoEffect) } fn inc_last_addr_octet(addr: SocketAddr) -> SocketAddr { @@ -334,16 +440,39 @@ pub(super) fn run_random_interaction( pair: &mut Pair, interactions: Vec, client_config: ClientConfig, -) -> (ConnectionHandle, ConnectionHandle) { - let (client_ch, server_ch) = pair.connect_with(client_config); - pair.drive(); // finish establishing the connection; - info!("INTERACTION SETUP FINISHED"); - let mut client = State::new(Side::Client, client_ch); - let mut server = State::new(Side::Server, server_ch); + establishment: Establishment, +) -> (ConnectionHandle, Option) { + let mut client = State::new(Side::Client); + let mut server = State::new(Side::Server); + + let client_handle = pair.begin_connect(client_config); + client.handle = Some(client_handle); + + if matches!(establishment, Establishment::Full) { + pair.drive(); + TestOp::FinishConnect + .run(pair, &mut client, &mut server) + .expect("server experienced error connecting"); + pair.drive(); + } + + info!(?establishment, "INTERACTION SETUP COMPLETE"); for interaction in interactions { info!(?interaction, "INTERACTION STEP"); - interaction.run(pair, &mut client, &mut server); + match interaction.run(pair, &mut client, &mut server) { + Ok(()) => {} + Err(Error::NoEffect) => { + info!( + ?interaction, + "interaction step skipped due to invalid state" + ); + } + Err(Error::ApiError(err)) => { + error!(?interaction, ?err, "interaction step produced an API error"); + } + } } - (client.handle, server.handle) + + (client_handle, server.handle) } diff --git a/noq-proto/src/tests/util.rs b/noq-proto/src/tests/util.rs index b36514e64..927657a74 100644 --- a/noq-proto/src/tests/util.rs +++ b/noq-proto/src/tests/util.rs @@ -1,6 +1,6 @@ use std::{ cmp, - collections::{HashMap, HashSet, VecDeque}, + collections::{BTreeMap, HashMap, HashSet, VecDeque}, io::{self, Write}, mem, net::{Ipv6Addr, SocketAddr}, @@ -21,7 +21,7 @@ use tracing::{debug, info_span, trace}; use super::crypto::rustls::{QuicClientConfig, QuicServerConfig, configured_provider}; use super::*; -use crate::{Duration, Instant, congestion::Controller}; +use crate::{Duration, Instant, congestion::Controller, connection::timer::Timer}; pub(super) const DEFAULT_MTU: usize = 1452; @@ -152,8 +152,13 @@ impl Pair { if client_blackhole { self.client.inbound.clear(); } - if self.client.is_idle() && self.server.is_idle() { + + let client_timers = self.client.non_idle_timers().collect::>(); + let server_timers = self.server.non_idle_timers().collect::>(); + if client_timers.is_empty() && server_timers.is_empty() { return false; + } else { + trace!(?client_timers, ?server_timers, "pair not yet idle"); } self.advance_time() @@ -248,14 +253,34 @@ impl Pair { Some(t) if Some(t) == client_t => { if t != self.time { self.time = self.time.max(t); - trace!("advancing to {:?} for client", self.time - self.epoch); + let expiring_timers = self + .client + .connections + .iter() + .map(|(&handle, conn)| (handle, conn.peek_expiring_timers(self.time))) + .collect::>>(); + trace!( + ?expiring_timers, + "advancing to {:?} for client", + self.time - self.epoch + ); } true } Some(t) if Some(t) == server_t => { if t != self.time { self.time = self.time.max(t); - trace!("advancing to {:?} for server", self.time - self.epoch); + let expiring_timers = self + .server + .connections + .iter() + .map(|(&handle, conn)| (handle, conn.peek_expiring_timers(self.time))) + .collect::>>(); + trace!( + ?expiring_timers, + "advancing to {:?} for server", + self.time - self.epoch + ); } true } @@ -993,7 +1018,13 @@ impl TestEndpoint { } pub(super) fn is_idle(&self) -> bool { - self.connections.values().all(|x| x.is_idle()) + self.non_idle_timers().next().is_none() + } + + pub(super) fn non_idle_timers(&self) -> impl Iterator { + self.connections + .values() + .filter_map(|conn| conn.non_idle_timer()) } pub(super) fn delay_outbound(&mut self) {