diff --git a/src/v/cloud_topics/level_one/compaction/filter.cc b/src/v/cloud_topics/level_one/compaction/filter.cc index e262a4600d3e4..0be4d171065c5 100644 --- a/src/v/cloud_topics/level_one/compaction/filter.cc +++ b/src/v/cloud_topics/level_one/compaction/filter.cc @@ -31,16 +31,16 @@ compaction_filter::compaction_filter( ss::future<> compaction_filter::maybe_index_offset_delta( const model::record_batch& b, const model::record& r, - std::vector& offset_deltas) const { + chunked_vector& offset_deltas) const { if (co_await compaction::is_latest_record_for_key(_map, b, r)) { offset_deltas.push_back(r.offset_delta()); } } -ss::future> +ss::future> compaction_filter::compute_offset_deltas_to_keep( const model::record_batch& b) const { - std::vector offset_deltas; + chunked_vector offset_deltas; offset_deltas.reserve(b.record_count()); co_await b.for_each_record_async( @@ -53,7 +53,7 @@ compaction_filter::compute_offset_deltas_to_keep( ss::future> compaction_filter::filter_batch_with_offset_deltas( - model::record_batch b, std::vector offset_deltas) const { + model::record_batch b, chunked_vector offset_deltas) const { co_return co_await do_filter_batch(std::move(b), std::move(offset_deltas)); } diff --git a/src/v/cloud_topics/level_one/compaction/filter.h b/src/v/cloud_topics/level_one/compaction/filter.h index e0d0da44f0799..0bb40b16f2d7c 100644 --- a/src/v/cloud_topics/level_one/compaction/filter.h +++ b/src/v/cloud_topics/level_one/compaction/filter.h @@ -26,14 +26,14 @@ class compaction_filter : public compaction::filter { ss::future<> maybe_index_offset_delta( const model::record_batch&, const model::record&, - std::vector&) const; + chunked_vector&) const; - ss::future> + ss::future> compute_offset_deltas_to_keep(const model::record_batch&) const final; ss::future> filter_batch_with_offset_deltas( - model::record_batch, std::vector) const final; + model::record_batch, chunked_vector) const final; private: const compaction::key_offset_map& _map; diff --git a/src/v/compaction/BUILD b/src/v/compaction/BUILD index 0543187d49c9c..f6be4e03c431b 100644 --- a/src/v/compaction/BUILD +++ b/src/v/compaction/BUILD @@ -115,6 +115,7 @@ redpanda_cc_library( ":reducer", ":types", "//src/v/bytes", + "//src/v/container:chunked_vector", "//src/v/model", "@seastar", ], diff --git a/src/v/compaction/filter.cc b/src/v/compaction/filter.cc index 008ac8182af53..d4fe4f5b7b048 100644 --- a/src/v/compaction/filter.cc +++ b/src/v/compaction/filter.cc @@ -10,6 +10,7 @@ #include "filter.h" #include "compaction/utils.h" +#include "container/chunked_vector.h" #include "model/batch_compression.h" #include "model/record.h" @@ -37,8 +38,8 @@ filter::filter_batch(model::record_batch b) const { } // compute which records to keep - std::vector offset_deltas = co_await compute_offset_deltas_to_keep( - b); + chunked_vector offset_deltas + = co_await compute_offset_deltas_to_keep(b); auto ret = co_await filter_batch_with_offset_deltas( std::move(b), std::move(offset_deltas)); @@ -46,7 +47,7 @@ filter::filter_batch(model::record_batch b) const { } ss::future> filter::do_filter_batch( - model::record_batch b, std::vector offset_deltas) const { + model::record_batch b, chunked_vector offset_deltas) const { // no records to keep if (offset_deltas.empty()) { co_return std::nullopt; diff --git a/src/v/compaction/filter.h b/src/v/compaction/filter.h index db1f4d5b3ad62..e335040d5a0ae 100644 --- a/src/v/compaction/filter.h +++ b/src/v/compaction/filter.h @@ -44,13 +44,13 @@ class filter { // Creates a new batch based on the provided batch and offset_deltas // indicated. ss::future> do_filter_batch( - model::record_batch b, std::vector offset_deltas) const; + model::record_batch b, chunked_vector offset_deltas) const; private: // For a given batch, this function should return a vector containing offset // deltas from records in the batch which we intend on keeping when // performing record batch filtering. - virtual ss::future> + virtual ss::future> compute_offset_deltas_to_keep(const model::record_batch& b) const = 0; // For most implementations, this should serve as a pass through function to @@ -60,7 +60,7 @@ class filter { // placeholder batch if `offset_deltas` is empty. virtual ss::future> filter_batch_with_offset_deltas( - model::record_batch b, std::vector offset_deltas) const + model::record_batch b, chunked_vector offset_deltas) const = 0; // Computes offset deltas from the batch to keep, and then filters the diff --git a/src/v/compaction/tests/simple_reducer.h b/src/v/compaction/tests/simple_reducer.h index 5752b5b5b4327..cb5eb10cd0e4d 100644 --- a/src/v/compaction/tests/simple_reducer.h +++ b/src/v/compaction/tests/simple_reducer.h @@ -37,15 +37,15 @@ class simple_map_filter : public filter { ss::future<> maybe_index_offset_delta( const model::record_batch& b, const model::record& r, - std::vector& offset_deltas) const { + chunked_vector& offset_deltas) const { if (co_await is_latest_record_for_key(_map, b, r)) { offset_deltas.push_back(r.offset_delta()); } } - ss::future> + ss::future> compute_offset_deltas_to_keep(const model::record_batch& b) const final { - std::vector offset_deltas; + chunked_vector offset_deltas; offset_deltas.reserve(b.record_count()); co_await b.for_each_record_async( @@ -58,7 +58,8 @@ class simple_map_filter : public filter { ss::future> filter_batch_with_offset_deltas( - model::record_batch b, std::vector offset_deltas) const final { + model::record_batch b, + chunked_vector offset_deltas) const final { co_return co_await do_filter_batch( std::move(b), std::move(offset_deltas)); } diff --git a/src/v/storage/compaction_reducers.cc b/src/v/storage/compaction_reducers.cc index ed11e5f86b745..5f1f5a8550b4b 100644 --- a/src/v/storage/compaction_reducers.cc +++ b/src/v/storage/compaction_reducers.cc @@ -125,7 +125,7 @@ ss::future<> copy_data_segment_reducer::maybe_keep_offset( const model::record_batch& batch, const model::record& r, bool is_last_record_in_batch, - std::vector& offset_deltas) { + chunked_vector& offset_deltas) { if (co_await _should_keep_fn(batch, r, is_last_record_in_batch)) { offset_deltas.push_back(r.offset_delta()); co_return; @@ -166,7 +166,7 @@ copy_data_segment_reducer::filter(model::record_batch batch) { } // 1. compute which records to keep - std::vector offset_deltas; + chunked_vector offset_deltas; offset_deltas.reserve(batch.record_count()); int32_t records_seen = 0; diff --git a/src/v/storage/compaction_reducers.h b/src/v/storage/compaction_reducers.h index 7b49fdfc1fd02..6e5f64c500dcc 100644 --- a/src/v/storage/compaction_reducers.h +++ b/src/v/storage/compaction_reducers.h @@ -168,7 +168,7 @@ class copy_data_segment_reducer : public compaction_reducer { const model::record_batch&, const model::record&, bool, - std::vector&); + chunked_vector&); ss::future> filter(model::record_batch);