Skip to content

Commit a24f901

Browse files
committed
feat(agent): add OTEL log transport with resource attributes
chore: cleaner chore: clean
1 parent ad15b28 commit a24f901

15 files changed

Lines changed: 503 additions & 233 deletions

File tree

apps/twig/src/main/services/agent/service.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,12 +430,23 @@ export class AgentService extends TypedEventEmitter<AgentServiceEvents> {
430430
const mockNodeDir = this.setupMockNodeEnvironment(taskRunId);
431431
this.setupEnvironment(credentials, mockNodeDir);
432432

433+
// Route agent logs to dedicated agent_logs Kafka topic via capture-logs-agent service
434+
// In local dev, use Caddy proxy (port 8010) which routes /i/v1/agent-logs to capture-logs-agent
435+
// In prod, use the main API host which proxies /i/v1/agent-logs to capture-logs-agent
436+
const otelHost = credentials.apiHost;
437+
const otelPath = "/i/v1/agent-logs";
438+
433439
const agent = new Agent({
434440
posthog: {
435441
apiUrl: credentials.apiHost,
436442
getApiKey: () => this.getToken(credentials.apiKey),
437443
projectId: credentials.projectId,
438444
},
445+
otelTransport: {
446+
host: otelHost,
447+
apiKey: this.getToken(credentials.apiKey),
448+
logsPath: otelPath,
449+
},
439450
debug: !app.isPackaged,
440451
onLog: onAgentLog,
441452
});
@@ -577,7 +588,9 @@ export class AgentService extends TypedEventEmitter<AgentServiceEvents> {
577588
return this.getOrCreateSession(config, isReconnect, true);
578589
}
579590
log.error(
580-
`Failed to ${isReconnect ? "reconnect" : "create"} session${isRetry ? " after retry" : ""}`,
591+
`Failed to ${isReconnect ? "reconnect" : "create"} session${
592+
isRetry ? " after retry" : ""
593+
}`,
581594
err,
582595
);
583596
if (isReconnect) return null;

apps/twig/src/main/services/git/service.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ export class GitService extends TypedEventEmitter<GitServiceEvents> {
6464
const remoteUrl = await getRemoteUrl(directoryPath);
6565
if (!remoteUrl) return null;
6666

67-
const repo = await parseGitHubUrl(remoteUrl);
67+
const repo = parseGitHubUrl(remoteUrl);
6868
if (!repo) return null;
6969

7070
const branch = await getCurrentBranch(directoryPath);

packages/agent/package.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@
8282
"vitest": "^2.1.8"
8383
},
8484
"dependencies": {
85+
"@opentelemetry/api-logs": "^0.208.0",
86+
"@opentelemetry/exporter-logs-otlp-http": "^0.208.0",
87+
"@opentelemetry/resources": "^2.0.0",
88+
"@opentelemetry/sdk-logs": "^0.208.0",
89+
"@opentelemetry/semantic-conventions": "^1.28.0",
8590
"@agentclientprotocol/sdk": "^0.13.1",
8691
"@anthropic-ai/claude-agent-sdk": "0.2.12",
8792
"@anthropic-ai/sdk": "^0.71.0",

packages/agent/src/adapters/acp-connection.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ export type AcpConnectionConfig = {
1818
logWriter?: SessionLogWriter;
1919
sessionId?: string;
2020
taskId?: string;
21+
/** Deployment environment - "local" for desktop, "cloud" for cloud sandbox */
22+
deviceType?: "local" | "cloud";
2123
logger?: Logger;
2224
processCallbacks?: ClaudeAcpAgentOptions;
2325
};
@@ -52,6 +54,7 @@ export function createAcpConnection(
5254
logWriter.register(config.sessionId, {
5355
taskId: config.taskId ?? config.sessionId,
5456
runId: config.sessionId,
57+
deviceType: config.deviceType,
5558
});
5659
}
5760

packages/agent/src/adapters/claude/claude-agent.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,8 @@ import {
2828
type SDKUserMessage,
2929
} from "@anthropic-ai/claude-agent-sdk";
3030
import { v7 as uuidv7 } from "uuid";
31-
import type {
32-
SessionLogConfig,
33-
SessionLogWriter,
34-
} from "@/session-log-writer.js";
31+
import type { SessionContext } from "@/otel-log-writer.js";
32+
import type { SessionLogWriter } from "@/session-log-writer.js";
3533
import { unreachable } from "@/utils/common.js";
3634
import { Logger } from "@/utils/logger.js";
3735
import { Pushable } from "@/utils/streams.js";
@@ -388,7 +386,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
388386
sessionId: string,
389387
meta: Record<string, unknown> | undefined,
390388
) {
391-
const persistence = meta?.persistence as SessionLogConfig | undefined;
389+
const persistence = meta?.persistence as SessionContext | undefined;
392390
if (persistence && this.logWriter) {
393391
this.logWriter.register(sessionId, persistence);
394392
}

packages/agent/src/agent.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,17 @@ export class Agent {
2525

2626
if (config.posthog) {
2727
this.posthogAPI = new PostHogAPIClient(config.posthog);
28-
this.sessionLogWriter = new SessionLogWriter(
29-
this.posthogAPI,
30-
this.logger.child("SessionLogWriter"),
31-
);
28+
}
29+
30+
if (config.otelTransport) {
31+
this.sessionLogWriter = new SessionLogWriter({
32+
otelConfig: {
33+
posthogHost: config.otelTransport.host,
34+
apiKey: config.otelTransport.apiKey,
35+
logsPath: config.otelTransport.logsPath,
36+
},
37+
logger: this.logger.child("SessionLogWriter"),
38+
});
3239
}
3340
}
3441

@@ -62,6 +69,7 @@ export class Agent {
6269
logWriter: this.sessionLogWriter,
6370
sessionId: taskRunId,
6471
taskId,
72+
deviceType: "local",
6573
logger: this.logger,
6674
processCallbacks: options.processCallbacks,
6775
});

packages/agent/src/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ export type {
2222
} from "./adapters/acp-connection.js";
2323
export { createAcpConnection } from "./adapters/acp-connection.js";
2424
export { Agent } from "./agent.js";
25+
export type { OtelLogConfig, SessionContext } from "./otel-log-writer.js";
26+
export { OtelLogWriter } from "./otel-log-writer.js";
2527
export { PostHogAPIClient } from "./posthog-api.js";
2628
export type {
2729
ConversationTurn,
@@ -30,6 +32,8 @@ export type {
3032
ToolCallInfo,
3133
} from "./resume.js";
3234
export { conversationToPromptHistory, resumeFromLog } from "./resume.js";
35+
export type { SessionLogWriterOptions } from "./session-log-writer.js";
36+
export { SessionLogWriter } from "./session-log-writer.js";
3337
export type { TreeSnapshot, TreeTrackerConfig } from "./tree-tracker.js";
3438
export {
3539
isCommitOnRemote,
@@ -44,6 +48,7 @@ export type {
4448
FileStatus,
4549
LogLevel,
4650
OnLogCallback,
51+
OtelTransportConfig,
4752
StoredEntry,
4853
StoredNotification,
4954
Task,
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
2+
import { OtelLogWriter } from "./otel-log-writer.js";
3+
import type { StoredNotification } from "./types.js";
4+
5+
// Mock the OTEL exporter
6+
const mockExport = vi.fn((_logs, callback) => {
7+
callback({ code: 0 }); // Success
8+
});
9+
10+
vi.mock("@opentelemetry/exporter-logs-otlp-http", () => ({
11+
OTLPLogExporter: vi.fn().mockImplementation(() => ({
12+
export: mockExport,
13+
shutdown: vi.fn().mockResolvedValue(undefined),
14+
})),
15+
}));
16+
17+
describe("OtelLogWriter", () => {
18+
let writer: OtelLogWriter;
19+
20+
beforeEach(() => {
21+
mockExport.mockClear();
22+
// Session context (taskId, runId) is now passed in constructor as resource attributes
23+
writer = new OtelLogWriter(
24+
{
25+
posthogHost: "https://us.i.posthog.com",
26+
apiKey: "phc_test_key",
27+
flushIntervalMs: 100,
28+
},
29+
{
30+
taskId: "task-123",
31+
runId: "run-456",
32+
},
33+
);
34+
});
35+
36+
afterEach(async () => {
37+
await writer.shutdown();
38+
});
39+
40+
it("should emit a log entry with event_type as regular attribute", async () => {
41+
const notification: StoredNotification = {
42+
type: "notification",
43+
timestamp: new Date().toISOString(),
44+
notification: {
45+
jsonrpc: "2.0",
46+
method: "_posthog/test_event",
47+
params: { foo: "bar" },
48+
},
49+
};
50+
51+
// taskId and runId are now resource attributes set in constructor,
52+
// only notification is passed per-emit
53+
writer.emit({ notification });
54+
55+
// Force flush to trigger export
56+
await writer.flush();
57+
58+
// Verify export was called
59+
expect(mockExport).toHaveBeenCalled();
60+
61+
// Get the logs that were exported
62+
const exportedLogs = mockExport.mock.calls[0][0];
63+
expect(exportedLogs.length).toBe(1);
64+
65+
const log = exportedLogs[0];
66+
// task_id and run_id are now resource attributes, not regular attributes
67+
expect(log.attributes.task_id).toBeUndefined();
68+
expect(log.attributes.run_id).toBeUndefined();
69+
// event_type is still a regular attribute (varies per log entry)
70+
expect(log.attributes.event_type).toBe("_posthog/test_event");
71+
expect(log.body).toBe(JSON.stringify(notification));
72+
73+
// Verify resource attributes contain task_id and run_id
74+
expect(log.resource.attributes.task_id).toBe("task-123");
75+
expect(log.resource.attributes.run_id).toBe("run-456");
76+
expect(log.resource.attributes["service.name"]).toBe("twig-agent");
77+
});
78+
79+
it("should batch multiple log entries", async () => {
80+
const makeNotification = (method: string): StoredNotification => ({
81+
type: "notification",
82+
timestamp: new Date().toISOString(),
83+
notification: {
84+
jsonrpc: "2.0",
85+
method,
86+
},
87+
});
88+
89+
writer.emit({ notification: makeNotification("event_1") });
90+
writer.emit({ notification: makeNotification("event_2") });
91+
writer.emit({ notification: makeNotification("event_3") });
92+
93+
await writer.flush();
94+
95+
expect(mockExport).toHaveBeenCalled();
96+
const exportedLogs = mockExport.mock.calls[0][0];
97+
expect(exportedLogs.length).toBe(3);
98+
99+
// All logs should share the same resource attributes
100+
for (const log of exportedLogs) {
101+
expect(log.resource.attributes.task_id).toBe("task-123");
102+
expect(log.resource.attributes.run_id).toBe("run-456");
103+
}
104+
});
105+
});
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import { SeverityNumber } from "@opentelemetry/api-logs";
2+
import { OTLPLogExporter } from "@opentelemetry/exporter-logs-otlp-http";
3+
import { resourceFromAttributes } from "@opentelemetry/resources";
4+
import {
5+
BatchLogRecordProcessor,
6+
LoggerProvider,
7+
} from "@opentelemetry/sdk-logs";
8+
import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions";
9+
import type { StoredNotification } from "./types.js";
10+
import { Logger } from "./utils/logger.js";
11+
12+
export interface OtelLogConfig {
13+
/** PostHog ingest host, e.g., "https://us.i.posthog.com" */
14+
posthogHost: string;
15+
/** Project API key, e.g., "phc_xxx" */
16+
apiKey: string;
17+
/** Batch flush interval in ms (default: 500) */
18+
flushIntervalMs?: number;
19+
/** Override the logs endpoint path (default: /i/v1/agent-logs) */
20+
logsPath?: string;
21+
}
22+
23+
/**
24+
* Session context for resource attributes.
25+
* These are set once per OTEL logger instance and indexed via resource_fingerprint
26+
*/
27+
export interface SessionContext {
28+
/** Parent task grouping - all runs for a task share this */
29+
taskId: string;
30+
/** Primary conversation identifier - all events in a run share this */
31+
runId: string;
32+
/** Deployment environment - "local" for desktop, "cloud" for cloud sandbox */
33+
deviceType?: "local" | "cloud";
34+
}
35+
36+
export class OtelLogWriter {
37+
private loggerProvider: LoggerProvider;
38+
private logger: ReturnType<LoggerProvider["getLogger"]>;
39+
private debugLogger: Logger;
40+
private sessionContext: SessionContext;
41+
42+
constructor(
43+
config: OtelLogConfig,
44+
sessionContext: SessionContext,
45+
debugLogger?: Logger,
46+
) {
47+
this.debugLogger =
48+
debugLogger ?? new Logger({ debug: false, prefix: "[OtelLogWriter]" });
49+
this.sessionContext = sessionContext;
50+
51+
const logsPath = config.logsPath ?? "/i/v1/agent-logs";
52+
const exporter = new OTLPLogExporter({
53+
url: `${config.posthogHost}${logsPath}`,
54+
headers: { Authorization: `Bearer ${config.apiKey}` },
55+
});
56+
57+
const processor = new BatchLogRecordProcessor(exporter, {
58+
scheduledDelayMillis: config.flushIntervalMs ?? 500,
59+
});
60+
61+
// Resource attributes are set ONCE per session and indexed via resource_fingerprint
62+
// So we have fast queries by run_id/task_id in PostHog Logs UI
63+
this.loggerProvider = new LoggerProvider({
64+
resource: resourceFromAttributes({
65+
[ATTR_SERVICE_NAME]: "twig-agent",
66+
run_id: sessionContext.runId,
67+
task_id: sessionContext.taskId,
68+
device_type: sessionContext.deviceType ?? "local",
69+
}),
70+
processors: [processor],
71+
});
72+
73+
this.logger = this.loggerProvider.getLogger("agent-session");
74+
}
75+
76+
/**
77+
* Emit an agent event to PostHog Logs via OTEL.
78+
*/
79+
emit(entry: { notification: StoredNotification }): void {
80+
const { notification } = entry;
81+
const eventType = notification.notification.method;
82+
83+
this.logger.emit({
84+
severityNumber: SeverityNumber.INFO,
85+
severityText: "INFO",
86+
body: JSON.stringify(notification),
87+
attributes: {
88+
event_type: eventType,
89+
},
90+
});
91+
92+
this.debugLogger.debug("Emitted OTEL log", {
93+
taskId: this.sessionContext.taskId,
94+
runId: this.sessionContext.runId,
95+
eventType,
96+
});
97+
}
98+
99+
async flush(): Promise<void> {
100+
await this.loggerProvider.forceFlush();
101+
}
102+
103+
async shutdown(): Promise<void> {
104+
await this.loggerProvider.shutdown();
105+
}
106+
}

packages/agent/src/server/agent-server.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -434,20 +434,25 @@ export class AgentServer {
434434
logger: new Logger({ debug: true, prefix: "[TreeTracker]" }),
435435
});
436436

437-
const posthogAPI = new PostHogAPIClient({
437+
const _posthogAPI = new PostHogAPIClient({
438438
apiUrl: this.config.apiUrl,
439439
projectId: this.config.projectId,
440440
getApiKey: () => this.config.apiKey,
441441
});
442442

443-
const logWriter = new SessionLogWriter(
444-
posthogAPI,
445-
new Logger({ debug: true, prefix: "[SessionLogWriter]" }),
446-
);
443+
const logWriter = new SessionLogWriter({
444+
otelConfig: {
445+
posthogHost: this.config.apiUrl,
446+
apiKey: this.config.apiKey,
447+
logsPath: "/i/v1/agent-logs",
448+
},
449+
logger: new Logger({ debug: true, prefix: "[SessionLogWriter]" }),
450+
});
447451

448452
const acpConnection = createAcpConnection({
449453
sessionId: payload.run_id,
450454
taskId: payload.task_id,
455+
deviceType: deviceInfo.type,
451456
logWriter,
452457
});
453458

0 commit comments

Comments
 (0)