-
Notifications
You must be signed in to change notification settings - Fork 28
MOD-15578 Track shared SVS thread pool memory & expose it through public API #972
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
3c2023a
b8ffc81
20d0e09
5f329df
ea2296e
b40e042
5f7ec7a
4d5456f
9d082f0
fda6a43
7c8b871
bd88757
ddc00ec
31a8026
b17a7b8
a923fa7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -407,35 +407,73 @@ class VecSimSVSThreadPoolImpl { | |
| std::vector<ThreadSlot *> slots_; | ||
| }; | ||
|
|
||
| // Allocator type for the slots vector. | ||
| using SlotPtr = std::shared_ptr<ThreadSlot>; | ||
| using SlotVecAllocator = VecsimSTLAllocator<SlotPtr>; | ||
|
|
||
| // Create a pool with `num_threads` total parallelism (including the calling thread). | ||
| // Spawns `num_threads - 1` worker OS threads. num_threads must be >= 1. | ||
| // In write-in-place mode, the pool is created with num_threads == 1 (0 worker threads, | ||
| // only the calling thread participates). | ||
| // Private — use instance() to access the shared singleton. | ||
| explicit VecSimSVSThreadPoolImpl(size_t num_threads = 1) { | ||
| explicit VecSimSVSThreadPoolImpl(size_t num_threads = 1) | ||
| : allocator_(VecSimAllocator::newVecsimAllocator()), slots_(SlotVecAllocator(allocator_)) { | ||
| assert(num_threads && "VecSimSVSThreadPoolImpl should not be created with 0 threads"); | ||
| slots_.reserve(num_threads - 1); | ||
| for (size_t i = 0; i < num_threads - 1; ++i) { | ||
| slots_.push_back(std::make_shared<ThreadSlot>()); | ||
| slots_.push_back( | ||
| std::allocate_shared<ThreadSlot>(VecsimSTLAllocator<ThreadSlot>(allocator_))); | ||
| } | ||
| } | ||
|
|
||
| // Set to true the first time instance() constructs the singleton. Allows other | ||
| // code paths (e.g., global stats reporting) to query whether the pool has been | ||
| // touched without forcing its lazy construction. | ||
| static std::atomic<bool> &initialized_flag() { | ||
| static std::atomic<bool> flag{false}; | ||
| return flag; | ||
| } | ||
|
|
||
| public: | ||
| // Singleton accessor for the shared SVS thread pool. | ||
| // Always valid — initialized with size 1 (write-in-place mode: 0 worker threads, | ||
| // only the calling thread participates). Resized on VecSim_UpdateThreadPoolSize() calls. | ||
| static std::shared_ptr<VecSimSVSThreadPoolImpl> instance() { | ||
| static auto shared_pool = std::shared_ptr<VecSimSVSThreadPoolImpl>( | ||
| new VecSimSVSThreadPoolImpl(1), [](VecSimSVSThreadPoolImpl *) { /* leak at exit */ }); | ||
| static auto shared_pool = [] { | ||
| auto p = std::shared_ptr<VecSimSVSThreadPoolImpl>( | ||
| new VecSimSVSThreadPoolImpl(1), | ||
| [](VecSimSVSThreadPoolImpl *) { /* leak at exit */ }); | ||
| initialized_flag().store(true, std::memory_order_release); | ||
| return p; | ||
| }(); | ||
| return shared_pool; | ||
| } | ||
|
|
||
| // Returns true iff instance() has ever been called (singleton constructed). | ||
| static bool isInitialized() { return initialized_flag().load(std::memory_order_acquire); } | ||
|
|
||
| // Total parallelism: worker slots + 1 (the calling thread always participates). | ||
| size_t size() const { | ||
| std::lock_guard lock{pool_mutex_}; | ||
| return slots_.size() + 1; | ||
| } | ||
|
|
||
| // Bytes currently allocated through the pool's internal allocator (the slots vector | ||
| // and the ThreadSlot objects). Does not include allocations performed by SVS itself | ||
| // outside of the pool, nor per-index wrapper state. | ||
| size_t getAllocationSize() const { return allocator_->getAllocationSize(); } | ||
|
|
||
| // Bytes allocated by the shared pool singleton. Returns 0 if the singleton has | ||
| // never been constructed (e.g., no SVS index was ever created and | ||
| // VecSim_UpdateThreadPoolSize was never called). Safe to call from any context; | ||
| // does not force singleton construction. | ||
| static size_t getSharedAllocationSize() { | ||
| if (!isInitialized()) { | ||
| return 0; | ||
| } | ||
| return instance()->getAllocationSize(); | ||
| } | ||
|
|
||
| // Physically resize the pool. Creates new OS threads on grow, shuts down idle threads | ||
| // on shrink. new_size is total parallelism including the calling thread (minimum 1). | ||
| // Occupied threads (held by renters) survive shrink via the deferred-resize protocol — | ||
|
|
@@ -599,7 +637,8 @@ class VecSimSVSThreadPoolImpl { | |
| // Grow (or same size): apply immediately, cancel any pending deferred shrink. | ||
| deferred_size_.reset(); | ||
| for (size_t i = slots_.size(); i < target_workers; ++i) { | ||
| slots_.push_back(std::make_shared<ThreadSlot>()); | ||
| slots_.push_back( | ||
| std::allocate_shared<ThreadSlot>(VecsimSTLAllocator<ThreadSlot>(allocator_))); | ||
| } | ||
| } else { | ||
| // Shrink. | ||
|
|
@@ -615,8 +654,9 @@ class VecSimSVSThreadPoolImpl { | |
| } | ||
| } | ||
|
|
||
| std::shared_ptr<VecSimAllocator> allocator_; // pool's own allocator for memory tracking | ||
| mutable std::mutex pool_mutex_; | ||
| std::vector<std::shared_ptr<ThreadSlot>> slots_; | ||
| std::vector<SlotPtr, SlotVecAllocator> slots_; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't using vecsim_stl::vector simplify the whole thing?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done — slots_ is now |
||
| size_t pending_jobs_ = 0; // jobs currently scheduled / in-flight | ||
| std::optional<size_t> deferred_size_; // resize target deferred until pending_jobs_ == 0 | ||
| }; | ||
|
|
@@ -646,9 +686,14 @@ class VecSimSVSThreadPool { | |
| // parallelism_ starts at 1 (the calling thread always participates), matching the | ||
| // pool's minimum size. Safe for immediate use in write-in-place mode without an | ||
| // explicit setParallelism() call. | ||
| explicit VecSimSVSThreadPool(void *log_ctx = nullptr) | ||
| // parallelism_ is allocated through the provided VecsimAllocator so that the | ||
| // allocation is tracked by the index's memory accounting. | ||
| explicit VecSimSVSThreadPool(const std::shared_ptr<VecSimAllocator> &allocator, | ||
| void *log_ctx = nullptr) | ||
| : pool_(VecSimSVSThreadPoolImpl::instance()), | ||
| parallelism_(std::make_shared<std::atomic<size_t>>(1)), log_ctx_(log_ctx) {} | ||
| parallelism_(std::allocate_shared<std::atomic<size_t>>( | ||
| VecsimSTLAllocator<std::atomic<size_t>>(allocator), size_t{1})), | ||
| log_ctx_(log_ctx) {} | ||
|
|
||
| // Resize the shared pool singleton. Delegates to VecSimSVSThreadPoolImpl::instance(). | ||
| static void resize(size_t new_size) { VecSimSVSThreadPoolImpl::instance()->resize(new_size); } | ||
|
|
@@ -677,6 +722,11 @@ class VecSimSVSThreadPool { | |
| // Shared pool size — used by scheduling to decide how many reserve jobs to submit. | ||
| static size_t poolSize() { return VecSimSVSThreadPoolImpl::instance()->size(); } | ||
|
|
||
| // See VecSimSVSThreadPoolImpl::getSharedAllocationSize(). | ||
| static size_t getSharedAllocationSize() { | ||
| return VecSimSVSThreadPoolImpl::getSharedAllocationSize(); | ||
| } | ||
|
|
||
| // Delegates to the shared pool's parallel_for, passing the per-index log context. | ||
| // n may be less than parallelism_ when the problem size is smaller than the | ||
| // thread count (SVS computes n = min(arg.size(), pool.size())). | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -183,6 +183,18 @@ VecSimIndexBasicInfo VecSimIndex_BasicInfo(VecSimIndex *index); | |
| */ | ||
| VecSimIndexStatsInfo VecSimIndex_StatsInfo(VecSimIndex *index); | ||
|
|
||
| /** | ||
| * @brief Return process-wide VecSim statistics that are not tied to any single index. | ||
| * Currently exposes the memory used by the shared SVS thread pool singleton. | ||
| * Safe to call without holding any index lock; does not force initialization of the | ||
| * shared SVS pool (returns 0 in fields whose backing singleton has not been touched). | ||
| * | ||
| * @return Total bytes currently allocated by VecSim outside any single index | ||
| * (e.g. the shared SVS thread pool singleton). 0 if no such allocations | ||
| * have been made. | ||
| */ | ||
| size_t VecSim_GetGlobalMemory(void); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Problematic name, implies that it also includes the per-index memory |
||
|
|
||
| /** | ||
| * @brief Returns an info iterator for generic reply purposes. | ||
| * | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3362,6 +3362,60 @@ TEST(SVSTest, NumThreadsParamIgnored) { | |
| VecSimIndexInterface::logCallback = nullptr; | ||
| } | ||
|
|
||
| // SVS debug info exposes both: | ||
| // * GLOBAL_MEMORY — top-level field appended by VecSimIndex_DebugInfoIterator | ||
| // (mirrors VecSim_GetGlobalMemory()). | ||
| // * SHARED_SVS_THREADPOOL_MEMORY — emitted by SVSIndex::debugInfoIterator(). | ||
| // They are sourced from the same VecSimSVSThreadPool::getSharedAllocationSize() | ||
| // (the only contributor to global memory today), so their values must match. | ||
| TYPED_TEST(SVSTest, debugInfoGlobalMemoryEqualsSharedSVSThreadPoolMemory) { | ||
| // Ensure the shared SVS thread pool singleton has allocated memory so both | ||
| // fields report a non-zero value. resize() always lazy-initializes the singleton. | ||
| VecSim_UpdateThreadPoolSize(2); | ||
| ASSERT_GT(VecSim_GetGlobalMemory(), 0u); | ||
|
|
||
| size_t dim = 4; | ||
| SVSParams params = {.type = TypeParam::get_index_type(), .dim = dim, .metric = VecSimMetric_L2}; | ||
| VecSimIndex *index = this->CreateNewIndex(params); | ||
| ASSERT_INDEX(index); | ||
|
|
||
| VecSimDebugInfoIterator *infoIterator = VecSimIndex_DebugInfoIterator(index); | ||
|
|
||
| bool seen_global = false; | ||
| bool seen_shared = false; | ||
| uint64_t global_value = 0; | ||
|
cursor[bot] marked this conversation as resolved.
Outdated
|
||
| uint64_t shared_value = 0; | ||
| while (VecSimDebugInfoIterator_HasNextField(infoIterator)) { | ||
| VecSim_InfoField *f = VecSimDebugInfoIterator_NextField(infoIterator); | ||
| if (!strcmp(f->fieldName, VecSimCommonStrings::GLOBAL_MEMORY_STRING)) { | ||
| ASSERT_FALSE(seen_global) << "GLOBAL_MEMORY appears more than once"; | ||
| ASSERT_EQ(f->fieldType, INFOFIELD_UINT64); | ||
| global_value = f->fieldValue.uintegerValue; | ||
| seen_global = true; | ||
| } else if (!strcmp(f->fieldName, | ||
| VecSimCommonStrings::SHARED_SVS_THREADPOOL_MEMORY_STRING)) { | ||
| ASSERT_FALSE(seen_shared) << "SHARED_SVS_THREADPOOL_MEMORY appears more than once"; | ||
| ASSERT_EQ(f->fieldType, INFOFIELD_UINT64); | ||
| shared_value = f->fieldValue.uintegerValue; | ||
| seen_shared = true; | ||
| } | ||
| } | ||
| EXPECT_TRUE(seen_global) << "GLOBAL_MEMORY field missing from SVS debug info"; | ||
| EXPECT_TRUE(seen_shared) << "SHARED_SVS_THREADPOOL_MEMORY field missing from SVS debug info"; | ||
| EXPECT_EQ(global_value, shared_value) | ||
| << "GLOBAL_MEMORY and SHARED_SVS_THREADPOOL_MEMORY should report the same bytes " | ||
| "(only the SVS thread pool contributes to VecSim global memory today)"; | ||
| EXPECT_EQ(global_value, VecSim_GetGlobalMemory()); | ||
|
|
||
| VecSimDebugInfoIterator_Free(infoIterator); | ||
| VecSimIndex_Free(index); | ||
|
|
||
| // Reset the shared singleton pool to size 1 so the next test is not affected. | ||
| // Use VecSimSVSThreadPool::resize(1) directly (matching other thread-pool tests) | ||
| // to avoid the write-mode side effect that VecSim_UpdateThreadPoolSize(0) carries. | ||
| VecSimSVSThreadPool::resize(1); | ||
| } | ||
|
|
||
|
Comment on lines
+3402
to
+3404
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a validation that the tracked memory actually increased/decreased as expected (rather than just that both APIs return the same value) ?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a new test |
||
| #else // HAVE_SVS | ||
|
|
||
| TEST(SVSTest, svs_not_supported) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only one pair was added no? How come it increased by 3?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's actually +1 for the new field and +2 because the old 23 was already wrong.
Before the change the real count was 25 (1 ALGORITHM + 9 from addCommonInfoToIterator + 15 SVS-specific), but the comment just said "update this when needed" and it looks like nobody had.
So adding the shared-memory field bumped the correct count to 26, not 24.
I added a little breakdown in the comment so it doesn't drift again. Worth noting it's only a reserve() hint, not a hard size, so the stale value wasn't actually causing any bug — just an extra realloc — which is probably why it slipped by.