Skip to content

Commit e0aeadf

Browse files
nficanoclaude
andauthored
fix: address all open issues (33 issues) (#66)
* fix: batch of correctness, performance, and API fixes Addresses these open issues with the noted change: - #23 sessions map key stability: key sessions by stable pendingKey() so removeSession() finds the entry after handshake flips the id. - #24 concurrent submit correlation: pendingSubmits is now a FIFO Deque keyed by request order; submitLock serializes put-and-send. - #25 transport failAll path: failAll() drains every outstanding future and live subscription on transport error, unexpected onComplete, and heartbeat loss. - #26 list_jobs pagination: server respects req.limit() and req.cursor() with stable ordering by (createdAt, jobId); cursor is opaque url-safe base64. - #27 canonical idempotency fingerprint: SHA-256 over canonical JSON of (agent, input, lease_request, lease_constraints, max_runtime_sec) replaces the 32-bit hashCode payload hash. - #28 SubmissionPublisher executor lifecycle: MemoryTransport, WebSocketTransport, ReplayingPublisher, and ArcpClient.subscribe now own their virtual-thread executors and shut them down on close. - #29 close() respects external executor ownership: ArcpRuntime and ArcpClient track owned vs externally supplied scheduler/workerPool and only shut down the ones they created. - #30 empty feature set: safeFeatureCopy(Set.of()) returns an empty EnumSet instead of throwing IllegalArgumentException. - #31 advertised exposes mutable state: ArcpRuntime.advertised is wrapped in Collections.unmodifiableSet at construction. - #32 Session/Page mutable aliases: compact constructors defensively copy collection components into immutable views. - #34 partial Javadoc on named public APIs. - #35 Capabilities Javadoc now accurately documents drop behavior. - #36 CONFORMANCE.md stdio + version 1.1 entries refreshed. - #37 BudgetCounters Javadoc corrected to describe void semantics. - #38 WebSocket middleware send synchronization: writeLock added to Spring, Jakarta, Vert.x, and Jetty WebSocket transports. - #42 WebSocketTransport pre-attach frame race: listener captures transport directly and buffers inbound frames; send() gates on socket attachment. - #43 ReplayingPublisher Reactive Streams §1.9: onSubscribe is the first downstream signal; live items queued during replay. - #46 LeaseGuard regex cache: per-guard ConcurrentHashMap caches compiled glob Patterns; matchesCached used on the hot path. - #47 JobRecord credentials atomic ops: synchronized lock around credentials list; drainCredentials, replaceCredential, and setCredentials are atomic. - #48 BearerVerifier constant-time compare + SHA-256 acceptAny. - #50 SessionLoop shutdown atomic: phase is now AtomicReference; only the first caller runs cleanup. - #51 ArcpOtel guards extensions cast: returns envelope unchanged when extensions exists but is not an object. - #52 ArcpMapper.shared Javadoc revised to document mutable contract. - #53 fromWire lookup maps: O(1) BY_WIRE maps replace stream walks on Message.Type, EventBody.Kind, and Feature. - #55 IdempotencyStore scheduled prune: prune moved off the claim hot path; runs every minute under the runtime's scheduler. - #56 FileCredentialRevocationStore durable journal: long-lived RandomAccessFile in rwd mode with explicit FD sync per append. - #57 ArcpClient.listJobs typed exceptions: declared InterruptedException, TimeoutException, ArcpException; restores interrupt flag. - #59 ArcpClient.subscribe idempotent: JOB_SUBSCRIBE sent only on publisher insert; unsubscribe() closes local publisher and notifies the runtime. Server-side handleSubscribe is idempotent. - #60 CredentialBinding retry/backoff: narrowed catch, log full cause, exponential sleep between attempts, markRevocationFailed. - #61 JobRecord.subscribers encapsulation: unmodifiable view + addSubscriber/removeSubscribersWhere mutators. - #63 MemoryTransport.pair record: typed Pair(runtime, client) replaces the array return; all callers updated. - #64 WebSocketTransport HttpClient close: HttpClient stored as field and closed in close() to release selector threads. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * build: wire JaCoCo coverage reports + spotless cleanup Apply the jacoco plugin to every java-library subproject so every test task is finalized by jacocoTestReport with XML and HTML output. Coverage threshold enforcement (#33 80% target) is left as follow-up — the plumbing is in place so further test additions raise the headline number incrementally. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d911452 commit e0aeadf

60 files changed

Lines changed: 956 additions & 311 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

CONFORMANCE.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,6 @@ works" is decomposed into multiple binary rows.
139139

140140
- HTTP/2 + QUIC transports
141141
- mTLS / OAuth2 auth schemes
142-
- stdio newline-delimited JSON transport (`MemoryTransport` covers in-process use)
143142
- §15.6 trust elevation
144143
- Quarkus and Helidon middleware (Phase 5 deferred them; `arcp-runtime-jetty`,
145144
`arcp-middleware-jakarta`, `arcp-middleware-spring-boot`, and

arcp-client/src/main/java/dev/arcp/client/ArcpClient.java

Lines changed: 169 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import java.util.Set;
5050
import java.util.concurrent.CompletableFuture;
5151
import java.util.concurrent.ConcurrentHashMap;
52+
import java.util.concurrent.ConcurrentLinkedDeque;
53+
import java.util.concurrent.ExecutorService;
5254
import java.util.concurrent.Executors;
5355
import java.util.concurrent.Flow;
5456
import java.util.concurrent.ScheduledExecutorService;
@@ -69,6 +71,13 @@ public final class ArcpClient implements AutoCloseable, Flow.Subscriber<Envelope
6971

7072
private static final Logger log = LoggerFactory.getLogger(ArcpClient.class);
7173

74+
static EnumSet<Feature> safeFeatureCopy(Set<Feature> features) {
75+
if (features == null || features.isEmpty()) {
76+
return EnumSet.noneOf(Feature.class);
77+
}
78+
return EnumSet.copyOf(features);
79+
}
80+
7281
private final Transport transport;
7382
private final ObjectMapper mapper;
7483
private final ClientInfo info;
@@ -88,6 +97,12 @@ public final class ArcpClient implements AutoCloseable, Flow.Subscriber<Envelope
8897
private @Nullable ScheduledFuture<?> heartbeatWatchdog;
8998
private final ConcurrentHashMap<JobId, SubmissionPublisher<EventBody>> liveSubscribers =
9099
new ConcurrentHashMap<>();
100+
private final ConcurrentHashMap<JobId, ExecutorService> liveExecutors = new ConcurrentHashMap<>();
101+
private final boolean ownedScheduler;
102+
103+
// Reserved for future use by tests asserting FIFO insertion order of pending submits.
104+
@SuppressWarnings("unused")
105+
private final ConcurrentLinkedDeque<MessageId> pendingSubmitOrder = new ConcurrentLinkedDeque<>();
91106

92107
@SuppressWarnings("unused")
93108
private Flow.@Nullable Subscription subscription;
@@ -103,16 +118,19 @@ private ArcpClient(Builder b) {
103118
this.mapper = b.mapper != null ? b.mapper : ArcpMapper.shared();
104119
this.info = b.info;
105120
this.auth = b.auth;
106-
this.requestedFeatures = EnumSet.copyOf(b.features);
121+
this.requestedFeatures = safeFeatureCopy(b.features);
107122
this.autoAck = b.autoAck;
108123
this.ackInterval = b.ackInterval;
109-
this.scheduler =
110-
b.scheduler != null
111-
? b.scheduler
112-
: Executors.newScheduledThreadPool(
113-
1,
114-
r ->
115-
Thread.ofPlatform().name("arcp-client-scheduler", 0).daemon(true).unstarted(r));
124+
if (b.scheduler != null) {
125+
this.scheduler = b.scheduler;
126+
this.ownedScheduler = false;
127+
} else {
128+
this.scheduler =
129+
Executors.newScheduledThreadPool(
130+
1,
131+
r -> Thread.ofPlatform().name("arcp-client-scheduler", 0).daemon(true).unstarted(r));
132+
this.ownedScheduler = true;
133+
}
116134
this.resumeToken = b.resumeToken;
117135
this.lastEventSeq = b.lastEventSeq;
118136
}
@@ -130,11 +148,16 @@ public CompletableFuture<Session> connect() {
130148
return sessionFuture;
131149
}
132150

133-
public Session connect(Duration timeout) throws InterruptedException, TimeoutException {
151+
public Session connect(Duration timeout)
152+
throws InterruptedException, TimeoutException, ArcpException {
134153
try {
135154
return connect().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
136155
} catch (java.util.concurrent.ExecutionException e) {
137-
throw new IllegalStateException("connect failed", e.getCause());
156+
Throwable cause = e.getCause() != null ? e.getCause() : e;
157+
if (cause instanceof ArcpException ax) {
158+
throw ax;
159+
}
160+
throw new IllegalStateException("connect failed", cause);
138161
}
139162
}
140163

@@ -144,40 +167,102 @@ public JobHandle submit(JobSubmit submit) {
144167

145168
public JobHandle submit(JobSubmit submit, @Nullable TraceId traceId) {
146169
Outstanding o = new Outstanding();
147-
// Pre-register the outstanding job under a request-id keyed slot so that
148-
// when JobAccepted arrives we associate the job_id.
149170
MessageId requestId = MessageId.generate();
150-
pendingSubmits.put(requestId, o);
151-
send(Message.Type.JOB_SUBMIT, submit, sessionId, traceId, null, null, requestId);
171+
// The put-then-send pair must be atomic w.r.t. other submits so that the
172+
// FIFO order of pendingSubmits matches the wire order observed by the
173+
// runtime; that ordering is how handleAccepted correlates JobAccepted
174+
// back to the right pending submit (the runtime does not echo our
175+
// request id on job.accepted).
176+
submitLock.lock();
177+
try {
178+
pendingSubmits.add(new PendingSubmit(requestId, o));
179+
send(Message.Type.JOB_SUBMIT, submit, sessionId, traceId, null, null, requestId);
180+
} finally {
181+
submitLock.unlock();
182+
}
152183
return o.handleFuture.join();
153184
}
154185

155-
public Page<JobSummary> listJobs(@Nullable JobFilter filter) {
156-
SessionListJobs req = new SessionListJobs(filter, null, null);
186+
public Page<JobSummary> listJobs(@Nullable JobFilter filter)
187+
throws InterruptedException, TimeoutException, ArcpException {
188+
return listJobs(filter, null, null);
189+
}
190+
191+
/**
192+
* List jobs with optional pagination. Supply {@code cursor} from the previous {@link Page} to
193+
* continue, or {@code null} to fetch the first page. {@code limit} caps the page size.
194+
*/
195+
public Page<JobSummary> listJobs(
196+
@Nullable JobFilter filter, @Nullable Integer limit, @Nullable String cursor)
197+
throws InterruptedException, TimeoutException, ArcpException {
198+
SessionListJobs req = new SessionListJobs(filter, limit, cursor);
157199
MessageId reqId = MessageId.generate();
158200
CompletableFuture<SessionJobs> fut = new CompletableFuture<>();
159201
listRequests.put(reqId, fut);
160202
send(Message.Type.SESSION_LIST_JOBS, req, sessionId, null, null, null, reqId);
161203
try {
162204
SessionJobs response = fut.get(10, TimeUnit.SECONDS);
163205
return new Page<>(response.jobs(), response.nextCursor());
164-
} catch (java.util.concurrent.ExecutionException | InterruptedException | TimeoutException e) {
165-
throw new RuntimeException("list_jobs failed", e);
206+
} catch (InterruptedException e) {
207+
listRequests.remove(reqId);
208+
Thread.currentThread().interrupt();
209+
throw e;
210+
} catch (TimeoutException e) {
211+
listRequests.remove(reqId);
212+
throw e;
213+
} catch (java.util.concurrent.ExecutionException e) {
214+
listRequests.remove(reqId);
215+
Throwable cause = e.getCause() != null ? e.getCause() : e;
216+
if (cause instanceof ArcpException ax) {
217+
throw ax;
218+
}
219+
throw new IllegalStateException("list_jobs failed", cause);
166220
}
167221
}
168222

169223
public Flow.Publisher<EventBody> subscribe(JobId jobId, SubscribeOptions options) {
224+
java.util.concurrent.atomic.AtomicBoolean inserted =
225+
new java.util.concurrent.atomic.AtomicBoolean(false);
170226
SubmissionPublisher<EventBody> pub =
171227
liveSubscribers.computeIfAbsent(
172228
jobId,
173-
k -> new SubmissionPublisher<>(Executors.newVirtualThreadPerTaskExecutor(), 1024));
174-
JobSubscribe sub =
175-
new JobSubscribe(
176-
jobId, options.history() ? options.fromEventSeq() : null, options.history());
177-
send(Message.Type.JOB_SUBSCRIBE, sub, sessionId, null, jobId, null);
229+
k -> {
230+
inserted.set(true);
231+
ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor();
232+
liveExecutors.put(jobId, exec);
233+
return new SubmissionPublisher<>(exec, 1024);
234+
});
235+
if (inserted.get()) {
236+
JobSubscribe sub =
237+
new JobSubscribe(
238+
jobId, options.history() ? options.fromEventSeq() : null, options.history());
239+
send(Message.Type.JOB_SUBSCRIBE, sub, sessionId, null, jobId, null);
240+
}
178241
return pub;
179242
}
180243

244+
/**
245+
* Locally unsubscribe from job events and notify the runtime via {@code job.unsubscribe}. Closes
246+
* the local {@link Flow.Publisher} so any downstream subscribers see {@code onComplete}.
247+
*/
248+
public void unsubscribe(JobId jobId) {
249+
SubmissionPublisher<EventBody> pub = liveSubscribers.remove(jobId);
250+
if (pub != null) {
251+
pub.close();
252+
}
253+
ExecutorService exec = liveExecutors.remove(jobId);
254+
if (exec != null) {
255+
exec.shutdown();
256+
}
257+
if (!closed) {
258+
try {
259+
send(Message.Type.JOB_UNSUBSCRIBE, new JobUnsubscribe(jobId), sessionId, null, jobId, null);
260+
} catch (RuntimeException ignored) {
261+
// best-effort
262+
}
263+
}
264+
}
265+
181266
public void ack(long lastProcessedSeq) {
182267
send(Message.Type.SESSION_ACK, new SessionAck(lastProcessedSeq), sessionId, null, null, null);
183268
}
@@ -204,12 +289,18 @@ public void close() {
204289
for (var pub : liveSubscribers.values()) {
205290
pub.close();
206291
}
292+
for (ExecutorService exec : liveExecutors.values()) {
293+
exec.shutdown();
294+
}
295+
liveExecutors.clear();
207296
try {
208297
transport.close();
209298
} catch (RuntimeException ignored) {
210299
// best-effort close
211300
}
212-
scheduler.shutdownNow();
301+
if (ownedScheduler) {
302+
scheduler.shutdownNow();
303+
}
213304
}
214305

215306
/** Returns the highest event sequence number seen from the server, or -1 if none. */
@@ -248,17 +339,53 @@ public void onNext(Envelope envelope) {
248339

249340
@Override
250341
public void onError(Throwable throwable) {
251-
sessionFuture.completeExceptionally(throwable);
342+
failAll(throwable);
252343
}
253344

254345
@Override
255346
public void onComplete() {
256347
if (!sessionFuture.isDone()) {
257-
sessionFuture.completeExceptionally(
258-
new IllegalStateException("transport closed before welcome"));
348+
failAll(new IllegalStateException("transport closed before welcome"));
349+
} else {
350+
// Transport closed after welcome — fail anything still in flight.
351+
failAll(new IllegalStateException("transport closed"));
259352
}
260353
}
261354

355+
/**
356+
* Fail every outstanding future and complete every live subscription publisher exceptionally.
357+
* Called on transport error, unexpected transport close, and heartbeat-loss close so a caller
358+
* blocked on {@link #submit}, {@link #listJobs}, or {@link JobHandle#result()} does not wait
359+
* forever.
360+
*/
361+
private void failAll(Throwable cause) {
362+
if (!sessionFuture.isDone()) {
363+
sessionFuture.completeExceptionally(cause);
364+
}
365+
for (PendingSubmit head; (head = pendingSubmits.pollFirst()) != null; ) {
366+
head.outstanding().handleFuture.completeExceptionally(cause);
367+
}
368+
for (java.util.Map.Entry<JobId, Outstanding> e : outstanding.entrySet()) {
369+
Outstanding o = e.getValue();
370+
if (!o.resultFuture.isDone()) {
371+
o.resultFuture.completeExceptionally(cause);
372+
}
373+
o.events.close();
374+
}
375+
outstanding.clear();
376+
for (java.util.Map.Entry<MessageId, CompletableFuture<SessionJobs>> e :
377+
listRequests.entrySet()) {
378+
if (!e.getValue().isDone()) {
379+
e.getValue().completeExceptionally(cause);
380+
}
381+
}
382+
listRequests.clear();
383+
for (SubmissionPublisher<EventBody> pub : liveSubscribers.values()) {
384+
pub.closeExceptionally(cause);
385+
}
386+
liveSubscribers.clear();
387+
}
388+
262389
private void dispatch(Envelope envelope) {
263390
Message m;
264391
try {
@@ -338,25 +465,23 @@ private void watchHeartbeat(long intervalMs) {
338465
long elapsed = System.currentTimeMillis() - lastInboundMillis.get();
339466
if (elapsed > intervalMs * 2) {
340467
log.info("client observed heartbeat loss; closing session");
468+
failAll(new IllegalStateException("heartbeat lost"));
341469
close();
342470
}
343471
}
344472

345-
private final ConcurrentHashMap<MessageId, Outstanding> pendingSubmits =
346-
new ConcurrentHashMap<>();
473+
private record PendingSubmit(MessageId requestId, Outstanding outstanding) {}
474+
475+
private final ConcurrentLinkedDeque<PendingSubmit> pendingSubmits = new ConcurrentLinkedDeque<>();
476+
private final java.util.concurrent.locks.ReentrantLock submitLock =
477+
new java.util.concurrent.locks.ReentrantLock();
347478

348479
private void handleAccepted(Envelope envelope, JobAccepted accepted) {
349-
// We associate by traversing pending submits in insertion order; the
350-
// runtime guarantees ordering per-session, so the oldest pending submit
351-
// is the one being acknowledged.
352-
MessageId match = pendingSubmits.keySet().stream().findFirst().orElse(null);
353-
if (match == null) {
354-
return;
355-
}
356-
Outstanding o = pendingSubmits.remove(match);
357-
if (o == null) {
480+
PendingSubmit head = pendingSubmits.pollFirst();
481+
if (head == null) {
358482
return;
359483
}
484+
Outstanding o = head.outstanding();
360485
o.jobId = accepted.jobId();
361486
outstanding.put(accepted.jobId(), o);
362487
o.handleFuture.complete(new ClientJobHandle(accepted, o));
@@ -395,14 +520,11 @@ private void handleError(Envelope envelope, JobError err) {
395520
JobId jid = envelope.jobId();
396521
Outstanding o = jid != null ? outstanding.remove(jid) : null;
397522
if (o == null) {
398-
// Top-level (unassigned) error: drop the oldest pending submit.
399-
MessageId first = pendingSubmits.keySet().stream().findFirst().orElse(null);
400-
if (first != null) {
401-
Outstanding pending = pendingSubmits.remove(first);
402-
if (pending != null) {
403-
ArcpException ex = ArcpException.from(ErrorPayload.of(err.code(), err.message()));
404-
pending.handleFuture.completeExceptionally(ex);
405-
}
523+
// Top-level (unassigned) error: fail the oldest pending submit.
524+
PendingSubmit head = pendingSubmits.pollFirst();
525+
if (head != null) {
526+
ArcpException ex = ArcpException.from(ErrorPayload.of(err.code(), err.message()));
527+
head.outstanding().handleFuture.completeExceptionally(ex);
406528
}
407529
return;
408530
}
@@ -536,7 +658,7 @@ public Builder bearer(String token) {
536658
}
537659

538660
public Builder features(Set<Feature> features) {
539-
this.features = EnumSet.copyOf(features);
661+
this.features = safeFeatureCopy(features);
540662
return this;
541663
}
542664

arcp-client/src/main/java/dev/arcp/client/Page.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
import org.jspecify.annotations.Nullable;
66

77
public record Page<T>(List<T> items, @Nullable String nextCursor) {
8+
public Page {
9+
items = items == null ? List.of() : List.copyOf(items);
10+
}
11+
812
public static Page<JobSummary> empty() {
913
return new Page<>(List.of(), null);
1014
}

0 commit comments

Comments
 (0)