diff --git a/mooncake-transfer-engine/include/transport/flagcx_transport/flagcx_transport.h b/mooncake-transfer-engine/include/transport/flagcx_transport/flagcx_transport.h index 2e847aa397..f2b44789d7 100644 --- a/mooncake-transfer-engine/include/transport/flagcx_transport/flagcx_transport.h +++ b/mooncake-transfer-engine/include/transport/flagcx_transport/flagcx_transport.h @@ -9,11 +9,11 @@ #ifndef FLAGCX_TRANSPORT_H_ #define FLAGCX_TRANSPORT_H_ -#include -#include +#include #include #include -#include +#include +#include #include #include #include @@ -41,12 +41,11 @@ namespace mooncake { // - The data path resolves a cached connection per target segment // (flagcxP2pEngineGetConn), looks up the remote rkey by absolute // virtual address (flagcxP2pEngineMakeDesc -- the FlagCX equivalent -// of Mooncake's "rkey by dst_ptr"), then issues a one-sided -// WriteVectorSync (WRITE) or ReadVector + XferStatus poll (READ). +// of Mooncake's "rkey by dst_ptr"), then issues a non-blocking one-sided +// WriteVector (WRITE) or ReadVector (READ). // -// A single I/O worker thread drains the slice queue so submitTransfer / -// submitTransferTask stay non-blocking; completion is reported through -// the usual TransferTask slice counters. +// A lightweight completion thread polls submitted FlagCX transfer IDs and +// reports completion through the usual TransferTask slice counters. // // Required runtime env: the FlagCX engine honours the standard FlagCX // knobs (FLAGCX_SOCKET_IFNAME selects the NIC the RPC endpoint binds / @@ -90,10 +89,12 @@ class FlagCxTransport : public Transport { const char *getName() const override { return "flagcx"; } int allocateLocalSegment(); - int doSlice(Slice *slice); // Issue a group of same-target, same-opcode slices as one multi-iov - // (multi-rail) transfer and mark each slice's completion. + // (multi-rail) transfer and record its transfer id for status checks. void runSliceGroup(const std::vector &group); + void submitSlices(const std::vector &slices); + bool pollPendingTransfers(); + void completionLoop(); // Find the engine MR handle whose registered region fully contains // [addr, addr+length). Returns false if the source is unregistered. @@ -116,15 +117,20 @@ class FlagCxTransport : public Transport { std::mutex reg_mu_; std::vector regs_; - // Single-threaded I/O worker: all FlagCX ops happen in io_thread_. - // submitTransfer/Task pushes Slice* into io_queue_ and returns - // immediately. - void ioWorker(); - std::thread io_thread_; - std::atomic running_{false}; - std::mutex queue_mu_; - std::condition_variable queue_cv_; - std::deque io_queue_; + struct PendingTransfer { + FlagcxP2pConn *conn; + uint64_t transfer_id; + std::vector slices; + std::chrono::steady_clock::time_point deadline; + }; + + std::mutex flagcx_mu_; + std::mutex submit_mu_; + std::mutex pending_mu_; + std::condition_variable pending_cv_; + std::vector pending_; + std::atomic completion_running_{false}; + std::thread completion_thread_; }; } // namespace mooncake diff --git a/mooncake-transfer-engine/src/transport/flagcx_transport/flagcx_transport.cpp b/mooncake-transfer-engine/src/transport/flagcx_transport/flagcx_transport.cpp index b858b821c7..d18bd0de31 100644 --- a/mooncake-transfer-engine/src/transport/flagcx_transport/flagcx_transport.cpp +++ b/mooncake-transfer-engine/src/transport/flagcx_transport/flagcx_transport.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include "common.h" @@ -28,15 +29,17 @@ namespace mooncake { FlagCxTransport::FlagCxTransport() {} FlagCxTransport::~FlagCxTransport() { - // Stop and join the worker first; any in-flight slices fail. + completion_running_.store(false, std::memory_order_release); + pending_cv_.notify_all(); + if (completion_thread_.joinable()) completion_thread_.join(); + { - std::lock_guard lk(queue_mu_); - running_.store(false); + std::lock_guard lk(pending_mu_); + for (auto &p : pending_) { + for (auto *s : p.slices) s->markFailed(); + } + pending_.clear(); } - queue_cv_.notify_all(); - if (io_thread_.joinable()) io_thread_.join(); - for (auto *s : io_queue_) s->markFailed(); - io_queue_.clear(); if (engine_) { // Deregister everything still registered, then tear the engine down. @@ -95,20 +98,25 @@ int FlagCxTransport::install(std::string &local_server_name, if (allocateLocalSegment() != 0) return -1; - running_.store(true); - io_thread_ = std::thread(&FlagCxTransport::ioWorker, this); - LOG(INFO) << "FlagCxTransport: install OK (io_thread spawned)"; + completion_running_.store(true, std::memory_order_release); + completion_thread_ = std::thread(&FlagCxTransport::completionLoop, this); + + LOG(INFO) << "FlagCxTransport: install OK (direct submit)"; return 0; } // Issue one batch of slices that all target the same segment and share an // opcode as a SINGLE multi-iov transfer. The engine fans the iovs out across // the conn's rails (NICs), so a deep batch keeps every NIC busy instead of one -// block-at-a-time. Marks every slice success/fail. +// block-at-a-time. Completion is marked by the completion polling thread. void FlagCxTransport::runSliceGroup(const std::vector &group) { if (group.empty()) return; bool ok = true; - FlagcxP2pConn *conn = connForSegment(group.front()->target_id); + FlagcxP2pConn *conn = nullptr; + { + std::lock_guard lk(flagcx_mu_); + conn = connForSegment(group.front()->target_id); + } const bool isWrite = group.front()->opcode == TransferRequest::WRITE; std::vector mrs; @@ -146,61 +154,103 @@ void FlagCxTransport::runSliceGroup(const std::vector &group) { } if (ok) { + uint64_t transfer_id = 0; if (isWrite) { - ok = flagcxP2pEngineWriteVectorSync(conn, mrs, bufs, sizes, descs) == - 0; + std::lock_guard lk(flagcx_mu_); + ok = flagcxP2pEngineWriteVector(conn, mrs, bufs, sizes, descs, + static_cast(group.size()), + &transfer_id) == 0; } else { - uint64_t transfer_id = 0; + std::lock_guard lk(flagcx_mu_); ok = flagcxP2pEngineReadVector(conn, mrs, bufs, sizes, descs, static_cast(group.size()), &transfer_id) == 0; - if (ok) { - auto deadline = std::chrono::steady_clock::now() + - std::chrono::seconds(30); - while (!flagcxP2pEngineXferStatus(conn, transfer_id)) { - if (std::chrono::steady_clock::now() > deadline) { - LOG(ERROR) << "FlagCxTransport: ReadVector batch timeout"; - ok = false; - break; - } - std::this_thread::yield(); - } + } + if (ok) { + const auto now = std::chrono::steady_clock::now(); + for (Slice *s : group) { + s->status = Slice::POSTED; + s->ts = getCurrentTimeInNano(); + } + { + std::lock_guard lk(pending_mu_); + pending_.push_back( + {conn, transfer_id, group, now + std::chrono::seconds(30)}); } + pending_cv_.notify_one(); } } - for (Slice *s : group) { - if (ok) - s->markSuccess(); - else - s->markFailed(); + if (!ok) { + for (Slice *s : group) s->markFailed(); + } +} + +void FlagCxTransport::submitSlices(const std::vector &slices) { + std::lock_guard lk(submit_mu_); + std::unordered_map> groups; + for (Slice *s : slices) { + const uint64_t key = + (static_cast(s->target_id) << 1) | + (s->opcode == TransferRequest::WRITE ? 0u : 1u); + groups[key].push_back(s); } + for (auto &kv : groups) runSliceGroup(kv.second); } -void FlagCxTransport::ioWorker() { - while (true) { - std::vector batch; +bool FlagCxTransport::pollPendingTransfers() { + std::vector local; + { + std::lock_guard lk(pending_mu_); + local.swap(pending_); + } + + if (local.empty()) return false; + + std::vector still_pending; + const auto now = std::chrono::steady_clock::now(); + for (auto &p : local) { + bool done = false; { - std::unique_lock lk(queue_mu_); - queue_cv_.wait(lk, [this] { - return !io_queue_.empty() || !running_.load(); - }); - if (io_queue_.empty()) return; // running_ false and queue drained - // Drain everything currently queued so it can be coalesced into - // one multi-iov, multi-rail transfer per target. - batch.assign(io_queue_.begin(), io_queue_.end()); - io_queue_.clear(); + std::lock_guard lk(flagcx_mu_); + done = flagcxP2pEngineXferStatus(p.conn, p.transfer_id); + } + if (done) { + for (auto *s : p.slices) s->markSuccess(); + } else if (now > p.deadline) { + LOG(ERROR) << "FlagCxTransport: transfer timeout, tid=" + << p.transfer_id; + for (auto *s : p.slices) s->markFailed(); + } else { + still_pending.push_back(std::move(p)); } + } + + if (!still_pending.empty()) { + std::lock_guard lk(pending_mu_); + for (auto &p : still_pending) pending_.push_back(std::move(p)); + return true; + } + return false; +} - // Group by (target_id, opcode); each group becomes one batched xfer. - std::unordered_map> groups; - for (Slice *s : batch) { - const uint64_t key = - (static_cast(s->target_id) << 1) | - (s->opcode == TransferRequest::WRITE ? 0u : 1u); - groups[key].push_back(s); +void FlagCxTransport::completionLoop() { + unsigned active_polls = 0; + while (completion_running_.load(std::memory_order_acquire)) { + if (pollPendingTransfers()) { + if (++active_polls >= 64) { + active_polls = 0; + std::this_thread::yield(); + } + continue; } - for (auto &kv : groups) runSliceGroup(kv.second); + + active_polls = 0; + std::unique_lock lk(pending_mu_); + pending_cv_.wait(lk, [this] { + return !completion_running_.load(std::memory_order_acquire) || + !pending_.empty(); + }); } } @@ -314,67 +364,6 @@ FlagcxP2pConn *FlagCxTransport::connForSegment(SegmentID target_id) { return conn; } -int FlagCxTransport::doSlice(Slice *slice) { - // Runs only on the ioWorker thread; no extra locking required here. - FlagcxP2pMr local_mr = 0; - if (!resolveLocalMr(slice->source_addr, slice->length, local_mr)) { - LOG(ERROR) << "FlagCxTransport: source not registered " - << slice->source_addr; - return -1; - } - - FlagcxP2pConn *conn = connForSegment(slice->target_id); - if (!conn) return -1; - - // Resolve the remote rkey by absolute virtual address using the region - // table exchanged at handshake -- the FlagCX analogue of Mooncake's - // "look up rkey by dst_ptr". dest_offset already carries the remote - // absolute VA (request.target_offset). - FlagcxP2pRdmaDesc desc; - if (flagcxP2pEngineMakeDesc(conn, slice->flagcx.dest_offset, - static_cast(slice->length), - &desc) != 0) { - LOG(ERROR) << "FlagCxTransport: MakeDesc failed for remote VA 0x" - << std::hex << slice->flagcx.dest_offset << std::dec - << " len=" << slice->length; - return -1; - } - - std::vector mrs{local_mr}; - std::vector bufs{slice->source_addr}; - std::vector sizes{slice->length}; - std::vector descs{desc}; - - if (slice->opcode == TransferRequest::WRITE) { - // Blocking one-sided write; on return the data has landed in the - // peer's registered buffer (no separate signal/counter needed). - if (flagcxP2pEngineWriteVectorSync(conn, mrs, bufs, sizes, descs) != - 0) { - LOG(ERROR) << "FlagCxTransport: WriteVectorSync failed"; - return -1; - } - } else { - uint64_t transfer_id = 0; - if (flagcxP2pEngineReadVector(conn, mrs, bufs, sizes, descs, - /*numIovs=*/1, &transfer_id) != 0) { - LOG(ERROR) << "FlagCxTransport: ReadVector failed"; - return -1; - } - // Poll to completion with a generous deadline. - auto deadline = - std::chrono::steady_clock::now() + std::chrono::seconds(30); - while (!flagcxP2pEngineXferStatus(conn, transfer_id)) { - if (std::chrono::steady_clock::now() > deadline) { - LOG(ERROR) << "FlagCxTransport: ReadVector timed out, tid=" - << transfer_id; - return -1; - } - std::this_thread::yield(); - } - } - return 0; -} - Status FlagCxTransport::submitTransfer( BatchID batch_id, const std::vector &entries) { auto &batch_desc = *reinterpret_cast(batch_id); @@ -402,11 +391,7 @@ Status FlagCxTransport::submitTransfer( __sync_fetch_and_add(&task.slice_count, 1); to_post.push_back(slice); } - { - std::lock_guard lk(queue_mu_); - for (auto *s : to_post) io_queue_.push_back(s); - } - queue_cv_.notify_all(); + submitSlices(to_post); return Status::OK(); } @@ -432,11 +417,7 @@ Status FlagCxTransport::submitTransferTask( __sync_fetch_and_add(&task.slice_count, 1); to_post.push_back(slice); } - { - std::lock_guard lk(queue_mu_); - for (auto *s : to_post) io_queue_.push_back(s); - } - queue_cv_.notify_all(); + submitSlices(to_post); return Status::OK(); }