From d52fb53982686d4124b4a8bb7f382d729ad8cc18 Mon Sep 17 00:00:00 2001
From: h30027576
Date: Sat, 27 Jun 2026 10:49:22 +0800
Subject: [PATCH 1/2] [Feature]Prefetch SSD-Only Objects to DRAM on Exist

Signed-off-by: h30027576
---
 docs/source/design/ssd-prefetch.md       | 331 +++++++++++++
 mooncake-integration/store/store_py.cpp  |  57 ++-
 mooncake-store/include/client_service.h  |  17 +
 mooncake-store/include/dummy_client.h    |  28 +-
 mooncake-store/include/file_storage.h    |  28 ++
 mooncake-store/include/master_client.h   |  32 ++
 mooncake-store/include/master_service.h  |  22 +
 mooncake-store/include/pyclient.h        |  26 +-
 mooncake-store/include/real_client.h     | 297 ++++++++++-
 mooncake-store/include/replica.h         |  16 +
 mooncake-store/include/rpc_service.h     |   9 +
 mooncake-store/include/storage_backend.h |   8 +
 mooncake-store/include/store_c.h         |  13 +
 mooncake-store/include/types.h           |  17 +
 mooncake-store/src/client_service.cpp    |  51 ++
 mooncake-store/src/dummy_client.cpp      |  14 +-
 mooncake-store/src/file_storage.cpp      | 122 +++++
 mooncake-store/src/master_client.cpp     |  49 ++
 mooncake-store/src/master_service.cpp    |  91 ++++
 mooncake-store/src/real_client.cpp       | 602 ++++++++++++++++++++++-
 mooncake-store/src/rpc_service.cpp       |  58 +++
 mooncake-store/src/storage_backend.cpp   |  63 ++-
 mooncake-store/src/store_c.cpp           |  44 ++
 23 files changed, 1918 insertions(+), 77 deletions(-)
 create mode 100644 docs/source/design/ssd-prefetch.md

diff --git a/docs/source/design/ssd-prefetch.md b/docs/source/design/ssd-prefetch.md
new file mode 100644
index 0000000000..a48ab75886
--- /dev/null
+++ b/docs/source/design/ssd-prefetch.md
@@ -0,0 +1,331 @@
+# SSD Prefetch-on-Exist Design Document
+
+> Related: RFC #2213 (prefetch-on-exist), PR #2071 (L2→L1 promotion-on-hit)
+> Applicable branch: `Mooncake` (main). This feature was first implemented and validated on v0.3.11 and has since been synced to main.
+
+
+## 1.
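Background and Goals
+
+### 1.1 Problem
+In vLLM-Ascend's three-tier KV pooling setup (HBM / DRAM / SSD), KV cache entries are evicted (offloaded) to SSD as `LOCAL_DISK` replicas when DRAM runs short. When a request hits a key that exists **only on SSD and not in DRAM**, `get()` has to read from SSD, and its latency is significantly higher than a DRAM hit.
+
+vLLM processes requests as a multi-stage pipeline. Between `exists()` (the cache probe in the scheduling stage) and `get()` (where the worker actually loads the KV) there is a time window (measured median 15~17s, amplified by scheduling queueing):
+
+```
+Stage 1: Scheduler                 Stage 2: forward pass of current batch    Stage 3: Worker loads KV
+  get_num_new_matched_tokens()       GPU/NPU forward (time-consuming)          start_load_kv()
+    └─ batch_is_exist() ◄─ probe                                                 └─ get() ◄─ actual data read
+         │                                                                            │
+         └────────── SSD→DRAM can be warmed quietly within this window ──────────────┘
+```
+
+### 1.2 Goal
+Use the window between `exists()` and `get()` to prefetch (promote) SSD-only keys into DRAM during the probe stage, **asynchronously and on a best-effort basis**, so that the subsequent `get()` hits DRAM and TTFT drops.
+
+### 1.3 Design constraints
+- Prefetch must be an **add-on action** of `exists()`; it must not change the semantics of `exists()` or block scheduling.
+- It must not pollute the behavior of the existing promotion-on-hit / eviction / metrics systems.
+- Failures may be dropped (best-effort) and must never affect normal offload / get.
+- It must support **PD disaggregation / cross-node** operation: when node A, which initiates the prefetch, finds that the key's `LOCAL_DISK` replica lives on node B, B should read from its own SSD into its own DRAM ("Option B: cross-node delegation") rather than moving the data to A first.
+
+## 2. Current Implementation (Adopted)
+
+### 2.1 Overview: a dedicated prefetch path that reuses promotion's "execution substrate" and bypasses promotion-on-hit's "admission and pacing"
+
+```
+is_exist(keys, ExistOptions{prefetch_to_memory=true})
+  └─ triggerSsdPrefetch(keys) → prefetch_pool_ (fixed size 4)
+       └─ per chunk (128 keys): BatchQueryForPrefetch(chunk)   # batched read-only metadata, 1 RPC per chunk
+            └─ ClassifySsdPrefetchRoute: keep SSD-only keys (LOCAL_DISK present, no MEMORY) with size>0
+                 ├─ LOCAL_DISK held by this node: RunLocalPrefetchRegisterAndPromote(chunk_local)   # register + PrefetchKeys immediately (pipelined, does not wait for later chunks)
+                 └─ held by a remote node: collect into remote_keys; after all chunks finish, delegate to the holder via the prefetch_offload_object RPC
+                      └─ holder side: runLocalPrefetch → RunLocalPrefetchRegisterAndPromote   # completes locally on the holder
+```
+
+**Phase 1 batching + pipelining (2026-06)**: The early implementation called `QueryForPrefetch` for each key individually and **waited until the metadata of the whole batch had been queried** before running `RegisterPrefetchTask` + `PrefetchKeys` in one go. With large batches (e.g. 954 keys), the first register could trail the first `get()` by roughly a hundred milliseconds, which inflated `prefetch_miss_race`. It now works as follows:
+- **Batching**: `BatchQueryForPrefetch` / `BatchGetReplicaListForPrefetch`, at most 128 keys per chunk; the RPC count drops from O(N) to O(N/128).
+- **Pipelining**: as soon as a chunk's metadata has been queried, register + promote run **immediately** for that chunk's local keys, so the register of one chunk no longer waits on the other chunks.
+- **defer reserve**: `PrefetchThrottle::reserve()` moved from the synchronous exist path to **after BatchQuery confirms the key is SSD-only**, which avoids miscounting "key already in DRAM but trigger still recorded" and avoids wasted async work.
+
+`RunLocalPrefetchRegisterAndPromote` encapsulates the local promotion logic "per-key RegisterPrefetchTask → markInFlight → PrefetchKeys → cooldown". It is shared by each chunk in `triggerSsdPrefetch` and by `runLocalPrefetch` (the RPC delegation entry point).
+
+`PrefetchKeys()` internally reuses Mooncake's existing promotion execution chain:
+`PromotionAllocStart` (master allocates a PROCESSING MEMORY replica) → `FileStorage::AllocateBatch` (local staging) → `BatchLoad` (read from SSD) → `PromotionWrite` (Transfer Engine writes to DRAM) → `NotifyPromotionSuccess` (marks the MEMORY replica COMPLETE).
+
+### 2.2 Key design point: why a "dedicated path" instead of reusing promotion-on-hit directly
+
+Prefetch **reuses only promotion's execution substrate** (`promotion_tasks` / `PromotionAllocStart` / `PromotionWrite`) and **does not reuse promotion-on-hit's triggering and scheduling**. To that end, dedicated prefetch master RPCs were added:
+
+| RPC | Purpose | Key point: what it does not do |
+|---|---|---|
+| `GetReplicaListForPrefetch(key)` | Reads replica metadata only, to decide whether the key is SSD-only | Does **not** grant a lease, does **not** update the sketch, does **not** increment metrics such as `valid_get_nums`, does **not** enqueue into the promotion queue |
+| `BatchGetReplicaListForPrefetch(keys)` | Same as above, batched; wrapped on the client side by `BatchQueryForPrefetch` | Same as above; the master side currently loops over `GetReplicaListForPrefetch` (same pattern as `BatchGetReplicaList`) and can later be optimized into a true batch |
+| `RegisterPrefetchTask(client_id, key)` | Registers a `promotion_tasks` entry on the master (consumed by `PromotionAllocStart`) | Does **not** go through the `TryPushPromotionQueue` admission gate, does **not** push to the holder's `promotion_objects` heartbeat queue |
+
+The difference is visible when compared with the native `GetReplicaList` (main, master_service.cpp): the native path calls `inc_*_cache_hit_nums` / `inc_valid_get_nums` / `GrantLease` / `TryPushPromotionQueue`, whereas `GetReplicaListForPrefetch` does none of these.
+
+### 2.3 Throttling: preventing prefetch storms (cooldown + dedup)
+vLLM's scheduling busy-loop **repeatedly probes** the same batch of SSD-only blocks for the same request in the waiting queue, which triggers prefetch at high frequency. Two client-side throttling parameters (`PrefetchThrottle`) are introduced for this:
+
+| Parameter (mooncake.json, in seconds) | Default | Meaning |
+|---|---|---|
+| `ssd_prefetch_dedup_ttl_sec` | 30 | A given key triggers prefetch at most once within this TTL, suppressing concurrent duplicate probes (**the main effective knob**) |
+| `ssd_prefetch_cooldown_sec` | 5 | Backoff window after DRAM saturation: prefetch is skipped within the window so that eviction/offload can reclaim memory first, avoiding contention with promotion for memory |
+
+> In measurements, `dedup_ttl` was the main factor in eliminating the prefetch storm and resolving offload starvation; `cooldown` was not triggered at this dataset scale (DRAM was not filled and offload kept up).
+
+### 2.4 Concurrency bound: fixed thread pool
+`triggerSsdPrefetch` / `runLocalPrefetch` use a fixed-size `prefetch_pool_` (`mooncake::ThreadPool`, size 4, aligned with `ClientService::task_thread_pool_(4)`), replacing the earlier approach of one detached `std::thread` per call. This **caps the number of concurrent SSD reads + DRAM allocations** and prevents masses of detached threads from contending for the same physical SSD's IOPS/bandwidth and flooding the master with DRAM requests.
+
+`submitPrefetchJob` is the single entry into the pool. When the pool is unavailable (uninitialized / shutting down), the **best-effort task is simply dropped and never degrades to a detached `std::thread`**, which is exactly the anti-pattern behind the B1 prefetch storm. Gating ensures that `triggerSsdPrefetch` fires only when `file_storage_` exists, and `initPrefetchRuntime()` is set up in the same `enable_ssd_offload` branch as `file_storage_`, so the pool-unavailable branch is unreachable in normal operation; dropping serves only as an invariant safety net.
+
+#### Thread usage rules (to be followed by this feature and all later changes)
+Following Mooncake's native design, threads fall into two classes, and **creating threads in proportion to load (per request / per key) is strictly forbidden**:
+
+| Class | Applicable scenarios | Correct practice | Native Mooncake precedent |
+|---|---|---|---|
+| A. Long-lived daemon loops with fixed roles | Singleton background loops such as eviction, heartbeat, monitor, ipc server, GC | One dedicated `std::thread`/`jthread` per role, stored as a member and resident for the process lifetime (thread count = O(number of roles), a small constant independent of load) | eviction/monitor/cleanup/dispatch in `master_service.cpp`; leader monitor/storage heartbeat/task poll in `client_service.cpp`; heartbeat/GC in `file_storage.cpp` |
+| B. High-frequency short tasks that grow with load | Work whose volume scales with request/key count, such as prefetch promotion and segmented retries | Always go through a fixed-size `ThreadPool::enqueue` to cap concurrency; bare threads are **not allowed**; when the pool is full/unavailable, drop or queue, never bypass the cap | `task_thread_pool_(4)` in `client_service.cpp` |
+
+> Rule of thumb: does the thread count grow with the request/key count? If yes → ThreadPool is mandatory (class B); if no, and it is a fixed-role resident loop → use a dedicated `std::thread` member (class A). Prefetch is class B, hence `prefetch_pool_`. The root cause of B1 was applying the class A technique (a detached thread per probe) to class B work.
+
+### 2.5 get()-side "wait once" mechanism
+If `get()` hits an SSD-only key (`LOCAL_DISK` is the best replica), it can poll within a budget until promotion completes and then pick the DRAM replica:
+- Config item `ssd_get_wait_ms` (mooncake.json) or environment variable `MOONCAKE_SSD_GET_WAIT_MS` (default **10ms**; `0` = off).
+- The observation and wait logic is hooked into **`batch_get_into_multi_buffers_internal`** (the batch get path vLLM-Ascend actually uses) and emits combined `[GET-SRC]` + `[PREFETCH-OUTCOME]` logs.
+- **TP0 (local, `prefetch_wait_mode=local`)**: when this process's `PrefetchThrottle` has a trigger record, poll the throttle completion state + `TryRefreshBestMemoryReplica`, then Query the master.
+- **TP1~7 (master, `prefetch_wait_mode=master`)**: when this process has no trigger (exist fired on TP0, get runs on another rank), still **poll Query on the master** for SSD-only keys until a COMPLETE MEMORY replica appears or the budget is exhausted; this does not increase the number of exist calls.
+- After waiting, if a MEMORY replica has appeared, read from DRAM; otherwise fall back to the original SSD read path.
+- `ClassifyPrefetchOutcome` outputs (`[PREFETCH-OUTCOME]`, see the metric notes in the analysis script `analyze_prefetch.sh`):
+  - `prefetch_hit`: promotion completed before get (`done_ms<=get_ms`) and source=DRAM
+  - `prefetch_promoted_untracked`: RegisterPrefetchTask was issued (the key was SSD-only at BatchQuery time) and the key is already in DRAM at get time, but done was not recorded in the throttle
+  - `prefetch_miss_race`: **prefetch failed / timed out**: this rank has triggered (`prefetch_trigger_ms>=0`) or `promote_attempted=true`, yet get still has source=SSD
+  - `prefetch_evicted_after_exist`: **SSD read after eviction**: this rank has no trigger and `promote_attempted=false`, and get still has source=SSD (the key may have been in DRAM at exist time and evicted within the window; or the TP1~7 master wait timed out)
+  - `prefetch_dram_was_resident` (labeled **exist误trigger**, "exist mis-trigger", in the script): get reads DRAM, but the key was in DRAM all along and this rank did **not** RegisterPrefetchTask; it entered the prefetch wait path by mistake. **Should be ≈0 after defer reserve**
+  - `dram_resident`: get reads DRAM and the key **never** took part in prefetch (no trigger, no wait, `prefetch_wait_mode=none`)
+  - `prefetch_failed`: `PrefetchKeys` promotion failed (throttle state `kFailed`)
+  - ~~`ssd_no_prefetch`~~: merged into `prefetch_evicted_after_exist` (may still appear in old logs)
+- The `[GET-SRC]` log additionally contains `prefetch_promote_attempted=0|1` (whether this rank has gone through RegisterPrefetchTask / markInFlight), used for outcome classification and for cross-statistics in the script (e.g. the eviction sub-items "no register / with register").
+- **Effective DRAM reads from SSD prefetch** (the core metric of the analysis script) = `prefetch_hit + prefetch_promoted_untracked`, as a share of the total number of GET entries; `prefetch_dram_was_resident` / `dram_resident` are not counted.
+- Once the batched pipeline shortens the "exist → first register" latency, the `prefetch_hit` ratio within the 10ms budget is expected to rise. If the `exist→get` interval still exceeds `default_kv_lease_ttl`, prefetched keys may still be evicted before get (see 5.3); in that case raise the master's `--default_kv_lease_ttl` rather than tuning the prefetch parameters alone.
+
+### 2.6 Config switches (mooncake.json on the vLLM-Ascend side)
+| Field | Meaning |
+|---|---|
+| `enable_ssd_offload` | Must be true; otherwise there are no SSD-only keys and prefetch is pointless |
+| `ssd_offload_path` | SSD storage directory (absolute path, must match the master's `--root_fs_dir`) |
+| `enable_ssd_prefetch` | Enables prefetch-on-exist |
+| `ssd_prefetch_cooldown_sec` / `ssd_prefetch_dedup_ttl_sec` | Optional; when unset, the Mooncake defaults apply (5s/30s), and the vLLM-Ascend side sets no defaults |
+| `ssd_get_wait_ms` | Maximum budget (milliseconds) for the get side to wait for prefetch completion; default 10, `0` disables |
+
+### 2.7 Lease protection after prefetch promotion
+When `from_prefetch=true`, `NotifyPromotionSuccess` grants the promoted MEMORY replica the **same** lease as `exist`/`get`: `GrantLease(default_kv_lease_ttl_, default_kv_soft_pin_ttl_)` (grouped keys go through `GrantLeaseForGroup`, with the same logic as `GetReplicaList`).
+
+| Parameter | How it is configured | Default | Meaning |
+|---|---|---|---|
+| `default_kv_lease_ttl` | master startup flag `--default_kv_lease_ttl` (ms) | 5000 | Hard lease: the replica is not subject to capacity eviction within the window |
+| `default_kv_soft_pin_ttl` | master startup flag `--default_kv_soft_pin_ttl` (ms) | see master config | soft-pin renewal, consistent with exist/get |
+
+> **Deprecated**: An early version used the environment variable `MOONCAKE_SSD_PREFETCH_PROTECT_SEC` (default 2s soft-pin, **lease=0**) to protect prefetched keys separately. The master's `default_kv_lease_ttl` / `default_kv_soft_pin_ttl` are now reused uniformly, and this environment variable is **no longer read**.
+
+### 2.8 Hard lease tuning and Put capacity risk
+
+**Background**: In the early implementation, if the MEMORY replica promoted by prefetch received only `GrantLease(0, soft_pin_ttl)` at `NotifyPromotionSuccess`, the hard lease expired immediately, and the key was easily evicted back to SSD by capacity eviction within the `exist→get` window (measured median 15~17s), which inflated `prefetch_evicted_after_exist`.
+
+**Change 1 (code, 2026-06): align the post-promotion lease with exist/get**
+
+| Dimension | Before | After |
+|---|---|---|
+| Trigger point | `NotifyPromotionSuccess`, `from_prefetch=true` | Same as before |
+| Lease | `GrantLease(0, protect_ms)` or an equivalent soft-pin-only grant | `GrantLeaseForGroup` → **`GrantLease(default_kv_lease_ttl_, default_kv_soft_pin_ttl_)`** |
+| Versus exist/get | Inconsistent (hard lease=0) | **Consistent with the renewal behavior of `GetReplicaList` / `BatchExist`** |
+
+For the implementation, see the `from_prefetch` branch of `NotifyPromotionSuccess` in `master_service.cpp` (§5.3).
+
+**Change 2 (deployment config): raise the master's hard lease TTL as needed**
+
+| Parameter | Default | Recommended tuning scenario | Purpose |
+|---|---|---|---|
+| `--default_kv_lease_ttl` | 5000 ms | When the median `exist→get` interval approaches or exceeds 5s, raise it to **10000 ms** or higher | Cover a longer scheduling window and reduce the probability that prefetched keys are evicted before get |
+| `--default_kv_soft_pin_ttl` | master default | Keep consistent with exist/get | soft-pin renewal, paired with Change 1 |
+
+> **Note**: `GetReplicaListForPrefetch` / `BatchGetReplicaListForPrefetch` **still grant no lease** (§2.2). The lease above applies only when **prefetch promotion completes** (`NotifyPromotionSuccess`) and on subsequent **renewals on the exist/get path**. The prefetch metadata query itself does not pin objects.
+
+**Put failure risk (capacity contention, an expected side effect)**
+
+Hard lease + soft-pin extends the eviction-exempt period of MEMORY replicas. Under a **high DRAM watermark** (e.g. `memory_ratio>0.9`), **high-concurrency long contexts**, and a **deferred `offload_on_evict` queue** (master log `[EVICT] No memory freed … deferred for disk offload`), the master may fail to free a segment handle in that round, and the client reports:
+
+```
+BatchPut failed for N keys due to insufficient space (NO_AVAILABLE_HANDLE)
+```
+
+| Dimension | Description |
+|---|---|
+| **Is it an anomaly** | **Capacity contention** under high pressure + large lease, not a functional bug |
+| **Does it affect inference of the current request** | **Generally not**: the upper-layer `put()` usually only logs the error and does not interrupt the completed prefill; Put runs asynchronously on a background thread |
+| **Actual impact** | Failed blocks are **not written to the Mooncake external pool**; external hits for later requests with the same prefix may drop, requiring NPU recomputation or an SSD get |
+| **Mitigation** | Increase `global_segment_size`, lower `eviction_high_watermark_ratio`, idle between stress-test rounds so offload can drain, or trade off by shortening `default_kv_lease_ttl` (which increases the risk of SSD reads after eviction) |
+
+**Relation to the offload commit order (§8 B10)**: lease alignment addresses prefetched keys **being capacity-evicted**; having `BatchOffload` commit the local index before `NotifyOffloadSuccess` addresses the get-side `INVALID_KEY` caused by **the master having registered LOCAL_DISK while the client index is not ready**. The two are orthogonal, and production environments should merge both.
+
+## 3.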
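Why This Design (Decision Rationale)
+
+### 3.1 Reusing the promotion execution substrate
+`PromotionAllocStart → PromotionWrite → NotifyPromotionSuccess` is already Mooncake's mature SSD→DRAM data path, with PROCESSING intermediate-state protection (eviction does not touch a replica while `is_completed()==false`). Building another transfer/allocation/state machine would be redundant and error-prone.
+
+### 3.2 Bypassing promotion-on-hit's triggering and pacing (core)
+Reusing the promotion-on-hit queue directly has 5 problems, hence the separate dedicated path (detailed in Section 4).
+
+### 3.3 Cross-node delegation (Option B)
+A KV replica may live on a remote node's SSD. Letting the holder node read its own SSD into its own DRAM "in place" avoids needless cross-node data movement; the native `get` already supports fetching from remote DRAM, so prefetch only has to warm the data into the holder's DRAM.
+
+## 4. Rejected Approaches and Why
+
+### Option X (wrong): trigger prefetch by directly reusing the promotion-on-hit queue
+That is, `exist → Query() enqueue → loop calling ProcessPromotionTasks()`. It works, but it has 5 long-term problems and was therefore rejected:
+
+| # | Problem | Description |
+|---|---|---|
+| 1 | **The admission gate blocks prefetch** | Before enqueueing, promotion-on-hit applies a frequency threshold (Count-Min sketch), a DRAM watermark, and a queue limit. The typical prefetch scenario is "warm on the very first exist", but with `promotion_admission_threshold>1` the first access is not enqueued at all, so the prefetch effectively never happens |
+| 2 | **`Query` side effects pollute the system** | Using `Query()` inside prefetch goes through `GetReplicaList`: it grants a lease (pinning the object and disturbing eviction/offload pacing), raises sketch hotness, and pollutes the `valid_get_nums`/cache-hit metrics. A plain "existence check" turns into a "fake Get" |
+| 3 | **The execution pacing is designed for 'lazy promotion'** | promotion-on-hit hands out at most 1 task per heartbeat, with a default heartbeat of 10s (the UT has to wait 25s). Prefetch needs to finish within sub-second to a few seconds, which the heartbeat cannot deliver. Manually looping over `ProcessPromotionTasks()` amounts to "speeding in the slow lane": it is at odds with the original design and prone to processing the same queue concurrently with the heartbeat thread |
+| 4 | **Dual-path conflict with the dedicated `PrefetchKeys`** | If `Query enqueue` and the `PrefetchKeys direct call` run at the same time, each promotes the same key once, triggering `REPLICA_IS_NOT_READY` |
+| 5 | **Resource contention** | The promotion queue limit / DRAM alloc / in-flight slots are shared globally. Large-batch prefetch crowds out the hot-key promotions triggered by real Gets, or the two block each other when DRAM is tight |
+
+**How the adopted approach avoids these:** a read-only `GetReplicaListForPrefetch` (solves 1, 2) + an immediately executed `PrefetchKeys` (solves 3) + a `RegisterPrefetchTask` that does not push to the heartbeat queue (solves 4) + independent RPCs that do not piggyback on admission (solves 5).
+
+### Other rejected minor approaches
+- **A detached `std::thread` per prefetch trigger**: high-frequency probing spawns masses of threads that contend for SSD IOPS/bandwidth and flood DRAM requests → replaced by a fixed thread pool.
+- **Register only after the metadata of the whole batch has been queried**: with large batches the first register comes too late and races with get → replaced by chunked batch Query + per-chunk pipelined register (see 2.1).
+
+## 5.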
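Potential Optimizations
+
+### 5.3 Lease protection after prefetch promotion (implemented)
+
+**Problem (historical)**: New MEMORY replicas produced by `PutEnd`/ordinary promotion start with `GrantLease(0, soft_pin_ttl)`; the lease expires immediately and is not renewed to `default_kv_lease_ttl` until the first `get()`. The prefetch path faces the same problem after `NotifyPromotionSuccess` completes: combined with `exist→get` intervals of up to 15s, many prefetched keys are capacity-evicted back to SSD before they are used (in early stress tests about 82% of the "prefetch was triggered yet the read still went to SSD" cases fell into this category).
+
+**Implementation (2026-06)**: when `from_prefetch=true`, `NotifyPromotionSuccess` calls `GrantLeaseForGroup` (equivalent to `GrantLease(default_kv_lease_ttl_, default_kv_soft_pin_ttl_)`), aligned with the renewal behavior of `GetReplicaList` / `BatchExist`. The protection duration is controlled uniformly by the master's `--default_kv_lease_ttl` / `--default_kv_soft_pin_ttl` (default lease 5s).
+
+**Tuning advice**: If `prefetch_evicted_after_exist` remains high and the median `exist→get` interval > `default_kv_lease_ttl`, first raise the master lease TTL (**10000 ms** is common for long-context scenarios, see §2.8). If the interval is extremely long (>15s), the lease cannot cover the whole window, and DRAM capacity or scheduler-side optimization is needed as well. When raising the lease, mind the **Put failure (NO_AVAILABLE_HANDLE)** risk described in §2.8.
+
+**Deprecated**: `MOONCAKE_SSD_PREFETCH_PROTECT_SEC` is no longer used.
+
+1. **Weakly shared in-flight accounting between prefetch and native promotion-on-hit**
+   Both share the `promotion_tasks` map, the `promotion_in_flight_` counter, and the `promotion_queue_limit_` cap. The shared map provides cross-path dedup (a benefit), but the shared cap means large-batch prefetch may take slots away from native promotion-on-hit (and vice versa). For isolation, prefetch could be given its own in-flight counter and cap. This sharing is existing v0.3.11 behavior and was not introduced by the sync to main.
+
+2. **The prefetch path is not multi-tenant aware (tenant_id)**
+   main introduced `tenant_id` (multi-tenancy), but `GetReplicaListForPrefetch` / `RegisterPrefetchTask` currently always use the `"default"` tenant (`MakeObjectIdentity(key, "default")`). Supporting multi-tenant prefetch requires the prefetch RPCs to pass `tenant_id` through. The current behavior is fully correct for single-tenant/default scenarios.
+
+3. **Adaptive throttling parameters**
+   Ideally `ssd_prefetch_cooldown_sec` should track "the actual time it takes DRAM to be evicted from the high watermark down to the target watermark"; it is currently a static default of 5s. It could adapt to the runtime eviction rate.
+
+4. **Prefetch hit-rate observability**
+   During stress-test validation, the P-side logs + `analyze_prefetch.sh` (the analysis script in the repository) tally `[GET-SRC]` / `[PREFETCH-OUTCOME]`. Main metrics:
+   - GET read source: DRAM / SSD counts and shares
+   - **Effective DRAM reads from SSD prefetch** (script label `SSD预取有效读DRAM`) = `prefetch_hit + promoted_untracked`
+   - Sub-items: `prefetch_hit`, `promoted_untracked`, `预取失败/超时` (prefetch failed / timed out, `prefetch_miss_race`), `驱逐后读SSD` (SSD read after eviction, `prefetch_evicted_after_exist`, with the sub-items no register / with register), `exist误trigger` (exist mis-trigger, `prefetch_dram_was_resident`), `dram_resident`
+   - Optional: `SHOW_DETAIL=1` prints the `prefetch_wait_mode` (local / master / none) distribution
+
+5. **Metadata query and register latency (mitigated in phase 1, phase 2 pending)**
+   Phase 1: `BatchGetReplicaListForPrefetch` reduces client↔master RPC round trips; pipelined register shortens the time from exist to the first task registration. Phase 2: the master-side `BatchGetReplicaListForPrefetch` still calls `GetReplicaListForPrefetch` key by key and could be changed to a single batched metadata lookup to reduce latency further.
+
+6. **Trade-off between the get()-side wait and lease eviction**
+   `ssd_get_wait_ms` defaults to 10ms. For ranks that did not trigger exist, such as TP1~7, the get side waits for promotion to complete through **master Query polling** (log `prefetch_wait_mode=master`). Once prefetch completes, the key is protected by `default_kv_lease_ttl` (5.3); if `exist→get` still exceeds the lease window, the key may still be evicted, and the master lease has to be raised or the scheduling interval optimized.
+
+## 6.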
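Main Code Locations (Mooncake)
+
+| File | Key changes |
+|---|---|
+| `mooncake-store/include/types.h` | Adds `CONFIG_KEY_SSD_PREFETCH_*` / `CONFIG_KEY_SSD_GET_WAIT_MS` and their defaults |
+| `mooncake-store/include/real_client.h` / `src/real_client.cpp` | `PrefetchThrottle` (including `promote_attempted` and `defer reserve` until after BatchQuery), `prefetch_pool_`, `triggerSsdPrefetch` (chunked batching + pipelining), `runLocalPrefetch`, `RunLocalPrefetchRegisterAndPromote`, `ClassifySsdPrefetchRoute`, `TryRefreshBestMemoryReplica`, `ClassifyPrefetchOutcome`; the get-wait (local / master dual path) and the `[GET-SRC]`/`[PREFETCH-OUTCOME]` logs (including `prefetch_promote_attempted`) inside `batch_get_into_multi_buffers_internal` |
+| `mooncake-store/include/master_service.h` / `src/master_service.cpp` | `GetReplicaListForPrefetch` (read-only), `RegisterPrefetchTask` (not pushed to the heartbeat queue), `NotifyPromotionSuccess` (`GrantLeaseForGroup` when `from_prefetch`, aligning with the exist/get lease), adapted to main's tenant scoping |
+| `mooncake-store/include/master_client.h` / `src/master_client.cpp` | prefetch RPC client; includes `BatchGetReplicaListForPrefetch` |
+| `mooncake-store/include/rpc_service.h` / `src/rpc_service.cpp` | `WrappedMasterService` wrappers and RPC handler registration (including batch prefetch) |
+| `mooncake-store/include/client_service.h` / `src/client_service.cpp` | `QueryForPrefetch` / `BatchQueryForPrefetch` / `RegisterPrefetchTask` |
+| `mooncake-store/include/file_storage.h` / `src/file_storage.cpp` | `PrefetchKeys()`: wraps the full promotion execution chain; `AllocateBatch`/`BatchLoad` use `MakeTenantScopedStorageKey` as the staging map key (fixes B8 INVALID_KEY) |
+| `mooncake-store/include/storage_backend.h` / `src/storage_backend.cpp` | `BucketStorageBackend::BatchOffload` commits `object_bucket_map_` first and then calls `NotifyOffloadSuccess`; on notify failure it runs `RollbackCommittedBucket` + `CleanupOrphanedBucket` (see §8 B10 and the disk-write flow in `ssd-offload.md`) |
+| `mooncake-store/include/replica.h` / `store_c.*` / `pyclient.h` / `dummy_client.*` | Signature / descriptor / C API / Python binding adaptations (`ExistOptions.prefetch_to_memory`, the `prefetch` parameter) |
+| `mooncake-integration/store/store_py.cpp` | pybind: `setup` gains the `ssd_prefetch_*` parameters, plus the prefetch options of `is_exist`/`batch_is_exist` |
+
+## 7. Notes on Syncing from v0.3.11 to main (handled)
+- main introduced `tenant_id`; all `setup_real` signatures **append** `ssd_prefetch_cooldown_sec` / `ssd_prefetch_dedup_ttl_sec` **after `tenant_id`** (with defaults, so existing call sites on main are unaffected).
+- The master-side prefetch implementation was rewritten for main's tenant-scoped metadata model: `MetadataAccessor(this, key)` → `MetadataAccessor(this, MakeObjectIdentity(key, "default"))`; `accessor.GetShard()->promotion_tasks` → `accessor.GetTenantState().promotion_tasks`.
+- `get_config_size` returns `std::optional` on main, so the code that reads the throttling config now uses `.value_or(default value)`.
+- **The 'dedicated path' mitigations for the 5 problems in Section 4 are fully retained on main** (read-only RPCs that grant no lease, do not touch the sketch, and do not enqueue; `RegisterPrefetchTask` not pushing to the heartbeat queue; `PrefetchKeys` executing immediately).
+- TODO: actually compile and verify `Mooncake` in a Linux/Ascend environment (it cannot be compiled on Windows).
+
+## 8. Historical Bugs and Fixes (troubleshooting log)
+
+> This section records the pitfalls actually hit during development as "observed symptom → root cause → fix/conclusion", to serve as the basis for re-aligning context after switching sessions. Section 4 focuses on the "rationale for choosing the approach"; this section focuses on "actual failures and their diagnosis".
+
+### B1. Prefetch storm → offload starvation (most severe; led to the throttling and thread pool in Sections 2.3/2.4)
+- **Symptom**: During stress tests the logs showed large numbers of `REPLICA_IS_NOT_READY`, `INVALID_KEY`, and `NO_AVAILABLE_HANDLE`, accompanied by `KV load failure`; `mooncake_master` kept printing DRAM eviction logs, `memory_ratio` was stuck at ~0.81 while eviction spun without progress, and offload could not get its writes onto SSD.
+- **Root cause**: vLLM's scheduling busy-loop **repeatedly probes `exist` on the same batch of SSD-only blocks** for the same request in the waiting queue. Early on, each probe launched a prefetch directly on a `detached std::thread` → the mass of concurrent prefetches hogged the **single IO thread / SSD IOPS / DRAM allocation slots** and starved normal offload (DRAM→SSD); DRAM stayed at the high watermark → the master kept trying to evict but could not free any space.
+- **Fix**: ① `ssd_prefetch_dedup_ttl_sec` (default 30s) prefetches a given key only once per window; this is **the main effective knob** and eliminates the storm directly; ② `ssd_prefetch_cooldown_sec` (default 5s) backs off when DRAM is saturated; ③ detached threads were replaced by the fixed-size `prefetch_pool_`.
+- **Conclusion**: At this dataset scale cooldown was not actually triggered (dedup was sufficient and DRAM was not filled).
+
+### B2. Dual path, double promotion → `REPLICA_IS_NOT_READY`
+- **Symptom**: Early on, two paths coexisted, "`Query()` enqueueing into the promotion-on-hit queue" and "the direct `PrefetchKeys` call"; each promoted the same key once, which produced `REPLICA_IS_NOT_READY`.
+- **Root cause / fix**: See Section 4, Option X, problem #4. In the end only the dedicated path was kept: `RegisterPrefetchTask` **does not push to the holder's heartbeat `promotion_objects` queue**, which avoids a duplicate promotion alongside the direct call.
+
+### B3.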
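`setup_real` abstract-class compile error
+- **Symptom**: The build failed with `invalid new-expression of abstract class RealClient` (an abstract class cannot be instantiated with new).
+- **Root cause**: When the `ssd_prefetch_cooldown_sec`/`ssd_prefetch_dedup_ttl_sec` parameters were appended to `setup`, only some of the declarations were changed, leaving the `setup` pure-virtual signatures inconsistent across `real_client.h`, `pyclient.h`, and `dummy_client.h` → `RealClient` did not fully override the base-class pure virtual functions and remained abstract.
+- **Fix**: Keep the `setup` signature **exactly identical** everywhere (base-class declaration / `RealClient` / `DummyClient` / pybind), with new parameters uniformly appended after `tenant_id` and given defaults. Any future pass-through parameter must update the signatures along the whole chain.
+
+### B4. `QueryResult` const member → move assignment deleted (get-wait recheck path)
+- **Symptom**: Implementing the get()-side "wait once, then recheck the DRAM replica" failed to compile, reporting that the move assignment of `tl::expected` is `delete`d (`QueryResult` has a const member, so an existing variable cannot be reassigned).
+- **Fix**: Hold the recheck result in a `std::optional` (using `.emplace()` instead of assignment), which sidesteps the non-move-assignable restriction. This pattern is used in the **master polling recheck** path (`prefetch_wait_mode=master`) of the get-wait in Section 2.5.
+
+### B5. get source/wait log instrumentation placed in the wrong function
+- **Symptom**: The "GET-SRC / wait" logs added in `get_buffer_internal` never printed in some integration cases.
+- **Root cause**: vLLM-Ascend actually calls `batch_get_into_multi_buffers_internal` (the batch multi-buffer path), not `get_buffer_internal`.
+- **Conclusion/fix**: The logs and the get-wait logic must be hooked into `batch_get_into_multi_buffers_internal`. This is the correct observation point for "does get read from DRAM or SSD" and "did it run into an in-flight prefetch", and it is the function that produced the data behind conclusions such as "exist→get ~15s, SSD→DRAM ~2ms, ~82% evicted after prefetch".
+
+### B6. `LEASE_EXPIRED` (known issue, not introduced by this feature)
+- **Symptom**: Logs of large-batch requests show `LEASE_EXPIRED` (e.g. batch transfer timeouts for several keys on multiple TP ranks).
+- **Root cause**: The client lease defaults to about 5s; when a query→transfer batch is large and its total duration exceeds the lease validity, the object lease expires before the transfer completes. This is more likely under PD disaggregation, when P pulls DRAM remotely from D. It is a client-side lease timing issue with no direct causal link to the dedicated prefetch path (the dedicated read-only RPCs **grant no lease** in the first place).
+- **Disposition**: Recorded as a known issue. It is related to, but distinct from, the master-side capacity eviction (lease protection) in Section 5.3: the former is a client fetch-lease timeout, the latter is master-side DRAM capacity eviction.
+
+### B7.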
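Serial metadata queries + synchronous reserve in exist → high `prefetch_miss_race` / `prefetch_dram_was_resident`
+- **Symptom (before batching / defer reserve)**: The first `registered task` was about **143ms** later than the first `[GET-SRC]`; `prefetch_hit` ~40%; `prefetch_miss_race` ~58% (much of it inflated); after correction, about **25%** of GETs were `prefetch_dram_was_resident` (exist mis-trigger).
+- **Root cause 1 (original B7)**: The early `triggerSsdPrefetch` called `QueryForPrefetch` for each key individually and **waited until all keys had been queried** before registering them together; with large batches the metadata stage took long, and the master had not yet registered the task within the get side's 10ms budget.
+- **Root cause 2 (defer reserve)**: `PrefetchThrottle::reserve()` used to run on the synchronous exist path and recorded a trigger even for **keys already in DRAM**; get then read DRAM but was wrongly counted in the prefetch statistics (`prefetch_dram_was_resident`). exist only knows that a key "exists", not which tier its replicas are on, so the filtering has to wait for BatchQuery.
+- **Fix (phase 1)**: `BatchQueryForPrefetch` (128 keys/chunk) + `RunLocalPrefetchRegisterAndPromote` immediately after each chunk completes (pipelined register); **reserve moved to after BatchQuery confirms the key is SSD-only**.
+- **Validation (after batching + defer reserve + outcome split)**: `prefetch_dram_was_resident`≈**0**; `prefetch_hit` **86.4%**; `prefetch_miss_race` **0.7%**; `prefetch_evicted_after_exist` **10.0%**; exist→first register ~**83ms** (down from 143ms before the fix).
+
+### B8. `PrefetchKeys` staging key mismatch → `INVALID_KEY`
+- **Symptom**: The prefetch path occasionally reported `INVALID_KEY` / staging slice missing.
+- **Root cause**: The map keys used by `AllocateBatch`/`BatchLoad` inside `PrefetchKeys` must be the tenant-scoped **storage key** (`MakeTenantScopedStorageKey`); when it differs from the logical object key, the slice cannot be found.
+- **Fix**: `PrefetchKeys` uses `storage_key` for staging, while the logical key is still used for the master promotion RPCs.
+
+### B9. `analyze_prefetch.sh` undercounts `prefetch_hit` (grep order)
+- **Symptom**: The analysis script once showed `prefetch_hit_master/local=0`, which did not match a visual inspection of the logs.
+- **Root cause**: The script used a one-directional grep (requiring `prefetch_wait_mode=master` to appear before `outcome=prefetch_hit`), whereas the actual log field order is the opposite.
+- **Fix**: Changed to a bidirectional grep (`| outcome=prefetch_hit.*prefetch_wait_mode=master`).
+
+### B10.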
`BatchOffload` calls NotifyOffloadSuccess before committing the local index → `INVALID_KEY` on the get path
+- **Symptom**: With highly concurrent offload running alongside get/prefetch, large numbers of `Failed to get key` (`res: -400` = `INVALID_KEY`) appeared; `BatchLoad` / `SSD read failed` coexisted with prefetch `BatchLoad failed`; the master's `SSD Storage` was far from full and `EvictDiskReplica=0`; some request-level loads failed.
+- **Root cause**: Mooncake's `BucketStorageBackend::BatchOffload`, **after the disk write succeeds**, first calls `complete_handler` (internally `NotifyOffloadSuccess`, upon which the master registers `LOCAL_DISK` and may evict MEMORY) and only **afterwards** commits the client-side `object_bucket_map_`.
+  A concurrent get/prefetch inside this window takes the SSD read path, but the local index does not yet contain the storage key → `Key not found` → `INVALID_KEY`. This is unrelated to SSD capacity; it is a consistency problem in which **master visibility precedes the client's local index** (`publish-before-commit`).
+- **Wrong order (before the fix)**:
+  ```
+  WriteBucket → NotifyOffloadSuccess → [race window] → commit object_bucket_map_
+  ```
+- **Fix (2026-06, `storage_backend.cpp`)**:
+  ```
+  WriteBucket → commit object_bucket_map_ → NotifyOffloadSuccess
+  ```
+  If `NotifyOffloadSuccess` fails: `RollbackCommittedBucket` rolls back the local index and `CleanupOrphanedBucket` deletes the orphan file, avoiding the "client can read it, master does not know about it" state or the reverse ghost replica.
+- **Difference from B8**: B8 is a wrong **staging key name** in PrefetchKeys; B10 is an offload **commit order** that leaves the index missing on the get/offload read path. Fixing B8 does not eliminate B10.
+- **Relation to the §2.8 lease change**: lease alignment addresses prefetched keys **being evicted**; B10 addresses the `INVALID_KEY` caused by **the index not being ready**. The two are orthogonal and both need to be merged.
+
+### Key measured data (the numbers that drove the decisions above, for alignment)
+- `exist→get` interval: median **15~17s** (amplified by scheduling queueing).
+- A single SSD→DRAM prefetch takes **~2ms (p90 2.65ms)**.
+- About **82%** of the "prefetch was triggered yet the read still went to SSD" cases (before lease alignment) occurred because the promoted MEMORY replica had lease=0 and was capacity-evicted back to SSD before `get()`; with §5.3 implemented, the prefetch path's lease is aligned with exist/get.
+- Throttling: `dedup_ttl` was the main factor in eliminating the storm, `cooldown` was not triggered; prefetch bandwidth was not saturated.
+- **Before** batching / defer reserve: the metadata stage made the first register about **~143ms** later than the first get; `prefetch_hit` ~40% / effective DRAM reads ~64%; `exist误trigger` (exist mis-trigger) ~25%.
+- **After** batching + defer reserve + outcome split: `prefetch_hit` **86.4%**; `exist误trigger` **0**; `prefetch_miss_race` **0.7%**; `prefetch_evicted_after_exist` **10.0%** (mainly TP1~7 master wait timeouts / an insufficient lease window, see §5.3 and §2.8).
+- After the B10 fix: `INVALID_KEY` on the get path dropped to zero; `BatchPut insufficient space` may still appear in the high-pressure tail (the §2.8 Put risk), and it generally does not prevent an individual inference from succeeding.
+- `ssd_get_wait_ms` defaults to **10ms** (the get-side wait is enabled by default); `0` disables it.
diff --git a/mooncake-integration/store/store_py.cpp b/mooncake-integration/store/store_py.cpp
index 4a55a2e11e..8dd0d88923 100644
--- a/mooncake-integration/store/store_py.cpp
+++ b/mooncake-integration/store/store_py.cpp
@@ -1759,6 +1759,15 @@ PYBIND11_MODULE(store, m) {
             return oss.str();
         });
 
+    py::class_<ExistOptions>(m, "ExistOptions")
+        .def(py::init<>())
+        .def_readwrite("prefetch_to_memory", &ExistOptions::prefetch_to_memory)
+        .def("__str__", [](const ExistOptions &options) {
+            std::ostringstream oss;
+            oss << options;
+            return oss.str();
+        });
+
     py::enum_<ReplicaStatus>(m, "ReplicaStatus")
         .value("UNDEFINED", ReplicaStatus::UNDEFINED)
         .value("INITIALIZED", ReplicaStatus::INITIALIZED)
@@ -2028,7 +2037,11 @@ PYBIND11_MODULE(store, m) {
                const py::object &engine = py::none(),
                bool enable_ssd_offload = false,
                const std::string &ssd_offload_path = "",
-               const std::string &tenant_id = "default") {
+               const std::string &tenant_id = "default",
+               int64_t ssd_prefetch_cooldown_sec =
+                   DEFAULT_SSD_PREFETCH_COOLDOWN_SEC,
+               int64_t ssd_prefetch_dedup_ttl_sec =
+                   DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC) {
                 auto real_client = self.init_real_client();
                 std::shared_ptr<TransferEngine> transfer_engine = nullptr;
 
@@ -2040,14 +2053,19 @@ PYBIND11_MODULE(store, m) {
                     local_hostname, metadata_server, global_segment_size,
                     local_buffer_size, protocol, rdma_devices,
                     master_server_addr, transfer_engine, "", enable_ssd_offload,
-                    ssd_offload_path, tenant_id);
+                    ssd_offload_path, tenant_id, ssd_prefetch_cooldown_sec,
+                    ssd_prefetch_dedup_ttl_sec);
             },
             py::arg("local_hostname"), py::arg("metadata_server"),
             py::arg("global_segment_size"), py::arg("local_buffer_size"),
             py::arg("protocol"), py::arg("rdma_devices"),
             py::arg("master_server_addr"), py::arg("engine") = py::none(),
             py::arg("enable_ssd_offload") = false,
-            py::arg("ssd_offload_path") = "", py::arg("tenant_id") = "default")
+            py::arg("ssd_offload_path") = "", py::arg("tenant_id") = "default",
+            py::arg("ssd_prefetch_cooldown_sec") =
+                DEFAULT_SSD_PREFETCH_COOLDOWN_SEC,
+            py::arg("ssd_prefetch_dedup_ttl_sec") =
+                DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC)
         .def(
             "setup",
             [](MooncakeStorePyWrapper &self, const py::dict &config_dict) {
@@ -2078,7 +2096,14 @@ PYBIND11_MODULE(store, m) {
             " ipc_socket_path: IPC socket path.\n"
             " enable_ssd_offload: Enable SSD offload (default false).\n"
             " ssd_offload_path: SSD storage directory path (overrides env "
-            "var).")
+            "var).\n"
+            " ssd_prefetch_cooldown_sec: SSD prefetch memory-pressure backoff "
+            "in seconds (default 5, 0 disables).\n"
+            " ssd_prefetch_dedup_ttl_sec: SSD prefetch per-key dedup/"
+            "rate-limit TTL in seconds (default 30, 0 disables).\n"
+            " ssd_get_wait_ms: get-side prefetch wait budget in milliseconds "
+            "(default 10, poll 1ms with early exit; 0 disables). Env "
+            "MOONCAKE_SSD_GET_WAIT_MS overrides this.")
         .def(
             "setup_dummy",
             [](MooncakeStorePyWrapper &self, size_t mem_pool_size,
@@ -2184,20 +2209,26 @@ PYBIND11_MODULE(store, m) {
             py::arg("keys"), py::arg("force") = false,
             "Batch remove objects by keys. Returns a list of status codes "
             "(0=success, negative=error code) for each key.")
-        .def("is_exist",
-             [](MooncakeStorePyWrapper &self, const std::string &key) {
-                 py::gil_scoped_release release;
-                 return self.store_->isExist(key);
-             })
+        .def(
+            "is_exist",
+            [](MooncakeStorePyWrapper &self, const std::string &key,
+               const ExistOptions &options) {
+                py::gil_scoped_release release;
+                return self.store_->isExist(key, options);
+            },
+            py::arg("key"), py::arg("options") = ExistOptions{})
         .def(
             "batch_is_exist",
             [](MooncakeStorePyWrapper &self,
-               const std::vector<std::string> &keys) {
+               const std::vector<std::string> &keys,
+               const ExistOptions &options) {
                 py::gil_scoped_release release;
-                return self.store_->batchIsExist(keys);
+                return self.store_->batchIsExist(keys, options);
             },
-            py::arg("keys"),
-            "Check if multiple objects exist. Returns list of results: 1 if "
+            py::arg("keys"), py::arg("options") = ExistOptions{},
+            "Check if multiple objects exist. When options.prefetch_to_memory "
+            "is True, triggers SSD-to-DRAM promotion for keys that only have "
+            "LOCAL_DISK replicas. Returns list of results: 1 if "
             "exists, 0 if not exists, -1 if error")
         .def("close",
              [](MooncakeStorePyWrapper &self) {
diff --git a/mooncake-store/include/client_service.h b/mooncake-store/include/client_service.h
index 0094f4f14d..1702137fc5 100644
--- a/mooncake-store/include/client_service.h
+++ b/mooncake-store/include/client_service.h
@@ -127,6 +127,23 @@ class Client {
      */
     tl::expected Query(const std::string& object_key);
 
+    /**
+     * @brief Read-only replica metadata for SSD prefetch (no lease / no
+     * promotion-on-hit side effects).
+     */
+    tl::expected QueryForPrefetch(
+        const std::string& object_key);
+
+    std::vector> BatchQueryForPrefetch(
+        const std::vector<std::string>& object_keys);
+
+    /**
+     * @brief Register a master-side promotion task for SSD prefetch without
+     * using the promotion-on-hit heartbeat queue.
+     */
+    tl::expected RegisterPrefetchTask(
+        const std::string& object_key);
+
     /**
      * @brief Queries replica lists for object keys that match a regex pattern.
      * @param str The regular expression string to match against object keys.
diff --git a/mooncake-store/include/dummy_client.h b/mooncake-store/include/dummy_client.h
index d538708828..f48b40b22c 100644
--- a/mooncake-store/include/dummy_client.h
+++ b/mooncake-store/include/dummy_client.h
@@ -21,16 +21,18 @@ class DummyClient : public PyClient {
 
     int64_t unregister_shm();
 
-    int setup_real(const std::string &local_hostname,
-                   const std::string &metadata_server,
-                   size_t global_segment_size, size_t local_buffer_size,
-                   const std::string &protocol, const std::string &rdma_devices,
-                   const std::string &master_server_addr,
-                   const std::shared_ptr<TransferEngine> &transfer_engine,
-                   const std::string &ipc_socket_path,
-                   bool enable_ssd_offload = false,
-                   const std::string &ssd_offload_path = "",
-                   const std::string &tenant_id = "default") {
+    int setup_real(
+        const std::string &local_hostname, const std::string &metadata_server,
+        size_t global_segment_size, size_t local_buffer_size,
+        const std::string &protocol, const std::string &rdma_devices,
+        const std::string &master_server_addr,
+        const std::shared_ptr<TransferEngine> &transfer_engine,
+        const std::string &ipc_socket_path, bool enable_ssd_offload = false,
+        const std::string &ssd_offload_path = "",
+        const std::string &tenant_id = "default",
+        int64_t ssd_prefetch_cooldown_sec = DEFAULT_SSD_PREFETCH_COOLDOWN_SEC,
+        int64_t ssd_prefetch_dedup_ttl_sec =
+            DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC) {
         // Dummy client does not support real setup
         return -1;
     };
@@ -147,9 +149,11 @@ class DummyClient : public PyClient {
     std::vector<int> batchRemove(const std::vector<std::string> &keys,
                                  bool force = false);
 
-    int isExist(const std::string &key);
+    int isExist(const std::string &key,
+                const ExistOptions &options = ExistOptions{});
 
-    std::vector<int> batchIsExist(const std::vector<std::string> &keys);
+    std::vector<int> batchIsExist(const std::vector<std::string> &keys,
+                                  const ExistOptions &options = ExistOptions{});
 
     int64_t getSize(const std::string &key);
 
diff --git a/mooncake-store/include/file_storage.h b/mooncake-store/include/file_storage.h
index 6aa2ac3d56..e193a8ce12 100644
--- a/mooncake-store/include/file_storage.h
+++ b/mooncake-store/include/file_storage.h
@@ -1,10 +1,14 @@
 #pragma once
 
+#include
+
 #include "client_service.h"
 #include "client_buffer.hpp"
 #include "storage_backend.h"
 #include "pinned_buffer_pool.h"
 
+#include
+
 namespace mooncake {
 
 struct SsdMetric;
@@ -41,6 +45,30 @@ class FileStorage {
 
     FileStorageConfig config_;
 
+    /**
+     * @brief Promote a set of keys from local SSD to DRAM.
+     *
+     * For each key the method allocates a staging buffer, reads the data from
+     * the local SSD backend, transfers it to a freshly-allocated MEMORY
+     * replica via Transfer Engine, and commits the result on the master.
+     * Failures are logged per-key but do not propagate; this is best-effort.
+     *
+     * @param keys Keys to promote (must already have LOCAL_DISK replicas)
+     * @param sizes Corresponding object sizes in bytes
+     * @param dram_pressure Optional out-flag. Set to true if any key failed to
+     *        promote because DRAM was saturated (NO_AVAILABLE_HANDLE), so the
+     *        caller can back off prefetch and let eviction/offload reclaim
+     *        memory. Never reset to false; caller initializes it.
+     * @return void on success, ErrorCode on critical (pre-loop) failure
+     */
+    using PrefetchKeyCallback =
+        std::function;
+
+    tl::expected<void, ErrorCode> PrefetchKeys(
+        const std::vector<std::string>& keys, const std::vector& sizes,
+        bool* dram_pressure = nullptr,
+        PrefetchKeyCallback on_key_done = nullptr);
+
     /**
      * @brief Releases buffer associated with a specific batch_id.
      * Called by remote client after transfer completion.
diff --git a/mooncake-store/include/master_client.h b/mooncake-store/include/master_client.h
index cad4159225..0046356780 100644
--- a/mooncake-store/include/master_client.h
+++ b/mooncake-store/include/master_client.h
@@ -151,6 +151,23 @@ class MasterClient {
     [[nodiscard]] tl::expected
     GetReplicaList(const std::string& object_key, const std::string& tenant_id);
 
+    /**
+     * @brief Read-only replica metadata query for best-effort SSD prefetch.
+     *
+     * Unlike GetReplicaList, this does NOT grant a lease, update the access
+     * sketch, bump cache-hit / valid-get counters, or enqueue a promotion. It
+     * is purely used to decide whether a key is SSD-only and worth prefetching,
+     * so probing must not perturb eviction/offload pacing or metrics.
+     * @param object_key Key to query.
+     * @return Replica list on success, or an ErrorCode on failure.
+     */
+    [[nodiscard]] tl::expected
+    GetReplicaListForPrefetch(const std::string& object_key);
+
+    [[nodiscard]] std::vector>
+    BatchGetReplicaListForPrefetch(
+        const std::vector<std::string>& object_keys);
+
     /**
      * @brief Retrieves replica lists for object keys that match a regex
      * pattern.
@@ -433,6 +450,21 @@ class MasterClient {
         const UUID& client_id, const std::vector& tasks,
         const std::vector& metadatas);
 
+    /**
+     * @brief Registers a prefetch (SSD->DRAM promotion) task on the master.
+     *
+     * Creates a promotion_tasks entry consumed by PromotionAllocStart, but
+     * deliberately does NOT go through the promotion admission gate
+     * (TryPushPromotionQueue) and does NOT push to the holder's
+     * promotion_objects heartbeat queue. This keeps the dedicated prefetch path
+     * separate from promotion-on-hit and avoids double-promoting the same key.
+     * @param client_id The UUID of the client requesting the prefetch.
+     * @param key The object key to promote from SSD to DRAM.
+     * @return An empty expected on success, or an ErrorCode on failure.
+     */
+    [[nodiscard]] tl::expected<void, ErrorCode> RegisterPrefetchTask(
+        const UUID& client_id, const std::string& key);
+
     /**
      * @brief Heartbeat-driven pull of pending L2->L1 promotion work for a
      * client.
Returns tenant-scoped tasks the caller should read from local diff --git a/mooncake-store/include/master_service.h b/mooncake-store/include/master_service.h index ff9c7e5637..de14e442ea 100644 --- a/mooncake-store/include/master_service.h +++ b/mooncake-store/include/master_service.h @@ -298,6 +298,26 @@ class MasterService { auto GetReplicaList(const std::string& key, const std::string& tenant_id) -> tl::expected; + /** + * @brief Read-only replica metadata for SSD prefetch. + * + * Unlike GetReplicaList, does not grant a lease, bump access metrics, or + * enqueue promotion-on-hit work. + */ + auto GetReplicaListForPrefetch(const std::string& key) + -> tl::expected; + + /** + * @brief Register an in-flight promotion task for SSD prefetch. + * + * Records a PromotionTask on the master without the promotion-on-hit + * frequency/watermark gates and without pushing onto the holder client's + * promotion_objects heartbeat queue. The caller must execute the transfer + * via FileStorage::PrefetchKeys (PromotionAllocStart path). 
+ */ + auto RegisterPrefetchTask(const UUID& client_id, const std::string& key) + -> tl::expected; + /** * @brief Start a put operation for an object * @param[out] replica_list Vector to store replica information for the @@ -1137,6 +1157,8 @@ class MasterService { uint64_t object_size; std::chrono::system_clock::time_point start_time; UUID holder_id; // owner of source LOCAL_DISK; only Notifier allowed + bool from_prefetch{ + false}; // set by RegisterPrefetchTask; enables protect lease }; static constexpr size_t kNumShards = 1024; // Number of metadata shards diff --git a/mooncake-store/include/pyclient.h b/mooncake-store/include/pyclient.h index c2e2201c5d..640ea982fc 100644 --- a/mooncake-store/include/pyclient.h +++ b/mooncake-store/include/pyclient.h @@ -163,6 +163,21 @@ class ClientRequester { void release_offload_buffer(const std::string &client_addr, uint64_t batch_id); + /** + * @brief Asks a remote holder node to prefetch (promote SSD->DRAM) the + * given keys. Used for cross-node SSD prefetch: the holder must register + * the promotion task itself (Master validates holder_id == client_id), so + * the requester delegates via this RPC instead of calling + * RegisterPrefetchTask locally. Best-effort: errors are logged, not + * propagated, and never block the originating exist() call. + * @param client_addr Network address of the holder's offload RPC service. + * @param keys SSD-only keys held by the remote node. + * @param sizes Object sizes (bytes), index-aligned with keys. 
+ */ + void prefetch_offload_object(const std::string &client_addr, + const std::vector &keys, + const std::vector &sizes); + private: /** * @brief A batch of allocated memory buffers, tracking both handles and @@ -225,7 +240,10 @@ class PyClient { const std::shared_ptr &transfer_engine, const std::string &ipc_socket_path, bool enable_ssd_offload = false, const std::string &ssd_offload_path = "", - const std::string &tenant_id = "default") = 0; + const std::string &tenant_id = "default", + int64_t ssd_prefetch_cooldown_sec = DEFAULT_SSD_PREFETCH_COOLDOWN_SEC, + int64_t ssd_prefetch_dedup_ttl_sec = + DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC) = 0; virtual int setup_dummy(size_t mem_pool_size, size_t local_buffer_size, const std::string &server_address, @@ -338,10 +356,12 @@ class PyClient { virtual std::vector batchRemove(const std::vector &keys, bool force = false) = 0; - virtual int isExist(const std::string &key) = 0; + virtual int isExist(const std::string &key, + const ExistOptions &options = ExistOptions{}) = 0; virtual std::vector batchIsExist( - const std::vector &keys) = 0; + const std::vector &keys, + const ExistOptions &options = ExistOptions{}) = 0; virtual int64_t getSize(const std::string &key) = 0; diff --git a/mooncake-store/include/real_client.h b/mooncake-store/include/real_client.h index 0678964497..8a943205d0 100644 --- a/mooncake-store/include/real_client.h +++ b/mooncake-store/include/real_client.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -18,6 +19,7 @@ #include "mutex.h" #include "utils.h" #include "rpc_types.h" +#include "thread_pool.h" #include #include #include @@ -65,6 +67,225 @@ class ResourceTracker { std::jthread signal_thread_{}; // joins on destruction }; +// Throttle state for best-effort SSD prefetch. Two mechanisms: +// 1. 
Memory-pressure cooldown: when promotion fails because DRAM is +// saturated (NO_AVAILABLE_HANDLE), open a short cooldown window during +// which prefetch is a no-op, so prefetch (which *adds* to DRAM) stops +// competing with eviction/offload (which *frees* DRAM) on the holder. +// 2. Per-key in-flight dedup / rate-limit: suppress duplicate prefetch +// triggers for the same key from concurrent exist/get probes within a +// TTL window, cutting the RPC/thread storm that starves offload. +// Held via shared_ptr so detached prefetch threads can use it safely without +// capturing a raw RealClient* that the work may outlive. +class PrefetchThrottle { + public: + enum class State : uint8_t { + kTriggered = 0, + kInFlight = 1, + kCompleted = 2, + kFailed = 3, + kAlreadyResident = 4, + }; + + struct Entry { + int64_t trigger_ms{-1}; + int64_t completed_ms{-1}; + State state{State::kTriggered}; + // Set when RegisterPrefetchTask + PrefetchKeys path runs for this key. + bool promote_attempted{false}; + }; + + static int64_t NowMs() { + return std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + } + + // Reconfigure from mooncake.json (values come in as seconds, stored as ms). + // Negative values keep the current setting. A 0 cooldown disables the + // memory-pressure backoff; a 0 dedup TTL disables per-key rate-limiting. + void configure(int64_t cooldown_sec, int64_t dedup_ttl_sec) { + if (cooldown_sec >= 0) { + cooldown_ms_.store(cooldown_sec * 1000, std::memory_order_relaxed); + } + if (dedup_ttl_sec >= 0) { + dedup_ttl_ms_.store(dedup_ttl_sec * 1000, + std::memory_order_relaxed); + } + } + + bool inCooldown() const { + return NowMs() < cooldown_until_ms_.load(std::memory_order_relaxed); + } + + // Open the memory-pressure backoff window. No-op if cooldown is disabled. 
+ void enterCooldown() { + const int64_t cooldown_ms = + cooldown_ms_.load(std::memory_order_relaxed); + if (cooldown_ms <= 0) { + return; + } + cooldown_until_ms_.store(NowMs() + cooldown_ms, + std::memory_order_relaxed); + } + + // Returns the subset of keys not seen within the dedup TTL, registering + // them as triggered synchronously (before the async prefetch job runs). + std::vector reserve(const std::vector &keys) { + const int64_t ttl_ms = dedup_ttl_ms_.load(std::memory_order_relaxed); + const int64_t now = NowMs(); + std::vector out; + out.reserve(keys.size()); + std::lock_guard lock(mutex_); + if (ttl_ms <= 0) { + for (const auto &key : keys) { + entries_[key] = Entry{.trigger_ms = now, + .completed_ms = -1, + .state = State::kTriggered}; + out.push_back(key); + } + return out; + } + for (auto it = entries_.begin(); it != entries_.end();) { + const int64_t last_ms = it->second.completed_ms >= 0 + ? it->second.completed_ms + : it->second.trigger_ms; + if (now - last_ms > ttl_ms) { + it = entries_.erase(it); + } else { + ++it; + } + } + for (const auto &key : keys) { + if (entries_.find(key) != entries_.end()) { + continue; + } + entries_[key] = Entry{.trigger_ms = now, + .completed_ms = -1, + .state = State::kTriggered}; + out.push_back(key); + } + return out; + } + + void markInFlight(const std::string &key) { + std::lock_guard lock(mutex_); + auto it = entries_.find(key); + if (it == entries_.end()) { + return; + } + it->second.state = State::kInFlight; + it->second.promote_attempted = true; + } + + void markCompleted(const std::string &key) { + const int64_t now = NowMs(); + std::lock_guard lock(mutex_); + auto it = entries_.find(key); + if (it == entries_.end()) { + entries_[key] = Entry{.trigger_ms = now, + .completed_ms = now, + .state = State::kCompleted}; + return; + } + it->second.state = State::kCompleted; + it->second.completed_ms = now; + } + + void markFailed(const std::string &key) { + std::lock_guard lock(mutex_); + auto it = 
entries_.find(key); + if (it == entries_.end()) { + return; + } + it->second.state = State::kFailed; + } + + // Async prefetch decided the key is not SSD-only (e.g. MEMORY already + // present). Clears in-flight semantics without treating as promotion done. + void markAlreadyResident(const std::string &key) { + std::lock_guard lock(mutex_); + auto it = entries_.find(key); + if (it == entries_.end()) { + return; + } + it->second.state = State::kAlreadyResident; + it->second.promote_attempted = false; + } + + bool promoteAttempted(const std::string &key) const { + std::lock_guard lock(mutex_); + auto it = entries_.find(key); + if (it == entries_.end()) { + return false; + } + return it->second.promote_attempted; + } + + int64_t triggeredAt(const std::string &key) const { + std::lock_guard lock(mutex_); + auto it = entries_.find(key); + if (it == entries_.end()) { + return -1; + } + return it->second.trigger_ms; + } + + int64_t completedAt(const std::string &key) const { + std::lock_guard lock(mutex_); + auto it = entries_.find(key); + if (it == entries_.end() || it->second.state != State::kCompleted) { + return -1; + } + return it->second.completed_ms; + } + + State stateOf(const std::string &key) const { + std::lock_guard lock(mutex_); + auto it = entries_.find(key); + if (it == entries_.end()) { + return State::kFailed; + } + return it->second.state; + } + + // Poll until promotion completes or the budget expires. Returns true only + // when the key reaches kCompleted before the deadline. 
+ bool waitForCompletion(const std::string &key, int64_t max_wait_ms, + int64_t poll_ms = 1) const { + if (max_wait_ms <= 0) { + return false; + } + const int64_t deadline = NowMs() + max_wait_ms; + const int64_t step_ms = std::max(poll_ms, 1); + while (NowMs() < deadline) { + { + std::lock_guard lock(mutex_); + auto it = entries_.find(key); + if (it == entries_.end()) { + return false; + } + if (it->second.state == State::kCompleted) { + return true; + } + if (it->second.state == State::kFailed) { + return false; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(step_ms)); + } + return stateOf(key) == State::kCompleted; + } + + private: + std::atomic cooldown_until_ms_{0}; + std::atomic cooldown_ms_{DEFAULT_SSD_PREFETCH_COOLDOWN_SEC * 1000}; + std::atomic dedup_ttl_ms_{DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC * + 1000}; + mutable std::mutex mutex_; + std::unordered_map entries_; +}; + class RealClient : public PyClient { public: RealClient(); @@ -84,7 +305,10 @@ class RealClient : public PyClient { const std::string &ipc_socket_path = "", bool enable_ssd_offload = false, const std::string &ssd_offload_path = "", - const std::string &tenant_id = "default"); + const std::string &tenant_id = "default", + int64_t ssd_prefetch_cooldown_sec = DEFAULT_SSD_PREFETCH_COOLDOWN_SEC, + int64_t ssd_prefetch_dedup_ttl_sec = + DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC); int setup_dummy(size_t mem_pool_size, size_t local_buffer_size, const std::string &server_address, @@ -297,17 +521,21 @@ class RealClient : public PyClient { /** * @brief Check if an object exists * @param key Key to check + * @param options Optional exist behavior (e.g. prefetch_to_memory) * @return 1 if exists, 0 if not exists, -1 if error */ - int isExist(const std::string &key); + int isExist(const std::string &key, + const ExistOptions &options = ExistOptions{}); /** * @brief Check if multiple objects exist * @param keys Vector of keys to check + * @param options Optional exist behavior (e.g. 
prefetch_to_memory) * @return Vector of existence results: 1 if exists, 0 if not exists, -1 if * error */ - std::vector batchIsExist(const std::vector &keys); + std::vector batchIsExist(const std::vector &keys, + const ExistOptions &options = ExistOptions{}); /** * @brief Get the size of an object @@ -679,6 +907,23 @@ class RealClient : public PyClient { batch_get_offload_object(const std::vector &keys, const std::vector &sizes); + /** + * @brief Holder-side RPC handler for cross-node SSD prefetch. + * + * Invoked by a remote requester when the LOCAL_DISK replica of a key is + * held by THIS node. Runs the same promotion as the local prefetch path: + * RegisterPrefetchTask (with this node's own client_id, so Master's + * holder_id == client_id check passes) + FileStorage::PrefetchKeys to + * stage the SSD object into DRAM. Best-effort and fire-and-forget: the + * actual SSD I/O runs on a detached thread and this returns immediately. + * @param keys SSD-only keys held by this node to promote into DRAM. + * @param sizes Object sizes (bytes) captured by the requester from + * Master metadata; index-aligned with keys. + * @return true if the prefetch work was accepted/scheduled. + */ + bool prefetch_offload_object(const std::vector &keys, + const std::vector &sizes); + /** * @brief Releases buffer associated with a specific batch_id. * Called by remote client after transfer completion. @@ -902,6 +1147,52 @@ class RealClient : public PyClient { tl::expected setup_ascend_internal( size_t local_buffer_size); + void triggerSsdPrefetch(const std::vector &keys); + + // Read env-tunables and build prefetch_pool_. Called once from + // setup_internal when SSD offload is enabled. + void initPrefetchRuntime(); + + // Submit a best-effort prefetch job to the bounded prefetch_pool_ so + // high-frequency probes don't explode into unbounded detached threads. + // Falls back to a detached std::thread only if the pool is unavailable + // (e.g. enqueue after shutdown). 
+ void submitPrefetchJob(std::function job); + + // Bounded worker pool for SSD prefetch promotion jobs. Fixed size (see + // initPrefetchRuntime), consistent with ClientService's task pool; bounds + // concurrent SSD reads / DRAM allocations and avoids unbounded detached + // threads. + std::shared_ptr prefetch_pool_; + + // get()-side wait-for-prefetch. When a get selects a LOCAL_DISK (SSD) + // replica but prefetch for the same key is in flight, poll every 1 ms + // (early exit on completion) up to this budget. Env MOONCAKE_SSD_GET_WAIT_MS + // overrides mooncake.json ssd_get_wait_ms. 0 disables waiting. + int64_t ssd_get_wait_ms_{DEFAULT_SSD_GET_WAIT_MS}; + int64_t ssd_get_wait_ms_config_{DEFAULT_SSD_GET_WAIT_MS}; + + // Best-effort SSD prefetch throttle (memory-pressure cooldown + per-key + // in-flight dedup). Shared with detached prefetch threads via shared_ptr. + // Tunable via mooncake.json (ssd_prefetch_cooldown_sec / + // ssd_prefetch_dedup_ttl_sec); see setup_internal(ConfigDict). + std::shared_ptr prefetch_throttle_ = + std::make_shared(); + + /** + * @brief Run prefetch promotion for keys whose LOCAL_DISK replica is held + * by THIS node. Registers a promotion task per key (this node is the + * holder) and stages the objects from SSD into DRAM via prefetch_pool_. + * Called from prefetch_offload_object (remote holder RPC). Best-effort; + * runs asynchronously. Register + PrefetchKeys execution is delegated to + * RunLocalPrefetchRegisterAndPromote (same as triggerSsdPrefetch local + * branch). + * @param keys Keys to promote (LOCAL_DISK replica held by this node). + * @param sizes Object sizes (bytes), index-aligned with keys. 
+ */ + void runLocalPrefetch(const std::vector &keys, + const std::vector &sizes); + private: std::unordered_map mounted_segment_records_; diff --git a/mooncake-store/include/replica.h b/mooncake-store/include/replica.h index 677e77b333..681a4b6d6c 100644 --- a/mooncake-store/include/replica.h +++ b/mooncake-store/include/replica.h @@ -143,6 +143,22 @@ struct ReplicateConfig { } }; +/** + * @brief Options for is_exist / batch_is_exist (RFC #2213). + * + * Mirrors the ReplicateConfig pattern: a struct leaves room for future + * extensions without changing the function signature each time. + */ +struct ExistOptions { + bool prefetch_to_memory = false; + friend std::ostream& operator<<(std::ostream& os, + const ExistOptions& options) noexcept { + os << "ExistOptions: { prefetch_to_memory: " + << options.prefetch_to_memory << " }"; + return os; + } +}; + enum class ReplicaWriteMode { SINGLE_REPLICA, FLEXIBLE_DUAL_REPLICA, diff --git a/mooncake-store/include/rpc_service.h b/mooncake-store/include/rpc_service.h index db3b65e910..77ceffcb10 100644 --- a/mooncake-store/include/rpc_service.h +++ b/mooncake-store/include/rpc_service.h @@ -59,6 +59,12 @@ class WrappedMasterService { tl::expected GetReplicaList( const std::string& key, const std::string& tenant_id = "default"); + tl::expected GetReplicaListForPrefetch( + const std::string& key); + + std::vector> + BatchGetReplicaListForPrefetch(const std::vector& keys); + std::vector> BatchGetReplicaList(const std::vector& keys, const std::string& tenant_id = "default"); @@ -198,6 +204,9 @@ class WrappedMasterService { const UUID& client_id, const std::vector& tasks, const std::vector& metadatas); + tl::expected RegisterPrefetchTask(const UUID& client_id, + const std::string& key); + // Promotion-on-hit RPCs. 
tl::expected, ErrorCode> PromotionObjectHeartbeat(const UUID& client_id); diff --git a/mooncake-store/include/storage_backend.h b/mooncake-store/include/storage_backend.h index 5fd071b59b..893db02ce6 100644 --- a/mooncake-store/include/storage_backend.h +++ b/mooncake-store/include/storage_backend.h @@ -881,6 +881,14 @@ class BucketStorageBackend : public StorageBackendInterface { */ void CleanupOrphanedBucket(int64_t bucket_id); + /** + * @brief Undo a successful metadata commit when NotifyOffloadSuccess fails. + * Caller must delete on-disk bucket files separately via CleanupOrphanedBucket. + */ + void RollbackCommittedBucket(int64_t bucket_id, + const std::vector& keys, + int64_t data_size, int64_t meta_size); + // Holds eviction state between PrepareEviction and FinalizeEviction. // PrepareEviction removes buckets from metadata maps and returns this. // FinalizeEviction waits for in-flight reads and deletes the files. diff --git a/mooncake-store/include/store_c.h b/mooncake-store/include/store_c.h index b5646cd10f..c21772ae76 100644 --- a/mooncake-store/include/store_c.h +++ b/mooncake-store/include/store_c.h @@ -33,6 +33,11 @@ struct mooncake_replicate_config { }; typedef struct mooncake_replicate_config mooncake_replicate_config_t; +struct mooncake_exist_options { + int prefetch_to_memory; +}; +typedef struct mooncake_exist_options mooncake_exist_options_t; + /* * All memory pointed to by the "char *" parameters will not be used * after the C function returns. 
@@ -100,6 +105,14 @@ int mooncake_store_is_exist(mooncake_store_t store, const char *key); int mooncake_store_batch_is_exist(mooncake_store_t store, const char **keys, size_t count, int *results_out); +int mooncake_store_is_exist_with_options( + mooncake_store_t store, const char *key, + const mooncake_exist_options_t *options); + +int mooncake_store_batch_is_exist_with_options( + mooncake_store_t store, const char **keys, size_t count, int *results_out, + const mooncake_exist_options_t *options); + int64_t mooncake_store_get_size(mooncake_store_t store, const char *key); int mooncake_store_get_hostname(mooncake_store_t store, char *buf_out, diff --git a/mooncake-store/include/types.h b/mooncake-store/include/types.h index 801693ee14..b0c60102fa 100644 --- a/mooncake-store/include/types.h +++ b/mooncake-store/include/types.h @@ -216,12 +216,29 @@ constexpr const char* CONFIG_KEY_RDMA_DEVICES = "rdma_devices"; constexpr const char* CONFIG_KEY_MASTER_SERVER_ADDR = "master_server_addr"; constexpr const char* CONFIG_KEY_IPC_SOCKET_PATH = "ipc_socket_path"; constexpr const char* CONFIG_KEY_TENANT_ID = "tenant_id"; +// Backoff window (seconds) for best-effort SSD prefetch after DRAM is +// saturated: while active, prefetch is skipped so eviction/offload can +// reclaim memory instead of competing with promotion. 0 disables the backoff. +constexpr const char* CONFIG_KEY_SSD_PREFETCH_COOLDOWN_SEC = + "ssd_prefetch_cooldown_sec"; +// De-duplication / rate-limit TTL (seconds): the same key is prefetched at +// most once per this period, suppressing duplicate triggers from concurrent +// probes. +constexpr const char* CONFIG_KEY_SSD_PREFETCH_DEDUP_TTL_SEC = + "ssd_prefetch_dedup_ttl_sec"; +// Max wait budget (milliseconds) on get when an SSD-only key has a prefetch +// in flight. Polls every 1 ms and returns early once promotion completes. 
+constexpr const char* CONFIG_KEY_SSD_GET_WAIT_MS = "ssd_get_wait_ms"; // Store client configuration defaults static constexpr size_t DEFAULT_GLOBAL_SEGMENT_SIZE = 1024 * 1024 * 16; // 16MB static constexpr size_t DEFAULT_LOCAL_BUFFER_SIZE = 1024 * 1024 * 16; // 16MB constexpr const char* DEFAULT_PROTOCOL = "tcp"; constexpr const char* DEFAULT_MASTER_SERVER_ADDR = "127.0.0.1:50051"; +static constexpr size_t DEFAULT_SSD_PREFETCH_COOLDOWN_SEC = 5; +static constexpr size_t DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC = 30; +// SSD->DRAM promotion p90 ~3 ms; 10 ms is enough headroom without wasting TTFT. +static constexpr size_t DEFAULT_SSD_GET_WAIT_MS = 10; inline std::string NormalizeTenantId(const std::string& tenant_id) { return tenant_id.empty() ? "default" : tenant_id; diff --git a/mooncake-store/src/client_service.cpp b/mooncake-store/src/client_service.cpp index c18dead179..7e0f341388 100644 --- a/mooncake-store/src/client_service.cpp +++ b/mooncake-store/src/client_service.cpp @@ -1013,6 +1013,57 @@ tl::expected Client::Query( start_time + std::chrono::milliseconds(result.value().lease_ttl_ms)); } +tl::expected Client::QueryForPrefetch( + const std::string& object_key) { + std::chrono::steady_clock::time_point start_time = + std::chrono::steady_clock::now(); + auto result = master_client_.GetReplicaListForPrefetch(object_key); + if (!result) { + return tl::unexpected(result.error()); + } + return QueryResult( + std::move(result.value().replicas), + start_time + std::chrono::milliseconds(result.value().lease_ttl_ms)); +} + +std::vector> Client::BatchQueryForPrefetch( + const std::vector& object_keys) { + std::chrono::steady_clock::time_point start_time = + std::chrono::steady_clock::now(); + auto response = + master_client_.BatchGetReplicaListForPrefetch(object_keys); + + if (response.size() != object_keys.size()) { + LOG(ERROR) << "BatchQueryForPrefetch response size mismatch. 
Expected: " + << object_keys.size() << ", Got: " << response.size(); + std::vector> results; + results.reserve(object_keys.size()); + for (size_t i = 0; i < object_keys.size(); ++i) { + results.emplace_back(tl::unexpected(ErrorCode::RPC_FAIL)); + } + return results; + } + + std::vector> results; + results.reserve(response.size()); + for (size_t i = 0; i < response.size(); ++i) { + if (response[i]) { + results.emplace_back(QueryResult( + std::move(response[i].value().replicas), + start_time + std::chrono::milliseconds( + response[i].value().lease_ttl_ms))); + } else { + results.emplace_back(tl::unexpected(response[i].error())); + } + } + return results; +} + +tl::expected Client::RegisterPrefetchTask( + const std::string& object_key) { + return master_client_.RegisterPrefetchTask(client_id_, object_key); +} + std::vector> Client::BatchQuery( const std::vector& object_keys) { return BatchQuery(object_keys, master_client_.tenant_id()); diff --git a/mooncake-store/src/dummy_client.cpp b/mooncake-store/src/dummy_client.cpp index c1a84a5c2a..fe090fd2e7 100644 --- a/mooncake-store/src/dummy_client.cpp +++ b/mooncake-store/src/dummy_client.cpp @@ -860,7 +860,11 @@ std::vector DummyClient::batchRemove(const std::vector& keys, return results; } -int DummyClient::isExist(const std::string& key) { +int DummyClient::isExist(const std::string& key, const ExistOptions& options) { + if (options.prefetch_to_memory) { + LOG(WARNING) << "SSD prefetch is not supported in DummyClient mode; " + << "prefetch_to_memory option ignored."; + } auto result = invoke_rpc<&RealClient::isExist_internal, bool>(key); if (result.has_value()) { @@ -870,8 +874,12 @@ int DummyClient::isExist(const std::string& key) { } } -std::vector DummyClient::batchIsExist( - const std::vector& keys) { +std::vector DummyClient::batchIsExist(const std::vector& keys, + const ExistOptions& options) { + if (options.prefetch_to_memory) { + LOG(WARNING) << "SSD prefetch is not supported in DummyClient mode; " + << 
"prefetch_to_memory option ignored."; + } auto internal_results = invoke_batch_rpc<&RealClient::batchIsExist_internal, bool>(keys.size(), keys); diff --git a/mooncake-store/src/file_storage.cpp b/mooncake-store/src/file_storage.cpp index 4830c13eb3..57b076fe9c 100644 --- a/mooncake-store/src/file_storage.cpp +++ b/mooncake-store/src/file_storage.cpp @@ -833,6 +833,128 @@ tl::expected FileStorage::ProcessPromotionTasks() { return {}; } +tl::expected FileStorage::PrefetchKeys( + const std::vector& keys, const std::vector& sizes, + bool* dram_pressure, PrefetchKeyCallback on_key_done) { + if (client_ == nullptr) { + return tl::make_unexpected(ErrorCode::INVALID_PARAMS); + } + if (keys.size() != sizes.size()) { + return tl::make_unexpected(ErrorCode::INVALID_PARAMS); + } + + const std::vector preferred_segments; + + for (size_t i = 0; i < keys.size(); ++i) { + const auto& key = keys[i]; + const auto& tenant_id = client_->tenant_id(); + const auto storage_key = + MakeTenantScopedStorageKey(tenant_id, key); + int64_t size = sizes[i]; + + if (size <= 0) { + LOG(WARNING) << "SSD prefetch: skipping key=" << key + << " with non-positive size=" << size; + continue; + } + + auto alloc_result = client_->PromotionAllocStart( + key, tenant_id, static_cast(size), preferred_segments); + if (!alloc_result) { + VLOG(1) << "SSD prefetch: PromotionAllocStart failed for key=" + << key << ", error=" << alloc_result.error(); + if (dram_pressure != nullptr && + alloc_result.error() == ErrorCode::NO_AVAILABLE_HANDLE) { + *dram_pressure = true; + } + auto release = client_->NotifyPromotionFailure(key, tenant_id); + if (!release) { + VLOG(1) << "SSD prefetch: NotifyPromotionFailure failed for" + << " key=" << key << ", error=" << release.error(); + } + if (on_key_done) { + on_key_done(key, false); + } + continue; + } + + auto release_master_state = [this, &key, &tenant_id]() { + auto release = client_->NotifyPromotionFailure(key, tenant_id); + if (!release) { + VLOG(1) << "SSD prefetch: 
NotifyPromotionFailure failed for" + << " key=" << key << ", error=" << release.error(); + } + }; + + std::vector single_key{storage_key}; + std::vector single_size{size}; + auto allocate_res = AllocateBatch(single_key, single_size); + if (!allocate_res) { + LOG(WARNING) << "SSD prefetch: AllocateBatch failed for key=" << key + << ", error=" << allocate_res.error(); + if (dram_pressure != nullptr && + allocate_res.error() == ErrorCode::NO_AVAILABLE_HANDLE) { + *dram_pressure = true; + } + release_master_state(); + if (on_key_done) { + on_key_done(key, false); + } + continue; + } + auto staging = allocate_res.value(); + auto load_res = BatchLoad(staging->slices); + if (!load_res) { + LOG(WARNING) << "SSD prefetch: BatchLoad failed for key=" << key + << ", error=" << load_res.error(); + release_master_state(); + if (on_key_done) { + on_key_done(key, false); + } + continue; + } + + auto slice_it = staging->slices.find(storage_key); + if (slice_it == staging->slices.end()) { + LOG(WARNING) << "SSD prefetch: staging slice missing for key=" + << key; + release_master_state(); + if (on_key_done) { + on_key_done(key, false); + } + continue; + } + std::vector tx_slices{slice_it->second}; + ErrorCode write_err = client_->PromotionWrite( + alloc_result.value().memory_descriptor, tx_slices); + if (write_err != ErrorCode::OK) { + LOG(WARNING) << "SSD prefetch: PromotionWrite failed for key=" + << key << ", error=" << write_err; + release_master_state(); + if (on_key_done) { + on_key_done(key, false); + } + continue; + } + + auto notify_res = client_->NotifyPromotionSuccess(key, tenant_id); + if (!notify_res) { + LOG(WARNING) << "SSD prefetch: NotifyPromotionSuccess failed for" + << " key=" << key << ", error=" << notify_res.error(); + release_master_state(); + if (on_key_done) { + on_key_done(key, false); + } + continue; + } + if (on_key_done) { + on_key_done(key, true); + } + } + + return {}; +} + tl::expected FileStorage::BatchLoad( std::unordered_map& batch_object) { auto 
start_time = std::chrono::steady_clock::now(); diff --git a/mooncake-store/src/master_client.cpp b/mooncake-store/src/master_client.cpp index dc9282866d..c18f00894a 100644 --- a/mooncake-store/src/master_client.cpp +++ b/mooncake-store/src/master_client.cpp @@ -37,6 +37,21 @@ struct RpcNameTraits<&WrappedMasterService::GetReplicaList> { static constexpr const char* value = "GetReplicaList"; }; +template <> +struct RpcNameTraits<&WrappedMasterService::GetReplicaListForPrefetch> { + static constexpr const char* value = "GetReplicaListForPrefetch"; +}; + +template <> +struct RpcNameTraits<&WrappedMasterService::BatchGetReplicaListForPrefetch> { + static constexpr const char* value = "BatchGetReplicaListForPrefetch"; +}; + +template <> +struct RpcNameTraits<&WrappedMasterService::RegisterPrefetchTask> { + static constexpr const char* value = "RegisterPrefetchTask"; +}; + template <> struct RpcNameTraits<&WrappedMasterService::CalcCacheStats> { static constexpr const char* value = "CalcCacheStats"; @@ -530,6 +545,40 @@ tl::expected MasterClient::GetReplicaList( return result; } +tl::expected +MasterClient::GetReplicaListForPrefetch(const std::string& object_key) { + ScopedVLogTimer timer(1, "MasterClient::GetReplicaListForPrefetch"); + timer.LogRequest("object_key=", object_key); + + auto result = invoke_rpc<&WrappedMasterService::GetReplicaListForPrefetch, + GetReplicaListResponse>(object_key); + timer.LogResponseExpected(result); + return result; +} + +std::vector> +MasterClient::BatchGetReplicaListForPrefetch( + const std::vector& object_keys) { + ScopedVLogTimer timer(1, "MasterClient::BatchGetReplicaListForPrefetch"); + timer.LogRequest("keys_count=", object_keys.size()); + + auto result = + invoke_batch_rpc<&WrappedMasterService::BatchGetReplicaListForPrefetch, + GetReplicaListResponse>(object_keys.size(), object_keys); + timer.LogResponse("result=", result.size(), " operations"); + return result; +} + +tl::expected MasterClient::RegisterPrefetchTask( + const 
UUID& client_id, const std::string& key) { + ScopedVLogTimer timer(1, "MasterClient::RegisterPrefetchTask"); + timer.LogRequest("client_id=", client_id, ", key=", key); + auto result = invoke_rpc<&WrappedMasterService::RegisterPrefetchTask, void>( + client_id, key); + timer.LogResponseExpected(result); + return result; +} + std::vector> MasterClient::BatchGetReplicaList(const std::vector& object_keys) { return BatchGetReplicaList(object_keys, tenant_id_); diff --git a/mooncake-store/src/master_service.cpp b/mooncake-store/src/master_service.cpp index db74f7cc2c..f337b449a4 100644 --- a/mooncake-store/src/master_service.cpp +++ b/mooncake-store/src/master_service.cpp @@ -1442,6 +1442,90 @@ auto MasterService::GetReplicaList(const std::string& key, return resp; } +auto MasterService::GetReplicaListForPrefetch(const std::string& key) + -> tl::expected { + std::shared_lock shared_lock(snapshot_mutex_); + + // NOTE: prefetch path is not tenant-aware; scope to the default tenant. + MetadataAccessorRO accessor(this, MakeObjectIdentity(key, "default")); + if (!accessor.Exists()) { + VLOG(1) << "prefetch_metadata key=" << key << ", info=object_not_found"; + return tl::make_unexpected(ErrorCode::OBJECT_NOT_FOUND); + } + const auto& metadata = accessor.Get(); + + std::vector replica_list; + metadata.VisitReplicas( + &Replica::fn_is_completed, [&replica_list](const Replica& replica) { + replica_list.emplace_back(replica.get_descriptor()); + }); + + if (replica_list.empty()) { + LOG(WARNING) << "prefetch_metadata key=" << key + << ", error=replica_not_ready"; + return tl::make_unexpected(ErrorCode::REPLICA_IS_NOT_READY); + } + + return GetReplicaListResponse(std::move(replica_list), + default_kv_lease_ttl_); +} + +auto MasterService::RegisterPrefetchTask(const UUID& client_id, + const std::string& key) + -> tl::expected { + std::shared_lock shared_lock(snapshot_mutex_); + // NOTE: prefetch path is not tenant-aware; scope to the default tenant. 
+ MetadataAccessorRW accessor(this, MakeObjectIdentity(key, "default")); + if (!accessor.Exists()) { + return tl::make_unexpected(ErrorCode::OBJECT_NOT_FOUND); + } + auto& metadata = accessor.Get(); + auto& tenant_state = accessor.GetTenantState(); + + if (metadata.HasReplica(&Replica::fn_is_memory_replica)) { + return {}; + } + + if (tenant_state.promotion_tasks.count(key) > 0) { + return {}; + } + + if (promotion_in_flight_.load(std::memory_order_relaxed) >= + promotion_queue_limit_) { + return tl::make_unexpected(ErrorCode::KEYS_ULTRA_LIMIT); + } + + Replica* source = nullptr; + metadata.VisitReplicas(&Replica::fn_is_local_disk_replica, + [&source](Replica& r) { + if (source == nullptr) source = &r; + }); + if (source == nullptr) { + return tl::make_unexpected(ErrorCode::REPLICA_IS_NOT_READY); + } + + auto holder_id = source->get_local_disk_client_id(); + if (!holder_id.has_value() || holder_id.value() != client_id) { + return tl::make_unexpected(ErrorCode::INVALID_PARAMS); + } + + source->inc_refcnt(); + const uint64_t object_size = + source->get_descriptor().get_local_disk_descriptor().object_size; + + tenant_state.promotion_tasks.emplace( + key, PromotionTask{.source_id = source->id(), + .alloc_id = 0, + .object_size = object_size, + .start_time = std::chrono::system_clock::now(), + .holder_id = holder_id.value(), + .from_prefetch = true}); + promotion_in_flight_.fetch_add(1, std::memory_order_relaxed); + VLOG(1) << "prefetch_task_registered key=" << key + << " size=" << object_size; + return {}; +} + auto MasterService::AllocateAndInsertMetadata( MetadataShardAccessorRW& shard, const UUID& client_id, const std::string& key, uint64_t value_length, @@ -3650,6 +3734,7 @@ auto MasterService::NotifyPromotionSuccess(const UUID& client_id, if (task_it->second.holder_id != client_id) { return tl::make_unexpected(ErrorCode::INVALID_PARAMS); } + const bool from_prefetch = task_it->second.from_prefetch; bool committed = false; Replica* staged = 
metadata.GetReplicaByID(task_it->second.alloc_id); @@ -3694,6 +3779,12 @@ auto MasterService::NotifyPromotionSuccess(const UUID& client_id, if (!committed) { return tl::make_unexpected(ErrorCode::REPLICA_IS_NOT_READY); } + + // Prefetch-promoted keys get the same lease as exist/get so DRAM survives + // until the subsequent get() (see ssd-prefetch.md §5.3). + if (from_prefetch) { + GrantLeaseForGroup(tenant_state, object_id.user_key, metadata); + } return {}; } diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index 81fd32d8e6..52aedb8d6f 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -15,6 +15,7 @@ #include // for atexit #include #include +#include #include #include #include @@ -901,6 +902,8 @@ tl::expected RealClient::setup_internal( ->register_handler<&RealClient::batch_get_offload_object>(this); offload_rpc_server_ ->register_handler<&RealClient::release_offload_buffer>(this); + offload_rpc_server_ + ->register_handler<&RealClient::prefetch_offload_object>(this); offload_rpc_server_->async_start(); auto err = offload_rpc_server_->get_errc(); if (err) { @@ -930,6 +933,7 @@ tl::expected RealClient::setup_internal( << init_result.error(); return init_result; } + initPrefetchRuntime(); } client_requester_ = std::make_shared(); if (FLAGS_enable_http_server) { @@ -949,7 +953,17 @@ int RealClient::setup_real( const std::string &master_server_addr, const std::shared_ptr &transfer_engine, const std::string &ipc_socket_path, bool enable_ssd_offload, - const std::string &ssd_offload_path, const std::string &tenant_id) { + const std::string &ssd_offload_path, const std::string &tenant_id, + int64_t ssd_prefetch_cooldown_sec, int64_t ssd_prefetch_dedup_ttl_sec) { + if (prefetch_throttle_) { + prefetch_throttle_->configure(ssd_prefetch_cooldown_sec, + ssd_prefetch_dedup_ttl_sec); + LOG(INFO) << "SSD prefetch throttle: " + << CONFIG_KEY_SSD_PREFETCH_COOLDOWN_SEC << "=" + << 
ssd_prefetch_cooldown_sec << "s, " + << CONFIG_KEY_SSD_PREFETCH_DEDUP_TTL_SEC << "=" + << ssd_prefetch_dedup_ttl_sec << "s"; + } return to_py_ret(setup_internal( local_hostname, metadata_server, global_segment_size, local_buffer_size, protocol, rdma_devices, master_server_addr, transfer_engine, @@ -1055,6 +1069,31 @@ tl::expected RealClient::setup_internal( bool enable_ssd_offload = (enable_ssd_offload_str == "true" || enable_ssd_offload_str == "1"); + // SSD prefetch throttle tunables (seconds). Stored on the per-client + // throttle shared with detached prefetch threads. + size_t ssd_prefetch_cooldown_sec = + get_config_size(config, CONFIG_KEY_SSD_PREFETCH_COOLDOWN_SEC, + DEFAULT_SSD_PREFETCH_COOLDOWN_SEC) + .value_or(DEFAULT_SSD_PREFETCH_COOLDOWN_SEC); + size_t ssd_prefetch_dedup_ttl_sec = + get_config_size(config, CONFIG_KEY_SSD_PREFETCH_DEDUP_TTL_SEC, + DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC) + .value_or(DEFAULT_SSD_PREFETCH_DEDUP_TTL_SEC); + if (prefetch_throttle_) { + prefetch_throttle_->configure( + static_cast(ssd_prefetch_cooldown_sec), + static_cast(ssd_prefetch_dedup_ttl_sec)); + LOG(INFO) << "SSD prefetch throttle: " + << CONFIG_KEY_SSD_PREFETCH_COOLDOWN_SEC << "=" + << ssd_prefetch_cooldown_sec << "s, " + << CONFIG_KEY_SSD_PREFETCH_DEDUP_TTL_SEC << "=" + << ssd_prefetch_dedup_ttl_sec << "s"; + } + size_t ssd_get_wait_ms = + get_config_size(config, CONFIG_KEY_SSD_GET_WAIT_MS, + DEFAULT_SSD_GET_WAIT_MS) + .value_or(DEFAULT_SSD_GET_WAIT_MS); + ssd_get_wait_ms_config_ = static_cast(ssd_get_wait_ms); return setup_internal( local_hostname, metadata_server, global_segment_size, local_buffer_size, protocol, rdma_devices, master_server_addr, nullptr, ipc_socket_path, @@ -2002,33 +2041,453 @@ tl::expected RealClient::isExist_internal( return client_->IsExist(key); } -int RealClient::isExist(const std::string &key) { +int RealClient::isExist(const std::string &key, const ExistOptions &options) { auto result = isExist_internal(key); if (result.has_value()) { + if 
(options.prefetch_to_memory && *result && file_storage_) { + triggerSsdPrefetch({key}); + } return *result ? 1 : 0; // 1 if exists, 0 if not } else { return toInt(result.error()); } } -std::vector RealClient::batchIsExist( - const std::vector &keys) { +std::vector RealClient::batchIsExist(const std::vector &keys, + const ExistOptions &options) { auto internal_results = batchIsExist_internal(keys); std::vector results; results.reserve(internal_results.size()); - for (const auto &result : internal_results) { - if (result.has_value()) { - results.push_back(result.value() ? 1 : 0); // 1 if exists, 0 if not + std::vector prefetch_candidates; + for (size_t i = 0; i < internal_results.size(); ++i) { + if (internal_results[i].has_value()) { + bool exists = internal_results[i].value(); + results.push_back(exists ? 1 : 0); // 1 if exists, 0 if not + if (options.prefetch_to_memory && exists) { + prefetch_candidates.push_back(keys[i]); + } } else { - results.push_back(toInt(result.error())); + results.push_back(toInt(internal_results[i].error())); } } + if (options.prefetch_to_memory && !prefetch_candidates.empty() && + file_storage_) { + triggerSsdPrefetch(prefetch_candidates); + } + return results; } +namespace { +// Parse a non-negative integer env var, returning `fallback` when unset or +// invalid. 
+int64_t GetEnvInt64(const char *name, int64_t fallback) { + const char *raw = std::getenv(name); + if (raw == nullptr || *raw == '\0') { + return fallback; + } + try { + return static_cast(std::stoll(raw)); + } catch (const std::exception &) { + LOG(WARNING) << "Invalid value for env " << name << "='" << raw + << "', using default " << fallback; + return fallback; + } +} +} // namespace + +namespace { + +constexpr size_t kPrefetchMetadataChunkSize = 128; + +struct SsdPrefetchRoute { + int64_t local_disk_size{0}; + std::string holder_endpoint; +}; + +std::optional ClassifySsdPrefetchRoute( + const std::vector &replicas) { + bool has_memory = false; + bool has_local_disk = false; + SsdPrefetchRoute route; + for (const auto &replica : replicas) { + if (replica.status != ReplicaStatus::COMPLETE && + replica.status != ReplicaStatus::PROCESSING) { + continue; + } + if (replica.is_memory_replica()) { + has_memory = true; + break; + } + if (replica.is_local_disk_replica()) { + has_local_disk = true; + route.local_disk_size = + static_cast(calculate_total_size(replica)); + route.holder_endpoint = + replica.get_local_disk_descriptor().transport_endpoint; + } + } + if (has_memory || !has_local_disk || route.local_disk_size <= 0) { + return std::nullopt; + } + return route; +} + +void RunLocalPrefetchRegisterAndPromote( + const std::shared_ptr &client, FileStorage *file_storage, + const std::shared_ptr &throttle, + const FileStorage::PrefetchKeyCallback &on_key_done, + const std::vector &local_keys, + const std::vector &local_sizes) { + if (local_keys.empty() || file_storage == nullptr) { + return; + } + std::vector prefetch_keys; + std::vector prefetch_sizes; + prefetch_keys.reserve(local_keys.size()); + prefetch_sizes.reserve(local_keys.size()); + for (size_t i = 0; i < local_keys.size(); ++i) { + auto register_result = client->RegisterPrefetchTask(local_keys[i]); + if (!register_result) { + VLOG(1) << "SSD prefetch: RegisterPrefetchTask failed for" + << " key=" << 
local_keys[i] + << ", error=" << register_result.error(); + continue; + } + prefetch_keys.push_back(local_keys[i]); + prefetch_sizes.push_back(local_sizes[i]); + if (throttle) { + throttle->markInFlight(local_keys[i]); + } + VLOG(1) << "SSD prefetch: registered task for key=" << local_keys[i] + << ", size=" << local_sizes[i]; + } + if (prefetch_keys.empty()) { + return; + } + bool dram_pressure = false; + auto prefetch_res = file_storage->PrefetchKeys( + prefetch_keys, prefetch_sizes, &dram_pressure, on_key_done); + if (!prefetch_res) { + LOG(WARNING) << "SSD prefetch: PrefetchKeys failed, error=" + << prefetch_res.error(); + } else { + VLOG(1) << "SSD prefetch: PrefetchKeys completed for " + << prefetch_keys.size() << " key(s)"; + } + if (dram_pressure && throttle) { + throttle->enterCooldown(); + LOG(INFO) << "SSD prefetch: DRAM saturated, backing off " + "(ssd_prefetch_cooldown_sec)"; + } +} + +const char *ReplicaSourceLabel(const Replica::Descriptor &replica) { + if (replica.is_memory_replica()) { + return "DRAM"; + } + if (replica.is_local_disk_replica()) { + return "SSD"; + } + if (replica.is_disk_replica()) { + return "DISK"; + } + return "UNKNOWN"; +} + +// Returns refreshed QueryResult when master sees a COMPLETE MEMORY replica. 
+std::optional TryRefreshBestMemoryReplica( + Client *client, const std::string &key, + const std::unordered_set &local_endpoints) { + auto requery = client->Query(key); + if (!requery) { + return std::nullopt; + } + const auto *candidate = + SelectBestReplica(requery.value().replicas, local_endpoints); + if (!candidate || !candidate->is_memory_replica()) { + return std::nullopt; + } + return std::move(requery.value()); +} + +const char *ClassifyPrefetchOutcome(int64_t prefetch_trigger_ms, + int64_t prefetch_done_ms, + int64_t get_ms, const char *source, + PrefetchThrottle::State prefetch_state, + bool prefetch_wait_attempted, + bool promote_attempted) { + const bool from_dram = + (source != nullptr && std::strcmp(source, "DRAM") == 0); + const bool prefetch_involved = + prefetch_trigger_ms >= 0 || prefetch_wait_attempted; + if (!prefetch_involved) { + if (from_dram) { + return "dram_resident"; + } + // At exist() time BatchQuery saw the key in DRAM, so nothing was promoted (or no trigger fired); by get() time it reads from SSD. + return "prefetch_evicted_after_exist"; + } + if (prefetch_trigger_ms >= 0 && + prefetch_state == PrefetchThrottle::State::kFailed) { + return "prefetch_failed"; + } + if (from_dram) { + if (prefetch_done_ms >= 0 && prefetch_done_ms <= get_ms) { + return "prefetch_hit"; + } + if (!promote_attempted) { + return "prefetch_dram_was_resident"; + } + return "prefetch_promoted_untracked"; + } + // This rank triggered a prefetch or already ran promote, but get() still read from SSD. + if (prefetch_trigger_ms >= 0 || promote_attempted) { + return "prefetch_miss_race"; + } + // This rank neither triggered nor promoted, and get() read from SSD (the key may have been in DRAM at exist() time). + return "prefetch_evicted_after_exist"; +} + +} // namespace + +void RealClient::initPrefetchRuntime() { + // Bounded worker pool for SSD prefetch promotion jobs. 
Fixed size, + // consistent with ClientService's task_thread_pool_(4); bounds concurrent + // SSD reads / DRAM allocations and avoids unbounded detached threads. 
+ constexpr size_t kPrefetchThreadPoolSize = 4; + prefetch_pool_ = std::make_shared(kPrefetchThreadPoolSize); + + // get()-side wait-for-prefetch max budget (poll every 1 ms, early exit). + // Env overrides mooncake.json ssd_get_wait_ms. + const int64_t wait_from_env = GetEnvInt64("MOONCAKE_SSD_GET_WAIT_MS", -1); + if (wait_from_env >= 0) { + ssd_get_wait_ms_ = wait_from_env; + } else { + ssd_get_wait_ms_ = ssd_get_wait_ms_config_; + } + + LOG(INFO) << "SSD prefetch runtime: pool_size=" << kPrefetchThreadPoolSize + << ", ssd_get_wait_ms=" << ssd_get_wait_ms_ + << " (poll 1ms, early exit on completion)"; +} + +void RealClient::submitPrefetchJob(std::function job) { + if (prefetch_pool_) { + try { + prefetch_pool_->enqueue(std::move(job)); + return; + } catch (const std::exception &e) { + // Pool stopped (shutdown in progress): drop the best-effort job. + VLOG(1) << "SSD prefetch: pool enqueue failed (" << e.what() + << "), dropping job"; + return; + } + } + // Pool unavailable (not initialized / shutting down): drop the best-effort + // job. Never fall back to an unbounded detached thread -- that is exactly + // the prefetch-storm anti-pattern this bounded-pool path exists to avoid. + VLOG(1) << "SSD prefetch: pool unavailable, dropping job"; +} + +void RealClient::triggerSsdPrefetch(const std::vector &keys) { + auto throttle = prefetch_throttle_; + if (throttle && throttle->inCooldown()) { + VLOG(1) << "SSD prefetch: skipped (memory-pressure cooldown)"; + return; + } + if (keys.empty()) { + return; + } + + // Do not reserve()/record trigger at exist time. BatchQueryForPrefetch in the + // async job filters SSD-only keys first; reserve() runs only for keys that + // will actually RegisterPrefetchTask, avoiding false triggers on DRAM-resident + // keys (exist only knows "exists", not replica tier). 
+ auto keys_copy = keys; + auto client = client_; + auto file_storage = file_storage_; + auto client_requester = client_requester_; + const std::string local_rpc_addr_copy = local_rpc_addr; + FileStorage::PrefetchKeyCallback on_key_done; + if (throttle) { + on_key_done = [throttle](const std::string &key, bool success) { + if (success) { + throttle->markCompleted(key); + } else { + throttle->markFailed(key); + } + }; + } + submitPrefetchJob([client, file_storage, client_requester, throttle, + local_rpc_addr_copy, on_key_done, + keys_copy = std::move(keys_copy)]() { + // Batch metadata queries in chunks, then register + promote each + // chunk immediately (pipeline) instead of waiting for all keys. + std::unordered_map> remote_keys; + std::unordered_map> remote_sizes; + + for (size_t offset = 0; offset < keys_copy.size(); + offset += kPrefetchMetadataChunkSize) { + const size_t end = std::min(offset + kPrefetchMetadataChunkSize, + keys_copy.size()); + std::vector chunk(keys_copy.begin() + offset, + keys_copy.begin() + end); + std::vector> batch_results; + try { + batch_results = client->BatchQueryForPrefetch(chunk); + } catch (const std::exception &e) { + LOG(WARNING) << "SSD prefetch: BatchQueryForPrefetch failed: " + << e.what(); + continue; + } + if (batch_results.size() != chunk.size()) { + LOG(WARNING) << "SSD prefetch: BatchQueryForPrefetch size " + "mismatch, expected " + << chunk.size() << ", got " + << batch_results.size(); + continue; + } + + std::vector chunk_local_keys; + std::vector chunk_local_sizes; + chunk_local_keys.reserve(chunk.size()); + chunk_local_sizes.reserve(chunk.size()); + + for (size_t i = 0; i < chunk.size(); ++i) { + if (!batch_results[i]) { + VLOG(1) << "SSD prefetch: metadata query failed for" + << " key=" << chunk[i] + << ", error=" << batch_results[i].error(); + continue; + } + auto route = + ClassifySsdPrefetchRoute(batch_results[i].value().replicas); + if (!route) { + continue; + } + if (route->holder_endpoint.empty() || + 
route->holder_endpoint == local_rpc_addr_copy) { + chunk_local_keys.push_back(chunk[i]); + chunk_local_sizes.push_back(route->local_disk_size); + } else { + remote_keys[route->holder_endpoint].push_back(chunk[i]); + remote_sizes[route->holder_endpoint].push_back( + route->local_disk_size); + } + } + + std::vector promote_local_keys; + std::vector promote_local_sizes; + if (throttle) { + auto reserved = throttle->reserve(chunk_local_keys); + std::unordered_set reserved_set(reserved.begin(), + reserved.end()); + promote_local_keys.reserve(reserved.size()); + promote_local_sizes.reserve(reserved.size()); + for (size_t i = 0; i < chunk_local_keys.size(); ++i) { + if (reserved_set.find(chunk_local_keys[i]) == + reserved_set.end()) { + continue; + } + promote_local_keys.push_back(chunk_local_keys[i]); + promote_local_sizes.push_back(chunk_local_sizes[i]); + } + } else { + promote_local_keys = std::move(chunk_local_keys); + promote_local_sizes = std::move(chunk_local_sizes); + } + + RunLocalPrefetchRegisterAndPromote( + client, file_storage.get(), throttle, on_key_done, + promote_local_keys, promote_local_sizes); + } + + // Remote branch: delegate each holder's keys via RPC. The holder + // registers the promotion task with its own client_id, so Master's + // holder check passes without changing Master logic. Best-effort. 
+ if (client_requester) { + for (auto &[endpoint, group_keys] : remote_keys) { + VLOG(1) << "SSD prefetch: delegating " << group_keys.size() + << " key(s) to remote holder " << endpoint; + client_requester->prefetch_offload_object( + endpoint, group_keys, remote_sizes[endpoint]); + } + } + }); +} + +void RealClient::runLocalPrefetch(const std::vector &keys, + const std::vector &sizes) { + auto throttle = prefetch_throttle_; + if (throttle && throttle->inCooldown()) { + VLOG(1) << "SSD prefetch: skipped (memory-pressure cooldown)"; + return; + } + + std::unordered_set allowed(keys.begin(), keys.end()); + if (throttle) { + auto reserved = throttle->reserve(keys); + allowed.clear(); + allowed.insert(reserved.begin(), reserved.end()); + if (allowed.empty()) { + return; + } + } + + std::vector keys_copy; + std::vector sizes_copy; + keys_copy.reserve(keys.size()); + sizes_copy.reserve(keys.size()); + for (size_t i = 0; i < keys.size(); ++i) { + if (allowed.find(keys[i]) == allowed.end()) { + continue; + } + keys_copy.push_back(keys[i]); + sizes_copy.push_back(i < sizes.size() ? sizes[i] + : static_cast(0)); + } + if (keys_copy.empty()) { + return; + } + + auto client = client_; + auto file_storage = file_storage_; + FileStorage::PrefetchKeyCallback on_key_done; + if (throttle) { + on_key_done = [throttle](const std::string &key, bool success) { + if (success) { + throttle->markCompleted(key); + } else { + throttle->markFailed(key); + } + }; + } + submitPrefetchJob([client, file_storage, throttle, on_key_done, + keys_copy = std::move(keys_copy), + sizes_copy = std::move(sizes_copy)]() { + std::vector local_keys; + std::vector local_sizes; + local_keys.reserve(keys_copy.size()); + local_sizes.reserve(keys_copy.size()); + for (size_t i = 0; i < keys_copy.size(); ++i) { + const int64_t size = + i < sizes_copy.size() ? 
sizes_copy[i] : static_cast(0); + if (size <= 0) { + continue; + } + local_keys.push_back(keys_copy[i]); + local_sizes.push_back(size); + } + RunLocalPrefetchRegisterAndPromote( + client, file_storage.get(), throttle, on_key_done, local_keys, + local_sizes); + }); +} + tl::expected RealClient::getSize_internal( const std::string &key) { if (!client_) { @@ -4367,7 +4826,6 @@ std::vector> RealClient::batch_get_into_internal(const std::vector &keys, const std::vector &buffers, const std::vector &sizes) { - auto start_time = std::chrono::steady_clock::now(); // Validate preconditions if (!client_) { LOG(ERROR) << "Client is not initialized"; @@ -4636,10 +5094,7 @@ RealClient::batch_get_into_internal(const std::vector &keys, store_segment_it->second.emplace(op_it.first, op_it.second.slices); } - size_t offload_object_count = 0; - auto start_read_store_time = std::chrono::steady_clock::now(); for (auto &offload_objects_it : offload_objects) { - offload_object_count += offload_objects_it.second.size(); auto batch_get_offload_result = batch_get_into_offload_object_internal( offload_objects_it.first, offload_objects_it.second); if (!batch_get_offload_result) { @@ -4653,19 +5108,6 @@ RealClient::batch_get_into_internal(const std::vector &keys, } } - auto end_time = std::chrono::steady_clock::now(); - auto elapsed_time = std::chrono::duration_cast( - end_time - start_time) - .count(); - auto read_store_time = - std::chrono::duration_cast( - end_time - start_read_store_time) - .count(); - // LOG(INFO) << "Time taken for batch_get_into: " << elapsed_time - // << "us, read store: " << read_store_time - // << "us, with memory key count: " << valid_operations.size() - // << ", offload key count: " << offload_object_count; - return results; } @@ -4924,8 +5366,80 @@ RealClient::batch_get_into_multi_buffers_internal( results.emplace_back(tl::unexpected(ErrorCode::INVALID_REPLICA)); continue; } + + // get()-side wait-for-prefetch (A2): poll until promotion completes or + // the 
configured budget expires (default 10 ms, 1 ms poll interval). + // Local throttle covers TP0; master Query covers TP1~7 where exist + // triggered prefetch in another process. + int64_t prefetch_trigger_ms = + prefetch_throttle_ ? prefetch_throttle_->triggeredAt(key) : -1; + int64_t prefetch_done_ms = + prefetch_throttle_ ? prefetch_throttle_->completedAt(key) : -1; + PrefetchThrottle::State prefetch_state = + prefetch_throttle_ ? prefetch_throttle_->stateOf(key) + : PrefetchThrottle::State::kTriggered; + bool prefetch_wait_attempted = false; + const char *prefetch_wait_mode = "none"; + std::optional refreshed_qr; + if (best_replica->is_local_disk_replica() && ssd_get_wait_ms_ > 0) { + prefetch_wait_attempted = true; + constexpr int64_t kPollMs = 1; + if (prefetch_trigger_ms >= 0 && prefetch_throttle_) { + prefetch_wait_mode = "local"; + if (prefetch_state != PrefetchThrottle::State::kCompleted) { + prefetch_throttle_->waitForCompletion(key, ssd_get_wait_ms_, + kPollMs); + } + prefetch_done_ms = prefetch_throttle_->completedAt(key); + prefetch_state = prefetch_throttle_->stateOf(key); + if (auto qr = TryRefreshBestMemoryReplica( + client_.get(), key, local_endpoints)) { + refreshed_qr.emplace(std::move(*qr)); + best_replica = SelectBestReplica(refreshed_qr->replicas, + local_endpoints); + if (prefetch_done_ms < 0) { + prefetch_done_ms = PrefetchThrottle::NowMs(); + } + } + } else { + prefetch_wait_mode = "master"; + const int64_t deadline = + PrefetchThrottle::NowMs() + ssd_get_wait_ms_; + while (PrefetchThrottle::NowMs() < deadline) { + if (auto qr = TryRefreshBestMemoryReplica( + client_.get(), key, local_endpoints)) { + refreshed_qr.emplace(std::move(*qr)); + best_replica = SelectBestReplica(refreshed_qr->replicas, + local_endpoints); + prefetch_done_ms = PrefetchThrottle::NowMs(); + break; + } + std::this_thread::sleep_for( + std::chrono::milliseconds(kPollMs)); + } + } + } + const auto replica = *best_replica; uint64_t total_size = 
calculate_total_size(replica); + const int64_t get_ms = PrefetchThrottle::NowMs(); + const char *source = ReplicaSourceLabel(replica); + const bool promote_attempted = + prefetch_throttle_ ? prefetch_throttle_->promoteAttempted(key) + : false; + const char *outcome = ClassifyPrefetchOutcome( + prefetch_trigger_ms, prefetch_done_ms, get_ms, source, + prefetch_state, prefetch_wait_attempted, promote_attempted); + VLOG(1) << "[GET-SRC] key=" << key << " source=" << source + << " size=" << total_size + << " prefetch_trigger_ms=" << prefetch_trigger_ms + << " prefetch_done_ms=" << prefetch_done_ms + << " get_ms=" << get_ms + << " prefetch_wait_mode=" << prefetch_wait_mode + << " prefetch_promote_attempted=" + << (promote_attempted ? "1" : "0") + << " path=multi_buffers" + << " [PREFETCH-OUTCOME] outcome=" << outcome; const auto &sizes = all_sizes[i]; uint64_t dst_total_size = 0; for (auto &size : sizes) { @@ -4972,7 +5486,8 @@ RealClient::batch_get_into_multi_buffers_internal( valid_operations.push_back( {.key = key, .original_index = i, - .query_result = FilterQueryResult(query_result_values, replica), + .query_result = FilterQueryResult( + refreshed_qr ? *refreshed_qr : query_result_values, replica), .slices = std::move(key_slices), .total_size = total_size}); // Set success result (actual bytes transferred) @@ -5580,6 +6095,23 @@ bool RealClient::release_offload_buffer(uint64_t batch_id) { return file_storage_->ReleaseBuffer(batch_id); } +bool RealClient::prefetch_offload_object(const std::vector &keys, + const std::vector &sizes) { + if (!file_storage_) { + VLOG(1) << "prefetch_offload_object called but file_storage_ is null"; + return false; + } + if (keys.empty()) { + return true; + } + VLOG(1) << "SSD prefetch: received remote prefetch request for " + << keys.size() << " key(s)"; + // We are the holder for these keys: promote SSD->DRAM locally. Runs on the + // bounded prefetch pool so the coro_rpc IO thread is not blocked on SSD I/O. 
+ runLocalPrefetch(keys, sizes); + return true; +} + tl::expected RealClient::batch_get_into_offload_object_internal( const std::string &target_rpc_service_addr, @@ -5696,6 +6228,24 @@ void ClientRequester::release_offload_buffer(const std::string &client_addr, } } +void ClientRequester::prefetch_offload_object( + const std::string &client_addr, const std::vector &keys, + const std::vector &sizes) { + // Best-effort delegation to the remote holder. Errors (holder down, + // network) are logged but never propagated: the caller's exist() must not + // block on prefetch, and get() can still fall back to a cross-node SSD + // read if the promotion did not land in time. + auto result = invoke_rpc<&RealClient::prefetch_offload_object, bool>( + client_addr, keys, sizes); + if (!result) { + VLOG(1) << "Failed to invoke prefetch_offload_object, client_addr = " + << client_addr << ", error is: " << result.error(); + } else { + VLOG(1) << "Delegated prefetch of " << keys.size() + << " key(s) to holder " << client_addr; + } +} + template tl::expected ClientRequester::invoke_rpc( const std::string &client_addr, Args &&...args) { diff --git a/mooncake-store/src/rpc_service.cpp b/mooncake-store/src/rpc_service.cpp index 441db11dc1..21350aca3b 100644 --- a/mooncake-store/src/rpc_service.cpp +++ b/mooncake-store/src/rpc_service.cpp @@ -992,6 +992,55 @@ WrappedMasterService::GetReplicaList(const std::string& key, }); } +tl::expected +WrappedMasterService::GetReplicaListForPrefetch(const std::string& key) { + ScopedVLogTimer timer(1, "GetReplicaListForPrefetch"); + timer.LogRequest("key=", key); + auto result = master_service_.GetReplicaListForPrefetch(key); + timer.LogResponseExpected(result); + return result; +} + +std::vector> +WrappedMasterService::BatchGetReplicaListForPrefetch( + const std::vector& keys) { + ScopedVLogTimer timer(1, "BatchGetReplicaListForPrefetch"); + const size_t total_keys = keys.size(); + timer.LogRequest("keys_count=", total_keys); + + std::vector> 
results; + results.reserve(keys.size()); + + for (const auto& key : keys) { + results.emplace_back(master_service_.GetReplicaListForPrefetch(key)); + } + + for (size_t i = 0; i < results.size(); ++i) { + if (!results[i].has_value()) { + auto error = results[i].error(); + if (error == ErrorCode::OBJECT_NOT_FOUND || + error == ErrorCode::REPLICA_IS_NOT_READY) { + VLOG(1) << "BatchGetReplicaListForPrefetch failed for key[" << i + << "] '" << keys[i] << "': " << toString(error); + } else { + LOG(ERROR) << "BatchGetReplicaListForPrefetch failed for key[" + << i << "] '" << keys[i] + << "': " << toString(error); + } + } + } + return results; +} + +tl::expected WrappedMasterService::RegisterPrefetchTask( + const UUID& client_id, const std::string& key) { + ScopedVLogTimer timer(1, "RegisterPrefetchTask"); + timer.LogRequest("client_id=", client_id, ", key=", key); + auto result = master_service_.RegisterPrefetchTask(client_id, key); + timer.LogResponseExpected(result); + return result; +} + std::vector> WrappedMasterService::BatchGetReplicaList(const std::vector& keys, const std::string& tenant_id) { @@ -2019,6 +2068,12 @@ void RegisterRpcService( &wrapped_master_service); server.register_handler<&mooncake::WrappedMasterService::GetReplicaList>( &wrapped_master_service); + server.register_handler< + &mooncake::WrappedMasterService::GetReplicaListForPrefetch>( + &wrapped_master_service); + server.register_handler< + &mooncake::WrappedMasterService::BatchGetReplicaListForPrefetch>( + &wrapped_master_service); server .register_handler<&mooncake::WrappedMasterService::BatchGetReplicaList>( &wrapped_master_service); @@ -2138,6 +2193,9 @@ void RegisterRpcService( &wrapped_master_service); server.register_handler<&mooncake::WrappedMasterService::FetchTasks>( &wrapped_master_service); + server.register_handler< + &mooncake::WrappedMasterService::RegisterPrefetchTask>( + &wrapped_master_service); server .register_handler<&mooncake::WrappedMasterService::MarkTaskToComplete>( 
&wrapped_master_service); diff --git a/mooncake-store/src/storage_backend.cpp b/mooncake-store/src/storage_backend.cpp index 0820170190..3a12db2f9f 100644 --- a/mooncake-store/src/storage_backend.cpp +++ b/mooncake-store/src/storage_backend.cpp @@ -1329,23 +1329,19 @@ tl::expected BucketStorageBackend::BatchOffload( LOG(ERROR) << "Failed to write bucket with id: " << bucket_id; return tl::make_unexpected(write_bucket_result.error()); } - if (complete_handler != nullptr) { - auto error_code = complete_handler(bucket->keys, metadatas); - if (error_code != ErrorCode::OK) { - LOG(ERROR) << "Complete handler failed: " << error_code - << ", Key count: " << bucket->keys.size() - << ", Bucket id: " << bucket_id; - return tl::make_unexpected(error_code); - } - } - // Commit to metadata maps under exclusive lock. - // Check for duplicate keys and rollback if any found. + const std::vector committed_keys = bucket->keys; + const int64_t bucket_data_size = bucket->data_size; + const int64_t bucket_meta_size = bucket->meta_size; + std::vector metadatas_for_notify = metadatas; + + // Commit local metadata before notifying master so concurrent BatchLoad + // never observes a LOCAL_DISK replica while object_bucket_map_ is empty. 
{ SharedMutexLocker lock(&mutex_); // Pre-check for duplicates before modifying any state - for (const auto& key : bucket->keys) { + for (const auto& key : committed_keys) { if (object_bucket_map_.find(key) != object_bucket_map_.end()) { LOG(WARNING) << "Duplicate key detected in BatchOffload: " << key @@ -1358,21 +1354,36 @@ tl::expected BucketStorageBackend::BatchOffload( } // No duplicates found, safe to commit - total_size_ += bucket->data_size + bucket->meta_size; + total_size_ += bucket_data_size + bucket_meta_size; object_bucket_map_.reserve(object_bucket_map_.size() + - bucket->keys.size()); - for (size_t i = 0; i < bucket->keys.size(); ++i) { + committed_keys.size()); + for (size_t i = 0; i < committed_keys.size(); ++i) { auto [it, inserted] = object_bucket_map_.insert( - {bucket->keys[i], std::move(metadatas[i])}); + {committed_keys[i], std::move(metadatas[i])}); if (!inserted) { LOG(ERROR) << "Unexpected duplicate key after pre-check: " - << bucket->keys[i] << ", bucket_id=" << bucket_id; + << committed_keys[i] << ", bucket_id=" << bucket_id; } } buckets_.emplace(bucket_id, std::move(bucket)); lru_index_.emplace(0LL, bucket_id); } + if (complete_handler != nullptr) { + auto error_code = + complete_handler(committed_keys, metadatas_for_notify); + if (error_code != ErrorCode::OK) { + LOG(ERROR) << "Complete handler failed: " << error_code + << ", Key count: " << committed_keys.size() + << ", Bucket id: " << bucket_id + << ". 
Rolling back local metadata commit."; + RollbackCommittedBucket(bucket_id, committed_keys, bucket_data_size, + bucket_meta_size); + CleanupOrphanedBucket(bucket_id); + return tl::make_unexpected(error_code); + } + } + return bucket_id; } @@ -2126,6 +2137,24 @@ tl::expected BucketStorageBackend::WriteBucket( return {}; } +void BucketStorageBackend::RollbackCommittedBucket( + int64_t bucket_id, const std::vector& keys, + int64_t data_size, int64_t meta_size) { + SharedMutexLocker lock(&mutex_); + for (const auto& key : keys) { + object_bucket_map_.erase(key); + } + buckets_.erase(bucket_id); + for (auto it = lru_index_.begin(); it != lru_index_.end();) { + if (it->second == bucket_id) { + it = lru_index_.erase(it); + } else { + ++it; + } + } + total_size_ -= data_size + meta_size; +} + void BucketStorageBackend::CleanupOrphanedBucket(int64_t bucket_id) { namespace fs = std::filesystem; std::error_code ec; diff --git a/mooncake-store/src/store_c.cpp b/mooncake-store/src/store_c.cpp index a3a19086ef..8bd2bf7a62 100644 --- a/mooncake-store/src/store_c.cpp +++ b/mooncake-store/src/store_c.cpp @@ -58,6 +58,14 @@ mooncake::ReplicateConfig to_replicate_config( return config; } +mooncake::ExistOptions to_exist_options( + const mooncake_exist_options_t *c_options) { + mooncake::ExistOptions options; + if (!c_options) return options; + options.prefetch_to_memory = c_options->prefetch_to_memory != 0; + return options; +} + StoreHandle *as_handle(mooncake_store_t store) { return static_cast<StoreHandle *>(store); } @@ -280,6 +288,42 @@ int mooncake_store_batch_is_exist(mooncake_store_t store, const char **keys, } } +int mooncake_store_is_exist_with_options( + mooncake_store_t store, const char *key, + const mooncake_exist_options_t *options) { + if (!store || !key) return -1; + try { + return as_client(store)->isExist(std::string(key), + to_exist_options(options)); + } catch (...) 
{ + return -1; + } +} + +int mooncake_store_batch_is_exist_with_options( + mooncake_store_t store, const char **keys, size_t count, int *results_out, + const mooncake_exist_options_t *options) { + if (!store || !keys || !results_out) return -1; + for (size_t i = 0; i < count; ++i) { + if (!keys[i]) return -1; + } + try { + std::vector<std::string> key_vec; + key_vec.reserve(count); + for (size_t i = 0; i < count; ++i) { + key_vec.emplace_back(keys[i]); + } + auto results = + as_client(store)->batchIsExist(key_vec, to_exist_options(options)); + for (size_t i = 0; i < count; ++i) { + results_out[i] = (i < results.size()) ? results[i] : -1; + } + return 0; + } catch (...) { + return -1; + } +} + int64_t mooncake_store_get_size(mooncake_store_t store, const char *key) { if (!store || !key) return -1; try { From 46db15cb3e7ee710fb8a2d4a57b8a8c15fa6d48d Mon Sep 17 00:00:00 2001 From: h30027576 Date: Sat, 27 Jun 2026 11:42:22 +0800 Subject: [PATCH 2/2] Add UT Signed-off-by: h30027576 --- .../tests/test_prefetch_on_exist.py | 509 ++++++++++++++++++ 1 file changed, 509 insertions(+) create mode 100644 mooncake-wheel/tests/test_prefetch_on_exist.py diff --git a/mooncake-wheel/tests/test_prefetch_on_exist.py b/mooncake-wheel/tests/test_prefetch_on_exist.py new file mode 100644 index 0000000000..064e996b6e --- /dev/null +++ b/mooncake-wheel/tests/test_prefetch_on_exist.py @@ -0,0 +1,509 @@ +"""Python-binding test for the SSD-to-DRAM prefetch-on-exist feature (RFC #2213). + +Validates that ``is_exist(key, ExistOptions(prefetch_to_memory=True))`` / +``batch_is_exist(keys, options)`` triggers asynchronous SSD-to-DRAM promotion +via the dedicated ``FileStorage::PrefetchKeys`` path (not promotion-on-hit +heartbeat queue). + +Test scenario: + 1. Push enough data to overflow DRAM, forcing eviction + offload to turn + warm keys into LOCAL_DISK-only objects. + 2. Identify a LOCAL_DISK-only key from the replica descriptors. + 3. Call ``is_exist(key, options)`` with ``prefetch_to_memory=True``. + 4. 
Wait for async ``PrefetchKeys`` (thread-pool + batched metadata query). + 5. Assert the key now has a MEMORY replica (prefetch succeeded). + 6. Assert that a subsequent ``get`` serves from MEMORY (not SSD). + +Prerequisites: + - ``mooncake_master`` running with: + ``--enable_offload=true`` + ``--offload_on_evict=true`` + ``--root_fs_dir=`` + - ``MOONCAKE_OFFLOAD_FILE_STORAGE_PATH=`` set on the client side + (or pass ``ssd_offload_path`` via setup). + - ``MOONCAKE_OFFLOAD_BUCKET_KEYS_LIMIT=10`` and + ``MOONCAKE_OFFLOAD_BUCKET_SIZE_LIMIT_BYTES=10485760`` on the client. +""" + +import os +import statistics +import time +import unittest + +try: + import torch_npu + + torch_npu.npu.set_device(int(os.getenv("NPU_DEVICE_ID", "0"))) +except ImportError: + pass + +from mooncake.store import ExistOptions, MooncakeDistributedStore + + +def _prefetch_options(enabled: bool = True) -> ExistOptions: + options = ExistOptions() + options.prefetch_to_memory = enabled + return options + + +DEFAULT_KV_LEASE_TTL = 5000 # ms +default_kv_lease_ttl = int(os.getenv("DEFAULT_KV_LEASE_TTL", DEFAULT_KV_LEASE_TTL)) + +SEGMENT_SIZE = int(os.getenv("SEGMENT_SIZE_BYTES", str(32 * 1024 * 1024))) +LOCAL_BUFFER_SIZE = int(os.getenv("LOCAL_BUFFER_SIZE_BYTES", str(64 * 1024 * 1024))) + +# Async prefetch runs on a bounded thread pool with batched metadata queries; +# poll briefly instead of relying on a single fixed sleep. 
+PREFETCH_WAIT_SECONDS = int(os.getenv("PREFETCH_WAIT_SECONDS", "15")) +PREFETCH_POLL_INTERVAL_SECONDS = float( + os.getenv("PREFETCH_POLL_INTERVAL_SECONDS", "0.2") +) +EVICTION_WAIT_SECONDS = int(os.getenv("EVICTION_WAIT_SECONDS", "25")) + + +def setup_store(store, local_hostname=None): + protocol = os.getenv("PROTOCOL", "tcp") + device_name = os.getenv("DEVICE_NAME", "") + if local_hostname is None: + local_hostname = os.getenv("LOCAL_HOSTNAME", "127.0.0.1") + metadata_server = os.getenv("MC_METADATA_SERVER", "P2PHANDSHAKE") + master_server_address = os.getenv("MASTER_SERVER", "127.0.0.1:50051") + ssd_offload_path = os.getenv("MOONCAKE_OFFLOAD_FILE_STORAGE_PATH", "") + tenant_id = os.getenv("MOONCAKE_TENANT_ID", "default") + prefetch_cooldown_sec = int(os.getenv("SSD_PREFETCH_COOLDOWN_SEC", "5")) + prefetch_dedup_ttl_sec = int(os.getenv("SSD_PREFETCH_DEDUP_TTL_SEC", "30")) + + retcode = store.setup( + local_hostname, + metadata_server, + SEGMENT_SIZE, + LOCAL_BUFFER_SIZE, + protocol, + device_name, + master_server_address, + None, # engine + True, # enable_ssd_offload + ssd_offload_path, + tenant_id, + prefetch_cooldown_sec, + prefetch_dedup_ttl_sec, + ) + if retcode: + raise RuntimeError(f"Failed to setup store client. 
Return code: {retcode}") + + +def _replica_types(descs, key): + infos = descs.get(key) if isinstance(descs, dict) else None + if infos is None: + return [] + if not isinstance(infos, (list, tuple)): + infos = [infos] + tags = [] + for info in infos: + if hasattr(info, "is_memory_replica") and info.is_memory_replica(): + tags.append("MEMORY") + elif hasattr(info, "is_local_disk_replica") and info.is_local_disk_replica(): + tags.append("LOCAL_DISK") + elif hasattr(info, "is_disk_replica") and info.is_disk_replica(): + tags.append("DISK") + else: + tags.append("UNKNOWN") + return tags + + +def _find_cold_key(store, keys): + """Find a key that exists only on LOCAL_DISK (no MEMORY replica).""" + descs = store.batch_get_replica_desc(list(keys)) + for key in keys: + types = _replica_types(descs, key) + if types and all("MEMORY" not in t for t in types) and any("LOCAL_DISK" in t for t in types): + return key + return None + + +def _find_cold_keys(store, keys): + """Return all LOCAL_DISK-only keys from *keys*.""" + descs = store.batch_get_replica_desc(list(keys)) + cold_keys = [] + for key in keys: + types = _replica_types(descs, key) + if types and all("MEMORY" not in t for t in types) and any("LOCAL_DISK" in t for t in types): + cold_keys.append(key) + return cold_keys + + +def _count_memory_replicas(store, keys): + descs = store.batch_get_replica_desc(list(keys)) + return sum(1 for key in keys if "MEMORY" in _replica_types(descs, key)) + + +def _wait_for_memory_replicas( + store, + keys, + min_promoted=1, + timeout_seconds=PREFETCH_WAIT_SECONDS, +): + """Poll replica descriptors until at least *min_promoted* keys have MEMORY.""" + deadline = time.time() + timeout_seconds + promoted = 0 + while time.time() < deadline: + promoted = _count_memory_replicas(store, keys) + if promoted >= min_promoted: + return promoted + time.sleep(PREFETCH_POLL_INTERVAL_SECONDS) + return promoted + + +def _overflow_dram(store, num_keys=96, value_size=1024 * 1024): + """Put enough data to 
overflow DRAM and trigger offload-on-evict. + Returns {key: value} for successfully stored keys.""" + timestamp = int(time.time()) + keys = [f"prefetch_{i}_{timestamp}" for i in range(num_keys)] + reference = {} + for key in keys: + value = os.urandom(value_size) + if store.put(key, value) == 0: + reference[key] = value + return reference + + +class TestPrefetchOnExist(unittest.TestCase): + """Validates that is_exist(prefetch=True) triggers SSD→DRAM promotion.""" + + @classmethod + def setUpClass(cls): + cls.store = MooncakeDistributedStore() + setup_store(cls.store) + + def test_single_key_prefetch(self): + """is_exist(key, prefetch=True) should promote a LOCAL_DISK-only key.""" + reference = _overflow_dram(self.store) + self.assertGreater(len(reference), 0, "No PUTs succeeded") + + try: + # Wait for eviction + offload to create LOCAL_DISK replicas + time.sleep(EVICTION_WAIT_SECONDS) + + cold_key = _find_cold_key(self.store, reference.keys()) + self.assertIsNotNone( + cold_key, + "No LOCAL_DISK-only key found. Is offload_on_evict=true " + "and the segment small enough to overflow?", + ) + print(f"Found LOCAL_DISK-only key: {cold_key}") + + # Trigger prefetch via is_exist + result = self.store.is_exist(cold_key, _prefetch_options()) + self.assertEqual(result, 1, "is_exist should return 1 for existing key") + + # Wait for async prefetch (batched metadata + thread pool) + promoted = _wait_for_memory_replicas(self.store, [cold_key], min_promoted=1) + descs = self.store.batch_get_replica_desc([cold_key]) + types = _replica_types(descs, cold_key) + print(f"Replica types after prefetch: {types} (promoted={promoted})") + self.assertIn( + "MEMORY", + types, + f"Expected a MEMORY replica for {cold_key} after prefetch, " + f"got types={types}. 
Prefetch may have failed or not triggered.", + ) + + # Verify the data is correct and served from MEMORY (not SSD) + offload_count_before = self.store.get_offload_rpc_read_count() + got = self.store.get(cold_key) + offload_count_after = self.store.get_offload_rpc_read_count() + self.assertEqual( + got, + reference[cold_key], + "Data mismatch after prefetch", + ) + self.assertEqual( + offload_count_after, + offload_count_before, + f"Post-prefetch read still hit LOCAL_DISK (offload RPC count " + f"went from {offload_count_before} to {offload_count_after}). " + f"MEMORY replica may not be preferred.", + ) + print("PASS: is_exist(prefetch=True) successfully promoted key to DRAM") + finally: + for key in reference: + try: + self.store.remove(key) + except Exception: + pass + time.sleep(default_kv_lease_ttl / 1000 + 0.5) + + def test_batch_prefetch(self): + """batch_is_exist(keys, prefetch=True) should promote LOCAL_DISK-only keys.""" + reference = _overflow_dram(self.store) + self.assertGreater(len(reference), 0, "No PUTs succeeded") + + try: + time.sleep(EVICTION_WAIT_SECONDS) + + all_keys = list(reference.keys()) + cold_keys = _find_cold_keys(self.store, all_keys) + + self.assertGreater(len(cold_keys), 0, "No LOCAL_DISK-only keys found") + print(f"Found {len(cold_keys)} LOCAL_DISK-only keys for batch prefetch") + + # Trigger batch prefetch + results = self.store.batch_is_exist(cold_keys, _prefetch_options()) + self.assertEqual(len(results), len(cold_keys)) + for r in results: + self.assertEqual(r, 1, "batch_is_exist should return 1 for existing keys") + + promoted_count = _wait_for_memory_replicas( + self.store, cold_keys, min_promoted=1 + ) + print(f"Promoted {promoted_count}/{len(cold_keys)} keys to DRAM") + self.assertGreater( + promoted_count, + 0, + f"No keys were promoted after batch_is_exist(prefetch=True). 
" + f"Prefetch may not be triggering.", + ) + finally: + for key in reference: + try: + self.store.remove(key) + except Exception: + pass + time.sleep(default_kv_lease_ttl / 1000 + 0.5) + + def test_prefetch_false_does_not_promote(self): + """is_exist(key, prefetch=False) should NOT trigger promotion.""" + reference = _overflow_dram(self.store) + self.assertGreater(len(reference), 0) + + try: + time.sleep(EVICTION_WAIT_SECONDS) + + cold_key = _find_cold_key(self.store, reference.keys()) + if cold_key is None: + self.skipTest("No LOCAL_DISK-only key found; cannot run negative test") + + # Call without prefetch + result = self.store.is_exist(cold_key, _prefetch_options(False)) + self.assertEqual(result, 1) + + # Short wait; if prefetch were triggered, it would complete quickly + time.sleep(5) + + descs = self.store.batch_get_replica_desc([cold_key]) + types = _replica_types(descs, cold_key) + self.assertNotIn( + "MEMORY", + types, + f"Key {cold_key} got a MEMORY replica after is_exist(prefetch=False). 
" + f"Prefetch should NOT fire when prefetch=False.", + ) + print("PASS: is_exist(prefetch=False) correctly did NOT trigger promotion") + finally: + for key in reference: + try: + self.store.remove(key) + except Exception: + pass + time.sleep(default_kv_lease_ttl / 1000 + 0.5) + + def test_prefetch_memory_key_is_noop(self): + """Prefetch on a key already in MEMORY should be a no-op.""" + key = f"mem_only_{int(time.time())}" + value = os.urandom(1024) + try: + self.assertEqual(self.store.put(key, value), 0) + + # Key should be in MEMORY + result = self.store.is_exist(key, _prefetch_options()) + self.assertEqual(result, 1) + + # Verify it's still just MEMORY (no extra replicas) + descs = self.store.batch_get_replica_desc([key]) + types = _replica_types(descs, key) + self.assertIn("MEMORY", types) + print(f"PASS: prefetch on MEMORY key is no-op, types={types}") + finally: + self.store.remove(key) + time.sleep(default_kv_lease_ttl / 1000 + 0.5) + + +@unittest.skipUnless( + os.getenv("MC_BENCH_PREFETCH_LATENCY"), + "opt-in benchmark — set MC_BENCH_PREFETCH_LATENCY=1 to run.", +) +class BenchPrefetchLatency(unittest.TestCase): + """Latency benchmark comparing reads before and after prefetch. 
+ + Measures: + - Phase A: LOCAL_DISK read latency (before prefetch) + - Phase B: MEMORY read latency (after prefetch via is_exist) + - Speedup: phase_A / phase_B at p50/p95/p99 + """ + + @classmethod + def setUpClass(cls): + cls.store = MooncakeDistributedStore() + setup_store(cls.store) + + def test_latency_comparison(self): + reference = _overflow_dram(self.store) + self.assertGreater(len(reference), 0) + + try: + time.sleep(EVICTION_WAIT_SECONDS) + + cold_key = _find_cold_key(self.store, reference.keys()) + self.assertIsNotNone(cold_key) + expected_bytes = reference[cold_key] + + N = int(os.getenv("LATENCY_SAMPLES", "200")) + + # Phase A: timed reads from LOCAL_DISK (before prefetch) + pre_latencies_ms = [] + for _ in range(N): + t0 = time.perf_counter() + got = self.store.get(cold_key) + t1 = time.perf_counter() + self.assertEqual(got, expected_bytes) + pre_latencies_ms.append((t1 - t0) * 1000.0) + + # Trigger prefetch + self.store.is_exist(cold_key, _prefetch_options()) + promoted = _wait_for_memory_replicas(self.store, [cold_key], min_promoted=1) + + # Verify promotion happened + descs = self.store.batch_get_replica_desc([cold_key]) + types = _replica_types(descs, cold_key) + self.assertIn( + "MEMORY", + types, + f"Prefetch did not create MEMORY replica (promoted={promoted})", + ) + + # Phase B: timed reads from MEMORY (after prefetch) + post_latencies_ms = [] + for _ in range(N): + t0 = time.perf_counter() + got = self.store.get(cold_key) + t1 = time.perf_counter() + self.assertEqual(got, expected_bytes) + post_latencies_ms.append((t1 - t0) * 1000.0) + + def _pcts(samples): + if len(samples) >= 2: + cuts = statistics.quantiles(samples, n=100) + return {50: cuts[49], 95: cuts[94], 99: cuts[98]} + v = samples[0] if samples else 0.0 + return {50: v, 95: v, 99: v} + + pre = _pcts(pre_latencies_ms) + post = _pcts(post_latencies_ms) + + print() + print(f"=== prefetch-on-exist latency comparison (key={cold_key}) ===") + print( + f"BEFORE prefetch (LOCAL_DISK): 
n={len(pre_latencies_ms)}, " + f"p50={pre[50]:.2f}ms, p95={pre[95]:.2f}ms, p99={pre[99]:.2f}ms, " + f"min={min(pre_latencies_ms):.2f}ms, max={max(pre_latencies_ms):.2f}ms" + ) + print( + f"AFTER prefetch (MEMORY): n={len(post_latencies_ms)}, " + f"p50={post[50]:.2f}ms, p95={post[95]:.2f}ms, p99={post[99]:.2f}ms, " + f"min={min(post_latencies_ms):.2f}ms, max={max(post_latencies_ms):.2f}ms" + ) + speedup_50 = pre[50] / post[50] if post[50] > 0 else float("inf") + speedup_95 = pre[95] / post[95] if post[95] > 0 else float("inf") + speedup_99 = pre[99] / post[99] if post[99] > 0 else float("inf") + print(f"speedup: p50={speedup_50:.1f}x | p95={speedup_95:.1f}x | p99={speedup_99:.1f}x") + + self.assertLess( + post[50], + pre[50] / 1.3, + f"Post-prefetch p50 ({post[50]:.2f}ms) not meaningfully lower than " + f"pre-prefetch p50 ({pre[50]:.2f}ms). MEMORY may not be preferred.", + ) + finally: + for key in reference: + try: + self.store.remove(key) + except Exception: + pass + time.sleep(default_kv_lease_ttl / 1000 + 0.5) + + +@unittest.skipUnless( + os.getenv("MC_TEST_CROSS_NODE"), + "opt-in cross-node test; set MC_TEST_CROSS_NODE=1 and provide two " + "endpoints via NODE_A_HOSTNAME / NODE_B_HOSTNAME (e.g. ip:port).", +) +class TestCrossNodePrefetch(unittest.TestCase): + """Validates Option B: cross-node SSD prefetch delegation to the holder node. + + Scenario (two store clients sharing one Master): + 1. Node A puts keys and overflows DRAM so they are evicted to A's SSD. + A is therefore the LOCAL_DISK holder (replica transport_endpoint == A). + 2. Node B calls batch_is_exist(keys, prefetch_to_memory=True). Because the + holder endpoint != B's own endpoint, B delegates the promotion to A via + the prefetch_offload_object RPC instead of calling RegisterPrefetchTask + locally (which would fail Master's holder check with INVALID_PARAMS). + 3. A registers the promotion task with its own client_id and stages the + object SSD->DRAM. The key gains a MEMORY replica. 
+ + Requires a running master with --enable_offload=true --offload_on_evict=true + and two reachable endpoints. Both stores run in this process but bind + distinct offload RPC servers / segments. + """ + + @classmethod + def setUpClass(cls): + node_a = os.getenv("NODE_A_HOSTNAME", "127.0.0.1:17001") + node_b = os.getenv("NODE_B_HOSTNAME", "127.0.0.1:17002") + cls.store_a = MooncakeDistributedStore() + cls.store_b = MooncakeDistributedStore() + setup_store(cls.store_a, local_hostname=node_a) + setup_store(cls.store_b, local_hostname=node_b) + + def test_remote_holder_prefetch(self): + # Node A populates and overflows so keys land as LOCAL_DISK-only on A. + reference = _overflow_dram(self.store_a) + self.assertGreater(len(reference), 0, "No PUTs succeeded on node A") + + try: + time.sleep(EVICTION_WAIT_SECONDS) + + all_keys = list(reference.keys()) + cold_keys = _find_cold_keys(self.store_a, all_keys) + self.assertGreater( + len(cold_keys), 0, "No LOCAL_DISK-only keys produced on node A" + ) + print(f"Node A holds {len(cold_keys)} LOCAL_DISK-only key(s)") + + # Node B triggers prefetch. B must delegate to holder A via RPC. + results = self.store_b.batch_is_exist(cold_keys, _prefetch_options()) + self.assertEqual(len(results), len(cold_keys)) + for r in results: + self.assertEqual(r, 1, "batch_is_exist should return 1") + + promoted = _wait_for_memory_replicas( + self.store_b, cold_keys, min_promoted=1 + ) + print(f"Cross-node promoted {promoted}/{len(cold_keys)} key(s) to DRAM") + self.assertGreater( + promoted, + 0, + "No keys were promoted via cross-node delegation. Check that the " + "holder received prefetch_offload_object and no INVALID_PARAMS " + "(RegisterPrefetchTask holder check) was logged.", + ) + finally: + for key in reference: + try: + self.store_a.remove(key) + except Exception: + pass + time.sleep(default_kv_lease_ttl / 1000 + 0.5) + + +if __name__ == "__main__": + unittest.main()