Skip to content

Commit fba644f

Browse files
committed
feat: pushdown OFFSET to parquet for RG-level skipping
Push OFFSET from GlobalLimitExec down to DataSourceExec/ParquetOpener for single-file, no-filter parquet queries. Skips entire row groups and uses RowSelection for a partial row-group skip within the first surviving row group.

Design:
- with_offset returns Some only when the offset is fully handled (single file, no filter, parquet source), so the optimizer can eliminate GlobalLimitExec
- Multi-file and filtered queries: GlobalLimitExec is kept (follow-up)
- PreparedAccessPlan::apply_offset() encapsulates the RowSelection logic
- prune_by_offset: skip leading row groups by cumulative row count
- effective_limit = limit - offset is passed to the decoder

Implementation:
- FileSource::supports_offset() (parquet=true, others=false)
- FileScanConfig: offset field; with_offset (single file + no filter)
- LimitPushdown: push offset down; eliminate GlobalLimitExec when accepted

Benchmark (60M rows, 1.5GB single parquet file): OFFSET 59M: 182ms → <1ms (>182x faster)
1 parent 3aefba7 commit fba644f

15 files changed

Lines changed: 557 additions & 38 deletions

File tree

datafusion/datasource-parquet/src/access_plan.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,42 @@ impl PreparedAccessPlan {
396396

397397
Ok(self)
398398
}
399+
400+
/// Apply a row-level offset by creating a [`RowSelection`] that skips the
401+
/// first `remaining_offset` rows across all row groups. Merges with any
402+
/// existing row selection (e.g., from page index pruning).
403+
///
404+
/// This is used when RG-level offset pruning has already skipped whole
405+
/// row groups but a partial offset remains within the first surviving RG.
406+
pub(crate) fn apply_offset(
407+
mut self,
408+
remaining_offset: usize,
409+
rg_metadata: &[RowGroupMetaData],
410+
) -> Self {
411+
if remaining_offset == 0 || self.row_group_indexes.is_empty() {
412+
return self;
413+
}
414+
415+
let total_rows: usize = self
416+
.row_group_indexes
417+
.iter()
418+
.map(|&idx| rg_metadata[idx].num_rows() as usize)
419+
.sum();
420+
let select_rows = total_rows.saturating_sub(remaining_offset);
421+
if select_rows == 0 {
422+
return self;
423+
}
424+
425+
let offset_selection = RowSelection::from(vec![
426+
RowSelector::skip(remaining_offset),
427+
RowSelector::select(select_rows),
428+
]);
429+
self.row_selection = Some(match self.row_selection {
430+
Some(existing) => existing.intersection(&offset_selection),
431+
None => offset_selection,
432+
});
433+
self
434+
}
399435
}
400436

401437
#[cfg(test)]

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ pub struct ParquetFileMetrics {
4949
pub row_groups_pruned_bloom_filter: PruningMetrics,
5050
/// Number of row groups pruned due to limit pruning.
5151
pub limit_pruned_row_groups: PruningMetrics,
52+
/// Number of row groups pruned due to offset pruning.
53+
pub offset_pruned_row_groups: PruningMetrics,
5254
/// Number of row groups pruned by statistics
5355
pub row_groups_pruned_statistics: PruningMetrics,
5456
/// Total number of bytes scanned
@@ -113,6 +115,11 @@ impl ParquetFileMetrics {
113115
.with_type(MetricType::Summary)
114116
.pruning_metrics("limit_pruned_row_groups", partition);
115117

118+
let offset_pruned_row_groups = MetricBuilder::new(metrics)
119+
.with_new_label("filename", filename.to_string())
120+
.with_type(MetricType::Summary)
121+
.pruning_metrics("offset_pruned_row_groups", partition);
122+
116123
let row_groups_pruned_statistics = MetricBuilder::new(metrics)
117124
.with_new_label("filename", filename.to_string())
118125
.with_type(MetricType::Summary)
@@ -198,6 +205,7 @@ impl ParquetFileMetrics {
198205
row_groups_pruned_bloom_filter,
199206
row_groups_pruned_statistics,
200207
limit_pruned_row_groups,
208+
offset_pruned_row_groups,
201209
bytes_scanned,
202210
pushdown_rows_pruned,
203211
pushdown_rows_matched,

datafusion/datasource-parquet/src/opener.rs

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ pub(super) struct ParquetMorselizer {
9292
pub batch_size: usize,
9393
/// Optional limit on the number of rows to read
9494
pub(crate) limit: Option<usize>,
95+
/// Optional offset (number of rows to skip before reading)
96+
pub(crate) offset: Option<usize>,
9597
/// If should keep the output rows in order
9698
pub preserve_order: bool,
9799
/// Optional predicate to apply during the scan
@@ -281,6 +283,7 @@ struct PreparedParquetOpen {
281283
enable_bloom_filter: bool,
282284
enable_row_group_stats_pruning: bool,
283285
limit: Option<usize>,
286+
offset: Option<usize>,
284287
coerce_int96: Option<TimeUnit>,
285288
expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
286289
predicate_creation_errors: Count,
@@ -650,6 +653,7 @@ impl ParquetMorselizer {
650653
enable_bloom_filter: self.enable_bloom_filter,
651654
enable_row_group_stats_pruning: self.enable_row_group_stats_pruning,
652655
limit: self.limit,
656+
offset: self.offset,
653657
coerce_int96: self.coerce_int96,
654658
expr_adapter_factory: Arc::clone(&self.expr_adapter_factory),
655659
predicate_creation_errors,
@@ -1101,6 +1105,24 @@ impl RowGroupsPrunedParquetOpen {
11011105
None
11021106
};
11031107

1108+
// Prune by offset: atomically consume from the shared remaining_offset.
1109+
// Prune by offset: skip leading fully-matched row groups that fall
1110+
// entirely within the offset, so they are never read from disk.
1111+
let remaining_offset = if let Some(offset) = prepared.offset {
1112+
if offset > 0 {
1113+
Some(row_groups.prune_by_offset(
1114+
offset,
1115+
prepared.predicate.is_some(),
1116+
rg_metadata,
1117+
&prepared.file_metrics,
1118+
))
1119+
} else {
1120+
None
1121+
}
1122+
} else {
1123+
None
1124+
};
1125+
11041126
// Prune by limit if limit is set and limit order is not sensitive
11051127
if let (Some(limit), false) = (prepared.limit, prepared.preserve_order) {
11061128
row_groups.prune_by_limit(limit, rg_metadata, &prepared.file_metrics);
@@ -1126,6 +1148,15 @@ impl RowGroupsPrunedParquetOpen {
11261148
// Prepare the access plan (extract row groups and row selection)
11271149
let mut prepared_plan = access_plan.prepare(rg_metadata)?;
11281150

1151+
// Apply remaining offset as RowSelection (partial RG skip).
1152+
// Only when no predicate — with predicates, GlobalLimitExec handles.
1153+
if let Some(remaining_offset) = remaining_offset
1154+
&& remaining_offset > 0
1155+
&& prepared.predicate.is_none()
1156+
{
1157+
prepared_plan = prepared_plan.apply_offset(remaining_offset, rg_metadata);
1158+
}
1159+
11291160
// Potentially reverse the access plan for performance.
11301161
// See `ParquetSource::try_pushdown_sort` for the rationale.
11311162
if prepared.reverse_row_groups {
@@ -1157,7 +1188,21 @@ impl RowGroupsPrunedParquetOpen {
11571188
}
11581189
decoder_builder =
11591190
decoder_builder.with_row_groups(prepared_plan.row_group_indexes);
1160-
if let Some(limit) = prepared.limit {
1191+
1192+
// Adjust limit: original limit is `skip + fetch` from optimizer.
1193+
// Since we handle offset at parquet level (RG prune + RowSelection),
1194+
// only need to read `fetch` rows = `limit - offset`.
1195+
// Only adjust limit when no predicate (offset fully handled here).
1196+
// With predicate, GlobalLimitExec handles offset, decoder needs
1197+
// full limit (skip+fetch) to provide enough rows.
1198+
let effective_limit = match (prepared.limit, prepared.offset) {
1199+
(Some(limit), Some(offset)) if offset > 0 && prepared.predicate.is_none() => {
1200+
Some(limit.saturating_sub(offset))
1201+
}
1202+
(Some(limit), _) => Some(limit),
1203+
_ => None,
1204+
};
1205+
if let Some(limit) = effective_limit {
11611206
decoder_builder = decoder_builder.with_limit(limit);
11621207
}
11631208
if let Some(max_predicate_cache_size) = prepared.max_predicate_cache_size {
@@ -1794,6 +1839,7 @@ mod test {
17941839
projection,
17951840
batch_size: self.batch_size,
17961841
limit: self.limit,
1842+
offset: None,
17971843
preserve_order: self.preserve_order,
17981844
predicate: self.predicate,
17991845
table_schema,

datafusion/datasource-parquet/src/row_group_filter.rs

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,51 @@ impl RowGroupAccessPlanFilter {
212212
}
213213
}
214214

215+
/// Prune row groups that can be entirely skipped due to offset.
216+
///
217+
/// When an offset is specified, rows at the beginning of the scan must be
218+
/// skipped. This method marks leading fully-matched row groups whose
219+
/// cumulative row count falls within the offset as skipped, so they are
220+
/// never read from disk.
221+
///
222+
/// Returns the remaining offset (number of rows still to skip within the
223+
/// first non-pruned row group).
224+
pub fn prune_by_offset(
225+
&mut self,
226+
offset: usize,
227+
has_predicate: bool,
228+
rg_metadata: &[RowGroupMetaData],
229+
metrics: &ParquetFileMetrics,
230+
) -> usize {
231+
let mut remaining = offset;
232+
let mut pruned_count = 0;
233+
234+
for &idx in self.access_plan.row_group_indexes().iter() {
235+
if remaining == 0 {
236+
break;
237+
}
238+
// We can skip a row group entirely if:
239+
// - No predicate: all rows match, row count is exact
240+
// - Has predicate but is_fully_matched: all rows pass filter
241+
let can_skip = !has_predicate || self.is_fully_matched[idx];
242+
if can_skip {
243+
let rg_rows = rg_metadata[idx].num_rows() as usize;
244+
if remaining >= rg_rows {
245+
self.access_plan.skip(idx);
246+
remaining -= rg_rows;
247+
pruned_count += 1;
248+
} else {
249+
break;
250+
}
251+
} else {
252+
break;
253+
}
254+
}
255+
256+
metrics.offset_pruned_row_groups.add_pruned(pruned_count);
257+
remaining
258+
}
259+
215260
/// Prune remaining row groups to only those within the specified range.
216261
///
217262
/// Updates this set to mark row groups that should not be scanned
@@ -1438,6 +1483,133 @@ mod tests {
14381483
ParquetFileMetrics::new(0, "file.parquet", &metrics)
14391484
}
14401485

1486+
/// Create a RowGroupMetaData with the specified number of rows.
1487+
/// Uses a minimal schema with a single INT32 column.
1488+
fn make_row_group_meta(num_rows: i64) -> RowGroupMetaData {
1489+
let schema_descr = get_test_schema_descr(vec![PrimitiveTypeField::new(
1490+
"id",
1491+
PhysicalType::INT32,
1492+
)]);
1493+
let column = ColumnChunkMetaData::builder(schema_descr.column(0))
1494+
.set_num_values(num_rows)
1495+
.build()
1496+
.unwrap();
1497+
RowGroupMetaData::builder(schema_descr)
1498+
.set_num_rows(num_rows)
1499+
.set_total_byte_size(1000)
1500+
.set_column_metadata(vec![column])
1501+
.build()
1502+
.unwrap()
1503+
}
1504+
1505+
/// Helper to build a RowGroupAccessPlanFilter with specified fully_matched flags.
1506+
fn make_filter_with_fully_matched(
1507+
num_rgs: usize,
1508+
fully_matched: Vec<bool>,
1509+
) -> RowGroupAccessPlanFilter {
1510+
assert_eq!(num_rgs, fully_matched.len());
1511+
let access_plan = ParquetAccessPlan::new_all(num_rgs);
1512+
let mut filter = RowGroupAccessPlanFilter::new(access_plan);
1513+
filter.is_fully_matched = fully_matched;
1514+
filter
1515+
}
1516+
1517+
#[test]
fn test_prune_by_offset_skips_fully_matched_rgs() {
    // Three fully-matched row groups of 100 rows each; an offset of 250
    // prunes the first two whole groups (200 rows) and leaves 50 rows to
    // skip inside the third.
    let metas: Vec<RowGroupMetaData> =
        (0..3).map(|_| make_row_group_meta(100)).collect();
    let metrics = parquet_file_metrics();
    let mut filter = make_filter_with_fully_matched(3, vec![true; 3]);

    let leftover = filter.prune_by_offset(250, false, &metas, &metrics);
    assert_eq!(leftover, 50);
    // Only the third row group remains scheduled for scanning.
    assert_eq!(filter.row_group_indexes().collect::<Vec<usize>>(), vec![2]);
}
1532+
1533+
#[test]
fn test_prune_by_offset_stops_at_non_fully_matched() {
    // Groups 0 and 1 are fully matched, group 2 is not. With a predicate
    // present and offset=250, the two fully-matched groups (200 rows) are
    // pruned; pruning must then stop at the non-fully-matched group even
    // though 50 offset rows remain unconsumed.
    let metas: Vec<RowGroupMetaData> =
        (0..3).map(|_| make_row_group_meta(100)).collect();
    let metrics = parquet_file_metrics();
    let mut filter = make_filter_with_fully_matched(3, vec![true, true, false]);

    let leftover = filter.prune_by_offset(250, true, &metas, &metrics);
    assert_eq!(leftover, 50);
    // The non-fully-matched group must still be scanned.
    assert_eq!(filter.row_group_indexes().collect::<Vec<usize>>(), vec![2]);
}
1550+
1551+
#[test]
fn test_prune_by_offset_zero() {
    // A zero offset must prune nothing and report nothing left to skip.
    let metas: Vec<RowGroupMetaData> =
        (0..3).map(|_| make_row_group_meta(100)).collect();
    let metrics = parquet_file_metrics();
    let mut filter = make_filter_with_fully_matched(3, vec![true; 3]);

    assert_eq!(filter.prune_by_offset(0, false, &metas, &metrics), 0);
    // Every row group is still scheduled for scanning.
    assert_eq!(
        filter.row_group_indexes().collect::<Vec<usize>>(),
        vec![0, 1, 2]
    );
}
1565+
1566+
#[test]
fn test_prune_by_offset_exact_boundary() {
    // offset=200 lands exactly on a row-group boundary: the first two
    // 100-row groups are pruned and no partial skip remains.
    let metas: Vec<RowGroupMetaData> =
        (0..3).map(|_| make_row_group_meta(100)).collect();
    let metrics = parquet_file_metrics();
    let mut filter = make_filter_with_fully_matched(3, vec![true; 3]);

    assert_eq!(filter.prune_by_offset(200, false, &metas, &metrics), 0);
    // Only the third row group survives.
    assert_eq!(filter.row_group_indexes().collect::<Vec<usize>>(), vec![2]);
}
1580+
1581+
#[test]
fn test_prune_by_offset_exceeds_total() {
    // offset=400 exceeds the 300 available rows: every fully-matched group
    // is pruned and 400 - 300 = 100 offset rows remain unconsumed.
    let metas: Vec<RowGroupMetaData> =
        (0..3).map(|_| make_row_group_meta(100)).collect();
    let metrics = parquet_file_metrics();
    let mut filter = make_filter_with_fully_matched(3, vec![true; 3]);

    assert_eq!(filter.prune_by_offset(400, false, &metas, &metrics), 100);
    // Nothing is left to scan.
    assert!(filter.row_group_indexes().collect::<Vec<usize>>().is_empty());
}
1596+
1597+
#[test]
fn test_prune_by_offset_partial_rg() {
    // offset=50 is smaller than the first 100-row group, so no whole group
    // can be pruned and all 50 rows remain to skip via a RowSelection.
    let metas: Vec<RowGroupMetaData> =
        (0..3).map(|_| make_row_group_meta(100)).collect();
    let metrics = parquet_file_metrics();
    let mut filter = make_filter_with_fully_matched(3, vec![true; 3]);

    assert_eq!(filter.prune_by_offset(50, false, &metas, &metrics), 50);
    // Every row group is still scheduled for scanning.
    assert_eq!(
        filter.row_group_indexes().collect::<Vec<usize>>(),
        vec![0, 1, 2]
    );
}
1612+
14411613
#[tokio::test]
14421614
async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() {
14431615
BloomFilterTest::new_data_index_bloom_encoding_stats()

datafusion/datasource-parquet/src/source.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,7 @@ impl FileSource for ParquetSource {
560560
.batch_size
561561
.expect("Batch size must set before creating ParquetMorselizer"),
562562
limit: base_config.limit,
563+
offset: base_config.offset,
563564
preserve_order: base_config.preserve_order,
564565
predicate: self.predicate.clone(),
565566
table_schema: self.table_schema.clone(),
@@ -583,6 +584,10 @@ impl FileSource for ParquetSource {
583584
}))
584585
}
585586

587+
/// Parquet can skip OFFSET rows efficiently: whole row groups are pruned
/// using their metadata row counts and any remainder is applied as a
/// `RowSelection`, so report offset support to the planner.
fn supports_offset(&self) -> bool {
    true
}
590+
586591
fn table_schema(&self) -> &TableSchema {
587592
&self.table_schema
588593
}

datafusion/datasource/src/file.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,13 @@ pub trait FileSource: Any + Send + Sync {
280280
Ok(SortOrderPushdownResult::Unsupported)
281281
}
282282

283+
/// Whether this source can efficiently skip rows for OFFSET queries
/// (e.g., by skipping entire row groups based on row counts).
///
/// Sources returning `true` may have the OFFSET pushed into the scan,
/// allowing the optimizer to eliminate the enclosing `GlobalLimitExec`.
/// Default: false (no offset pushdown).
fn supports_offset(&self) -> bool {
    false
}
289+
283290
/// Try to push down a projection into this FileSource.
284291
///
285292
/// `FileSource` implementations that support projection pushdown should

0 commit comments

Comments
 (0)