diff --git a/src/v/iceberg/BUILD b/src/v/iceberg/BUILD index 30d6eadf0763d..ff10157d35249 100644 --- a/src/v/iceberg/BUILD +++ b/src/v/iceberg/BUILD @@ -444,6 +444,32 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "manifest_utils", + srcs = [ + "manifest_utils.cc", + ], + hdrs = [ + "manifest_utils.h", + ], + implementation_deps = [ + ":compatibility", + ":logger", + ":manifest", + ":partition_key_type", + ":values_bytes", + ], + deps = [ + ":action", + ":manifest_entry", + ":manifest_io", + ":manifest_list", + ":schema", + ":table_metadata", + "//src/v/base", + ], +) + redpanda_cc_library( name = "merge_append_action", srcs = [ @@ -453,9 +479,7 @@ redpanda_cc_library( "merge_append_action.h", ], implementation_deps = [ - ":compatibility", ":logger", - ":manifest", ":manifest_file_packer", ":snapshot", ":table_requirement", @@ -466,9 +490,37 @@ redpanda_cc_library( ":manifest_entry", ":manifest_io", ":manifest_list", + ":manifest_utils", + ":schema", + ":table_metadata", + "//src/v/base", + ], +) + +redpanda_cc_library( + name = "row_delta_action", + srcs = [ + "row_delta_action.cc", + ], + hdrs = [ + "row_delta_action.h", + ], + implementation_deps = [ + ":logger", + ":manifest_file_packer", + ":snapshot", + ":table_requirement", + "//src/v/random:generators", + ], + deps = [ + ":action", + ":manifest_entry", + ":manifest_io", + ":manifest_list", + ":manifest_utils", + ":merge_append_action", ":schema", ":table_metadata", - ":values_bytes", "//src/v/base", ], ) @@ -554,6 +606,30 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "equality_delete_file", + srcs = [ + "equality_delete_file.cc", + ], + hdrs = [ + "equality_delete_file.h", + ], + implementation_deps = [ + "//src/v/bytes:iostream", + "//src/v/serde/parquet:writer", + ], + visibility = ["//visibility:public"], + deps = [ + ":datatypes", + ":manifest_entry", + "//src/v/bytes:iobuf", + "//src/v/container:chunked_vector", + "//src/v/serde/parquet:schema", + 
"//src/v/serde/parquet:value", + "@seastar", + ], +) + redpanda_cc_library( name = "partition_key_type", srcs = [ @@ -967,6 +1043,7 @@ redpanda_cc_library( ":action", ":manifest_io", ":merge_append_action", + ":row_delta_action", ":schema", ":table_metadata", ":table_requirement", diff --git a/src/v/iceberg/equality_delete_file.cc b/src/v/iceberg/equality_delete_file.cc new file mode 100644 index 0000000000000..140f07da9d2ed --- /dev/null +++ b/src/v/iceberg/equality_delete_file.cc @@ -0,0 +1,50 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "iceberg/equality_delete_file.h" + +#include "bytes/iostream.h" +#include "serde/parquet/writer.h" + +namespace iceberg { + +ss::future write_equality_delete_file( + equality_delete_options opts, + chunked_vector rows) { + iobuf file; + serde::parquet::writer w( + {.schema = std::move(opts.parquet_schema), .compress = opts.compress}, + make_iobuf_ref_output_stream(file)); + co_await w.init(); + + auto record_count = rows.size(); + for (auto& row : rows) { + co_await w.write_row(std::move(row)); + } + + co_await w.close(); + + data_file df{ + .content_type = data_file_content_type::equality_deletes, + .file_path = uri{}, + .file_format = data_file_format::parquet, + .partition = partition_key{}, + .record_count = record_count, + .file_size_bytes = file.size_bytes(), + .equality_ids = std::move(opts.equality_field_ids), + }; + + co_return delete_file_result{ + .file_data = std::move(file), + .manifest_entry = std::move(df), + }; +} + +} // namespace iceberg diff --git a/src/v/iceberg/equality_delete_file.h b/src/v/iceberg/equality_delete_file.h new file mode 100644 index 0000000000000..02af4ddf3760f --- /dev/null +++ 
b/src/v/iceberg/equality_delete_file.h @@ -0,0 +1,44 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include "bytes/iobuf.h" +#include "container/chunked_vector.h" +#include "iceberg/datatypes.h" +#include "iceberg/manifest_entry.h" +#include "serde/parquet/schema.h" +#include "serde/parquet/value.h" + +#include + +namespace iceberg { + +struct delete_file_result { + iobuf file_data; + data_file manifest_entry; +}; + +struct equality_delete_options { + serde::parquet::schema_element parquet_schema; + chunked_vector equality_field_ids; + bool compress = false; +}; + +/// \brief Write an equality delete file in parquet format from key column +/// values. The schema must match the equality columns from the table schema. +/// +/// The resulting data_file has content_type=equality_deletes and equality_ids +/// populated from the provided field IDs; file_path and partition are left +/// default-constructed. +ss::future write_equality_delete_file( + equality_delete_options opts, + chunked_vector rows); + +} // namespace iceberg diff --git a/src/v/iceberg/manifest_utils.cc b/src/v/iceberg/manifest_utils.cc new file mode 100644 index 0000000000000..a4e230a1bb28f --- /dev/null +++ b/src/v/iceberg/manifest_utils.cc @@ -0,0 +1,354 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#include "iceberg/manifest_utils.h" + +#include "base/vlog.h" +#include "iceberg/compatibility.h" +#include "iceberg/logger.h" +#include "iceberg/manifest.h" +#include "iceberg/partition_key_type.h" +#include "iceberg/values_bytes.h" + +namespace iceberg { + +namespace { + +uri get_metadata_location(const table_metadata& table) { + static constexpr std::string_view write_metadata_path_prop + = "write.metadata.path"; + + if (table.properties.has_value()) { + auto it = table.properties->find(write_metadata_path_prop); + if (it != table.properties->end()) { + return uri(it->second); + } + } + + return uri(fmt::format("{}/metadata", table.location)); +} + +uri get_manifest_path( + const table_metadata& table, const uuid_t& commit_uuid, size_t num) { + auto metadata_location = get_metadata_location(table); + return uri( + fmt::format("{}/{}-m{}.avro", metadata_location, commit_uuid, num)); +} + +action::errc to_action_errc(metadata_io::errc e) { + switch (e) { + case metadata_io::errc::failed: + return action::errc::io_failed; + case metadata_io::errc::shutting_down: + return action::errc::shutting_down; + case metadata_io::errc::invalid_uri: + return action::errc::unexpected_state; + case metadata_io::errc::timedout: + return action::errc::io_failed; + } +} + +manifest_file_content to_file_content(manifest_content_type t) { + switch (t) { + case manifest_content_type::data: + return manifest_file_content::data; + case manifest_content_type::deletes: + return manifest_file_content::deletes; + } +} + +void update_partition_summaries( + const data_file& f, chunked_vector& summaries) { + const auto& pk_val_fields = f.partition.val->fields; + for (size_t i = 0; i < summaries.size(); ++i) { + const auto& file_val_field = pk_val_fields[i]; + if (!file_val_field.has_value()) { + summaries[i].contains_null = true; + continue; + } + const auto& 
file_prim_val = std::get( + file_val_field.value()); + if (!summaries[i].lower_bound.has_value()) { + summaries[i].lower_bound = make_copy(file_prim_val); + } else { + auto& lb = summaries[i].lower_bound.value(); + if (file_prim_val < lb) { + lb = make_copy(file_prim_val); + } + } + if (!summaries[i].upper_bound.has_value()) { + summaries[i].upper_bound = make_copy(file_prim_val); + } else { + auto& ub = summaries[i].upper_bound.value(); + if (ub < file_prim_val) { + ub = make_copy(file_prim_val); + } + } + } +} + +chunked_vector release_with_bytes(field_summary_val::list_t l) { + chunked_vector ret; + ret.reserve(l.size()); + for (auto& v : l) { + ret.emplace_back(std::move(v).release_with_bytes()); + } + return ret; +} + +void promote_partition_key_type( + partition_key& pk, const partition_key_type& pk_type) { + vassert( + pk.val->fields.size() == pk_type.type.fields.size(), + "unexpected partition key size: {} (expected: {})", + pk.val->fields.size(), + pk_type.type.fields.size()); + for (size_t i = 0; i < pk.val->fields.size(); ++i) { + auto& field = pk.val->fields[i]; + if (field) { + const auto& type = std::get( + pk_type.type.fields[i]->type); + field = promote_primitive_value_type( + std::move(std::get(*field)), type); + } + } +} + +} // namespace + +field_summary_val::list_t +field_summary_val::empty_summaries(size_t num_fields) { + field_summary_val::list_t ret; + ret.reserve(num_fields); + for (size_t i = 0; i < num_fields; ++i) { + ret.emplace_back(field_summary_val{}); + } + return ret; +} + +field_summary field_summary_val::release_with_bytes() && { + std::optional lb; + std::optional ub; + if (lower_bound.has_value()) { + lb = value_to_bytes(value{std::move(lower_bound).value()}); + } + if (upper_bound.has_value()) { + ub = value_to_bytes(value{std::move(upper_bound).value()}); + } + return field_summary{ + .contains_null = contains_null, + .contains_nan = contains_nan, + .lower_bound = std::move(lb), + .upper_bound = std::move(ub), + }; +} + 
+ss::future> upload_as_manifest( + manifest_io& io, + const uri& path, + const schema& schema, + const partition_spec& pspec, + manifest_content_type content_type, + chunked_vector entries) { + vlog( + log.info, + "Uploading manifest with {} entries to {}", + entries.size(), + path); + manifest m{ + .metadata = manifest_metadata{ + .schema = schema.copy(), + .partition_spec = pspec.copy(), + .format_version = format_version::v2, + .manifest_content_type = content_type, + }, + .entries = std::move(entries), + }; + co_return co_await io.upload_manifest(path, m); +} + +ss::future> merge_mfiles( + manifest_io& io, + const table_metadata& table, + manifest_num_gen gen_manifest_num, + manifest_content_type content_type, + chunked_vector to_merge, + chunked_vector added_entries, + std::optional max_schema_id_in_added, + const partition_spec& pspec, + const table_snapshot_ctx& ctx) { + vlogl( + log, + to_merge.empty() ? ss::log_level::debug : ss::log_level::info, + "Merging {} manifest files and {} added manifest entries", + to_merge.size(), + added_entries.size()); + + const size_t added_files = added_entries.size(); + size_t added_rows = 0; + for (const auto& e : added_entries) { + added_rows += e.data_file.record_count; + } + + auto merged_entries = std::move(added_entries); + auto max_schema_id = max_schema_id_in_added.value_or(schema::id_t::min()); + size_t existing_rows = 0; + size_t existing_files = 0; + auto min_seq_num = ctx.seq_num; + for (const auto& mfile : to_merge) { + auto mfile_res = co_await io.download_manifest(mfile.manifest_path); + if (mfile_res.has_error()) { + co_return to_action_errc(mfile_res.error()); + } + auto m = std::move(mfile_res).value(); + max_schema_id = std::max(max_schema_id, m.metadata.schema.schema_id); + existing_files += m.entries.size(); + for (auto& e : m.entries) { + auto f_num_fields = e.data_file.partition.val->fields.size(); + if (f_num_fields != pspec.fields.size()) { + vlog( + log.error, + "Partition key for data file {} has 
{} fields, expected {}", + e.data_file.file_path, + f_num_fields, + pspec.fields.size()); + co_return action::errc::unexpected_state; + } + + existing_rows += e.data_file.record_count; + if (e.status == manifest_entry_status::added) { + e.status = manifest_entry_status::existing; + e.sequence_number = e.sequence_number.value_or( + mfile.seq_number); + e.file_sequence_number = e.file_sequence_number.value_or( + file_sequence_number{mfile.seq_number()}); + } + if (e.sequence_number.has_value()) { + min_seq_num = std::min(min_seq_num, e.sequence_number.value()); + } + } + std::move( + m.entries.begin(), + m.entries.end(), + std::back_inserter(merged_entries)); + } + + auto schema_id = pspec.spec_id == table.default_spec_id + ? table.current_schema_id + : max_schema_id; + const auto* resolved_schema = table.get_schema(schema_id); + if (!resolved_schema) { + vlog(log.error, "Table schema {} is missing from metadata", schema_id); + co_return action::errc::unexpected_state; + } + + auto pk_type = partition_key_type::create(pspec, *resolved_schema); + auto partition_summaries = field_summary_val::empty_summaries( + pspec.fields.size()); + for (auto& e : merged_entries) { + try { + promote_partition_key_type(e.data_file.partition, pk_type); + update_partition_summaries(e.data_file, partition_summaries); + } catch (const std::exception& ex) { + vlog( + log.error, + "bad partition key for file {}: {}", + e.data_file.file_path, + ex); + co_return action::errc::unexpected_state; + } + } + + const auto merged_manifest_path = get_manifest_path( + table, ctx.commit_uuid, gen_manifest_num()); + const auto mfile_up_res = co_await upload_as_manifest( + io, + merged_manifest_path, + *resolved_schema, + pspec, + content_type, + std::move(merged_entries)); + if (mfile_up_res.has_error()) { + co_return to_action_errc(mfile_up_res.error()); + } + manifest_file merged_file{ + .manifest_path = merged_manifest_path, + .manifest_length = mfile_up_res.value(), + .partition_spec_id = 
pspec.spec_id, + .content = to_file_content(content_type), + .seq_number = ctx.seq_num, + .min_seq_number = min_seq_num, + .added_snapshot_id = ctx.snap_id, + .added_files_count = added_files, + .existing_files_count = existing_files, + .deleted_files_count = 0, + .added_rows_count = added_rows, + .existing_rows_count = existing_rows, + .deleted_rows_count = 0, + .partitions = release_with_bytes(std::move(partition_summaries)), + }; + co_return merged_file; +} + +ss::future, action::errc>> +maybe_merge_mfiles_and_new_entries( + manifest_io& io, + const table_metadata& table, + manifest_num_gen gen_manifest_num, + manifest_content_type content_type, + size_t min_to_merge, + chunked_vector to_merge, + chunked_vector new_entries, + std::optional max_added_schema_id, + const partition_spec& pspec, + const table_snapshot_ctx& ctx) { + vlog( + log.info, + "Considering {} existing manifest files and {} entries to merge", + to_merge.size(), + new_entries.size()); + chunked_vector ret; + if (to_merge.size() < min_to_merge) { + if (!new_entries.empty()) { + auto new_mfile_res = co_await merge_mfiles( + io, + table, + std::move(gen_manifest_num), + content_type, + {}, + std::move(new_entries), + max_added_schema_id, + pspec, + ctx); + if (new_mfile_res.has_error()) { + co_return new_mfile_res.error(); + } + ret.emplace_back(std::move(new_mfile_res.value())); + } + std::move(to_merge.begin(), to_merge.end(), std::back_inserter(ret)); + co_return ret; + } + auto merged_mfile_res = co_await merge_mfiles( + io, + table, + std::move(gen_manifest_num), + content_type, + std::move(to_merge), + std::move(new_entries), + max_added_schema_id, + pspec, + ctx); + if (merged_mfile_res.has_error()) { + co_return merged_mfile_res.error(); + } + ret.emplace_back(std::move(merged_mfile_res.value())); + co_return ret; +} + +} // namespace iceberg diff --git a/src/v/iceberg/manifest_utils.h b/src/v/iceberg/manifest_utils.h new file mode 100644 index 0000000000000..c731caa524190 --- /dev/null 
+++ b/src/v/iceberg/manifest_utils.h @@ -0,0 +1,97 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include "base/outcome.h" +#include "iceberg/action.h" +#include "iceberg/manifest.h" +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_io.h" +#include "iceberg/manifest_list.h" +#include "iceberg/schema.h" +#include "iceberg/table_metadata.h" + +namespace iceberg { + +/// Container for metadata required to build manifest_file::partitions (the +/// field summaries for each partition key field). +/// +/// Unlike the field_summary in manifest_file, which stores bytes per bound, +/// this is value-comparable by maintaining the bounds as values instead of +/// serialized bytes. Note that only values that are the same primitive_value +/// variant are directly comparable. +struct field_summary_val { + using list_t = chunked_vector; + static list_t empty_summaries(size_t num_fields); + field_summary release_with_bytes() &&; + + bool contains_null{false}; + std::optional contains_nan; + std::optional lower_bound; + std::optional upper_bound; +}; + +/// Context of fields resolved from table metadata for the new snapshot created +/// by an action. Holds commit_uuid by reference; it must outlive this context. +struct table_snapshot_ctx { + const uuid_t& commit_uuid; + const snapshot_id snap_id; + const sequence_number seq_num; +}; + +/// Callback type for generating unique manifest numbers within an action. +using manifest_num_gen = ss::noncopyable_function; + +/// Uploads the given manifest entries as a new manifest file. +/// +/// Returns the size of the resulting uploaded file. 
+ss::future> upload_as_manifest( + manifest_io& io, + const uri& path, + const schema& schema, + const partition_spec& pspec, + manifest_content_type content_type, + chunked_vector entries); + +/// Merges existing manifest files with new entries, computing file/row +/// counts, building partition summaries, and uploading the result. +/// +/// \p content_type controls both the manifest metadata content type and +/// the manifest_file_content tag on the resulting manifest_file. +ss::future> merge_mfiles( + manifest_io& io, + const table_metadata& table, + manifest_num_gen gen_manifest_num, + manifest_content_type content_type, + chunked_vector to_merge, + chunked_vector added_entries, + std::optional max_added_schema_id, + const partition_spec& pspec, + const table_snapshot_ctx& ctx); + +/// Decides whether to merge manifests or just add a new one, based on +/// the \p min_to_merge threshold. +/// +/// Returns the resulting list of manifest files: size 1 when merging; else the +/// originals, preceded by one new manifest if \p new_entries is non-empty. 
+ss::future, action::errc>> +maybe_merge_mfiles_and_new_entries( + manifest_io& io, + const table_metadata& table, + manifest_num_gen gen_manifest_num, + manifest_content_type content_type, + size_t min_to_merge, + chunked_vector to_merge, + chunked_vector new_entries, + std::optional max_added_schema_id, + const partition_spec& pspec, + const table_snapshot_ctx& ctx); + +} // namespace iceberg diff --git a/src/v/iceberg/merge_append_action.cc b/src/v/iceberg/merge_append_action.cc index 342451dbb1879..065eeb75ad5d7 100644 --- a/src/v/iceberg/merge_append_action.cc +++ b/src/v/iceberg/merge_append_action.cc @@ -11,26 +11,20 @@ #include "base/units.h" #include "base/vlog.h" -#include "iceberg/compatibility.h" #include "iceberg/logger.h" -#include "iceberg/manifest.h" #include "iceberg/manifest_file_packer.h" #include "iceberg/manifest_list.h" +#include "iceberg/manifest_utils.h" #include "iceberg/snapshot.h" #include "iceberg/table_requirement.h" -#include "iceberg/values_bytes.h" #include "random/generators.h" -#include #include namespace iceberg { namespace { -// Derive the metadata location from table properties. -// Some catalogs require respecting the property `write.metadata.path`, -// but the default location is /metadata. 
uri get_metadata_location(const table_metadata& table) { static constexpr std::string_view write_metadata_path_prop = "write.metadata.path"; @@ -45,12 +39,6 @@ uri get_metadata_location(const table_metadata& table) { return uri(fmt::format("{}/metadata", table.location)); } -uri get_manifest_path( - const table_metadata& table, const uuid_t& commit_uuid, size_t num) { - auto metadata_location = get_metadata_location(table); - return uri( - fmt::format("{}/{}-m{}.avro", metadata_location, commit_uuid, num)); -} uri get_manifest_list_path( const table_metadata& table, snapshot_id snap_id, @@ -70,8 +58,6 @@ action::errc to_action_errc(metadata_io::errc e) { case metadata_io::errc::invalid_uri: return action::errc::unexpected_state; case metadata_io::errc::timedout: - // NOTE: treat IO timeouts the same as other IO failures. - // TODO: build out retry logic. return action::errc::io_failed; } } @@ -86,7 +72,6 @@ snapshot_id generate_unused_snap_id(const table_metadata& m) { if (!m.snapshots.has_value() || m.snapshots->empty()) { return sid; } - // Repeatedly try to generate a new snapshot id that isn't used already. const auto& snaps = *m.snapshots; while (std::ranges::find(snaps, sid, &snapshot::id) != snaps.end()) { sid = random_snap_id(); @@ -94,78 +79,137 @@ snapshot_id generate_unused_snap_id(const table_metadata& m) { return sid; } -void update_partition_summaries( - const data_file& f, chunked_vector& summaries) { - // NOTE: callers should have validated that partition keys of the data - // files have the same number of fields as the partition key used to - // construct the summaries. 
- const auto& pk_val_fields = f.partition.val->fields; - for (size_t i = 0; i < summaries.size(); ++i) { - const auto& file_val_field = pk_val_fields[i]; - if (!file_val_field.has_value()) { - summaries[i].contains_null = true; - continue; - } - // TODO: contains_nan - const auto& file_prim_val = std::get( - file_val_field.value()); - if (!summaries[i].lower_bound.has_value()) { - summaries[i].lower_bound = make_copy(file_prim_val); - } else { - auto& lb = summaries[i].lower_bound.value(); - if (file_prim_val < lb) { - lb = make_copy(file_prim_val); - } - } - if (!summaries[i].upper_bound.has_value()) { - summaries[i].upper_bound = make_copy(file_prim_val); - } else { - auto& ub = summaries[i].upper_bound.value(); - if (ub < file_prim_val) { - ub = make_copy(file_prim_val); - } - } +// Converts file_to_append entries into manifest_entries and a max schema id, +// then delegates to maybe_merge_mfiles_and_new_entries. +ss::future, action::errc>> +maybe_merge_mfiles_and_new_data( + manifest_io& io, + const table_metadata& table, + manifest_num_gen gen_manifest_num, + size_t min_to_merge, + chunked_vector to_merge, + chunked_vector new_data_files, + const partition_spec& pspec, + const table_snapshot_ctx& ctx) { + chunked_vector new_data_entries; + auto max_schema_id_in_added = schema::id_t::min(); + for (auto& f : new_data_files) { + max_schema_id_in_added = std::max(max_schema_id_in_added, f.schema_id); + manifest_entry e{ + .status = manifest_entry_status::added, + .snapshot_id = ctx.snap_id, + .sequence_number = std::nullopt, + .file_sequence_number = std::nullopt, + .data_file = std::move(f.file), + }; + new_data_entries.emplace_back(std::move(e)); } + co_return co_await maybe_merge_mfiles_and_new_entries( + io, + table, + std::move(gen_manifest_num), + manifest_content_type::data, + min_to_merge, + std::move(to_merge), + std::move(new_data_entries), + new_data_files.empty() ? 
std::nullopt + : std::make_optional(max_schema_id_in_added), + pspec, + ctx); } -chunked_vector release_with_bytes(field_summary_val::list_t l) { - chunked_vector ret; - ret.reserve(l.size()); - for (auto& v : l) { - ret.emplace_back(std::move(v).release_with_bytes()); +// Distributes manifest files and new data files by partition spec, bin-packs, +// and merges/creates manifests as needed. +ss::future, action::errc>> +pack_mlist_and_new_data( + manifest_io& io, + const table_metadata& table, + manifest_num_gen gen_manifest_num, + size_t min_to_merge, + size_t target_size_bytes, + const table_snapshot_ctx& ctx, + manifest_list old_mlist, + chunked_vector new_data_files) { + struct per_spec_data { + chunked_vector existing_manifests; + chunked_vector new_data_files; + }; + + chunked_hash_map spec2data; + for (auto& m : old_mlist.files) { + auto spec_id = m.partition_spec_id; + spec2data[spec_id].existing_manifests.push_back(std::move(m)); + } + for (auto& f : new_data_files) { + auto spec_id = f.partition_spec_id; + spec2data[spec_id].new_data_files.push_back(std::move(f)); } - return ret; -} -} // namespace + chunked_vector new_mfiles; + for (auto& [spec_id, data] : spec2data) { + const auto* pspec = table.get_partition_spec(spec_id); + if (!pspec) { + vlog(log.error, "partition spec {} not found in metadata", spec_id); + co_return action::errc::unexpected_state; + } -field_summary_val::list_t -field_summary_val::empty_summaries(size_t num_fields) { - field_summary_val::list_t ret; - ret.reserve(num_fields); - for (size_t i = 0; i < num_fields; ++i) { - ret.emplace_back(field_summary_val{}); - } - return ret; -} + auto num_old_manifests = data.existing_manifests.size(); + auto binned_mfiles = manifest_packer::pack( + target_size_bytes, std::move(data.existing_manifests)); + vlog( + log.info, + "Packed {} manifests into {} bins for partition spec id {}", + num_old_manifests, + binned_mfiles.size(), + spec_id); + if (binned_mfiles.empty()) { + 
binned_mfiles.emplace_back(chunked_vector{}); + } + auto merged_bins_res = co_await maybe_merge_mfiles_and_new_data( + io, + table, + [&gen_manifest_num]() { return gen_manifest_num(); }, + min_to_merge, + std::move(binned_mfiles[0]), + std::move(data.new_data_files), + *pspec, + ctx); + if (merged_bins_res.has_error()) { + co_return merged_bins_res.error(); + } + auto merged_bins = std::move(merged_bins_res.value()); + std::move( + merged_bins.begin(), + merged_bins.end(), + std::back_inserter(new_mfiles)); -field_summary field_summary_val::release_with_bytes() && { - std::optional lb; - std::optional ub; - if (lower_bound.has_value()) { - lb = value_to_bytes(value{std::move(lower_bound).value()}); - } - if (upper_bound.has_value()) { - ub = value_to_bytes(value{std::move(upper_bound).value()}); + for (size_t i = 1; i < binned_mfiles.size(); i++) { + auto& bin = binned_mfiles[i]; + if (bin.size() == 1) { + new_mfiles.emplace_back(std::move(bin[0])); + continue; + } + auto merged_bin_res = co_await merge_mfiles( + io, + table, + [&gen_manifest_num]() { return gen_manifest_num(); }, + manifest_content_type::data, + std::move(bin), + {}, + std::nullopt, + *pspec, + ctx); + if (merged_bin_res.has_error()) { + co_return merged_bin_res.error(); + } + new_mfiles.emplace_back(std::move(merged_bin_res.value())); + } } - return field_summary{ - .contains_null = contains_null, - .contains_nan = contains_nan, - .lower_bound = std::move(lb), - .upper_bound = std::move(ub), - }; + co_return new_mfiles; } +} // namespace + ss::future merge_append_action::build_updates() && { vlog( log.info, @@ -213,8 +257,6 @@ ss::future merge_append_action::build_updates() && { std::optional old_summary; if (table_.snapshots.has_value() && !table_.snapshots->empty()) { if (!table_.current_snapshot_id.has_value()) { - // We have snapshots, but it's unclear which one to base our update - // off of. 
vlog( log.error, "Table's current snapshot id is not set but there are {} " @@ -222,14 +264,11 @@ ss::future merge_append_action::build_updates() && { table_.snapshots->size()); co_return action::errc::unexpected_state; } - // Look for the current snapshot. const auto table_cur_snap_id = *table_.current_snapshot_id; const auto& snaps = *table_.snapshots; auto snap_it = std::ranges::find( snaps, table_cur_snap_id, &snapshot::id); if (snap_it == snaps.end()) { - // We have snapshots, but the one we thought we needed to base our - // update off of is missing. vlog( log.error, "Table's current snapshot id {} is missing", @@ -264,7 +303,14 @@ ss::future merge_append_action::build_updates() && { }; auto mfiles_res = co_await pack_mlist_and_new_data( - ctx, std::move(mlist), std::move(new_data_files_)); + io_, + table_, + [this]() { return generate_manifest_num(); }, + default_min_to_merge_new_files, + default_target_size_bytes, + ctx, + std::move(mlist), + std::move(new_data_files_)); if (mfiles_res.has_error()) { co_return mfiles_res.error(); } @@ -297,8 +343,6 @@ ss::future merge_append_action::build_updates() && { .other = {}, }; if (old_summary) { - // Only update existing total metrics; otherwise we wouldn't have an - // accurate starting point. if (old_summary->total_data_files.has_value()) { new_summary.total_data_files = added_data_files + *old_summary->total_data_files; @@ -312,8 +356,6 @@ ss::future merge_append_action::build_updates() && { + *old_summary->total_files_size; } } else { - // This is the first summary. The totals are just what we're adding in - // (presumably) this first snapshot. 
new_summary.total_data_files = added_data_files; new_summary.total_records = added_records; new_summary.total_files_size = added_files_size; @@ -359,321 +401,4 @@ ss::future merge_append_action::build_updates() && { co_return ret; } -ss::future> -merge_append_action::upload_as_manifest( - const uri& path, - const schema& schema, - const partition_spec& pspec, - chunked_vector entries) { - vlog( - log.info, - "Uploading manifest with {} entries to {}", - entries.size(), - path); - manifest m{ - .metadata = manifest_metadata{ - .schema = schema.copy(), - .partition_spec = pspec.copy(), - .format_version = format_version::v2, - .manifest_content_type = manifest_content_type::data, - }, - .entries = std::move(entries), - }; - co_return co_await io_.upload_manifest(path, m); -} - -namespace { - -// Depending on how schema evolves, type of partition key fields can change -// (e.g. a field type can be promoted from int to long). This means that -// partition values of all data files in the new manifest need to be -// promoted to the common type before creating the manifest. -// -// Preconditions: pk and pk_type have the same size, and the promotion for each -// field is possible. 
-void promote_partition_key_type( - partition_key& pk, const partition_key_type& pk_type) { - // ensured by the callers - vassert( - pk.val->fields.size() == pk_type.type.fields.size(), - "unexpected partition key size: {} (expected: {})", - pk.val->fields.size(), - pk_type.type.fields.size()); - for (size_t i = 0; i < pk.val->fields.size(); ++i) { - auto& field = pk.val->fields[i]; - if (field) { - const auto& type = std::get( - pk_type.type.fields[i]->type); - field = promote_primitive_value_type( - std::move(std::get(*field)), type); - } - } -} - -} // namespace - -ss::future, action::errc>> -merge_append_action::maybe_merge_mfiles_and_new_data( - chunked_vector to_merge, - chunked_vector new_data_files, - const partition_spec& pspec, - const table_snapshot_ctx& ctx) { - vlog( - log.info, - "Considering {} existing manifest files and {} data files to merge", - to_merge.size(), - new_data_files.size()); - // First construct some manifest entries for the new data files. Regardless - // of if we upload a brand new manifest or merge with an existing manifest, - // the new data files will need new entries. - chunked_vector new_data_entries; - auto max_schema_id_in_added = schema::id_t::min(); - for (auto& f : new_data_files) { - max_schema_id_in_added = std::max(max_schema_id_in_added, f.schema_id); - manifest_entry e{ - .status = manifest_entry_status::added, - .snapshot_id = ctx.snap_id, - .sequence_number = std::nullopt, - .file_sequence_number = std::nullopt, - .data_file = std::move(f.file), - }; - new_data_entries.emplace_back(std::move(e)); - } - chunked_vector ret; - if (to_merge.size() < default_min_to_merge_new_files) { - // Upload and return. This bin is too small to merge. 
- if (!new_data_files.empty()) { - auto new_mfile_res = co_await merge_mfiles( - {}, - std::move(new_data_entries), - max_schema_id_in_added, - pspec, - ctx); - if (new_mfile_res.has_error()) { - co_return new_mfile_res.error(); - } - ret.emplace_back(std::move(new_mfile_res.value())); - } - std::move(to_merge.begin(), to_merge.end(), std::back_inserter(ret)); - co_return ret; - } - auto merged_mfile_res = co_await merge_mfiles( - std::move(to_merge), - std::move(new_data_entries), - max_schema_id_in_added, - pspec, - ctx); - if (merged_mfile_res.has_error()) { - co_return merged_mfile_res.error(); - } - ret.emplace_back(std::move(merged_mfile_res.value())); - co_return ret; -} - -ss::future> -merge_append_action::merge_mfiles( - chunked_vector to_merge, - chunked_vector added_entries, - std::optional max_schema_id_in_added, - const partition_spec& pspec, - const table_snapshot_ctx& ctx) { - vlogl( - log, - to_merge.empty() ? ss::log_level::debug : ss::log_level::info, - "Merging {} manifest files and {} added manifest entries", - to_merge.size(), - added_entries.size()); - - const size_t added_files = added_entries.size(); - size_t added_rows = 0; - for (const auto& e : added_entries) { - added_rows += e.data_file.record_count; - } - - auto merged_entries = std::move(added_entries); - auto max_schema_id = max_schema_id_in_added.value_or(schema::id_t::min()); - size_t existing_rows = 0; - size_t existing_files = 0; - auto min_seq_num = ctx.seq_num; - for (const auto& mfile : to_merge) { - // Download the manifest file and collect the entries into the merged - // container. 
- auto mfile_res = co_await io_.download_manifest(mfile.manifest_path); - if (mfile_res.has_error()) { - co_return to_action_errc(mfile_res.error()); - } - auto m = std::move(mfile_res).value(); - max_schema_id = std::max(max_schema_id, m.metadata.schema.schema_id); - existing_files += m.entries.size(); - for (auto& e : m.entries) { - auto f_num_fields = e.data_file.partition.val->fields.size(); - if (f_num_fields != pspec.fields.size()) { - vlog( - log.error, - "Partition key for data file {} has {} fields, expected {}", - e.data_file.file_path, - f_num_fields, - pspec.fields.size()); - co_return action::errc::unexpected_state; - } - - existing_rows += e.data_file.record_count; - // Rewrite sequence numbers for previously added entries. - // These entries refer to files committed prior to this action. - if (e.status == manifest_entry_status::added) { - e.status = manifest_entry_status::existing; - e.sequence_number = e.sequence_number.value_or( - mfile.seq_number); - e.file_sequence_number = e.file_sequence_number.value_or( - file_sequence_number{mfile.seq_number()}); - } - if (e.sequence_number.has_value()) { - min_seq_num = std::min(min_seq_num, e.sequence_number.value()); - } - } - std::move( - m.entries.begin(), - m.entries.end(), - std::back_inserter(merged_entries)); - } - - // If the partition spec is not the default one, there is a risk that the - // partition key type can't be resolved with the current schema (e.g. if the - // spec uses a source column that was subsequently deleted). To prevent - // that, we choose the schema with the highest id among all manifests and - // new files with this spec. Presumably, any of these schemas can be used to - // resolve the key type, but we choose the one with the highest id because - // it should contain the "most promoted" types, and therefore partition keys - // for all files can be promoted to the key type resolved with this schema. - auto schema_id = pspec.spec_id == table_.default_spec_id - ? 
table_.current_schema_id - : max_schema_id; - const auto* schema = table_.get_schema(schema_id); - if (!schema) { - vlog(log.error, "Table schema {} is missing from metadata", schema_id); - co_return errc::unexpected_state; - } - - auto pk_type = partition_key_type::create(pspec, *schema); - auto partition_summaries = field_summary_val::empty_summaries( - pspec.fields.size()); - for (auto& e : merged_entries) { - try { - promote_partition_key_type(e.data_file.partition, pk_type); - update_partition_summaries(e.data_file, partition_summaries); - } catch (const std::exception& ex) { - vlog( - log.error, - "bad partition key for file {}: {}", - e.data_file.file_path, - ex); - co_return errc::unexpected_state; - } - } - - const auto merged_manifest_path = get_manifest_path( - table_, ctx.commit_uuid, generate_manifest_num()); - const auto mfile_up_res = co_await upload_as_manifest( - merged_manifest_path, *schema, pspec, std::move(merged_entries)); - if (mfile_up_res.has_error()) { - co_return to_action_errc(mfile_up_res.error()); - } - manifest_file merged_file{ - .manifest_path = merged_manifest_path, - .manifest_length = mfile_up_res.value(), - .partition_spec_id = pspec.spec_id, - .content = manifest_file_content::data, - .seq_number = ctx.seq_num, - .min_seq_number = min_seq_num, - .added_snapshot_id = ctx.snap_id, - .added_files_count = added_files, - .existing_files_count = existing_files, - .deleted_files_count = 0, - .added_rows_count = added_rows, - .existing_rows_count = existing_rows, - .deleted_rows_count = 0, - .partitions = release_with_bytes(std::move(partition_summaries)), - }; - co_return merged_file; -} - -ss::future, action::errc>> -merge_append_action::pack_mlist_and_new_data( - const table_snapshot_ctx& ctx, - manifest_list old_mlist, - chunked_vector new_data_files) { - struct per_spec_data { - chunked_vector existing_manifests; - chunked_vector new_data_files; - }; - - chunked_hash_map spec2data; - for (auto& m : old_mlist.files) { - auto 
spec_id = m.partition_spec_id; - spec2data[spec_id].existing_manifests.push_back(std::move(m)); - } - for (auto& f : new_data_files) { - auto spec_id = f.partition_spec_id; - spec2data[spec_id].new_data_files.push_back(std::move(f)); - } - - chunked_vector new_mfiles; - for (auto& [spec_id, data] : spec2data) { - const auto* pspec = table_.get_partition_spec(spec_id); - if (!pspec) { - vlog(log.error, "partition spec {} not found in metadata", spec_id); - co_return errc::unexpected_state; - } - - auto num_old_manifests = data.existing_manifests.size(); - auto binned_mfiles = manifest_packer::pack( - default_target_size_bytes, std::move(data.existing_manifests)); - vlog( - log.info, - "Packed {} manifests into {} bins for partition spec id {}", - num_old_manifests, - binned_mfiles.size(), - spec_id); - if (binned_mfiles.empty()) { - // If we had no manifests at all, at least create an empty bin to - // add new manifests to below. - binned_mfiles.emplace_back(chunked_vector{}); - } - // Always add files to the first bin, which by convention will be the - // latest data. We may not merge existing manifests if the bin is - // small, but we'll at least add metadata for the new data files. - auto merged_bins_res = co_await maybe_merge_mfiles_and_new_data( - std::move(binned_mfiles[0]), - std::move(data.new_data_files), - *pspec, - ctx); - if (merged_bins_res.has_error()) { - co_return merged_bins_res.error(); - } - auto merged_bins = std::move(merged_bins_res.value()); - std::move( - merged_bins.begin(), - merged_bins.end(), - std::back_inserter(new_mfiles)); - - // Merge the rest of the bins. - for (size_t i = 1; i < binned_mfiles.size(); i++) { - auto& bin = binned_mfiles[i]; - if (bin.size() == 1) { - // The bin has only a single manifest so there's nothing to do, - // just add it as is. 
- new_mfiles.emplace_back(std::move(bin[0])); - continue; - } - auto merged_bin_res = co_await merge_mfiles( - std::move(bin), {}, std::nullopt, *pspec, ctx); - if (merged_bin_res.has_error()) { - co_return merged_bin_res.error(); - } - new_mfiles.emplace_back(std::move(merged_bin_res.value())); - } - } - co_return new_mfiles; -} - } // namespace iceberg diff --git a/src/v/iceberg/merge_append_action.h b/src/v/iceberg/merge_append_action.h index cf75f18b0b257..a3629f5b86d12 100644 --- a/src/v/iceberg/merge_append_action.h +++ b/src/v/iceberg/merge_append_action.h @@ -14,6 +14,7 @@ #include "iceberg/manifest_entry.h" #include "iceberg/manifest_io.h" #include "iceberg/manifest_list.h" +#include "iceberg/manifest_utils.h" #include "iceberg/schema.h" #include "iceberg/table_metadata.h" @@ -27,27 +28,6 @@ struct file_to_append { partition_spec::id_t partition_spec_id; }; -// Container for a metadata required to build manifest_file::partitions (the -// field summaries for each partition key field). -// -// Unlike the field_summary in manifest_file, which stores bytes per bound, -// this is value-comparable by maintaining the bounds as values instead of -// serialized bytes. Note that only values that are the same primitive_value -// variant are directly comparable. -struct field_summary_val { - using list_t = chunked_vector; - // Creates a list of field summaries meant to summarize partition key values - // with the given number of fields. - static list_t empty_summaries(size_t num_fields); - // Returns this summary with the bounds converted to bytes. - field_summary release_with_bytes() &&; - - bool contains_null{false}; - std::optional contains_nan; - std::optional lower_bound; - std::optional upper_bound; -}; - // An action that builds and uploads metadata to append a given list of data // files to the table's latest snapshot, merging together existing manifests if // there are too many. 
@@ -87,62 +67,8 @@ class merge_append_action : public action { ss::future build_updates() && final; private: - // Context containing various fields resolved from the table metadata. The - // fields here all pertain to the new snapshot created by this action. - struct table_snapshot_ctx { - const uuid_t& commit_uuid; - const snapshot_id snap_id; - const sequence_number seq_num; - }; - - // Returns a number that can be used to uniquely identify the next manifest - // upload within this action. size_t generate_manifest_num() { return next_manifest_num_++; } - // Uploads the given manifest entries as a new manifest, returning the size - // of the resulting file. - ss::future> upload_as_manifest( - const uri& path, - const schema& schema, - const partition_spec& pspec, - chunked_vector entries); - - // Takes the given list of manifest files and merges them with the given - // new data files if the list of files is long enough, or just adds a new - // manifest for the new data files otherwise. - // - // Returns the resulting list of manifest files, which will be size 1 in - // the merging case, or the input size + 1 otherwise. - ss::future, action::errc>> - maybe_merge_mfiles_and_new_data( - chunked_vector to_merge, - chunked_vector new_data_files, - const partition_spec& pspec, - const table_snapshot_ctx& ctx); - - // Takes the given list of manifest files and merges them with the optional - // new manifest entries (i.e. data file metadata). - ss::future> merge_mfiles( - chunked_vector to_merge, - chunked_vector added_entries, - std::optional max_added_schema_id, - const partition_spec& pspec, - const table_snapshot_ctx& ctx); - - // Takes the given manifest list and bin-packs them to reduce the number of - // manifests, adding new data files either to a new manifest or the latest - // bin if the number of files in the bin has reached a threshold. 
- // - // Returns the resulting list of manifest files, which should encompass all - // data from the latest snapshot + new data files, and can be written as a - // new manifest list and committed as a new snapshot. - ss::future, action::errc>> - pack_mlist_and_new_data( - const table_snapshot_ctx& ctx, - manifest_list old_mlist, - chunked_vector new_data_files); - -private: manifest_io& io_; const table_metadata& table_; const uuid_t commit_uuid_; diff --git a/src/v/iceberg/row_delta_action.cc b/src/v/iceberg/row_delta_action.cc new file mode 100644 index 0000000000000..dc7d41b920836 --- /dev/null +++ b/src/v/iceberg/row_delta_action.cc @@ -0,0 +1,549 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#include "iceberg/row_delta_action.h" + +#include "base/units.h" +#include "base/vlog.h" +#include "iceberg/logger.h" +#include "iceberg/manifest_file_packer.h" +#include "iceberg/manifest_list.h" +#include "iceberg/manifest_utils.h" +#include "iceberg/snapshot.h" +#include "iceberg/table_requirement.h" +#include "random/generators.h" + +#include + +namespace iceberg { + +namespace { + +uri get_metadata_location(const table_metadata& table) { + static constexpr std::string_view write_metadata_path_prop + = "write.metadata.path"; + + if (table.properties.has_value()) { + auto it = table.properties->find(write_metadata_path_prop); + if (it != table.properties->end()) { + return uri(it->second); + } + } + + return uri(fmt::format("{}/metadata", table.location)); +} + +uri get_manifest_list_path( + const table_metadata& table, + snapshot_id snap_id, + const uuid_t& commit_uuid, + size_t num) { + auto metadata_location = get_metadata_location(table); + return uri{fmt::format( + 
"{}/snap-{}-{}-{}.avro", metadata_location, snap_id(), commit_uuid, num)}; +} + +action::errc to_action_errc(metadata_io::errc e) { + switch (e) { + case metadata_io::errc::failed: + return action::errc::io_failed; + case metadata_io::errc::shutting_down: + return action::errc::shutting_down; + case metadata_io::errc::invalid_uri: + return action::errc::unexpected_state; + case metadata_io::errc::timedout: + return action::errc::io_failed; + } +} + +snapshot_id random_snap_id() { + return snapshot_id{random_generators::get_int( + 0, std::numeric_limits::max())}; +} + +snapshot_id generate_unused_snap_id(const table_metadata& m) { + auto sid = random_snap_id(); + if (!m.snapshots.has_value() || m.snapshots->empty()) { + return sid; + } + const auto& snaps = *m.snapshots; + while (std::ranges::find(snaps, sid, &snapshot::id) != snaps.end()) { + sid = random_snap_id(); + } + return sid; +} + +// Converts file_to_append entries into manifest_entries and a max schema id, +// then delegates to maybe_merge_mfiles_and_new_entries. 
+ss::future, action::errc>> +maybe_merge_mfiles_and_new_data( + manifest_io& io, + const table_metadata& table, + manifest_num_gen gen_manifest_num, + size_t min_to_merge, + chunked_vector to_merge, + chunked_vector new_data_files, + const partition_spec& pspec, + const table_snapshot_ctx& ctx) { + chunked_vector new_data_entries; + auto max_schema_id_in_added = schema::id_t::min(); + for (auto& f : new_data_files) { + max_schema_id_in_added = std::max(max_schema_id_in_added, f.schema_id); + manifest_entry e{ + .status = manifest_entry_status::added, + .snapshot_id = ctx.snap_id, + .sequence_number = std::nullopt, + .file_sequence_number = std::nullopt, + .data_file = std::move(f.file), + }; + new_data_entries.emplace_back(std::move(e)); + } + co_return co_await maybe_merge_mfiles_and_new_entries( + io, + table, + std::move(gen_manifest_num), + manifest_content_type::data, + min_to_merge, + std::move(to_merge), + std::move(new_data_entries), + new_data_files.empty() ? std::nullopt + : std::make_optional(max_schema_id_in_added), + pspec, + ctx); +} + +// Distributes manifest files and new data files by partition spec, bin-packs, +// and merges/creates manifests as needed. 
+ss::future, action::errc>> +pack_mlist_and_new_data( + manifest_io& io, + const table_metadata& table, + manifest_num_gen gen_manifest_num, + size_t min_to_merge, + size_t target_size_bytes, + const table_snapshot_ctx& ctx, + chunked_vector old_data_mfiles, + chunked_vector new_data_files) { + struct per_spec_data { + chunked_vector existing_manifests; + chunked_vector new_data_files; + }; + + chunked_hash_map spec2data; + for (auto& m : old_data_mfiles) { + auto spec_id = m.partition_spec_id; + spec2data[spec_id].existing_manifests.push_back(std::move(m)); + } + for (auto& f : new_data_files) { + auto spec_id = f.partition_spec_id; + spec2data[spec_id].new_data_files.push_back(std::move(f)); + } + + chunked_vector new_mfiles; + for (auto& [spec_id, data] : spec2data) { + const auto* pspec = table.get_partition_spec(spec_id); + if (!pspec) { + vlog(log.error, "partition spec {} not found in metadata", spec_id); + co_return action::errc::unexpected_state; + } + + auto num_old_manifests = data.existing_manifests.size(); + auto binned_mfiles = manifest_packer::pack( + target_size_bytes, std::move(data.existing_manifests)); + vlog( + log.info, + "Packed {} manifests into {} bins for partition spec id {}", + num_old_manifests, + binned_mfiles.size(), + spec_id); + if (binned_mfiles.empty()) { + binned_mfiles.emplace_back(chunked_vector{}); + } + auto merged_bins_res = co_await maybe_merge_mfiles_and_new_data( + io, + table, + [&gen_manifest_num]() { return gen_manifest_num(); }, + min_to_merge, + std::move(binned_mfiles[0]), + std::move(data.new_data_files), + *pspec, + ctx); + if (merged_bins_res.has_error()) { + co_return merged_bins_res.error(); + } + auto merged_bins = std::move(merged_bins_res.value()); + std::move( + merged_bins.begin(), + merged_bins.end(), + std::back_inserter(new_mfiles)); + + for (size_t i = 1; i < binned_mfiles.size(); i++) { + auto& bin = binned_mfiles[i]; + if (bin.size() == 1) { + new_mfiles.emplace_back(std::move(bin[0])); + continue; + 
} + auto merged_bin_res = co_await merge_mfiles( + io, + table, + [&gen_manifest_num]() { return gen_manifest_num(); }, + manifest_content_type::data, + std::move(bin), + {}, + std::nullopt, + *pspec, + ctx); + if (merged_bin_res.has_error()) { + co_return merged_bin_res.error(); + } + new_mfiles.emplace_back(std::move(merged_bin_res.value())); + } + } + co_return new_mfiles; +} + +} // namespace + +row_delta_action::row_delta_action( + manifest_io& io, + const table_metadata& table, + chunked_vector data_files, + chunked_vector delete_files, + chunked_vector> snapshot_props, + std::optional tag_name, + std::optional tag_expiration_ms) + : io_(io) + , table_(table) + , commit_uuid_(uuid_t::create()) + , new_data_files_(std::move(data_files)) + , new_delete_files_(std::move(delete_files)) + , snapshot_props_(std::move(snapshot_props)) + , tag_name_(std::move(tag_name)) + , tag_expiration_ms_(tag_expiration_ms) {} + +ss::future row_delta_action::build_updates() && { + vlog( + log.info, + "Building row delta update for {} data files and {} delete files", + new_data_files_.size(), + new_delete_files_.size()); + + // Validate data files. 
+ size_t added_data_records{0}; + size_t added_data_files_size{0}; + for (const auto& f : new_data_files_) { + if (f.file.partition.val == nullptr) { + vlog( + log.error, + "Metadata for data file {} is missing partition key", + f.file.file_path); + co_return action::errc::unexpected_state; + } + const auto* pspec = table_.get_partition_spec(f.partition_spec_id); + if (!pspec) { + vlog( + log.error, + "partition spec {} for file {} not found in metadata", + f.partition_spec_id, + f.file.file_path); + co_return action::errc::unexpected_state; + } + auto f_num_fields = f.file.partition.val->fields.size(); + if (f_num_fields != pspec->fields.size()) { + vlog( + log.error, + "Partition key for data file {} has {} fields, expected {}", + f.file.file_path, + f_num_fields, + pspec->fields.size()); + co_return action::errc::unexpected_state; + } + added_data_records += f.file.record_count; + added_data_files_size += f.file.file_size_bytes; + } + auto added_data_files_count = new_data_files_.size(); + + // Validate delete files. 
+ size_t deleted_records{0}; + for (const auto& f : new_delete_files_) { + if (f.file.partition.val == nullptr) { + vlog( + log.error, + "Metadata for delete file {} is missing partition key", + f.file.file_path); + co_return action::errc::unexpected_state; + } + const auto* pspec = table_.get_partition_spec(f.partition_spec_id); + if (!pspec) { + vlog( + log.error, + "partition spec {} for delete file {} not found in metadata", + f.partition_spec_id, + f.file.file_path); + co_return action::errc::unexpected_state; + } + auto f_num_fields = f.file.partition.val->fields.size(); + if (f_num_fields != pspec->fields.size()) { + vlog( + log.error, + "Partition key for delete file {} has {} fields, expected {}", + f.file.file_path, + f_num_fields, + pspec->fields.size()); + co_return action::errc::unexpected_state; + } + deleted_records += f.file.record_count; + } + auto added_delete_files_count = new_delete_files_.size(); + + // Get the manifest list for the current snapshot, if any. + manifest_list mlist; + std::optional old_snap_id; + std::optional old_summary; + if (table_.snapshots.has_value() && !table_.snapshots->empty()) { + if (!table_.current_snapshot_id.has_value()) { + vlog( + log.error, + "Table's current snapshot id is not set but there are {} " + "snapshots", + table_.snapshots->size()); + co_return action::errc::unexpected_state; + } + const auto table_cur_snap_id = *table_.current_snapshot_id; + const auto& snaps = *table_.snapshots; + auto snap_it = std::ranges::find( + snaps, table_cur_snap_id, &snapshot::id); + if (snap_it == snaps.end()) { + vlog( + log.error, + "Table's current snapshot id {} is missing", + table_cur_snap_id); + co_return action::errc::unexpected_state; + } + auto mlist_res = co_await io_.download_manifest_list( + snap_it->manifest_list_path); + if (mlist_res.has_error()) { + co_return to_action_errc(mlist_res.error()); + } + mlist = std::move(mlist_res).value(); + old_snap_id = table_cur_snap_id; + old_summary = snap_it->summary; + 
} else if ( + table_.current_snapshot_id.has_value() + && table_.current_snapshot_id.value() != invalid_snapshot_id) { + vlog( + log.error, + "Table's current snapshot id is set to {} but there are no " + "snapshots", + table_.current_snapshot_id.value()); + co_return action::errc::unexpected_state; + } + const auto new_seq_num = sequence_number{table_.last_sequence_number() + 1}; + const auto new_snap_id = generate_unused_snap_id(table_); + + const table_snapshot_ctx ctx{ + .commit_uuid = commit_uuid_, + .snap_id = new_snap_id, + .seq_num = new_seq_num, + }; + + // Separate existing manifest files into data and delete manifests. + chunked_vector old_data_mfiles; + chunked_vector old_delete_mfiles; + for (auto& mf : mlist.files) { + if (mf.content == manifest_file_content::data) { + old_data_mfiles.push_back(std::move(mf)); + } else { + old_delete_mfiles.push_back(std::move(mf)); + } + } + + // Handle data files using the same pack+merge logic as merge_append. + chunked_vector new_mfiles; + if (!new_data_files_.empty() || !old_data_mfiles.empty()) { + auto mfiles_res = co_await pack_mlist_and_new_data( + io_, + table_, + [this]() { return generate_manifest_num(); }, + merge_append_action::default_min_to_merge_new_files, + merge_append_action::default_target_size_bytes, + ctx, + std::move(old_data_mfiles), + std::move(new_data_files_)); + if (mfiles_res.has_error()) { + co_return mfiles_res.error(); + } + auto data_mfiles = std::move(mfiles_res.value()); + std::move( + data_mfiles.begin(), + data_mfiles.end(), + std::back_inserter(new_mfiles)); + } + + // Handle delete files: create a new delete manifest per partition spec. 
+ if (!new_delete_files_.empty()) { + struct per_spec_delete_data { + chunked_vector entries; + schema::id_t max_schema_id{schema::id_t::min()}; + }; + chunked_hash_map + spec2deletes; + for (auto& f : new_delete_files_) { + auto spec_id = f.partition_spec_id; + auto& per_spec = spec2deletes[spec_id]; + per_spec.max_schema_id = std::max( + per_spec.max_schema_id, f.schema_id); + manifest_entry e{ + .status = manifest_entry_status::added, + .snapshot_id = new_snap_id, + .sequence_number = std::nullopt, + .file_sequence_number = std::nullopt, + .data_file = std::move(f.file), + }; + per_spec.entries.emplace_back(std::move(e)); + } + + for (auto& [spec_id, per_spec] : spec2deletes) { + const auto* pspec = table_.get_partition_spec(spec_id); + if (!pspec) { + vlog( + log.error, + "partition spec {} not found in metadata", + spec_id); + co_return action::errc::unexpected_state; + } + + auto mfile_res = co_await merge_mfiles( + io_, + table_, + [this]() { return generate_manifest_num(); }, + manifest_content_type::deletes, + {}, + std::move(per_spec.entries), + per_spec.max_schema_id, + *pspec, + ctx); + if (mfile_res.has_error()) { + co_return mfile_res.error(); + } + new_mfiles.emplace_back(std::move(mfile_res.value())); + } + } + + // Carry forward existing delete manifests. + std::move( + old_delete_mfiles.begin(), + old_delete_mfiles.end(), + std::back_inserter(new_mfiles)); + + manifest_list new_mlist{std::move(new_mfiles)}; + + const auto new_mlist_path = get_manifest_list_path( + table_, new_snap_id, commit_uuid_, 0); + + vlog( + log.info, + "Uploading manifest list {} containing {} manifest files", + new_mlist_path, + new_mlist.files.size()); + const auto mlist_up_res = co_await io_.upload_manifest_list( + new_mlist_path, new_mlist); + if (mlist_up_res.has_error()) { + co_return to_action_errc(mlist_up_res.error()); + } + + // Determine snapshot operation. 
+ snapshot_operation op; + if (added_data_files_count > 0 && added_delete_files_count > 0) { + op = snapshot_operation::overwrite; + } else if (added_delete_files_count > 0) { + op = snapshot_operation::delete_data; + } else { + op = snapshot_operation::append; + } + + snapshot_summary new_summary = { + .operation = op, + .added_data_files = static_cast(added_data_files_count), + .added_records = static_cast(added_data_records), + .added_files_size = static_cast(added_data_files_size), + .added_delete_files = static_cast(added_delete_files_count), + .deleted_records = static_cast(deleted_records), + .other = {}, + }; + if (old_summary) { + if (old_summary->total_data_files.has_value()) { + new_summary.total_data_files = static_cast( + added_data_files_count) + + *old_summary->total_data_files; + } + if (old_summary->total_records.has_value()) { + new_summary.total_records = static_cast(added_data_records) + + *old_summary->total_records; + } + if (old_summary->total_files_size.has_value()) { + new_summary.total_files_size = static_cast( + added_data_files_size) + + *old_summary->total_files_size; + } + if (old_summary->total_delete_files.has_value()) { + new_summary.total_delete_files = static_cast( + added_delete_files_count) + + *old_summary->total_delete_files; + } + } else { + new_summary.total_data_files = static_cast( + added_data_files_count); + new_summary.total_records = static_cast(added_data_records); + new_summary.total_files_size = static_cast( + added_data_files_size); + new_summary.total_delete_files = static_cast( + added_delete_files_count); + } + + snapshot s{ + .id = new_snap_id, + .parent_snapshot_id = old_snap_id, + .sequence_number = new_seq_num, + .timestamp_ms = model::timestamp::now(), + .summary = std::move(new_summary), + .manifest_list_path = new_mlist_path, + .schema_id = table_.current_schema_id, + }; + for (auto& [k, v] : snapshot_props_) { + s.summary.other.emplace(k, v); + } + updates_and_reqs ret; + 
ret.updates.emplace_back(table_update::add_snapshot{std::move(s)}); + ret.updates.emplace_back(table_update::set_snapshot_ref{ + .ref_name = "main", + .ref = snapshot_reference{ + .snapshot_id = new_snap_id, + .type = snapshot_ref_type::branch, + }, + }); + if (tag_name_.has_value()) { + ret.updates.emplace_back(table_update::set_snapshot_ref{ + .ref_name = tag_name_.value(), + .ref = snapshot_reference{ + .snapshot_id = new_snap_id, + .type = snapshot_ref_type::tag, + .max_ref_age_ms = tag_expiration_ms_, + }, + }); + } + ret.requirements.emplace_back( + table_requirement::assert_ref_snapshot_id{ + .ref = "main", + .snapshot_id = old_snap_id, + }); + co_return ret; +} + +} // namespace iceberg diff --git a/src/v/iceberg/row_delta_action.h b/src/v/iceberg/row_delta_action.h new file mode 100644 index 0000000000000..b3bea85c1ee9e --- /dev/null +++ b/src/v/iceberg/row_delta_action.h @@ -0,0 +1,64 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include "iceberg/action.h" +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_io.h" +#include "iceberg/manifest_utils.h" +#include "iceberg/merge_append_action.h" +#include "iceberg/table_metadata.h" + +namespace iceberg { + +struct file_to_delete { + data_file file; + schema::id_t schema_id; + partition_spec::id_t partition_spec_id; +}; + +/// An action that commits both data files and delete files to a table's +/// latest snapshot. Data files are handled identically to merge_append_action +/// (bin-packing and merging existing data manifests). Delete files are written +/// to a new delete manifest without merging into existing delete manifests. 
+/// +/// Produces snapshot operations: +/// - overwrite: when both data and delete files are present +/// - delete_data: when only delete files are present +/// - append: when only data files are present +class row_delta_action : public action { +public: + row_delta_action( + manifest_io& io, + const table_metadata& table, + chunked_vector data_files, + chunked_vector delete_files, + chunked_vector> snapshot_props = {}, + std::optional tag_name = std::nullopt, + std::optional tag_expiration_ms = std::nullopt); + +protected: + ss::future build_updates() && final; + +private: + size_t generate_manifest_num() { return next_manifest_num_++; } + + manifest_io& io_; + const table_metadata& table_; + const uuid_t commit_uuid_; + size_t next_manifest_num_{0}; + chunked_vector new_data_files_; + chunked_vector new_delete_files_; + chunked_vector> snapshot_props_; + std::optional tag_name_; + std::optional tag_expiration_ms_; +}; + +} // namespace iceberg diff --git a/src/v/iceberg/snapshot.h b/src/v/iceberg/snapshot.h index db083892874ad..b26ae068be1d3 100644 --- a/src/v/iceberg/snapshot.h +++ b/src/v/iceberg/snapshot.h @@ -50,6 +50,10 @@ struct snapshot_summary { std::optional total_records; std::optional added_files_size; std::optional total_files_size; + std::optional deleted_data_files; + std::optional added_delete_files; + std::optional total_delete_files; + std::optional deleted_records; // All other properties of the snapshot, besides 'operation'. 
// NOTE: these aren't necessarily important to Redpanda's Iceberg write diff --git a/src/v/iceberg/snapshot_json.cc b/src/v/iceberg/snapshot_json.cc index cfb98dafb7239..eaf40374641ae 100644 --- a/src/v/iceberg/snapshot_json.cc +++ b/src/v/iceberg/snapshot_json.cc @@ -134,6 +134,10 @@ snapshot parse_snapshot(const json::Value& v) { maybe_parse_metric("total-records", &summary.total_records); maybe_parse_metric("added-files-size", &summary.added_files_size); maybe_parse_metric("total-files-size", &summary.total_files_size); + maybe_parse_metric("deleted-data-files", &summary.deleted_data_files); + maybe_parse_metric("added-delete-files", &summary.added_delete_files); + maybe_parse_metric("total-delete-files", &summary.total_delete_files); + maybe_parse_metric("deleted-records", &summary.deleted_records); auto operation_str = parse_required_str(summary_json, "operation"); return snapshot{ .id = snapshot_id{id}, @@ -209,6 +213,10 @@ void rjson_serialize(iceberg::json_writer& w, const iceberg::snapshot& s) { maybe_serialize_metric("total-records", s.summary.total_records); maybe_serialize_metric("added-files-size", s.summary.added_files_size); maybe_serialize_metric("total-files-size", s.summary.total_files_size); + maybe_serialize_metric("deleted-data-files", s.summary.deleted_data_files); + maybe_serialize_metric("added-delete-files", s.summary.added_delete_files); + maybe_serialize_metric("total-delete-files", s.summary.total_delete_files); + maybe_serialize_metric("deleted-records", s.summary.deleted_records); w.EndObject(); w.EndObject(); diff --git a/src/v/iceberg/tests/BUILD b/src/v/iceberg/tests/BUILD index 704b10ad44033..649ebbf727336 100644 --- a/src/v/iceberg/tests/BUILD +++ b/src/v/iceberg/tests/BUILD @@ -234,6 +234,33 @@ redpanda_cc_gtest( ], ) +redpanda_cc_gtest( + name = "row_delta_action_test", + timeout = "moderate", + srcs = [ + "row_delta_action_test.cc", + ], + cpu = 1, + deps = [ + ":test_schemas", + "//src/v/cloud_io:remote", + 
"//src/v/cloud_io/tests:s3_imposter", + "//src/v/cloud_io/tests:scoped_remote", + "//src/v/iceberg:manifest_entry", + "//src/v/iceberg:manifest_io", + "//src/v/iceberg:merge_append_action", + "//src/v/iceberg:partition", + "//src/v/iceberg:row_delta_action", + "//src/v/iceberg:table_update", + "//src/v/iceberg:table_update_applier", + "//src/v/iceberg:values_bytes", + "//src/v/model", + "//src/v/test_utils:gtest", + "@googletest//:gtest", + "@seastar", + ], +) + redpanda_cc_gtest( name = "partition_json_test", timeout = "short", @@ -727,6 +754,21 @@ redpanda_cc_bench( ], ) +redpanda_cc_gtest( + name = "equality_delete_file_test", + timeout = "short", + srcs = [ + "equality_delete_file_test.cc", + ], + deps = [ + "//src/v/iceberg:equality_delete_file", + "//src/v/serde/parquet:reader", + "//src/v/serde/parquet:value", + "//src/v/test_utils:gtest", + "@googletest//:gtest", + ], +) + redpanda_cc_bench( name = "iceberg_compatibility_rpbench", srcs = [ diff --git a/src/v/iceberg/tests/equality_delete_file_test.cc b/src/v/iceberg/tests/equality_delete_file_test.cc new file mode 100644 index 0000000000000..71b526908ce59 --- /dev/null +++ b/src/v/iceberg/tests/equality_delete_file_test.cc @@ -0,0 +1,134 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "iceberg/equality_delete_file.h" +#include "serde/parquet/reader.h" +#include "serde/parquet/value.h" + +#include + +namespace iceberg { + +namespace { + +serde::parquet::schema_element two_column_key_schema() { + using serde::parquet::byte_array_type; + using serde::parquet::field_repetition_type; + using serde::parquet::i32_type; + using serde::parquet::schema_element; + using serde::parquet::string_type; + + chunked_vector children; + children.push_back( + schema_element{ + .type = i32_type{}, + .repetition_type = field_repetition_type::required, + .path = {ss::sstring("id")}, + .field_id = 1, + }); + children.push_back( + schema_element{ + .type = byte_array_type{}, + .repetition_type = field_repetition_type::required, + .path = {ss::sstring("name")}, + .field_id = 2, + .logical_type = string_type{}, + }); + return schema_element{ + .type = std::monostate{}, + .repetition_type = field_repetition_type::required, + .path = {ss::sstring("equality_deletes")}, + .children = std::move(children), + }; +} + +serde::parquet::group_value make_row(int32_t id, const ss::sstring& name) { + serde::parquet::group_value row; + row.push_back( + serde::parquet::group_member{serde::parquet::int32_value{id}}); + row.push_back( + serde::parquet::group_member{ + serde::parquet::byte_array_value{iobuf::from(name)}}); + return row; +} + +} // namespace + +// NOLINTBEGIN(*magic-number*) + +TEST(EqualityDeleteFile, RoundTrip) { + chunked_vector rows; + rows.push_back(make_row(1, "alice")); + rows.push_back(make_row(2, "bob")); + rows.push_back(make_row(3, "charlie")); + + chunked_vector field_ids; + field_ids.push_back(nested_field::id_t{1}); + field_ids.push_back(nested_field::id_t{2}); + + equality_delete_options opts{ + .parquet_schema = two_column_key_schema(), + .equality_field_ids = std::move(field_ids), + }; + + auto result + = 
write_equality_delete_file(std::move(opts), std::move(rows)).get(); + + EXPECT_EQ( + result.manifest_entry.content_type, + data_file_content_type::equality_deletes); + EXPECT_EQ(result.manifest_entry.file_format, data_file_format::parquet); + EXPECT_EQ(result.manifest_entry.record_count, 3); + EXPECT_GT(result.manifest_entry.file_size_bytes, 0); + + ASSERT_TRUE(result.manifest_entry.equality_ids.has_value()); + auto& eq_ids = result.manifest_entry.equality_ids.value(); + ASSERT_EQ(eq_ids.size(), 2); + EXPECT_EQ(eq_ids[0], nested_field::id_t{1}); + EXPECT_EQ(eq_ids[1], nested_field::id_t{2}); + + auto records + = serde::parquet::read_file_as_records(std::move(result.file_data)).get(); + + ASSERT_EQ(records.size(), 3); + EXPECT_EQ(records[0], make_row(1, "alice")); + EXPECT_EQ(records[1], make_row(2, "bob")); + EXPECT_EQ(records[2], make_row(3, "charlie")); +} + +TEST(EqualityDeleteFile, RoundTripCompressed) { + chunked_vector rows; + rows.push_back(make_row(42, "eve")); + + chunked_vector field_ids; + field_ids.push_back(nested_field::id_t{1}); + field_ids.push_back(nested_field::id_t{2}); + + equality_delete_options opts{ + .parquet_schema = two_column_key_schema(), + .equality_field_ids = std::move(field_ids), + .compress = true, + }; + + auto result + = write_equality_delete_file(std::move(opts), std::move(rows)).get(); + + EXPECT_EQ(result.manifest_entry.record_count, 1); + + auto records + = serde::parquet::read_file_as_records(std::move(result.file_data)).get(); + + ASSERT_EQ(records.size(), 1); + EXPECT_EQ(records[0], make_row(42, "eve")); +} + +// NOLINTEND(*magic-number*) + +} // namespace iceberg diff --git a/src/v/iceberg/tests/row_delta_action_test.cc b/src/v/iceberg/tests/row_delta_action_test.cc new file mode 100644 index 0000000000000..e2b5e2b8eefc2 --- /dev/null +++ b/src/v/iceberg/tests/row_delta_action_test.cc @@ -0,0 +1,293 @@ +/* + * Copyright 2026 Redpanda Data, Inc. 
+ * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#include "cloud_io/remote.h" +#include "cloud_io/tests/s3_imposter.h" +#include "cloud_io/tests/scoped_remote.h" +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_io.h" +#include "iceberg/merge_append_action.h" +#include "iceberg/partition.h" +#include "iceberg/row_delta_action.h" +#include "iceberg/table_update.h" +#include "iceberg/table_update_applier.h" +#include "iceberg/tests/test_schemas.h" +#include "iceberg/values_bytes.h" +#include "model/timestamp.h" + +#include + +using namespace iceberg; + +class RowDeltaActionTest + : public s3_imposter_fixture + , public ::testing::Test { +public: + RowDeltaActionTest() + : sr(cloud_io::scoped_remote::create(10, conf)) + , io(remote(), bucket_name) { + set_expectations_and_listen({}); + } + cloud_io::remote& remote() { return sr->remote.local(); } + + table_metadata create_table() { + auto s = schema{ + .schema_struct = std::get(test_nested_schema_type()), + .schema_id = schema::id_t{0}, + .identifier_field_ids = {}, + }; + chunked_vector schemas; + schemas.emplace_back(s.copy()); + chunked_vector pspecs; + pspecs.emplace_back(partition_spec{ + .spec_id = partition_spec::id_t{0}, + .fields = { + partition_field{ + .source_id = nested_field::id_t{2}, + .field_id = partition_field::id_t{1000}, + .name = "bar", + .transform = identity_transform{}, + }, + }, + }); + return table_metadata{ + .format_version = format_version::v2, + .table_uuid = uuid_t::create(), + .location = uri(fmt::format("s3://{}/foo/bar", bucket_name())), + .last_sequence_number = sequence_number{0}, + .last_updated_ms = model::timestamp::now(), + .last_column_id = s.highest_field_id().value(), + .schemas = std::move(schemas), + 
.current_schema_id = schema::id_t{0}, + .partition_specs = std::move(pspecs), + .default_spec_id = partition_spec::id_t{0}, + .last_partition_id = partition_field::id_t{-1}, + }; + } + + partition_key make_single_field_pk(primitive_value v) { + auto pk_struct = std::make_unique(); + pk_struct->fields.emplace_back(std::move(v)); + return {std::move(pk_struct)}; + } + + chunked_vector create_data_files( + const table_metadata& md, + const ss::sstring& path_base, + size_t num_files, + size_t record_count, + primitive_value pk_value = int_value{42}) { + chunked_vector ret; + ret.reserve(num_files); + const auto records_per_file = record_count / num_files; + const auto leftover_records = record_count % num_files; + for (size_t i = 0; i < num_files; i++) { + const auto path = fmt::format("{}-{}", path_base, i); + data_file file{ + .content_type = data_file_content_type::data, + .file_path = uri(path), + .partition = make_single_field_pk(make_copy(pk_value)), + .record_count = records_per_file, + .file_size_bytes = 1_KiB, + }; + ret.emplace_back( + file_to_append{ + .file = std::move(file), + .schema_id = md.current_schema_id, + .partition_spec_id = md.default_spec_id, + }); + } + ret[0].file.record_count += leftover_records; + return ret; + } + + chunked_vector create_delete_files( + const table_metadata& md, + const ss::sstring& path_base, + size_t num_files, + size_t record_count, + data_file_content_type content_type + = data_file_content_type::position_deletes, + primitive_value pk_value = int_value{42}) { + chunked_vector ret; + ret.reserve(num_files); + const auto records_per_file = record_count / num_files; + const auto leftover_records = record_count % num_files; + for (size_t i = 0; i < num_files; i++) { + const auto path = fmt::format("{}-del-{}", path_base, i); + data_file file{ + .content_type = content_type, + .file_path = uri(path), + .partition = make_single_field_pk(make_copy(pk_value)), + .record_count = records_per_file, + .file_size_bytes = 512, + }; + 
ret.emplace_back( + file_to_delete{ + .file = std::move(file), + .schema_id = md.current_schema_id, + .partition_spec_id = md.default_spec_id, + }); + } + ret[0].file.record_count += leftover_records; + return ret; + } + + /// Runs a row_delta_action against the given table metadata, applying + /// the resulting updates in place. + checked run_row_delta( + table_metadata& table, + chunked_vector data_files, + chunked_vector delete_files) { + row_delta_action rda( + io, table, std::move(data_files), std::move(delete_files)); + action& a = rda; + auto outcome = std::move(a).build_updates().get(); + if (outcome.has_error()) { + return outcome.error(); + } + auto& updates = outcome.value(); + for (auto& u : updates.updates) { + auto res = table_update::apply(u, table); + if (res != table_update::outcome::success) { + return action::errc::unexpected_state; + } + } + return std::monostate{}; + } + + std::unique_ptr sr; + manifest_io io; +}; + +TEST_F(RowDeltaActionTest, TestDeleteOnly) { + auto table = create_table(); + const size_t delete_files_count = 3; + const size_t deleted_records = 90; + + auto del_files = create_delete_files( + table, "test", delete_files_count, deleted_records); + auto res = run_row_delta(table, {}, std::move(del_files)); + ASSERT_FALSE(res.has_error()) << res.error(); + + ASSERT_TRUE(table.snapshots.has_value()); + ASSERT_EQ(table.snapshots->size(), 1); + const auto& snap = table.snapshots->back(); + ASSERT_EQ(snap.summary.operation, snapshot_operation::delete_data); + + // Verify summary fields. + ASSERT_EQ(snap.summary.added_data_files, 0); + ASSERT_EQ(snap.summary.added_delete_files, delete_files_count); + ASSERT_EQ(snap.summary.deleted_records, deleted_records); + ASSERT_EQ(snap.summary.total_delete_files, delete_files_count); + ASSERT_EQ(snap.summary.total_data_files, 0); + ASSERT_EQ(snap.summary.total_records, 0); + + // Verify manifest list has one delete manifest. 
+ auto mlist_res = io.download_manifest_list(snap.manifest_list_path).get(); + ASSERT_TRUE(mlist_res.has_value()); + const auto& mlist = mlist_res.value(); + ASSERT_EQ(mlist.files.size(), 1); + ASSERT_EQ(mlist.files[0].content, manifest_file_content::deletes); + ASSERT_EQ(mlist.files[0].added_files_count, delete_files_count); + ASSERT_EQ(mlist.files[0].added_rows_count, deleted_records); + + // Download the manifest and verify entries. + auto manifest_res + = io.download_manifest(mlist.files[0].manifest_path).get(); + ASSERT_TRUE(manifest_res.has_value()); + const auto& manifest = manifest_res.value(); + ASSERT_EQ(manifest.entries.size(), delete_files_count); + for (const auto& entry : manifest.entries) { + ASSERT_EQ(entry.status, manifest_entry_status::added); + ASSERT_EQ( + entry.data_file.content_type, + data_file_content_type::position_deletes); + } +} + +TEST_F(RowDeltaActionTest, TestDataAndDeletes) { + auto table = create_table(); + const size_t data_files_count = 2; + const size_t data_records = 100; + const size_t delete_files_count = 1; + const size_t deleted_records = 30; + + auto data = create_data_files( + table, "data", data_files_count, data_records); + auto deletes = create_delete_files( + table, "del", delete_files_count, deleted_records); + + auto res = run_row_delta(table, std::move(data), std::move(deletes)); + ASSERT_FALSE(res.has_error()) << res.error(); + + ASSERT_TRUE(table.snapshots.has_value()); + ASSERT_EQ(table.snapshots->size(), 1); + const auto& snap = table.snapshots->back(); + ASSERT_EQ(snap.summary.operation, snapshot_operation::overwrite); + + // Verify summary. 
+ ASSERT_EQ(snap.summary.added_data_files, data_files_count); + ASSERT_EQ(snap.summary.added_records, data_records); + ASSERT_EQ(snap.summary.added_delete_files, delete_files_count); + ASSERT_EQ(snap.summary.deleted_records, deleted_records); + ASSERT_EQ(snap.summary.total_data_files, data_files_count); + ASSERT_EQ(snap.summary.total_records, data_records); + ASSERT_EQ(snap.summary.total_delete_files, delete_files_count); + + // Verify manifest list has both data and delete manifest files. + auto mlist_res = io.download_manifest_list(snap.manifest_list_path).get(); + ASSERT_TRUE(mlist_res.has_value()); + const auto& mlist = mlist_res.value(); + ASSERT_EQ(mlist.files.size(), 2); + + size_t data_manifest_count = 0; + size_t delete_manifest_count = 0; + for (const auto& mf : mlist.files) { + if (mf.content == manifest_file_content::data) { + ++data_manifest_count; + ASSERT_EQ(mf.added_files_count, data_files_count); + ASSERT_EQ(mf.added_rows_count, data_records); + } else { + ++delete_manifest_count; + ASSERT_EQ(mf.added_files_count, delete_files_count); + ASSERT_EQ(mf.added_rows_count, deleted_records); + } + } + ASSERT_EQ(data_manifest_count, 1); + ASSERT_EQ(delete_manifest_count, 1); +} + +TEST_F(RowDeltaActionTest, TestDataOnlyBehavesLikeAppend) { + auto table = create_table(); + const size_t data_files_count = 3; + const size_t data_records = 150; + + auto data = create_data_files( + table, "data", data_files_count, data_records); + auto res = run_row_delta(table, std::move(data), {}); + ASSERT_FALSE(res.has_error()) << res.error(); + + ASSERT_TRUE(table.snapshots.has_value()); + ASSERT_EQ(table.snapshots->size(), 1); + const auto& snap = table.snapshots->back(); + ASSERT_EQ(snap.summary.operation, snapshot_operation::append); + + ASSERT_EQ(snap.summary.added_data_files, data_files_count); + ASSERT_EQ(snap.summary.added_records, data_records); + ASSERT_EQ(snap.summary.total_data_files, data_files_count); + ASSERT_EQ(snap.summary.total_records, data_records); + + 
// Only data manifests in the list. + auto mlist_res = io.download_manifest_list(snap.manifest_list_path).get(); + ASSERT_TRUE(mlist_res.has_value()); + const auto& mlist = mlist_res.value(); + ASSERT_EQ(mlist.files.size(), 1); + ASSERT_EQ(mlist.files[0].content, manifest_file_content::data); +} diff --git a/src/v/iceberg/transaction.cc b/src/v/iceberg/transaction.cc index c61dc85dbab92..922b80a581245 100644 --- a/src/v/iceberg/transaction.cc +++ b/src/v/iceberg/transaction.cc @@ -11,6 +11,7 @@ #include "iceberg/merge_append_action.h" #include "iceberg/remove_snapshots_action.h" +#include "iceberg/row_delta_action.h" #include "iceberg/schema.h" #include "iceberg/table_requirement.h" #include "iceberg/table_update_applier.h" @@ -93,6 +94,24 @@ ss::future transaction::merge_append( co_return co_await apply(std::move(a)); } +ss::future transaction::row_delta( + manifest_io& io, + chunked_vector data_files, + chunked_vector delete_files, + chunked_vector> snapshot_props, + std::optional tag_name, + std::optional tag_expiration_ms) { + auto a = std::make_unique( + io, + table_, + std::move(data_files), + std::move(delete_files), + std::move(snapshot_props), + std::move(tag_name), + tag_expiration_ms); + co_return co_await apply(std::move(a)); +} + ss::future transaction::remove_expired_snapshots(model::timestamp now) { auto a = std::make_unique(table_, now); diff --git a/src/v/iceberg/transaction.h b/src/v/iceberg/transaction.h index bd51393241880..bf254e69b411e 100644 --- a/src/v/iceberg/transaction.h +++ b/src/v/iceberg/transaction.h @@ -14,6 +14,7 @@ #include "iceberg/action.h" #include "iceberg/manifest_io.h" #include "iceberg/merge_append_action.h" +#include "iceberg/row_delta_action.h" #include "iceberg/schema.h" #include "iceberg/table_metadata.h" @@ -62,6 +63,18 @@ class transaction { std::optional tag_name = std::nullopt, std::optional tag_expiration_ms = std::nullopt); + // Adds the given data files and delete files to a new snapshot. 
+ // Delete files reference rows to be removed from existing data files. + // Creates an overwrite snapshot (data+deletes), delete_data (deletes + // only), or append (data only) depending on inputs. + ss::future row_delta( + manifest_io&, + chunked_vector data_files, + chunked_vector delete_files, + chunked_vector> snapshot_props = {}, + std::optional tag_name = std::nullopt, + std::optional tag_expiration_ms = std::nullopt); + // Removes expired snapshots from the table, computing expiration based on // the given timestamp. Note, this does not perform IO to delete any // now-orphaned metadata or data.