Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/iceberg/manifest/manifest_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ struct ICEBERG_EXPORT DataFile {
SchemaField::MakeRequired(kFileFormatFieldId, "file_format", string(),
"File format name: avro, orc, or parquet");

static constexpr int32_t kSpecIdFieldId = 141;
inline static const SchemaField kSpecId =
SchemaField::MakeOptional(kSpecIdFieldId, "spec_id", int32(), "Partition spec ID");

static constexpr int32_t kPartitionFieldId = 102;
inline static const std::string kPartitionField = "partition";
inline static const std::string kPartitionDoc =
Expand Down
99 changes: 92 additions & 7 deletions src/iceberg/manifest/manifest_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@

#include "iceberg/manifest/manifest_group.h"

#include <algorithm>
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "iceberg/expression/binder.h"
#include "iceberg/expression/evaluator.h"
#include "iceberg/expression/expression.h"
#include "iceberg/expression/manifest_evaluator.h"
Expand All @@ -29,14 +35,47 @@
#include "iceberg/file_io.h"
#include "iceberg/manifest/manifest_reader.h"
#include "iceberg/partition_spec.h"
#include "iceberg/row/manifest_wrapper.h"
#include "iceberg/schema.h"
#include "iceberg/table_scan.h"
#include "iceberg/type.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/content_file_util.h"
#include "iceberg/util/macros.h"

namespace iceberg {

namespace {

std::shared_ptr<Schema> DataFileFilterSchema() {
auto empty_partition_type = std::make_shared<StructType>(std::vector<SchemaField>{});
return std::make_shared<Schema>(std::vector<SchemaField>{
DataFile::kContent,
DataFile::kFilePath,
DataFile::kFileFormat,
DataFile::kSpecId,
SchemaField::MakeRequired(DataFile::kPartitionFieldId, DataFile::kPartitionField,
std::move(empty_partition_type), DataFile::kPartitionDoc),
DataFile::kRecordCount,
DataFile::kFileSize,
DataFile::kColumnSizes,
DataFile::kValueCounts,
DataFile::kNullValueCounts,
DataFile::kNanValueCounts,
DataFile::kLowerBounds,
DataFile::kUpperBounds,
DataFile::kKeyMetadata,
DataFile::kSplitOffsets,
DataFile::kEqualityIds,
DataFile::kSortOrderId,
DataFile::kFirstRowId,
DataFile::kReferencedDataFile,
DataFile::kContentOffset,
DataFile::kContentSize});
}

} // namespace

Result<std::unique_ptr<ManifestGroup>> ManifestGroup::Make(
std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
Expand Down Expand Up @@ -265,10 +304,39 @@ Result<std::unique_ptr<ManifestReader>> ManifestGroup::MakeReader(
ICEBERG_ASSIGN_OR_RAISE(auto reader,
ManifestReader::Make(manifest, io_, schema_, specs_by_id_));

auto columns = columns_;
if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue &&
!columns.empty() &&
std::ranges::find(columns, Schema::kAllColumns) == columns.end()) {
auto data_file_schema = DataFileFilterSchema();
ICEBERG_ASSIGN_OR_RAISE(
auto bound_file_filter,
Binder::Bind(*data_file_schema, file_filter_, case_sensitive_));
ICEBERG_ASSIGN_OR_RAISE(auto referenced_field_ids,
ReferenceVisitor::GetReferencedFieldIds(bound_file_filter));

std::unordered_set<std::string> selected_columns(columns.cbegin(), columns.cend());
for (const auto field_id : referenced_field_ids) {
if (field_id == DataFile::kSpecIdFieldId) {
continue;
}
ICEBERG_ASSIGN_OR_RAISE(auto column_name,
data_file_schema->FindColumnNameById(field_id));
if (column_name.has_value()) {
std::string column_name_str(column_name.value());
if (selected_columns.contains(column_name_str)) {
continue;
}
columns.push_back(std::move(column_name_str));
selected_columns.insert(columns.back());
}
}
}

reader->FilterRows(data_filter_)
.FilterPartitions(partition_filter_)
.CaseSensitive(case_sensitive_)
.Select(columns_);
.Select(std::move(columns));

return reader;
}
Expand Down Expand Up @@ -299,11 +367,23 @@ ManifestGroup::ReadEntries() {
return eval_cache[spec_id].get();
};

const bool has_file_filter =
file_filter_ && file_filter_->op() != Expression::Operation::kTrue;
std::unique_ptr<Evaluator> data_file_evaluator;
if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue) {
// TODO(gangwu): create an Evaluator on the DataFile schema with empty
// partition type
}
auto get_data_file_evaluator = [&]() -> Result<Evaluator*> {
if (!has_file_filter) {
return nullptr;
}
if (data_file_evaluator != nullptr) {
return data_file_evaluator.get();
}

ICEBERG_ASSIGN_OR_RAISE(
data_file_evaluator,
Evaluator::Make(*DataFileFilterSchema(), file_filter_, case_sensitive_));

return data_file_evaluator.get();
};

std::unordered_map<int32_t, std::vector<ManifestEntry>> result;

Expand Down Expand Up @@ -336,15 +416,20 @@ ManifestGroup::ReadEntries() {
ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest));
ICEBERG_ASSIGN_OR_RAISE(auto entries,
ignore_deleted_ ? reader->LiveEntries() : reader->Entries());
ICEBERG_ASSIGN_OR_RAISE(auto data_file_evaluator, get_data_file_evaluator());

for (auto& entry : entries) {
if (ignore_existing_ && entry.status == ManifestStatus::kExisting) {
continue;
}

if (data_file_evaluator != nullptr) {
// TODO(gangwu): implement data_file_evaluator to evaluate StructLike on
// top of entry.data_file
DataFileStructLike data_file(*entry.data_file);
ICEBERG_ASSIGN_OR_RAISE(bool should_match,
data_file_evaluator->Evaluate(data_file));
if (!should_match) {
continue;
}
}

if (!manifest_entry_predicate_(entry)) {
Expand Down
173 changes: 173 additions & 0 deletions src/iceberg/row/manifest_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,57 @@

#include "iceberg/row/manifest_wrapper.h"

#include <iterator>
#include <map>
#include <memory>
#include <span>
#include <type_traits>
#include <vector>

#include "iceberg/manifest/manifest_reader_internal.h"
#include "iceberg/util/macros.h"

namespace iceberg {

namespace {

// Positions of the fields exposed by DataFileStructLike::GetField.
//
// Enumerators are numbered consecutively starting at 0. kNextUnusedId is a
// sentinel one past the last real field; num_fields() returns its value.
// NOTE(review): the order appears to mirror the field order of the data-file
// filter schema built in manifest_group.cc -- confirm before reordering.
enum class DataFileFieldPosition : size_t {
  kContent = 0,
  kFilePath,            // 1
  kFileFormat,          // 2
  kSpecId,              // 3
  kPartition,           // 4
  kRecordCount,         // 5
  kFileSize,            // 6
  kColumnSizes,         // 7
  kValueCounts,         // 8
  kNullValueCounts,     // 9
  kNanValueCounts,      // 10
  kLowerBounds,         // 11
  kUpperBounds,         // 12
  kKeyMetadata,         // 13
  kSplitOffsets,        // 14
  kEqualityIds,         // 15
  kSortOrderId,         // 16
  kFirstRowId,          // 17
  kReferencedDataFile,  // 18
  kContentOffset,       // 19
  kContentSize,         // 20
  kNextUnusedId,        // 21
};

// Returns a non-owning character view over the contents of `value`.
//
// Only std::string and std::vector<uint8_t> are accepted. The view refers to
// the storage of `value` and does not copy it.
template <typename T>
  requires std::is_same_v<T, std::vector<uint8_t>> || std::is_same_v<T, std::string>
std::string_view ToView(const T& value) {
  const auto* chars = reinterpret_cast<const char*>(value.data());  // NOLINT
  return std::string_view{chars, value.size()};
}

// Converts a 32-bit integer to a Scalar.
Scalar ToScalar(int32_t value) {
  return value;
}

// Converts a 64-bit integer to a Scalar.
Scalar ToScalar(int64_t value) {
  return value;
}

// Converts a byte buffer to a Scalar built from a view of its bytes (see
// ToView); the bytes themselves are not copied here.
Scalar ToScalar(const std::vector<uint8_t>& value) {
  const std::string_view bytes = ToView(value);
  return bytes;
}

template <typename T>
Result<Scalar> FromOptional(const std::optional<T>& value) {
if (value.has_value()) {
Expand All @@ -39,6 +78,79 @@ Result<Scalar> FromOptional(const std::optional<T>& value) {
return std::monostate{};
}

// Converts an optional string to a Scalar result.
//
// Yields std::monostate when `value` is empty; otherwise yields a view over
// the contained string (via ToView).
Result<Scalar> FromOptionalString(const std::optional<std::string>& value) {
  if (!value) {
    return std::monostate{};
  }
  return ToView(*value);
}

// ArrayLike adapter over a sequence of T referenced through a std::span.
//
// Only the span is stored; elements are converted with ToScalar on access.
template <typename T>
class VectorArrayLike : public ArrayLike {
 public:
  explicit VectorArrayLike(std::span<const T> values) : values_(values) {}

  // Number of elements in the referenced sequence.
  size_t size() const override { return values_.size(); }

  // Returns the element at `pos`, or InvalidArgument when `pos` is out of range.
  Result<Scalar> GetElement(size_t pos) const override {
    if (pos < size()) {
      return ToScalar(values_[pos]);
    }
    return InvalidArgument("Invalid array index: {}", pos);
  }

 private:
  std::span<const T> values_;
};

// MapLike adapter over a std::map keyed by int32_t, held by reference.
//
// Entries are addressed by their position in the map's iteration order.
template <typename V>
class IntMapLike : public MapLike {
 public:
  explicit IntMapLike(const std::map<int32_t, V>& values) : values_(values) {}

  // Number of entries in the referenced map.
  size_t size() const override { return values_.get().size(); }

  // Returns the key of the entry at `pos`, or InvalidArgument when out of range.
  Result<Scalar> GetKey(size_t pos) const override {
    if (pos < size()) {
      return EntryAt(pos)->first;
    }
    return InvalidArgument("Invalid map index: {}", pos);
  }

  // Returns the value of the entry at `pos` (converted with ToScalar), or
  // InvalidArgument when out of range.
  Result<Scalar> GetValue(size_t pos) const override {
    if (pos < size()) {
      return ToScalar(EntryAt(pos)->second);
    }
    return InvalidArgument("Invalid map index: {}", pos);
  }

 private:
  using ConstIterator = typename std::map<int32_t, V>::const_iterator;

  // Advances from the first entry by `pos` steps; callers check the range first.
  ConstIterator EntryAt(size_t pos) const {
    return std::next(values_.get().cbegin(), pos);
  }

  std::reference_wrapper<const std::map<int32_t, V>> values_;
};

// Converts an int32_t-keyed map to a Scalar result.
//
// An empty map yields std::monostate; a non-empty map is wrapped in an
// IntMapLike that refers to `values` rather than copying it.
template <typename V>
Result<Scalar> FromOptionalMap(const std::map<int32_t, V>& values) {
  if (!values.empty()) {
    return std::make_shared<IntMapLike<V>>(values);
  }
  return std::monostate{};
}

// Converts a vector to a Scalar result.
//
// An empty vector yields std::monostate; a non-empty vector is wrapped in a
// VectorArrayLike that views `values` rather than copying it.
template <typename T>
Result<Scalar> FromOptionalVector(const std::vector<T>& values) {
  if (!values.empty()) {
    return std::make_shared<VectorArrayLike<T>>(values);
  }
  return std::monostate{};
}

// Converts a byte buffer to a Scalar result.
//
// An empty buffer yields std::monostate; otherwise the result is a view over
// the buffer's bytes (via ToView).
Result<Scalar> FromOptionalBytes(const std::vector<uint8_t>& value) {
  if (!value.empty()) {
    return ToView(value);
  }
  return std::monostate{};
}

} // namespace

Result<Scalar> PartitionFieldSummaryStructLike::GetField(size_t pos) const {
Expand Down Expand Up @@ -134,4 +246,65 @@ std::unique_ptr<StructLike> FromManifestFile(const ManifestFile& file) {
return std::make_unique<ManifestFileStructLike>(file);
}

// Returns the value of the wrapped data file's field at position `pos`.
//
// `pos` is interpreted as a DataFileFieldPosition. Positions at or beyond
// num_fields() produce an InvalidArgument error. Fields routed through the
// FromOptional* helpers come back as std::monostate when the helper treats
// them as absent (unset optional, or empty map / vector / byte buffer).
Result<Scalar> DataFileStructLike::GetField(size_t pos) const {
// Reject out-of-range positions before casting to the enum.
if (pos >= num_fields()) {
return InvalidArgument("Invalid data file field index: {}", pos);
}

const auto& data_file = data_file_.get();
switch (static_cast<DataFileFieldPosition>(pos)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we expose partition_spec_id field as an optional field? FYI Java puts it at pos 3 (zero-based).

case DataFileFieldPosition::kContent:
// The content value is exposed as a 32-bit integer.
return static_cast<int32_t>(data_file.content);
case DataFileFieldPosition::kFilePath:
return ToView(data_file.file_path);
case DataFileFieldPosition::kFileFormat:
return ToString(data_file.file_format);
case DataFileFieldPosition::kSpecId:
return FromOptional(data_file.partition_spec_id);
case DataFileFieldPosition::kPartition: {
// A new PartitionValues is created on every call and kept in partition_.
partition_ = std::make_shared<PartitionValues>(data_file.partition);
return partition_;
}
case DataFileFieldPosition::kRecordCount:
return data_file.record_count;
case DataFileFieldPosition::kFileSize:
return data_file.file_size_in_bytes;
// The map-valued statistics below yield std::monostate when the map is empty.
case DataFileFieldPosition::kColumnSizes:
return FromOptionalMap(data_file.column_sizes);
case DataFileFieldPosition::kValueCounts:
return FromOptionalMap(data_file.value_counts);
case DataFileFieldPosition::kNullValueCounts:
return FromOptionalMap(data_file.null_value_counts);
case DataFileFieldPosition::kNanValueCounts:
return FromOptionalMap(data_file.nan_value_counts);
case DataFileFieldPosition::kLowerBounds:
return FromOptionalMap(data_file.lower_bounds);
case DataFileFieldPosition::kUpperBounds:
return FromOptionalMap(data_file.upper_bounds);
case DataFileFieldPosition::kKeyMetadata:
// Empty key metadata yields std::monostate.
return FromOptionalBytes(data_file.key_metadata);
// The vector-valued fields below yield std::monostate when the vector is empty.
case DataFileFieldPosition::kSplitOffsets:
return FromOptionalVector(data_file.split_offsets);
case DataFileFieldPosition::kEqualityIds:
return FromOptionalVector(data_file.equality_ids);
case DataFileFieldPosition::kSortOrderId:
return FromOptional(data_file.sort_order_id);
case DataFileFieldPosition::kFirstRowId:
return FromOptional(data_file.first_row_id);
case DataFileFieldPosition::kReferencedDataFile:
return FromOptionalString(data_file.referenced_data_file);
case DataFileFieldPosition::kContentOffset:
return FromOptional(data_file.content_offset);
case DataFileFieldPosition::kContentSize:
return FromOptional(data_file.content_size_in_bytes);
case DataFileFieldPosition::kNextUnusedId:
// Sentinel value, not a real field; the range check above already rejects it.
return InvalidArgument("Invalid data file field index: {}", pos);
}
// Fallback for any value that matched no case label.
return InvalidArgument("Invalid data file field index: {}", pos);
}

// Number of fields exposed by GetField: the value of the kNextUnusedId
// sentinel, which sits one past the last real field position.
size_t DataFileStructLike::num_fields() const {
  constexpr auto kFieldCount = static_cast<size_t>(DataFileFieldPosition::kNextUnusedId);
  return kFieldCount;
}

} // namespace iceberg
Loading
Loading