1 change: 1 addition & 0 deletions Cargo.lock


10 changes: 5 additions & 5 deletions datafusion-examples/README.md
@@ -193,11 +193,11 @@ cargo run --example dataframe -- dataframe

#### Category: Single Process

| Subcommand | File Path | Description |
| --------------- | ------------------------------------------------------------------------------------- | ------------------------------------------ |
| match_recognize | [`relation_planner/match_recognize.rs`](examples/relation_planner/match_recognize.rs) | Implement MATCH_RECOGNIZE pattern matching |
| pivot_unpivot | [`relation_planner/pivot_unpivot.rs`](examples/relation_planner/pivot_unpivot.rs) | Implement PIVOT / UNPIVOT |
| table_sample | [`relation_planner/table_sample.rs`](examples/relation_planner/table_sample.rs) | Implement TABLESAMPLE |
| Subcommand | File Path | Description |
| --------------- | ------------------------------------------------------------------------------------- | --------------------------------------------------------------------- |
| match_recognize | [`relation_planner/match_recognize.rs`](examples/relation_planner/match_recognize.rs) | Implement MATCH_RECOGNIZE pattern matching |
| pivot_unpivot | [`relation_planner/pivot_unpivot.rs`](examples/relation_planner/pivot_unpivot.rs) | Implement PIVOT / UNPIVOT |
| table_sample | [`relation_planner/table_sample.rs`](examples/relation_planner/table_sample.rs) | Implement TABLESAMPLE BERNOULLI / ROW / BUCKET via per-batch sampling |

## SQL Ops Examples

9 changes: 8 additions & 1 deletion datafusion-examples/examples/relation_planner/main.rs
@@ -35,7 +35,14 @@
//! (file: pivot_unpivot.rs, desc: Implement PIVOT / UNPIVOT)
//!
//! - `table_sample`
//! (file: table_sample.rs, desc: Implement TABLESAMPLE)
//! (file: table_sample.rs, desc: Implement TABLESAMPLE BERNOULLI / ROW / BUCKET via per-batch sampling)
//!
//! Note: `TABLESAMPLE SYSTEM(p%)` is supported out of the box by the
//! built-in `datafusion_sql::sample::TableSampleSystemPlanner`, which is
//! auto-registered on a default `SessionContext`. The `table_sample`
//! example below shows how to register a *different* planner for the
//! row-level forms (`BERNOULLI`, `ROW`, `BUCKET`) that the built-in
//! intentionally does not handle.
//!
//! ## Snapshot Testing
//!
181 changes: 153 additions & 28 deletions datafusion-examples/examples/relation_planner/table_sample.rs
@@ -15,35 +15,48 @@
// specific language governing permissions and limitations
// under the License.

//! # TABLESAMPLE Example
//! # TABLESAMPLE Example — adding row-level sampling on top of the built-in
//!
//! This example demonstrates implementing SQL `TABLESAMPLE` support using
//! DataFusion's extensibility APIs.
//! `TABLESAMPLE SYSTEM(p%)` is supported out of the box: it's lifted to the
//! core `Sample` extension node by the auto-registered
//! `TableSampleSystemPlanner` and pushed into `ParquetSource` by the
//! `SamplePushdown` rule. Adding *other* methods — `BERNOULLI`, `ROW`
//! counts, Hive `BUCKET` — is the job of a `RelationPlanner` extension,
//! which is what this example demonstrates.
//!
//! This is a working `TABLESAMPLE` implementation that can serve as a starting
//! point for your own projects. It also works as a template for adding other
//! custom SQL operators, covering the full pipeline from parsing to execution.
//! The key composition pattern: when our planner sees `TABLESAMPLE`, it
//! handles only the methods it implements (`BERNOULLI`, `ROW`, `BUCKET`)
//! and returns [`RelationPlanning::Original`] for `SYSTEM`. Because
//! `register_relation_planner` prepends to the chain, our planner runs
//! first and the built-in handles whatever we don't. **No SYSTEM
//! reimplementation required.**
//!
//! It shows how to:
//!
//! 1. **Parse** TABLESAMPLE syntax via a custom [`RelationPlanner`]
//! 2. **Plan** sampling as a custom logical node ([`TableSamplePlanNode`])
//! 3. **Execute** sampling via a custom physical operator ([`SampleExec`])
//! 4. **Compose** with the built-in SYSTEM planner by returning
//! `RelationPlanning::Original` for methods we don't implement
//!
//! ## Supported Syntax
//!
//! ```sql
//! -- Bernoulli sampling (each row has N% chance of selection)
//! -- Bernoulli sampling (each row has N% chance of selection) — this example
//! SELECT * FROM table TABLESAMPLE BERNOULLI(10 PERCENT)
//!
//! -- Fractional sampling (0.0 to 1.0)
//! -- Fractional sampling (0.0 to 1.0) — this example
//! SELECT * FROM table TABLESAMPLE (0.1)
//!
//! -- Row count limit
//! -- Row count limit — this example
//! SELECT * FROM table TABLESAMPLE (100 ROWS)
//!
//! -- Reproducible sampling with a seed
//! SELECT * FROM table TABLESAMPLE (10 PERCENT) REPEATABLE(42)
//!
//! -- SYSTEM (block-level) sampling — handled by the built-in planner
//! -- WITHOUT any code in this example
//! SELECT * FROM table TABLESAMPLE SYSTEM (10) REPEATABLE(42)
//! ```
//!
//! ## Architecture
@@ -81,6 +94,7 @@

use std::{
fmt::{self, Debug, Formatter},
fs::File,
hash::{Hash, Hasher},
pin::Pin,
sync::Arc,
@@ -98,9 +112,12 @@ use futures::{
stream::{Stream, StreamExt},
};
use rand::{Rng, SeedableRng, rngs::StdRng};
use tempfile::TempDir;
use tonic::async_trait;

use datafusion::optimizer::simplify_expressions::simplify_literal::parse_literal;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::{
execution::{
RecordBatchStream, SendableRecordBatchStream, SessionState, SessionStateBuilder,
@@ -111,7 +128,9 @@ use datafusion::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput},
},
physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner},
physical_planner::{
DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner, SamplePhysicalPlanner,
},
prelude::*,
};
use datafusion_common::{
@@ -144,10 +163,18 @@ pub async fn table_sample() -> Result<()> {

let ctx = SessionContext::new_with_state(state);

// Register custom relation planner for logical planning
// Register custom relation planner for logical planning. It's prepended
// to the chain (so it runs *before* the auto-registered built-in
// `TableSampleSystemPlanner`); when our planner returns
// `RelationPlanning::Original`, the built-in handles it.
ctx.register_relation_planner(Arc::new(TableSamplePlanner))?;
register_sample_data(&ctx)?;

// Register a parquet-backed copy of the same data so we can demonstrate
// SYSTEM end-to-end. SYSTEM only ships pushdown for parquet sources;
// the in-memory `sample_data` table can't absorb a `Sample` node.
let _parquet_dir = register_sample_data_parquet(&ctx).await?;

println!("TABLESAMPLE Example");
println!("===================\n");

@@ -273,6 +300,34 @@ async fn run_examples(ctx: &SessionContext) -> Result<()> {
+---------+---------+---------+---------+
");

// Example 7: SYSTEM sampling — handled by the built-in
// `TableSampleSystemPlanner`, **not** by this example's planner.
// Our planner returns `Original` for SYSTEM, so the chain falls
// through. Routed against the parquet-backed copy of the table so
// the `SamplePushdown` rule can absorb the sample into the scan.
// `REPEATABLE(42)` makes the rows deterministic across runs and
// across machines: the parquet sampler keys on the seed plus the
// execution `partition_index` (a stable per-file id), never on the
// on-disk path, so the same query against the same data picks the
// same rows everywhere.
let results = run_example(
ctx,
"Example 7: SYSTEM (handled by the built-in, not this example)",
"SELECT * FROM sample_data_parquet TABLESAMPLE SYSTEM (50) REPEATABLE (42)",
)
.await?;
assert_snapshot!(results, @r"
+---------+---------+
| column1 | column2 |
+---------+---------+
| 6 | row_6 |
| 7 | row_7 |
| 8 | row_8 |
| 9 | row_9 |
| 10 | row_10 |
+---------+---------+
");

Ok(())
}
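The row-level semantics exercised in Examples 1–6 reduce to a seeded per-row coin flip. A minimal std-only sketch of that idea — the `bernoulli_sample` helper and the LCG constants here are illustrative inventions; the actual example drives its sampling with `rand::StdRng`:

```rust
/// Deterministic Bernoulli row sampler: each row index is kept with
/// probability `fraction`. A seeded linear congruential generator makes
/// the selection reproducible, mirroring what `REPEATABLE(seed)` promises.
fn bernoulli_sample(num_rows: usize, fraction: f64, seed: u64) -> Vec<usize> {
    // Scramble the seed once so small seeds don't start near zero.
    let mut state = seed
        .wrapping_mul(6364136223846793005)
        .wrapping_add(1442695040888963407);
    let mut kept = Vec::new();
    for row in 0..num_rows {
        // Advance the LCG and map the high 53 bits to a uniform in [0, 1).
        state = state
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        let u = (state >> 11) as f64 / (1u64 << 53) as f64;
        if u < fraction {
            kept.push(row);
        }
    }
    kept
}
```

Same seed against the same batch yields the same kept rows on every run and every machine, which is why the snapshot assertions above can be exact.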

@@ -301,6 +356,43 @@ fn register_sample_data(ctx: &SessionContext) -> Result<()> {
Ok(())
}

/// Register the same data as `sample_data_parquet`, backed by a temporary
/// parquet file so `TABLESAMPLE SYSTEM` (handled by the built-in planner) has
/// a `ParquetSource` to push into. Returns the `TempDir` so the caller
/// can keep it alive for the duration of the queries.
async fn register_sample_data_parquet(ctx: &SessionContext) -> Result<TempDir> {
let dir = TempDir::new().map_err(|e| plan_datafusion_err!("tempdir: {e}"))?;
let path = dir.path().join("sample_data.parquet");

let column1: ArrayRef = Arc::new(Int32Array::from((1..=10).collect::<Vec<i32>>()));
let column2: ArrayRef = Arc::new(StringArray::from(
(1..=10).map(|i| format!("row_{i}")).collect::<Vec<_>>(),
));
let batch =
RecordBatch::try_from_iter(vec![("column1", column1), ("column2", column2)])?;
let file = File::create(&path).map_err(|e| plan_datafusion_err!("create: {e}"))?;
let mut writer = ArrowWriter::try_new(
file,
batch.schema(),
Some(WriterProperties::builder().build()),
)
.map_err(|e| plan_datafusion_err!("ArrowWriter: {e}"))?;
writer
.write(&batch)
.map_err(|e| plan_datafusion_err!("write: {e}"))?;
writer
.close()
.map_err(|e| plan_datafusion_err!("close: {e}"))?;

ctx.register_parquet(
"sample_data_parquet",
path.to_str().unwrap(),
Default::default(),
)
.await?;
Ok(dir)
}

// ============================================================================
// Logical Planning: TableSamplePlanner + TableSamplePlanNode
// ============================================================================
@@ -318,7 +410,7 @@ impl RelationPlanner for TableSamplePlanner {
) -> Result<RelationPlanning> {
// Only handle Table relations with TABLESAMPLE clause
let TableFactor::Table {
sample: Some(sample),
sample: Some(kind),
alias,
name,
args,
@@ -333,23 +425,48 @@
return Ok(RelationPlanning::Original(Box::new(relation)));
};

// Extract sample spec (handles both before/after alias positions)
let sample = match sample {
// Inspect the sample without consuming `kind` yet — we may need to
// hand the relation back unchanged for methods this example doesn't
// implement.
let inspect = match &kind {
ast::TableSampleKind::BeforeTableAlias(s)
| ast::TableSampleKind::AfterTableAlias(s) => s,
| ast::TableSampleKind::AfterTableAlias(s) => s.as_ref(),
};

// Validate sampling method
if let Some(method) = &sample.name
&& *method != TableSampleMethod::Bernoulli
&& *method != TableSampleMethod::Row
{
return not_impl_err!(
"Sampling method {} is not supported (only BERNOULLI and ROW)",
method
// This example handles BERNOULLI / ROW (row-level coin flip) and
// BUCKET (Hive-style modulo). Anything else — most importantly
// SYSTEM / BLOCK — falls through to the next planner so the
// built-in `TableSampleSystemPlanner` (auto-registered via
// `SessionStateBuilder::with_default_features`) can handle it
// with parquet pushdown. **No SYSTEM reimplementation here.**
let we_handle = inspect.bucket.is_some()
|| matches!(
inspect.name,
Some(TableSampleMethod::Bernoulli) | Some(TableSampleMethod::Row) | None
);
if !we_handle {
// Reconstruct the original relation and pass it on.
let original = TableFactor::Table {
sample: Some(kind),
alias,
name,
args,
with_hints,
version,
with_ordinality,
partitions,
json_path,
index_hints,
};
return Ok(RelationPlanning::Original(Box::new(original)));
}

// Extract sample spec (handles both before/after alias positions)
let sample = match kind {
ast::TableSampleKind::BeforeTableAlias(s)
| ast::TableSampleKind::AfterTableAlias(s) => s,
};

// Offset sampling (ClickHouse-style) not supported
if sample.offset.is_some() {
return not_impl_err!(
@@ -553,8 +670,15 @@ impl Hash for HashableF64 {
// Physical Planning: TableSampleQueryPlanner + TableSampleExtensionPlanner
// ============================================================================

/// Custom query planner that registers [`TableSampleExtensionPlanner`] to
/// convert [`TableSamplePlanNode`] into [`SampleExec`].
/// Custom query planner that registers [`TableSampleExtensionPlanner`]
/// (lowering this example's [`TableSamplePlanNode`] to its own [`SampleExec`])
/// alongside the built-in [`SamplePhysicalPlanner`] (lowering the core
/// `Sample` extension node to its `SampleExec`). Both extension planners
/// coexist: each only handles its own logical node type and returns
/// `Ok(None)` otherwise. Without `SamplePhysicalPlanner` here,
/// `TABLESAMPLE SYSTEM` queries that fall through to the built-in
/// `TableSampleSystemPlanner` would fail to plan because
/// `with_extension_planners(...)` *replaces* the defaults.
#[derive(Debug)]
struct TableSampleQueryPlanner;

@@ -565,9 +689,10 @@ impl QueryPlanner for TableSampleQueryPlanner {
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
TableSampleExtensionPlanner,
)]);
let planner = DefaultPhysicalPlanner::with_extension_planners(vec![
Arc::new(TableSampleExtensionPlanner),
Arc::new(SamplePhysicalPlanner),
]);
planner
.create_physical_plan(logical_plan, session_state)
.await
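The fall-through behavior the planner relies on is a chain-of-responsibility pattern: each planner claims only the sample methods it implements and defers the rest. A std-only miniature of that pattern — every type name below is a hypothetical stand-in, not a DataFusion API:

```rust
/// Outcome of asking one planner about a sampling method.
enum Planning {
    Planned(String),
    Original, // defer to the next planner in the chain
}

trait MiniPlanner {
    fn plan(&self, method: &str) -> Planning;
}

/// Stand-in for this example's TableSamplePlanner (row-level methods).
struct RowLevelPlanner;
impl MiniPlanner for RowLevelPlanner {
    fn plan(&self, method: &str) -> Planning {
        match method {
            "BERNOULLI" | "ROW" | "BUCKET" => {
                Planning::Planned(format!("SampleExec({method})"))
            }
            _ => Planning::Original,
        }
    }
}

/// Stand-in for the built-in TableSampleSystemPlanner (SYSTEM only).
struct SystemPlanner;
impl MiniPlanner for SystemPlanner {
    fn plan(&self, method: &str) -> Planning {
        match method {
            "SYSTEM" => Planning::Planned("Sample over ParquetSource".to_string()),
            _ => Planning::Original,
        }
    }
}

/// Walk the chain front to back; the first planner to claim the method wins.
fn plan_with_chain(chain: &[&dyn MiniPlanner], method: &str) -> Option<String> {
    for planner in chain {
        if let Planning::Planned(plan) = planner.plan(method) {
            return Some(plan);
        }
    }
    None
}
```

Because the custom planner sits at the front of the chain, it shadows the built-in only for the methods it actually claims; everything else (SYSTEM, or a method nobody handles) flows past it unchanged.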
5 changes: 5 additions & 0 deletions datafusion/core/src/execution/session_state.rs
@@ -1175,6 +1175,11 @@ impl SessionStateBuilder {
.get_or_insert_with(Vec::new)
.extend(SessionStateDefaults::default_expr_planners());

#[cfg(feature = "sql")]
self.relation_planners
.get_or_insert_with(Vec::new)
.extend(SessionStateDefaults::default_relation_planners());

self.scalar_functions
.get_or_insert_with(Vec::new)
.extend(SessionStateDefaults::default_scalar_functions());
15 changes: 15 additions & 0 deletions datafusion/core/src/execution/session_state_defaults.rs
Expand Up @@ -36,6 +36,8 @@ use datafusion_execution::config::SessionConfig;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_expr::planner::ExprPlanner;
#[cfg(feature = "sql")]
use datafusion_expr::planner::RelationPlanner;
use datafusion_expr::registry::ExtensionTypeRegistrationRef;
use datafusion_expr::{AggregateUDF, HigherOrderUDF, ScalarUDF, WindowUDF};
use std::collections::HashMap;
@@ -82,6 +84,19 @@ impl SessionStateDefaults {
default_catalog
}

/// Returns the list of default [`RelationPlanner`]s installed by
/// [`SessionStateBuilder::with_default_features`]. Currently this is just the
/// built-in `TableSampleSystemPlanner`, which lifts
/// `TABLESAMPLE SYSTEM(p%) [REPEATABLE(n)]` into the core `Sample`
/// extension node so the `SamplePushdown` rule can absorb it into
/// the scan. Other `TABLESAMPLE` flavors are rejected at planning
/// time — register a `RelationPlanner` ahead of this one to add
/// custom semantics.
#[cfg(feature = "sql")]
pub fn default_relation_planners() -> Vec<Arc<dyn RelationPlanner>> {
vec![Arc::new(datafusion_sql::sample::TableSampleSystemPlanner)]
}

/// returns the list of default [`ExprPlanner`]s
pub fn default_expr_planners() -> Vec<Arc<dyn ExprPlanner>> {
let expr_planners: Vec<Arc<dyn ExprPlanner>> = vec![
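The ordering guarantee both changes above rely on — user-registered planners consulted before these defaults — can be sketched with a plain `Vec`. The `chain_order` helper is hypothetical; the real mechanics live in `SessionStateBuilder` and `register_relation_planner`:

```rust
/// Hypothetical model of planner registration order: defaults are appended
/// when the session is built, while each user registration is inserted at
/// the front, so a custom planner always gets first refusal.
fn chain_order(
    defaults: &[&'static str],
    user_registered: &[&'static str],
) -> Vec<&'static str> {
    // Built-ins appended by with_default_features / the builder.
    let mut chain: Vec<&'static str> = defaults.to_vec();
    for planner in user_registered {
        chain.insert(0, planner); // register_relation_planner prepends
    }
    chain
}
```

This is why the example never has to reimplement SYSTEM: its planner is consulted first, and deferring simply hands the relation to the next (built-in) entry in the list.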
7 changes: 4 additions & 3 deletions datafusion/core/src/optimizer_rule_reference.md
@@ -88,6 +88,7 @@ in multiple phases.
| 17 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
| 18 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
| 19 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
| 20 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
| 21 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
| 22 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
| 20 | `SamplePushdown` | - | Pushes `TABLESAMPLE` into the source; errors at planning time if the sample can't be absorbed. |
| 21 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
| 22 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
| 23 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |