Skip to content

ARCPClient.listJobs steals envelopes from the shared receive() flow #58

@nficano

Description

@nficano

ARCPClient.listJobs awaits its correlated reply with transport.receive().first { it.correlationId == requestId } at lib/src/main/kotlin/dev/arcp/client/ARCPClient.kt:112. MemoryTransport.receive() returns inbound.receiveAsFlow() at lib/src/main/kotlin/dev/arcp/transport/MemoryTransport.kt:25, which is a cold flow over a Channel — a first { … } consumer pulls items off the channel and silently discards any that fail the predicate. The same client exposes the underlying flow as the public receive() API at lib/src/main/kotlin/dev/arcp/client/ARCPClient.kt:98, and the README quickstart at README.md:72 and getting-started guides assume the consumer can collect every envelope. If a caller calls listJobs while another coroutine collects client.receive(), only one of them sees each envelope, and envelopes that arrive in the listJobs window are dropped from the user-facing flow.

Fix prompt: Replace the in-place first { … } with a single fan-out. The simplest fix is to make the client own a hot SharedFlow that mirrors transport.receive() and have both receive() and listJobs subscribe to it, so awaiting a correlated reply does not consume from a queue. An alternative is to expose listJobs only as a low-level send and require the caller to filter receive() themselves, removing the misleading convenience method. Add an integration test in tests/src/test/kotlin/dev/arcp/tests/ that submits a job, calls listJobs, and asserts the job-event flow collected concurrently still observes the JobAccepted envelope.

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't workingseverity:highHigh severity issue

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions