From 932e380bb391da25367e1e824a60908e337a5bb7 Mon Sep 17 00:00:00 2001 From: "helen@cloud" Date: Mon, 23 Mar 2026 07:09:34 -0400 Subject: [PATCH 1/2] refactor(error): centralize upstream taxonomy and jsonrpc translators (#274) --- src/opencode_a2a/error_taxonomy.py | 95 +++++ src/opencode_a2a/execution/upstream_errors.py | 88 +--- src/opencode_a2a/jsonrpc/application.py | 383 +++++++----------- src/opencode_a2a/jsonrpc/error_mapping.py | 154 +++++++ tests/jsonrpc/test_error_mapping.py | 84 ++++ 5 files changed, 477 insertions(+), 327 deletions(-) create mode 100644 src/opencode_a2a/error_taxonomy.py create mode 100644 src/opencode_a2a/jsonrpc/error_mapping.py create mode 100644 tests/jsonrpc/test_error_mapping.py diff --git a/src/opencode_a2a/error_taxonomy.py b/src/opencode_a2a/error_taxonomy.py new file mode 100644 index 0000000..7666810 --- /dev/null +++ b/src/opencode_a2a/error_taxonomy.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +from dataclasses import dataclass + +import httpx +from a2a.types import TaskState + + +@dataclass(frozen=True) +class UpstreamHTTPErrorProfile: + error_type: str + state: TaskState + default_message: str + + +_UPSTREAM_HTTP_ERROR_PROFILE_BY_STATUS: dict[int, UpstreamHTTPErrorProfile] = { + 400: UpstreamHTTPErrorProfile( + "UPSTREAM_BAD_REQUEST", + TaskState.failed, + "OpenCode rejected the request due to invalid input", + ), + 401: UpstreamHTTPErrorProfile( + "UPSTREAM_UNAUTHORIZED", + TaskState.auth_required, + "OpenCode rejected the request due to authentication failure", + ), + 403: UpstreamHTTPErrorProfile( + "UPSTREAM_PERMISSION_DENIED", + TaskState.failed, + "OpenCode rejected the request due to insufficient permissions", + ), + 404: UpstreamHTTPErrorProfile( + "UPSTREAM_RESOURCE_NOT_FOUND", + TaskState.failed, + "OpenCode rejected the request because the target resource was not found", + ), + 429: UpstreamHTTPErrorProfile( + "UPSTREAM_QUOTA_EXCEEDED", + TaskState.failed, + "OpenCode rejected the request due to quota limits", + ), +} + + +def resolve_upstream_http_error_profile(status: int) -> UpstreamHTTPErrorProfile: + if status in _UPSTREAM_HTTP_ERROR_PROFILE_BY_STATUS: + return _UPSTREAM_HTTP_ERROR_PROFILE_BY_STATUS[status] + if 400 <= status < 500: + return UpstreamHTTPErrorProfile( + "UPSTREAM_CLIENT_ERROR", + TaskState.failed, + f"OpenCode rejected the request with client error {status}", + ) + if status >= 500: + return UpstreamHTTPErrorProfile( + "UPSTREAM_SERVER_ERROR", + TaskState.failed, + f"OpenCode rejected the request with server error {status}", + ) + return UpstreamHTTPErrorProfile( + "UPSTREAM_HTTP_ERROR", + TaskState.failed, + f"OpenCode rejected the request with HTTP status {status}", + ) + + +def extract_upstream_error_detail(response: httpx.Response | None) -> str | None: + if response is None: + return None + + payload = None + try: + payload = response.json() + except Exception: + payload = None + + if isinstance(payload, dict): + for key in ("detail", "error", "message"): + value = payload.get(key) + if isinstance(value, str): + value = value.strip() + if value: + return value + + text = response.text.strip() + if text: + return text[:512] + return None + + +__all__ = [ + "UpstreamHTTPErrorProfile", + "extract_upstream_error_detail", + "resolve_upstream_http_error_profile", +] diff --git a/src/opencode_a2a/execution/upstream_errors.py b/src/opencode_a2a/execution/upstream_errors.py index eb36668..0f8d7ac 100644 --- a/src/opencode_a2a/execution/upstream_errors.py +++ b/src/opencode_a2a/execution/upstream_errors.py @@ -7,6 +7,12 @@ import httpx from a2a.types import TaskState +from ..error_taxonomy import ( + extract_upstream_error_detail as _extract_upstream_error_detail, +) +from ..error_taxonomy import ( + resolve_upstream_http_error_profile as _resolve_upstream_error_profile, +) from ..opencode_upstream_client import UpstreamContractError @@ -18,13 +24,6 @@ class _StreamTerminalSignal: upstream_status: int | None = None -@dataclass(frozen=True) -class _UpstreamErrorProfile: - error_type: str - state: TaskState - default_message: str - - @dataclass(frozen=True) class _UpstreamInBandError: error_type: str @@ -33,81 +32,6 @@ class _UpstreamInBandError: upstream_status: int | None = None -_UPSTREAM_HTTP_ERROR_PROFILE_BY_STATUS: dict[int, _UpstreamErrorProfile] = { - 400: _UpstreamErrorProfile( - "UPSTREAM_BAD_REQUEST", - TaskState.failed, - "OpenCode rejected the request due to invalid input", - ), - 401: _UpstreamErrorProfile( - "UPSTREAM_UNAUTHORIZED", - TaskState.auth_required, - "OpenCode rejected the request due to authentication failure", - ), - 403: _UpstreamErrorProfile( - "UPSTREAM_PERMISSION_DENIED", - TaskState.failed, - "OpenCode rejected the request due to insufficient permissions", - ), - 404: _UpstreamErrorProfile( - "UPSTREAM_RESOURCE_NOT_FOUND", - TaskState.failed, - "OpenCode rejected the request because the target resource was not found", - ), - 429: _UpstreamErrorProfile( - "UPSTREAM_QUOTA_EXCEEDED", - TaskState.failed, - "OpenCode rejected the request due to quota limits", - ), -} - - -def _resolve_upstream_error_profile(status: int) -> _UpstreamErrorProfile: - if status in _UPSTREAM_HTTP_ERROR_PROFILE_BY_STATUS: - return _UPSTREAM_HTTP_ERROR_PROFILE_BY_STATUS[status] - if 400 <= status < 500: - return _UpstreamErrorProfile( - "UPSTREAM_CLIENT_ERROR", - TaskState.failed, - f"OpenCode rejected the request with client error {status}", - ) - if status >= 500: - return _UpstreamErrorProfile( - "UPSTREAM_SERVER_ERROR", - TaskState.failed, - f"OpenCode rejected the request with server error {status}", - ) - return _UpstreamErrorProfile( - "UPSTREAM_HTTP_ERROR", - TaskState.failed, - f"OpenCode rejected the request with HTTP status {status}", - ) - - -def _extract_upstream_error_detail(response: httpx.Response | None) -> str | None: - if response is None: - return None - - payload = None - try: - payload = response.json() - except Exception: - payload = None - - if isinstance(payload, dict): - for key in ("detail", "error", "message"): - value = payload.get(key) - if isinstance(value, str): - value = value.strip() - if value: - return value - - text = response.text.strip() - if text: - return text[:512] - return None - - def _format_upstream_error( exc: httpx.HTTPStatusError, *, request: str ) -> tuple[str, TaskState, str]: diff --git a/src/opencode_a2a/jsonrpc/application.py b/src/opencode_a2a/jsonrpc/application.py index 0032fea..7d20c85 100644 --- a/src/opencode_a2a/jsonrpc/application.py +++ b/src/opencode_a2a/jsonrpc/application.py @@ -8,10 +8,7 @@ from a2a.server.apps.jsonrpc.fastapi_app import A2AFastAPIApplication from a2a.types import ( A2AError, - InternalError, - InvalidParamsError, InvalidRequestError, - JSONRPCError, JSONRPCRequest, ) from fastapi.responses import JSONResponse @@ -24,6 +21,18 @@ SESSION_QUERY_ERROR_BUSINESS_CODES, ) from ..opencode_upstream_client import OpencodeUpstreamClient, UpstreamContractError +from .error_mapping import ( + internal_error, + interrupt_not_found_error, + invalid_params_error, + invalid_params_exception_error, + method_not_supported_error, + session_forbidden_error, + session_not_found_error, + upstream_http_error, + upstream_payload_error, + upstream_unreachable_error, +) from .methods import ( SESSION_CONTEXT_PREFIX, _apply_session_query_limit, @@ -139,11 +148,7 @@ def _session_forbidden_response( ) -> Response: return self._generate_error_response( request_id, - JSONRPCError( - code=ERR_SESSION_FORBIDDEN, - message="Session forbidden", - data={"type": "SESSION_FORBIDDEN", "session_id": session_id}, - ), + session_forbidden_error(ERR_SESSION_FORBIDDEN, session_id=session_id), ) def _extract_directory_from_metadata( @@ -156,11 +161,9 @@ def _extract_directory_from_metadata( if metadata is not None and not isinstance(metadata, dict): return None, self._generate_error_response( request_id, - A2AError( - root=InvalidParamsError( - message="metadata must be an object", - data={"type": "INVALID_FIELD", "field": "metadata"}, - ) + invalid_params_error( + "metadata must be an object", + data={"type": "INVALID_FIELD", "field": "metadata"}, ), ) @@ -171,22 +174,18 @@ def _extract_directory_from_metadata( prefixed_fields = [f"metadata.{field}" for field in unknown_metadata_fields] return None, self._generate_error_response( request_id, - A2AError( - root=InvalidParamsError( - message=f"Unsupported metadata fields: {', '.join(prefixed_fields)}", - data={"type": "INVALID_FIELD", "fields": prefixed_fields}, - ) + invalid_params_error( + f"Unsupported metadata fields: {', '.join(prefixed_fields)}", + data={"type": "INVALID_FIELD", "fields": prefixed_fields}, ), ) raw_opencode_metadata = metadata.get("opencode") if raw_opencode_metadata is not None and not isinstance(raw_opencode_metadata, dict): return None, self._generate_error_response( request_id, - A2AError( - root=InvalidParamsError( - message="metadata.opencode must be an object", - data={"type": "INVALID_FIELD", "field": "metadata.opencode"}, - ) + invalid_params_error( + "metadata.opencode must be an object", + data={"type": "INVALID_FIELD", "field": "metadata.opencode"}, ), ) if isinstance(raw_opencode_metadata, dict): @@ -195,11 +194,9 @@ def _extract_directory_from_metadata( if raw_shared_metadata is not None and not isinstance(raw_shared_metadata, dict): return None, self._generate_error_response( request_id, - A2AError( - root=InvalidParamsError( - message="metadata.shared must be an object", - data={"type": "INVALID_FIELD", "field": "metadata.shared"}, - ) + invalid_params_error( + "metadata.shared must be an object", + data={"type": "INVALID_FIELD", "field": "metadata.shared"}, ), ) @@ -209,11 +206,9 @@ def _extract_directory_from_metadata( if directory is not None and not isinstance(directory, str): return None, self._generate_error_response( request_id, - A2AError( - root=InvalidParamsError( - message="metadata.opencode.directory must be a string", - data={"type": "INVALID_FIELD", "field": "metadata.opencode.directory"}, - ) + invalid_params_error( + "metadata.opencode.directory must be a string", + data={"type": "INVALID_FIELD", "field": "metadata.opencode.directory"}, ), ) @@ -281,15 +276,10 @@ async def _handle_requests(self, request: Request) -> Response: return self._generate_error_response( base_request.id, - JSONRPCError( - code=ERR_METHOD_NOT_SUPPORTED, - message=f"Unsupported method: {base_request.method}", - data={ - "type": "METHOD_NOT_SUPPORTED", - "method": base_request.method, - "supported_methods": self._supported_methods, - "protocol_version": self._protocol_version, - }, + method_not_supported_error( + method=base_request.method, + supported_methods=self._supported_methods, + protocol_version=self._protocol_version, ), ) @@ -297,7 +287,7 @@ async def _handle_requests(self, request: Request) -> Response: if not isinstance(params, dict): return self._generate_error_response( base_request.id, - A2AError(root=InvalidParamsError(message="params must be an object")), + invalid_params_error("params must be an object"), ) if base_request.method in session_query_methods: @@ -326,12 +316,7 @@ async def _handle_session_query_request( except JsonRpcParamsValidationError as exc: return self._generate_error_response( base_request.id, - A2AError( - root=InvalidParamsError( - message=str(exc), - data=exc.data, - ) - ), + invalid_params_exception_error(exc, data=exc.data), ) limit = int(query["limit"]) @@ -344,39 +329,28 @@ async def _handle_session_query_request( except httpx.HTTPStatusError as exc: upstream_status = exc.response.status_code if upstream_status == 404 and base_request.method == self._method_get_session_messages: + assert session_id is not None return self._generate_error_response( base_request.id, - JSONRPCError( - code=ERR_SESSION_NOT_FOUND, - message="Session not found", - data={"type": "SESSION_NOT_FOUND", "session_id": session_id}, - ), + session_not_found_error(ERR_SESSION_NOT_FOUND, session_id=session_id), ) return self._generate_error_response( base_request.id, - JSONRPCError( - code=ERR_UPSTREAM_HTTP_ERROR, - message="Upstream OpenCode error", - data={ - "type": "UPSTREAM_HTTP_ERROR", - "upstream_status": upstream_status, - }, + upstream_http_error( + ERR_UPSTREAM_HTTP_ERROR, + upstream_status=upstream_status, ), ) except httpx.HTTPError: return self._generate_error_response( base_request.id, - JSONRPCError( - code=ERR_UPSTREAM_UNREACHABLE, - message="Upstream OpenCode unreachable", - data={"type": "UPSTREAM_UNREACHABLE"}, - ), + upstream_unreachable_error(ERR_UPSTREAM_UNREACHABLE), ) except Exception as exc: logger.exception("OpenCode session query JSON-RPC method failed") return self._generate_error_response( base_request.id, - A2AError(root=InternalError(message=str(exc))), + internal_error(exc), ) try: @@ -388,10 +362,9 @@ async def _handle_session_query_request( logger.warning("Upstream OpenCode payload mismatch: %s", exc) return self._generate_error_response( base_request.id, - JSONRPCError( - code=ERR_UPSTREAM_PAYLOAD_ERROR, - message="Upstream OpenCode payload mismatch", - data={"type": "UPSTREAM_PAYLOAD_ERROR", "detail": str(exc)}, + upstream_payload_error( + ERR_UPSTREAM_PAYLOAD_ERROR, + detail=str(exc), ), ) @@ -441,11 +414,9 @@ async def _handle_provider_discovery_request( prefixed_fields = [f"params.{field}" for field in unknown_fields] return self._generate_error_response( base_request.id, - A2AError( - root=InvalidParamsError( - message=f"Unsupported params fields: {', '.join(prefixed_fields)}", - data={"type": "INVALID_FIELD", "fields": prefixed_fields}, - ) + invalid_params_error( + f"Unsupported params fields: {', '.join(prefixed_fields)}", + data={"type": "INVALID_FIELD", "fields": prefixed_fields}, ), ) @@ -456,11 +427,9 @@ async def _handle_provider_discovery_request( if not isinstance(raw_provider_id, str) or not raw_provider_id.strip(): return self._generate_error_response( base_request.id, - A2AError( - root=InvalidParamsError( - message="provider_id must be a non-empty string", - data={"type": "INVALID_FIELD", "field": "provider_id"}, - ) + invalid_params_error( + "provider_id must be a non-empty string", + data={"type": "INVALID_FIELD", "field": "provider_id"}, ), ) provider_id = raw_provider_id.strip() @@ -477,11 +446,9 @@ async def _handle_provider_discovery_request( except ValueError as exc: return self._generate_error_response( base_request.id, - A2AError( - root=InvalidParamsError( - message=str(exc), - data={"type": "INVALID_FIELD", "field": "metadata.opencode.directory"}, - ) + invalid_params_exception_error( + exc, + data={"type": "INVALID_FIELD", "field": "metadata.opencode.directory"}, ), ) @@ -491,30 +458,25 @@ async def _handle_provider_discovery_request( upstream_status = exc.response.status_code return self._generate_error_response( base_request.id, - JSONRPCError( - code=ERR_DISCOVERY_UPSTREAM_HTTP_ERROR, - message="Upstream OpenCode error", - data={ - "type": "UPSTREAM_HTTP_ERROR", - "method": base_request.method, - "upstream_status": upstream_status, - }, + upstream_http_error( + ERR_DISCOVERY_UPSTREAM_HTTP_ERROR, + upstream_status=upstream_status, + method=base_request.method, ), ) except httpx.HTTPError: return self._generate_error_response( base_request.id, - JSONRPCError( - code=ERR_DISCOVERY_UPSTREAM_UNREACHABLE, - message="Upstream OpenCode unreachable", - data={"type": "UPSTREAM_UNREACHABLE", "method": base_request.method}, + upstream_unreachable_error( + ERR_DISCOVERY_UPSTREAM_UNREACHABLE, + method=base_request.method, ), ) except Exception as exc: logger.exception("OpenCode provider discovery JSON-RPC method failed") return self._generate_error_response( base_request.id, - A2AError(root=InternalError(message=str(exc))), + internal_error(exc), ) try: @@ -536,14 +498,10 @@ async def _handle_provider_discovery_request( logger.warning("Upstream OpenCode provider payload mismatch: %s", exc) return self._generate_error_response( base_request.id, - JSONRPCError( - code=ERR_DISCOVERY_UPSTREAM_PAYLOAD_ERROR, - message="Upstream OpenCode payload mismatch", - data={ - "type": "UPSTREAM_PAYLOAD_ERROR", - "method": base_request.method, - "detail": str(exc), - }, + upstream_payload_error( + ERR_DISCOVERY_UPSTREAM_PAYLOAD_ERROR, + detail=str(exc), + method=base_request.method, ), ) @@ -570,11 +528,9 @@ async def _handle_session_control_request( if unknown_fields: return self._generate_error_response( base_request.id, - A2AError( - root=InvalidParamsError( - message=f"Unsupported fields: {', '.join(unknown_fields)}", - data={"type": "INVALID_FIELD", "fields": unknown_fields}, - ) + invalid_params_error( + f"Unsupported fields: {', '.join(unknown_fields)}", + data={"type": "INVALID_FIELD", "fields": unknown_fields}, ), ) @@ -582,11 +538,9 @@ async def _handle_session_control_request( if not isinstance(session_id, str) or not session_id.strip(): return self._generate_error_response( base_request.id, - A2AError( - root=InvalidParamsError( - message="Missing required params.session_id", - data={"type": "MISSING_FIELD", "field": "session_id"}, - ) + invalid_params_error( + "Missing required params.session_id", + data={"type": "MISSING_FIELD", "field": "session_id"}, ), ) session_id = session_id.strip() @@ -595,21 +549,17 @@ async def _handle_session_control_request( if raw_request is None: return self._generate_error_response( base_request.id, - A2AError( - root=InvalidParamsError( - message="Missing required params.request", - data={"type": "MISSING_FIELD", "field": "request"}, - ) + invalid_params_error( + "Missing required params.request", + data={"type": "MISSING_FIELD", "field": "request"}, ), ) if not isinstance(raw_request, dict): return self._generate_error_response( base_request.id, - A2AError( - root=InvalidParamsError( - message="params.request must be an object", - data={"type": "INVALID_FIELD", "field": "request"}, - ) + invalid_params_error( + "params.request must be an object", + data={"type": "INVALID_FIELD", "field": "request"}, ), ) @@ -647,11 +597,9 @@ def _log_shell_audit(outcome: str) -> None: except _PromptAsyncValidationError as exc: return self._generate_error_response( base_request.id, - A2AError( - root=InvalidParamsError( - message=str(exc), - data={"type": "INVALID_FIELD", "field": exc.field}, - ) + invalid_params_exception_error( + exc, + data={"type": "INVALID_FIELD", "field": exc.field}, ), ) @@ -667,11 +615,9 @@ def _log_shell_audit(outcome: str) -> None: except ValueError as exc: return self._generate_error_response( base_request.id, - A2AError( - root=InvalidParamsError( - message=str(exc), - data={"type": "INVALID_FIELD", "field": "metadata.opencode.directory"}, - ) + invalid_params_exception_error( + exc, + data={"type": "INVALID_FIELD", "field": "metadata.opencode.directory"}, ), ) @@ -739,53 +685,37 @@ def _log_shell_audit(outcome: str) -> None: _log_shell_audit("upstream_404") return self._generate_error_response( base_request.id, - JSONRPCError( - code=ERR_SESSION_NOT_FOUND, - message="Session not found", - data={"type": "SESSION_NOT_FOUND", "session_id": session_id}, - ), + session_not_found_error(ERR_SESSION_NOT_FOUND, session_id=session_id), ) _log_shell_audit("upstream_http_error") return self._generate_error_response( base_request.id, - JSONRPCError( - code=ERR_UPSTREAM_HTTP_ERROR, - message="Upstream OpenCode error", - data={ - "type": "UPSTREAM_HTTP_ERROR", - "method": base_request.method, - "upstream_status": upstream_status, - "session_id": session_id, - }, + upstream_http_error( + ERR_UPSTREAM_HTTP_ERROR, + upstream_status=upstream_status, + method=base_request.method, + session_id=session_id, ), ) except httpx.HTTPError: _log_shell_audit("upstream_unreachable") return self._generate_error_response( base_request.id, - JSONRPCError( - code=ERR_UPSTREAM_UNREACHABLE, - message="Upstream OpenCode unreachable", - data={ - "type": "UPSTREAM_UNREACHABLE", - "method": base_request.method, - "session_id": session_id, - }, + upstream_unreachable_error( + ERR_UPSTREAM_UNREACHABLE, + method=base_request.method, + session_id=session_id, ), ) except UpstreamContractError as exc: _log_shell_audit("upstream_payload_error") return self._generate_error_response( base_request.id, - JSONRPCError( - code=ERR_UPSTREAM_PAYLOAD_ERROR, - message="Upstream OpenCode payload mismatch", - data={ - "type": "UPSTREAM_PAYLOAD_ERROR", - "method": base_request.method, - "detail": str(exc), - "session_id": session_id, - }, + upstream_payload_error( + ERR_UPSTREAM_PAYLOAD_ERROR, + detail=str(exc), + method=base_request.method, + session_id=session_id, ), ) except PermissionError: @@ -799,7 +729,7 @@ def _log_shell_audit(outcome: str) -> None: logger.exception("OpenCode session control JSON-RPC method failed") return self._generate_error_response( base_request.id, - A2AError(root=InternalError(message=str(exc))), + internal_error(exc), ) finally: if pending_claim and not claim_finalized and identity: @@ -832,11 +762,9 @@ async def _handle_interrupt_callback_request( if not isinstance(request_id, str) or not request_id.strip(): return self._generate_error_response( base_request.id, - A2AError( - root=InvalidParamsError( - message="Missing required params.request_id", - data={"type": "MISSING_FIELD", "field": "request_id"}, - ) + invalid_params_error( + "Missing required params.request_id", + data={"type": "MISSING_FIELD", "field": "request_id"}, ), ) request_id = request_id.strip() @@ -854,39 +782,28 @@ async def _handle_interrupt_callback_request( if callable(resolve_request): status, binding = resolve_request(request_id) if status != "active" or binding is None: - error_type = ( - "INTERRUPT_REQUEST_EXPIRED" - if status == "expired" - else "INTERRUPT_REQUEST_NOT_FOUND" - ) return self._generate_error_response( base_request.id, - JSONRPCError( - code=ERR_INTERRUPT_NOT_FOUND, - message=( - "Interrupt request expired" - if status == "expired" - else "Interrupt request not found" - ), - data={"type": error_type, "request_id": request_id}, + interrupt_not_found_error( + ERR_INTERRUPT_NOT_FOUND, + request_id=request_id, + expired=status == "expired", ), ) if binding.interrupt_type != expected_interrupt_type: return self._generate_error_response( base_request.id, - A2AError( - root=InvalidParamsError( - message=( - "Interrupt type mismatch: " - f"expected {expected_interrupt_type}, got {binding.interrupt_type}" - ), - data={ - "type": "INTERRUPT_TYPE_MISMATCH", - "request_id": request_id, - "expected": expected_interrupt_type, - "actual": binding.interrupt_type, - }, - ) + invalid_params_error( + ( + "Interrupt type mismatch: " + f"expected {expected_interrupt_type}, got {binding.interrupt_type}" + ), + data={ + "type": "INTERRUPT_TYPE_MISMATCH", + "request_id": request_id, + "expected": expected_interrupt_type, + "actual": binding.interrupt_type, + }, ), ) if ( @@ -897,13 +814,9 @@ async def _handle_interrupt_callback_request( ): return self._generate_error_response( base_request.id, - JSONRPCError( - code=ERR_INTERRUPT_NOT_FOUND, - message="Interrupt request not found", - data={ - "type": "INTERRUPT_REQUEST_NOT_FOUND", - "request_id": request_id, - }, + interrupt_not_found_error( + ERR_INTERRUPT_NOT_FOUND, + request_id=request_id, ), ) else: @@ -912,13 +825,9 @@ async def _handle_interrupt_callback_request( if not resolve_session(request_id): return self._generate_error_response( base_request.id, - JSONRPCError( - code=ERR_INTERRUPT_NOT_FOUND, - message="Interrupt request not found", - data={ - "type": "INTERRUPT_REQUEST_NOT_FOUND", - "request_id": request_id, - }, + interrupt_not_found_error( + ERR_INTERRUPT_NOT_FOUND, + request_id=request_id, ), ) if base_request.method == self._method_reply_permission: @@ -931,11 +840,9 @@ async def _handle_interrupt_callback_request( if unknown_fields: return self._generate_error_response( base_request.id, - A2AError( - root=InvalidParamsError( - message=f"Unsupported fields: {', '.join(unknown_fields)}", - data={"type": "INVALID_FIELD", "fields": unknown_fields}, - ) + invalid_params_error( + f"Unsupported fields: {', '.join(unknown_fields)}", + data={"type": "INVALID_FIELD", "fields": unknown_fields}, ), ) @@ -970,12 +877,7 @@ async def _handle_interrupt_callback_request( except ValueError as exc: return self._generate_error_response( base_request.id, - A2AError( - root=InvalidParamsError( - message=str(exc), - data={"type": "INVALID_FIELD"}, - ) - ), + invalid_params_exception_error(exc, data={"type": "INVALID_FIELD"}), ) except httpx.HTTPStatusError as exc: upstream_status = exc.response.status_code @@ -985,41 +887,32 @@ async def _handle_interrupt_callback_request( discard_request(request_id) return self._generate_error_response( base_request.id, - JSONRPCError( - code=ERR_INTERRUPT_NOT_FOUND, - message="Interrupt request not found", - data={ - "type": "INTERRUPT_REQUEST_NOT_FOUND", - "request_id": request_id, - }, + interrupt_not_found_error( + ERR_INTERRUPT_NOT_FOUND, + request_id=request_id, ), ) return self._generate_error_response( base_request.id, - JSONRPCError( - code=ERR_UPSTREAM_HTTP_ERROR, - message="Upstream OpenCode error", - data={ - "type": "UPSTREAM_HTTP_ERROR", - "upstream_status": upstream_status, - "request_id": request_id, - }, + upstream_http_error( + ERR_UPSTREAM_HTTP_ERROR, + upstream_status=upstream_status, + request_id=request_id, ), ) except httpx.HTTPError: return self._generate_error_response( base_request.id, - JSONRPCError( - code=ERR_UPSTREAM_UNREACHABLE, - message="Upstream OpenCode unreachable", - data={"type": "UPSTREAM_UNREACHABLE", "request_id": request_id}, + upstream_unreachable_error( + ERR_UPSTREAM_UNREACHABLE, + request_id=request_id, ), ) except Exception as exc: logger.exception("OpenCode interrupt callback JSON-RPC method failed") return self._generate_error_response( base_request.id, - A2AError(root=InternalError(message=str(exc))), + internal_error(exc), ) if base_request.id is None: diff --git a/src/opencode_a2a/jsonrpc/error_mapping.py b/src/opencode_a2a/jsonrpc/error_mapping.py new file mode 100644 index 0000000..e3b949c --- /dev/null +++ b/src/opencode_a2a/jsonrpc/error_mapping.py @@ -0,0 +1,154 @@ +from __future__ import annotations + +from typing import Any + +from a2a.types import A2AError, InternalError, InvalidParamsError, JSONRPCError + + +def invalid_params_error( + message: str, + *, + data: dict[str, Any] | None = None, +) -> A2AError: + return A2AError(root=InvalidParamsError(message=message, data=data)) + + +def invalid_params_exception_error( + exc: Exception, + *, + data: dict[str, Any] | None = None, +) -> A2AError: + return invalid_params_error(str(exc), data=data) + + +def internal_error(exc: Exception) -> A2AError: + return A2AError(root=InternalError(message=str(exc))) + + +def method_not_supported_error( + *, + method: str, + supported_methods: list[str], + protocol_version: str, +) -> JSONRPCError: + return JSONRPCError( + code=-32601, + message=f"Unsupported method: {method}", + data={ + "type": "METHOD_NOT_SUPPORTED", + "method": method, + "supported_methods": supported_methods, + "protocol_version": protocol_version, + }, + ) + + +def session_forbidden_error(code: int, *, session_id: str) -> JSONRPCError: + return JSONRPCError( + code=code, + message="Session forbidden", + data={"type": "SESSION_FORBIDDEN", "session_id": session_id}, + ) + + +def session_not_found_error(code: int, *, session_id: str) -> JSONRPCError: + return JSONRPCError( + code=code, + message="Session not found", + data={"type": "SESSION_NOT_FOUND", "session_id": session_id}, + ) + + +def interrupt_not_found_error( + code: int, + *, + request_id: str, + expired: bool = False, +) -> JSONRPCError: + return JSONRPCError( + code=code, + message="Interrupt request expired" if expired else "Interrupt request not found", + data={ + "type": "INTERRUPT_REQUEST_EXPIRED" if expired else "INTERRUPT_REQUEST_NOT_FOUND", + "request_id": request_id, + }, + ) + + +def upstream_http_error( + code: int, + *, + upstream_status: int, + method: str | None = None, + session_id: str | None = None, + request_id: str | None = None, + detail: str | None = None, + message: str = "Upstream OpenCode error", +) -> JSONRPCError: + data: dict[str, Any] = { + "type": "UPSTREAM_HTTP_ERROR", + "upstream_status": upstream_status, + } + if method is not None: + data["method"] = method + if session_id is not None: + data["session_id"] = session_id + if request_id is not None: + data["request_id"] = request_id + if detail is not None: + data["detail"] = detail + return JSONRPCError(code=code, message=message, data=data) + + +def upstream_unreachable_error( + code: int, + *, + method: str | None = None, + session_id: str | None = None, + request_id: str | None = None, + message: str = "Upstream OpenCode unreachable", +) -> JSONRPCError: + data: dict[str, Any] = {"type": "UPSTREAM_UNREACHABLE"} + if method is not None: + data["method"] = method + if session_id is not None: + data["session_id"] = session_id + if request_id is not None: + data["request_id"] = request_id + return JSONRPCError(code=code, message=message, data=data) + + +def upstream_payload_error( + code: int, + *, + detail: str, + method: str | None = None, + session_id: str | None = None, + request_id: str | None = None, + message: str = "Upstream OpenCode payload mismatch", +) -> JSONRPCError: + data: dict[str, Any] = { + "type": "UPSTREAM_PAYLOAD_ERROR", + "detail": detail, + } + if method is not None: + data["method"] = method + if session_id is not None: + data["session_id"] = session_id + if request_id is not None: + data["request_id"] = request_id + return JSONRPCError(code=code, message=message, data=data) + + +__all__ = [ + "internal_error", + "interrupt_not_found_error", + "invalid_params_error", + "invalid_params_exception_error", + "method_not_supported_error", + "session_forbidden_error", + "session_not_found_error", + "upstream_http_error", + "upstream_payload_error", + "upstream_unreachable_error", +] diff --git a/tests/jsonrpc/test_error_mapping.py b/tests/jsonrpc/test_error_mapping.py new file mode 100644 index 0000000..0acacf4 --- /dev/null +++ b/tests/jsonrpc/test_error_mapping.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +from a2a.types import InternalError, InvalidParamsError + +from opencode_a2a.jsonrpc.error_mapping import ( + internal_error, + interrupt_not_found_error, + invalid_params_exception_error, + method_not_supported_error, + session_forbidden_error, + session_not_found_error, + upstream_http_error, + upstream_payload_error, + upstream_unreachable_error, +) + + +def test_jsonrpc_error_mapping_helpers_preserve_business_contract_fields() -> None: + unsupported = method_not_supported_error( + method="unsupported.method", + supported_methods=["message/send", "tasks/get"], + protocol_version="0.3.0", + ) + assert unsupported.code == -32601 + assert unsupported.data["type"] == "METHOD_NOT_SUPPORTED" + + forbidden = session_forbidden_error(-32006, session_id="s-1") + assert forbidden.code == -32006 + assert forbidden.data == {"type": "SESSION_FORBIDDEN", "session_id": "s-1"} + + missing_session = session_not_found_error(-32001, session_id="s-404") + assert missing_session.data == {"type": "SESSION_NOT_FOUND", "session_id": "s-404"} + + expired_interrupt = interrupt_not_found_error(-32007, request_id="req-1", expired=True) + assert expired_interrupt.data == { + "type": "INTERRUPT_REQUEST_EXPIRED", + "request_id": "req-1", + } + + +def test_jsonrpc_error_mapping_helpers_build_upstream_envelopes() -> None: + http_error = upstream_http_error( + -32003, + upstream_status=503, + method="opencode.sessions.command", + session_id="s-1", + ) + assert http_error.data == { + "type": "UPSTREAM_HTTP_ERROR", + "upstream_status": 503, + "method": "opencode.sessions.command", + "session_id": "s-1", + } + + unreachable = upstream_unreachable_error(-32002, request_id="req-1") + assert unreachable.data == { + "type": "UPSTREAM_UNREACHABLE", + "request_id": "req-1", + } + + payload_error = upstream_payload_error( + -32005, + detail="payload mismatch", + method="opencode.providers.list", + ) + assert payload_error.data == { + "type": "UPSTREAM_PAYLOAD_ERROR", + "detail": "payload mismatch", + "method": "opencode.providers.list", + } + + +def test_invalid_and_internal_error_helpers_wrap_a2a_errors() -> None: + invalid = invalid_params_exception_error( + ValueError("bad field"), + data={"type": "INVALID_FIELD", "field": "request"}, + ) + assert isinstance(invalid.root, InvalidParamsError) + assert invalid.root.message == "bad field" + assert invalid.root.data == {"type": "INVALID_FIELD", "field": "request"} + + internal = internal_error(RuntimeError("boom")) + assert isinstance(internal.root, InternalError) + assert internal.root.message == "boom" From 9df88963f0274125bc6ba1faa5818f5c7b230b4e Mon Sep 17 00:00:00 2001 From: "helen@cloud" Date: Mon, 23 Mar 2026 09:02:00 -0400 Subject: [PATCH 2/2] refactor(jsonrpc): remove thin wrapper helpers from error mapping (#274) --- src/opencode_a2a/jsonrpc/application.py | 28 ++++++++++------------- src/opencode_a2a/jsonrpc/error_mapping.py | 25 ++++---------------- tests/jsonrpc/test_error_mapping.py | 15 ++++-------- 3 files changed, 21 insertions(+), 47 deletions(-) diff --git a/src/opencode_a2a/jsonrpc/application.py b/src/opencode_a2a/jsonrpc/application.py index 7d20c85..477245d 100644 --- a/src/opencode_a2a/jsonrpc/application.py +++ b/src/opencode_a2a/jsonrpc/application.py @@ -8,6 +8,7 @@ from a2a.server.apps.jsonrpc.fastapi_app import A2AFastAPIApplication from a2a.types import ( A2AError, + InternalError, InvalidRequestError, JSONRPCRequest, ) @@ -22,10 +23,8 @@ ) from ..opencode_upstream_client import OpencodeUpstreamClient, UpstreamContractError from .error_mapping import ( - internal_error, interrupt_not_found_error, invalid_params_error, - invalid_params_exception_error, method_not_supported_error, session_forbidden_error, session_not_found_error, @@ -316,7 +315,7 @@ async def _handle_session_query_request( except JsonRpcParamsValidationError as exc: return self._generate_error_response( base_request.id, - invalid_params_exception_error(exc, data=exc.data), + invalid_params_error(str(exc), data=exc.data), ) limit = int(query["limit"]) @@ -350,7 +349,7 @@ async def _handle_session_query_request( logger.exception("OpenCode session query JSON-RPC method failed") return self._generate_error_response( base_request.id, - internal_error(exc), + A2AError(root=InternalError(message=str(exc))), ) try: @@ -446,8 +445,8 @@ async def _handle_provider_discovery_request( except ValueError as exc: return self._generate_error_response( base_request.id, - invalid_params_exception_error( - exc, + invalid_params_error( + str(exc), data={"type": "INVALID_FIELD", "field": "metadata.opencode.directory"}, ), ) @@ -476,7 +475,7 @@ async def _handle_provider_discovery_request( logger.exception("OpenCode provider discovery JSON-RPC method failed") return self._generate_error_response( base_request.id, - internal_error(exc), + A2AError(root=InternalError(message=str(exc))), ) try: @@ -597,10 +596,7 @@ def _log_shell_audit(outcome: str) -> None: except _PromptAsyncValidationError as exc: return self._generate_error_response( base_request.id, - invalid_params_exception_error( - exc, - data={"type": "INVALID_FIELD", "field": exc.field}, - ), + invalid_params_error(str(exc), data={"type": "INVALID_FIELD", "field": exc.field}), ) directory, metadata_error = self._extract_directory_from_metadata( @@ -615,8 +611,8 @@ def _log_shell_audit(outcome: str) -> None: except ValueError as exc: return self._generate_error_response( base_request.id, - invalid_params_exception_error( - exc, + invalid_params_error( + str(exc), data={"type": "INVALID_FIELD", "field": "metadata.opencode.directory"}, ), ) @@ -729,7 +725,7 @@ def _log_shell_audit(outcome: str) -> None: logger.exception("OpenCode session control JSON-RPC method failed") return self._generate_error_response( base_request.id, - internal_error(exc), + A2AError(root=InternalError(message=str(exc))), ) finally: if pending_claim and not claim_finalized and identity: @@ -877,7 +873,7 @@ async def _handle_interrupt_callback_request( except ValueError as exc: return self._generate_error_response( base_request.id, - invalid_params_exception_error(exc, data={"type": "INVALID_FIELD"}), + invalid_params_error(str(exc), data={"type": "INVALID_FIELD"}), ) except httpx.HTTPStatusError as exc: upstream_status = exc.response.status_code @@ -912,7 +908,7 @@ async def _handle_interrupt_callback_request( logger.exception("OpenCode interrupt callback JSON-RPC method failed") return self._generate_error_response( base_request.id, - internal_error(exc), + A2AError(root=InternalError(message=str(exc))), ) if base_request.id is None: diff --git a/src/opencode_a2a/jsonrpc/error_mapping.py b/src/opencode_a2a/jsonrpc/error_mapping.py index e3b949c..0dd0e11 100644 --- a/src/opencode_a2a/jsonrpc/error_mapping.py +++ b/src/opencode_a2a/jsonrpc/error_mapping.py @@ -2,7 +2,7 @@ from typing import Any -from a2a.types import A2AError, InternalError, InvalidParamsError, JSONRPCError +from a2a.types import A2AError, InvalidParamsError, JSONRPCError def invalid_params_error( @@ -13,18 +13,6 @@ def invalid_params_error( return A2AError(root=InvalidParamsError(message=message, data=data)) -def invalid_params_exception_error( - exc: Exception, - *, - data: dict[str, Any] | None = None, -) -> A2AError: - return invalid_params_error(str(exc), data=data) - - -def internal_error(exc: Exception) -> A2AError: - return A2AError(root=InternalError(message=str(exc))) - - def method_not_supported_error( *, method: str, @@ -83,7 +71,6 @@ def upstream_http_error( session_id: str | None = None, request_id: str | None = None, detail: str | None = None, - message: str = "Upstream OpenCode error", ) -> JSONRPCError: data: dict[str, Any] = { "type": "UPSTREAM_HTTP_ERROR", @@ -97,7 +84,7 @@ def upstream_http_error( data["request_id"] = request_id if detail is not None: data["detail"] = detail - return JSONRPCError(code=code, message=message, data=data) + return JSONRPCError(code=code, message="Upstream OpenCode error", data=data) def upstream_unreachable_error( @@ -106,7 +93,6 @@ def upstream_unreachable_error( method: str | None = None, session_id: str | None = None, request_id: str | None = None, - message: str = "Upstream OpenCode unreachable", ) -> JSONRPCError: data: dict[str, Any] = {"type": "UPSTREAM_UNREACHABLE"} if method is not None: @@ -115,7 +101,7 @@ def upstream_unreachable_error( data["session_id"] = session_id if request_id is not None: data["request_id"] = request_id - return JSONRPCError(code=code, message=message, data=data) + return JSONRPCError(code=code, message="Upstream OpenCode unreachable", data=data) def upstream_payload_error( @@ -125,7 +111,6 @@ def upstream_payload_error( method: str | None = None, session_id: str | None = None, request_id: str | None = None, - message: str = "Upstream OpenCode payload mismatch", ) -> JSONRPCError: data: dict[str, Any] = { "type": "UPSTREAM_PAYLOAD_ERROR", @@ -137,14 +122,12 @@ def upstream_payload_error( data["session_id"] = session_id if request_id is not None: data["request_id"] = request_id - return JSONRPCError(code=code, message=message, data=data) + return JSONRPCError(code=code, message="Upstream OpenCode payload mismatch", data=data) __all__ = [ - "internal_error", "interrupt_not_found_error", "invalid_params_error", - "invalid_params_exception_error", "method_not_supported_error", "session_forbidden_error", "session_not_found_error", diff --git a/tests/jsonrpc/test_error_mapping.py b/tests/jsonrpc/test_error_mapping.py index 0acacf4..68182bc 100644 --- a/tests/jsonrpc/test_error_mapping.py +++ b/tests/jsonrpc/test_error_mapping.py @@ -1,11 +1,10 @@ from __future__ import annotations -from a2a.types import InternalError, InvalidParamsError +from a2a.types import InvalidParamsError from opencode_a2a.jsonrpc.error_mapping import ( - internal_error, interrupt_not_found_error, - invalid_params_exception_error, + invalid_params_error, method_not_supported_error, session_forbidden_error, session_not_found_error, @@ -70,15 +69,11 @@ def test_jsonrpc_error_mapping_helpers_build_upstream_envelopes() -> None: } -def test_invalid_and_internal_error_helpers_wrap_a2a_errors() -> None: - invalid = invalid_params_exception_error( - ValueError("bad field"), +def test_invalid_error_helper_wraps_a2a_error() -> None: + invalid = invalid_params_error( + "bad field", data={"type": "INVALID_FIELD", "field": "request"}, ) assert isinstance(invalid.root, InvalidParamsError) assert invalid.root.message == "bad field" assert invalid.root.data == {"type": "INVALID_FIELD", "field": "request"} - - internal = internal_error(RuntimeError("boom")) - assert isinstance(internal.root, InternalError) - assert internal.root.message == "boom"