Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-loaded-subsets-dedup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

fix(db): prevent unbounded loadedSubsets growth in subscription and join lazy loading
42 changes: 24 additions & 18 deletions packages/db/src/collection/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { PropRef, Value } from '../query/ir.js'
import { EventEmitter } from '../event-emitter.js'
import { compileExpression } from '../query/compiler/evaluators.js'
import { buildCursor } from '../utils/cursor.js'
import { isPredicateSubset } from '../query/predicate-utils.js'
import {
createFilterFunctionFromExpression,
createFilteredCallback,
Expand Down Expand Up @@ -376,17 +377,19 @@ export class CollectionSubscription
orderBy: opts?.orderBy,
limit: opts?.limit,
}
const syncResult = this.collection._sync.loadSubset(loadOptions)

// Pass the raw loadSubset result to the caller for external tracking
opts?.onLoadSubsetResult?.(syncResult)

// Track this loadSubset call so we can unload it later
this.loadedSubsets.push(loadOptions)
// Check dedup BEFORE calling loadSubset to avoid sending duplicate requests
const isAlreadyCovered = this.loadedSubsets.some((existing) =>
isPredicateSubset(loadOptions, existing),
)

const trackLoadSubsetPromise = opts?.trackLoadSubsetPromise ?? true
if (trackLoadSubsetPromise) {
this.trackLoadSubsetPromise(syncResult)
if (!isAlreadyCovered) {
const syncResult = this.collection._sync.loadSubset(loadOptions)
opts?.onLoadSubsetResult?.(syncResult)
this.loadedSubsets.push(loadOptions)
const trackLoadSubsetPromise = opts?.trackLoadSubsetPromise ?? true
if (trackLoadSubsetPromise) {
this.trackLoadSubsetPromise(syncResult)
}
}

// Also load data immediately from the collection
Expand Down Expand Up @@ -614,15 +617,18 @@ export class CollectionSubscription
offset: offset ?? currentOffset, // Use provided offset, or auto-tracked offset
subscription: this,
}
const syncResult = this.collection._sync.loadSubset(loadOptions)

// Pass the raw loadSubset result to the caller for external tracking
onLoadSubsetResult?.(syncResult)
// Check dedup BEFORE calling loadSubset to avoid sending duplicate requests
const isAlreadyCovered = this.loadedSubsets.some((existing) =>
isPredicateSubset(loadOptions, existing),
)

// Track this loadSubset call
this.loadedSubsets.push(loadOptions)
if (shouldTrackLoadSubsetPromise) {
this.trackLoadSubsetPromise(syncResult)
if (!isAlreadyCovered) {
const syncResult = this.collection._sync.loadSubset(loadOptions)
onLoadSubsetResult?.(syncResult)
this.loadedSubsets.push(loadOptions)
if (shouldTrackLoadSubsetPromise) {
this.trackLoadSubsetPromise(syncResult)
}
}
}

Expand Down
12 changes: 10 additions & 2 deletions packages/db/src/query/compiler/joins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,19 +273,23 @@ function processJoin(

// Set up lazy loading: intercept active side's stream and dynamically load
// matching rows from lazy side based on join keys.
const loadedJoinKeys = new Set<unknown>()
const activePipelineWithLoading: IStreamBuilder<
[key: unknown, [originalKey: string, namespacedRow: NamespacedRow]]
> = activePipeline.pipe(
tap((data) => {
// Deduplicate and filter null keys before requesting snapshot
const joinKeys = [
const allJoinKeys = [
...new Set(
data
.getInner()
.map(([[joinKey]]) => joinKey)
.filter((key) => key != null),
),
]
const joinKeys = allJoinKeys.filter(
(key) => !loadedJoinKeys.has(key),
)

if (joinKeys.length === 0) {
return
Expand Down Expand Up @@ -314,7 +318,11 @@ function processJoin(
optimizedOnly: true,
})

if (!loaded) {
if (loaded) {
for (const key of joinKeys) {
loadedJoinKeys.add(key)
}
} else {
Comment on lines +321 to +325
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

loadedJoinKeys is updated before async load success, which can suppress needed retries.

requestSnapshot can return true even if the underlying async loadSubset later rejects. Adding keys immediately means failed keys won’t be retried in later batches.

Suggested fix
-            const loaded = lazySourceSubscription.requestSnapshot({
+            let loadResult: Promise<void> | true = true
+            const loaded = lazySourceSubscription.requestSnapshot({
               where: inArray(lazyJoinRef, joinKeys),
               optimizedOnly: true,
+              onLoadSubsetResult: (result) => {
+                loadResult = result
+              },
             })

             if (loaded) {
-              for (const key of joinKeys) {
-                loadedJoinKeys.add(key)
-              }
+              if (loadResult instanceof Promise) {
+                loadResult
+                  .then(() => {
+                    for (const key of joinKeys) loadedJoinKeys.add(key)
+                  })
+                  .catch(() => {
+                    // keep keys retriable
+                  })
+              } else {
+                for (const key of joinKeys) loadedJoinKeys.add(key)
+              }
             } else {
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/db/src/query/compiler/joins.ts` around lines 321 - 325, The code is
adding joinKeys into loadedJoinKeys before confirming the async loadSubset
succeeded, which can prevent retries when loadSubset later rejects; in the
requestSnapshot / join handling logic (referencing requestSnapshot, loadSubset,
loadedJoinKeys, and joinKeys), move the loadedJoinKeys.add(key) updates so they
only run after the async loadSubset/requestSnapshot completes successfully
(i.e., inside the success path or then block), and ensure failed loads do not
mark keys as loaded so subsequent batches can retry.

// Snapshot wasn't sent because it could not be loaded from the indexes
const collectionId = target.collection.id
const fieldPath = target.path.join(`.`)
Expand Down
15 changes: 11 additions & 4 deletions packages/db/tests/collection-subscription.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { describe, expect, it } from 'vitest'
import { createCollection } from '../src/collection/index.js'
import { Func, PropRef, Value } from '../src/query/ir.js'
import { flushPromises } from './utils'

describe(`CollectionSubscription status tracking`, () => {
Expand Down Expand Up @@ -130,12 +131,18 @@ describe(`CollectionSubscription status tracking`, () => {
includeInitialState: false,
})

// Trigger first load
subscription.requestSnapshot({ optimizedOnly: false })
// Trigger first load with a distinct where expression
subscription.requestSnapshot({
optimizedOnly: false,
where: new Func(`eq`, [new PropRef([`id`]), new Value(`a`)]),
})
expect(subscription.status).toBe(`loadingSubset`)

// Trigger second load
subscription.requestSnapshot({ optimizedOnly: false })
// Trigger second load with a different where expression so dedup doesn't skip it
subscription.requestSnapshot({
optimizedOnly: false,
where: new Func(`eq`, [new PropRef([`id`]), new Value(`b`)]),
})
expect(subscription.status).toBe(`loadingSubset`)

// Resolve first promise
Expand Down
192 changes: 192 additions & 0 deletions packages/db/tests/query/loaded-subsets-dedup.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import { describe, expect, it, vi } from 'vitest'
import { createLiveQueryCollection, eq } from '../../src/query/index.js'
import { createCollection } from '../../src/collection/index.js'
import { BasicIndex } from '../../src/indexes/basic-index.js'
import { extractSimpleComparisons } from '../../src/query/expression-helpers.js'
import { flushPromises } from '../utils.js'
import type {
ChangeMessageOrDeleteKeyMessage,
LoadSubsetOptions,
} from '../../src/types.js'

type Parent = {
id: number
name: string
}

type Child = {
id: number
parentId: number
title: string
}

const sampleParents: Array<Parent> = [
{ id: 1, name: `Parent A` },
{ id: 2, name: `Parent B` },
{ id: 3, name: `Parent C` },
]

const sampleChildren: Array<Child> = [
{ id: 10, parentId: 1, title: `Child A1` },
{ id: 11, parentId: 1, title: `Child A2` },
{ id: 20, parentId: 2, title: `Child B1` },
]

describe(`loadedSubsets deduplication`, () => {
function createParentsCollection() {
return createCollection<Parent>({
id: `dedup-parents`,
getKey: (p) => p.id,
sync: {
sync: ({ begin, write, commit, markReady }) => {
begin()
for (const parent of sampleParents) {
write({ type: `insert`, value: parent })
}
commit()
markReady()
},
},
})
}

function createChildrenCollectionWithTracking() {
const loadSubsetCalls: Array<LoadSubsetOptions> = []

const collection = createCollection<Child>({
id: `dedup-children`,
getKey: (child) => child.id,
syncMode: `on-demand`,
autoIndex: `eager`,
defaultIndexType: BasicIndex,
sync: {
sync: ({ begin, write, commit, markReady }) => {
begin()
for (const child of sampleChildren) {
write({ type: `insert`, value: child })
}
commit()
markReady()
return {
loadSubset: vi.fn((options: LoadSubsetOptions) => {
loadSubsetCalls.push(options)
return Promise.resolve()
}),
}
},
},
})

return { collection, loadSubsetCalls }
}

it(`should not grow loadedSubsets when requestSnapshot is called with the same predicate`, async () => {
const parents = createParentsCollection()
const { collection: children, loadSubsetCalls } =
createChildrenCollectionWithTracking()

const liveQuery = createLiveQueryCollection((q) =>
q
.from({ p: parents })
.join({ c: children }, ({ p, c }) => eq(c.parentId, p.id))
.select(({ p, c }) => ({
parentId: p.id,
parentName: p.name,
childId: c.id,
childTitle: c.title,
})),
)

await liveQuery.preload()

const initialCallCount = loadSubsetCalls.length
expect(initialCallCount).toBeGreaterThan(0)

const firstCall = loadSubsetCalls[0]!
expect(firstCall.where).toBeDefined()

const filters = extractSimpleComparisons(firstCall.where)
expect(filters).toEqual([
{
field: [`parentId`],
operator: `in`,
value: expect.arrayContaining([1, 2, 3]),
},
])

// Trigger a second preload — this re-runs the pipeline with the same
// predicates, so the dedup layer should prevent any new loadSubset calls.
await liveQuery.preload()

expect(loadSubsetCalls.length).toBe(initialCallCount)
})
Comment thread
coderabbitai[bot] marked this conversation as resolved.

it(`should deduplicate join key requests across pipeline batches`, async () => {
let parentBegin: () => void
let parentWrite: (msg: ChangeMessageOrDeleteKeyMessage<Parent, number>) => void
let parentCommit: () => void

const parents = createCollection<Parent>({
id: `dedup-parents-sync`,
getKey: (p) => p.id,
sync: {
sync: ({ begin, write, commit, markReady }) => {
parentBegin = begin
parentWrite = write
parentCommit = commit

begin()
for (const parent of sampleParents) {
write({ type: `insert`, value: parent })
}
commit()
markReady()
},
},
})

const { collection: children, loadSubsetCalls } =
createChildrenCollectionWithTracking()

const liveQuery = createLiveQueryCollection((q) =>
q
.from({ p: parents })
.join({ c: children }, ({ p, c }) => eq(c.parentId, p.id))
.select(({ p, c }) => ({
parentId: p.id,
parentName: p.name,
childId: c.id,
childTitle: c.title,
})),
)

await liveQuery.preload()

const callCountAfterPreload = loadSubsetCalls.length

parentBegin!()
parentWrite!({ type: `insert`, value: { id: 4, name: `Parent D` } })
parentCommit!()
await flushPromises()

const newCalls = loadSubsetCalls.slice(callCountAfterPreload)
expect(newCalls.length).toBeGreaterThan(0)

let inFilterCount = 0
for (const call of newCalls) {
if (!call.where) continue
const filters = extractSimpleComparisons(call.where)
for (const filter of filters) {
if (filter.operator === `in`) {
inFilterCount++
const values = filter.value as Array<number>
expect(values).toContain(4)
expect(values).not.toContain(1)
expect(values).not.toContain(2)
expect(values).not.toContain(3)
}
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
expect(inFilterCount).toBeGreaterThan(0)
})
})