5 changes: 5 additions & 0 deletions src/v/cluster/metrics_reporter.cc
@@ -355,6 +355,9 @@ metrics_reporter::build_metrics_snapshot() {
case model::iceberg_mode::variant::value_schema_latest:
++snapshot.topics_with_iceberg_schema_latest;
break;
case model::iceberg_mode::variant::debezium:
++snapshot.topics_with_iceberg_debezium;
break;
}
}

@@ -717,6 +720,8 @@ void rjson_serialize(
w.Uint64(snapshot.topics_with_iceberg_schema_id);
w.Key("topics_with_iceberg_latest_protobuf_value");
w.Uint64(snapshot.topics_with_iceberg_schema_latest);
w.Key("topics_with_iceberg_debezium");
w.Uint64(snapshot.topics_with_iceberg_debezium);

w.Key("partition_count");
w.Uint64(snapshot.partition_count);
1 change: 1 addition & 0 deletions src/v/cluster/metrics_reporter.h
@@ -113,6 +113,7 @@ class metrics_reporter {
uint32_t topics_with_iceberg_kv{0};
uint32_t topics_with_iceberg_schema_id{0};
uint32_t topics_with_iceberg_schema_latest{0};
uint32_t topics_with_iceberg_debezium{0};

cluster_version active_logical_version{invalid_version};
cluster_version original_logical_version{invalid_version};
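The new counter flows from build_metrics_snapshot() through rjson_serialize above, so the cluster metrics report gains one field next to the existing Iceberg-mode counters. A sketch of the affected fragment of the reported payload, with hypothetical values:

```json
{
  "topics_with_iceberg_latest_protobuf_value": 2,
  "topics_with_iceberg_debezium": 1,
  "partition_count": 64
}
```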
40 changes: 40 additions & 0 deletions src/v/datalake/BUILD
@@ -151,6 +151,7 @@ redpanda_cc_library(
":backlog_controller",
":catalog_schema_manager",
":cloud_data_io",
":debezium_translator",
":location",
":logger",
":record_translator",
@@ -238,11 +239,16 @@ redpanda_cc_library(
"table_definition.cc",
],
hdrs = [
"schema_descriptor.h",
"table_definition.h",
],
visibility = [":__subpackages__"],
deps = [
"//src/v/iceberg:datatypes",
"//src/v/iceberg:schema",
"//src/v/iceberg:values",
"//src/v/model",
"@seastar",
],
)

@@ -409,6 +415,40 @@ redpanda_cc_library(
],
)

redpanda_cc_library(
name = "debezium_translator",
srcs = [
"debezium_translator.cc",
],
hdrs = [
"debezium_translator.h",
],
implementation_deps = [
":logger",
":table_definition",
"//src/v/iceberg:avro_utils",
"//src/v/iceberg:compatibility_utils",
"//src/v/iceberg:values",
"//src/v/iceberg/conversion:conversion_outcome",
"//src/v/iceberg/conversion:values_avro",
"//src/v/iceberg/conversion:values_json",
"//src/v/iceberg/conversion:values_protobuf",
"@abseil-cpp//absl/container:flat_hash_set",
"@avro",
"@protobuf",
"@seastar",
],
visibility = [":__subpackages__"],
deps = [
":record_schema_resolver",
":record_translator",
"//src/v/base",
"//src/v/iceberg:datatypes",
"//src/v/metrics",
"//src/v/model",
],
)

redpanda_cc_library(
name = "local_parquet_file_writer",
srcs = [
2 changes: 2 additions & 0 deletions src/v/datalake/coordinator/BUILD
@@ -163,6 +163,8 @@ redpanda_cc_library(
"//src/v/container:chunked_vector",
"//src/v/serde",
"//src/v/serde:bytes",
"//src/v/serde:iobuf",
"//src/v/serde:optional",
"@fmt",
"@seastar",
],
9 changes: 9 additions & 0 deletions src/v/datalake/coordinator/coordinator.cc
@@ -15,6 +15,7 @@
#include "container/chunked_vector.h"
#include "datalake/catalog_schema_manager.h"
#include "datalake/coordinator/state_update.h"
#include "datalake/debezium_translator.h"
#include "datalake/logger.h"
#include "datalake/partition_spec_parser.h"
#include "datalake/record_translator.h"
@@ -441,6 +442,14 @@ struct coordinator::main_table_schema_provider
val_type = std::move(type_res.value());
}

if (comps.is_debezium) {
if (!val_type.has_value()) {
co_return errc::failed;
}
auto& envelope = std::get<iceberg::struct_type>(
val_type.value()->type);
co_return debezium_envelope_to_table_type(envelope);
}
auto record_type = default_translator{}.build_type(std::move(val_type));
co_return std::move(record_type.type);
}
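For context on the is_debezium branch above: a Debezium change event's value schema is an envelope whose fields, per Debezium's documented layout, include "before", "after", "op", and "ts_ms"; the table columns are derived from the row struct carried inside the envelope, not from the envelope itself. The real mapping lives in debezium_envelope_to_table_type; the helper below is only a hypothetical sketch of the unwrapping step, with the field-name lookup and return convention assumed:

```cpp
#include "iceberg/datatypes.h"

// Hypothetical sketch: find the Debezium "after" row struct inside the
// envelope. The actual derivation in debezium_envelope_to_table_type may
// differ (e.g. it may also surface "op" or "ts_ms" as table columns).
const iceberg::struct_type*
find_debezium_row_type(const iceberg::struct_type& envelope) {
    for (const auto& field : envelope.fields) {
        if (field->name == "after") {
            // field_type is a variant, as the std::get in the hunk above shows.
            return std::get_if<iceberg::struct_type>(&field->type);
        }
    }
    return nullptr; // not a recognizable Debezium envelope
}
```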
15 changes: 12 additions & 3 deletions src/v/datalake/coordinator/data_file.h
@@ -14,6 +14,7 @@
#include "container/chunked_vector.h"
#include "serde/envelope.h"
#include "serde/rw/bytes.h"
#include "serde/rw/optional.h"

#include <seastar/core/sstring.hh>

@@ -23,7 +24,7 @@ namespace datalake::coordinator {

// Represents a file that exists in object storage.
struct data_file
: serde::envelope<data_file, serde::version<1>, serde::compat_version<0>> {
: serde::envelope<data_file, serde::version<2>, serde::compat_version<0>> {
auto serde_fields() {
return std::tie(
remote_path,
@@ -32,7 +33,9 @@ struct data_file
hour_deprecated,
table_schema_id,
partition_spec_id,
partition_key);
partition_key,
delete_key_field_ids,
is_delete);
}
ss::sstring remote_path = "";
size_t row_count = 0;
@@ -47,7 +50,8 @@
// single-value serialization" (see iceberg/values_bytes.h).
// Nulls are represented by std::nullopt.
chunked_vector<std::optional<bytes>> partition_key;
// TODO: add kafka schema id
std::optional<chunked_vector<int32_t>> delete_key_field_ids;
bool is_delete{false};

data_file copy() const {
return {
@@ -58,6 +62,11 @@
.table_schema_id = table_schema_id,
.partition_spec_id = partition_spec_id,
.partition_key = partition_key.copy(),
.delete_key_field_ids = delete_key_field_ids.has_value()
? std::make_optional(
delete_key_field_ids->copy())
: std::nullopt,
.is_delete = is_delete,
};
}

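The bump from serde::version<1> to serde::version<2> with compat_version<0> unchanged follows the usual envelope evolution rule: new fields are appended to serde_fields() with safe defaults, so a v2 reader decoding v1 bytes leaves delete_key_field_ids disengaged and is_delete false, while a v1 reader skips the trailing bytes via the envelope's encoded size. A minimal sketch of the pattern, with simplified types rather than the real struct:

```cpp
#include "serde/envelope.h"

#include <tuple>

// Simplified illustration of appended-field evolution; not the real data_file.
struct file_meta
  : serde::envelope<file_meta, serde::version<2>, serde::compat_version<0>> {
    // Field order matters: new fields must be appended, never reordered or
    // removed, so existing fields decode identically across versions.
    auto serde_fields() { return std::tie(row_count, is_delete); }

    size_t row_count{0};
    // Added in v2; decoding a v1 payload leaves the default (false) in place.
    bool is_delete{false};
};
```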
121 changes: 99 additions & 22 deletions src/v/datalake/coordinator/iceberg_file_committer.cc
@@ -25,6 +25,7 @@
#include "iceberg/transaction.h"
#include "iceberg/values.h"
#include "iceberg/values_bytes.h"
#include "ssx/future-util.h"
#include "storage/api.h"

#include <exception>
@@ -275,19 +276,30 @@ class table_commit_builder {
table_commit_offset_);
} else {
for (const auto& f : files) {
bool is_equality_delete = f.is_delete;
auto pk = build_partition_key(topic, table_, f);
if (pk.has_error()) {
return pk.error();
}

iceberg::data_file file{
.content_type = iceberg::data_file_content_type::data,
.content_type
= is_equality_delete
? iceberg::data_file_content_type::equality_deletes
: iceberg::data_file_content_type::data,
.file_path = io.to_uri(std::filesystem::path(f.remote_path)),
.file_format = iceberg::data_file_format::parquet,
.partition = std::move(pk.value()),
.record_count = f.row_count,
.file_size_bytes = f.file_size_bytes,
};
if (is_equality_delete && f.delete_key_field_ids) {
chunked_vector<iceberg::nested_field::id_t> eq_ids;
for (auto id : *f.delete_key_field_ids) {
eq_ids.push_back(iceberg::nested_field::id_t{id});
}
file.equality_ids = std::move(eq_ids);
}
// For files created by a legacy Redpanda version that only
// supported hourly partitioning, choose current schema and
// spec ids.
@@ -298,12 +310,23 @@
= f.partition_spec_id >= 0
? iceberg::partition_spec::id_t{f.partition_spec_id}
: table_.default_spec_id;
icb_files_.push_back(
iceberg::file_to_append{
.file = std::move(file),
.schema_id = schema_id,
.partition_spec_id = pspec_id,
});
auto fta = iceberg::file_to_append{
.file = std::move(file),
.schema_id = schema_id,
.partition_spec_id = pspec_id,
};
if (f.delete_key_field_ids) {
if (!key_field_ids_) {
key_field_ids_.emplace();
for (auto id : *f.delete_key_field_ids) {
key_field_ids_->push_back(
iceberg::nested_field::id_t{id});
}
}
upsert_files_.push_back(std::move(fta));
} else {
icb_files_.push_back(std::move(fta));
}
}
}

@@ -319,8 +342,7 @@
model::revision_id topic_revision,
iceberg::catalog& catalog,
iceberg::manifest_io& io) && {
if (icb_files_.empty()) {
// No new files to commit.
if (icb_files_.empty() && upsert_files_.empty()) {
vlog(
datalake_log.debug,
"All committed files were deduplicated for topic {} revision {}, "
@@ -340,8 +362,9 @@

vlog(
datalake_log.debug,
"Adding {} files to Iceberg table {}",
"Adding {} data files and {} upsert files to Iceberg table {}",
icb_files_.size(),
upsert_files_.size(),
table_id_);
// NOTE: a non-expiring tag is added to the new snapshot to ensure that
// snapshot expiration doesn't clear this snapshot and its commit
@@ -355,18 +378,37 @@
std::numeric_limits<int64_t>::max())
: std::nullopt;
iceberg::transaction txn(std::move(table_));
auto icb_append_res = co_await txn.merge_append(
io,
std::move(icb_files_),
{{commit_meta_prop, to_json_str(commit_meta)}},
std::move(tag_name),
tag_expiry_ms);
if (icb_append_res.has_error()) {
co_return log_and_convert_action_errc(
icb_append_res.error(),
fmt::format(
"Iceberg merge append failed for table {}", table_id_));

if (!key_field_ids_) {
auto icb_append_res = co_await txn.merge_append(
io,
std::move(icb_files_),
{{commit_meta_prop, to_json_str(commit_meta)}},
std::move(tag_name),
tag_expiry_ms);
if (icb_append_res.has_error()) {
co_return log_and_convert_action_errc(
icb_append_res.error(),
fmt::format(
"Iceberg merge append failed for table {}", table_id_));
}
} else {
auto merge_delta_res = commit_merge_delta();
auto row_delta_res = co_await txn.row_delta(
io,
std::move(merge_delta_res.data_files),
std::move(merge_delta_res.delete_files),
{{commit_meta_prop, to_json_str(commit_meta)}},
std::move(tag_name),
tag_expiry_ms);
if (row_delta_res.has_error()) {
co_return log_and_convert_action_errc(
row_delta_res.error(),
fmt::format(
"Iceberg row delta failed for table {}", table_id_));
}
}

auto icb_commit_res = co_await catalog.commit_txn(
table_id_, std::move(txn));
if (icb_commit_res.has_error()) {
@@ -379,9 +421,42 @@
co_return std::move(icb_commit_res.value());
}

size_t num_files() const noexcept { return icb_files_.size(); }
size_t num_files() const noexcept {
return icb_files_.size() + upsert_files_.size();
}

private:
struct merge_delta_result {
chunked_vector<iceberg::file_to_append> data_files;
chunked_vector<iceberg::file_to_delete> delete_files;
};

merge_delta_result commit_merge_delta() {
chunked_vector<iceberg::file_to_append> all_data;
chunked_vector<iceberg::file_to_delete> deletes;
for (auto& f : icb_files_) {
all_data.push_back(std::move(f));
}
for (auto& f : upsert_files_) {
if (
f.file.content_type
== iceberg::data_file_content_type::equality_deletes) {
deletes.push_back(
iceberg::file_to_delete{
.file = std::move(f.file),
.schema_id = f.schema_id,
.partition_spec_id = f.partition_spec_id,
});
} else {
all_data.push_back(std::move(f));
}
}
return merge_delta_result{
.data_files = std::move(all_data),
.delete_files = std::move(deletes),
};
}

table_commit_builder(
const model::cluster_uuid& cluster,
iceberg::table_identifier table_id,
@@ -409,6 +484,8 @@

// State accumulated.
chunked_vector<iceberg::file_to_append> icb_files_;
chunked_vector<iceberg::file_to_append> upsert_files_;
std::optional<chunked_vector<iceberg::nested_field::id_t>> key_field_ids_;
std::optional<model::offset> new_committed_offset_;
};

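The merge_append / row_delta split above mirrors Iceberg's two commit shapes: a plain append adds only data files, while a row delta commits data files and equality-delete files in a single snapshot. Semantically, an equality-delete row removes any data row from an earlier sequence number that matches it on the columns named by equality_ids. A conceptual sketch of that matching rule, with hypothetical types standing in for real Iceberg rows:

```cpp
#include <algorithm>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// One row, keyed by Iceberg field id; values stringified for illustration.
using row = std::map<int32_t, std::string>;

// True if `data` agrees with `del` on every equality-id column, i.e. the
// data row is logically deleted (sequence-number ordering elided for brevity).
bool equality_delete_matches(
  const std::vector<int32_t>& equality_ids, const row& del, const row& data) {
    return std::all_of(
      equality_ids.begin(), equality_ids.end(), [&](int32_t id) {
          auto d = del.find(id);
          auto r = data.find(id);
          return d != del.end() && r != data.end() && d->second == r->second;
      });
}
```

This is why the committer records delete_key_field_ids per file and threads them into iceberg::data_file::equality_ids before building the row delta.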