Skip to content

Commit 801e924

Browse files
committed
interactive cloud runs
1 parent 14ef0c8 commit 801e924

6 files changed

Lines changed: 90 additions & 25 deletions

File tree

apps/twig/src/main/services/cloud-task/schemas.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ export const onUpdateInput = z.object({
4444
runId: z.string(),
4545
});
4646

47+
export const setViewingInput = z.object({
48+
taskId: z.string(),
49+
runId: z.string(),
50+
viewing: z.boolean(),
51+
});
52+
4753
export const sendCommandInput = z.object({
4854
taskId: z.string(),
4955
runId: z.string(),

apps/twig/src/main/services/cloud-task/service.ts

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ import {
1616
const log = logger.scope("cloud-task");
1717

1818
const LOG_POLL_INTERVAL_MS = 5_000;
19+
const LOG_POLL_INTERVAL_FAST_MS = 1_000;
1920
const STATUS_POLL_INTERVAL_MS = 10_000;
21+
const STATUS_POLL_INTERVAL_FAST_MS = 3_000;
2022

2123
interface TaskRunResponse {
2224
id: string;
@@ -43,6 +45,7 @@ interface WatcherState {
4345
lastBranch: string | null;
4446
lastStatusPollTime: number;
4547
subscriberCount: number;
48+
viewing: boolean;
4649
}
4750

4851
interface PendingWatchState {
@@ -130,6 +133,23 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
130133
}
131134
}
132135

136+
setViewing(taskId: string, runId: string, viewing: boolean): void {
137+
const key = watcherKey(taskId, runId);
138+
const watcher = this.watchers.get(key);
139+
if (!watcher) return;
140+
141+
if (watcher.viewing === viewing) return;
142+
watcher.viewing = viewing;
143+
144+
if (viewing && watcher.pollTimeoutId) {
145+
clearTimeout(watcher.pollTimeoutId);
146+
watcher.pollTimeoutId = null;
147+
this.schedulePoll(key);
148+
}
149+
150+
log.info("Cloud task watcher viewing changed", { key, viewing });
151+
}
152+
133153
async sendCommand(input: SendCommandInput): Promise<SendCommandOutput> {
134154
if (!this.apiKey) {
135155
return { success: false, error: "No API token available" };
@@ -242,6 +262,7 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
242262
lastBranch: null,
243263
lastStatusPollTime: 0,
244264
subscriberCount,
265+
viewing: false,
245266
};
246267

247268
this.watchers.set(key, watcher);
@@ -268,10 +289,14 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
268289
const watcher = this.watchers.get(key);
269290
if (!watcher) return;
270291

292+
const interval = watcher.viewing
293+
? LOG_POLL_INTERVAL_FAST_MS
294+
: LOG_POLL_INTERVAL_MS;
295+
271296
watcher.pollTimeoutId = setTimeout(() => {
272297
watcher.pollTimeoutId = null;
273298
this.poll(key, false);
274-
}, LOG_POLL_INTERVAL_MS);
299+
}, interval);
275300
}
276301

277302
private async poll(key: string, isSnapshot: boolean): Promise<void> {
@@ -284,9 +309,11 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
284309

285310
// Fetch status if snapshot or interval elapsed
286311
const now = Date.now();
312+
const statusInterval = watcher.viewing
313+
? STATUS_POLL_INTERVAL_FAST_MS
314+
: STATUS_POLL_INTERVAL_MS;
287315
const shouldFetchStatus =
288-
isSnapshot ||
289-
now - watcher.lastStatusPollTime >= STATUS_POLL_INTERVAL_MS;
316+
isSnapshot || now - watcher.lastStatusPollTime >= statusInterval;
290317

291318
let statusResult: TaskRunResponse | null = null;
292319
let statusChanged = false;

apps/twig/src/main/trpc/routers/cloud-task.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
onUpdateInput,
66
sendCommandInput,
77
sendCommandOutput,
8+
setViewingInput,
89
unwatchInput,
910
updateTokenInput,
1011
watchInput,
@@ -28,6 +29,12 @@ export const cloudTaskRouter = router({
2829
.input(updateTokenInput)
2930
.mutation(({ input }) => getService().updateToken(input.token)),
3031

32+
setViewing: publicProcedure
33+
.input(setViewingInput)
34+
.mutation(({ input }) =>
35+
getService().setViewing(input.taskId, input.runId, input.viewing),
36+
),
37+
3138
sendCommand: publicProcedure
3239
.input(sendCommandInput)
3340
.output(sendCommandOutput)

apps/twig/src/renderer/features/sessions/components/buildConversationItems.ts

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -123,14 +123,7 @@ export function buildConversationItems(
123123
if (!isPromptPending) {
124124
for (const turn of b.pendingPrompts.values()) {
125125
turn.isComplete = true;
126-
turn.stopReason = "cancelled";
127-
turn.context.turnCancelled = true;
128126
turn.context.turnComplete = true;
129-
b.items.push({
130-
type: "turn_cancelled",
131-
id: `${turn.id}-cancelled`,
132-
interruptReason: turn.interruptReason,
133-
});
134127
}
135128
}
136129

apps/twig/src/renderer/features/sessions/service/service.ts

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -761,34 +761,49 @@ export class SessionService {
761761
this.previewAbort = null;
762762
}
763763

764+
private updatePromptStateFromEvents(
765+
taskRunId: string,
766+
events: AcpMessage[],
767+
): void {
768+
for (const acpMsg of events) {
769+
const msg = acpMsg.message;
770+
if (isJsonRpcRequest(msg) && msg.method === "session/prompt") {
771+
sessionStoreSetters.updateSession(taskRunId, {
772+
isPromptPending: true,
773+
promptStartedAt: acpMsg.ts,
774+
});
775+
}
776+
if (
777+
"id" in msg &&
778+
"result" in msg &&
779+
typeof msg.result === "object" &&
780+
msg.result !== null &&
781+
"stopReason" in msg.result
782+
) {
783+
sessionStoreSetters.updateSession(taskRunId, {
784+
isPromptPending: false,
785+
promptStartedAt: null,
786+
});
787+
}
788+
}
789+
}
790+
764791
private handleSessionEvent(taskRunId: string, acpMsg: AcpMessage): void {
765792
const session = sessionStoreSetters.getSessions()[taskRunId];
766793
if (!session) return;
767794

768795
sessionStoreSetters.appendEvents(taskRunId, [acpMsg]);
796+
this.updatePromptStateFromEvents(taskRunId, [acpMsg]);
769797

770798
const msg = acpMsg.message;
771799

772-
if (isJsonRpcRequest(msg) && msg.method === "session/prompt") {
773-
// acpMsg.ts is local time (set in-process) — matches Date.now() used in sendLocalPrompt
774-
sessionStoreSetters.updateSession(taskRunId, {
775-
isPromptPending: true,
776-
promptStartedAt: acpMsg.ts,
777-
});
778-
}
779-
780800
if (
781801
"id" in msg &&
782802
"result" in msg &&
783803
typeof msg.result === "object" &&
784804
msg.result !== null &&
785805
"stopReason" in msg.result
786806
) {
787-
sessionStoreSetters.updateSession(taskRunId, {
788-
isPromptPending: false,
789-
promptStartedAt: null,
790-
});
791-
792807
const stopReason = (msg.result as { stopReason?: string }).stopReason;
793808
if (stopReason) {
794809
notifyPromptComplete(session.taskTitle, stopReason);
@@ -1068,6 +1083,11 @@ export class SessionService {
10681083
const session = sessionStoreSetters.getSessionByTaskId(taskId);
10691084
if (!session) return false;
10701085

1086+
sessionStoreSetters.updateSession(session.taskRunId, {
1087+
isPromptPending: false,
1088+
promptStartedAt: null,
1089+
});
1090+
10711091
if (session.isCloud) {
10721092
return this.cancelCloudPrompt(session);
10731093
}
@@ -1704,6 +1724,7 @@ export class SessionService {
17041724
const entriesToAppend = update.newEntries.slice(-delta);
17051725
const newEvents = convertStoredEntriesToEvents(entriesToAppend);
17061726
sessionStoreSetters.appendEvents(taskRunId, newEvents, expectedCount);
1727+
this.updatePromptStateFromEvents(taskRunId, newEvents);
17071728
} else {
17081729
// Gap in data — append everything we have but don't jump processedLineCount
17091730
log.warn("Cloud task log count inconsistency", {
@@ -1718,6 +1739,7 @@ export class SessionService {
17181739
newEvents,
17191740
currentCount + update.newEntries.length,
17201741
);
1742+
this.updatePromptStateFromEvents(taskRunId, newEvents);
17211743
}
17221744
}
17231745
// Update cloud status fields if present

apps/twig/src/renderer/features/task-detail/components/TaskLogsPanel.tsx

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,23 @@ export function TaskLogsPanel({ taskId, task }: TaskLogsPanelProps) {
106106
// Cloud task watching — logs + status via main-process CloudTaskService subscription
107107
useEffect(() => {
108108
if (!isCloud || !task.latest_run?.id) return;
109-
return getSessionService().watchCloudTask(
109+
const runId = task.latest_run.id;
110+
trpcVanilla.cloudTask.setViewing
111+
.mutate({ taskId: task.id, runId, viewing: true })
112+
.catch(() => {});
113+
const cleanup = getSessionService().watchCloudTask(
110114
task.id,
111-
task.latest_run.id,
115+
runId,
112116
() => {
113117
queryClient.invalidateQueries({ queryKey: ["tasks"] });
114118
},
115119
);
120+
return () => {
121+
trpcVanilla.cloudTask.setViewing
122+
.mutate({ taskId: task.id, runId, viewing: false })
123+
.catch(() => {});
124+
cleanup?.();
125+
};
116126
}, [isCloud, task.id, task.latest_run?.id, queryClient]);
117127

118128
// Local session connection

0 commit comments

Comments
 (0)