
fix: prevent pipe_stream stall on multi-hop PUT forwarding #3568

Merged
sanity merged 5 commits into main from fix-pipe-stream-stall
Mar 16, 2026

@sanity
Collaborator

@sanity sanity commented Mar 16, 2026

Problem

When a relay peer receives a streamed PUT and forwards it to the next hop via pipe_stream, the pipe stalls after sending exactly one fragment (~1,263 bytes) and hangs for 25-33 minutes before eventually failing with "stream was cancelled". This causes all PUT operations to time out on multi-hop routes.

Diagnosed from user report Q5WGQF: the PUT routing messages propagated through 10 hops in <1 second, but the streamed 253KB payload stalled at hop 3. Telemetry confirmed the first two hops transferred successfully, but all downstream hops transferred only 1,263 bytes before stalling.

User impact: PUT operations fail with timeout on any route longer than 2 hops. Contracts cannot be stored in the network.

Approach

Three independent bugs contributed to the stall:

1. fork() shared cancellation state with the original handle

StreamHandle::fork() was implemented as clone(), sharing the same Arc<RwLock<SyncState>>. When the upstream PeerConnection dropped (connection pruning, keepalive timeout), its Drop impl cancelled ALL streaming handles — including the forked handle used by the pipe task. The pipe would then fail with "stream was cancelled" even though all fragments were already in the shared buffer.

Fix: fork() now creates an independent SyncState while sharing the same Arc<LockFreeStreamBuffer>. Cancelling the original handle no longer cancels forks.
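The difference between clone() and the new fork() can be sketched roughly as below. This is a minimal model, not the actual implementation: the struct layouts, the Mutex-backed `StreamBuffer` standing in for LockFreeStreamBuffer, and the `relay_scenario` helper are all assumptions made for illustration.

```rust
use std::sync::{Arc, Mutex, RwLock};

/// Hypothetical per-handle state; the real SyncState holds more than a flag.
#[derive(Default)]
struct SyncState {
    cancelled: bool,
}

/// Hypothetical stand-in for LockFreeStreamBuffer (a Mutex-backed Vec here).
#[derive(Default)]
struct StreamBuffer {
    fragments: Mutex<Vec<Vec<u8>>>,
}

struct StreamHandle {
    buffer: Arc<StreamBuffer>,
    sync: Arc<RwLock<SyncState>>,
}

impl StreamHandle {
    fn new() -> Self {
        Self {
            buffer: Arc::new(StreamBuffer::default()),
            sync: Arc::new(RwLock::new(SyncState::default())),
        }
    }

    /// Shares the fragment buffer but gives the fork its own cancellation state.
    fn fork(&self) -> Self {
        Self {
            buffer: Arc::clone(&self.buffer),
            sync: Arc::new(RwLock::new(SyncState::default())),
        }
    }

    fn push_fragment(&self, data: &[u8]) {
        self.buffer.fragments.lock().unwrap().push(data.to_vec());
    }

    fn cancel(&self) {
        self.sync.write().unwrap().cancelled = true;
    }

    fn is_cancelled(&self) -> bool {
        self.sync.read().unwrap().cancelled
    }

    /// Total bytes currently readable through this handle.
    fn buffered_len(&self) -> usize {
        self.buffer.fragments.lock().unwrap().iter().map(Vec::len).sum()
    }
}

/// Returns (original cancelled, fork cancelled, bytes visible to the fork).
fn relay_scenario() -> (bool, bool, usize) {
    let original = StreamHandle::new();
    let fork = original.fork();
    original.push_fragment(b"fragment-1");
    original.cancel(); // models the upstream PeerConnection being dropped
    (original.is_cancelled(), fork.is_cancelled(), fork.buffered_len())
}
```

In this model, cancelling the original leaves the fork readable because only the buffer is shared; with a plain clone() both handles would observe the same `cancelled` flag.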

2. poll_next wakeup mechanism didn't work for forked handles

With independent SyncState, push_fragment() on the original handle calls sync.write().wake_all() — but this only wakes wakers registered on the original handle's sync, not the fork's sync. The fork's poll_next registered wakers on its own sync that nobody would ever wake.

Fix: poll_next now also registers an event_listener::EventListener on the buffer's data_available Event, which is fired by every buffer.insert() regardless of which handle called push_fragment(). This provides a notification path that works for all consumers (forked or not).
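The wakeup gap can be illustrated with plain `std::task` wakers. The sketch below is an assumption-laden model: `WakerList` is a hypothetical type used for both the per-handle lists and the buffer-level list, and it does not use the event_listener crate that the actual fix relies on.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Test waker that records whether it was woken.
struct Flag(AtomicBool);

impl Wake for Flag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical waker list, used per handle and per buffer below.
#[derive(Default)]
struct WakerList(Mutex<Vec<Waker>>);

impl WakerList {
    fn register(&self, waker: Waker) {
        self.0.lock().unwrap().push(waker);
    }

    fn wake_all(&self) {
        for waker in self.0.lock().unwrap().drain(..) {
            waker.wake();
        }
    }
}

/// Returns (fork woken via per-handle lists, fork woken via buffer-level list).
fn fork_wakeups() -> (bool, bool) {
    let original_sync = WakerList::default();
    let fork_sync = WakerList::default();
    let buffer_level = WakerList::default();

    // Per-handle path: the fork registers on its own list, but a push through
    // the original handle only wakes the original's list.
    let per_handle = Arc::new(Flag(AtomicBool::new(false)));
    fork_sync.register(Waker::from(Arc::clone(&per_handle)));
    original_sync.wake_all();

    // Buffer-level path: every insert wakes the list owned by the shared
    // buffer, so it reaches the fork regardless of who pushed.
    let shared = Arc::new(Flag(AtomicBool::new(false)));
    buffer_level.register(Waker::from(Arc::clone(&shared)));
    buffer_level.wake_all();

    (
        per_handle.0.load(Ordering::SeqCst),
        shared.0.load(Ordering::SeqCst),
    )
}
```

The first flag stays unset because nothing ever calls `wake_all` on the fork's own list, which is the stall described above; tying notification to the shared buffer removes the dependency on which handle performed the insert.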

3. pipe_stream had no inactivity timeout

Unlike assemble() which has STREAM_INACTIVITY_TIMEOUT (30s), the pipe task would hang forever on stream.next().await if fragments stopped arriving. Any wakeup failure meant a permanent stall instead of a bounded failure.

Fix: pipe_stream now wraps stream.next() with tokio::time::timeout(STREAM_INACTIVITY_TIMEOUT), failing gracefully with diagnostic logging instead of hanging indefinitely.
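The effect of bounding each wait can be sketched with a blocking channel. This is a synchronous stand-in, not the async code from the PR: `std::sync::mpsc::Receiver::recv_timeout` plays the role of the timeout around stream.next(), and `PipeError`, `pipe_with_timeout`, and the two scenario helpers are hypothetical names.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum PipeError {
    /// No fragment arrived within the inactivity window.
    Inactive { bytes_forwarded: usize },
}

/// Forwards fragments until the source closes, failing if it goes quiet.
fn pipe_with_timeout(
    source: mpsc::Receiver<Vec<u8>>,
    inactivity_timeout: Duration,
) -> Result<usize, PipeError> {
    let mut bytes_forwarded = 0;
    loop {
        // The timeout restarts on every fragment, so it bounds inactivity
        // rather than the total transfer time.
        match source.recv_timeout(inactivity_timeout) {
            // A real pipe would send the fragment to the next hop here.
            Ok(fragment) => bytes_forwarded += fragment.len(),
            Err(RecvTimeoutError::Disconnected) => return Ok(bytes_forwarded),
            Err(RecvTimeoutError::Timeout) => {
                return Err(PipeError::Inactive { bytes_forwarded })
            }
        }
    }
}

/// One fragment arrives, then the source goes quiet without closing.
fn stalled_after_one_fragment() -> Result<usize, PipeError> {
    let (tx, rx) = mpsc::channel();
    tx.send(vec![0u8; 1263]).unwrap();
    let result = pipe_with_timeout(rx, Duration::from_millis(50));
    drop(tx); // kept alive until here so the channel never disconnects
    result
}

/// Two fragments arrive and the source closes normally.
fn completed_stream() -> Result<usize, PipeError> {
    let (tx, rx) = mpsc::channel();
    tx.send(vec![0u8; 1263]).unwrap();
    tx.send(vec![0u8; 1263]).unwrap();
    drop(tx);
    pipe_with_timeout(rx, Duration::from_millis(50))
}
```

The error carries the byte count forwarded so far, which mirrors the kind of diagnostic detail (one fragment, 1,263 bytes) that made the original stall identifiable in telemetry.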

Testing

  • test_fork_independent_cancellation: Verifies cancelling the original handle does NOT cancel the fork, and the fork can still read buffered data
  • test_fork_stream_reads_after_original_cancel: End-to-end test that a forked stream can read all fragments after the original handle is cancelled
  • All 2038 existing tests pass (2 new tests added)

Why didn't CI catch this?

The streaming tests use a single handle or clone() (which shares SyncState). The fork+cancel interaction only manifests when:

  1. A forked handle is used by a pipe task
  2. The original handle is cancelled (by PeerConnection drop) while the pipe is still reading
  3. The pipe's fragments arrive gradually (not all at once)

This combination doesn't occur in unit tests — it requires multi-hop streaming with realistic timing. The two new tests close this gap.

[AI-assisted - Claude]

When a relay peer receives a streamed PUT, it forks the inbound
StreamHandle and pipes fragments to the next hop. Three bugs caused
the pipe to stall after sending only the first fragment (~1.2KB):

1. fork() shared SyncState with the original handle. When the upstream
   PeerConnection dropped (e.g., connection pruning), it cancelled ALL
   streaming handles — including the pipe's fork. The pipe would hang
   for minutes then fail with "stream was cancelled" even though all
   fragments were already in the shared buffer.

   Fix: fork() now creates an independent SyncState. Cancelling the
   original handle no longer cancels the fork.

2. poll_next relied solely on SyncState wakers for notifications. With
   independent SyncState on forks, push_fragment() on the original
   handle's sync wouldn't wake the fork's wakers.

   Fix: poll_next now also listens on the buffer's event_listener::Event
   (fired by every buffer.insert()), providing a notification path that
   works regardless of which handle was used for push_fragment().

3. pipe_stream had no inactivity timeout. Unlike assemble() which has
   STREAM_INACTIVITY_TIMEOUT (30s), the pipe would hang forever on
   stream.next().await if fragments stopped arriving.

   Fix: pipe_stream now wraps stream.next() with tokio::time::timeout
   using the same 30s inactivity timeout, failing gracefully with
   diagnostic logging instead of hanging indefinitely.

Diagnosed from user report Q5WGQF: PUT operation timed out after 60s,
telemetry showed pipe forwarding stalled at 1,263 bytes (one fragment)
for 25-33 minutes across all downstream hops before being cancelled.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@github-actions
Contributor

github-actions Bot commented Mar 16, 2026

✅ Rule Review: No issues found

Rules checked: transport.md, code-style.md, testing.md, git-workflow.md
Files reviewed: 2 (outbound_stream.rs, streaming.rs)

No rule violations detected — merge is not blocked by this check.


Spot-checks performed:

  • DST time usage (outbound_stream.rs:376–400): New tokio::select! arm uses time_source.sleep(inactivity_timeout), not tokio::time::sleep() or std::time::Instant. The comment explicitly calls this out. ✓
  • No .unwrap() in production code: poll_next avoids it throughout; the comment at the spurious-notification path explicitly notes "we use if-let to satisfy the no-unwrap rule." ✓
  • tokio::time::timeout in tests (streaming.rs:808): Appears only in #[tokio::test] (not a DST/simulation test) as a hang-guard. Acceptable per testing rules. ✓
  • tokio::spawn in test (streaming.rs:800): Fire-and-forget inside a unit test only. Not production code. ✓
  • No biased; in the new tokio::select! in outbound_stream.rs. ✓
  • No deleted/commented-out tests. Three new tests added, all correctly structured. ✓
  • fork() independence: New implementation creates a fresh Arc<RwLock<SyncState>>, preserving cancelled state but decoupling future cancellations. Matches the documented invariant. ✓
  • event_listener TOCTOU pattern: Listener is created before the re-check, matching the correct usage pattern described in the inline comments. ✓

Automated review against .claude/rules/. Critical and Warning findings block merge — check boxes or post /ack to acknowledge.

sanity and others added 2 commits March 15, 2026 23:43
- Fix TOCTOU race in poll_next: create EventListener BEFORE re-checking
  the buffer, following the event_listener crate's recommended pattern.
  Previously, a notification could fire between the buffer check and
  listener creation, causing a missed wakeup for forked handles.

- Fix spin-loop risk: when EventListener fires, re-check buffer inline
  instead of wake_by_ref + Pending (which caused extra poll cycles and
  waker accumulation in SyncState).

- Remove SyncState waker registration from poll_next entirely. The
  EventListener on buffer.data_available is the sole notification path.
  cancel() already fires buffer.notifier().notify() so cancellation
  wakeups still work. This eliminates unbounded waker accumulation.

- Replace tokio::time::timeout with tokio::select! + time_source.sleep()
  in pipe_stream for DST (deterministic simulation testing) compatibility.

- Add test_fork_incremental_wakeup: verifies a forked handle wakes up
  when fragments arrive via push_fragment on the original handle. This
  directly tests the EventListener notification path that was broken.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
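The listener-before-re-check ordering from the first bullet can be modelled with a generation counter. Everything below is a hypothetical blocking stand-in built on `Mutex` and `Condvar`; it is not the event_listener crate's API, only an approximation of the semantics the commit message relies on (a listener observes notifications sent after it was created).

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical event: notify bumps a generation counter, and a listener
/// remembers the generation at which it was created.
#[derive(Default)]
struct Event {
    generation: Mutex<u64>,
    changed: Condvar,
}

struct Listener {
    seen: u64,
}

impl Event {
    fn listen(&self) -> Listener {
        Listener { seen: *self.generation.lock().unwrap() }
    }

    fn notify(&self) {
        *self.generation.lock().unwrap() += 1;
        self.changed.notify_all();
    }

    /// Returns true if a notification arrived after `listener` was created.
    fn wait(&self, listener: Listener, timeout: Duration) -> bool {
        let guard = self.generation.lock().unwrap();
        let (guard, _) = self
            .changed
            .wait_timeout_while(guard, timeout, |g| *g == listener.seen)
            .unwrap();
        *guard != listener.seen
    }
}

#[derive(Default)]
struct Buffer {
    fragments: Mutex<VecDeque<u8>>,
    data_available: Event,
}

impl Buffer {
    fn insert(&self, byte: u8) {
        self.fragments.lock().unwrap().push_back(byte);
        self.data_available.notify();
    }

    fn try_take(&self) -> Option<u8> {
        self.fragments.lock().unwrap().pop_front()
    }

    /// Check, register the listener, re-check, and only then wait.
    fn next(&self, timeout: Duration) -> Option<u8> {
        loop {
            if let Some(b) = self.try_take() {
                return Some(b);
            }
            let listener = self.data_available.listen();
            if let Some(b) = self.try_take() {
                return Some(b);
            }
            if !self.data_available.wait(listener, timeout) {
                return None; // nothing arrived within the timeout
            }
        }
    }
}

/// A notification in the gap between listen() and wait() is not lost.
fn notify_in_gap_is_kept() -> bool {
    let event = Event::default();
    let listener = event.listen();
    event.notify();
    event.wait(listener, Duration::from_millis(50))
}

/// A notification before listen() is invisible, hence the re-check.
fn notify_before_listen_is_missed() -> bool {
    let event = Event::default();
    event.notify();
    let listener = event.listen();
    event.wait(listener, Duration::from_millis(50))
}

/// A consumer blocked in next() receives a byte inserted by another thread.
fn consumer_receives() -> Option<u8> {
    let buffer = Arc::new(Buffer::default());
    let producer = {
        let buffer = Arc::clone(&buffer);
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            buffer.insert(7);
        })
    };
    let got = buffer.next(Duration::from_secs(2));
    producer.join().unwrap();
    got
}
```

The two small helpers show both halves of the argument: registering first means a later notification is retained, while the re-check covers data that arrived before the listener existed.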
fork() now copies the current cancelled flag from the original handle's
SyncState. Previously, forking an already-cancelled handle would create
a fork with cancelled=false, effectively resurrecting a dead stream and
causing an avoidable 30s inactivity timeout instead of immediate failure.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
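The snapshot behaviour described in this commit might look roughly like the following. The `Handle` type and its fields are assumptions for illustration; only the idea of copying the current cancelled flag into a fresh, independent state comes from the commit message.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical per-handle state reduced to the cancellation flag.
struct SyncState {
    cancelled: bool,
}

struct Handle {
    sync: Arc<RwLock<SyncState>>,
}

impl Handle {
    fn new() -> Self {
        Handle { sync: Arc::new(RwLock::new(SyncState { cancelled: false })) }
    }

    fn cancel(&self) {
        self.sync.write().unwrap().cancelled = true;
    }

    fn is_cancelled(&self) -> bool {
        self.sync.read().unwrap().cancelled
    }

    /// Copies the flag as it is now, then decouples: later cancellations of
    /// `self` do not reach the fork.
    fn fork(&self) -> Self {
        let cancelled = self.sync.read().unwrap().cancelled;
        Handle { sync: Arc::new(RwLock::new(SyncState { cancelled })) }
    }
}

/// Returns (fork made before cancel is cancelled, fork made after cancel is cancelled).
fn fork_cancel_snapshot() -> (bool, bool) {
    let original = Handle::new();
    let early_fork = original.fork();
    original.cancel();
    let late_fork = original.fork();
    (early_fork.is_cancelled(), late_fork.is_cancelled())
}
```

A fork taken after cancellation starts out cancelled, so a consumer can fail immediately instead of waiting out the inactivity timeout.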
@sanity sanity enabled auto-merge March 16, 2026 04:52
@github-actions
Contributor

⚠️ Performance Benchmark Regressions Detected

Found 1 benchmark(s) with performance regressions:

  • level0/crypto/nonce/counter: +3.5271%

⚠️ Important: This may be a false positive!

Common causes of false positives:

  1. Stale baseline: If recent PRs improved performance on main, this PR (which doesn't include those changes) will show as "regressed" when compared to the new baseline
  2. GitHub runner variance: Benchmarks run on shared ubuntu-latest runners with variable CPU contention
  3. Old baseline: The baseline might be from an older main commit if the cache restore used restore-keys fallback

To verify if this is a real regression:

  1. Check if recent commits on main touched transport or benchmark code
  2. Merge main into your branch and re-run benchmarks
  3. Review the baseline age in the "Download main branch baseline" step

This is informational only and does not block the PR.


Handle the Poll result from EventListener properly instead of
discarding it with `let _ =`.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@github-actions
Contributor

⚠️ Extended Benchmark Regressions Detected

Some extended benchmarks show performance regressions.

Note: Extended benchmarks include high-latency and packet-loss scenarios
which have higher variance than standard CI benchmarks.


Replace two .unwrap() calls on self.listener with if-let patterns
to satisfy the code-style.md no-unwrap rule for production code.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@sanity
Collaborator Author

sanity commented Mar 16, 2026

Fixed both .unwrap() warnings from rule review in ce8f897 — replaced with if-let patterns. The info-level finding about tokio::time::timeout in tests is acceptable (test-guard assertion, not simulation logic).

/ack

@sanity sanity added this pull request to the merge queue Mar 16, 2026
Merged via the queue into main with commit 431c853 Mar 16, 2026
15 checks passed
@sanity sanity deleted the fix-pipe-stream-stall branch March 16, 2026 15:11
2 participants