Fix OOM in recursive diff by limiting goroutine concurrency #81

Merged
mason-sharp merged 1 commit into main from fix/ACE-169-fix-oom on Feb 23, 2026

Conversation

@mason-sharp

The recursive diff phase had no concurrency control — every mismatched range spawned a new goroutine, and each recursion level spawned more. With large numbers of mismatches, this caused exponential goroutine fan-out, each fetching rows from Postgres into memory simultaneously, leading to OOM.

Add a channel-based semaphore (diffSem) that limits concurrent recursive diff goroutines to maxConcurrent (NumCPU * ConcurrencyFactor), the same limit used by the initial hash phase. The semaphore is acquired inside the goroutine body to avoid parent-child deadlock.
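
The point about acquiring the semaphore inside the goroutine body can be illustrated with a minimal, self-contained sketch. It is not the code from table_diff.go: `runBounded`, the binary-tree workload, and the counters are hypothetical stand-ins for the recursive diff. Each node runs in its own goroutine, takes a slot only after it has started, and spawns its children without waiting for them to get a slot, so a parent never blocks while holding a slot its children need.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runBounded walks a binary tree of the given depth, one goroutine per node,
// and returns how many nodes were visited and the peak number of goroutines
// that held a semaphore slot at the same time. (Hypothetical helper.)
func runBounded(limit, depth int) (visited, peak int64) {
	sem := make(chan struct{}, limit) // buffered channel as counting semaphore
	var wg sync.WaitGroup
	var running, maxRunning, count atomic.Int64

	var spawn func(depth int)
	spawn = func(depth int) {
		wg.Add(1) // always called while the caller is still counted in wg
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire inside the goroutine body
			defer func() { <-sem }() // release when this node is finished

			n := running.Add(1)
			defer running.Add(-1)
			for { // record the highest concurrency observed
				p := maxRunning.Load()
				if n <= p || maxRunning.CompareAndSwap(p, n) {
					break
				}
			}

			count.Add(1)
			if depth > 0 {
				// Launching children never blocks: they queue on sem themselves.
				spawn(depth - 1)
				spawn(depth - 1)
			}
		}()
	}

	spawn(depth)
	wg.Wait()
	return count.Load(), maxRunning.Load()
}

func main() {
	visited, peak := runBounded(4, 6)
	fmt.Println(visited, peak <= 4) // prints "127 true"
}
```

If the parent instead acquired a slot on behalf of each child before launching it, every slot could end up held by parents that are waiting for a child slot, and no goroutine could make progress.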

Measured on 100k mismatched rows (4 CPUs):

  • Before: 2+ GB heap, 5 GB system memory, ~24M heap objects
  • After: 372 MB heap, 1 GB system memory, 5 goroutines

@coderabbitai
coderabbitai bot commented Feb 23, 2026

📝 Walkthrough

Added a buffered semaphore (diffSem) to TableDiffTask to cap concurrent recursive diff goroutines, applied in ExecuteTask and recursiveDiff with cancellation-aware releases. Added an integration test that runs a 100k-row table diff, measures heap before/after, and asserts memory stays below 512 MB and diffs are as expected.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Concurrency Control (`internal/consistency/diff/table_diff.go`) | Added diffSem chan struct{} to TableDiffTask; initialize buffered semaphore in ExecuteTask; wrap top-level and recursive goroutine launches to acquire/release slots; add context-aware releases and inline comments explaining rate-limiting and OOM prevention. |
| Integration Testing (`tests/integration/table_diff_memory_test.go`) | Added TestTableDiffMemoryUsage: builds 100k-row tables, truncates node2, runs GC-baselined table-diff with configured params, logs memory metrics, enforces a 512 MB heap ceiling, and asserts expected diff outcomes; includes setup/cleanup and assertions. |

Poem

🐇 I dig through diffs with careful little paws,
Slots hold my hops so the fan-out withdraws.
I count every goroutine, release when they're done,
Heap stays below limits — a snug little run.
Cheers from the rabbit for controlled concurrency!

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title 'Fix OOM in recursive diff by limiting goroutine concurrency' directly describes the main change: addressing an out-of-memory issue by adding concurrency control to the recursive diff goroutines. |
| Description check | ✅ Passed | The description clearly explains the problem (exponential goroutine fan-out causing OOM), the solution (channel-based semaphore limiting concurrent goroutines), and includes concrete performance measurements demonstrating the fix's effectiveness. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%. |

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (2)
tests/integration/table_diff_memory_test.go (2)

102-105: Consider adding runtime.GC() before the final ReadMemStats and renaming peakHeapInUse.

Two small issues with the memory measurement:

  1. No GC before final read: Without a runtime.GC() call before ReadMemStats(&memAfter), HeapInuse may include unreachable objects that inflate the number. Adding a GC pass before the final read makes the assertion more deterministic and less susceptible to CI flakiness.

  2. Misleading name: peakHeapInUse is actually "heap in use at measurement time," not the peak during the test. Go's MemStats doesn't track peak heap. A more accurate name would be heapInUseAfter.

Suggested change
 	// Capture memory after diff
+	runtime.GC()
 	var memAfter runtime.MemStats
 	runtime.ReadMemStats(&memAfter)
 
 	// Calculate memory usage
 	heapAllocDelta := int64(memAfter.TotalAlloc) - int64(memBefore.TotalAlloc)
-	peakHeapInUse := memAfter.HeapInuse
-	peakSys := memAfter.Sys
+	heapInUseAfter := memAfter.HeapInuse
+	sysAfter := memAfter.Sys

(Update references at lines 148-153 and 177-179 accordingly.)

Also applies to: 138-145
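
As a standalone illustration of the measurement pattern suggested here, the following sketch forces a GC before both the baseline and the final `runtime.ReadMemStats` call. The `measure` helper, the `sink` variable, and the 64 MiB workload are hypothetical and not taken from the test file; only `runtime.GC`, `runtime.ReadMemStats`, `TotalAlloc`, and `HeapInuse` are real runtime APIs.

```go
package main

import (
	"fmt"
	"runtime"
)

// sink keeps the most recent buffer reachable so allocations go to the heap.
var sink []byte

// measure runs work and returns the bytes allocated while it ran
// (TotalAlloc delta) and the heap in use at measurement time, after a GC.
// HeapInuse is a point-in-time value, not a peak. (Hypothetical helper.)
func measure(work func()) (allocated, heapInUseAfter uint64) {
	runtime.GC() // baseline: collect garbage left over from earlier work
	var before runtime.MemStats
	runtime.ReadMemStats(&before)

	work()

	runtime.GC() // collect unreachable objects before the final read
	var after runtime.MemStats
	runtime.ReadMemStats(&after)

	return after.TotalAlloc - before.TotalAlloc, after.HeapInuse
}

func main() {
	allocated, inUse := measure(func() {
		for i := 0; i < 64; i++ {
			sink = make([]byte, 1<<20) // 1 MiB per iteration, 64 MiB total
		}
	})
	fmt.Println("allocated at least 64 MiB:", allocated >= 64<<20)
	fmt.Printf("heap in use after GC: %d bytes\n", inUse)
}
```

Because `TotalAlloc` is cumulative, its delta reflects everything allocated during the run, while `HeapInuse` read after a GC is closer to what is still live, which is why the two numbers answer different questions.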

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration/table_diff_memory_test.go` around lines 102 - 105, Add a
runtime.GC() call immediately before the final runtime.ReadMemStats(&memAfter)
to force a GC pass so HeapInuse reflects live objects only, and rename the
variable peakHeapInUse to heapInUseAfter (and update all references) because it
represents heap in use at measurement time rather than a true peak; update any
assertions or log messages that reference peakHeapInUse to use heapInUseAfter
and ensure you read memAfter via runtime.ReadMemStats after the added
runtime.GC() call.

120-129: Pre-initialized DiffResult is overwritten by ExecuteTask.

ExecuteTask() unconditionally replaces t.DiffResult (table_diff.go, line 1301), so this block has no effect. It can be removed to avoid confusion.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration/table_diff_memory_test.go` around lines 120 - 129, The test
pre-initializes tdTask.DiffResult but ExecuteTask() unconditionally overwrites
t.DiffResult (see ExecuteTask in table_diff.go), so remove the entire
tdTask.DiffResult initialization block from
tests/integration/table_diff_memory_test.go to avoid dead/redundant setup; if
the test needs specific DiffResult values instead, set them after calling
ExecuteTask or modify ExecuteTask to accept initial state, otherwise simply
delete the block.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f8374a4 and 5086517.

📒 Files selected for processing (2)
  • internal/consistency/diff/table_diff.go
  • tests/integration/table_diff_memory_test.go

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/consistency/diff/table_diff.go`:
- Around line 1558-1566: The goroutine spawn uses an unconditional send to
t.diffSem which can block forever if ctx is cancelled, leaving diffWg.Wait()
hung; change the semaphore acquisition in the anonymous goroutine that calls
t.recursiveDiff (and the analogous recursive spawn site) to a context-aware
select: attempt to send on t.diffSem in one case and return (ensuring the
associated diffWg.Done is invoked) if ctx.Done() fires in the other case, so the
goroutine doesn't block on semaphore acquisition when cancelled and ensures the
wait group is decremented or the caller returns promptly.

ℹ️ Review info

📥 Commits

Reviewing files that changed from the base of the PR and between 5086517 and a43c7c0.

📒 Files selected for processing (2)
  • internal/consistency/diff/table_diff.go
  • tests/integration/table_diff_memory_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/integration/table_diff_memory_test.go

The recursive diff phase had no concurrency control — every mismatched
range spawned a new goroutine, and each recursion level spawned more.
With large numbers of mismatches, this caused exponential goroutine
fan-out, each fetching rows from Postgres into memory simultaneously,
leading to OOM.

Add a channel-based semaphore (diffSem) that limits concurrent
recursive diff goroutines to maxConcurrent (NumCPU * ConcurrencyFactor),
the same limit used by the initial hash phase. The semaphore is acquired
inside the goroutine body to avoid parent-child deadlock.

Measured on 100k mismatched rows (4 CPUs):
- Before: 2+ GB heap, 5 GB system memory, ~24M heap objects
- After:  372 MB heap, 1 GB system memory, 5 goroutines

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (1)
internal/consistency/diff/table_diff.go (1)

1552-1577: Optional: bound goroutine creation count, not just the running count.

The current loop spawns all len(mismatchedTasks) goroutines upfront. At most maxConcurrent run; the rest block on t.diffSem. Blocked goroutines don't hold DB connections or row data, so memory is much better than before. However, for very large mismatch counts (e.g., tens of thousands of initial mismatched ranges), goroutine stacks alone (~2–8 KB each) can add up.

The initial hash phase (Lines 1458–1485) already uses the canonical bounded worker-pool pattern — a fixed pool of maxConcurrent goroutines draining a channel. Applying the same pattern here would cap the number of created diff goroutines to maxConcurrent, not just the number running:

♻️ Sketch of worker-pool approach
-   for _, task := range mismatchedTasks {
-       if t.shouldStopDueToLimit() {
-           diffBar.Increment()
-           continue
-       }
-       diffWg.Add(1)
-       go func(task RecursiveDiffTask) {
-           select {
-           case t.diffSem <- struct{}{}:
-               defer func() { <-t.diffSem }()
-           case <-ctx.Done():
-               diffWg.Done()
-               diffBar.Increment()
-               return
-           }
-           defer diffBar.Increment()
-           t.recursiveDiff(ctx, task, &diffWg)
-       }(task)
-   }
+   diffTaskQueue := make(chan RecursiveDiffTask, len(mismatchedTasks))
+   for i := 0; i < maxConcurrent; i++ {
+       diffWg.Add(1)
+       go func() {
+           defer diffWg.Done()
+           for task := range diffTaskQueue {
+               if t.shouldStopDueToLimit() {
+                   diffBar.Increment()
+                   continue
+               }
+               // Each sub-task is a unit of work; recursiveDiff spawns
+               // its own children via t.diffSem as before.
+               innerWg := &sync.WaitGroup{}
+               innerWg.Add(1)
+               t.recursiveDiff(ctx, task, innerWg)
+               innerWg.Wait()
+               diffBar.Increment()
+           }
+       }()
+   }
+   for _, task := range mismatchedTasks {
+       diffTaskQueue <- task
+   }
+   close(diffTaskQueue)

Note: this sketch omits the recursive sub-range goroutines spawned inside recursiveDiff; those would still use t.diffSem as-is.
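
For readers who want to run the pattern outside this codebase, here is a generic, self-contained version of a bounded worker pool. `processAll` and its integer tasks are hypothetical; it shows only the general shape (a fixed number of goroutines draining a closed channel) and none of the project's types, progress bars, or limits.

```go
package main

import (
	"fmt"
	"sync"
)

// processAll applies fn to every task using exactly `workers` goroutines,
// regardless of how many tasks there are. Results keep the input order.
// (Hypothetical helper, not part of the project.)
func processAll(tasks []int, workers int, fn func(int) int) []int {
	results := make([]int, len(tasks))
	queue := make(chan int, len(tasks)) // carries task indexes

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each index is received by exactly one worker, so the writes
			// to results never overlap.
			for idx := range queue {
				results[idx] = fn(tasks[idx])
			}
		}()
	}

	for idx := range tasks {
		queue <- idx
	}
	close(queue) // lets the workers' range loops terminate
	wg.Wait()
	return results
}

func main() {
	squares := processAll([]int{1, 2, 3, 4, 5}, 2, func(n int) int { return n * n })
	fmt.Println(squares) // prints "[1 4 9 16 25]"
}
```

The trade-off against the semaphore approach is that the number of goroutines created is fixed up front, at the cost of routing every unit of work through a queue.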

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/consistency/diff/table_diff.go` around lines 1552 - 1577, The loop
currently starts a goroutine per entry in mismatchedTasks which can create
thousands of goroutines; instead implement a bounded worker-pool: create a tasks
channel, start maxConcurrent worker goroutines (using the same t.diffSem or
without it) that range over the channel and for each task call
t.recursiveDiff(ctx, task, &diffWg) while managing diffWg and diffBar (increment
diffBar and call diffWg.Add/Done appropriately inside workers), close the
channel after enqueuing mismatchedTasks, and wait for all workers to finish;
reference mismatchedTasks, recursiveDiff, t.diffSem, diffWg, and diffBar when
making the change.

ℹ️ Review info

📥 Commits

Reviewing files that changed from the base of the PR and between a43c7c0 and 76f9598.

📒 Files selected for processing (2)
  • internal/consistency/diff/table_diff.go
  • tests/integration/table_diff_memory_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/integration/table_diff_memory_test.go

@mason-sharp mason-sharp merged commit 6997687 into main Feb 23, 2026
3 checks passed
@mason-sharp

Reviewed together with Ibrar online
