Skip to content

Commit 31bed1f

Browse files
authored
Merge pull request #30170 from andrwng/compile-time-iceberg-structs
datalake: compile-time schema descriptors for table_definition
2 parents d79feda + 9552c11 commit 31bed1f

5 files changed

Lines changed: 324 additions & 103 deletions

File tree

src/v/datalake/BUILD

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,11 +238,16 @@ redpanda_cc_library(
238238
"table_definition.cc",
239239
],
240240
hdrs = [
241+
"schema_descriptor.h",
241242
"table_definition.h",
242243
],
243244
visibility = [":__subpackages__"],
244245
deps = [
246+
"//src/v/iceberg:datatypes",
245247
"//src/v/iceberg:schema",
248+
"//src/v/iceberg:values",
249+
"//src/v/model",
250+
"@seastar",
246251
],
247252
)
248253

src/v/datalake/record_translator.cc

Lines changed: 10 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -67,56 +67,6 @@ std::optional<size_t> get_redpanda_idx(const iceberg::struct_type& val_type) {
6767
return std::nullopt;
6868
}
6969

70-
// Builds a struct value meant to be used as the base of the "redpanda" struct.
71-
// Additional fields specific to the mode (e.g. "value" for key-value mode) may
72-
// be appended to the end.
73-
std::unique_ptr<iceberg::struct_value> build_rp_struct(
74-
model::partition_id pid,
75-
kafka::offset o,
76-
std::optional<iobuf> key,
77-
model::timestamp ts,
78-
model::timestamp_type ts_t,
79-
const chunked_vector<model::record_header>& headers) {
80-
auto system_data = std::make_unique<iceberg::struct_value>();
81-
system_data->fields.reserve(6);
82-
83-
system_data->fields.emplace_back(iceberg::int_value(pid));
84-
system_data->fields.emplace_back(iceberg::long_value(o));
85-
// NOTE: Kafka uses milliseconds, Iceberg uses microseconds.
86-
system_data->fields.emplace_back(
87-
iceberg::timestamptz_value(ts.value() * 1000));
88-
89-
if (headers.empty()) {
90-
system_data->fields.emplace_back(std::nullopt);
91-
} else {
92-
auto headers_list = std::make_unique<iceberg::list_value>();
93-
for (const auto& hdr : headers) {
94-
auto header_kv_struct = std::make_unique<iceberg::struct_value>();
95-
header_kv_struct->fields.emplace_back(
96-
hdr.key_size() >= 0 ? std::make_optional<iceberg::value>(
97-
iceberg::string_value(hdr.key().copy()))
98-
: std::nullopt);
99-
header_kv_struct->fields.emplace_back(
100-
hdr.value_size() >= 0
101-
? std::make_optional<iceberg::value>(
102-
iceberg::binary_value(hdr.value().copy()))
103-
: std::nullopt);
104-
headers_list->elements.emplace_back(std::move(header_kv_struct));
105-
}
106-
system_data->fields.emplace_back(std::move(headers_list));
107-
}
108-
109-
system_data->fields.emplace_back(
110-
key ? std::make_optional<iceberg::value>(
111-
iceberg::binary_value(std::move(*key)))
112-
: std::nullopt);
113-
114-
system_data->fields.emplace_back(
115-
iceberg::int_value{static_cast<int32_t>(ts_t)});
116-
117-
return system_data;
118-
}
119-
12070
} // namespace
12171

12272
record_type
@@ -164,7 +114,10 @@ key_value_translator::build_type(std::optional<shared_resolved_type_t>) {
164114
auto ret_type = schemaless_struct_type();
165115
ret_type.fields.emplace_back(
166116
iceberg::nested_field::create(
167-
11, "value", iceberg::field_required::no, iceberg::binary_type{}));
117+
schemaless_next_field_id,
118+
"value",
119+
iceberg::field_required::no,
120+
iceberg::binary_type{}));
168121
return record_type{
169122
.comps = record_schema_components{
170123
.key_identifier = std::nullopt,
@@ -238,12 +191,14 @@ record_type structured_data_translator::build_type(
238191
if (field->name == rp_struct_name) {
239192
// To avoid collisions, move user fields named "redpanda" into
240193
// the nested "redpanda" system field.
241-
auto& system_fields = std::get<iceberg::struct_type>(
242-
ret_type.fields[0]->type);
194+
auto& system_fields = rp_struct_type(ret_type);
243195
// Use the next id of the system defaults.
244196
system_fields.fields.emplace_back(
245197
iceberg::nested_field::create(
246-
10, "data", field->required, std::move(field->type)));
198+
schemaless_next_field_id,
199+
"data",
200+
field->required,
201+
std::move(field->type)));
247202
continue;
248203
}
249204
// Add the extra user-defined fields.
@@ -307,10 +262,7 @@ structured_data_translator::translate_data(
307262
if (redpanda_field_idx == i) {
308263
// To avoid collisions, move user fields named "redpanda" into
309264
// the nested "redpanda" system field.
310-
auto& system_vals
311-
= std::get<std::unique_ptr<iceberg::struct_value>>(
312-
ret_data.fields[0].value());
313-
system_vals->fields.emplace_back(std::move(field));
265+
rp_struct_value(ret_data).fields.emplace_back(std::move(field));
314266
continue;
315267
}
316268
ret_data.fields.emplace_back(std::move(field));

src/v/datalake/schema_descriptor.h

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/*
2+
* Copyright 2026 Redpanda Data, Inc.
3+
*
4+
* Licensed as a Redpanda Enterprise file under the Redpanda Community
5+
* License (the "License"); you may not use this file except in compliance with
6+
* the License. You may obtain a copy of the License at
7+
*
8+
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
9+
*/
10+
#pragma once
11+
12+
#include "iceberg/datatypes.h"
13+
#include "iceberg/values.h"
14+
15+
#include <seastar/core/sstring.hh>
16+
17+
#include <algorithm>
18+
#include <array>
19+
#include <compare>
20+
#include <concepts>
21+
#include <cstddef>
22+
#include <optional>
23+
#include <string_view>
24+
#include <type_traits>
25+
#include <utility>
26+
27+
namespace datalake {
28+
29+
/// A compile-time string usable as a template parameter (C++20 NTTP).
30+
template<size_t N>
31+
struct ct_string {
32+
char data[N]{};
33+
constexpr ct_string(const char (&s)[N]) { std::copy_n(s, N, data); }
34+
constexpr operator std::string_view() const { return {data, N - 1}; }
35+
constexpr auto operator<=>(const ct_string&) const = default;
36+
};
37+
38+
/// A field descriptor: compile-time name + iceberg type.
39+
template<ct_string Name, typename T>
40+
struct field_desc {
41+
static constexpr std::string_view name{Name};
42+
using type = T;
43+
};
44+
45+
/// Forward declarations.
46+
template<typename... Fields>
47+
struct struct_desc;
48+
49+
template<typename ElementDesc>
50+
struct list_desc;
51+
52+
/// A nested schema descriptor exposes build_impl(int&) to contribute its
53+
/// subtree to the enclosing iceberg::struct_type. Leaf types (e.g.
54+
/// iceberg::int_type) do not satisfy this and are default-constructed
55+
/// as field types instead.
56+
template<typename T>
57+
concept nested_schema_desc = requires(int& next_id) {
58+
{ T::build_impl(next_id) };
59+
};
60+
61+
/// Find the index of a field by name. Fails to compile if not found.
62+
template<ct_string Name, typename... Fields>
63+
consteval size_t index_of_fn() {
64+
constexpr std::array names = {Fields::name...};
65+
for (size_t i = 0; i < names.size(); ++i) {
66+
if (names[i] == std::string_view{Name}) {
67+
return i;
68+
}
69+
}
70+
// If we reach here, the name was not found. This is consteval,
71+
// so the compiler will reject it with an error pointing here.
72+
throw "field name not found in descriptor";
73+
}
74+
75+
/// Check that given names match field names in order.
76+
/// Uses an array of field names passed directly to avoid partial
77+
/// specialization.
78+
template<ct_string... Names>
79+
consteval bool
80+
names_match_fn(std::array<std::string_view, sizeof...(Names)> field_names) {
81+
constexpr std::array given = {std::string_view{Names}...};
82+
for (size_t i = 0; i < field_names.size(); ++i) {
83+
if (field_names[i] != given[i]) {
84+
return false;
85+
}
86+
}
87+
return true;
88+
}
89+
90+
/// A named value for use with struct_desc::build_value. The name is
91+
/// checked at compile time to match the corresponding field descriptor.
92+
template<ct_string Name>
93+
struct val {
94+
std::optional<iceberg::value> v;
95+
96+
template<typename U>
97+
requires std::constructible_from<std::optional<iceberg::value>, U>
98+
val(U&& u) // NOLINT(google-explicit-constructor)
99+
: v(std::forward<U>(u)) {}
100+
};
101+
102+
template<typename... Fields>
103+
struct struct_desc {
104+
static constexpr size_t count = sizeof...(Fields);
105+
106+
/// Compile-time index of a field by name.
107+
template<ct_string Name>
108+
static constexpr size_t index_of = index_of_fn<Name, Fields...>();
109+
110+
/// Total number of fields in the tree (used for ID assignment).
111+
static int total_fields() {
112+
int next_id = 1;
113+
build_impl(next_id);
114+
return next_id;
115+
}
116+
117+
/// Build the runtime iceberg::struct_type.
118+
static iceberg::struct_type build() {
119+
int next_id = 1;
120+
return build_impl(next_id);
121+
}
122+
123+
static iceberg::struct_type build_impl(int& next_id) {
124+
iceberg::struct_type st;
125+
auto add = [&]<typename F>() {
126+
int my_id = next_id++;
127+
st.fields.push_back(
128+
iceberg::nested_field::create(
129+
my_id,
130+
ss::sstring{F::name},
131+
iceberg::field_required::no,
132+
build_iceberg_type<F>(next_id)));
133+
};
134+
(add.template operator()<Fields>(), ...);
135+
return st;
136+
}
137+
138+
/// Build a struct_value with compile-time name and arity checking.
139+
/// Each argument is a val<"field_name"> matching the descriptor.
140+
///
141+
/// Wrong arity, wrong name, or wrong order = compile error.
142+
template<ct_string... Names>
143+
static std::unique_ptr<iceberg::struct_value>
144+
build_value(val<Names>... args) {
145+
static_assert(sizeof...(Names) == count, "wrong number of fields");
146+
static_assert(
147+
names_match_fn<Names...>({Fields::name...}),
148+
"field names don't match descriptor or are in wrong order");
149+
auto sv = std::make_unique<iceberg::struct_value>();
150+
sv->fields.reserve(count);
151+
(sv->fields.emplace_back(std::move(args.v)), ...);
152+
return sv;
153+
}
154+
155+
private:
156+
template<typename F>
157+
static auto build_iceberg_type(int& next_id) {
158+
if constexpr (nested_schema_desc<typename F::type>) {
159+
return F::type::build_impl(next_id);
160+
} else {
161+
return typename F::type{};
162+
}
163+
}
164+
};
165+
166+
template<typename ElementDesc>
167+
struct list_desc;
168+
169+
template<typename... Fields>
170+
struct list_desc<struct_desc<Fields...>> {
171+
static iceberg::list_type build_impl(int& next_id) {
172+
int element_id = next_id++;
173+
return iceberg::list_type::create(
174+
element_id,
175+
iceberg::field_required::no,
176+
struct_desc<Fields...>::build_impl(next_id));
177+
}
178+
};
179+
180+
template<typename Desc, ct_string Name>
181+
iceberg::nested_field& type_field(iceberg::struct_type& st) {
182+
return *st.fields[Desc::template index_of<Name>];
183+
}
184+
185+
template<typename Desc, ct_string Name>
186+
const iceberg::nested_field& type_field(const iceberg::struct_type& st) {
187+
return *st.fields[Desc::template index_of<Name>];
188+
}
189+
190+
template<typename Desc, ct_string Name>
191+
std::optional<iceberg::value>& value_field(iceberg::struct_value& sv) {
192+
return sv.fields[Desc::template index_of<Name>];
193+
}
194+
195+
template<typename Desc, ct_string Name>
196+
const std::optional<iceberg::value>&
197+
value_field(const iceberg::struct_value& sv) {
198+
return sv.fields[Desc::template index_of<Name>];
199+
}
200+
201+
} // namespace datalake

0 commit comments

Comments
 (0)