diff --git a/examples/agent_control_demo/demo_agent.py b/examples/agent_control_demo/demo_agent.py index b6cb38c7..0cd39165 100644 --- a/examples/agent_control_demo/demo_agent.py +++ b/examples/agent_control_demo/demo_agent.py @@ -154,6 +154,7 @@ def initialize_demo_agent() -> bool: agent_name=AGENT_NAME, agent_description="Demo chatbot for testing controls", server_url=SERVER_URL, + observability_enabled=True ) logger.info("Agent initialized successfully") return True diff --git a/sdks/python/src/agent_control/observability.py b/sdks/python/src/agent_control/observability.py index 1568f13e..32bb1dd1 100644 --- a/sdks/python/src/agent_control/observability.py +++ b/sdks/python/src/agent_control/observability.py @@ -54,7 +54,11 @@ from typing import TYPE_CHECKING, Any import httpx -from agent_control_telemetry.sinks import BaseControlEventSink, ControlEventSink, SinkResult +from agent_control_telemetry.sinks import ( + BaseControlEventSink, + ControlEventSink, + SinkResult, +) from agent_control.settings import configure_settings, get_settings diff --git a/server/src/agent_control_server/observability/ingest/direct.py b/server/src/agent_control_server/observability/ingest/direct.py index b2b09e6d..37f7d3e8 100644 --- a/server/src/agent_control_server/observability/ingest/direct.py +++ b/server/src/agent_control_server/observability/ingest/direct.py @@ -1,7 +1,9 @@ """Direct event ingestor implementation. This module provides the DirectEventIngestor, which processes events -immediately (synchronously) by storing them directly to the EventStore. +immediately by writing them to an async control-event sink. Existing +store-based callers are preserved by wrapping EventStore instances in the +default EventStoreControlEventSink internally. For high-throughput scenarios, users can implement their own buffered ingestor (e.g., QueuedEventIngestor, RedisEventIngestor). @@ -11,7 +13,9 @@ import logging from agent_control_models.observability import ControlExecutionEvent +from agent_control_telemetry.sinks import AsyncControlEventSink +from ..sinks import EventStoreControlEventSink from ..store.base import EventStore from .base import EventIngestor, IngestResult @@ -19,31 +23,38 @@ class DirectEventIngestor(EventIngestor): - """Processes events immediately by storing them to the EventStore. + """Processes events immediately by writing them to an async control-event sink. - This is the simplest ingestor implementation. Events are stored - directly to the database, adding ~5-20ms latency per batch. + This is the simplest ingestor implementation. Events are written + directly to the configured sink, adding ~5-20ms latency per batch. For use cases that require lower latency or higher throughput, implement a custom buffered ingestor (e.g., QueuedEventIngestor). Attributes: - store: The EventStore to write events to + sink: The AsyncControlEventSink used to write events log_to_stdout: Whether to log events as structured JSON """ - def __init__(self, store: EventStore, log_to_stdout: bool = False): + def __init__( + self, + store: EventStore | AsyncControlEventSink, + log_to_stdout: bool = False, + ): """Initialize the ingestor. Args: - store: The EventStore to write events to + store: Either an EventStore or an AsyncControlEventSink implementation log_to_stdout: Whether to log events as structured JSON (default: False) """ - self.store = store + if isinstance(store, EventStore): + self.sink: AsyncControlEventSink = EventStoreControlEventSink(store) + else: + self.sink = store self.log_to_stdout = log_to_stdout async def ingest(self, events: list[ControlExecutionEvent]) -> IngestResult: - """Ingest events by storing them directly to the EventStore. + """Ingest events by writing them directly to the configured sink. Args: events: List of control execution events to ingest @@ -59,8 +70,9 @@ async def ingest(self, events: list[ControlExecutionEvent]) -> IngestResult: dropped = 0 try: - # Store events - processed = await self.store.store(events) + sink_result = await self.sink.write_events(events) + processed = sink_result.accepted + dropped = sink_result.dropped # Log to stdout if enabled if self.log_to_stdout: diff --git a/server/src/agent_control_server/observability/sinks.py b/server/src/agent_control_server/observability/sinks.py new file mode 100644 index 00000000..b96f4eca --- /dev/null +++ b/server/src/agent_control_server/observability/sinks.py @@ -0,0 +1,23 @@ +"""Server-side sink implementations for observability event delivery.""" + +from __future__ import annotations + +from collections.abc import Sequence + +from agent_control_models.observability import ControlExecutionEvent +from agent_control_telemetry.sinks import SinkResult + +from .store.base import EventStore + + +class EventStoreControlEventSink: + """Write events through an EventStore-backed sink.""" + + def __init__(self, store: EventStore): + self.store = store + + async def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + """Write events to the underlying store and report accepted/dropped counts.""" + stored = await self.store.store(list(events)) + dropped = max(len(events) - stored, 0) + return SinkResult(accepted=stored, dropped=dropped) diff --git a/server/tests/test_observability_direct_ingest.py b/server/tests/test_observability_direct_ingest.py index 1e83f816..f3f6db81 100644 --- a/server/tests/test_observability_direct_ingest.py +++ b/server/tests/test_observability_direct_ingest.py @@ -7,6 +7,7 @@ from uuid import uuid4 from agent_control_models.observability import ControlExecutionEvent +from agent_control_telemetry.sinks import SinkResult from agent_control_server.observability.ingest.direct import DirectEventIngestor from agent_control_server.observability.store.base import EventStore @@ -37,6 +38,15 @@ async def query_events(self, query): # pragma: no cover - not used raise NotImplementedError +class CountingSink: + def __init__(self) -> None: + self.calls: list[list[ControlExecutionEvent]] = [] + + async def write_events(self, events: list[ControlExecutionEvent]) -> SinkResult: + self.calls.append(events) + return SinkResult(accepted=len(events), dropped=0) + + @pytest.mark.asyncio async def test_direct_ingestor_drops_on_store_error() -> None: # Given: an ingestor with a failing store @@ -117,3 +127,30 @@ async def test_direct_ingestor_flush_noop() -> None: # Then: no error is raised assert True + + +@pytest.mark.asyncio +async def test_direct_ingestor_accepts_control_event_sink() -> None: + sink = CountingSink() + ingestor = DirectEventIngestor(sink) + events = [ + ControlExecutionEvent( + trace_id="a" * 32, + span_id="b" * 16, + agent_name="agent-test-01", + control_id=1, + control_name="c", + check_stage="pre", + applies_to="llm_call", + action="observe", + matched=True, + confidence=0.9, + ) + ] + + result = await ingestor.ingest(events) + + assert result.received == 1 + assert result.processed == 1 + assert result.dropped == 0 + assert sink.calls == [events] diff --git a/telemetry/tests/test_sinks.py b/telemetry/tests/test_sinks.py index cb04f179..83e8c0f6 100644 --- a/telemetry/tests/test_sinks.py +++ b/telemetry/tests/test_sinks.py @@ -5,6 +5,7 @@ from datetime import UTC, datetime from agent_control_models import ControlExecutionEvent + from agent_control_telemetry import ( BaseAsyncControlEventSink, BaseControlEventSink,