Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,11 @@ import * as Stream from "effect/Stream"
import { HttpServerResponse } from "effect/unstable/http"
import { HttpApiBuilder } from "effect/unstable/httpapi"
import * as Sse from "effect/unstable/encoding/Sse"
import { eventData } from "@/server/shared/sse"
import { EventApi } from "../groups/event"

const log = Log.create({ service: "server" })

function eventData(data: unknown): Sse.Event {
return {
_tag: "Event",
event: "message",
id: undefined,
data: JSON.stringify(data),
}
}

function eventResponse(bus: Bus.Interface) {
return Effect.gen(function* () {
// Subscribe eagerly: the bus subscription is acquired in the request scope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,12 @@ import * as Stream from "effect/Stream"
import { HttpServerRequest, HttpServerResponse } from "effect/unstable/http"
import { HttpApiBuilder } from "effect/unstable/httpapi"
import * as Sse from "effect/unstable/encoding/Sse"
import { eventData } from "@/server/shared/sse"
import { RootHttpApi } from "../api"
import { GlobalUpgradeInput } from "../groups/global"

const log = Log.create({ service: "server" })

function eventData(data: unknown): Sse.Event {
return {
_tag: "Event",
event: "message",
id: undefined,
data: JSON.stringify(data),
}
}

function parseBody(body: string) {
try {
return JSON.parse(body || "{}") as unknown
Expand Down
26 changes: 26 additions & 0 deletions packages/opencode/src/server/shared/sse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { DateTime } from "effect"
import * as Sse from "effect/unstable/encoding/Sse"

// Serialize a bus/global event payload into an SSE frame for the `/event` and
// `/global/event` streams.
export function eventData(data: unknown): Sse.Event {
return {
_tag: "Event",
event: "message",
id: undefined,
data: JSON.stringify(encodeDateTimes(data)),
}
}

// `DateTime.toJSON()` emits ISO 8601 strings, but every event schema declares
// timestamps as epoch-millis numbers (`V2Schema.DateTimeUtcFromMillis`). Encode
// any `DateTime` to epoch millis so the wire form matches the OpenAPI spec.
// See https://github.com/anomalyco/opencode/issues/28847.
function encodeDateTimes(value: unknown): unknown {
if (DateTime.isDateTime(value)) return DateTime.toEpochMillis(value)
if (Array.isArray(value)) return value.map(encodeDateTimes)
if (typeof value === "object" && value !== null) {
return Object.fromEntries(Object.entries(value).map(([key, item]) => [key, encodeDateTimes(item)]))
}
return value
}
160 changes: 160 additions & 0 deletions packages/opencode/test/server/httpapi-event-timestamp.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Regression coverage for https://github.com/anomalyco/opencode/issues/28847.
//
// `session.next.*` events carry their `timestamp` as an Effect `DateTime`. The
// OpenAPI spec declares `timestamp` as a number (epoch millis), so the SSE
// streams must encode it as a number, not the ISO 8601 string a plain
// `JSON.stringify` of a `DateTime` produces.
import { afterEach, describe, expect } from "bun:test"
import { DateTime, Effect } from "effect"
import * as Log from "@opencode-ai/core/util/log"
import { SessionEvent } from "@opencode-ai/core/session-event"
import { Bus } from "../../src/bus"
import { GlobalBus } from "../../src/bus/global"
import { Server } from "../../src/server/server"
import { eventData } from "../../src/server/shared/sse"
import { EventPaths } from "../../src/server/routes/instance/httpapi/groups/event"
import { GlobalPaths } from "../../src/server/routes/instance/httpapi/groups/global"
import { resetDatabase } from "../fixture/db"
import { disposeAllInstances, TestInstance } from "../fixture/fixture"
import { testEffectShared } from "../lib/effect"

void Log.init({ print: false })

const decoder = new TextDecoder()

const readFrame = <T = unknown>(reader: ReadableStreamDefaultReader<Uint8Array>) =>
Effect.gen(function* () {
const result = yield* Effect.promise(() => reader.read()).pipe(
Effect.timeoutOrElse({
duration: "5 seconds",
orElse: () => Effect.fail(new Error("timed out waiting for SSE frame")),
}),
)
if (result.done || !result.value) return yield* Effect.fail(new Error("event stream closed"))
return JSON.parse(
decoder
.decode(result.value)
.replace(/^data: /, "")
.trim(),
) as T
})

const waitFor = (predicate: () => boolean) =>
Effect.gen(function* () {
const deadline = Date.now() + 2000
while (!predicate()) {
if (Date.now() > deadline) return yield* Effect.fail(new Error("timed out waiting for condition"))
yield* Effect.sleep("5 millis")
}
})

const openStream = (path: string, headers: Record<string, string>) =>
Effect.gen(function* () {
const response = yield* Effect.promise(async () => Server.Default().app.request(path, { headers }))
if (!response.body) return yield* Effect.die("missing SSE response body")
const reader = response.body.getReader()
yield* Effect.addFinalizer(() => Effect.promise(() => reader.cancel().catch(() => undefined)))
return reader
})

afterEach(async () => {
await disposeAllInstances()
await resetDatabase()
})

const it = testEffectShared(Bus.defaultLayer)

// A `session.next.*` event is published as a bus event whose properties match
// the EventV2 `data` schema (including the `DateTime` timestamp).
const AgentSwitched = {
type: SessionEvent.AgentSwitched.type,
properties: SessionEvent.AgentSwitched.data,
} as const

describe("event stream timestamp serialization", () => {
it.instance(
"/event sends session.next timestamp as epoch millis",
() =>
Effect.gen(function* () {
const { directory } = yield* TestInstance
const reader = yield* openStream(EventPaths.event, { "x-opencode-directory": directory })
expect(yield* readFrame(reader)).toMatchObject({ type: "server.connected" })

const now = Date.now()
yield* Bus.use.publish(AgentSwitched, {
sessionID: "ses_0000000000000000000000000000",
timestamp: DateTime.makeUnsafe(now),
agent: "build",
})

const event = yield* readFrame<{ type: string; properties: { timestamp: number } }>(reader)
expect(event.type).toBe("session.next.agent.switched")
expect(event.properties.timestamp).toBe(now)
expect(typeof event.properties.timestamp).toBe("number")
}),
{ git: true, config: { formatter: false, lsp: false } },
)

it.live("/global/event sends a nested sync timestamp as epoch millis", () =>
Effect.gen(function* () {
const before = GlobalBus.listenerCount("event")
const reader = yield* openStream(GlobalPaths.event, {})
expect(yield* readFrame(reader)).toMatchObject({ payload: { type: "server.connected" } })

// The handler registers its GlobalBus listener lazily, only once
// server.connected has been drained. Wait for it before emitting so the
// event is not dropped before anyone is listening.
yield* waitFor(() => GlobalBus.listenerCount("event") > before)

const now = Date.now()
yield* Effect.sync(() =>
GlobalBus.emit("event", {
directory: "/tmp/issue-28847",
payload: {
type: "sync",
syncEvent: {
type: "session.next.agent.switched.1",
id: "evt_0000000000000000000000000000",
seq: 0,
aggregateID: "ses_0000000000000000000000000000",
data: {
sessionID: "ses_0000000000000000000000000000",
timestamp: DateTime.makeUnsafe(now),
agent: "build",
},
},
},
}),
)

const event = yield* readFrame<{ payload: { syncEvent: { data: { timestamp: number } } } }>(reader)
expect(event.payload.syncEvent.data.timestamp).toBe(now)
expect(typeof event.payload.syncEvent.data.timestamp).toBe("number")
}),
)
})

describe("eventData", () => {
it.effect("encodes DateTime values to epoch millis everywhere in the payload", () =>
Effect.sync(() => {
const now = Date.now()
const frame = eventData({
timestamp: DateTime.makeUnsafe(now),
properties: { nested: DateTime.makeUnsafe(now) },
list: [{ timestamp: DateTime.makeUnsafe(now) }],
untouched: { id: "evt_x", count: 1, flag: true, missing: null },
})

const parsed = JSON.parse(frame.data) as {
timestamp: number
properties: { nested: number }
list: { timestamp: number }[]
untouched: unknown
}
expect(parsed.timestamp).toBe(now)
expect(parsed.properties.nested).toBe(now)
expect(parsed.list[0].timestamp).toBe(now)
expect(parsed.untouched).toEqual({ id: "evt_x", count: 1, flag: true, missing: null })
}),
)
})
Loading