Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,12 @@
#ifndef FLAGCX_TRANSPORT_H_
#define FLAGCX_TRANSPORT_H_

#include <atomic>
#include <condition_variable>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include <flagcx_p2p.h>
Expand All @@ -41,12 +38,11 @@ namespace mooncake {
// - The data path resolves a cached connection per target segment
// (flagcxP2pEngineGetConn), looks up the remote rkey by absolute
// virtual address (flagcxP2pEngineMakeDesc -- the FlagCX equivalent
// of Mooncake's "rkey by dst_ptr"), then issues a one-sided
// WriteVectorSync (WRITE) or ReadVector + XferStatus poll (READ).
// of Mooncake's "rkey by dst_ptr"), then issues a non-blocking one-sided
// WriteVector (WRITE) or ReadVector (READ).
//
// A single I/O worker thread drains the slice queue so submitTransfer /
// submitTransferTask stay non-blocking; completion is reported through
// the usual TransferTask slice counters.
// getTransferStatus() polls submitted FlagCX transfer IDs and reports
// completion through the usual TransferTask slice counters.
//
// Required runtime env: the FlagCX engine honours the standard FlagCX
// knobs (FLAGCX_SOCKET_IFNAME selects the NIC the RPC endpoint binds /
Expand Down Expand Up @@ -90,10 +86,11 @@ class FlagCxTransport : public Transport {
const char *getName() const override { return "flagcx"; }

int allocateLocalSegment();
int doSlice(Slice *slice);
// Issue a group of same-target, same-opcode slices as one multi-iov
// (multi-rail) transfer and mark each slice's completion.
// (multi-rail) transfer and record its transfer id for status checks.
void runSliceGroup(const std::vector<Slice *> &group);
void submitSlices(const std::vector<Slice *> &slices);
void pollPendingTransfers();

// Find the engine MR handle whose registered region fully contains
// [addr, addr+length). Returns false if the source is unregistered.
Expand All @@ -116,15 +113,17 @@ class FlagCxTransport : public Transport {
std::mutex reg_mu_;
std::vector<Reg> regs_;

// Single-threaded I/O worker: all FlagCX ops happen in io_thread_.
// submitTransfer/Task pushes Slice* into io_queue_ and returns
// immediately.
void ioWorker();
std::thread io_thread_;
std::atomic<bool> running_{false};
std::mutex queue_mu_;
std::condition_variable queue_cv_;
std::deque<Slice *> io_queue_;
struct PendingTransfer {
FlagcxP2pConn *conn;
uint64_t transfer_id;
std::vector<Slice *> slices;
std::chrono::steady_clock::time_point deadline;
};

std::mutex flagcx_mu_;
std::mutex submit_mu_;
std::mutex pending_mu_;
std::vector<PendingTransfer> pending_;
};

} // namespace mooncake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
#include <cstdint>
#include <cstdlib>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

#include "common.h"
Expand All @@ -28,15 +28,13 @@ namespace mooncake {
FlagCxTransport::FlagCxTransport() {}

FlagCxTransport::~FlagCxTransport() {
// Stop and join the worker first; any in-flight slices fail.
{
std::lock_guard<std::mutex> lk(queue_mu_);
running_.store(false);
std::lock_guard<std::mutex> lk(pending_mu_);
for (auto &p : pending_) {
for (auto *s : p.slices) s->markFailed();
}
pending_.clear();
}
queue_cv_.notify_all();
if (io_thread_.joinable()) io_thread_.join();
for (auto *s : io_queue_) s->markFailed();
io_queue_.clear();

if (engine_) {
// Deregister everything still registered, then tear the engine down.
Expand Down Expand Up @@ -95,20 +93,22 @@ int FlagCxTransport::install(std::string &local_server_name,

if (allocateLocalSegment() != 0) return -1;

running_.store(true);
io_thread_ = std::thread(&FlagCxTransport::ioWorker, this);
LOG(INFO) << "FlagCxTransport: install OK (io_thread spawned)";
LOG(INFO) << "FlagCxTransport: install OK (direct submit)";
return 0;
}

// Issue one batch of slices that all target the same segment and share an
// opcode as a SINGLE multi-iov transfer. The engine fans the iovs out across
// the conn's rails (NICs), so a deep batch keeps every NIC busy instead of one
// block-at-a-time. Marks every slice success/fail.
// block-at-a-time. Completion is marked by getTransferStatus().
void FlagCxTransport::runSliceGroup(const std::vector<Slice *> &group) {
if (group.empty()) return;
bool ok = true;
FlagcxP2pConn *conn = connForSegment(group.front()->target_id);
FlagcxP2pConn *conn = nullptr;
{
std::lock_guard<std::mutex> lk(flagcx_mu_);
conn = connForSegment(group.front()->target_id);
}
Comment on lines +107 to +111

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Holding flagcx_mu_ while calling connForSegment can lead to performance bottlenecks. connForSegment calls metadata_->getSegmentDescByID, which may perform blocking network I/O (e.g., querying etcd or performing handshakes) if the metadata cache is disabled or needs updating. Holding the mutex during this time blocks all other threads trying to submit transfers or poll status.

Consider refactoring connForSegment to only lock flagcx_mu_ around the flagcxP2pEngineGetConn call, and call connForSegment without holding the lock here.

const bool isWrite = group.front()->opcode == TransferRequest::WRITE;

std::vector<FlagcxP2pMr> mrs;
Expand Down Expand Up @@ -146,61 +146,78 @@ void FlagCxTransport::runSliceGroup(const std::vector<Slice *> &group) {
}

if (ok) {
uint64_t transfer_id = 0;
if (isWrite) {
ok = flagcxP2pEngineWriteVectorSync(conn, mrs, bufs, sizes, descs) ==
0;
std::lock_guard<std::mutex> lk(flagcx_mu_);
ok = flagcxP2pEngineWriteVector(conn, mrs, bufs, sizes, descs,
static_cast<int>(group.size()),
&transfer_id) == 0;
} else {
uint64_t transfer_id = 0;
std::lock_guard<std::mutex> lk(flagcx_mu_);
ok = flagcxP2pEngineReadVector(conn, mrs, bufs, sizes, descs,
static_cast<int>(group.size()),
&transfer_id) == 0;
if (ok) {
auto deadline = std::chrono::steady_clock::now() +
std::chrono::seconds(30);
while (!flagcxP2pEngineXferStatus(conn, transfer_id)) {
if (std::chrono::steady_clock::now() > deadline) {
LOG(ERROR) << "FlagCxTransport: ReadVector batch timeout";
ok = false;
break;
}
std::this_thread::yield();
}
}
if (ok) {
const auto now = std::chrono::steady_clock::now();
for (Slice *s : group) {
s->status = Slice::POSTED;
s->ts = getCurrentTimeInNano();
}
std::lock_guard<std::mutex> lk(pending_mu_);
pending_.push_back(
{conn, transfer_id, group, now + std::chrono::seconds(30)});
}
}

for (Slice *s : group) {
if (ok)
s->markSuccess();
else
s->markFailed();
if (!ok) {
for (Slice *s : group) s->markFailed();
}
}

void FlagCxTransport::ioWorker() {
while (true) {
std::vector<Slice *> batch;
void FlagCxTransport::submitSlices(const std::vector<Slice *> &slices) {
std::lock_guard<std::mutex> lk(submit_mu_);
std::unordered_map<uint64_t, std::vector<Slice *>> groups;
for (Slice *s : slices) {
const uint64_t key =
(static_cast<uint64_t>(s->target_id) << 1) |
(s->opcode == TransferRequest::WRITE ? 0u : 1u);
groups[key].push_back(s);
}
for (auto &kv : groups) runSliceGroup(kv.second);
}
Comment on lines +178 to +188

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In high-performance networking, avoiding dynamic memory allocations on the data path is crucial. Currently, submitSlices allocates a std::unordered_map on every call. For the very common case where all slices in the batch are homogeneous (same target_id and opcode) or when there is only a single slice, we can completely bypass the map allocation and call runSliceGroup directly.

void FlagCxTransport::submitSlices(const std::vector<Slice *> &slices) {
    if (slices.empty()) return;
    std::lock_guard<std::mutex> lk(submit_mu_);
    bool all_same = true;
    for (size_t i = 1; i < slices.size(); ++i) {
        if (slices[i]->target_id != slices[0]->target_id ||
            slices[i]->opcode != slices[0]->opcode) {
            all_same = false;
            break;
        }
    }
    if (all_same) {
        runSliceGroup(slices);
        return;
    }

    std::unordered_map<uint64_t, std::vector<Slice *>> groups;
    for (Slice *s : slices) {
        const uint64_t key =
            (static_cast<uint64_t>(s->target_id) << 1) |
            (s->opcode == TransferRequest::WRITE ? 0u : 1u);
        groups[key].push_back(s);
    }
    for (auto &kv : groups) runSliceGroup(kv.second);
}


void FlagCxTransport::pollPendingTransfers() {
std::vector<PendingTransfer> local;
{
std::lock_guard<std::mutex> lk(pending_mu_);
local.swap(pending_);
}

if (local.empty()) return;

std::vector<PendingTransfer> still_pending;
const auto now = std::chrono::steady_clock::now();
for (auto &p : local) {
bool done = false;
{
std::unique_lock<std::mutex> lk(queue_mu_);
queue_cv_.wait(lk, [this] {
return !io_queue_.empty() || !running_.load();
});
if (io_queue_.empty()) return; // running_ false and queue drained
// Drain everything currently queued so it can be coalesced into
// one multi-iov, multi-rail transfer per target.
batch.assign(io_queue_.begin(), io_queue_.end());
io_queue_.clear();
std::lock_guard<std::mutex> lk(flagcx_mu_);
done = flagcxP2pEngineXferStatus(p.conn, p.transfer_id);
}

// Group by (target_id, opcode); each group becomes one batched xfer.
std::unordered_map<uint64_t, std::vector<Slice *>> groups;
for (Slice *s : batch) {
const uint64_t key =
(static_cast<uint64_t>(s->target_id) << 1) |
(s->opcode == TransferRequest::WRITE ? 0u : 1u);
groups[key].push_back(s);
if (done) {
for (auto *s : p.slices) s->markSuccess();
} else if (now > p.deadline) {
LOG(ERROR) << "FlagCxTransport: transfer timeout, tid="
<< p.transfer_id;
for (auto *s : p.slices) s->markFailed();
} else {
still_pending.push_back(std::move(p));
}
for (auto &kv : groups) runSliceGroup(kv.second);
}

if (!still_pending.empty()) {
std::lock_guard<std::mutex> lk(pending_mu_);
for (auto &p : still_pending) pending_.push_back(std::move(p));
}
}

Expand Down Expand Up @@ -314,67 +331,6 @@ FlagcxP2pConn *FlagCxTransport::connForSegment(SegmentID target_id) {
return conn;
}

int FlagCxTransport::doSlice(Slice *slice) {
// Runs only on the ioWorker thread; no extra locking required here.
FlagcxP2pMr local_mr = 0;
if (!resolveLocalMr(slice->source_addr, slice->length, local_mr)) {
LOG(ERROR) << "FlagCxTransport: source not registered "
<< slice->source_addr;
return -1;
}

FlagcxP2pConn *conn = connForSegment(slice->target_id);
if (!conn) return -1;

// Resolve the remote rkey by absolute virtual address using the region
// table exchanged at handshake -- the FlagCX analogue of Mooncake's
// "look up rkey by dst_ptr". dest_offset already carries the remote
// absolute VA (request.target_offset).
FlagcxP2pRdmaDesc desc;
if (flagcxP2pEngineMakeDesc(conn, slice->flagcx.dest_offset,
static_cast<uint32_t>(slice->length),
&desc) != 0) {
LOG(ERROR) << "FlagCxTransport: MakeDesc failed for remote VA 0x"
<< std::hex << slice->flagcx.dest_offset << std::dec
<< " len=" << slice->length;
return -1;
}

std::vector<FlagcxP2pMr> mrs{local_mr};
std::vector<void *> bufs{slice->source_addr};
std::vector<size_t> sizes{slice->length};
std::vector<FlagcxP2pRdmaDesc> descs{desc};

if (slice->opcode == TransferRequest::WRITE) {
// Blocking one-sided write; on return the data has landed in the
// peer's registered buffer (no separate signal/counter needed).
if (flagcxP2pEngineWriteVectorSync(conn, mrs, bufs, sizes, descs) !=
0) {
LOG(ERROR) << "FlagCxTransport: WriteVectorSync failed";
return -1;
}
} else {
uint64_t transfer_id = 0;
if (flagcxP2pEngineReadVector(conn, mrs, bufs, sizes, descs,
/*numIovs=*/1, &transfer_id) != 0) {
LOG(ERROR) << "FlagCxTransport: ReadVector failed";
return -1;
}
// Poll to completion with a generous deadline.
auto deadline =
std::chrono::steady_clock::now() + std::chrono::seconds(30);
while (!flagcxP2pEngineXferStatus(conn, transfer_id)) {
if (std::chrono::steady_clock::now() > deadline) {
LOG(ERROR) << "FlagCxTransport: ReadVector timed out, tid="
<< transfer_id;
return -1;
}
std::this_thread::yield();
}
}
return 0;
}

Status FlagCxTransport::submitTransfer(
BatchID batch_id, const std::vector<TransferRequest> &entries) {
auto &batch_desc = *reinterpret_cast<BatchDesc *>(batch_id);
Expand Down Expand Up @@ -402,11 +358,7 @@ Status FlagCxTransport::submitTransfer(
__sync_fetch_and_add(&task.slice_count, 1);
to_post.push_back(slice);
}
{
std::lock_guard<std::mutex> lk(queue_mu_);
for (auto *s : to_post) io_queue_.push_back(s);
}
queue_cv_.notify_all();
submitSlices(to_post);
return Status::OK();
}

Expand All @@ -432,16 +384,14 @@ Status FlagCxTransport::submitTransferTask(
__sync_fetch_and_add(&task.slice_count, 1);
to_post.push_back(slice);
}
{
std::lock_guard<std::mutex> lk(queue_mu_);
for (auto *s : to_post) io_queue_.push_back(s);
}
queue_cv_.notify_all();
submitSlices(to_post);
return Status::OK();
}

Status FlagCxTransport::getTransferStatus(BatchID batch_id, size_t task_id,
TransferStatus &status) {
pollPendingTransfers();

auto &batch_desc = *reinterpret_cast<BatchDesc *>(batch_id);
if (task_id >= batch_desc.task_list.size()) {
return Status::InvalidArgument(
Expand Down
Loading