
kill_fake_logs


A high-throughput log pipeline built on Rust + C++ (librdkafka). Kafka messages are consumed at native speed via a C++ bridge, filtered in parallel by Rust workers, and written to Elasticsearch — with a built-in control plane for managing pipelines, workers, and runtime metrics.

In local benchmarks, run separately for each payload size with large messages, end-to-end Kafka consumer throughput exceeded 2 GB/s while filter matching remained correct.

Architecture

Kafka ──► C++ librdkafka bridge (buffer pool)
               │  FFI
               ▼
         Rust ingress threads
               │
        ┌──────┴───────┐
        │ worker mode  │  bounded queue → N parallel workers (memmem filter)
        │ direct mode  │  ingress thread filters directly (A/B baseline)
        └──────┬───────┘
               │  matched only
               ▼
        Elasticsearch sink
        (memory queue → spill to disk when full)
  • C++ layer — librdkafka consumer + fixed buffer pool; prints pool stats every second
  • Rust layer — ingress threads, parallel worker pool, ES bulk writer, spill/replay
  • Control plane — REST API (control_plane binary) backed by SQLite; manages pipeline configs, workers, and metrics
  • Sync worker — sidecar process that polls the control plane and manages the sync subprocess lifecycle
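The worker-mode hot path above can be sketched with the standard library alone. Round-robin over per-worker bounded channels stands in for the shared bounded queue, and a naive `contains_pattern` stands in for the accelerated memmem search; the real pipeline receives buffers over FFI rather than owning `Vec<u8>`s:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Naive stand-in for the memmem byte-pattern filter
/// (the real pipeline uses an accelerated substring search).
fn contains_pattern(msg: &[u8], pat: &[u8]) -> bool {
    !pat.is_empty() && msg.windows(pat.len()).any(|w| w == pat)
}

/// Worker mode: bounded queues feed N parallel filter workers;
/// only matched messages would flow on toward the ES sink.
fn run_workers(messages: Vec<Vec<u8>>, pattern: &'static [u8], n_workers: usize) -> usize {
    let mut txs = Vec::new();
    let mut handles = Vec::new();
    for _ in 0..n_workers {
        // Each worker gets a share of TASK_QUEUE_CAP (4096 by default).
        let (tx, rx) = sync_channel::<Vec<u8>>(4096 / n_workers);
        txs.push(tx);
        handles.push(thread::spawn(move || {
            rx.iter().filter(|m| contains_pattern(m, pattern)).count()
        }));
    }
    for (i, msg) in messages.into_iter().enumerate() {
        txs[i % n_workers].send(msg).unwrap(); // blocks when full (backpressure)
    }
    drop(txs); // close the queues so workers drain and exit
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    let msgs = vec![
        b"INFO ok".to_vec(),
        b"ERROR disk full".to_vec(),
        b"WARN slow".to_vec(),
        b"ERROR oom".to_vec(),
    ];
    println!("matched={}", run_workers(msgs, b"ERROR", 4)); // matched=2
}
```

Direct mode skips the channels entirely and runs the same filter on the ingress thread, which is what makes the A/B comparison meaningful.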

Requirements

  • Rust 1.70+
  • CMake 3.15+
  • librdkafka (brew install librdkafka on macOS)
  • Docker + Docker Compose (for local Kafka)

Build

# Build C++ bridge library
cmake -S . -B build
cmake --build build -j

# Build Rust binaries
cargo build --release

Quick Start

1. Start Kafka

docker compose up -d

2. Run the pipeline

RUN_SECONDS=20 \
BRIDGE_LIB=build/libkafka_bridge.dylib \
KAFKA_BROKERS=127.0.0.1:29092 \
KAFKA_TOPIC=test \
KAFKA_GROUP_ID=rust-main-g1 \
POOL_SIZE=256 \
PIPELINE_MODE=worker \
RUST_WORKERS=8 \
MATCH_PATTERN=ERROR \
cargo run --release

3. Write matched messages to Elasticsearch

ES_ENABLED=1 \
ES_URL=http://localhost:9200 \
ES_INDEX=logs-filtered \
ES_BULK_ACTIONS=5000 \
ES_BULK_BYTES=16777216 \
ES_BULK_FLUSH_MS=100 \
cargo run --release

Control Plane

# Start control plane
mkdir -p data
CONTROL_LISTEN=0.0.0.0:8088 \
CONTROL_DB_PATH=data/control.db \
cargo run --bin control_plane

# Start sync worker
CONTROL_PLANE_URL=http://127.0.0.1:8088 \
WORKER_ID=worker-1 \
SYNC_BIN_PATH=target/release/ffi_pool_monitor \
cargo run --bin sync_worker

Key API Endpoints

# Health check
curl http://127.0.0.1:8088/health

# Create a pipeline
curl -X POST http://127.0.0.1:8088/api/pipelines \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "topic-test-to-es",
    "kafka_brokers": "127.0.0.1:29092",
    "kafka_topic": "test",
    "kafka_group_id": "rust-main-g1",
    "es_url": "http://localhost:9200",
    "es_index": "app-json-generic",
    "es_pipeline": "parse_message_json_generic",
    "match_pattern": "ERROR",
    "enabled": true
  }'

# List / start / stop pipelines
curl http://127.0.0.1:8088/api/pipelines
curl -X POST http://127.0.0.1:8088/api/pipelines/1/start
curl -X POST http://127.0.0.1:8088/api/pipelines/1/stop

# Runtime metrics
curl http://127.0.0.1:8088/api/metrics/runtime
curl "http://127.0.0.1:8088/api/metrics/runtime/history?seconds=300"
curl "http://127.0.0.1:8088/api/metrics/es?pipeline_id=1"

# Worker management
curl http://127.0.0.1:8088/api/workers
curl -X PUT http://127.0.0.1:8088/api/workers/worker-1/assignment \
  -H 'Content-Type: application/json' \
  -d '{"pipeline_id": 1, "desired_state": "running"}'

Directory Structure

src/
  main.rs                  # Entry point; routes by PRODUCT_LINE
  query/mod.rs             # Query pipeline (Kafka → filter → ES)
  metrics/mod.rs           # Metrics pipeline (stub)
  bin/control_plane.rs     # Control plane API server
  bin/sync_worker.rs       # Sync worker sidecar
  control/db.rs            # SQLite config store
  control/es.rs            # ES metrics collector

Environment Variables

Rust / Pipeline

Variable Default Description
PIPELINE_MODE worker worker (parallel) or direct (single-thread baseline)
RUST_WORKERS 8 Number of parallel filter workers
INGRESS_THREADS 2 Kafka ingress thread count
BATCH_PULL_SIZE 128 Messages pulled per ingress batch
TASK_QUEUE_CAP 4096 Bounded queue capacity
MATCH_PATTERN ERROR Byte pattern filter (memmem)
RUN_SECONDS 20 Run duration (0 = run forever)
BRIDGE_LIB build/libkafka_bridge.dylib C++ bridge shared library path
PRODUCT_LINE query Pipeline type (query or metrics)
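Each of these knobs is read from the environment and falls back to the default shown when unset. A minimal helper illustrating that pattern (this is a sketch, not the project's actual parsing code):

```rust
use std::env;
use std::str::FromStr;

/// Read an env var, falling back to a default when unset or unparsable.
fn env_or<T: FromStr>(key: &str, default: T) -> T {
    env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}

fn main() {
    let mode: String = env_or("PIPELINE_MODE", "worker".to_string());
    let workers: usize = env_or("RUST_WORKERS", 8);
    let run_seconds: u64 = env_or("RUN_SECONDS", 20);
    println!("mode={mode} workers={workers} run_seconds={run_seconds}");
}
```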

Elasticsearch

Variable Default Description
ES_ENABLED 0 Set to 1 to enable ES writes
ES_URL http://localhost:9200 Elasticsearch endpoint
ES_INDEX logs-filtered Target index
ES_PIPELINE (empty) Ingest pipeline name
ES_BULK_ACTIONS 5000 Max docs per bulk request
ES_BULK_BYTES 16777216 Max bytes per bulk request (16 MiB)
ES_BULK_FLUSH_MS 100 Bulk flush interval (ms)
ES_QUEUE_CAP 200000 In-memory queue capacity
ES_DROP_WHEN_FULL 0 0 = backpressure, 1 = drop on full
ES_SPILL_PATH data/es_spill.bin Spill file path
ES_SPILL_MAX_BYTES 4294967296 Max spill file size (4 GiB)
ES_TIMEOUT_MS 3000 ES request timeout (ms)
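The bulk writer batches documents until ES_BULK_ACTIONS or ES_BULK_BYTES is reached (or ES_BULK_FLUSH_MS elapses), then sends one `_bulk` request. The NDJSON body pairs an `index` action line with each source document; a sketch of that framing, using only the standard `_bulk` action keys:

```rust
/// Build an Elasticsearch _bulk request body: one `index` action line
/// followed by one source-document line per doc, each newline-terminated.
fn build_bulk_body(index: &str, docs: &[&str]) -> String {
    let mut body = String::new();
    for doc in docs {
        body.push_str(&format!("{{\"index\":{{\"_index\":\"{index}\"}}}}\n"));
        body.push_str(doc);
        body.push('\n');
    }
    body
}

fn main() {
    let body = build_bulk_body("logs-filtered", &[r#"{"message":"ERROR disk full"}"#]);
    // POST this to ES_URL/_bulk with Content-Type: application/x-ndjson
    print!("{body}");
}
```

Tracking `body.len()` against ES_BULK_BYTES while appending is enough to decide when a batch must flush early.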

C++ Bridge

Variable Default Description
KAFKA_BROKERS 127.0.0.1:29092 Kafka broker list
KAFKA_TOPIC test Topic to consume
KAFKA_GROUP_ID rust-main-g1 Consumer group ID
POOL_SIZE 256 Fixed buffer pool size
BUFFER_BYTES 1048576 Per-buffer size (1 MiB)
BATCH_BYTES 262144 Target bytes per pull batch
BATCH_MSGS 256 Max messages per pull batch
OVERFLOW_DROP_NEWEST 0 0 = backpressure, 1 = drop newest
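POOL_SIZE and BUFFER_BYTES describe a fixed pool the C++ bridge allocates up front: payloads are copied into free buffers, handed across FFI, and returned to the free list once Rust is done. A minimal Rust model of that accounting (the real pool lives on the C++ side; this only illustrates the free/in_flight bookkeeping the per-second stats report):

```rust
/// Fixed buffer pool: `free` buffers await Kafka payloads; checked-out
/// buffers are "in flight" with Rust until released.
struct BufferPool {
    free: Vec<Vec<u8>>,
    in_flight: usize,
    total: usize,
}

impl BufferPool {
    fn new(pool_size: usize, buffer_bytes: usize) -> Self {
        Self {
            free: (0..pool_size).map(|_| vec![0u8; buffer_bytes]).collect(),
            in_flight: 0,
            total: pool_size,
        }
    }

    /// Check out a buffer; None means the pool is exhausted
    /// (backpressure, or drop-newest when OVERFLOW_DROP_NEWEST=1).
    fn acquire(&mut self) -> Option<Vec<u8>> {
        let buf = self.free.pop()?;
        self.in_flight += 1;
        Some(buf)
    }

    /// Return a processed buffer to the free list.
    fn release(&mut self, buf: Vec<u8>) {
        self.in_flight -= 1;
        self.free.push(buf);
    }
}

fn main() {
    let mut pool = BufferPool::new(4, 1024);
    let a = pool.acquire().unwrap();
    let _b = pool.acquire().unwrap();
    println!("free={} in_flight={} total={}", pool.free.len(), pool.in_flight, pool.total);
    pool.release(a);
    println!("free={} in_flight={} total={}", pool.free.len(), pool.in_flight, pool.total);
}
```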

Control Plane & Sync Worker

Variable Default Description
CONTROL_LISTEN 0.0.0.0:8088 Listen address
CONTROL_DB_PATH data/control.db SQLite database path
RUNTIME_METRICS_PATH data/runtime_metrics.json Metrics snapshot path
WORKER_ID (required) Unique worker ID
WORKER_POLL_MS 1000 Poll interval (ms)

Benchmarking

# Produce test data
NUM_RECORDS=150000 RECORD_SIZE=1024 THROUGHPUT=-1 \
TOPIC=test BROKER=localhost:29092 \
./scripts/produce_test_topic.sh

# C++ consumer baseline
KAFKA_TOPIC=test KAFKA_GROUP_ID=cpp_bench_1 \
BENCH_PROFILE=aggressive RUN_SECONDS=25 \
./build/bridge_consumer_bench

# Rust + C++ bridge in Docker
docker build -f Dockerfile.rustbridge -t rustbridge:latest .
docker run --rm --network kafka_default \
  -e KAFKA_BROKERS=kafka:9092 \
  -e KAFKA_TOPIC=test_500 \
  -e PIPELINE_MODE=worker \
  -e RUST_WORKERS=8 \
  rustbridge:latest

Observability

C++ bridge — per-second pool stats:

free=240 ready=4 in_flight=12 total=256 | kafka=85000 msg/s 82.3 MiB/s

Rust pipeline — per-second counters:

polled=85000 msg/s 82.3 MiB/s | matched=12 | drop_filter=84988

Key metrics to watch during ES backpressure:

  • es_spill_queue_bytes — should trend down during replay
  • es_mem_queue_len — should not grow unbounded
  • es_429_proxy_rejected_total — ES rejecting writes under load

If you see "filter rate = 0 but ES write rate is high", the pipeline is replaying from the spill file after a backpressure episode — this is expected behavior.
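The spill path behind that behavior can be modeled as length-prefixed records appended to a file when the in-memory queue is full, then read back in order during replay. A sketch using an in-memory stream in place of `data/es_spill.bin` (the u32 length-prefix framing is an assumption for illustration, not the pipeline's actual on-disk format):

```rust
use std::io::{Cursor, Read, Write};

/// Append one record to the spill stream as a u32 length prefix + payload.
fn spill_write<W: Write>(w: &mut W, rec: &[u8]) -> std::io::Result<()> {
    w.write_all(&(rec.len() as u32).to_le_bytes())?;
    w.write_all(rec)
}

/// Replay all spilled records in the order they were written.
fn spill_replay<R: Read>(r: &mut R) -> std::io::Result<Vec<Vec<u8>>> {
    let mut out = Vec::new();
    let mut len_buf = [0u8; 4];
    loop {
        match r.read_exact(&mut len_buf) {
            Ok(()) => {}
            // Clean EOF between records means the spill is fully drained.
            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e),
        }
        let mut rec = vec![0u8; u32::from_le_bytes(len_buf) as usize];
        r.read_exact(&mut rec)?;
        out.push(rec);
    }
    Ok(out)
}

fn main() -> std::io::Result<()> {
    let mut spill = Vec::new(); // stand-in for the spill file
    spill_write(&mut spill, b"ERROR disk full")?;
    spill_write(&mut spill, b"ERROR oom")?;
    let replayed = spill_replay(&mut Cursor::new(spill))?;
    println!("replayed={}", replayed.len()); // replayed=2
    Ok(())
}
```

During replay, drained bytes are what `es_spill_queue_bytes` counts down, which is why that metric should trend toward zero.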

Production Notes

  • Deploy consumer nodes in the same VPC/AZ as Kafka to avoid virtualization network overhead
  • Set Kafka topic retention to at least 3 days
  • Commit offsets only after successful processing (at-least-once delivery)
  • Run one worker process per pipeline; use multiple processes for multi-topic setups

License

MIT



About

A fast and secure log filter, like Logstash but better.
