Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions packages/trigger-sdk/src/v3/chat.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,40 @@ describe("TriggerChatTransport", () => {
expect(subscribe!).toContain("/realtime/v1/sessions/chat-by-chatid/out");
});

it("routes .out SSE through streamBaseURL while appends stay on baseURL", async () => {
const requests: string[] = [];
global.fetch = vi.fn().mockImplementation(async (url: string | URL) => {
const urlStr = typeof url === "string" ? url : url.toString();
requests.push(urlStr);
if (isSessionStreamAppendUrl(urlStr)) return defaultAppendResponse();
if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse();
throw new Error(`Unexpected URL: ${urlStr}`);
});

const transport = new TriggerChatTransport({
task: "my-chat-task",
accessToken: () => "pat",
baseURL: "https://api.test.trigger.dev",
streamBaseURL: "https://chat-proxy.example.com",
sessions: { "chat-split": { publicAccessToken: "p" } },
});

const stream = await transport.sendMessages({
trigger: "submit-message",
chatId: "chat-split",
messageId: undefined,
messages: [createUserMessage("Hi")],
abortSignal: undefined,
});
await drainChunks(stream);

const append = requests.find(isSessionStreamAppendUrl);
const subscribe = requests.find(isSessionOutSubscribeUrl);
expect(append!.startsWith("https://api.test.trigger.dev/")).toBe(true);
expect(subscribe!.startsWith("https://chat-proxy.example.com/")).toBe(true);
expect(subscribe!).toContain("/realtime/v1/sessions/chat-split/out");
});

it("for submit-message, only the latest message is delivered to .in", async () => {
// Slim wire: each `.in/append` carries at most ONE new message in
// `payload.message` (singular). Even if the caller hands sendMessages
Expand Down
13 changes: 12 additions & 1 deletion packages/trigger-sdk/src/v3/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,15 @@ export type TriggerChatTransportOptions<TClientData = unknown> = {
/** Base URL for the Trigger.dev API. @default "https://api.trigger.dev" */
baseURL?: string;

/**
* Base URL for the SSE stream subscription only (`GET .../sessions/{chatId}/out`).
* Falls back to `baseURL` when unset. Set this to route the long-lived
* stream through a custom proxy (e.g. a Cloudflare worker capturing JA4
* fingerprints for bot detection) while keeping append POSTs direct to
* `baseURL` to avoid an extra hop on every user message.
*/
streamBaseURL?: string;

/** Additional headers included in every API request. */
headers?: Record<string, string>;

Expand Down Expand Up @@ -346,6 +355,7 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
| ((params: StartSessionParams<Record<string, unknown>>) => Promise<StartSessionResult>)
| undefined;
private readonly baseURL: string;
private readonly streamBaseURL: string;
private readonly extraHeaders: Record<string, string>;
private readonly streamTimeoutSeconds: number;
private defaultMetadata: Record<string, unknown> | undefined;
Expand All @@ -367,6 +377,7 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
| ((params: StartSessionParams<Record<string, unknown>>) => Promise<StartSessionResult>)
| undefined;
this.baseURL = options.baseURL ?? DEFAULT_BASE_URL;
this.streamBaseURL = options.streamBaseURL ?? this.baseURL;
this.extraHeaders = options.headers ?? {};
this.streamTimeoutSeconds = options.streamTimeoutSeconds ?? DEFAULT_STREAM_TIMEOUT_SECONDS;
this.defaultMetadata = options.clientData;
Expand Down Expand Up @@ -1021,7 +1032,7 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
);
}

const streamUrl = `${this.baseURL}/realtime/v1/sessions/${encodeURIComponent(chatId)}/out`;
const streamUrl = `${this.streamBaseURL}/realtime/v1/sessions/${encodeURIComponent(chatId)}/out`;

return new ReadableStream<UIMessageChunk>({
start: async (controller) => {
Expand Down