Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d7700cf
feat(parquet): dictionary fallback heuristics
mzabaluev Apr 9, 2026
3aacbe5
test: fallback with OnUnfavorableCompression
mzabaluev Apr 13, 2026
83ded36
refactor(parquet): simplify dict_encoding_size
mzabaluev Apr 13, 2026
674a1b0
fix: unset plain data counter when flushing dict
mzabaluev Apr 13, 2026
13e042e
chore: fix clippy
mzabaluev Apr 13, 2026
becdcef
chore: license on plain_counter
mzabaluev Apr 13, 2026
1de7b01
refactor(parquet): dict_encoded_size cleanup
mzabaluev Apr 14, 2026
701ff2b
chore(parquet): rename uncompressed_data_size
mzabaluev Apr 14, 2026
1b6dd37
test: compression fallback wins
mzabaluev Apr 15, 2026
da73778
feat: DictionaryFallback::OnUnfavorableAfter
mzabaluev Apr 16, 2026
b392738
refactor: more compact plain data counter init
mzabaluev Apr 16, 2026
bd914bb
test(parquet): fix up compression fallback
mzabaluev Apr 16, 2026
898e7e5
test(parquet): test dictionary_fallback property
mzabaluev Apr 16, 2026
1e81173
chore: suggestions from code review
mzabaluev Apr 16, 2026
34b819d
chore: add missing import
mzabaluev Apr 17, 2026
692018e
chore: rename dict_encoding_size
mzabaluev Apr 17, 2026
4511917
fix: rework plain_counter to DictFallbackCounter
mzabaluev Apr 17, 2026
289e4e1
test: adjust dict fallback tests as per review
mzabaluev Apr 17, 2026
3ff12c8
feat: make dict fallback decision one-off
mzabaluev Apr 17, 2026
1fcea02
refactor: revert counter disabling, rename
mzabaluev Apr 20, 2026
389d038
test: verify encoded data after dict fallback
mzabaluev Apr 20, 2026
8873bc1
perf(parquet): eager cutoff for fallback counter
mzabaluev Apr 20, 2026
8ba290b
test: assert efficiency of dictionary fallback
mzabaluev Apr 21, 2026
1827d04
test: remove page stats printout
mzabaluev Apr 21, 2026
87a19c4
docs: clarify purpose of plain_encoded_size
mzabaluev Apr 21, 2026
14ab09e
refactor: consistently panic on bool
mzabaluev Apr 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ use crate::column::writer::encoder::{
ColumnValueEncoder, DataPageValues, DictionaryPage, create_bloom_filter,
};
use crate::data_type::{AsBytes, ByteArray, Int32Type};
use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder, PlainDataSizeCounter};
use crate::encodings::rle::RleEncoder;
use crate::errors::{ParquetError, Result};
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
use crate::file::properties::{
DictionaryFallback, EnabledStatistics, WriterProperties, WriterVersion,
};
use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accumulator};
use crate::geospatial::statistics::GeospatialStatistics;
use crate::schema::types::ColumnDescPtr;
Expand Down Expand Up @@ -421,6 +423,7 @@ impl DictEncoder {
pub struct ByteArrayEncoder {
fallback: FallbackEncoder,
dict_encoder: Option<DictEncoder>,
plain_data_size_counter: Option<PlainDataSizeCounter>,
statistics_enabled: EnabledStatistics,
min_value: Option<ByteArray>,
max_value: Option<ByteArray>,
Expand All @@ -446,6 +449,17 @@ impl ColumnValueEncoder for ByteArrayEncoder {
.dictionary_enabled(descr.path())
.then(DictEncoder::default);

let plain_data_size_counter = match props.dictionary_fallback(descr.path()) {
DictionaryFallback::OnPageSizeLimit => None,
DictionaryFallback::OnUnfavorableCompression => {
if dictionary.is_some() {
Some(PlainDataSizeCounter::new(descr))
} else {
None
}
}
};

let fallback = FallbackEncoder::new(descr, props)?;

let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?;
Expand All @@ -460,6 +474,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
bloom_filter,
bloom_filter_target_fpp,
dict_encoder: dictionary,
plain_data_size_counter,
min_value: None,
max_value: None,
geo_stats_accumulator,
Expand Down Expand Up @@ -510,6 +525,11 @@ impl ColumnValueEncoder for ByteArrayEncoder {
Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
}

fn uncompressed_data_size(&self) -> Option<usize> {
let counter = self.plain_data_size_counter.as_ref()?;
Some(counter.uncompressed_data_size())
}

/// Returns an estimate of the data page size in bytes
///
/// This includes:
Expand All @@ -530,6 +550,8 @@ impl ColumnValueEncoder for ByteArrayEncoder {
));
}

self.plain_data_size_counter = None;

Ok(Some(encoder.flush_dict_page()))
}
_ => Ok(None),
Expand Down Expand Up @@ -582,7 +604,15 @@ where
}

match &mut encoder.dict_encoder {
Some(dict_encoder) => dict_encoder.encode(values, indices),
Some(dict_encoder) => {
dict_encoder.encode(values, indices);
if let Some(counter) = encoder.plain_data_size_counter.as_mut() {
for idx in indices {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little worried about performance here. It would be nice if after we've collected enough samples and decided on dict vs fallback, we stop gathering these statistics.

Copy link
Copy Markdown
Contributor Author

@mzabaluev mzabaluev Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a mind to keep comparing after every encoded data page, for cases when the configured minimal sample is still not indicative of the overall value distribution and the efficiency degrades somewhere farther down the page chunk. But I understand the concern. Since this behavior is tunable per column through the writer API, I think it's OK to cut counting. For consistency, this should be also done in the generic encoder, I assume?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I didn't want to flag it in both places. 😄

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a mind to keep comparing after every encoded data page, for cases when the configured minimal sample is still not indicative of the overall value distribution and the efficiency degrades somewhere farther down the page chunk.

Fair...but then perhaps the size limit will catch it. In any event, we should stop collecting after we have actually fallen back 😉

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should stop collecting after we have actually fallen back 😉

That's already the case, with the plain_data_size_counter member set to None in both flush_dict_page implementations, and the collecting is also not happening in the put methods in case there is no dictionary. Though if I implement a fix for #9739, this may need to be refactored.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The counting shuts down after reaching the sample size threshold in 3ff12c8.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still not too granular, I guess, if it only happens on page flush and the fallback might be decided on an earlier write batch.

Copy link
Copy Markdown
Contributor Author

@mzabaluev mzabaluev Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Further reworked in 8873bc1.

let value = values.value(*idx);
counter.update_byte_array(value.as_ref());
}
}
}
None => encoder.fallback.encode(values, indices),
}
}
Expand Down
112 changes: 111 additions & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1691,7 +1691,7 @@ mod tests {
use crate::data_type::AsBytes;
use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
use crate::file::properties::{
BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
BloomFilterPosition, DictionaryFallback, EnabledStatistics, ReaderProperties, WriterVersion,
};
use crate::file::serialized_reader::ReadOptionsBuilder;
use crate::file::{
Expand Down Expand Up @@ -2572,6 +2572,74 @@ mod tests {
);
}

#[test]
fn arrow_writer_dictionary_fallback_on_unfavorable_compression() {
let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);

// Generate an array of 10 unique 10 character strings.
// This results in a dictionary encoding larger than the plain encoded data,
// which should trigger a fallback to PLAIN encoding.
for i in 0..10 {
let value = i
.to_string()
.repeat(10)
.chars()
.take(10)
.collect::<String>();

builder.append_value(value);
}

let array = Arc::new(builder.finish());

let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

let file = tempfile::tempfile().unwrap();

// Set dictionary fallback to trigger fallback to PLAIN encoding on unfavorable compression
let props = WriterProperties::builder()
.set_dictionary_fallback(DictionaryFallback::OnUnfavorableCompression)
.set_data_page_size_limit(1)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.set_data_page_size_limit(1)
.set_data_page_row_count_limit(2)

There's an issue here due to the fact that with these settings the page is flushed before the check for dict fallback is called. Keeping the batch size at 1 but the row count at 2 should allow the check to actually force fallback, resulting in one RLE_DICTIONARY encoded data page and 5 PLAIN encoded.

.set_write_batch_size(1)
.build();

let mut writer =
ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
.expect("Unable to write file");
writer.write(&batch).unwrap();
writer.close().unwrap();

let options = ReadOptionsBuilder::new().with_page_index().build();
let reader =
SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();

let column = reader.metadata().row_group(0).columns();

assert_eq!(column.len(), 1);

// We should write one row before falling back to PLAIN encoding so there should still be a
// dictionary page.
assert!(
column[0].dictionary_page_offset().is_some(),
"Expected a dictionary page"
);

assert!(reader.metadata().offset_index().is_some());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following is not testing the encoding, merely counting the number of data pages. Rather than this you should be examining the page encoding stats.

        let options = ReadOptionsBuilder::new()
            .with_encoding_stats_as_mask(false)
            .build();
...
        // check page encoding stats, should be one dict page, one dict encoded page, and 9
        // plain encoded pages
        let stats = column[0].page_encoding_stats().unwrap();
        println!("pes: {stats:?}");
        assert!(
            stats
                .iter()
                .any(|s| s.page_type == PageType::DICTIONARY_PAGE)
        );
        let num_dict_encoded: i32 = stats
            .iter()
            .filter(|s| {
                s.page_type == PageType::DATA_PAGE && s.encoding == Encoding::RLE_DICTIONARY
            })
            .map(|s| s.count)
            .sum();
        assert_eq!(num_dict_encoded, 1);
        let num_plain_encoded: i32 = stats
            .iter()
            .filter(|s| {
                s.page_type == PageType::DATA_PAGE && s.encoding == Encoding::PLAIN
            })
            .map(|s| s.count)
            .sum();
        assert_eq!(num_plain_encoded, 9);

Coded this way, the test fails with

thread 'arrow::arrow_writer::tests::arrow_writer_dictionary_fallback_on_unfavorable_compression' (10294973) panicked at parquet/src/arrow/arrow_writer/mod.rs:2649:9:
assertion `left == right` failed
  left: 10
 right: 1

indicating that all pages are dict encoded and fallback did not occur

let offset_indexes = &reader.metadata().offset_index().unwrap()[0];

let page_locations = offset_indexes[0].page_locations.clone();

// We should fall back to PLAIN encoding after the first row and our max page size is 1 byte
// so we expect one dictionary encoded page and then a page per row thereafter.
assert_eq!(
page_locations.len(),
10,
"Expected 10 pages but got {page_locations:#?}"
);
}

#[test]
fn arrow_writer_float_nans() {
let f16_field = Field::new("a", DataType::Float16, false);
Expand Down Expand Up @@ -4827,6 +4895,48 @@ mod tests {
assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
}

#[test]
fn test_dict_page_size_decided_by_compression_fallback() {
Copy link
Copy Markdown
Contributor

@etseidl etseidl Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a test, I saved the output from this and examined the sizing. Without the heuristic, the encoded size for col0 is 8658384 bytes (the default fallback mechanism kicked in after 7 pages). With the heuristic, col1 is 8391126 bytes, a savings of 3%.

I also modified the test to mod the index with 32767. In that instance, col1 was still 8391126 bytes, but col0 was only 2231581, nearly 4X smaller.

I know this is not entirely representative, but it does again point out the pitfalls of too simplistic an approach.

Edit: I did a test of spark with the latter file (32k cardinality). By default, it opts to fallback for all pages, so the file is even larger. If I modify the global parquet.page.row.count.limit to 132000, it then opts for dictionary encoding as it should.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have modified the test in 1b6dd37 to demonstrate a case when even an early fallback decision brings about 12% compression. But I generally agree with your assessment, so more work is needed.

Another quirk is seen in this test: a dictionary page is still flushed to encode the first data page, even though there is no benefit. Parquet-java takes care to hand over the accumulated values to the plain encoder to be re-encoded.

let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
let schema = Arc::new(Schema::new(vec![
Field::new("col0", arrow_schema::DataType::Int64, false),
Field::new("col1", arrow_schema::DataType::Int64, false),
]));
let batch =
arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();

let props = WriterProperties::builder()
.set_dictionary_page_size_limit(1024 * 1024)
.set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
.set_column_dictionary_fallback(
ColumnPath::from("col1"),
DictionaryFallback::OnUnfavorableCompression,
)
.build();
let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
writer.write(&batch).unwrap();
let data = Bytes::from(writer.into_inner().unwrap());

let mut metadata = ParquetMetaDataReader::new();
metadata.try_parse(&data).unwrap();
let metadata = metadata.finish().unwrap();
let col0_meta = metadata.row_group(0).column(0);
let col1_meta = metadata.row_group(0).column(1);

let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
let mut reader =
SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
let page = reader.get_next_page().unwrap().unwrap();
match page {
Page::DictionaryPage { buf, .. } => buf.len(),
_ => panic!("expected DictionaryPage"),
}
};

assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
assert_eq!(get_dict_page_size(col1_meta), 8192);
}

struct WriteBatchesShape {
num_batches: usize,
rows_per_batch: usize,
Expand Down
37 changes: 34 additions & 3 deletions parquet/src/column/writer/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use crate::column::writer::{
};
use crate::data_type::DataType;
use crate::data_type::private::ParquetValueType;
use crate::encodings::encoding::{DictEncoder, Encoder, get_encoder};
use crate::encodings::encoding::{DictEncoder, Encoder, PlainDataSizeCounter, get_encoder};
use crate::errors::{ParquetError, Result};
use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::file::properties::{DictionaryFallback, EnabledStatistics, WriterProperties};
use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accumulator};
use crate::geospatial::statistics::GeospatialStatistics;
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
Expand Down Expand Up @@ -109,6 +109,12 @@ pub trait ColumnValueEncoder {
/// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
fn estimated_data_page_size(&self) -> usize;

/// Returns the estimated size of plainly encoded data, in bytes,
/// that would be written without a dictionary.
/// If there is no dictionary, or the data size statistic is not available,
/// returns `None`.
fn uncompressed_data_size(&self) -> Option<usize>;
Comment thread
mzabaluev marked this conversation as resolved.
Outdated

/// Flush the dictionary page for this column chunk if any. Any subsequent calls to
/// [`Self::write`] will not be dictionary encoded
///
Expand All @@ -132,6 +138,7 @@ pub trait ColumnValueEncoder {
pub struct ColumnValueEncoderImpl<T: DataType> {
encoder: Box<dyn Encoder<T>>,
dict_encoder: Option<DictEncoder<T>>,
plain_data_size_counter: Option<PlainDataSizeCounter>,
descr: ColumnDescPtr,
num_values: usize,
statistics_enabled: EnabledStatistics,
Expand Down Expand Up @@ -176,7 +183,13 @@ impl<T: DataType> ColumnValueEncoderImpl<T> {
}

match &mut self.dict_encoder {
Some(encoder) => encoder.put(slice),
Some(encoder) => {
encoder.put(slice)?;
if let Some(counter) = self.plain_data_size_counter.as_mut() {
counter.update(slice);
}
Ok(())
}
_ => self.encoder.put(slice),
}
}
Expand All @@ -197,6 +210,16 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
let dict_supported = props.dictionary_enabled(descr.path())
&& has_dictionary_support(T::get_physical_type(), props);
let dict_encoder = dict_supported.then(|| DictEncoder::new(descr.clone()));
let plain_data_size_counter = match props.dictionary_fallback(descr.path()) {
DictionaryFallback::OnPageSizeLimit => None,
DictionaryFallback::OnUnfavorableCompression => {
if dict_encoder.is_some() {
Some(PlainDataSizeCounter::new(descr))
} else {
None
}
}
};

// Set either main encoder or fallback encoder.
let encoder = get_encoder(
Expand All @@ -215,6 +238,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
Ok(Self {
encoder,
dict_encoder,
plain_data_size_counter,
descr: descr.clone(),
num_values: 0,
statistics_enabled,
Expand Down Expand Up @@ -277,6 +301,11 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
Some(self.dict_encoder.as_ref()?.dict_encoded_size())
}

fn uncompressed_data_size(&self) -> Option<usize> {
let counter = self.plain_data_size_counter.as_ref()?;
Some(counter.uncompressed_data_size())
}

fn estimated_data_page_size(&self) -> usize {
match &self.dict_encoder {
Some(encoder) => encoder.estimated_data_encoded_size(),
Expand All @@ -293,6 +322,8 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
));
}

self.plain_data_size_counter = None;

let buf = encoder.write_dict()?;

Ok(Some(DictionaryPage {
Expand Down
33 changes: 24 additions & 9 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,18 +746,33 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {

/// Returns true if we need to fall back to non-dictionary encoding.
///
/// We can only fall back if dictionary encoder is set and we have exceeded dictionary
/// size.
#[inline]
/// The behavior is governed by the `dictionary_fallback` column property.
fn should_dict_fallback(&self) -> bool {
match self.encoder.estimated_dict_page_size() {
Some(size) => {
size >= self
.props
.column_dictionary_page_size_limit(self.descr.path())
let dict_size = match self.encoder.estimated_dict_page_size() {
Some(size) => size,
None => return false,
};

// First check: dictionary size exceeds limit
if dict_size
>= self
.props
.column_dictionary_page_size_limit(self.descr.path())
{
return true;
}

// Second check, if enabled: the compression heuristic.
// For similar logic in parquet-java,
// see DictionaryValuesWriter.isCompressionSatisfying
if let Some(raw_size) = self.encoder.uncompressed_data_size() {
let encoded_size = self.encoder.estimated_data_page_size();
if encoded_size + dict_size >= raw_size {
return true;
}
None => false,
}

false
}

/// Returns true if there is enough data for a data page, false otherwise.
Expand Down
Loading
Loading