Skip to content

Commit fb717c5

Browse files
zhuqi-lucasclaude
andcommitted
fix: push OFFSET through CoalescePartitionsExec to DataSourceExec
When a single parquet file is split into multiple byte-range partitions, CoalescePartitionsExec sits between GlobalLimitExec and DataSourceExec. Previously, the optimizer would keep the GlobalLimitExec for the skip, meaning offset wasn't pushed to the parquet reader. Now we try to push the offset through the combining operator to its DataSourceExec child, which uses a shared Arc<AtomicUsize> counter to coordinate offset consumption across partitions. This eliminates the GlobalLimitExec entirely for supported sources. Result on 14GB hits.parquet: OFFSET 99M LIMIT 5 from 3.2s → 29ms. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 2028426 commit fb717c5

2 files changed

Lines changed: 19 additions & 4 deletions

File tree

datafusion/physical-optimizer/src/limit_pushdown.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -216,11 +216,25 @@ pub fn pushdown_limit_helper(
216216
// fetch info to plan if possible. If not, we must add a limit node
217217
// with the information from the global state.
218218
let mut new_plan = plan_with_fetch;
219-
// Execution plans can't (yet) handle skip, so if we have one,
220-
// we still need to add a global limit
221219
if global_state.skip > 0 {
222-
new_plan =
223-
add_global_limit(new_plan, global_state.skip, global_state.fetch);
220+
// Try to push offset through the combining operator
221+
// to its child (DataSourceExec).
222+
let offset_pushed = {
223+
let children = new_plan.children();
224+
children.len() == 1
225+
&& children[0].with_offset(global_state.skip).is_some()
226+
};
227+
if offset_pushed {
228+
let child = new_plan.children()[0]
229+
.with_offset(global_state.skip)
230+
.unwrap();
231+
let fallback = Arc::clone(&new_plan);
232+
new_plan =
233+
new_plan.with_new_children(vec![child]).unwrap_or(fallback);
234+
} else {
235+
new_plan =
236+
add_global_limit(new_plan, global_state.skip, global_state.fetch);
237+
}
224238
}
225239
global_state.fetch = skip_and_fetch;
226240
global_state.skip = 0;

datafusion/sqllogictest/test_files/sort_pushdown.slt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2374,6 +2374,7 @@ SELECT * FROM tn_offset LIMIT 5 OFFSET 10;
23742374
15 150
23752375

23762376
# Test N.8: OFFSET with multi-partition (target_partitions=4)
2377+
# Small file stays as 1 group, but offset is still pushed down correctly.
23772378
statement ok
23782379
SET datafusion.execution.target_partitions = 4;
23792380

0 commit comments

Comments
 (0)