diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index f56f9570adfb..36b8945fdf5d 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -21,10 +21,12 @@ use crate::column::writer::encoder::{ ColumnValueEncoder, DataPageValues, DictionaryPage, create_bloom_filter, }; use crate::data_type::{AsBytes, ByteArray, Int32Type}; -use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder}; +use crate::encodings::encoding::{DeltaBitPackEncoder, DictFallbackCounter, Encoder}; use crate::encodings::rle::RleEncoder; use crate::errors::{ParquetError, Result}; -use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion}; +use crate::file::properties::{ + DictionaryFallback, EnabledStatistics, WriterProperties, WriterVersion, +}; use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accumulator}; use crate::geospatial::statistics::GeospatialStatistics; use crate::schema::types::ColumnDescPtr; @@ -421,6 +423,7 @@ impl DictEncoder { pub struct ByteArrayEncoder { fallback: FallbackEncoder, dict_encoder: Option, + dict_fallback_counter: Option, statistics_enabled: EnabledStatistics, min_value: Option, max_value: Option, @@ -442,10 +445,17 @@ impl ColumnValueEncoder for ByteArrayEncoder { where Self: Sized, { - let dictionary = props + let dict_encoder = props .dictionary_enabled(descr.path()) .then(DictEncoder::default); + let dict_fallback_counter = match props.dictionary_fallback(descr.path()) { + DictionaryFallback::OnUnfavorableAfter(min_sample_len) if dict_encoder.is_some() => { + Some(DictFallbackCounter::new(descr, min_sample_len)) + } + _ => None, + }; + let fallback = FallbackEncoder::new(descr, props)?; let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?; @@ -459,7 +469,8 @@ impl ColumnValueEncoder for ByteArrayEncoder { statistics_enabled, bloom_filter, bloom_filter_target_fpp, - 
dict_encoder: dictionary, + dict_encoder, + dict_fallback_counter, min_value: None, max_value: None, geo_stats_accumulator, @@ -521,6 +532,20 @@ impl ColumnValueEncoder for ByteArrayEncoder { } } + fn is_dict_encoding_unfavorable(&self) -> Option { + match (&self.dict_encoder, &self.dict_fallback_counter) { + (Some(encoder), Some(counter)) => { + let dict_size = encoder.estimated_dict_page_size(); + counter.is_dict_encoding_unfavorable(dict_size) + } + _ => None, + } + } + + fn disable_dict_fallback_accounting(&mut self) { + self.dict_fallback_counter = None; + } + fn flush_dict_page(&mut self) -> Result> { match self.dict_encoder.take() { Some(encoder) => { @@ -530,6 +555,8 @@ impl ColumnValueEncoder for ByteArrayEncoder { )); } + self.dict_fallback_counter = None; + Ok(Some(encoder.flush_dict_page())) } _ => Ok(None), @@ -541,7 +568,13 @@ impl ColumnValueEncoder for ByteArrayEncoder { let max_value = self.max_value.take(); match &mut self.dict_encoder { - Some(encoder) => Ok(encoder.flush_data_page(min_value, max_value)), + Some(encoder) => { + let data_page = encoder.flush_data_page(min_value, max_value); + if let Some(counter) = self.dict_fallback_counter.as_mut() { + counter.commit_page(&data_page); + } + Ok(data_page) + } _ => self.fallback.flush_data_page(min_value, max_value), } } @@ -582,7 +615,15 @@ where } match &mut encoder.dict_encoder { - Some(dict_encoder) => dict_encoder.encode(values, indices), + Some(dict_encoder) => { + dict_encoder.encode(values, indices); + if let Some(counter) = encoder.dict_fallback_counter.as_mut() { + for idx in indices { + let value = values.value(*idx); + counter.update_byte_array(value.as_ref()); + } + } + } None => encoder.fallback.encode(values, indices), } } diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 8422263b1f63..70cf6dc978ed 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -1669,6 +1669,7 @@ mod tests { use 
crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder}; use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY}; + use crate::basic::PageType; use crate::column::page::{Page, PageReader}; use crate::file::metadata::thrift::PageHeader; use crate::file::page_index::column_index::ColumnIndexMetaData; @@ -1691,13 +1692,14 @@ mod tests { use crate::data_type::AsBytes; use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader}; use crate::file::properties::{ - BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion, + BloomFilterPosition, DictionaryFallback, EnabledStatistics, ReaderProperties, WriterVersion, }; use crate::file::serialized_reader::ReadOptionsBuilder; use crate::file::{ reader::{FileReader, SerializedFileReader}, statistics::Statistics, }; + use crate::record::RowAccessor; #[test] fn arrow_writer() { @@ -2572,6 +2574,91 @@ mod tests { ); } + #[test] + fn arrow_writer_dictionary_fallback_on_unfavorable_compression() { + let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])); + + let mut builder = StringBuilder::with_capacity(100, 329 * 10_000); + + // Generate an array of 10 unique 10 character strings. + // This results in a dictionary encoding larger than the plain encoded data, + // which should trigger a fallback to PLAIN encoding. 
+ for i in 0..10 { + let value = i + .to_string() + .repeat(10) + .chars() + .take(10) + .collect::(); + + builder.append_value(value); + } + + let array = Arc::new(builder.finish()); + + let batch = RecordBatch::try_new(schema, vec![array.clone()]).unwrap(); + + let file = tempfile::tempfile().unwrap(); + + // Set dictionary fallback to trigger fallback to PLAIN encoding on unfavorable compression + let props = WriterProperties::builder() + .set_dictionary_fallback(DictionaryFallback::OnUnfavorableAfter(1)) + .set_data_page_row_count_limit(2) + .set_write_batch_size(1) + .build(); + + let mut writer = + ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)) + .expect("Unable to write file"); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let options = ReadOptionsBuilder::new() + .with_encoding_stats_as_mask(false) + .build(); + let reader = + SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(); + + let column = reader.metadata().row_group(0).columns(); + + assert_eq!(column.len(), 1); + + // check page encoding stats, should be one dict page, one dict encoded page, and 5 + // plain encoded pages + let stats = column[0].page_encoding_stats().unwrap(); + assert!( + stats + .iter() + .any(|s| s.page_type == PageType::DICTIONARY_PAGE), + "stats are {stats:?}" + ); + let num_dict_encoded: i32 = stats + .iter() + .filter(|s| { + s.page_type == PageType::DATA_PAGE && s.encoding == Encoding::RLE_DICTIONARY + }) + .map(|s| s.count) + .sum(); + assert_eq!(num_dict_encoded, 1); + let num_plain_encoded: i32 = stats + .iter() + .filter(|s| s.page_type == PageType::DATA_PAGE && s.encoding == Encoding::PLAIN) + .map(|s| s.count) + .sum(); + assert_eq!(num_plain_encoded, 5); + + // Read back the values and confirm they match the original array. 
+ let rows: Vec<_> = reader + .get_row_iter(None) + .unwrap() + .map(|r| r.unwrap()) + .collect(); + assert_eq!(rows.len(), array.len()); + for (i, row) in rows.iter().enumerate() { + assert_eq!(row.get_string(0).unwrap(), array.value(i)); + } + } + #[test] fn arrow_writer_float_nans() { let f16_field = Field::new("a", DataType::Float16, false); @@ -4789,6 +4876,15 @@ mod tests { assert_eq!(chunk_page_stats, file_page_stats); } + fn get_dict_page_size(meta: &ColumnChunkMetaData, data: Bytes) -> usize { + let mut reader = SerializedPageReader::new(Arc::new(data), meta, 0, None).unwrap(); + let page = reader.get_next_page().unwrap().unwrap(); + match page { + Page::DictionaryPage { buf, .. } => buf.len(), + _ => panic!("expected DictionaryPage"), + } + } + #[test] fn test_different_dict_page_size_limit() { let array = Arc::new(Int64Array::from_iter(0..1024 * 1024)); @@ -4813,18 +4909,67 @@ mod tests { let col0_meta = metadata.row_group(0).column(0); let col1_meta = metadata.row_group(0).column(1); - let get_dict_page_size = move |meta: &ColumnChunkMetaData| { - let mut reader = - SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap(); - let page = reader.get_next_page().unwrap().unwrap(); - match page { - Page::DictionaryPage { buf, .. 
} => buf.len(), - _ => panic!("expected DictionaryPage"), - } - }; + assert_eq!(get_dict_page_size(col0_meta, data.clone()), 1024 * 1024); + assert_eq!(get_dict_page_size(col1_meta, data.clone()), 1024 * 1024 * 4); + } + + #[test] + fn test_dict_page_size_decided_by_compression_fallback() { + // Generate values that are well dispersed across a range approximating (0..256 * 1024) + let array = Arc::new(Int32Array::from_iter( + (0i32..1024 * 1024).map(|x| x.wrapping_mul(163019) % 262139), + )); + let schema = Arc::new(Schema::new(vec![Field::new( + "col0", + arrow_schema::DataType::Int32, + false, + )])); + let batch = arrow_array::RecordBatch::try_new(schema.clone(), vec![array]).unwrap(); + + let props = WriterProperties::builder() + .set_dictionary_page_size_limit(1024 * 1024) + .build(); + let mut writer = ArrowWriter::try_new(Vec::new(), schema.clone(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + let data = Bytes::from(writer.into_inner().unwrap()); + + let file_length_dict = data.len(); + + let mut metadata = ParquetMetaDataReader::new(); + metadata.try_parse(&data).unwrap(); + let metadata = metadata.finish().unwrap(); + let full_dict_meta = metadata.row_group(0).column(0); + assert_eq!(get_dict_page_size(full_dict_meta, data.clone()), 1_048_576); - assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024); - assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4); + let props = WriterProperties::builder() + .set_dictionary_page_size_limit(1024 * 1024) + .set_column_dictionary_fallback( + ColumnPath::from("col0"), + DictionaryFallback::OnUnfavorableAfter(32_768), + ) + .build(); + let mut writer = ArrowWriter::try_new(Vec::new(), schema.clone(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + let data = Bytes::from(writer.into_inner().unwrap()); + + let file_length_fallback = data.len(); + + let mut metadata = ParquetMetaDataReader::new(); + metadata.try_parse(&data).unwrap(); + let metadata = metadata.finish().unwrap(); + let 
fallback_meta = metadata.row_group(0).column(0); + assert_eq!( + get_dict_page_size(fallback_meta, data.clone()), + 32_768 * std::mem::size_of::() + ); + + let compression_ratio = file_length_fallback as f64 / file_length_dict as f64; + assert!( + compression_ratio < 0.9, + "File encoded with dictionary fallback encoding does not result in sufficient compression, + got {file_length_fallback} vs {file_length_dict} ({:.2}%)", + compression_ratio * 100.0 + ); } struct WriteBatchesShape { diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index ec1afca58335..d1ce1a024a78 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -25,9 +25,9 @@ use crate::column::writer::{ }; use crate::data_type::DataType; use crate::data_type::private::ParquetValueType; -use crate::encodings::encoding::{DictEncoder, Encoder, get_encoder}; +use crate::encodings::encoding::{DictEncoder, DictFallbackCounter, Encoder, get_encoder}; use crate::errors::{ParquetError, Result}; -use crate::file::properties::{EnabledStatistics, WriterProperties}; +use crate::file::properties::{DictionaryFallback, EnabledStatistics, WriterProperties}; use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accumulator}; use crate::geospatial::statistics::GeospatialStatistics; use crate::schema::types::{ColumnDescPtr, ColumnDescriptor}; @@ -109,6 +109,18 @@ pub trait ColumnValueEncoder { /// + fn estimated_data_page_size(&self) -> usize; + /// Returns `Some(true)` if the estimated size of plainly encoded data, in bytes, + /// would be smaller than the size of data encoded with a dictionary. + /// If the estimate does not show a clear benefit, returns `Some(false)`. + /// If there is no dictionary, or the data size statistic is not available, + /// or it is not yet possible to make an estimate due to insufficient + /// collected data, returns `None`. 
+ /// For similar logic in parquet-java, + /// see `DictionaryValuesWriter.isCompressionSatisfying`. + fn is_dict_encoding_unfavorable(&self) -> Option; + + fn disable_dict_fallback_accounting(&mut self); + /// Flush the dictionary page for this column chunk if any. Any subsequent calls to /// [`Self::write`] will not be dictionary encoded /// @@ -132,6 +144,7 @@ pub trait ColumnValueEncoder { pub struct ColumnValueEncoderImpl { encoder: Box>, dict_encoder: Option>, + dict_fallback_counter: Option, descr: ColumnDescPtr, num_values: usize, statistics_enabled: EnabledStatistics, @@ -176,7 +189,13 @@ impl ColumnValueEncoderImpl { } match &mut self.dict_encoder { - Some(encoder) => encoder.put(slice), + Some(encoder) => { + encoder.put(slice)?; + if let Some(counter) = self.dict_fallback_counter.as_mut() { + counter.update_values(slice); + } + Ok(()) + } _ => self.encoder.put(slice), } } @@ -197,6 +216,12 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { let dict_supported = props.dictionary_enabled(descr.path()) && has_dictionary_support(T::get_physical_type(), props); let dict_encoder = dict_supported.then(|| DictEncoder::new(descr.clone())); + let dict_fallback_counter = match props.dictionary_fallback(descr.path()) { + DictionaryFallback::OnUnfavorableAfter(min_sample_len) if dict_encoder.is_some() => { + Some(DictFallbackCounter::new(descr, min_sample_len)) + } + _ => None, + }; // Set either main encoder or fallback encoder. 
let encoder = get_encoder( @@ -215,6 +240,7 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { Ok(Self { encoder, dict_encoder, + dict_fallback_counter, descr: descr.clone(), num_values: 0, statistics_enabled, @@ -284,6 +310,20 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { } } + fn is_dict_encoding_unfavorable(&self) -> Option { + match (&self.dict_encoder, &self.dict_fallback_counter) { + (Some(encoder), Some(counter)) => { + let dict_size = encoder.dict_encoded_size(); + counter.is_dict_encoding_unfavorable(dict_size) + } + _ => None, + } + } + + fn disable_dict_fallback_accounting(&mut self) { + self.dict_fallback_counter = None; + } + fn flush_dict_page(&mut self) -> Result> { match self.dict_encoder.take() { Some(encoder) => { @@ -293,6 +333,8 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { )); } + self.dict_fallback_counter = None; + let buf = encoder.write_dict()?; Ok(Some(DictionaryPage { @@ -311,14 +353,20 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { _ => (self.encoder.flush_buffer()?, self.encoder.encoding()), }; - Ok(DataPageValues { + let page = DataPageValues { buf, encoding, num_values: std::mem::take(&mut self.num_values), min_value: self.min_value.take(), max_value: self.max_value.take(), variable_length_bytes: self.variable_length_bytes.take(), - }) + }; + + if let Some(counter) = self.dict_fallback_counter.as_mut() { + counter.commit_page(&page); + } + + Ok(page) } fn flush_geospatial_statistics(&mut self) -> Option> { diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 46f90d3f7762..bfb94acb6fc6 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -746,18 +746,32 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { /// Returns true if we need to fall back to non-dictionary encoding. /// - /// We can only fall back if dictionary encoder is set and we have exceeded dictionary - /// size. 
- #[inline] - fn should_dict_fallback(&self) -> bool { - match self.encoder.estimated_dict_page_size() { - Some(size) => { - size >= self - .props - .column_dictionary_page_size_limit(self.descr.path()) - } - None => false, + /// The behavior is governed by the `dictionary_fallback` column property. + fn should_dict_fallback(&mut self) -> bool { + let dict_size = match self.encoder.estimated_dict_page_size() { + Some(size) => size, + None => return false, + }; + + // First check: dictionary size exceeds limit + if dict_size + >= self + .props + .column_dictionary_page_size_limit(self.descr.path()) + { + return true; } + + // Second check, if enabled: the compression heuristic. + if let Some(should_fallback) = self.encoder.is_dict_encoding_unfavorable() { + // The decision on the efficiency is made after processing + // the requisite number of values, so disable further accounting + // to avoid the overhead. + self.encoder.disable_dict_fallback_accounting(); + return should_fallback; + } + + false } /// Returns true if there is enough data for a data page, false otherwise. diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs index d8c7b9201389..2649bf29d060 100644 --- a/parquet/src/data_type.rs +++ b/parquet/src/data_type.rs @@ -57,6 +57,9 @@ const MICROSECONDS_IN_DAY: i64 = SECONDS_IN_DAY * MICROSECONDS; const NANOSECONDS_IN_DAY: i64 = SECONDS_IN_DAY * NANOSECONDS; impl Int96 { + /// Size of an INT96 value in bytes. + pub const SIZE_IN_BYTES: usize = std::mem::size_of::<[u32; 3]>(); + /// Creates new INT96 type struct with no data set. pub fn new() -> Self { Self { value: [0; 3] } @@ -721,10 +724,12 @@ pub(crate) mod private { fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result; - /// Return the encoded size for a type - fn dict_encoding_size(&self) -> (usize, usize) { - (std::mem::size_of::(), 1) - } + /// Return the size in bytes for the value encoded in the plain encoding. 
+ /// + /// This method is only used with the dictionary encoding. Since the writer + /// does not use the dictionary encoding for BOOLEAN type, this method's + /// implementation for bool will panic if called. + fn plain_encoded_size(&self) -> usize; /// Return the number of variable length bytes in a given slice of data /// @@ -803,6 +808,10 @@ pub(crate) mod private { Ok(values_read) } + fn plain_encoded_size(&self) -> usize { + panic!("dictionary encoding should not be used for BOOLEAN type") + } + #[inline] fn as_i64(&self) -> Result { Ok(*self as i64) @@ -887,6 +896,10 @@ pub(crate) mod private { Ok(num_values) } + fn plain_encoded_size(&self) -> usize { + std::mem::size_of::() + } + #[inline] fn as_i64(&$self) -> Result { $as_i64 @@ -984,6 +997,10 @@ pub(crate) mod private { Ok(num_values) } + fn plain_encoded_size(&self) -> usize { + Self::SIZE_IN_BYTES + } + #[inline] fn as_any(&self) -> &dyn std::any::Any { self @@ -1071,9 +1088,8 @@ pub(crate) mod private { Ok(num_values) } - #[inline] - fn dict_encoding_size(&self) -> (usize, usize) { - (std::mem::size_of::(), self.len()) + fn plain_encoded_size(&self) -> usize { + std::mem::size_of::() + self.len() } #[inline] @@ -1171,9 +1187,11 @@ pub(crate) mod private { Ok(num_values) } - #[inline] - fn dict_encoding_size(&self) -> (usize, usize) { - (std::mem::size_of::(), self.len()) + fn plain_encoded_size(&self) -> usize { + // The encoding of fixed-length byte arrays only encodes the bytes + // without a length prefix. In practice, the length of fixed-length + // column values is taken from the column metadata and this call is avoided. 
+ self.len() } #[inline] diff --git a/parquet/src/encodings/encoding/dict_encoder.rs b/parquet/src/encodings/encoding/dict_encoder.rs index 79a1f247670c..28abcc2e989b 100644 --- a/parquet/src/encodings/encoding/dict_encoder.rs +++ b/parquet/src/encodings/encoding/dict_encoder.rs @@ -49,12 +49,9 @@ impl Storage for KeyStorage { } fn push(&mut self, value: &Self::Value) -> Self::Key { - let (base_size, num_elements) = value.dict_encoding_size(); - let unique_size = match T::get_physical_type() { - Type::BYTE_ARRAY => base_size + num_elements, Type::FIXED_LEN_BYTE_ARRAY => self.type_length, - _ => base_size, + _ => value.plain_encoded_size(), }; self.size_in_bytes += unique_size; diff --git a/parquet/src/encodings/encoding/dict_fallback.rs b/parquet/src/encodings/encoding/dict_fallback.rs new file mode 100644 index 000000000000..9d9f9c438a21 --- /dev/null +++ b/parquet/src/encodings/encoding/dict_fallback.rs @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::basic::{Encoding, Type}; +use crate::column::writer::encoder::DataPageValues; +use crate::data_type::Int96; +use crate::data_type::private::ParquetValueType; +use crate::schema::types::ColumnDescriptor; + +// TODO: it's possible to tighten the worst-case estimate for fallback encodings +// other than PLAIN, e.g. by estimating the bit widths of delta encoded values, +// possibly making use of the min/max statistics if they are available. +// This would require more complex logic in the counter, and it's not clear if +// the improvement in the heuristic would be worth it. +// The size of the plain encoding should be a reasonable bottom estimate +// for any fallback encoding. + +/// A helper to estimate the favorability of the dictionary encoding +/// compared to a pessimistic estimate of the size of data encoded without +/// the dictionary. +/// +/// This is used to enhance the dictionary fallback heuristic with the logic +/// that the writer should fall back to a non-dictionary encoding when, +/// after encoding a prescribed minimum number of values, the worst case on +/// the size of data encoded without the dictionary is calculated as smaller +/// than `(encodedSize + dictionarySize)`. +pub struct DictFallbackCounter { + // Estimated size of the data encoded without the dictionary, in bytes. + raw_data_size: usize, + // Size of the data encoded with the dictionary, in bytes. + encoded_data_size: usize, + // Number of values passed to the counter. + num_values: usize, + // Minimum number of values to sample before + // the counter can return a favorable estimate for fallback. + min_sample_len: usize, + // Cached type length to improve performance for fixed-length types. 
+ type_length: usize, +} + +impl DictFallbackCounter { + pub fn new(desc: &ColumnDescriptor, min_sample_len: usize) -> Self { + Self { + raw_data_size: 0, + encoded_data_size: 0, + num_values: 0, + min_sample_len, + type_length: desc.type_length() as usize, + } + } + + /// Updates the counter with the given slice of values. + pub fn update_values(&mut self, values: &[T]) { + let raw_size = match T::PHYSICAL_TYPE { + Type::INT32 | Type::FLOAT => 4 * values.len(), + Type::INT64 | Type::DOUBLE => 8 * values.len(), + Type::INT96 => Int96::SIZE_IN_BYTES * values.len(), + Type::BYTE_ARRAY => { + // For variable-length types, the length prefix and the actual data are encoded. + values.iter().map(|value| value.plain_encoded_size()).sum() + } + Type::FIXED_LEN_BYTE_ARRAY => self.type_length * values.len(), + Type::BOOLEAN => panic!("dictionary encoding should not be used for BOOLEAN type"), + }; + self.raw_data_size = self.raw_data_size.saturating_add(raw_size); + self.num_values += values.len(); + } + + /// Like `update_values`, but specialized for byte array data exposed by Arrow + /// array accessors. Updates the counter with the single given byte array value. + #[cfg(feature = "arrow")] + #[inline] + pub fn update_byte_array(&mut self, value: &[u8]) { + let raw_size = std::mem::size_of::() + value.len(); + self.raw_data_size = self.raw_data_size.saturating_add(raw_size); + self.num_values += 1; + } + + /// Increments the total counted size of dictionary encoded data + /// for a page encoded with the dictionary encoding. 
+ pub fn commit_page(&mut self, page: &DataPageValues) + where + T: ParquetValueType, + { + assert_eq!( + page.encoding, + Encoding::RLE_DICTIONARY, + "should only be used with the dictionary encoder" + ); + self.encoded_data_size = self.encoded_data_size.saturating_add(page.buf.len()); + } + + /// If the number of dictionary encoded values accounted so far + /// reaches or exceeds the configured minimum, returns true to indicate + /// that the counting should be stopped, otherwise returns false. + fn min_sample_len_reached(&self) -> bool { + self.num_values >= self.min_sample_len + } + + /// Returns `Some(true)` if the estimated size of plainly encoded data, in bytes, + /// would not exceed the size of data encoded with a dictionary, + /// as counted by the `commit_page` calls made on this counter and the provided size of + /// the encoded dictionary page. + /// This method returns `None` until the minimum number of values given in + /// `DictFallbackCounter::new` has been processed. The third alternative, + /// `Some(false)`, indicates that the sample size is sufficient, but the dictionary encoding + /// is not-unfavorable, that is, the collected metrics show no clear advantage in falling + /// back to plain (or other, presumably more efficient) encoding. 
+ #[inline] + pub fn is_dict_encoding_unfavorable(&self, dict_encoded_size: usize) -> Option { + self.min_sample_len_reached().then_some( + self.raw_data_size <= dict_encoded_size.saturating_add(self.encoded_data_size), + ) + } +} diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs index eeabcf4ba5ce..6d5f5fade88a 100644 --- a/parquet/src/encodings/encoding/mod.rs +++ b/parquet/src/encodings/encoding/mod.rs @@ -29,10 +29,13 @@ use crate::util::bit_util::{BitWriter, num_required_bits}; use byte_stream_split_encoder::{ByteStreamSplitEncoder, VariableWidthByteStreamSplitEncoder}; use bytes::Bytes; -pub use dict_encoder::DictEncoder; mod byte_stream_split_encoder; mod dict_encoder; +mod dict_fallback; + +pub use dict_encoder::DictEncoder; +pub use dict_fallback::DictFallbackCounter; // ---------------------------------------------------------------------- // Encoders @@ -810,7 +813,6 @@ mod tests { #[test] fn test_bool() { BoolType::test(Encoding::PLAIN, TEST_SET_SIZE, -1); - BoolType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1); BoolType::test(Encoding::RLE, TEST_SET_SIZE, -1); } @@ -878,8 +880,7 @@ mod tests { assert_eq!(encoder.dict_encoded_size(), expected_size); } - // Only 2 variations of values 1 byte each - run_test::(-1, &[true, false, true, false, true], 2); + // Dictionary encoding is not supported for BoolType, so we don't test it here. 
run_test::(-1, &[1i32, 2i32, 3i32, 4i32, 5i32], 20); run_test::(-1, &[1i64, 2i64, 3i64, 4i64, 5i64], 40); run_test::(-1, &[1f32, 2f32, 3f32, 4f32, 5f32], 20); diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 65630cfed218..fa320c62ce0d 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -35,6 +35,8 @@ pub const DEFAULT_WRITER_VERSION: WriterVersion = WriterVersion::PARQUET_1_0; pub const DEFAULT_COMPRESSION: Compression = Compression::UNCOMPRESSED; /// Default value for [`WriterProperties::dictionary_enabled`] pub const DEFAULT_DICTIONARY_ENABLED: bool = true; +/// Default value for [`WriterProperties::dictionary_fallback`] +pub const DEFAULT_DICTIONARY_FALLBACK: DictionaryFallback = DictionaryFallback::OnPageSizeLimit; /// Default value for [`WriterProperties::dictionary_page_size_limit`] pub const DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT: usize = DEFAULT_PAGE_SIZE; /// Default value for [`WriterProperties::data_page_row_count_limit`] @@ -491,6 +493,17 @@ impl WriterProperties { .unwrap_or(DEFAULT_DICTIONARY_ENABLED) } + /// Returns the dictionary fallback behavior for a column. + /// + /// For more details see [`WriterPropertiesBuilder::set_dictionary_fallback`] + pub fn dictionary_fallback(&self, col: &ColumnPath) -> DictionaryFallback { + self.column_properties + .get(col) + .and_then(|c| c.dictionary_fallback()) + .or_else(|| self.default_column_properties.dictionary_fallback()) + .unwrap_or(DEFAULT_DICTIONARY_FALLBACK) + } + /// Returns which statistics are written for a column. /// /// For more details see [`WriterPropertiesBuilder::set_statistics_enabled`] @@ -915,6 +928,16 @@ impl WriterPropertiesBuilder { self } + /// Sets default behavior to control dictionary fallback for all columns + /// (defaults to [`OnPageSizeLimit`] via [`DEFAULT_DICTIONARY_FALLBACK`]). 
+ /// + /// [`OnPageSizeLimit`]: DictionaryFallback::OnPageSizeLimit + pub fn set_dictionary_fallback(mut self, behavior: DictionaryFallback) -> Self { + self.default_column_properties + .set_dictionary_fallback(behavior); + self + } + /// Sets best effort maximum dictionary page size, in bytes (defaults to `1024 * 1024` /// via [`DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT`]). /// @@ -1073,6 +1096,18 @@ impl WriterPropertiesBuilder { self } + /// Sets dictionary fallback behavior for a specific column. + /// + /// Takes precedence over [`Self::set_dictionary_fallback`]. + pub fn set_column_dictionary_fallback( + mut self, + col: ColumnPath, + behavior: DictionaryFallback, + ) -> Self { + self.get_mut_props(col).set_dictionary_fallback(behavior); + self + } + /// Sets dictionary page size limit for a specific column. /// /// Takes precedence over [`Self::set_dictionary_page_size_limit`]. @@ -1163,6 +1198,26 @@ impl From for WriterPropertiesBuilder { } } +/// Controls the behavior of the writer in deciding whether to fall back to non-dictionary encoding. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +#[non_exhaustive] +pub enum DictionaryFallback { + /// Fall back to non-dictionary encoding only if the dictionary page size limit is exceeded. + OnPageSizeLimit, + /// Fall back to non-dictionary encoding if the dictionary page size limit is exceeded, + /// or if the dictionary encoding upon encoding at least the given number of values + /// is larger than the plain encoding for the same data. The latter check is performed once + /// per column chunk, so the encoding efficiency may still degrade with subsequent pages in + /// the same column chunk. + OnUnfavorableAfter(usize), +} + +impl Default for DictionaryFallback { + fn default() -> Self { + DEFAULT_DICTIONARY_FALLBACK + } +} + /// Controls the level of statistics to be computed by the writer and stored in /// the parquet file. 
/// @@ -1278,6 +1333,7 @@ struct ColumnProperties { data_page_size_limit: Option, dictionary_page_size_limit: Option, dictionary_enabled: Option, + dictionary_fallback: Option, statistics_enabled: Option, write_page_header_statistics: Option, /// bloom filter related properties @@ -1323,6 +1379,11 @@ impl ColumnProperties { self.dictionary_page_size_limit = Some(value); } + /// Sets the dictionary fallback behavior for this column. + fn set_dictionary_fallback(&mut self, value: DictionaryFallback) { + self.dictionary_fallback = Some(value); + } + /// Sets the statistics level for this column. fn set_statistics_enabled(&mut self, enabled: EnabledStatistics) { self.statistics_enabled = Some(enabled); @@ -1392,6 +1453,11 @@ impl ColumnProperties { self.dictionary_page_size_limit } + /// Returns optional dictionary fallback behavior for this column. + fn dictionary_fallback(&self) -> Option { + self.dictionary_fallback + } + /// Returns optional data page size limit for this column. fn data_page_size_limit(&self) -> Option { self.data_page_size_limit @@ -1585,6 +1651,10 @@ mod tests { props.dictionary_enabled(&ColumnPath::from("col")), DEFAULT_DICTIONARY_ENABLED ); + assert_eq!( + props.dictionary_fallback(&ColumnPath::from("col")), + DEFAULT_DICTIONARY_FALLBACK + ); assert_eq!( props.statistics_enabled(&ColumnPath::from("col")), DEFAULT_STATISTICS_ENABLED @@ -1666,11 +1736,16 @@ mod tests { .set_encoding(Encoding::DELTA_BINARY_PACKED) .set_compression(Compression::GZIP(Default::default())) .set_dictionary_enabled(false) + .set_dictionary_fallback(DictionaryFallback::OnUnfavorableAfter(1024)) .set_statistics_enabled(EnabledStatistics::None) // specific column settings .set_column_encoding(ColumnPath::from("col"), Encoding::RLE) .set_column_compression(ColumnPath::from("col"), Compression::SNAPPY) .set_column_dictionary_enabled(ColumnPath::from("col"), true) + .set_column_dictionary_fallback( + ColumnPath::from("col"), + DictionaryFallback::OnUnfavorableAfter(2048), + ) 
.set_column_statistics_enabled(ColumnPath::from("col"), EnabledStatistics::Chunk) .set_column_bloom_filter_enabled(ColumnPath::from("col"), true) .set_column_bloom_filter_ndv(ColumnPath::from("col"), 100_u64) @@ -1700,6 +1775,10 @@ mod tests { Compression::GZIP(Default::default()) ); assert!(!props.dictionary_enabled(&ColumnPath::from("a"))); + assert_eq!( + props.dictionary_fallback(&ColumnPath::from("a")), + DictionaryFallback::OnUnfavorableAfter(1024) + ); assert_eq!( props.statistics_enabled(&ColumnPath::from("a")), EnabledStatistics::None @@ -1714,6 +1793,10 @@ mod tests { Compression::SNAPPY ); assert!(props.dictionary_enabled(&ColumnPath::from("col"))); + assert_eq!( + props.dictionary_fallback(&ColumnPath::from("col")), + DictionaryFallback::OnUnfavorableAfter(2048) + ); assert_eq!( props.statistics_enabled(&ColumnPath::from("col")), EnabledStatistics::Chunk