From ac6191b516c7881e06dae4532c37c9da71ce56fe Mon Sep 17 00:00:00 2001 From: nullkey Date: Fri, 15 May 2026 13:14:52 +0800 Subject: [PATCH 1/3] feat(ndjson): emit video_url/duration_ms/retryable on terminal events (0.6.3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reconcile the long-standing gap between events.md (aspirational) and the real NDJSON wire shape (drift). The events.md draft has documented these fields since the schema was first written, but the CLI was dropping them from backend `aim_result` and `error` SSE payloads — agent consumers had no way to retrieve the rendered video URL from the terminal event. Code - StreamEvent gains VideoURL, DurationMs, Retryable. stream.go pulls the URL from html_path (v=3 pipeline) or text (v=2 agent) and reads duration_ms from the v=3 metadata bag. Retryable is inferred from the code label via httpclient.IsRetryableCode since the backend emits no retryable flag of its own. - New StreamEvent.NDJSONFields() helper produces the canonical wire shape. `vk create` and `vk video wait` both delegate to it so the two stream-consuming entry points stay in sync (they previously diverged: wait emitted stage/node on every event regardless of type and never emitted code/retryable/video_url). - Exit codes: retryable task failures now exit 4 (previously always 5). `vk video wait` gained the same script_invalid→2 / retryable→4 parity already in `vk create`. Docs - events.md rewritten against the actual taxonomy. Removed five documented-but-never-emitted events. Added engine-difference section. Schema version stays "1"; node.*→stage.* rename and code→error_code rename are reserved for a future "2" bump. Tests - httpclient.IsRetryableCode unit table (transient vs permanent vs unknown). - stream_test: aim_result v=3 carries video_url+duration_ms; aim_result v=2 falls back to text and omits duration_ms; plain `error` SSE marks retryable=false; concurrent_work_limit marks retryable=true. - event_ndjson_test: helper omits absent optional fields and always emits retryable on task.failed. - New integration test runs the real binary with mock figlens and asserts the on-the-wire NDJSON shape end-to-end, plus the exit-4 path for concurrent_work_limit. Real-backend smoke deliberately skipped this round to avoid burning credits on an already-validated payload-shape change; the integration test exercises the same wire path end-to-end through the binary. --- CHANGELOG.md | 46 +++++++ client/figlens/event_ndjson.go | 49 +++++++ client/figlens/event_ndjson_test.go | 66 +++++++++ client/figlens/stream.go | 58 +++++++- client/figlens/stream_test.go | 126 +++++++++++++++++ cmd/create.go | 28 ++-- cmd/video/wait.go | 30 ++-- internal/httpclient/errors.go | 17 +++ internal/httpclient/errors_test.go | 28 ++++ package.json | 2 +- skills/vibeknow-core/SKILL.md | 2 +- skills/vibeknow-create/SKILL.md | 23 ++-- skills/vibeknow-create/references/events.md | 136 +++++++++--------- skills/vibeknow-doc/SKILL.md | 2 +- tests/integration/create_ndjson_test.go | 145 ++++++++++++++++++++ 15 files changed, 644 insertions(+), 114 deletions(-) create mode 100644 client/figlens/event_ndjson.go create mode 100644 client/figlens/event_ndjson_test.go create mode 100644 tests/integration/create_ndjson_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index d450d7a..dac51da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,51 @@ # Changelog +## 0.6.3 — 2026-05-15 + +### Fixed (NDJSON terminal events were missing result data) + +- `task.succeeded` events emitted by `vk create --output ndjson` and + `vk video wait --output ndjson` now include `video_url` (and, on the + pipeline engine, `duration_ms`). Previously the CLI dropped these + fields from the backend `aim_result` SSE payload and forwarded only + `session_id`, so any NDJSON consumer (agent or shell script) had no + way to retrieve the rendered video URL from the terminal event + without making a separate `vk video status` follow-up call. +- `task.failed` events now include a `retryable` boolean derived on + the CLI side from the error code (transient codes — `rate_limited`, + `internal_error`, `concurrent_work_limit` — map to `retryable=true`; + permanent codes like `insufficient_credits` and `script_invalid` + map to `retryable=false`). The backend's terminal `error` event + carries no retryable flag of its own, so the CLI is the source of + truth for this signal. +- `vk create` exit codes now honor the `retryable` flag: transient + task failures exit **4** (previously always 5). `vk video wait` + gained the same exit-code mapping plus `script_invalid → exit 2` + parity with `vk create`. Shell scripts that previously hard-coded + `[ $? = 5 ]` to detect any task failure should switch to + `[ $? -ge 4 ] && [ $? -le 5 ]` or branch on the NDJSON `retryable` + field directly. + +### Docs + +- `skills/vibeknow-create/references/events.md` rewritten against + the actual emitted event taxonomy. Previously documented but + never-emitted events (`task.submitted`, `task.queued`, `stage.*`, + `task.cancelled`) removed. Engine-difference section added to make + the pipeline-vs-agent split explicit. Schema version stays `"1"`; + a `"2"` bump is reserved for renaming `node.*` → `stage.*` or + `code` → `error_code`, neither of which ships in this release. + +### Internal + +- New `figlens.StreamEvent.NDJSONFields()` helper produces the wire + shape both `vk create` and `vk video wait` emit. The two commands + previously diverged in subtle ways (wait emitted `stage` / `node` + on every event regardless of type and never emitted `code`, + `retryable`, `video_url`); they now share one canonical projection. +- New `httpclient.IsRetryableCode(code)` centralizes the retryable + inference so the SSE path no longer needs an ad-hoc switch. + ## 0.6.2 — 2026-05-14 ### Fixed diff --git a/client/figlens/event_ndjson.go b/client/figlens/event_ndjson.go new file mode 100644 index 0000000..3667161 --- /dev/null +++ b/client/figlens/event_ndjson.go @@ -0,0 +1,49 @@ +package figlens + +// NDJSONFields renders the event into the canonical wire-shape map that +// `vk create` and `vk video wait` both emit on `--output ndjson`. The two +// commands deliberately use the same projection so a downstream agent +// can consume either stream interchangeably. +// +// Optional fields are omitted (not zero-emitted) so consumers can rely +// on presence implying a real value — useful in particular for +// `duration_ms`, which the agent engine simply does not produce. +func (e StreamEvent) NDJSONFields() map[string]any { + switch e.Type { + case "node.started", "node.succeeded", "node.failed": + return map[string]any{ + "type": e.Type, + "stage": e.Stage, + "node": e.Node, + "message": e.Message, + } + case "node.progress": + return map[string]any{ + "type": e.Type, + "status": e.Status, + "message": e.Message, + } + case "task.succeeded": + out := map[string]any{ + "type": e.Type, + "session_id": e.SessionID, + } + if e.VideoURL != "" { + out["video_url"] = e.VideoURL + } + if e.DurationMs > 0 { + out["duration_ms"] = e.DurationMs + } + return out + case "task.failed": + return map[string]any{ + "type": e.Type, + "code": e.Code, + "message": e.Message, + "retryable": e.Retryable, + } + } + // Unknown event types are passed through with just the type so + // future backend additions don't crash existing consumers. + return map[string]any{"type": e.Type} +} diff --git a/client/figlens/event_ndjson_test.go b/client/figlens/event_ndjson_test.go new file mode 100644 index 0000000..2fc5882 --- /dev/null +++ b/client/figlens/event_ndjson_test.go @@ -0,0 +1,66 @@ +package figlens_test + +import ( + "reflect" + "testing" + + "github.com/vibeknow/cli/client/figlens" +) + +func TestNDJSONFields_TaskSucceededOmitsAbsentFields(t *testing.T) { + // Agent engine has no duration_ms — must be omitted, not zero-emitted, + // so a consumer doing `if "duration_ms" in event` works. + ev := figlens.StreamEvent{Type: "task.succeeded", SessionID: "s", VideoURL: "https://x"} + got := ev.NDJSONFields() + if _, has := got["duration_ms"]; has { + t.Fatalf("duration_ms must be omitted when zero, got %v", got["duration_ms"]) + } + if got["video_url"] != "https://x" { + t.Fatalf("video_url = %v, want https://x", got["video_url"]) + } +} + +func TestNDJSONFields_TaskSucceededIncludesPresentFields(t *testing.T) { + ev := figlens.StreamEvent{ + Type: "task.succeeded", SessionID: "s", + VideoURL: "https://x", DurationMs: 12345, + } + got := ev.NDJSONFields() + want := map[string]any{ + "type": "task.succeeded", + "session_id": "s", + "video_url": "https://x", + "duration_ms": int64(12345), + } + if !reflect.DeepEqual(got, want) { + t.Fatalf("got %v, want %v", got, want) + } +} + +func TestNDJSONFields_TaskFailedAlwaysCarriesRetryable(t *testing.T) { + // retryable must be present on every task.failed event — consumers + // branch exit-code 4 vs 5 on it, so omission would silently change + // behavior. + ev := figlens.StreamEvent{Type: "task.failed", Code: "rate_limited", Message: "slow down", Retryable: true} + got := ev.NDJSONFields() + if got["retryable"] != true { + t.Fatalf("retryable = %v, want true", got["retryable"]) + } + if got["code"] != "rate_limited" { + t.Fatalf("code = %v", got["code"]) + } +} + +func TestNDJSONFields_NodeProgressUsesStatusNotStage(t *testing.T) { + // node.progress is the agent-engine free-form event; it carries + // `status` (start/success/error), not stage/node which are pipeline + // concepts. + ev := figlens.StreamEvent{Type: "node.progress", Status: "start", Message: "calling KB"} + got := ev.NDJSONFields() + if _, has := got["stage"]; has { + t.Fatalf("node.progress must not carry stage/node, got %v", got) + } + if got["status"] != "start" { + t.Fatalf("status = %v, want start", got["status"]) + } +} diff --git a/client/figlens/stream.go b/client/figlens/stream.go index a836ef2..ef1966c 100644 --- a/client/figlens/stream.go +++ b/client/figlens/stream.go @@ -33,6 +33,16 @@ type StreamEvent struct { Node string Message string SessionID string + // Set on task.succeeded. VideoURL is the playable HTML URL (v=3 + // `html_path`, v=2 `text`). DurationMs is the rendered video length + // in milliseconds; backend only emits it on v=3 (in `data.duration_ms`), + // so v=2 task.succeeded events carry DurationMs=0. + VideoURL string + DurationMs int64 + // Set on task.failed. Derived from Code via httpclient.IsRetryableCode + // because the backend's terminal SSE error event carries no retryable + // flag of its own. + Retryable bool } type ssePayload struct { @@ -45,6 +55,18 @@ type sseData struct { Log json.RawMessage `json:"log,omitempty"` SessionID string `json:"session_id,omitempty"` Message string `json:"message,omitempty"` + // aim_result terminal-event fields. The two engines disagree on + // where the playable URL lives — see resolveVideoURL. + HtmlPath string `json:"html_path,omitempty"` // v=3 pipeline + Text string `json:"text,omitempty"` // v=2 agent (also free text on v=3, ignored there) + DataMap json.RawMessage `json:"data,omitempty"` // v=3 metadata bag, contains duration_ms etc. +} + +// aimResultData captures the v=3 metadata bag. Other fields exist +// (themeId, fps, coverUrl, scenes, …) but the CLI only forwards +// duration_ms today — additions are cheap when consumers need them. +type aimResultData struct { + DurationMs int64 `json:"duration_ms"` } type processLog struct { @@ -62,6 +84,22 @@ func mapSSECode(code int) string { return "business_error" } +// resolveVideoURL extracts the playable HTML URL from an aim_result payload. +// The two engines disagree on which field holds it: +// - v=3 pipeline: `html_path` (preferred — engine emits a structured URL). +// - v=2 agent: `text` (the agent only has one free-form result field; +// it puts the HTML URL there directly). +// +// Picking HtmlPath first means v=3 wins cleanly when both happen to be set, +// which they shouldn't be — but we defend against the cross-engine drift +// case rather than assume the backend stays in its lane. +func resolveVideoURL(d sseData) string { + if d.HtmlPath != "" { + return d.HtmlPath + } + return d.Text +} + func (c *Client) StreamChat(ctx context.Context, params StreamParams, onEvent func(StreamEvent)) error { resp, err := c.http.DoRaw(ctx, "POST", params.Engine.StreamPath(), params) if err != nil { @@ -101,10 +139,12 @@ func (c *Client) StreamChat(ctx context.Context, params StreamParams, onEvent fu msg = d.Message } } + code := mapSSECode(payload.Code) onEvent(StreamEvent{ - Type: "task.failed", - Code: mapSSECode(payload.Code), - Message: msg, + Type: "task.failed", + Code: code, + Message: msg, + Retryable: httpclient.IsRetryableCode(code), }) return nil } @@ -154,13 +194,23 @@ func (c *Client) StreamChat(ctx context.Context, params StreamParams, onEvent fu } case "aim_result": - onEvent(StreamEvent{Type: "task.succeeded", SessionID: d.SessionID}) + ev := StreamEvent{Type: "task.succeeded", SessionID: d.SessionID} + ev.VideoURL = resolveVideoURL(d) + if len(d.DataMap) > 0 { + var ar aimResultData + if json.Unmarshal(d.DataMap, &ar) == nil { + ev.DurationMs = ar.DurationMs + } + } + onEvent(ev) case "error", "ERROR": msg := d.Message if msg == "" { msg = string(payload.Data) } + // Plain `error` SSE never carries a code; Retryable defaults + // to false so consumers do not silently re-run unbounded. onEvent(StreamEvent{Type: "task.failed", Message: msg}) } } diff --git a/client/figlens/stream_test.go b/client/figlens/stream_test.go index 1f9d5b4..a5386e8 100644 --- a/client/figlens/stream_test.go +++ b/client/figlens/stream_test.go @@ -140,6 +140,132 @@ func TestStreamChat_ScriptInvalidCode(t *testing.T) { if events[0].Message != "讲稿超过 8000 字" { t.Fatalf("expected backend message verbatim, got %q", events[0].Message) } + if events[0].Retryable { + t.Fatalf("script_invalid must not be retryable (it's a permanent input error)") + } +} + +// concurrent_work_limit is the canonical transient business code: same +// request can succeed once the user's other tasks finish, so the CLI must +// surface Retryable=true here. +func TestStreamChat_ConcurrentLimitIsRetryable(t *testing.T) { + sseBody := `data: {"code":100003,"data":{"message":"too many concurrent works"}} + +` + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + fmt.Fprint(w, sseBody) + })) + defer srv.Close() + + c := figlens.New(srv.URL, staticToken("tok")) + var events []figlens.StreamEvent + err := c.StreamChat(context.Background(), figlens.StreamParams{TaskID: 1, SessionID: "s"}, func(ev figlens.StreamEvent) { + events = append(events, ev) + }) + if err != nil { + t.Fatalf("StreamChat: %v", err) + } + if len(events) == 0 || events[0].Code != "concurrent_work_limit" { + t.Fatalf("expected Code=concurrent_work_limit, got %+v", events) + } + if !events[0].Retryable { + t.Fatalf("concurrent_work_limit must be retryable") + } +} + +// Plain `error` SSE (no envelope code) is the v=2 agent failure shape. +// Backend sends no retryable flag and no code, so the CLI must default +// Retryable=false rather than guess true. +func TestStreamChat_PlainErrorEventIsNotRetryable(t *testing.T) { + sseBody := `data: {"code":200,"data":{"type":"error","message":"agent crashed"}} + +` + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + fmt.Fprint(w, sseBody) + })) + defer srv.Close() + + c := figlens.New(srv.URL, staticToken("tok")) + var events []figlens.StreamEvent + err := c.StreamChat(context.Background(), figlens.StreamParams{TaskID: 1, SessionID: "s"}, func(ev figlens.StreamEvent) { + events = append(events, ev) + }) + if err != nil { + t.Fatalf("StreamChat: %v", err) + } + if len(events) == 0 || events[0].Type != "task.failed" { + t.Fatalf("expected task.failed, got %v", events) + } + if events[0].Retryable { + t.Fatalf("plain error SSE has no code — must not be marked retryable") + } +} + +// aim_result on v=3 pipeline: video URL in `html_path`, duration_ms in +// the metadata bag. Both must surface on the task.succeeded event so +// agent consumers can act on the result. +func TestStreamChat_AimResultV3HasVideoURLAndDuration(t *testing.T) { + sseBody := `data: {"code":200,"data":{"type":"aim_result","session_id":"s_v3","html_path":"https://cdn.example.com/v/s_v3.html","data":{"duration_ms":42500,"fps":30}}} + +data: [DONE] + +` + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + fmt.Fprint(w, sseBody) + })) + defer srv.Close() + + c := figlens.New(srv.URL, staticToken("tok")) + var events []figlens.StreamEvent + err := c.StreamChat(context.Background(), figlens.StreamParams{TaskID: 1, SessionID: "s_v3"}, func(ev figlens.StreamEvent) { + events = append(events, ev) + }) + if err != nil { + t.Fatalf("StreamChat: %v", err) + } + if len(events) == 0 || events[0].Type != "task.succeeded" { + t.Fatalf("expected task.succeeded, got %+v", events) + } + if events[0].VideoURL != "https://cdn.example.com/v/s_v3.html" { + t.Fatalf("VideoURL = %q, want html_path value", events[0].VideoURL) + } + if events[0].DurationMs != 42500 { + t.Fatalf("DurationMs = %d, want 42500", events[0].DurationMs) + } +} + +// aim_result on v=2 agent: video URL lives in `text`; no duration_ms. +// Falling back to text — and not promising a duration we don't have — +// is the contract this case pins. +func TestStreamChat_AimResultV2UsesTextForVideoURL(t *testing.T) { + sseBody := `data: {"code":200,"data":{"type":"aim_result","session_id":"s_v2","text":"https://cdn.example.com/v/s_v2.html"}} + +data: [DONE] + +` + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + fmt.Fprint(w, sseBody) + })) + defer srv.Close() + + c := figlens.New(srv.URL, staticToken("tok")) + var events []figlens.StreamEvent + err := c.StreamChat(context.Background(), figlens.StreamParams{TaskID: 1, SessionID: "s_v2"}, func(ev figlens.StreamEvent) { + events = append(events, ev) + }) + if err != nil { + t.Fatalf("StreamChat: %v", err) + } + if len(events) == 0 || events[0].VideoURL != "https://cdn.example.com/v/s_v2.html" { + t.Fatalf("expected VideoURL from text field, got %+v", events) + } + if events[0].DurationMs != 0 { + t.Fatalf("v=2 has no duration_ms — DurationMs must be 0, got %d", events[0].DurationMs) + } } func TestStreamChat_AgentEngineUsesAgent2Path(t *testing.T) { diff --git a/cmd/create.go b/cmd/create.go index abb41f1..0aa7ded 100644 --- a/cmd/create.go +++ b/cmd/create.go @@ -223,9 +223,7 @@ var createCmd = &cobra.Command{ switch ev.Type { case "node.started", "node.succeeded", "node.failed": if isNDJSONCreate { - _ = output.NewNDJSON(cmd.OutOrStdout()).Event(map[string]any{ - "type": ev.Type, "stage": ev.Stage, "node": ev.Node, "message": ev.Message, - }) + _ = output.NewNDJSON(cmd.OutOrStdout()).Event(ev.NDJSONFields()) } else { switch ev.Type { case "node.started": @@ -238,11 +236,7 @@ var createCmd = &cobra.Command{ } case "node.progress": if isNDJSONCreate { - _ = output.NewNDJSON(cmd.OutOrStdout()).Event(map[string]any{ - "type": "node.progress", - "status": ev.Status, - "message": ev.Message, - }) + _ = output.NewNDJSON(cmd.OutOrStdout()).Event(ev.NDJSONFields()) } else { // [agent] prefix keeps output scannable alongside v=3's [] lines. fmt.Fprintf(os.Stderr, "[agent] %s\n", ev.Message) @@ -253,23 +247,21 @@ var createCmd = &cobra.Command{ successSessionID = task.SessionID } if isNDJSONCreate { - _ = output.NewNDJSON(cmd.OutOrStdout()).Event(map[string]any{ - "type": "task.succeeded", "session_id": ev.SessionID, - }) + _ = output.NewNDJSON(cmd.OutOrStdout()).Event(ev.NDJSONFields()) } else { fmt.Fprintln(os.Stderr, i18n.T("create.task_succeeded")) } case "task.failed": - failExitCode = 5 - if ev.Code == "script_invalid" { + switch { + case ev.Code == "script_invalid": failExitCode = 2 + case ev.Retryable: + failExitCode = 4 + default: + failExitCode = 5 } if isNDJSONCreate { - _ = output.NewNDJSON(cmd.OutOrStdout()).Event(map[string]any{ - "type": "task.failed", - "code": ev.Code, - "message": ev.Message, - }) + _ = output.NewNDJSON(cmd.OutOrStdout()).Event(ev.NDJSONFields()) } else { switch ev.Code { case "insufficient_credits": diff --git a/cmd/video/wait.go b/cmd/video/wait.go index af25e90..a93885a 100644 --- a/cmd/video/wait.go +++ b/cmd/video/wait.go @@ -54,18 +54,12 @@ var waitCmd = &cobra.Command{ var taskFailed, taskSucceeded bool var successSessionID string + var failedCode string + var failedRetryable bool + emit := func(ev figlens.StreamEvent) { if emitEvent != nil { - out := map[string]any{ - "type": ev.Type, - "stage": ev.Stage, - "node": ev.Node, - "message": ev.Message, - } - if ev.SessionID != "" { - out["session_id"] = ev.SessionID - } - _ = emitEvent(out) + _ = emitEvent(ev.NDJSONFields()) } else { switch ev.Type { case "node.started": @@ -74,6 +68,8 @@ var waitCmd = &cobra.Command{ fmt.Fprintf(stderr, "[%s] done\n", ev.Node) case "node.failed": fmt.Fprintf(stderr, "[%s] failed: %s\n", ev.Node, ev.Message) + case "node.progress": + fmt.Fprintf(stderr, "[agent] %s\n", ev.Message) case "task.succeeded": fmt.Fprintf(stderr, "task succeeded\n") case "task.failed": @@ -90,6 +86,8 @@ var waitCmd = &cobra.Command{ } case "task.failed": taskFailed = true + failedCode = ev.Code + failedRetryable = ev.Retryable } } @@ -102,7 +100,17 @@ var waitCmd = &cobra.Command{ return clerr.Newf("stream interrupted: %s", err).WithCode(6) } if taskFailed { - return clerr.Newf("task failed").WithCode(5) + // Mirror `vk create`: script_invalid → 2, retryable → 4, + // otherwise 5. Keeps exit codes consistent across the two + // stream-consuming entry points. + code := 5 + switch { + case failedCode == "script_invalid": + code = 2 + case failedRetryable: + code = 4 + } + return clerr.Newf("task failed").WithCode(code) } if !taskSucceeded { return nil diff --git a/internal/httpclient/errors.go b/internal/httpclient/errors.go index 37c8455..12f2c3a 100644 --- a/internal/httpclient/errors.go +++ b/internal/httpclient/errors.go @@ -94,6 +94,23 @@ const ( CodeAccountPendingDeletion = "account_pending_deletion" ) +// IsRetryableCode reports whether the operation that produced this code +// is likely to succeed if the caller retries the same request unchanged. +// Used by transports that cannot read a server-supplied retryable flag — +// in particular the SSE stream path, where the backend currently emits +// neither `code` nor `retryable` on its terminal `error` event, so the +// CLI must derive the answer from the code label alone. +// +// Codes not listed return false: the safer default is to not promise a +// retry will help when we cannot prove it. +func IsRetryableCode(code string) bool { + switch code { + case "rate_limited", "internal_error", "concurrent_work_limit": + return true + } + return false +} + // MapBusinessCode maps a 100xxx-range backend envelope code to a stable // CLI error code label. Returns ok=false for codes outside that range so // callers can fall back to their own mapping (e.g. HTTP-class for HTTP, diff --git a/internal/httpclient/errors_test.go b/internal/httpclient/errors_test.go index 471e818..4766c35 100644 --- a/internal/httpclient/errors_test.go +++ b/internal/httpclient/errors_test.go @@ -13,6 +13,34 @@ func TestErrObjectAsErrsObject(t *testing.T) { } } +func TestIsRetryableCode(t *testing.T) { + tests := []struct { + code string + want bool + }{ + // Transient: re-running the same request can succeed. + {"rate_limited", true}, + {"internal_error", true}, + {"concurrent_work_limit", true}, + // Permanent: user must change something first. + {"insufficient_credits", false}, + {"script_invalid", false}, + {"freeze_not_found", false}, + {"auth_required", false}, + {"business_error", false}, + // Empty / unknown: never promise retry will help. + {"", false}, + {"unknown_label", false}, + } + for _, tc := range tests { + t.Run(tc.code, func(t *testing.T) { + if got := IsRetryableCode(tc.code); got != tc.want { + t.Errorf("IsRetryableCode(%q) = %v, want %v", tc.code, got, tc.want) + } + }) + } +} + func TestMapEnvelopeCode(t *testing.T) { tests := []struct { name string diff --git a/package.json b/package.json index a1271c0..ed2793c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "vibeknow-cli", - "version": "0.6.2", + "version": "0.6.3", "description": "VibeKnow CLI — turn docs / URLs into videos", "license": "MIT", "bin": { diff --git a/skills/vibeknow-core/SKILL.md b/skills/vibeknow-core/SKILL.md index ee2fa33..31fa17e 100644 --- a/skills/vibeknow-core/SKILL.md +++ b/skills/vibeknow-core/SKILL.md @@ -1,6 +1,6 @@ --- name: vibeknow-core -version: 0.6.2 +version: 0.6.3 description: "vibeknow CLI setup, authentication, profile management, and diagnostics. Use when: first-time setup, auth errors, switching environments, diagnosing connection issues." metadata: requires: diff --git a/skills/vibeknow-create/SKILL.md b/skills/vibeknow-create/SKILL.md index b456cc9..57f6ccc 100644 --- a/skills/vibeknow-create/SKILL.md +++ b/skills/vibeknow-create/SKILL.md @@ -1,6 +1,6 @@ --- name: vibeknow-create -version: 0.6.2 +version: 0.6.3 description: "Generate videos from documents/URLs/files, track video task progress, download results, list voice templates. Use when: user wants to create a video, check task status, download video, or browse voices." metadata: requires: @@ -108,20 +108,21 @@ For detailed error handling and recovery, see [errors.md](references/errors.md) ## NDJSON Event Summary -Events share common fields: `schema_version`, `ts`, `event`, `task_id`. +Events share common fields: `schema_version`, `ts`, `type`. -Key events: +Key events (pipeline engine): | Event | Extra Fields | Meaning | |-------|-------------|---------| -| `task.submitted` | — | Task accepted | -| `stage.started` | `stage` | Pipeline stage begins | -| `stage.progress` | `stage`, `percent`, `message?` | Progress update | -| `stage.succeeded` | `stage`, `duration_ms` | Stage done | -| `task.succeeded` | `video_url`, `duration_ms` | **Terminal**: video ready | -| `task.failed` | `failed_stage`, `error_code`, `error_message`, `retryable` | **Terminal**: task failed | - -See [events.md](references/events.md) for the complete list (including `task.queued`, `stage.failed`, `task.cancelled`) and parsing examples. +| `node.started` | `stage`, `node`, `message` | Pipeline node begins | +| `node.succeeded` | `stage`, `node`, `message` | Node done | +| `node.failed` | `stage`, `node`, `message` | Node failed (not necessarily terminal — wait for `task.failed`) | +| `task.succeeded` | `session_id`, `video_url`, `duration_ms` | **Terminal**: video ready | +| `task.failed` | `code`, `message`, `retryable` | **Terminal**: task failed (`retryable=true` → exit 4, `false` → exit 5) | + +Agent engine (`--engine agent`) replaces `node.started/succeeded/failed` with `node.progress` carrying `status` + `message`, and omits `duration_ms` from `task.succeeded`. + +See [events.md](references/events.md) for the complete field reference, engine differences, and parsing examples. ## References diff --git a/skills/vibeknow-create/references/events.md b/skills/vibeknow-create/references/events.md index 71e29f2..9e88683 100644 --- a/skills/vibeknow-create/references/events.md +++ b/skills/vibeknow-create/references/events.md @@ -1,8 +1,10 @@ # NDJSON Task Event Schema -Schema version: `"1"` (see main spec §11.1 for canonical definition). +Schema version: `"1"`. -Produced by: `vibeknow create` (sync mode, non-TTY or `--output ndjson`) and `vibeknow video wait`. +Produced by: `vibeknow create --output ndjson` and (for already-submitted tasks) `vibeknow video wait --output ndjson`. + +This document describes the events the CLI **actually emits today**. The earlier draft of this file documented several events (`task.submitted`, `task.queued`, `stage.*`, `task.cancelled`) that the backend has never produced — they were aspirational and have been removed. A future schema version may reintroduce them once the backend starts emitting them; consumers should ignore unknown event types. ## Common Fields @@ -11,123 +13,123 @@ Every event line contains: | Field | Type | Required | Description | |-------|------|----------|-------------| | `schema_version` | string | yes | Always `"1"` | -| `ts` | string (RFC3339) | yes | Server-side event timestamp | -| `event` | string (enum) | yes | Event type (see below) | -| `task_id` | string | yes | Task identifier | +| `ts` | string (RFC3339Nano) | yes | Client-side event timestamp (CLI clock) | +| `type` | string (enum) | yes | Event type (see below). Wire field is `type`, not `event`. | -## Event Types +The CLI also stamps event-specific fields documented below. -### task.submitted +## Engine differences -Task accepted by the backend. No extra fields. +The backend offers two engines selectable via `--engine`: -```json -{"schema_version":"1","ts":"2026-04-15T10:00:00Z","event":"task.submitted","task_id":"t_abc123"} -``` +- **`pipeline`** (default, v=3): structured stage graph. Emits `node.started` / `node.succeeded` / `node.failed` for each named pipeline node. Terminal `task.succeeded` carries `video_url` *and* `duration_ms`. +- **`agent`** (v=2): free-form agent loop, no stage graph. Emits `node.progress` lines instead of `node.*` lifecycle events. Terminal `task.succeeded` carries `video_url` but **not** `duration_ms` (the backend does not include it in the agent-mode aim_result payload). -### task.queued +Consumers that need cross-engine compatibility should treat `duration_ms` as optional. -Task is waiting in queue. - -| Field | Type | Description | -|-------|------|-------------| -| `queue_position` | int (optional) | Position in queue | +## Event Types -### stage.started +### node.started -A pipeline stage has begun. +A pipeline node has begun (pipeline engine only). | Field | Type | Description | |-------|------|-------------| -| `stage` | string | One of: `parse`, `outline`, `storyboard`, `tts`, `render`, `publish` | +| `stage` | string | High-level stage bucket (e.g. `parse`, `outline`, `storyboard`, `tts`, `render`, `publish`) | +| `node` | string | Specific node display name within the stage | +| `message` | string | Backend-supplied status message (may be empty) | -### stage.progress +```json +{"schema_version":"1","ts":"2026-05-15T10:00:01.234Z","type":"node.started","stage":"parse","node":"prepare","message":"开始解析"} +``` -Progress update within a stage. Emitted at least once every 5 seconds. +### node.succeeded -| Field | Type | Description | -|-------|------|-------------| -| `stage` | string | Current stage | -| `percent` | int (0-100) | Completion percentage | -| `message` | string (optional) | Human-readable status | - -### stage.succeeded +A pipeline node completed (pipeline engine only). Same fields as `node.started`. -A pipeline stage completed successfully. +### node.failed -| Field | Type | Description | -|-------|------|-------------| -| `stage` | string | Completed stage | -| `duration_ms` | int | Stage duration in milliseconds | +A pipeline node failed (pipeline engine only). Same fields as `node.started`. A non-fatal node failure does **not** by itself end the stream — wait for the terminal `task.failed`. There is currently no `fatal` or `retryable` field on this event. -### stage.failed +### node.progress -A pipeline stage failed. +Free-form progress message (agent engine only). The agent does not have a named stage graph, so progress is reported as a status string + human-readable message. | Field | Type | Description | |-------|------|-------------| -| `stage` | string | Failed stage | -| `error_code` | string | Error code | -| `error_message` | string | Human-readable error | -| `retryable` | bool | Whether the failure is retryable | -| `fatal` | bool | If `true`, `task.failed` will follow. If `false`, pipeline may continue. | +| `status` | string | `"start"` / `"success"` / `"error"` | +| `message` | string | Human-readable description of the current step | + +```json +{"schema_version":"1","ts":"2026-05-15T10:00:01.234Z","type":"node.progress","status":"start","message":"正在调用知识库..."} +``` ### task.succeeded (**terminal**) -Video generation completed. +Video generation completed. Always the last event emitted on success. | Field | Type | Description | |-------|------|-------------| -| `video_url` | string | URL to download the video | -| `thumbnail_url` | string (optional) | Thumbnail image URL | -| `duration_ms` | int | Total task duration in milliseconds | +| `session_id` | string | Session that produced the video (use with `vk video download`) | +| `video_url` | string (optional) | Playable HTML URL. Omitted if the backend did not include one. | +| `duration_ms` | integer (optional) | Rendered video length in milliseconds. **Pipeline engine only** — omitted on agent engine. | ```json -{"schema_version":"1","ts":"2026-04-15T10:05:00Z","event":"task.succeeded","task_id":"t_abc123","video_url":"https://cdn.example.com/v/t_abc123.mp4","duration_ms":300000} +{"schema_version":"1","ts":"2026-05-15T10:05:00.000Z","type":"task.succeeded","session_id":"s_abc","video_url":"https://cdn.example.com/v/s_abc.html","duration_ms":42500} ``` ### task.failed (**terminal**) -Task failed permanently. +Task failed permanently within the stream. Always the last event emitted on failure. | Field | Type | Description | |-------|------|-------------| -| `failed_stage` | string | Stage that caused the failure | -| `error_code` | string | Error code | -| `error_message` | string | Human-readable error | -| `retryable` | bool | If `true` → exit code 4 (retry). If `false` → exit code 5 (give up). | +| `code` | string | Stable error code (e.g. `insufficient_credits`, `script_invalid`, `concurrent_work_limit`, `rate_limited`, `internal_error`, `business_error`). Empty when the backend emitted a plain `error` SSE event with no envelope code. | +| `message` | string | Human-readable failure reason from the backend | +| `retryable` | boolean | `true` if re-running the same `create` command is likely to succeed (transient codes: `rate_limited`, `internal_error`, `concurrent_work_limit`). `false` for permanent codes (e.g. `script_invalid`, `insufficient_credits`) and for any failure where the CLI cannot prove a retry would help. Determines exit code: `true` → exit 4, `false` → exit 5. | -### task.cancelled (**terminal**) +The `retryable` flag is **derived on the CLI side** from `code` — the backend does not currently include one on its terminal error event. The mapping is intentionally conservative: when in doubt, `retryable` is `false`. -Task was cancelled. - -| Field | Type | Description | -|-------|------|-------------| -| `cancelled_by` | string | Who cancelled | +```json +{"schema_version":"1","ts":"2026-05-15T10:05:00.000Z","type":"task.failed","code":"insufficient_credits","message":"积分不足","retryable":false} +``` ## Terminal Events -After receiving `task.succeeded`, `task.failed`, or `task.cancelled`, the CLI closes the stream and exits. No more events will follow. +After receiving `task.succeeded` or `task.failed`, the CLI closes the stream and exits. No more events will follow. + +`task.cancelled` is not currently emitted — SIGINT terminates the process with exit 130 without producing a terminal NDJSON event. ## Parsing Example (bash + jq) ```bash vibeknow create --from doc.pdf --output ndjson | while IFS= read -r line; do - event=$(echo "$line" | jq -r '.event') - case "$event" in + type=$(echo "$line" | jq -r '.type') + case "$type" in task.succeeded) - url=$(echo "$line" | jq -r '.video_url') - echo "Video ready: $url" + url=$(echo "$line" | jq -r '.video_url // empty') + [ -n "$url" ] && echo "Video ready: $url" ;; task.failed) - msg=$(echo "$line" | jq -r '.error_message') - echo "Failed: $msg" >&2 + msg=$(echo "$line" | jq -r '.message') + retryable=$(echo "$line" | jq -r '.retryable') + echo "Failed (retryable=$retryable): $msg" >&2 ;; - stage.progress) - pct=$(echo "$line" | jq -r '.percent') + node.started|node.succeeded|node.failed) stage=$(echo "$line" | jq -r '.stage') - echo "[$stage] $pct%" >&2 + node=$(echo "$line" | jq -r '.node') + echo "[$stage] $type: $node" >&2 + ;; + node.progress) + msg=$(echo "$line" | jq -r '.message') + echo "[agent] $msg" >&2 ;; esac done ``` + +## Field Stability + +`type`, `schema_version`, `ts`, `session_id`, `code`, and `message` are stable: existing values won't be renamed within schema version `"1"`. New event types and new fields may be added without bumping the schema version — consumers must ignore unknown fields and unknown `type` values rather than treat them as errors. + +A future schema version bump (to `"2"`) will be reserved for breaking changes such as renaming `code` → `error_code` or replacing `node.*` with `stage.*`. diff --git a/skills/vibeknow-doc/SKILL.md b/skills/vibeknow-doc/SKILL.md index 195eaf9..588d69b 100644 --- a/skills/vibeknow-doc/SKILL.md +++ b/skills/vibeknow-doc/SKILL.md @@ -1,6 +1,6 @@ --- name: vibeknow-doc -version: 0.6.2 +version: 0.6.3 description: "Upload documents to vectoria and check processing status. Use when: user wants to upload a document, check if a document is ready, or get a doc_id for use with vibeknow create." metadata: requires: diff --git a/tests/integration/create_ndjson_test.go b/tests/integration/create_ndjson_test.go new file mode 100644 index 0000000..e4d2b56 --- /dev/null +++ b/tests/integration/create_ndjson_test.go @@ -0,0 +1,145 @@ +package integration + +import ( + "bufio" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +// Pins the end-to-end shape of `vk create --output ndjson` so the +// task.succeeded terminal event carries video_url + duration_ms from the +// backend aim_result payload all the way to stdout. Regression-prevents +// the long-standing bug where agent consumers could not get the video +// URL out of NDJSON because the CLI only forwarded session_id. +func TestCreateNDJSON_TaskSucceededIncludesVideoURLAndDuration(t *testing.T) { + if testing.Short() { + t.Skip("integration test") + } + + figlens := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/v1/tasks/init": + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]any{ + "code": 0, + "data": map[string]any{"task_id": 7, "session_id": "s_ndj", "work_id": 8, "v": 3}, + }) + case "/v1/agent3forVideo/stream": + w.Header().Set("Content-Type", "text/event-stream") + flusher, _ := w.(http.Flusher) + for _, e := range []string{ + `data: {"code":200,"data":{"type":"aim_result","session_id":"s_ndj","html_path":"https://cdn.example.com/v/s_ndj.html","data":{"duration_ms":30000}}}`, + `data: [DONE]`, + } { + fmt.Fprintln(w, e) + fmt.Fprintln(w) + if flusher != nil { + flusher.Flush() + } + } + case "/v1/works/detailBySession": + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]any{ + "code": 0, + "data": map[string]any{"id": 8, "title": "t", "video_path": "/x.mp4", "duration": 30, "share_token": "tok"}, + }) + default: + w.WriteHeader(404) + } + })) + defer figlens.Close() + + configHome := buildProfile(t, map[string]string{"figlens": figlens.URL}) + stdout, stderr, code := runVideoCmd(t, build(t), configHome, + "create", "--from", "doc_smoke12345", "--output", "ndjson") + + if code != 0 { + t.Fatalf("exit=%d stderr=%s", code, stderr) + } + + succeeded := findEvent(t, stdout, "task.succeeded") + if succeeded["video_url"] != "https://cdn.example.com/v/s_ndj.html" { + t.Errorf("video_url = %v, want backend html_path", succeeded["video_url"]) + } + if got, want := jsonNumberInt(t, succeeded["duration_ms"]), int64(30000); got != want { + t.Errorf("duration_ms = %d, want %d", got, want) + } +} + +// Pins task.failed shape: retryable must be present so consumers can +// branch on it, and the CLI must select exit 4 (retryable) instead of 5 +// when the backend code is one we map as transient. concurrent_work_limit +// (100003) is the production canary for that path. +func TestCreateNDJSON_TaskFailedRetryableExitsFour(t *testing.T) { + if testing.Short() { + t.Skip("integration test") + } + + figlens := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/v1/tasks/init": + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]any{ + "code": 0, + "data": map[string]any{"task_id": 9, "session_id": "s_retry", "work_id": 10, "v": 3}, + }) + case "/v1/agent3forVideo/stream": + w.Header().Set("Content-Type", "text/event-stream") + flusher, _ := w.(http.Flusher) + fmt.Fprintln(w, `data: {"code":100003,"data":{"message":"too many concurrent works"}}`) + fmt.Fprintln(w) + if flusher != nil { + flusher.Flush() + } + default: + w.WriteHeader(404) + } + })) + defer figlens.Close() + + configHome := buildProfile(t, map[string]string{"figlens": figlens.URL}) + stdout, stderr, code := runVideoCmd(t, build(t), configHome, + "create", "--from", "doc_smoke12345", "--output", "ndjson") + + if code != 4 { + t.Fatalf("exit=%d want 4 (retryable). stderr=%s", code, stderr) + } + + failed := findEvent(t, stdout, "task.failed") + if failed["code"] != "concurrent_work_limit" { + t.Errorf("code = %v, want concurrent_work_limit", failed["code"]) + } + if failed["retryable"] != true { + t.Errorf("retryable = %v, want true", failed["retryable"]) + } +} + +func findEvent(t *testing.T, stdout, typ string) map[string]any { + t.Helper() + sc := bufio.NewScanner(strings.NewReader(stdout)) + for sc.Scan() { + var m map[string]any + if err := json.Unmarshal(sc.Bytes(), &m); err != nil { + continue + } + if m["type"] == typ { + return m + } + } + t.Fatalf("event %q not found in stdout:\n%s", typ, stdout) + return nil +} + +// json.Unmarshal decodes numbers into float64 by default; cast safely. +func jsonNumberInt(t *testing.T, v any) int64 { + t.Helper() + f, ok := v.(float64) + if !ok { + t.Fatalf("expected numeric value, got %T (%v)", v, v) + } + return int64(f) +} From dd871b791696546256728de7a278d549894b7435 Mon Sep 17 00:00:00 2001 From: nullkey Date: Fri, 15 May 2026 13:44:27 +0800 Subject: [PATCH 2/3] feat(create): exit 4 + synthesized task.failed for pre-stream retryable errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the second half of the NDJSON/exit-code consistency gap surfaced by the 0.6.3 smoke. The SSE-side `task.failed` path already exits 4 on retryable codes after the prior commit, but `/v1/tasks/init` errors (InitTask) returning the same envelope code (`concurrent_work_limit`, `rate_limited`, `internal_error`) still exited 1 via cobra's default handler — same condition, different exit code is precisely the agent-confusing inconsistency the `retryable` flag exists to prevent. Also closes an NDJSON gap: pre-stream failures left stdout empty, forcing consumers to special-case "no terminal event implies it failed before the stream started". `vk create --output ndjson` now synthesizes one `task.failed` event on stdout for InitTask-time errors, sharing the wire shape with in-stream task.failed (via the same `code`/`message`/ `retryable` fields). The events.md note makes the synthesis explicit. Verified end-to-end against the beta backend: a real `concurrent_work_limit` returned by `/v1/tasks/init` now produces: - exit code 4 - stdout: `{"type":"task.failed","code":"concurrent_work_limit", "retryable":true,"message":"...","schema_version":"1","ts":"..."}` - stderr: clerr-rendered Chinese error line Integration test pins the contract with a httptest mux returning 100003 so the regression can never silently revert. Refs PR #9, follows the same retryable-inference taxonomy from httpclient.IsRetryableCode used by the SSE path. --- CHANGELOG.md | 18 +++++++ cmd/create.go | 54 +++++++++++++++++++++ skills/vibeknow-create/references/events.md | 2 +- tests/integration/create_credits_test.go | 48 ++++++++++++++++++ 4 files changed, 121 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dac51da..e218f7f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,24 @@ `[ $? = 5 ]` to detect any task failure should switch to `[ $? -ge 4 ] && [ $? -le 5 ]` or branch on the NDJSON `retryable` field directly. +- HTTP `/v1/tasks/init` errors now share the same retryable exit-code + mapping as in-stream `task.failed` events. Previously a backend + envelope code of `concurrent_work_limit` / `rate_limited` / + `internal_error` returned by InitTask exited **1** (cobra default), + while the same code surfaced mid-stream exited **4** — same + condition, different exit code is precisely the agent-confusing + inconsistency the `retryable` flag exists to prevent. After this + patch both paths exit 4 and emit identical `task.failed` NDJSON + events when `--output ndjson` is set. Verified end-to-end against + the beta backend: a real `concurrent_work_limit` at InitTask now + produces exit 4 + a structured terminal event with + `retryable: true`. +- `vk create --output ndjson` now synthesizes a terminal + `task.failed` event on stdout for **pre-stream** failures (InitTask + errors). Previously a pre-stream failure left stdout empty, + forcing NDJSON consumers to special-case "no terminal event implies + it failed before the stream started". Every CLI exit ≠ 0 in NDJSON + mode now ships exactly one terminal `task.failed` line on stdout. ### Docs diff --git a/cmd/create.go b/cmd/create.go index 0aa7ded..837be2d 100644 --- a/cmd/create.go +++ b/cmd/create.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "os" "regexp" "strings" @@ -18,6 +19,7 @@ import ( "github.com/vibeknow/cli/internal/clerr" "github.com/vibeknow/cli/internal/cmdutil" "github.com/vibeknow/cli/internal/errs" + "github.com/vibeknow/cli/internal/httpclient" "github.com/vibeknow/cli/internal/i18n" "github.com/vibeknow/cli/internal/output" "github.com/vibeknow/cli/internal/video/exportpoll" @@ -168,6 +170,18 @@ var createCmd = &cobra.Command{ } task, err = fc.InitTask(ctx, initParams) if err != nil { + // NDJSON consumers expect exactly one terminal task.failed event + // for every failure regardless of where in the pipeline it + // happened. Without this synthesis, a pre-stream InitTask + // failure would leave stdout empty and force consumers to + // special-case "no terminal event implies it failed before + // the stream started" — they would have to read stderr to + // classify the failure, defeating the point of NDJSON output. + format, _ := cmd.Flags().GetString("output") + if format == "ndjson" { + emitPreStreamFailure(cmd.OutOrStdout(), err) + } + if errs.HasCode(err, "insufficient_credits") { // Mirror the stream-side path's exit code: business failure → 5. // os.Exit skips defers — clean up the orphan kb inline first. @@ -186,6 +200,19 @@ var createCmd = &cobra.Command{ } return clerr.Validation(err.Error()) } + // Retryable codes (rate_limited, internal_error, + // concurrent_work_limit): exit 4 so agent consumers can branch + // on "same command will probably succeed if I just wait". The + // in-stream task.failed path already does this; without this + // branch, an InitTask-time concurrent_work_limit would exit 1 + // while the same code mid-stream would exit 4 — same condition, + // different exit code is the exact agent-confusing inconsistency + // the retryable flag exists to prevent. The deferred orphan-kb + // cleanup above fires on the return path. + var o *errs.Object + if errors.As(err, &o) && httpclient.IsRetryableCode(o.Code) { + return clerr.Newf("%s", o.Message).WithCode(4) + } return err } // Past this point, `task != nil` and the backend task owns the kb; @@ -484,6 +511,33 @@ func uploadURL(ctx context.Context, url string) (string, string, error) { return kbID, docID, nil } +// emitPreStreamFailure writes a synthetic task.failed NDJSON event for +// errors raised before the SSE stream opens (InitTask, future pre-stream +// hooks). The wire shape mirrors the in-stream task.failed event emitted +// by StreamEvent.NDJSONFields so consumers don't have to special-case +// where the failure happened: every CLI exit ≠ 0 in `--output ndjson` +// mode ships exactly one terminal task.failed line on stdout. +// +// Implementation deliberately mirrors NDJSONFields manually instead of +// constructing a fake StreamEvent — the SSE path is the source of truth +// for the stream-side shape, and faking events into it would be a foot +// gun if NDJSONFields gains divergent semantics. +func emitPreStreamFailure(w io.Writer, err error) { + code := "" + msg := err.Error() + var o *errs.Object + if errors.As(err, &o) { + code = o.Code + msg = o.Message + } + _ = output.NewNDJSON(w).Event(map[string]any{ + "type": "task.failed", + "code": code, + "message": msg, + "retryable": httpclient.IsRetryableCode(code), + }) +} + // cleanupOrphanKB best-effort deletes a kb the CLI created when the // backend never claimed it. Errors are swallowed: hygiene, not correctness. func cleanupOrphanKB(kbID string) { diff --git a/skills/vibeknow-create/references/events.md b/skills/vibeknow-create/references/events.md index 9e88683..b4f5b9f 100644 --- a/skills/vibeknow-create/references/events.md +++ b/skills/vibeknow-create/references/events.md @@ -80,7 +80,7 @@ Video generation completed. Always the last event emitted on success. ### task.failed (**terminal**) -Task failed permanently within the stream. Always the last event emitted on failure. +Task failed permanently. Always the last event emitted on failure — either inside the SSE stream (`stage.failed`-like conditions, backend `error` event, business-code SSE envelope) or **synthesized by the CLI for pre-stream failures** (e.g. when `/v1/tasks/init` rejects with a business code before the SSE stream opens). Consumers should treat both sources identically; the wire shape is the same. | Field | Type | Description | |-------|------|-------------| diff --git a/tests/integration/create_credits_test.go b/tests/integration/create_credits_test.go index 0c5fa6d..d90e48b 100644 --- a/tests/integration/create_credits_test.go +++ b/tests/integration/create_credits_test.go @@ -10,6 +10,54 @@ import ( "testing" ) +// TestCreate_ConcurrentLimitOnInit_Exits4 covers the 0.6.3 fix: when +// InitTask returns code 100003 (concurrent_work_limit, a transient +// "wait then retry" condition), the CLI must exit 4 (retryable) — same +// as the SSE stream-side handling of the same code. Before this fix, +// the HTTP path returned exit 1 (cobra default) while the SSE path +// returned exit 4, producing an agent-confusing inconsistency where +// the same condition gave different exit codes depending on whether +// the backend rejected at HTTP-init time vs mid-stream. +// +// Also pins the NDJSON pre-stream synthesis: `--output ndjson` consumers +// see a terminal task.failed event on stdout even when failure happens +// before the SSE stream opens. +func TestCreate_ConcurrentLimitOnInit_Exits4(t *testing.T) { + if testing.Short() { + t.Skip("integration test") + } + + mux := http.NewServeMux() + mux.HandleFunc("/v1/tasks/init", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + _ = json.NewEncoder(w).Encode(map[string]any{ + "code": 100003, + "message": "concurrent work limit reached", + }) + }) + srv := httptest.NewServer(mux) + defer srv.Close() + + bin := build(t) + configHome := buildProfile(t, map[string]string{"figlens": srv.URL}) + + stdout, stderr, code := runVideoCmd(t, bin, configHome, + "create", "--from", "doc_concurrentlimit12345", "--output", "ndjson", + ) + + if code != 4 { + t.Fatalf("exit code = %d, want 4 (retryable). stderr: %s", code, stderr) + } + failed := findEvent(t, stdout, "task.failed") + if failed["code"] != "concurrent_work_limit" { + t.Errorf("ndjson code = %v, want concurrent_work_limit", failed["code"]) + } + if failed["retryable"] != true { + t.Errorf("ndjson retryable = %v, want true", failed["retryable"]) + } +} + // TestCreate_InsufficientCreditsOnInit_Exits5 covers the bug fixed in 0.5.1: // when the backend rejects InitTask with envelope code 100001 (insufficient // credits), the CLI must exit 5 (business failure) to match the stream-side From 150fbd575cfe9353b5ba4f10af8c3856b69b3760 Mon Sep 17 00:00:00 2001 From: nullkey Date: Fri, 15 May 2026 13:52:47 +0800 Subject: [PATCH 3/3] test(video/wait): pin exit-code parity with vk create MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `vk video wait` shares the in-stream task.failed → exit code mapping with `vk create` (script_invalid → 2, retryable → 4, otherwise 5), but that mapping was previously untested for wait specifically — only the NDJSONFields helper had coverage. A regression that hard-coded wait back to "always 5" would have slipped through the suite. Two integration tests, mirroring the existing create-side coverage: - retryable code (100003 concurrent_work_limit) → exit 4 + ndjson task.failed with retryable=true - script_invalid (100004) → exit 2 Both run against an httptest figlens server that emits the SSE envelope the real backend produces. --- tests/integration/video_wait_test.go | 77 ++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 tests/integration/video_wait_test.go diff --git a/tests/integration/video_wait_test.go b/tests/integration/video_wait_test.go new file mode 100644 index 0000000..32780d4 --- /dev/null +++ b/tests/integration/video_wait_test.go @@ -0,0 +1,77 @@ +package integration + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" +) + +// Pins the exit-code parity between `vk video wait` and `vk create`: +// when the stream's terminal task.failed carries retryable=true (mapped +// from a transient backend code like concurrent_work_limit), `vk video +// wait` must exit 4 — not 5 — so a downstream agent can reuse the same +// retry policy regardless of which command consumed the stream. Before +// 0.6.3, wait.go hard-coded exit 5 on any task.failed event. +func TestVideoWait_RetryableFailedExits4(t *testing.T) { + if testing.Short() { + t.Skip("integration test") + } + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // wait.go streams via /v1/agent3forVideo/stream by default. + w.Header().Set("Content-Type", "text/event-stream") + flusher, _ := w.(http.Flusher) + fmt.Fprintln(w, `data: {"code":100003,"data":{"message":"too many concurrent works"}}`) + fmt.Fprintln(w) + if flusher != nil { + flusher.Flush() + } + })) + defer srv.Close() + + configHome := buildProfile(t, map[string]string{"figlens": srv.URL}) + stdout, stderr, code := runVideoCmd(t, build(t), configHome, + "video", "wait", "42", "--session-id", "s_wait_retry", "--output", "ndjson", + ) + + if code != 4 { + t.Fatalf("exit code = %d, want 4 (retryable). stderr: %s", code, stderr) + } + failed := findEvent(t, stdout, "task.failed") + if failed["code"] != "concurrent_work_limit" { + t.Errorf("ndjson code = %v, want concurrent_work_limit", failed["code"]) + } + if failed["retryable"] != true { + t.Errorf("ndjson retryable = %v, want true", failed["retryable"]) + } +} + +// `script_invalid` is an input error, not a task failure: re-running won't +// help. Both `vk create` and `vk video wait` exit 2 (validation), not 5, +// so a caller wrapping either command can branch the same way on bad input. +func TestVideoWait_ScriptInvalidExits2(t *testing.T) { + if testing.Short() { + t.Skip("integration test") + } + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + flusher, _ := w.(http.Flusher) + fmt.Fprintln(w, `data: {"code":100004,"data":{"message":"讲稿超过 8000 字"}}`) + fmt.Fprintln(w) + if flusher != nil { + flusher.Flush() + } + })) + defer srv.Close() + + configHome := buildProfile(t, map[string]string{"figlens": srv.URL}) + _, stderr, code := runVideoCmd(t, build(t), configHome, + "video", "wait", "42", "--session-id", "s_wait_script", + ) + + if code != 2 { + t.Fatalf("exit code = %d, want 2 (validation/script_invalid). stderr: %s", code, stderr) + } +}