diff --git a/src/v/cluster/metrics_reporter.cc b/src/v/cluster/metrics_reporter.cc index eafe4233c2dd4..a45f2cb971950 100644 --- a/src/v/cluster/metrics_reporter.cc +++ b/src/v/cluster/metrics_reporter.cc @@ -355,6 +355,9 @@ metrics_reporter::build_metrics_snapshot() { case model::iceberg_mode::variant::value_schema_latest: ++snapshot.topics_with_iceberg_schema_latest; break; + case model::iceberg_mode::variant::debezium: + ++snapshot.topics_with_iceberg_debezium; + break; } } @@ -717,6 +720,8 @@ void rjson_serialize( w.Uint64(snapshot.topics_with_iceberg_schema_id); w.Key("topics_with_iceberg_latest_protobuf_value"); w.Uint64(snapshot.topics_with_iceberg_schema_latest); + w.Key("topics_with_iceberg_debezium"); + w.Uint64(snapshot.topics_with_iceberg_debezium); w.Key("partition_count"); w.Uint64(snapshot.partition_count); diff --git a/src/v/cluster/metrics_reporter.h b/src/v/cluster/metrics_reporter.h index 481cd8273bcee..618c1657c56a1 100644 --- a/src/v/cluster/metrics_reporter.h +++ b/src/v/cluster/metrics_reporter.h @@ -113,6 +113,7 @@ class metrics_reporter { uint32_t topics_with_iceberg_kv{0}; uint32_t topics_with_iceberg_schema_id{0}; uint32_t topics_with_iceberg_schema_latest{0}; + uint32_t topics_with_iceberg_debezium{0}; cluster_version active_logical_version{invalid_version}; cluster_version original_logical_version{invalid_version}; diff --git a/src/v/datalake/BUILD b/src/v/datalake/BUILD index 2a779af61b855..2fede5d57d827 100644 --- a/src/v/datalake/BUILD +++ b/src/v/datalake/BUILD @@ -151,6 +151,7 @@ redpanda_cc_library( ":backlog_controller", ":catalog_schema_manager", ":cloud_data_io", + ":debezium_translator", ":location", ":logger", ":record_translator", @@ -238,11 +239,16 @@ redpanda_cc_library( "table_definition.cc", ], hdrs = [ + "schema_descriptor.h", "table_definition.h", ], visibility = [":__subpackages__"], deps = [ + "//src/v/iceberg:datatypes", "//src/v/iceberg:schema", + "//src/v/iceberg:values", + "//src/v/model", + "@seastar", ], ) @@ -409,6 +415,40 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "debezium_translator", + srcs = [ + "debezium_translator.cc", + ], + hdrs = [ + "debezium_translator.h", + ], + implementation_deps = [ + ":logger", + ":table_definition", + "//src/v/iceberg:avro_utils", + "//src/v/iceberg:compatibility_utils", + "//src/v/iceberg:values", + "//src/v/iceberg/conversion:conversion_outcome", + "//src/v/iceberg/conversion:values_avro", + "//src/v/iceberg/conversion:values_json", + "//src/v/iceberg/conversion:values_protobuf", + "@abseil-cpp//absl/container:flat_hash_set", + "@avro", + "@protobuf", + "@seastar", + ], + visibility = [":__subpackages__"], + deps = [ + ":record_schema_resolver", + ":record_translator", + "//src/v/base", + "//src/v/iceberg:datatypes", + "//src/v/metrics", + "//src/v/model", + ], +) + redpanda_cc_library( name = "local_parquet_file_writer", srcs = [ diff --git a/src/v/datalake/coordinator/BUILD b/src/v/datalake/coordinator/BUILD index 1a2281708baee..22b6743db794c 100644 --- a/src/v/datalake/coordinator/BUILD +++ b/src/v/datalake/coordinator/BUILD @@ -163,6 +163,8 @@ redpanda_cc_library( "//src/v/container:chunked_vector", "//src/v/serde", "//src/v/serde:bytes", + "//src/v/serde:iobuf", + "//src/v/serde:optional", "@fmt", "@seastar", ], diff --git a/src/v/datalake/coordinator/coordinator.cc b/src/v/datalake/coordinator/coordinator.cc index 45598646ad3af..a695e665bb946 100644 --- a/src/v/datalake/coordinator/coordinator.cc +++ b/src/v/datalake/coordinator/coordinator.cc @@ -15,6 +15,7 @@ #include 
"container/chunked_vector.h" #include "datalake/catalog_schema_manager.h" #include "datalake/coordinator/state_update.h" +#include "datalake/debezium_translator.h" #include "datalake/logger.h" #include "datalake/partition_spec_parser.h" #include "datalake/record_translator.h" @@ -441,6 +442,14 @@ struct coordinator::main_table_schema_provider val_type = std::move(type_res.value()); } + if (comps.is_debezium) { + if (!val_type.has_value()) { + co_return errc::failed; + } + auto& envelope = std::get( + val_type.value()->type); + co_return debezium_envelope_to_table_type(envelope); + } auto record_type = default_translator{}.build_type(std::move(val_type)); co_return std::move(record_type.type); } diff --git a/src/v/datalake/coordinator/data_file.h b/src/v/datalake/coordinator/data_file.h index 5ca4816ca13e2..a853eea25f3ec 100644 --- a/src/v/datalake/coordinator/data_file.h +++ b/src/v/datalake/coordinator/data_file.h @@ -14,6 +14,7 @@ #include "container/chunked_vector.h" #include "serde/envelope.h" #include "serde/rw/bytes.h" +#include "serde/rw/optional.h" #include @@ -23,7 +24,7 @@ namespace datalake::coordinator { // Represents a file that exists in object storage. struct data_file - : serde::envelope, serde::compat_version<0>> { + : serde::envelope, serde::compat_version<0>> { auto serde_fields() { return std::tie( remote_path, @@ -32,7 +33,9 @@ struct data_file hour_deprecated, table_schema_id, partition_spec_id, - partition_key); + partition_key, + delete_key_field_ids, + is_delete); } ss::sstring remote_path = ""; size_t row_count = 0; @@ -47,7 +50,8 @@ struct data_file // single-value serialization" (see iceberg/values_bytes.h). // Nulls are represented by std::nullopt. chunked_vector> partition_key; - // TODO: add kafka schema id + std::optional> delete_key_field_ids; + bool is_delete{false}; data_file copy() const { return { @@ -58,6 +62,11 @@ struct data_file .table_schema_id = table_schema_id, .partition_spec_id = partition_spec_id, .partition_key = partition_key.copy(), + .delete_key_field_ids = delete_key_field_ids.has_value() + ? std::make_optional( + delete_key_field_ids->copy()) + : std::nullopt, + .is_delete = is_delete, }; } diff --git a/src/v/datalake/coordinator/iceberg_file_committer.cc b/src/v/datalake/coordinator/iceberg_file_committer.cc index 2b52b595e4c57..ac33eeb287a1c 100644 --- a/src/v/datalake/coordinator/iceberg_file_committer.cc +++ b/src/v/datalake/coordinator/iceberg_file_committer.cc @@ -25,6 +25,7 @@ #include "iceberg/transaction.h" #include "iceberg/values.h" #include "iceberg/values_bytes.h" +#include "ssx/future-util.h" #include "storage/api.h" #include @@ -275,19 +276,30 @@ class table_commit_builder { table_commit_offset_); } else { for (const auto& f : files) { + bool is_equality_delete = f.is_delete; auto pk = build_partition_key(topic, table_, f); if (pk.has_error()) { return pk.error(); } iceberg::data_file file{ - .content_type = iceberg::data_file_content_type::data, + .content_type + = is_equality_delete + ? 
iceberg::data_file_content_type::equality_deletes + : iceberg::data_file_content_type::data, .file_path = io.to_uri(std::filesystem::path(f.remote_path)), .file_format = iceberg::data_file_format::parquet, .partition = std::move(pk.value()), .record_count = f.row_count, .file_size_bytes = f.file_size_bytes, }; + if (is_equality_delete && f.delete_key_field_ids) { + chunked_vector eq_ids; + for (auto id : *f.delete_key_field_ids) { + eq_ids.push_back(iceberg::nested_field::id_t{id}); + } + file.equality_ids = std::move(eq_ids); + } // For files created by a legacy Redpanda version that only // supported hourly partitioning, choose current schema and // spec ids. @@ -298,12 +310,23 @@ class table_commit_builder { = f.partition_spec_id >= 0 ? iceberg::partition_spec::id_t{f.partition_spec_id} : table_.default_spec_id; - icb_files_.push_back( - iceberg::file_to_append{ - .file = std::move(file), - .schema_id = schema_id, - .partition_spec_id = pspec_id, - }); + auto fta = iceberg::file_to_append{ + .file = std::move(file), + .schema_id = schema_id, + .partition_spec_id = pspec_id, + }; + if (f.delete_key_field_ids) { + if (!key_field_ids_) { + key_field_ids_.emplace(); + for (auto id : *f.delete_key_field_ids) { + key_field_ids_->push_back( + iceberg::nested_field::id_t{id}); + } + } + upsert_files_.push_back(std::move(fta)); + } else { + icb_files_.push_back(std::move(fta)); + } } } @@ -319,8 +342,7 @@ class table_commit_builder { model::revision_id topic_revision, iceberg::catalog& catalog, iceberg::manifest_io& io) && { - if (icb_files_.empty()) { - // No new files to commit. + if (icb_files_.empty() && upsert_files_.empty()) { vlog( datalake_log.debug, "All committed files were deduplicated for topic {} revision {}, " @@ -340,8 +362,9 @@ class table_commit_builder { vlog( datalake_log.debug, - "Adding {} files to Iceberg table {}", + "Adding {} data files and {} upsert files to Iceberg table {}", icb_files_.size(), + upsert_files_.size(), table_id_); // NOTE: a non-expiring tag is added to the new snapshot to ensure that // snapshot expiration doesn't clear this snapshot and its commit @@ -355,18 +378,37 @@ class table_commit_builder { std::numeric_limits::max()) : std::nullopt; iceberg::transaction txn(std::move(table_)); - auto icb_append_res = co_await txn.merge_append( - io, - std::move(icb_files_), - {{commit_meta_prop, to_json_str(commit_meta)}}, - std::move(tag_name), - tag_expiry_ms); - if (icb_append_res.has_error()) { - co_return log_and_convert_action_errc( - icb_append_res.error(), - fmt::format( - "Iceberg merge append failed for table {}", table_id_)); + + if (!key_field_ids_) { + auto icb_append_res = co_await txn.merge_append( + io, + std::move(icb_files_), + {{commit_meta_prop, to_json_str(commit_meta)}}, + std::move(tag_name), + tag_expiry_ms); + if (icb_append_res.has_error()) { + co_return log_and_convert_action_errc( + icb_append_res.error(), + fmt::format( + "Iceberg merge append failed for table {}", table_id_)); + } + } else { + auto merge_delta_res = commit_merge_delta(); + auto row_delta_res = co_await txn.row_delta( + io, + std::move(merge_delta_res.data_files), + std::move(merge_delta_res.delete_files), + {{commit_meta_prop, to_json_str(commit_meta)}}, + std::move(tag_name), + tag_expiry_ms); + if (row_delta_res.has_error()) { + co_return log_and_convert_action_errc( + row_delta_res.error(), + fmt::format( + "Iceberg row delta failed for table {}", table_id_)); + } } + auto icb_commit_res = co_await catalog.commit_txn( table_id_, std::move(txn)); if 
(icb_commit_res.has_error()) { @@ -379,9 +421,42 @@ class table_commit_builder { co_return std::move(icb_commit_res.value()); } - size_t num_files() const noexcept { return icb_files_.size(); } + size_t num_files() const noexcept { + return icb_files_.size() + upsert_files_.size(); + } private: + struct merge_delta_result { + chunked_vector data_files; + chunked_vector delete_files; + }; + + merge_delta_result commit_merge_delta() { + chunked_vector all_data; + chunked_vector deletes; + for (auto& f : icb_files_) { + all_data.push_back(std::move(f)); + } + for (auto& f : upsert_files_) { + if ( + f.file.content_type + == iceberg::data_file_content_type::equality_deletes) { + deletes.push_back( + iceberg::file_to_delete{ + .file = std::move(f.file), + .schema_id = f.schema_id, + .partition_spec_id = f.partition_spec_id, + }); + } else { + all_data.push_back(std::move(f)); + } + } + return merge_delta_result{ + .data_files = std::move(all_data), + .delete_files = std::move(deletes), + }; + } + table_commit_builder( const model::cluster_uuid& cluster, iceberg::table_identifier table_id, @@ -409,6 +484,8 @@ class table_commit_builder { // State accumulated. chunked_vector icb_files_; + chunked_vector upsert_files_; + std::optional> key_field_ids_; std::optional new_committed_offset_; }; diff --git a/src/v/datalake/datalake_manager.cc b/src/v/datalake/datalake_manager.cc index 6e0555906ac12..64b39fefebbd0 100644 --- a/src/v/datalake/datalake_manager.cc +++ b/src/v/datalake/datalake_manager.cc @@ -20,6 +20,7 @@ #include "datalake/cloud_data_io.h" #include "datalake/coordinator/catalog_factory.h" #include "datalake/coordinator/frontend.h" +#include "datalake/debezium_translator.h" #include "datalake/logger.h" #include "datalake/record_schema_resolver.h" #include "datalake/record_translator.h" @@ -52,6 +53,7 @@ static std::unique_ptr make_type_resolver( case model::iceberg_mode::variant::key_value: return std::make_unique(); case model::iceberg_mode::variant::value_schema_id_prefix: + case model::iceberg_mode::variant::debezium: return std::make_unique(sr, cache, type_cache); case model::iceberg_mode::variant::value_schema_latest: auto subject = pandaproxy::schema_registry::subject( @@ -69,8 +71,8 @@ static std::unique_ptr make_type_resolver( } } -static std::unique_ptr -make_record_translator(const model::iceberg_mode& mode) { +static std::unique_ptr make_record_translator( + const model::iceberg_mode& mode, type_resolver& resolver) { switch (mode.kind()) { case model::iceberg_mode::variant::disabled: vassert( @@ -81,6 +83,8 @@ make_record_translator(const model::iceberg_mode& mode) { case model::iceberg_mode::variant::value_schema_id_prefix: case model::iceberg_mode::variant::value_schema_latest: return std::make_unique(); + case model::iceberg_mode::variant::debezium: + return std::make_unique(resolver); } } } // namespace @@ -638,7 +642,7 @@ ss::future<> datalake_manager::handle_translator_state_change( *_schema_registry, *_schema_cache, *_resolved_type_cache); - auto record_translator = make_record_translator(mode); + auto record_translator = make_record_translator(mode, *type_resolver); auto table_creator = translation::make_default_table_creator( _coordinator_frontend->local()); diff --git a/src/v/datalake/debezium_translator.cc b/src/v/datalake/debezium_translator.cc new file mode 100644 index 0000000000000..2495de01fc3ac --- /dev/null +++ b/src/v/datalake/debezium_translator.cc @@ -0,0 +1,426 @@ +/* + * Copyright 2026 Redpanda Data, Inc. 
+ * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#include "datalake/debezium_translator.h" + +#include "absl/container/flat_hash_set.h" +#include "base/vlog.h" +#include "datalake/logger.h" +#include "datalake/record_schema_resolver.h" +#include "datalake/table_definition.h" +#include "iceberg/avro_utils.h" +#include "iceberg/compatibility_utils.h" +#include "iceberg/conversion/conversion_outcome.h" +#include "iceberg/conversion/values_avro.h" +#include "iceberg/conversion/values_json.h" +#include "iceberg/conversion/values_protobuf.h" +#include "iceberg/datatypes.h" +#include "iceberg/values.h" +#include "model/fundamental.h" +#include "model/record.h" + +#include +#include + +namespace datalake { + +namespace { + +struct value_translating_visitor { + iobuf parsable_buf; + const iceberg::field_type& type; + + ss::future + operator()(const google::protobuf::Descriptor& d) { + return iceberg::deserialize_protobuf(std::move(parsable_buf), d); + } + ss::future + operator()(const avro::ValidSchema& s) { + auto value = co_await iceberg::deserialize_avro( + std::move(parsable_buf), s); + if (value.has_error()) { + co_return iceberg::optional_value_outcome(value.error()); + } + co_return std::move(value.value()); + } + ss::future + operator()(const iceberg::json_conversion_ir& s) { + auto value = co_await iceberg::deserialize_json( + std::move(parsable_buf), s); + if (value.has_error()) { + co_return iceberg::optional_value_outcome(value.error()); + } + co_return std::move(value.value()); + } +}; + +std::optional get_redpanda_idx(const iceberg::struct_type& val_type) { + for (size_t i = 0; i < val_type.fields.size(); ++i) { + if (val_type.fields[i]->name == rp_struct_name) { + return i; + } + } + return std::nullopt; +} + +/// Find a field by name in a struct_type and return its index. +std::optional +find_field_idx(const iceberg::struct_type& st, std::string_view name) { + for (size_t i = 0; i < st.fields.size(); ++i) { + if (st.fields[i]->name == name) { + return i; + } + } + return std::nullopt; +} + +/// Extract the inner struct_type from a field that is itself a struct. +const iceberg::struct_type* +get_inner_struct(const iceberg::nested_field& field) { + if (auto* st = std::get_if(&field.type)) { + return st; + } + return nullptr; +} + +/// Extract the iobuf from an iceberg::value that holds a string_value. +const iobuf* extract_string_buf(const std::optional& val) { + if (!val.has_value()) { + return nullptr; + } + auto* prim = std::get_if(&val.value()); + if (!prim) { + return nullptr; + } + auto* sv = std::get_if(prim); + if (!sv) { + return nullptr; + } + return &sv->val; +} + +/// Given an inner struct_value and its type, extract only the fields whose +/// names match the key field names, returning them as a new struct_value. 
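// --- review note ----------------------------------------------------------
// A minimal, self-contained analogue of value_translating_visitor above:
// std::visit picks the deserializer matching whichever schema alternative
// the resolver produced. All types and parsers here are stand-ins, not the
// real datalake/iceberg ones.
#include <string>
#include <variant>

struct avro_schema {};
struct json_schema {};

int parse_avro(const std::string&) { return 1; } // stub deserializer
int parse_json(const std::string&) { return 2; } // stub deserializer

struct parse_visitor {
    std::string buf; // record payload, consumed by the chosen parser
    int operator()(const avro_schema&) const { return parse_avro(buf); }
    int operator()(const json_schema&) const { return parse_json(buf); }
};

int parse(const std::variant<avro_schema, json_schema>& s, std::string buf) {
    return std::visit(parse_visitor{std::move(buf)}, s);
}
// ---------------------------------------------------------------------------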
+std::optional extract_key_fields( + const iceberg::struct_value& src, + const iceberg::struct_type& src_type, + const chunked_vector& key_field_names) { + auto key = iceberg::struct_value{}; + for (const auto& name : key_field_names) { + bool found = false; + for (size_t i = 0; i < src_type.fields.size(); ++i) { + if (src_type.fields[i]->name == name) { + if (src.fields[i].has_value()) { + key.fields.emplace_back( + iceberg::make_copy(src.fields[i].value())); + } else { + key.fields.emplace_back(std::nullopt); + } + found = true; + break; + } + } + if (!found) { + return std::nullopt; + } + } + return key; +} + +/// Force all fields in a struct_type to non-required (except map keys). +void make_fields_optional(iceberg::struct_type& struct_type) { + absl::flat_hash_set map_keys; + std::ignore = iceberg::for_each_field( + struct_type, [&map_keys](iceberg::nested_field* f) { + f->required = map_keys.contains(f) ? iceberg::field_required::yes + : iceberg::field_required::no; + if (std::holds_alternative(f->type)) { + auto& kv = std::get(f->type); + map_keys.insert(kv.key_field.get()); + } + }); +} + +} // namespace + +iceberg::struct_type +debezium_envelope_to_table_type(const iceberg::struct_type& envelope_type) { + auto ret_type = schemaless_struct_type(); + + auto after_idx = find_field_idx(envelope_type, "after"); + if (!after_idx.has_value()) { + vlog( + datalake_log.error, "Debezium envelope schema missing 'after' field"); + return ret_type; + } + + auto* after_struct = get_inner_struct(*envelope_type.fields[*after_idx]); + if (!after_struct) { + vlog( + datalake_log.error, + "Debezium envelope 'after' field is not a struct"); + return ret_type; + } + + auto inner_type = after_struct->copy(); + make_fields_optional(inner_type); + + for (auto& field : inner_type.fields) { + if (field->name == rp_struct_name) { + auto& system_fields = rp_struct_type(ret_type); + system_fields.fields.emplace_back( + iceberg::nested_field::create( + schemaless_next_field_id, + "data", + field->required, + std::move(field->type))); + continue; + } + ret_type.fields.emplace_back(std::move(field)); + } + + return ret_type; +} + +debezium_translator::debezium_translator(type_resolver& key_resolver) + : _key_resolver(key_resolver) {} + +record_type debezium_translator::build_type( + std::optional val_type) { + std::optional val_id; + if (!val_type.has_value()) { + return record_type{ + .comps = record_schema_components{ + .key_identifier = std::nullopt, + .val_identifier = std::nullopt, + .is_debezium = true, + }, + .type = schemaless_struct_type(), + .key_field_names = std::nullopt, + }; + } + + val_id = val_type.value()->id; + auto envelope_type = std::get( + iceberg::make_copy(val_type.value()->type)); + auto ret_type = debezium_envelope_to_table_type(envelope_type); + + return record_type{ + .comps = record_schema_components{ + .key_identifier = std::nullopt, + .val_identifier = std::move(val_id), + .is_debezium = true, + }, + .type = std::move(ret_type), + .key_field_names + = [this]() -> std::optional> { + if (!_cached_key_field_names) { + return std::nullopt; + } + return _cached_key_field_names->copy(); + }(), + }; +} + +ss::future> +debezium_translator::translate_data( + model::partition_id pid, + kafka::offset o, + std::optional key, + const std::optional& val_type, + std::optional parsable_val, + model::timestamp ts, + model::timestamp_type ts_t, + const chunked_vector& headers) { + // Tombstone: both fields nullopt. 
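// --- review note ----------------------------------------------------------
// For orientation, a typical Debezium change event value has this shape
// (illustrative JSON; the row values are made up):
#include <string_view>

constexpr std::string_view example_update_event = R"({
  "before": {"id": 1, "name": "alice"},
  "after":  {"id": 1, "name": "bob"},
  "source": {"connector": "mysql", "db": "inventory", "table": "users"},
  "op": "u",
  "ts_ms": 1700000000000
})";
// debezium_envelope_to_table_type() keeps only the shape of the "after" row
// (id, name), makes every column optional, and grafts it onto the standard
// "redpanda" system-column struct; the "before" image is used only to
// derive delete keys.
// ---------------------------------------------------------------------------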
+ if (!parsable_val.has_value()) { + co_return translated_record{ + .data_row = std::nullopt, + .delete_key = std::nullopt, + }; + } + + if (!key.has_value()) { + vlog(datalake_log.warn, "Debezium record missing key, routing to DLQ"); + co_return errc::translation_error; + } + + if (!val_type.has_value()) { + vlog( + datalake_log.error, + "Must have parsed schema when using debezium mode"); + co_return errc::unexpected_schema; + } + + // Deserialize the full envelope. + auto& resolved = *val_type.value(); + auto translated_val = co_await std::visit( + value_translating_visitor{std::move(*parsable_val), resolved.type}, + resolved.schema.get_schema_ref()); + if (translated_val.has_error()) { + vlog( + datalake_log.warn, + "Error converting Debezium envelope: {}", + translated_val.error()); + co_return errc::translation_error; + } + + auto& envelope_struct = std::get>( + translated_val.value().value()); + + // Look up field positions in the envelope schema. + auto& envelope_type = std::get(resolved.type); + auto op_idx = find_field_idx(envelope_type, "op"); + auto before_idx = find_field_idx(envelope_type, "before"); + auto after_idx = find_field_idx(envelope_type, "after"); + + if (!op_idx.has_value() || !after_idx.has_value()) { + vlog( + datalake_log.error, + "Debezium envelope missing required fields (op/after)"); + co_return errc::translation_error; + } + + auto* op_buf = extract_string_buf(envelope_struct->fields[*op_idx]); + if (!op_buf) { + vlog(datalake_log.error, "Debezium envelope 'op' is not a string"); + co_return errc::translation_error; + } + + // Get the inner table struct_type from the envelope schema. + auto* after_type_ptr = get_inner_struct(*envelope_type.fields[*after_idx]); + if (!after_type_ptr) { + vlog( + datalake_log.error, + "Debezium envelope 'after' field is not a struct type"); + co_return errc::translation_error; + } + const auto& after_type = *after_type_ptr; + + // Resolve key field IDs on first call. + if (!_cached_key_field_names) { + auto key_type_res = co_await _key_resolver.resolve_buf_type( + key->copy()); + if (key_type_res.has_error()) { + vlog( + datalake_log.warn, + "Failed to resolve key schema: {}", + key_type_res.error()); + co_return errc::translation_error; + } + auto& key_resolved = key_type_res.value(); + if (key_resolved.type.has_value()) { + auto& key_iceberg_type = std::get( + key_resolved.type.value()->type); + chunked_vector names; + for (const auto& key_field : key_iceberg_type.fields) { + names.emplace_back(key_field->name); + } + _cached_key_field_names = std::move(names); + } else { + _cached_key_field_names.emplace(); + } + } + + // Helper: extract the struct_value from the before or after envelope field. + auto get_inner_value = + [](std::optional& field) -> iceberg::struct_value* { + if (!field.has_value()) { + return nullptr; + } + auto* sv = std::get_if>( + &field.value()); + if (!sv || !*sv) { + return nullptr; + } + return sv->get(); + }; + + auto* after_value = get_inner_value(envelope_struct->fields[*after_idx]); + iceberg::struct_value* before_value = nullptr; + if (before_idx.has_value()) { + before_value = get_inner_value(envelope_struct->fields[*before_idx]); + } + + // Helper: build a data row from the after struct value, following the + // structured_data_translator pattern. 
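// --- review note ----------------------------------------------------------
// The block above resolves the key schema once and reuses the field-name
// list for every later record this translator instance sees. A standalone
// analogue of that memoization (stand-in types; the real code goes through
// type_resolver::resolve_buf_type). Note the implicit assumption that the
// key subject's field set stays stable for the translator's lifetime.
#include <optional>
#include <string>
#include <vector>

struct key_name_cache {
    std::optional<std::vector<std::string>> names;

    // resolve is invoked only on the first call; later calls reuse the
    // cached result and never touch the schema registry again.
    template <typename Resolver>
    const std::vector<std::string>& get(Resolver&& resolve) {
        if (!names.has_value()) {
            names = resolve();
        }
        return *names;
    }
};
// ---------------------------------------------------------------------------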
+ auto build_data_row = + [&]( + iceberg::struct_value* inner) -> std::optional { + if (!inner) { + return std::nullopt; + } + auto ret_data = iceberg::struct_value{}; + auto system_data = build_rp_struct( + pid, o, key->copy(), ts, ts_t, headers); + ret_data.fields.emplace_back(std::move(system_data)); + + auto redpanda_field_idx = get_redpanda_idx(after_type); + for (size_t i = 0; i < inner->fields.size(); ++i) { + auto& field = inner->fields[i]; + if (redpanda_field_idx == i) { + rp_struct_value(ret_data).fields.emplace_back(std::move(field)); + continue; + } + ret_data.fields.emplace_back(std::move(field)); + } + return ret_data; + }; + + // Helper: extract key fields from a struct_value. + auto extract_key = + [&](iceberg::struct_value* src) -> std::optional { + if ( + !src || !_cached_key_field_names + || _cached_key_field_names->empty()) { + return std::nullopt; + } + return extract_key_fields(*src, after_type, *_cached_key_field_names); + }; + + const auto& op = *op_buf; + if (op == "c" || op == "r") { + auto data_row = build_data_row(after_value); + if (!data_row.has_value()) { + vlog( + datalake_log.warn, "Debezium create/read op but 'after' is null"); + co_return errc::translation_error; + } + co_return translated_record{ + .data_row = std::move(data_row), + .delete_key = std::nullopt, + }; + } + + if (op == "u") { + auto data_row = build_data_row(after_value); + if (!data_row.has_value()) { + vlog(datalake_log.warn, "Debezium update op but 'after' is null"); + co_return errc::translation_error; + } + auto delete_key = extract_key(before_value); + co_return translated_record{ + .data_row = std::move(data_row), + .delete_key = std::move(delete_key), + }; + } + + if (op == "d") { + auto delete_key = extract_key(before_value); + co_return translated_record{ + .data_row = std::nullopt, + .delete_key = std::move(delete_key), + }; + } + + vlog(datalake_log.warn, "Unsupported Debezium op type"); + co_return errc::translation_error; +} + +} // namespace datalake diff --git a/src/v/datalake/debezium_translator.h b/src/v/datalake/debezium_translator.h new file mode 100644 index 0000000000000..95cd71cb2012d --- /dev/null +++ b/src/v/datalake/debezium_translator.h @@ -0,0 +1,52 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include "datalake/record_schema_resolver.h" +#include "datalake/record_translator.h" +#include "iceberg/datatypes.h" + +namespace datalake { + +/// Extract the inner table schema from a Debezium CDC envelope struct +/// type. Returns the "after" field's struct with all fields made optional, +/// merged into the standard schemaless struct (with redpanda system columns). +/// Used by both the translator and the coordinator to agree on the table +/// schema. +iceberg::struct_type +debezium_envelope_to_table_type(const iceberg::struct_type& envelope_type); + +/// Translates Debezium CDC envelope records into Iceberg rows with +/// insert/upsert/delete semantics based on the envelope's `op` field. 
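// --- review note ----------------------------------------------------------
// Compact summary of the dispatch implemented in translate_data() above.
// Debezium op codes: c = create, r = snapshot read, u = update, d = delete;
// anything else is rejected as a translation error (DLQ), and a Kafka
// tombstone (null value) emits neither and is skipped. The enum and struct
// below are illustrative only.
enum class cdc_op { create, read, update, del };
struct emit_plan {
    bool data_row;   // write the "after" image as a table row
    bool delete_key; // write an equality-delete row for the old key
};
constexpr emit_plan plan_for(cdc_op op) {
    switch (op) {
    case cdc_op::create:
    case cdc_op::read:
        return {.data_row = true, .delete_key = false};
    case cdc_op::update:
        // delete key is derived from "before" when present, so older
        // versions of the row are shadowed on read.
        return {.data_row = true, .delete_key = true};
    case cdc_op::del:
        return {.data_row = false, .delete_key = true};
    }
    return {.data_row = false, .delete_key = false}; // unreachable for valid ops
}
// ---------------------------------------------------------------------------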
+class debezium_translator : public record_translator { +public: + explicit debezium_translator(type_resolver& key_resolver); + + record_type + build_type(std::optional val_type) override; + + ss::future> translate_data( + model::partition_id pid, + kafka::offset o, + std::optional key, + const std::optional& val_type, + std::optional parsable_val, + model::timestamp ts, + model::timestamp_type ts_t, + const chunked_vector& headers) override; + + ~debezium_translator() override = default; + +private: + type_resolver& _key_resolver; + std::optional> _cached_key_field_names; +}; + +} // namespace datalake diff --git a/src/v/datalake/partitioning_writer.cc b/src/v/datalake/partitioning_writer.cc index 6ae7ddf0f9e16..d24d1a3090d7c 100644 --- a/src/v/datalake/partitioning_writer.cc +++ b/src/v/datalake/partitioning_writer.cc @@ -140,6 +140,10 @@ partitioning_writer::finish() && { file_res.value().size_bytes, file_res.value().path()); + std::optional> file_key_ids; + if (key_field_ids_) { + file_key_ids = key_field_ids_->copy(); + } files.push_back( partitioned_file{ .local_file = std::move(file_res.value()), @@ -147,7 +151,8 @@ partitioning_writer::finish() && { .schema_id = schema_id_, .partition_spec_id = spec_.spec_id, .partition_key = std::move(pk), - .partition_key_path = std::move(partition_key_path_res.value())}); + .partition_key_path = std::move(partition_key_path_res.value()), + .key_field_ids = std::move(file_key_ids)}); } vlog( diff --git a/src/v/datalake/partitioning_writer.h b/src/v/datalake/partitioning_writer.h index 7af81952d2633..183b7486f3429 100644 --- a/src/v/datalake/partitioning_writer.h +++ b/src/v/datalake/partitioning_writer.h @@ -63,10 +63,22 @@ class partitioning_writer { iceberg::partition_spec::id_t partition_spec_id; iceberg::partition_key partition_key; remote_path partition_key_path; + /// When set, this file participates in upsert/delete operations. + std::optional> + key_field_ids; friend std::ostream& operator<<(std::ostream&, const partitioned_file&); }; + void set_key_field_ids(chunked_vector ids) { + key_field_ids_.emplace(std::move(ids)); + } + + const iceberg::struct_type& type() const { return type_; } + iceberg::schema::id_t schema_id() const { return schema_id_; } + const iceberg::partition_spec& partition_spec() const { return spec_; } + const remote_path& remote_prefix() const { return remote_prefix_; } + // Finishes and returns the list of local files written by the underlying // writers, with the appropriate partitioning metadata filled in. ss::future, writer_error>> @@ -84,6 +96,8 @@ class partitioning_writer { iceberg::partition_spec spec_; remote_path remote_prefix_; + std::optional> key_field_ids_; + // Map of partition keys to their corresponding data file writers. chunked_hash_map< iceberg::partition_key, diff --git a/src/v/datalake/record_multiplexer.cc b/src/v/datalake/record_multiplexer.cc index b9f9cadc5d7eb..93efe81288767 100644 --- a/src/v/datalake/record_multiplexer.cc +++ b/src/v/datalake/record_multiplexer.cc @@ -31,6 +31,23 @@ namespace datalake { namespace { +/// Build a struct_type for just the key columns by extracting fields whose +/// IDs match key_field_ids from the full type. 
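// --- review note ----------------------------------------------------------
// These key field IDs flow a long way: translator key names -> multiplexer
// (resolved to catalog-assigned IDs) -> partitioned_file::key_field_ids ->
// coordinator data_file::delete_key_field_ids -> iceberg equality_ids on
// the delete files committed via txn.row_delta(). At read time an equality
// delete drops any older data row whose key columns match a delete row.
// Simplified standalone sketch of that read-side matching (single int key
// column, sequence numbers ignored):
#include <unordered_set>
#include <vector>

struct row {
    int id;
    const char* name;
};

std::vector<row> apply_equality_deletes(
    std::vector<row> data, const std::vector<int>& deleted_keys) {
    std::unordered_set<int> dead(deleted_keys.begin(), deleted_keys.end());
    std::erase_if(data, [&](const row& r) { return dead.contains(r.id); });
    return data;
}
// An upsert for id=1 is a delete row {1} plus a fresh data row, so only the
// newest version of the row survives a scan.
// ---------------------------------------------------------------------------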
+iceberg::struct_type extract_key_type( + const iceberg::struct_type& full_type, + const chunked_vector& key_field_ids) { + iceberg::struct_type key_type; + for (const auto& key_id : key_field_ids) { + for (const auto& field : full_type.fields) { + if (field->id == key_id) { + key_type.fields.push_back(field->copy()); + break; + } + } + } + return key_type; +} + // Get the data location for the table. Some catalogs require using the property // `write.data.path`. Otherwise, it defaults to /data. iceberg::uri get_data_location(const schema_manager::table_info& table_info) { @@ -227,6 +244,11 @@ ss::future record_multiplexer::do_multiplex( continue; } } + auto& translated = record_data_res.value(); + + if (!translated.data_row && !translated.delete_key) { + continue; + } auto& val_type = val_type_res.value().type; record_schema_components comps{ .key_identifier = std::nullopt, @@ -326,33 +348,108 @@ ss::future record_multiplexer::do_multiplex( co_return ss::stop_iteration::yes; } - auto [iter, _] = _writers.emplace( - record_type.comps, - std::make_unique( - *_writer_factory, - load_res.value().schema.schema_id, - std::move(record_type.type), - std::move(load_res.value().partition_spec), - std::move(data_remote_path.value()))); + auto sw = schema_writer{}; + + auto data_writer = std::make_unique( + *_writer_factory, + load_res.value().schema.schema_id, + std::move(record_type.type), + std::move(load_res.value().partition_spec), + std::move(data_remote_path.value())); + + if (record_type.key_field_names) { + // Resolve key field names to IDs using the type that + // fill_registered_ids just populated with catalog IDs. + chunked_vector key_ids; + for (const auto& name : *record_type.key_field_names) { + for (const auto& f : data_writer->type().fields) { + if (f->name == name) { + key_ids.emplace_back(f->id); + break; + } + } + } + data_writer->set_key_field_ids(key_ids.copy()); + sw.key_field_ids = std::make_optional(std::move(key_ids)); + } + sw.data_writer = std::move(data_writer); + + auto [iter, _] = _writers.emplace(record_type.comps, std::move(sw)); writer_iter = iter; } - auto& writer = writer_iter->second; - auto add_data_result = co_await writer->add_data( - std::move(record_data_res.value()), estimated_size, as); + auto& sw = writer_iter->second; + if (translated.data_row) { + auto add_data_result = co_await sw.data_writer->add_data( + std::move(*translated.data_row), estimated_size, as); + + if (add_data_result != writer_error::ok) { + vlogl( + _log, + is_recoverable_error(add_data_result) ? ss::log_level::debug + : ss::log_level::warn, + "Error adding data to writer for record {}: {}", + offset, + add_data_result); + _error = add_data_result; + co_return ss::stop_iteration::yes; + } + } - if (add_data_result != writer_error::ok) { - vlogl( - _log, - is_recoverable_error(add_data_result) ? ss::log_level::debug - : ss::log_level::warn, - "Error adding data to writer for record {}: {}", - offset, - add_data_result); - _error = add_data_result; - // If a write fails, the writer is left in an indeterminate state, - // we cannot continue in this case. - co_return ss::stop_iteration::yes; + if (translated.delete_key && sw.key_field_ids) { + if (!sw.delete_writer) { + auto key_type = extract_key_type( + sw.data_writer->type(), *sw.key_field_ids); + + // Validate that the delete key columns include all + // partition source columns. Without this, equality + // deletes would be scoped to the wrong partition and + // fail to match the data files they intend to delete. 
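// --- review note ----------------------------------------------------------
// Worked example of the check that follows: if the table is partitioned by
// bucket(id) and the delete key is {id}, the writer can place each delete
// row in the same partition as the data it shadows. If the table is
// partitioned by day(created_at) but the key is only {id}, a delete row
// carries no created_at and cannot be routed to the right partition, so
// the batch is rejected. Standalone analogue of the validation loop:
#include <algorithm>
#include <vector>

bool key_covers_partition(
    const std::vector<int>& key_field_ids,
    const std::vector<int>& partition_source_ids) {
    return std::ranges::all_of(partition_source_ids, [&](int src) {
        return std::ranges::find(key_field_ids, src) != key_field_ids.end();
    });
}
// key={1 /*id*/}, partition sources={1}        -> true  (accepted)
// key={1 /*id*/}, partition sources={3 /*ts*/} -> false (rejected)
// ---------------------------------------------------------------------------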
+ const auto& pspec = sw.data_writer->partition_spec(); + for (const auto& pf : pspec.fields) { + bool found = false; + for (const auto& kf : key_type.fields) { + if (kf->id == pf.source_id) { + found = true; + break; + } + } + if (!found) { + vlogl( + _log, + ss::log_level::warn, + "Partition source field {} not found in " + "delete key columns; equality deletes " + "require key fields to be a superset of " + "partition source columns", + pf.source_id); + _error = writer_error::unknown_error; + co_return ss::stop_iteration::yes; + } + } + + auto delete_writer = std::make_unique( + *_writer_factory, + sw.data_writer->schema_id(), + std::move(key_type), + pspec.copy(), + sw.data_writer->remote_prefix()); + delete_writer->set_key_field_ids(sw.key_field_ids->copy()); + sw.delete_writer = std::move(delete_writer); + } + auto add_del_result = co_await sw.delete_writer->add_data( + std::move(*translated.delete_key), estimated_size, as); + if (add_del_result != writer_error::ok) { + vlogl( + _log, + is_recoverable_error(add_del_result) ? ss::log_level::debug + : ss::log_level::warn, + "Error adding delete key to writer for record {}: {}", + offset, + add_del_result); + _error = add_del_result; + co_return ss::stop_iteration::yes; + } } // TODO: we want to ensure we're using an offset translating reader so @@ -379,8 +476,15 @@ ss::future record_multiplexer::flush_writers() { co_return *_error; } auto result = co_await ss::coroutine::as_future( - ss::max_concurrent_for_each( - _writers, 10, [](auto& entry) { return entry.second->flush(); })); + ss::max_concurrent_for_each(_writers, 10, [](auto& entry) { + auto& sw = entry.second; + return sw.data_writer->flush().then([&sw] { + if (sw.delete_writer) { + return sw.delete_writer->flush(); + } + return ss::make_ready_future<>(); + }); + })); if (result.failed()) { auto ex = result.get_exception(); vlog(_log.warn, "Error flushing writers: {}", ex); @@ -400,8 +504,8 @@ record_multiplexer::finish( _reader_bytes_processed); auto writers = std::move(_writers); - for (auto& [id, writer] : writers) { - auto res = co_await std::move(*writer).finish(); + for (auto& [id, sw] : writers) { + auto res = co_await std::move(*sw.data_writer).finish(); if (res.has_error()) { vlog(_log.trace, "writer finish error: {}", res.error()); _error = res.error(); @@ -413,6 +517,27 @@ record_multiplexer::finish( files.begin(), files.end(), std::back_inserter(finished_files.data_files)); + + if (sw.delete_writer) { + auto del_res = co_await std::move(*sw.delete_writer).finish(); + if (del_res.has_error()) { + vlog( + _log.trace, + "delete writer finish error: {}", + del_res.error()); + _error = del_res.error(); + continue; + } + auto& del_files = del_res.value(); + vlog( + _log.trace, + "delete writer finished: files_created={}", + del_files.size()); + std::move( + del_files.begin(), + del_files.end(), + std::back_inserter(finished_files.delete_files)); + } } if (_invalid_record_writer) { auto writer = std::move(_invalid_record_writer); @@ -454,16 +579,22 @@ record_multiplexer::finish( size_t record_multiplexer::buffered_bytes() const { size_t result = 0; - for (const auto& [_, writer] : _writers) { - result += writer->buffered_bytes(); + for (const auto& [_, sw] : _writers) { + result += sw.data_writer->buffered_bytes(); + if (sw.delete_writer) { + result += sw.delete_writer->buffered_bytes(); + } } return result; } size_t record_multiplexer::flushed_bytes() const { size_t result = 0; - for (const auto& [_, writer] : _writers) { - result += writer->flushed_bytes(); + for (const 
auto& [_, sw] : _writers) { + result += sw.data_writer->flushed_bytes(); + if (sw.delete_writer) { + result += sw.delete_writer->flushed_bytes(); + } } return result; } @@ -617,7 +748,7 @@ record_multiplexer::handle_invalid_record( _result.value().last_offset = offset; auto add_data_err = co_await _invalid_record_writer->add_data( - std::move(record_data_res.value()), estimated_size, as); + std::move(*record_data_res.value().data_row), estimated_size, as); if (add_data_err != writer_error::ok) { vlog( diff --git a/src/v/datalake/record_multiplexer.h b/src/v/datalake/record_multiplexer.h index 52d3e35a35ba8..abbcc13b47993 100644 --- a/src/v/datalake/record_multiplexer.h +++ b/src/v/datalake/record_multiplexer.h @@ -109,6 +109,8 @@ class record_multiplexer { // vector containing a list of files that were written during // translation. chunked_vector data_files; + // equality delete files for upsert/delete operations + chunked_vector delete_files; // files with invalid records chunked_vector dlq_files; }; @@ -150,10 +152,16 @@ class record_multiplexer { location_provider _location_provider; translation_probe& _translation_probe; [[maybe_unused]] features::feature_table* _features; - chunked_hash_map< - record_schema_components, - std::unique_ptr> - _writers; + struct schema_writer { + std::unique_ptr data_writer; + /// Delete key writer, created lazily when the first delete_key is seen. + std::unique_ptr delete_writer; + /// Key field IDs from record_type, used to set delete_key_field_ids + /// on coordinator data_file entries. + std::optional> + key_field_ids; + }; + chunked_hash_map _writers; std::unique_ptr _invalid_record_writer; std::optional _error; diff --git a/src/v/datalake/record_translator.cc b/src/v/datalake/record_translator.cc index f090c2e8fdf4f..50a998a42013f 100644 --- a/src/v/datalake/record_translator.cc +++ b/src/v/datalake/record_translator.cc @@ -71,56 +71,6 @@ std::optional get_redpanda_idx(const iceberg::struct_type& val_type) { return std::nullopt; } -// Builds a struct value meant to be used as the base of the "redpanda" struct. -// Additional fields specific to the mode (e.g. "value" for key-value mode) may -// be appended to the end. -std::unique_ptr build_rp_struct( - model::partition_id pid, - kafka::offset o, - std::optional key, - model::timestamp ts, - model::timestamp_type ts_t, - const chunked_vector& headers) { - auto system_data = std::make_unique(); - system_data->fields.reserve(6); - - system_data->fields.emplace_back(iceberg::int_value(pid)); - system_data->fields.emplace_back(iceberg::long_value(o)); - // NOTE: Kafka uses milliseconds, Iceberg uses microseconds. - system_data->fields.emplace_back( - iceberg::timestamptz_value(ts.value() * 1000)); - - if (headers.empty()) { - system_data->fields.emplace_back(std::nullopt); - } else { - auto headers_list = std::make_unique(); - for (const auto& hdr : headers) { - auto header_kv_struct = std::make_unique(); - header_kv_struct->fields.emplace_back( - hdr.key_size() >= 0 ? std::make_optional( - iceberg::string_value(hdr.key().copy())) - : std::nullopt); - header_kv_struct->fields.emplace_back( - hdr.value_size() >= 0 - ? std::make_optional( - iceberg::binary_value(hdr.value().copy())) - : std::nullopt); - headers_list->elements.emplace_back(std::move(header_kv_struct)); - } - system_data->fields.emplace_back(std::move(headers_list)); - } - - system_data->fields.emplace_back( - key ? 
std::make_optional( - iceberg::binary_value(std::move(*key))) - : std::nullopt); - - system_data->fields.emplace_back( - iceberg::int_value{static_cast(ts_t)}); - - return system_data; -} - } // namespace std::ostream& operator<<(std::ostream& o, const record_translator::errc& e) { @@ -140,7 +90,7 @@ default_translator::build_type(std::optional val_type) { return kv_translator.build_type(std::move(val_type)); } -ss::future> +ss::future> default_translator::translate_data( model::partition_id pid, kafka::offset o, @@ -177,17 +127,21 @@ key_value_translator::build_type(std::optional) { auto ret_type = schemaless_struct_type(); ret_type.fields.emplace_back( iceberg::nested_field::create( - 11, "value", iceberg::field_required::no, iceberg::binary_type{})); + schemaless_next_field_id, + "value", + iceberg::field_required::no, + iceberg::binary_type{})); return record_type{ .comps = record_schema_components{ .key_identifier = std::nullopt, .val_identifier = std::nullopt, }, .type = std::move(ret_type), + .key_field_names = std::nullopt, }; } -ss::future> +ss::future> key_value_translator::translate_data( model::partition_id pid, kafka::offset o, @@ -212,7 +166,10 @@ key_value_translator::translate_data( parsable_val ? std::make_optional( iceberg::binary_value(std::move(*parsable_val))) : std::nullopt); - co_return ret_data; + co_return translated_record{ + .data_row = std::move(ret_data), + .delete_key = std::nullopt, + }; } record_type structured_data_translator::build_type( @@ -251,12 +208,14 @@ record_type structured_data_translator::build_type( if (field->name == rp_struct_name) { // To avoid collisions, move user fields named "redpanda" into // the nested "redpanda" system field. - auto& system_fields = std::get( - ret_type.fields[0]->type); + auto& system_fields = rp_struct_type(ret_type); // Use the next id of the system defaults. system_fields.fields.emplace_back( iceberg::nested_field::create( - 10, "data", field->required, std::move(field->type))); + schemaless_next_field_id, + "data", + field->required, + std::move(field->type))); continue; } // Add the extra user-defined fields. @@ -269,10 +228,11 @@ record_type structured_data_translator::build_type( .val_identifier = std::move(val_id), }, .type = std::move(ret_type), + .key_field_names = std::nullopt, }; } -ss::future> +ss::future> structured_data_translator::translate_data( model::partition_id pid, kafka::offset o, @@ -320,15 +280,15 @@ structured_data_translator::translate_data( if (redpanda_field_idx == i) { // To avoid collisions, move user fields named "redpanda" into // the nested "redpanda" system field. - auto& system_vals - = std::get>( - ret_data.fields[0].value()); - system_vals->fields.emplace_back(std::move(field)); + rp_struct_value(ret_data).fields.emplace_back(std::move(field)); continue; } ret_data.fields.emplace_back(std::move(field)); } - co_return ret_data; + co_return translated_record{ + .data_row = std::move(ret_data), + .delete_key = std::nullopt, + }; } } // namespace datalake diff --git a/src/v/datalake/record_translator.h b/src/v/datalake/record_translator.h index 9dcd29ed9086a..5b9f42f8c1118 100644 --- a/src/v/datalake/record_translator.h +++ b/src/v/datalake/record_translator.h @@ -24,6 +24,21 @@ namespace datalake { struct record_type { record_schema_components comps; iceberg::struct_type type; + /// When set, this translator produces upsert/delete operations. + /// These field names identify the key columns for deduplication. 
+ /// The multiplexer resolves them to Iceberg field IDs after the + /// table schema is registered and IDs are assigned. + std::optional> key_field_names; +}; + +/// Result of translating a Kafka record. For append-only translators, +/// data_row is set and delete_key is nullopt. For CDC translators, +/// either or both may be set depending on the operation. +struct translated_record { + /// Row to insert as data. Nullopt for pure deletes. + std::optional data_row; + /// Key value to delete. Nullopt for pure inserts. + std::optional delete_key; }; class record_translator { @@ -36,7 +51,7 @@ class record_translator { virtual record_type build_type(std::optional val_type) = 0; - virtual ss::future> translate_data( + virtual ss::future> translate_data( model::partition_id pid, kafka::offset o, std::optional key, @@ -52,7 +67,7 @@ class key_value_translator : public record_translator { public: record_type build_type(std::optional val_type) override; - ss::future> translate_data( + ss::future> translate_data( model::partition_id pid, kafka::offset o, std::optional key, @@ -68,7 +83,7 @@ class structured_data_translator : public record_translator { public: record_type build_type(std::optional val_type) override; - ss::future> translate_data( + ss::future> translate_data( model::partition_id pid, kafka::offset o, std::optional key, @@ -88,7 +103,7 @@ class default_translator : public record_translator { public: record_type build_type(std::optional val_type) override; - ss::future> translate_data( + ss::future> translate_data( model::partition_id pid, kafka::offset o, std::optional key, diff --git a/src/v/datalake/schema_descriptor.h b/src/v/datalake/schema_descriptor.h new file mode 100644 index 0000000000000..eeb673adb09ae --- /dev/null +++ b/src/v/datalake/schema_descriptor.h @@ -0,0 +1,203 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include "iceberg/datatypes.h" +#include "iceberg/values.h" + +#include + +#include +#include +#include +#include +#include +#include + +namespace datalake { + +/// A compile-time string usable as a template parameter (C++20 NTTP). +template +struct ct_string { + char data[N]{}; + constexpr ct_string(const char (&s)[N]) { std::copy_n(s, N, data); } + constexpr operator std::string_view() const { return {data, N - 1}; } + constexpr auto operator<=>(const ct_string&) const = default; +}; + +/// A field descriptor: compile-time name + iceberg type. +template +struct field_desc { + static constexpr std::string_view name{Name}; + using type = T; +}; + +/// Forward declarations. +template +struct struct_desc; + +template +struct list_desc; + +/// Find the index of a field by name. Fails to compile if not found. +template +consteval size_t index_of_fn() { + constexpr std::array names = {Fields::name...}; + for (size_t i = 0; i < names.size(); ++i) { + if (names[i] == std::string_view{Name}) { + return i; + } + } + // If we reach here, the name was not found. This is consteval, + // so the compiler will reject it with an error pointing here. + throw "field name not found in descriptor"; +} + +/// Check that given names match field names in order. +/// Uses an array of field names passed directly to avoid partial +/// specialization. 
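// --- review note ----------------------------------------------------------
// Self-contained demo of the C++20 NTTP technique ct_string relies on: a
// structural literal class type lets string literals become template
// arguments, so field names exist (and can be checked) at compile time.
#include <algorithm>
#include <cstddef>
#include <string_view>

template <std::size_t N>
struct fixed_string {
    char data[N]{};
    constexpr fixed_string(const char (&s)[N]) { std::copy_n(s, N, data); }
    constexpr operator std::string_view() const { return {data, N - 1}; }
};

template <fixed_string Name>
struct named_field {
    static constexpr std::string_view name{Name};
};

static_assert(named_field<"id">::name == "id");   // resolved at compile time
static_assert(named_field<"id">::name != "name"); // wrong names can't slip through
// ---------------------------------------------------------------------------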
+template +consteval bool +names_match_fn(std::array field_names) { + constexpr std::array given = {std::string_view{Names}...}; + for (size_t i = 0; i < field_names.size(); ++i) { + if (field_names[i] != given[i]) { + return false; + } + } + return true; +} + +/// A named value for use with struct_desc::build_value. The name is +/// checked at compile time to match the corresponding field descriptor. +template +struct val { + std::optional v; + + template + requires std::constructible_from, U> + val(U&& u) // NOLINT(google-explicit-constructor) + : v(std::forward(u)) {} +}; + +template +struct struct_desc { + static constexpr size_t count = sizeof...(Fields); + + /// Compile-time index of a field by name. + template + static constexpr size_t index_of = index_of_fn(); + + /// Total number of fields in the tree (used for ID assignment). + static int total_fields() { + int next_id = 0; + build_impl(next_id); + return next_id; + } + + /// Build the runtime iceberg::struct_type. + static iceberg::struct_type build() { + int next_id = 0; + return build_impl(next_id); + } + +private: + template + friend struct struct_desc; + template + friend struct list_desc; + + static iceberg::struct_type build_impl(int& next_id) { + iceberg::struct_type st; + auto add = [&]() { + int my_id = next_id++; + st.fields.push_back( + iceberg::nested_field::create( + my_id, + ss::sstring{F::name}, + iceberg::field_required::no, + build_iceberg_type(next_id))); + }; + (add.template operator()(), ...); + return st; + } + +public: + /// Build a struct_value with compile-time name and arity checking. + /// Each argument is a val<"field_name"> matching the descriptor. + /// + /// Wrong arity, wrong name, or wrong order = compile error. + template + static std::unique_ptr + build_value(val... 
args) { + static_assert(sizeof...(Names) == count, "wrong number of fields"); + static_assert( + names_match_fn({Fields::name...}), + "field names don't match descriptor or are in wrong order"); + auto sv = std::make_unique(); + sv->fields.reserve(count); + (sv->fields.emplace_back(std::move(args.v)), ...); + return sv; + } + +private: + template + static constexpr bool is_nested_v = false; + template + static constexpr bool is_nested_v> = true; + template + static constexpr bool is_nested_v> = true; + + template + static auto build_iceberg_type(int& next_id) { + if constexpr (is_nested_v) { + return F::type::build_impl(next_id); + } else { + return typename F::type{}; + } + } +}; + +template +struct list_desc; + +template +struct list_desc> { + static iceberg::list_type build_impl(int& next_id) { + int element_id = next_id++; + return iceberg::list_type::create( + element_id, + iceberg::field_required::no, + struct_desc::build_impl(next_id)); + } +}; + +template +iceberg::nested_field& type_field(iceberg::struct_type& st) { + return *st.fields[Desc::template index_of]; +} + +template +const iceberg::nested_field& type_field(const iceberg::struct_type& st) { + return *st.fields[Desc::template index_of]; +} + +template +std::optional& value_field(iceberg::struct_value& sv) { + return sv.fields[Desc::template index_of]; +} + +template +const std::optional& +value_field(const iceberg::struct_value& sv) { + return sv.fields[Desc::template index_of]; +} + +} // namespace datalake diff --git a/src/v/datalake/schema_identifier.h b/src/v/datalake/schema_identifier.h index 20ba70ef409b2..648477b963815 100644 --- a/src/v/datalake/schema_identifier.h +++ b/src/v/datalake/schema_identifier.h @@ -34,12 +34,18 @@ struct schema_identifier struct record_schema_components : serde::envelope< record_schema_components, - serde::version<0>, + serde::version<1>, serde::compat_version<0>> { - auto serde_fields() { return std::tie(key_identifier, val_identifier); } + auto serde_fields() { + return std::tie(key_identifier, val_identifier, is_debezium); + } std::optional key_identifier; std::optional val_identifier; + // Whether to interpret the value schema as a Debezium CDC envelope. + // When true, the coordinator extracts the inner "after" struct + // from the Debezium envelope rather than using the full envelope. 
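// --- review note ----------------------------------------------------------
// Usage sketch of the descriptor machinery in schema_descriptor.h, with a
// hypothetical two-field struct (mirrors how table_definition.cc uses
// rp_desc::build_value; assumes the accessors take <Desc, Name> template
// parameters as the stripped signatures suggest):
#include "datalake/schema_descriptor.h"
#include "iceberg/datatypes.h"
#include "iceberg/values.h"

using point_desc = datalake::struct_desc<
  datalake::field_desc<"x", iceberg::int_type>,
  datalake::field_desc<"y", iceberg::int_type>>;

// Runtime schema with sequentially assigned placeholder ids:
iceberg::struct_type point_type = point_desc::build();

// Compile-time-checked construction: wrong arity, wrong names, or wrong
// order fails to compile rather than silently misaligning fields.
auto point = point_desc::build_value(
  datalake::val<"x">(iceberg::int_value(1)),
  datalake::val<"y">(iceberg::int_value(2)));

// Typed access by name instead of a magic index:
auto& x = datalake::value_field<point_desc, "x">(*point);
// ---------------------------------------------------------------------------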
+ bool is_debezium{false}; bool operator==(const record_schema_components&) const = default; }; @@ -72,6 +78,7 @@ struct hash { boost::hash_combine( h, hash()(*c.val_identifier)); } + boost::hash_combine(h, hash()(c.is_debezium)); return h; } }; diff --git a/src/v/datalake/table_definition.cc b/src/v/datalake/table_definition.cc index 63aac59c9d282..9d175367b7621 100644 --- a/src/v/datalake/table_definition.cc +++ b/src/v/datalake/table_definition.cc @@ -10,52 +10,61 @@ #include "datalake/table_definition.h" namespace datalake { -using namespace iceberg; -struct_type schemaless_struct_type() { - using namespace iceberg; - struct_type system_fields; - system_fields.fields.emplace_back( - nested_field::create(2, "partition", field_required::no, int_type{})); - system_fields.fields.emplace_back( - nested_field::create(3, "offset", field_required::no, long_type{})); - system_fields.fields.emplace_back( - nested_field::create( - 4, "timestamp", field_required::no, timestamptz_type{})); - - struct_type headers_kv; - headers_kv.fields.emplace_back( - nested_field::create(7, "key", field_required::no, string_type{})); - headers_kv.fields.emplace_back( - nested_field::create(8, "value", field_required::no, binary_type{})); - system_fields.fields.emplace_back( - nested_field::create( - 5, - "headers", - field_required::no, - list_type::create(6, field_required::no, std::move(headers_kv)))); - - system_fields.fields.emplace_back( - nested_field::create(9, "key", field_required::no, binary_type{})); - system_fields.fields.emplace_back( - nested_field::create( - 10, "timestamp_type", field_required::no, int_type{})); - struct_type res; - res.fields.emplace_back( - nested_field::create( - 1, - ss::sstring{rp_struct_name}, - field_required::no, - std::move(system_fields))); - - return res; + +iceberg::struct_type schemaless_struct_type() { + return schemaless_desc::build(); } -schema default_schema() { - return { +iceberg::schema default_schema() { + return iceberg::schema{ .schema_struct = schemaless_struct_type(), - .schema_id = iceberg::schema::id_t{0}, + .schema_id = iceberg::schema::default_id, .identifier_field_ids = {}, }; } +namespace { + +std::optional +build_headers_value(const chunked_vector& headers) { + if (headers.empty()) { + return std::nullopt; + } + auto hdr_list = std::make_unique(); + for (const auto& hdr : headers) { + auto kv = header_kv_desc::build_value( + val<"key">( + hdr.key_size() >= 0 ? std::make_optional( + iceberg::string_value(hdr.key().copy())) + : std::nullopt), + val<"value">( + hdr.value_size() >= 0 ? std::make_optional( + iceberg::binary_value(hdr.value().copy())) + : std::nullopt)); + hdr_list->elements.emplace_back(std::move(kv)); + } + return hdr_list; +} + +} // namespace + +std::unique_ptr build_rp_struct( + model::partition_id pid, + kafka::offset o, + std::optional key, + model::timestamp ts, + model::timestamp_type ts_t, + const chunked_vector& headers) { + return rp_desc::build_value( + val<"partition">(iceberg::int_value(pid)), + val<"offset">(iceberg::long_value(o)), + val<"timestamp">(iceberg::timestamptz_value(ts.value() * 1000)), + val<"headers">(build_headers_value(headers)), + val<"key">( + key ? 
std::make_optional( + iceberg::binary_value(std::move(*key))) + : std::nullopt), + val<"timestamp_type">(iceberg::int_value{static_cast(ts_t)})); +} + } // namespace datalake diff --git a/src/v/datalake/table_definition.h b/src/v/datalake/table_definition.h index 6eb8f70791f01..769debbc748d7 100644 --- a/src/v/datalake/table_definition.h +++ b/src/v/datalake/table_definition.h @@ -9,16 +9,70 @@ */ #pragma once +#include "datalake/schema_descriptor.h" #include "iceberg/schema.h" +#include "model/fundamental.h" +#include "model/record.h" +#include "model/timestamp.h" namespace datalake { -// Definitions for default table metadata. +// The schema is defined once here as a type. All field ordering, +// naming, and type information is derived from this single source. +// Adding, removing, or reordering fields here automatically updates: +// - schemaless_struct_type() (runtime struct_type) +// - build_rp_struct() (runtime struct_value) +// - All typed field accessors -// Contains some minimal fields used for all tables, even those with no schemas. -// TODO: rename to redpanda_fields_struct_type? +/// Header key/value struct inside the headers list. +using header_kv_desc = struct_desc< + field_desc<"key", iceberg::string_type>, + field_desc<"value", iceberg::binary_type>>; + +/// The redpanda system struct. +using rp_desc = struct_desc< + field_desc<"partition", iceberg::int_type>, + field_desc<"offset", iceberg::long_type>, + field_desc<"timestamp", iceberg::timestamptz_type>, + field_desc<"headers", list_desc>, + field_desc<"key", iceberg::binary_type>, + field_desc<"timestamp_type", iceberg::int_type>>; + +/// The top-level schemaless table struct. +using schemaless_desc = struct_desc>; + +inline constexpr std::string_view rp_struct_name = "redpanda"; + +/// Next available pre-assignment field ID after the schemaless struct. +/// Translators that add fields should start IDs from here. +inline const int schemaless_next_field_id = schemaless_desc::total_fields(); + +/// Build the runtime struct_type from the compile-time descriptor. iceberg::struct_type schemaless_struct_type(); + +/// Build the default iceberg schema. iceberg::schema default_schema(); -inline constexpr std::string_view rp_struct_name = "redpanda"; + +/// Build the redpanda system struct_value. Single definition used +/// by all translators. +std::unique_ptr build_rp_struct( + model::partition_id pid, + kafka::offset o, + std::optional key, + model::timestamp ts, + model::timestamp_type ts_t, + const chunked_vector& headers); + +/// Get the redpanda struct_type from a schemaless struct_type. +inline iceberg::struct_type& rp_struct_type(iceberg::struct_type& schemaless) { + return std::get( + type_field(schemaless).type); +} + +/// Get the redpanda struct_value from a data row. 
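// --- review note ----------------------------------------------------------
// Worked example of the depth-first id assignment behind
// schemaless_next_field_id: build_impl() hands out ids as it walks the tree,
//   redpanda=0, partition=1, offset=2, timestamp=3, headers=4,
//   headers.element=5, headers.element.key=6, headers.element.value=7,
//   key=8, timestamp_type=9
// so schemaless_desc::total_fields() == 10, and translators (key_value's
// "value" column, structured/debezium's nested "data" collision field) now
// append at id 10, replacing the hard-coded 11 and 10 literals used before.
// These are pre-registration placeholders; the catalog reassigns real field
// ids when the schema is registered. Sanity check one could add to a test:
//   ASSERT_EQ(10, datalake::schemaless_desc::total_fields());
// ---------------------------------------------------------------------------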
+inline iceberg::struct_value& rp_struct_value(iceberg::struct_value& row) {
+    return *std::get<std::unique_ptr<iceberg::struct_value>>(
+      value_field(row).value());
+}
 } // namespace datalake
diff --git a/src/v/datalake/tests/BUILD b/src/v/datalake/tests/BUILD
index 87541e485dc1a..65e63bc846286 100644
--- a/src/v/datalake/tests/BUILD
+++ b/src/v/datalake/tests/BUILD
@@ -213,6 +213,27 @@ redpanda_cc_gtest(
     ],
 )
 
+redpanda_cc_gtest(
+    name = "debezium_translator_test",
+    timeout = "short",
+    srcs = [
+        "debezium_translator_test.cc",
+    ],
+    cpu = 1,
+    deps = [
+        "//src/v/datalake:debezium_translator",
+        "//src/v/datalake:record_schema_resolver",
+        "//src/v/iceberg:datatypes",
+        "//src/v/iceberg:values",
+        "//src/v/model",
+        "//src/v/schema/tests:fake_registry",
+        "//src/v/test_utils:gtest",
+        "@avro",
+        "@googletest//:gtest",
+        "@seastar",
+    ],
+)
+
 redpanda_cc_gtest(
     name = "schema_registry_test",
     timeout = "short",
diff --git a/src/v/datalake/tests/debezium_translator_test.cc b/src/v/datalake/tests/debezium_translator_test.cc
new file mode 100644
index 0000000000000..85baac291383b
--- /dev/null
+++ b/src/v/datalake/tests/debezium_translator_test.cc
@@ -0,0 +1,433 @@
+/*
+ * Copyright 2026 Redpanda Data, Inc.
+ *
+ * Licensed as a Redpanda Enterprise file under the Redpanda Community
+ * License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
+ */
+
+#include "datalake/debezium_translator.h"
+#include "datalake/record_schema_resolver.h"
+#include "iceberg/datatypes.h"
+#include "iceberg/values.h"
+#include "model/fundamental.h"
+#include "model/record.h"
+#include "model/timestamp.h"
+#include "schema/tests/fake_registry.h"
+
+#include 
+#include 
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+using namespace datalake;
+using namespace iceberg;
+using namespace pandaproxy::schema_registry;
+
+namespace {
+
+// Debezium envelope Avro schema with before/after/op/source/ts_ms.
+// The inner table has two fields: "id" (key) and "name".
+constexpr std::string_view debezium_envelope_schema = R"({
+    "type": "record",
+    "name": "Envelope",
+    "namespace": "test.debezium",
+    "fields": [
+        {
+            "name": "before",
+            "type": ["null", {
+                "type": "record",
+                "name": "Value",
+                "fields": [
+                    {"name": "id", "type": "int"},
+                    {"name": "name", "type": "string"}
+                ]
+            }],
+            "default": null
+        },
+        {
+            "name": "after",
+            "type": ["null", "Value"],
+            "default": null
+        },
+        {
+            "name": "source",
+            "type": {
+                "type": "record",
+                "name": "Source",
+                "fields": [
+                    {"name": "connector", "type": "string"}
+                ]
+            }
+        },
+        {
+            "name": "op",
+            "type": "string"
+        },
+        {
+            "name": "ts_ms",
+            "type": ["null", "long"],
+            "default": null
+        }
+    ]
+})";
+
+// Key schema matching the "id" field of the inner table.
+constexpr std::string_view key_schema = R"({
+    "type": "record",
+    "name": "Key",
+    "namespace": "test.debezium",
+    "fields": [
+        {"name": "id", "type": "int"}
+    ]
+})";
+
+// Encode an Avro GenericDatum to iobuf with schema registry wire format
+// (magic byte 0x00 + 4 byte big-endian schema id).
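+// Layout: [0x00 magic][4-byte big-endian schema id][Avro binary body];
+// e.g. schema id 2 is framed as 00 00 00 00 02 followed by the record bytes.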
+iobuf encode_avro_with_schema_id(
+  const avro::GenericDatum& datum, int32_t schema_id) {
+    std::unique_ptr<avro::OutputStream> out = avro::memoryOutputStream();
+    avro::EncoderPtr encoder = avro::binaryEncoder();
+    encoder->init(*out);
+    avro::encode(*encoder, datum);
+    encoder->flush();
+    auto snap = avro::snapshot(*out);
+
+    iobuf buf;
+    buf.append("\0", 1);
+    int32_t encoded_id = ss::cpu_to_be(schema_id);
+    buf.append(reinterpret_cast<const char*>(&encoded_id), 4);
+    buf.append(snap->data(), snap->size());
+    return buf;
+}
+
+// Build a Debezium envelope datum with the given op, before, and after values.
+avro::GenericDatum make_envelope_datum(
+  const avro::ValidSchema& envelope_schema,
+  std::string_view op,
+  std::optional<std::pair<int32_t, std::string>> before,
+  std::optional<std::pair<int32_t, std::string>> after) {
+    avro::GenericDatum datum(envelope_schema);
+    auto& envelope = datum.value<avro::GenericRecord>();
+
+    // before: union [null, Value]
+    auto& before_datum = envelope.field("before");
+    if (before.has_value()) {
+        before_datum.selectBranch(1);
+        auto& before_rec = before_datum.value<avro::GenericRecord>();
+        before_rec.field("id").value<int32_t>() = before->first;
+        before_rec.field("name").value<std::string>() = before->second;
+    } else {
+        before_datum.selectBranch(0);
+    }
+
+    // after: union [null, Value]
+    auto& after_datum = envelope.field("after");
+    if (after.has_value()) {
+        after_datum.selectBranch(1);
+        auto& after_rec = after_datum.value<avro::GenericRecord>();
+        after_rec.field("id").value<int32_t>() = after->first;
+        after_rec.field("name").value<std::string>() = after->second;
+    } else {
+        after_datum.selectBranch(0);
+    }
+
+    // source
+    auto& source = envelope.field("source").value<avro::GenericRecord>();
+    source.field("connector").value<std::string>() = "mysql";
+
+    // op
+    envelope.field("op").value<std::string>() = std::string(op);
+
+    // ts_ms: union [null, long]
+    auto& ts_ms = envelope.field("ts_ms");
+    ts_ms.selectBranch(1);
+    ts_ms.value<int64_t>() = 1234567890;
+
+    return datum;
+}
+
+avro::GenericDatum
+make_key_datum(const avro::ValidSchema& key_avro_schema, int32_t id) {
+    avro::GenericDatum datum(key_avro_schema);
+    auto& rec = datum.value<avro::GenericRecord>();
+    rec.field("id").value<int32_t>() = id;
+    return datum;
+}
+
+} // namespace
+
+class DebeziumTranslatorTest : public ::testing::Test {
+public:
+    DebeziumTranslatorTest()
+      : sr(std::make_unique())
+      , key_resolver(*sr, std::nullopt, std::nullopt)
+      , val_resolver(*sr, std::nullopt, std::nullopt) {}
+
+    void SetUp() override {
+        // Register envelope schema (id=1).
+        auto envelope_id = sr->create_schema(
+                               subject_schema{
+                                 context_subject::unqualified("test-value"),
+                                 schema_definition{
+                                   debezium_envelope_schema,
+                                   schema_type::avro}})
+                             .get();
+        ASSERT_EQ(1, envelope_id.id());
+
+        // Register key schema (id=2).
+        auto k_id = sr->create_schema(
+                        subject_schema{
+                          context_subject::unqualified("test-key"),
+                          schema_definition{key_schema, schema_type::avro}})
+                      .get();
+        ASSERT_EQ(2, k_id.id());
+
+        envelope_avro = avro::compileJsonSchemaFromString(
+          std::string(debezium_envelope_schema));
+        key_avro = avro::compileJsonSchemaFromString(std::string(key_schema));
+    }
+
+    // Resolve the envelope type from a raw iobuf (with schema ID prefix).
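+    // The resolver reads that prefix, fetches the schema from the fake
+    // registry, and hands back the translated iceberg type.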
+    shared_resolved_type_t resolve_envelope(iobuf buf) {
+        auto res = val_resolver.resolve_buf_type(std::move(buf)).get();
+        EXPECT_TRUE(res.has_value());
+        auto& tb = res.value();
+        EXPECT_TRUE(tb.type.has_value());
+        return tb.type.value();
+    }
+
+    iobuf encode_envelope(
+      std::string_view op,
+      std::optional<std::pair<int32_t, std::string>> before,
+      std::optional<std::pair<int32_t, std::string>> after) {
+        auto datum = make_envelope_datum(envelope_avro, op, before, after);
+        return encode_avro_with_schema_id(datum, 1);
+    }
+
+    iobuf encode_key(int32_t id) {
+        auto datum = make_key_datum(key_avro, id);
+        return encode_avro_with_schema_id(datum, 2);
+    }
+
+    // Strip the schema ID prefix to get the parsable buffer, mimicking what
+    // resolve_buf_type returns.
+    std::pair<shared_resolved_type_t, iobuf>
+    resolve_and_strip(iobuf envelope_buf) {
+        auto res = val_resolver.resolve_buf_type(std::move(envelope_buf)).get();
+        EXPECT_TRUE(res.has_value());
+        auto& tb = res.value();
+        EXPECT_TRUE(tb.type.has_value());
+        return {tb.type.value(), std::move(tb.parsable_buf.value())};
+    }
+
+    std::unique_ptr sr;
+    record_schema_resolver key_resolver;
+    record_schema_resolver val_resolver;
+    avro::ValidSchema envelope_avro;
+    avro::ValidSchema key_avro;
+
+    static constexpr model::partition_id pid{0};
+    static constexpr kafka::offset offset{42};
+    static constexpr model::timestamp ts{1000};
+    static constexpr model::timestamp_type ts_t{
+      model::timestamp_type::create_time};
+    chunked_vector<model::record_header> headers;
+};
+
+TEST_F(DebeziumTranslatorTest, CreateOp) {
+    debezium_translator translator(key_resolver);
+
+    auto envelope_buf = encode_envelope("c", std::nullopt, {{1, "Alice"}});
+    auto [val_type, parsable] = resolve_and_strip(std::move(envelope_buf));
+    auto key_buf = encode_key(1);
+
+    auto result = translator
+                    .translate_data(
+                      pid,
+                      offset,
+                      std::move(key_buf),
+                      val_type,
+                      std::move(parsable),
+                      ts,
+                      ts_t,
+                      headers)
+                    .get();
+
+    ASSERT_TRUE(result.has_value());
+    auto& rec = result.value();
+    EXPECT_TRUE(rec.data_row.has_value());
+    EXPECT_FALSE(rec.delete_key.has_value());
+}
+
+TEST_F(DebeziumTranslatorTest, ReadOp) {
+    debezium_translator translator(key_resolver);
+
+    auto envelope_buf = encode_envelope("r", std::nullopt, {{2, "Bob"}});
+    auto [val_type, parsable] = resolve_and_strip(std::move(envelope_buf));
+    auto key_buf = encode_key(2);
+
+    auto result = translator
+                    .translate_data(
+                      pid,
+                      offset,
+                      std::move(key_buf),
+                      val_type,
+                      std::move(parsable),
+                      ts,
+                      ts_t,
+                      headers)
+                    .get();
+
+    ASSERT_TRUE(result.has_value());
+    auto& rec = result.value();
+    EXPECT_TRUE(rec.data_row.has_value());
+    EXPECT_FALSE(rec.delete_key.has_value());
+}
+
+TEST_F(DebeziumTranslatorTest, UpdateOp) {
+    debezium_translator translator(key_resolver);
+
+    auto envelope_buf = encode_envelope(
+      "u", {{1, "Alice"}}, {{1, "Alice Updated"}});
+    auto [val_type, parsable] = resolve_and_strip(std::move(envelope_buf));
+    auto key_buf = encode_key(1);
+
+    auto result = translator
+                    .translate_data(
+                      pid,
+                      offset,
+                      std::move(key_buf),
+                      val_type,
+                      std::move(parsable),
+                      ts,
+                      ts_t,
+                      headers)
+                    .get();
+
+    ASSERT_TRUE(result.has_value());
+    auto& rec = result.value();
+    EXPECT_TRUE(rec.data_row.has_value());
+    EXPECT_TRUE(rec.delete_key.has_value());
+}
+
+TEST_F(DebeziumTranslatorTest, DeleteOp) {
+    debezium_translator translator(key_resolver);
+
+    auto envelope_buf = encode_envelope("d", {{1, "Alice"}}, std::nullopt);
+    auto [val_type, parsable] = resolve_and_strip(std::move(envelope_buf));
+    auto key_buf = encode_key(1);
+
+    auto result = translator
+                    .translate_data(
+                      pid,
+                      offset,
+                      std::move(key_buf),
+ val_type, + std::move(parsable), + ts, + ts_t, + headers) + .get(); + + ASSERT_TRUE(result.has_value()); + auto& rec = result.value(); + EXPECT_FALSE(rec.data_row.has_value()); + EXPECT_TRUE(rec.delete_key.has_value()); +} + +TEST_F(DebeziumTranslatorTest, TombstoneReturnsNullopt) { + debezium_translator translator(key_resolver); + + auto envelope_buf = encode_envelope("c", std::nullopt, {{1, "Alice"}}); + auto [val_type, parsable] = resolve_and_strip(std::move(envelope_buf)); + + // Tombstone: parsable_val is nullopt. + auto result = translator + .translate_data( + pid, + offset, + std::nullopt, + val_type, + std::nullopt, + ts, + ts_t, + headers) + .get(); + + ASSERT_TRUE(result.has_value()); + auto& rec = result.value(); + EXPECT_FALSE(rec.data_row.has_value()); + EXPECT_FALSE(rec.delete_key.has_value()); +} + +TEST_F(DebeziumTranslatorTest, NullKeyReturnsError) { + debezium_translator translator(key_resolver); + + auto envelope_buf = encode_envelope("c", std::nullopt, {{1, "Alice"}}); + auto [val_type, parsable] = resolve_and_strip(std::move(envelope_buf)); + + // Null key with non-null value should error. + auto result = translator + .translate_data( + pid, + offset, + std::nullopt, + val_type, + std::move(parsable), + ts, + ts_t, + headers) + .get(); + + ASSERT_TRUE(result.has_error()); + EXPECT_EQ(result.error(), record_translator::errc::translation_error); +} + +TEST_F(DebeziumTranslatorTest, UnknownOpReturnsError) { + debezium_translator translator(key_resolver); + + auto envelope_buf = encode_envelope("t", std::nullopt, std::nullopt); + auto [val_type, parsable] = resolve_and_strip(std::move(envelope_buf)); + auto key_buf = encode_key(1); + + auto result = translator + .translate_data( + pid, + offset, + std::move(key_buf), + val_type, + std::move(parsable), + ts, + ts_t, + headers) + .get(); + + ASSERT_TRUE(result.has_error()); + EXPECT_EQ(result.error(), record_translator::errc::translation_error); +} + +TEST_F(DebeziumTranslatorTest, BuildTypeExtractsAfterFields) { + debezium_translator translator(key_resolver); + + auto envelope_buf = encode_envelope("c", std::nullopt, {{1, "Alice"}}); + auto val_type = resolve_envelope(std::move(envelope_buf)); + + auto rt = translator.build_type(val_type); + + // The result should have the redpanda system struct plus the inner table + // fields (id, name). 
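+    // i.e. roughly struct<redpanda: struct<...>, id: int, name: string>,
+    // with the Debezium before/after/op/source/ts_ms envelope unwrapped.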
+    EXPECT_TRUE(rt.comps.val_identifier.has_value());
+    // redpanda struct + id + name = 3 top-level fields
+    EXPECT_EQ(rt.type.fields.size(), 3);
+    EXPECT_EQ(rt.type.fields[0]->name, "redpanda");
+    EXPECT_EQ(rt.type.fields[1]->name, "id");
+    EXPECT_EQ(rt.type.fields[2]->name, "name");
+}
diff --git a/src/v/datalake/translation_task.cc b/src/v/datalake/translation_task.cc
index 5e0392c7bb490..25fa9e99a06c4 100644
--- a/src/v/datalake/translation_task.cc
+++ b/src/v/datalake/translation_task.cc
@@ -119,11 +119,13 @@ delete_local_data_files(
 }
 
 ss::future>
-delete_data_and_dlq_files(
+delete_all_local_files(
   prefix_logger& log, const record_multiplexer::finished_files& files) {
-    auto [data_result, dlq_result] = co_await ss::when_all_succeed(
-      delete_local_data_files(log, files.data_files),
-      delete_local_data_files(log, files.dlq_files));
+    auto [data_result, delete_result, dlq_result]
+      = co_await ss::when_all_succeed(
+        delete_local_data_files(log, files.data_files),
+        delete_local_data_files(log, files.delete_files),
+        delete_local_data_files(log, files.dlq_files));
 
     if (data_result.has_error()) {
         vlog(
@@ -132,9 +134,16 @@ delete_data_and_dlq_files(
           data_result.error());
         co_return data_result.error();
     }
+    if (delete_result.has_error()) {
+        vlog(
+          log.warn,
+          "error deleting local delete files - {}",
+          delete_result.error());
+        co_return delete_result.error();
+    }
     if (dlq_result.has_error()) {
         vlog(
-          log.warn, "error deleting local dlq files - {}", data_result.error());
+          log.warn, "error deleting local dlq files - {}", dlq_result.error());
         co_return dlq_result.error();
     }
     co_return std::nullopt;
@@ -199,6 +208,15 @@ upload_files(
             uploaded.hour_deprecated = get_hour(file.partition_key);
         }
 
+        if (file.key_field_ids) {
+            chunked_vector<int32_t> ids;
+            ids.reserve(file.key_field_ids->size());
+            for (const auto& id : *file.key_field_ids) {
+                ids.push_back(id());
+            }
+            uploaded.delete_key_field_ids = std::move(ids);
+        }
+
         ret.push_back(std::move(uploaded));
     }
 
@@ -310,13 +328,13 @@ translation_task::finish(
         auto mux_err = mux_result.error();
         vlog(
           _log.warn,
-          "Error writing data files - {}, deleting {} data files and {} DLQ "
-          "files",
+          "Error writing data files - {}, deleting {} data files, {} delete "
+          "files and {} DLQ files",
           mux_result.error(),
           files.data_files.size(),
+          files.delete_files.size(),
          files.dlq_files.size());
-        [[maybe_unused]] auto _ = co_await delete_data_and_dlq_files(
-          _log, files);
+        [[maybe_unused]] auto _ = co_await delete_all_local_files(_log, files);
         co_return map_error_code(mux_err);
     }
     auto write_result = std::move(mux_result).value();
@@ -324,10 +342,11 @@ translation_task::finish(
         vlog(
           _log.trace,
           "translation result base offset: {}, last offset: {}, data files: "
-          "{}, dlq files: {}",
+          "{}, delete files: {}, dlq files: {}",
           write_result.start_offset,
           write_result.last_offset,
           files.data_files.size(),
+          files.delete_files.size(),
          files.dlq_files.size());
    }
 
@@ -367,6 +386,27 @@ translation_task::finish(
         ret.files = std::move(upload_res.value());
     }
 
+    // Equality delete files go alongside data files since the coordinator
+    // distinguishes them by delete_key_field_ids.
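+    // So a CDC delete reaches the committer as an ordinary uploaded file
+    // entry, just with is_delete=true and the key field ids attached.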
+ { + auto del_upload_res = co_await upload_files( + _log, + *_cloud_io, + files.delete_files, + is_custom_partitioning_enabled, + rcn, + lazy_as); + if (del_upload_res.has_error()) { + co_return del_upload_res.error(); + } + auto& del_files = del_upload_res.value(); + for (auto& df : del_files) { + df.is_delete = true; + } + std::move( + del_files.begin(), del_files.end(), std::back_inserter(ret.files)); + } + // DLQ files. { auto dlq_upload_res = co_await upload_files( @@ -392,16 +432,16 @@ translation_task::discard() && { if (mux_result.has_error()) { vlog( _log.warn, - "Error writing data files - {}, deleting {} data files and {} DLQ " - "files", + "Error writing data files - {}, deleting {} data files, {} delete " + "files and {} DLQ files", mux_result.error(), files.data_files.size(), + files.delete_files.size(), files.dlq_files.size()); - [[maybe_unused]] auto _ = co_await delete_data_and_dlq_files( - _log, files); + [[maybe_unused]] auto _ = co_await delete_all_local_files(_log, files); co_return errc::file_io_error; } - co_return co_await delete_data_and_dlq_files(_log, files); + co_return co_await delete_all_local_files(_log, files); } size_t translation_task::buffered_bytes() const { diff --git a/src/v/iceberg/row_delta_action.cc b/src/v/iceberg/row_delta_action.cc index dc7d41b920836..a20d0dd54cf5c 100644 --- a/src/v/iceberg/row_delta_action.cc +++ b/src/v/iceberg/row_delta_action.cc @@ -269,10 +269,13 @@ ss::future row_delta_action::build_updates() && { } auto added_data_files_count = new_data_files_.size(); - // Validate delete files. + // Validate delete files. Equality delete files are unpartitioned per + // the Iceberg spec, so they skip partition field count validation. size_t deleted_records{0}; for (const auto& f : new_delete_files_) { - if (f.file.partition.val == nullptr) { + bool is_equality_delete = f.file.content_type + == data_file_content_type::equality_deletes; + if (!is_equality_delete && f.file.partition.val == nullptr) { vlog( log.error, "Metadata for delete file {} is missing partition key", @@ -292,7 +295,8 @@ ss::future row_delta_action::build_updates() && { if (f_num_fields != pspec->fields.size()) { vlog( log.error, - "Partition key for delete file {} has {} fields, expected {}", + "Partition key for delete file {} has {} fields, expected " + "{}", f.file.file_path, f_num_fields, pspec->fields.size()); diff --git a/src/v/model/metadata.h b/src/v/model/metadata.h index c17c02af9e5e5..e1a2085fd1dbc 100644 --- a/src/v/model/metadata.h +++ b/src/v/model/metadata.h @@ -614,7 +614,6 @@ enum class redpanda_storage_mode : uint8_t { local = 0, tiered = 1, cloud = 2, - tiered_cloud = 3, unset = 255 }; @@ -626,8 +625,6 @@ constexpr const char* redpanda_storage_mode_to_string(redpanda_storage_mode m) { return "tiered"; case redpanda_storage_mode::cloud: return "cloud"; - case redpanda_storage_mode::tiered_cloud: - return "tiered_cloud"; case redpanda_storage_mode::unset: return "unset"; } @@ -678,6 +675,7 @@ class iceberg_mode { // in the file descriptor. However these can both be overridden by the // user. value_schema_latest = 3, + debezium = 4, }; static iceberg_mode disabled; @@ -685,6 +683,8 @@ class iceberg_mode { static iceberg_mode value_schema_id_prefix; + static iceberg_mode debezium; + // Creates a new iceberg mode with the latest protobuf value kind and the // protobuf full name. 
     static iceberg_mode value_schema_latest(
@@ -761,12 +761,16 @@ class iceberg_mode {
         ss::sstring subject_name;
         bool operator==(const value_schema_latest_impl&) const = default;
     };
+    struct debezium_impl {
+        bool operator==(const debezium_impl&) const = default;
+    };
 
     std::variant<
       disabled_impl,
       key_value_impl,
      value_schema_id_prefix_impl,
-      value_schema_latest_impl>
+      value_schema_latest_impl,
+      debezium_impl>
      _impl;
 };
diff --git a/src/v/model/model.cc b/src/v/model/model.cc
index 0ccae01aaf682..30130382f4c39 100644
--- a/src/v/model/model.cc
+++ b/src/v/model/model.cc
@@ -612,10 +612,6 @@ redpanda_storage_mode_from_string(std::string_view s) {
       .match(
         model::redpanda_storage_mode_to_string(
           model::redpanda_storage_mode::cloud),
         model::redpanda_storage_mode::cloud)
-      .match(
-        model::redpanda_storage_mode_to_string(
-          model::redpanda_storage_mode::tiered_cloud),
-        model::redpanda_storage_mode::tiered_cloud)
       .match(
         model::redpanda_storage_mode_to_string(
           model::redpanda_storage_mode::unset),
@@ -658,6 +654,8 @@ iceberg_mode iceberg_mode::key_value
   = iceberg_mode::make<key_value_impl>();
 iceberg_mode iceberg_mode::value_schema_id_prefix
   = iceberg_mode::make<value_schema_id_prefix_impl>();
+iceberg_mode iceberg_mode::debezium
+  = iceberg_mode::make<debezium_impl>();
 
 void write_nested(iobuf& out, const iceberg_mode& m) {
     using serde::write;
@@ -683,7 +681,7 @@ void read_nested(
     case iceberg_mode::variant::value_schema_id_prefix:
         m = iceberg_mode::value_schema_id_prefix;
         return;
-    case iceberg_mode::variant::value_schema_latest:
+    case iceberg_mode::variant::value_schema_latest: {
         ss::sstring msg_name;
         read_nested(in, msg_name, bytes_left_limit);
         ss::sstring subject;
@@ -691,6 +689,10 @@
         m = iceberg_mode::value_schema_latest(msg_name, subject);
         return;
     }
+    case iceberg_mode::variant::debezium:
+        m = iceberg_mode::debezium;
+        return;
+    }
     throw serde::serde_exception(
       fmt::format("unknown iceberg_mode variant: {}", std::to_underlying(v)));
 }
@@ -703,7 +705,7 @@ std::ostream& operator<<(std::ostream& os, const iceberg_mode& mode) {
         return os << "key_value";
     case iceberg_mode::variant::value_schema_id_prefix:
         return os << "value_schema_id_prefix";
-    case iceberg_mode::variant::value_schema_latest:
+    case iceberg_mode::variant::value_schema_latest: {
         os << "value_schema_latest";
         bool delimiter = false;
         auto emit_delimiter = [&delimiter, &os]() {
@@ -720,6 +722,9 @@ std::ostream& operator<<(std::ostream& os, const iceberg_mode& mode) {
         }
         return os;
     }
+    case iceberg_mode::variant::debezium:
+        return os << "debezium_schema_id_prefix";
+    }
 }
 
 namespace {
@@ -762,6 +767,8 @@ std::istream& operator>>(std::istream& is, iceberg_mode& mode) {
         mode = iceberg_mode::key_value;
     } else if (s == "value_schema_id_prefix") {
         mode = iceberg_mode::value_schema_id_prefix;
+    } else if (s == "debezium_schema_id_prefix") {
+        mode = iceberg_mode::debezium;
     } else if (s.starts_with("value_schema_latest")) {
         s = s.substr(std::strlen("value_schema_latest"));
         auto options = parse_config_options(s);
diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile
index bee80515630c9..4f5803047f903 100644
--- a/tests/docker/Dockerfile
+++ b/tests/docker/Dockerfile
@@ -324,6 +324,21 @@ COPY --chown=0:0 tests/java/spark-iceberg-dependencies /opt/redpanda-tests/java/
 COPY --chown=0:0 --chmod=0755 tests/docker/ducktape-deps/spark /
 RUN --mount=type=cache,id=dl-cache,target=/dl-cache /spark && rm /spark
 
+#################################
+
+FROM java-base AS debezium-server
+COPY --chown=0:0 --chmod=0755 tests/docker/ducktape-deps/debezium-server /
+RUN 
--mount=type=cache,id=dl-cache,target=/dl-cache \ + /debezium-server && rm /debezium-server + +################################# + +FROM base AS postgres +COPY --chown=0:0 --chmod=0755 tests/docker/ducktape-deps/postgres / +RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \ + --mount=type=cache,target=/var/lib/apt,sharing=locked \ + /postgres && rm /postgres + ################################# # m2-collector creates the final .m2 cache from all images # that execute directly out of the cache, so we avoid duplicating @@ -453,6 +468,13 @@ COPY --from=flink /opt/flink/ /opt/flink/ COPY --from=iceberg-rest /opt/iceberg-rest-catalog /opt/iceberg-rest-catalog COPY --from=trino /opt/trino /opt/trino COPY --from=spark /opt/spark /opt/spark +COPY --from=debezium-server /opt/debezium-server/ /opt/debezium-server/ +COPY --from=postgres /usr/lib/postgresql/ /usr/lib/postgresql/ +COPY --from=postgres /usr/share/postgresql/ /usr/share/postgresql/ +COPY --from=postgres /etc/postgresql/ /etc/postgresql/ +COPY --from=postgres /usr/share/postgresql-common/ /usr/share/postgresql-common/ +COPY --from=postgres /usr/lib/x86_64-linux-gnu/libpq.so* /usr/lib/x86_64-linux-gnu/ +COPY --from=postgres /usr/share/zoneinfo/ /usr/share/zoneinfo/ COPY --from=m2-collector /root/.m2 /root/.m2 COPY --from=nodejs /opt/nodejs/ /opt/nodejs/ COPY --from=nodejs-test-clients /opt/redpanda-tests/nodejs /opt/redpanda-tests/nodejs diff --git a/tests/docker/ducktape-deps/debezium-server b/tests/docker/ducktape-deps/debezium-server new file mode 100644 index 0000000000000..3c403e1a70f6c --- /dev/null +++ b/tests/docker/ducktape-deps/debezium-server @@ -0,0 +1,31 @@ +#!/usr/bin/env bash +set -e + +source "$(dirname "${BASH_SOURCE[0]}")/download-utils" + +mkdir -p /opt/debezium-server + +DEBEZIUM_VERSION=3.1.1.Final +DEBEZIUM_URL=https://repo1.maven.org/maven2/io/debezium/debezium-server-dist/${DEBEZIUM_VERSION}/debezium-server-dist-${DEBEZIUM_VERSION}.tar.gz + +DL_STRIP=1 download_extract ${DEBEZIUM_URL} /opt/debezium-server + +mkdir -p /opt/debezium-server/data +mkdir -p /opt/debezium-server/conf + +# Add Confluent Avro converter + serializer for Schema Registry wire +# format compatibility with Redpanda's Schema Registry. 
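+# The datalake translator expects that framing (0x00 magic + 4-byte schema
+# id), which these jars produce when registering against Redpanda's registry.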
+CONFLUENT_VERSION=7.6.0 +CONFLUENT_REPO=https://packages.confluent.io/maven/io/confluent + +for jar in \ + kafka-connect-avro-converter/${CONFLUENT_VERSION}/kafka-connect-avro-converter-${CONFLUENT_VERSION}.jar \ + kafka-connect-avro-data/${CONFLUENT_VERSION}/kafka-connect-avro-data-${CONFLUENT_VERSION}.jar \ + kafka-avro-serializer/${CONFLUENT_VERSION}/kafka-avro-serializer-${CONFLUENT_VERSION}.jar \ + kafka-schema-serializer/${CONFLUENT_VERSION}/kafka-schema-serializer-${CONFLUENT_VERSION}.jar \ + kafka-schema-registry-client/${CONFLUENT_VERSION}/kafka-schema-registry-client-${CONFLUENT_VERSION}.jar \ + kafka-schema-converter/${CONFLUENT_VERSION}/kafka-schema-converter-${CONFLUENT_VERSION}.jar \ + common-config/${CONFLUENT_VERSION}/common-config-${CONFLUENT_VERSION}.jar \ + common-utils/${CONFLUENT_VERSION}/common-utils-${CONFLUENT_VERSION}.jar; do + download ${CONFLUENT_REPO}/${jar} /opt/debezium-server/lib/$(basename ${jar}) +done diff --git a/tests/docker/ducktape-deps/postgres b/tests/docker/ducktape-deps/postgres new file mode 100644 index 0000000000000..80f1570a8e710 --- /dev/null +++ b/tests/docker/ducktape-deps/postgres @@ -0,0 +1,13 @@ +#!/usr/bin/env bash +set -e + +apt-get update +apt-get install -y postgresql postgresql-client + +# Pre-configure for logical replication (needed by Debezium CDC) +PG_CONF=$(find /etc/postgresql -name postgresql.conf | head -1) +if [ -n "$PG_CONF" ]; then + echo "wal_level = logical" >>"$PG_CONF" + echo "max_replication_slots = 10" >>"$PG_CONF" + echo "max_wal_senders = 10" >>"$PG_CONF" +fi diff --git a/tests/rptest/services/debezium_server_service.py b/tests/rptest/services/debezium_server_service.py new file mode 100644 index 0000000000000..20d1adb61894e --- /dev/null +++ b/tests/rptest/services/debezium_server_service.py @@ -0,0 +1,146 @@ +# Copyright 2026 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +import requests + +from ducktape.services.service import Service +from ducktape.utils.util import wait_until + + +class DebeziumServerService(Service): + """Debezium Server service for ducktape tests. + + Runs a standalone Debezium Server (Quarkus-based) that captures + CDC events from a PostgreSQL source database and writes them to + Redpanda using Avro + Schema Registry serialization. 
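+
+    A sketch of typical wiring (mirrors debezium_cdc_e2e_test.py):
+
+        pg = PostgresService(ctx)
+        pg.start()
+        dbz = DebeziumServerService(ctx, redpanda, pg,
+                                    table_include_list="public.users")
+        dbz.start()
+        topic = dbz.topic_name(table="users")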
+ """ + + INSTALL_DIR = "/opt/debezium-server" + PERSISTENT_ROOT = "/var/lib/debezium" + LOG_FILE = f"{PERSISTENT_ROOT}/debezium.log" + HEALTH_PORT = 8080 + logs = {"debezium_logs": {"path": LOG_FILE, "collect_default": True}} + + def __init__( + self, + ctx, + redpanda, + postgres_service, + database_name="testdb", + table_include_list="public.*", + server_name="dbserver1", + num_nodes=1, + ): + super().__init__(ctx, num_nodes=num_nodes) + self.redpanda = redpanda + self.postgres = postgres_service + self.database_name = database_name + self.table_include_list = table_include_list + self.server_name = server_name + + def start_node(self, node, timeout_sec=120): + node.account.ssh(f"mkdir -p {self.PERSISTENT_ROOT}") + + pg_host = self.postgres.hostname() + bootstrap = self.redpanda.brokers() + schema_reg = self.redpanda.schema_reg() + + props = self._build_properties(pg_host, bootstrap, schema_reg) + config_path = f"{self.INSTALL_DIR}/config/application.properties" + node.account.create_file(config_path, props) + + # Detect architecture for Java path + arch = ( + node.account.ssh_output("dpkg-architecture -q DEB_BUILD_ARCH") + .decode() + .strip() + ) + java_home = f"/usr/lib/jvm/java-21-openjdk-{arch}" + + runner_jar = ( + node.account.ssh_output( + f"ls {self.INSTALL_DIR}/debezium-server-*runner.jar" + ) + .decode() + .strip() + ) + # Use -cp with lib/* glob (not -jar) so that additional JARs + # we added to lib/ (e.g. Confluent Avro serializer) are on + # the classpath. Use semicolon before &, not &&: the pattern + # `cd dir && cmd &` backgrounds the entire compound command as + # a subshell that holds the SSH channel open, while + # `cd dir; cmd &` only backgrounds cmd. + cp = f"{runner_jar}:{self.INSTALL_DIR}/config:{self.INSTALL_DIR}/lib/*" + cmd = ( + f"cd {self.INSTALL_DIR}; " + f"nohup {java_home}/bin/java -cp '{cp}'" + f" io.debezium.server.Main" + f" >> {self.LOG_FILE} 2>&1 &" + ) + self.logger.info(f"Starting Debezium with: {cmd}") + node.account.ssh(cmd) + self.logger.info("Debezium SSH command returned") + + wait_until( + lambda: self._is_ready(node), + timeout_sec=timeout_sec, + backoff_sec=2, + err_msg="Debezium Server did not become ready", + ) + + def stop_node(self, node): + node.account.ssh("pkill -f 'debezium.server.Main'", allow_fail=True) + + def clean_node(self, node): + self.stop_node(node) + node.account.ssh(f"rm -rf {self.PERSISTENT_ROOT}", allow_fail=True) + + def _is_ready(self, node): + try: + url = f"http://{node.account.hostname}:{self.HEALTH_PORT}/q/health/ready" + r = requests.get(url, timeout=5) + return r.status_code == 200 + except Exception: + return False + + def _build_properties(self, pg_host, bootstrap_servers, schema_reg_url): + first_sr = schema_reg_url.split(",")[0] + return f"""# Source: PostgreSQL +debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector +debezium.source.offset.storage.file.filename={self.PERSISTENT_ROOT}/offsets.dat +debezium.source.offset.flush.interval.ms=0 +debezium.source.database.hostname={pg_host} +debezium.source.database.port={self.postgres.PG_PORT} +debezium.source.database.user={self.postgres.DB_USER} +debezium.source.database.password={self.postgres.DB_PASSWORD} +debezium.source.database.dbname={self.database_name} +debezium.source.topic.prefix={self.server_name} +debezium.source.table.include.list={self.table_include_list} +debezium.source.plugin.name=pgoutput +debezium.source.slot.name=debezium_test +debezium.source.tombstones.on.delete=false +debezium.source.snapshot.mode=initial + +# 
Sink: Kafka/Redpanda +debezium.sink.type=kafka +debezium.sink.kafka.producer.bootstrap.servers={bootstrap_servers} +debezium.sink.kafka.producer.acks=all +debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer +debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer + +# Avro format via Confluent AvroConverter (produces SR wire format) +debezium.format.value=avro +debezium.format.value.schema.registry.url={first_sr} +debezium.format.key=avro +debezium.format.key.schema.registry.url={first_sr} +""" + + def topic_name(self, schema="public", table=""): + """Return the Debezium topic name for a given table.""" + return f"{self.server_name}.{schema}.{table}" diff --git a/tests/rptest/services/postgres_service.py b/tests/rptest/services/postgres_service.py new file mode 100644 index 0000000000000..32c5a6b51fe91 --- /dev/null +++ b/tests/rptest/services/postgres_service.py @@ -0,0 +1,173 @@ +# Copyright 2026 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + + +from ducktape.services.service import Service +from ducktape.utils.util import wait_until + + +class PostgresService(Service): + """PostgreSQL service for ducktape tests. + + Starts a PostgreSQL instance with logical replication enabled + (for Debezium CDC). Provides methods to create databases, + users, tables, and execute SQL. + """ + + PERSISTENT_ROOT = "/var/lib/postgresql-test" + LOG_FILE = "/var/lib/postgresql-test/postgresql.log" + PG_PORT = 5432 + DB_NAME = "testdb" + DB_USER = "dbz" + DB_PASSWORD = "dbz" + + def __init__(self, ctx, num_nodes=1): + super().__init__(ctx, num_nodes=num_nodes) + + def _pg_bin(self, node): + """Find the PostgreSQL binary directory.""" + result = ( + node.account.ssh_output( + "ls -d /usr/lib/postgresql/*/bin | head -1", allow_fail=True + ) + .decode() + .strip() + ) + return result if result else "/usr/lib/postgresql/16/bin" + + def _pg_cmd(self, pg_bin, cmd): + """Wrap a postgres command to run as the postgres user from /.""" + return f"cd / && sudo -u postgres {pg_bin}/{cmd}" + + def start_node(self, node, timeout_sec=60): + pg_bin = self._pg_bin(node) + + # Ensure the postgres system user exists (the Docker image copies + # binaries from a build stage but not /etc/passwd entries). 
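+        # The || guard makes this idempotent: useradd only runs when the
+        # uid is missing, so repeated start_node calls on a node are safe.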
+ node.account.ssh( + "id -u postgres >/dev/null 2>&1 || useradd -r -m -s /bin/bash postgres", + allow_fail=True, + ) + + # Create directories needed by PostgreSQL + node.account.ssh(f"mkdir -p {self.PERSISTENT_ROOT}") + node.account.ssh( + "mkdir -p /var/run/postgresql && chown postgres:postgres /var/run/postgresql" + ) + node.account.ssh(f"chown -R postgres:postgres {self.PERSISTENT_ROOT}") + node.account.ssh(self._pg_cmd(pg_bin, f"initdb -D {self.PERSISTENT_ROOT}/data")) + + # Configure for logical replication + conf = f"{self.PERSISTENT_ROOT}/data/postgresql.conf" + node.account.ssh(f"echo 'wal_level = logical' >> {conf}") + node.account.ssh(f"echo 'max_replication_slots = 10' >> {conf}") + node.account.ssh(f"echo 'max_wal_senders = 10' >> {conf}") + node.account.ssh(f"echo \"listen_addresses = '*'\" >> {conf}") + node.account.ssh(f"echo 'port = {self.PG_PORT}' >> {conf}") + + # Allow connections from any host + hba = f"{self.PERSISTENT_ROOT}/data/pg_hba.conf" + node.account.ssh(f"echo 'host all all 0.0.0.0/0 md5' >> {hba}") + + # Start PostgreSQL + node.account.ssh( + self._pg_cmd( + pg_bin, + f"pg_ctl -D {self.PERSISTENT_ROOT}/data -l {self.LOG_FILE} start", + ) + ) + + # Wait for ready + wait_until( + lambda: self._is_ready(node), + timeout_sec=timeout_sec, + backoff_sec=1, + err_msg="PostgreSQL did not start in time", + ) + + # Create test user and database + self._exec_sql_as_postgres( + node, + f"CREATE ROLE {self.DB_USER} WITH LOGIN SUPERUSER PASSWORD " + f"'{self.DB_PASSWORD}' REPLICATION", + ) + self._exec_sql_as_postgres( + node, f"CREATE DATABASE {self.DB_NAME} OWNER {self.DB_USER}" + ) + + def stop_node(self, node): + pg_bin = self._pg_bin(node) + node.account.ssh( + self._pg_cmd( + pg_bin, + f"pg_ctl -D {self.PERSISTENT_ROOT}/data stop -m fast", + ), + allow_fail=True, + ) + + def clean_node(self, node): + self.stop_node(node) + node.account.ssh(f"rm -rf {self.PERSISTENT_ROOT}", allow_fail=True) + + def _is_ready(self, node): + try: + pg_bin = self._pg_bin(node) + result = node.account.ssh_output( + self._pg_cmd(pg_bin, f"pg_isready -p {self.PG_PORT}"), + allow_fail=True, + ).decode() + return "accepting connections" in result + except Exception: + return False + + def _exec_sql_as_postgres(self, node, sql): + pg_bin = self._pg_bin(node) + node.account.ssh( + f'cd / && sudo -u postgres {pg_bin}/psql -p {self.PG_PORT} -c "{sql}"' + ) + + def exec_sql(self, node=None, sql="", database=None): + """Execute SQL as the test user.""" + node = node or self.nodes[0] + db = database or self.DB_NAME + pg_bin = self._pg_bin(node) + node.account.ssh( + f"PGPASSWORD={self.DB_PASSWORD} {pg_bin}/psql " + f"-h localhost -p {self.PG_PORT} " + f"-U {self.DB_USER} -d {db} " + f'-c "{sql}"' + ) + + def exec_sql_output(self, node=None, sql="", database=None): + """Execute SQL and return the output.""" + node = node or self.nodes[0] + db = database or self.DB_NAME + pg_bin = self._pg_bin(node) + return ( + node.account.ssh_output( + f"PGPASSWORD={self.DB_PASSWORD} {pg_bin}/psql " + f"-h localhost -p {self.PG_PORT} " + f"-U {self.DB_USER} -d {db} -t -A " + f'-c "{sql}"' + ) + .decode() + .strip() + ) + + def hostname(self, node=None): + """Return the hostname for external connections.""" + node = node or self.nodes[0] + return node.account.hostname + + def connection_string(self, node=None): + """Return a connection string for the test database.""" + return ( + f"postgresql://{self.DB_USER}:{self.DB_PASSWORD}" + f"@{self.hostname(node)}:{self.PG_PORT}/{self.DB_NAME}" + ) diff --git 
a/tests/rptest/tests/datalake/debezium_cdc_e2e_test.py b/tests/rptest/tests/datalake/debezium_cdc_e2e_test.py new file mode 100644 index 0000000000000..2089683c1c577 --- /dev/null +++ b/tests/rptest/tests/datalake/debezium_cdc_e2e_test.py @@ -0,0 +1,227 @@ +# Copyright 2026 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +"""End-to-end test: PostgreSQL -> Debezium -> Redpanda -> Iceberg + +Verifies that Debezium CDC events (inserts, updates, deletes) are +correctly translated to Iceberg tables with proper upsert and delete +semantics. +""" + +from ducktape.mark import matrix +from ducktape.utils.util import wait_until + +from rptest.clients.rpk import RpkTool +from rptest.services.cluster import cluster +from rptest.services.debezium_server_service import DebeziumServerService +from rptest.services.postgres_service import PostgresService +from rptest.services.redpanda import SISettings, SchemaRegistryConfig +from rptest.tests.datalake.datalake_services import DatalakeServices +from rptest.tests.datalake.query_engine_base import QueryEngineType +from rptest.tests.datalake.utils import supported_storage_types +from rptest.tests.redpanda_test import RedpandaTest + + +class DebeziumCdcIcebergTest(RedpandaTest): + """End-to-end test: PostgreSQL -> Debezium -> Redpanda -> Iceberg""" + + def __init__(self, test_ctx): + super().__init__( + test_ctx, + num_brokers=1, + si_settings=SISettings(test_context=test_ctx), + schema_registry_config=SchemaRegistryConfig(), + extra_rp_conf={ + "iceberg_enabled": True, + "iceberg_catalog_commit_interval_ms": 5000, + }, + ) + self.postgres = PostgresService(test_ctx) + self.debezium = None + self.dl = DatalakeServices( + self.test_context, + redpanda=self.redpanda, + include_query_engines=[QueryEngineType.SPARK], + ) + + def setUp(self): + self.postgres.start() + self.dl.setUp() + + def tearDown(self): + if self.debezium: + self.debezium.stop() + self.dl.tearDown() + self.postgres.stop() + super().tearDown() + + @cluster(num_nodes=5) + @matrix(cloud_storage_type=supported_storage_types()) + def test_debezium_cdc_to_iceberg(self, cloud_storage_type): + """Test insert, update, and delete via Debezium CDC to Iceberg.""" + # Create test table with REPLICA IDENTITY FULL + self.postgres.exec_sql( + sql="CREATE TABLE users (" + "id SERIAL PRIMARY KEY, " + "name TEXT NOT NULL, " + "email TEXT" + ")" + ) + self.postgres.exec_sql(sql="ALTER TABLE users REPLICA IDENTITY FULL") + + # Insert initial rows before starting Debezium so the snapshot + # has data to produce (which creates the Kafka topic). + self.postgres.exec_sql( + sql="INSERT INTO users (name, email) VALUES ('alice', 'alice@example.com')" + ) + self.postgres.exec_sql( + sql="INSERT INTO users (name, email) VALUES ('bob', 'bob@example.com')" + ) + self.postgres.exec_sql( + sql="INSERT INTO users (name, email) VALUES " + "('charlie', 'charlie@example.com')" + ) + self.logger.info("PostgreSQL table created with 3 initial rows") + + # Wait for Schema Registry to be ready (Debezium's Avro + # converter needs to register schemas on first produce). 
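+        # Probe GET /subjects on the first advertised registry URL; a 200
+        # (even with an empty subject list) means the registry is serving.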
+ def _sr_ready(): + try: + import requests as req + + r = req.get( + f"{self.redpanda.schema_reg().split(',')[0]}/subjects", + timeout=5, + ) + self.logger.debug(f"Schema Registry status: {r.status_code}") + return r.status_code == 200 + except Exception: + return False + + wait_until( + _sr_ready, + timeout_sec=30, + backoff_sec=2, + err_msg="Schema Registry not ready", + ) + self.logger.info("Schema Registry is ready") + + # Pre-create the Debezium topic since Redpanda may not have + # auto-topic creation enabled. + topic_name = "dbserver1.public.users" + rpk = RpkTool(self.redpanda) + rpk.create_topic(topic_name) + self.logger.info(f"Pre-created topic '{topic_name}'") + + # Start Debezium Server + self.debezium = DebeziumServerService( + self.test_context, + self.redpanda, + self.postgres, + database_name=PostgresService.DB_NAME, + table_include_list="public.users", + server_name="dbserver1", + ) + self.logger.info("Starting Debezium Server...") + self.debezium.start() + self.logger.info("Debezium Server started") + + # Set Iceberg mode on the pre-created topic + self.dl.set_iceberg_mode_on_topic(topic_name, "debezium") + self.logger.info(f"Set iceberg mode 'debezium' on topic '{topic_name}'") + + self.logger.info("Waiting for initial inserts to appear in Iceberg...") + + # Check that Redpanda received messages + def _topic_has_messages(): + partitions = rpk.describe_topic(topic_name) + total = sum(p.high_watermark for p in partitions) + self.logger.info(f"Topic '{topic_name}' high watermark: {total}") + return total > 0 + + wait_until( + _topic_has_messages, + timeout_sec=30, + backoff_sec=2, + err_msg="No messages appeared in Redpanda topic", + ) + + # Wait for table to appear in Iceberg catalog + self.logger.info("Waiting for Iceberg table to be created in catalog...") + + def _table_exists(): + exists = self.dl.table_exists(topic_name) + self.logger.info(f"Iceberg table exists: {exists}") + return exists + + wait_until( + _table_exists, + timeout_sec=90, + backoff_sec=5, + err_msg="Iceberg table was never created by translation", + ) + self.logger.info("Iceberg table created, checking row count...") + + # Wait for initial inserts to appear + spark = self.dl.query_engine(QueryEngineType.SPARK) + + def _inserts_visible(): + try: + count = spark.count_table("redpanda", topic_name) + self.logger.info(f"Iceberg row count: {count}") + return count >= 3 + except Exception as e: + self.logger.info(f"Iceberg query failed: {e}") + return False + + wait_until( + _inserts_visible, + timeout_sec=60, + backoff_sec=5, + err_msg="Initial inserts not visible in Iceberg", + ) + self.logger.info("All 3 inserts visible in Iceberg") + + # Update and delete + self.logger.info("Performing UPDATE (id=1) and DELETE (id=2)...") + self.postgres.exec_sql(sql="UPDATE users SET name = 'alicia' WHERE id = 1") + self.postgres.exec_sql(sql="DELETE FROM users WHERE id = 2") + self.logger.info("Waiting for CDC updates to propagate...") + + def _final_state(): + try: + rows = spark.run_query_fetch_all( + f"SELECT id, name FROM redpanda.{spark.escape_identifier(topic_name)} ORDER BY id" + ) + self.logger.info(f"Current Iceberg rows: {rows}") + if len(rows) != 2: + return False + return rows[0][1] == "alicia" and rows[1][1] == "charlie" + except Exception as e: + self.logger.info(f"Iceberg query failed: {e}") + return False + + wait_until( + _final_state, + timeout_sec=120, + backoff_sec=5, + err_msg="Final CDC state not reflected in Iceberg", + ) + + # Final verification + rows = spark.run_query_fetch_all( + f"SELECT 
id, name, email FROM redpanda.{spark.escape_identifier(topic_name)} ORDER BY id" + ) + assert len(rows) == 2, f"Expected 2 rows, got {len(rows)}: {rows}" + assert rows[0][0] == 1, f"Expected id=1, got {rows[0][0]}" + assert rows[0][1] == "alicia", f"Expected 'alicia', got {rows[0][1]}" + assert rows[1][0] == 3, f"Expected id=3, got {rows[1][0]}" + assert rows[1][1] == "charlie", f"Expected 'charlie', got {rows[1][1]}" + + self.logger.info(f"Debezium CDC -> Iceberg test passed: final state = {rows}")