
refactor batch stream processor #847

Closed
xinfei-shi wants to merge 1 commit into main from develop/normal_batch_processor_refactor

Conversation

@xinfei-shi
Collaborator

No description provided.

@xinfei-shi xinfei-shi requested a review from LLLLKKKK as a code owner March 31, 2026 08:29
@xinfei-shi xinfei-shi force-pushed the develop/normal_batch_processor_refactor branch from 3be858e to f13abd8 on April 1, 2026 06:25
@LLLLKKKK
Collaborator

LLLLKKKK commented Apr 1, 2026

Code Review: PR #847 — refactor batch stream processor

Author: xinfei-shi
Review version: v1
Head SHA: f13abd8
Verdict: P1

Summary

NormalBatchStreamProcessor is split into three independent classes:

  • NormalModelInputGatherer: gathers model inputs
  • NormalSamplerInputGatherer: gathers sampler inputs
  • NormalOutputDispatcher: dispatches outputs

The direction of the refactor is sound and the separation of responsibilities is clean. The gatherSamplerInput signature drops the model_inputs parameter (no longer needed), and in the tests setKVCacheGroupTypes is replaced by passing cache_config.group_types at construction time, which is cleaner.

P1 Issues

1. hasCacheKeys() guard removed: possible crash

File: NormalModelInputGatherer.cc (processContextStreams)

Original code:

if (role_type_ == RoleType::PREFILL && stream->hasCacheKeys()) {
    std::memcpy((*model_input.cache_keys)[batch_idx - total_decode_batch_size].data(),
                stream->cacheKeys(i).data(),
                stream->cacheKeys(i).size() * sizeof(int64_t));
}

New code:

if (ctx.max_blocks_num && config_.role_type == RoleType::PREFILL) {
    std::memcpy((*ctx.cache_keys)[prefill_batch_idx].data(),
                stream->cacheKeys(i).data(),
                stream->cacheKeys(i).size() * sizeof(int64_t));
}

The hasCacheKeys() check was lost. When a stream has no cache keys (the non-reuse-cache case), calling stream->cacheKeys(i) directly may return empty data or trigger an assertion failure. The && stream->hasCacheKeys() condition needs to be restored.

P2 Suggestions

2. GatherModelInputContext struct is too large

GatherModelInputContext has 20+ fields and is essentially the old local variables packed into a struct. Consider whether it can be simplified further, e.g. by having the decode/context handlers operate on GptModelInputs directly instead of forwarding through an intermediate context. The current implementation works, but it raises the cost of understanding the code.

3. fillSamplerLogitsProcessor: score_batch parameter unused

void NormalSamplerInputGatherer::fillSamplerLogitsProcessor(..., bool score_batch) const {
    (void)score_batch;  // unused

If score_batch is no longer needed, remove it from the interface instead of suppressing the warning with (void). The (void)model_inputs in MtpBatchStreamProcessor::gatherSpecSamplerInput is a similar case.

P3 Nits

4. device_ local variable in NormalOutputDispatcher::dispatch

auto* device_ = config_.device;

A local variable named device_ (with a trailing underscore) is easily confused with a member variable; rename it to device. NormalSamplerInputGatherer::allocateSamplerInputs has the same issue.

5. Same device_ naming in NormalModelInputGatherer::allocateModelInputBuffers

auto* device_ = config_.device;

This one exists because the CACHED_HOST_BUF macro internally references device_. If the macro depends on this name, add a comment in the class documenting the convention so later maintainers aren't confused.

@xinfei-shi xinfei-shi force-pushed the develop/normal_batch_processor_refactor branch 5 times, most recently from dfd3e7d to 7668064 on April 3, 2026 03:50
@xinfei-shi xinfei-shi force-pushed the develop/normal_batch_processor_refactor branch from 7668064 to 28b57c4 on April 7, 2026 06:51
@xinfei-shi xinfei-shi closed this Apr 7, 2026
