Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export function useSessionConnection({
() => {
queryClient.invalidateQueries({ queryKey: ["tasks"] });
},
task.latest_run?.log_url,
);
return cleanup;
}, [
Expand All @@ -91,6 +92,7 @@ export function useSessionConnection({
queryClient,
task.id,
task.latest_run?.id,
task.latest_run?.log_url,
]);

useEffect(() => {
Expand Down
53 changes: 53 additions & 0 deletions apps/code/src/renderer/features/sessions/service/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,59 @@ describe("SessionService", () => {
);
});

it("hydrates a fresh cloud session from persisted logs before replay arrives", async () => {
const service = getSessionService();
const hydratedSession = createMockSession({
taskRunId: "run-123",
taskId: "task-123",
taskTitle: "Cloud Task",
status: "disconnected",
isCloud: true,
events: [],
});

mockSessionStoreSetters.getSessionByTaskId.mockImplementation(() => {
return hydratedSession;
});
mockTrpcLogs.readLocalLogs.query.mockResolvedValue("");
mockTrpcLogs.fetchS3Logs.query.mockResolvedValue(
JSON.stringify({
type: "notification",
timestamp: "2024-01-01T00:00:00Z",
notification: {
method: "session/update",
params: {
update: {
sessionUpdate: "assistant_message",
},
},
},
}),
);
mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined);

service.watchCloudTask(
"task-123",
"run-123",
"https://api.anthropic.com",
123,
undefined,
"https://logs.example.com/run-123",
);

await vi.waitFor(() => {
expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith(
"run-123",
expect.objectContaining({
events: [],
isCloud: true,
logUrl: "https://logs.example.com/run-123",
processedLineCount: 1,
}),
);
});
});

it("ignores stale async starts when the same watcher is replaced", async () => {
const service = getSessionService();
let resolveFirstWatchStart!: () => void;
Expand Down
50 changes: 50 additions & 0 deletions apps/code/src/renderer/features/sessions/service/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1917,6 +1917,7 @@ export class SessionService {
apiHost: string,
teamId: number,
onStatusChange?: () => void,
logUrl?: string,
): () => void {
const taskRunId = runId;
const startToken = ++this.nextCloudTaskWatchToken;
Expand Down Expand Up @@ -1947,6 +1948,11 @@ export class SessionService {
existing?.taskRunId === taskRunId &&
existing.events.length > 0 &&
existing.processedLineCount === undefined;
const shouldHydrateSession =
!existing ||
existing.taskRunId !== taskRunId ||
shouldResetExistingSession ||
existing.events.length === 0;

if (
!existing ||
Expand All @@ -1964,6 +1970,10 @@ export class SessionService {
});
}

if (shouldHydrateSession) {
this.hydrateCloudTaskSessionFromLogs(taskId, taskRunId, logUrl);
}

// Subscribe before starting the main-process watcher so the first replayed
// SSE/log burst cannot race ahead of the renderer subscription.
const subscription = trpcClient.cloudTask.onUpdate.subscribe(
Expand Down Expand Up @@ -2025,6 +2035,46 @@ export class SessionService {
return () => {};
}

private hydrateCloudTaskSessionFromLogs(
taskId: string,
taskRunId: string,
logUrl?: string,
): void {
void (async () => {
const { rawEntries } = await this.fetchSessionLogs(logUrl, taskRunId);
if (rawEntries.length === 0) {
return;
}

const session = sessionStoreSetters.getSessionByTaskId(taskId);
if (!session || session.taskRunId !== taskRunId) {
return;
}

// If live updates already populated a processed count, don't overwrite
// that newer state with the persisted baseline fetched during startup.
if (
session.processedLineCount !== undefined &&
session.processedLineCount > 0
) {
return;
}

sessionStoreSetters.updateSession(taskRunId, {
events: convertStoredEntriesToEvents(rawEntries),
isCloud: true,
logUrl: logUrl ?? session.logUrl,
processedLineCount: rawEntries.length,
});
})().catch((err: unknown) => {
log.warn("Failed to hydrate cloud task session from logs", {
taskId,
taskRunId,
err,
});
});
}

private isCurrentCloudTaskWatcher(
taskId: string,
runId: string,
Expand Down
Loading