From 9552c117334f67a7685d439b5f2a08963a80157b Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Tue, 14 Apr 2026 23:29:59 -0700 Subject: [PATCH] datalake: compile-time schema descriptors for table_definition Replace the hand-rolled schemaless_struct_type() with a compile-time schema descriptor system. The schema is defined as a type: ``` using schemaless_desc = struct_desc< field_desc<"redpanda", rp_desc>>; ``` From this single definition: - schemaless_struct_type() generates the runtime iceberg::struct_type - index_of<"field_name"> provides compile-time field indices - type_field/value_field accessors replace brittle fields[0] - rp_struct_type()/rp_struct_value() provide named access So code like this: ``` std::unique_ptr build_rp_struct( model::partition_id pid, kafka::offset o, std::optional key, model::timestamp ts, model::timestamp_type ts_t, const chunked_vector& headers) { auto system_data = std::make_unique(); system_data->fields.reserve(6); system_data->fields.emplace_back(iceberg::int_value(pid)); system_data->fields.emplace_back(iceberg::long_value(o)); // NOTE: Kafka uses milliseconds, Iceberg uses microseconds. system_data->fields.emplace_back( iceberg::timestamptz_value(ts.value() * 1000)); ... brittle, carefully ordered code ... ``` ...becomes: ``` std::unique_ptr build_rp_struct( model::partition_id pid, kafka::offset o, std::optional key, model::timestamp ts, model::timestamp_type ts_t, const chunked_vector& headers) { return rp_desc::build_value( val<"partition">(iceberg::int_value(pid)), val<"offset">(iceberg::long_value(o)), val<"timestamp">(iceberg::timestamptz_value(ts.value() * 1000)), ``` and the definitions of compile-time schemas becomes more crisply defined as well. --- src/v/datalake/BUILD | 5 + src/v/datalake/record_translator.cc | 68 ++-------- src/v/datalake/schema_descriptor.h | 201 ++++++++++++++++++++++++++++ src/v/datalake/table_definition.cc | 91 +++++++------ src/v/datalake/table_definition.h | 62 ++++++++- 5 files changed, 324 insertions(+), 103 deletions(-) create mode 100644 src/v/datalake/schema_descriptor.h diff --git a/src/v/datalake/BUILD b/src/v/datalake/BUILD index 2a779af61b855..273e71b148240 100644 --- a/src/v/datalake/BUILD +++ b/src/v/datalake/BUILD @@ -238,11 +238,16 @@ redpanda_cc_library( "table_definition.cc", ], hdrs = [ + "schema_descriptor.h", "table_definition.h", ], visibility = [":__subpackages__"], deps = [ + "//src/v/iceberg:datatypes", "//src/v/iceberg:schema", + "//src/v/iceberg:values", + "//src/v/model", + "@seastar", ], ) diff --git a/src/v/datalake/record_translator.cc b/src/v/datalake/record_translator.cc index f090c2e8fdf4f..d89adb6d1edda 100644 --- a/src/v/datalake/record_translator.cc +++ b/src/v/datalake/record_translator.cc @@ -71,56 +71,6 @@ std::optional get_redpanda_idx(const iceberg::struct_type& val_type) { return std::nullopt; } -// Builds a struct value meant to be used as the base of the "redpanda" struct. -// Additional fields specific to the mode (e.g. "value" for key-value mode) may -// be appended to the end. -std::unique_ptr build_rp_struct( - model::partition_id pid, - kafka::offset o, - std::optional key, - model::timestamp ts, - model::timestamp_type ts_t, - const chunked_vector& headers) { - auto system_data = std::make_unique(); - system_data->fields.reserve(6); - - system_data->fields.emplace_back(iceberg::int_value(pid)); - system_data->fields.emplace_back(iceberg::long_value(o)); - // NOTE: Kafka uses milliseconds, Iceberg uses microseconds. - system_data->fields.emplace_back( - iceberg::timestamptz_value(ts.value() * 1000)); - - if (headers.empty()) { - system_data->fields.emplace_back(std::nullopt); - } else { - auto headers_list = std::make_unique(); - for (const auto& hdr : headers) { - auto header_kv_struct = std::make_unique(); - header_kv_struct->fields.emplace_back( - hdr.key_size() >= 0 ? std::make_optional( - iceberg::string_value(hdr.key().copy())) - : std::nullopt); - header_kv_struct->fields.emplace_back( - hdr.value_size() >= 0 - ? std::make_optional( - iceberg::binary_value(hdr.value().copy())) - : std::nullopt); - headers_list->elements.emplace_back(std::move(header_kv_struct)); - } - system_data->fields.emplace_back(std::move(headers_list)); - } - - system_data->fields.emplace_back( - key ? std::make_optional( - iceberg::binary_value(std::move(*key))) - : std::nullopt); - - system_data->fields.emplace_back( - iceberg::int_value{static_cast(ts_t)}); - - return system_data; -} - } // namespace std::ostream& operator<<(std::ostream& o, const record_translator::errc& e) { @@ -177,7 +127,10 @@ key_value_translator::build_type(std::optional) { auto ret_type = schemaless_struct_type(); ret_type.fields.emplace_back( iceberg::nested_field::create( - 11, "value", iceberg::field_required::no, iceberg::binary_type{})); + schemaless_next_field_id, + "value", + iceberg::field_required::no, + iceberg::binary_type{})); return record_type{ .comps = record_schema_components{ .key_identifier = std::nullopt, @@ -251,12 +204,14 @@ record_type structured_data_translator::build_type( if (field->name == rp_struct_name) { // To avoid collisions, move user fields named "redpanda" into // the nested "redpanda" system field. - auto& system_fields = std::get( - ret_type.fields[0]->type); + auto& system_fields = rp_struct_type(ret_type); // Use the next id of the system defaults. system_fields.fields.emplace_back( iceberg::nested_field::create( - 10, "data", field->required, std::move(field->type))); + schemaless_next_field_id, + "data", + field->required, + std::move(field->type))); continue; } // Add the extra user-defined fields. @@ -320,10 +275,7 @@ structured_data_translator::translate_data( if (redpanda_field_idx == i) { // To avoid collisions, move user fields named "redpanda" into // the nested "redpanda" system field. - auto& system_vals - = std::get>( - ret_data.fields[0].value()); - system_vals->fields.emplace_back(std::move(field)); + rp_struct_value(ret_data).fields.emplace_back(std::move(field)); continue; } ret_data.fields.emplace_back(std::move(field)); diff --git a/src/v/datalake/schema_descriptor.h b/src/v/datalake/schema_descriptor.h new file mode 100644 index 0000000000000..886927b4b9d68 --- /dev/null +++ b/src/v/datalake/schema_descriptor.h @@ -0,0 +1,201 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include "iceberg/datatypes.h" +#include "iceberg/values.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace datalake { + +/// A compile-time string usable as a template parameter (C++20 NTTP). +template +struct ct_string { + char data[N]{}; + constexpr ct_string(const char (&s)[N]) { std::copy_n(s, N, data); } + constexpr operator std::string_view() const { return {data, N - 1}; } + constexpr auto operator<=>(const ct_string&) const = default; +}; + +/// A field descriptor: compile-time name + iceberg type. +template +struct field_desc { + static constexpr std::string_view name{Name}; + using type = T; +}; + +/// Forward declarations. +template +struct struct_desc; + +template +struct list_desc; + +/// A nested schema descriptor exposes build_impl(int&) to contribute its +/// subtree to the enclosing iceberg::struct_type. Leaf types (e.g. +/// iceberg::int_type) do not satisfy this and are default-constructed +/// as field types instead. +template +concept nested_schema_desc = requires(int& next_id) { + { T::build_impl(next_id) }; +}; + +/// Find the index of a field by name. Fails to compile if not found. +template +consteval size_t index_of_fn() { + constexpr std::array names = {Fields::name...}; + for (size_t i = 0; i < names.size(); ++i) { + if (names[i] == std::string_view{Name}) { + return i; + } + } + // If we reach here, the name was not found. This is consteval, + // so the compiler will reject it with an error pointing here. + throw "field name not found in descriptor"; +} + +/// Check that given names match field names in order. +/// Uses an array of field names passed directly to avoid partial +/// specialization. +template +consteval bool +names_match_fn(std::array field_names) { + constexpr std::array given = {std::string_view{Names}...}; + for (size_t i = 0; i < field_names.size(); ++i) { + if (field_names[i] != given[i]) { + return false; + } + } + return true; +} + +/// A named value for use with struct_desc::build_value. The name is +/// checked at compile time to match the corresponding field descriptor. +template +struct val { + std::optional v; + + template + requires std::constructible_from, U> + val(U&& u) // NOLINT(google-explicit-constructor) + : v(std::forward(u)) {} +}; + +template +struct struct_desc { + static constexpr size_t count = sizeof...(Fields); + + /// Compile-time index of a field by name. + template + static constexpr size_t index_of = index_of_fn(); + + /// Total number of fields in the tree (used for ID assignment). + static int total_fields() { + int next_id = 1; + build_impl(next_id); + return next_id; + } + + /// Build the runtime iceberg::struct_type. + static iceberg::struct_type build() { + int next_id = 1; + return build_impl(next_id); + } + + static iceberg::struct_type build_impl(int& next_id) { + iceberg::struct_type st; + auto add = [&]() { + int my_id = next_id++; + st.fields.push_back( + iceberg::nested_field::create( + my_id, + ss::sstring{F::name}, + iceberg::field_required::no, + build_iceberg_type(next_id))); + }; + (add.template operator()(), ...); + return st; + } + + /// Build a struct_value with compile-time name and arity checking. + /// Each argument is a val<"field_name"> matching the descriptor. + /// + /// Wrong arity, wrong name, or wrong order = compile error. + template + static std::unique_ptr + build_value(val... args) { + static_assert(sizeof...(Names) == count, "wrong number of fields"); + static_assert( + names_match_fn({Fields::name...}), + "field names don't match descriptor or are in wrong order"); + auto sv = std::make_unique(); + sv->fields.reserve(count); + (sv->fields.emplace_back(std::move(args.v)), ...); + return sv; + } + +private: + template + static auto build_iceberg_type(int& next_id) { + if constexpr (nested_schema_desc) { + return F::type::build_impl(next_id); + } else { + return typename F::type{}; + } + } +}; + +template +struct list_desc; + +template +struct list_desc> { + static iceberg::list_type build_impl(int& next_id) { + int element_id = next_id++; + return iceberg::list_type::create( + element_id, + iceberg::field_required::no, + struct_desc::build_impl(next_id)); + } +}; + +template +iceberg::nested_field& type_field(iceberg::struct_type& st) { + return *st.fields[Desc::template index_of]; +} + +template +const iceberg::nested_field& type_field(const iceberg::struct_type& st) { + return *st.fields[Desc::template index_of]; +} + +template +std::optional& value_field(iceberg::struct_value& sv) { + return sv.fields[Desc::template index_of]; +} + +template +const std::optional& +value_field(const iceberg::struct_value& sv) { + return sv.fields[Desc::template index_of]; +} + +} // namespace datalake diff --git a/src/v/datalake/table_definition.cc b/src/v/datalake/table_definition.cc index 63aac59c9d282..9d175367b7621 100644 --- a/src/v/datalake/table_definition.cc +++ b/src/v/datalake/table_definition.cc @@ -10,52 +10,61 @@ #include "datalake/table_definition.h" namespace datalake { -using namespace iceberg; -struct_type schemaless_struct_type() { - using namespace iceberg; - struct_type system_fields; - system_fields.fields.emplace_back( - nested_field::create(2, "partition", field_required::no, int_type{})); - system_fields.fields.emplace_back( - nested_field::create(3, "offset", field_required::no, long_type{})); - system_fields.fields.emplace_back( - nested_field::create( - 4, "timestamp", field_required::no, timestamptz_type{})); - - struct_type headers_kv; - headers_kv.fields.emplace_back( - nested_field::create(7, "key", field_required::no, string_type{})); - headers_kv.fields.emplace_back( - nested_field::create(8, "value", field_required::no, binary_type{})); - system_fields.fields.emplace_back( - nested_field::create( - 5, - "headers", - field_required::no, - list_type::create(6, field_required::no, std::move(headers_kv)))); - - system_fields.fields.emplace_back( - nested_field::create(9, "key", field_required::no, binary_type{})); - system_fields.fields.emplace_back( - nested_field::create( - 10, "timestamp_type", field_required::no, int_type{})); - struct_type res; - res.fields.emplace_back( - nested_field::create( - 1, - ss::sstring{rp_struct_name}, - field_required::no, - std::move(system_fields))); - - return res; + +iceberg::struct_type schemaless_struct_type() { + return schemaless_desc::build(); } -schema default_schema() { - return { +iceberg::schema default_schema() { + return iceberg::schema{ .schema_struct = schemaless_struct_type(), - .schema_id = iceberg::schema::id_t{0}, + .schema_id = iceberg::schema::default_id, .identifier_field_ids = {}, }; } +namespace { + +std::optional +build_headers_value(const chunked_vector& headers) { + if (headers.empty()) { + return std::nullopt; + } + auto hdr_list = std::make_unique(); + for (const auto& hdr : headers) { + auto kv = header_kv_desc::build_value( + val<"key">( + hdr.key_size() >= 0 ? std::make_optional( + iceberg::string_value(hdr.key().copy())) + : std::nullopt), + val<"value">( + hdr.value_size() >= 0 ? std::make_optional( + iceberg::binary_value(hdr.value().copy())) + : std::nullopt)); + hdr_list->elements.emplace_back(std::move(kv)); + } + return hdr_list; +} + +} // namespace + +std::unique_ptr build_rp_struct( + model::partition_id pid, + kafka::offset o, + std::optional key, + model::timestamp ts, + model::timestamp_type ts_t, + const chunked_vector& headers) { + return rp_desc::build_value( + val<"partition">(iceberg::int_value(pid)), + val<"offset">(iceberg::long_value(o)), + val<"timestamp">(iceberg::timestamptz_value(ts.value() * 1000)), + val<"headers">(build_headers_value(headers)), + val<"key">( + key ? std::make_optional( + iceberg::binary_value(std::move(*key))) + : std::nullopt), + val<"timestamp_type">(iceberg::int_value{static_cast(ts_t)})); +} + } // namespace datalake diff --git a/src/v/datalake/table_definition.h b/src/v/datalake/table_definition.h index 6eb8f70791f01..769debbc748d7 100644 --- a/src/v/datalake/table_definition.h +++ b/src/v/datalake/table_definition.h @@ -9,16 +9,70 @@ */ #pragma once +#include "datalake/schema_descriptor.h" #include "iceberg/schema.h" +#include "model/fundamental.h" +#include "model/record.h" +#include "model/timestamp.h" namespace datalake { -// Definitions for default table metadata. +// The schema is defined once here as a type. All field ordering, +// naming, and type information is derived from this single source. +// Adding, removing, or reordering fields here automatically updates: +// - schemaless_struct_type() (runtime struct_type) +// - build_rp_struct() (runtime struct_value) +// - All typed field accessors -// Contains some minimal fields used for all tables, even those with no schemas. -// TODO: rename to redpanda_fields_struct_type? +/// Header key/value struct inside the headers list. +using header_kv_desc = struct_desc< + field_desc<"key", iceberg::string_type>, + field_desc<"value", iceberg::binary_type>>; + +/// The redpanda system struct. +using rp_desc = struct_desc< + field_desc<"partition", iceberg::int_type>, + field_desc<"offset", iceberg::long_type>, + field_desc<"timestamp", iceberg::timestamptz_type>, + field_desc<"headers", list_desc>, + field_desc<"key", iceberg::binary_type>, + field_desc<"timestamp_type", iceberg::int_type>>; + +/// The top-level schemaless table struct. +using schemaless_desc = struct_desc>; + +inline constexpr std::string_view rp_struct_name = "redpanda"; + +/// Next available pre-assignment field ID after the schemaless struct. +/// Translators that add fields should start IDs from here. +inline const int schemaless_next_field_id = schemaless_desc::total_fields(); + +/// Build the runtime struct_type from the compile-time descriptor. iceberg::struct_type schemaless_struct_type(); + +/// Build the default iceberg schema. iceberg::schema default_schema(); -inline constexpr std::string_view rp_struct_name = "redpanda"; + +/// Build the redpanda system struct_value. Single definition used +/// by all translators. +std::unique_ptr build_rp_struct( + model::partition_id pid, + kafka::offset o, + std::optional key, + model::timestamp ts, + model::timestamp_type ts_t, + const chunked_vector& headers); + +/// Get the redpanda struct_type from a schemaless struct_type. +inline iceberg::struct_type& rp_struct_type(iceberg::struct_type& schemaless) { + return std::get( + type_field(schemaless).type); +} + +/// Get the redpanda struct_value from a data row. +inline iceberg::struct_value& rp_struct_value(iceberg::struct_value& row) { + return *std::get>( + value_field(row).value()); +} } // namespace datalake