Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
325 changes: 310 additions & 15 deletions rust/lance-namespace-impls/src/dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use lance::dataset::scanner::Scanner;
use lance::dataset::statistics::DatasetStatisticsExt;
use lance::dataset::transaction::{Operation, Transaction};
use lance::dataset::{
Dataset, MergeInsertBuilder, WhenMatched, WhenNotMatched, WhenNotMatchedBySource, WriteMode,
WriteParams,
Dataset, MergeInsertBuilder, UpdateBuilder, WhenMatched, WhenNotMatched,
WhenNotMatchedBySource, WriteMode, WriteParams,
};
use lance::index::{DatasetIndexExt, IndexParams, vector::VectorIndexParams};
use lance::session::Session;
Expand Down Expand Up @@ -48,21 +48,21 @@ use lance_namespace::models::{
BatchDeleteTableVersionsResponse, CountTableRowsRequest, CreateNamespaceRequest,
CreateNamespaceResponse, CreateTableIndexRequest, CreateTableIndexResponse, CreateTableRequest,
CreateTableResponse, CreateTableScalarIndexResponse, CreateTableVersionRequest,
CreateTableVersionResponse, DeclareTableRequest, DeclareTableResponse,
DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableIndexStatsRequest,
DescribeTableIndexStatsResponse, DescribeTableRequest, DescribeTableResponse,
DescribeTableVersionRequest, DescribeTableVersionResponse, DescribeTransactionRequest,
DescribeTransactionResponse, DropNamespaceRequest, DropNamespaceResponse,
DropTableIndexRequest, DropTableIndexResponse, DropTableRequest, DropTableResponse,
ExplainTableQueryPlanRequest, FragmentStats, FragmentSummary, GetTableStatsRequest,
GetTableStatsResponse, Identity, IndexContent, InsertIntoTableRequest, InsertIntoTableResponse,
ListNamespacesRequest, ListNamespacesResponse, ListTableIndicesRequest,
ListTableIndicesResponse, ListTableVersionsRequest, ListTableVersionsResponse,
ListTablesRequest, ListTablesResponse, MergeInsertIntoTableRequest,
CreateTableVersionResponse, DeclareTableRequest, DeclareTableResponse, DeleteFromTableRequest,
DeleteFromTableResponse, DescribeNamespaceRequest, DescribeNamespaceResponse,
DescribeTableIndexStatsRequest, DescribeTableIndexStatsResponse, DescribeTableRequest,
DescribeTableResponse, DescribeTableVersionRequest, DescribeTableVersionResponse,
DescribeTransactionRequest, DescribeTransactionResponse, DropNamespaceRequest,
DropNamespaceResponse, DropTableIndexRequest, DropTableIndexResponse, DropTableRequest,
DropTableResponse, ExplainTableQueryPlanRequest, FragmentStats, FragmentSummary,
GetTableStatsRequest, GetTableStatsResponse, Identity, IndexContent, InsertIntoTableRequest,
InsertIntoTableResponse, ListNamespacesRequest, ListNamespacesResponse,
ListTableIndicesRequest, ListTableIndicesResponse, ListTableVersionsRequest,
ListTableVersionsResponse, ListTablesRequest, ListTablesResponse, MergeInsertIntoTableRequest,
MergeInsertIntoTableResponse, NamespaceExistsRequest, QueryTableRequest,
QueryTableRequestColumns, QueryTableRequestVector, RestoreTableRequest, RestoreTableResponse,
TableExistsRequest, TableVersion, UpdateTableSchemaMetadataRequest,
UpdateTableSchemaMetadataResponse,
TableExistsRequest, TableVersion, UpdateTableRequest, UpdateTableResponse,
UpdateTableSchemaMetadataRequest, UpdateTableSchemaMetadataResponse,
};

use lance_core::{Error, Result};
Expand Down Expand Up @@ -3841,6 +3841,131 @@ impl LanceNamespace for DirectoryNamespace {
})
}

async fn update_table(&self, request: UpdateTableRequest) -> Result<UpdateTableResponse> {
self.record_op("update_table");

if request.updates.is_empty() {
return Err(NamespaceError::InvalidInput {
message: "update_table requires at least one [column, expression] pair".to_string(),
}
.into());
}

// Validate every update pair shape and detect duplicate columns up front so we
// surface a clean error instead of failing deep inside the planner.
let mut seen_columns: HashMap<String, ()> = HashMap::with_capacity(request.updates.len());
for (idx, pair) in request.updates.iter().enumerate() {
if pair.len() != 2 {
return Err(NamespaceError::InvalidInput {
message: format!(
"update_table updates[{}] must be a [column, expression] pair, got {} elements",
idx,
pair.len()
),
}
.into());
}
let column = &pair[0];
if column.trim().is_empty() {
return Err(NamespaceError::InvalidInput {
message: format!("update_table updates[{}] has an empty column name", idx),
}
.into());
}
if seen_columns.insert(column.clone(), ()).is_some() {
return Err(NamespaceError::InvalidInput {
message: format!(
"update_table cannot update column '{}' more than once",
column
),
}
.into());
}
}

let table_uri = self.resolve_table_location(&request.id).await?;
let dataset = Arc::new(self.load_dataset(&table_uri, None, "update_table").await?);

let mut builder = UpdateBuilder::new(dataset);
for pair in &request.updates {
// Indexing by 0/1 is safe due to the length check above.
builder = builder.set(&pair[0], &pair[1]).map_err(|e| {
lance_core::Error::from(NamespaceError::InvalidInput {
message: format!("Invalid update expression for column '{}': {}", pair[0], e),
})
})?;
}
if let Some(predicate) = request.predicate.as_deref()
&& !predicate.trim().is_empty()
{
builder = builder.update_where(predicate).map_err(|e| {
lance_core::Error::from(NamespaceError::InvalidInput {
message: format!("Invalid update_table predicate '{}': {}", predicate, e),
})
})?;
}

let job = builder.build().map_err(|e| {
lance_core::Error::from(NamespaceError::InvalidInput {
message: format!("Failed to build update_table job: {}", e),
})
})?;

let result = job.execute().await.map_err(|e| NamespaceError::Internal {
message: format!("Failed to update table at '{}': {}", table_uri, e),
})?;

let version = result.new_dataset.version().version as i64;
Ok(UpdateTableResponse {
transaction_id: None,
updated_rows: result.rows_updated as i64,
version,
properties: None,
})
}

async fn delete_from_table(
&self,
request: DeleteFromTableRequest,
) -> Result<DeleteFromTableResponse> {
self.record_op("delete_from_table");

if request.predicate.trim().is_empty() {
return Err(NamespaceError::InvalidInput {
message: "delete_from_table requires a non-empty predicate".to_string(),
}
.into());
}

let table_uri = self.resolve_table_location(&request.id).await?;
let mut dataset = self
.load_dataset(&table_uri, None, "delete_from_table")
.await?;

let result = dataset.delete(&request.predicate).await.map_err(|e| {
// Most predicate errors come back as InvalidInput from the planner; treat any
// unexpected failure as Internal to keep callers' error matrices accurate.
match e {
lance_core::Error::InvalidInput { source, .. } => {
lance_core::Error::from(NamespaceError::InvalidInput {
message: format!(
"Invalid delete predicate '{}': {}",
request.predicate, source
),
})
}
other => lance_core::Error::from(NamespaceError::Internal {
message: format!("Failed to delete from table at '{}': {}", table_uri, other),
}),
}
})?;

Ok(DeleteFromTableResponse {
transaction_id: None,
version: Some(result.new_dataset.version().version as i64),
})
}

async fn query_table(&self, request: QueryTableRequest) -> Result<Bytes> {
use arrow::ipc::writer::FileWriter;

Expand Down Expand Up @@ -9075,6 +9200,176 @@ mod tests {
assert!(total_rows > 0);
assert!(total_rows < 3);
}

// ---------------------- update_table / delete_from_table ----------------------

#[tokio::test]
async fn test_update_full_table() {
let (namespace, _temp_dir, table_id) = create_ns_with_table().await;

// Capture base version so we can assert the update bumped it.
let base_version = open_dataset(&namespace, &table_id[0])
.await
.version()
.version;

let request = UpdateTableRequest {
id: Some(table_id.clone()),
updates: vec![vec!["name".to_string(), "'updated'".to_string()]],
predicate: None,
..Default::default()
};

let response = namespace.update_table(request).await.unwrap();
assert_eq!(response.updated_rows, 3);
assert!(response.version as u64 > base_version);

// Validate that all rows now carry the new value.
let count_req = CountTableRowsRequest {
id: Some(table_id),
version: None,
predicate: Some("name = 'updated'".to_string()),
..Default::default()
};
assert_eq!(namespace.count_table_rows(count_req).await.unwrap(), 3);
}

#[tokio::test]
async fn test_update_with_predicate() {
let (namespace, _temp_dir, table_id) = create_ns_with_table().await;

let request = UpdateTableRequest {
id: Some(table_id.clone()),
updates: vec![vec!["name".to_string(), "'matched'".to_string()]],
predicate: Some("id > 1".to_string()),
..Default::default()
};

let response = namespace.update_table(request).await.unwrap();
assert_eq!(response.updated_rows, 2);

// Rows that did not match the predicate must remain unchanged.
let untouched = CountTableRowsRequest {
id: Some(table_id.clone()),
version: None,
predicate: Some("name = 'Alice'".to_string()),
..Default::default()
};
assert_eq!(namespace.count_table_rows(untouched).await.unwrap(), 1);

let touched = CountTableRowsRequest {
id: Some(table_id),
version: None,
predicate: Some("name = 'matched'".to_string()),
..Default::default()
};
assert_eq!(namespace.count_table_rows(touched).await.unwrap(), 2);
}

#[tokio::test]
async fn test_update_invalid_expression_returns_invalid_input() {
let (namespace, _temp_dir, table_id) = create_ns_with_table().await;

let request = UpdateTableRequest {
id: Some(table_id),
// Reference an unknown column on the right-hand side.
updates: vec![vec!["name".to_string(), "no_such_column + 1".to_string()]],
predicate: None,
..Default::default()
};

let err = namespace.update_table(request).await.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("Invalid input"),
"expected Invalid input error, got: {}",
msg
);
}

#[tokio::test]
async fn test_update_rejects_duplicate_columns() {
let (namespace, _temp_dir, table_id) = create_ns_with_table().await;

let request = UpdateTableRequest {
id: Some(table_id),
updates: vec![
vec!["name".to_string(), "'a'".to_string()],
vec!["name".to_string(), "'b'".to_string()],
],
predicate: None,
..Default::default()
};

let err = namespace.update_table(request).await.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("Invalid input") && msg.contains("more than once"),
"expected duplicate column InvalidInput error, got: {}",
msg
);
}

#[tokio::test]
async fn test_delete_with_predicate() {
let (namespace, _temp_dir, table_id) = create_ns_with_table().await;

let request = DeleteFromTableRequest {
id: Some(table_id.clone()),
predicate: "id > 1".to_string(),
..Default::default()
};

let response = namespace.delete_from_table(request).await.unwrap();
assert!(response.version.is_some());

let count_req = CountTableRowsRequest {
id: Some(table_id),
version: None,
predicate: None,
..Default::default()
};
// Original rows = 3; after deleting `id > 1` only row id=1 remains.
assert_eq!(namespace.count_table_rows(count_req).await.unwrap(), 1);
}

#[tokio::test]
async fn test_delete_empty_predicate_returns_invalid_input() {
let (namespace, _temp_dir, table_id) = create_ns_with_table().await;

let request = DeleteFromTableRequest {
id: Some(table_id),
predicate: " ".to_string(),
..Default::default()
};

let err = namespace.delete_from_table(request).await.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("Invalid input") && msg.contains("non-empty predicate"),
"expected non-empty predicate InvalidInput error, got: {}",
msg
);
}

#[tokio::test]
async fn test_delete_table_not_found() {
let (namespace, _temp_dir) = create_test_namespace().await;

let request = DeleteFromTableRequest {
id: Some(vec!["does_not_exist".to_string()]),
predicate: "id = 1".to_string(),
..Default::default()
};

let err = namespace.delete_from_table(request).await.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("Table not found"),
"expected TableNotFound for missing table, got: {}",
msg
);
}
}

/// Tests for multi-table transaction support via table_version_storage_enabled.
Expand Down
Loading