class KibaRequest(httpx.Request):
    """Project-level request type.

    Adds nothing to ``httpx.Request``; it only gives the requester and cache
    code a project-owned name to annotate against.
    """
formFiles should only be passed into POST requests.') request = self.client.build_request(method=method, url=url, content=content, data=innerData, files=files, timeout=timeout, headers=requestHeaders) - httpxResponse = await self.client.send(request=request) - if 400 <= httpxResponse.status_code < 600: # noqa: PLR2004 - message = httpxResponse.text - if not message and httpxResponse.status_code == 401 and httpxResponse.headers.get('www-authenticate'): # noqa: PLR2004 - message = httpxResponse.headers['www-authenticate'] - if HTTP_EXCEPTIONS_MAP.get(httpxResponse.status_code) is not None: - exceptionCls = HTTP_EXCEPTIONS_MAP[httpxResponse.status_code] - exception: KibaException = exceptionCls(message=message) - else: - exception = ResponseException(message=message, statusCode=httpxResponse.status_code, headers=httpxResponse.headers) - raise exception + response: KibaResponse | None = None + visitedCaches: list[RequesterCache] = [] + for cache in self._caches: + response = await cache.get(request=typing.cast(KibaRequest, request)) + if response: + break + visitedCaches.append(cache) + if not response: + httpxResponse = await self.client.send(request=request) + if 400 <= httpxResponse.status_code < 600: # noqa: PLR2004 + message = httpxResponse.text + if not message and httpxResponse.status_code == 401 and httpxResponse.headers.get('www-authenticate'): # noqa: PLR2004 + message = httpxResponse.headers['www-authenticate'] + if HTTP_EXCEPTIONS_MAP.get(httpxResponse.status_code) is not None: + exceptionCls = HTTP_EXCEPTIONS_MAP[httpxResponse.status_code] + exception: KibaException = exceptionCls(message=message) + else: + exception = ResponseException(message=message, statusCode=httpxResponse.status_code, headers=httpxResponse.headers) + raise exception + response = KibaResponse.from_httpx_response(httpxResponse=httpxResponse) + for cache in visitedCaches: + await cache.add(request=typing.cast(KibaRequest, request), response=response) # TODO(krishan711): this would be 
class RequesterCache:
    """Stores and retrieves KibaResponse objects in a Cache backend, keyed by the encoded request URL.

    Caching is best-effort: backend and (de)serialisation failures are logged
    and treated as a cache miss / skipped write, never raised to the caller.
    """

    class CacheEntry(BaseModel):
        """Serialisable record of one cached response plus the request fields needed to re-match it."""

        requestMethod: str
        requestUrl: str
        requestHeaders: dict[str, str]
        response: KibaResponse

        @property
        def expirySeconds(self) -> int:  # noqa: N802
            """Return the seconds until this entry expires (0 without Cache-Control; may be negative if already stale)."""
            cacheControlValue = RequesterCache._get_header(headers=self.response.headers, name='cache-control')
            if not cacheControlValue:
                return 0
            directives = RequesterCache._parse_cache_control(value=cacheControlValue)
            maxAge = RequesterCache._get_max_age(directives=directives)
            expiryDate = date_util.datetime_from_datetime(dt=self.response.date, seconds=maxAge)
            timeDifference = expiryDate - date_util.datetime_from_now()
            return timeDifference.seconds + (timeDifference.days * 60 * 60 * 24)

        def matches(self, request: KibaRequest) -> bool:
            """Return True when `request` may be served from this entry (same method, same URL, same Vary-selected headers)."""
            if self.requestMethod != request.method or self.requestUrl != str(request.url):
                return False
            varyValue = RequesterCache._get_header(headers=self.response.headers, name='vary') or ''
            for rawVaryHeaderName in varyValue.split(','):
                varyHeaderName = rawVaryHeaderName.strip()
                if not varyHeaderName:
                    continue
                if varyHeaderName == '*':
                    # "Vary: *" means the response depends on things outside the headers, so it can never be matched.
                    return False
                oldRequestValue = RequesterCache._get_header(headers=self.requestHeaders, name=varyHeaderName) or ''
                oldRequestValues = {value.strip() for value in oldRequestValue.split(',')}
                newRequestValue = request.headers.get(varyHeaderName, '')
                newRequestValues = {value.strip() for value in newRequestValue.split(',')}
                if oldRequestValues != newRequestValues:
                    return False
            return True

        @classmethod
        def create(cls, request: KibaRequest, response: KibaResponse, allowPrivate: bool = False) -> RequesterCache.CacheEntry | None:
            """Build an entry for `response`, or return None when the exchange must not be cached.

            Only GET requests with a cachable status and a Cache-Control response
            header are stored. `no-store` / `no-cache` responses are never stored
            (there is no revalidation support here), and `private` responses are
            stored only when `allowPrivate` is True.
            """
            if request.method != RestMethod.GET:
                return None
            if response.status not in http_util.CACHABLE_STATUS_CODES:
                return None
            cacheControlValue = RequesterCache._get_header(headers=response.headers, name='cache-control')
            if cacheControlValue is None:
                return None
            requestDirectives = RequesterCache._parse_cache_control(value=request.headers.get('cache-control'))
            if 'no-store' in requestDirectives:
                return None
            responseDirectives = RequesterCache._parse_cache_control(value=cacheControlValue)
            if 'no-store' in responseDirectives or 'no-cache' in responseDirectives:
                return None
            if 'private' in responseDirectives and not allowPrivate:
                return None
            return cls(
                requestMethod=request.method,
                requestUrl=str(request.url),
                requestHeaders=dict(request.headers),
                response=response,
            )

    def __init__(self, cache: Cache) -> None:
        self._cache = cache

    @staticmethod
    def _get_key(url: str) -> str:
        return url_util.encode_url(url=url)

    @staticmethod
    def _get_header(headers: dict[str, str], name: str) -> str | None:
        """Case-insensitive lookup in a plain header dict; returns None when the header is absent."""
        loweredName = name.lower()
        for headerName, headerValue in headers.items():
            if headerName.lower() == loweredName:
                return headerValue
        return None

    @staticmethod
    def _parse_cache_control(value: str | None) -> dict[str, str | None]:
        """Split a Cache-Control value into {lower-cased directive: argument or None}."""
        # NOTE: quoted arguments that themselves contain commas are not supported by this simple split.
        directives: dict[str, str | None] = {}
        for part in (value or '').split(','):
            directiveName, _, argument = part.strip().partition('=')
            directiveName = directiveName.strip().lower()
            if directiveName:
                directives[directiveName] = argument.strip().strip('"') if argument else None
        return directives

    @staticmethod
    def _get_max_age(directives: dict[str, str | None]) -> int:
        """Return max-age in seconds: the default when absent, 0 when present but not a valid non-negative integer."""
        if 'max-age' not in directives:
            return _DEFAULT_MAX_AGE
        rawMaxAge = directives['max-age']
        if rawMaxAge is None or not (rawMaxAge.isascii() and rawMaxAge.isdigit()):
            # Invalid freshness information is treated as already stale.
            return 0
        return int(rawMaxAge)

    async def add(self, request: KibaRequest, response: KibaResponse) -> None:
        """Store `response` for `request` if it is cachable and not already expired; failures are logged, not raised."""
        cacheEntry = RequesterCache.CacheEntry.create(request=request, response=response, allowPrivate=self._cache.isPrivate)
        if cacheEntry is None:
            return
        expirySeconds = cacheEntry.expirySeconds
        if expirySeconds <= 0:
            return
        cacheKey = self._get_key(url=str(request.url))
        try:
            # Serialisation lives inside the try: a body that cannot be JSON-encoded must not fail the request.
            cacheValue = cacheEntry.model_dump_json()
            await self._cache.set(key=cacheKey, value=cacheValue, expirySeconds=expirySeconds)
        except Exception as exception:  # noqa: BLE001
            logging.error(msg=f'Caught error setting to cache: {self._cache.__class__.__name__!r}: {exception}')

    async def get(self, request: KibaRequest) -> KibaResponse | None:
        """Return the cached response matching `request`, or None on a miss, a bypass directive, or any cache error."""
        requestDirectives = self._parse_cache_control(value=request.headers.get('cache-control'))
        if 'no-cache' in requestDirectives or 'no-store' in requestDirectives:
            # The caller explicitly asked not to be served from cache.
            return None
        cacheKey = self._get_key(url=str(request.url))
        try:
            responseData = await self._cache.get(key=cacheKey)
        except Exception as exception:  # noqa: BLE001
            logging.error(msg=f'Caught error retrieving from cache: {self._cache.__class__.__name__!r}: {exception}')
            responseData = None
        if responseData is None:
            return None
        try:
            cacheEntry = RequesterCache.CacheEntry.model_validate_json(responseData)
        except ValueError as exception:
            # pydantic's ValidationError is a ValueError; a corrupt or old-format entry is treated as a miss.
            logging.error(msg=f'Caught error decoding cache entry: {self._cache.__class__.__name__!r}: {exception}')
            return None
        if not cacheEntry.matches(request=request):
            return None
        response = cacheEntry.response
        response.headers['X-Cache-Used'] = 'RequesterCache-Hit'
        return response
class KibaResponse(BaseModel):
    """Serialisable snapshot of an HTTP response (status, receipt time, headers, raw body).

    `status_code` and `text` mirror the equivalent `httpx.Response` attributes
    so code written against `httpx.Response` keeps working with this type.
    """

    # HTTP status code of the response.
    status: int
    # Time at which this object was built from the httpx response (not the server's Date header).
    date: datetime.datetime
    headers: dict[str, str]
    # NOTE(review): pydantic's default JSON mode encodes bytes as UTF-8 text, so non-UTF-8 bodies
    # presumably cannot be dumped with model_dump_json() - confirm before caching binary responses.
    content: bytes

    @classmethod
    def from_httpx_response(cls, httpxResponse: httpx.Response) -> KibaResponse:
        """Copy the status, headers and fully-read body out of `httpxResponse`."""
        return cls(
            status=httpxResponse.status_code,
            date=date_util.datetime_from_now(),
            headers=dict(httpxResponse.headers),
            content=httpxResponse.content,
        )

    @property
    def status_code(self) -> int:  # noqa: N802
        """Alias of `status`, matching `httpx.Response.status_code`."""
        return self.status

    @property
    def text(self) -> str:
        """Return the body decoded with the Content-Type charset (UTF-8 when absent or unknown); undecodable bytes are replaced."""
        contentType = next((value for name, value in self.headers.items() if name.lower() == 'content-type'), '')
        charset = 'utf-8'
        for parameter in contentType.split(';')[1:]:
            parameterName, _, parameterValue = parameter.strip().partition('=')
            if parameterName.strip().lower() == 'charset' and parameterValue.strip():
                charset = parameterValue.strip().strip('"\'')
        try:
            return self.content.decode(charset, errors='replace')
        except LookupError:
            # Unknown charset label: fall back to UTF-8 rather than failing.
            return self.content.decode('utf-8', errors='replace')

    def json(self) -> typing.Any:  # type: ignore[explicit-any, override]
        """Parse the body as JSON (intentionally overrides pydantic's deprecated `BaseModel.json`)."""
        return json.loads(self.content)