Skip to content

Commit 8876c8f

Browse files
authored
refactor(agent): remove OTEL write path from SessionLogWriter (#902)
1 parent e1ec517 commit 8876c8f

4 files changed

Lines changed: 82 additions & 159 deletions

File tree

packages/agent/src/agent.ts

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,24 +32,11 @@ export class Agent {
3232
this.posthogAPI = new PostHogAPIClient(config.posthog);
3333
}
3434

35-
if (!config.skipLogPersistence) {
36-
if (config.otelTransport) {
37-
// OTEL pipeline: use OtelLogWriter only (no S3 writer)
38-
this.sessionLogWriter = new SessionLogWriter({
39-
otelConfig: {
40-
posthogHost: config.otelTransport.host,
41-
apiKey: config.otelTransport.apiKey,
42-
logsPath: config.otelTransport.logsPath,
43-
},
44-
logger: this.logger.child("SessionLogWriter"),
45-
});
46-
} else if (config.posthog) {
47-
// Legacy: use S3 writer via PostHog API
48-
this.sessionLogWriter = new SessionLogWriter({
49-
posthogAPI: this.posthogAPI,
50-
logger: this.logger.child("SessionLogWriter"),
51-
});
52-
}
35+
if (config.posthog && !config.skipLogPersistence) {
36+
this.sessionLogWriter = new SessionLogWriter({
37+
posthogAPI: this.posthogAPI,
38+
logger: this.logger.child("SessionLogWriter"),
39+
});
5340
}
5441
}
5542

packages/agent/src/server/agent-server.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -464,18 +464,14 @@ export class AgentServer {
464464
logger: new Logger({ debug: true, prefix: "[TreeTracker]" }),
465465
});
466466

467-
const _posthogAPI = new PostHogAPIClient({
467+
const posthogAPI = new PostHogAPIClient({
468468
apiUrl: this.config.apiUrl,
469469
projectId: this.config.projectId,
470470
getApiKey: () => this.config.apiKey,
471471
});
472472

473473
const logWriter = new SessionLogWriter({
474-
otelConfig: {
475-
posthogHost: this.config.apiUrl,
476-
apiKey: this.config.apiKey,
477-
logsPath: "/i/v1/agent-logs",
478-
},
474+
posthogAPI,
479475
logger: new Logger({ debug: true, prefix: "[SessionLogWriter]" }),
480476
});
481477

Lines changed: 72 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1,140 +1,111 @@
11
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
2-
import { OtelLogWriter } from "./otel-log-writer.js";
2+
import type { PostHogAPIClient } from "./posthog-api.js";
33
import { SessionLogWriter } from "./session-log-writer.js";
4-
5-
// Mock the OtelLogWriter
6-
vi.mock("./otel-log-writer.js", () => ({
7-
OtelLogWriter: vi.fn(),
8-
}));
4+
import type { StoredNotification } from "./types.js";
5+
6+
function makeSessionUpdate(
7+
sessionUpdate: string,
8+
extra: Record<string, unknown> = {},
9+
): string {
10+
return JSON.stringify({
11+
jsonrpc: "2.0",
12+
method: "session/update",
13+
params: { update: { sessionUpdate, ...extra } },
14+
});
15+
}
916

1017
describe("SessionLogWriter", () => {
1118
let logWriter: SessionLogWriter;
12-
let mockEmit: ReturnType<typeof vi.fn>;
13-
let mockFlush: ReturnType<typeof vi.fn>;
14-
let mockShutdown: ReturnType<typeof vi.fn>;
19+
let mockAppendLog: ReturnType<typeof vi.fn>;
20+
let mockPosthogAPI: PostHogAPIClient;
1521

1622
beforeEach(() => {
17-
mockEmit = vi.fn();
18-
mockFlush = vi.fn().mockResolvedValue(undefined);
19-
mockShutdown = vi.fn().mockResolvedValue(undefined);
20-
21-
vi.mocked(OtelLogWriter).mockImplementation(
22-
() =>
23-
({
24-
emit: mockEmit,
25-
flush: mockFlush,
26-
shutdown: mockShutdown,
27-
}) as unknown as OtelLogWriter,
28-
);
29-
30-
logWriter = new SessionLogWriter({
31-
otelConfig: {
32-
posthogHost: "http://localhost:8000",
33-
apiKey: "test-api-key",
34-
logsPath: "/i/v1/agent-logs",
35-
},
36-
});
23+
mockAppendLog = vi.fn().mockResolvedValue(undefined);
24+
mockPosthogAPI = {
25+
appendTaskRunLog: mockAppendLog,
26+
} as unknown as PostHogAPIClient;
27+
28+
logWriter = new SessionLogWriter({ posthogAPI: mockPosthogAPI });
3729
});
3830

3931
afterEach(() => {
4032
vi.restoreAllMocks();
4133
});
4234

4335
describe("appendRawLine", () => {
44-
it("emits entries immediately via OtelLogWriter", () => {
45-
const sessionId = "test-session";
46-
logWriter.register(sessionId, { taskId: "task-1", runId: sessionId });
36+
it("queues entries for flush", async () => {
37+
const sessionId = "s1";
38+
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });
4739

48-
logWriter.appendRawLine(
49-
sessionId,
50-
JSON.stringify({ method: "test", params: {} }),
51-
);
52-
logWriter.appendRawLine(
53-
sessionId,
54-
JSON.stringify({ method: "test2", params: {} }),
55-
);
40+
logWriter.appendRawLine(sessionId, JSON.stringify({ method: "test" }));
41+
logWriter.appendRawLine(sessionId, JSON.stringify({ method: "test2" }));
5642

57-
expect(mockEmit).toHaveBeenCalledTimes(2);
58-
});
43+
await logWriter.flush(sessionId);
5944

60-
it("wraps raw messages in StoredNotification format", () => {
61-
const sessionId = "test-session";
62-
logWriter.register(sessionId, { taskId: "task-1", runId: sessionId });
63-
64-
const message = {
65-
jsonrpc: "2.0",
66-
method: "session/update",
67-
params: { foo: "bar" },
68-
};
69-
logWriter.appendRawLine(sessionId, JSON.stringify(message));
70-
71-
expect(mockEmit).toHaveBeenCalledTimes(1);
72-
const emitArg = mockEmit.mock.calls[0][0];
73-
expect(emitArg.notification.type).toBe("notification");
74-
expect(emitArg.notification.timestamp).toBeDefined();
75-
expect(emitArg.notification.notification).toEqual(message);
45+
expect(mockAppendLog).toHaveBeenCalledTimes(1);
46+
const entries: StoredNotification[] = mockAppendLog.mock.calls[0][2];
47+
expect(entries).toHaveLength(2);
7648
});
7749

78-
it("ignores unregistered sessions", () => {
79-
logWriter.appendRawLine(
80-
"unknown-session",
81-
JSON.stringify({ method: "test" }),
82-
);
83-
84-
expect(mockEmit).not.toHaveBeenCalled();
50+
it("ignores unregistered sessions", async () => {
51+
logWriter.appendRawLine("unknown", JSON.stringify({ method: "test" }));
52+
await logWriter.flush("unknown");
53+
expect(mockAppendLog).not.toHaveBeenCalled();
8554
});
8655

87-
it("ignores invalid JSON", () => {
88-
const sessionId = "test-session";
89-
logWriter.register(sessionId, { taskId: "task-1", runId: sessionId });
56+
it("ignores invalid JSON", async () => {
57+
const sessionId = "s1";
58+
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });
9059

9160
logWriter.appendRawLine(sessionId, "not valid json {{{");
92-
93-
expect(mockEmit).not.toHaveBeenCalled();
61+
await logWriter.flush(sessionId);
62+
expect(mockAppendLog).not.toHaveBeenCalled();
9463
});
9564
});
9665

97-
describe("flush", () => {
98-
it("calls flush on OtelLogWriter", async () => {
99-
const sessionId = "test-session";
100-
logWriter.register(sessionId, { taskId: "task-1", runId: sessionId });
66+
describe("agent_message_chunk coalescing", () => {
67+
it("coalesces consecutive chunks into a single agent_message", async () => {
68+
const sessionId = "s1";
69+
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });
10170

102-
logWriter.appendRawLine(sessionId, JSON.stringify({ method: "test" }));
103-
await logWriter.flush(sessionId);
71+
logWriter.appendRawLine(
72+
sessionId,
73+
makeSessionUpdate("agent_message_chunk", {
74+
content: { type: "text", text: "Hello " },
75+
}),
76+
);
77+
logWriter.appendRawLine(
78+
sessionId,
79+
makeSessionUpdate("agent_message_chunk", {
80+
content: { type: "text", text: "world" },
81+
}),
82+
);
83+
// Non-chunk event triggers flush of chunks
84+
logWriter.appendRawLine(
85+
sessionId,
86+
makeSessionUpdate("tool_call", { toolCallId: "tc1" }),
87+
);
10488

105-
expect(mockFlush).toHaveBeenCalledTimes(1);
106-
});
89+
await logWriter.flush(sessionId);
10790

108-
it("does nothing for unregistered sessions", async () => {
109-
await logWriter.flush("unknown-session");
91+
const entries: StoredNotification[] = mockAppendLog.mock.calls[0][2];
92+
expect(entries).toHaveLength(2); // coalesced message + tool_call
11093

111-
expect(mockFlush).not.toHaveBeenCalled();
94+
const coalesced = entries[0].notification;
95+
expect(coalesced.params?.update).toEqual({
96+
sessionUpdate: "agent_message",
97+
content: { type: "text", text: "Hello world" },
98+
});
11299
});
113100
});
114101

115102
describe("register", () => {
116-
it("creates OtelLogWriter with session context", () => {
117-
const sessionId = "test-session";
118-
const context = { taskId: "task-1", runId: sessionId };
119-
120-
logWriter.register(sessionId, context);
121-
122-
expect(OtelLogWriter).toHaveBeenCalledWith(
123-
expect.objectContaining({
124-
posthogHost: "http://localhost:8000",
125-
apiKey: "test-api-key",
126-
}),
127-
context,
128-
expect.anything(),
129-
);
130-
});
131-
132103
it("does not re-register existing sessions", () => {
133-
const sessionId = "test-session";
134-
logWriter.register(sessionId, { taskId: "task-1", runId: sessionId });
135-
logWriter.register(sessionId, { taskId: "task-2", runId: sessionId });
104+
const sessionId = "s1";
105+
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });
106+
logWriter.register(sessionId, { taskId: "t2", runId: sessionId });
136107

137-
expect(OtelLogWriter).toHaveBeenCalledTimes(1);
108+
expect(logWriter.isRegistered(sessionId)).toBe(true);
138109
});
139110
});
140111
});

packages/agent/src/session-log-writer.ts

Lines changed: 3 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,10 @@
1-
import {
2-
type OtelLogConfig,
3-
OtelLogWriter,
4-
type SessionContext,
5-
} from "./otel-log-writer.js";
1+
import type { SessionContext } from "./otel-log-writer.js";
62
import type { PostHogAPIClient } from "./posthog-api.js";
73
import type { StoredNotification } from "./types.js";
84
import { Logger } from "./utils/logger.js";
95

106
export interface SessionLogWriterOptions {
11-
/** OTEL config for creating writers per session */
12-
otelConfig?: OtelLogConfig;
13-
/** PostHog API client for S3 log persistence */
7+
/** PostHog API client for log persistence */
148
posthogAPI?: PostHogAPIClient;
159
/** Logger instance */
1610
logger?: Logger;
@@ -23,21 +17,18 @@ interface ChunkBuffer {
2317

2418
interface SessionState {
2519
context: SessionContext;
26-
otelWriter?: OtelLogWriter;
2720
chunkBuffer?: ChunkBuffer;
2821
}
2922

3023
export class SessionLogWriter {
3124
private posthogAPI?: PostHogAPIClient;
32-
private otelConfig?: OtelLogConfig;
3325
private pendingEntries: Map<string, StoredNotification[]> = new Map();
3426
private flushTimeouts: Map<string, NodeJS.Timeout> = new Map();
3527
private sessions: Map<string, SessionState> = new Map();
3628
private logger: Logger;
3729

3830
constructor(options: SessionLogWriterOptions = {}) {
3931
this.posthogAPI = options.posthogAPI;
40-
this.otelConfig = options.otelConfig;
4132
this.logger =
4233
options.logger ??
4334
new Logger({ debug: false, prefix: "[SessionLogWriter]" });
@@ -56,17 +47,7 @@ export class SessionLogWriter {
5647
return;
5748
}
5849

59-
let otelWriter: OtelLogWriter | undefined;
60-
if (this.otelConfig) {
61-
// Create a dedicated OtelLogWriter for this session with resource attributes
62-
otelWriter = new OtelLogWriter(
63-
this.otelConfig,
64-
context,
65-
this.logger.child(`OtelWriter:${sessionId}`),
66-
);
67-
}
68-
69-
this.sessions.set(sessionId, { context, otelWriter });
50+
this.sessions.set(sessionId, { context });
7051
}
7152

7253
isRegistered(sessionId: string): boolean {
@@ -106,10 +87,6 @@ export class SessionLogWriter {
10687
notification: message,
10788
};
10889

109-
if (session.otelWriter) {
110-
session.otelWriter.emit({ notification: entry });
111-
}
112-
11390
if (this.posthogAPI) {
11491
const pending = this.pendingEntries.get(sessionId) ?? [];
11592
pending.push(entry);
@@ -131,10 +108,6 @@ export class SessionLogWriter {
131108
// Emit any buffered chunks before flushing
132109
this.emitCoalescedMessage(sessionId, session);
133110

134-
if (session.otelWriter) {
135-
await session.otelWriter.flush();
136-
}
137-
138111
const pending = this.pendingEntries.get(sessionId);
139112
if (!this.posthogAPI || !pending?.length) return;
140113

@@ -196,10 +169,6 @@ export class SessionLogWriter {
196169
},
197170
};
198171

199-
if (session.otelWriter) {
200-
session.otelWriter.emit({ notification: entry });
201-
}
202-
203172
if (this.posthogAPI) {
204173
const pending = this.pendingEntries.get(sessionId) ?? [];
205174
pending.push(entry);

0 commit comments

Comments
 (0)