Skip to content
Open
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ DEFINE_Int32(task_executor_initial_max_concurrency_per_task, "-1");
// Enable task executor in internal table scan.
DEFINE_Bool(enable_task_executor_in_internal_table, "true");
// Enable task executor in external table scan.
DEFINE_Bool(enable_task_executor_in_external_table, "true");
DEFINE_Bool(enable_task_executor_in_external_table, "false");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flips the default external-table scan scheduler from TaskExecutorSimplifiedScanScheduler to ThreadPoolSimplifiedScanScheduler for every workload group (be/src/runtime/workload_group/workload_group.cpp, lines 580-588), but none of the scanner/counter fixes in this PR depend on that behavior anymore. Merging this silently disables the task-executor path for all external scans without any justification or coverage in the PR.

Suggested change
DEFINE_Bool(enable_task_executor_in_external_table, "false");
DEFINE_Bool(enable_task_executor_in_external_table, "true");


// number of scanner thread pool size for olap table
// and the min thread num of remote scanner thread pool
Expand Down
1 change: 1 addition & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ void memory_gc() {
}

void Daemon::memory_maintenance_thread() {
doris::enable_profile_counter_check = 0;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disabling the check only on memory_maintenance_thread() does not cover the other path that refreshes the same process-memory counters: /profile calls ProcessProfile::refresh_profile() from an HTTP worker thread (be/src/service/http/default_path_handlers.cpp, line 192), which reaches MemoryProfile::refresh_memory_overview_profile(). That code sets UntrackedMemory = VmRSS - all_tracked_mem_sum, and there is no invariant that sampled RSS is always greater than or equal to tracked bytes. In debug builds the new HighWaterMarkCounter::set() DCHECK will still fire on that path, so this line only hides the crash on one thread instead of fixing the generic issue.

while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::memory_maintenance_sleep_time_ms))) {
// step 1. Refresh process memory metrics.
Expand Down
21 changes: 8 additions & 13 deletions be/src/exec/scan/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,6 @@ class Scanner {
Status _do_projections(Block* origin_block, Block* output_block);

private:
// Call start_wait_worker_timer() when submit the scanner to the thread pool.
// And call update_wait_worker_timer() when it is actually being executed.
void _start_wait_worker_timer() {
_watch.reset();
_watch.start();
}

void _start_scan_cpu_timer() {
_cpu_watch.reset();
_cpu_watch.start();
Expand All @@ -144,19 +137,21 @@ class Scanner {
void _update_scan_cpu_timer();

public:
// Call start_wait_worker_timer() when submit the scanner to the thread pool.
// And call update_wait_worker_timer() when it is actually being executed.
void start_wait_worker_timer() {
_watch.reset();
_watch.start();
}

void resume() {
_update_wait_worker_timer();
_start_scan_cpu_timer();
}
void pause() {
_update_scan_cpu_timer();
_start_wait_worker_timer();
start_wait_worker_timer();
}
// Called when submitting the scanner to the thread pool queue.
// Only starts the wait timer without touching the CPU timer, because the CPU
// timer uses CLOCK_THREAD_CPUTIME_ID which must be read on the same thread
// that started it.
void start_queue_wait() { _start_wait_worker_timer(); }
int64_t get_time_cost_ns() const { return _per_scanner_timer; }

int64_t projection_time() const { return _projection_timer; }
Expand Down
13 changes: 6 additions & 7 deletions be/src/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
}

scan_task->set_state(ScanTask::State::IN_FLIGHT);
scanner_delegate->_scanner->start_queue_wait();
// Only starts the wait timer without touching the CPU timer, because the CPU
// timer uses CLOCK_THREAD_CPUTIME_ID which must be read on the same thread
// that started it.
scanner_delegate->_scanner->start_wait_worker_timer();
TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
auto sumbit_task = [&]() {
auto work_func = [scanner_ref = scan_task, ctx]() {
Expand Down Expand Up @@ -164,13 +167,9 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
max_run_time_watch.start();
scanner->resume();

bool need_update_profile = true;
auto update_scanner_profile = [&]() {
if (need_update_profile) {
scanner->pause();
scanner->update_realtime_counters();
need_update_profile = false;
}
scanner->pause();
scanner->update_realtime_counters();
};

Status status = Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/sort/heap_sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class HeapSorter final : public Sorter {

private:
void _do_filter(MergeSortCursorImpl& block, size_t num_rows);
size_t _data_size = 0;
int64_t _data_size = 0;
size_t _heap_size = 0;
size_t _queue_row_num = 0;
MergeSorterQueue _queue;
Expand Down
6 changes: 3 additions & 3 deletions be/src/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ class OrcReader : public GenericReader {
bool _enable_filter_by_min_max = true;

std::vector<DecimalScaleParams> _decimal_scale_params;
size_t _decimal_scale_params_index;
size_t _decimal_scale_params_index = 0;

bool _is_acid = false;
std::unique_ptr<IColumn::Filter> _filter;
Expand Down Expand Up @@ -926,8 +926,8 @@ class ORCFileInputStream : public orc::InputStream, public ProfileCollector {
io::FileReaderSPtr _tracing_file_reader;

bool _is_all_tiny_stripes = false;
int64_t _orc_once_max_read_bytes;
int64_t _orc_max_merge_distance_bytes;
int64_t _orc_once_max_read_bytes = 0;
int64_t _orc_max_merge_distance_bytes = 0;

std::vector<std::shared_ptr<StripeStreamInputStream>> _stripe_streams;

Expand Down
2 changes: 1 addition & 1 deletion be/src/format/parquet/vparquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ class ParquetReader : public GenericReader {
RowGroupReader::RowGroupIndex _current_row_group_index {-1, 0, 0};
// read to the end of current reader
bool _row_group_eof = true;
size_t _total_groups; // num of groups(stripes) of a parquet(orc) file
size_t _total_groups = 0; // num of groups(stripes) of a parquet(orc) file

std::shared_ptr<ConditionCacheContext> _condition_cache_ctx;

Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/cache/result_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class PartitionRowBatch {
private:
int64_t _partition_key;
PCacheValue* _cache_value = nullptr;
size_t _data_size;
int64_t _data_size;
CacheStat _cache_stat;
};

Expand Down Expand Up @@ -175,7 +175,7 @@ class ResultNode {
UniqueId _sql_key;
ResultNode* _prev = nullptr;
ResultNode* _next = nullptr;
size_t _data_size;
int64_t _data_size;
PartitionRowBatchList _partition_list;
PartitionRowBatchMap _partition_map;
};
Expand Down
14 changes: 11 additions & 3 deletions be/src/runtime/memory/memory_reclamation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "runtime/memory/memory_reclamation.h"

#include <algorithm>
#include <unordered_map>

#include "runtime/exec_env.h"
Expand Down Expand Up @@ -117,8 +118,12 @@ int64_t MemoryReclamation::revoke_tasks_memory(
} else {
keep_wait_cancelling_tasks.push_back(
resource_ctx->task_controller()->debug_string());
COUNTER_UPDATE(freed_memory_counter,
resource_ctx->memory_context()->current_memory_bytes());
// Memory tracker consumption can be slightly negative due to
// concurrent batched tracking; clamp to 0 for freed memory accounting.
COUNTER_UPDATE(
freed_memory_counter,
std::max(int64_t(0),
resource_ctx->memory_context()->current_memory_bytes()));
}
is_filtered = true;
}
Expand Down Expand Up @@ -157,8 +162,11 @@ int64_t MemoryReclamation::revoke_tasks_memory(
if (ActionFuncImpl[action](resource_ctx.get(),
Status::MemoryLimitExceeded(task_revoke_reason))) {
this_time_revoked_tasks.push_back(resource_ctx->task_controller()->debug_string());
// Memory tracker consumption can be slightly negative due to
// concurrent batched tracking; clamp to 0 for freed memory accounting.
COUNTER_UPDATE(freed_memory_counter,
resource_ctx->memory_context()->current_memory_bytes());
std::max(int64_t(0),
resource_ctx->memory_context()->current_memory_bytes()));
COUNTER_UPDATE(this_time_revoked_tasks_counter, 1);
if (freed_memory_counter->value() > need_free_mem) {
break;
Expand Down
85 changes: 82 additions & 3 deletions be/src/runtime/runtime_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace doris {
class TRuntimeProfileNode;
class TRuntimeProfileTree;
class RuntimeProfileCounterTreeNode;

inline thread_local bool enable_profile_counter_check = true;
// Some macro magic to generate unique ids using __COUNTER__
#define CONCAT_IMPL(x, y) x##y
#define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y)
Expand Down Expand Up @@ -189,15 +189,65 @@ class RuntimeProfile {

virtual Counter* clone() const { return new Counter(type(), value(), _level); }

virtual void update(int64_t delta) { _value.fetch_add(delta, std::memory_order_relaxed); }
virtual void update(int64_t delta) {
#ifndef NDEBUG
int64_t prev_value = _value.load(std::memory_order_seq_cst);
// Using memory_order_seq_cst to make sure no concurrency issues, it may affect
// performance. So that only in debug mode, we check the counter value.
_value.fetch_add(delta, std::memory_order_seq_cst);
#else
_value.fetch_add(delta, std::memory_order_relaxed);
#endif
#ifndef NDEBUG
(void)prev_value;
#if !defined(BE_TEST)
if (enable_profile_counter_check) {
if (delta < 0) {
DCHECK_GT(_value.load(std::memory_order_seq_cst), -1L)
<< " delta: " << delta << " prev_value: " << prev_value;
}
}
#endif
#endif
}

void bit_or(int64_t delta) { _value.fetch_or(delta, std::memory_order_relaxed); }

virtual void set(int64_t value) { _value.store(value, std::memory_order_relaxed); }
virtual void set(int64_t value) {
#ifndef NDEBUG
int64_t prev_value = _value.load(std::memory_order_seq_cst);
_value.store(value, std::memory_order_seq_cst);
#else
_value.store(value, std::memory_order_relaxed);
#endif
#ifndef NDEBUG
(void)prev_value;
#if !defined(BE_TEST)
if (enable_profile_counter_check) {
DCHECK_GT(_value.load(std::memory_order_seq_cst), -1L)
<< " new value: " << value << " prev_value: " << prev_value;
}
#endif
#endif
}

virtual void set(double value) {
DCHECK_EQ(sizeof(value), sizeof(int64_t));
#ifndef NDEBUG
int64_t prev_value = _value.load(std::memory_order_seq_cst);
_value.store(binary_cast<double, int64_t>(value), std::memory_order_seq_cst);
#else
_value.store(binary_cast<double, int64_t>(value), std::memory_order_relaxed);
#endif
#ifndef NDEBUG
(void)prev_value;
#if !defined(BE_TEST)
if (enable_profile_counter_check) {
DCHECK_GT(_value.load(std::memory_order_seq_cst), -1L)
<< " new value: " << value << " prev_value: " << prev_value;
}
#endif
#endif
}

virtual int64_t value() const { return _value.load(std::memory_order_relaxed); }
Expand Down Expand Up @@ -265,10 +315,20 @@ class RuntimeProfile {
}

void add(int64_t delta) {
#ifndef NDEBUG
current_value_.fetch_add(delta, std::memory_order_seq_cst);
if (delta > 0) {
UpdateMax(current_value_);
}
//if (enable_profile_counter_check) {
// DCHECK_GT(current_value_.load(std::memory_order_seq_cst), -1L);
//}
#else
current_value_.fetch_add(delta, std::memory_order_relaxed);
if (delta > 0) {
UpdateMax(current_value_);
}
#endif
}
virtual void update(int64_t delta) override { add(delta); }

Expand Down Expand Up @@ -328,8 +388,27 @@ class RuntimeProfile {
}

void set(int64_t v) override {
#ifndef NDEBUG
int64_t prev_value = current_value_.load(std::memory_order_seq_cst);
int64_t prev_max_value = _value.load(std::memory_order_seq_cst);
current_value_.store(v, std::memory_order_seq_cst);
#else
current_value_.store(v, std::memory_order_relaxed);
#endif
UpdateMax(v);
#ifndef NDEBUG
(void)prev_value;
(void)prev_max_value;
#if !defined(BE_TEST)

if (enable_profile_counter_check) {
DCHECK_GT(current_value_.load(std::memory_order_seq_cst), -1L)
<< " prev_value: " << prev_value;
DCHECK_GT(_value.load(std::memory_order_seq_cst), -1L)
<< " prev_max_value: " << prev_max_value << " prev_value: " << prev_value;
}
#endif
#endif
}

int64_t current_value() const { return current_value_.load(std::memory_order_relaxed); }
Expand Down
Loading
Loading