From 111b0ff27eeff40106b6b6186077a052dd8dcd47 Mon Sep 17 00:00:00 2001 From: Darkheir Date: Mon, 30 Mar 2026 15:44:27 +0200 Subject: [PATCH] feat: Set client.id for Kafka source Signed-off-by: Darkheir --- quickwit/quickwit-indexing/src/source/kafka_source.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/quickwit/quickwit-indexing/src/source/kafka_source.rs b/quickwit/quickwit-indexing/src/source/kafka_source.rs index 5f93d0a9344..f1aca45bb98 100644 --- a/quickwit/quickwit-indexing/src/source/kafka_source.rs +++ b/quickwit/quickwit-indexing/src/source/kafka_source.rs @@ -25,7 +25,7 @@ use quickwit_actors::{ActorExitStatus, Mailbox}; use quickwit_config::KafkaSourceParams; use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint}; use quickwit_proto::metastore::SourceType; -use quickwit_proto::types::{IndexUid, Position}; +use quickwit_proto::types::{IndexUid, NodeIdRef, Position}; use rdkafka::config::{ClientConfig, RDKafkaLogLevel}; use rdkafka::consumer::{ BaseConsumer, CommitMode, Consumer, ConsumerContext, DefaultConsumerContext, Rebalance, @@ -240,6 +240,7 @@ impl KafkaSource { let (events_tx, events_rx) = mpsc::channel(100); let (truncate_tx, truncate_rx) = watch::channel(SourceCheckpoint::default()); let (client_config, consumer, group_id) = create_consumer( + source_runtime.node_id(), source_runtime.index_uid(), source_runtime.source_id(), source_params, @@ -654,6 +655,7 @@ pub(super) async fn check_connectivity(params: KafkaSourceParams) -> anyhow::Res /// Creates a new `KafkaSourceConsumer`. fn create_consumer( + node_id: &NodeIdRef, index_uid: &IndexUid, source_id: &str, params: KafkaSourceParams, @@ -676,6 +678,7 @@ fn create_consumer( params.enable_backfill_mode.to_string(), ) .set("group.id", &group_id) + .set("client.id", node_id.as_str()) .set_log_level(log_level) .create_with_context(RdKafkaContext { topic: params.topic,