Skip to content

Commit c515c8c

Browse files
committed
feat(agent): add OTEL log transport with resource attributes
1 parent 6cf3024 commit c515c8c

14 files changed

Lines changed: 495 additions & 233 deletions

File tree

apps/twig/src/main/services/agent/service.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,12 +430,23 @@ export class AgentService extends TypedEventEmitter<AgentServiceEvents> {
430430
const mockNodeDir = this.setupMockNodeEnvironment(taskRunId);
431431
this.setupEnvironment(credentials, mockNodeDir);
432432

433+
// Route agent logs to dedicated agent_logs Kafka topic via capture-logs-agent service
434+
// In local dev, use Caddy proxy (port 8010) which routes /i/v1/agent-logs to capture-logs-agent
435+
// In prod, use the main API host which proxies /i/v1/agent-logs to capture-logs-agent
436+
const otelHost = credentials.apiHost;
437+
const otelPath = "/i/v1/agent-logs";
438+
433439
const agent = new Agent({
434440
posthog: {
435441
apiUrl: credentials.apiHost,
436442
getApiKey: () => this.getToken(credentials.apiKey),
437443
projectId: credentials.projectId,
438444
},
445+
otelTransport: {
446+
host: otelHost,
447+
apiKey: this.getToken(credentials.apiKey),
448+
logsPath: otelPath,
449+
},
439450
debug: !app.isPackaged,
440451
onLog: onAgentLog,
441452
});
@@ -577,7 +588,9 @@ export class AgentService extends TypedEventEmitter<AgentServiceEvents> {
577588
return this.getOrCreateSession(config, isReconnect, true);
578589
}
579590
log.error(
580-
`Failed to ${isReconnect ? "reconnect" : "create"} session${isRetry ? " after retry" : ""}`,
591+
`Failed to ${isReconnect ? "reconnect" : "create"} session${
592+
isRetry ? " after retry" : ""
593+
}`,
581594
err,
582595
);
583596
if (isReconnect) return null;

apps/twig/src/main/services/git/service.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ export class GitService extends TypedEventEmitter<GitServiceEvents> {
6464
const remoteUrl = await getRemoteUrl(directoryPath);
6565
if (!remoteUrl) return null;
6666

67-
const repo = await parseGitHubUrl(remoteUrl);
67+
const repo = parseGitHubUrl(remoteUrl);
6868
if (!repo) return null;
6969

7070
const branch = await getCurrentBranch(directoryPath);

packages/agent/package.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@
8282
"vitest": "^2.1.8"
8383
},
8484
"dependencies": {
85+
"@opentelemetry/api-logs": "^0.208.0",
86+
"@opentelemetry/exporter-logs-otlp-http": "^0.208.0",
87+
"@opentelemetry/resources": "^2.0.0",
88+
"@opentelemetry/sdk-logs": "^0.208.0",
89+
"@opentelemetry/semantic-conventions": "^1.28.0",
8590
"@agentclientprotocol/sdk": "^0.13.1",
8691
"@anthropic-ai/claude-agent-sdk": "0.2.12",
8792
"@anthropic-ai/sdk": "^0.71.0",

packages/agent/src/adapters/claude/claude-agent.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,8 @@ import {
2828
type SDKUserMessage,
2929
} from "@anthropic-ai/claude-agent-sdk";
3030
import { v7 as uuidv7 } from "uuid";
31-
import type {
32-
SessionLogConfig,
33-
SessionLogWriter,
34-
} from "@/session-log-writer.js";
31+
import type { SessionContext } from "@/otel-log-writer.js";
32+
import type { SessionLogWriter } from "@/session-log-writer.js";
3533
import { unreachable } from "@/utils/common.js";
3634
import { Logger } from "@/utils/logger.js";
3735
import { Pushable } from "@/utils/streams.js";
@@ -388,7 +386,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
388386
sessionId: string,
389387
meta: Record<string, unknown> | undefined,
390388
) {
391-
const persistence = meta?.persistence as SessionLogConfig | undefined;
389+
const persistence = meta?.persistence as SessionContext | undefined;
392390
if (persistence && this.logWriter) {
393391
this.logWriter.register(sessionId, persistence);
394392
}

packages/agent/src/agent.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,17 @@ export class Agent {
2525

2626
if (config.posthog) {
2727
this.posthogAPI = new PostHogAPIClient(config.posthog);
28-
this.sessionLogWriter = new SessionLogWriter(
29-
this.posthogAPI,
30-
this.logger.child("SessionLogWriter"),
31-
);
28+
}
29+
30+
if (config.otelTransport) {
31+
this.sessionLogWriter = new SessionLogWriter({
32+
otelConfig: {
33+
posthogHost: config.otelTransport.host,
34+
apiKey: config.otelTransport.apiKey,
35+
logsPath: config.otelTransport.logsPath,
36+
},
37+
logger: this.logger.child("SessionLogWriter"),
38+
});
3239
}
3340
}
3441

packages/agent/src/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ export type {
2222
} from "./adapters/acp-connection.js";
2323
export { createAcpConnection } from "./adapters/acp-connection.js";
2424
export { Agent } from "./agent.js";
25+
export type { OtelLogConfig, SessionContext } from "./otel-log-writer.js";
26+
export { OtelLogWriter } from "./otel-log-writer.js";
2527
export { PostHogAPIClient } from "./posthog-api.js";
2628
export type {
2729
ConversationTurn,
@@ -30,6 +32,8 @@ export type {
3032
ToolCallInfo,
3133
} from "./resume.js";
3234
export { conversationToPromptHistory, resumeFromLog } from "./resume.js";
35+
export type { SessionLogWriterOptions } from "./session-log-writer.js";
36+
export { SessionLogWriter } from "./session-log-writer.js";
3337
export type { TreeSnapshot, TreeTrackerConfig } from "./tree-tracker.js";
3438
export {
3539
isCommitOnRemote,
@@ -44,6 +48,7 @@ export type {
4448
FileStatus,
4549
LogLevel,
4650
OnLogCallback,
51+
OtelTransportConfig,
4752
StoredEntry,
4853
StoredNotification,
4954
Task,
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
2+
import { OtelLogWriter } from "./otel-log-writer.js";
3+
import type { StoredNotification } from "./types.js";
4+
5+
// Mock the OTEL exporter
6+
const mockExport = vi.fn((_logs, callback) => {
7+
callback({ code: 0 }); // Success
8+
});
9+
10+
vi.mock("@opentelemetry/exporter-logs-otlp-http", () => ({
11+
OTLPLogExporter: vi.fn().mockImplementation(() => ({
12+
export: mockExport,
13+
shutdown: vi.fn().mockResolvedValue(undefined),
14+
})),
15+
}));
16+
17+
describe("OtelLogWriter", () => {
18+
let writer: OtelLogWriter;
19+
20+
beforeEach(() => {
21+
mockExport.mockClear();
22+
// Session context (taskId, runId) is now passed in constructor as resource attributes
23+
writer = new OtelLogWriter(
24+
{
25+
posthogHost: "https://us.i.posthog.com",
26+
apiKey: "phc_test_key",
27+
flushIntervalMs: 100,
28+
},
29+
{
30+
taskId: "task-123",
31+
runId: "run-456",
32+
},
33+
);
34+
});
35+
36+
afterEach(async () => {
37+
await writer.shutdown();
38+
});
39+
40+
it("should emit a log entry with event_type as regular attribute", async () => {
41+
const notification: StoredNotification = {
42+
type: "notification",
43+
timestamp: new Date().toISOString(),
44+
notification: {
45+
jsonrpc: "2.0",
46+
method: "_posthog/test_event",
47+
params: { foo: "bar" },
48+
},
49+
};
50+
51+
// taskId and runId are now resource attributes set in constructor,
52+
// only notification is passed per-emit
53+
writer.emit({ notification });
54+
55+
// Force flush to trigger export
56+
await writer.flush();
57+
58+
// Verify export was called
59+
expect(mockExport).toHaveBeenCalled();
60+
61+
// Get the logs that were exported
62+
const exportedLogs = mockExport.mock.calls[0][0];
63+
expect(exportedLogs.length).toBe(1);
64+
65+
const log = exportedLogs[0];
66+
// task_id and run_id are now resource attributes, not regular attributes
67+
expect(log.attributes.task_id).toBeUndefined();
68+
expect(log.attributes.run_id).toBeUndefined();
69+
// event_type is still a regular attribute (varies per log entry)
70+
expect(log.attributes.event_type).toBe("_posthog/test_event");
71+
expect(log.body).toBe(JSON.stringify(notification));
72+
73+
// Verify resource attributes contain task_id and run_id
74+
expect(log.resource.attributes.task_id).toBe("task-123");
75+
expect(log.resource.attributes.run_id).toBe("run-456");
76+
expect(log.resource.attributes["service.name"]).toBe("twig-agent");
77+
});
78+
79+
it("should batch multiple log entries", async () => {
80+
const makeNotification = (method: string): StoredNotification => ({
81+
type: "notification",
82+
timestamp: new Date().toISOString(),
83+
notification: {
84+
jsonrpc: "2.0",
85+
method,
86+
},
87+
});
88+
89+
writer.emit({ notification: makeNotification("event_1") });
90+
writer.emit({ notification: makeNotification("event_2") });
91+
writer.emit({ notification: makeNotification("event_3") });
92+
93+
await writer.flush();
94+
95+
expect(mockExport).toHaveBeenCalled();
96+
const exportedLogs = mockExport.mock.calls[0][0];
97+
expect(exportedLogs.length).toBe(3);
98+
99+
// All logs should share the same resource attributes
100+
for (const log of exportedLogs) {
101+
expect(log.resource.attributes.task_id).toBe("task-123");
102+
expect(log.resource.attributes.run_id).toBe("run-456");
103+
}
104+
});
105+
});
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import { SeverityNumber } from "@opentelemetry/api-logs";
2+
import { OTLPLogExporter } from "@opentelemetry/exporter-logs-otlp-http";
3+
import { resourceFromAttributes } from "@opentelemetry/resources";
4+
import {
5+
BatchLogRecordProcessor,
6+
LoggerProvider,
7+
} from "@opentelemetry/sdk-logs";
8+
import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions";
9+
import type { StoredNotification } from "./types.js";
10+
import { Logger } from "./utils/logger.js";
11+
12+
export interface OtelLogConfig {
13+
/** PostHog ingest host, e.g., "https://us.i.posthog.com" */
14+
posthogHost: string;
15+
/** Project API key, e.g., "phc_xxx" */
16+
apiKey: string;
17+
/** Batch flush interval in ms (default: 500) */
18+
flushIntervalMs?: number;
19+
/** Override the logs endpoint path (default: /i/v1/agent-logs) */
20+
logsPath?: string;
21+
}
22+
23+
/**
24+
* Session context for resource attributes.
25+
* These are set once per OTEL logger instance and indexed via resource_fingerprint
26+
*/
27+
export interface SessionContext {
28+
/** Parent task grouping - all runs for a task share this */
29+
taskId: string;
30+
/** Primary conversation identifier - all events in a run share this */
31+
runId: string;
32+
}
33+
34+
export class OtelLogWriter {
35+
private loggerProvider: LoggerProvider;
36+
private logger: ReturnType<LoggerProvider["getLogger"]>;
37+
private debugLogger: Logger;
38+
private sessionContext: SessionContext;
39+
40+
constructor(
41+
config: OtelLogConfig,
42+
sessionContext: SessionContext,
43+
debugLogger?: Logger,
44+
) {
45+
this.debugLogger =
46+
debugLogger ?? new Logger({ debug: false, prefix: "[OtelLogWriter]" });
47+
this.sessionContext = sessionContext;
48+
49+
const logsPath = config.logsPath ?? "/i/v1/agent-logs";
50+
const exporter = new OTLPLogExporter({
51+
url: `${config.posthogHost}${logsPath}`,
52+
headers: { Authorization: `Bearer ${config.apiKey}` },
53+
});
54+
55+
const processor = new BatchLogRecordProcessor(exporter, {
56+
scheduledDelayMillis: config.flushIntervalMs ?? 500,
57+
});
58+
59+
// Resource attributes are set ONCE per session and indexed via resource_fingerprint
60+
// So we have fast queries by run_id/task_id in PostHog Logs UI
61+
this.loggerProvider = new LoggerProvider({
62+
resource: resourceFromAttributes({
63+
[ATTR_SERVICE_NAME]: "twig-agent",
64+
run_id: sessionContext.runId,
65+
task_id: sessionContext.taskId,
66+
}),
67+
processors: [processor],
68+
});
69+
70+
this.logger = this.loggerProvider.getLogger("agent-session");
71+
}
72+
73+
/**
74+
* Emit an agent event to PostHog Logs via OTEL.
75+
*/
76+
emit(entry: { notification: StoredNotification }): void {
77+
const { notification } = entry;
78+
const eventType = notification.notification.method;
79+
80+
this.logger.emit({
81+
severityNumber: SeverityNumber.INFO,
82+
severityText: "INFO",
83+
body: JSON.stringify(notification),
84+
attributes: {
85+
event_type: eventType,
86+
},
87+
});
88+
89+
this.debugLogger.debug("Emitted OTEL log", {
90+
taskId: this.sessionContext.taskId,
91+
runId: this.sessionContext.runId,
92+
eventType,
93+
});
94+
}
95+
96+
async flush(): Promise<void> {
97+
await this.loggerProvider.forceFlush();
98+
}
99+
100+
async shutdown(): Promise<void> {
101+
await this.loggerProvider.shutdown();
102+
}
103+
}

packages/agent/src/server/agent-server.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -434,16 +434,20 @@ export class AgentServer {
434434
logger: new Logger({ debug: true, prefix: "[TreeTracker]" }),
435435
});
436436

437-
const posthogAPI = new PostHogAPIClient({
437+
const _posthogAPI = new PostHogAPIClient({
438438
apiUrl: this.config.apiUrl,
439439
projectId: this.config.projectId,
440440
getApiKey: () => this.config.apiKey,
441441
});
442442

443-
const logWriter = new SessionLogWriter(
444-
posthogAPI,
445-
new Logger({ debug: true, prefix: "[SessionLogWriter]" }),
446-
);
443+
const logWriter = new SessionLogWriter({
444+
otelConfig: {
445+
posthogHost: this.config.apiUrl,
446+
apiKey: this.config.apiKey,
447+
logsPath: "/i/v1/agent-logs",
448+
},
449+
logger: new Logger({ debug: true, prefix: "[SessionLogWriter]" }),
450+
});
447451

448452
const acpConnection = createAcpConnection({
449453
sessionId: payload.run_id,

0 commit comments

Comments
 (0)