Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 223 additions & 10 deletions arrow-select/src/coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@
//!
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take
use crate::filter::filter_record_batch;
use crate::filter::{
FilterBuilder, FilterPredicate, IndexIterator, IterationStrategy, SlicesIterator,
filter_record_batch,
};
use crate::take::take_record_batch;
use arrow_array::cast::AsArray;
use arrow_array::types::{BinaryViewType, StringViewType};
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, downcast_primitive};
use arrow_schema::{ArrowError, DataType, SchemaRef};
Expand Down Expand Up @@ -238,6 +242,83 @@ impl BatchCoalescer {
batch: RecordBatch,
filter: &BooleanArray,
) -> Result<(), ArrowError> {
if supports_fused_inline_binary_view_filter(&batch) {
if filter.len() > batch.num_rows() {
return Err(ArrowError::InvalidArgumentError(format!(
"Filter predicate of length {} is larger than target array of length {}",
filter.len(),
batch.num_rows()
)));
}

let mut filter_builder = FilterBuilder::new(filter);
if batch.num_columns() > 1 {
filter_builder = filter_builder.optimize();
}
let predicate = filter_builder.build();
let selected_count = predicate.count();

if selected_count == 0 {
return Ok(());
}

if selected_count == batch.num_rows() && filter.len() == batch.num_rows() {
return self.push_batch(batch);
}

if let Some(limit) = self.biggest_coalesce_batch_size {
if selected_count > limit {
let filtered_batch = predicate.filter_record_batch(&batch)?;
return self.push_batch(filtered_batch);
}
}

// For dense inline filters, the existing filter kernel remains faster.
if selected_count.saturating_mul(4) > filter.len() {
let filtered_batch = predicate.filter_record_batch(&batch)?;
return self.push_batch(filtered_batch);
}

let space_in_batch = self.target_batch_size - self.buffered_rows;
if selected_count <= space_in_batch {
let (_schema, arrays, _num_rows) = batch.into_parts();

if arrays.len() != self.in_progress_arrays.len() {
return Err(ArrowError::InvalidArgumentError(format!(
"Batch has {} columns but BatchCoalescer expects {}",
arrays.len(),
self.in_progress_arrays.len()
)));
}

self.in_progress_arrays
.iter_mut()
.zip(arrays)
.for_each(|(in_progress, array)| {
in_progress.set_source(Some(array));
});

let result = (|| {
for in_progress in self.in_progress_arrays.iter_mut() {
in_progress.copy_rows_by_filter(&predicate)?;
}

self.buffered_rows += selected_count;
if self.buffered_rows >= self.target_batch_size {
self.finish_buffered_batch()?;
}

Ok(())
})();

for in_progress in self.in_progress_arrays.iter_mut() {
in_progress.set_source(None);
}

return result;
}
}

// TODO: optimize this to avoid materializing (copying the results
// of filter to a new batch)
let filtered_batch = filter_record_batch(&batch, filter)?;
Expand Down Expand Up @@ -588,6 +669,31 @@ fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn
}
}

fn supports_fused_inline_binary_view_filter(batch: &RecordBatch) -> bool {
let mut has_inline_binary_view = false;

let supported = batch
.schema()
.fields()
.iter()
.zip(batch.columns())
.all(|(field, array)| {
if field.data_type().is_primitive() {
return true;
}

let Some(binary_view) = array.as_binary_view_opt() else {
return false;
};

let inline = binary_view.data_buffers().is_empty();
has_inline_binary_view |= inline;
inline
});

supported && has_inline_binary_view
}

/// Incrementally builds up arrays
///
/// [`GenericInProgressArray`] is the default implementation that buffers
Expand All @@ -611,6 +717,39 @@ trait InProgressArray: std::fmt::Debug + Send + Sync {
/// Return an error if the source array is not set
fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;

/// Copy rows selected by `filter` from the current source array.
fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> {
match filter.strategy() {
IterationStrategy::None => Ok(()),
IterationStrategy::All => self.copy_rows(0, filter.count()),
IterationStrategy::Slices(slices) => {
for &(start, end) in slices {
self.copy_rows(start, end - start)?;
}
Ok(())
}
IterationStrategy::SlicesIterator => {
for (start, end) in SlicesIterator::new(filter.filter_array()) {
self.copy_rows(start, end - start)?;
}
Ok(())
}
IterationStrategy::Indices(indices) => self.copy_rows_by_indices(indices),
IterationStrategy::IndexIterator => {
let indices = IndexIterator::new(filter.filter_array(), filter.count()).collect();
self.copy_rows_by_indices(&indices)
}
}
}

/// Copy rows at the specified indices from the current source array.
fn copy_rows_by_indices(&mut self, indices: &[usize]) -> Result<(), ArrowError> {
for &idx in indices {
self.copy_rows(idx, 1)?;
}
Ok(())
}

/// Finish the currently in-progress array and return it as an `ArrayRef`
fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
}
Expand Down Expand Up @@ -1197,6 +1336,78 @@ mod tests {
.run();
}

#[test]
fn test_binary_view_filtered() {
let values: Vec<Option<&[u8]>> = vec![
Some(b"foo"),
None,
Some(b"A longer string that is more than 12 bytes"),
];

let binary_view =
BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
let batch =
RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
let filter = BooleanArray::from_iter((0..1000).map(|idx| Some(idx % 2 == 0)));

Test::new("coalesce_binary_view_filtered")
.with_batch(batch.clone())
.with_filter(filter.clone())
.with_batch(batch)
.with_filter(filter)
.with_batch_size(256)
.with_expected_output_sizes(vec![256, 256, 256, 232])
.run();
}

#[test]
fn test_binary_view_filtered_inline() {
let values: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, Some(b"barbaz")];

let binary_view =
BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
let batch =
RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
let filter = BooleanArray::from_iter((0..1000).map(|idx| Some(idx % 3 != 1)));

Test::new("coalesce_binary_view_filtered_inline")
.with_batch(batch.clone())
.with_filter(filter.clone())
.with_batch(batch)
.with_filter(filter)
.with_batch_size(300)
.with_expected_output_sizes(vec![300, 300, 300, 300, 134])
.run();
}

#[test]
fn test_mixed_inline_binary_view_filtered() {
let int_values = Int32Array::from_iter((0..1000).map(Some));
let float_values = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64)));
let binary_values: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, Some(b"barbaz")];
let binary_view = BinaryViewArray::from_iter(
std::iter::repeat(binary_values.iter()).flatten().take(1000),
);

let batch = RecordBatch::try_from_iter(vec![
("i", Arc::new(int_values) as ArrayRef),
("f", Arc::new(float_values) as ArrayRef),
("b", Arc::new(binary_view) as ArrayRef),
])
.unwrap();

let filter = BooleanArray::from_iter((0..1000).map(|idx| Some(idx % 3 != 1)));

Test::new("coalesce_mixed_inline_binary_view_filtered")
.with_batch(batch.clone())
.with_filter(filter.clone())
.with_batch(batch)
.with_filter(filter)
.with_batch_size(300)
.with_expected_output_sizes(vec![300, 300, 300, 300, 134])
.run();
}

#[derive(Debug, Clone, PartialEq)]
struct ExpectedLayout {
len: usize,
Expand Down Expand Up @@ -1701,18 +1912,20 @@ mod tests {
let (schema, mut columns, row_count) = batch.into_parts();

for column in columns.iter_mut() {
let Some(string_view) = column.as_string_view_opt() else {
if let Some(string_view) = column.as_string_view_opt() {
// Re-create the StringViewArray to ensure memory layout is
// consistent
let mut builder = StringViewBuilder::new();
for s in string_view.iter() {
builder.append_option(s);
}
*column = Arc::new(builder.finish());
continue;
};
}

// Re-create the StringViewArray to ensure memory layout is
// consistent
let mut builder = StringViewBuilder::new();
for s in string_view.iter() {
builder.append_option(s);
if let Some(binary_view) = column.as_binary_view_opt() {
*column = Arc::new(BinaryViewArray::from_iter(binary_view.iter()));
}
// Update the column with the new StringViewArray
*column = Arc::new(builder.finish());
}

let options = RecordBatchOptions::new().with_row_count(Some(row_count));
Expand Down
Loading
Loading