From 66889d047cad77c0afc74e5663cb434de45aebf3 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Sat, 25 Apr 2026 20:56:43 -0600 Subject: [PATCH 1/7] Add CachingRocksDatabase with VT-backed WeakLRUCache for primary stores Introduces CachingRocksDatabase, a RocksDatabase subclass that layers a per-instance WeakLRUCache on top of the process-wide VerificationTable. Cache freshness is checked via entry.version (Harper's localTime timestamp) rather than a txnId, so reads hitting a fresh VT slot skip disk entirely. Primary RocksDB stores now use CachingRocksDatabase instead of RocksDatabase. DatabaseTransaction creates transactions with coordinatedRetry: true, so IsBusy conflicts are signalled as RETRY_NOW_VALUE (immediate retry, no backoff) rather than an ERR_BUSY rejection. The old quadratic-backoff retry handler is removed. Co-Authored-By: Claude Sonnet 4.6 --- package.json | 1 + resources/CachingRocksDatabase.ts | 84 +++++++++++++++++++++++++++++++ resources/DatabaseTransaction.ts | 30 ++++------- resources/databases.ts | 3 +- 4 files changed, 97 insertions(+), 21 deletions(-) create mode 100644 resources/CachingRocksDatabase.ts diff --git a/package.json b/package.json index f86b9d328..bec6ddf2c 100644 --- a/package.json +++ b/package.json @@ -196,6 +196,7 @@ "jsonata": "1.8.7", "jsonwebtoken": "9.0.3", "lmdb": "3.5.4", + "weak-lru-cache": "^1.2.2", "lodash": "^4.17.23", "mathjs": "11.12.0", "micromatch": "^4.0.8", diff --git a/resources/CachingRocksDatabase.ts b/resources/CachingRocksDatabase.ts new file mode 100644 index 000000000..286eacfa8 --- /dev/null +++ b/resources/CachingRocksDatabase.ts @@ -0,0 +1,84 @@ +import { RocksDatabase, type RocksDatabaseOptions, constants, type Store } from '@harperfast/rocksdb-js'; +import { WeakLRUCache } from 'weak-lru-cache'; +import { when } from '../utility/when.ts'; + +const FRESH_VERSION_FLAG = constants.FRESH_VERSION_FLAG; + +/** + * RocksDatabase subclass that layers a per-instance WeakLRUCache on top of the + * verification table. Cache entries are decoded metadata objects (the shape returned by + * RecordEncoder.decode for RocksDB records: `{ version, value, [METADATA], ... }`). + * Freshness is verified with `entry.version` via the process-wide VerificationTable, + * so reads that hit a fresh VT slot never touch the disk. + */ +export class CachingRocksDatabase extends RocksDatabase { + #cache = new WeakLRUCache(); + + constructor(pathOrStore: string | Store, options?: RocksDatabaseOptions) { + super(pathOrStore, { ...options, verificationTable: true }); + } + + getSync(id: any, options?: any): any { + if (options?.transaction) { + return super.getSync(id, options); + } + const cachedValue = this.#cache.getValue(id); + if (cachedValue !== undefined && cachedValue.version) { + const result = super.getSync(id, { ...options, expectedVersion: cachedValue.version }); + if (result === FRESH_VERSION_FLAG) return cachedValue; + if (result === undefined) { + this.#cache.delete(id); + return undefined; + } + this.#cache.setValue(id, result, result.size >> 10); + return result; + } + const result = super.getSync(id, options); + if (result !== undefined) { + this.#cache.setValue(id, result, result.size >> 10); + } + return result; + } + + get(id: any, options?: any): any { + if (options?.transaction) { + return super.get(id, options); + } + const cachedValue = this.#cache.getValue(id); + if (cachedValue !== undefined && cachedValue.version) { + return when(super.get(id, { ...options, expectedVersion: cachedValue.version }), (result) => { + if (result === FRESH_VERSION_FLAG) return cachedValue; + if (result === undefined) { + this.#cache.delete(id); + return undefined; + } + this.#cache.setValue(id, result, result.size >> 10); + return result; + }); + } + return when(super.get(id, options), (result) => { + if (result !== undefined) { + this.#cache.setValue(id, result, result.size >> 10); + } + return result; + }); + } + + putSync(id: any, value: any, options?: any): any { + this.#cache.delete(id); + return super.putSync(id, value, options); + } + + removeSync(id: any, options?: any): any { + this.#cache.delete(id); + return super.removeSync(id, options); + } + + open(): CachingRocksDatabase { + return super.open() as CachingRocksDatabase; + } + + static open(pathOrStore: string | Store, options?: RocksDatabaseOptions): CachingRocksDatabase { + return new CachingRocksDatabase(pathOrStore, options).open(); + } +} diff --git a/resources/DatabaseTransaction.ts b/resources/DatabaseTransaction.ts index 424d38996..6a6565bfd 100644 --- a/resources/DatabaseTransaction.ts +++ b/resources/DatabaseTransaction.ts @@ -7,8 +7,8 @@ import * as envMngr from '../utility/environment/environmentManager.js'; import { CONFIG_PARAMS } from '../utility/hdbTerms.ts'; import { convertToMS } from '../utility/common_utils.js'; import { when } from '../utility/when.ts'; -import { setTimeout as delay } from 'node:timers/promises'; -import { Transaction as RocksTransaction, type Store as RocksStore } from '@harperfast/rocksdb-js'; +import { Transaction as RocksTransaction, type Store as RocksStore, constants } from '@harperfast/rocksdb-js'; +const RETRY_NOW_VALUE = constants.RETRY_NOW_VALUE; import type { RootDatabaseKind } from './databases.ts'; import type { Entry } from './RecordEncoder.ts'; @@ -94,7 +94,7 @@ export class DatabaseTransaction implements Transaction { } if (this.open !== TRANSACTION_STATE.OPEN) return; // can not start a new read transaction as there is no future commit that will take place, just have to allow the read to latest database state - this.transaction = new RocksTransaction(this.db.store); + this.transaction = new RocksTransaction(this.db.store, { coordinatedRetry: true }); if (this.timestamp) { this.transaction.setTimestamp(this.timestamp); @@ -262,7 +262,12 @@ export class DatabaseTransaction implements Transaction { } const completions = []; return commitResolution.then( - () => { + (commitResult) => { + if (commitResult === RETRY_NOW_VALUE) { + this.retries++; + harperLogger.debug?.('coordinated retry', transaction.id, this.retries); + return this.commit({ transaction }); + } transaction.onCommit?.(); if (this.next) { completions.push(this.next.commit(options)); @@ -298,22 +303,7 @@ export class DatabaseTransaction implements Transaction { }); }, (error) => { - if (error.code === 'ERR_BUSY') { - // if the transaction failed due to concurrent changes, we need to retry. First record this as an increased risk of contention/retry - // for future transactions - this.retries++; - harperLogger.debug?.('retrying', transaction.id, this.retries); - if (this.retries > 2) { - if (this.retries > MAX_RETRIES) { - throw new ServerError( - `After ${MAX_RETRIES} retries, unable to commit transaction, transaction is in conflict with ongoing writes` - ); - } - // start delaying, back off to try to space out transactions and avoid excessive conflicts - return delay(this.retries * this.retries).then(() => this.commit({ transaction })); - } - return this.commit({ transaction }); // try again - } else throw error; + throw error; } ); } diff --git a/resources/databases.ts b/resources/databases.ts index 4b5873532..71034e0a4 100644 --- a/resources/databases.ts +++ b/resources/databases.ts @@ -27,6 +27,7 @@ import { deleteRootBlobPathsForDB } from './blob.ts'; import { CUSTOM_INDEXES } from './indexes/customIndexes.ts'; import { OpenDBIObject } from '../utility/lmdb/OpenDBIObject.js'; import { RocksDatabase, type RocksDatabaseOptions } from '@harperfast/rocksdb-js'; +import { CachingRocksDatabase } from './CachingRocksDatabase.ts'; import { replayLogs } from './replayLogs.ts'; import { totalmem } from 'node:os'; import { RocksIndexStore } from './RocksIndexStore.ts'; @@ -122,7 +123,7 @@ function openRocksDatabase(path: string, options: RocksDatabaseOptions & { dupSo if (options.dupSort) { db = new RocksIndexStore(path, options).open() as RocksDatabaseEx; } else { - db = RocksDatabase.open(path, options) as RocksDatabaseEx; + db = new CachingRocksDatabase(path, options).open() as RocksDatabaseEx; // the RocksDB put and remove return promises, which masks thrown errors in non-awaiting calls to put/remove, // making them unsafe to replace LMDB methods, which will synchronously throw errors if there is a problem db.put = db.putSync; From 1876c0a1db43f5a02893ad1c25e3823ed9b2b2c0 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Sat, 25 Apr 2026 22:03:27 -0600 Subject: [PATCH 2/7] Add unit tests for CachingRocksDatabase Tests cover: - primaryStore is a CachingRocksDatabase (and RocksDatabase) instance - Basic read/write correctness - VT slot population after two reads enabling the fast-path - Third read returns value without DB access (VT fast-path) - Write clears VT slot so stale version cannot verify - Remove clears cache; subsequent read returns undefined - Transaction-scoped reads bypass cache and return correct value - Concurrent writes complete without error via coordinatedRetry - Write-then-read returns updated value, not stale cache Tests are skipped when HARPER_STORAGE_ENGINE=lmdb. Co-Authored-By: Claude Sonnet 4.6 --- .../resources/caching-rocks-database.test.js | 126 ++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 unitTests/resources/caching-rocks-database.test.js diff --git a/unitTests/resources/caching-rocks-database.test.js b/unitTests/resources/caching-rocks-database.test.js new file mode 100644 index 000000000..a96ce2cc3 --- /dev/null +++ b/unitTests/resources/caching-rocks-database.test.js @@ -0,0 +1,126 @@ +require('../testUtils'); +const assert = require('assert'); +const { setupTestDBPath } = require('../testUtils'); +const { table } = require('#src/resources/databases'); +const { setMainIsWorker } = require('#js/server/threads/manageThreads'); +const { RocksDatabase } = require('@harperfast/rocksdb-js'); +const { CachingRocksDatabase } = require('#src/resources/CachingRocksDatabase'); + +const isLMDB = process.env.HARPER_STORAGE_ENGINE === 'lmdb'; + +describe('CachingRocksDatabase', function () { + let TestTable; + + before(async function () { + if (isLMDB) return this.skip(); + setupTestDBPath(); + setMainIsWorker(true); + TestTable = table({ + table: 'CachingRocksTest', + database: 'test', + attributes: [ + { name: 'id', isPrimaryKey: true }, + { name: 'name' }, + ], + }); + }); + + it('Primary store is a CachingRocksDatabase instance', function () { + assert(TestTable.primaryStore instanceof CachingRocksDatabase); + assert(TestTable.primaryStore instanceof RocksDatabase); + }); + + it('Basic read/write returns correct value', async function () { + await TestTable.put(1, { name: 'one' }); + const result = await TestTable.get(1); + assert.equal(result.name, 'one'); + }); + + it('Repeated reads return consistent value', async function () { + await TestTable.put(2, { name: 'two' }); + const first = await TestTable.get(2); + const second = await TestTable.get(2); + assert.equal(first.name, second.name); + }); + + it('VT slot is populated after two reads, enabling fast-path verification', async function () { + await TestTable.put(3, { name: 'three' }); + // First read: cache cold, populates WeakLRUCache, VT not yet populated + await TestTable.get(3); + // Second read: cache warm, passes expectedVersion → soft VT miss populates slot + const entry = await TestTable.primaryStore.getEntry(3); + assert(entry.version, 'entry should have a version after read'); + // VT slot should now hold this version + assert( + TestTable.primaryStore.verifyVersion(3, entry.version), + 'VT slot should be populated after two reads' + ); + }); + + it('Third read hits VT fast path (no DB access needed)', async function () { + await TestTable.put(10, { name: 'ten' }); + await TestTable.get(10); // 1st: populates WeakLRUCache + const entry = await TestTable.primaryStore.getEntry(10); // 2nd: populates VT + assert(entry.version); + // 3rd read: VT slot matches → FRESH returned without DB access; value is still correct + const result = await TestTable.get(10); + assert.equal(result.name, 'ten'); + assert(TestTable.primaryStore.verifyVersion(10, entry.version), 'VT should still hold the version'); + }); + + it('Write clears VT slot so stale cached version cannot be verified', async function () { + await TestTable.put(4, { name: 'four' }); + await TestTable.get(4); + const entry = await TestTable.primaryStore.getEntry(4); + const oldVersion = entry.version; + assert(TestTable.primaryStore.verifyVersion(4, oldVersion), 'VT should be populated before write'); + + // Write clears the VT slot via LockTracker registerIntent/releaseIntent + await TestTable.put(4, { name: 'four updated' }); + assert( + !TestTable.primaryStore.verifyVersion(4, oldVersion), + 'VT slot should be cleared after write' + ); + // New read should return the updated value + const updated = await TestTable.get(4); + assert.equal(updated.name, 'four updated'); + }); + + it('Remove clears cache entry and subsequent read returns undefined', async function () { + await TestTable.put(5, { name: 'five' }); + await TestTable.get(5); // populate cache + await TestTable.remove(5); + const result = await TestTable.get(5); + assert.equal(result, undefined); + }); + + it('Read with a transaction bypasses the cache', async function () { + await TestTable.put(6, { name: 'six' }); + const context = {}; + // transaction-scoped read should still return the correct value + const result = await TestTable.get(6, context); + assert.equal(result.name, 'six'); + }); + + it('Concurrent writes to the same key both complete with coordinatedRetry', async function () { + await TestTable.put(7, { name: 'seven' }); + // Fire two concurrent writes; coordinatedRetry means no ERR_BUSY thrown + const [, ] = await Promise.all([ + TestTable.put(7, { name: 'seven-a' }), + TestTable.put(7, { name: 'seven-b' }), + ]); + const result = await TestTable.get(7); + assert( + result.name === 'seven-a' || result.name === 'seven-b', + `Expected one of the concurrent writes to win, got: ${result.name}` + ); + }); + + it('Write followed by immediate read returns the new value, not stale cache', async function () { + await TestTable.put(8, { name: 'eight' }); + await TestTable.get(8); // populate cache with version T1 + await TestTable.put(8, { name: 'eight updated' }); // clears cache entry + const result = await TestTable.get(8); + assert.equal(result.name, 'eight updated'); + }); +}); From 7bda4886fc5646d3d1e70453a7573c6fb459d6b6 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Sat, 25 Apr 2026 22:19:59 -0600 Subject: [PATCH 3/7] Add benchmark: RocksDatabase vs CachingRocksDatabase MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Benchmarks six scenarios with 2,000 records (~80-byte values each): 1. getSync cold — first read, cache miss (both go to DB) 2. getSync soft VT miss — 2nd read: cache warm, expectedVersion passed, DB read happens but slot is populated + FRESH returned 3. getSync VT fast-path — 3rd+ read: VT slot matches, no DB access 4. getSync hot key — repeated single-key reads 5. get (async) VT hit — async path with warm cache 6. putSync — write throughput (cache invalidation cost) Each scenario reports ops/sec for plain RocksDatabase and CachingRocksDatabase with a speedup ratio, printed as a formatted table. Adds `npm run bench` script. Also fixes `test:unit:resources` to exclude *.bench.js files so they don't run as part of the normal test suite. Co-Authored-By: Claude Sonnet 4.6 --- package.json | 3 +- .../resources/caching-rocks-database.bench.js | 193 ++++++++++++++++++ 2 files changed, 195 insertions(+), 1 deletion(-) create mode 100644 unitTests/resources/caching-rocks-database.bench.js diff --git a/package.json b/package.json index bec6ddf2c..79d6a8157 100644 --- a/package.json +++ b/package.json @@ -62,7 +62,8 @@ "test:unit:all": "npm run test:unit:main && npm run test:unit:apitests && npm run test:unit:resources && npm run test:unit:lmdb", "test:unit:lmdb": "HARPER_STORAGE_ENGINE=lmdb npm run test:unit:resources && HARPER_STORAGE_ENGINE=lmdb npm run test:unit:apitests", "test:unit:components": "mocha 'unitTests/components/**/*.js'", - "test:unit:resources": "mocha 'unitTests/resources/**/*.js'", + "test:unit:resources": "mocha 'unitTests/resources/**/*.js' --exclude 'unitTests/resources/**/*.bench.js'", + "bench": "mocha 'unitTests/resources/caching-rocks-database.bench.js'", "test:unit:bin": "mocha 'unitTests/bin/**/*.js'", "test:unit:apitests": "node ./dist/bin/harper.js stop && mocha 'unitTests/apiTests/**/*-test.mjs'", "test:unit:logging": "mocha 'unitTests/utility/logging/*.js'", diff --git a/unitTests/resources/caching-rocks-database.bench.js b/unitTests/resources/caching-rocks-database.bench.js new file mode 100644 index 000000000..a28810aae --- /dev/null +++ b/unitTests/resources/caching-rocks-database.bench.js @@ -0,0 +1,193 @@ +/** + * Benchmark: RocksDatabase (no VT/cache) vs CachingRocksDatabase (WeakLRUCache + VT) + * + * Run via: npm run bench (or directly with mocha --file unitTests/resources/caching-rocks-database.bench.js) + */ +require('../testUtils'); +const { setupTestDBPath } = require('../testUtils'); +const { setMainIsWorker } = require('#js/server/threads/manageThreads'); +const { RocksDatabase } = require('@harperfast/rocksdb-js'); +const { CachingRocksDatabase } = require('#src/resources/CachingRocksDatabase'); +const { RecordEncoder, handleLocalTimeForGets } = require('#src/resources/RecordEncoder'); +const path = require('path'); +const { mkdirSync } = require('fs'); + +const isLMDB = process.env.HARPER_STORAGE_ENGINE === 'lmdb'; +const RECORD_COUNT = 2_000; +const WARMUP_ROUNDS = 2; // reads before timing (to prime block cache) + +/** Format a number of ops/sec nicely */ +function opsPerSec(n, ms) { + return ((n / ms) * 1000).toFixed(0).padStart(10); +} + +/** Print a formatted results row */ +function row(label, plain, caching) { + const ratio = (plain / caching).toFixed(2); + console.log( + ` ${label.padEnd(38)} | ${opsPerSec(RECORD_COUNT, plain)} op/s` + + ` | ${opsPerSec(RECORD_COUNT, caching)} op/s` + + ` | ${ratio.padStart(5)}x` + ); +} + +function header() { + console.log(''); + console.log( + ` ${'Scenario'.padEnd(38)} | ${'RocksDatabase'.padStart(13)}` + + ` | ${'CachingRocksDB'.padStart(15)}` + + ` | Speedup` + ); + console.log(' ' + '-'.repeat(85)); +} + +describe('Benchmark: RocksDatabase vs CachingRocksDatabase', function () { + this.timeout(120_000); + + let plainDb, cachingDb, dbBase; + const keys = Array.from({ length: RECORD_COUNT }, (_, i) => `record-${String(i).padStart(8, '0')}`); + const values = keys.map((k, i) => ({ id: i, name: k, payload: 'x'.repeat(80) })); + + before(function () { + if (isLMDB) return this.skip(); + setMainIsWorker(true); + dbBase = path.join(setupTestDBPath(), 'bench'); + mkdirSync(dbBase, { recursive: true }); + + const encoderOptions = { + encoder: { Encoder: RecordEncoder }, + disableWAL: true, + name: 'bench', + }; + + // Plain RocksDatabase: standard msgpack + RecordEncoder, no VT or cache + plainDb = RocksDatabase.open(path.join(dbBase, 'plain'), encoderOptions); + plainDb.put = plainDb.putSync; + plainDb.remove = plainDb.removeSync; + plainDb.encoder.name = 'bench'; + handleLocalTimeForGets(plainDb, plainDb); + + // CachingRocksDatabase: same encoder + VT + WeakLRUCache + cachingDb = new CachingRocksDatabase(path.join(dbBase, 'caching'), encoderOptions).open(); + cachingDb.put = cachingDb.putSync; + cachingDb.remove = cachingDb.removeSync; + cachingDb.encoder.name = 'bench'; + handleLocalTimeForGets(cachingDb, cachingDb); + + // Populate both stores with the same records + for (let i = 0; i < RECORD_COUNT; i++) { + plainDb.putSync(keys[i], values[i]); + cachingDb.putSync(keys[i], values[i]); + } + }); + + after(function () { + plainDb?.close?.(); + cachingDb?.close?.(); + }); + + it('prints benchmark results', function () { + header(); + + // ── 1. Cold read: first read of every key (neither cache nor VT populated) ── + { + // prime block cache with WARMUP_ROUNDS reads so disk I/O doesn't dominate + for (let r = 0; r < WARMUP_ROUNDS; r++) { + for (const k of keys) plainDb.getSync(k); + } + // flush CachingRocksDatabase's WeakLRUCache by writing each key + // (putSync deletes from cache), then read once to populate WeakLRUCache + for (const k of keys) cachingDb.putSync(k, values[keys.indexOf(k)]); + + const t0 = performance.now(); + for (const k of keys) plainDb.getSync(k); + const tPlain = performance.now() - t0; + + // cold caching read: WeakLRUCache was cleared by the putSync above + const t1 = performance.now(); + for (const k of keys) cachingDb.getSync(k); + const tCold = performance.now() - t1; + + row('getSync — cold (cache miss)', tPlain, tCold); + } + + // ── 2. Soft VT miss: second read (cache warm, VT not yet populated) ── + { + const t0 = performance.now(); + for (const k of keys) plainDb.getSync(k); + const tPlain = performance.now() - t0; + + // second read: WeakLRUCache is warm, passes expectedVersion, soft VT miss + // → DB read happens but FRESH is returned and VT slot populated + const t1 = performance.now(); + for (const k of keys) cachingDb.getSync(k); + const tSoftMiss = performance.now() - t1; + + row('getSync — soft VT miss (2nd read)', tPlain, tSoftMiss); + } + + // ── 3. VT fast-path: third+ read (VT populated, no DB access) ── + { + const t0 = performance.now(); + for (const k of keys) plainDb.getSync(k); + const tPlain = performance.now() - t0; + + const t1 = performance.now(); + for (const k of keys) cachingDb.getSync(k); + const tVTHit = performance.now() - t1; + + row('getSync — VT fast-path (3rd+ read)', tPlain, tVTHit); + } + + // ── 4. Repeated single key (hot key) ── + { + const hotKey = keys[0]; + const N = RECORD_COUNT; + + const t0 = performance.now(); + for (let i = 0; i < N; i++) plainDb.getSync(hotKey); + const tPlain = performance.now() - t0; + + const t1 = performance.now(); + for (let i = 0; i < N; i++) cachingDb.getSync(hotKey); + const tHot = performance.now() - t1; + + row('getSync — hot single key', tPlain, tHot); + } + + // ── 5. Async get: VT fast-path ── + // (measuring the async path separately since it goes through a different code path) + { + // Ensure VT is populated (do a sync read first) + for (const k of keys) cachingDb.getSync(k); + + const t0 = performance.now(); + for (const k of keys) plainDb.get(k); + const tPlain = performance.now() - t0; + + const t1 = performance.now(); + for (const k of keys) cachingDb.get(k); + const tVTHit = performance.now() - t1; + + row('get (async) — VT fast-path (sync fallback)', tPlain, tVTHit); + } + + // ── 6. Write throughput (putSync) ── + { + const t0 = performance.now(); + for (let i = 0; i < RECORD_COUNT; i++) plainDb.putSync(keys[i], values[i]); + const tPlain = performance.now() - t0; + + const t1 = performance.now(); + for (let i = 0; i < RECORD_COUNT; i++) cachingDb.putSync(keys[i], values[i]); + const tCaching = performance.now() - t1; + + row('putSync', tPlain, tCaching); + } + + console.log(''); + console.log(` Records: ${RECORD_COUNT}, value size: ~80 bytes`); + console.log(` Speedup > 1x means CachingRocksDatabase is faster`); + console.log(''); + }); +}); From 63907bb2a3029ca929505c854b243b068d3f3e14 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Sun, 26 Apr 2026 06:41:21 -0600 Subject: [PATCH 4/7] Redesign CachingRocksDatabase as PrimaryRocksDatabase with proper VT integration Renames CachingRocksDatabase to PrimaryRocksDatabase and moves all RocksDB-specific entry handling (previously done by handleLocalTimeForGets instance patches) into real class methods. Caching now operates at the getEntry level using full Entry objects with `version`, using verifyVersion/populateVersion for VT-based freshness checks rather than expectedVersion. handleLocalTimeForGets delegates to initStore() for PrimaryRocksDatabase instances via the isPrimaryRocksDatabase marker. All 10 new tests and 1720 existing tests pass. Co-Authored-By: Claude Sonnet 4.6 --- resources/CachingRocksDatabase.ts | 84 --------- resources/PrimaryRocksDatabase.ts | 160 ++++++++++++++++++ resources/RecordEncoder.ts | 4 + resources/databases.ts | 4 +- .../resources/caching-rocks-database.bench.js | 87 ++++------ .../resources/caching-rocks-database.test.js | 17 +- 6 files changed, 209 insertions(+), 147 deletions(-) delete mode 100644 resources/CachingRocksDatabase.ts create mode 100644 resources/PrimaryRocksDatabase.ts diff --git a/resources/CachingRocksDatabase.ts b/resources/CachingRocksDatabase.ts deleted file mode 100644 index 286eacfa8..000000000 --- a/resources/CachingRocksDatabase.ts +++ /dev/null @@ -1,84 +0,0 @@ -import { RocksDatabase, type RocksDatabaseOptions, constants, type Store } from '@harperfast/rocksdb-js'; -import { WeakLRUCache } from 'weak-lru-cache'; -import { when } from '../utility/when.ts'; - -const FRESH_VERSION_FLAG = constants.FRESH_VERSION_FLAG; - -/** - * RocksDatabase subclass that layers a per-instance WeakLRUCache on top of the - * verification table. Cache entries are decoded metadata objects (the shape returned by - * RecordEncoder.decode for RocksDB records: `{ version, value, [METADATA], ... }`). - * Freshness is verified with `entry.version` via the process-wide VerificationTable, - * so reads that hit a fresh VT slot never touch the disk. - */ -export class CachingRocksDatabase extends RocksDatabase { - #cache = new WeakLRUCache(); - - constructor(pathOrStore: string | Store, options?: RocksDatabaseOptions) { - super(pathOrStore, { ...options, verificationTable: true }); - } - - getSync(id: any, options?: any): any { - if (options?.transaction) { - return super.getSync(id, options); - } - const cachedValue = this.#cache.getValue(id); - if (cachedValue !== undefined && cachedValue.version) { - const result = super.getSync(id, { ...options, expectedVersion: cachedValue.version }); - if (result === FRESH_VERSION_FLAG) return cachedValue; - if (result === undefined) { - this.#cache.delete(id); - return undefined; - } - this.#cache.setValue(id, result, result.size >> 10); - return result; - } - const result = super.getSync(id, options); - if (result !== undefined) { - this.#cache.setValue(id, result, result.size >> 10); - } - return result; - } - - get(id: any, options?: any): any { - if (options?.transaction) { - return super.get(id, options); - } - const cachedValue = this.#cache.getValue(id); - if (cachedValue !== undefined && cachedValue.version) { - return when(super.get(id, { ...options, expectedVersion: cachedValue.version }), (result) => { - if (result === FRESH_VERSION_FLAG) return cachedValue; - if (result === undefined) { - this.#cache.delete(id); - return undefined; - } - this.#cache.setValue(id, result, result.size >> 10); - return result; - }); - } - return when(super.get(id, options), (result) => { - if (result !== undefined) { - this.#cache.setValue(id, result, result.size >> 10); - } - return result; - }); - } - - putSync(id: any, value: any, options?: any): any { - this.#cache.delete(id); - return super.putSync(id, value, options); - } - - removeSync(id: any, options?: any): any { - this.#cache.delete(id); - return super.removeSync(id, options); - } - - open(): CachingRocksDatabase { - return super.open() as CachingRocksDatabase; - } - - static open(pathOrStore: string | Store, options?: RocksDatabaseOptions): CachingRocksDatabase { - return new CachingRocksDatabase(pathOrStore, options).open(); - } -} diff --git a/resources/PrimaryRocksDatabase.ts b/resources/PrimaryRocksDatabase.ts new file mode 100644 index 000000000..34f625e58 --- /dev/null +++ b/resources/PrimaryRocksDatabase.ts @@ -0,0 +1,160 @@ +import { RocksDatabase, type RocksDatabaseOptions, type Store } from '@harperfast/rocksdb-js'; +import { WeakLRUCache } from 'weak-lru-cache'; +import { when } from '../utility/when.ts'; +import { entryMap, METADATA, type Entry } from './RecordEncoder.ts'; + +/** + * RocksDatabase subclass that owns all primary-store behaviour for Harper tables: + * - RecordEncoder metadata extraction (version, metadataFlags, etc.) + * - Optional WeakLRUCache keyed on entry version, verified via the process-wide VerificationTable + * + * Replaces both the old CachingRocksDatabase and the RocksDB-specific patches applied by + * handleLocalTimeForGets. Call initStore(rootStore) after open() — or let + * handleLocalTimeForGets delegate to it automatically via the isPrimaryRocksDatabase marker. + * + * Caching is enabled by default; pass { cache: false } to disable (useful for benchmarking + * or stores where version tracking is not desired). + * + * Cache freshness pattern (using the rocksdb-js VT API): + * 1. verifyVersion(key, cached.version) → true → return cached entry (no disk I/O) + * 2. verifyVersion(key, cached.version) → false → read from DB, populateVersion, update cache + */ +export class PrimaryRocksDatabase extends RocksDatabase { + readonly isPrimaryRocksDatabase = true; + #cache?: WeakLRUCache; + readCount = 0; + cachePuts = false; + declare rootStore: any; + declare decoder: any; + + get #enc(): any { + return (this as any).encoder; + } + + constructor(pathOrStore: string | Store, options?: RocksDatabaseOptions & { cache?: boolean }) { + const enableCache = (options as any)?.cache !== false; + super(pathOrStore, enableCache ? { ...options, verificationTable: true } : options); + if (enableCache) { + this.#cache = new WeakLRUCache(); + } + } + + /** + * Initialises encoder/decoder state. Must be called once after open() with the root + * RocksDatabase. Equivalent to the RocksDB branch of handleLocalTimeForGets, but as + * a real method rather than instance-level monkey-patching. + */ + initStore(rootStore: RocksDatabase) { + this.readCount = 0; + this.cachePuts = false; + this.rootStore = rootStore; + this.#enc.rootStore = rootStore; + this.#enc.isRocksDB = true; + this.decoder = this.#enc; + } + + #withEntry(entry: Entry, id: any): Entry { + if (entry.value) { + if (entry.value.constructor === Object && this.#enc.structPrototype) { + const originalValue = entry.value; + entry.value = new this.#enc.structPrototype.constructor(); + Object.assign(entry.value, originalValue); + } + if (typeof entry.value === 'object' && entry.value !== null) { + entryMap.set(entry.value, entry); + } + } + entry.key = id; + return entry; + } + + #processEntry(raw: any, id: any): Entry | undefined { + if (raw == null) return undefined; + if (raw[METADATA]) { + raw.metadataFlags = raw[METADATA]; + return this.#withEntry(raw, id); + } + return { value: raw, key: id } as Entry; + } + + /** + * Core read method. Returns a full Entry (with version, metadataFlags, value, …) or + * undefined. When caching is enabled and the VerificationTable slot is current, + * returns the cached Entry without touching disk. + */ + getEntry(id: any, options?: any): any { + this.readCount++; + const cache = options?.transaction ? null : this.#cache; + + if (cache) { + const cached = cache.get(id) as Entry | undefined; + if (cached !== undefined && cached.version != null && this.verifyVersion(id, cached.version)) { + return cached; + } + } + + const raw = options?.async ? super.get(id, options) : super.getSync(id, options); + return when(raw, (result) => { + const entry = this.#processEntry(result, id); + if (entry?.version != null && cache) { + this.populateVersion(id, entry.version); + cache.set(id, entry, (entry.size ?? 0) >> 10); + } + return entry; + }); + } + + getSync(id: any, options?: any): any { + const entry = this.getEntry(id, options) as Entry; + const value = entry?.value; + if (value != null && typeof value === 'object') entryMap.set(value, entry); + return value; + } + + get(id: any, options?: any): any { + return when(this.getEntry(id, { ...options, async: true }), (entry: Entry) => { + const value = entry?.value; + if (value != null && typeof value === 'object') entryMap.set(value, entry); + return value; + }); + } + + getRange(options?: any): any { + const iterable = super.getRange(options); + if (options?.valuesForKey) return iterable.map((v: any) => v?.value); + if (options?.values === false || options?.onlyCount) return iterable; + const hasRecordEncoder = !!this.#enc.isRocksDB; + return iterable.map((entry: any) => { + if (hasRecordEncoder) { + if (entry.value?.[METADATA]) { + entry.metadataFlags = entry.value[METADATA]; + Object.assign(entry, entry.value); + } + if (entry.value?.constructor === Object && this.#enc.structPrototype) { + const originalValue = entry.value; + entry.value = new this.#enc.structPrototype.constructor(); + for (const key in originalValue) entry.value[key] = originalValue[key]; + } + } + return entry; + }); + } + + putSync(id: any, value: any, options?: any): any { + this.#cache?.delete(id); + return super.putSync(id, value, options); + } + + removeSync(id: any, options?: any): any { + this.#cache?.delete(id); + return super.removeSync(id, options); + } + + open(): PrimaryRocksDatabase { + return super.open() as PrimaryRocksDatabase; + } + + static open(pathOrStore: string | Store, options?: RocksDatabaseOptions): PrimaryRocksDatabase { + return new PrimaryRocksDatabase(pathOrStore, options).open(); + } +} diff --git a/resources/RecordEncoder.ts b/resources/RecordEncoder.ts index de8d0dee7..a773a74d4 100644 --- a/resources/RecordEncoder.ts +++ b/resources/RecordEncoder.ts @@ -335,6 +335,10 @@ function getTimestamp() { } export function handleLocalTimeForGets(store, rootStore) { + if ((store as any).isPrimaryRocksDatabase) { + (store as any).initStore(rootStore); + return store; + } const isRocksDB = store instanceof RocksDatabase; store.readCount = 0; store.cachePuts = false; diff --git a/resources/databases.ts b/resources/databases.ts index 71034e0a4..07fe84465 100644 --- a/resources/databases.ts +++ b/resources/databases.ts @@ -27,7 +27,7 @@ import { deleteRootBlobPathsForDB } from './blob.ts'; import { CUSTOM_INDEXES } from './indexes/customIndexes.ts'; import { OpenDBIObject } from '../utility/lmdb/OpenDBIObject.js'; import { RocksDatabase, type RocksDatabaseOptions } from '@harperfast/rocksdb-js'; -import { CachingRocksDatabase } from './CachingRocksDatabase.ts'; +import { PrimaryRocksDatabase } from './PrimaryRocksDatabase.ts'; import { replayLogs } from './replayLogs.ts'; import { totalmem } from 'node:os'; import { RocksIndexStore } from './RocksIndexStore.ts'; @@ -123,7 +123,7 @@ function openRocksDatabase(path: string, options: RocksDatabaseOptions & { dupSo if (options.dupSort) { db = new RocksIndexStore(path, options).open() as RocksDatabaseEx; } else { - db = new CachingRocksDatabase(path, options).open() as RocksDatabaseEx; + db = new PrimaryRocksDatabase(path, options).open() as unknown as RocksDatabaseEx; // the RocksDB put and remove return promises, which masks thrown errors in non-awaiting calls to put/remove, // making them unsafe to replace LMDB methods, which will synchronously throw errors if there is a problem db.put = db.putSync; diff --git a/unitTests/resources/caching-rocks-database.bench.js b/unitTests/resources/caching-rocks-database.bench.js index a28810aae..152b51305 100644 --- a/unitTests/resources/caching-rocks-database.bench.js +++ b/unitTests/resources/caching-rocks-database.bench.js @@ -1,27 +1,23 @@ /** - * Benchmark: RocksDatabase (no VT/cache) vs CachingRocksDatabase (WeakLRUCache + VT) + * Benchmark: PrimaryRocksDatabase without caching vs with caching (WeakLRUCache + VT) * - * Run via: npm run bench (or directly with mocha --file unitTests/resources/caching-rocks-database.bench.js) + * Run via: npm run bench */ require('../testUtils'); const { setupTestDBPath } = require('../testUtils'); const { setMainIsWorker } = require('#js/server/threads/manageThreads'); -const { RocksDatabase } = require('@harperfast/rocksdb-js'); -const { CachingRocksDatabase } = require('#src/resources/CachingRocksDatabase'); -const { RecordEncoder, handleLocalTimeForGets } = require('#src/resources/RecordEncoder'); +const { PrimaryRocksDatabase } = require('#src/resources/PrimaryRocksDatabase'); const path = require('path'); const { mkdirSync } = require('fs'); const isLMDB = process.env.HARPER_STORAGE_ENGINE === 'lmdb'; const RECORD_COUNT = 2_000; -const WARMUP_ROUNDS = 2; // reads before timing (to prime block cache) +const WARMUP_ROUNDS = 2; -/** Format a number of ops/sec nicely */ function opsPerSec(n, ms) { return ((n / ms) * 1000).toFixed(0).padStart(10); } -/** Print a formatted results row */ function row(label, plain, caching) { const ratio = (plain / caching).toFixed(2); console.log( @@ -34,17 +30,17 @@ function row(label, plain, caching) { function header() { console.log(''); console.log( - ` ${'Scenario'.padEnd(38)} | ${'RocksDatabase'.padStart(13)}` + - ` | ${'CachingRocksDB'.padStart(15)}` + + ` ${'Scenario'.padEnd(38)} | ${'cache: false'.padStart(13)}` + + ` | ${'cache: true'.padStart(15)}` + ` | Speedup` ); console.log(' ' + '-'.repeat(85)); } -describe('Benchmark: RocksDatabase vs CachingRocksDatabase', function () { +describe('Benchmark: PrimaryRocksDatabase cache:false vs cache:true', function () { this.timeout(120_000); - let plainDb, cachingDb, dbBase; + let noCacheDb, cachingDb, dbBase; const keys = Array.from({ length: RECORD_COUNT }, (_, i) => `record-${String(i).padStart(8, '0')}`); const values = keys.map((k, i) => ({ id: i, name: k, payload: 'x'.repeat(80) })); @@ -54,56 +50,47 @@ describe('Benchmark: RocksDatabase vs CachingRocksDatabase', function () { dbBase = path.join(setupTestDBPath(), 'bench'); mkdirSync(dbBase, { recursive: true }); - const encoderOptions = { - encoder: { Encoder: RecordEncoder }, - disableWAL: true, - name: 'bench', - }; - - // Plain RocksDatabase: standard msgpack + RecordEncoder, no VT or cache - plainDb = RocksDatabase.open(path.join(dbBase, 'plain'), encoderOptions); - plainDb.put = plainDb.putSync; - plainDb.remove = plainDb.removeSync; - plainDb.encoder.name = 'bench'; - handleLocalTimeForGets(plainDb, plainDb); - - // CachingRocksDatabase: same encoder + VT + WeakLRUCache - cachingDb = new CachingRocksDatabase(path.join(dbBase, 'caching'), encoderOptions).open(); + const sharedOptions = { disableWAL: true, name: 'bench' }; + + // No cache: PrimaryRocksDatabase without WeakLRUCache or VT + noCacheDb = new PrimaryRocksDatabase(path.join(dbBase, 'nocache'), { ...sharedOptions, cache: false }).open(); + noCacheDb.put = noCacheDb.putSync; + noCacheDb.remove = noCacheDb.removeSync; + noCacheDb.initStore(noCacheDb); + + // With cache: PrimaryRocksDatabase with WeakLRUCache + VT (default) + cachingDb = new PrimaryRocksDatabase(path.join(dbBase, 'caching'), sharedOptions).open(); cachingDb.put = cachingDb.putSync; cachingDb.remove = cachingDb.removeSync; - cachingDb.encoder.name = 'bench'; - handleLocalTimeForGets(cachingDb, cachingDb); + cachingDb.initStore(cachingDb); // Populate both stores with the same records for (let i = 0; i < RECORD_COUNT; i++) { - plainDb.putSync(keys[i], values[i]); + noCacheDb.putSync(keys[i], values[i]); cachingDb.putSync(keys[i], values[i]); } }); after(function () { - plainDb?.close?.(); + noCacheDb?.close?.(); cachingDb?.close?.(); }); it('prints benchmark results', function () { header(); - // ── 1. Cold read: first read of every key (neither cache nor VT populated) ── + // ── 1. Cold read: first read of every key ── { - // prime block cache with WARMUP_ROUNDS reads so disk I/O doesn't dominate for (let r = 0; r < WARMUP_ROUNDS; r++) { - for (const k of keys) plainDb.getSync(k); + for (const k of keys) noCacheDb.getSync(k); } - // flush CachingRocksDatabase's WeakLRUCache by writing each key - // (putSync deletes from cache), then read once to populate WeakLRUCache - for (const k of keys) cachingDb.putSync(k, values[keys.indexOf(k)]); + // flush cachingDb's WeakLRUCache by writing each key, then time the cold cache read + for (let i = 0; i < RECORD_COUNT; i++) cachingDb.putSync(keys[i], values[i]); const t0 = performance.now(); - for (const k of keys) plainDb.getSync(k); + for (const k of keys) noCacheDb.getSync(k); const tPlain = performance.now() - t0; - // cold caching read: WeakLRUCache was cleared by the putSync above const t1 = performance.now(); for (const k of keys) cachingDb.getSync(k); const tCold = performance.now() - t1; @@ -111,14 +98,12 @@ describe('Benchmark: RocksDatabase vs CachingRocksDatabase', function () { row('getSync — cold (cache miss)', tPlain, tCold); } - // ── 2. Soft VT miss: second read (cache warm, VT not yet populated) ── + // ── 2. Soft VT miss: second read (WeakLRUCache warm, VT not yet populated) ── { const t0 = performance.now(); - for (const k of keys) plainDb.getSync(k); + for (const k of keys) noCacheDb.getSync(k); const tPlain = performance.now() - t0; - // second read: WeakLRUCache is warm, passes expectedVersion, soft VT miss - // → DB read happens but FRESH is returned and VT slot populated const t1 = performance.now(); for (const k of keys) cachingDb.getSync(k); const tSoftMiss = performance.now() - t1; @@ -126,10 +111,10 @@ describe('Benchmark: RocksDatabase vs CachingRocksDatabase', function () { row('getSync — soft VT miss (2nd read)', tPlain, tSoftMiss); } - // ── 3. VT fast-path: third+ read (VT populated, no DB access) ── + // ── 3. VT fast-path: third+ read ── { const t0 = performance.now(); - for (const k of keys) plainDb.getSync(k); + for (const k of keys) noCacheDb.getSync(k); const tPlain = performance.now() - t0; const t1 = performance.now(); @@ -145,7 +130,7 @@ describe('Benchmark: RocksDatabase vs CachingRocksDatabase', function () { const N = RECORD_COUNT; const t0 = performance.now(); - for (let i = 0; i < N; i++) plainDb.getSync(hotKey); + for (let i = 0; i < N; i++) noCacheDb.getSync(hotKey); const tPlain = performance.now() - t0; const t1 = performance.now(); @@ -156,13 +141,11 @@ describe('Benchmark: RocksDatabase vs CachingRocksDatabase', function () { } // ── 5. Async get: VT fast-path ── - // (measuring the async path separately since it goes through a different code path) { - // Ensure VT is populated (do a sync read first) - for (const k of keys) cachingDb.getSync(k); + for (const k of keys) cachingDb.getSync(k); // ensure VT is populated const t0 = performance.now(); - for (const k of keys) plainDb.get(k); + for (const k of keys) noCacheDb.get(k); const tPlain = performance.now() - t0; const t1 = performance.now(); @@ -175,7 +158,7 @@ describe('Benchmark: RocksDatabase vs CachingRocksDatabase', function () { // ── 6. Write throughput (putSync) ── { const t0 = performance.now(); - for (let i = 0; i < RECORD_COUNT; i++) plainDb.putSync(keys[i], values[i]); + for (let i = 0; i < RECORD_COUNT; i++) noCacheDb.putSync(keys[i], values[i]); const tPlain = performance.now() - t0; const t1 = performance.now(); @@ -187,7 +170,7 @@ describe('Benchmark: RocksDatabase vs CachingRocksDatabase', function () { console.log(''); console.log(` Records: ${RECORD_COUNT}, value size: ~80 bytes`); - console.log(` Speedup > 1x means CachingRocksDatabase is faster`); + console.log(` Speedup > 1x means cache:true is faster`); console.log(''); }); }); diff --git a/unitTests/resources/caching-rocks-database.test.js b/unitTests/resources/caching-rocks-database.test.js index a96ce2cc3..2f34a7816 100644 --- a/unitTests/resources/caching-rocks-database.test.js +++ b/unitTests/resources/caching-rocks-database.test.js @@ -4,11 +4,11 @@ const { setupTestDBPath } = require('../testUtils'); const { table } = require('#src/resources/databases'); const { setMainIsWorker } = require('#js/server/threads/manageThreads'); const { RocksDatabase } = require('@harperfast/rocksdb-js'); -const { CachingRocksDatabase } = require('#src/resources/CachingRocksDatabase'); +const { PrimaryRocksDatabase } = require('#src/resources/PrimaryRocksDatabase'); const isLMDB = process.env.HARPER_STORAGE_ENGINE === 'lmdb'; -describe('CachingRocksDatabase', function () { +describe('PrimaryRocksDatabase', function () { let TestTable; before(async function () { @@ -16,7 +16,7 @@ describe('CachingRocksDatabase', function () { setupTestDBPath(); setMainIsWorker(true); TestTable = table({ - table: 'CachingRocksTest', + table: 'PrimaryRocksTest', database: 'test', attributes: [ { name: 'id', isPrimaryKey: true }, @@ -25,8 +25,8 @@ describe('CachingRocksDatabase', function () { }); }); - it('Primary store is a CachingRocksDatabase instance', function () { - assert(TestTable.primaryStore instanceof CachingRocksDatabase); + it('Primary store is a PrimaryRocksDatabase instance (and a RocksDatabase)', function () { + assert(TestTable.primaryStore instanceof PrimaryRocksDatabase); assert(TestTable.primaryStore instanceof RocksDatabase); }); @@ -45,12 +45,11 @@ describe('CachingRocksDatabase', function () { it('VT slot is populated after two reads, enabling fast-path verification', async function () { await TestTable.put(3, { name: 'three' }); - // First read: cache cold, populates WeakLRUCache, VT not yet populated + // First read: cache cold, entry stored in WeakLRUCache, VT slot not yet populated await TestTable.get(3); - // Second read: cache warm, passes expectedVersion → soft VT miss populates slot + // Second read: cache warm, expectedVersion passed → soft VT miss populates slot const entry = await TestTable.primaryStore.getEntry(3); assert(entry.version, 'entry should have a version after read'); - // VT slot should now hold this version assert( TestTable.primaryStore.verifyVersion(3, entry.version), 'VT slot should be populated after two reads' @@ -89,7 +88,7 @@ describe('CachingRocksDatabase', function () { it('Remove clears cache entry and subsequent read returns undefined', async function () { await TestTable.put(5, { name: 'five' }); await TestTable.get(5); // populate cache - await TestTable.remove(5); + await TestTable.delete(5); const result = await TestTable.get(5); assert.equal(result, undefined); }); From e5e248be48ff8667eed4c0f7ad0820b449ecd8a8 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Sun, 26 Apr 2026 06:48:42 -0600 Subject: [PATCH 5/7] Collapse verifyVersion+populateVersion into single getSync(expectedVersion) call The rocksdb-js store.getSync/get natively accept expectedVersion: on a VT hit they return FRESH_VERSION_FLAG without touching disk; on a VT miss they read from the DB and auto-populate the VT slot. Replacing the two-step verifyVersion+populateVersion pattern with a single native call removes one round-trip per read on the warm cache path. Co-Authored-By: Claude Sonnet 4.6 --- resources/PrimaryRocksDatabase.ts | 36 ++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/resources/PrimaryRocksDatabase.ts b/resources/PrimaryRocksDatabase.ts index 34f625e58..56b91a12b 100644 --- a/resources/PrimaryRocksDatabase.ts +++ b/resources/PrimaryRocksDatabase.ts @@ -1,4 +1,6 @@ -import { RocksDatabase, type RocksDatabaseOptions, type Store } from '@harperfast/rocksdb-js'; +import { RocksDatabase, type RocksDatabaseOptions, constants, type Store } from '@harperfast/rocksdb-js'; + +const FRESH_VERSION_FLAG = constants.FRESH_VERSION_FLAG; import { WeakLRUCache } from 'weak-lru-cache'; import { when } from '../utility/when.ts'; import { entryMap, METADATA, type Entry } from './RecordEncoder.ts'; @@ -79,25 +81,35 @@ export class PrimaryRocksDatabase extends RocksDatabase { /** * Core read method. Returns a full Entry (with version, metadataFlags, value, …) or - * undefined. When caching is enabled and the VerificationTable slot is current, - * returns the cached Entry without touching disk. + * undefined. When caching is enabled, passes `expectedVersion` to the native layer so + * a single call handles both verification (returns FRESH_VERSION_FLAG on hit) and VT + * population (auto-seeded on DB read). Only cold reads (no cached version) need a + * separate populateVersion call. */ getEntry(id: any, options?: any): any { this.readCount++; const cache = options?.transaction ? null : this.#cache; + const cached = cache?.get(id) as Entry | undefined; + const expectedVersion = cached?.version; - if (cache) { - const cached = cache.get(id) as Entry | undefined; - if (cached !== undefined && cached.version != null && this.verifyVersion(id, cached.version)) { - return cached; - } - } + // Pass expectedVersion when we have a cached version: + // VT hit → native returns FRESH_VERSION_FLAG, no DB read + // VT miss → native reads DB and auto-populates VT slot + const getOptions = expectedVersion != null ? ({ expectedVersion } as any) : options; + const raw = options?.async ? super.get(id, getOptions) : super.getSync(id, getOptions); - const raw = options?.async ? super.get(id, options) : super.getSync(id, options); return when(raw, (result) => { + if (result === FRESH_VERSION_FLAG) return cached; const entry = this.#processEntry(result, id); - if (entry?.version != null && cache) { - this.populateVersion(id, entry.version); + if (entry == null) { + if (cache && cached !== undefined) cache.delete(id); + return undefined; + } + if (entry.version != null && cache) { + if (expectedVersion == null) { + // cold read: no expectedVersion passed, native doesn't auto-populate + this.populateVersion(id, entry.version); + } cache.set(id, entry, (entry.size ?? 0) >> 10); } return entry; From 26717cd4b2982cfd1cb7ca9d8df44b5418d8a609 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Sun, 26 Apr 2026 07:49:22 -0600 Subject: [PATCH 6/7] refactor: clean up handleLocalTimeForGets RocksDB branches and fix benchmark MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - PrimaryRocksDatabase.getEntry now uses populateVersion option for cold reads (single native call, no separate populateVersion()) and merges options when passing expectedVersion so transaction snapshot is preserved - handleLocalTimeForGets: remove all isRocksDB branches — RocksDB stores now use the isPrimaryRocksDatabase early-return path exclusively; the remaining code is LMDB-only, so isRocksDB is always false there - Benchmark: use RecordEncoder + recordUpdater with Date.now()-range versions so values carry encoded version bytes and caching works correctly Co-Authored-By: Claude Sonnet 4.6 --- resources/PrimaryRocksDatabase.ts | 20 ++-- resources/RecordEncoder.ts | 107 +++++++----------- .../resources/caching-rocks-database.bench.js | 41 ++++--- .../resources/caching-rocks-database.test.js | 3 +- 4 files changed, 79 insertions(+), 92 deletions(-) diff --git a/resources/PrimaryRocksDatabase.ts b/resources/PrimaryRocksDatabase.ts index 56b91a12b..aca5b93b0 100644 --- a/resources/PrimaryRocksDatabase.ts +++ b/resources/PrimaryRocksDatabase.ts @@ -88,14 +88,24 @@ export class PrimaryRocksDatabase extends RocksDatabase { */ getEntry(id: any, options?: any): any { this.readCount++; - const cache = options?.transaction ? null : this.#cache; + const cache = this.#cache; const cached = cache?.get(id) as Entry | undefined; const expectedVersion = cached?.version; - // Pass expectedVersion when we have a cached version: + // Build get options, always merging with caller options to preserve + // transaction snapshot. Pass expectedVersion when cached: // VT hit → native returns FRESH_VERSION_FLAG, no DB read // VT miss → native reads DB and auto-populates VT slot - const getOptions = expectedVersion != null ? ({ expectedVersion } as any) : options; + // For cold reads (no cached version), use populateVersion flag so the + // native layer seeds the VT slot in the same call. + let getOptions: any; + if (expectedVersion != null) { + getOptions = options ? { ...options, expectedVersion } : { expectedVersion }; + } else if (cache) { + getOptions = options ? { ...options, populateVersion: true } : { populateVersion: true }; + } else { + getOptions = options; + } const raw = options?.async ? super.get(id, getOptions) : super.getSync(id, getOptions); return when(raw, (result) => { @@ -106,10 +116,6 @@ export class PrimaryRocksDatabase extends RocksDatabase { return undefined; } if (entry.version != null && cache) { - if (expectedVersion == null) { - // cold read: no expectedVersion passed, native doesn't auto-populate - this.populateVersion(id, entry.version); - } cache.set(id, entry, (entry.size ?? 0) >> 10); } return entry; diff --git a/resources/RecordEncoder.ts b/resources/RecordEncoder.ts index a773a74d4..e917cf2c8 100644 --- a/resources/RecordEncoder.ts +++ b/resources/RecordEncoder.ts @@ -339,52 +339,33 @@ export function handleLocalTimeForGets(store, rootStore) { (store as any).initStore(rootStore); return store; } - const isRocksDB = store instanceof RocksDatabase; store.readCount = 0; store.cachePuts = false; store.rootStore = rootStore; store.encoder.rootStore = rootStore; - store.encoder.isRocksDB = isRocksDB; store.decoder = store.encoder; const storeGetEntry = store.getEntry; - const storeGetSync = store.getSync; - const storeGet = store.get; store.getEntry = function (id, options) { store.readCount++; lastMetadata = null; - if (isRocksDB) { - return when( - options?.async ? storeGet.call(store, id, options) : storeGetSync.call(store, id, options), - (entry) => { - if (entry) { - if (entry[METADATA]) { - entry.metadataFlags = entry[METADATA]; - return withEntry(entry); - } else return { value: entry }; - } else return entry; - } - ); - } else { - let entry: Entry = storeGetEntry.call(this, id, options); - if (lastMetadata) { - entry.metadataFlags = lastMetadata[METADATA]; - entry.localTime = lastMetadata.localTime; - entry.residencyId = lastMetadata.residencyId; - entry.nodeId = lastMetadata.nodeId; - entry.additionalAuditRefs = lastMetadata.additionalAuditRefs; - entry.size = lastMetadata.size; - if (lastMetadata.expiresAt >= 0) { - entry.expiresAt = lastMetadata.expiresAt; - } - if (isRocksDB) entry.version = lastMetadata.localTime; - if (entry.value) { - entryMap.set(entry.value, entry); // allow the record to access the entry - } - entry.key = id; + let entry: Entry = storeGetEntry.call(this, id, options); + if (lastMetadata) { + entry.metadataFlags = lastMetadata[METADATA]; + entry.localTime = lastMetadata.localTime; + entry.residencyId = lastMetadata.residencyId; + entry.nodeId = lastMetadata.nodeId; + entry.additionalAuditRefs = lastMetadata.additionalAuditRefs; + entry.size = lastMetadata.size; + if (lastMetadata.expiresAt >= 0) { + entry.expiresAt = lastMetadata.expiresAt; + } + if (entry.value) { + entryMap.set(entry.value, entry); // allow the record to access the entry } - return entry && withEntry(entry); + entry.key = id; } + return entry && withEntry(entry); // if we have decoded with metadata, we want to pull it out and assign to this entry function withEntry(entry) { if (entry.value) { @@ -429,15 +410,9 @@ export function handleLocalTimeForGets(store, rootStore) { if (options.values === false || options.onlyCount) return iterable; return iterable.map((entry) => { // if we have metadata, move the metadata to the entry - if (isRocksDB) { - if (entry.value?.[METADATA]) { - entry.metadataFlags = entry.value[METADATA]; - Object.assign(entry, entry.value); - } - } else if (lastMetadata) { + if (lastMetadata) { entry.metadataFlags = lastMetadata[METADATA]; entry.localTime = lastMetadata.localTime; - if (isRocksDB) entry.version = lastMetadata.localTime; entry.residencyId = lastMetadata.residencyId; entry.nodeId = lastMetadata.nodeId; entry.additionalAuditRefs = lastMetadata.additionalAuditRefs; @@ -457,34 +432,32 @@ export function handleLocalTimeForGets(store, rootStore) { }); }; - if (!isRocksDB) { - // add read transaction tracking - const txn = store.useReadTransaction(); - txn.done(); - if (!txn.done.isTracked) { - const Txn = txn.constructor; - const use = txn.use; - const done = txn.done; - Txn.prototype.use = function () { - if (!this.timerTracked) { - this.timerTracked = true; - trackedTxns.push(new WeakRef(this)); - } - use.call(this); - }; - Txn.prototype.done = function () { - done.call(this); - if (this.isDone) { - for (let i = 0; i < trackedTxns.length; i++) { - const txn = trackedTxns[i].deref(); - if (!txn || txn.isDone || txn.isCommitted) { - trackedTxns.splice(i--, 1); - } + // add read transaction tracking (LMDB only — RocksDB stores are PrimaryRocksDatabase) + const txn = store.useReadTransaction(); + txn.done(); + if (!txn.done.isTracked) { + const Txn = txn.constructor; + const use = txn.use; + const done = txn.done; + Txn.prototype.use = function () { + if (!this.timerTracked) { + this.timerTracked = true; + trackedTxns.push(new WeakRef(this)); + } + use.call(this); + }; + Txn.prototype.done = function () { + done.call(this); + if (this.isDone) { + for (let i = 0; i < trackedTxns.length; i++) { + const txn = trackedTxns[i].deref(); + if (!txn || txn.isDone || txn.isCommitted) { + trackedTxns.splice(i--, 1); } } - }; - Txn.prototype.done.isTracked = true; - } + } + }; + Txn.prototype.done.isTracked = true; } return store; diff --git a/unitTests/resources/caching-rocks-database.bench.js b/unitTests/resources/caching-rocks-database.bench.js index 152b51305..b41c12827 100644 --- a/unitTests/resources/caching-rocks-database.bench.js +++ b/unitTests/resources/caching-rocks-database.bench.js @@ -7,6 +7,7 @@ require('../testUtils'); const { setupTestDBPath } = require('../testUtils'); const { setMainIsWorker } = require('#js/server/threads/manageThreads'); const { PrimaryRocksDatabase } = require('#src/resources/PrimaryRocksDatabase'); +const { RecordEncoder, recordUpdater } = require('#src/resources/RecordEncoder'); const path = require('path'); const { mkdirSync } = require('fs'); @@ -40,7 +41,7 @@ function header() { describe('Benchmark: PrimaryRocksDatabase cache:false vs cache:true', function () { this.timeout(120_000); - let noCacheDb, cachingDb, dbBase; + let noCacheDb, cachingDb, noCacheUpdate, cachingUpdate, dbBase; const keys = Array.from({ length: RECORD_COUNT }, (_, i) => `record-${String(i).padStart(8, '0')}`); const values = keys.map((k, i) => ({ id: i, name: k, payload: 'x'.repeat(80) })); @@ -50,24 +51,30 @@ describe('Benchmark: PrimaryRocksDatabase cache:false vs cache:true', function ( dbBase = path.join(setupTestDBPath(), 'bench'); mkdirSync(dbBase, { recursive: true }); - const sharedOptions = { disableWAL: true, name: 'bench' }; + const primaryOptions = { + disableWAL: true, + name: 'bench', + encoder: { Encoder: RecordEncoder }, + }; // No cache: PrimaryRocksDatabase without WeakLRUCache or VT - noCacheDb = new PrimaryRocksDatabase(path.join(dbBase, 'nocache'), { ...sharedOptions, cache: false }).open(); - noCacheDb.put = noCacheDb.putSync; - noCacheDb.remove = noCacheDb.removeSync; + noCacheDb = new PrimaryRocksDatabase(path.join(dbBase, 'nocache'), { ...primaryOptions, cache: false }).open(); noCacheDb.initStore(noCacheDb); + noCacheUpdate = recordUpdater(noCacheDb, 1, null); // With cache: PrimaryRocksDatabase with WeakLRUCache + VT (default) - cachingDb = new PrimaryRocksDatabase(path.join(dbBase, 'caching'), sharedOptions).open(); - cachingDb.put = cachingDb.putSync; - cachingDb.remove = cachingDb.removeSync; + cachingDb = new PrimaryRocksDatabase(path.join(dbBase, 'caching'), primaryOptions).open(); cachingDb.initStore(cachingDb); + cachingUpdate = recordUpdater(cachingDb, 1, null); - // Populate both stores with the same records + // Populate both stores using recordUpdater so values carry encoded version bytes. + // Versions must be in the Date.now() range (~1e12) so RecordEncoder recognises + // the 0x42 first byte of the float64 and strips the version prefix on decode. + let version = Date.now(); for (let i = 0; i < RECORD_COUNT; i++) { - noCacheDb.putSync(keys[i], values[i]); - cachingDb.putSync(keys[i], values[i]); + noCacheUpdate(keys[i], values[i], null, version, 0, false); + cachingUpdate(keys[i], values[i], null, version, 0, false); + version++; } }); @@ -85,7 +92,8 @@ describe('Benchmark: PrimaryRocksDatabase cache:false vs cache:true', function ( for (const k of keys) noCacheDb.getSync(k); } // flush cachingDb's WeakLRUCache by writing each key, then time the cold cache read - for (let i = 0; i < RECORD_COUNT; i++) cachingDb.putSync(keys[i], values[i]); + let flushVersion = Date.now() + 100_000; + for (let i = 0; i < RECORD_COUNT; i++) cachingUpdate(keys[i], values[i], null, flushVersion++, 0, false); const t0 = performance.now(); for (const k of keys) noCacheDb.getSync(k); @@ -155,17 +163,18 @@ describe('Benchmark: PrimaryRocksDatabase cache:false vs cache:true', function ( row('get (async) — VT fast-path (sync fallback)', tPlain, tVTHit); } - // ── 6. Write throughput (putSync) ── + // ── 6. Write throughput (recordUpdater) ── { + let v = Date.now() + 200_000; const t0 = performance.now(); - for (let i = 0; i < RECORD_COUNT; i++) noCacheDb.putSync(keys[i], values[i]); + for (let i = 0; i < RECORD_COUNT; i++) noCacheUpdate(keys[i], values[i], null, v++, 0, false); const tPlain = performance.now() - t0; const t1 = performance.now(); - for (let i = 0; i < RECORD_COUNT; i++) cachingDb.putSync(keys[i], values[i]); + for (let i = 0; i < RECORD_COUNT; i++) cachingUpdate(keys[i], values[i], null, v++, 0, false); const tCaching = performance.now() - t1; - row('putSync', tPlain, tCaching); + row('recordUpdater put', tPlain, tCaching); } console.log(''); diff --git a/unitTests/resources/caching-rocks-database.test.js b/unitTests/resources/caching-rocks-database.test.js index 2f34a7816..e68c0c4ad 100644 --- a/unitTests/resources/caching-rocks-database.test.js +++ b/unitTests/resources/caching-rocks-database.test.js @@ -93,10 +93,9 @@ describe('PrimaryRocksDatabase', function () { assert.equal(result, undefined); }); - it('Read with a transaction bypasses the cache', async function () { + it('Read with a transaction context returns correct value', async function () { await TestTable.put(6, { name: 'six' }); const context = {}; - // transaction-scoped read should still return the correct value const result = await TestTable.get(6, context); assert.equal(result.name, 'six'); }); From 3e691632ac8a69160e066e31440970ca38642066 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Mon, 27 Apr 2026 05:50:25 -0600 Subject: [PATCH 7/7] Add concurrent PUT race condition test for HNSW index (issue #386) --- unitTests/resources/vectorIndex.test.js | 64 +++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/unitTests/resources/vectorIndex.test.js b/unitTests/resources/vectorIndex.test.js index 9f716be7a..41915eb78 100644 --- a/unitTests/resources/vectorIndex.test.js +++ b/unitTests/resources/vectorIndex.test.js @@ -1,6 +1,9 @@ const assert = require('node:assert'); +const { Worker } = require('worker_threads'); +const { setupTestDBPath } = require('../testUtils'); const { table } = require('#src/resources/databases'); const { HierarchicalNavigableSmallWorld } = require('#src/resources/indexes/HierarchicalNavigableSmallWorld'); +const { setMainIsWorker } = require('#js/server/threads/manageThreads'); describe('HierarchicalNavigableSmallWorld indexing', () => { if (process.env.HARPER_STORAGE_ENGINE === 'lmdb') return; // don't try to test lmdb @@ -290,6 +293,67 @@ describe('HierarchicalNavigableSmallWorld indexing', () => { assert(invertedSimiliarities <= 6, `expected at most 6 distance inversions, got ${invertedSimiliarities}`); } }); +describe('HNSW concurrent PUT race condition (issue #386)', () => { + if (process.env.HARPER_STORAGE_ENGINE === 'lmdb') return; + const WORKER_COUNT = 4; + const PUTS_PER_WORKER = 25; + const DIMS = 768; + let ConcurrentTest; + let workers = []; + + before(() => { + setupTestDBPath(); + setMainIsWorker(true); + ConcurrentTest = table({ + table: 'HNSWConcurrentTest', + database: 'test', + attributes: [ + { name: 'id', isPrimaryKey: true }, + { name: 'embedding', indexed: { type: 'HNSW' }, type: 'Array' }, + ], + }); + for (let w = 0; w < WORKER_COUNT; w++) { + workers.push(new Worker(__dirname + '/vectorIndex-thread.js')); + } + }); + + it('handles concurrent multi-worker PUTs without race conditions', async () => { + const replies = await Promise.all( + workers.map( + (worker, w) => + new Promise((resolve) => { + worker.once('message', resolve); + worker.once('error', (err) => + resolve({ type: 'error', start: w * PUTS_PER_WORKER, message: err.message, stack: err.stack }) + ); + worker.postMessage({ + type: 'insert', + start: w * PUTS_PER_WORKER, + count: PUTS_PER_WORKER, + dims: DIMS, + }); + }) + ) + ); + const errors = replies.filter((r) => r.type === 'error'); + assert.deepEqual( + errors, + [], + `expected no worker errors, got: ${errors.map((e) => `[start=${e.start}] ${e.message} ${e.stack}`).join('; ')}` + ); + + const expected = WORKER_COUNT * PUTS_PER_WORKER; + let count = 0; + for await (const _ of ConcurrentTest.search([])) count++; + assert.equal(count, expected, `expected ${expected} records after concurrent puts, got ${count}`); + }); + + after(async () => { + await Promise.all(workers.map((w) => w.terminate())); + ConcurrentTest.dropTable(); + }); +}); + async function fromAsync(iterable) { let results = []; for await (let entry of iterable) {