From a46f2911fd7ac0028fb3591bd174fbd7a5842064 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Fri, 27 Mar 2026 16:42:41 -0700 Subject: [PATCH 1/2] Normalize typed runtime events in provider ingestion - Read `turn.completed` and `runtime.error` data from typed payloads - Preserve runtime error activities when payload message is present - Add regression coverage for runtime.error activity ingestion --- .../Layers/ProviderRuntimeIngestion.test.ts | 51 ++++++++++++++++++- .../Layers/ProviderRuntimeIngestion.ts | 46 +++-------------- 2 files changed, 58 insertions(+), 39 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 8d205bbe2f..9e615762c3 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -93,8 +93,26 @@ function createProviderServiceHarness() { runtimeSessions.push(session); }; + const normalizeLegacyEvent = (event: LegacyProviderRuntimeEvent): ProviderRuntimeEvent => { + if ( + event.type === "turn.completed" && + event.payload === undefined && + typeof event.status === "string" + ) { + return { + ...(event as unknown as ProviderRuntimeEvent), + payload: { + state: event.status, + ...(typeof event.errorMessage === "string" ? { errorMessage: event.errorMessage } : {}), + }, + } as ProviderRuntimeEvent; + } + + return event as unknown as ProviderRuntimeEvent; + }; + const emit = (event: LegacyProviderRuntimeEvent): void => { - Effect.runSync(PubSub.publish(runtimeEventPubSub, event as unknown as ProviderRuntimeEvent)); + Effect.runSync(PubSub.publish(runtimeEventPubSub, normalizeLegacyEvent(event))); }; return { @@ -1695,6 +1713,37 @@ describe("ProviderRuntimeIngestion", () => { expect(thread.session?.lastError).toBe("runtime exploded"); }); + it("records runtime.error activities from the typed payload message", async () => { + const harness = await createHarness(); + const now = new Date().toISOString(); + + harness.emit({ + type: "runtime.error", + eventId: asEventId("evt-runtime-error-activity"), + provider: "codex", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-runtime-error-activity"), + payload: { + message: "runtime activity exploded", + }, + }); + + const thread = await waitForThread(harness.engine, (entry) => + entry.activities.some((activity) => activity.id === "evt-runtime-error-activity"), + ); + const activity = thread.activities.find( + (entry: ProviderRuntimeTestActivity) => entry.id === "evt-runtime-error-activity", + ); + const activityPayload = + activity?.payload && typeof activity.payload === "object" + ? (activity.payload as Record) + : undefined; + + expect(activity?.kind).toBe("runtime.error"); + expect(activityPayload?.message).toBe("runtime activity exploded"); + }); + it("keeps the session running when a runtime.warning arrives during an active turn", async () => { const harness = await createHarness(); const now = new Date().toISOString(); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index f9a662b84f..b42e5f1566 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -98,10 +98,6 @@ function proposedPlanIdFromEvent(event: ProviderRuntimeEvent, threadId: ThreadId return `plan:${threadId}:event:${event.eventId}`; } -function asString(value: unknown): string | undefined { - return typeof value === "string" ? value : undefined; -} - function buildContextWindowActivityPayload( event: ProviderRuntimeEvent, ): ThreadTokenUsageSnapshot | undefined { @@ -111,14 +107,6 @@ function buildContextWindowActivityPayload( return event.payload.usage; } -function runtimePayloadRecord(event: ProviderRuntimeEvent): Record | undefined { - const payload = (event as { payload?: unknown }).payload; - if (!payload || typeof payload !== "object") { - return undefined; - } - return payload as Record; -} - function normalizeRuntimeTurnState( value: string | undefined, ): "completed" | "failed" | "interrupted" | "cancelled" { @@ -133,23 +121,6 @@ function normalizeRuntimeTurnState( } } -function runtimeTurnState( - event: ProviderRuntimeEvent, -): "completed" | "failed" | "interrupted" | "cancelled" { - const payloadState = asString(runtimePayloadRecord(event)?.state); - return normalizeRuntimeTurnState(payloadState); -} - -function runtimeTurnErrorMessage(event: ProviderRuntimeEvent): string | undefined { - const payloadErrorMessage = asString(runtimePayloadRecord(event)?.errorMessage); - return payloadErrorMessage; -} - -function runtimeErrorMessageFromEvent(event: ProviderRuntimeEvent): string | undefined { - const payloadMessage = asString(runtimePayloadRecord(event)?.message); - return payloadMessage; -} - function orchestrationSessionStatusFromRuntimeState( state: "starting" | "running" | "waiting" | "ready" | "interrupted" | "stopped" | "error", ): "starting" | "running" | "ready" | "interrupted" | "stopped" | "error" { @@ -253,10 +224,6 @@ function runtimeEventToActivities( } case "runtime.error": { - const message = runtimeErrorMessageFromEvent(event); - if (!message) { - return []; - } return [ { id: event.eventId, @@ -265,7 +232,7 @@ function runtimeEventToActivities( kind: "runtime.error", summary: "Runtime error", payload: { - message: truncateDetail(message), + message: truncateDetail(event.payload.message), }, turnId: toTurnId(event.turnId) ?? null, ...maybeSequence, @@ -973,7 +940,9 @@ const make = Effect.gen(function* () { case "session.exited": return "stopped"; case "turn.completed": - return runtimeTurnState(event) === "failed" ? "error" : "ready"; + return normalizeRuntimeTurnState(event.payload.state) === "failed" + ? "error" + : "ready"; case "session.started": case "thread.started": // Provider thread/session start notifications can arrive during an @@ -984,8 +953,9 @@ const make = Effect.gen(function* () { const lastError = event.type === "session.state.changed" && event.payload.state === "error" ? (event.payload.reason ?? thread.session?.lastError ?? "Provider session error") - : event.type === "turn.completed" && runtimeTurnState(event) === "failed" - ? (runtimeTurnErrorMessage(event) ?? thread.session?.lastError ?? "Turn failed") + : event.type === "turn.completed" && + normalizeRuntimeTurnState(event.payload.state) === "failed" + ? (event.payload.errorMessage ?? thread.session?.lastError ?? "Turn failed") : status === "ready" ? null : (thread.session?.lastError ?? null); @@ -1176,7 +1146,7 @@ const make = Effect.gen(function* () { } if (event.type === "runtime.error") { - const runtimeErrorMessage = runtimeErrorMessageFromEvent(event) ?? "Provider runtime error"; + const runtimeErrorMessage = event.payload.message; const shouldApplyRuntimeError = !STRICT_PROVIDER_LIFECYCLE_GUARD ? true From 677441518b9517bcdf5fe23e7405bf369d73b860 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Fri, 27 Mar 2026 16:56:50 -0700 Subject: [PATCH 2/2] Type legacy runtime turn.completed events - add a type guard for legacy `turn.completed` runtime events - normalize legacy payloads with typed `ProviderRuntimeEvent` shapes --- .../Layers/ProviderRuntimeIngestion.test.ts | 35 +++++++++++++------ 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 9e615762c3..3eaeb2cd1d 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -4,7 +4,6 @@ import path from "node:path"; import type { OrchestrationReadModel, - ProviderKind, ProviderRuntimeEvent, ProviderSession, } from "@t3tools/contracts"; @@ -56,7 +55,7 @@ const asTurnId = (value: string): TurnId => TurnId.makeUnsafe(value); type LegacyProviderRuntimeEvent = { readonly type: string; readonly eventId: EventId; - readonly provider: ProviderKind; + readonly provider: ProviderRuntimeEvent["provider"]; readonly createdAt: string; readonly threadId: ThreadId; readonly turnId?: string | undefined; @@ -66,6 +65,23 @@ type LegacyProviderRuntimeEvent = { readonly [key: string]: unknown; }; +type LegacyTurnCompletedEvent = LegacyProviderRuntimeEvent & { + readonly type: "turn.completed"; + readonly payload?: undefined; + readonly status: "completed" | "failed" | "interrupted" | "cancelled"; + readonly errorMessage?: string | undefined; +}; + +function isLegacyTurnCompletedEvent( + event: LegacyProviderRuntimeEvent, +): event is LegacyTurnCompletedEvent { + return ( + event.type === "turn.completed" && + event.payload === undefined && + typeof event.status === "string" + ); +} + function createProviderServiceHarness() { const runtimeEventPubSub = Effect.runSync(PubSub.unbounded()); const runtimeSessions: ProviderSession[] = []; @@ -94,21 +110,18 @@ function createProviderServiceHarness() { }; const normalizeLegacyEvent = (event: LegacyProviderRuntimeEvent): ProviderRuntimeEvent => { - if ( - event.type === "turn.completed" && - event.payload === undefined && - typeof event.status === "string" - ) { - return { - ...(event as unknown as ProviderRuntimeEvent), + if (isLegacyTurnCompletedEvent(event)) { + const normalized: Extract = { + ...(event as Omit, "payload">), payload: { state: event.status, ...(typeof event.errorMessage === "string" ? { errorMessage: event.errorMessage } : {}), }, - } as ProviderRuntimeEvent; + }; + return normalized; } - return event as unknown as ProviderRuntimeEvent; + return event as ProviderRuntimeEvent; }; const emit = (event: LegacyProviderRuntimeEvent): void => {