Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,19 @@ struct SelectionPolicy {

// Transport preference list (evaluated in order)
std::vector<TransportType> transports;

// --- Link-layer QoS attributes (RFC #2519 / #2568, step-1 schema only) ---

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think add the comments about the RFC reference and splited step is not necessary

// These let a policy carry per-policy fabric QoS instead of the
// process-global MC_IB_SL / MC_IB_TC. nullopt = fall back to the global
// RdmaParams value (today's behavior). InfiniBand Service Level (0-15) and
// Traffic Class / DSCP (0-255).
std::optional<int> service_level; // nullopt = use global default
std::optional<int> traffic_class; // nullopt = use global default
// Named QP pool this policy's traffic should land on. Reserved here so the
// schema is forward-compatible: step 1 only parses/stores it; the actual
// per-class QP pool creation and routing is a follow-up (step 2). Unset =
// the current single "data QP" path.
std::optional<std::string> qp_pool;
};

/**
Expand All @@ -125,6 +138,12 @@ struct SelectionPolicy {
struct SelectionResult {
TransportType transport = UNSPEC;
uint64_t device_mask = ~0ULL; // Bitmask of allowed devices (~0 = all)
// Resolved link-layer QoS from the matched policy (RFC #2519 / #2568).
// nullopt = use the global RdmaParams default. Carried here for the
// follow-up that applies them at QP setup; step 1 only plumbs the values.
std::optional<int> service_level;
std::optional<int> traffic_class;
std::optional<std::string> qp_pool;
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,34 @@ void TransportSelector::loadPolicies() {
}
}

// Parse link-layer QoS attributes (RFC #2519 / #2568, step 1: stored
// only, not yet applied to QPs). Out-of-range values are ignored so a
// bad config never breaks selection.
if (policy_json.contains("service_level")) {
int sl = policy_json.value("service_level", -1);
if (sl >= 0 && sl <= 15) {
policy.service_level = sl;
} else {
LOG(WARNING) << "Ignore service_level in policy " << policy.name
<< ", value " << sl << " out of range (0-15)";
}
}
if (policy_json.contains("traffic_class")) {
int tc = policy_json.value("traffic_class", -1);
if (tc >= 0 && tc <= 255) {
policy.traffic_class = tc;
} else {
LOG(WARNING) << "Ignore traffic_class in policy " << policy.name
<< ", value " << tc << " out of range (0-255)";
}
}
// Reserved for step 2 (per-class QP pools); parsed for forward schema
// compatibility, no effect yet.
if (policy_json.contains("qp_pool")) {
auto& qp = policy_json["qp_pool"];
if (qp.is_string()) policy.qp_pool = qp.get<std::string>();
}
Comment on lines +212 to +235

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using nlohmann::json::value() or direct retrieval on keys without type checking can throw a nlohmann::json::type_error exception if the value in the JSON configuration has an unexpected type (e.g., a string, boolean, or null). To ensure that a bad configuration never breaks selection or crashes the application, we should explicitly check the type using is_number_integer() before retrieving the value. Additionally, we should log a warning if qp_pool is present but is not a string, for consistency.

        if (policy_json.contains("service_level")) {
            auto& sl_val = policy_json["service_level"];
            if (sl_val.is_number_integer()) {
                int sl = sl_val.get<int>();
                if (sl >= 0 && sl <= 15) {
                    policy.service_level = sl;
                } else {
                    LOG(WARNING) << "Ignore service_level in policy " << policy.name
                                 << ", value " << sl << " out of range (0-15)";
                }
            } else {
                LOG(WARNING) << "Ignore service_level in policy " << policy.name
                             << ", value is not an integer";
            }
        }
        if (policy_json.contains("traffic_class")) {
            auto& tc_val = policy_json["traffic_class"];
            if (tc_val.is_number_integer()) {
                int tc = tc_val.get<int>();
                if (tc >= 0 && tc <= 255) {
                    policy.traffic_class = tc;
                } else {
                    LOG(WARNING) << "Ignore traffic_class in policy " << policy.name
                                 << ", value " << tc << " out of range (0-255)";
                }
            } else {
                LOG(WARNING) << "Ignore traffic_class in policy " << policy.name
                             << ", value is not an integer";
            }
        }
        // Reserved for step 2 (per-class QP pools); parsed for forward schema
        // compatibility, no effect yet.
        if (policy_json.contains("qp_pool")) {
            auto& qp = policy_json["qp_pool"];
            if (qp.is_string()) {
                policy.qp_pool = qp.get<std::string>();
            } else {
                LOG(WARNING) << "Ignore qp_pool in policy " << policy.name
                             << ", value is not a string";
            }
        }

Comment on lines +232 to +235

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Empty-string qp_pool is treated as set, which is semantically different from nullopt (= use default pool).

Comment on lines +232 to +235

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SL/TC out-of-range gets a LOG(WARNING), but "qp_pool": 42 (non-string) is silently no-op'd. For schema consistency, log a warning here too, you don't want a typo'd config to look like it took effect when it didn't.


policies_.push_back(std::move(policy));
LOG(INFO) << "Loaded transport policy: " << policy.name
<< " (segment_type=" << segment_type_str
Expand Down Expand Up @@ -384,6 +412,13 @@ SelectionResult TransportSelector::select(
return result; // UNSPEC, all devices
}

// Carry the matched policy's link-layer QoS out to the caller (RFC #2519 /
// #2568, step 1). These are plumbed but not yet applied at QP setup; that
// is the per-class QP pool follow-up (step 2).
result.service_level = matching_policy->service_level;
result.traffic_class = matching_policy->traffic_class;
result.qp_pool = matching_policy->qp_pool;

// Convert device names to mask
result.device_mask = ~0ULL; // Default: all devices
if (!matching_policy->devices.empty() && topology_) {
Expand Down
71 changes: 71 additions & 0 deletions mooncake-transfer-engine/tent/tests/transport_selector_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,77 @@ TEST(TransportSelectorTest, HintNotInMatchingPolicyReturnsUnspec) {
EXPECT_EQ(r.transport, UNSPEC);
}

// RFC #2519 / #2568 step 1: a policy's link-layer QoS (service_level /
// traffic_class / qp_pool) is parsed from JSON and carried out via
// SelectionResult. (Step 1 only plumbs the values; applying them at QP setup
// is the per-class QP pool follow-up.)
TEST(TransportSelectorTest, PolicyLinkLayerQoSIsParsedAndCarried) {
auto conf = std::make_shared<Config>();
json policy;
policy["name"] = "kv-critical";
policy["segment_type"] = "memory";
policy["transports"] = {"rdma"};
policy["service_level"] = 3;
policy["traffic_class"] = 96;
policy["qp_pool"] = "kv";
conf->set("policy", json::array({policy}));

TransportSelector selector(conf);
std::array<std::shared_ptr<Transport>, kSupportedTransportTypes>
transports{};
transports[RDMA] = std::make_shared<FakeTransport>(RDMA);
static_cast<FakeTransport*>(transports[RDMA].get())->setDramToDram(true);

std::vector<TransportType> buffer_transports = {RDMA};
SelectionContext ctx;
ctx.segment_type = SegmentType::Memory;
ctx.same_machine = false;
ctx.local_memory_type = MTYPE_CPU;
ctx.remote_memory_type = MTYPE_CPU;
ctx.buffer_transports = &buffer_transports;
ctx.policy_name = "kv-critical";

auto r = selector.select(ctx, transports, /*index=*/0);
ASSERT_TRUE(r.service_level.has_value());
EXPECT_EQ(r.service_level.value(), 3);
ASSERT_TRUE(r.traffic_class.has_value());
EXPECT_EQ(r.traffic_class.value(), 96);
ASSERT_TRUE(r.qp_pool.has_value());
EXPECT_EQ(r.qp_pool.value(), "kv");
}

// Out-of-range SL/TC are ignored (left as nullopt) so a bad config never
// changes selection behavior.
TEST(TransportSelectorTest, PolicyLinkLayerQoSOutOfRangeIgnored) {
auto conf = std::make_shared<Config>();
json policy;
policy["name"] = "bad-qos";
policy["segment_type"] = "memory";
policy["transports"] = {"rdma"};
policy["service_level"] = 99; // > 15, invalid
policy["traffic_class"] = 9999; // > 255, invalid
conf->set("policy", json::array({policy}));

TransportSelector selector(conf);
std::array<std::shared_ptr<Transport>, kSupportedTransportTypes>
transports{};
transports[RDMA] = std::make_shared<FakeTransport>(RDMA);
static_cast<FakeTransport*>(transports[RDMA].get())->setDramToDram(true);

std::vector<TransportType> buffer_transports = {RDMA};
SelectionContext ctx;
ctx.segment_type = SegmentType::Memory;
ctx.same_machine = false;
ctx.local_memory_type = MTYPE_CPU;
ctx.remote_memory_type = MTYPE_CPU;
ctx.buffer_transports = &buffer_transports;
ctx.policy_name = "bad-qos";

auto r = selector.select(ctx, transports, /*index=*/0);
EXPECT_FALSE(r.service_level.has_value());
EXPECT_FALSE(r.traffic_class.has_value());
}

} // namespace
} // namespace tent
} // namespace mooncake
Loading