-
Notifications
You must be signed in to change notification settings - Fork 5
Record caching #410
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Record caching #410
Changes from all commits
66889d0
1876c0a
7bda488
63907bb
e5e248b
26717cd
3e69163
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -62,7 +62,8 @@ | |
| "test:unit:all": "npm run test:unit:main && npm run test:unit:apitests && npm run test:unit:resources && npm run test:unit:lmdb", | ||
| "test:unit:lmdb": "HARPER_STORAGE_ENGINE=lmdb npm run test:unit:resources && HARPER_STORAGE_ENGINE=lmdb npm run test:unit:apitests", | ||
| "test:unit:components": "mocha 'unitTests/components/**/*.js'", | ||
| "test:unit:resources": "mocha 'unitTests/resources/**/*.js'", | ||
| "test:unit:resources": "mocha 'unitTests/resources/**/*.js' --exclude 'unitTests/resources/**/*.bench.js'", | ||
| "bench": "mocha 'unitTests/resources/caching-rocks-database.bench.js'", | ||
| "test:unit:bin": "mocha 'unitTests/bin/**/*.js'", | ||
| "test:unit:apitests": "node ./dist/bin/harper.js stop && mocha 'unitTests/apiTests/**/*-test.mjs'", | ||
| "test:unit:logging": "mocha 'unitTests/utility/logging/*.js'", | ||
|
|
@@ -196,6 +197,7 @@ | |
| "jsonata": "1.8.7", | ||
| "jsonwebtoken": "9.0.3", | ||
| "lmdb": "3.5.4", | ||
| "weak-lru-cache": "^1.2.2", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Blocker — runtime dependency not documented in
Please add an entry for |
||
| "lodash": "^4.17.23", | ||
| "mathjs": "11.12.0", | ||
| "micromatch": "^4.0.8", | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -7,8 +7,8 @@ | |||||||||||||||||||||||
| import { CONFIG_PARAMS } from '../utility/hdbTerms.ts'; | ||||||||||||||||||||||||
| import { convertToMS } from '../utility/common_utils.js'; | ||||||||||||||||||||||||
| import { when } from '../utility/when.ts'; | ||||||||||||||||||||||||
| import { setTimeout as delay } from 'node:timers/promises'; | ||||||||||||||||||||||||
| import { Transaction as RocksTransaction, type Store as RocksStore } from '@harperfast/rocksdb-js'; | ||||||||||||||||||||||||
| import { Transaction as RocksTransaction, type Store as RocksStore, constants } from '@harperfast/rocksdb-js'; | ||||||||||||||||||||||||
| const RETRY_NOW_VALUE = constants.RETRY_NOW_VALUE; | ||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit — The constant |
||||||||||||||||||||||||
| import type { RootDatabaseKind } from './databases.ts'; | ||||||||||||||||||||||||
| import type { Entry } from './RecordEncoder.ts'; | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
|
|
@@ -20,7 +20,7 @@ | |||||||||||||||||||||||
| OPEN: 1, // the transaction is open and can be used for reads and writes | ||||||||||||||||||||||||
| LINGERING: 2, // the transaction has completed a read, but can be used for immediate writes | ||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||
| const MAX_RETRIES = 40; | ||||||||||||||||||||||||
| let outstandingCommit, outstandingCommitStart; | ||||||||||||||||||||||||
| let confirmReplication; | ||||||||||||||||||||||||
| export function replicationConfirmation(callback) { | ||||||||||||||||||||||||
|
|
@@ -94,7 +94,7 @@ | |||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| if (this.open !== TRANSACTION_STATE.OPEN) return; // can not start a new read transaction as there is no future commit that will take place, just have to allow the read to latest database state | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| this.transaction = new RocksTransaction(this.db.store); | ||||||||||||||||||||||||
| this.transaction = new RocksTransaction(this.db.store, { coordinatedRetry: true }); | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| if (this.timestamp) { | ||||||||||||||||||||||||
| this.transaction.setTimestamp(this.timestamp); | ||||||||||||||||||||||||
|
|
@@ -262,7 +262,12 @@ | |||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| const completions = []; | ||||||||||||||||||||||||
| return commitResolution.then( | ||||||||||||||||||||||||
| () => { | ||||||||||||||||||||||||
| (commitResult) => { | ||||||||||||||||||||||||
| if (commitResult === RETRY_NOW_VALUE) { | ||||||||||||||||||||||||
| this.retries++; | ||||||||||||||||||||||||
| harperLogger.debug?.('coordinated retry', transaction.id, this.retries); | ||||||||||||||||||||||||
| return this.commit({ transaction }); | ||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No application-level retry bound:
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No retry cap on the
Suggested change
|
||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| transaction.onCommit?.(); | ||||||||||||||||||||||||
| if (this.next) { | ||||||||||||||||||||||||
| completions.push(this.next.commit(options)); | ||||||||||||||||||||||||
|
|
@@ -298,22 +303,7 @@ | |||||||||||||||||||||||
| }); | ||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||
| (error) => { | ||||||||||||||||||||||||
| if (error.code === 'ERR_BUSY') { | ||||||||||||||||||||||||
| // if the transaction failed due to concurrent changes, we need to retry. First record this as an increased risk of contention/retry | ||||||||||||||||||||||||
| // for future transactions | ||||||||||||||||||||||||
| this.retries++; | ||||||||||||||||||||||||
| harperLogger.debug?.('retrying', transaction.id, this.retries); | ||||||||||||||||||||||||
| if (this.retries > 2) { | ||||||||||||||||||||||||
| if (this.retries > MAX_RETRIES) { | ||||||||||||||||||||||||
| throw new ServerError( | ||||||||||||||||||||||||
| `After ${MAX_RETRIES} retries, unable to commit transaction, transaction is in conflict with ongoing writes` | ||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| // start delaying, back off to try to space out transactions and avoid excessive conflicts | ||||||||||||||||||||||||
| return delay(this.retries * this.retries).then(() => this.commit({ transaction })); | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| return this.commit({ transaction }); // try again | ||||||||||||||||||||||||
| } else throw error; | ||||||||||||||||||||||||
| throw error; | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,178 @@ | ||
| import { RocksDatabase, type RocksDatabaseOptions, constants, type Store } from '@harperfast/rocksdb-js'; | ||
|
|
||
| const FRESH_VERSION_FLAG = constants.FRESH_VERSION_FLAG; | ||
| import { WeakLRUCache } from 'weak-lru-cache'; | ||
| import { when } from '../utility/when.ts'; | ||
| import { entryMap, METADATA, type Entry } from './RecordEncoder.ts'; | ||
|
|
||
| /** | ||
| * RocksDatabase subclass that owns all primary-store behaviour for Harper tables: | ||
| * - RecordEncoder metadata extraction (version, metadataFlags, etc.) | ||
| * - Optional WeakLRUCache keyed on entry version, verified via the process-wide VerificationTable | ||
| * | ||
| * Replaces both the old CachingRocksDatabase and the RocksDB-specific patches applied by | ||
| * handleLocalTimeForGets. Call initStore(rootStore) after open() — or let | ||
| * handleLocalTimeForGets delegate to it automatically via the isPrimaryRocksDatabase marker. | ||
| * | ||
| * Caching is enabled by default; pass { cache: false } to disable (useful for benchmarking | ||
| * or stores where version tracking is not desired). | ||
| * | ||
| * Cache freshness pattern (using the rocksdb-js VT API): | ||
| * 1. verifyVersion(key, cached.version) → true → return cached entry (no disk I/O) | ||
| * 2. verifyVersion(key, cached.version) → false → read from DB, populateVersion, update cache | ||
| */ | ||
| export class PrimaryRocksDatabase extends RocksDatabase { | ||
| readonly isPrimaryRocksDatabase = true; | ||
| #cache?: WeakLRUCache; | ||
| readCount = 0; | ||
| cachePuts = false; | ||
| declare rootStore: any; | ||
| declare decoder: any; | ||
|
|
||
| get #enc(): any { | ||
| return (this as any).encoder; | ||
| } | ||
|
|
||
| constructor(pathOrStore: string | Store, options?: RocksDatabaseOptions & { cache?: boolean }) { | ||
| const enableCache = (options as any)?.cache !== false; | ||
| super(pathOrStore, enableCache ? { ...options, verificationTable: true } : options); | ||
| if (enableCache) { | ||
| this.#cache = new WeakLRUCache(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Initialises encoder/decoder state. Must be called once after open() with the root | ||
| * RocksDatabase. Equivalent to the RocksDB branch of handleLocalTimeForGets, but as | ||
| * a real method rather than instance-level monkey-patching. | ||
| */ | ||
| initStore(rootStore: RocksDatabase) { | ||
| this.readCount = 0; | ||
| this.cachePuts = false; | ||
| this.rootStore = rootStore; | ||
| this.#enc.rootStore = rootStore; | ||
| this.#enc.isRocksDB = true; | ||
| this.decoder = this.#enc; | ||
| } | ||
|
|
||
| #withEntry(entry: Entry, id: any): Entry { | ||
| if (entry.value) { | ||
| if (entry.value.constructor === Object && this.#enc.structPrototype) { | ||
| const originalValue = entry.value; | ||
| entry.value = new this.#enc.structPrototype.constructor(); | ||
| Object.assign(entry.value, originalValue); | ||
| } | ||
| if (typeof entry.value === 'object' && entry.value !== null) { | ||
| entryMap.set(entry.value, entry); | ||
| } | ||
| } | ||
| entry.key = id; | ||
| return entry; | ||
| } | ||
|
|
||
| #processEntry(raw: any, id: any): Entry | undefined { | ||
| if (raw == null) return undefined; | ||
| if (raw[METADATA]) { | ||
| raw.metadataFlags = raw[METADATA]; | ||
| return this.#withEntry(raw, id); | ||
| } | ||
| return { value: raw, key: id } as Entry; | ||
| } | ||
|
|
||
| /** | ||
| * Core read method. Returns a full Entry (with version, metadataFlags, value, …) or | ||
| * undefined. When caching is enabled, passes `expectedVersion` to the native layer so | ||
| * a single call handles both verification (returns FRESH_VERSION_FLAG on hit) and VT | ||
| * population (auto-seeded on DB read). Only cold reads (no cached version) need a | ||
| * separate populateVersion call. | ||
| */ | ||
| getEntry(id: any, options?: any): any { | ||
| this.readCount++; | ||
| const cache = this.#cache; | ||
| const cached = cache?.get(id) as Entry | undefined; | ||
| const expectedVersion = cached?.version; | ||
|
|
||
| // Build get options, always merging with caller options to preserve | ||
| // transaction snapshot. Pass expectedVersion when cached: | ||
| // VT hit → native returns FRESH_VERSION_FLAG, no DB read | ||
| // VT miss → native reads DB and auto-populates VT slot | ||
| // For cold reads (no cached version), use populateVersion flag so the | ||
| // native layer seeds the VT slot in the same call. | ||
| let getOptions: any; | ||
| if (expectedVersion != null) { | ||
| getOptions = options ? { ...options, expectedVersion } : { expectedVersion }; | ||
| } else if (cache) { | ||
| getOptions = options ? { ...options, populateVersion: true } : { populateVersion: true }; | ||
| } else { | ||
| getOptions = options; | ||
| } | ||
| const raw = options?.async ? super.get(id, getOptions) : super.getSync(id, getOptions); | ||
|
|
||
| return when(raw, (result) => { | ||
| if (result === FRESH_VERSION_FLAG) return cached; | ||
| const entry = this.#processEntry(result, id); | ||
| if (entry == null) { | ||
| if (cache && cached !== undefined) cache.delete(id); | ||
| return undefined; | ||
| } | ||
| if (entry.version != null && cache) { | ||
| cache.set(id, entry, (entry.size ?? 0) >> 10); | ||
| } | ||
| return entry; | ||
| }); | ||
| } | ||
|
|
||
| getSync(id: any, options?: any): any { | ||
| const entry = this.getEntry(id, options) as Entry; | ||
| const value = entry?.value; | ||
| if (value != null && typeof value === 'object') entryMap.set(value, entry); | ||
| return value; | ||
| } | ||
|
|
||
| get(id: any, options?: any): any { | ||
| return when(this.getEntry(id, { ...options, async: true }), (entry: Entry) => { | ||
| const value = entry?.value; | ||
| if (value != null && typeof value === 'object') entryMap.set(value, entry); | ||
| return value; | ||
| }); | ||
| } | ||
|
|
||
| getRange(options?: any): any { | ||
| const iterable = super.getRange(options); | ||
| if (options?.valuesForKey) return iterable.map((v: any) => v?.value); | ||
| if (options?.values === false || options?.onlyCount) return iterable; | ||
| const hasRecordEncoder = !!this.#enc.isRocksDB; | ||
| return iterable.map((entry: any) => { | ||
| if (hasRecordEncoder) { | ||
| if (entry.value?.[METADATA]) { | ||
| entry.metadataFlags = entry.value[METADATA]; | ||
| Object.assign(entry, entry.value); | ||
| } | ||
| if (entry.value?.constructor === Object && this.#enc.structPrototype) { | ||
| const originalValue = entry.value; | ||
| entry.value = new this.#enc.structPrototype.constructor(); | ||
| for (const key in originalValue) entry.value[key] = originalValue[key]; | ||
| } | ||
| } | ||
| return entry; | ||
| }); | ||
| } | ||
|
|
||
| putSync(id: any, value: any, options?: any): any { | ||
| this.#cache?.delete(id); | ||
| return super.putSync(id, value, options); | ||
| } | ||
|
|
||
| removeSync(id: any, options?: any): any { | ||
| this.#cache?.delete(id); | ||
| return super.removeSync(id, options); | ||
| } | ||
|
|
||
| open(): PrimaryRocksDatabase { | ||
| return super.open() as PrimaryRocksDatabase; | ||
| } | ||
|
|
||
| static open(pathOrStore: string | Store, options?: RocksDatabaseOptions): PrimaryRocksDatabase { | ||
| return new PrimaryRocksDatabase(pathOrStore, options).open(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
weak-lru-cacheis not documented independencies.md.Per this repo's own policy, every new runtime dependency requires an entry in
dependencies.mdanswering the standard checklist (size, security track record, overlap, memory cost, removal plan, etc.). Please add the entry before merging.