fix: prevent pipe_stream stall on multi-hop PUT forwarding #3568
Conversation
When a relay peer receives a streamed PUT, it forks the inbound StreamHandle and pipes fragments to the next hop. Three bugs caused the pipe to stall after sending only the first fragment (~1.2KB):

1. fork() shared SyncState with the original handle. When the upstream PeerConnection dropped (e.g., connection pruning), it cancelled ALL streaming handles — including the pipe's fork. The pipe would hang for minutes then fail with "stream was cancelled" even though all fragments were already in the shared buffer. Fix: fork() now creates an independent SyncState. Cancelling the original handle no longer cancels the fork.

2. poll_next relied solely on SyncState wakers for notifications. With independent SyncState on forks, push_fragment() on the original handle's sync wouldn't wake the fork's wakers. Fix: poll_next now also listens on the buffer's event_listener::Event (fired by every buffer.insert()), providing a notification path that works regardless of which handle was used for push_fragment().

3. pipe_stream had no inactivity timeout. Unlike assemble(), which has STREAM_INACTIVITY_TIMEOUT (30s), the pipe would hang forever on stream.next().await if fragments stopped arriving. Fix: pipe_stream now wraps stream.next() with tokio::time::timeout using the same 30s inactivity timeout, failing gracefully with diagnostic logging instead of hanging indefinitely.

Diagnosed from user report Q5WGQF: PUT operation timed out after 60s; telemetry showed pipe forwarding stalled at 1,263 bytes (one fragment) for 25-33 minutes across all downstream hops before being cancelled.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
✅ Rule Review: No issues found — merge is not blocked by this check.
- Fix TOCTOU race in poll_next: create the EventListener BEFORE re-checking the buffer, following the event_listener crate's recommended pattern. Previously, a notification could fire between the buffer check and listener creation, causing a missed wakeup for forked handles.
- Fix spin-loop risk: when the EventListener fires, re-check the buffer inline instead of wake_by_ref + Pending (which caused extra poll cycles and waker accumulation in SyncState).
- Remove SyncState waker registration from poll_next entirely. The EventListener on buffer.data_available is the sole notification path. cancel() already fires buffer.notifier().notify(), so cancellation wakeups still work. This eliminates unbounded waker accumulation.
- Replace tokio::time::timeout with tokio::select! + time_source.sleep() in pipe_stream for DST (deterministic simulation testing) compatibility (see the sketch after this commit message).
- Add test_fork_incremental_wakeup: verifies a forked handle wakes up when fragments arrive via push_fragment on the original handle. This directly tests the EventListener notification path that was broken.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
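As a sketch of that last replacement: the wait on the next fragment races against a sleep taken from an injected clock rather than tokio's global timer. The TimeSource trait below is a hypothetical stand-in for the codebase's simulation-aware time source, not its actual API:

```rust
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

use futures::StreamExt;

// Hypothetical stand-in for the codebase's simulation-aware clock;
// the real trait's shape may differ.
trait TimeSource {
    fn sleep(&self, dur: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>>;
}

// Wait for the next fragment, bounded by a sleep the simulator controls.
// Unlike tokio::time::timeout, nothing here touches tokio's global timer,
// so deterministic simulation tests can advance time themselves.
async fn next_with_inactivity_timeout<S>(
    stream: &mut S,
    time_source: &dyn TimeSource,
    timeout: Duration,
) -> Result<Option<Vec<u8>>, &'static str>
where
    S: futures::Stream<Item = Vec<u8>> + Unpin,
{
    tokio::select! {
        item = stream.next() => Ok(item),
        _ = time_source.sleep(timeout) => Err("stream inactivity timeout"),
    }
}
```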
fork() now copies the current cancelled flag from the original handle's SyncState. Previously, forking an already-cancelled handle would create a fork with cancelled=false, effectively resurrecting a dead stream and causing an avoidable 30s inactivity timeout instead of immediate failure. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Handle the Poll result from EventListener properly instead of discarding it with `let _ =`. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace two .unwrap() calls on self.listener with if-let patterns to satisfy the code-style.md no-unwrap rule for production code. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Fixed both .unwrap() warnings from rule review in ce8f897 — replaced with if-let patterns. The info-level finding about tokio::time::timeout in tests is acceptable (test-guard assertion, not simulation logic). /ack
Problem
When a relay peer receives a streamed PUT and forwards it to the next hop via pipe_stream, the pipe stalls after sending exactly one fragment (~1,263 bytes) and hangs for 25-33 minutes before eventually failing with "stream was cancelled". This causes all PUT operations to time out on multi-hop routes.

Diagnosed from user report Q5WGQF: the PUT routing messages propagated through 10 hops in <1 second, but the streamed 253KB payload stalled at hop 3. Telemetry confirmed the first two hops transferred successfully, but all downstream hops transferred only 1,263 bytes before stalling.
User impact: PUT operations fail with timeout on any route longer than 2 hops. Contracts cannot be stored in the network.
Approach
Three independent bugs contributed to the stall:
1. fork() shared cancellation state with the original handle

   StreamHandle::fork() was implemented as clone(), sharing the same Arc<RwLock<SyncState>>. When the upstream PeerConnection dropped (connection pruning, keepalive timeout), its Drop impl cancelled ALL streaming handles — including the forked handle used by the pipe task. The pipe would then fail with "stream was cancelled" even though all fragments were already in the shared buffer.

   Fix: fork() now creates an independent SyncState while sharing the same Arc<LockFreeStreamBuffer>. Cancelling the original handle no longer cancels forks. A sketch follows this item.
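A minimal sketch of the fixed fork(), assuming a pared-down SyncState (the real struct also tracks wakers) and a placeholder buffer type; it also folds in the follow-up commit that seeds the fork with the original's cancelled flag:

```rust
use std::sync::{Arc, RwLock};

// Placeholder for the real shared fragment buffer named in this PR.
struct LockFreeStreamBuffer;

// Pared down: the real SyncState also tracks registered wakers.
struct SyncState {
    cancelled: bool,
}

struct StreamHandle {
    buffer: Arc<LockFreeStreamBuffer>,
    sync: Arc<RwLock<SyncState>>,
}

impl StreamHandle {
    fn fork(&self) -> Self {
        // Seed the fork with the current cancelled flag: forking an
        // already-cancelled handle yields a dead fork (immediate failure)
        // instead of resurrecting the stream for a 30s timeout.
        let cancelled = self
            .sync
            .read()
            .map(|s| s.cancelled)
            .unwrap_or(true); // a poisoned lock is treated as cancelled
        StreamHandle {
            // Shared: both handles drain the same fragment buffer.
            buffer: Arc::clone(&self.buffer),
            // Independent: cancelling the original no longer cancels forks.
            sync: Arc::new(RwLock::new(SyncState { cancelled })),
        }
    }
}
```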
2. poll_next wakeup mechanism didn't work for forked handles

   With independent SyncState, push_fragment() on the original handle calls sync.write().wake_all() — but this only wakes wakers registered on the original handle's sync, not the fork's sync. The fork's poll_next registered wakers on its own sync that nobody would ever wake.

   Fix: poll_next now also registers an event_listener::EventListener on the buffer's data_available Event, which is fired by every buffer.insert() regardless of which handle called push_fragment(). This provides a notification path that works for all consumers (forked or not). A sketch follows this item.
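A condensed sketch of that notification path using the event_listener crate. SharedBuffer, FragmentStream, and next_index are illustrative stand-ins (the real buffer is LockFreeStreamBuffer); the listener-before-recheck ordering and the inline re-check on wakeup come from a follow-up commit in this PR:

```rust
use std::collections::BTreeMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

use event_listener::{Event, EventListener};

// Illustrative stand-in for LockFreeStreamBuffer.
struct SharedBuffer {
    fragments: Mutex<BTreeMap<u32, Vec<u8>>>,
    // Fired by every insert(), regardless of which handle pushed.
    data_available: Event,
}

impl SharedBuffer {
    fn insert(&self, idx: u32, bytes: Vec<u8>) {
        self.fragments.lock().expect("buffer mutex poisoned").insert(idx, bytes);
        // Wake every listener: the original handle, forks, anyone polling.
        self.data_available.notify(usize::MAX);
    }

    fn take(&self, idx: u32) -> Option<Vec<u8>> {
        self.fragments.lock().expect("buffer mutex poisoned").remove(&idx)
    }
}

struct FragmentStream {
    buffer: Arc<SharedBuffer>,
    next_index: u32,
    listener: Option<Pin<Box<EventListener>>>,
}

impl FragmentStream {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Vec<u8>>> {
        loop {
            // The fragment may already be buffered (or arrived while we
            // were creating the listener below: the loop re-checks).
            if let Some(bytes) = self.buffer.take(self.next_index) {
                self.listener = None;
                self.next_index += 1;
                return Poll::Ready(Some(bytes));
            }
            if self.listener.is_none() {
                // Create the listener BEFORE re-checking the buffer, per
                // the event_listener crate's recommended pattern: any
                // insert() from this point on will notify this listener.
                self.listener = Some(Box::pin(self.buffer.data_available.listen()));
                continue; // loop back: re-check the buffer
            }
            // if-let rather than unwrap, per the project's no-unwrap rule.
            if let Some(listener) = self.listener.as_mut() {
                match listener.as_mut().poll(cx) {
                    // Notified: drop the listener and re-check inline
                    // instead of wake_by_ref + Pending (no extra polls).
                    Poll::Ready(()) => self.listener = None,
                    Poll::Pending => return Poll::Pending,
                }
            }
        }
    }
}
```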
3. pipe_stream had no inactivity timeout

   Unlike assemble(), which has STREAM_INACTIVITY_TIMEOUT (30s), the pipe task would hang forever on stream.next().await if fragments stopped arriving. Any wakeup failure meant a permanent stall instead of a bounded failure.

   Fix: pipe_stream now wraps stream.next() with tokio::time::timeout(STREAM_INACTIVITY_TIMEOUT), failing gracefully with diagnostic logging instead of hanging indefinitely. A sketch follows this item.
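A sketch of the bounded forwarding loop this fix describes; the fragment stream and next-hop sender signatures are hypothetical:

```rust
use std::time::Duration;

use futures::StreamExt;

// Matches the constant assemble() already uses, per this PR.
const STREAM_INACTIVITY_TIMEOUT: Duration = Duration::from_secs(30);

// Hypothetical shapes for the inbound fragment stream and the next-hop
// sink; the real code forwards to the downstream PeerConnection.
async fn pipe_stream(
    mut stream: impl futures::Stream<Item = Vec<u8>> + Unpin,
    mut send_to_next_hop: impl FnMut(Vec<u8>),
) -> Result<(), &'static str> {
    loop {
        // Bound each wait: a wakeup failure now surfaces as a logged
        // error after 30s instead of a permanent stall.
        match tokio::time::timeout(STREAM_INACTIVITY_TIMEOUT, stream.next()).await {
            Ok(Some(fragment)) => send_to_next_hop(fragment),
            Ok(None) => return Ok(()), // stream finished: all fragments piped
            Err(_elapsed) => {
                tracing::error!("pipe_stream stalled: no fragment within 30s");
                return Err("stream inactivity timeout");
            }
        }
    }
}
```

A follow-up commit in this PR (see the conversation above) replaces tokio::time::timeout with tokio::select! plus an injected time source, so deterministic simulation tests can control the clock.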
Testing

- test_fork_independent_cancellation: verifies cancelling the original handle does NOT cancel the fork, and the fork can still read buffered data
- test_fork_stream_reads_after_original_cancel: end-to-end test that a forked stream can read all fragments after the original handle is cancelled

A condensed sketch of the first test is shown below.
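In this sketch, new_for_test(), cancel(), and the stream-style next() are hypothetical stand-ins for the real test harness:

```rust
use futures::StreamExt;

#[tokio::test]
async fn fork_survives_original_cancellation() {
    // Hypothetical setup helper returning a handle plus its shared buffer.
    let (original, buffer) = StreamHandle::new_for_test();
    buffer.insert(0, vec![0u8; 1263]); // one fragment already buffered

    let mut fork = original.fork();
    original.cancel();

    // The fork's SyncState is independent, so cancelling the original
    // must not kill it: buffered data stays readable.
    let fragment = fork.next().await.expect("fork should still yield data");
    assert_eq!(fragment.len(), 1263);
}
```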
Why didn't CI catch this?

The streaming tests use a single handle or clone() (which shares SyncState). The fork+cancel interaction only manifests when:

- a handle is forked for forwarding (multi-hop piping), and
- the original handle is cancelled mid-stream (upstream connection dropped), and
- the fork still needs to read remaining fragments.

This combination doesn't occur in unit tests — it requires multi-hop streaming with realistic timing. The two new tests close this gap.
[AI-assisted - Claude]