diff --git a/xllm/core/framework/block/block_manager_impl.cpp b/xllm/core/framework/block/block_manager_impl.cpp
index 17efb6e16..f006fbb23 100644
--- a/xllm/core/framework/block/block_manager_impl.cpp
+++ b/xllm/core/framework/block/block_manager_impl.cpp
@@ -20,6 +20,31 @@ limitations under the License.
 #include "framework/prefix_cache/prefix_cache_factory.h"
 
 namespace xllm {
+namespace {
+
+bool mark_used(std::vector<int32_t>* usage_ids, int32_t block_id) {
+  CHECK(usage_ids != nullptr);
+  CHECK_GE(block_id, 0);
+  CHECK_LT(static_cast<size_t>(block_id), usage_ids->size());
+  if ((*usage_ids)[block_id] != 0) {
+    return false;
+  }
+  (*usage_ids)[block_id] = 1;
+  return true;
+}
+
+bool clear_used(std::vector<int32_t>* usage_ids, int32_t block_id) {
+  CHECK(usage_ids != nullptr);
+  CHECK_GE(block_id, 0);
+  CHECK_LT(static_cast<size_t>(block_id), usage_ids->size());
+  if ((*usage_ids)[block_id] == 0) {
+    return false;
+  }
+  (*usage_ids)[block_id] = 0;
+  return true;
+}
+
+}  // namespace
 
 BlockManagerImpl::BlockManagerImpl(const Options& options)
     : BlockManager(options) {
@@ -34,6 +59,9 @@ BlockManagerImpl::BlockManagerImpl(const Options& options)
   size_t total_blocks = options_.num_blocks();
   block_size_ = options_.block_size();
   num_free_blocks_.store(total_blocks, std::memory_order_relaxed);
+  if (options_.enable_prefix_cache()) {
+    usage_accounted_ids_.assign(total_blocks, 0);
+  }
   free_blocks_.reserve(total_blocks);
   for (int32_t i = 0; i < total_blocks; ++i) {
     // push smaller block ids to the back of the vector
@@ -57,6 +85,10 @@ std::vector<Block> BlockManagerImpl::allocate(size_t num_blocks) {
     size_t prev_count =
         num_free_blocks_.fetch_sub(1, std::memory_order_relaxed);
     const int32_t block_id = free_blocks_[prev_count - 1];
+    if (options_.enable_prefix_cache()) {
+      CHECK(mark_used(&usage_accounted_ids_, block_id))
+          << "block " << block_id << " usage accounted repeatedly";
+    }
     blocks.emplace_back(block_id, this);
   }
 
@@ -69,7 +101,8 @@ void BlockManagerImpl::deallocate(const Slice<Block>& blocks) {
   if (options_.enable_prefix_cache()) {
     for (const auto& block : blocks) {
       // the block is not shared by other sequence
-      if (block.is_valid() && block.ref_count() <= 2) {
+      if (block.is_valid() && block.ref_count() <= 2 &&
+          clear_used(&usage_accounted_ids_, block.id())) {
         if (num_used_blocks_ == 0) {
           LOG(ERROR) << "num_used_blocks_==0 cannot fetch_sub for id:"
                      << block.id()
@@ -138,8 +171,7 @@ std::vector<Block> BlockManagerImpl::allocate_shared(
 
   // update effective block usage
   for (const auto& block : shared_blocks) {
-    // the block is not shared by any sequence
-    if (block.ref_count() <= 2) {
+    if (mark_used(&usage_accounted_ids_, block.id())) {
       num_used_blocks_.fetch_add(1, std::memory_order_relaxed);
     }
   }
@@ -187,6 +219,11 @@ Block BlockManagerImpl::allocate() {
 void BlockManagerImpl::free(int32_t block_id) {
   // do nothing for reserved block 0
   if (block_id != 0) {
+    if (options_.enable_prefix_cache() &&
+        clear_used(&usage_accounted_ids_, block_id)) {
+      CHECK_GT(num_used_blocks_.load(std::memory_order_relaxed), 0u);
+      num_used_blocks_.fetch_sub(1, std::memory_order_relaxed);
+    }
     size_t prev_count =
         num_free_blocks_.fetch_add(1, std::memory_order_relaxed);
     CHECK(prev_count < free_blocks_.size());
diff --git a/xllm/core/framework/block/block_manager_impl.h b/xllm/core/framework/block/block_manager_impl.h
index 8dc5540eb..13454e851 100644
--- a/xllm/core/framework/block/block_manager_impl.h
+++ b/xllm/core/framework/block/block_manager_impl.h
@@ -112,6 +112,9 @@ class BlockManagerImpl : public BlockManager {
 
   // free block list
   std::vector<int32_t> free_blocks_;
+
+  // Whether a block is already counted in num_used_blocks_.
+  std::vector<int32_t> usage_accounted_ids_;
 };
 
 }  // namespace xllm
diff --git a/xllm/core/scheduler/prefill_only_scheduler.cpp b/xllm/core/scheduler/prefill_only_scheduler.cpp
index 0821edc65..92082b6fe 100644
--- a/xllm/core/scheduler/prefill_only_scheduler.cpp
+++ b/xllm/core/scheduler/prefill_only_scheduler.cpp
@@ -673,4 +673,4 @@ std::vector<Batch> PrefillOnlyScheduler::prepare_batch() {
   return batches;
 }
 
-}  // namespace xllm
\ No newline at end of file
+}  // namespace xllm