From 839e4103d1d83f8fd42a7f6549b20acb0cffd9f2 Mon Sep 17 00:00:00 2001 From: Huy Nguyen Date: Tue, 16 Jun 2026 11:35:25 +0000 Subject: [PATCH 1/6] Fix cross-DSO issue (leading to empty ranges) --- .github/CODEOWNERS | 26 +++++++++---------- .../raft/core/detail/nvtx_range_stack.hpp | 4 +-- cpp/include/raft/mr/host_memory_resource.hpp | 6 ++--- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 1baecf88f3..2744d3876d 100755 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,30 +1,30 @@ #cpp code owners -cpp/ @NVIDIA/raft-cpp-codeowners +cpp/ @rapidsai/raft-cpp-codeowners #python code owners -python/ @NVIDIA/raft-python-codeowners +python/ @rapidsai/raft-python-codeowners #cmake code owners -CMakeLists.txt @NVIDIA/raft-cmake-codeowners -**/cmake/ @NVIDIA/raft-cmake-codeowners -*.cmake @NVIDIA/raft-cmake-codeowners +CMakeLists.txt @rapidsai/raft-cmake-codeowners +**/cmake/ @rapidsai/raft-cmake-codeowners +*.cmake @rapidsai/raft-cmake-codeowners python/setup.py @NVIDIA/raft-cmake-codeowners build.sh @NVIDIA/raft-cmake-codeowners **/build.sh @NVIDIA/raft-cmake-codeowners #CI code owners -/.github/ @NVIDIA/adi-ci-codeowners -/ci/ @NVIDIA/adi-ci-codeowners +/.github/ @rapidsai/ci-codeowners +/ci/ @rapidsai/ci-codeowners /.shellcheckrc @NVIDIA/adi-ci-codeowners /.coderabbit.yaml @NVIDIA/adi-ci-codeowners #packaging code owners -/.pre-commit-config.yaml @NVIDIA/adi-packaging-codeowners -/.devcontainer/ @NVIDIA/adi-packaging-codeowners -/conda/ @NVIDIA/adi-packaging-codeowners -dependencies.yaml @NVIDIA/adi-packaging-codeowners -/build.sh @NVIDIA/adi-packaging-codeowners -pyproject.toml @NVIDIA/adi-packaging-codeowners +/.pre-commit-config.yaml @rapidsai/packaging-codeowners +/.devcontainer/ @rapidsai/packaging-codeowners +/conda/ @rapidsai/packaging-codeowners +dependencies.yaml @rapidsai/packaging-codeowners +/build.sh @rapidsai/packaging-codeowners +pyproject.toml @rapidsai/packaging-codeowners # Ops code owners /SECURITY.md @NVIDIA/adi-ops-codeowners diff --git a/cpp/include/raft/core/detail/nvtx_range_stack.hpp b/cpp/include/raft/core/detail/nvtx_range_stack.hpp index b0145a6904..d8c8559b51 100644 --- a/cpp/include/raft/core/detail/nvtx_range_stack.hpp +++ b/cpp/include/raft/core/detail/nvtx_range_stack.hpp @@ -76,7 +76,7 @@ struct nvtx_range_name_stack { std::shared_ptr current_{std::make_shared()}; }; -inline thread_local nvtx_range_name_stack range_name_stack_instance{}; +RAFT_EXPORT inline thread_local nvtx_range_name_stack range_name_stack_instance{}; } // namespace detail @@ -85,7 +85,7 @@ inline thread_local nvtx_range_name_stack range_name_stack_instance{}; * Pass the returned shared_ptr to another thread to read this thread's current NVTX range name at * any time. */ -inline auto thread_local_current_range() -> std::shared_ptr +RAFT_EXPORT inline auto thread_local_current_range() -> std::shared_ptr { return detail::range_name_stack_instance.current(); } diff --git a/cpp/include/raft/mr/host_memory_resource.hpp b/cpp/include/raft/mr/host_memory_resource.hpp index b17411f31c..af729cd45a 100644 --- a/cpp/include/raft/mr/host_memory_resource.hpp +++ b/cpp/include/raft/mr/host_memory_resource.hpp @@ -46,7 +46,7 @@ struct default_host_resource_holder { } }; -inline default_host_resource_holder default_host_resource_holder_{}; +RAFT_EXPORT inline default_host_resource_holder default_host_resource_holder_{}; } // namespace detail @@ -56,7 +56,7 @@ inline default_host_resource_holder default_host_resource_holder_{}; * Returns raft::mr::host_resource_ref pointing to the resource installed * via set_default_host_resource(), or new_delete_resource() if none was set. */ -inline auto get_default_host_resource() -> raft::mr::host_resource_ref +RAFT_EXPORT inline auto get_default_host_resource() -> raft::mr::host_resource_ref { return detail::default_host_resource_holder_.get(); } @@ -69,7 +69,7 @@ inline auto get_default_host_resource() -> raft::mr::host_resource_ref * @param res The resource to install. * @return The previous default host resource. */ -inline auto set_default_host_resource(raft::mr::host_resource res) -> raft::mr::host_resource +RAFT_EXPORT inline auto set_default_host_resource(raft::mr::host_resource res) -> raft::mr::host_resource { return detail::default_host_resource_holder_.set(res); } From 31979c7f64e8af565417750db017d095fa481010 Mon Sep 17 00:00:00 2001 From: Huy Nguyen Date: Tue, 30 Jun 2026 08:14:41 +0000 Subject: [PATCH 2/6] Add allocation event monitoring and recording adaptor for accurate memory tracking --- .../raft/core/detail/nvtx_range_stack.hpp | 63 +++++- .../raft/core/memory_tracking_resources.hpp | 113 +++++----- .../raft/mr/allocation_event_monitor.hpp | 193 ++++++++++++++++++ cpp/include/raft/mr/recording_adaptor.hpp | 172 ++++++++++++++++ cpp/tests/core/monitor_resources.cu | 47 ++++- 5 files changed, 513 insertions(+), 75 deletions(-) create mode 100644 cpp/include/raft/mr/allocation_event_monitor.hpp create mode 100644 cpp/include/raft/mr/recording_adaptor.hpp diff --git a/cpp/include/raft/core/detail/nvtx_range_stack.hpp b/cpp/include/raft/core/detail/nvtx_range_stack.hpp index d8c8559b51..1d2d6f8c33 100644 --- a/cpp/include/raft/core/detail/nvtx_range_stack.hpp +++ b/cpp/include/raft/core/detail/nvtx_range_stack.hpp @@ -1,17 +1,19 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 */ #pragma once #include +#include #include +#include #include #include -#include #include #include +#include namespace raft { namespace common::nvtx { @@ -35,6 +37,17 @@ class current_range { return {value_, depth_}; } + /** + * Read the full root->leaf range path with instance ids, formatted as + * "name#id > name#id > ..." (empty when no range is active). + * This identifies the exact nvtx range stack responsible for an allocation. + */ + auto get_path() const -> std::string + { + std::lock_guard lock(mu_); + return path_; + } + operator std::string() const { std::lock_guard lock(mu_); @@ -45,35 +58,65 @@ class current_range { mutable std::mutex mu_; std::string value_; std::size_t depth_{0}; + std::string path_; - void set(const char* name, std::size_t depth) + void set(const char* name, std::size_t depth, std::string path) { std::lock_guard lock(mu_); value_ = name ? name : ""; depth_ = depth; + path_ = std::move(path); } }; namespace detail { +RAFT_EXPORT inline std::atomic range_instance_counter{0}; + struct nvtx_range_name_stack { void push(const char* name) { - stack_.emplace(name); - current_->set(name, stack_.size()); + ensure_current(); + auto id = range_instance_counter.fetch_add(1, std::memory_order_relaxed) + 1; + stack_.emplace_back(id, name ? name : ""); + current_->set(stack_.back().second.c_str(), stack_.size(), build_path()); } void pop() { - if (!stack_.empty()) { stack_.pop(); } - current_->set(stack_.empty() ? nullptr : stack_.top().c_str(), stack_.size()); + ensure_current(); + if (!stack_.empty()) { stack_.pop_back(); } + current_->set( + stack_.empty() ? nullptr : stack_.back().second.c_str(), stack_.size(), build_path()); } - auto current() const -> std::shared_ptr { return current_; } + [[nodiscard]] auto current() const -> std::shared_ptr + { + ensure_current(); + return current_; + } private: - std::stack stack_{}; - std::shared_ptr current_{std::make_shared()}; + void ensure_current() const + { + if (!current_) { current_ = std::make_shared(); } + } + + // Serialize the active stack as "name#id > name#id > ..." (outer -> inner). + [[nodiscard]] auto build_path() const -> std::string + { + std::string path; + for (auto const& [id, name] : stack_) { + if (!path.empty()) { path += " > "; } + path += name; + path += '#'; + path += std::to_string(id); + } + return path; + } + + std::vector> stack_{}; + mutable std::shared_ptr current_{std::make_shared()}; }; RAFT_EXPORT inline thread_local nvtx_range_name_stack range_name_stack_instance{}; diff --git a/cpp/include/raft/core/memory_tracking_resources.hpp b/cpp/include/raft/core/memory_tracking_resources.hpp index 310e3775ff..414d171ad4 100644 --- a/cpp/include/raft/core/memory_tracking_resources.hpp +++ b/cpp/include/raft/core/memory_tracking_resources.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 */ #pragma once @@ -9,11 +9,10 @@ #include #include #include +#include #include #include -#include -#include -#include +#include #include #include @@ -31,7 +30,7 @@ namespace raft { /** * @brief A resources handle that wraps all reachable memory resources with - * allocation-tracking adaptors and logs CSV statistics from a + * allocation-recording adaptors and logs CSV statistics from a * background thread. * * Inherits from raft::resources, so it can be passed anywhere a @@ -39,12 +38,16 @@ namespace raft { * - Materializes all tracked resource types (host, device, pinned, * managed, workspace, large_workspace). * - Takes a snapshot of the original resources to keep them alive. - * - Wraps each with statistics_adaptor + notifying_adaptor. + * - Wraps each with a recording_adaptor that pushes an allocation_event + * (carrying the NVTX range captured at allocation time) onto a shared queue. * - Replaces global host and device resources with tracked versions. - * - Starts a background CSV reporter. + * - Starts a background CSV writer that drains the queue. * - * On destruction the handle stops the reporter and restores the - * global host and device resources. + * On destruction the handle stops the writer (draining all pending events) and + * restores the global host and device resources. + * + * Unlike a sampling monitor, the NVTX range is captured on the allocating + * thread at event time, so range attribution in the CSV is always correct. */ class memory_tracking_resources : public resources { public: @@ -55,7 +58,8 @@ class memory_tracking_resources : public resources { * * @param existing Resources to shallow-copy and wrap with tracking. * @param out Output stream for CSV rows (must outlive this object). - * @param sample_interval Minimum time between successive CSV samples. + * @param sample_interval Accepted for API compatibility; unused by the + * event-driven monitor (every event is recorded). */ memory_tracking_resources(const resources& existing, std::ostream& out, @@ -69,7 +73,7 @@ class memory_tracking_resources : public resources { * * @param existing Resources to shallow-copy and wrap with tracking. * @param file_path Path to the output CSV file (created/truncated). - * @param sample_interval Minimum time between successive CSV samples. + * @param sample_interval Accepted for API compatibility; unused. */ memory_tracking_resources(const resources& existing, const std::string& file_path, @@ -83,7 +87,7 @@ class memory_tracking_resources : public resources { * @brief Construct from scratch (default resources), logging to an ostream. * * @param out Output stream for CSV rows (must outlive this object). - * @param sample_interval Minimum time between successive CSV samples. + * @param sample_interval Accepted for API compatibility; unused. */ explicit memory_tracking_resources(std::ostream& out, duration sample_interval = std::chrono::milliseconds{10}) @@ -95,7 +99,7 @@ class memory_tracking_resources : public resources { * @brief Construct from scratch (default resources), logging to a file. * * @param file_path Path to the output CSV file (created/truncated). - * @param sample_interval Minimum time between successive CSV samples. + * @param sample_interval Accepted for API compatibility; unused. */ explicit memory_tracking_resources(const std::string& file_path, duration sample_interval = std::chrono::milliseconds{10}) @@ -116,17 +120,17 @@ class memory_tracking_resources : public resources { memory_tracking_resources& operator=(memory_tracking_resources const&) = delete; memory_tracking_resources& operator=(memory_tracking_resources&&) = delete; - /** @brief Access the underlying CSV reporter (e.g. to read stats). */ - [[nodiscard]] auto report() noexcept -> raft::mr::resource_monitor& { return report_; } + /** @brief Access the underlying CSV writer. */ + [[nodiscard]] auto report() noexcept -> raft::mr::allocation_event_monitor& { return report_; } private: memory_tracking_resources(const resources* existing, std::unique_ptr owned_stream, std::ostream* out_override, - duration sample_interval) + [[maybe_unused]] duration sample_interval) : resources(existing ? *existing : resources{}), owned_stream_(std::move(owned_stream)), - report_(out_override ? *out_override : *owned_stream_, sample_interval), + report_(out_override ? *out_override : *owned_stream_), old_host_(raft::mr::get_default_host_resource()), old_device_(rmm::mr::get_current_device_resource_ref()) { @@ -136,22 +140,20 @@ class memory_tracking_resources : public resources { // Declaration order determines initialization and destruction order. // snapshot_ is destroyed last (keeps original resource shared_ptrs alive). // owned_stream_ outlives report_ (report_ writes to it). - // report_ is destroyed first of the three (stops background thread). + // report_ is destroyed first of the three (stops the background thread). std::vector snapshot_; std::unique_ptr owned_stream_; - raft::mr::resource_monitor report_; + raft::mr::allocation_event_monitor report_; raft::mr::host_resource old_host_; raft::mr::device_resource old_device_; - using host_stats_t = raft::mr::statistics_adaptor; - using host_notify_t = raft::mr::notifying_adaptor; - std::unique_ptr host_adaptor_; - - using device_stats_t = raft::mr::statistics_adaptor; - using device_notify_t = raft::mr::notifying_adaptor; - - std::unique_ptr device_adaptor_; + // Host and device adaptors are installed as the *global* resources, which + // hold them by reference, so they must outlive this object's use -> owned here. + using host_adaptor_t = raft::mr::recording_adaptor; + using device_adaptor_t = raft::mr::recording_adaptor; + std::unique_ptr host_adaptor_; + std::unique_ptr device_adaptor_; void init() { @@ -179,66 +181,55 @@ class memory_tracking_resources : public resources { // Keeps original resource objects alive while tracking refs point into them. snapshot_ = resources_; + auto queue = report_.get_queue(); + + // Source ids are assigned in registration order, which must match the CSV + // column-group order below. + // --- Host (global) --- { - host_stats_t sa{raft::mr::host_resource_ref{old_host_}}; - report_.register_source("host", sa.get_stats()); - host_adaptor_ = std::make_unique(std::move(sa), report_.get_notifier()); + int id = report_.register_source("host"); + host_adaptor_ = std::make_unique(old_host_, queue, id); raft::mr::set_default_host_resource(*host_adaptor_); } // --- Pinned --- { - using stats_t = raft::mr::statistics_adaptor; - using notify_t = raft::mr::notifying_adaptor; - stats_t sa{pinned_ref}; - report_.register_source("pinned", sa.get_stats()); - raft::resource::set_pinned_memory_resource(*this, - notify_t{std::move(sa), report_.get_notifier()}); + int id = report_.register_source("pinned"); + raft::resource::set_pinned_memory_resource( + *this, + raft::mr::recording_adaptor{pinned_ref, queue, id}); } // --- Managed --- { - using stats_t = raft::mr::statistics_adaptor; - using notify_t = raft::mr::notifying_adaptor; - stats_t sa{managed_ref}; - report_.register_source("managed", sa.get_stats()); - raft::resource::set_managed_memory_resource(*this, - notify_t{std::move(sa), report_.get_notifier()}); + int id = report_.register_source("managed"); + raft::resource::set_managed_memory_resource( + *this, + raft::mr::recording_adaptor{managed_ref, queue, id}); } // --- Device (global) --- - // Invalidate the cached thrust policy (the resource_ref it captured - // will be stale once we replace the global device resource). - factories_.at(resource::resource_type::THRUST_POLICY) = std::make_pair( - resource::resource_type::LAST_KEY, std::make_shared()); - resources_.at(resource::resource_type::THRUST_POLICY) = std::make_pair( - resource::resource_type::LAST_KEY, std::make_shared()); { - device_stats_t sa{rmm::device_async_resource_ref{old_device_}}; - report_.register_source("device", sa.get_stats()); - device_adaptor_ = std::make_unique(std::move(sa), report_.get_notifier()); + int id = report_.register_source("device"); + device_adaptor_ = std::make_unique(old_device_, queue, id); rmm::mr::set_current_device_resource(*device_adaptor_); } // --- Workspace (track upstream to preserve limiting_resource_adaptor) --- { - using ws_stats_t = raft::mr::statistics_adaptor; - using ws_notify_t = raft::mr::notifying_adaptor; - ws_stats_t sa{upstream_ref}; - report_.register_source("workspace", sa.get_stats()); + int id = report_.register_source("workspace"); raft::resource::set_workspace_resource( - *this, ws_notify_t{std::move(sa), report_.get_notifier()}, ws_free); + *this, + raft::mr::recording_adaptor{upstream_ref, queue, id}, + ws_free); } // --- Large workspace --- { - using lws_stats_t = raft::mr::statistics_adaptor; - using lws_notify_t = raft::mr::notifying_adaptor; - lws_stats_t sa{lws_ref}; - report_.register_source("large_workspace", sa.get_stats()); + int id = report_.register_source("large_workspace"); raft::resource::set_large_workspace_resource( - *this, lws_notify_t{std::move(sa), report_.get_notifier()}); + *this, raft::mr::recording_adaptor{lws_ref, queue, id}); } report_.start(); diff --git a/cpp/include/raft/mr/allocation_event_monitor.hpp b/cpp/include/raft/mr/allocation_event_monitor.hpp new file mode 100644 index 0000000000..ea234af34d --- /dev/null +++ b/cpp/include/raft/mr/allocation_event_monitor.hpp @@ -0,0 +1,193 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace raft { +namespace mr { + +/** + * @brief A single allocation or deallocation event, captured on the allocating thread. + */ +struct allocation_event { + int source_id{0}; //< which registered source this belongs to + std::int64_t current{0}; //< source's live bytes after this event + std::int64_t total_alloc{0}; //< cumulative bytes allocated (this source) + std::int64_t total_freed{0}; //< cumulative bytes freed (this source) + std::size_t nvtx_depth{0}; //< NVTX stack depth at event time + std::string nvtx_range; //< NVTX range name active at event time + std::int64_t event_bytes{0}; //< signed bytes for THIS event (+alloc / -free) + std::string alloc_range; //< responsible range path "name#id > ..." + // captured at ALLOCATION time (empty if unknown) + std::chrono::steady_clock::time_point timestamp{}; //< when the event happened +}; + +/** + * @brief Thread-safe multi-producer / single-consumer queue of allocation_events. + */ +class allocation_event_queue { + public: + /** @brief Append an event (any thread). */ + void push(allocation_event event) + { + { + std::lock_guard lock(mtx_); + events_.push_back(std::move(event)); + } + cv_.notify_one(); + } + + /** + * @brief Block until events are available or the queue is stopped, then move + * all pending events into `out`. + * + * @return false once the queue is stopped AND drained (consumer should exit), + * true otherwise. + */ + bool wait_and_take(std::vector& out) + { + std::unique_lock lock(mtx_); + cv_.wait(lock, [this] { return stopped_ || !events_.empty(); }); + out.clear(); + out.swap(events_); + return !(stopped_ && out.empty()); + } + + /** @brief Signal the consumer to drain and exit. */ + void stop() + { + { + std::lock_guard lock(mtx_); + stopped_ = true; + } + cv_.notify_all(); + } + + private: + std::mutex mtx_; + std::condition_variable cv_; + std::vector events_; + bool stopped_{false}; +}; + +/** + * @brief Consumes allocation_events from a queue and writes one CSV row per + * event from a background thread. + */ +class allocation_event_monitor { + public: + explicit allocation_event_monitor(std::ostream& out) : out_(out) {} + + ~allocation_event_monitor() { stop(); } + + allocation_event_monitor(allocation_event_monitor const&) = delete; + allocation_event_monitor& operator=(allocation_event_monitor const&) = delete; + + [[nodiscard]] auto get_queue() const noexcept -> std::shared_ptr + { + return queue_; + } + + /** + * @brief Register a named source and return its id (column-group index). + * Must be called before start(). + */ + auto register_source(std::string name) -> int + { + int id = static_cast(source_names_.size()); // TODO (huuanhhuyn) conflict id? + source_names_.push_back(std::move(name)); + view_.emplace_back(); + return id; + } + + void start() + { + if (worker_.joinable()) { return; } + write_header(); + worker_ = std::thread([this] { run(); }); + } + + void stop() + { + if (!worker_.joinable()) { return; } + queue_->stop(); // drains the queue and causes the worker to exit its loop + worker_.join(); + } + + private: + struct source_view { + std::int64_t current{0}; + std::int64_t total_alloc{0}; + std::int64_t total_freed{0}; + }; + + void write_header() + { + out_ << "timestamp_us"; + for (auto const& name : source_names_) { + out_ << ',' << name << "_current," << name << "_peak," << name << "_total_alloc," << name + << "_total_freed"; + } + out_ << ",nvtx_depth,nvtx_range,event_source,event_bytes,alloc_range\n"; + out_.flush(); + } + + void run() + { + std::vector batch; + for (;;) { + bool keep_going = queue_->wait_and_take(batch); + for (auto const& event : batch) { + write_row(event); + } + out_.flush(); + if (!keep_going) { break; } + } + } + + void write_row(allocation_event const& event) + { + if (event.source_id >= 0 && event.source_id < static_cast(view_.size())) { + view_[event.source_id] = source_view{event.current, event.total_alloc, event.total_freed}; + } + + auto us = + std::chrono::duration_cast(event.timestamp - start_).count(); + out_ << us; + for (auto const& v : view_) { + out_ << ',' << v.current << ',' << v.current << ',' << v.total_alloc << ',' << v.total_freed; + } + out_ << ',' << event.nvtx_depth << ",\"" << event.nvtx_range << "\""; + + auto const* src_name = + (event.source_id >= 0 && event.source_id < static_cast(source_names_.size())) + ? source_names_[event.source_id].c_str() + : ""; + out_ << ',' << src_name << ',' << event.event_bytes << ",\"" << event.alloc_range << "\"\n"; + } + + std::ostream& out_; + std::shared_ptr queue_{std::make_shared()}; + std::vector source_names_; + std::vector view_; + std::chrono::steady_clock::time_point start_{std::chrono::steady_clock::now()}; + std::thread worker_; +}; + +} // namespace mr +} // namespace raft diff --git a/cpp/include/raft/mr/recording_adaptor.hpp b/cpp/include/raft/mr/recording_adaptor.hpp new file mode 100644 index 0000000000..71086f834c --- /dev/null +++ b/cpp/include/raft/mr/recording_adaptor.hpp @@ -0,0 +1,172 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include +#include // thread_local_current_range +#include // allocation_event, allocation_event_queue +#include // resource_stats (atomic counters, reused) + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace raft { +namespace mr { + +/** + * @brief Resource adaptor that records each allocation/deallocation as an event, + * capturing the active NVTX range AT THE TIME OF THE EVENT. + * + * Details: + * - nvtx range is captured at the event time, preventing misassociation of a later + * range with an earlier allocation. + * - event is pushed to a thread-safe queue, preventing dropped events. This is desirable + * for a profiling use case, where all events and labels are required. + */ +template +class recording_adaptor : public cuda::forward_property, Upstream> { + // Map an allocated address to the nvtx stack range responsible for the allocation. + // It allows the deallocation event to be tagged with the same range, even if the responsible + // range has ended by the time of deallocation. + struct address_range_map { + std::mutex mtx; + std::unordered_map paths; + }; + + Upstream upstream_; + std::shared_ptr stats_; + std::shared_ptr queue_; + std::shared_ptr alloc_map_; + int source_id_; + + auto record_allocation(void* ptr) noexcept -> std::string + { + std::string path; + try { + path = raft::common::nvtx::thread_local_current_range()->get_path(); + if (ptr != nullptr) { + std::lock_guard lock(alloc_map_->mtx); + alloc_map_->paths[ptr] = path; + } + } catch (...) { + } + return path; + } + + auto forget_allocation(void* ptr) noexcept -> std::string + { + std::string path; + try { + std::lock_guard lock(alloc_map_->mtx); + auto it = alloc_map_->paths.find(ptr); + if (it != alloc_map_->paths.end()) { + path = std::move(it->second); + alloc_map_->paths.erase(it); + } + } catch (...) { + // Safely returns "" on a miss + } + return path; + } + + // Build and enqueue an event from the current snapshot and nvtx range + void emit(std::string alloc_range, std::int64_t signed_bytes) noexcept + { + try { + allocation_event event; + event.source_id = source_id_; + event.current = stats_->bytes_current.load(std::memory_order_relaxed); + event.total_alloc = stats_->bytes_total_allocated.load(std::memory_order_relaxed); + event.total_freed = stats_->bytes_total_deallocated.load(std::memory_order_relaxed); + event.timestamp = std::chrono::steady_clock::now(); + auto range = raft::common::nvtx::thread_local_current_range()->get(); + event.nvtx_range = std::move(range.first); + event.nvtx_depth = range.second; + event.event_bytes = signed_bytes; + event.alloc_range = std::move(alloc_range); + queue_->push(std::move(event)); + } catch (...) { + // noexcept: profiling bookkeeping must not disrupt the allocation path, so + // any failure is swallowed. + RAFT_LOG_WARN("Failed to emit an allocation event"); + } + } + + public: + recording_adaptor(Upstream upstream, std::shared_ptr queue, int source_id) + : upstream_(std::move(upstream)), + stats_(std::make_shared()), + queue_(std::move(queue)), + alloc_map_(std::make_shared()), + source_id_(source_id) + { + } + + /** @brief Access this source's shared counters. */ + [[nodiscard]] auto get_stats() const noexcept -> std::shared_ptr + { + return stats_; + } + + void* allocate_sync(std::size_t bytes, std::size_t alignment = alignof(std::max_align_t)) + { + void* ptr = upstream_.allocate_sync(bytes, alignment); + stats_->record_allocate(static_cast(bytes)); + emit(record_allocation(ptr), static_cast(bytes)); + return ptr; + } + + void deallocate_sync(void* ptr, + std::size_t bytes, + std::size_t alignment = alignof(std::max_align_t)) noexcept + { + upstream_.deallocate_sync(ptr, bytes, alignment); + stats_->record_deallocate(static_cast(bytes)); + emit(forget_allocation(ptr), -static_cast(bytes)); + } + + template , int> = 0> + void* allocate(cuda::stream_ref stream, + std::size_t bytes, + std::size_t alignment = alignof(std::max_align_t)) + { + void* ptr = upstream_.allocate(stream, bytes, alignment); + stats_->record_allocate(static_cast(bytes)); + emit(record_allocation(ptr), static_cast(bytes)); + return ptr; + } + + template , int> = 0> + void deallocate(cuda::stream_ref stream, + void* ptr, + std::size_t bytes, + std::size_t alignment = alignof(std::max_align_t)) noexcept + { + upstream_.deallocate(stream, ptr, bytes, alignment); + stats_->record_deallocate(static_cast(bytes)); + emit(forget_allocation(ptr), -static_cast(bytes)); + } + + [[nodiscard]] bool operator==(recording_adaptor const& other) const noexcept + { + return upstream_ == other.upstream_; + } + + [[nodiscard]] auto upstream_resource() noexcept -> Upstream& { return upstream_; } + [[nodiscard]] auto upstream_resource() const noexcept -> Upstream const& { return upstream_; } +}; + +} // namespace mr +} // namespace raft diff --git a/cpp/tests/core/monitor_resources.cu b/cpp/tests/core/monitor_resources.cu index c7be4bd285..5222f10861 100644 --- a/cpp/tests/core/monitor_resources.cu +++ b/cpp/tests/core/monitor_resources.cu @@ -1,30 +1,39 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 */ #include +#include #include +#include +#include +#include #include #include #include #include +#include +#include +#include #include #include #include namespace { +namespace nvtx = raft::common::nvtx; +using namespace std::chrono_literals; +constexpr std::size_t MiB = std::size_t{1024} * 1024; + TEST(MemoryTrackingResources, TracksDeviceAllocations) { - using namespace std::chrono_literals; - std::ostringstream oss; { raft::resources res; - raft::resource::set_workspace_to_pool_resource(res, 1024 * 1024); + raft::resource::set_workspace_to_pool_resource(res, 1 * MiB); raft::memory_tracking_resources tracked(res, oss, 1ms); @@ -49,4 +58,34 @@ TEST(MemoryTrackingResources, TracksDeviceAllocations) << output; } +TEST(MemoryTrackingResources, MismatchedRangeLabeling) +{ + const std::string csv_path = "mismatch_range_label.csv"; + + { + raft::resources res; + + raft::memory_tracking_resources tracked(res, csv_path, 1ms); + { + nvtx::range r{"1. expect 10 KB"}; + auto matrix = raft::make_host_vector(tracked, 10 * 1024); + } + { + // Deliberately huge & slow: allocating/freeing 10 GiB of host memory takes + // several ms, which makes the background sampler lag past this range's end. + // As a result this allocation's peak is mis-attributed to the NEXT range in + // the CSV (the range-labeling race discussed in the file header). Source + // attribution (host) stays correct; only the nvtx_range label is wrong. + nvtx::range r{"2. expect 10 GiB"}; + auto vector = raft::make_host_vector(tracked, 10 * 1024 * MiB); + } + { + nvtx::range r{"3. expect 4 MiB"}; + auto matrix = raft::make_host_vector(tracked, 4 * MiB); + } + } // tracked destroyed here: stops the sampler and flushes the file + + std::cout << "Wrote allocation statistics to " << csv_path << "\n"; +} + } // namespace From 416149dd28dd07b3ef71f6ae723e32c72d1ad147 Mon Sep 17 00:00:00 2001 From: Huy Nguyen Date: Tue, 30 Jun 2026 12:23:27 +0000 Subject: [PATCH 3/6] Clean up try catch --- .github/CODEOWNERS | 26 ++++----- .../raft/core/detail/nvtx_range_stack.hpp | 3 +- cpp/include/raft/mr/recording_adaptor.hpp | 56 ++++++++----------- 3 files changed, 38 insertions(+), 47 deletions(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 2744d3876d..1baecf88f3 100755 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,30 +1,30 @@ #cpp code owners -cpp/ @rapidsai/raft-cpp-codeowners +cpp/ @NVIDIA/raft-cpp-codeowners #python code owners -python/ @rapidsai/raft-python-codeowners +python/ @NVIDIA/raft-python-codeowners #cmake code owners -CMakeLists.txt @rapidsai/raft-cmake-codeowners -**/cmake/ @rapidsai/raft-cmake-codeowners -*.cmake @rapidsai/raft-cmake-codeowners +CMakeLists.txt @NVIDIA/raft-cmake-codeowners +**/cmake/ @NVIDIA/raft-cmake-codeowners +*.cmake @NVIDIA/raft-cmake-codeowners python/setup.py @NVIDIA/raft-cmake-codeowners build.sh @NVIDIA/raft-cmake-codeowners **/build.sh @NVIDIA/raft-cmake-codeowners #CI code owners -/.github/ @rapidsai/ci-codeowners -/ci/ @rapidsai/ci-codeowners +/.github/ @NVIDIA/adi-ci-codeowners +/ci/ @NVIDIA/adi-ci-codeowners /.shellcheckrc @NVIDIA/adi-ci-codeowners /.coderabbit.yaml @NVIDIA/adi-ci-codeowners #packaging code owners -/.pre-commit-config.yaml @rapidsai/packaging-codeowners -/.devcontainer/ @rapidsai/packaging-codeowners -/conda/ @rapidsai/packaging-codeowners -dependencies.yaml @rapidsai/packaging-codeowners -/build.sh @rapidsai/packaging-codeowners -pyproject.toml @rapidsai/packaging-codeowners +/.pre-commit-config.yaml @NVIDIA/adi-packaging-codeowners +/.devcontainer/ @NVIDIA/adi-packaging-codeowners +/conda/ @NVIDIA/adi-packaging-codeowners +dependencies.yaml @NVIDIA/adi-packaging-codeowners +/build.sh @NVIDIA/adi-packaging-codeowners +pyproject.toml @NVIDIA/adi-packaging-codeowners # Ops code owners /SECURITY.md @NVIDIA/adi-ops-codeowners diff --git a/cpp/include/raft/core/detail/nvtx_range_stack.hpp b/cpp/include/raft/core/detail/nvtx_range_stack.hpp index 1d2d6f8c33..bdded9a689 100644 --- a/cpp/include/raft/core/detail/nvtx_range_stack.hpp +++ b/cpp/include/raft/core/detail/nvtx_range_stack.hpp @@ -38,9 +38,8 @@ class current_range { } /** - * Read the full root->leaf range path with instance ids, formatted as + * Read the full nvtx range path with instance ids, formatted as * "name#id > name#id > ..." (empty when no range is active). - * This identifies the exact nvtx range stack responsible for an allocation. */ auto get_path() const -> std::string { diff --git a/cpp/include/raft/mr/recording_adaptor.hpp b/cpp/include/raft/mr/recording_adaptor.hpp index 71086f834c..0c749be2a6 100644 --- a/cpp/include/raft/mr/recording_adaptor.hpp +++ b/cpp/include/raft/mr/recording_adaptor.hpp @@ -27,13 +27,7 @@ namespace mr { /** * @brief Resource adaptor that records each allocation/deallocation as an event, - * capturing the active NVTX range AT THE TIME OF THE EVENT. - * - * Details: - * - nvtx range is captured at the event time, preventing misassociation of a later - * range with an earlier allocation. - * - event is pushed to a thread-safe queue, preventing dropped events. This is desirable - * for a profiling use case, where all events and labels are required. + * and associates it with the active NVTX range AT THE TIME OF THE EVENT. */ template class recording_adaptor : public cuda::forward_property, Upstream> { @@ -47,60 +41,58 @@ class recording_adaptor : public cuda::forward_property stats_; - std::shared_ptr queue_; + std::shared_ptr queue_; // stores all alloc and dealloc events std::shared_ptr alloc_map_; int source_id_; + // Record the allocation at this memory address with the current nvtx stack auto record_allocation(void* ptr) noexcept -> std::string { - std::string path; - try { - path = raft::common::nvtx::thread_local_current_range()->get_path(); - if (ptr != nullptr) { + std::string path = ""; + if (ptr != nullptr) { + auto current_range = raft::common::nvtx::thread_local_current_range(); + if (current_range) { + path = current_range->get_path(); std::lock_guard lock(alloc_map_->mtx); alloc_map_->paths[ptr] = path; } - } catch (...) { } return path; } + // Returns nvtx stack associated with the allocation at this memory address. + // Then, remove it from allocation map auto forget_allocation(void* ptr) noexcept -> std::string { - std::string path; - try { - std::lock_guard lock(alloc_map_->mtx); - auto it = alloc_map_->paths.find(ptr); - if (it != alloc_map_->paths.end()) { - path = std::move(it->second); - alloc_map_->paths.erase(it); - } - } catch (...) { - // Safely returns "" on a miss + std::string path = ""; + std::lock_guard lock(alloc_map_->mtx); + auto it = alloc_map_->paths.find(ptr); + if (it != alloc_map_->paths.end()) { + path = std::move(it->second); + alloc_map_->paths.erase(it); } return path; } - // Build and enqueue an event from the current snapshot and nvtx range + // Enqueue an event from the current snapshot and nvtx range void emit(std::string alloc_range, std::int64_t signed_bytes) noexcept { - try { + auto current_range = raft::common::nvtx::thread_local_current_range(); + if (current_range) { allocation_event event; event.source_id = source_id_; event.current = stats_->bytes_current.load(std::memory_order_relaxed); event.total_alloc = stats_->bytes_total_allocated.load(std::memory_order_relaxed); event.total_freed = stats_->bytes_total_deallocated.load(std::memory_order_relaxed); event.timestamp = std::chrono::steady_clock::now(); - auto range = raft::common::nvtx::thread_local_current_range()->get(); - event.nvtx_range = std::move(range.first); - event.nvtx_depth = range.second; event.event_bytes = signed_bytes; + // Stores inner-range for backwards compatibility + auto range = current_range->get(); + event.nvtx_range = std::move(range.first); + event.nvtx_depth = range.second; + // Stores full-range event.alloc_range = std::move(alloc_range); queue_->push(std::move(event)); - } catch (...) { - // noexcept: profiling bookkeeping must not disrupt the allocation path, so - // any failure is swallowed. - RAFT_LOG_WARN("Failed to emit an allocation event"); } } From a9ca140b210f0207099e9c76ae98282c7cd9bf29 Mon Sep 17 00:00:00 2001 From: Huy Nguyen Date: Wed, 1 Jul 2026 04:24:41 -0700 Subject: [PATCH 4/6] Support both approaches --- .../raft/core/memory_tracking_resources.hpp | 316 ++++++++++++++---- cpp/include/raft/mr/host_memory_resource.hpp | 5 +- cpp/include/raft/mr/notifying_adaptor.hpp | 2 +- cpp/include/raft/mr/recording_adaptor.hpp | 2 +- ...vent_monitor.hpp => recording_monitor.hpp} | 10 +- ...ource_monitor.hpp => sampling_monitor.hpp} | 10 +- cpp/tests/core/allocation_tracking.cpp | 8 +- cpp/tests/core/monitor_resources.cu | 36 +- 8 files changed, 285 insertions(+), 104 deletions(-) rename cpp/include/raft/mr/{allocation_event_monitor.hpp => recording_monitor.hpp} (94%) rename cpp/include/raft/mr/{resource_monitor.hpp => sampling_monitor.hpp} (95%) diff --git a/cpp/include/raft/core/memory_tracking_resources.hpp b/cpp/include/raft/core/memory_tracking_resources.hpp index 414d171ad4..235d804bc9 100644 --- a/cpp/include/raft/core/memory_tracking_resources.hpp +++ b/cpp/include/raft/core/memory_tracking_resources.hpp @@ -5,14 +5,18 @@ #pragma once #include +#include #include #include #include #include -#include +#include #include #include +#include #include +#include +#include #include #include @@ -30,54 +34,100 @@ namespace raft { /** * @brief A resources handle that wraps all reachable memory resources with - * allocation-recording adaptors and logs CSV statistics from a + * allocation-tracking adaptors and logs CSV statistics from a * background thread. * * Inherits from raft::resources, so it can be passed anywhere a - * raft::resources& is expected. On construction the handle: + * raft::resources& is expected. + * + * Two underlying approaches are supported, selected at construction time: + * - **Recording approach** (default, no sample_interval): every allocation + * and deallocation is pushed as an event onto a thread-safe queue. The + * active NVTX range is captured on the allocating thread at event time, so + * range attribution in the CSV is always accurate. Use when you need + * per-event labels. + * - **Sampling approach** (pass a sample_interval): a background thread + * periodically samples aggregate statistics counters and writes a CSV row. + * Lower per-allocation overhead than the recording approach, but NVTX range + * association is approximate (whatever range is active when the sampler + * is writing). Use when you want lower invasion and don't need exact + * per-event labels. + * + * On construction the handle: * - Materializes all tracked resource types (host, device, pinned, * managed, workspace, large_workspace). * - Takes a snapshot of the original resources to keep them alive. - * - Wraps each with a recording_adaptor that pushes an allocation_event - * (carrying the NVTX range captured at allocation time) onto a shared queue. + * - Wraps each with the chosen adaptor type. + * - statistics_adaptor + notifying_adaptor for the sampling approach + * - recording_adaptor for the recording approach * - Replaces global host and device resources with tracked versions. - * - Starts a background CSV writer that drains the queue. + * - Starts a background CSV writer. * - * On destruction the handle stops the writer (draining all pending events) and - * restores the global host and device resources. - * - * Unlike a sampling monitor, the NVTX range is captured on the allocating - * thread at event time, so range attribution in the CSV is always correct. + * On destruction the handle stops the reporter and restores the + * global host and device resources. */ class memory_tracking_resources : public resources { public: using duration = std::chrono::steady_clock::duration; + // ------------------------------------------------------------------------- + // Recording approach (no sample_interval): every event is captured. + // ------------------------------------------------------------------------- + /** * @brief Construct from an existing resources handle, logging to an ostream. + * Uses the queue-based recording approach (every allocation captured). + */ + memory_tracking_resources(const resources& existing, std::ostream& out) + : memory_tracking_resources(&existing, nullptr, &out) + { + } + + /** + * @brief Construct from an existing resources handle, logging to a file. + * Uses the queue-based recording approach (every allocation captured). + */ + memory_tracking_resources(const resources& existing, const std::string& file_path) + : memory_tracking_resources(&existing, std::make_unique(file_path), nullptr) + { + } + + /** + * @brief Construct from scratch (default resources), logging to an ostream. + * Uses the queue-based recording approach (every allocation captured). + */ + explicit memory_tracking_resources(std::ostream& out) = delete; + + /** + * @brief Construct from scratch (default resources), logging to a file. + * Uses the queue-based recording approach (every allocation captured). + */ + explicit memory_tracking_resources(const std::string& file_path) = delete; + + // ------------------------------------------------------------------------- + // Sampling approach (with sample_interval): stats sampled periodically. + // ------------------------------------------------------------------------- + + /** + * @brief Construct from an existing resources handle, logging to an ostream. + * Uses the notification/sampling approach. * - * @param existing Resources to shallow-copy and wrap with tracking. - * @param out Output stream for CSV rows (must outlive this object). - * @param sample_interval Accepted for API compatibility; unused by the - * event-driven monitor (every event is recorded). + * @param sample_interval Minimum time between successive CSV samples. */ - memory_tracking_resources(const resources& existing, - std::ostream& out, - duration sample_interval = std::chrono::milliseconds{10}) + memory_tracking_resources(const resources& existing, std::ostream& out, duration sample_interval) : memory_tracking_resources(&existing, nullptr, &out, sample_interval) { } /** * @brief Construct from an existing resources handle, logging to a file. + * Uses the notification/sampling approach. * - * @param existing Resources to shallow-copy and wrap with tracking. - * @param file_path Path to the output CSV file (created/truncated). - * @param sample_interval Accepted for API compatibility; unused. + * @param sample_interval Minimum time between successive CSV samples. */ memory_tracking_resources(const resources& existing, const std::string& file_path, - duration sample_interval = std::chrono::milliseconds{10}) + duration sample_interval) : memory_tracking_resources( &existing, std::make_unique(file_path), nullptr, sample_interval) { @@ -85,24 +135,22 @@ class memory_tracking_resources : public resources { /** * @brief Construct from scratch (default resources), logging to an ostream. + * Uses the notification/sampling approach. * - * @param out Output stream for CSV rows (must outlive this object). - * @param sample_interval Accepted for API compatibility; unused. + * @param sample_interval Minimum time between successive CSV samples. */ - explicit memory_tracking_resources(std::ostream& out, - duration sample_interval = std::chrono::milliseconds{10}) + memory_tracking_resources(std::ostream& out, duration sample_interval) : memory_tracking_resources(nullptr, nullptr, &out, sample_interval) { } /** * @brief Construct from scratch (default resources), logging to a file. + * Uses the notification/sampling approach. * - * @param file_path Path to the output CSV file (created/truncated). - * @param sample_interval Accepted for API compatibility; unused. + * @param sample_interval Minimum time between successive CSV samples. */ - explicit memory_tracking_resources(const std::string& file_path, - duration sample_interval = std::chrono::milliseconds{10}) + memory_tracking_resources(const std::string& file_path, duration sample_interval) : memory_tracking_resources( nullptr, std::make_unique(file_path), nullptr, sample_interval) { @@ -110,7 +158,8 @@ class memory_tracking_resources : public resources { ~memory_tracking_resources() override { - report_.stop(); + if (recorder_) recorder_->stop(); + if (sampler_) sampler_->stop(); raft::mr::set_default_host_resource(old_host_); rmm::mr::set_current_device_resource(old_device_); } @@ -120,42 +169,153 @@ class memory_tracking_resources : public resources { memory_tracking_resources& operator=(memory_tracking_resources const&) = delete; memory_tracking_resources& operator=(memory_tracking_resources&&) = delete; - /** @brief Access the underlying CSV writer. */ - [[nodiscard]] auto report() noexcept -> raft::mr::allocation_event_monitor& { return report_; } + /** @brief Access the recording monitor (non-null for recording approach only). */ + [[nodiscard]] auto get_recorder() noexcept -> raft::mr::recording_monitor* + { + return recorder_.get(); + } + + /** @brief Access the sampling monitor (non-null for sampling approach only). */ + [[nodiscard]] auto get_sampler() noexcept -> raft::mr::sampling_monitor* + { + return sampler_.get(); + } private: + // Constructor for the recording approach (no sample_interval). + memory_tracking_resources(const resources* existing, + std::unique_ptr owned_stream, + std::ostream* out_override) + : resources(existing ? *existing : resources{}), + owned_stream_(std::move(owned_stream)), + old_host_(raft::mr::get_default_host_resource()), + old_device_(rmm::mr::get_current_device_resource_ref()) + { + std::ostream& out = + *(out_override ? out_override : static_cast(owned_stream_.get())); + RAFT_LOG_INFO("memory_tracking_resources: using queue-based recording approach"); + recorder_ = std::make_unique(out); + init_recording(); + } + + // Constructor for the sampling approach (with sample_interval). memory_tracking_resources(const resources* existing, std::unique_ptr owned_stream, std::ostream* out_override, - [[maybe_unused]] duration sample_interval) + duration sample_interval) : resources(existing ? *existing : resources{}), owned_stream_(std::move(owned_stream)), - report_(out_override ? *out_override : *owned_stream_), old_host_(raft::mr::get_default_host_resource()), old_device_(rmm::mr::get_current_device_resource_ref()) { - init(); + std::ostream& out = + *(out_override ? out_override : static_cast(owned_stream_.get())); + auto us = std::chrono::duration_cast(sample_interval).count(); + RAFT_LOG_INFO("memory_tracking_resources: using sampling approach with interval=%lld us", + (long long)us); + sampler_ = std::make_unique(out, sample_interval); + init_sampling(); } // Declaration order determines initialization and destruction order. // snapshot_ is destroyed last (keeps original resource shared_ptrs alive). - // owned_stream_ outlives report_ (report_ writes to it). - // report_ is destroyed first of the three (stops the background thread). + // owned_stream_ outlives recorder_/sampler_ (they write to it). + // recorder_/sampler_ are stopped in the destructor body before member destruction. std::vector snapshot_; std::unique_ptr owned_stream_; - raft::mr::allocation_event_monitor report_; + std::unique_ptr recorder_; + std::unique_ptr sampler_; raft::mr::host_resource old_host_; raft::mr::device_resource old_device_; - // Host and device adaptors are installed as the *global* resources, which - // hold them by reference, so they must outlive this object's use -> owned here. - using host_adaptor_t = raft::mr::recording_adaptor; - using device_adaptor_t = raft::mr::recording_adaptor; - std::unique_ptr host_adaptor_; - std::unique_ptr device_adaptor_; + // --- Recording approach adaptors (owned because installed as global resources) --- + using host_record_t = raft::mr::recording_adaptor; + using device_record_t = raft::mr::recording_adaptor; + std::unique_ptr host_record_adaptor_; + std::unique_ptr device_record_adaptor_; + + // --- Sampling approach adaptors (owned because installed as global resources) --- + using host_stats_t = raft::mr::statistics_adaptor; + using host_notify_t = raft::mr::notifying_adaptor; + using device_stats_t = raft::mr::statistics_adaptor; + using device_notify_t = raft::mr::notifying_adaptor; + std::unique_ptr host_notify_adaptor_; + std::unique_ptr device_notify_adaptor_; + + void init_recording() + { + // Independent-counting invariant: see comment in init_sampling() below. + auto* ws = raft::resource::get_workspace_resource(*this); + auto ws_free = raft::resource::get_workspace_free_bytes(*this); + auto upstream_ref = ws->get_upstream_resource(); + auto lws_ref = raft::resource::get_large_workspace_resource_ref(*this); + auto pinned_ref = raft::resource::get_pinned_memory_resource_ref(*this); + auto managed_ref = raft::resource::get_managed_memory_resource_ref(*this); + + snapshot_ = resources_; + + auto queue = recorder_->get_queue(); + + // Source ids are assigned in registration order, which must match the CSV + // column-group order below. + + // --- Host (global) --- + { + int id = recorder_->register_source("host"); + host_record_adaptor_ = std::make_unique(old_host_, queue, id); + raft::mr::set_default_host_resource(*host_record_adaptor_); + } + + // --- Pinned --- + { + int id = recorder_->register_source("pinned"); + raft::resource::set_pinned_memory_resource( + *this, + raft::mr::recording_adaptor{pinned_ref, queue, id}); + } - void init() + // --- Managed --- + { + int id = recorder_->register_source("managed"); + raft::resource::set_managed_memory_resource( + *this, + raft::mr::recording_adaptor{managed_ref, queue, id}); + } + + // --- Device (global) --- + { + // Invalidate the cached thrust policy (the resource_ref it captured + // will be stale once we replace the global device resource). + factories_.at(resource::resource_type::THRUST_POLICY) = std::make_pair( + resource::resource_type::LAST_KEY, std::make_shared()); + resources_.at(resource::resource_type::THRUST_POLICY) = std::make_pair( + resource::resource_type::LAST_KEY, std::make_shared()); + int id = recorder_->register_source("device"); + device_record_adaptor_ = std::make_unique(old_device_, queue, id); + rmm::mr::set_current_device_resource(*device_record_adaptor_); + } + + // --- Workspace (track upstream to preserve limiting_resource_adaptor) --- + { + int id = recorder_->register_source("workspace"); + raft::resource::set_workspace_resource( + *this, + raft::mr::recording_adaptor{upstream_ref, queue, id}, + ws_free); + } + + // --- Large workspace --- + { + int id = recorder_->register_source("large_workspace"); + raft::resource::set_large_workspace_resource( + *this, raft::mr::recording_adaptor{lws_ref, queue, id}); + } + + recorder_->start(); + } + + void init_sampling() { // Independent-counting invariant // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -178,61 +338,73 @@ class memory_tracking_resources : public resources { auto pinned_ref = raft::resource::get_pinned_memory_resource_ref(*this); auto managed_ref = raft::resource::get_managed_memory_resource_ref(*this); - // Keeps original resource objects alive while tracking refs point into them. snapshot_ = resources_; - auto queue = report_.get_queue(); - - // Source ids are assigned in registration order, which must match the CSV - // column-group order below. - // --- Host (global) --- { - int id = report_.register_source("host"); - host_adaptor_ = std::make_unique(old_host_, queue, id); - raft::mr::set_default_host_resource(*host_adaptor_); + host_stats_t sa{raft::mr::host_resource_ref{old_host_}}; + sampler_->register_source("host", sa.get_stats()); + host_notify_adaptor_ = + std::make_unique(std::move(sa), sampler_->get_notifier()); + raft::mr::set_default_host_resource(*host_notify_adaptor_); } // --- Pinned --- { - int id = report_.register_source("pinned"); - raft::resource::set_pinned_memory_resource( - *this, - raft::mr::recording_adaptor{pinned_ref, queue, id}); + using stats_t = raft::mr::statistics_adaptor; + using notify_t = raft::mr::notifying_adaptor; + stats_t sa{pinned_ref}; + sampler_->register_source("pinned", sa.get_stats()); + raft::resource::set_pinned_memory_resource(*this, + notify_t{std::move(sa), sampler_->get_notifier()}); } // --- Managed --- { - int id = report_.register_source("managed"); + using stats_t = raft::mr::statistics_adaptor; + using notify_t = raft::mr::notifying_adaptor; + stats_t sa{managed_ref}; + sampler_->register_source("managed", sa.get_stats()); raft::resource::set_managed_memory_resource( - *this, - raft::mr::recording_adaptor{managed_ref, queue, id}); + *this, notify_t{std::move(sa), sampler_->get_notifier()}); } // --- Device (global) --- { - int id = report_.register_source("device"); - device_adaptor_ = std::make_unique(old_device_, queue, id); - rmm::mr::set_current_device_resource(*device_adaptor_); + // Invalidate the cached thrust policy (the resource_ref it captured + // will be stale once we replace the global device resource). + factories_.at(resource::resource_type::THRUST_POLICY) = std::make_pair( + resource::resource_type::LAST_KEY, std::make_shared()); + resources_.at(resource::resource_type::THRUST_POLICY) = std::make_pair( + resource::resource_type::LAST_KEY, std::make_shared()); + device_stats_t sa{rmm::device_async_resource_ref{old_device_}}; + sampler_->register_source("device", sa.get_stats()); + device_notify_adaptor_ = + std::make_unique(std::move(sa), sampler_->get_notifier()); + rmm::mr::set_current_device_resource(*device_notify_adaptor_); } // --- Workspace (track upstream to preserve limiting_resource_adaptor) --- { - int id = report_.register_source("workspace"); + using ws_stats_t = raft::mr::statistics_adaptor; + using ws_notify_t = raft::mr::notifying_adaptor; + ws_stats_t sa{upstream_ref}; + sampler_->register_source("workspace", sa.get_stats()); raft::resource::set_workspace_resource( - *this, - raft::mr::recording_adaptor{upstream_ref, queue, id}, - ws_free); + *this, ws_notify_t{std::move(sa), sampler_->get_notifier()}, ws_free); } // --- Large workspace --- { - int id = report_.register_source("large_workspace"); + using lws_stats_t = raft::mr::statistics_adaptor; + using lws_notify_t = raft::mr::notifying_adaptor; + lws_stats_t sa{lws_ref}; + sampler_->register_source("large_workspace", sa.get_stats()); raft::resource::set_large_workspace_resource( - *this, raft::mr::recording_adaptor{lws_ref, queue, id}); + *this, lws_notify_t{std::move(sa), sampler_->get_notifier()}); } - report_.start(); + sampler_->start(); } }; diff --git a/cpp/include/raft/mr/host_memory_resource.hpp b/cpp/include/raft/mr/host_memory_resource.hpp index af729cd45a..e2805a0143 100644 --- a/cpp/include/raft/mr/host_memory_resource.hpp +++ b/cpp/include/raft/mr/host_memory_resource.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 */ #pragma once @@ -69,7 +69,8 @@ RAFT_EXPORT inline auto get_default_host_resource() -> raft::mr::host_resource_r * @param res The resource to install. * @return The previous default host resource. */ -RAFT_EXPORT inline auto set_default_host_resource(raft::mr::host_resource res) -> raft::mr::host_resource +RAFT_EXPORT inline auto set_default_host_resource(raft::mr::host_resource res) + -> raft::mr::host_resource { return detail::default_host_resource_holder_.set(res); } diff --git a/cpp/include/raft/mr/notifying_adaptor.hpp b/cpp/include/raft/mr/notifying_adaptor.hpp index 09be7f3895..a296535c0e 100644 --- a/cpp/include/raft/mr/notifying_adaptor.hpp +++ b/cpp/include/raft/mr/notifying_adaptor.hpp @@ -67,7 +67,7 @@ class notifier { * or deallocation. * * Forwards all calls to the upstream resource, then calls notifier::notify(). - * A separate consumer (e.g. resource_monitor) can call notifier::wait() to + * A separate consumer (e.g. sampling_monitor) can call notifier::wait() to * block until activity occurs. * * @tparam Upstream Stored by value. Use a concrete resource type for owning diff --git a/cpp/include/raft/mr/recording_adaptor.hpp b/cpp/include/raft/mr/recording_adaptor.hpp index 0c749be2a6..fe0788359e 100644 --- a/cpp/include/raft/mr/recording_adaptor.hpp +++ b/cpp/include/raft/mr/recording_adaptor.hpp @@ -6,7 +6,7 @@ #include #include // thread_local_current_range -#include // allocation_event, allocation_event_queue +#include // allocation_event, allocation_event_queue #include // resource_stats (atomic counters, reused) #include diff --git a/cpp/include/raft/mr/allocation_event_monitor.hpp b/cpp/include/raft/mr/recording_monitor.hpp similarity index 94% rename from cpp/include/raft/mr/allocation_event_monitor.hpp rename to cpp/include/raft/mr/recording_monitor.hpp index ea234af34d..3ee81032e2 100644 --- a/cpp/include/raft/mr/allocation_event_monitor.hpp +++ b/cpp/include/raft/mr/recording_monitor.hpp @@ -89,14 +89,14 @@ class allocation_event_queue { * @brief Consumes allocation_events from a queue and writes one CSV row per * event from a background thread. */ -class allocation_event_monitor { +class recording_monitor { public: - explicit allocation_event_monitor(std::ostream& out) : out_(out) {} + explicit recording_monitor(std::ostream& out) : out_(out) {} - ~allocation_event_monitor() { stop(); } + ~recording_monitor() { stop(); } - allocation_event_monitor(allocation_event_monitor const&) = delete; - allocation_event_monitor& operator=(allocation_event_monitor const&) = delete; + recording_monitor(recording_monitor const&) = delete; + recording_monitor& operator=(recording_monitor const&) = delete; [[nodiscard]] auto get_queue() const noexcept -> std::shared_ptr { diff --git a/cpp/include/raft/mr/resource_monitor.hpp b/cpp/include/raft/mr/sampling_monitor.hpp similarity index 95% rename from cpp/include/raft/mr/resource_monitor.hpp rename to cpp/include/raft/mr/sampling_monitor.hpp index 1dc2311796..836a08926f 100644 --- a/cpp/include/raft/mr/resource_monitor.hpp +++ b/cpp/include/raft/mr/sampling_monitor.hpp @@ -39,7 +39,7 @@ namespace mr { * * start() and stop() are idempotent. */ -class resource_monitor { +class sampling_monitor { std::ostream& out_; std::chrono::steady_clock::duration sample_interval_; std::shared_ptr notifier_; @@ -55,7 +55,7 @@ class resource_monitor { * @param out Output stream for CSV rows. * @param sample_interval Minimum time between successive samples. */ - explicit resource_monitor(std::ostream& out, std::chrono::steady_clock::duration sample_interval) + explicit sampling_monitor(std::ostream& out, std::chrono::steady_clock::duration sample_interval) : out_(out), sample_interval_(sample_interval), notifier_(std::make_shared()), @@ -63,10 +63,10 @@ class resource_monitor { { } - ~resource_monitor() { stop(); } + ~sampling_monitor() { stop(); } - resource_monitor(resource_monitor const&) = delete; - resource_monitor& operator=(resource_monitor const&) = delete; + sampling_monitor(sampling_monitor const&) = delete; + sampling_monitor& operator=(sampling_monitor const&) = delete; /** * @brief Shared notifier for notifying_adaptor instances. diff --git a/cpp/tests/core/allocation_tracking.cpp b/cpp/tests/core/allocation_tracking.cpp index 8f5758315f..2936736a38 100644 --- a/cpp/tests/core/allocation_tracking.cpp +++ b/cpp/tests/core/allocation_tracking.cpp @@ -5,7 +5,7 @@ #include #include -#include +#include #include #include @@ -69,7 +69,7 @@ TEST(AllocationReport, WritesCSVOnDirty) using namespace std::chrono_literals; std::ostringstream oss; - raft::mr::resource_monitor report(oss, 1ms); + raft::mr::sampling_monitor report(oss, 1ms); auto host_stats = std::make_shared(); auto pinned_stats = std::make_shared(); @@ -98,7 +98,7 @@ TEST(AllocationReport, StartStopIdempotent) using namespace std::chrono_literals; std::ostringstream oss; - raft::mr::resource_monitor report(oss, 1ms); + raft::mr::sampling_monitor report(oss, 1ms); auto stats = std::make_shared(); report.register_source("test", stats); @@ -124,7 +124,7 @@ TEST(AllocationReport, DestructorCallsStop) std::ostringstream oss; { auto stats = std::make_shared(); - raft::mr::resource_monitor report(oss, 1ms); + raft::mr::sampling_monitor report(oss, 1ms); report.register_source("test", stats); stats->record_allocate(256); diff --git a/cpp/tests/core/monitor_resources.cu b/cpp/tests/core/monitor_resources.cu index 5222f10861..672df2d8e8 100644 --- a/cpp/tests/core/monitor_resources.cu +++ b/cpp/tests/core/monitor_resources.cu @@ -28,7 +28,9 @@ namespace nvtx = raft::common::nvtx; using namespace std::chrono_literals; constexpr std::size_t MiB = std::size_t{1024} * 1024; -TEST(MemoryTrackingResources, TracksDeviceAllocations) +// TODO improve tests (coverage + multiple allocating threads) + +TEST(MemoryTrackingResources, Sampling) { std::ostringstream oss; { @@ -58,26 +60,21 @@ TEST(MemoryTrackingResources, TracksDeviceAllocations) << output; } -TEST(MemoryTrackingResources, MismatchedRangeLabeling) +TEST(MemoryTrackingResources, Recording) { - const std::string csv_path = "mismatch_range_label.csv"; - + std::ostringstream oss; { raft::resources res; - - raft::memory_tracking_resources tracked(res, csv_path, 1ms); + raft::memory_tracking_resources tracked(res, oss); { nvtx::range r{"1. expect 10 KB"}; auto matrix = raft::make_host_vector(tracked, 10 * 1024); } { - // Deliberately huge & slow: allocating/freeing 10 GiB of host memory takes - // several ms, which makes the background sampler lag past this range's end. - // As a result this allocation's peak is mis-attributed to the NEXT range in - // the CSV (the range-labeling race discussed in the file header). Source - // attribution (host) stays correct; only the nvtx_range label is wrong. - nvtx::range r{"2. expect 10 GiB"}; - auto vector = raft::make_host_vector(tracked, 10 * 1024 * MiB); + // Deliberately large allocation to test that the memory tracking + // resources can handle a large allocation and labels it correctly + nvtx::range r{"2. expect 100 MiB"}; + auto vector = raft::make_host_vector(tracked, 100 * MiB); } { nvtx::range r{"3. expect 4 MiB"}; @@ -85,7 +82,18 @@ TEST(MemoryTrackingResources, MismatchedRangeLabeling) } } // tracked destroyed here: stops the sampler and flushes the file - std::cout << "Wrote allocation statistics to " << csv_path << "\n"; + auto output = oss.str(); + auto num_lines = std::count(output.begin(), output.end(), '\n'); + constexpr size_t NUM_ALLOCS = 3; + constexpr size_t NUM_DEALLOCS = NUM_ALLOCS; + constexpr size_t NUM_HEADER_LINES = 1; + constexpr size_t NUM_LINES_EXPECTED = NUM_ALLOCS + NUM_DEALLOCS + NUM_HEADER_LINES; + EXPECT_GE(num_lines, NUM_LINES_EXPECTED) + << "Expected at least " << NUM_LINES_EXPECTED + << " data records (allocation + deallocation + header); got " << num_lines << " lines" + << std::endl + << "content: " << std::endl + << output; } } // namespace From a315972ed9c254ab60e08a3442010352d6b6fd13 Mon Sep 17 00:00:00 2001 From: Huy Nguyen Date: Wed, 1 Jul 2026 05:43:15 -0700 Subject: [PATCH 5/6] Stress test with multiple parallel threads and allocations --- cpp/tests/core/monitor_resources.cu | 99 +++++++++++++++++++++++++++-- 1 file changed, 92 insertions(+), 7 deletions(-) diff --git a/cpp/tests/core/monitor_resources.cu b/cpp/tests/core/monitor_resources.cu index 672df2d8e8..91f38a43ba 100644 --- a/cpp/tests/core/monitor_resources.cu +++ b/cpp/tests/core/monitor_resources.cu @@ -28,9 +28,7 @@ namespace nvtx = raft::common::nvtx; using namespace std::chrono_literals; constexpr std::size_t MiB = std::size_t{1024} * 1024; -// TODO improve tests (coverage + multiple allocating threads) - -TEST(MemoryTrackingResources, Sampling) +TEST(MemoryTrackingResources, SamplingSingleThread) { std::ostringstream oss; { @@ -60,7 +58,7 @@ TEST(MemoryTrackingResources, Sampling) << output; } -TEST(MemoryTrackingResources, Recording) +TEST(MemoryTrackingResources, RecordingSingleThread) { std::ostringstream oss; { @@ -71,8 +69,6 @@ TEST(MemoryTrackingResources, Recording) auto matrix = raft::make_host_vector(tracked, 10 * 1024); } { - // Deliberately large allocation to test that the memory tracking - // resources can handle a large allocation and labels it correctly nvtx::range r{"2. expect 100 MiB"}; auto vector = raft::make_host_vector(tracked, 100 * MiB); } @@ -82,7 +78,15 @@ TEST(MemoryTrackingResources, Recording) } } // tracked destroyed here: stops the sampler and flushes the file - auto output = oss.str(); + auto output = oss.str(); + EXPECT_NE(output.find("timestamp_us"), std::string::npos); + EXPECT_NE(output.find("host_current"), std::string::npos); + EXPECT_NE(output.find("device_current"), std::string::npos); + EXPECT_NE(output.find("workspace_current"), std::string::npos); + EXPECT_NE(output.find("event_source"), std::string::npos); + EXPECT_NE(output.find("event_bytes"), std::string::npos); + EXPECT_NE(output.find("alloc_range"), std::string::npos); + auto num_lines = std::count(output.begin(), output.end(), '\n'); constexpr size_t NUM_ALLOCS = 3; constexpr size_t NUM_DEALLOCS = NUM_ALLOCS; @@ -96,4 +100,85 @@ TEST(MemoryTrackingResources, Recording) << output; } +// --------------------------------------------------------------------------- +// Parallel-thread stress tests +// --------------------------------------------------------------------------- + +TEST(MemoryTrackingResources, RecordingParallelThreads) +{ + constexpr int kNumThreads = 64; + constexpr int kNumIters = 200; + constexpr std::size_t kAllocSize = 256 * 1024; // 256 KiB + + std::ostringstream oss; + { + raft::resources res; + raft::memory_tracking_resources tracked(res, oss); + + // Lambda captures tracked directly — no base-class cast needed. + // Each thread tags its allocations with a distinct NVTX range so the + // CSV output carries per-thread attribution. + auto run = [&](int t) { + for (int i = 0; i < kNumIters; ++i) { + std::string label = std::string("thread-") + std::to_string(t); + nvtx::range r{label.c_str()}; + auto vec = raft::make_host_vector(tracked, kAllocSize); + } + }; + + std::vector workers; + for (int t = 0; t < kNumThreads; ++t) { + workers.emplace_back(run, t); + } + for (auto& w : workers) w.join(); + } // tracked destroyed: drains queue, flushes CSV + + std::string output = oss.str(); + auto num_lines = std::count(output.begin(), output.end(), '\n'); + + // Each iteration: one alloc row + one dealloc row; plus one header line. + const int expected_rows = kNumThreads * kNumIters * 2 + 1; + EXPECT_GE(num_lines, expected_rows) + << "Expected at least " << expected_rows << " lines; got " << num_lines + << "\noutput:\n" + << output; +} + +TEST(MemoryTrackingResources, SamplingParallelThreads) +{ + constexpr int kNumThreads = 64; + constexpr int kNumIters = 200; + constexpr std::size_t kAllocSize = 256 * 1024; // 256 KiB + + std::ostringstream oss; + { + raft::resources res; + raft::memory_tracking_resources tracked(res, oss, 1us); + + auto run = [&](int t) { + for (int i = 0; i < kNumIters; ++i) { + std::string label = std::string("thread-") + std::to_string(t); + nvtx::range r{label.c_str()}; + auto vec = raft::make_host_vector(tracked, kAllocSize); + } + }; + + std::vector workers; + for (int t = 0; t < kNumThreads; ++t) { + workers.emplace_back(run, t); + } + for (auto& w : workers) w.join(); + } + + std::string output = oss.str(); + auto num_lines = std::count(output.begin(), output.end(), '\n'); + + // Sampling approach drops many rows + const int max_num_rows = kNumThreads * kNumIters * 2 + 1; + EXPECT_TRUE(3 < num_lines && num_lines < max_num_rows) + << "Expected at least several rows. It is expected when many rows are dropped; got " << num_lines + << "\noutput:\n" + << output; +} + } // namespace From bd98bbd60b2981963bade05312d03d2a81020393 Mon Sep 17 00:00:00 2001 From: Huy Nguyen Date: Thu, 2 Jul 2026 06:26:02 +0000 Subject: [PATCH 6/6] precommit run --- .../raft/core/memory_tracking_resources.hpp | 4 ++-- cpp/include/raft/mr/notifying_adaptor.hpp | 2 +- cpp/include/raft/mr/recording_adaptor.hpp | 2 +- cpp/include/raft/mr/sampling_monitor.hpp | 2 +- cpp/tests/core/allocation_tracking.cpp | 2 +- cpp/tests/core/monitor_resources.cu | 19 ++++++++++--------- 6 files changed, 16 insertions(+), 15 deletions(-) diff --git a/cpp/include/raft/core/memory_tracking_resources.hpp b/cpp/include/raft/core/memory_tracking_resources.hpp index 235d804bc9..76928b9733 100644 --- a/cpp/include/raft/core/memory_tracking_resources.hpp +++ b/cpp/include/raft/core/memory_tracking_resources.hpp @@ -10,11 +10,11 @@ #include #include #include -#include #include #include #include #include +#include #include #include @@ -50,7 +50,7 @@ namespace raft { * periodically samples aggregate statistics counters and writes a CSV row. * Lower per-allocation overhead than the recording approach, but NVTX range * association is approximate (whatever range is active when the sampler - * is writing). Use when you want lower invasion and don't need exact + * is writing). Use when you want lower invasion and don't need exact * per-event labels. * * On construction the handle: diff --git a/cpp/include/raft/mr/notifying_adaptor.hpp b/cpp/include/raft/mr/notifying_adaptor.hpp index a296535c0e..2601d04aa7 100644 --- a/cpp/include/raft/mr/notifying_adaptor.hpp +++ b/cpp/include/raft/mr/notifying_adaptor.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 */ #pragma once diff --git a/cpp/include/raft/mr/recording_adaptor.hpp b/cpp/include/raft/mr/recording_adaptor.hpp index fe0788359e..22419d151e 100644 --- a/cpp/include/raft/mr/recording_adaptor.hpp +++ b/cpp/include/raft/mr/recording_adaptor.hpp @@ -6,7 +6,7 @@ #include #include // thread_local_current_range -#include // allocation_event, allocation_event_queue +#include // allocation_event, allocation_event_queue #include // resource_stats (atomic counters, reused) #include diff --git a/cpp/include/raft/mr/sampling_monitor.hpp b/cpp/include/raft/mr/sampling_monitor.hpp index 836a08926f..9f5d67719a 100644 --- a/cpp/include/raft/mr/sampling_monitor.hpp +++ b/cpp/include/raft/mr/sampling_monitor.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 */ #pragma once diff --git a/cpp/tests/core/allocation_tracking.cpp b/cpp/tests/core/allocation_tracking.cpp index 2936736a38..367a52ed58 100644 --- a/cpp/tests/core/allocation_tracking.cpp +++ b/cpp/tests/core/allocation_tracking.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 */ diff --git a/cpp/tests/core/monitor_resources.cu b/cpp/tests/core/monitor_resources.cu index 91f38a43ba..b1936cfdce 100644 --- a/cpp/tests/core/monitor_resources.cu +++ b/cpp/tests/core/monitor_resources.cu @@ -130,17 +130,17 @@ TEST(MemoryTrackingResources, RecordingParallelThreads) for (int t = 0; t < kNumThreads; ++t) { workers.emplace_back(run, t); } - for (auto& w : workers) w.join(); + for (auto& w : workers) + w.join(); } // tracked destroyed: drains queue, flushes CSV std::string output = oss.str(); - auto num_lines = std::count(output.begin(), output.end(), '\n'); + auto num_lines = std::count(output.begin(), output.end(), '\n'); // Each iteration: one alloc row + one dealloc row; plus one header line. const int expected_rows = kNumThreads * kNumIters * 2 + 1; EXPECT_GE(num_lines, expected_rows) - << "Expected at least " << expected_rows << " lines; got " << num_lines - << "\noutput:\n" + << "Expected at least " << expected_rows << " lines; got " << num_lines << "\noutput:\n" << output; } @@ -149,7 +149,7 @@ TEST(MemoryTrackingResources, SamplingParallelThreads) constexpr int kNumThreads = 64; constexpr int kNumIters = 200; constexpr std::size_t kAllocSize = 256 * 1024; // 256 KiB - + std::ostringstream oss; { raft::resources res; @@ -167,17 +167,18 @@ TEST(MemoryTrackingResources, SamplingParallelThreads) for (int t = 0; t < kNumThreads; ++t) { workers.emplace_back(run, t); } - for (auto& w : workers) w.join(); + for (auto& w : workers) + w.join(); } std::string output = oss.str(); - auto num_lines = std::count(output.begin(), output.end(), '\n'); + auto num_lines = std::count(output.begin(), output.end(), '\n'); // Sampling approach drops many rows const int max_num_rows = kNumThreads * kNumIters * 2 + 1; EXPECT_TRUE(3 < num_lines && num_lines < max_num_rows) - << "Expected at least several rows. It is expected when many rows are dropped; got " << num_lines - << "\noutput:\n" + << "Expected at least several rows. It is expected when many rows are dropped; got " + << num_lines << "\noutput:\n" << output; }