diff --git a/be/src/exec/operator/operator.cpp b/be/src/exec/operator/operator.cpp index bb6c5e3bf8bc74..b1c22b3bbe131e 100644 --- a/be/src/exec/operator/operator.cpp +++ b/be/src/exec/operator/operator.cpp @@ -440,6 +440,7 @@ void PipelineXLocalStateBase::reached_limit(Block* block, bool* eos) { if (auto rows = block->rows()) { _num_rows_returned += rows; + _state->get_query_ctx()->resource_ctx()->io_context()->update_process_rows(rows); } } diff --git a/be/src/runtime/workload_management/io_context.h b/be/src/runtime/workload_management/io_context.h index 4b566a14e99833..30db32bcd1eb7a 100644 --- a/be/src/runtime/workload_management/io_context.h +++ b/be/src/runtime/workload_management/io_context.h @@ -44,6 +44,7 @@ class IOContext : public std::enable_shared_from_this { // number rows returned by query. // only set once by result sink when closing. RuntimeProfile::Counter* returned_rows_counter_; + RuntimeProfile::Counter* process_rows_counter_; RuntimeProfile::Counter* shuffle_send_bytes_counter_; RuntimeProfile::Counter* shuffle_send_rows_counter_; @@ -62,6 +63,7 @@ class IOContext : public std::enable_shared_from_this { bytes_write_into_cache_counter_ = ADD_COUNTER(profile_, "BytesWriteIntoCache", TUnit::BYTES); returned_rows_counter_ = ADD_COUNTER(profile_, "ReturnedRows", TUnit::UNIT); + process_rows_counter_ = ADD_COUNTER(profile_, "ProcessRows", TUnit::UNIT); shuffle_send_bytes_counter_ = ADD_COUNTER(profile_, "ShuffleSendBytes", TUnit::BYTES); shuffle_send_rows_counter_ = ADD_COUNTER(profile_, "ShuffleSendRowsCounter_", TUnit::UNIT); @@ -93,6 +95,7 @@ class IOContext : public std::enable_shared_from_this { return stats_.bytes_write_into_cache_counter_->value(); } int64_t returned_rows() const { return stats_.returned_rows_counter_->value(); } + int64_t process_rows() const { return stats_.process_rows_counter_->value(); } int64_t shuffle_send_bytes() const { return stats_.shuffle_send_bytes_counter_->value(); } int64_t shuffle_send_rows() const { return 
stats_.shuffle_send_rows_counter_->value(); } @@ -116,6 +119,7 @@ class IOContext : public std::enable_shared_from_this { stats_.bytes_write_into_cache_counter_->update(delta); } void update_returned_rows(int64_t delta) const { stats_.returned_rows_counter_->update(delta); } + void update_process_rows(int64_t delta) const { stats_.process_rows_counter_->update(delta); } void update_shuffle_send_bytes(int64_t delta) const { stats_.shuffle_send_bytes_counter_->update(delta); } diff --git a/be/src/runtime/workload_management/resource_context.cpp b/be/src/runtime/workload_management/resource_context.cpp index 12c14ccdd093d4..e6d46a7cc502ca 100644 --- a/be/src/runtime/workload_management/resource_context.cpp +++ b/be/src/runtime/workload_management/resource_context.cpp @@ -30,6 +30,7 @@ void ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) c statistics->__set_scan_bytes(io_context()->scan_bytes()); statistics->__set_cpu_ms(cpu_context()->cpu_cost_ms() / NANOS_PER_MILLIS); statistics->__set_returned_rows(io_context()->returned_rows()); + statistics->__set_process_rows(io_context()->process_rows()); statistics->__set_max_peak_memory_bytes(memory_context()->max_peak_memory_bytes()); statistics->__set_current_used_memory_bytes(memory_context()->current_memory_bytes()); statistics->__set_shuffle_send_bytes(io_context()->shuffle_send_bytes()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java deleted file mode 100644 index 27b7e673d389d5..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java +++ /dev/null @@ -1,89 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.proc; - -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.util.QueryStatisticsFormatter; -import org.apache.doris.qe.QueryStatisticsItem; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.Collection; -import java.util.Comparator; -import java.util.List; - -/* - * show proc "/current_queries/{query_id}/fragments" - * set variable "set is_report_success = true" to enable "ScanBytes" and "ProcessRows". 
- */ -public class CurrentQueryFragmentProcNode implements ProcNodeInterface { - private static final Logger LOG = LogManager.getLogger(CurrentQueryFragmentProcNode.class); - public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("FragmentId").add("InstanceId").add("Host") - .add("ScanBytes").add("ProcessRows").build(); - private QueryStatisticsItem item; - - public CurrentQueryFragmentProcNode(QueryStatisticsItem item) { - this.item = item; - } - - @Override - public ProcResult fetchResult() throws AnalysisException { - return requestFragmentExecInfos(); - } - - private ProcResult requestFragmentExecInfos() throws AnalysisException { - final CurrentQueryInfoProvider provider = new CurrentQueryInfoProvider(); - final Collection instanceStatisticsCollection - = provider.getInstanceStatistics(item); - final List> sortedRowData = Lists.newArrayList(); - for (CurrentQueryInfoProvider.InstanceStatistics instanceStatistics : - instanceStatisticsCollection) { - final List rowData = Lists.newArrayList(); - rowData.add(instanceStatistics.getFragmentId()); - rowData.add(instanceStatistics.getInstanceId().toString()); - rowData.add(instanceStatistics.getAddress().toString()); - if (item.getIsReportSucc()) { - rowData.add(QueryStatisticsFormatter.getScanBytes( - instanceStatistics.getScanBytes())); - rowData.add(QueryStatisticsFormatter.getRowsReturned( - instanceStatistics.getRowsReturned())); - } else { - rowData.add("N/A"); - rowData.add("N/A"); - } - sortedRowData.add(rowData); - } - - // sort according to explain's fragment index - sortedRowData.sort(new Comparator>() { - @Override - public int compare(List l1, List l2) { - return l1.get(0).compareTo(l2.get(0)); - } - }); - final BaseProcResult result = new BaseProcResult(); - result.setNames(TITLE_NAMES.asList()); - result.setRows(sortedRowData); - return result; - } - -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java deleted file mode 100644 index de7247ab3ab8da..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java +++ /dev/null @@ -1,200 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.proc; - -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.profile.Counter; -import org.apache.doris.common.profile.RuntimeProfile; -import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.qe.QueryStatisticsItem; -import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TUniqueId; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.Collection; -import java.util.List; -import java.util.Map; - -/** - * Provide running query's statistics. 
- */ -public class CurrentQueryInfoProvider { - private static final Logger LOG = LogManager.getLogger(CurrentQueryInfoProvider.class); - - public CurrentQueryInfoProvider() { - } - - /** - * get Counters from Coordinator's RuntimeProfile and return query's statistics. - * - * @param item - * @return - * @throws AnalysisException - */ - public QueryStatistics getQueryStatistics(QueryStatisticsItem item) throws AnalysisException { - return new QueryStatistics(item.getQueryProfile()); - } - - /** - * - * @param items - * @return - * @throws AnalysisException - */ - public Map getQueryStatistics(Collection items) { - final Map queryStatisticsMap = Maps.newHashMap(); - for (QueryStatisticsItem item : items) { - queryStatisticsMap.put(item.getQueryId(), new QueryStatistics(item.getQueryProfile())); - } - return queryStatisticsMap; - } - - /** - * Return query's instances statistics. - * - * @param item - * @return - * @throws AnalysisException - */ - public Collection getInstanceStatistics(QueryStatisticsItem item) throws AnalysisException { - final Map instanceProfiles = collectInstanceProfile(item.getQueryProfile()); - final List instanceStatisticsList = Lists.newArrayList(); - for (QueryStatisticsItem.FragmentInstanceInfo instanceInfo : item.getFragmentInstanceInfos()) { - final RuntimeProfile instanceProfile - = instanceProfiles.get(DebugUtil.printId(instanceInfo.getInstanceId())); - Preconditions.checkNotNull(instanceProfile); - final InstanceStatistics Statistics = - new InstanceStatistics( - instanceInfo.getFragmentId(), - instanceInfo.getInstanceId(), - instanceInfo.getAddress(), - instanceProfile); - instanceStatisticsList.add(Statistics); - } - return instanceStatisticsList; - } - - /** - * Profile trees is query profile -> fragment profile -> instance profile .... 
- * @param queryProfile - * @return instanceProfiles - */ - private Map collectInstanceProfile(RuntimeProfile queryProfile) { - final Map instanceProfiles = Maps.newHashMap(); - for (RuntimeProfile fragmentProfile : queryProfile.getChildMap().values()) { - for (Map.Entry entry : fragmentProfile.getChildMap().entrySet()) { - Preconditions.checkState(instanceProfiles.put( - parseInstanceId(entry.getKey()), entry.getValue()) == null); - } - } - return instanceProfiles; - } - - /** - * Instance profile key is "Instance ${instance_id} (host=$host $port)" - * @param str - * @return - */ - private String parseInstanceId(String str) { - final String[] elements = str.split(" "); - if (elements.length == 4) { - return elements[1]; - } else { - Preconditions.checkState(false); - return ""; - } - } - - public static class QueryStatistics { - final List> counterMaps; - - public QueryStatistics(RuntimeProfile profile) { - counterMaps = Lists.newArrayList(); - collectCounters(profile, counterMaps); - } - - private void collectCounters(RuntimeProfile profile, - List> counterMaps) { - for (Map.Entry entry : profile.getChildMap().entrySet()) { - counterMaps.add(entry.getValue().getCounterMap()); - collectCounters(entry.getValue(), counterMaps); - } - } - - public long getScanBytes() { - long scanBytes = 0; - for (Map counters : counterMaps) { - final Counter counter = counters.get("CompressedBytesRead"); - scanBytes += counter == null ? 0 : counter.getValue(); - } - return scanBytes; - } - - public long getRowsReturned() { - long rowsReturned = 0; - for (Map counters : counterMaps) { - final Counter counter = counters.get("RowsReturned"); - rowsReturned += counter == null ? 
0 : counter.getValue(); - } - return rowsReturned; - } - } - - public static class InstanceStatistics { - private final String fragmentId; - private final TUniqueId instanceId; - private final TNetworkAddress address; - private final QueryStatistics statistics; - - public InstanceStatistics( - String fragmentId, - TUniqueId instanceId, - TNetworkAddress address, - RuntimeProfile profile) { - this.fragmentId = fragmentId; - this.instanceId = instanceId; - this.address = address; - this.statistics = new QueryStatistics(profile); - } - - public String getFragmentId() { - return fragmentId; - } - - public TUniqueId getInstanceId() { - return instanceId; - } - - public TNetworkAddress getAddress() { - return address; - } - - public long getRowsReturned() { - return statistics.getRowsReturned(); - } - - public long getScanBytes() { - return statistics.getScanBytes(); - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQuerySqlProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQuerySqlProcDir.java deleted file mode 100644 index 2d21b656ef3ff9..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQuerySqlProcDir.java +++ /dev/null @@ -1,70 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.proc; - -import org.apache.doris.common.AnalysisException; -import org.apache.doris.qe.QueryStatisticsItem; - -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -import java.util.List; - -/* - * show proc "/current_queries/{query_id}" - */ -public class CurrentQuerySqlProcDir implements ProcDirInterface { - - public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("Sql").build(); - - private final QueryStatisticsItem item; - - public CurrentQuerySqlProcDir(QueryStatisticsItem item) { - this.item = item; - } - - @Override - public boolean register(String name, ProcNodeInterface node) { - return false; - } - - @Override - public ProcNodeInterface lookup(String name) throws AnalysisException { - if (Strings.isNullOrEmpty(name)) { - return null; - } - - if (!name.equals("fragments")) { - throw new AnalysisException(name + " doesn't exist."); - } - - return new CurrentQueryFragmentProcNode(item); - } - - @Override - public ProcResult fetchResult() throws AnalysisException { - final BaseProcResult result = new BaseProcResult(); - result.setNames(TITLE_NAMES.asList()); - final List values = Lists.newArrayList(); - values.add(item.getSql()); - result.addRow(values); - return result; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatementsProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatementsProcNode.java deleted file mode 100644 index 746726c90514b3..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatementsProcNode.java +++ /dev/null @@ -1,71 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.proc; - -import org.apache.doris.common.AnalysisException; -import org.apache.doris.qe.QeProcessorImpl; -import org.apache.doris.qe.QueryStatisticsItem; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import org.apache.commons.codec.digest.DigestUtils; - -import java.util.List; -import java.util.Map; - -/* - * show proc "/current_query_stmts" - */ -public class CurrentQueryStatementsProcNode implements ProcNodeInterface { - public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("QueryId").add("ConnectionId").add("Catalog").add("Database").add("User") - .add("ExecTime").add("SqlHash").add("Statement").build(); - - private static final int EXEC_TIME_INDEX = 5; - - @Override - public ProcResult fetchResult() throws AnalysisException { - final BaseProcResult result = new BaseProcResult(); - final Map statistic = - QeProcessorImpl.INSTANCE.getQueryStatistics(); - result.setNames(TITLE_NAMES.asList()); - final List> sortedRowData = Lists.newArrayList(); - - for (QueryStatisticsItem item : statistic.values()) { - final List values = Lists.newArrayList(); - values.add(item.getQueryId()); - values.add(item.getConnId()); - values.add(item.getCatalog()); - 
values.add(item.getDb()); - values.add(item.getUser()); - values.add(item.getQueryExecTime()); - values.add(DigestUtils.md5Hex(item.getSql())); - values.add(item.getSql()); - sortedRowData.add(values); - } - - // sort according to ExecTime - sortedRowData.sort((l1, l2) -> { - final long execTime1 = Long.parseLong(l1.get(EXEC_TIME_INDEX)); - final long execTime2 = Long.parseLong(l2.get(EXEC_TIME_INDEX)); - return Long.compare(execTime2, execTime1); - }); - result.setRows(sortedRowData); - return result; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java index 0df36c0040f953..4d06429d91b3a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java @@ -21,24 +21,32 @@ import org.apache.doris.common.util.QueryStatisticsFormatter; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.QueryStatisticsItem; +import org.apache.doris.thrift.TQueryStatistics; -import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.apache.commons.codec.digest.DigestUtils; import java.util.List; import java.util.Map; /* * show proc "/current_queries" - * only set variable "set is_report_success = true" to enable "ScanBytes" and "ProcessRows". + * the statistics is same as the data in audit log. */ public class CurrentQueryStatisticsProcDir implements ProcDirInterface { + // ProcessRows temp used for doris manager compatibility, will be implemented future. 
public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("QueryId").add("ConnectionId").add("Catalog").add("Database").add("User") - .add("ScanBytes").add("ProcessRows").add("ExecTime").build(); + .add("ExecTime").add("SqlHash").add("Statement") + .add("ScanRows").add("ScanBytes").add("ProcessRows").add("CpuMs") + .add("MaxPeakMemoryBytes").add("CurrentUsedMemoryBytes").add("WorkloadGroupId") + .add("ShuffleSendBytes").add("ShuffleSendRows") + .add("ScanBytesFromLocalStorage").add("ScanBytesFromRemoteStorage") + .add("SpillWriteBytesToLocalStorage").add("SpillReadBytesFromLocalStorage") + .add("BytesWriteIntoCache").build(); - private static final int EXEC_TIME_INDEX = 7; + private static final int EXEC_TIME_INDEX = 5; @Override public boolean register(String name, ProcNodeInterface node) { @@ -47,15 +55,7 @@ public boolean register(String name, ProcNodeInterface node) { @Override public ProcNodeInterface lookup(String name) throws AnalysisException { - if (Strings.isNullOrEmpty(name)) { - return null; - } - final Map statistic = QeProcessorImpl.INSTANCE.getQueryStatistics(); - final QueryStatisticsItem item = statistic.get(name); - if (item == null) { - throw new AnalysisException(name + " doesn't exist."); - } - return new CurrentQuerySqlProcDir(item); + throw new AnalysisException("operation doesn't support."); } @Override @@ -65,32 +65,34 @@ public ProcResult fetchResult() throws AnalysisException { QeProcessorImpl.INSTANCE.getQueryStatistics(); result.setNames(TITLE_NAMES.asList()); final List> sortedRowData = Lists.newArrayList(); - - final CurrentQueryInfoProvider provider = new CurrentQueryInfoProvider(); - final Map statisticsMap - = provider.getQueryStatistics(statistic.values()); for (QueryStatisticsItem item : statistic.values()) { final List values = Lists.newArrayList(); + final TQueryStatistics queryStatistics = item.getQueryStatistics(); values.add(item.getQueryId()); values.add(item.getConnId()); 
values.add(item.getCatalog()); values.add(item.getDb()); values.add(item.getUser()); - if (item.getIsReportSucc()) { - final CurrentQueryInfoProvider.QueryStatistics statistics - = statisticsMap.get(item.getQueryId()); - values.add(QueryStatisticsFormatter.getScanBytes( - statistics.getScanBytes())); - values.add(QueryStatisticsFormatter.getRowsReturned( - statistics.getRowsReturned())); - } else { - values.add("N/A"); - values.add("N/A"); - } values.add(item.getQueryExecTime()); + values.add(DigestUtils.md5Hex(item.getSql())); + values.add(item.getSql()); + values.add(QueryStatisticsFormatter.getRowsReturned(queryStatistics.getScanRows())); + values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getScanBytes())); + values.add(QueryStatisticsFormatter.getRowsReturned(queryStatistics.getProcessRows())); + values.add(String.valueOf(queryStatistics.getCpuMs())); + values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getMaxPeakMemoryBytes())); + values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getCurrentUsedMemoryBytes())); + values.add(String.valueOf(queryStatistics.getWorkloadGroupId())); + values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getShuffleSendBytes())); + values.add(QueryStatisticsFormatter.getRowsReturned(queryStatistics.getShuffleSendRows())); + values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getScanBytesFromLocalStorage())); + values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getScanBytesFromRemoteStorage())); + values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getSpillWriteBytesToLocalStorage())); + values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getSpillReadBytesFromLocalStorage())); + values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getBytesWriteIntoCache())); sortedRowData.add(values); } - // sort according to ExecTime + sortedRowData.sort((l1, l2) -> { final long execTime1 = Long.parseLong(l1.get(EXEC_TIME_INDEX)); 
final long execTime2 = Long.parseLong(l2.get(EXEC_TIME_INDEX)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java index 42010ccbd204ae..a1f54901bde833 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java @@ -49,7 +49,7 @@ private ProcService() { root.register("trash", new TrashProcDir()); root.register("monitor", new MonitorProcDir()); root.register("current_queries", new CurrentQueryStatisticsProcDir()); - root.register("current_query_stmts", new CurrentQueryStatementsProcNode()); + root.register("current_query_stmts", new CurrentQueryStatisticsProcDir()); root.register("current_backend_instances", new CurrentQueryBackendInstanceProcDir()); root.register("cluster_balance", new ClusterBalanceProcDir()); root.register("cluster_health", new ClusterHealthProcDir()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java index b5039bcd7f4359..624e0ca8ddfe4a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java @@ -54,11 +54,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -/** - * It is accessed by two kinds of thread, one is to create this RuntimeProfile - * , named 'query thread', the other is to call - * {@link org.apache.doris.common.proc.CurrentQueryInfoProvider}. 
- */ public class RuntimeProfile { // TODO: 这里维护性太差了 // BE 上的 OperatorXBase::init 里面有 Operator 的命名规则 diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java index 5480858efbf0e0..e619a81326e985 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java @@ -23,7 +23,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.Pair; import org.apache.doris.common.Status; -import org.apache.doris.common.proc.CurrentQueryStatementsProcNode; +import org.apache.doris.common.proc.CurrentQueryStatisticsProcDir; import org.apache.doris.common.proc.ProcResult; import org.apache.doris.common.profile.ProfileManager; import org.apache.doris.common.profile.ProfileManager.ProfileElement; @@ -450,7 +450,7 @@ private void checkAuthByUserAndQueryId(String queryId) throws AuthenticationExce } /** - * return the result of CurrentQueryStatementsProcNode. + * return the result of CurrentQueryStatisticsProcDir. * * @param request * @param response @@ -480,15 +480,15 @@ public Object currentQueries(HttpServletRequest request, HttpServletResponse res LOG.warn("parse query info error: {}", data, e); } } - List titles = Lists.newArrayList(CurrentQueryStatementsProcNode.TITLE_NAMES); + List titles = Lists.newArrayList(CurrentQueryStatisticsProcDir.TITLE_NAMES); titles.add(0, FRONTEND); return ResponseEntityBuilder.ok(new NodeAction.NodeInfo(titles, queries)); } else { try { - CurrentQueryStatementsProcNode node = new CurrentQueryStatementsProcNode(); + CurrentQueryStatisticsProcDir node = new CurrentQueryStatisticsProcDir(); ProcResult result = node.fetchResult(); // add frontend info at first column. 
- List titles = Lists.newArrayList(CurrentQueryStatementsProcNode.TITLE_NAMES); + List titles = Lists.newArrayList(CurrentQueryStatisticsProcDir.TITLE_NAMES); titles.add(0, FRONTEND); List> rows = result.getRows(); String feIp = FrontendOptions.getLocalHostAddress(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 275dc234706f4c..ff023aeb9394de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -29,6 +29,7 @@ import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TQueryProfile; +import org.apache.doris.thrift.TQueryStatistics; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TReportExecStatusResult; import org.apache.doris.thrift.TStatus; @@ -212,6 +213,8 @@ public void unregisterQuery(TUniqueId queryId) { @Override public Map getQueryStatistics() { final Map querySet = Maps.newHashMap(); + final Map queryStatisticsMap = + Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().getQueryStatisticsMap(); for (Map.Entry entry : coordinatorMap.entrySet()) { final QueryInfo info = entry.getValue(); final ConnectContext context = info.getConnectContext(); @@ -219,12 +222,14 @@ public Map getQueryStatistics() { continue; } final String queryIdStr = DebugUtil.printId(info.getConnectContext().queryId()); + final TQueryStatistics queryStatistics = queryStatisticsMap.get(queryIdStr); final QueryStatisticsItem item = new QueryStatisticsItem.Builder().queryId(queryIdStr) .queryStartTime(info.getStartExecTime()).sql(info.getSql()).user(context.getQualifiedUser()) .connId(String.valueOf(context.getConnectionId())).db(context.getDatabase()) .catalog(context.getDefaultCatalog()) .fragmentInstanceInfos(info.getCoord().getFragmentInstanceInfos()) 
.profile(info.getCoord().getExecutionProfile().getRoot()) + .queryStatistics(queryStatistics) .isReportSucc(context.getSessionVariable().enableProfile()).build(); querySet.put(queryIdStr, item); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java index c51ff24ca14f26..f879903e65a17c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java @@ -19,6 +19,7 @@ import org.apache.doris.common.profile.RuntimeProfile; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TQueryStatistics; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.Lists; @@ -38,6 +39,8 @@ public final class QueryStatisticsItem { // root query profile private final RuntimeProfile queryProfile; private final boolean isReportSucc; + // query statistics same as statistics in audit log + private final TQueryStatistics queryStatistics; private QueryStatisticsItem(Builder builder) { this.queryId = builder.queryId; @@ -50,6 +53,7 @@ private QueryStatisticsItem(Builder builder) { this.fragmentInstanceInfos = builder.fragmentInstanceInfos; this.queryProfile = builder.queryProfile; this.isReportSucc = builder.isReportSucc; + this.queryStatistics = builder.queryStatistics; } public String getDb() { @@ -97,6 +101,10 @@ public boolean getIsReportSucc() { return isReportSucc; } + public TQueryStatistics getQueryStatistics() { + return queryStatistics; + } + public static final class Builder { private String queryId; private String catalog; @@ -108,6 +116,7 @@ public static final class Builder { private List fragmentInstanceInfos; private RuntimeProfile queryProfile; private boolean isReportSucc; + private TQueryStatistics queryStatistics; public Builder() { fragmentInstanceInfos = Lists.newArrayList(); @@ -163,6 +172,11 @@ public Builder isReportSucc(boolean 
isReportSucc) { return this; } + public Builder queryStatistics(TQueryStatistics queryStatistics) { + this.queryStatistics = queryStatistics; + return this; + } + public QueryStatisticsItem build() { initDefaultValue(this); return new QueryStatisticsItem(this); @@ -192,6 +206,10 @@ private void initDefaultValue(Builder builder) { if (queryProfile == null) { queryProfile = new RuntimeProfile(""); } + + if (queryStatistics == null) { + queryStatistics = new TQueryStatistics(); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java index cc31e7858e0852..4fdc252ca9f4bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java @@ -20,8 +20,10 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.Pair; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.plugin.AuditEvent; +import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.thrift.TQueryStatistics; import org.apache.doris.thrift.TQueryStatisticsResult; import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams; @@ -48,6 +50,7 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(WorkloadRuntimeStatusMgr.class); + // backend id --> {query id --> (query last report time, query stats)} private Map beToQueryStatsMap = Maps.newConcurrentMap(); private final ReentrantLock queryAuditEventLock = new ReentrantLock(); private List queryAuditEventList = Lists.newLinkedList(); @@ -60,6 +63,7 @@ private class BeReportInfo { this.beLastReportTime = beLastReportTime; } + // query id --> (query last 
report time, query stats) Map> queryStatsMap = Maps.newConcurrentMap(); } @@ -110,6 +114,13 @@ protected void runAfterCatalogReady() { clearReportTimeoutBeStatistics(); } + // When a query or insert finishes, FE does not write the audit log immediately; it puts an + // audit event into this queue and a worker thread handles it later. If the queue is full, the + // event is logged directly instead and may miss some statistics. So the statistics in the + // audit log may be inaccurate, but this prevents FE from running out of memory because too + // many audit events pile up in the queue when QPS is high. For queued events, the worker + // thread takes each event from the queue and gets the statistics for this event from + // queryStatisticsMap. public void submitFinishQueryToAudit(AuditEvent event) { queryAuditEventLogWriteLock(); try { @@ -121,9 +132,9 @@ public void submitFinishQueryToAudit(AuditEvent event) { // if queryAuditEventList is full, we don't put the event to queryAuditEventList. // so that the statistic info of this audit event will be ignored, // and event will be logged directly. - LOG.warn("audit log event queue size {} is full, this may cause audit log missing statistics." - + "you can check whether qps is too high or " - + "set audit_event_log_queue_size to a larger value in fe.conf. query id: {}", + LOG.warn("audit log event queue size {} is full, this may cause audit log missing " + + "statistics. you can check whether qps is too high or set " + + "audit_event_log_queue_size to a larger value in fe.conf. 
query id: {}", queryAuditEventList.size(), event.queryId); } Env.getCurrentAuditEventProcessor().handleAuditEvent(event); @@ -186,7 +197,7 @@ public void updateBeQueryStats(TReportWorkloadRuntimeStatusParams params) { } } - void clearReportTimeoutBeStatistics() { + private void clearReportTimeoutBeStatistics() { // 1 clear report timeout be Set currentBeIdSet = beToQueryStatsMap.keySet(); Long currentTime = System.currentTimeMillis(); @@ -200,13 +211,30 @@ void clearReportTimeoutBeStatistics() { for (String queryId : queryIdSet) { Pair pair = beReportInfo.queryStatsMap.get(queryId); long queryLastReportTime = pair.first; - if (currentTime - queryLastReportTime > Config.be_report_query_statistics_timeout_ms) { + boolean timeout = currentTime - queryLastReportTime + > Config.be_report_query_statistics_timeout_ms; + // Remove a query's statistics only when both conditions are satisfied: + // 1) the query's last report is older than the timeout, and + // 2) FE no longer has this query in QeProcessorImpl. + // Example timeline: + // - t0: query q1 is still running, but one periodic BE report is delayed for > timeout. + // - t1: the clear thread runs; the timeout condition is true, but q1 still exists in FE. + // - t2: q1's statistics are kept instead of removed; later reports can update them again. 
+ if (timeout && isQueryNotExistInFe(queryId)) { beReportInfo.queryStatsMap.remove(queryId); } } } } + private boolean isQueryNotExistInFe(String queryId) { + try { + return QeProcessorImpl.INSTANCE.getCoordinator(DebugUtil.parseTUniqueIdFromString(queryId)) == null; + } catch (NumberFormatException e) { + return true; + } + } + // NOTE: currently getQueryStatisticsMap must be called before clear beToQueryStatsMap // so there is no need lock or null check when visit beToQueryStatsMap public Map getQueryStatisticsMap() { @@ -255,6 +283,13 @@ private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatisticsResult s dst.cpu_ms += srcStats.cpu_ms; dst.shuffle_send_bytes += srcStats.shuffle_send_bytes; dst.shuffle_send_rows += srcStats.shuffle_send_rows; + dst.process_rows += srcStats.process_rows; + if (dst.current_used_memory_bytes < srcStats.current_used_memory_bytes) { + dst.current_used_memory_bytes = srcStats.current_used_memory_bytes; + } + if (dst.workload_group_id <= 0 && srcStats.workload_group_id > 0) { + dst.workload_group_id = srcStats.workload_group_id; + } if (dst.max_peak_memory_bytes < srcStats.max_peak_memory_bytes) { dst.max_peak_memory_bytes = srcStats.max_peak_memory_bytes; } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 1d8f6e6bbd9f3c..7b9677ba59a80b 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -202,6 +202,7 @@ struct TQueryStatistics { 12: optional i64 spill_write_bytes_to_local_storage 13: optional i64 spill_read_bytes_from_local_storage 14: optional i64 bytes_write_into_cache + 15: optional i64 process_rows } struct TQueryStatisticsResult {