diff --git a/docs/source/design/ssd-prefetch.md b/docs/source/design/ssd-prefetch.md new file mode 100644 index 0000000000..f77e0fc8b9 --- /dev/null +++ b/docs/source/design/ssd-prefetch.md @@ -0,0 +1,331 @@ +# SSD Prefetch-on-Exist Design Document + +> Related: RFC #2213 (prefetch-on-exist), PR #2071 (L2→L1 promotion-on-hit) +> Branch: Mooncake (main). Initially implemented on v0.3.11, now synced to main. + + +## 1. Background and Goals + +### 1.1 Problem Origin +In vLLM-Ascend's three-tier KV pooling scenario (HBM / DRAM / SSD), KV cache is evicted (offloaded) to SSD (`LOCAL_DISK` replicas) when DRAM capacity is insufficient. When a request hits a key that exists **only on SSD, not in DRAM**, `get()` must read from SSD, with latency significantly higher than a DRAM hit. + +vLLM request processing is a multi-stage pipeline. There is a time window between `exists()` (cache probing in the scheduling phase) and `get()` (worker actually loading KV) — measured median 15–17s, amplified by scheduling queueing: + +``` +Phase 1: Scheduler Phase 2: Current batch forward Phase 3: Worker loads KV + get_num_new_matched_tokens() GPU/NPU forward (time-consuming) start_load_kv() + └─ batch_is_exist() ◄─probe └─ get() ◄─actual read + │ │ + └────────────── within this window, silently warm SSD→DRAM ────────┘ +``` + +### 1.2 Goals +Leverage the time window from `exists()` to `get()` to **asynchronously, best-effort** prefetch (promote) SSD-only keys to DRAM during the probe phase, so subsequent `get()` hits DRAM and TTFT is reduced. + +### 1.3 Design Constraints +- Prefetch must be an **additional action** of `exists()`; it must not change the semantics of `exists()` or block scheduling. +- Must not pollute existing promotion-on-hit / eviction / metrics behavior. +- Failures are discardable (best-effort); must never affect normal offload / get. +- Must support **PD disaggregation / cross-node**: when node A initiates prefetch and finds a key's `LOCAL_DISK` replica on node B, B should read from its own SSD into its own DRAM ("Option B: cross-node delegation"), rather than moving data to A first. + +## 2. Current Implementation (Adopted) + +### 2.1 Overview: Dedicated Prefetch Path, Reusing Promotion "Execution Substrate", Bypassing Promotion-on-Hit "Admission and Pacing" + +``` +is_exist(keys, ExistOptions{prefetch_to_memory=true}) + └─ triggerSsdPrefetch(keys) → prefetch_pool_ (fixed size 4) + └─ per chunk (128 keys) BatchQueryForPrefetch(chunk) # batch read-only metadata, 1 RPC/chunk + └─ ClassifySsdPrefetchRoute: filter SSD-only (has LOCAL_DISK, no MEMORY) and size>0 + ├─ local node holds LOCAL_DISK: RunLocalPrefetchRegisterAndPromote(chunk_local) # immediate register + PrefetchKeys (pipelined, no wait for later chunks) + └─ remote node holds: collect into remote_keys, after all chunks complete prefetch_offload_object RPC delegates to holder + └─ holder side runLocalPrefetch → RunLocalPrefetchRegisterAndPromote # completed locally on holder +``` + +**Phase 1 batching + pipelining (2026-06)**: Early implementation called `QueryForPrefetch` per key and **waited for the entire batch of metadata queries** before unified `RegisterPrefetchTask` + `PrefetchKeys`. With large batches (e.g., 954 keys), the first register could be ~100ms later than the first `get()`, causing elevated `prefetch_miss_race`. Now changed to: +- **Batching**: `BatchQueryForPrefetch` / `BatchGetReplicaListForPrefetch`, up to 128 keys per chunk, RPC count reduced from O(N) to O(N/128). +- **Pipelining**: After each chunk's metadata query completes, **immediately** register + promote local keys in that chunk; no longer block subsequent chunk registers. +- **Defer reserve**: `PrefetchThrottle::reserve()` moved from the exist synchronous path to **after BatchQuery confirms SSD-only**, avoiding false trigger stats and useless async work for keys already in DRAM. + +`RunLocalPrefetchRegisterAndPromote` encapsulates the local promotion logic of "per-key RegisterPrefetchTask → markInFlight → PrefetchKeys → cooldown", shared by `triggerSsdPrefetch` chunks and `runLocalPrefetch` (RPC delegation entry). + +`PrefetchKeys()` internally reuses Mooncake's existing promotion execution chain: +`PromotionAllocStart` (master allocates PROCESSING MEMORY replica) → `FileStorage::AllocateBatch` (local staging) → `BatchLoad` (SSD read) → `PromotionWrite` (Transfer Engine writes to DRAM) → `NotifyPromotionSuccess` (marks MEMORY replica COMPLETE). + +### 2.2 Key Design Point: Why a "Dedicated Path" Instead of Directly Reusing Promotion-on-Hit + +Prefetch **only reuses the promotion execution substrate** (`promotion_tasks` / `PromotionAllocStart` / `PromotionWrite`), and **does not reuse promotion-on-hit triggering and scheduling**. Dedicated prefetch master RPCs were added: + +| RPC | Purpose | Key: What it does NOT do | +|---|---|---| +| `GetReplicaListForPrefetch(key)` | Read-only replica metadata to determine SSD-only | Does **not** grant lease, record sketch, increment `valid_get_nums` metrics, or enqueue promotion | +| `BatchGetReplicaListForPrefetch(keys)` | Same as above, batch version; client-side `BatchQueryForPrefetch` wrapper | Same; master side currently loops `GetReplicaListForPrefetch` (same pattern as `BatchGetReplicaList`), can be optimized to true batch later | +| `RegisterPrefetchTask(client_id, key)` | Register a `promotion_tasks` entry on master (for `PromotionAllocStart`) | Does **not** go through `TryPushPromotionQueue` admission gate, does **not** push holder's `promotion_objects` heartbeat queue | + +Compared with native `GetReplicaList` (main, master_service.cpp): the native path does `inc_*_cache_hit_nums` / `inc_valid_get_nums` / `GrantLease` / `TryPushPromotionQueue`, while `GetReplicaListForPrefetch` does none of these. + +### 2.3 Throttling: Preventing Prefetch Storms (cooldown + dedup) +vLLM scheduling busy-loop repeatedly probes the same batch of SSD-only blocks for the same request in the waiting queue, causing high-frequency prefetch triggers. Two client-side throttling parameters were introduced (`PrefetchThrottle`): + +| Parameter (mooncake.json, seconds) | Default | Meaning | +|---|---|---| +| `ssd_prefetch_dedup_ttl_sec` | 30 | Same key triggers prefetch at most once within this TTL, suppressing concurrent duplicate probes (**primary effective item**) | +| `ssd_prefetch_cooldown_sec` | 5 | Backoff window after DRAM saturation: skip prefetch within window, let eviction/offload reclaim memory first, avoid competing with promotion for memory | + +> In practice, `dedup_ttl` is the main factor eliminating prefetch storms and resolving offload starvation; `cooldown` was not triggered at this dataset scale (DRAM not full, offload kept up). + +### 2.4 Concurrency Boundary: Fixed Thread Pool +`triggerSsdPrefetch` / `runLocalPrefetch` use a fixed-size `prefetch_pool_` (`mooncake::ThreadPool`, size 4, aligned with `ClientService::task_thread_pool_(4)`), replacing the early pattern of detached `std::thread` per call, **limiting concurrent SSD reads + DRAM allocations**, avoiding massive detached threads contending for the same physical SSD's IOPS/bandwidth and flooding master with DRAM allocation requests. + +`submitPrefetchJob` uniformly enqueues to the pool; when the pool is unavailable (not initialized / shutting down), **directly discard the best-effort task, never fall back to detached `std::thread`** — the latter is exactly the B1 prefetch storm anti-pattern. Gating ensures `triggerSsdPrefetch` only fires when `file_storage_` exists, and `initPrefetchRuntime()` is established in the same `enable_ssd_offload` branch as `file_storage_`, so the pool-unavailable branch is unreachable in normal operation; discard serves only as an invariant fallback. + +#### Thread Usage Guidelines (this feature and all subsequent modifications must follow) +Per Mooncake native design, threads fall into two categories; **never create threads per load (per request / per key)**: + +| Category | Use Case | Correct Approach | Mooncake Native Precedent | +|---|---|---|---| +| A. Fixed-role long-lived daemon loops | eviction, heartbeat, monitor, ipc server, GC and other singleton background loops | One dedicated `std::thread`/`jthread` per role, stored as member, resident for process lifetime (thread count = O(roles), small constant, load-independent) | `master_service.cpp` eviction/monitor/cleanup/dispatch; `client_service.cpp` leader monitor/storage heartbeat/task poll; `file_storage.cpp` heartbeat/GC | +| B. Load-scaling high-frequency short tasks | prefetch promotion, segmented retries, etc., work whose count scales with request volume/key count | Always use fixed-size `ThreadPool::enqueue` to cap concurrency; **must not** spawn bare threads; when pool full/unavailable, discard or queue, do not bypass the cap | `client_service.cpp` `task_thread_pool_(4)` | + +> Rule of thumb: Will thread count grow with request volume/key count? Yes → must use ThreadPool (Category B); No, fixed-role resident loop → use dedicated `std::thread` member (Category A). Prefetch is Category B, hence `prefetch_pool_`. B1's root cause was applying Category A technique (detached thread per probe) to Category B work. + +### 2.5 get()-Side "Wait Once" Mechanism +If `get()` hits an SSD-only key (`LOCAL_DISK` is the best replica), poll within budget waiting for promotion to complete, then select DRAM replica: +- Config `ssd_get_wait_ms` (mooncake.json) or env var `MOONCAKE_SSD_GET_WAIT_MS` (default **10ms**; `0` = disabled). +- Observation and wait logic hooked in **`batch_get_into_multi_buffers_internal`** (vLLM-Ascend actual batch get path), combined output of `[GET-SRC]` + `[PREFETCH-OUTCOME]` logs. +- **TP0 (local, `prefetch_wait_mode=local`)**: When this process's `PrefetchThrottle` has a trigger record, poll throttle completion status + `TryRefreshBestMemoryReplica` then Query master. +- **TP1~7 (master, `prefetch_wait_mode=master`)**: When this process has no trigger (exist triggered on TP0, get on other ranks), still **poll Query master** for SSD-only keys until a COMPLETE MEMORY replica appears or budget exhausted; does not increase exist count. +- After wait, if MEMORY replica appears, use DRAM; otherwise fall back to original SSD read path. +- `ClassifyPrefetchOutcome` output (`[PREFETCH-OUTCOME]`, see analysis script `analyze_prefetch.sh` metric descriptions): + - `prefetch_hit`: promotion completed before get (`done_ms<=get_ms`) and source=DRAM + - `prefetch_promoted_untracked`: RegisterPrefetchTask was called (SSD-only at BatchQuery time), get time already DRAM but done not recorded in throttle + - `prefetch_miss_race`: **prefetch failed/timeout** — this rank triggered (`prefetch_trigger_ms>=0`) or `promote_attempted=true`, get still source=SSD + - `prefetch_evicted_after_exist`: **read SSD after eviction** — this rank has no trigger and `promote_attempted=false`, get still source=SSD (may have been in DRAM at exist time, evicted within window; or TP1~7 master wait timeout) + - `prefetch_dram_was_resident` (script label **exist false trigger**): get reads DRAM, but key was already in DRAM, this rank did **not** RegisterPrefetchTask; incorrectly entered prefetch wait path. **Should be ≈0 after defer reserve** + - `dram_resident`: get reads DRAM, and **never** participated in prefetch (no trigger, no wait, `prefetch_wait_mode=none`) + - `prefetch_failed`: `PrefetchKeys` promotion failed (throttle state `kFailed`) + - ~~`ssd_no_prefetch`~~: merged into `prefetch_evicted_after_exist` (may still appear in old logs) +- `[GET-SRC]` log also includes `prefetch_promote_attempted=0|1` (whether this rank went through RegisterPrefetchTask / markInFlight), for outcome classification and script cross-stats (e.g., eviction sub-items "no register / has register"). +- **SSD prefetch effective DRAM reads** (core analysis script metric) = `prefetch_hit + prefetch_promoted_untracked`, as proportion of total GET entries; `prefetch_dram_was_resident` / `dram_resident` not counted. +- After batching pipelining shortens "exist → first register" latency, `prefetch_hit` proportion within 10ms budget expected to rise. If `exist→get` interval still exceeds `default_kv_lease_ttl`, prefetched keys may still be evicted before get (see 5.3); in that case tune up master's `--default_kv_lease_ttl` rather than prefetch-specific parameters alone. + +### 2.6 Configuration Switches (vLLM-Ascend side mooncake.json) +| Field | Meaning | +|---|---| +| `enable_ssd_offload` | Must be true, otherwise no SSD-only keys, prefetch meaningless | +| `ssd_offload_path` | SSD storage directory (absolute path, must match master `--root_fs_dir`) | +| `enable_ssd_prefetch` | Enable prefetch-on-exist | +| `ssd_prefetch_cooldown_sec` / `ssd_prefetch_dedup_ttl_sec` | Optional; if unset use Mooncake defaults (5s/30s), vLLM-Ascend side sets no defaults | +| `ssd_get_wait_ms` | Max budget (ms) for get-side wait for prefetch completion; default 10, `0` disables | + +### 2.7 Lease Protection After Prefetch Promotion +When `NotifyPromotionSuccess` has `from_prefetch=true`, grant the promoted MEMORY replica the **same** lease as `exist`/`get`: `GrantLease(default_kv_lease_ttl_, default_kv_soft_pin_ttl_)` (grouped keys use `GrantLeaseForGroup`, logic consistent with `GetReplicaList`). + +| Parameter | Configuration | Default | Meaning | +|---|---|---|---| +| `default_kv_lease_ttl` | master startup arg `--default_kv_lease_ttl` (ms) | 5000 | Hard lease: not evicted for capacity within window | +| `default_kv_soft_pin_ttl` | master startup arg `--default_kv_soft_pin_ttl` (ms) | see master config | soft-pin renewal, consistent with exist/get | + +> **Deprecated**: Early implementation used env var `MOONCAKE_SSD_PREFETCH_PROTECT_SEC` (default 2s soft-pin, **lease=0**) for separate prefetch key protection; now unified to reuse master's `default_kv_lease_ttl` / `default_kv_soft_pin_ttl`, that env var is **no longer read**. + +### 2.8 Hard Lease Tuning and Put Capacity Risk + +**Background**: In early implementation, promoted MEMORY replicas at `NotifyPromotionSuccess` with only `GrantLease(0, soft_pin_ttl)` had hard lease immediately expired; keys within the `exist→get` window (measured median 15–17s) were easily evicted back to SSD by capacity, causing elevated `prefetch_evicted_after_exist`. + +**Change 1 (code, 2026-06) — Prefetch promotion lease aligned with exist/get** + +| Dimension | Before | After | +|---|---|---| +| Trigger point | `NotifyPromotionSuccess`, `from_prefetch=true` | Same | +| Lease | `GrantLease(0, protect_ms)` or equivalent soft-pin-only | `GrantLeaseForGroup` → **`GrantLease(default_kv_lease_ttl_, default_kv_soft_pin_ttl_)`** | +| vs exist/get | Inconsistent (hard lease=0) | **Consistent with `GetReplicaList` / `BatchExist` lease renewal behavior** | + +Implementation in `master_service.cpp` `NotifyPromotionSuccess` `from_prefetch` branch (§5.3). + +**Change 2 (deployment config) — Tune up master hard lease TTL as needed** + +| Parameter | Default | Recommended tuning scenario | Purpose | +|---|---|---|---| +| `--default_kv_lease_ttl` | 5000 ms | When `exist→get` median approaches or exceeds 5s, increase to **10000 ms** or higher | Cover longer scheduling window, reduce probability of prefetched key evicted before get | +| `--default_kv_soft_pin_ttl` | master default | Consistent with exist/get | soft-pin renewal, paired with Change 1 | + +> **Note**: `GetReplicaListForPrefetch` / `BatchGetReplicaListForPrefetch` **still do not grant lease** (§2.2); the above lease applies only to **prefetch promotion completion** (`NotifyPromotionSuccess`) and subsequent **exist/get path lease renewal**. Prefetch metadata queries themselves do not pin objects. + +**Put Failure Risk (capacity contention, expected side effect)** + +Hard lease + soft-pin extends MEMORY replica eviction exemption. Under **high DRAM watermark** (e.g., `memory_ratio>0.9`), **high-concurrency long context**, and **`offload_on_evict` queue defer** (Master log `[EVICT] No memory freed … deferred for disk offload`), Master may fail to free segment handles in the current round; Client side shows: + +``` +BatchPut failed for N keys due to insufficient space (NO_AVAILABLE_HANDLE) +``` + +| Dimension | Description | +|---|---| +| **Is it abnormal?** | **Capacity contention** under high pressure + large lease, not a functional bug | +| **Affects current request inference?** | **Generally no**: upper-layer `put()` typically only logs error, does not interrupt completed prefill; Put runs asynchronously in background thread | +| **Actual impact** | Failed blocks **not written to Mooncake external pool**, subsequent same-prefix requests may see lower external hit rate, requiring NPU recompute or SSD get | +| **Mitigation** | Increase `global_segment_size`, lower `eviction_high_watermark_ratio`, idle wait between benchmark rounds for offload drain, or trade off shortening `default_kv_lease_ttl` (increases post-eviction SSD read risk) | + +**Relationship with offload commit order (§8 B10)**: lease alignment resolves prefetched key **capacity eviction**; `BatchOffload` commit local index before `NotifyOffloadSuccess` resolves get `INVALID_KEY` from **Master registered LOCAL_DISK but Client index not ready**. The two are orthogonal; both should be merged in production. + +## 3. Why This Design (Decision Rationale) + +### 3.1 Reuse Promotion Execution Substrate +`PromotionAllocStart → PromotionWrite → NotifyPromotionSuccess` is Mooncake's mature SSD→DRAM data path with PROCESSING intermediate-state protection (eviction won't touch it while `is_completed()==false`). Duplicating transport/allocation/state machine is redundant and error-prone. + +### 3.2 Bypass Promotion-on-Hit Triggering and Pacing (Core) +Directly reusing promotion-on-hit queue has 5 problems, hence a dedicated path (detailed in Section 4). + +### 3.3 Cross-Node Delegation (Option B) +KV replicas may be on a remote node's SSD. Have the holder node "in-place" read its own SSD into its own DRAM, avoiding unnecessary cross-node data movement; native `get` already supports fetching from remote DRAM, prefetch only needs to warm data to holder's DRAM. + +## 4. Rejected Wrong Approaches and Reasons + +### Approach X (Wrong): Directly Reusing Promotion-on-Hit Queue to Trigger Prefetch +i.e., `exist → Query() enqueue → loop calling ProcessPromotionTasks()`. It works, but has 5 long-term problems, hence rejected: + +| # | Problem | Description | +|---|---|---| +| 1 | **Admission gate blocks prefetch** | promotion-on-hit has frequency threshold (Count-Min sketch), DRAM watermark, queue limit before enqueue. Prefetch typical scenario is "warm on first exist", but when `promotion_admission_threshold>1` first time never enqueues, prefetch effectively never happens | +| 2 | **`Query` side effects pollute system** | prefetch using `Query()` goes through `GetReplicaList`: grants lease (pins object, disrupts eviction/offload pacing), raises sketch heat, pollutes `valid_get_nums`/cache hit metrics. A "existence-only check" becomes a "pretend Get" | +| 3 | **Execution pacing designed for 'lazy promotion'** | promotion-on-hit emits at most 1 task per heartbeat, default 10s heartbeat (UT waits 25s). Prefetch needs sub-second to few-second completion, heartbeat too slow. Manually looping `ProcessPromotionTasks()` is "driving fast on the slow lane", conflicts with original design and easily races with heartbeat thread on same queue | +| 4 | **Dual path conflict with dedicated `PrefetchKeys`** | If `Query enqueue` and `PrefetchKeys direct call` run simultaneously, same key gets two promotions, triggering `REPLICA_IS_NOT_READY` | +| 5 | **Resource contention** | promotion queue limit / DRAM alloc / in-flight slot globally shared. Large batch prefetch squeezes out real Get-triggered hot-key promotion, or mutual blocking when DRAM tight | + +**How adopted approach avoids:** read-only `GetReplicaListForPrefetch` (fixes 1, 2) + immediately executed `PrefetchKeys` (fixes 3) + `RegisterPrefetchTask` does not push heartbeat queue (fixes 4) + independent RPC does not piggyback admission (fixes 5). + +### Other Rejected Minor Approaches +- **Detached `std::thread` per prefetch trigger**: high-frequency probing spawns massive threads contending for SSD IOPS/bandwidth and flooding DRAM allocation → changed to fixed thread pool. +- **Register only after entire batch metadata query**: large batches cause first register too late, races with get → changed to chunk batch Query + per-chunk pipelined register (see 2.1). + +## 5. Potential Optimization Points + +### 5.3 Lease Protection After Prefetch Promotion (Implemented) + +**Problem (historical)**: New MEMORY replicas from `PutEnd`/normal promote initially get `GrantLease(0, soft_pin_ttl)`, lease immediately expires, only renewed to `default_kv_lease_ttl` on first `get()`. Prefetch path faces same issue after `NotifyPromotionSuccess` completes: combined with `exist→get` interval up to 15s, many prefetched keys evicted back to SSD before use (early benchmarks ~82% of "prefetch triggered but still SSD" were this type). + +**Implementation (2026-06)**: When `from_prefetch=true`, call `GrantLeaseForGroup` in `NotifyPromotionSuccess` (equivalent to `GrantLease(default_kv_lease_ttl_, default_kv_soft_pin_ttl_)`), aligned with `GetReplicaList` / `BatchExist` lease renewal. Protection duration unified under master's `--default_kv_lease_ttl` / `--default_kv_soft_pin_ttl` (default lease 5s). + +**Tuning recommendation**: If `prefetch_evicted_after_exist` still high and `exist→get` median > `default_kv_lease_ttl`, prioritize tuning up master lease TTL (long context scenarios commonly use **10000 ms**, see §2.8); if interval extremely long (>15s), lease cannot cover full window, need DRAM capacity or scheduling-side optimization. When tuning up lease, note §2.8 **Put failure (NO_AVAILABLE_HANDLE)** risk. + +**Deprecated**: `MOONCAKE_SSD_PREFETCH_PROTECT_SEC` no longer used. + +1. **Weak sharing of in-flight metering between prefetch and native promotion-on-hit** + Both share `promotion_tasks` map, `promotion_in_flight_` count and `promotion_queue_limit_` cap. Shared map brings cross-path dedup (positive benefit), but shared cap means large batch prefetch may squeeze native promotion-on-hit slots (and vice versa). For isolation, prefetch could get independent in-flight count and cap. This sharing is v0.3.11 existing behavior, not introduced by main sync. + +2. **Prefetch path not aware of multi-tenant (tenant_id)** + main introduced `tenant_id` (multi-tenant), but `GetReplicaListForPrefetch` / `RegisterPrefetchTask` currently fixed to `"default"` tenant (`MakeObjectIdentity(key, "default")`). To support multi-tenant prefetch, prefetch RPCs need to pass through `tenant_id`. Currently fully correct for single-tenant/default scenarios. + +3. **Adaptive throttling parameters** + Ideal `ssd_prefetch_cooldown_sec` should tie to "actual time for DRAM to evict from high watermark to target watermark"; currently static default 5s. Could consider adaptive tuning by runtime eviction rate. + +4. **Prefetch hit rate observability** + Benchmark validation uses P-side logs + `analyze_prefetch.sh` (in-repo analysis script) to stat `[GET-SRC]` / `[PREFETCH-OUTCOME]`. Main metrics: + - GET read source: DRAM / SSD count and proportion + - **SSD prefetch effective DRAM reads** = `prefetch_hit + promoted_untracked` + - Sub-items: `prefetch_hit`, `promoted_untracked`, prefetch failed/timeout (`prefetch_miss_race`), read SSD after eviction (`prefetch_evicted_after_exist`, with no register / has register sub-items), exist false trigger (`prefetch_dram_was_resident`), `dram_resident` + - Optional: `SHOW_DETAIL=1` outputs `prefetch_wait_mode` (local / master / none) distribution + +5. **Metadata query and register latency (Phase 1 mitigated, Phase 2 pending)** + Phase 1: `BatchGetReplicaListForPrefetch` reduces client↔master RPC round trips; pipelined register shortens exist→first task registration time. Phase 2: master-side `BatchGetReplicaListForPrefetch` still loops per-key `GetReplicaListForPrefetch`, can change to single batch metadata query for further latency reduction. + +6. **get()-side wait vs lease eviction tradeoff** + `ssd_get_wait_ms` default 10ms; for TP1~7 and other non-exist-trigger ranks, get side waits via **master Query polling** (log `prefetch_wait_mode=master`) for promotion completion. After prefetch completes key has `default_kv_lease_ttl` protection (5.3); if `exist→get` still exceeds lease window, may still be evicted, need tune up master lease or optimize scheduling interval. + +## 6. Main Code Locations (Mooncake) + +| File | Change Highlights | +|---|---| +| `mooncake-store/include/types.h` | Added `CONFIG_KEY_SSD_PREFETCH_*` / `CONFIG_KEY_SSD_GET_WAIT_MS` and defaults | +| `mooncake-store/include/real_client.h` / `src/real_client.cpp` | `PrefetchThrottle` (includes `promote_attempted`, `defer reserve` after BatchQuery), `prefetch_pool_`, `triggerSsdPrefetch` (chunk batch + pipeline), `runLocalPrefetch`, `RunLocalPrefetchRegisterAndPromote`, `ClassifySsdPrefetchRoute`, `TryRefreshBestMemoryReplica`, `ClassifyPrefetchOutcome`; get-wait (local / master dual path) and `[GET-SRC]`/`[PREFETCH-OUTCOME]` logs (includes `prefetch_promote_attempted`) in `batch_get_into_multi_buffers_internal` | +| `mooncake-store/include/master_service.h` / `src/master_service.cpp` | `GetReplicaListForPrefetch` (read-only), `RegisterPrefetchTask` (no heartbeat queue), `NotifyPromotionSuccess` (`GrantLeaseForGroup` when `from_prefetch` aligns exist/get lease), adapted to main tenant scope | +| `mooncake-store/include/master_client.h` / `src/master_client.cpp` | prefetch RPC client; includes `BatchGetReplicaListForPrefetch` | +| `mooncake-store/include/rpc_service.h` / `src/rpc_service.cpp` | `WrappedMasterService` wrapper and RPC handler registration (includes batch prefetch) | +| `mooncake-store/include/client_service.h` / `src/client_service.cpp` | `QueryForPrefetch` / `BatchQueryForPrefetch` / `RegisterPrefetchTask` | +| `mooncake-store/include/file_storage.h` / `src/file_storage.cpp` | `PrefetchKeys()`: wraps full promotion execution chain; `AllocateBatch`/`BatchLoad` use `MakeTenantScopedStorageKey` as staging map key (fixes B8 INVALID_KEY) | +| `mooncake-store/include/storage_backend.h` / `src/storage_backend.cpp` | `BucketStorageBackend::BatchOffload` commits `object_bucket_map_` before `NotifyOffloadSuccess`; on notify failure `RollbackCommittedBucket` + `CleanupOrphanedBucket` (see §8 B10, `ssd-offload.md` write path) | +| `mooncake-store/include/replica.h` / `store_c.*` / `pyclient.h` / `dummy_client.*` | Signature/descriptor/C API/Python binding adaptation (`ExistOptions.prefetch_to_memory`, `prefetch` parameter) | +| `mooncake-integration/store/store_py.cpp` | pybind: `setup` adds `ssd_prefetch_*` params, `is_exist`/`batch_is_exist` prefetch options | + +## 7. Notes for Syncing from v0.3.11 to main (Handled) +- main introduced `tenant_id`; all `setup_real` signatures append `ssd_prefetch_cooldown_sec` / `ssd_prefetch_dedup_ttl_sec` **after `tenant_id`** (with defaults, no impact on existing main call sites). +- master-side prefetch implementation rewritten per main tenant-scoped metadata model: `MetadataAccessor(this, key)` → `MetadataAccessor(this, MakeObjectIdentity(key, "default"))`; `accessor.GetShard()->promotion_tasks` → `accessor.GetTenantState().promotion_tasks`. +- `get_config_size` on main returns `std::optional`; throttling config reads use `.value_or(default)`. +- **Section 4's 5 problems' 'dedicated path' mitigations fully preserved on main** (read-only RPC no lease/sketch/enqueue, `RegisterPrefetchTask` no heartbeat queue, `PrefetchKeys` immediate execution). +- TODO: actual compile verification on Linux/Ascend environment for `Mooncake` (Windows cannot compile). + +## 8. Historical Bug and Fix Records (Troubleshooting Log) + +> This section records real pitfalls encountered during development in "observed symptom → root cause → fix/conclusion" format, for context alignment when switching sessions. Section 4 focuses on "approach selection rationale"; this section focuses on "actual failures and diagnosis". + +### B1. Prefetch Storm → Offload Starvation (Most Severe, Drove Section 2.3/2.4 Throttling and Thread Pool) +- **Symptom**: Benchmark logs flooded with `REPLICA_IS_NOT_READY`, `INVALID_KEY`, `NO_AVAILABLE_HANDLE`, accompanied by `KV load failure`; `mooncake_master` continuously spamming DRAM eviction logs, `memory_ratio` stuck at ~0.81 repeatedly spinning eviction, offload unable to write to SSD. +- **Root cause**: vLLM scheduling busy-loop repeatedly probes `exist` on the **same batch of SSD-only blocks for the same request** in waiting queue; early implementation used detached `std::thread` per probe to initiate prefetch → massive concurrent prefetch seizing **single IO thread / SSD IOPS / DRAM allocation quota**, starving normal offload (DRAM→SSD); DRAM always at high watermark → master repeatedly tries eviction but cannot free space. +- **Fix**: ① `ssd_prefetch_dedup_ttl_sec` (default 30s) prefetch same key at most once per window — **primary effective item**, directly eliminates storm; ② `ssd_prefetch_cooldown_sec` (default 5s) DRAM saturation backoff; ③ detached threads replaced with fixed-size `prefetch_pool_`. +- **Conclusion**: At this dataset scale cooldown actually not triggered (dedup sufficient, DRAM not full). + +### B2. Dual Path Double Promotion → `REPLICA_IS_NOT_READY` +- **Symptom**: Early coexistence of "`Query()` into promotion-on-hit queue" and "`PrefetchKeys` direct call" paths, each doing one promotion on same key, reporting `REPLICA_IS_NOT_READY`. +- **Root cause/fix**: See Section 4 Approach X problem #4. Finally only dedicated path retained: `RegisterPrefetchTask` **does not push holder heartbeat `promotion_objects` queue**, avoiding duplicate promotion with direct call. + +### B3. `setup_real` Abstract Class Compile Error +- **Symptom**: Compile error `invalid new-expression of abstract class RealClient` (cannot new abstract class). +- **Root cause**: When appending `ssd_prefetch_cooldown_sec`/`ssd_prefetch_dedup_ttl_sec` params to `setup`, only partial declarations changed, causing `setup` pure virtual signatures inconsistent between `real_client.h`, `pyclient.h`, `dummy_client.h` → `RealClient` did not fully override base pure virtuals, still abstract. +- **Fix**: Keep all three (base declaration / `RealClient` / `DummyClient` / pybind) `setup` signatures **fully consistent**, new params uniformly appended after `tenant_id` with defaults. Any future new param passthrough must sync full chain signatures. + +### B4. `QueryResult` const Member → Deleted Move Assignment (get-wait Re-check Path) +- **Symptom**: Implementing get()-side "wait once then re-check DRAM replica" compile failed, `tl::expected` move assignment deleted (because `QueryResult` has const members, cannot reassign to existing variable). +- **Fix**: Use `std::optional` to hold re-check result (`.emplace()` not assignment), bypassing non-movable assignment limit. Used in Section 2.5 get-wait **master polling re-check** path (`prefetch_wait_mode=master`). + +### B5. get Source/Wait Log Instrumentation in Wrong Function +- **Symptom**: "GET-SRC / wait" logs added in `get_buffer_internal` never printed in some integration cases. +- **Root cause**: vLLM-Ascend actually calls `batch_get_into_multi_buffers_internal` (batch multi-buffer path), not `get_buffer_internal`. +- **Conclusion/fix**: Logs and get-wait logic must hook in `batch_get_into_multi_buffers_internal`. This is the correct observation point for "get uses DRAM or SSD" and "whether in-flight prefetch was hit" — also the data source function for conclusions like "exist→get ~15s, SSD→DRAM ~2ms, ~82% evicted after prefetch". + +### B6. `LEASE_EXPIRED` (Known Item, Not Introduced by This Feature) +- **Symptom**: Large batch request logs show `LEASE_EXPIRED` (e.g., several keys on multiple TP ranks batch transfer timeout). +- **Root cause**: Client lease default ~5s; when one query→transfer batch is very large and total time exceeds lease validity, object lease expires before transfer completes. More likely under PD disaggregation when P pulls DRAM remotely from D. Client lease timing issue, no direct causality with prefetch dedicated path (dedicated read-only RPC **does not grant lease**). +- **Disposition**: Recorded as known item; related but distinct from Section 5.3 master-side capacity eviction (lease protection) — former is client fetch lease timeout, latter is master-side DRAM capacity eviction. + +### B7. Serial Metadata Query + exist Synchronous reserve → Elevated `prefetch_miss_race` / `prefetch_dram_was_resident` +- **Symptom (before batching / defer reserve)**: First `registered task` ~**143ms** later than first `[GET-SRC]`; `prefetch_hit` ~40%; `prefetch_miss_race` ~58% (includes much inflated); after correction ~**25%** of GETs were `prefetch_dram_was_resident` (exist false trigger). +- **Root cause 1 (original B7)**: Early `triggerSsdPrefetch` called `QueryForPrefetch` per key and **waited for all keys queried** before unified register; large batch metadata phase long, within get-side 10ms budget master had not registered task. +- **Root cause 2 (defer reserve)**: `PrefetchThrottle::reserve()` used to run on exist synchronous path, recording trigger for **keys already in DRAM**; get reads DRAM but incorrectly counted in prefetch stats (`prefetch_dram_was_resident`). exist only knows "exists", not replica tier; must wait for BatchQuery filter. +- **Fix (Phase 1)**: `BatchQueryForPrefetch` (128 key/chunk) + immediately `RunLocalPrefetchRegisterAndPromote` after each chunk (pipelined register); **reserve moved to after BatchQuery confirms SSD-only**. +- **Verification (after batching + defer reserve + outcome split)**: `prefetch_dram_was_resident`≈**0**; `prefetch_hit` **86.4%**; `prefetch_miss_race` **0.7%**; `prefetch_evicted_after_exist` **10.0%**; exist→first register ~**83ms** (shortened from 143ms pre-fix). + +### B8. `PrefetchKeys` Staging Key Mismatch → `INVALID_KEY` +- **Symptom**: Prefetch path occasionally `INVALID_KEY` / staging slice missing. +- **Root cause**: In `PrefetchKeys`, `AllocateBatch`/`BatchLoad` map keys must be tenant-scoped **storage key** (`MakeTenantScopedStorageKey`); when inconsistent with logical object key, slice not found. +- **Fix**: `PrefetchKeys` uses `storage_key` for staging, logical key still used for master promotion RPC. + +### B9. `analyze_prefetch.sh` Undercounting `prefetch_hit` (grep Order) +- **Symptom**: Analysis script once showed `prefetch_hit_master/local=0`, inconsistent with manual log inspection. +- **Root cause**: Script used one-direction grep (requiring `prefetch_wait_mode=master` before `outcome=prefetch_hit`), but actual log field order is opposite. +- **Fix**: Changed to bidirectional grep (`| outcome=prefetch_hit.*prefetch_wait_mode=master`). + +### B10. `BatchOffload` NotifyOffloadSuccess Before commit Local Index → get Path `INVALID_KEY` +- **Symptom**: Under high-concurrency offload + get/prefetch concurrency, massive `Failed to get key` (`res: -400` = `INVALID_KEY`); `BatchLoad` / `SSD read failed` coexisting with prefetch `BatchLoad failed`; Master `SSD Storage` far from full, `EvictDiskReplica=0`; some request-level load failures. +- **Root cause**: Mooncake `BucketStorageBackend::BatchOffload` after **successful disk write** first calls `complete_handler` (internally `NotifyOffloadSuccess`, Master registers `LOCAL_DISK` and can evict MEMORY), **then** commits Client-side `object_bucket_map_`. + Concurrent get/prefetch within this window takes SSD read path, but local index lacks that storage key → `Key not found` → `INVALID_KEY`. Unrelated to SSD capacity; **Master visibility ahead of Client local index** consistency issue (`publish-before-commit`). +- **Wrong order (pre-fix)**: + ``` + WriteBucket → NotifyOffloadSuccess → [race window] → commit object_bucket_map_ + ``` +- **Fix (2026-06, `storage_backend.cpp`)**: + ``` + WriteBucket → commit object_bucket_map_ → NotifyOffloadSuccess + ``` + If `NotifyOffloadSuccess` fails: `RollbackCommittedBucket` rolls back local index + `CleanupOrphanedBucket` deletes orphan files, avoiding "Client can read, Master doesn't know" or reverse ghost replica. +- **Distinction from B8**: B8 is PrefetchKeys **staging key name** error; B10 is offload **commit order** causing get/offload read path index missing. B8 fix does not eliminate B10. +- **Relationship with §2.8 lease change**: lease alignment resolves prefetched key **eviction**; B10 resolves **index not ready** causing `INVALID_KEY`; orthogonal, both need to be merged. + +### Key Measured Data (Numbers Driving Above Decisions, for Alignment) +- `exist→get` interval median **15–17s** (amplified by scheduling queueing). +- SSD→DRAM single prefetch **~2ms (p90 2.65ms)**. +- ~**82%** of "prefetch triggered but still SSD" (before lease alignment) because promoted MEMORY replica had lease=0, evicted back to SSD by capacity before `get()`; after §5.3 implementation prefetch path aligned lease with exist/get. +- Throttling: `dedup_ttl` main factor eliminating storm, `cooldown` not triggered; prefetch bandwidth not saturated. +- Before batching / defer reserve: metadata phase caused first register **~143ms** later than first get; `prefetch_hit` ~40% / effective DRAM reads ~64%; exist false trigger ~25%. +- After batching + defer reserve + outcome split: `prefetch_hit` **86.4%**; exist false trigger **0**; `prefetch_miss_race` **0.7%**; `prefetch_evicted_after_exist` **10.0%** (main cause TP1~7 master wait timeout / insufficient lease window, see §5.3, §2.8). +- After B10 fix: get path `INVALID_KEY` zeroed; under high pressure tail may still see `BatchPut insufficient space` (§2.8 Put risk), generally does not block single inference success. +- `ssd_get_wait_ms` default **10ms** (get-side wait enabled by default); `0` disables. diff --git a/mooncake-integration/store/store_py.cpp b/mooncake-integration/store/store_py.cpp index 4a55a2e11e..8dd0d88923 100644 --- a/mooncake-integration/store/store_py.cpp +++ b/mooncake-integration/store/store_py.cpp @@ -1759,6 +1759,15 @@ PYBIND11_MODULE(store, m) { return oss.str(); }); + py::class_(m, "ExistOptions") + .def(py::init<>()) + .def_readwrite("prefetch_to_memory", &ExistOptions::prefetch_to_memory) + .def("__str__", [](const ExistOptions &options) { + std::ostringstream oss; + oss << options; + return oss.str(); + }); + py::enum_(m, "ReplicaStatus") .value("UNDEFINED", ReplicaStatus::UNDEFINED) .value("INITIALIZED", ReplicaStatus::INITIALIZED) @@ -2028,7 +2037,11 @@ PYBIND11_MODULE(store, m) { const py::object &engine = py::none(), bool enable_ssd_offload = false, const std::string &ssd_offload_path = "", - const std::string &tenant_id = "default") { + const std::string &tenant_id = "default", + int64_t ssd_prefetch_cooldown_sec = + DEFAULT_SSD_PREFETCH_COOLDOWN_SEC, + int64_t ssd_prefetch_dedup_ttl_sec = + DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC) { auto real_client = self.init_real_client(); std::shared_ptr transfer_engine = nullptr; @@ -2040,14 +2053,19 @@ PYBIND11_MODULE(store, m) { local_hostname, metadata_server, global_segment_size, local_buffer_size, protocol, rdma_devices, master_server_addr, transfer_engine, "", enable_ssd_offload, - ssd_offload_path, tenant_id); + ssd_offload_path, tenant_id, ssd_prefetch_cooldown_sec, + ssd_prefetch_dedup_ttl_sec); }, py::arg("local_hostname"), py::arg("metadata_server"), py::arg("global_segment_size"), py::arg("local_buffer_size"), py::arg("protocol"), py::arg("rdma_devices"), py::arg("master_server_addr"), py::arg("engine") = py::none(), py::arg("enable_ssd_offload") = false, - py::arg("ssd_offload_path") = "", py::arg("tenant_id") = "default") + py::arg("ssd_offload_path") = "", py::arg("tenant_id") = "default", + py::arg("ssd_prefetch_cooldown_sec") = + DEFAULT_SSD_PREFETCH_COOLDOWN_SEC, + py::arg("ssd_prefetch_dedup_ttl_sec") = + DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC) .def( "setup", [](MooncakeStorePyWrapper &self, const py::dict &config_dict) { @@ -2078,7 +2096,14 @@ PYBIND11_MODULE(store, m) { " ipc_socket_path: IPC socket path.\n" " enable_ssd_offload: Enable SSD offload (default false).\n" " ssd_offload_path: SSD storage directory path (overrides env " - "var).") + "var).\n" + " ssd_prefetch_cooldown_sec: SSD prefetch memory-pressure backoff " + "in seconds (default 5, 0 disables).\n" + " ssd_prefetch_dedup_ttl_sec: SSD prefetch per-key dedup/" + "rate-limit TTL in seconds (default 30, 0 disables).\n" + " ssd_get_wait_ms: get-side prefetch wait budget in milliseconds " + "(default 10, poll 1ms with early exit; 0 disables). Env " + "MOONCAKE_SSD_GET_WAIT_MS overrides this.") .def( "setup_dummy", [](MooncakeStorePyWrapper &self, size_t mem_pool_size, @@ -2184,20 +2209,26 @@ PYBIND11_MODULE(store, m) { py::arg("keys"), py::arg("force") = false, "Batch remove objects by keys. Returns a list of status codes " "(0=success, negative=error code) for each key.") - .def("is_exist", - [](MooncakeStorePyWrapper &self, const std::string &key) { - py::gil_scoped_release release; - return self.store_->isExist(key); - }) + .def( + "is_exist", + [](MooncakeStorePyWrapper &self, const std::string &key, + const ExistOptions &options) { + py::gil_scoped_release release; + return self.store_->isExist(key, options); + }, + py::arg("key"), py::arg("options") = ExistOptions{}) .def( "batch_is_exist", [](MooncakeStorePyWrapper &self, - const std::vector &keys) { + const std::vector &keys, + const ExistOptions &options) { py::gil_scoped_release release; - return self.store_->batchIsExist(keys); + return self.store_->batchIsExist(keys, options); }, - py::arg("keys"), - "Check if multiple objects exist. Returns list of results: 1 if " + py::arg("keys"), py::arg("options") = ExistOptions{}, + "Check if multiple objects exist. When options.prefetch_to_memory " + "is True, triggers SSD-to-DRAM promotion for keys that only have " + "LOCAL_DISK replicas. Returns list of results: 1 if " "exists, 0 if not exists, -1 if error") .def("close", [](MooncakeStorePyWrapper &self) { diff --git a/mooncake-store/include/client_service.h b/mooncake-store/include/client_service.h index 0094f4f14d..1702137fc5 100644 --- a/mooncake-store/include/client_service.h +++ b/mooncake-store/include/client_service.h @@ -127,6 +127,23 @@ class Client { */ tl::expected Query(const std::string& object_key); + /** + * @brief Read-only replica metadata for SSD prefetch (no lease / no + * promotion-on-hit side effects). + */ + tl::expected QueryForPrefetch( + const std::string& object_key); + + std::vector> BatchQueryForPrefetch( + const std::vector& object_keys); + + /** + * @brief Register a master-side promotion task for SSD prefetch without + * using the promotion-on-hit heartbeat queue. + */ + tl::expected RegisterPrefetchTask( + const std::string& object_key); + /** * @brief Queries replica lists for object keys that match a regex pattern. * @param str The regular expression string to match against object keys. diff --git a/mooncake-store/include/dummy_client.h b/mooncake-store/include/dummy_client.h index d538708828..f48b40b22c 100644 --- a/mooncake-store/include/dummy_client.h +++ b/mooncake-store/include/dummy_client.h @@ -21,16 +21,18 @@ class DummyClient : public PyClient { int64_t unregister_shm(); - int setup_real(const std::string &local_hostname, - const std::string &metadata_server, - size_t global_segment_size, size_t local_buffer_size, - const std::string &protocol, const std::string &rdma_devices, - const std::string &master_server_addr, - const std::shared_ptr &transfer_engine, - const std::string &ipc_socket_path, - bool enable_ssd_offload = false, - const std::string &ssd_offload_path = "", - const std::string &tenant_id = "default") { + int setup_real( + const std::string &local_hostname, const std::string &metadata_server, + size_t global_segment_size, size_t local_buffer_size, + const std::string &protocol, const std::string &rdma_devices, + const std::string &master_server_addr, + const std::shared_ptr &transfer_engine, + const std::string &ipc_socket_path, bool enable_ssd_offload = false, + const std::string &ssd_offload_path = "", + const std::string &tenant_id = "default", + int64_t ssd_prefetch_cooldown_sec = DEFAULT_SSD_PREFETCH_COOLDOWN_SEC, + int64_t ssd_prefetch_dedup_ttl_sec = + DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC) { // Dummy client does not support real setup return -1; }; @@ -147,9 +149,11 @@ class DummyClient : public PyClient { std::vector batchRemove(const std::vector &keys, bool force = false); - int isExist(const std::string &key); + int isExist(const std::string &key, + const ExistOptions &options = ExistOptions{}); - std::vector batchIsExist(const std::vector &keys); + std::vector batchIsExist(const std::vector &keys, + const ExistOptions &options = ExistOptions{}); int64_t getSize(const std::string &key); diff --git a/mooncake-store/include/file_storage.h b/mooncake-store/include/file_storage.h index 6aa2ac3d56..e193a8ce12 100644 --- a/mooncake-store/include/file_storage.h +++ b/mooncake-store/include/file_storage.h @@ -1,10 +1,14 @@ #pragma once +#include + #include "client_service.h" #include "client_buffer.hpp" #include "storage_backend.h" #include "pinned_buffer_pool.h" +#include + namespace mooncake { struct SsdMetric; @@ -41,6 +45,30 @@ class FileStorage { FileStorageConfig config_; + /** + * @brief Promote a set of keys from local SSD to DRAM. + * + * For each key the method allocates a staging buffer, reads the data from + * the local SSD backend, transfers it to a freshly-allocated MEMORY + * replica via Transfer Engine, and commits the result on the master. + * Failures are logged per-key but do not propagate; this is best-effort. + * + * @param keys Keys to promote (must already have LOCAL_DISK replicas) + * @param sizes Corresponding object sizes in bytes + * @param dram_pressure Optional out-flag. Set to true if any key failed to + * promote because DRAM was saturated (NO_AVAILABLE_HANDLE), so the + * caller can back off prefetch and let eviction/offload reclaim + * memory. Never reset to false; caller initializes it. + * @return void on success, ErrorCode on critical (pre-loop) failure + */ + using PrefetchKeyCallback = + std::function; + + tl::expected PrefetchKeys( + const std::vector& keys, const std::vector& sizes, + bool* dram_pressure = nullptr, + PrefetchKeyCallback on_key_done = nullptr); + /** * @brief Releases buffer associated with a specific batch_id. * Called by remote client after transfer completion. diff --git a/mooncake-store/include/master_client.h b/mooncake-store/include/master_client.h index cad4159225..0046356780 100644 --- a/mooncake-store/include/master_client.h +++ b/mooncake-store/include/master_client.h @@ -151,6 +151,23 @@ class MasterClient { [[nodiscard]] tl::expected GetReplicaList(const std::string& object_key, const std::string& tenant_id); + /** + * @brief Read-only replica metadata query for best-effort SSD prefetch. + * + * Unlike GetReplicaList, this does NOT grant a lease, update the access + * sketch, bump cache-hit / valid-get counters, or enqueue a promotion. It + * is purely used to decide whether a key is SSD-only and worth prefetching, + * so probing must not perturb eviction/offload pacing or metrics. + * @param object_key Key to query. + * @return Replica list on success, or an ErrorCode on failure. + */ + [[nodiscard]] tl::expected + GetReplicaListForPrefetch(const std::string& object_key); + + [[nodiscard]] std::vector> + BatchGetReplicaListForPrefetch( + const std::vector& object_keys); + /** * @brief Retrieves replica lists for object keys that match a regex * pattern. @@ -433,6 +450,21 @@ class MasterClient { const UUID& client_id, const std::vector& tasks, const std::vector& metadatas); + /** + * @brief Registers a prefetch (SSD->DRAM promotion) task on the master. + * + * Creates a promotion_tasks entry consumed by PromotionAllocStart, but + * deliberately does NOT go through the promotion admission gate + * (TryPushPromotionQueue) and does NOT push to the holder's + * promotion_objects heartbeat queue. This keeps the dedicated prefetch path + * separate from promotion-on-hit and avoids double-promoting the same key. + * @param client_id The UUID of the client requesting the prefetch. + * @param key The object key to promote from SSD to DRAM. + * @return An empty expected on success, or an ErrorCode on failure. + */ + [[nodiscard]] tl::expected RegisterPrefetchTask( + const UUID& client_id, const std::string& key); + /** * @brief Heartbeat-driven pull of pending L2->L1 promotion work for a * client. Returns tenant-scoped tasks the caller should read from local diff --git a/mooncake-store/include/master_service.h b/mooncake-store/include/master_service.h index ff9c7e5637..de14e442ea 100644 --- a/mooncake-store/include/master_service.h +++ b/mooncake-store/include/master_service.h @@ -298,6 +298,26 @@ class MasterService { auto GetReplicaList(const std::string& key, const std::string& tenant_id) -> tl::expected; + /** + * @brief Read-only replica metadata for SSD prefetch. + * + * Unlike GetReplicaList, does not grant a lease, bump access metrics, or + * enqueue promotion-on-hit work. + */ + auto GetReplicaListForPrefetch(const std::string& key) + -> tl::expected; + + /** + * @brief Register an in-flight promotion task for SSD prefetch. + * + * Records a PromotionTask on the master without the promotion-on-hit + * frequency/watermark gates and without pushing onto the holder client's + * promotion_objects heartbeat queue. The caller must execute the transfer + * via FileStorage::PrefetchKeys (PromotionAllocStart path). + */ + auto RegisterPrefetchTask(const UUID& client_id, const std::string& key) + -> tl::expected; + /** * @brief Start a put operation for an object * @param[out] replica_list Vector to store replica information for the @@ -1137,6 +1157,8 @@ class MasterService { uint64_t object_size; std::chrono::system_clock::time_point start_time; UUID holder_id; // owner of source LOCAL_DISK; only Notifier allowed + bool from_prefetch{ + false}; // set by RegisterPrefetchTask; enables protect lease }; static constexpr size_t kNumShards = 1024; // Number of metadata shards diff --git a/mooncake-store/include/pyclient.h b/mooncake-store/include/pyclient.h index c2e2201c5d..640ea982fc 100644 --- a/mooncake-store/include/pyclient.h +++ b/mooncake-store/include/pyclient.h @@ -163,6 +163,21 @@ class ClientRequester { void release_offload_buffer(const std::string &client_addr, uint64_t batch_id); + /** + * @brief Asks a remote holder node to prefetch (promote SSD->DRAM) the + * given keys. Used for cross-node SSD prefetch: the holder must register + * the promotion task itself (Master validates holder_id == client_id), so + * the requester delegates via this RPC instead of calling + * RegisterPrefetchTask locally. Best-effort: errors are logged, not + * propagated, and never block the originating exist() call. + * @param client_addr Network address of the holder's offload RPC service. + * @param keys SSD-only keys held by the remote node. + * @param sizes Object sizes (bytes), index-aligned with keys. + */ + void prefetch_offload_object(const std::string &client_addr, + const std::vector &keys, + const std::vector &sizes); + private: /** * @brief A batch of allocated memory buffers, tracking both handles and @@ -225,7 +240,10 @@ class PyClient { const std::shared_ptr &transfer_engine, const std::string &ipc_socket_path, bool enable_ssd_offload = false, const std::string &ssd_offload_path = "", - const std::string &tenant_id = "default") = 0; + const std::string &tenant_id = "default", + int64_t ssd_prefetch_cooldown_sec = DEFAULT_SSD_PREFETCH_COOLDOWN_SEC, + int64_t ssd_prefetch_dedup_ttl_sec = + DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC) = 0; virtual int setup_dummy(size_t mem_pool_size, size_t local_buffer_size, const std::string &server_address, @@ -338,10 +356,12 @@ class PyClient { virtual std::vector batchRemove(const std::vector &keys, bool force = false) = 0; - virtual int isExist(const std::string &key) = 0; + virtual int isExist(const std::string &key, + const ExistOptions &options = ExistOptions{}) = 0; virtual std::vector batchIsExist( - const std::vector &keys) = 0; + const std::vector &keys, + const ExistOptions &options = ExistOptions{}) = 0; virtual int64_t getSize(const std::string &key) = 0; diff --git a/mooncake-store/include/real_client.h b/mooncake-store/include/real_client.h index 0678964497..7ea7a278f0 100644 --- a/mooncake-store/include/real_client.h +++ b/mooncake-store/include/real_client.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -18,6 +19,7 @@ #include "mutex.h" #include "utils.h" #include "rpc_types.h" +#include "thread_pool.h" #include #include #include @@ -65,6 +67,254 @@ class ResourceTracker { std::jthread signal_thread_{}; // joins on destruction }; +// Throttle state for best-effort SSD prefetch. Two mechanisms: +// 1. Memory-pressure cooldown: when promotion fails because DRAM is +// saturated (NO_AVAILABLE_HANDLE), open a short cooldown window during +// which prefetch is a no-op, so prefetch (which *adds* to DRAM) stops +// competing with eviction/offload (which *frees* DRAM) on the holder. +// 2. Per-key in-flight dedup / rate-limit: suppress duplicate prefetch +// triggers for the same key from concurrent exist/get probes within a +// TTL window, cutting the RPC/thread storm that starves offload. +// Held via shared_ptr so detached prefetch threads can use it safely without +// capturing a raw RealClient* that may outlive the work. +class PrefetchThrottle { + public: + enum class State : uint8_t { + kTriggered = 0, + kInFlight = 1, + kCompleted = 2, + kFailed = 3, + kAlreadyResident = 4, + }; + + struct Entry { + int64_t trigger_ms{-1}; + int64_t completed_ms{-1}; + State state{State::kTriggered}; + // Set when RegisterPrefetchTask + PrefetchKeys path runs for this key. + bool promote_attempted{false}; + }; + + static int64_t NowMs() { + return std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + } + + // Reconfigure from mooncake.json (values come in as seconds, stored as ms). + // Non-positive values keep the current setting. A 0 cooldown disables the + // memory-pressure backoff; a 0 dedup TTL disables per-key rate-limiting. + void configure(int64_t cooldown_sec, int64_t dedup_ttl_sec) { + if (cooldown_sec >= 0) { + cooldown_ms_.store(cooldown_sec * 1000, std::memory_order_relaxed); + } + if (dedup_ttl_sec >= 0) { + dedup_ttl_ms_.store(dedup_ttl_sec * 1000, + std::memory_order_relaxed); + } + } + + bool inCooldown() const { + return NowMs() < cooldown_until_ms_.load(std::memory_order_relaxed); + } + + // Open the memory-pressure backoff window. No-op if cooldown is disabled. + void enterCooldown() { + const int64_t cooldown_ms = + cooldown_ms_.load(std::memory_order_relaxed); + if (cooldown_ms <= 0) { + return; + } + cooldown_until_ms_.store(NowMs() + cooldown_ms, + std::memory_order_relaxed); + } + + // Returns the subset of keys not seen within the dedup TTL, registering + // them as triggered synchronously (before the async prefetch job runs). + std::vector reserve(const std::vector &keys) { + const int64_t ttl_ms = dedup_ttl_ms_.load(std::memory_order_relaxed); + const int64_t now = NowMs(); + std::vector out; + out.reserve(keys.size()); + std::lock_guard lock(mutex_); + if (ttl_ms <= 0) { + for (const auto &key : keys) { + entries_[key] = Entry{.trigger_ms = now, + .completed_ms = -1, + .state = State::kTriggered}; + out.push_back(key); + } + return out; + } + for (auto it = entries_.begin(); it != entries_.end();) { + const int64_t last_ms = it->second.completed_ms >= 0 + ? it->second.completed_ms + : it->second.trigger_ms; + if (now - last_ms > ttl_ms) { + it = entries_.erase(it); + } else { + ++it; + } + } + for (const auto &key : keys) { + if (entries_.find(key) != entries_.end()) { + continue; + } + entries_[key] = Entry{.trigger_ms = now, + .completed_ms = -1, + .state = State::kTriggered}; + out.push_back(key); + } + return out; + } + + void markInFlight(const std::string &key) { + std::lock_guard lock(mutex_); + auto it = entries_.find(key); + if (it == entries_.end()) { + return; + } + it->second.state = State::kInFlight; + it->second.promote_attempted = true; + } + + void markCompleted(const std::string &key) { + const int64_t now = NowMs(); + std::lock_guard lock(mutex_); + auto it = entries_.find(key); + if (it == entries_.end()) { + entries_[key] = Entry{.trigger_ms = now, + .completed_ms = now, + .state = State::kCompleted}; + return; + } + it->second.state = State::kCompleted; + it->second.completed_ms = now; + } + + void markFailed(const std::string &key) { + std::lock_guard lock(mutex_); + auto it = entries_.find(key); + if (it == entries_.end()) { + return; + } + it->second.state = State::kFailed; + } + + // Async prefetch decided the key is not SSD-only (e.g. MEMORY already + // present). Clears in-flight semantics without treating as promotion done. + void markAlreadyResident(const std::string &key) { + std::lock_guard lock(mutex_); + auto it = entries_.find(key); + if (it == entries_.end()) { + return; + } + it->second.state = State::kAlreadyResident; + it->second.promote_attempted = false; + } + + bool promoteAttempted(const std::string &key) const { + std::lock_guard lock(mutex_); + auto it = entries_.find(key); + if (it == entries_.end()) { + return false; + } + return it->second.promote_attempted; + } + + int64_t triggeredAt(const std::string &key) const { + std::lock_guard lock(mutex_); + auto it = entries_.find(key); + if (it == entries_.end()) { + return -1; + } + return it->second.trigger_ms; + } + + int64_t completedAt(const std::string &key) const { + std::lock_guard lock(mutex_); + auto it = entries_.find(key); + if (it == entries_.end() || it->second.state != State::kCompleted) { + return -1; + } + return it->second.completed_ms; + } + + State stateOf(const std::string &key) const { + std::lock_guard lock(mutex_); + auto it = entries_.find(key); + if (it == entries_.end()) { + return State::kFailed; + } + return it->second.state; + } + + // Poll until promotion completes or the budget expires. Returns true only + // when the key reaches kCompleted before the deadline. + bool waitForCompletion(const std::string &key, int64_t max_wait_ms, + int64_t poll_ms = 1) const { + if (max_wait_ms <= 0) { + return false; + } + const int64_t deadline = NowMs() + max_wait_ms; + const int64_t step_ms = std::max(poll_ms, 1); + while (NowMs() < deadline) { + { + std::lock_guard lock(mutex_); + auto it = entries_.find(key); + if (it == entries_.end()) { + return false; + } + if (it->second.state == State::kCompleted) { + return true; + } + if (it->second.state == State::kFailed) { + return false; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(step_ms)); + } + return stateOf(key) == State::kCompleted; + } + + private: + std::atomic cooldown_until_ms_{0}; + std::atomic cooldown_ms_{DEFAULT_SSD_PREFETCH_COOLDOWN_SEC * 1000}; + std::atomic dedup_ttl_ms_{DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC * + 1000}; + mutable std::mutex mutex_; + std::unordered_map entries_; +}; + +// Replica tier selected for get() after optional prefetch wait ([GET-SRC] source=). +enum class PrefetchReplicaSource : uint8_t { + kDram, + kSsd, + kDisk, + kUnknown, +}; + +// Get-side prefetch observation outcome ([PREFETCH-OUTCOME] outcome=). +enum class PrefetchOutcome : uint8_t { + kDramResident, + kPrefetchEvictedAfterExist, + kPrefetchFailed, + kPrefetchHit, + kPrefetchDramWasResident, + kPrefetchPromotedUntracked, + kPrefetchMissRace, +}; + +PrefetchReplicaSource PrefetchReplicaSourceFromDescriptor( + const Replica::Descriptor& replica); +const char* PrefetchReplicaSourceToString(PrefetchReplicaSource source); + +PrefetchOutcome ClassifyPrefetchOutcome( + int64_t prefetch_trigger_ms, int64_t prefetch_done_ms, int64_t get_ms, + PrefetchReplicaSource source, PrefetchThrottle::State prefetch_state, + bool prefetch_wait_attempted, bool promote_attempted); +const char* PrefetchOutcomeToString(PrefetchOutcome outcome); + class RealClient : public PyClient { public: RealClient(); @@ -84,7 +334,10 @@ class RealClient : public PyClient { const std::string &ipc_socket_path = "", bool enable_ssd_offload = false, const std::string &ssd_offload_path = "", - const std::string &tenant_id = "default"); + const std::string &tenant_id = "default", + int64_t ssd_prefetch_cooldown_sec = DEFAULT_SSD_PREFETCH_COOLDOWN_SEC, + int64_t ssd_prefetch_dedup_ttl_sec = + DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC); int setup_dummy(size_t mem_pool_size, size_t local_buffer_size, const std::string &server_address, @@ -297,17 +550,21 @@ class RealClient : public PyClient { /** * @brief Check if an object exists * @param key Key to check + * @param options Optional exist behavior (e.g. prefetch_to_memory) * @return 1 if exists, 0 if not exists, -1 if error */ - int isExist(const std::string &key); + int isExist(const std::string &key, + const ExistOptions &options = ExistOptions{}); /** * @brief Check if multiple objects exist * @param keys Vector of keys to check + * @param options Optional exist behavior (e.g. prefetch_to_memory) * @return Vector of existence results: 1 if exists, 0 if not exists, -1 if * error */ - std::vector batchIsExist(const std::vector &keys); + std::vector batchIsExist(const std::vector &keys, + const ExistOptions &options = ExistOptions{}); /** * @brief Get the size of an object @@ -679,6 +936,23 @@ class RealClient : public PyClient { batch_get_offload_object(const std::vector &keys, const std::vector &sizes); + /** + * @brief Holder-side RPC handler for cross-node SSD prefetch. + * + * Invoked by a remote requester when the LOCAL_DISK replica of a key is + * held by THIS node. Runs the same promotion as the local prefetch path: + * RegisterPrefetchTask (with this node's own client_id, so Master's + * holder_id == client_id check passes) + FileStorage::PrefetchKeys to + * stage the SSD object into DRAM. Best-effort and fire-and-forget: the + * actual SSD I/O runs on a detached thread and this returns immediately. + * @param keys SSD-only keys held by this node to promote into DRAM. + * @param sizes Object sizes (bytes) captured by the requester from + * Master metadata; index-aligned with keys. + * @return true if the prefetch work was accepted/scheduled. + */ + bool prefetch_offload_object(const std::vector &keys, + const std::vector &sizes); + /** * @brief Releases buffer associated with a specific batch_id. * Called by remote client after transfer completion. @@ -902,6 +1176,52 @@ class RealClient : public PyClient { tl::expected setup_ascend_internal( size_t local_buffer_size); + void triggerSsdPrefetch(const std::vector &keys); + + // Read env-tunables and build prefetch_pool_. Called once from + // setup_internal when SSD offload is enabled. + void initPrefetchRuntime(); + + // Submit a best-effort prefetch job to the bounded prefetch_pool_ so + // high-frequency probes don't explode into unbounded detached threads. + // Falls back to a detached std::thread only if the pool is unavailable + // (e.g. enqueue after shutdown). + void submitPrefetchJob(std::function job); + + // Bounded worker pool for SSD prefetch promotion jobs. Fixed size (see + // initPrefetchRuntime), consistent with ClientService's task pool; bounds + // concurrent SSD reads / DRAM allocations and avoids unbounded detached + // threads. + std::shared_ptr prefetch_pool_; + + // get()-side wait-for-prefetch. When a get selects a LOCAL_DISK (SSD) + // replica but prefetch for the same key is in flight, poll every 1 ms + // (early exit on completion) up to this budget. Env MOONCAKE_SSD_GET_WAIT_MS + // overrides mooncake.json ssd_get_wait_ms. 0 disables waiting. + int64_t ssd_get_wait_ms_{DEFAULT_SSD_GET_WAIT_MS}; + int64_t ssd_get_wait_ms_config_{DEFAULT_SSD_GET_WAIT_MS}; + + // Best-effort SSD prefetch throttle (memory-pressure cooldown + per-key + // in-flight dedup). Shared with detached prefetch threads via shared_ptr. + // Tunable via mooncake.json (ssd_prefetch_cooldown_sec / + // ssd_prefetch_dedup_window_sec); see setup_internal(ConfigDict). + std::shared_ptr prefetch_throttle_ = + std::make_shared(); + + /** + * @brief Run prefetch promotion for keys whose LOCAL_DISK replica is held + * by THIS node. Registers a promotion task per key (this node is the + * holder) and stages the objects from SSD into DRAM via prefetch_pool_. + * Called from prefetch_offload_object (remote holder RPC). Best-effort; + * runs asynchronously. Register + PrefetchKeys execution is delegated to + * RunLocalPrefetchRegisterAndPromote (same as triggerSsdPrefetch local + * branch). + * @param keys Keys to promote (LOCAL_DISK replica held by this node). + * @param sizes Object sizes (bytes), index-aligned with keys. + */ + void runLocalPrefetch(const std::vector &keys, + const std::vector &sizes); + private: std::unordered_map mounted_segment_records_; diff --git a/mooncake-store/include/replica.h b/mooncake-store/include/replica.h index 677e77b333..681a4b6d6c 100644 --- a/mooncake-store/include/replica.h +++ b/mooncake-store/include/replica.h @@ -143,6 +143,22 @@ struct ReplicateConfig { } }; +/** + * @brief Options for is_exist / batch_is_exist (RFC #2213). + * + * Mirrors the ReplicateConfig pattern: a struct leaves room for future + * extensions without changing the function signature each time. + */ +struct ExistOptions { + bool prefetch_to_memory = false; + friend std::ostream& operator<<(std::ostream& os, + const ExistOptions& options) noexcept { + os << "ExistOptions: { prefetch_to_memory: " + << options.prefetch_to_memory << " }"; + return os; + } +}; + enum class ReplicaWriteMode { SINGLE_REPLICA, FLEXIBLE_DUAL_REPLICA, diff --git a/mooncake-store/include/rpc_service.h b/mooncake-store/include/rpc_service.h index db3b65e910..77ceffcb10 100644 --- a/mooncake-store/include/rpc_service.h +++ b/mooncake-store/include/rpc_service.h @@ -59,6 +59,12 @@ class WrappedMasterService { tl::expected GetReplicaList( const std::string& key, const std::string& tenant_id = "default"); + tl::expected GetReplicaListForPrefetch( + const std::string& key); + + std::vector> + BatchGetReplicaListForPrefetch(const std::vector& keys); + std::vector> BatchGetReplicaList(const std::vector& keys, const std::string& tenant_id = "default"); @@ -198,6 +204,9 @@ class WrappedMasterService { const UUID& client_id, const std::vector& tasks, const std::vector& metadatas); + tl::expected RegisterPrefetchTask(const UUID& client_id, + const std::string& key); + // Promotion-on-hit RPCs. tl::expected, ErrorCode> PromotionObjectHeartbeat(const UUID& client_id); diff --git a/mooncake-store/include/storage_backend.h b/mooncake-store/include/storage_backend.h index 5fd071b59b..893db02ce6 100644 --- a/mooncake-store/include/storage_backend.h +++ b/mooncake-store/include/storage_backend.h @@ -881,6 +881,14 @@ class BucketStorageBackend : public StorageBackendInterface { */ void CleanupOrphanedBucket(int64_t bucket_id); + /** + * @brief Undo a successful metadata commit when NotifyOffloadSuccess fails. + * Caller must delete on-disk bucket files separately via CleanupOrphanedBucket. + */ + void RollbackCommittedBucket(int64_t bucket_id, + const std::vector& keys, + int64_t data_size, int64_t meta_size); + // Holds eviction state between PrepareEviction and FinalizeEviction. // PrepareEviction removes buckets from metadata maps and returns this. // FinalizeEviction waits for in-flight reads and deletes the files. diff --git a/mooncake-store/include/store_c.h b/mooncake-store/include/store_c.h index b5646cd10f..c21772ae76 100644 --- a/mooncake-store/include/store_c.h +++ b/mooncake-store/include/store_c.h @@ -33,6 +33,11 @@ struct mooncake_replicate_config { }; typedef struct mooncake_replicate_config mooncake_replicate_config_t; +struct mooncake_exist_options { + int prefetch_to_memory; +}; +typedef struct mooncake_exist_options mooncake_exist_options_t; + /* * All memory pointed to by the "char *" parameters will not be used * after the C function returns. @@ -100,6 +105,14 @@ int mooncake_store_is_exist(mooncake_store_t store, const char *key); int mooncake_store_batch_is_exist(mooncake_store_t store, const char **keys, size_t count, int *results_out); +int mooncake_store_is_exist_with_options( + mooncake_store_t store, const char *key, + const mooncake_exist_options_t *options); + +int mooncake_store_batch_is_exist_with_options( + mooncake_store_t store, const char **keys, size_t count, int *results_out, + const mooncake_exist_options_t *options); + int64_t mooncake_store_get_size(mooncake_store_t store, const char *key); int mooncake_store_get_hostname(mooncake_store_t store, char *buf_out, diff --git a/mooncake-store/include/types.h b/mooncake-store/include/types.h index 801693ee14..b0c60102fa 100644 --- a/mooncake-store/include/types.h +++ b/mooncake-store/include/types.h @@ -216,12 +216,29 @@ constexpr const char* CONFIG_KEY_RDMA_DEVICES = "rdma_devices"; constexpr const char* CONFIG_KEY_MASTER_SERVER_ADDR = "master_server_addr"; constexpr const char* CONFIG_KEY_IPC_SOCKET_PATH = "ipc_socket_path"; constexpr const char* CONFIG_KEY_TENANT_ID = "tenant_id"; +// Backoff window (seconds) for best-effort SSD prefetch after DRAM is +// saturated: while active, prefetch is skipped so eviction/offload can +// reclaim memory instead of competing with promotion. 0 disables the backoff. +constexpr const char* CONFIG_KEY_SSD_PREFETCH_COOLDOWN_SEC = + "ssd_prefetch_cooldown_sec"; +// De-duplication / rate-limit TTL (seconds): the same key is prefetched at +// most once per this period, suppressing duplicate triggers from concurrent +// probes. +constexpr const char* CONFIG_KEY_SSD_PREFETCH_DEDUP_TTL_SEC = + "ssd_prefetch_dedup_ttl_sec"; +// Max wait budget (milliseconds) on get when an SSD-only key has a prefetch +// in flight. Polls every 1 ms and returns early once promotion completes. +constexpr const char* CONFIG_KEY_SSD_GET_WAIT_MS = "ssd_get_wait_ms"; // Store client configuration defaults static constexpr size_t DEFAULT_GLOBAL_SEGMENT_SIZE = 1024 * 1024 * 16; // 16MB static constexpr size_t DEFAULT_LOCAL_BUFFER_SIZE = 1024 * 1024 * 16; // 16MB constexpr const char* DEFAULT_PROTOCOL = "tcp"; constexpr const char* DEFAULT_MASTER_SERVER_ADDR = "127.0.0.1:50051"; +static constexpr size_t DEFAULT_SSD_PREFETCH_COOLDOWN_SEC = 5; +static constexpr size_t DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC = 30; +// SSD->DRAM promotion p90 ~3 ms; 10 ms is enough headroom without wasting TTFT. +static constexpr size_t DEFAULT_SSD_GET_WAIT_MS = 10; inline std::string NormalizeTenantId(const std::string& tenant_id) { return tenant_id.empty() ? "default" : tenant_id; diff --git a/mooncake-store/src/client_service.cpp b/mooncake-store/src/client_service.cpp index c18dead179..7e0f341388 100644 --- a/mooncake-store/src/client_service.cpp +++ b/mooncake-store/src/client_service.cpp @@ -1013,6 +1013,57 @@ tl::expected Client::Query( start_time + std::chrono::milliseconds(result.value().lease_ttl_ms)); } +tl::expected Client::QueryForPrefetch( + const std::string& object_key) { + std::chrono::steady_clock::time_point start_time = + std::chrono::steady_clock::now(); + auto result = master_client_.GetReplicaListForPrefetch(object_key); + if (!result) { + return tl::unexpected(result.error()); + } + return QueryResult( + std::move(result.value().replicas), + start_time + std::chrono::milliseconds(result.value().lease_ttl_ms)); +} + +std::vector> Client::BatchQueryForPrefetch( + const std::vector& object_keys) { + std::chrono::steady_clock::time_point start_time = + std::chrono::steady_clock::now(); + auto response = + master_client_.BatchGetReplicaListForPrefetch(object_keys); + + if (response.size() != object_keys.size()) { + LOG(ERROR) << "BatchQueryForPrefetch response size mismatch. Expected: " + << object_keys.size() << ", Got: " << response.size(); + std::vector> results; + results.reserve(object_keys.size()); + for (size_t i = 0; i < object_keys.size(); ++i) { + results.emplace_back(tl::unexpected(ErrorCode::RPC_FAIL)); + } + return results; + } + + std::vector> results; + results.reserve(response.size()); + for (size_t i = 0; i < response.size(); ++i) { + if (response[i]) { + results.emplace_back(QueryResult( + std::move(response[i].value().replicas), + start_time + std::chrono::milliseconds( + response[i].value().lease_ttl_ms))); + } else { + results.emplace_back(tl::unexpected(response[i].error())); + } + } + return results; +} + +tl::expected Client::RegisterPrefetchTask( + const std::string& object_key) { + return master_client_.RegisterPrefetchTask(client_id_, object_key); +} + std::vector> Client::BatchQuery( const std::vector& object_keys) { return BatchQuery(object_keys, master_client_.tenant_id()); diff --git a/mooncake-store/src/dummy_client.cpp b/mooncake-store/src/dummy_client.cpp index c1a84a5c2a..fe090fd2e7 100644 --- a/mooncake-store/src/dummy_client.cpp +++ b/mooncake-store/src/dummy_client.cpp @@ -860,7 +860,11 @@ std::vector DummyClient::batchRemove(const std::vector& keys, return results; } -int DummyClient::isExist(const std::string& key) { +int DummyClient::isExist(const std::string& key, const ExistOptions& options) { + if (options.prefetch_to_memory) { + LOG(WARNING) << "SSD prefetch is not supported in DummyClient mode; " + << "prefetch_to_memory option ignored."; + } auto result = invoke_rpc<&RealClient::isExist_internal, bool>(key); if (result.has_value()) { @@ -870,8 +874,12 @@ int DummyClient::isExist(const std::string& key) { } } -std::vector DummyClient::batchIsExist( - const std::vector& keys) { +std::vector DummyClient::batchIsExist(const std::vector& keys, + const ExistOptions& options) { + if (options.prefetch_to_memory) { + LOG(WARNING) << "SSD prefetch is not supported in DummyClient mode; " + << "prefetch_to_memory option ignored."; + } auto internal_results = invoke_batch_rpc<&RealClient::batchIsExist_internal, bool>(keys.size(), keys); diff --git a/mooncake-store/src/file_storage.cpp b/mooncake-store/src/file_storage.cpp index 4830c13eb3..57b076fe9c 100644 --- a/mooncake-store/src/file_storage.cpp +++ b/mooncake-store/src/file_storage.cpp @@ -833,6 +833,128 @@ tl::expected FileStorage::ProcessPromotionTasks() { return {}; } +tl::expected FileStorage::PrefetchKeys( + const std::vector& keys, const std::vector& sizes, + bool* dram_pressure, PrefetchKeyCallback on_key_done) { + if (client_ == nullptr) { + return tl::make_unexpected(ErrorCode::INVALID_PARAMS); + } + if (keys.size() != sizes.size()) { + return tl::make_unexpected(ErrorCode::INVALID_PARAMS); + } + + const std::vector preferred_segments; + + for (size_t i = 0; i < keys.size(); ++i) { + const auto& key = keys[i]; + const auto& tenant_id = client_->tenant_id(); + const auto storage_key = + MakeTenantScopedStorageKey(tenant_id, key); + int64_t size = sizes[i]; + + if (size <= 0) { + LOG(WARNING) << "SSD prefetch: skipping key=" << key + << " with non-positive size=" << size; + continue; + } + + auto alloc_result = client_->PromotionAllocStart( + key, tenant_id, static_cast(size), preferred_segments); + if (!alloc_result) { + VLOG(1) << "SSD prefetch: PromotionAllocStart failed for key=" + << key << ", error=" << alloc_result.error(); + if (dram_pressure != nullptr && + alloc_result.error() == ErrorCode::NO_AVAILABLE_HANDLE) { + *dram_pressure = true; + } + auto release = client_->NotifyPromotionFailure(key, tenant_id); + if (!release) { + VLOG(1) << "SSD prefetch: NotifyPromotionFailure failed for" + << " key=" << key << ", error=" << release.error(); + } + if (on_key_done) { + on_key_done(key, false); + } + continue; + } + + auto release_master_state = [this, &key, &tenant_id]() { + auto release = client_->NotifyPromotionFailure(key, tenant_id); + if (!release) { + VLOG(1) << "SSD prefetch: NotifyPromotionFailure failed for" + << " key=" << key << ", error=" << release.error(); + } + }; + + std::vector single_key{storage_key}; + std::vector single_size{size}; + auto allocate_res = AllocateBatch(single_key, single_size); + if (!allocate_res) { + LOG(WARNING) << "SSD prefetch: AllocateBatch failed for key=" << key + << ", error=" << allocate_res.error(); + if (dram_pressure != nullptr && + allocate_res.error() == ErrorCode::NO_AVAILABLE_HANDLE) { + *dram_pressure = true; + } + release_master_state(); + if (on_key_done) { + on_key_done(key, false); + } + continue; + } + auto staging = allocate_res.value(); + auto load_res = BatchLoad(staging->slices); + if (!load_res) { + LOG(WARNING) << "SSD prefetch: BatchLoad failed for key=" << key + << ", error=" << load_res.error(); + release_master_state(); + if (on_key_done) { + on_key_done(key, false); + } + continue; + } + + auto slice_it = staging->slices.find(storage_key); + if (slice_it == staging->slices.end()) { + LOG(WARNING) << "SSD prefetch: staging slice missing for key=" + << key; + release_master_state(); + if (on_key_done) { + on_key_done(key, false); + } + continue; + } + std::vector tx_slices{slice_it->second}; + ErrorCode write_err = client_->PromotionWrite( + alloc_result.value().memory_descriptor, tx_slices); + if (write_err != ErrorCode::OK) { + LOG(WARNING) << "SSD prefetch: PromotionWrite failed for key=" + << key << ", error=" << write_err; + release_master_state(); + if (on_key_done) { + on_key_done(key, false); + } + continue; + } + + auto notify_res = client_->NotifyPromotionSuccess(key, tenant_id); + if (!notify_res) { + LOG(WARNING) << "SSD prefetch: NotifyPromotionSuccess failed for" + << "key=" << key << ", error=" << notify_res.error(); + release_master_state(); + if (on_key_done) { + on_key_done(key, false); + } + continue; + } + if (on_key_done) { + on_key_done(key, true); + } + } + + return {}; +} + tl::expected FileStorage::BatchLoad( std::unordered_map& batch_object) { auto start_time = std::chrono::steady_clock::now(); diff --git a/mooncake-store/src/master_client.cpp b/mooncake-store/src/master_client.cpp index dc9282866d..c18f00894a 100644 --- a/mooncake-store/src/master_client.cpp +++ b/mooncake-store/src/master_client.cpp @@ -37,6 +37,21 @@ struct RpcNameTraits<&WrappedMasterService::GetReplicaList> { static constexpr const char* value = "GetReplicaList"; }; +template <> +struct RpcNameTraits<&WrappedMasterService::GetReplicaListForPrefetch> { + static constexpr const char* value = "GetReplicaListForPrefetch"; +}; + +template <> +struct RpcNameTraits<&WrappedMasterService::BatchGetReplicaListForPrefetch> { + static constexpr const char* value = "BatchGetReplicaListForPrefetch"; +}; + +template <> +struct RpcNameTraits<&WrappedMasterService::RegisterPrefetchTask> { + static constexpr const char* value = "RegisterPrefetchTask"; +}; + template <> struct RpcNameTraits<&WrappedMasterService::CalcCacheStats> { static constexpr const char* value = "CalcCacheStats"; @@ -530,6 +545,40 @@ tl::expected MasterClient::GetReplicaList( return result; } +tl::expected +MasterClient::GetReplicaListForPrefetch(const std::string& object_key) { + ScopedVLogTimer timer(1, "MasterClient::GetReplicaListForPrefetch"); + timer.LogRequest("object_key=", object_key); + + auto result = invoke_rpc<&WrappedMasterService::GetReplicaListForPrefetch, + GetReplicaListResponse>(object_key); + timer.LogResponseExpected(result); + return result; +} + +std::vector> +MasterClient::BatchGetReplicaListForPrefetch( + const std::vector& object_keys) { + ScopedVLogTimer timer(1, "MasterClient::BatchGetReplicaListForPrefetch"); + timer.LogRequest("keys_count=", object_keys.size()); + + auto result = + invoke_batch_rpc<&WrappedMasterService::BatchGetReplicaListForPrefetch, + GetReplicaListResponse>(object_keys.size(), object_keys); + timer.LogResponse("result=", result.size(), " operations"); + return result; +} + +tl::expected MasterClient::RegisterPrefetchTask( + const UUID& client_id, const std::string& key) { + ScopedVLogTimer timer(1, "MasterClient::RegisterPrefetchTask"); + timer.LogRequest("client_id=", client_id, ", key=", key); + auto result = invoke_rpc<&WrappedMasterService::RegisterPrefetchTask, void>( + client_id, key); + timer.LogResponseExpected(result); + return result; +} + std::vector> MasterClient::BatchGetReplicaList(const std::vector& object_keys) { return BatchGetReplicaList(object_keys, tenant_id_); diff --git a/mooncake-store/src/master_service.cpp b/mooncake-store/src/master_service.cpp index db74f7cc2c..f337b449a4 100644 --- a/mooncake-store/src/master_service.cpp +++ b/mooncake-store/src/master_service.cpp @@ -1442,6 +1442,90 @@ auto MasterService::GetReplicaList(const std::string& key, return resp; } +auto MasterService::GetReplicaListForPrefetch(const std::string& key) + -> tl::expected { + std::shared_lock shared_lock(snapshot_mutex_); + + // NOTE: prefetch path is not tenant-aware; scope to the default tenant. + MetadataAccessorRO accessor(this, MakeObjectIdentity(key, "default")); + if (!accessor.Exists()) { + VLOG(1) << "prefetch_metadata key=" << key << ", info=object_not_found"; + return tl::make_unexpected(ErrorCode::OBJECT_NOT_FOUND); + } + const auto& metadata = accessor.Get(); + + std::vector replica_list; + metadata.VisitReplicas( + &Replica::fn_is_completed, [&replica_list](const Replica& replica) { + replica_list.emplace_back(replica.get_descriptor()); + }); + + if (replica_list.empty()) { + LOG(WARNING) << "prefetch_metadata key=" << key + << ", error=replica_not_ready"; + return tl::make_unexpected(ErrorCode::REPLICA_IS_NOT_READY); + } + + return GetReplicaListResponse(std::move(replica_list), + default_kv_lease_ttl_); +} + +auto MasterService::RegisterPrefetchTask(const UUID& client_id, + const std::string& key) + -> tl::expected { + std::shared_lock shared_lock(snapshot_mutex_); + // NOTE: prefetch path is not tenant-aware; scope to the default tenant. + MetadataAccessorRW accessor(this, MakeObjectIdentity(key, "default")); + if (!accessor.Exists()) { + return tl::make_unexpected(ErrorCode::OBJECT_NOT_FOUND); + } + auto& metadata = accessor.Get(); + auto& tenant_state = accessor.GetTenantState(); + + if (metadata.HasReplica(&Replica::fn_is_memory_replica)) { + return {}; + } + + if (tenant_state.promotion_tasks.count(key) > 0) { + return {}; + } + + if (promotion_in_flight_.load(std::memory_order_relaxed) >= + promotion_queue_limit_) { + return tl::make_unexpected(ErrorCode::KEYS_ULTRA_LIMIT); + } + + Replica* source = nullptr; + metadata.VisitReplicas(&Replica::fn_is_local_disk_replica, + [&source](Replica& r) { + if (source == nullptr) source = &r; + }); + if (source == nullptr) { + return tl::make_unexpected(ErrorCode::REPLICA_IS_NOT_READY); + } + + auto holder_id = source->get_local_disk_client_id(); + if (!holder_id.has_value() || holder_id.value() != client_id) { + return tl::make_unexpected(ErrorCode::INVALID_PARAMS); + } + + source->inc_refcnt(); + const uint64_t object_size = + source->get_descriptor().get_local_disk_descriptor().object_size; + + tenant_state.promotion_tasks.emplace( + key, PromotionTask{.source_id = source->id(), + .alloc_id = 0, + .object_size = object_size, + .start_time = std::chrono::system_clock::now(), + .holder_id = holder_id.value(), + .from_prefetch = true}); + promotion_in_flight_.fetch_add(1, std::memory_order_relaxed); + VLOG(1) << "prefetch_task_registered key=" << key + << " size=" << object_size; + return {}; +} + auto MasterService::AllocateAndInsertMetadata( MetadataShardAccessorRW& shard, const UUID& client_id, const std::string& key, uint64_t value_length, @@ -3650,6 +3734,7 @@ auto MasterService::NotifyPromotionSuccess(const UUID& client_id, if (task_it->second.holder_id != client_id) { return tl::make_unexpected(ErrorCode::INVALID_PARAMS); } + const bool from_prefetch = task_it->second.from_prefetch; bool committed = false; Replica* staged = metadata.GetReplicaByID(task_it->second.alloc_id); @@ -3694,6 +3779,12 @@ auto MasterService::NotifyPromotionSuccess(const UUID& client_id, if (!committed) { return tl::make_unexpected(ErrorCode::REPLICA_IS_NOT_READY); } + + // Prefetch-promoted keys get the same lease as exist/get so DRAM survives + // until the subsequent get() (see ssd-prefetch.md §5.3). + if (from_prefetch) { + GrantLeaseForGroup(tenant_state, object_id.user_key, metadata); + } return {}; } diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index 81fd32d8e6..7399a8bd22 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -15,6 +15,7 @@ #include // for atexit #include #include +#include #include #include #include @@ -901,6 +902,8 @@ tl::expected RealClient::setup_internal( ->register_handler<&RealClient::batch_get_offload_object>(this); offload_rpc_server_ ->register_handler<&RealClient::release_offload_buffer>(this); + offload_rpc_server_ + ->register_handler<&RealClient::prefetch_offload_object>(this); offload_rpc_server_->async_start(); auto err = offload_rpc_server_->get_errc(); if (err) { @@ -930,6 +933,7 @@ tl::expected RealClient::setup_internal( << init_result.error(); return init_result; } + initPrefetchRuntime(); } client_requester_ = std::make_shared(); if (FLAGS_enable_http_server) { @@ -949,7 +953,17 @@ int RealClient::setup_real( const std::string &master_server_addr, const std::shared_ptr &transfer_engine, const std::string &ipc_socket_path, bool enable_ssd_offload, - const std::string &ssd_offload_path, const std::string &tenant_id) { + const std::string &ssd_offload_path, const std::string &tenant_id, + int64_t ssd_prefetch_cooldown_sec, int64_t ssd_prefetch_dedup_ttl_sec) { + if (prefetch_throttle_) { + prefetch_throttle_->configure(ssd_prefetch_cooldown_sec, + ssd_prefetch_dedup_ttl_sec); + LOG(INFO) << "SSD prefetch throttle: " + << CONFIG_KEY_SSD_PREFETCH_COOLDOWN_SEC << "=" + << ssd_prefetch_cooldown_sec << "s, " + << CONFIG_KEY_SSD_PREFETCH_DEDUP_TTL_SEC << "=" + << ssd_prefetch_dedup_ttl_sec << "s"; + } return to_py_ret(setup_internal( local_hostname, metadata_server, global_segment_size, local_buffer_size, protocol, rdma_devices, master_server_addr, transfer_engine, @@ -1055,6 +1069,31 @@ tl::expected RealClient::setup_internal( bool enable_ssd_offload = (enable_ssd_offload_str == "true" || enable_ssd_offload_str == "1"); + // SSD prefetch throttle tunables (seconds). Stored on the per-client + // throttle shared with detached prefetch threads. + size_t ssd_prefetch_cooldown_sec = + get_config_size(config, CONFIG_KEY_SSD_PREFETCH_COOLDOWN_SEC, + DEFAULT_SSD_PREFETCH_COOLDOWN_SEC) + .value_or(DEFAULT_SSD_PREFETCH_COOLDOWN_SEC); + size_t ssd_prefetch_dedup_ttl_sec = + get_config_size(config, CONFIG_KEY_SSD_PREFETCH_DEDUP_TTL_SEC, + DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC) + .value_or(DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC); + if (prefetch_throttle_) { + prefetch_throttle_->configure( + static_cast(ssd_prefetch_cooldown_sec), + static_cast(ssd_prefetch_dedup_ttl_sec)); + LOG(INFO) << "SSD prefetch throttle: " + << CONFIG_KEY_SSD_PREFETCH_COOLDOWN_SEC << "=" + << ssd_prefetch_cooldown_sec << "s, " + << CONFIG_KEY_SSD_PREFETCH_DEDUP_TTL_SEC << "=" + << ssd_prefetch_dedup_ttl_sec << "s"; + } + size_t ssd_get_wait_ms = + get_config_size(config, CONFIG_KEY_SSD_GET_WAIT_MS, + DEFAULT_SSD_GET_WAIT_MS) + .value_or(DEFAULT_SSD_GET_WAIT_MS); + ssd_get_wait_ms_config_ = static_cast(ssd_get_wait_ms); return setup_internal( local_hostname, metadata_server, global_segment_size, local_buffer_size, protocol, rdma_devices, master_server_addr, nullptr, ipc_socket_path, @@ -2002,33 +2041,488 @@ tl::expected RealClient::isExist_internal( return client_->IsExist(key); } -int RealClient::isExist(const std::string &key) { +int RealClient::isExist(const std::string &key, const ExistOptions &options) { auto result = isExist_internal(key); if (result.has_value()) { + if (options.prefetch_to_memory && *result && file_storage_) { + triggerSsdPrefetch({key}); + } return *result ? 1 : 0; // 1 if exists, 0 if not } else { return toInt(result.error()); } } -std::vector RealClient::batchIsExist( - const std::vector &keys) { +std::vector RealClient::batchIsExist(const std::vector &keys, + const ExistOptions &options) { auto internal_results = batchIsExist_internal(keys); std::vector results; results.reserve(internal_results.size()); - for (const auto &result : internal_results) { - if (result.has_value()) { - results.push_back(result.value() ? 1 : 0); // 1 if exists, 0 if not + std::vector prefetch_candidates; + for (size_t i = 0; i < internal_results.size(); ++i) { + if (internal_results[i].has_value()) { + bool exists = internal_results[i].value(); + results.push_back(exists ? 1 : 0); // 1 if exists, 0 if not + if (options.prefetch_to_memory && exists) { + prefetch_candidates.push_back(keys[i]); + } } else { - results.push_back(toInt(result.error())); + results.push_back(toInt(internal_results[i].error())); } } + if (options.prefetch_to_memory && !prefetch_candidates.empty() && + file_storage_) { + triggerSsdPrefetch(prefetch_candidates); + } + return results; } +namespace { +// Parse a non-negative integer env var, returning `fallback` when unset or +// invalid. +int64_t GetEnvInt64(const char *name, int64_t fallback) { + const char *raw = std::getenv(name); + if (raw == nullptr || *raw == '\0') { + return fallback; + } + try { + return static_cast(std::stoll(raw)); + } catch (const std::exception &) { + LOG(WARNING) << "Invalid value for env " << name << "='" << raw + << "', using default " << fallback; + return fallback; + } +} +} // namespace + +namespace { + +constexpr size_t kPrefetchMetadataChunkSize = 128; + +struct SsdPrefetchRoute { + int64_t local_disk_size{0}; + std::string holder_endpoint; +}; + +std::optional ClassifySsdPrefetchRoute( + const std::vector &replicas) { + bool has_memory = false; + bool has_local_disk = false; + SsdPrefetchRoute route; + for (const auto &replica : replicas) { + if (replica.status != ReplicaStatus::COMPLETE && + replica.status != ReplicaStatus::PROCESSING) { + continue; + } + if (replica.is_memory_replica()) { + has_memory = true; + break; + } + if (replica.is_local_disk_replica()) { + has_local_disk = true; + route.local_disk_size = + static_cast(calculate_total_size(replica)); + route.holder_endpoint = + replica.get_local_disk_descriptor().transport_endpoint; + } + } + if (has_memory || !has_local_disk || route.local_disk_size <= 0) { + return std::nullopt; + } + return route; +} + +void RunLocalPrefetchRegisterAndPromote( + const std::shared_ptr &client, FileStorage *file_storage, + const std::shared_ptr &throttle, + const FileStorage::PrefetchKeyCallback &on_key_done, + const std::vector &local_keys, + const std::vector &local_sizes) { + if (local_keys.empty() || file_storage == nullptr) { + return; + } + std::vector prefetch_keys; + std::vector prefetch_sizes; + prefetch_keys.reserve(local_keys.size()); + prefetch_sizes.reserve(local_keys.size()); + for (size_t i = 0; i < local_keys.size(); ++i) { + auto register_result = client->RegisterPrefetchTask(local_keys[i]); + if (!register_result) { + VLOG(1) << "SSD prefetch: RegisterPrefetchTask failed for" + << "key=" << local_keys[i] + << ", error=" << register_result.error(); + continue; + } + prefetch_keys.push_back(local_keys[i]); + prefetch_sizes.push_back(local_sizes[i]); + if (throttle) { + throttle->markInFlight(local_keys[i]); + } + VLOG(1) << "SSD prefetch: registered task for key=" << local_keys[i] + << ", size=" << local_sizes[i]; + } + if (prefetch_keys.empty()) { + return; + } + bool dram_pressure = false; + auto prefetch_res = file_storage->PrefetchKeys( + prefetch_keys, prefetch_sizes, &dram_pressure, on_key_done); + if (!prefetch_res) { + LOG(WARNING) << "SSD prefetch: PrefetchKeys failed, error=" + << prefetch_res.error(); + } else { + VLOG(1) << "SSD prefetch: PrefetchKeys completed for " + << prefetch_keys.size() << " key(s)"; + } + if (dram_pressure && throttle) { + throttle->enterCooldown(); + LOG(INFO) << "SSD prefetch: DRAM saturated, backing off " + "(ssd_prefetch_cooldown_sec)"; + } +} + +// Returns refreshed QueryResult when master sees a COMPLETE MEMORY replica. +std::optional TryRefreshBestMemoryReplica( + Client *client, const std::string &key, + const std::unordered_set &local_endpoints) { + auto requery = client->Query(key); + if (!requery) { + return std::nullopt; + } + const auto *candidate = + SelectBestReplica(requery.value().replicas, local_endpoints); + if (!candidate || !candidate->is_memory_replica()) { + return std::nullopt; + } + return std::move(requery.value()); +} + +} // namespace + +PrefetchReplicaSource PrefetchReplicaSourceFromDescriptor( + const Replica::Descriptor &replica) { + if (replica.is_memory_replica()) { + return PrefetchReplicaSource::kDram; + } + if (replica.is_local_disk_replica()) { + return PrefetchReplicaSource::kSsd; + } + if (replica.is_disk_replica()) { + return PrefetchReplicaSource::kDisk; + } + return PrefetchReplicaSource::kUnknown; +} + +const char *PrefetchReplicaSourceToString(PrefetchReplicaSource source) { + switch (source) { + case PrefetchReplicaSource::kDram: + return "DRAM"; + case PrefetchReplicaSource::kSsd: + return "SSD"; + case PrefetchReplicaSource::kDisk: + return "DISK"; + case PrefetchReplicaSource::kUnknown: + return "UNKNOWN"; + } + return "UNKNOWN"; +} + +PrefetchOutcome ClassifyPrefetchOutcome( + int64_t prefetch_trigger_ms, int64_t prefetch_done_ms, int64_t get_ms, + PrefetchReplicaSource source, PrefetchThrottle::State prefetch_state, + bool prefetch_wait_attempted, bool promote_attempted) { + const bool from_dram = source == PrefetchReplicaSource::kDram; + const bool prefetch_involved = + prefetch_trigger_ms >= 0 || prefetch_wait_attempted; + if (!prefetch_involved) { + if (from_dram) { + return PrefetchOutcome::kDramResident; + } + // Key was DRAM-resident at exist-time (or never triggered); evicted + // before get(), so get falls back to SSD. + return PrefetchOutcome::kPrefetchEvictedAfterExist; + } + if (prefetch_trigger_ms >= 0 && + prefetch_state == PrefetchThrottle::State::kFailed) { + return PrefetchOutcome::kPrefetchFailed; + } + if (from_dram) { + if (prefetch_done_ms >= 0 && prefetch_done_ms <= get_ms) { + return PrefetchOutcome::kPrefetchHit; + } + if (!promote_attempted) { + return PrefetchOutcome::kPrefetchDramWasResident; + } + return PrefetchOutcome::kPrefetchPromotedUntracked; + } + // This rank triggered prefetch or attempted promote, but get still reads + // SSD (promotion lost the race or timed out). + if (prefetch_trigger_ms >= 0 || promote_attempted) { + return PrefetchOutcome::kPrefetchMissRace; + } + // No trigger/promote on this rank; get reads SSD (may have been DRAM at + // exist-time on another rank, or master wait timed out on TP1~7). + return PrefetchOutcome::kPrefetchEvictedAfterExist; +} + +const char *PrefetchOutcomeToString(PrefetchOutcome outcome) { + switch (outcome) { + case PrefetchOutcome::kDramResident: + return "dram_resident"; + case PrefetchOutcome::kPrefetchEvictedAfterExist: + return "prefetch_evicted_after_exist"; + case PrefetchOutcome::kPrefetchFailed: + return "prefetch_failed"; + case PrefetchOutcome::kPrefetchHit: + return "prefetch_hit"; + case PrefetchOutcome::kPrefetchDramWasResident: + return "prefetch_dram_was_resident"; + case PrefetchOutcome::kPrefetchPromotedUntracked: + return "prefetch_promoted_untracked"; + case PrefetchOutcome::kPrefetchMissRace: + return "prefetch_miss_race"; + } + return "prefetch_evicted_after_exist"; +} + +void RealClient::initPrefetchRuntime() { + // Bounded worker pool for SSD prefetch promotion jobs. Fixed size, + // consistent with ClientService's task_thread_pool_(4); bounds concurrent + // SSD reads / DRAM allocations and avoids unbounded detached threads. + constexpr size_t kPrefetchThreadPoolSize = 4; + prefetch_pool_ = std::make_shared(kPrefetchThreadPoolSize); + + // get()-side wait-for-prefetch max budget (poll every 1 ms, early exit). + // Env overrides mooncake.json ssd_get_wait_ms. + const int64_t wait_from_env = GetEnvInt64("MOONCAKE_SSD_GET_WAIT_MS", -1); + if (wait_from_env >= 0) { + ssd_get_wait_ms_ = wait_from_env; + } else { + ssd_get_wait_ms_ = ssd_get_wait_ms_config_; + } + + LOG(INFO) << "SSD prefetch runtime: pool_size=" << kPrefetchThreadPoolSize + << ", ssd_get_wait_ms=" << ssd_get_wait_ms_ + << " (poll 1ms, early exit on completion)"; +} + +void RealClient::submitPrefetchJob(std::function job) { + if (prefetch_pool_) { + try { + prefetch_pool_->enqueue(std::move(job)); + return; + } catch (const std::exception &e) { + // Pool stopped (shutdown in progress): drop the best-effort job. + VLOG(1) << "SSD prefetch: pool enqueue failed (" << e.what() + << "), dropping job"; + return; + } + } + // Pool unavailable (not initialized / shutting down): drop the best-effort + // job. Never fall back to an unbounded detached thread -- that is exactly + // the prefetch-storm anti-pattern this bounded-pool path exists to avoid. + VLOG(1) << "SSD prefetch: pool unavailable, dropping job"; +} + +void RealClient::triggerSsdPrefetch(const std::vector &keys) { + auto throttle = prefetch_throttle_; + if (throttle && throttle->inCooldown()) { + VLOG(1) << "SSD prefetch: skipped (memory-pressure cooldown)"; + return; + } + if (keys.empty()) { + return; + } + + // Do not reserve()/record trigger at exist time. BatchQueryForPrefetch in the + // async job filters SSD-only keys first; reserve() runs only for keys that + // will actually RegisterPrefetchTask, avoiding false triggers on DRAM-resident + // keys (exist only knows "exists", not replica tier). + auto keys_copy = keys; + auto client = client_; + auto file_storage = file_storage_; + auto client_requester = client_requester_; + const std::string local_rpc_addr_copy = local_rpc_addr; + FileStorage::PrefetchKeyCallback on_key_done; + if (throttle) { + on_key_done = [throttle](const std::string &key, bool success) { + if (success) { + throttle->markCompleted(key); + } else { + throttle->markFailed(key); + } + }; + } + submitPrefetchJob([client, file_storage, client_requester, throttle, + local_rpc_addr_copy, on_key_done, + keys_copy = std::move(keys_copy)]() { + // Batch metadata queries in chunks, then register + promote each + // chunk immediately (pipeline) instead of waiting for all keys. + std::unordered_map> remote_keys; + std::unordered_map> remote_sizes; + + for (size_t offset = 0; offset < keys_copy.size(); + offset += kPrefetchMetadataChunkSize) { + const size_t end = std::min(offset + kPrefetchMetadataChunkSize, + keys_copy.size()); + std::vector chunk(keys_copy.begin() + offset, + keys_copy.begin() + end); + std::vector> batch_results; + try { + batch_results = client->BatchQueryForPrefetch(chunk); + } catch (const std::exception &e) { + LOG(WARNING) << "SSD prefetch: BatchQueryForPrefetch failed: " + << e.what(); + continue; + } + if (batch_results.size() != chunk.size()) { + LOG(WARNING) << "SSD prefetch: BatchQueryForPrefetch size " + "mismatch, expected " + << chunk.size() << ", got " + << batch_results.size(); + continue; + } + + std::vector chunk_local_keys; + std::vector chunk_local_sizes; + chunk_local_keys.reserve(chunk.size()); + chunk_local_sizes.reserve(chunk.size()); + + for (size_t i = 0; i < chunk.size(); ++i) { + if (!batch_results[i]) { + VLOG(1) << "SSD prefetch: metadata query failed for" + << " key=" << chunk[i] + << ", error=" << batch_results[i].error(); + continue; + } + auto route = + ClassifySsdPrefetchRoute(batch_results[i].value().replicas); + if (!route) { + continue; + } + if (route->holder_endpoint.empty() || + route->holder_endpoint == local_rpc_addr_copy) { + chunk_local_keys.push_back(chunk[i]); + chunk_local_sizes.push_back(route->local_disk_size); + } else { + remote_keys[route->holder_endpoint].push_back(chunk[i]); + remote_sizes[route->holder_endpoint].push_back( + route->local_disk_size); + } + } + + std::vector promote_local_keys; + std::vector promote_local_sizes; + if (throttle) { + auto reserved = throttle->reserve(chunk_local_keys); + std::unordered_set reserved_set(reserved.begin(), + reserved.end()); + promote_local_keys.reserve(reserved.size()); + promote_local_sizes.reserve(reserved.size()); + for (size_t i = 0; i < chunk_local_keys.size(); ++i) { + if (reserved_set.find(chunk_local_keys[i]) == + reserved_set.end()) { + continue; + } + promote_local_keys.push_back(chunk_local_keys[i]); + promote_local_sizes.push_back(chunk_local_sizes[i]); + } + } else { + promote_local_keys = std::move(chunk_local_keys); + promote_local_sizes = std::move(chunk_local_sizes); + } + + RunLocalPrefetchRegisterAndPromote( + client, file_storage.get(), throttle, on_key_done, + promote_local_keys, promote_local_sizes); + } + + // Remote branch: delegate each holder's keys via RPC. The holder + // registers the promotion task with its own client_id, so Master's + // holder check passes without changing Master logic. Best-effort. + if (client_requester) { + for (auto &[endpoint, group_keys] : remote_keys) { + VLOG(1) << "SSD prefetch: delegating " << group_keys.size() + << " key(s) to remote holder " << endpoint; + client_requester->prefetch_offload_object( + endpoint, group_keys, remote_sizes[endpoint]); + } + } + }); +} + +void RealClient::runLocalPrefetch(const std::vector &keys, + const std::vector &sizes) { + auto throttle = prefetch_throttle_; + if (throttle && throttle->inCooldown()) { + VLOG(1) << "SSD prefetch: skipped (memory-pressure cooldown)"; + return; + } + + std::unordered_set allowed(keys.begin(), keys.end()); + if (throttle) { + auto reserved = throttle->reserve(keys); + allowed.clear(); + allowed.insert(reserved.begin(), reserved.end()); + if (allowed.empty()) { + return; + } + } + + std::vector keys_copy; + std::vector sizes_copy; + keys_copy.reserve(keys.size()); + sizes_copy.reserve(keys.size()); + for (size_t i = 0; i < keys.size(); ++i) { + if (allowed.find(keys[i]) == allowed.end()) { + continue; + } + keys_copy.push_back(keys[i]); + sizes_copy.push_back(i < sizes.size() ? sizes[i] + : static_cast(0)); + } + if (keys_copy.empty()) { + return; + } + + auto client = client_; + auto file_storage = file_storage_; + FileStorage::PrefetchKeyCallback on_key_done; + if (throttle) { + on_key_done = [throttle](const std::string &key, bool success) { + if (success) { + throttle->markCompleted(key); + } else { + throttle->markFailed(key); + } + }; + } + submitPrefetchJob([client, file_storage, throttle, on_key_done, + keys_copy = std::move(keys_copy), + sizes_copy = std::move(sizes_copy)]() { + std::vector local_keys; + std::vector local_sizes; + local_keys.reserve(keys_copy.size()); + local_sizes.reserve(keys_copy.size()); + for (size_t i = 0; i < keys_copy.size(); ++i) { + const int64_t size = + i < sizes_copy.size() ? sizes_copy[i] : static_cast(0); + if (size <= 0) { + continue; + } + local_keys.push_back(keys_copy[i]); + local_sizes.push_back(size); + } + RunLocalPrefetchRegisterAndPromote( + client, file_storage.get(), throttle, on_key_done, local_keys, + local_sizes); + }); +} + tl::expected RealClient::getSize_internal( const std::string &key) { if (!client_) { @@ -4367,7 +4861,6 @@ std::vector> RealClient::batch_get_into_internal(const std::vector &keys, const std::vector &buffers, const std::vector &sizes) { - auto start_time = std::chrono::steady_clock::now(); // Validate preconditions if (!client_) { LOG(ERROR) << "Client is not initialized"; @@ -4636,10 +5129,7 @@ RealClient::batch_get_into_internal(const std::vector &keys, store_segment_it->second.emplace(op_it.first, op_it.second.slices); } - size_t offload_object_count = 0; - auto start_read_store_time = std::chrono::steady_clock::now(); for (auto &offload_objects_it : offload_objects) { - offload_object_count += offload_objects_it.second.size(); auto batch_get_offload_result = batch_get_into_offload_object_internal( offload_objects_it.first, offload_objects_it.second); if (!batch_get_offload_result) { @@ -4653,19 +5143,6 @@ RealClient::batch_get_into_internal(const std::vector &keys, } } - auto end_time = std::chrono::steady_clock::now(); - auto elapsed_time = std::chrono::duration_cast( - end_time - start_time) - .count(); - auto read_store_time = - std::chrono::duration_cast( - end_time - start_read_store_time) - .count(); - // LOG(INFO) << "Time taken for batch_get_into: " << elapsed_time - // << "us, read store: " << read_store_time - // << "us, with memory key count: " << valid_operations.size() - // << ", offload key count: " << offload_object_count; - return results; } @@ -4924,8 +5401,83 @@ RealClient::batch_get_into_multi_buffers_internal( results.emplace_back(tl::unexpected(ErrorCode::INVALID_REPLICA)); continue; } + + // get()-side wait-for-prefetch (A2): poll until promotion completes or + // the configured budget expires (default 10 ms, 1 ms poll interval). + // Local throttle covers TP0; master Query covers TP1~7 where exist + // triggered prefetch in another process. + int64_t prefetch_trigger_ms = + prefetch_throttle_ ? prefetch_throttle_->triggeredAt(key) : -1; + int64_t prefetch_done_ms = + prefetch_throttle_ ? prefetch_throttle_->completedAt(key) : -1; + PrefetchThrottle::State prefetch_state = + prefetch_throttle_ ? prefetch_throttle_->stateOf(key) + : PrefetchThrottle::State::kTriggered; + bool prefetch_wait_attempted = false; + const char *prefetch_wait_mode = "none"; + std::optional refreshed_qr; + if (best_replica->is_local_disk_replica() && ssd_get_wait_ms_ > 0) { + prefetch_wait_attempted = true; + constexpr int64_t kPollMs = 1; + if (prefetch_trigger_ms >= 0 && prefetch_throttle_) { + prefetch_wait_mode = "local"; + if (prefetch_state != PrefetchThrottle::State::kCompleted) { + prefetch_throttle_->waitForCompletion(key, ssd_get_wait_ms_, + kPollMs); + } + prefetch_done_ms = prefetch_throttle_->completedAt(key); + prefetch_state = prefetch_throttle_->stateOf(key); + if (auto qr = TryRefreshBestMemoryReplica( + client_.get(), key, local_endpoints)) { + refreshed_qr.emplace(std::move(*qr)); + best_replica = SelectBestReplica(refreshed_qr->replicas, + local_endpoints); + if (prefetch_done_ms < 0) { + prefetch_done_ms = PrefetchThrottle::NowMs(); + } + } + } else { + prefetch_wait_mode = "master"; + const int64_t deadline = + PrefetchThrottle::NowMs() + ssd_get_wait_ms_; + while (PrefetchThrottle::NowMs() < deadline) { + if (auto qr = TryRefreshBestMemoryReplica( + client_.get(), key, local_endpoints)) { + refreshed_qr.emplace(std::move(*qr)); + best_replica = SelectBestReplica(refreshed_qr->replicas, + local_endpoints); + prefetch_done_ms = PrefetchThrottle::NowMs(); + break; + } + std::this_thread::sleep_for( + std::chrono::milliseconds(kPollMs)); + } + } + } + const auto replica = *best_replica; uint64_t total_size = calculate_total_size(replica); + const int64_t get_ms = PrefetchThrottle::NowMs(); + const PrefetchReplicaSource source = + PrefetchReplicaSourceFromDescriptor(replica); + const bool promote_attempted = + prefetch_throttle_ ? prefetch_throttle_->promoteAttempted(key) + : false; + const PrefetchOutcome outcome = ClassifyPrefetchOutcome( + prefetch_trigger_ms, prefetch_done_ms, get_ms, source, + prefetch_state, prefetch_wait_attempted, promote_attempted); + VLOG(1) << "[GET-SRC] key=" << key + << " source=" << PrefetchReplicaSourceToString(source) + << " size=" << total_size + << " prefetch_trigger_ms=" << prefetch_trigger_ms + << " prefetch_done_ms=" << prefetch_done_ms + << " get_ms=" << get_ms + << " prefetch_wait_mode=" << prefetch_wait_mode + << " prefetch_promote_attempted=" + << (promote_attempted ? "1" : "0") + << " path=multi_buffers" + << " [PREFETCH-OUTCOME] outcome=" + << PrefetchOutcomeToString(outcome); const auto &sizes = all_sizes[i]; uint64_t dst_total_size = 0; for (auto &size : sizes) { @@ -4972,7 +5524,8 @@ RealClient::batch_get_into_multi_buffers_internal( valid_operations.push_back( {.key = key, .original_index = i, - .query_result = FilterQueryResult(query_result_values, replica), + .query_result = FilterQueryResult( + refreshed_qr ? *refreshed_qr : query_result_values, replica), .slices = std::move(key_slices), .total_size = total_size}); // Set success result (actual bytes transferred) @@ -5580,6 +6133,23 @@ bool RealClient::release_offload_buffer(uint64_t batch_id) { return file_storage_->ReleaseBuffer(batch_id); } +bool RealClient::prefetch_offload_object(const std::vector &keys, + const std::vector &sizes) { + if (!file_storage_) { + VLOG(1) << "prefetch_offload_object called but file_storage_ is null"; + return false; + } + if (keys.empty()) { + return true; + } + VLOG(1) << "SSD prefetch: received remote prefetch request for " + << keys.size() << " key(s)"; + // We are the holder for these keys: promote SSD->DRAM locally. Runs on a + // detached thread so the coro_rpc IO thread is not blocked on SSD I/O. + runLocalPrefetch(keys, sizes); + return true; +} + tl::expected RealClient::batch_get_into_offload_object_internal( const std::string &target_rpc_service_addr, @@ -5696,6 +6266,24 @@ void ClientRequester::release_offload_buffer(const std::string &client_addr, } } +void ClientRequester::prefetch_offload_object( + const std::string &client_addr, const std::vector &keys, + const std::vector &sizes) { + // Best-effort delegation to the remote holder. Errors (holder down, + // network) are logged but never propagated: the caller's exist() must not + // block on prefetch, and get() can still fall back to a cross-node SSD + // read if the promotion did not land in time. + auto result = invoke_rpc<&RealClient::prefetch_offload_object, bool>( + client_addr, keys, sizes); + if (!result) { + VLOG(1) << "Failed to invoke prefetch_offload_object, client_addr = " + << client_addr << ", error is: " << result.error(); + } else { + VLOG(1) << "Delegated prefetch of " << keys.size() + << " key(s) to holder " << client_addr; + } +} + template tl::expected ClientRequester::invoke_rpc( const std::string &client_addr, Args &&...args) { diff --git a/mooncake-store/src/rpc_service.cpp b/mooncake-store/src/rpc_service.cpp index 441db11dc1..21350aca3b 100644 --- a/mooncake-store/src/rpc_service.cpp +++ b/mooncake-store/src/rpc_service.cpp @@ -992,6 +992,55 @@ WrappedMasterService::GetReplicaList(const std::string& key, }); } +tl::expected +WrappedMasterService::GetReplicaListForPrefetch(const std::string& key) { + ScopedVLogTimer timer(1, "GetReplicaListForPrefetch"); + timer.LogRequest("key=", key); + auto result = master_service_.GetReplicaListForPrefetch(key); + timer.LogResponseExpected(result); + return result; +} + +std::vector> +WrappedMasterService::BatchGetReplicaListForPrefetch( + const std::vector& keys) { + ScopedVLogTimer timer(1, "BatchGetReplicaListForPrefetch"); + const size_t total_keys = keys.size(); + timer.LogRequest("keys_count=", total_keys); + + std::vector> results; + results.reserve(keys.size()); + + for (const auto& key : keys) { + results.emplace_back(master_service_.GetReplicaListForPrefetch(key)); + } + + for (size_t i = 0; i < results.size(); ++i) { + if (!results[i].has_value()) { + auto error = results[i].error(); + if (error == ErrorCode::OBJECT_NOT_FOUND || + error == ErrorCode::REPLICA_IS_NOT_READY) { + VLOG(1) << "BatchGetReplicaListForPrefetch failed for key[" << i + << "] '" << keys[i] << "': " << toString(error); + } else { + LOG(ERROR) << "BatchGetReplicaListForPrefetch failed for key[" + << i << "] '" << keys[i] + << "': " << toString(error); + } + } + } + return results; +} + +tl::expected WrappedMasterService::RegisterPrefetchTask( + const UUID& client_id, const std::string& key) { + ScopedVLogTimer timer(1, "RegisterPrefetchTask"); + timer.LogRequest("client_id=", client_id, ", key=", key); + auto result = master_service_.RegisterPrefetchTask(client_id, key); + timer.LogResponseExpected(result); + return result; +} + std::vector> WrappedMasterService::BatchGetReplicaList(const std::vector& keys, const std::string& tenant_id) { @@ -2019,6 +2068,12 @@ void RegisterRpcService( &wrapped_master_service); server.register_handler<&mooncake::WrappedMasterService::GetReplicaList>( &wrapped_master_service); + server.register_handler< + &mooncake::WrappedMasterService::GetReplicaListForPrefetch>( + &wrapped_master_service); + server.register_handler< + &mooncake::WrappedMasterService::BatchGetReplicaListForPrefetch>( + &wrapped_master_service); server .register_handler<&mooncake::WrappedMasterService::BatchGetReplicaList>( &wrapped_master_service); @@ -2138,6 +2193,9 @@ void RegisterRpcService( &wrapped_master_service); server.register_handler<&mooncake::WrappedMasterService::FetchTasks>( &wrapped_master_service); + server.register_handler< + &mooncake::WrappedMasterService::RegisterPrefetchTask>( + &wrapped_master_service); server .register_handler<&mooncake::WrappedMasterService::MarkTaskToComplete>( &wrapped_master_service); diff --git a/mooncake-store/src/storage_backend.cpp b/mooncake-store/src/storage_backend.cpp index 0820170190..3a12db2f9f 100644 --- a/mooncake-store/src/storage_backend.cpp +++ b/mooncake-store/src/storage_backend.cpp @@ -1329,23 +1329,19 @@ tl::expected BucketStorageBackend::BatchOffload( LOG(ERROR) << "Failed to write bucket with id: " << bucket_id; return tl::make_unexpected(write_bucket_result.error()); } - if (complete_handler != nullptr) { - auto error_code = complete_handler(bucket->keys, metadatas); - if (error_code != ErrorCode::OK) { - LOG(ERROR) << "Complete handler failed: " << error_code - << ", Key count: " << bucket->keys.size() - << ", Bucket id: " << bucket_id; - return tl::make_unexpected(error_code); - } - } - // Commit to metadata maps under exclusive lock. - // Check for duplicate keys and rollback if any found. + const std::vector committed_keys = bucket->keys; + const int64_t bucket_data_size = bucket->data_size; + const int64_t bucket_meta_size = bucket->meta_size; + std::vector metadatas_for_notify = metadatas; + + // Commit local metadata before notifying master so concurrent BatchLoad + // never observes a LOCAL_DISK replica while object_bucket_map_ is empty. { SharedMutexLocker lock(&mutex_); // Pre-check for duplicates before modifying any state - for (const auto& key : bucket->keys) { + for (const auto& key : committed_keys) { if (object_bucket_map_.find(key) != object_bucket_map_.end()) { LOG(WARNING) << "Duplicate key detected in BatchOffload: " << key @@ -1358,21 +1354,36 @@ tl::expected BucketStorageBackend::BatchOffload( } // No duplicates found, safe to commit - total_size_ += bucket->data_size + bucket->meta_size; + total_size_ += bucket_data_size + bucket_meta_size; object_bucket_map_.reserve(object_bucket_map_.size() + - bucket->keys.size()); - for (size_t i = 0; i < bucket->keys.size(); ++i) { + committed_keys.size()); + for (size_t i = 0; i < committed_keys.size(); ++i) { auto [it, inserted] = object_bucket_map_.insert( - {bucket->keys[i], std::move(metadatas[i])}); + {committed_keys[i], std::move(metadatas[i])}); if (!inserted) { LOG(ERROR) << "Unexpected duplicate key after pre-check: " - << bucket->keys[i] << ", bucket_id=" << bucket_id; + << committed_keys[i] << ", bucket_id=" << bucket_id; } } buckets_.emplace(bucket_id, std::move(bucket)); lru_index_.emplace(0LL, bucket_id); } + if (complete_handler != nullptr) { + auto error_code = + complete_handler(committed_keys, metadatas_for_notify); + if (error_code != ErrorCode::OK) { + LOG(ERROR) << "Complete handler failed: " << error_code + << ", Key count: " << committed_keys.size() + << ", Bucket id: " << bucket_id + << ". Rolling back local metadata commit."; + RollbackCommittedBucket(bucket_id, committed_keys, bucket_data_size, + bucket_meta_size); + CleanupOrphanedBucket(bucket_id); + return tl::make_unexpected(error_code); + } + } + return bucket_id; } @@ -2126,6 +2137,24 @@ tl::expected BucketStorageBackend::WriteBucket( return {}; } +void BucketStorageBackend::RollbackCommittedBucket( + int64_t bucket_id, const std::vector& keys, + int64_t data_size, int64_t meta_size) { + SharedMutexLocker lock(&mutex_); + for (const auto& key : keys) { + object_bucket_map_.erase(key); + } + buckets_.erase(bucket_id); + for (auto it = lru_index_.begin(); it != lru_index_.end();) { + if (it->second == bucket_id) { + it = lru_index_.erase(it); + } else { + ++it; + } + } + total_size_ -= data_size + meta_size; +} + void BucketStorageBackend::CleanupOrphanedBucket(int64_t bucket_id) { namespace fs = std::filesystem; std::error_code ec; diff --git a/mooncake-store/src/store_c.cpp b/mooncake-store/src/store_c.cpp index a3a19086ef..8bd2bf7a62 100644 --- a/mooncake-store/src/store_c.cpp +++ b/mooncake-store/src/store_c.cpp @@ -58,6 +58,14 @@ mooncake::ReplicateConfig to_replicate_config( return config; } +mooncake::ExistOptions to_exist_options( + const mooncake_exist_options_t *c_options) { + mooncake::ExistOptions options; + if (!c_options) return options; + options.prefetch_to_memory = c_options->prefetch_to_memory != 0; + return options; +} + StoreHandle *as_handle(mooncake_store_t store) { return static_cast(store); } @@ -280,6 +288,42 @@ int mooncake_store_batch_is_exist(mooncake_store_t store, const char **keys, } } +int mooncake_store_is_exist_with_options( + mooncake_store_t store, const char *key, + const mooncake_exist_options_t *options) { + if (!store || !key) return -1; + try { + return as_client(store)->isExist(std::string(key), + to_exist_options(options)); + } catch (...) { + return -1; + } +} + +int mooncake_store_batch_is_exist_with_options( + mooncake_store_t store, const char **keys, size_t count, int *results_out, + const mooncake_exist_options_t *options) { + if (!store || !keys || !results_out) return -1; + for (size_t i = 0; i < count; ++i) { + if (!keys[i]) return -1; + } + try { + std::vector key_vec; + key_vec.reserve(count); + for (size_t i = 0; i < count; ++i) { + key_vec.emplace_back(keys[i]); + } + auto results = + as_client(store)->batchIsExist(key_vec, to_exist_options(options)); + for (size_t i = 0; i < count; ++i) { + results_out[i] = (i < results.size()) ? results[i] : -1; + } + return 0; + } catch (...) { + return -1; + } +} + int64_t mooncake_store_get_size(mooncake_store_t store, const char *key) { if (!store || !key) return -1; try { diff --git a/mooncake-wheel/tests/test_prefetch_on_exist.py b/mooncake-wheel/tests/test_prefetch_on_exist.py new file mode 100644 index 0000000000..064e996b6e --- /dev/null +++ b/mooncake-wheel/tests/test_prefetch_on_exist.py @@ -0,0 +1,509 @@ +"""Python-binding test for the SSD-to-DRAM prefetch-on-exist feature (RFC #2213). + +Validates that ``is_exist(key, ExistOptions(prefetch_to_memory=True))`` / +``batch_is_exist(keys, options)`` triggers asynchronous SSD-to-DRAM promotion +via the dedicated ``FileStorage::PrefetchKeys`` path (not promotion-on-hit +heartbeat queue). + +Test scenario: + 1. Push enough data to overflow DRAM, forcing eviction + offload to turn + warm keys into LOCAL_DISK-only objects. + 2. Identify a LOCAL_DISK-only key from the replica descriptors. + 3. Call ``is_exist(key, options)`` with ``prefetch_to_memory=True``. + 4. Wait for async ``PrefetchKeys`` (thread-pool + batched metadata query). + 5. Assert the key now has a MEMORY replica (prefetch succeeded). + 6. Assert that a subsequent ``get`` serves from MEMORY (not SSD). + +Prerequisites: + - ``mooncake_master`` running with: + ``--enable_offload=true`` + ``--offload_on_evict=true`` + ``--root_fs_dir=`` + - ``MOONCAKE_OFFLOAD_FILE_STORAGE_PATH=`` set on the client side + (or pass ``ssd_offload_path`` via setup). + - ``MOONCAKE_OFFLOAD_BUCKET_KEYS_LIMIT=10`` and + ``MOONCAKE_OFFLOAD_BUCKET_SIZE_LIMIT_BYTES=10485760`` on the client. +""" + +import os +import statistics +import time +import unittest + +try: + import torch_npu + + torch_npu.npu.set_device(int(os.getenv("NPU_DEVICE_ID", "0"))) +except ImportError: + pass + +from mooncake.store import ExistOptions, MooncakeDistributedStore + + +def _prefetch_options(enabled: bool = True) -> ExistOptions: + options = ExistOptions() + options.prefetch_to_memory = enabled + return options + + +DEFAULT_KV_LEASE_TTL = 5000 # ms +default_kv_lease_ttl = int(os.getenv("DEFAULT_KV_LEASE_TTL", DEFAULT_KV_LEASE_TTL)) + +SEGMENT_SIZE = int(os.getenv("SEGMENT_SIZE_BYTES", str(32 * 1024 * 1024))) +LOCAL_BUFFER_SIZE = int(os.getenv("LOCAL_BUFFER_SIZE_BYTES", str(64 * 1024 * 1024))) + +# Async prefetch runs on a bounded thread pool with batched metadata queries; +# poll briefly instead of relying on a single fixed sleep. +PREFETCH_WAIT_SECONDS = int(os.getenv("PREFETCH_WAIT_SECONDS", "15")) +PREFETCH_POLL_INTERVAL_SECONDS = float( + os.getenv("PREFETCH_POLL_INTERVAL_SECONDS", "0.2") +) +EVICTION_WAIT_SECONDS = int(os.getenv("EVICTION_WAIT_SECONDS", "25")) + + +def setup_store(store, local_hostname=None): + protocol = os.getenv("PROTOCOL", "tcp") + device_name = os.getenv("DEVICE_NAME", "") + if local_hostname is None: + local_hostname = os.getenv("LOCAL_HOSTNAME", "127.0.0.1") + metadata_server = os.getenv("MC_METADATA_SERVER", "P2PHANDSHAKE") + master_server_address = os.getenv("MASTER_SERVER", "127.0.0.1:50051") + ssd_offload_path = os.getenv("MOONCAKE_OFFLOAD_FILE_STORAGE_PATH", "") + tenant_id = os.getenv("MOONCAKE_TENANT_ID", "default") + prefetch_cooldown_sec = int(os.getenv("SSD_PREFETCH_COOLDOWN_SEC", "5")) + prefetch_dedup_ttl_sec = int(os.getenv("SSD_PREFETCH_DEDUP_TTL_SEC", "30")) + + retcode = store.setup( + local_hostname, + metadata_server, + SEGMENT_SIZE, + LOCAL_BUFFER_SIZE, + protocol, + device_name, + master_server_address, + None, # engine + True, # enable_ssd_offload + ssd_offload_path, + tenant_id, + prefetch_cooldown_sec, + prefetch_dedup_ttl_sec, + ) + if retcode: + raise RuntimeError(f"Failed to setup store client. Return code: {retcode}") + + +def _replica_types(descs, key): + infos = descs.get(key) if isinstance(descs, dict) else None + if infos is None: + return [] + if not isinstance(infos, (list, tuple)): + infos = [infos] + tags = [] + for info in infos: + if hasattr(info, "is_memory_replica") and info.is_memory_replica(): + tags.append("MEMORY") + elif hasattr(info, "is_local_disk_replica") and info.is_local_disk_replica(): + tags.append("LOCAL_DISK") + elif hasattr(info, "is_disk_replica") and info.is_disk_replica(): + tags.append("DISK") + else: + tags.append("UNKNOWN") + return tags + + +def _find_cold_key(store, keys): + """Find a key that exists only on LOCAL_DISK (no MEMORY replica).""" + descs = store.batch_get_replica_desc(list(keys)) + for key in keys: + types = _replica_types(descs, key) + if types and all("MEMORY" not in t for t in types) and any("LOCAL_DISK" in t for t in types): + return key + return None + + +def _find_cold_keys(store, keys): + """Return all LOCAL_DISK-only keys from *keys*.""" + descs = store.batch_get_replica_desc(list(keys)) + cold_keys = [] + for key in keys: + types = _replica_types(descs, key) + if types and all("MEMORY" not in t for t in types) and any("LOCAL_DISK" in t for t in types): + cold_keys.append(key) + return cold_keys + + +def _count_memory_replicas(store, keys): + descs = store.batch_get_replica_desc(list(keys)) + return sum(1 for key in keys if "MEMORY" in _replica_types(descs, key)) + + +def _wait_for_memory_replicas( + store, + keys, + min_promoted=1, + timeout_seconds=PREFETCH_WAIT_SECONDS, +): + """Poll replica descriptors until at least *min_promoted* keys have MEMORY.""" + deadline = time.time() + timeout_seconds + promoted = 0 + while time.time() < deadline: + promoted = _count_memory_replicas(store, keys) + if promoted >= min_promoted: + return promoted + time.sleep(PREFETCH_POLL_INTERVAL_SECONDS) + return promoted + + +def _overflow_dram(store, num_keys=96, value_size=1024 * 1024): + """Put enough data to overflow DRAM and trigger offload-on-evict. + Returns {key: value} for successfully stored keys.""" + timestamp = int(time.time()) + keys = [f"prefetch_{i}_{timestamp}" for i in range(num_keys)] + reference = {} + for key in keys: + value = os.urandom(value_size) + if store.put(key, value) == 0: + reference[key] = value + return reference + + +class TestPrefetchOnExist(unittest.TestCase): + """Validates that is_exist(prefetch=True) triggers SSD→DRAM promotion.""" + + @classmethod + def setUpClass(cls): + cls.store = MooncakeDistributedStore() + setup_store(cls.store) + + def test_single_key_prefetch(self): + """is_exist(key, prefetch=True) should promote a LOCAL_DISK-only key.""" + reference = _overflow_dram(self.store) + self.assertGreater(len(reference), 0, "No PUTs succeeded") + + try: + # Wait for eviction + offload to create LOCAL_DISK replicas + time.sleep(EVICTION_WAIT_SECONDS) + + cold_key = _find_cold_key(self.store, reference.keys()) + self.assertIsNotNone( + cold_key, + "No LOCAL_DISK-only key found. Is offload_on_evict=true " + "and the segment small enough to overflow?", + ) + print(f"Found LOCAL_DISK-only key: {cold_key}") + + # Trigger prefetch via is_exist + result = self.store.is_exist(cold_key, _prefetch_options()) + self.assertEqual(result, 1, "is_exist should return 1 for existing key") + + # Wait for async prefetch (batched metadata + thread pool) + promoted = _wait_for_memory_replicas(self.store, [cold_key], min_promoted=1) + descs = self.store.batch_get_replica_desc([cold_key]) + types = _replica_types(descs, cold_key) + print(f"Replica types after prefetch: {types} (promoted={promoted})") + self.assertIn( + "MEMORY", + types, + f"Expected a MEMORY replica for {cold_key} after prefetch, " + f"got types={types}. Prefetch may have failed or not triggered.", + ) + + # Verify the data is correct and served from MEMORY (not SSD) + offload_count_before = self.store.get_offload_rpc_read_count() + got = self.store.get(cold_key) + offload_count_after = self.store.get_offload_rpc_read_count() + self.assertEqual( + got, + reference[cold_key], + "Data mismatch after prefetch", + ) + self.assertEqual( + offload_count_after, + offload_count_before, + f"Post-prefetch read still hit LOCAL_DISK (offload RPC count " + f"went from {offload_count_before} to {offload_count_after}). " + f"MEMORY replica may not be preferred.", + ) + print("PASS: is_exist(prefetch=True) successfully promoted key to DRAM") + finally: + for key in reference: + try: + self.store.remove(key) + except Exception: + pass + time.sleep(default_kv_lease_ttl / 1000 + 0.5) + + def test_batch_prefetch(self): + """batch_is_exist(keys, prefetch=True) should promote LOCAL_DISK-only keys.""" + reference = _overflow_dram(self.store) + self.assertGreater(len(reference), 0, "No PUTs succeeded") + + try: + time.sleep(EVICTION_WAIT_SECONDS) + + all_keys = list(reference.keys()) + cold_keys = _find_cold_keys(self.store, all_keys) + + self.assertGreater(len(cold_keys), 0, "No LOCAL_DISK-only keys found") + print(f"Found {len(cold_keys)} LOCAL_DISK-only keys for batch prefetch") + + # Trigger batch prefetch + results = self.store.batch_is_exist(cold_keys, _prefetch_options()) + self.assertEqual(len(results), len(cold_keys)) + for r in results: + self.assertEqual(r, 1, "batch_is_exist should return 1 for existing keys") + + promoted_count = _wait_for_memory_replicas( + self.store, cold_keys, min_promoted=1 + ) + print(f"Promoted {promoted_count}/{len(cold_keys)} keys to DRAM") + self.assertGreater( + promoted_count, + 0, + f"No keys were promoted after batch_is_exist(prefetch=True). " + f"Prefetch may not be triggering.", + ) + finally: + for key in reference: + try: + self.store.remove(key) + except Exception: + pass + time.sleep(default_kv_lease_ttl / 1000 + 0.5) + + def test_prefetch_false_does_not_promote(self): + """is_exist(key, prefetch=False) should NOT trigger promotion.""" + reference = _overflow_dram(self.store) + self.assertGreater(len(reference), 0) + + try: + time.sleep(EVICTION_WAIT_SECONDS) + + cold_key = _find_cold_key(self.store, reference.keys()) + if cold_key is None: + self.skipTest("No LOCAL_DISK-only key found; cannot run negative test") + + # Call without prefetch + result = self.store.is_exist(cold_key, _prefetch_options(False)) + self.assertEqual(result, 1) + + # Short wait; if prefetch were triggered, it would complete quickly + time.sleep(5) + + descs = self.store.batch_get_replica_desc([cold_key]) + types = _replica_types(descs, cold_key) + self.assertNotIn( + "MEMORY", + types, + f"Key {cold_key} got a MEMORY replica after is_exist(prefetch=False). " + f"Prefetch should NOT fire when prefetch=False.", + ) + print("PASS: is_exist(prefetch=False) correctly did NOT trigger promotion") + finally: + for key in reference: + try: + self.store.remove(key) + except Exception: + pass + time.sleep(default_kv_lease_ttl / 1000 + 0.5) + + def test_prefetch_memory_key_is_noop(self): + """Prefetch on a key already in MEMORY should be a no-op.""" + key = f"mem_only_{int(time.time())}" + value = os.urandom(1024) + try: + self.assertEqual(self.store.put(key, value), 0) + + # Key should be in MEMORY + result = self.store.is_exist(key, _prefetch_options()) + self.assertEqual(result, 1) + + # Verify it's still just MEMORY (no extra replicas) + descs = self.store.batch_get_replica_desc([key]) + types = _replica_types(descs, key) + self.assertIn("MEMORY", types) + print(f"PASS: prefetch on MEMORY key is no-op, types={types}") + finally: + self.store.remove(key) + time.sleep(default_kv_lease_ttl / 1000 + 0.5) + + +@unittest.skipUnless( + os.getenv("MC_BENCH_PREFETCH_LATENCY"), + "opt-in benchmark — set MC_BENCH_PREFETCH_LATENCY=1 to run.", +) +class BenchPrefetchLatency(unittest.TestCase): + """Latency benchmark comparing reads before and after prefetch. + + Measures: + - Phase A: LOCAL_DISK read latency (before prefetch) + - Phase B: MEMORY read latency (after prefetch via is_exist) + - Speedup: phase_A / phase_B at p50/p95/p99 + """ + + @classmethod + def setUpClass(cls): + cls.store = MooncakeDistributedStore() + setup_store(cls.store) + + def test_latency_comparison(self): + reference = _overflow_dram(self.store) + self.assertGreater(len(reference), 0) + + try: + time.sleep(EVICTION_WAIT_SECONDS) + + cold_key = _find_cold_key(self.store, reference.keys()) + self.assertIsNotNone(cold_key) + expected_bytes = reference[cold_key] + + N = int(os.getenv("LATENCY_SAMPLES", "200")) + + # Phase A: timed reads from LOCAL_DISK (before prefetch) + pre_latencies_ms = [] + for _ in range(N): + t0 = time.perf_counter() + got = self.store.get(cold_key) + t1 = time.perf_counter() + self.assertEqual(got, expected_bytes) + pre_latencies_ms.append((t1 - t0) * 1000.0) + + # Trigger prefetch + self.store.is_exist(cold_key, _prefetch_options()) + promoted = _wait_for_memory_replicas(self.store, [cold_key], min_promoted=1) + + # Verify promotion happened + descs = self.store.batch_get_replica_desc([cold_key]) + types = _replica_types(descs, cold_key) + self.assertIn( + "MEMORY", + types, + f"Prefetch did not create MEMORY replica (promoted={promoted})", + ) + + # Phase B: timed reads from MEMORY (after prefetch) + post_latencies_ms = [] + for _ in range(N): + t0 = time.perf_counter() + got = self.store.get(cold_key) + t1 = time.perf_counter() + self.assertEqual(got, expected_bytes) + post_latencies_ms.append((t1 - t0) * 1000.0) + + def _pcts(samples): + if len(samples) >= 2: + cuts = statistics.quantiles(samples, n=100) + return {50: cuts[49], 95: cuts[94], 99: cuts[98]} + v = samples[0] if samples else 0.0 + return {50: v, 95: v, 99: v} + + pre = _pcts(pre_latencies_ms) + post = _pcts(post_latencies_ms) + + print() + print(f"=== prefetch-on-exist latency comparison (key={cold_key}) ===") + print( + f"BEFORE prefetch (LOCAL_DISK): n={len(pre_latencies_ms)}, " + f"p50={pre[50]:.2f}ms, p95={pre[95]:.2f}ms, p99={pre[99]:.2f}ms, " + f"min={min(pre_latencies_ms):.2f}ms, max={max(pre_latencies_ms):.2f}ms" + ) + print( + f"AFTER prefetch (MEMORY): n={len(post_latencies_ms)}, " + f"p50={post[50]:.2f}ms, p95={post[95]:.2f}ms, p99={post[99]:.2f}ms, " + f"min={min(post_latencies_ms):.2f}ms, max={max(post_latencies_ms):.2f}ms" + ) + speedup_50 = pre[50] / post[50] if post[50] > 0 else float("inf") + speedup_95 = pre[95] / post[95] if post[95] > 0 else float("inf") + speedup_99 = pre[99] / post[99] if post[99] > 0 else float("inf") + print(f"speedup: p50={speedup_50:.1f}x | p95={speedup_95:.1f}x | p99={speedup_99:.1f}x") + + self.assertLess( + post[50], + pre[50] / 1.3, + f"Post-prefetch p50 ({post[50]:.2f}ms) not meaningfully lower than " + f"pre-prefetch p50 ({pre[50]:.2f}ms). MEMORY may not be preferred.", + ) + finally: + for key in reference: + try: + self.store.remove(key) + except Exception: + pass + time.sleep(default_kv_lease_ttl / 1000 + 0.5) + + +@unittest.skipUnless( + os.getenv("MC_TEST_CROSS_NODE"), + "opt-in cross-node test — set MC_TEST_CROSS_NODE=1 and provide two " + "endpoints via NODE_A_HOSTNAME / NODE_B_HOSTNAME (e.g. ip:port).", +) +class TestCrossNodePrefetch(unittest.TestCase): + """Validates方案B: cross-node SSD prefetch delegation to the holder node. + + Scenario (two store clients sharing one Master): + 1. Node A puts keys and overflows DRAM so they are evicted to A's SSD. + A is therefore the LOCAL_DISK holder (replica transport_endpoint == A). + 2. Node B calls batch_is_exist(keys, prefetch_to_memory=True). Because the + holder endpoint != B's own endpoint, B delegates the promotion to A via + the prefetch_offload_object RPC instead of calling RegisterPrefetchTask + locally (which would fail Master's holder check with INVALID_PARAMS). + 3. A registers the promotion task with its own client_id and stages the + object SSD->DRAM. The key gains a MEMORY replica. + + Requires a running master with --enable_offload=true --offload_on_evict=true + and two reachable endpoints. Both stores run in this process but bind + distinct offload RPC servers / segments. + """ + + @classmethod + def setUpClass(cls): + node_a = os.getenv("NODE_A_HOSTNAME", "127.0.0.1:17001") + node_b = os.getenv("NODE_B_HOSTNAME", "127.0.0.1:17002") + cls.store_a = MooncakeDistributedStore() + cls.store_b = MooncakeDistributedStore() + setup_store(cls.store_a, local_hostname=node_a) + setup_store(cls.store_b, local_hostname=node_b) + + def test_remote_holder_prefetch(self): + # Node A populates and overflows so keys land as LOCAL_DISK-only on A. + reference = _overflow_dram(self.store_a) + self.assertGreater(len(reference), 0, "No PUTs succeeded on node A") + + try: + time.sleep(EVICTION_WAIT_SECONDS) + + all_keys = list(reference.keys()) + cold_keys = _find_cold_keys(self.store_a, all_keys) + self.assertGreater( + len(cold_keys), 0, "No LOCAL_DISK-only keys produced on node A" + ) + print(f"Node A holds {len(cold_keys)} LOCAL_DISK-only key(s)") + + # Node B triggers prefetch. B must delegate to holder A via RPC. + results = self.store_b.batch_is_exist(cold_keys, _prefetch_options()) + self.assertEqual(len(results), len(cold_keys)) + for r in results: + self.assertEqual(r, 1, "batch_is_exist should return 1") + + promoted = _wait_for_memory_replicas( + self.store_b, cold_keys, min_promoted=1 + ) + print(f"Cross-node promoted {promoted}/{len(cold_keys)} key(s) to DRAM") + self.assertGreater( + promoted, + 0, + "No keys were promoted via cross-node delegation. Check that the " + "holder received prefetch_offload_object and no INVALID_PARAMS " + "(RegisterPrefetchTask holder check) was logged.", + ) + finally: + for key in reference: + try: + self.store_a.remove(key) + except Exception: + pass + time.sleep(default_kv_lease_ttl / 1000 + 0.5) + + +if __name__ == "__main__": + unittest.main()