Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 35 additions & 11 deletions services/control-plane/src/server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { createServer } from "node:http";
import { createServer, type ServerResponse } from "node:http";
import { fileURLToPath } from "node:url";

import { dispatchJsonRpc } from "./json-rpc.ts";
Expand All @@ -9,6 +9,22 @@ interface ServerOptions {
port: number;
}

const jsonHeaders = {
"access-control-allow-headers": "content-type",
"access-control-allow-methods": "GET,POST,OPTIONS",
"access-control-allow-origin": "*",
"content-type": "application/json",
};

function jsonResult(response: ReturnType<typeof dispatchJsonRpc>): unknown {
return Array.isArray(response) ? response : response?.result ?? response;
}

function writeJson(res: ServerResponse, statusCode: number, body: unknown): void {
res.writeHead(statusCode, jsonHeaders);
res.end(`${JSON.stringify(body)}\n`);
}

function parseArgs(args: string[]): ServerOptions {
const options: ServerOptions = {
host: "127.0.0.1",
Expand Down Expand Up @@ -44,16 +60,26 @@ function parseArgs(args: string[]): ServerOptions {
export function startControlPlaneServer(options: ServerOptions): ReturnType<typeof createServer> {
const state = loadControlPlaneState();
const server = createServer((req, res) => {
if (req.method === "OPTIONS") {
res.writeHead(204, jsonHeaders);
res.end();
return;
}

if (req.method === "GET" && req.url === "/health") {
const response = dispatchJsonRpc({ jsonrpc: "2.0", id: "health", method: "health" }, { state });
res.writeHead(200, { "content-type": "application/json" });
res.end(`${JSON.stringify(Array.isArray(response) ? response : response?.result ?? response)}\n`);
writeJson(res, 200, jsonResult(response));
return;
}

if (req.method === "GET" && req.url === "/state") {
const response = dispatchJsonRpc({ jsonrpc: "2.0", id: "state", method: "devnet_state" }, { state });
writeJson(res, 200, jsonResult(response));
return;
}

if (req.method !== "POST" || req.url !== "/rpc") {
res.writeHead(404, { "content-type": "application/json" });
res.end(JSON.stringify({ error: "not found" }));
writeJson(res, 404, { error: "not found" });
return;
}

Expand All @@ -67,15 +93,13 @@ export function startControlPlaneServer(options: ServerOptions): ReturnType<type
const payload = JSON.parse(body) as unknown;
const response = dispatchJsonRpc(payload, { state });
if (response === undefined) {
res.writeHead(204);
res.writeHead(204, jsonHeaders);
res.end();
return;
}
res.writeHead(200, { "content-type": "application/json" });
res.end(`${JSON.stringify(response)}\n`);
writeJson(res, 200, response);
} catch (error) {
res.writeHead(400, { "content-type": "application/json" });
res.end(JSON.stringify({
writeJson(res, 400, {
jsonrpc: "2.0",
id: null,
error: {
Expand All @@ -87,7 +111,7 @@ export function startControlPlaneServer(options: ServerOptions): ReturnType<type
localOnly: true,
},
},
}));
});
}
});
});
Expand Down
38 changes: 38 additions & 0 deletions services/control-plane/test/control-plane.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import assert from "node:assert/strict";
import { once } from "node:events";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
Expand All @@ -11,6 +12,7 @@ import {
type RpcErrorResponse,
type RpcSuccessResponse,
} from "../src/index.ts";
import { startControlPlaneServer } from "../src/server.ts";
import { runControlPlaneSmoke } from "../src/smoke.ts";

test("dispatches JSON-RPC methods against local fixture state", () => {
Expand Down Expand Up @@ -170,3 +172,39 @@ test("smoke client queries the complete local lifecycle surface", () => {
assert.equal(smoke.methodCount, 31);
assert.ok((smoke.responseSchemas as string[]).includes("flowmemory.control_plane.raw_json.v0"));
});

test("HTTP server exposes browser-safe health and state endpoints", async () => {
const server = startControlPlaneServer({ host: "127.0.0.1", port: 0 });

try {
await once(server, "listening");
const address = server.address();
assert.equal(typeof address, "object");
assert.notEqual(address, null);
const port = address?.port;

const health = await fetch(`http://127.0.0.1:${port}/health`, {
headers: { Origin: "http://127.0.0.1:5173" },
});
assert.equal(health.status, 200);
assert.equal(health.headers.get("access-control-allow-origin"), "*");
assert.equal((await health.json()).status, "ok");

const state = await fetch(`http://127.0.0.1:${port}/state`, {
headers: { Origin: "http://127.0.0.1:5173" },
});
assert.equal(state.status, 200);
assert.equal(state.headers.get("access-control-allow-origin"), "*");
assert.equal((await state.json()).schema, "flowmemory.control_plane.devnet_state.v0");
} finally {
await new Promise<void>((resolve, reject) => {
server.close((error) => {
if (error) {
reject(error);
return;
}
resolve();
});
});
}
});
Loading