From 32838e7028417cf0d81b8f27dd254510f6a845ac Mon Sep 17 00:00:00 2001 From: Cruz Zhao Date: Mon, 22 Jun 2026 03:14:49 +0000 Subject: [PATCH 01/15] [Store] Zero-copy write: eliminate staging copy in Pattern A internals Remove the unconditional alloc + memcpy staging from all 6 Pattern A *_internal functions (put, put_parts, put_batch, upsert, upsert_parts, upsert_batch). Slices are now created directly from source data via split_into_slices(ptr, size), deferring staging to ensureRegisteredForRDMA only when the RDMA path encounters unregistered memory. Key changes: - real_client.cpp: 6 *_internal functions skip staging, use direct slices - transfer_task.cpp: ensureRegisteredForRDMA stages unregistered slices with single contiguous alloc; selectStrategy dispatches LOCAL_MEMCPY (inline, GPU-safe) vs TRANSFER_ENGINE; kMaxSliceSize splitting applied to both registered and staged slices - gpu_staging_utils.h: MemcpySafe, CopyAuto, IsDevicePointer utilities; async batch copy support (CUDA Driver API / MUSA Runtime API) - transfer_engine: isLocalMemoryRegistered query for registration check - client_service: setStagingAllocator to pass allocator to TransferSubmitter - store_py.cpp: batch_put/upsert_tensor use multi_buffers API to bypass integration-layer staging - batch_upsert_from_multi_buffers: new API mirroring batch_put variant Performance: LOCAL_MEMCPY path reduces from 2 copies to 1 for all Pattern A callers. RDMA with registered buffers also saves 1 copy. RDMA with unregistered buffers unchanged (staging moves to submit layer). Co-Authored-By: Claude Opus 4.6 --- mooncake-integration/store/store_py.cpp | 114 ++++--- .../store/store_py_parallel_read.h | 2 +- .../store/store_py_parallel_write.h | 7 +- mooncake-store/include/client_service.h | 12 +- mooncake-store/include/gpu_staging_utils.h | 169 +++++++++++ mooncake-store/include/pyclient.h | 11 + mooncake-store/include/real_client.h | 13 + mooncake-store/include/transfer_task.h | 23 ++ mooncake-store/src/client_service.cpp | 37 ++- mooncake-store/src/real_client.cpp | 281 ++++++++---------- mooncake-store/src/transfer_task.cpp | 247 +++++++++++++-- .../include/transfer_engine.h | 2 + .../include/transfer_engine_impl.h | 2 + .../src/transfer_engine.cpp | 9 + .../src/transfer_engine_impl.cpp | 6 + 15 files changed, 711 insertions(+), 224 deletions(-) diff --git a/mooncake-integration/store/store_py.cpp b/mooncake-integration/store/store_py.cpp index 4a55a2e11e..040d69f233 100644 --- a/mooncake-integration/store/store_py.cpp +++ b/mooncake-integration/store/store_py.cpp @@ -19,6 +19,7 @@ #include "integration_utils.h" #include "buffer_pool.h" +#include "gpu_staging_utils.h" // Forward declaration for EngramStore bindings namespace mooncake { @@ -798,15 +799,46 @@ class MooncakeStorePyWrapper { const std::vector &keys, const std::vector &infos, const ReplicateConfig &config = ReplicateConfig{}) { - return batch_write_tensor_impl( - keys, infos, config, "put", - [this](const std::vector &write_keys, - const std::vector &buffer_ptrs, - const std::vector &buffer_sizes, - const ReplicateConfig &write_config) { - return store_->batch_put_from(write_keys, buffer_ptrs, - buffer_sizes, write_config); - }); + auto group_ids_error = + ValidateGroupIdsForBatchConfig(config, keys.size(), "put"); + if (!group_ids_error.empty()) return group_ids_error; + + std::vector results(keys.size(), 0); + { + py::gil_scoped_release release_gil; + + std::vector valid_keys; + std::vector> all_buffers; + std::vector> all_sizes; + std::vector original_indices; + + for (size_t i = 0; i < infos.size(); ++i) { + if (!infos[i].valid()) { + results[i] = to_py_ret(ErrorCode::INVALID_PARAMS); + continue; + } + valid_keys.push_back(keys[i]); + all_buffers.push_back( + {const_cast(reinterpret_cast( + &infos[i].metadata)), + reinterpret_cast(infos[i].data_ptr)}); + all_sizes.push_back({infos[i].metadata.header.data_offset, + infos[i].tensor_size}); + original_indices.push_back(i); + } + + if (!valid_keys.empty()) { + ReplicateConfig write_config = + MakeIndexedConfig(config, original_indices); + std::vector op_results = + store_->batch_put_from_multi_buffers( + valid_keys, all_buffers, all_sizes, write_config); + for (size_t i = 0; i < op_results.size(); ++i) { + results[original_indices[i]] = op_results[i]; + } + } + } + return results; } std::vector batch_put_tensor_impl( @@ -1366,55 +1398,34 @@ class MooncakeStorePyWrapper { results[i] = to_py_ret(ErrorCode::INVALID_PARAMS); } - // 2. Prepare Buffers and Execute (GIL Released) + // 2. Zero-copy: pass metadata and data as separate buffers { py::gil_scoped_release release_gil; std::vector valid_keys; - std::vector buffer_ptrs; - std::vector buffer_sizes; + std::vector> all_buffers; + std::vector> all_sizes; std::vector original_indices; - std::vector> temp_allocations; - for (size_t i = 0; i < infos.size(); ++i) { if (!infos[i].valid()) continue; - size_t total_size = - sizeof(TensorMetadata) + infos[i].tensor_size; - auto alloc_result = - store_->client_buffer_allocator_->allocate(total_size); - - if (!alloc_result) { - LOG(ERROR) - << "Failed to allocate buffer for key: " << keys[i]; - results[i] = to_py_ret(ErrorCode::INVALID_PARAMS); - continue; - } - - // Copy Metadata & Data - char *dst = static_cast(alloc_result->ptr()); - memcpy(dst, &infos[i].metadata, sizeof(TensorMetadata)); - if (infos[i].tensor_size > 0) { - memcpy(dst + sizeof(TensorMetadata), - reinterpret_cast(infos[i].data_ptr), - infos[i].tensor_size); - } - valid_keys.push_back(keys[i]); - buffer_ptrs.push_back(alloc_result->ptr()); - buffer_sizes.push_back(total_size); + all_buffers.push_back( + {const_cast(reinterpret_cast( + &infos[i].metadata)), + reinterpret_cast(infos[i].data_ptr)}); + all_sizes.push_back({infos[i].metadata.header.data_offset, + infos[i].tensor_size}); original_indices.push_back(i); - - temp_allocations.push_back( - std::make_unique(std::move(*alloc_result))); } if (!valid_keys.empty()) { ReplicateConfig write_config = MakeIndexedConfig(config, original_indices); - std::vector op_results = store_->batch_upsert_from( - valid_keys, buffer_ptrs, buffer_sizes, write_config); + std::vector op_results = + store_->batch_upsert_from_multi_buffers( + valid_keys, all_buffers, all_sizes, write_config); for (size_t i = 0; i < op_results.size(); ++i) { results[original_indices[i]] = op_results[i]; } @@ -2504,6 +2515,25 @@ PYBIND11_MODULE(store, m) { py::arg("config") = ReplicateConfig{}, "Upsert object data directly from pre-allocated buffers for " "multiple keys") + .def( + "batch_upsert_from_multi_buffers", + [](MooncakeStorePyWrapper &self, + const std::vector &keys, + const std::vector> &all_buffer_ptrs, + const std::vector> &all_sizes, + const ReplicateConfig &config = ReplicateConfig{}) { + if (!self.is_client_initialized()) { + LOG(ERROR) << "Client is not initialized"; + return std::vector{}; + } + py::gil_scoped_release release; + return self.store_->batch_upsert_from_multi_buffers( + keys, CastAddrs2Ptrs(all_buffer_ptrs), all_sizes, config); + }, + py::arg("keys"), py::arg("all_buffer_ptrs"), py::arg("all_sizes"), + py::arg("config") = ReplicateConfig{}, + "Upsert object data directly from multiple pre-allocated buffers " + "for multiple keys") .def( "upsert", [](MooncakeStorePyWrapper &self, const std::string &key, diff --git a/mooncake-integration/store/store_py_parallel_read.h b/mooncake-integration/store/store_py_parallel_read.h index 06207cdf55..e61ebf4a7f 100644 --- a/mooncake-integration/store/store_py_parallel_read.h +++ b/mooncake-integration/store/store_py_parallel_read.h @@ -1467,7 +1467,7 @@ std::vector execute_tensor_into_plan_transfers( continue; } if (plans[plan_idx].materialized_metadata.has_value()) { - std::memcpy( + gpu_staging::MemcpySafe( reinterpret_cast(plans[plan_idx].user_buffer_ptr), &*plans[plan_idx].materialized_metadata, sizeof(TensorMetadata)); diff --git a/mooncake-integration/store/store_py_parallel_write.h b/mooncake-integration/store/store_py_parallel_write.h index 2bba450a8f..c1e3a548aa 100644 --- a/mooncake-integration/store/store_py_parallel_write.h +++ b/mooncake-integration/store/store_py_parallel_write.h @@ -59,9 +59,10 @@ std::vector batch_write_tensor_impl(const std::vector &keys, std::memcpy(dst, &infos[i].metadata, infos[i].metadata.header.data_offset); if (infos[i].tensor_size > 0) { - std::memcpy(dst + infos[i].metadata.header.data_offset, - reinterpret_cast(infos[i].data_ptr), - infos[i].tensor_size); + gpu_staging::MemcpySafe( + dst + infos[i].metadata.header.data_offset, + reinterpret_cast(infos[i].data_ptr), + infos[i].tensor_size); } valid_keys.push_back(keys[i]); diff --git a/mooncake-store/include/client_service.h b/mooncake-store/include/client_service.h index 0094f4f14d..7d68b5c5d3 100644 --- a/mooncake-store/include/client_service.h +++ b/mooncake-store/include/client_service.h @@ -64,6 +64,15 @@ class Client { public: virtual ~Client(); + /** + * @brief Reinitialize TransferSubmitter with a staging allocator. + * + * Called by RealClient after its client buffer allocator is ready, + * so that ensureRegisteredForRDMA can stage unregistered user buffers. + */ + void setStagingAllocator( + std::shared_ptr staging_allocator); + const UUID& getClientId() const { return client_id_; } const std::string& tenant_id() const { return master_client_.tenant_id(); } @@ -668,7 +677,8 @@ class Client { const std::string& local_hostname, const std::string& metadata_connstring, const std::string& protocol, const std::optional& device_names); - void InitTransferSubmitter(); + void InitTransferSubmitter( + std::shared_ptr staging_allocator = nullptr); ErrorCode TransferData(const Replica::Descriptor& replica_descriptor, std::vector& slices, TransferRequest::OpCode op_code); diff --git a/mooncake-store/include/gpu_staging_utils.h b/mooncake-store/include/gpu_staging_utils.h index 07982aa015..d2bb26fd7b 100644 --- a/mooncake-store/include/gpu_staging_utils.h +++ b/mooncake-store/include/gpu_staging_utils.h @@ -7,6 +7,7 @@ #endif #include +#include #include namespace mooncake { @@ -162,5 +163,173 @@ inline bool IsHostPointer(const void* ptr) { return true; // CPU-only build: all pointers are host #endif } +/// GPU-safe memcpy: auto-detects pointer types and dispatches to CopyAuto +/// for GPU pointers, std::memcpy for host pointers. +inline bool MemcpySafe(void* dst, const void* src, size_t size) { + int src_dev = -1, dst_dev = -1; + bool src_gpu = IsDevicePointer(src, &src_dev); + bool dst_gpu = IsDevicePointer(dst, &dst_dev); + if (!src_gpu && !dst_gpu) { + std::memcpy(dst, src, size); + return true; + } + int dev = src_gpu ? src_dev : dst_dev; + SetDevice(dev); + return CopyAuto(dst, src, size); +} + +// ── Async copy support ───────────────────────────────────────────────── +// Used by ensureRegisteredForRDMA to batch multiple GPU→host copies on a +// single stream, synchronise once, amortising per-call overhead. +// +// Design choices (borrowed from community PRs): +// - CUDA/HYGON/COREX: use Driver API (cuMemcpyAsync) instead of Runtime +// API (cudaMemcpyAsync) to avoid deadlocking with PyTorch's global +// CUDA Runtime mutex. See: github.com/kvcache-ai/Mooncake/pull/2094 +// - CUDA 12.8+: cudaMemcpyBatchAsync batches all copies into one driver +// call for lower per-transfer overhead. +// See: github.com/kvcache-ai/Mooncake/pull/1890 +// - MUSA/MACA: use Runtime API (no known deadlock issue with these runtimes). + +#if defined(USE_CUDA) || defined(USE_HYGON) || defined(USE_COREX) +// ── CUDA Driver API path ─────────────────────────────────────────────── + +/// One-shot Driver API initialization. +inline bool InitDriverAPI() { + static CUresult init_result = cuInit(0); + return init_result == CUDA_SUCCESS; +} + +/// Thread-local stream type for CUDA Driver API. +using AsyncCopyStream = CUstream; + +/// Get (or lazily create) a thread-local CUDA stream via Driver API. +inline AsyncCopyStream GetCopyStream() { + thread_local CUstream stream = nullptr; + if (!stream) { + if (!InitDriverAPI()) return nullptr; + auto err = cuStreamCreate(&stream, CU_STREAM_NON_BLOCKING); + if (err != CUDA_SUCCESS) { + const char* err_str = nullptr; + cuGetErrorString(err, &err_str); + LOG(WARNING) << "cuStreamCreate failed: " + << (err_str ? err_str : "unknown") + << "; falling back to sync"; + stream = nullptr; + } + } + return stream; +} + +/// Enqueue a single async copy via Driver API (auto-detects direction via UVA). +inline bool CopyAutoAsync(void* dst, const void* src, size_t size, + AsyncCopyStream stream) { + return cuMemcpyAsync(reinterpret_cast(dst), + reinterpret_cast(src), + size, stream) == CUDA_SUCCESS; +} + +/// Batch-enqueue multiple async copies. Uses cudaMemcpyBatchAsync on +/// CUDA 12.8+ for lower per-call overhead; falls back to looped +/// cuMemcpyAsync on older toolkits. +/// Returns false on any copy failure. +inline bool CopyBatchAsync(void* const* dsts, const void* const* srcs, + const size_t* sizes, size_t count, + AsyncCopyStream stream) { +#if CUDART_VERSION >= 13000 + // CUDA 13+: const void** signature, no fail_idx. + cudaMemcpyAttributes attr{}; + attr.srcAccessOrder = cudaMemcpySrcAccessOrderStream; + size_t attrs_idx = 0; + return cudaMemcpyBatchAsync( + const_cast(reinterpret_cast( + const_cast(dsts))), + const_cast(reinterpret_cast( + const_cast(srcs))), + sizes, count, &attr, &attrs_idx, 1, + reinterpret_cast(stream)) == cudaSuccess; +#elif CUDART_VERSION >= 12080 + // CUDA 12.8+: void** signature with fail_idx. + cudaMemcpyAttributes attr{}; + attr.srcAccessOrder = cudaMemcpySrcAccessOrderStream; + size_t attrs_idx = 0; + size_t fail_idx = count; + auto err = cudaMemcpyBatchAsync( + const_cast(reinterpret_cast(dsts)), + const_cast(reinterpret_cast(srcs)), + sizes, count, &attr, &attrs_idx, 1, &fail_idx, + reinterpret_cast(stream)); + if (err != cudaSuccess) { + LOG(ERROR) << "cudaMemcpyBatchAsync failed at index " << fail_idx + << ": " << cudaGetErrorString(err); + } + return err == cudaSuccess; +#else + // Pre-12.8: loop cuMemcpyAsync (Driver API, deadlock-safe). + for (size_t i = 0; i < count; ++i) { + auto err = cuMemcpyAsync(reinterpret_cast(dsts[i]), + reinterpret_cast(srcs[i]), + sizes[i], stream); + if (err != CUDA_SUCCESS) { + const char* err_str = nullptr; + cuGetErrorString(err, &err_str); + LOG(ERROR) << "cuMemcpyAsync failed at index " << i << ": " + << (err_str ? err_str : "unknown"); + return false; + } + } + return true; +#endif +} + +/// Synchronize the copy stream (Driver API). +inline bool SyncCopyStream(AsyncCopyStream stream) { + return cuStreamSynchronize(stream) == CUDA_SUCCESS; +} + +#elif defined(USE_MUSA) || defined(USE_MACA) +// ── MUSA/MACA Runtime API path ───────────────────────────────────────── + +using AsyncCopyStream = cudaStream_t; + +inline AsyncCopyStream GetCopyStream() { + thread_local cudaStream_t stream = nullptr; + if (!stream) { + auto err = cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking); + if (err != cudaSuccess) { + LOG(WARNING) << "cudaStreamCreate failed: " + << cudaGetErrorString(err); + stream = nullptr; + } + } + return stream; +} + +inline bool CopyAutoAsync(void* dst, const void* src, size_t size, + AsyncCopyStream stream) { + return cudaMemcpyAsync(dst, src, size, cudaMemcpyDefault, stream) == + cudaSuccess; +} + +inline bool CopyBatchAsync(void* const* dsts, const void* const* srcs, + const size_t* sizes, size_t count, + AsyncCopyStream stream) { + for (size_t i = 0; i < count; ++i) { + if (cudaMemcpyAsync(const_cast( + static_cast(dsts[i])), + srcs[i], sizes[i], cudaMemcpyDefault, + stream) != cudaSuccess) { + return false; + } + } + return true; +} + +inline bool SyncCopyStream(AsyncCopyStream stream) { + return cudaStreamSynchronize(stream) == cudaSuccess; +} + +#endif // platform selection + } // namespace gpu_staging } // namespace mooncake diff --git a/mooncake-store/include/pyclient.h b/mooncake-store/include/pyclient.h index c2e2201c5d..3b87b136a8 100644 --- a/mooncake-store/include/pyclient.h +++ b/mooncake-store/include/pyclient.h @@ -318,6 +318,17 @@ class PyClient { const std::vector &buffers, const std::vector &sizes, const ReplicateConfig &config = ReplicateConfig{}) = 0; + virtual std::vector batch_upsert_from_multi_buffers( + const std::vector &keys, + const std::vector> &all_buffers, + const std::vector> &all_sizes, + const ReplicateConfig &config = ReplicateConfig{}) { + (void)all_buffers; + (void)all_sizes; + (void)config; + return std::vector(keys.size(), -1); + } + virtual int upsert_parts( const std::string &key, std::vector> values, const ReplicateConfig &config = ReplicateConfig{}) = 0; diff --git a/mooncake-store/include/real_client.h b/mooncake-store/include/real_client.h index 0678964497..eeaf7fa085 100644 --- a/mooncake-store/include/real_client.h +++ b/mooncake-store/include/real_client.h @@ -254,6 +254,12 @@ class RealClient : public PyClient { const std::vector &buffers, const std::vector &sizes, const ReplicateConfig &config = ReplicateConfig{}); + std::vector batch_upsert_from_multi_buffers( + const std::vector &keys, + const std::vector> &all_buffers, + const std::vector> &all_sizes, + const ReplicateConfig &config = ReplicateConfig{}); + int upsert_parts(const std::string &key, std::vector> values, const ReplicateConfig &config = ReplicateConfig{}); @@ -593,6 +599,13 @@ class RealClient : public PyClient { const std::vector &buffers, const std::vector &sizes, const ReplicateConfig &config = ReplicateConfig{}); + std::vector> + batch_upsert_from_multi_buffers_internal( + const std::vector &keys, + const std::vector> &all_buffers, + const std::vector> &all_sizes, + const ReplicateConfig &config = ReplicateConfig{}); + tl::expected upsert_parts_internal( const std::string &key, std::vector> values, const ReplicateConfig &config = ReplicateConfig{}, diff --git a/mooncake-store/include/transfer_task.h b/mooncake-store/include/transfer_task.h index 63b061914d..f8d6e69281 100644 --- a/mooncake-store/include/transfer_task.h +++ b/mooncake-store/include/transfer_task.h @@ -20,6 +20,7 @@ #include "replica.h" #include "storage_backend.h" #include "client_metric.h" +#include "client_buffer.hpp" #ifdef USE_NOF #include "spdk/spdk_wrapper.h" #endif @@ -285,8 +286,17 @@ class TransferFuture { */ TransferStrategy strategy() const; + /** + * @brief Attach staging buffer handles that must stay alive until transfer + * completes. Used by ensureRegisteredForRDMA to keep staging memory valid. + */ + void attachStagingHandles(std::vector handles) { + staging_handles_ = std::move(handles); + } + private: std::shared_ptr state_; + std::vector staging_handles_; }; /** @@ -531,6 +541,7 @@ class TransferSubmitter { explicit TransferSubmitter(TransferEngine& engine, std::shared_ptr& backend, const std::string& local_hostname, + std::shared_ptr staging_allocator, TransferMetric* transfer_metric = nullptr, int numa_socket_id = 0); @@ -585,11 +596,23 @@ class TransferSubmitter { const std::string& local_endpoint); private: + /** + * @brief Ensure all slices are in RDMA-registered memory. + * + * For each slice NOT registered with the transfer engine, copies its + * data into a staging buffer (from staging_allocator_) that IS registered. + * Returns the prepared slices and staging handles that must stay alive + * until the transfer completes. + */ + std::pair, std::vector> + ensureRegisteredForRDMA(const std::vector& slices); + TransferEngine& engine_; // Cached at construction: the local transport endpoint never changes for // the lifetime of the TransferSubmitter, so we avoid calling // engine_.getLocalIpAndPort() (which allocates a string) on every transfer. const std::string local_endpoint_; + std::shared_ptr staging_allocator_; std::unique_ptr memcpy_pool_; #ifdef USE_NOF std::unique_ptr spdk_nvmf_pool_; diff --git a/mooncake-store/src/client_service.cpp b/mooncake-store/src/client_service.cpp index 7210e0d52b..8b8b43a581 100644 --- a/mooncake-store/src/client_service.cpp +++ b/mooncake-store/src/client_service.cpp @@ -832,7 +832,8 @@ ErrorCode Client::InitTransferEngine( return ErrorCode::OK; } -void Client::InitTransferSubmitter() { +void Client::InitTransferSubmitter( + std::shared_ptr staging_allocator) { // Initialize TransferSubmitter after transfer engine is ready // Keep using logical local_hostname for name-based behaviors; endpoint is // used separately where needed. @@ -841,14 +842,21 @@ void Client::InitTransferSubmitter() { GetConfiguredNumaSocketId().value_or(GetCurrentNumaSocketId()); transfer_submitter_ = std::make_unique( *transfer_engine_, storage_backend_, local_hostname_, + std::move(staging_allocator), metrics_ ? &metrics_->transfer_metric : nullptr, numa_socket_id); #else transfer_submitter_ = std::make_unique( *transfer_engine_, storage_backend_, local_hostname_, + std::move(staging_allocator), metrics_ ? &metrics_->transfer_metric : nullptr); #endif } +void Client::setStagingAllocator( + std::shared_ptr staging_allocator) { + InitTransferSubmitter(std::move(staging_allocator)); +} + std::optional> Client::Create( const std::string& local_hostname, const std::string& metadata_connstring, const std::string& protocol, const std::optional& device_names, @@ -2707,6 +2715,12 @@ tl::expected Client::UnmountSegmentImpl( return tl::unexpected(err); } +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + cudaHostUnregister(reinterpret_cast(it->second.base)); + // Ignore failure — may not have been registered +#endif + int rc = transfer_engine_->unregisterLocalMemory( reinterpret_cast(it->second.base)); if (rc != 0) { @@ -2780,6 +2794,27 @@ tl::expected Client::MountSegmentAndGetId( return tl::unexpected(ErrorCode::INVALID_PARAMS); } +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + { + const char* pin_env = std::getenv("MC_STORE_PIN_MEMORY"); + if (pin_env && + (std::string(pin_env) == "1" || + std::string(pin_env) == "true")) { + auto cuda_ret = cudaHostRegister((void*)buffer, size, + cudaHostRegisterDefault); + if (cuda_ret != cudaSuccess) { + LOG(WARNING) + << "cudaHostRegister failed for segment (size=" << size + << "): " << cudaGetErrorString(cuda_ret) + << "; GPU copies will use pageable fallback"; + } else { + LOG(INFO) << "cudaHostRegister segment OK, size=" << size; + } + } + } +#endif + Segment segment; segment.id = generate_uuid(); segment.name = local_hostname_; diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index 81fd32d8e6..3b6a010ceb 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -761,6 +761,27 @@ tl::expected RealClient::setup_internal( << toString(result.error()); return tl::unexpected(result.error()); } +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + { + const char* pin_env = std::getenv("MC_STORE_PIN_MEMORY"); + if (pin_env && + (std::string(pin_env) == "1" || + std::string(pin_env) == "true")) { + auto cuda_ret = cudaHostRegister( + client_buffer_allocator_->getBase(), local_buffer_size, + cudaHostRegisterDefault); + if (cuda_ret != cudaSuccess) { + LOG(WARNING) + << "cudaHostRegister staging buffer failed: " + << cudaGetErrorString(cuda_ret); + } else { + LOG(INFO) << "cudaHostRegister staging buffer OK, size=" + << local_buffer_size; + } + } + } +#endif { std::unique_lock lock(registered_buffer_mutex_); local_buffer_region_ = WritableBufferRegion{ @@ -773,6 +794,10 @@ tl::expected RealClient::setup_internal( LOG(INFO) << "Local buffer size is 0, skip registering local memory"; } + // Reinitialize TransferSubmitter with the staging allocator so that + // ensureRegisteredForRDMA can stage unregistered user buffers for RDMA. + client_->setStagingAllocator(client_buffer_allocator_); + // If global_segment_size is 0, skip mount segment; // If global_segment_size is larger than max_mr_size, split to multiple // mapped_shms. @@ -1681,20 +1706,14 @@ tl::expected RealClient::put_internal( LOG(ERROR) << "Client is not initialized"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } - if (!client_buffer_allocator) { - LOG(ERROR) << "Client buffer allocator is not provided"; - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } - auto alloc_result = client_buffer_allocator->allocate(value.size_bytes()); - if (!alloc_result) { - LOG(ERROR) << "Failed to allocate buffer for put operation, key: " - << key << ", value size: " << value.size(); - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } - auto &buffer_handle = *alloc_result; - memcpy(buffer_handle.ptr(), value.data(), value.size_bytes()); + // Staging is deferred to ensureRegisteredForRDMA in the transfer layer. + // For LOCAL_MEMCPY this eliminates one copy; for RDMA with registered + // buffers it also eliminates one copy; for RDMA with unregistered buffers + // the total work is the same (staging just happens later). + (void)client_buffer_allocator; - std::vector slices = split_into_slices(buffer_handle); + std::vector slices = split_into_slices( + const_cast(value.data()), value.size_bytes()); auto put_result = client_->Put(key, slices, config); if (!put_result) { @@ -1750,43 +1769,13 @@ tl::expected RealClient::put_batch_internal( LOG(ERROR) << "Key and value size mismatch"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } - if (!client_buffer_allocator) { - LOG(ERROR) << "Client buffer allocator is not provided"; - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } - std::vector buffer_handles; - std::unordered_map> batched_slices; - batched_slices.reserve(keys.size()); + (void)client_buffer_allocator; - for (size_t i = 0; i < keys.size(); ++i) { - auto &key = keys[i]; - auto &value = values[i]; - auto alloc_result = - client_buffer_allocator->allocate(value.size_bytes()); - if (!alloc_result) { - LOG(ERROR) - << "Failed to allocate buffer for put_batch operation, key: " - << key << ", value size: " << value.size(); - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } - auto &buffer_handle = *alloc_result; - memcpy(buffer_handle.ptr(), value.data(), value.size_bytes()); - auto slices = split_into_slices(buffer_handle); - buffer_handles.emplace_back(std::move(*alloc_result)); - batched_slices.emplace(key, std::move(slices)); - } - - // Convert unordered_map to vector format expected by BatchPut std::vector> ordered_batched_slices; ordered_batched_slices.reserve(keys.size()); - for (const auto &key : keys) { - auto it = batched_slices.find(key); - if (it != batched_slices.end()) { - ordered_batched_slices.emplace_back(it->second); - } else { - LOG(ERROR) << "Missing slices for key: " << key; - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } + for (size_t i = 0; i < keys.size(); ++i) { + ordered_batched_slices.emplace_back(split_into_slices( + const_cast(values[i].data()), values[i].size_bytes())); } auto results = client_->BatchPut(keys, ordered_batched_slices, config); @@ -1845,44 +1834,25 @@ tl::expected RealClient::put_parts_internal( LOG(ERROR) << "Client is not initialized"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } - if (!client_buffer_allocator) { - LOG(ERROR) << "Client buffer allocator is not provided"; - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } + (void)client_buffer_allocator; - // Calculate total size needed - size_t total_size = 0; + // Create slices directly from each part's data pointer. + // Parts are stored sequentially in the object — the server assembles + // them in order. No staging copy needed; ensureRegisteredForRDMA handles + // RDMA registration if required. + std::vector slices; for (const auto &value : values) { - total_size += value.size_bytes(); + if (value.size_bytes() == 0) continue; + auto part_slices = split_into_slices( + const_cast(value.data()), value.size_bytes()); + slices.insert(slices.end(), part_slices.begin(), part_slices.end()); } - if (total_size == 0) { + if (slices.empty()) { LOG(WARNING) << "Attempting to put empty data for key: " << key; return {}; } - // Allocate buffer using the new allocator - auto alloc_result = client_buffer_allocator->allocate(total_size); - if (!alloc_result) { - LOG(ERROR) << "Failed to allocate buffer for put_parts operation, key: " - << key << ", total size: " << total_size; - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } - - auto &buffer_handle = *alloc_result; - - // Copy all parts into the contiguous buffer - size_t offset = 0; - for (const auto &value : values) { - memcpy(static_cast(buffer_handle.ptr()) + offset, value.data(), - value.size_bytes()); - offset += value.size_bytes(); - } - - // Split into slices - std::vector slices = split_into_slices(buffer_handle); - - // Perform the put operation - buffer_handle will be automatically released auto put_result = client_->Put(key, slices, config); if (!put_result) { LOG(ERROR) << "Put operation failed with error: " @@ -3713,20 +3683,10 @@ tl::expected RealClient::upsert_internal( LOG(ERROR) << "Client is not initialized"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } - if (!client_buffer_allocator) { - LOG(ERROR) << "Client buffer allocator is not provided"; - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } - auto alloc_result = client_buffer_allocator->allocate(value.size_bytes()); - if (!alloc_result) { - LOG(ERROR) << "Failed to allocate buffer for upsert operation, key: " - << key << ", value size: " << value.size(); - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } - auto &buffer_handle = *alloc_result; - memcpy(buffer_handle.ptr(), value.data(), value.size_bytes()); + (void)client_buffer_allocator; - std::vector slices = split_into_slices(buffer_handle); + std::vector slices = split_into_slices( + const_cast(value.data()), value.size_bytes()); auto result = client_->Upsert(key, slices, config); if (!result) { @@ -3945,38 +3905,21 @@ tl::expected RealClient::upsert_parts_internal( LOG(ERROR) << "Client is not initialized"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } - if (!client_buffer_allocator) { - LOG(ERROR) << "Client buffer allocator is not provided"; - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } + (void)client_buffer_allocator; - size_t total_size = 0; + std::vector slices; for (const auto &value : values) { - total_size += value.size_bytes(); + if (value.size_bytes() == 0) continue; + auto part_slices = split_into_slices( + const_cast(value.data()), value.size_bytes()); + slices.insert(slices.end(), part_slices.begin(), part_slices.end()); } - if (total_size == 0) { + + if (slices.empty()) { LOG(WARNING) << "Attempting to upsert empty data for key: " << key; return {}; } - auto alloc_result = client_buffer_allocator->allocate(total_size); - if (!alloc_result) { - LOG(ERROR) - << "Failed to allocate buffer for upsert_parts operation, key: " - << key << ", total size: " << total_size; - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } - - auto &buffer_handle = *alloc_result; - size_t offset = 0; - for (const auto &value : values) { - memcpy(static_cast(buffer_handle.ptr()) + offset, value.data(), - value.size_bytes()); - offset += value.size_bytes(); - } - - std::vector slices = split_into_slices(buffer_handle); - auto result = client_->Upsert(key, slices, config); if (!result) { LOG(ERROR) << "Upsert operation failed with error: " @@ -4034,43 +3977,12 @@ tl::expected RealClient::upsert_batch_internal( LOG(ERROR) << "Key and value size mismatch"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } - if (!client_buffer_allocator) { - LOG(ERROR) << "Client buffer allocator is not provided"; - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } - std::vector buffer_handles; - std::unordered_map> batched_slices; - batched_slices.reserve(keys.size()); - - for (size_t i = 0; i < keys.size(); ++i) { - auto &key = keys[i]; - auto &value = values[i]; - auto alloc_result = - client_buffer_allocator->allocate(value.size_bytes()); - if (!alloc_result) { - LOG(ERROR) - << "Failed to allocate buffer for upsert_batch operation, key: " - << key << ", value size: " << value.size(); - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } - auto &buffer_handle = *alloc_result; - memcpy(buffer_handle.ptr(), value.data(), value.size_bytes()); - auto slices = split_into_slices(buffer_handle); - buffer_handles.emplace_back(std::move(*alloc_result)); - batched_slices.emplace(key, std::move(slices)); - } - - // Convert unordered_map to vector format expected by BatchUpsert + (void)client_buffer_allocator; // staging moved to ensureRegisteredForRDMA std::vector> ordered_batched_slices; ordered_batched_slices.reserve(keys.size()); - for (const auto &key : keys) { - auto it = batched_slices.find(key); - if (it != batched_slices.end()) { - ordered_batched_slices.emplace_back(it->second); - } else { - LOG(ERROR) << "Missing slices for key: " << key; - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } + for (size_t i = 0; i < keys.size(); ++i) { + ordered_batched_slices.emplace_back(split_into_slices( + const_cast(values[i].data()), values[i].size_bytes())); } auto results = client_->BatchUpsert(keys, ordered_batched_slices, config); @@ -4810,6 +4722,75 @@ RealClient::batch_put_from_multi_buffers_internal( return client_->BatchPut(keys, batched_slices, config); } +std::vector RealClient::batch_upsert_from_multi_buffers( + const std::vector &keys, + const std::vector> &all_buffers, + const std::vector> &sizes, + const ReplicateConfig &config) { + auto internal_results = + execute_timed_operation>>( + [&]() { + return batch_upsert_from_multi_buffers_internal( + keys, all_buffers, sizes, config); + }, + [](const auto &) { return true; }, + [&](uint64_t latency_us, const auto &ret) { + std::vector py_results; + py_results.reserve(ret.size()); + for (const auto &item : ret) { + py_results.push_back(to_py_ret(item)); + } + client_->ObserveTransferOperation( + TransferOperationKind::kWrite, + "batch_upsert_from_multi_buffers", + sum_successful_nested_sizes(py_results, sizes), latency_us); + }); + std::vector results; + results.reserve(internal_results.size()); + + for (const auto &result : internal_results) { + results.push_back(to_py_ret(result)); + } + + return results; +} + +std::vector> +RealClient::batch_upsert_from_multi_buffers_internal( + const std::vector &keys, + const std::vector> &all_buffers, + const std::vector> &all_sizes, + const ReplicateConfig &config) { + if (!client_) { + LOG(ERROR) << "Client is not initialized"; + return std::vector>( + keys.size(), tl::unexpected(ErrorCode::INVALID_PARAMS)); + } + + if ((keys.size() != all_buffers.size()) || + (all_buffers.size() != all_sizes.size())) { + LOG(ERROR) << "Mismatched sizes for keys, buffers, and sizes"; + return std::vector>( + keys.size(), tl::unexpected(ErrorCode::INVALID_PARAMS)); + } + + std::vector> batched_slices(keys.size()); + for (size_t i = 0; i < all_buffers.size(); ++i) { + const auto &buffers = all_buffers[i]; + const auto &sizes = all_sizes[i]; + if (buffers.size() != sizes.size()) { + LOG(ERROR) << "Mismatched buffers and sizes of key:" << keys[i]; + return std::vector>( + keys.size(), tl::unexpected(ErrorCode::INVALID_PARAMS)); + } + batched_slices[i].reserve(buffers.size()); + for (size_t j = 0; j < buffers.size(); ++j) { + batched_slices[i].emplace_back(Slice{buffers[j], sizes[j]}); + } + } + return client_->BatchUpsert(keys, batched_slices, config); +} + std::vector RealClient::batch_get_into_multi_buffers( const std::vector &keys, const std::vector> &all_buffers, diff --git a/mooncake-store/src/transfer_task.cpp b/mooncake-store/src/transfer_task.cpp index 5995e3c4fd..b4ee3fe632 100644 --- a/mooncake-store/src/transfer_task.cpp +++ b/mooncake-store/src/transfer_task.cpp @@ -903,10 +903,12 @@ TransferStrategy TransferFuture::strategy() const { TransferSubmitter::TransferSubmitter(TransferEngine& engine, std::shared_ptr& backend, const std::string& local_hostname, + std::shared_ptr staging_allocator, TransferMetric* transfer_metric, int numa_socket_id) : engine_(engine), local_endpoint_(engine.getLocalIpAndPort()), + staging_allocator_(std::move(staging_allocator)), memcpy_pool_(std::make_unique()), #ifdef USE_NOF spdk_nvmf_pool_(std::make_unique(numa_socket_id)), @@ -947,6 +949,161 @@ TransferSubmitter::TransferSubmitter(TransferEngine& engine, << memcpy_enabled_; } +std::pair, std::vector> +TransferSubmitter::ensureRegisteredForRDMA(const std::vector& slices) { + // First pass: identify unregistered slices and compute total staging size. + // Batching into one contiguous allocation avoids per-slice allocator + // overhead which can be significant (30ms+ for 33 slices at 512MB). + size_t total_staging = 0; + std::vector needs_staging(slices.size(), false); + for (size_t i = 0; i < slices.size(); ++i) { + if (!engine_.isLocalMemoryRegistered(slices[i].ptr)) { + needs_staging[i] = true; + total_staging += slices[i].size; + } + } + + // Fast path: all slices already registered — still need kMaxSliceSize + // splitting because callers (e.g. multi_buffers) may pass large slices. + if (total_staging == 0) { + std::vector split; + split.reserve(slices.size()); + for (const auto& s : slices) { + auto sub = split_into_slices(s.ptr, s.size); + split.insert(split.end(), sub.begin(), sub.end()); + } + return std::make_pair(std::move(split), std::vector{}); + } + + if (!staging_allocator_) { + LOG(ERROR) << "No staging allocator for unregistered RDMA slices"; + return std::make_pair(std::vector{}, std::vector{}); + } + + // Allocate ONE contiguous staging buffer for all unregistered data. + auto staging_alloc = staging_allocator_->allocate(total_staging); + if (!staging_alloc) { + LOG(ERROR) << "Staging alloc failed, total_size=" << total_staging; + return std::make_pair(std::vector{}, std::vector{}); + } + + char* staging_base = static_cast(staging_alloc->ptr()); + size_t staging_offset = 0; + + std::vector prepared; + std::vector staging_handles; + prepared.reserve(slices.size()); + +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + // Check whether the async staging path is enabled (env checked per call + // so benchmarks can toggle mid-process; cost is negligible vs copy work). + { + const char* async_env = std::getenv("MC_STORE_ASYNC_STAGING"); + const bool kAsyncStaging = + async_env && + (std::string(async_env) == "1" || std::string(async_env) == "true"); + // Async path: uses Driver API (CUDA) or Runtime API (MUSA/MACA) to + // batch GPU→host copies on one stream, then synchronise once. + // Driver API avoids deadlocking with PyTorch's CUDA Runtime mutex. + if (kAsyncStaging) { + auto stream = gpu_staging::GetCopyStream(); + if (stream) { + + // Collect GPU→staging copies for batch submission. + std::vector batch_dsts; + std::vector batch_srcs; + std::vector batch_sizes; + + for (size_t i = 0; i < slices.size(); ++i) { + if (!needs_staging[i]) { + // Already registered — split by kMaxSliceSize for RDMA limits. + auto sub = split_into_slices(slices[i].ptr, slices[i].size); + prepared.insert(prepared.end(), sub.begin(), sub.end()); + } else { + void* staged_ptr = staging_base + staging_offset; + staging_offset += slices[i].size; + + int src_dev = -1; + if (gpu_staging::IsDevicePointer(slices[i].ptr, &src_dev)) { + // GPU source: enqueue for batch async copy. + gpu_staging::SetDevice(src_dev); + batch_dsts.push_back(staged_ptr); + batch_srcs.push_back(slices[i].ptr); + batch_sizes.push_back(slices[i].size); + } else { + // Host→host: plain memcpy is faster than GPU runtime. + std::memcpy(staged_ptr, slices[i].ptr, slices[i].size); + } + + // Split staged data by kMaxSliceSize for RDMA transfer limits + size_t remaining = slices[i].size; + size_t split_offset = 0; + while (remaining > 0) { + size_t chunk = std::min(remaining, kMaxSliceSize); + prepared.emplace_back( + Slice{static_cast(staged_ptr) + split_offset, + chunk}); + split_offset += chunk; + remaining -= chunk; + } + } + } + + // Submit all GPU copies in one batch call, then sync once. + if (!batch_dsts.empty()) { + if (!gpu_staging::CopyBatchAsync( + batch_dsts.data(), batch_srcs.data(), batch_sizes.data(), + batch_dsts.size(), stream)) { + LOG(ERROR) << "CopyBatchAsync failed"; + return std::make_pair(std::vector{}, std::vector{}); + } + if (!gpu_staging::SyncCopyStream(stream)) { + LOG(ERROR) << "SyncCopyStream failed"; + return std::make_pair(std::vector{}, std::vector{}); + } + } + staging_handles.push_back(std::move(*staging_alloc)); + return {std::move(prepared), std::move(staging_handles)}; + + } // if (stream) + // stream creation failed — fall through to sync path + } + } // kAsyncStaging scope +#endif // USE_CUDA / USE_MUSA / ... + + // Sync path (default): copy each unregistered slice into the contiguous + // staging buffer. MemcpySafe auto-detects GPU vs CPU pointers. + // Staged slices are split by kMaxSliceSize to match RDMA transfer limits. + for (size_t i = 0; i < slices.size(); ++i) { + if (!needs_staging[i]) { + // Already registered — split by kMaxSliceSize for RDMA limits. + auto sub = split_into_slices(slices[i].ptr, slices[i].size); + prepared.insert(prepared.end(), sub.begin(), sub.end()); + } else { + void* staged_ptr = staging_base + staging_offset; + staging_offset += slices[i].size; + if (!gpu_staging::MemcpySafe(staged_ptr, slices[i].ptr, + slices[i].size)) { + LOG(ERROR) << "MemcpySafe failed during RDMA staging"; + return std::make_pair(std::vector{}, std::vector{}); + } + // Split staged data by kMaxSliceSize for RDMA transfer limits + size_t remaining = slices[i].size; + size_t split_offset = 0; + while (remaining > 0) { + size_t chunk = std::min(remaining, kMaxSliceSize); + prepared.emplace_back( + Slice{static_cast(staged_ptr) + split_offset, chunk}); + split_offset += chunk; + remaining -= chunk; + } + } + } + staging_handles.push_back(std::move(*staging_alloc)); + return {std::move(prepared), std::move(staging_handles)}; +} + std::optional TransferSubmitter::submit( const Replica::Descriptor& replica, std::vector& slices, TransferRequest::OpCode op_code, void* ptr, size_t size) { @@ -969,10 +1126,20 @@ std::optional TransferSubmitter::submit( case TransferStrategy::LOCAL_MEMCPY: future = submitMemcpyOperation(handle, slices, op_code); break; - case TransferStrategy::TRANSFER_ENGINE: - future = - submitTransferEngineOperation(handle, slices, op_code); + case TransferStrategy::TRANSFER_ENGINE: { + auto [prepared, staging] = + ensureRegisteredForRDMA(slices); + if (prepared.empty() && !slices.empty()) { + LOG(ERROR) << "ensureRegisteredForRDMA failed"; + return std::nullopt; + } + future = submitTransferEngineOperation( + handle, prepared, op_code); + if (future && !staging.empty()) { + future->attachStagingHandles(std::move(staging)); + } break; + } default: LOG(ERROR) << "Unknown transfer strategy: " << strategy; return std::nullopt; @@ -1008,8 +1175,13 @@ std::optional TransferSubmitter::submit_batch( const std::vector& replicas, std::vector>& all_slices, TransferRequest::OpCode op_code) { + // NOTE: submit_batch always goes through TRANSFER_ENGINE (RDMA), never + // LOCAL_MEMCPY. The selectStrategy dispatch is in submit() for single- + // replica transfers; BatchPut/BatchUpsert only call submit_batch when + // all replicas are remote (memory-backed, non-local). std::optional future; std::vector requests; + std::vector all_staging_handles; for (size_t i = 0; i < replicas.size(); ++i) { auto& replica = replicas[i]; auto& slices = all_slices[i]; @@ -1017,6 +1189,23 @@ std::optional TransferSubmitter::submit_batch( if (!validateTransferParams(mem_desc.buffer_descriptor, slices)) { return std::nullopt; } + + // For WRITE ops, ensure slices are in RDMA-registered memory. + const std::vector* effective_slices = &slices; + std::vector prepared; + if (op_code == TransferRequest::WRITE) { + auto [p, staging] = ensureRegisteredForRDMA(slices); + if (p.empty() && !slices.empty()) { + LOG(ERROR) << "ensureRegisteredForRDMA failed in submit_batch"; + return std::nullopt; + } + prepared = std::move(p); + effective_slices = &prepared; + for (auto& h : staging) { + all_staging_handles.push_back(std::move(h)); + } + } + auto& handle = mem_desc.buffer_descriptor; uint64_t offset = 0; SegmentHandle seg = engine_.openSegment(handle.transport_endpoint_); @@ -1025,7 +1214,7 @@ std::optional TransferSubmitter::submit_batch( << handle.transport_endpoint_; return std::nullopt; } - for (auto slice : slices) { + for (const auto& slice : *effective_slices) { TransferRequest request; request.opcode = op_code; request.source = static_cast(slice.ptr); @@ -1039,6 +1228,9 @@ std::optional TransferSubmitter::submit_batch( future = submitTransfer(requests); // Update metrics on successful submission if (future.has_value()) { + if (!all_staging_handles.empty()) { + future->attachStagingHandles(std::move(all_staging_handles)); + } for (auto& slices : all_slices) { updateTransferMetrics(slices, op_code); } @@ -1091,39 +1283,50 @@ std::optional TransferSubmitter::submitMemcpyOperation( const TransferRequest::OpCode op_code, uint64_t src_offset) { auto state = std::make_shared(); - // Create memcpy operations - std::vector operations; - operations.reserve(slices.size()); uint64_t base_address = static_cast(handle.buffer_address_); uint64_t offset = src_offset; + // Execute memcpy inline on the calling thread instead of going through + // the worker pool. This eliminates thread synchronization overhead and + // keeps data on the same NUMA node. The worker pool (1 thread) added + // ~30ms overhead for 512MB on eRDMA hardware. + bool ok = true; for (size_t i = 0; i < slices.size(); ++i) { const auto& slice = slices[i]; - if (slice.ptr == nullptr) continue; void* dest; const void* src; - if (op_code == TransferRequest::READ) { - // READ: from handle (remote buffer) to slice (local buffer) dest = slice.ptr; src = reinterpret_cast(base_address + offset); } else { - // WRITE: from slice (local buffer) to handle (remote buffer) dest = reinterpret_cast(base_address + offset); src = slice.ptr; } offset += slice.size; - operations.emplace_back(dest, src, slice.size); + int src_dev = -1, dst_dev = -1; + bool src_on_gpu = gpu_staging::IsDevicePointer(src, &src_dev); + bool dst_on_gpu = gpu_staging::IsDevicePointer(dest, &dst_dev); + + if (!src_on_gpu && !dst_on_gpu) { + std::memcpy(dest, src, slice.size); + } else { + int dev = src_on_gpu ? src_dev : dst_dev; + gpu_staging::SetDevice(dev); + if (!gpu_staging::CopyAuto(dest, src, slice.size)) { + LOG(ERROR) << "GPU memcpy failed: src_dev=" << src_dev + << " dst_dev=" << dst_dev << " size=" << slice.size; + ok = false; + break; + } + } } - // Submit memcpy operations to worker pool for async execution - MemcpyTask task(std::move(operations), state); - memcpy_pool_->submitTask(std::move(task)); + state->set_completed(ok ? ErrorCode::OK : ErrorCode::TRANSFER_FAIL); - VLOG(1) << "Memcpy transfer submitted to worker pool with " << slices.size() + VLOG(1) << "Memcpy transfer completed inline with " << slices.size() << " operations"; return TransferFuture(state); @@ -1317,18 +1520,10 @@ std::optional TransferSubmitter::submitFileReadOperation( TransferStrategy TransferSubmitter::selectStrategy( const AllocatedBuffer::Descriptor& handle, const std::vector& slices) const { - // Check if memcpy operations are enabled via environment variable - if (!memcpy_enabled_) { - VLOG(2) << "Memcpy operations disabled via MC_STORE_MEMCPY environment " - "variable"; - return TransferStrategy::TRANSFER_ENGINE; - } - - // Check conditions for local memcpy optimization - if (isLocalTransfer(handle)) { + (void)slices; + if (memcpy_enabled_ && isLocalTransfer(handle)) { return TransferStrategy::LOCAL_MEMCPY; } - return TransferStrategy::TRANSFER_ENGINE; } diff --git a/mooncake-transfer-engine/include/transfer_engine.h b/mooncake-transfer-engine/include/transfer_engine.h index 80a31edc98..b7a9664c74 100644 --- a/mooncake-transfer-engine/include/transfer_engine.h +++ b/mooncake-transfer-engine/include/transfer_engine.h @@ -184,6 +184,8 @@ class TransferEngine { bool checkOverlap(void* addr, uint64_t length); + bool isLocalMemoryRegistered(const void* addr); + void setAutoDiscover(bool auto_discover); void* getBaseAddr(); diff --git a/mooncake-transfer-engine/include/transfer_engine_impl.h b/mooncake-transfer-engine/include/transfer_engine_impl.h index b1e4fff7e8..fd01a90288 100644 --- a/mooncake-transfer-engine/include/transfer_engine_impl.h +++ b/mooncake-transfer-engine/include/transfer_engine_impl.h @@ -361,6 +361,8 @@ class TransferEngineImpl { bool checkOverlap(void* addr, uint64_t length); + bool isLocalMemoryRegistered(const void* addr); + #ifdef ENABLE_MULTI_PROTOCOL struct RegisteredRecord { Transport* transport; diff --git a/mooncake-transfer-engine/src/transfer_engine.cpp b/mooncake-transfer-engine/src/transfer_engine.cpp index 3747244e76..1d5510a8ee 100644 --- a/mooncake-transfer-engine/src/transfer_engine.cpp +++ b/mooncake-transfer-engine/src/transfer_engine.cpp @@ -204,6 +204,10 @@ bool TransferEngine::checkOverlap(void* addr, uint64_t length) { return impl_->checkOverlap(addr, length); } +bool TransferEngine::isLocalMemoryRegistered(const void* addr) { + return impl_->isLocalMemoryRegistered(addr); +} + void TransferEngine::setAutoDiscover(bool auto_discover) { impl_->setAutoDiscover(auto_discover); } @@ -637,6 +641,11 @@ bool TransferEngine::checkOverlap(void* addr, uint64_t length) { return false; } +bool TransferEngine::isLocalMemoryRegistered(const void* addr) { + if (!use_tent_) return impl_->isLocalMemoryRegistered(addr); + return false; // Conservative for TENT mode +} + void TransferEngine::setAutoDiscover(bool auto_discover) { if (!use_tent_) impl_->setAutoDiscover(auto_discover); } diff --git a/mooncake-transfer-engine/src/transfer_engine_impl.cpp b/mooncake-transfer-engine/src/transfer_engine_impl.cpp index 83960ad575..2c96462ef6 100644 --- a/mooncake-transfer-engine/src/transfer_engine_impl.cpp +++ b/mooncake-transfer-engine/src/transfer_engine_impl.cpp @@ -561,6 +561,12 @@ bool TransferEngineImpl::checkOverlap(void* addr, uint64_t length) { return hasOverlapLocked(reinterpret_cast(addr), length); } +bool TransferEngineImpl::isLocalMemoryRegistered(const void* addr) { + std::shared_lock lock(mutex_); + return findMemoryRegionContaining(reinterpret_cast(addr)) + != local_memory_regions_.end(); +} + int TransferEngineImpl::registerLocalMemory(void* addr, size_t length, const std::string& location, bool remote_accessible, From fc7aa5ea9c54b325f016f8f9b7c3e881946da6e0 Mon Sep 17 00:00:00 2001 From: Cruz Zhao Date: Mon, 22 Jun 2026 04:18:10 +0000 Subject: [PATCH 02/15] [Store] Enable cudaHostRegister by default for GPU builds MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pin staging and segment buffers by default so that GPU→host copies use direct DMA instead of CUDA's internal staging through a temporary pinned buffer. This roughly doubles GPU→CPU memcpy bandwidth on PCIe. Opt out with MC_STORE_PIN_MEMORY=0 if pinning conflicts with other GPU workloads or exceeds the locked-page limit. Co-Authored-By: Claude Opus 4.6 --- mooncake-store/src/client_service.cpp | 9 ++++++--- mooncake-store/src/real_client.cpp | 9 ++++++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/mooncake-store/src/client_service.cpp b/mooncake-store/src/client_service.cpp index 8b8b43a581..5044cb0dcd 100644 --- a/mooncake-store/src/client_service.cpp +++ b/mooncake-store/src/client_service.cpp @@ -2797,10 +2797,13 @@ tl::expected Client::MountSegmentAndGetId( #if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ defined(USE_HYGON) || defined(USE_COREX) { + // Pin segment buffer by default so GPU→host copies use DMA + // instead of CUDA's internal staging. Opt out: MC_STORE_PIN_MEMORY=0 const char* pin_env = std::getenv("MC_STORE_PIN_MEMORY"); - if (pin_env && - (std::string(pin_env) == "1" || - std::string(pin_env) == "true")) { + bool pin_memory = !(pin_env && + (std::string(pin_env) == "0" || + std::string(pin_env) == "false")); + if (pin_memory) { auto cuda_ret = cudaHostRegister((void*)buffer, size, cudaHostRegisterDefault); if (cuda_ret != cudaSuccess) { diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index 3b6a010ceb..8f7803df0a 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -764,10 +764,13 @@ tl::expected RealClient::setup_internal( #if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ defined(USE_HYGON) || defined(USE_COREX) { + // Pin staging buffer by default so GPU→host copies use DMA + // instead of CUDA's internal staging. Opt out: MC_STORE_PIN_MEMORY=0 const char* pin_env = std::getenv("MC_STORE_PIN_MEMORY"); - if (pin_env && - (std::string(pin_env) == "1" || - std::string(pin_env) == "true")) { + bool pin_memory = !(pin_env && + (std::string(pin_env) == "0" || + std::string(pin_env) == "false")); + if (pin_memory) { auto cuda_ret = cudaHostRegister( client_buffer_allocator_->getBase(), local_buffer_size, cudaHostRegisterDefault); From 774b6659e55a455710255c4002cfcb51926e6b9a Mon Sep 17 00:00:00 2001 From: Cruz Zhao Date: Wed, 24 Jun 2026 06:51:50 +0000 Subject: [PATCH 03/15] [Store] Remove async GPU staging path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove the MC_STORE_ASYNC_STAGING path and its CUDA/MUSA async-copy helpers. Benchmarking showed no stable benefit for GPU→RDMA staging: single put_from was 0.97x-1.01x and small batch cases regressed up to 0.93x, while the synchronous MemcpySafe path remains simpler and stable. Pinned host memory remains enabled by default for GPU builds, which provides the material GPU→CPU gain. Co-Authored-By: Claude Opus 4.6 --- mooncake-store/include/gpu_staging_utils.h | 152 --------------------- mooncake-store/src/transfer_task.cpp | 77 ----------- 2 files changed, 229 deletions(-) diff --git a/mooncake-store/include/gpu_staging_utils.h b/mooncake-store/include/gpu_staging_utils.h index d2bb26fd7b..33b5b4563c 100644 --- a/mooncake-store/include/gpu_staging_utils.h +++ b/mooncake-store/include/gpu_staging_utils.h @@ -178,158 +178,6 @@ inline bool MemcpySafe(void* dst, const void* src, size_t size) { return CopyAuto(dst, src, size); } -// ── Async copy support ───────────────────────────────────────────────── -// Used by ensureRegisteredForRDMA to batch multiple GPU→host copies on a -// single stream, synchronise once, amortising per-call overhead. -// -// Design choices (borrowed from community PRs): -// - CUDA/HYGON/COREX: use Driver API (cuMemcpyAsync) instead of Runtime -// API (cudaMemcpyAsync) to avoid deadlocking with PyTorch's global -// CUDA Runtime mutex. See: github.com/kvcache-ai/Mooncake/pull/2094 -// - CUDA 12.8+: cudaMemcpyBatchAsync batches all copies into one driver -// call for lower per-transfer overhead. -// See: github.com/kvcache-ai/Mooncake/pull/1890 -// - MUSA/MACA: use Runtime API (no known deadlock issue with these runtimes). - -#if defined(USE_CUDA) || defined(USE_HYGON) || defined(USE_COREX) -// ── CUDA Driver API path ─────────────────────────────────────────────── - -/// One-shot Driver API initialization. -inline bool InitDriverAPI() { - static CUresult init_result = cuInit(0); - return init_result == CUDA_SUCCESS; -} - -/// Thread-local stream type for CUDA Driver API. -using AsyncCopyStream = CUstream; - -/// Get (or lazily create) a thread-local CUDA stream via Driver API. -inline AsyncCopyStream GetCopyStream() { - thread_local CUstream stream = nullptr; - if (!stream) { - if (!InitDriverAPI()) return nullptr; - auto err = cuStreamCreate(&stream, CU_STREAM_NON_BLOCKING); - if (err != CUDA_SUCCESS) { - const char* err_str = nullptr; - cuGetErrorString(err, &err_str); - LOG(WARNING) << "cuStreamCreate failed: " - << (err_str ? err_str : "unknown") - << "; falling back to sync"; - stream = nullptr; - } - } - return stream; -} - -/// Enqueue a single async copy via Driver API (auto-detects direction via UVA). -inline bool CopyAutoAsync(void* dst, const void* src, size_t size, - AsyncCopyStream stream) { - return cuMemcpyAsync(reinterpret_cast(dst), - reinterpret_cast(src), - size, stream) == CUDA_SUCCESS; -} - -/// Batch-enqueue multiple async copies. Uses cudaMemcpyBatchAsync on -/// CUDA 12.8+ for lower per-call overhead; falls back to looped -/// cuMemcpyAsync on older toolkits. -/// Returns false on any copy failure. -inline bool CopyBatchAsync(void* const* dsts, const void* const* srcs, - const size_t* sizes, size_t count, - AsyncCopyStream stream) { -#if CUDART_VERSION >= 13000 - // CUDA 13+: const void** signature, no fail_idx. - cudaMemcpyAttributes attr{}; - attr.srcAccessOrder = cudaMemcpySrcAccessOrderStream; - size_t attrs_idx = 0; - return cudaMemcpyBatchAsync( - const_cast(reinterpret_cast( - const_cast(dsts))), - const_cast(reinterpret_cast( - const_cast(srcs))), - sizes, count, &attr, &attrs_idx, 1, - reinterpret_cast(stream)) == cudaSuccess; -#elif CUDART_VERSION >= 12080 - // CUDA 12.8+: void** signature with fail_idx. - cudaMemcpyAttributes attr{}; - attr.srcAccessOrder = cudaMemcpySrcAccessOrderStream; - size_t attrs_idx = 0; - size_t fail_idx = count; - auto err = cudaMemcpyBatchAsync( - const_cast(reinterpret_cast(dsts)), - const_cast(reinterpret_cast(srcs)), - sizes, count, &attr, &attrs_idx, 1, &fail_idx, - reinterpret_cast(stream)); - if (err != cudaSuccess) { - LOG(ERROR) << "cudaMemcpyBatchAsync failed at index " << fail_idx - << ": " << cudaGetErrorString(err); - } - return err == cudaSuccess; -#else - // Pre-12.8: loop cuMemcpyAsync (Driver API, deadlock-safe). - for (size_t i = 0; i < count; ++i) { - auto err = cuMemcpyAsync(reinterpret_cast(dsts[i]), - reinterpret_cast(srcs[i]), - sizes[i], stream); - if (err != CUDA_SUCCESS) { - const char* err_str = nullptr; - cuGetErrorString(err, &err_str); - LOG(ERROR) << "cuMemcpyAsync failed at index " << i << ": " - << (err_str ? err_str : "unknown"); - return false; - } - } - return true; -#endif -} - -/// Synchronize the copy stream (Driver API). -inline bool SyncCopyStream(AsyncCopyStream stream) { - return cuStreamSynchronize(stream) == CUDA_SUCCESS; -} - -#elif defined(USE_MUSA) || defined(USE_MACA) -// ── MUSA/MACA Runtime API path ───────────────────────────────────────── - -using AsyncCopyStream = cudaStream_t; - -inline AsyncCopyStream GetCopyStream() { - thread_local cudaStream_t stream = nullptr; - if (!stream) { - auto err = cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking); - if (err != cudaSuccess) { - LOG(WARNING) << "cudaStreamCreate failed: " - << cudaGetErrorString(err); - stream = nullptr; - } - } - return stream; -} - -inline bool CopyAutoAsync(void* dst, const void* src, size_t size, - AsyncCopyStream stream) { - return cudaMemcpyAsync(dst, src, size, cudaMemcpyDefault, stream) == - cudaSuccess; -} - -inline bool CopyBatchAsync(void* const* dsts, const void* const* srcs, - const size_t* sizes, size_t count, - AsyncCopyStream stream) { - for (size_t i = 0; i < count; ++i) { - if (cudaMemcpyAsync(const_cast( - static_cast(dsts[i])), - srcs[i], sizes[i], cudaMemcpyDefault, - stream) != cudaSuccess) { - return false; - } - } - return true; -} - -inline bool SyncCopyStream(AsyncCopyStream stream) { - return cudaStreamSynchronize(stream) == cudaSuccess; -} - -#endif // platform selection } // namespace gpu_staging } // namespace mooncake diff --git a/mooncake-store/src/transfer_task.cpp b/mooncake-store/src/transfer_task.cpp index b4ee3fe632..a6d6d05172 100644 --- a/mooncake-store/src/transfer_task.cpp +++ b/mooncake-store/src/transfer_task.cpp @@ -994,83 +994,6 @@ TransferSubmitter::ensureRegisteredForRDMA(const std::vector& slices) { std::vector staging_handles; prepared.reserve(slices.size()); -#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ - defined(USE_HYGON) || defined(USE_COREX) - // Check whether the async staging path is enabled (env checked per call - // so benchmarks can toggle mid-process; cost is negligible vs copy work). - { - const char* async_env = std::getenv("MC_STORE_ASYNC_STAGING"); - const bool kAsyncStaging = - async_env && - (std::string(async_env) == "1" || std::string(async_env) == "true"); - // Async path: uses Driver API (CUDA) or Runtime API (MUSA/MACA) to - // batch GPU→host copies on one stream, then synchronise once. - // Driver API avoids deadlocking with PyTorch's CUDA Runtime mutex. - if (kAsyncStaging) { - auto stream = gpu_staging::GetCopyStream(); - if (stream) { - - // Collect GPU→staging copies for batch submission. - std::vector batch_dsts; - std::vector batch_srcs; - std::vector batch_sizes; - - for (size_t i = 0; i < slices.size(); ++i) { - if (!needs_staging[i]) { - // Already registered — split by kMaxSliceSize for RDMA limits. - auto sub = split_into_slices(slices[i].ptr, slices[i].size); - prepared.insert(prepared.end(), sub.begin(), sub.end()); - } else { - void* staged_ptr = staging_base + staging_offset; - staging_offset += slices[i].size; - - int src_dev = -1; - if (gpu_staging::IsDevicePointer(slices[i].ptr, &src_dev)) { - // GPU source: enqueue for batch async copy. - gpu_staging::SetDevice(src_dev); - batch_dsts.push_back(staged_ptr); - batch_srcs.push_back(slices[i].ptr); - batch_sizes.push_back(slices[i].size); - } else { - // Host→host: plain memcpy is faster than GPU runtime. - std::memcpy(staged_ptr, slices[i].ptr, slices[i].size); - } - - // Split staged data by kMaxSliceSize for RDMA transfer limits - size_t remaining = slices[i].size; - size_t split_offset = 0; - while (remaining > 0) { - size_t chunk = std::min(remaining, kMaxSliceSize); - prepared.emplace_back( - Slice{static_cast(staged_ptr) + split_offset, - chunk}); - split_offset += chunk; - remaining -= chunk; - } - } - } - - // Submit all GPU copies in one batch call, then sync once. - if (!batch_dsts.empty()) { - if (!gpu_staging::CopyBatchAsync( - batch_dsts.data(), batch_srcs.data(), batch_sizes.data(), - batch_dsts.size(), stream)) { - LOG(ERROR) << "CopyBatchAsync failed"; - return std::make_pair(std::vector{}, std::vector{}); - } - if (!gpu_staging::SyncCopyStream(stream)) { - LOG(ERROR) << "SyncCopyStream failed"; - return std::make_pair(std::vector{}, std::vector{}); - } - } - staging_handles.push_back(std::move(*staging_alloc)); - return {std::move(prepared), std::move(staging_handles)}; - - } // if (stream) - // stream creation failed — fall through to sync path - } - } // kAsyncStaging scope -#endif // USE_CUDA / USE_MUSA / ... // Sync path (default): copy each unregistered slice into the contiguous // staging buffer. MemcpySafe auto-detects GPU vs CPU pointers. From 49078b3e45d760b98c28f710fcfea329920cb801 Mon Sep 17 00:00:00 2001 From: Cruz Zhao Date: Wed, 24 Jun 2026 08:23:55 +0000 Subject: [PATCH 04/15] [Store] Address zero-copy write review comments Handle zero-sized copies safely and reduce redundant GPU pointer queries in the memcpy submit path. Co-Authored-By: Claude Opus 4.6 --- mooncake-store/include/gpu_staging_utils.h | 1 + mooncake-store/src/real_client.cpp | 2 +- mooncake-store/src/transfer_task.cpp | 34 +++++++++++++--------- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/mooncake-store/include/gpu_staging_utils.h b/mooncake-store/include/gpu_staging_utils.h index 33b5b4563c..1fafca01dd 100644 --- a/mooncake-store/include/gpu_staging_utils.h +++ b/mooncake-store/include/gpu_staging_utils.h @@ -166,6 +166,7 @@ inline bool IsHostPointer(const void* ptr) { /// GPU-safe memcpy: auto-detects pointer types and dispatches to CopyAuto /// for GPU pointers, std::memcpy for host pointers. inline bool MemcpySafe(void* dst, const void* src, size_t size) { + if (size == 0) return true; int src_dev = -1, dst_dev = -1; bool src_gpu = IsDevicePointer(src, &src_dev); bool dst_gpu = IsDevicePointer(dst, &dst_dev); diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index 8f7803df0a..df6109bfb9 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -770,7 +770,7 @@ tl::expected RealClient::setup_internal( bool pin_memory = !(pin_env && (std::string(pin_env) == "0" || std::string(pin_env) == "false")); - if (pin_memory) { + if (pin_memory && local_buffer_size > 0) { auto cuda_ret = cudaHostRegister( client_buffer_allocator_->getBase(), local_buffer_size, cudaHostRegisterDefault); diff --git a/mooncake-store/src/transfer_task.cpp b/mooncake-store/src/transfer_task.cpp index a6d6d05172..22e01d35b9 100644 --- a/mooncake-store/src/transfer_task.cpp +++ b/mooncake-store/src/transfer_task.cpp @@ -1011,16 +1011,9 @@ TransferSubmitter::ensureRegisteredForRDMA(const std::vector& slices) { LOG(ERROR) << "MemcpySafe failed during RDMA staging"; return std::make_pair(std::vector{}, std::vector{}); } - // Split staged data by kMaxSliceSize for RDMA transfer limits - size_t remaining = slices[i].size; - size_t split_offset = 0; - while (remaining > 0) { - size_t chunk = std::min(remaining, kMaxSliceSize); - prepared.emplace_back( - Slice{static_cast(staged_ptr) + split_offset, chunk}); - split_offset += chunk; - remaining -= chunk; - } + // Split staged data by kMaxSliceSize for RDMA transfer limits. + auto sub = split_into_slices(staged_ptr, slices[i].size); + prepared.insert(prepared.end(), sub.begin(), sub.end()); } } staging_handles.push_back(std::move(*staging_alloc)); @@ -1209,6 +1202,13 @@ std::optional TransferSubmitter::submitMemcpyOperation( uint64_t base_address = static_cast(handle.buffer_address_); uint64_t offset = src_offset; + int base_dev = -1; + bool base_on_gpu = false; + if (base_address != 0) { + base_on_gpu = gpu_staging::IsDevicePointer( + reinterpret_cast(base_address), &base_dev); + } + // Execute memcpy inline on the calling thread instead of going through // the worker pool. This eliminates thread synchronization overhead and // keeps data on the same NUMA node. The worker pool (1 thread) added @@ -1216,7 +1216,7 @@ std::optional TransferSubmitter::submitMemcpyOperation( bool ok = true; for (size_t i = 0; i < slices.size(); ++i) { const auto& slice = slices[i]; - if (slice.ptr == nullptr) continue; + if (slice.ptr == nullptr || slice.size == 0) continue; void* dest; const void* src; @@ -1229,9 +1229,15 @@ std::optional TransferSubmitter::submitMemcpyOperation( } offset += slice.size; - int src_dev = -1, dst_dev = -1; - bool src_on_gpu = gpu_staging::IsDevicePointer(src, &src_dev); - bool dst_on_gpu = gpu_staging::IsDevicePointer(dest, &dst_dev); + int slice_dev = -1; + bool slice_on_gpu = gpu_staging::IsDevicePointer(slice.ptr, &slice_dev); + + bool src_on_gpu = + (op_code == TransferRequest::READ) ? base_on_gpu : slice_on_gpu; + int src_dev = (op_code == TransferRequest::READ) ? base_dev : slice_dev; + bool dst_on_gpu = + (op_code == TransferRequest::READ) ? slice_on_gpu : base_on_gpu; + int dst_dev = (op_code == TransferRequest::READ) ? slice_dev : base_dev; if (!src_on_gpu && !dst_on_gpu) { std::memcpy(dest, src, slice.size); From dc769ca8da19ba20f7fc54c54473be2c19e0fba4 Mon Sep 17 00:00:00 2001 From: Cruz Zhao Date: Wed, 24 Jun 2026 09:18:10 +0000 Subject: [PATCH 05/15] [Store] Add bounded pinned host memory Cap Store-managed cudaHostRegister usage with MC_STORE_PIN_MEMORY_MAX_BYTES and keep formatting changes targeted. Co-Authored-By: Claude Opus 4.6 --- mooncake-integration/store/store_py.cpp | 18 +-- mooncake-store/include/gpu_staging_utils.h | 110 ++++++++++++++++++ mooncake-store/include/transfer_task.h | 11 +- mooncake-store/src/client_service.cpp | 25 +--- mooncake-store/src/real_client.cpp | 39 +++---- mooncake-store/src/transfer_task.cpp | 28 ++--- .../src/transfer_engine_impl.cpp | 4 +- 7 files changed, 160 insertions(+), 75 deletions(-) diff --git a/mooncake-integration/store/store_py.cpp b/mooncake-integration/store/store_py.cpp index 040d69f233..ad11cd1c77 100644 --- a/mooncake-integration/store/store_py.cpp +++ b/mooncake-integration/store/store_py.cpp @@ -818,10 +818,11 @@ class MooncakeStorePyWrapper { continue; } valid_keys.push_back(keys[i]); - all_buffers.push_back( - {const_cast(reinterpret_cast( - &infos[i].metadata)), - reinterpret_cast(infos[i].data_ptr)}); + all_buffers.push_back({const_cast( + reinterpret_cast( + &infos[i].metadata)), + reinterpret_cast( + infos[i].data_ptr)}); all_sizes.push_back({infos[i].metadata.header.data_offset, infos[i].tensor_size}); original_indices.push_back(i); @@ -1411,10 +1412,11 @@ class MooncakeStorePyWrapper { if (!infos[i].valid()) continue; valid_keys.push_back(keys[i]); - all_buffers.push_back( - {const_cast(reinterpret_cast( - &infos[i].metadata)), - reinterpret_cast(infos[i].data_ptr)}); + all_buffers.push_back({const_cast( + reinterpret_cast( + &infos[i].metadata)), + reinterpret_cast( + infos[i].data_ptr)}); all_sizes.push_back({infos[i].metadata.header.data_offset, infos[i].tensor_size}); original_indices.push_back(i); diff --git a/mooncake-store/include/gpu_staging_utils.h b/mooncake-store/include/gpu_staging_utils.h index 1fafca01dd..58ddc689c1 100644 --- a/mooncake-store/include/gpu_staging_utils.h +++ b/mooncake-store/include/gpu_staging_utils.h @@ -7,7 +7,12 @@ #endif #include +#include #include +#include +#include +#include +#include #include namespace mooncake { @@ -179,6 +184,111 @@ inline bool MemcpySafe(void* dst, const void* src, size_t size) { return CopyAuto(dst, src, size); } +inline bool PinMemoryEnabled() { + const char* pin_env = std::getenv("MC_STORE_PIN_MEMORY"); + return !(pin_env && + (std::string(pin_env) == "0" || std::string(pin_env) == "false")); +} + +inline size_t PinMemoryMaxBytes() { + const char* max_env = std::getenv("MC_STORE_PIN_MEMORY_MAX_BYTES"); + if (!max_env || max_env[0] == '\0') return 0; + char* end = nullptr; + unsigned long long value = std::strtoull(max_env, &end, 10); + if (end == max_env) return 0; + if (value > std::numeric_limits::max()) { + return std::numeric_limits::max(); + } + return static_cast(value); +} + +inline std::mutex& PinnedHostMemoryMutex() { + static std::mutex mutex; + return mutex; +} + +inline size_t& PinnedHostMemoryBytes() { + static size_t bytes = 0; + return bytes; +} + +inline std::unordered_map& PinnedHostMemoryRegions() { + static std::unordered_map regions; + return regions; +} + +inline bool TryPinHostMemory(void* ptr, size_t size, const char* name) { +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + if (!PinMemoryEnabled() || ptr == nullptr || size == 0) return false; + + { + std::lock_guard lock(PinnedHostMemoryMutex()); + auto& regions = PinnedHostMemoryRegions(); + if (regions.find(ptr) != regions.end()) return true; + + size_t max_bytes = PinMemoryMaxBytes(); + size_t current = PinnedHostMemoryBytes(); + if (max_bytes > 0 && + (current > max_bytes || size > max_bytes - current)) { + LOG(WARNING) << "Skip cudaHostRegister for " << name << " size=" + << size << " because MC_STORE_PIN_MEMORY_MAX_BYTES=" + << max_bytes << " current_pinned=" << current; + return false; + } + PinnedHostMemoryBytes() += size; + } + + auto cuda_ret = cudaHostRegister(ptr, size, cudaHostRegisterDefault); + if (cuda_ret != cudaSuccess) { + std::lock_guard lock(PinnedHostMemoryMutex()); + PinnedHostMemoryBytes() -= size; + LOG(WARNING) << "cudaHostRegister failed for " << name << " size=" + << size << ": " << cudaGetErrorString(cuda_ret) + << "; GPU copies will use pageable fallback"; + return false; + } + + { + std::lock_guard lock(PinnedHostMemoryMutex()); + PinnedHostMemoryRegions()[ptr] = size; + } + LOG(INFO) << "cudaHostRegister OK for " << name << ", size=" << size; + return true; +#else + (void)ptr; + (void)size; + (void)name; + return false; +#endif +} + +inline void UnpinHostMemory(void* ptr, const char* name) { +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + if (ptr == nullptr) return; + + size_t size = 0; + { + std::lock_guard lock(PinnedHostMemoryMutex()); + auto& regions = PinnedHostMemoryRegions(); + auto it = regions.find(ptr); + if (it == regions.end()) return; + size = it->second; + regions.erase(it); + PinnedHostMemoryBytes() -= size; + } + + auto cuda_ret = cudaHostUnregister(ptr); + if (cuda_ret != cudaSuccess) { + LOG(WARNING) << "cudaHostUnregister failed for " << name << " size=" + << size << ": " << cudaGetErrorString(cuda_ret); + } +#else + (void)ptr; + (void)name; +#endif +} } // namespace gpu_staging } // namespace mooncake diff --git a/mooncake-store/include/transfer_task.h b/mooncake-store/include/transfer_task.h index f8d6e69281..b65e4bbace 100644 --- a/mooncake-store/include/transfer_task.h +++ b/mooncake-store/include/transfer_task.h @@ -538,12 +538,11 @@ class FilereadWorkerPool { */ class TransferSubmitter { public: - explicit TransferSubmitter(TransferEngine& engine, - std::shared_ptr& backend, - const std::string& local_hostname, - std::shared_ptr staging_allocator, - TransferMetric* transfer_metric = nullptr, - int numa_socket_id = 0); + explicit TransferSubmitter( + TransferEngine& engine, std::shared_ptr& backend, + const std::string& local_hostname, + std::shared_ptr staging_allocator, + TransferMetric* transfer_metric = nullptr, int numa_socket_id = 0); /** * @brief Submit an asynchronous transfer operation diff --git a/mooncake-store/src/client_service.cpp b/mooncake-store/src/client_service.cpp index 5044cb0dcd..becf1406c1 100644 --- a/mooncake-store/src/client_service.cpp +++ b/mooncake-store/src/client_service.cpp @@ -34,6 +34,7 @@ #include "ha/leadership/leader_coordinator_factory.h" #include "types.h" #include "client_buffer.hpp" +#include "gpu_staging_utils.h" #include "utils.h" #include "rpc_types.h" #include "local_hot_cache.h" @@ -2717,8 +2718,8 @@ tl::expected Client::UnmountSegmentImpl( #if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ defined(USE_HYGON) || defined(USE_COREX) - cudaHostUnregister(reinterpret_cast(it->second.base)); - // Ignore failure — may not have been registered + gpu_staging::UnpinHostMemory(reinterpret_cast(it->second.base), + "segment"); #endif int rc = transfer_engine_->unregisterLocalMemory( @@ -2798,23 +2799,9 @@ tl::expected Client::MountSegmentAndGetId( defined(USE_HYGON) || defined(USE_COREX) { // Pin segment buffer by default so GPU→host copies use DMA - // instead of CUDA's internal staging. Opt out: MC_STORE_PIN_MEMORY=0 - const char* pin_env = std::getenv("MC_STORE_PIN_MEMORY"); - bool pin_memory = !(pin_env && - (std::string(pin_env) == "0" || - std::string(pin_env) == "false")); - if (pin_memory) { - auto cuda_ret = cudaHostRegister((void*)buffer, size, - cudaHostRegisterDefault); - if (cuda_ret != cudaSuccess) { - LOG(WARNING) - << "cudaHostRegister failed for segment (size=" << size - << "): " << cudaGetErrorString(cuda_ret) - << "; GPU copies will use pageable fallback"; - } else { - LOG(INFO) << "cudaHostRegister segment OK, size=" << size; - } - } + // instead of CUDA's internal staging. Opt out: MC_STORE_PIN_MEMORY=0. + // Limit total pinned bytes: MC_STORE_PIN_MEMORY_MAX_BYTES=N. + gpu_staging::TryPinHostMemory((void*)buffer, size, "segment"); } #endif diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index df6109bfb9..28402f8b60 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -765,24 +765,11 @@ tl::expected RealClient::setup_internal( defined(USE_HYGON) || defined(USE_COREX) { // Pin staging buffer by default so GPU→host copies use DMA - // instead of CUDA's internal staging. Opt out: MC_STORE_PIN_MEMORY=0 - const char* pin_env = std::getenv("MC_STORE_PIN_MEMORY"); - bool pin_memory = !(pin_env && - (std::string(pin_env) == "0" || - std::string(pin_env) == "false")); - if (pin_memory && local_buffer_size > 0) { - auto cuda_ret = cudaHostRegister( - client_buffer_allocator_->getBase(), local_buffer_size, - cudaHostRegisterDefault); - if (cuda_ret != cudaSuccess) { - LOG(WARNING) - << "cudaHostRegister staging buffer failed: " - << cudaGetErrorString(cuda_ret); - } else { - LOG(INFO) << "cudaHostRegister staging buffer OK, size=" - << local_buffer_size; - } - } + // instead of CUDA's internal staging. Opt out: MC_STORE_PIN_MEMORY=0. + // Limit total pinned bytes: MC_STORE_PIN_MEMORY_MAX_BYTES=N. + gpu_staging::TryPinHostMemory(client_buffer_allocator_->getBase(), + local_buffer_size, + "client staging buffer"); } #endif { @@ -1715,8 +1702,8 @@ tl::expected RealClient::put_internal( // the total work is the same (staging just happens later). (void)client_buffer_allocator; - std::vector slices = split_into_slices( - const_cast(value.data()), value.size_bytes()); + std::vector slices = + split_into_slices(const_cast(value.data()), value.size_bytes()); auto put_result = client_->Put(key, slices, config); if (!put_result) { @@ -1846,8 +1833,8 @@ tl::expected RealClient::put_parts_internal( std::vector slices; for (const auto &value : values) { if (value.size_bytes() == 0) continue; - auto part_slices = split_into_slices( - const_cast(value.data()), value.size_bytes()); + auto part_slices = + split_into_slices(const_cast(value.data()), value.size_bytes()); slices.insert(slices.end(), part_slices.begin(), part_slices.end()); } @@ -3688,8 +3675,8 @@ tl::expected RealClient::upsert_internal( } (void)client_buffer_allocator; - std::vector slices = split_into_slices( - const_cast(value.data()), value.size_bytes()); + std::vector slices = + split_into_slices(const_cast(value.data()), value.size_bytes()); auto result = client_->Upsert(key, slices, config); if (!result) { @@ -3913,8 +3900,8 @@ tl::expected RealClient::upsert_parts_internal( std::vector slices; for (const auto &value : values) { if (value.size_bytes() == 0) continue; - auto part_slices = split_into_slices( - const_cast(value.data()), value.size_bytes()); + auto part_slices = + split_into_slices(const_cast(value.data()), value.size_bytes()); slices.insert(slices.end(), part_slices.begin(), part_slices.end()); } diff --git a/mooncake-store/src/transfer_task.cpp b/mooncake-store/src/transfer_task.cpp index 22e01d35b9..155ad1c66d 100644 --- a/mooncake-store/src/transfer_task.cpp +++ b/mooncake-store/src/transfer_task.cpp @@ -900,12 +900,11 @@ TransferStrategy TransferFuture::strategy() const { // TransferSubmitter Implementation // ============================================================================ -TransferSubmitter::TransferSubmitter(TransferEngine& engine, - std::shared_ptr& backend, - const std::string& local_hostname, - std::shared_ptr staging_allocator, - TransferMetric* transfer_metric, - int numa_socket_id) +TransferSubmitter::TransferSubmitter( + TransferEngine& engine, std::shared_ptr& backend, + const std::string& local_hostname, + std::shared_ptr staging_allocator, + TransferMetric* transfer_metric, int numa_socket_id) : engine_(engine), local_endpoint_(engine.getLocalIpAndPort()), staging_allocator_(std::move(staging_allocator)), @@ -977,14 +976,16 @@ TransferSubmitter::ensureRegisteredForRDMA(const std::vector& slices) { if (!staging_allocator_) { LOG(ERROR) << "No staging allocator for unregistered RDMA slices"; - return std::make_pair(std::vector{}, std::vector{}); + return std::make_pair(std::vector{}, + std::vector{}); } // Allocate ONE contiguous staging buffer for all unregistered data. auto staging_alloc = staging_allocator_->allocate(total_staging); if (!staging_alloc) { LOG(ERROR) << "Staging alloc failed, total_size=" << total_staging; - return std::make_pair(std::vector{}, std::vector{}); + return std::make_pair(std::vector{}, + std::vector{}); } char* staging_base = static_cast(staging_alloc->ptr()); @@ -994,7 +995,6 @@ TransferSubmitter::ensureRegisteredForRDMA(const std::vector& slices) { std::vector staging_handles; prepared.reserve(slices.size()); - // Sync path (default): copy each unregistered slice into the contiguous // staging buffer. MemcpySafe auto-detects GPU vs CPU pointers. // Staged slices are split by kMaxSliceSize to match RDMA transfer limits. @@ -1009,7 +1009,8 @@ TransferSubmitter::ensureRegisteredForRDMA(const std::vector& slices) { if (!gpu_staging::MemcpySafe(staged_ptr, slices[i].ptr, slices[i].size)) { LOG(ERROR) << "MemcpySafe failed during RDMA staging"; - return std::make_pair(std::vector{}, std::vector{}); + return std::make_pair(std::vector{}, + std::vector{}); } // Split staged data by kMaxSliceSize for RDMA transfer limits. auto sub = split_into_slices(staged_ptr, slices[i].size); @@ -1043,14 +1044,13 @@ std::optional TransferSubmitter::submit( future = submitMemcpyOperation(handle, slices, op_code); break; case TransferStrategy::TRANSFER_ENGINE: { - auto [prepared, staging] = - ensureRegisteredForRDMA(slices); + auto [prepared, staging] = ensureRegisteredForRDMA(slices); if (prepared.empty() && !slices.empty()) { LOG(ERROR) << "ensureRegisteredForRDMA failed"; return std::nullopt; } - future = submitTransferEngineOperation( - handle, prepared, op_code); + future = submitTransferEngineOperation(handle, prepared, + op_code); if (future && !staging.empty()) { future->attachStagingHandles(std::move(staging)); } diff --git a/mooncake-transfer-engine/src/transfer_engine_impl.cpp b/mooncake-transfer-engine/src/transfer_engine_impl.cpp index 2c96462ef6..e359a14f12 100644 --- a/mooncake-transfer-engine/src/transfer_engine_impl.cpp +++ b/mooncake-transfer-engine/src/transfer_engine_impl.cpp @@ -563,8 +563,8 @@ bool TransferEngineImpl::checkOverlap(void* addr, uint64_t length) { bool TransferEngineImpl::isLocalMemoryRegistered(const void* addr) { std::shared_lock lock(mutex_); - return findMemoryRegionContaining(reinterpret_cast(addr)) - != local_memory_regions_.end(); + return findMemoryRegionContaining(reinterpret_cast(addr)) != + local_memory_regions_.end(); } int TransferEngineImpl::registerLocalMemory(void* addr, size_t length, From c65bed7b19a06f2cc9ed45e063993304321e7f7a Mon Sep 17 00:00:00 2001 From: Cruz Zhao Date: Wed, 24 Jun 2026 09:59:44 +0000 Subject: [PATCH 06/15] [Store] Tighten zero-copy write safety Validate full registered ranges before RDMA fast paths and clean up pinned host registrations on teardown/failure paths so zero-copy writes fail closed without leaking resources. Co-Authored-By: Claude Opus 4.6 --- mooncake-store/include/real_client.h | 32 +++++++++---------- mooncake-store/src/client_service.cpp | 11 +++++++ mooncake-store/src/real_client.cpp | 15 ++++++--- mooncake-store/src/transfer_task.cpp | 8 ++++- .../include/transfer_engine.h | 2 +- .../include/transfer_engine_impl.h | 2 +- .../src/transfer_engine.cpp | 8 ++--- .../src/transfer_engine_impl.cpp | 13 ++++++-- 8 files changed, 61 insertions(+), 30 deletions(-) diff --git a/mooncake-store/include/real_client.h b/mooncake-store/include/real_client.h index eeaf7fa085..361bcb30a5 100644 --- a/mooncake-store/include/real_client.h +++ b/mooncake-store/include/real_client.h @@ -168,28 +168,28 @@ class RealClient : public PyClient { /** * @brief Put object data directly from a pre-allocated buffer * @param key Key of the object to put - * @param buffer Pointer to Store-managed registered memory, either - * explicitly registered with register_buffer() or inside the setup-time - * local buffer + * @param buffer Pointer to the source data buffer * @param size Size of the data to put * @return 0 on success, negative value on error - * @note The buffer address must resolve to Store-managed registered memory, - * either an explicit register_buffer() region or the setup-time local - * buffer + * @note Registered buffers are used directly for RDMA writes. Unregistered + * buffers are staged in the transfer layer when RDMA requires registered + * source memory. */ int put_from(const std::string &key, void *buffer, size_t size, const ReplicateConfig &config = ReplicateConfig{}); /** - * @brief Put one object directly from a registered data buffer plus a - * registered metadata buffer + * @brief Put one object directly from a data buffer plus a metadata buffer * @param key Key of the object to put - * @param buffer Pointer to the registered data buffer - * @param metadata_buffer Pointer to the registered metadata buffer + * @param buffer Pointer to the data buffer + * @param metadata_buffer Pointer to the metadata buffer * @param size Size of the data buffer in bytes * @param metadata_size Size of the metadata buffer in bytes * @param config Replication configuration * @return 0 on success, negative value on error + * @note Registered buffers are used directly for RDMA writes. Unregistered + * buffers are staged in the transfer layer when RDMA requires registered + * source memory. */ int put_from_with_metadata( const std::string &key, void *buffer, void *metadata_buffer, @@ -205,9 +205,9 @@ class RealClient : public PyClient { * @param config Replication configuration * @return Vector of integers, where each element is 0 on success, or a * negative value on error - * @note The buffer addresses must resolve to Store-managed registered - * memory, either explicit register_buffer() regions or the setup-time local - * buffer + * @note Registered buffers are used directly for RDMA writes. Unregistered + * buffers are staged in the transfer layer when RDMA requires registered + * source memory. */ std::vector batch_put_from( @@ -225,9 +225,9 @@ class RealClient : public PyClient { * @param config Replication configuration * @return Vector of integers, where each element is 0 on success, or a * negative value on error - * @note The buffer addresses must resolve to Store-managed registered - * memory, either explicit register_buffer() regions or the setup-time local - * buffer + * @note Registered buffers are used directly for RDMA writes. Unregistered + * buffers are staged in the transfer layer when RDMA requires registered + * source memory. */ std::vector batch_put_from_multi_buffers( const std::vector &keys, diff --git a/mooncake-store/src/client_service.cpp b/mooncake-store/src/client_service.cpp index becf1406c1..ec1b0895b4 100644 --- a/mooncake-store/src/client_service.cpp +++ b/mooncake-store/src/client_service.cpp @@ -2822,6 +2822,17 @@ tl::expected Client::MountSegmentAndGetId( ErrorCode err = mount_result.error(); LOG(ERROR) << "mount_segment_to_master_failed base=" << buffer << " size=" << size << ", error=" << err; +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + gpu_staging::UnpinHostMemory((void*)buffer, "segment"); +#endif + int unregister_rc = + transfer_engine_->unregisterLocalMemory((void*)buffer); + if (unregister_rc != 0) { + LOG(WARNING) << "Failed to unregister transfer buffer after " + "mount failure, ret=" + << unregister_rc; + } return tl::unexpected(err); } diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index 28402f8b60..663dd70b9a 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -1114,6 +1114,11 @@ tl::expected RealClient::tearDownAll_internal() { } if (client_buffer_allocator_ && client_buffer_allocator_->size() > 0 && protocol != "cxl") { +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + gpu_staging::UnpinHostMemory(client_buffer_allocator_->getBase(), + "client staging buffer"); +#endif auto unregister_result = client_->unregisterLocalMemory( client_buffer_allocator_->getBase(), true); if (!unregister_result) { @@ -3612,8 +3617,9 @@ std::vector> RealClient::batch_put_from_internal( tl::expected RealClient::put_from_internal( const std::string &key, void *buffer, size_t size, const ReplicateConfig &config) { - // NOTE: The buffer address must resolve to Store-managed registered - // memory for zero-copy RDMA operations to work correctly + // Registered buffers are used directly for RDMA writes. Unregistered + // buffers are staged in the transfer layer when RDMA requires registered + // source memory. if (config.prefer_alloc_in_same_node) { LOG(ERROR) << "prefer_alloc_in_same_node is not supported."; return tl::unexpected(ErrorCode::INVALID_PARAMS); @@ -4593,8 +4599,9 @@ int RealClient::put_from_with_metadata(const std::string &key, void *buffer, size_t metadata_size, const ReplicateConfig &config) { const auto start_time = std::chrono::steady_clock::now(); - // NOTE: The buffer address must resolve to Store-managed registered - // memory for zero-copy RDMA operations to work correctly + // Registered buffers are used directly for RDMA writes. Unregistered + // buffers are staged in the transfer layer when RDMA requires registered + // source memory. if (config.prefer_alloc_in_same_node) { LOG(ERROR) << "prefer_alloc_in_same_node is not supported."; return -1; diff --git a/mooncake-store/src/transfer_task.cpp b/mooncake-store/src/transfer_task.cpp index 155ad1c66d..6c5cf1eaab 100644 --- a/mooncake-store/src/transfer_task.cpp +++ b/mooncake-store/src/transfer_task.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -956,7 +957,12 @@ TransferSubmitter::ensureRegisteredForRDMA(const std::vector& slices) { size_t total_staging = 0; std::vector needs_staging(slices.size(), false); for (size_t i = 0; i < slices.size(); ++i) { - if (!engine_.isLocalMemoryRegistered(slices[i].ptr)) { + if (!engine_.isLocalMemoryRegistered(slices[i].ptr, slices[i].size)) { + if (slices[i].size > std::numeric_limits::max() - total_staging) { + LOG(ERROR) << "RDMA staging size overflow"; + return std::make_pair(std::vector{}, + std::vector{}); + } needs_staging[i] = true; total_staging += slices[i].size; } diff --git a/mooncake-transfer-engine/include/transfer_engine.h b/mooncake-transfer-engine/include/transfer_engine.h index b7a9664c74..3b6e904246 100644 --- a/mooncake-transfer-engine/include/transfer_engine.h +++ b/mooncake-transfer-engine/include/transfer_engine.h @@ -184,7 +184,7 @@ class TransferEngine { bool checkOverlap(void* addr, uint64_t length); - bool isLocalMemoryRegistered(const void* addr); + bool isLocalMemoryRegistered(const void* addr, size_t length); void setAutoDiscover(bool auto_discover); diff --git a/mooncake-transfer-engine/include/transfer_engine_impl.h b/mooncake-transfer-engine/include/transfer_engine_impl.h index fd01a90288..976a5ef2f8 100644 --- a/mooncake-transfer-engine/include/transfer_engine_impl.h +++ b/mooncake-transfer-engine/include/transfer_engine_impl.h @@ -361,7 +361,7 @@ class TransferEngineImpl { bool checkOverlap(void* addr, uint64_t length); - bool isLocalMemoryRegistered(const void* addr); + bool isLocalMemoryRegistered(const void* addr, size_t length); #ifdef ENABLE_MULTI_PROTOCOL struct RegisteredRecord { diff --git a/mooncake-transfer-engine/src/transfer_engine.cpp b/mooncake-transfer-engine/src/transfer_engine.cpp index 1d5510a8ee..77176d7259 100644 --- a/mooncake-transfer-engine/src/transfer_engine.cpp +++ b/mooncake-transfer-engine/src/transfer_engine.cpp @@ -204,8 +204,8 @@ bool TransferEngine::checkOverlap(void* addr, uint64_t length) { return impl_->checkOverlap(addr, length); } -bool TransferEngine::isLocalMemoryRegistered(const void* addr) { - return impl_->isLocalMemoryRegistered(addr); +bool TransferEngine::isLocalMemoryRegistered(const void* addr, size_t length) { + return impl_->isLocalMemoryRegistered(addr, length); } void TransferEngine::setAutoDiscover(bool auto_discover) { @@ -641,8 +641,8 @@ bool TransferEngine::checkOverlap(void* addr, uint64_t length) { return false; } -bool TransferEngine::isLocalMemoryRegistered(const void* addr) { - if (!use_tent_) return impl_->isLocalMemoryRegistered(addr); +bool TransferEngine::isLocalMemoryRegistered(const void* addr, size_t length) { + if (!use_tent_) return impl_->isLocalMemoryRegistered(addr, length); return false; // Conservative for TENT mode } diff --git a/mooncake-transfer-engine/src/transfer_engine_impl.cpp b/mooncake-transfer-engine/src/transfer_engine_impl.cpp index e359a14f12..23e239f18f 100644 --- a/mooncake-transfer-engine/src/transfer_engine_impl.cpp +++ b/mooncake-transfer-engine/src/transfer_engine_impl.cpp @@ -561,10 +561,17 @@ bool TransferEngineImpl::checkOverlap(void* addr, uint64_t length) { return hasOverlapLocked(reinterpret_cast(addr), length); } -bool TransferEngineImpl::isLocalMemoryRegistered(const void* addr) { +bool TransferEngineImpl::isLocalMemoryRegistered(const void* addr, + size_t length) { + if (length == 0) return true; std::shared_lock lock(mutex_); - return findMemoryRegionContaining(reinterpret_cast(addr)) != - local_memory_regions_.end(); + auto it = findMemoryRegionContaining(reinterpret_cast(addr)); + if (it == local_memory_regions_.end()) return false; + + uintptr_t start = reinterpret_cast(addr); + uintptr_t region_start = reinterpret_cast(it->second.addr); + size_t offset = start - region_start; + return offset <= it->second.length && length <= it->second.length - offset; } int TransferEngineImpl::registerLocalMemory(void* addr, size_t length, From 317d3ac418d75a14fe602a24cf03db5bee0848d2 Mon Sep 17 00:00:00 2001 From: Cruz Zhao Date: Wed, 24 Jun 2026 10:44:53 +0000 Subject: [PATCH 07/15] [Store] Fix zero-copy write formatting Apply the clang-format-20 layout expected by CI for the zero-copy write and pinned-memory changes. Co-Authored-By: Claude Opus 4.6 --- mooncake-integration/store/store_py.cpp | 18 ++++++++---------- mooncake-store/include/gpu_staging_utils.h | 10 +++++----- mooncake-store/src/client_service.cpp | 6 +++--- mooncake-store/src/real_client.cpp | 14 +++++++------- mooncake-store/src/transfer_task.cpp | 3 ++- 5 files changed, 25 insertions(+), 26 deletions(-) diff --git a/mooncake-integration/store/store_py.cpp b/mooncake-integration/store/store_py.cpp index ad11cd1c77..2949d708f0 100644 --- a/mooncake-integration/store/store_py.cpp +++ b/mooncake-integration/store/store_py.cpp @@ -818,11 +818,10 @@ class MooncakeStorePyWrapper { continue; } valid_keys.push_back(keys[i]); - all_buffers.push_back({const_cast( - reinterpret_cast( - &infos[i].metadata)), - reinterpret_cast( - infos[i].data_ptr)}); + all_buffers.push_back( + {const_cast( + reinterpret_cast(&infos[i].metadata)), + reinterpret_cast(infos[i].data_ptr)}); all_sizes.push_back({infos[i].metadata.header.data_offset, infos[i].tensor_size}); original_indices.push_back(i); @@ -1412,11 +1411,10 @@ class MooncakeStorePyWrapper { if (!infos[i].valid()) continue; valid_keys.push_back(keys[i]); - all_buffers.push_back({const_cast( - reinterpret_cast( - &infos[i].metadata)), - reinterpret_cast( - infos[i].data_ptr)}); + all_buffers.push_back( + {const_cast( + reinterpret_cast(&infos[i].metadata)), + reinterpret_cast(infos[i].data_ptr)}); all_sizes.push_back({infos[i].metadata.header.data_offset, infos[i].tensor_size}); original_indices.push_back(i); diff --git a/mooncake-store/include/gpu_staging_utils.h b/mooncake-store/include/gpu_staging_utils.h index 58ddc689c1..4f55e44528 100644 --- a/mooncake-store/include/gpu_staging_utils.h +++ b/mooncake-store/include/gpu_staging_utils.h @@ -231,9 +231,9 @@ inline bool TryPinHostMemory(void* ptr, size_t size, const char* name) { size_t current = PinnedHostMemoryBytes(); if (max_bytes > 0 && (current > max_bytes || size > max_bytes - current)) { - LOG(WARNING) << "Skip cudaHostRegister for " << name << " size=" - << size << " because MC_STORE_PIN_MEMORY_MAX_BYTES=" - << max_bytes << " current_pinned=" << current; + LOG(WARNING) << "Skip cudaHostRegister for " << name << " size=" << size + << " because MC_STORE_PIN_MEMORY_MAX_BYTES=" << max_bytes + << " current_pinned=" << current; return false; } PinnedHostMemoryBytes() += size; @@ -243,8 +243,8 @@ inline bool TryPinHostMemory(void* ptr, size_t size, const char* name) { if (cuda_ret != cudaSuccess) { std::lock_guard lock(PinnedHostMemoryMutex()); PinnedHostMemoryBytes() -= size; - LOG(WARNING) << "cudaHostRegister failed for " << name << " size=" - << size << ": " << cudaGetErrorString(cuda_ret) + LOG(WARNING) << "cudaHostRegister failed for " << name << " size=" << size + << ": " << cudaGetErrorString(cuda_ret) << "; GPU copies will use pageable fallback"; return false; } diff --git a/mooncake-store/src/client_service.cpp b/mooncake-store/src/client_service.cpp index ec1b0895b4..03a3ea0404 100644 --- a/mooncake-store/src/client_service.cpp +++ b/mooncake-store/src/client_service.cpp @@ -2798,9 +2798,9 @@ tl::expected Client::MountSegmentAndGetId( #if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ defined(USE_HYGON) || defined(USE_COREX) { - // Pin segment buffer by default so GPU→host copies use DMA - // instead of CUDA's internal staging. Opt out: MC_STORE_PIN_MEMORY=0. - // Limit total pinned bytes: MC_STORE_PIN_MEMORY_MAX_BYTES=N. + // Pin segment buffer by default so GPU→host copies use DMA. + // Opt out with MC_STORE_PIN_MEMORY=0. + // Cap with MC_STORE_PIN_MEMORY_MAX_BYTES=N. gpu_staging::TryPinHostMemory((void*)buffer, size, "segment"); } #endif diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index 663dd70b9a..ff890eb379 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -764,9 +764,9 @@ tl::expected RealClient::setup_internal( #if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ defined(USE_HYGON) || defined(USE_COREX) { - // Pin staging buffer by default so GPU→host copies use DMA - // instead of CUDA's internal staging. Opt out: MC_STORE_PIN_MEMORY=0. - // Limit total pinned bytes: MC_STORE_PIN_MEMORY_MAX_BYTES=N. + // Pin staging buffer by default so GPU→host copies use DMA. + // Opt out with MC_STORE_PIN_MEMORY=0. + // Cap with MC_STORE_PIN_MEMORY_MAX_BYTES=N. gpu_staging::TryPinHostMemory(client_buffer_allocator_->getBase(), local_buffer_size, "client staging buffer"); @@ -1838,8 +1838,8 @@ tl::expected RealClient::put_parts_internal( std::vector slices; for (const auto &value : values) { if (value.size_bytes() == 0) continue; - auto part_slices = - split_into_slices(const_cast(value.data()), value.size_bytes()); + auto part_slices = split_into_slices(const_cast(value.data()), + value.size_bytes()); slices.insert(slices.end(), part_slices.begin(), part_slices.end()); } @@ -3906,8 +3906,8 @@ tl::expected RealClient::upsert_parts_internal( std::vector slices; for (const auto &value : values) { if (value.size_bytes() == 0) continue; - auto part_slices = - split_into_slices(const_cast(value.data()), value.size_bytes()); + auto part_slices = split_into_slices(const_cast(value.data()), + value.size_bytes()); slices.insert(slices.end(), part_slices.begin(), part_slices.end()); } diff --git a/mooncake-store/src/transfer_task.cpp b/mooncake-store/src/transfer_task.cpp index 6c5cf1eaab..811a0d9d1f 100644 --- a/mooncake-store/src/transfer_task.cpp +++ b/mooncake-store/src/transfer_task.cpp @@ -958,7 +958,8 @@ TransferSubmitter::ensureRegisteredForRDMA(const std::vector& slices) { std::vector needs_staging(slices.size(), false); for (size_t i = 0; i < slices.size(); ++i) { if (!engine_.isLocalMemoryRegistered(slices[i].ptr, slices[i].size)) { - if (slices[i].size > std::numeric_limits::max() - total_staging) { + if (slices[i].size > + std::numeric_limits::max() - total_staging) { LOG(ERROR) << "RDMA staging size overflow"; return std::make_pair(std::vector{}, std::vector{}); From ade3a4377a694d322b39ee3c04c2e9b024def1d5 Mon Sep 17 00:00:00 2001 From: Cruz Zhao Date: Wed, 24 Jun 2026 10:54:11 +0000 Subject: [PATCH 08/15] [Store] Match CI clang-format output Format pinned-memory logging with clang-format 20.1.8 to match the CI formatter exactly. Co-Authored-By: Claude Opus 4.6 --- mooncake-store/include/gpu_staging_utils.h | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/mooncake-store/include/gpu_staging_utils.h b/mooncake-store/include/gpu_staging_utils.h index 4f55e44528..834f942ec1 100644 --- a/mooncake-store/include/gpu_staging_utils.h +++ b/mooncake-store/include/gpu_staging_utils.h @@ -231,9 +231,10 @@ inline bool TryPinHostMemory(void* ptr, size_t size, const char* name) { size_t current = PinnedHostMemoryBytes(); if (max_bytes > 0 && (current > max_bytes || size > max_bytes - current)) { - LOG(WARNING) << "Skip cudaHostRegister for " << name << " size=" << size - << " because MC_STORE_PIN_MEMORY_MAX_BYTES=" << max_bytes - << " current_pinned=" << current; + LOG(WARNING) << "Skip cudaHostRegister for " << name + << " size=" << size + << " because MC_STORE_PIN_MEMORY_MAX_BYTES=" + << max_bytes << " current_pinned=" << current; return false; } PinnedHostMemoryBytes() += size; @@ -243,8 +244,8 @@ inline bool TryPinHostMemory(void* ptr, size_t size, const char* name) { if (cuda_ret != cudaSuccess) { std::lock_guard lock(PinnedHostMemoryMutex()); PinnedHostMemoryBytes() -= size; - LOG(WARNING) << "cudaHostRegister failed for " << name << " size=" << size - << ": " << cudaGetErrorString(cuda_ret) + LOG(WARNING) << "cudaHostRegister failed for " << name + << " size=" << size << ": " << cudaGetErrorString(cuda_ret) << "; GPU copies will use pageable fallback"; return false; } @@ -281,8 +282,9 @@ inline void UnpinHostMemory(void* ptr, const char* name) { auto cuda_ret = cudaHostUnregister(ptr); if (cuda_ret != cudaSuccess) { - LOG(WARNING) << "cudaHostUnregister failed for " << name << " size=" - << size << ": " << cudaGetErrorString(cuda_ret); + LOG(WARNING) << "cudaHostUnregister failed for " << name + << " size=" << size << ": " + << cudaGetErrorString(cuda_ret); } #else (void)ptr; From 555c63a37d154700b45b03a7c5a6a32adcf55e42 Mon Sep 17 00:00:00 2001 From: Cruz Zhao Date: Wed, 24 Jun 2026 11:52:58 +0000 Subject: [PATCH 09/15] [Store] Avoid CUDA-only host register flag Use the default host-register flag value directly so CUDA-alike builds such as MUSA do not require a cudaHostRegisterDefault macro mapping. Co-Authored-By: Claude Opus 4.6 --- mooncake-store/include/gpu_staging_utils.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mooncake-store/include/gpu_staging_utils.h b/mooncake-store/include/gpu_staging_utils.h index 834f942ec1..31826f2952 100644 --- a/mooncake-store/include/gpu_staging_utils.h +++ b/mooncake-store/include/gpu_staging_utils.h @@ -240,7 +240,7 @@ inline bool TryPinHostMemory(void* ptr, size_t size, const char* name) { PinnedHostMemoryBytes() += size; } - auto cuda_ret = cudaHostRegister(ptr, size, cudaHostRegisterDefault); + auto cuda_ret = cudaHostRegister(ptr, size, 0); if (cuda_ret != cudaSuccess) { std::lock_guard lock(PinnedHostMemoryMutex()); PinnedHostMemoryBytes() -= size; From 9ff8dafaa0aa369eb6e96e494dbf9aa1d4e13308 Mon Sep 17 00:00:00 2001 From: Cruz Zhao Date: Wed, 24 Jun 2026 12:11:20 +0000 Subject: [PATCH 10/15] [Store] Address GPU device guard review Restore the caller's active GPU device after inline GPU copies and move pinned-memory tracking state into a single translation unit. Co-Authored-By: Claude Opus 4.6 --- mooncake-store/include/gpu_staging_utils.h | 62 +++++++++++++++++----- mooncake-store/src/CMakeLists.txt | 1 + mooncake-store/src/gpu_staging_utils.cpp | 22 ++++++++ mooncake-store/src/real_client.cpp | 7 +-- mooncake-store/src/transfer_task.cpp | 2 +- 5 files changed, 77 insertions(+), 17 deletions(-) create mode 100644 mooncake-store/src/gpu_staging_utils.cpp diff --git a/mooncake-store/include/gpu_staging_utils.h b/mooncake-store/include/gpu_staging_utils.h index 31826f2952..46afc8d58a 100644 --- a/mooncake-store/include/gpu_staging_utils.h +++ b/mooncake-store/include/gpu_staging_utils.h @@ -113,6 +113,51 @@ inline void SetDevice(int device_id) { #endif } +class DeviceGuard { + public: + explicit DeviceGuard(int device_id) { + if (device_id < 0) return; +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + if (cudaGetDevice(&previous_device_) == cudaSuccess) { + active_ = true; + } + cudaSetDevice(device_id); +#elif defined(USE_HIP) + if (hipGetDevice(&previous_device_) == hipSuccess) { + active_ = true; + } + hipSetDevice(device_id); +#elif defined(USE_ASCEND) || defined(USE_ASCEND_DIRECT) || defined(USE_UBSHMEM) + if (aclrtGetDevice(&previous_device_) == ACL_SUCCESS) { + active_ = true; + } + aclrtSetDevice(device_id); +#else + (void)device_id; +#endif + } + + ~DeviceGuard() { + if (!active_) return; +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + cudaSetDevice(previous_device_); +#elif defined(USE_HIP) + hipSetDevice(previous_device_); +#elif defined(USE_ASCEND) || defined(USE_ASCEND_DIRECT) || defined(USE_UBSHMEM) + aclrtSetDevice(previous_device_); +#endif + } + + DeviceGuard(const DeviceGuard&) = delete; + DeviceGuard& operator=(const DeviceGuard&) = delete; + + private: + int previous_device_{-1}; + bool active_{false}; +}; + // Copy host memory to device. Caller must have called SetDevice first. inline bool CopyHostToDevice(void* dst, const void* src, size_t size) { #if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ @@ -180,7 +225,7 @@ inline bool MemcpySafe(void* dst, const void* src, size_t size) { return true; } int dev = src_gpu ? src_dev : dst_dev; - SetDevice(dev); + DeviceGuard guard(dev); return CopyAuto(dst, src, size); } @@ -202,20 +247,11 @@ inline size_t PinMemoryMaxBytes() { return static_cast(value); } -inline std::mutex& PinnedHostMemoryMutex() { - static std::mutex mutex; - return mutex; -} +std::mutex& PinnedHostMemoryMutex(); -inline size_t& PinnedHostMemoryBytes() { - static size_t bytes = 0; - return bytes; -} +size_t& PinnedHostMemoryBytes(); -inline std::unordered_map& PinnedHostMemoryRegions() { - static std::unordered_map regions; - return regions; -} +std::unordered_map& PinnedHostMemoryRegions(); inline bool TryPinHostMemory(void* ptr, size_t size, const char* name) { #if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ diff --git a/mooncake-store/src/CMakeLists.txt b/mooncake-store/src/CMakeLists.txt index e8887223e3..3cf476addc 100644 --- a/mooncake-store/src/CMakeLists.txt +++ b/mooncake-store/src/CMakeLists.txt @@ -28,6 +28,7 @@ set(MOONCAKE_STORE_SOURCES shm_helper.cpp http_metadata_server.cpp file_storage.cpp + gpu_staging_utils.cpp serialize/serializer.cpp storage/distributed/distributed_storage_backend.cpp ha/leadership/leader_coordinator_factory.cpp diff --git a/mooncake-store/src/gpu_staging_utils.cpp b/mooncake-store/src/gpu_staging_utils.cpp new file mode 100644 index 0000000000..6d7efda86d --- /dev/null +++ b/mooncake-store/src/gpu_staging_utils.cpp @@ -0,0 +1,22 @@ +#include "gpu_staging_utils.h" + +namespace mooncake { +namespace gpu_staging { + +std::mutex& PinnedHostMemoryMutex() { + static std::mutex mutex; + return mutex; +} + +size_t& PinnedHostMemoryBytes() { + static size_t bytes = 0; + return bytes; +} + +std::unordered_map& PinnedHostMemoryRegions() { + static std::unordered_map regions; + return regions; +} + +} // namespace gpu_staging +} // namespace mooncake diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index ff890eb379..32e89d26db 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -4722,13 +4722,13 @@ RealClient::batch_put_from_multi_buffers_internal( std::vector RealClient::batch_upsert_from_multi_buffers( const std::vector &keys, const std::vector> &all_buffers, - const std::vector> &sizes, + const std::vector> &all_sizes, const ReplicateConfig &config) { auto internal_results = execute_timed_operation>>( [&]() { return batch_upsert_from_multi_buffers_internal( - keys, all_buffers, sizes, config); + keys, all_buffers, all_sizes, config); }, [](const auto &) { return true; }, [&](uint64_t latency_us, const auto &ret) { @@ -4740,7 +4740,8 @@ std::vector RealClient::batch_upsert_from_multi_buffers( client_->ObserveTransferOperation( TransferOperationKind::kWrite, "batch_upsert_from_multi_buffers", - sum_successful_nested_sizes(py_results, sizes), latency_us); + sum_successful_nested_sizes(py_results, all_sizes), + latency_us); }); std::vector results; results.reserve(internal_results.size()); diff --git a/mooncake-store/src/transfer_task.cpp b/mooncake-store/src/transfer_task.cpp index 811a0d9d1f..ab1bfd3c74 100644 --- a/mooncake-store/src/transfer_task.cpp +++ b/mooncake-store/src/transfer_task.cpp @@ -1250,7 +1250,7 @@ std::optional TransferSubmitter::submitMemcpyOperation( std::memcpy(dest, src, slice.size); } else { int dev = src_on_gpu ? src_dev : dst_dev; - gpu_staging::SetDevice(dev); + gpu_staging::DeviceGuard guard(dev); if (!gpu_staging::CopyAuto(dest, src, slice.size)) { LOG(ERROR) << "GPU memcpy failed: src_dev=" << src_dev << " dst_dev=" << dst_dev << " size=" << slice.size; From f337b3de85d47aaf6f69d94b86dc349faa8207be Mon Sep 17 00:00:00 2001 From: Cruz Zhao Date: Wed, 24 Jun 2026 12:30:07 +0000 Subject: [PATCH 11/15] [Store] Tighten GPU copy lifecycle handling Serialize pinned-memory registration state transitions and restore GPU device state in remaining copy paths so setup and transfer failure paths do not leak caller-visible state. Co-Authored-By: Claude Opus 4.6 --- mooncake-store/include/gpu_staging_utils.h | 52 +++++++++------------- mooncake-store/src/real_client.cpp | 21 +++++---- mooncake-store/src/transfer_task.cpp | 2 +- 3 files changed, 35 insertions(+), 40 deletions(-) diff --git a/mooncake-store/include/gpu_staging_utils.h b/mooncake-store/include/gpu_staging_utils.h index 46afc8d58a..41e72f5d38 100644 --- a/mooncake-store/include/gpu_staging_utils.h +++ b/mooncake-store/include/gpu_staging_utils.h @@ -258,38 +258,29 @@ inline bool TryPinHostMemory(void* ptr, size_t size, const char* name) { defined(USE_HYGON) || defined(USE_COREX) if (!PinMemoryEnabled() || ptr == nullptr || size == 0) return false; - { - std::lock_guard lock(PinnedHostMemoryMutex()); - auto& regions = PinnedHostMemoryRegions(); - if (regions.find(ptr) != regions.end()) return true; + std::lock_guard lock(PinnedHostMemoryMutex()); + auto& regions = PinnedHostMemoryRegions(); + if (regions.find(ptr) != regions.end()) return true; - size_t max_bytes = PinMemoryMaxBytes(); - size_t current = PinnedHostMemoryBytes(); - if (max_bytes > 0 && - (current > max_bytes || size > max_bytes - current)) { - LOG(WARNING) << "Skip cudaHostRegister for " << name - << " size=" << size - << " because MC_STORE_PIN_MEMORY_MAX_BYTES=" - << max_bytes << " current_pinned=" << current; - return false; - } - PinnedHostMemoryBytes() += size; + size_t max_bytes = PinMemoryMaxBytes(); + size_t current = PinnedHostMemoryBytes(); + if (max_bytes > 0 && (current > max_bytes || size > max_bytes - current)) { + LOG(WARNING) << "Skip cudaHostRegister for " << name << " size=" << size + << " because MC_STORE_PIN_MEMORY_MAX_BYTES=" << max_bytes + << " current_pinned=" << current; + return false; } auto cuda_ret = cudaHostRegister(ptr, size, 0); if (cuda_ret != cudaSuccess) { - std::lock_guard lock(PinnedHostMemoryMutex()); - PinnedHostMemoryBytes() -= size; LOG(WARNING) << "cudaHostRegister failed for " << name << " size=" << size << ": " << cudaGetErrorString(cuda_ret) << "; GPU copies will use pageable fallback"; return false; } - { - std::lock_guard lock(PinnedHostMemoryMutex()); - PinnedHostMemoryRegions()[ptr] = size; - } + PinnedHostMemoryBytes() += size; + regions[ptr] = size; LOG(INFO) << "cudaHostRegister OK for " << name << ", size=" << size; return true; #else @@ -305,23 +296,22 @@ inline void UnpinHostMemory(void* ptr, const char* name) { defined(USE_HYGON) || defined(USE_COREX) if (ptr == nullptr) return; - size_t size = 0; - { - std::lock_guard lock(PinnedHostMemoryMutex()); - auto& regions = PinnedHostMemoryRegions(); - auto it = regions.find(ptr); - if (it == regions.end()) return; - size = it->second; - regions.erase(it); - PinnedHostMemoryBytes() -= size; - } + std::lock_guard lock(PinnedHostMemoryMutex()); + auto& regions = PinnedHostMemoryRegions(); + auto it = regions.find(ptr); + if (it == regions.end()) return; + size_t size = it->second; auto cuda_ret = cudaHostUnregister(ptr); if (cuda_ret != cudaSuccess) { LOG(WARNING) << "cudaHostUnregister failed for " << name << " size=" << size << ": " << cudaGetErrorString(cuda_ret); + return; } + + regions.erase(it); + PinnedHostMemoryBytes() -= size; #else (void)ptr; (void)name; diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index 32e89d26db..0c75f1cdd8 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -270,7 +270,7 @@ inline tl::expected scatter_host_to_maybe_device( void *dst, const void *src, size_t size, const std::string &context) { int device_id = -1; if (gpu_staging::IsDevicePointer(dst, &device_id)) { - gpu_staging::SetDevice(device_id); + gpu_staging::DeviceGuard guard(device_id); if (!gpu_staging::CopyHostToDevice(dst, src, size)) { LOG(ERROR) << "H2D copy failed: " << context; return tl::unexpected(ErrorCode::TRANSFER_FAIL); @@ -788,6 +788,11 @@ tl::expected RealClient::setup_internal( // ensureRegisteredForRDMA can stage unregistered user buffers for RDMA. client_->setStagingAllocator(client_buffer_allocator_); + auto cleanup_on_setup_failure = [this](ErrorCode error) { + tearDownAll_internal(); + return tl::unexpected(error); + }; + // If global_segment_size is 0, skip mount segment; // If global_segment_size is larger than max_mr_size, split to multiple // mapped_shms. @@ -801,7 +806,7 @@ tl::expected RealClient::setup_internal( cxl_dev_size = static_cast(val); } else { LOG(FATAL) << "MC_CXL_DEV_SIZE not set"; - return tl::unexpected(ErrorCode::INVALID_PARAMS); + return cleanup_on_setup_failure(ErrorCode::INVALID_PARAMS); } void *ptr = client_->GetBaseAddr(); @@ -811,7 +816,7 @@ tl::expected RealClient::setup_internal( if (!mount_result.has_value()) { LOG(ERROR) << "Failed to mount segment: " << toString(mount_result.error()); - return tl::unexpected(mount_result.error()); + return cleanup_on_setup_failure(mount_result.error()); } } else { @@ -870,7 +875,7 @@ tl::expected RealClient::setup_internal( if (!ptr) { LOG(ERROR) << "Failed to allocate segment memory"; - return tl::unexpected(ErrorCode::INVALID_PARAMS); + return cleanup_on_setup_failure(ErrorCode::INVALID_PARAMS); } if (this->protocol == "ascend" || this->protocol == "ubshmem") { ascend_segment_ptrs_.emplace_back( @@ -891,7 +896,7 @@ tl::expected RealClient::setup_internal( if (!mount_result.has_value()) { LOG(ERROR) << "Failed to mount segment: " << toString(mount_result.error()); - return tl::unexpected(mount_result.error()); + return cleanup_on_setup_failure(mount_result.error()); } } if (total_glbseg_size == 0) { @@ -903,7 +908,7 @@ tl::expected RealClient::setup_internal( if (!ipc_socket_path_.empty()) { if (start_ipc_server() != 0) { LOG(ERROR) << "Failed to start IPC server at " << ipc_socket_path_; - return tl::unexpected(ErrorCode::INTERNAL_ERROR); + return cleanup_on_setup_failure(ErrorCode::INTERNAL_ERROR); } LOG(INFO) << "Starting IPC server at " << ipc_socket_path_; } @@ -922,7 +927,7 @@ tl::expected RealClient::setup_internal( LOG(ERROR) << "Failed to start offload RPC server: " << err.message(); offload_rpc_server_.reset(); - return tl::unexpected(ErrorCode::INTERNAL_ERROR); + return cleanup_on_setup_failure(ErrorCode::INTERNAL_ERROR); } offload_rpc_port_ = offload_rpc_server_->port(); LOG(INFO) << "Offload RPC server started on port " << offload_rpc_port_; @@ -943,7 +948,7 @@ tl::expected RealClient::setup_internal( if (!init_result) { LOG(ERROR) << "file storage init failed with error: " << init_result.error(); - return init_result; + return cleanup_on_setup_failure(init_result.error()); } } client_requester_ = std::make_shared(); diff --git a/mooncake-store/src/transfer_task.cpp b/mooncake-store/src/transfer_task.cpp index ab1bfd3c74..8d5f361dfa 100644 --- a/mooncake-store/src/transfer_task.cpp +++ b/mooncake-store/src/transfer_task.cpp @@ -662,7 +662,7 @@ void MemcpyWorkerPool::workerThread() { std::memcpy(op.dest, op.src, op.size); } else { int dev = src_on_gpu ? src_dev : dst_dev; - gpu_staging::SetDevice(dev); + gpu_staging::DeviceGuard guard(dev); if (!gpu_staging::CopyAuto(op.dest, op.src, op.size)) { LOG(ERROR) << "GPU memcpy failed: src_dev=" << src_dev From f8c43ff631e3b951e1377cdfab6b0f2e726600e6 Mon Sep 17 00:00:00 2001 From: Cruz Zhao Date: Wed, 24 Jun 2026 15:21:23 +0000 Subject: [PATCH 12/15] [Store] skip RDMA staging for TCP-only writes Avoid requiring a staging allocator for TCP-only transfer-engine writes, while keeping RDMA registration staging for non-TCP transports. Co-Authored-By: Claude Opus 4.6 --- mooncake-store/src/transfer_task.cpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/mooncake-store/src/transfer_task.cpp b/mooncake-store/src/transfer_task.cpp index 8d5f361dfa..1648825e31 100644 --- a/mooncake-store/src/transfer_task.cpp +++ b/mooncake-store/src/transfer_task.cpp @@ -1051,6 +1051,11 @@ std::optional TransferSubmitter::submit( future = submitMemcpyOperation(handle, slices, op_code); break; case TransferStrategy::TRANSFER_ENGINE: { + if (engine_.isTcpOnly()) { + future = submitTransferEngineOperation(handle, slices, + op_code); + break; + } auto [prepared, staging] = ensureRegisteredForRDMA(slices); if (prepared.empty() && !slices.empty()) { LOG(ERROR) << "ensureRegisteredForRDMA failed"; @@ -1113,10 +1118,10 @@ std::optional TransferSubmitter::submit_batch( return std::nullopt; } - // For WRITE ops, ensure slices are in RDMA-registered memory. + // For non-TCP WRITE ops, ensure slices are in RDMA-registered memory. const std::vector* effective_slices = &slices; std::vector prepared; - if (op_code == TransferRequest::WRITE) { + if (op_code == TransferRequest::WRITE && !engine_.isTcpOnly()) { auto [p, staging] = ensureRegisteredForRDMA(slices); if (p.empty() && !slices.empty()) { LOG(ERROR) << "ensureRegisteredForRDMA failed in submit_batch"; From 852cf7564a47979a3d329515e116e89fc041b4ea Mon Sep 17 00:00:00 2001 From: Cruz Zhao Date: Thu, 25 Jun 2026 02:43:47 +0000 Subject: [PATCH 13/15] [Store] Gate staging on RDMA transport Avoid routing CXL writes through RDMA staging by checking for an RDMA transport directly instead of treating every non-TCP path as RDMA. Co-Authored-By: Claude Opus 4.6 --- mooncake-store/src/transfer_task.cpp | 29 ++++++++++--------- .../include/multi_transport.h | 2 ++ .../include/transfer_engine.h | 2 ++ .../include/transfer_engine_impl.h | 4 +++ .../src/multi_transport.cpp | 4 +++ .../src/transfer_engine.cpp | 9 ++++++ 6 files changed, 36 insertions(+), 14 deletions(-) diff --git a/mooncake-store/src/transfer_task.cpp b/mooncake-store/src/transfer_task.cpp index 1648825e31..7bcd8e7702 100644 --- a/mooncake-store/src/transfer_task.cpp +++ b/mooncake-store/src/transfer_task.cpp @@ -1051,20 +1051,21 @@ std::optional TransferSubmitter::submit( future = submitMemcpyOperation(handle, slices, op_code); break; case TransferStrategy::TRANSFER_ENGINE: { - if (engine_.isTcpOnly()) { + if (engine_.hasRdmaTransport()) { + auto [prepared, staging] = + ensureRegisteredForRDMA(slices); + if (prepared.empty() && !slices.empty()) { + LOG(ERROR) << "ensureRegisteredForRDMA failed"; + return std::nullopt; + } + future = submitTransferEngineOperation(handle, prepared, + op_code); + if (future && !staging.empty()) { + future->attachStagingHandles(std::move(staging)); + } + } else { future = submitTransferEngineOperation(handle, slices, op_code); - break; - } - auto [prepared, staging] = ensureRegisteredForRDMA(slices); - if (prepared.empty() && !slices.empty()) { - LOG(ERROR) << "ensureRegisteredForRDMA failed"; - return std::nullopt; - } - future = submitTransferEngineOperation(handle, prepared, - op_code); - if (future && !staging.empty()) { - future->attachStagingHandles(std::move(staging)); } break; } @@ -1118,10 +1119,10 @@ std::optional TransferSubmitter::submit_batch( return std::nullopt; } - // For non-TCP WRITE ops, ensure slices are in RDMA-registered memory. + // For RDMA WRITE ops, ensure slices are in registered memory. const std::vector* effective_slices = &slices; std::vector prepared; - if (op_code == TransferRequest::WRITE && !engine_.isTcpOnly()) { + if (op_code == TransferRequest::WRITE && engine_.hasRdmaTransport()) { auto [p, staging] = ensureRegisteredForRDMA(slices); if (p.empty() && !slices.empty()) { LOG(ERROR) << "ensureRegisteredForRDMA failed in submit_batch"; diff --git a/mooncake-transfer-engine/include/multi_transport.h b/mooncake-transfer-engine/include/multi_transport.h index c556541bcd..57c144c081 100644 --- a/mooncake-transfer-engine/include/multi_transport.h +++ b/mooncake-transfer-engine/include/multi_transport.h @@ -63,6 +63,8 @@ class MultiTransport { */ bool isTcpOnly() const; + bool hasRdmaTransport() const; + std::vector listTransports(); void *getBaseAddr(); diff --git a/mooncake-transfer-engine/include/transfer_engine.h b/mooncake-transfer-engine/include/transfer_engine.h index 3b6e904246..720420a1c0 100644 --- a/mooncake-transfer-engine/include/transfer_engine.h +++ b/mooncake-transfer-engine/include/transfer_engine.h @@ -178,6 +178,8 @@ class TransferEngine { */ bool isTcpOnly() const; + bool hasRdmaTransport() const; + int syncSegmentCache(const std::string& segment_name = ""); std::shared_ptr getMetadata(); diff --git a/mooncake-transfer-engine/include/transfer_engine_impl.h b/mooncake-transfer-engine/include/transfer_engine_impl.h index 976a5ef2f8..48be23ab18 100644 --- a/mooncake-transfer-engine/include/transfer_engine_impl.h +++ b/mooncake-transfer-engine/include/transfer_engine_impl.h @@ -353,6 +353,10 @@ class TransferEngineImpl { bool isTcpOnly() const { return multi_transports_->isTcpOnly(); } + bool hasRdmaTransport() const { + return multi_transports_->hasRdmaTransport(); + } + int syncSegmentCache(const std::string& segment_name = "") { return metadata_->syncSegmentCache(segment_name); } diff --git a/mooncake-transfer-engine/src/multi_transport.cpp b/mooncake-transfer-engine/src/multi_transport.cpp index 577712fd31..30eed9f672 100644 --- a/mooncake-transfer-engine/src/multi_transport.cpp +++ b/mooncake-transfer-engine/src/multi_transport.cpp @@ -513,6 +513,10 @@ bool MultiTransport::isTcpOnly() const { return transport_map_.size() == 1 && transport_map_.count("tcp") == 1; } +bool MultiTransport::hasRdmaTransport() const { + return transport_map_.count("rdma") != 0; +} + std::vector MultiTransport::listTransports() { std::vector transport_list; for (auto& entry : transport_map_) diff --git a/mooncake-transfer-engine/src/transfer_engine.cpp b/mooncake-transfer-engine/src/transfer_engine.cpp index 77176d7259..fb4405760d 100644 --- a/mooncake-transfer-engine/src/transfer_engine.cpp +++ b/mooncake-transfer-engine/src/transfer_engine.cpp @@ -192,6 +192,10 @@ device::RdmaTransport* TransferEngine::getOrCreateRdmaTransport( bool TransferEngine::isTcpOnly() const { return impl_->isTcpOnly(); } +bool TransferEngine::hasRdmaTransport() const { + return impl_->hasRdmaTransport(); +} + int TransferEngine::syncSegmentCache(const std::string& segment_name) { return impl_->syncSegmentCache(segment_name); } @@ -621,6 +625,11 @@ bool TransferEngine::isTcpOnly() const { return impl_->isTcpOnly(); } +bool TransferEngine::hasRdmaTransport() const { + if (use_tent_) return false; + return impl_->hasRdmaTransport(); +} + int TransferEngine::syncSegmentCache(const std::string& segment_name) { if (use_tent_) return 0; From 5f2c69d023bb9a7d89f3efe8a9ad8cfd8be3170c Mon Sep 17 00:00:00 2001 From: Cruz Zhao Date: Thu, 25 Jun 2026 05:33:25 +0000 Subject: [PATCH 14/15] [Store] Gate staging on target protocol Check each target buffer protocol before RDMA staging so CXL replicas do not inherit RDMA-only requirements when RDMA is also installed. Co-Authored-By: Claude Opus 4.6 --- mooncake-store/src/transfer_task.cpp | 8 ++++---- mooncake-transfer-engine/include/multi_transport.h | 2 -- mooncake-transfer-engine/include/transfer_engine.h | 2 -- mooncake-transfer-engine/include/transfer_engine_impl.h | 4 ---- mooncake-transfer-engine/src/multi_transport.cpp | 4 ---- mooncake-transfer-engine/src/transfer_engine.cpp | 9 --------- 6 files changed, 4 insertions(+), 25 deletions(-) diff --git a/mooncake-store/src/transfer_task.cpp b/mooncake-store/src/transfer_task.cpp index 7bcd8e7702..0aba748ed9 100644 --- a/mooncake-store/src/transfer_task.cpp +++ b/mooncake-store/src/transfer_task.cpp @@ -1051,7 +1051,7 @@ std::optional TransferSubmitter::submit( future = submitMemcpyOperation(handle, slices, op_code); break; case TransferStrategy::TRANSFER_ENGINE: { - if (engine_.hasRdmaTransport()) { + if (handle.protocol_ == "rdma") { auto [prepared, staging] = ensureRegisteredForRDMA(slices); if (prepared.empty() && !slices.empty()) { @@ -1115,14 +1115,15 @@ std::optional TransferSubmitter::submit_batch( auto& replica = replicas[i]; auto& slices = all_slices[i]; auto& mem_desc = replica.get_memory_descriptor(); - if (!validateTransferParams(mem_desc.buffer_descriptor, slices)) { + auto& handle = mem_desc.buffer_descriptor; + if (!validateTransferParams(handle, slices)) { return std::nullopt; } // For RDMA WRITE ops, ensure slices are in registered memory. const std::vector* effective_slices = &slices; std::vector prepared; - if (op_code == TransferRequest::WRITE && engine_.hasRdmaTransport()) { + if (op_code == TransferRequest::WRITE && handle.protocol_ == "rdma") { auto [p, staging] = ensureRegisteredForRDMA(slices); if (p.empty() && !slices.empty()) { LOG(ERROR) << "ensureRegisteredForRDMA failed in submit_batch"; @@ -1135,7 +1136,6 @@ std::optional TransferSubmitter::submit_batch( } } - auto& handle = mem_desc.buffer_descriptor; uint64_t offset = 0; SegmentHandle seg = engine_.openSegment(handle.transport_endpoint_); if (seg == static_cast(ERR_INVALID_ARGUMENT)) { diff --git a/mooncake-transfer-engine/include/multi_transport.h b/mooncake-transfer-engine/include/multi_transport.h index 57c144c081..c556541bcd 100644 --- a/mooncake-transfer-engine/include/multi_transport.h +++ b/mooncake-transfer-engine/include/multi_transport.h @@ -63,8 +63,6 @@ class MultiTransport { */ bool isTcpOnly() const; - bool hasRdmaTransport() const; - std::vector listTransports(); void *getBaseAddr(); diff --git a/mooncake-transfer-engine/include/transfer_engine.h b/mooncake-transfer-engine/include/transfer_engine.h index 720420a1c0..3b6e904246 100644 --- a/mooncake-transfer-engine/include/transfer_engine.h +++ b/mooncake-transfer-engine/include/transfer_engine.h @@ -178,8 +178,6 @@ class TransferEngine { */ bool isTcpOnly() const; - bool hasRdmaTransport() const; - int syncSegmentCache(const std::string& segment_name = ""); std::shared_ptr getMetadata(); diff --git a/mooncake-transfer-engine/include/transfer_engine_impl.h b/mooncake-transfer-engine/include/transfer_engine_impl.h index 48be23ab18..976a5ef2f8 100644 --- a/mooncake-transfer-engine/include/transfer_engine_impl.h +++ b/mooncake-transfer-engine/include/transfer_engine_impl.h @@ -353,10 +353,6 @@ class TransferEngineImpl { bool isTcpOnly() const { return multi_transports_->isTcpOnly(); } - bool hasRdmaTransport() const { - return multi_transports_->hasRdmaTransport(); - } - int syncSegmentCache(const std::string& segment_name = "") { return metadata_->syncSegmentCache(segment_name); } diff --git a/mooncake-transfer-engine/src/multi_transport.cpp b/mooncake-transfer-engine/src/multi_transport.cpp index 30eed9f672..577712fd31 100644 --- a/mooncake-transfer-engine/src/multi_transport.cpp +++ b/mooncake-transfer-engine/src/multi_transport.cpp @@ -513,10 +513,6 @@ bool MultiTransport::isTcpOnly() const { return transport_map_.size() == 1 && transport_map_.count("tcp") == 1; } -bool MultiTransport::hasRdmaTransport() const { - return transport_map_.count("rdma") != 0; -} - std::vector MultiTransport::listTransports() { std::vector transport_list; for (auto& entry : transport_map_) diff --git a/mooncake-transfer-engine/src/transfer_engine.cpp b/mooncake-transfer-engine/src/transfer_engine.cpp index fb4405760d..77176d7259 100644 --- a/mooncake-transfer-engine/src/transfer_engine.cpp +++ b/mooncake-transfer-engine/src/transfer_engine.cpp @@ -192,10 +192,6 @@ device::RdmaTransport* TransferEngine::getOrCreateRdmaTransport( bool TransferEngine::isTcpOnly() const { return impl_->isTcpOnly(); } -bool TransferEngine::hasRdmaTransport() const { - return impl_->hasRdmaTransport(); -} - int TransferEngine::syncSegmentCache(const std::string& segment_name) { return impl_->syncSegmentCache(segment_name); } @@ -625,11 +621,6 @@ bool TransferEngine::isTcpOnly() const { return impl_->isTcpOnly(); } -bool TransferEngine::hasRdmaTransport() const { - if (use_tent_) return false; - return impl_->hasRdmaTransport(); -} - int TransferEngine::syncSegmentCache(const std::string& segment_name) { if (use_tent_) return 0; From d24edcdc0a79ffce95f1e62ebfbd1a5a3d1434ac Mon Sep 17 00:00:00 2001 From: Cruz Zhao Date: Thu, 25 Jun 2026 09:18:18 +0000 Subject: [PATCH 15/15] [Store] Preserve zero local buffer write failure Keep copy-style put and upsert APIs failing when no client buffer is configured, while leaving explicit external-buffer write paths unchanged. Co-Authored-By: Claude Opus 4.6 --- mooncake-store/src/real_client.cpp | 37 ++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index 0c75f1cdd8..0113ac5143 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -97,6 +97,13 @@ size_t sum_value_sizes(const std::vector> &values) { return total; } +bool has_client_buffer_for_write( + const std::shared_ptr &client_buffer_allocator, + size_t size) { + if (size == 0) return true; + return client_buffer_allocator && client_buffer_allocator->size() > 0; +} + size_t sum_sizes(const std::vector &sizes) { size_t total = 0; for (size_t size : sizes) { @@ -1706,6 +1713,11 @@ tl::expected RealClient::put_internal( LOG(ERROR) << "Client is not initialized"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } + if (!has_client_buffer_for_write(client_buffer_allocator, + value.size_bytes())) { + LOG(ERROR) << "Client buffer allocator is not available"; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } // Staging is deferred to ensureRegisteredForRDMA in the transfer layer. // For LOCAL_MEMCPY this eliminates one copy; for RDMA with registered // buffers it also eliminates one copy; for RDMA with unregistered buffers @@ -1769,6 +1781,11 @@ tl::expected RealClient::put_batch_internal( LOG(ERROR) << "Key and value size mismatch"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } + if (!has_client_buffer_for_write(client_buffer_allocator, + sum_value_sizes(values))) { + LOG(ERROR) << "Client buffer allocator is not available"; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } (void)client_buffer_allocator; std::vector> ordered_batched_slices; @@ -1834,6 +1851,11 @@ tl::expected RealClient::put_parts_internal( LOG(ERROR) << "Client is not initialized"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } + if (!has_client_buffer_for_write(client_buffer_allocator, + sum_value_sizes(values))) { + LOG(ERROR) << "Client buffer allocator is not available"; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } (void)client_buffer_allocator; // Create slices directly from each part's data pointer. @@ -3684,6 +3706,11 @@ tl::expected RealClient::upsert_internal( LOG(ERROR) << "Client is not initialized"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } + if (!has_client_buffer_for_write(client_buffer_allocator, + value.size_bytes())) { + LOG(ERROR) << "Client buffer allocator is not available"; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } (void)client_buffer_allocator; std::vector slices = @@ -3906,6 +3933,11 @@ tl::expected RealClient::upsert_parts_internal( LOG(ERROR) << "Client is not initialized"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } + if (!has_client_buffer_for_write(client_buffer_allocator, + sum_value_sizes(values))) { + LOG(ERROR) << "Client buffer allocator is not available"; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } (void)client_buffer_allocator; std::vector slices; @@ -3978,6 +4010,11 @@ tl::expected RealClient::upsert_batch_internal( LOG(ERROR) << "Key and value size mismatch"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } + if (!has_client_buffer_for_write(client_buffer_allocator, + sum_value_sizes(values))) { + LOG(ERROR) << "Client buffer allocator is not available"; + return tl::unexpected(ErrorCode::INVALID_PARAMS); + } (void)client_buffer_allocator; // staging moved to ensureRegisteredForRDMA std::vector> ordered_batched_slices; ordered_batched_slices.reserve(keys.size());