diff --git a/docs/source/deployment/mooncake-store-deployment-guide.md b/docs/source/deployment/mooncake-store-deployment-guide.md index f7502abac3..08728810ab 100644 --- a/docs/source/deployment/mooncake-store-deployment-guide.md +++ b/docs/source/deployment/mooncake-store-deployment-guide.md @@ -189,10 +189,12 @@ mooncake_master \ --- -### Tiered Storage with SSD Offload — Cost-Effective Capacity +### Tiered Storage with SSD Offload - Cost-Effective Capacity Extends the cache pool from DRAM to SSD while keeping normal reads and writes on the distributed memory path. With `--enable_offload=true`, completed memory writes are queued for asynchronous SSD persistence through the master control plane. Set `--offload_on_evict=true` to defer that SSD write until the memory eviction path selects an object for reclamation. When `--promotion_on_hit=true`, SSD-only objects can be promoted back to DRAM after repeated reads; admission is gated by `--promotion_admission_threshold`. +Promotion execution runs on the FileStorage holder client. By default one background worker drains promotion tasks outside the heartbeat thread. Increase `MOONCAKE_OFFLOAD_PROMOTION_WORKER_THREADS` only when SSD bandwidth, network bandwidth, and DRAM allocation headroom can absorb more concurrent L2-to-L1 copies. + ```bash mooncake_master \ --enable_offload=true \ @@ -483,6 +485,16 @@ Flags for controlling data movement between DRAM and SSD. Start with `--enable_offload=true` for eager asynchronous SSD persistence after `Put` completion. Add `--offload_on_evict=true` when you want SSD writes to happen only when memory pressure selects an object for eviction. Add `--promotion_on_hit=true` to allow hot SSD-only data to be promoted back to DRAM, and tune `--promotion_admission_threshold` to control how many observed reads are required before promotion is queued. +FileStorage holder clients also accept the following environment variables: + +| Env | Default | Description | +|-----|---------|-------------| +| `MOONCAKE_OFFLOAD_PROMOTION_WORKER_THREADS` | `1` | Background workers used to execute L2-to-L1 promotion tasks; `0` falls back to synchronous heartbeat execution | +| `MOONCAKE_OFFLOAD_PROMOTION_QUEUE_CAPACITY` | `1024` | Soft local backlog cap used to limit additional promotion pulls from the master | +| `MOONCAKE_OFFLOAD_PROMOTION_DRAIN_BATCH_SIZE` | `64` | Max promotion heartbeat pulls per worker per FileStorage heartbeat tick | + +Keep the default worker count for latency-sensitive deployments. Raising it can drain bursty HiCache prefix-hit promotion backlogs faster, but it also increases SSD reads, transfer writes, and memory allocation pressure. + ### CXL Memory | Flag | Default | Description | diff --git a/mooncake-store/include/file_storage.h b/mooncake-store/include/file_storage.h index 6aa2ac3d56..50a48ea2d0 100644 --- a/mooncake-store/include/file_storage.h +++ b/mooncake-store/include/file_storage.h @@ -1,9 +1,13 @@ #pragma once +#include +#include +#include + #include "client_service.h" #include "client_buffer.hpp" -#include "storage_backend.h" #include "pinned_buffer_pool.h" +#include "storage_backend.h" namespace mooncake { @@ -52,6 +56,16 @@ class FileStorage { private: friend class FileStorageTest; friend class FileStoragePromotionTest; + + struct PromotionExecutionResult { + bool alloc_attempted = false; + bool write_attempted = false; + bool notify_success_attempted = false; + bool notify_failure_attempted = false; + bool completed = false; + ErrorCode terminal_error = ErrorCode::OK; + }; + struct AllocatedBatch { uint64_t batch_id; std::vector handles; @@ -83,24 +97,39 @@ class FileStorage { * client. * 2. Receives feedback on which objects should be offloaded. * 3. Triggers asynchronous offloading of pending objects. - * 4. Pulls and processes any pending L2->L1 promotion tasks queued by the - * master (mirror of step 1+2 in the reverse direction). + * 4. Pulls any pending L2->L1 promotion tasks queued by the master and + * dispatches them for execution (mirror of step 1+2 in the reverse + * direction). * @return tl::expected indicating operation status. */ tl::expected Heartbeat(); /** * @brief Drives the L2->L1 promotion pipeline for one heartbeat tick. - * Pulls promotion work from the master, stages a MEMORY replica for each - * key, copies the bytes from local SSD into that replica, and notifies the - * master on success. A failure on any single key is logged and skipped; - * the master-side reaper decrements the source replica's refcnt and - * erases the task entry on TTL expiry, and any orphaned PROCESSING - * MEMORY replica is reaped via the standard discarded-replicas path. + * + * Pulls promotion work from the master and either processes it + * synchronously or enqueues it for background workers. Each task stages a + * MEMORY replica, copies the bytes from local SSD into that replica, and + * notifies the master on success. FileStorage eagerly reports per-key + * failures so the master can release the promotion slot immediately, with + * the reaper acting as a long-stop. + * * @return tl::expected indicating operation status. */ tl::expected ProcessPromotionTasks(); + PromotionExecutionResult ProcessPromotionTask( + const PromotionTaskItem& task, + const std::vector& preferred_segments); + + bool EnqueuePromotionTask(const PromotionTaskItem& task, + bool allow_over_capacity_for_pulled_task = false); + + void ReleasePromotionTask(const std::string& key, + const std::string& tenant_id); + + void PromotionWorkerThreadFunc(); + tl::expected IsEnableOffloading(); tl::expected BatchLoad( @@ -143,6 +172,11 @@ class FileStorage { std::thread heartbeat_thread_; std::atomic client_buffer_gc_running_; std::thread client_buffer_gc_thread_; + std::atomic promotion_workers_running_{false}; + std::vector promotion_worker_threads_; + std::mutex promotion_queue_mutex_; + std::condition_variable promotion_queue_cv_; + std::deque promotion_task_queue_; std::future rescan_future_; std::atomic metadata_resync_pending_{false}; }; diff --git a/mooncake-store/include/storage_backend.h b/mooncake-store/include/storage_backend.h index 5fd071b59b..3a38a95834 100644 --- a/mooncake-store/include/storage_backend.h +++ b/mooncake-store/include/storage_backend.h @@ -226,6 +226,14 @@ struct FileStorageConfig { uint32_t client_buffer_gc_interval_seconds = 1; uint64_t client_buffer_gc_ttl_ms = 5000; + // Background worker settings for L2->L1 promotion-on-hit execution. + // Set promotion_worker_threads to 0 to disable async workers and fall back + // to the synchronous heartbeat path. + uint32_t promotion_worker_threads = 1; + // Soft local backlog cap used to limit additional master pulls. + uint32_t promotion_queue_capacity = 1024; + uint32_t promotion_drain_batch_size = 64; + // Use io_uring for file I/O instead of POSIX pread/pwrite bool use_uring = false; @@ -1209,4 +1217,4 @@ class OffsetAllocatorStorageBackend : public StorageBackendInterface { tl::expected, ErrorCode> CreateStorageBackend(const FileStorageConfig& config); -} // namespace mooncake \ No newline at end of file +} // namespace mooncake diff --git a/mooncake-store/src/file_storage.cpp b/mooncake-store/src/file_storage.cpp index 4830c13eb3..4988179381 100644 --- a/mooncake-store/src/file_storage.cpp +++ b/mooncake-store/src/file_storage.cpp @@ -1,5 +1,6 @@ #include "file_storage.h" +#include #include #include @@ -86,6 +87,16 @@ FileStorageConfig FileStorageConfig::FromEnvironment() { GetEnvOr("MOONCAKE_OFFLOAD_CLIENT_BUFFER_GC_TTL_MS", config.client_buffer_gc_ttl_ms); + config.promotion_worker_threads = + GetEnvOr("MOONCAKE_OFFLOAD_PROMOTION_WORKER_THREADS", + config.promotion_worker_threads); + config.promotion_queue_capacity = + GetEnvOr("MOONCAKE_OFFLOAD_PROMOTION_QUEUE_CAPACITY", + config.promotion_queue_capacity); + config.promotion_drain_batch_size = + GetEnvOr("MOONCAKE_OFFLOAD_PROMOTION_DRAIN_BATCH_SIZE", + config.promotion_drain_batch_size); + auto use_uring_str = GetEnvStringOr("MOONCAKE_OFFLOAD_USE_URING", GetEnvStringOr("MOONCAKE_USE_URING", "false")); @@ -229,6 +240,25 @@ FileStorage::~FileStorage() { if (client_buffer_gc_thread_.joinable()) { client_buffer_gc_thread_.join(); } + std::vector queued_tasks_to_release; + { + std::lock_guard lock(promotion_queue_mutex_); + while (!promotion_task_queue_.empty()) { + queued_tasks_to_release.push_back( + std::move(promotion_task_queue_.front())); + promotion_task_queue_.pop_front(); + } + } + promotion_workers_running_.store(false); + promotion_queue_cv_.notify_all(); + for (auto& worker : promotion_worker_threads_) { + if (worker.joinable()) { + worker.join(); + } + } + for (const auto& task : queued_tasks_to_release) { + ReleasePromotionTask(task.key, task.tenant_id); + } } tl::expected FileStorage::Init() { @@ -303,6 +333,14 @@ tl::expected FileStorage::Init() { return scan_meta_result; } + if (config_.promotion_worker_threads > 0) { + promotion_workers_running_.store(true); + promotion_worker_threads_.reserve(config_.promotion_worker_threads); + for (uint32_t i = 0; i < config_.promotion_worker_threads; ++i) { + promotion_worker_threads_.emplace_back( + &FileStorage::PromotionWorkerThreadFunc, this); + } + } heartbeat_running_.store(true); heartbeat_thread_ = std::thread([this]() { LOG(INFO) << "Starting periodic task with interval: " @@ -626,6 +664,20 @@ tl::expected FileStorage::Heartbeat() { auto remount_result = client_->MountLocalDiskSegment(enable_offloading_); if (remount_result) { + // Re-report SSD capacity: the master lost this segment's + // ssd_total_capacity_bytes on restart/failover (the field + // is in-memory and not replicated via oplog). Without + // this, the new leader's "SSD Storage" denominator stays + // at 0 and SSD-level eviction never triggers. + if (config_.total_size_limit > 0) { + auto cap_result = + client_->ReportSsdCapacity(config_.total_size_limit); + if (!cap_result) { + LOG(WARNING) << "ReportSsdCapacity failed after " + << "re-registration: " + << cap_result.error(); + } + } heartbeat_result = client_->OffloadObjectHeartbeat( enable_offloading_, offloading_objects); if (!heartbeat_result) { @@ -663,20 +715,19 @@ tl::expected FileStorage::Heartbeat() { } } - if (offloading_objects.empty()) { - return {}; - } - // === STEP 2: Persist offloaded objects (trigger actual data migration) === - auto offload_result = OffloadObjects(offloading_objects); - if (!offload_result) { - LOG(ERROR) << "Failed to persist objects with error: " - << offload_result.error(); - return offload_result; + if (!offloading_objects.empty()) { + // === STEP 2: Persist offloaded objects (trigger actual data + // migration) === + auto offload_result = OffloadObjects(offloading_objects); + if (!offload_result) { + LOG(ERROR) << "Failed to persist objects with error: " + << offload_result.error(); + return offload_result; + } } - // Drive any pending L2->L1 promotion work for this client. Failures - // inside ProcessPromotionTasks are logged per-key and do not propagate; - // promotion is best-effort and must never break offload. + // Pull any pending L2->L1 promotion work for this client. Execution is + // delegated to background workers so the heartbeat path stays responsive. (void)ProcessPromotionTasks(); // TODO(eviction): Implement an LRU eviction mechanism to manage local @@ -689,148 +740,271 @@ tl::expected FileStorage::ProcessPromotionTasks() { return tl::make_unexpected(ErrorCode::INVALID_PARAMS); } - std::vector promotion_objects; - auto heartbeat_result = - client_->PromotionObjectHeartbeat(promotion_objects); - if (!heartbeat_result) { - // SEGMENT_NOT_FOUND happens between MountLocalDiskSegment and the - // first heartbeat tick if the master forgets us (e.g. across a master - // restart): benign no-op until next ReMount. - if (heartbeat_result.error() == ErrorCode::SEGMENT_NOT_FOUND) { + const std::vector sync_preferred_segments; + if (!promotion_workers_running_.load()) { + std::vector promotion_objects; + auto heartbeat_result = + client_->PromotionObjectHeartbeat(promotion_objects); + if (!heartbeat_result) { + // SEGMENT_NOT_FOUND happens between MountLocalDiskSegment and the + // first heartbeat tick if the master forgets us (e.g. across a + // master restart): benign no-op until next ReMount. + if (heartbeat_result.error() == ErrorCode::SEGMENT_NOT_FOUND) { + return {}; + } + LOG(WARNING) << "PromotionObjectHeartbeat failed: " + << heartbeat_result.error(); + return tl::make_unexpected(heartbeat_result.error()); + } + if (promotion_objects.empty()) { return {}; } - LOG(WARNING) << "PromotionObjectHeartbeat failed: " - << heartbeat_result.error(); - return tl::make_unexpected(heartbeat_result.error()); - } - if (promotion_objects.empty()) { - return {}; - } - VLOG(1) << "ProcessPromotionTasks pulled " << promotion_objects.size() - << " promotion candidate(s) from master"; - - // No segment preference from the client: let master pick from any - // DRAM segment. - const std::vector preferred_segments; + VLOG(1) << "ProcessPromotionTasks pulled " << promotion_objects.size() + << " promotion candidate(s) from master"; - // The master caps per-heartbeat work via PromotionObjectHeartbeat, - // returning at most one task per call so the heartbeat thread stays - // within the client-liveness window even for large objects. Leftover - // work stays queued in the master's promotion_objects map and is - // returned on subsequent heartbeats; we process whatever we received - // here without a second client-side cap. - for (const auto& task : promotion_objects) { - const auto& key = task.key; - const auto& tenant_id = task.tenant_id; - const int64_t size = task.size; - const auto storage_key = MakeTenantScopedStorageKey(tenant_id, key); - if (size <= 0) { - LOG(WARNING) << "Skipping promotion for key=" << key - << " with non-positive size=" << size; - continue; + // Preserve pre-worker semantics for tests and pre-Init callers. + for (const auto& task : promotion_objects) { + (void)ProcessPromotionTask(task, sync_preferred_segments); } + return {}; + } - auto alloc_result = client_->PromotionAllocStart( - key, tenant_id, static_cast(size), preferred_segments); - if (!alloc_result) { - // AllocStart failed (typically NO_AVAILABLE_HANDLE under - // DRAM pressure). No staged buffer to release, but the - // task entry already claimed a promotion_in_flight_ slot - // at admission. Notify the master to release it - // immediately; otherwise the slot stays pinned for the - // reaper TTL (~10 min default), turning transient DRAM - // pressure into a sustained outage of promotion_queue_limit_. - // Notify is idempotent and handles alloc_id == 0 correctly. - VLOG(1) << "PromotionAllocStart failed for key=" << key - << ", error=" << alloc_result.error() - << " (likely no free DRAM); releasing master slot"; - auto release = client_->NotifyPromotionFailure(key, tenant_id); - if (!release) { - VLOG(1) << "Promotion: NotifyPromotionFailure failed for key=" - << key << ", error=" << release.error() - << "; master reaper will reclaim on TTL expiry"; + // Hand off execution to background workers. The capacity is a soft pull + // budget: once the master extracts a task, keep it locally instead of + // cancelling promotion because of transient client-side backlog. + size_t available_queue_slots = 0; + { + std::lock_guard lock(promotion_queue_mutex_); + if (config_.promotion_queue_capacity > 0) { + if (promotion_task_queue_.size() >= + config_.promotion_queue_capacity) { + VLOG(1) << "ProcessPromotionTasks skipped master pull because " + << "local promotion queue is full: queue_depth=" + << promotion_task_queue_.size() << ", queue_capacity=" + << config_.promotion_queue_capacity; + return {}; } - continue; + available_queue_slots = + config_.promotion_queue_capacity - promotion_task_queue_.size(); } + } - // Every failure path past this point has a master-side staged - // PROCESSING MEMORY buffer and an incremented in-flight slot. - // Eagerly notify the master on failure so the buffer is - // reclaimed and the slot is freed; otherwise transient SSD - // throttling or RDMA flakes saturate promotion_queue_limit_ - // for the full reaper TTL. NotifyPromotionFailure is - // idempotent and best-effort — the reaper is the long-stop. - auto release_master_state = [this, &key, &tenant_id]() { - auto release = client_->NotifyPromotionFailure(key, tenant_id); - if (!release) { - VLOG(1) << "Promotion: NotifyPromotionFailure failed for key=" - << key << ", error=" << release.error() - << "; master reaper will reclaim on TTL expiry"; - } - }; + const size_t worker_count = + std::max(1, promotion_worker_threads_.size()); + const size_t drain_batch_size = + std::max(1, config_.promotion_drain_batch_size); + size_t remaining_pull_budget = worker_count * drain_batch_size; + if (config_.promotion_queue_capacity > 0) { + remaining_pull_budget = + std::min(remaining_pull_budget, available_queue_slots); + } + if (remaining_pull_budget == 0) { + return {}; + } - // (a) Allocate an O_DIRECT-aligned staging buffer and read the bytes - // from the local SSD backend into it. AllocateBatch returns a - // shared_ptr whose BufferHandles RAII-release the - // staging space when the local goes out of scope. - std::vector single_key{storage_key}; - std::vector single_size{size}; - auto allocate_res = AllocateBatch(single_key, single_size); - if (!allocate_res) { - LOG(WARNING) << "Promotion: AllocateBatch failed for key=" << key - << ", error=" << allocate_res.error(); - release_master_state(); - continue; - } - auto staging = allocate_res.value(); - auto load_res = BatchLoad(staging->slices); - if (!load_res) { - LOG(WARNING) << "Promotion: BatchLoad failed for key=" << key - << ", error=" << load_res.error(); - release_master_state(); - continue; + size_t pulled_tasks = 0; + size_t enqueued_tasks = 0; + size_t heartbeat_rounds = 0; + auto enqueue_batch = [this, &enqueued_tasks](const auto& tasks) { + for (const auto& task : tasks) { + const bool enqueued = EnqueuePromotionTask( + task, /*allow_over_capacity_for_pulled_task=*/ + config_.promotion_queue_capacity > 0); + CHECK(enqueued) + << "Promotion queue capacity changed after pull planning"; + ++enqueued_tasks; } + }; - // (b) TE-write from the staging slice into the freshly-allocated - // MEMORY replica. Slice ptr may have been bumped by O_DIRECT offset - // correction in BatchLoad, so re-read it from the slice map. - auto slice_it = staging->slices.find(storage_key); - if (slice_it == staging->slices.end()) { - LOG(WARNING) << "Promotion: staging slice missing for key=" << key; - release_master_state(); - continue; + while (pulled_tasks < remaining_pull_budget) { + std::vector promotion_objects; + auto heartbeat_result = + client_->PromotionObjectHeartbeat(promotion_objects); + ++heartbeat_rounds; + if (!heartbeat_result) { + if (heartbeat_result.error() == ErrorCode::SEGMENT_NOT_FOUND) { + return {}; + } + LOG(WARNING) << "PromotionObjectHeartbeat failed after pulling " + << pulled_tasks << " promotion candidate(s): " + << heartbeat_result.error(); + return tl::make_unexpected(heartbeat_result.error()); } - std::vector tx_slices{slice_it->second}; - ErrorCode write_err = client_->PromotionWrite( - alloc_result.value().memory_descriptor, tx_slices); - if (write_err != ErrorCode::OK) { - LOG(WARNING) << "Promotion: TransferWrite failed for key=" << key - << ", error=" << write_err; - release_master_state(); - continue; + if (promotion_objects.empty()) { + break; } - // (c) Commit. Master flips the PROCESSING replica to COMPLETE and it - // becomes visible to readers. - auto notify_res = client_->NotifyPromotionSuccess(key, tenant_id); - if (!notify_res) { - // The write landed but the commit failed. We can't retry the - // commit (the success path is one-shot via alloc_id), and we - // don't know whether the failure was transient or structural. - // Release the master-side state so the slot is reusable; the - // bytes we wrote become stranded under a soon-to-be-erased - // PROCESSING replica, which is harmless. - LOG(WARNING) << "Promotion: NotifyPromotionSuccess failed for key=" - << key << ", error=" << notify_res.error(); - release_master_state(); - continue; + pulled_tasks += promotion_objects.size(); + VLOG(1) << "ProcessPromotionTasks pulled " << promotion_objects.size() + << " promotion candidate(s) from master"; + enqueue_batch(promotion_objects); + } + + VLOG(1) << "ProcessPromotionTasks pulled " << pulled_tasks + << " promotion candidate(s) from master in " << heartbeat_rounds + << " heartbeat round(s), enqueued " << enqueued_tasks + << ", queue_capacity=" << config_.promotion_queue_capacity; + return {}; +} + +FileStorage::PromotionExecutionResult FileStorage::ProcessPromotionTask( + const PromotionTaskItem& task, + const std::vector& preferred_segments) { + PromotionExecutionResult result; + const auto& key = task.key; + const auto& tenant_id = task.tenant_id; + const int64_t size = task.size; + const auto storage_key = MakeTenantScopedStorageKey(tenant_id, key); + + if (size <= 0) { + LOG(WARNING) << "Skipping promotion for key=" << key + << " with non-positive size=" << size; + ReleasePromotionTask(key, tenant_id); + result.notify_failure_attempted = true; + result.terminal_error = ErrorCode::INVALID_PARAMS; + return result; + } + + result.alloc_attempted = true; + auto alloc_result = client_->PromotionAllocStart( + key, tenant_id, static_cast(size), preferred_segments); + if (!alloc_result) { + VLOG(1) << "PromotionAllocStart failed for key=" << key + << ", error=" << alloc_result.error() + << " (likely no free DRAM); releasing master slot"; + ReleasePromotionTask(key, tenant_id); + result.notify_failure_attempted = true; + result.terminal_error = alloc_result.error(); + return result; + } + + std::vector single_key{storage_key}; + std::vector single_size{size}; + auto allocate_res = AllocateBatch(single_key, single_size); + if (!allocate_res) { + LOG(WARNING) << "Promotion: AllocateBatch failed for key=" << key + << ", error=" << allocate_res.error(); + ReleasePromotionTask(key, tenant_id); + result.notify_failure_attempted = true; + result.terminal_error = allocate_res.error(); + return result; + } + + auto staging = allocate_res.value(); + auto load_res = BatchLoad(staging->slices); + if (!load_res) { + LOG(WARNING) << "Promotion: BatchLoad failed for key=" << key + << ", error=" << load_res.error(); + ReleasePromotionTask(key, tenant_id); + result.notify_failure_attempted = true; + result.terminal_error = load_res.error(); + return result; + } + + auto slice_it = staging->slices.find(storage_key); + if (slice_it == staging->slices.end()) { + LOG(WARNING) << "Promotion: staging slice missing for key=" << key; + ReleasePromotionTask(key, tenant_id); + result.notify_failure_attempted = true; + result.terminal_error = ErrorCode::INVALID_KEY; + return result; + } + + result.write_attempted = true; + std::vector tx_slices{slice_it->second}; + ErrorCode write_err = client_->PromotionWrite( + alloc_result.value().memory_descriptor, tx_slices); + if (write_err != ErrorCode::OK) { + LOG(WARNING) << "Promotion: TransferWrite failed for key=" << key + << ", error=" << write_err; + ReleasePromotionTask(key, tenant_id); + result.notify_failure_attempted = true; + result.terminal_error = write_err; + return result; + } + + result.notify_success_attempted = true; + auto notify_res = client_->NotifyPromotionSuccess(key, tenant_id); + if (!notify_res) { + LOG(WARNING) << "Promotion: NotifyPromotionSuccess failed for key=" + << key << ", error=" << notify_res.error(); + ReleasePromotionTask(key, tenant_id); + result.notify_failure_attempted = true; + result.terminal_error = notify_res.error(); + return result; + } + + result.completed = true; + VLOG(1) << "Promotion completed for key=" << key << ", size=" << size; + return result; +} + +bool FileStorage::EnqueuePromotionTask( + const PromotionTaskItem& task, bool allow_over_capacity_for_pulled_task) { + std::lock_guard lock(promotion_queue_mutex_); + if (!promotion_workers_running_.load()) { + LOG(WARNING) << "Promotion workers are not running, rejecting key=" + << task.key; + return false; + } + if (!allow_over_capacity_for_pulled_task && + config_.promotion_queue_capacity > 0 && + promotion_task_queue_.size() >= config_.promotion_queue_capacity) { + LOG(WARNING) << "Promotion queue full (" << promotion_task_queue_.size() + << "), rejecting key=" << task.key; + return false; + } + promotion_task_queue_.push_back(task); + promotion_queue_cv_.notify_one(); + return true; +} + +void FileStorage::ReleasePromotionTask(const std::string& key, + const std::string& tenant_id) { + auto release = client_->NotifyPromotionFailure(key, tenant_id); + if (!release) { + VLOG(1) << "Promotion: NotifyPromotionFailure failed for key=" << key + << ", error=" << release.error() + << "; master reaper will reclaim on TTL expiry"; + } +} + +void FileStorage::PromotionWorkerThreadFunc() { + VLOG(1) << "action=promotion_worker_started"; + const std::vector preferred_segments; + const size_t drain_batch_size = + std::max(1, config_.promotion_drain_batch_size); + + while (true) { + std::vector tasks; + tasks.reserve(drain_batch_size); + + { + std::unique_lock lock(promotion_queue_mutex_); + promotion_queue_cv_.wait(lock, [this]() { + return !promotion_workers_running_.load() || + !promotion_task_queue_.empty(); + }); + + if (!promotion_workers_running_.load() && + promotion_task_queue_.empty()) { + break; + } + + while (!promotion_task_queue_.empty() && + tasks.size() < drain_batch_size) { + tasks.push_back(std::move(promotion_task_queue_.front())); + promotion_task_queue_.pop_front(); + } } - VLOG(1) << "Promotion completed for key=" << key << ", size=" << size; + for (const auto& task : tasks) { + (void)ProcessPromotionTask(task, preferred_segments); + } } - return {}; + VLOG(1) << "action=promotion_worker_stopped"; } tl::expected FileStorage::BatchLoad( diff --git a/mooncake-store/src/posix_file.cpp b/mooncake-store/src/posix_file.cpp index e5f0158276..b188c7501b 100644 --- a/mooncake-store/src/posix_file.cpp +++ b/mooncake-store/src/posix_file.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -55,6 +56,12 @@ tl::expected PosixFile::write(std::span data, ssize_t written = ::write(fd_, ptr, remaining); if (written == -1) { if (errno == EINTR) continue; + int saved_errno = errno; + LOG(ERROR) << "write failed for file: " << filename_ + << ", errno=" << saved_errno + << " (" << strerror(saved_errno) << ")" + << ", fd=" << fd_ + << ", remaining=" << remaining; return make_error(ErrorCode::FILE_WRITE_FAIL); } remaining -= written; @@ -86,6 +93,13 @@ tl::expected PosixFile::read(std::string &buffer, ssize_t n = ::read(fd_, ptr, length - read_bytes); if (n == -1) { if (errno == EINTR) continue; + int saved_errno = errno; + LOG(ERROR) << "read failed for file: " << filename_ + << ", errno=" << saved_errno + << " (" << strerror(saved_errno) << ")" + << ", fd=" << fd_ + << ", length=" << length + << ", read_bytes=" << read_bytes; buffer.clear(); return make_error(ErrorCode::FILE_READ_FAIL); } @@ -108,12 +122,40 @@ tl::expected PosixFile::vector_write(const iovec *iov, return make_error(ErrorCode::FILE_NOT_FOUND); } - ssize_t ret = ::pwritev(fd_, iov, iovcnt, offset); - if (ret < 0) { + size_t total_bytes = 0; + for (int i = 0; i < iovcnt; ++i) total_bytes += iov[i].iov_len; + + size_t written_total = 0; + off_t cur_offset = offset; + + for (int idx = 0; idx < iovcnt; idx += UIO_MAXIOV) { + int chunk_cnt = std::min(iovcnt - idx, UIO_MAXIOV); + ssize_t ret = ::pwritev(fd_, iov + idx, chunk_cnt, cur_offset); + if (ret < 0) { + int saved_errno = errno; + LOG(ERROR) << "pwritev failed for file: " << filename_ + << ", errno=" << saved_errno + << " (" << strerror(saved_errno) << ")" + << ", fd=" << fd_ + << ", iovcnt=" << iovcnt + << ", total_bytes=" << total_bytes + << ", offset=" << offset + << ", chunk_start=" << idx + << ", chunk_cnt=" << chunk_cnt; + return make_error(ErrorCode::FILE_WRITE_FAIL); + } + written_total += ret; + cur_offset += ret; + } + + if (written_total != total_bytes) { + LOG(ERROR) << "pwritev partial write for file: " << filename_ + << ", expected=" << total_bytes + << ", written=" << written_total; return make_error(ErrorCode::FILE_WRITE_FAIL); } - return ret; + return written_total; } tl::expected PosixFile::vector_read(const iovec *iov, @@ -123,12 +165,34 @@ tl::expected PosixFile::vector_read(const iovec *iov, return make_error(ErrorCode::FILE_NOT_FOUND); } - ssize_t ret = ::preadv(fd_, iov, iovcnt, offset); - if (ret < 0) { - return make_error(ErrorCode::FILE_READ_FAIL); + size_t total_bytes = 0; + for (int i = 0; i < iovcnt; ++i) total_bytes += iov[i].iov_len; + + size_t read_total = 0; + off_t cur_offset = offset; + + for (int idx = 0; idx < iovcnt; idx += UIO_MAXIOV) { + int chunk_cnt = std::min(iovcnt - idx, UIO_MAXIOV); + ssize_t ret = ::preadv(fd_, iov + idx, chunk_cnt, cur_offset); + if (ret < 0) { + int saved_errno = errno; + LOG(ERROR) << "preadv failed for file: " << filename_ + << ", errno=" << saved_errno + << " (" << strerror(saved_errno) << ")" + << ", fd=" << fd_ + << ", iovcnt=" << iovcnt + << ", total_bytes=" << total_bytes + << ", offset=" << offset + << ", chunk_start=" << idx + << ", chunk_cnt=" << chunk_cnt; + return make_error(ErrorCode::FILE_READ_FAIL); + } + read_total += ret; + cur_offset += ret; + if (ret == 0) break; // EOF } - return ret; + return read_total; } } // namespace mooncake \ No newline at end of file diff --git a/mooncake-store/src/storage_backend.cpp b/mooncake-store/src/storage_backend.cpp index 0820170190..246d3fa848 100644 --- a/mooncake-store/src/storage_backend.cpp +++ b/mooncake-store/src/storage_backend.cpp @@ -2003,86 +2003,7 @@ tl::expected BucketStorageBackend::WriteBucket( } auto file = std::move(open_file_result.value()); -#ifdef USE_URING - // Try to use write_aligned for O_DIRECT I/O if file is UringFile - UringFile* uring_file = dynamic_cast(file.get()); - if (uring_file != nullptr) { - size_t total_size = static_cast(bucket_metadata->data_size); - size_t aligned_size = align_up(total_size, kDirectIOAlignment); - - // Allocate aligned buffer if needed - void* write_buffer = nullptr; - std::unique_ptr temp_buffer{nullptr, - [](void*) {}}; - - if (aligned_size <= kAlignedBufferSize && aligned_io_buffer_) { - // Use the pre-allocated buffer - write_buffer = aligned_io_buffer_.get(); - } else { - // Allocate a temporary larger buffer - void* buf = nullptr; - int ret = posix_memalign(&buf, kDirectIOAlignment, aligned_size); - if (ret != 0) { - LOG(ERROR) - << "Failed to allocate aligned buffer for WriteBucket: " - << strerror(ret); - return tl::make_unexpected(ErrorCode::INTERNAL_ERROR); - } - temp_buffer.reset(buf); - temp_buffer = std::unique_ptr( - buf, [](void* p) { free(p); }); - write_buffer = buf; - LOG(WARNING) << "WriteBucket: bucket_id=" << bucket_id - << " requires " << aligned_size - << " bytes, exceeds buffer size " << kAlignedBufferSize - << ", using temporary allocation"; - } - - // Aggregate all iovs data into the aligned buffer - char* dst = static_cast(write_buffer); - for (const auto& iov : iovs) { - memcpy(dst, iov.iov_base, iov.iov_len); - dst += iov.iov_len; - } - - // Zero-pad the remaining bytes - if (aligned_size > total_size) { - memset(dst, 0, aligned_size - total_size); - } - - // Write using write_aligned - auto write_result = - uring_file->write_aligned(write_buffer, aligned_size, 0); - if (!write_result) { - LOG(ERROR) << "write_aligned failed for: " << bucket_id - << ", error: " << write_result.error(); - return tl::make_unexpected(write_result.error()); - } - if (write_result.value() != aligned_size) { - LOG(ERROR) << "Write size mismatch for: " << bucket_data_path - << ", expected: " << aligned_size - << ", got: " << write_result.value(); - return tl::make_unexpected(ErrorCode::FILE_WRITE_FAIL); - } - - // Flush bucket data to stable storage before writing metadata. - // This prevents a crash from leaving valid metadata pointing at - // incomplete data (write-ordering durability guarantee). - auto sync_result = uring_file->datasync(); - if (!sync_result) { - LOG(ERROR) << "datasync failed for bucket: " << bucket_id; - return tl::make_unexpected(ErrorCode::FILE_WRITE_FAIL); - } - - // Invalidate cache for this file since content changed - { - MutexLocker cache_locker(&file_cache_mutex_); - file_cache_.erase(bucket_data_path); - } - } else -#endif { - // Fallback to vector_write for non-UringFile auto write_result = file->vector_write(iovs.data(), iovs.size(), 0); if (!write_result) { LOG(ERROR) << "vector_write failed for: " << bucket_id @@ -2097,6 +2018,17 @@ tl::expected BucketStorageBackend::WriteBucket( return tl::make_unexpected(ErrorCode::FILE_WRITE_FAIL); } +#ifdef USE_URING + UringFile* uring_file = dynamic_cast(file.get()); + if (uring_file != nullptr) { + auto sync_result = uring_file->datasync(); + if (!sync_result) { + LOG(ERROR) << "datasync failed for bucket: " << bucket_id; + return tl::make_unexpected(ErrorCode::FILE_WRITE_FAIL); + } + } +#endif + // Invalidate cache for this file since content changed { MutexLocker cache_locker(&file_cache_mutex_); @@ -2577,9 +2509,11 @@ BucketStorageBackend::OpenFile(const std::string& path, FileMode mode) const { } #ifdef USE_URING - // Use O_DIRECT only for reads: write latency is not sensitive in this - // scenario, and O_DIRECT writes require 4096-byte alignment padding which - // corrupts meta file parsing and wastes disk space on data files. + // Use O_DIRECT only for reads: O_DIRECT writes require 4096-byte alignment + // padding which corrupts meta file parsing and wastes disk space on data + // files. Writes still create a UringFile in buffered mode (use_direct_io + // = false) so they benefit from io_uring async submission and queue depth + // without the padding problem. if (file_storage_config_.use_uring && mode == FileMode::Read) { flags |= O_DIRECT; } @@ -2592,8 +2526,9 @@ BucketStorageBackend::OpenFile(const std::string& path, FileMode mode) const { return tl::make_unexpected(ErrorCode::FILE_OPEN_FAIL); } #ifdef USE_URING - if (file_storage_config_.use_uring && mode == FileMode::Read) { - return std::make_unique(path, fd, 32, true); + if (file_storage_config_.use_uring) { + bool use_direct_io = (mode == FileMode::Read); + return std::make_unique(path, fd, 32, use_direct_io); } #endif return std::make_unique(path, fd); diff --git a/mooncake-store/tests/file_storage_promotion_test.cpp b/mooncake-store/tests/file_storage_promotion_test.cpp index a553d92dcb..9f873e6cfd 100644 --- a/mooncake-store/tests/file_storage_promotion_test.cpp +++ b/mooncake-store/tests/file_storage_promotion_test.cpp @@ -6,10 +6,13 @@ #include #include +#include #include +#include #include #include +#include "allocator.h" #include "client_service.h" #include "file_storage.h" #include "storage_backend.h" @@ -29,10 +32,16 @@ class FakeClient : public Client { /*protocol=*/"tcp", /*labels=*/{}) {} + static std::string& CurrentAllocKey() { + static thread_local std::string current_alloc_key; + return current_alloc_key; + } + // Drives the queue returned to the heartbeat caller. std::vector heartbeat_queue; tl::expected heartbeat_result = tl::expected{}; + size_t heartbeat_batch_limit = 1; tl::expected PromotionObjectHeartbeat( std::vector& promotion_objects) override { @@ -46,9 +55,8 @@ class FakeClient : public Client { // queued for subsequent calls. Tests iterate by calling // ProcessPromotionTasks multiple times until heartbeat_queue is // empty. - constexpr size_t kMaxPerHeartbeat = 1; promotion_objects.clear(); - while (promotion_objects.size() < kMaxPerHeartbeat && + while (promotion_objects.size() < heartbeat_batch_limit && !heartbeat_queue.empty()) { promotion_objects.push_back(std::move(heartbeat_queue.back())); heartbeat_queue.pop_back(); @@ -76,6 +84,7 @@ class FakeClient : public Client { (void)preferred_segments; alloc_calls.fetch_add(1); last_alloc_key = key; + CurrentAllocKey() = key; auto it = alloc_overrides.find(key); if (it != alloc_overrides.end()) { return tl::make_unexpected(it->second); @@ -89,11 +98,24 @@ class FakeClient : public Client { // PromotionWrite: per-key dispatch. std::unordered_map write_overrides; ErrorCode default_write_result = ErrorCode::OK; + std::atomic active_writes{0}; + std::atomic max_concurrent_writes{0}; + std::chrono::milliseconds write_delay{0}; ErrorCode PromotionWrite(const Replica::Descriptor&, std::vector&) override { write_calls.fetch_add(1); - auto it = write_overrides.find(last_alloc_key); + int concurrent = active_writes.fetch_add(1) + 1; + int observed = max_concurrent_writes.load(); + while (concurrent > observed && + !max_concurrent_writes.compare_exchange_weak(observed, + concurrent)) { + } + if (write_delay.count() > 0) { + std::this_thread::sleep_for(write_delay); + } + auto it = write_overrides.find(CurrentAllocKey()); + active_writes.fetch_sub(1); if (it != write_overrides.end()) { return it->second; } @@ -113,6 +135,7 @@ class FakeClient : public Client { const std::string& key, const std::string& tenant_id) override { (void)tenant_id; notify_calls.fetch_add(1); + std::lock_guard lock(record_mutex); notify_keys.push_back(key); auto it = notify_overrides.find(key); if (it != notify_overrides.end()) { @@ -136,6 +159,7 @@ class FakeClient : public Client { const std::string& key, const std::string& tenant_id) override { (void)tenant_id; notify_failure_calls.fetch_add(1); + std::lock_guard lock(record_mutex); notify_failure_keys.push_back(key); return {}; } @@ -145,6 +169,7 @@ class FakeClient : public Client { std::atomic write_calls{0}; std::atomic notify_calls{0}; std::atomic notify_failure_calls{0}; + std::mutex record_mutex; std::vector notify_keys; std::vector notify_failure_keys; std::string last_alloc_key; @@ -176,6 +201,8 @@ class FileStoragePromotionTest : public ::testing::Test { fake = std::make_shared(); file_storage = std::make_unique(cfg, fake, "localhost:9003"); + auto init_res = file_storage->storage_backend_->Init(); + ASSERT_TRUE(init_res.has_value()); } void TearDown() override { @@ -192,6 +219,70 @@ class FileStoragePromotionTest : public ::testing::Test { return file_storage->ProcessPromotionTasks(); } + void StartPromotionWorkers(size_t worker_count = 1) { + file_storage->promotion_workers_running_.store(true); + file_storage->promotion_worker_threads_.reserve(worker_count); + for (size_t i = 0; i < worker_count; ++i) { + file_storage->promotion_worker_threads_.emplace_back( + &FileStorage::PromotionWorkerThreadFunc, file_storage.get()); + } + } + + void SeedPromotionObject(const std::string& key, int64_t size) { + std::shared_ptr allocator = + std::make_shared(128 * 1024 * 1024); + std::unordered_map> batched_slices; + void* buffer = allocator->allocate(size); + ASSERT_NE(buffer, nullptr); + std::memset(buffer, 'x', static_cast(size)); + batched_slices.emplace( + MakeTenantScopedStorageKey("default", key), + std::vector{{buffer, static_cast(size)}}); + + auto offload_res = file_storage->storage_backend_->BatchOffload( + batched_slices, [](const std::vector& keys, + const std::vector&) { + return keys.empty() ? ErrorCode::INVALID_KEY : ErrorCode::OK; + }); + ASSERT_TRUE(offload_res.has_value()); + auto exists_res = file_storage->storage_backend_->IsExist( + MakeTenantScopedStorageKey("default", key)); + ASSERT_TRUE(exists_res.has_value()); + ASSERT_TRUE(exists_res.value()); + } + + void WaitForQueueToDrain(std::chrono::milliseconds timeout) { + auto deadline = std::chrono::steady_clock::now() + timeout; + while (std::chrono::steady_clock::now() < deadline) { + { + std::lock_guard lock( + file_storage->promotion_queue_mutex_); + if (file_storage->promotion_task_queue_.empty() && + fake->active_writes.load() == 0) { + return; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + + bool PromotionQueueEmpty() { + std::lock_guard lock(file_storage->promotion_queue_mutex_); + return file_storage->promotion_task_queue_.empty(); + } + + template + void WaitForCondition(Predicate predicate, + std::chrono::milliseconds timeout) { + auto deadline = std::chrono::steady_clock::now() + timeout; + while (std::chrono::steady_clock::now() < deadline) { + if (predicate()) { + return; + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + // Drain a multi-key queue across multiple ticks. ProcessPromotionTasks // caps work at 1 task/tick (heartbeat-safety) and the FakeClient's // PromotionObjectHeartbeat mirrors production by clearing the queue on @@ -235,6 +326,13 @@ TEST_F(FileStoragePromotionTest, EmptyQueueIsNoOp) { EXPECT_EQ(fake->notify_calls.load(), 0); } +TEST_F(FileStoragePromotionTest, EmptyQueueDoesNotEnqueueWorkerTask) { + fake->heartbeat_queue = {}; + auto res = CallProcessPromotionTasks(); + EXPECT_TRUE(res.has_value()); + EXPECT_TRUE(PromotionQueueEmpty()); +} + // Heartbeat returns SEGMENT_NOT_FOUND (transient post-restart): // swallow silently, no error to caller. TEST_F(FileStoragePromotionTest, HeartbeatSegmentNotFoundIsBenign) { @@ -253,20 +351,25 @@ TEST_F(FileStoragePromotionTest, HeartbeatHardErrorPropagates) { EXPECT_EQ(fake->alloc_calls.load(), 0); } -// Non-positive size in queue: skip that key, continue. -TEST_F(FileStoragePromotionTest, NonPositiveSizeSkipped) { +// Non-positive size in queue: release the master slot and continue. +TEST_F(FileStoragePromotionTest, NonPositiveSizeNotifiesMasterAndContinues) { + SeedPromotionObject("k_good", 1024); + fake->heartbeat_batch_limit = 2; fake->heartbeat_queue = { {.tenant_id = "default", .key = "k_bad", .size = 0}, {.tenant_id = "default", .key = "k_good", .size = 1024}}; auto res = CallProcessPromotionTasks(); EXPECT_TRUE(res.has_value()); - // Only k_good should reach AllocStart. EXPECT_EQ(fake->alloc_calls.load(), 1); EXPECT_EQ(fake->last_alloc_key, "k_good"); + EXPECT_EQ(fake->notify_calls.load(), 1); + EXPECT_EQ(fake->notify_failure_calls.load(), 1); + ASSERT_EQ(fake->notify_failure_keys.size(), 1u); + EXPECT_EQ(fake->notify_failure_keys[0], "k_bad"); } // PromotionAllocStart fails (e.g. master out of DRAM): skip key, no -// write, no notify, advance to next. +// write, no success notify, release the master slot, and advance to next. TEST_F(FileStoragePromotionTest, AllocStartFailureSkipsKey) { fake->alloc_overrides["k1"] = ErrorCode::NO_AVAILABLE_HANDLE; auto res = DrainAllPromotionTasks({{"k1", 1024}, {"k2", 1024}}); @@ -278,8 +381,8 @@ TEST_F(FileStoragePromotionTest, AllocStartFailureSkipsKey) { EXPECT_LE(fake->notify_calls.load(), 1); } -// BatchLoad failure (SSD file missing): no PromotionWrite, no Notify. -// Master-side reaper handles the orphaned PROCESSING replica. +// BatchLoad failure (SSD file missing): no PromotionWrite or success notify. +// FileStorage reports failure so the master can release the slot immediately. TEST_F(FileStoragePromotionTest, BatchLoadFailureLeavesNoNotify) { fake->heartbeat_queue = { {.tenant_id = "default", .key = "k_missing", .size = 1024}}; @@ -295,7 +398,7 @@ TEST_F(FileStoragePromotionTest, BatchLoadFailureLeavesNoNotify) { << "NotifyPromotionSuccess must not run if BatchLoad failed"; } -// PromotionWrite failure: no Notify. +// PromotionWrite failure: no success notify. TEST_F(FileStoragePromotionTest, TransferWriteFailureLeavesNoNotify) { fake->heartbeat_queue = { {.tenant_id = "default", .key = "k_te_fail", .size = 1024}}; @@ -370,6 +473,7 @@ TEST_F(FileStoragePromotionTest, PostAllocFailuresAllNotifyMaster) { // will fail because no SSD file exists for this key in data_path. // k_notify_fail: AllocStart and BatchLoad and TransferWrite all // succeed; Notify is overridden to fail. + SeedPromotionObject("k_notify_fail", 1024); fake->notify_overrides["k_notify_fail"] = ErrorCode::OBJECT_NOT_FOUND; auto res = DrainAllPromotionTasks({ @@ -379,6 +483,10 @@ TEST_F(FileStoragePromotionTest, PostAllocFailuresAllNotifyMaster) { }); EXPECT_TRUE(res.has_value()); EXPECT_EQ(fake->alloc_calls.load(), 3); + EXPECT_EQ(fake->notify_calls.load(), 1) + << "Exactly one key should reach NotifyPromotionSuccess: " + << "k_notify_fail must pass BatchLoad/TransferWrite before its " + << "overridden notify failure path releases the master slot."; // Every failed key must have its slot released via Notify-Failure. // k_notify_fail also counts: Notify-Success failed, so we still @@ -394,6 +502,92 @@ TEST_F(FileStoragePromotionTest, PostAllocFailuresAllNotifyMaster) { EXPECT_EQ(got.count("k_notify_fail"), 1u); } +TEST_F(FileStoragePromotionTest, QueueFullStopsAdditionalMasterPulls) { + file_storage->config_.promotion_queue_capacity = 1; + StartPromotionWorkers(); + SeedPromotionObject("k1", 1024); + SeedPromotionObject("k2", 1024); + fake->write_delay = std::chrono::milliseconds(150); + fake->heartbeat_batch_limit = 1; + fake->heartbeat_queue = { + {.tenant_id = "default", .key = "k2", .size = 1024}, + {.tenant_id = "default", .key = "k1", .size = 1024}}; + + auto res = CallProcessPromotionTasks(); + EXPECT_TRUE(res.has_value()); + WaitForCondition([this]() { return fake->alloc_calls.load() >= 1; }, + std::chrono::milliseconds(500)); + EXPECT_EQ(fake->heartbeat_calls.load(), 1); + EXPECT_EQ(fake->alloc_calls.load(), 1); + EXPECT_EQ(fake->notify_failure_calls.load(), 0); + EXPECT_EQ(fake->heartbeat_queue.size(), 1u); +} + +TEST_F(FileStoragePromotionTest, PulledBatchCanExceedSoftQueueCapacity) { + file_storage->config_.promotion_queue_capacity = 1; + file_storage->config_.promotion_drain_batch_size = 4; + StartPromotionWorkers(); + SeedPromotionObject("k1", 1024); + SeedPromotionObject("k2", 1024); + fake->heartbeat_batch_limit = 2; + fake->heartbeat_queue = { + {.tenant_id = "default", .key = "k2", .size = 1024}, + {.tenant_id = "default", .key = "k1", .size = 1024}}; + + auto res = CallProcessPromotionTasks(); + EXPECT_TRUE(res.has_value()); + WaitForCondition([this]() { return fake->alloc_calls.load() == 2; }, + std::chrono::milliseconds(500)); + EXPECT_EQ(fake->heartbeat_calls.load(), 1); + EXPECT_EQ(fake->alloc_calls.load(), 2); + EXPECT_EQ(fake->notify_failure_calls.load(), 0); +} + +TEST_F(FileStoragePromotionTest, WorkerDrainsQueuedTasks) { + file_storage->config_.promotion_queue_capacity = 8; + file_storage->config_.promotion_drain_batch_size = 4; + StartPromotionWorkers(); + SeedPromotionObject("k1", 1024); + SeedPromotionObject("k2", 1024); + SeedPromotionObject("k3", 1024); + SeedPromotionObject("k4", 1024); + fake->heartbeat_batch_limit = 4; + fake->heartbeat_queue = { + {.tenant_id = "default", .key = "k4", .size = 1024}, + {.tenant_id = "default", .key = "k3", .size = 1024}, + {.tenant_id = "default", .key = "k2", .size = 1024}, + {.tenant_id = "default", .key = "k1", .size = 1024}}; + + auto res = CallProcessPromotionTasks(); + EXPECT_TRUE(res.has_value()); + WaitForCondition([this]() { return fake->alloc_calls.load() == 4; }, + std::chrono::milliseconds(500)); + EXPECT_EQ(fake->alloc_calls.load(), 4); +} + +TEST_F(FileStoragePromotionTest, WorkerModePullsMultipleHeartbeatBatches) { + file_storage->config_.promotion_queue_capacity = 8; + file_storage->config_.promotion_drain_batch_size = 4; + StartPromotionWorkers(); + SeedPromotionObject("k1", 1024); + SeedPromotionObject("k2", 1024); + SeedPromotionObject("k3", 1024); + SeedPromotionObject("k4", 1024); + fake->heartbeat_batch_limit = 1; + fake->heartbeat_queue = { + {.tenant_id = "default", .key = "k4", .size = 1024}, + {.tenant_id = "default", .key = "k3", .size = 1024}, + {.tenant_id = "default", .key = "k2", .size = 1024}, + {.tenant_id = "default", .key = "k1", .size = 1024}}; + + auto res = CallProcessPromotionTasks(); + EXPECT_TRUE(res.has_value()); + WaitForCondition([this]() { return fake->alloc_calls.load() == 4; }, + std::chrono::milliseconds(500)); + EXPECT_EQ(fake->heartbeat_calls.load(), 4); + EXPECT_EQ(fake->alloc_calls.load(), 4); +} + } // namespace mooncake int main(int argc, char** argv) { diff --git a/mooncake-store/tests/file_storage_test.cpp b/mooncake-store/tests/file_storage_test.cpp index 24b2f67243..508efc9650 100644 --- a/mooncake-store/tests/file_storage_test.cpp +++ b/mooncake-store/tests/file_storage_test.cpp @@ -33,6 +33,9 @@ class FileStorageTest : public ::testing::Test { UnsetEnv("MOONCAKE_OFFLOAD_TOTAL_KEYS_LIMIT"); UnsetEnv("MOONCAKE_OFFLOAD_TOTAL_SIZE_LIMIT_BYTES"); UnsetEnv("MOONCAKE_OFFLOAD_HEARTBEAT_INTERVAL_SECONDS"); + UnsetEnv("MOONCAKE_OFFLOAD_PROMOTION_WORKER_THREADS"); + UnsetEnv("MOONCAKE_OFFLOAD_PROMOTION_QUEUE_CAPACITY"); + UnsetEnv("MOONCAKE_OFFLOAD_PROMOTION_DRAIN_BATCH_SIZE"); data_path = std::filesystem::current_path().string() + "/data"; fs::create_directories(data_path); for (const auto& entry : fs::directory_iterator(data_path)) { @@ -280,6 +283,9 @@ TEST_F(FileStorageTest, DefaultValuesWhenNoEnvSet) { EXPECT_EQ(config.total_keys_limit, 10'000'000); EXPECT_EQ(config.total_size_limit, 2ULL * 1024 * 1024 * 1024 * 1024); EXPECT_EQ(config.heartbeat_interval_seconds, 10u); + EXPECT_EQ(config.promotion_worker_threads, 1u); + EXPECT_EQ(config.promotion_queue_capacity, 1024u); + EXPECT_EQ(config.promotion_drain_batch_size, 64u); } TEST_F(FileStorageTest, ReadStringFromEnv) { @@ -304,15 +310,31 @@ TEST_F(FileStorageTest, ReadInt64FromEnv) { TEST_F(FileStorageTest, ReadUint32FromEnv) { SetEnv("MOONCAKE_OFFLOAD_HEARTBEAT_INTERVAL_SECONDS", "5"); + SetEnv("MOONCAKE_OFFLOAD_PROMOTION_WORKER_THREADS", "3"); + SetEnv("MOONCAKE_OFFLOAD_PROMOTION_QUEUE_CAPACITY", "2048"); + SetEnv("MOONCAKE_OFFLOAD_PROMOTION_DRAIN_BATCH_SIZE", "16"); auto config = FileStorageConfig::FromEnvironment(); EXPECT_EQ(config.heartbeat_interval_seconds, 5u); + EXPECT_EQ(config.promotion_worker_threads, 3u); + EXPECT_EQ(config.promotion_queue_capacity, 2048u); + EXPECT_EQ(config.promotion_drain_batch_size, 16u); +} + +TEST_F(FileStorageTest, ZeroPromotionWorkersDisablesAsyncMode) { + SetEnv("MOONCAKE_OFFLOAD_PROMOTION_WORKER_THREADS", "0"); + + auto config = FileStorageConfig::FromEnvironment(); + EXPECT_EQ(config.promotion_worker_threads, 0u); } TEST_F(FileStorageTest, InvalidIntValueUsesDefault) { SetEnv("MOONCAKE_OFFLOAD_BUCKET_KEYS_LIMIT", "abc"); SetEnv("MOONCAKE_OFFLOAD_TOTAL_SIZE_LIMIT_BYTES", "sdfsdf"); SetEnv("MOONCAKE_OFFLOAD_HEARTBEAT_INTERVAL_SECONDS", "-1"); + SetEnv("MOONCAKE_OFFLOAD_PROMOTION_WORKER_THREADS", "-5"); + SetEnv("MOONCAKE_OFFLOAD_PROMOTION_QUEUE_CAPACITY", "bad"); + SetEnv("MOONCAKE_OFFLOAD_PROMOTION_DRAIN_BATCH_SIZE", "-1"); auto config = FileStorageConfig::FromEnvironment(); auto bucket_backend_config = BucketBackendConfig::FromEnvironment(); @@ -320,6 +342,9 @@ TEST_F(FileStorageTest, InvalidIntValueUsesDefault) { EXPECT_EQ(bucket_backend_config.bucket_keys_limit, 500); EXPECT_EQ(config.total_size_limit, 2ULL * 1024 * 1024 * 1024 * 1024); EXPECT_EQ(config.heartbeat_interval_seconds, 10u); + EXPECT_EQ(config.promotion_worker_threads, 1u); + EXPECT_EQ(config.promotion_queue_capacity, 1024u); + EXPECT_EQ(config.promotion_drain_batch_size, 64u); } TEST_F(FileStorageTest, OutOfRangeValueUsesDefault) { @@ -542,4 +567,4 @@ TEST_F(FileStorageTest, NullSsdMetricDoesNotCrash) { // No crash = success. No metrics pointer, so nothing to verify. } -} // namespace mooncake \ No newline at end of file +} // namespace mooncake diff --git a/mooncake-wheel/tests/test_promotion_on_hit.py b/mooncake-wheel/tests/test_promotion_on_hit.py index 19efc17722..12c53ddf8b 100644 --- a/mooncake-wheel/tests/test_promotion_on_hit.py +++ b/mooncake-wheel/tests/test_promotion_on_hit.py @@ -1,7 +1,7 @@ """Python-binding test for the L2->L1 promotion-on-hit feature. Mirror of ``test_offload_on_eviction.py``: that test asserts data flows -DRAM -> SSD on eviction; this test asserts the reverse — once an object +DRAM -> SSD on eviction; this test asserts the reverse - once an object exists only on LOCAL_DISK and a client reads it enough times to clear the admission threshold, the master enqueues a promotion task and the client's next heartbeat tick stages a fresh MEMORY replica. @@ -45,7 +45,7 @@ ) # Keep the segment small so we can overflow it cheaply in CI. -# 32 MB — small enough that the 96 x 1 MB workload reliably overflows past +# 32 MB - small enough that the 96 x 1 MB workload reliably overflows past # the eviction high watermark even with OffsetAllocator block padding. # Larger values (e.g. 64 MB) sometimes fit the full workload because the # allocator's effective capacity exceeds the nominal segment size. @@ -73,7 +73,7 @@ def get_client(store): device_name, master_server_address, None, # engine - True, # enable_ssd_offload — required for FileStorage / LOCAL_DISK + True, # enable_ssd_offload - required for FileStorage / LOCAL_DISK ) if retcode: raise RuntimeError(f"Failed to setup store client. Return code: {retcode}") @@ -101,6 +101,53 @@ def _replica_types(descs, key): return tags +def _has_memory_replica(descs, key): + return "MEMORY" in _replica_types(descs, key) + + +def _collect_cold_keys(descs, keys): + cold_keys = [] + type_hist = {} + for key in keys: + types = _replica_types(descs, key) + hist_key = ",".join(sorted(set(types))) + type_hist[hist_key] = type_hist.get(hist_key, 0) + 1 + if ( + types + and all("MEMORY" not in t for t in types) + and any("LOCAL_DISK" in t for t in types) + ): + cold_keys.append(key) + return cold_keys, type_hist + + +def _percentiles(samples, ps): + if not samples: + return {p: 0.0 for p in ps} + ordered = sorted(samples) + result = {} + for p in ps: + if len(ordered) == 1: + result[p] = ordered[0] + continue + rank = (len(ordered) - 1) * (p / 100.0) + lo = int(rank) + hi = min(lo + 1, len(ordered) - 1) + frac = rank - lo + result[p] = ordered[lo] + (ordered[hi] - ordered[lo]) * frac + return result + + +def _wait_until_memory_replica(store, key, timeout_seconds, poll_interval_seconds): + deadline = time.time() + timeout_seconds + while time.time() < deadline: + descs = store.batch_get_replica_desc([key]) + if _has_memory_replica(descs, key): + return True + time.sleep(poll_interval_seconds) + return False + + class TestPromotionOnHit(unittest.TestCase): """Python-binding test for the L2->L1 promotion-on-hit behavioral contract.""" @@ -133,13 +180,13 @@ def test_promotion_after_repeated_hits(self): value = os.urandom(VALUE_SIZE) retcode = self.store.put(key, value) # NO_AVAILABLE_HANDLE-style failures (-200) under pressure - # are expected and not fatal — we just need *some* keys to + # are expected and not fatal - we just need *some* keys to # successfully land on LOCAL_DISK. if retcode == 0: reference[key] = value self.assertGreater( - len(reference), 0, "No PUTs succeeded — cannot run promotion test" + len(reference), 0, "No PUTs succeeded - cannot run promotion test" ) # Phase 2: wait long enough for offload heartbeat to flush the @@ -164,14 +211,14 @@ def test_promotion_after_repeated_hits(self): self.assertIsNotNone( cold_key, - "No LOCAL_DISK-only key found after eviction — is " + "No LOCAL_DISK-only key found after eviction - is " "offload_on_evict=true and the segment small enough to " "overflow? master config / SEGMENT_SIZE_BYTES env may need " "tuning. Histogram above shows the actual replica state.", ) # Phase 3: clear the admission threshold via per-key reads. - # ``store.get`` goes through ``Client::Query`` → master's + # ``store.get`` goes through ``Client::Query`` -> master's # ``GetReplicaList``, so each call fires the promotion gate # once; 4 calls comfortably clear any reasonable admission # threshold. We also assert bit-exact bytes back, exercising @@ -189,7 +236,7 @@ def test_promotion_after_repeated_hits(self): f"store.get on LOCAL_DISK-only key {cold_key} returned " f"wrong/empty bytes (got len={len(got) if got else 0}, " f"expected len={len(expected_bytes)}). The LOCAL_DISK " - f"read path is broken — promotion cannot be tested.", + f"read path is broken - promotion cannot be tested.", ) # Phase 4: wait for the master to enqueue the promotion task and @@ -214,7 +261,7 @@ def test_promotion_after_repeated_hits(self): # it does, no offload-RPC call is issued for this read. # # offload_rpc_read_count counts every invocation of - # batch_get_into_offload_object_internal — the single + # batch_get_into_offload_object_internal - the single # chokepoint for LOCAL_DISK reads served via peer offload-RPC. # We snapshot the counter, do a read, then assert the counter # didn't move. Bytes-back alone wouldn't distinguish MEMORY @@ -313,14 +360,14 @@ def test_below_watermark_workload_stays_memory_only(self): @unittest.skipUnless( os.getenv("MC_BENCH_PROMOTION_LATENCY"), - "opt-in benchmark — set MC_BENCH_PROMOTION_LATENCY=1 to run. " + "opt-in benchmark - set MC_BENCH_PROMOTION_LATENCY=1 to run. " "Not part of CI; gives p50/p95/p99 latency comparison of LOCAL_DISK " "reads (pre-promotion) vs MEMORY reads (post-promotion).", ) class BenchPromotionLatency(unittest.TestCase): """Ad-hoc latency benchmark for the L2->L1 promotion feature. - Runs the same overflow → eviction → promote workflow as + Runs the same overflow -> eviction -> promote workflow as ``TestPromotionOnHit`` but: - times every read with ``time.perf_counter``; - aborts the pre-promotion sample loop the moment a read serves @@ -400,7 +447,7 @@ def test_latency_comparison(self): self.assertGreater( len(pre_latencies_ms), 1, - "collected <2 pre-promotion samples — promotion fired " + "collected <2 pre-promotion samples - promotion fired " "before measurement could capture LOCAL_DISK latency. " "Bump MOONCAKE_OFFLOAD_HEARTBEAT_INTERVAL_SECONDS or " "lower LATENCY_SAMPLES.", @@ -476,7 +523,7 @@ def _speedup(a, b): f"post-promotion p50 latency ({post[50]:.2f} ms) is not " f"meaningfully lower than pre-promotion p50 " f"({pre[50]:.2f} ms). Real-hardware speedup is typically " - f"10–50x; failing this 1.3x check means MEMORY likely " + f"10-50x; failing this 1.3x check means MEMORY likely " f"isn't being preferred.", ) finally: @@ -488,5 +535,198 @@ def _speedup(a, b): time.sleep(default_kv_lease_ttl / 1000 + 0.5) +@unittest.skipUnless( + os.getenv("MC_BENCH_PROMOTION_DRAIN"), + "opt-in benchmark - set MC_BENCH_PROMOTION_DRAIN=1 to run. " + "Measures how quickly a batch of LOCAL_DISK-only keys regain MEMORY " + "replicas after promotion admission fires.", +) +class BenchPromotionDrain(unittest.TestCase): + """Promotion backlog-drain benchmark for fair main-vs-PR comparison. + + Assumes the master runs with ``--promotion_admission_threshold=2``: + the replica-descriptor scan used to discover LOCAL_DISK-only keys + provides the first hit, and the timed ``selected_keys`` query provides + the second hit that actually admits promotion. + """ + + @classmethod + def setUpClass(cls): + cls.store = MooncakeDistributedStore() + get_client(cls.store) + + def test_promotion_drain(self): + value_size = int( + os.getenv("PROMOTION_DRAIN_VALUE_SIZE_BYTES", str(1024 * 1024)) + ) + total_keys = int(os.getenv("PROMOTION_DRAIN_TOTAL_KEYS", "128")) + target_cold_keys = int(os.getenv("PROMOTION_DRAIN_TARGET_KEYS", "32")) + poll_interval_seconds = float( + os.getenv("PROMOTION_DRAIN_POLL_INTERVAL_SECONDS", "0.2") + ) + timeout_seconds = float( + os.getenv( + "PROMOTION_DRAIN_TIMEOUT_SECONDS", + str(max(PROMOTION_WAIT_SECONDS * 3, 60)), + ) + ) + expected_admission_threshold = int( + os.getenv("PROMOTION_DRAIN_EXPECT_ADMISSION_THRESHOLD", "2") + ) + post_read_samples = int(os.getenv("PROMOTION_DRAIN_POST_READ_SAMPLES", "64")) + fail_on_timeout = os.getenv("PROMOTION_DRAIN_FAIL_ON_TIMEOUT", "1") != "0" + + timestamp = int(time.time()) + keys = [f"bench_poh_drain_{i}_{timestamp}" for i in range(total_keys)] + reference = {} + + try: + for key in keys: + value = os.urandom(value_size) + if self.store.put(key, value) == 0: + reference[key] = value + + self.assertGreater( + len(reference), 0, "No PUTs succeeded - cannot run drain benchmark" + ) + self.assertEqual( + expected_admission_threshold, + 2, + "BenchPromotionDrain assumes promotion_admission_threshold=2: " + "the cold-key discovery scan contributes hit #1 and the timed " + "selected-keys query contributes hit #2. Use " + "PROMOTION_DRAIN_EXPECT_ADMISSION_THRESHOLD=2 and run the " + "master with --promotion_admission_threshold=2 for fair data.", + ) + + time.sleep(PROMOTION_WAIT_SECONDS) + + descs = self.store.batch_get_replica_desc(list(reference.keys())) + cold_keys, type_hist = _collect_cold_keys(descs, list(reference.keys())) + print(f"replica-type histogram after offload: {type_hist}") + self.assertGreater( + len(cold_keys), + 0, + "No LOCAL_DISK-only key found after eviction/offload", + ) + + selected_keys = cold_keys[: min(target_cold_keys, len(cold_keys))] + self.assertGreaterEqual( + len(selected_keys), + 1, + "Need at least one LOCAL_DISK-only key to measure promotion drain", + ) + + offload_reads_before = self.store.get_offload_rpc_read_count() + admission_t0 = time.perf_counter() + admission_descs = self.store.batch_get_replica_desc(selected_keys) + admission_t1 = time.perf_counter() + print( + "promotion admission batch_get_replica_desc: " + f"keys={len(selected_keys)}, " + f"elapsed_ms={(admission_t1 - admission_t0) * 1000.0:.2f}, " + f"assumed_threshold={expected_admission_threshold}" + ) + + for key in selected_keys: + self.assertIn( + "LOCAL_DISK", + _replica_types(admission_descs, key), + f"selected key {key} lost LOCAL_DISK state before benchmark started", + ) + + start = admission_t0 + promoted_at = {} + poll_count = 0 + while len(promoted_at) < len(selected_keys): + elapsed = time.perf_counter() - start + if elapsed > timeout_seconds: + break + poll_count += 1 + descs_now = self.store.batch_get_replica_desc(selected_keys) + now = time.perf_counter() + for key in selected_keys: + if key in promoted_at: + continue + if _has_memory_replica(descs_now, key): + promoted_at[key] = now - start + if len(promoted_at) == len(selected_keys): + break + time.sleep(poll_interval_seconds) + + offload_reads_after = self.store.get_offload_rpc_read_count() + completed_all = len(promoted_at) == len(selected_keys) + completion_times = sorted(promoted_at.values()) + observed_pcts = _percentiles(completion_times, [50, 95, 100]) + + post_latency_line = "skipped (not all selected keys promoted)" + if completed_all: + post_latencies_ms = [] + sample_key = selected_keys[0] + expected = reference[sample_key] + post_offload_before = self.store.get_offload_rpc_read_count() + for _ in range(post_read_samples): + t0 = time.perf_counter() + got = self.store.get(sample_key) + t1 = time.perf_counter() + self.assertEqual(got, expected) + post_latencies_ms.append((t1 - t0) * 1000.0) + post_offload_after = self.store.get_offload_rpc_read_count() + self.assertEqual( + post_offload_after, + post_offload_before, + "Post-promotion reads still hit offload RPC path; MEMORY replica is not serving reads", + ) + latency_pcts = _percentiles(post_latencies_ms, [50, 95, 99]) + post_latency_line = ( + f"p50={latency_pcts[50]:.2f}, " + f"p95={latency_pcts[95]:.2f}, p99={latency_pcts[99]:.2f}" + ) + + print() + print("=== promotion drain benchmark ===") + print( + f"selected_keys={len(selected_keys)} / cold_keys={len(cold_keys)} / " + f"total_successful_puts={len(reference)} / polls={poll_count} / " + f"promoted_before_timeout={len(promoted_at)}" + ) + if completed_all: + time_to_all = completion_times[-1] + promoted_per_sec = ( + len(selected_keys) / time_to_all + if time_to_all > 0 + else float("inf") + ) + print( + f"time_to_50pct_promoted={observed_pcts[50] * 1000.0:.2f} ms | " + f"time_to_95pct_promoted={observed_pcts[95] * 1000.0:.2f} ms | " + f"time_to_all_promoted={observed_pcts[100] * 1000.0:.2f} ms" + ) + print(f"promoted_keys_per_sec={promoted_per_sec:.2f}") + else: + print( + f"timed_out_before_all_promoted=yes | timeout_seconds={timeout_seconds:.2f} | " + f"observed_time_to_last_promoted={observed_pcts[100] * 1000.0:.2f} ms" + ) + print(f"post_promotion_read_latency_ms: {post_latency_line}") + print( + f"offload_rpc_reads_during_benchmark=" + f"{offload_reads_after - offload_reads_before}" + ) + if fail_on_timeout: + self.assertEqual( + len(promoted_at), + len(selected_keys), + "Timed out waiting for all selected LOCAL_DISK-only keys to regain MEMORY replicas", + ) + finally: + for key in keys: + try: + self.store.remove(key) + except Exception: + pass + time.sleep(default_kv_lease_ttl / 1000 + 0.5) + + if __name__ == "__main__": unittest.main()