Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import path from "node:path";

import type {
OrchestrationReadModel,
ProviderKind,
ProviderRuntimeEvent,
ProviderSession,
} from "@t3tools/contracts";
Expand Down Expand Up @@ -56,7 +55,7 @@ const asTurnId = (value: string): TurnId => TurnId.makeUnsafe(value);
type LegacyProviderRuntimeEvent = {
readonly type: string;
readonly eventId: EventId;
readonly provider: ProviderKind;
readonly provider: ProviderRuntimeEvent["provider"];
readonly createdAt: string;
readonly threadId: ThreadId;
readonly turnId?: string | undefined;
Expand All @@ -66,6 +65,23 @@ type LegacyProviderRuntimeEvent = {
readonly [key: string]: unknown;
};

type LegacyTurnCompletedEvent = LegacyProviderRuntimeEvent & {
readonly type: "turn.completed";
readonly payload?: undefined;
readonly status: "completed" | "failed" | "interrupted" | "cancelled";
readonly errorMessage?: string | undefined;
};

function isLegacyTurnCompletedEvent(
event: LegacyProviderRuntimeEvent,
): event is LegacyTurnCompletedEvent {
return (
event.type === "turn.completed" &&
event.payload === undefined &&
typeof event.status === "string"
);
}

function createProviderServiceHarness() {
const runtimeEventPubSub = Effect.runSync(PubSub.unbounded<ProviderRuntimeEvent>());
const runtimeSessions: ProviderSession[] = [];
Expand Down Expand Up @@ -93,8 +109,23 @@ function createProviderServiceHarness() {
runtimeSessions.push(session);
};

const normalizeLegacyEvent = (event: LegacyProviderRuntimeEvent): ProviderRuntimeEvent => {
if (isLegacyTurnCompletedEvent(event)) {
const normalized: Extract<ProviderRuntimeEvent, { type: "turn.completed" }> = {
...(event as Omit<Extract<ProviderRuntimeEvent, { type: "turn.completed" }>, "payload">),
payload: {
state: event.status,
...(typeof event.errorMessage === "string" ? { errorMessage: event.errorMessage } : {}),
},
};
return normalized;
}

return event as ProviderRuntimeEvent;
};

const emit = (event: LegacyProviderRuntimeEvent): void => {
Effect.runSync(PubSub.publish(runtimeEventPubSub, event as unknown as ProviderRuntimeEvent));
Effect.runSync(PubSub.publish(runtimeEventPubSub, normalizeLegacyEvent(event)));
};

return {
Expand Down Expand Up @@ -1695,6 +1726,37 @@ describe("ProviderRuntimeIngestion", () => {
expect(thread.session?.lastError).toBe("runtime exploded");
});

it("records runtime.error activities from the typed payload message", async () => {
const harness = await createHarness();
const now = new Date().toISOString();

harness.emit({
type: "runtime.error",
eventId: asEventId("evt-runtime-error-activity"),
provider: "codex",
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-runtime-error-activity"),
payload: {
message: "runtime activity exploded",
},
});

const thread = await waitForThread(harness.engine, (entry) =>
entry.activities.some((activity) => activity.id === "evt-runtime-error-activity"),
);
const activity = thread.activities.find(
(entry: ProviderRuntimeTestActivity) => entry.id === "evt-runtime-error-activity",
);
const activityPayload =
activity?.payload && typeof activity.payload === "object"
? (activity.payload as Record<string, unknown>)
: undefined;

expect(activity?.kind).toBe("runtime.error");
expect(activityPayload?.message).toBe("runtime activity exploded");
});

it("keeps the session running when a runtime.warning arrives during an active turn", async () => {
const harness = await createHarness();
const now = new Date().toISOString();
Expand Down
46 changes: 8 additions & 38 deletions apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,6 @@ function proposedPlanIdFromEvent(event: ProviderRuntimeEvent, threadId: ThreadId
return `plan:${threadId}:event:${event.eventId}`;
}

function asString(value: unknown): string | undefined {
return typeof value === "string" ? value : undefined;
}

function buildContextWindowActivityPayload(
event: ProviderRuntimeEvent,
): ThreadTokenUsageSnapshot | undefined {
Expand All @@ -111,14 +107,6 @@ function buildContextWindowActivityPayload(
return event.payload.usage;
}

function runtimePayloadRecord(event: ProviderRuntimeEvent): Record<string, unknown> | undefined {
const payload = (event as { payload?: unknown }).payload;
if (!payload || typeof payload !== "object") {
return undefined;
}
return payload as Record<string, unknown>;
}

function normalizeRuntimeTurnState(
value: string | undefined,
): "completed" | "failed" | "interrupted" | "cancelled" {
Expand All @@ -133,23 +121,6 @@ function normalizeRuntimeTurnState(
}
}

function runtimeTurnState(
event: ProviderRuntimeEvent,
): "completed" | "failed" | "interrupted" | "cancelled" {
const payloadState = asString(runtimePayloadRecord(event)?.state);
return normalizeRuntimeTurnState(payloadState);
}

function runtimeTurnErrorMessage(event: ProviderRuntimeEvent): string | undefined {
const payloadErrorMessage = asString(runtimePayloadRecord(event)?.errorMessage);
return payloadErrorMessage;
}

function runtimeErrorMessageFromEvent(event: ProviderRuntimeEvent): string | undefined {
const payloadMessage = asString(runtimePayloadRecord(event)?.message);
return payloadMessage;
}

function orchestrationSessionStatusFromRuntimeState(
state: "starting" | "running" | "waiting" | "ready" | "interrupted" | "stopped" | "error",
): "starting" | "running" | "ready" | "interrupted" | "stopped" | "error" {
Expand Down Expand Up @@ -253,10 +224,6 @@ function runtimeEventToActivities(
}

case "runtime.error": {
const message = runtimeErrorMessageFromEvent(event);
if (!message) {
return [];
}
return [
{
id: event.eventId,
Expand All @@ -265,7 +232,7 @@ function runtimeEventToActivities(
kind: "runtime.error",
summary: "Runtime error",
payload: {
message: truncateDetail(message),
message: truncateDetail(event.payload.message),
},
turnId: toTurnId(event.turnId) ?? null,
...maybeSequence,
Expand Down Expand Up @@ -973,7 +940,9 @@ const make = Effect.gen(function* () {
case "session.exited":
return "stopped";
case "turn.completed":
return runtimeTurnState(event) === "failed" ? "error" : "ready";
return normalizeRuntimeTurnState(event.payload.state) === "failed"
? "error"
: "ready";
case "session.started":
case "thread.started":
// Provider thread/session start notifications can arrive during an
Expand All @@ -984,8 +953,9 @@ const make = Effect.gen(function* () {
const lastError =
event.type === "session.state.changed" && event.payload.state === "error"
? (event.payload.reason ?? thread.session?.lastError ?? "Provider session error")
: event.type === "turn.completed" && runtimeTurnState(event) === "failed"
? (runtimeTurnErrorMessage(event) ?? thread.session?.lastError ?? "Turn failed")
: event.type === "turn.completed" &&
normalizeRuntimeTurnState(event.payload.state) === "failed"
? (event.payload.errorMessage ?? thread.session?.lastError ?? "Turn failed")
: status === "ready"
? null
: (thread.session?.lastError ?? null);
Expand Down Expand Up @@ -1176,7 +1146,7 @@ const make = Effect.gen(function* () {
}

if (event.type === "runtime.error") {
const runtimeErrorMessage = runtimeErrorMessageFromEvent(event) ?? "Provider runtime error";
const runtimeErrorMessage = event.payload.message;

const shouldApplyRuntimeError = !STRICT_PROVIDER_LIFECYCLE_GUARD
? true
Expand Down