From 6d5510a85b6c37a31a70f659ada95c350b0f44ac Mon Sep 17 00:00:00 2001 From: Timothy Clem Date: Fri, 22 May 2026 19:28:13 -0700 Subject: [PATCH 1/2] python: add create_cloud_session Port create_cloud_session to the Python SDK, following the Rust (PR #1394) and TypeScript (PR #1395) reference implementations. Key additions: - CopilotClient.create_cloud_session(): creates a Mission Control-backed cloud session. The runtime owns the session ID (omitting sessionId from the wire payload); the caller must not set session_id or provider. - Pending-routing infrastructure: session.event notifications and inbound JSON-RPC requests that arrive before the session is fully registered are buffered (up to _PENDING_SESSION_BUFFER_LIMIT = 128) and replayed once the session is ready. _PendingSessionRoutingGuard + _begin_pending_session_routing + _flush_pending_for_session + _resolve_session implement this. - CopilotSession.remote_url property: exposes the remoteUrl returned in the session.create response for cloud sessions. - create_session() now rejects cloud= being set; callers must use create_cloud_session() instead. - test_cloud_session.py: 7 new unit tests covering wire shape, validation, early notification buffering, and parked inbound request handling. - Updated test_client.py: test_create_session_forwards_cloud_options updated to test create_cloud_session instead. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- python/README.md | 34 +++ python/copilot/client.py | 497 +++++++++++++++++++++++++++++++++-- python/copilot/session.py | 11 + python/test_client.py | 10 +- python/test_cloud_session.py | 279 ++++++++++++++++++++ 5 files changed, 801 insertions(+), 30 deletions(-) create mode 100644 python/test_cloud_session.py diff --git a/python/README.md b/python/README.md index 3a504f966..44e409bfe 100644 --- a/python/README.md +++ b/python/README.md @@ -414,6 +414,40 @@ When `streaming=True`: Note: `assistant.message` and `assistant.reasoning` (final events) are always sent regardless of streaming setting. +## Cloud Sessions + +Use `create_cloud_session()` to create a Mission Control–backed cloud session. The +runtime owns the session ID for cloud sessions, so omit `session_id` and `provider` +(the SDK raises `ValueError` if either is set). + +```python +from copilot import CopilotClient, RuntimeConnection +from copilot.client import CloudSessionOptions, CloudSessionRepository + +client = CopilotClient(connection=RuntimeConnection.for_stdio(path="/path/to/cli")) +await client.start() + +session = await client.create_cloud_session( + cloud=CloudSessionOptions( + repository=CloudSessionRepository( + owner="my-org", + name="my-repo", + branch="main", + ) + ), + on_event=lambda event: print(event), +) +print(session.remote_url) # URL of the remote cloud session +``` + +`create_cloud_session()` accepts the same keyword arguments as `create_session()` +(tools, streaming, model, hooks, etc.) with two restrictions: +- `session_id` must not be set (the runtime assigns the ID). +- `provider` must not be set (cloud sessions always use the Mission Control provider). + +Early `session.event` notifications and inbound RPC requests that arrive before the +session is fully registered are buffered and replayed once the session is ready. 
+
 ## Infinite Sessions
 
 By default, sessions use **infinite sessions** which automatically manage context window limits through background compaction and persist state to a workspace directory.
diff --git a/python/copilot/client.py b/python/copilot/client.py
index a52b8711f..02a04cc18 100644
--- a/python/copilot/client.py
+++ b/python/copilot/client.py
@@ -996,6 +996,55 @@ def _extract_transform_callbacks(
     return wire_payload, callbacks
 
 
+_PENDING_SESSION_BUFFER_LIMIT = 128
+"""Upper bound on buffered ``session.event`` notifications per pending session id.
+
+Holds events that arrive between ``session.create`` being sent and the SDK learning
+the runtime-assigned session id from the response (cloud path). Drop-oldest is
+acceptable: cloud handshakes are short and 128 entries is well above realistic
+init/replay bursts. Parked inbound requests are not counted against this limit.
+"""
+
+
+class _PendingSessionRoutingGuard:
+    """Guard that keeps pending-routing mode active for one cloud ``session.create``.
+
+    While alive, notifications and inbound requests addressed to session ids
+    that are not yet registered are held instead of dropped, so events the
+    runtime emits between ``session.create`` and its response are not lost.
+    ``dispose()`` is idempotent. Only the last live guard clears the shared
+    buffers and rejects still-parked request futures, so call it on every
+    path (after registration and on errors) or parked callers will hang.
+    """
+
+    __slots__ = ("_client", "_disposed")
+
+    def __init__(self, client: CopilotClient) -> None:
+        self._client = client
+        self._disposed = False
+
+    def dispose(self) -> None:
+        if self._disposed:  # idempotent: a second call must not decrement the count again
+            return
+        self._disposed = True
+        waiters_to_reject: list[asyncio.Future] = []
+        with self._client._sessions_lock:
+            self._client._pending_routing_count -= 1
+            if self._client._pending_routing_count == 0:  # last live guard
+                self._client._pending_session_events.clear()
+                for session_waiters in self._client._pending_session_waiters.values():
+                    waiters_to_reject.extend(session_waiters)
+                self._client._pending_session_waiters.clear()
+        for future in waiters_to_reject:  # settled after the lock is released
+            if not future.done():
+                future.set_exception(
+                    ValueError(
+                        "Cloud session.create completed without registering this session id; "
+                        "inbound request dropped"
+                    )
+                )
+
+
 class CopilotClient:
     """
     Main client for interacting with the Copilot CLI.
@@ -1181,6 +1230,10 @@ def __init__(
         self._state: _ConnectionState = "disconnected"
         self._sessions: dict[str, CopilotSession] = {}
         self._sessions_lock = threading.Lock()
+        # Pending-routing state for create_cloud_session: guarded by _sessions_lock.
+        self._pending_routing_count: int = 0
+        self._pending_session_events: dict[str, list[SessionEvent]] = {}
+        self._pending_session_waiters: dict[str, list[asyncio.Future[CopilotSession]]] = {}
         self._models_cache: list[ModelInfo] | None = None
         self._models_cache_lock = asyncio.Lock()
         self._lifecycle_handlers: list[SessionLifecycleHandler] = []
@@ -1629,6 +1682,11 @@ async def create_session(
             ...     streaming=True,
             ... )
         """
+        if cloud is not None:
+            raise ValueError(
+                "CopilotClient.create_session does not support cloud sessions; "
+                "use create_cloud_session instead."
+ ) if on_permission_request is not None and not callable(on_permission_request): raise ValueError("on_permission_request must be callable when provided.") if not self._client: @@ -1880,6 +1938,392 @@ async def create_session( ) return session + async def create_cloud_session( + self, + *, + cloud: CloudSessionOptions | None = None, + session_id: str | None = None, + provider: ProviderConfig | None = None, + on_permission_request: _PermissionHandlerFn | None = None, + model: str | None = None, + client_name: str | None = None, + reasoning_effort: ReasoningEffort | None = None, + tools: list[Tool] | None = None, + system_message: SystemMessageConfig | None = None, + available_tools: list[str] | None = None, + excluded_tools: list[str] | None = None, + on_user_input_request: UserInputHandler | None = None, + hooks: SessionHooks | None = None, + working_directory: str | None = None, + enable_session_telemetry: bool | None = None, + model_capabilities: ModelCapabilitiesOverride | None = None, + streaming: bool | None = None, + include_sub_agent_streaming_events: bool | None = None, + mcp_servers: dict[str, MCPServerConfig] | None = None, + custom_agents: list[CustomAgentConfig] | None = None, + default_agent: DefaultAgentConfig | dict[str, Any] | None = None, + agent: str | None = None, + config_dir: str | None = None, + enable_config_discovery: bool | None = None, + skill_directories: list[str] | None = None, + instruction_directories: list[str] | None = None, + disabled_skills: list[str] | None = None, + infinite_sessions: InfiniteSessionConfig | None = None, + on_event: Callable[[SessionEvent], None] | None = None, + commands: list[CommandDefinition] | None = None, + on_elicitation_request: ElicitationHandler | None = None, + on_exit_plan_mode_request: ExitPlanModeHandler | None = None, + on_auto_mode_switch_request: AutoModeSwitchHandler | None = None, + create_session_fs_handler: CreateSessionFsHandler | None = None, + github_token: str | None = None, + 
remote_session: RemoteSessionMode | None = None, + ) -> CopilotSession: + """ + Create a Mission Control–backed cloud session. + + The runtime owns the session ID for cloud sessions: do **not** set + ``session_id`` or ``provider`` on the call (the SDK rejects both with + :class:`ValueError`). The SDK omits ``sessionId`` from the + ``session.create`` wire payload and registers the resulting session + under the id that the runtime returns. + + Any ``session.event`` notifications or inbound JSON-RPC requests + (``userInput.request``, ``exitPlanMode.request``, etc.) that arrive + between sending ``session.create`` and receiving its response are + buffered (bounded, drop-oldest, up to + ``_PENDING_SESSION_BUFFER_LIMIT`` per id) and replayed once the + returned session id is registered, so early events are not lost. + + **Known limitation:** inbound ``sessionFs.*`` requests (the generated + client-session API handlers) are not pending-buffered. In practice the + runtime does not initiate ``sessionFs.*`` calls before the + ``session.create`` response, so this is theoretical. + + Args: + cloud: Required. Cloud session options (repository, branch, etc.). + session_id: Must be ``None``; the runtime assigns the id. + provider: Must be ``None``; cloud sessions use the runtime's provider. + on_permission_request: Handler for permission requests. + model: Model to use. + client_name: Client name for identification. + reasoning_effort: Reasoning effort level. + tools: Custom tools to register. + system_message: System message configuration. + available_tools: Allowlist of tools. + excluded_tools: Tools to disable. + on_user_input_request: Handler for user input requests. + hooks: Lifecycle hooks. + working_directory: Working directory. + enable_session_telemetry: Enable/disable session telemetry. + model_capabilities: Model capabilities override. + streaming: Enable streaming responses. + include_sub_agent_streaming_events: Include sub-agent streaming events. 
+ mcp_servers: MCP server configurations. + custom_agents: Custom agent configurations. + default_agent: Default agent configuration. + agent: Agent to use. + config_dir: Configuration directory override. + enable_config_discovery: Auto-discover MCP/skill config from cwd. + skill_directories: Directories to search for skills. + instruction_directories: Additional instruction file directories. + disabled_skills: Skills to disable. + infinite_sessions: Infinite session configuration. + on_event: Callback for session events. + commands: Commands to register. + on_elicitation_request: Handler for elicitation requests. + on_exit_plan_mode_request: Handler for exit-plan-mode requests. + on_auto_mode_switch_request: Handler for auto-mode-switch requests. + create_session_fs_handler: Session filesystem handler factory. + github_token: Per-session GitHub token. + remote_session: Remote session mode. + + Returns: + A :class:`CopilotSession` for the cloud session, with its + ``session_id`` set to the runtime-assigned id. + + Raises: + ValueError: If ``cloud`` is ``None``, ``session_id`` is set, or + ``provider`` is set. + + Example: + >>> session = await client.create_cloud_session( + ... cloud=CloudSessionOptions( + ... repository=CloudSessionRepository( + ... owner="github", name="copilot-sdk", branch="main" + ... ) + ... ), + ... ) + >>> print(session.session_id) # runtime-assigned id + """ + if cloud is None: + raise ValueError( + "create_cloud_session requires cloud to be set; " + "use CloudSessionOptions to configure the repository." + ) + if session_id is not None: + raise ValueError( + "create_cloud_session does not accept session_id; the runtime assigns one." + ) + if provider is not None: + raise ValueError( + "create_cloud_session does not accept provider; " + "cloud sessions use the runtime's provider." 
+ ) + + if not self._client: + await self.start() + + tool_defs = [] + if tools: + for tool in tools: + definition: dict[str, Any] = { + "name": tool.name, + "description": tool.description, + } + if tool.parameters: + definition["parameters"] = tool.parameters + if tool.overrides_built_in_tool: + definition["overridesBuiltInTool"] = True + if tool.skip_permission: + definition["skipPermission"] = True + tool_defs.append(definition) + + payload: dict[str, Any] = {} + if model: + payload["model"] = model + if client_name: + payload["clientName"] = client_name + if reasoning_effort: + payload["reasoningEffort"] = reasoning_effort + if tool_defs: + payload["tools"] = tool_defs + + wire_system_message, transform_callbacks = _extract_transform_callbacks(system_message) + if wire_system_message: + payload["systemMessage"] = wire_system_message + + if available_tools is not None: + payload["availableTools"] = available_tools + if excluded_tools is not None: + payload["excludedTools"] = excluded_tools + + payload["requestPermission"] = bool(on_permission_request) + if on_user_input_request: + payload["requestUserInput"] = True + payload["requestElicitation"] = bool(on_elicitation_request) + payload["requestExitPlanMode"] = bool(on_exit_plan_mode_request) + payload["requestAutoModeSwitch"] = bool(on_auto_mode_switch_request) + + if commands: + payload["commands"] = [ + {"name": cmd.name, "description": cmd.description} for cmd in commands + ] + if hooks and any(hooks.values()): + payload["hooks"] = True + if github_token is not None: + payload["gitHubToken"] = github_token + if remote_session is not None: + payload["remoteSession"] = remote_session.value + + # sessionId intentionally omitted: the runtime assigns the id for cloud sessions. 
+ payload["cloud"] = _cloud_session_options_to_dict(cloud) + + if working_directory: + payload["workingDirectory"] = working_directory + if streaming is not None: + payload["streaming"] = streaming + payload["includeSubAgentStreamingEvents"] = ( + include_sub_agent_streaming_events + if include_sub_agent_streaming_events is not None + else True + ) + if enable_session_telemetry is not None: + payload["enableSessionTelemetry"] = enable_session_telemetry + if model_capabilities: + payload["modelCapabilities"] = _capabilities_to_dict(model_capabilities) + if mcp_servers: + payload["mcpServers"] = _mcp_servers_to_wire(mcp_servers) + payload["envValueMode"] = "direct" + if custom_agents: + payload["customAgents"] = [ + self._convert_custom_agent_to_wire_format(a) for a in custom_agents + ] + if default_agent: + payload["defaultAgent"] = self._convert_default_agent_to_wire_format(default_agent) + if agent: + payload["agent"] = agent + if config_dir: + payload["configDir"] = config_dir + if enable_config_discovery is not None: + payload["enableConfigDiscovery"] = enable_config_discovery + if skill_directories: + payload["skillDirectories"] = skill_directories + if instruction_directories is not None: + payload["instructionDirectories"] = instruction_directories + if disabled_skills: + payload["disabledSkills"] = disabled_skills + if infinite_sessions: + wire_config: dict[str, Any] = {} + if "enabled" in infinite_sessions: + wire_config["enabled"] = infinite_sessions["enabled"] + if "background_compaction_threshold" in infinite_sessions: + wire_config["backgroundCompactionThreshold"] = infinite_sessions[ + "background_compaction_threshold" + ] + if "buffer_exhaustion_threshold" in infinite_sessions: + wire_config["bufferExhaustionThreshold"] = infinite_sessions[ + "buffer_exhaustion_threshold" + ] + payload["infiniteSessions"] = wire_config + + if not self._client: + raise RuntimeError("Client not connected") + + trace_ctx = get_trace_context() + payload.update(trace_ctx) 
+ + total_start = time.perf_counter() + guard = self._begin_pending_session_routing() + + try: + rpc_start = time.perf_counter() + response = await self._client.request("session.create", payload) + log_timing( + logger, + logging.DEBUG, + "CopilotClient.create_cloud_session session creation request completed", + rpc_start, + ) + except BaseException: + guard.dispose() + raise + + returned_session_id = response.get("sessionId") + if not isinstance(returned_session_id, str) or not returned_session_id: + logger.warning( + "Cloud session.create response missing sessionId; runtime session may leak" + ) + guard.dispose() + raise ValueError( + "Cloud session.create response did not include a sessionId; " + "cannot register session." + ) + + session = CopilotSession(returned_session_id, self._client, workspace_path=None) + session._register_tools(tools) + session._register_commands(commands) + session._register_permission_handler(on_permission_request) + if on_user_input_request: + session._register_user_input_handler(on_user_input_request) + if on_elicitation_request: + session._register_elicitation_handler(on_elicitation_request) + if on_exit_plan_mode_request: + session._register_exit_plan_mode_handler(on_exit_plan_mode_request) + if on_auto_mode_switch_request: + session._register_auto_mode_switch_handler(on_auto_mode_switch_request) + if hooks: + session._register_hooks(hooks) + if transform_callbacks: + session._register_transform_callbacks(transform_callbacks) + if on_event: + session.on(on_event) + + try: + if self._session_fs_config: + if create_session_fs_handler is None: + raise ValueError( + "create_session_fs_handler is required in session config when " + "session_fs is enabled in client options." 
+                    )
+                fs_provider: SessionFsProvider = create_session_fs_handler(session)
+                caps = self._session_fs_config.get("capabilities")
+                if caps and caps.get("sqlite"):
+                    from .session_fs_provider import SessionFsSqliteProvider
+
+                    if not isinstance(fs_provider, SessionFsSqliteProvider):
+                        raise ValueError(
+                            "SessionFs capabilities declare SQLite support but the provider "
+                            "does not implement SessionFsSqliteProvider"
+                        )
+                session._client_session_apis.session_fs = create_session_fs_adapter(fs_provider)
+
+            with self._sessions_lock:
+                self._sessions[returned_session_id] = session
+
+            session._workspace_path = response.get("workspacePath")
+            session._remote_url = response.get("remoteUrl")
+            capabilities = response.get("capabilities")
+            session._set_capabilities(capabilities)
+            self._flush_pending_for_session(returned_session_id, session)
+        except BaseException:
+            with self._sessions_lock:
+                self._sessions.pop(returned_session_id, None)
+            guard.dispose()
+            raise
+
+        guard.dispose()
+        log_timing(
+            logger,
+            logging.DEBUG,
+            "CopilotClient.create_cloud_session complete",
+            total_start,
+            session_id=returned_session_id,
+        )
+        return session
+
+    def _begin_pending_session_routing(self) -> _PendingSessionRoutingGuard:
+        """Enter pending-routing mode; return a guard that exits it on dispose().
+
+        While any guard is alive, ``session.event`` notifications for unregistered ids are
+        buffered (bounded, drop-oldest) and inbound requests are parked on futures. Disposing
+        the last guard drops what is still buffered and rejects the parked futures.
+        """
+        with self._sessions_lock:
+            self._pending_routing_count += 1
+        return _PendingSessionRoutingGuard(self)
+
+    def _flush_pending_for_session(self, session_id: str, session: CopilotSession) -> None:
+        """Replay buffered events, then settle parked request futures for ``session_id``.
+
+        Runs after the session is registered and before the guard is released. The waiters
+        are rejected if replay raises: they are already popped, so the guard cannot.
+        """
+        with self._sessions_lock:
+            events_to_dispatch = self._pending_session_events.pop(session_id, [])
+            waiters_to_resolve = self._pending_session_waiters.pop(session_id, [])
+        try:
+            for event in events_to_dispatch:
+                session._dispatch_event(event)
+        except BaseException:
+            for future in waiters_to_resolve:
+                if not future.done():
+                    future.set_exception(ValueError(f"unknown session {session_id}"))
+            raise
+        for future in waiters_to_resolve:
+            if not future.done():
+                future.set_result(session)
+
+    async def _resolve_session(self, session_id: str) -> CopilotSession:
+        """Look up the session for an inbound request.
+
+        If the session is not yet registered but a cloud ``session.create`` is
+        in flight (pending-routing mode is active), park the caller on a
+        :class:`asyncio.Future` until the session is registered or pending mode
+        ends. Otherwise raise :class:`ValueError` immediately.
+        """
+        future: asyncio.Future[CopilotSession] | None = None
+        with self._sessions_lock:
+            session = self._sessions.get(session_id)
+            if session is None and self._pending_routing_count > 0:
+                loop = asyncio.get_running_loop()
+                future = loop.create_future()
+                self._pending_session_waiters.setdefault(session_id, []).append(future)
+        if session is not None:
+            return session
+        if future is not None:
+            return await future
+        raise ValueError(f"unknown session {session_id}")
+
     async def resume_session(
         self,
         session_id: str,
@@ -2969,6 +3413,16 @@ def handle_notification(method: str, params: dict):
                 event = session_event_from_dict(event_dict)
                 with self._sessions_lock:
                     session = self._sessions.get(session_id)
+                    if session is None and self._pending_routing_count > 0:
+                        buf = self._pending_session_events.setdefault(session_id, [])
+                        if len(buf) >= _PENDING_SESSION_BUFFER_LIMIT:
+                            buf.pop(0)
+                            logger.warning(
+                                "pending session event buffer full for %s; dropping oldest",
+                                session_id,
+                            )
+
buf.append(event) + return if session: session._dispatch_event(event) elif method == "session.lifecycle": @@ -3087,7 +3541,18 @@ def handle_notification(method: str, params: dict): event_dict = params["event"] # Convert dict to SessionEvent object event = session_event_from_dict(event_dict) - session = self._sessions.get(session_id) + with self._sessions_lock: + session = self._sessions.get(session_id) + if session is None and self._pending_routing_count > 0: + buf = self._pending_session_events.setdefault(session_id, []) + if len(buf) >= _PENDING_SESSION_BUFFER_LIMIT: + buf.pop(0) + logger.warning( + "pending session event buffer full for %s; dropping oldest", + session_id, + ) + buf.append(event) + return if session: session._dispatch_event(event) elif method == "session.lifecycle": @@ -3153,11 +3618,7 @@ async def _handle_user_input_request(self, params: dict) -> dict: if not session_id or not question: raise ValueError("invalid user input request payload") - with self._sessions_lock: - session = self._sessions.get(session_id) - if not session: - raise ValueError(f"unknown session {session_id}") - + session = await self._resolve_session(session_id) result = await session._handle_user_input_request(params) return {"answer": result["answer"], "wasFreeform": result["wasFreeform"]} @@ -3173,11 +3634,7 @@ async def _handle_exit_plan_mode_request(self, params: dict) -> dict: if not isinstance(actions, list) or not isinstance(recommended_action, str): raise ValueError("invalid exit plan mode request payload") - with self._sessions_lock: - session = self._sessions.get(session_id) - if not session: - raise ValueError(f"unknown session {session_id}") - + session = await self._resolve_session(session_id) return dict(await session._handle_exit_plan_mode_request(params)) async def _handle_auto_mode_switch_request(self, params: dict) -> dict: @@ -3186,11 +3643,7 @@ async def _handle_auto_mode_switch_request(self, params: dict) -> dict: if not session_id: raise 
ValueError("invalid auto mode switch request payload") - with self._sessions_lock: - session = self._sessions.get(session_id) - if not session: - raise ValueError(f"unknown session {session_id}") - + session = await self._resolve_session(session_id) response = await session._handle_auto_mode_switch_request(params) return {"response": response} @@ -3214,11 +3667,7 @@ async def _handle_hooks_invoke(self, params: dict) -> dict: if not session_id or not hook_type: raise ValueError("invalid hooks invoke payload") - with self._sessions_lock: - session = self._sessions.get(session_id) - if not session: - raise ValueError(f"unknown session {session_id}") - + session = await self._resolve_session(session_id) output = await session._handle_hooks_invoke(hook_type, input_data) return {"output": output} @@ -3230,9 +3679,5 @@ async def _handle_system_message_transform(self, params: dict) -> dict: if not session_id or not sections: raise ValueError("invalid systemMessage.transform payload") - with self._sessions_lock: - session = self._sessions.get(session_id) - if not session: - raise ValueError(f"unknown session {session_id}") - + session = await self._resolve_session(session_id) return await session._handle_system_message_transform(sections) diff --git a/python/copilot/session.py b/python/copilot/session.py index c775ef58e..7bee0530a 100644 --- a/python/copilot/session.py +++ b/python/copilot/session.py @@ -1001,6 +1001,7 @@ def __init__( self.session_id = session_id self._client = client self._workspace_path = os.fsdecode(workspace_path) if workspace_path is not None else None + self._remote_url: str | None = None self._event_handlers: set[Callable[[SessionEvent], None]] = set() self._event_handlers_lock = threading.Lock() self._tool_handlers: dict[str, ToolHandler] = {} @@ -1069,6 +1070,16 @@ def workspace_path(self) -> pathlib.Path | None: # attribute to do the conversion, or just do the conversion lazily via a getter. 
return pathlib.Path(self._workspace_path) if self._workspace_path else None + @property + def remote_url(self) -> str | None: + """Remote URL for the Mission Control–backed cloud session. + + Set from the ``remoteUrl`` field in the ``session.create`` response for + cloud sessions created via :meth:`CopilotClient.create_cloud_session`. + ``None`` for regular local sessions. + """ + return self._remote_url + async def send( self, prompt: str, diff --git a/python/test_client.py b/python/test_client.py index 14320b3a2..9869d9967 100644 --- a/python/test_client.py +++ b/python/test_client.py @@ -63,7 +63,7 @@ async def test_resume_session_allows_none_permission_handler(self): class TestCreateSessionConfig: @pytest.mark.asyncio - async def test_create_session_forwards_cloud_options(self): + async def test_create_cloud_session_forwards_cloud_options(self): client = CopilotClient(connection=RuntimeConnection.for_stdio(path=CLI_PATH)) await client.start() try: @@ -72,12 +72,11 @@ async def test_create_session_forwards_cloud_options(self): async def mock_request(method, params): captured[method] = params if method == "session.create": - return {"sessionId": params["sessionId"], "workspacePath": None} + return {"sessionId": "cloud-session-id", "workspacePath": None} return {} client._client.request = mock_request - await client.create_session( - on_permission_request=PermissionHandler.approve_all, + await client.create_cloud_session( cloud=CloudSessionOptions( repository=CloudSessionRepository( owner="github", @@ -87,6 +86,9 @@ async def mock_request(method, params): ), ) + assert "sessionId" not in captured["session.create"], ( + "cloud sessions must not send a sessionId in the wire payload" + ) assert captured["session.create"]["cloud"] == { "repository": { "owner": "github", diff --git a/python/test_cloud_session.py b/python/test_cloud_session.py new file mode 100644 index 000000000..f03164a05 --- /dev/null +++ b/python/test_cloud_session.py @@ -0,0 +1,279 @@ +""" +Tests 
for CopilotClient.create_cloud_session. + +Ports the spirit of the Rust integration tests in rust/tests/session_test.rs. +""" + +from __future__ import annotations + +import asyncio +from datetime import datetime +from uuid import uuid4 + +import pytest + +from copilot import CopilotClient, RuntimeConnection +from copilot.client import CloudSessionOptions, CloudSessionRepository +from copilot.session import ProviderConfig, UserInputRequest, UserInputResponse +from e2e.testharness import CLI_PATH + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _cloud_config() -> dict: + return dict( + cloud=CloudSessionOptions( + repository=CloudSessionRepository(owner="github", name="copilot-sdk", branch="main") + ) + ) + + +def _make_event_dict(event_type: str = "session.buffered_test", data: dict | None = None) -> dict: + """Build a minimal valid session-event dict for injection in tests.""" + return { + "id": str(uuid4()), + "timestamp": datetime.now().isoformat(), + "parentId": None, + "type": event_type, + "data": data or {}, + } + + +# --------------------------------------------------------------------------- +# Test 1: create_session rejects cloud config +# --------------------------------------------------------------------------- + + +class TestCreateSessionRejectsCloud: + @pytest.mark.asyncio + async def test_create_session_rejects_cloud_config(self): + """create_session must raise ValueError mentioning create_cloud_session.""" + client = CopilotClient(connection=RuntimeConnection.for_stdio(path=CLI_PATH)) + await client.start() + try: + with pytest.raises(ValueError, match="create_cloud_session"): + await client.create_session(**_cloud_config()) + finally: + await client.force_stop() + + +# --------------------------------------------------------------------------- +# Test 2: wire shape — sessionId omitted, cloud set, returned id used +# 
--------------------------------------------------------------------------- + + +class TestCreateCloudSessionWireShape: + @pytest.mark.asyncio + async def test_sends_cloud_without_session_id(self): + """session.create must carry cloud but omit sessionId; the response id is used.""" + client = CopilotClient(connection=RuntimeConnection.for_stdio(path=CLI_PATH)) + await client.start() + try: + captured: dict = {} + + async def mock_request(method, params): + captured[method] = params + if method == "session.create": + return { + "sessionId": "remote-cloud-session", + "remoteUrl": "https://copilot.example.test/agents/remote-cloud-session", + "capabilities": {"ui": {"elicitation": True}}, + } + return {} + + client._client.request = mock_request + session = await client.create_cloud_session(**_cloud_config()) + + wire = captured["session.create"] + assert "sessionId" not in wire, "sessionId must be omitted from cloud create" + assert wire["cloud"]["repository"]["owner"] == "github" + assert wire["cloud"]["repository"]["name"] == "copilot-sdk" + assert wire["cloud"]["repository"]["branch"] == "main" + assert "provider" not in wire + + assert session.session_id == "remote-cloud-session" + assert session.remote_url == "https://copilot.example.test/agents/remote-cloud-session" + assert session.capabilities.get("ui", {}).get("elicitation") is True + finally: + await client.force_stop() + + +# --------------------------------------------------------------------------- +# Test 3: rejects caller-provided session_id +# --------------------------------------------------------------------------- + + +class TestCreateCloudSessionRejectsSessionId: + @pytest.mark.asyncio + async def test_rejects_caller_session_id(self): + """Passing session_id must raise ValueError naming session_id.""" + client = CopilotClient(connection=RuntimeConnection.for_stdio(path=CLI_PATH)) + with pytest.raises(ValueError, match="session_id"): + await client.create_cloud_session(**_cloud_config(), 
session_id="caller-id") + + +# --------------------------------------------------------------------------- +# Test 4: rejects caller-provided provider +# --------------------------------------------------------------------------- + + +class TestCreateCloudSessionRejectsProvider: + @pytest.mark.asyncio + async def test_rejects_caller_provider(self): + """Passing provider must raise ValueError naming provider.""" + client = CopilotClient(connection=RuntimeConnection.for_stdio(path=CLI_PATH)) + with pytest.raises(ValueError, match="provider"): + await client.create_cloud_session( + **_cloud_config(), + provider=ProviderConfig(type="openai", base_url="https://api.example.test/v1"), + ) + + +# --------------------------------------------------------------------------- +# Test 5: requires cloud +# --------------------------------------------------------------------------- + + +class TestCreateCloudSessionRequiresCloud: + @pytest.mark.asyncio + async def test_requires_cloud(self): + """Omitting cloud (or passing None) must raise ValueError mentioning cloud.""" + client = CopilotClient(connection=RuntimeConnection.for_stdio(path=CLI_PATH)) + with pytest.raises(ValueError, match="cloud"): + await client.create_cloud_session() + + +# --------------------------------------------------------------------------- +# Test 6: buffers early session.event notifications +# --------------------------------------------------------------------------- + + +class TestCreateCloudSessionBuffersEarlyNotifications: + @pytest.mark.asyncio + async def test_early_notifications_dispatched_after_registration(self): + """session.event notifications arriving before registration are buffered and replayed.""" + client = CopilotClient(connection=RuntimeConnection.for_stdio(path=CLI_PATH)) + await client.start() + try: + create_response_gate: asyncio.Future[dict] = asyncio.get_event_loop().create_future() + + async def mock_request(method, params): + if method == "session.create": + return await 
create_response_gate + return {} + + client._client.request = mock_request + + session_id = "remote-cloud-session" + received_events: list = [] + + create_task = asyncio.ensure_future( + client.create_cloud_session( + **_cloud_config(), + on_event=lambda e: received_events.append(e), + ) + ) + + # Yield control so create_cloud_session enters pending-routing mode. + await asyncio.sleep(0) + await asyncio.sleep(0) + + # Inject a session.event notification while the create is in flight. + notification_handler = client._client.notification_handler + assert notification_handler is not None, "notification handler not registered" + notification_handler( + "session.event", + { + "sessionId": session_id, + "event": _make_event_dict(), + }, + ) + + # Verify it is buffered (not yet dispatched — session not registered yet). + await asyncio.sleep(0) + assert not received_events, "event dispatched before session was registered" + + # Allow session.create to respond; this registers the session. + create_response_gate.set_result({"sessionId": session_id}) + await asyncio.wait_for(create_task, timeout=5.0) + + # Give the event loop a tick to flush the buffered event. + await asyncio.sleep(0) + + assert len(received_events) == 1, ( + f"expected 1 buffered event to be replayed, got {len(received_events)}" + ) + # Our synthetic event uses an unknown type; just confirm it was dispatched. 
+ assert received_events[0].raw_type == "session.buffered_test" + finally: + await client.force_stop() + + +# --------------------------------------------------------------------------- +# Test 7: parks inbound requests until registration +# --------------------------------------------------------------------------- + + +class TestCreateCloudSessionParksInboundRequests: + @pytest.mark.asyncio + async def test_parked_user_input_resolves_after_registration(self): + """userInput.request that arrives before registration is parked, then resolved.""" + answered: list[str] = [] + + async def color_picker(request: UserInputRequest, context: dict) -> UserInputResponse: + answered.append(request["question"]) + return UserInputResponse(answer="blue", wasFreeform=True) + + client = CopilotClient(connection=RuntimeConnection.for_stdio(path=CLI_PATH)) + await client.start() + try: + create_response_gate: asyncio.Future[dict] = asyncio.get_event_loop().create_future() + + async def mock_request(method, params): + if method == "session.create": + return await create_response_gate + return {} + + client._client.request = mock_request + + session_id = "remote-cloud-session" + create_task = asyncio.ensure_future( + client.create_cloud_session(**_cloud_config(), on_user_input_request=color_picker) + ) + + # Yield so pending-routing mode is entered. + await asyncio.sleep(0) + await asyncio.sleep(0) + + # Dispatch a userInput.request while the create is in flight. + user_input_handler = client._client.request_handlers.get("userInput.request") + assert user_input_handler is not None, "userInput.request handler not registered" + + input_task = asyncio.ensure_future( + user_input_handler( + { + "sessionId": session_id, + "question": "Pick a color", + "choices": ["red", "blue"], + "allowFreeform": True, + } + ) + ) + + # Yield to let the handler park on the pending future. 
+ await asyncio.sleep(0) + assert not input_task.done(), "handler should be parked waiting for session" + + # Now let the create response arrive; this registers the session. + create_response_gate.set_result({"sessionId": session_id}) + await asyncio.wait_for(create_task, timeout=5.0) + + # The parked userInput handler should now complete. + result = await asyncio.wait_for(input_task, timeout=5.0) + assert result["answer"] == "blue" + assert result["wasFreeform"] is True + assert answered == ["Pick a color"] + finally: + await client.force_stop() From adff7fb51116e9f1d1becebfa8bcfbfd6600be56 Mon Sep 17 00:00:00 2001 From: Timothy Clem Date: Fri, 22 May 2026 19:51:54 -0700 Subject: [PATCH 2/2] python: emit JSON-RPC error responses for dropped pending requests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two symmetric fixes ported from Rust SDK (commits 491b4427, e0ff254f) and TS SDK (commit c167bc3e, PR #1395): 1. Pending request buffer overflow: when _resolve_session would push the waiter list past _PENDING_SESSION_BUFFER_LIMIT (128), evict the oldest future and call set_exception(ValueError('pending session buffer overflow')). The JSON-RPC dispatch layer (_dispatch_request) catches the raised exception and sends a proper -32603 error response so the runtime doesn't hang on the request id. 2. Guard drop without registration: _PendingSessionRoutingGuard.dispose() already called set_exception on parked waiters, but used a generic message. Changed to the cross-SDK canonical message 'pending session routing ended before session was registered' so the two failure paths are distinguishable in logs and error messages. Notifications retain warn-and-drop-oldest behaviour (no response needed — they carry no id). Add two unit tests: - TestPendingRequestBufferOverflow: 129 concurrent waiters; oldest raises with the overflow message; remaining 128 resolve after registration. 
- TestPendingRequestGuardDropWithoutRegistration: session.create fails; parked request raises with the routing-ended message. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- python/copilot/client.py | 20 ++++-- python/test_cloud_session.py | 136 ++++++++++++++++++++++++++++++++++- 2 files changed, 150 insertions(+), 6 deletions(-) diff --git a/python/copilot/client.py b/python/copilot/client.py index 02a04cc18..354ff4d65 100644 --- a/python/copilot/client.py +++ b/python/copilot/client.py @@ -1037,11 +1037,11 @@ def dispose(self) -> None: self._client._pending_session_waiters.clear() for future in waiters_to_reject: if not future.done(): + # Distinct phrasing from the overflow-eviction path so debugging + # can tell the two cases apart. Matches Rust SDK (commit e0ff254f) + # and TS SDK (commit c167bc3e). future.set_exception( - ValueError( - "Cloud session.create completed without registering this session id; " - "inbound request dropped" - ) + ValueError("pending session routing ended before session was registered") ) @@ -2312,12 +2312,22 @@ async def _resolve_session(self, session_id: str) -> CopilotSession: ends. Otherwise raise :class:`ValueError` immediately. """ future: asyncio.Future[CopilotSession] | None = None + evicted: asyncio.Future[CopilotSession] | None = None with self._sessions_lock: session = self._sessions.get(session_id) if session is None and self._pending_routing_count > 0: loop = asyncio.get_running_loop() future = loop.create_future() - self._pending_session_waiters.setdefault(session_id, []).append(future) + waiters = self._pending_session_waiters.setdefault(session_id, []) + # Cap parked waiters at the same limit as notifications. When exceeded, + # reject the oldest so the runtime gets a JSON-RPC error response + # (code -32603) rather than hanging on the request id until timeout. + # Matches Rust SDK fix (commit 491b4427) and TS SDK (commit c167bc3e). 
+ if len(waiters) >= _PENDING_SESSION_BUFFER_LIMIT: + evicted = waiters.pop(0) + waiters.append(future) + if evicted is not None and not evicted.done(): + evicted.set_exception(ValueError("pending session buffer overflow")) if session is not None: return session if future is not None: diff --git a/python/test_cloud_session.py b/python/test_cloud_session.py index f03164a05..a0b0ff87f 100644 --- a/python/test_cloud_session.py +++ b/python/test_cloud_session.py @@ -13,7 +13,11 @@ import pytest from copilot import CopilotClient, RuntimeConnection -from copilot.client import CloudSessionOptions, CloudSessionRepository +from copilot.client import ( + _PENDING_SESSION_BUFFER_LIMIT, + CloudSessionOptions, + CloudSessionRepository, +) from copilot.session import ProviderConfig, UserInputRequest, UserInputResponse from e2e.testharness import CLI_PATH @@ -277,3 +281,133 @@ async def mock_request(method, params): assert answered == ["Pick a color"] finally: await client.force_stop() + + +# --------------------------------------------------------------------------- +# Test 8: pending request buffer overflow emits an error (not silent drop) +# --------------------------------------------------------------------------- + + +class TestPendingRequestBufferOverflow: + @pytest.mark.asyncio + async def test_oldest_waiter_rejected_on_overflow(self): + """When the parked-request buffer is full, the oldest waiter is rejected. + + The rejection causes the JSON-RPC dispatch layer to send a JSON-RPC error + response (code -32603) rather than silently hanging the runtime on that + request id. The remaining _PENDING_SESSION_BUFFER_LIMIT waiters resolve + normally once the session is registered. + """ + session_id = "overflow-session" + client = CopilotClient(connection=RuntimeConnection.for_stdio(path=CLI_PATH)) + await client.start() + try: + # Enter pending-routing mode manually so _resolve_session parks futures. 
+ guard = client._begin_pending_session_routing() + + total = _PENDING_SESSION_BUFFER_LIMIT + 1 # 129 concurrent waiters + tasks = [ + asyncio.ensure_future(client._resolve_session(session_id)) + for _ in range(total) + ] + + # Yield so all _resolve_session calls park on futures. + await asyncio.sleep(0) + await asyncio.sleep(0) + + # The oldest (tasks[0]) should now be rejected with the overflow message. + assert tasks[0].done(), "oldest waiter should have been rejected synchronously" + with pytest.raises(ValueError, match="pending session buffer overflow"): + tasks[0].result() + + # The remaining 128 are still parked. + assert all(not t.done() for t in tasks[1:]), "remaining waiters should still be parked" + + # Register the session so the remaining waiters resolve. + from copilot.session import CopilotSession + + session = CopilotSession(session_id, client._client, workspace_path=None) + with client._sessions_lock: + client._sessions[session_id] = session + client._flush_pending_for_session(session_id, session) + guard.dispose() + + # Let the event loop settle. + await asyncio.sleep(0) + + resolved_sessions = await asyncio.gather(*tasks[1:], return_exceptions=True) + assert all(s is session for s in resolved_sessions), ( + "all remaining parked waiters should resolve to the registered session" + ) + finally: + await client.force_stop() + + +# --------------------------------------------------------------------------- +# Test 9: guard drop without registration rejects parked requests +# --------------------------------------------------------------------------- + + +class TestPendingRequestGuardDropWithoutRegistration: + @pytest.mark.asyncio + async def test_parked_request_rejected_when_create_fails(self): + """When session.create fails, parked request waiters get a distinct error. 
+ + The error message "pending session routing ended before session was registered" + must differ from the overflow message so the two failure modes are + distinguishable in logs and the runtime gets a proper JSON-RPC error + response rather than hanging. + """ + client = CopilotClient(connection=RuntimeConnection.for_stdio(path=CLI_PATH)) + await client.start() + try: + create_response_gate: asyncio.Future[dict] = asyncio.get_event_loop().create_future() + + async def mock_request(method, params): + if method == "session.create": + return await create_response_gate + return {} + + client._client.request = mock_request + + session_id = "failing-cloud-session" + create_task = asyncio.ensure_future( + client.create_cloud_session(**_cloud_config()) + ) + + # Yield so create_cloud_session enters pending-routing mode. + await asyncio.sleep(0) + await asyncio.sleep(0) + + # Park an inbound request while the create is in flight. + user_input_handler = client._client.request_handlers.get("userInput.request") + assert user_input_handler is not None, "userInput.request handler not registered" + + input_task = asyncio.ensure_future( + user_input_handler( + { + "sessionId": session_id, + "question": "Pick a color", + "choices": ["red", "blue"], + "allowFreeform": True, + } + ) + ) + + await asyncio.sleep(0) + assert not input_task.done(), "handler should be parked waiting for session" + + # Make session.create fail; this causes create_cloud_session to call + # guard.dispose() without registering any session id. + create_response_gate.set_exception(RuntimeError("simulated session.create failure")) + with pytest.raises(RuntimeError, match="simulated session.create failure"): + await asyncio.wait_for(create_task, timeout=5.0) + + # The parked waiter should now be rejected with the routing-ended message. 
+ await asyncio.sleep(0) + assert input_task.done(), "parked waiter should be rejected after guard drop" + expected_msg = "pending session routing ended before session was registered" + with pytest.raises(ValueError, match=expected_msg): + await input_task + finally: + await client.force_stop()