From 0415f483911a84b0aee15609fb0c2266bc3e944c Mon Sep 17 00:00:00 2001 From: Bruce Schultz Date: Mon, 27 Apr 2026 16:54:07 +0200 Subject: [PATCH 1/9] feat(logs): add customer log endpoint and admin security --- hub_adapter/auth.py | 28 ++++++++++++++++++ hub_adapter/routers/logs.py | 52 +++++++++++++++++++++++++++++++-- hub_adapter/schemas/logs.py | 18 ++++++++++++ tests/router_tests/routes.py | 9 +++++- tests/router_tests/test_logs.py | 44 ++++++++++++++++++++++++++++ tests/test_auth.py | 31 ++++++++++++++++++++ uv.lock | 2 +- 7 files changed, 179 insertions(+), 5 deletions(-) diff --git a/hub_adapter/auth.py b/hub_adapter/auth.py index e3f7ffd..19059e4 100644 --- a/hub_adapter/auth.py +++ b/hub_adapter/auth.py @@ -258,3 +258,31 @@ async def require_researcher_role( """Dependency to check if the user has the ADMIN_ROLE or RESEARCHER_ROLE.""" researcher_role = settings.researcher_role return _require_role(researcher_role, verified_token, settings) + + +async def require_admin_role( + verified_token: Annotated[dict, Depends(verify_idp_token)], + settings: Annotated[Settings, Depends(get_settings)], +) -> dict: + """Dependency to check if the user has the ADMIN_ROLE.""" + role_claim_name = settings.role_claim_name + admin_role = settings.admin_role + if role_claim_name and admin_role: + role_claim_keys = role_claim_name.split(".") + parsed_claim = verified_token + for key in role_claim_keys: + parsed_claim = parsed_claim.get(key, {}) + if not parsed_claim: + logger.warning(f"No roles found in token using {role_claim_name}") + if isinstance(parsed_claim, str): + parsed_claim = [parsed_claim] + if admin_role not in parsed_claim: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail={ + "message": "Insufficient permissions, admin role not found in token.", + "service": "Auth", + "status_code": status.HTTP_403_FORBIDDEN, + }, + ) + return verified_token diff --git a/hub_adapter/routers/logs.py b/hub_adapter/routers/logs.py index 1c63d58..f6f6dde 100644 --- a/hub_adapter/routers/logs.py +++ b/hub_adapter/routers/logs.py @@ -8,13 +8,13 @@ from typing import Annotated import httpx -from fastapi import APIRouter, HTTPException, Query, Security, Path +from fastapi import APIRouter, Depends, HTTPException, Query, Security, Path from starlette import status -from hub_adapter.auth import verify_idp_token, jwtbearer +from hub_adapter.auth import verify_idp_token, jwtbearer, require_admin_role from hub_adapter.constants import ServiceTag from hub_adapter.dependencies import get_settings, make_log_hook -from hub_adapter.schemas.logs import AnalysisLogHistoryResponse, AnalysisLogsResponse, EventLogResponse +from hub_adapter.schemas.logs import AnalysisLogHistoryResponse, AnalysisLogsResponse, EventLogResponse, LogQLQueryRequest, LogQLQueryResponse logger = logging.getLogger(__name__) @@ -48,6 +48,23 @@ def count_logs(query: str, params: dict | None = None) -> int: return 0 +def _execute_raw_query(query: str, params: dict | None = None) -> list[dict]: + """Execute a LogQL query against VictoriaLogs and return raw parsed results.""" + settings = get_settings() + query_data = {"query": query, **(params or {})} + with httpx.Client(event_hooks={"response": [make_log_hook(ServiceTag.LOGS)]}) as client: + resp = client.post( + f"{settings.victoria_logs_url}/select/logsql/query", + data=query_data, + ) + resp.raise_for_status() + logs = [] + for line in resp.text.strip().splitlines(): + if line: + logs.append(json.loads(line)) + return logs + + def query_logs(query: str, params: dict | None = None): """Retrieve a selection of logs.""" _fields = ( @@ -308,3 +325,32 @@ async def get_analysis_log_history( ) return {"analysis_id": analysis_id, "runs": result_runs} + + +@logs_router.post( + "/logs/query", + status_code=status.HTTP_200_OK, + name="logs.query.raw", + response_model=LogQLQueryResponse, + dependencies=[Depends(require_admin_role)], +) +async def raw_log_query(body: LogQLQueryRequest): + """Execute a raw LogQL query against VictoriaLogs. Admin only.""" + settings = get_settings() + if not settings.victoria_logs_url: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="Log service is not configured", + ) + + params: dict = {"limit": body.limit, "offset": body.offset} + if body.start: + params["start"] = body.start.isoformat() + if body.end: + params["end"] = body.end.isoformat() + + data = _execute_raw_query(body.query, params) + total = count_logs(body.query, params) + + meta = {"total": total, "limit": body.limit, "offset": body.offset, "count": len(data)} + return {"data": data, "meta": meta} diff --git a/hub_adapter/schemas/logs.py b/hub_adapter/schemas/logs.py index b688fca..134c78b 100644 --- a/hub_adapter/schemas/logs.py +++ b/hub_adapter/schemas/logs.py @@ -78,6 +78,23 @@ class AnalysisLogHistoryResponse(BaseModel): runs: list[RunLogs] +class LogQLQueryRequest(BaseModel): + """Request body for a raw LogQL query.""" + + query: str + limit: int = 50 + offset: int = 0 + start: datetime.datetime | None = None + end: datetime.datetime | None = None + + +class LogQLQueryResponse(BaseModel): + """Response for a raw LogQL query.""" + + data: list[dict] + meta: Meta + + # Events ## Keys are the event name and the value is a human-readable description of the event TRACKED_EVENTS = { @@ -125,6 +142,7 @@ class AnalysisLogHistoryResponse(BaseModel): "logs.events.get": "A user requested a list of events from the event log", "logs.analysis.live.get": "A user requested the logs for an analysis", "logs.analysis.history.get": "A user requested the log history for an analysis", + "logs.query.raw": "An admin sent a raw LogQL query to VictoriaLogs", "autostart.analysis.create": "The Hub Adapter automatically sent a request to start an analysis to the Pod Orchestrator", "api.ui.access": "The API Swagger UI was accessed", "unknown": "An unknown event has occurred", diff --git a/tests/router_tests/routes.py b/tests/router_tests/routes.py index 2ee8ebe..9e8a982 100644 --- a/tests/router_tests/routes.py +++ b/tests/router_tests/routes.py @@ -13,7 +13,7 @@ ListRoutes, ListServices, ) -from hub_adapter.schemas.logs import AnalysisLogHistoryResponse, AnalysisLogsResponse, EventLogResponse +from hub_adapter.schemas.logs import AnalysisLogHistoryResponse, AnalysisLogsResponse, EventLogResponse, LogQLQueryResponse from hub_adapter.schemas.podorc import ( CleanupPodResponse, LogResponse, @@ -58,6 +58,13 @@ "status_code": 200, "response_model": AnalysisLogHistoryResponse, }, + { + "path": "/logs/query", + "name": "logs.query.raw", + "methods": {"POST"}, + "status_code": 200, + "response_model": LogQLQueryResponse, + }, ) EXPECTED_META_ROUTE_CONFIG = ( diff --git a/tests/router_tests/test_logs.py b/tests/router_tests/test_logs.py index 21d6b1f..8a21949 100644 --- a/tests/router_tests/test_logs.py +++ b/tests/router_tests/test_logs.py @@ -13,7 +13,9 @@ get_analysis_logs, get_events, logs_router, + raw_log_query, ) +from hub_adapter.schemas.logs import LogQLQueryRequest from tests.conftest import check_routes from tests.router_tests.routes import EXPECTED_LOGS_ROUTE_CONFIG @@ -224,3 +226,45 @@ async def test_returns_empty_runs_list_when_no_containers_found( assert result["analysis_id"] == analysis_id assert result["runs"] == [] + + +class TestRawLogQuery: + """Tests for the POST /logs/query endpoint.""" + + def test_route_configs(self, test_client): + """Test endpoint configurations for the logs router.""" + check_routes(logs_router, EXPECTED_LOGS_ROUTE_CONFIG, test_client) + + @pytest.mark.asyncio + @patch("hub_adapter.routers.logs.get_settings") + async def test_raises_503_when_victoria_logs_url_not_set(self, mock_get_settings): + """raw_log_query raises 503 when victoria_logs_url is None.""" + mock_get_settings.return_value.victoria_logs_url = None + + with pytest.raises(HTTPException) as exc_info: + await raw_log_query(LogQLQueryRequest(query="*")) + + assert exc_info.value.status_code == status.HTTP_503_SERVICE_UNAVAILABLE + assert exc_info.value.detail == "Log service is not configured" + + @pytest.mark.asyncio + @patch("hub_adapter.routers.logs.count_logs") + @patch("hub_adapter.routers.logs._execute_raw_query") + @patch("hub_adapter.routers.logs.get_settings") + async def test_returns_paginated_data_when_configured( + self, mock_get_settings, mock_execute, mock_count + ): + """raw_log_query returns data and meta envelope when configured.""" + mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" + mock_execute.return_value = [{"_msg": "hello", "level": "info"}] + mock_count.return_value = 1 + + body = LogQLQueryRequest(query="*", limit=10, offset=0) + result = await raw_log_query(body) + + assert result["meta"]["total"] == 1 + assert result["meta"]["count"] == 1 + assert result["meta"]["limit"] == 10 + assert result["meta"]["offset"] == 0 + assert len(result["data"]) == 1 + assert result["data"][0] == {"_msg": "hello", "level": "info"} diff --git a/tests/test_auth.py b/tests/test_auth.py index 5bbb9eb..f82c50e 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -15,6 +15,7 @@ _add_internal_token_if_missing, _get_internal_token, get_hub_public_key, + require_admin_role, require_researcher_role, require_steward_role, verify_idp_token, @@ -24,6 +25,7 @@ ADMIN_ROLE, RESEARCHER_ROLE, STEWARD_ROLE, + TEST_ADMIN_DECRYPTED_JWT, TEST_JWKS_RESPONSE, TEST_JWT, TEST_OIDC, @@ -202,3 +204,32 @@ async def test_check_rbac_rules(self, mock_logger): steward_error.value.detail["message"] == f"Insufficient permissions, admin or {STEWARD_ROLE} role not found in token." ) + + @pytest.mark.asyncio + async def test_require_admin_role_passes_for_admin(self): + """require_admin_role returns the token when the admin role is present.""" + settings = Settings( + role_claim_name="resource_access.node-ui.roles", + admin_role=ADMIN_ROLE, + ) + result = await require_admin_role(TEST_ADMIN_DECRYPTED_JWT, settings) + assert result == TEST_ADMIN_DECRYPTED_JWT + + @pytest.mark.asyncio + async def test_require_admin_role_raises_403_for_non_admin(self): + """require_admin_role raises 403 when the admin role is absent.""" + settings = Settings( + role_claim_name="resource_access.node-ui.roles", + admin_role=ADMIN_ROLE, + ) + with pytest.raises(HTTPException) as exc_info: + await require_admin_role(TEST_STEWARD_DECRYPTED_JWT, settings) + assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN + assert "admin role not found in token" in exc_info.value.detail["message"] + + @pytest.mark.asyncio + async def test_require_admin_role_passes_when_role_claim_not_configured(self): + """require_admin_role bypasses the check when role_claim_name is not set.""" + settings = Settings(role_claim_name=None, admin_role=ADMIN_ROLE) + result = await require_admin_role(TEST_STEWARD_DECRYPTED_JWT, settings) + assert result == TEST_STEWARD_DECRYPTED_JWT diff --git a/uv.lock b/uv.lock index 2616081..2dec61b 100644 --- a/uv.lock +++ b/uv.lock @@ -291,7 +291,7 @@ wheels = [ [[package]] name = "hub-adapter" -version = "0.6.0" +version = "0.6.1" source = { editable = "." } dependencies = [ { name = "click" }, From ffa654dc3c78246f79289d1c0ce7f40c97128e62 Mon Sep 17 00:00:00 2001 From: Bruce Schultz Date: Tue, 28 Apr 2026 08:39:09 +0200 Subject: [PATCH 2/9] feat(logs): add network traffic log fetching --- hub_adapter/routers/logs.py | 100 +++++++++++++++++++++++++++++++++++- hub_adapter/schemas/logs.py | 26 +++++++++- 2 files changed, 124 insertions(+), 2 deletions(-) diff --git a/hub_adapter/routers/logs.py b/hub_adapter/routers/logs.py index f6f6dde..550602d 100644 --- a/hub_adapter/routers/logs.py +++ b/hub_adapter/routers/logs.py @@ -14,7 +14,16 @@ from hub_adapter.auth import verify_idp_token, jwtbearer, require_admin_role from hub_adapter.constants import ServiceTag from hub_adapter.dependencies import get_settings, make_log_hook -from hub_adapter.schemas.logs import AnalysisLogHistoryResponse, AnalysisLogsResponse, EventLogResponse, LogQLQueryRequest, LogQLQueryResponse +from hub_adapter.schemas.logs import ( + AnalysisLogHistoryResponse, + AnalysisLogsResponse, + EventLogResponse, + LogQLQueryRequest, + LogQLQueryResponse, + NetStatResponse, + NetStatRun, + NetStatTotal, +) logger = logging.getLogger(__name__) @@ -327,6 +336,95 @@ async def get_analysis_log_history( return {"analysis_id": analysis_id, "runs": result_runs} +def _parse_netstats_container(container_name: str) -> tuple[str, int]: + """Strip the 'net-stats-analysis-' prefix and split on the last dash to get (analysis_id_str, run_number).""" + prefix = "net-stats-analysis-" + if not container_name.startswith(prefix): + raise ValueError(f"Unexpected container name format: {container_name!r}") + remainder = container_name[len(prefix) :] + analysis_id_str, _, run_str = remainder.rpartition("-") + if not analysis_id_str or not run_str.isdigit(): + raise ValueError(f"Cannot parse run number from container name: {container_name!r}") + return analysis_id_str, int(run_str) + + +@logs_router.get( + "/netstats", + status_code=status.HTTP_200_OK, + response_model=NetStatResponse, + name="logs.netstats.get", +) +async def get_netstats( + analysis_id: Annotated[uuid.UUID | None, Query(description="Filter by analysis UUID")] = None, + start_date: Annotated[ + datetime.datetime | None, + Query(description="Filter by start date using ISO8601 format"), + ] = None, + end_date: Annotated[ + datetime.datetime | None, + Query(description="Filter by end date using ISO8601 format"), + ] = None, + limit: Annotated[int, Query(description="Maximum number of raw log entries to return")] = 1000, +): + """Retrieve network traffic statistics from netstats log events.""" + settings = get_settings() + if not settings.victoria_logs_url: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="Log service is not configured", + ) + + query_parts = ['log.event_name:"netstats.analysis.traffic"'] + if analysis_id is not None: + query_parts.append(f'kubernetes.container_name:~"net-stats-analysis-{str(analysis_id)}-"') + base_query = " AND ".join(query_parts) + + fields = "_time, kubernetes.container_name, kubernetes.pod_name, log.bytes_in, log.bytes_out" + logsql_query = f"{base_query} | fields {fields}" + + params: dict = {"limit": limit} + if start_date: + params["start"] = start_date.isoformat() + if end_date: + params["end"] = end_date.isoformat() + + raw_logs = _execute_raw_query(logsql_query, params) + total = count_logs(base_query, params) + + totals: dict[str, NetStatTotal] = {} + for entry in raw_logs: + container_name = entry.get("kubernetes.container_name", "") + try: + analysis_id_str, run_number = _parse_netstats_container(container_name) + entry_analysis_id = uuid.UUID(analysis_id_str) + + except (ValueError, AttributeError): + logger.warning(f"Skipping netstats entry with unparseable container name: {container_name}") + continue + + raw_time = entry.get("_time", "") + run = NetStatRun( + timestamp=datetime.datetime.fromisoformat(raw_time) if raw_time else datetime.datetime.min, + container=container_name, + analysis_id=entry_analysis_id, + run_number=run_number, + pod=entry.get("kubernetes.pod_name", ""), + bytes_in=int(entry.get("log.bytes_in") or 0), + bytes_out=int(entry.get("log.bytes_out") or 0), + ) + + key = str(entry_analysis_id) + if key not in totals: + totals[key] = NetStatTotal(analysis_id=entry_analysis_id, bytes_in=0, bytes_out=0, runs=[]) + totals[key].runs.append(run) + totals[key].bytes_in += run.bytes_in + totals[key].bytes_out += run.bytes_out + + data = list(totals.values()) + meta = {"total": total, "limit": limit, "offset": 0, "count": len(data)} + return {"data": data, "meta": meta} + + @logs_router.post( "/logs/query", status_code=status.HTTP_200_OK, diff --git a/hub_adapter/schemas/logs.py b/hub_adapter/schemas/logs.py index 134c78b..581b00e 100644 --- a/hub_adapter/schemas/logs.py +++ b/hub_adapter/schemas/logs.py @@ -33,7 +33,7 @@ class Meta(BaseModel): count: int total: int limit: int - offset: int + offset: int | None = None class EventLogResponse(BaseModel): @@ -43,6 +43,28 @@ class EventLogResponse(BaseModel): meta: Meta +class NetStatRun(BaseModel): + timestamp: datetime.datetime + container: str + analysis_id: uuid.UUID + run_number: int + pod: str + bytes_in: int + bytes_out: int + + +class NetStatTotal(BaseModel): + analysis_id: uuid.UUID + bytes_in: int + bytes_out: int + runs: list[NetStatRun] + + +class NetStatResponse(BaseModel): + data: list[NetStatTotal] + meta: Meta + + class PodLog(BaseModel): """A single log line from a pod container.""" @@ -143,6 +165,8 @@ class LogQLQueryResponse(BaseModel): "logs.analysis.live.get": "A user requested the logs for an analysis", "logs.analysis.history.get": "A user requested the log history for an analysis", "logs.query.raw": "An admin sent a raw LogQL query to VictoriaLogs", + "logs.netstats.get": "A user requested network traffic statistics", + "netstats.analysis.traffic": "A network traffic statistics event was recorded", "autostart.analysis.create": "The Hub Adapter automatically sent a request to start an analysis to the Pod Orchestrator", "api.ui.access": "The API Swagger UI was accessed", "unknown": "An unknown event has occurred", From fb2175e072ac4291bf6fe7d1ce614b5555859c8e Mon Sep 17 00:00:00 2001 From: Bruce Schultz Date: Tue, 28 Apr 2026 13:33:15 +0200 Subject: [PATCH 3/9] feat(logs): add network requests summation endpoints to logs --- hub_adapter/routers/hub.py | 3 +- hub_adapter/routers/logs.py | 65 +++++++++++ hub_adapter/schemas/logs.py | 11 ++ tests/router_tests/routes.py | 23 +++- tests/router_tests/test_logs.py | 201 +++++++++++++++++++++++++++++--- 5 files changed, 288 insertions(+), 15 deletions(-) diff --git a/hub_adapter/routers/hub.py b/hub_adapter/routers/hub.py index ca7dfe9..d7bb2a1 100644 --- a/hub_adapter/routers/hub.py +++ b/hub_adapter/routers/hub.py @@ -195,10 +195,11 @@ async def list_analysis_nodes( @catch_hub_errors async def list_specific_analysis_node( analysis_node_id: Annotated[uuid.UUID | str, Path(description="Analysis Node UUID.")], + query_params: Annotated[dict, Depends(_parse_query_params)], core_client: Annotated[flame_hub.CoreClient, Depends(get_core_client)], ): """List a specific analysis node.""" - return core_client.get_analysis_node(analysis_node_id=analysis_node_id) + return core_client.get_analysis_node(analysis_node_id=analysis_node_id, **query_params) @hub_router.post( diff --git a/hub_adapter/routers/logs.py b/hub_adapter/routers/logs.py index 550602d..2c3f943 100644 --- a/hub_adapter/routers/logs.py +++ b/hub_adapter/routers/logs.py @@ -17,6 +17,7 @@ from hub_adapter.schemas.logs import ( AnalysisLogHistoryResponse, AnalysisLogsResponse, + ApiRequestCountResponse, EventLogResponse, LogQLQueryRequest, LogQLQueryResponse, @@ -452,3 +453,67 @@ async def raw_log_query(body: LogQLQueryRequest): meta = {"total": total, "limit": body.limit, "offset": body.offset, "count": len(data)} return {"data": data, "meta": meta} + + +@logs_router.get( + "/requests", + status_code=status.HTTP_200_OK, + response_model=ApiRequestCountResponse, + name="logs.requests.get", +) +async def get_api_requests( + start_date: Annotated[ + datetime.datetime | None, + Query(description="Filter requests from this timestamp using ISO8601 format"), + ] = None, + end_date: Annotated[ + datetime.datetime | None, + Query(description="Filter requests up to this timestamp using ISO8601 format"), + ] = None, + endpoint: Annotated[ + str | None, + Query(description="Filter breakdown to paths starting with this prefix"), + ] = None, + method: Annotated[ + str | None, + Query(description="Filter breakdown to a specific HTTP method (e.g. GET, POST, DELETE)"), + ] = None, +): + """Get total API request count and a per-endpoint breakdown grouped by HTTP method and path.""" + settings = get_settings() + if not settings.victoria_logs_url: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="Event log service is not configured", + ) + + logsql_query = r"""log.logger:"uvicorn.access" | extract '<_> " HTTP/<_>" ' | stats by (method, path) count() as requests""" + params: dict = {} + if start_date: + params["start"] = start_date.isoformat() + if end_date: + params["end"] = end_date.isoformat() + + raw = _execute_raw_query(logsql_query, params) + method_filter = method.upper() if method else None + + by_path: dict[str, dict[str, int]] = {} + for entry in raw: + req_method = entry.get("method", "") + base_path = entry.get("path", "").split("?")[0] + count = int(entry.get("requests", 0)) + by_path.setdefault(base_path, {}) + by_path[base_path][req_method] = by_path[base_path].get(req_method, 0) + count + + data: dict[str, dict[str, int]] = {} + for path in sorted(by_path): + if endpoint is not None and not path.startswith(endpoint): + continue + methods = by_path[path] + if method_filter is not None: + if method_filter not in methods: + continue + methods = {method_filter: methods[method_filter]} + data[path] = {**methods, "total": sum(methods.values())} + + return {"total": sum(v["total"] for v in data.values()), "data": data} diff --git a/hub_adapter/schemas/logs.py b/hub_adapter/schemas/logs.py index 581b00e..c0f4fc0 100644 --- a/hub_adapter/schemas/logs.py +++ b/hub_adapter/schemas/logs.py @@ -117,6 +117,16 @@ class LogQLQueryResponse(BaseModel): meta: Meta +class ApiRequestCountResponse(BaseModel): + """Response for the API request count endpoint. + + data maps endpoint path → {method: count, ..., "total": count}. + """ + + total: int + data: dict[str, dict[str, int]] + + # Events ## Keys are the event name and the value is a human-readable description of the event TRACKED_EVENTS = { @@ -166,6 +176,7 @@ class LogQLQueryResponse(BaseModel): "logs.analysis.history.get": "A user requested the log history for an analysis", "logs.query.raw": "An admin sent a raw LogQL query to VictoriaLogs", "logs.netstats.get": "A user requested network traffic statistics", + "logs.requests.get": "A user requested API request counts from the event log", "netstats.analysis.traffic": "A network traffic statistics event was recorded", "autostart.analysis.create": "The Hub Adapter automatically sent a request to start an analysis to the Pod Orchestrator", "api.ui.access": "The API Swagger UI was accessed", diff --git a/tests/router_tests/routes.py b/tests/router_tests/routes.py index 9e8a982..794322c 100644 --- a/tests/router_tests/routes.py +++ b/tests/router_tests/routes.py @@ -13,7 +13,14 @@ ListRoutes, ListServices, ) -from hub_adapter.schemas.logs import AnalysisLogHistoryResponse, AnalysisLogsResponse, EventLogResponse, LogQLQueryResponse +from hub_adapter.schemas.logs import ( + AnalysisLogHistoryResponse, + AnalysisLogsResponse, + ApiRequestCountResponse, + EventLogResponse, + LogQLQueryResponse, + NetStatResponse, +) from hub_adapter.schemas.podorc import ( CleanupPodResponse, LogResponse, @@ -65,6 +72,20 @@ "status_code": 200, "response_model": LogQLQueryResponse, }, + { + "path": "/netstats", + "name": "logs.netstats.get", + "methods": {"GET"}, + "status_code": 200, + "response_model": NetStatResponse, + }, + { + "path": "/requests", + "name": "logs.requests.get", + "methods": {"GET"}, + "status_code": 200, + "response_model": ApiRequestCountResponse, + }, ) EXPECTED_META_ROUTE_CONFIG = ( diff --git a/tests/router_tests/test_logs.py b/tests/router_tests/test_logs.py index 8a21949..d236756 100644 --- a/tests/router_tests/test_logs.py +++ b/tests/router_tests/test_logs.py @@ -1,5 +1,6 @@ """Unit tests for the logs router.""" +import datetime import uuid from unittest.mock import patch @@ -11,15 +12,30 @@ _group_by_run, get_analysis_log_history, get_analysis_logs, + get_api_requests, get_events, logs_router, raw_log_query, ) -from hub_adapter.schemas.logs import LogQLQueryRequest +from hub_adapter.schemas.logs import ApiRequestCountResponse, LogQLQueryRequest from tests.conftest import check_routes from tests.router_tests.routes import EXPECTED_LOGS_ROUTE_CONFIG +class TestApiRequestCountSchemas: + """Tests for ApiRequestCountResponse Pydantic model.""" + + def test_api_request_count_response_fields(self): + """ApiRequestCountResponse stores total and a dict mapping path to method counts.""" + response = ApiRequestCountResponse( + total=12, + data={"/node/settings": {"GET": 12, "total": 12}}, + ) + assert response.total == 12 + assert response.data["/node/settings"]["GET"] == 12 + assert response.data["/node/settings"]["total"] == 12 + + class TestLogs: """Logs endpoint configuration and behaviour tests.""" @@ -43,9 +59,7 @@ async def test_get_events_503_when_victoria_logs_url_not_set(self, mock_get_sett @patch("hub_adapter.routers.logs.count_logs") @patch("hub_adapter.routers.logs.query_logs") @patch("hub_adapter.routers.logs.get_settings") - async def test_get_events_returns_data_when_configured( - self, mock_get_settings, mock_query_logs, mock_count_logs - ): + async def test_get_events_returns_data_when_configured(self, mock_get_settings, mock_query_logs, mock_count_logs): """get_events returns paginated data when victoria_logs_url is configured.""" mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" mock_query_logs.return_value = [{"event_name": "hub.project.get", "message": "test"}] @@ -191,9 +205,7 @@ async def test_raises_503_when_victoria_logs_url_not_set(self, mock_get_settings @patch("hub_adapter.routers.logs._query_pod_logs") @patch("hub_adapter.routers.logs._get_analysis_container_names") @patch("hub_adapter.routers.logs.get_settings") - async def test_returns_all_runs_sorted_ascending( - self, mock_get_settings, mock_get_names, mock_query_logs - ): + async def test_returns_all_runs_sorted_ascending(self, mock_get_settings, mock_get_names, mock_query_logs): """get_analysis_log_history returns every run ordered by run number ascending.""" analysis_id = uuid.uuid4() analysis_id_str = str(analysis_id) @@ -214,9 +226,7 @@ async def test_returns_all_runs_sorted_ascending( @pytest.mark.asyncio @patch("hub_adapter.routers.logs._get_analysis_container_names") @patch("hub_adapter.routers.logs.get_settings") - async def test_returns_empty_runs_list_when_no_containers_found( - self, mock_get_settings, mock_get_names - ): + async def test_returns_empty_runs_list_when_no_containers_found(self, mock_get_settings, mock_get_names): """get_analysis_log_history returns an empty runs list when no containers exist.""" mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" mock_get_names.return_value = [] @@ -251,9 +261,7 @@ async def test_raises_503_when_victoria_logs_url_not_set(self, mock_get_settings @patch("hub_adapter.routers.logs.count_logs") @patch("hub_adapter.routers.logs._execute_raw_query") @patch("hub_adapter.routers.logs.get_settings") - async def test_returns_paginated_data_when_configured( - self, mock_get_settings, mock_execute, mock_count - ): + async def test_returns_paginated_data_when_configured(self, mock_get_settings, mock_execute, mock_count): """raw_log_query returns data and meta envelope when configured.""" mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" mock_execute.return_value = [{"_msg": "hello", "level": "info"}] @@ -268,3 +276,170 @@ async def test_returns_paginated_data_when_configured( assert result["meta"]["offset"] == 0 assert len(result["data"]) == 1 assert result["data"][0] == {"_msg": "hello", "level": "info"} + + +class TestGetApiRequests: + """Tests for the GET /requests endpoint.""" + + @pytest.mark.asyncio + @patch("hub_adapter.routers.logs.get_settings") + async def test_raises_503_when_victoria_logs_url_not_set(self, mock_get_settings): + """get_api_requests raises 503 when victoria_logs_url is None.""" + mock_get_settings.return_value.victoria_logs_url = None + + with pytest.raises(HTTPException) as exc_info: + await get_api_requests() + + assert exc_info.value.status_code == status.HTTP_503_SERVICE_UNAVAILABLE + assert exc_info.value.detail == "Event log service is not configured" + + @pytest.mark.asyncio + @patch("hub_adapter.routers.logs._execute_raw_query") + @patch("hub_adapter.routers.logs.get_settings") + async def test_returns_total_and_per_endpoint_breakdown(self, mock_get_settings, mock_execute): + """get_api_requests returns the sum total and one dict entry per endpoint.""" + mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" + mock_execute.return_value = [ + {"method": "GET", "path": "/node/settings", "requests": "12"}, + {"method": "POST", "path": "/events/signin", "requests": "6"}, + ] + + result = await get_api_requests() + + assert result["total"] == 18 + assert len(result["data"]) == 2 + assert result["data"]["/node/settings"] == {"GET": 12, "total": 12} + assert result["data"]["/events/signin"] == {"POST": 6, "total": 6} + + @pytest.mark.asyncio + @patch("hub_adapter.routers.logs._execute_raw_query") + @patch("hub_adapter.routers.logs.get_settings") + async def test_strips_query_params_and_reaggregates(self, mock_get_settings, mock_execute): + """get_api_requests strips query strings and merges counts for the same base path.""" + mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" + mock_execute.return_value = [ + {"method": "GET", "path": "/analysis-nodes?include=analysis,node", "requests": "8"}, + {"method": "GET", "path": "/analysis-nodes?sort=-updated_at", "requests": "3"}, + ] + + result = await get_api_requests() + + assert result["total"] == 11 + assert len(result["data"]) == 1 + assert result["data"]["/analysis-nodes"] == {"GET": 11, "total": 11} + + @pytest.mark.asyncio + @patch("hub_adapter.routers.logs._execute_raw_query") + @patch("hub_adapter.routers.logs.get_settings") + async def test_multiple_methods_per_endpoint(self, mock_get_settings, mock_execute): + """get_api_requests groups multiple methods under the same endpoint key.""" + mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" + mock_execute.return_value = [ + {"method": "GET", "path": "/node/settings", "requests": "10"}, + {"method": "POST", "path": "/node/settings", "requests": "3"}, + ] + + result = await get_api_requests() + + assert result["total"] == 13 + assert result["data"]["/node/settings"]["GET"] == 10 + assert result["data"]["/node/settings"]["POST"] == 3 + assert result["data"]["/node/settings"]["total"] == 13 + + @pytest.mark.asyncio + @patch("hub_adapter.routers.logs._execute_raw_query") + @patch("hub_adapter.routers.logs.get_settings") + async def test_filters_by_endpoint_prefix(self, mock_get_settings, mock_execute): + """get_api_requests filters the breakdown and total to paths starting with the given prefix.""" + mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" + mock_execute.return_value = [ + {"method": "GET", "path": "/node/settings", "requests": "12"}, + {"method": "GET", "path": "/analysis-nodes", "requests": "8"}, + ] + + result = await get_api_requests(endpoint="/node") + + assert result["total"] == 12 + assert list(result["data"].keys()) == ["/node/settings"] + + @pytest.mark.asyncio + @patch("hub_adapter.routers.logs._execute_raw_query") + @patch("hub_adapter.routers.logs.get_settings") + async def test_returns_zero_total_when_no_paths_match_prefix(self, mock_get_settings, mock_execute): + """get_api_requests returns total 0 and empty dict when no paths match the given prefix.""" + mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" + mock_execute.return_value = [ + {"method": "GET", "path": "/node/settings", "requests": "12"}, + ] + + result = await get_api_requests(endpoint="/nonexistent") + + assert result["total"] == 0 + assert result["data"] == {} + + @pytest.mark.asyncio + @patch("hub_adapter.routers.logs._execute_raw_query") + @patch("hub_adapter.routers.logs.get_settings") + async def test_passes_date_range_params_to_query(self, mock_get_settings, mock_execute): + """get_api_requests forwards start_date and end_date to _execute_raw_query.""" + mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" + mock_execute.return_value = [] + start = datetime.datetime(2026, 4, 1, tzinfo=datetime.timezone.utc) + end = datetime.datetime(2026, 4, 28, tzinfo=datetime.timezone.utc) + + await get_api_requests(start_date=start, end_date=end) + + passed_params = mock_execute.call_args[0][1] + assert passed_params["start"] == start.isoformat() + assert passed_params["end"] == end.isoformat() + + @pytest.mark.asyncio + @patch("hub_adapter.routers.logs._execute_raw_query") + @patch("hub_adapter.routers.logs.get_settings") + async def test_filters_by_method(self, mock_get_settings, mock_execute): + """get_api_requests only returns endpoints that have the given method.""" + mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" + mock_execute.return_value = [ + {"method": "GET", "path": "/node/settings", "requests": "10"}, + {"method": "DELETE", "path": "/local", "requests": "4"}, + {"method": "GET", "path": "/local", "requests": "2"}, + ] + + result = await get_api_requests(method="DELETE") + + assert result["total"] == 4 + assert list(result["data"].keys()) == ["/local"] + assert result["data"]["/local"] == {"DELETE": 4, "total": 4} + + @pytest.mark.asyncio + @patch("hub_adapter.routers.logs._execute_raw_query") + @patch("hub_adapter.routers.logs.get_settings") + async def test_method_filter_is_case_insensitive(self, mock_get_settings, mock_execute): + """get_api_requests uppercases the method filter before comparing.""" + mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" + mock_execute.return_value = [ + {"method": "DELETE", "path": "/local", "requests": "4"}, + ] + + result = await get_api_requests(method="delete") + + assert result["total"] == 4 + assert "/local" in result["data"] + + @pytest.mark.asyncio + @patch("hub_adapter.routers.logs._execute_raw_query") + @patch("hub_adapter.routers.logs.get_settings") + async def test_filters_by_method_and_endpoint(self, mock_get_settings, mock_execute): + """get_api_requests applies both method and endpoint filters together.""" + mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" + mock_execute.return_value = [ + {"method": "DELETE", "path": "/local", "requests": "4"}, + {"method": "DELETE", "path": "/node/settings", "requests": "1"}, + {"method": "GET", "path": "/node/settings", "requests": "10"}, + ] + + result = await get_api_requests(method="DELETE", endpoint="/node") + + assert result["total"] == 1 + assert list(result["data"].keys()) == ["/node/settings"] + assert result["data"]["/node/settings"] == {"DELETE": 1, "total": 1} From 77a7bee4522708109cc713d2c1d711d810db024e Mon Sep 17 00:00:00 2001 From: Bruce Schultz Date: Tue, 28 Apr 2026 14:46:05 +0200 Subject: [PATCH 4/9] refactor(logs): remove redundant fields in requests ep --- hub_adapter/routers/logs.py | 1 - hub_adapter/schemas/logs.py | 1 - 2 files changed, 2 deletions(-) diff --git a/hub_adapter/routers/logs.py b/hub_adapter/routers/logs.py index 2c3f943..7c1f5fd 100644 --- a/hub_adapter/routers/logs.py +++ b/hub_adapter/routers/logs.py @@ -407,7 +407,6 @@ async def get_netstats( run = NetStatRun( timestamp=datetime.datetime.fromisoformat(raw_time) if raw_time else datetime.datetime.min, container=container_name, - analysis_id=entry_analysis_id, run_number=run_number, pod=entry.get("kubernetes.pod_name", ""), bytes_in=int(entry.get("log.bytes_in") or 0), diff --git a/hub_adapter/schemas/logs.py b/hub_adapter/schemas/logs.py index c0f4fc0..60210ff 100644 --- a/hub_adapter/schemas/logs.py +++ b/hub_adapter/schemas/logs.py @@ -46,7 +46,6 @@ class EventLogResponse(BaseModel): class NetStatRun(BaseModel): timestamp: datetime.datetime container: str - analysis_id: uuid.UUID run_number: int pod: str bytes_in: int From a03cf9737dbfd20901a617b33e655fc026db61e1 Mon Sep 17 00:00:00 2001 From: Bruce Schultz Date: Wed, 29 Apr 2026 09:27:47 +0200 Subject: [PATCH 5/9] feat(logs): allow more params for pod log queries --- hub_adapter/errors.py | 17 ++++++ hub_adapter/routers/logs.py | 109 +++++++++++++++++++----------------- 2 files changed, 76 insertions(+), 50 deletions(-) diff --git a/hub_adapter/errors.py b/hub_adapter/errors.py index 721008e..a7a4e75 100644 --- a/hub_adapter/errors.py +++ b/hub_adapter/errors.py @@ -152,6 +152,23 @@ def __init__(self): ) +def require_victoria_logs(f): + """Raise HTTP 503 if VictoriaLogs is not configured.""" + + @functools.wraps(f) + async def inner(*args, **kwargs): + from hub_adapter.dependencies import get_settings # avoid circular import + + if not get_settings().victoria_logs_url: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="Log service is not configured", + ) + return await f(*args, **kwargs) + + return inner + + def catch_hub_errors(f): """Custom error handling decorator for flame_hub_client.""" diff --git a/hub_adapter/routers/logs.py b/hub_adapter/routers/logs.py index 7c1f5fd..a7d26a7 100644 --- a/hub_adapter/routers/logs.py +++ b/hub_adapter/routers/logs.py @@ -14,6 +14,7 @@ from hub_adapter.auth import verify_idp_token, jwtbearer, require_admin_role from hub_adapter.constants import ServiceTag from hub_adapter.dependencies import get_settings, make_log_hook +from hub_adapter.errors import require_victoria_logs from hub_adapter.schemas.logs import ( AnalysisLogHistoryResponse, AnalysisLogsResponse, @@ -145,14 +146,36 @@ def _get_analysis_container_names(analysis_id_str: str) -> list[str]: return names -def _query_pod_logs(container_name: str) -> list[dict]: - """Return log lines for a specific container, sorted oldest-first.""" +def _query_pod_logs( + container_name: str, + start_date: datetime.datetime | None = None, + end_date: datetime.datetime | None = None, + limit: int | None = None, + offset: int = 0, +) -> list[dict]: + """Return log lines for a specific container, sorted oldest-first. If no start_date or end_date provided, then set + limit to 1000.""" settings = get_settings() query = f'kubernetes.container_name:"{container_name}"' - query_data = { + query_data: dict = { "query": (f"{query} | fields _time, _msg, level, log.error | sort by (_time)"), - "limit": 1000, } + if not limit: + if not start_date and not end_date: + query_data["limit"] = 1000 + + else: + query_data["limit"] = limit + + if offset: + query_data["offset"] = offset + + if start_date: + query_data["start"] = start_date.isoformat() + + if end_date: + query_data["end"] = end_date.isoformat() + with httpx.Client(event_hooks={"response": [make_log_hook(ServiceTag.LOGS)]}) as client: resp = client.post( f"{settings.victoria_logs_url}/select/logsql/query", @@ -197,6 +220,7 @@ def _group_by_run(container_names: list[str]) -> dict[int, dict[str, str]]: response_model=EventLogResponse, name="logs.events.get", ) +@require_victoria_logs async def get_events( limit: Annotated[int | None, Query(description="Maximum number of events to return")] = 50, offset: Annotated[int | None, Query(description="Number of events to offset by")] = None, @@ -212,13 +236,6 @@ async def get_events( ] = None, ): """Retrieve a selection of logged events.""" - settings = get_settings() - if not settings.victoria_logs_url: - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail="Event log service is not configured", - ) - query_parts = ["log.event_name:*"] if service_tag: @@ -272,17 +289,21 @@ async def log_user_signout(): name="logs.analysis.live.get", response_model=AnalysisLogsResponse, ) +@require_victoria_logs async def get_analysis_logs( analysis_id: Annotated[uuid.UUID, Path(description="UUID of the analysis.")], + start_date: Annotated[ + datetime.datetime | None, + Query(description="Filter logs from this timestamp using ISO8601 format"), + ] = None, + end_date: Annotated[ + datetime.datetime | None, + Query(description="Filter logs up to this timestamp using ISO8601 format"), + ] = None, + limit: Annotated[int | None, Query(description="Maximum number of log lines to return per container")] = 1000, + offset: Annotated[int | None, Query(description="Number of log lines to skip per container")] = 0, ): """Get the latest logs for both containers of an analysis (highest run number).""" - settings = get_settings() - if not settings.victoria_logs_url: - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail="Log service is not configured", - ) - container_names = _get_analysis_container_names(str(analysis_id)) runs = _group_by_run(container_names) @@ -295,11 +316,12 @@ async def get_analysis_logs( latest_run_num = max(runs) latest = runs[latest_run_num] + pod_args = (start_date, end_date, limit, offset) return { "analysis_id": analysis_id, "run_number": latest_run_num, - "nginx_logs": _query_pod_logs(latest["nginx"]) if "nginx" in latest else [], - "analysis_logs": _query_pod_logs(latest["analysis"]) if "analysis" in latest else [], + "nginx_logs": _query_pod_logs(latest["nginx"], *pod_args) if "nginx" in latest else [], + "analysis_logs": _query_pod_logs(latest["analysis"], *pod_args) if "analysis" in latest else [], } @@ -309,28 +331,33 @@ async def get_analysis_logs( name="logs.analysis.history.get", response_model=AnalysisLogHistoryResponse, ) +@require_victoria_logs async def get_analysis_log_history( analysis_id: Annotated[uuid.UUID, Path(description="UUID of the analysis.")], + start_date: Annotated[ + datetime.datetime | None, + Query(description="Filter logs from this timestamp using ISO8601 format"), + ] = None, + end_date: Annotated[ + datetime.datetime | None, + Query(description="Filter logs up to this timestamp using ISO8601 format"), + ] = None, + limit: Annotated[int, Query(description="Maximum number of log lines to return per container")] = 1000, + offset: Annotated[int, Query(description="Number of log lines to skip per container")] = 0, ): """Get logs for all runs of an analysis, sorted by run number ascending.""" - settings = get_settings() - if not settings.victoria_logs_url: - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail="Log service is not configured", - ) - container_names = _get_analysis_container_names(str(analysis_id)) runs = _group_by_run(container_names) + pod_args = (start_date, end_date, limit, offset) result_runs = [] for run_num in sorted(runs): run = runs[run_num] result_runs.append( { "run_number": run_num, - "nginx_logs": _query_pod_logs(run["nginx"]) if "nginx" in run else [], - "analysis_logs": _query_pod_logs(run["analysis"]) if "analysis" in run else [], + "nginx_logs": _query_pod_logs(run["nginx"], *pod_args) if "nginx" in run else [], + "analysis_logs": _query_pod_logs(run["analysis"], *pod_args) if "analysis" in run else [], } ) @@ -355,6 +382,7 @@ def _parse_netstats_container(container_name: str) -> tuple[str, int]: response_model=NetStatResponse, name="logs.netstats.get", ) +@require_victoria_logs async def get_netstats( analysis_id: Annotated[uuid.UUID | None, Query(description="Filter by analysis UUID")] = None, start_date: Annotated[ @@ -368,13 +396,6 @@ async def get_netstats( limit: Annotated[int, Query(description="Maximum number of raw log entries to return")] = 1000, ): """Retrieve network traffic statistics from netstats log events.""" - settings = get_settings() - if not settings.victoria_logs_url: - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail="Log service is not configured", - ) - query_parts = ['log.event_name:"netstats.analysis.traffic"'] if analysis_id is not None: query_parts.append(f'kubernetes.container_name:~"net-stats-analysis-{str(analysis_id)}-"') @@ -432,15 +453,9 @@ async def get_netstats( response_model=LogQLQueryResponse, dependencies=[Depends(require_admin_role)], ) +@require_victoria_logs async def raw_log_query(body: LogQLQueryRequest): """Execute a raw LogQL query against VictoriaLogs. Admin only.""" - settings = get_settings() - if not settings.victoria_logs_url: - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail="Log service is not configured", - ) - params: dict = {"limit": body.limit, "offset": body.offset} if body.start: params["start"] = body.start.isoformat() @@ -460,6 +475,7 @@ async def raw_log_query(body: LogQLQueryRequest): response_model=ApiRequestCountResponse, name="logs.requests.get", ) +@require_victoria_logs async def get_api_requests( start_date: Annotated[ datetime.datetime | None, @@ -479,13 +495,6 @@ async def get_api_requests( ] = None, ): """Get total API request count and a per-endpoint breakdown grouped by HTTP method and path.""" - settings = get_settings() - if not settings.victoria_logs_url: - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail="Event log service is not configured", - ) - logsql_query = r"""log.logger:"uvicorn.access" | extract '<_> " HTTP/<_>" ' | stats by (method, path) count() as requests""" params: dict = {} if start_date: From c0e6b38359474f46e79457c02f21b7dc161fe270 Mon Sep 17 00:00:00 2001 From: Bruce Schultz Date: Wed, 29 Apr 2026 12:33:28 +0200 Subject: [PATCH 6/9] chore(logs): remove limit from pod log query --- hub_adapter/routers/logs.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/hub_adapter/routers/logs.py b/hub_adapter/routers/logs.py index a7d26a7..fca2e6d 100644 --- a/hub_adapter/routers/logs.py +++ b/hub_adapter/routers/logs.py @@ -150,23 +150,16 @@ def _query_pod_logs( container_name: str, start_date: datetime.datetime | None = None, end_date: datetime.datetime | None = None, - limit: int | None = None, + limit: int | None = 1000, offset: int = 0, ) -> list[dict]: - """Return log lines for a specific container, sorted oldest-first. If no start_date or end_date provided, then set - limit to 1000.""" + """Return log lines for a specific container, sorted oldest-first.""" settings = get_settings() query = f'kubernetes.container_name:"{container_name}"' query_data: dict = { "query": (f"{query} | fields _time, _msg, level, log.error | sort by (_time)"), + "limit": limit, } - if not limit: - if not start_date and not end_date: - query_data["limit"] = 1000 - - else: - query_data["limit"] = limit - if offset: query_data["offset"] = offset From e490089c1f1eb96ef413511be4f031399bbf4b3e Mon Sep 17 00:00:00 2001 From: Bruce Schultz Date: Thu, 30 Apr 2026 12:51:05 +0200 Subject: [PATCH 7/9] test(logs): update unit tests --- tests/router_tests/test_logs.py | 16 ++++++++-------- tests/test_user_settings.py | 14 ++++++-------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/tests/router_tests/test_logs.py b/tests/router_tests/test_logs.py index d236756..b86363b 100644 --- a/tests/router_tests/test_logs.py +++ b/tests/router_tests/test_logs.py @@ -44,7 +44,7 @@ def test_route_configs(self, test_client): check_routes(logs_router, EXPECTED_LOGS_ROUTE_CONFIG, test_client) @pytest.mark.asyncio - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_get_events_503_when_victoria_logs_url_not_set(self, mock_get_settings): """get_events raises 503 when victoria_logs_url is None.""" mock_get_settings.return_value.victoria_logs_url = None @@ -53,7 +53,7 @@ async def test_get_events_503_when_victoria_logs_url_not_set(self, mock_get_sett await get_events() assert exc_info.value.status_code == status.HTTP_503_SERVICE_UNAVAILABLE - assert exc_info.value.detail == "Event log service is not configured" + assert exc_info.value.detail == "Log service is not configured" @pytest.mark.asyncio @patch("hub_adapter.routers.logs.count_logs") @@ -114,7 +114,7 @@ class TestGetAnalysisLogs: """Tests for the get_analysis_logs endpoint.""" @pytest.mark.asyncio - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_raises_503_when_victoria_logs_url_not_set(self, mock_get_settings): """get_analysis_logs raises 503 when victoria_logs_url is None.""" mock_get_settings.return_value.victoria_logs_url = None @@ -157,7 +157,7 @@ async def test_returns_latest_run_logs(self, mock_get_settings, mock_get_names, ] nginx_logs = [{"timestamp": "2024-01-01T00:00:00Z", "message": "nginx log"}] analysis_logs = [{"timestamp": "2024-01-01T00:00:01Z", "message": "analysis log"}] - mock_query_logs.side_effect = lambda name: nginx_logs if "nginx" in name else analysis_logs + mock_query_logs.side_effect = lambda name, *args, **kwargs: nginx_logs if "nginx" in name else analysis_logs result = await get_analysis_logs(analysis_id) @@ -189,7 +189,7 @@ class TestGetAnalysisLogHistory: """Tests for the get_analysis_log_history endpoint.""" @pytest.mark.asyncio - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_raises_503_when_victoria_logs_url_not_set(self, mock_get_settings): """get_analysis_log_history raises 503 when victoria_logs_url is None.""" mock_get_settings.return_value.victoria_logs_url = None @@ -246,7 +246,7 @@ def test_route_configs(self, test_client): check_routes(logs_router, EXPECTED_LOGS_ROUTE_CONFIG, test_client) @pytest.mark.asyncio - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_raises_503_when_victoria_logs_url_not_set(self, mock_get_settings): """raw_log_query raises 503 when victoria_logs_url is None.""" mock_get_settings.return_value.victoria_logs_url = None @@ -282,7 +282,7 @@ class TestGetApiRequests: """Tests for the GET /requests endpoint.""" @pytest.mark.asyncio - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_raises_503_when_victoria_logs_url_not_set(self, mock_get_settings): """get_api_requests raises 503 when victoria_logs_url is None.""" mock_get_settings.return_value.victoria_logs_url = None @@ -291,7 +291,7 @@ async def test_raises_503_when_victoria_logs_url_not_set(self, mock_get_settings await get_api_requests() assert exc_info.value.status_code == status.HTTP_503_SERVICE_UNAVAILABLE - assert exc_info.value.detail == "Event log service is not configured" + assert exc_info.value.detail == "Log service is not configured" @pytest.mark.asyncio @patch("hub_adapter.routers.logs._execute_raw_query") diff --git a/tests/test_user_settings.py b/tests/test_user_settings.py index 20fc506..9c6149a 100644 --- a/tests/test_user_settings.py +++ b/tests/test_user_settings.py @@ -184,10 +184,9 @@ def test_load_from_database_with_none_database(self): @patch("hub_adapter.user_settings.node_database") def test_load_from_database_operational_error(self, mock_db): """Test that function returns an empty dict on OperationalError.""" - mock_db.side_effect = pw.OperationalError("Connection failed") - - with patch("hub_adapter.user_settings.logger"): - result = _load_from_database() + with patch("hub_adapter.user_settings.bind_user_settings", side_effect=pw.OperationalError("Connection failed")): + with patch("hub_adapter.user_settings.logger"): + result = _load_from_database() assert result == {} @@ -286,10 +285,9 @@ def test_save_to_database_with_none_database(self): @patch("hub_adapter.user_settings.node_database") def test_save_to_database_operational_error(self, mock_db): """Test that function returns False on OperationalError.""" - mock_db.side_effect = pw.OperationalError("Connection failed") - - with patch("hub_adapter.user_settings.logger"): - result = _save_to_database({"key": "value"}) + with patch("hub_adapter.user_settings.bind_user_settings", side_effect=pw.OperationalError("Connection failed")): + with patch("hub_adapter.user_settings.logger"): + result = _save_to_database({"key": "value"}) assert result is False From a27cf6c27dd6faec36fa521d71e040e35ac09cd1 Mon Sep 17 00:00:00 2001 From: Bruce Schultz Date: Thu, 30 Apr 2026 13:05:00 +0200 Subject: [PATCH 8/9] test(logs): fix bad mocks --- tests/router_tests/test_logs.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/tests/router_tests/test_logs.py b/tests/router_tests/test_logs.py index b86363b..b866ae8 100644 --- a/tests/router_tests/test_logs.py +++ b/tests/router_tests/test_logs.py @@ -58,7 +58,7 @@ async def test_get_events_503_when_victoria_logs_url_not_set(self, mock_get_sett @pytest.mark.asyncio @patch("hub_adapter.routers.logs.count_logs") @patch("hub_adapter.routers.logs.query_logs") - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_get_events_returns_data_when_configured(self, mock_get_settings, mock_query_logs, mock_count_logs): """get_events returns paginated data when victoria_logs_url is configured.""" mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" @@ -128,7 +128,7 @@ async def test_raises_503_when_victoria_logs_url_not_set(self, mock_get_settings @pytest.mark.asyncio @patch("hub_adapter.routers.logs._get_analysis_container_names") - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_raises_404_when_no_containers_found(self, mock_get_settings, mock_get_names): """get_analysis_logs raises 404 when no matching containers exist.""" mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" @@ -143,7 +143,7 @@ async def test_raises_404_when_no_containers_found(self, mock_get_settings, mock @pytest.mark.asyncio @patch("hub_adapter.routers.logs._query_pod_logs") @patch("hub_adapter.routers.logs._get_analysis_container_names") - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_returns_latest_run_logs(self, mock_get_settings, mock_get_names, mock_query_logs): """get_analysis_logs returns logs for the highest run number only.""" analysis_id = uuid.uuid4() @@ -169,7 +169,7 @@ async def test_returns_latest_run_logs(self, mock_get_settings, mock_get_names, @pytest.mark.asyncio @patch("hub_adapter.routers.logs._query_pod_logs") @patch("hub_adapter.routers.logs._get_analysis_container_names") - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_returns_empty_lists_when_container_type_absent( self, mock_get_settings, mock_get_names, mock_query_logs ): @@ -204,7 +204,7 @@ async def test_raises_503_when_victoria_logs_url_not_set(self, mock_get_settings @pytest.mark.asyncio @patch("hub_adapter.routers.logs._query_pod_logs") @patch("hub_adapter.routers.logs._get_analysis_container_names") - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_returns_all_runs_sorted_ascending(self, mock_get_settings, mock_get_names, mock_query_logs): """get_analysis_log_history returns every run ordered by run number ascending.""" analysis_id = uuid.uuid4() @@ -225,7 +225,7 @@ async def test_returns_all_runs_sorted_ascending(self, mock_get_settings, mock_g @pytest.mark.asyncio @patch("hub_adapter.routers.logs._get_analysis_container_names") - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_returns_empty_runs_list_when_no_containers_found(self, mock_get_settings, mock_get_names): """get_analysis_log_history returns an empty runs list when no containers exist.""" mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" @@ -260,7 +260,7 @@ async def test_raises_503_when_victoria_logs_url_not_set(self, mock_get_settings @pytest.mark.asyncio @patch("hub_adapter.routers.logs.count_logs") @patch("hub_adapter.routers.logs._execute_raw_query") - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_returns_paginated_data_when_configured(self, mock_get_settings, mock_execute, mock_count): """raw_log_query returns data and meta envelope when configured.""" mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" @@ -295,7 +295,7 @@ async def test_raises_503_when_victoria_logs_url_not_set(self, mock_get_settings @pytest.mark.asyncio @patch("hub_adapter.routers.logs._execute_raw_query") - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_returns_total_and_per_endpoint_breakdown(self, mock_get_settings, mock_execute): """get_api_requests returns the sum total and one dict entry per endpoint.""" mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" @@ -313,7 +313,7 @@ async def test_returns_total_and_per_endpoint_breakdown(self, mock_get_settings, @pytest.mark.asyncio @patch("hub_adapter.routers.logs._execute_raw_query") - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_strips_query_params_and_reaggregates(self, mock_get_settings, mock_execute): """get_api_requests strips query strings and merges counts for the same base path.""" mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" @@ -330,7 +330,7 @@ async def test_strips_query_params_and_reaggregates(self, mock_get_settings, moc @pytest.mark.asyncio @patch("hub_adapter.routers.logs._execute_raw_query") - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_multiple_methods_per_endpoint(self, mock_get_settings, mock_execute): """get_api_requests groups multiple methods under the same endpoint key.""" mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" @@ -348,7 +348,7 @@ async def test_multiple_methods_per_endpoint(self, mock_get_settings, mock_execu @pytest.mark.asyncio @patch("hub_adapter.routers.logs._execute_raw_query") - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_filters_by_endpoint_prefix(self, mock_get_settings, mock_execute): """get_api_requests filters the breakdown and total to paths starting with the given prefix.""" mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" @@ -364,7 +364,7 @@ async def test_filters_by_endpoint_prefix(self, mock_get_settings, mock_execute) @pytest.mark.asyncio @patch("hub_adapter.routers.logs._execute_raw_query") - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_returns_zero_total_when_no_paths_match_prefix(self, mock_get_settings, mock_execute): """get_api_requests returns total 0 and empty dict when no paths match the given prefix.""" mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" @@ -379,7 +379,7 @@ async def test_returns_zero_total_when_no_paths_match_prefix(self, mock_get_sett @pytest.mark.asyncio @patch("hub_adapter.routers.logs._execute_raw_query") - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_passes_date_range_params_to_query(self, mock_get_settings, mock_execute): """get_api_requests forwards start_date and end_date to _execute_raw_query.""" mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" @@ -395,7 +395,7 @@ async def test_passes_date_range_params_to_query(self, mock_get_settings, mock_e @pytest.mark.asyncio @patch("hub_adapter.routers.logs._execute_raw_query") - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_filters_by_method(self, mock_get_settings, mock_execute): """get_api_requests only returns endpoints that have the given method.""" mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" @@ -413,7 +413,7 @@ async def test_filters_by_method(self, mock_get_settings, mock_execute): @pytest.mark.asyncio @patch("hub_adapter.routers.logs._execute_raw_query") - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_method_filter_is_case_insensitive(self, mock_get_settings, mock_execute): """get_api_requests uppercases the method filter before comparing.""" mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" @@ -428,7 +428,7 @@ async def test_method_filter_is_case_insensitive(self, mock_get_settings, mock_e @pytest.mark.asyncio @patch("hub_adapter.routers.logs._execute_raw_query") - @patch("hub_adapter.routers.logs.get_settings") + @patch("hub_adapter.dependencies.get_settings") async def test_filters_by_method_and_endpoint(self, mock_get_settings, mock_execute): """get_api_requests applies both method and endpoint filters together.""" mock_get_settings.return_value.victoria_logs_url = "http://victoria:9428" From 445c86dde275872b9e4b7ef984141d61d839ac0f Mon Sep 17 00:00:00 2001 From: Bruce Schultz Date: Wed, 6 May 2026 11:07:25 +0200 Subject: [PATCH 9/9] fix(logs): make log queries async to avoid IO blocking --- hub_adapter/auth.py | 16 +++++++++- hub_adapter/routers/logs.py | 64 ++++++++++++++++++------------------- 2 files changed, 47 insertions(+), 33 deletions(-) diff --git a/hub_adapter/auth.py b/hub_adapter/auth.py index 19059e4..3c30f03 100644 --- a/hub_adapter/auth.py +++ b/hub_adapter/auth.py @@ -267,7 +267,21 @@ async def require_admin_role( """Dependency to check if the user has the ADMIN_ROLE.""" role_claim_name = settings.role_claim_name admin_role = settings.admin_role - if role_claim_name and admin_role: + + if not role_claim_name: + return verified_token + + if not admin_role: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail={ + "message": "Insufficient permissions, admin role not found in token or not configured.", + "service": "Auth", + "status_code": status.HTTP_503_SERVICE_UNAVAILABLE, + }, + ) + + if role_claim_name: role_claim_keys = role_claim_name.split(".") parsed_claim = verified_token for key in role_claim_keys: diff --git a/hub_adapter/routers/logs.py b/hub_adapter/routers/logs.py index fca2e6d..e9ed88c 100644 --- a/hub_adapter/routers/logs.py +++ b/hub_adapter/routers/logs.py @@ -39,14 +39,14 @@ ) -def count_logs(query: str, params: dict | None = None) -> int: +async def count_logs(query: str, params: dict | None = None) -> int: """Return the total number of logs matching a query, ignoring limit/offset.""" settings = get_settings() count_params = {k: v for k, v in (params or {}).items() if k not in ("limit", "offset")} query_data = {"query": f"{query} | count() as total", **count_params} - with httpx.Client(event_hooks={"response": [make_log_hook(ServiceTag.LOGS)]}) as client: - resp = client.post( + async with httpx.AsyncClient(event_hooks={"response": [make_log_hook(ServiceTag.LOGS, is_async=True)]}) as client: + resp = await client.post( f"{settings.victoria_logs_url}/select/logsql/query", data=query_data, ) @@ -59,12 +59,12 @@ def count_logs(query: str, params: dict | None = None) -> int: return 0 -def _execute_raw_query(query: str, params: dict | None = None) -> list[dict]: +async def _execute_raw_query(query: str, params: dict | None = None) -> list[dict]: """Execute a LogQL query against VictoriaLogs and return raw parsed results.""" settings = get_settings() query_data = {"query": query, **(params or {})} - with httpx.Client(event_hooks={"response": [make_log_hook(ServiceTag.LOGS)]}) as client: - resp = client.post( + async with httpx.AsyncClient(event_hooks={"response": [make_log_hook(ServiceTag.LOGS, is_async=True)]}) as client: + resp = await client.post( f"{settings.victoria_logs_url}/select/logsql/query", data=query_data, ) @@ -76,7 +76,7 @@ def _execute_raw_query(query: str, params: dict | None = None) -> list[dict]: return logs -def query_logs(query: str, params: dict | None = None): +async def query_logs(query: str, params: dict | None = None): """Retrieve a selection of logs.""" _fields = ( "_msg", @@ -105,8 +105,8 @@ def query_logs(query: str, params: dict | None = None): _output_fields = {_rename.get(f, f) for f in _fields} - with httpx.Client(event_hooks={"response": [make_log_hook(ServiceTag.LOGS)]}) as client: - resp = client.post( + async with httpx.AsyncClient(event_hooks={"response": [make_log_hook(ServiceTag.LOGS, is_async=True)]}) as client: + resp = await client.post( f"{settings.victoria_logs_url}/select/logsql/query", data=query_data, ) @@ -121,7 +121,7 @@ def query_logs(query: str, params: dict | None = None): return logs -def _get_analysis_container_names(analysis_id_str: str) -> list[str]: +async def _get_analysis_container_names(analysis_id_str: str) -> list[str]: """Return all unique container names matching the analysis ID pattern.""" settings = get_settings() pattern = f"^(nginx-analysis|analysis)-{analysis_id_str}-[0-9]+$" @@ -130,8 +130,8 @@ def _get_analysis_container_names(analysis_id_str: str) -> list[str]: "query": f"{query} | uniq by (kubernetes.container_name)", "limit": 100, } - with httpx.Client(event_hooks={"response": [make_log_hook(ServiceTag.LOGS)]}) as client: - resp = client.post( + async with httpx.AsyncClient(event_hooks={"response": [make_log_hook(ServiceTag.LOGS, is_async=True)]}) as client: + resp = await client.post( f"{settings.victoria_logs_url}/select/logsql/query", data=query_data, ) @@ -146,7 +146,7 @@ def _get_analysis_container_names(analysis_id_str: str) -> list[str]: return names -def _query_pod_logs( +async def _query_pod_logs( container_name: str, start_date: datetime.datetime | None = None, end_date: datetime.datetime | None = None, @@ -169,8 +169,8 @@ def _query_pod_logs( if end_date: query_data["end"] = end_date.isoformat() - with httpx.Client(event_hooks={"response": [make_log_hook(ServiceTag.LOGS)]}) as client: - resp = client.post( + async with httpx.AsyncClient(event_hooks={"response": [make_log_hook(ServiceTag.LOGS, is_async=True)]}) as client: + resp = await client.post( f"{settings.victoria_logs_url}/select/logsql/query", data=query_data, ) @@ -246,8 +246,8 @@ async def get_events( if end_date: params["end"] = end_date.isoformat() - data = query_logs(query, params) - total = count_logs(query, params) + data = await query_logs(query, params) + total = await count_logs(query, params) meta = {"total": total, "limit": params["limit"], "offset": params["offset"], "count": len(data)} @@ -297,7 +297,7 @@ async def get_analysis_logs( offset: Annotated[int | None, Query(description="Number of log lines to skip per container")] = 0, ): """Get the latest logs for both containers of an analysis (highest run number).""" - container_names = _get_analysis_container_names(str(analysis_id)) + container_names = await _get_analysis_container_names(str(analysis_id)) runs = _group_by_run(container_names) if not runs: @@ -313,8 +313,8 @@ async def get_analysis_logs( return { "analysis_id": analysis_id, "run_number": latest_run_num, - "nginx_logs": _query_pod_logs(latest["nginx"], *pod_args) if "nginx" in latest else [], - "analysis_logs": _query_pod_logs(latest["analysis"], *pod_args) if "analysis" in latest else [], + "nginx_logs": await _query_pod_logs(latest["nginx"], *pod_args) if "nginx" in latest else [], + "analysis_logs": await _query_pod_logs(latest["analysis"], *pod_args) if "analysis" in latest else [], } @@ -339,7 +339,7 @@ async def get_analysis_log_history( offset: Annotated[int, Query(description="Number of log lines to skip per container")] = 0, ): """Get logs for all runs of an analysis, sorted by run number ascending.""" - container_names = _get_analysis_container_names(str(analysis_id)) + container_names = await _get_analysis_container_names(str(analysis_id)) runs = _group_by_run(container_names) pod_args = (start_date, end_date, limit, offset) @@ -349,8 +349,8 @@ async def get_analysis_log_history( result_runs.append( { "run_number": run_num, - "nginx_logs": _query_pod_logs(run["nginx"], *pod_args) if "nginx" in run else [], - "analysis_logs": _query_pod_logs(run["analysis"], *pod_args) if "analysis" in run else [], + "nginx_logs": await _query_pod_logs(run["nginx"], *pod_args) if "nginx" in run else [], + "analysis_logs": await _query_pod_logs(run["analysis"], *pod_args) if "analysis" in run else [], } ) @@ -386,7 +386,7 @@ async def get_netstats( datetime.datetime | None, Query(description="Filter by end date using ISO8601 format"), ] = None, - limit: Annotated[int, Query(description="Maximum number of raw log entries to return")] = 1000, + limit: Annotated[int, Query(description="Maximum number of analysis groups to return")] = 1000, ): """Retrieve network traffic statistics from netstats log events.""" query_parts = ['log.event_name:"netstats.analysis.traffic"'] @@ -397,14 +397,13 @@ async def get_netstats( fields = "_time, kubernetes.container_name, kubernetes.pod_name, log.bytes_in, log.bytes_out" logsql_query = f"{base_query} | fields {fields}" - params: dict = {"limit": limit} + params: dict = {} if start_date: params["start"] = start_date.isoformat() if end_date: params["end"] = end_date.isoformat() - raw_logs = _execute_raw_query(logsql_query, params) - total = count_logs(base_query, params) + raw_logs = await _execute_raw_query(logsql_query, params) totals: dict[str, NetStatTotal] = {} for entry in raw_logs: @@ -435,8 +434,9 @@ async def get_netstats( totals[key].bytes_out += run.bytes_out data = list(totals.values()) - meta = {"total": total, "limit": limit, "offset": 0, "count": len(data)} - return {"data": data, "meta": meta} + total = len(data) + meta = {"total": total, "limit": limit, "offset": 0, "count": len(data[:limit])} + return {"data": data[:limit], "meta": meta} @logs_router.post( @@ -455,8 +455,8 @@ async def raw_log_query(body: LogQLQueryRequest): if body.end: params["end"] = body.end.isoformat() - data = _execute_raw_query(body.query, params) - total = count_logs(body.query, params) + data = await _execute_raw_query(body.query, params) + total = await count_logs(body.query, params) meta = {"total": total, "limit": body.limit, "offset": body.offset, "count": len(data)} return {"data": data, "meta": meta} @@ -495,7 +495,7 @@ async def get_api_requests( if end_date: params["end"] = end_date.isoformat() - raw = _execute_raw_query(logsql_query, params) + raw = await _execute_raw_query(logsql_query, params) method_filter = method.upper() if method else None by_path: dict[str, dict[str, int]] = {}