Skip to content

Commit a750b8f

Browse files
nficanoclaude
andcommitted
fix: batch of correctness, performance, and API fixes
Addresses these open issues with the noted change: - #23 sessions map key stability: key sessions by stable pendingKey() so removeSession() finds the entry after handshake flips the id. - #24 concurrent submit correlation: pendingSubmits is now a FIFO Deque keyed by request order; submitLock serializes put-and-send. - #25 transport failAll path: failAll() drains every outstanding future and live subscription on transport error, unexpected onComplete, and heartbeat loss. - #26 list_jobs pagination: server respects req.limit() and req.cursor() with stable ordering by (createdAt, jobId); cursor is opaque url-safe base64. - #27 canonical idempotency fingerprint: SHA-256 over canonical JSON of (agent, input, lease_request, lease_constraints, max_runtime_sec) replaces the 32-bit hashCode payload hash. - #28 SubmissionPublisher executor lifecycle: MemoryTransport, WebSocketTransport, ReplayingPublisher, and ArcpClient.subscribe now own their virtual-thread executors and shut them down on close. - #29 close() respects external executor ownership: ArcpRuntime and ArcpClient track owned vs externally supplied scheduler/workerPool and only shut down the ones they created. - #30 empty feature set: safeFeatureCopy(Set.of()) returns an empty EnumSet instead of throwing IllegalArgumentException. - #31 advertised exposes mutable state: ArcpRuntime.advertised is wrapped in Collections.unmodifiableSet at construction. - #32 Session/Page mutable aliases: compact constructors defensively copy collection components into immutable views. - #34 partial Javadoc on named public APIs. - #35 Capabilities Javadoc now accurately documents drop behavior. - #36 CONFORMANCE.md stdio + version 1.1 entries refreshed. - #37 BudgetCounters Javadoc corrected to describe void semantics. - #38 WebSocket middleware send synchronization: writeLock added to Spring, Jakarta, Vert.x, and Jetty WebSocket transports. - #42 WebSocketTransport pre-attach frame race: listener captures transport directly and buffers inbound frames; send() gates on socket attachment. - #43 ReplayingPublisher Reactive Streams §1.9: onSubscribe is the first downstream signal; live items queued during replay. - #46 LeaseGuard regex cache: per-guard ConcurrentHashMap caches compiled glob Patterns; matchesCached used on the hot path. - #47 JobRecord credentials atomic ops: synchronized lock around credentials list; drainCredentials, replaceCredential, and setCredentials are atomic. - #48 BearerVerifier constant-time compare + SHA-256 acceptAny. - #50 SessionLoop shutdown atomic: phase is now AtomicReference; only the first caller runs cleanup. - #51 ArcpOtel guards extensions cast: returns envelope unchanged when extensions exists but is not an object. - #52 ArcpMapper.shared Javadoc revised to document mutable contract. - #53 fromWire lookup maps: O(1) BY_WIRE maps replace stream walks on Message.Type, EventBody.Kind, and Feature. - #55 IdempotencyStore scheduled prune: prune moved off the claim hot path; runs every minute under the runtime's scheduler. - #56 FileCredentialRevocationStore durable journal: long-lived RandomAccessFile in rwd mode with explicit FD sync per append. - #57 ArcpClient.listJobs typed exceptions: declared InterruptedException, TimeoutException, ArcpException; restores interrupt flag. - #59 ArcpClient.subscribe idempotent: JOB_SUBSCRIBE sent only on publisher insert; unsubscribe() closes local publisher and notifies the runtime. Server-side handleSubscribe is idempotent. - #60 CredentialBinding retry/backoff: narrowed catch, log full cause, exponential sleep between attempts, markRevocationFailed. - #61 JobRecord.subscribers encapsulation: unmodifiable view + addSubscriber/removeSubscribersWhere mutators. - #63 MemoryTransport.pair record: typed Pair(runtime, client) replaces the array return; all callers updated. - #64 WebSocketTransport HttpClient close: HttpClient stored as field and closed in close() to release selector threads. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d911452 commit a750b8f

58 files changed

Lines changed: 954 additions & 308 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

CONFORMANCE.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,6 @@ works" is decomposed into multiple binary rows.
139139

140140
- HTTP/2 + QUIC transports
141141
- mTLS / OAuth2 auth schemes
142-
- stdio newline-delimited JSON transport (`MemoryTransport` covers in-process use)
143142
- §15.6 trust elevation
144143
- Quarkus and Helidon middleware (Phase 5 deferred them; `arcp-runtime-jetty`,
145144
`arcp-middleware-jakarta`, `arcp-middleware-spring-boot`, and

arcp-client/src/main/java/dev/arcp/client/ArcpClient.java

Lines changed: 174 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import java.util.Set;
5050
import java.util.concurrent.CompletableFuture;
5151
import java.util.concurrent.ConcurrentHashMap;
52+
import java.util.concurrent.ConcurrentLinkedDeque;
53+
import java.util.concurrent.ExecutorService;
5254
import java.util.concurrent.Executors;
5355
import java.util.concurrent.Flow;
5456
import java.util.concurrent.ScheduledExecutorService;
@@ -69,6 +71,13 @@ public final class ArcpClient implements AutoCloseable, Flow.Subscriber<Envelope
6971

7072
private static final Logger log = LoggerFactory.getLogger(ArcpClient.class);
7173

74+
static EnumSet<Feature> safeFeatureCopy(Set<Feature> features) {
75+
if (features == null || features.isEmpty()) {
76+
return EnumSet.noneOf(Feature.class);
77+
}
78+
return EnumSet.copyOf(features);
79+
}
80+
7281
private final Transport transport;
7382
private final ObjectMapper mapper;
7483
private final ClientInfo info;
@@ -88,6 +97,11 @@ public final class ArcpClient implements AutoCloseable, Flow.Subscriber<Envelope
8897
private @Nullable ScheduledFuture<?> heartbeatWatchdog;
8998
private final ConcurrentHashMap<JobId, SubmissionPublisher<EventBody>> liveSubscribers =
9099
new ConcurrentHashMap<>();
100+
private final ConcurrentHashMap<JobId, ExecutorService> liveExecutors = new ConcurrentHashMap<>();
101+
private final boolean ownedScheduler;
102+
// Reserved for future use by tests asserting FIFO insertion order of pending submits.
103+
@SuppressWarnings("unused")
104+
private final ConcurrentLinkedDeque<MessageId> pendingSubmitOrder = new ConcurrentLinkedDeque<>();
91105

92106
@SuppressWarnings("unused")
93107
private Flow.@Nullable Subscription subscription;
@@ -103,16 +117,20 @@ private ArcpClient(Builder b) {
103117
this.mapper = b.mapper != null ? b.mapper : ArcpMapper.shared();
104118
this.info = b.info;
105119
this.auth = b.auth;
106-
this.requestedFeatures = EnumSet.copyOf(b.features);
120+
this.requestedFeatures = safeFeatureCopy(b.features);
107121
this.autoAck = b.autoAck;
108122
this.ackInterval = b.ackInterval;
109-
this.scheduler =
110-
b.scheduler != null
111-
? b.scheduler
112-
: Executors.newScheduledThreadPool(
113-
1,
114-
r ->
115-
Thread.ofPlatform().name("arcp-client-scheduler", 0).daemon(true).unstarted(r));
123+
if (b.scheduler != null) {
124+
this.scheduler = b.scheduler;
125+
this.ownedScheduler = false;
126+
} else {
127+
this.scheduler =
128+
Executors.newScheduledThreadPool(
129+
1,
130+
r ->
131+
Thread.ofPlatform().name("arcp-client-scheduler", 0).daemon(true).unstarted(r));
132+
this.ownedScheduler = true;
133+
}
116134
this.resumeToken = b.resumeToken;
117135
this.lastEventSeq = b.lastEventSeq;
118136
}
@@ -130,11 +148,16 @@ public CompletableFuture<Session> connect() {
130148
return sessionFuture;
131149
}
132150

133-
public Session connect(Duration timeout) throws InterruptedException, TimeoutException {
151+
public Session connect(Duration timeout)
152+
throws InterruptedException, TimeoutException, ArcpException {
134153
try {
135154
return connect().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
136155
} catch (java.util.concurrent.ExecutionException e) {
137-
throw new IllegalStateException("connect failed", e.getCause());
156+
Throwable cause = e.getCause() != null ? e.getCause() : e;
157+
if (cause instanceof ArcpException ax) {
158+
throw ax;
159+
}
160+
throw new IllegalStateException("connect failed", cause);
138161
}
139162
}
140163

@@ -144,40 +167,108 @@ public JobHandle submit(JobSubmit submit) {
144167

145168
public JobHandle submit(JobSubmit submit, @Nullable TraceId traceId) {
146169
Outstanding o = new Outstanding();
147-
// Pre-register the outstanding job under a request-id keyed slot so that
148-
// when JobAccepted arrives we associate the job_id.
149170
MessageId requestId = MessageId.generate();
150-
pendingSubmits.put(requestId, o);
151-
send(Message.Type.JOB_SUBMIT, submit, sessionId, traceId, null, null, requestId);
171+
// The put-then-send pair must be atomic w.r.t. other submits so that the
172+
// FIFO order of pendingSubmits matches the wire order observed by the
173+
// runtime; that ordering is how handleAccepted correlates JobAccepted
174+
// back to the right pending submit (the runtime does not echo our
175+
// request id on job.accepted).
176+
submitLock.lock();
177+
try {
178+
pendingSubmits.add(new PendingSubmit(requestId, o));
179+
send(Message.Type.JOB_SUBMIT, submit, sessionId, traceId, null, null, requestId);
180+
} finally {
181+
submitLock.unlock();
182+
}
152183
return o.handleFuture.join();
153184
}
154185

155-
public Page<JobSummary> listJobs(@Nullable JobFilter filter) {
156-
SessionListJobs req = new SessionListJobs(filter, null, null);
186+
public Page<JobSummary> listJobs(@Nullable JobFilter filter)
187+
throws InterruptedException, TimeoutException, ArcpException {
188+
return listJobs(filter, null, null);
189+
}
190+
191+
/**
192+
* List jobs with optional pagination. Supply {@code cursor} from the previous {@link Page} to
193+
* continue, or {@code null} to fetch the first page. {@code limit} caps the page size.
194+
*/
195+
public Page<JobSummary> listJobs(
196+
@Nullable JobFilter filter, @Nullable Integer limit, @Nullable String cursor)
197+
throws InterruptedException, TimeoutException, ArcpException {
198+
SessionListJobs req = new SessionListJobs(filter, limit, cursor);
157199
MessageId reqId = MessageId.generate();
158200
CompletableFuture<SessionJobs> fut = new CompletableFuture<>();
159201
listRequests.put(reqId, fut);
160202
send(Message.Type.SESSION_LIST_JOBS, req, sessionId, null, null, null, reqId);
161203
try {
162204
SessionJobs response = fut.get(10, TimeUnit.SECONDS);
163205
return new Page<>(response.jobs(), response.nextCursor());
164-
} catch (java.util.concurrent.ExecutionException | InterruptedException | TimeoutException e) {
165-
throw new RuntimeException("list_jobs failed", e);
206+
} catch (InterruptedException e) {
207+
listRequests.remove(reqId);
208+
Thread.currentThread().interrupt();
209+
throw e;
210+
} catch (TimeoutException e) {
211+
listRequests.remove(reqId);
212+
throw e;
213+
} catch (java.util.concurrent.ExecutionException e) {
214+
listRequests.remove(reqId);
215+
Throwable cause = e.getCause() != null ? e.getCause() : e;
216+
if (cause instanceof ArcpException ax) {
217+
throw ax;
218+
}
219+
throw new IllegalStateException("list_jobs failed", cause);
166220
}
167221
}
168222

169223
public Flow.Publisher<EventBody> subscribe(JobId jobId, SubscribeOptions options) {
224+
java.util.concurrent.atomic.AtomicBoolean inserted =
225+
new java.util.concurrent.atomic.AtomicBoolean(false);
170226
SubmissionPublisher<EventBody> pub =
171227
liveSubscribers.computeIfAbsent(
172228
jobId,
173-
k -> new SubmissionPublisher<>(Executors.newVirtualThreadPerTaskExecutor(), 1024));
174-
JobSubscribe sub =
175-
new JobSubscribe(
176-
jobId, options.history() ? options.fromEventSeq() : null, options.history());
177-
send(Message.Type.JOB_SUBSCRIBE, sub, sessionId, null, jobId, null);
229+
k -> {
230+
inserted.set(true);
231+
ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor();
232+
liveExecutors.put(jobId, exec);
233+
return new SubmissionPublisher<>(exec, 1024);
234+
});
235+
if (inserted.get()) {
236+
JobSubscribe sub =
237+
new JobSubscribe(
238+
jobId, options.history() ? options.fromEventSeq() : null, options.history());
239+
send(Message.Type.JOB_SUBSCRIBE, sub, sessionId, null, jobId, null);
240+
}
178241
return pub;
179242
}
180243

244+
/**
245+
* Locally unsubscribe from job events and notify the runtime via {@code job.unsubscribe}. Closes
246+
* the local {@link Flow.Publisher} so any downstream subscribers see {@code onComplete}.
247+
*/
248+
public void unsubscribe(JobId jobId) {
249+
SubmissionPublisher<EventBody> pub = liveSubscribers.remove(jobId);
250+
if (pub != null) {
251+
pub.close();
252+
}
253+
ExecutorService exec = liveExecutors.remove(jobId);
254+
if (exec != null) {
255+
exec.shutdown();
256+
}
257+
if (!closed) {
258+
try {
259+
send(
260+
Message.Type.JOB_UNSUBSCRIBE,
261+
new JobUnsubscribe(jobId),
262+
sessionId,
263+
null,
264+
jobId,
265+
null);
266+
} catch (RuntimeException ignored) {
267+
// best-effort
268+
}
269+
}
270+
}
271+
181272
public void ack(long lastProcessedSeq) {
182273
send(Message.Type.SESSION_ACK, new SessionAck(lastProcessedSeq), sessionId, null, null, null);
183274
}
@@ -204,12 +295,18 @@ public void close() {
204295
for (var pub : liveSubscribers.values()) {
205296
pub.close();
206297
}
298+
for (ExecutorService exec : liveExecutors.values()) {
299+
exec.shutdown();
300+
}
301+
liveExecutors.clear();
207302
try {
208303
transport.close();
209304
} catch (RuntimeException ignored) {
210305
// best-effort close
211306
}
212-
scheduler.shutdownNow();
307+
if (ownedScheduler) {
308+
scheduler.shutdownNow();
309+
}
213310
}
214311

215312
/** Returns the highest event sequence number seen from the server, or -1 if none. */
@@ -248,17 +345,52 @@ public void onNext(Envelope envelope) {
248345

249346
@Override
250347
public void onError(Throwable throwable) {
251-
sessionFuture.completeExceptionally(throwable);
348+
failAll(throwable);
252349
}
253350

254351
@Override
255352
public void onComplete() {
256353
if (!sessionFuture.isDone()) {
257-
sessionFuture.completeExceptionally(
258-
new IllegalStateException("transport closed before welcome"));
354+
failAll(new IllegalStateException("transport closed before welcome"));
355+
} else {
356+
// Transport closed after welcome — fail anything still in flight.
357+
failAll(new IllegalStateException("transport closed"));
259358
}
260359
}
261360

361+
/**
362+
* Fail every outstanding future and complete every live subscription publisher exceptionally.
363+
* Called on transport error, unexpected transport close, and heartbeat-loss close so a caller
364+
* blocked on {@link #submit}, {@link #listJobs}, or {@link JobHandle#result()} does not wait
365+
* forever.
366+
*/
367+
private void failAll(Throwable cause) {
368+
if (!sessionFuture.isDone()) {
369+
sessionFuture.completeExceptionally(cause);
370+
}
371+
for (PendingSubmit head; (head = pendingSubmits.pollFirst()) != null; ) {
372+
head.outstanding().handleFuture.completeExceptionally(cause);
373+
}
374+
for (java.util.Map.Entry<JobId, Outstanding> e : outstanding.entrySet()) {
375+
Outstanding o = e.getValue();
376+
if (!o.resultFuture.isDone()) {
377+
o.resultFuture.completeExceptionally(cause);
378+
}
379+
o.events.close();
380+
}
381+
outstanding.clear();
382+
for (java.util.Map.Entry<MessageId, CompletableFuture<SessionJobs>> e : listRequests.entrySet()) {
383+
if (!e.getValue().isDone()) {
384+
e.getValue().completeExceptionally(cause);
385+
}
386+
}
387+
listRequests.clear();
388+
for (SubmissionPublisher<EventBody> pub : liveSubscribers.values()) {
389+
pub.closeExceptionally(cause);
390+
}
391+
liveSubscribers.clear();
392+
}
393+
262394
private void dispatch(Envelope envelope) {
263395
Message m;
264396
try {
@@ -338,25 +470,23 @@ private void watchHeartbeat(long intervalMs) {
338470
long elapsed = System.currentTimeMillis() - lastInboundMillis.get();
339471
if (elapsed > intervalMs * 2) {
340472
log.info("client observed heartbeat loss; closing session");
473+
failAll(new IllegalStateException("heartbeat lost"));
341474
close();
342475
}
343476
}
344477

345-
private final ConcurrentHashMap<MessageId, Outstanding> pendingSubmits =
346-
new ConcurrentHashMap<>();
478+
private record PendingSubmit(MessageId requestId, Outstanding outstanding) {}
479+
480+
private final ConcurrentLinkedDeque<PendingSubmit> pendingSubmits = new ConcurrentLinkedDeque<>();
481+
private final java.util.concurrent.locks.ReentrantLock submitLock =
482+
new java.util.concurrent.locks.ReentrantLock();
347483

348484
private void handleAccepted(Envelope envelope, JobAccepted accepted) {
349-
// We associate by traversing pending submits in insertion order; the
350-
// runtime guarantees ordering per-session, so the oldest pending submit
351-
// is the one being acknowledged.
352-
MessageId match = pendingSubmits.keySet().stream().findFirst().orElse(null);
353-
if (match == null) {
354-
return;
355-
}
356-
Outstanding o = pendingSubmits.remove(match);
357-
if (o == null) {
485+
PendingSubmit head = pendingSubmits.pollFirst();
486+
if (head == null) {
358487
return;
359488
}
489+
Outstanding o = head.outstanding();
360490
o.jobId = accepted.jobId();
361491
outstanding.put(accepted.jobId(), o);
362492
o.handleFuture.complete(new ClientJobHandle(accepted, o));
@@ -395,14 +525,11 @@ private void handleError(Envelope envelope, JobError err) {
395525
JobId jid = envelope.jobId();
396526
Outstanding o = jid != null ? outstanding.remove(jid) : null;
397527
if (o == null) {
398-
// Top-level (unassigned) error: drop the oldest pending submit.
399-
MessageId first = pendingSubmits.keySet().stream().findFirst().orElse(null);
400-
if (first != null) {
401-
Outstanding pending = pendingSubmits.remove(first);
402-
if (pending != null) {
403-
ArcpException ex = ArcpException.from(ErrorPayload.of(err.code(), err.message()));
404-
pending.handleFuture.completeExceptionally(ex);
405-
}
528+
// Top-level (unassigned) error: fail the oldest pending submit.
529+
PendingSubmit head = pendingSubmits.pollFirst();
530+
if (head != null) {
531+
ArcpException ex = ArcpException.from(ErrorPayload.of(err.code(), err.message()));
532+
head.outstanding().handleFuture.completeExceptionally(ex);
406533
}
407534
return;
408535
}
@@ -536,7 +663,7 @@ public Builder bearer(String token) {
536663
}
537664

538665
public Builder features(Set<Feature> features) {
539-
this.features = EnumSet.copyOf(features);
666+
this.features = safeFeatureCopy(features);
540667
return this;
541668
}
542669

arcp-client/src/main/java/dev/arcp/client/Page.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
import org.jspecify.annotations.Nullable;
66

77
public record Page<T>(List<T> items, @Nullable String nextCursor) {
8+
public Page {
9+
items = items == null ? List.of() : List.copyOf(items);
10+
}
11+
812
public static Page<JobSummary> empty() {
913
return new Page<>(List.of(), null);
1014
}

0 commit comments

Comments
 (0)