From f8828b345290e98b84b08abb7fed4176fb3f32a8 Mon Sep 17 00:00:00 2001 From: Anton Vozghrin Date: Mon, 25 May 2026 10:11:06 +0300 Subject: [PATCH 1/3] fix(db): deduplicate loadedSubsets and join key requests --- .changeset/fix-loaded-subsets-dedup.md | 5 + packages/db/src/collection/subscription.ts | 15 +- packages/db/src/query/compiler/joins.ts | 12 +- .../tests/query/loaded-subsets-dedup.test.ts | 178 ++++++++++++++++++ 4 files changed, 206 insertions(+), 4 deletions(-) create mode 100644 .changeset/fix-loaded-subsets-dedup.md create mode 100644 packages/db/tests/query/loaded-subsets-dedup.test.ts diff --git a/.changeset/fix-loaded-subsets-dedup.md b/.changeset/fix-loaded-subsets-dedup.md new file mode 100644 index 000000000..c5c405d0e --- /dev/null +++ b/.changeset/fix-loaded-subsets-dedup.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +fix(db): prevent unbounded loadedSubsets growth in subscription and join lazy loading diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 2d48add4b..c5d151d79 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -4,6 +4,7 @@ import { PropRef, Value } from '../query/ir.js' import { EventEmitter } from '../event-emitter.js' import { compileExpression } from '../query/compiler/evaluators.js' import { buildCursor } from '../utils/cursor.js' +import { isPredicateSubset } from '../query/predicate-utils.js' import { createFilterFunctionFromExpression, createFilteredCallback, @@ -382,7 +383,12 @@ export class CollectionSubscription opts?.onLoadSubsetResult?.(syncResult) // Track this loadSubset call so we can unload it later - this.loadedSubsets.push(loadOptions) + const isAlreadyCovered = this.loadedSubsets.some((existing) => + isPredicateSubset(loadOptions, existing), + ) + if (!isAlreadyCovered) { + this.loadedSubsets.push(loadOptions) + } const trackLoadSubsetPromise = opts?.trackLoadSubsetPromise ?? true if (trackLoadSubsetPromise) { @@ -620,7 +626,12 @@ export class CollectionSubscription onLoadSubsetResult?.(syncResult) // Track this loadSubset call - this.loadedSubsets.push(loadOptions) + const isAlreadyCovered = this.loadedSubsets.some((existing) => + isPredicateSubset(loadOptions, existing), + ) + if (!isAlreadyCovered) { + this.loadedSubsets.push(loadOptions) + } if (shouldTrackLoadSubsetPromise) { this.trackLoadSubsetPromise(syncResult) } diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 0c37e05f4..e77248ce9 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -273,12 +273,13 @@ function processJoin( // Set up lazy loading: intercept active side's stream and dynamically load // matching rows from lazy side based on join keys. + const loadedJoinKeys = new Set() const activePipelineWithLoading: IStreamBuilder< [key: unknown, [originalKey: string, namespacedRow: NamespacedRow]] > = activePipeline.pipe( tap((data) => { // Deduplicate and filter null keys before requesting snapshot - const joinKeys = [ + const allJoinKeys = [ ...new Set( data .getInner() @@ -286,6 +287,9 @@ function processJoin( .filter((key) => key != null), ), ] + const joinKeys = allJoinKeys.filter( + (key) => !loadedJoinKeys.has(key), + ) if (joinKeys.length === 0) { return @@ -314,7 +318,11 @@ function processJoin( optimizedOnly: true, }) - if (!loaded) { + if (loaded) { + for (const key of joinKeys) { + loadedJoinKeys.add(key) + } + } else { // Snapshot wasn't sent because it could not be loaded from the indexes const collectionId = target.collection.id const fieldPath = target.path.join(`.`) diff --git a/packages/db/tests/query/loaded-subsets-dedup.test.ts b/packages/db/tests/query/loaded-subsets-dedup.test.ts new file mode 100644 index 000000000..a452a25f9 --- /dev/null +++ b/packages/db/tests/query/loaded-subsets-dedup.test.ts @@ -0,0 +1,178 @@ +import { describe, expect, it, vi } from 'vitest' +import { createLiveQueryCollection, eq } from '../../src/query/index.js' +import { createCollection } from '../../src/collection/index.js' +import { extractSimpleComparisons } from '../../src/query/expression-helpers.js' +import { flushPromises } from '../utils.js' +import type { LoadSubsetOptions } from '../../src/types.js' + +type Parent = { + id: number + name: string +} + +type Child = { + id: number + parentId: number + title: string +} + +const sampleParents: Array = [ + { id: 1, name: `Parent A` }, + { id: 2, name: `Parent B` }, + { id: 3, name: `Parent C` }, +] + +const sampleChildren: Array = [ + { id: 10, parentId: 1, title: `Child A1` }, + { id: 11, parentId: 1, title: `Child A2` }, + { id: 20, parentId: 2, title: `Child B1` }, +] + +describe(`loadedSubsets deduplication`, () => { + function createParentsCollection() { + return createCollection({ + id: `dedup-parents`, + getKey: (p) => p.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + for (const parent of sampleParents) { + write({ type: `insert`, value: parent }) + } + commit() + markReady() + }, + }, + }) + } + + function createChildrenCollectionWithTracking() { + const loadSubsetCalls: Array = [] + + const collection = createCollection({ + id: `dedup-children`, + getKey: (child) => child.id, + syncMode: `on-demand`, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + for (const child of sampleChildren) { + write({ type: `insert`, value: child }) + } + commit() + markReady() + return { + loadSubset: vi.fn((options: LoadSubsetOptions) => { + loadSubsetCalls.push(options) + return Promise.resolve() + }), + } + }, + }, + }) + + return { collection, loadSubsetCalls } + } + + it(`should not grow loadedSubsets when requestSnapshot is called with the same predicate`, async () => { + const parents = createParentsCollection() + const { collection: children, loadSubsetCalls } = + createChildrenCollectionWithTracking() + + const liveQuery = createLiveQueryCollection((q) => + q + .from({ p: parents }) + .join({ c: children }, ({ p, c }) => eq(c.parentId, p.id)) + .select(({ p, c }) => ({ + parentId: p.id, + parentName: p.name, + childId: c.id, + childTitle: c.title, + })), + ) + + await liveQuery.preload() + + const initialCallCount = loadSubsetCalls.length + expect(initialCallCount).toBeGreaterThan(0) + + const firstCall = loadSubsetCalls[0]! + expect(firstCall.where).toBeDefined() + + const filters = extractSimpleComparisons(firstCall.where) + expect(filters).toEqual([ + { + field: [`parentId`], + operator: `in`, + value: expect.arrayContaining([1, 2, 3]), + }, + ]) + + expect(loadSubsetCalls.length).toBe(initialCallCount) + }) + + it(`should deduplicate join key requests across pipeline batches`, async () => { + let parentBegin: () => void + let parentWrite: (msg: { type: string; value: Parent }) => void + let parentCommit: () => void + + const parents = createCollection({ + id: `dedup-parents-sync`, + getKey: (p) => p.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + parentBegin = begin + parentWrite = write as any + parentCommit = commit + + begin() + for (const parent of sampleParents) { + write({ type: `insert`, value: parent }) + } + commit() + markReady() + }, + }, + }) + + const { collection: children, loadSubsetCalls } = + createChildrenCollectionWithTracking() + + const liveQuery = createLiveQueryCollection((q) => + q + .from({ p: parents }) + .join({ c: children }, ({ p, c }) => eq(c.parentId, p.id)) + .select(({ p, c }) => ({ + parentId: p.id, + parentName: p.name, + childId: c.id, + childTitle: c.title, + })), + ) + + await liveQuery.preload() + + const callCountAfterPreload = loadSubsetCalls.length + + parentBegin!() + parentWrite!({ type: `insert`, value: { id: 4, name: `Parent D` } }) + parentCommit!() + await flushPromises() + + const newCalls = loadSubsetCalls.slice(callCountAfterPreload) + + for (const call of newCalls) { + if (!call.where) continue + const filters = extractSimpleComparisons(call.where) + for (const filter of filters) { + if (filter.operator === `in`) { + const values = filter.value as Array + expect(values).toContain(4) + expect(values).not.toContain(1) + expect(values).not.toContain(2) + expect(values).not.toContain(3) + } + } + } + }) +}) From 4c1262e03717224d79c45beceaefc6c8b71efca9 Mon Sep 17 00:00:00 2001 From: Anton Vozghrin Date: Mon, 25 May 2026 10:49:43 +0300 Subject: [PATCH 2/3] fix: check dedup before loadSubset, improve test assertions --- packages/db/src/collection/subscription.ts | 35 ++++++++----------- .../db/tests/collection-subscription.test.ts | 15 +++++--- .../tests/query/loaded-subsets-dedup.test.ts | 16 +++++++-- 3 files changed, 39 insertions(+), 27 deletions(-) diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index c5d151d79..718cb175d 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -377,22 +377,19 @@ export class CollectionSubscription orderBy: opts?.orderBy, limit: opts?.limit, } - const syncResult = this.collection._sync.loadSubset(loadOptions) - - // Pass the raw loadSubset result to the caller for external tracking - opts?.onLoadSubsetResult?.(syncResult) - - // Track this loadSubset call so we can unload it later + // Check dedup BEFORE calling loadSubset to avoid sending duplicate requests const isAlreadyCovered = this.loadedSubsets.some((existing) => isPredicateSubset(loadOptions, existing), ) + if (!isAlreadyCovered) { + const syncResult = this.collection._sync.loadSubset(loadOptions) + opts?.onLoadSubsetResult?.(syncResult) this.loadedSubsets.push(loadOptions) - } - - const trackLoadSubsetPromise = opts?.trackLoadSubsetPromise ?? true - if (trackLoadSubsetPromise) { - this.trackLoadSubsetPromise(syncResult) + const trackLoadSubsetPromise = opts?.trackLoadSubsetPromise ?? true + if (trackLoadSubsetPromise) { + this.trackLoadSubsetPromise(syncResult) + } } // Also load data immediately from the collection @@ -620,20 +617,18 @@ export class CollectionSubscription offset: offset ?? currentOffset, // Use provided offset, or auto-tracked offset subscription: this, } - const syncResult = this.collection._sync.loadSubset(loadOptions) - - // Pass the raw loadSubset result to the caller for external tracking - onLoadSubsetResult?.(syncResult) - - // Track this loadSubset call + // Check dedup BEFORE calling loadSubset to avoid sending duplicate requests const isAlreadyCovered = this.loadedSubsets.some((existing) => isPredicateSubset(loadOptions, existing), ) + if (!isAlreadyCovered) { + const syncResult = this.collection._sync.loadSubset(loadOptions) + onLoadSubsetResult?.(syncResult) this.loadedSubsets.push(loadOptions) - } - if (shouldTrackLoadSubsetPromise) { - this.trackLoadSubsetPromise(syncResult) + if (shouldTrackLoadSubsetPromise) { + this.trackLoadSubsetPromise(syncResult) + } } } diff --git a/packages/db/tests/collection-subscription.test.ts b/packages/db/tests/collection-subscription.test.ts index 65465a6a8..256db3ea4 100644 --- a/packages/db/tests/collection-subscription.test.ts +++ b/packages/db/tests/collection-subscription.test.ts @@ -1,5 +1,6 @@ import { describe, expect, it } from 'vitest' import { createCollection } from '../src/collection/index.js' +import { Func, PropRef, Value } from '../src/query/ir.js' import { flushPromises } from './utils' describe(`CollectionSubscription status tracking`, () => { @@ -130,12 +131,18 @@ describe(`CollectionSubscription status tracking`, () => { includeInitialState: false, }) - // Trigger first load - subscription.requestSnapshot({ optimizedOnly: false }) + // Trigger first load with a distinct where expression + subscription.requestSnapshot({ + optimizedOnly: false, + where: new Func(`eq`, [new PropRef([`id`]), new Value(`a`)]), + }) expect(subscription.status).toBe(`loadingSubset`) - // Trigger second load - subscription.requestSnapshot({ optimizedOnly: false }) + // Trigger second load with a different where expression so dedup doesn't skip it + subscription.requestSnapshot({ + optimizedOnly: false, + where: new Func(`eq`, [new PropRef([`id`]), new Value(`b`)]), + }) expect(subscription.status).toBe(`loadingSubset`) // Resolve first promise diff --git a/packages/db/tests/query/loaded-subsets-dedup.test.ts b/packages/db/tests/query/loaded-subsets-dedup.test.ts index a452a25f9..4496f7909 100644 --- a/packages/db/tests/query/loaded-subsets-dedup.test.ts +++ b/packages/db/tests/query/loaded-subsets-dedup.test.ts @@ -1,9 +1,13 @@ import { describe, expect, it, vi } from 'vitest' import { createLiveQueryCollection, eq } from '../../src/query/index.js' import { createCollection } from '../../src/collection/index.js' +import { BasicIndex } from '../../src/indexes/basic-index.js' import { extractSimpleComparisons } from '../../src/query/expression-helpers.js' import { flushPromises } from '../utils.js' -import type { LoadSubsetOptions } from '../../src/types.js' +import type { + ChangeMessageOrDeleteKeyMessage, + LoadSubsetOptions, +} from '../../src/types.js' type Parent = { id: number @@ -53,6 +57,8 @@ describe(`loadedSubsets deduplication`, () => { id: `dedup-children`, getKey: (child) => child.id, syncMode: `on-demand`, + autoIndex: `eager`, + defaultIndexType: BasicIndex, sync: { sync: ({ begin, write, commit, markReady }) => { begin() @@ -113,7 +119,7 @@ describe(`loadedSubsets deduplication`, () => { it(`should deduplicate join key requests across pipeline batches`, async () => { let parentBegin: () => void - let parentWrite: (msg: { type: string; value: Parent }) => void + let parentWrite: (msg: ChangeMessageOrDeleteKeyMessage) => void let parentCommit: () => void const parents = createCollection({ @@ -122,7 +128,7 @@ describe(`loadedSubsets deduplication`, () => { sync: { sync: ({ begin, write, commit, markReady }) => { parentBegin = begin - parentWrite = write as any + parentWrite = write parentCommit = commit begin() @@ -160,12 +166,15 @@ describe(`loadedSubsets deduplication`, () => { await flushPromises() const newCalls = loadSubsetCalls.slice(callCountAfterPreload) + expect(newCalls.length).toBeGreaterThan(0) + let inFilterCount = 0 for (const call of newCalls) { if (!call.where) continue const filters = extractSimpleComparisons(call.where) for (const filter of filters) { if (filter.operator === `in`) { + inFilterCount++ const values = filter.value as Array expect(values).toContain(4) expect(values).not.toContain(1) @@ -174,5 +183,6 @@ describe(`loadedSubsets deduplication`, () => { } } } + expect(inFilterCount).toBeGreaterThan(0) }) }) From 96350c105498577b9678bd4eaa7d31e4d9d64d69 Mon Sep 17 00:00:00 2001 From: Anton Vozghrin Date: Mon, 25 May 2026 11:12:52 +0300 Subject: [PATCH 3/3] test: exercise second snapshot request in dedup test --- packages/db/tests/query/loaded-subsets-dedup.test.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/db/tests/query/loaded-subsets-dedup.test.ts b/packages/db/tests/query/loaded-subsets-dedup.test.ts index 4496f7909..37e4fb7d0 100644 --- a/packages/db/tests/query/loaded-subsets-dedup.test.ts +++ b/packages/db/tests/query/loaded-subsets-dedup.test.ts @@ -114,6 +114,10 @@ describe(`loadedSubsets deduplication`, () => { }, ]) + // Trigger a second preload — this re-runs the pipeline with the same + // predicates, so the dedup layer should prevent any new loadSubset calls. + await liveQuery.preload() + expect(loadSubsetCalls.length).toBe(initialCallCount) })