Describe the bug
When a BufferChunkOverflowError occurs, it is handled as an unexpected error, and the consumer reconnects endlessly until the BufferChunkOverflowError is resolved.
I think BufferChunkOverflowError should be handled as a BufferError and retried without reconnecting the consumer. Is this a bug?
To Reproduce
Check the fluent-plugin-kafka log after a BufferChunkOverflowError occurs.
Steps to reproduce:
- Set chunk_limit_size to 1 in the output plugin's buffer section
- Send a message to the topic
- Check the log
Expected behavior
BufferChunkOverflowError should be handled as a BufferError and retried without restarting the consumer.
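To make the expectation concrete, here is a minimal Ruby sketch of the difference I mean. It does not load Fluentd or fluent-plugin-kafka. The three error classes are local stand-ins, and their hierarchy (both overflow errors as siblings under BufferError) is my assumption about how the real classes relate. `emit_with_retry` and `outcome` are hypothetical helpers, not the plugin's methods.

```ruby
# Stand-in classes for illustration only (assumed hierarchy, not loaded from Fluentd).
class BufferError < StandardError; end
class BufferOverflowError < BufferError; end
class BufferChunkOverflowError < BufferError; end

# Hypothetical helper: runs the block and retries it when an error matching
# `retry_on` is raised, for at most `limit` attempts in total.
def emit_with_retry(retry_on:, limit: 3)
  attempts = 0
  begin
    attempts += 1
    yield attempts
    :emitted
  rescue retry_on
    retry if attempts < limit
    :gave_up
  end
end

# Hypothetical helper: simulates an emit that always fails with a chunk overflow
# and reports whether the error was retried locally or escaped to the caller.
def outcome(retry_on)
  emit_with_retry(retry_on: retry_on) do
    raise BufferChunkOverflowError, "record is larger than buffer chunk limit size"
  end
rescue StandardError => e
  "escaped: #{e.class}"
end

puts outcome(BufferOverflowError) # sibling class does not match, so the error escapes
puts outcome(BufferError)         # parent class matches, so the emit is retried in place
```

In the first case the error leaves the retry loop, which would correspond to the path where the consumer gets restarted. In the second case it stays inside the loop and the caller never sees it.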
Your Environment
- Fluentd version: 1.16.2
- TD Agent version:
- fluent-plugin-kafka version: v0.19.2
- ruby-kafka version: 1.5.0
- Operating system:
- Kernel version:
Your Configuration
<source>
@type kafka_group
consumer_group fluentd-consumer
brokers kafka-controller-0.kafka-controller-headless.kafka.svc.cluster.local:9092,kafka-controller-1.kafka-controller-headless.kafka.svc.cluster.local:9092,kafka-controller-2.kafka-controller-headless.kafka.svc.cluster.local:9092
topics test-topic
format json
offset_commit_interval 5
offset_commit_threshold 100
</source>
<match test-topic>
@type stdout
<buffer>
@type memory
retry_max_times 3
flush_mode interval
flush_interval 1s
flush_thread_interval 0.1
flush_thread_burst_interval 0.01
flush_thread_count 5
chunk_full_threshold 0.1
chunk_limit_size 1 # Set this low on purpose to trigger a BufferChunkOverflowError.
</buffer>
</match>
Your Error Log
2024-07-04 00:52:37 +0000 [info]: #1 Subscribe to topics matching the regex test-topic
2024-07-04 00:52:37 +0000 [warn]: #1 Re-starting consumer 2024-07-04 00:52:37 +0000
2024-07-04 00:52:44 +0000 [warn]: #1 emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferChunkOverflowError error="a 3268 bytes record (nth: 0) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 1) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 2) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 3) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 4) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 5) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 6) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 7) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 8) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 9) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 10) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 11) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 12) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 13) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 14) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 15) is larger than buffer chunk limit size (1)" location="/usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/plugin/buffer.rb:457:in `write'" tag="telemetry.ab"
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:198:in `rescue in emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:195:in `emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:115:in `emit_stream'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:111:in `block (2 levels) in process'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:110:in `each'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:110:in `block in process'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:108:in `each'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:108:in `process'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/plugin/output.rb:885:in `emit_sync'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:196:in `emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:115:in `emit_stream'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:376:in `emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:347:in `process_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:358:in `block in run'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:336:in `block (3 levels) in each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:334:in `block (2 levels) in each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `each'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `block in each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:419:in `block in consumer_loop'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:417:in `consumer_loop'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:311:in `each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:354:in `run'
2024-07-04 00:52:44 +0000 [warn]: #1 emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferChunkOverflowError error="a 3268 bytes record (nth: 0) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 1) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 2) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 3) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 4) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 5) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 6) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 7) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 8) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 9) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 10) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 11) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 12) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 13) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 14) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 15) is larger than buffer chunk limit size (1)" location="/usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/plugin/buffer.rb:457:in `write'" tag="test-topic"
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:198:in `rescue in emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:195:in `emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:115:in `emit_stream'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:376:in `emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:347:in `process_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:358:in `block in run'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:336:in `block (3 levels) in each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:334:in `block (2 levels) in each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `each'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `block in each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:419:in `block in consumer_loop'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:417:in `consumer_loop'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:311:in `each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:354:in `run'
2024-07-04 00:52:45 +0000 [error]: #1 unexpected error during consuming events from kafka. Re-fetch events. error="Kafka::ProcessingError"
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:345:in `rescue in block (3 levels) in each_batch'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:338:in `block (3 levels) in each_batch'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:334:in `block (2 levels) in each_batch'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `each'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `block in each_batch'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:419:in `block in consumer_loop'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:417:in `consumer_loop'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:311:in `each_batch'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:354:in `run'
2024-07-04 00:52:45 +0000 [warn]: #1 Stopping Consumer
2024-07-04 00:52:45 +0000 [warn]: #1 Could not connect to broker. retry_time:0. Next retry will be in 30 seconds
Additional context
No response