diff --git a/package.json b/package.json index f86b9d328..79d6a8157 100644 --- a/package.json +++ b/package.json @@ -62,7 +62,8 @@ "test:unit:all": "npm run test:unit:main && npm run test:unit:apitests && npm run test:unit:resources && npm run test:unit:lmdb", "test:unit:lmdb": "HARPER_STORAGE_ENGINE=lmdb npm run test:unit:resources && HARPER_STORAGE_ENGINE=lmdb npm run test:unit:apitests", "test:unit:components": "mocha 'unitTests/components/**/*.js'", - "test:unit:resources": "mocha 'unitTests/resources/**/*.js'", + "test:unit:resources": "mocha 'unitTests/resources/**/*.js' --exclude 'unitTests/resources/**/*.bench.js'", + "bench": "mocha 'unitTests/resources/caching-rocks-database.bench.js'", "test:unit:bin": "mocha 'unitTests/bin/**/*.js'", "test:unit:apitests": "node ./dist/bin/harper.js stop && mocha 'unitTests/apiTests/**/*-test.mjs'", "test:unit:logging": "mocha 'unitTests/utility/logging/*.js'", @@ -196,6 +197,7 @@ "jsonata": "1.8.7", "jsonwebtoken": "9.0.3", "lmdb": "3.5.4", + "weak-lru-cache": "^1.2.2", "lodash": "^4.17.23", "mathjs": "11.12.0", "micromatch": "^4.0.8", diff --git a/resources/DatabaseTransaction.ts b/resources/DatabaseTransaction.ts index 424d38996..6a6565bfd 100644 --- a/resources/DatabaseTransaction.ts +++ b/resources/DatabaseTransaction.ts @@ -7,8 +7,8 @@ import * as envMngr from '../utility/environment/environmentManager.js'; import { CONFIG_PARAMS } from '../utility/hdbTerms.ts'; import { convertToMS } from '../utility/common_utils.js'; import { when } from '../utility/when.ts'; -import { setTimeout as delay } from 'node:timers/promises'; -import { Transaction as RocksTransaction, type Store as RocksStore } from '@harperfast/rocksdb-js'; +import { Transaction as RocksTransaction, type Store as RocksStore, constants } from '@harperfast/rocksdb-js'; +const RETRY_NOW_VALUE = constants.RETRY_NOW_VALUE; import type { RootDatabaseKind } from './databases.ts'; import type { Entry } from './RecordEncoder.ts'; @@ -94,7 +94,7 @@ export class DatabaseTransaction implements Transaction { } if (this.open !== TRANSACTION_STATE.OPEN) return; // can not start a new read transaction as there is no future commit that will take place, just have to allow the read to latest database state - this.transaction = new RocksTransaction(this.db.store); + this.transaction = new RocksTransaction(this.db.store, { coordinatedRetry: true }); if (this.timestamp) { this.transaction.setTimestamp(this.timestamp); @@ -262,7 +262,12 @@ export class DatabaseTransaction implements Transaction { } const completions = []; return commitResolution.then( - () => { + (commitResult) => { + if (commitResult === RETRY_NOW_VALUE) { + this.retries++; + harperLogger.debug?.('coordinated retry', transaction.id, this.retries); + return this.commit({ transaction }); + } transaction.onCommit?.(); if (this.next) { completions.push(this.next.commit(options)); @@ -298,22 +303,7 @@ export class DatabaseTransaction implements Transaction { }); }, (error) => { - if (error.code === 'ERR_BUSY') { - // if the transaction failed due to concurrent changes, we need to retry. First record this as an increased risk of contention/retry - // for future transactions - this.retries++; - harperLogger.debug?.('retrying', transaction.id, this.retries); - if (this.retries > 2) { - if (this.retries > MAX_RETRIES) { - throw new ServerError( - `After ${MAX_RETRIES} retries, unable to commit transaction, transaction is in conflict with ongoing writes` - ); - } - // start delaying, back off to try to space out transactions and avoid excessive conflicts - return delay(this.retries * this.retries).then(() => this.commit({ transaction })); - } - return this.commit({ transaction }); // try again - } else throw error; + throw error; } ); } diff --git a/resources/PrimaryRocksDatabase.ts b/resources/PrimaryRocksDatabase.ts new file mode 100644 index 000000000..aca5b93b0 --- /dev/null +++ b/resources/PrimaryRocksDatabase.ts @@ -0,0 +1,178 @@ +import { RocksDatabase, type RocksDatabaseOptions, constants, type Store } from '@harperfast/rocksdb-js'; + +const FRESH_VERSION_FLAG = constants.FRESH_VERSION_FLAG; +import { WeakLRUCache } from 'weak-lru-cache'; +import { when } from '../utility/when.ts'; +import { entryMap, METADATA, type Entry } from './RecordEncoder.ts'; + +/** + * RocksDatabase subclass that owns all primary-store behaviour for Harper tables: + * - RecordEncoder metadata extraction (version, metadataFlags, etc.) + * - Optional WeakLRUCache keyed on entry version, verified via the process-wide VerificationTable + * + * Replaces both the old CachingRocksDatabase and the RocksDB-specific patches applied by + * handleLocalTimeForGets. Call initStore(rootStore) after open() — or let + * handleLocalTimeForGets delegate to it automatically via the isPrimaryRocksDatabase marker. + * + * Caching is enabled by default; pass { cache: false } to disable (useful for benchmarking + * or stores where version tracking is not desired). + * + * Cache freshness pattern (using the rocksdb-js VT API): + * 1. verifyVersion(key, cached.version) → true → return cached entry (no disk I/O) + * 2. verifyVersion(key, cached.version) → false → read from DB, populateVersion, update cache + */ +export class PrimaryRocksDatabase extends RocksDatabase { + readonly isPrimaryRocksDatabase = true; + #cache?: WeakLRUCache; + readCount = 0; + cachePuts = false; + declare rootStore: any; + declare decoder: any; + + get #enc(): any { + return (this as any).encoder; + } + + constructor(pathOrStore: string | Store, options?: RocksDatabaseOptions & { cache?: boolean }) { + const enableCache = (options as any)?.cache !== false; + super(pathOrStore, enableCache ? { ...options, verificationTable: true } : options); + if (enableCache) { + this.#cache = new WeakLRUCache(); + } + } + + /** + * Initialises encoder/decoder state. Must be called once after open() with the root + * RocksDatabase. Equivalent to the RocksDB branch of handleLocalTimeForGets, but as + * a real method rather than instance-level monkey-patching. + */ + initStore(rootStore: RocksDatabase) { + this.readCount = 0; + this.cachePuts = false; + this.rootStore = rootStore; + this.#enc.rootStore = rootStore; + this.#enc.isRocksDB = true; + this.decoder = this.#enc; + } + + #withEntry(entry: Entry, id: any): Entry { + if (entry.value) { + if (entry.value.constructor === Object && this.#enc.structPrototype) { + const originalValue = entry.value; + entry.value = new this.#enc.structPrototype.constructor(); + Object.assign(entry.value, originalValue); + } + if (typeof entry.value === 'object' && entry.value !== null) { + entryMap.set(entry.value, entry); + } + } + entry.key = id; + return entry; + } + + #processEntry(raw: any, id: any): Entry | undefined { + if (raw == null) return undefined; + if (raw[METADATA]) { + raw.metadataFlags = raw[METADATA]; + return this.#withEntry(raw, id); + } + return { value: raw, key: id } as Entry; + } + + /** + * Core read method. Returns a full Entry (with version, metadataFlags, value, …) or + * undefined. When caching is enabled, passes `expectedVersion` to the native layer so + * a single call handles both verification (returns FRESH_VERSION_FLAG on hit) and VT + * population (auto-seeded on DB read). Only cold reads (no cached version) need a + * separate populateVersion call. + */ + getEntry(id: any, options?: any): any { + this.readCount++; + const cache = this.#cache; + const cached = cache?.get(id) as Entry | undefined; + const expectedVersion = cached?.version; + + // Build get options, always merging with caller options to preserve + // transaction snapshot. Pass expectedVersion when cached: + // VT hit → native returns FRESH_VERSION_FLAG, no DB read + // VT miss → native reads DB and auto-populates VT slot + // For cold reads (no cached version), use populateVersion flag so the + // native layer seeds the VT slot in the same call. + let getOptions: any; + if (expectedVersion != null) { + getOptions = options ? { ...options, expectedVersion } : { expectedVersion }; + } else if (cache) { + getOptions = options ? { ...options, populateVersion: true } : { populateVersion: true }; + } else { + getOptions = options; + } + const raw = options?.async ? super.get(id, getOptions) : super.getSync(id, getOptions); + + return when(raw, (result) => { + if (result === FRESH_VERSION_FLAG) return cached; + const entry = this.#processEntry(result, id); + if (entry == null) { + if (cache && cached !== undefined) cache.delete(id); + return undefined; + } + if (entry.version != null && cache) { + cache.set(id, entry, (entry.size ?? 0) >> 10); + } + return entry; + }); + } + + getSync(id: any, options?: any): any { + const entry = this.getEntry(id, options) as Entry; + const value = entry?.value; + if (value != null && typeof value === 'object') entryMap.set(value, entry); + return value; + } + + get(id: any, options?: any): any { + return when(this.getEntry(id, { ...options, async: true }), (entry: Entry) => { + const value = entry?.value; + if (value != null && typeof value === 'object') entryMap.set(value, entry); + return value; + }); + } + + getRange(options?: any): any { + const iterable = super.getRange(options); + if (options?.valuesForKey) return iterable.map((v: any) => v?.value); + if (options?.values === false || options?.onlyCount) return iterable; + const hasRecordEncoder = !!this.#enc.isRocksDB; + return iterable.map((entry: any) => { + if (hasRecordEncoder) { + if (entry.value?.[METADATA]) { + entry.metadataFlags = entry.value[METADATA]; + Object.assign(entry, entry.value); + } + if (entry.value?.constructor === Object && this.#enc.structPrototype) { + const originalValue = entry.value; + entry.value = new this.#enc.structPrototype.constructor(); + for (const key in originalValue) entry.value[key] = originalValue[key]; + } + } + return entry; + }); + } + + putSync(id: any, value: any, options?: any): any { + this.#cache?.delete(id); + return super.putSync(id, value, options); + } + + removeSync(id: any, options?: any): any { + this.#cache?.delete(id); + return super.removeSync(id, options); + } + + open(): PrimaryRocksDatabase { + return super.open() as PrimaryRocksDatabase; + } + + static open(pathOrStore: string | Store, options?: RocksDatabaseOptions): PrimaryRocksDatabase { + return new PrimaryRocksDatabase(pathOrStore, options).open(); + } +} diff --git a/resources/RecordEncoder.ts b/resources/RecordEncoder.ts index de8d0dee7..e917cf2c8 100644 --- a/resources/RecordEncoder.ts +++ b/resources/RecordEncoder.ts @@ -335,52 +335,37 @@ function getTimestamp() { } export function handleLocalTimeForGets(store, rootStore) { - const isRocksDB = store instanceof RocksDatabase; + if ((store as any).isPrimaryRocksDatabase) { + (store as any).initStore(rootStore); + return store; + } store.readCount = 0; store.cachePuts = false; store.rootStore = rootStore; store.encoder.rootStore = rootStore; - store.encoder.isRocksDB = isRocksDB; store.decoder = store.encoder; const storeGetEntry = store.getEntry; - const storeGetSync = store.getSync; - const storeGet = store.get; store.getEntry = function (id, options) { store.readCount++; lastMetadata = null; - if (isRocksDB) { - return when( - options?.async ? storeGet.call(store, id, options) : storeGetSync.call(store, id, options), - (entry) => { - if (entry) { - if (entry[METADATA]) { - entry.metadataFlags = entry[METADATA]; - return withEntry(entry); - } else return { value: entry }; - } else return entry; - } - ); - } else { - let entry: Entry = storeGetEntry.call(this, id, options); - if (lastMetadata) { - entry.metadataFlags = lastMetadata[METADATA]; - entry.localTime = lastMetadata.localTime; - entry.residencyId = lastMetadata.residencyId; - entry.nodeId = lastMetadata.nodeId; - entry.additionalAuditRefs = lastMetadata.additionalAuditRefs; - entry.size = lastMetadata.size; - if (lastMetadata.expiresAt >= 0) { - entry.expiresAt = lastMetadata.expiresAt; - } - if (isRocksDB) entry.version = lastMetadata.localTime; - if (entry.value) { - entryMap.set(entry.value, entry); // allow the record to access the entry - } - entry.key = id; + let entry: Entry = storeGetEntry.call(this, id, options); + if (lastMetadata) { + entry.metadataFlags = lastMetadata[METADATA]; + entry.localTime = lastMetadata.localTime; + entry.residencyId = lastMetadata.residencyId; + entry.nodeId = lastMetadata.nodeId; + entry.additionalAuditRefs = lastMetadata.additionalAuditRefs; + entry.size = lastMetadata.size; + if (lastMetadata.expiresAt >= 0) { + entry.expiresAt = lastMetadata.expiresAt; } - return entry && withEntry(entry); + if (entry.value) { + entryMap.set(entry.value, entry); // allow the record to access the entry + } + entry.key = id; } + return entry && withEntry(entry); // if we have decoded with metadata, we want to pull it out and assign to this entry function withEntry(entry) { if (entry.value) { @@ -425,15 +410,9 @@ export function handleLocalTimeForGets(store, rootStore) { if (options.values === false || options.onlyCount) return iterable; return iterable.map((entry) => { // if we have metadata, move the metadata to the entry - if (isRocksDB) { - if (entry.value?.[METADATA]) { - entry.metadataFlags = entry.value[METADATA]; - Object.assign(entry, entry.value); - } - } else if (lastMetadata) { + if (lastMetadata) { entry.metadataFlags = lastMetadata[METADATA]; entry.localTime = lastMetadata.localTime; - if (isRocksDB) entry.version = lastMetadata.localTime; entry.residencyId = lastMetadata.residencyId; entry.nodeId = lastMetadata.nodeId; entry.additionalAuditRefs = lastMetadata.additionalAuditRefs; @@ -453,34 +432,32 @@ export function handleLocalTimeForGets(store, rootStore) { }); }; - if (!isRocksDB) { - // add read transaction tracking - const txn = store.useReadTransaction(); - txn.done(); - if (!txn.done.isTracked) { - const Txn = txn.constructor; - const use = txn.use; - const done = txn.done; - Txn.prototype.use = function () { - if (!this.timerTracked) { - this.timerTracked = true; - trackedTxns.push(new WeakRef(this)); - } - use.call(this); - }; - Txn.prototype.done = function () { - done.call(this); - if (this.isDone) { - for (let i = 0; i < trackedTxns.length; i++) { - const txn = trackedTxns[i].deref(); - if (!txn || txn.isDone || txn.isCommitted) { - trackedTxns.splice(i--, 1); - } + // add read transaction tracking (LMDB only — RocksDB stores are PrimaryRocksDatabase) + const txn = store.useReadTransaction(); + txn.done(); + if (!txn.done.isTracked) { + const Txn = txn.constructor; + const use = txn.use; + const done = txn.done; + Txn.prototype.use = function () { + if (!this.timerTracked) { + this.timerTracked = true; + trackedTxns.push(new WeakRef(this)); + } + use.call(this); + }; + Txn.prototype.done = function () { + done.call(this); + if (this.isDone) { + for (let i = 0; i < trackedTxns.length; i++) { + const txn = trackedTxns[i].deref(); + if (!txn || txn.isDone || txn.isCommitted) { + trackedTxns.splice(i--, 1); } } - }; - Txn.prototype.done.isTracked = true; - } + } + }; + Txn.prototype.done.isTracked = true; } return store; diff --git a/resources/databases.ts b/resources/databases.ts index 4b5873532..07fe84465 100644 --- a/resources/databases.ts +++ b/resources/databases.ts @@ -27,6 +27,7 @@ import { deleteRootBlobPathsForDB } from './blob.ts'; import { CUSTOM_INDEXES } from './indexes/customIndexes.ts'; import { OpenDBIObject } from '../utility/lmdb/OpenDBIObject.js'; import { RocksDatabase, type RocksDatabaseOptions } from '@harperfast/rocksdb-js'; +import { PrimaryRocksDatabase } from './PrimaryRocksDatabase.ts'; import { replayLogs } from './replayLogs.ts'; import { totalmem } from 'node:os'; import { RocksIndexStore } from './RocksIndexStore.ts'; @@ -122,7 +123,7 @@ function openRocksDatabase(path: string, options: RocksDatabaseOptions & { dupSo if (options.dupSort) { db = new RocksIndexStore(path, options).open() as RocksDatabaseEx; } else { - db = RocksDatabase.open(path, options) as RocksDatabaseEx; + db = new PrimaryRocksDatabase(path, options).open() as unknown as RocksDatabaseEx; // the RocksDB put and remove return promises, which masks thrown errors in non-awaiting calls to put/remove, // making them unsafe to replace LMDB methods, which will synchronously throw errors if there is a problem db.put = db.putSync; diff --git a/unitTests/resources/caching-rocks-database.bench.js b/unitTests/resources/caching-rocks-database.bench.js new file mode 100644 index 000000000..b41c12827 --- /dev/null +++ b/unitTests/resources/caching-rocks-database.bench.js @@ -0,0 +1,185 @@ +/** + * Benchmark: PrimaryRocksDatabase without caching vs with caching (WeakLRUCache + VT) + * + * Run via: npm run bench + */ +require('../testUtils'); +const { setupTestDBPath } = require('../testUtils'); +const { setMainIsWorker } = require('#js/server/threads/manageThreads'); +const { PrimaryRocksDatabase } = require('#src/resources/PrimaryRocksDatabase'); +const { RecordEncoder, recordUpdater } = require('#src/resources/RecordEncoder'); +const path = require('path'); +const { mkdirSync } = require('fs'); + +const isLMDB = process.env.HARPER_STORAGE_ENGINE === 'lmdb'; +const RECORD_COUNT = 2_000; +const WARMUP_ROUNDS = 2; + +function opsPerSec(n, ms) { + return ((n / ms) * 1000).toFixed(0).padStart(10); +} + +function row(label, plain, caching) { + const ratio = (plain / caching).toFixed(2); + console.log( + ` ${label.padEnd(38)} | ${opsPerSec(RECORD_COUNT, plain)} op/s` + + ` | ${opsPerSec(RECORD_COUNT, caching)} op/s` + + ` | ${ratio.padStart(5)}x` + ); +} + +function header() { + console.log(''); + console.log( + ` ${'Scenario'.padEnd(38)} | ${'cache: false'.padStart(13)}` + + ` | ${'cache: true'.padStart(15)}` + + ` | Speedup` + ); + console.log(' ' + '-'.repeat(85)); +} + +describe('Benchmark: PrimaryRocksDatabase cache:false vs cache:true', function () { + this.timeout(120_000); + + let noCacheDb, cachingDb, noCacheUpdate, cachingUpdate, dbBase; + const keys = Array.from({ length: RECORD_COUNT }, (_, i) => `record-${String(i).padStart(8, '0')}`); + const values = keys.map((k, i) => ({ id: i, name: k, payload: 'x'.repeat(80) })); + + before(function () { + if (isLMDB) return this.skip(); + setMainIsWorker(true); + dbBase = path.join(setupTestDBPath(), 'bench'); + mkdirSync(dbBase, { recursive: true }); + + const primaryOptions = { + disableWAL: true, + name: 'bench', + encoder: { Encoder: RecordEncoder }, + }; + + // No cache: PrimaryRocksDatabase without WeakLRUCache or VT + noCacheDb = new PrimaryRocksDatabase(path.join(dbBase, 'nocache'), { ...primaryOptions, cache: false }).open(); + noCacheDb.initStore(noCacheDb); + noCacheUpdate = recordUpdater(noCacheDb, 1, null); + + // With cache: PrimaryRocksDatabase with WeakLRUCache + VT (default) + cachingDb = new PrimaryRocksDatabase(path.join(dbBase, 'caching'), primaryOptions).open(); + cachingDb.initStore(cachingDb); + cachingUpdate = recordUpdater(cachingDb, 1, null); + + // Populate both stores using recordUpdater so values carry encoded version bytes. + // Versions must be in the Date.now() range (~1e12) so RecordEncoder recognises + // the 0x42 first byte of the float64 and strips the version prefix on decode. + let version = Date.now(); + for (let i = 0; i < RECORD_COUNT; i++) { + noCacheUpdate(keys[i], values[i], null, version, 0, false); + cachingUpdate(keys[i], values[i], null, version, 0, false); + version++; + } + }); + + after(function () { + noCacheDb?.close?.(); + cachingDb?.close?.(); + }); + + it('prints benchmark results', function () { + header(); + + // ── 1. Cold read: first read of every key ── + { + for (let r = 0; r < WARMUP_ROUNDS; r++) { + for (const k of keys) noCacheDb.getSync(k); + } + // flush cachingDb's WeakLRUCache by writing each key, then time the cold cache read + let flushVersion = Date.now() + 100_000; + for (let i = 0; i < RECORD_COUNT; i++) cachingUpdate(keys[i], values[i], null, flushVersion++, 0, false); + + const t0 = performance.now(); + for (const k of keys) noCacheDb.getSync(k); + const tPlain = performance.now() - t0; + + const t1 = performance.now(); + for (const k of keys) cachingDb.getSync(k); + const tCold = performance.now() - t1; + + row('getSync — cold (cache miss)', tPlain, tCold); + } + + // ── 2. Soft VT miss: second read (WeakLRUCache warm, VT not yet populated) ── + { + const t0 = performance.now(); + for (const k of keys) noCacheDb.getSync(k); + const tPlain = performance.now() - t0; + + const t1 = performance.now(); + for (const k of keys) cachingDb.getSync(k); + const tSoftMiss = performance.now() - t1; + + row('getSync — soft VT miss (2nd read)', tPlain, tSoftMiss); + } + + // ── 3. VT fast-path: third+ read ── + { + const t0 = performance.now(); + for (const k of keys) noCacheDb.getSync(k); + const tPlain = performance.now() - t0; + + const t1 = performance.now(); + for (const k of keys) cachingDb.getSync(k); + const tVTHit = performance.now() - t1; + + row('getSync — VT fast-path (3rd+ read)', tPlain, tVTHit); + } + + // ── 4. Repeated single key (hot key) ── + { + const hotKey = keys[0]; + const N = RECORD_COUNT; + + const t0 = performance.now(); + for (let i = 0; i < N; i++) noCacheDb.getSync(hotKey); + const tPlain = performance.now() - t0; + + const t1 = performance.now(); + for (let i = 0; i < N; i++) cachingDb.getSync(hotKey); + const tHot = performance.now() - t1; + + row('getSync — hot single key', tPlain, tHot); + } + + // ── 5. Async get: VT fast-path ── + { + for (const k of keys) cachingDb.getSync(k); // ensure VT is populated + + const t0 = performance.now(); + for (const k of keys) noCacheDb.get(k); + const tPlain = performance.now() - t0; + + const t1 = performance.now(); + for (const k of keys) cachingDb.get(k); + const tVTHit = performance.now() - t1; + + row('get (async) — VT fast-path (sync fallback)', tPlain, tVTHit); + } + + // ── 6. Write throughput (recordUpdater) ── + { + let v = Date.now() + 200_000; + const t0 = performance.now(); + for (let i = 0; i < RECORD_COUNT; i++) noCacheUpdate(keys[i], values[i], null, v++, 0, false); + const tPlain = performance.now() - t0; + + const t1 = performance.now(); + for (let i = 0; i < RECORD_COUNT; i++) cachingUpdate(keys[i], values[i], null, v++, 0, false); + const tCaching = performance.now() - t1; + + row('recordUpdater put', tPlain, tCaching); + } + + console.log(''); + console.log(` Records: ${RECORD_COUNT}, value size: ~80 bytes`); + console.log(` Speedup > 1x means cache:true is faster`); + console.log(''); + }); +}); diff --git a/unitTests/resources/caching-rocks-database.test.js b/unitTests/resources/caching-rocks-database.test.js new file mode 100644 index 000000000..e68c0c4ad --- /dev/null +++ b/unitTests/resources/caching-rocks-database.test.js @@ -0,0 +1,124 @@ +require('../testUtils'); +const assert = require('assert'); +const { setupTestDBPath } = require('../testUtils'); +const { table } = require('#src/resources/databases'); +const { setMainIsWorker } = require('#js/server/threads/manageThreads'); +const { RocksDatabase } = require('@harperfast/rocksdb-js'); +const { PrimaryRocksDatabase } = require('#src/resources/PrimaryRocksDatabase'); + +const isLMDB = process.env.HARPER_STORAGE_ENGINE === 'lmdb'; + +describe('PrimaryRocksDatabase', function () { + let TestTable; + + before(async function () { + if (isLMDB) return this.skip(); + setupTestDBPath(); + setMainIsWorker(true); + TestTable = table({ + table: 'PrimaryRocksTest', + database: 'test', + attributes: [ + { name: 'id', isPrimaryKey: true }, + { name: 'name' }, + ], + }); + }); + + it('Primary store is a PrimaryRocksDatabase instance (and a RocksDatabase)', function () { + assert(TestTable.primaryStore instanceof PrimaryRocksDatabase); + assert(TestTable.primaryStore instanceof RocksDatabase); + }); + + it('Basic read/write returns correct value', async function () { + await TestTable.put(1, { name: 'one' }); + const result = await TestTable.get(1); + assert.equal(result.name, 'one'); + }); + + it('Repeated reads return consistent value', async function () { + await TestTable.put(2, { name: 'two' }); + const first = await TestTable.get(2); + const second = await TestTable.get(2); + assert.equal(first.name, second.name); + }); + + it('VT slot is populated after two reads, enabling fast-path verification', async function () { + await TestTable.put(3, { name: 'three' }); + // First read: cache cold, entry stored in WeakLRUCache, VT slot not yet populated + await TestTable.get(3); + // Second read: cache warm, expectedVersion passed → soft VT miss populates slot + const entry = await TestTable.primaryStore.getEntry(3); + assert(entry.version, 'entry should have a version after read'); + assert( + TestTable.primaryStore.verifyVersion(3, entry.version), + 'VT slot should be populated after two reads' + ); + }); + + it('Third read hits VT fast path (no DB access needed)', async function () { + await TestTable.put(10, { name: 'ten' }); + await TestTable.get(10); // 1st: populates WeakLRUCache + const entry = await TestTable.primaryStore.getEntry(10); // 2nd: populates VT + assert(entry.version); + // 3rd read: VT slot matches → FRESH returned without DB access; value is still correct + const result = await TestTable.get(10); + assert.equal(result.name, 'ten'); + assert(TestTable.primaryStore.verifyVersion(10, entry.version), 'VT should still hold the version'); + }); + + it('Write clears VT slot so stale cached version cannot be verified', async function () { + await TestTable.put(4, { name: 'four' }); + await TestTable.get(4); + const entry = await TestTable.primaryStore.getEntry(4); + const oldVersion = entry.version; + assert(TestTable.primaryStore.verifyVersion(4, oldVersion), 'VT should be populated before write'); + + // Write clears the VT slot via LockTracker registerIntent/releaseIntent + await TestTable.put(4, { name: 'four updated' }); + assert( + !TestTable.primaryStore.verifyVersion(4, oldVersion), + 'VT slot should be cleared after write' + ); + // New read should return the updated value + const updated = await TestTable.get(4); + assert.equal(updated.name, 'four updated'); + }); + + it('Remove clears cache entry and subsequent read returns undefined', async function () { + await TestTable.put(5, { name: 'five' }); + await TestTable.get(5); // populate cache + await TestTable.delete(5); + const result = await TestTable.get(5); + assert.equal(result, undefined); + }); + + it('Read with a transaction context returns correct value', async function () { + await TestTable.put(6, { name: 'six' }); + const context = {}; + const result = await TestTable.get(6, context); + assert.equal(result.name, 'six'); + }); + + it('Concurrent writes to the same key both complete with coordinatedRetry', async function () { + await TestTable.put(7, { name: 'seven' }); + // Fire two concurrent writes; coordinatedRetry means no ERR_BUSY thrown + const [, ] = await Promise.all([ + TestTable.put(7, { name: 'seven-a' }), + TestTable.put(7, { name: 'seven-b' }), + ]); + const result = await TestTable.get(7); + assert( + result.name === 'seven-a' || result.name === 'seven-b', + `Expected one of the concurrent writes to win, got: ${result.name}` + ); + }); + + it('Write followed by immediate read returns the new value, not stale cache', async function () { + await TestTable.put(8, { name: 'eight' }); + await TestTable.get(8); // populate cache with version T1 + await TestTable.put(8, { name: 'eight updated' }); // clears cache entry + const result = await TestTable.get(8); + assert.equal(result.name, 'eight updated'); + }); +}); diff --git a/unitTests/resources/vectorIndex.test.js b/unitTests/resources/vectorIndex.test.js index 9f716be7a..41915eb78 100644 --- a/unitTests/resources/vectorIndex.test.js +++ b/unitTests/resources/vectorIndex.test.js @@ -1,6 +1,9 @@ const assert = require('node:assert'); +const { Worker } = require('worker_threads'); +const { setupTestDBPath } = require('../testUtils'); const { table } = require('#src/resources/databases'); const { HierarchicalNavigableSmallWorld } = require('#src/resources/indexes/HierarchicalNavigableSmallWorld'); +const { setMainIsWorker } = require('#js/server/threads/manageThreads'); describe('HierarchicalNavigableSmallWorld indexing', () => { if (process.env.HARPER_STORAGE_ENGINE === 'lmdb') return; // don't try to test lmdb @@ -290,6 +293,67 @@ describe('HierarchicalNavigableSmallWorld indexing', () => { assert(invertedSimiliarities <= 6, `expected at most 6 distance inversions, got ${invertedSimiliarities}`); } }); +describe('HNSW concurrent PUT race condition (issue #386)', () => { + if (process.env.HARPER_STORAGE_ENGINE === 'lmdb') return; + const WORKER_COUNT = 4; + const PUTS_PER_WORKER = 25; + const DIMS = 768; + let ConcurrentTest; + let workers = []; + + before(() => { + setupTestDBPath(); + setMainIsWorker(true); + ConcurrentTest = table({ + table: 'HNSWConcurrentTest', + database: 'test', + attributes: [ + { name: 'id', isPrimaryKey: true }, + { name: 'embedding', indexed: { type: 'HNSW' }, type: 'Array' }, + ], + }); + for (let w = 0; w < WORKER_COUNT; w++) { + workers.push(new Worker(__dirname + '/vectorIndex-thread.js')); + } + }); + + it('handles concurrent multi-worker PUTs without race conditions', async () => { + const replies = await Promise.all( + workers.map( + (worker, w) => + new Promise((resolve) => { + worker.once('message', resolve); + worker.once('error', (err) => + resolve({ type: 'error', start: w * PUTS_PER_WORKER, message: err.message, stack: err.stack }) + ); + worker.postMessage({ + type: 'insert', + start: w * PUTS_PER_WORKER, + count: PUTS_PER_WORKER, + dims: DIMS, + }); + }) + ) + ); + const errors = replies.filter((r) => r.type === 'error'); + assert.deepEqual( + errors, + [], + `expected no worker errors, got: ${errors.map((e) => `[start=${e.start}] ${e.message} ${e.stack}`).join('; ')}` + ); + + const expected = WORKER_COUNT * PUTS_PER_WORKER; + let count = 0; + for await (const _ of ConcurrentTest.search([])) count++; + assert.equal(count, expected, `expected ${expected} records after concurrent puts, got ${count}`); + }); + + after(async () => { + await Promise.all(workers.map((w) => w.terminate())); + ConcurrentTest.dropTable(); + }); +}); + async function fromAsync(iterable) { let results = []; for await (let entry of iterable) {