Skip to content

Commit eee57af

Browse files
committed
feat: pushdown OFFSET to parquet for RG-level skipping
Push OFFSET from GlobalLimitExec down to DataSourceExec/ParquetOpener. Uses shared Arc<AtomicUsize> counter across partitions so multi-partition single-file queries (byte-range partitioning) are handled correctly. Design: - with_offset accepted for parquet + no filter (any file count) - SharedCount: each partition atomically consumes offset by skipping RGs - RowSelection for partial RG skip (remaining offset within first RG) - Optimizer eliminates GlobalLimitExec when offset is pushed - effective_limit adjusted per partition based on consumed offset Implementation: - FileSource::supports_offset() (parquet=true, others=false) - FileScanConfig: offset field, with_offset (no filter guard) - LimitPushdown: push offset, eliminate GlobalLimitExec - prune_by_offset: skip leading fully-matched RGs - PreparedAccessPlan::apply_offset() for RowSelection - Shared Arc<AtomicUsize> remaining_offset in ParquetMorselizer
1 parent 3aefba7 commit eee57af

15 files changed

Lines changed: 563 additions & 38 deletions

File tree

datafusion/datasource-parquet/src/access_plan.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,36 @@ impl PreparedAccessPlan {
396396

397397
Ok(self)
398398
}
399+
400+
/// Apply a row-level offset by creating a [`RowSelection`] that skips
401+
/// the first `remaining_offset` rows across all row groups.
402+
pub(crate) fn apply_offset(
403+
mut self,
404+
remaining_offset: usize,
405+
rg_metadata: &[RowGroupMetaData],
406+
) -> Self {
407+
if remaining_offset == 0 || self.row_group_indexes.is_empty() {
408+
return self;
409+
}
410+
let total_rows: usize = self
411+
.row_group_indexes
412+
.iter()
413+
.map(|&idx| rg_metadata[idx].num_rows() as usize)
414+
.sum();
415+
let select_rows = total_rows.saturating_sub(remaining_offset);
416+
if select_rows == 0 {
417+
return self;
418+
}
419+
let offset_selection = RowSelection::from(vec![
420+
RowSelector::skip(remaining_offset),
421+
RowSelector::select(select_rows),
422+
]);
423+
self.row_selection = Some(match self.row_selection {
424+
Some(existing) => existing.intersection(&offset_selection),
425+
None => offset_selection,
426+
});
427+
self
428+
}
399429
}
400430

401431
#[cfg(test)]

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ pub struct ParquetFileMetrics {
4949
pub row_groups_pruned_bloom_filter: PruningMetrics,
5050
/// Number of row groups pruned due to limit pruning.
5151
pub limit_pruned_row_groups: PruningMetrics,
52+
/// Number of row groups pruned due to offset pruning.
53+
pub offset_pruned_row_groups: PruningMetrics,
5254
/// Number of row groups pruned by statistics
5355
pub row_groups_pruned_statistics: PruningMetrics,
5456
/// Total number of bytes scanned
@@ -113,6 +115,11 @@ impl ParquetFileMetrics {
113115
.with_type(MetricType::Summary)
114116
.pruning_metrics("limit_pruned_row_groups", partition);
115117

118+
let offset_pruned_row_groups = MetricBuilder::new(metrics)
119+
.with_new_label("filename", filename.to_string())
120+
.with_type(MetricType::Summary)
121+
.pruning_metrics("offset_pruned_row_groups", partition);
122+
116123
let row_groups_pruned_statistics = MetricBuilder::new(metrics)
117124
.with_new_label("filename", filename.to_string())
118125
.with_type(MetricType::Summary)
@@ -198,6 +205,7 @@ impl ParquetFileMetrics {
198205
row_groups_pruned_bloom_filter,
199206
row_groups_pruned_statistics,
200207
limit_pruned_row_groups,
208+
offset_pruned_row_groups,
201209
bytes_scanned,
202210
pushdown_rows_pruned,
203211
pushdown_rows_matched,

datafusion/datasource-parquet/src/opener.rs

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ pub(super) struct ParquetMorselizer {
9292
pub batch_size: usize,
9393
/// Optional limit on the number of rows to read
9494
pub(crate) limit: Option<usize>,
95+
/// Shared remaining offset across all partition openers.
96+
/// Each opener atomically consumes rows by skipping RGs.
97+
pub(crate) remaining_offset: Arc<std::sync::atomic::AtomicUsize>,
9598
/// If should keep the output rows in order
9699
pub preserve_order: bool,
97100
/// Optional predicate to apply during the scan
@@ -281,6 +284,7 @@ struct PreparedParquetOpen {
281284
enable_bloom_filter: bool,
282285
enable_row_group_stats_pruning: bool,
283286
limit: Option<usize>,
287+
remaining_offset: Arc<std::sync::atomic::AtomicUsize>,
284288
coerce_int96: Option<TimeUnit>,
285289
expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
286290
predicate_creation_errors: Count,
@@ -650,6 +654,7 @@ impl ParquetMorselizer {
650654
enable_bloom_filter: self.enable_bloom_filter,
651655
enable_row_group_stats_pruning: self.enable_row_group_stats_pruning,
652656
limit: self.limit,
657+
remaining_offset: Arc::clone(&self.remaining_offset),
653658
coerce_int96: self.coerce_int96,
654659
expr_adapter_factory: Arc::clone(&self.expr_adapter_factory),
655660
predicate_creation_errors,
@@ -1101,6 +1106,29 @@ impl RowGroupsPrunedParquetOpen {
11011106
None
11021107
};
11031108

1109+
// Prune by offset: atomically consume from the shared remaining_offset.
1110+
1111+
// Multiple partitions safely share this counter — each skips RGs
1112+
// and reduces the counter. When it reaches 0, remaining partitions
1113+
// start producing rows.
1114+
let current_offset = prepared
1115+
.remaining_offset
1116+
.load(std::sync::atomic::Ordering::SeqCst);
1117+
if current_offset > 0 {
1118+
let remaining = row_groups.prune_by_offset(
1119+
current_offset,
1120+
prepared.predicate.is_some(),
1121+
rg_metadata,
1122+
&prepared.file_metrics,
1123+
);
1124+
let skipped = current_offset - remaining;
1125+
if skipped > 0 {
1126+
prepared
1127+
.remaining_offset
1128+
.fetch_sub(skipped, std::sync::atomic::Ordering::SeqCst);
1129+
}
1130+
}
1131+
11041132
// Prune by limit if limit is set and limit order is not sensitive
11051133
if let (Some(limit), false) = (prepared.limit, prepared.preserve_order) {
11061134
row_groups.prune_by_limit(limit, rg_metadata, &prepared.file_metrics);
@@ -1126,6 +1154,25 @@ impl RowGroupsPrunedParquetOpen {
11261154
// Prepare the access plan (extract row groups and row selection)
11271155
let mut prepared_plan = access_plan.prepare(rg_metadata)?;
11281156

1157+
// Apply remaining offset (partial RG skip) via RowSelection.
1158+
// SharedCount was reduced by whole-RG skips above; any leftover
1159+
// offset is handled by skipping rows within the first surviving RG.
1160+
let now_remaining = prepared
1161+
.remaining_offset
1162+
.load(std::sync::atomic::Ordering::SeqCst);
1163+
if now_remaining > 0 {
1164+
// Atomically consume the remaining offset for this partition
1165+
let taken = prepared
1166+
.remaining_offset
1167+
.fetch_sub(now_remaining, std::sync::atomic::Ordering::SeqCst)
1168+
.min(now_remaining);
1169+
if taken > 0 {
1170+
prepared_plan = prepared_plan.apply_offset(taken, rg_metadata);
1171+
}
1172+
}
1173+
// The leftover offset was applied above as a RowSelection, so the
1173+
// scan itself performs the row-level skip for this partition.
1175+
11291176
// Potentially reverse the access plan for performance.
11301177
// See `ParquetSource::try_pushdown_sort` for the rationale.
11311178
if prepared.reverse_row_groups {
@@ -1157,8 +1204,19 @@ impl RowGroupsPrunedParquetOpen {
11571204
}
11581205
decoder_builder =
11591206
decoder_builder.with_row_groups(prepared_plan.row_group_indexes);
1207+
1208+
// Adjust limit: original limit is skip+fetch from optimizer.
1209+
// Subtract the offset that was consumed by this partition's
1210+
// RG pruning (via SharedCount). Remaining rows to read =
1211+
// limit - (original_offset - current_remaining).
11601212
if let Some(limit) = prepared.limit {
1161-
decoder_builder = decoder_builder.with_limit(limit);
1213+
let original_offset = current_offset; // captured before prune
1214+
let now_remaining = prepared
1215+
.remaining_offset
1216+
.load(std::sync::atomic::Ordering::SeqCst);
1217+
let this_partition_skipped = original_offset.saturating_sub(now_remaining);
1218+
let effective_limit = limit.saturating_sub(this_partition_skipped);
1219+
decoder_builder = decoder_builder.with_limit(effective_limit);
11621220
}
11631221
if let Some(max_predicate_cache_size) = prepared.max_predicate_cache_size {
11641222
decoder_builder =
@@ -1794,6 +1852,7 @@ mod test {
17941852
projection,
17951853
batch_size: self.batch_size,
17961854
limit: self.limit,
1855+
remaining_offset: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
17971856
preserve_order: self.preserve_order,
17981857
predicate: self.predicate,
17991858
table_schema,

datafusion/datasource-parquet/src/row_group_filter.rs

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,51 @@ impl RowGroupAccessPlanFilter {
212212
}
213213
}
214214

215+
/// Prune row groups that can be entirely skipped due to offset.
216+
///
217+
/// When an offset is specified, rows at the beginning of the scan must be
218+
/// skipped. This method marks leading fully-matched row groups whose
219+
/// cumulative row count falls within the offset as skipped, so they are
220+
/// never read from disk.
221+
///
222+
/// Returns the remaining offset (number of rows still to skip within the
223+
/// first non-pruned row group).
224+
pub fn prune_by_offset(
225+
&mut self,
226+
offset: usize,
227+
has_predicate: bool,
228+
rg_metadata: &[RowGroupMetaData],
229+
metrics: &ParquetFileMetrics,
230+
) -> usize {
231+
let mut remaining = offset;
232+
let mut pruned_count = 0;
233+
234+
for &idx in self.access_plan.row_group_indexes().iter() {
235+
if remaining == 0 {
236+
break;
237+
}
238+
// We can skip a row group entirely if:
239+
// - No predicate: all rows match, row count is exact
240+
// - Has predicate but is_fully_matched: all rows pass filter
241+
let can_skip = !has_predicate || self.is_fully_matched[idx];
242+
if can_skip {
243+
let rg_rows = rg_metadata[idx].num_rows() as usize;
244+
if remaining >= rg_rows {
245+
self.access_plan.skip(idx);
246+
remaining -= rg_rows;
247+
pruned_count += 1;
248+
} else {
249+
break;
250+
}
251+
} else {
252+
break;
253+
}
254+
}
255+
256+
metrics.offset_pruned_row_groups.add_pruned(pruned_count);
257+
remaining
258+
}
259+
215260
/// Prune remaining row groups to only those within the specified range.
216261
///
217262
/// Updates this set to mark row groups that should not be scanned
@@ -1438,6 +1483,133 @@ mod tests {
14381483
ParquetFileMetrics::new(0, "file.parquet", &metrics)
14391484
}
14401485

1486+
/// Create a RowGroupMetaData with the specified number of rows.
1487+
/// Uses a minimal schema with a single INT32 column.
1488+
fn make_row_group_meta(num_rows: i64) -> RowGroupMetaData {
1489+
let schema_descr = get_test_schema_descr(vec![PrimitiveTypeField::new(
1490+
"id",
1491+
PhysicalType::INT32,
1492+
)]);
1493+
let column = ColumnChunkMetaData::builder(schema_descr.column(0))
1494+
.set_num_values(num_rows)
1495+
.build()
1496+
.unwrap();
1497+
RowGroupMetaData::builder(schema_descr)
1498+
.set_num_rows(num_rows)
1499+
.set_total_byte_size(1000)
1500+
.set_column_metadata(vec![column])
1501+
.build()
1502+
.unwrap()
1503+
}
1504+
1505+
/// Helper to build a RowGroupAccessPlanFilter with specified fully_matched flags.
1506+
fn make_filter_with_fully_matched(
1507+
num_rgs: usize,
1508+
fully_matched: Vec<bool>,
1509+
) -> RowGroupAccessPlanFilter {
1510+
assert_eq!(num_rgs, fully_matched.len());
1511+
let access_plan = ParquetAccessPlan::new_all(num_rgs);
1512+
let mut filter = RowGroupAccessPlanFilter::new(access_plan);
1513+
filter.is_fully_matched = fully_matched;
1514+
filter
1515+
}
1516+
1517+
#[test]
fn test_prune_by_offset_skips_fully_matched_rgs() {
    // Three 100-row groups, all fully matched. An offset of 250 prunes
    // the first two groups (200 rows) and leaves 50 rows still to skip.
    let metas: Vec<RowGroupMetaData> =
        (0..3).map(|_| make_row_group_meta(100)).collect();
    let metrics = parquet_file_metrics();
    let mut flt = make_filter_with_fully_matched(3, vec![true; 3]);

    let leftover = flt.prune_by_offset(250, false, &metas, &metrics);

    assert_eq!(leftover, 50);
    // Only the final group survives for scanning.
    let scanned: Vec<usize> = flt.row_group_indexes().collect();
    assert_eq!(scanned, vec![2]);
}
1532+
1533+
#[test]
fn test_prune_by_offset_stops_at_non_fully_matched() {
    // Three 100-row groups; only the first two fully match the
    // predicate. With offset=250 the two matched groups are pruned
    // (200 rows), and pruning must stop at the third even though 50
    // rows still need skipping — it may contain filtered-out rows.
    let metas: Vec<RowGroupMetaData> =
        (0..3).map(|_| make_row_group_meta(100)).collect();
    let metrics = parquet_file_metrics();
    let mut flt = make_filter_with_fully_matched(3, vec![true, true, false]);

    let leftover = flt.prune_by_offset(250, true, &metas, &metrics);

    assert_eq!(leftover, 50);
    // The non-fully-matched third group must still be scanned.
    let scanned: Vec<usize> = flt.row_group_indexes().collect();
    assert_eq!(scanned, vec![2]);
}
1550+
1551+
#[test]
fn test_prune_by_offset_zero() {
    // A zero offset must prune nothing and report zero remaining.
    let metas: Vec<RowGroupMetaData> =
        (0..3).map(|_| make_row_group_meta(100)).collect();
    let metrics = parquet_file_metrics();
    let mut flt = make_filter_with_fully_matched(3, vec![true; 3]);

    let leftover = flt.prune_by_offset(0, false, &metas, &metrics);

    assert_eq!(leftover, 0);
    // Every group remains scannable.
    let scanned: Vec<usize> = flt.row_group_indexes().collect();
    assert_eq!(scanned, vec![0, 1, 2]);
}
1565+
1566+
#[test]
fn test_prune_by_offset_exact_boundary() {
    // Offset of 200 lands exactly on a group boundary: the first two
    // 100-row groups are pruned and no rows remain to skip.
    let metas: Vec<RowGroupMetaData> =
        (0..3).map(|_| make_row_group_meta(100)).collect();
    let metrics = parquet_file_metrics();
    let mut flt = make_filter_with_fully_matched(3, vec![true; 3]);

    let leftover = flt.prune_by_offset(200, false, &metas, &metrics);

    assert_eq!(leftover, 0);
    // Only the third group is still scanned.
    let scanned: Vec<usize> = flt.row_group_indexes().collect();
    assert_eq!(scanned, vec![2]);
}
1580+
1581+
#[test]
fn test_prune_by_offset_exceeds_total() {
    // Offset of 400 exceeds the 300 total rows: every fully-matched
    // group is pruned and 400 - 300 = 100 rows remain to be skipped.
    let metas: Vec<RowGroupMetaData> =
        (0..3).map(|_| make_row_group_meta(100)).collect();
    let metrics = parquet_file_metrics();
    let mut flt = make_filter_with_fully_matched(3, vec![true; 3]);

    let leftover = flt.prune_by_offset(400, false, &metas, &metrics);

    assert_eq!(leftover, 100);
    // No group survives.
    let scanned: Vec<usize> = flt.row_group_indexes().collect();
    assert!(scanned.is_empty());
}
1596+
1597+
#[test]
fn test_prune_by_offset_partial_rg() {
    // Offset of 50 is smaller than the first 100-row group, so no group
    // can be pruned wholesale; all 50 rows remain to be skipped.
    let metas: Vec<RowGroupMetaData> =
        (0..3).map(|_| make_row_group_meta(100)).collect();
    let metrics = parquet_file_metrics();
    let mut flt = make_filter_with_fully_matched(3, vec![true; 3]);

    let leftover = flt.prune_by_offset(50, false, &metas, &metrics);

    assert_eq!(leftover, 50);
    // All groups remain scannable.
    let scanned: Vec<usize> = flt.row_group_indexes().collect();
    assert_eq!(scanned, vec![0, 1, 2]);
}
1612+
14411613
#[tokio::test]
14421614
async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() {
14431615
BloomFilterTest::new_data_index_bloom_encoding_stats()

datafusion/datasource-parquet/src/source.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,9 @@ impl FileSource for ParquetSource {
560560
.batch_size
561561
.expect("Batch size must set before creating ParquetMorselizer"),
562562
limit: base_config.limit,
563+
remaining_offset: Arc::new(std::sync::atomic::AtomicUsize::new(
564+
base_config.offset.unwrap_or(0),
565+
)),
563566
preserve_order: base_config.preserve_order,
564567
predicate: self.predicate.clone(),
565568
table_schema: self.table_schema.clone(),
@@ -583,6 +586,10 @@ impl FileSource for ParquetSource {
583586
}))
584587
}
585588

589+
fn supports_offset(&self) -> bool {
590+
true
591+
}
592+
586593
fn table_schema(&self) -> &TableSchema {
587594
&self.table_schema
588595
}

datafusion/datasource/src/file.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,13 @@ pub trait FileSource: Any + Send + Sync {
280280
Ok(SortOrderPushdownResult::Unsupported)
281281
}
282282

283+
/// Whether this source can efficiently skip rows for OFFSET queries
284+
/// (e.g., by skipping entire row groups based on row counts).
285+
/// Default: false.
286+
fn supports_offset(&self) -> bool {
287+
false
288+
}
289+
283290
/// Try to push down a projection into this FileSource.
284291
///
285292
/// `FileSource` implementations that support projection pushdown should

0 commit comments

Comments
 (0)