Skip to content

DATA-7369: Make join operation thread safe#45

Closed
prasadlohakpure wants to merge 1 commit intomainfrom
DATA-7369
Closed

DATA-7369: Make join operation thread safe#45
prasadlohakpure wants to merge 1 commit intomainfrom
DATA-7369

Conversation

@prasadlohakpure
Copy link
Contributor

@prasadlohakpure prasadlohakpure commented Feb 23, 2026

Description

Make join operation thread safe

Types of changes

Testing :
file: test/pipelines/join.yaml
With task_concurreny set to 2:

Before changes(is task_concurreny was implemented):
Non deterministic behaviour
e.g. Data written multiple times

2026-02-23 12:28:29 - echo - Abigail|Abner|Abraham ◆ Abram|Ace|Ada ◆ Adalee|Adaline|Adalyn ◆ Adalynn|Adam|Adan ◆ Addilyn|Addison|Adelaide
2026-02-23 12:28:29 - echo - Adele|Adelina|Adeline ◆ Adelyn|Adelynn|Aden ◆ Adhara|Adler|Adley ◆ Adonis|Adrian|Adriana ◆ Aadhya|Aaron|Abby ◆ Abdiel|Abdullah|Abel
2026-02-23 12:28:29 - echo - Adele|Adelina|Adeline ◆ Adelyn|Adelynn|Aden ◆ Adhara|Adler|Adley ◆ Adonis|Adrian|Adriana ◆ Aadhya|Aaron|Abby ◆ Abdiel|Abdullah|Abel ◆ Aadhya|Aaron|Abby|Aaliyah
2026-02-23 12:28:29 - echo - Adele|Adelina|Adeline ◆ Adelyn|Adelynn|Aden ◆ Adhara|Adler|Adley ◆ Adonis|Adrian|Adriana ◆ Aadhya|Aaron|Abby

( Adele appearing multiple times)

After changes:
No loss in data, all records from names.txt present in output have just one occurence

  • Docs change / refactoring / dependency upgrade
  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)

Checklist

  • My code follows the code style of this project.
  • My change requires a change to the documentation and I have updated the documentation accordingly.
  • I have added tests to cover my changes.

Copilot AI review requested due to automatic review settings February 23, 2026 07:05
@prasadlohakpure prasadlohakpure requested a review from a team as a code owner February 23, 2026 07:05
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR aims to make the join pipeline task safe to run with task_concurrency > 1 by removing shared mutable buffering state from the task struct and keeping buffering local to each Run invocation.

Changes:

  • Remove the buffer field from join and allocate a per-Run local buffer slice.
  • Update flushing/joining helpers to operate on the per-Run buffer rather than shared struct state.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 58 to 66
for {
select {
default:
// Try to get a record from input
r, ok := j.GetRecord(input)
if !ok {
// Input channel closed, send any remaining records
j.flushBuffer(output)
j.flushBuffer(&buffer, output)
return nil
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The select { default: ...; case <-tickerCh: ... } pattern prevents reliable duration-based flushing: the default branch is always ready and then blocks on GetRecord(input), so the ticker case can't run while waiting for input (and may be starved even when ticks are ready). Consider removing default and using a case r, ok := <-input: branch alongside case <-tickerCh: so the loop can wake up on either input or the timer.

Copilot uses AI. Check for mistakes.
Comment on lines 47 to 51
totalSize := 0
var ticker *time.Ticker
var tickerCh <-chan time.Time
buffer := make([]*record.Record, 0, defaultBufferSize)

Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buffer := make([]*record.Record, 0, defaultBufferSize) preallocates space for 1,000,000 pointers every time Run starts. With task_concurrency > 1, this scales memory linearly per worker (e.g., ~8MB per worker on 64-bit) even for small joins. Consider sizing the initial capacity based on j.Number (when set) or using a much smaller default and letting append grow as needed.

Copilot uses AI. Check for mistakes.
j.buffer = j.buffer[:0]
func (j *join) flushBuffer(buffer *[]*record.Record, output chan<- *record.Record) {
if len(*buffer) > 0 {
j.sendJoinedRecords(*buffer, output)
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*buffer = (*buffer)[:0] resets the slice length but leaves the underlying array entries pointing at the previous *record.Record values, which can retain a large amount of memory until those slots are overwritten. Consider clearing the used portion (e.g., clear((*buffer)[:len(*buffer)])) before reslicing to allow GC to reclaim flushed records promptly.

Suggested change
j.sendJoinedRecords(*buffer, output)
j.sendJoinedRecords(*buffer, output)
// Clear the used portion so flushed records can be garbage-collected.
clear((*buffer)[:len(*buffer)])

Copilot uses AI. Check for mistakes.
@prasadlohakpure prasadlohakpure deleted the DATA-7369 branch February 23, 2026 07:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants