Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/optimize-hot-paths.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@tanstack/db-sqlite-persistence-core': patch
'@tanstack/offline-transactions': patch
'@tanstack/react-db': patch
---

Optimize hot paths: Schwartzian transform for stable serialization sorting, splice-based queue flushing, Map-based transaction lookups, Set-based filtering, and lazy property access in useLiveQuery so status-only consumers skip full entry materialization.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"prepare": "husky",
"test": "pnpm --filter \"./packages/**\" test",
"test:docs": "node scripts/verify-links.ts",
"bench:perf": "tsx scripts/perf-bench.ts",
"test:sherif": "sherif -i zod -p offline-transactions-react-native -p shopping-list-react-native",
"generate-docs": "node scripts/generate-docs.ts"
},
Expand Down
47 changes: 23 additions & 24 deletions packages/db-sqlite-persistence-core/src/persisted.ts
Original file line number Diff line number Diff line change
Expand Up @@ -649,15 +649,15 @@ function toStableSerializable(value: unknown): unknown {
return Array.from(value)
.map((entry) => toStableSerializable(entry))
.filter((entry) => entry !== undefined)
.sort((left, right) => {
const leftSerialized = JSON.stringify(left)
const rightSerialized = JSON.stringify(right)
return leftSerialized < rightSerialized
.map((entry) => ({ entry, serialized: JSON.stringify(entry) }))
.sort((left, right) =>
left.serialized < right.serialized
? -1
: leftSerialized > rightSerialized
: left.serialized > right.serialized
? 1
: 0
})
: 0,
)
.map(({ entry }) => entry)
}

if (value instanceof Map) {
Expand All @@ -667,15 +667,18 @@ function toStableSerializable(value: unknown): unknown {
value: toStableSerializable(mapValue),
}))
.filter((entry) => entry.key !== undefined && entry.value !== undefined)
.sort((left, right) => {
const leftSerialized = JSON.stringify(left.key)
const rightSerialized = JSON.stringify(right.key)
return leftSerialized < rightSerialized
.map((entry) => ({
entry,
serializedKey: JSON.stringify(entry.key),
}))
.sort((left, right) =>
left.serializedKey < right.serializedKey
? -1
: leftSerialized > rightSerialized
: left.serializedKey > right.serializedKey
? 1
: 0
})
: 0,
)
.map(({ entry }) => entry)
}

const record = value as Record<string, unknown>
Expand Down Expand Up @@ -1298,11 +1301,9 @@ class PersistedCollectionRuntime<
}

private async flushQueuedHydrationTransactionsUnsafe(): Promise<void> {
while (this.queuedHydrationTransactions.length > 0) {
const transaction = this.queuedHydrationTransactions.shift()
if (!transaction) {
continue
}
const queuedTransactions = this.queuedHydrationTransactions.splice(0)

for (const transaction of queuedTransactions) {
await this.applyBufferedSyncTransactionUnsafe(transaction)
}
}
Expand Down Expand Up @@ -1867,11 +1868,9 @@ class PersistedCollectionRuntime<
}

private async flushQueuedTxCommittedUnsafe(): Promise<void> {
while (this.queuedTxCommitted.length > 0) {
const queued = this.queuedTxCommitted.shift()
if (!queued) {
continue
}
const queuedTransactions = this.queuedTxCommitted.splice(0)

for (const queued of queuedTransactions) {
await this.processCommittedTxUnsafe(queued)
}
}
Expand Down
126 changes: 126 additions & 0 deletions packages/db-sqlite-persistence-core/tests/persisted.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,78 @@ describe(`persistedCollectionOptions`, () => {
})
})

it(`processes tx:committed messages queued during a reload flush`, async () => {
const adapter = createRecordingAdapter([{ id: `1`, title: `Initial` }])
const coordinator = createCoordinatorHarness()

const collection = createCollection(
persistedCollectionOptions<Todo, string>({
id: `sync-present`,
getKey: (item) => item.id,
sync: {
sync: ({ markReady }) => {
markReady()
},
},
persistence: {
adapter,
coordinator,
},
}),
)

await collection.preload()
await flushAsyncWork()

const originalLoadSubset = adapter.loadSubset
let emittedDuringReload = false
adapter.loadSubset = async (collectionId, options, ctx) => {
const rows = await originalLoadSubset(collectionId, options, ctx)

if (!emittedDuringReload) {
emittedDuringReload = true
coordinator.emit({
type: `tx:committed`,
term: 1,
seq: 2,
txId: `tx-during-reload`,
latestRowVersion: 2,
requiresFullReload: false,
changedRows: [
{ key: `2`, value: { id: `2`, title: `Queued during reload` } },
],
deletedKeys: [],
})
}

return rows
}

adapter.rows.set(`2`, {
id: `2`,
title: `Queued during reload`,
})

coordinator.emit({
type: `tx:committed`,
term: 1,
seq: 1,
txId: `tx-reload`,
latestRowVersion: 1,
requiresFullReload: true,
})

await flushAsyncWork()
await flushAsyncWork()
await flushAsyncWork()

expect(emittedDuringReload).toBe(true)
expect(stripVirtualProps(collection.get(`2`))).toEqual({
id: `2`,
title: `Queued during reload`,
})
})

it(`removes deleted rows after tx:committed invalidation reload`, async () => {
const adapter = createRecordingAdapter([
{ id: `1`, title: `Keep` },
Expand Down Expand Up @@ -1513,6 +1585,60 @@ describe(`persistedCollectionOptions`, () => {
expect(collection.get(`2`)).toBeUndefined()
})

it(`dedupes active subsets with stable Map and Set option serialization`, async () => {
const adapter = createRecordingAdapter([{ id: `1`, title: `Row 1` }])
const coordinator = createCoordinatorHarness()

const collection = createCollection(
persistedCollectionOptions<Todo, string>({
id: `sync-present`,
syncMode: `on-demand`,
getKey: (item) => item.id,
sync: {
sync: ({ markReady }) => {
markReady()
},
},
persistence: {
adapter,
coordinator,
},
}),
)

collection.startSyncImmediate()
await flushAsyncWork()

const firstCursor = new Map<unknown, unknown>([
[`set`, new Set([`b`, `a`])],
[`nested`, new Map([[`z`, 1]])],
])
const secondCursor = new Map<unknown, unknown>([
[`nested`, new Map([[`z`, 1]])],
[`set`, new Set([`a`, `b`])],
])

await (collection as any)._sync.loadSubset({ cursor: firstCursor })
await (collection as any)._sync.loadSubset({ cursor: secondCursor })
await flushAsyncWork()

const loadSubsetCallsBeforeReload = adapter.loadSubsetCalls.length

coordinator.emit({
type: `tx:committed`,
term: 1,
seq: 1,
txId: `tx-reload-stable-key`,
latestRowVersion: 1,
requiresFullReload: true,
})

await flushAsyncWork()
await flushAsyncWork()

expect(adapter.loadSubsetCalls.length).toBe(loadSubsetCallsBeforeReload + 1)
})

it(`paginated subset falls back to full reload`, async () => {
const adapter = createRecordingAdapter([{ id: `1`, title: `Row 1` }])
const coordinator = createCoordinatorHarness()
Expand Down
32 changes: 27 additions & 5 deletions packages/offline-transactions/src/executor/KeyScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,37 @@ export class KeyScheduler {
return [...this.pendingTransactions]
}

getEarliestRetryTime(): number | null {
let earliestRetryTime: number | null = null

for (const transaction of this.pendingTransactions) {
earliestRetryTime =
earliestRetryTime === null
? transaction.nextAttemptAt
: Math.min(earliestRetryTime, transaction.nextAttemptAt)
}

return earliestRetryTime
}

updateTransactions(updatedTransactions: Array<OfflineTransaction>): void {
for (const updatedTx of updatedTransactions) {
const index = this.pendingTransactions.findIndex(
(tx) => tx.id === updatedTx.id,
if (updatedTransactions.length === 0) {
return
}

const updatedById = new Map(
updatedTransactions.map((transaction) => [transaction.id, transaction]),
)

for (let index = 0; index < this.pendingTransactions.length; index++) {
const updatedTransaction = updatedById.get(
this.pendingTransactions[index]!.id,
)
if (index >= 0) {
this.pendingTransactions[index] = updatedTx
if (updatedTransaction) {
this.pendingTransactions[index] = updatedTransaction
}
}

// Re-sort to maintain FIFO order after updates
this.pendingTransactions.sort(
(a, b) => a.createdAt.getTime() - b.createdAt.getTime(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,11 @@ export class TransactionExecutor {
// Schedule retry timer for loaded transactions
this.scheduleNextRetry()

const filteredTransactionIds = new Set(
filteredTransactions.map((transaction) => transaction.id),
)
const removedTransactions = transactions.filter(
(tx) => !filteredTransactions.some((filtered) => filtered.id === tx.id),
(transaction) => !filteredTransactionIds.has(transaction.id),
)

if (removedTransactions.length > 0) {
Expand Down Expand Up @@ -355,13 +358,7 @@ export class TransactionExecutor {
}

private getEarliestRetryTime(): number | null {
const allTransactions = this.scheduler.getAllPendingTransactions()

if (allTransactions.length === 0) {
return null
}

return Math.min(...allTransactions.map((tx) => tx.nextAttemptAt))
return this.scheduler.getEarliestRetryTime()
}

private clearRetryTimer(): void {
Expand All @@ -380,10 +377,11 @@ export class TransactionExecutor {
}

resetRetryDelays(): void {
const nextAttemptAt = Date.now()
const allTransactions = this.scheduler.getAllPendingTransactions()
const updatedTransactions = allTransactions.map((transaction) => ({
...transaction,
nextAttemptAt: Date.now(),
nextAttemptAt,
}))

this.scheduler.updateTransactions(updatedTransactions)
Expand Down
Loading
Loading