19 changes: 19 additions & 0 deletions docker-compose.dev.yml
@@ -48,6 +48,10 @@ services:
path /i/v1/logs
}

@capture-logs-agent {
path /i/v1/agent-logs
}

@flags {
path /flags
path /flags*
@@ -72,6 +76,11 @@ services:
reverse_proxy capture-logs:4318
}

handle @capture-logs-agent {
uri replace /i/v1/agent-logs /i/v1/logs
reverse_proxy capture-logs-agent:4319
}

handle @replay-capture {
reverse_proxy replay-capture:3306
}
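The Caddy block above routes agent traffic onto the shared logs capture path, but to a dedicated backend. A minimal sketch of the `uri replace` behavior (the Python helper name is ours, for illustration only, not part of the PR):

```python
def rewrite_agent_logs_path(path: str) -> str:
    """Mirror of Caddy's `uri replace /i/v1/agent-logs /i/v1/logs`:
    agent clients hit a distinct public path, but the upstream
    capture-logs-agent service (port 4319) sees the standard logs endpoint."""
    return path.replace("/i/v1/agent-logs", "/i/v1/logs")
```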
@@ -268,6 +277,16 @@ services:
depends_on:
- kafka

capture-logs-agent:
extends:
file: docker-compose.base.yml
service: capture-logs
depends_on:
- kafka
environment:
BIND_PORT: '4319'
KAFKA_TOPIC: agent_logs

jaeger:
extends:
file: docker-compose.base.yml
173 changes: 173 additions & 0 deletions posthog/clickhouse/agent_logs.py
@@ -0,0 +1,173 @@
"""SQL definitions for agent_logs tables and materialized views.

This module defines the ClickHouse tables for ingesting Twig agent logs via a dedicated
Kafka pipeline. Agent logs bypass the LogsIngestionConsumer and flow directly from
Kafka to ClickHouse, similar to distinct_id_usage and session_replay_events.

Architecture:
    Twig Agent → OTEL → capture-logs-agent → Kafka (agent_logs)
        → kafka_agent_logs (Kafka engine)
        → kafka_agent_logs_mv → agent_logs (dedicated table)

The team_id is extracted from Kafka headers (set by capture-logs from the API token).
Task/run IDs are extracted from OTEL resource attributes for fast queries.
"""

from posthog.clickhouse.kafka_engine import kafka_engine

KAFKA_AGENT_LOGS_TOPIC = "agent_logs"
CONSUMER_GROUP_AGENT_LOGS = "clickhouse_agent_logs"

AGENT_LOGS_TABLE_NAME = "agent_logs"
KAFKA_TABLE_NAME = "kafka_agent_logs"
MV_TABLE_NAME = "kafka_agent_logs_mv"
METRICS_MV_TABLE_NAME = "kafka_agent_logs_kafka_metrics_mv"


def AGENT_LOGS_TABLE_SQL() -> str:
"""
Dedicated agent_logs table optimized for task/run queries.

Schema designed for Twig agent log access patterns:
- Primary queries by team_id + task_id + run_id
- Secondary queries by team_id + timestamp
- event_type as dedicated column for filtering
"""
return f"""
CREATE TABLE IF NOT EXISTS {AGENT_LOGS_TABLE_NAME}
(
-- Identity
`uuid` UUID DEFAULT generateUUIDv4(),
`team_id` Int32,
`task_id` String,
`run_id` String,

-- Timestamps
`timestamp` DateTime64(6),
`observed_timestamp` DateTime64(6),

-- Log metadata
`severity_text` LowCardinality(String) DEFAULT 'INFO',
`severity_number` Int32 DEFAULT 9,
`service_name` LowCardinality(String) DEFAULT 'twig-agent',

-- Log content
`body` String,
`event_type` LowCardinality(String),

-- Deployment environment
`device_type` LowCardinality(String) DEFAULT 'local',

-- Flexible attributes
`attributes` Map(LowCardinality(String), String),
`resource_attributes` Map(LowCardinality(String), String)
)
ENGINE = MergeTree()
[Inline review comment]
Reviewer: Should this be an S3-backed MergeTree?
Author: Pawel from the CH team suggested not going there unless strictly
needed. The migration to one seems super easy code-wise but requires extra
S3 setup (infra-wise) for where to dump this; I think we can avoid it for
now, but can do it if you feel strongly about it.

PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (team_id, task_id, run_id, timestamp, uuid)
TTL timestamp + INTERVAL 60 DAY
"""


def KAFKA_AGENT_LOGS_TABLE_SQL() -> str:
"""Kafka engine table consuming from agent_logs topic (Avro format from capture-logs)."""
return """
CREATE TABLE IF NOT EXISTS {table_name}
(
`uuid` String,
`trace_id` String,
`span_id` String,
`trace_flags` Int32,
`timestamp` DateTime64(6),
`observed_timestamp` DateTime64(6),
`body` String,
`severity_text` String,
`severity_number` Int32,
`service_name` String,
`resource_attributes` Map(LowCardinality(String), String),
`instrumentation_scope` String,
`event_name` String,
`attributes` Map(LowCardinality(String), String)
)
ENGINE = {engine}
SETTINGS kafka_skip_broken_messages = 100
""".format(
table_name=KAFKA_TABLE_NAME,
engine=kafka_engine(
topic=KAFKA_AGENT_LOGS_TOPIC,
group=CONSUMER_GROUP_AGENT_LOGS,
serialization="Avro",
),
)

[Review comment on lines +74 to +104, Graphite Agent (based on CI logs)]
The Kafka engine table definition should be modified to handle test environments where Kafka might not be available. Consider adding a parameter to conditionally use a different engine (like Memory) in test environments, or add a try/catch mechanism in the migration to handle Kafka connection failures gracefully.

def KAFKA_AGENT_LOGS_MV_SQL() -> str:
"""Materialized view: Kafka → agent_logs table with transformations"""
return """
CREATE MATERIALIZED VIEW IF NOT EXISTS {mv_name} TO {target_table}
AS SELECT
toUUID(uuid) as uuid,
coalesce(toInt32OrNull(_headers.value[indexOf(_headers.name, 'team_id')]), toInt32OrNull(trimBoth(resource_attributes['team_id'], '"')), 0) as team_id,
trimBoth(coalesce(resource_attributes['task_id'], ''), '"') as task_id,
trimBoth(coalesce(resource_attributes['run_id'], ''), '"') as run_id,
timestamp,
observed_timestamp,
severity_text,
severity_number,
trimBoth(service_name, '"') as service_name,
body,
trimBoth(coalesce(attributes['event_type'], event_name, ''), '"') as event_type,
trimBoth(coalesce(resource_attributes['device_type'], 'local'), '"') as device_type,
mapApply((k, v) -> (k, trimBoth(v, '"')), attributes) as attributes,
mapApply((k, v) -> (k, trimBoth(v, '"')), resource_attributes) as resource_attributes
FROM {kafka_table}
SETTINGS min_insert_block_size_rows=0, min_insert_block_size_bytes=0
""".format(
mv_name=MV_TABLE_NAME,
target_table=AGENT_LOGS_TABLE_NAME,
kafka_table=KAFKA_TABLE_NAME,
)
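The team_id coalesce chain in the MV is the subtle part: Kafka header first, then a possibly JSON-quoted resource attribute, then 0. A pure-Python mirror of that fallback, useful for reasoning about the intended semantics (the helper is illustrative, not part of the PR; note the SQL trims quotes only on the resource attribute, while trimming both here is harmless):

```python
def resolve_team_id(headers: dict[str, str], resource_attributes: dict[str, str]) -> int:
    """Mirror of:
    coalesce(toInt32OrNull(header), toInt32OrNull(trimBoth(attr, '"')), 0).
    Non-numeric candidates behave like toInt32OrNull returning NULL."""
    for raw in (headers.get("team_id"), resource_attributes.get("team_id")):
        if raw is None:
            continue
        try:
            return int(raw.strip('"'))  # trimBoth(..., '"') equivalent
        except ValueError:
            continue  # fall through to the next candidate, as coalesce does
    return 0
```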


def LOGS_KAFKA_METRICS_TABLE_SQL() -> str:
"""Shared metrics table for Kafka consumer lag monitoring.

This table may already exist (created by the logs cluster). Using IF NOT EXISTS
so it's safe to run in both CI (where it doesn't exist) and prod (where it does).
"""
return """
CREATE TABLE IF NOT EXISTS logs_kafka_metrics
(
`_partition` UInt32,
`_topic` String,
`max_offset` SimpleAggregateFunction(max, UInt64),
`max_observed_timestamp` SimpleAggregateFunction(max, DateTime64(9)),
`max_timestamp` SimpleAggregateFunction(max, DateTime64(9)),
`max_created_at` SimpleAggregateFunction(max, DateTime64(9)),
`max_lag` SimpleAggregateFunction(max, UInt64)
)
ENGINE = MergeTree
ORDER BY (_topic, _partition)
"""


def KAFKA_AGENT_LOGS_METRICS_MV_SQL() -> str:
"""Materialized view for monitoring agent_logs Kafka consumer lag."""
return """
CREATE MATERIALIZED VIEW IF NOT EXISTS {mv_name} TO logs_kafka_metrics
AS SELECT
_partition,
_topic,
maxSimpleState(_offset) as max_offset,
maxSimpleState(observed_timestamp) as max_observed_timestamp,
maxSimpleState(timestamp) as max_timestamp,
maxSimpleState(now64(9)) as max_created_at,
maxSimpleState(now64(9) - observed_timestamp) as max_lag
FROM {kafka_table}
GROUP BY _partition, _topic
""".format(
mv_name=METRICS_MV_TABLE_NAME,
kafka_table=KAFKA_TABLE_NAME,
)
1 change: 1 addition & 0 deletions posthog/clickhouse/client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class NodeRole(StrEnum):
SHUFFLEHOG = "shufflehog"
ENDPOINTS = "endpoints"
LOGS = "logs"
AGENT_LOGS = "agent_logs"


_default_workload = Workload.ONLINE
Expand Down
45 changes: 45 additions & 0 deletions posthog/clickhouse/migrations/0212_agent_logs_kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""
Migration to create agent_logs tables for Twig agent log ingestion.

This creates a dedicated Kafka→ClickHouse pipeline for Twig agent logs,
bypassing the LogsIngestionConsumer for lower latency and independence
from customer log quotas.

Architecture:
    Twig Agent → OTEL → capture-logs-agent → Kafka (agent_logs)
        → kafka_agent_logs (Kafka engine)
        → kafka_agent_logs_mv → agent_logs (dedicated table)

Tables created (on dedicated agent_logs nodes):
- agent_logs: Dedicated MergeTree table optimized for task/run queries
- kafka_agent_logs: Kafka engine table consuming from agent_logs topic
- kafka_agent_logs_mv: Materialized view transforming to agent_logs table
- kafka_agent_logs_kafka_metrics_mv: Materialized view for consumer lag metrics
"""

from posthog.clickhouse.agent_logs import (
AGENT_LOGS_TABLE_SQL,
KAFKA_AGENT_LOGS_METRICS_MV_SQL,
KAFKA_AGENT_LOGS_MV_SQL,
KAFKA_AGENT_LOGS_TABLE_SQL,
LOGS_KAFKA_METRICS_TABLE_SQL,
)
from posthog.clickhouse.client.migration_tools import NodeRole, run_sql_with_exceptions

# All agent_logs tables live on dedicated agent_logs nodes, isolated from
# customer log nodes. In dev/test environments (non-cloud), migrations run
# on ALL nodes since we don't have separate ClickHouse topologies.
operations = [
# 1. Create the dedicated agent_logs table
run_sql_with_exceptions(AGENT_LOGS_TABLE_SQL(), node_roles=[NodeRole.AGENT_LOGS]),
[Inline review comment]
Reviewer: Where do we specify these nodes, and what are their characteristics? (How big are they, how many, and is it the same in EU / US?)

Author (@tatoalo, Feb 10, 2026): The agent-logs nodes are defined in https://github.com/PostHog/charts/pull/8218; they're a new cluster within the existing logs CH.

Specs-wise (m7i.xlarge instance type across all envs):

    Env      Shards  Replicas  CPU  Memory  Disk       Zone
    dev      1       1         1    4Gi     50Gi gp3   us-east-1c
    prod-us  1       1         2    8Gi     100Gi gp3  us-east-1c
    prod-eu  1       1         2    8Gi     100Gi gp3  eu-central-1a

So we are starting with 1 shard / 1 replica, since expected volume is not super clear for now.

# 2. Create the Kafka engine table
run_sql_with_exceptions(KAFKA_AGENT_LOGS_TABLE_SQL(), node_roles=[NodeRole.AGENT_LOGS]),
# 3. Create the MV that writes to agent_logs table
run_sql_with_exceptions(KAFKA_AGENT_LOGS_MV_SQL(), node_roles=[NodeRole.AGENT_LOGS]),
# 4. Ensure logs_kafka_metrics table exists (already present on logs cluster in prod, needed in CI/dev)
run_sql_with_exceptions(LOGS_KAFKA_METRICS_TABLE_SQL(), node_roles=[NodeRole.AGENT_LOGS]),
# 5. Create metrics MV for Kafka consumer lag monitoring
run_sql_with_exceptions(KAFKA_AGENT_LOGS_METRICS_MV_SQL(), node_roles=[NodeRole.AGENT_LOGS]),
]
[Review comment on lines +34 to +45, Graphite Agent (based on CI logs)]
The migration is failing in CI because it tries to create Kafka engine tables that connect to a Kafka topic that might not be available in the test environment. We should check whether we're in a test environment and use a different approach, such as creating a dummy table or skipping the Kafka engine table creation: for example, use a Memory engine instead of the Kafka engine in tests, or add error handling to gracefully skip if the Kafka connection fails.

2 changes: 1 addition & 1 deletion posthog/clickhouse/migrations/max_migration.txt
@@ -1 +1 @@
-0211_logs32
+0212_agent_logs_kafka