Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ void memory_gc() {
}

void Daemon::memory_maintenance_thread() {
doris::enable_profile_counter_check = 0;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disabling the check only on memory_maintenance_thread() does not cover the other path that refreshes the same process-memory counters: /profile calls ProcessProfile::refresh_profile() from an HTTP worker thread (be/src/service/http/default_path_handlers.cpp, line 192), which reaches MemoryProfile::refresh_memory_overview_profile(). That code sets UntrackedMemory = VmRSS - all_tracked_mem_sum, and there is no invariant that sampled RSS is always greater than or equal to tracked bytes. In debug builds the new HighWaterMarkCounter::set() DCHECK will still fire on that path, so this line only hides the crash on one thread instead of fixing the generic issue.

while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::memory_maintenance_sleep_time_ms))) {
// step 1. Refresh process memory metrics.
Expand Down
21 changes: 8 additions & 13 deletions be/src/exec/scan/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,6 @@ class Scanner {
Status _do_projections(Block* origin_block, Block* output_block);

private:
// Call start_wait_worker_timer() when submit the scanner to the thread pool.
// And call update_wait_worker_timer() when it is actually being executed.
void _start_wait_worker_timer() {
_watch.reset();
_watch.start();
}

void _start_scan_cpu_timer() {
_cpu_watch.reset();
_cpu_watch.start();
Expand All @@ -144,19 +137,21 @@ class Scanner {
void _update_scan_cpu_timer();

public:
// Call start_wait_worker_timer() when submit the scanner to the thread pool.
// And call update_wait_worker_timer() when it is actually being executed.
void start_wait_worker_timer() {
_watch.reset();
_watch.start();
}

void resume() {
_update_wait_worker_timer();
_start_scan_cpu_timer();
}
void pause() {
_update_scan_cpu_timer();
_start_wait_worker_timer();
start_wait_worker_timer();
}
// Called when submitting the scanner to the thread pool queue.
// Only starts the wait timer without touching the CPU timer, because the CPU
// timer uses CLOCK_THREAD_CPUTIME_ID which must be read on the same thread
// that started it.
void start_queue_wait() { _start_wait_worker_timer(); }
int64_t get_time_cost_ns() const { return _per_scanner_timer; }

int64_t projection_time() const { return _projection_timer; }
Expand Down
13 changes: 6 additions & 7 deletions be/src/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
}

scan_task->set_state(ScanTask::State::IN_FLIGHT);
scanner_delegate->_scanner->start_queue_wait();
// Only starts the wait timer without touching the CPU timer, because the CPU
// timer uses CLOCK_THREAD_CPUTIME_ID which must be read on the same thread
// that started it.
scanner_delegate->_scanner->start_wait_worker_timer();
TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
auto sumbit_task = [&]() {
auto work_func = [scanner_ref = scan_task, ctx]() {
Expand Down Expand Up @@ -164,13 +167,9 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
max_run_time_watch.start();
scanner->resume();

bool need_update_profile = true;
auto update_scanner_profile = [&]() {
if (need_update_profile) {
scanner->pause();
scanner->update_realtime_counters();
need_update_profile = false;
}
scanner->pause();
scanner->update_realtime_counters();
};

Status status = Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/sort/heap_sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class HeapSorter final : public Sorter {

private:
void _do_filter(MergeSortCursorImpl& block, size_t num_rows);
size_t _data_size = 0;
int64_t _data_size = 0;
size_t _heap_size = 0;
size_t _queue_row_num = 0;
MergeSorterQueue _queue;
Expand Down
6 changes: 3 additions & 3 deletions be/src/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ class OrcReader : public GenericReader {
bool _enable_filter_by_min_max = true;

std::vector<DecimalScaleParams> _decimal_scale_params;
size_t _decimal_scale_params_index;
size_t _decimal_scale_params_index = 0;

bool _is_acid = false;
std::unique_ptr<IColumn::Filter> _filter;
Expand Down Expand Up @@ -926,8 +926,8 @@ class ORCFileInputStream : public orc::InputStream, public ProfileCollector {
io::FileReaderSPtr _tracing_file_reader;

bool _is_all_tiny_stripes = false;
int64_t _orc_once_max_read_bytes;
int64_t _orc_max_merge_distance_bytes;
int64_t _orc_once_max_read_bytes = 0;
int64_t _orc_max_merge_distance_bytes = 0;

std::vector<std::shared_ptr<StripeStreamInputStream>> _stripe_streams;

Expand Down
2 changes: 1 addition & 1 deletion be/src/format/parquet/vparquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ class ParquetReader : public GenericReader {
RowGroupReader::RowGroupIndex _current_row_group_index {-1, 0, 0};
// read to the end of current reader
bool _row_group_eof = true;
size_t _total_groups; // num of groups(stripes) of a parquet(orc) file
size_t _total_groups = 0; // num of groups(stripes) of a parquet(orc) file

std::shared_ptr<ConditionCacheContext> _condition_cache_ctx;

Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/cache/result_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class PartitionRowBatch {
private:
int64_t _partition_key;
PCacheValue* _cache_value = nullptr;
size_t _data_size;
int64_t _data_size;
CacheStat _cache_stat;
};

Expand Down Expand Up @@ -175,7 +175,7 @@ class ResultNode {
UniqueId _sql_key;
ResultNode* _prev = nullptr;
ResultNode* _next = nullptr;
size_t _data_size;
int64_t _data_size;
PartitionRowBatchList _partition_list;
PartitionRowBatchMap _partition_map;
};
Expand Down
14 changes: 11 additions & 3 deletions be/src/runtime/memory/memory_reclamation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "runtime/memory/memory_reclamation.h"

#include <algorithm>
#include <unordered_map>

#include "runtime/exec_env.h"
Expand Down Expand Up @@ -117,8 +118,12 @@ int64_t MemoryReclamation::revoke_tasks_memory(
} else {
keep_wait_cancelling_tasks.push_back(
resource_ctx->task_controller()->debug_string());
COUNTER_UPDATE(freed_memory_counter,
resource_ctx->memory_context()->current_memory_bytes());
// Memory tracker consumption can be slightly negative due to
// concurrent batched tracking; clamp to 0 for freed memory accounting.
COUNTER_UPDATE(
freed_memory_counter,
std::max(int64_t(0),
resource_ctx->memory_context()->current_memory_bytes()));
}
is_filtered = true;
}
Expand Down Expand Up @@ -157,8 +162,11 @@ int64_t MemoryReclamation::revoke_tasks_memory(
if (ActionFuncImpl[action](resource_ctx.get(),
Status::MemoryLimitExceeded(task_revoke_reason))) {
this_time_revoked_tasks.push_back(resource_ctx->task_controller()->debug_string());
// Memory tracker consumption can be slightly negative due to
// concurrent batched tracking; clamp to 0 for freed memory accounting.
COUNTER_UPDATE(freed_memory_counter,
resource_ctx->memory_context()->current_memory_bytes());
std::max(int64_t(0),
resource_ctx->memory_context()->current_memory_bytes()));
COUNTER_UPDATE(this_time_revoked_tasks_counter, 1);
if (freed_memory_counter->value() > need_free_mem) {
break;
Expand Down
85 changes: 82 additions & 3 deletions be/src/runtime/runtime_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace doris {
class TRuntimeProfileNode;
class TRuntimeProfileTree;
class RuntimeProfileCounterTreeNode;

inline thread_local bool enable_profile_counter_check = true;
// Some macro magic to generate unique ids using __COUNTER__
#define CONCAT_IMPL(x, y) x##y
#define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y)
Expand Down Expand Up @@ -189,15 +189,65 @@ class RuntimeProfile {

virtual Counter* clone() const { return new Counter(type(), value(), _level); }

virtual void update(int64_t delta) { _value.fetch_add(delta, std::memory_order_relaxed); }
virtual void update(int64_t delta) {
#ifndef NDEBUG
int64_t prev_value = _value.load(std::memory_order_seq_cst);
// Using memory_order_seq_cst to make sure no concurrency issues, it may affect
// performance. So that only in debug mode, we check the counter value.
_value.fetch_add(delta, std::memory_order_seq_cst);
#else
_value.fetch_add(delta, std::memory_order_relaxed);
#endif
#ifndef NDEBUG
(void)prev_value;
#if !defined(BE_TEST)
if (enable_profile_counter_check) {
if (delta < 0) {
DCHECK_GT(_value.load(std::memory_order_seq_cst), -1L)
<< " delta: " << delta << " prev_value: " << prev_value;
}
}
#endif
#endif
}

void bit_or(int64_t delta) { _value.fetch_or(delta, std::memory_order_relaxed); }

virtual void set(int64_t value) { _value.store(value, std::memory_order_relaxed); }
virtual void set(int64_t value) {
#ifndef NDEBUG
int64_t prev_value = _value.load(std::memory_order_seq_cst);
_value.store(value, std::memory_order_seq_cst);
#else
_value.store(value, std::memory_order_relaxed);
#endif
#ifndef NDEBUG
(void)prev_value;
#if !defined(BE_TEST)
if (enable_profile_counter_check) {
DCHECK_GT(_value.load(std::memory_order_seq_cst), -1L)
<< " new value: " << value << " prev_value: " << prev_value;
}
#endif
#endif
}

virtual void set(double value) {
DCHECK_EQ(sizeof(value), sizeof(int64_t));
#ifndef NDEBUG
int64_t prev_value = _value.load(std::memory_order_seq_cst);
_value.store(binary_cast<double, int64_t>(value), std::memory_order_seq_cst);
#else
_value.store(binary_cast<double, int64_t>(value), std::memory_order_relaxed);
#endif
#ifndef NDEBUG
(void)prev_value;
#if !defined(BE_TEST)
if (enable_profile_counter_check) {
DCHECK_GT(_value.load(std::memory_order_seq_cst), -1L)
<< " new value: " << value << " prev_value: " << prev_value;
}
#endif
#endif
}

virtual int64_t value() const { return _value.load(std::memory_order_relaxed); }
Expand Down Expand Up @@ -265,10 +315,20 @@ class RuntimeProfile {
}

void add(int64_t delta) {
#ifndef NDEBUG
current_value_.fetch_add(delta, std::memory_order_seq_cst);
if (delta > 0) {
UpdateMax(current_value_);
}
//if (enable_profile_counter_check) {
// DCHECK_GT(current_value_.load(std::memory_order_seq_cst), -1L);
//}
#else
current_value_.fetch_add(delta, std::memory_order_relaxed);
if (delta > 0) {
UpdateMax(current_value_);
}
#endif
}
virtual void update(int64_t delta) override { add(delta); }

Expand Down Expand Up @@ -328,8 +388,27 @@ class RuntimeProfile {
}

void set(int64_t v) override {
#ifndef NDEBUG
int64_t prev_value = current_value_.load(std::memory_order_seq_cst);
int64_t prev_max_value = _value.load(std::memory_order_seq_cst);
current_value_.store(v, std::memory_order_seq_cst);
#else
current_value_.store(v, std::memory_order_relaxed);
#endif
UpdateMax(v);
#ifndef NDEBUG
(void)prev_value;
(void)prev_max_value;
#if !defined(BE_TEST)

if (enable_profile_counter_check) {
DCHECK_GT(current_value_.load(std::memory_order_seq_cst), -1L)
<< " prev_value: " << prev_value;
DCHECK_GT(_value.load(std::memory_order_seq_cst), -1L)
<< " prev_max_value: " << prev_max_value << " prev_value: " << prev_value;
}
#endif
#endif
}

int64_t current_value() const { return current_value_.load(std::memory_order_relaxed); }
Expand Down
Loading
Loading