feat(core): add fetch listener callbacks and async inter-broker requests#3248
Merged
feat(core): add fetch listener callbacks and async inter-broker requests#3248
Conversation
06b861a to
930015a
Compare
Contributor
There was a problem hiding this comment.
Pull request overview
This PR introduces a new async inter-broker request sender utility and adds fetch lifecycle callbacks (per-partition fetch results + fetch session close notifications) to the ElasticKafkaApis path.
Changes:
- Add
AsyncSender+InterBrokerAsyncSenderto support async inter-broker requests backed byInterBrokerSendThread. - Introduce
FetchListenerand wire it intoElasticKafkaApisto report fetch offsets/timestamps, plus session-close notifications viaFetchSessionCacheShard. - Add/extend unit tests covering the sender behavior, fetch listener notifications, and session removal listener behavior.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| server-common/src/main/java/org/apache/kafka/server/util/AsyncSender.java | New async sender interface for inter-broker requests |
| server-common/src/main/java/org/apache/kafka/server/util/InterBrokerAsyncSender.java | Async wrapper around InterBrokerSendThread returning CompletableFuture |
| server-common/src/test/java/org/apache/kafka/server/util/InterBrokerAsyncSenderTest.java | Unit tests for async sender success/error/timeout paths |
| core/src/main/scala/kafka/server/streamaspect/FetchListener.java | New listener interface for fetch + session-close callbacks |
| core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala | Wire fetch listener notifications into fetch handling |
| core/src/main/scala/kafka/server/FetchSession.scala | Add removal listener hook to FetchSessionCacheShard |
| core/src/main/scala/kafka/server/BrokerServer.scala | Instantiate listener + hook session-close notifications + set fetch listener on ElasticKafkaApis |
| core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | Add tests validating fetch listener notifications |
| core/src/test/scala/unit/kafka/server/FetchSessionTest.scala | Add test for removal listener notifications |
| core/src/test/scala/unit/kafka/server/streamaspect/ElasticKafkaApisTest.scala | Update helper to accept/set fetch listener |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
e17325f to
03e253e
Compare
Contributor
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
f275710 to
3129dd4
Compare
d8c5a65 to
c6e8f2a
Compare
superhx
reviewed
Apr 10, 2026
203e776 to
ab84639
Compare
superhx
reviewed
Apr 21, 2026
superhx
reviewed
Apr 21, 2026
…tion - introduce AsyncSender interface for async request sending - add InterBrokerAsyncSender based on InterBrokerSendThread with CompletableFuture completion - add InterBrokerAsyncSenderTest covering success, readiness transitions, disconnect/auth failure, timeout, wakeup and close
…BrokerExtensionHandle
…ction - Add AsyncFetchListener wrapper to handle both onFetch and onSessionClosed asynchronously - Move fetchListenerExecutor from ElasticKafkaApis to BrokerServer for unified lifecycle management - Simplify notifyFetchListener in ElasticKafkaApis by delegating async logic to AsyncFetchListener - Ensure proper executor shutdown in BrokerServer.shutdown() This change addresses the issue where calling fetchListener.onSessionClosed() directly inside FetchSessionCacheShard.remove() (while holding synchronized lock) could block fetch session cache operations if the listener is slow or blocked.
Cherry-picked from 9d53374 (feat/consumer-lag). Adds LatestAppendState to UnifiedLog to capture maxTimestamp and shallowOffsetOfMaxTimestamp on each successful append, avoiding the expensive fetchOffsetByTimestamp lookup.
- Add @volatile to fetchListener for cross-thread visibility - Drain pending requests on InterBrokerAsyncSender close - Replace removed shallowOffsetOfMaxTimestamp with lastOffset
Use a bounded LinkedBlockingQueue(1024) with DiscardOldestPolicy to prevent unbounded memory growth under high load. Also batch session removal notifications into a single executor task to minimize lock hold time on FetchSessionCacheShard.
- Add volatile closed flag to InterBrokerAsyncSender to reject requests after close, preventing futures that never complete - Add awaitTermination for fetchListenerExecutor during broker shutdown
- Add onSessionClosedBatch to AsyncFetchListener to eliminate duplicated async dispatch logic in BrokerServer sessionRemovalListener - Skip notifyFetchListener when listener is NOOP to avoid unnecessary map allocation and record batch iteration on the fetch hot path - Remove speculative TODO comment in extractFetchOffsetAndTimestamp - Add missing license header to AsyncSender.java and trailing newlines
Delegate session removal notification to AsyncFetchListener batch method instead of duplicating the async dispatch logic inline.
ab84639 to
bf5f33d
Compare
d7130e8 to
d3a40d4
Compare
superhx
approved these changes
Apr 25, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.