DATA-7369: Make join operation thread safe #45
Conversation
Pull request overview
This PR aims to make the join pipeline task safe to run with task_concurrency > 1 by removing shared mutable buffering state from the task struct and keeping buffering local to each Run invocation.
Changes:
- Remove the `buffer` field from `join` and allocate a per-`Run` local buffer slice.
- Update flushing/joining helpers to operate on the per-`Run` buffer rather than shared struct state.
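The core change, buffering locally inside `Run` instead of on the struct, can be sketched as follows. `Record`, the `join` struct, and the channel signatures here are simplified stand-ins for the real types, not the actual implementation:

```go
package main

import "fmt"

// Record is a stand-in for record.Record in this sketch.
type Record struct{ Key string }

type join struct {
	// No buffer field: buffering lives inside Run, so concurrent
	// Run invocations never share mutable state.
}

// Run keeps its buffer as a local variable; each goroutine calling
// Run gets an independent slice, making task_concurrency > 1 safe.
func (j *join) Run(input <-chan *Record, output chan<- *Record) {
	buffer := make([]*Record, 0, 64) // per-Run, not per-struct
	for r := range input {
		buffer = append(buffer, r)
	}
	for _, r := range buffer {
		output <- r
	}
	close(output)
}

func main() {
	j := &join{}
	in := make(chan *Record, 2)
	out := make(chan *Record, 2)
	in <- &Record{Key: "Adele"}
	in <- &Record{Key: "Bea"}
	close(in)
	j.Run(in, out)
	for r := range out {
		fmt.Println(r.Key)
	}
}
```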
```diff
 for {
 	select {
 	default:
 		// Try to get a record from input
 		r, ok := j.GetRecord(input)
 		if !ok {
 			// Input channel closed, send any remaining records
-			j.flushBuffer(output)
+			j.flushBuffer(&buffer, output)
 			return nil
```
The `select { default: ...; case <-tickerCh: ... }` pattern prevents reliable duration-based flushing: the `default` branch is always ready and then blocks on `GetRecord(input)`, so the ticker case can't run while waiting for input (and may be starved even when ticks are ready). Consider removing `default` and using a `case r, ok := <-input:` branch alongside `case <-tickerCh:` so the loop can wake up on either input or the timer.
```go
totalSize := 0
var ticker *time.Ticker
var tickerCh <-chan time.Time
buffer := make([]*record.Record, 0, defaultBufferSize)
```
`buffer := make([]*record.Record, 0, defaultBufferSize)` preallocates space for 1,000,000 pointers every time `Run` starts. With `task_concurrency > 1`, this scales memory linearly per worker (e.g., ~8MB per worker on 64-bit) even for small joins. Consider sizing the initial capacity based on `j.Number` (when set) or using a much smaller default and letting `append` grow as needed.
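One way to pick the initial capacity along the lines of this comment; the `Number` field and the constant values are assumptions taken from the review text, not the actual code:

```go
package main

import "fmt"

const defaultBufferSize = 1_000_000

// initialCap chooses a per-Run buffer capacity: use the configured
// join size when it is known (the hypothetical j.Number), otherwise
// start small and let append grow the slice as needed.
func initialCap(number int) int {
	const smallDefault = 1024
	if number > 0 {
		return min(number, defaultBufferSize)
	}
	return smallDefault
}

func main() {
	fmt.Println(initialCap(0))   // unknown size: small default
	fmt.Println(initialCap(500)) // known small join: exact capacity
}
```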
```diff
-	j.buffer = j.buffer[:0]
 func (j *join) flushBuffer(buffer *[]*record.Record, output chan<- *record.Record) {
 	if len(*buffer) > 0 {
 		j.sendJoinedRecords(*buffer, output)
```
*buffer = (*buffer)[:0] resets the slice length but leaves the underlying array entries pointing at the previous *record.Record values, which can retain a large amount of memory until those slots are overwritten. Consider clearing the used portion (e.g., clear((*buffer)[:len(*buffer)])) before reslicing to allow GC to reclaim flushed records promptly.
```diff
 	j.sendJoinedRecords(*buffer, output)
+	// Clear the used portion so flushed records can be garbage-collected.
+	clear((*buffer)[:len(*buffer)])
```
Description
Make join operation thread safe
Types of changes
Testing:
file: test/pipelines/join.yaml
With task_concurrency set to 2:
Before changes (if task_concurrency were in effect):
Non-deterministic behaviour, e.g. data written multiple times (`Adele` appearing multiple times).
After changes:
No loss of data; all records from names.txt are present in the output exactly once.
Checklist