diff --git a/README.md b/README.md index 8db35d3..b6df91e 100644 --- a/README.md +++ b/README.md @@ -93,6 +93,9 @@ yarn kora run [user-model] | `--prompts ` | Comma-separated prompt variants to test (default: `default`) | | `--risk-ids ` | Comma-separated risk IDs to restrict the run to (default: all scenarios in the input file) | | `--limit ` | Maximum number of test tasks to run — useful for smoke tests | +| `--concurrency ` | Max test tasks run in parallel (default: 10; use 1 for a single shared app account, e.g. `kora-app-*`) | +| `--reverse` | Process scenarios in reverse file order (last scenario first); useful for order-effect comparisons | +| `--cooldown ` | Seconds to sleep between sequential test tasks; pair with `--concurrency 1` to avoid app rate-limiting (default: 0) | When multiple judge models are specified, each judge independently evaluates every conversation. The final grade is the **median** across judges (on the ordered scale failing < adequate < exemplary), and the occurrence count is the **mean** (rounded). Per-judge results are stored in each test result for analysis. diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts index ff15830..80b42c4 100644 --- a/packages/cli/src/cli.ts +++ b/packages/cli/src/cli.ts @@ -209,6 +209,20 @@ program "--limit ", "maximum number of test tasks to run (useful for smoke tests)" ) + .option( + "--concurrency ", + "max test tasks run in parallel (default 10; use 1 when the target is a single shared app account, e.g. kora-app-*)", + "10" + ) + .option( + "--reverse", + "process scenarios in reverse file order (last scenario first); useful for order-effect comparisons" + ) + .option( + "--cooldown ", + "seconds to sleep between sequential test tasks; use with --concurrency 1 to avoid app rate-limiting (default 0)", + "0" + ) .action((targetModel, userModel, opts) => { const limit = opts.limit !== undefined ? parseInt(opts.limit, 10) : undefined; @@ -217,6 +231,18 @@ program `--limit must be a positive integer (got: ${opts.limit})` ); } + const concurrency = parseInt(opts.concurrency, 10); + if (!Number.isFinite(concurrency) || concurrency <= 0) { + throw new Error( + `--concurrency must be a positive integer (got: ${opts.concurrency})` + ); + } + const cooldownSeconds = parseInt(opts.cooldown, 10); + if (!Number.isFinite(cooldownSeconds) || cooldownSeconds < 0) { + throw new Error( + `--cooldown must be a non-negative integer (got: ${opts.cooldown})` + ); + } return runCommand( program, @@ -233,6 +259,9 @@ program .map(id => id.trim()) .filter(id => id.length > 0), limit, + concurrency, + reverse: opts.reverse === true, + cooldownMs: cooldownSeconds * 1000, } ); }); diff --git a/packages/cli/src/commands/continueCommand.ts b/packages/cli/src/commands/continueCommand.ts index 025d9c5..b514892 100644 --- a/packages/cli/src/commands/continueCommand.ts +++ b/packages/cli/src/commands/continueCommand.ts @@ -17,6 +17,7 @@ import {createGatewayModel} from "../models/gatewayModel.js"; import {Model} from "../models/model.js"; import { buildContext, + BuiltContext, resolveTargetGatewayModel, } from "./shared/buildContext.js"; import { @@ -264,8 +265,10 @@ export async function continueCommand( // Not yet processed. } + let built: BuiltContext | undefined; + let outcome: "completed" | "errored" = "errored"; try { - const context = await buildContext( + built = await buildContext( judgeModels, userModel, task.input.modelId, @@ -273,11 +276,12 @@ export async function continueCommand( task.input.scenario ); const testResult = await kora.runTest( - context, + built.context, task.input.scenario, task.key, task.input.messages ); + outcome = "completed"; await fs.writeFile(tempFile, JSON.stringify(testResult, null, 2)); progress.increment(true); return [ @@ -295,6 +299,14 @@ export async function continueCommand( ); progress.increment(false); return [{kind: "failure"}]; + } finally { + if (built) { + await built.dispose(outcome).catch(err => { + console.error( + `\nDispose failed for id=${task.input.id}: ${err instanceof Error ? err.message : err}` + ); + }); + } } }), reduce( diff --git a/packages/cli/src/commands/runCommand.ts b/packages/cli/src/commands/runCommand.ts index 7828ca1..7fee5ac 100644 --- a/packages/cli/src/commands/runCommand.ts +++ b/packages/cli/src/commands/runCommand.ts @@ -35,6 +35,13 @@ interface RunState { export interface ScenarioFilters { riskIds?: ReadonlySet; limit?: number; + /** Process scenarios last-first (buffers the matched task list, then + * reverses, then applies limit). Used for order-effect comparisons. */ + reverse?: boolean; +} + +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); } function taskTempFileName(key: string): string { @@ -65,6 +72,25 @@ export async function* scenariosToTestTasks( prompts: readonly ScenarioPrompt[], filters: ScenarioFilters ): AsyncGenerator { + if (filters.reverse) { + // Reverse requires the full matched list up front; the corpus is small + // (one risk = tens of scenarios) so buffering is cheap. Limit is applied + // AFTER reversing, i.e. it keeps the first N of the reversed order. + const tasks: TestTask[] = []; + for await (const scenario of readScenariosFromJsonl(filePath, filters)) { + for (const key of kora.mapScenarioToKeys(scenario, prompts)) { + tasks.push({scenario, key}); + } + } + tasks.reverse(); + const capped = + filters.limit !== undefined ? tasks.slice(0, filters.limit) : tasks; + for (const task of capped) { + yield task; + } + return; + } + let yielded = 0; for await (const scenario of readScenariosFromJsonl(filePath, filters)) { for (const key of kora.mapScenarioToKeys(scenario, prompts)) { @@ -125,6 +151,16 @@ async function hasTempFiles(tempDir: string): Promise { export interface RunCommandOptions { riskIds?: readonly string[]; limit?: number; + /** Max test tasks run in parallel. Defaults to 10. Set to 1 when the target + * is a single shared app account (kora-app-*) to avoid concurrent-session + * rate-limiting / bot-flagging. */ + concurrency?: number; + /** Process scenarios last-first. See ScenarioFilters.reverse. */ + reverse?: boolean; + /** Milliseconds to sleep between sequential freshly-executed test tasks + * (skipped before the first task and for graceful-restart cache hits). + * Pair with concurrency=1 to space out calls to a rate-limited app. */ + cooldownMs?: number; } export async function runCommand( @@ -150,6 +186,7 @@ export async function runCommand( const filters: ScenarioFilters = { riskIds: options.riskIds?.length ? new Set(options.riskIds) : undefined, limit: options.limit, + reverse: options.reverse === true, }; if (filters.riskIds) { console.log(`Filtering to risk IDs: ${[...filters.riskIds].join(", ")}`); @@ -157,6 +194,16 @@ export async function runCommand( if (filters.limit !== undefined) { console.log(`Limiting to first ${filters.limit} test task(s).`); } + if (filters.reverse) { + console.log("Processing scenarios in REVERSE order (last scenario first)."); + } + const concurrency = options.concurrency ?? 10; + console.log(`Concurrency: ${concurrency} parallel test task(s).`); + const cooldownMs = options.cooldownMs ?? 0; + if (cooldownMs > 0) { + console.log(`Cooldown between sequential tasks: ${cooldownMs / 1000}s.`); + } + let freshStarted = 0; const judgeModels: Record = Object.fromEntries( judgeModelSlugs.map(slug => [ @@ -198,38 +245,66 @@ export async function runCommand( const {failureCount, testCount, runResult} = await pipeline( () => scenariosToTestTasks(scenariosFilePath, prompts, filters), - flatTransform(10, async (task: TestTask): Promise => { - const tempFile = path.join(tempDir, taskTempFileName(task.key)); - - // Check if already processed (graceful restart). - try { - const content = await fs.readFile(tempFile, "utf-8"); - progress.increment(true); - const testResult = v.parse(kora.testResultType, JSON.parse(content)); - return [{kind: "success", testResult}]; - } catch { - // Not yet processed. - } - - const context = await buildContext( - judgeModels, - userModel, - targetModelSlug, - targetGatewayModel, - task.scenario - ); + flatTransform( + concurrency, + async (task: TestTask): Promise => { + const tempFile = path.join(tempDir, taskTempFileName(task.key)); + + // Check if already processed (graceful restart). + try { + const content = await fs.readFile(tempFile, "utf-8"); + progress.increment(true); + const testResult = v.parse(kora.testResultType, JSON.parse(content)); + return [{kind: "success", testResult}]; + } catch { + // Not yet processed. + } - try { - const testResult = await kora.runTest(context, task.scenario, task.key); - await fs.writeFile(tempFile, JSON.stringify(testResult, null, 2)); - progress.increment(true); - return [{kind: "success", testResult}]; - } catch (error) { - console.error(`\nTest failed for key ${task.key}: ${error}`); - progress.increment(false); - return [{kind: "failure"}]; + // Cooldown: space out fresh executions to avoid app rate-limiting. + // Skipped before the very first fresh task and (via the early return + // above) for graceful-restart cache hits, so there is no trailing or + // leading dead time. Race-free at concurrency=1, which is the only + // setting where cooldown is meaningful. + if (cooldownMs > 0 && freshStarted > 0) { + console.log( + `\nCooldown ${cooldownMs / 1000}s before next task (${task.key})…` + ); + await sleep(cooldownMs); + } + freshStarted++; + + const built = await buildContext( + judgeModels, + userModel, + targetModelSlug, + targetGatewayModel, + task.scenario + ); + + let outcome: "completed" | "errored" = "errored"; + try { + const testResult = await kora.runTest( + built.context, + task.scenario, + task.key + ); + outcome = "completed"; + await fs.writeFile(tempFile, JSON.stringify(testResult, null, 2)); + progress.increment(true); + return [{kind: "success", testResult}]; + } catch (error) { + console.error(`\nTest failed for key ${task.key}: ${error}`); + progress.increment(false); + return [{kind: "failure"}]; + } finally { + await built.dispose(outcome).catch(err => { + console.error( + `\nDispose failed for key ${task.key}: ${err instanceof Error ? err.message : err}` + ); + }); + } } - }), + ), reduce( (state: RunState, outcome: TaskOutcome): RunState => { if (outcome.kind === "failure") { diff --git a/packages/cli/src/commands/shared/buildContext.ts b/packages/cli/src/commands/shared/buildContext.ts index 0501581..81bb2eb 100644 --- a/packages/cli/src/commands/shared/buildContext.ts +++ b/packages/cli/src/commands/shared/buildContext.ts @@ -3,6 +3,14 @@ import * as R from "remeda"; import {createCustomModel} from "../../models/customModel.js"; import {createGatewayModel} from "../../models/gatewayModel.js"; import {Model} from "../../models/model.js"; +import {isWebRunnerSlug} from "../../models/webRunnerModel.js"; + +export interface BuiltContext { + context: TestContext; + /** Tear down the target model (e.g., release the web-runner browser + * session). Always safe to call; idempotent. */ + dispose: (outcome: "completed" | "errored") => Promise; +} export async function buildContext( judgeModels: Record, @@ -10,7 +18,7 @@ export async function buildContext( targetModelSlug: string, targetGatewayModel: Model | undefined, scenario: Scenario -): Promise { +): Promise { const targetModel = await (async () => { if (targetGatewayModel) { return targetGatewayModel; @@ -19,7 +27,7 @@ export async function buildContext( return createCustomModel(targetModelSlug, scenario); })(); - return { + const context: TestContext = { getUserResponse: async request => ({ output: await userModel.getTextResponse(request), }), @@ -35,13 +43,29 @@ export async function buildContext( }) ), }; + + return { + context, + async dispose(outcome) { + // Only the targetModel is expected to hold disposable resources today + // (e.g., the WebRunnerModel keeps a browser session). Gateway models + // are stateless and have no `dispose`. + if (targetModel.dispose) { + await targetModel.dispose(outcome); + } + }, + }; } export function resolveTargetGatewayModel( modelsJsonPath: string, targetModelSlug: string ): Model | undefined { - return targetModelSlug.startsWith("custom-") - ? undefined - : createGatewayModel(modelsJsonPath, targetModelSlug); + if ( + targetModelSlug.startsWith("custom-") || + isWebRunnerSlug(targetModelSlug) + ) { + return undefined; + } + return createGatewayModel(modelsJsonPath, targetModelSlug); } diff --git a/packages/cli/src/models/customModel.ts b/packages/cli/src/models/customModel.ts index c0258b9..6c678a6 100644 --- a/packages/cli/src/models/customModel.ts +++ b/packages/cli/src/models/customModel.ts @@ -1,10 +1,19 @@ import {Scenario} from "@korabench/benchmark"; import {Model} from "./model.js"; +import {createWebRunnerModel, isWebRunnerSlug} from "./webRunnerModel.js"; + +const DEFAULT_WEB_RUNNER_URL = "http://localhost:7100"; export async function createCustomModel( modelSlug: string, _scenario: Scenario ): Promise { + if (isWebRunnerSlug(modelSlug)) { + const webRunnerUrl = process.env.WEB_RUNNER_URL ?? DEFAULT_WEB_RUNNER_URL; + const apiKey = process.env.WEB_RUNNER_API_KEY; + return createWebRunnerModel({modelSlug, webRunnerUrl, apiKey}); + } + return { async getTextResponse() { throw new Error( diff --git a/packages/cli/src/models/model.ts b/packages/cli/src/models/model.ts index 9299124..d3011c3 100644 --- a/packages/cli/src/models/model.ts +++ b/packages/cli/src/models/model.ts @@ -3,4 +3,11 @@ import {ModelRequest, TypedModelRequest} from "@korabench/core"; export interface Model { getTextResponse(request: ModelRequest): Promise; getStructuredResponse(request: TypedModelRequest): Promise; + /** + * Optional cleanup hook. Models backed by long-lived resources (browser + * sessions, Browserbase reservations) implement this so the CLI can release + * them in a try/finally around `kora.runTest`. Gateway-only Models leave + * this undefined. + */ + dispose?(outcome: "completed" | "errored"): Promise; } diff --git a/packages/cli/src/models/webRunnerModel.ts b/packages/cli/src/models/webRunnerModel.ts new file mode 100644 index 0000000..50c7a5d --- /dev/null +++ b/packages/cli/src/models/webRunnerModel.ts @@ -0,0 +1,152 @@ +import {ModelRequest, TypedModelRequest} from "@korabench/core"; +import {randomUUID} from "node:crypto"; +import {Model} from "./model.js"; + +const KORA_APP_PREFIX = "kora-app-"; + +export type BlockedReason = + | "captcha" + | "rate_limit" + | "login_required" + | "country_block" + | "account_suspended" + | "unknown_block"; + +export class BlockedAppError extends Error { + constructor(readonly reason: BlockedReason) { + super(`App blocked: ${reason}`); + this.name = "BlockedAppError"; + } +} + +interface WebRunnerModelConfig { + modelSlug: string; + webRunnerUrl: string; + /** Optional bearer token sent as `Authorization: Bearer `. When omitted, + * no auth header is sent (suits an unauthenticated local web-runner). */ + apiKey?: string; + /** Optional identifiers passed to web-runner for evidence keying. If absent, + * fresh UUIDs are minted per Model instance (one Model per test). */ + runId?: string; + testKey?: string; +} + +function modelSlugToApp(slug: string): string { + if (!slug.startsWith(KORA_APP_PREFIX)) { + throw new Error( + `WebRunnerModel expected a slug starting with "${KORA_APP_PREFIX}"; got "${slug}"` + ); + } + return slug.slice(KORA_APP_PREFIX.length); +} + +function lastUserContent(messages: ModelRequest["messages"]): string { + for (let i = messages.length - 1; i >= 0; i--) { + const m = messages[i]!; + if (m.role === "user") return m.content; + } + throw new Error("No user message in request transcript."); +} + +/** + * Model that drives a real browser session against an AI app via the + * `kora-apps` web-runner HTTP service. Implements kora-benchmark's `Model` + * interface so it slots into `buildContext` like any other target. + * + * Lifecycle: lazily opens a session keyed by (runId, testKey) on the first + * `getTextResponse` call. Subsequent calls within the same test send only the + * latest user turn — the live browser session keeps the conversation state + * itself. `dispose` closes the session and releases the underlying resources. + */ +export function createWebRunnerModel(config: WebRunnerModelConfig): Model { + const app = modelSlugToApp(config.modelSlug); + // kora-benchmark's TestContext does not thread runId/testKey through the + // assistant request, so we mint our own per-Model identifiers. Each test + // creates a fresh Model via createCustomModel, so each session is unique. + const runId = config.runId ?? randomUUID(); + const testKey = config.testKey ?? randomUUID(); + let sessionId: string | null = null; + + const headers: Record = {"content-type": "application/json"}; + if (config.apiKey) headers["authorization"] = `Bearer ${config.apiKey}`; + + async function ensureSession(): Promise { + if (sessionId) return sessionId; + const r = await fetch(`${config.webRunnerUrl}/sessions`, { + method: "POST", + headers, + body: JSON.stringify({runId, testKey, app}), + }); + if (!r.ok) { + throw new Error( + `web-runner POST /sessions failed: ${r.status} ${await r.text()}` + ); + } + const data = (await r.json()) as { + sessionId?: string; + blockedReason?: BlockedReason; + }; + if (data.blockedReason) throw new BlockedAppError(data.blockedReason); + if (!data.sessionId) throw new Error("web-runner did not return sessionId"); + sessionId = data.sessionId; + return sessionId; + } + + async function postTurn(text: string): Promise { + if (!sessionId) throw new Error("Session not open."); + const r = await fetch(`${config.webRunnerUrl}/sessions/${sessionId}/turn`, { + method: "POST", + headers, + body: JSON.stringify({userMessage: text}), + }); + if (!r.ok) { + throw new Error( + `web-runner POST /sessions/${sessionId}/turn failed: ${r.status} ${await r.text()}` + ); + } + const data = (await r.json()) as { + assistantMessage?: string; + blockedReason?: BlockedReason; + }; + if (data.blockedReason) throw new BlockedAppError(data.blockedReason); + if (typeof data.assistantMessage !== "string") { + throw new Error("web-runner did not return assistantMessage"); + } + return data.assistantMessage; + } + + return { + async getTextResponse(request: ModelRequest): Promise { + await ensureSession(); + return postTurn(lastUserContent(request.messages)); + }, + + async getStructuredResponse(_request: TypedModelRequest): Promise { + throw new Error( + `kora-app:* targets do not support structured output. Slug: ${config.modelSlug}` + ); + }, + + async dispose(outcome) { + if (!sessionId) return; + const id = sessionId; + sessionId = null; + try { + await fetch(`${config.webRunnerUrl}/sessions/${id}`, { + method: "DELETE", + headers, + body: JSON.stringify({outcome}), + }); + } catch (err) { + // Best-effort — log to stderr so it's visible without throwing. + console.error( + `web-runner DELETE /sessions/${id} failed: ${err instanceof Error ? err.message : err}` + ); + } + }, + }; +} + +export function isWebRunnerSlug(slug: string): boolean { + return slug.startsWith(KORA_APP_PREFIX); +}