Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions datafusion/datasource-parquet/src/access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,36 @@ impl PreparedAccessPlan {

Ok(self)
}

/// Apply a row-level offset by creating a [`RowSelection`] that skips
/// the first `remaining_offset` rows across all selected row groups.
///
/// If the plan already carries a row selection, the offset selection is
/// intersected with it so previously excluded rows stay excluded.
pub(crate) fn apply_offset(
    mut self,
    remaining_offset: usize,
    rg_metadata: &[RowGroupMetaData],
) -> Self {
    // Nothing to skip, or nothing left to read: plan is unchanged.
    if remaining_offset == 0 || self.row_group_indexes.is_empty() {
        return self;
    }
    // Total physical rows across the row groups this plan will scan.
    let total_rows: usize = self
        .row_group_indexes
        .iter()
        .map(|&idx| rg_metadata[idx].num_rows() as usize)
        .sum();
    let select_rows = total_rows.saturating_sub(remaining_offset);
    let offset_selection = if select_rows == 0 {
        // The offset consumes every remaining row. Previously this case
        // returned `self` unchanged, which would emit ALL rows instead
        // of none; skip them all explicitly instead.
        RowSelection::from(vec![RowSelector::skip(total_rows)])
    } else {
        RowSelection::from(vec![
            RowSelector::skip(remaining_offset),
            RowSelector::select(select_rows),
        ])
    };
    // NOTE(review): intersecting with an existing selection skips the
    // first `remaining_offset` *physical* rows, not the first
    // `remaining_offset` *selected* rows — confirm callers only pass an
    // offset expressed in physical rows when a selection is present.
    self.row_selection = Some(match self.row_selection {
        Some(existing) => existing.intersection(&offset_selection),
        None => offset_selection,
    });
    self
}
}

#[cfg(test)]
Expand Down
8 changes: 8 additions & 0 deletions datafusion/datasource-parquet/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub struct ParquetFileMetrics {
pub row_groups_pruned_bloom_filter: PruningMetrics,
/// Number of row groups pruned due to limit pruning.
pub limit_pruned_row_groups: PruningMetrics,
/// Number of row groups pruned due to offset pruning.
pub offset_pruned_row_groups: PruningMetrics,
/// Number of row groups pruned by statistics
pub row_groups_pruned_statistics: PruningMetrics,
/// Total number of bytes scanned
Expand Down Expand Up @@ -113,6 +115,11 @@ impl ParquetFileMetrics {
.with_type(MetricType::Summary)
.pruning_metrics("limit_pruned_row_groups", partition);

let offset_pruned_row_groups = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.with_type(MetricType::Summary)
.pruning_metrics("offset_pruned_row_groups", partition);

let row_groups_pruned_statistics = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.with_type(MetricType::Summary)
Expand Down Expand Up @@ -198,6 +205,7 @@ impl ParquetFileMetrics {
row_groups_pruned_bloom_filter,
row_groups_pruned_statistics,
limit_pruned_row_groups,
offset_pruned_row_groups,
bytes_scanned,
pushdown_rows_pruned,
pushdown_rows_matched,
Expand Down
61 changes: 60 additions & 1 deletion datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ pub(super) struct ParquetMorselizer {
pub batch_size: usize,
/// Optional limit on the number of rows to read
pub(crate) limit: Option<usize>,
/// Shared remaining offset across all partition openers.
/// Each opener atomically consumes rows by skipping RGs.
pub(crate) remaining_offset: Arc<std::sync::atomic::AtomicUsize>,
/// If should keep the output rows in order
pub preserve_order: bool,
/// Optional predicate to apply during the scan
Expand Down Expand Up @@ -281,6 +284,7 @@ struct PreparedParquetOpen {
enable_bloom_filter: bool,
enable_row_group_stats_pruning: bool,
limit: Option<usize>,
remaining_offset: Arc<std::sync::atomic::AtomicUsize>,
coerce_int96: Option<TimeUnit>,
expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
predicate_creation_errors: Count,
Expand Down Expand Up @@ -650,6 +654,7 @@ impl ParquetMorselizer {
enable_bloom_filter: self.enable_bloom_filter,
enable_row_group_stats_pruning: self.enable_row_group_stats_pruning,
limit: self.limit,
remaining_offset: Arc::clone(&self.remaining_offset),
coerce_int96: self.coerce_int96,
expr_adapter_factory: Arc::clone(&self.expr_adapter_factory),
predicate_creation_errors,
Expand Down Expand Up @@ -1101,6 +1106,29 @@ impl RowGroupsPrunedParquetOpen {
None
};

// Prune by offset: atomically consume from the shared remaining_offset.
// Multiple partitions safely share this counter — each skips RGs
// and reduces the counter. When it reaches 0, remaining partitions
// start producing rows.
let current_offset = prepared
.remaining_offset
.load(std::sync::atomic::Ordering::SeqCst);
if current_offset > 0 {
let remaining = row_groups.prune_by_offset(
current_offset,
prepared.predicate.is_some(),
rg_metadata,
&prepared.file_metrics,
);
let skipped = current_offset - remaining;
if skipped > 0 {
prepared
.remaining_offset
.fetch_sub(skipped, std::sync::atomic::Ordering::SeqCst);
}
}

// Prune by limit if limit is set and limit order is not sensitive
if let (Some(limit), false) = (prepared.limit, prepared.preserve_order) {
row_groups.prune_by_limit(limit, rg_metadata, &prepared.file_metrics);
Expand All @@ -1126,6 +1154,25 @@ impl RowGroupsPrunedParquetOpen {
// Prepare the access plan (extract row groups and row selection)
let mut prepared_plan = access_plan.prepare(rg_metadata)?;

// Apply the remaining offset (partial row-group skip) via RowSelection.
// The shared offset counter was reduced by whole-RG skips above; any
// leftover offset is handled by skipping rows within the first
// surviving row group.
let now_remaining = prepared
.remaining_offset
.load(std::sync::atomic::Ordering::SeqCst);
if now_remaining > 0 {
// Atomically consume the remaining offset for this partition
let taken = prepared
.remaining_offset
.fetch_sub(now_remaining, std::sync::atomic::Ordering::SeqCst)
.min(now_remaining);
if taken > 0 {
prepared_plan = prepared_plan.apply_offset(taken, rg_metadata);
}
}
// NOTE(review): any offset not consumed here is presumably still
// enforced downstream by GlobalLimitExec; the pruning above exists to
// reduce I/O — confirm the downstream skip is in place.

// Potentially reverse the access plan for performance.
// See `ParquetSource::try_pushdown_sort` for the rationale.
if prepared.reverse_row_groups {
Expand Down Expand Up @@ -1157,8 +1204,19 @@ impl RowGroupsPrunedParquetOpen {
}
decoder_builder =
decoder_builder.with_row_groups(prepared_plan.row_group_indexes);

// Adjust the limit: the pushed-down limit is skip+fetch from the
// optimizer. Subtract the offset rows this partition consumed via
// row-group pruning (tracked by the shared atomic counter). Remaining
// rows to read = limit - (original_offset - current_remaining).
if let Some(limit) = prepared.limit {
decoder_builder = decoder_builder.with_limit(limit);
let original_offset = current_offset; // captured before prune
let now_remaining = prepared
.remaining_offset
.load(std::sync::atomic::Ordering::SeqCst);
let this_partition_skipped = original_offset.saturating_sub(now_remaining);
let effective_limit = limit.saturating_sub(this_partition_skipped);
decoder_builder = decoder_builder.with_limit(effective_limit);
}
if let Some(max_predicate_cache_size) = prepared.max_predicate_cache_size {
decoder_builder =
Expand Down Expand Up @@ -1794,6 +1852,7 @@ mod test {
projection,
batch_size: self.batch_size,
limit: self.limit,
remaining_offset: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
preserve_order: self.preserve_order,
predicate: self.predicate,
table_schema,
Expand Down
172 changes: 172 additions & 0 deletions datafusion/datasource-parquet/src/row_group_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,51 @@ impl RowGroupAccessPlanFilter {
}
}

/// Prune row groups that can be entirely skipped due to offset.
Comment thread
alamb marked this conversation as resolved.
///
/// When an offset is specified, rows at the beginning of the scan must be
/// skipped. This method marks leading fully-matched row groups whose
/// cumulative row count falls within the offset as skipped, so they are
/// never read from disk.
///
/// Returns the remaining offset (number of rows still to skip within the
/// first non-pruned row group).
pub fn prune_by_offset(
&mut self,
offset: usize,
has_predicate: bool,
rg_metadata: &[RowGroupMetaData],
metrics: &ParquetFileMetrics,
) -> usize {
let mut remaining = offset;
let mut pruned_count = 0;

for &idx in self.access_plan.row_group_indexes().iter() {
if remaining == 0 {
break;
}
// We can skip a row group entirely if:
// - No predicate: all rows match, row count is exact
// - Has predicate but is_fully_matched: all rows pass filter
let can_skip = !has_predicate || self.is_fully_matched[idx];
if can_skip {
let rg_rows = rg_metadata[idx].num_rows() as usize;
if remaining >= rg_rows {
self.access_plan.skip(idx);
remaining -= rg_rows;
pruned_count += 1;
} else {
break;
}
} else {
break;
}
}

metrics.offset_pruned_row_groups.add_pruned(pruned_count);
remaining
}

/// Prune remaining row groups to only those within the specified range.
///
/// Updates this set to mark row groups that should not be scanned
Expand Down Expand Up @@ -1438,6 +1483,133 @@ mod tests {
ParquetFileMetrics::new(0, "file.parquet", &metrics)
}

/// Build a [`RowGroupMetaData`] holding `num_rows` rows, backed by a
/// minimal single-column (INT32 `id`) schema.
fn make_row_group_meta(num_rows: i64) -> RowGroupMetaData {
    let schema_descr =
        get_test_schema_descr(vec![PrimitiveTypeField::new("id", PhysicalType::INT32)]);
    let column_chunk = ColumnChunkMetaData::builder(schema_descr.column(0))
        .set_num_values(num_rows)
        .build()
        .unwrap();
    RowGroupMetaData::builder(schema_descr)
        .set_num_rows(num_rows)
        .set_total_byte_size(1000)
        .set_column_metadata(vec![column_chunk])
        .build()
        .unwrap()
}

/// Build a [`RowGroupAccessPlanFilter`] scanning all of `num_rgs` row
/// groups, with the per-row-group `fully_matched` flags preset.
fn make_filter_with_fully_matched(
    num_rgs: usize,
    fully_matched: Vec<bool>,
) -> RowGroupAccessPlanFilter {
    assert_eq!(num_rgs, fully_matched.len());
    let mut filter = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(num_rgs));
    filter.is_fully_matched = fully_matched;
    filter
}

#[test]
fn test_prune_by_offset_skips_fully_matched_rgs() {
    // Three fully-matched 100-row groups; offset=250 should drop the
    // first two (200 rows) and leave 50 rows still to skip.
    let metadata: Vec<RowGroupMetaData> = vec![
        make_row_group_meta(100),
        make_row_group_meta(100),
        make_row_group_meta(100),
    ];
    let metrics = parquet_file_metrics();
    let mut filter = make_filter_with_fully_matched(3, vec![true; 3]);

    let leftover = filter.prune_by_offset(250, false, &metadata, &metrics);

    assert_eq!(leftover, 50);
    // Only the last row group remains scannable.
    let remaining: Vec<usize> = filter.row_group_indexes().collect();
    assert_eq!(remaining, vec![2]);
}

#[test]
fn test_prune_by_offset_stops_at_non_fully_matched() {
    // Three 100-row groups; the third is not fully matched. With
    // offset=250 only the first two (200 rows) may be dropped — the
    // third must still be scanned even though 50 offset rows remain.
    let metadata: Vec<RowGroupMetaData> = vec![
        make_row_group_meta(100),
        make_row_group_meta(100),
        make_row_group_meta(100),
    ];
    let metrics = parquet_file_metrics();
    let mut filter = make_filter_with_fully_matched(3, vec![true, true, false]);

    let leftover = filter.prune_by_offset(250, true, &metadata, &metrics);

    assert_eq!(leftover, 50);
    let remaining: Vec<usize> = filter.row_group_indexes().collect();
    assert_eq!(remaining, vec![2]);
}

#[test]
fn test_prune_by_offset_zero() {
    // A zero offset must leave the access plan untouched.
    let metadata: Vec<RowGroupMetaData> = vec![
        make_row_group_meta(100),
        make_row_group_meta(100),
        make_row_group_meta(100),
    ];
    let metrics = parquet_file_metrics();
    let mut filter = make_filter_with_fully_matched(3, vec![true; 3]);

    assert_eq!(filter.prune_by_offset(0, false, &metadata, &metrics), 0);
    let remaining: Vec<usize> = filter.row_group_indexes().collect();
    assert_eq!(remaining, vec![0, 1, 2]);
}

#[test]
fn test_prune_by_offset_exact_boundary() {
    // offset=200 lands exactly on a row-group boundary: the first two
    // groups are dropped and nothing is left to skip.
    let metadata: Vec<RowGroupMetaData> = vec![
        make_row_group_meta(100),
        make_row_group_meta(100),
        make_row_group_meta(100),
    ];
    let metrics = parquet_file_metrics();
    let mut filter = make_filter_with_fully_matched(3, vec![true; 3]);

    assert_eq!(filter.prune_by_offset(200, false, &metadata, &metrics), 0);
    let remaining: Vec<usize> = filter.row_group_indexes().collect();
    assert_eq!(remaining, vec![2]);
}

#[test]
fn test_prune_by_offset_exceeds_total() {
    // offset=400 exceeds the 300 rows available: every row group is
    // dropped and 100 offset rows remain unconsumed.
    let metadata: Vec<RowGroupMetaData> = vec![
        make_row_group_meta(100),
        make_row_group_meta(100),
        make_row_group_meta(100),
    ];
    let metrics = parquet_file_metrics();
    let mut filter = make_filter_with_fully_matched(3, vec![true; 3]);

    assert_eq!(filter.prune_by_offset(400, false, &metadata, &metrics), 100);
    let remaining: Vec<usize> = filter.row_group_indexes().collect();
    assert!(remaining.is_empty());
}

#[test]
fn test_prune_by_offset_partial_rg() {
    // offset=50 ends inside the first row group, so no group can be
    // dropped and the full offset is returned for row-level handling.
    let metadata: Vec<RowGroupMetaData> = vec![
        make_row_group_meta(100),
        make_row_group_meta(100),
        make_row_group_meta(100),
    ];
    let metrics = parquet_file_metrics();
    let mut filter = make_filter_with_fully_matched(3, vec![true; 3]);

    assert_eq!(filter.prune_by_offset(50, false, &metadata, &metrics), 50);
    let remaining: Vec<usize> = filter.row_group_indexes().collect();
    assert_eq!(remaining, vec![0, 1, 2]);
}

#[tokio::test]
async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() {
BloomFilterTest::new_data_index_bloom_encoding_stats()
Expand Down
Loading
Loading