ReplayingPublisher.subscribe can deliver onNext before downstream.onSubscribe completes, violating Reactive Streams §1.9 #43

@nficano

Description

In arcp-client/src/main/java/dev/arcp/client/ReplayingPublisher.java the subscribe method (around line 54) calls live.subscribe(forwarder) inside the publisher-lock region, exits the lock, calls downstream.onSubscribe(...) on line 99, and only then iterates the snapshot to push replayed items. The forwarder is therefore attached to the live SubmissionPublisher before downstream has been told about its subscription. Once a live submit fires, the forwarder's onNext immediately invokes downstream.onNext(item), which can run on the SubmissionPublisher's virtual-thread executor before the thread calling subscribe reaches the downstream.onSubscribe call. That violates Reactive Streams rule 1.9, which requires onSubscribe to be signaled before any other signal. The same race lets onComplete and onError reach downstream before onSubscribe.

A secondary problem in the same class is that the subscribe handler ignores backpressure entirely: the Subscription's request(long n) implementation is a no-op, and the snapshot is pushed eagerly via downstream.onNext regardless of demand. For pre-attached subscribers with bounded buffers, this can overrun those buffers.

The Reactive Streams violation is the more urgent of the two, because clients consuming JobHandle.events() through anything that strictly checks the protocol (Reactor, Mutiny, RxJava-Flow adapters) can crash or buffer indefinitely.

Fix prompt: In arcp-client/src/main/java/dev/arcp/client/ReplayingPublisher.java, rewrite subscribe so the sequencing becomes (1) snapshot the buffer and wasClosed under the lock without attaching to live, (2) call downstream.onSubscribe(subscription) outside the lock, (3) replay the snapshot, (4) re-enter the lock and attach the forwarder to live. To prevent a gap between (1) and (4), keep a per-subscriber sequence counter, or buffer late-arriving live items in the forwarder until the replay completes: flip an internal "replay done" flag at the end of step (3) and have the forwarder call downstream.onNext only after that flag is set, queuing pending items into a Deque guarded by the same lock. While there, replace the no-op request(n) with a real demand counter that the snapshot replay and the forwarder both consult before delivering, so the publisher honors downstream backpressure. Add a JUnit test using TCK-style subscriber probes that asserts onSubscribe is the first signal and that delivery never exceeds requested demand.
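
The ordering and demand rules in the fix prompt can be illustrated with a minimal, self-contained sketch. It is not the project's ReplayingPublisher: the class name ReplaySketch, the emit and close methods, and the per-subscriber pending queue are all hypothetical, and emit stands in for the live SubmissionPublisher path. It also takes one variation on the proposed steps: the snapshot and the attach happen together under the lock, so there is no gap to bridge, and a drain loop that is held until onSubscribe returns delivers replayed and live items from a single queue, only while demand is outstanding. Error signaling and the handling of non-positive requests are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for ReplayingPublisher; names and structure are assumptions. */
final class ReplaySketch<T> implements Flow.Publisher<T> {
    private final Object lock = new Object();
    private final List<T> buffer = new ArrayList<>();  // replay history, guarded by lock
    private final List<Sub> subs = new ArrayList<>();  // attached subscribers, guarded by lock
    private boolean closed;                            // guarded by lock

    /** Stands in for the live path: record the item and queue it for every subscriber. */
    void emit(T item) {
        List<Sub> targets;
        synchronized (lock) {
            if (closed) return;
            buffer.add(item);
            for (Sub s : subs) s.pending.add(item);
            targets = new ArrayList<>(subs);
        }
        for (Sub s : targets) s.drain();               // deliver outside the lock
    }

    void close() {
        List<Sub> targets;
        synchronized (lock) {
            closed = true;
            targets = new ArrayList<>(subs);
        }
        for (Sub s : targets) s.drain();
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> downstream) {
        Sub sub = new Sub(downstream);
        synchronized (lock) {
            sub.pending.addAll(buffer);                // snapshot and attach atomically
            subs.add(sub);
        }
        downstream.onSubscribe(sub);                   // first signal, outside the lock
        sub.drainLoop();                               // release the hold taken in Sub()
    }

    private final class Sub implements Flow.Subscription {
        final Flow.Subscriber<? super T> downstream;
        final Deque<T> pending = new ArrayDeque<>();   // guarded by lock
        long demand;                                   // guarded by lock
        boolean done;                                  // guarded by lock
        // Starts at 1 so no other thread can deliver before onSubscribe has returned.
        final AtomicInteger wip = new AtomicInteger(1);

        Sub(Flow.Subscriber<? super T> downstream) { this.downstream = downstream; }

        @Override public void request(long n) {
            if (n <= 0) return;                        // spec-mandated onError omitted here
            synchronized (lock) {
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
            }
            drain();
        }

        @Override public void cancel() {
            synchronized (lock) { done = true; pending.clear(); subs.remove(this); }
        }

        /** Only the caller that moves wip from 0 to 1 runs the loop, so signals stay serial. */
        void drain() { if (wip.getAndIncrement() == 0) drainLoop(); }

        void drainLoop() {
            int missed = 1;
            for (;;) {
                for (;;) {
                    T item = null;
                    boolean complete = false;
                    synchronized (lock) {
                        if (done) break;
                        if (!pending.isEmpty()) {
                            if (demand == 0) break;    // honor backpressure
                            demand--;
                            item = pending.poll();
                        } else if (closed) {
                            done = true;
                            subs.remove(this);
                            complete = true;
                        } else {
                            break;
                        }
                    }
                    if (complete) { downstream.onComplete(); break; }
                    downstream.onNext(item);
                }
                missed = wip.addAndGet(-missed);       // pick up work that arrived meanwhile
                if (missed == 0) return;
            }
        }
    }
}
```

With this shape, a probe subscriber that calls request(1) inside onSubscribe observes onSubscribe first and then exactly one onNext; further items wait in the queue until more demand is requested. The trade-off is an unbounded per-subscriber queue, which a real fix might want to cap.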

Metadata

Assignees

No one assigned

    Labels

    bug (Something isn't working), severity:high (High severity)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests