Skip to content

Latest commit

 

History

History
198 lines (140 loc) · 6.17 KB

File metadata and controls

198 lines (140 loc) · 6.17 KB

基于Message Group的Pop消息顺序优化 POC

概述

本POC实现了基于Message Group的Pop消息顺序优化方案,将传统的队列级别阻塞机制优化为基于Message Group的细粒度阻塞机制,从而提升顺序消息的并发处理能力。

问题背景

传统方式的问题

  • 队列级别阻塞:只要队列中有未确认的消息就阻塞整个队列
  • 并发能力低:不同Message Group之间无法并发处理
  • 资源浪费:大量消费者因为阻塞而空闲等待

优化目标

  • 细粒度锁定:只阻塞相同Message Group的消息
  • 提升并发:不同Message Group可以并发处理
  • 智能预取:通过预取分析Message Group分布,平衡性能和资源消耗

核心设计

架构变化

传统方式:
topic@group -> queueId -> OrderInfo (队列级别阻塞)

优化方式:
topic@group -> queueId -> messageGroup -> OrderInfo (Message Group级别阻塞)

关键组件

  1. MessageGroupOrderInfoManager - 基于Message Group的顺序信息管理器
  2. PopMessageProcessorWithMessageGroup - 优化的Pop消息处理器
  3. MessageGroupAnalysis - 消息组分析工具

实现文件

核心实现

  • broker/src/main/java/org/apache/rocketmq/broker/offset/MessageGroupOrderInfoManager.java
  • broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessorWithMessageGroup.java

测试用例

  • broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorWithMessageGroupTest.java

使用示例

  • broker/src/main/java/org/apache/rocketmq/broker/pop/MessageGroupPopExample.java

核心算法

1. 消息预取分析算法

// 预取少量消息分析Message Group分布
MessageGroupAnalysis analysis = analyzeMessageGroups(topic, group, queueId, offset, 3);

// 检查各Message Group的阻塞状态
List<String> availableGroups = getAvailableMessageGroups(analysis, ...);

// 智能获取消息策略
GetMessageResult result = intelligentGetMessages(..., availableGroups);

2. 智能获取策略

  • 单一Message Group:直接批量获取
  • 所有Message Group可用:正常批量获取
  • 部分Message Group阻塞:选择性获取可用组的消息

3. 阻塞检查机制

// 基于Message Group的阻塞检查
boolean isBlocked = checkMessageGroupBlocked(topic, group, queueId, messageGroup, attemptId, invisibleTime);

// 相同attemptId不阻塞(重复请求)
// 不同attemptId且有未确认消息则阻塞

性能优化

配置参数

  • PRE_READ_MESSAGE_COUNT = 3 - 预读消息数量
  • MAX_RETRY_COUNT = 2 - 最大重试次数

调优建议

  1. Message Group设计

    • 保持Message Group数量适中(2-10个)
    • 每个Message Group消息量相对均衡
    • 避免单个Message Group过大
  2. 预读数量调整

    • Message Group较少:2-3条
    • Message Group较多:5-8条

兼容性

  • 向后兼容:现有顺序消息逻辑保持不变
  • 渐进式升级:可以逐步开启Message Group优化
  • 性能无退化:单Message Group场景性能基本一致

测试场景

功能测试

  1. 单一Message Group的消息处理
  2. 多个Message Group的并发处理
  3. Message Group阻塞机制验证
  4. 消息确认流程测试
  5. 消息分析功能测试

性能测试

  1. 并发性能对比测试
  2. 边界条件测试
  3. 异常处理测试
  4. 长时间运行稳定性测试

配置测试

  1. 不同预读参数的影响测试
  2. 不同Message Group分布的性能测试

使用方法

1. 基本使用

// 创建处理器
PopMessageProcessorWithMessageGroup processor = 
    new PopMessageProcessorWithMessageGroup(brokerController);

// Pop消息
CompletableFuture<GetMessageResult> future = processor.popMsgFromQueueWithMessageGroup(
    requestHeader, topic, group, queueId, offset, batchSize, messageFilter);

// 消息确认
CompletableFuture<Boolean> ackFuture = processor.ackMessageWithMessageGroup(
    topic, group, queueId, queueOffset, messageGroup, popTime);

2. 消息分析

MessageGroupOrderInfoManager mgManager = processor.getMessageGroupOrderInfoManager();
MessageGroupAnalysis analysis = mgManager.analyzeMessageGroups(topic, group, queueId, offset, 5);

System.out.println("总消息数: " + analysis.getTotalMessages());
System.out.println("Message Group数量: " + analysis.getTotalMessageGroups());
System.out.println("Message Group列表: " + analysis.getOrderedGroups());

3. 阻塞状态检查

boolean isBlocked = processor.isMessageGroupBlocked(
    topic, group, queueId, messageGroup, attemptId, invisibleTime);

Map<String, List<String>> blockedGroups = mgManager.getBlockedMessageGroups(topic, group, queueId);

性能提升预期

理论提升

  • 并发度提升:N倍(N为Message Group数量)
  • 延迟降低:不同Message Group间无相互阻塞
  • 吞吐量提升:更好的资源利用率

适用场景

  1. 顺序消息中有多个不同的Message Group
  2. 不同Message Group之间没有依赖关系
  3. 希望提升并发处理能力的场景

监控指标

建议监控以下指标:

  1. Message Group级别的消费延迟
  2. 阻塞状态分布
  3. 并发处理效率
  4. 预取命中率
  5. Message Group分布均衡度

后续优化方向

  1. 动态调整预取数量:根据Message Group分布动态调整
  2. 智能负载均衡:基于Message Group负载进行消费者分配
  3. 更细粒度的监控:Message Group级别的详细监控
  4. 自适应阻塞策略:根据业务特点调整阻塞策略

风险控制

  1. 兼容性保障:保持与现有系统的完全兼容
  2. 性能监控:实时监控性能变化
  3. 回滚机制:支持快速回滚到传统方式
  4. 渐进式部署:小规模验证后逐步推广

总结

本POC通过将队列级别的阻塞优化为基于Message Group的细粒度阻塞,在保持顺序消息语义的前提下,显著提升了并发处理能力。通过智能预取和分析机制,在性能和资源消耗之间达到了良好的平衡。

该方案具有良好的兼容性和扩展性,可以作为RocketMQ顺序消息优化的重要方向进行进一步开发和优化。