diff --git a/.changeset/quiet-walls-share.md b/.changeset/quiet-walls-share.md new file mode 100644 index 0000000..7d5598c --- /dev/null +++ b/.changeset/quiet-walls-share.md @@ -0,0 +1,5 @@ +--- +'incur': patch +--- + +Fixed HTTP and MCP command input validation to return standard validation field errors for object-shaped inputs. diff --git a/.changeset/sour-dingos-shine.md b/.changeset/sour-dingos-shine.md new file mode 100644 index 0000000..9fefa90 --- /dev/null +++ b/.changeset/sour-dingos-shine.md @@ -0,0 +1,7 @@ +--- +'incur': patch +--- + +Fixed streaming command terminal records so HTTP NDJSON responses preserve returned `c.ok()` CTA metadata, represent returned or yielded `c.error()` values as terminal errors, include terminal duration metadata, and unwind generators on response cancellation. + +Also preserves `IncurError.retryable` metadata in streaming machine-format errors. diff --git a/src/Cli.test.ts b/src/Cli.test.ts index bd55fc6..81fe966 100644 --- a/src/Cli.test.ts +++ b/src/Cli.test.ts @@ -3,6 +3,8 @@ import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises' import { homedir, tmpdir } from 'node:os' import { join } from 'node:path' +import * as Command from './internal/command.js' + const originalIsTTY = process.stdout.isTTY beforeAll(() => { ;(process.stdout as any).isTTY = false @@ -3654,6 +3656,57 @@ test('streaming: generator throws in buffered mode', async () => { expect(output).toContain('generator exploded') }) +test('streaming: thrown IncurError preserves retryable metadata in machine formats', async () => { + const cli = Cli.create('test') + cli.command('limited', { + async *run() { + yield { step: 1 } + throw new Errors.IncurError({ + code: 'RATE_LIMITED', + message: 'too fast', + retryable: true, + }) + }, + }) + + const jsonl = await serve(cli, ['limited', '--format', 'jsonl']) + const jsonlLines = jsonl.output + .trim() + .split('\n') + .map((line) => JSON.parse(line)) + expect(jsonl.exitCode).toBe(1) + expect(jsonlLines[1]).toMatchInlineSnapshot(` + { + "error": { + "code": "RATE_LIMITED", + "message": "too fast", + "retryable": true, + }, + "ok": false, + "type": "error", + } + `) + + const json = await serve(cli, ['limited', '--full-output', '--format', 'json']) + const body = JSON.parse(json.output) + body.meta.duration = '' + expect(json.exitCode).toBe(1) + expect(body).toMatchInlineSnapshot(` + { + "error": { + "code": "RATE_LIMITED", + "message": "too fast", + "retryable": true, + }, + "meta": { + "command": "limited", + "duration": "", + }, + "ok": false, + } + `) +}) + test('streaming: generator returns error in buffered mode', async () => { const cli = Cli.create('test') cli.command('fail', { @@ -4051,6 +4104,74 @@ describe('--filter-output', () => { }) }) +describe('Command.execute', () => { + test.each([ + { + name: 'split', + command: { options: z.object({ name: z.string() }), run: () => ({ ok: true }) }, + inputOptions: { name: 123 }, + path: 'name', + parseMode: 'split' as const, + }, + { + name: 'flat', + command: { args: z.object({ id: z.string() }), run: () => ({ ok: true }) }, + inputOptions: { id: 123 }, + path: 'id', + parseMode: 'flat' as const, + }, + ])('$name mode returns validation fieldErrors for invalid command input', async (c) => { + const result = await Command.execute(c.command, { + agent: true, + argv: [], + format: 'json', + formatExplicit: false, + inputOptions: c.inputOptions, + name: 'test', + parseMode: c.parseMode, + path: 'users', + version: undefined, + }) + + expect(result).toMatchObject({ + ok: false, + error: { + code: 'VALIDATION_ERROR', + fieldErrors: [ + { + code: 'invalid_type', + missing: false, + path: c.path, + }, + ], + }, + }) + }) + + test('does not normalize handler-thrown Zod errors as command input', async () => { + const result = await Command.execute( + { + run() { + z.object({ name: z.string() }).parse({ name: 123 }) + }, + }, + { + agent: true, + argv: [], + format: 'json', + formatExplicit: false, + inputOptions: {}, + name: 'test', + path: 'users', + version: undefined, + }, + ) + + expect(result).toMatchObject({ ok: false, error: { code: 'UNKNOWN' } }) + expect(result).not.toHaveProperty('error.fieldErrors') + }) +}) + async function fetchJson(cli: Cli.Cli, req: Request) { const res = await cli.fetch(req) const body = await res.json() @@ -4058,6 +4179,20 @@ async function fetchJson(cli: Cli.Cli, req: Request) { return { status: res.status, body } } +async function fetchNdjson(cli: Cli.Cli, req: Request) { + const res = await cli.fetch(req) + const lines = (await res.text()) + .trim() + .split('\n') + .map((line) => JSON.parse(line)) + for (const line of lines) + if (line.meta?.duration) { + expect(line.meta.duration).toMatch(/^\d+ms$/) + line.meta.duration = '' + } + return { status: res.status, contentType: res.headers.get('content-type'), lines } +} + describe('fetch', () => { test('GET /health → 200', async () => { const cli = Cli.create('test') @@ -4292,36 +4427,356 @@ describe('fetch', () => { return { done: true } }, }) + expect(await fetchNdjson(cli, new Request('http://localhost/stream'))).toMatchInlineSnapshot(` + { + "contentType": "application/x-ndjson", + "lines": [ + { + "data": { + "progress": 1, + }, + "type": "chunk", + }, + { + "data": { + "progress": 2, + }, + "type": "chunk", + }, + { + "meta": { + "command": "stream", + "duration": "", + }, + "ok": true, + "type": "done", + }, + ], + "status": 200, + } + `) + }) + + test('streaming response preserves returned ok CTA through middleware', async () => { + const cli = Cli.create('test') + cli.use(async (_c, next) => { + await next() + }) + cli.command('stream', { + async *run(c) { + yield { progress: 1 } + return c.ok({ ignored: true }, { cta: { commands: ['next'], description: 'Next steps:' } }) + }, + }) + expect(await fetchNdjson(cli, new Request('http://localhost/stream'))).toMatchInlineSnapshot(` + { + "contentType": "application/x-ndjson", + "lines": [ + { + "data": { + "progress": 1, + }, + "type": "chunk", + }, + { + "meta": { + "command": "stream", + "cta": { + "commands": [ + { + "command": "test next", + }, + ], + "description": "Next steps:", + }, + "duration": "", + }, + "ok": true, + "type": "done", + }, + ], + "status": 200, + } + `) + }) + + test('streaming response handles terminal-only sentinel returns through middleware', async () => { + const order: string[] = [] + const cli = Cli.create('test') + cli.use(async (c, next) => { + order.push(`before:${c.command}`) + await next() + order.push(`after:${c.command}`) + }) + const sub = Cli.create('ops') + sub.command('ok', { + // oxlint-disable-next-line require-yield -- exercises a stream that returns before yielding. + async *run(c) { + return c.ok( + { ignored: true }, + { cta: { commands: [{ command: 'next', description: 'Continue' }] } }, + ) + }, + }) + sub.command('fail', { + // oxlint-disable-next-line require-yield -- exercises a stream that returns before yielding. + async *run(c) { + return c.error({ + code: 'EMPTY_FAIL', + cta: { commands: ['retry'], description: 'Recover with:' }, + message: 'failed before chunks', + retryable: true, + }) + }, + }) + cli.command(sub) + + const ok = await fetchNdjson(cli, new Request('http://localhost/ops/ok')) + expect(ok).toMatchInlineSnapshot(` + { + "contentType": "application/x-ndjson", + "lines": [ + { + "meta": { + "command": "ops ok", + "cta": { + "commands": [ + { + "command": "test next", + "description": "Continue", + }, + ], + "description": "Suggested command:", + }, + "duration": "", + }, + "ok": true, + "type": "done", + }, + ], + "status": 200, + } + `) + expect(ok.lines[0]).not.toHaveProperty('data') + + expect(await fetchNdjson(cli, new Request('http://localhost/ops/fail'))).toMatchInlineSnapshot(` + { + "contentType": "application/x-ndjson", + "lines": [ + { + "error": { + "code": "EMPTY_FAIL", + "message": "failed before chunks", + "retryable": true, + }, + "meta": { + "command": "ops fail", + "cta": { + "commands": [ + { + "command": "test retry", + }, + ], + "description": "Recover with:", + }, + "duration": "", + }, + "ok": false, + "type": "error", + }, + ], + "status": 200, + } + `) + expect(order).toEqual(['before:ops ok', 'after:ops ok', 'before:ops fail', 'after:ops fail']) + }) + + test('streaming response represents returned error as terminal error', async () => { + const cli = Cli.create('test') + cli.command('stream', { + async *run(c) { + yield { progress: 1 } + return c.error({ code: 'STREAM_FAIL', message: 'failed late', retryable: true }) + }, + }) + expect(await fetchNdjson(cli, new Request('http://localhost/stream'))).toMatchInlineSnapshot(` + { + "contentType": "application/x-ndjson", + "lines": [ + { + "data": { + "progress": 1, + }, + "type": "chunk", + }, + { + "error": { + "code": "STREAM_FAIL", + "message": "failed late", + "retryable": true, + }, + "meta": { + "command": "stream", + "duration": "", + }, + "ok": false, + "type": "error", + }, + ], + "status": 200, + } + `) + }) + + test('streaming response represents yielded error as terminal error', async () => { + let closed = false + const cli = Cli.create('test') + cli.command('stream', { + async *run(c) { + try { + yield { progress: 1 } + yield c.error({ code: 'STREAM_FAIL', message: 'failed now' }) + yield { progress: 2 } + } finally { + closed = true + } + }, + }) + expect(await fetchNdjson(cli, new Request('http://localhost/stream'))).toMatchInlineSnapshot(` + { + "contentType": "application/x-ndjson", + "lines": [ + { + "data": { + "progress": 1, + }, + "type": "chunk", + }, + { + "error": { + "code": "STREAM_FAIL", + "message": "failed now", + }, + "meta": { + "command": "stream", + "duration": "", + }, + "ok": false, + "type": "error", + }, + ], + "status": 200, + } + `) + expect(closed).toBe(true) + }) + + test('streaming response cancellation unwinds generator and middleware', async () => { + let resolveAfter = () => {} + const after = new Promise((resolve) => { + resolveAfter = resolve + }) + const order: string[] = [] + const cli = Cli.create('test') + cli.use(async (_c, next) => { + order.push('mw:before') + await next() + order.push('mw:after') + resolveAfter() + }) + cli.command('stream', { + async *run() { + try { + order.push('stream:yield') + yield { progress: 1 } + while (true) yield { progress: 2 } + } finally { + order.push('stream:finally') + } + }, + }) const res = await cli.fetch(new Request('http://localhost/stream')) - expect(res.status).toBe(200) - expect(res.headers.get('content-type')).toBe('application/x-ndjson') - const text = await res.text() - const lines = text - .trim() - .split('\n') - .map((l) => JSON.parse(l)) - expect(lines).toMatchInlineSnapshot(` - [ - { - "data": { - "progress": 1, + const reader = res.body!.getReader() + await reader.read() + await reader.cancel() + await after + expect(order).toEqual(['mw:before', 'stream:yield', 'stream:finally', 'mw:after']) + }) + + test('streaming response thrown error includes terminal duration metadata', async () => { + const cli = Cli.create('test') + cli.command('stream', { + async *run() { + yield { progress: 1 } + throw new Error('boom') + }, + }) + expect(await fetchNdjson(cli, new Request('http://localhost/stream'))).toMatchInlineSnapshot(` + { + "contentType": "application/x-ndjson", + "lines": [ + { + "data": { + "progress": 1, + }, + "type": "chunk", }, - "type": "chunk", - }, - { - "data": { - "progress": 2, + { + "error": { + "code": "UNKNOWN", + "message": "boom", + }, + "meta": { + "command": "stream", + "duration": "", + }, + "ok": false, + "type": "error", }, - "type": "chunk", - }, - { - "meta": { - "command": "stream", + ], + "status": 200, + } + `) + }) + + test('streaming response thrown IncurError preserves code and retryable metadata', async () => { + const cli = Cli.create('test') + cli.command('stream', { + async *run() { + yield { progress: 1 } + throw new Errors.IncurError({ + code: 'RATE_LIMITED', + message: 'too fast', + retryable: true, + }) + }, + }) + expect(await fetchNdjson(cli, new Request('http://localhost/stream'))).toMatchInlineSnapshot(` + { + "contentType": "application/x-ndjson", + "lines": [ + { + "data": { + "progress": 1, + }, + "type": "chunk", }, - "ok": true, - "type": "done", - }, - ] + { + "error": { + "code": "RATE_LIMITED", + "message": "too fast", + "retryable": true, + }, + "meta": { + "command": "stream", + "duration": "", + }, + "ok": false, + "type": "error", + }, + ], + "status": 200, + } `) }) diff --git a/src/Cli.ts b/src/Cli.ts index d8efef9..ec314ac 100644 --- a/src/Cli.ts +++ b/src/Cli.ts @@ -1854,24 +1854,61 @@ async function executeCommand( // Streaming path — async generator → NDJSON response if ('stream' in result) { + const iterator = result.stream + const encoder = new TextEncoder() + const meta = (cta?: FormattedCtaBlock | undefined) => ({ + command: path, + duration: `${Math.round(performance.now() - start)}ms`, + ...(cta ? { cta } : undefined), + }) + const errorRecord = (err: ErrorResult) => ({ + type: 'error', + ok: false, + error: { + code: err.code, + message: err.message, + ...(err.retryable !== undefined ? { retryable: err.retryable } : undefined), + }, + meta: meta(formatCtaBlock(options.name ?? path, err.cta)), + }) const stream = new ReadableStream({ - async start(controller) { - const encoder = new TextEncoder() + async cancel() { + await iterator.return(undefined) + }, + async pull(controller) { try { - for await (const value of result.stream) { + const { value, done } = await iterator.next() + if (done) { + if (isSentinel(value) && value[sentinel] === 'error') { + controller.enqueue(encoder.encode(JSON.stringify(errorRecord(value)) + '\n')) + controller.close() + return + } + const cta = + isSentinel(value) && value[sentinel] === 'ok' + ? formatCtaBlock(options.name ?? path, value.cta) + : undefined controller.enqueue( - encoder.encode(JSON.stringify({ type: 'chunk', data: value }) + '\n'), + encoder.encode( + JSON.stringify({ + type: 'done', + ok: true, + meta: meta(cta), + }) + '\n', + ), ) + controller.close() + return } - controller.enqueue( - encoder.encode( - JSON.stringify({ - type: 'done', - ok: true, - meta: { command: path }, - }) + '\n', - ), - ) + + if (isSentinel(value) && value[sentinel] === 'error') { + controller.enqueue(encoder.encode(JSON.stringify(errorRecord(value)) + '\n')) + await iterator.return(undefined) + controller.close() + return + } + + controller.enqueue(encoder.encode(JSON.stringify({ type: 'chunk', data: value }) + '\n')) } catch (error) { controller.enqueue( encoder.encode( @@ -1879,14 +1916,16 @@ async function executeCommand( type: 'error', ok: false, error: { - code: 'UNKNOWN', + code: error instanceof IncurError ? error.code : 'UNKNOWN', message: error instanceof Error ? error.message : String(error), + ...(error instanceof IncurError ? { retryable: error.retryable } : undefined), }, + meta: meta(), }) + '\n', ), ) + controller.close() } - controller.close() }, }) return new Response(stream, { @@ -2719,6 +2758,7 @@ async function handleStreaming( error: { code: error instanceof IncurError ? error.code : 'UNKNOWN', message: error instanceof Error ? error.message : String(error), + ...(error instanceof IncurError ? { retryable: error.retryable } : undefined), }, }), ) @@ -2802,6 +2842,7 @@ async function handleStreaming( error: { code: error instanceof IncurError ? error.code : 'UNKNOWN', message: error instanceof Error ? error.message : String(error), + ...(error instanceof IncurError ? { retryable: error.retryable } : undefined), }, meta: { command: ctx.path, diff --git a/src/Parser.ts b/src/Parser.ts index ea21a75..3a601ad 100644 --- a/src/Parser.ts +++ b/src/Parser.ts @@ -257,7 +257,7 @@ function setOption( } /** Wraps zod schema.parse(), converting ZodError to ValidationError. */ -function zodParse(schema: z.ZodObject, data: Record) { +export function zodParse(schema: z.ZodObject, data: Record) { try { return schema.parse(data) } catch (err: any) { diff --git a/src/e2e.test.ts b/src/e2e.test.ts index 61bbb4d..4cd126c 100644 --- a/src/e2e.test.ts +++ b/src/e2e.test.ts @@ -2833,6 +2833,8 @@ describe('fetch api', () => { .trim() .split('\n') .map((l) => JSON.parse(l)) + expect(lines[2].meta.duration).toMatch(/^\d+ms$/) + lines[2].meta.duration = '' expect(lines).toMatchInlineSnapshot(` [ { @@ -2850,6 +2852,7 @@ describe('fetch api', () => { { "meta": { "command": "stream", + "duration": "", }, "ok": true, "type": "done", diff --git a/src/internal/command.ts b/src/internal/command.ts index 3dc0c6f..c8f7f08 100644 --- a/src/internal/command.ts +++ b/src/internal/command.ts @@ -81,12 +81,12 @@ export async function execute(command: any, options: execute.Options): Promise + return yield* raw as AsyncGenerator } finally { resolveStreamConsumed!() }