Skip to content
1 change: 1 addition & 0 deletions examples/agent_control_demo/demo_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def initialize_demo_agent() -> bool:
agent_name=AGENT_NAME,
agent_description="Demo chatbot for testing controls",
server_url=SERVER_URL,
observability_enabled=True
)
logger.info("Agent initialized successfully")
return True
Expand Down
6 changes: 5 additions & 1 deletion sdks/python/src/agent_control/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@
from typing import TYPE_CHECKING, Any

import httpx
from agent_control_telemetry.sinks import BaseControlEventSink, ControlEventSink, SinkResult
from agent_control_telemetry.sinks import (
BaseControlEventSink,
ControlEventSink,
SinkResult,
)

from agent_control.settings import configure_settings, get_settings

Expand Down
34 changes: 23 additions & 11 deletions server/src/agent_control_server/observability/ingest/direct.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Direct event ingestor implementation.

This module provides the DirectEventIngestor, which processes events
immediately (synchronously) by storing them directly to the EventStore.
immediately by writing them to an async control-event sink. Existing
store-based callers are preserved by wrapping EventStore instances in the
default EventStoreControlEventSink internally.

For high-throughput scenarios, users can implement their own buffered
ingestor (e.g., QueuedEventIngestor, RedisEventIngestor).
Expand All @@ -11,39 +13,48 @@
import logging

from agent_control_models.observability import ControlExecutionEvent
from agent_control_telemetry.sinks import AsyncControlEventSink

from ..sinks import EventStoreControlEventSink
from ..store.base import EventStore
from .base import EventIngestor, IngestResult

logger = logging.getLogger(__name__)


class DirectEventIngestor(EventIngestor):
"""Processes events immediately by storing them to the EventStore.
"""Processes events immediately by writing them to an async control-event sink.

This is the simplest ingestor implementation. Events are stored
directly to the database, adding ~5-20ms latency per batch.
This is the simplest ingestor implementation. Events are written
directly to the configured sink, adding ~5-20ms latency per batch.

For use cases that require lower latency or higher throughput,
implement a custom buffered ingestor (e.g., QueuedEventIngestor).

Attributes:
store: The EventStore to write events to
sink: The AsyncControlEventSink used to write events
log_to_stdout: Whether to log events as structured JSON
"""

def __init__(self, store: EventStore, log_to_stdout: bool = False):
def __init__(
self,
store: EventStore | AsyncControlEventSink,
log_to_stdout: bool = False,
):
"""Initialize the ingestor.

Args:
store: The EventStore to write events to
store: Either an EventStore or an AsyncControlEventSink implementation
log_to_stdout: Whether to log events as structured JSON (default: False)
"""
self.store = store
if isinstance(store, EventStore):
self.sink: AsyncControlEventSink = EventStoreControlEventSink(store)
else:
self.sink = store
self.log_to_stdout = log_to_stdout

async def ingest(self, events: list[ControlExecutionEvent]) -> IngestResult:
"""Ingest events by storing them directly to the EventStore.
"""Ingest events by writing them directly to the configured sink.

Args:
events: List of control execution events to ingest
Expand All @@ -59,8 +70,9 @@ async def ingest(self, events: list[ControlExecutionEvent]) -> IngestResult:
dropped = 0

try:
# Store events
processed = await self.store.store(events)
sink_result = await self.sink.write_events(events)
processed = sink_result.accepted
dropped = sink_result.dropped

# Log to stdout if enabled
if self.log_to_stdout:
Expand Down
23 changes: 23 additions & 0 deletions server/src/agent_control_server/observability/sinks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Server-side sink implementations for observability event delivery."""

from __future__ import annotations

from collections.abc import Sequence

from agent_control_models.observability import ControlExecutionEvent
from agent_control_telemetry.sinks import SinkResult

from .store.base import EventStore


class EventStoreControlEventSink:
"""Write events through an EventStore-backed sink."""

def __init__(self, store: EventStore):
self.store = store

async def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult:
"""Write events to the underlying store and report accepted/dropped counts."""
stored = await self.store.store(list(events))
dropped = max(len(events) - stored, 0)
return SinkResult(accepted=stored, dropped=dropped)
37 changes: 37 additions & 0 deletions server/tests/test_observability_direct_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from uuid import uuid4

from agent_control_models.observability import ControlExecutionEvent
from agent_control_telemetry.sinks import SinkResult
from agent_control_server.observability.ingest.direct import DirectEventIngestor
from agent_control_server.observability.store.base import EventStore

Expand Down Expand Up @@ -37,6 +38,15 @@ async def query_events(self, query): # pragma: no cover - not used
raise NotImplementedError


class CountingSink:
def __init__(self) -> None:
self.calls: list[list[ControlExecutionEvent]] = []

async def write_events(self, events: list[ControlExecutionEvent]) -> SinkResult:
self.calls.append(events)
return SinkResult(accepted=len(events), dropped=0)


@pytest.mark.asyncio
async def test_direct_ingestor_drops_on_store_error() -> None:
# Given: an ingestor with a failing store
Expand Down Expand Up @@ -117,3 +127,30 @@ async def test_direct_ingestor_flush_noop() -> None:

# Then: no error is raised
assert True


@pytest.mark.asyncio
async def test_direct_ingestor_accepts_control_event_sink() -> None:
sink = CountingSink()
ingestor = DirectEventIngestor(sink)
events = [
ControlExecutionEvent(
trace_id="a" * 32,
span_id="b" * 16,
agent_name="agent-test-01",
control_id=1,
control_name="c",
check_stage="pre",
applies_to="llm_call",
action="observe",
matched=True,
confidence=0.9,
)
]

result = await ingestor.ingest(events)

assert result.received == 1
assert result.processed == 1
assert result.dropped == 0
assert sink.calls == [events]
1 change: 1 addition & 0 deletions telemetry/tests/test_sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from datetime import UTC, datetime

from agent_control_models import ControlExecutionEvent

from agent_control_telemetry import (
BaseAsyncControlEventSink,
BaseControlEventSink,
Expand Down
Loading