From 31ec27e163260f564f50da697964cb9099ac9a1f Mon Sep 17 00:00:00 2001 From: Krishan Patel Date: Wed, 26 Feb 2025 12:35:20 +0000 Subject: [PATCH] . --- core/caching/__init__.py | 0 core/caching/cache.py | 24 +++++++ core/caching/dict_cache.py | 52 +++++++++++++++ core/caching/file_cache.py | 83 ++++++++++++++++++++++++ core/http/__init__.py | 1 + core/requester/__init__.py | 1 + core/requester/request.py | 5 ++ core/{ => requester}/requester.py | 46 +++++++++----- core/requester/requester_cache.py | 101 ++++++++++++++++++++++++++++++ core/requester/response.py | 29 +++++++++ core/util/file_util.py | 18 ++++-- core/util/http_util.py | 1 + core/util/url_util.py | 17 +++++ core/web3/eth_client.py | 19 +++--- 14 files changed, 368 insertions(+), 29 deletions(-) create mode 100644 core/caching/__init__.py create mode 100644 core/caching/cache.py create mode 100644 core/caching/dict_cache.py create mode 100644 core/caching/file_cache.py create mode 100644 core/requester/__init__.py create mode 100644 core/requester/request.py rename core/{ => requester}/requester.py (82%) create mode 100644 core/requester/requester_cache.py create mode 100644 core/requester/response.py create mode 100644 core/util/http_util.py create mode 100644 core/util/url_util.py diff --git a/core/caching/__init__.py b/core/caching/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/caching/cache.py b/core/caching/cache.py new file mode 100644 index 0000000..b3b5fbf --- /dev/null +++ b/core/caching/cache.py @@ -0,0 +1,24 @@ +from abc import ABC +from abc import abstractmethod + + +class Cache(ABC): + def __init__(self, isPrivate: bool = False) -> None: + super().__init__() + self.isPrivate = isPrivate + + @abstractmethod + async def set(self, key: str, value: str, expirySeconds: float) -> bool: ... + + @abstractmethod + async def get(self, key: str) -> str | None: ... + + @abstractmethod + async def delete(self, key: str) -> bool: + """ + Returned bool indicates whether the record was removed or not + """ + ... + + @abstractmethod + def can_store_complex_objects(self) -> bool: ... diff --git a/core/caching/dict_cache.py b/core/caching/dict_cache.py new file mode 100644 index 0000000..99dbf6a --- /dev/null +++ b/core/caching/dict_cache.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import dataclasses +import datetime +from collections.abc import MutableMapping +from threading import Lock + +from core.caching.cache import Cache +from core.util import date_util + + +class DictCache(Cache): + @dataclasses.dataclass + class CacheEntry: + value: str + expiryDate: datetime.datetime + + def __init__(self, isPrivate: bool = False) -> None: + super().__init__(isPrivate=isPrivate) + self._entries: MutableMapping[str, DictCache.CacheEntry] = {} + self._lock = Lock() + + async def set(self, key: str, value: str, expirySeconds: float) -> bool: + with self._lock: + self._entries[key] = DictCache.CacheEntry( + value=value, + expiryDate=date_util.datetime_from_now(seconds=expirySeconds), + ) + return True + + def _internal_get(self, key: str) -> str | None: + entry = self._entries.get(key) + if not entry: + return None + if entry.expiryDate < date_util.datetime_from_now(): + del self._entries[key] + return None + return entry.value + + async def get(self, key: str) -> str | None: + with self._lock: + return self._internal_get(key=key) + + async def delete(self, key: str) -> bool: + with self._lock: + ret = self._internal_get(key=key) + if ret: + del self._entries[key] + return ret is not None + + def can_store_complex_objects(self) -> bool: # pylint: disable=no-self-use + return True diff --git a/core/caching/file_cache.py b/core/caching/file_cache.py new file mode 100644 index 0000000..18afc23 --- /dev/null +++ b/core/caching/file_cache.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +import asyncio +import os +from threading import Lock + +from core.caching.cache import Cache +from core.util import date_util +from core.util import file_util + + +class FileCache(Cache): + def __init__(self, cacheDirectory: str, isPrivate: bool = False) -> None: + super().__init__(isPrivate=isPrivate) + self._cacheDirectory = cacheDirectory + self._lock = Lock() + + # async def load_cache_file( + # fileName: str, + # expirySeconds: int = 3600, + # ) -> str | None: + # cacheFileDirectory = os.path.join('./data/cache', fileName) + # fileExists = await file_util.file_exists(filePath=cacheFileDirectory) + # fileAgeMillis = await file_util.get_file_age_millis(filePath=cacheFileDirectory) if fileExists else 0 + # if fileExists and (fileAgeMillis / 1000) < expirySeconds: + # return await file_util.read_file(filePath=cacheFileDirectory) + # return None + + # async def load_json_cache_file( # type: ignore[explicit-any] + # fileName: str, + # expirySeconds: int = 3600, + # ) -> dict[str, Any] | list[Any] | None: + # content = await load_cache_file(fileName=fileName, expirySeconds=expirySeconds) + # return json.loads(content) if content is not None else None + + async def set(self, key: str, value: str, expirySeconds: float) -> bool: + cacheFileDirectory = os.path.join(self._cacheDirectory, key) + contentFilePath = os.path.join(cacheFileDirectory, 'content.txt') + expiryFilePath = os.path.join(cacheFileDirectory, 'expiryDate.txt') + expiryDateString = date_util.datetime_to_string(dt=date_util.datetime_from_now(seconds=expirySeconds)) + with self._lock: + asyncio.gather( + *[ + file_util.write_file(filePath=contentFilePath, content=value), + file_util.write_file(filePath=expiryFilePath, content=expiryDateString), + ] + ) + return True + + async def _internal_get(self, key: str) -> str | None: + cacheFileDirectory = os.path.join(self._cacheDirectory, key) + contentFilePath = os.path.join(cacheFileDirectory, 'content.txt') + expiryFilePath = os.path.join(cacheFileDirectory, 'expiryDate.txt') + contentFileExists = await file_util.file_exists(filePath=contentFilePath) + if not contentFileExists: + return None + expiryFileExists = await file_util.file_exists(filePath=expiryFilePath) + if not expiryFileExists: + await file_util.remove_file(filePath=contentFilePath) + return None + expiryDateString = await file_util.read_file(filePath=expiryFilePath) + expiryDate = date_util.datetime_from_string(dateString=expiryDateString) + if expiryDate < date_util.datetime_from_now(): + await file_util.remove_file(filePath=contentFilePath) + return None + return await file_util.read_file(filePath=contentFilePath) + + async def get(self, key: str) -> str | None: + with self._lock: + return await self._internal_get(key=key) + + async def delete(self, key: str) -> bool: + # NOTE(krishan711): we only delete content file for speed + cacheFileDirectory = os.path.join(self._cacheDirectory, key) + contentFilePath = os.path.join(cacheFileDirectory, 'content.txt') + with self._lock: + fileExists = await file_util.file_exists(filePath=contentFilePath) + if fileExists: + await file_util.remove_file(filePath=contentFilePath) + return fileExists + + def can_store_complex_objects(self) -> bool: + return False diff --git a/core/http/__init__.py b/core/http/__init__.py index dc9fd4c..41dd7e0 100644 --- a/core/http/__init__.py +++ b/core/http/__init__.py @@ -1 +1,2 @@ # noqa: A005 +from .rest_method import RestMethod # noqa: F401 diff --git a/core/requester/__init__.py b/core/requester/__init__.py new file mode 100644 index 0000000..160b113 --- /dev/null +++ b/core/requester/__init__.py @@ -0,0 +1 @@ +from .requester import * # noqa: F403 diff --git a/core/requester/request.py b/core/requester/request.py new file mode 100644 index 0000000..2b8ddd6 --- /dev/null +++ b/core/requester/request.py @@ -0,0 +1,5 @@ +import httpx + + +class KibaRequest(httpx.Request): + pass diff --git a/core/requester.py b/core/requester/requester.py similarity index 82% rename from core/requester.py rename to core/requester/requester.py index 584e52d..dac6f5b 100644 --- a/core/requester.py +++ b/core/requester/requester.py @@ -13,14 +13,16 @@ import httpx from core import logging +from core.caching.cache import Cache from core.exceptions import HTTP_EXCEPTIONS_MAP from core.exceptions import KibaException +from core.requester.request import KibaRequest +from core.requester.requester_cache import RequesterCache +from core.requester.response import KibaResponse from core.util import dict_util from core.util import file_util from core.util.typing_util import Json -KibaResponse = httpx.Response - FileContent = Union[IO[bytes], bytes] File = Union[FileContent, tuple[str | None, FileContent]] HttpxFileTypes = Union[ @@ -43,9 +45,10 @@ def __init__(self, message: str | None = None, statusCode: int | None = None, he class Requester: - def __init__(self, headers: Mapping[str, str] | None = None, shouldFollowRedirects: bool = True) -> None: + def __init__(self, headers: Mapping[str, str] | None = None, shouldFollowRedirects: bool = True, caches: Sequence[Cache] | None = None) -> None: self.headers = headers or {} self.client = httpx.AsyncClient(follow_redirects=shouldFollowRedirects) + self._caches = [RequesterCache(cache=cache) for cache in caches] if caches else [] async def get(self, url: str, dataDict: Json | None = None, data: bytes | None = None, timeout: int | None = 10, headers: MutableMapping[str, str] | None = None, outputFilePath: str | None = None) -> KibaResponse: return await self.make_request(method='GET', url=url, dataDict=dataDict, data=data, timeout=timeout, headers=headers, outputFilePath=outputFilePath) @@ -136,23 +139,34 @@ async def make_request( else: logging.error('Error: formFiles should only be passed into POST requests.') request = self.client.build_request(method=method, url=url, content=content, data=innerData, files=files, timeout=timeout, headers=requestHeaders) - httpxResponse = await self.client.send(request=request) - if 400 <= httpxResponse.status_code < 600: # noqa: PLR2004 - message = httpxResponse.text - if not message and httpxResponse.status_code == 401 and httpxResponse.headers.get('www-authenticate'): # noqa: PLR2004 - message = httpxResponse.headers['www-authenticate'] - if HTTP_EXCEPTIONS_MAP.get(httpxResponse.status_code) is not None: - exceptionCls = HTTP_EXCEPTIONS_MAP[httpxResponse.status_code] - exception: KibaException = exceptionCls(message=message) - else: - exception = ResponseException(message=message, statusCode=httpxResponse.status_code, headers=httpxResponse.headers) - raise exception + response: KibaResponse | None = None + visitedCaches: list[RequesterCache] = [] + for cache in self._caches: + response = await cache.get(request=typing.cast(KibaRequest, request)) + if response: + break + visitedCaches.append(cache) + if not response: + httpxResponse = await self.client.send(request=request) + if 400 <= httpxResponse.status_code < 600: # noqa: PLR2004 + message = httpxResponse.text + if not message and httpxResponse.status_code == 401 and httpxResponse.headers.get('www-authenticate'): # noqa: PLR2004 + message = httpxResponse.headers['www-authenticate'] + if HTTP_EXCEPTIONS_MAP.get(httpxResponse.status_code) is not None: + exceptionCls = HTTP_EXCEPTIONS_MAP[httpxResponse.status_code] + exception: KibaException = exceptionCls(message=message) + else: + exception = ResponseException(message=message, statusCode=httpxResponse.status_code, headers=httpxResponse.headers) + raise exception + response = KibaResponse.from_httpx_response(httpxResponse=httpxResponse) + for cache in visitedCaches: + await cache.add(request=typing.cast(KibaRequest, request), response=response) # TODO(krishan711): this would be more efficient if streamed if outputFilePath is not None: if os.path.dirname(outputFilePath): os.makedirs(os.path.dirname(outputFilePath), exist_ok=True) - await file_util.write_file_bytes(filePath=outputFilePath, content=httpxResponse.content) - return httpxResponse + await file_util.write_file_bytes(filePath=outputFilePath, content=response.content) + return response async def close_connections(self) -> None: await self.client.aclose() diff --git a/core/requester/requester_cache.py b/core/requester/requester_cache.py new file mode 100644 index 0000000..003a809 --- /dev/null +++ b/core/requester/requester_cache.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +from pydantic import BaseModel + +from core import logging +from core.caching.cache import Cache +from core.http import RestMethod + +# from core.http.headers import CacheUsedHeader +# from core.http.headers import Header +# from core.model.protobuf import CacheEntryProto +# from core.requesters import KibaRequest +from core.requester.request import KibaRequest +from core.requester.response import KibaResponse +from core.util import date_util +from core.util import http_util +from core.util import url_util + +_DEFAULT_MAX_AGE = 24 * 60 * 60 + + +class RequesterCache: + class CacheEntry(BaseModel): + requestMethod: str + requestUrl: str + requestHeaders: dict[str, str] + response: KibaResponse + + @property + def expirySeconds(self) -> int: # noqa: N802 + cacheControlHeader = self.response.headers['cacheControl'] + if not cacheControlHeader: # or cacheControlHeader.ignoreCache: + return 0 + expiryDate = date_util.datetime_from_datetime(dt=self.response.date, seconds=cacheControlHeader.maxAge or _DEFAULT_MAX_AGE) + timeDifference = expiryDate - date_util.datetime_from_now() + return timeDifference.seconds + (timeDifference.days * 60 * 60 * 24) + + def matches(self, request: KibaRequest) -> bool: + if self.requestMethod != request.method or self.requestUrl != request.url: + return False + for varyHeaderName in self.response.headers['vary']: + oldRequestValue = self.requestHeaders.get(varyHeaderName, '') + oldRequestValues = [value.strip() for value in oldRequestValue.split(',')] + newRequestValue = request.headers.get(varyHeaderName, '') + newRequestValues = [value.strip() for value in newRequestValue.split(',')] + if set(oldRequestValues) != set(newRequestValues): + return False + return True + + @classmethod + def create(cls, request: KibaRequest, response: KibaResponse, allowPrivate: bool = False) -> RequesterCache.CacheEntry | None: # noqa: ARG003 + if request.method != RestMethod.GET: + return None + if response.status not in http_util.CACHABLE_STATUS_CODES: + return None + cacheControlHeader = response.headers.get('cacheControl') + if cacheControlHeader is None: + return None + # if cacheControlHeader.ignoreCache or not cacheControlHeader.isPublic and not allowPrivate: + # return None + return cls( + requestMethod=request.method, + requestUrl=str(request.url), + requestHeaders=dict(request.headers), + response=response, + ) + + def __init__(self, cache: Cache) -> None: + self._cache = cache + + @staticmethod + def _get_key(url: str) -> str: + return url_util.encode_url(url=url) + + async def add(self, request: KibaRequest, response: KibaResponse) -> None: + cacheEntry = RequesterCache.CacheEntry.create(request=request, response=response, allowPrivate=self._cache.isPrivate) + if cacheEntry: + cacheKey = self._get_key(url=str(request.url)) + cacheValue = cacheEntry.model_dump_json() + try: + await self._cache.set(key=cacheKey, value=cacheValue, expirySeconds=cacheEntry.expirySeconds) + except Exception as exception: # noqa: BLE001 + logging.error(msg=f'Caught error setting to cache: {self._cache.__class__.__name__!r}: {exception}') + + async def get(self, request: KibaRequest) -> KibaResponse | None: + cacheControlHeader = request.headers['cacheControl'] + if not cacheControlHeader: # cacheControlHeader.ignoreCache + return None + cacheKey = self._get_key(url=str(request.url)) + response = None + try: + responseData = await self._cache.get(key=cacheKey) + except Exception as exception: # noqa: BLE001 + logging.error(msg=f'Caught error retrieving from cache: {self._cache.__class__.__name__!r}: {exception}') + responseData = None + if responseData is not None: + cacheEntry = RequesterCache.CacheEntry.model_validate_json(responseData) + if cacheEntry.matches(request=request): + response = cacheEntry.response + response.headers['X-Cache-Used'] = 'RequesterCache-Hit' + return response diff --git a/core/requester/response.py b/core/requester/response.py new file mode 100644 index 0000000..4a5d97f --- /dev/null +++ b/core/requester/response.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +import datetime +import json +import typing + +import httpx +from pydantic import BaseModel + +from core.util import date_util + + +class KibaResponse(BaseModel): + status: int + date: datetime.datetime + headers: dict[str, str] + content: bytes + + @classmethod + def from_httpx_response(cls, httpxResponse: httpx.Response) -> KibaResponse: + return cls( + status=httpxResponse.status_code, + date=date_util.datetime_from_now(), + headers=dict(httpxResponse.headers), + content=httpxResponse.content, + ) + + def json(self) -> typing.Any: # type: ignore[explicit-any, override] + return json.loads(self.content) diff --git a/core/util/file_util.py b/core/util/file_util.py index 5ed3752..a166aa8 100644 --- a/core/util/file_util.py +++ b/core/util/file_util.py @@ -67,22 +67,30 @@ def read_file_bytes_sync(filePath: str) -> bytes: return file.read() -async def write_file(filePath: str, content: str, shouldRaiseIfFileExists: bool | None = False) -> None: +async def write_file(filePath: str, content: str, shouldCreateParentDirectories: bool = True, shouldRaiseIfFileExists: bool = False) -> None: + if shouldCreateParentDirectories: + await create_directory(directory=os.path.dirname(filePath), shouldAllowExisting=True) async with aiofiles.open(filePath, 'x' if shouldRaiseIfFileExists else 'w') as file: await file.write(content) -async def write_file_bytes(filePath: str, content: bytes, shouldRaiseIfFileExists: bool | None = False) -> None: +async def write_file_bytes(filePath: str, content: bytes, shouldCreateParentDirectories: bool = True, shouldRaiseIfFileExists: bool = False) -> None: + if shouldCreateParentDirectories: + await create_directory(directory=os.path.dirname(filePath), shouldAllowExisting=True) async with aiofiles.open(filePath, 'xb' if shouldRaiseIfFileExists else 'wb') as file: await file.write(content) -def write_file_sync(filePath: str, content: str, shouldRaiseIfFileExists: bool | None = False) -> None: +def write_file_sync(filePath: str, content: str, shouldCreateParentDirectories: bool = True, shouldRaiseIfFileExists: bool = False) -> None: + if shouldCreateParentDirectories: + create_directory_sync(directory=os.path.dirname(filePath), shouldAllowExisting=True) with open(filePath, 'x' if shouldRaiseIfFileExists else 'w') as file: file.write(content) -def write_file_bytes_sync(filePath: str, content: bytes, shouldRaiseIfFileExists: bool | None = False) -> None: +def write_file_bytes_sync(filePath: str, content: bytes, shouldCreateParentDirectories: bool = True, shouldRaiseIfFileExists: bool = False) -> None: + if shouldCreateParentDirectories: + create_directory_sync(directory=os.path.dirname(filePath), shouldAllowExisting=True) with open(filePath, 'xb' if shouldRaiseIfFileExists else 'wb') as file: file.write(content) @@ -91,7 +99,7 @@ async def create_directory(directory: str, shouldAllowExisting: bool = True) -> pathlib.Path(directory).mkdir(parents=True, exist_ok=shouldAllowExisting) -async def create_directory_sync(directory: str, shouldAllowExisting: bool = True) -> None: +def create_directory_sync(directory: str, shouldAllowExisting: bool = True) -> None: pathlib.Path(directory).mkdir(parents=True, exist_ok=shouldAllowExisting) diff --git a/core/util/http_util.py b/core/util/http_util.py new file mode 100644 index 0000000..8b37167 --- /dev/null +++ b/core/util/http_util.py @@ -0,0 +1 @@ +CACHABLE_STATUS_CODES = {200, 203, 206, 300, 301, 410} diff --git a/core/util/url_util.py b/core/util/url_util.py new file mode 100644 index 0000000..f4de685 --- /dev/null +++ b/core/util/url_util.py @@ -0,0 +1,17 @@ +from urllib import parse + + +def encode_url(url: str) -> str: + return parse.quote(string=url) + + +def encode_url_part(urlPart: str) -> str: + return parse.quote(string=urlPart, safe='') + + +def decode_url(url: str) -> str: + return parse.unquote(string=url) + + +def decode_url_part(urlPart: str) -> str: + return parse.unquote(string=urlPart) diff --git a/core/web3/eth_client.py b/core/web3/eth_client.py index fc29117..30b1273 100644 --- a/core/web3/eth_client.py +++ b/core/web3/eth_client.py @@ -16,8 +16,11 @@ from web3.types import HexBytes from web3.types import HexStr from web3.types import LogReceipt +from web3.types import Nonce from web3.types import TxData +from web3.types import TxParams from web3.types import TxReceipt +from web3.types import Wei from core.exceptions import BadRequestException from core.exceptions import KibaException @@ -353,10 +356,10 @@ def _get_base_transaction_params( functionAbi: ABIFunction, fromAddress: str, arguments: DictStrAny | None = None, - ) -> DictStrAny: - params = { - 'to': toAddress, - 'from': fromAddress, + ) -> TxParams: + params: TxParams = { + 'to': chain_util.normalize_address(value=toAddress), + 'from': chain_util.normalize_address(value=fromAddress), 'data': encode_transaction_data(w3=self.w3, abi_element_identifier=functionAbi['name'], contract_abi=contractAbi, abi_callable=functionAbi, kwargs=(arguments or {}), args=[]), } return params @@ -373,7 +376,7 @@ async def _get_transaction_params( maxPriorityFeePerGas: int | None = None, arguments: DictStrAny | None = None, chainId: int | None = None, - ) -> DictStrAny: + ) -> TxParams: params = self._get_base_transaction_params(toAddress=toAddress, contractAbi=contractAbi, functionAbi=functionAbi, fromAddress=fromAddress, arguments=arguments) if gas is None: response = await self._make_request(method='eth_estimateGas', params=[params]) @@ -382,15 +385,15 @@ async def _get_transaction_params( if maxPriorityFeePerGas is None: response = await self._make_request(method='eth_maxPriorityFeePerGas') maxPriorityFeePerGas = int(response['result'], 16) - params['maxPriorityFeePerGas'] = maxPriorityFeePerGas + params['maxPriorityFeePerGas'] = typing.cast(Wei, maxPriorityFeePerGas) if maxFeePerGas is None: response = await self._make_request(method='eth_getBlockByNumber', params=['pending', False]) baseFeePerGas = int(response['result']['baseFeePerGas'], 16) maxFeePerGas = baseFeePerGas + maxPriorityFeePerGas - params['maxFeePerGas'] = maxFeePerGas + params['maxFeePerGas'] = typing.cast(Wei, maxFeePerGas) if nonce is None: nonce = await self.get_transaction_count(address=fromAddress) - params['nonce'] = nonce + params['nonce'] = typing.cast(Nonce, nonce) if chainId is not None: params['chainId'] = chainId return params