Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
619f14a
Add bloom filter folding to automatically size SBBF filters
adriangb Mar 30, 2026
9f58be2
Revert default FPP change, add documentation with references
adriangb Mar 30, 2026
a262533
Fix rustdoc lint errors in bloom filter folding docs
adriangb Mar 30, 2026
9670203
fmt
adriangb Mar 30, 2026
dc00ce0
write proof
friendlymatthew Mar 30, 2026
f3dd1b8
Update parquet/src/bloom_filter/mod.rs
adriangb Mar 30, 2026
cedad54
Update parquet/src/bloom_filter/mod.rs
adriangb Mar 31, 2026
ffbb59f
Update parquet/src/bloom_filter/mod.rs
adriangb Mar 31, 2026
07c9902
address pr feedback
adriangb Mar 31, 2026
c07c7d7
tweak docs, rename variables to encapsulate domains
adriangb Mar 31, 2026
a95b638
add test for fixed size / ndv filters
adriangb Mar 31, 2026
c6dc8a2
simplify
adriangb Mar 31, 2026
d7589a8
simplify
adriangb Mar 31, 2026
a1090ed
tweak test
adriangb Mar 31, 2026
70a1521
fix bloom filter NDV test to actually test underestimate path
adriangb Apr 1, 2026
850143f
Update parquet/src/bloom_filter/mod.rs
adriangb Apr 1, 2026
d0656aa
optimize bloom filter folding performance and add benchmark
adriangb Apr 1, 2026
7e33617
further optimize bloom filter folding: zero-alloc, SIMD-friendly codegen
adriangb Apr 1, 2026
9a6d741
move cast
adriangb Apr 1, 2026
fbea1e1
lint
adriangb Apr 1, 2026
8611ba8
address review: assert even block count in estimated_fpp_after_fold, …
adriangb Apr 1, 2026
a752620
use tree reduction to determine fold count, then fold in single pass
adriangb Apr 1, 2026
b2a0e3f
refactor: split fold_to_target_fpp into num_folds_for_target_fpp + fo…
adriangb Apr 1, 2026
0f3500a
add insert-only benchmark to isolate fold overhead
adriangb Apr 1, 2026
a1713fa
use analytical FPP estimate to determine fold count, eliminating scra…
adriangb Apr 1, 2026
50eff0d
lint
adriangb Apr 1, 2026
bb6c3a9
keep BloomFilterProperties::ndv as u64 to avoid breaking API change
adriangb Apr 4, 2026
eb88b69
fmt
adriangb Apr 4, 2026
e8e8b8b
add power of 2 assertion
adriangb Apr 5, 2026
0c6b449
update docstring
adriangb Apr 5, 2026
390fbed
add more docstrings
adriangb Apr 5, 2026
07bf922
more docs
adriangb Apr 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -275,5 +275,9 @@ name = "row_selection_cursor"
harness = false
required-features = ["arrow"]

[[bench]]
name = "bloom_filter"
harness = false

[lib]
bench = false
113 changes: 113 additions & 0 deletions parquet/benches/bloom_filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
use parquet::bloom_filter::Sbbf;

/// Construct an SBBF sized for `initial_ndv` distinct values at the given
/// `fpp`, pre-populate it with `num_values` distinct integers, and return it
/// ready to be folded down.
fn build_filter(initial_ndv: u64, fpp: f64, num_values: u64) -> Sbbf {
    let mut sbbf = Sbbf::new_with_ndv_fpp(initial_ndv, fpp).unwrap();
    (0..num_values).for_each(|value| sbbf.insert(&value));
    sbbf
}

/// Benchmark folding in isolation: a filter sized for 1M NDV is pre-built
/// with varying numbers of actual distinct values, and only the
/// `fold_to_target_fpp` call is timed (the clone cost is excluded via
/// `iter_batched`). Throughput is reported in filter blocks processed.
fn bench_fold_to_target_fpp(c: &mut Criterion) {
    // Realistic scenario: filter sized for 1M NDV, varying actual distinct values
    const SIZED_FOR_NDV: u64 = 1_000_000;
    const TARGET_FPP: f64 = 0.05;

    let mut group = c.benchmark_group("fold_to_target_fpp");
    for &actual_ndv in &[1_000u64, 10_000, 100_000] {
        let sbbf = build_filter(SIZED_FOR_NDV, TARGET_FPP, actual_ndv);
        group.throughput(Throughput::Elements(sbbf.num_blocks() as u64));
        group.bench_with_input(BenchmarkId::new("ndv", actual_ndv), &sbbf, |b, sbbf| {
            b.iter_batched(
                || sbbf.clone(),
                |mut folded| {
                    folded.fold_to_target_fpp(TARGET_FPP);
                    folded
                },
                criterion::BatchSize::SmallInput,
            );
        });
    }
    group.finish();
}

/// Benchmark the full write-side path: allocate a filter sized for 1M NDV,
/// insert `n` distinct values, then fold it down to the target FPP.
/// Throughput is reported in values inserted per iteration.
fn bench_insert_and_fold(c: &mut Criterion) {
    const SIZED_FOR_NDV: u64 = 1_000_000;
    const TARGET_FPP: f64 = 0.05;

    let mut group = c.benchmark_group("insert_and_fold");
    for &n in &[1_000u64, 10_000, 100_000] {
        group.throughput(Throughput::Elements(n));
        group.bench_with_input(BenchmarkId::new("values", n), &n, |b, &n| {
            b.iter(|| {
                let mut sbbf = Sbbf::new_with_ndv_fpp(SIZED_FOR_NDV, TARGET_FPP).unwrap();
                (0..n).for_each(|value| sbbf.insert(&value));
                sbbf.fold_to_target_fpp(TARGET_FPP);
                sbbf
            });
        });
    }
    group.finish();
}

/// Benchmark filter creation plus inserts WITHOUT the final fold, so the
/// fold overhead can be isolated by comparing against `insert_and_fold`.
/// Throughput is reported in values inserted per iteration.
fn bench_insert_only(c: &mut Criterion) {
    const SIZED_FOR_NDV: u64 = 1_000_000;
    const TARGET_FPP: f64 = 0.05;

    let mut group = c.benchmark_group("insert_only");
    for &n in &[1_000u64, 10_000, 100_000] {
        group.throughput(Throughput::Elements(n));
        group.bench_with_input(BenchmarkId::new("values", n), &n, |b, &n| {
            b.iter(|| {
                let mut sbbf = Sbbf::new_with_ndv_fpp(SIZED_FOR_NDV, TARGET_FPP).unwrap();
                (0..n).for_each(|value| sbbf.insert(&value));
                sbbf
            });
        });
    }
    group.finish();
}

// Register the three bloom-filter benchmarks with criterion's harness and
// generate the benchmark binary's entry point.
criterion_group!(
benches,
bench_fold_to_target_fpp,
bench_insert_and_fold,
bench_insert_only
);
criterion_main!(benches);
15 changes: 9 additions & 6 deletions parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

use crate::basic::Encoding;
use crate::bloom_filter::Sbbf;
use crate::column::writer::encoder::{ColumnValueEncoder, DataPageValues, DictionaryPage};
use crate::column::writer::encoder::{
ColumnValueEncoder, DataPageValues, DictionaryPage, create_bloom_filter,
};
use crate::data_type::{AsBytes, ByteArray, Int32Type};
use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
use crate::encodings::rle::RleEncoder;
Expand Down Expand Up @@ -423,14 +425,17 @@ pub struct ByteArrayEncoder {
min_value: Option<ByteArray>,
max_value: Option<ByteArray>,
bloom_filter: Option<Sbbf>,
bloom_filter_target_fpp: f64,
geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
}

impl ColumnValueEncoder for ByteArrayEncoder {
type T = ByteArray;
type Values = dyn Array;
fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
self.bloom_filter.take()
let mut sbbf = self.bloom_filter.take()?;
sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp);
Some(sbbf)
}

fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
Expand All @@ -443,10 +448,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {

let fallback = FallbackEncoder::new(descr, props)?;

let bloom_filter = props
.bloom_filter_properties(descr.path())
.map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
.transpose()?;
let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?;

let statistics_enabled = props.statistics_enabled(descr.path());

Expand All @@ -456,6 +458,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
fallback,
statistics_enabled,
bloom_filter,
bloom_filter_target_fpp,
dict_encoder: dictionary,
min_value: None,
max_value: None,
Expand Down
47 changes: 44 additions & 3 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2681,6 +2681,7 @@ mod tests {
values: ArrayRef,
schema: SchemaRef,
bloom_filter: bool,
bloom_filter_ndv: Option<u64>,
bloom_filter_position: BloomFilterPosition,
}

Expand All @@ -2692,6 +2693,7 @@ mod tests {
values,
schema: Arc::new(schema),
bloom_filter: false,
bloom_filter_ndv: None,
bloom_filter_position: BloomFilterPosition::AfterRowGroup,
}
}
Expand All @@ -2712,6 +2714,7 @@ mod tests {
values,
schema,
bloom_filter,
bloom_filter_ndv,
bloom_filter_position,
} = options;

Expand Down Expand Up @@ -2750,15 +2753,18 @@ mod tests {
for encoding in &encodings {
for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
for row_group_size in row_group_sizes {
let props = WriterProperties::builder()
let mut builder = WriterProperties::builder()
.set_writer_version(version)
.set_max_row_group_row_count(Some(row_group_size))
.set_dictionary_enabled(dictionary_size != 0)
.set_dictionary_page_size_limit(dictionary_size.max(1))
.set_encoding(*encoding)
.set_bloom_filter_enabled(bloom_filter)
.set_bloom_filter_position(bloom_filter_position)
.build();
.set_bloom_filter_position(bloom_filter_position);
if let Some(ndv) = bloom_filter_ndv {
builder = builder.set_bloom_filter_ndv(ndv);
}
let props = builder.build();

files.push(roundtrip_opts(&expected_batch, props))
}
Expand Down Expand Up @@ -3142,6 +3148,41 @@ mod tests {
);
}

/// Test that bloom filter folding produces correct results even when
/// the configured NDV differs significantly from actual NDV.
/// A large NDV means a larger initial filter that gets folded down;
/// a small NDV means a smaller initial filter.
#[test]
fn i32_column_bloom_filter_fixed_ndv() {
    let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));

    // Configured NDV (1M) much larger than the SMALL_SIZE distinct values
    // actually inserted — exercises folding a large filter down.
    let mut options = RoundTripOptions::new(array.clone(), false);
    options.bloom_filter = true;
    options.bloom_filter_ndv = Some(1_000_000);

    let files = one_column_roundtrip_with_options(options);
    check_bloom_filter(
        files,
        "col".to_string(),
        (0..SMALL_SIZE as i32).collect(),
        (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
    );

    // Configured NDV (3) smaller than the SMALL_SIZE distinct values
    // actually inserted — exercises the underestimate path, where the
    // filter was sized too small for the real cardinality.
    let mut options = RoundTripOptions::new(array, false);
    options.bloom_filter = true;
    options.bloom_filter_ndv = Some(3);

    let files = one_column_roundtrip_with_options(options);
    check_bloom_filter(
        files,
        "col".to_string(),
        (0..SMALL_SIZE as i32).collect(),
        (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
    );
}

#[test]
fn binary_column_bloom_filter() {
let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
Expand Down
Loading
Loading