Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ tonic-build = "0.13.1"
transpose = "0.2.3"
unchecked-index = "0.2.2"
zstd = "0.13.3"
rdkafka = { version = "0.36.0", features = ["tokio"] }

[patch.crates-io]
# datafusion: branch=v49.0.0-blaze
Expand Down
24 changes: 24 additions & 0 deletions native-engine/auron-planner/proto/auron.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ message PhysicalPlanNode {
GenerateExecNode generate = 23;
ParquetSinkExecNode parquet_sink = 24;
OrcScanExecNode orc_scan = 25;
KafkaScanExecNode kafka_scan = 26;
}
}

Expand Down Expand Up @@ -744,6 +745,29 @@ message ExpandProjection {
repeated PhysicalExprNode expr = 1;

}

message KafkaScanExecNode {
string kafka_topic = 1;
string kafka_properties_json = 2;
Schema schema = 3;
int32 batch_size = 4;
KafkaStartupMode startup_mode = 5;
string auron_operator_id = 6;
KafkaFormat data_format = 7;
string format_config_json = 8;
}

enum KafkaFormat {
JSON = 0;
PROTOBUF = 1;
}

enum KafkaStartupMode {
GROUP_OFFSET = 0;
EARLIEST = 1;
LATEST = 2;
TIMESTAMP = 3;
}
///////////////////////////////////////////////////////////////////////////////////////////////////
// Task related
///////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
14 changes: 14 additions & 0 deletions native-engine/auron-planner/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ use datafusion_ext_plans::{
expand_exec::ExpandExec,
ffi_reader_exec::FFIReaderExec,
filter_exec::FilterExec,
flink::kafka_scan_exec::KafkaScanExec,
generate::{create_generator, create_udtf_generator},
generate_exec::GenerateExec,
ipc_reader_exec::IpcReaderExec,
Expand Down Expand Up @@ -801,6 +802,19 @@ impl PhysicalPlanner {
props,
)))
}
PhysicalPlanType::KafkaScan(kafka_scan) => {
let schema = Arc::new(convert_required!(kafka_scan.schema)?);
Ok(Arc::new(KafkaScanExec::new(
kafka_scan.kafka_topic.clone(),
kafka_scan.kafka_properties_json.clone(),
schema,
kafka_scan.batch_size as i32,
kafka_scan.startup_mode,
kafka_scan.auron_operator_id.clone(),
kafka_scan.data_format,
kafka_scan.format_config_json.clone(),
)))
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions native-engine/datafusion-ext-plans/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ unchecked-index = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
prost-reflect = { workspace = true }
rdkafka = { workspace = true }
sonic-rs = { workspace = true }

[target.'cfg(target_os = "linux")'.dependencies]
procfs = { workspace = true }
Expand Down
Loading
Loading