Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/v/datalake/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,16 @@ redpanda_cc_library(
"table_definition.cc",
],
hdrs = [
"schema_descriptor.h",
"table_definition.h",
],
visibility = [":__subpackages__"],
deps = [
"//src/v/iceberg:datatypes",
"//src/v/iceberg:schema",
"//src/v/iceberg:values",
"//src/v/model",
"@seastar",
],
)

Expand Down
68 changes: 10 additions & 58 deletions src/v/datalake/record_translator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,56 +71,6 @@ std::optional<size_t> get_redpanda_idx(const iceberg::struct_type& val_type) {
return std::nullopt;
}

// Builds a struct value meant to be used as the base of the "redpanda" struct.
// Additional fields specific to the mode (e.g. "value" for key-value mode) may
// be appended to the end.
std::unique_ptr<iceberg::struct_value> build_rp_struct(
model::partition_id pid,
kafka::offset o,
std::optional<iobuf> key,
model::timestamp ts,
model::timestamp_type ts_t,
const chunked_vector<model::record_header>& headers) {
auto system_data = std::make_unique<iceberg::struct_value>();
system_data->fields.reserve(6);

system_data->fields.emplace_back(iceberg::int_value(pid));
system_data->fields.emplace_back(iceberg::long_value(o));
// NOTE: Kafka uses milliseconds, Iceberg uses microseconds.
system_data->fields.emplace_back(
iceberg::timestamptz_value(ts.value() * 1000));

if (headers.empty()) {
system_data->fields.emplace_back(std::nullopt);
} else {
auto headers_list = std::make_unique<iceberg::list_value>();
for (const auto& hdr : headers) {
auto header_kv_struct = std::make_unique<iceberg::struct_value>();
header_kv_struct->fields.emplace_back(
hdr.key_size() >= 0 ? std::make_optional<iceberg::value>(
iceberg::string_value(hdr.key().copy()))
: std::nullopt);
header_kv_struct->fields.emplace_back(
hdr.value_size() >= 0
? std::make_optional<iceberg::value>(
iceberg::binary_value(hdr.value().copy()))
: std::nullopt);
headers_list->elements.emplace_back(std::move(header_kv_struct));
}
system_data->fields.emplace_back(std::move(headers_list));
}

system_data->fields.emplace_back(
key ? std::make_optional<iceberg::value>(
iceberg::binary_value(std::move(*key)))
: std::nullopt);

system_data->fields.emplace_back(
iceberg::int_value{static_cast<int32_t>(ts_t)});

return system_data;
}

} // namespace

std::ostream& operator<<(std::ostream& o, const record_translator::errc& e) {
Expand Down Expand Up @@ -177,7 +127,10 @@ key_value_translator::build_type(std::optional<shared_resolved_type_t>) {
auto ret_type = schemaless_struct_type();
ret_type.fields.emplace_back(
iceberg::nested_field::create(
11, "value", iceberg::field_required::no, iceberg::binary_type{}));
schemaless_next_field_id,
"value",
iceberg::field_required::no,
iceberg::binary_type{}));
return record_type{
.comps = record_schema_components{
.key_identifier = std::nullopt,
Expand Down Expand Up @@ -251,12 +204,14 @@ record_type structured_data_translator::build_type(
if (field->name == rp_struct_name) {
// To avoid collisions, move user fields named "redpanda" into
// the nested "redpanda" system field.
auto& system_fields = std::get<iceberg::struct_type>(
ret_type.fields[0]->type);
auto& system_fields = rp_struct_type(ret_type);
// Use the next id of the system defaults.
system_fields.fields.emplace_back(
iceberg::nested_field::create(
10, "data", field->required, std::move(field->type)));
schemaless_next_field_id,
"data",
field->required,
std::move(field->type)));
continue;
}
// Add the extra user-defined fields.
Expand Down Expand Up @@ -320,10 +275,7 @@ structured_data_translator::translate_data(
if (redpanda_field_idx == i) {
// To avoid collisions, move user fields named "redpanda" into
// the nested "redpanda" system field.
auto& system_vals
= std::get<std::unique_ptr<iceberg::struct_value>>(
ret_data.fields[0].value());
system_vals->fields.emplace_back(std::move(field));
rp_struct_value(ret_data).fields.emplace_back(std::move(field));
continue;
}
ret_data.fields.emplace_back(std::move(field));
Expand Down
201 changes: 201 additions & 0 deletions src/v/datalake/schema_descriptor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Copyright 2026 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#pragma once

#include "iceberg/datatypes.h"
#include "iceberg/values.h"

#include <seastar/core/sstring.hh>

#include <algorithm>
#include <array>
#include <compare>
#include <concepts>
#include <cstddef>
Comment thread
andrwng marked this conversation as resolved.
#include <optional>
#include <string_view>
#include <type_traits>
#include <utility>

namespace datalake {

/// A compile-time string usable as a template parameter (C++20 NTTP).
template<size_t N>
struct ct_string {
char data[N]{};
constexpr ct_string(const char (&s)[N]) { std::copy_n(s, N, data); }
constexpr operator std::string_view() const { return {data, N - 1}; }
constexpr auto operator<=>(const ct_string&) const = default;
};

/// A field descriptor: compile-time name + iceberg type.
template<ct_string Name, typename T>
struct field_desc {
static constexpr std::string_view name{Name};
using type = T;
};

/// Forward declarations.
template<typename... Fields>
struct struct_desc;

template<typename ElementDesc>
struct list_desc;

/// A nested schema descriptor exposes build_impl(int&) to contribute its
/// subtree to the enclosing iceberg::struct_type. Leaf types (e.g.
/// iceberg::int_type) do not satisfy this and are default-constructed
/// as field types instead.
template<typename T>
concept nested_schema_desc = requires(int& next_id) {
{ T::build_impl(next_id) };
};

/// Find the index of a field by name. Fails to compile if not found.
template<ct_string Name, typename... Fields>
consteval size_t index_of_fn() {
constexpr std::array names = {Fields::name...};
for (size_t i = 0; i < names.size(); ++i) {
if (names[i] == std::string_view{Name}) {
return i;
}
}
// If we reach here, the name was not found. This is consteval,
// so the compiler will reject it with an error pointing here.
throw "field name not found in descriptor";
}

/// Check that given names match field names in order.
/// Uses an array of field names passed directly to avoid partial
/// specialization.
template<ct_string... Names>
consteval bool
names_match_fn(std::array<std::string_view, sizeof...(Names)> field_names) {
constexpr std::array given = {std::string_view{Names}...};
for (size_t i = 0; i < field_names.size(); ++i) {
if (field_names[i] != given[i]) {
return false;
}
}
return true;
}

/// A named value for use with struct_desc::build_value. The name is
/// checked at compile time to match the corresponding field descriptor.
template<ct_string Name>
struct val {
std::optional<iceberg::value> v;

template<typename U>
requires std::constructible_from<std::optional<iceberg::value>, U>
val(U&& u) // NOLINT(google-explicit-constructor)
: v(std::forward<U>(u)) {}
};

template<typename... Fields>
struct struct_desc {
static constexpr size_t count = sizeof...(Fields);

/// Compile-time index of a field by name.
template<ct_string Name>
static constexpr size_t index_of = index_of_fn<Name, Fields...>();

/// Total number of fields in the tree (used for ID assignment).
static int total_fields() {
int next_id = 1;
build_impl(next_id);
return next_id;
}

/// Build the runtime iceberg::struct_type.
static iceberg::struct_type build() {
int next_id = 1;
return build_impl(next_id);
}

static iceberg::struct_type build_impl(int& next_id) {
iceberg::struct_type st;
auto add = [&]<typename F>() {
int my_id = next_id++;
st.fields.push_back(
iceberg::nested_field::create(
my_id,
ss::sstring{F::name},
iceberg::field_required::no,
build_iceberg_type<F>(next_id)));
};
(add.template operator()<Fields>(), ...);
return st;
}

/// Build a struct_value with compile-time name and arity checking.
/// Each argument is a val<"field_name"> matching the descriptor.
///
/// Wrong arity, wrong name, or wrong order = compile error.
template<ct_string... Names>
static std::unique_ptr<iceberg::struct_value>
build_value(val<Names>... args) {
static_assert(sizeof...(Names) == count, "wrong number of fields");
static_assert(
names_match_fn<Names...>({Fields::name...}),
"field names don't match descriptor or are in wrong order");
auto sv = std::make_unique<iceberg::struct_value>();
sv->fields.reserve(count);
(sv->fields.emplace_back(std::move(args.v)), ...);
return sv;
}

private:
template<typename F>
static auto build_iceberg_type(int& next_id) {
if constexpr (nested_schema_desc<typename F::type>) {
return F::type::build_impl(next_id);
} else {
return typename F::type{};
}
}
};

template<typename ElementDesc>
struct list_desc;

template<typename... Fields>
struct list_desc<struct_desc<Fields...>> {
static iceberg::list_type build_impl(int& next_id) {
int element_id = next_id++;
return iceberg::list_type::create(
element_id,
iceberg::field_required::no,
struct_desc<Fields...>::build_impl(next_id));
}
};

template<typename Desc, ct_string Name>
iceberg::nested_field& type_field(iceberg::struct_type& st) {
return *st.fields[Desc::template index_of<Name>];
}

template<typename Desc, ct_string Name>
const iceberg::nested_field& type_field(const iceberg::struct_type& st) {
return *st.fields[Desc::template index_of<Name>];
}

template<typename Desc, ct_string Name>
std::optional<iceberg::value>& value_field(iceberg::struct_value& sv) {
return sv.fields[Desc::template index_of<Name>];
}

template<typename Desc, ct_string Name>
const std::optional<iceberg::value>&
value_field(const iceberg::struct_value& sv) {
return sv.fields[Desc::template index_of<Name>];
}

} // namespace datalake
Loading
Loading