Skip to content

Commit c695f35

Browse files
authored
fix: add back s3 writer (#811)
1 parent f328f5f commit c695f35

2 files changed

Lines changed: 56 additions & 13 deletions

File tree

packages/agent/src/agent.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,16 @@ export class Agent {
2727
this.posthogAPI = new PostHogAPIClient(config.posthog);
2828
}
2929

30-
if (config.otelTransport) {
30+
if (config.posthog || config.otelTransport) {
3131
this.sessionLogWriter = new SessionLogWriter({
32-
otelConfig: {
33-
posthogHost: config.otelTransport.host,
34-
apiKey: config.otelTransport.apiKey,
35-
logsPath: config.otelTransport.logsPath,
36-
},
32+
posthogAPI: this.posthogAPI,
33+
otelConfig: config.otelTransport
34+
? {
35+
posthogHost: config.otelTransport.host,
36+
apiKey: config.otelTransport.apiKey,
37+
logsPath: config.otelTransport.logsPath,
38+
}
39+
: undefined,
3740
logger: this.logger.child("SessionLogWriter"),
3841
});
3942
}

packages/agent/src/session-log-writer.ts

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@ import {
33
OtelLogWriter,
44
type SessionContext,
55
} from "./otel-log-writer.js";
6+
import type { PostHogAPIClient } from "./posthog-api.js";
67
import type { StoredNotification } from "./types.js";
78
import { Logger } from "./utils/logger.js";
89

910
export interface SessionLogWriterOptions {
1011
/** OTEL config for creating writers per session */
1112
otelConfig?: OtelLogConfig;
13+
/** PostHog API client for S3 log persistence */
14+
posthogAPI?: PostHogAPIClient;
1215
/** Logger instance */
1316
logger?: Logger;
1417
}
@@ -19,11 +22,15 @@ interface SessionState {
1922
}
2023

2124
export class SessionLogWriter {
25+
private posthogAPI?: PostHogAPIClient;
2226
private otelConfig?: OtelLogConfig;
27+
private pendingEntries: Map<string, StoredNotification[]> = new Map();
28+
private flushTimeouts: Map<string, NodeJS.Timeout> = new Map();
2329
private sessions: Map<string, SessionState> = new Map();
2430
private logger: Logger;
2531

2632
constructor(options: SessionLogWriterOptions = {}) {
33+
this.posthogAPI = options.posthogAPI;
2734
this.otelConfig = options.otelConfig;
2835
this.logger =
2936
options.logger ??
@@ -84,11 +91,6 @@ export class SessionLogWriter {
8491
return;
8592
}
8693

87-
if (!session.otelWriter) {
88-
this.logger.debug("No OTEL writer configured for session, skipping log");
89-
return;
90-
}
91-
9294
try {
9395
const message = JSON.parse(line);
9496
const entry: StoredNotification = {
@@ -97,7 +99,16 @@ export class SessionLogWriter {
9799
notification: message,
98100
};
99101

100-
session.otelWriter.emit({ notification: entry });
102+
if (session.otelWriter) {
103+
session.otelWriter.emit({ notification: entry });
104+
}
105+
106+
if (this.posthogAPI) {
107+
const pending = this.pendingEntries.get(sessionId) ?? [];
108+
pending.push(entry);
109+
this.pendingEntries.set(sessionId, pending);
110+
this.scheduleFlush(sessionId);
111+
}
101112
} catch {
102113
this.logger.warn("Failed to parse raw line for persistence", {
103114
sessionId,
@@ -108,8 +119,37 @@ export class SessionLogWriter {
108119

109120
async flush(sessionId: string): Promise<void> {
110121
const session = this.sessions.get(sessionId);
111-
if (session?.otelWriter) {
122+
if (!session) return;
123+
124+
if (session.otelWriter) {
112125
await session.otelWriter.flush();
113126
}
127+
128+
const pending = this.pendingEntries.get(sessionId);
129+
if (!this.posthogAPI || !pending?.length) return;
130+
131+
this.pendingEntries.delete(sessionId);
132+
const timeout = this.flushTimeouts.get(sessionId);
133+
if (timeout) {
134+
clearTimeout(timeout);
135+
this.flushTimeouts.delete(sessionId);
136+
}
137+
138+
try {
139+
await this.posthogAPI.appendTaskRunLog(
140+
session.context.taskId,
141+
session.context.runId,
142+
pending,
143+
);
144+
} catch (error) {
145+
this.logger.error("Failed to persist session logs:", error);
146+
}
147+
}
148+
149+
private scheduleFlush(sessionId: string): void {
150+
const existing = this.flushTimeouts.get(sessionId);
151+
if (existing) clearTimeout(existing);
152+
const timeout = setTimeout(() => this.flush(sessionId), 500);
153+
this.flushTimeouts.set(sessionId, timeout);
114154
}
115155
}

0 commit comments

Comments
 (0)