diff --git a/bun.lock b/bun.lock index 10ba56f..b14c2ed 100644 --- a/bun.lock +++ b/bun.lock @@ -14,14 +14,14 @@ }, "packages/accounts": { "name": "@memory.build/accounts", - "version": "0.2.0", + "version": "0.2.5", "dependencies": { "@pydantic/logfire-node": "^0.13.1", }, }, "packages/cli": { "name": "@memory.build/cli", - "version": "0.2.5", + "version": "0.2.6", "bin": { "me": "./index.ts", }, @@ -38,7 +38,7 @@ }, "packages/client": { "name": "@memory.build/client", - "version": "0.2.5", + "version": "0.2.6", "dependencies": { "@memory.build/protocol": "workspace:*", }, @@ -83,7 +83,7 @@ }, "packages/embedding": { "name": "@memory.build/embedding", - "version": "0.2.0", + "version": "0.2.5", "dependencies": { "@ai-sdk/openai": "^3.0.0", "@pydantic/logfire-node": "^0.13.1", @@ -92,14 +92,21 @@ }, "packages/engine": { "name": "@memory.build/engine", - "version": "0.2.0", + "version": "0.2.5", + "dependencies": { + "@pydantic/logfire-node": "^0.13.1", + }, + }, + "packages/engine-core": { + "name": "@memory.build/engine-core", + "version": "0.0.0", "dependencies": { "@pydantic/logfire-node": "^0.13.1", }, }, "packages/protocol": { "name": "@memory.build/protocol", - "version": "0.2.5", + "version": "0.2.6", "dependencies": { "zod": "^4.0.0", }, @@ -112,7 +119,7 @@ }, "packages/server": { "name": "memory-engine-server", - "version": "0.2.0", + "version": "0.2.5", "dependencies": { "@memory.build/accounts": "workspace:*", "@memory.build/embedding": "workspace:*", @@ -153,7 +160,7 @@ }, "packages/worker": { "name": "@memory.build/worker", - "version": "0.2.0", + "version": "0.2.5", "dependencies": { "@memory.build/embedding": "workspace:*", "@memory.build/engine": "workspace:*", @@ -375,6 +382,8 @@ "@memory.build/engine": ["@memory.build/engine@workspace:packages/engine"], + "@memory.build/engine-core": ["@memory.build/engine-core@workspace:packages/engine-core"], + "@memory.build/protocol": ["@memory.build/protocol@workspace:packages/protocol"], "@memory.build/web": ["@memory.build/web@workspace:packages/web"], diff --git a/package.json b/package.json index 29e8f2f..dd898b6 100644 --- a/package.json +++ b/package.json @@ -15,6 +15,7 @@ "clean": "rm -rf packages/cli/dist dist", "docs": "./bun --filter @memory.build/docs-site dev", "docs:build": "./bun --filter @memory.build/docs-site build", + "engine-core:migrate": "./bun scripts/migrate-engine-core.ts", "web": "./bun --filter @memory.build/web dev", "web:build": "./bun --filter @memory.build/web build", "generate:master-key": "./bun scripts/generate-master-key.ts", diff --git a/packages/engine-core/index.ts b/packages/engine-core/index.ts new file mode 100644 index 0000000..e69de29 diff --git a/packages/engine-core/migrate/bootstrap.integration.test.ts b/packages/engine-core/migrate/bootstrap.integration.test.ts new file mode 100644 index 0000000..dce1f54 --- /dev/null +++ b/packages/engine-core/migrate/bootstrap.integration.test.ts @@ -0,0 +1,148 @@ +import { afterAll, beforeAll, describe, expect, test } from "bun:test"; +import { SQL, semver } from "bun"; +import { bootstrapEngineDatabase } from "./bootstrap"; + +const adminUrl = + process.env.ENGINE_CORE_TEST_DATABASE_URL ?? + "postgresql://postgres@localhost:5432/postgres"; + +// These tests expect the local Postgres image from docker/Dockerfile.postgres, +// usually started with `./bun run pg`, unless ENGINE_CORE_TEST_DATABASE_URL is set. + +let dbName: string | undefined; +let sql: SQL | undefined; + +function assertSafeIdentifier(name: string): void { + if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(name)) { + throw new Error(`Unsafe database identifier: ${name}`); + } +} + +async function createTestDatabase(): Promise { + dbName = `test_engine_core_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`; + assertSafeIdentifier(dbName); + + const admin = new SQL(adminUrl); + try { + await admin.unsafe(`create database ${dbName}`); + } finally { + await admin.close(); + } + + const url = new URL(adminUrl); + url.pathname = `/${dbName}`; + return url.toString(); +} + +async function dropTestDatabase(): Promise { + if (!dbName) return; + assertSafeIdentifier(dbName); + + const admin = new SQL(adminUrl); + try { + await admin` + select pg_terminate_backend(pid) + from pg_stat_activity + where datname = ${dbName} + and pid <> pg_backend_pid() + `; + await admin.unsafe(`drop database if exists ${dbName}`); + } finally { + await admin.close(); + dbName = undefined; + } +} + +function getSql(): SQL { + if (!sql) throw new Error("test database is not initialized"); + return sql; +} + +beforeAll(async () => { + const connectionString = await createTestDatabase(); + sql = new SQL(connectionString); +}); + +afterAll(async () => { + await sql?.close(); + sql = undefined; + await dropTestDatabase(); +}); + +describe("bootstrapEngineDatabase", () => { + test("creates required extensions in public", async () => { + await bootstrapEngineDatabase(getSql()); + + const rows = await getSql()` + select e.extname, e.extversion, n.nspname + from pg_extension e + inner join pg_namespace n on n.oid = e.extnamespace + where e.extname in ('citext', 'ltree', 'vector', 'pg_textsearch') + order by e.extname + `; + + expect(rows.map((row: { extname: string }) => row.extname)).toEqual([ + "citext", + "ltree", + "pg_textsearch", + "vector", + ]); + + const minimumVersions = new Map([ + ["citext", "1.6"], + ["ltree", "1.3"], + ["pg_textsearch", "1.1.0"], + ["vector", "0.8.2"], + ]); + for (const row of rows as Array<{ + extname: string; + extversion: string; + nspname: string; + }>) { + expect(row.nspname).toBe("public"); + expect( + semver.order(row.extversion, minimumVersions.get(row.extname)!), + ).toBeGreaterThanOrEqual(0); + } + }); + + test("creates required nologin roles", async () => { + await bootstrapEngineDatabase(getSql()); + + const rows = await getSql()` + select rolname, rolcanlogin + from pg_roles + where rolname in ('me_ro', 'me_rw', 'me_embed') + order by rolname + `; + + expect(rows).toHaveLength(3); + expect(rows.map((row: { rolname: string }) => row.rolname)).toEqual([ + "me_embed", + "me_ro", + "me_rw", + ]); + for (const row of rows as Array<{ rolcanlogin: boolean }>) { + expect(row.rolcanlogin).toBe(false); + } + }); + + test("is idempotent", async () => { + await bootstrapEngineDatabase(getSql()); + await bootstrapEngineDatabase(getSql()); + + const [{ extensionCount }] = await getSql()` + select count(*)::int as "extensionCount" + from pg_extension + where extname in ('citext', 'ltree', 'vector', 'pg_textsearch') + `; + const [{ roleCount }] = await getSql()` + select count(*)::int as "roleCount" + from pg_roles + where rolname in ('me_ro', 'me_rw', 'me_embed') + `; + + expect(extensionCount).toBe(4); + expect(roleCount).toBe(3); + }); +}); diff --git a/packages/engine-core/migrate/bootstrap.ts b/packages/engine-core/migrate/bootstrap.ts new file mode 100644 index 0000000..a5f747e --- /dev/null +++ b/packages/engine-core/migrate/bootstrap.ts @@ -0,0 +1,189 @@ +import { info, reportError, span } from "@pydantic/logfire-node"; +import { SQL, semver } from "bun"; + +const REQUIRED_EXTENSIONS = [ + { name: "citext", minVersion: "1.6" }, + { name: "ltree", minVersion: "1.3" }, + { name: "vector", minVersion: "0.8.2" }, + { name: "pg_textsearch", minVersion: "1.1.0" }, +] as const; + +export async function bootstrapEngineDatabase( + sql: SQL, + statementTimeout: string = "20s", + lockTimeout: string = "5s", + transactionTimeout: string = "30s", + idleInTransactionSessionTimeout: string = "30s", + shardId?: number, +): Promise { + const attributes = { + "db.shard": shardId, + "db.statement_timeout": statementTimeout, + "db.lock_timeout": lockTimeout, + "db.transaction_timeout": transactionTimeout, + "db.idle_in_transaction_session_timeout": idleInTransactionSessionTimeout, + "engine_core.required_extensions": REQUIRED_EXTENSIONS.map( + (extension) => `${extension.name}@>=${extension.minVersion}`, + ), + }; + + await span("engine_core.bootstrap", { + attributes, + callback: async () => { + try { + await sql.begin(async (tx) => { + if (shardId !== undefined) { + await tx.unsafe(`set local pgdog.shard to ${String(shardId)}`); + } + await ensurePostgresVersion(tx); + await span("engine_core.bootstrap.acquire_lock", { + callback: () => acquireAdvisoryLock(tx), + }); + await tx`select set_config('statement_timeout', ${statementTimeout}, true)`; + await tx`select set_config('lock_timeout', ${lockTimeout}, true)`; + await tx`select set_config('transaction_timeout', ${transactionTimeout}, true)`; + await tx`select set_config('idle_in_transaction_session_timeout', ${idleInTransactionSessionTimeout}, true)`; + for (const extension of REQUIRED_EXTENSIONS) { + await span("engine_core.bootstrap.ensure_extension", { + attributes: { + "db.extension": extension.name, + "db.extension_min_version": extension.minVersion, + }, + callback: () => + ensureExtension(tx, extension.name, extension.minVersion), + }); + } + /* TODO: remove + await span("engine_core.bootstrap.ensure_roles", { + callback: () => ensureRoles(tx), + }); + */ + }); + info("Engine core bootstrap completed", attributes); + } catch (error) { + reportError("Engine core bootstrap failed", error as Error, attributes); + throw error; + } + }, + }); +} + +const MAX_LOCK_RETRIES = 5; +const BASE_DELAY_MS = 100; +const BOOTSTRAP_LOCK_ID = 1982010637711; + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function acquireAdvisoryLock(tx: SQL): Promise { + let acquired = false; + for (let attempt = 0; attempt < MAX_LOCK_RETRIES; attempt++) { + const [result] = await tx` + select pg_try_advisory_xact_lock(${BOOTSTRAP_LOCK_ID}) as acquired + `; + if (result.acquired) { + acquired = true; + break; + } + if (attempt < MAX_LOCK_RETRIES - 1) { + await sleep(BASE_DELAY_MS * 2 ** attempt); + } + } + + if (!acquired) { + throw new Error(`Failed to acquire advisory lock`); + } +} + +async function ensurePostgresVersion(tx: SQL): Promise { + const [{ server_version_num }] = await tx` + select current_setting('server_version_num')::int as server_version_num + `; + if (server_version_num < 180000) { + throw new Error( + `PostgreSQL version 18 or higher is required (found ${server_version_num})`, + ); + } +} + +async function ensureExtension( + tx: SQL, + name: string, + minVersion: string, +): Promise { + const [installed] = await tx` + select x.extversion, n.nspname + from pg_extension x + inner join pg_namespace n on (x.extnamespace = n.oid) + where x.extname = ${name} + `; + + if (installed) { + if ( + installed.nspname === "public" && + semver.order(installed.extversion, minVersion) >= 0 + ) { + return; + } + throw new Error( + `Extension "${name}" version ${minVersion} or higher is required in the "public" schema (found ${installed.extversion} installed in "${installed.nspname}")`, + ); + } + + const [available] = await tx` + select default_version + from pg_available_extensions + where name = ${name} + `; + + if (!available || semver.order(available.default_version, minVersion) < 0) { + const found = available + ? `found ${available.default_version} available` + : "not available"; + throw new Error( + `Extension "${name}" version ${minVersion} or higher is required (${found})`, + ); + } + + try { + await tx`create extension if not exists ${tx(name)} with schema public`; + } catch (error: unknown) { + // Ignore duplicate extension errors (race condition in concurrent calls) + if ( + error instanceof SQL.PostgresError && + error.errno === "23505" && + error.constraint === "pg_extension_name_index" + ) { + return; + } + throw error; + } +} + +/* TODO: remove this +async function ensureRoles(tx: SQL): Promise { + await tx.unsafe(` + do $block$ + declare + _roles text[] = array['me_ro', 'me_rw', 'me_embed']; + _role text; + _sql text; + begin + for _role in select * from unnest(_roles) loop + perform + from pg_roles r + where r.rolname = _role; + if found then + continue; + end if; + _sql = format($sql$create role %I nologin$sql$, _role); + execute _sql; + _sql = format($sql$grant %I to %I$sql$, _role, current_user); + execute _sql; + end loop; + end; + $block$; + `); +} +*/ diff --git a/packages/engine-core/migrate/idempotent/001_user.sql b/packages/engine-core/migrate/idempotent/001_user.sql new file mode 100644 index 0000000..e69de29 diff --git a/packages/engine-core/migrate/idempotent/002_role_membership.sql b/packages/engine-core/migrate/idempotent/002_role_membership.sql new file mode 100644 index 0000000..e82b78f --- /dev/null +++ b/packages/engine-core/migrate/idempotent/002_role_membership.sql @@ -0,0 +1,242 @@ +------------------------------------------------------------------------------- +-- would_create_cycle +------------------------------------------------------------------------------- +create or replace function {{schema}}.would_create_cycle +( _role_id uuid +, _member_id uuid +) +returns boolean +as $func$ + with recursive ancestors(id) as + ( + select rm.role_id + from {{schema}}.role_membership rm + where rm.member_id = _role_id + union + select rm.role_id + from {{schema}}.role_membership rm + inner join ancestors a on a.id = rm.member_id + ) + select _member_id = _role_id + or exists + ( + select 1 + from ancestors + where id = _member_id + ) +$func$ language sql stable security invoker parallel safe +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- role_membership_before_write trigger +------------------------------------------------------------------------------- +-- Prevent role membership cycles for ordinary writes. +-- Note: this check observes the current transaction snapshot. Concurrent +-- transactions that insert/update related role edges can still race unless the +-- caller uses stronger locking or serializable transactions around +-- role_membership writes. +create or replace function {{schema}}.role_membership_before_write() +returns trigger +as $func$ +begin + if {{schema}}.would_create_cycle(new.role_id, new.member_id) then + raise exception 'role membership would create a cycle: role_id %, member_id %', new.role_id, new.member_id + using errcode = 'integrity_constraint_violation'; + end if; + return new; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +create or replace trigger role_membership_before_write_trg +before insert or update of role_id, member_id on {{schema}}.role_membership +for each row +execute function {{schema}}.role_membership_before_write() +; + +------------------------------------------------------------------------------- +-- calc_role_membership +------------------------------------------------------------------------------- +create or replace function {{schema}}.calc_role_membership(_user_id uuid) +returns table +( role_id uuid +, superuser bool +, dist int4 +) +as $func$ + with recursive ancestors(id, dist) as + ( + select rm.role_id, 1::int4 + from {{schema}}.role_membership rm + where rm.member_id = _user_id + union + select rm.role_id, a.dist + 1 + from {{schema}}.role_membership rm + inner join ancestors a on a.id = rm.member_id + ) + select + u.id + , u.superuser + , 0::int4 + from {{schema}}."user" u + where u.id = _user_id + union + select + u.id + , u.superuser + , a.dist + from {{schema}}."user" u + inner join ancestors a on (u.id = a.id) +$func$ language sql stable security invoker +; + +------------------------------------------------------------------------------- +-- is_superuser +------------------------------------------------------------------------------- +create or replace function {{schema}}.is_superuser(_user_id uuid) +returns boolean +as $func$ + select exists + ( + select 1 + from {{schema}}.calc_role_membership(_user_id) x + where x.superuser + ) +$func$ language sql stable security invoker +; + +------------------------------------------------------------------------------- +-- list_direct_role_members +------------------------------------------------------------------------------- +create or replace function {{schema}}.list_direct_role_members +( _requestor_id uuid +, _role_id uuid +) +returns table +( member_id uuid +, admin bool +) +as $func$ + select + r.member_id + , r.admin + from {{schema}}.role_membership r + where r.role_id = _role_id + and + ( + -- requestor must be an admin on role + -- or requestor must be a superuser + exists + ( + select 1 + from {{schema}}.role_membership m + where m.role_id = _role_id + and m.member_id = _requestor_id + and m.admin + ) + or {{schema}}.is_superuser(_requestor_id) + ) +$func$ language sql stable security invoker +; + +------------------------------------------------------------------------------- +-- grant_role_membership +------------------------------------------------------------------------------- +create or replace function {{schema}}.grant_role_membership +( _grantor_id uuid +, _role_id uuid +, _member_id uuid +, _admin bool default false +) +returns void +as $func$ +declare + _allowed bool; +begin + -- exclusive write access required to fully ensure against cycle creation by concurrent writes + lock table {{schema}}.role_membership in share row exclusive mode; + + -- is grantor allowed to do this? + select + exists + ( + -- does the grantor have with admin privilege directly on this role? + select 1 + from {{schema}}.role_membership rm + where rm.role_id = _role_id + and rm.member_id = _grantor_id + and rm.admin + ) + or {{schema}}.is_superuser(_grantor_id) -- or are they a superuser (even indirectly)? + into strict _allowed + ; + + if not _allowed then + raise exception 'grantor must be a superuser or have with admin option on role: grantor_id % role_id %', _grantor_id, _role_id + using errcode = 'insufficient_privilege'; + end if; + + -- role_membership_before_write_trg protects against cycles in the graph + insert into {{schema}}.role_membership + ( role_id + , member_id + , admin + ) + values + ( _role_id + , _member_id + , _admin + ) + on conflict (member_id, role_id) + do update set admin = _admin + ; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, pg_temp +; + +------------------------------------------------------------------------------- +-- revoke_role_membership +------------------------------------------------------------------------------- +create or replace function {{schema}}.revoke_role_membership +( _revoker_id uuid +, _role_id uuid +, _member_id uuid +) +returns void +as $func$ +declare + _allowed bool; +begin + lock table {{schema}}.role_membership in share row exclusive mode; + + -- is revoker allowed to do this? + select + exists + ( + -- does the revoker have with admin privilege directly on this role? + select 1 + from {{schema}}.role_membership rm + where rm.role_id = _role_id + and rm.member_id = _revoker_id + and rm.admin + ) + or {{schema}}.is_superuser(_revoker_id) -- or are they a superuser (even indirectly)? + into strict _allowed + ; + + if not _allowed then + raise exception 'revoker must be a superuser or have with admin option on role: revoker_id % role_id %', _revoker_id, _role_id + using errcode = 'insufficient_privilege'; + end if; + + delete from {{schema}}.role_membership d + where d.role_id = _role_id + and d.member_id = _member_id + ; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, pg_temp +; diff --git a/packages/engine-core/migrate/idempotent/003_tree_access.sql b/packages/engine-core/migrate/idempotent/003_tree_access.sql new file mode 100644 index 0000000..6180d82 --- /dev/null +++ b/packages/engine-core/migrate/idempotent/003_tree_access.sql @@ -0,0 +1,156 @@ +------------------------------------------------------------------------------- +-- calc_tree_access +------------------------------------------------------------------------------- +create or replace function {{schema}}.calc_tree_access(_user_id uuid) +returns table +( role_id uuid +, tree_path ltree +, access int2 +) +as $func$ + with r as + ( + -- the user and the roles they belong to + select + x.role_id + , x.superuser + from {{schema}}.calc_role_membership(_user_id) x + ) + -- superuser + select + r.role_id + , ''::ltree as tree_path + , 3::int2 /* owner */ as access + from r + where r.superuser + union all + -- grants + select + r.role_id + , a.tree_path + , a.access::int2 + from r + inner join {{schema}}.tree_access a on (r.role_id = a.user_id) +$func$ language sql stable security invoker +; + +------------------------------------------------------------------------------- +-- has_tree_access +------------------------------------------------------------------------------- +create or replace function {{schema}}.has_tree_access +( _user_id uuid +, _tree_path ltree +, _access int4 +) +returns bool +as $func$ + select exists + ( + select 1 + from {{schema}}.calc_tree_access(_user_id) x + where x.tree_path @> _tree_path + and x.access >= _access + and _access in (1, 2, 3) + ) +$func$ language sql stable security invoker +; + +------------------------------------------------------------------------------- +-- set_tree_access +------------------------------------------------------------------------------- +create or replace function {{schema}}.set_tree_access +( _grantor_id uuid +, _tree_path ltree +, _user_id uuid +, _access int4 +) +returns bool +as $func$ +begin + -- grantor must be superuser or owner of tree + if not {{schema}}.has_tree_access(_grantor_id, _tree_path, 3) then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + insert into {{schema}}.tree_access + ( user_id + , tree_path + , access + ) + values + ( _user_id + , _tree_path + , _access::int2 + ) + on conflict (user_id, tree_path) do update + set access = _access::int2 + ; + + return found; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- remove_all_tree_access +------------------------------------------------------------------------------- +create or replace function {{schema}}.remove_all_tree_access +( _grantor_id uuid +, _tree_path ltree +, _user_id uuid +) +returns bool +as $func$ +begin + -- grantor must be superuser or owner of tree + if not {{schema}}.has_tree_access(_grantor_id, _tree_path, 3) then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + delete from {{schema}}.tree_access + where user_id = _user_id + and _tree_path @> tree_path + ; + + return found; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- list_tree_access +------------------------------------------------------------------------------- +create or replace function {{schema}}.list_tree_access +( _requestor_id uuid +, _tree_path ltree +) +returns table +( tree_path ltree +, user_id uuid +, access int2 +) +as $func$ +begin + -- grantor must be superuser or owner of tree + if not {{schema}}.has_tree_access(_requestor_id, _tree_path, 3) then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + return query + select + a.tree_path + , a.user_id + , a.access + from {{schema}}.tree_access a + where _tree_path @> a.tree_path + order by a.tree_path, a.user_id + ; +end; +$func$ language plpgsql stable security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; diff --git a/packages/engine-core/migrate/idempotent/004_memory.sql b/packages/engine-core/migrate/idempotent/004_memory.sql new file mode 100644 index 0000000..6de72b3 --- /dev/null +++ b/packages/engine-core/migrate/idempotent/004_memory.sql @@ -0,0 +1,581 @@ +------------------------------------------------------------------------------- +-- memory triggers +------------------------------------------------------------------------------- +create or replace function {{schema}}.memory_before_update() +returns trigger +as $func$ +begin + -- always update the timestamp + new.updated_at = pg_catalog.now(); + + -- content changed -> new embedding needs to be generated + if old.content is distinct from new.content + and old.embedding is not distinct from new.embedding + then + new.embedding = null; + new.embedding_version = old.embedding_version operator(pg_catalog.+) 1; + end if; + + return new; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp -- public required for pgvector's `is not distinct from` +; + +create or replace trigger memory_before_update_trg +before update on {{schema}}.memory +for each row +execute function {{schema}}.memory_before_update(); + +------------------------------------------------------------------------------- +-- get memory +------------------------------------------------------------------------------- +create or replace function {{schema}}.get_memory +( _user_id uuid +, _id uuid default null +) +returns table +( id uuid +, tree ltree +, meta jsonb +, temporal tstzrange +, content text +, created_at timestamptz +, updated_at timestamptz +, has_embedding bool +) +as $func$ + select + m.id + , m.tree + , m.meta + , m.temporal + , m.content + , m.created_at + , m.updated_at + , m.embedding is not null + from {{schema}}.memory m + where m.id = _id + and {{schema}}.has_tree_access(_user_id, m.tree, 1) +$func$ language sql stable security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- create memory +------------------------------------------------------------------------------- +create or replace function {{schema}}.create_memory +( _user_id uuid +, _tree ltree +, _content text +, _id uuid default null +, _meta jsonb default '{}' +, _temporal tstzrange default null +) +returns uuid +as $func$ +begin + if not {{schema}}.has_tree_access(_user_id, _tree, 2) then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + insert into {{schema}}.memory + ( id + , tree + , meta + , temporal + , content + ) + values + ( coalesce(_id, uuidv7()) + , _tree + , coalesce(_meta, '{}'::jsonb) + , _temporal + , _content + ) + returning id into strict _id + ; + return _id; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- patch memory +------------------------------------------------------------------------------- +create or replace function {{schema}}.patch_memory +( _user_id uuid +, _id uuid +, _patch jsonb +) +returns bool +as $func$ +declare + _src ltree; + _dst ltree; + _ok bool; +begin + -- at least one valid field must be present + select count(*) filter (where k in ('meta', 'tree', 'temporal', 'content')) > 0 + into strict _ok + from jsonb_each(_patch) o(k, v) + ; + + if not _ok then + raise exception 'no valid patch fields found' + using errcode = 'invalid_parameter_value'; + end if; + + _dst = (_patch->>'tree')::ltree; + + -- cannot set tree to null + if _patch ? 'tree' and _dst is null then + raise exception 'tree cannot be set to null' + using errcode = 'invalid_parameter_value'; + end if; + + -- find the existing memory and get it's tree + select m.tree into _src + from {{schema}}.memory m + where m.id = _id + for update -- don't let anyone "move" the memory while we're working on it + ; + + if not found then + return false; + end if; + + with a as materialized + ( + select a.tree_path, a.access + from {{schema}}.calc_tree_access(_user_id) a + ) + select + exists + ( + select 1 + from a + where a.tree_path @> _src + and a.access >= 2 + ) + and + ( + _dst is null + or _src @> _dst + or exists + ( + select 1 + from a + where a.tree_path @> _dst + and a.access >= 2 + ) + ) + into strict _ok + ; + + if not _ok then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + update {{schema}}.memory m set + tree = case when _patch ? 'tree' then (_patch->>'tree')::ltree else m.tree end + , meta = case when _patch ? 'meta' then _patch->'meta' else m.meta end + , temporal = case when _patch ? 'temporal' then (_patch->>'temporal')::tstzrange else m.temporal end + , content = case when _patch ? 'content' then _patch->>'content' else m.content end + where id = _id + returning id into _id + ; + + return _id is not null; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- move tree +------------------------------------------------------------------------------- +create or replace function {{schema}}.move_tree +( _user_id uuid +, _src ltree +, _dst ltree +, _dry_run bool default false +) +returns bigint +as $func$ +declare + _has_src bool; + _has_dst bool; + _moved bigint; +begin + -- must have read/write on _src + -- must have read/write on _dst + with a as materialized + ( + select a.tree_path, a.access + from {{schema}}.calc_tree_access(_user_id) a + ) + select + exists + ( + select 1 + from a + where a.tree_path @> _src + and a.access >= 2 + ) + , exists + ( + select 1 + from a + where a.tree_path @> _dst + and a.access >= 2 + ) + into strict _has_src, _has_dst + ; + + if not _has_src then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + if not _has_dst then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + with x as + ( + select m.id + from {{schema}}.memory m + where _src @> m.tree + ) + , u as + ( + update {{schema}}.memory m + set tree = + case + when nlevel(m.tree) = nlevel(_src) then _dst + else _dst || subpath(m.tree, nlevel(_src), nlevel(m.tree) - nlevel(_src)) + end + from x + where m.id = x.id + and not _dry_run + ) + select count(*) into strict _moved + from x + ; + return _moved; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- copy tree +------------------------------------------------------------------------------- +create or replace function {{schema}}.copy_tree +( _user_id uuid +, _src ltree +, _dst ltree +, _dry_run bool default false +) +returns bigint +as $func$ +declare + _has_src bool; + _has_dst bool; + _copied bigint; +begin + -- must have read on _src + -- must have read/write on _dst + with a as materialized + ( + select a.tree_path, a.access + from {{schema}}.calc_tree_access(_user_id) a + ) + select + exists + ( + select 1 + from a + where a.tree_path @> _src + and a.access >= 1 + ) + , exists + ( + select 1 + from a + where a.tree_path @> _dst + and a.access >= 2 + ) + into strict _has_src, _has_dst + ; + + if not _has_src then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + if not _has_dst then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + with m as + ( + select m.* + from {{schema}}.memory m + where _src @> m.tree + ) + , i as + ( + insert into {{schema}}.memory + ( meta + , tree + , temporal + , content + , embedding + , embedding_version + ) + select + m.meta + , case + when nlevel(m.tree) = nlevel(_src) then _dst + else _dst || subpath(m.tree, nlevel(_src), nlevel(m.tree) - nlevel(_src)) + end as dst + , m.temporal + , m.content + , m.embedding + , m.embedding_version + from m + where not _dry_run + ) + select count(*) into strict _copied + from m + ; + + return _copied; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- delete memory +------------------------------------------------------------------------------- +create or replace function {{schema}}.delete_memory +( _user_id uuid +, _id uuid +) +returns bool +as $func$ +declare + _tree ltree; +begin + select m.tree into _tree + from {{schema}}.memory m + where m.id = _id + for update + ; + + if not found then + return false; + end if; + + if not {{schema}}.has_tree_access(_user_id, _tree, 2) then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + delete from {{schema}}.memory + where id = _id + ; + return found; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- delete tree +------------------------------------------------------------------------------- +create or replace function {{schema}}.delete_tree +( _user_id uuid +, _tree ltree +, _dry_run bool default false +) +returns bigint +as $func$ +declare + _has_access bool; + _deleted bigint; +begin + -- must have read/write on _tree + select exists + ( + select 1 + from {{schema}}.calc_tree_access(_user_id) a + where a.tree_path @> _tree + and a.access >= 2 + ) + into strict _has_access + ; + + if not _has_access then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + if _dry_run then + select count(*) into strict _deleted + from {{schema}}.memory m + where _tree @> m.tree + ; + else + with d as + ( + delete from {{schema}}.memory m + where _tree @> m.tree + returning id + ) + select count(*) into strict _deleted + from d + ; + end if; + + return _deleted; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- count tree +------------------------------------------------------------------------------- +create or replace function {{schema}}.count_tree +( _user_id uuid +, _tree ltree +, _access int4 +) +returns bigint +as $func$ + with x as materialized + ( + select a.tree_path + from {{schema}}.calc_tree_access(_user_id) a + where a.access >= _access + ) + select count(*) + from {{schema}}.memory m + where _tree @> m.tree + and exists + ( + select 1 + from x + where x.tree_path @> m.tree + ) +$func$ language sql stable security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- count tree +------------------------------------------------------------------------------- +create or replace function {{schema}}.count_tree +( _user_id uuid +, _query lquery +, _access int4 +) +returns bigint +as $func$ + with x as materialized + ( + select a.tree_path + from {{schema}}.calc_tree_access(_user_id) a + where a.access >= _access + ) + select count(*) + from {{schema}}.memory m + where m.tree ~ _query + and exists + ( + select 1 + from x + where x.tree_path @> m.tree + ) +$func$ language sql stable security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- count tree +------------------------------------------------------------------------------- +create or replace function {{schema}}.count_tree +( _user_id uuid +, _query ltxtquery +, _access int4 +) +returns bigint +as $func$ + with x as materialized + ( + select a.tree_path + from {{schema}}.calc_tree_access(_user_id) a + where a.access >= _access + ) + select count(*) + from {{schema}}.memory m + where m.tree @ _query + and exists + ( + select 1 + from x + where x.tree_path @> m.tree + ) +$func$ language sql stable security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- list tree +------------------------------------------------------------------------------- +create or replace function {{schema}}.list_tree +( _user_id uuid +, _query lquery +) +returns table +( tree ltree +, count bigint +) +as $func$ + with a as materialized + ( + select a.tree_path + from {{schema}}.calc_tree_access(_user_id) a + where a.access >= 1 + ) + , m as + ( + select distinct m.id, m.tree + from {{schema}}.memory m + where m.tree ~ _query + and exists + ( + select 1 + from a + where a.tree_path @> m.tree + ) + ) + select + subltree(m.tree, 0, i) as tree + , count(m.id) as count + from m + cross join lateral generate_series(1, nlevel(m.tree)) i + group by 1 + order by 1 +$func$ language sql stable security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; diff --git a/packages/engine-core/migrate/idempotent/005_search.sql b/packages/engine-core/migrate/idempotent/005_search.sql new file mode 100644 index 0000000..2790f70 --- /dev/null +++ b/packages/engine-core/migrate/idempotent/005_search.sql @@ -0,0 +1,329 @@ +------------------------------------------------------------------------------- +-- search_memory +------------------------------------------------------------------------------- +create or replace function {{schema}}.search_memory +( _user_id uuid +, _bm25 bm25query default null +, _vec halfvec({{embedding_dimensions}}) default null +, _max_vec_dist float8 default null +, _ltree ltree default null +, _lquery lquery default null +, _ltxtquery ltxtquery default null +, _meta_contains jsonb default null +, _temporal_within tstzrange default null +, _temporal_overlaps tstzrange default null +, _temporal_before timestamptz default null +, _temporal_after timestamptz default null +, _regexp text default null +, _limit bigint default 10 +) +returns table +( id uuid +, meta jsonb +, tree ltree +, temporal tstzrange +, content text +, has_embedding bool +, created_at timestamptz +, updated_at timestamptz +, score float8 +) +as $func$ +declare + _filter_count int = 0; + _score text; + _filters text[] = '{}'::text; + _order_by text; + _sql text; +begin + -- _bm25 OR _vec but NOT BOTH + if _bm25 is not null and _vec is not null then + raise exception 'providing both _bm25 and _vec is not supported' + using errcode = 'invalid_parameter_value'; + end if; + + if _max_vec_dist is not null and _vec is null then + raise exception '_max_vec_dist provided but _vec was not provided' + using errcode = 'invalid_parameter_value'; + end if; + + -- min 1, max 1000, default 10 + _limit = greatest(least(coalesce(_limit, 10), 1000), 1); + + -- bm25 or semantic + -- score and order by + case + when _bm25 is not null then + _filter_count = _filter_count + 1; + -- <@> is negative bm25 score. smaller values means better match. order by this for index scans + -- negative score * -1 = score. higher score means better match + _score = format($sql$, (m.content <@> %L::bm25query) * -1 as score$sql$, _bm25); + _order_by = format($sql$order by m.content <@> %L::bm25query, m.id$sql$, _bm25); + when _vec is not null then + _filter_count = _filter_count + 1; + -- <=> is cosine distance. smaller distance means better match. order by this for index scans + -- distance * -1 = "score". higher score means better match + _score = format($sql$, (m.embedding <=> %L::halfvec({{embedding_dimensions}})) * -1 as score$sql$, _vec); + _order_by = format($sql$order by m.embedding <=> %L::halfvec({{embedding_dimensions}}), m.id$sql$, _vec); + _filters = array_append + ( _filters + , $sql$and m.embedding is not null$sql$ + ); + if _max_vec_dist is not null then + _filter_count = _filter_count + 1; + _filters = array_append + ( _filters + , format($sql$and (m.embedding <=> %L::halfvec({{embedding_dimensions}})) <= %L::float8$sql$, _vec, _max_vec_dist) + ); + end if; + else + _score = $sql$, -1 as score$sql$; + _order_by = $sql$order by m.id; + end case; + + -- ltree + if _ltree is not null then + _filter_count = _filter_count + 1; + _filters = array_append + ( _filters + , format($sql$and %L::ltree @> m.tree$sql$, _ltree) + ); + end if; + + -- lquery + if _lquery is not null then + _filter_count = _filter_count + 1; + _filters = array_append + ( _filters + , format($sql$and m.tree ~ %L::lquery$sql$, _lquery) + ); + end if; + + -- ltxtquery + if _ltxtquery is not null then + _filter_count = _filter_count + 1; + _filters = array_append + ( _filters + , format($sql$and m.tree @ %L::ltxtquery$sql$, _ltxtquery) + ); + end if; + + -- meta_contains + if _meta_contains is not null then + _filter_count = _filter_count + 1; + _filters = array_append + ( _filters + , format($sql$and m.meta @> %L::jsonb$sql$, _meta_contains) + ); + end if; + + -- temporal_within + if _temporal_within is not null then + _filter_count = _filter_count + 1; + _filters = array_append + ( _filters + , format($sql$and %L::tstzrange @> m.temporal$sql$, _temporal_within) + ); + end if; + + -- temporal_overlaps + if _temporal_overlaps is not null then + _filter_count = _filter_count + 1; + _filters = array_append + ( _filters + , format($sql$and %L::tstzrange && m.temporal$sql$, _temporal_overlaps) + ); + end if; + + -- temporal_before + if _temporal_before is not null then + _filter_count = _filter_count + 1; + _filters = array_append + ( _filters + , format($sql$and tstzrange('-infinity'::timestamptz, %L::timestamptz, '[]') @> m.temporal$sql$, _temporal_before) + ); + end if; + + -- temporal_after + if _temporal_after is not null then + _filter_count = _filter_count + 1; + _filters = array_append + ( _filters + , format($sql$and tstzrange(%L::timestamptz, 'infinity'::timestamptz, '[]') @> m.temporal$sql$, _temporal_after) + ); + end if; + + -- regexp + if _regexp is not null then + if _filter_count = 0 then + raise exception 'regexp must not be the only filter criteria' + using errcode = 'invalid_parameter_value'; + end if; + _filters = array_append + ( _filters + , format($sql$and m.content ~* %L::text$sql$, _regexp) + ); + end if; + + -- construct the query + _sql = format( + $sql$ + with x as materialized + ( + select a.tree_path + from {{schema}}.calc_tree_access($1) a + where a.access >= 1 + ) + select + m.id + , m.meta + , m.tree + , m.temporal + , m.content + , m.embedding is not null + , m.created_at + , m.updated_at + %s + from {{schema}}.memory m + where exists + ( + select 1 + from x + where x.tree_path @> m.tree + ) + %s + %s + limit $2 + $sql$ + , _score + , coalesce + (( + select string_agg(x, E'\n ') + from unnest(_filters) x + ), '') + , _order_by + ); + + return query execute _sql using _user_id, _limit; +end; +$func$ language plpgsql stable security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- hybrid_search_memory +------------------------------------------------------------------------------- +create or replace function {{schema}}.hybrid_search_memory +( _user_id uuid +, _bm25 bm25query +, _vec halfvec({{embedding_dimensions}}) +, _max_vec_dist float8 default null +, _ltree ltree default null +, _lquery lquery default null +, _ltxtquery ltxtquery default null +, _meta_contains jsonb default null +, _temporal_within tstzrange default null +, _temporal_overlaps tstzrange default null +, _temporal_before timestamptz default null +, _temporal_after timestamptz default null +, _regexp text default null +, _k float8 default 60.0 +, _candidate_limit bigint default 30 +, _fulltext_weight float8 default 1.0 +, _semantic_weight float8 default 1.0 +, _limit bigint default 10 +) +returns table +( id uuid +, meta jsonb +, tree ltree +, temporal tstzrange +, content text +, has_embedding bool +, created_at timestamptz +, updated_at timestamptz +, score float8 +) +as $func$ +declare +begin + if _bm25 is null then + raise exception '_bm25 must not be null' + using errcode = 'invalid_parameter_value'; + end if; + + if _vec is null then + raise exception '_vec must not be null' + using errcode = 'invalid_parameter_value'; + end if; + + _k = greatest(coalesce(_k, 60.0), 0.0); + _limit = greatest(least(coalesce(_limit, 10), 1000), 1); + _candidate_limit = greatest + ( least(coalesce(_candidate_limit, 30), 1000) + , _limit + ); + _fulltext_weight = greatest(least(coalesce(_fulltext_weight, 1.0), 1.0), 0.0); + _semantic_weight = greatest(least(coalesce(_semantic_weight, 1.0), 1.0), 0.0); + + -- reciprocal rank fusion + return query + select + coalesce(x1.id, x2.id) as id + , coalesce(x1.meta, x2.meta) as meta + , coalesce(x1.tree, x2.tree) as tree + , coalesce(x1.temporal, x2.temporal) as temporal + , coalesce(x1.content, x2.content) as content + , coalesce(x1.has_embedding, x2.has_embedding) as has_embedding + , coalesce(x1.created_at, x2.created_at) as created_at + , coalesce(x1.updated_at, x2.updated_at) as updated_at + , coalesce(_fulltext_weight / (_k + x1.rank), 0.0) + + coalesce(_semantic_weight / (_k + x2.rank), 0.0) as score + from + ( + select + row_number() over (order by m.score desc, m.id) as rank + , m.* + from {{schema}}.search_memory + ( _user_id => _user_id + , _bm25 => _bm25 + , _ltree => _ltree + , _lquery => _lquery + , _ltxtquery => _ltxtquery + , _meta_contains => _meta_contains + , _temporal_within => _temporal_within + , _temporal_overlaps => _temporal_overlaps + , _temporal_before => _temporal_before + , _temporal_after => _temporal_after + , _regexp => _regexp + , _limit => _candidate_limit + ) m + ) x1 + full outer join + ( + select + row_number() over (order by m.score desc, m.id) as rank + , m.* + from {{schema}}.search_memory + ( _user_id => _user_id + , _vec => _vec + , _max_vec_dist => _max_vec_dist + , _ltree => _ltree + , _lquery => _lquery + , _ltxtquery => _ltxtquery + , _meta_contains => _meta_contains + , _temporal_within => _temporal_within + , _temporal_overlaps => _temporal_overlaps + , _temporal_before => _temporal_before + , _temporal_after => _temporal_after + , _regexp => _regexp + , _limit => _candidate_limit + ) m + ) x2 on (x1.id = x2.id) + order by score desc, id + limit _limit + ; +end; +$func$ language plpgsql stable security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; diff --git a/packages/engine-core/migrate/idempotent/006_embedding_queue.sql b/packages/engine-core/migrate/idempotent/006_embedding_queue.sql new file mode 100644 index 0000000..52f9cb5 --- /dev/null +++ b/packages/engine-core/migrate/idempotent/006_embedding_queue.sql @@ -0,0 +1,163 @@ + +------------------------------------------------------------------------------- +-- enqueue_embedding +------------------------------------------------------------------------------- +create or replace function {{schema}}.enqueue_embedding() +returns trigger +as $func$ +begin + insert into {{schema}}.embedding_queue (memory_id, embedding_version) + values (new.id, new.embedding_version); + return new; +end; +$func$ +language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, pg_temp +; + +create or replace trigger memory_enqueue_embedding_insert +after insert on {{schema}}.memory +for each row +when (new.embedding is null) -- it's possible to insert with an embedding +execute function {{schema}}.enqueue_embedding() +; + +create or replace trigger memory_enqueue_embedding_update +after update on {{schema}}.memory +for each row +when +( old.content is distinct from new.content + and new.embedding is null +) +execute function {{schema}}.enqueue_embedding() +; + +------------------------------------------------------------------------------- +-- claim_embedding_batch +------------------------------------------------------------------------------- +create or replace function {{schema}}.claim_embedding_batch +( _batch_size int default 10 +, _lock_duration interval default '5 minutes' +, _max_attempts int default 3 +) +returns table +( queue_id bigint +, memory_id uuid +, embedding_version int +, content text +) +as $func$ +declare + _rec record; + _mem record; + _claimed_count int = 0; +begin + -- bulk-cancel visible queue rows superseded by a newer row for the same memory + update {{schema}}.embedding_queue eq + set outcome = 'cancelled' + where eq.outcome is null + and eq.vt <= now() + and exists + ( + select 1 + from {{schema}}.embedding_queue newer + where newer.memory_id = eq.memory_id + and newer.embedding_version > eq.embedding_version + and newer.outcome is null + ); + + -- sweep: finalize exhausted rows orphaned by worker crash + -- (attempts reached max but outcome was never written back) + update {{schema}}.embedding_queue + set + outcome = 'failed' + , last_error = coalesce(last_error, 'exceeded max attempts') + where outcome is null + and vt <= now() + and attempts >= _max_attempts + ; + + for _rec in + ( + select + eq.id + , eq.memory_id + , eq.embedding_version + from {{schema}}.embedding_queue eq + where eq.outcome is null + and eq.vt <= now() + and eq.attempts < _max_attempts + order by eq.vt + for update skip locked + ) + loop + -- check memory still exists + current version + select m.content, m.embedding_version + into _mem + from {{schema}}.memory m + where m.id = _rec.memory_id + ; + + if not found or _mem.content is null then + -- memory deleted or empty → cancel queue row + update {{schema}}.embedding_queue + set outcome = 'cancelled' + where id = _rec.id; + continue; + end if; + + if _rec.embedding_version != _mem.embedding_version then + -- stale version → cancel + update {{schema}}.embedding_queue + set outcome = 'cancelled' + where id = _rec.id; + continue; + end if; + + -- claim this row + update {{schema}}.embedding_queue q set + vt = now() + _lock_duration + , attempts = q.attempts + 1 + where id = _rec.id; + + queue_id = _rec.id; + memory_id = _rec.memory_id; + embedding_version = _rec.embedding_version; + content = _mem.content; + return next; + + _claimed_count = _claimed_count + 1; + exit when _claimed_count >= _batch_size; + end loop; +end; +$func$ +language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, pg_temp +; + +------------------------------------------------------------------------------- +-- prune embedding queue +------------------------------------------------------------------------------- +-- prune terminal queue rows older than the retention window. +-- runs opportunistically from the worker on engines that returned no +-- claimable work, so the queue table doesn't grow unbounded. +-- +-- relies on embedding_queue_archive_idx (created_at) where outcome is not null +-- from migration 005, so the no-op case is cheap. +create or replace function {{schema}}.prune_embedding_queue(_retention interval default '7 days') +returns bigint +as $func$ +declare + pruned bigint; +begin + delete from {{schema}}.embedding_queue + where outcome is not null + and created_at < now() - _retention + ; + get diagnostics pruned = row_count; + return pruned; +end; +$func$ +language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, pg_temp +; diff --git a/packages/engine-core/migrate/idempotent/sql.d.ts b/packages/engine-core/migrate/idempotent/sql.d.ts new file mode 100644 index 0000000..89b092e --- /dev/null +++ b/packages/engine-core/migrate/idempotent/sql.d.ts @@ -0,0 +1,4 @@ +declare module "*.sql" { + const content: string; + export default content; +} diff --git a/packages/engine-core/migrate/incremental/000_provision.sql b/packages/engine-core/migrate/incremental/000_provision.sql new file mode 100644 index 0000000..e98b9d9 --- /dev/null +++ b/packages/engine-core/migrate/incremental/000_provision.sql @@ -0,0 +1,15 @@ +create schema {{schema}}; + +create table {{schema}}.version +( version text not null +, at timestamptz not null default now() +); + +create unique index version_singleton_idx on {{schema}}.version ((true)); -- only ONE row allowed +insert into {{schema}}.version (version) values ('0.0.0'); + +create table {{schema}}.migration +( name text not null constraint migration_pkey primary key +, applied_at_version text not null +, applied_at timestamptz not null default pg_catalog.clock_timestamp() +); diff --git a/packages/engine-core/migrate/incremental/001_user.sql b/packages/engine-core/migrate/incremental/001_user.sql new file mode 100644 index 0000000..f93567d --- /dev/null +++ b/packages/engine-core/migrate/incremental/001_user.sql @@ -0,0 +1,12 @@ + +------------------------------------------------------------------------------- +-- users +------------------------------------------------------------------------------- +-- note: "user" is a reserved word, must be quoted +create table {{schema}}."user" +( id uuid primary key default uuidv7() check (uuid_extract_version(id) = 7) +, name citext not null unique +, superuser boolean not null default false +--, type text not null check (type in ('user', 'role', 'agent')) +, created_at timestamptz not null default now() +); diff --git a/packages/engine-core/migrate/incremental/002_role_membership.sql b/packages/engine-core/migrate/incremental/002_role_membership.sql new file mode 100644 index 0000000..5968a99 --- /dev/null +++ b/packages/engine-core/migrate/incremental/002_role_membership.sql @@ -0,0 +1,13 @@ +------------------------------------------------------------------------------- +-- role membership +------------------------------------------------------------------------------- +create table {{schema}}.role_membership +( role_id uuid not null references {{schema}}."user"(id) on delete cascade +, member_id uuid not null references {{schema}}."user"(id) on delete cascade +, admin boolean not null default false +, created_at timestamptz not null default now() +, constraint pkey_role_membership primary key (member_id, role_id) +, constraint no_self_membership check (role_id != member_id) +); + +create index idx_role_membership_role on {{schema}}.role_membership(role_id) include (member_id); diff --git a/packages/engine-core/migrate/incremental/003_tree_access.sql b/packages/engine-core/migrate/incremental/003_tree_access.sql new file mode 100644 index 0000000..345c1d3 --- /dev/null +++ b/packages/engine-core/migrate/incremental/003_tree_access.sql @@ -0,0 +1,12 @@ +------------------------------------------------------------------------------- +-- tree access +------------------------------------------------------------------------------- +create table {{schema}}.tree_access +( user_id uuid not null references {{schema}}."user"(id) on delete cascade +, tree_path ltree not null +, access int2 not null check (access in (1, 2, 3)) -- read, read/write, owner +, created_at timestamptz not null default now() +, constraint pkey_tree_access primary key (user_id, tree_path) +); + +create index idx_tree_access_path on {{schema}}.tree_access using gist (tree_path); diff --git a/packages/engine-core/migrate/incremental/004_memory.sql b/packages/engine-core/migrate/incremental/004_memory.sql new file mode 100644 index 0000000..c66eea7 --- /dev/null +++ b/packages/engine-core/migrate/incremental/004_memory.sql @@ -0,0 +1,48 @@ +------------------------------------------------------------------------------- +-- memory +------------------------------------------------------------------------------- +create table {{schema}}.memory +( id uuid not null primary key default uuidv7() check (uuid_extract_version(id) = 7) +, meta jsonb not null default '{}' check (jsonb_typeof(meta) = 'object') +, tree ltree not null default ''::ltree +, temporal tstzrange +, content text not null +, embedding halfvec({{embedding_dimensions}}) +, embedding_version int4 not null default 1 +, created_at timestamptz not null default now() +, updated_at timestamptz +); + +-- index for faceted search +create index memory_meta_gin_idx on {{schema}}.memory using gin (meta); + +-- index for temporal search +create index memory_temporal_gist_idx on {{schema}}.memory using gist (temporal) where (temporal is not null); + +-- index for BM25 text search +create index memory_content_bm25_idx on {{schema}}.memory using bm25 (content) +with (text_config = {{bm25_text_config}}, k1 = {{bm25_k1}}, b = {{bm25_b}}); + +-- index for vector similarity search +create index memory_embedding_hnsw_idx on {{schema}}.memory using hnsw (embedding halfvec_cosine_ops) +with (m = {{hnsw_m}}, ef_construction = {{hnsw_ef_construction}}); + +-- index for hierarchical organization +create index memory_tree_gist_idx on {{schema}}.memory using gist (tree); + +/* +enforce consistent temporal range conventions: +- point-in-time events: lower = upper with inclusive bounds '[same,same]' +- time periods: lower < upper with inclusive-exclusive bounds '[start,end)' +*/ +alter table {{schema}}.memory add constraint temporal_bounds_convention check +( + temporal is null + or ( + -- point-in-time: both bounds equal and inclusive + (lower(temporal) = upper(temporal) and lower_inc(temporal) and upper_inc(temporal)) + or + -- time range: start before end, inclusive-exclusive + (lower(temporal) < upper(temporal) and lower_inc(temporal) and not upper_inc(temporal)) + ) +); diff --git a/packages/engine-core/migrate/incremental/005_embedding_queue.sql b/packages/engine-core/migrate/incremental/005_embedding_queue.sql new file mode 100644 index 0000000..dce748a --- /dev/null +++ b/packages/engine-core/migrate/incremental/005_embedding_queue.sql @@ -0,0 +1,21 @@ +------------------------------------------------------------------------------- +-- embedding queue +------------------------------------------------------------------------------- +-- per-engine embedding queue table +create table {{schema}}.embedding_queue +( id bigint generated always as identity primary key +, memory_id uuid not null references {{schema}}.memory(id) on delete cascade +, embedding_version int not null +, vt timestamptz not null default now() +, outcome text check (outcome is null or outcome in ('completed', 'failed', 'cancelled')) +, attempts int not null default 0 +, last_error text +, created_at timestamptz not null default now() +); + +-- index to find items to claim +create index embedding_queue_claim_idx on {{schema}}.embedding_queue (vt) where outcome is null; +-- index also used in finding items to claim. used to ensure there aren't any items for the same memory with a newer version +create index embedding_queue_memory_idx on {{schema}}.embedding_queue (memory_id, embedding_version desc) where outcome is null; +-- index to find items that have resolved to an outcome. these can be pruned +create index embedding_queue_archive_idx on {{schema}}.embedding_queue (created_at) where outcome is not null; diff --git a/packages/engine-core/migrate/incremental/sql.d.ts b/packages/engine-core/migrate/incremental/sql.d.ts new file mode 100644 index 0000000..89b092e --- /dev/null +++ b/packages/engine-core/migrate/incremental/sql.d.ts @@ -0,0 +1,4 @@ +declare module "*.sql" { + const content: string; + export default content; +} diff --git a/packages/engine-core/migrate/migrate.integration.test.ts b/packages/engine-core/migrate/migrate.integration.test.ts new file mode 100644 index 0000000..9b38a2f --- /dev/null +++ b/packages/engine-core/migrate/migrate.integration.test.ts @@ -0,0 +1,524 @@ +import { afterAll, beforeAll, describe, expect, test } from "bun:test"; +import { SQL } from "bun"; +import { bootstrapEngineDatabase } from "./bootstrap"; +import { migrateEngine } from "./migrate"; + +const adminUrl = + process.env.ENGINE_CORE_TEST_DATABASE_URL ?? + "postgresql://postgres@localhost:5432/postgres"; + +// These tests expect the local Postgres image from docker/Dockerfile.postgres, +// usually started with `./bun run pg`, unless ENGINE_CORE_TEST_DATABASE_URL is set. + +let dbName: string | undefined; +let sql: SQL | undefined; + +function assertSafeIdentifier(name: string): void { + if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(name)) { + throw new Error(`Unsafe database identifier: ${name}`); + } +} + +function getSql(): SQL { + if (!sql) throw new Error("test database is not initialized"); + return sql; +} + +function randomSlug(): string { + return `t${Math.random().toString(36).slice(2, 13).padEnd(11, "0")}`; +} + +function schemaFor(slug: string): string { + return `me_${slug}`; +} + +async function createTestDatabase(): Promise { + dbName = `test_engine_core_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`; + assertSafeIdentifier(dbName); + + const admin = new SQL(adminUrl); + try { + await admin.unsafe(`create database ${dbName}`); + } finally { + await admin.close(); + } + + const url = new URL(adminUrl); + url.pathname = `/${dbName}`; + return url.toString(); +} + +async function dropTestDatabase(): Promise { + if (!dbName) return; + assertSafeIdentifier(dbName); + + const admin = new SQL(adminUrl); + try { + await admin` + select pg_terminate_backend(pid) + from pg_stat_activity + where datname = ${dbName} + and pid <> pg_backend_pid() + `; + await admin.unsafe(`drop database if exists ${dbName}`); + } finally { + await admin.close(); + dbName = undefined; + } +} + +async function schemaExists(schema: string): Promise { + const [{ exists }] = await getSql()` + select exists ( + select 1 + from information_schema.schemata + where schema_name = ${schema} + ) as exists + `; + return exists; +} + +async function tableExists(schema: string, table: string): Promise { + const [{ exists }] = await getSql()` + select exists ( + select 1 + from information_schema.tables + where table_schema = ${schema} + and table_name = ${table} + ) as exists + `; + return exists; +} + +async function migrationCount(schema: string): Promise { + const [{ count }] = await getSql()` + select count(*)::int as count + from ${getSql()(schema)}.migration + `; + return count; +} + +async function engineVersion(schema: string): Promise { + const [{ version }] = await getSql()` + select version + from ${getSql()(schema)}.version + `; + return version; +} + +beforeAll(async () => { + const connectionString = await createTestDatabase(); + sql = new SQL(connectionString); + await bootstrapEngineDatabase(sql); +}); + +afterAll(async () => { + await sql?.close(); + sql = undefined; + await dropTestDatabase(); +}); + +describe("migrateEngine", () => { + test("provisions a new engine schema", async () => { + const slug = randomSlug(); + const schema = schemaFor(slug); + + await migrateEngine(getSql(), { slug, targetVersion: "0.1.0" }); + + expect(await schemaExists(schema)).toBe(true); + expect(await tableExists(schema, "version")).toBe(true); + expect(await tableExists(schema, "migration")).toBe(true); + expect(await tableExists(schema, "user")).toBe(true); + expect(await tableExists(schema, "role_membership")).toBe(true); + expect(await tableExists(schema, "tree_owner")).toBe(true); + expect(await tableExists(schema, "tree_grant")).toBe(true); + expect(await tableExists(schema, "memory")).toBe(true); + expect(await tableExists(schema, "embedding_queue")).toBe(true); + expect(await engineVersion(schema)).toBe("0.1.0"); + + const rows = await getSql()` + select name, applied_at_version, applied_at + from ${getSql()(schema)}.migration + order by name + `; + expect(rows.map((row: { name: string }) => row.name)).toEqual([ + "001_user", + "002_role_membership", + "003_tree_ownership", + "004_tree_grant", + "005_memory", + "006_embedding_queue", + ]); + for (const row of rows as Array<{ + applied_at_version: string; + applied_at: Date; + }>) { + expect(row.applied_at_version).toBe("0.1.0"); + expect(row.applied_at).toBeTruthy(); + } + }); + + test("is idempotent", async () => { + const slug = randomSlug(); + const schema = schemaFor(slug); + + await migrateEngine(getSql(), { slug, targetVersion: "0.1.0" }); + await migrateEngine(getSql(), { slug, targetVersion: "0.1.0" }); + + expect(await migrationCount(schema)).toBe(6); + expect(await engineVersion(schema)).toBe("0.1.0"); + }); + + test("rejects invalid slug", async () => { + await expect( + migrateEngine(getSql(), { slug: "bad-slug", targetVersion: "0.1.0" }), + ).rejects.toThrow("Invalid engine slug"); + }); + + test("rejects invalid targetVersion", async () => { + await expect( + migrateEngine(getSql(), { slug: randomSlug(), targetVersion: "nope" }), + ).rejects.toThrow("Invalid target version"); + }); + + test("rejects downgrade", async () => { + const slug = randomSlug(); + + await migrateEngine(getSql(), { slug, targetVersion: "0.2.0" }); + + await expect( + migrateEngine(getSql(), { slug, targetVersion: "0.1.0" }), + ).rejects.toThrow("older than database version"); + }); + + test("allows equal current version rerun", async () => { + const slug = randomSlug(); + const schema = schemaFor(slug); + + await migrateEngine(getSql(), { slug, targetVersion: "0.2.0" }); + await migrateEngine(getSql(), { slug, targetVersion: "0.2.0" }); + + expect(await migrationCount(schema)).toBe(6); + expect(await engineVersion(schema)).toBe("0.2.0"); + }); + + test("allows upgrade without pending migrations", async () => { + const slug = randomSlug(); + const schema = schemaFor(slug); + + await migrateEngine(getSql(), { slug, targetVersion: "0.1.0" }); + await migrateEngine(getSql(), { slug, targetVersion: "0.2.0" }); + + expect(await migrationCount(schema)).toBe(6); + expect(await engineVersion(schema)).toBe("0.2.0"); + }); + + test("rejects unsafe shardId", async () => { + await expect( + migrateEngine(getSql(), { + slug: randomSlug(), + targetVersion: "0.1.0", + shardId: Number.MAX_SAFE_INTEGER + 1, + }), + ).rejects.toThrow("shardId must be a safe integer"); + }); + + test("grants and revokes tree actions", async () => { + const slug = randomSlug(); + const schema = schemaFor(slug); + const db = getSql(); + + await migrateEngine(db, { slug, targetVersion: "0.1.0" }); + + const [{ id: ownerId }] = await db` + insert into ${db(schema)}."user" (name) + values (${`owner_${slug}`}) + returning id + `; + const [{ id: granteeId }] = await db` + insert into ${db(schema)}."user" (name) + values (${`grantee_${slug}`}) + returning id + `; + const [{ id: outsiderId }] = await db` + insert into ${db(schema)}."user" (name) + values (${`outsider_${slug}`}) + returning id + `; + + await db` + insert into ${db(schema)}.tree_owner (tree_path, user_id) + values ('project'::ltree, ${ownerId}::uuid) + `; + + try { + await db` + select ${db(schema)}.grant_tree_actions( + ${outsiderId}::uuid, + array['read']::text[], + 'project.alpha'::ltree, + ${granteeId}::uuid + ) + `; + throw new Error("expected grant_tree_actions to reject"); + } catch (error) { + expect(String(error)).toContain( + "must be a superuser or own the tree path", + ); + } + + await db` + select ${db(schema)}.grant_tree_actions( + ${ownerId}::uuid, + array['read']::text[], + 'project.alpha'::ltree, + ${granteeId}::uuid + ) + `; + await db` + select ${db(schema)}.grant_tree_actions( + ${ownerId}::uuid, + array['update']::text[], + 'project.alpha'::ltree, + ${granteeId}::uuid + ) + `; + + const [{ actions: grantedActions }] = await db` + select actions + from ${db(schema)}.tree_grant + where user_id = ${granteeId}::uuid + and tree_path = 'project.alpha'::ltree + `; + expect(grantedActions).toEqual(["read", "update"]); + + await db` + select ${db(schema)}.revoke_tree_actions( + ${ownerId}::uuid, + array['read']::text[], + 'project.alpha'::ltree, + ${granteeId}::uuid + ) + `; + + const [{ actions: remainingActions }] = await db` + select actions + from ${db(schema)}.tree_grant + where user_id = ${granteeId}::uuid + and tree_path = 'project.alpha'::ltree + `; + expect(remainingActions).toEqual(["update"]); + + await db` + select ${db(schema)}.revoke_tree_actions( + ${ownerId}::uuid, + array['update']::text[], + 'project.alpha'::ltree, + ${granteeId}::uuid + ) + `; + + const [{ exists }] = await db` + select exists + ( + select 1 + from ${db(schema)}.tree_grant + where user_id = ${granteeId}::uuid + and tree_path = 'project.alpha'::ltree + ) as exists + `; + expect(exists).toBe(false); + }, 30_000); + + test("lists tree breadcrumbs without double-counting overlapping privileges", async () => { + const slug = randomSlug(); + const schema = schemaFor(slug); + const db = getSql(); + + await migrateEngine(db, { slug, targetVersion: "0.1.0" }); + + const [{ id: userId }] = await db` + insert into ${db(schema)}."user" (name) + values (${`tree_user_${slug}`}) + returning id + `; + + await db` + insert into ${db(schema)}.tree_owner (tree_path, user_id) + values ('work'::ltree, ${userId}::uuid) + `; + await db` + insert into ${db(schema)}.tree_grant (user_id, tree_path, actions) + values (${userId}::uuid, 'work.api'::ltree, array['read']::text[]) + `; + await db` + insert into ${db(schema)}.memory (tree, content) + values + ('work.api.auth'::ltree, 'auth') + , ('work.api.search'::ltree, 'search') + , ('work.ui'::ltree, 'ui') + `; + + const rows = await db` + select tree::text, count::int + from ${db(schema)}.list_tree(${userId}::uuid, 'work.api.*{0,}'::lquery) + `; + + expect(rows).toEqual([ + { tree: "work", count: 2 }, + { tree: "work.api", count: 2 }, + { tree: "work.api.auth", count: 1 }, + { tree: "work.api.search", count: 1 }, + ]); + }); + + test("moves a tree by rewriting the source prefix", async () => { + const slug = randomSlug(); + const schema = schemaFor(slug); + const db = getSql(); + + await migrateEngine(db, { slug, targetVersion: "0.1.0" }); + + const [{ id: userId }] = await db` + insert into ${db(schema)}."user" (name) + values (${`move_user_${slug}`}) + returning id + `; + + await db` + insert into ${db(schema)}.tree_owner (tree_path, user_id) + values + ('work'::ltree, ${userId}::uuid) + , ('archive'::ltree, ${userId}::uuid) + `; + await db` + insert into ${db(schema)}.tree_grant (user_id, tree_path, actions) + values (${userId}::uuid, 'work.api'::ltree, array['update']::text[]) + `; + await db` + insert into ${db(schema)}.memory (tree, content) + values + ('work.api'::ltree, 'api') + , ('work.api.auth'::ltree, 'auth') + , ('work.ui'::ltree, 'ui') + `; + + const [{ count: dryRunCount }] = await db` + select ${db(schema)}.move_tree( + ${userId}::uuid, + 'work.api'::ltree, + 'archive.api'::ltree, + true + )::int as count + `; + expect(dryRunCount).toBe(2); + + const dryRunRows = await db` + select tree::text, content + from ${db(schema)}.memory + order by tree::text + `; + expect(dryRunRows).toEqual([ + { tree: "work.api", content: "api" }, + { tree: "work.api.auth", content: "auth" }, + { tree: "work.ui", content: "ui" }, + ]); + + const [{ count: moveCount }] = await db` + select ${db(schema)}.move_tree( + ${userId}::uuid, + 'work.api'::ltree, + 'archive.api'::ltree, + false + )::int as count + `; + expect(moveCount).toBe(2); + + const movedRows = await db` + select tree::text, content + from ${db(schema)}.memory + order by tree::text + `; + expect(movedRows).toEqual([ + { tree: "archive.api", content: "api" }, + { tree: "archive.api.auth", content: "auth" }, + { tree: "work.ui", content: "ui" }, + ]); + }); + + test("copies a tree by rewriting the source prefix", async () => { + const slug = randomSlug(); + const schema = schemaFor(slug); + const db = getSql(); + + await migrateEngine(db, { slug, targetVersion: "0.1.0" }); + + const [{ id: userId }] = await db` + insert into ${db(schema)}."user" (name) + values (${`copy_user_${slug}`}) + returning id + `; + + await db` + insert into ${db(schema)}.tree_owner (tree_path, user_id) + values + ('work'::ltree, ${userId}::uuid) + , ('archive'::ltree, ${userId}::uuid) + `; + await db` + insert into ${db(schema)}.tree_grant (user_id, tree_path, actions) + values (${userId}::uuid, 'work.api'::ltree, array['read']::text[]) + `; + await db` + insert into ${db(schema)}.memory (tree, content) + values + ('work.api'::ltree, 'api') + , ('work.api.auth'::ltree, 'auth') + , ('work.ui'::ltree, 'ui') + `; + + const [{ count: dryRunCount }] = await db` + select ${db(schema)}.copy_tree( + ${userId}::uuid, + 'work.api'::ltree, + 'archive.api'::ltree, + true + )::int as count + `; + expect(dryRunCount).toBe(2); + + const dryRunRows = await db` + select tree::text, content + from ${db(schema)}.memory + order by tree::text, content + `; + expect(dryRunRows).toEqual([ + { tree: "work.api", content: "api" }, + { tree: "work.api.auth", content: "auth" }, + { tree: "work.ui", content: "ui" }, + ]); + + const [{ count: copyCount }] = await db` + select ${db(schema)}.copy_tree( + ${userId}::uuid, + 'work.api'::ltree, + 'archive.api'::ltree, + false + )::int as count + `; + expect(copyCount).toBe(2); + + const copiedRows = await db` + select tree::text, content + from ${db(schema)}.memory + order by tree::text, content + `; + expect(copiedRows).toEqual([ + { tree: "archive.api", content: "api" }, + { tree: "archive.api.auth", content: "auth" }, + { tree: "work.api", content: "api" }, + { tree: "work.api.auth", content: "auth" }, + { tree: "work.ui", content: "ui" }, + ]); + }); +}); diff --git a/packages/engine-core/migrate/migrate.ts b/packages/engine-core/migrate/migrate.ts new file mode 100644 index 0000000..83911df --- /dev/null +++ b/packages/engine-core/migrate/migrate.ts @@ -0,0 +1,384 @@ +import { createHash } from "node:crypto"; +import { info, reportError, span } from "@pydantic/logfire-node"; +import type { SQL } from "bun"; +import { semver } from "bun"; +import { isValidSlug, slugToSchema } from "../slug"; + +import provisionSql from "./incremental/000_provision.sql" with { + type: "text", +}; +import incremental001 from "./incremental/001_user.sql" with { type: "text" }; +import incremental002 from "./incremental/002_role_membership.sql" with { + type: "text", +}; +import incremental003 from "./incremental/003_tree_access.sql" with { + type: "text", +}; +import incremental004 from "./incremental/004_memory.sql" with { type: "text" }; +import incremental005 from "./incremental/005_embedding_queue.sql" with { + type: "text", +}; + +interface Incremental { + name: string; + sql: string; +} + +const incrementals: Incremental[] = [ + { name: "001_user", sql: incremental001 }, + { name: "002_role_membership", sql: incremental002 }, + { name: "003_tree_access", sql: incremental003 }, + { name: "004_memory", sql: incremental004 }, + { name: "005_embedding_queue", sql: incremental005 }, +]; + +import idempotent001 from "./idempotent/001_user.sql" with { type: "text" }; +import idempotent002 from "./idempotent/002_role_membership.sql" with { + type: "text", +}; +import idempotent003 from "./idempotent/003_tree_access.sql" with { + type: "text", +}; +import idempotent004 from "./idempotent/004_memory.sql" with { type: "text" }; +import idempotent005 from "./idempotent/005_search.sql" with { type: "text" }; +import idempotent006 from "./idempotent/006_embedding_queue.sql" with { + type: "text", +}; + +interface Idempotent { + name: string; + sql: string; +} + +const idempotents: Idempotent[] = [ + { name: "001_user", sql: idempotent001 }, + { name: "002_role_membership", sql: idempotent002 }, + { name: "003_tree_access", sql: idempotent003 }, + { name: "004_memory", sql: idempotent004 }, + { name: "005_search", sql: idempotent005 }, + { name: "006_embedding_queue", sql: idempotent006 }, +]; + +export interface MigrateEngineOptions { + slug: string; + targetVersion: string; + shardId?: number; + embeddingDimensions?: number; + bm25TextConfig?: string; + bm25K1?: number; + bm25B?: number; + hnswM?: number; + hnswEfConstruction?: number; + statementTimeout?: string; + lockTimeout?: string; + transactionTimeout?: string; + idleInTransactionSessionTimeout?: string; +} + +interface NormalizedMigrateEngineOptions { + slug: string; + targetVersion: string; + shardId?: number; + embeddingDimensions: number; + bm25TextConfig: string; + bm25K1: number; + bm25B: number; + hnswM: number; + hnswEfConstruction: number; + statementTimeout: string; + lockTimeout: string; + transactionTimeout: string; + idleInTransactionSessionTimeout: string; +} + +export async function migrateEngine( + sql: SQL, + options: MigrateEngineOptions, +): Promise { + const opts = normalizeMigrateEngineOptions(options); + const attributes = migrateAttributes(opts); + + await span("engine_core.migrate", { + attributes, + callback: async () => { + try { + if (!isValidSlug(opts.slug)) { + throw new Error( + `Invalid engine slug: "${opts.slug}" — must be 12 lowercase alphanumeric characters`, + ); + } + if (!semver.satisfies(opts.targetVersion, "*")) { + throw new Error(`Invalid target version: "${opts.targetVersion}"`); + } + const schema = slugToSchema(opts.slug); + const schemaAttributes = { ...attributes, "db.schema": schema }; + const [key1, key2] = advisoryLockKey(`memory-engine:schema:${schema}`); + + await sql.begin(async (tx) => { + if (opts.shardId !== undefined) { + if (!Number.isSafeInteger(opts.shardId)) { + throw new Error( + `shardId must be a safe integer, got: ${opts.shardId}`, + ); + } + await tx.unsafe(`set local pgdog.shard to ${String(opts.shardId)}`); + } + await tx`select set_config('statement_timeout', ${opts.statementTimeout}, true)`; + await tx`select set_config('lock_timeout', ${opts.lockTimeout}, true)`; + await tx`select set_config('transaction_timeout', ${opts.transactionTimeout}, true)`; + await tx`select set_config('idle_in_transaction_session_timeout', ${opts.idleInTransactionSessionTimeout}, true)`; + const acquired = await span("engine_core.migrate.acquire_lock", { + attributes: schemaAttributes, + callback: () => acquireAdvisoryLock(tx, key1, key2), + }); + if (!acquired) { + throw new Error( + `Unable to acquire lock for engine slug ${opts.slug} migrations.`, + ); + } + + if (!(await doesEngineExist(tx, schema))) { + await span("engine_core.migrate.provision", { + attributes: schemaAttributes, + callback: () => provisionEngine(tx, schema), + }); + info("Engine core schema provisioned", schemaAttributes); + } + await span("engine_core.migrate.run", { + attributes: schemaAttributes, + callback: () => runMigrations(tx, schema, opts), + }); + }); + info("Engine core migrations completed", schemaAttributes); + } catch (error) { + reportError("Engine core migration failed", error as Error, attributes); + throw error; + } + }, + }); +} + +function migrateAttributes( + options: NormalizedMigrateEngineOptions, +): Record { + return { + "engine.slug": options.slug, + "engine.target_version": options.targetVersion, + "db.shard": options.shardId, + "db.statement_timeout": options.statementTimeout, + "db.lock_timeout": options.lockTimeout, + "db.transaction_timeout": options.transactionTimeout, + "db.idle_in_transaction_session_timeout": + options.idleInTransactionSessionTimeout, + }; +} + +function normalizeMigrateEngineOptions( + options: MigrateEngineOptions, +): NormalizedMigrateEngineOptions { + return { + slug: options.slug, + targetVersion: options.targetVersion, + shardId: options.shardId, + embeddingDimensions: options.embeddingDimensions ?? 1536, + bm25TextConfig: options.bm25TextConfig ?? "english", + bm25K1: options.bm25K1 ?? 1.2, + bm25B: options.bm25B ?? 0.75, + hnswM: options.hnswM ?? 16, + hnswEfConstruction: options.hnswEfConstruction ?? 64, + statementTimeout: options.statementTimeout ?? "20s", + lockTimeout: options.lockTimeout ?? "5s", + transactionTimeout: options.transactionTimeout ?? "1min", + idleInTransactionSessionTimeout: + options.idleInTransactionSessionTimeout ?? "5s", + }; +} + +function templateVars( + schema: string, + options: NormalizedMigrateEngineOptions, +): Record { + return { + ...options, + schema, + embedding_dimensions: options.embeddingDimensions, + bm25_text_config: options.bm25TextConfig, + bm25_k1: options.bm25K1, + bm25_b: options.bm25B, + hnsw_m: options.hnswM, + hnsw_ef_construction: options.hnswEfConstruction, + }; +} + +function advisoryLockKey(schema: string): [number, number] { + const digest = createHash("sha256").update(schema).digest(); + return [digest.readInt32BE(0), digest.readInt32BE(4)]; +} + +const MAX_LOCK_RETRIES = 5; +const BASE_DELAY_MS = 100; + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function acquireAdvisoryLock( + tx: SQL, + key1: number, + key2: number, +): Promise { + let acquired = false; + for (let attempt = 0; attempt < MAX_LOCK_RETRIES; attempt++) { + const [result] = await tx` + select pg_try_advisory_xact_lock(${key1}, ${key2}) as acquired + `; + if (result.acquired) { + acquired = true; + break; + } + if (attempt < MAX_LOCK_RETRIES - 1) { + await sleep(BASE_DELAY_MS * 2 ** attempt); + } + } + return acquired; +} + +async function doesEngineExist(tx: SQL, schema: string): Promise { + const [{ engineExists }] = await tx` + select exists + ( + select 1 + from pg_namespace n + where n.nspname = ${schema} + ) as "engineExists" + `; + return engineExists; +} + +async function provisionEngine(tx: SQL, schema: string): Promise { + await tx.unsafe(template(provisionSql, { schema })); +} + +async function runMigrations( + tx: SQL, + schema: string, + options: NormalizedMigrateEngineOptions, +): Promise { + // check ownership + await assertSchemaOwnership(tx, schema); + + // check version + const [{ version: dbVersion }] = await tx` + select version from ${tx(schema)}.version + `; + const cmp = semver.order(options.targetVersion, dbVersion); + // abort if target is older than the database + if (cmp < 0) { + throw new Error( + `Target version (${options.targetVersion}) is older than database version (${dbVersion}). ` + + "Please upgrade the server.", + ); + } + if (cmp === 0) { + // version matches. no need to run migrations + info("Engine core migration skipped, version current", { + "db.schema": schema, + "engine.version": dbVersion, + "engine.target_version": options.targetVersion, + }); + return; + } + + // run incremental migrations + const sorted1 = [...incrementals].sort((a, b) => + a.name.localeCompare(b.name), + ); + + for (const migration of sorted1) { + const [{ existing }] = await tx` + select exists + ( + select 1 + from ${tx(schema)}.migration + where name = ${migration.name} + ) as existing + `; + + if (existing) { + continue; + } + + await span("engine_core.migrate.incremental", { + attributes: { + "db.schema": schema, + "engine.migration": migration.name, + "engine.migration_type": "incremental", + "engine.target_version": options.targetVersion, + }, + callback: async () => { + const renderedSql = template( + migration.sql, + templateVars(schema, options), + ); + await tx.unsafe(renderedSql); + await tx` + insert into ${tx(schema)}.migration (name, applied_at_version) + values (${migration.name}, ${options.targetVersion})`; + }, + }); + info("Engine core migration applied", { + "db.schema": schema, + "engine.migration": migration.name, + "engine.migration_type": "incremental", + "engine.target_version": options.targetVersion, + }); + } + + // run idempotent migrations + const sorted2 = [...idempotents].sort((a, b) => a.name.localeCompare(b.name)); + + for (const migration of sorted2) { + await span("engine_core.migrate.idempotent", { + attributes: { + "db.schema": schema, + "engine.migration": migration.name, + "engine.migration_type": "idempotent", + "engine.target_version": options.targetVersion, + }, + callback: async () => { + const renderedSql = template( + migration.sql, + templateVars(schema, options), + ); + await tx.unsafe(renderedSql); + }, + }); + } + + // update version + await tx`update ${tx(schema)}.version set version = ${options.targetVersion}, at = now()`; +} + +async function assertSchemaOwnership(tx: SQL, schema: string): Promise { + const [result] = await tx` + select + n.nspowner = (select pg_catalog.to_regrole(current_user)::oid) as is_owner + from pg_catalog.pg_namespace n + where n.nspname = ${schema} + `; + + if (!result?.is_owner) { + throw new Error( + `Only the owner of the ${schema} schema can run database migrations`, + ); + } +} + +function template(sql: string, vars: Record): string { + return sql.replace(/\{\{(\w+)\}\}/g, (_, key) => { + if (!(key in vars)) { + throw new Error(`Missing template variable: ${key}`); + } + return String(vars[key]); + }); +} diff --git a/packages/engine-core/package.json b/packages/engine-core/package.json new file mode 100644 index 0000000..d8fd729 --- /dev/null +++ b/packages/engine-core/package.json @@ -0,0 +1,9 @@ +{ + "name": "@memory.build/engine-core", + "version": "0.0.0", + "private": true, + "type": "module", + "dependencies": { + "@pydantic/logfire-node": "^0.13.1" + } +} diff --git a/packages/engine-core/slug.test.ts b/packages/engine-core/slug.test.ts new file mode 100644 index 0000000..a398df2 --- /dev/null +++ b/packages/engine-core/slug.test.ts @@ -0,0 +1,40 @@ +import { describe, expect, test } from "bun:test"; +import { + isValidEngineSchema, + isValidSlug, + schemaToSlug, + slugToSchema, +} from "./slug"; + +describe("engine slugs", () => { + test("validates slugs", () => { + expect(isValidSlug("abc123def456")).toBe(true); + expect(isValidSlug("000000000000")).toBe(true); + + expect(isValidSlug("abc123def45")).toBe(false); + expect(isValidSlug("abc123def4567")).toBe(false); + expect(isValidSlug("ABC123def456")).toBe(false); + expect(isValidSlug("abc123_def45")).toBe(false); + expect(isValidSlug("abc123-def45")).toBe(false); + }); + + test("validates engine schemas", () => { + expect(isValidEngineSchema("me_abc123def456")).toBe(true); + expect(isValidEngineSchema("me_000000000000")).toBe(true); + + expect(isValidEngineSchema("abc123def456")).toBe(false); + expect(isValidEngineSchema("me_abc123def45")).toBe(false); + expect(isValidEngineSchema("me_abc123def4567")).toBe(false); + expect(isValidEngineSchema("me_ABC123def456")).toBe(false); + expect(isValidEngineSchema("me_abc123_def45")).toBe(false); + }); + + test("converts between slugs and schemas", () => { + const slug = "abc123def456"; + const schema = "me_abc123def456"; + + expect(slugToSchema(slug)).toBe(schema); + expect(schemaToSlug(schema)).toBe(slug); + expect(schemaToSlug(slugToSchema(slug))).toBe(slug); + }); +}); diff --git a/packages/engine-core/slug.ts b/packages/engine-core/slug.ts new file mode 100644 index 0000000..233910c --- /dev/null +++ b/packages/engine-core/slug.ts @@ -0,0 +1,18 @@ +const ENGINE_SCHEMA_RE = /^me_[a-z0-9]{12}$/; +const SLUG_RE = /^[a-z0-9]{12}$/; + +export function isValidEngineSchema(name: string): boolean { + return ENGINE_SCHEMA_RE.test(name); +} + +export function isValidSlug(slug: string): boolean { + return SLUG_RE.test(slug); +} + +export function slugToSchema(slug: string): string { + return `me_${slug}`; +} + +export function schemaToSlug(schema: string): string { + return schema.slice(3); +} diff --git a/packages/engine-core/tsconfig.json b/packages/engine-core/tsconfig.json new file mode 100644 index 0000000..23b1d27 --- /dev/null +++ b/packages/engine-core/tsconfig.json @@ -0,0 +1,4 @@ +{ + "extends": "../../tsconfig.json", + "include": ["**/*.ts", "**/*.d.ts"] +} diff --git a/proto/.gitignore b/proto/.gitignore new file mode 100644 index 0000000..f5f242b --- /dev/null +++ b/proto/.gitignore @@ -0,0 +1 @@ +copy.sh diff --git a/proto/copy-memory.sh b/proto/copy-memory.sh new file mode 100755 index 0000000..cb635bf --- /dev/null +++ b/proto/copy-memory.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +set -euo pipefail + +script_dir="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)" +repo_dir="$(dirname -- "$script_dir")" + +exec "$repo_dir/bun" "$script_dir/copy-memory.ts" diff --git a/proto/copy-memory.ts b/proto/copy-memory.ts new file mode 100644 index 0000000..79b8931 --- /dev/null +++ b/proto/copy-memory.ts @@ -0,0 +1,119 @@ +import { $ } from "bun"; + +const columns = [ + "id", + "meta", + "tree", + "temporal", + "content", + "embedding", + "embedding_version", + "created_at", + "created_by", + "updated_at", +].join(", "); + +function requireEnv(name: string): string { + const value = process.env[name]; + if (!value) throw new Error(`${name} is required`); + return value; +} + +function parseBatchSize(): number { + const value = process.env.BATCH_SIZE ?? "1000"; + const parsed = Number.parseInt(value, 10); + if (!Number.isInteger(parsed) || parsed < 1) { + throw new Error(`BATCH_SIZE must be a positive integer, got: ${value}`); + } + return parsed; +} + +function quoteIdent(name: string, value: string): string { + if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(value)) { + throw new Error(`${name} must be a simple SQL identifier, got: ${value}`); + } + return `"${value}"`; +} + +function quoteUuid(value: string): string { + if ( + !/^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/.test( + value, + ) + ) { + throw new Error(`Expected UUID boundary, got: ${value}`); + } + return `'${value}'::uuid`; +} + +async function psqlJson(databaseUrl: string, sql: string): Promise { + const output = + await $`psql ${databaseUrl} -X -q -t -A -v ON_ERROR_STOP=1 -c ${sql}`.text(); + return JSON.parse(output.trim()) as T; +} + +function progress(copied: number, total: number, startedAt: number): string { + const pct = total === 0 ? 100 : (copied / total) * 100; + const elapsedSeconds = (performance.now() - startedAt) / 1000; + const rowsPerSecond = elapsedSeconds > 0 ? copied / elapsedSeconds : 0; + return `${copied}/${total} (${pct.toFixed(1)}%, ${rowsPerSecond.toFixed(0)} rows/s)`; +} + +const databaseUrlFrom = requireEnv("DATABASE_URL_FROM"); +const databaseUrlTo = requireEnv("DATABASE_URL_TO"); +const schemaFrom = quoteIdent("SCHEMA_FROM", requireEnv("SCHEMA_FROM")); +const schemaTo = quoteIdent("SCHEMA_TO", requireEnv("SCHEMA_TO")); +const batchSize = parseBatchSize(); + +type Stats = { min_id: string | null; count: string }; +type Boundary = { last_id: string | null; count: string }; + +const stats = await psqlJson( + databaseUrlFrom, + `select json_build_object('min_id', (select id::text from ${schemaFrom}.memory order by id limit 1), 'count', count(*)::text)::text from ${schemaFrom}.memory`, +); +const total = Number.parseInt(stats.count, 10); +if (!Number.isSafeInteger(total)) { + throw new Error(`Source memory count is not a safe integer: ${stats.count}`); +} + +console.error( + `Copying ${total} memories from ${schemaFrom}.memory to ${schemaTo}.memory in batches of ${batchSize}`, +); +if (stats.min_id) console.error(`First source id: ${stats.min_id}`); + +let copied = 0; +let batch = 0; +let lastId: string | null = null; +const startedAt = performance.now(); + +while (copied < total) { + const whereAfterLast: string = lastId + ? `where id > ${quoteUuid(lastId)}` + : ""; + const boundary: Boundary = await psqlJson( + databaseUrlFrom, + `with batch as (select id from ${schemaFrom}.memory ${whereAfterLast} order by id limit ${batchSize}) select json_build_object('last_id', (select id::text from batch order by id desc limit 1), 'count', (select count(*)::text from batch))::text`, + ); + const batchRows = Number.parseInt(boundary.count, 10); + if (!Number.isSafeInteger(batchRows)) { + throw new Error(`Batch count is not a safe integer: ${boundary.count}`); + } + if (batchRows === 0 || !boundary.last_id) break; + + const lowerBound = lastId ? `id > ${quoteUuid(lastId)} and ` : ""; + const upperBound = `id <= ${quoteUuid(boundary.last_id)}`; + const sourceCopy = `\\copy (select ${columns} from ${schemaFrom}.memory where ${lowerBound}${upperBound} order by id) to stdout with (format binary)`; + const targetCopy = `\\copy ${schemaTo}.memory (${columns}) from stdin with (format binary)`; + + batch++; + await $`psql ${databaseUrlFrom} -X -q -v ON_ERROR_STOP=1 -c ${sourceCopy} | psql ${databaseUrlTo} -X -q -v ON_ERROR_STOP=1 -c ${targetCopy}`; + + copied += batchRows; + lastId = boundary.last_id; + console.error( + `Batch ${batch}: copied ${batchRows} rows, ${progress(copied, total, startedAt)}`, + ); +} + +console.error(`Finished copying ${copied} memories in ${batch} batches.`); diff --git a/proto/create.sh b/proto/create.sh new file mode 100755 index 0000000..57fc072 --- /dev/null +++ b/proto/create.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +set -euo pipefail + +script_dir="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)" +repo_dir="$(dirname -- "$script_dir")" + +: "${DATABASE_URL:?DATABASE_URL is required}" + +"$repo_dir/bun" "$script_dir/create.ts" | psql -d "$DATABASE_URL" \ + --single-transaction \ + -v ON_ERROR_STOP=1 \ + -f - diff --git a/proto/create.sql b/proto/create.sql new file mode 100644 index 0000000..65d6f95 --- /dev/null +++ b/proto/create.sql @@ -0,0 +1,802 @@ + +------------------------------------------------------------------------------- +-- check database version +------------------------------------------------------------------------------- +select current_setting('server_version_num')::int < 180000 as bad_pg_version +\gset +\if :bad_pg_version +\warn postgres 18 or greater required +\q +\endif + +------------------------------------------------------------------------------- +-- check database version +------------------------------------------------------------------------------- +select current_setting('server_version_num')::int < 180000 as bad_pg_version +\gset +\if :bad_pg_version +\warn postgres 18 or greater required +\q +\endif + +------------------------------------------------------------------------------- +-- ensure extensions installed +------------------------------------------------------------------------------- +create extension if not exists citext with schema public version '1.6'; +create extension if not exists ltree with schema public version '1.3'; +create extension if not exists vector with schema public version '0.8.2'; +create extension if not exists pg_textsearch with schema public version '1.1.0'; + +------------------------------------------------------------------------------- +-- database roles +------------------------------------------------------------------------------- +do $block$ +declare + _roles text[] = array['me_ro', 'me_rw', 'me_embed']; + _role text; + _sql text; +begin + for _role in select * from unnest(_roles) loop + perform + from pg_roles r + where r.rolname = _role; + if found then + continue; + end if; + _sql = format($sql$create role %I nologin$sql$, _role); + execute _sql; + _sql = format($sql$grant %I to %I$sql$, _role, current_user); + execute _sql; + end loop; +end; +$block$; + +------------------------------------------------------------------------------- +-- engine schema +------------------------------------------------------------------------------- +drop schema if exists {{schema}} cascade; +create schema {{schema}}; + +------------------------------------------------------------------------------- +-- grant usage on engine schema to roles +------------------------------------------------------------------------------- +do $block$ +declare + _roles text[] = array['me_ro', 'me_rw', 'me_embed']; + _role text; + _sql text; +begin + for _role in select * from unnest(_roles) + loop + _sql = format($sql$grant usage on schema %I to %I$sql$, '{{schema}}', _role); + execute _sql; + end loop; +end; +$block$; + +------------------------------------------------------------------------------- +-- generic updated_at trigger +------------------------------------------------------------------------------- +create or replace function {{schema}}.update_updated_at() +returns trigger +as $func$ +begin + new.updated_at = pg_catalog.now(); + return new; +end; +$func$ language plpgsql volatile security definer +set search_path to {{schema}}, pg_temp; + +------------------------------------------------------------------------------- +-- users +------------------------------------------------------------------------------- +-- User: thing that accesses memories, or a role (can_login = false) +-- identity_id is a soft FK to accounts.identity (nullable for service users) +-- Note: "user" is a reserved word, must be quoted +create table {{schema}}."user" +( id uuid primary key default uuidv7() check (uuid_extract_version(id) = 7) +, name citext not null unique +, superuser boolean not null default false +, created_at timestamptz not null default now() +, updated_at timestamptz +); + +create or replace trigger user_updated_at +before update on {{schema}}."user" +for each row +execute function {{schema}}.update_updated_at() +; + +revoke all on {{schema}}."user" from public; +grant select on {{schema}}."user" to me_ro, me_rw; +-- NO direct writes on the table. must go through functions + +------------------------------------------------------------------------------- +-- role membership +------------------------------------------------------------------------------- +create table {{schema}}.role_membership +( role_id uuid not null references {{schema}}."user"(id) on delete cascade +, member_id uuid not null references {{schema}}."user"(id) on delete cascade +, with_admin_option boolean not null default false +, created_at timestamptz not null default now() +, primary key (member_id, role_id) +, constraint no_self_membership check (role_id != member_id) +); + +create index idx_role_membership_role on {{schema}}.role_membership(role_id) include (member_id); + +revoke all on {{schema}}.role_membership from public; +grant select on {{schema}}.role_membership to me_ro, me_rw; +-- NO direct writes on the table. must go through functions + +------------------------------------------------------------------------------- +-- would_create_cycle +------------------------------------------------------------------------------- +create or replace function {{schema}}.would_create_cycle +( _role_id uuid +, _member_id uuid +) +returns boolean +as $func$ + with recursive ancestors(id) as + ( + select rm.role_id + from {{schema}}.role_membership rm + where rm.member_id = _role_id + union + select rm.role_id + from {{schema}}.role_membership rm + inner join ancestors a on a.id = rm.member_id + ) + select _member_id = _role_id + or exists + ( + select 1 + from ancestors + where id = _member_id + ) +$func$ language sql stable security definer +parallel safe +set search_path to pg_catalog, {{schema}}, pg_temp +; + +revoke all on function {{schema}}.would_create_cycle(uuid, uuid) from public; +grant execute on function {{schema}}.would_create_cycle(uuid, uuid) to me_rw; + +------------------------------------------------------------------------------- +-- role_membership_before_write trigger +------------------------------------------------------------------------------- +-- Prevent role membership cycles for ordinary writes. +-- Note: this check observes the current transaction snapshot. Concurrent +-- transactions that insert/update related role edges can still race unless the +-- caller uses stronger locking or serializable transactions around +-- role_membership writes. +create or replace function {{schema}}.role_membership_before_write() +returns trigger +as $func$ +begin + if {{schema}}.would_create_cycle(new.role_id, new.member_id) then + raise exception 'role membership would create a cycle: role_id %, member_id %', new.role_id, new.member_id + using errcode = 'integrity_constraint_violation'; + end if; + return new; +end; +$func$ language plpgsql volatile security definer +set search_path to pg_catalog, {{schema}}, pg_temp +; + +create or replace trigger role_membership_before_write_trg +before insert or update of role_id, member_id on {{schema}}.role_membership +for each row +execute function {{schema}}.role_membership_before_write() +; + +------------------------------------------------------------------------------- +-- explode_role_membership +------------------------------------------------------------------------------- +create or replace function {{schema}}.explode_role_membership(_user_id uuid) +returns table +( role_id uuid +, superuser bool +, dist int4 +) +as $func$ + with recursive ancestors(id, dist) as + ( + select rm.role_id, 1::int4 + from {{schema}}.role_membership rm + where rm.member_id = _user_id + union + select rm.role_id, a.dist + 1 + from {{schema}}.role_membership rm + inner join ancestors a on a.id = rm.member_id + ) + select + u.id + , u.superuser + , 0::int4 + from {{schema}}."user" u + where u.id = _user_id + union + select + u.id + , u.superuser + , a.dist + from {{schema}}."user" u + inner join ancestors a on (u.id = a.id) +$func$ language sql stable security invoker +; + +revoke all on function {{schema}}.explode_role_membership(uuid) from public; +grant execute on function {{schema}}.explode_role_membership(uuid) to me_ro, me_rw; + +------------------------------------------------------------------------------- +-- is_superuser +------------------------------------------------------------------------------- +create or replace function {{schema}}.is_superuser(_user_id uuid) +returns boolean +as $func$ + select exists + ( + select 1 + from {{schema}}.explode_role_membership(_user_id) x + where x.superuser + ) +$func$ language sql stable security definer +set search_path to pg_catalog, {{schema}}, pg_temp +; + +revoke all on function {{schema}}.is_superuser(uuid) from public; +grant execute on function {{schema}}.is_superuser(uuid) to me_ro, me_rw; + +------------------------------------------------------------------------------- +-- grant_role_membership +------------------------------------------------------------------------------- +create or replace function {{schema}}.grant_role_membership +( _grantor_id uuid +, _role_id uuid +, _member_id uuid +, _with_admin_option bool default false +) +returns void +as $func$ +declare + _allowed bool; +begin + -- exclusive write access required to fully ensure against cycle creation by concurrent writes + lock table {{schema}}.role_membership in share row exclusive mode; + + -- is grantor allowed to do this? + select + exists + ( + -- does the grantor have with admin privilege directly on this role? + select 1 + from {{schema}}.role_membership rm + where rm.role_id = _role_id + and rm.member_id = _grantor_id + and rm.with_admin_option + ) + or {{schema}}.is_superuser(_grantor_id) -- or are they a superuser (even indirectly)? + into strict _allowed + ; + + if not _allowed then + raise exception 'grantor must be a superuser or have with admin option on role: grantor_id % role_id %', _grantor_id, _role_id + using errcode = 'insufficient_privilege'; + end if; + + -- role_membership_before_write_trg protects against cycles in the graph + insert into {{schema}}.role_membership + ( role_id + , member_id + , with_admin_option + ) + values + ( _role_id + , _member_id + , _with_admin_option + ) + on conflict (member_id, role_id) + do update set with_admin_option = _with_admin_option + ; +end; +$func$ language plpgsql volatile security definer +set search_path to pg_catalog, {{schema}}, pg_temp +; + +revoke all on function {{schema}}.grant_role_membership(uuid, uuid, uuid, bool) from public; +grant execute on function {{schema}}.grant_role_membership(uuid, uuid, uuid, bool) to me_rw; + +------------------------------------------------------------------------------- +-- revoke_role_membership +------------------------------------------------------------------------------- +create or replace function {{schema}}.revoke_role_membership +( _revoker_id uuid +, _role_id uuid +, _member_id uuid +) +returns void +as $func$ +declare + _allowed bool; +begin + lock table {{schema}}.role_membership in share row exclusive mode; + + -- is revoker allowed to do this? + select + exists + ( + -- does the revoker have with admin privilege directly on this role? + select 1 + from {{schema}}.role_membership rm + where rm.role_id = _role_id + and rm.member_id = _revoker_id + and rm.with_admin_option + ) + or {{schema}}.is_superuser(_revoker_id) -- or are they a superuser (even indirectly)? + into strict _allowed + ; + + if not _allowed then + raise exception 'revoker must be a superuser or have with admin option on role: revoker_id % role_id %', _revoker_id, _role_id + using errcode = 'insufficient_privilege'; + end if; + + delete from {{schema}}.role_membership d + where d.role_id = _role_id + and d.member_id = _member_id + ; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, pg_temp +; + +revoke all on function {{schema}}.grant_role_membership(uuid, uuid, uuid, bool) from public; +grant execute on function {{schema}}.grant_role_membership(uuid, uuid, uuid, bool) to me_rw; + +------------------------------------------------------------------------------- +-- tree ownership +------------------------------------------------------------------------------- +create table {{schema}}.tree_owner +( tree_path ltree not null primary key +, user_id uuid not null references {{schema}}."user" (id) on delete cascade +, created_by uuid references {{schema}}."user" (id) +, created_at timestamptz not null default now() +); + +create index idx_tree_owner_user on {{schema}}.tree_owner (user_id); +create index idx_tree_owner_gist on {{schema}}.tree_owner using gist (tree_path); + +revoke all on {{schema}}.tree_owner from public; +grant select on {{schema}}.tree_owner to me_ro; +grant select, insert, update, delete on {{schema}}.tree_owner to me_rw; + +------------------------------------------------------------------------------- +-- tree grants +------------------------------------------------------------------------------- +create table {{schema}}.tree_grant +( id uuid primary key default uuidv7() check (uuid_extract_version(id) = 7) +, user_id uuid not null references {{schema}}."user"(id) on delete cascade +, tree_path ltree not null +, actions text[] not null check (actions <@ '{read,create,update,delete}'::text[]) +, granted_by uuid references {{schema}}."user"(id) +, created_at timestamptz not null default now() +, with_grant_option boolean not null default false +); + +create unique index idx_tree_grant_unique on {{schema}}.tree_grant (user_id, tree_path); +create index idx_tree_grant_path on {{schema}}.tree_grant using gist (tree_path); + +revoke all on {{schema}}.tree_grant from public; +grant select on {{schema}}.tree_grant to me_ro; +grant select, insert, update, delete on {{schema}}.tree_grant to me_rw; + +------------------------------------------------------------------------------- +-- calc_tree_privileges +------------------------------------------------------------------------------- +create or replace function {{schema}}.calc_tree_privileges(_user_id uuid) +returns table +( role_id uuid +, tree_path ltree +, actions text[] +, reason text +) +as $func$ + with r as + ( + -- the user + select + u.id as role_id + , u.superuser + from me."user" u + where u.id = _user_id + union + -- the roles they belong to + select + x.role_id + , x.superuser + from {{schema}}.explode_role_membership(_user_id) x + ) + -- superuser + select + r.role_id + , ''::ltree as tree_path + , array['read', 'create', 'update', 'delete'] as actions + , 'superuser' as reason + from r + where r.superuser + union all + -- ownership + select + r.role_id + , o.tree_path + , array['read', 'create', 'update', 'delete'] as actions + , 'owner' as reason + from r + inner join {{schema}}.tree_owner o on (r.role_id = o.user_id) + union all + -- grants + select + r.role_id + , g.tree_path + , g.actions + , 'grant' as reason + from r + inner join {{schema}}.tree_grant g on (r.role_id = g.user_id) +$func$ language sql stable security definer +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +revoke all on function {{schema}}.calc_tree_privileges(uuid) from public; +grant execute on function {{schema}}.calc_tree_privileges(uuid) to me_ro, me_rw; + +------------------------------------------------------------------------------- +-- has_tree_privilege +------------------------------------------------------------------------------- +create or replace function {{schema}}.has_tree_privilege +( _user_id uuid +, _tree_path ltree +, _actions text[] +) +returns bool +as $func$ + select + -- is the user a superuser? + exists + ( + select 1 + from {{schema}}.explode_role_membership(_user_id) x + where x.superuser + ) + -- or do they own the branch of the tree? + or exists + ( + select 1 + from {{schema}}.explode_role_membership(_user_id) x + inner join {{schema}}.tree_owner o + on (x.role_id = o.user_id and o.tree_path @> _tree_path) + ) + -- or do they have an explicit grant to the branch? + or exists + ( + select 1 + from {{schema}}.explode_role_membership(_user_id) x + inner join {{schema}}.tree_grant g + on + ( x.role_id = g.user_id + and g.tree_path @> _tree_path + and g.actions @> _actions + ) + ) +$func$ language sql stable security definer +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +revoke all on function {{schema}}.has_tree_privilege(uuid, ltree, text[]) from public; +grant execute on function {{schema}}.has_tree_privilege(uuid, ltree, text[]) to me_ro, me_rw; + +------------------------------------------------------------------------------- +-- memory +------------------------------------------------------------------------------- +create table {{schema}}.memory +( id uuid not null primary key default uuidv7() check (uuid_extract_version(id) = 7) +, meta jsonb not null default '{}' +, tree ltree not null default ''::ltree +, temporal tstzrange +, content text not null +, embedding halfvec({{embedding_dimensions}}) +, embedding_version int4 not null default 1 +, created_at timestamptz not null default now() +, created_by uuid +, updated_at timestamptz +); + +revoke all on {{schema}}.memory from public; +grant select on {{schema}}.memory to me_ro; +grant select, insert, update, delete on {{schema}}.memory to me_rw; +grant select, update on {{schema}}.memory to me_embed; + +-- index for faceted search +create index memory_meta_gin_idx on {{schema}}.memory using gin (meta); + +-- index for temporal search +create index memory_temporal_gist_idx on {{schema}}.memory using gist (temporal) where (temporal is not null); + +-- index for BM25 text search +create index memory_content_bm25_idx on {{schema}}.memory using bm25 (content) +with (text_config = {{bm25_text_config}}, k1 = {{bm25_k1}}, b = {{bm25_b}}); + +-- index for vector similarity search +create index memory_embedding_hnsw_idx on {{schema}}.memory using hnsw (embedding halfvec_cosine_ops) +with (m = {{hnsw_m}}, ef_construction = {{hnsw_ef_construction}}); + +-- index for hierarchical organization +create index memory_tree_gist_idx on {{schema}}.memory using gist (tree); + +-- make sure the metadata is an object +alter table {{schema}}.memory add check (jsonb_typeof(meta) = 'object'); + +/* +enforce consistent temporal range conventions: +- point-in-time events: lower = upper with inclusive bounds '[same,same]' +- time periods: lower < upper with inclusive-exclusive bounds '[start,end)' +*/ +alter table {{schema}}.memory add constraint temporal_bounds_convention check +( + temporal is null + or ( + -- point-in-time: both bounds equal and inclusive + (lower(temporal) = upper(temporal) and lower_inc(temporal) and upper_inc(temporal)) + or + -- time range: start before end, inclusive-exclusive + (lower(temporal) < upper(temporal) and lower_inc(temporal) and not upper_inc(temporal)) + ) +); + +------------------------------------------------------------------------------- +-- memory triggers +------------------------------------------------------------------------------- +create or replace function {{schema}}.memory_before_update() +returns trigger +as $func$ +begin + -- always update the timestamp + new.updated_at = pg_catalog.now(); + + -- content changed -> new embedding needs to be generated + if old.content is distinct from new.content + and old.embedding is not distinct from new.embedding + then + new.embedding = null; + new.embedding_version = old.embedding_version operator(pg_catalog.+) 1; + end if; + + return new; +end; +$func$ language plpgsql volatile security definer +set search_path to {{schema}}, public, pg_temp; -- public required for pgvector's `is not distinct from` + +create or replace trigger memory_before_update_trg +before update on {{schema}}.memory +for each row +execute function {{schema}}.memory_before_update(); + +------------------------------------------------------------------------------- +-- embedding queue +------------------------------------------------------------------------------- +-- per-engine embedding queue table +create table {{schema}}.embedding_queue +( id bigint generated always as identity primary key +, memory_id uuid not null references {{schema}}.memory(id) on delete cascade +, embedding_version int not null +, vt timestamptz not null default now() +, outcome text check (outcome is null or outcome in ('completed', 'failed', 'cancelled')) +, attempts int not null default 0 +, last_error text +, created_at timestamptz not null default now() +); + +-- index to find items to claim +create index embedding_queue_claim_idx on {{schema}}.embedding_queue (vt) where outcome is null; +-- index also used in finding items to claim. used to ensure there aren't any items for the same memory with a newer version +create index embedding_queue_memory_idx on {{schema}}.embedding_queue (memory_id, embedding_version desc) where outcome is null; +-- index to find items that have resolved to an outcome. these can be pruned +create index embedding_queue_archive_idx on {{schema}}.embedding_queue (created_at) where outcome is not null; + +grant select, update, delete on {{schema}}.embedding_queue to me_embed; + +------------------------------------------------------------------------------- +-- enqueue_embedding +------------------------------------------------------------------------------- +-- this must be security definer because we won't allow me_rw to access queue directly +create or replace function {{schema}}.enqueue_embedding() +returns trigger +as $func$ +begin + insert into {{schema}}.embedding_queue (memory_id, embedding_version) + values (new.id, new.embedding_version); + return new; +end; +$func$ +language plpgsql volatile security definer +set search_path to pg_catalog, {{schema}}, pg_temp +; + +------------------------------------------------------------------------------- +-- enqueuing triggers +------------------------------------------------------------------------------- +create or replace trigger memory_enqueue_embedding_insert +after insert on {{schema}}.memory +for each row +when (new.embedding is null) -- it's possible to insert WITH an embedding +execute function {{schema}}.enqueue_embedding() +; + +create or replace trigger memory_enqueue_embedding_update +after update on {{schema}}.memory +for each row +when +( old.content is distinct from new.content + and new.embedding is null +) +execute function {{schema}}.enqueue_embedding() +; + +------------------------------------------------------------------------------- +-- claim_embedding_batch +------------------------------------------------------------------------------- +create or replace function {{schema}}.claim_embedding_batch +( _batch_size int default 10 +, _lock_duration interval default '5 minutes' +, _max_attempts int default 3 +) +returns table +( queue_id bigint +, memory_id uuid +, embedding_version int +, content text +) +as $func$ +declare + _rec record; + _mem record; + _claimed_count int = 0; +begin + -- bulk-cancel visible queue rows superseded by a newer row for the same memory + update {{schema}}.embedding_queue eq + set outcome = 'cancelled' + where eq.outcome is null + and eq.vt <= now() + and exists + ( + select 1 + from {{schema}}.embedding_queue newer + where newer.memory_id = eq.memory_id + and newer.embedding_version > eq.embedding_version + and newer.outcome is null + ); + + -- sweep: finalize exhausted rows orphaned by worker crash + -- (attempts reached max but outcome was never written back) + update {{schema}}.embedding_queue + set + outcome = 'failed' + , last_error = coalesce(last_error, 'exceeded max attempts (worker crash)') + where outcome is null + and vt <= now() + and attempts >= _max_attempts + ; + + for _rec in + ( + select + eq.id + , eq.memory_id + , eq.embedding_version + from {{schema}}.embedding_queue eq + where eq.outcome is null + and eq.vt <= now() + and eq.attempts < _max_attempts + order by eq.vt + for update skip locked + ) + loop + -- check memory still exists + current version + select m.content, m.embedding_version + into _mem + from {{schema}}.memory m + where m.id = _rec.memory_id + ; + + if not found or _mem.content is null then + -- memory deleted or empty → cancel queue row + update {{schema}}.embedding_queue + set outcome = 'cancelled' + where id = _rec.id; + continue; + end if; + + if _rec.embedding_version != _mem.embedding_version then + -- stale version → cancel + update {{schema}}.embedding_queue + set outcome = 'cancelled' + where id = _rec.id; + continue; + end if; + + -- claim this row + update {{schema}}.embedding_queue q set + vt = now() + _lock_duration + , attempts = q.attempts + 1 + where id = _rec.id; + + queue_id = _rec.id; + memory_id = _rec.memory_id; + embedding_version = _rec.embedding_version; + content = _mem.content; + return next; + + _claimed_count = _claimed_count + 1; + exit when _claimed_count >= _batch_size; + end loop; +end; +$func$ +language plpgsql volatile +set search_path to pg_catalog, {{schema}}, pg_temp +; + +grant execute on function {{schema}}.claim_embedding_batch(int, interval, int) to me_embed; + +------------------------------------------------------------------------------- +-- prune embedding queue +------------------------------------------------------------------------------- +-- prune terminal queue rows older than the retention window. +-- runs opportunistically from the worker on engines that returned no +-- claimable work, so the queue table doesn't grow unbounded. +-- +-- relies on embedding_queue_archive_idx (created_at) where outcome is not null +-- from migration 005, so the no-op case is cheap. +create or replace function {{schema}}.prune_embedding_queue(_retention interval default '7 days') +returns bigint +as $func$ +declare + pruned bigint; +begin + delete from {{schema}}.embedding_queue + where outcome is not null + and created_at < now() - _retention + ; + get diagnostics pruned = row_count; + return pruned; +end; +$func$ +language plpgsql volatile +set search_path to pg_catalog, {{schema}}, pg_temp +; + +-- me_embed already has DELETE on embedding_queue (granted in 005); +-- this just exposes the function entrypoint. +grant execute on function {{schema}}.prune_embedding_queue(interval) to me_embed; + + + + + +------------------------------------------------------------------------------- +-- api keys +------------------------------------------------------------------------------- +-- Engine-scoped, user-scoped authentication +create table {{schema}}.api_key +( id uuid primary key default uuidv7() check (uuid_extract_version(id) = 7) +, user_id uuid not null references {{schema}}."user" on delete cascade +, lookup_id text unique not null check (lookup_id ~ '^[A-Za-z0-9_-]{16}$') +, key_hash text not null +, name text not null +, expires_at timestamptz +, created_at timestamptz not null default now() +, revoked_at timestamptz +); + +create index idx_api_key_user on {{schema}}.api_key (user_id); +create index idx_api_key_lookup on {{schema}}.api_key (lookup_id) where revoked_at is null; diff --git a/proto/create.ts b/proto/create.ts new file mode 100644 index 0000000..bdd2884 --- /dev/null +++ b/proto/create.ts @@ -0,0 +1,26 @@ +const variables: Record = { + schema: "me", + embedding_dimensions: "1536", + bm25_text_config: "english", + bm25_k1: "1.2", + bm25_b: "0.75", + hnsw_m: "16", + hnsw_ef_construction: "64", +}; + +const input = await Bun.file(new URL("./create.sql", import.meta.url)).text(); + +const output = input.replace(/\{\{([a-zA-Z0-9_]+)\}\}/g, (match, name) => { + const value = variables[name]; + if (value === undefined) { + throw new Error(`No value configured for placeholder ${match}`); + } + return value; +}); + +const unresolved = output.match(/\{\{[a-zA-Z0-9_]+\}\}/g); +if (unresolved) { + throw new Error(`Unresolved placeholders: ${unresolved.join(", ")}`); +} + +process.stdout.write(output); diff --git a/proto/data.sql b/proto/data.sql new file mode 100644 index 0000000..8ab14c8 --- /dev/null +++ b/proto/data.sql @@ -0,0 +1,58 @@ + + +------------------------------------------------------------------------------- +-- testing data +------------------------------------------------------------------------------- +begin; + +insert into me."user" (id, name, superuser) +values + ('019e2833-f217-7457-ba8b-f110393b6d1c', 'user_0', true) +, ('019e2833-f217-74a0-84ca-49e9655ed2e2', 'user_1', true) +, ('019e2833-f217-74a9-9860-fde559ebc44f', 'user_2', false) +, ('019e2833-f217-74af-b5f5-c0cd0beb78ab', 'user_3', false) +, ('019e2833-f217-74b6-97f5-bad28152696d', 'user_4', false) +, ('019e2833-f217-74bc-9ae6-54f009c08d3e', 'user_5', false) +, ('019e2833-f217-74c2-a612-9ccc17e11380', 'user_6', false) +, ('019e2833-f217-74c8-8eab-bcfcedc95d29', 'user_7', false) +, ('019e2833-f217-74ce-8820-6ea10aebd123', 'user_8', false) +, ('019e2833-f217-74d5-b22f-0091833bf484', 'user_9', false) +; + +insert into me."user" (id, name, superuser) +values + ('019e2835-3ece-7cbc-a450-4abb1d3437c2','role_0', true ) +, ('019e2835-3ece-7d03-91bb-61c94fa959a5','role_0.1', false) +, ('019e2835-3ece-7d0b-bf85-7b8707750774','role_1', false) +, ('019e2835-3ece-7d11-aaa5-24414460784f','role_1.1', false) +, ('019e2835-3ece-7d17-8d9d-7291258c8d0b','role_1.2', false) +, ('019e2835-3ece-7d1e-9acd-b4597611d70c','role_1.2.1', false) +; + +-- roles to roles +select me.grant_role_membership('019e2833-f217-7457-ba8b-f110393b6d1c', '019e2835-3ece-7cbc-a450-4abb1d3437c2', '019e2835-3ece-7d03-91bb-61c94fa959a5'); +select me.grant_role_membership('019e2833-f217-7457-ba8b-f110393b6d1c', '019e2835-3ece-7d0b-bf85-7b8707750774', '019e2835-3ece-7d11-aaa5-24414460784f'); +select me.grant_role_membership('019e2833-f217-7457-ba8b-f110393b6d1c', '019e2835-3ece-7d0b-bf85-7b8707750774', '019e2835-3ece-7d17-8d9d-7291258c8d0b'); +select me.grant_role_membership('019e2833-f217-7457-ba8b-f110393b6d1c', '019e2835-3ece-7d17-8d9d-7291258c8d0b', '019e2835-3ece-7d1e-9acd-b4597611d70c'); + +-- add users to roles +select me.grant_role_membership('019e2833-f217-7457-ba8b-f110393b6d1c', '019e2835-3ece-7cbc-a450-4abb1d3437c2', '019e2833-f217-74a9-9860-fde559ebc44f'); +select me.grant_role_membership('019e2833-f217-7457-ba8b-f110393b6d1c', '019e2835-3ece-7d03-91bb-61c94fa959a5', '019e2833-f217-74af-b5f5-c0cd0beb78ab'); +select me.grant_role_membership('019e2833-f217-7457-ba8b-f110393b6d1c', '019e2835-3ece-7d0b-bf85-7b8707750774', '019e2833-f217-74b6-97f5-bad28152696d'); +select me.grant_role_membership('019e2833-f217-7457-ba8b-f110393b6d1c', '019e2835-3ece-7d11-aaa5-24414460784f', '019e2833-f217-74bc-9ae6-54f009c08d3e'); +select me.grant_role_membership('019e2833-f217-7457-ba8b-f110393b6d1c', '019e2835-3ece-7d17-8d9d-7291258c8d0b', '019e2833-f217-74c2-a612-9ccc17e11380'); +select me.grant_role_membership('019e2833-f217-7457-ba8b-f110393b6d1c', '019e2835-3ece-7d1e-9acd-b4597611d70c', '019e2833-f217-74c8-8eab-bcfcedc95d29'); +select me.grant_role_membership('019e2833-f217-7457-ba8b-f110393b6d1c', '019e2835-3ece-7d1e-9acd-b4597611d70c', '019e2833-f217-74ce-8820-6ea10aebd123'); +select me.grant_role_membership('019e2833-f217-7457-ba8b-f110393b6d1c', '019e2835-3ece-7d1e-9acd-b4597611d70c', '019e2833-f217-74d5-b22f-0091833bf484'); +commit; + + +insert into me.tree_grant +( user_id +, tree_path +, actions +, granted_by +) +values + ('019e2833-f217-74ce-8820-6ea10aebd123', 'wikipedia.air_force', array['read'], '019e2833-f217-7457-ba8b-f110393b6d1c') +; diff --git a/scripts/migrate-engine-core.ts b/scripts/migrate-engine-core.ts new file mode 100644 index 0000000..2800a40 --- /dev/null +++ b/scripts/migrate-engine-core.ts @@ -0,0 +1,29 @@ +#!/usr/bin/env bun + +import { SQL } from "bun"; +import { bootstrapEngineDatabase } from "../packages/engine-core/migrate/bootstrap"; +import { migrateEngine } from "../packages/engine-core/migrate/migrate"; +import { slugToSchema } from "../packages/engine-core/slug"; + +const ENGINE_SLUG = "dev000000001"; +const TARGET_VERSION = process.env.TARGET_VERSION ?? "0.1.0"; +const DATABASE_URL = + process.env.DATABASE_URL ?? + process.env.ENGINE_DATABASE_URL ?? + "postgresql://postgres@localhost:5432/postgres"; + +const sql = new SQL(DATABASE_URL); + +try { + console.log( + `Bootstrapping engine database and migrating ${slugToSchema(ENGINE_SLUG)} to ${TARGET_VERSION}`, + ); + await bootstrapEngineDatabase(sql); + await migrateEngine(sql, { + slug: ENGINE_SLUG, + targetVersion: TARGET_VERSION, + }); + console.log(`Engine schema ${slugToSchema(ENGINE_SLUG)} is up to date.`); +} finally { + await sql.close(); +}