ARCPClient.listJobs awaits its correlated reply with transport.receive().first { it.correlationId == requestId } at lib/src/main/kotlin/dev/arcp/client/ARCPClient.kt:112. MemoryTransport.receive() returns inbound.receiveAsFlow() at lib/src/main/kotlin/dev/arcp/transport/MemoryTransport.kt:25. That flow receives from the underlying Channel in fan-out fashion: each element is delivered to exactly one collector, so a first { … } consumer pulls items off the channel and silently discards any that fail the predicate. The same client exposes the underlying flow as the public receive() API at lib/src/main/kotlin/dev/arcp/client/ARCPClient.kt:98, and the README quickstart at README.md:72 and the getting-started guides assume the consumer can collect every envelope. If a caller invokes listJobs while another coroutine collects client.receive(), each envelope reaches only one of the two collectors, and non-matching envelopes that listJobs receives while waiting for its reply are dropped from the user-facing flow.
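The loss can be reproduced outside the client with a few lines of kotlinx.coroutines. This is a minimal sketch, not the project's code: Envelope, its fields, the payload strings, and demoLostEnvelopes are hypothetical stand-ins, and only the receiveAsFlow() plus first { … } shape is taken from the finding above.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the real envelope type.
data class Envelope(val correlationId: String?, val payload: String)

fun demoLostEnvelopes(): Pair<Envelope, List<Envelope>> = runBlocking {
    val inbound = Channel<Envelope>(Channel.UNLIMITED)
    val flow = inbound.receiveAsFlow()

    // Three envelopes arrive; only the middle one answers the request.
    inbound.send(Envelope(null, "JobAccepted"))
    inbound.send(Envelope("req-1", "JobList"))
    inbound.send(Envelope(null, "JobProgress"))
    inbound.close()

    // Same shape as the listJobs wait: every element received before the
    // match is taken off the channel and thrown away.
    val reply = flow.first { it.correlationId == "req-1" }

    // A later collector only sees what is still left in the channel.
    val remaining = flow.toList()
    reply to remaining
}
```

In this sketch the second collector never observes the JobAccepted envelope, because the first { … } call already received and discarded it.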
Fix prompt: Replace the in-place first { … } with a single shared broadcast of inbound envelopes. The simplest fix is to make the client own a hot SharedFlow that mirrors transport.receive() and have both receive() and listJobs subscribe to it, so that awaiting a correlated reply does not take envelopes away from other collectors. An alternative is to expose listJobs only as a low-level send and require callers to filter receive() themselves, which removes the misleading convenience method. Add an integration test in tests/src/test/kotlin/dev/arcp/tests/ that submits a job, calls listJobs, and asserts that a job-event flow collected concurrently still observes the JobAccepted envelope.
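One possible shape for the SharedFlow option is sketched below. It is an illustration under assumptions, not the project's API: SharedReceiveClient, Envelope, the send lambda, and demoSharedReceive are hypothetical names, and the real client may need a different scope, start policy, or buffering.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the real envelope type.
data class Envelope(val correlationId: String?, val payload: String)

// Hypothetical client: one coroutine drains the transport and rebroadcasts.
class SharedReceiveClient(
    scope: CoroutineScope,
    transportFlow: Flow<Envelope>,
    private val send: suspend (requestId: String) -> Unit, // assumed transport send
) {
    private val shared: SharedFlow<Envelope> =
        transportFlow.shareIn(scope, SharingStarted.Eagerly, replay = 0)

    // Every subscriber sees every envelope emitted after it subscribed.
    fun receive(): Flow<Envelope> = shared

    suspend fun listJobs(requestId: String): Envelope = coroutineScope {
        // Subscribe before sending so the reply cannot slip past us.
        val reply = async(start = CoroutineStart.UNDISPATCHED) {
            shared.first { it.correlationId == requestId }
        }
        send(requestId)
        reply.await()
    }
}

fun demoSharedReceive(): Pair<Envelope, List<String>> = runBlocking {
    val inbound = Channel<Envelope>(Channel.UNLIMITED)
    val shareScope = CoroutineScope(Dispatchers.Default)
    val client = SharedReceiveClient(shareScope, inbound.receiveAsFlow()) { requestId ->
        // Simulated server traffic around the correlated reply.
        inbound.send(Envelope(null, "JobAccepted"))
        inbound.send(Envelope(requestId, "JobList"))
        inbound.send(Envelope(null, "JobProgress"))
    }
    // User-facing collector, subscribed before listJobs is called.
    val observed = async(start = CoroutineStart.UNDISPATCHED) {
        client.receive().take(3).toList()
    }
    val reply = client.listJobs("req-1")
    val result = reply to observed.await().map { it.payload }
    shareScope.cancel()
    result
}
```

In this sketch the concurrent collector still sees JobAccepted, because listJobs only filters its own subscription. With SharingStarted.Eagerly and replay = 0, envelopes that arrive before any subscriber exists are dropped, so the start policy and replay size are design choices to settle against the transport's real behavior.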