From 03c2f34fc449ca468ba37f493fce527f586a4679 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linus=20F=C3=A4rnstrand?= Date: Fri, 22 May 2026 13:47:34 +0000 Subject: [PATCH 1/2] fix(httpapi): encode event stream timestamps as epoch millis Effect `DateTime` values serialize through `toJSON()` as ISO 8601 strings. The `/event` and `/global/event` SSE handlers ran bus payloads through a plain `JSON.stringify`, so every `session.next.*` event sent its `timestamp` as a string. The OpenAPI spec declares `timestamp` as a number (the encoded form of `V2Schema.DateTimeUtcFromMillis`), so a strict typed client cannot decode any `session.next.*` event and the event stream dies on the first prompt. Encode `DateTime` values to epoch millis before serializing, matching the spec and every other timestamp in the API. --- .../routes/instance/httpapi/handlers/event.ts | 10 +------ .../instance/httpapi/handlers/global.ts | 10 +------ packages/opencode/src/server/shared/sse.ts | 26 +++++++++++++++++++ 3 files changed, 28 insertions(+), 18 deletions(-) create mode 100644 packages/opencode/src/server/shared/sse.ts diff --git a/packages/opencode/src/server/routes/instance/httpapi/handlers/event.ts b/packages/opencode/src/server/routes/instance/httpapi/handlers/event.ts index e770a7cfba1a..647058ec4243 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/handlers/event.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/handlers/event.ts @@ -5,19 +5,11 @@ import * as Stream from "effect/Stream" import { HttpServerResponse } from "effect/unstable/http" import { HttpApiBuilder } from "effect/unstable/httpapi" import * as Sse from "effect/unstable/encoding/Sse" +import { eventData } from "@/server/shared/sse" import { EventApi } from "../groups/event" const log = Log.create({ service: "server" }) -function eventData(data: unknown): Sse.Event { - return { - _tag: "Event", - event: "message", - id: undefined, - data: JSON.stringify(data), - } -} - function eventResponse(bus: Bus.Interface) { return Effect.gen(function* () { // Subscribe eagerly: the bus subscription is acquired in the request scope diff --git a/packages/opencode/src/server/routes/instance/httpapi/handlers/global.ts b/packages/opencode/src/server/routes/instance/httpapi/handlers/global.ts index f80869b64d3f..4e7d47079352 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/handlers/global.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/handlers/global.ts @@ -11,20 +11,12 @@ import * as Stream from "effect/Stream" import { HttpServerRequest, HttpServerResponse } from "effect/unstable/http" import { HttpApiBuilder } from "effect/unstable/httpapi" import * as Sse from "effect/unstable/encoding/Sse" +import { eventData } from "@/server/shared/sse" import { RootHttpApi } from "../api" import { GlobalUpgradeInput } from "../groups/global" const log = Log.create({ service: "server" }) -function eventData(data: unknown): Sse.Event { - return { - _tag: "Event", - event: "message", - id: undefined, - data: JSON.stringify(data), - } -} - function parseBody(body: string) { try { return JSON.parse(body || "{}") as unknown diff --git a/packages/opencode/src/server/shared/sse.ts b/packages/opencode/src/server/shared/sse.ts new file mode 100644 index 000000000000..9e3c1e9a7aba --- /dev/null +++ b/packages/opencode/src/server/shared/sse.ts @@ -0,0 +1,26 @@ +import { DateTime } from "effect" +import * as Sse from "effect/unstable/encoding/Sse" + +// Serialize a bus/global event payload into an SSE frame for the `/event` and +// `/global/event` streams. +export function eventData(data: unknown): Sse.Event { + return { + _tag: "Event", + event: "message", + id: undefined, + data: JSON.stringify(encodeDateTimes(data)), + } +} + +// `DateTime.toJSON()` emits ISO 8601 strings, but every event schema declares +// timestamps as epoch-millis numbers (`V2Schema.DateTimeUtcFromMillis`). Encode +// any `DateTime` to epoch millis so the wire form matches the OpenAPI spec. +// See https://github.com/anomalyco/opencode/issues/28847. +function encodeDateTimes(value: unknown): unknown { + if (DateTime.isDateTime(value)) return DateTime.toEpochMillis(value) + if (Array.isArray(value)) return value.map(encodeDateTimes) + if (typeof value === "object" && value !== null) { + return Object.fromEntries(Object.entries(value).map(([key, item]) => [key, encodeDateTimes(item)])) + } + return value +} From f8eb3de98717ef151fc295f32ada6f150d31225a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linus=20F=C3=A4rnstrand?= Date: Fri, 22 May 2026 13:50:25 +0000 Subject: [PATCH 2/2] test(httpapi): cover event stream timestamp serialization Guard against regressing the `session.next.*` `timestamp` wire format. Tests assert epoch-millis numbers on the `/event` and `/global/event` SSE streams, and that `eventData` encodes nested `DateTime` values. --- .../server/httpapi-event-timestamp.test.ts | 160 ++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 packages/opencode/test/server/httpapi-event-timestamp.test.ts diff --git a/packages/opencode/test/server/httpapi-event-timestamp.test.ts b/packages/opencode/test/server/httpapi-event-timestamp.test.ts new file mode 100644 index 000000000000..7dd88b55ab0c --- /dev/null +++ b/packages/opencode/test/server/httpapi-event-timestamp.test.ts @@ -0,0 +1,160 @@ +// Regression coverage for https://github.com/anomalyco/opencode/issues/28847. +// +// `session.next.*` events carry their `timestamp` as an Effect `DateTime`. The +// OpenAPI spec declares `timestamp` as a number (epoch millis), so the SSE +// streams must encode it as a number, not the ISO 8601 string a plain +// `JSON.stringify` of a `DateTime` produces. +import { afterEach, describe, expect } from "bun:test" +import { DateTime, Effect } from "effect" +import * as Log from "@opencode-ai/core/util/log" +import { SessionEvent } from "@opencode-ai/core/session-event" +import { Bus } from "../../src/bus" +import { GlobalBus } from "../../src/bus/global" +import { Server } from "../../src/server/server" +import { eventData } from "../../src/server/shared/sse" +import { EventPaths } from "../../src/server/routes/instance/httpapi/groups/event" +import { GlobalPaths } from "../../src/server/routes/instance/httpapi/groups/global" +import { resetDatabase } from "../fixture/db" +import { disposeAllInstances, TestInstance } from "../fixture/fixture" +import { testEffectShared } from "../lib/effect" + +void Log.init({ print: false }) + +const decoder = new TextDecoder() + +const readFrame = (reader: ReadableStreamDefaultReader) => + Effect.gen(function* () { + const result = yield* Effect.promise(() => reader.read()).pipe( + Effect.timeoutOrElse({ + duration: "5 seconds", + orElse: () => Effect.fail(new Error("timed out waiting for SSE frame")), + }), + ) + if (result.done || !result.value) return yield* Effect.fail(new Error("event stream closed")) + return JSON.parse( + decoder + .decode(result.value) + .replace(/^data: /, "") + .trim(), + ) as T + }) + +const waitFor = (predicate: () => boolean) => + Effect.gen(function* () { + const deadline = Date.now() + 2000 + while (!predicate()) { + if (Date.now() > deadline) return yield* Effect.fail(new Error("timed out waiting for condition")) + yield* Effect.sleep("5 millis") + } + }) + +const openStream = (path: string, headers: Record) => + Effect.gen(function* () { + const response = yield* Effect.promise(async () => Server.Default().app.request(path, { headers })) + if (!response.body) return yield* Effect.die("missing SSE response body") + const reader = response.body.getReader() + yield* Effect.addFinalizer(() => Effect.promise(() => reader.cancel().catch(() => undefined))) + return reader + }) + +afterEach(async () => { + await disposeAllInstances() + await resetDatabase() +}) + +const it = testEffectShared(Bus.defaultLayer) + +// A `session.next.*` event is published as a bus event whose properties match +// the EventV2 `data` schema (including the `DateTime` timestamp). +const AgentSwitched = { + type: SessionEvent.AgentSwitched.type, + properties: SessionEvent.AgentSwitched.data, +} as const + +describe("event stream timestamp serialization", () => { + it.instance( + "/event sends session.next timestamp as epoch millis", + () => + Effect.gen(function* () { + const { directory } = yield* TestInstance + const reader = yield* openStream(EventPaths.event, { "x-opencode-directory": directory }) + expect(yield* readFrame(reader)).toMatchObject({ type: "server.connected" }) + + const now = Date.now() + yield* Bus.use.publish(AgentSwitched, { + sessionID: "ses_0000000000000000000000000000", + timestamp: DateTime.makeUnsafe(now), + agent: "build", + }) + + const event = yield* readFrame<{ type: string; properties: { timestamp: number } }>(reader) + expect(event.type).toBe("session.next.agent.switched") + expect(event.properties.timestamp).toBe(now) + expect(typeof event.properties.timestamp).toBe("number") + }), + { git: true, config: { formatter: false, lsp: false } }, + ) + + it.live("/global/event sends a nested sync timestamp as epoch millis", () => + Effect.gen(function* () { + const before = GlobalBus.listenerCount("event") + const reader = yield* openStream(GlobalPaths.event, {}) + expect(yield* readFrame(reader)).toMatchObject({ payload: { type: "server.connected" } }) + + // The handler registers its GlobalBus listener lazily, only once + // server.connected has been drained. Wait for it before emitting so the + // event is not dropped before anyone is listening. + yield* waitFor(() => GlobalBus.listenerCount("event") > before) + + const now = Date.now() + yield* Effect.sync(() => + GlobalBus.emit("event", { + directory: "/tmp/issue-28847", + payload: { + type: "sync", + syncEvent: { + type: "session.next.agent.switched.1", + id: "evt_0000000000000000000000000000", + seq: 0, + aggregateID: "ses_0000000000000000000000000000", + data: { + sessionID: "ses_0000000000000000000000000000", + timestamp: DateTime.makeUnsafe(now), + agent: "build", + }, + }, + }, + }), + ) + + const event = yield* readFrame<{ payload: { syncEvent: { data: { timestamp: number } } } }>(reader) + expect(event.payload.syncEvent.data.timestamp).toBe(now) + expect(typeof event.payload.syncEvent.data.timestamp).toBe("number") + }), + ) +}) + +describe("eventData", () => { + it.effect("encodes DateTime values to epoch millis everywhere in the payload", () => + Effect.sync(() => { + const now = Date.now() + const frame = eventData({ + timestamp: DateTime.makeUnsafe(now), + properties: { nested: DateTime.makeUnsafe(now) }, + list: [{ timestamp: DateTime.makeUnsafe(now) }], + untouched: { id: "evt_x", count: 1, flag: true, missing: null }, + }) + + const parsed = JSON.parse(frame.data) as { + timestamp: number + properties: { nested: number } + list: { timestamp: number }[] + untouched: unknown + } + expect(parsed.timestamp).toBe(now) + expect(parsed.properties.nested).toBe(now) + expect(parsed.list[0].timestamp).toBe(now) + expect(parsed.untouched).toEqual({ id: "evt_x", count: 1, flag: true, missing: null }) + }), + ) +})