Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-out-of-window-sse-filter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

fix(db): filter SSE inserts outside the current window before they reach D2
17 changes: 14 additions & 3 deletions packages/db/src/query/live/collection-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,15 +248,26 @@ export class CollectionSubscriber<
// Use a holder to forward-reference subscription in the callback
const subscriptionHolder: { current?: CollectionSubscription } = {}

const windowSize = limit + offset
const sendChangesInRange = (
changes: Iterable<ChangeMessage<any, string | number>>,
) => {
const changesArray = Array.isArray(changes) ? changes : [...changes]

this.trackSentValues(changesArray, orderByInfo.comparator)
const windowBoundary = this.biggest
const relevant =
windowBoundary === undefined || windowSize === Infinity
? changesArray
: changesArray.filter((change) => {
if (change.type !== `insert`) return true
if (this.sentToD2Keys.has(change.key)) return true
if (this.sentToD2Keys.size < windowSize) return true
return orderByInfo.comparator(windowBoundary, change.value) >= 0
})

this.trackSentValues(relevant, orderByInfo.comparator)

// Split live updates into a delete of the old value and an insert of the new value
const splittedChanges = splitUpdates(changesArray)
const splittedChanges = splitUpdates(relevant)
this.sendChangesToPipelineWithTracking(
splittedChanges,
subscriptionHolder.current!,
Expand Down
209 changes: 209 additions & 0 deletions packages/db/tests/collection-subscriber-out-of-window-sse.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
import { describe, expect, it, vi } from 'vitest'
import { createCollection } from '../src/collection/index.js'
import { BTreeIndex } from '../src/indexes/btree-index.js'
import { createLiveQueryCollection } from '../src/query/index.js'
import { mockSyncCollectionOptions } from './utils.js'

type TestItem = {
id: string
value: number
}

describe(`CollectionSubscriber out-of-window SSE filter`, () => {
it(`should not promote an out-of-window SSE insert when an in-window item is deleted`, async () => {
const initialData: Array<TestItem> = [
{ id: `1`, value: 100 },
{ id: `2`, value: 90 },
{ id: `3`, value: 80 },
{ id: `4`, value: 70 },
]

const sourceCollection = createCollection(
mockSyncCollectionOptions({
id: `sse-window-filter`,
getKey: (item: TestItem) => item.id,
initialData,
autoIndex: `eager`,
defaultIndexType: BTreeIndex,
}),
)

await sourceCollection.preload()

const liveQueryCollection = createLiveQueryCollection((q) =>
q
.from({ items: sourceCollection })
.orderBy(({ items }) => items.value, `desc`)
.limit(3)
.select(({ items }) => ({
id: items.id,
value: items.value,
})),
)

await liveQueryCollection.preload()

const initialResults = Array.from(liveQueryCollection.values())
expect(initialResults).toHaveLength(3)
expect(initialResults.map((r) => r.id)).toEqual([`1`, `2`, `3`])

// SSE delivers an insert for an item that sorts BELOW the current window
// (value 10 is lower than all top-3 values: 100, 90, 80)
sourceCollection.utils.begin()
sourceCollection.utils.write({
type: `insert`,
value: { id: `out-of-window`, value: 10 },
})
sourceCollection.utils.commit()

// Window should be unchanged
let results = Array.from(liveQueryCollection.values())
expect(results).toHaveLength(3)
expect(results.map((r) => r.id)).toEqual([`1`, `2`, `3`])

// Now delete one of the top-3 items
sourceCollection.utils.begin()
sourceCollection.utils.write({
type: `delete`,
value: { id: `2`, value: 90 },
})
sourceCollection.utils.commit()

// Wait for loadNextItems to fetch the replacement
await vi.waitFor(() => {
const r = Array.from(liveQueryCollection.values())
expect(r).toHaveLength(3)
expect(r.some((item) => item.value === 70)).toBe(true)
})

Comment thread
coderabbitai[bot] marked this conversation as resolved.
results = Array.from(liveQueryCollection.values())

// The replacement should be item 4 (value 70) — the next item from the
// BTree — NOT the out-of-window SSE insert (value 10).
expect(
results.map((r) => r.id),
`Expected item 4 (value 70) to replace deleted item 2, ` +
`not the out-of-window SSE insert (value 10). ` +
`Got: ${JSON.stringify(results.map((r) => ({ id: r.id, value: r.value })))}`,
).toEqual([`1`, `3`, `4`])
})
Comment on lines +13 to +89
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Add explicit edge-case tests for limit/offset and async ordering.

This test reproduces the core bug path well, but it still misses dedicated corner-case coverage for limit/offset boundaries (for example limit: 0 and non-zero offset) and async event ordering/race scenarios, which are high-value for this fix area.

As per coding guidelines, “Test corner cases including: … async race conditions, and limit/offset edge cases”.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/db/tests/collection-subscriber-out-of-window-sse.test.ts` around
lines 13 - 86, Add explicit tests that cover limit/offset edge cases and async
event-ordering races: create new test cases using the same helpers
(createCollection, createLiveQueryCollection,
sourceCollection.utils.write/commit, liveQueryCollection.preload) to assert
behavior when limit is 0 and when offset is non-zero (verify window is empty or
starts at correct item), and add race-condition tests that fire concurrent SSE
inserts and deletes with controlled timing (use setTimeout/promises to vary
ordering) to ensure replacements come from the storage BTree (e.g., expect item
with value 70) not an out-of-window SSE insert; include clear assertions on
liveQueryCollection.values() after each sequence and small waits (e.g.,
setTimeout) to let loadNextItems run.


it(`should pass through all inserts when window is not full yet`, async () => {
const sourceCollection = createCollection(
mockSyncCollectionOptions({
id: `sse-window-not-full`,
getKey: (item: TestItem) => item.id,
initialData: [],
autoIndex: `eager`,
defaultIndexType: BTreeIndex,
}),
)

await sourceCollection.preload()

const liveQueryCollection = createLiveQueryCollection((q) =>
q
.from({ items: sourceCollection })
.orderBy(({ items }) => items.value, `desc`)
.limit(3)
.select(({ items }) => ({
id: items.id,
value: items.value,
})),
)

await liveQueryCollection.preload()

// Insert items one at a time — all should pass through since window
// (limit=3) is not full yet, regardless of sort position
sourceCollection.utils.begin()
sourceCollection.utils.write({
type: `insert`,
value: { id: `a`, value: 5 },
})
sourceCollection.utils.commit()

await vi.waitFor(() => {
expect(Array.from(liveQueryCollection.values())).toHaveLength(1)
})

sourceCollection.utils.begin()
sourceCollection.utils.write({
type: `insert`,
value: { id: `b`, value: 50 },
})
sourceCollection.utils.commit()

await vi.waitFor(() => {
expect(Array.from(liveQueryCollection.values())).toHaveLength(2)
})

sourceCollection.utils.begin()
sourceCollection.utils.write({
type: `insert`,
value: { id: `c`, value: 1 },
})
sourceCollection.utils.commit()

await vi.waitFor(() => {
expect(Array.from(liveQueryCollection.values())).toHaveLength(3)
})

const results = Array.from(liveQueryCollection.values())
expect(results.map((r) => r.id)).toEqual([`b`, `a`, `c`])
})

it(`should pass through all inserts when there is no limit`, async () => {
const initialData: Array<TestItem> = [
{ id: `1`, value: 100 },
{ id: `2`, value: 50 },
]

const sourceCollection = createCollection(
mockSyncCollectionOptions({
id: `sse-no-limit`,
getKey: (item: TestItem) => item.id,
initialData,
autoIndex: `eager`,
defaultIndexType: BTreeIndex,
}),
)

await sourceCollection.preload()

// No .limit() — infinite window
const liveQueryCollection = createLiveQueryCollection((q) =>
q
.from({ items: sourceCollection })
.orderBy(({ items }) => items.value, `desc`)
.select(({ items }) => ({
id: items.id,
value: items.value,
})),
)

await liveQueryCollection.preload()

expect(Array.from(liveQueryCollection.values())).toHaveLength(2)

// Insert items that would sort at the very bottom — should still pass
// through since there is no limit
sourceCollection.utils.begin()
sourceCollection.utils.write({
type: `insert`,
value: { id: `low-1`, value: 1 },
})
sourceCollection.utils.write({
type: `insert`,
value: { id: `low-2`, value: 2 },
})
sourceCollection.utils.commit()

await vi.waitFor(() => {
expect(Array.from(liveQueryCollection.values())).toHaveLength(4)
})

const results = Array.from(liveQueryCollection.values())
expect(results.map((r) => r.id)).toEqual([`1`, `2`, `low-2`, `low-1`])
})
})