From 61440b1faf6664be0ed79117f4e56d46ce650455 Mon Sep 17 00:00:00 2001 From: Krishan Patel Date: Thu, 5 Mar 2026 16:19:25 +0000 Subject: [PATCH 1/4] . --- core/api/streaming_json_route.py | 6 +++++- tests/api/test_streaming_json_route.py | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/core/api/streaming_json_route.py b/core/api/streaming_json_route.py index 56c9e7f..55b9823 100644 --- a/core/api/streaming_json_route.py +++ b/core/api/streaming_json_route.py @@ -51,7 +51,11 @@ async def async_wrapper(*args: typing.Any) -> StreamingResponse: # type: ignore kibaRequest.data = requestParams responseGenerator = func(kibaRequest) wrappedGenerator = _convert_to_json_generator(typing.cast(AsyncIterator[BaseModel], responseGenerator), expectedType=typing.cast(typing.Type[BaseModel], responseType)) - return StreamingResponse(content=wrappedGenerator, media_type='application/x-ndjson') + return StreamingResponse( + content=wrappedGenerator, + media_type='application/x-ndjson', + headers={'Content-Encoding': 'identity'}, + ) # TODO(krishan711): figure out correct typing here return async_wrapper # type: ignore[return-value] diff --git a/tests/api/test_streaming_json_route.py b/tests/api/test_streaming_json_route.py index 02bce66..8056f5c 100644 --- a/tests/api/test_streaming_json_route.py +++ b/tests/api/test_streaming_json_route.py @@ -84,6 +84,7 @@ def test_streaming_json_route_with_valid_body(client): ) assert response.status_code == 200 assert response.headers["content-type"] == "application/x-ndjson" + assert response.headers["content-encoding"] == "identity" dataList = parse_ndjson_response(response) assert len(dataList) == 2 first_message = dataList[0] From 9453a3659bc5507aec88d9f7be28b5e1bc1929ea Mon Sep 17 00:00:00 2001 From: Krishan Patel Date: Thu, 5 Mar 2026 16:42:00 +0000 Subject: [PATCH 2/4] . --- core/api/streaming_json_route.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/core/api/streaming_json_route.py b/core/api/streaming_json_route.py index 55b9823..f89c5ac 100644 --- a/core/api/streaming_json_route.py +++ b/core/api/streaming_json_route.py @@ -51,11 +51,8 @@ async def async_wrapper(*args: typing.Any) -> StreamingResponse: # type: ignore kibaRequest.data = requestParams responseGenerator = func(kibaRequest) wrappedGenerator = _convert_to_json_generator(typing.cast(AsyncIterator[BaseModel], responseGenerator), expectedType=typing.cast(typing.Type[BaseModel], responseType)) - return StreamingResponse( - content=wrappedGenerator, - media_type='application/x-ndjson', - headers={'Content-Encoding': 'identity'}, - ) + # NOTE(krishan711): we set content-encoding to identity to prevent gzip from trying to process it (cos it buffers all the content) + return StreamingResponse(content=wrappedGenerator, media_type='application/x-ndjson', headers={'Content-Encoding': 'identity'}) # TODO(krishan711): figure out correct typing here return async_wrapper # type: ignore[return-value] From 1440a58c180f2afed5461ae23d2b17a926736e70 Mon Sep 17 00:00:00 2001 From: Krishan Patel Date: Thu, 5 Mar 2026 17:10:01 +0000 Subject: [PATCH 3/4] . --- core/api/authorizer.py | 5 ++++- core/api/streaming_json_route.py | 12 ++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/core/api/authorizer.py b/core/api/authorizer.py index 5ebe04f..01e0dfd 100644 --- a/core/api/authorizer.py +++ b/core/api/authorizer.py @@ -38,7 +38,10 @@ def decorator(func: typing.Callable[[Arg(KibaApiRequest[ApiRequest], 'request')] @functools.wraps(func) async def async_wrapper(request: KibaApiRequest[ApiRequest]) -> typing.Any: # type: ignore[explicit-any, misc] request.authJwt = await _authorize_bearer_jwt(request=request, authorizer=authorizer) - return await func(request=request) + result = func(request=request) + if hasattr(result, '__aiter__'): + return result + return await result # TODO(krishan711): figure out correct typing here return async_wrapper # type: ignore[return-value] diff --git a/core/api/streaming_json_route.py b/core/api/streaming_json_route.py index f89c5ac..0bd7728 100644 --- a/core/api/streaming_json_route.py +++ b/core/api/streaming_json_route.py @@ -1,4 +1,5 @@ import functools +import inspect import typing from collections.abc import AsyncIterator from typing import ParamSpec @@ -29,11 +30,13 @@ def streaming_json_route[ApiRequest: BaseModel, ApiResponse: BaseModel]( ) -> typing.Callable[[typing.Callable[[KibaApiRequest[ApiRequest]], AsyncIterator[ApiResponse]]], typing.Callable[_P, StreamingResponse]]: def decorator(func: typing.Callable[[KibaApiRequest[ApiRequest]], AsyncIterator[ApiResponse]]) -> typing.Callable[_P, StreamingResponse]: @functools.wraps(func) - async def async_wrapper(*args: typing.Any) -> StreamingResponse: # type: ignore[explicit-any, misc] - receivedRequest = args[0] + async def async_wrapper(*args: typing.Any, **kwargs: typing.Any) -> StreamingResponse: # type: ignore[explicit-any, misc] + receivedRequest = kwargs.get('request', args[0] if args else None) + if receivedRequest is None: + raise BadRequestException('Missing request') pathParams = receivedRequest.path_params queryParams = receivedRequest.query_params - bodyBytes = await args[0].body() + bodyBytes = await receivedRequest.body() if len(bodyBytes) == 0: body: JsonObject = {} else: @@ -49,7 +52,8 @@ async def async_wrapper(*args: typing.Any) -> StreamingResponse: # type: ignore raise BadRequestException(f'Invalid request: {validationErrorMessage}') kibaRequest: KibaApiRequest[ApiRequest] = KibaApiRequest(scope=receivedRequest.scope, receive=receivedRequest._receive, send=receivedRequest._send) # noqa: SLF001 kibaRequest.data = requestParams - responseGenerator = func(kibaRequest) + responseGeneratorOrAwaitable = func(kibaRequest) + responseGenerator = await responseGeneratorOrAwaitable if inspect.isawaitable(responseGeneratorOrAwaitable) else responseGeneratorOrAwaitable wrappedGenerator = _convert_to_json_generator(typing.cast(AsyncIterator[BaseModel], responseGenerator), expectedType=typing.cast(typing.Type[BaseModel], responseType)) # NOTE(krishan711): we set content-encoding to identity to prevent gzip from trying to process it (cos it buffers all the content) return StreamingResponse(content=wrappedGenerator, media_type='application/x-ndjson', headers={'Content-Encoding': 'identity'}) From 1e94e09895ef5d0bb29fae84c2bde46a7a275ef6 Mon Sep 17 00:00:00 2001 From: Krishan Patel Date: Thu, 5 Mar 2026 17:10:50 +0000 Subject: [PATCH 4/4] . --- core/api/authorizer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/core/api/authorizer.py b/core/api/authorizer.py index 01e0dfd..1a6c74b 100644 --- a/core/api/authorizer.py +++ b/core/api/authorizer.py @@ -39,6 +39,7 @@ def decorator(func: typing.Callable[[Arg(KibaApiRequest[ApiRequest], 'request')] async def async_wrapper(request: KibaApiRequest[ApiRequest]) -> typing.Any: # type: ignore[explicit-any, misc] request.authJwt = await _authorize_bearer_jwt(request=request, authorizer=authorizer) result = func(request=request) + # NOTE(krishan711): this is here to support streaming responses which return an async generator if hasattr(result, '__aiter__'): return result return await result