diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 25c9a2288f46..6cf12289d1e7 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -48,6 +48,10 @@ services: path /i/v1/logs } + @capture-logs-agent { + path /i/v1/agent-logs + } + @flags { path /flags path /flags* @@ -72,6 +76,11 @@ services: reverse_proxy capture-logs:4318 } + handle @capture-logs-agent { + uri replace /i/v1/agent-logs /i/v1/logs + reverse_proxy capture-logs-agent:4319 + } + handle @replay-capture { reverse_proxy replay-capture:3306 } @@ -268,6 +277,16 @@ services: depends_on: - kafka + capture-logs-agent: + extends: + file: docker-compose.base.yml + service: capture-logs + depends_on: + - kafka + environment: + BIND_PORT: '4319' + KAFKA_TOPIC: agent_logs + jaeger: extends: file: docker-compose.base.yml diff --git a/posthog/clickhouse/agent_logs.py b/posthog/clickhouse/agent_logs.py new file mode 100644 index 000000000000..c6a703a064df --- /dev/null +++ b/posthog/clickhouse/agent_logs.py @@ -0,0 +1,173 @@ +"""SQL definitions for agent_logs tables and materialized views. + +This module defines the ClickHouse tables for ingesting Twig agent logs via a dedicated +Kafka pipeline. Agent logs bypass the LogsIngestionConsumer and flow directly from +Kafka to ClickHouse, similar to distinct_id_usage and session_replay_events. + +Architecture: + Twig Agent → OTEL → capture-logs-agent → Kafka (agent_logs) + ↓ + kafka_agent_logs (Kafka engine) + ↓ + kafka_agent_logs_mv → agent_logs (dedicated table) + +The team_id is extracted from Kafka headers (set by capture-logs from API token). +Task/run IDs are extracted from OTEL resource attributes for fast queries. 
# Kafka topic and ClickHouse consumer group used by the Kafka engine table.
KAFKA_AGENT_LOGS_TOPIC = "agent_logs"
CONSUMER_GROUP_AGENT_LOGS = "clickhouse_agent_logs"

# Names of the ClickHouse objects created by the SQL builders below.
AGENT_LOGS_TABLE_NAME = "agent_logs"
KAFKA_TABLE_NAME = "kafka_agent_logs"
MV_TABLE_NAME = "kafka_agent_logs_mv"
METRICS_MV_TABLE_NAME = "kafka_agent_logs_kafka_metrics_mv"


def AGENT_LOGS_TABLE_SQL() -> str:
    """Return the CREATE TABLE statement for the dedicated ``agent_logs`` table.

    The sort key is (team_id, task_id, run_id, timestamp, uuid), matching the
    primary access pattern of looking up the logs of one task run within a
    team. Rows are partitioned by day and expire 60 days after ``timestamp``.
    """
    return f"""
CREATE TABLE IF NOT EXISTS {AGENT_LOGS_TABLE_NAME}
(
    -- Identity
    `uuid` UUID DEFAULT generateUUIDv4(),
    `team_id` Int32,
    `task_id` String,
    `run_id` String,

    -- Timestamps
    `timestamp` DateTime64(6),
    `observed_timestamp` DateTime64(6),

    -- Log metadata
    `severity_text` LowCardinality(String) DEFAULT 'INFO',
    `severity_number` Int32 DEFAULT 9,
    `service_name` LowCardinality(String) DEFAULT 'twig-agent',

    -- Log content
    `body` String,
    `event_type` LowCardinality(String),

    -- Deployment environment
    `device_type` LowCardinality(String) DEFAULT 'local',

    -- Flexible attributes
    `attributes` Map(LowCardinality(String), String),
    `resource_attributes` Map(LowCardinality(String), String)
)
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (team_id, task_id, run_id, timestamp, uuid)
TTL timestamp + INTERVAL 60 DAY
"""


def KAFKA_AGENT_LOGS_TABLE_SQL() -> str:
    """Return the CREATE TABLE statement for the Kafka engine table.

    The table consumes Avro-serialized records from the ``agent_logs`` topic
    under the ``clickhouse_agent_logs`` consumer group. It sets
    ``kafka_skip_broken_messages = 100``.
    """
    return """
CREATE TABLE IF NOT EXISTS {table_name}
(
    `uuid` String,
    `trace_id` String,
    `span_id` String,
    `trace_flags` Int32,
    `timestamp` DateTime64(6),
    `observed_timestamp` DateTime64(6),
    `body` String,
    `severity_text` String,
    `severity_number` Int32,
    `service_name` String,
    `resource_attributes` Map(LowCardinality(String), String),
    `instrumentation_scope` String,
    `event_name` String,
    `attributes` Map(LowCardinality(String), String)
)
ENGINE = {engine}
SETTINGS kafka_skip_broken_messages = 100
""".format(
        table_name=KAFKA_TABLE_NAME,
        engine=kafka_engine(
            topic=KAFKA_AGENT_LOGS_TOPIC,
            group=CONSUMER_GROUP_AGENT_LOGS,
            serialization="Avro",
        ),
    )


def KAFKA_AGENT_LOGS_MV_SQL() -> str:
    """Return the materialized view that copies Kafka rows into ``agent_logs``.

    Transformations applied per row:

    - ``team_id`` comes from the ``team_id`` Kafka header, then from the
      ``team_id`` resource attribute, and finally defaults to 0.
    - ``task_id`` / ``run_id`` / ``device_type`` are read from resource
      attributes; ``event_type`` is read from the log attributes.
    - Surrounding double quotes are trimmed from extracted values and from
      every value of both attribute maps.

    A lookup of a missing key in a ``Map(..., String)`` yields an empty
    string, not NULL, so a bare ``coalesce(map['key'], fallback)`` can never
    reach its fallback. The ``event_type`` -> ``event_name`` and
    ``device_type`` -> ``'local'`` fallbacks therefore go through
    ``nullIf(..., '')`` so that an absent (or empty) attribute actually
    triggers them.
    """
    return """
CREATE MATERIALIZED VIEW IF NOT EXISTS {mv_name} TO {target_table}
AS SELECT
    toUUID(uuid) as uuid,
    coalesce(toInt32OrNull(_headers.value[indexOf(_headers.name, 'team_id')]), toInt32OrNull(trimBoth(resource_attributes['team_id'], '"')), 0) as team_id,
    trimBoth(resource_attributes['task_id'], '"') as task_id,
    trimBoth(resource_attributes['run_id'], '"') as run_id,
    timestamp,
    observed_timestamp,
    severity_text,
    severity_number,
    trimBoth(service_name, '"') as service_name,
    body,
    coalesce(nullIf(trimBoth(attributes['event_type'], '"'), ''), nullIf(trimBoth(event_name, '"'), ''), '') as event_type,
    coalesce(nullIf(trimBoth(resource_attributes['device_type'], '"'), ''), 'local') as device_type,
    mapApply((k, v) -> (k, trimBoth(v, '"')), attributes) as attributes,
    mapApply((k, v) -> (k, trimBoth(v, '"')), resource_attributes) as resource_attributes
FROM {kafka_table}
SETTINGS min_insert_block_size_rows=0, min_insert_block_size_bytes=0
""".format(
        mv_name=MV_TABLE_NAME,
        target_table=AGENT_LOGS_TABLE_NAME,
        kafka_table=KAFKA_TABLE_NAME,
    )


def LOGS_KAFKA_METRICS_TABLE_SQL() -> str:
    """Return the CREATE TABLE statement for the shared ``logs_kafka_metrics`` table.

    The table holds per-topic, per-partition maxima (offset, timestamps, lag)
    used for Kafka consumer lag monitoring. It may already exist (created by
    the logs cluster); ``IF NOT EXISTS`` makes the statement safe to run both
    where it is absent (CI) and where it is present (prod).
    """
    return """
CREATE TABLE IF NOT EXISTS logs_kafka_metrics
(
    `_partition` UInt32,
    `_topic` String,
    `max_offset` SimpleAggregateFunction(max, UInt64),
    `max_observed_timestamp` SimpleAggregateFunction(max, DateTime64(9)),
    `max_timestamp` SimpleAggregateFunction(max, DateTime64(9)),
    `max_created_at` SimpleAggregateFunction(max, DateTime64(9)),
    `max_lag` SimpleAggregateFunction(max, UInt64)
)
ENGINE = MergeTree
ORDER BY (_topic, _partition)
"""


def KAFKA_AGENT_LOGS_METRICS_MV_SQL() -> str:
    """Return the materialized view feeding ``logs_kafka_metrics`` from the Kafka table.

    For every consumed block it records, per (partition, topic), the maximum
    offset, observed timestamp, timestamp, insertion time and lag
    (``now64(9) - observed_timestamp``).
    """
    return """
CREATE MATERIALIZED VIEW IF NOT EXISTS {mv_name} TO logs_kafka_metrics
AS SELECT
    _partition,
    _topic,
    maxSimpleState(_offset) as max_offset,
    maxSimpleState(observed_timestamp) as max_observed_timestamp,
    maxSimpleState(timestamp) as max_timestamp,
    maxSimpleState(now64(9)) as max_created_at,
    maxSimpleState(now64(9) - observed_timestamp) as max_lag
FROM {kafka_table}
GROUP BY _partition, _topic
""".format(
        mv_name=METRICS_MV_TABLE_NAME,
        kafka_table=KAFKA_TABLE_NAME,
    )
# All agent_logs objects live on the dedicated agent_logs node role, isolated
# from customer log nodes. In dev/test (non-cloud) environments migrations run
# on ALL nodes because there is no separate ClickHouse topology.
#
# The builders are applied in dependency order: each statement only refers to
# objects created by an earlier one.
operations = [
    run_sql_with_exceptions(build_sql(), node_roles=[NodeRole.AGENT_LOGS])
    for build_sql in (
        # 1. Dedicated agent_logs table
        AGENT_LOGS_TABLE_SQL,
        # 2. Kafka engine table
        KAFKA_AGENT_LOGS_TABLE_SQL,
        # 3. MV that writes into the agent_logs table
        KAFKA_AGENT_LOGS_MV_SQL,
        # 4. logs_kafka_metrics table (already present on the logs cluster in
        #    prod, needed in CI/dev)
        LOGS_KAFKA_METRICS_TABLE_SQL,
        # 5. Metrics MV for Kafka consumer lag monitoring
        KAFKA_AGENT_LOGS_METRICS_MV_SQL,
    )
]