From 4ff36813a2f211cc0a2273e5ef19a8b7b0ee5236 Mon Sep 17 00:00:00 2001 From: mikethegoblin Date: Mon, 22 Jun 2026 17:16:31 +0800 Subject: [PATCH 1/2] implement direct submit --- .../flagcx_transport/flagcx_transport.h | 4 ++ .../flagcx_transport/flagcx_transport.cpp | 58 ++++++++++++++----- 2 files changed, 49 insertions(+), 13 deletions(-) diff --git a/mooncake-transfer-engine/include/transport/flagcx_transport/flagcx_transport.h b/mooncake-transfer-engine/include/transport/flagcx_transport/flagcx_transport.h index 2e847aa397..1a1e2b4525 100644 --- a/mooncake-transfer-engine/include/transport/flagcx_transport/flagcx_transport.h +++ b/mooncake-transfer-engine/include/transport/flagcx_transport/flagcx_transport.h @@ -94,6 +94,8 @@ class FlagCxTransport : public Transport { // Issue a group of same-target, same-opcode slices as one multi-iov // (multi-rail) transfer and mark each slice's completion. void runSliceGroup(const std::vector &group); + void enqueueSlices(const std::vector &slices); + void submitSlicesDirect(const std::vector &slices); // Find the engine MR handle whose registered region fully contains // [addr, addr+length). Returns false if the source is unregistered. @@ -120,6 +122,8 @@ class FlagCxTransport : public Transport { // submitTransfer/Task pushes Slice* into io_queue_ and returns // immediately. void ioWorker(); + bool direct_submit_ = false; + std::mutex submit_mu_; std::thread io_thread_; std::atomic running_{false}; std::mutex queue_mu_; diff --git a/mooncake-transfer-engine/src/transport/flagcx_transport/flagcx_transport.cpp b/mooncake-transfer-engine/src/transport/flagcx_transport/flagcx_transport.cpp index b858b821c7..ca1a9297a4 100644 --- a/mooncake-transfer-engine/src/transport/flagcx_transport/flagcx_transport.cpp +++ b/mooncake-transfer-engine/src/transport/flagcx_transport/flagcx_transport.cpp @@ -58,6 +58,11 @@ int FlagCxTransport::install(std::string &local_server_name, (void)topo; local_server_name_ = local_server_name; metadata_ = meta; + const char *direct_submit_env = std::getenv("MC_FLAGCX_DIRECT_SUBMIT"); + direct_submit_ = + direct_submit_env && std::string(direct_submit_env) != "0" && + std::string(direct_submit_env) != "false" && + std::string(direct_submit_env) != "FALSE"; engine_ = flagcxP2pEngineCreate(); if (!engine_) { @@ -95,9 +100,14 @@ int FlagCxTransport::install(std::string &local_server_name, if (allocateLocalSegment() != 0) return -1; - running_.store(true); - io_thread_ = std::thread(&FlagCxTransport::ioWorker, this); - LOG(INFO) << "FlagCxTransport: install OK (io_thread spawned)"; + if (direct_submit_) { + LOG(INFO) << "FlagCxTransport: install OK (direct submit enabled; " + "io_thread not spawned)"; + } else { + running_.store(true); + io_thread_ = std::thread(&FlagCxTransport::ioWorker, this); + LOG(INFO) << "FlagCxTransport: install OK (io_thread spawned)"; + } return 0; } @@ -177,6 +187,30 @@ void FlagCxTransport::runSliceGroup(const std::vector &group) { } } +void FlagCxTransport::enqueueSlices(const std::vector &slices) { + { + std::lock_guard lk(queue_mu_); + for (auto *s : slices) io_queue_.push_back(s); + } + queue_cv_.notify_all(); +} + +void FlagCxTransport::submitSlicesDirect(const std::vector &slices) { + // Keep direct-submit FlagCX engine calls serialized for this experiment. + // The existing implementation routes all data-path calls through one + // ioWorker thread, so this preserves that single-caller assumption while + // removing the worker queue and wakeup from the submission path. + std::lock_guard lk(submit_mu_); + std::unordered_map> groups; + for (Slice *s : slices) { + const uint64_t key = + (static_cast(s->target_id) << 1) | + (s->opcode == TransferRequest::WRITE ? 0u : 1u); + groups[key].push_back(s); + } + for (auto &kv : groups) runSliceGroup(kv.second); +} + void FlagCxTransport::ioWorker() { while (true) { std::vector batch; @@ -402,11 +436,10 @@ Status FlagCxTransport::submitTransfer( __sync_fetch_and_add(&task.slice_count, 1); to_post.push_back(slice); } - { - std::lock_guard lk(queue_mu_); - for (auto *s : to_post) io_queue_.push_back(s); - } - queue_cv_.notify_all(); + if (direct_submit_) + submitSlicesDirect(to_post); + else + enqueueSlices(to_post); return Status::OK(); } @@ -432,11 +465,10 @@ Status FlagCxTransport::submitTransferTask( __sync_fetch_and_add(&task.slice_count, 1); to_post.push_back(slice); } - { - std::lock_guard lk(queue_mu_); - for (auto *s : to_post) io_queue_.push_back(s); - } - queue_cv_.notify_all(); + if (direct_submit_) + submitSlicesDirect(to_post); + else + enqueueSlices(to_post); return Status::OK(); } From d3896dbee800ae4f12d9aef4035af4cf9287150e Mon Sep 17 00:00:00 2001 From: mikethegoblin Date: Tue, 23 Jun 2026 14:20:49 +0800 Subject: [PATCH 2/2] move transfer status polling to getTransferStatus --- .../flagcx_transport/flagcx_transport.h | 41 ++-- .../flagcx_transport/flagcx_transport.cpp | 208 ++++++------------ 2 files changed, 81 insertions(+), 168 deletions(-) diff --git a/mooncake-transfer-engine/include/transport/flagcx_transport/flagcx_transport.h b/mooncake-transfer-engine/include/transport/flagcx_transport/flagcx_transport.h index 1a1e2b4525..7c26a085da 100644 --- a/mooncake-transfer-engine/include/transport/flagcx_transport/flagcx_transport.h +++ b/mooncake-transfer-engine/include/transport/flagcx_transport/flagcx_transport.h @@ -9,15 +9,12 @@ #ifndef FLAGCX_TRANSPORT_H_ #define FLAGCX_TRANSPORT_H_ -#include -#include +#include #include #include -#include #include #include #include -#include #include #include @@ -41,12 +38,11 @@ namespace mooncake { // - The data path resolves a cached connection per target segment // (flagcxP2pEngineGetConn), looks up the remote rkey by absolute // virtual address (flagcxP2pEngineMakeDesc -- the FlagCX equivalent -// of Mooncake's "rkey by dst_ptr"), then issues a one-sided -// WriteVectorSync (WRITE) or ReadVector + XferStatus poll (READ). +// of Mooncake's "rkey by dst_ptr"), then issues a non-blocking one-sided +// WriteVector (WRITE) or ReadVector (READ). // -// A single I/O worker thread drains the slice queue so submitTransfer / -// submitTransferTask stay non-blocking; completion is reported through -// the usual TransferTask slice counters. +// getTransferStatus() polls submitted FlagCX transfer IDs and reports +// completion through the usual TransferTask slice counters. // // Required runtime env: the FlagCX engine honours the standard FlagCX // knobs (FLAGCX_SOCKET_IFNAME selects the NIC the RPC endpoint binds / @@ -90,12 +86,11 @@ class FlagCxTransport : public Transport { const char *getName() const override { return "flagcx"; } int allocateLocalSegment(); - int doSlice(Slice *slice); // Issue a group of same-target, same-opcode slices as one multi-iov - // (multi-rail) transfer and mark each slice's completion. + // (multi-rail) transfer and record its transfer id for status checks. void runSliceGroup(const std::vector &group); - void enqueueSlices(const std::vector &slices); - void submitSlicesDirect(const std::vector &slices); + void submitSlices(const std::vector &slices); + void pollPendingTransfers(); // Find the engine MR handle whose registered region fully contains // [addr, addr+length). Returns false if the source is unregistered. @@ -118,17 +113,17 @@ class FlagCxTransport : public Transport { std::mutex reg_mu_; std::vector regs_; - // Single-threaded I/O worker: all FlagCX ops happen in io_thread_. - // submitTransfer/Task pushes Slice* into io_queue_ and returns - // immediately. - void ioWorker(); - bool direct_submit_ = false; + struct PendingTransfer { + FlagcxP2pConn *conn; + uint64_t transfer_id; + std::vector slices; + std::chrono::steady_clock::time_point deadline; + }; + + std::mutex flagcx_mu_; std::mutex submit_mu_; - std::thread io_thread_; - std::atomic running_{false}; - std::mutex queue_mu_; - std::condition_variable queue_cv_; - std::deque io_queue_; + std::mutex pending_mu_; + std::vector pending_; }; } // namespace mooncake diff --git a/mooncake-transfer-engine/src/transport/flagcx_transport/flagcx_transport.cpp b/mooncake-transfer-engine/src/transport/flagcx_transport/flagcx_transport.cpp index ca1a9297a4..8f95654656 100644 --- a/mooncake-transfer-engine/src/transport/flagcx_transport/flagcx_transport.cpp +++ b/mooncake-transfer-engine/src/transport/flagcx_transport/flagcx_transport.cpp @@ -15,8 +15,8 @@ #include #include #include -#include #include +#include #include #include "common.h" @@ -28,15 +28,13 @@ namespace mooncake { FlagCxTransport::FlagCxTransport() {} FlagCxTransport::~FlagCxTransport() { - // Stop and join the worker first; any in-flight slices fail. { - std::lock_guard lk(queue_mu_); - running_.store(false); + std::lock_guard lk(pending_mu_); + for (auto &p : pending_) { + for (auto *s : p.slices) s->markFailed(); + } + pending_.clear(); } - queue_cv_.notify_all(); - if (io_thread_.joinable()) io_thread_.join(); - for (auto *s : io_queue_) s->markFailed(); - io_queue_.clear(); if (engine_) { // Deregister everything still registered, then tear the engine down. @@ -58,11 +56,6 @@ int FlagCxTransport::install(std::string &local_server_name, (void)topo; local_server_name_ = local_server_name; metadata_ = meta; - const char *direct_submit_env = std::getenv("MC_FLAGCX_DIRECT_SUBMIT"); - direct_submit_ = - direct_submit_env && std::string(direct_submit_env) != "0" && - std::string(direct_submit_env) != "false" && - std::string(direct_submit_env) != "FALSE"; engine_ = flagcxP2pEngineCreate(); if (!engine_) { @@ -100,25 +93,22 @@ int FlagCxTransport::install(std::string &local_server_name, if (allocateLocalSegment() != 0) return -1; - if (direct_submit_) { - LOG(INFO) << "FlagCxTransport: install OK (direct submit enabled; " - "io_thread not spawned)"; - } else { - running_.store(true); - io_thread_ = std::thread(&FlagCxTransport::ioWorker, this); - LOG(INFO) << "FlagCxTransport: install OK (io_thread spawned)"; - } + LOG(INFO) << "FlagCxTransport: install OK (direct submit)"; return 0; } // Issue one batch of slices that all target the same segment and share an // opcode as a SINGLE multi-iov transfer. The engine fans the iovs out across // the conn's rails (NICs), so a deep batch keeps every NIC busy instead of one -// block-at-a-time. Marks every slice success/fail. +// block-at-a-time. Completion is marked by getTransferStatus(). void FlagCxTransport::runSliceGroup(const std::vector &group) { if (group.empty()) return; bool ok = true; - FlagcxP2pConn *conn = connForSegment(group.front()->target_id); + FlagcxP2pConn *conn = nullptr; + { + std::lock_guard lk(flagcx_mu_); + conn = connForSegment(group.front()->target_id); + } const bool isWrite = group.front()->opcode == TransferRequest::WRITE; std::vector mrs; @@ -156,50 +146,36 @@ void FlagCxTransport::runSliceGroup(const std::vector &group) { } if (ok) { + uint64_t transfer_id = 0; if (isWrite) { - ok = flagcxP2pEngineWriteVectorSync(conn, mrs, bufs, sizes, descs) == - 0; + std::lock_guard lk(flagcx_mu_); + ok = flagcxP2pEngineWriteVector(conn, mrs, bufs, sizes, descs, + static_cast(group.size()), + &transfer_id) == 0; } else { - uint64_t transfer_id = 0; + std::lock_guard lk(flagcx_mu_); ok = flagcxP2pEngineReadVector(conn, mrs, bufs, sizes, descs, static_cast(group.size()), &transfer_id) == 0; - if (ok) { - auto deadline = std::chrono::steady_clock::now() + - std::chrono::seconds(30); - while (!flagcxP2pEngineXferStatus(conn, transfer_id)) { - if (std::chrono::steady_clock::now() > deadline) { - LOG(ERROR) << "FlagCxTransport: ReadVector batch timeout"; - ok = false; - break; - } - std::this_thread::yield(); - } + } + if (ok) { + const auto now = std::chrono::steady_clock::now(); + for (Slice *s : group) { + s->status = Slice::POSTED; + s->ts = getCurrentTimeInNano(); } + std::lock_guard lk(pending_mu_); + pending_.push_back( + {conn, transfer_id, group, now + std::chrono::seconds(30)}); } } - for (Slice *s : group) { - if (ok) - s->markSuccess(); - else - s->markFailed(); + if (!ok) { + for (Slice *s : group) s->markFailed(); } } -void FlagCxTransport::enqueueSlices(const std::vector &slices) { - { - std::lock_guard lk(queue_mu_); - for (auto *s : slices) io_queue_.push_back(s); - } - queue_cv_.notify_all(); -} - -void FlagCxTransport::submitSlicesDirect(const std::vector &slices) { - // Keep direct-submit FlagCX engine calls serialized for this experiment. - // The existing implementation routes all data-path calls through one - // ioWorker thread, so this preserves that single-caller assumption while - // removing the worker queue and wakeup from the submission path. +void FlagCxTransport::submitSlices(const std::vector &slices) { std::lock_guard lk(submit_mu_); std::unordered_map> groups; for (Slice *s : slices) { @@ -211,30 +187,37 @@ void FlagCxTransport::submitSlicesDirect(const std::vector &slices) { for (auto &kv : groups) runSliceGroup(kv.second); } -void FlagCxTransport::ioWorker() { - while (true) { - std::vector batch; +void FlagCxTransport::pollPendingTransfers() { + std::vector local; + { + std::lock_guard lk(pending_mu_); + local.swap(pending_); + } + + if (local.empty()) return; + + std::vector still_pending; + const auto now = std::chrono::steady_clock::now(); + for (auto &p : local) { + bool done = false; { - std::unique_lock lk(queue_mu_); - queue_cv_.wait(lk, [this] { - return !io_queue_.empty() || !running_.load(); - }); - if (io_queue_.empty()) return; // running_ false and queue drained - // Drain everything currently queued so it can be coalesced into - // one multi-iov, multi-rail transfer per target. - batch.assign(io_queue_.begin(), io_queue_.end()); - io_queue_.clear(); + std::lock_guard lk(flagcx_mu_); + done = flagcxP2pEngineXferStatus(p.conn, p.transfer_id); } - - // Group by (target_id, opcode); each group becomes one batched xfer. - std::unordered_map> groups; - for (Slice *s : batch) { - const uint64_t key = - (static_cast(s->target_id) << 1) | - (s->opcode == TransferRequest::WRITE ? 0u : 1u); - groups[key].push_back(s); + if (done) { + for (auto *s : p.slices) s->markSuccess(); + } else if (now > p.deadline) { + LOG(ERROR) << "FlagCxTransport: transfer timeout, tid=" + << p.transfer_id; + for (auto *s : p.slices) s->markFailed(); + } else { + still_pending.push_back(std::move(p)); } - for (auto &kv : groups) runSliceGroup(kv.second); + } + + if (!still_pending.empty()) { + std::lock_guard lk(pending_mu_); + for (auto &p : still_pending) pending_.push_back(std::move(p)); } } @@ -348,67 +331,6 @@ FlagcxP2pConn *FlagCxTransport::connForSegment(SegmentID target_id) { return conn; } -int FlagCxTransport::doSlice(Slice *slice) { - // Runs only on the ioWorker thread; no extra locking required here. - FlagcxP2pMr local_mr = 0; - if (!resolveLocalMr(slice->source_addr, slice->length, local_mr)) { - LOG(ERROR) << "FlagCxTransport: source not registered " - << slice->source_addr; - return -1; - } - - FlagcxP2pConn *conn = connForSegment(slice->target_id); - if (!conn) return -1; - - // Resolve the remote rkey by absolute virtual address using the region - // table exchanged at handshake -- the FlagCX analogue of Mooncake's - // "look up rkey by dst_ptr". dest_offset already carries the remote - // absolute VA (request.target_offset). - FlagcxP2pRdmaDesc desc; - if (flagcxP2pEngineMakeDesc(conn, slice->flagcx.dest_offset, - static_cast(slice->length), - &desc) != 0) { - LOG(ERROR) << "FlagCxTransport: MakeDesc failed for remote VA 0x" - << std::hex << slice->flagcx.dest_offset << std::dec - << " len=" << slice->length; - return -1; - } - - std::vector mrs{local_mr}; - std::vector bufs{slice->source_addr}; - std::vector sizes{slice->length}; - std::vector descs{desc}; - - if (slice->opcode == TransferRequest::WRITE) { - // Blocking one-sided write; on return the data has landed in the - // peer's registered buffer (no separate signal/counter needed). - if (flagcxP2pEngineWriteVectorSync(conn, mrs, bufs, sizes, descs) != - 0) { - LOG(ERROR) << "FlagCxTransport: WriteVectorSync failed"; - return -1; - } - } else { - uint64_t transfer_id = 0; - if (flagcxP2pEngineReadVector(conn, mrs, bufs, sizes, descs, - /*numIovs=*/1, &transfer_id) != 0) { - LOG(ERROR) << "FlagCxTransport: ReadVector failed"; - return -1; - } - // Poll to completion with a generous deadline. - auto deadline = - std::chrono::steady_clock::now() + std::chrono::seconds(30); - while (!flagcxP2pEngineXferStatus(conn, transfer_id)) { - if (std::chrono::steady_clock::now() > deadline) { - LOG(ERROR) << "FlagCxTransport: ReadVector timed out, tid=" - << transfer_id; - return -1; - } - std::this_thread::yield(); - } - } - return 0; -} - Status FlagCxTransport::submitTransfer( BatchID batch_id, const std::vector &entries) { auto &batch_desc = *reinterpret_cast(batch_id); @@ -436,10 +358,7 @@ Status FlagCxTransport::submitTransfer( __sync_fetch_and_add(&task.slice_count, 1); to_post.push_back(slice); } - if (direct_submit_) - submitSlicesDirect(to_post); - else - enqueueSlices(to_post); + submitSlices(to_post); return Status::OK(); } @@ -465,15 +384,14 @@ Status FlagCxTransport::submitTransferTask( __sync_fetch_and_add(&task.slice_count, 1); to_post.push_back(slice); } - if (direct_submit_) - submitSlicesDirect(to_post); - else - enqueueSlices(to_post); + submitSlices(to_post); return Status::OK(); } Status FlagCxTransport::getTransferStatus(BatchID batch_id, size_t task_id, TransferStatus &status) { + pollPendingTransfers(); + auto &batch_desc = *reinterpret_cast(batch_id); if (task_id >= batch_desc.task_list.size()) { return Status::InvalidArgument(