32 changes: 31 additions & 1 deletion crates/lingua/src/providers/openai/responses_adapter.rs
@@ -493,10 +493,11 @@ impl ProviderAdapter for ResponsesAdapter {
// Responses API streaming has three formats:
// 1. type field starting with "response." at top level
// 2. object="response.delta" at top level with delta.type nested
// 3. explicit keepalive events
payload
.get("type")
.and_then(Value::as_str)
.is_some_and(|t| t.starts_with("response."))
.is_some_and(|t| t.starts_with("response.") || t == "keepalive")
|| payload
.get("object")
.and_then(Value::as_str)
@@ -542,6 +543,8 @@ impl ProviderAdapter for ResponsesAdapter {
let delta_obj = payload.get("delta");

match event_type.as_str() {
"keepalive" => Ok(Some(UniversalStreamChunk::keep_alive())),

"response.output_text.delta" => {
// Text delta - extract from delta field
// Standard format: payload.delta is the text string
@@ -1293,4 +1296,31 @@ mod tests {
other => panic!("expected web_search tool, got {:?}", other),
}
}

#[test]
fn test_responses_detect_stream_response_keepalive() {
let adapter = ResponsesAdapter;
let payload = json!({
"type": "keepalive",
"sequence_number": 3
});

assert!(adapter.detect_stream_response(&payload));
}

#[test]
fn test_responses_stream_to_universal_keepalive() {
let adapter = ResponsesAdapter;
let payload = json!({
"type": "keepalive",
"sequence_number": 3
});

let chunk = adapter
.stream_to_universal(payload)
.expect("stream_to_universal should succeed")
.expect("keepalive should emit a chunk");

assert!(chunk.is_keep_alive());
}
}
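
For illustration only (not part of this diff): a minimal TypeScript sketch of the detection heuristic described in the comment above, assuming the payload has already been parsed from an SSE data: line into a JSON object, and assuming the elided object check compares against "response.delta" as that comment states. The function name is hypothetical.

// Hypothetical mirror of ResponsesAdapter::detect_stream_response, for illustration only.
function detectResponsesStreamPayload(payload: Record<string, unknown>): boolean {
  const type = typeof payload.type === "string" ? payload.type : undefined;
  const object = typeof payload.object === "string" ? payload.object : undefined;
  // Cases 1 and 3: top-level type is "response.*" or an explicit keepalive event.
  // Case 2: object === "response.delta" (assumed from the comment; the check is elided above).
  return (
    (type !== undefined && (type.startsWith("response.") || type === "keepalive")) ||
    object === "response.delta"
  );
}

// detectResponsesStreamPayload({ type: "keepalive", sequence_number: 3 }) === true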
34 changes: 5 additions & 29 deletions payloads/scripts/providers/google.ts
@@ -7,6 +7,7 @@ import {
GoogleGenerateContentRequest,
GOOGLE_MODEL,
} from "../../cases";
import { parseGoogleSseStream } from "../transforms/helpers";

const GOOGLE_API_BASE = "https://generativelanguage.googleapis.com/v1beta";

@@ -99,33 +100,6 @@ async function googleRequest(
return response;
}

/**
* Parse SSE stream into array of response chunks
*/
async function parseSSEStream(
response: Response
): Promise<GenerateContentResponse[]> {
const chunks: GenerateContentResponse[] = [];
const text = await response.text();

// SSE format: "data: {...}\n\n"
const lines = text.split("\n");
for (const line of lines) {
if (line.startsWith("data: ")) {
const jsonStr = line.slice(6); // Remove "data: " prefix
if (jsonStr.trim()) {
try {
chunks.push(JSON.parse(jsonStr));
} catch {
// Skip malformed JSON
}
}
}
}

return chunks;
}

type ParallelGoogleResult =
| {
type: "response";
@@ -189,7 +163,8 @@ export async function executeGoogle(
const text = await response.text();
throw new Error(`ApiError: ${text}`);
}
const chunks = await parseSSEStream(response);
const chunks =
await parseGoogleSseStream<GenerateContentResponse>(response);
return { type: "streamingResponse" as const, data: chunks };
})()
);
@@ -300,7 +275,8 @@ export async function executeGoogle(
const text = await response.text();
throw new Error(`ApiError: ${text}`);
}
const chunks = await parseSSEStream(response);
const chunks =
await parseGoogleSseStream<GenerateContentResponse>(response);
return {
type: "followupStreamingResponse" as const,
data: chunks,
3,370 changes: 2,787 additions & 583 deletions payloads/scripts/transforms/__snapshots__/transforms-streaming.test.ts.snap

Large diffs are not rendered by default.

29 changes: 20 additions & 9 deletions payloads/scripts/transforms/capture-transforms.ts
@@ -9,6 +9,7 @@ import {
TRANSFORM_PAIRS,
STREAMING_PAIRS,
TRANSFORMS_DIR,
parseGoogleSseStream,
RESPONSE_VALIDATORS,
TARGET_MODELS,
transformAndValidateRequest,
Expand Down Expand Up @@ -51,7 +52,8 @@ function getOpenAI(): OpenAI {
}

async function callGoogleProvider(
request: Record<string, unknown>
request: Record<string, unknown>,
options?: CallProviderOptions
): Promise<unknown> {
const apiKey = process.env.GOOGLE_API_KEY;
if (!apiKey) {
@@ -61,8 +63,11 @@ async function callGoogleProvider(
const rawModel = request.model ?? GOOGLE_MODEL;
const model = typeof rawModel === "string" ? rawModel : String(rawModel);
const { model: _model, ...body } = request;
const stream = options?.stream === true;

const endpoint = `${GOOGLE_API_BASE}/models/${model}:generateContent`;
const endpoint = stream
? `${GOOGLE_API_BASE}/models/${model}:streamGenerateContent?alt=sse`
: `${GOOGLE_API_BASE}/models/${model}:generateContent`;
const response = await fetch(endpoint, {
method: "POST",
headers: {
Expand All @@ -77,7 +82,7 @@ async function callGoogleProvider(
throw new Error(`Google API error (${response.status}): ${text}`);
}

return response.json();
return stream ? parseGoogleSseStream(response) : response.json();
}

/* eslint-disable @typescript-eslint/consistent-type-assertions -- SDK methods require specific param types, validation done by transformAndValidateRequest */
@@ -123,12 +128,7 @@ async function callProvider(
request as unknown as OpenAI.Responses.ResponseCreateParams
);
case "google":
if (stream) {
throw new Error(
"Streaming capture is not implemented for google target in capture-transforms.ts"
);
}
return callGoogleProvider(request);
return callGoogleProvider(request, { stream });
}
}
/* eslint-enable @typescript-eslint/consistent-type-assertions */
Expand All @@ -145,6 +145,10 @@ function isAsyncIterable(value: unknown): value is AsyncIterable<unknown> {
async function collectStreamChunks(
streamResponse: unknown
): Promise<unknown[]> {
if (Array.isArray(streamResponse)) {
return streamResponse;
}

if (!isAsyncIterable(streamResponse)) {
throw new Error(
"Expected streaming provider response to be async iterable"
@@ -313,6 +317,13 @@ export async function captureTransforms(
);
captured++;
} catch (e) {
const errorObj = e && typeof e === "object" ? e : {};
const errorData = {
error: e instanceof Error ? e.message : String(e),
name: e instanceof Error ? e.name : undefined,
...("response" in errorObj ? { response: errorObj.response } : {}),
};
writeFileSync(streamingPath, JSON.stringify(errorData, null, 2));
console.error(
`❌ ${streamingPair.source} → ${streamingPair.target} / ${caseName} (streaming): ${e}`
);
54 changes: 54 additions & 0 deletions payloads/scripts/transforms/helpers.ts
@@ -252,6 +252,42 @@ export function getGenAiStreamGenerateContentPath(model: string): string {
return `/v1beta/models/${model}:streamGenerateContent?alt=sse`;
}

/**
* Parse Google `alt=sse` transport into provider event objects.
*
* This lives in payload-layer helpers rather than Rust/WASM because the payload
* capture scripts are making raw HTTP requests and need to peel provider
* transport framing before they can store fixtures as JSON event objects.
* Lingua's Rust stream code transforms provider event objects; it does not own
* raw provider HTTP response parsing in these scripts.
*/
export async function parseGoogleSseStream<T = unknown>(
response: Response
): Promise<T[]> {
const chunks: T[] = [];
const text = await response.text();

for (const line of text.split("\n")) {
if (!line.startsWith("data: ")) {
continue;
}

const json = line.slice(6).trim();
if (!json) {
continue;
Review thread on this line:
Contributor: Do we need any logging or error capturing here?
Contributor Author (@knjiang, Apr 8, 2026): I think it's fine; it's only used in snapshotting JSON, which we should just spot check for errors.
}

try {
const parsed: T = JSON.parse(json);
chunks.push(parsed);
} catch {
// Ignore malformed SSE lines and keep capturing valid chunks.
}
}

return chunks;
}
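
For illustration only (not part of this diff): a minimal usage sketch of parseGoogleSseStream, mirroring the call sites in google.ts and capture-transforms.ts. The fetch details here (in particular the x-goog-api-key header) are assumptions; the real scripts build their requests elsewhere.

// Hypothetical usage sketch; the endpoint shape matches getGenAiStreamGenerateContentPath.
const response = await fetch(
  `${GOOGLE_API_BASE}/models/${model}:streamGenerateContent?alt=sse`,
  {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "x-goog-api-key": apiKey, // assumed header; the capture scripts may authenticate differently
    },
    body: JSON.stringify(body),
  }
);
// The body is SSE-framed ("data: {...}\n\n"); each returned element is one parsed event object.
const chunks = await parseGoogleSseStream<GenerateContentResponse>(response);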

export function getTransformableCases(
pair: TransformPair,
filter?: string
@@ -271,6 +307,24 @@ export const STREAMING_PAIRS: TransformPair[] = [
wasmSource: "OpenAI",
wasmTarget: "Anthropic",
},
{
source: "chat-completions",
target: "responses",
wasmSource: "OpenAI",
wasmTarget: "Responses",
},
{
source: "responses",
target: "google",
wasmSource: "Responses",
wasmTarget: "Google",
},
{
source: "responses",
target: "anthropic",
wasmSource: "Responses",
wasmTarget: "Anthropic",
},
{
source: "anthropic",
target: "chat-completions",
131 changes: 131 additions & 0 deletions payloads/scripts/transforms/transforms-responses.test.ts
@@ -0,0 +1,131 @@
import { describe, test, expect } from "vitest";
import OpenAI from "openai";
import { allTestCases, getCaseForProvider } from "../../cases";
import {
STREAMING_PAIRS,
TARGET_MODELS,
TRANSFORM_PAIRS,
getFixtureSkipReason,
getResponsePath,
getStreamingResponsePath,
getStreamingTransformableCases,
getTransformableCases,
} from "./helpers";
import {
registerSkippedFixtureTest,
useTransformTestServer,
} from "./vitest-helpers";

const TIMEOUT = 30000;
const getServer = useTransformTestServer();

function getResponsesModel(caseName: string): string {
const responsesCase = getCaseForProvider(allTestCases, caseName, "responses");
return responsesCase &&
typeof responsesCase === "object" &&
"model" in responsesCase &&
typeof responsesCase.model === "string"
? responsesCase.model
: TARGET_MODELS.responses;
}

// These tests exercise the source=responses path: captured provider responses
// are transformed back into OpenAI Responses format, then parsed by the OpenAI
// SDK to verify the transformed payload still satisfies its schema.
for (const pair of TRANSFORM_PAIRS.filter((p) => p.source === "responses")) {
const pairLabel = `${pair.target} → ${pair.source}`;
describe(`responses SDK: ${pairLabel}`, () => {
for (const caseName of getTransformableCases(pair)) {
const path = getResponsePath(pair.source, pair.target, caseName);
const skipReason = getFixtureSkipReason(path);

if (skipReason) {
registerSkippedFixtureTest(pairLabel, caseName, skipReason);
continue;
}

test(
caseName,
async () => {
const model = getResponsesModel(caseName);

// 1. Serve the transformed fixture from the responses endpoint so
// the SDK receives Lingua's wasm output.
getServer().useJsonFixture({
path: "/v1/responses",
targetFormat: pair.target,
wasmSource: pair.wasmSource,
responsePath: path,
});

const client = new OpenAI({
apiKey: "test-key",
baseURL: getServer().openaiBaseUrl,
});
await expect(
// 2. This request only provides a valid SDK entrypoint invocation.
// The transformed fixture selected by pair + caseName is the
// actual subject under test.
client.responses.create({
model,
input: "test",
stream: false,
})
).resolves.toBeDefined();
},
TIMEOUT
);
}
});
}

for (const pair of STREAMING_PAIRS.filter((p) => p.source === "responses")) {
const pairLabel = `${pair.target} → ${pair.source}`;
describe(`responses SDK streaming: ${pairLabel}`, () => {
for (const caseName of getStreamingTransformableCases(pair)) {
const path = getStreamingResponsePath(pair.source, pair.target, caseName);
const skipReason = getFixtureSkipReason(path, { streaming: true });

if (skipReason) {
registerSkippedFixtureTest(pairLabel, caseName, skipReason);
continue;
}

test(
caseName,
async () => {
const model = getResponsesModel(caseName);

// 1. Serve the transformed streaming fixture from the responses
// endpoint so the SDK parses Lingua's stream output.
getServer().useStreamingFixture({
path: "/v1/responses",
targetFormat: pair.target,
wasmSource: pair.wasmSource,
responsePath: path,
});

const client = new OpenAI({
apiKey: "test-key",
baseURL: getServer().openaiBaseUrl,
});
// 2. This request only opens a valid SDK streaming entrypoint. The
// transformed stream fixture selected by pair + caseName is the
// actual subject under test.
const stream = await client.responses.create({
model,
input: "test",
stream: true,
});

const collected: unknown[] = [];
for await (const event of stream) {
collected.push(event);
}
expect(collected.length).toBeGreaterThan(0);
},
TIMEOUT
);
}
});
}