Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/requester/request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import httpx


class KibaRequest(httpx.Request):
pass
46 changes: 30 additions & 16 deletions core/requester/requester.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@
import httpx

from core import logging
from core.caching.cache import Cache
from core.exceptions import HTTP_EXCEPTIONS_MAP
from core.exceptions import KibaException
from core.requester.request import KibaRequest
from core.requester.requester_cache import RequesterCache
from core.requester.response import KibaResponse
from core.util import dict_util
from core.util import file_util
from core.util.typing_util import Json

KibaResponse = httpx.Response

FileContent = Union[IO[bytes], bytes]
File = Union[FileContent, tuple[str | None, FileContent]]
HttpxFileTypes = Union[
Expand All @@ -43,9 +45,10 @@ def __init__(self, message: str | None = None, statusCode: int | None = None, he


class Requester:
def __init__(self, headers: Mapping[str, str] | None = None, shouldFollowRedirects: bool = True) -> None:
def __init__(self, headers: Mapping[str, str] | None = None, shouldFollowRedirects: bool = True, caches: Sequence[Cache] | None = None) -> None:
self.headers = headers or {}
self.client = httpx.AsyncClient(follow_redirects=shouldFollowRedirects)
self._caches = [RequesterCache(cache=cache) for cache in caches] if caches else []

async def get(self, url: str, dataDict: Json | None = None, data: bytes | None = None, timeout: int | None = 10, headers: MutableMapping[str, str] | None = None, outputFilePath: str | None = None) -> KibaResponse:
return await self.make_request(method='GET', url=url, dataDict=dataDict, data=data, timeout=timeout, headers=headers, outputFilePath=outputFilePath)
Expand Down Expand Up @@ -136,23 +139,34 @@ async def make_request(
else:
logging.error('Error: formFiles should only be passed into POST requests.')
request = self.client.build_request(method=method, url=url, content=content, data=innerData, files=files, timeout=timeout, headers=requestHeaders)
httpxResponse = await self.client.send(request=request)
if 400 <= httpxResponse.status_code < 600: # noqa: PLR2004
message = httpxResponse.text
if not message and httpxResponse.status_code == 401 and httpxResponse.headers.get('www-authenticate'): # noqa: PLR2004
message = httpxResponse.headers['www-authenticate']
if HTTP_EXCEPTIONS_MAP.get(httpxResponse.status_code) is not None:
exceptionCls = HTTP_EXCEPTIONS_MAP[httpxResponse.status_code]
exception: KibaException = exceptionCls(message=message)
else:
exception = ResponseException(message=message, statusCode=httpxResponse.status_code, headers=httpxResponse.headers)
raise exception
response: KibaResponse | None = None
visitedCaches: list[RequesterCache] = []
for cache in self._caches:
response = await cache.get(request=typing.cast(KibaRequest, request))
if response:
break
visitedCaches.append(cache)
if not response:
httpxResponse = await self.client.send(request=request)
if 400 <= httpxResponse.status_code < 600: # noqa: PLR2004
message = httpxResponse.text
if not message and httpxResponse.status_code == 401 and httpxResponse.headers.get('www-authenticate'): # noqa: PLR2004
message = httpxResponse.headers['www-authenticate']
if HTTP_EXCEPTIONS_MAP.get(httpxResponse.status_code) is not None:
exceptionCls = HTTP_EXCEPTIONS_MAP[httpxResponse.status_code]
exception: KibaException = exceptionCls(message=message)
else:
exception = ResponseException(message=message, statusCode=httpxResponse.status_code, headers=httpxResponse.headers)
raise exception
response = KibaResponse.from_httpx_response(httpxResponse=httpxResponse)
for cache in visitedCaches:
await cache.add(request=typing.cast(KibaRequest, request), response=response)
# TODO(krishan711): this would be more efficient if streamed
if outputFilePath is not None:
if os.path.dirname(outputFilePath):
os.makedirs(os.path.dirname(outputFilePath), exist_ok=True)
await file_util.write_file_bytes(filePath=outputFilePath, content=httpxResponse.content)
return httpxResponse
await file_util.write_file_bytes(filePath=outputFilePath, content=response.content)
return response

async def close_connections(self) -> None:
await self.client.aclose()
101 changes: 101 additions & 0 deletions core/requester/requester_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from __future__ import annotations

from pydantic import BaseModel

from core import logging
from core.caching.cache import Cache
from core.http import RestMethod

# from core.http.headers import CacheUsedHeader
# from core.http.headers import Header
# from core.model.protobuf import CacheEntryProto
# from core.requesters import KibaRequest
from core.requester.request import KibaRequest
from core.requester.response import KibaResponse
from core.util import date_util
from core.util import http_util
from core.util import url_util

_DEFAULT_MAX_AGE = 24 * 60 * 60


class RequesterCache:
class CacheEntry(BaseModel):
requestMethod: str
requestUrl: str
requestHeaders: dict[str, str]
response: KibaResponse

@property
def expirySeconds(self) -> int: # noqa: N802
cacheControlHeader = self.response.headers['cacheControl']
if not cacheControlHeader: # or cacheControlHeader.ignoreCache:
return 0
expiryDate = date_util.datetime_from_datetime(dt=self.response.date, seconds=cacheControlHeader.maxAge or _DEFAULT_MAX_AGE)
timeDifference = expiryDate - date_util.datetime_from_now()
return timeDifference.seconds + (timeDifference.days * 60 * 60 * 24)

def matches(self, request: KibaRequest) -> bool:
if self.requestMethod != request.method or self.requestUrl != request.url:
return False
for varyHeaderName in self.response.headers['vary']:
oldRequestValue = self.requestHeaders.get(varyHeaderName, '')
oldRequestValues = [value.strip() for value in oldRequestValue.split(',')]
newRequestValue = request.headers.get(varyHeaderName, '')
newRequestValues = [value.strip() for value in newRequestValue.split(',')]
if set(oldRequestValues) != set(newRequestValues):
return False
return True

@classmethod
def create(cls, request: KibaRequest, response: KibaResponse, allowPrivate: bool = False) -> RequesterCache.CacheEntry | None: # noqa: ARG003
if request.method != RestMethod.GET:
return None
if response.status not in http_util.CACHABLE_STATUS_CODES:
return None
cacheControlHeader = response.headers.get('cacheControl')
if cacheControlHeader is None:
return None
# if cacheControlHeader.ignoreCache or not cacheControlHeader.isPublic and not allowPrivate:
# return None
return cls(
requestMethod=request.method,
requestUrl=str(request.url),
requestHeaders=dict(request.headers),
response=response,
)

def __init__(self, cache: Cache) -> None:
self._cache = cache

@staticmethod
def _get_key(url: str) -> str:
return url_util.encode_url(url=url)

async def add(self, request: KibaRequest, response: KibaResponse) -> None:
cacheEntry = RequesterCache.CacheEntry.create(request=request, response=response, allowPrivate=self._cache.isPrivate)
if cacheEntry:
cacheKey = self._get_key(url=str(request.url))
cacheValue = cacheEntry.model_dump_json()
try:
await self._cache.set(key=cacheKey, value=cacheValue, expirySeconds=cacheEntry.expirySeconds)
except Exception as exception: # noqa: BLE001
logging.error(msg=f'Caught error setting to cache: {self._cache.__class__.__name__!r}: {exception}')

async def get(self, request: KibaRequest) -> KibaResponse | None:
cacheControlHeader = request.headers['cacheControl']
if not cacheControlHeader: # cacheControlHeader.ignoreCache
return None
cacheKey = self._get_key(url=str(request.url))
response = None
try:
responseData = await self._cache.get(key=cacheKey)
except Exception as exception: # noqa: BLE001
logging.error(msg=f'Caught error retrieving from cache: {self._cache.__class__.__name__!r}: {exception}')
responseData = None
if responseData is not None:
cacheEntry = RequesterCache.CacheEntry.model_validate_json(responseData)
if cacheEntry.matches(request=request):
response = cacheEntry.response
response.headers['X-Cache-Used'] = 'RequesterCache-Hit'
return response
29 changes: 29 additions & 0 deletions core/requester/response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from __future__ import annotations

import datetime
import json
import typing

import httpx
from pydantic import BaseModel

from core.util import date_util


class KibaResponse(BaseModel):
status: int
date: datetime.datetime
headers: dict[str, str]
content: bytes

@classmethod
def from_httpx_response(cls, httpxResponse: httpx.Response) -> KibaResponse:
return cls(
status=httpxResponse.status_code,
date=date_util.datetime_from_now(),
headers=dict(httpxResponse.headers),
content=httpxResponse.content,
)

def json(self) -> typing.Any: # type: ignore[explicit-any, override]
return json.loads(self.content)