Skip to content

Commit 42f38f1

Browse files
authored
feat: cache logs locally (#937)
1 parent 023cdda commit 42f38f1

6 files changed

Lines changed: 208 additions & 52 deletions

File tree

apps/twig/src/main/services/agent/service.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,7 @@ export class AgentService extends TypedEventEmitter<AgentServiceEvents> {
468468
projectId: credentials.projectId,
469469
},
470470
skipLogPersistence: isPreview,
471+
localCachePath: join(app.getPath("home"), ".twig"),
471472
debug: !app.isPackaged,
472473
onLog: onAgentLog,
473474
});

apps/twig/src/main/trpc/routers/logs.ts

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,29 @@
1+
import fs from "node:fs";
2+
import path from "node:path";
3+
import { app } from "electron";
14
import { z } from "zod";
25
import { logger } from "../../lib/logger";
36
import { publicProcedure, router } from "../trpc.js";
47

58
const log = logger.scope("logsRouter");
69

10+
function getLocalLogPath(taskRunId: string): string {
11+
return path.join(
12+
app.getPath("home"),
13+
".twig",
14+
"sessions",
15+
taskRunId,
16+
"logs.ndjson",
17+
);
18+
}
19+
720
export const logsRouter = router({
8-
/**
9-
* Fetch logs from S3 using presigned URL
10-
*/
1121
fetchS3Logs: publicProcedure
1222
.input(z.object({ logUrl: z.string() }))
1323
.query(async ({ input }) => {
1424
try {
1525
const response = await fetch(input.logUrl);
1626

17-
// 404 is expected for new task runs - file doesn't exist yet
1827
if (response.status === 404) {
1928
return null;
2029
}
@@ -34,4 +43,33 @@ export const logsRouter = router({
3443
return null;
3544
}
3645
}),
46+
47+
readLocalLogs: publicProcedure
48+
.input(z.object({ taskRunId: z.string() }))
49+
.query(async ({ input }) => {
50+
const logPath = getLocalLogPath(input.taskRunId);
51+
try {
52+
return await fs.promises.readFile(logPath, "utf-8");
53+
} catch (error) {
54+
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
55+
return null;
56+
}
57+
log.warn("Failed to read local logs:", error);
58+
return null;
59+
}
60+
}),
61+
62+
writeLocalLogs: publicProcedure
63+
.input(z.object({ taskRunId: z.string(), content: z.string() }))
64+
.mutation(async ({ input }) => {
65+
const logPath = getLocalLogPath(input.taskRunId);
66+
const logDir = path.dirname(logPath);
67+
68+
try {
69+
await fs.promises.mkdir(logDir, { recursive: true });
70+
await fs.promises.writeFile(logPath, input.content, "utf-8");
71+
} catch (error) {
72+
log.warn("Failed to write local logs:", error);
73+
}
74+
}),
3775
});

apps/twig/src/renderer/features/sessions/service/service.ts

Lines changed: 112 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -180,10 +180,30 @@ export class SessionService {
180180
}
181181

182182
if (latestRun?.id && latestRun?.log_url) {
183-
// Start workspace verify and log fetch in parallel
183+
if (!getIsOnline()) {
184+
log.info("Skipping connection attempt - offline", { taskId });
185+
const { rawEntries } = await this.fetchSessionLogs(
186+
latestRun.log_url,
187+
latestRun.id,
188+
);
189+
const events = convertStoredEntriesToEvents(rawEntries);
190+
const session = this.createBaseSession(
191+
latestRun.id,
192+
taskId,
193+
taskTitle,
194+
);
195+
session.events = events;
196+
session.logUrl = latestRun.log_url;
197+
session.status = "disconnected";
198+
session.errorMessage =
199+
"No internet connection. Connect when you're back online.";
200+
sessionStoreSetters.setSession(session);
201+
return;
202+
}
203+
184204
const [workspaceResult, logResult] = await Promise.all([
185205
trpcVanilla.workspace.verify.query({ taskId }),
186-
this.fetchSessionLogs(latestRun.log_url),
206+
this.fetchSessionLogs(latestRun.log_url, latestRun.id),
187207
]);
188208

189209
if (!workspaceResult.exists) {
@@ -192,7 +212,6 @@ export class SessionService {
192212
missingPath: workspaceResult.missingPath,
193213
});
194214
const events = convertStoredEntriesToEvents(logResult.rawEntries);
195-
196215
const session = this.createBaseSession(
197216
latestRun.id,
198217
taskId,
@@ -204,21 +223,6 @@ export class SessionService {
204223
session.errorMessage = workspaceResult.missingPath
205224
? `Working directory no longer exists: ${workspaceResult.missingPath}`
206225
: "The working directory for this task no longer exists. Please start a new task.";
207-
208-
sessionStoreSetters.setSession(session);
209-
return;
210-
}
211-
212-
if (!getIsOnline()) {
213-
log.info("Skipping connection attempt - offline", { taskId });
214-
const session = this.createBaseSession(
215-
latestRun.id,
216-
taskId,
217-
taskTitle,
218-
);
219-
session.status = "disconnected";
220-
session.errorMessage =
221-
"No internet connection. Connect when you're back online.";
222226
sessionStoreSetters.setSession(session);
223227
return;
224228
}
@@ -267,7 +271,10 @@ export class SessionService {
267271

268272
if (latestRun?.log_url) {
269273
try {
270-
const { rawEntries } = await this.fetchSessionLogs(latestRun.log_url);
274+
const { rawEntries } = await this.fetchSessionLogs(
275+
latestRun.log_url,
276+
latestRun.id,
277+
);
271278
session.events = convertStoredEntriesToEvents(rawEntries);
272279
session.logUrl = latestRun.log_url;
273280
} catch {
@@ -293,16 +300,14 @@ export class SessionService {
293300
},
294301
): Promise<void> {
295302
const { rawEntries, sessionId, adapter } =
296-
prefetchedLogs ?? (await this.fetchSessionLogs(logUrl));
303+
prefetchedLogs ?? (await this.fetchSessionLogs(logUrl, taskRunId));
297304
const events = convertStoredEntriesToEvents(rawEntries);
298305

299-
// Resolve adapter from logs or persisted store
300306
const storedAdapter = useSessionAdapterStore
301307
.getState()
302308
.getAdapter(taskRunId);
303309
const resolvedAdapter = adapter ?? storedAdapter;
304310

305-
// Get persisted config options for this task run
306311
const persistedConfigOptions = getPersistedConfigOptions(taskRunId);
307312

308313
const session = this.createBaseSession(taskRunId, taskId, taskTitle);
@@ -318,13 +323,34 @@ export class SessionService {
318323

319324
sessionStoreSetters.setSession(session);
320325
this.subscribeToChannel(taskRunId);
326+
sessionStoreSetters.updateSession(taskRunId, { status: "connected" });
321327

322328
try {
323329
const persistedMode = getConfigOptionByCategory(
324330
persistedConfigOptions,
325331
"mode",
326332
)?.currentValue;
327333

334+
trpcVanilla.workspace.verify
335+
.query({ taskId })
336+
.then((workspaceResult) => {
337+
if (!workspaceResult.exists) {
338+
log.warn("Workspace no longer exists", {
339+
taskId,
340+
missingPath: workspaceResult.missingPath,
341+
});
342+
sessionStoreSetters.updateSession(taskRunId, {
343+
status: "error",
344+
errorMessage: workspaceResult.missingPath
345+
? `Working directory no longer exists: ${workspaceResult.missingPath}`
346+
: "The working directory for this task no longer exists. Please start a new task.",
347+
});
348+
}
349+
})
350+
.catch((err) => {
351+
log.warn("Failed to verify workspace", { taskId, err });
352+
});
353+
328354
const { customInstructions } = useSettingsStore.getState();
329355
const result = await trpcVanilla.agent.reconnect.mutate({
330356
taskId,
@@ -1496,45 +1522,83 @@ export class SessionService {
14961522
return { apiKey, apiHost, projectId, client };
14971523
}
14981524

1499-
private async fetchSessionLogs(logUrl: string): Promise<{
1525+
private parseLogContent(content: string): {
1526+
rawEntries: StoredLogEntry[];
1527+
sessionId?: string;
1528+
adapter?: Adapter;
1529+
} {
1530+
const rawEntries: StoredLogEntry[] = [];
1531+
let sessionId: string | undefined;
1532+
let adapter: Adapter | undefined;
1533+
1534+
for (const line of content.trim().split("\n")) {
1535+
try {
1536+
const stored = JSON.parse(line) as StoredLogEntry;
1537+
rawEntries.push(stored);
1538+
1539+
if (
1540+
stored.type === "notification" &&
1541+
stored.notification?.method?.endsWith("posthog/sdk_session")
1542+
) {
1543+
const params = stored.notification.params as {
1544+
sessionId?: string;
1545+
sdkSessionId?: string;
1546+
adapter?: Adapter;
1547+
};
1548+
if (params?.sessionId) sessionId = params.sessionId;
1549+
else if (params?.sdkSessionId) sessionId = params.sdkSessionId;
1550+
if (params?.adapter) adapter = params.adapter;
1551+
}
1552+
} catch {
1553+
log.warn("Failed to parse log entry", { line });
1554+
}
1555+
}
1556+
1557+
return { rawEntries, sessionId, adapter };
1558+
}
1559+
1560+
private async fetchSessionLogs(
1561+
logUrl: string,
1562+
taskRunId?: string,
1563+
): Promise<{
15001564
rawEntries: StoredLogEntry[];
15011565
sessionId?: string;
15021566
adapter?: Adapter;
15031567
}> {
1568+
if (!logUrl && !taskRunId) return { rawEntries: [] };
1569+
1570+
if (taskRunId) {
1571+
try {
1572+
const localContent = await trpcVanilla.logs.readLocalLogs.query({
1573+
taskRunId,
1574+
});
1575+
if (localContent?.trim()) {
1576+
return this.parseLogContent(localContent);
1577+
}
1578+
} catch {
1579+
log.warn("Failed to read local logs, falling back to S3", {
1580+
taskRunId,
1581+
});
1582+
}
1583+
}
1584+
15041585
if (!logUrl) return { rawEntries: [] };
15051586

15061587
try {
15071588
const content = await trpcVanilla.logs.fetchS3Logs.query({ logUrl });
15081589
if (!content?.trim()) return { rawEntries: [] };
15091590

1510-
const rawEntries: StoredLogEntry[] = [];
1511-
let sessionId: string | undefined;
1512-
let adapter: Adapter | undefined;
1513-
1514-
for (const line of content.trim().split("\n")) {
1515-
try {
1516-
const stored = JSON.parse(line) as StoredLogEntry;
1517-
rawEntries.push(stored);
1591+
const result = this.parseLogContent(content);
15181592

1519-
if (
1520-
stored.type === "notification" &&
1521-
stored.notification?.method?.endsWith("posthog/sdk_session")
1522-
) {
1523-
const params = stored.notification.params as {
1524-
sessionId?: string;
1525-
sdkSessionId?: string;
1526-
adapter?: Adapter;
1527-
};
1528-
if (params?.sessionId) sessionId = params.sessionId;
1529-
else if (params?.sdkSessionId) sessionId = params.sdkSessionId;
1530-
if (params?.adapter) adapter = params.adapter;
1531-
}
1532-
} catch {
1533-
log.warn("Failed to parse log entry", { line });
1534-
}
1593+
if (taskRunId && result.rawEntries.length > 0) {
1594+
trpcVanilla.logs.writeLocalLogs
1595+
.mutate({ taskRunId, content })
1596+
.catch((err) => {
1597+
log.warn("Failed to cache S3 logs locally", { taskRunId, err });
1598+
});
15351599
}
15361600

1537-
return { rawEntries, sessionId, adapter };
1601+
return result;
15381602
} catch {
15391603
return { rawEntries: [] };
15401604
}

packages/agent/src/agent.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ export class Agent {
3636
this.sessionLogWriter = new SessionLogWriter({
3737
posthogAPI: this.posthogAPI,
3838
logger: this.logger.child("SessionLogWriter"),
39+
localCachePath: config.localCachePath,
3940
});
4041
}
4142
}

0 commit comments

Comments
 (0)