Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 278 additions & 3 deletions rust/lance-namespace-impls/src/dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ use lance_namespace::models::{
ListTableIndicesResponse, ListTableVersionsRequest, ListTableVersionsResponse,
ListTablesRequest, ListTablesResponse, MergeInsertIntoTableRequest,
MergeInsertIntoTableResponse, NamespaceExistsRequest, QueryTableRequest,
QueryTableRequestColumns, QueryTableRequestVector, RestoreTableRequest, RestoreTableResponse,
TableExistsRequest, TableVersion, UpdateTableSchemaMetadataRequest,
UpdateTableSchemaMetadataResponse,
QueryTableRequestColumns, QueryTableRequestVector, RenameTableRequest, RenameTableResponse,
RestoreTableRequest, RestoreTableResponse, TableExistsRequest, TableVersion,
UpdateTableSchemaMetadataRequest, UpdateTableSchemaMetadataResponse,
};

use lance_core::{Error, Result};
Expand Down Expand Up @@ -3467,6 +3467,146 @@ impl LanceNamespace for DirectoryNamespace {
Ok(RestoreTableResponse { transaction_id })
}

async fn rename_table(&self, request: RenameTableRequest) -> Result<RenameTableResponse> {
self.record_op("rename_table");

let new_table_name = &request.new_table_name;

if new_table_name.is_empty() {
return Err(NamespaceError::InvalidInput {
message: "new_table_name cannot be empty".to_string(),
}
.into());
}

// Cross-namespace rename is not supported in directory mode without manifest
if request.new_namespace_id.is_some() && self.manifest_ns.is_none() {
return Err(NamespaceError::Unsupported {
message: "Cross-namespace rename is only supported when manifest mode is enabled"
.to_string(),
}
.into());
}

// Verify source table exists (works with both manifest and dir-listing modes)
let _source_uri = self.resolve_table_location(&request.id).await?;

// Check destination table does not already exist
let dest_id = Some(vec![new_table_name.clone()]);
let dest_exists = {
let mut exists_req = TableExistsRequest::new();
exists_req.id = dest_id.clone();
self.table_exists(exists_req).await.is_ok()
};
if dest_exists {
return Err(NamespaceError::TableAlreadyExists {
message: format!("Destination table '{}' already exists", new_table_name),
}
.into());
}

// For manifest mode with cross-namespace rename, delegate to manifest
if let Some(ref manifest_ns) = self.manifest_ns
&& request.new_namespace_id.is_some()
{
return manifest_ns.rename_table(request).await;
}

// Perform the rename by copying all files from source to destination,
// then removing the source directory.
let source_name = request
.id
.as_ref()
.and_then(|id| id.last())
.ok_or_else(|| {
lance_core::Error::from(NamespaceError::InvalidInput {
message: "Table ID is required for rename".to_string(),
})
})?;

let source_path = self.table_path(source_name);
let dest_path = self.table_path(new_table_name);

// List all files in the source table directory
let entries: Vec<object_store::ObjectMeta> = self
.object_store
.inner
.list(Some(&source_path))
.try_collect()
.await
.map_err(|e| {
lance_core::Error::from(NamespaceError::Internal {
message: format!(
"Failed to list source table '{}' files: {:?}",
source_name, e
),
})
})?;

// Copy each file to the new location
for entry in &entries {
let relative = entry
.location
.as_ref()
.strip_prefix(source_path.as_ref())
.unwrap_or(entry.location.as_ref());
let new_location = Path::parse(format!(
"{}/{}",
dest_path.as_ref(),
relative.trim_start_matches('/')
))
.map_err(|e| {
lance_core::Error::from(NamespaceError::Internal {
message: format!("Failed to construct destination path: {:?}", e),
})
})?;

self.object_store
.inner
.copy(&entry.location, &new_location)
.await
.map_err(|e| {
lance_core::Error::from(NamespaceError::Internal {
message: format!(
"Failed to copy file '{}' to '{}': {:?}",
entry.location, new_location, e
),
})
})?;
}

// Remove the source directory after successful copy
self.object_store
.remove_dir_all(source_path)
.await
.map_err(|e| {
lance_core::Error::from(NamespaceError::Internal {
message: format!(
"Failed to remove source table '{}' after rename: {:?}",
source_name, e
),
})
})?;

// If manifest mode is enabled, update the manifest to reflect the rename
if let Some(ref manifest_ns) = self.manifest_ns {
// Drop the old table entry from manifest
let mut drop_req = DropTableRequest::new();
drop_req.id = request.id.clone();
let _ = manifest_ns.drop_table(drop_req).await;

// Register the new table in manifest
let relative_location = format!("{}.lance", new_table_name);
let _ = manifest_ns
.register_table(new_table_name, relative_location)
.await;
}

Ok(RenameTableResponse {
transaction_id: None,
})
}

async fn update_table_schema_metadata(
&self,
request: UpdateTableSchemaMetadataRequest,
Expand Down Expand Up @@ -9645,4 +9785,139 @@ mod tests {
assert!(err_msg.contains("Table not found"));
assert!(err_msg.contains("table id 'workspace$missing_table'"));
}

#[tokio::test]
async fn test_rename_table_basic() {
let (namespace, _temp_dir) = create_test_namespace().await;

// Create a table
let schema = create_test_schema();
let ipc_data = create_test_ipc_data(&schema);

let mut create_request = CreateTableRequest::new();
create_request.id = Some(vec!["original_table".to_string()]);
namespace
.create_table(create_request, bytes::Bytes::from(ipc_data))
.await
.unwrap();

// Rename the table
let mut rename_request = RenameTableRequest::new("renamed_table".to_string());
rename_request.id = Some(vec!["original_table".to_string()]);
let result = namespace.rename_table(rename_request).await;
assert!(result.is_ok());

// Verify original table no longer exists
let mut exists_request = TableExistsRequest::new();
exists_request.id = Some(vec!["original_table".to_string()]);
assert!(namespace.table_exists(exists_request).await.is_err());

// Verify renamed table exists
let mut exists_request = TableExistsRequest::new();
exists_request.id = Some(vec!["renamed_table".to_string()]);
assert!(namespace.table_exists(exists_request).await.is_ok());

// Verify we can describe the renamed table
let mut describe_request = DescribeTableRequest::new();
describe_request.id = Some(vec!["renamed_table".to_string()]);
let describe_result = namespace.describe_table(describe_request).await;
assert!(describe_result.is_ok());
}

#[tokio::test]
async fn test_rename_to_existing_table_should_fail() {
let (namespace, _temp_dir) = create_test_namespace().await;

// Create two tables
let schema = create_test_schema();
let ipc_data = create_test_ipc_data(&schema);

let mut create_request = CreateTableRequest::new();
create_request.id = Some(vec!["table_a".to_string()]);
namespace
.create_table(create_request, bytes::Bytes::from(ipc_data.clone()))
.await
.unwrap();

let mut create_request = CreateTableRequest::new();
create_request.id = Some(vec!["table_b".to_string()]);
namespace
.create_table(create_request, bytes::Bytes::from(ipc_data))
.await
.unwrap();

// Try to rename table_a to table_b - should fail
let mut rename_request = RenameTableRequest::new("table_b".to_string());
rename_request.id = Some(vec!["table_a".to_string()]);
let result = namespace.rename_table(rename_request).await;
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("already exists"));
}

#[tokio::test]
async fn test_rename_nonexistent_table_should_fail() {
let (namespace, _temp_dir) = create_test_namespace().await;

let mut rename_request = RenameTableRequest::new("new_name".to_string());
rename_request.id = Some(vec!["nonexistent".to_string()]);
let result = namespace.rename_table(rename_request).await;
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("not found"));
}

#[tokio::test]
async fn test_rename_table_empty_name_should_fail() {
let (namespace, _temp_dir) = create_test_namespace().await;

// Create a table
let schema = create_test_schema();
let ipc_data = create_test_ipc_data(&schema);

let mut create_request = CreateTableRequest::new();
create_request.id = Some(vec!["my_table".to_string()]);
namespace
.create_table(create_request, bytes::Bytes::from(ipc_data))
.await
.unwrap();

// Try to rename with empty name
let mut rename_request = RenameTableRequest::new("".to_string());
rename_request.id = Some(vec!["my_table".to_string()]);
let result = namespace.rename_table(rename_request).await;
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("empty"));
}

#[tokio::test]
async fn test_rename_table_cross_namespace_should_fail() {
let temp_dir = TempStdDir::default();
let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
.manifest_enabled(false)
.build()
.await
.unwrap();

// Create a table
let schema = create_test_schema();
let ipc_data = create_test_ipc_data(&schema);

let mut create_request = CreateTableRequest::new();
create_request.id = Some(vec!["my_table".to_string()]);
namespace
.create_table(create_request, bytes::Bytes::from(ipc_data))
.await
.unwrap();

// Try to rename with cross-namespace - should fail in directory mode without manifest
let mut rename_request = RenameTableRequest::new("new_name".to_string());
rename_request.id = Some(vec!["my_table".to_string()]);
rename_request.new_namespace_id = Some(vec!["other_ns".to_string()]);
let result = namespace.rename_table(rename_request).await;
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("Cross-namespace"));
}
}
Loading