diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7a6642965b..5cbd3bf4e8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,6 +31,7 @@ jobs: python-version: ['3.10', '3.12'] env: CI: "true" + PIP_NO_CACHE_DIR: "1" SCCACHE_GHA_ENABLED: "true" steps: @@ -229,12 +230,15 @@ jobs: echo "=== Processing coverage data ===" lcov --remove coverage.info '/usr/*' '*/test/*' '*/third_party/*' --output-file coverage.filtered.info 2>&1 || true - - echo "=== Generating HTML report ===" - genhtml coverage.filtered.info --output-directory coverage_report 2>&1 || echo "genhtml failed, continuing..." + rm -f coverage.info echo "=== Coverage summary ===" - lcov --list coverage.filtered.info 2>&1 || echo "lcov list failed" + lcov --summary coverage.filtered.info 2>&1 || echo "lcov summary failed" + + echo "=== Removing coverage intermediates ===" + find . -name "*.gcda" -delete + find . -name "*.gcno" -delete + df -h echo "=== Coverage report generation completed ===" shell: bash @@ -328,6 +332,10 @@ jobs: ubuntu-version: [ubuntu-22.04, ubuntu-24.04] python-version: ['3.10', '3.12'] runs-on: ${{ matrix.ubuntu-version }} + env: + PIP_NO_CACHE_DIR: "1" + MOONCAKE_TEST_TORCH_SPEC: "torch==2.11.0+cu128" + MOONCAKE_TEST_TORCH_INDEX_URL: "https://download.pytorch.org/whl/cu128" steps: - uses: actions/checkout@v4 with: diff --git a/mooncake-transfer-engine/include/transfer_metadata.h b/mooncake-transfer-engine/include/transfer_metadata.h index 69bd651213..5b66b1bd73 100644 --- a/mooncake-transfer-engine/include/transfer_metadata.h +++ b/mooncake-transfer-engine/include/transfer_metadata.h @@ -149,6 +149,9 @@ class TransferMetadata { uint16_t barex_port; #endif std::vector qp_num; + bool ready_ack = false; + // Decode-only marker used to detect peers that understand ready_ack. + bool ready_ack_supported = false; std::string reply_msg; // on error #ifdef USE_EFA std::string efa_addr; // EFA endpoint address (hex encoded) diff --git a/mooncake-transfer-engine/include/transport/rdma_transport/rdma_endpoint.h b/mooncake-transfer-engine/include/transport/rdma_transport/rdma_endpoint.h index e98fa8e76a..e65ad85779 100644 --- a/mooncake-transfer-engine/include/transport/rdma_transport/rdma_endpoint.h +++ b/mooncake-transfer-engine/include/transport/rdma_transport/rdma_endpoint.h @@ -15,6 +15,7 @@ #ifndef RDMA_ENDPOINT_H #define RDMA_ENDPOINT_H +#include #include #include "rdma_context.h" @@ -93,6 +94,14 @@ class RdmaEndPoint { return status_.load(std::memory_order_relaxed) == CONNECTED; } + // CONNECTED only means local QPs have reached RTS. A passive endpoint must + // still confirm the peer has completed its active setup before posting WRs. + bool readyToSend() const { + return connected() && ready_to_send_.load(std::memory_order_relaxed); + } + + bool readyAckTimedOut() const; + bool retired() const { auto status = status_.load(std::memory_order_relaxed); return status == DESTROYING || status == DESTROYED; @@ -122,6 +131,8 @@ class RdmaEndPoint { // Resets only pre-connected handshake attempts. Once an endpoint has ever // reached CONNECTED, it is retired instead of being reused. int resetConnection(const std::string &reason); + int sendReadyAck(const std::string &peer_server_name, + const HandShakeDesc &local_desc); public: const std::string toString() const; @@ -166,6 +177,8 @@ class RdmaEndPoint { private: static constexpr uint64_t kWaitExistingHandshakeTimeoutNano = 10 * 1000000000ull; // 10 seconds + static constexpr uint64_t kReadyAckTimeoutNano = + 10 * 1000000000ull; // 10 seconds static constexpr uint32_t kWaitExistingHandshakeSpinCount = 500; static constexpr uint32_t kWaitExistingHandshakeInitialSleepUs = 50; static constexpr uint32_t kWaitExistingHandshakeMaxSleepUs = 2000; @@ -189,6 +202,8 @@ class RdmaEndPoint { std::string peer_nic_path_; std::vector peer_qp_num_list_; bool has_connected_; + std::atomic ready_to_send_; + std::atomic ready_wait_start_ts_; volatile int *wr_depth_list_; int max_wr_depth_; diff --git a/mooncake-transfer-engine/src/transfer_metadata.cpp b/mooncake-transfer-engine/src/transfer_metadata.cpp index 30eab3bc14..2475d48a95 100644 --- a/mooncake-transfer-engine/src/transfer_metadata.cpp +++ b/mooncake-transfer-engine/src/transfer_metadata.cpp @@ -76,6 +76,7 @@ struct TransferHandshakeUtil { Json::Value qpNums(Json::arrayValue); for (const auto &qp : desc.qp_num) qpNums.append(qp); root["qp_num"] = qpNums; + root["ready_ack"] = desc.ready_ack; root["reply_msg"] = desc.reply_msg; #ifdef USE_EFA root["efa_addr"] = desc.efa_addr; // EFA endpoint address @@ -110,6 +111,12 @@ struct TransferHandshakeUtil { #endif for (const auto &qp : root["qp_num"]) desc.qp_num.push_back(qp.asUInt()); + desc.ready_ack_supported = root.isMember("ready_ack"); + if (desc.ready_ack_supported && root["ready_ack"].isBool()) { + desc.ready_ack = root["ready_ack"].asBool(); + } else { + desc.ready_ack = false; + } desc.reply_msg = root["reply_msg"].asString(); #ifdef USE_EFA desc.efa_addr = root["efa_addr"].asString(); // EFA endpoint address diff --git a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_endpoint.cpp b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_endpoint.cpp index 0d9648bd41..e869e59e00 100644 --- a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_endpoint.cpp +++ b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_endpoint.cpp @@ -46,6 +46,8 @@ static GidSelectionSnapshot fillLocalHandshakeDesc( local_desc.local_gid = gid_selection.gid; local_desc.peer_nic_path = peer_nic; local_desc.qp_num = qp_num; + local_desc.ready_ack = false; + local_desc.ready_ack_supported = true; local_desc.reply_msg.clear(); return gid_selection; } @@ -68,6 +70,8 @@ RdmaEndPoint::RdmaEndPoint(RdmaContext &context) : context_(context), status_(INITIALIZING), has_connected_(false), + ready_to_send_(false), + ready_wait_start_ts_(0), wr_depth_list_(nullptr), active_(true), cq_outstanding_(nullptr) {} @@ -121,6 +125,8 @@ int RdmaEndPoint::construct(ibv_cq *cq, size_t num_qp_list, } } + ready_to_send_.store(false, std::memory_order_relaxed); + ready_wait_start_ts_.store(0, std::memory_order_relaxed); status_.store(UNCONNECTED, std::memory_order_relaxed); return 0; } @@ -149,6 +155,8 @@ int RdmaEndPoint::reconstruct() { // Reconstruct with same parameters as original construction status_.store(INITIALIZING, std::memory_order_relaxed); + ready_to_send_.store(false, std::memory_order_relaxed); + ready_wait_start_ts_.store(0, std::memory_order_relaxed); active_ = true; return construct(cq, num_qp, max_sge_per_wr, max_wr_depth, @@ -213,6 +221,8 @@ void RdmaEndPoint::beginDestroyLocked() { active_ = false; inactive_time_ = getCurrentTimeInNano(); status_.store(DESTROYING, std::memory_order_release); + ready_to_send_.store(false, std::memory_order_relaxed); + ready_wait_start_ts_.store(0, std::memory_order_relaxed); // Transition QPs to ERR state so hardware flushes all inflight WRs to CQ. // This allows performPollCq to drain them naturally. @@ -309,19 +319,23 @@ int RdmaEndPoint::setupConnectionsByActive() { { RWSpinlock::WriteGuard guard(lock_); - if (connected()) { + if (readyToSend()) { LOG(INFO) << "Connection has been established"; return 0; } // loopback mode if (context_.nicPath() == peer_nic_path_) { - return doSetupConnection(context_.gid(), context_.lid(), qpNum()); + int ret = + doSetupConnection(context_.gid(), context_.lid(), qpNum()); + if (ret == 0) { + ready_wait_start_ts_.store(0, std::memory_order_relaxed); + ready_to_send_.store(true, std::memory_order_relaxed); + } + return ret; } - // Only proceed with RPC if we are the first to transition from - // UNCONNECTED. This prevents duplicate concurrent handshake attempts - // from the same endpoint. + // Only the first UNCONNECTED caller transitions to CONNECTING. auto current_status = status_.load(std::memory_order_relaxed); if (current_status == UNCONNECTED) { status_.store(CONNECTING, std::memory_order_relaxed); @@ -367,6 +381,7 @@ int RdmaEndPoint::setupConnectionsByActive() { } } RWSpinlock::ReadGuard guard(lock_); + if (readyToSend()) return 0; return connected() ? 0 : ERR_ENDPOINT; } @@ -403,127 +418,152 @@ int RdmaEndPoint::setupConnectionsByActive() { } bool retry_with_new_gid = false; + bool should_send_ready_ack = false; + HandShakeDesc ready_ack_desc; { // Re-acquire lock after RPC to finalize state transition RWSpinlock::WriteGuard guard(lock_); // Handle simultaneous open: if the peer initiates a connection // during our RPC and it is passively established in - // setupConnectionsByPassive, simply reuse the existing endpoint. + // setupConnectionsByPassive, send an explicit ready ACK after this + // active RPC confirms that the peer's passive QPs are ready. if (connected()) { if (peer_qp_num_list_ == peer_desc.qp_num) { + should_send_ready_ack = true; + ready_ack_desc = local_desc; LOG(INFO) - << "Received same peer QP numbers, reusing connection."; - return 0; - } + << "Received same peer QP numbers, sending RDMA ready " + "ACK."; + } else { + // This mismatch scenario should be rare. It may occur when + // a peer first sends us an Active RPC and establishes a + // connection, then restarts, and eventually accepts and + // responds to our Active RPC. + LOG(WARNING) + << "Peer QP list mismatch on connected endpoint, " + "re-establishing connection: " + << toString(); - // This mismatch scenario should be rare. It may occur when a - // peer first sends us an Active RPC and establishes a - // connection, then restarts, and eventually accepts and - // responds to our Active RPC. - LOG(WARNING) << "Peer QP list mismatch on connected endpoint, " - "re-establishing connection: " - << toString(); - - int ret = - resetConnection("re-establishing connection (active)"); - if (ret) return ret; + int ret = + resetConnection("re-establishing connection (active)"); + if (ret) return ret; + } } - if (!peer_desc.reply_msg.empty()) { - LOG(ERROR) << "Rejected handshake request by peer " - << local_desc.peer_nic_path; - disconnectUnlocked(); - return ERR_REJECT_HANDSHAKE; - } + if (!should_send_ready_ack) { + if (!peer_desc.reply_msg.empty()) { + LOG(ERROR) << "Rejected handshake request by peer " + << local_desc.peer_nic_path; + disconnectUnlocked(); + return ERR_REJECT_HANDSHAKE; + } - if (peer_desc.local_nic_path != peer_nic_path_ || - peer_desc.peer_nic_path != local_desc.local_nic_path) { - LOG(ERROR) << "Invalid argument: received packet mismatch, " - "local.local_nic_path: " - << local_desc.local_nic_path - << ", local.peer_nic_path: " - << local_desc.peer_nic_path - << ", peer.local_nic_path: " - << peer_desc.local_nic_path - << ", peer.peer_nic_path: " - << peer_desc.peer_nic_path; - disconnectUnlocked(); - return ERR_REJECT_HANDSHAKE; - } + if (peer_desc.local_nic_path != peer_nic_path_ || + peer_desc.peer_nic_path != local_desc.local_nic_path) { + LOG(ERROR) + << "Invalid argument: received packet mismatch, " + "local.local_nic_path: " + << local_desc.local_nic_path + << ", local.peer_nic_path: " << local_desc.peer_nic_path + << ", peer.local_nic_path: " << peer_desc.local_nic_path + << ", peer.peer_nic_path: " << peer_desc.peer_nic_path; + disconnectUnlocked(); + return ERR_REJECT_HANDSHAKE; + } - int ret = ERR_DEVICE_NOT_FOUND; - std::string failure_message; - SetupConnectionFailureInfo failure_info; - if (!peer_desc.local_gid.empty()) { - ret = doSetupConnection(peer_desc.local_gid, - peer_desc.local_lid, peer_desc.qp_num, - &failure_message, &failure_info); - } else { - auto segment_desc = - context_.engine().meta()->getSegmentDescByName( - peer_server_name); - if (segment_desc) { - for (auto &nic : segment_desc->devices) { - if (nic.name == peer_nic_name) { - ret = doSetupConnection( - nic.gid, nic.lid, peer_desc.qp_num, - &failure_message, &failure_info); - break; + int ret = ERR_DEVICE_NOT_FOUND; + std::string failure_message; + SetupConnectionFailureInfo failure_info; + if (!peer_desc.local_gid.empty()) { + ret = doSetupConnection( + peer_desc.local_gid, peer_desc.local_lid, + peer_desc.qp_num, &failure_message, &failure_info); + } else { + auto segment_desc = + context_.engine().meta()->getSegmentDescByName( + peer_server_name); + if (segment_desc) { + for (auto &nic : segment_desc->devices) { + if (nic.name == peer_nic_name) { + ret = doSetupConnection( + nic.gid, nic.lid, peer_desc.qp_num, + &failure_message, &failure_info); + break; + } } } } - } - if (ret == 0) { - return 0; - } + if (ret == 0) { + should_send_ready_ack = true; + ready_ack_desc = local_desc; + } else { + if (shouldAttemptAutoGidHandshakeRetry( + context_.autoGidSelectionEnabled(), + auto_gid_retry_count, + globalConfig().auto_gid_max_retries, + failure_info.stage == + SetupConnectionFailureStage::kRtr, + failure_info.sys_errno)) { + std::string previous_gid; + std::string next_gid; + bool reprobe_changed = context_.reprobeAutoGid( + local_gid_selection, attempted_auto_gid_selections, + &previous_gid, &next_gid); + auto current_gid_selection = context_.gidSelection(); + auto retry_action = decideAutoGidRetryAction( + reprobe_changed, local_gid_selection.gid_index, + local_gid_selection.gid, + current_gid_selection.gid_index, + current_gid_selection.gid); + if (retry_action != AutoGidRetryAction::kDoNotRetry) { + int reset_ret = resetConnection( + retry_action == AutoGidRetryAction:: + kRetryWithReprobedGid + ? "retry after auto GID reprobe (active)" + : "retry with externally reprobed GID " + "(active)"); + if (reset_ret) return reset_ret; + status_.store(CONNECTING, + std::memory_order_relaxed); + ++auto_gid_retry_count; + retry_with_new_gid = true; + LOG(WARNING) + << "Retry active handshake with updated local " + "GID on " + << context_.deviceName() << ": " + << local_gid_selection.gid << " -> " + << current_gid_selection.gid << " (attempt " + << auto_gid_retry_count << "/" + << globalConfig().auto_gid_max_retries << ")"; + } + } - if (shouldAttemptAutoGidHandshakeRetry( - context_.autoGidSelectionEnabled(), auto_gid_retry_count, - globalConfig().auto_gid_max_retries, - failure_info.stage == SetupConnectionFailureStage::kRtr, - failure_info.sys_errno)) { - std::string previous_gid; - std::string next_gid; - bool reprobe_changed = context_.reprobeAutoGid( - local_gid_selection, attempted_auto_gid_selections, - &previous_gid, &next_gid); - auto current_gid_selection = context_.gidSelection(); - auto retry_action = decideAutoGidRetryAction( - reprobe_changed, local_gid_selection.gid_index, - local_gid_selection.gid, current_gid_selection.gid_index, - current_gid_selection.gid); - if (retry_action != AutoGidRetryAction::kDoNotRetry) { - int reset_ret = resetConnection( - retry_action == - AutoGidRetryAction::kRetryWithReprobedGid - ? "retry after auto GID reprobe (active)" - : "retry with externally reprobed GID (active)"); - if (reset_ret) return reset_ret; - status_.store(CONNECTING, std::memory_order_relaxed); - ++auto_gid_retry_count; - retry_with_new_gid = true; - LOG(WARNING) - << "Retry active handshake with updated local GID on " - << context_.deviceName() << ": " - << local_gid_selection.gid << " -> " - << current_gid_selection.gid << " (attempt " - << auto_gid_retry_count << "/" - << globalConfig().auto_gid_max_retries << ")"; + if (!retry_with_new_gid) { + if (ret == ERR_DEVICE_NOT_FOUND) { + LOG(ERROR) << "Peer NIC " << peer_nic_name + << " not found in " << peer_server_name; + disconnectUnlocked(); + } else { + resetConnection("failed connection setup (active)"); + } + return ret; + } } } + } - if (!retry_with_new_gid) { - if (ret == ERR_DEVICE_NOT_FOUND) { - LOG(ERROR) << "Peer NIC " << peer_nic_name - << " not found in " << peer_server_name; - disconnectUnlocked(); - } else { - resetConnection("failed connection setup (active)"); - } - return ret; + if (should_send_ready_ack) { + int ack_ret = sendReadyAck(peer_server_name, ready_ack_desc); + RWSpinlock::WriteGuard guard(lock_); + if (ack_ret) { + resetConnection("failed to send ready ACK"); + return ack_ret; } + ready_wait_start_ts_.store(0, std::memory_order_relaxed); + ready_to_send_.store(true, std::memory_order_relaxed); + return 0; } } } @@ -531,12 +571,31 @@ int RdmaEndPoint::setupConnectionsByActive() { int RdmaEndPoint::setupConnectionsByPassive(const HandShakeDesc &peer_desc, HandShakeDesc &local_desc) { RWSpinlock::WriteGuard guard(lock_); + if (peer_desc.ready_ack && !connected()) { + local_desc.reply_msg = + "Received RDMA ready ACK for unconnected endpoint"; + LOG(ERROR) << local_desc.reply_msg << ": " << toString(); + return ERR_REJECT_HANDSHAKE; + } + if (connected()) { // If already connected with the same peer QP info, return success if (peer_qp_num_list_ == peer_desc.qp_num) { fillLocalHandshakeDesc(context_, peer_nic_path_, qpNum(), local_desc); - LOG(INFO) << "Received same peer QP numbers, reusing connection."; + if (peer_desc.ready_ack || !peer_desc.ready_ack_supported) { + ready_wait_start_ts_.store(0, std::memory_order_relaxed); + ready_to_send_.store(true, std::memory_order_relaxed); + if (peer_desc.ready_ack) { + LOG(INFO) << "Received RDMA ready ACK."; + } else { + LOG(INFO) << "Peer does not support RDMA ready ACK, " + "reusing connection."; + } + } else { + LOG(INFO) << "Received same peer QP numbers, reusing " + "connection while waiting for ready ACK."; + } return 0; } // Different peer (e.g., peer restarted) @@ -551,9 +610,8 @@ int RdmaEndPoint::setupConnectionsByPassive(const HandShakeDesc &peer_desc, // establish the connection on this same endpoint. Because we're holding // the lock, even if there are already Active RPCs sent to the same // peer nic path by setupConnectionsByActive on another thread, it will - // be blocked after the RPC return. Once the lock is released, - // they will simply observe the CONNECTED state and safely reuse the QP. - // This inherently handles simultaneous open. + // be blocked after the RPC return. Once the lock is released, active + // callers will confirm readiness before posting WRs. if (peer_desc.peer_nic_path != context_.nicPath() || peer_desc.local_nic_path != peer_nic_path_) { @@ -590,6 +648,14 @@ int RdmaEndPoint::setupConnectionsByPassive(const HandShakeDesc &peer_desc, int ret = doSetupConnection(peer_gid, peer_lid, peer_desc.qp_num, &local_desc.reply_msg, &failure_info); if (ret == 0) { + if (peer_desc.ready_ack_supported) { + ready_wait_start_ts_.store(getCurrentTimeInNano(), + std::memory_order_relaxed); + ready_to_send_.store(false, std::memory_order_relaxed); + } else { + ready_wait_start_ts_.store(0, std::memory_order_relaxed); + ready_to_send_.store(true, std::memory_order_relaxed); + } return 0; } @@ -649,6 +715,8 @@ int RdmaEndPoint::setupConnectionsByPassive(const HandShakeDesc &peer_desc, } local_desc.reply_msg = "Peer nic not found in that server: " + peer_nic_path_; + ready_to_send_.store(false, std::memory_order_relaxed); + ready_wait_start_ts_.store(0, std::memory_order_relaxed); status_.store(UNCONNECTED, std::memory_order_relaxed); LOG(ERROR) << local_desc.reply_msg; return ERR_DEVICE_NOT_FOUND; @@ -662,6 +730,8 @@ void RdmaEndPoint::disconnect() { int RdmaEndPoint::disconnectUnlocked() { auto curr_status = status_.load(std::memory_order_acquire); if (curr_status != CONNECTED && curr_status != CONNECTING) return 0; + ready_to_send_.store(false, std::memory_order_relaxed); + ready_wait_start_ts_.store(0, std::memory_order_relaxed); if (!has_connected_) { // Pre-connected handshake retries are allowed to reuse this endpoint: @@ -700,6 +770,8 @@ int RdmaEndPoint::disconnectUnlocked() { int RdmaEndPoint::resetConnection(const std::string &reason) { auto curr_status = status_.load(std::memory_order_acquire); if (curr_status != CONNECTING && curr_status != CONNECTED) return 0; + ready_to_send_.store(false, std::memory_order_relaxed); + ready_wait_start_ts_.store(0, std::memory_order_relaxed); if (!has_connected_) { int ret = disconnectUnlocked(); @@ -719,6 +791,35 @@ int RdmaEndPoint::resetConnection(const std::string &reason) { return ERR_ENDPOINT; } +bool RdmaEndPoint::readyAckTimedOut() const { + if (!connected() || ready_to_send_.load(std::memory_order_relaxed)) + return false; + uint64_t start_ts = ready_wait_start_ts_.load(std::memory_order_relaxed); + return start_ts != 0 && + getCurrentTimeInNano() - start_ts > kReadyAckTimeoutNano; +} + +int RdmaEndPoint::sendReadyAck(const std::string &peer_server_name, + const HandShakeDesc &local_desc) { + HandShakeDesc ready_ack_desc = local_desc; + ready_ack_desc.ready_ack = true; + + HandShakeDesc peer_desc; + int rc = context_.engine().sendHandshake(peer_server_name, ready_ack_desc, + peer_desc); + if (rc) { + LOG(ERROR) << "Failed to send RDMA ready ACK to " << peer_server_name + << ": " << rc; + return rc; + } + if (!peer_desc.reply_msg.empty()) { + LOG(ERROR) << "RDMA ready ACK rejected by " << peer_server_name << ": " + << peer_desc.reply_msg; + return ERR_REJECT_HANDSHAKE; + } + return 0; +} + const std::string RdmaEndPoint::toString() const { auto status = status_.load(std::memory_order_relaxed); if (status == CONNECTED) @@ -737,7 +838,8 @@ int RdmaEndPoint::submitPostSend( std::vector &slice_list, std::vector &failed_slice_list) { RWSpinlock::WriteGuard guard(lock_); - if (!active_ || status_.load(std::memory_order_relaxed) != CONNECTED) { + if (!active_ || status_.load(std::memory_order_relaxed) != CONNECTED || + !ready_to_send_.load(std::memory_order_relaxed)) { for (auto &slice : slice_list) failed_slice_list.push_back(slice); slice_list.clear(); return 0; diff --git a/mooncake-transfer-engine/src/transport/rdma_transport/worker_pool.cpp b/mooncake-transfer-engine/src/transport/rdma_transport/worker_pool.cpp index 39af3ce18f..ea3aa7fa2e 100644 --- a/mooncake-transfer-engine/src/transport/rdma_transport/worker_pool.cpp +++ b/mooncake-transfer-engine/src/transport/rdma_transport/worker_pool.cpp @@ -296,6 +296,18 @@ void WorkerPool::performPostSend(int thread_id) { entry.second.clear(); continue; } + if (!endpoint->readyToSend()) { + if (endpoint->readyAckTimedOut()) { + LOG(ERROR) << "Worker: Timed out waiting for RDMA ready ACK " + << "for endpoint: " << entry.first + << ", deleting endpoint"; + handlePathFailure(entry.first, endpoint.get()); + for (auto &slice : entry.second) + failed_slice_list.push_back(slice); + entry.second.clear(); + } + continue; + } // Set endpoint pointer for each slice before submitting for (auto &slice : entry.second) { slice->rdma.endpoint = endpoint.get(); diff --git a/scripts/test_installation.sh b/scripts/test_installation.sh index 9e7f07761d..54519bc900 100755 --- a/scripts/test_installation.sh +++ b/scripts/test_installation.sh @@ -37,7 +37,14 @@ echo "Running import structure test..." # Run the import structure test cp -r mooncake-wheel/tests test_env/ cd test_env -pip install torch==2.11.0 numpy +TORCH_SPEC=${MOONCAKE_TEST_TORCH_SPEC:-torch==2.11.0} +if [ -n "${MOONCAKE_TEST_TORCH_INDEX_URL:-}" ]; then + pip install "$TORCH_SPEC" numpy \ + --index-url "$MOONCAKE_TEST_TORCH_INDEX_URL" \ + --extra-index-url https://pypi.org/simple +else + pip install "$TORCH_SPEC" numpy +fi python tests/test_import_structure.py echo "Running mooncake config test..."