Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions rust/cubestore/cubestore-sql-tests/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ pub fn sql_tests(prefix: &str) -> Vec<(&'static str, TestFn)> {
unique_key_and_multi_partitions_hash_aggregate,
),
t("filter_pushdown_unique_key", filter_pushdown_unique_key),
t("prefilter_chunks_shared_scan", prefilter_chunks_shared_scan),
t("divide_by_zero", divide_by_zero),
t(
"filter_multiple_in_for_decimal",
Expand Down Expand Up @@ -7438,6 +7439,37 @@ async fn filter_pushdown_unique_key(service: Box<dyn SqlClient>) -> Result<(), C
}
}

async fn prefilter_chunks_shared_scan(service: Box<dyn SqlClient>) -> Result<(), CubeError> {
service.exec_query("CREATE SCHEMA s").await?;
service
.exec_query("CREATE TABLE s.Versions (a int, b int, val int) unique key (a, b)")
.await?;

service
.exec_query(
"INSERT INTO s.Versions (a, b, val, __seq) VALUES (1, 1, 10, 1), (2, 2, 20, 2), (3, 3, 5, 3)",
)
.await?;
// Newer versions of (1, 1) and (3, 3) in another in-memory chunk.
service
.exec_query("INSERT INTO s.Versions (a, b, val, __seq) VALUES (1, 1, 30, 4), (3, 3, 50, 5)")
.await?;

// Two scans of the same index over the same in-memory chunks: one carries a dedup-safe
// key filter (a = 1), the other none. The worker must not trim the shared chunks by the
// first scan's predicate, or the unfiltered branch loses rows. Filtered branch dedups to
// (1,1)->30 = 30; unfiltered branch dedups to 30+20+50 = 100; total = 130.
let query = "SELECT sum(val) FROM (\
SELECT val FROM s.Versions WHERE a = 1 \
UNION ALL \
SELECT val FROM s.Versions\
) t";
let r = service.exec_query(query).await?;
assert_eq!(to_rows(&r), rows(&[(130)]));

Ok(())
}

async fn divide_by_zero(service: Box<dyn SqlClient>) -> Result<(), CubeError> {
service.exec_query("CREATE SCHEMA s").await?;
service.exec_query("CREATE TABLE s.t(i int, z int)").await?;
Expand Down
12 changes: 11 additions & 1 deletion rust/cubestore/cubestore/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1448,9 +1448,19 @@ impl ClusterImpl {
}

let chunk_load_guard = crate::trace::OpGuard::start(OpKind::ChunkLoad, "chunks.load");
let chunk_id_to_record_batches = self.load_in_memory_chunks(&plan_node).await?;
let mut chunk_id_to_record_batches = self.load_in_memory_chunks(&plan_node).await?;
drop(chunk_load_guard);

// Trim in-memory chunks by the dedup-safe pushable predicate before they cross
// IPC to the select subprocess (which re-applies the same predicate anyway).
let chunk_filter_guard =
crate::trace::OpGuard::start(OpKind::Execution, "chunks.prefilter");
crate::queryplanner::query_executor::filter_in_memory_chunks_for_worker(
&plan_node,
&mut chunk_id_to_record_batches,
);
drop(chunk_filter_guard);

let mut res = None;
#[cfg(not(target_os = "windows"))]
{
Expand Down
42 changes: 41 additions & 1 deletion rust/cubestore/cubestore/src/queryplanner/planning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ use crate::queryplanner::panic::PanicWorkerSerialized;
use crate::queryplanner::panic::{plan_panic_worker, PanicWorkerNode};
use crate::queryplanner::partition_filter::PartitionFilter;
use crate::queryplanner::providers::InfoSchemaQueryCacheTableProvider;
use crate::queryplanner::query_executor::{ClusterSendExec, CubeTable, InlineTableProvider};
use crate::queryplanner::query_executor::{
dedup_safe_unique_key_filter, ClusterSendExec, CubeTable, InlineTableProvider,
};
use crate::queryplanner::rolling::RollingWindowAggregateSerialized;
use crate::queryplanner::serialized_plan::PreSerializedPlan;
use crate::queryplanner::serialized_plan::{IndexSnapshot, InlineSnapshot, PartitionSnapshot};
Expand All @@ -70,6 +72,7 @@ use datafusion::logical_expr::{
};
use datafusion::physical_expr::{Distribution, LexRequirement};
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion_proto::bytes::Serializeable;
use serde::{Deserialize as SerdeDeser, Deserializer, Serialize as SerdeSer, Serializer};
use serde_derive::Deserialize;
use serde_derive::Serialize;
Expand All @@ -94,6 +97,12 @@ pub struct PlanningMeta {
#[serde(deserialize_with = "de_vec_as_map")]
#[serde(serialize_with = "se_vec_as_map")]
pub multi_part_subtree: HashMap<u64, MultiPartition>,
/// Aligned 1:1 with `indices`: the proto-encoded dedup-safe pushable predicate for
/// each unique-key index scan, used by the worker to trim in-memory chunks before
/// IPC. `None` when the index has no unique key or nothing is pushable. `#[serde(default)]`
/// so plans from an older router (no field) decode as "no pre-filter".
#[serde(default)]
pub pushable_chunk_filters: Vec<Option<Vec<u8>>>,
}

fn se_vec_as_map<S: Serializer>(m: &HashMap<u64, MultiPartition>, s: S) -> Result<S::Ok, S::Error> {
Expand Down Expand Up @@ -194,6 +203,36 @@ pub async fn choose_index_ext(
i.partitions = pick_partitions(i, c, ps)?;
}

// Precompute, per index scan, the dedup-safe pushable predicate so the worker can
// trim in-memory chunks before IPC without re-deriving it from the plan. Encoding
// failures degrade to `None` (no pre-filter); correctness is unaffected since the
// subprocess re-applies the predicate. Reuses exactly the filters that already gate
// partition pruning above.
let pushable_chunk_filters = indices
.iter()
.zip(collector.constraints.iter())
.map(|(i, c)| {
let table = i.table_path.table.get_row();
let key_columns = table.unique_key_columns()?;
let predicate = dedup_safe_unique_key_filter(&c.filters, &key_columns)?;
// Filters reach the scan provider unqualified, and the worker applies the
// predicate against the bare index schema. Strip relation qualifiers so column
// references resolve there.
let predicate = predicate
.transform(|e| {
Ok(match e {
Expr::Column(col) => common::tree_node::Transformed::yes(Expr::Column(
common::Column::new_unqualified(col.name),
)),
other => common::tree_node::Transformed::no(other),
})
})
.map(|t| t.data)
.ok()?;
predicate.to_bytes().ok().map(|b| b.to_vec())
})
.collect::<Vec<_>>();

// We have enough information to finalize the logical plan.
let mut r = ChooseIndex {
chosen_indices: &indices,
Expand Down Expand Up @@ -231,6 +270,7 @@ pub async fn choose_index_ext(
PlanningMeta {
indices,
multi_part_subtree,
pushable_chunk_filters,
},
))
}
Expand Down
112 changes: 100 additions & 12 deletions rust/cubestore/cubestore/src/queryplanner/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use datafusion::arrow::array::{
Int16Array, Int32Array, Int64Array, MutableArrayData, NullArray, StringArray,
TimestampMicrosecondArray, TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array,
};
use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::compute::{filter_record_batch, SortOptions};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use datafusion::arrow::ipc::reader::StreamReader;
use datafusion::arrow::ipc::writer::StreamWriter;
Expand Down Expand Up @@ -82,6 +82,7 @@ use datafusion::physical_plan::{
use datafusion::prelude::{and, SessionConfig, SessionContext};
use datafusion_datasource::memory::MemorySourceConfig;
use datafusion_datasource::source::DataSourceExec;
use datafusion_proto::bytes::Serializeable;
use futures_util::{stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use log::{debug, error, trace, warn};
Expand Down Expand Up @@ -840,17 +841,7 @@ impl CubeTable {
let predicate = if let Some(key_columns) = &unique_key_columns {
// Filters passed to a scan reference only this table's columns, so matching
// by bare column name is enough.
let pushable = filters
.iter()
.filter(|f| {
!f.is_volatile()
&& f.column_refs()
.iter()
.all(|c| key_columns.iter().any(|k| k.get_name() == &c.name))
})
.cloned()
.collect::<Vec<_>>();
combine_filters(&pushable)
dedup_safe_unique_key_filter(filters, key_columns)
} else {
combine_filters(filters)
};
Expand Down Expand Up @@ -2360,6 +2351,103 @@ impl SerializedRecordBatchStream {
/// Combines an array of filter expressions into a single filter expression
/// consisting of the input filter expressions joined with logical AND.
/// Returns None if the filters array is empty.
/// The subset of `filters` that commutes with last-row dedup on a unique-key table:
/// non-volatile and referencing only unique-key columns. Such filters select whole
/// version groups, so applying them below the dedup (or to raw chunks) can't resurrect
/// an overwritten row. Combined into a single predicate, or `None` when nothing is
/// pushable. Shared by the scan-time per-stream `FilterExec` and the worker-side
/// in-memory chunk pre-filter so the two can't diverge.
pub fn dedup_safe_unique_key_filter(filters: &[Expr], key_columns: &[&Column]) -> Option<Expr> {
let pushable = filters
.iter()
.filter(|f| {
!f.is_volatile()
&& f.column_refs()
.iter()
.all(|c| key_columns.iter().any(|k| k.get_name() == &c.name))
})
.cloned()
.collect::<Vec<_>>();
combine_filters(&pushable)
}

fn filter_batch_by_predicate(
predicate: &dyn PhysicalExpr,
batch: &RecordBatch,
) -> Result<RecordBatch, CubeError> {
let mask = predicate.evaluate(batch)?.into_array(batch.num_rows())?;
let mask = mask
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or_else(|| {
CubeError::internal(
"Pushable chunk filter did not evaluate to a boolean mask".to_string(),
)
})?;
Ok(filter_record_batch(batch, mask)?)
}

/// Trim loaded in-memory chunks by the per-index pushable predicate computed at planning
/// time, before they are serialized and shipped over IPC to the select subprocess. The
/// subprocess re-applies the same predicate, so this only reduces payload and is never
/// relied on for correctness — any failure is logged and the affected chunks are left
/// untrimmed rather than failing the query. Filtered batches keep their (possibly
/// zero-row) schema.
pub fn filter_in_memory_chunks_for_worker(
plan: &SerializedPlan,
chunks: &mut HashMap<u64, Vec<RecordBatch>>,
) {
let groups = plan.in_memory_chunk_filter_groups();
if groups.is_empty() {
return;
}
let session_state =
QueryPlannerImpl::minimal_session_state_from_final_config(SessionConfig::new()).build();
'group: for (bytes, chunk_ids) in groups {
let predicate = match Expr::from_bytes_with_registry(&bytes, &session_state) {
Ok(p) => p,
Err(e) => {
warn!("Skipping in-memory chunk pre-filter, can't decode predicate: {e}");
continue;
}
};
// All chunks of a group share the index schema, so the physical expression is built
// once (lazily, from the first non-empty batch) and reused across the group.
let mut phys: Option<Arc<dyn PhysicalExpr>> = None;
for id in chunk_ids {
let Some(batches) = chunks.get_mut(&id) else {
continue;
};
for b in batches.iter_mut() {
if b.num_rows() == 0 {
continue;
}
if phys.is_none() {
match b
.schema()
.as_ref()
.clone()
.to_dfschema()
.and_then(|s| session_state.create_physical_expr(predicate.clone(), &s))
{
Ok(p) => phys = Some(p),
Err(e) => {
warn!(
"Skipping in-memory chunk pre-filter, can't build predicate: {e}"
);
continue 'group;
}
}
}
match filter_batch_by_predicate(phys.as_ref().unwrap().as_ref(), b) {
Ok(filtered) => *b = filtered,
Err(e) => warn!("Skipping in-memory chunk {id} pre-filter: {e}"),
}
}
}
}
}

fn combine_filters(filters: &[Expr]) -> Option<Expr> {
if filters.is_empty() {
return None;
Expand Down
53 changes: 53 additions & 0 deletions rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,59 @@ impl SerializedPlan {
})
}

/// In-memory chunks of executable partitions grouped by the proto-encoded pushable
/// predicate of their index (from `PlanningMeta::pushable_chunk_filters`). Empty when
/// no index carries a pre-filter. The worker uses this to trim chunks before IPC.
pub fn in_memory_chunk_filter_groups(&self) -> Vec<(Vec<u8>, Vec<u64>)> {
let pushable = &self.planning_meta().pushable_chunk_filters;
if pushable.iter().all(|p| p.is_none()) {
return Vec::new();
}

let executable = |partition: &PartitionSnapshot| {
self.partition_ids_to_execute
.binary_search_by_key(&partition.partition.get_id(), |(id, _)| *id)
.is_ok()
};

// The subprocess shares in-memory batches by chunk id across scans (a self-join or
// self-union of one index yields one snapshot per scan, all pointing at the same
// chunks). Trimming a shared chunk in place would corrupt the rows seen by a scan
// that re-applies a different predicate, or none at all. So count references across
// ALL snapshots (predicate-bearing or not) and only pre-filter chunks referenced by
// exactly one scan.
let mut chunk_counts: HashMap<u64, usize> = HashMap::new();
for index in self.index_snapshots().iter() {
for partition in index.partitions().iter().filter(|p| executable(p)) {
for chunk in partition.chunks() {
if chunk.get_row().in_memory() {
*chunk_counts.entry(chunk.get_id()).or_default() += 1;
}
}
}
}

let mut groups = Vec::new();
for (idx, index) in self.index_snapshots().iter().enumerate() {
let Some(bytes) = pushable.get(idx).and_then(|p| p.as_ref()) else {
continue;
};
let mut chunk_ids = Vec::new();
for partition in index.partitions().iter().filter(|p| executable(p)) {
for chunk in partition.chunks() {
if chunk.get_row().in_memory() && chunk_counts.get(&chunk.get_id()) == Some(&1)
{
chunk_ids.push(chunk.get_id());
}
}
}
if !chunk_ids.is_empty() {
groups.push((bytes.clone(), chunk_ids));
}
}
groups
}

pub fn in_memory_chunks_to_load(&self) -> Vec<(IdRow<Chunk>, IdRow<Partition>, IdRow<Index>)> {
self.list_in_memory_chunks_to_load(|id| {
self.partition_ids_to_execute
Expand Down
4 changes: 4 additions & 0 deletions rust/cubestore/cubestore/src/sql/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ mod tests {
PlanningMeta {
indices: Vec::new(),
multi_part_subtree: HashMap::new(),
pushable_chunk_filters: Vec::new(),
},
None,
)
Expand Down Expand Up @@ -515,6 +516,7 @@ mod tests {
PlanningMeta {
indices: Vec::new(),
multi_part_subtree: HashMap::new(),
pushable_chunk_filters: Vec::new(),
},
None,
)
Expand Down Expand Up @@ -619,6 +621,7 @@ mod tests {
PlanningMeta {
indices: Vec::new(),
multi_part_subtree: HashMap::new(),
pushable_chunk_filters: Vec::new(),
},
None,
)
Expand Down Expand Up @@ -698,6 +701,7 @@ mod tests {
PlanningMeta {
indices: Vec::new(),
multi_part_subtree: HashMap::new(),
pushable_chunk_filters: Vec::new(),
},
None,
)
Expand Down
1 change: 1 addition & 0 deletions rust/cubestore/cubestore/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,7 @@ impl SqlService for SqlServiceImpl {
PlanningMeta {
indices: Vec::new(),
multi_part_subtree: HashMap::new(),
pushable_chunk_filters: Vec::new(),
},
None,
)
Expand Down
Loading