Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/exec/operator/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ void PipelineXLocalStateBase::reached_limit(Block* block, bool* eos) {

if (auto rows = block->rows()) {
_num_rows_returned += rows;
_state->get_query_ctx()->resource_ctx()->io_context()->update_process_rows(rows);
}
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/workload_management/io_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class IOContext : public std::enable_shared_from_this<IOContext> {
// number rows returned by query.
// only set once by result sink when closing.
RuntimeProfile::Counter* returned_rows_counter_;
RuntimeProfile::Counter* process_rows_counter_;
RuntimeProfile::Counter* shuffle_send_bytes_counter_;
RuntimeProfile::Counter* shuffle_send_rows_counter_;

Expand All @@ -62,6 +63,7 @@ class IOContext : public std::enable_shared_from_this<IOContext> {
bytes_write_into_cache_counter_ =
ADD_COUNTER(profile_, "BytesWriteIntoCache", TUnit::BYTES);
returned_rows_counter_ = ADD_COUNTER(profile_, "ReturnedRows", TUnit::UNIT);
process_rows_counter_ = ADD_COUNTER(profile_, "ProcessRows", TUnit::UNIT);
shuffle_send_bytes_counter_ = ADD_COUNTER(profile_, "ShuffleSendBytes", TUnit::BYTES);
shuffle_send_rows_counter_ =
ADD_COUNTER(profile_, "ShuffleSendRowsCounter_", TUnit::UNIT);
Expand Down Expand Up @@ -93,6 +95,7 @@ class IOContext : public std::enable_shared_from_this<IOContext> {
return stats_.bytes_write_into_cache_counter_->value();
}
int64_t returned_rows() const { return stats_.returned_rows_counter_->value(); }
int64_t process_rows() const { return stats_.process_rows_counter_->value(); }
int64_t shuffle_send_bytes() const { return stats_.shuffle_send_bytes_counter_->value(); }
int64_t shuffle_send_rows() const { return stats_.shuffle_send_rows_counter_->value(); }

Expand All @@ -116,6 +119,7 @@ class IOContext : public std::enable_shared_from_this<IOContext> {
stats_.bytes_write_into_cache_counter_->update(delta);
}
void update_returned_rows(int64_t delta) const { stats_.returned_rows_counter_->update(delta); }
void update_process_rows(int64_t delta) const { stats_.process_rows_counter_->update(delta); }
void update_shuffle_send_bytes(int64_t delta) const {
stats_.shuffle_send_bytes_counter_->update(delta);
}
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/workload_management/resource_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ void ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) c
statistics->__set_scan_bytes(io_context()->scan_bytes());
statistics->__set_cpu_ms(cpu_context()->cpu_cost_ms() / NANOS_PER_MILLIS);
statistics->__set_returned_rows(io_context()->returned_rows());
statistics->__set_process_rows(io_context()->process_rows());
statistics->__set_max_peak_memory_bytes(memory_context()->max_peak_memory_bytes());
statistics->__set_current_used_memory_bytes(memory_context()->current_memory_bytes());
statistics->__set_shuffle_send_bytes(io_context()->shuffle_send_bytes());
Expand Down

This file was deleted.

This file was deleted.

Loading
Loading