Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions mooncake-transfer-engine/include/transfer_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ class TransferMetadata {
uint16_t barex_port;
#endif
std::vector<uint32_t> qp_num;
bool ready_ack = false;
// Decode-only marker used to detect peers that understand ready_ack.
bool ready_ack_supported = false;
std::string reply_msg; // on error
#ifdef USE_EFA
std::string efa_addr; // EFA endpoint address (hex encoded)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef RDMA_ENDPOINT_H
#define RDMA_ENDPOINT_H

#include <atomic>
#include <queue>

#include "rdma_context.h"
Expand Down Expand Up @@ -93,6 +94,14 @@ class RdmaEndPoint {
return status_.load(std::memory_order_relaxed) == CONNECTED;
}

// CONNECTED only means local QPs have reached RTS. A passive endpoint must
// still confirm the peer has completed its active setup before posting WRs.
bool readyToSend() const {
return connected() && ready_to_send_.load(std::memory_order_relaxed);
}

bool readyAckTimedOut() const;

bool retired() const {
auto status = status_.load(std::memory_order_relaxed);
return status == DESTROYING || status == DESTROYED;
Expand Down Expand Up @@ -122,6 +131,8 @@ class RdmaEndPoint {
// Resets only pre-connected handshake attempts. Once an endpoint has ever
// reached CONNECTED, it is retired instead of being reused.
int resetConnection(const std::string &reason);
int sendReadyAck(const std::string &peer_server_name,
const HandShakeDesc &local_desc);

public:
const std::string toString() const;
Expand Down Expand Up @@ -166,6 +177,8 @@ class RdmaEndPoint {
private:
static constexpr uint64_t kWaitExistingHandshakeTimeoutNano =
10 * 1000000000ull; // 10 seconds
static constexpr uint64_t kReadyAckTimeoutNano =
10 * 1000000000ull; // 10 seconds
static constexpr uint32_t kWaitExistingHandshakeSpinCount = 500;
static constexpr uint32_t kWaitExistingHandshakeInitialSleepUs = 50;
static constexpr uint32_t kWaitExistingHandshakeMaxSleepUs = 2000;
Expand All @@ -189,6 +202,8 @@ class RdmaEndPoint {
std::string peer_nic_path_;
std::vector<uint32_t> peer_qp_num_list_;
bool has_connected_;
std::atomic<bool> ready_to_send_;
std::atomic<uint64_t> ready_wait_start_ts_;

volatile int *wr_depth_list_;
int max_wr_depth_;
Expand Down
7 changes: 7 additions & 0 deletions mooncake-transfer-engine/src/transfer_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ struct TransferHandshakeUtil {
Json::Value qpNums(Json::arrayValue);
for (const auto &qp : desc.qp_num) qpNums.append(qp);
root["qp_num"] = qpNums;
root["ready_ack"] = desc.ready_ack;
root["reply_msg"] = desc.reply_msg;
#ifdef USE_EFA
root["efa_addr"] = desc.efa_addr; // EFA endpoint address
Expand Down Expand Up @@ -110,6 +111,12 @@ struct TransferHandshakeUtil {
#endif
for (const auto &qp : root["qp_num"])
desc.qp_num.push_back(qp.asUInt());
desc.ready_ack_supported = root.isMember("ready_ack");
if (desc.ready_ack_supported && root["ready_ack"].isBool()) {
desc.ready_ack = root["ready_ack"].asBool();
} else {
desc.ready_ack = false;
}
desc.reply_msg = root["reply_msg"].asString();
#ifdef USE_EFA
desc.efa_addr = root["efa_addr"].asString(); // EFA endpoint address
Expand Down
Loading
Loading