Skip to content

Commit dcba4e9

Browse files
committed
refactor: unify stream next proxies
1 parent 2462ff9 commit dcba4e9

12 files changed

Lines changed: 387 additions & 156 deletions

File tree

web/src/app/api/_utils/backend-proxy.test.ts

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ vi.mock("./auth-headers", () => ({
1010
withBackendAuth: withBackendAuthMock,
1111
}))
1212

13-
import { apiBaseUrl, proxyJson, proxyText } from "./backend-proxy"
13+
import { apiBaseUrl, proxyJson, proxyStream, proxyText } from "./backend-proxy"
1414

1515
describe("apiBaseUrl", () => {
1616
it("uses the shared backend base URL helper", () => {
@@ -172,4 +172,84 @@ describe("backend proxy helpers", () => {
172172
)
173173
expect(res.status).toBe(200)
174174
})
175+
176+
it("passes through SSE responses without buffering", async () => {
177+
withBackendAuthMock.mockResolvedValue({
178+
"Content-Type": "application/json",
179+
authorization: "Bearer stream-token",
180+
})
181+
182+
const fetchMock = vi.fn(async () =>
183+
new Response("data: ping\n\n", {
184+
status: 200,
185+
headers: { "Content-Type": "text/event-stream" },
186+
}),
187+
)
188+
global.fetch = fetchMock as typeof fetch
189+
190+
const req = new Request("https://localhost/api/studio/chat", {
191+
method: "POST",
192+
headers: { "Content-Type": "application/json" },
193+
body: JSON.stringify({ prompt: "hello" }),
194+
})
195+
const res = await proxyStream(req, "https://backend/api/studio/chat", "POST", {
196+
auth: true,
197+
responseContentType: "text/event-stream",
198+
})
199+
200+
expect(withBackendAuthMock).toHaveBeenCalledTimes(1)
201+
const calls = fetchMock.mock.calls as unknown[][]
202+
const init = calls[0]?.[1] as
203+
| (RequestInit & { dispatcher?: unknown })
204+
| undefined
205+
expect(init).toBeDefined()
206+
expect(init).toMatchObject({
207+
method: "POST",
208+
headers: {
209+
"Content-Type": "application/json",
210+
authorization: "Bearer stream-token",
211+
},
212+
body: JSON.stringify({ prompt: "hello" }),
213+
})
214+
expect(init?.signal).toBeUndefined()
215+
expect(init?.dispatcher).toBeDefined()
216+
expect(res.headers.get("content-type")).toContain("text/event-stream")
217+
expect(await res.text()).toContain("data: ping")
218+
})
219+
220+
it("returns JSON when a stream route falls back to a non-stream response", async () => {
221+
withBackendAuthMock.mockResolvedValue({
222+
Accept: "text/event-stream, application/json",
223+
"Content-Type": "application/json",
224+
authorization: "Bearer stream-token",
225+
})
226+
227+
const fetchMock = vi.fn(async () =>
228+
new Response(JSON.stringify({ done: true }), {
229+
status: 200,
230+
headers: { "Content-Type": "application/json" },
231+
}),
232+
)
233+
global.fetch = fetchMock as typeof fetch
234+
235+
const req = new Request("https://localhost/api/research/paperscool/daily", {
236+
method: "POST",
237+
headers: { "Content-Type": "application/json" },
238+
body: JSON.stringify({ query: "llm" }),
239+
})
240+
const res = await proxyStream(
241+
req,
242+
"https://backend/api/research/paperscool/daily",
243+
"POST",
244+
{
245+
accept: "text/event-stream, application/json",
246+
auth: true,
247+
passthroughNonStreamResponse: true,
248+
responseContentType: "text/event-stream",
249+
},
250+
)
251+
252+
expect(res.headers.get("content-type")).toContain("application/json")
253+
await expect(res.json()).resolves.toEqual({ done: true })
254+
})
175255
})

web/src/app/api/_utils/backend-proxy.ts

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { Agent } from "undici"
2+
13
import { backendBaseUrl, withBackendAuth } from "./auth-headers"
24

35
export type ProxyMethod = "DELETE" | "GET" | "HEAD" | "PATCH" | "POST" | "PUT"
@@ -22,7 +24,17 @@ type TextProxyOptions = ProxyOptions & {
2224
responseHeaders?: HeadersInit
2325
}
2426

27+
type StreamProxyOptions = ProxyOptions & {
28+
dispatcher?: Agent
29+
passthroughNonStreamResponse?: boolean
30+
responseContentType?: string
31+
}
32+
2533
const DEFAULT_TIMEOUT_MS = 120_000
34+
const SSE_DISPATCHER = new Agent({
35+
bodyTimeout: 0,
36+
headersTimeout: 0,
37+
})
2638

2739
export function apiBaseUrl(): string {
2840
return backendBaseUrl()
@@ -65,11 +77,58 @@ export async function proxyText(
6577
}
6678
}
6779

80+
export async function proxyStream(
81+
req: Request,
82+
upstreamUrl: string,
83+
method: ProxyMethod,
84+
options: StreamProxyOptions = {},
85+
): Promise<Response> {
86+
const requestOptions = {
87+
responseContentType: "text/event-stream",
88+
timeoutMs: 0,
89+
...options,
90+
}
91+
92+
try {
93+
const upstream = await fetchUpstream(req, upstreamUrl, method, requestOptions, {
94+
dispatcher: requestOptions.dispatcher ?? SSE_DISPATCHER,
95+
})
96+
const upstreamContentType = upstream.headers.get("content-type") || ""
97+
98+
if (
99+
requestOptions.passthroughNonStreamResponse &&
100+
!upstreamContentType.includes("text/event-stream")
101+
) {
102+
const text = await upstream.text()
103+
return buildTextResponse(text, upstream, {
104+
responseContentType: requestOptions.responseContentType ?? "application/json",
105+
responseHeaders: undefined,
106+
})
107+
}
108+
109+
const headers = new Headers()
110+
headers.set(
111+
"Content-Type",
112+
upstreamContentType || requestOptions.responseContentType || "text/event-stream",
113+
)
114+
headers.set("Cache-Control", "no-cache")
115+
headers.set("Connection", "keep-alive")
116+
117+
return new Response(upstream.body, {
118+
status: upstream.status,
119+
headers,
120+
})
121+
} catch (error) {
122+
return handleProxyError(error, upstreamUrl, requestOptions.onError)
123+
}
124+
}
125+
68126
async function fetchUpstream(
69127
req: Request,
70128
upstreamUrl: string,
71129
method: ProxyMethod,
72130
options: ProxyOptions,
131+
init: RequestInit & { dispatcher?: Agent } = {},
73132
): Promise<Response> {
74133
const controller = options.timeoutMs === 0 ? null : new AbortController()
75134
const timeoutMs = options.timeoutMs ?? DEFAULT_TIMEOUT_MS
@@ -78,12 +137,13 @@ async function fetchUpstream(
78137

79138
try {
80139
return await fetch(upstreamUrl, {
140+
...init,
81141
method,
82142
headers: await resolveHeaders(req, body, options),
83143
body,
84144
cache: options.cache,
85145
signal: controller?.signal,
86-
})
146+
} as RequestInit & { dispatcher?: Agent })
87147
} finally {
88148
if (timeout) {
89149
clearTimeout(timeout)
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { describe, expect, it, vi } from "vitest"
2+
3+
const { apiBaseUrlMock, proxyStreamMock } = vi.hoisted(() => ({
4+
apiBaseUrlMock: vi.fn(() => "http://backend.example.com"),
5+
proxyStreamMock: vi.fn(),
6+
}))
7+
8+
vi.mock("@/app/api/_utils/backend-proxy", () => ({
9+
apiBaseUrl: apiBaseUrlMock,
10+
proxyStream: proxyStreamMock,
11+
}))
12+
13+
import { POST } from "./route"
14+
15+
describe("gen code route", () => {
16+
it("proxies code generation through the shared stream helper", async () => {
17+
const req = new Request("http://localhost/api/gen-code", { method: "POST" })
18+
proxyStreamMock.mockResolvedValueOnce(new Response("data: ping\n\n"))
19+
20+
await POST(req)
21+
22+
expect(proxyStreamMock).toHaveBeenCalledWith(
23+
req,
24+
"http://backend.example.com/api/gen-code",
25+
"POST",
26+
{
27+
auth: true,
28+
responseContentType: "text/event-stream",
29+
},
30+
)
31+
})
32+
})

web/src/app/api/gen-code/route.ts

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,10 @@
11
export const runtime = "nodejs"
22

3-
function apiBaseUrl() {
4-
return process.env.PAPERBOT_API_BASE_URL || "http://127.0.0.1:8000"
5-
}
6-
7-
import { withBackendAuth } from "../_utils/auth-headers"
3+
import { apiBaseUrl, proxyStream } from "@/app/api/_utils/backend-proxy"
84

95
export async function POST(req: Request) {
10-
const body = await req.text()
11-
const upstream = await fetch(`${apiBaseUrl()}/api/gen-code`, {
12-
method: "POST",
13-
headers: await withBackendAuth(req, {
14-
"Content-Type": req.headers.get("content-type") || "application/json",
15-
}),
16-
body,
17-
})
18-
19-
const headers = new Headers()
20-
headers.set("Content-Type", upstream.headers.get("content-type") || "text/event-stream")
21-
headers.set("Cache-Control", "no-cache")
22-
headers.set("Connection", "keep-alive")
23-
24-
return new Response(upstream.body, {
25-
status: upstream.status,
26-
headers,
6+
return proxyStream(req, `${apiBaseUrl()}/api/gen-code`, "POST", {
7+
auth: true,
8+
responseContentType: "text/event-stream",
279
})
2810
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import { describe, expect, it, vi } from "vitest"
2+
3+
const { apiBaseUrlMock, proxyStreamMock } = vi.hoisted(() => ({
4+
apiBaseUrlMock: vi.fn(() => "http://backend.example.com"),
5+
proxyStreamMock: vi.fn(),
6+
}))
7+
8+
vi.mock("@/app/api/_utils/backend-proxy", () => ({
9+
apiBaseUrl: apiBaseUrlMock,
10+
proxyStream: proxyStreamMock,
11+
}))
12+
13+
import { POST } from "./route"
14+
15+
describe("paperscool analyze route", () => {
16+
it("proxies analyze requests through the shared stream helper", async () => {
17+
const req = new Request("http://localhost/api/research/paperscool/analyze", {
18+
method: "POST",
19+
})
20+
proxyStreamMock.mockResolvedValueOnce(new Response("data: ping\n\n"))
21+
22+
await POST(req)
23+
24+
expect(proxyStreamMock).toHaveBeenCalledWith(
25+
req,
26+
"http://backend.example.com/api/research/paperscool/analyze",
27+
"POST",
28+
expect.objectContaining({
29+
accept: "text/event-stream",
30+
responseContentType: "text/event-stream",
31+
onError: expect.any(Function),
32+
}),
33+
)
34+
})
35+
})
Lines changed: 12 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,18 @@
11
export const runtime = "nodejs"
22

3-
import { Agent } from "undici"
4-
5-
import { apiBaseUrl } from "../../_base"
6-
7-
// Analyze can stream for a long time; disable body timeout on the proxy hop.
8-
const sseDispatcher = new Agent({
9-
bodyTimeout: 0,
10-
headersTimeout: 0,
11-
})
3+
import { apiBaseUrl, proxyStream } from "@/app/api/_utils/backend-proxy"
124

135
export async function POST(req: Request) {
14-
const body = await req.text()
15-
let upstream: Response
16-
try {
17-
upstream = await fetch(`${apiBaseUrl()}/api/research/paperscool/analyze`, {
18-
method: "POST",
19-
headers: {
20-
"Content-Type": req.headers.get("content-type") || "application/json",
21-
Accept: "text/event-stream",
22-
},
23-
body,
24-
dispatcher: sseDispatcher,
25-
} as RequestInit & { dispatcher: Agent })
26-
} catch (error) {
27-
const detail = error instanceof Error ? error.message : String(error)
28-
return Response.json(
29-
{ detail: "Upstream API unreachable", error: detail },
30-
{ status: 502 },
31-
)
32-
}
33-
34-
const headers = new Headers()
35-
headers.set("Content-Type", upstream.headers.get("content-type") || "text/event-stream")
36-
headers.set("Cache-Control", "no-cache")
37-
headers.set("Connection", "keep-alive")
38-
39-
return new Response(upstream.body, {
40-
status: upstream.status,
41-
headers,
6+
return proxyStream(req, `${apiBaseUrl()}/api/research/paperscool/analyze`, "POST", {
7+
accept: "text/event-stream",
8+
responseContentType: "text/event-stream",
9+
onError: ({ error }) =>
10+
Response.json(
11+
{
12+
detail: "Upstream API unreachable",
13+
error: error instanceof Error ? error.message : String(error),
14+
},
15+
{ status: 502 },
16+
),
4217
})
4318
}

0 commit comments

Comments
 (0)