From dbf02c0f3f9973ce8f4d66008d6d386f162ef1ff Mon Sep 17 00:00:00 2001 From: Innocent Date: Thu, 12 Feb 2026 09:07:12 -0700 Subject: [PATCH 1/3] feat: metrics reporting for scan and commit --- src/iceberg/CMakeLists.txt | 8 + src/iceberg/constants.h | 1 + src/iceberg/meson.build | 8 + src/iceberg/metrics/CMakeLists.txt | 18 + src/iceberg/metrics/commit_report.cc | 106 ++++ src/iceberg/metrics/commit_report.h | 155 +++++ src/iceberg/metrics/counter.cc | 51 ++ src/iceberg/metrics/counter.h | 100 +++ src/iceberg/metrics/json_serde.cc | 413 +++++++++++++ src/iceberg/metrics/json_serde.h | 62 ++ src/iceberg/metrics/meson.build | 31 + src/iceberg/metrics/metrics_context.cc | 79 +++ src/iceberg/metrics/metrics_context.h | 86 +++ src/iceberg/metrics/metrics_reporter.h | 100 +++ src/iceberg/metrics/metrics_reporters.cc | 142 +++++ src/iceberg/metrics/metrics_reporters.h | 116 ++++ src/iceberg/metrics/metrics_types.h | 63 ++ src/iceberg/metrics/scan_report.cc | 86 +++ src/iceberg/metrics/scan_report.h | 143 +++++ src/iceberg/metrics/timer.cc | 90 +++ src/iceberg/metrics/timer.h | 122 ++++ src/iceberg/test/CMakeLists.txt | 2 + src/iceberg/test/meson.build | 2 + src/iceberg/test/metrics_reporter_test.cc | 296 +++++++++ src/iceberg/test/metrics_test.cc | 701 ++++++++++++++++++++++ 25 files changed, 2981 insertions(+) create mode 100644 src/iceberg/metrics/CMakeLists.txt create mode 100644 src/iceberg/metrics/commit_report.cc create mode 100644 src/iceberg/metrics/commit_report.h create mode 100644 src/iceberg/metrics/counter.cc create mode 100644 src/iceberg/metrics/counter.h create mode 100644 src/iceberg/metrics/json_serde.cc create mode 100644 src/iceberg/metrics/json_serde.h create mode 100644 src/iceberg/metrics/meson.build create mode 100644 src/iceberg/metrics/metrics_context.cc create mode 100644 src/iceberg/metrics/metrics_context.h create mode 100644 src/iceberg/metrics/metrics_reporter.h create mode 100644 src/iceberg/metrics/metrics_reporters.cc create mode 100644 src/iceberg/metrics/metrics_reporters.h create mode 100644 src/iceberg/metrics/metrics_types.h create mode 100644 src/iceberg/metrics/scan_report.cc create mode 100644 src/iceberg/metrics/scan_report.h create mode 100644 src/iceberg/metrics/timer.cc create mode 100644 src/iceberg/metrics/timer.h create mode 100644 src/iceberg/test/metrics_reporter_test.cc create mode 100644 src/iceberg/test/metrics_test.cc diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 68cacebeb..91a2915b8 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -58,6 +58,13 @@ set(ICEBERG_SOURCES manifest/v3_metadata.cc metadata_columns.cc metrics_config.cc + metrics/commit_report.cc + metrics/counter.cc + metrics/json_serde.cc + metrics/metrics_context.cc + metrics/metrics_reporters.cc + metrics/scan_report.cc + metrics/timer.cc name_mapping.cc partition_field.cc partition_spec.cc @@ -219,6 +226,7 @@ add_subdirectory(puffin) add_subdirectory(row) add_subdirectory(update) add_subdirectory(util) +add_subdirectory(metrics) if(ICEBERG_BUILD_BUNDLE) set(ICEBERG_BUNDLE_SOURCES diff --git a/src/iceberg/constants.h b/src/iceberg/constants.h index 1d5941626..e443b292c 100644 --- a/src/iceberg/constants.h +++ b/src/iceberg/constants.h @@ -33,6 +33,7 @@ namespace iceberg { constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id"; constexpr int64_t kInvalidSnapshotId = -1; constexpr int64_t kInvalidSequenceNumber = -1; +constexpr int64_t kInvalidSchemaId = -1; /// \brief Stand-in for the current sequence number that will be assigned when the commit /// is successful. This is replaced when writing a manifest list by the ManifestFile /// adapter. diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 03dc24479..b010b96e5 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -79,6 +79,13 @@ iceberg_sources = files( 'manifest/v2_metadata.cc', 'manifest/v3_metadata.cc', 'metadata_columns.cc', + 'metrics/commit_report.cc', + 'metrics/counter.cc', + 'metrics/json_serde.cc', + 'metrics/metrics_context.cc', + 'metrics/metrics_reporters.cc', + 'metrics/scan_report.cc', + 'metrics/timer.cc', 'metrics_config.cc', 'name_mapping.cc', 'partition_field.cc', @@ -273,6 +280,7 @@ subdir('data') subdir('deletes') subdir('expression') subdir('manifest') +subdir('metrics') subdir('puffin') subdir('row') subdir('update') diff --git a/src/iceberg/metrics/CMakeLists.txt b/src/iceberg/metrics/CMakeLists.txt new file mode 100644 index 000000000..c097fb0ed --- /dev/null +++ b/src/iceberg/metrics/CMakeLists.txt @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +iceberg_install_all_headers(iceberg/metrics) diff --git a/src/iceberg/metrics/commit_report.cc b/src/iceberg/metrics/commit_report.cc new file mode 100644 index 000000000..c2cac6f72 --- /dev/null +++ b/src/iceberg/metrics/commit_report.cc @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/metrics/commit_report.h" + +#include "iceberg/snapshot.h" +#include "iceberg/util/string_util.h" + +namespace iceberg { + +CommitMetrics CommitMetrics::Of(MetricsContext& context) { + CommitMetrics m; + m.total_duration = context.GetTimer("total-duration"); + m.attempts = context.GetCounter("attempts"); + return m; +} + +CommitMetrics CommitMetrics::Noop() { return CommitMetrics::Of(MetricsContext::Null()); } + +void CommitMetrics::PopulateResult(CommitMetricsResult& result) const { + result.total_duration = + total_duration ? TimerResult{.unit = std::string(total_duration->Unit()), + .count = total_duration->Count(), + .total_duration = total_duration->TotalDuration()} + : TimerResult{}; + result.attempts = + attempts ? CounterResult{.unit = attempts->Unit(), .value = attempts->Value()} + : CounterResult{}; +} + +CommitMetricsResult CommitMetricsResult::From( + const CommitMetrics& live_metrics, + const std::unordered_map& snapshot_summary) { + CommitMetricsResult result; + live_metrics.PopulateResult(result); + + // Helpers: parse a summary key and wrap as a typed CounterResult. + auto count_field = [&snapshot_summary](const std::string& key) -> CounterResult { + auto it = snapshot_summary.find(key); + if (it == snapshot_summary.end()) return {}; + auto parsed = StringUtils::ParseNumber(it->second); + return {.unit = CounterUnit::kCount, + .value = parsed.has_value() ? parsed.value() : 0}; + }; + auto bytes_field = [&snapshot_summary](const std::string& key) -> CounterResult { + auto it = snapshot_summary.find(key); + if (it == snapshot_summary.end()) return {.unit = CounterUnit::kBytes}; + auto parsed = StringUtils::ParseNumber(it->second); + return {.unit = CounterUnit::kBytes, + .value = parsed.has_value() ? parsed.value() : 0}; + }; + + result.added_data_files = count_field(SnapshotSummaryFields::kAddedDataFiles); + result.removed_data_files = count_field(SnapshotSummaryFields::kDeletedDataFiles); + result.total_data_files = count_field(SnapshotSummaryFields::kTotalDataFiles); + result.added_delete_files = count_field(SnapshotSummaryFields::kAddedDeleteFiles); + result.added_equality_delete_files = + count_field(SnapshotSummaryFields::kAddedEqDeleteFiles); + result.added_positional_delete_files = + count_field(SnapshotSummaryFields::kAddedPosDeleteFiles); + result.added_dvs = count_field(SnapshotSummaryFields::kAddedDVs); + result.removed_positional_delete_files = + count_field(SnapshotSummaryFields::kRemovedPosDeleteFiles); + result.removed_dvs = count_field(SnapshotSummaryFields::kRemovedDVs); + result.removed_equality_delete_files = + count_field(SnapshotSummaryFields::kRemovedEqDeleteFiles); + result.removed_delete_files = count_field(SnapshotSummaryFields::kRemovedDeleteFiles); + result.total_delete_files = count_field(SnapshotSummaryFields::kTotalDeleteFiles); + result.added_records = count_field(SnapshotSummaryFields::kAddedRecords); + result.removed_records = count_field(SnapshotSummaryFields::kDeletedRecords); + result.total_records = count_field(SnapshotSummaryFields::kTotalRecords); + result.added_files_size_bytes = bytes_field(SnapshotSummaryFields::kAddedFileSize); + result.removed_files_size_bytes = bytes_field(SnapshotSummaryFields::kRemovedFileSize); + result.total_files_size_bytes = bytes_field(SnapshotSummaryFields::kTotalFileSize); + result.added_positional_deletes = count_field(SnapshotSummaryFields::kAddedPosDeletes); + result.removed_positional_deletes = + count_field(SnapshotSummaryFields::kRemovedPosDeletes); + result.total_positional_deletes = count_field(SnapshotSummaryFields::kTotalPosDeletes); + result.added_equality_deletes = count_field(SnapshotSummaryFields::kAddedEqDeletes); + result.removed_equality_deletes = count_field(SnapshotSummaryFields::kRemovedEqDeletes); + result.total_equality_deletes = count_field(SnapshotSummaryFields::kTotalEqDeletes); + result.kept_manifest_count = count_field(SnapshotSummaryFields::kManifestsKept); + result.created_manifest_count = count_field(SnapshotSummaryFields::kManifestsCreated); + result.replaced_manifest_count = count_field(SnapshotSummaryFields::kManifestsReplaced); + result.processed_manifest_entries_count = + count_field(SnapshotSummaryFields::kEntriesProcessed); + return result; +} + +} // namespace iceberg diff --git a/src/iceberg/metrics/commit_report.h b/src/iceberg/metrics/commit_report.h new file mode 100644 index 000000000..77a313615 --- /dev/null +++ b/src/iceberg/metrics/commit_report.h @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "iceberg/constants.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/metrics/metrics_context.h" +#include "iceberg/metrics/metrics_types.h" // CounterResult, TimerResult, DurationNs +#include "iceberg/metrics/timer.h" + +namespace iceberg { + +// Forward declaration: CommitMetricsResult is defined later in this header. +struct CommitMetricsResult; + +/// \brief Live commit metrics collected during a table commit operation. +/// +/// Tracks the overall commit duration and retry count. File/record counts come +/// from the snapshot summary after the commit succeeds and are stored separately +/// in CommitMetricsResult. +class ICEBERG_EXPORT CommitMetrics { + public: + /// \brief Create a CommitMetrics instance backed by the given MetricsContext. + static CommitMetrics Of(MetricsContext& context); + + /// \brief Create a CommitMetrics instance with all-noop timer and counter. + static CommitMetrics Noop(); + + /// \brief Snapshot timer and counter values into the corresponding fields of result. + /// + /// Only total_duration and attempts are written; the caller is responsible for + /// populating the remaining snapshot-summary fields. + void PopulateResult(CommitMetricsResult& result) const; + + /// \brief Timer measuring total wall-clock time of the commit call. + std::shared_ptr total_duration; + + /// \brief Counter for the number of commit attempts (including retries). + std::shared_ptr attempts; +}; + +/// \brief Immutable snapshot of commit metrics for use in CommitReport. +struct ICEBERG_EXPORT CommitMetricsResult { + /// \brief Total wall-clock duration of the commit attempt. + TimerResult total_duration; + /// \brief Number of commit attempts (1 on success without retries). + CounterResult attempts; + /// \brief Number of data files added in this commit. + CounterResult added_data_files; + /// \brief Number of data files removed in this commit. + CounterResult removed_data_files; + /// \brief Total live data files after this commit. + CounterResult total_data_files; + /// \brief Number of delete files added in this commit. + CounterResult added_delete_files; + /// \brief Equality delete files added. + CounterResult added_equality_delete_files; + /// \brief Positional delete files added. + CounterResult added_positional_delete_files; + /// \brief Deletion vectors added. + CounterResult added_dvs; + /// \brief Positional delete files removed. + CounterResult removed_positional_delete_files; + /// \brief Deletion vectors removed. + CounterResult removed_dvs; + /// \brief Equality delete files removed. + CounterResult removed_equality_delete_files; + /// \brief Number of delete files removed in this commit. + CounterResult removed_delete_files; + /// \brief Total live delete files after this commit. + CounterResult total_delete_files; + /// \brief Number of records added in this commit. + CounterResult added_records; + /// \brief Number of records removed in this commit. + CounterResult removed_records; + /// \brief Total live records after this commit. + CounterResult total_records; + /// \brief Total byte size of files added. + CounterResult added_files_size_bytes; + /// \brief Total byte size of files removed. + CounterResult removed_files_size_bytes; + /// \brief Total byte size of all live files after this commit. + CounterResult total_files_size_bytes; + /// \brief Positional delete records added. + CounterResult added_positional_deletes; + /// \brief Positional delete records removed. + CounterResult removed_positional_deletes; + /// \brief Total positional delete records after this commit. + CounterResult total_positional_deletes; + /// \brief Equality delete records added. + CounterResult added_equality_deletes; + /// \brief Equality delete records removed. + CounterResult removed_equality_deletes; + /// \brief Total equality delete records after this commit. + CounterResult total_equality_deletes; + /// \brief Manifest files kept unchanged in this commit. + CounterResult kept_manifest_count; + /// \brief Manifest files created in this commit. + CounterResult created_manifest_count; + /// \brief Manifest files replaced in this commit. + CounterResult replaced_manifest_count; + /// \brief Manifest entries processed in this commit. + CounterResult processed_manifest_entries_count; + + bool operator==(const CommitMetricsResult&) const = default; + + /// \brief Build a CommitMetricsResult from live metrics and a snapshot summary map. + /// + /// Combines timer/retry measurements from \p live_metrics with records parsed + /// from \p snapshot_summary. Missing or unparseable summary keys default to 0. + static CommitMetricsResult From( + const CommitMetrics& live_metrics, + const std::unordered_map& snapshot_summary); +}; + +/// \brief Report generated after a commit operation. +/// +/// Contains metrics about the changes made in a commit. +struct ICEBERG_EXPORT CommitReport { + /// \brief The fully qualified name of the table that was modified. + std::string table_name; + /// \brief The snapshot ID created by this commit. + int64_t snapshot_id = kInvalidSnapshotId; + /// \brief The sequence number assigned to this commit. + int64_t sequence_number = kInvalidSequenceNumber; + /// \brief The operation that was performed (write, delete, etc.). + std::string operation; + /// \brief Metrics collected during the commit operation. + CommitMetricsResult commit_metrics; + /// \brief Additional key-value metadata. + std::unordered_map metadata; +}; + +} // namespace iceberg diff --git a/src/iceberg/metrics/counter.cc b/src/iceberg/metrics/counter.cc new file mode 100644 index 000000000..fc165923b --- /dev/null +++ b/src/iceberg/metrics/counter.cc @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/metrics/counter.h" + +namespace iceberg { + +namespace { + +class NoopCounter final : public Counter { + public: + void Increment() override {} + void Increment(int64_t) override {} + int64_t Value() const override { return 0; } + bool IsNoop() const override { return true; } +}; + +} // namespace + +Counter& Counter::Noop() { + static NoopCounter instance; + return instance; +} + +DefaultCounter::DefaultCounter(CounterUnit unit) : unit_(unit) {} + +void DefaultCounter::Increment() { count_.fetch_add(1, std::memory_order_relaxed); } + +void DefaultCounter::Increment(int64_t amount) { + count_.fetch_add(amount, std::memory_order_relaxed); +} + +int64_t DefaultCounter::Value() const { return count_.load(std::memory_order_relaxed); } + +} // namespace iceberg diff --git a/src/iceberg/metrics/counter.h b/src/iceberg/metrics/counter.h new file mode 100644 index 000000000..5fec63752 --- /dev/null +++ b/src/iceberg/metrics/counter.h @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" + +namespace iceberg { + +/// \brief Unit for a Counter metric. +enum class CounterUnit { + kCount, + kBytes, + kUndefined, +}; + +/// \brief String representation of a CounterUnit. +ICEBERG_EXPORT constexpr std::string_view ToString(CounterUnit unit) noexcept { + switch (unit) { + case CounterUnit::kCount: + return "count"; + case CounterUnit::kBytes: + return "bytes"; + case CounterUnit::kUndefined: + return "undefined"; + } + std::unreachable(); +} + +/// \brief Parse a CounterUnit from a string. +/// +/// \param s The string to parse ("count", "bytes", or "undefined"). +/// \return The CounterUnit, or CounterUnit::kCount if unrecognized. +ICEBERG_EXPORT constexpr CounterUnit CounterUnitFromString(std::string_view s) noexcept { + if (s == "bytes") return CounterUnit::kBytes; + if (s == "undefined") return CounterUnit::kUndefined; + return CounterUnit::kCount; +} + +/// \brief Abstract counter for tracking event totals. +class ICEBERG_EXPORT Counter { + public: + virtual ~Counter() = default; + + /// \brief Increment the counter by 1. + virtual void Increment() = 0; + + /// \brief Increment the counter by the given amount. + virtual void Increment(int64_t amount) = 0; + + /// \brief Return the current count. + virtual int64_t Value() const = 0; + + /// \brief Return the unit for this counter. + virtual CounterUnit Unit() const { return CounterUnit::kCount; } + + /// \brief Return true if this counter is a no-op. + virtual bool IsNoop() const { return false; } + + /// \brief Return a shared no-op counter singleton. + static Counter& Noop(); +}; + +/// \brief Thread-safe counter backed by std::atomic. +class ICEBERG_EXPORT DefaultCounter : public Counter { + public: + explicit DefaultCounter(CounterUnit unit = CounterUnit::kCount); + + void Increment() override; + void Increment(int64_t amount) override; + int64_t Value() const override; + CounterUnit Unit() const override { return unit_; } + + private: + std::atomic count_{0}; + CounterUnit unit_; +}; + +} // namespace iceberg diff --git a/src/iceberg/metrics/json_serde.cc b/src/iceberg/metrics/json_serde.cc new file mode 100644 index 000000000..bc0474a6c --- /dev/null +++ b/src/iceberg/metrics/json_serde.cc @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/metrics/json_serde.h" + +#include + +#include "iceberg/expression/json_serde_internal.h" +#include "iceberg/util/json_util_internal.h" + +namespace iceberg { + +namespace { + +// JSON key constants (kebab-case, matching Iceberg spec) +constexpr std::string_view kTableName = "table-name"; +constexpr std::string_view kSnapshotId = "snapshot-id"; +constexpr std::string_view kFilter = "filter"; +constexpr std::string_view kSchemaId = "schema-id"; +constexpr std::string_view kProjectedFieldIds = "projected-field-ids"; +constexpr std::string_view kProjectedFieldNames = "projected-field-names"; +constexpr std::string_view kScanMetrics = "scan-metrics"; +constexpr std::string_view kMetadata = "metadata"; +constexpr std::string_view kSequenceNumber = "sequence-number"; +constexpr std::string_view kOperation = "operation"; +constexpr std::string_view kCommitMetrics = "commit-metrics"; + +// CounterResult / TimerResult keys +constexpr std::string_view kUnit = "unit"; +constexpr std::string_view kValue = "value"; +constexpr std::string_view kCount = "count"; +constexpr std::string_view kTotalDuration = "total-duration"; + +// ScanMetricsResult keys +constexpr std::string_view kTotalPlanningDuration = "total-planning-duration"; +constexpr std::string_view kResultDataFiles = "result-data-files"; +constexpr std::string_view kResultDeleteFiles = "result-delete-files"; +constexpr std::string_view kScannedDataManifests = "scanned-data-manifests"; +constexpr std::string_view kScannedDeleteManifests = "scanned-delete-manifests"; +constexpr std::string_view kTotalDataManifests = "total-data-manifests"; +constexpr std::string_view kTotalDeleteManifests = "total-delete-manifests"; +constexpr std::string_view kTotalFileSizeInBytes = "total-file-size-in-bytes"; +constexpr std::string_view kTotalDeleteFileSizeInBytes = + "total-delete-file-size-in-bytes"; +constexpr std::string_view kSkippedDataManifests = "skipped-data-manifests"; +constexpr std::string_view kSkippedDeleteManifests = "skipped-delete-manifests"; +constexpr std::string_view kSkippedDataFiles = "skipped-data-files"; +constexpr std::string_view kSkippedDeleteFiles = "skipped-delete-files"; +constexpr std::string_view kIndexedDeleteFiles = "indexed-delete-files"; +constexpr std::string_view kEqualityDeleteFiles = "equality-delete-files"; +constexpr std::string_view kPositionalDeleteFiles = "positional-delete-files"; +constexpr std::string_view kDvs = "dvs"; + +// CommitMetricsResult keys +constexpr std::string_view kAttempts = "attempts"; +constexpr std::string_view kAddedDataFiles = "added-data-files"; +constexpr std::string_view kRemovedDataFiles = "removed-data-files"; +constexpr std::string_view kTotalDataFiles = "total-data-files"; +constexpr std::string_view kAddedDeleteFiles = "added-delete-files"; +constexpr std::string_view kAddedEqualityDeleteFiles = "added-equality-delete-files"; +constexpr std::string_view kAddedPositionalDeleteFiles = "added-positional-delete-files"; +constexpr std::string_view kAddedDvs = "added-dvs"; +constexpr std::string_view kRemovedPositionalDeleteFiles = + "removed-positional-delete-files"; +constexpr std::string_view kRemovedDvs = "removed-dvs"; +constexpr std::string_view kRemovedEqualityDeleteFiles = "removed-equality-delete-files"; +constexpr std::string_view kRemovedDeleteFiles = "removed-delete-files"; +constexpr std::string_view kTotalDeleteFiles = "total-delete-files"; +constexpr std::string_view kAddedRecords = "added-records"; +constexpr std::string_view kRemovedRecords = "removed-records"; +constexpr std::string_view kTotalRecords = "total-records"; +constexpr std::string_view kAddedFilesSizeBytes = "added-files-size-bytes"; +constexpr std::string_view kRemovedFilesSizeBytes = "removed-files-size-bytes"; +constexpr std::string_view kTotalFilesSizeBytes = "total-files-size-bytes"; +constexpr std::string_view kAddedPositionalDeletes = "added-positional-deletes"; +constexpr std::string_view kRemovedPositionalDeletes = "removed-positional-deletes"; +constexpr std::string_view kTotalPositionalDeletes = "total-positional-deletes"; +constexpr std::string_view kAddedEqualityDeletes = "added-equality-deletes"; +constexpr std::string_view kRemovedEqualityDeletes = "removed-equality-deletes"; +constexpr std::string_view kTotalEqualityDeletes = "total-equality-deletes"; +constexpr std::string_view kKeptManifestCount = "kept-manifest-count"; +constexpr std::string_view kCreatedManifestCount = "created-manifest-count"; +constexpr std::string_view kReplacedManifestCount = "replaced-manifest-count"; +constexpr std::string_view kProcessedManifestEntriesCount = + "processed-manifest-entries-count"; + +// Helper: emit a CounterResult field only when its value != 0 +void SetCounterField(nlohmann::json& json, std::string_view key, + const CounterResult& counter) { + if (counter.value == 0) return; + json[key] = ToJson(counter); +} + +// Helper: parse optional CounterResult; absent/null yields CounterResult{}, malformed +// propagates. +Result ParseCounterResult(const nlohmann::json& json, + std::string_view key) { + auto it = json.find(key); + if (it == json.end() || it->is_null()) return CounterResult{}; + return CounterResultFromJson(*it); +} + +// Helper: parse optional timer field; absent/null yields TimerResult{}, malformed +// propagates. +Result ParseTimerResult(const nlohmann::json& json, std::string_view key) { + auto it = json.find(key); + if (it == json.end() || it->is_null()) return TimerResult{}; + return TimerResultFromJson(*it); +} + +} // namespace + +// --------------------------------------------------------------------------- +// CounterResult +// --------------------------------------------------------------------------- + +nlohmann::json ToJson(const CounterResult& counter) { + return {{kUnit, ToString(counter.unit)}, {kValue, counter.value}}; +} + +Result CounterResultFromJson(const nlohmann::json& json) { + ICEBERG_ASSIGN_OR_RAISE(auto value, GetJsonValue(json, kValue)); + CounterResult result; + result.value = value; + if (auto it = json.find(kUnit); it != json.end() && it->is_string()) { + result.unit = CounterUnitFromString(it->get()); + } + return result; +} + +// --------------------------------------------------------------------------- +// TimerResult +// --------------------------------------------------------------------------- + +nlohmann::json ToJson(const TimerResult& timer) { + return {{kUnit, timer.unit}, + {kCount, timer.count}, + {kTotalDuration, timer.total_duration.count()}}; +} + +Result TimerResultFromJson(const nlohmann::json& json) { + ICEBERG_ASSIGN_OR_RAISE(auto count, GetJsonValue(json, kCount)); + ICEBERG_ASSIGN_OR_RAISE(auto total, GetJsonValue(json, kTotalDuration)); + TimerResult result{.count = count, .total_duration = std::chrono::nanoseconds{total}}; + if (auto it = json.find(kUnit); it != json.end() && it->is_string()) { + result.unit = it->get(); + } + return result; +} + +// --------------------------------------------------------------------------- +// ScanMetricsResult +// --------------------------------------------------------------------------- + +nlohmann::json ToJson(const ScanMetricsResult& m) { + nlohmann::json json = nlohmann::json::object(); + if (m.total_planning_duration.count > 0) { + json[std::string(kTotalPlanningDuration)] = ToJson(m.total_planning_duration); + } + SetCounterField(json, kResultDataFiles, m.result_data_files); + SetCounterField(json, kResultDeleteFiles, m.result_delete_files); + SetCounterField(json, kScannedDataManifests, m.scanned_data_manifests); + SetCounterField(json, kScannedDeleteManifests, m.scanned_delete_manifests); + SetCounterField(json, kTotalDataManifests, m.total_data_manifests); + SetCounterField(json, kTotalDeleteManifests, m.total_delete_manifests); + SetCounterField(json, kTotalFileSizeInBytes, m.total_file_size_in_bytes); + SetCounterField(json, kTotalDeleteFileSizeInBytes, m.total_delete_file_size_in_bytes); + SetCounterField(json, kSkippedDataManifests, m.skipped_data_manifests); + SetCounterField(json, kSkippedDeleteManifests, m.skipped_delete_manifests); + SetCounterField(json, kSkippedDataFiles, m.skipped_data_files); + SetCounterField(json, kSkippedDeleteFiles, m.skipped_delete_files); + SetCounterField(json, kIndexedDeleteFiles, m.indexed_delete_files); + SetCounterField(json, kEqualityDeleteFiles, m.equality_delete_files); + SetCounterField(json, kPositionalDeleteFiles, m.positional_delete_files); + SetCounterField(json, kDvs, m.dvs); + return json; +} + +Result ScanMetricsResultFromJson(const nlohmann::json& json) { + ScanMetricsResult m; + ICEBERG_ASSIGN_OR_RAISE(m.total_planning_duration, + ParseTimerResult(json, kTotalPlanningDuration)); + ICEBERG_ASSIGN_OR_RAISE(m.result_data_files, + ParseCounterResult(json, kResultDataFiles)); + ICEBERG_ASSIGN_OR_RAISE(m.result_delete_files, + ParseCounterResult(json, kResultDeleteFiles)); + ICEBERG_ASSIGN_OR_RAISE(m.scanned_data_manifests, + ParseCounterResult(json, kScannedDataManifests)); + ICEBERG_ASSIGN_OR_RAISE(m.scanned_delete_manifests, + ParseCounterResult(json, kScannedDeleteManifests)); + ICEBERG_ASSIGN_OR_RAISE(m.total_data_manifests, + ParseCounterResult(json, kTotalDataManifests)); + ICEBERG_ASSIGN_OR_RAISE(m.total_delete_manifests, + ParseCounterResult(json, kTotalDeleteManifests)); + ICEBERG_ASSIGN_OR_RAISE(m.total_file_size_in_bytes, + ParseCounterResult(json, kTotalFileSizeInBytes)); + ICEBERG_ASSIGN_OR_RAISE(m.total_delete_file_size_in_bytes, + ParseCounterResult(json, kTotalDeleteFileSizeInBytes)); + ICEBERG_ASSIGN_OR_RAISE(m.skipped_data_manifests, + ParseCounterResult(json, kSkippedDataManifests)); + ICEBERG_ASSIGN_OR_RAISE(m.skipped_delete_manifests, + ParseCounterResult(json, kSkippedDeleteManifests)); + ICEBERG_ASSIGN_OR_RAISE(m.skipped_data_files, + ParseCounterResult(json, kSkippedDataFiles)); + ICEBERG_ASSIGN_OR_RAISE(m.skipped_delete_files, + ParseCounterResult(json, kSkippedDeleteFiles)); + ICEBERG_ASSIGN_OR_RAISE(m.indexed_delete_files, + ParseCounterResult(json, kIndexedDeleteFiles)); + ICEBERG_ASSIGN_OR_RAISE(m.equality_delete_files, + ParseCounterResult(json, kEqualityDeleteFiles)); + ICEBERG_ASSIGN_OR_RAISE(m.positional_delete_files, + ParseCounterResult(json, kPositionalDeleteFiles)); + ICEBERG_ASSIGN_OR_RAISE(m.dvs, ParseCounterResult(json, kDvs)); + return m; +} + +// --------------------------------------------------------------------------- +// CommitMetricsResult +// --------------------------------------------------------------------------- + +nlohmann::json ToJson(const CommitMetricsResult& m) { + nlohmann::json json = nlohmann::json::object(); + if (m.total_duration.count > 0) { + json[std::string(kTotalDuration)] = ToJson(m.total_duration); + } + SetCounterField(json, kAttempts, m.attempts); + SetCounterField(json, kAddedDataFiles, m.added_data_files); + SetCounterField(json, kRemovedDataFiles, m.removed_data_files); + SetCounterField(json, kTotalDataFiles, m.total_data_files); + SetCounterField(json, kAddedDeleteFiles, m.added_delete_files); + SetCounterField(json, kAddedEqualityDeleteFiles, m.added_equality_delete_files); + SetCounterField(json, kAddedPositionalDeleteFiles, m.added_positional_delete_files); + SetCounterField(json, kAddedDvs, m.added_dvs); + SetCounterField(json, kRemovedPositionalDeleteFiles, m.removed_positional_delete_files); + SetCounterField(json, kRemovedDvs, m.removed_dvs); + SetCounterField(json, kRemovedEqualityDeleteFiles, m.removed_equality_delete_files); + SetCounterField(json, kRemovedDeleteFiles, m.removed_delete_files); + SetCounterField(json, kTotalDeleteFiles, m.total_delete_files); + SetCounterField(json, kAddedRecords, m.added_records); + SetCounterField(json, kRemovedRecords, m.removed_records); + SetCounterField(json, kTotalRecords, m.total_records); + SetCounterField(json, kAddedFilesSizeBytes, m.added_files_size_bytes); + SetCounterField(json, kRemovedFilesSizeBytes, m.removed_files_size_bytes); + SetCounterField(json, kTotalFilesSizeBytes, m.total_files_size_bytes); + SetCounterField(json, kAddedPositionalDeletes, m.added_positional_deletes); + SetCounterField(json, kRemovedPositionalDeletes, m.removed_positional_deletes); + SetCounterField(json, kTotalPositionalDeletes, m.total_positional_deletes); + SetCounterField(json, kAddedEqualityDeletes, m.added_equality_deletes); + SetCounterField(json, kRemovedEqualityDeletes, m.removed_equality_deletes); + SetCounterField(json, kTotalEqualityDeletes, m.total_equality_deletes); + SetCounterField(json, kKeptManifestCount, m.kept_manifest_count); + SetCounterField(json, kCreatedManifestCount, m.created_manifest_count); + SetCounterField(json, kReplacedManifestCount, m.replaced_manifest_count); + SetCounterField(json, kProcessedManifestEntriesCount, + m.processed_manifest_entries_count); + return json; +} + +Result CommitMetricsResultFromJson(const nlohmann::json& json) { + CommitMetricsResult m; + ICEBERG_ASSIGN_OR_RAISE(m.total_duration, ParseTimerResult(json, kTotalDuration)); + ICEBERG_ASSIGN_OR_RAISE(m.attempts, ParseCounterResult(json, kAttempts)); + ICEBERG_ASSIGN_OR_RAISE(m.added_data_files, ParseCounterResult(json, kAddedDataFiles)); + ICEBERG_ASSIGN_OR_RAISE(m.removed_data_files, + ParseCounterResult(json, kRemovedDataFiles)); + ICEBERG_ASSIGN_OR_RAISE(m.total_data_files, ParseCounterResult(json, kTotalDataFiles)); + ICEBERG_ASSIGN_OR_RAISE(m.added_delete_files, + ParseCounterResult(json, kAddedDeleteFiles)); + ICEBERG_ASSIGN_OR_RAISE(m.added_equality_delete_files, + ParseCounterResult(json, kAddedEqualityDeleteFiles)); + ICEBERG_ASSIGN_OR_RAISE(m.added_positional_delete_files, + ParseCounterResult(json, kAddedPositionalDeleteFiles)); + ICEBERG_ASSIGN_OR_RAISE(m.added_dvs, ParseCounterResult(json, kAddedDvs)); + ICEBERG_ASSIGN_OR_RAISE(m.removed_positional_delete_files, + ParseCounterResult(json, kRemovedPositionalDeleteFiles)); + ICEBERG_ASSIGN_OR_RAISE(m.removed_dvs, ParseCounterResult(json, kRemovedDvs)); + ICEBERG_ASSIGN_OR_RAISE(m.removed_equality_delete_files, + ParseCounterResult(json, kRemovedEqualityDeleteFiles)); + ICEBERG_ASSIGN_OR_RAISE(m.removed_delete_files, + ParseCounterResult(json, kRemovedDeleteFiles)); + ICEBERG_ASSIGN_OR_RAISE(m.total_delete_files, + ParseCounterResult(json, kTotalDeleteFiles)); + ICEBERG_ASSIGN_OR_RAISE(m.added_records, ParseCounterResult(json, kAddedRecords)); + ICEBERG_ASSIGN_OR_RAISE(m.removed_records, ParseCounterResult(json, kRemovedRecords)); + ICEBERG_ASSIGN_OR_RAISE(m.total_records, ParseCounterResult(json, kTotalRecords)); + ICEBERG_ASSIGN_OR_RAISE(m.added_files_size_bytes, + ParseCounterResult(json, kAddedFilesSizeBytes)); + ICEBERG_ASSIGN_OR_RAISE(m.removed_files_size_bytes, + ParseCounterResult(json, kRemovedFilesSizeBytes)); + ICEBERG_ASSIGN_OR_RAISE(m.total_files_size_bytes, + ParseCounterResult(json, kTotalFilesSizeBytes)); + ICEBERG_ASSIGN_OR_RAISE(m.added_positional_deletes, + ParseCounterResult(json, kAddedPositionalDeletes)); + ICEBERG_ASSIGN_OR_RAISE(m.removed_positional_deletes, + ParseCounterResult(json, kRemovedPositionalDeletes)); + ICEBERG_ASSIGN_OR_RAISE(m.total_positional_deletes, + ParseCounterResult(json, kTotalPositionalDeletes)); + ICEBERG_ASSIGN_OR_RAISE(m.added_equality_deletes, + ParseCounterResult(json, kAddedEqualityDeletes)); + ICEBERG_ASSIGN_OR_RAISE(m.removed_equality_deletes, + ParseCounterResult(json, kRemovedEqualityDeletes)); + ICEBERG_ASSIGN_OR_RAISE(m.total_equality_deletes, + ParseCounterResult(json, kTotalEqualityDeletes)); + ICEBERG_ASSIGN_OR_RAISE(m.kept_manifest_count, + ParseCounterResult(json, kKeptManifestCount)); + ICEBERG_ASSIGN_OR_RAISE(m.created_manifest_count, + ParseCounterResult(json, kCreatedManifestCount)); + ICEBERG_ASSIGN_OR_RAISE(m.replaced_manifest_count, + ParseCounterResult(json, kReplacedManifestCount)); + ICEBERG_ASSIGN_OR_RAISE(m.processed_manifest_entries_count, + ParseCounterResult(json, kProcessedManifestEntriesCount)); + return m; +} + +// --------------------------------------------------------------------------- +// ScanReport +// --------------------------------------------------------------------------- + +Result ToJson(const ScanReport& report) { + nlohmann::json json; + json[kTableName] = report.table_name; + json[kSnapshotId] = report.snapshot_id; + if (report.filter) { + ICEBERG_ASSIGN_OR_RAISE(auto filter_json, ToJson(*report.filter)); + json[kFilter] = std::move(filter_json); + } + json[kSchemaId] = report.schema_id; + SetContainerField(json, kProjectedFieldIds, report.projected_field_ids); + SetContainerField(json, kProjectedFieldNames, report.projected_field_names); + json[kScanMetrics] = ToJson(report.scan_metrics); + if (!report.metadata.empty()) { + json[kMetadata] = report.metadata; + } + return json; +} + +Result ScanReportFromJson(const nlohmann::json& json) { + ScanReport report; + ICEBERG_ASSIGN_OR_RAISE(report.table_name, GetJsonValue(json, kTableName)); + ICEBERG_ASSIGN_OR_RAISE(report.snapshot_id, GetJsonValue(json, kSnapshotId)); + if (auto it = json.find(kFilter); it != json.end() && !it->is_null()) { + ICEBERG_ASSIGN_OR_RAISE(report.filter, ExpressionFromJson(*it)); + } + if (auto it = json.find(kSchemaId); it != json.end()) { + report.schema_id = it->get(); + } + if (auto it = json.find(kProjectedFieldIds); it != json.end()) { + report.projected_field_ids = it->get>(); + } + if (auto it = json.find(kProjectedFieldNames); it != json.end()) { + report.projected_field_names = it->get>(); + } + if (auto it = json.find(kScanMetrics); it != json.end() && !it->is_null()) { + ICEBERG_ASSIGN_OR_RAISE(report.scan_metrics, ScanMetricsResultFromJson(*it)); + } + if (auto it = json.find(kMetadata); it != json.end() && it->is_object()) { + report.metadata = it->get>(); + } + return report; +} + +// --------------------------------------------------------------------------- +// CommitReport +// --------------------------------------------------------------------------- + +nlohmann::json ToJson(const CommitReport& report) { + nlohmann::json json; + json[kTableName] = report.table_name; + json[kSnapshotId] = report.snapshot_id; + json[kSequenceNumber] = report.sequence_number; + SetOptionalStringField(json, kOperation, report.operation); + json[kCommitMetrics] = ToJson(report.commit_metrics); + if (!report.metadata.empty()) { + json[kMetadata] = report.metadata; + } + return json; +} + +Result CommitReportFromJson(const nlohmann::json& json) { + CommitReport report; + ICEBERG_ASSIGN_OR_RAISE(report.table_name, GetJsonValue(json, kTableName)); + ICEBERG_ASSIGN_OR_RAISE(report.snapshot_id, GetJsonValue(json, kSnapshotId)); + ICEBERG_ASSIGN_OR_RAISE(report.sequence_number, + GetJsonValue(json, kSequenceNumber)); + if (auto it = json.find(kOperation); it != json.end() && it->is_string()) { + report.operation = it->get(); + } + if (auto it = json.find(kCommitMetrics); it != json.end() && !it->is_null()) { + ICEBERG_ASSIGN_OR_RAISE(report.commit_metrics, CommitMetricsResultFromJson(*it)); + } + if (auto it = json.find(kMetadata); it != json.end() && it->is_object()) { + report.metadata = it->get>(); + } + return report; +} + +} // namespace iceberg diff --git a/src/iceberg/metrics/json_serde.h b/src/iceberg/metrics/json_serde.h new file mode 100644 index 000000000..7ee977bac --- /dev/null +++ b/src/iceberg/metrics/json_serde.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/metrics/json_serde.h +/// \brief JSON serialization and deserialization for metrics report types. + +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/metrics/commit_report.h" +#include "iceberg/metrics/scan_report.h" +#include "iceberg/result.h" + +namespace iceberg { + +ICEBERG_EXPORT nlohmann::json ToJson(const CounterResult& counter); +ICEBERG_EXPORT Result CounterResultFromJson(const nlohmann::json& json); + +ICEBERG_EXPORT nlohmann::json ToJson(const TimerResult& timer); +ICEBERG_EXPORT Result TimerResultFromJson(const nlohmann::json& json); + +ICEBERG_EXPORT nlohmann::json ToJson(const ScanMetricsResult& metrics); +ICEBERG_EXPORT Result ScanMetricsResultFromJson( + const nlohmann::json& json); + +ICEBERG_EXPORT nlohmann::json ToJson(const CommitMetricsResult& metrics); +ICEBERG_EXPORT Result CommitMetricsResultFromJson( + const nlohmann::json& json); + +/// \brief Serialize a ScanReport to JSON. +/// +/// Returns Result because ScanReport.filter is an Expression whose serialization +/// is fallible. Returns an error if the filter cannot be serialized. +ICEBERG_EXPORT Result ToJson(const ScanReport& report); +ICEBERG_EXPORT Result ScanReportFromJson(const nlohmann::json& json); + +/// \brief Serialize a CommitReport to JSON. +/// +/// Returns nlohmann::json directly (not Result) because CommitReport contains +/// no Expression fields and serialization cannot fail. +ICEBERG_EXPORT nlohmann::json ToJson(const CommitReport& report); +ICEBERG_EXPORT Result CommitReportFromJson(const nlohmann::json& json); + +} // namespace iceberg diff --git a/src/iceberg/metrics/meson.build b/src/iceberg/metrics/meson.build new file mode 100644 index 000000000..713391836 --- /dev/null +++ b/src/iceberg/metrics/meson.build @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +install_headers( + [ + 'commit_report.h', + 'counter.h', + 'json_serde.h', + 'metrics_context.h', + 'metrics_reporter.h', + 'metrics_reporters.h', + 'metrics_types.h', + 'scan_report.h', + 'timer.h', + ], + subdir: 'iceberg/metrics', +) diff --git a/src/iceberg/metrics/metrics_context.cc b/src/iceberg/metrics/metrics_context.cc new file mode 100644 index 000000000..61406fa9d --- /dev/null +++ b/src/iceberg/metrics/metrics_context.cc @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/metrics/metrics_context.h" + +#include + +namespace iceberg { + +namespace { + +// Wraps noop singletons behind shared_ptr without deleting them. +struct NoDelete { + template + void operator()(T*) const noexcept {} +}; + +class NullMetricsContext final : public MetricsContext { + public: + using MetricsContext::GetCounter; // expose the one-arg base overload + std::shared_ptr GetCounter(std::string_view, CounterUnit) override { + // Static shared_ptr: control block allocated once, zero allocation per call. + static const std::shared_ptr kNoop{&Counter::Noop(), NoDelete{}}; + return kNoop; + } + + std::shared_ptr GetTimer(std::string_view) override { + static const std::shared_ptr kNoop{&Timer::Noop(), NoDelete{}}; + return kNoop; + } +}; + +} // namespace + +MetricsContext& MetricsContext::Null() { + static NullMetricsContext instance; + return instance; +} + +std::unique_ptr MetricsContext::Default() { + return std::make_unique(); +} + +std::shared_ptr DefaultMetricsContext::GetCounter(std::string_view name, + CounterUnit unit) { + auto key = std::string(name); + auto it = counters_.find(key); + if (it != counters_.end()) return it->second; + auto counter = std::make_shared(unit); + counters_.emplace(std::move(key), counter); + return counter; +} + +std::shared_ptr DefaultMetricsContext::GetTimer(std::string_view name) { + auto key = std::string(name); + auto it = timers_.find(key); + if (it != timers_.end()) return it->second; + auto timer = std::make_shared(); + timers_.emplace(std::move(key), timer); + return timer; +} + +} // namespace iceberg diff --git a/src/iceberg/metrics/metrics_context.h b/src/iceberg/metrics/metrics_context.h new file mode 100644 index 000000000..84dbff2cb --- /dev/null +++ b/src/iceberg/metrics/metrics_context.h @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/metrics/metrics_context.h +/// \brief Factory interface for creating named Counter and Timer instances. + +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/metrics/counter.h" +#include "iceberg/metrics/timer.h" + +namespace iceberg { + +/// \brief Factory for creating named Counter and Timer instances. +/// +/// A MetricsContext owns all metrics it creates. Asking for the same name +/// twice returns the same object (identity by name). The null context returns +/// noop singletons without allocating. +class ICEBERG_EXPORT MetricsContext { + public: + virtual ~MetricsContext() = default; + + /// \brief Get or create a named Counter with an explicit unit. + virtual std::shared_ptr GetCounter(std::string_view name, + CounterUnit unit) = 0; + + /// \brief Get or create a count-unit Counter by name. + /// + /// Convenience overload defaulting to CounterUnit::kCount. + std::shared_ptr GetCounter(std::string_view name) { + return GetCounter(name, CounterUnit::kCount); + } + + /// \brief Get or create a named Timer (nanosecond precision). + virtual std::shared_ptr GetTimer(std::string_view name) = 0; + + /// \brief Return the null (no-op) MetricsContext singleton. + /// + /// All metrics returned by the null context are noop; nothing is allocated. + static MetricsContext& Null(); + + /// \brief Create a new DefaultMetricsContext. + static std::unique_ptr Default(); +}; + +/// \brief MetricsContext backed by DefaultCounter and DefaultTimer instances. +/// +/// Thread-safe for metric *increments*; the unordered_map lookup/insert is NOT +/// protected. Concurrent GetCounter/GetTimer calls with the same name may create +/// duplicate Counter/Timer instances, breaking identity-by-name semantics. +/// Register all metrics during single-threaded setup, then pass the returned +/// shared_ptrs freely across threads. +class ICEBERG_EXPORT DefaultMetricsContext : public MetricsContext { + public: + using MetricsContext::GetCounter; // expose the one-arg base overload + std::shared_ptr GetCounter(std::string_view name, CounterUnit unit) override; + + std::shared_ptr GetTimer(std::string_view name) override; + + private: + std::unordered_map> counters_; + std::unordered_map> timers_; +}; + +} // namespace iceberg diff --git a/src/iceberg/metrics/metrics_reporter.h b/src/iceberg/metrics/metrics_reporter.h new file mode 100644 index 000000000..828adf360 --- /dev/null +++ b/src/iceberg/metrics/metrics_reporter.h @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/metrics/commit_report.h" +#include "iceberg/metrics/scan_report.h" + +namespace iceberg { + +/// \brief The type of a metrics report. +enum class MetricsReportType { + kScanReport, + kCommitReport, +}; + +/// \brief Get the string representation of a metrics report type. +ICEBERG_EXPORT constexpr std::string_view ToString(MetricsReportType type) noexcept { + switch (type) { + case MetricsReportType::kScanReport: + return "scan"; + case MetricsReportType::kCommitReport: + return "commit"; + } + std::unreachable(); +} + +/// \brief A metrics report, which can be either a ScanReport or CommitReport. +/// +/// This variant type allows handling both report types uniformly through +/// the MetricsReporter interface. +using MetricsReport = std::variant; + +/// \brief Get the type of a metrics report. +/// +/// \param report The metrics report to get the type of. +/// \return The type of the metrics report. +ICEBERG_EXPORT inline MetricsReportType GetReportType(const MetricsReport& report) { + return std::visit( + [](const auto& r) -> MetricsReportType { + using T = std::decay_t; + if constexpr (std::is_same_v) { + return MetricsReportType::kScanReport; + } else { + return MetricsReportType::kCommitReport; + } + }, + report); +} + +/// \brief Interface for reporting metrics from Iceberg operations. +/// +/// Implementations of this interface can be used to collect and report +/// metrics about scan and commit operations. Common implementations include +/// logging reporters, metrics collectors, and the noop reporter for testing. +class ICEBERG_EXPORT MetricsReporter { + public: + virtual ~MetricsReporter() = default; + + /// \brief Initialize the reporter with catalog properties after construction. + /// + /// Called by MetricsReporters::Load() before the first Report() invocation. + /// The default implementation is a no-op. Override to perform property-based + /// setup (e.g., configure endpoints, credentials, sampling rates). + virtual void Initialize( + [[maybe_unused]] const std::unordered_map& properties) {} + + /// \brief Report a metrics report. + /// + /// Implementations should handle the report according to their purpose + /// (e.g., logging, sending to a metrics service, etc.). + /// + /// \param report The metrics report to process. + virtual void Report(const MetricsReport& report) = 0; +}; + +} // namespace iceberg diff --git a/src/iceberg/metrics/metrics_reporters.cc b/src/iceberg/metrics/metrics_reporters.cc new file mode 100644 index 000000000..a1c8fac92 --- /dev/null +++ b/src/iceberg/metrics/metrics_reporters.cc @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/metrics/metrics_reporters.h" + +namespace iceberg { + +namespace { + +/// \brief Registry type for MetricsReporter factories. +using MetricsReporterRegistry = std::unordered_map; + +/// \brief Extract the reporter type identifier from properties. +/// +/// Returns the value of "metrics-reporter-impl" verbatim (case-preserved), or +/// kMetricsReporterTypeNoop if the property is absent or empty. +std::string InferReporterType( + const std::unordered_map& properties) { + auto it = properties.find(std::string(kMetricsReporterImpl)); + if (it != properties.end() && !it->second.empty()) { + return it->second; + } + return std::string(kMetricsReporterTypeNoop); +} + +/// \brief Metrics reporter that does nothing. +class NoopMetricsReporter : public MetricsReporter { + public: + static Result> Make( + [[maybe_unused]] const std::unordered_map& properties) { + return std::make_unique(); + } + + void Report([[maybe_unused]] const MetricsReport& report) override {} +}; + +template +MetricsReporterFactory MakeReporterFactory() { + return [](const std::unordered_map& props) + -> Result> { return T::Make(props); }; +} + +MetricsReporterRegistry CreateDefaultRegistry() { + return { + {std::string(kMetricsReporterTypeNoop), MakeReporterFactory()}, + }; +} + +MetricsReporterRegistry& GetRegistry() { + static MetricsReporterRegistry registry = CreateDefaultRegistry(); + return registry; +} + +} // namespace + +// --- CompositeMetricsReporter --- + +CompositeMetricsReporter::CompositeMetricsReporter( + std::unordered_set> reporters) + : reporters_(std::move(reporters)) {} + +void CompositeMetricsReporter::Report(const MetricsReport& report) { + for (const auto& reporter : reporters_) { + try { + reporter->Report(report); + } catch (...) { + // Catch all exceptions to ensure one failing reporter doesn't prevent others from + // receiving the report. + } + } +} + +const std::unordered_set>& +CompositeMetricsReporter::Reporters() const { + return reporters_; +} + +// --- MetricsReporters --- + +void MetricsReporters::Register(std::string_view reporter_type, + MetricsReporterFactory factory) { + GetRegistry()[std::string(reporter_type)] = std::move(factory); +} + +Result> MetricsReporters::Load( + const std::unordered_map& properties) { + std::string reporter_type = InferReporterType(properties); + + auto& registry = GetRegistry(); + auto it = registry.find(reporter_type); + if (it == registry.end()) { + return InvalidArgument( + "Unknown metrics reporter type '{}'. Register a factory with " + "MetricsReporters::Register() before using this type.", + reporter_type); + } + + ICEBERG_ASSIGN_OR_RAISE(auto reporter, it->second(properties)); + reporter->Initialize(properties); + return reporter; +} + +std::shared_ptr MetricsReporters::Combine( + std::shared_ptr first, std::shared_ptr second) { + if (!first) return second; + if (!second || first.get() == second.get()) return first; + + std::unordered_set> reporters; + + auto collect = [&reporters](const std::shared_ptr& r) { + if (auto* composite = dynamic_cast(r.get())) { + for (const auto& inner : composite->Reporters()) { + reporters.insert(inner); + } + } else { + reporters.insert(r); + } + }; + + collect(first); + collect(second); + + return std::make_shared(std::move(reporters)); +} + +} // namespace iceberg diff --git a/src/iceberg/metrics/metrics_reporters.h b/src/iceberg/metrics/metrics_reporters.h new file mode 100644 index 000000000..c182fc8e2 --- /dev/null +++ b/src/iceberg/metrics/metrics_reporters.h @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/metrics/metrics_reporters.h +/// \brief Factory for creating MetricsReporter instances. + +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/metrics/metrics_reporter.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief Property key for configuring the metrics reporter implementation. +/// +/// Set this property in catalog properties to specify which metrics reporter +/// implementation to use. The value should match a registered reporter type. +constexpr std::string_view kMetricsReporterImpl = "metrics-reporter-impl"; + +/// \brief Property value for the noop metrics reporter. +constexpr std::string_view kMetricsReporterTypeNoop = "noop"; + +/// \brief Function type for creating MetricsReporter instances. +/// +/// \param properties Configuration properties for the reporter. +/// \return A new MetricsReporter instance or an error. +using MetricsReporterFactory = std::function>( + const std::unordered_map& properties)>; + +/// \brief A MetricsReporter that delegates to multiple reporters. +/// +/// Combines several reporters so that every report is delivered to each of them. +/// Any exception thrown by an individual reporter is caught and swallowed; +/// the remaining reporters still receive the report. +/// +/// Use MetricsReporters::Combine() to create instances — that helper flattens +/// nested composites and deduplicates reporters by identity. +class ICEBERG_EXPORT CompositeMetricsReporter : public MetricsReporter { + public: + explicit CompositeMetricsReporter( + std::unordered_set> reporters); + + void Report(const MetricsReport& report) override; + + /// \brief The reporters contained in this composite. + /// + /// Used by MetricsReporters::Combine() for flattening. + const std::unordered_set>& Reporters() const; + + private: + std::unordered_set> reporters_; +}; + +/// \brief Factory class for creating and managing MetricsReporter instances. +/// +/// This class provides a registry-based factory for creating MetricsReporter +/// implementations. Custom reporter implementations can be registered using +/// the Register() method. +class ICEBERG_EXPORT MetricsReporters { + public: + /// \brief Load a metrics reporter based on properties. + /// + /// This method looks up the "metrics-reporter-impl" property to determine + /// which reporter implementation to create. If not specified, returns a + /// NoopMetricsReporter. + /// + /// \param properties Configuration properties containing reporter type. + /// \return A new MetricsReporter instance or an error. + static Result> Load( + const std::unordered_map& properties); + + /// \brief Register a factory for a metrics reporter type. + /// + /// This method is not thread-safe. All registrations should be done during + /// application startup before any concurrent access to Load(). + /// + /// \param reporter_type Type identifier matched case-sensitively against the + /// value of "metrics-reporter-impl" in catalog properties, mirroring + /// Java's fully-qualified class-name lookup (e.g., "noop"). + /// \param factory Factory function that produces the reporter. + static void Register(std::string_view reporter_type, MetricsReporterFactory factory); + + /// \brief Combine two reporters into one. + /// + /// \param first First reporter, may be nullptr. + /// \param second Second reporter, may be nullptr. + /// \return Combined reporter, or nullptr if both inputs are nullptr. + static std::shared_ptr Combine( + std::shared_ptr first, std::shared_ptr second); +}; + +} // namespace iceberg diff --git a/src/iceberg/metrics/metrics_types.h b/src/iceberg/metrics/metrics_types.h new file mode 100644 index 000000000..b170d9684 --- /dev/null +++ b/src/iceberg/metrics/metrics_types.h @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/metrics/metrics_types.h +/// \brief Serialisable snapshot types shared across scan and commit metrics. +/// +/// CounterResult, TimerResult, and DurationNs are primitive result types used by +/// ScanMetricsResult, CommitMetricsResult, and the JSON serde layer. Defining +/// them here keeps ScanReport and CommitReport headers independent of each other. + +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/metrics/counter.h" // CounterUnit + +namespace iceberg { + +/// \brief Duration type for metrics reporting (nanosecond precision). +using DurationNs = std::chrono::nanoseconds; + +/// \brief Serialisable snapshot of a single Counter measurement. +/// +/// Carries both the unit and value. +struct ICEBERG_EXPORT CounterResult { + CounterUnit unit = CounterUnit::kCount; + int64_t value = 0; + + bool operator==(const CounterResult&) const = default; +}; + +/// \brief Serialisable snapshot of a single Timer measurement. +/// +/// Carries the unit name, recording count, and total accumulated duration. +struct ICEBERG_EXPORT TimerResult { + // Time unit name; always "nanoseconds" for built-in timers. + std::string unit{"nanoseconds"}; + int64_t count = 0; + std::chrono::nanoseconds total_duration{0}; + + bool operator==(const TimerResult&) const = default; +}; + +} // namespace iceberg diff --git a/src/iceberg/metrics/scan_report.cc b/src/iceberg/metrics/scan_report.cc new file mode 100644 index 000000000..8a0bdb87d --- /dev/null +++ b/src/iceberg/metrics/scan_report.cc @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/metrics/scan_report.h" + +namespace iceberg { + +ScanMetrics ScanMetrics::Of(MetricsContext& context) { + ScanMetrics m; + m.total_planning_duration = context.GetTimer("totalPlanningDuration"); + m.result_data_files = context.GetCounter("resultDataFiles"); + m.result_delete_files = context.GetCounter("resultDeleteFiles"); + m.scanned_data_manifests = context.GetCounter("scannedDataManifests"); + m.scanned_delete_manifests = context.GetCounter("scannedDeleteManifests"); + m.total_data_manifests = context.GetCounter("totalDataManifests"); + m.total_delete_manifests = context.GetCounter("totalDeleteManifests"); + m.total_file_size_in_bytes = + context.GetCounter("totalFileSizeInBytes", CounterUnit::kBytes); + m.total_delete_file_size_in_bytes = + context.GetCounter("totalDeleteFileSizeInBytes", CounterUnit::kBytes); + m.skipped_data_manifests = context.GetCounter("skippedDataManifests"); + m.skipped_delete_manifests = context.GetCounter("skippedDeleteManifests"); + m.skipped_data_files = context.GetCounter("skippedDataFiles"); + m.skipped_delete_files = context.GetCounter("skippedDeleteFiles"); + m.indexed_delete_files = context.GetCounter("indexedDeleteFiles"); + m.equality_delete_files = context.GetCounter("equalityDeleteFiles"); + m.positional_delete_files = context.GetCounter("positionalDeleteFiles"); + m.dvs = context.GetCounter("dvs"); + return m; +} + +ScanMetrics ScanMetrics::Noop() { return ScanMetrics::Of(MetricsContext::Null()); } + +ScanMetricsResult ScanMetrics::ToResult() const { + ScanMetricsResult r; + // Helper: snapshot a live Counter into a CounterResult (unit comes from the Counter). + auto snap = [](const std::shared_ptr& c) -> CounterResult { + return c ? CounterResult{.unit = c->Unit(), .value = c->Value()} : CounterResult{}; + }; + + r.total_planning_duration = + total_planning_duration + ? TimerResult{.unit = std::string(total_planning_duration->Unit()), + .count = total_planning_duration->Count(), + .total_duration = total_planning_duration->TotalDuration()} + : TimerResult{}; + r.result_data_files = snap(result_data_files); + r.result_delete_files = snap(result_delete_files); + r.scanned_data_manifests = snap(scanned_data_manifests); + r.scanned_delete_manifests = snap(scanned_delete_manifests); + r.total_data_manifests = snap(total_data_manifests); + r.total_delete_manifests = snap(total_delete_manifests); + r.total_file_size_in_bytes = snap(total_file_size_in_bytes); + r.total_delete_file_size_in_bytes = snap(total_delete_file_size_in_bytes); + r.skipped_data_manifests = snap(skipped_data_manifests); + r.skipped_delete_manifests = snap(skipped_delete_manifests); + r.skipped_data_files = snap(skipped_data_files); + r.skipped_delete_files = snap(skipped_delete_files); + r.indexed_delete_files = snap(indexed_delete_files); + r.equality_delete_files = snap(equality_delete_files); + r.positional_delete_files = snap(positional_delete_files); + r.dvs = snap(dvs); + return r; +} + +ScanMetricsResult ScanMetricsResult::From(const ScanMetrics& scan_metrics) { + return scan_metrics.ToResult(); +} + +} // namespace iceberg diff --git a/src/iceberg/metrics/scan_report.h b/src/iceberg/metrics/scan_report.h new file mode 100644 index 000000000..d4fbe0924 --- /dev/null +++ b/src/iceberg/metrics/scan_report.h @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "iceberg/constants.h" +#include "iceberg/expression/expression.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/metrics/metrics_context.h" +#include "iceberg/metrics/metrics_types.h" +#include "iceberg/metrics/timer.h" + +namespace iceberg { + +// Forward declaration: ScanMetrics is defined later in this header. +class ScanMetrics; + +/// \brief Immutable snapshot of scan metrics for use in ScanReport. +/// +/// Populated by ScanMetrics::ToResult() after a scan completes. +struct ICEBERG_EXPORT ScanMetricsResult { + /// \brief Total planning duration (count of recordings + accumulated nanoseconds). + TimerResult total_planning_duration; + /// \brief Number of data files included in the scan result. + CounterResult result_data_files; + /// \brief Number of delete files included in the scan result. + CounterResult result_delete_files; + /// \brief Number of data manifests whose files were read (not skipped). + CounterResult scanned_data_manifests; + /// \brief Number of delete manifests whose files were read (not skipped). + CounterResult scanned_delete_manifests; + /// \brief Total number of data manifests in the snapshot. + CounterResult total_data_manifests; + /// \brief Total number of delete manifests in the snapshot. + CounterResult total_delete_manifests; + /// \brief Total byte size of all result data files. + CounterResult total_file_size_in_bytes; + /// \brief Total byte size of all result delete files. + CounterResult total_delete_file_size_in_bytes; + /// \brief Number of data manifests skipped by partition/stats pruning. + CounterResult skipped_data_manifests; + /// \brief Number of delete manifests skipped by partition/stats pruning. + CounterResult skipped_delete_manifests; + /// \brief Number of individual data files skipped by stats pruning. + CounterResult skipped_data_files; + /// \brief Number of individual delete files skipped by stats pruning. + CounterResult skipped_delete_files; + /// \brief Number of indexed delete files (positional or DV) in the result. + CounterResult indexed_delete_files; + /// \brief Number of equality delete files in the result. + CounterResult equality_delete_files; + /// \brief Number of positional delete files in the result. + CounterResult positional_delete_files; + /// \brief Number of deletion vectors in the result. + CounterResult dvs; + + bool operator==(const ScanMetricsResult&) const = default; + + /// \brief Build a ScanMetricsResult from live scan metrics. + static ScanMetricsResult From(const ScanMetrics& scan_metrics); +}; + +/// \brief Live scan metrics collected during a table scan operation. +/// +/// Holds named Counter and Timer instances obtained from a MetricsContext. +/// Call Of() at the start of a scan to obtain an instrumented instance, then +/// increment counters and start/stop the planning timer as the scan proceeds. +/// Call ToResult() at the end to obtain the serialisable ScanMetricsResult. +class ICEBERG_EXPORT ScanMetrics { + public: + /// \brief Create a ScanMetrics instance backed by the given MetricsContext. + static ScanMetrics Of(MetricsContext& context); + + /// \brief Create a ScanMetrics instance with all-noop counters and timer. + static ScanMetrics Noop(); + + /// \brief Snapshot current counter/timer values into a ScanMetricsResult. + ScanMetricsResult ToResult() const; + + std::shared_ptr total_planning_duration; + std::shared_ptr result_data_files; + std::shared_ptr result_delete_files; + std::shared_ptr scanned_data_manifests; + std::shared_ptr scanned_delete_manifests; + std::shared_ptr total_data_manifests; + std::shared_ptr total_delete_manifests; + std::shared_ptr total_file_size_in_bytes; + std::shared_ptr total_delete_file_size_in_bytes; + std::shared_ptr skipped_data_manifests; + std::shared_ptr skipped_delete_manifests; + std::shared_ptr skipped_data_files; + std::shared_ptr skipped_delete_files; + std::shared_ptr indexed_delete_files; + std::shared_ptr equality_delete_files; + std::shared_ptr positional_delete_files; + std::shared_ptr dvs; +}; + +/// \brief Report generated after a table scan operation. +/// +/// Contains metrics about the planning and execution of a table scan, +/// including information about manifests and data files processed. +struct ICEBERG_EXPORT ScanReport { + /// \brief The fully qualified name of the table that was scanned. + std::string table_name; + /// \brief Snapshot ID that was scanned, if available. + int64_t snapshot_id = kInvalidSnapshotId; + /// \brief Filter expression used in the scan, if any. + std::shared_ptr filter; + /// \brief Schema ID. + int32_t schema_id = kInvalidSchemaId; + /// \brief Projected field IDs from the scan schema. + std::vector projected_field_ids; + /// \brief Projected field names from the scan schema. + std::vector projected_field_names; + /// \brief Metrics collected during the scan operation. + ScanMetricsResult scan_metrics; + /// \brief Additional key-value metadata. + std::unordered_map metadata; +}; + +} // namespace iceberg diff --git a/src/iceberg/metrics/timer.cc b/src/iceberg/metrics/timer.cc new file mode 100644 index 000000000..9bedfca0e --- /dev/null +++ b/src/iceberg/metrics/timer.cc @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/metrics/timer.h" + +namespace iceberg { + +namespace { + +class NoopTimer final : public Timer { + public: + int64_t Count() const override { return 0; } + std::chrono::nanoseconds TotalDuration() const override { + return std::chrono::nanoseconds{0}; + } + void Record(std::chrono::nanoseconds) override {} + bool IsNoop() const override { return true; } +}; + +} // namespace + +// --- Timer::Timed --- + +Timer::Timed::Timed(Timer& timer) + : timer_(&timer), start_(std::chrono::steady_clock::now()) {} + +Timer::Timed::~Timed() { Stop(); } + +Timer::Timed::Timed(Timed&& other) noexcept + : timer_(other.timer_), start_(other.start_), stopped_(other.stopped_) { + other.stopped_ = true; // transfer ownership; prevent double-record +} + +Timer::Timed& Timer::Timed::operator=(Timed&& other) noexcept { + if (this != &other) { + Stop(); + timer_ = other.timer_; + start_ = other.start_; + stopped_ = other.stopped_; + other.stopped_ = true; + } + return *this; +} + +void Timer::Timed::Stop() { + if (stopped_) return; + stopped_ = true; + auto end = std::chrono::steady_clock::now(); + timer_->Record(std::chrono::duration_cast(end - start_)); +} + +// --- Timer --- + +Timer::Timed Timer::Start() { return Timed(*this); } + +Timer& Timer::Noop() { + static NoopTimer instance; + return instance; +} + +// --- DefaultTimer --- + +int64_t DefaultTimer::Count() const { return count_.load(std::memory_order_relaxed); } + +std::chrono::nanoseconds DefaultTimer::TotalDuration() const { + return std::chrono::nanoseconds{total_nanos_.load(std::memory_order_relaxed)}; +} + +void DefaultTimer::Record(std::chrono::nanoseconds duration) { + count_.fetch_add(1, std::memory_order_relaxed); + total_nanos_.fetch_add(duration.count(), std::memory_order_relaxed); +} + +} // namespace iceberg diff --git a/src/iceberg/metrics/timer.h b/src/iceberg/metrics/timer.h new file mode 100644 index 000000000..53820ed14 --- /dev/null +++ b/src/iceberg/metrics/timer.h @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" + +namespace iceberg { + +/// \brief Abstract timer for measuring operation durations. +/// +/// Use Start() to obtain a Timed RAII +/// guard that records the elapsed duration when it goes out of scope. +class ICEBERG_EXPORT Timer { + public: + /// \brief RAII guard that records elapsed time into the owning Timer on destruction. + class ICEBERG_EXPORT Timed { + public: + explicit Timed(Timer& timer); + ~Timed(); + + Timed(const Timed&) = delete; + Timed& operator=(const Timed&) = delete; + Timed(Timed&& other) noexcept; + Timed& operator=(Timed&& other) noexcept; + + /// \brief Explicitly stop timing and record the duration. + /// + /// Subsequent calls (including the destructor) are no-ops. + void Stop(); + + private: + Timer* timer_; + std::chrono::steady_clock::time_point start_; + bool stopped_ = false; + }; + + virtual ~Timer() = default; + + /// \brief Number of timing recordings made so far. + virtual int64_t Count() const = 0; + + /// \brief Total accumulated duration across all recordings. + virtual std::chrono::nanoseconds TotalDuration() const = 0; + + /// \brief Record a nanosecond duration directly. + /// + /// Use the template overload below to record + /// any std::chrono duration type with automatic unit conversion. + virtual void Record(std::chrono::nanoseconds duration) = 0; + + /// \brief Record a duration of any chrono type, converting to nanoseconds. + template + void Record(std::chrono::duration duration) { + Record(std::chrono::duration_cast(duration)); + } + + /// \brief Return the time unit used by this timer (always "nanoseconds"). + virtual std::string_view Unit() const { return "nanoseconds"; } + + /// \brief Return true if this timer is a no-op. + virtual bool IsNoop() const { return false; } + + /// \brief Start timing and return a RAII Timed guard. + /// + /// The elapsed duration is recorded into this timer when the Timed guard is + /// destroyed or Stop() is called. + Timed Start(); + + /// \brief Execute a callable, record its wall-clock duration, and return its result. + template + auto Time(Callable&& fn) { + auto timed = Start(); + if constexpr (std::is_void_v>) { + std::forward(fn)(); + timed.Stop(); + } else { + auto result = std::forward(fn)(); + timed.Stop(); + return result; + } + } + + /// \brief Return a shared no-op timer singleton. + static Timer& Noop(); +}; + +/// \brief Thread-safe timer backed by std::atomic. +class ICEBERG_EXPORT DefaultTimer : public Timer { + public: + int64_t Count() const override; + std::chrono::nanoseconds TotalDuration() const override; + void Record(std::chrono::nanoseconds duration) override; + + private: + std::atomic count_{0}; + std::atomic total_nanos_{0}; +}; + +} // namespace iceberg diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 791ad9be0..94348d2a3 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -86,6 +86,8 @@ add_iceberg_test(table_test SOURCES location_provider_test.cc metrics_config_test.cc + metrics_reporter_test.cc + metrics_test.cc snapshot_summary_builder_test.cc snapshot_test.cc snapshot_util_test.cc diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index e168d08bf..e84e69643 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -49,6 +49,8 @@ iceberg_tests = { 'sources': files( 'location_provider_test.cc', 'metrics_config_test.cc', + 'metrics_reporter_test.cc', + 'metrics_test.cc', 'snapshot_test.cc', 'snapshot_util_test.cc', 'table_metadata_builder_test.cc', diff --git a/src/iceberg/test/metrics_reporter_test.cc b/src/iceberg/test/metrics_reporter_test.cc new file mode 100644 index 000000000..95bcaba85 --- /dev/null +++ b/src/iceberg/test/metrics_reporter_test.cc @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/metrics/metrics_reporter.h" + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "iceberg/metrics/counter.h" +#include "iceberg/metrics/metrics_reporters.h" + +namespace iceberg { + +class CollectingMetricsReporter : public MetricsReporter { + public: + static Result> Make( + [[maybe_unused]] const std::unordered_map& properties) { + return std::make_unique(); + } + + void Report(const MetricsReport& report) override { reports_.push_back(report); } + + const std::vector& reports() const { return reports_; } + + private: + std::vector reports_; +}; + +TEST(CustomMetricsReporterTest, RegisterAndLoad) { + // Register custom reporter + MetricsReporters::Register("collecting", + [](const std::unordered_map& props) + -> Result> { + return CollectingMetricsReporter::Make(props); + }); + + // Load the custom reporter + std::unordered_map properties = { + {std::string(kMetricsReporterImpl), "collecting"}}; + auto result = MetricsReporters::Load(properties); + + ASSERT_TRUE(result.has_value()); + ASSERT_NE(result.value(), nullptr); + + // Report and verify + auto* reporter = dynamic_cast(result.value().get()); + ASSERT_NE(reporter, nullptr); + + ScanReport scan_report{.table_name = "test.table"}; + reporter->Report(scan_report); + + EXPECT_EQ(reporter->reports().size(), 1); + EXPECT_EQ(GetReportType(reporter->reports()[0]), MetricsReportType::kScanReport); +} + +struct ReporterRegistrationParam { + std::string test_name; + std::string register_name; + std::string load_name; + bool expect_success; +}; + +class ReporterRegistrationTest + : public ::testing::TestWithParam {}; + +TEST_P(ReporterRegistrationTest, LoadsRegisteredReporter) { + const auto& param = GetParam(); + MetricsReporters::Register(param.register_name, + [](const std::unordered_map&) + -> Result> { + return std::make_unique(); + }); + + std::unordered_map props = { + {std::string(kMetricsReporterImpl), param.load_name}}; + auto result = MetricsReporters::Load(props); + EXPECT_EQ(result.has_value(), param.expect_success); +} + +INSTANTIATE_TEST_SUITE_P( + MetricsReporterRegistration, ReporterRegistrationTest, + ::testing::Values(ReporterRegistrationParam{.test_name = "ExactMatch", + .register_name = "custom1", + .load_name = "custom1", + .expect_success = true}, + ReporterRegistrationParam{.test_name = "ExactCaseMatch", + .register_name = "UPPER1", + .load_name = "UPPER1", + .expect_success = true}, + ReporterRegistrationParam{.test_name = "CaseMismatch", + .register_name = "UPPER2", + .load_name = "upper2", + .expect_success = false}, + ReporterRegistrationParam{.test_name = "UnregisteredType", + .register_name = "registered1", + .load_name = "nonexistent1", + .expect_success = false}), + [](const auto& info) { return info.param.test_name; }); + +struct VariantDispatchParam { + std::string test_name; + MetricsReport report; + MetricsReportType expected_type; +}; + +class VariantDispatchTest : public ::testing::TestWithParam {}; + +TEST_P(VariantDispatchTest, CorrectTypeDispatch) { + const auto& param = GetParam(); + EXPECT_EQ(GetReportType(param.report), param.expected_type); +} + +INSTANTIATE_TEST_SUITE_P( + MetricsReportVariant, VariantDispatchTest, + ::testing::Values( + VariantDispatchParam{.test_name = "ScanReportDefault", + .report = ScanReport{}, + .expected_type = MetricsReportType::kScanReport}, + VariantDispatchParam{.test_name = "CommitReportDefault", + .report = CommitReport{}, + .expected_type = MetricsReportType::kCommitReport}), + [](const auto& info) { return info.param.test_name; }); + +struct CollectorParam { + std::string test_name; + MetricsReport report; + MetricsReportType expected_type; + std::string expected_table_name; +}; + +class CollectorTest : public ::testing::TestWithParam {}; + +TEST_P(CollectorTest, CollectsAndPreservesReport) { + const auto& param = GetParam(); + CollectingMetricsReporter reporter; + reporter.Report(param.report); + + ASSERT_EQ(reporter.reports().size(), 1); + EXPECT_EQ(GetReportType(reporter.reports()[0]), param.expected_type); + + std::visit([&](const auto& r) { EXPECT_EQ(r.table_name, param.expected_table_name); }, + reporter.reports()[0]); +} + +INSTANTIATE_TEST_SUITE_P( + MetricsCollector, CollectorTest, + ::testing::Values( + CollectorParam{ + .test_name = "ScanWithFields", + .report = ScanReport{.table_name = "db.t1", + .snapshot_id = 1, + .scan_metrics = + ScanMetricsResult{ + .total_file_size_in_bytes = + CounterResult{.unit = CounterUnit::kBytes, + .value = 99999}}}, + .expected_type = MetricsReportType::kScanReport, + .expected_table_name = "db.t1"}, + CollectorParam{.test_name = "CommitWithFields", + .report = CommitReport{.table_name = "db.t2", + .snapshot_id = 2, + .operation = "append"}, + .expected_type = MetricsReportType::kCommitReport, + .expected_table_name = "db.t2"}), + [](const auto& info) { return info.param.test_name; }); + +// --------------------------------------------------------------------------- +// CompositeMetricsReporter / MetricsReporters::Combine tests +// --------------------------------------------------------------------------- + +class ThrowingMetricsReporter : public MetricsReporter { + public: + void Report([[maybe_unused]] const MetricsReport& report) override { + throw std::runtime_error("reporter failure"); + } +}; + +TEST(CombineTest, FlattenNestedComposite) { + auto a = std::make_shared(); + auto b = std::make_shared(); + auto c = std::make_shared(); + + auto ab = MetricsReporters::Combine(a, b); + auto abc = MetricsReporters::Combine(ab, c); + + // Result must be a flat composite — not a composite-of-composites. + auto* composite = dynamic_cast(abc.get()); + ASSERT_NE(composite, nullptr); + EXPECT_EQ(composite->Reporters().size(), 3u); + for (const auto& r : composite->Reporters()) { + EXPECT_EQ(dynamic_cast(r.get()), nullptr); + } + + abc->Report(CommitReport{.table_name = "db.t2"}); + EXPECT_EQ(a->reports().size(), 1u); + EXPECT_EQ(b->reports().size(), 1u); + EXPECT_EQ(c->reports().size(), 1u); +} + +TEST(CombineTest, DeduplicateByIdentity) { + auto a = std::make_shared(); + auto b = std::make_shared(); + + // ab already contains a and b; combining with b again must not add b twice. + auto ab = MetricsReporters::Combine(a, b); + auto result = MetricsReporters::Combine(ab, b); + + auto* composite = dynamic_cast(result.get()); + ASSERT_NE(composite, nullptr); + EXPECT_EQ(composite->Reporters().size(), 2u); + + result->Report(ScanReport{}); + EXPECT_EQ(a->reports().size(), 1u); + EXPECT_EQ(b->reports().size(), 1u); // delivered once, not twice +} + +TEST(CombineTest, ExceptionInOneReporterDoesNotBlockOthers) { + auto throwing = std::make_shared(); + auto collecting = std::make_shared(); + auto combined = MetricsReporters::Combine(throwing, collecting); + + EXPECT_NO_THROW(combined->Report(ScanReport{})); + EXPECT_EQ(collecting->reports().size(), 1u); +} + +TEST(MetricsReportersTest, LoadDefaultIsNoop) { + auto result = MetricsReporters::Load({}); + ASSERT_TRUE(result.has_value()); + ASSERT_NE(result.value(), nullptr); + EXPECT_NO_THROW(result.value()->Report(ScanReport{})); +} + +// Verify that Load() calls Initialize() on the created reporter. +class InitializingReporter : public MetricsReporter { + public: + static Result> Make( + [[maybe_unused]] const std::unordered_map&) { + return std::make_unique(); + } + void Initialize(const std::unordered_map& props) override { + initialized_ = true; + init_props_ = props; + } + void Report([[maybe_unused]] const MetricsReport&) override {} + bool initialized() const { return initialized_; } + const std::unordered_map& init_props() const { + return init_props_; + } + + private: + bool initialized_ = false; + std::unordered_map init_props_; +}; + +TEST(MetricsReportersTest, LoadCallsInitialize) { + MetricsReporters::Register("initializing", [](const auto& props) { + return InitializingReporter::Make(props); + }); + + std::unordered_map props = { + {std::string(kMetricsReporterImpl), "initializing"}, + {"custom-key", "custom-value"}, + }; + auto result = MetricsReporters::Load(props); + ASSERT_TRUE(result.has_value()); + + auto* reporter = dynamic_cast(result.value().get()); + ASSERT_NE(reporter, nullptr); + EXPECT_TRUE(reporter->initialized()); + EXPECT_EQ(reporter->init_props().at("custom-key"), "custom-value"); +} + +} // namespace iceberg diff --git a/src/iceberg/test/metrics_test.cc b/src/iceberg/test/metrics_test.cc new file mode 100644 index 000000000..6eecd702a --- /dev/null +++ b/src/iceberg/test/metrics_test.cc @@ -0,0 +1,701 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include + +#include +#include + +#include "iceberg/expression/expression.h" +#include "iceberg/metrics/commit_report.h" +#include "iceberg/metrics/counter.h" +#include "iceberg/metrics/json_serde.h" +#include "iceberg/metrics/metrics_context.h" +#include "iceberg/metrics/metrics_reporter.h" +#include "iceberg/metrics/scan_report.h" +#include "iceberg/metrics/timer.h" + +namespace iceberg { + +// --------------------------------------------------------------------------- +// Counter +// --------------------------------------------------------------------------- + +TEST(DefaultCounterTest, IncrementByOne) { + DefaultCounter c; + EXPECT_EQ(c.Value(), 0); + c.Increment(); + EXPECT_EQ(c.Value(), 1); +} + +TEST(DefaultCounterTest, IncrementByAmount) { + DefaultCounter c; + c.Increment(42); + EXPECT_EQ(c.Value(), 42); + c.Increment(8); + EXPECT_EQ(c.Value(), 50); +} + +TEST(DefaultCounterTest, UnitCount) { + DefaultCounter c(CounterUnit::kCount); + EXPECT_EQ(c.Unit(), CounterUnit::kCount); + EXPECT_FALSE(c.IsNoop()); +} + +TEST(DefaultCounterTest, UnitBytes) { + DefaultCounter c(CounterUnit::kBytes); + EXPECT_EQ(c.Unit(), CounterUnit::kBytes); +} + +TEST(NoopCounterTest, IsNoopAndAlwaysZero) { + Counter& noop = Counter::Noop(); + EXPECT_TRUE(noop.IsNoop()); + noop.Increment(); + noop.Increment(100); + EXPECT_EQ(noop.Value(), 0); +} + +// --------------------------------------------------------------------------- +// Timer +// --------------------------------------------------------------------------- + +TEST(DefaultTimerTest, RaiiRecordsOnce) { + DefaultTimer t; + EXPECT_EQ(t.Count(), 0); + { + auto timed = t.Start(); + } + EXPECT_EQ(t.Count(), 1); // RAII guard called Record() exactly once +} + +TEST(DefaultTimerTest, ExplicitStopRecordsOnce) { + DefaultTimer t; + auto timed = t.Start(); + timed.Stop(); + EXPECT_EQ(t.Count(), 1); + // Destructor must not double-record. +} + +TEST(DefaultTimerTest, RecordDirect) { + DefaultTimer t; + t.Record(std::chrono::nanoseconds{1000}); + t.Record(std::chrono::nanoseconds{500}); + EXPECT_EQ(t.Count(), 2); + EXPECT_EQ(t.TotalDuration(), std::chrono::nanoseconds{1500}); +} + +TEST(DefaultTimerTest, MoveDoesNotDoubleRecord) { + DefaultTimer t; + { + auto a = t.Start(); + auto b = std::move(a); // a is moved-from; destructor must not record + } // b records exactly once on destruction + EXPECT_EQ(t.Count(), 1); +} + +TEST(NoopTimerTest, IsNoopAndAlwaysZero) { + Timer& noop = Timer::Noop(); + EXPECT_TRUE(noop.IsNoop()); + { + auto timed = noop.Start(); + } + EXPECT_EQ(noop.Count(), 0); + EXPECT_EQ(noop.TotalDuration().count(), 0); +} + +TEST(DefaultTimerTest, UnitIsNanoseconds) { + DefaultTimer t; + EXPECT_EQ(t.Unit(), "nanoseconds"); +} + +TEST(NoopTimerTest, UnitIsNanoseconds) { EXPECT_EQ(Timer::Noop().Unit(), "nanoseconds"); } + +struct DurationConversionParam { + std::string name; + std::chrono::nanoseconds input; + std::chrono::nanoseconds expected; +}; + +class DefaultTimerDurationConversionTest + : public ::testing::TestWithParam {}; + +TEST_P(DefaultTimerDurationConversionTest, RecordsAndConverts) { + DefaultTimer t; + t.Record(GetParam().input); + EXPECT_EQ(t.TotalDuration(), GetParam().expected); + EXPECT_EQ(t.Count(), 1); +} + +INSTANTIATE_TEST_SUITE_P( + DurationConversion, DefaultTimerDurationConversionTest, + ::testing::Values( + DurationConversionParam{"Microseconds", + std::chrono::duration_cast( + std::chrono::microseconds{5}), + std::chrono::nanoseconds{5000}}, + DurationConversionParam{"Milliseconds", + std::chrono::duration_cast( + std::chrono::milliseconds{2}), + std::chrono::nanoseconds{2000000}}), + [](const auto& info) { return info.param.name; }); + +TEST(DefaultTimerTest, TimeVoidCallableRecordsOnce) { + DefaultTimer t; + t.Time([&] { /* intentional no-op */ }); + // Verify the count was incremented; duration is not checked because a + // no-op body may measure as 0 ns depending on clock resolution. + EXPECT_EQ(t.Count(), 1); +} + +TEST(DefaultTimerTest, TimeNonVoidCallableReturnsResult) { + DefaultTimer t; + int result = t.Time([&] { return 42; }); + EXPECT_EQ(result, 42); + EXPECT_EQ(t.Count(), 1); + // Even for Noop callable is still invoked. + int called = 0; + Timer::Noop().Time([&] { ++called; }); + EXPECT_EQ(called, 1); +} + +// --------------------------------------------------------------------------- +// MetricsContext +// --------------------------------------------------------------------------- + +TEST(DefaultMetricsContextTest, SameNameReturnsSameObject) { + DefaultMetricsContext ctx; + auto c1 = ctx.GetCounter("foo", CounterUnit::kCount); + auto c2 = ctx.GetCounter("foo", CounterUnit::kCount); + EXPECT_EQ(c1.get(), c2.get()); + + auto t1 = ctx.GetTimer("dur"); + auto t2 = ctx.GetTimer("dur"); + EXPECT_EQ(t1.get(), t2.get()); + + auto c1 = ctx.GetCounter("a", CounterUnit::kCount); + auto c2 = ctx.GetCounter("b", CounterUnit::kCount); + EXPECT_NE(c1.get(), c2.get()); +} + +TEST(NullMetricsContextTest, ReturnsNoopInstances) { + MetricsContext& null_ctx = MetricsContext::Null(); + EXPECT_TRUE(null_ctx.GetCounter("x", CounterUnit::kCount)->IsNoop()); + EXPECT_TRUE(null_ctx.GetTimer("y")->IsNoop()); +} + +TEST(NullMetricsContextTest, ReturnsSameSharedPtrEachCall) { + // Verify the static-shared_ptr fix: no new control block per call. + MetricsContext& null_ctx = MetricsContext::Null(); + auto c1 = null_ctx.GetCounter("a", CounterUnit::kCount); + auto c2 = null_ctx.GetCounter("b", CounterUnit::kCount); + EXPECT_EQ(c1.get(), c2.get()); // same noop singleton + auto t1 = null_ctx.GetTimer("x"); + auto t2 = null_ctx.GetTimer("y"); + EXPECT_EQ(t1.get(), t2.get()); +} + +TEST(DefaultMetricsContextTest, OneArgGetCounterDefaultsToCount) { + DefaultMetricsContext ctx; + auto c = ctx.GetCounter("hits"); + EXPECT_NE(c, nullptr); + EXPECT_EQ(c->Unit(), CounterUnit::kCount); + // Calling again with the same name returns the same object. + EXPECT_EQ(ctx.GetCounter("hits").get(), c.get()); +} + +// --------------------------------------------------------------------------- +// ScanMetrics +// --------------------------------------------------------------------------- + +TEST(ScanMetricsTest, OfContextPopulatesResult) { + DefaultMetricsContext ctx; + auto m = ScanMetrics::Of(ctx); + m.result_data_files->Increment(5); + m.total_file_size_in_bytes->Increment(1024); + m.total_planning_duration->Record(std::chrono::nanoseconds{500}); + + auto r = m.ToResult(); + EXPECT_EQ(r.result_data_files.value, 5); + EXPECT_EQ(r.result_data_files.unit, CounterUnit::kCount); + EXPECT_EQ(r.total_file_size_in_bytes.value, 1024); + EXPECT_EQ(r.total_file_size_in_bytes.unit, CounterUnit::kBytes); + EXPECT_EQ(r.total_planning_duration.count, 1); + EXPECT_EQ(r.total_planning_duration.total_duration, std::chrono::nanoseconds{500}); +} + +TEST(ScanMetricsTest, ToResultForwardsTimerUnit) { + DefaultMetricsContext ctx; + auto m = ScanMetrics::Of(ctx); + m.total_planning_duration->Record(std::chrono::nanoseconds{100}); + auto r = m.ToResult(); + EXPECT_EQ(r.total_planning_duration.unit, "nanoseconds"); +} + +// --------------------------------------------------------------------------- +// CommitMetrics +// --------------------------------------------------------------------------- + +TEST(CommitMetricsTest, NoopPopulatesZero) { + auto m = CommitMetrics::Noop(); + CommitMetricsResult result; + m.PopulateResult(result); + EXPECT_EQ(result.total_duration.count, 0); + EXPECT_EQ(result.total_duration.total_duration.count(), 0); + EXPECT_EQ(result.attempts.value, 0); +} + +TEST(CommitMetricsTest, TimerAndAttemptsPopulated) { + DefaultMetricsContext ctx; + auto m = CommitMetrics::Of(ctx); + m.total_duration->Record(std::chrono::nanoseconds{2000}); + m.attempts->Increment(3); + + CommitMetricsResult result; + m.PopulateResult(result); + EXPECT_EQ(result.total_duration.count, 1); + EXPECT_EQ(result.total_duration.total_duration, std::chrono::nanoseconds{2000}); + EXPECT_EQ(result.attempts.value, 3); + EXPECT_EQ(result.attempts.unit, CounterUnit::kCount); +} + +// --------------------------------------------------------------------------- +// JSON serde — CounterResult / TimerResult +// --------------------------------------------------------------------------- + +// --------------------------------------------------------------------------- +// CounterResult serde — parameterized round-trip +// --------------------------------------------------------------------------- + +class CounterResultRoundTripTest : public ::testing::TestWithParam {}; + +TEST_P(CounterResultRoundTripTest, RoundTrip) { + const CounterResult original = GetParam(); + auto json = ToJson(original); + auto result = CounterResultFromJson(json); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), original); +} + +INSTANTIATE_TEST_SUITE_P( + CounterResultSerde, CounterResultRoundTripTest, + ::testing::Values(CounterResult{.unit = CounterUnit::kBytes, .value = 1024}, + CounterResult{.unit = CounterUnit::kCount, .value = 42}), + [](const auto& info) { + return info.param.unit == CounterUnit::kBytes ? "BytesUnit" : "CountUnit"; + }); + +TEST(TimerResultSerdeTest, RoundTrip) { + TimerResult original{.count = 3, .total_duration = std::chrono::nanoseconds{9876}}; + auto json = ToJson(original); + auto result = TimerResultFromJson(json); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value().count, 3); + EXPECT_EQ(result.value().total_duration, std::chrono::nanoseconds{9876}); +} + +// --------------------------------------------------------------------------- +// JSON serde — ScanReport / CommitReport +// --------------------------------------------------------------------------- + +TEST(ScanReportSerdeTest, RoundTrip) { + ScanReport report; + report.table_name = "cat.db.t"; + report.snapshot_id = 42; + report.schema_id = 1; + report.scan_metrics.result_data_files = CounterResult{.value = 7}; + report.scan_metrics.total_file_size_in_bytes = + CounterResult{.unit = CounterUnit::kBytes, .value = 8192}; + report.scan_metrics.total_planning_duration = + TimerResult{.count = 1, .total_duration = std::chrono::nanoseconds{100000}}; + report.projected_field_ids = {1, 2}; + report.projected_field_names = {"id", "name"}; + + auto json_result = ToJson(report); + ASSERT_TRUE(json_result.has_value()); + auto result = ScanReportFromJson(json_result.value()); + ASSERT_TRUE(result.has_value()); + const auto& r = result.value(); + EXPECT_EQ(r.table_name, "cat.db.t"); + EXPECT_EQ(r.snapshot_id, 42); + EXPECT_EQ(r.scan_metrics.result_data_files.value, 7); + EXPECT_EQ(r.scan_metrics.result_data_files.unit, CounterUnit::kCount); + EXPECT_EQ(r.scan_metrics.total_file_size_in_bytes.value, 8192); + EXPECT_EQ(r.scan_metrics.total_file_size_in_bytes.unit, CounterUnit::kBytes); + EXPECT_EQ(r.scan_metrics.total_planning_duration.count, 1); + EXPECT_EQ(r.scan_metrics.total_planning_duration.total_duration, + std::chrono::nanoseconds{100000}); + EXPECT_EQ(r.projected_field_ids, (std::vector{1, 2})); +} + +TEST(ScanReportSerdeTest, RoundTripWithAlwaysTrueFilter) { + ScanReport report; + report.table_name = "db.t"; + report.snapshot_id = 1; + report.filter = True::Instance(); + + auto json_result = ToJson(report); + ASSERT_TRUE(json_result.has_value()); + auto result = ScanReportFromJson(json_result.value()); + ASSERT_TRUE(result.has_value()); + ASSERT_NE(result.value().filter, nullptr); + EXPECT_EQ(result.value().filter->op(), Expression::Operation::kTrue); +} + +TEST(CommitReportSerdeTest, RoundTrip) { + CommitReport report; + report.table_name = "cat.db.t"; + report.snapshot_id = 99; + report.sequence_number = 5; + report.operation = "append"; + report.commit_metrics.total_duration = + TimerResult{.count = 1, .total_duration = std::chrono::nanoseconds{200000}}; + report.commit_metrics.attempts = CounterResult{.value = 1}; + report.commit_metrics.added_data_files = CounterResult{.value = 3}; + report.commit_metrics.added_records = CounterResult{.value = 1000}; + + auto json = ToJson(report); + auto result = CommitReportFromJson(json); + ASSERT_TRUE(result.has_value()); + const auto& r = result.value(); + EXPECT_EQ(r.table_name, "cat.db.t"); + EXPECT_EQ(r.snapshot_id, 99); + EXPECT_EQ(r.sequence_number, 5); + EXPECT_EQ(r.operation, "append"); + EXPECT_EQ(r.commit_metrics.total_duration.count, 1); + EXPECT_EQ(r.commit_metrics.total_duration.total_duration, + std::chrono::nanoseconds{200000}); + EXPECT_EQ(r.commit_metrics.added_data_files.value, 3); + EXPECT_EQ(r.commit_metrics.added_records.value, 1000); +} + +// --------------------------------------------------------------------------- +// ScanMetricsResult::From +// --------------------------------------------------------------------------- + +TEST(ScanMetricsResultTest, FromDelegatesToToResult) { + DefaultMetricsContext ctx; + auto m = ScanMetrics::Of(ctx); + m.result_data_files->Increment(7); + m.total_planning_duration->Record(std::chrono::nanoseconds{12345}); + + auto via_from = ScanMetricsResult::From(m); + auto via_to_result = m.ToResult(); + + EXPECT_EQ(via_from.result_data_files, via_to_result.result_data_files); + EXPECT_EQ(via_from.total_planning_duration.count, + via_to_result.total_planning_duration.count); + EXPECT_EQ(via_from.total_planning_duration.total_duration, + via_to_result.total_planning_duration.total_duration); +} + +// --------------------------------------------------------------------------- +// CommitMetricsResult::From +// --------------------------------------------------------------------------- + +TEST(CommitMetricsResultTest, FromWithEmptySummaryYieldsZeroFileCounts) { + DefaultMetricsContext ctx; + auto live = CommitMetrics::Of(ctx); + live.total_duration->Record(std::chrono::nanoseconds{5000}); + live.attempts->Increment(); + + auto result = CommitMetricsResult::From(live, {}); + + EXPECT_EQ(result.total_duration.count, 1); + EXPECT_EQ(result.total_duration.total_duration, std::chrono::nanoseconds{5000}); + EXPECT_EQ(result.attempts.value, 1); + EXPECT_EQ(result.attempts.unit, CounterUnit::kCount); + // All snapshot-summary fields must be zero when the summary is empty. + EXPECT_EQ(result.added_data_files.value, 0); + EXPECT_EQ(result.added_data_files.unit, CounterUnit::kCount); + EXPECT_EQ(result.removed_data_files.value, 0); + EXPECT_EQ(result.total_data_files.value, 0); + EXPECT_EQ(result.added_records.value, 0); + EXPECT_EQ(result.total_records.value, 0); + EXPECT_EQ(result.kept_manifest_count.value, 0); + EXPECT_EQ(result.created_manifest_count.value, 0); +} + +TEST(CommitMetricsResultTest, FromParsesSnapshotSummary) { + DefaultMetricsContext ctx; + auto live = CommitMetrics::Of(ctx); + live.total_duration->Record(std::chrono::nanoseconds{8000}); + live.attempts->Increment(2); + + std::unordered_map summary = { + {"added-data-files", "3"}, {"deleted-data-files", "1"}, + {"total-data-files", "10"}, {"added-records", "1000"}, + {"deleted-records", "200"}, {"total-records", "5000"}, + {"added-files-size", "4096"}, {"removed-files-size", "1024"}, + {"total-files-size", "20480"}, {"manifests-created", "2"}, + {"manifests-kept", "5"}, {"manifests-replaced", "1"}, + {"entries-processed", "8"}, + }; + + auto result = CommitMetricsResult::From(live, summary); + + // Live metrics. + EXPECT_EQ(result.total_duration.count, 1); + EXPECT_EQ(result.total_duration.total_duration, std::chrono::nanoseconds{8000}); + EXPECT_EQ(result.attempts.value, 2); + EXPECT_EQ(result.attempts.unit, CounterUnit::kCount); + + // Snapshot-summary fields — verify both value and unit. + EXPECT_EQ(result.added_data_files.value, 3); + EXPECT_EQ(result.added_data_files.unit, CounterUnit::kCount); + EXPECT_EQ(result.removed_data_files.value, 1); + EXPECT_EQ(result.total_data_files.value, 10); + EXPECT_EQ(result.added_records.value, 1000); + EXPECT_EQ(result.removed_records.value, 200); + EXPECT_EQ(result.total_records.value, 5000); + EXPECT_EQ(result.added_files_size_bytes.value, 4096); + EXPECT_EQ(result.added_files_size_bytes.unit, CounterUnit::kBytes); + EXPECT_EQ(result.removed_files_size_bytes.value, 1024); + EXPECT_EQ(result.removed_files_size_bytes.unit, CounterUnit::kBytes); + EXPECT_EQ(result.total_files_size_bytes.value, 20480); + EXPECT_EQ(result.total_files_size_bytes.unit, CounterUnit::kBytes); + EXPECT_EQ(result.created_manifest_count.value, 2); + EXPECT_EQ(result.kept_manifest_count.value, 5); + EXPECT_EQ(result.replaced_manifest_count.value, 1); + EXPECT_EQ(result.processed_manifest_entries_count.value, 8); +} + +TEST(CommitMetricsResultTest, FromHandlesMissingAndUnparseableKeys) { + // Missing key → 0, unparseable value → 0. + std::unordered_map summary = { + {"added-data-files", "not-a-number"}, + // "deleted-data-files" intentionally absent + }; + auto result = CommitMetricsResult::From(CommitMetrics::Noop(), summary); + EXPECT_EQ(result.added_data_files.value, 0); // unparseable + EXPECT_EQ(result.removed_data_files.value, 0); // absent +} + +// --------------------------------------------------------------------------- +// Metrics JSON serde — CounterResult (additional cases) +// --------------------------------------------------------------------------- + +TEST(CounterResultSerdeTest, MissingUnitDefaultsToCount) { + nlohmann::json json; + json["value"] = 7; + // No "unit" key — should default to kCount. + auto result = CounterResultFromJson(json); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value().unit, CounterUnit::kCount); + EXPECT_EQ(result.value().value, 7); +} + +TEST(CounterResultSerdeTest, MissingValueReturnsError) { + nlohmann::json json; + json["unit"] = "count"; + // Missing "value" key — must return an error. + auto result = CounterResultFromJson(json); + EXPECT_FALSE(result.has_value()); +} + +TEST(TimerResultSerdeTest, JsonKeysAreCorrect) { + TimerResult t{.count = 2, .total_duration = std::chrono::nanoseconds{5000}}; + auto json = ToJson(t); + EXPECT_EQ(json["unit"], "nanoseconds"); + EXPECT_EQ(json["count"], 2); + EXPECT_EQ(json["total-duration"], 5000); +} + +TEST(TimerResultSerdeTest, UnitFieldPreservedThroughRoundTrip) { + TimerResult original{ + .unit = "nanoseconds", .count = 1, .total_duration = std::chrono::nanoseconds{999}}; + auto json = ToJson(original); + EXPECT_EQ(json["unit"], "nanoseconds"); // field, not hardcode + auto result = TimerResultFromJson(json); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value().unit, "nanoseconds"); + EXPECT_EQ(result.value().count, 1); + EXPECT_EQ(result.value().total_duration, std::chrono::nanoseconds{999}); +} + +// --------------------------------------------------------------------------- +// Metrics JSON serde — ScanMetricsResult +// --------------------------------------------------------------------------- + +TEST(ScanMetricsResultSerdeTest, AllFieldsRoundTrip) { + ScanMetricsResult m; + m.total_planning_duration = + TimerResult{.count = 2, .total_duration = std::chrono::nanoseconds{50000}}; + m.result_data_files = CounterResult{.unit = CounterUnit::kCount, .value = 10}; + m.result_delete_files = CounterResult{.unit = CounterUnit::kCount, .value = 2}; + m.scanned_data_manifests = CounterResult{.unit = CounterUnit::kCount, .value = 5}; + m.scanned_delete_manifests = CounterResult{.unit = CounterUnit::kCount, .value = 1}; + m.total_data_manifests = CounterResult{.unit = CounterUnit::kCount, .value = 8}; + m.total_delete_manifests = CounterResult{.unit = CounterUnit::kCount, .value = 3}; + m.total_file_size_in_bytes = + CounterResult{.unit = CounterUnit::kBytes, .value = 131072}; + m.total_delete_file_size_in_bytes = + CounterResult{.unit = CounterUnit::kBytes, .value = 4096}; + m.skipped_data_manifests = CounterResult{.unit = CounterUnit::kCount, .value = 3}; + m.skipped_delete_manifests = CounterResult{.unit = CounterUnit::kCount, .value = 2}; + m.skipped_data_files = CounterResult{.unit = CounterUnit::kCount, .value = 7}; + m.skipped_delete_files = CounterResult{.unit = CounterUnit::kCount, .value = 1}; + m.indexed_delete_files = CounterResult{.unit = CounterUnit::kCount, .value = 4}; + m.equality_delete_files = CounterResult{.unit = CounterUnit::kCount, .value = 2}; + m.positional_delete_files = CounterResult{.unit = CounterUnit::kCount, .value = 1}; + m.dvs = CounterResult{.unit = CounterUnit::kCount, .value = 3}; + + auto json = ToJson(m); + auto result = ScanMetricsResultFromJson(json); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), m); +} + +TEST(ScanMetricsResultSerdeTest, MissingFieldsDefaultToZeroCounterResult) { + // JSON with only one field set; all others must default to CounterResult{}. + nlohmann::json json = nlohmann::json::object(); + json["result-data-files"] = nlohmann::json{{"unit", "count"}, {"value", 5}}; + + auto result = ScanMetricsResultFromJson(json); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value().result_data_files.value, 5); + EXPECT_EQ(result.value().result_delete_files, CounterResult{}); + EXPECT_EQ(result.value().total_file_size_in_bytes, CounterResult{}); +} + +TEST(ScanMetricsResultSerdeTest, JsonKeyNamesAreKebabCase) { + ScanMetricsResult m; + m.result_data_files = CounterResult{.value = 1}; + m.total_file_size_in_bytes = CounterResult{.unit = CounterUnit::kBytes, .value = 1}; + m.total_planning_duration = + TimerResult{.count = 1, .total_duration = std::chrono::nanoseconds{1}}; + + auto json = ToJson(m); + EXPECT_TRUE(json.contains("result-data-files")); + EXPECT_TRUE(json.contains("total-file-size-in-bytes")); + EXPECT_TRUE(json.contains("total-planning-duration")); + // Spot-check that no camelCase or snake_case keys leaked in. + EXPECT_FALSE(json.contains("resultDataFiles")); + EXPECT_FALSE(json.contains("result_data_files")); +} + +// --------------------------------------------------------------------------- +// Metrics JSON serde — CommitMetricsResult +// --------------------------------------------------------------------------- + +TEST(CommitMetricsResultSerdeTest, EmptyResultProducesEmptyJsonObject) { + CommitMetricsResult empty{}; + auto json = ToJson(empty); + EXPECT_TRUE(json.is_object()); + EXPECT_TRUE(json.empty()) << "all-zero CommitMetricsResult must produce empty JSON"; +} + +TEST(CommitMetricsResultSerdeTest, AllFieldsRoundTrip) { + CommitMetricsResult m; + m.total_duration = + TimerResult{.count = 1, .total_duration = std::chrono::nanoseconds{1000}}; + m.attempts = CounterResult{.unit = CounterUnit::kCount, .value = 2}; + m.added_data_files = CounterResult{.unit = CounterUnit::kCount, .value = 3}; + m.removed_data_files = CounterResult{.unit = CounterUnit::kCount, .value = 1}; + m.total_data_files = CounterResult{.unit = CounterUnit::kCount, .value = 10}; + m.added_delete_files = CounterResult{.unit = CounterUnit::kCount, .value = 2}; + m.added_equality_delete_files = CounterResult{.unit = CounterUnit::kCount, .value = 1}; + m.added_positional_delete_files = + CounterResult{.unit = CounterUnit::kCount, .value = 1}; + m.added_dvs = CounterResult{.unit = CounterUnit::kCount, .value = 4}; + m.removed_positional_delete_files = + CounterResult{.unit = CounterUnit::kCount, .value = 0}; + m.removed_dvs = CounterResult{.unit = CounterUnit::kCount, .value = 1}; + m.removed_equality_delete_files = + CounterResult{.unit = CounterUnit::kCount, .value = 1}; + m.removed_delete_files = CounterResult{.unit = CounterUnit::kCount, .value = 2}; + m.total_delete_files = CounterResult{.unit = CounterUnit::kCount, .value = 5}; + m.added_records = CounterResult{.unit = CounterUnit::kCount, .value = 500}; + m.removed_records = CounterResult{.unit = CounterUnit::kCount, .value = 100}; + m.total_records = CounterResult{.unit = CounterUnit::kCount, .value = 2000}; + m.added_files_size_bytes = CounterResult{.unit = CounterUnit::kBytes, .value = 8192}; + m.removed_files_size_bytes = CounterResult{.unit = CounterUnit::kBytes, .value = 1024}; + m.total_files_size_bytes = CounterResult{.unit = CounterUnit::kBytes, .value = 65536}; + m.added_positional_deletes = CounterResult{.unit = CounterUnit::kCount, .value = 20}; + m.removed_positional_deletes = CounterResult{.unit = CounterUnit::kCount, .value = 5}; + m.total_positional_deletes = CounterResult{.unit = CounterUnit::kCount, .value = 50}; + m.added_equality_deletes = CounterResult{.unit = CounterUnit::kCount, .value = 10}; + m.removed_equality_deletes = CounterResult{.unit = CounterUnit::kCount, .value = 3}; + m.total_equality_deletes = CounterResult{.unit = CounterUnit::kCount, .value = 30}; + m.kept_manifest_count = CounterResult{.unit = CounterUnit::kCount, .value = 4}; + m.created_manifest_count = CounterResult{.unit = CounterUnit::kCount, .value = 2}; + m.replaced_manifest_count = CounterResult{.unit = CounterUnit::kCount, .value = 1}; + m.processed_manifest_entries_count = + CounterResult{.unit = CounterUnit::kCount, .value = 12}; + + auto json = ToJson(m); + auto result = CommitMetricsResultFromJson(json); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), m); +} + +TEST(CommitMetricsResultSerdeTest, ZeroValueFieldsOmittedFromJson) { + CommitMetricsResult m; + m.added_data_files = CounterResult{.value = 5}; + // All other fields remain zero. + auto json = ToJson(m); + EXPECT_TRUE(json.contains("added-data-files")); + EXPECT_FALSE(json.contains("removed-data-files")); + EXPECT_FALSE(json.contains("total-duration")); + EXPECT_FALSE(json.contains("attempts")); +} + +TEST(CommitMetricsResultSerdeTest, MissingFieldsDefaultToZeroCounterResult) { + nlohmann::json json = nlohmann::json::object(); + json["added-data-files"] = nlohmann::json{{"unit", "count"}, {"value", 9}}; + + auto result = CommitMetricsResultFromJson(json); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value().added_data_files.value, 9); + EXPECT_EQ(result.value().removed_data_files, CounterResult{}); + EXPECT_EQ(result.value().total_duration, TimerResult{}); +} + +// --------------------------------------------------------------------------- +// Metrics JSON serde — CommitReport (additional cases) +// --------------------------------------------------------------------------- + +TEST(CommitReportSerdeTest, ZeroMetricsOmittedFromJson) { + CommitReport report; + report.table_name = "db.t"; + report.snapshot_id = 1; + report.sequence_number = 1; + auto json = ToJson(report); + EXPECT_TRUE(json.contains("commit-metrics")); + EXPECT_TRUE(json["commit-metrics"].empty()); +} + +TEST(CommitReportSerdeTest, RequiredFieldsMissingReturnsError) { + // Missing "table-name" — FromJson must return an error. + nlohmann::json json; + json["snapshot-id"] = 1; + json["sequence-number"] = 1; + EXPECT_FALSE(CommitReportFromJson(json).has_value()); +} + +TEST(ScanReportSerdeTest, RequiredFieldsMissingReturnsError) { + // Missing "snapshot-id" — FromJson must return an error. + nlohmann::json json; + json["table-name"] = "t"; + EXPECT_FALSE(ScanReportFromJson(json).has_value()); +} + +} // namespace iceberg From 9c89310ec3382ddbf611834f50b8d59eda2de4cf Mon Sep 17 00:00:00 2001 From: Innocent Date: Thu, 21 May 2026 11:32:20 -0700 Subject: [PATCH 2/3] cleanup --- src/iceberg/metrics/commit_report.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/iceberg/metrics/commit_report.h b/src/iceberg/metrics/commit_report.h index 77a313615..6fcd47902 100644 --- a/src/iceberg/metrics/commit_report.h +++ b/src/iceberg/metrics/commit_report.h @@ -26,7 +26,7 @@ #include "iceberg/constants.h" #include "iceberg/iceberg_export.h" #include "iceberg/metrics/metrics_context.h" -#include "iceberg/metrics/metrics_types.h" // CounterResult, TimerResult, DurationNs +#include "iceberg/metrics/metrics_types.h" #include "iceberg/metrics/timer.h" namespace iceberg { From b87f96a521d6ddd3f4170bbaccddc2dab27c6bf4 Mon Sep 17 00:00:00 2001 From: Innocent Date: Thu, 21 May 2026 13:00:32 -0700 Subject: [PATCH 3/3] cleanup --- src/iceberg/test/metrics_test.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/iceberg/test/metrics_test.cc b/src/iceberg/test/metrics_test.cc index 6eecd702a..16c875b99 100644 --- a/src/iceberg/test/metrics_test.cc +++ b/src/iceberg/test/metrics_test.cc @@ -190,7 +190,10 @@ TEST(DefaultMetricsContextTest, SameNameReturnsSameObject) { auto t1 = ctx.GetTimer("dur"); auto t2 = ctx.GetTimer("dur"); EXPECT_EQ(t1.get(), t2.get()); +} +TEST(DefaultMetricsContextTest, DifferentNamesReturnDifferentObjects) { + DefaultMetricsContext ctx; auto c1 = ctx.GetCounter("a", CounterUnit::kCount); auto c2 = ctx.GetCounter("b", CounterUnit::kCount); EXPECT_NE(c1.get(), c2.get());