Skip to content

fix(db): filter out-of-window SSE inserts in collection-subscriber#1555

Open
v-anton wants to merge 2 commits into
TanStack:mainfrom
v-anton:fix/out-of-window-sse-filter
Open

fix(db): filter out-of-window SSE inserts in collection-subscriber#1555
v-anton wants to merge 2 commits into
TanStack:mainfrom
v-anton:fix/out-of-window-sse-filter

Conversation

@v-anton
Copy link
Copy Markdown

@v-anton v-anton commented May 25, 2026

🎯 Changes

sendChangesInRange in CollectionSubscriber passed all SSE changes to D2 unfiltered. Inserts that sort outside the current orderBy + limit window were tracked by D2's TopKState. When an in-window item was later deleted, D2 promoted the tracked out-of-window item instead of triggering loadNextItems to fetch the correct next item from the BTree.

The fix adds a pre-filter that drops new inserts whose sort value falls outside the current window boundary. Deletes, updates, already-sent keys, and inserts within the window pass through unchanged.

✅ Checklist

  • I have tested this code locally with pnpm test:pr.
  • I have followed the steps in the Contributing guide.

🚀 Release Impact

  • This change affects published code, and I have generated a changeset.
  • This change is docs/CI/dev-only (no release).

Summary by CodeRabbit

  • Bug Fixes

    • SSE filter now prevents out-of-window insert events from incorrectly replacing items in an active ordered query window.
  • Tests

    • Added tests covering out-of-window insert handling, behavior when window isn’t full, and unbounded (no-limit) query behavior.
  • Chore

    • Added a changeset entry marking a patch release.

Review Change Stack

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 25, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 40278cd8-72df-468d-9972-a992f9a3a918

📥 Commits

Reviewing files that changed from the base of the PR and between ae146bc and 7269fd4.

📒 Files selected for processing (1)
  • packages/db/tests/collection-subscriber-out-of-window-sse.test.ts

📝 Walkthrough

Walkthrough

sendChangesInRange now computes the ordered window (limit+offset) and a window boundary, filters incoming SSE changes to exclude out-of-window inserts (with exceptions), and uses the filtered set for tracking and downstream updates. Tests and a changeset entry validate and document the fix.

Changes

Out-of-window SSE Insert Filtering

Layer / File(s) Summary
Window filtering for ordered changes
packages/db/src/query/live/collection-subscriber.ts, .changeset/fix-out-of-window-sse-filter.md
sendChangesInRange computes window size and boundary from the current largest value, filters incoming changes so non-inserts are always kept and inserts are kept only when already-sent, within window capacity, or at/after the boundary. Filtered changes are used for value tracking and downstream dispatch; release notes updated.
Out-of-window SSE filter tests
packages/db/tests/collection-subscriber-out-of-window-sse.test.ts
Adds Vitest scenarios: verify an out-of-window SSE insert is not promoted when an in-window item is deleted, verify inserts pass through while the window is not yet full, and verify all inserts appear when there is no limit.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

🐰 I nibble at windows, counting each hop,
Filtering inserts that try to pop.
When bounds are set, I guard the view,
Only in-window friends get through.
Hop on — the order stays true. 🥕

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately describes the main change: filtering out-of-window SSE inserts in the collection-subscriber component.
Description check ✅ Passed The description covers all required template sections: changes are clearly explained with motivation, all checklist items are addressed, and release impact is documented with a changeset generated.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/db/tests/collection-subscriber-out-of-window-sse.test.ts`:
- Around line 13-86: Add explicit tests that cover limit/offset edge cases and
async event-ordering races: create new test cases using the same helpers
(createCollection, createLiveQueryCollection,
sourceCollection.utils.write/commit, liveQueryCollection.preload) to assert
behavior when limit is 0 and when offset is non-zero (verify window is empty or
starts at correct item), and add race-condition tests that fire concurrent SSE
inserts and deletes with controlled timing (use setTimeout/promises to vary
ordering) to ensure replacements come from the storage BTree (e.g., expect item
with value 70) not an out-of-window SSE insert; include clear assertions on
liveQueryCollection.values() after each sequence and small waits (e.g.,
setTimeout) to let loadNextItems run.
- Around line 72-74: The test currently uses a fixed 50ms setTimeout to wait for
loadNextItems to fetch a replacement, which is flaky; replace this with a
condition-based wait that polls until the expected window state is observed (or
a timeout elapses). Specifically, poll the collection subscriber's
observable/window (e.g., check collectionSubscriber.window,
collectionSubscriber.getWindowItems(), or the items array that loadNextItems
should update) until the replacement item is present, using a short interval
(e.g., 10–50ms) and a reasonable overall timeout (e.g., 1–2s), and fail the test
if the condition is not met; you can extract a small helper
waitForCondition(pollFn, timeout) to reuse across tests.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 50122b4b-71f1-4239-8840-dfcd3ee2bad3

📥 Commits

Reviewing files that changed from the base of the PR and between be656be and ae146bc.

📒 Files selected for processing (3)
  • .changeset/fix-out-of-window-sse-filter.md
  • packages/db/src/query/live/collection-subscriber.ts
  • packages/db/tests/collection-subscriber-out-of-window-sse.test.ts

Comment on lines +13 to +86
it(`should not promote an out-of-window SSE insert when an in-window item is deleted`, async () => {
const initialData: Array<TestItem> = [
{ id: `1`, value: 100 },
{ id: `2`, value: 90 },
{ id: `3`, value: 80 },
{ id: `4`, value: 70 },
]

const sourceCollection = createCollection(
mockSyncCollectionOptions({
id: `sse-window-filter`,
getKey: (item: TestItem) => item.id,
initialData,
autoIndex: `eager`,
defaultIndexType: BTreeIndex,
}),
)

await sourceCollection.preload()

const liveQueryCollection = createLiveQueryCollection((q) =>
q
.from({ items: sourceCollection })
.orderBy(({ items }) => items.value, `desc`)
.limit(3)
.select(({ items }) => ({
id: items.id,
value: items.value,
})),
)

await liveQueryCollection.preload()

const initialResults = Array.from(liveQueryCollection.values())
expect(initialResults).toHaveLength(3)
expect(initialResults.map((r) => r.id)).toEqual([`1`, `2`, `3`])

// SSE delivers an insert for an item that sorts BELOW the current window
// (value 10 is lower than all top-3 values: 100, 90, 80)
sourceCollection.utils.begin()
sourceCollection.utils.write({
type: `insert`,
value: { id: `out-of-window`, value: 10 },
})
sourceCollection.utils.commit()

// Window should be unchanged
let results = Array.from(liveQueryCollection.values())
expect(results).toHaveLength(3)
expect(results.map((r) => r.id)).toEqual([`1`, `2`, `3`])

// Now delete one of the top-3 items
sourceCollection.utils.begin()
sourceCollection.utils.write({
type: `delete`,
value: { id: `2`, value: 90 },
})
sourceCollection.utils.commit()

// Wait for loadNextItems to fetch the replacement
await new Promise((resolve) => setTimeout(resolve, 50))

results = Array.from(liveQueryCollection.values())
expect(results).toHaveLength(3)

// The replacement should be item 4 (value 70) — the next item from the
// BTree — NOT the out-of-window SSE insert (value 10).
expect(
results.map((r) => r.id),
`Expected item 4 (value 70) to replace deleted item 2, ` +
`not the out-of-window SSE insert (value 10). ` +
`Got: ${JSON.stringify(results.map((r) => ({ id: r.id, value: r.value })))}`,
).toEqual([`1`, `3`, `4`])
})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Add explicit edge-case tests for limit/offset and async ordering.

This test reproduces the core bug path well, but it still misses dedicated corner-case coverage for limit/offset boundaries (for example limit: 0 and non-zero offset) and async event ordering/race scenarios, which are high-value for this fix area.

As per coding guidelines, “Test corner cases including: … async race conditions, and limit/offset edge cases”.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/db/tests/collection-subscriber-out-of-window-sse.test.ts` around
lines 13 - 86, Add explicit tests that cover limit/offset edge cases and async
event-ordering races: create new test cases using the same helpers
(createCollection, createLiveQueryCollection,
sourceCollection.utils.write/commit, liveQueryCollection.preload) to assert
behavior when limit is 0 and when offset is non-zero (verify window is empty or
starts at correct item), and add race-condition tests that fire concurrent SSE
inserts and deletes with controlled timing (use setTimeout/promises to vary
ordering) to ensure replacements come from the storage BTree (e.g., expect item
with value 70) not an out-of-window SSE insert; include clear assertions on
liveQueryCollection.values() after each sequence and small waits (e.g.,
setTimeout) to let loadNextItems run.

Comment thread packages/db/tests/collection-subscriber-out-of-window-sse.test.ts
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant