Skip to content

fix(cli): preserve stream terminal records#149

Open
0xpolarzero wants to merge 2 commits into
wevm:mainfrom
0xpolarzero:stream-terminal-command-results
Open

fix(cli): preserve stream terminal records#149
0xpolarzero wants to merge 2 commits into
wevm:mainfrom
0xpolarzero:stream-terminal-command-results

Conversation

@0xpolarzero
Copy link
Copy Markdown

@0xpolarzero 0xpolarzero commented May 26, 2026

Warning

This PR is stacked on top of #144 (parse-object-validation). Because wevm/incur does not have that base branch and this PR targets main, the diff includes the commits from that prerequisite branch as well.

Check the PR on my fork for reviewing from a stacked diff.

Overview

Fix HTTP NDJSON streaming so the terminal record matches how the async generator actually ended.

The bug was that HTTP streaming used for await...of, which can emit yielded chunks but cannot see the generator's final return value. Middleware also used yield* raw without returning it, so returned stream sentinels could disappear before HTTP saw them.

Behavior Covered

Returned c.ok() keeps terminal CTA metadata

cli.command('stream', {
  async *run(c) {
    yield { progress: 1 }
    return c.ok(
      { ignored: true },
      { cta: { commands: ['next'], description: 'Next steps:' } },
    )
  },
})

expect(await fetchNdjson(cli, new Request('http://localhost/stream')))
  .toMatchInlineSnapshot(`
    {
      "lines": [
        { "type": "chunk", "data": { "progress": 1 } },
        {
          "type": "done",
          "ok": true,
          "meta": {
            "command": "stream",
            "cta": {
              "commands": [{ "command": "test next" }],
              "description": "Next steps:"
            },
            "duration": "<stripped>"
          }
        }
      ],
      "status": 200
    }
  `)

// Before: final line was only:
// { "type": "done", "ok": true, "meta": { "command": "stream" } }
// CTA metadata from returned c.ok() was lost.

Returned c.error() becomes a terminal error

cli.command('stream', {
  async *run(c) {
    yield { progress: 1 }
    return c.error({
      code: 'STREAM_FAIL',
      message: 'failed late',
      retryable: true,
    })
  },
})

expect(await fetchNdjson(cli, new Request('http://localhost/stream')))
  .toMatchInlineSnapshot(`
    {
      "lines": [
        { "type": "chunk", "data": { "progress": 1 } },
        {
          "type": "error",
          "ok": false,
          "error": {
            "code": "STREAM_FAIL",
            "message": "failed late",
            "retryable": true
          },
          "meta": {
            "command": "stream",
            "duration": "<stripped>"
          }
        }
      ],
      "status": 200
    }
  `)

// Before: this ended as a successful done record.
// The returned c.error() was never inspected.

Yielded c.error() stops the stream and closes the generator

let closed = false

cli.command('stream', {
  async *run(c) {
    try {
      yield { progress: 1 }
      yield c.error({ code: 'STREAM_FAIL', message: 'failed now' })
      yield { progress: 2 }
    } finally {
      closed = true
    }
  },
})

const result = await fetchNdjson(cli, new Request('http://localhost/stream'))

expect(result.lines).toEqual([
  { type: 'chunk', data: { progress: 1 } },
  {
    type: 'error',
    ok: false,
    error: { code: 'STREAM_FAIL', message: 'failed now' },
    meta: { command: 'stream', duration: '<stripped>' },
  },
])
expect(closed).toBe(true)

// Before: yielded c.error() was serialized as chunk data,
// and later stream work could continue instead of stopping terminally.

Terminal-only streams work through middleware

const order: string[] = []

cli.use(async (c, next) => {
  order.push(`before:${c.command}`)
  await next()
  order.push(`after:${c.command}`)
})

sub.command('ok', {
  async *run(c) {
    return c.ok(
      { ignored: true },
      { cta: { commands: [{ command: 'next', description: 'Continue' }] } },
    )
  },
})

sub.command('fail', {
  async *run(c) {
    return c.error({
      code: 'EMPTY_FAIL',
      cta: { commands: ['retry'], description: 'Recover with:' },
      message: 'failed before chunks',
      retryable: true,
    })
  },
})

expect(await fetchNdjson(cli, new Request('http://localhost/ops/ok')))
  .toMatchObject({
    lines: [{ type: 'done', ok: true, meta: { command: 'ops ok' } }],
  })

expect(await fetchNdjson(cli, new Request('http://localhost/ops/fail')))
  .toMatchObject({
    lines: [{
      type: 'error',
      ok: false,
      error: { code: 'EMPTY_FAIL', retryable: true },
      meta: { command: 'ops fail' },
    }],
  })

expect(order).toEqual([
  'before:ops ok',
  'after:ops ok',
  'before:ops fail',
  'after:ops fail',
])

// Before: middleware used `yield* raw`, which forwards yielded chunks
// but discards the delegated generator's final return value.

Cancellation runs generator and middleware cleanup

const order: string[] = []

cli.use(async (_c, next) => {
  order.push('mw:before')
  await next()
  order.push('mw:after')
})

cli.command('stream', {
  async *run() {
    try {
      order.push('stream:yield')
      yield { progress: 1 }
      while (true) yield { progress: 2 }
    } finally {
      order.push('stream:finally')
    }
  },
})

const res = await cli.fetch(new Request('http://localhost/stream'))
const reader = res.body!.getReader()
await reader.read()
await reader.cancel()

expect(order).toEqual([
  'mw:before',
  'stream:yield',
  'stream:finally',
  'mw:after',
])

// Before: HTTP cancellation did not explicitly call iterator.return(),
// so generator cleanup and middleware after-hooks were not guaranteed.

Thrown stream errors include terminal metadata

cli.command('stream', {
  async *run() {
    yield { progress: 1 }
    throw new Errors.IncurError({
      code: 'RATE_LIMITED',
      message: 'too fast',
      retryable: true,
    })
  },
})

expect(await fetchNdjson(cli, new Request('http://localhost/stream')))
  .toMatchInlineSnapshot(`
    {
      "lines": [
        { "type": "chunk", "data": { "progress": 1 } },
        {
          "type": "error",
          "ok": false,
          "error": {
            "code": "RATE_LIMITED",
            "message": "too fast",
            "retryable": true
          },
          "meta": {
            "command": "stream",
            "duration": "<stripped>"
          }
        }
      ],
      "status": 200
    }
  `)

// Before: thrown HTTP stream errors were UNKNOWN and had no terminal meta.

Fix

HTTP streaming now drives the async iterator manually with iterator.next() so it can inspect the final done value. It formats returned c.ok() as terminal done, returned or yielded c.error() as terminal error, closes the stream after terminal errors, and calls iterator.return() on cancellation.

Middleware streaming now uses return yield* raw, preserving the wrapped generator's final return value.

CLI streaming machine-format errors also preserve IncurError.retryable, matching non-stream command errors.

@0xpolarzero 0xpolarzero marked this pull request as ready for review May 26, 2026 17:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant