Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 155 additions & 1 deletion java/src/main/java/com/github/copilot/sdk/CopilotClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public final class CopilotClient implements AutoCloseable {
private final CliServerManager serverManager;
private final LifecycleEventManager lifecycleManager = new LifecycleEventManager();
private final Map<String, CopilotSession> sessions = new ConcurrentHashMap<>();
private final PendingRoutingState pendingRoutingState = new PendingRoutingState();
private volatile CompletableFuture<Connection> connectionFuture;
private volatile boolean disposed = false;
private final String optionsHost;
Expand Down Expand Up @@ -210,7 +211,7 @@ private Connection startCoreBody() {

// Register handlers for server-to-client calls
RpcHandlerDispatcher dispatcher = new RpcHandlerDispatcher(sessions, lifecycleManager::dispatch,
options.getExecutor());
options.getExecutor(), pendingRoutingState);
dispatcher.registerHandlers(rpc);

// Verify protocol version
Expand Down Expand Up @@ -426,6 +427,10 @@ public CompletableFuture<CopilotSession> createSession(SessionConfig config) {
+ "For example, to allow all permissions, use: "
+ "new SessionConfig().setOnPermissionRequest(PermissionHandler.APPROVE_ALL)"));
}
if (config.getCloud() != null) {
return CompletableFuture.failedFuture(new IllegalArgumentException(
"CopilotClient.createSession does not support cloud sessions; use createCloudSession instead."));
}
return ensureConnected().thenCompose(connection -> {
long totalNanos = System.nanoTime();
// Pre-generate session ID so the session can be registered before the RPC call,
Expand Down Expand Up @@ -487,6 +492,155 @@ public CompletableFuture<CopilotSession> createSession(SessionConfig config) {
});
}

/**
 * Creates a Mission Control–backed cloud session.
 *
 * <p>
 * The runtime owns the session ID for cloud sessions. Do <strong>not</strong>
 * set {@link SessionConfig#setSessionId(String) sessionId} or
 * {@link SessionConfig#setProvider(com.github.copilot.sdk.json.ProviderConfig)
 * provider} on the config; the SDK rejects both with
 * {@link IllegalArgumentException}. The config must have
 * {@link SessionConfig#setCloud(com.github.copilot.sdk.json.CloudSessionOptions)
 * cloud} set;
 * {@link SessionConfig#setOnPermissionRequest(com.github.copilot.sdk.json.PermissionHandler)
 * onPermissionRequest} is required.
 *
 * <p>
 * The SDK omits {@code sessionId} from the {@code session.create} wire request
 * and registers the returned session under the id the runtime assigns. Any
 * {@code session.event} notifications or inbound JSON-RPC requests that arrive
 * between sending {@code session.create} and receiving its response are
 * buffered (bounded, drop-oldest, up to
 * {@value PendingRoutingState#BUFFER_LIMIT} per session id) and replayed once
 * the session is registered.
 *
 * <p>
 * The pending-routing guard taken for the in-flight {@code session.create} is
 * released exactly once per call, whether the call succeeds, the RPC fails, the
 * response carries no session id, or post-registration setup fails.
 *
 * <p>
 * Example:
 *
 * <pre>{@code
 * var session = client.createCloudSession(new SessionConfig()
 *         .setCloud(new CloudSessionOptions()
 *                 .setRepository(new CloudSessionRepository().setOwner("github").setName("copilot-sdk")))
 *         .setOnPermissionRequest(PermissionHandler.APPROVE_ALL)).get();
 * }</pre>
 *
 * @param config
 *            configuration for the cloud session; must have {@code cloud} set
 *            and an {@code onPermissionRequest} handler; must not have
 *            {@code sessionId} or {@code provider} set
 * @return a future that resolves with the created {@link CopilotSession}, or
 *         completes exceptionally if validation, the {@code session.create}
 *         RPC, or session registration fails
 * @throws IllegalArgumentException
 *             if validation fails (see above); reported by completing the
 *             returned future exceptionally rather than thrown synchronously
 * @see SessionConfig#setCloud(com.github.copilot.sdk.json.CloudSessionOptions)
 * @see com.github.copilot.sdk.json.PermissionHandler#APPROVE_ALL
 * @since 1.6.0
 */
public CompletableFuture<CopilotSession> createCloudSession(SessionConfig config) {
    if (config == null || config.getOnPermissionRequest() == null) {
        return CompletableFuture.failedFuture(
                new IllegalArgumentException("An onPermissionRequest handler is required when creating a session. "
                        + "For example, to allow all permissions, use: "
                        + "new SessionConfig().setOnPermissionRequest(PermissionHandler.APPROVE_ALL)"));
    }
    if (config.getCloud() == null) {
        return CompletableFuture.failedFuture(
                new IllegalArgumentException("CopilotClient.createCloudSession requires config.cloud to be set."));
    }
    if (config.getSessionId() != null && !config.getSessionId().isEmpty()) {
        return CompletableFuture.failedFuture(new IllegalArgumentException(
                "CopilotClient.createCloudSession does not support a caller-provided sessionId; "
                        + "the runtime assigns one."));
    }
    if (config.getProvider() != null) {
        return CompletableFuture.failedFuture(
                new IllegalArgumentException("CopilotClient.createCloudSession does not support config.provider; "
                        + "cloud sessions use the runtime's provider."));
    }

    return ensureConnected().thenCompose(connection -> {
        long totalNanos = System.nanoTime();

        // Enter pending-routing mode before sending session.create so that any
        // session.event notifications or inbound RPC requests that arrive during
        // the in-flight RPC are buffered rather than dropped.
        pendingRoutingState.incrementGuard();
        try {
            var request = SessionRequestBuilder.buildCloudCreateRequest(config);

            // Extract transform callbacks from the system message config.
            var extracted = SessionRequestBuilder.extractTransformCallbacks(config.getSystemMessage());
            if (extracted.wireSystemMessage() != config.getSystemMessage()) {
                request.setSystemMessage(extracted.wireSystemMessage());
            }

            long rpcNanos = System.nanoTime();
            return connection.rpc.invoke("session.create", request, CreateSessionResponse.class)
                    .thenApply(response -> {
                        LoggingHelpers.logTiming(LOG, Level.FINE,
                                "CopilotClient.createCloudSession session.create completed. Elapsed={Elapsed}",
                                rpcNanos);

                        String returnedId = response.sessionId();
                        if (returnedId == null || returnedId.isEmpty()) {
                            // No id: surface the error. Any runtime session created on the
                            // other side may leak — we have nothing to destroy. The guard is
                            // released by the whenComplete stage below, not here, so it is
                            // never decremented twice.
                            LOG.warning("Cloud session.create response missing sessionId; runtime session may leak");
                            throw new RuntimeException(
                                    "Cloud session.create response did not include a sessionId; cannot register session.");
                        }

                        var session = new CopilotSession(returnedId, connection.rpc);
                        if (options.getExecutor() != null) {
                            session.setExecutor(options.getExecutor());
                        }
                        SessionRequestBuilder.configureSession(session, config);
                        if (extracted.transformCallbacks() != null) {
                            session.registerTransformCallbacks(extracted.transformCallbacks());
                        }

                        // Declared outside the try so the rollback path can fail any waiters
                        // that registerAndFlush already drained from the pending state.
                        PendingRoutingState.FlushResult flush = null;
                        try {
                            // Atomically register the session in the sessions map and drain any
                            // buffered events/parked waiters. The sessions.put happens inside the
                            // PendingRoutingState lock so concurrent tryBufferNotification /
                            // tryParkRequest calls see the session as registered.
                            flush = pendingRoutingState.registerAndFlush(returnedId, session, sessions);

                            session.setWorkspacePath(response.workspacePath());
                            session.setCapabilities(response.capabilities());

                            // Replay buffered session.event notifications
                            for (var event : flush.events()) {
                                session.dispatchEvent(event);
                            }
                            // Complete parked request waiters
                            for (var waiter : flush.waiters()) {
                                waiter.complete(session);
                            }
                        } catch (Exception e) {
                            // Roll back: remove the session from the map. The guard is released
                            // by the whenComplete stage below.
                            sessions.remove(returnedId);
                            if (flush != null) {
                                // These waiters were already removed from the pending state, so
                                // nothing else can complete them; fail them here so the parked
                                // requests do not wait forever. Already-completed waiters ignore
                                // this call.
                                var failure = new RuntimeException(
                                        "cloud session registration failed before parked requests could be served", e);
                                for (var waiter : flush.waiters()) {
                                    waiter.completeExceptionally(failure);
                                }
                            }
                            LoggingHelpers.logTiming(LOG, Level.WARNING, e,
                                    "CopilotClient.createCloudSession post-registration setup failed. Elapsed={Elapsed}",
                                    totalNanos);
                            throw e instanceof RuntimeException re ? re : new RuntimeException(e);
                        }

                        LoggingHelpers.logTiming(LOG, Level.FINE,
                                "CopilotClient.createCloudSession complete. Elapsed={Elapsed}, SessionId=" + returnedId,
                                totalNanos);
                        return session;
                    }).whenComplete((created, failure) -> {
                        // Single release point for the guard taken above: runs exactly once
                        // whether the chain succeeded or failed, and leaves the outcome
                        // (value or exception) of the returned future untouched.
                        pendingRoutingState.decrementGuard();
                        if (failure != null) {
                            LoggingHelpers.logTiming(LOG, Level.WARNING, failure,
                                    "CopilotClient.createCloudSession failed. Elapsed={Elapsed}", totalNanos);
                        }
                    });
        } catch (RuntimeException | Error e) {
            // Building the request or starting the RPC threw synchronously, so the
            // whenComplete stage was never attached; release the guard here instead.
            pendingRoutingState.decrementGuard();
            throw e;
        }
    });
}

/**
* Resumes an existing Copilot session.
* <p>
Expand Down
204 changes: 204 additions & 0 deletions java/src/main/java/com/github/copilot/sdk/PendingRoutingState.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
*--------------------------------------------------------------------------------------------*/

package com.github.copilot.sdk;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Logger;

import com.github.copilot.sdk.generated.SessionEvent;

/**
 * Thread-safe state for pending-routing mode used by
 * {@link CopilotClient#createCloudSession}.
 *
 * <p>
 * While one or more cloud {@code session.create} calls are in flight (guard
 * count {@code > 0}), notifications and inbound RPC requests addressed to
 * session ids that are not yet registered are buffered here rather than
 * dropped. Once {@link CopilotClient#createCloudSession} receives the
 * runtime-assigned session id, it calls {@link #registerAndFlush} to atomically
 * insert the session into the sessions map and drain any buffered events and
 * parked request waiters, which the caller then dispatches/completes.
 *
 * <p>
 * All state mutation happens while synchronized on {@code this}. The
 * sessions-map put inside {@link #registerAndFlush} is performed while holding
 * the lock so that the {@link #tryBufferNotification} / {@link #tryParkRequest}
 * check-then-act is free of TOCTOU races.
 *
 * <p>
 * Parked futures are never completed while the lock is held: completing a
 * {@link CompletableFuture} runs its dependent callbacks on the completing
 * thread, and those callbacks must not execute under this object's monitor.
 */
final class PendingRoutingState {

    /** Maximum number of buffered events, and of parked waiters, per session id. */
    static final int BUFFER_LIMIT = 128;

    private static final Logger LOG = Logger.getLogger(PendingRoutingState.class.getName());

    /** Number of cloud {@code session.create} calls currently in flight. */
    private int guardCount = 0;
    /** Buffered session.event notifications keyed by session id. */
    private final Map<String, ArrayDeque<SessionEvent>> pendingEvents = new HashMap<>();
    /**
     * Parked CompletableFutures for inbound RPC requests waiting for a session to
     * be registered, keyed by session id and kept in arrival order.
     */
    private final Map<String, ArrayDeque<CompletableFuture<CopilotSession>>> pendingWaiters = new HashMap<>();

    /** Increment the guard count. Must be matched by {@link #decrementGuard}. */
    synchronized void incrementGuard() {
        guardCount++;
    }

    /**
     * Decrement the guard count. If the count reaches zero, clears all buffered
     * events and completes all parked request waiters exceptionally with a
     * canonical message that is distinct from the overflow-eviction path.
     *
     * <p>
     * The count never goes below zero; a call made while the count is already
     * zero changes nothing and is logged at {@code FINE} because it indicates an
     * unbalanced caller. The waiters are completed after the lock is released.
     */
    void decrementGuard() {
        List<CompletableFuture<CopilotSession>> stale = new ArrayList<>();
        synchronized (this) {
            if (guardCount == 0) {
                // Nothing can be buffered or parked while the count is zero, so there
                // is no state to clear; just record the unbalanced call.
                LOG.fine("decrementGuard called while no pending-routing guard is held; ignoring");
                return;
            }
            guardCount--;
            if (guardCount != 0) {
                return;
            }
            pendingEvents.clear();
            for (var waiters : pendingWaiters.values()) {
                stale.addAll(waiters);
            }
            pendingWaiters.clear();
        }
        if (stale.isEmpty()) {
            return;
        }
        // Use a distinct phrasing from the overflow-eviction path so that
        // debugging can tell the two failure modes apart. Matches the Rust
        // SDK message (PR #1394 commit e0ff254f) and the TS SDK (commit
        // c167bc3e).
        LOG.warning("Pending session routing ended before session was registered; " + "completing " + stale.size()
                + " parked request waiter(s) exceptionally");
        var ex = new RuntimeException("pending session routing ended before session was registered");
        for (var waiter : stale) {
            waiter.completeExceptionally(ex);
        }
    }

    /**
     * Attempt to buffer a {@code session.event} notification for a pending session.
     *
     * <p>
     * The {@code sessions} map is checked inside this synchronized method so that
     * the "session not found → buffer" decision is atomic with
     * {@link #registerAndFlush}'s "put in map → flush buffer" operation. When the
     * per-session buffer already holds {@link #BUFFER_LIMIT} events, the oldest
     * one is dropped to make room.
     *
     * @param sessionId
     *            the session id from the notification
     * @param event
     *            the parsed event to buffer
     * @param sessions
     *            the live sessions map (checked under lock)
     * @return {@code true} if the event was buffered; {@code false} if the session
     *         is already registered (caller should dispatch directly) or pending
     *         routing is inactive (caller should drop)
     */
    synchronized boolean tryBufferNotification(String sessionId, SessionEvent event,
            Map<String, CopilotSession> sessions) {
        if (sessions.containsKey(sessionId)) {
            return false; // session found; caller dispatches directly
        }
        if (guardCount == 0) {
            return false; // no pending routing; drop
        }
        var queue = pendingEvents.computeIfAbsent(sessionId, k -> new ArrayDeque<>());
        if (queue.size() >= BUFFER_LIMIT) {
            queue.pollFirst();
            LOG.warning("Pending session notification buffer full for session " + sessionId + "; dropping oldest");
        }
        queue.addLast(event);
        return true;
    }

    /**
     * Attempt to park an inbound RPC request until the session is registered.
     *
     * <p>
     * Like {@link #tryBufferNotification}, the {@code sessions} map is checked
     * under the lock to avoid TOCTOU races with {@link #registerAndFlush}.
     *
     * @param sessionId
     *            the session id from the request params
     * @param sessions
     *            the live sessions map (checked under lock)
     * @return an already-completed future holding the {@link CopilotSession} if
     *         the session is already registered; otherwise, while pending routing
     *         is active, a future that is parked until the session is registered
     *         (and is completed exceptionally when the guard is dropped or the
     *         waiter is evicted on overflow); or {@code null} if pending routing
     *         is inactive (caller should send error)
     */
    CompletableFuture<CopilotSession> tryParkRequest(String sessionId, Map<String, CopilotSession> sessions) {
        CompletableFuture<CopilotSession> parked;
        CompletableFuture<CopilotSession> evicted = null;
        synchronized (this) {
            CopilotSession existing = sessions.get(sessionId);
            if (existing != null) {
                return CompletableFuture.completedFuture(existing);
            }
            if (guardCount == 0) {
                return null; // no pending; caller sends error
            }
            var waiters = pendingWaiters.computeIfAbsent(sessionId, k -> new ArrayDeque<>());
            if (waiters.size() >= BUFFER_LIMIT) {
                // Cap parked waiters per session. When exceeded, evict the oldest
                // and complete it with a distinct overflow message so the runtime
                // gets an error response rather than hanging on a reply that will
                // never arrive. Matches Rust PR #1394 (commit 491b4427) and TS
                // (commit c167bc3e).
                evicted = waiters.pollFirst();
                LOG.warning("Pending session request waiter buffer full for session " + sessionId + " (limit="
                        + BUFFER_LIMIT + "); evicting oldest request");
            }
            parked = new CompletableFuture<>();
            waiters.addLast(parked);
        }
        if (evicted != null) {
            // Completed after leaving the synchronized block so the evicted
            // request's callbacks do not run under this object's monitor.
            evicted.completeExceptionally(new RuntimeException("pending session buffer overflow"));
        }
        return parked;
    }

    /**
     * Result of {@link #registerAndFlush}: buffered events to dispatch and parked
     * waiters to complete.
     */
    record FlushResult(List<SessionEvent> events, List<CompletableFuture<CopilotSession>> waiters) {
    }

    /**
     * Atomically register a session in the sessions map and drain any buffered
     * events and parked waiters for that session.
     *
     * <p>
     * The {@code sessions.put} is performed inside the lock so that concurrent
     * {@link #tryBufferNotification} / {@link #tryParkRequest} callers that haven't
     * yet acquired the lock will see the session as registered.
     *
     * @param sessionId
     *            the session id to register
     * @param session
     *            the session object
     * @param sessions
     *            the live sessions map; the put is performed under lock
     * @return buffered events and parked waiters to dispatch/complete outside the
     *         lock; both lists are empty when nothing was pending for the id
     */
    synchronized FlushResult registerAndFlush(String sessionId, CopilotSession session,
            Map<String, CopilotSession> sessions) {
        sessions.put(sessionId, session);

        var queue = pendingEvents.remove(sessionId);
        List<SessionEvent> events = queue != null ? new ArrayList<>(queue) : Collections.emptyList();

        var waiters = pendingWaiters.remove(sessionId);
        List<CompletableFuture<CopilotSession>> futures = waiters != null
                ? new ArrayList<>(waiters)
                : Collections.emptyList();

        return new FlushResult(events, futures);
    }
}
Loading