Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- [MAJOR] Added `maxWaitSeconds` with default=120 to `EthClient.wait_for_transaction_receipt`
- [MINOR] Added `async_util.gather_batched` function to `async_util`
- [MINOR] Added more exception classes
- [MAJOR] Added `apiPathPattern` to `logging.api`

### Changed

Expand Down
23 changes: 21 additions & 2 deletions core/api/middleware/logging_middleware.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,34 @@
import time
import typing
import uuid

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.middleware.base import RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import Response
from starlette.routing import Match
from starlette.routing import Router
from starlette.types import ASGIApp
from starlette.types import Scope

from core import logging
from core.util.value_holder import RequestIdHolder


def _find_path_pattern(scope: Scope, prefix: str = '') -> str | None:
routes = scope['endpoint'].routes if 'endpoint' in scope else scope['app'].routes
for route in routes:
match_state, match_scope = route.matches(scope)
if match_state != Match.NONE:
if isinstance(match_scope['endpoint'], Router):
return _find_path_pattern(
scope={**match_scope, 'type': scope['type'], 'path': scope['path'][len(match_scope['root_path']) :], 'method': scope['method']},
prefix=prefix + match_scope['root_path'],
)
return prefix + typing.cast(str, route.path)
return None


class LoggingMiddleware(BaseHTTPMiddleware):
def __init__(self, app: ASGIApp, requestIdHolder: RequestIdHolder | None = None) -> None:
super().__init__(app=app)
Expand All @@ -21,12 +39,13 @@ async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -
if self.requestIdHolder:
self.requestIdHolder.set_value(value=requestId)
startTime = time.time()
logging.api(action=request.method, path=request.url.path, query=request.url.query)
pathPattern = _find_path_pattern(request.scope) or request.url.path
logging.api(action=request.method, path=request.url.path, pathPattern=pathPattern, query=request.url.query)
response = await call_next(request)
duration = time.time() - startTime
response.headers['X-Response-Time'] = str(duration)
response.headers['X-Request-Id'] = requestId
logging.api(action=request.method, path=request.url.path, query=request.url.query, response=response.status_code, duration=duration)
logging.api(action=request.method, path=request.url.path, pathPattern=pathPattern, query=request.url.query, response=response.status_code, duration=duration)
if self.requestIdHolder:
self.requestIdHolder.set_value(value=None)
return response
12 changes: 9 additions & 3 deletions core/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def json_parse_float_value(value: str) -> Union[float, None]:
loggerType='api',
loggerName=f'KIBA_API_{_LOGGING_FORMAT_VERSION}',
loggerFormat='%(apiAction)s:%(apiPath)s:%(apiQuery)s:%(apiResponse)s:%(apiDuration)s',
jsonFieldFormatters={'apiAction': json_parse_string_value, 'apiPath': json_parse_string_value, 'apiQuery': json_parse_string_value, 'apiResponse': json_parse_int_value, 'apiDuration': json_parse_float_value},
jsonFieldFormatters={'apiAction': json_parse_string_value, 'apiPath': json_parse_string_value, 'apiPathPattern': json_parse_string_value, 'apiQuery': json_parse_string_value, 'apiResponse': json_parse_int_value, 'apiDuration': json_parse_float_value},
)
ALL_LOGGER_FORMATS = [ROOT_FORMAT, STAT_FORMAT, API_FORMAT]

Expand Down Expand Up @@ -172,14 +172,20 @@ def stat(name: str, key: str, value: float = 1) -> None:
STAT_LOGGER.log(level=logging.INFO, msg='', extra=typing.cast(dict[str, str], {'statName': nameValue, 'statKey': keyValue, 'statValue': statValue}))


def api(action: str, path: str, query: str, response: int | None = None, duration: float | None = None) -> None:
# TODO(krishan711): make pathPattern mandatory in next major release
def api(action: str, path: str, query: str, pathPattern: str | None = None, response: int | None = None, duration: float | None = None) -> None:
if API_LOGGER.isEnabledFor(level=logging.INFO):
actionString = _serialize_string_value(value=action)
pathString = _serialize_string_value(value=path)
pathPatternString = _serialize_string_value(value=pathPattern if pathPattern is not None else path)
queryString = _serialize_string_value(value=query)
responseString = _serialize_numeric_value(value=response)
durationString = _serialize_numeric_value(value=duration)
API_LOGGER.log(level=logging.INFO, msg='', extra=typing.cast(dict[str, str], {'apiAction': actionString, 'apiPath': pathString, 'apiQuery': queryString, 'apiResponse': responseString or '', 'apiDuration': durationString or ''}))
API_LOGGER.log(
level=logging.INFO,
msg='',
extra=typing.cast(dict[str, str], {'apiAction': actionString, 'apiPath': pathString, 'apiPathPattern': pathPatternString, 'apiQuery': queryString, 'apiResponse': responseString or '', 'apiDuration': durationString or ''}),
)


# Wrappers around common python logging functions which go straight to the root logger
Expand Down
4 changes: 2 additions & 2 deletions core/queues/message_queue_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async def _process_message(self, message: MessageType) -> None:
if self.requestIdHolder:
self.requestIdHolder.set_value(value=requestId)
query = urlparse.urlencode(message.content, doseq=True)
logging.api(action='MESSAGE', path=message.command, query=query)
logging.api(action='MESSAGE', path=message.command, pathPattern=message.command, query=query)
startTime = time.time()
statusCode = 200
try:
Expand All @@ -57,7 +57,7 @@ async def _process_message(self, message: MessageType) -> None:
await client.post(messageText=f'Error processing message: {message.command}\n```\n{requestId}\n{message.content}\n{exception}```')
# TODO(krishan711): should possibly reset the visibility timeout
duration = time.time() - startTime
logging.api(action='MESSAGE', path=message.command, query=query, response=statusCode, duration=duration)
logging.api(action='MESSAGE', path=message.command, pathPattern=message.command, query=query, response=statusCode, duration=duration)
if self.requestIdHolder:
self.requestIdHolder.set_value(value=None)

Expand Down
Loading