diff --git a/Cargo.lock b/Cargo.lock index 1ac2b5006..a26e62c15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1271,7 +1271,9 @@ dependencies = [ "prost-reflect", "prost-types 0.14.3", "rand", + "rdkafka", "smallvec 2.0.0-alpha.11", + "sonic-rs", "tokio", "unchecked-index", ] @@ -2470,6 +2472,18 @@ dependencies = [ "zlib-rs", ] +[[package]] +name = "libz-sys" +version = "1.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4735e9cbde5aac84a5ce588f6b23a90b9b0b528f6c5a8db8a4aff300463a0839" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -2755,6 +2769,28 @@ dependencies = [ "libm", ] +[[package]] +name = "num_enum" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1207a7e20ad57b847bbddc6776b968420d38292bbfe2089accff5e19e82454c" +dependencies = [ + "num_enum_derive", + "rustversion", +] + +[[package]] +name = "num_enum_derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff32365de1b6743cb203b710788263c44a03de03802daf96092f2da4fe6ba4d7" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "object" version = "0.36.7" @@ -3378,6 +3414,36 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "rdkafka" +version = "0.36.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1beea247b9a7600a81d4cc33f659ce1a77e1988323d7d2809c7ed1c21f4c316d" +dependencies = [ + "futures-channel", + "futures-util", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "4.10.0+2.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e234cf318915c1059d4921ef7f75616b5219b10b46e9f3a511a15eb4b56a3f77" +dependencies = [ + "libc", + "libz-sys", + "num_enum", + "pkg-config", +] + [[package]] name = "recursive" version = "0.1.1" @@ -4338,6 +4404,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" diff --git a/Cargo.toml b/Cargo.toml index a59cd1d5c..830d39f8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -187,6 +187,7 @@ tonic-build = "0.13.1" transpose = "0.2.3" unchecked-index = "0.2.2" zstd = "0.13.3" +rdkafka = { version = "0.36.0", features = ["tokio"] } [patch.crates-io] # datafusion: branch=v49.0.0-blaze diff --git a/native-engine/auron-planner/proto/auron.proto b/native-engine/auron-planner/proto/auron.proto index 8a00d7406..49ecd2d2d 100644 --- a/native-engine/auron-planner/proto/auron.proto +++ b/native-engine/auron-planner/proto/auron.proto @@ -51,6 +51,7 @@ message PhysicalPlanNode { GenerateExecNode generate = 23; ParquetSinkExecNode parquet_sink = 24; OrcScanExecNode orc_scan = 25; + KafkaScanExecNode kafka_scan = 26; } } @@ -744,6 +745,29 @@ message ExpandProjection { repeated PhysicalExprNode expr = 1; } + +message KafkaScanExecNode { + string kafka_topic = 1; + string kafka_properties_json = 2; + Schema schema = 3; + int32 batch_size = 4; + KafkaStartupMode startup_mode = 5; + string auron_operator_id = 6; + KafkaFormat data_format = 7; + string format_config_json = 8; +} + +enum KafkaFormat { + JSON = 0; + PROTOBUF = 1; +} + +enum KafkaStartupMode { + GROUP_OFFSET = 0; + EARLIEST = 1; + LATEST = 2; + TIMESTAMP = 3; +} /////////////////////////////////////////////////////////////////////////////////////////////////// // Task related /////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/native-engine/auron-planner/src/planner.rs b/native-engine/auron-planner/src/planner.rs index 2a34e7cf9..a058c67b0 100644 --- a/native-engine/auron-planner/src/planner.rs +++ b/native-engine/auron-planner/src/planner.rs @@ -72,6 +72,7 @@ use datafusion_ext_plans::{ expand_exec::ExpandExec, ffi_reader_exec::FFIReaderExec, filter_exec::FilterExec, + flink::kafka_scan_exec::KafkaScanExec, generate::{create_generator, create_udtf_generator}, generate_exec::GenerateExec, ipc_reader_exec::IpcReaderExec, @@ -801,6 +802,19 @@ impl PhysicalPlanner { props, ))) } + PhysicalPlanType::KafkaScan(kafka_scan) => { + let schema = Arc::new(convert_required!(kafka_scan.schema)?); + Ok(Arc::new(KafkaScanExec::new( + kafka_scan.kafka_topic.clone(), + kafka_scan.kafka_properties_json.clone(), + schema, + kafka_scan.batch_size as i32, + kafka_scan.startup_mode, + kafka_scan.auron_operator_id.clone(), + kafka_scan.data_format, + kafka_scan.format_config_json.clone(), + ))) + } } } diff --git a/native-engine/datafusion-ext-plans/Cargo.toml b/native-engine/datafusion-ext-plans/Cargo.toml index 88ca18d7f..6861eff3b 100644 --- a/native-engine/datafusion-ext-plans/Cargo.toml +++ b/native-engine/datafusion-ext-plans/Cargo.toml @@ -66,6 +66,8 @@ unchecked-index = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } prost-reflect = { workspace = true } +rdkafka = { workspace = true } +sonic-rs = { workspace = true } [target.'cfg(target_os = "linux")'.dependencies] procfs = { workspace = true } diff --git a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs new file mode 100644 index 000000000..0840b3d71 --- /dev/null +++ b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs @@ -0,0 +1,487 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{any::Any, collections::HashMap, env, fmt::Formatter, fs, sync::Arc}; + +use arrow::array::{ + ArrayBuilder, BinaryArray, BinaryBuilder, Int32Array, Int32Builder, Int64Array, Int64Builder, + RecordBatch, +}; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use auron_jni_bridge::{jni_call_static, jni_get_string, jni_new_string}; +use datafusion::{ + common::{DataFusionError, Statistics}, + error::Result, + execution::TaskContext, + logical_expr::UserDefinedLogicalNode, + physical_expr::{EquivalenceProperties, Partitioning::UnknownPartitioning}, + physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, + execution_plan::{Boundedness, EmissionType}, + metrics::{ExecutionPlanMetricsSet, MetricsSet}, + }, +}; +use futures::StreamExt; +use once_cell::sync::OnceCell; +use rdkafka::{ + ClientConfig, ClientContext, Offset, TopicPartitionList, + config::RDKafkaLogLevel, + consumer::{Consumer, ConsumerContext, Rebalance, StreamConsumer}, + error::KafkaResult, +}; +use sonic_rs::{JsonContainerTrait, JsonValueTrait}; + +use crate::{ + common::{column_pruning::ExecuteWithColumnPruning, execution_context::ExecutionContext}, + flink::serde::{flink_deserializer::FlinkDeserializer, pb_deserializer::PbDeserializer}, + rdkafka::Message, +}; + +struct CustomContext; + +impl ClientContext for CustomContext {} + +impl ConsumerContext for CustomContext { + fn pre_rebalance(&self, rebalance: &Rebalance) { + log::info!("Kafka Pre re-balance {rebalance:?}"); + } + + fn post_rebalance(&self, rebalance: &Rebalance) { + log::info!("Kafka Post re-balance {rebalance:?}"); + } + + fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) { + log::info!("Kafka Committing offsets: {result:?}"); + } +} + +#[derive(Debug, Clone)] +pub struct KafkaScanExec { + kafka_topic: String, + kafka_properties_json: String, + schema: SchemaRef, + batch_size: i32, + startup_mode: i32, + auron_operator_id: String, + data_format: i32, + format_config_json: String, + metrics: ExecutionPlanMetricsSet, + props: OnceCell, +} + +impl KafkaScanExec { + pub fn new( + kafka_topic: String, + kafka_properties_json: String, + schema: SchemaRef, + batch_size: i32, + startup_mode: i32, + auron_operator_id: String, + data_format: i32, + format_config_json: String, + ) -> Self { + Self { + kafka_topic, + kafka_properties_json, + schema, + batch_size, + startup_mode, + auron_operator_id, + data_format, + format_config_json, + metrics: ExecutionPlanMetricsSet::new(), + props: OnceCell::new(), + } + } + + fn execute_with_ctx( + &self, + exec_ctx: Arc, + ) -> Result { + let serialized_pb_stream = read_serialized_records_from_kafka( + exec_ctx.clone(), + self.kafka_topic.clone(), + self.kafka_properties_json.clone(), + self.batch_size as usize, + self.startup_mode, + self.auron_operator_id.clone(), + self.data_format, + self.format_config_json.clone(), + )?; + + let deserialized_pb_stream = parse_records( + exec_ctx.output_schema(), + exec_ctx.clone(), + serialized_pb_stream, + self.format_config_json.clone(), + )?; + Ok(deserialized_pb_stream) + } +} + +impl DisplayAs for KafkaScanExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + write!(f, "KafkaScanExec") + } +} + +impl ExecutionPlan for KafkaScanExec { + fn name(&self) -> &str { + "KafkaScanExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn properties(&self) -> &PlanProperties { + self.props.get_or_init(|| { + PlanProperties::new( + EquivalenceProperties::new(self.schema()), + UnknownPartitioning(1), + EmissionType::Both, + Boundedness::Unbounded { + requires_infinite_memory: false, + }, + ) + }) + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + Ok(Arc::new(Self::new( + self.kafka_topic.clone(), + self.kafka_properties_json.clone(), + self.schema.clone(), + self.batch_size, + self.startup_mode, + self.auron_operator_id.clone(), + self.data_format, + self.format_config_json.clone(), + ))) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics); + self.execute_with_ctx(exec_ctx) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn statistics(&self) -> Result { + todo!() + } +} + +impl ExecuteWithColumnPruning for KafkaScanExec { + fn execute_projected( + &self, + partition: usize, + context: Arc, + projection: &[usize], + ) -> Result { + let projected_schema = Arc::new(self.schema().project(projection)?); + let exec_ctx = + ExecutionContext::new(context, partition, projected_schema.clone(), &self.metrics); + self.execute_with_ctx(exec_ctx) + } +} + +// A type alias with your custom consumer can be created for convenience. +type LoggingConsumer = StreamConsumer; + +fn read_serialized_records_from_kafka( + exec_ctx: Arc, + kafka_topic: String, + kafka_properties_json: String, + batch_size: usize, + startup_mode: i32, + auron_operator_id: String, + data_format: i32, + format_config_json: String, +) -> Result { + let context = CustomContext; + // get source json string from jni bridge resource + let resource_id = jni_new_string!(&auron_operator_id)?; + let kafka_task_json_java = + jni_call_static!(JniBridge.getResource(resource_id.as_obj()) -> JObject)?; + let kafka_task_json = jni_get_string!(kafka_task_json_java.as_obj().into()) + .expect("kafka_task_json_java is not valid java string"); + let task_json = sonic_rs::from_str::(&kafka_task_json) + .expect("source_json_str is not valid json"); + let num_readers = task_json + .get("num_readers") + .as_i64() + .expect("num_readers is not valid json") as i32; + let subtask_index = task_json + .get("subtask_index") + .as_i64() + .expect("subtask_index is not valid json") as i32; + let kafka_properties = sonic_rs::from_str::(&kafka_properties_json) + .expect("kafka_properties_json is not valid json"); + let mut config = ClientConfig::new(); + config.set_log_level(RDKafkaLogLevel::Info); + kafka_properties + .into_object() + .expect("kafka_properties is not valid json") + .iter_mut() + .for_each(|(key, value)| { + config.set( + key, + value + .as_str() + .expect("kafka property value is not valid json string"), + ); + }); + + let consumer: Arc = Arc::new( + config + .create_with_context(context) + .expect("Kafka Consumer creation failed"), + ); + let metadata = consumer + .fetch_metadata(Some(&kafka_topic), Some(std::time::Duration::from_secs(5))) + .expect("Failed to fetch kafka metadata"); + + // get topic metadata + let topic_metadata = metadata + .topics() + .iter() + .find(|t| t.name() == kafka_topic) + .expect("Topic not found"); + + // get partition metadata + let partitions: Vec = topic_metadata + .partitions() + .iter() + .filter(|p| { + flink_kafka_partition_assign(kafka_topic.clone(), p.id(), num_readers) + .expect("flink_kafka_partition_assign failed") + == subtask_index + }) + .map(|p| p.id()) + .collect(); + + if partitions.is_empty() { + return Err(DataFusionError::Execution(format!( + "No partitions found for topic: {kafka_topic}" + ))); + } + + // GROUP_OFFSET = 0; + // EARLIEST = 1; + // LATEST = 2; + // TIMESTAMP = 3; + let offset = match startup_mode { + 0 => Offset::Stored, + 1 => Offset::Beginning, + 2 => Offset::End, + _ => { + return Err(DataFusionError::Execution(format!( + "Invalid startup mode: {startup_mode}" + ))); + } + }; + + log::info!("Subtask {subtask_index} consumed partitions {partitions:?}"); + let mut partition_list = TopicPartitionList::with_capacity(partitions.len()); + for partition in partitions.iter() { + partition_list.add_partition_offset(&kafka_topic, *partition, offset); + } + consumer + .assign(&partition_list) + .expect("Can't assign partitions to consumer"); + + let output_schema = Arc::new(Schema::new(vec![ + Field::new("serialized_kafka_records_partition", DataType::Int32, false), + Field::new("serialized_kafka_records_offset", DataType::Int64, false), + Field::new("serialized_kafka_records_timestamp", DataType::Int64, false), + Field::new("serialized_pb_records", DataType::Binary, false), + ])); + + Ok(exec_ctx + .with_new_output_schema(output_schema.clone()) + .output_with_sender("KafkaScanExec.KafkaConsumer", move |sender| async move { + let mut serialized_kafka_records_partition_builder = Int32Builder::with_capacity(0); + let mut serialized_kafka_records_offset_builder = Int64Builder::with_capacity(0); + let mut serialized_kafka_records_timestamp_builder = Int64Builder::with_capacity(0); + let mut serialized_pb_records_builder = BinaryBuilder::with_capacity(batch_size, 0); + loop { + while serialized_pb_records_builder.len() < batch_size { + match consumer.recv().await { + Err(e) => log::warn!("Kafka error: {e}"), + Ok(msg) => { + if let Some(payload) = msg.payload() { + serialized_kafka_records_partition_builder + .append_value(msg.partition()); + serialized_kafka_records_offset_builder.append_value(msg.offset()); + serialized_kafka_records_timestamp_builder + .append_option(msg.timestamp().to_millis()); + serialized_pb_records_builder.append_value(payload); + } + } + } + } + let batch = RecordBatch::try_new( + output_schema.clone(), + vec![ + Arc::new(serialized_kafka_records_partition_builder.finish()), + Arc::new(serialized_kafka_records_offset_builder.finish()), + Arc::new(serialized_kafka_records_timestamp_builder.finish()), + Arc::new(serialized_pb_records_builder.finish()), + ], + )?; + sender.send(batch).await; + } + })) +} + +fn parse_records( + schema: SchemaRef, + exec_ctx: Arc, + mut input_stream: SendableRecordBatchStream, + parser_config_json: String, +) -> Result { + let parser_config = sonic_rs::from_str::(&parser_config_json) + .expect("parser_config_json is not valid json"); + let pb_desc_file = parser_config + .get("pb_desc_file") + .and_then(|v| v.as_str()) + .expect("pb_desc_file is not valid string") + .to_string(); + let root_message_name = parser_config + .get("root_message_name") + .and_then(|v| v.as_str()) + .expect("root_message_name is not valid string") + .to_string(); + let skip_fields = parser_config + .get("skip_fields") + .and_then(|v| v.as_str()) + .expect("skip_fields is not valid string") + .to_string(); + let nested_col_mapping_json = parser_config + .get("nested_col_mapping") + .expect("nested_col_mapping is not valid json"); + let mut nested_msg_mapping: HashMap = HashMap::new(); + if let Some(obj) = nested_col_mapping_json.as_object() { + for (key, value) in obj { + if let Some(val_str) = value.as_str() { + nested_msg_mapping.insert(key.to_string(), val_str.to_string()); + } + } + } + + let local_pb_desc_file = env::var("PWD").expect("PWD env var is not set") + "/" + &pb_desc_file; + log::info!("load desc from {local_pb_desc_file}"); + let file_descriptor_bytes = fs::read(local_pb_desc_file).expect("Failed to read file"); + let skip_fields_vec: Vec = skip_fields + .split(",") + .map(|s| s.to_string()) + .collect::>(); + Ok(exec_ctx.clone().output_with_sender( + "KafkaScanExec.ParseRecords", + move |sender| async move { + // TODO: json parser + let mut parser: Box = Box::new(PbDeserializer::new( + &file_descriptor_bytes, + &root_message_name, + schema.clone(), + &nested_msg_mapping, + &skip_fields_vec, + )?); + while let Some(batch) = input_stream.next().await.transpose()? { + let kafka_partition = batch + .column(0) + .as_any() + .downcast_ref::() + .expect("input must be Int32Array"); + let kafka_offset = batch + .column(1) + .as_any() + .downcast_ref::() + .expect("input must be Int64Array"); + let kafka_timestamp = batch + .column(2) + .as_any() + .downcast_ref::() + .expect("input must be Int64Array"); + let records = batch + .column(3) + .as_any() + .downcast_ref::() + .expect("input must be BinaryArray"); + let output_batch = parser.parse_messages_with_kafka_meta( + &records, + &kafka_partition, + &kafka_offset, + &kafka_timestamp, + )?; + sender.send(output_batch).await; + } + #[allow(unreachable_code)] + Ok(()) + }, + )) +} + +fn java_string_hashcode(s: &str) -> i32 { + let mut hash: i32 = 0; + for c in s.chars() { + let mut buf = [0; 2]; + let encoded = c.encode_utf16(&mut buf); + for code_unit in encoded.iter().cloned() { + hash = hash.wrapping_mul(31).wrapping_add(code_unit as i32); + } + } + hash +} + +fn flink_kafka_partition_assign(topic: String, partition_id: i32, num_readers: i32) -> Result { + if num_readers <= 0 { + return Err(DataFusionError::Execution(format!( + "num_readers must be positive: {num_readers}" + ))); + } + // Java hashcode + let hash_code = java_string_hashcode(&topic); + let start_index = (hash_code.wrapping_mul(31) & i32::MAX) % num_readers; + Ok((start_index + partition_id).rem_euclid(num_readers)) +} + +#[test] +fn test_flink_kafka_partition_assign() { + let topic = "flink_test_topic".to_string(); + let partition_id = 0; + let num_readers = 1000; + // the result same with flink + let result = flink_kafka_partition_assign(topic, partition_id, num_readers); + assert_eq!(result.expect("Error assigning partition"), 471); +} diff --git a/native-engine/datafusion-ext-plans/src/flink/mod.rs b/native-engine/datafusion-ext-plans/src/flink/mod.rs index fb72d4058..359b29f5c 100644 --- a/native-engine/datafusion-ext-plans/src/flink/mod.rs +++ b/native-engine/datafusion-ext-plans/src/flink/mod.rs @@ -13,4 +13,5 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod kafka_scan_exec; pub mod serde; diff --git a/native-engine/datafusion-ext-plans/src/flink/serde/flink_deserializer.rs b/native-engine/datafusion-ext-plans/src/flink/serde/flink_deserializer.rs index 6d245ff23..3b8bcfde1 100644 --- a/native-engine/datafusion-ext-plans/src/flink/serde/flink_deserializer.rs +++ b/native-engine/datafusion-ext-plans/src/flink/serde/flink_deserializer.rs @@ -17,7 +17,7 @@ use arrow::array::{BinaryArray, Int32Array, Int64Array, RecordBatch}; /// FlinkDeserializer is used to deserialize messages from kafka. /// Supports Protobuf, JSON, etc. -pub trait FlinkDeserializer { +pub trait FlinkDeserializer: Send { /// Parse messages from kafka, including kafka metadata such as partitions, /// offsets, and timestamps. fn parse_messages_with_kafka_meta( diff --git a/native-engine/datafusion-ext-plans/src/lib.rs b/native-engine/datafusion-ext-plans/src/lib.rs index d114c27aa..c6339e146 100644 --- a/native-engine/datafusion-ext-plans/src/lib.rs +++ b/native-engine/datafusion-ext-plans/src/lib.rs @@ -28,6 +28,8 @@ extern crate datafusion; extern crate datafusion_ext_commons; extern crate prost; extern crate prost_reflect; +extern crate rdkafka; +extern crate sonic_rs; // execution plan implementations pub mod agg;