feat: TABLESAMPLE SYSTEM end-to-end + row-group / row sampling on ParquetSource #22000
adriangb wants to merge 6 commits into apache:main
Conversation
I've looked into the history of #16533 a bit. In particular the decision in #16505 to add hooks (#17843) instead of integrating into core directly. The example in #17843 does post-scan sampling, once the IO cost has been paid. This PR attempts to do much more efficient IO-level sampling (files, row groups, pages). In order for this to work we need to add APIs in

@geoffreyclaude @alamb wdyt?
Thanks for the ping @adriangb! My take is that the `RelationPlanner` direction still makes sense here. The point was not that

So I think your split makes a lot of sense: put the parquet and physical pushdown in core, but leave the SQL semantics to

On adding
Okay, makes sense then, we're on the same page 😄. Since you've worked on this before, would you like to review this PR?
I'm also happy to split it up:

Up to you...
Force-pushed 2cc8d76 to f3270f1
I've cleaned up the history into stacked commits in case that's helpful.
That's super useful, thanks! I'll give it a look tomorrow.
My personal rationale was that all the different SQL systems did sampling differently -- so any particular choice for sampling is probably fine, but I wasn't at all sure there would be enough commonality across implementations to put it into DataFusion core.

Also, what is the problem with constructing the access plan by hand? For this type of low-level access pattern (particular sampling methods etc.) it seems like low-level construction is just the escape valve that is needed (super fine-grained control).

I am very wary of complicating the built-in Parquet reader any more -- it is already very complicated with lots of behaviors (and new ones getting added all the time, for example the sortedness ones from @zhuqi-lucas and @xudong963).

So adding APIs to make it easier to extend / modify plans makes sense to me, but hard-coding more sampling into the core is much less clear to me.
Did you try to implement whatever sampling you need with just the existing APIs, i.e. no change to the core? If that wasn't possible, what was missing in the API?
Thanks for reviewing @alamb!
Yes, it won't work sadly. With current APIs you could maybe sample at the file level, but not at the row-group or page level (which requires reading parquet footers for all files, parsing them, etc.). The way I implemented things here, that is lazily deferred to file opening so there is no wasted work. Also, requiring Rust bits to make it work precludes this working with e.g.

On the `ExecutionPlan` level, unless we add some APIs there would be a lot of tight coupling (mainly via downcast matching on both ends) between the custom optimizer rule, the custom physical plan, and the data source. I.e. the optimizer rule needs to downcast match every plan in the path between the
The actual changes to
I agree it is a complex piece of software, but I think we can continue to add the right abstractions and simplifications (like you recently did with the morselization work 😄). Ultimately the file reader is going to be a key piece of a data toolkit like DataFusion, so it's unsurprising (to me) that it holds a lot of the complexity.
yeah -- maybe I am over-sensitive, as I feel like as soon as we are able to refactor away some of the complexity it gets all complicated again 😆
No, you are right: it is a big risk that this code turns into feature spaghetti. It's just not one I think we can necessarily avoid. We should be cautious about introducing complexity and push back (like you have here), but if this is the right place to put it and we can factor it into a shape that only adds complexity, not multiplies or exponentiates it, then maybe we just need to deal with it over time.
Force-pushed 66d2fc2 to 0661eba
Adds two opt-in sampling primitives to parquet scans, both built on the existing `ParquetAccessPlan` infrastructure:

* `ParquetSource::with_row_group_sampling(fraction)` — keep `fraction` of row groups in each scanned file. Selection is deferred until the opener has loaded the parquet footer (so we sample by real row-group index, not guess) and is deterministic per `(file_name, row_group_count, fraction)` via a seeded `SmallRng`.
* `ParquetSource::with_row_fraction(fraction)` — within each kept row group, keep `fraction` of rows by translating to a `RowSelection` of K small contiguous windows (size controlled by `with_row_cluster_size`, default 32 768 rows). The parquet reader uses the page index to read only the data pages covering the selected rows, so this gives "page-level" IO savings without requiring per-column page alignment. Falls back gracefully (no IO win, still correct) when the page index is missing.

The two layers compose: scanning with both `row_group_fraction=0.1` and `row_fraction=0.1` reads ~1% of the rows in ~10% of the row groups, with windows spread out so the sample isn't clustered at one end of each row group. Selection within a row group is deterministic-but-random per `(file_name, row_group_index, fraction, cluster_size)` — same inputs yield the same windows, so re-runs are repeatable.

## Why this lives on `ParquetSource`

The natural entry-point for "I want a sample" is at config time, before any metadata IO. The actual *which* row groups / *which* rows selection still has to be deferred to the opener (after the footer is parsed) — that's why `ParquetSampling` carries fractions plus a cluster size, and the opener pulls them through to its lazy decision points.

This is intentionally orthogonal to file-level sampling: `ParquetSource` doesn't own the file list (`FileScanConfig.file_groups` does), so a file-fraction setter here would have been a confusing no-op. Callers that want to drop files should rebuild the `FileScanConfig` directly.
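The deterministic, footer-deferred row-group selection described above can be sketched in std-only Rust. This is a minimal model of the idea, not the PR's code: `sample_row_groups` and the tiny `Lcg` generator are hypothetical stand-ins for the seeded `SmallRng` draw, and `DefaultHasher` stands in for whatever hash the implementation uses.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Tiny deterministic PRNG (a stand-in for the PR's seeded SmallRng).
struct Lcg(u64);
impl Lcg {
    fn next(&mut self) -> u64 {
        // Knuth's MMIX LCG constants.
        self.0 = self
            .0
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        self.0
    }
}

/// Keep `max(1, ceil(n * fraction))` row-group indices, deterministically
/// per (file_name, row_group_count, fraction), as the commit describes.
fn sample_row_groups(file_name: &str, row_group_count: usize, fraction: f64) -> Vec<usize> {
    if fraction >= 1.0 {
        return (0..row_group_count).collect(); // no-op: keep everything
    }
    let target = (((row_group_count as f64) * fraction).ceil() as usize).max(1);
    let mut h = DefaultHasher::new();
    file_name.hash(&mut h);
    row_group_count.hash(&mut h);
    fraction.to_bits().hash(&mut h);
    let mut rng = Lcg(h.finish());
    // Partial Fisher–Yates: pick `target` distinct indices.
    let mut idx: Vec<usize> = (0..row_group_count).collect();
    for i in 0..target {
        let j = i + (rng.next() as usize) % (row_group_count - i);
        idx.swap(i, j);
    }
    let mut kept = idx[..target].to_vec();
    kept.sort_unstable(); // access plans scan row groups in order
    kept
}

fn main() {
    let a = sample_row_groups("part-0.parquet", 10, 0.3);
    assert_eq!(a, sample_row_groups("part-0.parquet", 10, 0.3)); // deterministic
    assert_eq!(a.len(), 3); // ceil(10 * 0.3)
    assert_eq!(sample_row_groups("f", 100, 0.001).len(), 1); // floor of 1
    println!("kept row groups: {a:?}");
}
```

The key property is that the seed is derived purely from the sampling inputs, so re-running the same scan reproduces the same access plan without any coordination.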
## Use cases

* `TABLESAMPLE` SQL syntax (any future implementation can lower to these primitives).
* Ad-hoc data exploration / `EXPLAIN ANALYZE` against a sample.
* Mini-query-style stats sampling (a layered helper can call these to bound the cost of computing approximate min/max/NDV/histograms for the optimizer — out of scope here, see the linked POC in the PR description).
* `EXPLAIN ANALYZE`-driven debug runs against a representative slice.

## Tests

5 unit tests on `apply_row_group_sampling` (target count, determinism, file-name dependence, no-op at fraction=1.0, target floor of 1) plus 2 end-to-end tests that build a real parquet file in an `InMemory` object store and confirm the row counts emitted are what the sampling implies.

`cargo build --workspace`, `cargo fmt --all`, and `cargo clippy -p datafusion-datasource-parquet --all-targets -- -D warnings` are clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@alamb I've restructured some more and pulled out new code into 2026412#diff-bbd611d7b35d7f17633eebbf32a07dc9e394f20135754ed949751e8030049e38 |
Regarding how other systems implement this: it seems most columnar, analytical DBs are approximate for the same reasons we are: https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-syntax-qry-select-sampling
https://docs.cloud.google.com/bigquery/docs/table-sampling
https://duckdb.org/docs/current/sql/samples
I think it's also worth highlighting that DuckDB has the behavior built in. In other words @alamb: if we merged 2026412 that's at least enough for someone like me to make it work in my system / in future DataFusion use cases. But I think it's valuable to take it one step further and expose this to users of
geoffreyclaude
left a comment
I did a quick pass and raised a few minor issues. Given most issues are with commits that build on the initial Parquet-only change, I'd suggest extracting the first commit to a dedicated PR, as that brings immediate value and can be reviewed and merged on its own.
Keep this one open for context though.
```rust
pub(crate) system_target_remaining: Option<f64>,
/// Optional `REPEATABLE(seed)` value plumbed through from
/// `TABLESAMPLE`. When set, the row-group and row-fraction
/// samplers ignore the file path and key only off the seed +
```
Should we reconsider ignoring the file path entirely for REPEATABLE? I agree we want to avoid environment specifics (such as full paths) so samples are reproducible across environments, but keying only on (seed, row_group_index, fraction, cluster_size) means identical file layouts will select the same row groups / row windows, creating cross-file correlation.

How about using the file index or some other stable id instead of the full file path, so REPEATABLE stays reproducible without making files correlate?
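The suggestion can be sketched as a seed derivation that mixes the `REPEATABLE` seed with a stable per-file id. `row_group_seed` is a hypothetical helper written for illustration (the PR's actual hashing may differ):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical seed derivation: mix the REPEATABLE seed with a stable
/// per-file id (a file index rather than a path) and the row-group index.
/// Runs stay reproducible across environments, while files with identical
/// layouts still draw different samples.
fn row_group_seed(repeatable_seed: u64, file_index: usize, row_group_index: usize) -> u64 {
    let mut h = DefaultHasher::new();
    repeatable_seed.hash(&mut h);
    file_index.hash(&mut h);
    row_group_index.hash(&mut h);
    h.finish()
}

fn main() {
    // Same inputs -> same seed: REPEATABLE stays reproducible.
    assert_eq!(row_group_seed(42, 0, 3), row_group_seed(42, 0, 3));
    // Different files with identical layouts decorrelate via file_index.
    assert_ne!(row_group_seed(42, 0, 3), row_group_seed(42, 1, 3));
    // Different REPEATABLE seeds draw differently.
    assert_ne!(row_group_seed(7, 0, 3), row_group_seed(42, 0, 3));
}
```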
```rust
)?);
)?;
```

```rust
// SYSTEM-mode adaptive split: when the SamplePushdown rule
```
These two comments have a lot of duplication. How about keeping only the large inner block for details?
```rust
let seed = hasher.finish();
let mut rng = rand::rngs::SmallRng::seed_from_u64(seed);
```

```rust
let target_rows = ((total_rows as f64) * fraction).ceil().max(1.0) as usize;
```
This code seems pretty risky and error-prone, especially with regard to possible duplicate sampling in edge cases. Can you extract it to a dedicated function and fuzz it to ensure there aren't any window overlaps, out-of-bounds positions, or general weirdness?
```rust
// The built-in planner only handles SYSTEM (and BLOCK as an
// alias for SYSTEM, matching Hive). Anything else is a
// semantics commitment we don't want to make in core.
Some(TableSampleMethod::System) | Some(TableSampleMethod::Block) | None => {}
```
The None arm means the default if unspecified is SYSTEM: is this what we want?
```
`REPEATABLE(seed)` mixes the seed into every random draw, so all
levels produce the same selection across runs. The selection also
depends on the file name, the row-group index within the file, and
```
It currently actually doesn't depend on file name. See my related comment for a suggested dependency of file index instead.
```rust
Pushdown::Pushed(new_child) => Ok(Transformed::yes(new_child)),
Pushdown::Failed(reason) => {
    datafusion_common::plan_err!(
        "TABLESAMPLE is not supported for this source: {reason}. \
```
source here can be confusing if pushdown failed at an intermediary node. How about: "TABLESAMPLE could not be pushed down: {reason}"?
```rust
// through. Routed against the parquet-backed copy of the table so
// the `SamplePushdown` rule can absorb the sample into the scan.
// `REPEATABLE(42)` makes the rows deterministic across runs and
// across machines (the seed dominates the file path in the
```
Same comment issue as in other places with REPEATABLE which currently ignores file path.
Thanks @geoffreyclaude! I broke out #22024, addressed the comments relevant to that bit there, and cherry-picked back to here.
Thank you. Basically, I really want to get DataFusion to the top of ClickBench, so I am trying to focus on getting that done before adding more features (that isn't to say we shouldn't add more features, I am just giving my context).
Will review this later today |
Two changes responding to review on the parent commit:

1. Key sampling on a stable `file_index` instead of `file_name` (apache#22000 (comment)). Both `apply_row_group_sampling` and `apply_row_fraction_sampling` now take `file_index: usize` rather than `file_name: &str`. The parquet opener passes the execution `partition_index`. This makes sampling reproducible across environments (no dependency on the on-disk path), while still decorrelating files assigned to different partitions.

2. Extract the row-window selection into `build_row_window_selectors` and add fuzz coverage (apache#22000 (comment)). The previous inline arithmetic could produce overlapping windows when `target_rows` was close to `total_rows`: `window_size = ceil(target / n_windows)` could exceed `stride = total / n_windows`, so adjacent strides' windows would intersect. The extracted function caps `window_size` at `stride` (the construction that guarantees disjointness) and is covered by:

   * `row_window_selection_basic_layout` — hand-checked anchor case.
   * `row_window_selection_returns_none_on_invalid_input` — degenerate inputs return `None` cleanly.
   * `row_window_selection_full_target_no_overlap` — the previously buggy `target_rows == total_rows` case.
   * `row_window_selection_fuzz_invariants` — 5 000 randomized `(total_rows, target_rows, cluster_size, seed)` configurations, asserting full coverage, in-bounds positions, and no overlap.
   * `row_window_selection_fuzz_determinism` — 1 000 iterations verifying identical seeds produce identical layouts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
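The cap-at-stride construction this commit describes can be modeled in a few lines. `build_row_windows` is a hypothetical reimplementation for illustration, not the PR's `build_row_window_selectors`; it places one window per stride and caps the window size at the stride so adjacent windows can never overlap.

```rust
/// Pick disjoint contiguous row windows inside a row group.
/// Returns (start, len) pairs, sorted and non-overlapping, or None on
/// degenerate input. A sketch only: the real function also has to emit
/// parquet `RowSelector` skip/select runs.
fn build_row_windows(
    total_rows: usize,
    target_rows: usize,
    cluster_size: usize,
    seed: u64,
) -> Option<Vec<(usize, usize)>> {
    if total_rows == 0 || target_rows == 0 || cluster_size == 0 || target_rows > total_rows {
        return None;
    }
    let n_windows = target_rows.div_ceil(cluster_size).max(1);
    let stride = total_rows / n_windows;
    if stride == 0 {
        return None;
    }
    // Cap at `stride`: the construction that guarantees disjointness.
    let window_size = target_rows.div_ceil(n_windows).min(stride);
    let mut rng = seed;
    let mut windows = Vec::with_capacity(n_windows);
    for w in 0..n_windows {
        rng = rng
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        // Random offset within the stride's slack keeps windows in bounds.
        let slack = stride - window_size;
        let offset = if slack == 0 { 0 } else { (rng as usize) % (slack + 1) };
        windows.push((w * stride + offset, window_size));
    }
    Some(windows)
}

fn main() {
    // The previously buggy case: target_rows == total_rows.
    let ws = build_row_windows(100_000, 100_000, 32_768, 42).unwrap();
    assert_eq!(ws.iter().map(|w| w.1).sum::<usize>(), 100_000); // full coverage
    for pair in ws.windows(2) {
        assert!(pair[0].0 + pair[0].1 <= pair[1].0); // no overlap
    }
    let (start, len) = *ws.last().unwrap();
    assert!(start + len <= 100_000); // in bounds
    assert!(build_row_windows(0, 10, 4, 1).is_none()); // degenerate input
}
```

Because each window lives entirely inside its own stride, disjointness holds for any seed, which is exactly the invariant the fuzz tests assert.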
Follow-ups to the cherry-picked refactor that landed the file_index keying:

* Reject `TABLESAMPLE` without an explicit method instead of silently treating it as `SYSTEM` (apache#22000 (comment)). PostgreSQL requires an explicit method and Spark defaults to block-level; picking one here in core would commit to semantics callers may not want. Added an slt case to lock the new error.
* Rephrase the `SamplePushdown` planning error from "TABLESAMPLE is not supported for this source" to "TABLESAMPLE could not be pushed down" since the failure may originate at any node along the passthrough chain, not just the leaf source (apache#22000 (comment)). Updated the slt assertion to match.
* Dedupe the SYSTEM-mode adaptive split comments in the parquet opener; the outer block now covers determinism and the inner block covers the row-group-vs-row split math without overlap (apache#22000 (comment)).
* Update the `select.md` and `relation_planner/table_sample.rs` REPEATABLE wording to reflect that sampling now keys on the execution `partition_index`, not the on-disk file path (apache#22000 (comment) and #discussion_r3187445171).
* Replace the opener-level "REPEATABLE ignores file name" test with a "sampling keys on partition_index" test that verifies same partition_index → same selection regardless of file name, and different partition_index → uncorrelated samples. Added `with_partition_index` to the test builder.
* Refresh the `run_examples-7` snapshot to match the new seed mix (the per-row-group hash now folds in the optional REPEATABLE seed alongside `file_index`; deterministic but a different draw).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds the cross-cutting infrastructure for pushing TABLESAMPLE-shaped
sampling into file sources, with parquet as the first absorbing
source. There is no SQL surface yet; this commit only ships the
primitives. Wiring a RelationPlanner / ExtensionPlanner so it works
out of the box from SQL is the next commit in this stack.
- `Sample` `UserDefinedLogicalNodeCore` extension node in
`datafusion-expr` (`logical_plan/sample.rs`). Schema-preserving;
validates `fraction ∈ (0, 1]`. Currently encodes
`SampleMethod::System` only.
- `SampleExec` placeholder in `datafusion-physical-plan`. Errors at
`execute` (it's a marker — the `SamplePushdown` rule is expected
to remove it). Implements filter / sort pushdown passthrough so
unrelated optimizer rules see straight through it.
- New `try_push_sample` method on `ExecutionPlan` and `FileSource`,
returning `Absorbed { inner }` / `Passthrough` / `Unsupported
{ reason }`. Default is `Unsupported`; per-node `Passthrough`
overrides on filter, projection, coalesce_batches,
coalesce_partitions, repartition, and non-fetch sort.
- `ParquetSource::try_push_sample` runs the (intentionally private)
hierarchical block-level reduction across files / row groups /
rows, with adaptive collapse when an axis can't reduce. Coordinates
with the opener via `pub(crate)` `system_target_remaining` and
`seed` fields on `ParquetSampling`. Single-file, single-row-group
inputs hit ~p × N rows instead of undershooting at p^(1/3) × N.
- `REPEATABLE(seed)` is plumbed all the way through: when set,
`ParquetSampling::apply_row_group_sampling` and
`apply_row_fraction_sampling` key only on `(seed, ...)` and ignore
the file path, so the same query is reproducible across
environments.
- `SamplePushdown` optimizer rule (between `PushdownSort` and
`EnsureCooperative`) walks top-down. On `Absorbed` it replaces
`SampleExec` with the rebuilt source; on `Passthrough` it pushes
through the single-child node and recurses; on `Unsupported` it
errors at planning time with `"TABLESAMPLE is not supported for
this source"`. There is intentionally no generic post-scan
`SampleExec` yet.
- EXPLAIN visibility: `ParquetSource::fmt_extra` surfaces
`sample_system_target_remaining` when set.
- `optimizer_rule_reference.md` updated to list `SamplePushdown` in
the documented rule order.
- `explain.slt` updated with `physical_plan after SamplePushdown SAME
TEXT AS ABOVE` lines under each verbose-explain test.
Tests: 7 unit tests on `ParquetSource::try_push_sample` covering the
pushdown contract (full / single-file / multi-file / target clamping
/ REPEATABLE determinism / multi-file rounding compensation), and 3
opener end-to-end tests covering the adaptive split for single vs
multi row group inputs and REPEATABLE-seed reproducibility across
file paths.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
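The tri-state `try_push_sample` contract from this commit can be modeled on a toy plan tree. The `Node` / `PushSample` types below are illustrative only, not DataFusion's actual `ExecutionPlan` / `FileSource` API; they just show how the rule absorbs at the source, recurses through transparent nodes, and errors at planning time otherwise.

```rust
// Toy plan tree standing in for ExecutionPlan nodes.
#[derive(Debug, PartialEq)]
enum Node {
    ParquetScan { fraction: Option<f64> },
    Filter(Box<Node>),
    CsvScan,
}

// Mirrors the Absorbed / Passthrough / Unsupported contract.
enum PushSample {
    Absorbed(Node),
    Passthrough,
    Unsupported(&'static str),
}

fn try_push_sample(node: &Node, fraction: f64) -> PushSample {
    match node {
        // The source absorbs the sample by rebuilding itself.
        Node::ParquetScan { .. } => {
            PushSample::Absorbed(Node::ParquetScan { fraction: Some(fraction) })
        }
        // Transparent node: the rule should recurse into its child.
        Node::Filter(_) => PushSample::Passthrough,
        // Default for sources without sampling support.
        Node::CsvScan => PushSample::Unsupported("CsvScan has no sampling support"),
    }
}

/// Top-down walk: Absorbed replaces the subtree, Passthrough recurses,
/// Unsupported becomes a planning-time error.
fn push_down(node: Node, fraction: f64) -> Result<Node, String> {
    match try_push_sample(&node, fraction) {
        PushSample::Absorbed(rebuilt) => Ok(rebuilt),
        PushSample::Passthrough => match node {
            Node::Filter(child) => Ok(Node::Filter(Box::new(push_down(*child, fraction)?))),
            _ => unreachable!("only Filter is Passthrough in this sketch"),
        },
        PushSample::Unsupported(reason) => {
            Err(format!("TABLESAMPLE could not be pushed down: {reason}"))
        }
    }
}

fn main() {
    let plan = Node::Filter(Box::new(Node::ParquetScan { fraction: None }));
    let pushed = push_down(plan, 0.1).unwrap();
    assert_eq!(
        pushed,
        Node::Filter(Box::new(Node::ParquetScan { fraction: Some(0.1) }))
    );
    assert!(push_down(Node::CsvScan, 0.1).is_err());
}
```

This also shows why the default of `Unsupported` is safe: a sample is silently dropped only if some node explicitly claims `Passthrough` or `Absorbed`.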
Wires SQL `TABLESAMPLE SYSTEM(p%) [REPEATABLE(n)]` into the infrastructure from the previous commit so it works on a default `SessionContext` (and therefore in `datafusion-cli` and the sqllogictest harness) without any extra registration. - New `datafusion_sql::sample::TableSampleSystemPlanner` (`RelationPlanner`). Lifts `TABLESAMPLE SYSTEM(p%) [REPEATABLE(n)]` to the core `Sample` extension node. Other forms (`BERNOULLI`, `ROW` count, `BUCKET ... OUT OF ...`, `OFFSET`) are rejected at planning time with errors that point at registering a custom `RelationPlanner` ahead of this one. - New public `SamplePhysicalPlanner` (`ExtensionPlanner`) in `datafusion::physical_planner`. Lowers `Sample` to `SampleExec`. Pre-registered in `DefaultPhysicalPlanner::default()` so the default query planner handles it. - `SessionStateDefaults::default_relation_planners()` returns the built-in planner; `SessionStateBuilder::with_default_features()` installs it. Both gated behind the `sql` feature so `--no-default-features` builds keep working. - `register_relation_planner` already prepends to the chain, so any user-supplied planner runs first and can return `Original` to fall through to the built-in for SYSTEM. That composition is the intended override mechanism. End-to-end coverage: - New `datafusion/sqllogictest/test_files/tablesample.slt` exercises the path a user gets out of the box: `SYSTEM(100)`, `SYSTEM(50) REPEATABLE(42)` deterministic count, EXPLAIN absorbed into ParquetSource, every rejected form, and the planning-time error for sources that don't implement `try_push_sample` (CSV). Docs: - `docs/source/user-guide/sql/select.md` gains a `TABLESAMPLE clause` section explaining what it is, the SYSTEM vs BERNOULLI tradeoff, the parquet implementation strategy, deterministic seeds, the EXPLAIN format, and the list of rejected forms. 
- `docs/source/library-user-guide/extending-sql.md` reframes the existing TABLESAMPLE example as the way to add additional flavours on top of the built-in SYSTEM planner. - `datafusion-examples/examples/relation_planner/main.rs` carries a matching note in its module docs. - `datafusion-examples/README.md` regenerated. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The relation_planner/table_sample.rs example previously errored on TABLESAMPLE methods other than BERNOULLI / ROW. After commit 3 of this stack registers a built-in TableSampleSystemPlanner by default, the example's planner runs *first* in the chain, so its error short-circuits SYSTEM and prevents the built-in from handling it. Reshape the example to use the chain composition pattern instead: - The planner now returns `RelationPlanning::Original` for methods it doesn't implement (SYSTEM, BLOCK, anything else), so the built-in picks up the slack. **No SYSTEM reimplementation in the example.** - The example's `TableSampleQueryPlanner` registers both `TableSampleExtensionPlanner` (this example's `TableSamplePlanNode` → its own `SampleExec`) **and** `SamplePhysicalPlanner` (core `Sample` → core `SampleExec`), since `with_extension_planners(...)` replaces the defaults. - Adds a parquet-backed `sample_data_parquet` table so SYSTEM has a source it can push into (the existing in-memory `sample_data` doesn't implement `try_push_sample`). - New Example 7 demonstrates `TABLESAMPLE SYSTEM (50) REPEATABLE (42)` against the parquet table; the row count is asserted (the exact rows shift with the tempdir path that gets hashed into the per-file seed, but the count from the cube-root math is stable). - Module docstring rewritten to lead with the composition story. The 3 existing relation_planner example tests (match_recognize, pivot_unpivot, table_sample) still pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>



## Which issue does this PR close?

- `datafusion.execution.collect_statistics` on wide tables #21624. This PR is independent of (and lands cleanly without) the API proposal in Query-aware statistics requests via ScanArgs / ScanResult (RFC for #21624) #21996 — they're orthogonal building blocks.
- `SYSTEM` method (`BERNOULLI` / `ROW` counts / `BUCKET` are rejected at planning time with a pointer to `RelationPlanner` for custom semantics).

## Rationale for this change
DataFusion has the machinery for fine-grained parquet sampling (`ParquetAccessPlan` with `Skip` / `Scan` / `Selection(RowSelection)`) but no public way to ask for a sample without constructing the access plan by hand and stuffing it into `PartitionedFile.extensions`, and no SQL surface at all. That works for one-off code but is awkward for:

- `TABLESAMPLE` SQL — most users want a sample by writing `SELECT … FROM t TABLESAMPLE SYSTEM (10)`, not by reaching into `ParquetSource` builders.
- `EXPLAIN ANALYZE`-driven debug runs against a representative slice instead of the full table.

Two prior in-core attempts at TABLESAMPLE were closed without merging on semantics-fragmentation grounds: #16505 (Spark-style row-level + Poisson) and #16325 (`random()`-rewrite, which also hit a correctness bug from `random()` getting pushed into the parquet executor — fixed separately in #16545). The discussion in #13563 settled on "use the RelationPlanner extension API" as the way forward, and #17843 (merged Dec 9 2025) shipped that API specifically with TABLESAMPLE as the canonical motivating example. The blog post "Extending SQL in DataFusion: from ->> to TABLESAMPLE" documents that pattern.

This PR adds two layers in one go:

- `ParquetSource` sampling primitives — opt-in, additive, no behavior change for existing scans.
- A `RelationPlanner` (auto-registered) that lifts `TABLESAMPLE SYSTEM(p%) [REPEATABLE(n)]` to a `Sample` extension node; a `SamplePushdown` optimizer rule pushes it into `ParquetSource` via the cube-root hybrid across files / row groups / rows. Both `SessionStateBuilder::with_default_features()` and `DefaultPhysicalPlanner::default()` register the necessary glue, so `datafusion-cli` and any default `SessionContext` get `TABLESAMPLE SYSTEM` out of the box. Downstream consumers can register a `RelationPlanner` ahead of the built-in to add other flavors (`BERNOULLI`, `ROW`, `BUCKET`); the existing `relation_planner/table_sample.rs` example shows that pattern.

## What changes are included in this PR?
### Layer 1 — ParquetSource sampling builders

`with_row_group_sampling(fraction)`:
- deterministic per `(file_name, row_group_count, fraction)` — re-runs match.
- keeps `max(1, ceil(N * fraction))` row groups.
- no-op when `fraction >= 1.0`.

`with_row_fraction(fraction)`:
- selects `ceil(target / cluster_size)` contiguous windows, expressed as a `RowSelection` per kept row group; the parquet reader uses the page index to read only the data pages covering the selected rows. This gives "page-level" IO savings without requiring per-column page alignment (which doesn't exist in parquet).
- deterministic per `(file_name, row_group_index, fraction, cluster_size)`.

The two layers compose: `row_group_fraction = 0.1` × `row_fraction = 0.1` reads ~1% of the rows from ~10% of the row groups, with windows spread out so the sample isn't clustered at one end of each row group.

### Layer 2 — TABLESAMPLE SYSTEM SQL → cube-root pushdown
- `Sample` `UserDefinedLogicalNodeCore` extension node in `datafusion-expr` (`logical_plan/sample.rs`). Schema-preserving; validates `fraction ∈ (0, 1]`. Encodes `SampleMethod::System` only — by design, since `BERNOULLI` / `BUCKET` / row-count semantics differ across DBs.
- `datafusion_sql::sample::TableSampleSystemPlanner` — a `RelationPlanner`, public, auto-registered via `SessionStateDefaults::default_relation_planners()` from `SessionStateBuilder::with_default_features()`. Handles `TABLESAMPLE SYSTEM(p%) [REPEATABLE(n)]`; rejects `BERNOULLI`, `ROW` count, `BUCKET ... OUT OF ...`, `OFFSET` with errors that explicitly point users at registering a custom `RelationPlanner` first. `register_relation_planner` prepends, so a user-supplied planner that returns `Planned` for the same syntax wins; returning `Original` falls through to the built-in.
- `SampleExec` in `datafusion-physical-plan`. Errors at `execute` (it is a marker node, not an executor — the `SamplePushdown` rule is expected to remove it). Implements filter/sort pushdown passthrough so unrelated optimizer rules see straight through it.
- `ExtensionPlanner`: new public `SamplePhysicalPlanner` in `datafusion::physical_planner`, pre-registered in `DefaultPhysicalPlanner::default()`. Lowers `Sample` → `SampleExec`. Callers using `with_extension_planners(...)` replace the defaults and need to re-add `SamplePhysicalPlanner` if they want sampling.
- New `try_push_sample` method on `ExecutionPlan` and `FileSource`, returning `Absorbed { inner }` / `Passthrough` / `Unsupported { reason }`. Passthrough overrides on filter, projection, coalesce, repartition, and non-fetch sort.
- `ParquetSource::try_push_sample` runs the cube-root hybrid: `q = p^(1/3)` applied at all three levels so the IO win at small fractions doesn't concentrate at one granularity. Returns `keep_files: Option<Vec<usize>>` that the rule uses to rebuild `FileScanConfig.file_groups`.
- `SamplePushdown` optimizer rule (between `PushdownSort` and `EnsureCooperative`) walks top-down. On `Absorbed` it replaces `SampleExec` with the rebuilt source; on `Passthrough` it pushes through the single-child node and recurses; on `Unsupported` it errors at planning time. There is intentionally no generic post-scan `SampleExec` yet — that's a follow-up.
- `ParquetSource::fmt_extra` surfaces `sample_row_group_fraction` and `sample_row_fraction` when set, so EXPLAIN reflects the pushdown.

### How the override path works
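The cube-root budget split with adaptive collapse can be checked with a few lines of arithmetic. `split_fraction` below is a hypothetical sketch of the idea (the PR's actual implementation differs), splitting a target fraction `p` across the file / row-group / row axes and passing the budget down when an axis has nothing to reduce:

```rust
/// Split a sampling fraction p across the three reduction axes.
/// Axes with only one element can't reduce, so they take fraction 1.0
/// and the remaining axes absorb the whole budget (adaptive collapse).
fn split_fraction(p: f64, num_files: usize, num_row_groups: usize) -> (f64, f64, f64) {
    let mut axes = 1; // rows can always reduce
    if num_files > 1 {
        axes += 1;
    }
    if num_row_groups > 1 {
        axes += 1;
    }
    let q = p.powf(1.0 / axes as f64);
    (
        if num_files > 1 { q } else { 1.0 },
        if num_row_groups > 1 { q } else { 1.0 },
        q,
    )
}

fn main() {
    // Multi-file, multi-row-group: q = cbrt(p) at every level.
    let (f, g, r) = split_fraction(0.5, 4, 8);
    assert!((f * g * r - 0.5).abs() < 1e-12); // product recovers p

    // Single-file, single-row-group: the whole budget lands at the row
    // level, hitting ~p × N rows instead of undershooting at p^(1/3) × N.
    let (f, g, r) = split_fraction(0.5, 1, 1);
    assert_eq!((f, g), (1.0, 1.0));
    assert!((r - 0.5).abs() < 1e-12);
}
```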
`SessionStateBuilder::register_relation_planner` inserts at the front of the planner chain. The first planner to return `Planned` wins; returning `Original` falls through. So a downstream user who wants `BERNOULLI` (or anything else) can: `MyTableSamplePlanner` runs first; if it doesn't handle the input it returns `Original` and the built-in `TableSampleSystemPlanner` gets the next look. This is the same composition the existing `relation_planner/table_sample.rs` example demonstrates.

### Internals
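The chain composition can be modeled with a toy chain of responsibility. The traits below only illustrate the first-`Planned`-wins / `Original`-falls-through behavior; they are not DataFusion's `RelationPlanner` API, and the planner names are invented:

```rust
// Toy model: the first planner to return Planned wins, Original falls
// through to the next planner in the chain.
enum Planning {
    Planned(String),
    Original,
}

trait Planner {
    fn plan(&self, method: &str) -> Planning;
}

// Stand-in for a user-supplied planner that only knows BERNOULLI.
struct BernoulliPlanner;
impl Planner for BernoulliPlanner {
    fn plan(&self, m: &str) -> Planning {
        if m == "BERNOULLI" {
            Planning::Planned("custom bernoulli".into())
        } else {
            Planning::Original
        }
    }
}

// Stand-in for the built-in SYSTEM planner at the back of the chain.
struct BuiltInSystemPlanner;
impl Planner for BuiltInSystemPlanner {
    fn plan(&self, m: &str) -> Planning {
        if m == "SYSTEM" {
            Planning::Planned("core Sample node".into())
        } else {
            Planning::Original
        }
    }
}

fn plan(chain: &[&dyn Planner], method: &str) -> Option<String> {
    for p in chain {
        if let Planning::Planned(out) = p.plan(method) {
            return Some(out);
        }
    }
    None // no planner claimed it
}

fn main() {
    // register_relation_planner prepends, so the user planner runs first.
    let chain: [&dyn Planner; 2] = [&BernoulliPlanner, &BuiltInSystemPlanner];
    assert_eq!(plan(&chain, "BERNOULLI").as_deref(), Some("custom bernoulli"));
    assert_eq!(plan(&chain, "SYSTEM").as_deref(), Some("core Sample node"));
    assert_eq!(plan(&chain, "ROW"), None);
}
```

Because the user planner returns `Original` for anything it doesn't handle, registering it never disables the built-in SYSTEM path.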
- `ParquetSampling` struct (re-exported at the crate root).
- `ParquetMorselizer` → `PreparedParquetOpen`.
- `opener.rs` — `apply_row_group_sampling` and `apply_row_fraction_sampling` — invoked from `prune_row_groups` right after `create_initial_plan`.
- `rand` with the `small_rng` feature (already in workspace Cargo.toml).

## Are these changes tested?
Layer 1 — ParquetSource sampling builders. 7 tests in `datafusion-datasource-parquet::opener::test`:

- `row_group_sampling_keeps_target_count` — `ceil(N * fraction)` math.
- `row_group_sampling_is_deterministic` — same inputs, same selection.
- `row_group_sampling_differs_per_file` — different file_name, different sample.
- `row_group_sampling_no_op_when_fraction_is_one` — `fraction >= 1.0` keeps everything.
- `row_group_sampling_target_at_least_one` — `fraction = 0.001` over 100 row groups still keeps 1.
- `row_group_sampling_end_to_end` — writes a 4-row-group parquet to `InMemory`, scans with `fraction = 0.5`, asserts exactly 6 rows out (2 row groups × 3 rows).
- `row_fraction_end_to_end` — writes a 100-row single-row-group parquet, scans with `row_fraction = 0.1` and `cluster_size = 4`, asserts the result is in the `(1, 16]` range.

Layer 2 — pushdown contract. 4 unit tests on the `Sample` logical node + 7 unit tests on `ParquetSource::try_push_sample` directly:

- `try_push_sample_system_full_is_noop` — `fraction >= 1.0` doesn't configure any sampling.
- `try_push_sample_system_configures_cube_root_on_source` — both `row_group_fraction` and `row_fraction` are set to `cbrt(p)`.
- `try_push_sample_system_drops_files_for_multi_file_scan` — the file-level `keep_files` is the right size and sorted/unique.
- `try_push_sample_system_keeps_at_least_one_file` — never drops to zero.
- `try_push_sample_system_skips_file_drop_for_single_file` — single-file scans return `keep_files = None`.
- `try_push_sample_system_repeatable_seed_is_deterministic` — same seed → same selection; different seeds → different selection.
- `try_push_sample_system_target_clamped_to_num_files` — when `target == num_files`, returns `None` to skip the rebuild.

Layer 2 — end-to-end SQL. New `datafusion/sqllogictest/test_files/tablesample.slt` runs against a default `SessionContext`, exercising the same path `datafusion-cli` users get:

- `SYSTEM(100)` returns every row.
- `SYSTEM(50) REPEATABLE(42)` is deterministic — re-runs match. Asserts the cube-root math gives 813 rows for a single-file / single-row-group parquet.
- `EXPLAIN` confirms `SampleExec` is gone and `ParquetSource` advertises its sampling config (`sample_row_group_fraction=0.7937, sample_row_fraction=0.7937`).
- Every rejected form (`BERNOULLI`, `ROW` count, fraction out of range) produces the right error.

`cargo build --workspace`, `cargo fmt --all`, and `cargo clippy --workspace --exclude datafusion-benchmarks --all-targets -- -D warnings` are clean.

## Are there any user-facing changes?
- `SELECT … FROM t TABLESAMPLE SYSTEM (p) [REPEATABLE (seed)]` works against any plan that bottoms out at a `ParquetSource` on a default `SessionContext`. `datafusion-cli` picks this up with no flag changes. Other forms error at planning time with messages that point at registering a custom `RelationPlanner` first.
- `ParquetSource`: `with_row_group_sampling`, `with_row_fraction`, `with_row_cluster_size`, `sampling()`, plus the `ParquetSampling` struct.
- `datafusion-expr`: `Sample` extension node + `SampleMethod::System` + `sample_plan` helper.
- `datafusion-sql`: `sample::TableSampleSystemPlanner` (the built-in `RelationPlanner`).
- `ExecutionPlan` and `FileSource`: `try_push_sample` (default `Unsupported`), and the `SamplePushdownResult` / `SampleSpec` / `SampleMethod` / `FileSourceSampleResult` types in `datafusion-physical-plan` and `datafusion-datasource`.
- `SamplePhysicalPlanner` in `datafusion::physical_planner`. Pre-registered in `DefaultPhysicalPlanner::default()`. Callers using `with_extension_planners(...)` replace the defaults and need to re-add it.
- `SamplePushdown` optimizer rule in the default physical pipeline. No-op when no `SampleExec` is in the plan; errors at planning time if a `SampleExec` exists and can't be pushed (no generic post-scan executor yet).
- Built-in `RelationPlanner` registered by `SessionStateBuilder::with_default_features()`. A user-supplied `RelationPlanner` registered via `register_relation_planner` runs first and can override or supplement built-in semantics.
- Existing queries without `TABLESAMPLE` are unaffected.