Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 73 additions & 8 deletions src/iceberg/manifest/manifest_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

#include "iceberg/manifest/manifest_group.h"

#include <algorithm>
#include <string>
#include <unordered_set>
#include <utility>

#include "iceberg/expression/binder.h"
#include "iceberg/expression/evaluator.h"
#include "iceberg/expression/expression.h"
#include "iceberg/expression/manifest_evaluator.h"
Expand All @@ -29,6 +33,7 @@
#include "iceberg/file_io.h"
#include "iceberg/manifest/manifest_reader.h"
#include "iceberg/partition_spec.h"
#include "iceberg/row/manifest_wrapper.h"
#include "iceberg/schema.h"
#include "iceberg/table_scan.h"
#include "iceberg/util/checked_cast.h"
Expand Down Expand Up @@ -265,10 +270,45 @@ Result<std::unique_ptr<ManifestReader>> ManifestGroup::MakeReader(
ICEBERG_ASSIGN_OR_RAISE(auto reader,
ManifestReader::Make(manifest, io_, schema_, specs_by_id_));

auto columns = columns_;
if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue &&
!columns.empty() &&
std::ranges::find(columns, Schema::kAllColumns) == columns.end()) {
auto spec_iter = specs_by_id_.find(manifest.partition_spec_id);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As noted in my other comment, partition_spec_id varies among manifest files, so let's remove support for it for now.

ICEBERG_CHECK(spec_iter != specs_by_id_.cend(),
"Cannot find partition spec for ID {}", manifest.partition_spec_id);

ICEBERG_ASSIGN_OR_RAISE(auto partition_type,
spec_iter->second->PartitionType(*schema_));
auto data_file_schema = DataFile::Type(std::move(partition_type))->ToSchema();
ICEBERG_ASSIGN_OR_RAISE(
auto bound_file_filter,
Binder::Bind(*data_file_schema, file_filter_, case_sensitive_));
ICEBERG_ASSIGN_OR_RAISE(auto referenced_field_ids,
ReferenceVisitor::GetReferencedFieldIds(bound_file_filter));

std::unordered_set<std::string> selected_columns(columns.cbegin(), columns.cend());
for (const auto field_id : referenced_field_ids) {
ICEBERG_ASSIGN_OR_RAISE(auto column_name,
data_file_schema->FindColumnNameById(field_id));
if (column_name.has_value()) {
std::string column_name_str(column_name.value());
if (column_name_str.starts_with(DataFile::kPartitionField + ".")) {
column_name_str = DataFile::kPartitionField;
}
if (selected_columns.contains(column_name_str)) {
continue;
}
columns.push_back(std::move(column_name_str));
selected_columns.insert(columns.back());
}
}
}

reader->FilterRows(data_filter_)
.FilterPartitions(partition_filter_)
.CaseSensitive(case_sensitive_)
.Select(columns_);
.Select(std::move(columns));

return reader;
}
Expand Down Expand Up @@ -299,11 +339,31 @@ ManifestGroup::ReadEntries() {
return eval_cache[spec_id].get();
};

std::unique_ptr<Evaluator> data_file_evaluator;
if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue) {
// TODO(gangwu): create an Evaluator on the DataFile schema with empty
// partition type
}
const bool has_file_filter =
file_filter_ && file_filter_->op() != Expression::Operation::kTrue;
std::unordered_map<int32_t, std::unique_ptr<Evaluator>> data_file_eval_cache;
auto get_data_file_evaluator = [&](int32_t spec_id) -> Result<Evaluator*> {
if (!has_file_filter) {
return nullptr;
}
if (data_file_eval_cache.contains(spec_id)) {
return data_file_eval_cache[spec_id].get();
}

auto spec_iter = specs_by_id_.find(spec_id);
ICEBERG_CHECK(spec_iter != specs_by_id_.cend(),
"Cannot find partition spec for ID {}", spec_id);

ICEBERG_ASSIGN_OR_RAISE(auto partition_type,
spec_iter->second->PartitionType(*schema_));
auto data_file_schema = DataFile::Type(std::move(partition_type))->ToSchema();
ICEBERG_ASSIGN_OR_RAISE(
auto data_file_evaluator,
Evaluator::Make(*data_file_schema, file_filter_, case_sensitive_));
data_file_eval_cache[spec_id] = std::move(data_file_evaluator);

return data_file_eval_cache[spec_id].get();
};

std::unordered_map<int32_t, std::vector<ManifestEntry>> result;

Expand Down Expand Up @@ -336,15 +396,20 @@ ManifestGroup::ReadEntries() {
ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest));
ICEBERG_ASSIGN_OR_RAISE(auto entries,
ignore_deleted_ ? reader->LiveEntries() : reader->Entries());
ICEBERG_ASSIGN_OR_RAISE(auto data_file_evaluator, get_data_file_evaluator(spec_id));

for (auto& entry : entries) {
if (ignore_existing_ && entry.status == ManifestStatus::kExisting) {
continue;
}

if (data_file_evaluator != nullptr) {
// TODO(gangwu): implement data_file_evaluator to evaluate StructLike on
// top of entry.data_file
DataFileStructLike data_file(*entry.data_file);
ICEBERG_ASSIGN_OR_RAISE(bool should_match,
data_file_evaluator->Evaluate(data_file));
if (!should_match) {
continue;
}
}

if (!manifest_entry_predicate_(entry)) {
Expand Down
146 changes: 146 additions & 0 deletions src/iceberg/row/manifest_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,55 @@

#include "iceberg/row/manifest_wrapper.h"

#include <iterator>
#include <map>
#include <memory>
#include <type_traits>
#include <vector>

#include "iceberg/manifest/manifest_reader_internal.h"
#include "iceberg/util/macros.h"

namespace iceberg {

namespace {

/// \brief Zero-based positions of the DataFile fields served by
/// DataFileStructLike::GetField().
///
/// Enumerators are numbered implicitly starting from 0, so declaration order
/// is significant. kNextUnusedId is a sentinel one past the last real field;
/// DataFileStructLike::num_fields() reports its value as the field count.
enum class DataFileFieldPosition : size_t {
  kContent = 0,
  kFilePath,            // 1
  kFileFormat,          // 2
  kPartition,           // 3
  kRecordCount,         // 4
  kFileSize,            // 5
  kColumnSizes,         // 6
  kValueCounts,         // 7
  kNullValueCounts,     // 8
  kNanValueCounts,      // 9
  kLowerBounds,         // 10
  kUpperBounds,         // 11
  kKeyMetadata,         // 12
  kSplitOffsets,        // 13
  kEqualityIds,         // 14
  kSortOrderId,         // 15
  kFirstRowId,          // 16
  kReferencedDataFile,  // 17
  kContentOffset,       // 18
  kContentSize,         // 19
  kNextUnusedId,        // 20 (sentinel, not a field)
};

// Guards against an accidental insertion or reordering above.
static_assert(static_cast<size_t>(DataFileFieldPosition::kNextUnusedId) == 20,
              "DataFileFieldPosition enumerators are out of sync");

/// \brief Returns a non-owning character view over the contents of `value`.
///
/// Only std::string and std::vector<uint8_t> are accepted. The view spans
/// value.size() elements starting at value.data() and does not copy them, so
/// it is only valid while `value` is alive and unmodified.
template <typename T>
  requires std::is_same_v<T, std::vector<uint8_t>> || std::is_same_v<T, std::string>
std::string_view ToView(const T& value) {
  const auto* chars = reinterpret_cast<const char*>(value.data());  // NOLINT
  return std::string_view(chars, value.size());
}

/// \brief Converts a 32-bit integer into a Scalar.
Scalar ToScalar(const int32_t number) {
  return number;
}

/// \brief Converts a 64-bit integer into a Scalar.
Scalar ToScalar(const int64_t number) {
  return number;
}

/// \brief Converts a byte buffer into a Scalar through ToView().
///
/// The bytes are not copied, so the result is only usable while `bytes` is
/// alive.
Scalar ToScalar(const std::vector<uint8_t>& bytes) {
  return ToView(bytes);
}

template <typename T>
Result<Scalar> FromOptional(const std::optional<T>& value) {
if (value.has_value()) {
Expand All @@ -39,6 +76,56 @@ Result<Scalar> FromOptional(const std::optional<T>& value) {
return std::monostate{};
}

/// \brief Converts an optional string into a Scalar.
///
/// An empty optional becomes std::monostate. Otherwise the result is the
/// non-owning view produced by ToView(), which is only valid while the
/// string held by `value` is alive.
Result<Scalar> FromOptionalString(const std::optional<std::string>& value) {
  if (!value) {
    return std::monostate{};
  }
  return ToView(*value);
}

template <typename T>
class VectorArrayLike : public ArrayLike {
public:
explicit VectorArrayLike(const std::vector<T>& values) : values_(values) {}

Result<Scalar> GetElement(size_t pos) const override {
if (pos >= size()) {
return InvalidArgument("Invalid array index: {}", pos);
}
return ToScalar(values_.get()[pos]);
}

size_t size() const override { return values_.get().size(); }

private:
std::reference_wrapper<const std::vector<T>> values_;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better to use std::span<const T> here?

};

/// \brief MapLike view over a std::map keyed by int32_t.
///
/// Entries are addressed by their position in the map's iteration order.
/// The map is referenced, not copied, so it must stay alive while this view
/// is in use.
template <typename V>
class IntMapLike : public MapLike {
 public:
  explicit IntMapLike(const std::map<int32_t, V>& values) : values_(values) {}

  /// \brief Key of the entry at `pos`, or InvalidArgument when out of range.
  Result<Scalar> GetKey(size_t pos) const override {
    if (pos < size()) {
      return EntryAt(pos)->first;
    }
    return InvalidArgument("Invalid map index: {}", pos);
  }

  /// \brief Value of the entry at `pos` converted with ToScalar(), or
  /// InvalidArgument when out of range.
  Result<Scalar> GetValue(size_t pos) const override {
    if (pos < size()) {
      return ToScalar(EntryAt(pos)->second);
    }
    return InvalidArgument("Invalid map index: {}", pos);
  }

  /// \brief Number of entries in the referenced map.
  size_t size() const override { return values_.get().size(); }

 private:
  using ConstIterator = typename std::map<int32_t, V>::const_iterator;

  // Walks `pos` steps from the first entry; callers bounds-check `pos` first.
  ConstIterator EntryAt(size_t pos) const {
    return std::next(values_.get().cbegin(), pos);
  }

  std::reference_wrapper<const std::map<int32_t, V>> values_;
};

} // namespace

Result<Scalar> PartitionFieldSummaryStructLike::GetField(size_t pos) const {
Expand Down Expand Up @@ -134,4 +221,63 @@ std::unique_ptr<StructLike> FromManifestFile(const ManifestFile& file) {
return std::make_unique<ManifestFileStructLike>(file);
}

Result<Scalar> DataFileStructLike::GetField(size_t pos) const {
if (pos >= num_fields()) {
return InvalidArgument("Invalid data file field index: {}", pos);
}

const auto& data_file = data_file_.get();
switch (static_cast<DataFileFieldPosition>(pos)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we expose the partition_spec_id field as an optional field? FYI, Java puts it at position 3 (zero-based).

case DataFileFieldPosition::kContent:
return static_cast<int32_t>(data_file.content);
case DataFileFieldPosition::kFilePath:
return ToView(data_file.file_path);
case DataFileFieldPosition::kFileFormat:
return ToString(data_file.file_format);
case DataFileFieldPosition::kPartition: {
partition_ = std::make_shared<PartitionValues>(data_file.partition);
return partition_;
}
case DataFileFieldPosition::kRecordCount:
return data_file.record_count;
case DataFileFieldPosition::kFileSize:
return data_file.file_size_in_bytes;
case DataFileFieldPosition::kColumnSizes:
return std::make_shared<IntMapLike<int64_t>>(data_file.column_sizes);
case DataFileFieldPosition::kValueCounts:
return std::make_shared<IntMapLike<int64_t>>(data_file.value_counts);
case DataFileFieldPosition::kNullValueCounts:
return std::make_shared<IntMapLike<int64_t>>(data_file.null_value_counts);
case DataFileFieldPosition::kNanValueCounts:
return std::make_shared<IntMapLike<int64_t>>(data_file.nan_value_counts);
case DataFileFieldPosition::kLowerBounds:
return std::make_shared<IntMapLike<std::vector<uint8_t>>>(data_file.lower_bounds);
case DataFileFieldPosition::kUpperBounds:
return std::make_shared<IntMapLike<std::vector<uint8_t>>>(data_file.upper_bounds);
case DataFileFieldPosition::kKeyMetadata:
return ToView(data_file.key_metadata);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to return std::monostate if any std container is empty? The main point is that we do not use std::optional to represent a missing value for these fields. Besides, fields of DataFileStructLike might be missing when ManifestGroup has specified a projection.

case DataFileFieldPosition::kSplitOffsets:
return std::make_shared<VectorArrayLike<int64_t>>(data_file.split_offsets);
case DataFileFieldPosition::kEqualityIds:
return std::make_shared<VectorArrayLike<int32_t>>(data_file.equality_ids);
case DataFileFieldPosition::kSortOrderId:
return FromOptional(data_file.sort_order_id);
case DataFileFieldPosition::kFirstRowId:
return FromOptional(data_file.first_row_id);
case DataFileFieldPosition::kReferencedDataFile:
return FromOptionalString(data_file.referenced_data_file);
case DataFileFieldPosition::kContentOffset:
return FromOptional(data_file.content_offset);
case DataFileFieldPosition::kContentSize:
return FromOptional(data_file.content_size_in_bytes);
case DataFileFieldPosition::kNextUnusedId:
return InvalidArgument("Invalid data file field index: {}", pos);
}
return InvalidArgument("Invalid data file field index: {}", pos);
}

// Field count is the sentinel enumerator that follows the last real field
// position.
size_t DataFileStructLike::num_fields() const {
  constexpr auto kFieldCount =
      static_cast<size_t>(DataFileFieldPosition::kNextUnusedId);
  return kFieldCount;
}

} // namespace iceberg
21 changes: 21 additions & 0 deletions src/iceberg/row/manifest_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <functional>

#include "iceberg/iceberg_export.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/row/struct_like.h"

Expand Down Expand Up @@ -97,4 +98,24 @@ class ICEBERG_EXPORT ManifestFileStructLike : public StructLike {
mutable std::shared_ptr<PartitionFieldSummaryArrayLike> summaries_;
};

/// \brief Positional StructLike view over the metadata of a DataFile.
///
/// The wrapper stores a reference to the DataFile rather than a copy, so the
/// DataFile must outlive the wrapper, or be swapped out through Reset().
class ICEBERG_EXPORT DataFileStructLike : public StructLike {
 public:
  explicit DataFileStructLike(const DataFile& file) : data_file_(file) {}

  // Copying is disabled.
  DataFileStructLike(const DataFileStructLike&) = delete;
  DataFileStructLike& operator=(const DataFileStructLike&) = delete;

  ~DataFileStructLike() override = default;

  /// \brief Returns the field at zero-based position `pos`.
  Result<Scalar> GetField(size_t pos) const override;

  /// \brief Returns the number of positions accepted by GetField().
  size_t num_fields() const override;

  /// \brief Re-points this wrapper at `file`, allowing one wrapper to be
  /// reused across several DataFile instances.
  void Reset(const DataFile& file) { data_file_ = std::cref(file); }

 private:
  // The wrapped DataFile (not owned).
  std::reference_wrapper<const DataFile> data_file_;
  // Holds the partition struct most recently handed out by GetField().
  mutable std::shared_ptr<StructLike> partition_;
};

} // namespace iceberg
Loading
Loading