From 3620acccef0a028c1a682ee624a58a11b34dfd7d Mon Sep 17 00:00:00 2001 From: Hu Shenggang Date: Tue, 14 Apr 2026 12:51:02 +0800 Subject: [PATCH] [refactor](be) Split paused query handling logic Issue Number: None Related PR: None Problem Summary: Refactor WorkloadGroupMgr::handle_paused_queries by splitting cleanup, revoke-resume, per-workload-group processing, and memory-release paths into focused helpers while otherwise preserving the existing paused query behavior (see the behavior changes listed below). None - Test: Unit Test - `./run-be-ut.sh --run --filter=WorkloadGroupManagerTest.* -j${DORIS_PARALLELISM}` - Behavior changed: Yes (handle_paused_queries no longer pre-inserts an empty entry for every workload group; a PROCESS_MEMORY_EXCEEDED query paused longer than spill_in_paused_queue_timeout_ms is now passed to release_query_memory_; handle_single_query_ now cancels a timed-out query in the branch that previously only kept it paused) - Does this need documentation: No --- .../workload_group/workload_group_manager.cpp | 549 +++++++++--------- .../workload_group/workload_group_manager.h | 129 +++- .../workload_group_manager_test.cpp | 9 + 3 files changed, 426 insertions(+), 261 deletions(-) diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 94e92c50fdced4..f7abad1c0c55e1 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -310,27 +310,21 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr& * strategy 5: If any query exceed process's memlimit and cache is zero, then do following: */ void WorkloadGroupMgr::handle_paused_queries() { - { - std::shared_lock r_lock(_group_mutex); - std::unique_lock lock(_paused_queries_lock); - for (auto& [wg_id, wg] : _workload_groups) { - if (_paused_queries_list[wg].empty()) { - // Add an empty set to wg that not contains paused queries. 
- } - } + std::unique_lock lock(_paused_queries_lock); + auto recently_cancelled = cleanup_paused_queries_(); + if (resume_paused_queries_after_revoke_(recently_cancelled)) { + return; } + process_paused_queries_(recently_cancelled); +} - std::unique_lock lock(_paused_queries_lock); - bool has_recently_cancelled_query = false; - std::set wgs_with_recently_cancelled; +WorkloadGroupMgr::RecentlyCancelledQueries WorkloadGroupMgr::cleanup_paused_queries_() { + RecentlyCancelledQueries recently_cancelled; for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { auto& queries_list = it->second; for (auto query_it = queries_list.begin(); query_it != queries_list.end();) { - auto resource_ctx = query_it->resource_ctx_.lock(); - // The query is finished during in paused list. + auto resource_ctx = get_resource_ctx_or_erase_(queries_list, query_it); if (resource_ctx == nullptr) { - LOG(INFO) << "Query: " << query_it->query_id() << " is nullptr, erase it."; - query_it = queries_list.erase(query_it); continue; } if (resource_ctx->task_controller()->is_cancelled()) { @@ -339,8 +333,8 @@ void WorkloadGroupMgr::handle_paused_queries() { // Recently cancelled — may still be releasing memory. Record it so that // PROCESS_MEMORY_EXCEEDED queries wait globally and // WORKLOAD_GROUP_MEMORY_EXCEEDED queries wait within the same WG. - has_recently_cancelled_query = true; - wgs_with_recently_cancelled.insert(it->first); + recently_cancelled.has_query = true; + recently_cancelled.workload_groups.insert(it->first); ++query_it; } else { // Cancelled long ago — no longer releasing memory. Remove it now to @@ -361,277 +355,303 @@ void WorkloadGroupMgr::handle_paused_queries() { ++it; } } + return recently_cancelled; +} +bool WorkloadGroupMgr::resume_paused_queries_after_revoke_( + const RecentlyCancelledQueries& recently_cancelled) { // In previous loop, some query is cancelled, and now there is no query in cancel list. Resume all paused queries. 
if (revoking_memory_from_other_query_) { - if (has_recently_cancelled_query) { + if (recently_cancelled.has_query) { // Still waiting for the cancelled query to release memory. - return; + return true; } - for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { - auto& queries_list = it->second; - for (auto query_it = queries_list.begin(); query_it != queries_list.end();) { - auto resource_ctx = query_it->resource_ctx_.lock(); - // The query is finished during in paused list. - if (resource_ctx == nullptr) { - LOG(INFO) << "Query: " << query_it->query_id() << " is nullptr, erase it."; - query_it = queries_list.erase(query_it); - continue; - } - if (resource_ctx->task_controller()->is_cancelled()) { - LOG(INFO) << "Query: " << query_it->query_id() - << " is already cancelled, erase it from paused list."; - query_it = queries_list.erase(query_it); - continue; - } + resume_all_paused_queries_(); + revoking_memory_from_other_query_ = false; + return true; + } + return false; +} + +void WorkloadGroupMgr::resume_all_paused_queries_() { + for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { + auto& queries_list = it->second; + for (auto query_it = queries_list.begin(); query_it != queries_list.end();) { + auto resource_ctx = get_resource_ctx_or_erase_(queries_list, query_it); + if (resource_ctx == nullptr) { + continue; + } + if (resource_ctx->task_controller()->is_cancelled()) { + LOG(INFO) << "Query: " << query_it->query_id() + << " is already cancelled, erase it from paused list."; + } else { LOG(INFO) << "Query " << print_id(resource_ctx->task_controller()->task_id()) << " is blocked due to process memory not enough, but already " "cancelled some queries, resume it now."; resource_ctx->task_controller()->set_memory_sufficient(true); - query_it = queries_list.erase(query_it); - } - if (queries_list.empty()) { - it = _paused_queries_list.erase(it); - } else { - ++it; } + query_it = queries_list.erase(query_it); + } 
+ if (queries_list.empty()) { + it = _paused_queries_list.erase(it); + } else { + ++it; } - revoking_memory_from_other_query_ = false; - return; } +} +void WorkloadGroupMgr::process_paused_queries_(const RecentlyCancelledQueries& recently_cancelled) { for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { auto& queries_list = it->second; - auto query_count = queries_list.size(); const auto& wg = it->first; - if (query_count != 0) { - LOG_EVERY_T(INFO, 1) << "Paused queries count of workload group " << wg->name() << ": " - << query_count; + if (process_workload_group_paused_queries_(wg, queries_list, recently_cancelled)) { + return; } - bool has_changed_hard_limit = false; - bool exceed_low_watermark = false; - bool exceed_high_watermark = false; - wg->check_mem_used(&exceed_low_watermark, &exceed_high_watermark); - // If the query is paused because its limit exceed the query itself's memlimit, then just spill disk. - // The query's memlimit is set using slot mechanism and its value is set using the user settings, not - // by weighted value. So if reserve failed, then it is actually exceed limit. - for (auto query_it = queries_list.begin(); query_it != queries_list.end();) { - auto resource_ctx = query_it->resource_ctx_.lock(); - // The query is finished during in paused list. - if (resource_ctx == nullptr) { - LOG(INFO) << "Query: " << query_it->query_id() << " is nullptr, erase it."; - query_it = queries_list.erase(query_it); - continue; - } + if (queries_list.empty()) { + it = _paused_queries_list.erase(it); + continue; + } + // Finished deal with one workload group, and should deal with next one. + ++it; + } +} - // Recently cancelled queries are kept in the list by Phase 2 for - // delay-waiting tracking. Skip them here — they must not be processed - // (spilled/cancelled/resumed) by Phase 4. 
- if (resource_ctx->task_controller()->is_cancelled()) { - ++query_it; - continue; +bool WorkloadGroupMgr::process_workload_group_paused_queries_( + const WorkloadGroupPtr& wg, PausedQuerySet& queries_list, + const RecentlyCancelledQueries& recently_cancelled) { + auto query_count = queries_list.size(); + if (query_count != 0) { + LOG_EVERY_T(INFO, 1) << "Paused queries count of workload group " << wg->name() << ": " + << query_count; + } + + bool has_changed_hard_limit = false; + bool exceed_low_watermark = false; + bool exceed_high_watermark = false; + wg->check_mem_used(&exceed_low_watermark, &exceed_high_watermark); + // If the query is paused because its limit exceed the query itself's memlimit, then just spill disk. + // The query's memlimit is set using slot mechanism and its value is set using the user settings, not + // by weighted value. So if reserve failed, then it is actually exceed limit. + for (auto query_it = queries_list.begin(); query_it != queries_list.end();) { + auto resource_ctx = get_resource_ctx_or_erase_(queries_list, query_it); + if (resource_ctx == nullptr) { + continue; + } + + // Recently cancelled queries are kept in the list by cleanup_paused_queries_() + // for delay-waiting tracking. Skip them here; they must not be processed + // (spilled/cancelled/resumed) by active-query handlers. 
+ if (resource_ctx->task_controller()->is_cancelled()) { + ++query_it; + continue; + } + + if (resource_ctx->task_controller() + ->paused_reason() + .is()) { + if (handle_query_memory_exceeded_(queries_list, query_it, resource_ctx)) { + return true; } + } else if (resource_ctx->task_controller() + ->paused_reason() + .is()) { + if (handle_workload_group_memory_exceeded_(wg, queries_list, query_it, resource_ctx, + has_changed_hard_limit, exceed_low_watermark, + recently_cancelled)) { + return true; + } + } else if (handle_process_memory_exceeded_(wg, queries_list, query_it, resource_ctx, + recently_cancelled)) { + return true; + } + } + return false; +} - if (resource_ctx->task_controller() - ->paused_reason() - .is()) { - // Streamload, kafka load, group commit will never have query memory exceeded error because - // their query limit is very large. - bool spill_res = handle_single_query_( - resource_ctx, query_it->reserve_size_, query_it->elapsed_time(), - resource_ctx->task_controller()->paused_reason()); - if (!spill_res) { - ++query_it; - continue; - } else { - VLOG_DEBUG << "Query: " << print_id(resource_ctx->task_controller()->task_id()) - << " remove from paused list"; - query_it = queries_list.erase(query_it); - // The query is cancelled, just break. And wait for the query to release the memory. Other query maybe not need spill. - if (resource_ctx->task_controller()->is_cancelled()) { - revoking_memory_from_other_query_ = true; - return; - } - continue; - } - } else if (resource_ctx->task_controller() - ->paused_reason() - .is()) { - // For WORKLOAD_GROUP_MEMORY_EXCEEDED: only wait if a cancelled query in the - // SAME WG is releasing memory (WG memory pools are independent across WGs). 
- if (wgs_with_recently_cancelled.contains(wg)) { - ++query_it; - continue; - } - // here query is paused because of WORKLOAD_GROUP_MEMORY_EXCEEDED, - // wg of the current query may not actually exceed the limit, - // just (wg consumption + current query expected reserve memory > wg memory limit) - // if the current query memory consumption + expected reserve memory exceeds the limit, - // it may be that the expected reserve memory is too large, - // wg memory is insufficient at this time, - // so the current query should try to release memory by itself, - // but here we did not directly try to spill this query, - // set the query's limit only, and then wake up the current query to continue execution. - // - // if the expected reserve memory estimate is correct, high probability, - // query will enter the pause state again, the reason is expected to be QUERY_MEMORY_EXCEEDED, - // and handle_single_query_ will be called to spill. - // - // Of course, if the actual required memory is less than the reserved memory, - // or if there is enough memory when continuing to execute, - // it will run successfully without spilling. - if (resource_ctx->memory_context()->adjusted_mem_limit() < - resource_ctx->memory_context()->current_memory_bytes() + - query_it->reserve_size_) { - // The query not exceed the query limit, but exceed the expected query limit when the workload - // group memory is not enough, use the litter memory limit to let the query exceed query limit. 
- resource_ctx->memory_context()->set_mem_limit( - resource_ctx->memory_context()->adjusted_mem_limit()); - resource_ctx->task_controller()->set_memory_sufficient(true); - LOG(INFO) << "Workload group memory reserve failed because " - << resource_ctx->task_controller()->debug_string() << " reserve size " - << PrettyPrinter::print_bytes(query_it->reserve_size_) - << " is too large, set hard limit to " - << PrettyPrinter::print_bytes( - resource_ctx->memory_context()->adjusted_mem_limit()) - << " and resume running."; - query_it = queries_list.erase(query_it); - continue; - } +std::shared_ptr WorkloadGroupMgr::get_resource_ctx_or_erase_( + PausedQuerySet& queries_list, PausedQueryIterator& query_it) { + auto resource_ctx = query_it->resource_ctx_.lock(); + // The query is finished during in paused list. + if (resource_ctx == nullptr) { + LOG(INFO) << "Query: " << query_it->query_id() << " is nullptr, erase it."; + query_it = queries_list.erase(query_it); + } + return resource_ctx; +} - // when running here, current query adjusted_mem_limit >= query memory consumption + reserve_size, - // which means that the current query itself has not exceeded the adjusted memory limit. - // - // this means that there must be queries in the wg of the current query whose memory exceeds - // adjusted_mem_limit, but these queries may not have entered the paused state, - // so these queries may not modify the mem limit and continue to execute - // when (adjusted_mem_limit < consumption + reserve_size_) is judged above. - // - // so call `update_queries_limit_` to force the update of the mem_limit of all queries - // in the wg of the current query to the adjusted_mem_limit, - // hoping that these queries that exceed limit will release memory. 
- if (!has_changed_hard_limit) { - update_queries_limit_(wg, true); - has_changed_hard_limit = true; - LOG(INFO) << "Query: " << print_id(resource_ctx->task_controller()->task_id()) - << " reserve memory(" - << PrettyPrinter::print_bytes(query_it->reserve_size_) - << ") failed due to workload group memory exceed, " - "should set the workload group work in memory insufficent mode, " - "so that other query will reduce their memory." - << " Query mem limit: " - << PrettyPrinter::print_bytes( - resource_ctx->memory_context()->mem_limit()) - << " mem usage: " - << PrettyPrinter::print_bytes( - resource_ctx->memory_context()->current_memory_bytes()) - << ", wg: " << wg->debug_string(); - } - if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) { - // we not encourage not enable slot memory. - // - // If not enable slot memory policy, then should spill directly - // Maybe there are another query that use too much memory, if these queries - // exceed the memory limit, they will enter the paused state - // due to `QUERY_MEMORY_EXCEEDED` and will also try to spill. - // - // TODO should kill the query that exceed limit. - bool spill_res = handle_single_query_( - resource_ctx, query_it->reserve_size_, query_it->elapsed_time(), - resource_ctx->task_controller()->paused_reason()); - - if (!spill_res) { - ++query_it; - continue; - } else { - VLOG_DEBUG - << "Query: " << print_id(resource_ctx->task_controller()->task_id()) - << " remove from paused list"; - query_it = queries_list.erase(query_it); - // The query is cancelled, just break. And wait for the query to release the memory. Other query maybe not need spill. - if (resource_ctx->task_controller()->is_cancelled()) { - revoking_memory_from_other_query_ = true; - return; - } - continue; - } - } else { - // Should not put the query back to task scheduler immediately, because when wg's memory not sufficient, - // and then set wg's flag, other query may not free memory very quickly. 
- // If the workload group's memusage is less than low watermark then dispatch the query to run. - if (query_it->elapsed_time() > config::spill_in_paused_queue_timeout_ms || - !exceed_low_watermark) { - // set wg's memory to sufficient, then add it back to task scheduler to run. - LOG(INFO) << "Query: " - << print_id(resource_ctx->task_controller()->task_id()) - << " has waited in paused query queue for " - << query_it->elapsed_time() << " ms. Resume it."; - resource_ctx->task_controller()->set_memory_sufficient(true); - query_it = queries_list.erase(query_it); - continue; - } else { - ++query_it; - continue; - } - } - } else { - // PROCESS_MEMORY_EXCEEDED: a recently cancelled query anywhere may be releasing - // process-level memory, so wait globally before spilling or revoking. - if (has_recently_cancelled_query) { - ++query_it; - continue; - } - // If workload group's memory usage > min memory, then it means the workload group use too much memory - // in memory contention state. Should just spill - if (wg->total_mem_used() > wg->min_memory_limit()) { - bool spill_res = handle_single_query_( - resource_ctx, query_it->reserve_size_, query_it->elapsed_time(), - resource_ctx->task_controller()->paused_reason()); - if (!spill_res) { - ++query_it; - continue; - } else { - VLOG_DEBUG - << "Query: " << print_id(resource_ctx->task_controller()->task_id()) - << " remove from paused list"; - query_it = queries_list.erase(query_it); - // The query is cancelled, just break. And wait for the query to release the memory. Other query maybe not need spill. - if (resource_ctx->task_controller()->is_cancelled()) { - revoking_memory_from_other_query_ = true; - } - // If any query is cancelled or spilled to disk, we need to stop and not revoke memory from other queries. 
- return; - } - } +bool WorkloadGroupMgr::handle_query_memory_exceeded_( + PausedQuerySet& queries_list, PausedQueryIterator& query_it, + const std::shared_ptr& resource_ctx) { + // Streamload, kafka load, group commit will never have query memory exceeded error because + // their query limit is very large. + return release_query_memory_(queries_list, query_it, resource_ctx, false); +} - // Other workload groups many use a lot of memory, should revoke memory from other workload groups - // by cancelling their queries. - int64_t revoked_size = revoke_memory_from_other_groups_(); - if (revoked_size > 0) { - // Revoke memory from other workload groups will cancel some queries, wait them cancel finished - // and then check it again. - revoking_memory_from_other_query_ = true; - return; - } +bool WorkloadGroupMgr::handle_workload_group_memory_exceeded_( + const WorkloadGroupPtr& wg, PausedQuerySet& queries_list, PausedQueryIterator& query_it, + const std::shared_ptr& resource_ctx, bool& has_changed_hard_limit, + bool exceed_low_watermark, const RecentlyCancelledQueries& recently_cancelled) { + // For WORKLOAD_GROUP_MEMORY_EXCEEDED: only wait if a cancelled query in the + // SAME WG is releasing memory (WG memory pools are independent across WGs). 
+ if (recently_cancelled.workload_groups.contains(wg)) { + ++query_it; + return false; + } - // TODO revoke from memtable + // here query is paused because of WORKLOAD_GROUP_MEMORY_EXCEEDED, + // wg of the current query may not actually exceed the limit, + // just (wg consumption + current query expected reserve memory > wg memory limit) + // if the current query memory consumption + expected reserve memory exceeds the limit, + // it may be that the expected reserve memory is too large, + // wg memory is insufficient at this time, + // so the current query should try to release memory by itself, + // but here we did not directly try to spill this query, + // set the query's limit only, and then wake up the current query to continue execution. + // + // if the expected reserve memory estimate is correct, high probability, + // query will enter the pause state again, the reason is expected to be QUERY_MEMORY_EXCEEDED, + // and handle_single_query_ will be called to spill. + // + // Of course, if the actual required memory is less than the reserved memory, + // or if there is enough memory when continuing to execute, + // it will run successfully without spilling. + if (resource_ctx->memory_context()->adjusted_mem_limit() < + resource_ctx->memory_context()->current_memory_bytes() + query_it->reserve_size_) { + // The query not exceed the query limit, but exceed the expected query limit when the workload + // group memory is not enough, use the smaller memory limit to let the query exceed query limit. 
+ resource_ctx->memory_context()->set_mem_limit( + resource_ctx->memory_context()->adjusted_mem_limit()); + resource_ctx->task_controller()->set_memory_sufficient(true); + LOG(INFO) << "Workload group memory reserve failed because " + << resource_ctx->task_controller()->debug_string() << " reserve size " + << PrettyPrinter::print_bytes(query_it->reserve_size_) + << " is too large, set hard limit to " + << PrettyPrinter::print_bytes( + resource_ctx->memory_context()->adjusted_mem_limit()) + << " and resume running."; + query_it = queries_list.erase(query_it); + return false; + } - ++query_it; - } - } + // when running here, current query adjusted_mem_limit >= query memory consumption + reserve_size, + // which means that the current query itself has not exceeded the adjusted memory limit. + // + // this means that there must be queries in the wg of the current query whose memory exceeds + // adjusted_mem_limit, but these queries may not have entered the paused state, + // so these queries may not modify the mem limit and continue to execute + // when (adjusted_mem_limit < consumption + reserve_size_) is judged above. + // + // so call `update_queries_limit_` to force the update of the mem_limit of all queries + // in the wg of the current query to the adjusted_mem_limit, + // hoping that these queries that exceed limit will release memory. + if (!has_changed_hard_limit) { + update_queries_limit_(wg, true); + has_changed_hard_limit = true; + LOG(INFO) << "Query: " << print_id(resource_ctx->task_controller()->task_id()) + << " reserve memory(" << PrettyPrinter::print_bytes(query_it->reserve_size_) + << ") failed due to workload group memory exceed, " + "should set the workload group work in memory insufficent mode, " + "so that other query will reduce their memory." 
+ << " Query mem limit: " + << PrettyPrinter::print_bytes(resource_ctx->memory_context()->mem_limit()) + << " mem usage: " + << PrettyPrinter::print_bytes( + resource_ctx->memory_context()->current_memory_bytes()) + << ", wg: " << wg->debug_string(); + } + if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) { + // we not encourage not enable slot memory. + // + // If not enable slot memory policy, then should spill directly + // Maybe there are another query that use too much memory, if these queries + // exceed the memory limit, they will enter the paused state + // due to `QUERY_MEMORY_EXCEEDED` and will also try to spill. + // + // TODO should kill the query that exceed limit. + return release_query_memory_(queries_list, query_it, resource_ctx, false); + } + + // Should not put the query back to task scheduler immediately, because when wg's memory not sufficient, + // and then set wg's flag, other query may not free memory very quickly. + // If the workload group's memusage is less than low watermark then dispatch the query to run. + if (query_it->elapsed_time() > config::spill_in_paused_queue_timeout_ms || + !exceed_low_watermark) { + // set wg's memory to sufficient, then add it back to task scheduler to run. + LOG(INFO) << "Query: " << print_id(resource_ctx->task_controller()->task_id()) + << " has waited in paused query queue for " << query_it->elapsed_time() + << " ms. Resume it."; + resource_ctx->task_controller()->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + return false; + } + ++query_it; + return false; +} - // even if wg has no query in the paused state, the following code will still be executed - // because `handle_paused_queries` adds a to `_paused_queries_list` at the beginning. - if (queries_list.empty()) { - it = _paused_queries_list.erase(it); - continue; - } else { - // Finished deal with one workload group, and should deal with next one. 
- ++it; - } +bool WorkloadGroupMgr::handle_process_memory_exceeded_( + const WorkloadGroupPtr& wg, PausedQuerySet& queries_list, PausedQueryIterator& query_it, + const std::shared_ptr& resource_ctx, + const RecentlyCancelledQueries& recently_cancelled) { + // PROCESS_MEMORY_EXCEEDED: a recently cancelled query anywhere may be releasing + // process-level memory, so wait globally before spilling or revoking. + if (recently_cancelled.has_query) { + ++query_it; + return false; + } + + // If workload group's memory usage > min memory, then it means the workload group use too much memory + // in memory contention state. Should just spill + if (wg->total_mem_used() > wg->min_memory_limit()) { + return release_query_memory_(queries_list, query_it, resource_ctx, true); + } + + // Other workload groups may use a lot of memory, should revoke memory from other workload groups + // by cancelling their queries. + int64_t revoked_size = revoke_memory_from_other_groups_(); + if (revoked_size > 0) { + // Revoke memory from other workload groups will cancel some queries, wait them cancel finished + // and then check it again. + revoking_memory_from_other_query_ = true; + return true; } + + // TODO revoke from memtable + + // Fallback: if we have waited too long and cannot revoke from anywhere, + // cancel the query or disable reserve memory to let it proceed. + if (query_it->elapsed_time() > config::spill_in_paused_queue_timeout_ms) { + // Cannot spill (no revocable memory), cannot revoke from other WGs, + // and process memory is still exceeded. Cancel the query to protect the system. 
+ return release_query_memory_(queries_list, query_it, resource_ctx, true); + } + ++query_it; + return false; +} + +bool WorkloadGroupMgr::release_query_memory_(PausedQuerySet& queries_list, + PausedQueryIterator& query_it, + const std::shared_ptr& resource_ctx, + bool stop_after_release) { + bool spill_res = + handle_single_query_(resource_ctx, query_it->reserve_size_, query_it->elapsed_time(), + resource_ctx->task_controller()->paused_reason()); + if (!spill_res) { + ++query_it; + return false; + } + + VLOG_DEBUG << "Query: " << print_id(resource_ctx->task_controller()->task_id()) + << " remove from paused list"; + query_it = queries_list.erase(query_it); + if (resource_ctx->task_controller()->is_cancelled()) { + // The query is cancelled, just break. And wait for the query to release the memory. + // Other query maybe not need spill. + revoking_memory_from_other_query_ = true; + return true; + } + return stop_after_release; } // Find the workload group that could revoke lot of memory: @@ -770,6 +790,15 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptrtask_controller()->set_memory_sufficient(true); return true; } else if (memory_usage <= limit) { + if (time_in_queue > config::spill_in_paused_queue_timeout_ms) { + // Timed out waiting for WG memory to drop. Cancel the query. + Status error_status = Status::MemoryLimitExceeded( + "Query {} paused due to WG memory exceeded with no revocable memory, " + "timed out after {}ms waiting for memory recovery.", + query_id, time_in_queue); + requestor->task_controller()->cancel(error_status); + return true; + } LOG(INFO) << "Query: " << query_id << " paused caused by WORKLOAD_GROUP_MEMORY_EXCEEDED, keep it paused. 
" << "Reserve size: " << PrettyPrinter::print_bytes(size_to_reserve) diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index 4db43b50db12a3..6cd63e6e5fa409 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -18,6 +18,8 @@ #include +#include +#include #include #include @@ -96,13 +98,138 @@ class WorkloadGroupMgr { friend class ExecEnv; private: + using PausedQuerySet = std::set; + using PausedQueryIterator = PausedQuerySet::iterator; + + struct RecentlyCancelledQueries { + bool has_query = false; + std::set workload_groups; + }; + Status create_internal_wg(); WorkloadGroupPtr get_or_create_workload_group(const WorkloadGroupInfo& workload_group_info); + // ==================== handle_paused_queries() sub-routines ==================== + // + // handle_paused_queries() is the top-level entry called periodically. It runs + // three phases in order: + // Phase 1 (cleanup): cleanup_paused_queries_() + // Phase 2 (revoke): resume_paused_queries_after_revoke_() + // Phase 3 (process): process_paused_queries_() + // + // All Phase 1–3 helpers below are called with _paused_queries_lock held. + + // Phase 1 — Scan every paused query and build a summary of recently cancelled + // queries. Queries cancelled longer than `wait_cancel_release_memory_ms` ago are + // erased from the list. Empty per-WG entries are pruned. + // Returns a RecentlyCancelledQueries struct recording: + // - has_query: true if any recently cancelled query exists (process-level signal) + // - workload_groups: which WGs contain a recently cancelled query (WG-level signal) + RecentlyCancelledQueries cleanup_paused_queries_(); + + // Phase 2 — If a previous round cancelled a query and set + // `revoking_memory_from_other_query_`, check whether the cancelled query has + // finished releasing memory. 
If still releasing → return true (caller should + // exit early). If done → resume all remaining paused queries, reset the flag, + // and return true (caller should exit, work is complete for this round). + // Returns false when the revoking flag was not set, meaning Phase 3 should run. + bool resume_paused_queries_after_revoke_(const RecentlyCancelledQueries& recently_cancelled); + + // Helper for Phase 2 — Resume every non-cancelled paused query by calling + // set_memory_sufficient(true), then erase all entries from `_paused_queries_list`. + // Cancelled queries are simply erased without resuming. + void resume_all_paused_queries_(); + + // Phase 3 — Iterate over each workload group's paused queries and attempt to + // resolve them (spill, cancel, resume, or keep waiting). Delegates per-WG work + // to `process_workload_group_paused_queries_()`. If any per-WG handler signals + // early termination (returns true), this function returns immediately. + void process_paused_queries_(const RecentlyCancelledQueries& recently_cancelled); + + // Process all paused queries belonging to a single workload group. For each + // non-cancelled query, dispatches to the appropriate handler based on the + // paused reason (QUERY_MEMORY_EXCEEDED / WORKLOAD_GROUP_MEMORY_EXCEEDED / + // PROCESS_MEMORY_EXCEEDED). + // Returns true if the caller should stop processing further workload groups + // (e.g., a query was cancelled and we need to wait for memory release). + bool process_workload_group_paused_queries_(const WorkloadGroupPtr& wg, + PausedQuerySet& queries_list, + const RecentlyCancelledQueries& recently_cancelled); + + // Try to lock the weak_ptr in the PausedQuery entry. If the query context has + // already been destroyed (query finished while paused), erase the entry and + // advance the iterator. Returns nullptr in that case; otherwise returns the + // locked shared_ptr. 
+ std::shared_ptr get_resource_ctx_or_erase_(PausedQuerySet& queries_list, + PausedQueryIterator& query_it); + + // Handle a query paused due to QUERY_MEMORY_EXCEEDED. Delegates to + // release_query_memory_() with stop_after_release=false, meaning other queries + // in the same WG will continue to be processed even after this one is resolved. + // Returns true if the caller should stop processing (query was cancelled). + bool handle_query_memory_exceeded_(PausedQuerySet& queries_list, PausedQueryIterator& query_it, + const std::shared_ptr& resource_ctx); + + // Handle a query paused due to WORKLOAD_GROUP_MEMORY_EXCEEDED. The logic is: + // 1. If a recently cancelled query exists in the SAME WG, skip (wait for it + // to release WG-level memory). + // 2. If adjusted_mem_limit < current_usage + reserve_size, lower the query's + // hard limit to adjusted_mem_limit and resume it — the query will likely + // re-pause with QUERY_MEMORY_EXCEEDED and spill then. + // 3. Otherwise, force-update all queries in the WG to use adjusted limits + // (once per WG via has_changed_hard_limit), hoping over-limit queries + // will voluntarily release memory. + // 4. If slot_memory_policy is NONE, spill directly via release_query_memory_. + // 5. If slot_memory_policy is active, wait until timeout or low watermark + // recovery before resuming. + // Returns true if the caller should stop processing further queries/WGs. + bool handle_workload_group_memory_exceeded_( + const WorkloadGroupPtr& wg, PausedQuerySet& queries_list, PausedQueryIterator& query_it, + const std::shared_ptr& resource_ctx, bool& has_changed_hard_limit, + bool exceed_low_watermark, const RecentlyCancelledQueries& recently_cancelled); + + // Handle a query paused due to PROCESS_MEMORY_EXCEEDED. The logic is: + // 1. If any recently cancelled query exists globally, skip (wait for + // process-level memory release). + // 2. 
If the WG's usage exceeds its min_memory_limit, spill via + // release_query_memory_(stop_after_release=true) — one spill per round. + // 3. Otherwise, try to revoke memory from other overcommitted WGs by + // cancelling their largest queries. + // Returns true if the caller should stop processing further queries/WGs. + bool handle_process_memory_exceeded_(const WorkloadGroupPtr& wg, PausedQuerySet& queries_list, + PausedQueryIterator& query_it, + const std::shared_ptr<ResourceContext>& resource_ctx, + const RecentlyCancelledQueries& recently_cancelled); + + // Common helper: attempt to release memory for a single paused query by calling + // handle_single_query_() (which triggers spill or cancel). On success, erases the + // query from the paused list. If the query was cancelled (not spilled), sets + // revoking_memory_from_other_query_ and returns true to signal early termination. + // `stop_after_release`: if true, return true even on successful spill (used by + // PROCESS_MEMORY_EXCEEDED to limit one spill per round); if false, return false + // to let the caller continue processing remaining queries. + bool release_query_memory_(PausedQuerySet& queries_list, PausedQueryIterator& query_it, + const std::shared_ptr<ResourceContext>& resource_ctx, + bool stop_after_release); + + // Attempt to resolve a single paused query: if revocable memory exists, trigger + // spill; if under limit, resume; if no memory can be freed, cancel the query or + // disable reserve memory and resume. Returns true if the query was acted upon + // (spilled/cancelled/resumed), false if it should keep waiting (e.g., still has + // running tasks). bool handle_single_query_(const std::shared_ptr<ResourceContext>& requestor, size_t size_to_reserve, int64_t time_in_queue, Status paused_reason); + + // Find the most overcommitted workload group (usage - min_memory_limit is + // largest) and cancel its biggest query to reclaim ~10% of the excess memory. + // Returns the amount of memory expected to be freed, or 0 if no WG qualifies. 
int64_t revoke_memory_from_other_groups_(); + + // Recalculate and apply per-query memory limits for all queries in a workload + // group based on the slot memory policy (NONE/FIXED/DYNAMIC). When + // `enable_hard_limit` is true, limits are tightened to the slot-weighted value + // even if the WG is below the low watermark (used during memory pressure). void update_queries_limit_(WorkloadGroupPtr wg, bool enable_hard_limit); std::shared_mutex _group_mutex; @@ -113,7 +240,7 @@ class WorkloadGroupMgr { // Save per group paused query list, it should be a global structure, not per // workload group, because we need do some coordinate work globally. std::mutex _paused_queries_lock; - std::map<WorkloadGroupPtr, std::set<PausedQuery>> _paused_queries_list; + std::map<WorkloadGroupPtr, PausedQuerySet> _paused_queries_list; // If any query is cancelled when process memory is not enough, we set this to true. // When there is not query in cancel state, this var is set to false. bool revoking_memory_from_other_query_ = false; diff --git a/be/test/runtime/workload_group/workload_group_manager_test.cpp b/be/test/runtime/workload_group/workload_group_manager_test.cpp index 45afdd95e6604f..0c1c06e086e213 100644 --- a/be/test/runtime/workload_group/workload_group_manager_test.cpp +++ b/be/test/runtime/workload_group/workload_group_manager_test.cpp @@ -155,6 +155,15 @@ TEST_F(WorkloadGroupManagerTest, get_or_create_workload_group) { ASSERT_EQ(wg->id(), 0); } +TEST_F(WorkloadGroupManagerTest, handle_paused_queries_ignores_empty_workload_group) { + auto wg = _wg_manager->get_or_create_workload_group({}); + + _wg_manager->handle_paused_queries(); + + std::unique_lock lock(_wg_manager->_paused_queries_lock); + ASSERT_FALSE(_wg_manager->_paused_queries_list.contains(wg)); +} + // Query is paused due to query memlimit exceed, after waiting in queue for spill_in_paused_queue_timeout_ms // it should be resumed TEST_F(WorkloadGroupManagerTest, query_exceed) {