Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
331 changes: 331 additions & 0 deletions docs/source/design/ssd-prefetch.md

Large diffs are not rendered by default.

57 changes: 44 additions & 13 deletions mooncake-integration/store/store_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1759,6 +1759,15 @@ PYBIND11_MODULE(store, m) {
return oss.str();
});

// Python binding for ExistOptions: default-constructible, exposes the
// prefetch_to_memory field as a read/write attribute, and implements
// __str__ by streaming the object through operator<< into a string.
py::class_<ExistOptions>(m, "ExistOptions")
.def(py::init<>())
.def_readwrite("prefetch_to_memory", &ExistOptions::prefetch_to_memory)
.def("__str__", [](const ExistOptions &options) {
std::ostringstream oss;
oss << options;
return oss.str();
});

py::enum_<ReplicaStatus>(m, "ReplicaStatus")
.value("UNDEFINED", ReplicaStatus::UNDEFINED)
.value("INITIALIZED", ReplicaStatus::INITIALIZED)
Expand Down Expand Up @@ -2028,7 +2037,11 @@ PYBIND11_MODULE(store, m) {
const py::object &engine = py::none(),
bool enable_ssd_offload = false,
const std::string &ssd_offload_path = "",
const std::string &tenant_id = "default") {
const std::string &tenant_id = "default",
int64_t ssd_prefetch_cooldown_sec =
DEFAULT_SSD_PREFETCH_COOLDOWN_SEC,
int64_t ssd_prefetch_dedup_ttl_sec =
DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC) {
auto real_client = self.init_real_client();
std::shared_ptr<mooncake::TransferEngine> transfer_engine =
nullptr;
Expand All @@ -2040,14 +2053,19 @@ PYBIND11_MODULE(store, m) {
local_hostname, metadata_server, global_segment_size,
local_buffer_size, protocol, rdma_devices,
master_server_addr, transfer_engine, "", enable_ssd_offload,
ssd_offload_path, tenant_id);
ssd_offload_path, tenant_id, ssd_prefetch_cooldown_sec,
ssd_prefetch_dedup_ttl_sec);
},
py::arg("local_hostname"), py::arg("metadata_server"),
py::arg("global_segment_size"), py::arg("local_buffer_size"),
py::arg("protocol"), py::arg("rdma_devices"),
py::arg("master_server_addr"), py::arg("engine") = py::none(),
py::arg("enable_ssd_offload") = false,
py::arg("ssd_offload_path") = "", py::arg("tenant_id") = "default")
py::arg("ssd_offload_path") = "", py::arg("tenant_id") = "default",
py::arg("ssd_prefetch_cooldown_sec") =
DEFAULT_SSD_PREFETCH_COOLDOWN_SEC,
py::arg("ssd_prefetch_dedup_ttl_sec") =
DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC)
.def(
"setup",
[](MooncakeStorePyWrapper &self, const py::dict &config_dict) {
Expand Down Expand Up @@ -2078,7 +2096,14 @@ PYBIND11_MODULE(store, m) {
" ipc_socket_path: IPC socket path.\n"
" enable_ssd_offload: Enable SSD offload (default false).\n"
" ssd_offload_path: SSD storage directory path (overrides env "
"var).")
"var).\n"
" ssd_prefetch_cooldown_sec: SSD prefetch memory-pressure backoff "
"in seconds (default 5, 0 disables).\n"
" ssd_prefetch_dedup_ttl_sec: SSD prefetch per-key dedup/"
"rate-limit TTL in seconds (default 30, 0 disables).\n"
" ssd_get_wait_ms: get-side prefetch wait budget in milliseconds "
"(default 10, poll 1ms with early exit; 0 disables). Env "
"MOONCAKE_SSD_GET_WAIT_MS overrides this.")
.def(
"setup_dummy",
[](MooncakeStorePyWrapper &self, size_t mem_pool_size,
Expand Down Expand Up @@ -2184,20 +2209,26 @@ PYBIND11_MODULE(store, m) {
py::arg("keys"), py::arg("force") = false,
"Batch remove objects by keys. Returns a list of status codes "
"(0=success, negative=error code) for each key.")
.def("is_exist",
[](MooncakeStorePyWrapper &self, const std::string &key) {
py::gil_scoped_release release;
return self.store_->isExist(key);
})
.def(
"is_exist",
[](MooncakeStorePyWrapper &self, const std::string &key,
const ExistOptions &options) {
py::gil_scoped_release release;
return self.store_->isExist(key, options);
},
py::arg("key"), py::arg("options") = ExistOptions{})
.def(
"batch_is_exist",
[](MooncakeStorePyWrapper &self,
const std::vector<std::string> &keys) {
const std::vector<std::string> &keys,
const ExistOptions &options) {
py::gil_scoped_release release;
return self.store_->batchIsExist(keys);
return self.store_->batchIsExist(keys, options);
},
py::arg("keys"),
"Check if multiple objects exist. Returns list of results: 1 if "
py::arg("keys"), py::arg("options") = ExistOptions{},
"Check if multiple objects exist. When options.prefetch_to_memory "
"is True, triggers SSD-to-DRAM promotion for keys that only have "
"LOCAL_DISK replicas. Returns list of results: 1 if "
"exists, 0 if not exists, -1 if error")
.def("close",
[](MooncakeStorePyWrapper &self) {
Expand Down
17 changes: 17 additions & 0 deletions mooncake-store/include/client_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,23 @@ class Client {
*/
tl::expected<QueryResult, ErrorCode> Query(const std::string& object_key);

/**
* @brief Read-only replica metadata for SSD prefetch (no lease / no
* promotion-on-hit side effects).
* @param object_key Key to query.
* @return QueryResult on success, or an ErrorCode on failure.
*/
tl::expected<QueryResult, ErrorCode> QueryForPrefetch(
const std::string& object_key);

/**
* @brief Batch form of the SSD prefetch replica metadata query.
* @param object_keys Keys to query.
* @return A vector of per-key results, each holding either a QueryResult
* or an ErrorCode.
* NOTE(review): presumably index-aligned with object_keys and side-effect
* free like QueryForPrefetch — confirm against the implementation.
*/
std::vector<tl::expected<QueryResult, ErrorCode>> BatchQueryForPrefetch(
const std::vector<std::string>& object_keys);

/**
* @brief Register a master-side promotion task for SSD prefetch without
* using the promotion-on-hit heartbeat queue.
* @param object_key Key whose promotion task should be registered.
* @return An empty expected on success, or an ErrorCode on failure.
*/
tl::expected<void, ErrorCode> RegisterPrefetchTask(
const std::string& object_key);

/**
* @brief Queries replica lists for object keys that match a regex pattern.
* @param str The regular expression string to match against object keys.
Expand Down
28 changes: 16 additions & 12 deletions mooncake-store/include/dummy_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ class DummyClient : public PyClient {

int64_t unregister_shm();

int setup_real(const std::string &local_hostname,
const std::string &metadata_server,
size_t global_segment_size, size_t local_buffer_size,
const std::string &protocol, const std::string &rdma_devices,
const std::string &master_server_addr,
const std::shared_ptr<TransferEngine> &transfer_engine,
const std::string &ipc_socket_path,
bool enable_ssd_offload = false,
const std::string &ssd_offload_path = "",
const std::string &tenant_id = "default") {
int setup_real(
const std::string &local_hostname, const std::string &metadata_server,
size_t global_segment_size, size_t local_buffer_size,
const std::string &protocol, const std::string &rdma_devices,
const std::string &master_server_addr,
const std::shared_ptr<TransferEngine> &transfer_engine,
const std::string &ipc_socket_path, bool enable_ssd_offload = false,
const std::string &ssd_offload_path = "",
const std::string &tenant_id = "default",
int64_t ssd_prefetch_cooldown_sec = DEFAULT_SSD_PREFETCH_COOLDOWN_SEC,
int64_t ssd_prefetch_dedup_ttl_sec =
DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC) {
// Dummy client does not support real setup
return -1;
};
Expand Down Expand Up @@ -147,9 +149,11 @@ class DummyClient : public PyClient {
std::vector<int> batchRemove(const std::vector<std::string> &keys,
bool force = false);

int isExist(const std::string &key);
int isExist(const std::string &key,
const ExistOptions &options = ExistOptions{});

std::vector<int> batchIsExist(const std::vector<std::string> &keys);
std::vector<int> batchIsExist(const std::vector<std::string> &keys,
const ExistOptions &options = ExistOptions{});

int64_t getSize(const std::string &key);

Expand Down
28 changes: 28 additions & 0 deletions mooncake-store/include/file_storage.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
#pragma once

#include <functional>

#include "client_service.h"
#include "client_buffer.hpp"
#include "storage_backend.h"
#include "pinned_buffer_pool.h"

#include <functional>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The `<functional>` header is included twice in this file (at line 3 and line 10). Please remove the duplicate include at line 10.


namespace mooncake {

struct SsdMetric;
Expand Down Expand Up @@ -41,6 +45,30 @@ class FileStorage {

FileStorageConfig config_;

/**
* @brief Per-key completion callback type accepted by PrefetchKeys.
*
* Called with the object key and a boolean `success` flag for that key.
*/
using PrefetchKeyCallback =
std::function<void(const std::string& key, bool success)>;

/**
* @brief Promote a set of keys from local SSD to DRAM.
*
* For each key the method allocates a staging buffer, reads the data from
* the local SSD backend, transfers it to a freshly-allocated MEMORY
* replica via Transfer Engine, and commits the result on the master.
* Failures are logged per-key but do not propagate; this is best-effort.
*
* @param keys Keys to promote (must already have LOCAL_DISK replicas)
* @param sizes Corresponding object sizes in bytes
* @param dram_pressure Optional out-flag. Set to true if any key failed to
* promote because DRAM was saturated (NO_AVAILABLE_HANDLE), so the
* caller can back off prefetch and let eviction/offload reclaim
* memory. Never reset to false; caller initializes it.
* @param on_key_done Optional per-key callback (defaults to nullptr).
* NOTE(review): presumably invoked once per key with that key's
* promotion outcome — confirm against the implementation.
* @return void on success, ErrorCode on critical (pre-loop) failure
*/
tl::expected<void, ErrorCode> PrefetchKeys(
const std::vector<std::string>& keys, const std::vector<int64_t>& sizes,
bool* dram_pressure = nullptr,
PrefetchKeyCallback on_key_done = nullptr);

/**
* @brief Releases buffer associated with a specific batch_id.
* Called by remote client after transfer completion.
Expand Down
32 changes: 32 additions & 0 deletions mooncake-store/include/master_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,23 @@ class MasterClient {
[[nodiscard]] tl::expected<GetReplicaListResponse, ErrorCode>
GetReplicaList(const std::string& object_key, const std::string& tenant_id);

/**
* @brief Read-only replica metadata query for best-effort SSD prefetch.
*
* Unlike GetReplicaList, this does NOT grant a lease, update the access
* sketch, bump cache-hit / valid-get counters, or enqueue a promotion. It
* is purely used to decide whether a key is SSD-only and worth prefetching,
* so probing must not perturb eviction/offload pacing or metrics.
* @param object_key Key to query.
* @return Replica list on success, or an ErrorCode on failure.
*/
[[nodiscard]] tl::expected<GetReplicaListResponse, ErrorCode>
GetReplicaListForPrefetch(const std::string& object_key);

/**
* @brief Batch form of the prefetch replica metadata query.
* @param object_keys Keys to query.
* @return A vector of per-key results, each holding either a replica list
* response or an ErrorCode.
* NOTE(review): presumably index-aligned with object_keys and carrying the
* same no-side-effect guarantees as GetReplicaListForPrefetch — confirm
* against the implementation.
*/
[[nodiscard]] std::vector<tl::expected<GetReplicaListResponse, ErrorCode>>
BatchGetReplicaListForPrefetch(
const std::vector<std::string>& object_keys);

/**
* @brief Retrieves replica lists for object keys that match a regex
* pattern.
Expand Down Expand Up @@ -433,6 +450,21 @@ class MasterClient {
const UUID& client_id, const std::vector<OffloadTaskItem>& tasks,
const std::vector<StorageObjectMetadata>& metadatas);

/**
* @brief Registers a prefetch (SSD->DRAM promotion) task on the master.
*
* Creates a promotion_tasks entry consumed by PromotionAllocStart, but
* deliberately does NOT go through the promotion admission gate
* (TryPushPromotionQueue) and does NOT push to the holder's
* promotion_objects heartbeat queue. This keeps the dedicated prefetch path
* separate from promotion-on-hit and avoids double-promoting the same key.
* @param client_id The UUID of the client requesting the prefetch.
* @param key The object key to promote from SSD to DRAM.
* @return An empty expected on success, or an ErrorCode on failure.
*/
[[nodiscard]] tl::expected<void, ErrorCode> RegisterPrefetchTask(
const UUID& client_id, const std::string& key);

/**
* @brief Heartbeat-driven pull of pending L2->L1 promotion work for a
* client. Returns tenant-scoped tasks the caller should read from local
Expand Down
22 changes: 22 additions & 0 deletions mooncake-store/include/master_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,26 @@ class MasterService {
auto GetReplicaList(const std::string& key, const std::string& tenant_id)
-> tl::expected<GetReplicaListResponse, ErrorCode>;

/**
* @brief Read-only replica metadata for SSD prefetch.
*
* Unlike GetReplicaList, does not grant a lease, bump access metrics, or
* enqueue promotion-on-hit work.
*/
auto GetReplicaListForPrefetch(const std::string& key)
-> tl::expected<GetReplicaListResponse, ErrorCode>;

/**
* @brief Register an in-flight promotion task for SSD prefetch.
*
* Records a PromotionTask on the master without the promotion-on-hit
* frequency/watermark gates and without pushing onto the holder client's
* promotion_objects heartbeat queue. The caller must execute the transfer
* via FileStorage::PrefetchKeys (PromotionAllocStart path).
*/
auto RegisterPrefetchTask(const UUID& client_id, const std::string& key)
-> tl::expected<void, ErrorCode>;

/**
* @brief Start a put operation for an object
* @param[out] replica_list Vector to store replica information for the
Expand Down Expand Up @@ -1137,6 +1157,8 @@ class MasterService {
uint64_t object_size;
std::chrono::system_clock::time_point start_time;
UUID holder_id; // owner of source LOCAL_DISK; only Notifier allowed
bool from_prefetch{
false}; // set by RegisterPrefetchTask; enables protect lease
};

static constexpr size_t kNumShards = 1024; // Number of metadata shards
Expand Down
26 changes: 23 additions & 3 deletions mooncake-store/include/pyclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,21 @@ class ClientRequester {
void release_offload_buffer(const std::string &client_addr,
uint64_t batch_id);

/**
* @brief Asks a remote holder node to prefetch (promote SSD->DRAM) the
* given keys. Used for cross-node SSD prefetch: the holder must register
* the promotion task itself (Master validates holder_id == client_id), so
* the requester delegates via this RPC instead of calling
* RegisterPrefetchTask locally. Best-effort: errors are logged, not
* propagated, and never block the originating exist() call.
* @param client_addr Network address of the holder's offload RPC service.
* @param keys SSD-only keys held by the remote node.
* @param sizes Object sizes (bytes), index-aligned with keys.
*/
void prefetch_offload_object(const std::string &client_addr,
const std::vector<std::string> &keys,
const std::vector<int64_t> &sizes);

private:
/**
* @brief A batch of allocated memory buffers, tracking both handles and
Expand Down Expand Up @@ -225,7 +240,10 @@ class PyClient {
const std::shared_ptr<TransferEngine> &transfer_engine,
const std::string &ipc_socket_path, bool enable_ssd_offload = false,
const std::string &ssd_offload_path = "",
const std::string &tenant_id = "default") = 0;
const std::string &tenant_id = "default",
int64_t ssd_prefetch_cooldown_sec = DEFAULT_SSD_PREFETCH_COOLDOWN_SEC,
int64_t ssd_prefetch_dedup_ttl_sec =
DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC) = 0;

virtual int setup_dummy(size_t mem_pool_size, size_t local_buffer_size,
const std::string &server_address,
Expand Down Expand Up @@ -338,10 +356,12 @@ class PyClient {
virtual std::vector<int> batchRemove(const std::vector<std::string> &keys,
bool force = false) = 0;

virtual int isExist(const std::string &key) = 0;
virtual int isExist(const std::string &key,
const ExistOptions &options = ExistOptions{}) = 0;

virtual std::vector<int> batchIsExist(
const std::vector<std::string> &keys) = 0;
const std::vector<std::string> &keys,
const ExistOptions &options = ExistOptions{}) = 0;

virtual int64_t getSize(const std::string &key) = 0;

Expand Down
Loading
Loading