Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions packages/lplex/src/cloud.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import { Client, type ClientOptions } from "./client.js";
import { HttpError } from "./errors.js";
import type {
InstanceStatus,
InstanceSummary,
ReplicationEvent,
} from "./types.js";

type FetchFn = typeof globalThis.fetch;

export interface CloudClientOptions {
fetch?: FetchFn;
}

/**
* Client for the lplex-cloud management API.
*
* For per-instance data (devices, SSE), use {@link client} to get a
* regular {@link Client} scoped to that instance.
*/
export class CloudClient {
readonly #baseURL: string;
readonly #fetch: FetchFn;
readonly #fetchOpt: CloudClientOptions;

constructor(baseURL: string, options?: CloudClientOptions) {
this.#baseURL = baseURL.replace(/\/+$/, "");
this.#fetch = options?.fetch ?? globalThis.fetch.bind(globalThis);
this.#fetchOpt = options ?? {};
}

/**
* Returns a {@link Client} scoped to a specific instance.
* The returned client's `devices()`, `subscribe()`, etc. hit the
* cloud's per-instance endpoints.
*/
client(instanceId: string): Client {
const opts: ClientOptions = {};
if (this.#fetchOpt.fetch) opts.fetch = this.#fetchOpt.fetch;
return new Client(`${this.#baseURL}/instances/${instanceId}`, opts);
}

/** List all known instances. */
async instances(signal?: AbortSignal): Promise<InstanceSummary[]> {
const url = `${this.#baseURL}/instances`;
const resp = await this.#fetch(url, { signal });

if (!resp.ok) {
const body = await resp.text();
throw new HttpError("GET", url, resp.status, body);
}

const data = (await resp.json()) as { instances: InstanceSummary[] };
return data.instances;
}

/** Get detailed replication status for one instance. */
async status(
instanceId: string,
signal?: AbortSignal,
): Promise<InstanceStatus> {
const url = `${this.#baseURL}/instances/${instanceId}/status`;
const resp = await this.#fetch(url, { signal });

if (!resp.ok) {
const body = await resp.text();
throw new HttpError("GET", url, resp.status, body);
}

return resp.json() as Promise<InstanceStatus>;
}

/** Fetch recent replication diagnostic events for an instance. */
async replicationEvents(
instanceId: string,
limit?: number,
signal?: AbortSignal,
): Promise<ReplicationEvent[]> {
let url = `${this.#baseURL}/instances/${instanceId}/replication/events`;
if (limit !== undefined) url += `?limit=${limit}`;

const resp = await this.#fetch(url, { signal });

if (!resp.ok) {
const body = await resp.text();
throw new HttpError("GET", url, resp.status, body);
}

return resp.json() as Promise<ReplicationEvent[]>;
}
}
7 changes: 7 additions & 0 deletions packages/lplex/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
export { Client } from "./client.js";
export type { ClientOptions } from "./client.js";
export { CloudClient } from "./cloud.js";
export type { CloudClientOptions } from "./cloud.js";
export { Session } from "./session.js";
export { LplexError, HttpError } from "./errors.js";
export type {
Expand All @@ -10,4 +12,9 @@ export type {
SessionConfig,
SessionInfo,
SendParams,
InstanceSummary,
InstanceStatus,
SeqRange,
ReplicationEvent,
ReplicationEventType,
} from "./types.js";
47 changes: 47 additions & 0 deletions packages/lplex/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,50 @@ export interface SendParams {
prio: number;
data: string;
}

// --- Cloud types ---

/** Summary of a cloud instance, returned by GET /instances. */
export interface InstanceSummary {
id: string;
connected: boolean;
cursor: number;
boat_head_seq: number;
holes: number;
lag_seqs: number;
last_seen: string;
}

/** A sequence range representing a gap in the replication stream. */
export interface SeqRange {
start: number;
end: number;
}

/** Detailed replication status for one instance. */
export interface InstanceStatus {
id: string;
connected: boolean;
cursor: number;
boat_head_seq: number;
boat_journal_bytes: number;
holes: SeqRange[];
lag_seqs: number;
last_seen: string;
}

/** Event types emitted by the replication pipeline. */
export type ReplicationEventType =
| "live_start"
| "live_stop"
| "backfill_start"
| "backfill_stop"
| "block_received"
| "checkpoint";

/** A single diagnostic event from the replication pipeline. */
export interface ReplicationEvent {
time: string;
type: ReplicationEventType;
detail?: Record<string, unknown>;
}
164 changes: 164 additions & 0 deletions packages/lplex/test/cloud.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import { describe, expect, it } from "vitest";
import { CloudClient } from "../src/cloud.js";
import { HttpError } from "../src/errors.js";

function jsonResponse(data: unknown, status = 200): Response {
return new Response(JSON.stringify(data), {
status,
headers: { "Content-Type": "application/json" },
});
}

function errorResponse(status: number, body: string): Response {
return new Response(body, { status });
}

describe("CloudClient.instances", () => {
it("fetches the instance list", async () => {
const payload = {
instances: [
{
id: "boat-1",
connected: true,
cursor: 1000,
boat_head_seq: 1050,
holes: 2,
lag_seqs: 50,
last_seen: "2026-03-01T00:00:00Z",
},
],
};

const mockFetch = async (url: string | URL | Request) => {
expect(url).toBe("https://cloud.example.com/instances");
return jsonResponse(payload);
};

const client = new CloudClient("https://cloud.example.com", {
fetch: mockFetch as typeof fetch,
});
const result = await client.instances();
expect(result).toHaveLength(1);
expect(result[0].id).toBe("boat-1");
expect(result[0].connected).toBe(true);
expect(result[0].lag_seqs).toBe(50);
});

it("throws HttpError on failure", async () => {
const mockFetch = async () => errorResponse(500, "boom");
const client = new CloudClient("https://cloud.example.com", {
fetch: mockFetch as typeof fetch,
});
await expect(client.instances()).rejects.toThrow(HttpError);
});
});

describe("CloudClient.status", () => {
it("fetches instance status", async () => {
const payload = {
id: "boat-1",
connected: true,
cursor: 1000,
boat_head_seq: 1050,
boat_journal_bytes: 50000,
holes: [{ start: 500, end: 600 }],
lag_seqs: 50,
last_seen: "2026-03-01T00:00:00Z",
};

const mockFetch = async (url: string | URL | Request) => {
expect(url).toBe("https://cloud.example.com/instances/boat-1/status");
return jsonResponse(payload);
};

const client = new CloudClient("https://cloud.example.com", {
fetch: mockFetch as typeof fetch,
});
const result = await client.status("boat-1");
expect(result.id).toBe("boat-1");
expect(result.holes).toHaveLength(1);
expect(result.holes[0].start).toBe(500);
});

it("throws HttpError on 404", async () => {
const mockFetch = async () => errorResponse(404, "instance not found");
const client = new CloudClient("https://cloud.example.com", {
fetch: mockFetch as typeof fetch,
});
await expect(client.status("nope")).rejects.toThrow(HttpError);
});
});

describe("CloudClient.replicationEvents", () => {
it("fetches replication events", async () => {
const events = [
{
time: "2026-03-01T00:00:00Z",
type: "live_start",
detail: { boat_head_seq: 1000 },
},
{
time: "2026-03-01T00:00:01Z",
type: "checkpoint",
detail: { frames_received: 50000 },
},
];

const mockFetch = async (url: string | URL | Request) => {
expect(url).toBe(
"https://cloud.example.com/instances/boat-1/replication/events",
);
return jsonResponse(events);
};

const client = new CloudClient("https://cloud.example.com", {
fetch: mockFetch as typeof fetch,
});
const result = await client.replicationEvents("boat-1");
expect(result).toHaveLength(2);
expect(result[0].type).toBe("live_start");
expect(result[1].detail?.frames_received).toBe(50000);
});

it("passes limit as query param", async () => {
let capturedURL = "";
const mockFetch = async (url: string | URL | Request) => {
capturedURL = String(url);
return jsonResponse([]);
};

const client = new CloudClient("https://cloud.example.com", {
fetch: mockFetch as typeof fetch,
});
await client.replicationEvents("boat-1", 10);
expect(capturedURL).toBe(
"https://cloud.example.com/instances/boat-1/replication/events?limit=10",
);
});

it("throws HttpError on 404", async () => {
const mockFetch = async () => errorResponse(404, "instance not found");
const client = new CloudClient("https://cloud.example.com", {
fetch: mockFetch as typeof fetch,
});
await expect(client.replicationEvents("nope")).rejects.toThrow(HttpError);
});
});

describe("CloudClient.client", () => {
it("returns a Client scoped to the instance", async () => {
const devices = [{ src: 1, manufacturer: "Garmin" }];

const mockFetch = async (url: string | URL | Request) => {
expect(url).toBe("https://cloud.example.com/instances/boat-1/devices");
return jsonResponse(devices);
};

const cloud = new CloudClient("https://cloud.example.com", {
fetch: mockFetch as typeof fetch,
});
const inst = cloud.client("boat-1");
const result = await inst.devices();
expect(result).toHaveLength(1);
});
});