
[Kafka] remove mutable state items#44

Merged
alephys26 merged 4 commits into main from alephys26/handle-concurrent-kafka
Feb 23, 2026

Conversation

@alephys26
Contributor

Description

Handles concurrent task runs by removing mutable variables from the task state that were shared between goroutines.

Changes

This pull request mainly refactors the Kafka pipeline task to improve context handling and retry logic, making the retry counters local to each run instead of being stored on the struct. It also removes unnecessary struct fields and updates method signatures for clarity and correctness. Additionally, a test pipeline is updated to set task concurrency.

Kafka Task Refactoring:

  • Removed the ctx, readErrorRetries, and emptyReadRetries fields from the kafka struct, and moved the retry counters into local variables within the read method. This makes the retry logic per-run and avoids shared-state issues.
  • Updated the write and read method signatures to accept a context.Context parameter (runCtx) instead of reading a struct field, improving context propagation and cancellation handling.
  • Refactored handleReadError to accept pointers to the local retry counters, and updated its call sites accordingly.

Test Pipeline Update:

  • Added task_concurrency: 3 to the Kafka task in test/pipelines/kafka_group_read.yaml to enable concurrent task execution in tests.
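For reference, the setting looks roughly like this in the pipeline YAML (only task_concurrency: 3 is taken from the PR; the surrounding keys and task name are illustrative, not copied from the actual test file):

```yaml
# test/pipelines/kafka_group_read.yaml (abridged, illustrative sketch)
tasks:
  - name: kafka_group_read     # illustrative task name
    type: kafka
    task_concurrency: 3        # run three concurrent workers for this task
```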

Minor Code Cleanup:

  • Removed an unnecessary blank line in the compress task for consistency.

Types of changes

  • Docs change / refactoring / dependency upgrade
  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)

Tests

  • Tested on local build

Checklist

  • My code follows the code style of this project.
  • My change requires a change to the documentation and I have updated the documentation accordingly.
  • I have added tests to cover my changes.

@alephys26 alephys26 requested a review from a team as a code owner February 18, 2026 19:02
Copilot AI review requested due to automatic review settings February 18, 2026 19:02
Contributor

Copilot AI left a comment


Pull request overview

Refactors the Kafka pipeline task to avoid shared mutable state across concurrent task_concurrency workers by moving retry/cancellation-related variables to per-run scope, and updates a test pipeline to exercise Kafka group consumption with concurrency.

Changes:

  • Remove ctx and retry counters from the kafka struct; make retry counters local to each read() invocation.
  • Update Kafka read/write helpers to accept a context.Context parameter rather than relying on struct state.
  • Update test/pipelines/kafka_group_read.yaml to run the Kafka task with task_concurrency: 3 (plus minor whitespace cleanups elsewhere).

Reviewed changes

Copilot reviewed 3 out of 4 changed files in this pull request and generated 1 comment.

File Description
internal/pkg/pipeline/task/kafka/kafka.go Removes shared mutable state from the Kafka task and makes retry tracking per worker run.
test/pipelines/kafka_group_read.yaml Enables concurrent execution for the Kafka group-read test pipeline.
internal/pkg/pipeline/task/converter/sst.go Removes trailing whitespace.
internal/pkg/pipeline/task/compress/compress.go Removes an extra blank line.


GroupID string `yaml:"group_id,omitempty" json:"group_id,omitempty"` // the consumer group id (optional)
BatchSize int `yaml:"batch_size,omitempty" json:"batch_size,omitempty"` // number of messages to read/write in a batch
RetryLimit *int `yaml:"retry_limit,omitempty" json:"retry_limit,omitempty"` // number of retries for read errors
ExitOnEmpty bool `yaml:"exit_on_empty,omitempty" json:"exit_on_empty,omitempty"` // exit when no more messages are available
Contributor


Any reason we are removing this?

Contributor


The major reason is that ExitOnEmpty is syntactic sugar: the same behaviour can be emulated with, say, a 100d timeout and 5 retries. Removing it simplifies both the code and a reader's understanding of the task.

Contributor

@prasadlohakpure Feb 23, 2026


The reasons I still think this config should be present:

  1. The kafka task is going to be a replacement for the sqs task, and the sqs task currently has this behaviour. Keeping it preserves that behaviour and eases migration from sqs to kafka.
  2. This config allows tasks to exit early when we are not certain how many messages are present, e.g.:
    • It lets hourly dags such as pdp/keepa exit early instead of waiting for a specified timeout.
    • A fixed timeout can cause early/delayed exits, consuming unnecessary processing time/latency.

I am not sure whether we can tackle the above scenarios using timeout + retries.

Contributor Author


The ExitOnEmpty behaviour was essentially implemented with timeout + retries behind the scenes.
So the actual functionality stays the same, just without the verbose "exit on empty" option.
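As a rough sketch of the equivalence being claimed (retry_limit matches the struct tags quoted above; the timeout field name and values are assumptions for illustration):

```yaml
# before: explicit flag
exit_on_empty: true

# after: emulated with a generous timeout plus a bounded retry budget
timeout: 2400h       # ~100 days; field name assumed, not from this PR
retry_limit: 5       # consecutive empty reads beyond this end the run
```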

Contributor Author


The latency is there whether we keep this option or not.

Contributor

@prasadlohakpure Feb 23, 2026


Got it, thanks for the clarification.
I understand that the behaviour can be emulated using the above two params. It's just that there will still be scenarios where I want to exit as soon as there are only a few messages left, or none at all.

So it would be difficult to keep the early-exit condition in line with retries + timeout every time.

I think a better approach would have been querying the number of available messages for the current partition + consumer group, but that would complicate the implementation, and I am not sure it is even possible.

@alephys26 alephys26 merged commit 6e54efd into main Feb 23, 2026
7 checks passed
@alephys26 alephys26 deleted the alephys26/handle-concurrent-kafka branch February 23, 2026 06:04
4 participants