4 changes: 3 additions & 1 deletion package.json
@@ -62,7 +62,8 @@
"test:unit:all": "npm run test:unit:main && npm run test:unit:apitests && npm run test:unit:resources && npm run test:unit:lmdb",
"test:unit:lmdb": "HARPER_STORAGE_ENGINE=lmdb npm run test:unit:resources && HARPER_STORAGE_ENGINE=lmdb npm run test:unit:apitests",
"test:unit:components": "mocha 'unitTests/components/**/*.js'",
"test:unit:resources": "mocha 'unitTests/resources/**/*.js'",
"test:unit:resources": "mocha 'unitTests/resources/**/*.js' --exclude 'unitTests/resources/**/*.bench.js'",
"bench": "mocha 'unitTests/resources/caching-rocks-database.bench.js'",
"test:unit:bin": "mocha 'unitTests/bin/**/*.js'",
"test:unit:apitests": "node ./dist/bin/harper.js stop && mocha 'unitTests/apiTests/**/*-test.mjs'",
"test:unit:logging": "mocha 'unitTests/utility/logging/*.js'",
@@ -196,6 +197,7 @@
"jsonata": "1.8.7",
"jsonwebtoken": "9.0.3",
"lmdb": "3.5.4",
weak-lru-cache is not documented in dependencies.md.

Per this repo's own policy, every new runtime dependency requires an entry in dependencies.md answering the standard checklist (size, security track record, overlap, memory cost, removal plan, etc.). Please add the entry before merging.

"weak-lru-cache": "^1.2.2",

Blocker — runtime dependency not documented in dependencies.md

weak-lru-cache is a new runtime dependency but has no entry in dependencies.md. Per repo policy, every runtime dependency addition must answer the questions listed there (size, security track record, overlap with existing packages, memory cost, removal plan, etc.) so the full engineering team can review the decision alongside the code.

Please add an entry for weak-lru-cache to dependencies.md before merging.
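
Since dependencies.md itself is not shown in this diff, its exact entry format is an assumption; a minimal entry answering the checklist from the comment above might look like the following, with placeholders where only the PR author can supply real answers:

```markdown
## weak-lru-cache

- Size: <fill in: install size / number of transitive dependencies>
- Security track record: <fill in: advisories, maintenance status>
- Overlap with existing packages: <fill in: why an existing cache is not reused>
- Memory cost: <fill in: expected footprint of cached entries>
- Removal plan: <fill in: what replaces it if it is dropped>
```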

"lodash": "^4.17.23",
"mathjs": "11.12.0",
"micromatch": "^4.0.8",
30 changes: 10 additions & 20 deletions resources/DatabaseTransaction.ts
@@ -7,8 +7,8 @@
import { CONFIG_PARAMS } from '../utility/hdbTerms.ts';
import { convertToMS } from '../utility/common_utils.js';
import { when } from '../utility/when.ts';
import { setTimeout as delay } from 'node:timers/promises';
import { Transaction as RocksTransaction, type Store as RocksStore } from '@harperfast/rocksdb-js';
import { Transaction as RocksTransaction, type Store as RocksStore, constants } from '@harperfast/rocksdb-js';
const RETRY_NOW_VALUE = constants.RETRY_NOW_VALUE;
Nit — MAX_RETRIES (line 23) is now dead code

The constant MAX_RETRIES = 40 is still defined but no longer used; the ERR_BUSY backoff guard that consumed it was removed when switching to coordinatedRetry. Consider removing it, or reinstating it as a cap inside the new RETRY_NOW_VALUE branch to preserve the JS-side safety net in case the native layer ever loops unexpectedly.

import type { RootDatabaseKind } from './databases.ts';
import type { Entry } from './RecordEncoder.ts';

@@ -20,7 +20,7 @@
OPEN: 1, // the transaction is open and can be used for reads and writes
LINGERING: 2, // the transaction has completed a read, but can be used for immediate writes
};
const MAX_RETRIES = 40;

Check failure on line 23 in resources/DatabaseTransaction.ts (GitHub Actions / runLinter, eslint(no-unused-vars)):
Variable 'MAX_RETRIES' is declared but never used. Unused variables should start with a '_'.
let outstandingCommit, outstandingCommitStart;
let confirmReplication;
export function replicationConfirmation(callback) {
@@ -94,7 +94,7 @@
}
if (this.open !== TRANSACTION_STATE.OPEN) return; // can not start a new read transaction as there is no future commit that will take place, just have to allow the read to latest database state

this.transaction = new RocksTransaction(this.db.store);
this.transaction = new RocksTransaction(this.db.store, { coordinatedRetry: true });

if (this.timestamp) {
this.transaction.setTimestamp(this.timestamp);
@@ -262,7 +262,12 @@
}
const completions = [];
return commitResolution.then(
() => {
(commitResult) => {
if (commitResult === RETRY_NOW_VALUE) {
this.retries++;
harperLogger.debug?.('coordinated retry', transaction.id, this.retries);
return this.commit({ transaction });
No application-level retry bound: this.retries++ is incremented on each RETRY_NOW_VALUE but never checked against a limit. The old ERR_BUSY handler had MAX_RETRIES = 40 (now dead code at line 23) that threw ServerError after 40 attempts. If @harperfast/rocksdb-js guarantees it will eventually stop returning RETRY_NOW_VALUE under any contention scenario, please document that guarantee. Otherwise, reinstate an application-level cap so sustained conflicts don't produce unbounded recursion.

No retry cap on the coordinatedRetry path.

MAX_RETRIES = 40 is still defined at the top of the file but is now unreachable dead code — nothing checks it. If RocksDB keeps returning RETRY_NOW_VALUE (sustained write contention, edge-case in the native layer, etc.) this recurses without bound until stack exhaustion.

Suggested change
return this.commit({ transaction });
if (commitResult === RETRY_NOW_VALUE) {
this.retries++;
if (this.retries > MAX_RETRIES) {
throw new ServerError(
`After ${MAX_RETRIES} retries, unable to commit transaction, transaction is in conflict with ongoing writes`
);
}
harperLogger.debug?.('coordinated retry', transaction.id, this.retries);
return this.commit({ transaction });
}
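
The cap this suggestion reinstates can also be modelled in isolation. Below is a hypothetical, synchronous sketch (the real path is promise-based), where RETRY_NOW_VALUE, ServerError, and commitOnce are stand-ins rather than the real @harperfast/rocksdb-js API:

```typescript
// Hypothetical standalone model of a bounded coordinated-retry commit loop.
// Stand-ins only: RETRY_NOW_VALUE, ServerError, and commitOnce do not come
// from @harperfast/rocksdb-js.
const RETRY_NOW_VALUE = Symbol('RETRY_NOW_VALUE');
const MAX_RETRIES = 40;

class ServerError extends Error {}

// Retries while the native layer asks for an immediate retry, but throws once
// MAX_RETRIES is exceeded so sustained contention cannot recurse without bound.
// Returns the number of retries it took to commit.
function commitWithCap(commitOnce: () => unknown): number {
	let retries = 0;
	for (;;) {
		const result = commitOnce();
		if (result !== RETRY_NOW_VALUE) return retries; // committed
		retries++;
		if (retries > MAX_RETRIES) {
			throw new ServerError(
				`After ${MAX_RETRIES} retries, unable to commit transaction, transaction is in conflict with ongoing writes`
			);
		}
	}
}
```

The same guard drops into the RETRY_NOW_VALUE branch of commit(); the loop above just makes the bound explicit.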

}
transaction.onCommit?.();
if (this.next) {
completions.push(this.next.commit(options));
@@ -298,22 +303,7 @@
});
},
(error) => {
if (error.code === 'ERR_BUSY') {
// if the transaction failed due to concurrent changes, we need to retry. First record this as an increased risk of contention/retry
// for future transactions
this.retries++;
harperLogger.debug?.('retrying', transaction.id, this.retries);
if (this.retries > 2) {
if (this.retries > MAX_RETRIES) {
throw new ServerError(
`After ${MAX_RETRIES} retries, unable to commit transaction, transaction is in conflict with ongoing writes`
);
}
// start delaying, back off to try to space out transactions and avoid excessive conflicts
return delay(this.retries * this.retries).then(() => this.commit({ transaction }));
}
return this.commit({ transaction }); // try again
} else throw error;
throw error;
}
);
}
178 changes: 178 additions & 0 deletions resources/PrimaryRocksDatabase.ts
@@ -0,0 +1,178 @@
import { RocksDatabase, type RocksDatabaseOptions, constants, type Store } from '@harperfast/rocksdb-js';

const FRESH_VERSION_FLAG = constants.FRESH_VERSION_FLAG;
import { WeakLRUCache } from 'weak-lru-cache';
import { when } from '../utility/when.ts';
import { entryMap, METADATA, type Entry } from './RecordEncoder.ts';

/**
* RocksDatabase subclass that owns all primary-store behaviour for Harper tables:
* - RecordEncoder metadata extraction (version, metadataFlags, etc.)
* - Optional WeakLRUCache keyed on entry version, verified via the process-wide VerificationTable
*
* Replaces both the old CachingRocksDatabase and the RocksDB-specific patches applied by
* handleLocalTimeForGets. Call initStore(rootStore) after open() — or let
* handleLocalTimeForGets delegate to it automatically via the isPrimaryRocksDatabase marker.
*
* Caching is enabled by default; pass { cache: false } to disable (useful for benchmarking
* or stores where version tracking is not desired).
*
* Cache freshness pattern (using the rocksdb-js VT API):
* 1. verifyVersion(key, cached.version) → true → return cached entry (no disk I/O)
* 2. verifyVersion(key, cached.version) → false → read from DB, populateVersion, update cache
*/
export class PrimaryRocksDatabase extends RocksDatabase {
readonly isPrimaryRocksDatabase = true;
#cache?: WeakLRUCache;
readCount = 0;
cachePuts = false;
declare rootStore: any;
declare decoder: any;

get #enc(): any {
return (this as any).encoder;
}

constructor(pathOrStore: string | Store, options?: RocksDatabaseOptions & { cache?: boolean }) {
const enableCache = (options as any)?.cache !== false;
super(pathOrStore, enableCache ? { ...options, verificationTable: true } : options);
if (enableCache) {
this.#cache = new WeakLRUCache();
}
}

/**
* Initialises encoder/decoder state. Must be called once after open() with the root
* RocksDatabase. Equivalent to the RocksDB branch of handleLocalTimeForGets, but as
* a real method rather than instance-level monkey-patching.
*/
initStore(rootStore: RocksDatabase) {
this.readCount = 0;
this.cachePuts = false;
this.rootStore = rootStore;
this.#enc.rootStore = rootStore;
this.#enc.isRocksDB = true;
this.decoder = this.#enc;
}

#withEntry(entry: Entry, id: any): Entry {
if (entry.value) {
if (entry.value.constructor === Object && this.#enc.structPrototype) {
const originalValue = entry.value;
entry.value = new this.#enc.structPrototype.constructor();
Object.assign(entry.value, originalValue);
}
if (typeof entry.value === 'object' && entry.value !== null) {
entryMap.set(entry.value, entry);
}
}
entry.key = id;
return entry;
}

#processEntry(raw: any, id: any): Entry | undefined {
if (raw == null) return undefined;
if (raw[METADATA]) {
raw.metadataFlags = raw[METADATA];
return this.#withEntry(raw, id);
}
return { value: raw, key: id } as Entry;
}

/**
* Core read method. Returns a full Entry (with version, metadataFlags, value, …) or
* undefined. When caching is enabled, passes `expectedVersion` to the native layer so
* a single call handles both verification (returns FRESH_VERSION_FLAG on hit) and VT
* population (auto-seeded on DB read). Only cold reads (no cached version) need a
* separate populateVersion call.
*/
getEntry(id: any, options?: any): any {
this.readCount++;
const cache = this.#cache;
const cached = cache?.get(id) as Entry | undefined;
const expectedVersion = cached?.version;

// Build get options, always merging with caller options to preserve
// transaction snapshot. Pass expectedVersion when cached:
// VT hit → native returns FRESH_VERSION_FLAG, no DB read
// VT miss → native reads DB and auto-populates VT slot
// For cold reads (no cached version), use populateVersion flag so the
// native layer seeds the VT slot in the same call.
let getOptions: any;
if (expectedVersion != null) {
getOptions = options ? { ...options, expectedVersion } : { expectedVersion };
} else if (cache) {
getOptions = options ? { ...options, populateVersion: true } : { populateVersion: true };
} else {
getOptions = options;
}
const raw = options?.async ? super.get(id, getOptions) : super.getSync(id, getOptions);

return when(raw, (result) => {
if (result === FRESH_VERSION_FLAG) return cached;
const entry = this.#processEntry(result, id);
if (entry == null) {
if (cache && cached !== undefined) cache.delete(id);
return undefined;
}
if (entry.version != null && cache) {
cache.set(id, entry, (entry.size ?? 0) >> 10);
}
return entry;
});
}

getSync(id: any, options?: any): any {
const entry = this.getEntry(id, options) as Entry;
const value = entry?.value;
if (value != null && typeof value === 'object') entryMap.set(value, entry);
return value;
}

get(id: any, options?: any): any {
return when(this.getEntry(id, { ...options, async: true }), (entry: Entry) => {
const value = entry?.value;
if (value != null && typeof value === 'object') entryMap.set(value, entry);
return value;
});
}

getRange(options?: any): any {
const iterable = super.getRange(options);
if (options?.valuesForKey) return iterable.map((v: any) => v?.value);
if (options?.values === false || options?.onlyCount) return iterable;
const hasRecordEncoder = !!this.#enc.isRocksDB;
return iterable.map((entry: any) => {
if (hasRecordEncoder) {
if (entry.value?.[METADATA]) {
entry.metadataFlags = entry.value[METADATA];
Object.assign(entry, entry.value);
}
if (entry.value?.constructor === Object && this.#enc.structPrototype) {
const originalValue = entry.value;
entry.value = new this.#enc.structPrototype.constructor();
for (const key in originalValue) entry.value[key] = originalValue[key];
}
}
return entry;
});
}

putSync(id: any, value: any, options?: any): any {
this.#cache?.delete(id);
return super.putSync(id, value, options);
}

removeSync(id: any, options?: any): any {
this.#cache?.delete(id);
return super.removeSync(id, options);
}

open(): PrimaryRocksDatabase {
return super.open() as PrimaryRocksDatabase;
}

static open(pathOrStore: string | Store, options?: RocksDatabaseOptions): PrimaryRocksDatabase {
return new PrimaryRocksDatabase(pathOrStore, options).open();
}
}