Skip to content

Commit 07d54c4

Browse files
committed
feat: pushdown OFFSET to parquet for RG-level skipping
Push OFFSET from GlobalLimitExec down to DataSourceExec/ParquetOpener for single-file, no-filter parquet queries: entire row groups are skipped outright, and a RowSelection handles any partial row-group skip. Design: - with_offset returns Some only when the offset is fully handled (single file, no filter, parquet source), letting the optimizer eliminate GlobalLimitExec - Multi-file and filtered queries keep GlobalLimitExec (follow-up) - prune_by_offset: skip leading RGs by cumulative row count - RowSelection covers the remaining offset within the first surviving RG - effective_limit = limit - offset for the decoder Implementation: - FileSource::supports_offset() (parquet=true, others=false) - FileScanConfig: offset field, with_offset (single file + no filter) - LimitPushdown: push offset, eliminate GlobalLimitExec when accepted - Shared Arc<AtomicUsize> remaining_offset for future multi-file support Benchmark (60M rows, 1.5GB single parquet file): OFFSET 59M: 182ms → <1ms (>182x faster)
1 parent 3aefba7 commit 07d54c4

14 files changed

Lines changed: 551 additions & 38 deletions

File tree

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ pub struct ParquetFileMetrics {
4949
pub row_groups_pruned_bloom_filter: PruningMetrics,
5050
/// Number of row groups pruned due to limit pruning.
5151
pub limit_pruned_row_groups: PruningMetrics,
52+
/// Number of row groups pruned due to offset pruning.
53+
pub offset_pruned_row_groups: PruningMetrics,
5254
/// Number of row groups pruned by statistics
5355
pub row_groups_pruned_statistics: PruningMetrics,
5456
/// Total number of bytes scanned
@@ -113,6 +115,11 @@ impl ParquetFileMetrics {
113115
.with_type(MetricType::Summary)
114116
.pruning_metrics("limit_pruned_row_groups", partition);
115117

118+
let offset_pruned_row_groups = MetricBuilder::new(metrics)
119+
.with_new_label("filename", filename.to_string())
120+
.with_type(MetricType::Summary)
121+
.pruning_metrics("offset_pruned_row_groups", partition);
122+
116123
let row_groups_pruned_statistics = MetricBuilder::new(metrics)
117124
.with_new_label("filename", filename.to_string())
118125
.with_type(MetricType::Summary)
@@ -198,6 +205,7 @@ impl ParquetFileMetrics {
198205
row_groups_pruned_bloom_filter,
199206
row_groups_pruned_statistics,
200207
limit_pruned_row_groups,
208+
offset_pruned_row_groups,
201209
bytes_scanned,
202210
pushdown_rows_pruned,
203211
pushdown_rows_matched,

datafusion/datasource-parquet/src/opener.rs

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ pub(super) struct ParquetMorselizer {
9292
pub batch_size: usize,
9393
/// Optional limit on the number of rows to read
9494
pub(crate) limit: Option<usize>,
95+
/// Shared remaining offset across all file openers. Each opener
96+
/// atomically consumes part of the offset by skipping rows/RGs.
97+
/// When zero, no more rows need to be skipped.
98+
pub(crate) remaining_offset: Arc<std::sync::atomic::AtomicUsize>,
9599
/// If should keep the output rows in order
96100
pub preserve_order: bool,
97101
/// Optional predicate to apply during the scan
@@ -281,6 +285,8 @@ struct PreparedParquetOpen {
281285
enable_bloom_filter: bool,
282286
enable_row_group_stats_pruning: bool,
283287
limit: Option<usize>,
288+
/// Shared remaining offset — atomically consumed across file openers
289+
remaining_offset: Arc<std::sync::atomic::AtomicUsize>,
284290
coerce_int96: Option<TimeUnit>,
285291
expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
286292
predicate_creation_errors: Count,
@@ -650,6 +656,7 @@ impl ParquetMorselizer {
650656
enable_bloom_filter: self.enable_bloom_filter,
651657
enable_row_group_stats_pruning: self.enable_row_group_stats_pruning,
652658
limit: self.limit,
659+
remaining_offset: Arc::clone(&self.remaining_offset),
653660
coerce_int96: self.coerce_int96,
654661
expr_adapter_factory: Arc::clone(&self.expr_adapter_factory),
655662
predicate_creation_errors,
@@ -1101,6 +1108,30 @@ impl RowGroupsPrunedParquetOpen {
11011108
None
11021109
};
11031110

1111+
// Prune by offset: atomically consume from the shared remaining_offset.
1112+
// Each file opener skips as many fully-matched RGs as possible,
1113+
// reducing the shared counter for subsequent file openers.
1114+
let file_offset = prepared
1115+
.remaining_offset
1116+
.load(std::sync::atomic::Ordering::Relaxed);
1117+
let remaining_offset = if file_offset > 0 {
1118+
let remaining = row_groups.prune_by_offset(
1119+
file_offset,
1120+
prepared.predicate.is_some(),
1121+
rg_metadata,
1122+
&prepared.file_metrics,
1123+
);
1124+
// How many rows did we skip at RG level?
1125+
let rg_skipped = file_offset - remaining;
1126+
// Atomically reduce the shared offset
1127+
prepared
1128+
.remaining_offset
1129+
.fetch_sub(rg_skipped, std::sync::atomic::Ordering::Relaxed);
1130+
Some(remaining)
1131+
} else {
1132+
None
1133+
};
1134+
11041135
// Prune by limit if limit is set and limit order is not sensitive
11051136
if let (Some(limit), false) = (prepared.limit, prepared.preserve_order) {
11061137
row_groups.prune_by_limit(limit, rg_metadata, &prepared.file_metrics);
@@ -1126,6 +1157,34 @@ impl RowGroupsPrunedParquetOpen {
11261157
// Prepare the access plan (extract row groups and row selection)
11271158
let mut prepared_plan = access_plan.prepare(rg_metadata)?;
11281159

1160+
// Handle remaining offset via RowSelection — only when there's NO
1161+
// predicate (all rows match, raw row count is accurate for offset).
1162+
// With predicates, GlobalLimitExec handles remaining offset since
1163+
// we can't know how many rows survive the filter.
1164+
if let Some(remaining_offset) = remaining_offset
1165+
&& remaining_offset > 0
1166+
&& prepared.predicate.is_none()
1167+
&& !prepared_plan.row_group_indexes.is_empty()
1168+
{
1169+
let total_rows: usize = prepared_plan
1170+
.row_group_indexes
1171+
.iter()
1172+
.map(|&idx| rg_metadata[idx].num_rows() as usize)
1173+
.sum();
1174+
let select_rows = total_rows.saturating_sub(remaining_offset);
1175+
if select_rows > 0 {
1176+
let offset_selection =
1177+
parquet::arrow::arrow_reader::RowSelection::from(vec![
1178+
parquet::arrow::arrow_reader::RowSelector::skip(remaining_offset),
1179+
parquet::arrow::arrow_reader::RowSelector::select(select_rows),
1180+
]);
1181+
prepared_plan.row_selection = Some(match prepared_plan.row_selection {
1182+
Some(existing) => existing.intersection(&offset_selection),
1183+
None => offset_selection,
1184+
});
1185+
}
1186+
}
1187+
11291188
// Potentially reverse the access plan for performance.
11301189
// See `ParquetSource::try_pushdown_sort` for the rationale.
11311190
if prepared.reverse_row_groups {
@@ -1157,7 +1216,21 @@ impl RowGroupsPrunedParquetOpen {
11571216
}
11581217
decoder_builder =
11591218
decoder_builder.with_row_groups(prepared_plan.row_group_indexes);
1160-
if let Some(limit) = prepared.limit {
1219+
1220+
// Adjust limit: original limit is `skip + fetch` from optimizer.
1221+
// Since we handle offset at parquet level (RG prune + RowSelection),
1222+
// only need to read `fetch` rows = `limit - offset`.
1223+
// Only adjust limit when no predicate (offset fully handled here).
1224+
// With predicate, GlobalLimitExec handles offset, decoder needs
1225+
// full limit (skip+fetch) to provide enough rows.
1226+
let effective_limit = match prepared.limit {
1227+
Some(limit) if file_offset > 0 && prepared.predicate.is_none() => {
1228+
Some(limit.saturating_sub(file_offset))
1229+
}
1230+
Some(limit) => Some(limit),
1231+
_ => None,
1232+
};
1233+
if let Some(limit) = effective_limit {
11611234
decoder_builder = decoder_builder.with_limit(limit);
11621235
}
11631236
if let Some(max_predicate_cache_size) = prepared.max_predicate_cache_size {
@@ -1794,6 +1867,7 @@ mod test {
17941867
projection,
17951868
batch_size: self.batch_size,
17961869
limit: self.limit,
1870+
remaining_offset: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
17971871
preserve_order: self.preserve_order,
17981872
predicate: self.predicate,
17991873
table_schema,

datafusion/datasource-parquet/src/row_group_filter.rs

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,51 @@ impl RowGroupAccessPlanFilter {
212212
}
213213
}
214214

215+
/// Prune row groups that can be entirely skipped due to offset.
216+
///
217+
/// When an offset is specified, rows at the beginning of the scan must be
218+
/// skipped. This method marks leading fully-matched row groups whose
219+
/// cumulative row count falls within the offset as skipped, so they are
220+
/// never read from disk.
221+
///
222+
/// Returns the remaining offset (number of rows still to skip within the
223+
/// first non-pruned row group).
224+
pub fn prune_by_offset(
225+
&mut self,
226+
offset: usize,
227+
has_predicate: bool,
228+
rg_metadata: &[RowGroupMetaData],
229+
metrics: &ParquetFileMetrics,
230+
) -> usize {
231+
let mut remaining = offset;
232+
let mut pruned_count = 0;
233+
234+
for &idx in self.access_plan.row_group_indexes().iter() {
235+
if remaining == 0 {
236+
break;
237+
}
238+
// We can skip a row group entirely if:
239+
// - No predicate: all rows match, row count is exact
240+
// - Has predicate but is_fully_matched: all rows pass filter
241+
let can_skip = !has_predicate || self.is_fully_matched[idx];
242+
if can_skip {
243+
let rg_rows = rg_metadata[idx].num_rows() as usize;
244+
if remaining >= rg_rows {
245+
self.access_plan.skip(idx);
246+
remaining -= rg_rows;
247+
pruned_count += 1;
248+
} else {
249+
break;
250+
}
251+
} else {
252+
break;
253+
}
254+
}
255+
256+
metrics.offset_pruned_row_groups.add_pruned(pruned_count);
257+
remaining
258+
}
259+
215260
/// Prune remaining row groups to only those within the specified range.
216261
///
217262
/// Updates this set to mark row groups that should not be scanned
@@ -1438,6 +1483,133 @@ mod tests {
14381483
ParquetFileMetrics::new(0, "file.parquet", &metrics)
14391484
}
14401485

1486+
/// Create a RowGroupMetaData with the specified number of rows.
1487+
/// Uses a minimal schema with a single INT32 column.
1488+
fn make_row_group_meta(num_rows: i64) -> RowGroupMetaData {
1489+
let schema_descr = get_test_schema_descr(vec![PrimitiveTypeField::new(
1490+
"id",
1491+
PhysicalType::INT32,
1492+
)]);
1493+
let column = ColumnChunkMetaData::builder(schema_descr.column(0))
1494+
.set_num_values(num_rows)
1495+
.build()
1496+
.unwrap();
1497+
RowGroupMetaData::builder(schema_descr)
1498+
.set_num_rows(num_rows)
1499+
.set_total_byte_size(1000)
1500+
.set_column_metadata(vec![column])
1501+
.build()
1502+
.unwrap()
1503+
}
1504+
1505+
/// Helper to build a RowGroupAccessPlanFilter with specified fully_matched flags.
1506+
fn make_filter_with_fully_matched(
1507+
num_rgs: usize,
1508+
fully_matched: Vec<bool>,
1509+
) -> RowGroupAccessPlanFilter {
1510+
assert_eq!(num_rgs, fully_matched.len());
1511+
let access_plan = ParquetAccessPlan::new_all(num_rgs);
1512+
let mut filter = RowGroupAccessPlanFilter::new(access_plan);
1513+
filter.is_fully_matched = fully_matched;
1514+
filter
1515+
}
1516+
1517+
#[test]
fn test_prune_by_offset_skips_fully_matched_rgs() {
    // Three fully-matched row groups of 100 rows each. An offset of 250
    // covers the first two entirely, leaving 50 rows to skip in RG 2.
    let metas: Vec<RowGroupMetaData> = vec![
        make_row_group_meta(100),
        make_row_group_meta(100),
        make_row_group_meta(100),
    ];
    let metrics = parquet_file_metrics();
    let mut filter = make_filter_with_fully_matched(3, vec![true; 3]);

    let leftover = filter.prune_by_offset(250, false, &metas, &metrics);

    assert_eq!(leftover, 50);
    // Only the last row group survives pruning.
    let surviving: Vec<usize> = filter.row_group_indexes().collect();
    assert_eq!(surviving, vec![2]);
}
1532+
1533+
#[test]
fn test_prune_by_offset_stops_at_non_fully_matched() {
    // 3 RGs each with 100 rows. First two fully_matched, third not.
    //
    // Use offset=350 so the test actually exercises the fully-matched
    // guard: after skipping RG 0 and RG 1 (200 rows), 150 rows of offset
    // remain — more than RG 2's 100 rows — yet RG 2 must NOT be skipped
    // because it is not fully matched (its surviving row count under the
    // predicate is unknown).
    //
    // (With an offset of 250 the loop would stop at RG 2 anyway, since
    // the 50 remaining rows are fewer than its 100 rows, so a broken
    // fully-matched check would go undetected.)
    let rg_metadata: Vec<RowGroupMetaData> =
        (0..3).map(|_| make_row_group_meta(100)).collect();
    let metrics = parquet_file_metrics();
    let mut filter = make_filter_with_fully_matched(3, vec![true, true, false]);

    let remaining = filter.prune_by_offset(350, true, &rg_metadata, &metrics);
    assert_eq!(remaining, 150);
    // First two RGs skipped, third still scanned (not fully matched)
    let indexes: Vec<usize> = filter.row_group_indexes().collect();
    assert_eq!(indexes, vec![2]);
}
1550+
1551+
#[test]
fn test_prune_by_offset_zero() {
    // A zero offset is a no-op: nothing is pruned and nothing remains
    // to be skipped.
    let metas: Vec<RowGroupMetaData> =
        (0..3).map(|_| make_row_group_meta(100)).collect();
    let metrics = parquet_file_metrics();
    let mut filter = make_filter_with_fully_matched(3, vec![true; 3]);

    assert_eq!(filter.prune_by_offset(0, false, &metas, &metrics), 0);
    // Every row group is still scanned.
    let surviving: Vec<usize> = filter.row_group_indexes().collect();
    assert_eq!(surviving, vec![0, 1, 2]);
}
1565+
1566+
#[test]
fn test_prune_by_offset_exact_boundary() {
    // The offset lands exactly on a row-group boundary: the first two
    // groups (200 rows) are pruned and no residual skip remains.
    let metas: Vec<RowGroupMetaData> =
        (0..3).map(|_| make_row_group_meta(100)).collect();
    let metrics = parquet_file_metrics();
    let mut filter = make_filter_with_fully_matched(3, vec![true; 3]);

    let leftover = filter.prune_by_offset(200, false, &metas, &metrics);

    assert_eq!(leftover, 0);
    // Only the third group is left to scan.
    let surviving: Vec<usize> = filter.row_group_indexes().collect();
    assert_eq!(surviving, vec![2]);
}
1580+
1581+
#[test]
fn test_prune_by_offset_exceeds_total() {
    // An offset larger than the whole file (400 > 300 rows): every
    // fully-matched group is pruned and 100 rows of offset are returned
    // as still unconsumed.
    let metas: Vec<RowGroupMetaData> =
        (0..3).map(|_| make_row_group_meta(100)).collect();
    let metrics = parquet_file_metrics();
    let mut filter = make_filter_with_fully_matched(3, vec![true; 3]);

    let leftover = filter.prune_by_offset(400, false, &metas, &metrics);

    assert_eq!(leftover, 100);
    // No row group survives.
    let surviving: Vec<usize> = filter.row_group_indexes().collect();
    assert!(surviving.is_empty());
}
1596+
1597+
#[test]
fn test_prune_by_offset_partial_rg() {
    // The offset (50) is smaller than the first group (100 rows), so no
    // group can be pruned wholesale; the full 50 rows remain to skip.
    let metas: Vec<RowGroupMetaData> =
        (0..3).map(|_| make_row_group_meta(100)).collect();
    let metrics = parquet_file_metrics();
    let mut filter = make_filter_with_fully_matched(3, vec![true; 3]);

    let leftover = filter.prune_by_offset(50, false, &metas, &metrics);

    assert_eq!(leftover, 50);
    // All three groups are still scanned.
    let surviving: Vec<usize> = filter.row_group_indexes().collect();
    assert_eq!(surviving, vec![0, 1, 2]);
}
1612+
14411613
#[tokio::test]
14421614
async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() {
14431615
BloomFilterTest::new_data_index_bloom_encoding_stats()

datafusion/datasource-parquet/src/source.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,9 @@ impl FileSource for ParquetSource {
560560
.batch_size
561561
.expect("Batch size must set before creating ParquetMorselizer"),
562562
limit: base_config.limit,
563+
remaining_offset: Arc::new(std::sync::atomic::AtomicUsize::new(
564+
base_config.offset.unwrap_or(0),
565+
)),
563566
preserve_order: base_config.preserve_order,
564567
predicate: self.predicate.clone(),
565568
table_schema: self.table_schema.clone(),
@@ -583,6 +586,10 @@ impl FileSource for ParquetSource {
583586
}))
584587
}
585588

589+
fn supports_offset(&self) -> bool {
590+
true
591+
}
592+
586593
fn table_schema(&self) -> &TableSchema {
587594
&self.table_schema
588595
}

datafusion/datasource/src/file.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,13 @@ pub trait FileSource: Any + Send + Sync {
280280
Ok(SortOrderPushdownResult::Unsupported)
281281
}
282282

283+
/// Whether this source fully handles offset at the scan level.
284+
/// When true, the optimizer can eliminate GlobalLimitExec's skip.
285+
/// Default: false (offset handled by GlobalLimitExec).
286+
fn supports_offset(&self) -> bool {
287+
false
288+
}
289+
283290
/// Try to push down a projection into this FileSource.
284291
///
285292
/// `FileSource` implementations that support projection pushdown should

0 commit comments

Comments
 (0)