Pull request overview
Refactors the Kafka pipeline task to avoid shared mutable state across concurrent task_concurrency workers by moving retry/cancellation-related variables to per-run scope, and updates a test pipeline to exercise Kafka group consumption with concurrency.
Changes:
- Remove `ctx` and retry counters from the `kafka` struct; make retry counters local to each `read()` invocation.
- Update Kafka `read`/`write` helpers to accept a `context.Context` parameter rather than relying on struct state.
- Update `test/pipelines/kafka_group_read.yaml` to run the Kafka task with `task_concurrency: 3` (plus minor whitespace cleanups elsewhere).
Reviewed changes
Copilot reviewed 3 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| `internal/pkg/pipeline/task/kafka/kafka.go` | Removes shared mutable state from the Kafka task and makes retry tracking per worker run. |
| `test/pipelines/kafka_group_read.yaml` | Enables concurrent execution for the Kafka group-read test pipeline. |
| `internal/pkg/pipeline/task/converter/sst.go` | Removes trailing whitespace. |
| `internal/pkg/pipeline/task/compress/compress.go` | Removes an extra blank line. |
    GroupID string `yaml:"group_id,omitempty" json:"group_id,omitempty"` // the consumer group id (optional)
    BatchSize int `yaml:"batch_size,omitempty" json:"batch_size,omitempty"` // number of messages to read/write in a batch
    RetryLimit *int `yaml:"retry_limit,omitempty" json:"retry_limit,omitempty"` // number of retries for read errors
    ExitOnEmpty bool `yaml:"exit_on_empty,omitempty" json:"exit_on_empty,omitempty"` // exit when no more messages are available
There was a problem hiding this comment.
Any reason we are removing this?
There was a problem hiding this comment.
The main reason is that ExitOnEmpty is syntactic sugar: the same behaviour can be emulated with a timeout of 100d and 5 retries. Removing it simplifies the code and makes the task easier to understand.
There was a problem hiding this comment.
The reasons I still think this config should be present are:
- The kafka task is going to be a replacement for the sqs task, and the sqs task currently has this behaviour. Keeping the option preserves that behaviour and eases migration from sqs to kafka.
- This config allows tasks to exit early when we are not certain how many messages are present, e.g.:
- It allows hourly dags such as pdp/keepa to exit early instead of waiting for a specified timeout.
- A fixed timeout can cause early or delayed exits, adding unnecessary processing time or latency.
I am not sure whether we can handle the above scenarios using timeout + retries.
There was a problem hiding this comment.
The ExitOnEmpty behaviour essentially was using timeout+retries behind the scenes.
So, the actual functionality is still the same but without the verbose "exit on empty" option.
There was a problem hiding this comment.
The latency is still there even if we have this option in or out.
There was a problem hiding this comment.
Got it, thanks for the clarification.
I understand that the behaviour can be emulated using the above two params. It's just that there will still be scenarios where I want to exit as soon as only a few messages are left, or none at all.
So it would be difficult to keep the early-exit condition in line with retries + timeout every time.
I think a better approach would have been to find the number of available messages for the current partition + consumer group, but that would complicate the implementation, and I am not sure if that is even possible.
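For illustration only, the idea of counting the available messages could look like the Go sketch below: sum, over the partitions, the gap between the latest available offset and the group's committed offset. Everything here is an assumption made for the example. `partitionOffsets` and `remaining` are hypothetical names, the offsets are hard-coded rather than fetched from a broker, and it assumes the group has a committed offset for every partition.

```go
package main

import "fmt"

// partitionOffsets is a hypothetical snapshot of one partition's state
// for a given consumer group.
type partitionOffsets struct {
	Partition     int
	Committed     int64 // next offset the consumer group will read
	HighWatermark int64 // offset one past the last available message
}

// remaining returns how many messages the group has not consumed yet,
// summed over all partitions. Negative gaps are treated as zero.
func remaining(parts []partitionOffsets) int64 {
	var total int64
	for _, p := range parts {
		if lag := p.HighWatermark - p.Committed; lag > 0 {
			total += lag
		}
	}
	return total
}

func main() {
	parts := []partitionOffsets{
		{Partition: 0, Committed: 10, HighWatermark: 15},
		{Partition: 1, Committed: 7, HighWatermark: 7},
	}
	fmt.Println(remaining(parts)) // prints 5
}
```

A task could exit once this count reaches zero, at the cost of an extra offset lookup and a possible race with producers that are still writing.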
Description
Handles the concurrent task runs by removing variables from state that are shared between goroutines.
Changes
This pull request mainly refactors the Kafka pipeline task to improve context handling and retry logic, making the retry counters local to each run instead of being stored on the struct. It also removes unnecessary struct fields and updates method signatures for clarity and correctness. Additionally, a test pipeline is updated to set task concurrency.
Kafka Task Refactoring:
- Removed the `ctx`, `readErrorRetries`, and `emptyReadRetries` fields from the `kafka` struct, and moved the retry counters to local variables within the `read` method. This makes the retry logic per-run and avoids shared-state issues.
- Updated `write` and `read` to accept a `context.Context` parameter (`runCtx`) instead of using a struct field, improving context propagation and cancellation handling.
- Updated `handleReadError` to accept pointers to the local retry counters, and updated its usage accordingly.

Test Pipeline Update:
- Added `task_concurrency: 3` to the Kafka task in `test/pipelines/kafka_group_read.yaml` to enable concurrent task execution in tests.

Minor Code Cleanup:
- Removed trailing whitespace in `internal/pkg/pipeline/task/converter/sst.go` and an extra blank line in `internal/pkg/pipeline/task/compress/compress.go`.