Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 72 additions & 42 deletions mooncake-integration/store/store_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "integration_utils.h"
#include "buffer_pool.h"
#include "gpu_staging_utils.h"

// Forward declaration for EngramStore bindings
namespace mooncake {
Expand Down Expand Up @@ -798,15 +799,46 @@ class MooncakeStorePyWrapper {
const std::vector<std::string> &keys,
const std::vector<PyTensorInfo> &infos,
const ReplicateConfig &config = ReplicateConfig{}) {
return batch_write_tensor_impl(
keys, infos, config, "put",
[this](const std::vector<std::string> &write_keys,
const std::vector<void *> &buffer_ptrs,
const std::vector<size_t> &buffer_sizes,
const ReplicateConfig &write_config) {
return store_->batch_put_from(write_keys, buffer_ptrs,
buffer_sizes, write_config);
});
auto group_ids_error =
ValidateGroupIdsForBatchConfig(config, keys.size(), "put");
if (!group_ids_error.empty()) return group_ids_error;

std::vector<int> results(keys.size(), 0);
{
Comment on lines +802 to +807

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extending batch_write_tensor_impl is better for readability.

py::gil_scoped_release release_gil;

std::vector<std::string> valid_keys;
std::vector<std::vector<void *>> all_buffers;
std::vector<std::vector<size_t>> all_sizes;
std::vector<size_t> original_indices;

for (size_t i = 0; i < infos.size(); ++i) {
if (!infos[i].valid()) {
results[i] = to_py_ret(ErrorCode::INVALID_PARAMS);
continue;
}
valid_keys.push_back(keys[i]);
all_buffers.push_back(
{const_cast<void *>(
reinterpret_cast<const void *>(&infos[i].metadata)),
reinterpret_cast<void *>(infos[i].data_ptr)});
all_sizes.push_back({infos[i].metadata.header.data_offset,
infos[i].tensor_size});
original_indices.push_back(i);
}

if (!valid_keys.empty()) {
ReplicateConfig write_config =
MakeIndexedConfig(config, original_indices);
std::vector<int> op_results =
store_->batch_put_from_multi_buffers(
valid_keys, all_buffers, all_sizes, write_config);
for (size_t i = 0; i < op_results.size(); ++i) {
results[original_indices[i]] = op_results[i];
}
}
}
return results;
}

std::vector<int> batch_put_tensor_impl(
Expand Down Expand Up @@ -1366,55 +1398,34 @@ class MooncakeStorePyWrapper {
results[i] = to_py_ret(ErrorCode::INVALID_PARAMS);
}

// 2. Prepare Buffers and Execute (GIL Released)
// 2. Zero-copy: pass metadata and data as separate buffers
{
py::gil_scoped_release release_gil;

std::vector<std::string> valid_keys;
std::vector<void *> buffer_ptrs;
std::vector<size_t> buffer_sizes;
std::vector<std::vector<void *>> all_buffers;
std::vector<std::vector<size_t>> all_sizes;
std::vector<size_t> original_indices;

std::vector<std::unique_ptr<BufferHandle>> temp_allocations;

for (size_t i = 0; i < infos.size(); ++i) {
if (!infos[i].valid()) continue;

size_t total_size =
sizeof(TensorMetadata) + infos[i].tensor_size;
auto alloc_result =
store_->client_buffer_allocator_->allocate(total_size);

if (!alloc_result) {
LOG(ERROR)
<< "Failed to allocate buffer for key: " << keys[i];
results[i] = to_py_ret(ErrorCode::INVALID_PARAMS);
continue;
}

// Copy Metadata & Data
char *dst = static_cast<char *>(alloc_result->ptr());
memcpy(dst, &infos[i].metadata, sizeof(TensorMetadata));
if (infos[i].tensor_size > 0) {
memcpy(dst + sizeof(TensorMetadata),
reinterpret_cast<void *>(infos[i].data_ptr),
infos[i].tensor_size);
}

valid_keys.push_back(keys[i]);
buffer_ptrs.push_back(alloc_result->ptr());
buffer_sizes.push_back(total_size);
all_buffers.push_back(
{const_cast<void *>(
reinterpret_cast<const void *>(&infos[i].metadata)),
reinterpret_cast<void *>(infos[i].data_ptr)});
all_sizes.push_back({infos[i].metadata.header.data_offset,
infos[i].tensor_size});
original_indices.push_back(i);

temp_allocations.push_back(
std::make_unique<BufferHandle>(std::move(*alloc_result)));
}

if (!valid_keys.empty()) {
ReplicateConfig write_config =
MakeIndexedConfig(config, original_indices);
std::vector<int> op_results = store_->batch_upsert_from(
valid_keys, buffer_ptrs, buffer_sizes, write_config);
std::vector<int> op_results =
store_->batch_upsert_from_multi_buffers(
valid_keys, all_buffers, all_sizes, write_config);
for (size_t i = 0; i < op_results.size(); ++i) {
results[original_indices[i]] = op_results[i];
}
Expand Down Expand Up @@ -2504,6 +2515,25 @@ PYBIND11_MODULE(store, m) {
py::arg("config") = ReplicateConfig{},
"Upsert object data directly from pre-allocated buffers for "
"multiple keys")
// Python binding `batch_upsert_from_multi_buffers`: upserts several keys in
// one call, where each key is backed by its own list of caller-provided
// buffers. Buffer addresses arrive from Python as integers (uintptr_t).
.def(
"batch_upsert_from_multi_buffers",
[](MooncakeStorePyWrapper &self,
const std::vector<std::string> &keys,
const std::vector<std::vector<uintptr_t>> &all_buffer_ptrs,
const std::vector<std::vector<size_t>> &all_sizes,
const ReplicateConfig &config = ReplicateConfig{}) {
// Without an initialized client there is nothing to write to; the
// failure is reported as an empty result vector.
if (!self.is_client_initialized()) {
LOG(ERROR) << "Client is not initialized";
return std::vector<int>{};
}
// Release the GIL for the duration of the store call.
py::gil_scoped_release release;
// CastAddrs2Ptrs is defined elsewhere; presumably it converts the
// integer addresses into the pointer lists the store expects --
// TODO confirm against its definition.
return self.store_->batch_upsert_from_multi_buffers(
keys, CastAddrs2Ptrs(all_buffer_ptrs), all_sizes, config);
},
py::arg("keys"), py::arg("all_buffer_ptrs"), py::arg("all_sizes"),
py::arg("config") = ReplicateConfig{},
"Upsert object data directly from multiple pre-allocated buffers "
"for multiple keys")
.def(
"upsert",
[](MooncakeStorePyWrapper &self, const std::string &key,
Expand Down
2 changes: 1 addition & 1 deletion mooncake-integration/store/store_py_parallel_read.h
Original file line number Diff line number Diff line change
Expand Up @@ -1467,7 +1467,7 @@ std::vector<bool> execute_tensor_into_plan_transfers(
continue;
}
if (plans[plan_idx].materialized_metadata.has_value()) {
std::memcpy(
gpu_staging::MemcpySafe(
reinterpret_cast<void *>(plans[plan_idx].user_buffer_ptr),
&*plans[plan_idx].materialized_metadata,
sizeof(TensorMetadata));
Expand Down
7 changes: 4 additions & 3 deletions mooncake-integration/store/store_py_parallel_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ std::vector<int> batch_write_tensor_impl(const std::vector<std::string> &keys,
std::memcpy(dst, &infos[i].metadata,
infos[i].metadata.header.data_offset);
if (infos[i].tensor_size > 0) {
std::memcpy(dst + infos[i].metadata.header.data_offset,
reinterpret_cast<void *>(infos[i].data_ptr),
infos[i].tensor_size);
gpu_staging::MemcpySafe(
dst + infos[i].metadata.header.data_offset,
reinterpret_cast<void *>(infos[i].data_ptr),
infos[i].tensor_size);
}

valid_keys.push_back(keys[i]);
Expand Down
12 changes: 11 additions & 1 deletion mooncake-store/include/client_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ class Client {
public:
virtual ~Client();

/**
* @brief Reinitialize TransferSubmitter with a staging allocator.
*
* Called by RealClient after its client buffer allocator is ready,
* so that ensureRegisteredForRDMA can stage unregistered user buffers.
*/
void setStagingAllocator(
std::shared_ptr<ClientBufferAllocator> staging_allocator);

const UUID& getClientId() const { return client_id_; }
const std::string& tenant_id() const { return master_client_.tenant_id(); }

Expand Down Expand Up @@ -668,7 +677,8 @@ class Client {
const std::string& local_hostname,
const std::string& metadata_connstring, const std::string& protocol,
const std::optional<std::string>& device_names);
void InitTransferSubmitter();
void InitTransferSubmitter(
std::shared_ptr<ClientBufferAllocator> staging_allocator = nullptr);
ErrorCode TransferData(const Replica::Descriptor& replica_descriptor,
std::vector<Slice>& slices,
TransferRequest::OpCode op_code);
Expand Down
156 changes: 156 additions & 0 deletions mooncake-store/include/gpu_staging_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
#endif

#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <limits>
#include <mutex>
#include <string>
#include <unordered_map>
#include <glog/logging.h>

namespace mooncake {
Expand Down Expand Up @@ -107,6 +113,51 @@ inline void SetDevice(int device_id) {
#endif
}

/// Scoped accelerator-device switch.
///
/// On construction the guard records the currently selected device and then
/// selects `device_id`. On destruction it re-selects the recorded device, but
/// only if recording it succeeded. A negative `device_id` turns the guard
/// into a no-op, as does a build without any supported accelerator backend.
/// The guard is neither copyable nor assignable.
class DeviceGuard {
   public:
    explicit DeviceGuard(int device_id) {
        if (device_id >= 0) {
            // Record the origin first, then switch; the switch happens even
            // when the query failed (in which case nothing is restored).
            active_ = QueryCurrentDevice(&previous_device_);
            SelectDevice(device_id);
        }
    }

    ~DeviceGuard() {
        if (active_) SelectDevice(previous_device_);
    }

    DeviceGuard(const DeviceGuard&) = delete;
    DeviceGuard& operator=(const DeviceGuard&) = delete;

   private:
    // Stores the current device in *device; returns true on success.
    static bool QueryCurrentDevice(int* device) {
#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \
    defined(USE_HYGON) || defined(USE_COREX)
        return cudaGetDevice(device) == cudaSuccess;
#elif defined(USE_HIP)
        return hipGetDevice(device) == hipSuccess;
#elif defined(USE_ASCEND) || defined(USE_ASCEND_DIRECT) || defined(USE_UBSHMEM)
        return aclrtGetDevice(device) == ACL_SUCCESS;
#else
        (void)device;
        return false;
#endif
    }

    // Selects `device_id` on the active backend; the result is ignored.
    static void SelectDevice(int device_id) {
#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \
    defined(USE_HYGON) || defined(USE_COREX)
        cudaSetDevice(device_id);
#elif defined(USE_HIP)
        hipSetDevice(device_id);
#elif defined(USE_ASCEND) || defined(USE_ASCEND_DIRECT) || defined(USE_UBSHMEM)
        aclrtSetDevice(device_id);
#else
        (void)device_id;
#endif
    }

    int previous_device_{-1};
    bool active_{false};
};

// Copy host memory to device. Caller must have called SetDevice first.
inline bool CopyHostToDevice(void* dst, const void* src, size_t size) {
#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \
Expand Down Expand Up @@ -162,5 +213,110 @@ inline bool IsHostPointer(const void* ptr) {
return true; // CPU-only build: all pointers are host
#endif
}
/// Copy `size` bytes from `src` to `dst` when either side may be device
/// memory.
///
/// Both pointers are classified with IsDevicePointer. If neither is a device
/// pointer the copy is a plain std::memcpy. Otherwise the copy is delegated
/// to CopyAuto while a DeviceGuard selects the device that owns the source
/// (or the destination, when only the destination is device memory).
///
/// @return true for an empty or host-only copy; otherwise CopyAuto's result.
inline bool MemcpySafe(void* dst, const void* src, size_t size) {
    if (size == 0) return true;

    int src_dev = -1;
    int dst_dev = -1;
    const bool src_on_device = IsDevicePointer(src, &src_dev);
    const bool dst_on_device = IsDevicePointer(dst, &dst_dev);

    if (src_on_device || dst_on_device) {
        // The source's device takes precedence when both ends are on device.
        DeviceGuard guard(src_on_device ? src_dev : dst_dev);
        return CopyAuto(dst, src, size);
    }

    std::memcpy(dst, src, size);
    return true;
}
Comment thread
zxpdemonio marked this conversation as resolved.

/// Whether host-memory pinning is enabled.
///
/// Reads the MC_STORE_PIN_MEMORY environment variable on every call. Pinning
/// is enabled unless the variable is set to exactly "0" or "false"; an unset
/// variable or any other value leaves it enabled.
inline bool PinMemoryEnabled() {
    const char* raw = std::getenv("MC_STORE_PIN_MEMORY");
    if (raw == nullptr) return true;

    const std::string setting(raw);
    return setting != "0" && setting != "false";
}

/// Upper bound, in bytes, on host memory that may be pinned.
///
/// Parsed from the MC_STORE_PIN_MEMORY_MAX_BYTES environment variable as a
/// plain base-10 byte count; surrounding whitespace is tolerated.
///
/// @return The configured cap, or 0 when the variable is unset, empty, or
///         malformed. Callers treat 0 as "no cap". Values that do not fit in
///         size_t are clamped to the maximum size_t.
///
/// A value is malformed when it is negative or has anything other than
/// whitespace after the digits (e.g. "4G"). std::strtoull would otherwise
/// silently parse "4G" as a 4-byte cap and wrap "-1" to a huge value.
inline size_t PinMemoryMaxBytes() {
    static const char kWhitespace[] = " \t\n\v\f\r";

    const char* max_env = std::getenv("MC_STORE_PIN_MEMORY_MAX_BYTES");
    if (!max_env || max_env[0] == '\0') return 0;

    // strtoull accepts a leading '-' and negates the result as unsigned;
    // a negative byte count is never meaningful, so reject it up front.
    const char* first = max_env + std::strspn(max_env, kWhitespace);
    if (*first == '-') return 0;

    char* end = nullptr;
    const unsigned long long value = std::strtoull(first, &end, 10);
    if (end == first) return 0;  // no digits at all

    // Only trailing whitespace may follow the number; reject unit suffixes
    // and other garbage instead of using a partially parsed value.
    end += std::strspn(end, kWhitespace);
    if (*end != '\0') return 0;

    // Out-of-range input makes strtoull return ULLONG_MAX; clamp to size_t.
    if (value > std::numeric_limits<size_t>::max()) {
        return std::numeric_limits<size_t>::max();
    }
    return static_cast<size_t>(value);
}

std::mutex& PinnedHostMemoryMutex();

size_t& PinnedHostMemoryBytes();

std::unordered_map<void*, size_t>& PinnedHostMemoryRegions();

/// Try to register the host range [ptr, ptr + size) with cudaHostRegister
/// and record it in the process-wide pinned-region table.
///
/// @param ptr   Start of the host range; nullptr is rejected.
/// @param size  Length in bytes; 0 is rejected.
/// @param name  Label used only in log messages.
/// @return true if the range is registered after the call (including when
///         `ptr` was already recorded). false if pinning is disabled, the
///         arguments are empty, the MC_STORE_PIN_MEMORY_MAX_BYTES budget
///         would be exceeded, registration failed, or the build has no
///         CUDA-compatible backend.
///
/// The table and byte counter are updated under PinnedHostMemoryMutex().
inline bool TryPinHostMemory(void* ptr, size_t size, const char* name) {
#if !(defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \
      defined(USE_HYGON) || defined(USE_COREX))
    // No CUDA-compatible runtime in this build: nothing can be pinned.
    (void)ptr;
    (void)size;
    (void)name;
    return false;
#else
    const bool pinnable = PinMemoryEnabled() && ptr != nullptr && size != 0;
    if (!pinnable) return false;

    std::lock_guard<std::mutex> lock(PinnedHostMemoryMutex());
    auto& pinned_regions = PinnedHostMemoryRegions();
    // Regions are keyed by start address; a known address counts as pinned.
    if (pinned_regions.count(ptr) != 0) return true;

    const size_t cap = PinMemoryMaxBytes();
    const size_t already_pinned = PinnedHostMemoryBytes();
    if (cap > 0) {
        // Written as a subtraction so the check cannot overflow.
        const bool over_budget =
            already_pinned > cap || size > cap - already_pinned;
        if (over_budget) {
            LOG(WARNING) << "Skip cudaHostRegister for " << name
                         << " size=" << size
                         << " because MC_STORE_PIN_MEMORY_MAX_BYTES=" << cap
                         << " current_pinned=" << already_pinned;
            return false;
        }
    }

    const auto status = cudaHostRegister(ptr, size, 0);
    if (status != cudaSuccess) {
        LOG(WARNING) << "cudaHostRegister failed for " << name
                     << " size=" << size << ": " << cudaGetErrorString(status)
                     << "; GPU copies will use pageable fallback";
        return false;
    }

    PinnedHostMemoryBytes() += size;
    pinned_regions[ptr] = size;
    LOG(INFO) << "cudaHostRegister OK for " << name << ", size=" << size;
    return true;
#endif
}

/// Undo a registration made by TryPinHostMemory for the range starting at
/// `ptr`.
///
/// Does nothing when `ptr` is nullptr or is not in the pinned-region table.
/// If cudaHostUnregister fails, a warning is logged and the table entry and
/// byte counter are left untouched. In builds without a CUDA-compatible
/// backend the function is a no-op.
///
/// @param ptr   Start address that was passed to TryPinHostMemory.
/// @param name  Label used only in log messages.
inline void UnpinHostMemory(void* ptr, const char* name) {
#if !(defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_MACA) || \
      defined(USE_HYGON) || defined(USE_COREX))
    (void)ptr;
    (void)name;
#else
    if (ptr == nullptr) return;

    std::lock_guard<std::mutex> lock(PinnedHostMemoryMutex());
    auto& pinned_regions = PinnedHostMemoryRegions();
    const auto entry = pinned_regions.find(ptr);
    if (entry == pinned_regions.end()) return;

    const size_t pinned_size = entry->second;
    const auto status = cudaHostUnregister(ptr);
    if (status == cudaSuccess) {
        // Drop the bookkeeping only once the runtime has let go of the range.
        pinned_regions.erase(entry);
        PinnedHostMemoryBytes() -= pinned_size;
        return;
    }

    LOG(WARNING) << "cudaHostUnregister failed for " << name
                 << " size=" << pinned_size << ": "
                 << cudaGetErrorString(status);
#endif
}

} // namespace gpu_staging
} // namespace mooncake
Loading
Loading