Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion quickwit/quickwit-indexing/src/source/kafka_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use quickwit_actors::{ActorExitStatus, Mailbox};
use quickwit_config::KafkaSourceParams;
use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint};
use quickwit_proto::metastore::SourceType;
use quickwit_proto::types::{IndexUid, Position};
use quickwit_proto::types::{IndexUid, NodeIdRef, Position};
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::{
BaseConsumer, CommitMode, Consumer, ConsumerContext, DefaultConsumerContext, Rebalance,
Expand Down Expand Up @@ -240,6 +240,7 @@ impl KafkaSource {
let (events_tx, events_rx) = mpsc::channel(100);
let (truncate_tx, truncate_rx) = watch::channel(SourceCheckpoint::default());
let (client_config, consumer, group_id) = create_consumer(
source_runtime.node_id(),
source_runtime.index_uid(),
source_runtime.source_id(),
source_params,
Expand Down Expand Up @@ -654,6 +655,7 @@ pub(super) async fn check_connectivity(params: KafkaSourceParams) -> anyhow::Res

/// Creates a new `KafkaSourceConsumer`.
fn create_consumer(
node_id: &NodeIdRef,
index_uid: &IndexUid,
source_id: &str,
params: KafkaSourceParams,
Expand All @@ -676,6 +678,7 @@ fn create_consumer(
params.enable_backfill_mode.to_string(),
)
.set("group.id", &group_id)
.set("client.id", node_id.as_str())
.set_log_level(log_level)
Comment on lines 680 to 682
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

client.id is being set unconditionally, which will override any client.id provided by users via params.client_params. That’s a behavioral change that can break existing Kafka quota/monitoring setups. Consider only setting a default client.id when it is not already present in client_params, and leaving the user-specified value intact when provided.

Copilot uses AI. Check for mistakes.
Comment on lines 680 to 682
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using only node_id for client.id is not sufficient to uniquely identify a consumer instance when multiple indexing pipelines/sources run on the same node (e.g., multiple pipelines per (index_uid, source_id) are allowed). This can still make it hard to associate partitions/metrics to a specific consumer. Consider incorporating additional identifiers (such as source_id and/or pipeline_uid/group_id) into the default client.id value so each consumer instance can be distinguished.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to be able to associate a partition to its indexer, not its pipeline

Comment on lines 680 to 682
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There’s no test covering the new defaulting behavior for client.id. Since create_consumer returns a ClientConfig, consider adding a unit test that asserts the produced native config includes the expected client.id (and that an explicit client.id in client_params is preserved if that is the intended behavior).

Copilot uses AI. Check for mistakes.
.create_with_context(RdKafkaContext {
topic: params.topic,
Expand Down
Loading