Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
284 changes: 284 additions & 0 deletions lib/peer/ClaudeCodeDriver.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
/**
* ClaudeCodeDriver — drives the local `claude` CLI for OrgX peer dispatch.
*
* The Claude Code CLI supports non-interactive one-shot mode via `-p` with a
* prompt argument, streaming JSON events on stdout when `--output-format
* stream-json` is set. The driver:
*
* 1. detect() — runs `claude --version` to confirm the CLI is on PATH
* and that auth is configured (the CLI errors if no
* subscription is active).
* 2. dispatch() — spawns `claude -p <prompt> --output-format stream-json
* --plugin-dir <plugin_dir>` in the task's repo_path,
* reads the NDJSON stream on stdout, yields PeerToServer
* messages as events land. Rule-based deviation checks
* run against each file-edit event.
* 3. cancel() — kills the spawned process (SIGTERM then SIGKILL after 3s).
* 4. probe() — cheap liveness check that runs `claude --version` with a
* short timeout.
*
* This driver is a *peer implementation detail*. The plugin peer runtime
* (see lib/peer/peer.mjs) wires it into @useorgx/orgx-gateway-sdk's PeerClient.
*/

import { spawn } from 'node:child_process';
import { fileURLToPath } from 'node:url';
import { dirname, resolve } from 'node:path';

const PLUGIN_ROOT = resolve(dirname(fileURLToPath(import.meta.url)), '..', '..');
const CANCEL_GRACE_MS = 3_000;

export class ClaudeCodeDriver {
id = 'claude_code';

constructor(opts = {}) {
this.opts = opts;
this.running = new Map(); // run_id → ChildProcess
}

async detect() {
try {
const out = await runOnce('claude', ['--version'], { timeoutMs: 5_000 });
return {
installed: true,
authenticated: !/not authenticated|subscription/i.test(out.stderr),
version: out.stdout.trim() || undefined,
subscription_active: true,
};
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
if (/ENOENT|not found/i.test(message)) {
return { installed: false, authenticated: false, error: message };
}
return { installed: true, authenticated: false, error: message };
}
}

async probe() {
try {
await runOnce('claude', ['--version'], { timeoutMs: 2_500 });
return { subscription_active: true, session_alive: true, queue_depth: this.running.size };
} catch {
return { subscription_active: false, session_alive: false, queue_depth: this.running.size };
}
}

async *dispatch(task, context) {
const prompt = renderPrompt(task);
const cwd = task.repo_path || process.cwd();
const args = [
'-p',
prompt,
'--output-format',
'stream-json',
'--plugin-dir',
this.opts.pluginDir ?? PLUGIN_ROOT,
];

const startedAt = new Date().toISOString();
const child = spawn('claude', args, {
cwd,
stdio: ['ignore', 'pipe', 'pipe'],
env: { ...process.env, ORGX_RUN_ID: context.run_id },
});
this.running.set(context.run_id, child);

yield { kind: 'task.started', run_id: context.run_id, started_at: startedAt };

const rules = (await (this.opts.skillRules?.() ?? Promise.resolve([]))).filter(Boolean);
const seen = new Set();
let firstResponseAt = null;
let tokensTotal = 0;

try {
for await (const line of readNdjson(child.stdout)) {
const event = safeParse(line);
if (!event || typeof event !== 'object') continue;

if (!firstResponseAt && (event.kind === 'tool_call' || event.kind === 'chat' || event.kind === 'file_edit')) {
firstResponseAt = new Date().toISOString();
}

if (event.kind === 'tokens_used') {
tokensTotal += Number(event.delta ?? 0);
continue;
}

if (event.kind === 'file_edit' || event.kind === 'tool_call') {
const summary =
event.kind === 'file_edit'
? `edit ${event.path} — ${event.summary ?? 'change'}`
: `call ${event.tool ?? 'tool'} — ${event.summary ?? ''}`;
yield {
kind: 'task.step',
run_id: context.run_id,
step: {
kind: event.kind,
summary,
evidence_ref: event.diff_ref ?? event.ref ?? null,
},
};

for (const rule of rules) {
if (rule.match?.on !== event.kind) continue;
const text =
event.kind === 'file_edit'
? `${event.path ?? ''} ${event.summary ?? ''}`
: `${event.tool ?? ''} ${event.summary ?? ''}`;
try {
if (!new RegExp(rule.match.pattern).test(text)) continue;
} catch {
continue;
}
const dedupe = `${rule.skill_id}:${rule.dedupe_fingerprint}:${context.run_id}`;
if (seen.has(dedupe)) continue;
seen.add(dedupe);
yield {
kind: 'task.deviation',
run_id: context.run_id,
skill_id: rule.skill_id,
evidence_kind: rule.evidence_kind,
evidence_ref: event.diff_ref ?? event.ref ?? event.path ?? event.tool ?? '',
dedupe_key: dedupe,
severity: 'warn',
};
}
}

if (event.kind === 'assistant_completed') {
tokensTotal = tokensTotal || Number(event.tokens_used ?? 0);
yield {
kind: 'task.completed',
run_id: context.run_id,
outcome_kind: 'shipped',
started_at: startedAt,
first_response_at: firstResponseAt ?? startedAt,
completed_at: new Date().toISOString(),
tokens_used: tokensTotal,
provider: 'anthropic',
source_sub_type: 'subscription',
source_driver: 'claude_code',
cost_estimate_cents: 0,
};
return;
}

if (event.kind === 'error') {
yield {
kind: 'task.failed',
run_id: context.run_id,
reason: event.message ?? 'claude errored',
recoverable: event.recoverable === true,
};
return;
}
}

// stdout closed without explicit completed → check exit code.
const exitCode = await waitExit(child);
if (exitCode === 0) {
yield {
kind: 'task.completed',
run_id: context.run_id,
outcome_kind: 'shipped',
started_at: startedAt,
first_response_at: firstResponseAt ?? startedAt,
completed_at: new Date().toISOString(),
tokens_used: tokensTotal,
provider: 'anthropic',
source_sub_type: 'subscription',
source_driver: 'claude_code',
cost_estimate_cents: 0,
};
} else {
yield {
kind: 'task.failed',
run_id: context.run_id,
reason: `claude exited ${exitCode}`,
recoverable: exitCode === null,
};
}
} finally {
this.running.delete(context.run_id);
}
}

async cancel(runId) {
const child = this.running.get(runId);
if (!child) return;
child.kill('SIGTERM');
setTimeout(() => {
if (!child.killed) child.kill('SIGKILL');
}, CANCEL_GRACE_MS).unref?.();
this.running.delete(runId);
}
}

// ───────── Helpers ─────────────────────────────────────────────────────────

function runOnce(cmd, args, opts = {}) {
const timeoutMs = opts.timeoutMs ?? 10_000;
return new Promise((resolveFn, rejectFn) => {
const child = spawn(cmd, args, { stdio: ['ignore', 'pipe', 'pipe'] });
let stdout = '';
let stderr = '';
const t = setTimeout(() => {
child.kill('SIGKILL');
rejectFn(new Error(`${cmd} timed out after ${timeoutMs}ms`));
}, timeoutMs);
child.stdout.on('data', (d) => {
stdout += d.toString('utf8');
});
child.stderr.on('data', (d) => {
stderr += d.toString('utf8');
});
child.on('error', (err) => {
clearTimeout(t);
rejectFn(err);
});
child.on('close', (code) => {
clearTimeout(t);
if (code === 0) resolveFn({ stdout, stderr });
else rejectFn(new Error(`${cmd} exited ${code}: ${stderr.slice(0, 300)}`));
});
});
}

async function* readNdjson(stream) {
let buffer = '';
for await (const chunk of stream) {
buffer += chunk.toString('utf8');
let nl;
while ((nl = buffer.indexOf('\n')) !== -1) {
const line = buffer.slice(0, nl).trim();
buffer = buffer.slice(nl + 1);
if (line) yield line;
}
}
if (buffer.trim()) yield buffer.trim();
}

function safeParse(s) {
try {
return JSON.parse(s);
} catch {
return null;
}
}

function waitExit(child) {
return new Promise((resolveFn) => {
if (child.exitCode !== null) resolveFn(child.exitCode);
else child.once('close', (code) => resolveFn(code));
});
}

function renderPrompt(task) {
const parts = [task.title];
if (task.description) parts.push('\n\n', task.description);
if (task.skill_ids?.length) {
parts.push('\n\nSkills to honor:\n');
for (const id of task.skill_ids) parts.push(` - ${id}\n`);
}
return parts.join('');
}
72 changes: 72 additions & 0 deletions lib/peer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# OrgX Peer Sidecar for Claude Code

The plugin under `@useorgx/claude-code-plugin` has always been a **Claude Code CLI plugin** (hooks + skills + commands + agents) you load with `claude --plugin-dir .`.

This folder adds a second shape: a **peer sidecar** that connects to OrgX server over WebSocket and dispatches to your local `claude` CLI on demand. That sidecar implements [Gateway Protocol v1](https://github.com/useorgx/orgx-gateway-sdk/blob/main/PROTOCOL.md) via the shared `@useorgx/orgx-gateway-sdk` package.

## Mental model

- The CLI plugin is **loaded by you** when you run Claude Code interactively.
- The peer sidecar is **loaded by the machine** (e.g. autostart on login) and listens for OrgX dispatches. When a task arrives, it runs `claude -p <prompt> --plugin-dir <this-plugin>` under the hood — which re-uses the same skills + hooks your interactive sessions use.

Both shapes share a codebase and the same skill catalog.

## Run it

```bash
# from the plugin root
ORGX_API_KEY=oxk_your_token_here \
ORGX_WORKSPACE_ID=<workspace-uuid> \
node lib/peer/cli.mjs
```

Or programmatically:

```js
import { startPeer } from '@useorgx/claude-code-plugin/peer';

const peer = await startPeer({
apiKey: process.env.ORGX_API_KEY,
workspaceId: process.env.ORGX_WORKSPACE_ID,
});

// later
await peer.stop();
```

## Required oxk_ scopes

- `gateway:drive` — accept task.dispatch / emit task.step + task.completed
- `plugin:heartbeat` — post `POST /api/v1/licenses/heartbeat` weekly

## Protocol

- On boot: calls `GET /api/v1/plan-skills` to pull skill rules the peer checks against file-edit / tool-call events. Matches emit `task.deviation` to the server.
- `claude` is invoked with `--output-format stream-json` so stdout is NDJSON events that the Driver translates into wire-protocol messages.
- Token usage is accumulated from `tokens_used` events emitted by Claude Code when present; `cost_estimate_cents` is set to 0 because subscription-backed dispatches don't carry a price — the server fills `saved_estimate_cents` later via the receipt aggregator.

## Peer lifecycle

```
startPeer()
├─ load plugin.manifest.json (unsigned in dev → 'degraded' in permissive mode)
├─ new PeerClient({ baseUrl: wss://useorgx.com, apiKey, workspaceId,
│ pluginId: '@useorgx/claude-code-plugin',
│ drivers: [new ClaudeCodeDriver(…)] })
├─ client.connect() // WebSocket + protocol handshake
├─ postHeartbeat() // initial license heartbeat
└─ setInterval(heartbeat, 7d) // keep status active
```

`client.disconnect()` on stop clears the heartbeat timer.

## Files

- `ClaudeCodeDriver.mjs` — Driver implementing Gateway Protocol v1. Spawns `claude`, reads NDJSON stdout, emits task.step / task.deviation / task.completed / task.failed.
- `peer.mjs` — `startPeer()` wires Driver into PeerClient + manages heartbeat.
- `cli.mjs` — shell entrypoint so `node lib/peer/cli.mjs` just works.
- `peer.test.mjs` — Node `node --test` unit coverage for the Driver (spawns a fake `claude` via a test fixture).

## Status

Alpha — lands alongside the other Sovereign Execution plugin peers (orgx-codex-plugin, orgx-opencode-plugin). See initiative [`993cabeb`](https://useorgx.com/live/993cabeb-8162-4f35-9b4d-3832df9d5f83).
35 changes: 35 additions & 0 deletions lib/peer/cli.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env node
/**
* CLI entrypoint: `orgx-claude-code-peer` — starts the plugin's peer
* sidecar so OrgX server can dispatch tasks to the user's local Claude
* Code session.
*/

import { startPeer } from './peer.mjs';

async function main() {
const apiKey = process.env.ORGX_API_KEY;
const workspaceId = process.env.ORGX_WORKSPACE_ID;
const baseUrl = process.env.ORGX_BASE_URL ?? 'https://useorgx.com';
if (!apiKey || !workspaceId) {
console.error('Missing ORGX_API_KEY and/or ORGX_WORKSPACE_ID. Export both and retry.');
process.exit(2);
}

const peer = await startPeer({ apiKey, workspaceId, baseUrl });
console.log(
'[orgx-claude-code-plugin] peer running — ctrl-c to stop. Dispatches arrive when OrgX sends them.'
);

const shutdown = async () => {
await peer.stop();
process.exit(0);
};
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
}

main().catch((err) => {
console.error('[orgx-claude-code-plugin] fatal', err);
process.exit(1);
});
Loading
Loading