Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/datasource-parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ log = { workspace = true }
object_store = { workspace = true }
parking_lot = { workspace = true }
parquet = { workspace = true }
rand = { workspace = true, features = ["small_rng"] }
tokio = { workspace = true }

[dev-dependencies]
Expand Down
2 changes: 2 additions & 0 deletions datafusion/datasource-parquet/src/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ mod page_filter;
mod reader;
mod row_filter;
mod row_group_filter;
mod sampling;
mod sort;
pub mod source;
mod supported_predicates;
Expand All @@ -46,4 +47,5 @@ pub use reader::*; // Expose so downstream crates can use it
pub use row_filter::build_row_filter;
pub use row_filter::can_expr_be_pushed_down_with_schemas;
pub use row_group_filter::RowGroupAccessPlanFilter;
pub use sampling::ParquetSampling;
pub use writer::plan_to_parquet;
148 changes: 146 additions & 2 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ pub(super) struct ParquetMorselizer {
pub max_predicate_cache_size: Option<usize>,
/// Whether to read row groups in reverse order
pub reverse_row_groups: bool,
/// Sampling config carried from `ParquetSource`. Applied lazily
/// inside the opener once the parquet metadata is available.
pub sampling: crate::sampling::ParquetSampling,
}

impl fmt::Debug for ParquetMorselizer {
Expand Down Expand Up @@ -287,6 +290,7 @@ struct PreparedParquetOpen {
max_predicate_cache_size: Option<usize>,
reverse_row_groups: bool,
preserve_order: bool,
sampling: crate::sampling::ParquetSampling,
#[cfg(feature = "parquet_encryption")]
file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
}
Expand Down Expand Up @@ -656,6 +660,7 @@ impl ParquetMorselizer {
max_predicate_cache_size: self.max_predicate_cache_size,
reverse_row_groups: self.reverse_row_groups,
preserve_order: self.preserve_order,
sampling: self.sampling.clone(),
#[cfg(feature = "parquet_encryption")]
file_decryption_properties: None,
})
Expand Down Expand Up @@ -882,11 +887,33 @@ impl FiltersPreparedParquetOpen {

// Determine which row groups to actually read. The idea is to skip
// as many row groups as possible based on the metadata and query
let mut row_groups = RowGroupAccessPlanFilter::new(create_initial_plan(
let mut initial_plan = create_initial_plan(
&prepared.file_name,
prepared.extensions.clone(),
rg_metadata.len(),
)?);
)?;

// Apply optional row-group and row-range sampling now that we
// know the actual row-group count. Both calls are no-ops when
// their respective fraction is `None`. Selection is
// deterministic per `(partition_index, row_group_index,
// fraction, cluster_size)` so re-runs match. The execution
// `partition_index` is the stable per-file id we plumb in:
// it makes sampling reproducible across environments without
// depending on object-store paths, and decorrelates files
// assigned to different partitions.
prepared.sampling.apply_row_group_sampling(
&mut initial_plan,
rg_metadata.len(),
prepared.partition_index,
);
prepared.sampling.apply_row_fraction_sampling(
&mut initial_plan,
rg_metadata,
prepared.partition_index,
);

let mut row_groups = RowGroupAccessPlanFilter::new(initial_plan);

// If there is a range restricting what parts of the file to read
if let Some(range) = prepared.file_range.as_ref() {
Expand Down Expand Up @@ -1676,6 +1703,7 @@ mod test {
max_predicate_cache_size: Option<usize>,
reverse_row_groups: bool,
preserve_order: bool,
sampling: crate::sampling::ParquetSampling,
}

impl ParquetMorselizerBuilder {
Expand All @@ -1702,9 +1730,16 @@ mod test {
max_predicate_cache_size: None,
reverse_row_groups: false,
preserve_order: false,
sampling: crate::sampling::ParquetSampling::default(),
}
}

/// Set the sampling config.
fn with_sampling(mut self, sampling: crate::sampling::ParquetSampling) -> Self {
self.sampling = sampling;
self
}

/// Set the object store (required for building).
fn with_store(mut self, store: Arc<dyn ObjectStore>) -> Self {
self.store = Some(store);
Expand Down Expand Up @@ -1816,6 +1851,7 @@ mod test {
encryption_factory: None,
max_predicate_cache_size: self.max_predicate_cache_size,
reverse_row_groups: self.reverse_row_groups,
sampling: self.sampling,
}
}
}
Expand Down Expand Up @@ -2720,4 +2756,112 @@ mod test {
"without page index all rows are returned"
);
}

/// End-to-end check of row-group sampling: write a parquet file
/// containing 4 row groups and scan it with
/// `row_group_fraction = 0.5`; rows from exactly 2 of the 4 row
/// groups (3 rows each) should come back.
#[tokio::test]
async fn row_group_sampling_end_to_end() {
    let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

    // Build 4 batches of 3 rows apiece (12 rows total); the writer
    // properties below make each batch its own row group.
    let mut batches = Vec::with_capacity(4);
    for group in 0..4i32 {
        let base = group * 10;
        batches.push(
            record_batch!((
                "a",
                Int32,
                vec![Some(base + 1), Some(base + 2), Some(base + 3)]
            ))
            .unwrap(),
        );
    }
    let schema = batches[0].schema();
    let writer_props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(3))
        .build();

    let data_len = write_parquet_batches(
        Arc::clone(&object_store),
        "rg_sampled.parquet",
        batches,
        Some(writer_props),
    )
    .await;

    let partitioned_file = PartitionedFile::new(
        "rg_sampled.parquet".to_string(),
        u64::try_from(data_len).unwrap(),
    );

    // Keep half of the row groups; all other sampling knobs default.
    let sampling = crate::sampling::ParquetSampling {
        row_group_fraction: Some(0.5),
        ..Default::default()
    };

    let opener = ParquetMorselizerBuilder::new()
        .with_store(Arc::clone(&object_store))
        .with_schema(Arc::clone(&schema))
        .with_projection_indices(&[0])
        .with_sampling(sampling)
        .build();

    let stream = open_file(&opener, partitioned_file).await.unwrap();
    let (_num_batches, num_rows) = count_batches_and_rows(stream).await;

    // ceil(4 * 0.5) = 2 surviving row groups, 3 rows each.
    assert_eq!(
        num_rows, 6,
        "row_group_fraction=0.5 over 4 row groups should yield 2 row groups × 3 rows"
    );
}

/// End-to-end check of intra-row-group sampling: scan a single
/// 100-row row group with `row_fraction = 0.1` and a small cluster
/// size. Selection is window based, so the exact surviving count
/// depends on how the windows pack — we only assert it lands well
/// below 100 and within the expected upper bound.
#[tokio::test]
async fn row_fraction_end_to_end() {
    let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

    // A single 100-row row group: this exercises the per-row-group
    // RowSelection path rather than whole-row-group skipping.
    let column: Vec<Option<i32>> = (0..100).map(Some).collect();
    let batch = record_batch!(("a", Int32, column)).unwrap();
    let schema = batch.schema();
    let data_len =
        write_parquet(Arc::clone(&object_store), "rf.parquet", batch.clone()).await;
    let partitioned_file = PartitionedFile::new(
        "rf.parquet".to_string(),
        u64::try_from(data_len).unwrap(),
    );

    // Ask for ~10% of rows, with clusters small enough that several
    // selection windows fit inside the single row group.
    let sampling = crate::sampling::ParquetSampling {
        row_fraction: Some(0.1),
        row_cluster_size: 4,
        ..Default::default()
    };

    let opener = ParquetMorselizerBuilder::new()
        .with_store(Arc::clone(&object_store))
        .with_schema(Arc::clone(&schema))
        .with_projection_indices(&[0])
        .with_sampling(sampling)
        .build();

    let stream = open_file(&opener, partitioned_file).await.unwrap();
    let (_num_batches, num_rows) = count_batches_and_rows(stream).await;

    // Target is ceil(100 * 0.1) = 10 rows, spread over
    // ceil(10 / cluster=4) = 3 windows of ceil(10/3) = 4 rows each,
    // so up to 12 rows may be selected in practice.
    assert!(
        (1..100).contains(&num_rows),
        "row_fraction=0.1 should drop the vast majority of rows; got {num_rows}"
    );
    assert!(
        num_rows <= 16,
        "row_fraction=0.1 should yield ~10-12 rows; got {num_rows}"
    );
}
}
Loading
Loading