Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 61 additions & 19 deletions patches/opencode/0001-add-nemo-flow-integration.patch
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
diff --git a/bun.lock b/bun.lock
index 35841622b..5c58f4a65 100644
index 35841622b..1f99d0b39 100644
--- a/bun.lock
+++ b/bun.lock
@@ -23,6 +23,9 @@
Expand Down Expand Up @@ -308,7 +308,7 @@ index 35841622b..5c58f4a65 100644

+ "typedoc/minimatch": ["minimatch@10.2.5", "", { "dependencies": { "brace-expansion": "^5.0.5" } }, "sha512-MULkVLfKGYDFYejP07QOurDLLQpcjk7Fw+7jXS2R2czRQzR56yHRveU5NDJEOviH+hETZKSkIk5c+T23GjFUMg=="],
+
+ "typedoc/yaml": ["yaml@2.8.3", "", { "bin": { "yaml": "bin.mjs" } }, "sha512-AvbaCLOO2Otw/lW5bmh9d/WEdcDFdQp2Z2ZUH3pX9U2ihyUY0nvLv7J6TrWowklRGPYbB/IuIMfYgxaCPg5Bpg=="],
+ "typedoc/yaml": ["yaml@2.8.4", "", { "bin": { "yaml": "bin.mjs" } }, "sha512-ml/JPOj9fOQK8RNnWojA67GbZ0ApXAUlN2UQclwv2eVgTgn7O9gg9o7paZWKMp4g0H3nTLtS9LVzhkpOFIKzog=="],
+
"unifont/ofetch": ["ofetch@1.5.1", "", { "dependencies": { "destr": "^2.0.5", "node-fetch-native": "^1.6.7", "ufo": "^1.6.1" } }, "sha512-2W4oUZlVaqAPAil6FUg/difl6YhqhUR7x2eZY4bQCko22UXg3hptq9KLQdqFClV+Wu85UX7hNtdGTngi/1BxcA=="],

Expand Down Expand Up @@ -595,7 +595,7 @@ index 5bde2608f..b2b087e12 100644
await Plugin.trigger(
"tool.execute.after",
diff --git a/packages/opencode/src/tool/batch.ts b/packages/opencode/src/tool/batch.ts
index 00c22bfe6..2648c2243 100644
index 00c22bfe6..343816b57 100644
--- a/packages/opencode/src/tool/batch.ts
+++ b/packages/opencode/src/tool/batch.ts
@@ -2,6 +2,7 @@ import z from "zod"
Expand All @@ -617,24 +617,30 @@ index 00c22bfe6..2648c2243 100644
const attachments = result.attachments?.map((attachment) => ({
...attachment,
id: PartID.ascending(),
@@ -130,7 +133,9 @@ export const BatchTool = Tool.define("batch", async () => {
@@ -130,7 +133,13 @@ export const BatchTool = Tool.define("batch", async () => {
}
}

- const results = await Promise.all(toolCalls.map((call) => executeCall(call)))
+ const batchScope = NemoFlow.pushFunctionScope("batch-parallel", NemoFlow.SCOPE_ATTR_PARALLEL)
const results = await Promise.all(toolCalls.map((call) => executeCall(call)))
+ NemoFlow.popScope(batchScope)
+ let results: Awaited<ReturnType<typeof executeCall>>[]
+ try {
+ results = await Promise.all(toolCalls.map((call) => executeCall(call)))
+ } finally {
+ NemoFlow.popScope(batchScope)
+ }

// Add discarded calls as errors
const now = Date.now()
diff --git a/packages/opencode/src/nemo_flow/index.ts b/packages/opencode/src/nemo_flow/index.ts
new file mode 100644
index 000000000..2cfe4d267
index 000000000..f8cd52955
--- /dev/null
+++ b/packages/opencode/src/nemo_flow/index.ts
@@ -0,0 +1,259 @@
@@ -0,0 +1,292 @@
+import { Log } from "../util/log"
+import { Flag } from "../flag/flag"
+import fsSync from "fs"
+
+const log = Log.create({ service: "nemo_flow" })
+
Expand All @@ -644,6 +650,16 @@ index 000000000..2cfe4d267
+let initDone = false
+
+const exporters = new Map<string, any>()
+let atofJsonlSubscriberName: string | null = null
+
+function toJsonSafe(value: any): any {
+ if (value === undefined) return null
+ try {
+ return JSON.parse(JSON.stringify(value))
+ } catch {
+ return null
+ }
+}
+
+export namespace NemoFlow {
+ export let SCOPE_ATTR_PARALLEL = 0
Expand Down Expand Up @@ -706,19 +722,21 @@ index 000000000..2cfe4d267
+ fn: (args: any) => Promise<T>,
+ ): Promise<T> {
+ if (!enabled || !lib) return fn(args)
+ let originalResult: T
+ let executed = false
+ try {
+ // Capture the original JS result to avoid the lossy NAPI JSON round-trip.
+ // Tool results may contain non-JSON types (class instances, LSP diagnostics)
+ // that don't survive serde_json serialization. NemoFlow still sees a JSON copy
+ // for its events/intercepts; the caller gets the original object.
+ let originalResult: T
+ await lib.toolCallExecuteAsync(name, args, async (a: any) => {
+ originalResult = await fn(a)
+ return originalResult
+ // Keep OpenCode's tool execution on its original JS values. The native
+ // observer only needs JSON-safe snapshots; handing it OpenCode-owned
+ // objects can make later structuredClone(part) calls fail.
+ await lib.toolCallExecuteAsync(name, toJsonSafe(args), async () => {
+ executed = true
+ originalResult = await fn(args)
+ return toJsonSafe(originalResult)
+ }, null, null, null, null)
+ return originalResult!
+ } catch (e) {
+ log.error("wrapToolExecute", { error: e })
+ if (executed) return originalResult!
+ return fn(args)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
willkill07 marked this conversation as resolved.
+ }
+ }
Expand Down Expand Up @@ -757,6 +775,7 @@ index 000000000..2cfe4d267
+ yield* streamFn()
+ return
+ }
+ let streamStarted = false
+ try {
+ const request = {
+ headers: {},
Expand Down Expand Up @@ -803,6 +822,7 @@ index 000000000..2cfe4d267
+ streamInput.model.providerID,
+ request,
+ (interceptedReq: any) => {
+ streamStarted = true
+ // Apply intercepted request changes back to streamInput before
+ // streamFn() reads it — streamFn captures streamInput by reference.
+ applyIntercepted(streamInput, interceptedReq?.content)
Expand Down Expand Up @@ -845,6 +865,7 @@ index 000000000..2cfe4d267
+ }
+ } catch (e) {
+ log.error("wrapLlmStream", { error: e })
+ if (streamStarted) throw e
+ yield* streamFn()
+ }
+ }
Expand Down Expand Up @@ -891,13 +912,30 @@ index 000000000..2cfe4d267
+ log.error("clearExporter", { error: e })
+ }
+ }
+
+ export function createAtOfJsonlExporter(filePath: string): boolean {
+ if (!enabled || !lib) return false
+ if (atofJsonlSubscriberName) return true
+ try {
+ const subscriberName = `atof-jsonl-${process.pid}`
+ lib.registerSubscriber(subscriberName, (event: any) => {
+ fsSync.appendFileSync(filePath, JSON.stringify(event) + "\n")
+ })
Comment thread
willkill07 marked this conversation as resolved.
+ atofJsonlSubscriberName = subscriberName
+ log.info("registered ATOF JSONL exporter", { path: filePath })
+ return true
+ } catch (e) {
+ log.error("createAtOfJsonlExporter", { error: e })
+ return false
+ }
+ }
+}
diff --git a/packages/opencode/src/plugin/nemo_flow.ts b/packages/opencode/src/plugin/nemo_flow.ts
new file mode 100644
index 000000000..c885bfae4
index 000000000..5659fdd37
--- /dev/null
+++ b/packages/opencode/src/plugin/nemo_flow.ts
@@ -0,0 +1,50 @@
@@ -0,0 +1,54 @@
+import type { Hooks, Plugin as PluginInstance } from "@opencode-ai/plugin"
+import { Config } from "../config/config"
+import { NemoFlow } from "../nemo_flow"
Expand All @@ -913,7 +951,11 @@ index 000000000..c885bfae4
+ const enabled = await NemoFlow.init({ nemo_flow: config.experimental?.nemo_flow })
+ if (!enabled) return {}
+
+ const atifDir = path.join(Global.Path.data, "atif")
+ const atofDir = process.env.NEMO_FLOW_ATOF_DIR ?? path.join(Global.Path.data, "atof")
+ await fs.mkdir(atofDir, { recursive: true })
+ NemoFlow.createAtOfJsonlExporter(path.join(atofDir, "events.jsonl"))
+
+ const atifDir = process.env.NEMO_FLOW_ATIF_DIR ?? path.join(Global.Path.data, "atif")
+ await fs.mkdir(atifDir, { recursive: true })
+
+ return {
Expand Down
25 changes: 20 additions & 5 deletions third_party/README-opencode.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ This directory contains the NeMo Flow integration patch for
`third_party/opencode`.

The patch adds optional NeMo Flow tracing, LLM stream wrapping, tool execution
wrapping, and ATIF export support to the opencode package. It depends on the
local NeMo Flow Node binding through a `file:` dependency that resolves from
`third_party/opencode/packages/opencode` back to `crates/node`.
wrapping, raw ATOF JSONL export, and optional direct ATIF export support to the
opencode package. The patch also wires opencode to the local NeMo Flow Node
package with an optional `file:` dependency so the patched workspace can load
`nemo-flow-node` when NeMo Flow tracing is enabled.

## Setup

Expand Down Expand Up @@ -63,8 +64,18 @@ Alternatively, enable the patched experimental config flag:
```

When enabled, opencode creates NeMo Flow scopes for agents and batched tool
execution, wraps LLM streams and tool calls, and exports ATIF trajectories under
the opencode data directory's `atif` subdirectory when a session becomes idle.
execution, wraps LLM streams and tool calls, and registers a raw ATOF JSONL
subscriber. Set `NEMO_FLOW_ATOF_DIR` to control where `events.jsonl` is written;
otherwise it defaults to the opencode data directory's `atof` subdirectory.

Direct ATIF export is optional comparison output. Set `NEMO_FLOW_ATIF_DIR` to
control where exported ATIF JSON files are written when a session becomes idle;
otherwise it defaults to the opencode data directory's `atif` subdirectory.

The tool wrapper keeps opencode's execution on original JavaScript values while
passing JSON-safe snapshots to the NeMo Flow native observer. This avoids
`structuredClone()` failures in opencode while still preserving NeMo Flow tool
events.

## Validation

Expand All @@ -80,3 +91,7 @@ Also rerun the patch applicability check from the NeMo Flow repository root:
```bash
./scripts/apply-patches.sh --check
```

For an end-to-end smoke, run an opencode task with `NEMO_FLOW_ENABLED=1` and
verify that the configured `NEMO_FLOW_ATOF_DIR` contains an `events.jsonl` file
with scope and tool/LLM events.