A high-throughput log pipeline built on Rust + C++ (librdkafka). Kafka messages are consumed at native speed via a C++ bridge, filtered in parallel by Rust workers, and written to Elasticsearch — with a built-in control plane for managing pipelines, workers, and runtime metrics.
In local benchmarks (each payload size tested separately), end-to-end Kafka consumer throughput on large messages exceeded 2 GB/s while filter matching remained correct.
```
Kafka ──► C++ librdkafka bridge (buffer pool)
              │ FFI
              ▼
      Rust ingress threads
              │
       ┌──────┴──────┐
       │ worker mode │ bounded queue → N parallel workers (memmem filter)
       │ direct mode │ ingress thread filters directly (A/B baseline)
       └──────┬──────┘
              │ matched only
              ▼
      Elasticsearch sink
  (memory queue → spill to disk when full)
```
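The worker-mode hand-off above can be sketched in a few lines. This is a simplified stand-in, not the project's actual source: std's bounded `sync_channel` plays the role of the real queue, a naive byte scan stands in for `memmem`, and a single worker stands in for the configured pool.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Naive stand-in for memmem: byte-level substring search.
fn contains(haystack: &[u8], needle: &[u8]) -> bool {
    haystack.windows(needle.len()).any(|w| w == needle)
}

fn main() {
    // Bounded queue: senders block when full, giving backpressure.
    let (tx, rx) = sync_channel::<Vec<u8>>(4096);

    // Single worker here; the real pipeline spawns RUST_WORKERS of these.
    let worker = thread::spawn(move || {
        let mut matched = 0usize;
        for msg in rx {
            if contains(&msg, b"ERROR") {
                matched += 1; // the real pipeline forwards matches to the ES sink
            }
        }
        matched
    });

    // Ingress stand-in: push a few fake Kafka payloads.
    for line in ["INFO ok", "ERROR disk full", "WARN slow", "ERROR oom"] {
        tx.send(line.as_bytes().to_vec()).unwrap();
    }
    drop(tx); // close the queue so the worker drains and exits

    println!("matched={}", worker.join().unwrap()); // matched=2
}
```

Direct mode skips the queue entirely and runs `contains` on the ingress thread, which is what the A/B baseline compares against.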
- C++ layer — `librdkafka` consumer + fixed buffer pool; prints pool stats every second
- Rust layer — ingress threads, parallel worker pool, ES bulk writer, spill/replay
- Control plane — REST API (`control_plane` binary) backed by SQLite; manages pipeline configs, workers, and metrics
- Sync worker — sidecar process that polls the control plane and manages the sync subprocess lifecycle
- Rust 1.70+
- CMake 3.15+
- librdkafka (`brew install librdkafka` on macOS)
- Docker + Docker Compose (for local Kafka)
```bash
# Build C++ bridge library
cmake -S . -B build
cmake --build build -j

# Build Rust binaries
cargo build --release
```

1. Start Kafka

```bash
docker compose up -d
```

2. Run the pipeline
```bash
RUN_SECONDS=20 \
BRIDGE_LIB=build/libkafka_bridge.dylib \
KAFKA_BROKERS=127.0.0.1:29092 \
KAFKA_TOPIC=test \
KAFKA_GROUP_ID=rust-main-g1 \
POOL_SIZE=256 \
PIPELINE_MODE=worker \
RUST_WORKERS=8 \
MATCH_PATTERN=ERROR \
cargo run --release
```

3. Write matched messages to Elasticsearch
```bash
ES_ENABLED=1 \
ES_URL=http://localhost:9200 \
ES_INDEX=logs-filtered \
ES_BULK_ACTIONS=5000 \
ES_BULK_BYTES=16777216 \
ES_BULK_FLUSH_MS=100 \
cargo run --release
```

```bash
# Start control plane
mkdir -p data
CONTROL_LISTEN=0.0.0.0:8088 \
CONTROL_DB_PATH=data/control.db \
cargo run --bin control_plane
```
```bash
# Start sync worker
CONTROL_PLANE_URL=http://127.0.0.1:8088 \
WORKER_ID=worker-1 \
SYNC_BIN_PATH=target/release/ffi_pool_monitor \
cargo run --bin sync_worker
```

```bash
# Health check
curl http://127.0.0.1:8088/health
```
```bash
# Create a pipeline
curl -X POST http://127.0.0.1:8088/api/pipelines \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "topic-test-to-es",
    "kafka_brokers": "127.0.0.1:29092",
    "kafka_topic": "test",
    "kafka_group_id": "rust-main-g1",
    "es_url": "http://localhost:9200",
    "es_index": "app-json-generic",
    "es_pipeline": "parse_message_json_generic",
    "match_pattern": "ERROR",
    "enabled": true
  }'

# List / start / stop pipelines
curl http://127.0.0.1:8088/api/pipelines
curl -X POST http://127.0.0.1:8088/api/pipelines/1/start
curl -X POST http://127.0.0.1:8088/api/pipelines/1/stop

# Runtime metrics
curl http://127.0.0.1:8088/api/metrics/runtime
curl "http://127.0.0.1:8088/api/metrics/runtime/history?seconds=300"
curl "http://127.0.0.1:8088/api/metrics/es?pipeline_id=1"

# Worker management
curl http://127.0.0.1:8088/api/workers
curl -X PUT http://127.0.0.1:8088/api/workers/worker-1/assignment \
  -H 'Content-Type: application/json' \
  -d '{"pipeline_id": 1, "desired_state": "running"}'
```

```
src/
  main.rs              # Entry point; routes by PRODUCT_LINE
  query/mod.rs         # Query pipeline (Kafka → filter → ES)
  metrics/mod.rs       # Metrics pipeline (stub)
  bin/control_plane.rs # Control plane API server
  bin/sync_worker.rs   # Sync worker sidecar
  control/db.rs        # SQLite config store
  control/es.rs        # ES metrics collector
```
| Variable | Default | Description |
|---|---|---|
| `PIPELINE_MODE` | `worker` | `worker` (parallel) or `direct` (single-thread baseline) |
| `RUST_WORKERS` | `8` | Number of parallel filter workers |
| `INGRESS_THREADS` | `2` | Kafka ingress thread count |
| `BATCH_PULL_SIZE` | `128` | Messages pulled per ingress batch |
| `TASK_QUEUE_CAP` | `4096` | Bounded queue capacity |
| `MATCH_PATTERN` | `ERROR` | Byte pattern filter (memmem) |
| `RUN_SECONDS` | `20` | Run duration (0 = run forever) |
| `BRIDGE_LIB` | `build/libkafka_bridge.dylib` | C++ bridge shared library path |
| `PRODUCT_LINE` | `query` | Pipeline type (`query` or `metrics`) |
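All of these knobs are plain environment variables with defaults. A minimal sketch of the read-with-fallback pattern (the helper names `env_or`/`env_usize` are illustrative, not the actual source):

```rust
use std::env;

// Read a string env var, falling back to the documented default.
fn env_or(key: &str, default: &str) -> String {
    env::var(key).unwrap_or_else(|_| default.to_string())
}

// Read a numeric env var; unset or unparsable values fall back too.
fn env_usize(key: &str, default: usize) -> usize {
    env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}

fn main() {
    let mode = env_or("PIPELINE_MODE", "worker");
    let workers = env_usize("RUST_WORKERS", 8);
    let pattern = env_or("MATCH_PATTERN", "ERROR");
    println!("mode={mode} workers={workers} pattern={pattern}");
}
```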
| Variable | Default | Description |
|---|---|---|
| `ES_ENABLED` | `0` | Set to `1` to enable ES writes |
| `ES_URL` | `http://localhost:9200` | Elasticsearch endpoint |
| `ES_INDEX` | `logs-filtered` | Target index |
| `ES_PIPELINE` | (empty) | Ingest pipeline name |
| `ES_BULK_ACTIONS` | `5000` | Max docs per bulk request |
| `ES_BULK_BYTES` | `16777216` | Max bytes per bulk request (16 MiB) |
| `ES_BULK_FLUSH_MS` | `100` | Bulk flush interval (ms) |
| `ES_QUEUE_CAP` | `200000` | In-memory queue capacity |
| `ES_DROP_WHEN_FULL` | `0` | `0` = backpressure, `1` = drop on full |
| `ES_SPILL_PATH` | `data/es_spill.bin` | Spill file path |
| `ES_SPILL_MAX_BYTES` | `4294967296` | Max spill file size (4 GiB) |
| `ES_TIMEOUT_MS` | `3000` | ES request timeout (ms) |
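The three `ES_BULK_*` knobs interact: a bulk request is sent as soon as any one of the action count, byte size, or flush timer trips. A sketch of that trigger logic under those assumptions (not the actual writer implementation):

```rust
use std::time::{Duration, Instant};

// Flush policy mirroring the three ES_BULK_* knobs above.
struct BulkPolicy {
    max_actions: usize,    // ES_BULK_ACTIONS
    max_bytes: usize,      // ES_BULK_BYTES
    flush_every: Duration, // ES_BULK_FLUSH_MS
}

impl BulkPolicy {
    // A bulk request fires when ANY of the three limits is reached.
    fn should_flush(&self, actions: usize, bytes: usize, last_flush: Instant) -> bool {
        actions >= self.max_actions
            || bytes >= self.max_bytes
            || last_flush.elapsed() >= self.flush_every
    }
}

fn main() {
    let policy = BulkPolicy {
        max_actions: 5000,
        max_bytes: 16 * 1024 * 1024,
        flush_every: Duration::from_millis(100),
    };
    // 5000 docs hits the action limit even with few bytes and no time elapsed.
    println!("{}", policy.should_flush(5000, 1024, Instant::now())); // true
}
```

Low-volume topics are flushed by the timer; high-volume topics are flushed by the action or byte limits, whichever fills first.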
| Variable | Default | Description |
|---|---|---|
| `KAFKA_BROKERS` | `127.0.0.1:29092` | Kafka broker list |
| `KAFKA_TOPIC` | `test` | Topic to consume |
| `KAFKA_GROUP_ID` | `rust-main-g1` | Consumer group ID |
| `POOL_SIZE` | `256` | Fixed buffer pool size |
| `BUFFER_BYTES` | `1048576` | Per-buffer size (1 MiB) |
| `BATCH_BYTES` | `262144` | Target bytes per pull batch |
| `BATCH_MSGS` | `256` | Max messages per pull batch |
| `OVERFLOW_DROP_NEWEST` | `0` | `0` = backpressure, `1` = drop newest |
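The `OVERFLOW_DROP_NEWEST` (and `ES_DROP_WHEN_FULL`) semantics come down to blocking send versus non-blocking try-send on a bounded queue. A sketch with std's `sync_channel` (illustrative only, the real queue type may differ):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    // Capacity-2 queue standing in for the bounded ingress queue.
    let (tx, rx) = sync_channel::<u32>(2);

    let mut dropped = 0;
    for msg in 0..5 {
        // Drop-newest mode: try_send never blocks; when the queue is full
        // the incoming message is discarded. Backpressure mode would call
        // tx.send(msg) instead and block the producer until space frees up.
        match tx.try_send(msg) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) => dropped += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    drop(tx);
    let kept: Vec<u32> = rx.iter().collect();
    println!("kept={:?} dropped={}", kept, dropped); // kept=[0, 1] dropped=3
}
```

Drop-newest keeps the consumer at full speed at the cost of losing data under overload; backpressure loses nothing but lets queue pressure propagate back to the Kafka fetch.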
| Variable | Default | Description |
|---|---|---|
| `CONTROL_LISTEN` | `0.0.0.0:8088` | Listen address |
| `CONTROL_DB_PATH` | `data/control.db` | SQLite database path |
| `RUNTIME_METRICS_PATH` | `data/runtime_metrics.json` | Metrics snapshot path |
| `WORKER_ID` | (required) | Unique worker ID |
| `WORKER_POLL_MS` | `1000` | Poll interval (ms) |
```bash
# Produce test data
NUM_RECORDS=150000 RECORD_SIZE=1024 THROUGHPUT=-1 \
TOPIC=test BROKER=localhost:29092 \
./scripts/produce_test_topic.sh

# C++ consumer baseline
KAFKA_TOPIC=test KAFKA_GROUP_ID=cpp_bench_1 \
BENCH_PROFILE=aggressive RUN_SECONDS=25 \
./build/bridge_consumer_bench

# Rust + C++ bridge in Docker
docker build -f Dockerfile.rustbridge -t rustbridge:latest .
docker run --rm --network kafka_default \
  -e KAFKA_BROKERS=kafka:9092 \
  -e KAFKA_TOPIC=test_500 \
  -e PIPELINE_MODE=worker \
  -e RUST_WORKERS=8 \
  rustbridge:latest
```

C++ bridge — per-second pool stats:

```
free=240 ready=4 in_flight=12 total=256 | kafka=85000 msg/s 82.3 MiB/s
```

Rust pipeline — per-second counters:

```
polled=85000 msg/s 82.3 MiB/s | matched=12 | drop_filter=84988
```
Key metrics to watch during ES backpressure:

- `es_spill_queue_bytes` — should trend down during replay
- `es_mem_queue_len` — should not grow unbounded
- `es_429_proxy_rejected_total` — ES rejecting writes under load
If you see "filter rate = 0 but ES write rate is high", the pipeline is replaying from the spill file after a backpressure episode — this is expected behavior.
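Spill and replay reduce to appending length-prefixed records to a file and reading them back in order once ES recovers. A simplified sketch under that assumption (the real on-disk format of `data/es_spill.bin` is not documented here; an in-memory buffer stands in for the file):

```rust
use std::io::{Cursor, Read, Write};

// Append one length-prefixed record to the spill target.
fn spill(out: &mut impl Write, record: &[u8]) -> std::io::Result<()> {
    out.write_all(&(record.len() as u32).to_le_bytes())?;
    out.write_all(record)
}

// Replay all spilled records in write order.
fn replay(input: &mut impl Read) -> std::io::Result<Vec<Vec<u8>>> {
    let mut records = Vec::new();
    let mut len_buf = [0u8; 4];
    loop {
        match input.read_exact(&mut len_buf) {
            Ok(()) => {}
            // Clean EOF between records means the spill is fully drained.
            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e),
        }
        let len = u32::from_le_bytes(len_buf) as usize;
        let mut rec = vec![0u8; len];
        input.read_exact(&mut rec)?;
        records.push(rec);
    }
    Ok(records)
}

fn main() -> std::io::Result<()> {
    let mut file = Vec::new(); // stands in for the spill file
    spill(&mut file, b"ERROR disk full")?;
    spill(&mut file, b"ERROR oom")?;
    let records = replay(&mut Cursor::new(file))?;
    println!("replayed {} records", records.len()); // replayed 2 records
    Ok(())
}
```

During replay the filter counters stay at zero because records bypass the filter stage entirely, which is exactly the "filter rate = 0, ES write rate high" signature described above.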
- Deploy consumer nodes in the same VPC/AZ as Kafka to avoid virtualization network overhead
- Set Kafka topic retention to at least 3 days
- Commit offsets only after successful processing (at-least-once delivery)
- Run one worker process per pipeline; use multiple processes for multi-topic setups
MIT