diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index efcd1fe2190b..696e9b5411eb 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -275,5 +275,9 @@ name = "row_selection_cursor" harness = false required-features = ["arrow"] +[[bench]] +name = "bloom_filter" +harness = false + [lib] bench = false diff --git a/parquet/benches/bloom_filter.rs b/parquet/benches/bloom_filter.rs new file mode 100644 index 000000000000..ca4f900067f8 --- /dev/null +++ b/parquet/benches/bloom_filter.rs @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; +use parquet::bloom_filter::Sbbf; + +/// Build a bloom filter sized for `initial_ndv` at `fpp`, insert `num_values` distinct values, +/// and return it ready for folding. 
+fn build_filter(initial_ndv: u64, fpp: f64, num_values: u64) -> Sbbf { + let mut sbbf = Sbbf::new_with_ndv_fpp(initial_ndv, fpp).unwrap(); + for i in 0..num_values { + sbbf.insert(&i); + } + sbbf +} + +fn bench_fold_to_target_fpp(c: &mut Criterion) { + let mut group = c.benchmark_group("fold_to_target_fpp"); + + // Realistic scenario: filter sized for 1M NDV, varying actual distinct values + let initial_ndv = 1_000_000u64; + let fpp = 0.05; + + for num_values in [1_000u64, 10_000, 100_000] { + let filter = build_filter(initial_ndv, fpp, num_values); + let num_blocks = filter.num_blocks(); + group.throughput(Throughput::Elements(num_blocks as u64)); + group.bench_with_input(BenchmarkId::new("ndv", num_values), &filter, |b, filter| { + b.iter_batched( + || filter.clone(), + |mut f| { + f.fold_to_target_fpp(fpp); + f + }, + criterion::BatchSize::SmallInput, + ); + }); + } + group.finish(); +} + +fn bench_insert_and_fold(c: &mut Criterion) { + let mut group = c.benchmark_group("insert_and_fold"); + + let initial_ndv = 1_000_000u64; + let fpp = 0.05; + + for num_values in [1_000u64, 10_000, 100_000] { + group.throughput(Throughput::Elements(num_values)); + group.bench_with_input( + BenchmarkId::new("values", num_values), + &num_values, + |b, &num_values| { + b.iter(|| { + let mut sbbf = Sbbf::new_with_ndv_fpp(initial_ndv, fpp).unwrap(); + for i in 0..num_values { + sbbf.insert(&i); + } + sbbf.fold_to_target_fpp(fpp); + sbbf + }); + }, + ); + } + group.finish(); +} + +fn bench_insert_only(c: &mut Criterion) { + let mut group = c.benchmark_group("insert_only"); + + let initial_ndv = 1_000_000u64; + let fpp = 0.05; + + for num_values in [1_000u64, 10_000, 100_000] { + group.throughput(Throughput::Elements(num_values)); + group.bench_with_input( + BenchmarkId::new("values", num_values), + &num_values, + |b, &num_values| { + b.iter(|| { + let mut sbbf = Sbbf::new_with_ndv_fpp(initial_ndv, fpp).unwrap(); + for i in 0..num_values { + sbbf.insert(&i); + } + sbbf + }); + }, + 
); + } + group.finish(); +} + +criterion_group!( + benches, + bench_fold_to_target_fpp, + bench_insert_and_fold, + bench_insert_only +); +criterion_main!(benches); diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 228d229b3088..f56f9570adfb 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -17,7 +17,9 @@ use crate::basic::Encoding; use crate::bloom_filter::Sbbf; -use crate::column::writer::encoder::{ColumnValueEncoder, DataPageValues, DictionaryPage}; +use crate::column::writer::encoder::{ + ColumnValueEncoder, DataPageValues, DictionaryPage, create_bloom_filter, +}; use crate::data_type::{AsBytes, ByteArray, Int32Type}; use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder}; use crate::encodings::rle::RleEncoder; @@ -423,6 +425,7 @@ pub struct ByteArrayEncoder { min_value: Option, max_value: Option, bloom_filter: Option, + bloom_filter_target_fpp: f64, geo_stats_accumulator: Option>, } @@ -430,7 +433,9 @@ impl ColumnValueEncoder for ByteArrayEncoder { type T = ByteArray; type Values = dyn Array; fn flush_bloom_filter(&mut self) -> Option { - self.bloom_filter.take() + let mut sbbf = self.bloom_filter.take()?; + sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp); + Some(sbbf) } fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result @@ -443,10 +448,7 @@ impl ColumnValueEncoder for ByteArrayEncoder { let fallback = FallbackEncoder::new(descr, props)?; - let bloom_filter = props - .bloom_filter_properties(descr.path()) - .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp)) - .transpose()?; + let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?; let statistics_enabled = props.statistics_enabled(descr.path()); @@ -456,6 +458,7 @@ impl ColumnValueEncoder for ByteArrayEncoder { fallback, statistics_enabled, bloom_filter, + bloom_filter_target_fpp, dict_encoder: dictionary, min_value: None, 
max_value: None, diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 2ef71d5745a2..8422263b1f63 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -2681,6 +2681,7 @@ mod tests { values: ArrayRef, schema: SchemaRef, bloom_filter: bool, + bloom_filter_ndv: Option, bloom_filter_position: BloomFilterPosition, } @@ -2692,6 +2693,7 @@ mod tests { values, schema: Arc::new(schema), bloom_filter: false, + bloom_filter_ndv: None, bloom_filter_position: BloomFilterPosition::AfterRowGroup, } } @@ -2712,6 +2714,7 @@ mod tests { values, schema, bloom_filter, + bloom_filter_ndv, bloom_filter_position, } = options; @@ -2750,15 +2753,18 @@ mod tests { for encoding in &encodings { for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] { for row_group_size in row_group_sizes { - let props = WriterProperties::builder() + let mut builder = WriterProperties::builder() .set_writer_version(version) .set_max_row_group_row_count(Some(row_group_size)) .set_dictionary_enabled(dictionary_size != 0) .set_dictionary_page_size_limit(dictionary_size.max(1)) .set_encoding(*encoding) .set_bloom_filter_enabled(bloom_filter) - .set_bloom_filter_position(bloom_filter_position) - .build(); + .set_bloom_filter_position(bloom_filter_position); + if let Some(ndv) = bloom_filter_ndv { + builder = builder.set_bloom_filter_ndv(ndv); + } + let props = builder.build(); files.push(roundtrip_opts(&expected_batch, props)) } @@ -3142,6 +3148,41 @@ mod tests { ); } + /// Test that bloom filter folding produces correct results even when + /// the configured NDV differs significantly from actual NDV. + /// A large NDV means a larger initial filter that gets folded down; + /// a small NDV means a smaller initial filter. 
+ #[test] + fn i32_column_bloom_filter_fixed_ndv() { + let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32)); + + // NDV much larger than actual distinct values — tests folding a large filter down + let mut options = RoundTripOptions::new(array.clone(), false); + options.bloom_filter = true; + options.bloom_filter_ndv = Some(1_000_000); + + let files = one_column_roundtrip_with_options(options); + check_bloom_filter( + files, + "col".to_string(), + (0..SMALL_SIZE as i32).collect(), + (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(), + ); + + // NDV smaller than actual distinct values — tests the underestimate path + let mut options = RoundTripOptions::new(array, false); + options.bloom_filter = true; + options.bloom_filter_ndv = Some(3); + + let files = one_column_roundtrip_with_options(options); + check_bloom_filter( + files, + "col".to_string(), + (0..SMALL_SIZE as i32).collect(), + (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(), + ); + } + #[test] fn binary_column_bloom_filter() { let one_vec: Vec = (0..SMALL_SIZE as u8).collect(); diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs index 933b5a269fff..8e89ba406b59 100644 --- a/parquet/src/bloom_filter/mod.rs +++ b/parquet/src/bloom_filter/mod.rs @@ -68,6 +68,46 @@ //! | 1,000,000 | 0.00001 | 131,072 | 4,096 | //! | 1,000,000 | 0.000001 | 262,144 | 8,192 | //! +//! # Structure: Filter → Blocks → Words → Bits +//! +//! An SBBF is an array of **blocks**. Each block is 256 bits (32 bytes), +//! divided into eight 32-bit **words**. A word is just a `u32` — an array of +//! 32 individual bits that can each be "set" (1) or "not set" (0). +//! +//! ```text +//! Sbbf (the whole filter) +//! ┌──────────┬──────────┬──────────┬─── ─── ──┬──────────┐ +//! │ Block 0 │ Block 1 │ Block 2 │ ... │ Block N-1│ +//! └──────────┴──────────┴──────────┴─── ─── ──┴──────────┘ +//! │ +//! ▼ +//! One Block = 256 bits = 8 words +//! 
┌────────┬────────┬────────┬────────┬────────┬────────┬────────┬────────┐ +//! │ word 0 │ word 1 │ word 2 │ word 3 │ word 4 │ word 5 │ word 6 │ word 7 │ +//! │ (u32) │ (u32) │ (u32) │ (u32) │ (u32) │ (u32) │ (u32) │ (u32) │ +//! └────────┴────────┴────────┴────────┴────────┴────────┴────────┴────────┘ +//! │ +//! ▼ +//! One Word = 32 individual bits +//! ┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐ +//! │0│0│1│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│ ← bit 29 is set +//! └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘ +//! ``` +//! +//! **Inserting** a value hashes it to a 64-bit number, then: +//! 1. The upper 32 bits pick which **block** (via `Sbbf::hash_to_block_index`). +//! 2. The lower 32 bits pick one bit position in each of the 8 **words** (via `Block::mask`). +//! So each insert sets exactly **8 bits** (one per word) in a single block. +//! +//! **Checking** does the same two steps and returns `true` only if all 8 bits +//! are already set — meaning the value was *probably* inserted (or is a false +//! positive). +//! +//! # Bloom Filter Folding +//! +//! After inserting all values into a bloom filter it can be "folded" to minimize it's size. +//! See [`Sbbf::fold_to_target_fpp`] for details on the algorithm and its mathematical basis. +//! //! [parquet-bf-spec]: https://github.com/apache/parquet-format/blob/master/BloomFilter.md //! [sbbf-paper]: https://arxiv.org/pdf/2101.01719 //! [bf-formulae]: http://tfk.mit.edu/pdf/bloom.pdf @@ -114,23 +154,50 @@ pub struct BloomFilterHeader { } ); -/// Each block is 256 bits, broken up into eight contiguous "words", each consisting of 32 bits. -/// Each word is thought of as an array of bits; each bit is either "set" or "not set". +/// A single 256-bit block, the basic unit of the Split Block Bloom Filter. +/// +/// A block is eight contiguous 32-bit **words** (`[u32; 8]`). 
+/// Each word is an independent bit-array of 32 positions: +/// +/// ```text +/// Block (256 bits total) +/// ┌────────┬────────┬────────┬────────┬────────┬────────┬────────┬────────┐ +/// │ word 0 │ word 1 │ word 2 │ word 3 │ word 4 │ word 5 │ word 6 │ word 7 │ +/// │ 32 bits│ 32 bits│ 32 bits│ 32 bits│ 32 bits│ 32 bits│ 32 bits│ 32 bits│ +/// └────────┴────────┴────────┴────────┴────────┴────────┴────────┴────────┘ +/// ``` +/// +/// When a value is inserted, [`Block::mask`] picks one bit in each word +/// (8 bits total), and those bits are OR'd in. When checking, we verify +/// all 8 bits are set. #[derive(Debug, Copy, Clone)] #[repr(transparent)] struct Block([u32; 8]); impl Block { const ZERO: Block = Block([0; 8]); - /// takes as its argument a single unsigned 32-bit integer and returns a block in which each - /// word has exactly one bit set. + /// Produce a block where each of the 8 words has exactly one bit set. + /// + /// For each word `i` the bit position is derived from `x`: + /// + /// ```text + /// y = (x wrapping* SALT[i]) >> 27 // top 5 bits → value in 0..31 + /// word[i] = 1 << y // exactly one bit set per word + /// ``` + /// + /// Because only the top 5 bits survive the shift, each word picks one of + /// 32 possible bit positions. The eight SALT constants spread the choices + /// so different words usually light up different positions. + /// + /// Key property: the mask depends *only* on `x` (a u32) and the fixed + /// SALT constants — it is independent of the filter size. This is why + /// folding preserves bit patterns (see Lemma 2 in tests). 
fn mask(x: u32) -> Self { let mut result = [0_u32; 8]; for i in 0..8 { - // wrapping instead of checking for overflow - let y = x.wrapping_mul(SALT[i]); - let y = y >> 27; - result[i] = 1 << y; + let y = x.wrapping_mul(SALT[i]); // spread bits via multiply + let y = y >> 27; // keep top 5 bits → 0..31 + result[i] = 1 << y; // set exactly that one bit } Self(result) } @@ -155,7 +222,10 @@ impl Block { self } - /// setting every bit in the block that was also set in the result from mask + /// OR the mask bits into this block (`block[i] |= mask[i]`). + /// + /// After insertion the 8 bits chosen by `mask(hash)` are guaranteed set; + /// bits previously set by other hashes are preserved. fn insert(&mut self, hash: u32) { let mask = Self::mask(hash); for i in 0..8 { @@ -163,7 +233,11 @@ impl Block { } } - /// returns true when every bit that is set in the result of mask is also set in the block. + /// Check membership: returns `true` when *every* bit from `mask(hash)` is + /// already set in this block (`block[i] & mask[i] != 0` for all 8 words). + /// + /// A `true` result means "probably present" (other inserts may have set + /// the same bits). A `false` is definitive — the value was never inserted. fn check(&self, hash: u32) -> bool { let mask = Self::mask(hash); for i in 0..8 { @@ -191,7 +265,55 @@ impl std::ops::IndexMut for Block { } } -/// A split block Bloom filter. +impl std::ops::BitOr for Block { + type Output = Self; + + #[inline] + fn bitor(self, rhs: Self) -> Self { + let mut result = [0u32; 8]; + for (i, item) in result.iter_mut().enumerate() { + *item = self.0[i] | rhs.0[i]; + } + Self(result) + } +} + +impl std::ops::BitOrAssign for Block { + #[inline] + fn bitor_assign(&mut self, rhs: Self) { + for i in 0..8 { + self.0[i] |= rhs.0[i]; + } + } +} + +impl Block { + /// Count the total number of set bits across all 8 words. + /// + /// Computes popcount on each word separately and sums. 
Keeping the popcount + /// separate from the OR allows the compiler to batch SIMD popcount instructions + /// (e.g., `cnt.16b` on ARM NEON) instead of interleaving them with OR operations. + #[inline] + fn count_ones(self) -> u32 { + // Written as a fold over the array so the compiler sees 8 independent + // popcount operations it can vectorize into cnt.16b + horizontal sum. + self.0.iter().map(|w| w.count_ones()).sum() + } +} + +/// A split block Bloom filter (SBBF). +/// +/// An SBBF partitions its bit space into fixed-size 256-bit (32-byte) blocks, each fitting in a +/// single CPU cache line. Each block contains eight 32-bit words, aligned with SIMD lanes for +/// parallel bit manipulation. When checking membership, only one block is accessed per query, +/// eliminating the cache-miss penalty of standard Bloom filters. +/// +/// ## Sizing and folding +/// +/// Filters are initially sized for a maximum expected number of distinct values (NDV) via +/// [`Sbbf::new_with_ndv_fpp`]. After all values are inserted, the filter is compacted by +/// calling [`Sbbf::fold_to_target_fpp`], which folds the filter down to the smallest size +/// that still meets the target false positive probability. /// /// The creation of this structure is based on the [`crate::file::properties::BloomFilterProperties`] /// struct set via [`crate::file::properties::WriterProperties`] and is thus hidden by default. @@ -395,10 +517,27 @@ impl Sbbf { Ok(Some(Self::new(&bitset))) } + /// Map a 64-bit hash to a block index in `[0, num_blocks)`. 
+ /// + /// Uses the "multiply-and-shift" trick (a fast alternative to modulo): + /// + /// ```text + /// upper32 = hash >> 32 // take the top 32 bits of the hash + /// index = (upper32 * N) >> 32 // ∈ [0, N) where N = num_blocks + /// ``` + /// + /// Why this matters for folding (Lemma 1): when N is a power of two and + /// you halve it to N/2, the index also halves: + /// + /// ```text + /// index_N = (upper32 * N) >> 32 + /// index_N/2 = (upper32 * N/2) >> 32 = index_N / 2 (integer division) + /// ``` + /// + /// So the block that held hash `h` in the big filter is at `index / 2` in + /// the half-sized filter — exactly where `fold` ORs it. #[inline] fn hash_to_block_index(&self, hash: u64) -> usize { - // unchecked_mul is unstable, but in reality this is safe, we'd just use saturating mul - // but it will not saturate (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize } @@ -431,6 +570,140 @@ impl Sbbf { self.0.capacity() * std::mem::size_of::() } + /// Returns the number of blocks in this bloom filter. + pub fn num_blocks(&self) -> usize { + self.0.len() + } + + /// Fold the bloom filter down to the smallest size that still meets the target FPP + /// (False Positive Probability). + /// + /// Folds the filter by merging groups of adjacent blocks via bitwise OR, where each + /// fold level halves the number of blocks. The fold count is chosen as the maximum + /// number of folds whose estimated FPP stays within `target_fpp`. The filter stops + /// at a minimum size of 1 block (32 bytes). + /// + /// ## How it works + /// + /// SBBFs use multiplicative hashing for block selection: + /// + /// ```text + /// block_index = ((hash >> 32) * num_blocks) >> 32 + /// ``` + /// + /// A single fold halves the block count: when `num_blocks` is halved, the new index + /// becomes `floor(original_index / 2)`, so blocks `2i` and `2i+1` map to the same + /// position. 
More generally, `k` folds reduce the block count by `2^k`, merging + /// groups of `2^k` adjacent blocks in a single pass: + /// + /// ```text + /// folded[i] = blocks[i*2^k] | blocks[i*2^k + 1] | ... | blocks[i*2^k + 2^k - 1] + /// ``` + /// + /// This differs from standard Bloom filter folding, which merges the two halves + /// (`B[i] | B[i + m/2]`) because standard filters use modular hashing where + /// `h(x) mod (m/2)` maps indices `i` and `i + m/2` to the same position. + /// + /// ## Correctness + /// + /// Folding **never introduces false negatives**. Every bit that was set in the original + /// filter remains set in the folded filter (via bitwise OR). The only effect is a controlled + /// increase in FPP as set bits from different blocks are merged together. + /// This was originally proven in [Sailhan & Stehr 2012] for standard bloom filters and is empirically + /// demonstrated for SBBFs in Lemma 1 and Lemma 2 of the tests. + /// + /// ## References + /// + /// [Sailhan & Stehr 2012]: https://doi.org/10.1109/GreenCom.2012.16 + pub fn fold_to_target_fpp(&mut self, target_fpp: f64) { + let num_folds = self.num_folds_for_target_fpp(target_fpp); + if num_folds > 0 { + self.fold_n(num_folds); + } + } + + /// Determine how many folds can be applied without exceeding `target_fpp`. + /// + /// Computes the average per-block fill rate in a single pass (no allocation), + /// then analytically estimates the FPP at each fold level. + /// + /// When two blocks with independent fill rate `f` are OR'd, the expected fill + /// of the merged block is `1 - (1-f)^2`. After `k` folds (merging `2^k` blocks): + /// + /// ```text + /// f_k = 1 - (1 - f)^(2^k) + /// ``` + /// + /// SBBF membership checks perform `k=8` bit checks within one 256-bit block, + /// so the estimated FPP at fold level k is `f_k^8`. 
+ fn num_folds_for_target_fpp(&self, target_fpp: f64) -> u32 { + let len = self.0.len(); + if len < 2 { + return 0; + } + + // Single pass: compute average per-block fill rate. + let total_set_bits: u64 = self.0.iter().map(|b| u64::from(b.count_ones())).sum(); + let avg_fill = total_set_bits as f64 / (len as f64 * 256.0); + + // Empty filter: can fold all the way down. + if avg_fill == 0.0 { + return len.trailing_zeros(); + } + + // Find max folds where estimated FPP stays within target. + // f_k = 1 - (1 - avg_fill)^(2^k), FPP_k = f_k^8 + assert!( + len.is_power_of_two(), + "Number of blocks must be a power of 2 for folding" + ); + let max_folds = len.trailing_zeros(); // log2(len) since len is power of 2 + let one_minus_f = 1.0 - avg_fill; + let mut num_folds = 0u32; + let mut one_minus_fk = one_minus_f; // (1-f)^1 initially + + for _ in 0..max_folds { + // After one more fold: (1-f)^(2^(k+1)) = ((1-f)^(2^k))^2 + one_minus_fk = one_minus_fk * one_minus_fk; + let fk = 1.0 - one_minus_fk; + let estimated_fpp = fk.powi(8); + if estimated_fpp > target_fpp { + break; + } + num_folds += 1; + } + + num_folds + } + + /// Fold the filter `num_folds` times in a single pass. + /// + /// Merges groups of `2^num_folds` adjacent blocks via bitwise OR, producing + /// `len / 2^num_folds` output blocks. The original allocation is reused. + /// + /// # Panics + /// + /// Panics if `num_folds` is 0 or would reduce the filter below 1 block. 
+ fn fold_n(&mut self, num_folds: u32) { + assert!(num_folds > 0, "num_folds must be at least 1"); + let len = self.0.len(); + let group_size = 1usize << num_folds; + assert!( + group_size <= len, + "Cannot fold {num_folds} times: need at least {group_size} blocks, have {len}" + ); + let new_len = len / group_size; + for i in 0..new_len { + let start = i * group_size; + let mut merged = self.0[start]; + for j in 1..group_size { + merged |= self.0[start + j]; + } + self.0[i] = merged; + } + self.0.truncate(new_len); + } + /// Reads a Sbff from Thrift encoded bytes /// /// # Examples @@ -603,6 +876,86 @@ mod tests { } } + #[test] + fn test_fold_n_halves_block_count() { + let mut sbbf = Sbbf::new_with_num_of_bytes(1024); // 32 blocks + assert_eq!(sbbf.num_blocks(), 32); + sbbf.fold_n(1); + assert_eq!(sbbf.num_blocks(), 16); + sbbf.fold_n(1); + assert_eq!(sbbf.num_blocks(), 8); + } + + #[test] + fn test_fold_preserves_inserted_values() { + // Create a large filter, insert values, fold, verify no false negatives + let mut sbbf = Sbbf::new_with_num_of_bytes(32 * 1024); // 32KB = 1024 blocks + let values: Vec = (0..1000).map(|i| format!("value_{i}")).collect(); + for v in &values { + sbbf.insert(v.as_str()); + } + + // Fold several times + let original_blocks = sbbf.num_blocks(); + sbbf.fold_to_target_fpp(0.05); + assert!( + sbbf.num_blocks() < original_blocks, + "should have folded at least once" + ); + + // All inserted values must still be found (no false negatives) + for v in &values { + assert!( + sbbf.check(v.as_str()), + "Value '{}' missing after folding (false negative!)", + v + ); + } + } + + #[test] + fn test_fold_to_target_fpp_stops_before_exceeding_target() { + let mut sbbf = Sbbf::new_with_num_of_bytes(64 * 1024); // 64KB + // Insert enough values to set some bits + for i in 0..5000 { + sbbf.insert(&i); + } + + let target_fpp = 0.01; + sbbf.fold_to_target_fpp(target_fpp); + + // After folding, the estimated FPP should be at or below target + // (the current 
state should not exceed target — we stopped before that would happen) + let total_bits = (sbbf.num_blocks() * 256) as f64; + let set_bits: u64 = sbbf + .0 + .iter() + .flat_map(|b| b.0.iter()) + .map(|w| w.count_ones() as u64) + .sum(); + let fill = set_bits as f64 / total_bits; + let current_fpp = fill.powi(8); + assert!( + current_fpp <= target_fpp, + "FPP {current_fpp} exceeds target {target_fpp}" + ); + } + + #[test] + fn test_fold_empty_filter_folds_to_minimum() { + // An empty filter has fill=0, so estimated FPP is always 0 — should fold all the way down + let mut sbbf = Sbbf::new_with_num_of_bytes(1024); // 32 blocks + sbbf.fold_to_target_fpp(0.01); + assert_eq!(sbbf.num_blocks(), 1); + } + + #[test] + #[should_panic(expected = "Cannot fold 1 times: need at least 2 blocks, have 1")] + fn test_fold_n_panics_at_minimum_size() { + let mut sbbf = Sbbf::new_with_num_of_bytes(32); // 1 block (minimum) + sbbf.fold_n(1); + } + #[test] fn test_sbbf_write_round_trip() { // Create a bloom filter with a 32-byte bitset (minimum size) @@ -641,4 +994,247 @@ mod tests { ); } } + + /// Prove that folding an SBBF by one level produces the exact same bits + /// as building a fresh filter at the smaller size from scratch. + /// + /// # What is folding? + /// + /// ```text + /// Original (N = 8 blocks): + /// ┌───┬───┬───┬───┬───┬───┬───┬───┐ + /// │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ + /// └─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┘ + /// │ │ │ │ │ │ │ │ + /// └─OR┘ └─OR┘ └─OR┘ └─OR┘ pair-wise OR + /// │ │ │ │ + /// ┌───┴──┬────┴──┬────┴──┬────┴──┐ + /// │ 0|1 │ 2|3 │ 4|5 │ 6|7 │ Folded (N/2 = 4 blocks) + /// └──────┴───────┴───────┴───────┘ + /// ``` + /// + /// # Why folded == fresh (the two lemmas) + /// + /// An SBBF insertion does two things with a 64-bit hash `h`: + /// + /// 1. **Pick a block** — uses the upper 32 bits via `hash_to_block_index` + /// 2. 
**Set 8 bits in that block** — uses the lower 32 bits via `Block::mask` + /// + /// **Lemma 1 (block index halves):** `hash_to_block_index` uses + /// `(upper32 * N) >> 32`. When N halves, the index halves too: + /// `index_in(N/2) == index_in(N) / 2`. So the hash lands in the same + /// destination block whether you fold or build fresh. + /// + /// **Lemma 2 (mask is size-independent):** `Block::mask(h as u32)` depends + /// only on the lower 32 bits and the fixed SALT constants — the filter + /// size N is not involved. So the same 8 bits get set regardless. + /// + /// Combined: every hash sets the *same bits* in the *same destination + /// block* whether you fold or build fresh → filters are bit-identical. + #[test] + fn test_sbbf_folded_equals_fresh() { + let values = (0..5000).map(|i| format!("elem_{i}")).collect::>(); + let hashes = values + .iter() + .map(|v| hash_as_bytes(v.as_str())) + .collect::>(); + + for num_blocks in [64, 256, 1024] { + let half = num_blocks / 2; + + // Build a filter with N blocks and insert all values. + let mut original = Sbbf::new_with_num_of_bytes(num_blocks * 32); + assert_eq!(original.num_blocks(), num_blocks); + for &h in &hashes { + original.insert_hash(h); + } + + // --- Per-hash verification of the two lemmas --- + for &h in hashes.iter() { + // mask(h as u32) gives the 8-bit pattern that this hash sets + // inside whichever block it lands in. It uses only the lower + // 32 bits of h, so it's the same regardless of filter size. + let mask = Block::mask(h as u32); + + // Lemma 1 check: the block index in the original N-block + // filter, divided by 2, should equal the block index in a + // fresh N/2-block filter. 
+ let orig_idx = original.hash_to_block_index(h); + assert!(orig_idx < num_blocks); + + let fresh_idx = { + let tmp = Sbbf(vec![Block::ZERO; half]); + tmp.hash_to_block_index(h) + }; + let folded_idx = orig_idx / 2; + assert_eq!( + fresh_idx, folded_idx, + "Lemma 1 failed: fresh index {fresh_idx} != folded index {folded_idx}" + ); + + // Lemma 2 check: every bit that mask wants to set is actually + // present in the original block. + // + // mask.0[w] has exactly ONE bit set (see Block::mask: `1 << y`). + // The block at orig_idx has many bits set from many inserts, so + // we can't test equality — we test that the specific mask bit is + // *present*: + // + // block_word & mask_word != 0 + // ⟺ "the one bit in the mask is set in the block" + // + // (Since mask_word has exactly 1 bit, `& mask != 0` is the same + // as `& mask == mask` — but `!= 0` reads more naturally.) + for w in 0..8 { + assert_ne!( + original.0[orig_idx].0[w] & mask.0[w], + 0, + "Lemma 2 failed: mask bit not set in word {w} of block {orig_idx}" + ); + } + } + + // --- Final bit-identical comparison --- + // Fold the original N-block filter down to N/2 blocks. + let mut folded = original.clone(); + folded.fold_n(1); + assert_eq!(folded.num_blocks(), half); + + // Build a fresh N/2-block filter with the same values. + let mut fresh = Sbbf::new_with_num_of_bytes(half * 32); + for &h in &hashes { + fresh.insert_hash(h); + } + + // By lemmas 1 + 2, every block should be bit-identical. + for j in 0..half { + assert_eq!( + folded.0[j].0, fresh.0[j].0, + "Block {j} differs after fold (N={num_blocks} → {half})" + ); + } + } + } + + /// Inductive multi-step folding: folding k times from N blocks produces + /// a filter bit-identical to a fresh N/2^k-block filter. + /// + /// `test_sbbf_folded_equals_fresh` proves the base case (one fold). 
+ /// This test applies folds *repeatedly*, checking after each step: + /// + /// ```text + /// 512 ─fold→ 256 ─fold→ 128 ─…→ 1 (9 folds total) + /// ``` + /// + /// At each intermediate size we build a fresh filter and assert + /// bit-equality, confirming the lemma composes across folds. + #[test] + fn test_multi_step_fold() { + let values = (0..3000).map(|i| format!("x_{i}")).collect::>(); + + // Start with a 512-block filter. + let mut filter = Sbbf::new_with_num_of_bytes(512 * 32); + for v in &values { + filter.insert(v.as_str()); + } + + // Fold one level at a time, comparing against a fresh filter each step. + for expected_blocks in [256, 128, 64, 32, 16, 8, 4, 2, 1] { + filter.fold_n(1); + assert_eq!(filter.num_blocks(), expected_blocks); + + let mut fresh = Sbbf::new_with_num_of_bytes(expected_blocks * 32); + for v in &values { + fresh.insert(v.as_str()); + } + for (fb, rb) in filter.0.iter().zip(fresh.0.iter()) { + assert_eq!(fb.0, rb.0); + } + } + } + + /// test that the fpp estimator's overestimation doesn't cause fold_to_target_fpp + /// to produce significantly oversized filters + /// + /// compare the final size after folding against the theoretical optimal size + #[test] + fn test_fold_size_vs_optimal_fixed_size() { + for (ndv, target_fpp) in [ + (1000, 0.05), + (1000, 0.01), + (5000, 0.05), + (5000, 0.01), + (10000, 0.05), + ] { + let values = (0..ndv).map(|i| format!("d_{i}")).collect::>(); + + let mut folded = Sbbf::new_with_num_of_bytes(128 * 1024); // 128KB + for v in &values { + folded.insert(v.as_str()); + } + folded.fold_to_target_fpp(target_fpp); + + let folded_bytes = folded.num_blocks() * 32; + + let optimal = Sbbf::new_with_ndv_fpp(ndv as u64, target_fpp).unwrap(); + let optimal_bytes = optimal.num_blocks() * 32; + + let ratio = folded_bytes as f64 / optimal_bytes as f64; + + assert_eq!(ratio, 1.0); + } + } + + /// verify that a folded sbbf has the same empirical fpp as a fresh filter of the same size + /// this bridges the bit-identity 
proof above with the FPP guarantee from the folding paper + /// since the bits are identical, the false-positive rate must be too + /// + /// we measure fpp empirically by probing with values that were never inserted + /// and counting how many are incorrectly marked as present + #[test] + fn test_folded_fpp_matches_fresh_fpp() { + let ndv = 2000; + let num_probes = 50_000; + let inserted = (0..ndv) + .map(|i| format!("ins_{i}")) + .collect::>(); + + // probe values that were NOT inserted (different prefix guarantees no overlap) + let probes = (0..num_probes) + .map(|i| format!("probe_{i}")) + .collect::>(); + + // build a large filter and fold it down several times + let mut folded = Sbbf::new_with_num_of_bytes(512 * 32); // 512 blocks + for v in &inserted { + folded.insert(v.as_str()); + } + + // check FPP at each fold level + for expected_blocks in [256, 128, 64, 32, 16, 8, 4, 2, 1] { + folded.fold_n(1); + assert_eq!(folded.num_blocks(), expected_blocks); + + // build a fresh filter of the same size with the same values + let mut fresh = Sbbf::new_with_num_of_bytes(expected_blocks * 32); + for v in &inserted { + fresh.insert(v.as_str()); + } + + // measure empirical FPP on both + let mut folded_fp = 0u64; + let mut fresh_fp = 0u64; + for p in &probes { + if folded.check(p.as_str()) { + folded_fp += 1; + } + if fresh.check(p.as_str()) { + fresh_fp += 1; + } + } + + // bit-identity means these must be exactly equal + assert_eq!(folded_fp, fresh_fp); + } + } } diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index 11d4f3142a20..ec1afca58335 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -138,6 +138,7 @@ pub struct ColumnValueEncoderImpl { min_value: Option, max_value: Option, bloom_filter: Option, + bloom_filter_target_fpp: f64, variable_length_bytes: Option, geo_stats_accumulator: Option>, } @@ -187,7 +188,9 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { type Values = 
[T::T]; fn flush_bloom_filter(&mut self) -> Option { - self.bloom_filter.take() + let mut sbbf = self.bloom_filter.take()?; + sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp); + Some(sbbf) } fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result { @@ -205,10 +208,7 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { let statistics_enabled = props.statistics_enabled(descr.path()); - let bloom_filter = props - .bloom_filter_properties(descr.path()) - .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp)) - .transpose()?; + let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?; let geo_stats_accumulator = try_new_geo_stats_accumulator(descr); @@ -219,6 +219,7 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { num_values: 0, statistics_enabled, bloom_filter, + bloom_filter_target_fpp, min_value: None, max_value: None, variable_length_bytes: None, @@ -384,6 +385,21 @@ fn replace_zero(val: &T, descr: &ColumnDescriptor, replace: } } +/// Creates a bloom filter sized for the column's configured NDV, returning the filter +/// and the target FPP for folding. 
+pub(crate) fn create_bloom_filter( + props: &WriterProperties, + descr: &ColumnDescPtr, +) -> Result<(Option, f64)> { + match props.bloom_filter_properties(descr.path()) { + Some(bf_props) => Ok(( + Some(Sbbf::new_with_ndv_fpp(bf_props.ndv, bf_props.fpp)?), + bf_props.fpp, + )), + None => Ok((None, 0.0)), + } +} + fn update_geo_stats_accumulator<'a, T, I>(bounder: &mut dyn GeoStatsAccumulator, iter: I) where T: ParquetValueType + 'a, diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 640a7a075d2f..65630cfed218 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -53,8 +53,14 @@ pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", env!("CARGO_ pub const DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH: Option = Some(64); /// Default value for [`BloomFilterProperties::fpp`] pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05; -/// Default value for [`BloomFilterProperties::ndv`] -pub const DEFAULT_BLOOM_FILTER_NDV: u64 = 1_000_000_u64; +/// Default value for [`BloomFilterProperties::ndv`]. +/// +/// Note: this is only the fallback default used when constructing [`BloomFilterProperties`] +/// directly. When using [`WriterPropertiesBuilder`], columns with bloom filters enabled +/// but without an explicit NDV will have their NDV resolved at build time to +/// [`WriterProperties::max_row_group_row_count`], which may differ from this constant +/// if the user configured a custom row group size. +pub const DEFAULT_BLOOM_FILTER_NDV: u64 = DEFAULT_MAX_ROW_GROUP_ROW_COUNT as u64; /// Default values for [`WriterProperties::statistics_truncate_length`] pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option = Some(64); /// Default value for [`WriterProperties::offset_index_disabled`] @@ -587,6 +593,18 @@ impl Default for WriterPropertiesBuilder { impl WriterPropertiesBuilder { /// Finalizes the configuration and returns immutable writer properties struct. 
pub fn build(self) -> WriterProperties { + // Resolve bloom filter NDV for columns where it wasn't explicitly set: + // default to max_row_group_row_count so the filter is never undersized. + let default_ndv = self + .max_row_group_row_count + .unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT) as u64; + let mut default_column_properties = self.default_column_properties; + default_column_properties.resolve_bloom_filter_ndv(default_ndv); + let mut column_properties = self.column_properties; + for props in column_properties.values_mut() { + props.resolve_bloom_filter_ndv(default_ndv); + } + WriterProperties { data_page_row_count_limit: self.data_page_row_count_limit, write_batch_size: self.write_batch_size, @@ -597,8 +615,8 @@ impl WriterPropertiesBuilder { created_by: self.created_by, offset_index_disabled: self.offset_index_disabled, key_value_metadata: self.key_value_metadata, - default_column_properties: self.default_column_properties, - column_properties: self.column_properties, + default_column_properties, + column_properties, sorting_columns: self.sorting_columns, column_index_truncate_length: self.column_index_truncate_length, statistics_truncate_length: self.statistics_truncate_length, @@ -996,8 +1014,13 @@ impl WriterPropertiesBuilder { self } - /// Sets default number of distinct values (ndv) for bloom filter for all - /// columns (defaults to `1_000_000` via [`DEFAULT_BLOOM_FILTER_NDV`]). + /// Sets default maximum expected number of distinct values (ndv) for bloom filter + /// for all columns (defaults to [`DEFAULT_BLOOM_FILTER_NDV`]). + /// + /// The bloom filter is initially sized for this many distinct values at the + /// configured FPP, then folded down after all values are inserted to achieve + /// optimal size. A good heuristic is to set this to the expected number of rows + /// in the row group. /// /// Implicitly enables bloom writing, as if [`set_bloom_filter_enabled`] had /// been called. 
@@ -1191,6 +1214,13 @@ impl Default for EnabledStatistics { } /// Controls the bloom filter to be computed by the writer. +/// +/// The bloom filter is initially sized for `ndv` distinct values at the given `fpp`, then +/// automatically folded down after all values are inserted to achieve optimal size while +/// maintaining the target `fpp`. See [`Sbbf::fold_to_target_fpp`] for details on the +/// folding algorithm. +/// +/// [`Sbbf::fold_to_target_fpp`]: crate::bloom_filter::Sbbf::fold_to_target_fpp #[derive(Debug, Clone, PartialEq)] pub struct BloomFilterProperties { /// False positive probability. This should be always between 0 and 1 exclusive. Defaults to [`DEFAULT_BLOOM_FILTER_FPP`]. @@ -1201,20 +1231,30 @@ pub struct BloomFilterProperties { /// smaller the fpp, the more memory and disk space is required, thus setting it to a reasonable value /// e.g. 0.1, 0.05, or 0.001 is recommended. /// - /// Setting to a very small number diminishes the value of the filter itself, as the bitset size is - /// even larger than just storing the whole value. You are also expected to set `ndv` if it can - /// be known in advance to greatly reduce space usage. + /// This value also serves as the target FPP for bloom filter folding: after all values + /// are inserted, the filter is folded down to the smallest size that still meets this FPP. pub fpp: f64, - /// Number of distinct values, should be non-negative to be meaningful. Defaults to [`DEFAULT_BLOOM_FILTER_NDV`]. + /// Maximum expected number of distinct values. Defaults to [`DEFAULT_BLOOM_FILTER_NDV`]. /// /// You should set this value by calling [`WriterPropertiesBuilder::set_bloom_filter_ndv`]. /// - /// Usage of bloom filter is most beneficial for columns with large cardinality, so a good heuristic - /// is to set ndv to the number of rows. However, it can reduce disk size if you know in advance a smaller - /// number of distinct values. 
For very small ndv value it is probably not worth it to use bloom filter - /// anyway. - /// - /// Increasing this value (without increasing fpp) will result in an increase in disk or memory size. + /// When not explicitly set via the builder, this defaults to + /// [`max_row_group_row_count`](WriterProperties::max_row_group_row_count) (resolved at + /// build time). The bloom filter is initially sized for this many distinct values at the + /// given `fpp`, then folded down after insertion to achieve optimal size. A good heuristic + /// is to set this to the expected number of rows in the row group. If fewer distinct values + /// are actually written, the filter will be automatically compacted via folding. + /// + /// Thus the only negative side of overestimating this value is that the bloom filter + /// will use more memory during writing than necessary, but it will not affect the final + /// bloom filter size on disk. + /// + /// If you wish to reduce memory usage during writing and are able to make a reasonable estimate + /// of the number of distinct values in a row group, it is recommended to set this value explicitly + /// rather than relying on the default dynamic sizing based on `max_row_group_row_count`. + /// If you do set this value explicitly it is probably best to set it for each column + /// individually via [`WriterPropertiesBuilder::set_column_bloom_filter_ndv`] rather than globally, + /// since different columns may have different numbers of distinct values. 
pub ndv: u64, } @@ -1242,6 +1282,8 @@ struct ColumnProperties { write_page_header_statistics: Option, /// bloom filter related properties bloom_filter_properties: Option, + /// Whether the bloom filter NDV was explicitly set by the user + bloom_filter_ndv_is_set: bool, } impl ColumnProperties { @@ -1319,12 +1361,13 @@ impl ColumnProperties { .fpp = value; } - /// Sets the number of distinct (unique) values for bloom filter for this column, and implicitly - /// enables bloom filter if not previously enabled. + /// Sets the maximum expected number of distinct (unique) values for bloom filter for this + /// column, and implicitly enables bloom filter if not previously enabled. fn set_bloom_filter_ndv(&mut self, value: u64) { self.bloom_filter_properties .get_or_insert_with(Default::default) .ndv = value; + self.bloom_filter_ndv_is_set = true; } /// Returns optional encoding for this column. @@ -1372,6 +1415,16 @@ impl ColumnProperties { fn bloom_filter_properties(&self) -> Option<&BloomFilterProperties> { self.bloom_filter_properties.as_ref() } + + /// If bloom filter is enabled and NDV was not explicitly set, resolve it to the + /// given `default_ndv` (typically derived from `max_row_group_row_count`). + fn resolve_bloom_filter_ndv(&mut self, default_ndv: u64) { + if !self.bloom_filter_ndv_is_set { + if let Some(ref mut bf) = self.bloom_filter_properties { + bf.ndv = default_ndv; + } + } + } } /// Reference counted reader properties. 
@@ -1703,8 +1756,8 @@ mod tests { assert_eq!( props.bloom_filter_properties(&ColumnPath::from("col")), Some(&BloomFilterProperties { - fpp: 0.05, - ndv: 1_000_000_u64 + fpp: DEFAULT_BLOOM_FILTER_FPP, + ndv: DEFAULT_BLOOM_FILTER_NDV, }) ); } @@ -1746,8 +1799,8 @@ mod tests { .build() .bloom_filter_properties(&ColumnPath::from("col")), Some(&BloomFilterProperties { - fpp: 0.05, - ndv: 100 + fpp: DEFAULT_BLOOM_FILTER_FPP, + ndv: 100, }) ); assert_eq!( @@ -1757,7 +1810,7 @@ mod tests { .bloom_filter_properties(&ColumnPath::from("col")), Some(&BloomFilterProperties { fpp: 0.1, - ndv: 1_000_000_u64 + ndv: DEFAULT_BLOOM_FILTER_NDV, }) ); }