Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 38 additions & 15 deletions rust/lance/src/dataset/write/merge_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ pub enum WhenMatched {
/// The row is updated (similar to UpdateAll) only for rows where the expression evaluates to
/// true
UpdateIf(String),
/// The row is updated (similar to UpdateAll) only for rows where the expression evaluates to
/// true
UpdateIfExpr(Expr),
Comment thread
Ar-maan05 marked this conversation as resolved.
/// Fail the operation if a match is found
///
/// This can be used to ensure that no existing rows are overwritten or modified after inserted.
Expand All @@ -286,6 +289,10 @@ impl WhenMatched {
// Store the expression string and defer parsing until we know which path to take
Ok(Self::UpdateIf(expr.to_string()))
}

pub fn update_if_expr(expr: Expr) -> Self {
Self::UpdateIfExpr(expr)
}
}

/// Describes how rows should be handled when there is no matching row in the target table
Expand Down Expand Up @@ -1635,6 +1642,7 @@ impl MergeInsertJob {
self.params.when_matched,
WhenMatched::UpdateAll
| WhenMatched::UpdateIf(_)
| WhenMatched::UpdateIfExpr(_)
| WhenMatched::Fail
| WhenMatched::Delete
| WhenMatched::DoNothing
Expand Down Expand Up @@ -2082,22 +2090,37 @@ impl Merger {
} else {
None
};
let match_filter_expr = if let WhenMatched::UpdateIf(expr_str) = &params.when_matched {
let combined_schema = Arc::new(combined_schema(&schema));
let planner = Planner::new(combined_schema.clone());
let expr = planner.parse_filter(expr_str)?;
let expr = planner.optimize_expr(expr)?;
let match_expr = planner.create_physical_expr(&expr)?;
let data_type = match_expr.data_type(combined_schema.as_ref())?;
if data_type != DataType::Boolean {
return Err(Error::invalid_input(format!(
"Merge insert conditions must be expressions that return a boolean value, received a 'when matched update if' expression ({}) which has data type {}",
expr, data_type
)));
let match_filter_expr = match &params.when_matched {
WhenMatched::UpdateIf(expr_str) => {
let combined_schema = Arc::new(combined_schema(&schema));
let planner = Planner::new(combined_schema.clone());
let expr = planner.parse_filter(expr_str)?;
let expr = planner.optimize_expr(expr)?;
let match_expr = planner.create_physical_expr(&expr)?;
let data_type = match_expr.data_type(combined_schema.as_ref())?;
if data_type != DataType::Boolean {
return Err(Error::invalid_input(format!(
"Merge insert conditions must be expressions that return a boolean value, received a 'when matched update if' expression ({}) which has data type {}",
expr, data_type
)));
}
Some(match_expr)
Comment thread
Ar-maan05 marked this conversation as resolved.
Outdated
}
Some(match_expr)
} else {
None
WhenMatched::UpdateIfExpr(expr) => {
let combined_schema = Arc::new(combined_schema(&schema));
let planner = Planner::new(combined_schema.clone());
let expr = planner.optimize_expr(expr.clone())?;
let match_expr = planner.create_physical_expr(&expr)?;
let data_type = match_expr.data_type(combined_schema.as_ref())?;
if data_type != DataType::Boolean {
return Err(Error::invalid_input(format!(
"Merge insert conditions must be expressions that return a boolean value, received a 'when matched update if' expression ({}) which has data type {}",
expr, data_type
)));
}
Some(match_expr)
}
_ => None,
};
let output_schema = if with_row_addr {
Arc::new(schema.try_with_column(ROW_ADDR_FIELD.clone())?)
Expand Down
6 changes: 6 additions & 0 deletions rust/lance/src/dataset/write/merge_insert/assign_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ pub fn merge_insert_action(
));
}
}
WhenMatched::UpdateIfExpr(condition) => {
cases.push((
matched.and(condition.clone()),
Action::UpdateAll.as_literal_expr(),
));
}
WhenMatched::DoNothing => {}
WhenMatched::Fail => {
cases.push((matched, Action::Fail.as_literal_expr()));
Expand Down
3 changes: 3 additions & 0 deletions rust/lance/src/dataset/write/merge_insert/exec/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,9 @@ impl DisplayAs for FullSchemaMergeInsertExec {
crate::dataset::WhenMatched::UpdateIf(condition) => {
format!("UpdateIf({})", condition)
}
crate::dataset::WhenMatched::UpdateIfExpr(expr) => {
format!("UpdateIfExpr({})", expr)
}
Comment thread
Ar-maan05 marked this conversation as resolved.
crate::dataset::WhenMatched::Fail => "Fail".to_string(),
crate::dataset::WhenMatched::Delete => "Delete".to_string(),
};
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/dataset/write/merge_insert/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ impl UserDefinedLogicalNodeCore for MergeInsertWriteNode {
crate::dataset::WhenMatched::DoNothing => "DoNothing",
crate::dataset::WhenMatched::UpdateAll => "UpdateAll",
crate::dataset::WhenMatched::UpdateIf(_) => "UpdateIf",
crate::dataset::WhenMatched::UpdateIfExpr(_) => "UpdateIfExpr",
crate::dataset::WhenMatched::Fail => "Fail",
crate::dataset::WhenMatched::Delete => "Delete",
};
Expand Down
Loading