diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 4b9c69b739a..ed9bdb7292f 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -19,8 +19,8 @@ use lance::dataset::scanner::Scanner; use lance::dataset::statistics::DatasetStatisticsExt; use lance::dataset::transaction::{Operation, Transaction}; use lance::dataset::{ - Dataset, MergeInsertBuilder, WhenMatched, WhenNotMatched, WhenNotMatchedBySource, WriteMode, - WriteParams, + Dataset, MergeInsertBuilder, UpdateBuilder, WhenMatched, WhenNotMatched, + WhenNotMatchedBySource, WriteMode, WriteParams, }; use lance::index::{DatasetIndexExt, IndexParams, vector::VectorIndexParams}; use lance::session::Session; @@ -48,21 +48,21 @@ use lance_namespace::models::{ BatchDeleteTableVersionsResponse, CountTableRowsRequest, CreateNamespaceRequest, CreateNamespaceResponse, CreateTableIndexRequest, CreateTableIndexResponse, CreateTableRequest, CreateTableResponse, CreateTableScalarIndexResponse, CreateTableVersionRequest, - CreateTableVersionResponse, DeclareTableRequest, DeclareTableResponse, - DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableIndexStatsRequest, - DescribeTableIndexStatsResponse, DescribeTableRequest, DescribeTableResponse, - DescribeTableVersionRequest, DescribeTableVersionResponse, DescribeTransactionRequest, - DescribeTransactionResponse, DropNamespaceRequest, DropNamespaceResponse, - DropTableIndexRequest, DropTableIndexResponse, DropTableRequest, DropTableResponse, - ExplainTableQueryPlanRequest, FragmentStats, FragmentSummary, GetTableStatsRequest, - GetTableStatsResponse, Identity, IndexContent, InsertIntoTableRequest, InsertIntoTableResponse, - ListNamespacesRequest, ListNamespacesResponse, ListTableIndicesRequest, - ListTableIndicesResponse, ListTableVersionsRequest, ListTableVersionsResponse, - ListTablesRequest, ListTablesResponse, MergeInsertIntoTableRequest, + CreateTableVersionResponse, DeclareTableRequest, DeclareTableResponse, DeleteFromTableRequest, + DeleteFromTableResponse, DescribeNamespaceRequest, DescribeNamespaceResponse, + DescribeTableIndexStatsRequest, DescribeTableIndexStatsResponse, DescribeTableRequest, + DescribeTableResponse, DescribeTableVersionRequest, DescribeTableVersionResponse, + DescribeTransactionRequest, DescribeTransactionResponse, DropNamespaceRequest, + DropNamespaceResponse, DropTableIndexRequest, DropTableIndexResponse, DropTableRequest, + DropTableResponse, ExplainTableQueryPlanRequest, FragmentStats, FragmentSummary, + GetTableStatsRequest, GetTableStatsResponse, Identity, IndexContent, InsertIntoTableRequest, + InsertIntoTableResponse, ListNamespacesRequest, ListNamespacesResponse, + ListTableIndicesRequest, ListTableIndicesResponse, ListTableVersionsRequest, + ListTableVersionsResponse, ListTablesRequest, ListTablesResponse, MergeInsertIntoTableRequest, MergeInsertIntoTableResponse, NamespaceExistsRequest, QueryTableRequest, QueryTableRequestColumns, QueryTableRequestVector, RestoreTableRequest, RestoreTableResponse, - TableExistsRequest, TableVersion, UpdateTableSchemaMetadataRequest, - UpdateTableSchemaMetadataResponse, + TableExistsRequest, TableVersion, UpdateTableRequest, UpdateTableResponse, + UpdateTableSchemaMetadataRequest, UpdateTableSchemaMetadataResponse, }; use lance_core::{Error, Result}; @@ -3841,6 +3841,131 @@ impl LanceNamespace for DirectoryNamespace { }) } + async fn update_table(&self, request: UpdateTableRequest) -> Result { + self.record_op("update_table"); + + if request.updates.is_empty() { + return Err(NamespaceError::InvalidInput { + message: "update_table requires at least one [column, expression] pair".to_string(), + } + .into()); + } + + // Validate every update pair shape and detect duplicate columns up front so we + // surface a clean error instead of failing deep inside the planner. + let mut seen_columns: HashMap = HashMap::with_capacity(request.updates.len()); + for (idx, pair) in request.updates.iter().enumerate() { + if pair.len() != 2 { + return Err(NamespaceError::InvalidInput { + message: format!( + "update_table updates[{}] must be a [column, expression] pair, got {} elements", + idx, + pair.len() + ), + } + .into()); + } + let column = &pair[0]; + if column.trim().is_empty() { + return Err(NamespaceError::InvalidInput { + message: format!("update_table updates[{}] has an empty column name", idx), + } + .into()); + } + if seen_columns.insert(column.clone(), ()).is_some() { + return Err(NamespaceError::InvalidInput { + message: format!( + "update_table cannot update column '{}' more than once", + column + ), + } + .into()); + } + } + + let table_uri = self.resolve_table_location(&request.id).await?; + let dataset = Arc::new(self.load_dataset(&table_uri, None, "update_table").await?); + + let mut builder = UpdateBuilder::new(dataset); + for pair in &request.updates { + // Indexing by 0/1 is safe due to the length check above. + builder = builder.set(&pair[0], &pair[1]).map_err(|e| { + lance_core::Error::from(NamespaceError::InvalidInput { + message: format!("Invalid update expression for column '{}': {}", pair[0], e), + }) + })?; + } + if let Some(predicate) = request.predicate.as_deref() + && !predicate.trim().is_empty() + { + builder = builder.update_where(predicate).map_err(|e| { + lance_core::Error::from(NamespaceError::InvalidInput { + message: format!("Invalid update_table predicate '{}': {}", predicate, e), + }) + })?; + } + + let job = builder.build().map_err(|e| { + lance_core::Error::from(NamespaceError::InvalidInput { + message: format!("Failed to build update_table job: {}", e), + }) + })?; + + let result = job.execute().await.map_err(|e| NamespaceError::Internal { + message: format!("Failed to update table at '{}': {}", table_uri, e), + })?; + + let version = result.new_dataset.version().version as i64; + Ok(UpdateTableResponse { + transaction_id: None, + updated_rows: result.rows_updated as i64, + version, + properties: None, + }) + } + + async fn delete_from_table( + &self, + request: DeleteFromTableRequest, + ) -> Result { + self.record_op("delete_from_table"); + + if request.predicate.trim().is_empty() { + return Err(NamespaceError::InvalidInput { + message: "delete_from_table requires a non-empty predicate".to_string(), + } + .into()); + } + + let table_uri = self.resolve_table_location(&request.id).await?; + let mut dataset = self + .load_dataset(&table_uri, None, "delete_from_table") + .await?; + + let result = dataset.delete(&request.predicate).await.map_err(|e| { + // Most predicate errors come back as InvalidInput from the planner; treat any + // unexpected failure as Internal to keep callers' error matrices accurate. + match e { + lance_core::Error::InvalidInput { source, .. } => { + lance_core::Error::from(NamespaceError::InvalidInput { + message: format!( + "Invalid delete predicate '{}': {}", + request.predicate, source + ), + }) + } + other => lance_core::Error::from(NamespaceError::Internal { + message: format!("Failed to delete from table at '{}': {}", table_uri, other), + }), + } + })?; + + Ok(DeleteFromTableResponse { + transaction_id: None, + version: Some(result.new_dataset.version().version as i64), + }) + } + async fn query_table(&self, request: QueryTableRequest) -> Result { use arrow::ipc::writer::FileWriter; @@ -9075,6 +9200,176 @@ mod tests { assert!(total_rows > 0); assert!(total_rows < 3); } + + // ---------------------- update_table / delete_from_table ---------------------- + + #[tokio::test] + async fn test_update_full_table() { + let (namespace, _temp_dir, table_id) = create_ns_with_table().await; + + // Capture base version so we can assert the update bumped it. + let base_version = open_dataset(&namespace, &table_id[0]) + .await + .version() + .version; + + let request = UpdateTableRequest { + id: Some(table_id.clone()), + updates: vec![vec!["name".to_string(), "'updated'".to_string()]], + predicate: None, + ..Default::default() + }; + + let response = namespace.update_table(request).await.unwrap(); + assert_eq!(response.updated_rows, 3); + assert!(response.version as u64 > base_version); + + // Validate that all rows now carry the new value. + let count_req = CountTableRowsRequest { + id: Some(table_id), + version: None, + predicate: Some("name = 'updated'".to_string()), + ..Default::default() + }; + assert_eq!(namespace.count_table_rows(count_req).await.unwrap(), 3); + } + + #[tokio::test] + async fn test_update_with_predicate() { + let (namespace, _temp_dir, table_id) = create_ns_with_table().await; + + let request = UpdateTableRequest { + id: Some(table_id.clone()), + updates: vec![vec!["name".to_string(), "'matched'".to_string()]], + predicate: Some("id > 1".to_string()), + ..Default::default() + }; + + let response = namespace.update_table(request).await.unwrap(); + assert_eq!(response.updated_rows, 2); + + // Rows that did not match the predicate must remain unchanged. + let untouched = CountTableRowsRequest { + id: Some(table_id.clone()), + version: None, + predicate: Some("name = 'Alice'".to_string()), + ..Default::default() + }; + assert_eq!(namespace.count_table_rows(untouched).await.unwrap(), 1); + + let touched = CountTableRowsRequest { + id: Some(table_id), + version: None, + predicate: Some("name = 'matched'".to_string()), + ..Default::default() + }; + assert_eq!(namespace.count_table_rows(touched).await.unwrap(), 2); + } + + #[tokio::test] + async fn test_update_invalid_expression_returns_invalid_input() { + let (namespace, _temp_dir, table_id) = create_ns_with_table().await; + + let request = UpdateTableRequest { + id: Some(table_id), + // Reference an unknown column on the right-hand side. + updates: vec![vec!["name".to_string(), "no_such_column + 1".to_string()]], + predicate: None, + ..Default::default() + }; + + let err = namespace.update_table(request).await.unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("Invalid input"), + "expected Invalid input error, got: {}", + msg + ); + } + + #[tokio::test] + async fn test_update_rejects_duplicate_columns() { + let (namespace, _temp_dir, table_id) = create_ns_with_table().await; + + let request = UpdateTableRequest { + id: Some(table_id), + updates: vec![ + vec!["name".to_string(), "'a'".to_string()], + vec!["name".to_string(), "'b'".to_string()], + ], + predicate: None, + ..Default::default() + }; + + let err = namespace.update_table(request).await.unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("Invalid input") && msg.contains("more than once"), + "expected duplicate column InvalidInput error, got: {}", + msg + ); + } + + #[tokio::test] + async fn test_delete_with_predicate() { + let (namespace, _temp_dir, table_id) = create_ns_with_table().await; + + let request = DeleteFromTableRequest { + id: Some(table_id.clone()), + predicate: "id > 1".to_string(), + ..Default::default() + }; + + let response = namespace.delete_from_table(request).await.unwrap(); + assert!(response.version.is_some()); + + let count_req = CountTableRowsRequest { + id: Some(table_id), + version: None, + predicate: None, + ..Default::default() + }; + // Original rows = 3; after deleting `id > 1` only row id=1 remains. + assert_eq!(namespace.count_table_rows(count_req).await.unwrap(), 1); + } + + #[tokio::test] + async fn test_delete_empty_predicate_returns_invalid_input() { + let (namespace, _temp_dir, table_id) = create_ns_with_table().await; + + let request = DeleteFromTableRequest { + id: Some(table_id), + predicate: " ".to_string(), + ..Default::default() + }; + + let err = namespace.delete_from_table(request).await.unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("Invalid input") && msg.contains("non-empty predicate"), + "expected non-empty predicate InvalidInput error, got: {}", + msg + ); + } + + #[tokio::test] + async fn test_delete_table_not_found() { + let (namespace, _temp_dir) = create_test_namespace().await; + + let request = DeleteFromTableRequest { + id: Some(vec!["does_not_exist".to_string()]), + predicate: "id = 1".to_string(), + ..Default::default() + }; + + let err = namespace.delete_from_table(request).await.unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("Table not found"), + "expected TableNotFound for missing table, got: {}", + msg + ); + } } /// Tests for multi-table transaction support via table_version_storage_enabled.