Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions parquet/src/arrow/array_reader/fixed_len_byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ struct FixedLenByteArrayBuffer {
buffer: Vec<u8>,
/// The length of each element in bytes
byte_length: Option<usize>,
/// Preserved value-count hint used to allocate `buffer` once `byte_length`
/// becomes known on the first decode.
values_capacity: Option<usize>,
}

#[inline]
Expand All @@ -291,12 +294,13 @@ fn move_values<F>(
}

impl ValuesBuffer for FixedLenByteArrayBuffer {
fn with_capacity(_capacity: usize) -> Self {
// byte_length is not known at trait level, so we return a default buffer
// The decoder will pre-allocate when it knows both capacity and byte_length
fn with_capacity(capacity: usize) -> Self {
// `byte_length` is not known initially, so preserve the value-count
// hint so the first decode can allocate the exact byte capacity.
Self {
buffer: Vec::new(),
byte_length: None,
values_capacity: Some(capacity),
}
}

Expand Down Expand Up @@ -419,7 +423,19 @@ impl ColumnValueDecoder for ValueDecoder {
fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
match out.byte_length {
Some(x) => assert_eq!(x, self.byte_length),
None => out.byte_length = Some(self.byte_length),
None => {
out.byte_length = Some(self.byte_length);
// TODO: collapse to a let-chain once MSRV ≥ 1.88
// (`if out.buffer.is_empty() && let Some(cap) = out.values_capacity.take()`)
if out.buffer.is_empty() {
if let Some(values_capacity) = out.values_capacity.take() {
// now that the byte length per output element is known,
// allocate the actual needed space.
let byte_capacity = values_capacity.saturating_mul(self.byte_length);
out.buffer = Vec::with_capacity(byte_capacity);
}
}
}
}

match self.decoder.as_mut().unwrap() {
Expand Down
3 changes: 3 additions & 0 deletions parquet/src/arrow/record_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ where

/// Try to read one batch of data returning the number of records read
fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
if batch_size == 0 {
return Ok(0);
}
// Update capacity hint to the largest batch size seen
if batch_size > self.capacity_hint {
self.capacity_hint = batch_size;
Expand Down
Loading