feat: support hetero mlu mooncake push pd. #1304

phantomlei3 wants to merge 6 commits into jd-opensource:main
Conversation
Code Review
This pull request implements a PD topology guard that validates compatibility between prefill and decode instances, supporting heterogeneous configurations for MLU with MLA and PUSH-based KV cache transfer. It introduces specialized MLU tensor allocation, routing logic for KV cache distribution, and logic for merging KV cache information. Review feedback flags a potential integer overflow in memory allocation size calculations and asks for adherence to the style guide on vector reservation to avoid repeated reallocations.
for (int64_t dim : dims) {
  CHECK_GE(dim, 0) << "tensor dim must be non-negative";
  count *= static_cast<size_t>(dim);
}
The multiplication count *= static_cast<size_t>(dim) can overflow if the tensor dimensions are very large. This can lead to allocating a much smaller buffer than required, causing memory corruption later. You should add a check to prevent overflow.
You'll also need to include <limits>.
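The guarded multiply can be exercised standalone. Below is a minimal sketch with a hypothetical helper name (`checked_element_count`) that returns false instead of calling CHECK_GE/LOG(FATAL), so it has no glog dependency:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <vector>

// Returns false if any dim is negative or the product would overflow size_t;
// otherwise stores the element count in *count. Hypothetical helper for
// illustration; the PR uses CHECK_GE / LOG(FATAL) instead of a bool return.
bool checked_element_count(const std::vector<int64_t>& dims, size_t* count) {
  size_t n = 1;
  for (int64_t dim : dims) {
    if (dim < 0) return false;  // mirrors CHECK_GE(dim, 0)
    if (dim > 0 &&
        n > std::numeric_limits<size_t>::max() / static_cast<size_t>(dim)) {
      return false;  // multiplication would overflow
    }
    n *= static_cast<size_t>(dim);
  }
  *count = n;
  return true;
}
```

The division-based check avoids performing the overflowing multiply itself, and the `dim > 0` guard keeps a zero dimension (empty tensor) from dividing by zero.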
for (int64_t dim : dims) {
  CHECK_GE(dim, 0) << "tensor dim must be non-negative";
  if (dim > 0 && count > std::numeric_limits<size_t>::max() / static_cast<size_t>(dim)) {
    LOG(FATAL) << "Tensor dimensions are too large and would cause overflow.";
  }
  count *= static_cast<size_t>(dim);
}
}

if (use_push_owner(src_tp_size, dst_tp_size)) {
  int32_t dst_rank = dst_dp_rank * dst_tp_size + src_tp_rank % dst_tp_size;
  dst_ranks.emplace_back(dst_rank);
  return dst_ranks;
}

int32_t start_rank = src_tp_rank % dst_tp_size + dst_tp_size * dst_dp_rank;
int32_t end_rank = dst_tp_size * (dst_dp_rank + 1);
for (int32_t i = start_rank; i < end_rank; i += src_tp_size) {
  dst_ranks.emplace_back(i);
}
return dst_ranks;
According to the style guide (rule 274), you should reserve() memory for a std::vector before filling it when the size is known or can be estimated. This avoids multiple reallocations. In this function, the number of elements to be added can be calculated before the loop.
if (use_push_owner(src_tp_size, dst_tp_size)) {
  dst_ranks.reserve(1);
  int32_t dst_rank = dst_dp_rank * dst_tp_size + src_tp_rank % dst_tp_size;
  dst_ranks.emplace_back(dst_rank);
  return dst_ranks;
}
int32_t start_rank = src_tp_rank % dst_tp_size + dst_tp_size * dst_dp_rank;
int32_t end_rank = dst_tp_size * (dst_dp_rank + 1);
if (start_rank < end_rank && src_tp_size > 0) {
  dst_ranks.reserve((end_rank - 1 - start_rank) / src_tp_size + 1);
}
for (int32_t i = start_rank; i < end_rank; i += src_tp_size) {
  dst_ranks.emplace_back(i);
}
return dst_ranks;

References
- Always reserve() before filling a std::vector when the size is known or can be estimated to avoid multiple reallocations. (link)
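The routing arithmetic quoted above can be sketched as a free function. Note that the semantics of use_push_owner are not shown in the diff; the version below assumes it means the source TP world is at least as large as the destination's, which is an inference, not something the PR confirms:

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Assumption (not confirmed by the diff): a single owner rank is used when
// the source TP world is at least as large as the destination TP world.
static bool use_push_owner(int32_t src_tp_size, int32_t dst_tp_size) {
  return src_tp_size >= dst_tp_size;
}

// Destination ranks a source rank pushes KV cache to, following the
// arithmetic in the quoted diff, with the suggested reserve() applied.
std::vector<int32_t> get_dst_ranks(int32_t src_tp_rank, int32_t src_tp_size,
                                   int32_t dst_dp_rank, int32_t dst_tp_size) {
  std::vector<int32_t> dst_ranks;
  if (use_push_owner(src_tp_size, dst_tp_size)) {
    dst_ranks.reserve(1);
    dst_ranks.emplace_back(dst_dp_rank * dst_tp_size +
                           src_tp_rank % dst_tp_size);
    return dst_ranks;
  }
  // Hetero case: each source rank fans out over the destination DP group's
  // rank range with stride src_tp_size.
  int32_t start_rank = src_tp_rank % dst_tp_size + dst_tp_size * dst_dp_rank;
  int32_t end_rank = dst_tp_size * (dst_dp_rank + 1);
  if (start_rank < end_rank && src_tp_size > 0) {
    dst_ranks.reserve((end_rank - 1 - start_rank) / src_tp_size + 1);
  }
  for (int32_t i = start_rank; i < end_rank; i += src_tp_size) {
    dst_ranks.emplace_back(i);
  }
  return dst_ranks;
}
```

For example, with src_tp_size=2 pushing into dst_tp_size=4 at dst_dp_rank=0, source TP rank 1 fans out to destination ranks 1 and 3.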
merged_layer_offsets[layer_id].k_offsets.insert(
    merged_layer_offsets[layer_id].k_offsets.end(),
    layer_offsets[layer_id].k_offsets.begin(),
    layer_offsets[layer_id].k_offsets.end());
merged_layer_offsets[layer_id].v_offsets.insert(
    merged_layer_offsets[layer_id].v_offsets.end(),
    layer_offsets[layer_id].v_offsets.begin(),
    layer_offsets[layer_id].v_offsets.end());
According to the style guide (rule 274), you should reserve() memory for a std::vector before filling it when the size is known. This avoids multiple reallocations. Here, you can reserve space for k_offsets and v_offsets before inserting new elements.
Suggested change:
auto& k_target = merged_layer_offsets[layer_id].k_offsets;
const auto& k_source = layer_offsets[layer_id].k_offsets;
k_target.reserve(k_target.size() + k_source.size());
k_target.insert(k_target.end(), k_source.begin(), k_source.end());
auto& v_target = merged_layer_offsets[layer_id].v_offsets;
const auto& v_source = layer_offsets[layer_id].v_offsets;
v_target.reserve(v_target.size() + v_source.size());
v_target.insert(v_target.end(), v_source.begin(), v_source.end());
References
- Always reserve() before filling a std::vector when the size is known or can be estimated to avoid multiple reallocations. (link)
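The reserve-then-insert pattern recommended here is generic; a minimal self-contained sketch, using a hypothetical append_to helper rather than the PR's actual structures:

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Appends src to dst, reserving the final size first so the insert
// triggers at most one reallocation instead of several amortized growths.
void append_to(std::vector<int64_t>& dst, const std::vector<int64_t>& src) {
  dst.reserve(dst.size() + src.size());
  dst.insert(dst.end(), src.begin(), src.end());
}
```

Because std::vector::insert with a forward-iterator range already computes the distance once, the main win of the explicit reserve() is when several appends happen back to back, as with the k_offsets and v_offsets pairs above.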
it->second.src_blocks.insert(it->second.src_blocks.end(),
                             info.local_blocks_ids.begin(),
                             info.local_blocks_ids.end());
it->second.dst_blocks.insert(it->second.dst_blocks.end(),
                             info.remote_blocks_ids.begin(),
                             info.remote_blocks_ids.end());
According to the style guide (rule 274), you should reserve() memory for a std::vector before filling it when the size is known. This avoids multiple reallocations. Here, you can reserve space for src_blocks and dst_blocks before inserting new elements.
Suggested change:
auto& src_blocks = it->second.src_blocks;
src_blocks.reserve(src_blocks.size() + info.local_blocks_ids.size());
src_blocks.insert(src_blocks.end(), info.local_blocks_ids.begin(), info.local_blocks_ids.end());
auto& dst_blocks = it->second.dst_blocks;
dst_blocks.reserve(dst_blocks.size() + info.remote_blocks_ids.size());
dst_blocks.insert(dst_blocks.end(), info.remote_blocks_ids.begin(), info.remote_blocks_ids.end());
References
- Always reserve() before filling a std::vector when the size is known or can be estimated to avoid multiple reallocations. (link)
    false, false, false, true, "invalid remote pd topo: " + reason};
}

return check_mlu_pd_topo(
The naming of check_mlu_pd_topo feels a bit misleading to me. The function currently mixes two different concerns: a generic PD topology compatibility check (dp_size / tp_size match) and the current hetero-PD allowlist that happens to be MLU-specific (is_mlu_build, kv_mode == PUSH, enable_mla). Because of that, the mlu prefix suggests the whole function is platform-specific, while part of the logic is actually generic and could potentially be reused by other backends in the future. It may be clearer to either split the generic topology check from the platform policy check, or rename this function to something closer to its actual responsibility, such as check_pd_topo_compatibility plus a separate check_hetero_pd_requirements.
check_pd_topo_compatibility -> check_pd_topo
Agree. The current name is misleading because the same dp/tp path is backend-neutral, while only the hetero allowlist is MLU-specific. I’ll split the generic topology check from the hetero backend requirements and rename the public check to check_pd_topo.
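A sketch of that split, with the names proposed in this thread but simplified bool/string signatures (the actual signatures in the PR may differ):

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Generic, backend-neutral topology compatibility: the homogeneous PD path
// simply requires matching dp/tp sizes between prefill and decode.
bool check_pd_topo(int32_t local_dp, int32_t local_tp,
                   int32_t remote_dp, int32_t remote_tp) {
  return local_dp == remote_dp && local_tp == remote_tp;
}

// Platform policy: the hetero allowlist currently requires MLU + PUSH + MLA.
// Reasons mirror the messages visible in the quoted diff.
bool check_hetero_pd_requirements(bool is_mlu_build, bool kv_mode_is_push,
                                  bool enable_mla, std::string* reason) {
  if (!is_mlu_build) {
    *reason = "hetero pd requires MLU build";
    return false;
  }
  if (!kv_mode_is_push) {
    *reason = "hetero pd requires kv_mode=PUSH";
    return false;
  }
  if (!enable_mla) {
    *reason = "hetero pd requires enable_mla=true";
    return false;
  }
  reason->clear();
  return true;
}
```

With this split, other backends can reuse check_pd_topo as-is and plug in their own hetero policy later.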
  return PdTopoRule{
      false, true, false, false, "hetero pd requires kv_mode=PUSH"};
}
if (!enable_mla) {
why only support mla?
Hetero PD is currently limited to MLU + PUSH + MLA because the non-MLA KV cache is still sharded by local KV heads, so different TP sizes would require additional head-dimension split/merge handling that this PR does not implement or validate yet.
      false, true, false, false, "hetero pd requires enable_mla=true"};
}

return PdTopoRule{true, true, false, false, ""};
PdTopoRule needs to be renamed.
Agree. PdTopoRule is not really a rule object; it is the result of a topology check. I’ll rename it to PdTopoResult.
    is_mlu_build,
    options_.kv_cache_transfer_mode(),
    enable_mla);
if (!rule.allow) {
I am not sure PdTopoRule is the right abstraction here. If the goal is to describe topology compatibility, hetero as a standalone boolean feels a bit awkward, because homogeneous and heterogeneous are both topology relations rather than one being a special side flag. Also, if the caller only needs an allow/deny decision plus the failure reason, a simpler result type may be easier to follow. If we do need structured branching (for example, invalid local vs invalid remote), it may be clearer to model that explicitly as an enum/status instead of several loosely related booleans.
Agree. The loose boolean fields make homogeneous, heterogeneous, and invalid states harder to reason about. I’ll replace them with an explicit status enum so the scheduler can branch on ALLOW_HOMO, ALLOW_HETERO, INVALID_LOCAL, and INVALID_REMOTE directly.
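That refactor could look like the following sketch. The names PdTopoStatus and the four states are the ones agreed on in this thread; the field layout is illustrative:

```cpp
#include <cassert>
#include <string>

// Explicit status instead of several loosely related booleans.
enum class PdTopoStatus {
  ALLOW_HOMO,     // dp/tp sizes match; homogeneous transfer path
  ALLOW_HETERO,   // sizes differ but the hetero allowlist is satisfied
  INVALID_LOCAL,  // local instance config is unsupported
  INVALID_REMOTE, // remote instance config is incompatible
};

// Result of a topology check (renamed from PdTopoRule per review).
struct PdTopoResult {
  PdTopoStatus status;
  std::string reason;  // empty when the topology is allowed

  bool allowed() const {
    return status == PdTopoStatus::ALLOW_HOMO ||
           status == PdTopoStatus::ALLOW_HETERO;
  }
};
```

The scheduler can then switch on status directly, and the allow/hetero booleans fall out as derived accessors rather than stored flags that could drift out of sync.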
force-pushed from 3e6bead to eb8343d
move push_route to kv_cache_transfer dir.
move to kv_cache_transfer dir.
force-pushed from 422f1d8 to 8facc61
No description provided.