Skip to content

Commit b6e5b93

Browse files
authored
feat(cloud): ability to send messages and cancel prompts for cloud agents (#981)
Enables interactive command sending and cancellation for cloud runs, leveraging the new PostHog API command endpoint to provide the same functionality as local agents. Previously, cloud runs in Twig were largely read-only, limiting user interaction. This PR integrates Twig's frontend and backend with the new PostHog API `/command` endpoint (introduced in #48688), allowing users to send commands (like `user_message` and `cancel`) to active cloud agents, thereby removing the read-only restriction for interactive sessions. --- [Slack Thread](https://posthog.slack.com/archives/D09AS56P2FJ/p1771835444684379?thread_ts=1771835444.684379&cid=D09AS56P2FJ) <p><a href="https://cursor.com/agents?id=bc-e9591cdf-a7e7-4101-b483-8f817e2e9b68"><picture><source media="(prefers-color-scheme: dark)" srcset="https://cursor.com/assets/images/open-in-web-dark.png"><source media="(prefers-color-scheme: light)" srcset="https://cursor.com/assets/images/open-in-web-light.png"><img alt="Open in Web" width="114" height="28" src="https://cursor.com/assets/images/open-in-web-dark.png"></picture></a>&nbsp;<a href="https://cursor.com/background-agent?bcId=bc-e9591cdf-a7e7-4101-b483-8f817e2e9b68"><picture><source media="(prefers-color-scheme: dark)" srcset="https://cursor.com/assets/images/open-in-cursor-dark.png"><source media="(prefers-color-scheme: light)" srcset="https://cursor.com/assets/images/open-in-cursor-light.png"><img alt="Open in Cursor" width="131" height="28" src="https://cursor.com/assets/images/open-in-cursor-dark.png"></picture></a>&nbsp;</p>
1 parent 1816895 commit b6e5b93

10 files changed

Lines changed: 495 additions & 79 deletions

File tree

apps/twig/src/main/services/cloud-task/schemas.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,28 @@ export const onUpdateInput = z.object({
4343
taskId: z.string(),
4444
runId: z.string(),
4545
});
46+
47+
export const setViewingInput = z.object({
48+
taskId: z.string(),
49+
runId: z.string(),
50+
viewing: z.boolean(),
51+
});
52+
53+
export const sendCommandInput = z.object({
54+
taskId: z.string(),
55+
runId: z.string(),
56+
apiHost: z.string(),
57+
teamId: z.number(),
58+
method: z.enum(["user_message", "cancel", "close"]),
59+
params: z.record(z.string(), z.unknown()).optional(),
60+
});
61+
62+
export type SendCommandInput = z.infer<typeof sendCommandInput>;
63+
64+
export const sendCommandOutput = z.object({
65+
success: z.boolean(),
66+
result: z.unknown().optional(),
67+
error: z.string().optional(),
68+
});
69+
70+
export type SendCommandOutput = z.infer<typeof sendCommandOutput>;

apps/twig/src/main/services/cloud-task/service.ts

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import { TypedEventEmitter } from "../../lib/typed-event-emitter.js";
66
import {
77
CloudTaskEvent,
88
type CloudTaskEvents,
9+
type SendCommandInput,
10+
type SendCommandOutput,
911
type TaskRunStatus,
1012
TERMINAL_STATUSES,
1113
type WatchInput,
@@ -14,7 +16,9 @@ import {
1416
const log = logger.scope("cloud-task");
1517

1618
const LOG_POLL_INTERVAL_MS = 5_000;
19+
const LOG_POLL_INTERVAL_FAST_MS = 1_000;
1720
const STATUS_POLL_INTERVAL_MS = 10_000;
21+
const STATUS_POLL_INTERVAL_FAST_MS = 3_000;
1822

1923
interface TaskRunResponse {
2024
id: string;
@@ -41,6 +45,7 @@ interface WatcherState {
4145
lastBranch: string | null;
4246
lastStatusPollTime: number;
4347
subscriberCount: number;
48+
viewing: boolean;
4449
}
4550

4651
interface PendingWatchState {
@@ -128,6 +133,106 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
128133
}
129134
}
130135

136+
setViewing(taskId: string, runId: string, viewing: boolean): void {
137+
const key = watcherKey(taskId, runId);
138+
const watcher = this.watchers.get(key);
139+
if (!watcher) return;
140+
141+
if (watcher.viewing === viewing) return;
142+
watcher.viewing = viewing;
143+
144+
if (viewing && watcher.pollTimeoutId) {
145+
clearTimeout(watcher.pollTimeoutId);
146+
watcher.pollTimeoutId = null;
147+
this.schedulePoll(key);
148+
}
149+
150+
log.info("Cloud task watcher viewing changed", { key, viewing });
151+
}
152+
153+
async sendCommand(input: SendCommandInput): Promise<SendCommandOutput> {
154+
if (!this.apiKey) {
155+
return { success: false, error: "No API token available" };
156+
}
157+
158+
const url = `${input.apiHost}/api/projects/${input.teamId}/tasks/${input.taskId}/runs/${input.runId}/command/`;
159+
const body = {
160+
jsonrpc: "2.0",
161+
method: input.method,
162+
params: input.params ?? {},
163+
id: `twig-${Date.now()}`,
164+
};
165+
166+
try {
167+
const response = await net.fetch(url, {
168+
method: "POST",
169+
headers: {
170+
Authorization: `Bearer ${this.apiKey}`,
171+
"Content-Type": "application/json",
172+
},
173+
body: JSON.stringify(body),
174+
});
175+
176+
if (!response.ok) {
177+
const errorText = await response.text().catch(() => "");
178+
let errorMessage = `Command failed with status ${response.status}`;
179+
try {
180+
const errorJson = JSON.parse(errorText);
181+
if (errorJson.error?.message) {
182+
errorMessage = errorJson.error.message;
183+
} else if (errorJson.error) {
184+
errorMessage =
185+
typeof errorJson.error === "string"
186+
? errorJson.error
187+
: JSON.stringify(errorJson.error);
188+
}
189+
} catch {
190+
if (errorText) errorMessage = errorText;
191+
}
192+
193+
log.warn("Cloud task command failed", {
194+
taskId: input.taskId,
195+
runId: input.runId,
196+
method: input.method,
197+
status: response.status,
198+
error: errorMessage,
199+
});
200+
return { success: false, error: errorMessage };
201+
}
202+
203+
const data = await response.json();
204+
205+
if (data.error) {
206+
log.warn("Cloud task command returned error", {
207+
taskId: input.taskId,
208+
method: input.method,
209+
error: data.error,
210+
});
211+
return {
212+
success: false,
213+
error: data.error.message ?? JSON.stringify(data.error),
214+
};
215+
}
216+
217+
log.info("Cloud task command sent", {
218+
taskId: input.taskId,
219+
runId: input.runId,
220+
method: input.method,
221+
});
222+
223+
return { success: true, result: data.result };
224+
} catch (error) {
225+
const errorMessage =
226+
error instanceof Error ? error.message : "Unknown error";
227+
log.error("Cloud task command error", {
228+
taskId: input.taskId,
229+
method: input.method,
230+
error: errorMessage,
231+
});
232+
return { success: false, error: errorMessage };
233+
}
234+
}
235+
131236
@preDestroy()
132237
unwatchAll(): void {
133238
for (const key of [...this.watchers.keys()]) {
@@ -157,6 +262,7 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
157262
lastBranch: null,
158263
lastStatusPollTime: 0,
159264
subscriberCount,
265+
viewing: false,
160266
};
161267

162268
this.watchers.set(key, watcher);
@@ -183,10 +289,14 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
183289
const watcher = this.watchers.get(key);
184290
if (!watcher) return;
185291

292+
const interval = watcher.viewing
293+
? LOG_POLL_INTERVAL_FAST_MS
294+
: LOG_POLL_INTERVAL_MS;
295+
186296
watcher.pollTimeoutId = setTimeout(() => {
187297
watcher.pollTimeoutId = null;
188298
this.poll(key, false);
189-
}, LOG_POLL_INTERVAL_MS);
299+
}, interval);
190300
}
191301

192302
private async poll(key: string, isSnapshot: boolean): Promise<void> {
@@ -199,9 +309,11 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
199309

200310
// Fetch status if snapshot or interval elapsed
201311
const now = Date.now();
312+
const statusInterval = watcher.viewing
313+
? STATUS_POLL_INTERVAL_FAST_MS
314+
: STATUS_POLL_INTERVAL_MS;
202315
const shouldFetchStatus =
203-
isSnapshot ||
204-
now - watcher.lastStatusPollTime >= STATUS_POLL_INTERVAL_MS;
316+
isSnapshot || now - watcher.lastStatusPollTime >= statusInterval;
205317

206318
let statusResult: TaskRunResponse | null = null;
207319
let statusChanged = false;

apps/twig/src/main/trpc/routers/cloud-task.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ import { MAIN_TOKENS } from "../../di/tokens.js";
33
import {
44
CloudTaskEvent,
55
onUpdateInput,
6+
sendCommandInput,
7+
sendCommandOutput,
8+
setViewingInput,
69
unwatchInput,
710
updateTokenInput,
811
watchInput,
@@ -26,6 +29,17 @@ export const cloudTaskRouter = router({
2629
.input(updateTokenInput)
2730
.mutation(({ input }) => getService().updateToken(input.token)),
2831

32+
setViewing: publicProcedure
33+
.input(setViewingInput)
34+
.mutation(({ input }) =>
35+
getService().setViewing(input.taskId, input.runId, input.viewing),
36+
),
37+
38+
sendCommand: publicProcedure
39+
.input(sendCommandInput)
40+
.output(sendCommandOutput)
41+
.mutation(({ input }) => getService().sendCommand(input)),
42+
2943
onUpdate: publicProcedure
3044
.input(onUpdateInput)
3145
.subscription(async function* (opts) {

apps/twig/src/renderer/api/posthogClient.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,63 @@ export class PostHogAPIClient {
276276
});
277277
}
278278

279+
async sendRunCommand(
280+
taskId: string,
281+
runId: string,
282+
method: "user_message" | "cancel" | "close",
283+
params?: Record<string, unknown>,
284+
): Promise<{ success: boolean; result?: unknown; error?: string }> {
285+
const teamId = await this.getTeamId();
286+
const url = new URL(
287+
`${this.api.baseUrl}/api/projects/${teamId}/tasks/${taskId}/runs/${runId}/command/`,
288+
);
289+
const body = {
290+
jsonrpc: "2.0",
291+
method,
292+
params: params ?? {},
293+
id: `twig-${Date.now()}`,
294+
};
295+
296+
try {
297+
const response = await this.api.fetcher.fetch({
298+
method: "post",
299+
url,
300+
path: `/api/projects/${teamId}/tasks/${taskId}/runs/${runId}/command/`,
301+
overrides: {
302+
body: JSON.stringify(body),
303+
},
304+
});
305+
306+
if (!response.ok) {
307+
const errorText = await response.text().catch(() => "");
308+
let errorMessage = `Command failed: ${response.statusText}`;
309+
try {
310+
const errorJson = JSON.parse(errorText);
311+
errorMessage =
312+
errorJson.error?.message ?? errorJson.error ?? errorMessage;
313+
} catch {
314+
if (errorText) errorMessage = errorText;
315+
}
316+
return { success: false, error: errorMessage };
317+
}
318+
319+
const data = await response.json();
320+
if (data.error) {
321+
return {
322+
success: false,
323+
error: data.error.message ?? JSON.stringify(data.error),
324+
};
325+
}
326+
327+
return { success: true, result: data.result };
328+
} catch (error) {
329+
return {
330+
success: false,
331+
error: error instanceof Error ? error.message : "Unknown error",
332+
};
333+
}
334+
}
335+
279336
async runTaskInCloud(taskId: string): Promise<Task> {
280337
const teamId = await this.getTeamId();
281338

apps/twig/src/renderer/features/sessions/components/buildConversationItems.ts

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -123,14 +123,7 @@ export function buildConversationItems(
123123
if (!isPromptPending) {
124124
for (const turn of b.pendingPrompts.values()) {
125125
turn.isComplete = true;
126-
turn.stopReason = "cancelled";
127-
turn.context.turnCancelled = true;
128126
turn.context.turnComplete = true;
129-
b.items.push({
130-
type: "turn_cancelled",
131-
id: `${turn.id}-cancelled`,
132-
interruptReason: turn.interruptReason,
133-
});
134127
}
135128
}
136129

0 commit comments

Comments
 (0)