Skip to content

fix(cli): preserve stream terminal records#12

Open
0xpolarzero wants to merge 2 commits into
parse-object-validationfrom
stream-terminal-command-results
Open

fix(cli): preserve stream terminal records#12
0xpolarzero wants to merge 2 commits into
parse-object-validationfrom
stream-terminal-command-results

Conversation

@0xpolarzero
Copy link
Copy Markdown
Owner

@0xpolarzero 0xpolarzero commented May 26, 2026

Overview

Fix HTTP NDJSON streaming so the terminal record matches how the async generator actually ended.

The bug was that HTTP streaming used for await...of, which can emit yielded chunks but cannot see the generator's final return value. Middleware also used yield* raw without returning it, so returned stream sentinels could disappear before HTTP saw them.

Behavior Covered

Returned c.ok() keeps terminal CTA metadata

cli.command('stream', {
  async *run(c) {
    yield { progress: 1 }
    return c.ok(
      { ignored: true },
      { cta: { commands: ['next'], description: 'Next steps:' } },
    )
  },
})

expect(await fetchNdjson(cli, new Request('http://localhost/stream')))
  .toMatchInlineSnapshot(`
    {
      "lines": [
        { "type": "chunk", "data": { "progress": 1 } },
        {
          "type": "done",
          "ok": true,
          "meta": {
            "command": "stream",
            "cta": {
              "commands": [{ "command": "test next" }],
              "description": "Next steps:"
            },
            "duration": "<stripped>"
          }
        }
      ],
      "status": 200
    }
  `)

// Before: final line was only:
// { "type": "done", "ok": true, "meta": { "command": "stream" } }
// CTA metadata from returned c.ok() was lost.

Returned c.error() becomes a terminal error

cli.command('stream', {
  async *run(c) {
    yield { progress: 1 }
    return c.error({
      code: 'STREAM_FAIL',
      message: 'failed late',
      retryable: true,
    })
  },
})

expect(await fetchNdjson(cli, new Request('http://localhost/stream')))
  .toMatchInlineSnapshot(`
    {
      "lines": [
        { "type": "chunk", "data": { "progress": 1 } },
        {
          "type": "error",
          "ok": false,
          "error": {
            "code": "STREAM_FAIL",
            "message": "failed late",
            "retryable": true
          },
          "meta": {
            "command": "stream",
            "duration": "<stripped>"
          }
        }
      ],
      "status": 200
    }
  `)

// Before: this ended as a successful done record.
// The returned c.error() was never inspected.

Yielded c.error() stops the stream and closes the generator

let closed = false

cli.command('stream', {
  async *run(c) {
    try {
      yield { progress: 1 }
      yield c.error({ code: 'STREAM_FAIL', message: 'failed now' })
      yield { progress: 2 }
    } finally {
      closed = true
    }
  },
})

const result = await fetchNdjson(cli, new Request('http://localhost/stream'))

expect(result.lines).toEqual([
  { type: 'chunk', data: { progress: 1 } },
  {
    type: 'error',
    ok: false,
    error: { code: 'STREAM_FAIL', message: 'failed now' },
    meta: { command: 'stream', duration: '<stripped>' },
  },
])
expect(closed).toBe(true)

// Before: yielded c.error() was serialized as chunk data,
// and later stream work could continue instead of stopping terminally.

Terminal-only streams work through middleware

const order: string[] = []

cli.use(async (c, next) => {
  order.push(`before:${c.command}`)
  await next()
  order.push(`after:${c.command}`)
})

sub.command('ok', {
  async *run(c) {
    return c.ok(
      { ignored: true },
      { cta: { commands: [{ command: 'next', description: 'Continue' }] } },
    )
  },
})

sub.command('fail', {
  async *run(c) {
    return c.error({
      code: 'EMPTY_FAIL',
      cta: { commands: ['retry'], description: 'Recover with:' },
      message: 'failed before chunks',
      retryable: true,
    })
  },
})

expect(await fetchNdjson(cli, new Request('http://localhost/ops/ok')))
  .toMatchObject({
    lines: [{ type: 'done', ok: true, meta: { command: 'ops ok' } }],
  })

expect(await fetchNdjson(cli, new Request('http://localhost/ops/fail')))
  .toMatchObject({
    lines: [{
      type: 'error',
      ok: false,
      error: { code: 'EMPTY_FAIL', retryable: true },
      meta: { command: 'ops fail' },
    }],
  })

expect(order).toEqual([
  'before:ops ok',
  'after:ops ok',
  'before:ops fail',
  'after:ops fail',
])

// Before: middleware used `yield* raw`, which forwards yielded chunks
// but discards the delegated generator's final return value.

Cancellation runs generator and middleware cleanup

const order: string[] = []

cli.use(async (_c, next) => {
  order.push('mw:before')
  await next()
  order.push('mw:after')
})

cli.command('stream', {
  async *run() {
    try {
      order.push('stream:yield')
      yield { progress: 1 }
      while (true) yield { progress: 2 }
    } finally {
      order.push('stream:finally')
    }
  },
})

const res = await cli.fetch(new Request('http://localhost/stream'))
const reader = res.body!.getReader()
await reader.read()
await reader.cancel()

expect(order).toEqual([
  'mw:before',
  'stream:yield',
  'stream:finally',
  'mw:after',
])

// Before: HTTP cancellation did not explicitly call iterator.return(),
// so generator cleanup and middleware after-hooks were not guaranteed.

Thrown stream errors include terminal metadata

cli.command('stream', {
  async *run() {
    yield { progress: 1 }
    throw new Errors.IncurError({
      code: 'RATE_LIMITED',
      message: 'too fast',
      retryable: true,
    })
  },
})

expect(await fetchNdjson(cli, new Request('http://localhost/stream')))
  .toMatchInlineSnapshot(`
    {
      "lines": [
        { "type": "chunk", "data": { "progress": 1 } },
        {
          "type": "error",
          "ok": false,
          "error": {
            "code": "RATE_LIMITED",
            "message": "too fast",
            "retryable": true
          },
          "meta": {
            "command": "stream",
            "duration": "<stripped>"
          }
        }
      ],
      "status": 200
    }
  `)

// Before: thrown HTTP stream errors were UNKNOWN and had no terminal meta.

Fix

HTTP streaming now drives the async iterator manually with iterator.next() so it can inspect the final done value. It formats returned c.ok() as terminal done, returned or yielded c.error() as terminal error, closes the stream after terminal errors, and calls iterator.return() on cancellation.

Middleware streaming now uses return yield* raw, preserving the wrapped generator's final return value.

CLI streaming machine-format errors also preserve IncurError.retryable, matching non-stream command errors.

Copy link
Copy Markdown
Owner Author

0xpolarzero commented May 26, 2026

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more

This stack of pull requests is managed by Graphite. Learn more about stacking.

@0xpolarzero 0xpolarzero force-pushed the stream-terminal-command-results branch from 225bea7 to 53fdfcf Compare May 26, 2026 17:41
@0xpolarzero 0xpolarzero marked this pull request as ready for review May 26, 2026 17:47
@0xpolarzero 0xpolarzero changed the base branch from parse-object-validation to graphite-base/12 May 26, 2026 18:11
@0xpolarzero 0xpolarzero changed the base branch from graphite-base/12 to main May 26, 2026 18:11
@0xpolarzero 0xpolarzero changed the base branch from main to parse-object-validation May 26, 2026 20:00
@0xpolarzero 0xpolarzero force-pushed the stream-terminal-command-results branch from 53fdfcf to 6be41bd Compare May 27, 2026 17:21
@0xpolarzero 0xpolarzero force-pushed the parse-object-validation branch from 3fd525c to 0217d7b Compare May 27, 2026 17:21
@0xpolarzero 0xpolarzero force-pushed the parse-object-validation branch from 0217d7b to 85d238a Compare May 27, 2026 18:50
@0xpolarzero 0xpolarzero force-pushed the stream-terminal-command-results branch from 3776dbe to ea85131 Compare May 27, 2026 18:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant