diff --git a/.env.sample b/.env.sample index 74a6131..7ab64b6 100644 --- a/.env.sample +++ b/.env.sample @@ -57,22 +57,6 @@ GOOGLE_CLIENT_SECRET= # Cron schedule for cleaning up expired device authorizations (UTC) # DEVICE_FLOW_CLEANUP_CRON=*/15 * * * * -# ----------------------------------------------------------------------------- -# Optional — Embedding -# ----------------------------------------------------------------------------- - -# Base URL for the embedding API (omit for OpenAI default) -# EMBEDDING_BASE_URL= - -# Timeout per embedding API call (ms) -# EMBEDDING_TIMEOUT_MS= - -# Max retries on transient failures (default: 2, from Vercel AI SDK) -# EMBEDDING_MAX_RETRIES= - -# Max concurrent chunk requests for batch embedding (default: Infinity) -# EMBEDDING_MAX_PARALLEL_CALLS= - # ----------------------------------------------------------------------------- # Optional — Telemetry # ----------------------------------------------------------------------------- @@ -144,6 +128,22 @@ GOOGLE_CLIENT_SECRET= # PostgreSQL idle-in-transaction timeout for engine DB transactions # ENGINE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT=30s +# ----------------------------------------------------------------------------- +# Optional — Embedding +# ----------------------------------------------------------------------------- + +# Base URL for the embedding API (omit for OpenAI default) +# EMBEDDING_BASE_URL= + +# Timeout per embedding API call (ms) +# EMBEDDING_TIMEOUT_MS= + +# Max retries on transient failures (default: 2, from Vercel AI SDK) +# EMBEDDING_MAX_RETRIES= + +# Max concurrent chunk requests for batch embedding (default: Infinity) +# EMBEDDING_MAX_PARALLEL_CALLS= + # ----------------------------------------------------------------------------- # Optional — Embedding Worker # ----------------------------------------------------------------------------- @@ -166,6 +166,30 @@ GOOGLE_CLIENT_SECRET= # Engine re-discovery interval (ms) # WORKER_REFRESH_INTERVAL_MS=60000 +# ----------------------------------------------------------------------------- +# Optional — Embedding Worker Engine Database +# ----------------------------------------------------------------------------- + +# PostgreSQL connection string for embedding worker engine traffic. +# Defaults to ENGINE_DATABASE_URL when unset. +# WORKER_ENGINE_DATABASE_URL=postgres://postgres:postgres@localhost:5432/me + +# Maximum connections in the dedicated embedding worker engine pool. +# Defaults to max(WORKER_COUNT, 1) when unset. +# WORKER_ENGINE_POOL_MAX=2 + +# Close idle embedding worker engine connections after N seconds. +# Defaults to ENGINE_POOL_IDLE_REAP_SECONDS when unset. +# WORKER_ENGINE_POOL_IDLE_REAP_SECONDS=300 + +# Max embedding worker engine connection lifetime in seconds (0 = forever). +# Defaults to ENGINE_POOL_MAX_LIFETIME when unset. +# WORKER_ENGINE_POOL_MAX_LIFETIME=0 + +# Timeout for establishing embedding worker engine connections (seconds). +# Defaults to ENGINE_POOL_CONNECTION_TIMEOUT when unset. +# WORKER_ENGINE_POOL_CONNECTION_TIMEOUT=30 + # PostgreSQL statement timeout for embedding worker engine DB transactions # Defaults to ENGINE_STATEMENT_TIMEOUT when unset. # WORKER_ENGINE_STATEMENT_TIMEOUT=25s diff --git a/packages/server/index.ts b/packages/server/index.ts index a9399a0..c3f07d7 100644 --- a/packages/server/index.ts +++ b/packages/server/index.ts @@ -8,6 +8,10 @@ import { } from "@memory.build/engine/migrate"; import { bootstrap as bootstrapEngine } from "@memory.build/engine/migrate/bootstrap"; import { migrateAll as migrateEngines } from "@memory.build/engine/migrate/runner"; +import { + DEFAULT_ENGINE_TIMEOUTS, + type EngineTimeouts, +} from "@memory.build/engine/ops/_tx"; import { WorkerPool } from "@memory.build/worker"; import { configure, info, reportError, span } from "@pydantic/logfire-node"; import { MIN_CLIENT_VERSION, SERVER_VERSION } from "../../version"; @@ -90,6 +94,17 @@ configure({ // ENGINE_TRANSACTION_TIMEOUT - Per-engine-transaction timeout (default: 30s) // ENGINE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT - Idle-in-transaction timeout (default: 30s) // +// Embedding Worker Engine Database: +// WORKER_ENGINE_DATABASE_URL - PostgreSQL connection string for worker engine traffic (default: ENGINE_DATABASE_URL) +// WORKER_ENGINE_POOL_MAX - Max worker engine connections (default: WORKER_COUNT) +// WORKER_ENGINE_POOL_IDLE_REAP_SECONDS - Close idle pooled connections after N seconds (default: ENGINE_POOL_IDLE_REAP_SECONDS) +// WORKER_ENGINE_POOL_MAX_LIFETIME - Max lifetime in seconds, 0=forever (default: ENGINE_POOL_MAX_LIFETIME) +// WORKER_ENGINE_POOL_CONNECTION_TIMEOUT - Connection timeout in seconds (default: ENGINE_POOL_CONNECTION_TIMEOUT) +// WORKER_ENGINE_STATEMENT_TIMEOUT - Worker engine query timeout (default: ENGINE_STATEMENT_TIMEOUT) +// WORKER_ENGINE_LOCK_TIMEOUT - Worker engine lock wait timeout (default: ENGINE_LOCK_TIMEOUT) +// WORKER_ENGINE_TRANSACTION_TIMEOUT - Worker engine transaction timeout (default: ENGINE_TRANSACTION_TIMEOUT) +// WORKER_ENGINE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT - Worker engine idle-in-transaction timeout (default: ENGINE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT) +// // Cleanup: // DEVICE_FLOW_CLEANUP_CRON - Cron schedule for cleaning up expired device auths // (default: "*/15 * * * *" = every 15 minutes, UTC) @@ -101,10 +116,6 @@ configure({ // WORKER_IDLE_DELAY_MS - Poll interval when idle in ms (default: 10000) // WORKER_MAX_BACKOFF_MS - Max error backoff in ms (default: 60000) // WORKER_REFRESH_INTERVAL_MS - Engine re-discovery interval in ms (default: 60000) -// WORKER_ENGINE_STATEMENT_TIMEOUT - Worker engine query timeout (default: ENGINE_STATEMENT_TIMEOUT) -// WORKER_ENGINE_LOCK_TIMEOUT - Worker engine lock wait timeout (default: ENGINE_LOCK_TIMEOUT) -// WORKER_ENGINE_TRANSACTION_TIMEOUT - Worker engine transaction timeout (default: ENGINE_TRANSACTION_TIMEOUT) -// WORKER_ENGINE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT - Worker engine idle-in-transaction timeout (default: ENGINE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT) // // ============================================================================= @@ -153,6 +164,12 @@ const deviceFlowCleanupCron = const accountsSchema = process.env.ACCOUNTS_SCHEMA || "accounts"; +const workerCount = parseIntEnv( + "WORKER_COUNT", + process.env.WORKER_COUNT || "", + "2", +); + // Connection pool settings - Accounts database const accountsPoolMax = parseIntEnv( "ACCOUNTS_POOL_MAX", @@ -197,6 +214,44 @@ const enginePoolConnectionTimeout = parseIntEnv( "30", ); +// Connection pool settings - Embedding worker engine database +const workerEngineDatabaseUrl = + process.env.WORKER_ENGINE_DATABASE_URL || engineDatabaseUrl; +const workerEnginePoolMax = parseIntEnv( + "WORKER_ENGINE_POOL_MAX", + process.env.WORKER_ENGINE_POOL_MAX || "", + String(Math.max(workerCount, 1)), +); +const workerEnginePoolIdleReapSeconds = parseIntEnv( + "WORKER_ENGINE_POOL_IDLE_REAP_SECONDS", + process.env.WORKER_ENGINE_POOL_IDLE_REAP_SECONDS || "", + String(enginePoolIdleReapSeconds), +); +const workerEnginePoolMaxLifetime = parseIntEnv( + "WORKER_ENGINE_POOL_MAX_LIFETIME", + process.env.WORKER_ENGINE_POOL_MAX_LIFETIME || "", + String(enginePoolMaxLifetime), +); +const workerEnginePoolConnectionTimeout = parseIntEnv( + "WORKER_ENGINE_POOL_CONNECTION_TIMEOUT", + process.env.WORKER_ENGINE_POOL_CONNECTION_TIMEOUT || "", + String(enginePoolConnectionTimeout), +); +const workerEngineTimeouts: EngineTimeouts = { + statementTimeout: + process.env.WORKER_ENGINE_STATEMENT_TIMEOUT ?? + DEFAULT_ENGINE_TIMEOUTS.statementTimeout, + lockTimeout: + process.env.WORKER_ENGINE_LOCK_TIMEOUT ?? + DEFAULT_ENGINE_TIMEOUTS.lockTimeout, + transactionTimeout: + process.env.WORKER_ENGINE_TRANSACTION_TIMEOUT ?? + DEFAULT_ENGINE_TIMEOUTS.transactionTimeout, + idleInTransactionSessionTimeout: + process.env.WORKER_ENGINE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT ?? + DEFAULT_ENGINE_TIMEOUTS.idleInTransactionSessionTimeout, +}; + // ============================================================================= // Embedding Config // ============================================================================= @@ -305,6 +360,13 @@ const engineSql = new Bun.SQL(engineDatabaseUrl, { connectionTimeout: enginePoolConnectionTimeout, }); +const workerEngineSql = new Bun.SQL(workerEngineDatabaseUrl, { + max: workerEnginePoolMax, + idleTimeout: workerEnginePoolIdleReapSeconds, + maxLifetime: workerEnginePoolMaxLifetime, + connectionTimeout: workerEnginePoolConnectionTimeout, +}); + // Create accounts DB with operations layer const accountsDb = createAccountsDB(accountsSql, accountsSchema, { masterKey: masterKeyBuffer, @@ -410,13 +472,7 @@ const router = createRouter(serverContext); // Embedding Worker Pool // ============================================================================= -const workerCount = parseIntEnv( - "WORKER_COUNT", - process.env.WORKER_COUNT || "", - "2", -); - -const workerPool = new WorkerPool(engineSql, { +const workerPool = new WorkerPool(workerEngineSql, { embedding: embeddingConfig, discover: async () => { const engines = await accountsDb.listActiveEngines(); @@ -446,6 +502,7 @@ const workerPool = new WorkerPool(engineSql, { process.env.WORKER_REFRESH_INTERVAL_MS || "", "60000", ), + workerEngineTimeouts, }); await workerPool.start(workerCount); @@ -535,6 +592,7 @@ async function shutdown() { try { await accountsSql.close(); await engineSql.close(); + await workerEngineSql.close(); } catch (error) { reportError("Error closing database connections", error as Error); } diff --git a/packages/worker/pool.ts b/packages/worker/pool.ts index cd178ef..0d4a806 100644 --- a/packages/worker/pool.ts +++ b/packages/worker/pool.ts @@ -3,7 +3,7 @@ import type { WorkerConfig, WorkerStats } from "./types"; import { Worker } from "./worker"; /** - * Pool of N embedding workers sharing a single SQL connection pool. + * Pool of N embedding workers using the provided SQL connection pool. * Each worker independently discovers engines, shuffles its target list, * and polls queues. FOR UPDATE SKIP LOCKED prevents double-processing. */ diff --git a/packages/worker/process.ts b/packages/worker/process.ts index 9532923..93ce22b 100644 --- a/packages/worker/process.ts +++ b/packages/worker/process.ts @@ -12,28 +12,19 @@ import { info, reportError, span, warning } from "@pydantic/logfire-node"; import type { SQL } from "bun"; import type { EngineTarget, ProcessResult, WorkerConfig } from "./types"; -const WORKER_ENGINE_TIMEOUTS: EngineTimeouts = { - statementTimeout: - process.env.WORKER_ENGINE_STATEMENT_TIMEOUT ?? - DEFAULT_ENGINE_TIMEOUTS.statementTimeout, - lockTimeout: - process.env.WORKER_ENGINE_LOCK_TIMEOUT ?? - DEFAULT_ENGINE_TIMEOUTS.lockTimeout, - transactionTimeout: - process.env.WORKER_ENGINE_TRANSACTION_TIMEOUT ?? - DEFAULT_ENGINE_TIMEOUTS.transactionTimeout, - idleInTransactionSessionTimeout: - process.env.WORKER_ENGINE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT ?? - DEFAULT_ENGINE_TIMEOUTS.idleInTransactionSessionTimeout, -}; +function workerEngineTimeouts(config?: WorkerConfig): EngineTimeouts { + return config?.workerEngineTimeouts ?? DEFAULT_ENGINE_TIMEOUTS; +} -const WORKER_ENGINE_TIMEOUT_ATTRIBUTES = { - "db.statement_timeout": WORKER_ENGINE_TIMEOUTS.statementTimeout, - "db.lock_timeout": WORKER_ENGINE_TIMEOUTS.lockTimeout, - "db.transaction_timeout": WORKER_ENGINE_TIMEOUTS.transactionTimeout, - "db.idle_in_transaction_session_timeout": - WORKER_ENGINE_TIMEOUTS.idleInTransactionSessionTimeout, -}; +function workerEngineTimeoutAttributes(timeouts: EngineTimeouts) { + return { + "db.statement_timeout": timeouts.statementTimeout, + "db.lock_timeout": timeouts.lockTimeout, + "db.transaction_timeout": timeouts.transactionTimeout, + "db.idle_in_transaction_session_timeout": + timeouts.idleInTransactionSessionTimeout, + }; +} function asError(error: unknown): Error { return error instanceof Error ? error : new Error(String(error)); @@ -49,11 +40,13 @@ export async function pruneQueue( sql: SQL, target: EngineTarget, retention: string, + config?: WorkerConfig, ): Promise { const { schema, shard } = target; + const timeouts = workerEngineTimeouts(config); return sql.begin(async (tx) => { await tx.unsafe(`SET LOCAL pgdog.shard TO ${shard}`); - await setLocalEngineTimeouts(tx, WORKER_ENGINE_TIMEOUTS); + await setLocalEngineTimeouts(tx, timeouts); await tx.unsafe(`SET LOCAL search_path TO ${schema}, public`); await tx.unsafe("SET LOCAL ROLE me_embed"); const rows = (await tx.unsafe( @@ -85,12 +78,14 @@ export async function processBatch( const { schema, shard } = target; const batchSize = config.batchSize ?? 10; const lockDuration = config.lockDuration ?? "5 minutes"; + const timeouts = workerEngineTimeouts(config); + const timeoutAttributes = workerEngineTimeoutAttributes(timeouts); // --- Claim --- const claimStart = performance.now(); const claimed = await sql.begin(async (tx) => { await tx.unsafe(`SET LOCAL pgdog.shard TO ${shard}`); - await setLocalEngineTimeouts(tx, WORKER_ENGINE_TIMEOUTS); + await setLocalEngineTimeouts(tx, timeouts); await tx.unsafe(`SET LOCAL search_path TO ${schema}, public`); await tx.unsafe("SET LOCAL ROLE me_embed"); return tx.unsafe( @@ -113,7 +108,7 @@ export async function processBatch( "batch.claim_duration_ms": claimDurationMs, "batch.memoryIds": claimed.map((r) => r.memory_id), "batch.queueIds": claimed.map((r) => r.queue_id), - ...WORKER_ENGINE_TIMEOUT_ATTRIBUTES, + ...timeoutAttributes, }); // Process claimed items with telemetry @@ -127,7 +122,7 @@ export async function processBatch( "batch.claim_duration_ms": claimDurationMs, "batch.memoryIds": claimed.map((r) => r.memory_id), "batch.queueIds": claimed.map((r) => r.queue_id), - ...WORKER_ENGINE_TIMEOUT_ATTRIBUTES, + ...timeoutAttributes, }, callback: async () => { // --- Embed --- @@ -145,7 +140,7 @@ export async function processBatch( // and should not consume max_attempts await sql.begin(async (tx) => { await tx.unsafe(`SET LOCAL pgdog.shard TO ${shard}`); - await setLocalEngineTimeouts(tx, WORKER_ENGINE_TIMEOUTS); + await setLocalEngineTimeouts(tx, timeouts); await tx.unsafe(`SET LOCAL search_path TO ${schema}, public`); await tx.unsafe("SET LOCAL ROLE me_embed"); for (const row of claimed) { @@ -184,14 +179,14 @@ export async function processBatch( "batch.size": claimed.length, "batch.embed_successes": embedResults.filter((r) => !r.error).length, "batch.embed_errors": embedResults.filter((r) => r.error).length, - ...WORKER_ENGINE_TIMEOUT_ATTRIBUTES, + ...timeoutAttributes, }, callback: async () => { for (const row of claimed) { try { await sql.begin(async (tx) => { await tx.unsafe(`SET LOCAL pgdog.shard TO ${shard}`); - await setLocalEngineTimeouts(tx, WORKER_ENGINE_TIMEOUTS); + await setLocalEngineTimeouts(tx, timeouts); await tx.unsafe(`SET LOCAL search_path TO ${schema}, public`); await tx.unsafe("SET LOCAL ROLE me_embed"); @@ -258,7 +253,7 @@ export async function processBatch( try { await sql.begin(async (tx) => { await tx.unsafe(`SET LOCAL pgdog.shard TO ${shard}`); - await setLocalEngineTimeouts(tx, WORKER_ENGINE_TIMEOUTS); + await setLocalEngineTimeouts(tx, timeouts); await tx.unsafe(`SET LOCAL search_path TO ${schema}, public`); await tx.unsafe("SET LOCAL ROLE me_embed"); await tx.unsafe( diff --git a/packages/worker/types.ts b/packages/worker/types.ts index 7796c89..1388cbf 100644 --- a/packages/worker/types.ts +++ b/packages/worker/types.ts @@ -1,4 +1,5 @@ import type { EmbeddingConfig } from "@memory.build/embedding"; +import type { EngineTimeouts } from "@memory.build/engine/ops/_tx"; export interface EngineTarget { schema: string; @@ -19,6 +20,8 @@ export interface WorkerConfig { maxBackoffMs?: number; /** How often to re-discover engines (default: 60_000ms) */ refreshIntervalMs?: number; + /** PostgreSQL transaction/session timeouts for worker engine DB work */ + workerEngineTimeouts?: EngineTimeouts; /** Exit gracefully after this much idle time (optional) */ drainTimeoutMs?: number; /** diff --git a/packages/worker/worker.ts b/packages/worker/worker.ts index ffe7915..055d45c 100644 --- a/packages/worker/worker.ts +++ b/packages/worker/worker.ts @@ -169,7 +169,12 @@ async function run( // terminal queue rows. Best-effort: failures are logged but do // not trigger the worker error backoff path. try { - const pruned = await pruneQueue(sql, target, pruneRetention); + const pruned = await pruneQueue( + sql, + target, + pruneRetention, + config, + ); stats.totalPruned += pruned; } catch (pruneError) { if (isMissingSchemaError(pruneError)) {