fix(db): filter out-of-window SSE inserts in collection-subscriber#1555
fix(db): filter out-of-window SSE inserts in collection-subscriber#1555v-anton wants to merge 2 commits into
Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughsendChangesInRange now computes the ordered window (limit+offset) and a window boundary, filters incoming SSE changes to exclude out-of-window inserts (with exceptions), and uses the filtered set for tracking and downstream updates. Tests and a changeset entry validate and document the fix. ChangesOut-of-window SSE Insert Filtering
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/db/tests/collection-subscriber-out-of-window-sse.test.ts`:
- Around line 13-86: Add explicit tests that cover limit/offset edge cases and
async event-ordering races: create new test cases using the same helpers
(createCollection, createLiveQueryCollection,
sourceCollection.utils.write/commit, liveQueryCollection.preload) to assert
behavior when limit is 0 and when offset is non-zero (verify window is empty or
starts at correct item), and add race-condition tests that fire concurrent SSE
inserts and deletes with controlled timing (use setTimeout/promises to vary
ordering) to ensure replacements come from the storage BTree (e.g., expect item
with value 70) not an out-of-window SSE insert; include clear assertions on
liveQueryCollection.values() after each sequence and small waits (e.g.,
setTimeout) to let loadNextItems run.
- Around line 72-74: The test currently uses a fixed 50ms setTimeout to wait for
loadNextItems to fetch a replacement, which is flaky; replace this with a
condition-based wait that polls until the expected window state is observed (or
a timeout elapses). Specifically, poll the collection subscriber's
observable/window (e.g., check collectionSubscriber.window,
collectionSubscriber.getWindowItems(), or the items array that loadNextItems
should update) until the replacement item is present, using a short interval
(e.g., 10–50ms) and a reasonable overall timeout (e.g., 1–2s), and fail the test
if the condition is not met; you can extract a small helper
waitForCondition(pollFn, timeout) to reuse across tests.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 50122b4b-71f1-4239-8840-dfcd3ee2bad3
📒 Files selected for processing (3)
.changeset/fix-out-of-window-sse-filter.mdpackages/db/src/query/live/collection-subscriber.tspackages/db/tests/collection-subscriber-out-of-window-sse.test.ts
| it(`should not promote an out-of-window SSE insert when an in-window item is deleted`, async () => { | ||
| const initialData: Array<TestItem> = [ | ||
| { id: `1`, value: 100 }, | ||
| { id: `2`, value: 90 }, | ||
| { id: `3`, value: 80 }, | ||
| { id: `4`, value: 70 }, | ||
| ] | ||
|
|
||
| const sourceCollection = createCollection( | ||
| mockSyncCollectionOptions({ | ||
| id: `sse-window-filter`, | ||
| getKey: (item: TestItem) => item.id, | ||
| initialData, | ||
| autoIndex: `eager`, | ||
| defaultIndexType: BTreeIndex, | ||
| }), | ||
| ) | ||
|
|
||
| await sourceCollection.preload() | ||
|
|
||
| const liveQueryCollection = createLiveQueryCollection((q) => | ||
| q | ||
| .from({ items: sourceCollection }) | ||
| .orderBy(({ items }) => items.value, `desc`) | ||
| .limit(3) | ||
| .select(({ items }) => ({ | ||
| id: items.id, | ||
| value: items.value, | ||
| })), | ||
| ) | ||
|
|
||
| await liveQueryCollection.preload() | ||
|
|
||
| const initialResults = Array.from(liveQueryCollection.values()) | ||
| expect(initialResults).toHaveLength(3) | ||
| expect(initialResults.map((r) => r.id)).toEqual([`1`, `2`, `3`]) | ||
|
|
||
| // SSE delivers an insert for an item that sorts BELOW the current window | ||
| // (value 10 is lower than all top-3 values: 100, 90, 80) | ||
| sourceCollection.utils.begin() | ||
| sourceCollection.utils.write({ | ||
| type: `insert`, | ||
| value: { id: `out-of-window`, value: 10 }, | ||
| }) | ||
| sourceCollection.utils.commit() | ||
|
|
||
| // Window should be unchanged | ||
| let results = Array.from(liveQueryCollection.values()) | ||
| expect(results).toHaveLength(3) | ||
| expect(results.map((r) => r.id)).toEqual([`1`, `2`, `3`]) | ||
|
|
||
| // Now delete one of the top-3 items | ||
| sourceCollection.utils.begin() | ||
| sourceCollection.utils.write({ | ||
| type: `delete`, | ||
| value: { id: `2`, value: 90 }, | ||
| }) | ||
| sourceCollection.utils.commit() | ||
|
|
||
| // Wait for loadNextItems to fetch the replacement | ||
| await new Promise((resolve) => setTimeout(resolve, 50)) | ||
|
|
||
| results = Array.from(liveQueryCollection.values()) | ||
| expect(results).toHaveLength(3) | ||
|
|
||
| // The replacement should be item 4 (value 70) — the next item from the | ||
| // BTree — NOT the out-of-window SSE insert (value 10). | ||
| expect( | ||
| results.map((r) => r.id), | ||
| `Expected item 4 (value 70) to replace deleted item 2, ` + | ||
| `not the out-of-window SSE insert (value 10). ` + | ||
| `Got: ${JSON.stringify(results.map((r) => ({ id: r.id, value: r.value })))}`, | ||
| ).toEqual([`1`, `3`, `4`]) | ||
| }) |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Add explicit edge-case tests for limit/offset and async ordering.
This test reproduces the core bug path well, but it still misses dedicated corner-case coverage for limit/offset boundaries (for example limit: 0 and non-zero offset) and async event ordering/race scenarios, which are high-value for this fix area.
As per coding guidelines, “Test corner cases including: … async race conditions, and limit/offset edge cases”.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/db/tests/collection-subscriber-out-of-window-sse.test.ts` around
lines 13 - 86, Add explicit tests that cover limit/offset edge cases and async
event-ordering races: create new test cases using the same helpers
(createCollection, createLiveQueryCollection,
sourceCollection.utils.write/commit, liveQueryCollection.preload) to assert
behavior when limit is 0 and when offset is non-zero (verify window is empty or
starts at correct item), and add race-condition tests that fire concurrent SSE
inserts and deletes with controlled timing (use setTimeout/promises to vary
ordering) to ensure replacements come from the storage BTree (e.g., expect item
with value 70) not an out-of-window SSE insert; include clear assertions on
liveQueryCollection.values() after each sequence and small waits (e.g.,
setTimeout) to let loadNextItems run.
🎯 Changes
sendChangesInRangeinCollectionSubscriberpassed all SSE changes to D2 unfiltered. Inserts that sort outside the currentorderBy + limitwindow were tracked by D2's TopKState. When an in-window item was later deleted, D2 promoted the tracked out-of-window item instead of triggeringloadNextItemsto fetch the correct next item from the BTree.The fix adds a pre-filter that drops new inserts whose sort value falls outside the current window boundary. Deletes, updates, already-sent keys, and inserts within the window pass through unchanged.
✅ Checklist
pnpm test:pr.🚀 Release Impact
Summary by CodeRabbit
Bug Fixes
Tests
Chore