diff --git a/mooncake-integration/store/store_py.cpp b/mooncake-integration/store/store_py.cpp index 4a55a2e11e..2949d708f0 100644 --- a/mooncake-integration/store/store_py.cpp +++ b/mooncake-integration/store/store_py.cpp @@ -19,6 +19,7 @@ #include "integration_utils.h" #include "buffer_pool.h" +#include "gpu_staging_utils.h" // Forward declaration for EngramStore bindings namespace mooncake { @@ -798,15 +799,46 @@ class MooncakeStorePyWrapper { const std::vector &keys, const std::vector &infos, const ReplicateConfig &config = ReplicateConfig{}) { - return batch_write_tensor_impl( - keys, infos, config, "put", - [this](const std::vector &write_keys, - const std::vector &buffer_ptrs, - const std::vector &buffer_sizes, - const ReplicateConfig &write_config) { - return store_->batch_put_from(write_keys, buffer_ptrs, - buffer_sizes, write_config); - }); + auto group_ids_error = + ValidateGroupIdsForBatchConfig(config, keys.size(), "put"); + if (!group_ids_error.empty()) return group_ids_error; + + std::vector results(keys.size(), 0); + { + py::gil_scoped_release release_gil; + + std::vector valid_keys; + std::vector> all_buffers; + std::vector> all_sizes; + std::vector original_indices; + + for (size_t i = 0; i < infos.size(); ++i) { + if (!infos[i].valid()) { + results[i] = to_py_ret(ErrorCode::INVALID_PARAMS); + continue; + } + valid_keys.push_back(keys[i]); + all_buffers.push_back( + {const_cast( + reinterpret_cast(&infos[i].metadata)), + reinterpret_cast(infos[i].data_ptr)}); + all_sizes.push_back({infos[i].metadata.header.data_offset, + infos[i].tensor_size}); + original_indices.push_back(i); + } + + if (!valid_keys.empty()) { + ReplicateConfig write_config = + MakeIndexedConfig(config, original_indices); + std::vector op_results = + store_->batch_put_from_multi_buffers( + valid_keys, all_buffers, all_sizes, write_config); + for (size_t i = 0; i < op_results.size(); ++i) { + results[original_indices[i]] = op_results[i]; + } + } + } + return results; } std::vector batch_put_tensor_impl( @@ -1366,55 +1398,34 @@ class MooncakeStorePyWrapper { results[i] = to_py_ret(ErrorCode::INVALID_PARAMS); } - // 2. Prepare Buffers and Execute (GIL Released) + // 2. Zero-copy: pass metadata and data as separate buffers { py::gil_scoped_release release_gil; std::vector valid_keys; - std::vector buffer_ptrs; - std::vector buffer_sizes; + std::vector> all_buffers; + std::vector> all_sizes; std::vector original_indices; - std::vector> temp_allocations; - for (size_t i = 0; i < infos.size(); ++i) { if (!infos[i].valid()) continue; - size_t total_size = - sizeof(TensorMetadata) + infos[i].tensor_size; - auto alloc_result = - store_->client_buffer_allocator_->allocate(total_size); - - if (!alloc_result) { - LOG(ERROR) - << "Failed to allocate buffer for key: " << keys[i]; - results[i] = to_py_ret(ErrorCode::INVALID_PARAMS); - continue; - } - - // Copy Metadata & Data - char *dst = static_cast(alloc_result->ptr()); - memcpy(dst, &infos[i].metadata, sizeof(TensorMetadata)); - if (infos[i].tensor_size > 0) { - memcpy(dst + sizeof(TensorMetadata), - reinterpret_cast(infos[i].data_ptr), - infos[i].tensor_size); - } - valid_keys.push_back(keys[i]); - buffer_ptrs.push_back(alloc_result->ptr()); - buffer_sizes.push_back(total_size); + all_buffers.push_back( + {const_cast( + reinterpret_cast(&infos[i].metadata)), + reinterpret_cast(infos[i].data_ptr)}); + all_sizes.push_back({infos[i].metadata.header.data_offset, + infos[i].tensor_size}); original_indices.push_back(i); - - temp_allocations.push_back( - std::make_unique(std::move(*alloc_result))); } if (!valid_keys.empty()) { ReplicateConfig write_config = MakeIndexedConfig(config, original_indices); - std::vector op_results = store_->batch_upsert_from( - valid_keys, buffer_ptrs, buffer_sizes, write_config); + std::vector op_results = + store_->batch_upsert_from_multi_buffers( + valid_keys, all_buffers, all_sizes, write_config); for (size_t i = 0; i < op_results.size(); ++i) { results[original_indices[i]] = op_results[i]; } @@ -2504,6 +2515,25 @@ PYBIND11_MODULE(store, m) { py::arg("config") = ReplicateConfig{}, "Upsert object data directly from pre-allocated buffers for " "multiple keys") + .def( + "batch_upsert_from_multi_buffers", + [](MooncakeStorePyWrapper &self, + const std::vector &keys, + const std::vector> &all_buffer_ptrs, + const std::vector> &all_sizes, + const ReplicateConfig &config = ReplicateConfig{}) { + if (!self.is_client_initialized()) { + LOG(ERROR) << "Client is not initialized"; + return std::vector{}; + } + py::gil_scoped_release release; + return self.store_->batch_upsert_from_multi_buffers( + keys, CastAddrs2Ptrs(all_buffer_ptrs), all_sizes, config); + }, + py::arg("keys"), py::arg("all_buffer_ptrs"), py::arg("all_sizes"), + py::arg("config") = ReplicateConfig{}, + "Upsert object data directly from multiple pre-allocated buffers " + "for multiple keys") .def( "upsert", [](MooncakeStorePyWrapper &self, const std::string &key, diff --git a/mooncake-integration/store/store_py_parallel_read.h b/mooncake-integration/store/store_py_parallel_read.h index 06207cdf55..e61ebf4a7f 100644 --- a/mooncake-integration/store/store_py_parallel_read.h +++ b/mooncake-integration/store/store_py_parallel_read.h @@ -1467,7 +1467,7 @@ std::vector execute_tensor_into_plan_transfers( continue; } if (plans[plan_idx].materialized_metadata.has_value()) { - std::memcpy( + gpu_staging::MemcpySafe( reinterpret_cast(plans[plan_idx].user_buffer_ptr), &*plans[plan_idx].materialized_metadata, sizeof(TensorMetadata)); diff --git a/mooncake-integration/store/store_py_parallel_write.h b/mooncake-integration/store/store_py_parallel_write.h index 2bba450a8f..c1e3a548aa 100644 --- a/mooncake-integration/store/store_py_parallel_write.h +++ b/mooncake-integration/store/store_py_parallel_write.h @@ -59,9 +59,10 @@ std::vector batch_write_tensor_impl(const std::vector &keys, std::memcpy(dst, &infos[i].metadata, infos[i].metadata.header.data_offset); if (infos[i].tensor_size > 0) { - std::memcpy(dst + infos[i].metadata.header.data_offset, - reinterpret_cast(infos[i].data_ptr), - infos[i].tensor_size); + gpu_staging::MemcpySafe( + dst + infos[i].metadata.header.data_offset, + reinterpret_cast(infos[i].data_ptr), + infos[i].tensor_size); } valid_keys.push_back(keys[i]); diff --git a/mooncake-store/include/client_service.h b/mooncake-store/include/client_service.h index 0094f4f14d..7d68b5c5d3 100644 --- a/mooncake-store/include/client_service.h +++ b/mooncake-store/include/client_service.h @@ -64,6 +64,15 @@ class Client { public: virtual ~Client(); + /** + * @brief Reinitialize TransferSubmitter with a staging allocator. + * + * Called by RealClient after its client buffer allocator is ready, + * so that ensureRegisteredForRDMA can stage unregistered user buffers. + */ + void setStagingAllocator( + std::shared_ptr staging_allocator); + const UUID& getClientId() const { return client_id_; } const std::string& tenant_id() const { return master_client_.tenant_id(); } @@ -668,7 +677,8 @@ class Client { const std::string& local_hostname, const std::string& metadata_connstring, const std::string& protocol, const std::optional& device_names); - void InitTransferSubmitter(); + void InitTransferSubmitter( + std::shared_ptr staging_allocator = nullptr); ErrorCode TransferData(const Replica::Descriptor& replica_descriptor, std::vector& slices, TransferRequest::OpCode op_code); diff --git a/mooncake-store/include/gpu_staging_utils.h b/mooncake-store/include/gpu_staging_utils.h index 07982aa015..41e72f5d38 100644 --- a/mooncake-store/include/gpu_staging_utils.h +++ b/mooncake-store/include/gpu_staging_utils.h @@ -7,6 +7,12 @@ #endif #include +#include +#include +#include +#include +#include +#include #include namespace mooncake { @@ -107,6 +113,51 @@ inline void SetDevice(int device_id) { #endif } +class DeviceGuard { + public: + explicit DeviceGuard(int device_id) { + if (device_id < 0) return; +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + if (cudaGetDevice(&previous_device_) == cudaSuccess) { + active_ = true; + } + cudaSetDevice(device_id); +#elif defined(USE_HIP) + if (hipGetDevice(&previous_device_) == hipSuccess) { + active_ = true; + } + hipSetDevice(device_id); +#elif defined(USE_ASCEND) || defined(USE_ASCEND_DIRECT) || defined(USE_UBSHMEM) + if (aclrtGetDevice(&previous_device_) == ACL_SUCCESS) { + active_ = true; + } + aclrtSetDevice(device_id); +#else + (void)device_id; +#endif + } + + ~DeviceGuard() { + if (!active_) return; +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + cudaSetDevice(previous_device_); +#elif defined(USE_HIP) + hipSetDevice(previous_device_); +#elif defined(USE_ASCEND) || defined(USE_ASCEND_DIRECT) || defined(USE_UBSHMEM) + aclrtSetDevice(previous_device_); +#endif + } + + DeviceGuard(const DeviceGuard&) = delete; + DeviceGuard& operator=(const DeviceGuard&) = delete; + + private: + int previous_device_{-1}; + bool active_{false}; +}; + // Copy host memory to device. Caller must have called SetDevice first. inline bool CopyHostToDevice(void* dst, const void* src, size_t size) { #if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ @@ -162,5 +213,110 @@ inline bool IsHostPointer(const void* ptr) { return true; // CPU-only build: all pointers are host #endif } +/// GPU-safe memcpy: auto-detects pointer types and dispatches to CopyAuto +/// for GPU pointers, std::memcpy for host pointers. +inline bool MemcpySafe(void* dst, const void* src, size_t size) { + if (size == 0) return true; + int src_dev = -1, dst_dev = -1; + bool src_gpu = IsDevicePointer(src, &src_dev); + bool dst_gpu = IsDevicePointer(dst, &dst_dev); + if (!src_gpu && !dst_gpu) { + std::memcpy(dst, src, size); + return true; + } + int dev = src_gpu ? src_dev : dst_dev; + DeviceGuard guard(dev); + return CopyAuto(dst, src, size); +} + +inline bool PinMemoryEnabled() { + const char* pin_env = std::getenv("MC_STORE_PIN_MEMORY"); + return !(pin_env && + (std::string(pin_env) == "0" || std::string(pin_env) == "false")); +} + +inline size_t PinMemoryMaxBytes() { + const char* max_env = std::getenv("MC_STORE_PIN_MEMORY_MAX_BYTES"); + if (!max_env || max_env[0] == '\0') return 0; + char* end = nullptr; + unsigned long long value = std::strtoull(max_env, &end, 10); + if (end == max_env) return 0; + if (value > std::numeric_limits::max()) { + return std::numeric_limits::max(); + } + return static_cast(value); +} + +std::mutex& PinnedHostMemoryMutex(); + +size_t& PinnedHostMemoryBytes(); + +std::unordered_map& PinnedHostMemoryRegions(); + +inline bool TryPinHostMemory(void* ptr, size_t size, const char* name) { +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + if (!PinMemoryEnabled() || ptr == nullptr || size == 0) return false; + + std::lock_guard lock(PinnedHostMemoryMutex()); + auto& regions = PinnedHostMemoryRegions(); + if (regions.find(ptr) != regions.end()) return true; + + size_t max_bytes = PinMemoryMaxBytes(); + size_t current = PinnedHostMemoryBytes(); + if (max_bytes > 0 && (current > max_bytes || size > max_bytes - current)) { + LOG(WARNING) << "Skip cudaHostRegister for " << name << " size=" << size + << " because MC_STORE_PIN_MEMORY_MAX_BYTES=" << max_bytes + << " current_pinned=" << current; + return false; + } + + auto cuda_ret = cudaHostRegister(ptr, size, 0); + if (cuda_ret != cudaSuccess) { + LOG(WARNING) << "cudaHostRegister failed for " << name + << " size=" << size << ": " << cudaGetErrorString(cuda_ret) + << "; GPU copies will use pageable fallback"; + return false; + } + + PinnedHostMemoryBytes() += size; + regions[ptr] = size; + LOG(INFO) << "cudaHostRegister OK for " << name << ", size=" << size; + return true; +#else + (void)ptr; + (void)size; + (void)name; + return false; +#endif +} + +inline void UnpinHostMemory(void* ptr, const char* name) { +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + if (ptr == nullptr) return; + + std::lock_guard lock(PinnedHostMemoryMutex()); + auto& regions = PinnedHostMemoryRegions(); + auto it = regions.find(ptr); + if (it == regions.end()) return; + + size_t size = it->second; + auto cuda_ret = cudaHostUnregister(ptr); + if (cuda_ret != cudaSuccess) { + LOG(WARNING) << "cudaHostUnregister failed for " << name + << " size=" << size << ": " + << cudaGetErrorString(cuda_ret); + return; + } + + regions.erase(it); + PinnedHostMemoryBytes() -= size; +#else + (void)ptr; + (void)name; +#endif +} + } // namespace gpu_staging } // namespace mooncake diff --git a/mooncake-store/include/pyclient.h b/mooncake-store/include/pyclient.h index c2e2201c5d..3b87b136a8 100644 --- a/mooncake-store/include/pyclient.h +++ b/mooncake-store/include/pyclient.h @@ -318,6 +318,17 @@ class PyClient { const std::vector &buffers, const std::vector &sizes, const ReplicateConfig &config = ReplicateConfig{}) = 0; + virtual std::vector batch_upsert_from_multi_buffers( + const std::vector &keys, + const std::vector> &all_buffers, + const std::vector> &all_sizes, + const ReplicateConfig &config = ReplicateConfig{}) { + (void)all_buffers; + (void)all_sizes; + (void)config; + return std::vector(keys.size(), -1); + } + virtual int upsert_parts( const std::string &key, std::vector> values, const ReplicateConfig &config = ReplicateConfig{}) = 0; diff --git a/mooncake-store/include/real_client.h b/mooncake-store/include/real_client.h index 0678964497..361bcb30a5 100644 --- a/mooncake-store/include/real_client.h +++ b/mooncake-store/include/real_client.h @@ -168,28 +168,28 @@ class RealClient : public PyClient { /** * @brief Put object data directly from a pre-allocated buffer * @param key Key of the object to put - * @param buffer Pointer to Store-managed registered memory, either - * explicitly registered with register_buffer() or inside the setup-time - * local buffer + * @param buffer Pointer to the source data buffer * @param size Size of the data to put * @return 0 on success, negative value on error - * @note The buffer address must resolve to Store-managed registered memory, - * either an explicit register_buffer() region or the setup-time local - * buffer + * @note Registered buffers are used directly for RDMA writes. Unregistered + * buffers are staged in the transfer layer when RDMA requires registered + * source memory. */ int put_from(const std::string &key, void *buffer, size_t size, const ReplicateConfig &config = ReplicateConfig{}); /** - * @brief Put one object directly from a registered data buffer plus a - * registered metadata buffer + * @brief Put one object directly from a data buffer plus a metadata buffer * @param key Key of the object to put - * @param buffer Pointer to the registered data buffer - * @param metadata_buffer Pointer to the registered metadata buffer + * @param buffer Pointer to the data buffer + * @param metadata_buffer Pointer to the metadata buffer * @param size Size of the data buffer in bytes * @param metadata_size Size of the metadata buffer in bytes * @param config Replication configuration * @return 0 on success, negative value on error + * @note Registered buffers are used directly for RDMA writes. Unregistered + * buffers are staged in the transfer layer when RDMA requires registered + * source memory. */ int put_from_with_metadata( const std::string &key, void *buffer, void *metadata_buffer, @@ -205,9 +205,9 @@ class RealClient : public PyClient { * @param config Replication configuration * @return Vector of integers, where each element is 0 on success, or a * negative value on error - * @note The buffer addresses must resolve to Store-managed registered - * memory, either explicit register_buffer() regions or the setup-time local - * buffer + * @note Registered buffers are used directly for RDMA writes. Unregistered + * buffers are staged in the transfer layer when RDMA requires registered + * source memory. */ std::vector batch_put_from( @@ -225,9 +225,9 @@ class RealClient : public PyClient { * @param config Replication configuration * @return Vector of integers, where each element is 0 on success, or a * negative value on error - * @note The buffer addresses must resolve to Store-managed registered - * memory, either explicit register_buffer() regions or the setup-time local - * buffer + * @note Registered buffers are used directly for RDMA writes. Unregistered + * buffers are staged in the transfer layer when RDMA requires registered + * source memory. */ std::vector batch_put_from_multi_buffers( const std::vector &keys, @@ -254,6 +254,12 @@ class RealClient : public PyClient { const std::vector &buffers, const std::vector &sizes, const ReplicateConfig &config = ReplicateConfig{}); + std::vector batch_upsert_from_multi_buffers( + const std::vector &keys, + const std::vector> &all_buffers, + const std::vector> &all_sizes, + const ReplicateConfig &config = ReplicateConfig{}); + int upsert_parts(const std::string &key, std::vector> values, const ReplicateConfig &config = ReplicateConfig{}); @@ -593,6 +599,13 @@ class RealClient : public PyClient { const std::vector &buffers, const std::vector &sizes, const ReplicateConfig &config = ReplicateConfig{}); + std::vector> + batch_upsert_from_multi_buffers_internal( + const std::vector &keys, + const std::vector> &all_buffers, + const std::vector> &all_sizes, + const ReplicateConfig &config = ReplicateConfig{}); + tl::expected upsert_parts_internal( const std::string &key, std::vector> values, const ReplicateConfig &config = ReplicateConfig{}, diff --git a/mooncake-store/include/transfer_task.h b/mooncake-store/include/transfer_task.h index 63b061914d..b65e4bbace 100644 --- a/mooncake-store/include/transfer_task.h +++ b/mooncake-store/include/transfer_task.h @@ -20,6 +20,7 @@ #include "replica.h" #include "storage_backend.h" #include "client_metric.h" +#include "client_buffer.hpp" #ifdef USE_NOF #include "spdk/spdk_wrapper.h" #endif @@ -285,8 +286,17 @@ class TransferFuture { */ TransferStrategy strategy() const; + /** + * @brief Attach staging buffer handles that must stay alive until transfer + * completes. Used by ensureRegisteredForRDMA to keep staging memory valid. + */ + void attachStagingHandles(std::vector handles) { + staging_handles_ = std::move(handles); + } + private: std::shared_ptr state_; + std::vector staging_handles_; }; /** @@ -528,11 +538,11 @@ class FilereadWorkerPool { */ class TransferSubmitter { public: - explicit TransferSubmitter(TransferEngine& engine, - std::shared_ptr& backend, - const std::string& local_hostname, - TransferMetric* transfer_metric = nullptr, - int numa_socket_id = 0); + explicit TransferSubmitter( + TransferEngine& engine, std::shared_ptr& backend, + const std::string& local_hostname, + std::shared_ptr staging_allocator, + TransferMetric* transfer_metric = nullptr, int numa_socket_id = 0); /** * @brief Submit an asynchronous transfer operation @@ -585,11 +595,23 @@ class TransferSubmitter { const std::string& local_endpoint); private: + /** + * @brief Ensure all slices are in RDMA-registered memory. + * + * For each slice NOT registered with the transfer engine, copies its + * data into a staging buffer (from staging_allocator_) that IS registered. + * Returns the prepared slices and staging handles that must stay alive + * until the transfer completes. + */ + std::pair, std::vector> + ensureRegisteredForRDMA(const std::vector& slices); + TransferEngine& engine_; // Cached at construction: the local transport endpoint never changes for // the lifetime of the TransferSubmitter, so we avoid calling // engine_.getLocalIpAndPort() (which allocates a string) on every transfer. const std::string local_endpoint_; + std::shared_ptr staging_allocator_; std::unique_ptr memcpy_pool_; #ifdef USE_NOF std::unique_ptr spdk_nvmf_pool_; diff --git a/mooncake-store/src/CMakeLists.txt b/mooncake-store/src/CMakeLists.txt index e8887223e3..3cf476addc 100644 --- a/mooncake-store/src/CMakeLists.txt +++ b/mooncake-store/src/CMakeLists.txt @@ -28,6 +28,7 @@ set(MOONCAKE_STORE_SOURCES shm_helper.cpp http_metadata_server.cpp file_storage.cpp + gpu_staging_utils.cpp serialize/serializer.cpp storage/distributed/distributed_storage_backend.cpp ha/leadership/leader_coordinator_factory.cpp diff --git a/mooncake-store/src/client_service.cpp b/mooncake-store/src/client_service.cpp index 7210e0d52b..03a3ea0404 100644 --- a/mooncake-store/src/client_service.cpp +++ b/mooncake-store/src/client_service.cpp @@ -34,6 +34,7 @@ #include "ha/leadership/leader_coordinator_factory.h" #include "types.h" #include "client_buffer.hpp" +#include "gpu_staging_utils.h" #include "utils.h" #include "rpc_types.h" #include "local_hot_cache.h" @@ -832,7 +833,8 @@ ErrorCode Client::InitTransferEngine( return ErrorCode::OK; } -void Client::InitTransferSubmitter() { +void Client::InitTransferSubmitter( + std::shared_ptr staging_allocator) { // Initialize TransferSubmitter after transfer engine is ready // Keep using logical local_hostname for name-based behaviors; endpoint is // used separately where needed. @@ -841,14 +843,21 @@ void Client::InitTransferSubmitter() { GetConfiguredNumaSocketId().value_or(GetCurrentNumaSocketId()); transfer_submitter_ = std::make_unique( *transfer_engine_, storage_backend_, local_hostname_, + std::move(staging_allocator), metrics_ ? &metrics_->transfer_metric : nullptr, numa_socket_id); #else transfer_submitter_ = std::make_unique( *transfer_engine_, storage_backend_, local_hostname_, + std::move(staging_allocator), metrics_ ? &metrics_->transfer_metric : nullptr); #endif } +void Client::setStagingAllocator( + std::shared_ptr staging_allocator) { + InitTransferSubmitter(std::move(staging_allocator)); +} + std::optional> Client::Create( const std::string& local_hostname, const std::string& metadata_connstring, const std::string& protocol, const std::optional& device_names, @@ -2707,6 +2716,12 @@ tl::expected Client::UnmountSegmentImpl( return tl::unexpected(err); } +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + gpu_staging::UnpinHostMemory(reinterpret_cast(it->second.base), + "segment"); +#endif + int rc = transfer_engine_->unregisterLocalMemory( reinterpret_cast(it->second.base)); if (rc != 0) { @@ -2780,6 +2795,16 @@ tl::expected Client::MountSegmentAndGetId( return tl::unexpected(ErrorCode::INVALID_PARAMS); } +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + { + // Pin segment buffer by default so GPU→host copies use DMA. + // Opt out with MC_STORE_PIN_MEMORY=0. + // Cap with MC_STORE_PIN_MEMORY_MAX_BYTES=N. + gpu_staging::TryPinHostMemory((void*)buffer, size, "segment"); + } +#endif + Segment segment; segment.id = generate_uuid(); segment.name = local_hostname_; @@ -2797,6 +2822,17 @@ tl::expected Client::MountSegmentAndGetId( ErrorCode err = mount_result.error(); LOG(ERROR) << "mount_segment_to_master_failed base=" << buffer << " size=" << size << ", error=" << err; +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + gpu_staging::UnpinHostMemory((void*)buffer, "segment"); +#endif + int unregister_rc = + transfer_engine_->unregisterLocalMemory((void*)buffer); + if (unregister_rc != 0) { + LOG(WARNING) << "Failed to unregister transfer buffer after " + "mount failure, ret=" + << unregister_rc; + } return tl::unexpected(err); } diff --git a/mooncake-store/src/gpu_staging_utils.cpp b/mooncake-store/src/gpu_staging_utils.cpp new file mode 100644 index 0000000000..6d7efda86d --- /dev/null +++ b/mooncake-store/src/gpu_staging_utils.cpp @@ -0,0 +1,22 @@ +#include "gpu_staging_utils.h" + +namespace mooncake { +namespace gpu_staging { + +std::mutex& PinnedHostMemoryMutex() { + static std::mutex mutex; + return mutex; +} + +size_t& PinnedHostMemoryBytes() { + static size_t bytes = 0; + return bytes; +} + +std::unordered_map& PinnedHostMemoryRegions() { + static std::unordered_map regions; + return regions; +} + +} // namespace gpu_staging +} // namespace mooncake diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index 81fd32d8e6..0113ac5143 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -97,6 +97,13 @@ size_t sum_value_sizes(const std::vector> &values) { return total; } +bool has_client_buffer_for_write( + const std::shared_ptr &client_buffer_allocator, + size_t size) { + if (size == 0) return true; + return client_buffer_allocator && client_buffer_allocator->size() > 0; +} + size_t sum_sizes(const std::vector &sizes) { size_t total = 0; for (size_t size : sizes) { @@ -270,7 +277,7 @@ inline tl::expected scatter_host_to_maybe_device( void *dst, const void *src, size_t size, const std::string &context) { int device_id = -1; if (gpu_staging::IsDevicePointer(dst, &device_id)) { - gpu_staging::SetDevice(device_id); + gpu_staging::DeviceGuard guard(device_id); if (!gpu_staging::CopyHostToDevice(dst, src, size)) { LOG(ERROR) << "H2D copy failed: " << context; return tl::unexpected(ErrorCode::TRANSFER_FAIL); @@ -761,6 +768,17 @@ tl::expected RealClient::setup_internal( << toString(result.error()); return tl::unexpected(result.error()); } +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + { + // Pin staging buffer by default so GPU→host copies use DMA. + // Opt out with MC_STORE_PIN_MEMORY=0. + // Cap with MC_STORE_PIN_MEMORY_MAX_BYTES=N. + gpu_staging::TryPinHostMemory(client_buffer_allocator_->getBase(), + local_buffer_size, + "client staging buffer"); + } +#endif { std::unique_lock lock(registered_buffer_mutex_); local_buffer_region_ = WritableBufferRegion{ @@ -773,6 +791,15 @@ tl::expected RealClient::setup_internal( LOG(INFO) << "Local buffer size is 0, skip registering local memory"; } + // Reinitialize TransferSubmitter with the staging allocator so that + // ensureRegisteredForRDMA can stage unregistered user buffers for RDMA. + client_->setStagingAllocator(client_buffer_allocator_); + + auto cleanup_on_setup_failure = [this](ErrorCode error) { + tearDownAll_internal(); + return tl::unexpected(error); + }; + // If global_segment_size is 0, skip mount segment; // If global_segment_size is larger than max_mr_size, split to multiple // mapped_shms. @@ -786,7 +813,7 @@ tl::expected RealClient::setup_internal( cxl_dev_size = static_cast(val); } else { LOG(FATAL) << "MC_CXL_DEV_SIZE not set"; - return tl::unexpected(ErrorCode::INVALID_PARAMS); + return cleanup_on_setup_failure(ErrorCode::INVALID_PARAMS); } void *ptr = client_->GetBaseAddr(); @@ -796,7 +823,7 @@ tl::expected RealClient::setup_internal( if (!mount_result.has_value()) { LOG(ERROR) << "Failed to mount segment: " << toString(mount_result.error()); - return tl::unexpected(mount_result.error()); + return cleanup_on_setup_failure(mount_result.error()); } } else { @@ -855,7 +882,7 @@ tl::expected RealClient::setup_internal( if (!ptr) { LOG(ERROR) << "Failed to allocate segment memory"; - return tl::unexpected(ErrorCode::INVALID_PARAMS); + return cleanup_on_setup_failure(ErrorCode::INVALID_PARAMS); } if (this->protocol == "ascend" || this->protocol == "ubshmem") { ascend_segment_ptrs_.emplace_back( @@ -876,7 +903,7 @@ tl::expected RealClient::setup_internal( if (!mount_result.has_value()) { LOG(ERROR) << "Failed to mount segment: " << toString(mount_result.error()); - return tl::unexpected(mount_result.error()); + return cleanup_on_setup_failure(mount_result.error()); } } if (total_glbseg_size == 0) { @@ -888,7 +915,7 @@ tl::expected RealClient::setup_internal( if (!ipc_socket_path_.empty()) { if (start_ipc_server() != 0) { LOG(ERROR) << "Failed to start IPC server at " << ipc_socket_path_; - return tl::unexpected(ErrorCode::INTERNAL_ERROR); + return cleanup_on_setup_failure(ErrorCode::INTERNAL_ERROR); } LOG(INFO) << "Starting IPC server at " << ipc_socket_path_; } @@ -907,7 +934,7 @@ tl::expected RealClient::setup_internal( LOG(ERROR) << "Failed to start offload RPC server: " << err.message(); offload_rpc_server_.reset(); - return tl::unexpected(ErrorCode::INTERNAL_ERROR); + return cleanup_on_setup_failure(ErrorCode::INTERNAL_ERROR); } offload_rpc_port_ = offload_rpc_server_->port(); LOG(INFO) << "Offload RPC server started on port " << offload_rpc_port_; @@ -928,7 +955,7 @@ tl::expected RealClient::setup_internal( if (!init_result) { LOG(ERROR) << "file storage init failed with error: " << init_result.error(); - return init_result; + return cleanup_on_setup_failure(init_result.error()); } } client_requester_ = std::make_shared(); @@ -1099,6 +1126,11 @@ tl::expected RealClient::tearDownAll_internal() { } if (client_buffer_allocator_ && client_buffer_allocator_->size() > 0 && protocol != "cxl") { +#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \ + defined(USE_HYGON) || defined(USE_COREX) + gpu_staging::UnpinHostMemory(client_buffer_allocator_->getBase(), + "client staging buffer"); +#endif auto unregister_result = client_->unregisterLocalMemory( client_buffer_allocator_->getBase(), true); if (!unregister_result) { @@ -1681,20 +1713,19 @@ tl::expected RealClient::put_internal( LOG(ERROR) << "Client is not initialized"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } - if (!client_buffer_allocator) { - LOG(ERROR) << "Client buffer allocator is not provided"; - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } - auto alloc_result = client_buffer_allocator->allocate(value.size_bytes()); - if (!alloc_result) { - LOG(ERROR) << "Failed to allocate buffer for put operation, key: " - << key << ", value size: " << value.size(); + if (!has_client_buffer_for_write(client_buffer_allocator, + value.size_bytes())) { + LOG(ERROR) << "Client buffer allocator is not available"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } - auto &buffer_handle = *alloc_result; - memcpy(buffer_handle.ptr(), value.data(), value.size_bytes()); + // Staging is deferred to ensureRegisteredForRDMA in the transfer layer. + // For LOCAL_MEMCPY this eliminates one copy; for RDMA with registered + // buffers it also eliminates one copy; for RDMA with unregistered buffers + // the total work is the same (staging just happens later). + (void)client_buffer_allocator; - std::vector slices = split_into_slices(buffer_handle); + std::vector slices = + split_into_slices(const_cast(value.data()), value.size_bytes()); auto put_result = client_->Put(key, slices, config); if (!put_result) { @@ -1750,43 +1781,18 @@ tl::expected RealClient::put_batch_internal( LOG(ERROR) << "Key and value size mismatch"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } - if (!client_buffer_allocator) { - LOG(ERROR) << "Client buffer allocator is not provided"; + if (!has_client_buffer_for_write(client_buffer_allocator, + sum_value_sizes(values))) { + LOG(ERROR) << "Client buffer allocator is not available"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } - std::vector buffer_handles; - std::unordered_map> batched_slices; - batched_slices.reserve(keys.size()); + (void)client_buffer_allocator; - for (size_t i = 0; i < keys.size(); ++i) { - auto &key = keys[i]; - auto &value = values[i]; - auto alloc_result = - client_buffer_allocator->allocate(value.size_bytes()); - if (!alloc_result) { - LOG(ERROR) - << "Failed to allocate buffer for put_batch operation, key: " - << key << ", value size: " << value.size(); - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } - auto &buffer_handle = *alloc_result; - memcpy(buffer_handle.ptr(), value.data(), value.size_bytes()); - auto slices = split_into_slices(buffer_handle); - buffer_handles.emplace_back(std::move(*alloc_result)); - batched_slices.emplace(key, std::move(slices)); - } - - // Convert unordered_map to vector format expected by BatchPut std::vector> ordered_batched_slices; ordered_batched_slices.reserve(keys.size()); - for (const auto &key : keys) { - auto it = batched_slices.find(key); - if (it != batched_slices.end()) { - ordered_batched_slices.emplace_back(it->second); - } else { - LOG(ERROR) << "Missing slices for key: " << key; - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } + for (size_t i = 0; i < keys.size(); ++i) { + ordered_batched_slices.emplace_back(split_into_slices( + const_cast(values[i].data()), values[i].size_bytes())); } auto results = client_->BatchPut(keys, ordered_batched_slices, config); @@ -1845,44 +1851,30 @@ tl::expected RealClient::put_parts_internal( LOG(ERROR) << "Client is not initialized"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } - if (!client_buffer_allocator) { - LOG(ERROR) << "Client buffer allocator is not provided"; + if (!has_client_buffer_for_write(client_buffer_allocator, + sum_value_sizes(values))) { + LOG(ERROR) << "Client buffer allocator is not available"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } + (void)client_buffer_allocator; - // Calculate total size needed - size_t total_size = 0; + // Create slices directly from each part's data pointer. + // Parts are stored sequentially in the object — the server assembles + // them in order. No staging copy needed; ensureRegisteredForRDMA handles + // RDMA registration if required. + std::vector slices; for (const auto &value : values) { - total_size += value.size_bytes(); + if (value.size_bytes() == 0) continue; + auto part_slices = split_into_slices(const_cast(value.data()), + value.size_bytes()); + slices.insert(slices.end(), part_slices.begin(), part_slices.end()); } - if (total_size == 0) { + if (slices.empty()) { LOG(WARNING) << "Attempting to put empty data for key: " << key; return {}; } - // Allocate buffer using the new allocator - auto alloc_result = client_buffer_allocator->allocate(total_size); - if (!alloc_result) { - LOG(ERROR) << "Failed to allocate buffer for put_parts operation, key: " - << key << ", total size: " << total_size; - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } - - auto &buffer_handle = *alloc_result; - - // Copy all parts into the contiguous buffer - size_t offset = 0; - for (const auto &value : values) { - memcpy(static_cast(buffer_handle.ptr()) + offset, value.data(), - value.size_bytes()); - offset += value.size_bytes(); - } - - // Split into slices - std::vector slices = split_into_slices(buffer_handle); - - // Perform the put operation - buffer_handle will be automatically released auto put_result = client_->Put(key, slices, config); if (!put_result) { LOG(ERROR) << "Put operation failed with error: " @@ -3652,8 +3644,9 @@ std::vector> RealClient::batch_put_from_internal( tl::expected RealClient::put_from_internal( const std::string &key, void *buffer, size_t size, const ReplicateConfig &config) { - // NOTE: The buffer address must resolve to Store-managed registered - // memory for zero-copy RDMA operations to work correctly + // Registered buffers are used directly for RDMA writes. Unregistered + // buffers are staged in the transfer layer when RDMA requires registered + // source memory. if (config.prefer_alloc_in_same_node) { LOG(ERROR) << "prefer_alloc_in_same_node is not supported."; return tl::unexpected(ErrorCode::INVALID_PARAMS); @@ -3713,20 +3706,15 @@ tl::expected RealClient::upsert_internal( LOG(ERROR) << "Client is not initialized"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } - if (!client_buffer_allocator) { - LOG(ERROR) << "Client buffer allocator is not provided"; - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } - auto alloc_result = client_buffer_allocator->allocate(value.size_bytes()); - if (!alloc_result) { - LOG(ERROR) << "Failed to allocate buffer for upsert operation, key: " - << key << ", value size: " << value.size(); + if (!has_client_buffer_for_write(client_buffer_allocator, + value.size_bytes())) { + LOG(ERROR) << "Client buffer allocator is not available"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } - auto &buffer_handle = *alloc_result; - memcpy(buffer_handle.ptr(), value.data(), value.size_bytes()); + (void)client_buffer_allocator; - std::vector slices = split_into_slices(buffer_handle); + std::vector slices = + split_into_slices(const_cast(value.data()), value.size_bytes()); auto result = client_->Upsert(key, slices, config); if (!result) { @@ -3945,38 +3933,26 @@ tl::expected RealClient::upsert_parts_internal( LOG(ERROR) << "Client is not initialized"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } - if (!client_buffer_allocator) { - LOG(ERROR) << "Client buffer allocator is not provided"; + if (!has_client_buffer_for_write(client_buffer_allocator, + sum_value_sizes(values))) { + LOG(ERROR) << "Client buffer allocator is not available"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } + (void)client_buffer_allocator; - size_t total_size = 0; + std::vector slices; for (const auto &value : values) { - total_size += value.size_bytes(); + if (value.size_bytes() == 0) continue; + auto part_slices = split_into_slices(const_cast(value.data()), + value.size_bytes()); + slices.insert(slices.end(), part_slices.begin(), part_slices.end()); } - if (total_size == 0) { + + if (slices.empty()) { LOG(WARNING) << "Attempting to upsert empty data for key: " << key; return {}; } - auto alloc_result = client_buffer_allocator->allocate(total_size); - if (!alloc_result) { - LOG(ERROR) - << "Failed to allocate buffer for upsert_parts operation, key: " - << key << ", total size: " << total_size; - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } - - auto &buffer_handle = *alloc_result; - size_t offset = 0; - for (const auto &value : values) { - memcpy(static_cast(buffer_handle.ptr()) + offset, value.data(), - value.size_bytes()); - offset += value.size_bytes(); - } - - std::vector slices = split_into_slices(buffer_handle); - auto result = client_->Upsert(key, slices, config); if (!result) { LOG(ERROR) << "Upsert operation failed with error: " @@ -4034,43 +4010,17 @@ tl::expected RealClient::upsert_batch_internal( LOG(ERROR) << "Key and value size mismatch"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } - if (!client_buffer_allocator) { - LOG(ERROR) << "Client buffer allocator is not provided"; + if (!has_client_buffer_for_write(client_buffer_allocator, + sum_value_sizes(values))) { + LOG(ERROR) << "Client buffer allocator is not available"; return tl::unexpected(ErrorCode::INVALID_PARAMS); } - std::vector buffer_handles; - std::unordered_map> batched_slices; - batched_slices.reserve(keys.size()); - - for (size_t i = 0; i < keys.size(); ++i) { - auto &key = keys[i]; - auto &value = values[i]; - auto alloc_result = - client_buffer_allocator->allocate(value.size_bytes()); - if (!alloc_result) { - LOG(ERROR) - << "Failed to allocate buffer for upsert_batch operation, key: " - << key << ", value size: " << value.size(); - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } - auto &buffer_handle = *alloc_result; - memcpy(buffer_handle.ptr(), value.data(), value.size_bytes()); - auto slices = split_into_slices(buffer_handle); - buffer_handles.emplace_back(std::move(*alloc_result)); - batched_slices.emplace(key, std::move(slices)); - } - - // Convert unordered_map to vector format expected by BatchUpsert + (void)client_buffer_allocator; // staging moved to ensureRegisteredForRDMA std::vector> ordered_batched_slices; ordered_batched_slices.reserve(keys.size()); - for (const auto &key : keys) { - auto it = batched_slices.find(key); - if (it != batched_slices.end()) { - ordered_batched_slices.emplace_back(it->second); - } else { - LOG(ERROR) << "Missing slices for key: " << key; - return tl::unexpected(ErrorCode::INVALID_PARAMS); - } + for (size_t i = 0; i < keys.size(); ++i) { + ordered_batched_slices.emplace_back(split_into_slices( + const_cast(values[i].data()), values[i].size_bytes())); } auto results = client_->BatchUpsert(keys, ordered_batched_slices, config); @@ -4691,8 +4641,9 @@ int RealClient::put_from_with_metadata(const std::string &key, void *buffer, size_t metadata_size, const ReplicateConfig &config) { const auto start_time = std::chrono::steady_clock::now(); - // NOTE: The buffer address must resolve to Store-managed registered - // memory for zero-copy RDMA operations to work correctly + // Registered buffers are used directly for RDMA writes. Unregistered + // buffers are staged in the transfer layer when RDMA requires registered + // source memory. if (config.prefer_alloc_in_same_node) { LOG(ERROR) << "prefer_alloc_in_same_node is not supported."; return -1; @@ -4810,6 +4761,76 @@ RealClient::batch_put_from_multi_buffers_internal( return client_->BatchPut(keys, batched_slices, config); } +std::vector RealClient::batch_upsert_from_multi_buffers( + const std::vector &keys, + const std::vector> &all_buffers, + const std::vector> &all_sizes, + const ReplicateConfig &config) { + auto internal_results = + execute_timed_operation>>( + [&]() { + return batch_upsert_from_multi_buffers_internal( + keys, all_buffers, all_sizes, config); + }, + [](const auto &) { return true; }, + [&](uint64_t latency_us, const auto &ret) { + std::vector py_results; + py_results.reserve(ret.size()); + for (const auto &item : ret) { + py_results.push_back(to_py_ret(item)); + } + client_->ObserveTransferOperation( + TransferOperationKind::kWrite, + "batch_upsert_from_multi_buffers", + sum_successful_nested_sizes(py_results, all_sizes), + latency_us); + }); + std::vector results; + results.reserve(internal_results.size()); + + for (const auto &result : internal_results) { + results.push_back(to_py_ret(result)); + } + + return results; +} + +std::vector> +RealClient::batch_upsert_from_multi_buffers_internal( + const std::vector &keys, + const std::vector> &all_buffers, + const std::vector> &all_sizes, + const ReplicateConfig &config) { + if (!client_) { + LOG(ERROR) << "Client is not initialized"; + return std::vector>( + keys.size(), tl::unexpected(ErrorCode::INVALID_PARAMS)); + } + + if ((keys.size() != all_buffers.size()) || + (all_buffers.size() != all_sizes.size())) { + LOG(ERROR) << "Mismatched sizes for keys, buffers, and sizes"; + return std::vector>( + keys.size(), tl::unexpected(ErrorCode::INVALID_PARAMS)); + } + + std::vector> batched_slices(keys.size()); + for (size_t i = 0; i < all_buffers.size(); ++i) { + const auto &buffers = all_buffers[i]; + const auto &sizes = all_sizes[i]; + if (buffers.size() != sizes.size()) { + LOG(ERROR) << "Mismatched buffers and sizes of key:" << keys[i]; + return std::vector>( + keys.size(), tl::unexpected(ErrorCode::INVALID_PARAMS)); + } + batched_slices[i].reserve(buffers.size()); + for (size_t j = 0; j < buffers.size(); ++j) { + batched_slices[i].emplace_back(Slice{buffers[j], sizes[j]}); + } + } + return client_->BatchUpsert(keys, batched_slices, config); +} + std::vector RealClient::batch_get_into_multi_buffers( const std::vector &keys, const std::vector> &all_buffers, diff --git a/mooncake-store/src/transfer_task.cpp b/mooncake-store/src/transfer_task.cpp index 5995e3c4fd..0aba748ed9 100644 --- a/mooncake-store/src/transfer_task.cpp +++ b/mooncake-store/src/transfer_task.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -661,7 +662,7 @@ void MemcpyWorkerPool::workerThread() { std::memcpy(op.dest, op.src, op.size); } else { int dev = src_on_gpu ? src_dev : dst_dev; - gpu_staging::SetDevice(dev); + gpu_staging::DeviceGuard guard(dev); if (!gpu_staging::CopyAuto(op.dest, op.src, op.size)) { LOG(ERROR) << "GPU memcpy failed: src_dev=" << src_dev @@ -900,13 +901,14 @@ TransferStrategy TransferFuture::strategy() const { // TransferSubmitter Implementation // ============================================================================ -TransferSubmitter::TransferSubmitter(TransferEngine& engine, - std::shared_ptr& backend, - const std::string& local_hostname, - TransferMetric* transfer_metric, - int numa_socket_id) +TransferSubmitter::TransferSubmitter( + TransferEngine& engine, std::shared_ptr& backend, + const std::string& local_hostname, + std::shared_ptr staging_allocator, + TransferMetric* transfer_metric, int numa_socket_id) : engine_(engine), local_endpoint_(engine.getLocalIpAndPort()), + staging_allocator_(std::move(staging_allocator)), memcpy_pool_(std::make_unique()), #ifdef USE_NOF spdk_nvmf_pool_(std::make_unique(numa_socket_id)), @@ -947,6 +949,85 @@ TransferSubmitter::TransferSubmitter(TransferEngine& engine, << memcpy_enabled_; } +std::pair, std::vector> +TransferSubmitter::ensureRegisteredForRDMA(const std::vector& slices) { + // First pass: identify unregistered slices and compute total staging size. + // Batching into one contiguous allocation avoids per-slice allocator + // overhead which can be significant (30ms+ for 33 slices at 512MB). + size_t total_staging = 0; + std::vector needs_staging(slices.size(), false); + for (size_t i = 0; i < slices.size(); ++i) { + if (!engine_.isLocalMemoryRegistered(slices[i].ptr, slices[i].size)) { + if (slices[i].size > + std::numeric_limits::max() - total_staging) { + LOG(ERROR) << "RDMA staging size overflow"; + return std::make_pair(std::vector{}, + std::vector{}); + } + needs_staging[i] = true; + total_staging += slices[i].size; + } + } + + // Fast path: all slices already registered — still need kMaxSliceSize + // splitting because callers (e.g. multi_buffers) may pass large slices. + if (total_staging == 0) { + std::vector split; + split.reserve(slices.size()); + for (const auto& s : slices) { + auto sub = split_into_slices(s.ptr, s.size); + split.insert(split.end(), sub.begin(), sub.end()); + } + return std::make_pair(std::move(split), std::vector{}); + } + + if (!staging_allocator_) { + LOG(ERROR) << "No staging allocator for unregistered RDMA slices"; + return std::make_pair(std::vector{}, + std::vector{}); + } + + // Allocate ONE contiguous staging buffer for all unregistered data. + auto staging_alloc = staging_allocator_->allocate(total_staging); + if (!staging_alloc) { + LOG(ERROR) << "Staging alloc failed, total_size=" << total_staging; + return std::make_pair(std::vector{}, + std::vector{}); + } + + char* staging_base = static_cast(staging_alloc->ptr()); + size_t staging_offset = 0; + + std::vector prepared; + std::vector staging_handles; + prepared.reserve(slices.size()); + + // Sync path (default): copy each unregistered slice into the contiguous + // staging buffer. MemcpySafe auto-detects GPU vs CPU pointers. + // Staged slices are split by kMaxSliceSize to match RDMA transfer limits. + for (size_t i = 0; i < slices.size(); ++i) { + if (!needs_staging[i]) { + // Already registered — split by kMaxSliceSize for RDMA limits. + auto sub = split_into_slices(slices[i].ptr, slices[i].size); + prepared.insert(prepared.end(), sub.begin(), sub.end()); + } else { + void* staged_ptr = staging_base + staging_offset; + staging_offset += slices[i].size; + if (!gpu_staging::MemcpySafe(staged_ptr, slices[i].ptr, + slices[i].size)) { + LOG(ERROR) << "MemcpySafe failed during RDMA staging"; + return std::make_pair(std::vector{}, + std::vector{}); + } + // Split staged data by kMaxSliceSize for RDMA transfer limits. + auto sub = split_into_slices(staged_ptr, slices[i].size); + prepared.insert(prepared.end(), sub.begin(), sub.end()); + } + } + staging_handles.push_back(std::move(*staging_alloc)); + return {std::move(prepared), std::move(staging_handles)}; +} + std::optional TransferSubmitter::submit( const Replica::Descriptor& replica, std::vector& slices, TransferRequest::OpCode op_code, void* ptr, size_t size) { @@ -969,10 +1050,25 @@ std::optional TransferSubmitter::submit( case TransferStrategy::LOCAL_MEMCPY: future = submitMemcpyOperation(handle, slices, op_code); break; - case TransferStrategy::TRANSFER_ENGINE: - future = - submitTransferEngineOperation(handle, slices, op_code); + case TransferStrategy::TRANSFER_ENGINE: { + if (handle.protocol_ == "rdma") { + auto [prepared, staging] = + ensureRegisteredForRDMA(slices); + if (prepared.empty() && !slices.empty()) { + LOG(ERROR) << "ensureRegisteredForRDMA failed"; + return std::nullopt; + } + future = submitTransferEngineOperation(handle, prepared, + op_code); + if (future && !staging.empty()) { + future->attachStagingHandles(std::move(staging)); + } + } else { + future = submitTransferEngineOperation(handle, slices, + op_code); + } break; + } default: LOG(ERROR) << "Unknown transfer strategy: " << strategy; return std::nullopt; @@ -1008,16 +1104,38 @@ std::optional TransferSubmitter::submit_batch( const std::vector& replicas, std::vector>& all_slices, TransferRequest::OpCode op_code) { + // NOTE: submit_batch always goes through TRANSFER_ENGINE (RDMA), never + // LOCAL_MEMCPY. The selectStrategy dispatch is in submit() for single- + // replica transfers; BatchPut/BatchUpsert only call submit_batch when + // all replicas are remote (memory-backed, non-local). std::optional future; std::vector requests; + std::vector all_staging_handles; for (size_t i = 0; i < replicas.size(); ++i) { auto& replica = replicas[i]; auto& slices = all_slices[i]; auto& mem_desc = replica.get_memory_descriptor(); - if (!validateTransferParams(mem_desc.buffer_descriptor, slices)) { + auto& handle = mem_desc.buffer_descriptor; + if (!validateTransferParams(handle, slices)) { return std::nullopt; } - auto& handle = mem_desc.buffer_descriptor; + + // For RDMA WRITE ops, ensure slices are in registered memory. + const std::vector* effective_slices = &slices; + std::vector prepared; + if (op_code == TransferRequest::WRITE && handle.protocol_ == "rdma") { + auto [p, staging] = ensureRegisteredForRDMA(slices); + if (p.empty() && !slices.empty()) { + LOG(ERROR) << "ensureRegisteredForRDMA failed in submit_batch"; + return std::nullopt; + } + prepared = std::move(p); + effective_slices = &prepared; + for (auto& h : staging) { + all_staging_handles.push_back(std::move(h)); + } + } + uint64_t offset = 0; SegmentHandle seg = engine_.openSegment(handle.transport_endpoint_); if (seg == static_cast(ERR_INVALID_ARGUMENT)) { @@ -1025,7 +1143,7 @@ std::optional TransferSubmitter::submit_batch( << handle.transport_endpoint_; return std::nullopt; } - for (auto slice : slices) { + for (const auto& slice : *effective_slices) { TransferRequest request; request.opcode = op_code; request.source = static_cast(slice.ptr); @@ -1039,6 +1157,9 @@ std::optional TransferSubmitter::submit_batch( future = submitTransfer(requests); // Update metrics on successful submission if (future.has_value()) { + if (!all_staging_handles.empty()) { + future->attachStagingHandles(std::move(all_staging_handles)); + } for (auto& slices : all_slices) { updateTransferMetrics(slices, op_code); } @@ -1091,39 +1212,63 @@ std::optional TransferSubmitter::submitMemcpyOperation( const TransferRequest::OpCode op_code, uint64_t src_offset) { auto state = std::make_shared(); - // Create memcpy operations - std::vector operations; - operations.reserve(slices.size()); uint64_t base_address = static_cast(handle.buffer_address_); uint64_t offset = src_offset; + int base_dev = -1; + bool base_on_gpu = false; + if (base_address != 0) { + base_on_gpu = gpu_staging::IsDevicePointer( + reinterpret_cast(base_address), &base_dev); + } + + // Execute memcpy inline on the calling thread instead of going through + // the worker pool. This eliminates thread synchronization overhead and + // keeps data on the same NUMA node. The worker pool (1 thread) added + // ~30ms overhead for 512MB on eRDMA hardware. + bool ok = true; for (size_t i = 0; i < slices.size(); ++i) { const auto& slice = slices[i]; - - if (slice.ptr == nullptr) continue; + if (slice.ptr == nullptr || slice.size == 0) continue; void* dest; const void* src; - if (op_code == TransferRequest::READ) { - // READ: from handle (remote buffer) to slice (local buffer) dest = slice.ptr; src = reinterpret_cast(base_address + offset); } else { - // WRITE: from slice (local buffer) to handle (remote buffer) dest = reinterpret_cast(base_address + offset); src = slice.ptr; } offset += slice.size; - operations.emplace_back(dest, src, slice.size); + int slice_dev = -1; + bool slice_on_gpu = gpu_staging::IsDevicePointer(slice.ptr, &slice_dev); + + bool src_on_gpu = + (op_code == TransferRequest::READ) ? base_on_gpu : slice_on_gpu; + int src_dev = (op_code == TransferRequest::READ) ? base_dev : slice_dev; + bool dst_on_gpu = + (op_code == TransferRequest::READ) ? slice_on_gpu : base_on_gpu; + int dst_dev = (op_code == TransferRequest::READ) ? slice_dev : base_dev; + + if (!src_on_gpu && !dst_on_gpu) { + std::memcpy(dest, src, slice.size); + } else { + int dev = src_on_gpu ? src_dev : dst_dev; + gpu_staging::DeviceGuard guard(dev); + if (!gpu_staging::CopyAuto(dest, src, slice.size)) { + LOG(ERROR) << "GPU memcpy failed: src_dev=" << src_dev + << " dst_dev=" << dst_dev << " size=" << slice.size; + ok = false; + break; + } + } } - // Submit memcpy operations to worker pool for async execution - MemcpyTask task(std::move(operations), state); - memcpy_pool_->submitTask(std::move(task)); + state->set_completed(ok ? ErrorCode::OK : ErrorCode::TRANSFER_FAIL); - VLOG(1) << "Memcpy transfer submitted to worker pool with " << slices.size() + VLOG(1) << "Memcpy transfer completed inline with " << slices.size() << " operations"; return TransferFuture(state); @@ -1317,18 +1462,10 @@ std::optional TransferSubmitter::submitFileReadOperation( TransferStrategy TransferSubmitter::selectStrategy( const AllocatedBuffer::Descriptor& handle, const std::vector& slices) const { - // Check if memcpy operations are enabled via environment variable - if (!memcpy_enabled_) { - VLOG(2) << "Memcpy operations disabled via MC_STORE_MEMCPY environment " - "variable"; - return TransferStrategy::TRANSFER_ENGINE; - } - - // Check conditions for local memcpy optimization - if (isLocalTransfer(handle)) { + (void)slices; + if (memcpy_enabled_ && isLocalTransfer(handle)) { return TransferStrategy::LOCAL_MEMCPY; } - return TransferStrategy::TRANSFER_ENGINE; } diff --git a/mooncake-transfer-engine/include/transfer_engine.h b/mooncake-transfer-engine/include/transfer_engine.h index 80a31edc98..3b6e904246 100644 --- a/mooncake-transfer-engine/include/transfer_engine.h +++ b/mooncake-transfer-engine/include/transfer_engine.h @@ -184,6 +184,8 @@ class TransferEngine { bool checkOverlap(void* addr, uint64_t length); + bool isLocalMemoryRegistered(const void* addr, size_t length); + void setAutoDiscover(bool auto_discover); void* getBaseAddr(); diff --git a/mooncake-transfer-engine/include/transfer_engine_impl.h b/mooncake-transfer-engine/include/transfer_engine_impl.h index b1e4fff7e8..976a5ef2f8 100644 --- a/mooncake-transfer-engine/include/transfer_engine_impl.h +++ b/mooncake-transfer-engine/include/transfer_engine_impl.h @@ -361,6 +361,8 @@ class TransferEngineImpl { bool checkOverlap(void* addr, uint64_t length); + bool isLocalMemoryRegistered(const void* addr, size_t length); + #ifdef ENABLE_MULTI_PROTOCOL struct RegisteredRecord { Transport* transport; diff --git a/mooncake-transfer-engine/src/transfer_engine.cpp b/mooncake-transfer-engine/src/transfer_engine.cpp index 3747244e76..77176d7259 100644 --- a/mooncake-transfer-engine/src/transfer_engine.cpp +++ b/mooncake-transfer-engine/src/transfer_engine.cpp @@ -204,6 +204,10 @@ bool TransferEngine::checkOverlap(void* addr, uint64_t length) { return impl_->checkOverlap(addr, length); } +bool TransferEngine::isLocalMemoryRegistered(const void* addr, size_t length) { + return impl_->isLocalMemoryRegistered(addr, length); +} + void TransferEngine::setAutoDiscover(bool auto_discover) { impl_->setAutoDiscover(auto_discover); } @@ -637,6 +641,11 @@ bool TransferEngine::checkOverlap(void* addr, uint64_t length) { return false; } +bool TransferEngine::isLocalMemoryRegistered(const void* addr, size_t length) { + if (!use_tent_) return impl_->isLocalMemoryRegistered(addr, length); + return false; // Conservative for TENT mode +} + void TransferEngine::setAutoDiscover(bool auto_discover) { if (!use_tent_) impl_->setAutoDiscover(auto_discover); } diff --git a/mooncake-transfer-engine/src/transfer_engine_impl.cpp b/mooncake-transfer-engine/src/transfer_engine_impl.cpp index 83960ad575..23e239f18f 100644 --- a/mooncake-transfer-engine/src/transfer_engine_impl.cpp +++ b/mooncake-transfer-engine/src/transfer_engine_impl.cpp @@ -561,6 +561,19 @@ bool TransferEngineImpl::checkOverlap(void* addr, uint64_t length) { return hasOverlapLocked(reinterpret_cast(addr), length); } +bool TransferEngineImpl::isLocalMemoryRegistered(const void* addr, + size_t length) { + if (length == 0) return true; + std::shared_lock lock(mutex_); + auto it = findMemoryRegionContaining(reinterpret_cast(addr)); + if (it == local_memory_regions_.end()) return false; + + uintptr_t start = reinterpret_cast(addr); + uintptr_t region_start = reinterpret_cast(it->second.addr); + size_t offset = start - region_start; + return offset <= it->second.length && length <= it->second.length - offset; +} + int TransferEngineImpl::registerLocalMemory(void* addr, size_t length, const std::string& location, bool remote_accessible,