diff --git a/cli/build-cmd.test.ts b/cli/build-cmd.test.ts index 3bffd6b72..9f9d5918a 100644 --- a/cli/build-cmd.test.ts +++ b/cli/build-cmd.test.ts @@ -235,6 +235,9 @@ it("unmodified dependencies", async () => { expect(patchedPjson.patchedPackageJson).toEqual({ scripts: {}, version: "1.2.3", + publishConfig: { + access: "public", + }, dependencies: { "@fireproof/vendor": "workspace:*", "xcmd-ts": "^0.13.0", @@ -244,6 +247,24 @@ it("unmodified dependencies", async () => { }); }); +it("patchPackageJson sets repository when repositoryUrl is provided", async () => { + const version = new Version("1.2.3", ""); + const { patchedPackageJson } = await patchPackageJson("package.json", version, { + mock, + repositoryUrl: "https://github.com/fireproof-storage/fireproof.git", + }); + expect(patchedPackageJson.repository).toEqual({ + type: "git", + url: "https://github.com/fireproof-storage/fireproof.git", + }); +}); + +it("patchPackageJson does not set repository when repositoryUrl is omitted", async () => { + const version = new Version("1.2.3", ""); + const { patchedPackageJson } = await patchPackageJson("package.json", version, { mock }); + expect(patchedPackageJson.repository).toBeUndefined(); +}); + it("sanitizeNpmrc with http lhs", async () => { const npmrc = [ "; .npmrc", diff --git a/cli/build-cmd.ts b/cli/build-cmd.ts index cca54436f..05422e71d 100644 --- a/cli/build-cmd.ts +++ b/cli/build-cmd.ts @@ -122,10 +122,13 @@ export interface PackageJson { exports: Record>; dependencies: Record; devDependencies: Record; + publishConfig?: Record; + repository?: { type: string; url: string } | string; } export interface PatchPackageJsonOptions { changeScope?: string; + repositoryUrl?: string; readonly mock?: { readonly readJSON: typeof fs.readJson; }; @@ -155,6 +158,10 @@ export async function patchPackageJson( patchedPackageJson.version = version.version; delete patchedPackageJson.scripts["pack"]; delete patchedPackageJson.scripts["publish"]; + patchedPackageJson.publishConfig = { ...patchedPackageJson.publishConfig, access: "public" }; + if (opts.repositoryUrl) { + patchedPackageJson.repository = { type: "git", url: opts.repositoryUrl }; + } return { patchedPackageJson, originalPackageJson }; } @@ -319,6 +326,14 @@ export async function buildJsrConf( return jsrConf; } +export async function getGitOriginUrl(): Promise { + const res = await $`git config --get remote.origin.url`.nothrow(); + if (res.exitCode !== 0) { + return ""; + } + return res.stdout.trim(); +} + // eslint-disable-next-line @typescript-eslint/no-unused-vars export function buildCmd(sthis: SuperThis) { const cmd = command({ @@ -476,6 +491,12 @@ export function buildCmd(sthis: SuperThis) { defaultValueIsSerializable: true, description: "Package manager to use (pnpm, npm, yarn, bun), defaults to 'pnpm'.", }), + repositoryUrl: option({ + long: "repository-url", + type: string, + defaultValue: () => "", + description: "The repository URL to set in package.json. Defaults to the origin URL read from .git/config.", + }), }, handler: async (args) => { const top = await findUp("tsconfig.dist.json"); @@ -546,7 +567,14 @@ export function buildCmd(sthis: SuperThis) { $.verbose = true; cd(jsrDstDir); - let packageJson = await patchPackageJson("package.json", version, { changeScope: args.changeScope }); + if (!args.repositoryUrl) { + args.repositoryUrl = await getGitOriginUrl(); + } + + let packageJson = await patchPackageJson("package.json", version, { + changeScope: args.changeScope, + repositoryUrl: args.repositoryUrl, + }); if (!args.noPinned) { console.log( "Prepared package.json with pinning for", diff --git a/cloud/3rd-party/package.json b/cloud/3rd-party/package.json index ad3e2a497..80672c1c3 100644 --- a/cloud/3rd-party/package.json +++ b/cloud/3rd-party/package.json @@ -45,6 +45,7 @@ "@fireproof/core-keybag": "workspace:*", "@fireproof/core-protocols-dashboard": "workspace:*", "@fireproof/core-runtime": "workspace:*", + "@fireproof/core-quick-silver": "workspace:*", "@fireproof/core-types-protocols-cloud": "workspace:*", "@fireproof/core-types-protocols-dashboard": "workspace:*", "jose": "^6.1.3", diff --git a/cloud/3rd-party/src/App.tsx b/cloud/3rd-party/src/App.tsx index eacd2f80f..e1194523f 100644 --- a/cloud/3rd-party/src/App.tsx +++ b/cloud/3rd-party/src/App.tsx @@ -1,26 +1,30 @@ -import { DocWithId, useFireproof, toCloud, RedirectStrategy } from "use-fireproof"; +import { DocWithId, useFireproof } from "use-fireproof"; import React, { useState, useEffect } from "react"; import "./App.css"; +import { QuickSilver } from "../../../core/quick-silver/quick-silver.js"; // import { URI } from "@adviser/cement"; function App() { const { database, attach } = useFireproof("fireproof-5-party", { - attach: toCloud({ - strategy: new RedirectStrategy({ - // overlayCss: defaultOverlayCss, - overlayHtml: (url: string) => `
-
×
- Fireproof Dashboard
- Sign in to Fireproof Dashboard - Redirect to Fireproof -
`, - }), - // dashboardURI: "http://localhost:7370/fp/cloud/api/token", - // tokenApiURI: "http://localhost:7370/api", - // urls: { base: "fpcloud://localhost:8787?protocol=ws" }, - // tenant: "3rd-party", - // ledger: "vibes", - }), + databaseFactory: (name) => { + return new QuickSilver({ name }); + }, + // attach: toCloud({ + // strategy: new RedirectStrategy({ + // // overlayCss: defaultOverlayCss, + // overlayHtml: (url: string) => `
+ //
×
+ // Fireproof Dashboard
+ // Sign in to Fireproof Dashboard + // Redirect to Fireproof + //
`, + // }), + // dashboardURI: "http://localhost:7370/fp/cloud/api/token", + // tokenApiURI: "http://localhost:7370/api", + // urls: { base: "fpcloud://localhost:8787?protocol=ws" }, + // tenant: "3rd-party", + // ledger: "vibes", + // }), }); const [rows, setRows] = useState([] as DocWithId<{ value: string }>[]); // const [token, setToken] = useState(""); diff --git a/cloud/quick-silver/api/cli.ts b/cloud/quick-silver/api/cli.ts new file mode 100644 index 000000000..c392cd747 --- /dev/null +++ b/cloud/quick-silver/api/cli.ts @@ -0,0 +1,161 @@ +/* eslint-disable no-console */ +import { command, option, multioption, string, run, subcommands, array } from "cmd-ts"; +import { QSApi } from "./qs-api.js"; +import type { QSGet, QSPut } from "@fireproof/cloud-quick-silver-types"; +import { ensureSuperThis } from "@fireproof/core-runtime"; +import { processStream } from "@adviser/cement"; + +const sthis = ensureSuperThis(); + +// ── shared connection options ───────────────────────────────────────────────── + +function connectionOpts() { + return { + url: option({ + long: "url", + short: "u", + description: "WebSocket URL of the quick-silver worker", + type: string, + }), + db: option({ + long: "db", + short: "d", + description: "Database name", + type: string, + }), + authType: option({ + long: "auth-type", + description: "Auth type", + type: string, + defaultValue: () => "anon", + }), + authToken: option({ + long: "auth-token", + short: "t", + description: "Auth token", + type: string, + defaultValue: () => "", + }), + }; +} + +function makeApi(args: { url: string; db: string; authType: string; authToken: string }): ReturnType { + return QSApi({ + url: args.url, + db: args.db, + auth: () => ({ type: args.authType, token: args.authToken }), + }); +} + +// ── get ─────────────────────────────────────────────────────────────────────── + +const getCmd = command({ + name: "get", + description: "Fetch one or more docs by key (comma-separated)", + args: { + ...connectionOpts(), + keys: multioption({ + long: "keys", + short: "k", + description: "Key to fetch (repeatable)", + type: array(string), + }), + }, + handler: async (args) => { + const api = await makeApi(args); + const ops: QSGet[] = args.keys.map((key) => ({ key })); + await processStream(api.get(ops), async (r) => { + console.log(JSON.stringify(r)); + }); + await api.close(); + }, +}); + +// ── put ─────────────────────────────────────────────────────────────────────── + +const putCmd = command({ + name: "put", + description: "Write docs by key (repeatable: --pkg key,{json})", + args: { + ...connectionOpts(), + pkg: multioption({ + long: "pkg", + short: "p", + description: "key,{json} pair to store (repeatable)", + type: array(string), + }), + }, + handler: async (args) => { + const ops: QSPut[] = args.pkg.map((pair) => { + const comma = pair.indexOf(","); + if (comma === -1) throw new Error(`invalid --pkg "${pair}": expected key,{json}`); + const key = pair.slice(0, comma); + const data = sthis.ende.cbor.encodeToUint8(JSON.parse(pair.slice(comma + 1))) as Uint8Array; + return { key, data } satisfies QSPut; + }); + const api = await makeApi(args); + await processStream(api.put(ops), async (r) => { + console.log(JSON.stringify(r)); + }); + await api.close(); + }, +}); + +// ── all ─────────────────────────────────────────────────────────────────────── + +const queryCmd = command({ + name: "query", + description: "Stream all docs in the database", + args: { + ...connectionOpts(), + }, + handler: async (args) => { + const api = await makeApi(args); + await processStream(api.query(), async (r) => { + console.log(JSON.stringify(r)); + }); + await api.close(); + }, +}); + +// ── subscribe ───────────────────────────────────────────────────────────────── + +const subscribeCmd = command({ + name: "subscribe", + description: "Subscribe to events for a subscribeId (streams until Ctrl+C)", + args: { + ...connectionOpts(), + }, + handler: async (args) => { + const api = await makeApi(args); + const handle = api.subscribe(); + + const cleanup = () => { + handle.close(); + api + .close() + .then(() => process.exit(0)) + .catch(() => process.exit(1)); + }; + process.on("SIGINT", cleanup); + process.on("SIGTERM", cleanup); + + await processStream(handle.events, async (r) => { + console.log(JSON.stringify(r)); + }); + await api.close(); + }, +}); + +// ── entry point ─────────────────────────────────────────────────────────────── + +const cmd = subcommands({ + name: "qs", + description: "quick-silver CLI", + version: "0.0.0", + cmds: { get: getCmd, put: putCmd, query: queryCmd, subscribe: subscribeCmd }, +}); + +run(cmd, process.argv.slice(2)) + .then(() => process.exit(0)) + .catch(console.error); diff --git a/cloud/quick-silver/api/package.json b/cloud/quick-silver/api/package.json new file mode 100644 index 000000000..2f7d68e80 --- /dev/null +++ b/cloud/quick-silver/api/package.json @@ -0,0 +1,16 @@ +{ + "name": "@fireproof/cloud-quick-silver-api", + "version": "0.0.0", + "type": "module", + "scripts": { + "build": "core-cli tsc", + "pack": "core-cli build --doPack", + "publish": "core-cli build" + }, + "dependencies": { + "@adviser/cement": "^0.5.22", + "@fireproof/cloud-quick-silver-types": "workspace:*", + "@fireproof/core-runtime": "workspace:*", + "cmd-ts": "^0.15.0" + } +} diff --git a/cloud/quick-silver/api/qs-api.ts b/cloud/quick-silver/api/qs-api.ts new file mode 100644 index 000000000..61198e0c2 --- /dev/null +++ b/cloud/quick-silver/api/qs-api.ts @@ -0,0 +1,299 @@ +import { Future, Lazy, OnFunc, KeyedResolvOnce, timeouted, isTimeout, isError } from "@adviser/cement"; +import { ensureSuperThis } from "@fireproof/core-runtime"; +import type { + QSGet, + QSPut, + QSReqGet, + QSReqPut, + QSReqQuery, + QSReqRegisterSubscribe, + QSReqUnregisterSubscribe, + QSResGet, + QSResGetNotFound, + QSResPut, + QSResErr, + QSResRegisterSubscribe, + QSEvtSubscribe, + QSResQueryBegin, + QSResQueryRow, + QSResQueryEnd, + QSMsg, + QSCloudAuth, +} from "@fireproof/cloud-quick-silver-types"; +import { + isQSMsg, + isQSResGet, + isQSResGetNotFound, + isQSResPut, + isQSResErr, + isQSResQueryEnd, + isQSResRegisterSubscribe, + isQSEvtSubscribe, +} from "@fireproof/cloud-quick-silver-types"; + +const sthis = ensureSuperThis(); + +const DEFAULT_TIMEOUT_MS = 30_000; + +export interface QSSubscribeHandle { + readonly events: ReadableStream; + readonly close: () => void; +} + +export interface QSApiOpts { + readonly url: string; + readonly db: string; + readonly auth: () => QSCloudAuth; + readonly timeoutMs?: number; +} + +const connections = new KeyedResolvOnce(); + +export async function QSApi(opts: QSApiOpts): Promise { + return connections.get(opts.url).once(() => new QSApiImpl(opts)); +} + +class QSApiImpl { + private readonly opts: QSApiOpts; + + constructor(opts: QSApiOpts) { + this.opts = opts; + } + + readonly connect = Lazy(() => { + const ws = new WebSocket(this.opts.url); + const opened = new Future(); + ws.binaryType = "arraybuffer"; + ws.onopen = () => { + opened.resolve(ws); + }; + ws.onerror = (e) => opened.reject(new Error(String(e))); + ws.onmessage = (evt) => { + const decoded = sthis.ende.cbor.decodeUint8(new Uint8Array(evt.data as ArrayBuffer)); + if (decoded.isErr()) return; + const msg = decoded.Ok(); + if (isQSMsg(msg)) { + this.onMessage.invoke(msg); + } + }; + return opened.asPromise(); + }); + + private onMessage = OnFunc<(msg: QSMsg) => void>(); + + private withTimeout(done: Future, unreg: () => void, writer: WritableStreamDefaultWriter): void { + timeouted(done.asPromise(), { timeout: this.opts.timeoutMs ?? DEFAULT_TIMEOUT_MS }).then((result) => { + if (isTimeout(result) || isError(result)) { + unreg(); + writer.abort(new Error(isTimeout(result) ? "request timeout" : result.error?.message)); + } + }); + } + + get(ops: QSGet[]): ReadableStream { + const tids = ops.map(() => sthis.nextId().str); + const pending = new Set(tids); + const { readable, writable } = new TransformStream< + QSResGet | QSResGetNotFound | QSResErr, + QSResGet | QSResGetNotFound | QSResErr + >(); + const writer = writable.getWriter(); + const done = new Future(); + + const unreg = this.onMessage((msg) => { + if (!pending.has(msg.tid)) return; + if (!isQSResGet(msg) && !isQSResGetNotFound(msg) && !isQSResErr(msg)) return; + pending.delete(msg.tid); + writer.write(msg); + if (pending.size === 0) { + unreg(); + done.resolve(); + writer.close(); + } + }); + + this.withTimeout(done, unreg, writer); + + this.connect() + .then((ws) => { + for (let i = 0; i < ops.length; i++) { + ws.send( + sthis.ende.cbor.encodeToUint8({ + type: "QSReqGet", + tid: tids[i], + arg: 0, + db: this.opts.db, + auth: this.opts.auth(), + ...ops[i], + } satisfies QSReqGet), + ); + } + }) + .catch((e) => { + unreg(); + done.reject(e); + writer.abort(e); + }); + + return readable; + } + + put(ops: QSPut[]): ReadableStream { + const tids = ops.map(() => sthis.nextId().str); + const pending = new Set(tids); + const { readable, writable } = new TransformStream(); + const writer = writable.getWriter(); + const done = new Future(); + + const unreg = this.onMessage((msg) => { + if (!pending.has(msg.tid)) return; + if (!isQSResPut(msg) && !isQSResErr(msg)) return; + pending.delete(msg.tid); + writer.write(msg); + if (pending.size === 0) { + unreg(); + done.resolve(); + writer.close(); + } + }); + + this.withTimeout(done, unreg, writer); + + this.connect() + .then((ws) => { + for (let i = 0; i < ops.length; i++) { + ws.send( + sthis.ende.cbor.encodeToUint8({ + type: "QSReqPut", + tid: tids[i], + arg: 0, + db: this.opts.db, + auth: this.opts.auth(), + ...ops[i], + } satisfies QSReqPut), + ); + } + }) + .catch((e) => { + unreg(); + done.reject(e); + writer.abort(e); + }); + + return readable; + } + + query(_pred?: (data: unknown) => boolean): ReadableStream { + // pred is coming later + const tid = sthis.nextId().str; + const { readable, writable } = new TransformStream< + QSResQueryBegin | QSResQueryRow | QSResQueryEnd | QSResErr, + QSResQueryBegin | QSResQueryRow | QSResQueryEnd | QSResErr + >(); + const writer = writable.getWriter(); + const done = new Future(); + + const unreg = this.onMessage((msg) => { + if (msg.tid !== tid) return; + writer.write(msg as QSResQueryBegin | QSResQueryRow | QSResQueryEnd | QSResErr); + if (isQSResQueryEnd(msg) || isQSResErr(msg)) { + unreg(); + done.resolve(); + writer.close(); + } + }); + + this.withTimeout(done, unreg, writer); + + this.connect() + .then((ws) => { + ws.send( + sthis.ende.cbor.encodeToUint8({ + type: "QSReqQuery", + tid, + arg: 0, + db: this.opts.db, + auth: this.opts.auth(), + } satisfies QSReqQuery), + ); + }) + .catch((e) => { + unreg(); + done.reject(e); + writer.abort(e); + }); + + return readable; + } + + subscribe(): QSSubscribeHandle { + const tid = sthis.nextId().str; + const { readable, writable } = new TransformStream< + QSResRegisterSubscribe | QSEvtSubscribe | QSResErr, + QSResRegisterSubscribe | QSEvtSubscribe | QSResErr + >(); + const writer = writable.getWriter(); + + const unreg = this.onMessage((msg) => { + if (msg.tid !== tid) return; + if (!isQSResRegisterSubscribe(msg) && !isQSEvtSubscribe(msg) && !isQSResErr(msg)) return; + writer.write(msg); + if (isQSResErr(msg)) { + unreg(); + writer.close(); + } + }); + + this.connect() + .then((ws) => { + ws.send( + sthis.ende.cbor.encodeToUint8({ + type: "QSReqRegisterSubscribe", + tid, + arg: 0, + db: this.opts.db, + auth: this.opts.auth(), + } satisfies QSReqRegisterSubscribe), + ); + }) + .catch((e) => { + unreg(); + writer.abort(e); + }); + + return { + events: readable, + close: () => { + unreg(); + writer.close().catch(() => { + /* ignore if already closed */ + }); + this.connect() + .then((ws) => { + ws.send( + sthis.ende.cbor.encodeToUint8({ + type: "QSReqUnregisterSubscribe", + tid, + arg: 0, + db: this.opts.db, + auth: this.opts.auth(), + } satisfies QSReqUnregisterSubscribe), + ); + }) + .catch(() => { + /* ignore */ + }); + }, + }; + } + + close(): Promise { + return this.connect() + .then((ws) => { + ws.close(); + }) + .catch(() => { + /* ignore */ + }); + } +} diff --git a/cloud/quick-silver/api/tsconfig.json b/cloud/quick-silver/api/tsconfig.json new file mode 100644 index 000000000..a7db7c68a --- /dev/null +++ b/cloud/quick-silver/api/tsconfig.json @@ -0,0 +1,6 @@ +{ + "extends": "../../../tsconfig.json", + "compilerOptions": { + "outDir": "./dist" + } +} diff --git a/cloud/quick-silver/cf/env.ts b/cloud/quick-silver/cf/env.ts new file mode 100644 index 000000000..414451066 --- /dev/null +++ b/cloud/quick-silver/cf/env.ts @@ -0,0 +1,6 @@ +/// + +export interface Env { + QS_ROOM: DurableObjectNamespace; + QS_DB_STORE: DurableObjectNamespace; +} diff --git a/cloud/quick-silver/cf/package.json b/cloud/quick-silver/cf/package.json new file mode 100644 index 000000000..d56a06c7a --- /dev/null +++ b/cloud/quick-silver/cf/package.json @@ -0,0 +1,22 @@ +{ + "name": "@fireproof/cloud-quick-silver-cf", + "version": "0.0.0", + "type": "module", + "scripts": { + "dev": "wrangler dev", + "deploy": "wrangler deploy", + "build": "core-cli tsc", + "pack": "core-cli build --doPack", + "publish": "core-cli build" + }, + "dependencies": { + "@adviser/cement": "^0.5.22", + "@fireproof/cloud-quick-silver-types": "workspace:*", + "@fireproof/core-types-base": "workspace:*", + "@fireproof/core-runtime": "workspace:*" + }, + "devDependencies": { + "@cloudflare/workers-types": "^4.20260214.0", + "wrangler": "^4.66.0" + } +} diff --git a/cloud/quick-silver/cf/qs-db-evento.ts b/cloud/quick-silver/cf/qs-db-evento.ts new file mode 100644 index 000000000..23de8e77d --- /dev/null +++ b/cloud/quick-silver/cf/qs-db-evento.ts @@ -0,0 +1,125 @@ +/// + +import { Lazy, Evento, EventoResult, EventoType, LoggerImpl, Result, Option } from "@adviser/cement"; +import { ensureSuperThis } from "@fireproof/core-runtime"; +import { isQSReqGet, isQSReqPut, isQSReqQuery, QSResErr, QSEvtSubscribe } from "@fireproof/cloud-quick-silver-types"; +import type { QSReqGet, QSReqPut, QSReqQuery } from "@fireproof/cloud-quick-silver-types"; +import { QSCborEventoEnDecoder } from "./qs-encoder.js"; + +// Minimal interface to avoid circular import with qs-db-store.ts +interface QSDbDO { + readonly ctx: { readonly storage: DurableObjectStorage }; +} + +function handlerSql(ctx: { ctx: unknown }): SqlStorage { + return (ctx.ctx as unknown as QSDbDO).ctx.storage.sql; +} + +export const qsDbEvento = Lazy(() => { + const sthis = ensureSuperThis({ logger: new LoggerImpl() }); + const evento = new Evento(new QSCborEventoEnDecoder(sthis)); + + evento.push( + { + hash: "qs-req-put", + validate: async (ctx) => { + if (isQSReqPut(ctx.enRequest)) return Result.Ok(Option.Some(ctx.enRequest as QSReqPut)); + return Result.Ok(Option.None()); + }, + handle: async (ctx) => { + const req = ctx.validated as QSReqPut; + const sql = handlerSql(ctx); + sql.exec( + `INSERT INTO docs (id, cid, data) VALUES (?, ?, ?) + ON CONFLICT(id) DO UPDATE SET cid = excluded.cid, data = excluded.data`, + req.key, + req.key, + req.data, + ); + await ctx.send.send(ctx, { type: "QSResPut", tid: req.tid, arg: req.arg, key: req.key }); + await ctx.send.send(ctx, { + type: "QSEvtSubscribe", + tid: req.tid, + msg: { key: req.key, data: req.data }, + } satisfies QSEvtSubscribe); + return Result.Ok(EventoResult.Continue); + }, + }, + { + hash: "qs-req-get", + validate: async (ctx) => { + if (isQSReqGet(ctx.enRequest)) return Result.Ok(Option.Some(ctx.enRequest as QSReqGet)); + return Result.Ok(Option.None()); + }, + handle: async (ctx) => { + const req = ctx.validated as QSReqGet; + const sql = handlerSql(ctx); + const rows = [ + ...sql.exec<{ id: string; cid: string; data: ArrayBuffer }>(`SELECT id, cid, data FROM docs WHERE id = ?`, req.key), + ]; + if (!rows.length) { + await ctx.send.send(ctx, { type: "QSResGetNotFound", tid: req.tid, arg: req.arg, key: req.key }); + } else { + const row = rows[0]; + await ctx.send.send(ctx, { type: "QSResGet", tid: req.tid, arg: req.arg, key: row.id, data: row.data }); + } + return Result.Ok(EventoResult.Continue); + }, + }, + { + hash: "qs-req-query", + validate: async (ctx) => { + if (isQSReqQuery(ctx.enRequest)) return Result.Ok(Option.Some(ctx.enRequest as QSReqQuery)); + return Result.Ok(Option.None()); + }, + handle: async (ctx) => { + const req = ctx.validated as QSReqQuery; + const sql = handlerSql(ctx); + await ctx.send.send(ctx, { type: "QSResQueryBegin", tid: req.tid, arg: req.arg }); + const cursor = sql.exec<{ id: string; cid: string; data: ArrayBuffer; synced: number }>( + `SELECT id, cid, data, synced FROM docs`, + ); + let rowNr = 0; + for (const row of cursor) { + await ctx.send.send(ctx, { + type: "QSResQueryRow", + tid: req.tid, + arg: req.arg, + rowNr: rowNr++, + row: { _: { id: row.id, cid: row.cid, synced: row.synced }, payload: row.data }, + }); + } + await ctx.send.send(ctx, { type: "QSResQueryEnd", tid: req.tid, arg: req.arg, rows: rowNr }); + return Result.Ok(EventoResult.Continue); + }, + }, + { + type: EventoType.WildCard, + hash: "qs-unknown", + handle: async (ctx) => { + await ctx.send.send(ctx, { + type: "QSResErr", + tid: "unknown", + arg: 0, + error: `unknown request: ${JSON.stringify(ctx.enRequest)}`, + } satisfies QSResErr); + return Result.Ok(EventoResult.Continue); + }, + }, + { + type: EventoType.Error, + hash: "qs-error", + handle: async (ctx) => { + await ctx.send.send(ctx, { + type: "QSResErr", + tid: "unknown", + arg: 0, + error: ctx.error?.message ?? "internal error", + } satisfies QSResErr); + return Result.Ok(EventoResult.Continue); + }, + }, + ); + + return evento; +}); diff --git a/cloud/quick-silver/cf/qs-db-store.ts b/cloud/quick-silver/cf/qs-db-store.ts new file mode 100644 index 000000000..eaea8358e --- /dev/null +++ b/cloud/quick-silver/cf/qs-db-store.ts @@ -0,0 +1,55 @@ +/// + +import { DurableObject } from "cloudflare:workers"; +import { AppContext, LoggerImpl } from "@adviser/cement"; +import { ensureSuperThis } from "@fireproof/core-runtime"; +import { Env } from "./env.js"; +import { QSSendProvider } from "./qs-send-provider.js"; +import { qsDbEvento } from "./qs-db-evento.js"; + +export class QSDBStore extends DurableObject { + private readonly sthis; + + constructor(state: DurableObjectState, env: Env) { + super(state, env); + this.sthis = ensureSuperThis({ logger: new LoggerImpl() }); + this.ctx.storage.sql.exec(` + CREATE TABLE IF NOT EXISTS docs ( + id TEXT PRIMARY KEY, + cid TEXT NOT NULL, + data BLOB NOT NULL, + synced INTEGER NOT NULL DEFAULT 0, + created_at INTEGER NOT NULL DEFAULT (unixepoch()) + ) + `); + } + + async fetch(req: Request): Promise { + if (req.headers.get("Upgrade") !== "websocket") { + return new Response("expected websocket", { status: 426 }); + } + const { 0: client, 1: server } = new WebSocketPair(); + this.ctx.acceptWebSocket(server); + return new Response(null, { status: 101, webSocket: client }); + } + + async webSocketMessage(ws: WebSocket, msg: string | ArrayBuffer): Promise { + const sendProvider = new QSSendProvider(ws, this.sthis); + if (typeof msg === "string") { + console.log("[QSDBStore] rejected string message"); + await sendProvider.send({} as never, { type: "QSResErr", tid: "unknown", arg: 0, error: "binary messages only" }); + return; + } + console.log("[QSDBStore] dispatching binary message, bytes:", (msg as ArrayBuffer).byteLength); + const ctx = new AppContext().set("QSDBStore", this); + await qsDbEvento().trigger({ ctx, request: msg as ArrayBuffer, send: sendProvider }); + } + + webSocketClose(ws: WebSocket, code: number, reason: string): void { + ws.close(code, reason); + } + + webSocketError(ws: WebSocket, error: unknown): void { + ws.close(1011, String(error)); + } +} diff --git a/cloud/quick-silver/cf/qs-encoder.ts b/cloud/quick-silver/cf/qs-encoder.ts new file mode 100644 index 000000000..85bbd7b8b --- /dev/null +++ b/cloud/quick-silver/cf/qs-encoder.ts @@ -0,0 +1,22 @@ +/// + +import { EventoEnDecoder, Result } from "@adviser/cement"; +import type { SuperThis } from "@fireproof/core-types-base"; + +export class QSCborEventoEnDecoder implements EventoEnDecoder { + private readonly sthis: SuperThis; + + constructor(sthis: SuperThis) { + this.sthis = sthis; + } + + async encode(args: ArrayBuffer): Promise> { + const decoded = this.sthis.ende.cbor.decodeUint8(new Uint8Array(args)); + if (decoded.isErr()) return Result.Ok(); + return Result.Ok(decoded.Ok()); + } + + decode(data: unknown): Promise> { + return Promise.resolve(Result.Ok(JSON.stringify(data))); + } +} diff --git a/cloud/quick-silver/cf/qs-room-evento.ts b/cloud/quick-silver/cf/qs-room-evento.ts new file mode 100644 index 000000000..c5d377883 --- /dev/null +++ b/cloud/quick-silver/cf/qs-room-evento.ts @@ -0,0 +1,124 @@ +/// + +import { Lazy, Evento, EventoResult, EventoType, LoggerImpl, Result, Option, HandleTriggerCtx } from "@adviser/cement"; +import { ensureSuperThis } from "@fireproof/core-runtime"; +import { + isQSReqGet, + isQSReqPut, + isQSReqQuery, + isQSReqRegisterSubscribe, + isQSReqUnregisterSubscribe, + QSResErr, + QSResRegisterSubscribe, +} from "@fireproof/cloud-quick-silver-types"; +import type { + QSReqGet, + QSReqPut, + QSReqQuery, + QSReqRegisterSubscribe, + QSReqUnregisterSubscribe, +} from "@fireproof/cloud-quick-silver-types"; +import { QSCborEventoEnDecoder } from "./qs-encoder.js"; +import type { QSSendProvider } from "./qs-send-provider.js"; + +// Minimal interface to avoid circular import with qs-room.ts +export interface QSRoomDO { + getStoreWs(db: string, clientWs: WebSocket): Promise; + registerSubscription(ws: WebSocket, db: string, tid: string): void; + unregisterSubscription(ws: WebSocket, tid: string): void; +} + +function room(ctx: { ctx: unknown }): QSRoomDO { + return ctx.ctx as QSRoomDO; +} + +function clientWs(ctx: { send: unknown }): WebSocket { + return (ctx.send as { provider: QSSendProvider }).provider.ws; +} + +export const qsRoomEvento = Lazy(() => { + const sthis = ensureSuperThis({ logger: new LoggerImpl() }); + const evento = new Evento(new QSCborEventoEnDecoder(sthis)); + + evento.push( + { + hash: "qs-req-register-subscribe", + validate: async (ctx) => { + if (isQSReqRegisterSubscribe(ctx.enRequest)) return Result.Ok(Option.Some(ctx.enRequest as QSReqRegisterSubscribe)); + return Result.Ok(Option.None()); + }, + handle: async (ctx) => { + const req = ctx.validated as QSReqRegisterSubscribe; + console.log("[QSRoom] register subscribe tid:", req.tid, "db:", req.db); + room(ctx).registerSubscription(clientWs(ctx), req.db, req.tid); + await ctx.send.send(ctx, { + type: "QSResRegisterSubscribe", + tid: req.tid, + arg: req.arg, + db: req.db, + } satisfies QSResRegisterSubscribe); + return Result.Ok(EventoResult.Continue); + }, + }, + { + hash: "qs-req-unregister-subscribe", + validate: async (ctx) => { + if (isQSReqUnregisterSubscribe(ctx.enRequest)) return Result.Ok(Option.Some(ctx.enRequest as QSReqUnregisterSubscribe)); + return Result.Ok(Option.None()); + }, + handle: async (ctx) => { + const req = ctx.validated as QSReqUnregisterSubscribe; + console.log("[QSRoom] unregister subscribe tid:", req.tid, "db:", req.db); + room(ctx).unregisterSubscription(clientWs(ctx), req.tid); + return Result.Ok(EventoResult.Continue); + }, + }, + { + hash: "qs-req-forward", + validate: async (ctx) => { + const p = ctx.enRequest; + if (isQSReqGet(p) || isQSReqPut(p) || isQSReqQuery(p)) return Result.Ok(Option.Some(p as QSReqGet | QSReqPut | QSReqQuery)); + return Result.Ok(Option.None()); + }, + handle: async (ctx: HandleTriggerCtx) => { + const req = ctx.validated as QSReqGet | QSReqPut | QSReqQuery; + console.log("[QSRoom] routing", req.type, "tid:", req.tid, "db:", req.db); + const storeWs = await room(ctx).getStoreWs(req.db, clientWs(ctx)); + try { + storeWs.send(ctx.request); + } catch (e) { + console.log("[QSRoom] send to store failed (client may have disconnected):", e); + } + return Result.Ok(EventoResult.Continue); + }, + }, + { + type: EventoType.WildCard, + hash: "qs-room-unknown", + handle: async (ctx) => { + await ctx.send.send(ctx, { + type: "QSResErr", + tid: "unknown", + arg: 0, + error: `unknown request: ${JSON.stringify(ctx.enRequest)}`, + } satisfies QSResErr); + return Result.Ok(EventoResult.Continue); + }, + }, + { + type: EventoType.Error, + hash: "qs-room-error", + handle: async (ctx) => { + await ctx.send.send(ctx, { + type: "QSResErr", + tid: "unknown", + arg: 0, + error: ctx.error?.message ?? "internal error", + } satisfies QSResErr); + return Result.Ok(EventoResult.Continue); + }, + }, + ); + + return evento; +}); diff --git a/cloud/quick-silver/cf/qs-room.ts b/cloud/quick-silver/cf/qs-room.ts new file mode 100644 index 000000000..50fde70e9 --- /dev/null +++ b/cloud/quick-silver/cf/qs-room.ts @@ -0,0 +1,128 @@ +/// + +import { DurableObject } from "cloudflare:workers"; +import { LRUMap, LoggerImpl, AppContext } from "@adviser/cement"; +import { ensureSuperThis } from "@fireproof/core-runtime"; +import { Env } from "./env.js"; +import { isQSEvtSubscribe, QSResErr } from "@fireproof/cloud-quick-silver-types"; +import { QSSendProvider } from "./qs-send-provider.js"; +import { qsRoomEvento } from "./qs-room-evento.js"; +import type { QSRoomDO } from "./qs-room-evento.js"; + +// Stored as ws.serializeAttachment — survives DO hibernation +interface WsSubscription { + readonly db: string; + readonly tid: string; +} + +export class QSRoom extends DurableObject implements QSRoomDO { + private readonly sthis; + private readonly stores = new LRUMap({ maxEntries: 64 }); + + constructor(state: DurableObjectState, env: Env) { + super(state, env); + this.sthis = ensureSuperThis({ logger: new LoggerImpl() }); + } + + async fetch(req: Request): Promise { + if (req.headers.get("Upgrade") !== "websocket") { + return new Response("expected websocket", { status: 426 }); + } + const { 0: client, 1: server } = new WebSocketPair(); + this.ctx.acceptWebSocket(server); + console.log("[QSRoom] new websocket connection accepted"); + return new Response(null, { status: 101, webSocket: client }); + } + + async webSocketMessage(ws: WebSocket, msg: string | ArrayBuffer): Promise { + const sendProvider = new QSSendProvider(ws, this.sthis); + if (typeof msg === "string") { + console.log("[QSRoom] rejected string message"); + await sendProvider.send({} as never, { + type: "QSResErr", + tid: "unknown", + arg: 0, + error: "binary messages only", + } satisfies QSResErr); + return; + } + const ctx = new AppContext().set("QSRoomDO", this); + await qsRoomEvento().trigger({ ctx, request: msg as ArrayBuffer, send: sendProvider }); + } + + registerSubscription(ws: WebSocket, db: string, tid: string): void { + const current: WsSubscription[] = ws.deserializeAttachment() ?? []; + current.push({ db, tid }); + ws.serializeAttachment(current); + } + + unregisterSubscription(ws: WebSocket, tid: string): void { + const current: WsSubscription[] = ws.deserializeAttachment() ?? []; + ws.serializeAttachment(current.filter((s) => s.tid !== tid)); + } + + private dispatchSubscribeEvent(db: string, msg: unknown): void { + const allWs = this.ctx.getWebSockets(); + console.log("[QSRoom] QSEvtSubscribe → checking", allWs.length, "websockets for db:", db); + for (const subWs of allWs) { + const subs: WsSubscription[] = subWs.deserializeAttachment() ?? []; + for (const sub of subs) { + if (sub.db !== db) continue; + try { + subWs.send(this.sthis.ende.cbor.encodeToUint8({ ...(msg as object), tid: sub.tid })); + } catch (e) { + console.log("[QSRoom] failed to notify subscriber tid:", sub.tid, e); + } + } + } + } + + async getStoreWs(db: string, clientWs: WebSocket): Promise { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + return (await this.stores.getSet(db, async () => { + const id = this.env.QS_DB_STORE.idFromName(db); + const stub = this.env.QS_DB_STORE.get(id); + const res = await stub.fetch("https://internal/ws", { + headers: { Upgrade: "websocket" }, + }); + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const storeWs = res.webSocket!; + storeWs.accept(); + storeWs.addEventListener("message", (evt) => { + const decoded = this.sthis.ende.cbor.decodeUint8(new Uint8Array(evt.data as ArrayBuffer)); + if (decoded.isErr()) return; + const msg = decoded.Ok(); + if (isQSEvtSubscribe(msg)) { + this.dispatchSubscribeEvent(db, msg); + } else { + clientWs.send(evt.data); + } + }); + return storeWs; + }))!; + } + + webSocketClose(ws: WebSocket, code: number, reason: string): void { + // 1006 is synthesized by the runtime for abnormal drops — not a valid close() code + const safeCode = code === 1006 ? 1000 : code; + console.log("[QSRoom] websocket closed:", code, reason); + this.stores.forEach((storeWs) => { + try { + storeWs.close(safeCode, reason); + } catch { + /* already closed */ + } + }); + this.stores.clear(); + try { + ws.close(safeCode, reason); + } catch { + /* already closed */ + } + } + + webSocketError(ws: WebSocket, error: unknown): void { + console.log("[QSRoom] websocket error:", error); + ws.close(1011, String(error)); + } +} diff --git a/cloud/quick-silver/cf/qs-send-provider.ts b/cloud/quick-silver/cf/qs-send-provider.ts new file mode 100644 index 000000000..63e3aeedd --- /dev/null +++ b/cloud/quick-silver/cf/qs-send-provider.ts @@ -0,0 +1,20 @@ +/// + +import { EventoSendProvider, HandleTriggerCtx, Result } from "@adviser/cement"; +import type { SuperThis } from "@fireproof/core-types-base"; +import type { QSOpRes } from "@fireproof/cloud-quick-silver-types"; + +export class QSSendProvider implements EventoSendProvider { + readonly ws: WebSocket; + private readonly sthis: SuperThis; + + constructor(ws: WebSocket, sthis: SuperThis) { + this.ws = ws; + this.sthis = sthis; + } + + async send(_ctx: HandleTriggerCtx, res: unknown): Promise> { + this.ws.send(this.sthis.ende.cbor.encodeToUint8(res as QSOpRes)); + return Result.Ok(res as T); + } +} diff --git a/cloud/quick-silver/cf/tsconfig.json b/cloud/quick-silver/cf/tsconfig.json new file mode 100644 index 000000000..a7db7c68a --- /dev/null +++ b/cloud/quick-silver/cf/tsconfig.json @@ -0,0 +1,6 @@ +{ + "extends": "../../../tsconfig.json", + "compilerOptions": { + "outDir": "./dist" + } +} diff --git a/cloud/quick-silver/cf/worker.ts b/cloud/quick-silver/cf/worker.ts new file mode 100644 index 000000000..3aa7473fe --- /dev/null +++ b/cloud/quick-silver/cf/worker.ts @@ -0,0 +1,17 @@ +/// + +import { Env } from "./env.js"; +import { QSRoom } from "./qs-room.js"; +import { QSDBStore } from "./qs-db-store.js"; + +export { QSRoom, QSDBStore }; + +export default { + async fetch(req: Request, env: Env): Promise { + // eslint-disable-next-line no-restricted-globals + const url = new URL(req.url); + const roomName = url.searchParams.get("room") ?? "default"; + const id = env.QS_ROOM.idFromName(roomName); + return env.QS_ROOM.get(id).fetch(req); + }, +} satisfies ExportedHandler; diff --git a/cloud/quick-silver/cf/wrangler.toml b/cloud/quick-silver/cf/wrangler.toml new file mode 100644 index 000000000..dd4ae5e64 --- /dev/null +++ b/cloud/quick-silver/cf/wrangler.toml @@ -0,0 +1,13 @@ +name = "quicksilver" +main = "worker.ts" +compatibility_date = "2025-02-21" + +[durable_objects] +bindings = [ + { name = "QS_ROOM", class_name = "QSRoom" }, + { name = "QS_DB_STORE", class_name = "QSDBStore" } +] + +[[migrations]] +tag = "v1" +new_sqlite_classes = ["QSRoom", "QSDBStore"] diff --git a/cloud/quick-silver/types/index.ts b/cloud/quick-silver/types/index.ts new file mode 100644 index 000000000..ae58c802c --- /dev/null +++ b/cloud/quick-silver/types/index.ts @@ -0,0 +1,253 @@ +import { type } from "arktype"; + +// ── shared base types ───────────────────────────────────────────────────────── + +export const QSCloudAuth = type({ + type: "string", + token: "string", +}); + +export const QSDbName = type({ + db: "string", + auth: QSCloudAuth, +}); + +export const QSTid = type({ + tid: "string", +}); + +export const QSArg = type({ + arg: "number", +}); + +// ── individual ops (no tid/db — those live on the request) ─────────────────── + +export const QSPut = type({ + key: "string", + data: type.instanceOf(Uint8Array), +}); + +export const QSGet = type({ + key: "string", + "idx?": "string", +}); + +export const QSQuery = type({ + // filter / pred: coming later +}); + +// ── individual requests ─────────────────────────────────────────────────────── + +export const QSReqGet = type({ type: '"QSReqGet"' }).and(QSGet).and(QSTid).and(QSArg).and(QSDbName); + +export const QSReqPut = type({ type: '"QSReqPut"' }).and(QSPut).and(QSTid).and(QSArg).and(QSDbName); + +// ── query request ───────────────────────────────────────────────────────────── + +export const QSReqQuery = type({ type: '"QSReqQuery"' }).and(QSQuery).and(QSTid).and(QSDbName).and(QSArg); + +// ── subscribe requests ──────────────────────────────────────────────────────── + +export const QSReqRegisterSubscribe = type({ type: '"QSReqRegisterSubscribe"' }).and(QSTid).and(QSArg).and(QSDbName); + +export const QSReqUnregisterSubscribe = type({ type: '"QSReqUnregisterSubscribe"' }).and(QSTid).and(QSArg).and(QSDbName); + +// ── responses ───────────────────────────────────────────────────────────────── + +export const QSResGet = type({ + type: '"QSResGet"', + key: "string", + data: type.instanceOf(Uint8Array), +}) + .and(QSTid) + .and(QSArg); + +export const QSResGetNotFound = type({ + type: '"QSResGetNotFound"', + key: "string", +}) + .and(QSTid) + .and(QSArg); + +export const QSResPut = type({ + type: '"QSResPut"', + key: "string", +}) + .and(QSTid) + .and(QSArg); + +export const QSResErr = type({ + type: '"QSResErr"', + error: "string", +}) + .and(QSTid) + .and(QSArg); + +// ── subscribe responses ─────────────────────────────────────────────────────── + +export const QSResRegisterSubscribe = type({ + type: '"QSResRegisterSubscribe"', + db: "string", +}) + .and(QSTid) + .and(QSArg); + +export const QSEvtSubscribe = type({ + type: '"QSEvtSubscribe"', + msg: type({ + key: "string", + data: type.instanceOf(Uint8Array), + }), +}).and(QSTid); + +// ── query streaming response ────────────────────────────────────────────────── + +export const QSQueryRowMeta = type({ + id: "string", + cid: "string", + synced: "number", +}); + +export const QSResQueryBegin = type({ + type: '"QSResQueryBegin"', +}) + .and(QSTid) + .and(QSArg); + +export const QSResQueryRow = type({ + type: '"QSResQueryRow"', + rowNr: "number", + row: type({ + _: QSQueryRowMeta, + payload: type.instanceOf(Uint8Array), + }), +}) + .and(QSTid) + .and(QSArg); + +export const QSResQueryEnd = type({ + type: '"QSResQueryEnd"', + rows: "number", +}) + .and(QSTid) + .and(QSArg); + +export const QSOpRes = QSResGet.or(QSResGetNotFound) + .or(QSResPut) + .or(QSResErr) + .or(QSResQueryBegin) + .or(QSResQueryRow) + .or(QSResQueryEnd) + .or(QSResRegisterSubscribe) + .or(QSEvtSubscribe); + +// ── server message (response + transaction id) ──────────────────────────────── + +export const QSMsg = QSTid.and(QSOpRes); + +// ── inferred types ──────────────────────────────────────────────────────────── + +export type QSCloudAuth = typeof QSCloudAuth.infer; +export type QSDbName = typeof QSDbName.infer; +export type QSTid = typeof QSTid.infer; +export type QSArg = typeof QSArg.infer; + +export type QSPut = typeof QSPut.infer; +export type QSGet = typeof QSGet.infer; +export type QSQuery = typeof QSQuery.infer; +export type QSReqGet = typeof QSReqGet.infer; +export type QSReqPut = typeof QSReqPut.infer; +export type QSReqQuery = typeof QSReqQuery.infer; +export type QSReqRegisterSubscribe = typeof QSReqRegisterSubscribe.infer; +export type QSReqUnregisterSubscribe = typeof QSReqUnregisterSubscribe.infer; + +export type QSResGet = typeof QSResGet.infer; +export type QSResGetNotFound = typeof QSResGetNotFound.infer; +export type QSResPut = typeof QSResPut.infer; +export type QSResErr = typeof QSResErr.infer; +export type QSResRegisterSubscribe = typeof QSResRegisterSubscribe.infer; +export type QSEvtSubscribe = typeof QSEvtSubscribe.infer; +export type QSQueryRowMeta = typeof QSQueryRowMeta.infer; +export type QSResQueryBegin = typeof QSResQueryBegin.infer; +export type QSResQueryRow = typeof QSResQueryRow.infer; +export type QSResQueryEnd = typeof QSResQueryEnd.infer; +export type QSOpRes = typeof QSOpRes.infer; +export type QSMsg = typeof QSMsg.infer; + +// ── type guards ─────────────────────────────────────────────────────────────── + +export function isQSPut(x: unknown): x is QSPut { + return QSPut(x) instanceof type.errors === false; +} + +export function isQSGet(x: unknown): x is QSGet { + return QSGet(x) instanceof type.errors === false; +} + +export function isQSQuery(x: unknown): x is QSQuery { + return QSQuery(x) instanceof type.errors === false; +} + +export function isQSReqGet(x: unknown): x is QSReqGet { + return QSReqGet(x) instanceof type.errors === false; +} + +export function isQSReqPut(x: unknown): x is QSReqPut { + return QSReqPut(x) instanceof type.errors === false; +} + +export function isQSReqQuery(x: unknown): x is QSReqQuery { + return QSReqQuery(x) instanceof type.errors === false; +} + +export function isQSResGet(x: unknown): x is QSResGet { + return QSResGet(x) instanceof type.errors === false; +} + +export function isQSResGetNotFound(x: unknown): x is QSResGetNotFound { + return QSResGetNotFound(x) instanceof type.errors === false; +} + +export function isQSResPut(x: unknown): x is QSResPut { + return QSResPut(x) instanceof type.errors === false; +} + +export function isQSResErr(x: unknown): x is QSResErr { + return QSResErr(x) instanceof type.errors === false; +} + +export function isQSResQueryBegin(x: unknown): x is QSResQueryBegin { + return QSResQueryBegin(x) instanceof type.errors === false; +} + +export function isQSResQueryRow(x: unknown): x is QSResQueryRow { + return QSResQueryRow(x) instanceof type.errors === false; +} + +export function isQSResQueryEnd(x: unknown): x is QSResQueryEnd { + return QSResQueryEnd(x) instanceof type.errors === false; +} + +export function isQSReqRegisterSubscribe(x: unknown): x is QSReqRegisterSubscribe { + return QSReqRegisterSubscribe(x) instanceof type.errors === false; +} + +export function isQSReqUnregisterSubscribe(x: unknown): x is QSReqUnregisterSubscribe { + return QSReqUnregisterSubscribe(x) instanceof type.errors === false; +} + +export function isQSResRegisterSubscribe(x: unknown): x is QSResRegisterSubscribe { + return QSResRegisterSubscribe(x) instanceof type.errors === false; +} + +export function isQSEvtSubscribe(x: unknown): x is QSEvtSubscribe { + return QSEvtSubscribe(x) instanceof type.errors === false; +} + +export function isQSOpRes(x: unknown): x is QSOpRes { + return QSOpRes(x) instanceof type.errors === false; +} + +export function isQSMsg(x: unknown): x is QSMsg { + return QSMsg(x) instanceof type.errors === false; +} diff --git a/cloud/quick-silver/types/package.json b/cloud/quick-silver/types/package.json new file mode 100644 index 000000000..a25bd92a7 --- /dev/null +++ b/cloud/quick-silver/types/package.json @@ -0,0 +1,13 @@ +{ + "name": "@fireproof/cloud-quick-silver-types", + "version": "0.0.0", + "type": "module", + "scripts": { + "build": "core-cli tsc", + "pack": "core-cli build --doPack", + "publish": "core-cli build" + }, + "dependencies": { + "arktype": "^2.1.20" + } +} diff --git a/cloud/quick-silver/types/tsconfig.json b/cloud/quick-silver/types/tsconfig.json new file mode 100644 index 000000000..a7db7c68a --- /dev/null +++ b/cloud/quick-silver/types/tsconfig.json @@ -0,0 +1,6 @@ +{ + "extends": "../../../tsconfig.json", + "compilerOptions": { + "outDir": "./dist" + } +} diff --git a/core/device-id/device-id-client.ts b/core/device-id/device-id-client.ts index 8168675f4..c7949d099 100644 --- a/core/device-id/device-id-client.ts +++ b/core/device-id/device-id-client.ts @@ -1,13 +1,28 @@ // can create a CSR // can sign Msg -import { SuperThis } from "@fireproof/core-types-base"; +import { IssueCertificateResult, SuperThis } from "@fireproof/core-types-base"; import { getKeyBag } from "@fireproof/core-keybag"; import { ResolveOnce, Result } from "@adviser/cement"; import { DeviceIdKey } from "./device-id-key.js"; import { DeviceIdSignMsg } from "./device-id-signed-msg.js"; import { DeviceIdCSR } from "./device-id-CSR.js"; -import { DeviceIdProtocol } from "@fireproof/core-types-device-id"; +import { DeviceIdProtocol, VerifyWithCertificateResult } from "@fireproof/core-types-device-id"; + +class NoopProtocol implements DeviceIdProtocol { + issueCertificate(_msg: string): Promise> { + return Promise.resolve(Result.Err("NoopProtocol: issueCertificate not supported")); + } + verifyMsg(_message: string, _schema?: S): Promise> { + return Promise.resolve({ + valid: false, + error: new Error("NoopProtocol: verifyMsg not supported"), + errorCode: "NOOP", + partialResults: { certificateExtracted: false, jwtSignatureValid: false }, + verificationTimestamp: new Date().toISOString(), + }); + } +} class MsgSigner { #x: DeviceIdSignMsg; @@ -22,45 +37,47 @@ class MsgSigner { } const onceDeviceId = new ResolveOnce>(); +const onceDeviceIdWithoutCert = new ResolveOnce>(); export class DeviceIdClient { readonly #sthis: SuperThis; readonly #transport: DeviceIdProtocol; - constructor(sthis: SuperThis, transport: DeviceIdProtocol) { + constructor(sthis: SuperThis, transport: DeviceIdProtocol = new NoopProtocol()) { this.#sthis = sthis; this.#transport = transport; } - ensureDeviceId() { - return onceDeviceId.once(async (): Promise> => { + ensureDeviceIdWithoutCert(): Promise> { + return onceDeviceIdWithoutCert.once(async (): Promise> => { const kBag = await getKeyBag(this.#sthis); let deviceIdResult = await kBag.getDeviceId(); if (deviceIdResult.deviceId.IsNone()) { const newKey = await DeviceIdKey.create(); deviceIdResult = await kBag.setDeviceId(await newKey.exportPrivateJWK()); } - const rKey = await DeviceIdKey.createFromJWK(deviceIdResult.deviceId.unwrap()); - if (rKey.isErr()) { - return Result.Err(rKey); - } + return DeviceIdKey.createFromJWK(deviceIdResult.deviceId.unwrap()); + }); + } + + ensureDeviceId() { + return onceDeviceId.once(async (): Promise> => { + const rKey = await this.ensureDeviceIdWithoutCert(); + if (rKey.isErr()) return Result.Err(rKey); const key = rKey.Ok(); + + const kBag = await getKeyBag(this.#sthis); + let deviceIdResult = await kBag.getDeviceId(); if (deviceIdResult.cert.IsNone()) { const csr = new DeviceIdCSR(this.#sthis, key); const rCsrJWT = await csr.createCSR({ commonName: `fp-dev@${await key.fingerPrint()}` }); - if (rCsrJWT.isErr()) { - return Result.Err(rCsrJWT.Err()); - } + if (rCsrJWT.isErr()) return Result.Err(rCsrJWT.Err()); const rCertResult = await this.#transport.issueCertificate(rCsrJWT.Ok()); - if (rCertResult.isErr()) { - return Result.Err(rCertResult.Err()); - } + if (rCertResult.isErr()) return Result.Err(rCertResult.Err()); deviceIdResult = await kBag.setDeviceId(deviceIdResult.deviceId.Unwrap(), rCertResult.Ok()); } const cert = deviceIdResult.cert.unwrap(); - if (!cert) { - return Result.Err(`No certificate for ${deviceIdResult.deviceId.unwrap().kid}`); - } + if (!cert) return Result.Err(`No certificate for ${deviceIdResult.deviceId.unwrap().kid}`); return Result.Ok(new MsgSigner(new DeviceIdSignMsg(this.#sthis.txt.base64, key, cert.certificatePayload))); }); } diff --git a/core/device-id/index.ts b/core/device-id/index.ts index ba03d68be..7431b5fc7 100644 --- a/core/device-id/index.ts +++ b/core/device-id/index.ts @@ -1,4 +1,5 @@ export * from "./certor.js"; +export * from "./device-id-client.js"; export * from "./device-id-CA.js"; export * from "./device-id-CSR.js"; export * from "./device-id-key.js"; diff --git a/core/gateways/indexeddb/gateway-impl.ts b/core/gateways/indexeddb/gateway-impl.ts index 7e53e2fcf..3f369aad5 100644 --- a/core/gateways/indexeddb/gateway-impl.ts +++ b/core/gateways/indexeddb/gateway-impl.ts @@ -167,6 +167,7 @@ export class IndexedDBGateway implements Gateway { return exception2Result(async () => { const key = getKey(url, sthis.logger); const store = getStore(url, sthis, joinDBName).name; + // console.log(">>>>>>", url.toString()); sthis.logger.Debug().Url(url).Str("key", key).Str("store", store).Msg("putting"); const idb = await connectIdb("write", url, sthis); const tx = idb.db.transaction([store], "readwrite"); diff --git a/core/quick-silver/SessionLog.md b/core/quick-silver/SessionLog.md new file mode 100644 index 000000000..125744851 --- /dev/null +++ b/core/quick-silver/SessionLog.md @@ -0,0 +1,42 @@ +# Quick-Silver Session Log + +## Session 2026-02-21 + +### What was built + +New offline database in `core/quick-silver/` implementing the Fireproof `Database` interface from scratch. + +### Files created + +- `fireproof.ts` — `fireproof(name, opts?)` factory, singleton per name via `KeyedResolvOnce` +- `quick-silver.ts` — `QuickSilver` class implementing `Database` +- `types.ts` — `QSConfigOpts { sthis?: SuperThis }` +- `envelope.ts` — arktype schemas `QCDoc`, `QCFile`, `QCEnvelope` + type guards `isQCDoc/isQCFile/isQCEnvelope` +- `fireproof.test.ts` — 12 passing tests +- `package.json` — dependencies: `@adviser/cement`, `@fireproof/core-runtime`, `@fireproof/core-gateways-indexeddb`, `@fireproof/core-types-base`, `arktype` + +### Key decisions + +- `_baseURL` bakes in `PARAM.STORE = "file"` and `PARAM.NAME` — do NOT add at call sites +- `ready = Lazy(...)` — memoized start, called internally, not required by user +- `onClosed = OnFunc<() => void>()` — field IS callable to register listeners; `close()` calls `.invoke()` +- `bulk` builds `QCDoc` + `QCFile` envelopes, puts via `IndexedDBGateway`, fires `_updateListeners` / `_noUpdateListeners` +- `put` delegates to `bulk([doc])` +- `del` calls `gateway.delete(url)`, `remove` is alias +- `compact` is no-op + +### Changes to core packages + +- `core/types/base/types.ts` — added `Ende`, `EndeJson`, `EndeCbor` interfaces; added `readonly ende: Ende` to `SuperThis` +- `core/runtime/utils.ts` — implemented `ende` in `SuperThisImpl` using `cborg` + `this.txt`; `decode*` wrapped in `exception2Result` + +### Status: 12 tests passing + +### TODO next session + +- `changes` +- `allDocs` / `allDocuments` +- `query` +- Real file bytes in `QCFile` (currently `new Uint8Array()` placeholder) +- `databasesByName.unget(name)` on close/destroy so instances can be re-created +- `clone()` in `SuperThisImpl` needs `ende` wired in diff --git a/core/quick-silver/cid-storage/cid-storage.test.ts b/core/quick-silver/cid-storage/cid-storage.test.ts new file mode 100644 index 000000000..881e17ecc --- /dev/null +++ b/core/quick-silver/cid-storage/cid-storage.test.ts @@ -0,0 +1,86 @@ +import { describe, it, expect } from "vitest"; +import { CIDStorageService } from "./service.js"; +import { DexieStorageBackendImpl } from "./dexie.js"; +import { OPFSStorageBackendImpl } from "./opfs.js"; +import { consumeStream } from "@adviser/cement"; +import { ensureSuperThis } from "@fireproof/core-runtime"; + +const sthis = ensureSuperThis(); + +function streamFrom(bytes: Uint8Array): ReadableStream { + return new ReadableStream({ + start(ctrl) { + ctrl.enqueue(bytes); + ctrl.close(); + }, + }); +} + +async function drainStream(stream: ReadableStream): Promise { + const chunks = await consumeStream(stream, (c: Uint8Array) => c); + const total = chunks.reduce((s, c) => s + c.byteLength, 0); + const out = new Uint8Array(total); + let off = 0; + for (const c of chunks) { + out.set(c, off); + off += c.byteLength; + } + return out; +} + +const backends = [ + { name: "dexie", make: () => new DexieStorageBackendImpl("cid-test-dexie") }, + { name: "opfs", make: () => new OPFSStorageBackendImpl("cid-test-opfs", sthis) }, +]; + +describe.each(backends)("CIDStorageService / $name", ({ make }) => { + const svc = CIDStorageService({ backends: [make()] }); + + it("store returns cid, size, and a url", async () => { + const bytes = sthis.txt.encode("hello cid storage"); + const result = await svc.store(streamFrom(bytes)); + expect(result.isErr()).toBe(false); + const r = result.Ok(); + expect(r.cid).toBeTruthy(); + expect(r.size).toBe(bytes.byteLength); + expect(r.url).toContain(r.cid); + expect(r.created).toBeInstanceOf(Date); + }); + + it("get returns the stored bytes via the url", async () => { + const bytes = sthis.txt.encode("round trip content"); + const storeResult = await svc.store(streamFrom(bytes)); + expect(storeResult.isErr()).toBe(false); + const { url } = storeResult.Ok(); + + const getResult = await svc.get(url); + expect(getResult.isErr()).toBe(false); + const found = getResult.Ok(); + expect(found.found).toBe(true); + if (!found.found) return; + + const received = await drainStream(found.stream); + expect(received).toEqual(bytes); + expect(found.size).toBe(bytes.byteLength); + }); + + it("get returns found=false for unknown url", async () => { + const result = await svc.get(`${make().name}://?cid=badfakecid123`); + expect(result.isErr()).toBe(false); + expect(result.Ok().found).toBe(false); + }); + + it("storing same bytes twice yields the same cid", async () => { + const bytes = sthis.txt.encode("idempotent content"); + const first = await svc.store(streamFrom(bytes)); + const second = await svc.store(streamFrom(bytes)); + expect(first.Ok().cid).toBe(second.Ok().cid); + }); + + it("cid is deterministic — same content always yields same cid", async () => { + const bytes = sthis.txt.encode("deterministic content"); + const r1 = await svc.store(streamFrom(bytes)); + const r2 = await svc.store(streamFrom(bytes)); + expect(r1.Ok().cid).toBe(r2.Ok().cid); + }); +}); diff --git a/core/quick-silver/cid-storage/dexie.ts b/core/quick-silver/cid-storage/dexie.ts new file mode 100644 index 000000000..a2b8b4b55 --- /dev/null +++ b/core/quick-silver/cid-storage/dexie.ts @@ -0,0 +1,78 @@ +import { Dexie, type Table } from "dexie"; +import { Lazy, ResolveSeq, Result, consumeStream, exception2Result } from "@adviser/cement"; +import type { StorageBackend, StorageBackendReadResult, StorageBackendWriteResult } from "./types.js"; + +interface BlobRecord { + cid: string; + data: Uint8Array; + size: number; + created: Date; +} + +export const DexieStorageBackend = Lazy(({ name }: { name?: string } = {}) => new DexieStorageBackendImpl(name ?? "cid-storage")); + +export class DexieStorageBackendImpl implements StorageBackend { + readonly name: string; + readonly dexie: Dexie; + + constructor(name: string) { + this.name = name; + this.dexie = new Dexie(name); + this.dexie.version(1).stores({ blobs: "&cid, size, created" }); + } + + private get blobs(): Table { + return this.dexie.table("blobs"); + } + + private commitSeq = new ResolveSeq>(); + async store(stream: ReadableStream): Promise> { + try { + const chunks = await consumeStream(stream, (chunk: Uint8Array) => chunk); + const total = chunks.reduce((sum, c) => sum + c.byteLength, 0); + const data = new Uint8Array(total); + let offset = 0; + for (const chunk of chunks) { + data.set(chunk, offset); + offset += chunk.byteLength; + } + const size = data.byteLength; + + const commit = async (cid: string): Promise> => { + return this.commitSeq.add(async () => + exception2Result(async (): Promise> => { + const existing = await this.blobs.get(cid); + if (!existing) { + await this.blobs.add({ cid, data, size, created: new Date() }, cid); + } + return Result.Ok(undefined); + }), + ); + }; + + const rollback = async (): Promise> => { + return Result.Ok(undefined); + }; + + return Result.Ok({ commit, rollback, size }); + } catch (e) { + return Result.Err(e instanceof Error ? e : new Error(String(e))); + } + } + + async get(cid: string): Promise> { + try { + const record = await this.blobs.get(cid); + if (!record) return Result.Ok(undefined); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(record.data); + controller.close(); + }, + }); + return Result.Ok({ stream, size: record.size }); + } catch (e) { + return Result.Err(e instanceof Error ? e : new Error(String(e))); + } + } +} diff --git a/core/quick-silver/cid-storage/opfs.ts b/core/quick-silver/cid-storage/opfs.ts new file mode 100644 index 000000000..568ea78ef --- /dev/null +++ b/core/quick-silver/cid-storage/opfs.ts @@ -0,0 +1,96 @@ +import { Lazy, Result } from "@adviser/cement"; +import { ensureSuperThis } from "@fireproof/core-runtime"; +import type { SuperThis } from "@fireproof/core-types-base"; +import type { StorageBackend, StorageBackendReadResult, StorageBackendWriteResult } from "./types.js"; + +export const OPFSStorageBackend = Lazy( + ({ name, sthis }: { name?: string; sthis?: SuperThis } = {}) => + new OPFSStorageBackendImpl(name ?? "cid-opfs", sthis ?? ensureSuperThis()), +); + +export class OPFSStorageBackendImpl implements StorageBackend { + readonly name: string; + readonly sthis: SuperThis; + + private _dir?: FileSystemDirectoryHandle; + + constructor(name: string, sthis: SuperThis) { + this.name = name; + this.sthis = sthis; + } + + private async dir(): Promise { + if (!this._dir) { + const root = await navigator.storage.getDirectory(); + this._dir = await root.getDirectoryHandle(this.name, { create: true }); + } + return this._dir; + } + + async store(stream: ReadableStream): Promise> { + try { + const dir = await this.dir(); + const tempName = this.sthis.nextId(12).str; + const tempHandle = await dir.getFileHandle(tempName, { create: true }); + const writable = await tempHandle.createWritable(); + + let size = 0; + const reader = stream.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + await writable.write(value as unknown as ArrayBufferView); + size += value.byteLength; + } + } finally { + reader.releaseLock(); + } + await writable.close(); + + const commit = async (cid: string): Promise> => { + try { + try { + await dir.getFileHandle(cid); // throws if not found + // target already exists — discard temp instead of overwriting + await dir.removeEntry(tempName); + } catch { + await (tempHandle as FileSystemFileHandle & { move(name: string): Promise }).move(cid); + } + return Result.Ok(undefined); + } catch (e) { + return Result.Err(e instanceof Error ? e : new Error(String(e))); + } + }; + + const rollback = async (): Promise> => { + try { + await dir.removeEntry(tempName); + return Result.Ok(undefined); + } catch (e) { + return Result.Err(e instanceof Error ? e : new Error(String(e))); + } + }; + + return Result.Ok({ commit, rollback, size }); + } catch (e) { + return Result.Err(e instanceof Error ? e : new Error(String(e))); + } + } + + async get(cid: string): Promise> { + try { + const dir = await this.dir(); + let fileHandle: FileSystemFileHandle; + try { + fileHandle = await dir.getFileHandle(cid); + } catch { + return Result.Ok(undefined); + } + const file = await fileHandle.getFile(); + return Result.Ok({ stream: file.stream() as ReadableStream, size: file.size }); + } catch (e) { + return Result.Err(e instanceof Error ? e : new Error(String(e))); + } + } +} diff --git a/core/quick-silver/cid-storage/service.ts b/core/quick-silver/cid-storage/service.ts new file mode 100644 index 000000000..0e47f80e2 --- /dev/null +++ b/core/quick-silver/cid-storage/service.ts @@ -0,0 +1,80 @@ +import { BuildURI, exception2Result, KeyedResolvOnce, Result, URI } from "@adviser/cement"; +import { sha256 } from "@noble/hashes/sha2"; +import { CID } from "multiformats"; +import { create as createDigest } from "multiformats/hashes/digest"; +import * as raw from "multiformats/codecs/raw"; +import { DexieStorageBackend } from "./dexie.js"; +import type { CIDGetResult, CIDStoreResult, StorageBackend } from "./types.js"; + +const SHA2_256 = 0x12; + +function hashingTap() { + const h = sha256.create(); + let size = 0; + const transform = new TransformStream({ + transform(chunk, ctrl) { + h.update(chunk); + size += chunk.byteLength; + ctrl.enqueue(chunk); + }, + }); + const getCID = () => { + const digest = createDigest(SHA2_256, h.digest()); + return { cid: CID.create(1, raw.code, digest).toString(), size }; + }; + return { transform, getCID }; +} + +const cidStorageServicePerBackend = new KeyedResolvOnce(); + +export function CIDStorageService(x?: { backends?: StorageBackend[] }) { + const bs = x?.backends ?? [DexieStorageBackend()]; + return cidStorageServicePerBackend + .get( + bs + .map((b) => b.name) + .sort() + .join(","), + ) + .once(() => new CIDStorageServiceImpl(bs)); +} + +export class CIDStorageServiceImpl { + readonly backends: StorageBackend[]; + + constructor(backends: StorageBackend[]) { + this.backends = backends; + } + + // single backend — extend to fanOut when multi-backend support is needed + private get backend(): StorageBackend { + return this.backends[0]; + } + + async store(stream: ReadableStream): Promise> { + return exception2Result(async (): Promise> => { + const { transform, getCID } = hashingTap(); + const write = await this.backend.store(stream.pipeThrough(transform)); + if (write.isErr()) return Result.Err(write); + + const { cid, size } = getCID(); + + const commit = await write.Ok().commit(cid); + if (commit.isErr()) return Result.Err(commit); + + const url = BuildURI.from(`${this.backend.name}://`).setParam("cid", cid).toString(); + + return Result.Ok({ cid, size, created: new Date(), url }); + }); + } + + async get(url: string): Promise> { + const cid = URI.from(url).getParam("cid"); + if (!cid) return Result.Err("missing cid in url"); + const result = await this.backend.get(cid); + if (result.isErr()) return Result.Err(result.Err()); + const found = result.Ok(); + if (found) return Result.Ok({ found: true, cid, size: found.size, stream: found.stream }); + return Result.Ok({ found: false, cid }); + } +} diff --git a/core/quick-silver/cid-storage/test-noble.html b/core/quick-silver/cid-storage/test-noble.html new file mode 100644 index 000000000..6518286d2 --- /dev/null +++ b/core/quick-silver/cid-storage/test-noble.html @@ -0,0 +1,153 @@ + + + + + noble/hashes streaming test + + + +

@noble/hashes — streaming sha256 in browser

+ + +
press a button…
+ + + + diff --git a/core/quick-silver/cid-storage/types.ts b/core/quick-silver/cid-storage/types.ts new file mode 100644 index 000000000..7229dfb24 --- /dev/null +++ b/core/quick-silver/cid-storage/types.ts @@ -0,0 +1,29 @@ +import { Result } from "@adviser/cement"; + +export interface StorageBackendWriteResult { + commit(cid: string): Promise>; + rollback(): Promise>; + readonly size: number; +} + +export interface StorageBackendReadResult { + readonly stream: ReadableStream; + readonly size: number; +} + +export interface StorageBackend { + readonly name: string; + store(stream: ReadableStream): Promise>; + get(cid: string): Promise>; +} + +export interface CIDStoreResult { + readonly url: string; + readonly cid: string; + readonly size: number; + readonly created: Date; +} + +export type CIDGetResult = + | { readonly found: false; readonly cid: string } + | { readonly found: true; readonly cid: string; readonly size: number; readonly stream: ReadableStream }; diff --git a/core/quick-silver/envelope.ts b/core/quick-silver/envelope.ts new file mode 100644 index 000000000..31b319e39 --- /dev/null +++ b/core/quick-silver/envelope.ts @@ -0,0 +1,68 @@ +import { type } from "arktype"; + +export const PayloadBase = type({ + idxName: "string", + cid: "string", + url: "string", + created: "string", +}); + +export const QSFileMeta = type({ + type: '"qs.file.meta"', + key: "string", + payload: type({ + filename: "string", + size: "number", + }).and(PayloadBase), +}); + +export type QSFileMeta = typeof QSFileMeta.infer; + +export function isQSFileMeta(x: unknown): x is QSFileMeta { + return QSFileMeta(x) instanceof type.errors === false; +} + +export const QSDeviceMeta = type({ + type: '"qs.device.meta"', + key: "string", // deviceId + payload: type({ + // deviceId: "string", + who: "'me' | 'other'", + "deleted?": "boolean", + }).and(PayloadBase), +}); + +export type QSDeviceMeta = typeof QSDeviceMeta.infer; + +export function isQSDeviceMeta(x: unknown): x is QSDeviceMeta { + return QSDeviceMeta(x) instanceof type.errors === false; +} + +export const QSDocMeta = type({ + type: '"qs.doc.meta"', + key: "string", + payload: type({ + primaryKey: "string", + }).and(PayloadBase), +}); + +export type QSDocMeta = typeof QSDocMeta.infer; + +export function isQSDocMeta(x: unknown): x is QSDocMeta { + return QSDocMeta(x) instanceof type.errors === false; +} + +export const QSIdxValueMeta = type({ + type: '"qs.emit.value"', + key: "string", + payload: { + keys: "unknown[]", + "emitValue?": "unknown", + }, +}); + +export type QSIdxValueMeta = typeof QSIdxValueMeta.infer; + +export function isQSEmitIdxValueMeta(x: unknown): x is QSIdxValueMeta { + return QSIdxValueMeta(x) instanceof type.errors === false; +} diff --git a/core/quick-silver/fireproof.test.ts b/core/quick-silver/fireproof.test.ts new file mode 100644 index 000000000..b96eb133b --- /dev/null +++ b/core/quick-silver/fireproof.test.ts @@ -0,0 +1,342 @@ +import { describe, it, expect, beforeEach, afterEach } from "vitest"; +import { fireproof } from "./fireproof.js"; +import { Database } from "@fireproof/core-types-base"; +import { isQSDocMeta, isQSFileMeta } from "./envelope.js"; +import { ensureSuperThis } from "@fireproof/core-runtime"; + +describe("quick-silver", () => { + const sthis = ensureSuperThis(); + it("one instance per dbname", () => { + const db1 = fireproof("db"); + const db2 = fireproof("db"); + expect(db1).toBe(db2); + }); + + describe("bulk", () => { + let db: Database; + beforeEach(() => { + db = fireproof("bulk-test"); + }); + afterEach(async () => { + await db.destroy(); + }); + + it("should bulk insert docs", async () => { + const result = await db.bulk([ + { withoutKey: "xxxx" }, + { _id: "setkey", setKey: "key" }, + { _files: { myfile: new File(["i don't know"], "test.txt") }, files: "dings" }, + ]); + expect(result.ids).toHaveLength(3); + expect(result.ids[1]).toBe("setkey"); + }); + + it("file _ has cid matching content hash and get resolves docFile", async () => { + const file = new File(["hello file"], "hello.txt"); + + const result = await db.bulk([{ _id: "file-doc", _files: { hello: file }, label: "test" }]); + expect(result.ids[0]).toBe("file-doc"); + + const doc = await db.get<{ label: string }>("file-doc"); + const fileMetas = (doc._meta ?? []).filter(isQSFileMeta); + expect(fileMetas).toHaveLength(1); + expect(fileMetas[0].payload.filename).toBe("hello.txt"); + expect(fileMetas[0].key).toBeTruthy(); + expect(doc._files?.["hello.txt"]).toBeInstanceOf(File); + }); + }); + + describe("put", () => { + let db: Database; + beforeEach(() => { + db = fireproof("put-test"); + }); + afterEach(async () => { + await db.destroy(); + }); + + it("should put and get a doc", async () => { + const ok = await db.put({ _id: "puttest", foo: "baz" }); + expect(ok.id).toBe("puttest"); + const doc = await db.get<{ foo: string }>("puttest"); + expect(doc.foo).toBe("baz"); + }); + + it("should auto-generate an id", async () => { + const ok = await db.put({ bar: 42 }); + expect(ok.id).toBeTruthy(); + const doc = await db.get<{ bar: number }>(ok.id); + expect(doc.bar).toBe(42); + }); + }); + + describe("subscribe", () => { + let db: Database; + beforeEach(() => { + db = fireproof("subscribe-test"); + }); + afterEach(async () => { + await db.destroy(); + }); + + it("should notify subscriber with docs on put", async () => { + const received: unknown[] = []; + db.subscribe((docs) => { + received.push(...docs); + }); + await db.put({ _id: "sub1", val: 1 }); + expect(received).toHaveLength(1); + expect((received[0] as { _id: string })._id).toBe("sub1"); + }); + + it("should notify no-update subscriber without docs", async () => { + let called = 0; + db.subscribe(() => { + called++; + }, false); + await db.put({ _id: "sub2", val: 2 }); + expect(called).toBe(1); + }); + + it("should unsubscribe", async () => { + let called = 0; + const unsub = db.subscribe(() => { + called++; + }, false); + unsub(); + await db.put({ _id: "sub3", val: 3 }); + expect(called).toBe(0); + }); + }); + + describe("get", () => { + let db: Database; + beforeEach(() => { + db = fireproof("get-test"); + }); + afterEach(async () => { + await db.destroy(); + }); + + it("should get a doc by id", async () => { + await db.bulk([{ _id: "gettest", foo: "bar" }]); + const doc = await db.get<{ foo: string }>("gettest"); + expect(doc._id).toBe("gettest"); + expect(doc.foo).toBe("bar"); + }); + + it("should include _ metadata in returned doc", async () => { + await db.put({ _id: "with-meta", foo: "bar" }); + const doc = await db.get<{ foo: string }>("with-meta"); + expect(doc.foo).toBe("bar"); + const docMeta = (doc._meta ?? []).find(isQSDocMeta); + expect(docMeta).toBeDefined(); + expect(docMeta?.key).toBe("with-meta"); + expect(docMeta?.payload.cid).toBeTruthy(); + expect((doc._meta ?? []).filter(isQSFileMeta)).toHaveLength(0); + }); + + it("should throw NotFoundError for missing id", async () => { + await expect(db.get("missing")).rejects.toThrow(); + }); + }); + + describe("del", () => { + let db: Database; + beforeEach(() => { + db = fireproof("del-test"); + }); + afterEach(async () => { + await db.destroy(); + }); + + it("should delete a doc", async () => { + await db.put({ _id: "to-delete", foo: "bar" }); + const ok = await db.del("to-delete"); + expect(ok.id).toBe("to-delete"); + await expect(db.get("to-delete")).rejects.toThrow(); + }); + + it("remove should be an alias for del", async () => { + await db.put({ _id: "to-remove", foo: "bar" }); + const ok = await db.remove("to-remove"); + expect(ok.id).toBe("to-remove"); + }); + }); + + describe("destroy", () => { + it("clears all docs — get throws after destroy", async () => { + const db = fireproof("destroy-test"); + await db.put({ _id: "survivor", x: 1 }); + await db.destroy(); + + const db2 = fireproof("destroy-test"); + await expect(db2.get("survivor")).rejects.toThrow(); + await db2.destroy(); + }); + + it("allDocs returns empty after destroy", async () => { + const dbName = `destroy-alldocs-test-${sthis.nextId().str}`; + const db = fireproof(dbName); + await db.bulk([{ _id: "a" }, { _id: "b" }]); + await db.destroy(); + + const db2 = fireproof(dbName); + const result = await db2.allDocs(); + expect(result.rows).toHaveLength(0); + await db2.destroy(); + }); + }); + + describe("allDocs", () => { + let db: Database; + beforeEach(() => { + db = fireproof("alldocs-test"); + }); + afterEach(async () => { + await db.destroy(); + }); + + it("returns all docs", async () => { + await db.bulk([ + { _id: "a", x: 1 }, + { _id: "b", x: 2 }, + { _id: "c", x: 3 }, + ]); + const result = await db.allDocs<{ x: number }>(); + expect(result.rows).toHaveLength(3); + const keys = result.rows.map((r) => r.key).sort(); + expect(keys).toEqual(["a", "b", "c"]); + expect(result.rows.find((r) => r.key === "b")?.value.x).toBe(2); + }); + + it("filters by keys", async () => { + await db.bulk([ + { _id: "a", x: 1 }, + { _id: "b", x: 2 }, + { _id: "c", x: 3 }, + ]); + const result = await db.allDocs<{ x: number }>({ keys: ["a", "c"] }); + expect(result.rows).toHaveLength(2); + expect(result.rows.map((r) => r.key).sort()).toEqual(["a", "c"]); + }); + + it("excludes deleted docs by default", async () => { + await db.bulk([ + { _id: "x", v: 1 }, + { _id: "y", v: 2 }, + ]); + await db.del("x"); + const result = await db.allDocs(); + expect(result.rows.map((r) => r.key)).not.toContain("x"); + expect(result.rows.map((r) => r.key)).toContain("y"); + }); + + it("allDocuments is an alias for allDocs", async () => { + await db.put({ _id: "z", v: 1 }); + const r1 = await db.allDocs(); + const r2 = await db.allDocuments(); + expect(r1.rows.map((r) => r.key)).toEqual(r2.rows.map((r) => r.key)); + }); + }); + + describe("query", () => { + let db: Database; + beforeEach(async () => { + db = fireproof("query-test"); + await db.bulk([ + { _id: "a", score: 3, tag: "x" }, + { _id: "b", score: 1, tag: "y" }, + { _id: "c", score: 2, tag: "x" }, + { _id: "d", score: 4, tag: "z" }, + ]); + }); + afterEach(async () => { + await db.destroy(); + }); + + it("field string — rows keyed by that field, sorted asc", async () => { + const r = await db.query<{ score: number; tag: string }, number>("score"); + expect(r.rows.map((row) => row.key)).toEqual([1, 2, 3, 4]); + expect(r.rows.map((row) => row.id)).toEqual(["b", "c", "a", "d"]); + }); + + it("field string — descending", async () => { + const r = await db.query<{ score: number }, number>("score", { descending: true }); + expect(r.rows.map((row) => row.key)).toEqual([4, 3, 2, 1]); + }); + + it("field string — limit", async () => { + const r = await db.query<{ score: number }, number>("score", { limit: 2 }); + expect(r.rows).toHaveLength(2); + expect(r.rows.map((row) => row.key)).toEqual([1, 2]); + }); + + it("field string — key filter", async () => { + const r = await db.query<{ score: number }, number>("score", { key: 2 }); + expect(r.rows).toHaveLength(1); + expect(r.rows[0].id).toBe("c"); + }); + + it("field string — keys filter", async () => { + const r = await db.query<{ score: number }, number>("score", { keys: [1, 3] }); + expect(r.rows.map((row) => row.key).sort()).toEqual([1, 3]); + }); + + it("field string — range filter (inclusive)", async () => { + const r = await db.query<{ score: number }, number>("score", { range: [2, 3] }); + expect(r.rows.map((row) => row.key)).toEqual([2, 3]); + }); + + it("MapFn — emit per doc", async () => { + const r = await db.query<{ tag: string }, string>((doc, emit) => { + emit(doc.tag); + }); + expect(r.rows.map((row) => row.key).sort()).toEqual(["x", "x", "y", "z"]); + }); + + it("MapFn — emit per return", async () => { + const r = await db.query<{ tag: string }, string>((doc, _emit) => { + return doc.tag; + }); + expect(r.rows.map((row) => row.key).sort()).toEqual(["x", "x", "y", "z"]); + }); + + it("MapFn — multi-emit per doc (one row per emit)", async () => { + const r = await db.query<{ score: number; tag: string }, string>((doc, emit) => { + emit(doc.tag, doc.score); + emit(`score:${doc.score}`); + }); + expect(r.rows).toHaveLength(8); // 4 docs × 2 emits each + }); + + it("includeDocs: false — no doc on rows", async () => { + const r = await db.query("score", { includeDocs: false }); + expect(r.rows.every((row) => row.doc === undefined)).toBe(true); + }); + + it("docs array present when includeDocs is not false", async () => { + const r = (await db.query("score")) as { rows: unknown[]; docs: unknown[] }; + expect(r.docs).toHaveLength(4); + }); + + it("skips docs without the queried field", async () => { + await db.put({ _id: "no-score", tag: "x" }); + const r = await db.query("score"); + expect(r.rows.map((row) => row.id)).not.toContain("no-score"); + await db.del("no-score"); + }); + }); + + describe("close", () => { + it("should invoke onClosed listeners", async () => { + const db = fireproof("close-test"); + let called = 0; + db.onClosed(() => { + called++; + }); + await db.close(); + expect(called).toBe(1); + }); + }); +}); diff --git a/core/quick-silver/fireproof.ts b/core/quick-silver/fireproof.ts new file mode 100644 index 000000000..b589b25e4 --- /dev/null +++ b/core/quick-silver/fireproof.ts @@ -0,0 +1,11 @@ +import { KeyedResolvOnce } from "@adviser/cement"; +import { ensureSuperThis } from "@fireproof/core-runtime"; +import type { Database } from "@fireproof/core-types-base"; +import type { QSConfigOpts } from "./types.js"; +import { QuickSilver } from "./quick-silver.js"; + +const databasesByName = new KeyedResolvOnce(); + +export function fireproof(name: string, opts?: QSConfigOpts): Database { + return databasesByName.get(name).once(() => new QuickSilver({ sthis: opts?.sthis ?? ensureSuperThis(), name })); +} diff --git a/core/quick-silver/idx-service/idx-service.test.ts b/core/quick-silver/idx-service/idx-service.test.ts new file mode 100644 index 000000000..658bcd53e --- /dev/null +++ b/core/quick-silver/idx-service/idx-service.test.ts @@ -0,0 +1,154 @@ +import { describe, it, expect } from "vitest"; +import { consumeStream } from "@adviser/cement"; +import { ensureSuperThis } from "@fireproof/core-runtime"; +import { IdxService } from "./service.js"; +import type { IdxEntry } from "./types.js"; + +const sthis = ensureSuperThis(); + +async function drain(stream: ReadableStream): Promise { + return consumeStream(stream, (e: IdxEntry) => e); +} + +// One service instance per suite; each test gets its own dbname to stay isolated. +const svc = IdxService({ prefix: `test-${sthis.nextId(8).str}-` }); +const idxName = "byTitle"; + +function db(): string { + return `db-${sthis.nextId(8).str}`; +} + +describe("IdxService / addToIdx + query", () => { + it("stores an entry and query returns it", async () => { + const dbname = db(); + const r = await svc.addToIdx({ dbname, idxName, keys: ["apple"] }); + expect(r.isErr()).toBe(false); + + const qr = await svc.query({ dbname, idxName }); + expect(qr.isErr()).toBe(false); + const entries = await drain(qr.Ok()); + expect(entries).toHaveLength(1); + expect(entries[0].keys[0]).toBe("apple"); + }); + + it("returns entries in ascending order by default", async () => { + const dbname = db(); + await svc.addToIdx({ dbname, idxName, keys: ["cherry"] }); + await svc.addToIdx({ dbname, idxName, keys: ["apple"] }); + await svc.addToIdx({ dbname, idxName, keys: ["banana"] }); + + const qr = await svc.query({ dbname, idxName, order: "asc" }); + const entries = await drain(qr.Ok()); + expect(entries.map((e) => e.keys[0])).toEqual(["apple", "banana", "cherry"]); + }); + + it("returns entries in descending order", async () => { + const dbname = db(); + await svc.addToIdx({ dbname, idxName, keys: ["cherry"] }); + await svc.addToIdx({ dbname, idxName, keys: ["apple"] }); + await svc.addToIdx({ dbname, idxName, keys: ["banana"] }); + + const qr = await svc.query({ dbname, idxName, order: "desc" }); + const entries = await drain(qr.Ok()); + expect(entries.map((e) => e.keys[0])).toEqual(["cherry", "banana", "apple"]); + }); + + it("filters by specific keys", async () => { + const dbname = db(); + await svc.addToIdx({ dbname, idxName, keys: ["x"] }); + await svc.addToIdx({ dbname, idxName, keys: ["y"] }); + await svc.addToIdx({ dbname, idxName, keys: ["z"] }); + + const qr = await svc.query({ dbname, idxName, keys: ["x", "z"] }); + const entries = await drain(qr.Ok()); + expect(entries.map((e) => e.keys[0]).sort()).toEqual(["x", "z"]); + }); + + it("select filters entries at cursor level", async () => { + const dbname = db(); + await svc.addToIdx({ dbname, idxName, keys: ["aaa"], meta: [{ type: "t", key: "v", payload: 1 }] }); + await svc.addToIdx({ dbname, idxName, keys: ["bbb"], meta: [{ type: "t", key: "v", payload: 2 }] }); + await svc.addToIdx({ dbname, idxName, keys: ["ccc"], meta: [{ type: "t", key: "v", payload: 3 }] }); + + const qr = await svc.query({ + dbname, + idxName, + select: (r) => ((r.meta?.find((m) => m.type === "t" && m.key === "v")?.payload as number) ?? 0) > 1, + }); + const entries = await drain(qr.Ok()); + expect(entries.map((e) => e.keys[0])).toEqual(["bbb", "ccc"]); + }); +}); + +describe("IdxService / meta merging", () => { + it("merges meta — existing entries not in incoming are kept", async () => { + const dbname = db(); + await svc.addToIdx({ dbname, idxName, keys: ["k"], meta: [{ type: "tag", key: "color", payload: "yellow" }] }); + const r2 = await svc.addToIdx({ dbname, idxName, keys: ["k"], meta: [{ type: "tag", key: "size", payload: "large" }] }); + expect(r2.isErr()).toBe(false); + const r2Meta = r2.Ok().meta ?? []; + expect(r2Meta.find((m) => m.key === "color")?.payload).toBe("yellow"); + expect(r2Meta.find((m) => m.key === "size")?.payload).toBe("large"); + + const qr = await svc.query({ dbname, idxName }); + const [entry] = await drain(qr.Ok()); + const meta = entry.meta ?? []; + expect(meta.find((m) => m.key === "color")?.payload).toBe("yellow"); + expect(meta.find((m) => m.key === "size")?.payload).toBe("large"); + }); + + it("incoming wins on type+key collision", async () => { + const dbname = db(); + await svc.addToIdx({ dbname, idxName, keys: ["k"], meta: [{ type: "tag", key: "color", payload: "yellow" }] }); + const r2 = await svc.addToIdx({ dbname, idxName, keys: ["k"], meta: [{ type: "tag", key: "color", payload: "green" }] }); + expect(r2.isErr()).toBe(false); + const colorTag = (r2.Ok().meta ?? []).find((m) => m.type === "tag" && m.key === "color"); + expect(colorTag?.payload).toBe("green"); + + const qr = await svc.query({ dbname, idxName }); + const [entry] = await drain(qr.Ok()); + const queriedColorTag = (entry.meta ?? []).find((m) => m.type === "tag" && m.key === "color"); + expect(queriedColorTag?.payload).toBe("green"); + }); +}); + +describe("IdxService / soft delete", () => { + it("deleteFromIdx excludes entry from query by default", async () => { + const dbname = db(); + await svc.addToIdx({ dbname, idxName, keys: ["fig"] }); + const dr = await svc.deleteFromIdx({ dbname, idxName, keys: ["fig"] }); + expect(dr.isErr()).toBe(false); + + const qr = await svc.query({ dbname, idxName }); + const entries = await drain(qr.Ok()); + expect(entries.find((e) => e.keys[0] === "fig")).toBeUndefined(); + }); + + it("includeDeleted: true emits the deleted entry with deleted=true", async () => { + const dbname = db(); + await svc.addToIdx({ dbname, idxName, keys: ["grape"] }); + await svc.deleteFromIdx({ dbname, idxName, keys: ["grape"] }); + + const qr = await svc.query({ dbname, idxName, includeDeleted: true }); + const entries = await drain(qr.Ok()); + const grape = entries.find((e) => e.keys[0] === "grape"); + expect(grape?.deleted).toBe(true); + }); + + it("addToIdx resets the deleted marker so entry reappears", async () => { + const dbname = db(); + await svc.addToIdx({ dbname, idxName, keys: ["mango"] }); + await svc.deleteFromIdx({ dbname, idxName, keys: ["mango"] }); + await svc.addToIdx({ dbname, idxName, keys: ["mango"] }); + + const qr = await svc.query({ dbname, idxName }); + const entries = await drain(qr.Ok()); + expect(entries.find((e) => e.keys[0] === "mango")).toBeDefined(); + }); + + it("deleteFromIdx is a no-op for an unknown entry", async () => { + const dbname = db(); + const r = await svc.deleteFromIdx({ dbname, idxName, keys: ["ghost"] }); + expect(r.isErr()).toBe(false); + }); +}); diff --git a/core/quick-silver/idx-service/primary-key-strategy.test.ts b/core/quick-silver/idx-service/primary-key-strategy.test.ts new file mode 100644 index 000000000..94f23812f --- /dev/null +++ b/core/quick-silver/idx-service/primary-key-strategy.test.ts @@ -0,0 +1,104 @@ +import { describe, it, expect } from "vitest"; +import { ensureSuperThis } from "@fireproof/core-runtime"; +import { PrimaryKeyStrategy } from "./primary-key-strategy.js"; +import { IdxService } from "./service.js"; +import { isQSDeviceMeta, QSDocMeta, QSFileMeta } from "../envelope.js"; +import { consumeStream } from "@adviser/cement"; +import { IdxEntry } from "./types.js"; + +const sthis = ensureSuperThis(); +const svc = IdxService({ prefix: `test-${sthis.nextId(8).str}-` }); + +describe("PrimaryKeyStrategy", () => { + it("lazily resolves device fingerprint", async () => { + const strategy = new PrimaryKeyStrategy({ sthis }); + const fp = await strategy.deviceFingerPrint(); + expect(fp).toBeTruthy(); + expect(typeof fp).toBe("string"); + }); + + it("write: _id gets all meta, _cid gets one entry per CID with device meta", async () => { + const dbname = `db-${sthis.nextId(8).str}`; + const strategy = new PrimaryKeyStrategy({ sthis, idxService: svc }); + const deviceId = await strategy.deviceFingerPrint(); + + const docMeta: QSDocMeta = { + type: "qs.doc.meta", + key: "doc-1", + payload: { + idxName: "_id", + cid: "bafydoc111", + url: "x://?cid=bafydoc111", + primaryKey: "doc-1", + created: new Date().toISOString(), + }, + }; + const fileMeta1: QSFileMeta = { + type: "qs.file.meta", + key: "file-1", + payload: { + idxName: "_id", + cid: "bafyfile111", + url: "x://?cid=bafyfile111", + filename: "photo.jpg", + size: 1024, + created: new Date().toISOString(), + }, + }; + const fileMeta2: QSFileMeta = { + type: "qs.file.meta", + key: "file-2", + payload: { + idxName: "_id", + cid: "bafyfile222", + url: "x://?cid=bafyfile222", + filename: "thumb.jpg", + size: 256, + created: new Date().toISOString(), + }, + }; + + const r = await svc.transaction(dbname, (tx) => + strategy.write(tx, { dbname, idxName: "_id", keys: ["doc-1"], meta: [docMeta, fileMeta1, fileMeta2] }, "doc-1"), + ); + expect(r.isErr()).toBe(false); + + // _id: exactly one qs.doc.meta and two qs.file.meta + const idQr = await svc.query({ dbname, idxName: "_id" }); + expect(idQr.isErr()).toBe(false); + const idReader = idQr.Ok().getReader(); + const { value: idEntry } = await idReader.read(); + idReader.releaseLock(); + expect(idEntry?.keys[0]).toBe("doc-1"); + + const idDocMetas = idEntry?.meta?.filter((m) => m.type === "qs.doc.meta") ?? []; + expect(idDocMetas).toHaveLength(1); + expect((idDocMetas[0].payload as QSDocMeta["payload"]).cid).toBe("bafydoc111"); + expect((idDocMetas[0].payload as QSDocMeta["payload"]).url).toBe("x://?cid=bafydoc111"); + + const idFileMetas = idEntry?.meta?.filter((m) => m.type === "qs.file.meta") ?? []; + expect(idFileMetas).toHaveLength(2); + expect((idFileMetas[0].payload as QSFileMeta["payload"]).cid).toBe("bafyfile111"); + expect((idFileMetas[1].payload as QSFileMeta["payload"]).cid).toBe("bafyfile222"); + + // _cid: one entry per CID, each with its source meta + one qs.device.meta keyed by deviceId + const cidQr = await svc.query({ dbname, idxName: "_cid" }); + expect(cidQr.isErr()).toBe(false); + const cidEntries = await consumeStream(cidQr.Ok(), (a) => [a.serializedKey, a] as const); + const byCid = cidEntries.reduce( + (acc, [key, entry]) => { + if (!acc[key]) acc[key] = []; + acc[key].push(entry); + return acc; + }, + {} as Record, + ); + for (const cid of ["bafydoc111", "bafyfile111", "bafyfile222"]) { + for (const idx of byCid[cid]) { + const deviceMetas = (idx.meta ?? []).filter(isQSDeviceMeta); + expect(deviceMetas).toHaveLength(1); + expect(deviceMetas[0].key).toBe(deviceId); + } + } + }); +}); diff --git a/core/quick-silver/idx-service/primary-key-strategy.ts b/core/quick-silver/idx-service/primary-key-strategy.ts new file mode 100644 index 000000000..03f46d340 --- /dev/null +++ b/core/quick-silver/idx-service/primary-key-strategy.ts @@ -0,0 +1,106 @@ +import { exception2Result, Lazy, Result } from "@adviser/cement"; +import { SuperThis } from "@fireproof/core-types-base"; +import { DeviceIdClient, DeviceIdKey } from "@fireproof/core-device-id"; +import { IdxService, IdxServiceImpl, defaultIdxStrategy } from "./service.js"; +import { AddToIdxOpts, IdxEntry, IdxStrategy, IdxTransaction, MetaEntry } from "./types.js"; +import { isQSDocMeta, isQSFileMeta, QSDeviceMeta, QSDocMeta, QSFileMeta } from "../envelope.js"; + +export interface PrimaryKeyOpts { + readonly sthis: SuperThis; + readonly deviceIdKey?: DeviceIdKey; + readonly idxStrategy?: IdxStrategy; + readonly idxService?: IdxServiceImpl; +} + +// export interface TickPrimaryKey { +// readonly key: string; +// readonly dbname: string; +// readonly cidUrl: string; +// readonly meta?: MetaEntry[]; +// } + +// export interface TickOpts { +// readonly primaryKey: TickPrimaryKey; +// readonly idxService?: IdxServiceImpl; +// } + +export class PrimaryKeyStrategy implements IdxStrategy { + #opts: Omit & { + readonly idxStrategy: IdxStrategy; + readonly idxService: IdxServiceImpl; + }; + + readonly deviceFingerPrint = Lazy(async () => { + if (this.#opts.deviceIdKey) { + return this.#opts.deviceIdKey.fingerPrint(); + } + const rKey = await new DeviceIdClient(this.#opts.sthis).ensureDeviceIdWithoutCert(); + if (rKey.isErr()) throw rKey.Err(); + return rKey.Ok().fingerPrint(); + }); + + constructor(opts: PrimaryKeyOpts) { + this.#opts = { + ...opts, + idxStrategy: opts.idxStrategy ?? defaultIdxStrategy, + idxService: opts.idxService ?? IdxService(), + }; + } + + getDocFileMeta(meta: MetaEntry[] = []): (QSDocMeta | QSFileMeta)[] { + return meta.reduce( + (cids, m) => { + if (isQSDocMeta(m) || isQSFileMeta(m)) { + cids.push(m); + } + return cids; + }, + [] as (QSDocMeta | QSFileMeta)[], + ); + } + + async write(tx: IdxTransaction, opts: AddToIdxOpts, _serializedKey: string): Promise> { + const r = await exception2Result(async (): Promise> => { + const deviceId = await this.deviceFingerPrint(); + const rIdx = await this.#opts.idxService.addToIdx({ + strategy: this.#opts.idxStrategy, + dbname: opts.dbname, + idxName: "_id", + keys: opts.keys, + meta: opts.meta, + tx, + }); + if (rIdx.isErr()) return Result.Err(rIdx); + + const deviceIdMetas: MetaEntry[] = []; + for (const m of this.getDocFileMeta(opts.meta)) { + const rCid = await this.#opts.idxService.addToIdx({ + strategy: this.#opts.idxStrategy, + tx, + dbname: opts.dbname, + idxName: "_cid", + keys: [m.payload.cid], + meta: [ + { + type: "qs.device.meta", + key: deviceId, + payload: { + idxName: "_cid", + url: m.payload.url, + who: "me", + cid: m.payload.cid, + created: new Date().toISOString(), + deleted: false, + }, + } satisfies QSDeviceMeta, + ], + }); + if (rCid.isErr()) return Result.Err(rCid); + deviceIdMetas.push(...(rCid.Ok().meta ?? [])); + } + + return Result.Ok({ ...rIdx.Ok(), meta: [...(rIdx.Ok().meta ?? []), ...deviceIdMetas] }); + }); + return r; + } +} diff --git a/core/quick-silver/idx-service/service.ts b/core/quick-silver/idx-service/service.ts new file mode 100644 index 000000000..0d2473a3d --- /dev/null +++ b/core/quick-silver/idx-service/service.ts @@ -0,0 +1,199 @@ +import { Dexie, type Collection, type Table } from "dexie"; // Dexie used for minKey/maxKey + IdxDB base +import { consumeStream, exception2Result, KeyedResolvOnce, Result } from "@adviser/cement"; +import type { + AddToIdxOpts, + DeleteFromIdxOpts, + IdxEntry, + IdxQueryOpts, + IdxServiceOpts, + IdxStrategy, + IdxTransaction, + MetaEntry, +} from "./types.js"; + +function serializeKey(keys: string[]): string { + return keys.length === 1 ? keys[0] : JSON.stringify(keys); +} + +export class KeyIdxStrategy implements IdxStrategy { + async write(tx: IdxTransaction, opts: AddToIdxOpts, serializedKey: string): Promise> { + const rExisting = await tx.get([opts.idxName, serializedKey]); + if (rExisting.isErr()) { + return Result.Err(rExisting); + } + const existing = rExisting.Ok(); + const metaMap = new Map(); + for (const e of existing?.meta ?? []) metaMap.set(`${e.type}:${e.key}`, e); + for (const m of opts.meta ?? []) metaMap.set(`${m.type}:${m.key}`, m); + + const entry: IdxEntry = { + idxName: opts.idxName, + serializedKey, + keys: opts.keys, + // cidUrl: opts.cidUrl, + // primaryKey: opts.primaryKey, + meta: [...metaMap.values()], + deleted: false, + }; + + const rPut = await tx.put(entry); + if (rPut.isErr()) { + return Result.Err(rPut); + } + return Result.Ok(entry); + } +} + +export const defaultIdxStrategy = new KeyIdxStrategy(); + +class IdxDB extends Dexie { + readonly idx!: Table; + + constructor(name: string) { + super(name); + this.version(1).stores({ + idx: "&[idxName+serializedKey], serializedKey", + }); + // this.idx = this.dexie.table("idx"); + } + + // transaction(mode: "readonly" | "rw", table: Table, fn: (tx: Transaction) => Promise): Promise { + // return this.dexie.transaction(mode, table, async (dtx) => { + // const o = await exception2Result(() => fn(dtx)); // we need to wait here + // return o as T; + // }); + // } +} + +const idxServiceSingleton = new KeyedResolvOnce(); + +export function IdxService({ prefix = "Idx" }: IdxServiceOpts = {}): IdxServiceImpl { + return idxServiceSingleton.get(prefix).once(() => new IdxServiceImpl(prefix)); +} + +export class IdxServiceImpl { + readonly prefix: string; + + private readonly dbRegistry = new KeyedResolvOnce(); + + constructor(prefix: string) { + this.prefix = prefix; + } + + private prepare(dbname: string): IdxDB { + return this.dbRegistry.get(dbname).once(() => new IdxDB(`${this.prefix}${dbname}`)); + } + + private idx(dbname: string): Table { + return this.prepare(dbname).table("idx"); + } + + async addToIdx(opts: AddToIdxOpts): Promise> { + return exception2Result(async (): Promise> => { + const serializedKey = serializeKey(opts.keys); + const strategy = opts.strategy ?? defaultIdxStrategy; + if (opts.tx) { + const ret = await strategy.write(opts.tx, opts, serializedKey); + return ret; + } + return this.transaction(opts.dbname, (tx) => strategy.write(tx, opts, serializedKey)); + }); + } + + async transaction(dbname: string, fn: (tx: IdxTransaction) => Promise): Promise { + const db = this.prepare(dbname); + return db.transaction("rw", db.idx, async (dtx) => { + const tx: IdxTransaction = { + get: (key: string[]) => + dtx.idx + .get(key as [string, string]) + .then((res) => Result.Ok(res)) + .catch((e) => Result.Err(e as Error)), + put: (entry: IdxEntry) => + dtx.idx + .put(entry) + .then(() => undefined) + .then(() => Result.Ok(undefined)) + .catch((e) => Result.Err(e as Error)), + del: (key: string[]) => + dtx.idx + .delete(key as [string, string]) + .then(() => Result.Ok(undefined)) + .catch((e) => Result.Err(e as Error)), + }; + const o = await exception2Result(() => fn(tx)); // we need to wait here + return o as T; + }); + } + + async deleteFromIdx(opts: DeleteFromIdxOpts): Promise> { + return exception2Result(async (): Promise> => { + const serializedKey = serializeKey(opts.keys); + const db = this.prepare(opts.dbname); + + await db.transaction("rw", db.idx, async () => { + const existing = await db.idx.get([opts.idxName, serializedKey]); + if (!existing) return; + await db.idx.put({ ...existing, deleted: true }); + }); + + return Result.Ok(undefined); + }); + } + + async destroyDb(dbname: string): Promise { + const db = this.prepare(dbname); + const r = await this.query({ dbname, idxName: "_id" }); + if (r.isErr()) { + console.error("IdxService.destroyDb: failed to query db before deletion", dbname, r.Err()); + } + const toDelete = await consumeStream(r.Ok(), (entry) => { + return entry.keys; // we only need the keys to delete, and we can do it in parallel with the db deletion, so we don't await it here. We will await the deleteFromIdx call after the db.delete() call, which should be fast since it's just marking entries as deleted. + }); + for (const keys of toDelete) { + await this.deleteFromIdx({ dbname, idxName: "_id", keys }); + } + await db.delete(); + this.dbRegistry.delete(dbname); + } + + async query(opts: IdxQueryOpts): Promise>> { + return exception2Result(async (): Promise>> => { + const { dbname, idxName, keys = [], order = "asc" } = opts; + const idx = this.idx(dbname); + + let collection: Collection; + + if (keys.length > 0) { + const serializedKeys = keys.map((k) => serializeKey([k])); + collection = idx + .where("serializedKey") + .anyOf(serializedKeys) + .and((r) => r.idxName === idxName); + } else { + collection = idx.where("[idxName+serializedKey]").between([idxName, Dexie.minKey], [idxName, Dexie.maxKey]); + } + + if (order === "desc") { + collection = collection.reverse(); + } + + if (!opts.includeDeleted) { + collection = collection.and((r) => !r.deleted); + } + + if (opts.select) { + collection = collection.and(opts.select); + } + + const stream = new ReadableStream({ + async start(ctrl) { + await collection.each((entry) => ctrl.enqueue(entry)); + ctrl.close(); + }, + }); + + return Result.Ok(stream); + }); + } +} diff --git a/core/quick-silver/idx-service/types.ts b/core/quick-silver/idx-service/types.ts new file mode 100644 index 000000000..4f8528a0e --- /dev/null +++ b/core/quick-silver/idx-service/types.ts @@ -0,0 +1,57 @@ +import { Result } from "@adviser/cement"; + +export interface MetaEntry { + readonly type: string; + readonly key: string; + readonly payload: unknown; +} + +export interface IdxEntry { + readonly idxName: string; + readonly serializedKey: string; + readonly keys: string[]; + // readonly cidUrl: string; + // readonly primaryKey?: string; + readonly meta?: MetaEntry[]; + readonly deleted?: boolean; +} + +export interface IdxTransaction { + get(key: string[]): Promise>; + put(entry: IdxEntry): Promise>; + del(key: string[]): Promise>; +} + +export interface IdxStrategy { + write(tx: IdxTransaction, opts: AddToIdxOpts, serializedKey: string): Promise>; +} + +export interface AddToIdxOpts { + readonly dbname: string; + readonly idxName: string; + readonly keys: string[]; + // readonly cidUrl: string; + // readonly primaryKey?: string; + readonly meta?: MetaEntry[]; + readonly tx?: IdxTransaction; + readonly strategy?: IdxStrategy; +} + +export interface DeleteFromIdxOpts { + readonly dbname: string; + readonly idxName: string; + readonly keys: string[]; +} + +export interface IdxServiceOpts { + readonly prefix?: string; +} + +export interface IdxQueryOpts { + readonly dbname: string; + readonly idxName: string; + readonly keys?: string[]; + readonly order?: "asc" | "desc"; + readonly select?: (row: IdxEntry) => boolean; + readonly includeDeleted?: boolean; +} diff --git a/core/quick-silver/package.json b/core/quick-silver/package.json new file mode 100644 index 000000000..da7544111 --- /dev/null +++ b/core/quick-silver/package.json @@ -0,0 +1,53 @@ +{ + "name": "@fireproof/core-quick-silver", + "version": "0.0.0", + "description": "Live ledger for the web.", + "type": "module", + "main": "./index.js", + "scripts": { + "build": "core-cli tsc", + "test": "vitest --run", + "pack": "core-cli build --doPack", + "publish": "core-cli build" + }, + "keywords": [ + "ledger", + "JSON", + "document", + "IPLD", + "CID", + "IPFS" + ], + "contributors": [ + "J Chris Anderson", + "Alan Shaw", + "Travis Vachon", + "Mikeal Rogers", + "Meno Abels" + ], + "author": "J Chris Anderson", + "license": "AFL-2.0", + "homepage": "https://use-fireproof.com", + "gptdoc": "import { fireproof } from 'use-fireproof'; const db = fireproof('app-db-name'); const ok = await db.put({ anyField: ['any','json'] }); const doc = await db.get(ok.id); await db.del(doc._id); db.subscribe(myRedrawFn); const result = await db.query('anyField', {range : ['a', 'z']}); result.rows.map(({ key }) => key);", + "repository": { + "type": "git", + "url": "git+https://github.com/fireproof-storage/fireproof.git" + }, + "bugs": { + "url": "https://github.com/fireproof-storage/fireproof/issues" + }, + "dependencies": { + "@adviser/cement": "^0.5.22", + "@fireproof/core-gateways-indexeddb": "workspace:*", + "@fireproof/core-runtime": "workspace:0.0.0", + "@fireproof/core-device-id": "workspace:0.0.0", + "@fireproof/core-types-base": "workspace:*", + "@noble/hashes": "^1.8.0", + "arktype": "^2.1.29", + "dexie": "^4.3.0", + "multiformats": "^13.4.2" + }, + "devDependencies": { + "@fireproof/core-cli": "workspace:0.0.0" + } +} diff --git a/core/quick-silver/quick-silver.ts b/core/quick-silver/quick-silver.ts new file mode 100644 index 000000000..3ece1c5d7 --- /dev/null +++ b/core/quick-silver/quick-silver.ts @@ -0,0 +1,537 @@ +import type { + Database, + Ledger, + SuperThis, + DocTypes, + DocWithId, + DocSet, + DocResponse, + BulkResponse, + ClockHead, + ChangesOptions, + ChangesResponse, + AllDocsQueryOpts, + AllDocsResponse, + ListenerFn, + MapFn, + IndexKeyType, + DocFragment, + QueryOpts, + QueryResult, + Attachable, + Attached, +} from "@fireproof/core-types-base"; +import { + KeyedResolvOnce, + Logger, + LoggerImpl, + OnFunc, + Result, + consumeStream, + stream2uint8array, + stripper, + toSortedObject, + uint8array2stream, +} from "@adviser/cement"; +import { + QSDocMeta, + QSIdxValueMeta, + QSFileMeta, + isQSDocMeta, + isQSEmitIdxValueMeta as isQSIdxValueMeta, + isQSFileMeta, +} from "./envelope.js"; +import { NotFoundError } from "@fireproof/core-types-base"; +import { ensureLogger, ensureSuperThis, hashStringSync } from "@fireproof/core-runtime"; +import { CIDStorageService } from "./cid-storage/service.js"; +import { IdxService } from "./idx-service/service.js"; +import { PrimaryKeyStrategy } from "./idx-service/primary-key-strategy.js"; +import { IdxEntry, MetaEntry } from "./idx-service/types.js"; +import { CIDGetResult, CIDStoreResult } from "./cid-storage/types.js"; +import { type } from "arktype"; + +function secIdxName(field: string | MapFn): string { + return typeof field === "string" ? `_field_${field}` : `_map_${hashStringSync(field.toString())}`; +} + +function compareKeys(a: IndexKeyType, b: IndexKeyType): number { + if (Array.isArray(a) && Array.isArray(b)) { + for (let i = 0; i < Math.min(a.length, b.length); i++) { + const c = compareKeys(a[i], b[i]); + if (c !== 0) return c; + } + return a.length - b.length; + } + if (a < b) return -1; + if (a > b) return 1; + return 0; +} + +export interface QuickSilverOpts { + readonly sthis?: SuperThis; + readonly name: string; + readonly cacheSize?: number; +} + +export class QuickSilver implements Database { + readonly name: string; + readonly sthis: SuperThis; + readonly logger: Logger; + + private readonly _docCache = new KeyedResolvOnce>(); + private readonly _updateListeners = OnFunc<(docs: DocWithId[]) => void>(); + private readonly _noUpdateListeners = OnFunc<() => void>(); + private readonly _primaryKeyStrategy: PrimaryKeyStrategy; + + get ledger(): Ledger { + throw new Error("not implemented"); + } + + constructor(opts: QuickSilverOpts) { + this.name = opts.name; + this.sthis = opts.sthis ?? ensureSuperThis({ logger: new LoggerImpl() }); + this.logger = ensureLogger(this.sthis, "QuickSilver"); + const cacheSize = opts.cacheSize ?? 64; + if (cacheSize > 0) { + this._docCache.setParam({ lru: { maxEntries: cacheSize } }); + } + this._primaryKeyStrategy = new PrimaryKeyStrategy({ sthis: this.sthis }); + } + + readonly onClosed = OnFunc<() => void>(); + + readonly ready = (): Promise => Promise.resolve(); + + async close(): Promise { + this.onClosed.invoke(); + } + + async destroy(): Promise { + await IdxService().destroyDb(this.name); + } + + attach(_a: Attachable): Promise { + throw new Error("not implemented"); + } + + findQSDocMeta(meta: MetaEntry[] = []): QSDocMeta | undefined { + return meta.find((m) => isQSDocMeta(m)); + } + + async get(id: string): Promise> { + return this._docCache.get(id).once(async () => { + const rQ = await IdxService().query({ dbname: this.name, idxName: "_id", keys: [id] }); + if (rQ.isErr()) throw rQ.Err(); + + const entries = await consumeStream(rQ.Ok(), (r) => r); + if (entries.length === 0) throw new NotFoundError(`doc not found: ${id}`); + const item = entries[0]; + const cids = await Promise.all([ + (async () => { + const docMeta = this.findQSDocMeta(item.meta); + if (!docMeta) return Result.Err(new Error(`no doc meta found for id: ${id}`)); + return CIDStorageService().get(docMeta.payload.url); + })(), + ...(item.meta ?? []) + .filter((m) => isQSFileMeta(m)) + .map((m) => + CIDStorageService() + .get(m.payload.url) + .then((r) => { + if (r.isErr()) return r; + const getFile = r.Ok(); + if (!getFile.found) return { found: false }; + return Result.Ok({ meta: m, found: getFile.found, stream: getFile.stream }); + }), + ), + ]); + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const rData = cids.shift()! as Result; + if (rData.isErr()) throw rData.Err(); + const data = rData.Ok(); + if (!data.found) throw new NotFoundError(`doc content not found for id: ${id}`); + + const _files: Record = {}; + for (const cidItem of cids as unknown as Result[]) { + if (cidItem.isErr()) continue; + const r = cidItem.Ok(); + if (r.found === false) continue; + const qsMeta = QSFileMeta(r.meta); + if (qsMeta instanceof type.errors) continue; + const blob = await new Response(r.stream).blob(); + const file = new File([blob], qsMeta.payload.filename, { + type: "application/octet-stream", + lastModified: Date.parse(qsMeta.payload.created), + }); + _files[qsMeta.payload.filename] = file; + } + + const decoded = this.sthis.ende.cbor.decodeUint8(await stream2uint8array(data.stream)); + if (decoded.isErr()) throw decoded.Err(); + return { + _id: item.keys[0], + _meta: item.meta, + _files, + ...decoded.Ok(), + } as DocWithId; + }) as Promise>; + } + + async put(doc: DocSet): Promise { + const { ids, clock, name } = await this.bulk([doc]); + return { id: ids[0], clock, name }; + } + + qsFiles(files: Record): Promise[]> { + return Promise.all( + Object.values(files).map(async (file) => { + const rFile = await CIDStorageService().store(file.stream()); + if (rFile.isErr()) return Result.Err(rFile); + const rFileIdx = await IdxService().addToIdx({ + dbname: this.name, + idxName: "_files", + keys: [rFile.Ok().cid], + meta: [ + { + type: "qs.file.meta", + key: rFile.Ok().cid, + payload: { + idxName: "_files", + url: rFile.Ok().url, + filename: file.name, + size: rFile.Ok().size, + created: new Date().toISOString(), + cid: rFile.Ok().cid, + }, + } satisfies QSFileMeta, + ], + }); + return rFileIdx; + }), + ); + } + + async bulk(docs: DocSet[]): Promise { + const writtenDocs: DocWithId[] = []; + + // dexie abort transaction if something other is called + await this._primaryKeyStrategy.deviceFingerPrint(); + + const docAndFiles = await Promise.all( + docs.map(async (doc): Promise; docResult: CIDStoreResult; files: IdxEntry[] }>> => { + const raw = doc as DocSet & { _files?: Record }; + const data = toSortedObject(stripper(/(_id|_files|_publicFiles|_meta|_deleted)/, raw)) as object; + const rData = await CIDStorageService().store(uint8array2stream(this.sthis.ende.cbor.encodeToUint8(data))); + if (rData.isErr()) { + return Result.Err(rData); + } + const qcFiles = await this.qsFiles(raw._files ?? {}); + if (qcFiles.some((f) => f.isErr())) { + return Result.Err(new Error(`failed to store files for doc with id: ${raw._id ?? "unknown"}`)); + } + return Result.Ok({ doc, docResult: rData.Ok(), files: qcFiles.map((i) => i.Ok()) }); + }), + ); + if (docAndFiles.some((r) => r.isErr())) { + throw new Error(`failed to store some docs/files: ${docAndFiles.map((r) => r.Err().message).join("\n ")}`); + } + await IdxService().transaction(this.name, async (tx) => { + for (const rDoc of docAndFiles) { + if (rDoc.isErr()) continue; + const daf = rDoc.Ok(); + const id = daf.doc._id ?? this.sthis.timeOrderedNextId().str; + + const fileMeta = daf.files + .filter((f) => !!f.meta) + .map((f) => f.meta) + .flat() as MetaEntry[]; + const docMeta: QSDocMeta = { + type: "qs.doc.meta", + key: id, + payload: { + idxName: "_id", + primaryKey: id, + cid: daf.docResult.cid, + url: daf.docResult.url, + created: new Date().toISOString(), + }, + }; + const meta = [docMeta, ...fileMeta]; + const rIdx = await IdxService().addToIdx({ + tx, + dbname: this.name, + idxName: "_id", + keys: [id], + meta, + strategy: this._primaryKeyStrategy, + }); + if (rIdx.isErr()) { + console.log("Failed to index doc with id:", id, "error:", rIdx.Err()); + continue; + } + this._docCache.delete(id); + writtenDocs.push({ + ...(daf.doc as DocTypes), + _id: id, + _meta: meta, + }); + // console.log("Doc indexed with id:", id, "meta:", docMeta, "file meta:", fileMeta); + } + }); + + this._updateListeners.invoke(writtenDocs); + this._noUpdateListeners.invoke(); + + return { ids: writtenDocs.map((d) => d._id), clock: [], name: this.name }; + } + + async del(id: string): Promise { + const result = await IdxService().deleteFromIdx({ dbname: this.name, idxName: "_id", keys: [id] }); + if (result.isErr()) throw result.Err(); + this._docCache.delete(id); + return { id, clock: [], name: this.name }; + } + + remove(id: string): Promise { + return this.del(id); + } + + changes(_since?: ClockHead, _opts?: ChangesOptions): Promise> { + throw new Error("not implemented"); + } + + async allDocs(opts?: Partial): Promise> { + const rQ = await IdxService().query({ + dbname: this.name, + idxName: "_id", + keys: opts?.keys, + includeDeleted: opts?.includeDeleted, + }); + if (rQ.isErr()) throw rQ.Err(); + + const entries = await consumeStream(rQ.Ok(), (r) => r); + const rows = await Promise.all( + entries.map(async (entry) => ({ + key: entry.keys[0], + value: await this.get(entry.keys[0]), + })), + ); + return { rows, clock: [], name: this.name }; + } + + allDocuments(opts?: Partial): Promise> { + return this.allDocs(opts); + } + + subscribe(listener: ListenerFn, updates?: boolean): () => void { + if (updates === false) { + return this._noUpdateListeners(listener as () => void); + } + return this._updateListeners(listener as (docs: DocWithId[]) => void); + } + + async query< + T extends DocTypes, + K extends IndexKeyType = string, + R extends DocFragment = T, + O extends Partial> = Partial>, + >(field: string | MapFn, opts?: O): Promise> { + const idxName = secIdxName(field); + + // Build secondary index if empty + const rCheck = await IdxService().query({ dbname: this.name, idxName }); + if (rCheck.isErr()) throw rCheck.Err(); + const checkReader = rCheck.Ok().getReader(); + let hasEntries = false; + try { + const { done } = await checkReader.read(); + hasEntries = !done; + } finally { + checkReader.releaseLock(); + } + + if (!hasEntries) { + const rPrimary = await IdxService().query({ dbname: this.name, idxName: "_id" }); + if (rPrimary.isErr()) throw rPrimary.Err(); + const primaryReader = rPrimary.Ok().getReader(); + try { + interface PendingEmit { + k: IndexKeyType; + v?: DocFragment; + } + while (true) { + const { done, value: entry } = await primaryReader.read(); + if (done) break; + + let doc: DocWithId; + try { + doc = await this.get(entry.keys[0]); + } catch { + continue; + } + // const docId = entry.keys[0]; + const docMeta = this.findQSDocMeta(entry.meta); + if (!docMeta) continue; + + const pending = new Map(); + if (typeof field === "string") { + const key = (doc as unknown as Record)[field]; + if (key !== undefined) pending.set(String(key), [{ k: key as IndexKeyType }]); + } else { + let called = 0; + const emit = (k: IndexKeyType, v?: DocFragment) => { + called++; + const key = JSON.stringify(k); + if (!pending.has(key)) pending.set(key, []); + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + pending.get(key)!.push({ k, v }); + }; + try { + const ret = field(doc, emit); + if (!called && ret !== undefined && ret !== null) { + if (!pending.has(JSON.stringify(ret))) { + pending.set(JSON.stringify(ret), []); + } + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + pending.get(JSON.stringify(ret))!.push({ k: ret as IndexKeyType }); + } + } catch (e) { + console.log("Error executing map function for doc with id:", doc._id, "error:", e); + } + } + + for (const [_sk, emits] of pending.entries()) { + const meta: MetaEntry[] = [ + { + type: "qs.doc.meta", + key: doc._id, + payload: { + primaryKey: doc._id, + idxName, + cid: docMeta.payload.cid, + url: docMeta.payload.url, + created: new Date().toISOString(), + }, + } satisfies QSDocMeta, + ...Array.from(emits).reduce((acc, emit) => { + acc.push({ + type: "qs.emit.value", + key: `${JSON.stringify(emit.k)}:${doc._id}`, + payload: { + keys: Array.isArray(emit.k) ? emit.k : [emit.k], + emitValue: emit.v, + }, + } satisfies QSIdxValueMeta); + return acc; + }, [] as MetaEntry[]), + ]; + await IdxService().addToIdx({ + dbname: this.name, + idxName, + keys: Array.isArray(emits[0].k) ? emits[0].k.map(String) : [String(emits[0].k)], + meta, + }); + } + } + } finally { + primaryReader.releaseLock(); + } + } + + // Build select filter from opts + const select = (() => { + if (opts?.key !== undefined) { + const s = JSON.stringify(opts.key); + return (e: IdxEntry) => e.keys[0] === s; + } + if (opts?.keys && (opts.keys as unknown[]).length > 0) { + const sset = new Set((opts.keys as unknown[]).map((k) => JSON.stringify(k))); + return (e: IdxEntry) => sset.has(e.keys[0]); + } + if (opts?.range) { + const [lo, hi] = opts.range as [K, K]; + return (e: IdxEntry) => { + const k = JSON.parse(e.keys[0]) as K; + return compareKeys(k, lo) >= 0 && compareKeys(k, hi) <= 0; + }; + } + if (opts?.prefix !== undefined) { + const prefix = String(opts.prefix); + return (e: IdxEntry) => String(JSON.parse(e.keys[0])).startsWith(prefix); + } + return undefined; + })(); + + const rQ = await IdxService().query({ dbname: this.name, idxName, select }); + if (rQ.isErr()) throw rQ.Err(); + + interface EmittedRow { + id: string; + key: K; + value: R; + doc: DocWithId; + } + const emitted: EmittedRow[] = []; + const reader = rQ.Ok().getReader(); + try { + while (true) { + const { done, value: entry } = await reader.read(); + if (done) break; + + const docMeta = this.findQSDocMeta(entry.meta); + if (!docMeta) continue; + + let doc: DocWithId; + try { + doc = await this.get(docMeta.payload.primaryKey); + } catch { + continue; + } + + // const valueMeta = entry.meta?.find((m) => m.type === "qs.emit.value" && m.key === docId); + // const value = (valueMeta ? valueMeta.payload : emittedKey) as unknown as R; + + const idxValues = entry.meta?.filter((m) => isQSIdxValueMeta(m)); + for (const idxValue of idxValues ?? []) { + if (idxValue instanceof type.errors) continue; + if (idxValue.payload.keys.length === 0) continue; + let key: K; + if (idxValue.payload.keys.length === 1) { + key = idxValue.payload.keys[0] as K; + } else { + key = idxValue.payload.keys as unknown as K; + } + + emitted.push({ + id: doc._id, + key, + value: {} as R, + doc, + }); + } + } + } finally { + reader.releaseLock(); + } + + emitted.sort((a, b) => compareKeys(a.key, b.key)); + if (opts?.descending) emitted.reverse(); + const filtered = opts?.limit ? emitted.slice(0, opts.limit) : emitted; + + const rows = filtered.map(({ id, key, value, doc }) => ({ + id, + key, + value, + doc: opts?.includeDocs !== false ? doc : undefined, + })); + + if (opts?.includeDocs === false) { + return { rows } as QueryResult; + } + return { rows, docs: filtered.map((r) => r.doc) } as QueryResult; + } + + compact(): Promise { + return Promise.resolve(); + } +} diff --git a/core/quick-silver/tsconfig.json b/core/quick-silver/tsconfig.json new file mode 100644 index 000000000..9f2759456 --- /dev/null +++ b/core/quick-silver/tsconfig.json @@ -0,0 +1,6 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "outDir": "./dist" + } +} diff --git a/core/quick-silver/types.ts b/core/quick-silver/types.ts new file mode 100644 index 000000000..8e6858383 --- /dev/null +++ b/core/quick-silver/types.ts @@ -0,0 +1,5 @@ +import type { SuperThis } from "@fireproof/core-types-base"; + +export interface QSConfigOpts { + readonly sthis?: SuperThis; +} diff --git a/core/quick-silver/vitest.config.ts b/core/quick-silver/vitest.config.ts new file mode 100644 index 000000000..5783144f7 --- /dev/null +++ b/core/quick-silver/vitest.config.ts @@ -0,0 +1,22 @@ +import { defineConfig } from "vitest/config"; +import { playwright } from "@vitest/browser-playwright"; + +export default defineConfig({ + test: { + name: "quick-silver", + exclude: ["dist/**", "node_modules/**", "examples/**", "gateway/file"], + include: ["**/*test.?(c|m)[jt]s?(x)"], + browser: { + enabled: true, + headless: true, + provider: playwright(), + instances: [ + { + browser: "chromium", + }, + ], + screenshotFailures: false, + }, + // setupFiles: "./setup.indexeddb.ts", + }, +}); diff --git a/core/runtime/utils.ts b/core/runtime/utils.ts index 22ea0920f..d2a94672b 100644 --- a/core/runtime/utils.ts +++ b/core/runtime/utils.ts @@ -17,6 +17,9 @@ import { AppContext, toSortedArray, toSorted, + top_uint8, + exception2Result, + CoerceBinaryInput, } from "@adviser/cement"; import { PARAM, @@ -28,7 +31,9 @@ import { PromiseToUInt8, ToUInt8, HasLogger, + Ende, } from "@fireproof/core-types-base"; +import { decode as cborDecode, encode as cborEncode } from "cborg"; import { base58btc } from "multiformats/bases/base58"; import { sha256 } from "multiformats/hashes/sha2"; import { CID } from "multiformats/cid"; @@ -62,6 +67,7 @@ class SuperThisImpl implements SuperThis { readonly ctx: AppContext; readonly txt: TextEndeCoder; readonly crypto: CryptoRuntime; + readonly ende: Ende; constructor(opts: superThisOpts) { this.logger = opts.logger; @@ -70,7 +76,18 @@ class SuperThisImpl implements SuperThis { this.pathOps = opts.pathOps; this.txt = opts.txt; this.ctx = opts.ctx; - // console.log("superThis", this); + this.ende = { + json: { + encodeToStr: (obj) => JSON.stringify(obj), + encodeToUint8: (obj) => this.txt.encode(JSON.stringify(obj)), + decodeStr: (str: string) => exception2Result(() => JSON.parse(str)) as Result, + decodeUint8: (uint8: Uint8Array) => exception2Result(() => JSON.parse(this.txt.decode(uint8))) as Result, + }, + cbor: { + encodeToUint8: (obj) => cborEncode(obj), + decodeUint8: (uint8: Uint8Array) => exception2Result(() => cborDecode(uint8)) as Result, + }, + }; } nextId(bytes = 6): { str: string; bin: Uint8Array } { @@ -596,6 +613,11 @@ class Hasher { } export async function hashStringAsync(str: string): Promise { const bytes = json.encode(str); + return hashBlobAsync(bytes); +} + +export async function hashBlobAsync(uint8: CoerceBinaryInput | Blob): Promise { + const bytes = await top_uint8(uint8); const hash = await sha256.digest(bytes); return CID.create(1, json.code, hash).toString(); } diff --git a/core/types/base/types.ts b/core/types/base/types.ts index f7bf23e05..165d10496 100644 --- a/core/types/base/types.ts +++ b/core/types/base/types.ts @@ -138,6 +138,23 @@ export interface TextEndeCodable { txt: TextEndeCoder; } +export interface EndeJson { + encodeToStr(obj: object): string; + encodeToUint8(obj: object): Uint8Array; + decodeStr(str: string): Result; + decodeUint8(uint8: Uint8Array): Result; +} + +export interface EndeCbor { + encodeToUint8(obj: object): Uint8Array; + decodeUint8(uint8: Uint8Array): Result; +} + +export interface Ende { + readonly json: EndeJson; + readonly cbor: EndeCbor; +} + export interface SuperThisOpts { // readonly crypto?: CryptoRuntime; readonly logger: Logger; @@ -155,6 +172,7 @@ export interface SuperThis { readonly pathOps: PathOps; readonly ctx: AppContext; readonly txt: TextEndeCoder; + readonly ende: Ende; // hash(): string; timeOrderedNextId(time?: number): { str: string; toString: () => string }; nextId(bytes?: number): { str: string; bin: Uint8Array; toString: () => string }; @@ -198,6 +216,7 @@ export interface ConfigOpts extends Partial { readonly writeQueue?: Partial; readonly gatewayInterceptor?: SerdeGatewayInterceptor; readonly autoCompact?: number; + readonly databaseFactory?: (name: string, opts: ConfigOpts) => Database; // could be registered with registerCompactStrategy(name: string, compactStrategy: CompactStrategy) readonly compactStrategy?: string; // default "FULL" other "fireproof" , "no-op" readonly storeUrls?: StoreUrlsOpts; @@ -233,6 +252,7 @@ export interface DocBase { readonly _id: string; readonly _files?: DocFiles; readonly _publicFiles?: DocFiles; + readonly _meta?: unknown[]; readonly _deleted?: boolean; } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 64062a5bd..50ce0595f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -170,6 +170,9 @@ importers: '@fireproof/core-protocols-dashboard': specifier: workspace:* version: link:../../core/protocols/dashboard + '@fireproof/core-quick-silver': + specifier: workspace:* + version: link:../../core/quick-silver '@fireproof/core-runtime': specifier: workspace:* version: link:../../core/runtime @@ -431,6 +434,49 @@ importers: specifier: ^8.8.5 version: 8.8.5 + cloud/quick-silver/api: + dependencies: + '@adviser/cement': + specifier: ^0.5.22 + version: 0.5.23(typescript@5.9.3) + '@fireproof/cloud-quick-silver-types': + specifier: workspace:* + version: link:../types + '@fireproof/core-runtime': + specifier: workspace:* + version: link:../../../core/runtime + cmd-ts: + specifier: ^0.15.0 + version: 0.15.0 + + cloud/quick-silver/cf: + dependencies: + '@adviser/cement': + specifier: ^0.5.22 + version: 0.5.23(typescript@5.9.3) + '@fireproof/cloud-quick-silver-types': + specifier: workspace:* + version: link:../types + '@fireproof/core-runtime': + specifier: workspace:* + version: link:../../../core/runtime + '@fireproof/core-types-base': + specifier: workspace:* + version: link:../../../core/types/base + devDependencies: + '@cloudflare/workers-types': + specifier: ^4.20260214.0 + version: 4.20260302.0 + wrangler: + specifier: ^4.66.0 + version: 4.67.0(@cloudflare/workers-types@4.20260302.0)(bufferutil@4.1.0)(utf-8-validate@5.0.10) + + cloud/quick-silver/types: + dependencies: + arktype: + specifier: ^2.1.20 + version: 2.1.29 + cloud/todo-app: dependencies: '@adviser/cement': @@ -896,6 +942,40 @@ importers: specifier: ^4.3.6 version: 4.3.6 + core/quick-silver: + dependencies: + '@adviser/cement': + specifier: ^0.5.22 + version: 0.5.23(typescript@5.9.3) + '@fireproof/core-device-id': + specifier: workspace:0.0.0 + version: link:../device-id + '@fireproof/core-gateways-indexeddb': + specifier: workspace:* + version: link:../gateways/indexeddb + '@fireproof/core-runtime': + specifier: workspace:0.0.0 + version: link:../runtime + '@fireproof/core-types-base': + specifier: workspace:* + version: link:../types/base + '@noble/hashes': + specifier: ^1.8.0 + version: 1.8.0 + arktype: + specifier: ^2.1.29 + version: 2.1.29 + dexie: + specifier: ^4.3.0 + version: 4.3.0 + multiformats: + specifier: ^13.4.2 + version: 13.4.2 + devDependencies: + '@fireproof/core-cli': + specifier: workspace:0.0.0 + version: link:../../cli + core/runtime: dependencies: '@adviser/cement': @@ -1659,6 +1739,12 @@ packages: resolution: {integrity: sha512-UrcABB+4bUrFABwbluTIBErXwvbsU/V7TZWfmbgJfbkwiBuziS9gxdODUyuiecfdGQ85jglMW6juS3+z5TsKLw==} engines: {node: '>=10'} + '@ark/schema@0.56.0': + resolution: {integrity: sha512-ECg3hox/6Z/nLajxXqNhgPtNdHWC9zNsDyskwO28WinoFEnWow4IsERNz9AnXRhTZJnYIlAJ4uGn3nlLk65vZA==} + + '@ark/util@0.56.0': + resolution: {integrity: sha512-BghfRC8b9pNs3vBoDJhcta0/c1J1rsoS1+HgVUreMFPdhz/CRAKReAu57YEllNaSy98rWAdY1gE+gFup7OXpgA==} + '@babel/code-frame@7.29.0': resolution: {integrity: sha512-9NhCeYjq9+3uxgdtp20LSiJXJvN0FeCtNGpJxuMFZ1Kv3cWUNb6DOhJwUvcVCzKGR66cw4njwM6hrJLqgOwbcw==} engines: {node: '>=6.9.0'} @@ -3970,6 +4056,12 @@ packages: aria-query@5.3.0: resolution: {integrity: sha512-b0P0sZPKtyu8HkeRAfCq0IfURZK+SuwMjY1UXGBU27wpAiTwQAIlq56IbIO+ytk/JjS1fMR14ee5WBBfKi5J6A==} + arkregex@0.0.5: + resolution: {integrity: sha512-ncYjBdLlh5/QnVsAA8De16Tc9EqmYM7y/WU9j+236KcyYNUXogpz3sC4ATIZYzzLxwI+0sEOaQLEmLmRleaEXw==} + + arktype@2.1.29: + resolution: {integrity: sha512-jyfKk4xIOzvYNayqnD8ZJQqOwcrTOUbIU4293yrzAjA3O1dWh61j71ArMQ6tS/u4pD7vabSPe7nG3RCyoXW6RQ==} + array-buffer-byte-length@1.0.2: resolution: {integrity: sha512-LHE+8BuR7RYGDKvnrmcuSq3tDcKv9OFEXQt/HpbZhY7V6h0zlUXutnAD82GiFx9rdieCMjkvtcsPqBwgUl1Iiw==} engines: {node: '>= 0.4'} @@ -4454,6 +4546,9 @@ packages: resolution: {integrity: sha512-Btj2BOOO83o3WyH59e8MgXsxEQVcarkUOpEYrubB0urwnN10yQ364rsiByU11nZlqWYZm05i/of7io4mzihBtQ==} engines: {node: '>=8'} + dexie@4.3.0: + resolution: {integrity: sha512-5EeoQpJvMKHe6zWt/FSIIuRa3CWlZeIl6zKXt+Lz7BU6RoRRLgX9dZEynRfXrkLcldKYCBiz7xekTEylnie1Ug==} + didyoumean@1.2.2: resolution: {integrity: sha512-gxtyfqMg7GKyhQmb056K7M3xszy/myH8w+B4RT+QXBQsvAOdc3XymqDDPHx1BgPgsdAA5SIifona89YtRATDzw==} @@ -7369,6 +7464,12 @@ snapshots: '@alloc/quick-lru@5.2.0': {} + '@ark/schema@0.56.0': + dependencies: + '@ark/util': 0.56.0 + + '@ark/util@0.56.0': {} + '@babel/code-frame@7.29.0': dependencies: '@babel/helper-validator-identifier': 7.28.5 @@ -9726,6 +9827,16 @@ snapshots: dependencies: dequal: 2.0.3 + arkregex@0.0.5: + dependencies: + '@ark/util': 0.56.0 + + arktype@2.1.29: + dependencies: + '@ark/schema': 0.56.0 + '@ark/util': 0.56.0 + arkregex: 0.0.5 + array-buffer-byte-length@1.0.2: dependencies: call-bound: 1.0.4 @@ -10278,6 +10389,8 @@ snapshots: detect-libc@2.1.2: {} + dexie@4.3.0: {} + didyoumean@1.2.2: {} diff-sequences@29.6.3: {} diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index ceef2453c..d6e8cd9ab 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -3,6 +3,7 @@ packages: - "cloud/3rd-party" - "cloud/todo-app" - "cloud/base" + - "cloud/quick-silver/*" - "cloud/backend/*" - "cli" - "core/types/*" diff --git a/use-fireproof/base/react/use-fireproof.ts b/use-fireproof/base/react/use-fireproof.ts index 1daeee091..85f59c261 100644 --- a/use-fireproof/base/react/use-fireproof.ts +++ b/use-fireproof/base/react/use-fireproof.ts @@ -1,4 +1,4 @@ -import { fireproof } from "@fireproof/core-base"; +import { fireproof, isDatabase } from "@fireproof/core-base"; import { useMemo } from "react"; import { createAttach } from "./use-attach.js"; import type { UseFPConfig, UseFireproof } from "./types.js"; @@ -28,10 +28,15 @@ export const FireproofCtx = {} as UseFireproof; * * */ -export function useFireproof(name: string | Database = "useFireproof", config: UseFPConfig = {}): UseFireproof { +export function useFireproof(nameOrDatabase: string | Database = "useFireproof", config: UseFPConfig = {}): UseFireproof { const strConfig = hashObjectSync(config); - const database = useMemo(() => (typeof name === "string" ? fireproof(name, config) : name), [name, strConfig]); + const database = useMemo(() => { + if (isDatabase(nameOrDatabase)) { + return nameOrDatabase; + } + return (config.databaseFactory ?? fireproof)(nameOrDatabase, config); + }, [nameOrDatabase, strConfig]); const attach = createAttach(database, config); const useDocument = useMemo(() => createUseDocument(database), [database.name, strConfig]); diff --git a/vitest.config.ts b/vitest.config.ts index 763ab7fee..a2304c70e 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -6,6 +6,7 @@ export default defineConfig({ "core/tests/vitest.file.config.ts", "core/tests/vitest.indexeddb.config.ts", "core/tests/vitest.memory.config.ts", + "core/quick-silver/vitest.config.ts", "use-fireproof/tests/vitest.config.ts", "cloud/backend/cf-d1/vitest.config.ts",