本POC实现了基于Message Group的Pop消息顺序优化方案,将传统的队列级别阻塞机制优化为基于Message Group的细粒度阻塞机制,从而提升顺序消息的并发处理能力。
- 队列级别阻塞:只要队列中有未确认的消息就阻塞整个队列
- 并发能力低:不同Message Group之间无法并发处理
- 资源浪费:大量消费者因为阻塞而空闲等待
- 细粒度锁定:只阻塞相同Message Group的消息
- 提升并发:不同Message Group可以并发处理
- 智能预取:通过预取分析Message Group分布,平衡性能和资源消耗
传统方式:
topic@group -> queueId -> OrderInfo (队列级别阻塞)
优化方式:
topic@group -> queueId -> messageGroup -> OrderInfo (Message Group级别阻塞)
- MessageGroupOrderInfoManager - 基于Message Group的顺序信息管理器
- PopMessageProcessorWithMessageGroup - 优化的Pop消息处理器
- MessageGroupAnalysis - 消息组分析工具
broker/src/main/java/org/apache/rocketmq/broker/offset/MessageGroupOrderInfoManager.javabroker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessorWithMessageGroup.java
broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorWithMessageGroupTest.java
broker/src/main/java/org/apache/rocketmq/broker/pop/MessageGroupPopExample.java
// 预取少量消息分析Message Group分布
MessageGroupAnalysis analysis = analyzeMessageGroups(topic, group, queueId, offset, 3);
// 检查各Message Group的阻塞状态
List<String> availableGroups = getAvailableMessageGroups(analysis, ...);
// 智能获取消息策略
GetMessageResult result = intelligentGetMessages(..., availableGroups);- 单一Message Group:直接批量获取
- 所有Message Group可用:正常批量获取
- 部分Message Group阻塞:选择性获取可用组的消息
// 基于Message Group的阻塞检查
boolean isBlocked = checkMessageGroupBlocked(topic, group, queueId, messageGroup, attemptId, invisibleTime);
// 相同attemptId不阻塞(重复请求)
// 不同attemptId且有未确认消息则阻塞PRE_READ_MESSAGE_COUNT = 3- 预读消息数量MAX_RETRY_COUNT = 2- 最大重试次数
-
Message Group设计
- 保持Message Group数量适中(2-10个)
- 每个Message Group消息量相对均衡
- 避免单个Message Group过大
-
预读数量调整
- Message Group较少:2-3条
- Message Group较多:5-8条
- 向后兼容:现有顺序消息逻辑保持不变
- 渐进式升级:可以逐步开启Message Group优化
- 性能无退化:单Message Group场景性能基本一致
- 单一Message Group的消息处理
- 多个Message Group的并发处理
- Message Group阻塞机制验证
- 消息确认流程测试
- 消息分析功能测试
- 并发性能对比测试
- 边界条件测试
- 异常处理测试
- 长时间运行稳定性测试
- 不同预读参数的影响测试
- 不同Message Group分布的性能测试
// 创建处理器
PopMessageProcessorWithMessageGroup processor =
new PopMessageProcessorWithMessageGroup(brokerController);
// Pop消息
CompletableFuture<GetMessageResult> future = processor.popMsgFromQueueWithMessageGroup(
requestHeader, topic, group, queueId, offset, batchSize, messageFilter);
// 消息确认
CompletableFuture<Boolean> ackFuture = processor.ackMessageWithMessageGroup(
topic, group, queueId, queueOffset, messageGroup, popTime);MessageGroupOrderInfoManager mgManager = processor.getMessageGroupOrderInfoManager();
MessageGroupAnalysis analysis = mgManager.analyzeMessageGroups(topic, group, queueId, offset, 5);
System.out.println("总消息数: " + analysis.getTotalMessages());
System.out.println("Message Group数量: " + analysis.getTotalMessageGroups());
System.out.println("Message Group列表: " + analysis.getOrderedGroups());boolean isBlocked = processor.isMessageGroupBlocked(
topic, group, queueId, messageGroup, attemptId, invisibleTime);
Map<String, List<String>> blockedGroups = mgManager.getBlockedMessageGroups(topic, group, queueId);- 并发度提升:N倍(N为Message Group数量)
- 延迟降低:不同Message Group间无相互阻塞
- 吞吐量提升:更好的资源利用率
- 顺序消息中有多个不同的Message Group
- 不同Message Group之间没有依赖关系
- 希望提升并发处理能力的场景
建议监控以下指标:
- Message Group级别的消费延迟
- 阻塞状态分布
- 并发处理效率
- 预取命中率
- Message Group分布均衡度
- 动态调整预取数量:根据Message Group分布动态调整
- 智能负载均衡:基于Message Group负载进行消费者分配
- 更细粒度的监控:Message Group级别的详细监控
- 自适应阻塞策略:根据业务特点调整阻塞策略
- 兼容性保障:保持与现有系统的完全兼容
- 性能监控:实时监控性能变化
- 回滚机制:支持快速回滚到传统方式
- 渐进式部署:小规模验证后逐步推广
本POC通过将队列级别的阻塞优化为基于Message Group的细粒度阻塞,在保持顺序消息语义的前提下,显著提升了并发处理能力。通过智能预取和分析机制,在性能和资源消耗之间达到了良好的平衡。
该方案具有良好的兼容性和扩展性,可以作为RocketMQ顺序消息优化的重要方向进行进一步开发和优化。