Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion e2e/test-server-sync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,10 @@ describe('test-server sync via Docker engine', () => {
expect(messages.filter((msg) => msg.type === 'source_state').length).toBeGreaterThan(1)
}, 120_000)

it('no duplicate record IDs emitted by source across ranges', async () => {
// Disabled: last-segment subdivision now drops the cursor and re-fetches the
// boundary second, so the source can emit duplicate IDs (destination upsert
// handles idempotency). Re-enable if cursor inheritance is restored.
it.skip('no duplicate record IDs emitted by source across ranges', async () => {
const CONC = 5
const destSchema = uniqueSchema('dupcheck')
const ranges = buildSegmentRanges(CONC)
Expand Down
30 changes: 13 additions & 17 deletions packages/protocol/src/utils/binary-subdivision.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,15 @@ describe('subdivideRanges', () => {
it('splits older remainder into N equal segments', () => {
const remaining: Range[] = [{ gte: iso(0), lt: iso(1000), cursor: 'cur_1' }]
const segments = subdivideRanges(remaining, new Map([[remaining[0], 900]]), N)
// N segments of [0, 900)
expect(segments[segments.length - 1]).toEqual({ gte: iso(450), lt: iso(901), cursor: 'cur_1' })
expect(segments[segments.length - 1]).toEqual({ gte: iso(450), lt: iso(901), cursor: null })
expect(segments).toHaveLength(DEFAULT_SUBDIVISION_FACTOR)
// All segments are contiguous and cover [0, 900)
expect(toUnixSeconds(segments[0].gte)).toBe(0)
expect(toUnixSeconds(segments[segments.length - 1].lt)).toBe(901)
for (let i = 1; i < segments.length; i++) {
expect(segments[i].gte).toBe(segments[i - 1].lt)
}
for (let i = 0; i < segments.length - 1; i++) {
expect(segments[i].cursor).toBeNull()
for (const seg of segments) {
expect(seg.cursor).toBeNull()
}
})

Expand All @@ -58,15 +56,15 @@ describe('subdivideRanges', () => {
expect(out[0]).toEqual(a)
expect(out[1]).toEqual(b)
expect(out[2]).toEqual({ gte: iso(60), lt: iso(90), cursor: null })
expect(out[3]).toEqual({ gte: iso(90), lt: iso(121), cursor: 'cur_c' })
expect(out[3]).toEqual({ gte: iso(90), lt: iso(121), cursor: null })
})

it('passes through a range with cursor but no lastObserved entry', () => {
const range: Range = { gte: iso(0), lt: iso(100), cursor: 'cur_only' }
expect(subdivideRanges([range], new Map(), N)).toEqual([range])
})

it('emits single segment when older remainder is 30 second', () => {
it('passes through small remainders so the existing cursor paginates them', () => {
const remaining: Range[] = [{ gte: iso(1000), lt: iso(1002), cursor: 'cur_tail' }]
const out = subdivideRanges(remaining, new Map([[remaining[0], 1001]]), N)
expect(out).toEqual([{ gte: iso(1000), lt: iso(1002), cursor: 'cur_tail' }])
Expand All @@ -75,21 +73,20 @@ describe('subdivideRanges', () => {
it('N segments for a splittable range', () => {
const remaining: Range[] = [{ gte: iso(0), lt: iso(1000), cursor: 'cur_dense' }]
const out = subdivideRanges(remaining, new Map([[remaining[0], 900]]), N)
expect(out).toHaveLength(DEFAULT_SUBDIVISION_FACTOR) // boundary + N segments
expect(out).toHaveLength(DEFAULT_SUBDIVISION_FACTOR)
expect(out[0]).toEqual({ gte: iso(0), lt: iso(450), cursor: null })
expect(out[1]).toEqual({ gte: iso(450), lt: iso(901), cursor: 'cur_dense' })
expect(out[1]).toEqual({ gte: iso(450), lt: iso(901), cursor: null })

// Segments cover [0, 900) contiguously
for (let i = 2; i < out.length; i++) {
expect(out[i].gte).toBe(out[i - 1].lt)
}
})

it('keeps the entire last observed second in the cursor-backed boundary range', () => {
it('extends final segment past the boundary second to catch records the cursor missed', () => {
const remaining: Range[] = [{ gte: iso(1000), lt: iso(2000), cursor: 'cur_same_second' }]
const out = subdivideRanges(remaining, new Map([[remaining[0], 1900]]), N)
expect(out[0]).toEqual({ gte: iso(1000), lt: iso(1450), cursor: null })
expect(out[1]).toEqual({ gte: iso(1450), lt: iso(1901), cursor: 'cur_same_second' })
expect(out[1]).toEqual({ gte: iso(1450), lt: iso(1901), cursor: null })
})
})

Expand Down Expand Up @@ -120,13 +117,12 @@ function simulateRound(ranges: Range[], density: (ts: number) => number, pageSiz
}

describe('binary subdivision: data distribution scenarios', () => {
it('uniform density: splits into boundary + N segments', () => {
it('uniform density: splits into N segments with fresh cursors', () => {
const ranges: Range[] = [{ gte: iso(0), lt: iso(61000), cursor: null }]
const round1 = simulateRound(ranges, () => 1)
expect(round1.length).toBe(DEFAULT_SUBDIVISION_FACTOR) // segments
expect(round1[round1.length - 1].cursor).not.toBeNull() // boundary is part of the last segment
for (let i = 0; i < round1.length - 1; i++) {
expect(round1[i].cursor).toBeNull() // all except last segment start fresh
expect(round1.length).toBe(DEFAULT_SUBDIVISION_FACTOR)
for (const r of round1) {
expect(r.cursor).toBeNull()
}
})

Expand Down
7 changes: 4 additions & 3 deletions packages/protocol/src/utils/binary-subdivision.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export function toIso(unixSeconds: number): string {
return new Date(unixSeconds * 1000).toISOString()
}


// MARK: - Subdivision

/** Default number of segments to split the older remainder into. */
Expand Down Expand Up @@ -84,7 +85,7 @@ export function subdivideRanges(
const secondsLeft = newEnd - start
const segments = Math.min(n, secondsLeft)
const segmentDuration = Math.floor(secondsLeft / segments)
if (segmentDuration < 30) {
if (segmentDuration <= 1) {
result.push(range)
continue
}
Expand All @@ -96,7 +97,7 @@ export function subdivideRanges(
if (lastSegment) {
// handle the edge case where there are multiple objects created in a same second
// but our fetch didn't return all of them because of the limit of 100.
result.push({ gte: toIso(segGte), lt: toIso(newEnd + 1), cursor: range.cursor })
result.push({ gte: toIso(segGte), lt: toIso(newEnd + 1), cursor: null })
} else {
result.push({ gte: toIso(segGte), lt: toIso(segLt), cursor: null })
}
Expand Down Expand Up @@ -215,7 +216,7 @@ export async function* streamingSubdivide<T>(opts: {
while (launchNext()) {}

yield {
range,
range: { ...range },
data,
hasMore,
exhausted: !hasMore,
Expand Down
14 changes: 7 additions & 7 deletions packages/source-stripe/src/src-list-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ async function fetchPageForRange(opts: {
const data: Record<string, unknown>[] = []
for (const item of response.data) {
const record = item as Record<string, unknown>
const created = record.created
// Invoice item is the only object that uses .date as created timestamp
const created = record.created ?? record.date
if (typeof created === 'number') lastObserved = created
data.push({
...record,
Expand Down Expand Up @@ -530,8 +531,7 @@ async function* iterateStream(opts: {
supportsLimit,
supportsForwardPagination,
}),
//concurrency: 100, // rate limiter is the real bottleneck
concurrency: 1, // serialized for reliability; parallelism re-enabled if data gaps are due to parallelism
concurrency: 100, // rate limiter is the real bottleneck
subdivisionFactor,
})

Expand Down Expand Up @@ -565,11 +565,11 @@ async function* iterateStream(opts: {
} else if (event.hasMore && event.data.length > 0) {
// Range was subdivided — the fetched head (from oldest record to range.lt)
// is already accounted for. Emit range_complete so the progress bar fills.
const oldest = event.data.findLast((r) => typeof r.created === 'number') as
| { created: number }
| undefined
const oldest = event.data.findLast(
(r) => typeof r.created === 'number' || typeof r.date === 'number'
) as { created?: number; date?: number } | undefined
if (oldest) {
const headGte = toIso(oldest.created + 1)
const headGte = toIso((oldest.created ?? oldest.date)! + 1)
if (headGte < event.range.lt) {
yield msg.stream_status({
stream: streamName,
Expand Down
Loading