Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/v/cloud_topics/level_one/compaction/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ compaction_filter::compaction_filter(
ss::future<> compaction_filter::maybe_index_offset_delta(
const model::record_batch& b,
const model::record& r,
std::vector<int32_t>& offset_deltas) const {
chunked_vector<int32_t>& offset_deltas) const {
if (co_await compaction::is_latest_record_for_key(_map, b, r)) {
offset_deltas.push_back(r.offset_delta());
}
}

ss::future<std::vector<int32_t>>
ss::future<chunked_vector<int32_t>>
compaction_filter::compute_offset_deltas_to_keep(
const model::record_batch& b) const {
std::vector<int32_t> offset_deltas;
chunked_vector<int32_t> offset_deltas;
offset_deltas.reserve(b.record_count());

co_await b.for_each_record_async(
Expand All @@ -53,7 +53,7 @@ compaction_filter::compute_offset_deltas_to_keep(

ss::future<std::optional<model::record_batch>>
compaction_filter::filter_batch_with_offset_deltas(
model::record_batch b, std::vector<int32_t> offset_deltas) const {
model::record_batch b, chunked_vector<int32_t> offset_deltas) const {
co_return co_await do_filter_batch(std::move(b), std::move(offset_deltas));
}

Expand Down
6 changes: 3 additions & 3 deletions src/v/cloud_topics/level_one/compaction/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ class compaction_filter : public compaction::filter {
ss::future<> maybe_index_offset_delta(
const model::record_batch&,
const model::record&,
std::vector<int32_t>&) const;
chunked_vector<int32_t>&) const;

ss::future<std::vector<int32_t>>
ss::future<chunked_vector<int32_t>>
compute_offset_deltas_to_keep(const model::record_batch&) const final;

ss::future<std::optional<model::record_batch>>
filter_batch_with_offset_deltas(
model::record_batch, std::vector<int32_t>) const final;
model::record_batch, chunked_vector<int32_t>) const final;

private:
const compaction::key_offset_map& _map;
Expand Down
1 change: 1 addition & 0 deletions src/v/compaction/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ redpanda_cc_library(
":reducer",
":types",
"//src/v/bytes",
"//src/v/container:chunked_vector",
"//src/v/model",
"@seastar",
],
Expand Down
7 changes: 4 additions & 3 deletions src/v/compaction/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "filter.h"

#include "compaction/utils.h"
#include "container/chunked_vector.h"
#include "model/batch_compression.h"
#include "model/record.h"

Expand Down Expand Up @@ -37,16 +38,16 @@ filter::filter_batch(model::record_batch b) const {
}

// compute which records to keep
std::vector<int32_t> offset_deltas = co_await compute_offset_deltas_to_keep(
b);
chunked_vector<int32_t> offset_deltas
= co_await compute_offset_deltas_to_keep(b);

auto ret = co_await filter_batch_with_offset_deltas(
std::move(b), std::move(offset_deltas));
co_return ret;
}

ss::future<std::optional<model::record_batch>> filter::do_filter_batch(
model::record_batch b, std::vector<int32_t> offset_deltas) const {
model::record_batch b, chunked_vector<int32_t> offset_deltas) const {
// no records to keep
if (offset_deltas.empty()) {
co_return std::nullopt;
Expand Down
6 changes: 3 additions & 3 deletions src/v/compaction/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ class filter {
// Creates a new batch based on the provided batch and offset_deltas
// indicated.
ss::future<std::optional<model::record_batch>> do_filter_batch(
model::record_batch b, std::vector<int32_t> offset_deltas) const;
model::record_batch b, chunked_vector<int32_t> offset_deltas) const;

private:
// For a given batch, this function should return a vector containing offset
// deltas from records in the batch which we intend on keeping when
// performing record batch filtering.
virtual ss::future<std::vector<int32_t>>
virtual ss::future<chunked_vector<int32_t>>
compute_offset_deltas_to_keep(const model::record_batch& b) const = 0;

// For most implementations, this should serve as a pass through function to
Expand All @@ -60,7 +60,7 @@ class filter {
// placeholder batch if `offset_deltas` is empty.
virtual ss::future<std::optional<model::record_batch>>
filter_batch_with_offset_deltas(
model::record_batch b, std::vector<int32_t> offset_deltas) const
model::record_batch b, chunked_vector<int32_t> offset_deltas) const
= 0;

// Computes offset deltas from the batch to keep, and then filters the
Expand Down
9 changes: 5 additions & 4 deletions src/v/compaction/tests/simple_reducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ class simple_map_filter : public filter {
ss::future<> maybe_index_offset_delta(
const model::record_batch& b,
const model::record& r,
std::vector<int32_t>& offset_deltas) const {
chunked_vector<int32_t>& offset_deltas) const {
if (co_await is_latest_record_for_key(_map, b, r)) {
offset_deltas.push_back(r.offset_delta());
}
}

ss::future<std::vector<int32_t>>
ss::future<chunked_vector<int32_t>>
compute_offset_deltas_to_keep(const model::record_batch& b) const final {
std::vector<int32_t> offset_deltas;
chunked_vector<int32_t> offset_deltas;
offset_deltas.reserve(b.record_count());

co_await b.for_each_record_async(
Expand All @@ -58,7 +58,8 @@ class simple_map_filter : public filter {

ss::future<std::optional<model::record_batch>>
filter_batch_with_offset_deltas(
model::record_batch b, std::vector<int32_t> offset_deltas) const final {
model::record_batch b,
chunked_vector<int32_t> offset_deltas) const final {
co_return co_await do_filter_batch(
std::move(b), std::move(offset_deltas));
}
Expand Down
4 changes: 2 additions & 2 deletions src/v/storage/compaction_reducers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ ss::future<> copy_data_segment_reducer::maybe_keep_offset(
const model::record_batch& batch,
const model::record& r,
bool is_last_record_in_batch,
std::vector<int32_t>& offset_deltas) {
chunked_vector<int32_t>& offset_deltas) {
if (co_await _should_keep_fn(batch, r, is_last_record_in_batch)) {
offset_deltas.push_back(r.offset_delta());
co_return;
Expand Down Expand Up @@ -166,7 +166,7 @@ copy_data_segment_reducer::filter(model::record_batch batch) {
}

// 1. compute which records to keep
std::vector<int32_t> offset_deltas;
chunked_vector<int32_t> offset_deltas;
offset_deltas.reserve(batch.record_count());

int32_t records_seen = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/v/storage/compaction_reducers.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class copy_data_segment_reducer : public compaction_reducer {
const model::record_batch&,
const model::record&,
bool,
std::vector<int32_t>&);
chunked_vector<int32_t>&);

ss::future<std::optional<model::record_batch>> filter(model::record_batch);

Expand Down
Loading