From 1a469905536822cf4ea16b818d11b5b86a3685d1 Mon Sep 17 00:00:00 2001 From: Timothy Clem Date: Fri, 22 May 2026 19:35:11 -0700 Subject: [PATCH 1/2] Java SDK: add CopilotClient.createCloudSession - Add createCloudSession(SessionConfig) to CopilotClient that sends a session.create request without a caller-supplied sessionId, letting the runtime assign one (cloud-managed routing) - Add PendingRoutingState: thread-safe buffer for events and parked request futures that arrive before cloud session registration; bounded at 128 entries per id, drop-oldest; flush on registerAndFlush() - Add SessionRequestBuilder.buildCloudCreateRequest that omits sessionId and provider from the wire payload - Update RpcHandlerDispatcher to accept PendingRoutingState and route all inbound server requests (tool.call, permission, user-input, exit-plan-mode, auto-mode-switch, hooks.invoke, system-message-transform) through resolveSessionForRequest so they park on the pending state when the session is still in flight - Guard createSession against being called with cloud config; guard createCloudSession against caller-supplied sessionId or provider - Preserve existing non-cloud rekey-on-mismatch logic unchanged - 10 new tests in CloudSessionTest covering all 7 contract scenarios; fix RpcHandlerDispatcherTest constructor call for new signature Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../com/github/copilot/sdk/CopilotClient.java | 173 ++++++- .../copilot/sdk/PendingRoutingState.java | 183 ++++++++ .../copilot/sdk/RpcHandlerDispatcher.java | 155 ++++++- .../copilot/sdk/SessionRequestBuilder.java | 76 +++ .../github/copilot/sdk/CloudSessionTest.java | 433 ++++++++++++++++++ .../copilot/sdk/RpcHandlerDispatcherTest.java | 2 +- 6 files changed, 1002 insertions(+), 20 deletions(-) create mode 100644 java/src/main/java/com/github/copilot/sdk/PendingRoutingState.java create mode 100644 java/src/test/java/com/github/copilot/sdk/CloudSessionTest.java diff --git a/java/src/main/java/com/github/copilot/sdk/CopilotClient.java b/java/src/main/java/com/github/copilot/sdk/CopilotClient.java index 4d0770319..15391d638 100644 --- a/java/src/main/java/com/github/copilot/sdk/CopilotClient.java +++ b/java/src/main/java/com/github/copilot/sdk/CopilotClient.java @@ -81,6 +81,7 @@ public final class CopilotClient implements AutoCloseable { private final CliServerManager serverManager; private final LifecycleEventManager lifecycleManager = new LifecycleEventManager(); private final Map sessions = new ConcurrentHashMap<>(); + private final PendingRoutingState pendingRoutingState = new PendingRoutingState(); private volatile CompletableFuture connectionFuture; private volatile boolean disposed = false; private final String optionsHost; @@ -210,7 +211,7 @@ private Connection startCoreBody() { // Register handlers for server-to-client calls RpcHandlerDispatcher dispatcher = new RpcHandlerDispatcher(sessions, lifecycleManager::dispatch, - options.getExecutor()); + options.getExecutor(), pendingRoutingState); dispatcher.registerHandlers(rpc); // Verify protocol version @@ -426,6 +427,10 @@ public CompletableFuture createSession(SessionConfig config) { + "For example, to allow all permissions, use: " + "new SessionConfig().setOnPermissionRequest(PermissionHandler.APPROVE_ALL)")); } + if (config.getCloud() != null) { + return CompletableFuture.failedFuture(new IllegalArgumentException( + "CopilotClient.createSession does not support cloud sessions; use createCloudSession instead.")); + } return ensureConnected().thenCompose(connection -> { long totalNanos = System.nanoTime(); // Pre-generate session ID so the session can be registered before the RPC call, @@ -487,6 +492,172 @@ public CompletableFuture createSession(SessionConfig config) { }); } + /** + * Creates a Mission Control–backed cloud session. + * + *

+ * The runtime owns the session ID for cloud sessions. Do not + * set {@link SessionConfig#setSessionId(String) sessionId} or + * {@link SessionConfig#setProvider(com.github.copilot.sdk.json.ProviderConfig) + * provider} on the config; the SDK rejects both with + * {@link IllegalArgumentException}. The config must have + * {@link SessionConfig#setCloud(com.github.copilot.sdk.json.CloudSessionOptions) + * cloud} set; + * {@link SessionConfig#setOnPermissionRequest(com.github.copilot.sdk.json.PermissionHandler) + * onPermissionRequest} is required. + * + *

+ * The SDK omits {@code sessionId} from the {@code session.create} wire request + * and registers the returned session under the id the runtime assigns. Any + * {@code session.event} notifications or inbound JSON-RPC requests that arrive + * between sending {@code session.create} and receiving its response are + * buffered (bounded, drop-oldest, up to + * {@value PendingRoutingState#BUFFER_LIMIT} per session id) and replayed once + * the session is registered. + * + *

+ * Example: + * + *

{@code
+     * var session = client.createCloudSession(new SessionConfig()
+     * 		.setCloud(new CloudSessionOptions()
+     * 				.setRepository(new CloudSessionRepository().setOwner("github").setName("copilot-sdk")))
+     * 		.setOnPermissionRequest(PermissionHandler.APPROVE_ALL)).get();
+     * }
+ * + * @param config + * configuration for the cloud session; must have {@code cloud} set + * and an {@code onPermissionRequest} handler; must not have + * {@code sessionId} or {@code provider} set + * @return a future that resolves with the created {@link CopilotSession} + * @throws IllegalArgumentException + * if validation fails (see above) + * @see SessionConfig#setCloud(com.github.copilot.sdk.json.CloudSessionOptions) + * @see com.github.copilot.sdk.json.PermissionHandler#APPROVE_ALL + * @since 1.6.0 + */ + public CompletableFuture createCloudSession(SessionConfig config) { + if (config == null || config.getOnPermissionRequest() == null) { + return CompletableFuture.failedFuture( + new IllegalArgumentException("An onPermissionRequest handler is required when creating a session. " + + "For example, to allow all permissions, use: " + + "new SessionConfig().setOnPermissionRequest(PermissionHandler.APPROVE_ALL)")); + } + if (config.getCloud() == null) { + return CompletableFuture.failedFuture( + new IllegalArgumentException("CopilotClient.createCloudSession requires config.cloud to be set.")); + } + if (config.getSessionId() != null && !config.getSessionId().isEmpty()) { + return CompletableFuture.failedFuture(new IllegalArgumentException( + "CopilotClient.createCloudSession does not support a caller-provided sessionId; " + + "the runtime assigns one.")); + } + if (config.getProvider() != null) { + return CompletableFuture.failedFuture( + new IllegalArgumentException("CopilotClient.createCloudSession does not support config.provider; " + + "cloud sessions use the runtime's provider.")); + } + + return ensureConnected().thenCompose(connection -> { + long totalNanos = System.nanoTime(); + + // Enter pending-routing mode before sending session.create so that any + // session.event notifications or inbound RPC requests that arrive during + // the in-flight RPC are buffered rather than dropped. + pendingRoutingState.incrementGuard(); + + var request = SessionRequestBuilder.buildCloudCreateRequest(config); + + // Extract transform callbacks from the system message config. + var extracted = SessionRequestBuilder.extractTransformCallbacks(config.getSystemMessage()); + if (extracted.wireSystemMessage() != config.getSystemMessage()) { + request.setSystemMessage(extracted.wireSystemMessage()); + } + + long rpcNanos = System.nanoTime(); + return connection.rpc.invoke("session.create", request, CreateSessionResponse.class).thenApply(response -> { + LoggingHelpers.logTiming(LOG, Level.FINE, + "CopilotClient.createCloudSession session.create completed. Elapsed={Elapsed}", rpcNanos); + + String returnedId = response.sessionId(); + if (returnedId == null || returnedId.isEmpty()) { + // No id: release the guard and surface the error. Any runtime session + // created on the other side may leak — we have nothing to destroy. + var staleWaiters = pendingRoutingState.decrementGuard(); + completeWaitersExceptionally(staleWaiters, + "Cloud session.create completed without registering this sessionId; request dropped"); + LOG.warning("Cloud session.create response missing sessionId; runtime session may leak"); + throw new RuntimeException( + "Cloud session.create response did not include a sessionId; cannot register session."); + } + + var session = new CopilotSession(returnedId, connection.rpc); + if (options.getExecutor() != null) { + session.setExecutor(options.getExecutor()); + } + SessionRequestBuilder.configureSession(session, config); + if (extracted.transformCallbacks() != null) { + session.registerTransformCallbacks(extracted.transformCallbacks()); + } + + try { + // Atomically register the session in the sessions map and drain any + // buffered events/parked waiters. The sessions.put happens inside the + // PendingRoutingState lock so concurrent tryBufferNotification / + // tryParkRequest calls see the session as registered. + var flush = pendingRoutingState.registerAndFlush(returnedId, session, sessions); + + session.setWorkspacePath(response.workspacePath()); + session.setCapabilities(response.capabilities()); + + // Replay buffered session.event notifications + for (var event : flush.events()) { + session.dispatchEvent(event); + } + // Complete parked request waiters + for (var waiter : flush.waiters()) { + waiter.complete(session); + } + } catch (Exception e) { + // Roll back: remove session from map, release guard with exceptional completion + sessions.remove(returnedId); + var staleWaiters = pendingRoutingState.decrementGuard(); + completeWaitersExceptionally(staleWaiters, + "Cloud session post-registration setup failed; request dropped"); + LoggingHelpers.logTiming(LOG, Level.WARNING, e, + "CopilotClient.createCloudSession post-registration setup failed. Elapsed={Elapsed}", + totalNanos); + throw e instanceof RuntimeException re ? re : new RuntimeException(e); + } + + var staleWaiters = pendingRoutingState.decrementGuard(); + completeWaitersExceptionally(staleWaiters, + "Cloud session.create completed without registering this sessionId; request dropped"); + + LoggingHelpers.logTiming(LOG, Level.FINE, + "CopilotClient.createCloudSession complete. Elapsed={Elapsed}, SessionId=" + returnedId, + totalNanos); + return session; + }).exceptionally(ex -> { + var staleWaiters = pendingRoutingState.decrementGuard(); + completeWaitersExceptionally(staleWaiters, "Cloud session.create failed; request dropped"); + LoggingHelpers.logTiming(LOG, Level.WARNING, ex, + "CopilotClient.createCloudSession failed. Elapsed={Elapsed}", totalNanos); + throw ex instanceof RuntimeException re ? re : new RuntimeException(ex); + }); + }); + } + + private static void completeWaitersExceptionally(java.util.List> waiters, + String message) { + if (!waiters.isEmpty()) { + var ex = new RuntimeException(message); + for (var waiter : waiters) { + waiter.completeExceptionally(ex); + } + } + } + /** * Resumes an existing Copilot session. *

diff --git a/java/src/main/java/com/github/copilot/sdk/PendingRoutingState.java b/java/src/main/java/com/github/copilot/sdk/PendingRoutingState.java new file mode 100644 index 000000000..a0b2e0852 --- /dev/null +++ b/java/src/main/java/com/github/copilot/sdk/PendingRoutingState.java @@ -0,0 +1,183 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + *--------------------------------------------------------------------------------------------*/ + +package com.github.copilot.sdk; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.logging.Logger; + +import com.github.copilot.sdk.generated.SessionEvent; + +/** + * Thread-safe state for pending-routing mode used by + * {@link CopilotClient#createCloudSession}. + * + *

+ * While one or more cloud {@code session.create} calls are in flight (guard + * count {@code > 0}), notifications and inbound RPC requests addressed to + * session ids that are not yet registered are buffered here rather than + * dropped. Once {@link CopilotClient#createCloudSession} receives the + * runtime-assigned session id, it calls {@link #registerAndFlush} to atomically + * insert the session into the sessions map, drain any buffered events into it, + * and complete any parked request waiters. + * + *

+ * All mutation methods synchronize on {@code this}. The sessions-map put inside + * {@link #registerAndFlush} is performed while holding the lock so that the + * {@link #tryBufferNotification} / {@link #tryParkRequest} check-then-act is + * free of TOCTOU races. + */ +final class PendingRoutingState { + + static final int BUFFER_LIMIT = 128; + + private static final Logger LOG = Logger.getLogger(PendingRoutingState.class.getName()); + + private int guardCount = 0; + /** Buffered session.event notifications keyed by session id. */ + private final Map> pendingEvents = new HashMap<>(); + /** + * Parked CompletableFutures for inbound RPC requests waiting for a session to + * be registered. + */ + private final Map>> pendingWaiters = new HashMap<>(); + + /** Increment the guard count. Must be matched by {@link #decrementGuard}. */ + synchronized void incrementGuard() { + guardCount++; + } + + /** + * Decrement the guard count. If the count reaches zero, clears all buffered + * events and completes all parked waiters exceptionally. + * + * @return list of waiters that should be completed exceptionally (caller must + * complete them outside this lock to avoid re-entrancy) + */ + synchronized List> decrementGuard() { + guardCount = Math.max(0, guardCount - 1); + if (guardCount == 0) { + pendingEvents.clear(); + var stale = new ArrayList>(); + for (var list : pendingWaiters.values()) { + stale.addAll(list); + } + pendingWaiters.clear(); + return stale; + } + return Collections.emptyList(); + } + + /** + * Attempt to buffer a {@code session.event} notification for a pending session. + * + *

+ * The {@code sessions} map is checked inside this synchronized method so that + * the "session not found → buffer" decision is atomic with + * {@link #registerAndFlush}'s "put in map → flush buffer" operation. + * + * @param sessionId + * the session id from the notification + * @param event + * the parsed event to buffer + * @param sessions + * the live sessions map (checked under lock) + * @return {@code true} if the event was buffered; {@code false} if the session + * is already registered (caller should dispatch directly) or pending + * routing is inactive (caller should drop) + */ + synchronized boolean tryBufferNotification(String sessionId, SessionEvent event, + Map sessions) { + if (sessions.containsKey(sessionId)) { + return false; // session found; caller dispatches directly + } + if (guardCount == 0) { + return false; // no pending routing; drop + } + var queue = pendingEvents.computeIfAbsent(sessionId, k -> new ArrayDeque<>()); + if (queue.size() >= BUFFER_LIMIT) { + queue.pollFirst(); + LOG.warning("Pending session notification buffer full for session " + sessionId + "; dropping oldest"); + } + queue.addLast(event); + return true; + } + + /** + * Attempt to park an inbound RPC request until the session is registered. + * + *

+ * Like {@link #tryBufferNotification}, the {@code sessions} map is checked + * under the lock to avoid TOCTOU races with {@link #registerAndFlush}. + * + * @param sessionId + * the session id from the request params + * @param sessions + * the live sessions map (checked under lock) + * @return a future that will be resolved with the {@link CopilotSession} when + * registered (or completed exceptionally when the guard is dropped), or + * {@code null} if the session is already registered (callers should use + * it directly) or if pending routing is inactive (caller should send + * error) + */ + synchronized CompletableFuture tryParkRequest(String sessionId, + Map sessions) { + CopilotSession existing = sessions.get(sessionId); + if (existing != null) { + return CompletableFuture.completedFuture(existing); + } + if (guardCount == 0) { + return null; // no pending; caller sends error + } + var future = new CompletableFuture(); + pendingWaiters.computeIfAbsent(sessionId, k -> new ArrayList<>()).add(future); + return future; + } + + /** + * Result of {@link #registerAndFlush}: buffered events to dispatch and parked + * waiters to complete. + */ + record FlushResult(List events, List> waiters) { + } + + /** + * Atomically register a session in the sessions map and drain any buffered + * events and parked waiters for that session. + * + *

+ * The {@code sessions.put} is performed inside the lock so that concurrent + * {@link #tryBufferNotification} / {@link #tryParkRequest} callers that haven't + * yet acquired the lock will see the session as registered. + * + * @param sessionId + * the session id to register + * @param session + * the session object + * @param sessions + * the live sessions map; the put is performed under lock + * @return buffered events and parked waiters to dispatch/complete outside the + * lock + */ + synchronized FlushResult registerAndFlush(String sessionId, CopilotSession session, + Map sessions) { + sessions.put(sessionId, session); + + var queue = pendingEvents.remove(sessionId); + var events = queue != null ? new ArrayList<>(queue) : Collections.emptyList(); + + var waiters = pendingWaiters.remove(sessionId); + var futures = waiters != null + ? new ArrayList<>(waiters) + : Collections.>emptyList(); + + return new FlushResult(events, futures); + } +} diff --git a/java/src/main/java/com/github/copilot/sdk/RpcHandlerDispatcher.java b/java/src/main/java/com/github/copilot/sdk/RpcHandlerDispatcher.java index 1d76d8b88..6202860a9 100644 --- a/java/src/main/java/com/github/copilot/sdk/RpcHandlerDispatcher.java +++ b/java/src/main/java/com/github/copilot/sdk/RpcHandlerDispatcher.java @@ -9,8 +9,11 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; @@ -49,6 +52,10 @@ final class RpcHandlerDispatcher { private final Map sessions; private final LifecycleEventDispatcher lifecycleDispatcher; private final Executor executor; + private final PendingRoutingState pendingState; + + /** Timeout in seconds to wait for a pending cloud session to register. */ + private static final int PENDING_SESSION_TIMEOUT_SECONDS = 60; /** * Creates a dispatcher with session registry and lifecycle dispatcher. @@ -59,12 +66,16 @@ final class RpcHandlerDispatcher { * callback for dispatching lifecycle events * @param executor * the executor for async dispatch, or {@code null} for default + * @param pendingState + * the pending-routing state for cloud sessions, or {@code null} to + * disable pending routing */ RpcHandlerDispatcher(Map sessions, LifecycleEventDispatcher lifecycleDispatcher, - Executor executor) { + Executor executor, PendingRoutingState pendingState) { this.sessions = sessions; this.lifecycleDispatcher = lifecycleDispatcher; this.executor = executor; + this.pendingState = pendingState; } /** @@ -96,13 +107,28 @@ private void handleSessionEvent(JsonNode params) { JsonNode eventNode = params.get("event"); LOG.fine("Received session.event: " + eventNode); + if (eventNode == null) { + return; + } + SessionEvent event = MAPPER.treeToValue(eventNode, SessionEvent.class); + if (event == null) { + return; + } + + // Fast path: session already registered — dispatch directly. CopilotSession session = sessions.get(sessionId); - if (session != null && eventNode != null) { - SessionEvent event = MAPPER.treeToValue(eventNode, SessionEvent.class); - if (event != null) { - session.dispatchEvent(event); - } + if (session != null) { + session.dispatchEvent(event); + return; + } + + // Slow path: session not found. Attempt to buffer for a pending cloud + // session.create. The tryBufferNotification check is inside the pending + // state's lock so it's atomic with registerAndFlush's sessions.put. + if (pendingState != null && pendingState.tryBufferNotification(sessionId, event, sessions)) { + return; // buffered; will be replayed when session is registered } + // session not registered and no pending routing active; silently drop } catch (Exception e) { LOG.log(Level.SEVERE, "Error handling session event", e); } @@ -141,9 +167,8 @@ private void handleToolCall(JsonRpcClient rpc, String requestId, JsonNode params String toolName = params.get("toolName").asText(); JsonNode arguments = params.get("arguments"); - CopilotSession session = sessions.get(sessionId); + CopilotSession session = resolveSessionForRequest(sessionId, requestIdLong, rpc); if (session == null) { - rpc.sendErrorResponse(requestIdLong, -32602, "Unknown session " + sessionId); return; } @@ -203,7 +228,20 @@ private void handlePermissionRequest(JsonRpcClient rpc, String requestId, JsonNo String sessionId = params.get("sessionId").asText(); JsonNode permissionRequest = params.get("permissionRequest"); + // Try to resolve the session; for a pending cloud session, park until + // registration. If not found and no pending routing, fall back to the + // protocol-correct DENIED_COULD_NOT_REQUEST_FROM_USER response. CopilotSession session = sessions.get(sessionId); + if (session == null && pendingState != null) { + CompletableFuture waiter = pendingState.tryParkRequest(sessionId, sessions); + if (waiter != null) { + try { + session = waiter.get(PENDING_SESSION_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (Exception e) { + session = null; + } + } + } if (session == null) { var result = new PermissionRequestResult() .setKind(PermissionRequestResultKind.DENIED_COULD_NOT_REQUEST_FROM_USER); @@ -254,11 +292,10 @@ private void handleUserInputRequest(JsonRpcClient rpc, String requestId, JsonNod JsonNode choicesNode = params.get("choices"); JsonNode allowFreeformNode = params.get("allowFreeform"); - CopilotSession session = sessions.get(sessionId); + CopilotSession session = resolveSessionForRequest(sessionId, requestIdLong, rpc); LOG.fine("Found session: " + (session != null)); if (session == null) { LOG.fine("Session not found, sending error"); - rpc.sendErrorResponse(requestIdLong, -32602, "Unknown session " + sessionId); return; } @@ -309,9 +346,8 @@ private void handleExitPlanModeRequest(JsonRpcClient rpc, String requestId, Json try { String sessionId = params.get("sessionId").asText(); - CopilotSession session = sessions.get(sessionId); + CopilotSession session = resolveSessionForRequest(sessionId, requestIdLong, rpc); if (session == null) { - rpc.sendErrorResponse(requestIdLong, -32602, "Unknown session " + sessionId); return; } @@ -363,9 +399,8 @@ private void handleAutoModeSwitchRequest(JsonRpcClient rpc, String requestId, Js try { String sessionId = params.get("sessionId").asText(); - CopilotSession session = sessions.get(sessionId); + CopilotSession session = resolveSessionForRequest(sessionId, requestIdLong, rpc); if (session == null) { - rpc.sendErrorResponse(requestIdLong, -32602, "Unknown session " + sessionId); return; } @@ -409,9 +444,8 @@ private void handleHooksInvoke(JsonRpcClient rpc, String requestId, JsonNode par String hookType = params.get("hookType").asText(); JsonNode input = params.get("input"); - CopilotSession session = sessions.get(sessionId); + CopilotSession session = resolveSessionForRequest(sessionId, requestIdLong, rpc); if (session == null) { - rpc.sendErrorResponse(requestIdLong, -32602, "Unknown session " + sessionId); return; } @@ -454,9 +488,8 @@ private void handleSystemMessageTransform(JsonRpcClient rpc, String requestId, J String sessionId = params.has("sessionId") ? params.get("sessionId").asText() : null; JsonNode sections = params.get("sections"); - CopilotSession session = sessionId != null ? sessions.get(sessionId) : null; + CopilotSession session = resolveSessionForRequest(sessionId, requestIdLong, rpc); if (session == null) { - rpc.sendErrorResponse(requestIdLong, -32602, "Unknown session " + sessionId); return; } @@ -499,6 +532,92 @@ private static long parseRequestId(String requestId, String methodName) { } } + /** + * Resolve a session for an inbound RPC request, optionally parking until a + * pending cloud session is registered. + * + *

+ * If the session is already registered, returns it immediately. If pending + * routing is active (a cloud {@code session.create} is in flight), parks this + * call until the session is registered or the guard expires. On failure, sends + * an error response and returns {@code null}. + * + * @param sessionId + * the session id extracted from the request params + * @param requestId + * the JSON-RPC request id (for error responses) + * @param rpc + * the JSON-RPC client + * @return the resolved session, or {@code null} if an error was sent + */ + private CopilotSession resolveSessionForRequest(String sessionId, long requestId, JsonRpcClient rpc) { + if (sessionId == null) { + try { + rpc.sendErrorResponse(requestId, -32602, "Missing sessionId in request"); + } catch (IOException e) { + LOG.log(Level.SEVERE, "Failed to send missing-sessionId error", e); + } + return null; + } + + // Fast path: session already registered + CopilotSession session = sessions.get(sessionId); + if (session != null) { + return session; + } + + if (pendingState != null) { + CompletableFuture waiter = pendingState.tryParkRequest(sessionId, sessions); + if (waiter != null) { + if (waiter.isDone()) { + // Resolved synchronously (session was already registered under the lock) + try { + return waiter.get(); + } catch (Exception e) { + // fall through to send error + } + } else { + try { + return waiter.get(PENDING_SESSION_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (TimeoutException e) { + try { + rpc.sendErrorResponse(requestId, -32603, "Session " + sessionId + " not registered within " + + PENDING_SESSION_TIMEOUT_SECONDS + "s"); + } catch (IOException ioe) { + LOG.log(Level.SEVERE, "Failed to send timeout error", ioe); + } + return null; + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + try { + rpc.sendErrorResponse(requestId, -32603, "Session " + sessionId + " not registered: " + + (cause != null ? cause.getMessage() : e.getMessage())); + } catch (IOException ioe) { + LOG.log(Level.SEVERE, "Failed to send registration-failed error", ioe); + } + return null; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + try { + rpc.sendErrorResponse(requestId, -32603, "Interrupted waiting for session " + sessionId); + } catch (IOException ioe) { + LOG.log(Level.SEVERE, "Failed to send interrupted error", ioe); + } + return null; + } + } + } + } + + // No pending routing active or pending routing returned null; send error + try { + rpc.sendErrorResponse(requestId, -32602, "Unknown session " + sessionId); + } catch (IOException e) { + LOG.log(Level.SEVERE, "Failed to send unknown-session error", e); + } + return null; + } + private void runAsync(Runnable task) { try { if (executor != null) { diff --git a/java/src/main/java/com/github/copilot/sdk/SessionRequestBuilder.java b/java/src/main/java/com/github/copilot/sdk/SessionRequestBuilder.java index 0cdc4f942..fd0e4b97a 100644 --- a/java/src/main/java/com/github/copilot/sdk/SessionRequestBuilder.java +++ b/java/src/main/java/com/github/copilot/sdk/SessionRequestBuilder.java @@ -173,6 +173,82 @@ static CreateSessionRequest buildCreateRequest(SessionConfig config) { return buildCreateRequest(config, sessionId); } + /** + * Builds a CreateSessionRequest for cloud session creation. Unlike + * {@link #buildCreateRequest(SessionConfig, String)}, this method omits + * {@code sessionId} from the request — the runtime assigns the session id for + * cloud sessions. + * + * @param config + * the session configuration (may be null) + * @return the built request object with {@code sessionId} left null + */ + static CreateSessionRequest buildCloudCreateRequest(SessionConfig config) { + var request = new CreateSessionRequest(); + // Always request permission callbacks to enable deny-by-default behavior + request.setRequestPermission(true); + // Always send envValueMode=direct for MCP servers + request.setEnvValueMode("direct"); + // sessionId intentionally omitted: the runtime assigns the id for cloud + // sessions + if (config == null) { + return request; + } + + request.setModel(config.getModel()); + request.setClientName(config.getClientName()); + request.setReasoningEffort(config.getReasoningEffort()); + request.setTools(config.getTools()); + request.setSystemMessage(config.getSystemMessage()); + request.setAvailableTools(config.getAvailableTools()); + request.setExcludedTools(config.getExcludedTools()); + // provider intentionally omitted: cloud sessions use the runtime's provider + config.getEnableSessionTelemetry().ifPresent(request::setEnableSessionTelemetry); + if (config.getOnUserInputRequest() != null) { + request.setRequestUserInput(true); + } + if (config.getHooks() != null && config.getHooks().hasHooks()) { + request.setHooks(true); + } + request.setWorkingDirectory(config.getWorkingDirectory()); + if (config.isStreaming()) { + request.setStreaming(true); + } + config.getIncludeSubAgentStreamingEvents().ifPresent(request::setIncludeSubAgentStreamingEvents); + request.setMcpServers(config.getMcpServers()); + request.setCustomAgents(config.getCustomAgents()); + request.setDefaultAgent(config.getDefaultAgent()); + request.setAgent(config.getAgent()); + request.setInfiniteSessions(config.getInfiniteSessions()); + request.setSkillDirectories(config.getSkillDirectories()); + request.setInstructionDirectories(config.getInstructionDirectories()); + request.setDisabledSkills(config.getDisabledSkills()); + request.setConfigDir(config.getConfigDir()); + config.getEnableConfigDiscovery().ifPresent(request::setEnableConfigDiscovery); + request.setModelCapabilities(config.getModelCapabilities()); + + if (config.getCommands() != null && !config.getCommands().isEmpty()) { + var wireCommands = config.getCommands().stream() + .map(c -> new CommandWireDefinition(c.getName(), c.getDescription())) + .collect(java.util.stream.Collectors.toList()); + request.setCommands(wireCommands); + } + if (config.getOnElicitationRequest() != null) { + request.setRequestElicitation(true); + } + if (config.getOnExitPlanMode() != null) { + request.setRequestExitPlanMode(true); + } + if (config.getOnAutoModeSwitch() != null) { + request.setRequestAutoModeSwitch(true); + } + request.setGitHubToken(config.getGitHubToken()); + request.setRemoteSession(config.getRemoteSession()); + request.setCloud(config.getCloud()); + + return request; + } + /** * Builds a ResumeSessionRequest from the given session ID and configuration. * diff --git a/java/src/test/java/com/github/copilot/sdk/CloudSessionTest.java b/java/src/test/java/com/github/copilot/sdk/CloudSessionTest.java new file mode 100644 index 000000000..19eb9ed2d --- /dev/null +++ b/java/src/test/java/com/github/copilot/sdk/CloudSessionTest.java @@ -0,0 +1,433 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + *--------------------------------------------------------------------------------------------*/ + +package com.github.copilot.sdk; + +import static org.junit.jupiter.api.Assertions.*; + +import java.io.InputStream; +import java.lang.reflect.Field; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.github.copilot.sdk.json.CloudSessionOptions; +import com.github.copilot.sdk.json.CloudSessionRepository; +import com.github.copilot.sdk.json.CreateSessionRequest; +import com.github.copilot.sdk.json.PermissionHandler; +import com.github.copilot.sdk.json.ProviderConfig; +import com.github.copilot.sdk.json.SessionConfig; +import com.github.copilot.sdk.json.SessionLifecycleEvent; +import com.github.copilot.sdk.json.ToolDefinition; +import com.github.copilot.sdk.json.ToolResultObject; + +/** + * Tests for {@link CopilotClient#createCloudSession} and + * {@link CopilotClient#createSession} cloud-config rejection. + * + *

+ * Covers: + *

    + *
  1. {@code createSession} rejects cloud config
  2. + *
  3. Wire payload omits {@code sessionId} and includes {@code cloud}
  4. + *
  5. {@code createCloudSession} rejects caller-provided {@code sessionId}
  6. + *
  7. {@code createCloudSession} rejects caller-provided {@code provider}
  8. + *
  9. {@code createCloudSession} requires {@code cloud} to be set
  10. + *
  11. Early {@code session.event} notifications are buffered and replayed
  12. + *
  13. Inbound RPC requests are parked until the session is registered
  14. + *
+ */ +class CloudSessionTest { + + private static final ObjectMapper MAPPER = JsonRpcClient.getObjectMapper(); + private static final int SOCKET_TIMEOUT_MS = 5000; + + // Socket-pair fields used by routing-related tests + private Socket clientSideSocket; + private Socket serverSideSocket; + private JsonRpcClient rpc; + private Map sessions; + private CopyOnWriteArrayList lifecycleEvents; + private PendingRoutingState pendingState; + private RpcHandlerDispatcher dispatcher; + private InputStream responseStream; + private Map> handlers; + + @BeforeEach + void setup() throws Exception { + try (ServerSocket ss = new ServerSocket(0)) { + clientSideSocket = new Socket("localhost", ss.getLocalPort()); + serverSideSocket = ss.accept(); + } + serverSideSocket.setSoTimeout(SOCKET_TIMEOUT_MS); + + rpc = JsonRpcClient.fromSocket(clientSideSocket); + responseStream = serverSideSocket.getInputStream(); + + sessions = new ConcurrentHashMap<>(); + lifecycleEvents = new CopyOnWriteArrayList<>(); + pendingState = new PendingRoutingState(); + + dispatcher = new RpcHandlerDispatcher(sessions, lifecycleEvents::add, null, pendingState); + dispatcher.registerHandlers(rpc); + + // Extract registered handlers via reflection (same pattern as + // RpcHandlerDispatcherTest) + Field f = JsonRpcClient.class.getDeclaredField("notificationHandlers"); + f.setAccessible(true); + @SuppressWarnings("unchecked") + Map> h = (Map>) f.get(rpc); + handlers = h; + } + + @AfterEach + void teardown() throws Exception { + if (rpc != null) { + rpc.close(); + } + if (serverSideSocket != null) { + serverSideSocket.close(); + } + if (clientSideSocket != null) { + clientSideSocket.close(); + } + } + + private void invokeHandler(String method, String requestId, JsonNode params) { + handlers.get(method).accept(requestId, params); + } + + private JsonNode readResponse() throws Exception { + StringBuilder header = new StringBuilder(); + while (!header.toString().endsWith("\r\n\r\n")) { + int b = responseStream.read(); + if (b == -1) { + throw new java.io.IOException("Unexpected end of stream"); + } + header.append((char) b); + } + String headerStr = header.toString().trim(); + int idx = headerStr.indexOf(':'); + int contentLength = Integer.parseInt(headerStr.substring(idx + 1).trim()); + byte[] body = responseStream.readNBytes(contentLength); + return MAPPER.readTree(body); + } + + // ========================================================================= + // Test 1: createSession rejects cloud config + // ========================================================================= + + @Test + void createSession_rejectsCloudConfig() { + var client = new CopilotClient(); + var config = new SessionConfig().setCloud(new CloudSessionOptions()) + .setOnPermissionRequest(PermissionHandler.APPROVE_ALL); + + var future = client.createSession(config); + + var ex = assertThrows(ExecutionException.class, future::get, "createSession should fail with cloud config set"); + assertInstanceOf(IllegalArgumentException.class, ex.getCause(), "Cause should be IllegalArgumentException"); + assertTrue(ex.getCause().getMessage().contains("cloud"), "Error message should mention 'cloud'"); + + try { + client.forceStop().get(5, TimeUnit.SECONDS); + } catch (Exception ignored) { + } + } + + // ========================================================================= + // Test 2: wire payload omits sessionId and includes cloud + // ========================================================================= + + @Test + void buildCloudCreateRequest_omitsSessionIdAndIncludesCloud() throws Exception { + var cloud = new CloudSessionOptions() + .setRepository(new CloudSessionRepository().setOwner("github").setName("copilot-sdk")); + var config = new SessionConfig().setCloud(cloud).setModel("gpt-5") + .setOnPermissionRequest(PermissionHandler.APPROVE_ALL); + + CreateSessionRequest request = SessionRequestBuilder.buildCloudCreateRequest(config); + + // Java-level assertions + assertNull(request.getSessionId(), "sessionId must be null on the cloud create request"); + assertNotNull(request.getCloud(), "cloud must be set on the cloud create request"); + assertEquals("gpt-5", request.getModel()); + + // Serialize to JSON and verify wire shape + String json = MAPPER.writeValueAsString(request); + JsonNode tree = MAPPER.readTree(json); + + assertFalse(tree.has("sessionId"), "sessionId must be absent from serialized JSON (NON_NULL omits it)"); + assertTrue(tree.has("cloud"), "cloud must be present in serialized JSON"); + assertTrue(tree.get("cloud").has("repository"), "cloud.repository must be present"); + assertEquals("github", tree.get("cloud").get("repository").get("owner").asText()); + } + + @Test + void buildCloudCreateRequest_sessionIdOmittedEvenWhenModelIsNull() throws Exception { + // Minimal config: only cloud set, no model + var config = new SessionConfig().setCloud(new CloudSessionOptions()); + + CreateSessionRequest request = SessionRequestBuilder.buildCloudCreateRequest(config); + + assertNull(request.getSessionId()); + String json = MAPPER.writeValueAsString(request); + JsonNode tree = MAPPER.readTree(json); + assertFalse(tree.has("sessionId"), "sessionId must never appear in cloud create wire payload"); + assertTrue(tree.has("cloud")); + } + + // ========================================================================= + // Test 3: createCloudSession rejects caller-provided sessionId + // ========================================================================= + + @Test + void createCloudSession_rejectsCallerSessionId() { + var client = new CopilotClient(); + var config = new SessionConfig().setCloud(new CloudSessionOptions()).setSessionId("my-caller-session") + .setOnPermissionRequest(PermissionHandler.APPROVE_ALL); + + var future = client.createCloudSession(config); + + var ex = assertThrows(ExecutionException.class, future::get, + "createCloudSession should fail when sessionId is set"); + assertInstanceOf(IllegalArgumentException.class, ex.getCause()); + assertTrue(ex.getCause().getMessage().contains("sessionId"), "Error message should mention 'sessionId'"); + + try { + client.forceStop().get(5, TimeUnit.SECONDS); + } catch (Exception ignored) { + } + } + + // ========================================================================= + // Test 4: createCloudSession rejects caller-provided provider + // ========================================================================= + + @Test + void createCloudSession_rejectsCallerProvider() { + var client = new CopilotClient(); + var config = new SessionConfig().setCloud(new CloudSessionOptions()) + .setProvider(new ProviderConfig().setType("openai")) + .setOnPermissionRequest(PermissionHandler.APPROVE_ALL); + + var future = client.createCloudSession(config); + + var ex = assertThrows(ExecutionException.class, future::get, + "createCloudSession should fail when provider is set"); + assertInstanceOf(IllegalArgumentException.class, ex.getCause()); + assertTrue(ex.getCause().getMessage().contains("provider"), "Error message should mention 'provider'"); + + try { + client.forceStop().get(5, TimeUnit.SECONDS); + } catch (Exception ignored) { + } + } + + // ========================================================================= + // Test 5: createCloudSession requires cloud to be set + // ========================================================================= + + @Test + void createCloudSession_requiresCloud() { + var client = new CopilotClient(); + var config = new SessionConfig().setModel("gpt-5").setOnPermissionRequest(PermissionHandler.APPROVE_ALL); + // cloud is NOT set + + var future = client.createCloudSession(config); + + var ex = assertThrows(ExecutionException.class, future::get, + "createCloudSession should fail when cloud is not set"); + assertInstanceOf(IllegalArgumentException.class, ex.getCause()); + assertTrue(ex.getCause().getMessage().contains("cloud"), "Error message should mention 'cloud'"); + + try { + client.forceStop().get(5, TimeUnit.SECONDS); + } catch (Exception ignored) { + } + } + + // ========================================================================= + // Test 6: early session.event notifications are buffered and replayed + // ========================================================================= + + @Test + void bufferEarlySessionEventNotifications() throws Exception { + // Enter pending routing mode (simulates createCloudSession in-flight) + pendingState.incrementGuard(); + + String pendingSessionId = "cloud-session-abc"; + + // Dispatch a session.event while no session is registered yet. + ObjectNode params = MAPPER.createObjectNode(); + params.put("sessionId", pendingSessionId); + ObjectNode event = params.putObject("event"); + event.put("type", "sessionStart"); + ObjectNode data = event.putObject("data"); + data.put("sessionId", pendingSessionId); + + invokeHandler("session.event", null, params); + + // Give the (synchronous) handler a moment — no session registered yet, so the + // event should be buffered, not dispatched. + Thread.sleep(50); + + // Create the session object and register it via registerAndFlush, which + // atomically inserts the session into the map and drains the buffer. + var session = new CopilotSession(pendingSessionId, rpc); + var dispatched = new CopyOnWriteArrayList<>(); + session.on(dispatched::add); + + var flush = pendingState.registerAndFlush(pendingSessionId, session, sessions); + + // Replay buffered events into the session (simulates what createCloudSession + // does) + for (var buffered : flush.events()) { + session.dispatchEvent(buffered); + } + + // Complete parked waiters (none in this test) + for (var waiter : flush.waiters()) { + waiter.complete(session); + } + + // Release the guard + var staleWaiters = pendingState.decrementGuard(); + assertTrue(staleWaiters.isEmpty(), "No stale waiters expected"); + + // The buffered session.event should now have been replayed to the session + Thread.sleep(50); + assertEquals(1, dispatched.size(), "Buffered notification should have been replayed to the session"); + } + + @Test + void bufferRespectsSizeLimit() throws Exception { + pendingState.incrementGuard(); + String sid = "cloud-overflow-test"; + + // Send more than BUFFER_LIMIT events + int overLimit = PendingRoutingState.BUFFER_LIMIT + 10; + for (int i = 0; i < overLimit; i++) { + ObjectNode params = MAPPER.createObjectNode(); + params.put("sessionId", sid); + ObjectNode event = params.putObject("event"); + event.put("type", "assistantMessage"); + event.putObject("data").put("content", "msg-" + i); + invokeHandler("session.event", null, params); + } + + var session = new CopilotSession(sid, rpc); + var flush = pendingState.registerAndFlush(sid, session, sessions); + + // Should have been capped at BUFFER_LIMIT; oldest entries were dropped + assertEquals(PendingRoutingState.BUFFER_LIMIT, flush.events().size(), + "Buffer should be capped at BUFFER_LIMIT"); + + pendingState.decrementGuard(); + } + + // ========================================================================= + // Test 7: inbound RPC requests are parked until the session is registered + // ========================================================================= + + @Test + void parksInboundRequestsUntilRegistration() throws Exception { + String pendingSessionId = "cloud-session-xyz"; + + // Register a tool on the (not-yet-created) session by pre-creating it without + // registering in the sessions map yet. We'll use the pending state directly. + pendingState.incrementGuard(); + + // In a background thread, send a tool.call request for the pending session. + // The handler should park until the session is registered. + var toolCallFuture = CompletableFuture.runAsync(() -> { + ObjectNode params = MAPPER.createObjectNode(); + params.put("sessionId", pendingSessionId); + params.put("toolCallId", "tc-1"); + params.put("toolName", "say_hello"); + params.set("arguments", MAPPER.createObjectNode()); + invokeHandler("tool.call", "42", params); + }); + + // Brief pause to allow the handler thread to start and park + Thread.sleep(100); + + // Create and register the session with the requested tool + var session = new CopilotSession(pendingSessionId, rpc); + session.registerTools(java.util.List.of( + ToolDefinition.create("say_hello", "Greets the user", Map.of("type", "object", "properties", Map.of()), + inv -> CompletableFuture.completedFuture(ToolResultObject.success("hello!"))))); + + var flush = pendingState.registerAndFlush(pendingSessionId, session, sessions); + + // No buffered notifications in this test + assertTrue(flush.events().isEmpty(), "No buffered events expected"); + + // Complete any parked request waiters + for (var waiter : flush.waiters()) { + waiter.complete(session); + } + + pendingState.decrementGuard(); + + // Wait for the handler to finish (it was parked on the waiter) + toolCallFuture.get(5, TimeUnit.SECONDS); + + // The tool.call handler should have executed and sent a response back on the + // wire + JsonNode response = readResponse(); + assertNotNull(response, "Should have received a tool response"); + assertEquals(42, response.get("id").asInt(), "Response id should match request id"); + assertNotNull(response.get("result"), "Tool call should produce a result"); + } + + @Test + void parkedRequestFailsExceptionallyWhenGuardDroppedWithoutRegistration() throws Exception { + String pendingSessionId = "cloud-session-dropped"; + + pendingState.incrementGuard(); + + // Park a request in the background + var toolCallFuture = CompletableFuture.runAsync(() -> { + ObjectNode params = MAPPER.createObjectNode(); + params.put("sessionId", pendingSessionId); + params.put("toolCallId", "tc-2"); + params.put("toolName", "any_tool"); + params.set("arguments", MAPPER.createObjectNode()); + invokeHandler("tool.call", "99", params); + }); + + Thread.sleep(100); + + // Drop the guard WITHOUT registering the session → waiters should be completed + // exceptionally + var staleWaiters = pendingState.decrementGuard(); + // Complete them exceptionally to simulate what CopilotClient does on failure + for (var w : staleWaiters) { + w.completeExceptionally(new RuntimeException("Cloud session.create failed; request dropped")); + } + + // The handler should eventually receive the exceptional completion and send an + // error response + toolCallFuture.get(5, TimeUnit.SECONDS); + + JsonNode response = readResponse(); + assertNotNull(response, "Should have received an error response"); + assertEquals(99, response.get("id").asInt()); + assertNotNull(response.get("error"), "Response should be an error (not a result)"); + } +} diff --git a/java/src/test/java/com/github/copilot/sdk/RpcHandlerDispatcherTest.java b/java/src/test/java/com/github/copilot/sdk/RpcHandlerDispatcherTest.java index 7453a7b26..5c5bcbfd6 100644 --- a/java/src/test/java/com/github/copilot/sdk/RpcHandlerDispatcherTest.java +++ b/java/src/test/java/com/github/copilot/sdk/RpcHandlerDispatcherTest.java @@ -66,7 +66,7 @@ void setup() throws Exception { sessions = new ConcurrentHashMap<>(); lifecycleEvents = new CopyOnWriteArrayList<>(); - dispatcher = new RpcHandlerDispatcher(sessions, lifecycleEvents::add, null); + dispatcher = new RpcHandlerDispatcher(sessions, lifecycleEvents::add, null, null); dispatcher.registerHandlers(rpc); // Extract the registered handlers via reflection so we can invoke them directly From 5b3abb52b8a961e406c2c37e2ab1274cb817d473 Mon Sep 17 00:00:00 2001 From: Timothy Clem Date: Fri, 22 May 2026 20:39:53 -0700 Subject: [PATCH 2/2] Java SDK: emit JSON-RPC error on pending-buffer overflow + guard drop Carries forward the Rust SDK PR #1394 review feedback into the Java port: 1. Cap the per-session inbound-request parked-waiter list at 128 (BUFFER_LIMIT). When exceeded, evict the oldest waiter via completeExceptionally("pending session buffer overflow"). The RpcHandlerDispatcher thread blocked in waiter.get() wakes up, catches ExecutionException, and resolveSessionForRequest calls rpc.sendErrorResponse(-32603, ...) so the runtime isn't left waiting on a request id that will never get a reply. Mirrors Rust commit 491b4427 and TS commit c167bc3e on PR #1395. 2. decrementGuard now completes all stale waiters internally with a distinct message ("pending session routing ended before session was registered") instead of returning them for callers to complete with ad-hoc strings. A single canonical message lets debugging tell the overflow-eviction path from the create-failed path. Mirrors Rust commit e0ff254f and TS commit c167bc3e. 3. Fix isDone() fast path in resolveSessionForRequest: the existing catch-all "fall through" swallowed ExecutionException from an overflow-evicted waiter, sending a generic -32602 "Unknown session" error instead of -32603. Now explicitly catches ExecutionException in the isDone() branch and sends the same -32603 error as the blocking path. Adds two new unit tests in CloudSessionTest: - parkedRequestWaiterBuffer_overflow_evictsOldestWithOverflowMessage: parks 129 waiters, verifies oldest completes with "pending session buffer overflow", remaining 128 resolve normally after registration. - parkedRequestWaiter_guardDropMessage_isDistinctFromOverflowMessage: parks a request via the full handler path, drops the guard without registration, verifies the JSON-RPC error response contains "routing ended before session was registered" but not "buffer overflow". Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../com/github/copilot/sdk/CopilotClient.java | 27 +---- .../copilot/sdk/PendingRoutingState.java | 49 ++++++--- .../copilot/sdk/RpcHandlerDispatcher.java | 15 ++- .../github/copilot/sdk/CloudSessionTest.java | 102 ++++++++++++++++-- 4 files changed, 146 insertions(+), 47 deletions(-) diff --git a/java/src/main/java/com/github/copilot/sdk/CopilotClient.java b/java/src/main/java/com/github/copilot/sdk/CopilotClient.java index 15391d638..c7ccc8a03 100644 --- a/java/src/main/java/com/github/copilot/sdk/CopilotClient.java +++ b/java/src/main/java/com/github/copilot/sdk/CopilotClient.java @@ -583,9 +583,7 @@ public CompletableFuture createCloudSession(SessionConfig config if (returnedId == null || returnedId.isEmpty()) { // No id: release the guard and surface the error. Any runtime session // created on the other side may leak — we have nothing to destroy. - var staleWaiters = pendingRoutingState.decrementGuard(); - completeWaitersExceptionally(staleWaiters, - "Cloud session.create completed without registering this sessionId; request dropped"); + pendingRoutingState.decrementGuard(); LOG.warning("Cloud session.create response missing sessionId; runtime session may leak"); throw new RuntimeException( "Cloud session.create response did not include a sessionId; cannot register session."); @@ -619,28 +617,23 @@ public CompletableFuture createCloudSession(SessionConfig config waiter.complete(session); } } catch (Exception e) { - // Roll back: remove session from map, release guard with exceptional completion + // Roll back: remove session from map, release guard. sessions.remove(returnedId); - var staleWaiters = pendingRoutingState.decrementGuard(); - completeWaitersExceptionally(staleWaiters, - "Cloud session post-registration setup failed; request dropped"); + pendingRoutingState.decrementGuard(); LoggingHelpers.logTiming(LOG, Level.WARNING, e, "CopilotClient.createCloudSession post-registration setup failed. Elapsed={Elapsed}", totalNanos); throw e instanceof RuntimeException re ? re : new RuntimeException(e); } - var staleWaiters = pendingRoutingState.decrementGuard(); - completeWaitersExceptionally(staleWaiters, - "Cloud session.create completed without registering this sessionId; request dropped"); + pendingRoutingState.decrementGuard(); LoggingHelpers.logTiming(LOG, Level.FINE, "CopilotClient.createCloudSession complete. Elapsed={Elapsed}, SessionId=" + returnedId, totalNanos); return session; }).exceptionally(ex -> { - var staleWaiters = pendingRoutingState.decrementGuard(); - completeWaitersExceptionally(staleWaiters, "Cloud session.create failed; request dropped"); + pendingRoutingState.decrementGuard(); LoggingHelpers.logTiming(LOG, Level.WARNING, ex, "CopilotClient.createCloudSession failed. Elapsed={Elapsed}", totalNanos); throw ex instanceof RuntimeException re ? re : new RuntimeException(ex); @@ -648,16 +641,6 @@ public CompletableFuture createCloudSession(SessionConfig config }); } - private static void completeWaitersExceptionally(java.util.List> waiters, - String message) { - if (!waiters.isEmpty()) { - var ex = new RuntimeException(message); - for (var waiter : waiters) { - waiter.completeExceptionally(ex); - } - } - } - /** * Resumes an existing Copilot session. *

diff --git a/java/src/main/java/com/github/copilot/sdk/PendingRoutingState.java b/java/src/main/java/com/github/copilot/sdk/PendingRoutingState.java index a0b2e0852..7c0678e20 100644 --- a/java/src/main/java/com/github/copilot/sdk/PendingRoutingState.java +++ b/java/src/main/java/com/github/copilot/sdk/PendingRoutingState.java @@ -56,23 +56,32 @@ synchronized void incrementGuard() { /** * Decrement the guard count. If the count reaches zero, clears all buffered - * events and completes all parked waiters exceptionally. - * - * @return list of waiters that should be completed exceptionally (caller must - * complete them outside this lock to avoid re-entrancy) + * events and completes all parked request waiters exceptionally with a + * canonical message that is distinct from the overflow-eviction path. */ - synchronized List> decrementGuard() { + synchronized void decrementGuard() { guardCount = Math.max(0, guardCount - 1); - if (guardCount == 0) { - pendingEvents.clear(); - var stale = new ArrayList>(); - for (var list : pendingWaiters.values()) { - stale.addAll(list); + if (guardCount != 0) { + return; + } + pendingEvents.clear(); + var stale = new ArrayList>(); + for (var list : pendingWaiters.values()) { + stale.addAll(list); + } + pendingWaiters.clear(); + if (!stale.isEmpty()) { + // Use a distinct phrasing from the overflow-eviction path so that + // debugging can tell the two failure modes apart. Matches the Rust + // SDK message (PR #1394 commit e0ff254f) and the TS SDK (commit + // c167bc3e). + LOG.warning("Pending session routing ended before session was registered; " + "completing " + stale.size() + + " parked request waiter(s) exceptionally"); + var ex = new RuntimeException("pending session routing ended before session was registered"); + for (var waiter : stale) { + waiter.completeExceptionally(ex); } - pendingWaiters.clear(); - return stale; } - return Collections.emptyList(); } /** @@ -137,7 +146,19 @@ synchronized CompletableFuture tryParkRequest(String sessionId, return null; // no pending; caller sends error } var future = new CompletableFuture(); - pendingWaiters.computeIfAbsent(sessionId, k -> new ArrayList<>()).add(future); + var list = pendingWaiters.computeIfAbsent(sessionId, k -> new ArrayList<>()); + if (list.size() >= BUFFER_LIMIT) { + // Cap parked waiters per session. When exceeded, evict the oldest + // and complete it with a distinct overflow message so the runtime + // gets an error response rather than hanging on a reply that will + // never arrive. Matches Rust PR #1394 (commit 491b4427) and TS + // (commit c167bc3e). + var oldest = list.remove(0); + LOG.warning("Pending session request waiter buffer full for session " + sessionId + " (limit=" + + BUFFER_LIMIT + "); evicting oldest request"); + oldest.completeExceptionally(new RuntimeException("pending session buffer overflow")); + } + list.add(future); return future; } diff --git a/java/src/main/java/com/github/copilot/sdk/RpcHandlerDispatcher.java b/java/src/main/java/com/github/copilot/sdk/RpcHandlerDispatcher.java index 6202860a9..1b164aa28 100644 --- a/java/src/main/java/com/github/copilot/sdk/RpcHandlerDispatcher.java +++ b/java/src/main/java/com/github/copilot/sdk/RpcHandlerDispatcher.java @@ -570,9 +570,22 @@ private CopilotSession resolveSessionForRequest(String sessionId, long requestId CompletableFuture waiter = pendingState.tryParkRequest(sessionId, sessions); if (waiter != null) { if (waiter.isDone()) { - // Resolved synchronously (session was already registered under the lock) + // Session was already registered under tryParkRequest's lock — + // complete synchronously. If the waiter was completed exceptionally + // (e.g. overflow eviction just beat us to isDone()), send the + // same -32603 error as the blocking path so the runtime isn't + // left waiting for a reply that will never arrive. try { return waiter.get(); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + try { + rpc.sendErrorResponse(requestId, -32603, "Session " + sessionId + " not registered: " + + (cause != null ? cause.getMessage() : e.getMessage())); + } catch (IOException ioe) { + LOG.log(Level.SEVERE, "Failed to send synchronous registration-failed error", ioe); + } + return null; } catch (Exception e) { // fall through to send error } diff --git a/java/src/test/java/com/github/copilot/sdk/CloudSessionTest.java b/java/src/test/java/com/github/copilot/sdk/CloudSessionTest.java index 19eb9ed2d..2177655a5 100644 --- a/java/src/test/java/com/github/copilot/sdk/CloudSessionTest.java +++ b/java/src/test/java/com/github/copilot/sdk/CloudSessionTest.java @@ -306,8 +306,7 @@ void bufferEarlySessionEventNotifications() throws Exception { } // Release the guard - var staleWaiters = pendingState.decrementGuard(); - assertTrue(staleWaiters.isEmpty(), "No stale waiters expected"); + pendingState.decrementGuard(); // The buffered session.event should now have been replayed to the session Thread.sleep(50); @@ -413,15 +412,11 @@ void parkedRequestFailsExceptionallyWhenGuardDroppedWithoutRegistration() throws Thread.sleep(100); - // Drop the guard WITHOUT registering the session → waiters should be completed - // exceptionally - var staleWaiters = pendingState.decrementGuard(); - // Complete them exceptionally to simulate what CopilotClient does on failure - for (var w : staleWaiters) { - w.completeExceptionally(new RuntimeException("Cloud session.create failed; request dropped")); - } + // Drop the guard WITHOUT registering the session. decrementGuard now + // completes parked waiters internally with the canonical message. + pendingState.decrementGuard(); - // The handler should eventually receive the exceptional completion and send an + // The handler should receive the exceptional completion and send an // error response toolCallFuture.get(5, TimeUnit.SECONDS); @@ -429,5 +424,92 @@ void parkedRequestFailsExceptionallyWhenGuardDroppedWithoutRegistration() throws assertNotNull(response, "Should have received an error response"); assertEquals(99, response.get("id").asInt()); assertNotNull(response.get("error"), "Response should be an error (not a result)"); + String errorMessage = response.get("error").get("message").asText(); + assertTrue(errorMessage.contains("routing ended before session was registered"), + "Error message should contain the canonical guard-drop phrase; got: " + errorMessage); + } + + // ========================================================================= + // Test 8: overflow path — oldest parked waiter gets the overflow message + // ========================================================================= + + @Test + void parkedRequestWaiterBuffer_overflow_evictsOldestWithOverflowMessage() throws Exception { + pendingState.incrementGuard(); + String sid = "cloud-overflow-requests"; + + // Park BUFFER_LIMIT + 1 waiters via tryParkRequest. The 129th call must + // evict the very first waiter and complete it with the overflow message. + var waiters = new java.util.ArrayList>(); + for (int i = 0; i < PendingRoutingState.BUFFER_LIMIT + 1; i++) { + waiters.add(pendingState.tryParkRequest(sid, sessions)); + } + + // The first waiter (oldest) must have been evicted with the overflow message. + CompletableFuture oldest = waiters.get(0); + assertTrue(oldest.isCompletedExceptionally(), "Oldest waiter should be completed exceptionally on overflow"); + ExecutionException ex = assertThrows(ExecutionException.class, oldest::get); + assertEquals("pending session buffer overflow", ex.getCause().getMessage()); + + // The remaining BUFFER_LIMIT waiters should still be pending. + for (int i = 1; i <= PendingRoutingState.BUFFER_LIMIT; i++) { + assertFalse(waiters.get(i).isDone(), "Waiter " + i + " should still be pending after overflow eviction"); + } + + // Registering the session resolves the remaining 128 waiters normally. + var session = new CopilotSession(sid, rpc); + var flush = pendingState.registerAndFlush(sid, session, sessions); + assertEquals(PendingRoutingState.BUFFER_LIMIT, flush.waiters().size(), + "registerAndFlush should return all non-evicted waiters"); + for (var waiter : flush.waiters()) { + waiter.complete(session); + } + for (int i = 1; i <= PendingRoutingState.BUFFER_LIMIT; i++) { + assertFalse(waiters.get(i).isCompletedExceptionally(), + "Waiter " + i + " should complete normally, not exceptionally"); + assertEquals(session, waiters.get(i).get(1, TimeUnit.SECONDS)); + } + + pendingState.decrementGuard(); + } + + // ========================================================================= + // Test 9: guard-drop message is distinct from overflow message + // ========================================================================= + + @Test + void parkedRequestWaiter_guardDropMessage_isDistinctFromOverflowMessage() throws Exception { + String pendingSessionId = "cloud-session-distinct-msg"; + + pendingState.incrementGuard(); + + // Park a request in the background via the full handler path so the + // response travels over the socket — this mirrors the real runtime flow. + var toolCallFuture = CompletableFuture.runAsync(() -> { + ObjectNode params = MAPPER.createObjectNode(); + params.put("sessionId", pendingSessionId); + params.put("toolCallId", "tc-distinct"); + params.put("toolName", "noop"); + params.set("arguments", MAPPER.createObjectNode()); + invokeHandler("tool.call", "77", params); + }); + + Thread.sleep(100); + + // Drop the guard without registration. decrementGuard completes waiters + // internally with the canonical guard-drop message. + pendingState.decrementGuard(); + + toolCallFuture.get(5, TimeUnit.SECONDS); + + JsonNode response = readResponse(); + assertEquals(77, response.get("id").asInt()); + assertNotNull(response.get("error"), "Should be an error response"); + String msg = response.get("error").get("message").asText(); + + // Must contain the guard-drop phrase — NOT the overflow phrase. + assertTrue(msg.contains("routing ended before session was registered"), + "Guard-drop error must use the routing-ended phrase; got: " + msg); + assertFalse(msg.contains("buffer overflow"), "Guard-drop error must NOT use the overflow phrase; got: " + msg); } }