From a98d5ac055e86985463c9102cdf29cbd85fbc9bd Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Tue, 19 May 2026 20:28:06 +0200 Subject: [PATCH 1/5] fix: validate filesystem catalog identifiers --- bindings/c/src/identifier.rs | 49 ++++++++++++- crates/paimon/src/catalog/filesystem.rs | 92 +++++++++++++++++++++++++ crates/paimon/src/catalog/mod.rs | 90 +++++++++++++++++++++++- 3 files changed, 228 insertions(+), 3 deletions(-) diff --git a/bindings/c/src/identifier.rs b/bindings/c/src/identifier.rs index 1ccb36dd..6edaa14c 100644 --- a/bindings/c/src/identifier.rs +++ b/bindings/c/src/identifier.rs @@ -19,7 +19,7 @@ use std::ffi::{c_char, c_void}; use paimon::catalog::Identifier; -use crate::error::validate_cstr; +use crate::error::{paimon_error, validate_cstr}; use crate::result::paimon_result_identifier_new; use crate::types::paimon_identifier; @@ -50,8 +50,17 @@ pub unsafe extern "C" fn paimon_identifier_new( } } }; + let identifier = match Identifier::try_new(db, obj) { + Ok(identifier) => identifier, + Err(e) => { + return paimon_result_identifier_new { + identifier: std::ptr::null_mut(), + error: paimon_error::from_paimon(e), + } + } + }; let wrapper = Box::new(paimon_identifier { - inner: Box::into_raw(Box::new(Identifier::new(db, obj))) as *mut c_void, + inner: Box::into_raw(Box::new(identifier)) as *mut c_void, }); paimon_result_identifier_new { identifier: Box::into_raw(wrapper), @@ -72,3 +81,39 @@ pub unsafe extern "C" fn paimon_identifier_free(id: *mut paimon_identifier) { } } } + +#[cfg(test)] +mod tests { + use std::ffi::CString; + + use super::*; + use crate::error::paimon_error_free; + + #[test] + fn test_paimon_identifier_new_should_reject_path_control_names() { + let database = CString::new("db").unwrap(); + let object = CString::new("../escaped").unwrap(); + + let result = unsafe { paimon_identifier_new(database.as_ptr(), object.as_ptr()) }; + + assert!(result.identifier.is_null()); + assert!(!result.error.is_null()); + unsafe { + paimon_error_free(result.error); + } + } + + #[test] + fn test_paimon_identifier_new_should_allow_safe_names() { + let database = CString::new("db").unwrap(); + let object = CString::new("table$snapshots").unwrap(); + + let result = unsafe { paimon_identifier_new(database.as_ptr(), object.as_ptr()) }; + + assert!(result.error.is_null()); + assert!(!result.identifier.is_null()); + unsafe { + paimon_identifier_free(result.identifier); + } + } +} diff --git a/crates/paimon/src/catalog/filesystem.rs b/crates/paimon/src/catalog/filesystem.rs index c544a153..7f18f952 100644 --- a/crates/paimon/src/catalog/filesystem.rs +++ b/crates/paimon/src/catalog/filesystem.rs @@ -215,6 +215,8 @@ impl Catalog for FileSystemCatalog { ignore_if_exists: bool, properties: HashMap, ) -> Result<()> { + Identifier::validate_database_name(name)?; + if properties.contains_key(DB_LOCATION_PROP) { return Err(Error::ConfigInvalid { message: "Cannot specify location for a database when using fileSystem catalog." @@ -239,6 +241,8 @@ impl Catalog for FileSystemCatalog { } async fn get_database(&self, name: &str) -> Result { + Identifier::validate_database_name(name)?; + if !self.database_exists(name).await? { return Err(Error::DatabaseNotExist { database: name.to_string(), @@ -254,6 +258,8 @@ impl Catalog for FileSystemCatalog { ignore_if_not_exists: bool, cascade: bool, ) -> Result<()> { + Identifier::validate_database_name(name)?; + let path = self.database_path(name); let database_exists = self.database_exists(name).await?; @@ -279,6 +285,8 @@ impl Catalog for FileSystemCatalog { } async fn get_table(&self, identifier: &Identifier) -> Result { + identifier.validate()?; + let table_path = self.table_path(identifier); if !self.table_exists(identifier).await? { @@ -304,6 +312,8 @@ impl Catalog for FileSystemCatalog { } async fn list_tables(&self, database_name: &str) -> Result> { + Identifier::validate_database_name(database_name)?; + let path = self.database_path(database_name); if !self.database_exists(database_name).await? { @@ -321,6 +331,8 @@ impl Catalog for FileSystemCatalog { creation: Schema, ignore_if_exists: bool, ) -> Result<()> { + identifier.validate()?; + let table_path = self.table_path(identifier); let table_exists = self.table_exists(identifier).await?; @@ -348,6 +360,8 @@ impl Catalog for FileSystemCatalog { } async fn drop_table(&self, identifier: &Identifier, ignore_if_not_exists: bool) -> Result<()> { + identifier.validate()?; + let table_path = self.table_path(identifier); let table_exists = self.table_exists(identifier).await?; @@ -372,6 +386,9 @@ impl Catalog for FileSystemCatalog { to: &Identifier, ignore_if_not_exists: bool, ) -> Result<()> { + from.validate()?; + to.validate()?; + let from_path = self.table_path(from); let to_path = self.table_path(to); @@ -403,6 +420,8 @@ impl Catalog for FileSystemCatalog { changes: Vec, ignore_if_not_exists: bool, ) -> Result<()> { + identifier.validate()?; + let table_path = self.table_path(identifier); if !self.table_exists(identifier).await? { if ignore_if_not_exists { @@ -511,6 +530,31 @@ mod tests { assert!(!catalog.database_exists("db1").await.unwrap()); } + #[tokio::test] + async fn test_create_database_should_reject_path_traversal_name() { + let (temp_dir, catalog) = create_test_catalog(); + let escaped_path = temp_dir.path().join("escaped.db"); + + let result = catalog + .create_database("../escaped", false, HashMap::new()) + .await; + + assert!(matches!(result, Err(Error::IdentifierInvalid { .. }))); + assert!(!escaped_path.exists()); + } + + #[tokio::test] + async fn test_drop_database_should_reject_path_traversal_name() { + let (temp_dir, catalog) = create_test_catalog(); + let escaped_path = temp_dir.path().join("escaped.db"); + std::fs::create_dir(&escaped_path).unwrap(); + + let result = catalog.drop_database("../escaped", false, true).await; + + assert!(matches!(result, Err(Error::IdentifierInvalid { .. }))); + assert!(escaped_path.exists()); + } + #[tokio::test] async fn test_table_operations() { let (_temp_dir, catalog) = create_test_catalog(); @@ -568,6 +612,54 @@ mod tests { assert!(!tables.contains(&"table1".to_string())); } + #[tokio::test] + async fn test_create_table_should_reject_path_traversal_object_name() { + let (temp_dir, catalog) = create_test_catalog(); + catalog + .create_database("db1", false, HashMap::new()) + .await + .unwrap(); + let escaped_path = temp_dir.path().join("table_escape"); + + let result = catalog + .create_table( + &Identifier::new("db1", "../../table_escape"), + testing_schema(), + false, + ) + .await; + + assert!(matches!(result, Err(Error::IdentifierInvalid { .. }))); + assert!(!escaped_path.exists()); + } + + #[tokio::test] + async fn test_rename_table_should_reject_path_traversal_target_name() { + let (temp_dir, catalog) = create_test_catalog(); + catalog + .create_database("db1", false, HashMap::new()) + .await + .unwrap(); + let source = Identifier::new("db1", "source"); + catalog + .create_table(&source, testing_schema(), false) + .await + .unwrap(); + let escaped_path = temp_dir.path().join("renamed_escape"); + + let result = catalog + .rename_table( + &source, + &Identifier::new("db1", "../../renamed_escape"), + false, + ) + .await; + + assert!(matches!(result, Err(Error::IdentifierInvalid { .. }))); + assert!(!escaped_path.exists()); + assert!(catalog.get_table(&source).await.is_ok()); + } + #[tokio::test] async fn test_list_partitions_default_table_not_found_errors() { let (_temp_dir, catalog) = create_test_catalog(); diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs index 0c944a4c..83c4d339 100644 --- a/crates/paimon/src/catalog/mod.rs +++ b/crates/paimon/src/catalog/mod.rs @@ -29,6 +29,7 @@ mod rest; use std::collections::HashMap; use std::fmt; +use crate::{Error, Result}; pub use database::*; pub use factory::*; pub use filesystem::*; @@ -76,6 +77,29 @@ impl Identifier { } } + /// Create an identifier from database and object name, validating both names. + pub fn try_new(database: impl Into, object: impl Into) -> Result { + let identifier = Self::new(database, object); + identifier.validate()?; + Ok(identifier) + } + + /// Validate this identifier's database and object names. + pub fn validate(&self) -> Result<()> { + Self::validate_database_name(&self.database)?; + Self::validate_object_name(&self.object) + } + + /// Validate a database name for path-safe catalog use. + pub fn validate_database_name(name: &str) -> Result<()> { + validate_identifier_name("database", name) + } + + /// Validate an object name for path-safe catalog use. + pub fn validate_object_name(name: &str) -> Result<()> { + validate_identifier_name("object", name) + } + /// Database name. pub fn database(&self) -> &str { &self.database @@ -97,6 +121,28 @@ impl Identifier { } } +fn validate_identifier_name(kind: &str, name: &str) -> Result<()> { + let invalid = if name.trim().is_empty() { + Some("cannot be empty or whitespace") + } else if matches!(name, "." | "..") { + Some("cannot be '.' or '..'") + } else if name.contains('/') || name.contains('\\') { + Some("cannot contain path separators") + } else if name.chars().any(char::is_control) { + Some("cannot contain control characters") + } else { + None + }; + + if let Some(reason) = invalid { + return Err(Error::IdentifierInvalid { + message: format!("{kind} name {reason}: {name:?}"), + }); + } + + Ok(()) +} + impl fmt::Display for Identifier { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.full_name()) @@ -112,6 +158,49 @@ impl fmt::Debug for Identifier { } } +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_identifier_try_new_should_reject_path_control_names() { + for (database, object) in [ + ("", "table"), + (" ", "table"), + (".", "table"), + ("..", "table"), + ("../escaped", "table"), + ("db\\escaped", "table"), + ("db\nescaped", "table"), + ("db", ""), + ("db", " "), + ("db", "."), + ("db", ".."), + ("db", "../escaped"), + ("db", "nested/table"), + ("db", "nested\\table"), + ("db", "table\0name"), + ] { + let result = Identifier::try_new(database, object); + assert!( + matches!(result, Err(Error::IdentifierInvalid { .. })), + "expected invalid identifier for database={database:?}, object={object:?}, got {result:?}" + ); + } + } + + #[test] + fn test_identifier_try_new_should_allow_system_suffix_and_unicode_names() { + let identifier = Identifier::try_new("analytics", "orders$snapshots").unwrap(); + assert_eq!(identifier.database(), "analytics"); + assert_eq!(identifier.object(), "orders$snapshots"); + + let identifier = Identifier::try_new("数据", "订单").unwrap(); + assert_eq!(identifier.database(), "数据"); + assert_eq!(identifier.object(), "订单"); + } +} + // ======================= Catalog trait =============================== use async_trait::async_trait; @@ -119,7 +208,6 @@ use async_trait::async_trait; use crate::api::PagedList; use crate::spec::{Partition, Schema, SchemaChange}; use crate::table::Table; -use crate::Result; /// Catalog API for reading and writing metadata (databases, tables) in Paimon. /// From a9289b74053c536468ac6549b47bf652a3771f6b Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Tue, 19 May 2026 22:03:14 +0200 Subject: [PATCH 2/5] fix: move catalog tests after items --- crates/paimon/src/catalog/mod.rs | 86 ++++++++++++++++---------------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs index 83c4d339..875f5644 100644 --- a/crates/paimon/src/catalog/mod.rs +++ b/crates/paimon/src/catalog/mod.rs @@ -158,49 +158,6 @@ impl fmt::Debug for Identifier { } } -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_identifier_try_new_should_reject_path_control_names() { - for (database, object) in [ - ("", "table"), - (" ", "table"), - (".", "table"), - ("..", "table"), - ("../escaped", "table"), - ("db\\escaped", "table"), - ("db\nescaped", "table"), - ("db", ""), - ("db", " "), - ("db", "."), - ("db", ".."), - ("db", "../escaped"), - ("db", "nested/table"), - ("db", "nested\\table"), - ("db", "table\0name"), - ] { - let result = Identifier::try_new(database, object); - assert!( - matches!(result, Err(Error::IdentifierInvalid { .. })), - "expected invalid identifier for database={database:?}, object={object:?}, got {result:?}" - ); - } - } - - #[test] - fn test_identifier_try_new_should_allow_system_suffix_and_unicode_names() { - let identifier = Identifier::try_new("analytics", "orders$snapshots").unwrap(); - assert_eq!(identifier.database(), "analytics"); - assert_eq!(identifier.object(), "orders$snapshots"); - - let identifier = Identifier::try_new("数据", "订单").unwrap(); - assert_eq!(identifier.database(), "数据"); - assert_eq!(identifier.object(), "订单"); - } -} - // ======================= Catalog trait =============================== use async_trait::async_trait; @@ -345,3 +302,46 @@ pub trait Catalog: Send + Sync { )) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_identifier_try_new_should_reject_path_control_names() { + for (database, object) in [ + ("", "table"), + (" ", "table"), + (".", "table"), + ("..", "table"), + ("../escaped", "table"), + ("db\\escaped", "table"), + ("db\nescaped", "table"), + ("db", ""), + ("db", " "), + ("db", "."), + ("db", ".."), + ("db", "../escaped"), + ("db", "nested/table"), + ("db", "nested\\table"), + ("db", "table\0name"), + ] { + let result = Identifier::try_new(database, object); + assert!( + matches!(result, Err(Error::IdentifierInvalid { .. })), + "expected invalid identifier for database={database:?}, object={object:?}, got {result:?}" + ); + } + } + + #[test] + fn test_identifier_try_new_should_allow_system_suffix_and_unicode_names() { + let identifier = Identifier::try_new("analytics", "orders$snapshots").unwrap(); + assert_eq!(identifier.database(), "analytics"); + assert_eq!(identifier.object(), "orders$snapshots"); + + let identifier = Identifier::try_new("数据", "订单").unwrap(); + assert_eq!(identifier.database(), "数据"); + assert_eq!(identifier.object(), "订单"); + } +} From 14ab8f087d9a646940ff455dca9c73d0b5c5f04c Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Wed, 20 May 2026 12:05:53 +0200 Subject: [PATCH 3/5] fix: validate identifiers in constructor --- bindings/c/src/identifier.rs | 2 +- .../integration_tests/tests/append_tables.rs | 2 +- crates/integration_tests/tests/read_tables.rs | 2 +- crates/integrations/datafusion/src/catalog.rs | 16 ++++-- .../datafusion/src/filter_pushdown.rs | 2 +- .../integrations/datafusion/src/merge_into.rs | 6 +- .../datafusion/src/physical_plan/scan.rs | 4 +- .../integrations/datafusion/src/procedures.rs | 4 +- .../datafusion/src/sql_context.rs | 53 ++++++++---------- .../datafusion/src/system_tables/mod.rs | 2 +- .../integrations/datafusion/src/table/mod.rs | 8 +-- .../datafusion/src/table_function_args.rs | 8 ++- crates/integrations/datafusion/src/update.rs | 4 +- .../tests/cross_partition_tables.rs | 8 +-- .../datafusion/tests/dynamic_bucket_tables.rs | 16 +++--- .../datafusion/tests/merge_into_tests.rs | 4 +- .../datafusion/tests/pk_tables.rs | 12 ++-- .../datafusion/tests/read_tables.rs | 4 +- .../datafusion/tests/sql_context_tests.rs | 22 +++++--- .../datafusion/tests/system_tables.rs | 14 ++--- .../paimon/examples/rest_catalog_example.rs | 6 +- crates/paimon/src/api/api_request.rs | 4 +- crates/paimon/src/catalog/filesystem.rs | 56 +++++++++++-------- crates/paimon/src/catalog/mod.rs | 23 +++----- crates/paimon/src/table/cow_writer.rs | 8 +-- .../paimon/src/table/data_evolution_reader.rs | 4 +- .../paimon/src/table/data_evolution_writer.rs | 4 +- crates/paimon/src/table/read_builder.rs | 16 +++--- crates/paimon/src/table/table_commit.rs | 6 +- crates/paimon/src/table/table_scan.rs | 2 +- crates/paimon/src/table/table_write.rs | 32 +++++------ crates/paimon/tests/rest_api_test.rs | 20 +++---- crates/paimon/tests/rest_catalog_test.rs | 32 +++++------ 33 files changed, 209 insertions(+), 197 deletions(-) diff --git a/bindings/c/src/identifier.rs b/bindings/c/src/identifier.rs index 6edaa14c..9aa36e25 100644 --- a/bindings/c/src/identifier.rs +++ b/bindings/c/src/identifier.rs @@ -50,7 +50,7 @@ pub unsafe extern "C" fn paimon_identifier_new( } } }; - let identifier = match Identifier::try_new(db, obj) { + let identifier = match Identifier::new(db, obj) { Ok(identifier) => identifier, Err(e) => { return paimon_result_identifier_new { diff --git a/crates/integration_tests/tests/append_tables.rs b/crates/integration_tests/tests/append_tables.rs index 981c44d9..5df43d14 100644 --- a/crates/integration_tests/tests/append_tables.rs +++ b/crates/integration_tests/tests/append_tables.rs @@ -51,7 +51,7 @@ async fn setup_dirs(file_io: &paimon::io::FileIO, table_path: &str) { fn make_table(file_io: &paimon::io::FileIO, table_path: &str, schema: TableSchema) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test"), + Identifier::new("default", "test").unwrap(), table_path.to_string(), schema, None, diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs index b89268c1..3f0a043a 100644 --- a/crates/integration_tests/tests/read_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -71,7 +71,7 @@ async fn get_table_from_catalog( catalog: &C, table_name: &str, ) -> paimon::Table { - let identifier = Identifier::new("default", table_name); + let identifier = Identifier::new("default", table_name).unwrap(); catalog .get_table(&identifier) .await diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index a93201aa..3bc748c0 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -214,7 +214,7 @@ impl PaimonCatalogProvider { let catalog = Arc::clone(&self.catalog); let db = database.to_string(); let tbl = table_name.to_string(); - let identifier = Identifier::new(db, tbl); + let identifier = Identifier::new(db, tbl).map_err(to_datafusion_error)?; if let Ok(true) = block_on_with_runtime( async move { match catalog.get_table(&identifier).await { @@ -372,7 +372,8 @@ impl SchemaProvider for PaimonSchemaProvider { let catalog = Arc::clone(&self.catalog); let dynamic_options = Arc::clone(&self.dynamic_options); - let identifier = Identifier::new(self.database.clone(), base); + let identifier = + Identifier::new(self.database.clone(), base).map_err(to_datafusion_error)?; await_with_runtime(async move { match catalog.get_table(&identifier).await { Ok(table) => { @@ -407,7 +408,13 @@ impl SchemaProvider for PaimonSchemaProvider { } let catalog = Arc::clone(&self.catalog); - let identifier = Identifier::new(self.database.clone(), base.to_string()); + let identifier = match Identifier::new(self.database.clone(), base.to_string()) { + Ok(identifier) => identifier, + Err(e) => { + log::error!("invalid table identifier '{}.{}': {e}", self.database, base); + return false; + } + }; block_on_with_runtime( async move { match catalog.get_table(&identifier).await { @@ -435,7 +442,8 @@ impl SchemaProvider for PaimonSchemaProvider { fn deregister_table(&self, name: &str) -> DFResult>> { let catalog = Arc::clone(&self.catalog); - let identifier = Identifier::new(self.database.clone(), name); + let identifier = + Identifier::new(self.database.clone(), name).map_err(to_datafusion_error)?; block_on_with_runtime( async move { // Try to get the table first so we can return it. diff --git a/crates/integrations/datafusion/src/filter_pushdown.rs b/crates/integrations/datafusion/src/filter_pushdown.rs index 0d418270..0d6af96b 100644 --- a/crates/integrations/datafusion/src/filter_pushdown.rs +++ b/crates/integrations/datafusion/src/filter_pushdown.rs @@ -397,7 +397,7 @@ mod tests { ); Table::new( file_io, - Identifier::new("default", "t"), + Identifier::new("default", "t").unwrap(), "/tmp/test-filter-pushdown".to_string(), table_schema, None, diff --git a/crates/integrations/datafusion/src/merge_into.rs b/crates/integrations/datafusion/src/merge_into.rs index cd78ddec..04945837 100644 --- a/crates/integrations/datafusion/src/merge_into.rs +++ b/crates/integrations/datafusion/src/merge_into.rs @@ -1496,7 +1496,7 @@ mod tests { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", name)) + .get_table(&Identifier::new("test_db", name).unwrap()) .await .unwrap(); let mut extra = std::collections::HashMap::new(); @@ -1645,7 +1645,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); let table = Table::new( file_io, - Identifier::new("default", "t"), + Identifier::new("default", "t").unwrap(), table_path.to_string(), table_schema, None, @@ -1689,7 +1689,7 @@ mod tests { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", name)) + .get_table(&Identifier::new("test_db", name).unwrap()) .await .unwrap(); diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index a7387f4a..896d673b 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -322,7 +322,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); Table::new( file_io, - Identifier::new("test_db", "test_table"), + Identifier::new("test_db", "test_table").unwrap(), "/tmp/test-table".to_string(), table_schema, None, @@ -354,7 +354,7 @@ mod tests { ); let table = Table::new( file_io, - Identifier::new("default", "t"), + Identifier::new("default", "t").unwrap(), table_path, table_schema, None, diff --git a/crates/integrations/datafusion/src/procedures.rs b/crates/integrations/datafusion/src/procedures.rs index 183788ab..a3a8b9a1 100644 --- a/crates/integrations/datafusion/src/procedures.rs +++ b/crates/integrations/datafusion/src/procedures.rs @@ -233,7 +233,7 @@ fn require_arg<'a>(args: &'a HashMap, name: &str) -> DFResult<&' fn resolve_table_identifier(table_str: &str, catalog_name: &str) -> DFResult { let parts: Vec<&str> = table_str.split('.').collect(); match parts.len() { - 2 => Ok(Identifier::new(parts[0], parts[1])), + 2 => Identifier::new(parts[0], parts[1]).map_err(to_datafusion_error), 3 => { if parts[0] != catalog_name { return Err(DataFusionError::Plan(format!( @@ -241,7 +241,7 @@ fn resolve_table_identifier(table_str: &str, catalog_name: &str) -> DFResult Err(DataFusionError::Plan(format!( "Invalid table identifier: '{table_str}'. Expected 'database.table' or 'catalog.database.table'" diff --git a/crates/integrations/datafusion/src/sql_context.rs b/crates/integrations/datafusion/src/sql_context.rs index f77b79e2..58665327 100644 --- a/crates/integrations/datafusion/src/sql_context.rs +++ b/crates/integrations/datafusion/src/sql_context.rs @@ -554,20 +554,16 @@ impl SQLContext { .catalogs .get(catalog.as_ref()) .ok_or_else(|| DataFusionError::Plan(format!("Unknown catalog '{catalog}'")))?; - Ok(( - catalog_arc.clone(), - catalog.to_string(), - Identifier::new(schema.as_ref(), table.as_ref()), - )) + let identifier = Identifier::new(schema.as_ref(), table.as_ref()) + .map_err(to_datafusion_error)?; + Ok((catalog_arc.clone(), catalog.to_string(), identifier)) } datafusion::common::TableReference::Partial { schema, table } => { let catalog = self.current_catalog()?; let catalog_name = self.current_catalog_name(); - Ok(( - catalog, - catalog_name, - Identifier::new(schema.as_ref(), table.as_ref()), - )) + let identifier = Identifier::new(schema.as_ref(), table.as_ref()) + .map_err(to_datafusion_error)?; + Ok((catalog, catalog_name, identifier)) } datafusion::common::TableReference::Bare { table } => { let catalog = self.current_catalog()?; @@ -579,11 +575,9 @@ impl SQLContext { .catalog .default_schema .clone(); - Ok(( - catalog, - catalog_name, - Identifier::new(default_schema, table.as_ref()), - )) + let identifier = + Identifier::new(default_schema, table.as_ref()).map_err(to_datafusion_error)?; + Ok((catalog, catalog_name, identifier)) } } } @@ -846,7 +840,10 @@ impl SQLContext { object_name_to_string(name) } }; - rename_to = Some(Identifier::new(identifier.database().to_string(), new_name)); + rename_to = Some( + Identifier::new(identifier.database().to_string(), new_name) + .map_err(to_datafusion_error)?, + ); } AlterTableOperation::SetTblProperties { table_properties } => { for opt in table_properties { @@ -1256,19 +1253,15 @@ impl SQLContext { let catalog = self.catalogs.get(&parts[0]).ok_or_else(|| { DataFusionError::Plan(format!("Unknown catalog '{}'", parts[0])) })?; - Ok(( - catalog.clone(), - parts[0].clone(), - Identifier::new(parts[1].clone(), parts[2].clone()), - )) + let identifier = Identifier::new(parts[1].clone(), parts[2].clone()) + .map_err(to_datafusion_error)?; + Ok((catalog.clone(), parts[0].clone(), identifier)) } 2 => { let catalog = self.current_catalog()?; - Ok(( - catalog, - self.current_catalog_name(), - Identifier::new(parts[0].clone(), parts[1].clone()), - )) + let identifier = Identifier::new(parts[0].clone(), parts[1].clone()) + .map_err(to_datafusion_error)?; + Ok((catalog, self.current_catalog_name(), identifier)) } 1 => { let catalog = self.current_catalog()?; @@ -1279,11 +1272,9 @@ impl SQLContext { .catalog .default_schema .clone(); - Ok(( - catalog, - self.current_catalog_name(), - Identifier::new(default_schema, parts[0].clone()), - )) + let identifier = Identifier::new(default_schema, parts[0].clone()) + .map_err(to_datafusion_error)?; + Ok((catalog, self.current_catalog_name(), identifier)) } _ => Err(DataFusionError::Plan(format!( "Invalid table reference: {name}" diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs b/crates/integrations/datafusion/src/system_tables/mod.rs index 0305d2f8..2ecef4b9 100644 --- a/crates/integrations/datafusion/src/system_tables/mod.rs +++ b/crates/integrations/datafusion/src/system_tables/mod.rs @@ -136,7 +136,7 @@ pub(crate) async fn load( if !is_registered(&system_name) { return Ok(None); } - let identifier = Identifier::new(database, base.clone()); + let identifier = Identifier::new(database, base.clone()).map_err(to_datafusion_error)?; match catalog.get_table(&identifier).await { Ok(table) => { if system_name.eq_ignore_ascii_case("partitions") { diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 5bae7443..e7f8c31d 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -285,7 +285,7 @@ mod tests { async fn create_provider(table_name: &str) -> PaimonTableProvider { let catalog = create_catalog(); - let identifier = Identifier::new("default", table_name); + let identifier = Identifier::new("default", table_name).unwrap(); let table = catalog .get_table(&identifier) .await @@ -536,7 +536,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); let table = paimon::table::Table::new( file_io, - Identifier::new("default", "test_insert"), + Identifier::new("default", "test_insert").unwrap(), table_path.to_string(), table_schema, None, @@ -616,7 +616,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); let table = paimon::table::Table::new( file_io, - Identifier::new("default", "test_overwrite"), + Identifier::new("default", "test_overwrite").unwrap(), table_path.to_string(), table_schema, None, @@ -704,7 +704,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); let table = paimon::table::Table::new( file_io, - Identifier::new("default", "test_overwrite_unpart"), + Identifier::new("default", "test_overwrite_unpart").unwrap(), table_path.to_string(), table_schema, None, diff --git a/crates/integrations/datafusion/src/table_function_args.rs b/crates/integrations/datafusion/src/table_function_args.rs index c0a6c833..d7b750cf 100644 --- a/crates/integrations/datafusion/src/table_function_args.rs +++ b/crates/integrations/datafusion/src/table_function_args.rs @@ -20,6 +20,8 @@ use datafusion::error::{DataFusionError, Result as DFResult}; use datafusion::logical_expr::Expr; use paimon::catalog::Identifier; +use crate::error::to_datafusion_error; + pub(crate) fn extract_string_literal( function_name: &str, expr: &Expr, @@ -72,9 +74,9 @@ pub(crate) fn parse_table_identifier( ) -> DFResult { let parts: Vec<&str> = name.split('.').collect(); match parts.len() { - 1 => Ok(Identifier::new(default_database, parts[0])), - 2 => Ok(Identifier::new(parts[0], parts[1])), - 3 => Ok(Identifier::new(parts[1], parts[2])), + 1 => Identifier::new(default_database, parts[0]).map_err(to_datafusion_error), + 2 => Identifier::new(parts[0], parts[1]).map_err(to_datafusion_error), + 3 => Identifier::new(parts[1], parts[2]).map_err(to_datafusion_error), _ => Err(DataFusionError::Plan(format!( "{function_name}: invalid table name '{name}', expected 'table', 'database.table', or 'catalog.database.table'" ))), diff --git a/crates/integrations/datafusion/src/update.rs b/crates/integrations/datafusion/src/update.rs index 8803dcc6..1e19e08f 100644 --- a/crates/integrations/datafusion/src/update.rs +++ b/crates/integrations/datafusion/src/update.rs @@ -374,7 +374,7 @@ mod tests { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", name)) + .get_table(&Identifier::new("test_db", name).unwrap()) .await .unwrap(); @@ -564,7 +564,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); let table = Table::new( file_io, - Identifier::new("default", "t"), + Identifier::new("default", "t").unwrap(), table_path.to_string(), table_schema, None, diff --git a/crates/integrations/datafusion/tests/cross_partition_tables.rs b/crates/integrations/datafusion/tests/cross_partition_tables.rs index 13183647..0b37a725 100644 --- a/crates/integrations/datafusion/tests/cross_partition_tables.rs +++ b/crates/integrations/datafusion/tests/cross_partition_tables.rs @@ -290,7 +290,7 @@ async fn test_cross_partition_delete_file_in_old_partition() { // Verify initial state: 2 data files (one per partition) let table = catalog - .get_table(&Identifier::new("test_db", "t_cross_dv")) + .get_table(&Identifier::new("test_db", "t_cross_dv").unwrap()) .await .unwrap(); let plan = table @@ -318,7 +318,7 @@ async fn test_cross_partition_delete_file_in_old_partition() { // Verify via scan_all_files: old partition "a" should have a new file // containing the DELETE record for id=1 (written with _VALUE_KIND=1) let table = catalog - .get_table(&Identifier::new("test_db", "t_cross_dv")) + .get_table(&Identifier::new("test_db", "t_cross_dv").unwrap()) .await .unwrap(); let plan = table @@ -396,7 +396,7 @@ async fn test_cross_partition_delete_file_in_old_partition() { // Verify partition "a" now has another DELETE file for id=2 let table = catalog - .get_table(&Identifier::new("test_db", "t_cross_dv")) + .get_table(&Identifier::new("test_db", "t_cross_dv").unwrap()) .await .unwrap(); let plan = table @@ -486,7 +486,7 @@ async fn test_cross_partition_first_row_skip() { // Use scan_all_files to verify (FIRST_ROW skips level-0 in normal reads) let table = catalog - .get_table(&Identifier::new("test_db", "t_cross_fr")) + .get_table(&Identifier::new("test_db", "t_cross_fr").unwrap()) .await .unwrap(); let rb = table.new_read_builder(); diff --git a/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs b/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs index 4c8cbcb5..21fde4ce 100644 --- a/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs +++ b/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs @@ -275,7 +275,7 @@ async fn test_pk_dynamic_bucket_partial_update_restores_existing_bucket() { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", "t_dyn_partial_route")) + .get_table(&Identifier::new("test_db", "t_dyn_partial_route").unwrap()) .await .unwrap(); assert_eq!( @@ -301,7 +301,7 @@ async fn test_pk_dynamic_bucket_partial_update_restores_existing_bucket() { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", "t_dyn_partial_route")) + .get_table(&Identifier::new("test_db", "t_dyn_partial_route").unwrap()) .await .unwrap(); let id1_bucket_after = bucket_containing_id(&table, 1).await; @@ -705,7 +705,7 @@ async fn test_pk_dynamic_bucket_insert_overwrite() { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", "t_dyn_ow")) + .get_table(&Identifier::new("test_db", "t_dyn_ow").unwrap()) .await .unwrap(); let hashes_before = collect_all_hashes(&table).await; @@ -733,7 +733,7 @@ async fn test_pk_dynamic_bucket_insert_overwrite() { // Verify HASH index: should have exactly 2 entries (not 3+2=5) let table = catalog - .get_table(&Identifier::new("test_db", "t_dyn_ow")) + .get_table(&Identifier::new("test_db", "t_dyn_ow").unwrap()) .await .unwrap(); let hashes_after = collect_all_hashes(&table).await; @@ -788,7 +788,7 @@ async fn test_pk_dynamic_bucket_partitioned_insert_overwrite() { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", "t_dyn_part_ow")) + .get_table(&Identifier::new("test_db", "t_dyn_part_ow").unwrap()) .await .unwrap(); let entries_before = read_hash_index_entries(&table).await; @@ -818,7 +818,7 @@ async fn test_pk_dynamic_bucket_partitioned_insert_overwrite() { // Verify HASH index: partition 'b' entries should survive, // partition 'a' should have exactly 1 entry (not 2+1=3) let table = catalog - .get_table(&Identifier::new("test_db", "t_dyn_part_ow")) + .get_table(&Identifier::new("test_db", "t_dyn_part_ow").unwrap()) .await .unwrap(); let entries_after = read_hash_index_entries(&table).await; @@ -860,7 +860,7 @@ async fn test_read_spark_dynamic_bucket_and_compare_index() { opts.set(CatalogOptions::WAREHOUSE, &warehouse); let spark_catalog = FileSystemCatalog::new(opts).unwrap(); let spark_table = spark_catalog - .get_table(&Identifier::new("default", "dynamic_bucket_pk_table")) + .get_table(&Identifier::new("default", "dynamic_bucket_pk_table").unwrap()) .await .unwrap(); @@ -958,7 +958,7 @@ async fn test_read_spark_dynamic_bucket_and_compare_index() { FileSystemCatalog::new(opts).unwrap() }; let rust_table = rust_catalog - .get_table(&Identifier::new("test_db", "t_dyn_cmp")) + .get_table(&Identifier::new("test_db", "t_dyn_cmp").unwrap()) .await .unwrap(); diff --git a/crates/integrations/datafusion/tests/merge_into_tests.rs b/crates/integrations/datafusion/tests/merge_into_tests.rs index 0969c09f..479bb3c7 100644 --- a/crates/integrations/datafusion/tests/merge_into_tests.rs +++ b/crates/integrations/datafusion/tests/merge_into_tests.rs @@ -457,7 +457,7 @@ async fn test_row_count_after_merge() { // Snapshot 1: 3 rows inserted let table = catalog - .get_table(&Identifier::new("test_db", "target")) + .get_table(&Identifier::new("test_db", "target").unwrap()) .await .unwrap(); let snap_mgr = SnapshotManager::new(table.file_io().clone(), table.location().to_string()); @@ -783,7 +783,7 @@ async fn test_merge_into_row_id_for_inserted_rows() { // Verify next_row_id in snapshot let table = catalog - .get_table(&Identifier::new("test_db", "target")) + .get_table(&Identifier::new("test_db", "target").unwrap()) .await .unwrap(); let snap_mgr = SnapshotManager::new(table.file_io().clone(), table.location().to_string()); diff --git a/crates/integrations/datafusion/tests/pk_tables.rs b/crates/integrations/datafusion/tests/pk_tables.rs index fa0bd6f1..2fb9429d 100644 --- a/crates/integrations/datafusion/tests/pk_tables.rs +++ b/crates/integrations/datafusion/tests/pk_tables.rs @@ -1680,7 +1680,7 @@ async fn test_pk_first_row_insert_overwrite() { // Verify via scan_all_files: 2 level-0 files (one per partition) let table = catalog - .get_table(&Identifier::new("test_db", "t_fr_ow")) + .get_table(&Identifier::new("test_db", "t_fr_ow").unwrap()) .await .unwrap(); let plan = table @@ -1706,7 +1706,7 @@ async fn test_pk_first_row_insert_overwrite() { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", "t_fr_ow")) + .get_table(&Identifier::new("test_db", "t_fr_ow").unwrap()) .await .unwrap(); let plan = table @@ -1732,7 +1732,7 @@ async fn test_pk_first_row_insert_overwrite() { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", "t_fr_ow")) + .get_table(&Identifier::new("test_db", "t_fr_ow").unwrap()) .await .unwrap(); let plan = table @@ -1782,7 +1782,7 @@ async fn test_postpone_write_invisible_to_select() { // scan_all_files should find the postpone file let table = catalog - .get_table(&Identifier::new("test_db", "t_postpone")) + .get_table(&Identifier::new("test_db", "t_postpone").unwrap()) .await .unwrap(); let plan = table @@ -1830,7 +1830,7 @@ async fn test_postpone_insert_overwrite() { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", "t_postpone_ow")) + .get_table(&Identifier::new("test_db", "t_postpone_ow").unwrap()) .await .unwrap(); let plan = table @@ -1853,7 +1853,7 @@ async fn test_postpone_insert_overwrite() { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", "t_postpone_ow")) + .get_table(&Identifier::new("test_db", "t_postpone_ow").unwrap()) .await .unwrap(); let plan = table diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs index 396e9514..1dfc8eca 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -51,7 +51,7 @@ async fn create_context() -> SQLContext { async fn create_provider(table_name: &str) -> PaimonTableProvider { let catalog = create_catalog(); - let identifier = Identifier::new("default", table_name); + let identifier = Identifier::new("default", table_name).unwrap(); let table = catalog .get_table(&identifier) .await @@ -65,7 +65,7 @@ async fn create_provider_with_options( extra_options: HashMap, ) -> PaimonTableProvider { let catalog = create_catalog(); - let identifier = Identifier::new("default", table_name); + let identifier = Identifier::new("default", table_name).unwrap(); let table = catalog .get_table(&identifier) .await diff --git a/crates/integrations/datafusion/tests/sql_context_tests.rs b/crates/integrations/datafusion/tests/sql_context_tests.rs index 7deed175..73114512 100644 --- a/crates/integrations/datafusion/tests/sql_context_tests.rs +++ b/crates/integrations/datafusion/tests/sql_context_tests.rs @@ -148,7 +148,7 @@ async fn test_create_table() { // Verify schema let table = catalog - .get_table(&Identifier::new("mydb", "users")) + .get_table(&Identifier::new("mydb", "users").unwrap()) .await .unwrap(); let schema = table.schema(); @@ -178,7 +178,7 @@ async fn test_create_table_with_blob_type() { .expect("CREATE TABLE with BLOB should succeed"); let table = catalog - .get_table(&Identifier::new("mydb", "assets")) + .get_table(&Identifier::new("mydb", "assets").unwrap()) .await .unwrap(); let schema = table.schema(); @@ -214,7 +214,7 @@ async fn test_create_table_with_partition() { .expect("CREATE TABLE with partition should succeed"); let table = catalog - .get_table(&Identifier::new("mydb", "events")) + .get_table(&Identifier::new("mydb", "events").unwrap()) .await .unwrap(); let schema = table.schema(); @@ -333,7 +333,7 @@ async fn test_create_table_with_array_and_map() { .expect("CREATE TABLE with ARRAY and MAP should succeed"); let table = catalog - .get_table(&Identifier::new("mydb", "complex_types")) + .get_table(&Identifier::new("mydb", "complex_types").unwrap()) .await .unwrap(); let schema = table.schema(); @@ -386,7 +386,7 @@ async fn test_create_table_with_row_type() { .expect("CREATE TABLE with STRUCT should succeed"); let table = catalog - .get_table(&Identifier::new("mydb", "row_table")) + .get_table(&Identifier::new("mydb", "row_table").unwrap()) .await .unwrap(); let schema = table.schema(); @@ -427,7 +427,7 @@ async fn test_drop_table() { .build() .unwrap(); catalog - .create_table(&Identifier::new("mydb", "to_drop"), schema, false) + .create_table(&Identifier::new("mydb", "to_drop").unwrap(), schema, false) .await .unwrap(); @@ -476,7 +476,11 @@ async fn test_alter_table_add_column() { .build() .unwrap(); catalog - .create_table(&Identifier::new("mydb", "alter_test"), schema, false) + .create_table( + &Identifier::new("mydb", "alter_test").unwrap(), + schema, + false, + ) .await .unwrap(); @@ -517,7 +521,7 @@ async fn test_alter_table_rename() { .build() .unwrap(); catalog - .create_table(&Identifier::new("mydb", "old_name"), schema, false) + .create_table(&Identifier::new("mydb", "old_name").unwrap(), schema, false) .await .unwrap(); @@ -555,7 +559,7 @@ async fn test_ddl_context_delegates_select() { .build() .unwrap(); catalog - .create_table(&Identifier::new("mydb", "t1"), schema, false) + .create_table(&Identifier::new("mydb", "t1").unwrap(), schema, false) .await .unwrap(); diff --git a/crates/integrations/datafusion/tests/system_tables.rs b/crates/integrations/datafusion/tests/system_tables.rs index ad4805d0..86479e44 100644 --- a/crates/integrations/datafusion/tests/system_tables.rs +++ b/crates/integrations/datafusion/tests/system_tables.rs @@ -101,7 +101,7 @@ async fn test_options_system_table() { } actual.sort(); - let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()); + let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()).unwrap(); let table = catalog .get_table(&identifier) .await @@ -159,7 +159,7 @@ async fn test_table_indexes_system_table() { assert_eq!(field.data_type(), dtype, "column {i} type"); } - let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()); + let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()).unwrap(); let table = catalog .get_table(&identifier) .await @@ -329,7 +329,7 @@ async fn test_schemas_system_table() { assert_eq!(field.data_type(), dtype, "column {i} type"); } - let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()); + let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()).unwrap(); let table = catalog .get_table(&identifier) .await @@ -456,7 +456,7 @@ async fn test_snapshots_system_table() { } // Row count must match the snapshot directory listing. - let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()); + let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()).unwrap(); let table = catalog .get_table(&identifier) .await @@ -595,7 +595,7 @@ async fn test_tags_system_table_empty_when_no_tag_dir() { async fn test_tags_system_table_with_seeded_tags() { let (ctx, catalog, tmp) = create_context().await; - let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()); + let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()).unwrap(); let table = catalog.get_table(&identifier).await.unwrap(); let sm = paimon::table::SnapshotManager::new(table.file_io().clone(), table.location().to_string()); @@ -685,7 +685,7 @@ async fn test_manifests_system_table() { } } - let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()); + let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()).unwrap(); let table = catalog.get_table(&identifier).await.unwrap(); let sm = paimon::table::SnapshotManager::new(table.file_io().clone(), table.location().to_string()); @@ -831,7 +831,7 @@ async fn test_files_system_table() { assert_eq!(field.data_type(), dtype, "column {i} type"); } - let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()); + let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()).unwrap(); let table = catalog.get_table(&identifier).await.unwrap(); let plan = table .new_read_builder() diff --git a/crates/paimon/examples/rest_catalog_example.rs b/crates/paimon/examples/rest_catalog_example.rs index cca3cbdb..b55d4512 100644 --- a/crates/paimon/examples/rest_catalog_example.rs +++ b/crates/paimon/examples/rest_catalog_example.rs @@ -146,7 +146,7 @@ async fn main() { println!("\n=== Part 2: Table Operations ===\n"); // Create table - let table_identifier = Identifier::new("example_db", "users"); + let table_identifier = Identifier::new("example_db", "users").unwrap(); println!("Creating table '{table_identifier}'..."); let schema = create_test_schema(); match catalog.create_table(&table_identifier, schema, false).await { @@ -176,7 +176,7 @@ async fn main() { } // Rename table - let renamed_identifier = Identifier::new("example_db", "users_renamed"); + let renamed_identifier = Identifier::new("example_db", "users_renamed").unwrap(); println!("\nRenaming table '{table_identifier}' to '{renamed_identifier}'..."); match catalog .rename_table(&table_identifier, &renamed_identifier, false) @@ -191,7 +191,7 @@ async fn main() { // Try to read from an existing table (example_db.users_renamed) // This table must already exist on the REST catalog server - let read_table_identifier = Identifier::new("example_db", "users_renamed"); + let read_table_identifier = Identifier::new("example_db", "users_renamed").unwrap(); println!("Attempting to read from table '{read_table_identifier}'..."); match catalog.get_table(&read_table_identifier).await { diff --git a/crates/paimon/src/api/api_request.rs b/crates/paimon/src/api/api_request.rs index 33a52fd7..e19ea2b6 100644 --- a/crates/paimon/src/api/api_request.rs +++ b/crates/paimon/src/api/api_request.rs @@ -123,8 +123,8 @@ mod tests { #[test] fn test_rename_table_request_serialization() { - let source = Identifier::new("db1".to_string(), "table1".to_string()); - let destination = Identifier::new("db2".to_string(), "table2".to_string()); + let source = Identifier::new("db1".to_string(), "table1".to_string()).unwrap(); + let destination = Identifier::new("db2".to_string(), "table2".to_string()).unwrap(); let req = RenameTableRequest::new(source, destination); let json = serde_json::to_string(&req).unwrap(); diff --git a/crates/paimon/src/catalog/filesystem.rs b/crates/paimon/src/catalog/filesystem.rs index 7f18f952..cafc581d 100644 --- a/crates/paimon/src/catalog/filesystem.rs +++ b/crates/paimon/src/catalog/filesystem.rs @@ -470,6 +470,14 @@ mod tests { .unwrap() } + fn deserialize_identifier(database: &str, object: &str) -> Identifier { + serde_json::from_value(serde_json::json!({ + "database": database, + "object": object, + })) + .unwrap() + } + #[tokio::test] async fn test_database_operations() { let (_temp_dir, catalog) = create_test_catalog(); @@ -518,7 +526,11 @@ mod tests { .await .unwrap(); catalog - .create_table(&Identifier::new("db1", "table1"), testing_schema(), false) + .create_table( + &Identifier::new("db1", "table1").unwrap(), + testing_schema(), + false, + ) .await .unwrap(); let result = catalog.drop_database("db1", false, false).await; @@ -566,11 +578,15 @@ mod tests { // create and list tables let schema = testing_schema(); catalog - .create_table(&Identifier::new("db1", "table1"), schema.clone(), false) + .create_table( + &Identifier::new("db1", "table1").unwrap(), + schema.clone(), + false, + ) .await .unwrap(); catalog - .create_table(&Identifier::new("db1", "table2"), schema, false) + .create_table(&Identifier::new("db1", "table2").unwrap(), schema, false) .await .unwrap(); let tables = catalog.list_tables("db1").await.unwrap(); @@ -591,11 +607,15 @@ mod tests { .build() .unwrap(); catalog - .create_table(&Identifier::new("db1", "table3"), schema_with_name, false) + .create_table( + &Identifier::new("db1", "table3").unwrap(), + schema_with_name, + false, + ) .await .unwrap(); let table = catalog - .get_table(&Identifier::new("db1", "table3")) + .get_table(&Identifier::new("db1", "table3").unwrap()) .await .unwrap(); let table_schema = table.schema(); @@ -604,7 +624,7 @@ mod tests { // drop table catalog - .drop_table(&Identifier::new("db1", "table1"), false) + .drop_table(&Identifier::new("db1", "table1").unwrap(), false) .await .unwrap(); let tables = catalog.list_tables("db1").await.unwrap(); @@ -621,12 +641,9 @@ mod tests { .unwrap(); let escaped_path = temp_dir.path().join("table_escape"); + let identifier = deserialize_identifier("db1", "../../table_escape"); let result = catalog - .create_table( - &Identifier::new("db1", "../../table_escape"), - testing_schema(), - false, - ) + .create_table(&identifier, testing_schema(), false) .await; assert!(matches!(result, Err(Error::IdentifierInvalid { .. }))); @@ -640,20 +657,15 @@ mod tests { .create_database("db1", false, HashMap::new()) .await .unwrap(); - let source = Identifier::new("db1", "source"); + let source = Identifier::new("db1", "source").unwrap(); catalog .create_table(&source, testing_schema(), false) .await .unwrap(); let escaped_path = temp_dir.path().join("renamed_escape"); - let result = catalog - .rename_table( - &source, - &Identifier::new("db1", "../../renamed_escape"), - false, - ) - .await; + let target = deserialize_identifier("db1", "../../renamed_escape"); + let result = catalog.rename_table(&source, &target, false).await; assert!(matches!(result, Err(Error::IdentifierInvalid { .. }))); assert!(!escaped_path.exists()); @@ -663,7 +675,7 @@ mod tests { #[tokio::test] async fn test_list_partitions_default_table_not_found_errors() { let (_temp_dir, catalog) = create_test_catalog(); - let id = Identifier::new("nope_db", "nope_table"); + let id = Identifier::new("nope_db", "nope_table").unwrap(); let result = catalog.list_partitions(&id).await; assert!( matches!( @@ -681,7 +693,7 @@ mod tests { .create_database("db1", false, HashMap::new()) .await .unwrap(); - let id = Identifier::new("db1", "t1"); + let id = Identifier::new("db1", "t1").unwrap(); catalog .create_table(&id, testing_schema(), false) .await @@ -702,7 +714,7 @@ mod tests { .create_database("db1", false, HashMap::new()) .await .unwrap(); - let id = Identifier::new("db1", "t1"); + let id = Identifier::new("db1", "t1").unwrap(); catalog .create_table(&id, testing_schema(), false) .await diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs index 875f5644..fc12576d 100644 --- a/crates/paimon/src/catalog/mod.rs +++ b/crates/paimon/src/catalog/mod.rs @@ -69,17 +69,12 @@ pub struct Identifier { } impl Identifier { - /// Create an identifier from database and object name. - pub fn new(database: impl Into, object: impl Into) -> Self { - Self { + /// Create an identifier from database and object name, validating both names. + pub fn new(database: impl Into, object: impl Into) -> Result { + let identifier = Self { database: database.into(), object: object.into(), - } - } - - /// Create an identifier from database and object name, validating both names. - pub fn try_new(database: impl Into, object: impl Into) -> Result { - let identifier = Self::new(database, object); + }; identifier.validate()?; Ok(identifier) } @@ -308,7 +303,7 @@ mod tests { use super::*; #[test] - fn test_identifier_try_new_should_reject_path_control_names() { + fn test_identifier_new_should_reject_path_control_names() { for (database, object) in [ ("", "table"), (" ", "table"), @@ -326,7 +321,7 @@ mod tests { ("db", "nested\\table"), ("db", "table\0name"), ] { - let result = Identifier::try_new(database, object); + let result = Identifier::new(database, object); assert!( matches!(result, Err(Error::IdentifierInvalid { .. })), "expected invalid identifier for database={database:?}, object={object:?}, got {result:?}" @@ -335,12 +330,12 @@ mod tests { } #[test] - fn test_identifier_try_new_should_allow_system_suffix_and_unicode_names() { - let identifier = Identifier::try_new("analytics", "orders$snapshots").unwrap(); + fn test_identifier_new_should_allow_system_suffix_and_unicode_names() { + let identifier = Identifier::new("analytics", "orders$snapshots").unwrap(); assert_eq!(identifier.database(), "analytics"); assert_eq!(identifier.object(), "orders$snapshots"); - let identifier = Identifier::try_new("数据", "订单").unwrap(); + let identifier = Identifier::new("数据", "订单").unwrap(); assert_eq!(identifier.database(), "数据"); assert_eq!(identifier.object(), "订单"); } diff --git a/crates/paimon/src/table/cow_writer.rs b/crates/paimon/src/table/cow_writer.rs index c2b5e958..2c2111ea 100644 --- a/crates/paimon/src/table/cow_writer.rs +++ b/crates/paimon/src/table/cow_writer.rs @@ -531,7 +531,7 @@ mod tests { fn test_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_cow"), + Identifier::new("default", "test_cow").unwrap(), table_path.to_string(), test_append_schema(), None, @@ -549,7 +549,7 @@ mod tests { .unwrap(); let table = Table::new( file_io, - Identifier::new("default", "test"), + Identifier::new("default", "test").unwrap(), "memory:/test".to_string(), TableSchema::new(0, &schema), None, @@ -573,7 +573,7 @@ mod tests { .unwrap(); let table = Table::new( file_io, - Identifier::new("default", "test"), + Identifier::new("default", "test").unwrap(), "memory:/test".to_string(), TableSchema::new(0, &schema), None, @@ -598,7 +598,7 @@ mod tests { .unwrap(); let table = Table::new( file_io, - Identifier::new("default", "test"), + Identifier::new("default", "test").unwrap(), "memory:/test".to_string(), TableSchema::new(0, &schema), None, diff --git a/crates/paimon/src/table/data_evolution_reader.rs b/crates/paimon/src/table/data_evolution_reader.rs index 487ed951..05ee6e4a 100644 --- a/crates/paimon/src/table/data_evolution_reader.rs +++ b/crates/paimon/src/table/data_evolution_reader.rs @@ -1354,7 +1354,7 @@ mod tests { ); let table = Table::new( file_io, - Identifier::new("default", "blob_t"), + Identifier::new("default", "blob_t").unwrap(), table_path, table_schema, None, @@ -1447,7 +1447,7 @@ mod tests { ); let table = Table::new( file_io, - Identifier::new("default", "blob_multi_t"), + Identifier::new("default", "blob_multi_t").unwrap(), table_path, table_schema, None, diff --git a/crates/paimon/src/table/data_evolution_writer.rs b/crates/paimon/src/table/data_evolution_writer.rs index fe9e2348..6df4259d 100644 --- a/crates/paimon/src/table/data_evolution_writer.rs +++ b/crates/paimon/src/table/data_evolution_writer.rs @@ -710,7 +710,7 @@ mod tests { fn test_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_de_table"), + Identifier::new("default", "test_de_table").unwrap(), table_path.to_string(), test_data_evolution_schema(), None, @@ -1055,7 +1055,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); let table = Table::new( file_io, - Identifier::new("default", "test"), + Identifier::new("default", "test").unwrap(), "memory:/test".to_string(), table_schema, None, diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index 3e7684d1..4dc076bc 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -335,7 +335,7 @@ mod tests { ); Table::new( file_io, - Identifier::new("default", "t"), + Identifier::new("default", "t").unwrap(), "/tmp/test-read-builder".to_string(), table_schema, None, @@ -357,7 +357,7 @@ mod tests { ); Table::new( file_io, - Identifier::new("default", "partial_update_dv_t"), + Identifier::new("default", "partial_update_dv_t").unwrap(), "/tmp/test-partial-update-dv-read-builder".to_string(), table_schema, None, @@ -414,7 +414,7 @@ mod tests { ); let table = Table::new( file_io, - Identifier::new("default", "t"), + Identifier::new("default", "t").unwrap(), table_path, table_schema, None, @@ -473,7 +473,7 @@ mod tests { ); let table = Table::new( file_io, - Identifier::new("default", "t"), + Identifier::new("default", "t").unwrap(), table_path, table_schema, None, @@ -530,7 +530,7 @@ mod tests { ); let table = Table::new( file_io, - Identifier::new("default", "t"), + Identifier::new("default", "t").unwrap(), table_path, table_schema, None, @@ -590,7 +590,7 @@ mod tests { ); let table = Table::new( file_io, - Identifier::new("default", "t"), + Identifier::new("default", "t").unwrap(), table_path, table_schema, None, @@ -641,7 +641,7 @@ mod tests { ); let table = Table::new( file_io, - Identifier::new("default", "t"), + Identifier::new("default", "t").unwrap(), "/tmp/test".to_string(), table_schema, None, @@ -697,7 +697,7 @@ mod tests { ); let table = Table::new( file_io, - Identifier::new("default", "t"), + Identifier::new("default", "t").unwrap(), "/tmp/test".to_string(), table_schema, None, diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs index 0de494ea..e08da645 100644 --- a/crates/paimon/src/table/table_commit.rs +++ b/crates/paimon/src/table/table_commit.rs @@ -1131,7 +1131,7 @@ mod tests { fn test_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_table"), + Identifier::new("default", "test_table").unwrap(), table_path.to_string(), test_schema(), None, @@ -1141,7 +1141,7 @@ mod tests { fn test_partitioned_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_table"), + Identifier::new("default", "test_table").unwrap(), table_path.to_string(), test_partitioned_schema(), None, @@ -1436,7 +1436,7 @@ mod tests { fn test_row_tracking_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_table"), + Identifier::new("default", "test_table").unwrap(), table_path.to_string(), test_row_tracking_schema(), None, diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index be0d2df4..ce1bb5f1 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -913,7 +913,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); Table::new( file_io, - Identifier::new("test_db", "test_table"), + Identifier::new("test_db", "test_table").unwrap(), "/tmp/test-table".to_string(), table_schema, None, diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs index 49c00198..41e436ba 100644 --- a/crates/paimon/src/table/table_write.rs +++ b/crates/paimon/src/table/table_write.rs @@ -737,7 +737,7 @@ mod tests { fn test_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_table"), + Identifier::new("default", "test_table").unwrap(), table_path.to_string(), test_schema(), None, @@ -747,7 +747,7 @@ mod tests { fn test_partitioned_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_table"), + Identifier::new("default", "test_table").unwrap(), table_path.to_string(), test_partitioned_schema(), None, @@ -837,7 +837,7 @@ mod tests { fn test_allows_append_blob_table() { let table = Table::new( test_file_io(), - Identifier::new("default", "test_blob_table"), + Identifier::new("default", "test_blob_table").unwrap(), "memory:/test_blob_table".to_string(), test_blob_table_schema(), None, @@ -854,7 +854,7 @@ mod tests { let table = Table::new( file_io.clone(), - Identifier::new("default", "test_blob_table"), + Identifier::new("default", "test_blob_table").unwrap(), table_path.to_string(), test_blob_table_schema(), None, @@ -916,7 +916,7 @@ mod tests { fn test_allows_partial_update_fixed_bucket_table() { let table = Table::new( test_file_io(), - Identifier::new("default", "test_partial_update_table"), + Identifier::new("default", "test_partial_update_table").unwrap(), "memory:/test_partial_update_table".to_string(), TableSchema::new( 0, @@ -943,7 +943,7 @@ mod tests { let table = Table::new( file_io, - Identifier::new("default", "test_partial_update_dynamic_bucket_table"), + Identifier::new("default", "test_partial_update_dynamic_bucket_table").unwrap(), table_path.to_string(), TableSchema::new( 0, @@ -973,7 +973,7 @@ mod tests { let table = Table::new( file_io, - Identifier::new("default", "test_partial_update_dv_table"), + Identifier::new("default", "test_partial_update_dv_table").unwrap(), table_path.to_string(), TableSchema::new( 0, @@ -1120,7 +1120,7 @@ mod tests { fn test_bucketed_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_table"), + Identifier::new("default", "test_table").unwrap(), table_path.to_string(), test_bucketed_schema(), None, @@ -1265,7 +1265,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); let table = Table::new( file_io.clone(), - Identifier::new("default", "test_table"), + Identifier::new("default", "test_table").unwrap(), table_path.to_string(), table_schema, None, @@ -1344,7 +1344,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); let table = Table::new( file_io.clone(), - Identifier::new("default", "test_table"), + Identifier::new("default", "test_table").unwrap(), table_path.to_string(), table_schema, None, @@ -1389,7 +1389,7 @@ mod tests { fn test_pk_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_pk_table"), + Identifier::new("default", "test_pk_table").unwrap(), table_path.to_string(), test_pk_schema(), None, @@ -1588,7 +1588,7 @@ mod tests { fn test_postpone_pk_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_postpone_table"), + Identifier::new("default", "test_postpone_table").unwrap(), table_path.to_string(), test_postpone_pk_schema(), None, @@ -1611,7 +1611,7 @@ mod tests { fn test_postpone_partitioned_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_postpone_table"), + Identifier::new("default", "test_postpone_table").unwrap(), table_path.to_string(), test_postpone_partitioned_schema(), None, @@ -1860,7 +1860,7 @@ mod tests { fn test_cross_partition_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_cross_partition"), + Identifier::new("default", "test_cross_partition").unwrap(), table_path.to_string(), test_cross_partition_schema(), None, @@ -1891,7 +1891,7 @@ mod tests { .unwrap(); let table2 = Table::new( file_io.clone(), - Identifier::new("default", "test"), + Identifier::new("default", "test").unwrap(), table_path.to_string(), TableSchema::new(0, &schema), None, @@ -1918,7 +1918,7 @@ mod tests { .unwrap(); let table = Table::new( file_io, - Identifier::new("default", "test_cross_partial_update"), + Identifier::new("default", "test_cross_partial_update").unwrap(), table_path.to_string(), TableSchema::new(0, &schema), None, diff --git a/crates/paimon/tests/rest_api_test.rs b/crates/paimon/tests/rest_api_test.rs index 74784b34..0130d0bd 100644 --- a/crates/paimon/tests/rest_api_test.rs +++ b/crates/paimon/tests/rest_api_test.rs @@ -254,7 +254,7 @@ async fn test_list_tables_and_get_table() { // Get table let table_resp = ctx .api - .get_table(&Identifier::new("default", "table1")) + .get_table(&Identifier::new("default", "table1").unwrap()) .await .unwrap(); assert_eq!(table_resp.id.unwrap_or_default(), "table1"); @@ -266,7 +266,7 @@ async fn test_get_table_not_found() { let result = ctx .api - .get_table(&Identifier::new("default", "non_existent_table")) + .get_table(&Identifier::new("default", "non_existent_table").unwrap()) .await; assert!(result.is_err(), "getting non-existent table should fail"); } @@ -320,7 +320,7 @@ async fn test_create_table() { let result = ctx .api - .create_table(&Identifier::new("default", "new_table"), schema) + .create_table(&Identifier::new("default", "new_table").unwrap(), schema) .await; assert!(result.is_ok(), "failed to create table: {result:?}"); @@ -331,7 +331,7 @@ async fn test_create_table() { // Get the table let table_resp = ctx .api - .get_table(&Identifier::new("default", "new_table")) + .get_table(&Identifier::new("default", "new_table").unwrap()) .await .unwrap(); assert_eq!(table_resp.name, Some("new_table".to_string())); @@ -351,7 +351,7 @@ async fn test_drop_table() { // Drop table let result = ctx .api - .drop_table(&Identifier::new("default", "table_to_drop")) + .drop_table(&Identifier::new("default", "table_to_drop").unwrap()) .await; assert!(result.is_ok(), "failed to drop table: {result:?}"); @@ -362,7 +362,7 @@ async fn test_drop_table() { // Dropping non-existent table should fail let result = ctx .api - .drop_table(&Identifier::new("default", "table_to_drop")) + .drop_table(&Identifier::new("default", "table_to_drop").unwrap()) .await; assert!(result.is_err(), "dropping non-existent table should fail"); } @@ -375,7 +375,7 @@ async fn test_drop_table_no_permission() { let result = ctx .api - .drop_table(&Identifier::new("default", "secret_table")) + .drop_table(&Identifier::new("default", "secret_table").unwrap()) .await; assert!(result.is_err(), "dropping no-permission table should fail"); } @@ -393,8 +393,8 @@ async fn test_rename_table() { let result = ctx .api .rename_table( - &Identifier::new("default", "old_table"), - &Identifier::new("default", "new_table"), + &Identifier::new("default", "old_table").unwrap(), + &Identifier::new("default", "new_table").unwrap(), ) .await; assert!(result.is_ok(), "failed to rename table: {result:?}"); @@ -409,7 +409,7 @@ async fn test_rename_table() { // Get the renamed table let table_resp = ctx .api - .get_table(&Identifier::new("default", "new_table")) + .get_table(&Identifier::new("default", "new_table").unwrap()) .await .unwrap(); assert_eq!(table_resp.name, Some("new_table".to_string())); diff --git a/crates/paimon/tests/rest_catalog_test.rs b/crates/paimon/tests/rest_catalog_test.rs index 2166b44d..da4f538b 100644 --- a/crates/paimon/tests/rest_catalog_test.rs +++ b/crates/paimon/tests/rest_catalog_test.rs @@ -249,7 +249,7 @@ async fn test_catalog_get_table() { "file:///tmp/test_warehouse/default.db/my_table", ); - let identifier = Identifier::new("default", "my_table"); + let identifier = Identifier::new("default", "my_table").unwrap(); let table = ctx.catalog.get_table(&identifier).await; assert!(table.is_ok(), "failed to get table: {table:?}"); } @@ -258,7 +258,7 @@ async fn test_catalog_get_table() { async fn test_catalog_get_table_not_found() { let ctx = setup_catalog(vec!["default"]).await; - let identifier = Identifier::new("default", "non_existent"); + let identifier = Identifier::new("default", "non_existent").unwrap(); let result = ctx.catalog.get_table(&identifier).await; assert!(result.is_err(), "getting non-existent table should fail"); } @@ -304,7 +304,7 @@ async fn test_catalog_get_table_propagates_oss_options_in_else_branch() { "oss://test-bucket/warehouse/default.db/oss_table", ); - let identifier = Identifier::new("default", "oss_table"); + let identifier = Identifier::new("default", "oss_table").unwrap(); let result = catalog.get_table(&identifier).await; assert!( result.is_ok(), @@ -318,7 +318,7 @@ async fn test_catalog_create_table() { let ctx = setup_catalog(vec!["default"]).await; let schema = test_schema(); - let identifier = Identifier::new("default", "new_table"); + let identifier = Identifier::new("default", "new_table").unwrap(); let result = ctx.catalog.create_table(&identifier, schema, false).await; assert!(result.is_ok(), "failed to create table: {result:?}"); @@ -336,7 +336,7 @@ async fn test_catalog_create_table_already_exists() { ctx.server.add_table("default", "existing_table"); let schema = test_schema(); - let identifier = Identifier::new("default", "existing_table"); + let identifier = Identifier::new("default", "existing_table").unwrap(); // Create with ignore_if_exists=false should fail let result = ctx.catalog.create_table(&identifier, schema, false).await; @@ -354,7 +354,7 @@ async fn test_catalog_create_table_ignore_if_exists() { ctx.server.add_table("default", "existing_table"); let schema = test_schema(); - let identifier = Identifier::new("default", "existing_table"); + let identifier = Identifier::new("default", "existing_table").unwrap(); // Create with ignore_if_exists=true should succeed let result = ctx.catalog.create_table(&identifier, schema, true).await; @@ -371,7 +371,7 @@ async fn test_catalog_drop_table() { // Add a table ctx.server.add_table("default", "table_to_drop"); - let identifier = Identifier::new("default", "table_to_drop"); + let identifier = Identifier::new("default", "table_to_drop").unwrap(); // Drop table let result = ctx.catalog.drop_table(&identifier, false).await; @@ -386,7 +386,7 @@ async fn test_catalog_drop_table() { async fn test_catalog_drop_table_not_found() { let ctx = setup_catalog(vec!["default"]).await; - let identifier = Identifier::new("default", "non_existent"); + let identifier = Identifier::new("default", "non_existent").unwrap(); // Drop with ignore_if_not_exists=false should fail let result = ctx.catalog.drop_table(&identifier, false).await; @@ -400,7 +400,7 @@ async fn test_catalog_drop_table_not_found() { async fn test_catalog_drop_table_ignore_if_not_exists() { let ctx = setup_catalog(vec!["default"]).await; - let identifier = Identifier::new("default", "non_existent"); + let identifier = Identifier::new("default", "non_existent").unwrap(); // Drop with ignore_if_not_exists=true should succeed let result = ctx.catalog.drop_table(&identifier, true).await; @@ -419,8 +419,8 @@ async fn test_catalog_rename_table() { // Add a table ctx.server.add_table("default", "old_table"); - let from = Identifier::new("default", "old_table"); - let to = Identifier::new("default", "new_table"); + let from = Identifier::new("default", "old_table").unwrap(); + let to = Identifier::new("default", "new_table").unwrap(); // Rename table let result = ctx.catalog.rename_table(&from, &to, false).await; @@ -436,8 +436,8 @@ async fn test_catalog_rename_table() { async fn test_catalog_rename_table_not_found() { let ctx = setup_catalog(vec!["default"]).await; - let from = Identifier::new("default", "non_existent"); - let to = Identifier::new("default", "new_name"); + let from = Identifier::new("default", "non_existent").unwrap(); + let to = Identifier::new("default", "new_name").unwrap(); // Rename with ignore_if_not_exists=false should fail let result = ctx.catalog.rename_table(&from, &to, false).await; @@ -451,8 +451,8 @@ async fn test_catalog_rename_table_not_found() { async fn test_catalog_rename_table_ignore_if_not_exists() { let ctx = setup_catalog(vec!["default"]).await; - let from = Identifier::new("default", "non_existent"); - let to = Identifier::new("default", "new_name"); + let from = Identifier::new("default", "non_existent").unwrap(); + let to = Identifier::new("default", "new_name").unwrap(); // Rename with ignore_if_not_exists=true should succeed let result = ctx.catalog.rename_table(&from, &to, true).await; @@ -468,7 +468,7 @@ async fn test_catalog_rename_table_ignore_if_not_exists() { async fn test_catalog_alter_table_unsupported() { let ctx = setup_catalog(vec!["default"]).await; - let identifier = Identifier::new("default", "some_table"); + let identifier = Identifier::new("default", "some_table").unwrap(); // alter_table should return Unsupported error let result = ctx.catalog.alter_table(&identifier, vec![], false).await; From 199b46fdc252ec047f3ef27928ef5f250b996194 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Wed, 20 May 2026 12:16:03 +0200 Subject: [PATCH 4/5] fix: unwrap input changelog test identifiers --- crates/paimon/src/table/table_write.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs index 30a5c8b2..41117c65 100644 --- a/crates/paimon/src/table/table_write.rs +++ b/crates/paimon/src/table/table_write.rs @@ -1585,7 +1585,7 @@ mod tests { let table = Table::new( file_io.clone(), - Identifier::new("default", "test_input_changelog"), + Identifier::new("default", "test_input_changelog").unwrap(), table_path.to_string(), pk_changelog_schema(&[ ("changelog-producer", "input"), @@ -1643,7 +1643,7 @@ mod tests { let table = Table::new( file_io, - Identifier::new("default", "test_input_changelog"), + Identifier::new("default", "test_input_changelog").unwrap(), table_path.to_string(), pk_changelog_schema(&[("changelog-producer", "input")]), None, @@ -1672,7 +1672,7 @@ mod tests { let table = Table::new( file_io, - Identifier::new("default", "test_input_changelog"), + Identifier::new("default", "test_input_changelog").unwrap(), table_path.to_string(), pk_changelog_schema(&[("changelog-producer", "input")]), None, @@ -1698,7 +1698,7 @@ mod tests { let table = Table::new( file_io.clone(), - Identifier::new("default", "test_input_changelog"), + Identifier::new("default", "test_input_changelog").unwrap(), table_path.to_string(), pk_changelog_schema(&[("changelog-producer", "input")]), None, @@ -1766,7 +1766,7 @@ mod tests { let table = Table::new( file_io.clone(), - Identifier::new("default", "test_input_changelog_dynamic_bucket"), + Identifier::new("default", "test_input_changelog_dynamic_bucket").unwrap(), table_path.to_string(), ordinary_dynamic_pk_changelog_schema(), None, @@ -1846,7 +1846,7 @@ mod tests { let table = Table::new( file_io, - Identifier::new("default", "test_input_changelog"), + Identifier::new("default", "test_input_changelog").unwrap(), table_path.to_string(), pk_changelog_schema(&[("changelog-producer", "input")]), None, From 16b9a29299d1a9bd5abac42637a9a5a553bba0fd Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Wed, 20 May 2026 13:56:54 +0200 Subject: [PATCH 5/5] fix: keep identifier constructor infallible --- bindings/c/src/identifier.rs | 49 +--------------- .../integration_tests/tests/append_tables.rs | 2 +- crates/integration_tests/tests/read_tables.rs | 2 +- crates/integrations/datafusion/src/catalog.rs | 16 ++---- .../datafusion/src/filter_pushdown.rs | 2 +- .../integrations/datafusion/src/merge_into.rs | 6 +- .../datafusion/src/physical_plan/scan.rs | 4 +- .../integrations/datafusion/src/procedures.rs | 4 +- .../datafusion/src/sql_context.rs | 53 ++++++++++-------- .../datafusion/src/system_tables/mod.rs | 2 +- .../integrations/datafusion/src/table/mod.rs | 8 +-- .../datafusion/src/table_function_args.rs | 8 +-- crates/integrations/datafusion/src/update.rs | 4 +- .../tests/cross_partition_tables.rs | 8 +-- .../datafusion/tests/dynamic_bucket_tables.rs | 16 +++--- .../datafusion/tests/merge_into_tests.rs | 4 +- .../datafusion/tests/pk_tables.rs | 12 ++-- .../datafusion/tests/read_tables.rs | 4 +- .../datafusion/tests/sql_context_tests.rs | 22 +++----- .../datafusion/tests/system_tables.rs | 14 ++--- .../paimon/examples/rest_catalog_example.rs | 6 +- crates/paimon/src/api/api_request.rs | 4 +- crates/paimon/src/catalog/filesystem.rs | 56 ++++++++----------- crates/paimon/src/catalog/mod.rs | 28 +++++----- crates/paimon/src/table/cow_writer.rs | 8 +-- .../paimon/src/table/data_evolution_reader.rs | 4 +- .../paimon/src/table/data_evolution_writer.rs | 4 +- crates/paimon/src/table/read_builder.rs | 16 +++--- crates/paimon/src/table/table_commit.rs | 6 +- crates/paimon/src/table/table_scan.rs | 2 +- crates/paimon/src/table/table_write.rs | 44 +++++++-------- crates/paimon/tests/rest_api_test.rs | 20 +++---- crates/paimon/tests/rest_catalog_test.rs | 32 +++++------ 33 files changed, 204 insertions(+), 266 deletions(-) diff --git a/bindings/c/src/identifier.rs b/bindings/c/src/identifier.rs index 9aa36e25..1ccb36dd 100644 --- a/bindings/c/src/identifier.rs +++ b/bindings/c/src/identifier.rs @@ -19,7 +19,7 @@ use std::ffi::{c_char, c_void}; use paimon::catalog::Identifier; -use crate::error::{paimon_error, validate_cstr}; +use crate::error::validate_cstr; use crate::result::paimon_result_identifier_new; use crate::types::paimon_identifier; @@ -50,17 +50,8 @@ pub unsafe extern "C" fn paimon_identifier_new( } } }; - let identifier = match Identifier::new(db, obj) { - Ok(identifier) => identifier, - Err(e) => { - return paimon_result_identifier_new { - identifier: std::ptr::null_mut(), - error: paimon_error::from_paimon(e), - } - } - }; let wrapper = Box::new(paimon_identifier { - inner: Box::into_raw(Box::new(identifier)) as *mut c_void, + inner: Box::into_raw(Box::new(Identifier::new(db, obj))) as *mut c_void, }); paimon_result_identifier_new { identifier: Box::into_raw(wrapper), @@ -81,39 +72,3 @@ pub unsafe extern "C" fn paimon_identifier_free(id: *mut paimon_identifier) { } } } - -#[cfg(test)] -mod tests { - use std::ffi::CString; - - use super::*; - use crate::error::paimon_error_free; - - #[test] - fn test_paimon_identifier_new_should_reject_path_control_names() { - let database = CString::new("db").unwrap(); - let object = CString::new("../escaped").unwrap(); - - let result = unsafe { paimon_identifier_new(database.as_ptr(), object.as_ptr()) }; - - assert!(result.identifier.is_null()); - assert!(!result.error.is_null()); - unsafe { - paimon_error_free(result.error); - } - } - - #[test] - fn test_paimon_identifier_new_should_allow_safe_names() { - let database = CString::new("db").unwrap(); - let object = CString::new("table$snapshots").unwrap(); - - let result = unsafe { paimon_identifier_new(database.as_ptr(), object.as_ptr()) }; - - assert!(result.error.is_null()); - assert!(!result.identifier.is_null()); - unsafe { - paimon_identifier_free(result.identifier); - } - } -} diff --git a/crates/integration_tests/tests/append_tables.rs b/crates/integration_tests/tests/append_tables.rs index 5df43d14..981c44d9 100644 --- a/crates/integration_tests/tests/append_tables.rs +++ b/crates/integration_tests/tests/append_tables.rs @@ -51,7 +51,7 @@ async fn setup_dirs(file_io: &paimon::io::FileIO, table_path: &str) { fn make_table(file_io: &paimon::io::FileIO, table_path: &str, schema: TableSchema) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test").unwrap(), + Identifier::new("default", "test"), table_path.to_string(), schema, None, diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs index 3f0a043a..b89268c1 100644 --- a/crates/integration_tests/tests/read_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -71,7 +71,7 @@ async fn get_table_from_catalog( catalog: &C, table_name: &str, ) -> paimon::Table { - let identifier = Identifier::new("default", table_name).unwrap(); + let identifier = Identifier::new("default", table_name); catalog .get_table(&identifier) .await diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index 3bc748c0..a93201aa 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -214,7 +214,7 @@ impl PaimonCatalogProvider { let catalog = Arc::clone(&self.catalog); let db = database.to_string(); let tbl = table_name.to_string(); - let identifier = Identifier::new(db, tbl).map_err(to_datafusion_error)?; + let identifier = Identifier::new(db, tbl); if let Ok(true) = block_on_with_runtime( async move { match catalog.get_table(&identifier).await { @@ -372,8 +372,7 @@ impl SchemaProvider for PaimonSchemaProvider { let catalog = Arc::clone(&self.catalog); let dynamic_options = Arc::clone(&self.dynamic_options); - let identifier = - Identifier::new(self.database.clone(), base).map_err(to_datafusion_error)?; + let identifier = Identifier::new(self.database.clone(), base); await_with_runtime(async move { match catalog.get_table(&identifier).await { Ok(table) => { @@ -408,13 +407,7 @@ impl SchemaProvider for PaimonSchemaProvider { } let catalog = Arc::clone(&self.catalog); - let identifier = match Identifier::new(self.database.clone(), base.to_string()) { - Ok(identifier) => identifier, - Err(e) => { - log::error!("invalid table identifier '{}.{}': {e}", self.database, base); - return false; - } - }; + let identifier = Identifier::new(self.database.clone(), base.to_string()); block_on_with_runtime( async move { match catalog.get_table(&identifier).await { @@ -442,8 +435,7 @@ impl SchemaProvider for PaimonSchemaProvider { fn deregister_table(&self, name: &str) -> DFResult>> { let catalog = Arc::clone(&self.catalog); - let identifier = - Identifier::new(self.database.clone(), name).map_err(to_datafusion_error)?; + let identifier = Identifier::new(self.database.clone(), name); block_on_with_runtime( async move { // Try to get the table first so we can return it. diff --git a/crates/integrations/datafusion/src/filter_pushdown.rs b/crates/integrations/datafusion/src/filter_pushdown.rs index 0d6af96b..0d418270 100644 --- a/crates/integrations/datafusion/src/filter_pushdown.rs +++ b/crates/integrations/datafusion/src/filter_pushdown.rs @@ -397,7 +397,7 @@ mod tests { ); Table::new( file_io, - Identifier::new("default", "t").unwrap(), + Identifier::new("default", "t"), "/tmp/test-filter-pushdown".to_string(), table_schema, None, diff --git a/crates/integrations/datafusion/src/merge_into.rs b/crates/integrations/datafusion/src/merge_into.rs index 04945837..cd78ddec 100644 --- a/crates/integrations/datafusion/src/merge_into.rs +++ b/crates/integrations/datafusion/src/merge_into.rs @@ -1496,7 +1496,7 @@ mod tests { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", name).unwrap()) + .get_table(&Identifier::new("test_db", name)) .await .unwrap(); let mut extra = std::collections::HashMap::new(); @@ -1645,7 +1645,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); let table = Table::new( file_io, - Identifier::new("default", "t").unwrap(), + Identifier::new("default", "t"), table_path.to_string(), table_schema, None, @@ -1689,7 +1689,7 @@ mod tests { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", name).unwrap()) + .get_table(&Identifier::new("test_db", name)) .await .unwrap(); diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 896d673b..a7387f4a 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -322,7 +322,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); Table::new( file_io, - Identifier::new("test_db", "test_table").unwrap(), + Identifier::new("test_db", "test_table"), "/tmp/test-table".to_string(), table_schema, None, @@ -354,7 +354,7 @@ mod tests { ); let table = Table::new( file_io, - Identifier::new("default", "t").unwrap(), + Identifier::new("default", "t"), table_path, table_schema, None, diff --git a/crates/integrations/datafusion/src/procedures.rs b/crates/integrations/datafusion/src/procedures.rs index a3a8b9a1..183788ab 100644 --- a/crates/integrations/datafusion/src/procedures.rs +++ b/crates/integrations/datafusion/src/procedures.rs @@ -233,7 +233,7 @@ fn require_arg<'a>(args: &'a HashMap, name: &str) -> DFResult<&' fn resolve_table_identifier(table_str: &str, catalog_name: &str) -> DFResult { let parts: Vec<&str> = table_str.split('.').collect(); match parts.len() { - 2 => Identifier::new(parts[0], parts[1]).map_err(to_datafusion_error), + 2 => Ok(Identifier::new(parts[0], parts[1])), 3 => { if parts[0] != catalog_name { return Err(DataFusionError::Plan(format!( @@ -241,7 +241,7 @@ fn resolve_table_identifier(table_str: &str, catalog_name: &str) -> DFResult Err(DataFusionError::Plan(format!( "Invalid table identifier: '{table_str}'. Expected 'database.table' or 'catalog.database.table'" diff --git a/crates/integrations/datafusion/src/sql_context.rs b/crates/integrations/datafusion/src/sql_context.rs index 58665327..f77b79e2 100644 --- a/crates/integrations/datafusion/src/sql_context.rs +++ b/crates/integrations/datafusion/src/sql_context.rs @@ -554,16 +554,20 @@ impl SQLContext { .catalogs .get(catalog.as_ref()) .ok_or_else(|| DataFusionError::Plan(format!("Unknown catalog '{catalog}'")))?; - let identifier = Identifier::new(schema.as_ref(), table.as_ref()) - .map_err(to_datafusion_error)?; - Ok((catalog_arc.clone(), catalog.to_string(), identifier)) + Ok(( + catalog_arc.clone(), + catalog.to_string(), + Identifier::new(schema.as_ref(), table.as_ref()), + )) } datafusion::common::TableReference::Partial { schema, table } => { let catalog = self.current_catalog()?; let catalog_name = self.current_catalog_name(); - let identifier = Identifier::new(schema.as_ref(), table.as_ref()) - .map_err(to_datafusion_error)?; - Ok((catalog, catalog_name, identifier)) + Ok(( + catalog, + catalog_name, + Identifier::new(schema.as_ref(), table.as_ref()), + )) } datafusion::common::TableReference::Bare { table } => { let catalog = self.current_catalog()?; @@ -575,9 +579,11 @@ impl SQLContext { .catalog .default_schema .clone(); - let identifier = - Identifier::new(default_schema, table.as_ref()).map_err(to_datafusion_error)?; - Ok((catalog, catalog_name, identifier)) + Ok(( + catalog, + catalog_name, + Identifier::new(default_schema, table.as_ref()), + )) } } } @@ -840,10 +846,7 @@ impl SQLContext { object_name_to_string(name) } }; - rename_to = Some( - Identifier::new(identifier.database().to_string(), new_name) - .map_err(to_datafusion_error)?, - ); + rename_to = Some(Identifier::new(identifier.database().to_string(), new_name)); } AlterTableOperation::SetTblProperties { table_properties } => { for opt in table_properties { @@ -1253,15 +1256,19 @@ impl SQLContext { let catalog = self.catalogs.get(&parts[0]).ok_or_else(|| { DataFusionError::Plan(format!("Unknown catalog '{}'", parts[0])) })?; - let identifier = Identifier::new(parts[1].clone(), parts[2].clone()) - .map_err(to_datafusion_error)?; - Ok((catalog.clone(), parts[0].clone(), identifier)) + Ok(( + catalog.clone(), + parts[0].clone(), + Identifier::new(parts[1].clone(), parts[2].clone()), + )) } 2 => { let catalog = self.current_catalog()?; - let identifier = Identifier::new(parts[0].clone(), parts[1].clone()) - .map_err(to_datafusion_error)?; - Ok((catalog, self.current_catalog_name(), identifier)) + Ok(( + catalog, + self.current_catalog_name(), + Identifier::new(parts[0].clone(), parts[1].clone()), + )) } 1 => { let catalog = self.current_catalog()?; @@ -1272,9 +1279,11 @@ impl SQLContext { .catalog .default_schema .clone(); - let identifier = Identifier::new(default_schema, parts[0].clone()) - .map_err(to_datafusion_error)?; - Ok((catalog, self.current_catalog_name(), identifier)) + Ok(( + catalog, + self.current_catalog_name(), + Identifier::new(default_schema, parts[0].clone()), + )) } _ => Err(DataFusionError::Plan(format!( "Invalid table reference: {name}" diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs b/crates/integrations/datafusion/src/system_tables/mod.rs index 2ecef4b9..0305d2f8 100644 --- a/crates/integrations/datafusion/src/system_tables/mod.rs +++ b/crates/integrations/datafusion/src/system_tables/mod.rs @@ -136,7 +136,7 @@ pub(crate) async fn load( if !is_registered(&system_name) { return Ok(None); } - let identifier = Identifier::new(database, base.clone()).map_err(to_datafusion_error)?; + let identifier = Identifier::new(database, base.clone()); match catalog.get_table(&identifier).await { Ok(table) => { if system_name.eq_ignore_ascii_case("partitions") { diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index e7f8c31d..5bae7443 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -285,7 +285,7 @@ mod tests { async fn create_provider(table_name: &str) -> PaimonTableProvider { let catalog = create_catalog(); - let identifier = Identifier::new("default", table_name).unwrap(); + let identifier = Identifier::new("default", table_name); let table = catalog .get_table(&identifier) .await @@ -536,7 +536,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); let table = paimon::table::Table::new( file_io, - Identifier::new("default", "test_insert").unwrap(), + Identifier::new("default", "test_insert"), table_path.to_string(), table_schema, None, @@ -616,7 +616,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); let table = paimon::table::Table::new( file_io, - Identifier::new("default", "test_overwrite").unwrap(), + Identifier::new("default", "test_overwrite"), table_path.to_string(), table_schema, None, @@ -704,7 +704,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); let table = paimon::table::Table::new( file_io, - Identifier::new("default", "test_overwrite_unpart").unwrap(), + Identifier::new("default", "test_overwrite_unpart"), table_path.to_string(), table_schema, None, diff --git a/crates/integrations/datafusion/src/table_function_args.rs b/crates/integrations/datafusion/src/table_function_args.rs index d7b750cf..c0a6c833 100644 --- a/crates/integrations/datafusion/src/table_function_args.rs +++ b/crates/integrations/datafusion/src/table_function_args.rs @@ -20,8 +20,6 @@ use datafusion::error::{DataFusionError, Result as DFResult}; use datafusion::logical_expr::Expr; use paimon::catalog::Identifier; -use crate::error::to_datafusion_error; - pub(crate) fn extract_string_literal( function_name: &str, expr: &Expr, @@ -74,9 +72,9 @@ pub(crate) fn parse_table_identifier( ) -> DFResult { let parts: Vec<&str> = name.split('.').collect(); match parts.len() { - 1 => Identifier::new(default_database, parts[0]).map_err(to_datafusion_error), - 2 => Identifier::new(parts[0], parts[1]).map_err(to_datafusion_error), - 3 => Identifier::new(parts[1], parts[2]).map_err(to_datafusion_error), + 1 => Ok(Identifier::new(default_database, parts[0])), + 2 => Ok(Identifier::new(parts[0], parts[1])), + 3 => Ok(Identifier::new(parts[1], parts[2])), _ => Err(DataFusionError::Plan(format!( "{function_name}: invalid table name '{name}', expected 'table', 'database.table', or 'catalog.database.table'" ))), diff --git a/crates/integrations/datafusion/src/update.rs b/crates/integrations/datafusion/src/update.rs index 1e19e08f..8803dcc6 100644 --- a/crates/integrations/datafusion/src/update.rs +++ b/crates/integrations/datafusion/src/update.rs @@ -374,7 +374,7 @@ mod tests { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", name).unwrap()) + .get_table(&Identifier::new("test_db", name)) .await .unwrap(); @@ -564,7 +564,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); let table = Table::new( file_io, - Identifier::new("default", "t").unwrap(), + Identifier::new("default", "t"), table_path.to_string(), table_schema, None, diff --git a/crates/integrations/datafusion/tests/cross_partition_tables.rs b/crates/integrations/datafusion/tests/cross_partition_tables.rs index 0b37a725..13183647 100644 --- a/crates/integrations/datafusion/tests/cross_partition_tables.rs +++ b/crates/integrations/datafusion/tests/cross_partition_tables.rs @@ -290,7 +290,7 @@ async fn test_cross_partition_delete_file_in_old_partition() { // Verify initial state: 2 data files (one per partition) let table = catalog - .get_table(&Identifier::new("test_db", "t_cross_dv").unwrap()) + .get_table(&Identifier::new("test_db", "t_cross_dv")) .await .unwrap(); let plan = table @@ -318,7 +318,7 @@ async fn test_cross_partition_delete_file_in_old_partition() { // Verify via scan_all_files: old partition "a" should have a new file // containing the DELETE record for id=1 (written with _VALUE_KIND=1) let table = catalog - .get_table(&Identifier::new("test_db", "t_cross_dv").unwrap()) + .get_table(&Identifier::new("test_db", "t_cross_dv")) .await .unwrap(); let plan = table @@ -396,7 +396,7 @@ async fn test_cross_partition_delete_file_in_old_partition() { // Verify partition "a" now has another DELETE file for id=2 let table = catalog - .get_table(&Identifier::new("test_db", "t_cross_dv").unwrap()) + .get_table(&Identifier::new("test_db", "t_cross_dv")) .await .unwrap(); let plan = table @@ -486,7 +486,7 @@ async fn test_cross_partition_first_row_skip() { // Use scan_all_files to verify (FIRST_ROW skips level-0 in normal reads) let table = catalog - .get_table(&Identifier::new("test_db", "t_cross_fr").unwrap()) + .get_table(&Identifier::new("test_db", "t_cross_fr")) .await .unwrap(); let rb = table.new_read_builder(); diff --git a/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs b/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs index 21fde4ce..4c8cbcb5 100644 --- a/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs +++ b/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs @@ -275,7 +275,7 @@ async fn test_pk_dynamic_bucket_partial_update_restores_existing_bucket() { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", "t_dyn_partial_route").unwrap()) + .get_table(&Identifier::new("test_db", "t_dyn_partial_route")) .await .unwrap(); assert_eq!( @@ -301,7 +301,7 @@ async fn test_pk_dynamic_bucket_partial_update_restores_existing_bucket() { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", "t_dyn_partial_route").unwrap()) + .get_table(&Identifier::new("test_db", "t_dyn_partial_route")) .await .unwrap(); let id1_bucket_after = bucket_containing_id(&table, 1).await; @@ -705,7 +705,7 @@ async fn test_pk_dynamic_bucket_insert_overwrite() { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", "t_dyn_ow").unwrap()) + .get_table(&Identifier::new("test_db", "t_dyn_ow")) .await .unwrap(); let hashes_before = collect_all_hashes(&table).await; @@ -733,7 +733,7 @@ async fn test_pk_dynamic_bucket_insert_overwrite() { // Verify HASH index: should have exactly 2 entries (not 3+2=5) let table = catalog - .get_table(&Identifier::new("test_db", "t_dyn_ow").unwrap()) + .get_table(&Identifier::new("test_db", "t_dyn_ow")) .await .unwrap(); let hashes_after = collect_all_hashes(&table).await; @@ -788,7 +788,7 @@ async fn test_pk_dynamic_bucket_partitioned_insert_overwrite() { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", "t_dyn_part_ow").unwrap()) + .get_table(&Identifier::new("test_db", "t_dyn_part_ow")) .await .unwrap(); let entries_before = read_hash_index_entries(&table).await; @@ -818,7 +818,7 @@ async fn test_pk_dynamic_bucket_partitioned_insert_overwrite() { // Verify HASH index: partition 'b' entries should survive, // partition 'a' should have exactly 1 entry (not 2+1=3) let table = catalog - .get_table(&Identifier::new("test_db", "t_dyn_part_ow").unwrap()) + .get_table(&Identifier::new("test_db", "t_dyn_part_ow")) .await .unwrap(); let entries_after = read_hash_index_entries(&table).await; @@ -860,7 +860,7 @@ async fn test_read_spark_dynamic_bucket_and_compare_index() { opts.set(CatalogOptions::WAREHOUSE, &warehouse); let spark_catalog = FileSystemCatalog::new(opts).unwrap(); let spark_table = spark_catalog - .get_table(&Identifier::new("default", "dynamic_bucket_pk_table").unwrap()) + .get_table(&Identifier::new("default", "dynamic_bucket_pk_table")) .await .unwrap(); @@ -958,7 +958,7 @@ async fn test_read_spark_dynamic_bucket_and_compare_index() { FileSystemCatalog::new(opts).unwrap() }; let rust_table = rust_catalog - .get_table(&Identifier::new("test_db", "t_dyn_cmp").unwrap()) + .get_table(&Identifier::new("test_db", "t_dyn_cmp")) .await .unwrap(); diff --git a/crates/integrations/datafusion/tests/merge_into_tests.rs b/crates/integrations/datafusion/tests/merge_into_tests.rs index 479bb3c7..0969c09f 100644 --- a/crates/integrations/datafusion/tests/merge_into_tests.rs +++ b/crates/integrations/datafusion/tests/merge_into_tests.rs @@ -457,7 +457,7 @@ async fn test_row_count_after_merge() { // Snapshot 1: 3 rows inserted let table = catalog - .get_table(&Identifier::new("test_db", "target").unwrap()) + .get_table(&Identifier::new("test_db", "target")) .await .unwrap(); let snap_mgr = SnapshotManager::new(table.file_io().clone(), table.location().to_string()); @@ -783,7 +783,7 @@ async fn test_merge_into_row_id_for_inserted_rows() { // Verify next_row_id in snapshot let table = catalog - .get_table(&Identifier::new("test_db", "target").unwrap()) + .get_table(&Identifier::new("test_db", "target")) .await .unwrap(); let snap_mgr = SnapshotManager::new(table.file_io().clone(), table.location().to_string()); diff --git a/crates/integrations/datafusion/tests/pk_tables.rs b/crates/integrations/datafusion/tests/pk_tables.rs index 2c2be725..bf1a317b 100644 --- a/crates/integrations/datafusion/tests/pk_tables.rs +++ b/crates/integrations/datafusion/tests/pk_tables.rs @@ -1686,7 +1686,7 @@ async fn test_pk_first_row_insert_overwrite() { // Verify via scan_all_files: 2 level-0 files (one per partition) let table = catalog - .get_table(&Identifier::new("test_db", "t_fr_ow").unwrap()) + .get_table(&Identifier::new("test_db", "t_fr_ow")) .await .unwrap(); let plan = table @@ -1712,7 +1712,7 @@ async fn test_pk_first_row_insert_overwrite() { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", "t_fr_ow").unwrap()) + .get_table(&Identifier::new("test_db", "t_fr_ow")) .await .unwrap(); let plan = table @@ -1738,7 +1738,7 @@ async fn test_pk_first_row_insert_overwrite() { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", "t_fr_ow").unwrap()) + .get_table(&Identifier::new("test_db", "t_fr_ow")) .await .unwrap(); let plan = table @@ -1788,7 +1788,7 @@ async fn test_postpone_write_invisible_to_select() { // scan_all_files should find the postpone file let table = catalog - .get_table(&Identifier::new("test_db", "t_postpone").unwrap()) + .get_table(&Identifier::new("test_db", "t_postpone")) .await .unwrap(); let plan = table @@ -1836,7 +1836,7 @@ async fn test_postpone_insert_overwrite() { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", "t_postpone_ow").unwrap()) + .get_table(&Identifier::new("test_db", "t_postpone_ow")) .await .unwrap(); let plan = table @@ -1859,7 +1859,7 @@ async fn test_postpone_insert_overwrite() { .unwrap(); let table = catalog - .get_table(&Identifier::new("test_db", "t_postpone_ow").unwrap()) + .get_table(&Identifier::new("test_db", "t_postpone_ow")) .await .unwrap(); let plan = table diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs index 1dfc8eca..396e9514 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -51,7 +51,7 @@ async fn create_context() -> SQLContext { async fn create_provider(table_name: &str) -> PaimonTableProvider { let catalog = create_catalog(); - let identifier = Identifier::new("default", table_name).unwrap(); + let identifier = Identifier::new("default", table_name); let table = catalog .get_table(&identifier) .await @@ -65,7 +65,7 @@ async fn create_provider_with_options( extra_options: HashMap, ) -> PaimonTableProvider { let catalog = create_catalog(); - let identifier = Identifier::new("default", table_name).unwrap(); + let identifier = Identifier::new("default", table_name); let table = catalog .get_table(&identifier) .await diff --git a/crates/integrations/datafusion/tests/sql_context_tests.rs b/crates/integrations/datafusion/tests/sql_context_tests.rs index 73114512..7deed175 100644 --- a/crates/integrations/datafusion/tests/sql_context_tests.rs +++ b/crates/integrations/datafusion/tests/sql_context_tests.rs @@ -148,7 +148,7 @@ async fn test_create_table() { // Verify schema let table = catalog - .get_table(&Identifier::new("mydb", "users").unwrap()) + .get_table(&Identifier::new("mydb", "users")) .await .unwrap(); let schema = table.schema(); @@ -178,7 +178,7 @@ async fn test_create_table_with_blob_type() { .expect("CREATE TABLE with BLOB should succeed"); let table = catalog - .get_table(&Identifier::new("mydb", "assets").unwrap()) + .get_table(&Identifier::new("mydb", "assets")) .await .unwrap(); let schema = table.schema(); @@ -214,7 +214,7 @@ async fn test_create_table_with_partition() { .expect("CREATE TABLE with partition should succeed"); let table = catalog - .get_table(&Identifier::new("mydb", "events").unwrap()) + .get_table(&Identifier::new("mydb", "events")) .await .unwrap(); let schema = table.schema(); @@ -333,7 +333,7 @@ async fn test_create_table_with_array_and_map() { .expect("CREATE TABLE with ARRAY and MAP should succeed"); let table = catalog - .get_table(&Identifier::new("mydb", "complex_types").unwrap()) + .get_table(&Identifier::new("mydb", "complex_types")) .await .unwrap(); let schema = table.schema(); @@ -386,7 +386,7 @@ async fn test_create_table_with_row_type() { .expect("CREATE TABLE with STRUCT should succeed"); let table = catalog - .get_table(&Identifier::new("mydb", "row_table").unwrap()) + .get_table(&Identifier::new("mydb", "row_table")) .await .unwrap(); let schema = table.schema(); @@ -427,7 +427,7 @@ async fn test_drop_table() { .build() .unwrap(); catalog - .create_table(&Identifier::new("mydb", "to_drop").unwrap(), schema, false) + .create_table(&Identifier::new("mydb", "to_drop"), schema, false) .await .unwrap(); @@ -476,11 +476,7 @@ async fn test_alter_table_add_column() { .build() .unwrap(); catalog - .create_table( - &Identifier::new("mydb", "alter_test").unwrap(), - schema, - false, - ) + .create_table(&Identifier::new("mydb", "alter_test"), schema, false) .await .unwrap(); @@ -521,7 +517,7 @@ async fn test_alter_table_rename() { .build() .unwrap(); catalog - .create_table(&Identifier::new("mydb", "old_name").unwrap(), schema, false) + .create_table(&Identifier::new("mydb", "old_name"), schema, false) .await .unwrap(); @@ -559,7 +555,7 @@ async fn test_ddl_context_delegates_select() { .build() .unwrap(); catalog - .create_table(&Identifier::new("mydb", "t1").unwrap(), schema, false) + .create_table(&Identifier::new("mydb", "t1"), schema, false) .await .unwrap(); diff --git a/crates/integrations/datafusion/tests/system_tables.rs b/crates/integrations/datafusion/tests/system_tables.rs index 86479e44..ad4805d0 100644 --- a/crates/integrations/datafusion/tests/system_tables.rs +++ b/crates/integrations/datafusion/tests/system_tables.rs @@ -101,7 +101,7 @@ async fn test_options_system_table() { } actual.sort(); - let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()).unwrap(); + let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()); let table = catalog .get_table(&identifier) .await @@ -159,7 +159,7 @@ async fn test_table_indexes_system_table() { assert_eq!(field.data_type(), dtype, "column {i} type"); } - let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()).unwrap(); + let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()); let table = catalog .get_table(&identifier) .await @@ -329,7 +329,7 @@ async fn test_schemas_system_table() { assert_eq!(field.data_type(), dtype, "column {i} type"); } - let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()).unwrap(); + let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()); let table = catalog .get_table(&identifier) .await @@ -456,7 +456,7 @@ async fn test_snapshots_system_table() { } // Row count must match the snapshot directory listing. - let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()).unwrap(); + let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()); let table = catalog .get_table(&identifier) .await @@ -595,7 +595,7 @@ async fn test_tags_system_table_empty_when_no_tag_dir() { async fn test_tags_system_table_with_seeded_tags() { let (ctx, catalog, tmp) = create_context().await; - let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()).unwrap(); + let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()); let table = catalog.get_table(&identifier).await.unwrap(); let sm = paimon::table::SnapshotManager::new(table.file_io().clone(), table.location().to_string()); @@ -685,7 +685,7 @@ async fn test_manifests_system_table() { } } - let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()).unwrap(); + let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()); let table = catalog.get_table(&identifier).await.unwrap(); let sm = paimon::table::SnapshotManager::new(table.file_io().clone(), table.location().to_string()); @@ -831,7 +831,7 @@ async fn test_files_system_table() { assert_eq!(field.data_type(), dtype, "column {i} type"); } - let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()).unwrap(); + let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()); let table = catalog.get_table(&identifier).await.unwrap(); let plan = table .new_read_builder() diff --git a/crates/paimon/examples/rest_catalog_example.rs b/crates/paimon/examples/rest_catalog_example.rs index b55d4512..cca3cbdb 100644 --- a/crates/paimon/examples/rest_catalog_example.rs +++ b/crates/paimon/examples/rest_catalog_example.rs @@ -146,7 +146,7 @@ async fn main() { println!("\n=== Part 2: Table Operations ===\n"); // Create table - let table_identifier = Identifier::new("example_db", "users").unwrap(); + let table_identifier = Identifier::new("example_db", "users"); println!("Creating table '{table_identifier}'..."); let schema = create_test_schema(); match catalog.create_table(&table_identifier, schema, false).await { @@ -176,7 +176,7 @@ async fn main() { } // Rename table - let renamed_identifier = Identifier::new("example_db", "users_renamed").unwrap(); + let renamed_identifier = Identifier::new("example_db", "users_renamed"); println!("\nRenaming table '{table_identifier}' to '{renamed_identifier}'..."); match catalog .rename_table(&table_identifier, &renamed_identifier, false) @@ -191,7 +191,7 @@ async fn main() { // Try to read from an existing table (example_db.users_renamed) // This table must already exist on the REST catalog server - let read_table_identifier = Identifier::new("example_db", "users_renamed").unwrap(); + let read_table_identifier = Identifier::new("example_db", "users_renamed"); println!("Attempting to read from table '{read_table_identifier}'..."); match catalog.get_table(&read_table_identifier).await { diff --git a/crates/paimon/src/api/api_request.rs b/crates/paimon/src/api/api_request.rs index e19ea2b6..33a52fd7 100644 --- a/crates/paimon/src/api/api_request.rs +++ b/crates/paimon/src/api/api_request.rs @@ -123,8 +123,8 @@ mod tests { #[test] fn test_rename_table_request_serialization() { - let source = Identifier::new("db1".to_string(), "table1".to_string()).unwrap(); - let destination = Identifier::new("db2".to_string(), "table2".to_string()).unwrap(); + let source = Identifier::new("db1".to_string(), "table1".to_string()); + let destination = Identifier::new("db2".to_string(), "table2".to_string()); let req = RenameTableRequest::new(source, destination); let json = serde_json::to_string(&req).unwrap(); diff --git a/crates/paimon/src/catalog/filesystem.rs b/crates/paimon/src/catalog/filesystem.rs index cafc581d..7f18f952 100644 --- a/crates/paimon/src/catalog/filesystem.rs +++ b/crates/paimon/src/catalog/filesystem.rs @@ -470,14 +470,6 @@ mod tests { .unwrap() } - fn deserialize_identifier(database: &str, object: &str) -> Identifier { - serde_json::from_value(serde_json::json!({ - "database": database, - "object": object, - })) - .unwrap() - } - #[tokio::test] async fn test_database_operations() { let (_temp_dir, catalog) = create_test_catalog(); @@ -526,11 +518,7 @@ mod tests { .await .unwrap(); catalog - .create_table( - &Identifier::new("db1", "table1").unwrap(), - testing_schema(), - false, - ) + .create_table(&Identifier::new("db1", "table1"), testing_schema(), false) .await .unwrap(); let result = catalog.drop_database("db1", false, false).await; @@ -578,15 +566,11 @@ mod tests { // create and list tables let schema = testing_schema(); catalog - .create_table( - &Identifier::new("db1", "table1").unwrap(), - schema.clone(), - false, - ) + .create_table(&Identifier::new("db1", "table1"), schema.clone(), false) .await .unwrap(); catalog - .create_table(&Identifier::new("db1", "table2").unwrap(), schema, false) + .create_table(&Identifier::new("db1", "table2"), schema, false) .await .unwrap(); let tables = catalog.list_tables("db1").await.unwrap(); @@ -607,15 +591,11 @@ mod tests { .build() .unwrap(); catalog - .create_table( - &Identifier::new("db1", "table3").unwrap(), - schema_with_name, - false, - ) + .create_table(&Identifier::new("db1", "table3"), schema_with_name, false) .await .unwrap(); let table = catalog - .get_table(&Identifier::new("db1", "table3").unwrap()) + .get_table(&Identifier::new("db1", "table3")) .await .unwrap(); let table_schema = table.schema(); @@ -624,7 +604,7 @@ mod tests { // drop table catalog - .drop_table(&Identifier::new("db1", "table1").unwrap(), false) + .drop_table(&Identifier::new("db1", "table1"), false) .await .unwrap(); let tables = catalog.list_tables("db1").await.unwrap(); @@ -641,9 +621,12 @@ mod tests { .unwrap(); let escaped_path = temp_dir.path().join("table_escape"); - let identifier = deserialize_identifier("db1", "../../table_escape"); let result = catalog - .create_table(&identifier, testing_schema(), false) + .create_table( + &Identifier::new("db1", "../../table_escape"), + testing_schema(), + false, + ) .await; assert!(matches!(result, Err(Error::IdentifierInvalid { .. }))); @@ -657,15 +640,20 @@ mod tests { .create_database("db1", false, HashMap::new()) .await .unwrap(); - let source = Identifier::new("db1", "source").unwrap(); + let source = Identifier::new("db1", "source"); catalog .create_table(&source, testing_schema(), false) .await .unwrap(); let escaped_path = temp_dir.path().join("renamed_escape"); - let target = deserialize_identifier("db1", "../../renamed_escape"); - let result = catalog.rename_table(&source, &target, false).await; + let result = catalog + .rename_table( + &source, + &Identifier::new("db1", "../../renamed_escape"), + false, + ) + .await; assert!(matches!(result, Err(Error::IdentifierInvalid { .. }))); assert!(!escaped_path.exists()); @@ -675,7 +663,7 @@ mod tests { #[tokio::test] async fn test_list_partitions_default_table_not_found_errors() { let (_temp_dir, catalog) = create_test_catalog(); - let id = Identifier::new("nope_db", "nope_table").unwrap(); + let id = Identifier::new("nope_db", "nope_table"); let result = catalog.list_partitions(&id).await; assert!( matches!( @@ -693,7 +681,7 @@ mod tests { .create_database("db1", false, HashMap::new()) .await .unwrap(); - let id = Identifier::new("db1", "t1").unwrap(); + let id = Identifier::new("db1", "t1"); catalog .create_table(&id, testing_schema(), false) .await @@ -714,7 +702,7 @@ mod tests { .create_database("db1", false, HashMap::new()) .await .unwrap(); - let id = Identifier::new("db1", "t1").unwrap(); + let id = Identifier::new("db1", "t1"); catalog .create_table(&id, testing_schema(), false) .await diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs index fc12576d..166d59a4 100644 --- a/crates/paimon/src/catalog/mod.rs +++ b/crates/paimon/src/catalog/mod.rs @@ -69,29 +69,27 @@ pub struct Identifier { } impl Identifier { - /// Create an identifier from database and object name, validating both names. - pub fn new(database: impl Into, object: impl Into) -> Result { - let identifier = Self { + /// Create an identifier from database and object name. + pub fn new(database: impl Into, object: impl Into) -> Self { + Self { database: database.into(), object: object.into(), - }; - identifier.validate()?; - Ok(identifier) + } } /// Validate this identifier's database and object names. - pub fn validate(&self) -> Result<()> { + pub(crate) fn validate(&self) -> Result<()> { Self::validate_database_name(&self.database)?; Self::validate_object_name(&self.object) } /// Validate a database name for path-safe catalog use. - pub fn validate_database_name(name: &str) -> Result<()> { + pub(crate) fn validate_database_name(name: &str) -> Result<()> { validate_identifier_name("database", name) } /// Validate an object name for path-safe catalog use. - pub fn validate_object_name(name: &str) -> Result<()> { + pub(crate) fn validate_object_name(name: &str) -> Result<()> { validate_identifier_name("object", name) } @@ -303,7 +301,7 @@ mod tests { use super::*; #[test] - fn test_identifier_new_should_reject_path_control_names() { + fn test_identifier_validate_should_reject_path_control_names() { for (database, object) in [ ("", "table"), (" ", "table"), @@ -321,7 +319,7 @@ mod tests { ("db", "nested\\table"), ("db", "table\0name"), ] { - let result = Identifier::new(database, object); + let result = Identifier::new(database, object).validate(); assert!( matches!(result, Err(Error::IdentifierInvalid { .. })), "expected invalid identifier for database={database:?}, object={object:?}, got {result:?}" @@ -330,12 +328,14 @@ mod tests { } #[test] - fn test_identifier_new_should_allow_system_suffix_and_unicode_names() { - let identifier = Identifier::new("analytics", "orders$snapshots").unwrap(); + fn test_identifier_validate_should_allow_system_suffix_and_unicode_names() { + let identifier = Identifier::new("analytics", "orders$snapshots"); + identifier.validate().unwrap(); assert_eq!(identifier.database(), "analytics"); assert_eq!(identifier.object(), "orders$snapshots"); - let identifier = Identifier::new("数据", "订单").unwrap(); + let identifier = Identifier::new("数据", "订单"); + identifier.validate().unwrap(); assert_eq!(identifier.database(), "数据"); assert_eq!(identifier.object(), "订单"); } diff --git a/crates/paimon/src/table/cow_writer.rs b/crates/paimon/src/table/cow_writer.rs index 2c2111ea..c2b5e958 100644 --- a/crates/paimon/src/table/cow_writer.rs +++ b/crates/paimon/src/table/cow_writer.rs @@ -531,7 +531,7 @@ mod tests { fn test_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_cow").unwrap(), + Identifier::new("default", "test_cow"), table_path.to_string(), test_append_schema(), None, @@ -549,7 +549,7 @@ mod tests { .unwrap(); let table = Table::new( file_io, - Identifier::new("default", "test").unwrap(), + Identifier::new("default", "test"), "memory:/test".to_string(), TableSchema::new(0, &schema), None, @@ -573,7 +573,7 @@ mod tests { .unwrap(); let table = Table::new( file_io, - Identifier::new("default", "test").unwrap(), + Identifier::new("default", "test"), "memory:/test".to_string(), TableSchema::new(0, &schema), None, @@ -598,7 +598,7 @@ mod tests { .unwrap(); let table = Table::new( file_io, - Identifier::new("default", "test").unwrap(), + Identifier::new("default", "test"), "memory:/test".to_string(), TableSchema::new(0, &schema), None, diff --git a/crates/paimon/src/table/data_evolution_reader.rs b/crates/paimon/src/table/data_evolution_reader.rs index 05ee6e4a..487ed951 100644 --- a/crates/paimon/src/table/data_evolution_reader.rs +++ b/crates/paimon/src/table/data_evolution_reader.rs @@ -1354,7 +1354,7 @@ mod tests { ); let table = Table::new( file_io, - Identifier::new("default", "blob_t").unwrap(), + Identifier::new("default", "blob_t"), table_path, table_schema, None, @@ -1447,7 +1447,7 @@ mod tests { ); let table = Table::new( file_io, - Identifier::new("default", "blob_multi_t").unwrap(), + Identifier::new("default", "blob_multi_t"), table_path, table_schema, None, diff --git a/crates/paimon/src/table/data_evolution_writer.rs b/crates/paimon/src/table/data_evolution_writer.rs index 6df4259d..fe9e2348 100644 --- a/crates/paimon/src/table/data_evolution_writer.rs +++ b/crates/paimon/src/table/data_evolution_writer.rs @@ -710,7 +710,7 @@ mod tests { fn test_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_de_table").unwrap(), + Identifier::new("default", "test_de_table"), table_path.to_string(), test_data_evolution_schema(), None, @@ -1055,7 +1055,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); let table = Table::new( file_io, - Identifier::new("default", "test").unwrap(), + Identifier::new("default", "test"), "memory:/test".to_string(), table_schema, None, diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index 4dc076bc..3e7684d1 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -335,7 +335,7 @@ mod tests { ); Table::new( file_io, - Identifier::new("default", "t").unwrap(), + Identifier::new("default", "t"), "/tmp/test-read-builder".to_string(), table_schema, None, @@ -357,7 +357,7 @@ mod tests { ); Table::new( file_io, - Identifier::new("default", "partial_update_dv_t").unwrap(), + Identifier::new("default", "partial_update_dv_t"), "/tmp/test-partial-update-dv-read-builder".to_string(), table_schema, None, @@ -414,7 +414,7 @@ mod tests { ); let table = Table::new( file_io, - Identifier::new("default", "t").unwrap(), + Identifier::new("default", "t"), table_path, table_schema, None, @@ -473,7 +473,7 @@ mod tests { ); let table = Table::new( file_io, - Identifier::new("default", "t").unwrap(), + Identifier::new("default", "t"), table_path, table_schema, None, @@ -530,7 +530,7 @@ mod tests { ); let table = Table::new( file_io, - Identifier::new("default", "t").unwrap(), + Identifier::new("default", "t"), table_path, table_schema, None, @@ -590,7 +590,7 @@ mod tests { ); let table = Table::new( file_io, - Identifier::new("default", "t").unwrap(), + Identifier::new("default", "t"), table_path, table_schema, None, @@ -641,7 +641,7 @@ mod tests { ); let table = Table::new( file_io, - Identifier::new("default", "t").unwrap(), + Identifier::new("default", "t"), "/tmp/test".to_string(), table_schema, None, @@ -697,7 +697,7 @@ mod tests { ); let table = Table::new( file_io, - Identifier::new("default", "t").unwrap(), + Identifier::new("default", "t"), "/tmp/test".to_string(), table_schema, None, diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs index 4f470d82..664b6542 100644 --- a/crates/paimon/src/table/table_commit.rs +++ b/crates/paimon/src/table/table_commit.rs @@ -1196,7 +1196,7 @@ mod tests { fn test_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_table").unwrap(), + Identifier::new("default", "test_table"), table_path.to_string(), test_schema(), None, @@ -1206,7 +1206,7 @@ mod tests { fn test_partitioned_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_table").unwrap(), + Identifier::new("default", "test_table"), table_path.to_string(), test_partitioned_schema(), None, @@ -1530,7 +1530,7 @@ mod tests { fn test_row_tracking_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_table").unwrap(), + Identifier::new("default", "test_table"), table_path.to_string(), test_row_tracking_schema(), None, diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index ce1bb5f1..be0d2df4 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -913,7 +913,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); Table::new( file_io, - Identifier::new("test_db", "test_table").unwrap(), + Identifier::new("test_db", "test_table"), "/tmp/test-table".to_string(), table_schema, None, diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs index 41117c65..f34c124c 100644 --- a/crates/paimon/src/table/table_write.rs +++ b/crates/paimon/src/table/table_write.rs @@ -752,7 +752,7 @@ mod tests { fn test_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_table").unwrap(), + Identifier::new("default", "test_table"), table_path.to_string(), test_schema(), None, @@ -762,7 +762,7 @@ mod tests { fn test_partitioned_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_table").unwrap(), + Identifier::new("default", "test_table"), table_path.to_string(), test_partitioned_schema(), None, @@ -993,7 +993,7 @@ mod tests { fn test_allows_append_blob_table() { let table = Table::new( test_file_io(), - Identifier::new("default", "test_blob_table").unwrap(), + Identifier::new("default", "test_blob_table"), "memory:/test_blob_table".to_string(), test_blob_table_schema(), None, @@ -1010,7 +1010,7 @@ mod tests { let table = Table::new( file_io.clone(), - Identifier::new("default", "test_blob_table").unwrap(), + Identifier::new("default", "test_blob_table"), table_path.to_string(), test_blob_table_schema(), None, @@ -1072,7 +1072,7 @@ mod tests { fn test_allows_partial_update_fixed_bucket_table() { let table = Table::new( test_file_io(), - Identifier::new("default", "test_partial_update_table").unwrap(), + Identifier::new("default", "test_partial_update_table"), "memory:/test_partial_update_table".to_string(), TableSchema::new( 0, @@ -1099,7 +1099,7 @@ mod tests { let table = Table::new( file_io, - Identifier::new("default", "test_partial_update_dynamic_bucket_table").unwrap(), + Identifier::new("default", "test_partial_update_dynamic_bucket_table"), table_path.to_string(), TableSchema::new( 0, @@ -1129,7 +1129,7 @@ mod tests { let table = Table::new( file_io, - Identifier::new("default", "test_partial_update_dv_table").unwrap(), + Identifier::new("default", "test_partial_update_dv_table"), table_path.to_string(), TableSchema::new( 0, @@ -1276,7 +1276,7 @@ mod tests { fn test_bucketed_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_table").unwrap(), + Identifier::new("default", "test_table"), table_path.to_string(), test_bucketed_schema(), None, @@ -1421,7 +1421,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); let table = Table::new( file_io.clone(), - Identifier::new("default", "test_table").unwrap(), + Identifier::new("default", "test_table"), table_path.to_string(), table_schema, None, @@ -1500,7 +1500,7 @@ mod tests { let table_schema = TableSchema::new(0, &schema); let table = Table::new( file_io.clone(), - Identifier::new("default", "test_table").unwrap(), + Identifier::new("default", "test_table"), table_path.to_string(), table_schema, None, @@ -1545,7 +1545,7 @@ mod tests { fn test_pk_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_pk_table").unwrap(), + Identifier::new("default", "test_pk_table"), table_path.to_string(), test_pk_schema(), None, @@ -1585,7 +1585,7 @@ mod tests { let table = Table::new( file_io.clone(), - Identifier::new("default", "test_input_changelog").unwrap(), + Identifier::new("default", "test_input_changelog"), table_path.to_string(), pk_changelog_schema(&[ ("changelog-producer", "input"), @@ -1643,7 +1643,7 @@ mod tests { let table = Table::new( file_io, - Identifier::new("default", "test_input_changelog").unwrap(), + Identifier::new("default", "test_input_changelog"), table_path.to_string(), pk_changelog_schema(&[("changelog-producer", "input")]), None, @@ -1672,7 +1672,7 @@ mod tests { let table = Table::new( file_io, - Identifier::new("default", "test_input_changelog").unwrap(), + Identifier::new("default", "test_input_changelog"), table_path.to_string(), pk_changelog_schema(&[("changelog-producer", "input")]), None, @@ -1698,7 +1698,7 @@ mod tests { let table = Table::new( file_io.clone(), - Identifier::new("default", "test_input_changelog").unwrap(), + Identifier::new("default", "test_input_changelog"), table_path.to_string(), pk_changelog_schema(&[("changelog-producer", "input")]), None, @@ -1766,7 +1766,7 @@ mod tests { let table = Table::new( file_io.clone(), - Identifier::new("default", "test_input_changelog_dynamic_bucket").unwrap(), + Identifier::new("default", "test_input_changelog_dynamic_bucket"), table_path.to_string(), ordinary_dynamic_pk_changelog_schema(), None, @@ -1846,7 +1846,7 @@ mod tests { let table = Table::new( file_io, - Identifier::new("default", "test_input_changelog").unwrap(), + Identifier::new("default", "test_input_changelog"), table_path.to_string(), pk_changelog_schema(&[("changelog-producer", "input")]), None, @@ -2058,7 +2058,7 @@ mod tests { fn test_postpone_pk_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_postpone_table").unwrap(), + Identifier::new("default", "test_postpone_table"), table_path.to_string(), test_postpone_pk_schema(), None, @@ -2081,7 +2081,7 @@ mod tests { fn test_postpone_partitioned_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_postpone_table").unwrap(), + Identifier::new("default", "test_postpone_table"), table_path.to_string(), test_postpone_partitioned_schema(), None, @@ -2330,7 +2330,7 @@ mod tests { fn test_cross_partition_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), - Identifier::new("default", "test_cross_partition").unwrap(), + Identifier::new("default", "test_cross_partition"), table_path.to_string(), test_cross_partition_schema(), None, @@ -2361,7 +2361,7 @@ mod tests { .unwrap(); let table2 = Table::new( file_io.clone(), - Identifier::new("default", "test").unwrap(), + Identifier::new("default", "test"), table_path.to_string(), TableSchema::new(0, &schema), None, @@ -2388,7 +2388,7 @@ mod tests { .unwrap(); let table = Table::new( file_io, - Identifier::new("default", "test_cross_partial_update").unwrap(), + Identifier::new("default", "test_cross_partial_update"), table_path.to_string(), TableSchema::new(0, &schema), None, diff --git a/crates/paimon/tests/rest_api_test.rs b/crates/paimon/tests/rest_api_test.rs index 0130d0bd..74784b34 100644 --- a/crates/paimon/tests/rest_api_test.rs +++ b/crates/paimon/tests/rest_api_test.rs @@ -254,7 +254,7 @@ async fn test_list_tables_and_get_table() { // Get table let table_resp = ctx .api - .get_table(&Identifier::new("default", "table1").unwrap()) + .get_table(&Identifier::new("default", "table1")) .await .unwrap(); assert_eq!(table_resp.id.unwrap_or_default(), "table1"); @@ -266,7 +266,7 @@ async fn test_get_table_not_found() { let result = ctx .api - .get_table(&Identifier::new("default", "non_existent_table").unwrap()) + .get_table(&Identifier::new("default", "non_existent_table")) .await; assert!(result.is_err(), "getting non-existent table should fail"); } @@ -320,7 +320,7 @@ async fn test_create_table() { let result = ctx .api - .create_table(&Identifier::new("default", "new_table").unwrap(), schema) + .create_table(&Identifier::new("default", "new_table"), schema) .await; assert!(result.is_ok(), "failed to create table: {result:?}"); @@ -331,7 +331,7 @@ async fn test_create_table() { // Get the table let table_resp = ctx .api - .get_table(&Identifier::new("default", "new_table").unwrap()) + .get_table(&Identifier::new("default", "new_table")) .await .unwrap(); assert_eq!(table_resp.name, Some("new_table".to_string())); @@ -351,7 +351,7 @@ async fn test_drop_table() { // Drop table let result = ctx .api - .drop_table(&Identifier::new("default", "table_to_drop").unwrap()) + .drop_table(&Identifier::new("default", "table_to_drop")) .await; assert!(result.is_ok(), "failed to drop table: {result:?}"); @@ -362,7 +362,7 @@ async fn test_drop_table() { // Dropping non-existent table should fail let result = ctx .api - .drop_table(&Identifier::new("default", "table_to_drop").unwrap()) + .drop_table(&Identifier::new("default", "table_to_drop")) .await; assert!(result.is_err(), "dropping non-existent table should fail"); } @@ -375,7 +375,7 @@ async fn test_drop_table_no_permission() { let result = ctx .api - .drop_table(&Identifier::new("default", "secret_table").unwrap()) + .drop_table(&Identifier::new("default", "secret_table")) .await; assert!(result.is_err(), "dropping no-permission table should fail"); } @@ -393,8 +393,8 @@ async fn test_rename_table() { let result = ctx .api .rename_table( - &Identifier::new("default", "old_table").unwrap(), - &Identifier::new("default", "new_table").unwrap(), + &Identifier::new("default", "old_table"), + &Identifier::new("default", "new_table"), ) .await; assert!(result.is_ok(), "failed to rename table: {result:?}"); @@ -409,7 +409,7 @@ async fn test_rename_table() { // Get the renamed table let table_resp = ctx .api - .get_table(&Identifier::new("default", "new_table").unwrap()) + .get_table(&Identifier::new("default", "new_table")) .await .unwrap(); assert_eq!(table_resp.name, Some("new_table".to_string())); diff --git a/crates/paimon/tests/rest_catalog_test.rs b/crates/paimon/tests/rest_catalog_test.rs index da4f538b..2166b44d 100644 --- a/crates/paimon/tests/rest_catalog_test.rs +++ b/crates/paimon/tests/rest_catalog_test.rs @@ -249,7 +249,7 @@ async fn test_catalog_get_table() { "file:///tmp/test_warehouse/default.db/my_table", ); - let identifier = Identifier::new("default", "my_table").unwrap(); + let identifier = Identifier::new("default", "my_table"); let table = ctx.catalog.get_table(&identifier).await; assert!(table.is_ok(), "failed to get table: {table:?}"); } @@ -258,7 +258,7 @@ async fn test_catalog_get_table() { async fn test_catalog_get_table_not_found() { let ctx = setup_catalog(vec!["default"]).await; - let identifier = Identifier::new("default", "non_existent").unwrap(); + let identifier = Identifier::new("default", "non_existent"); let result = ctx.catalog.get_table(&identifier).await; assert!(result.is_err(), "getting non-existent table should fail"); } @@ -304,7 +304,7 @@ async fn test_catalog_get_table_propagates_oss_options_in_else_branch() { "oss://test-bucket/warehouse/default.db/oss_table", ); - let identifier = Identifier::new("default", "oss_table").unwrap(); + let identifier = Identifier::new("default", "oss_table"); let result = catalog.get_table(&identifier).await; assert!( result.is_ok(), @@ -318,7 +318,7 @@ async fn test_catalog_create_table() { let ctx = setup_catalog(vec!["default"]).await; let schema = test_schema(); - let identifier = Identifier::new("default", "new_table").unwrap(); + let identifier = Identifier::new("default", "new_table"); let result = ctx.catalog.create_table(&identifier, schema, false).await; assert!(result.is_ok(), "failed to create table: {result:?}"); @@ -336,7 +336,7 @@ async fn test_catalog_create_table_already_exists() { ctx.server.add_table("default", "existing_table"); let schema = test_schema(); - let identifier = Identifier::new("default", "existing_table").unwrap(); + let identifier = Identifier::new("default", "existing_table"); // Create with ignore_if_exists=false should fail let result = ctx.catalog.create_table(&identifier, schema, false).await; @@ -354,7 +354,7 @@ async fn test_catalog_create_table_ignore_if_exists() { ctx.server.add_table("default", "existing_table"); let schema = test_schema(); - let identifier = Identifier::new("default", "existing_table").unwrap(); + let identifier = Identifier::new("default", "existing_table"); // Create with ignore_if_exists=true should succeed let result = ctx.catalog.create_table(&identifier, schema, true).await; @@ -371,7 +371,7 @@ async fn test_catalog_drop_table() { // Add a table ctx.server.add_table("default", "table_to_drop"); - let identifier = Identifier::new("default", "table_to_drop").unwrap(); + let identifier = Identifier::new("default", "table_to_drop"); // Drop table let result = ctx.catalog.drop_table(&identifier, false).await; @@ -386,7 +386,7 @@ async fn test_catalog_drop_table() { async fn test_catalog_drop_table_not_found() { let ctx = setup_catalog(vec!["default"]).await; - let identifier = Identifier::new("default", "non_existent").unwrap(); + let identifier = Identifier::new("default", "non_existent"); // Drop with ignore_if_not_exists=false should fail let result = ctx.catalog.drop_table(&identifier, false).await; @@ -400,7 +400,7 @@ async fn test_catalog_drop_table_not_found() { async fn test_catalog_drop_table_ignore_if_not_exists() { let ctx = setup_catalog(vec!["default"]).await; - let identifier = Identifier::new("default", "non_existent").unwrap(); + let identifier = Identifier::new("default", "non_existent"); // Drop with ignore_if_not_exists=true should succeed let result = ctx.catalog.drop_table(&identifier, true).await; @@ -419,8 +419,8 @@ async fn test_catalog_rename_table() { // Add a table ctx.server.add_table("default", "old_table"); - let from = Identifier::new("default", "old_table").unwrap(); - let to = Identifier::new("default", "new_table").unwrap(); + let from = Identifier::new("default", "old_table"); + let to = Identifier::new("default", "new_table"); // Rename table let result = ctx.catalog.rename_table(&from, &to, false).await; @@ -436,8 +436,8 @@ async fn test_catalog_rename_table() { async fn test_catalog_rename_table_not_found() { let ctx = setup_catalog(vec!["default"]).await; - let from = Identifier::new("default", "non_existent").unwrap(); - let to = Identifier::new("default", "new_name").unwrap(); + let from = Identifier::new("default", "non_existent"); + let to = Identifier::new("default", "new_name"); // Rename with ignore_if_not_exists=false should fail let result = ctx.catalog.rename_table(&from, &to, false).await; @@ -451,8 +451,8 @@ async fn test_catalog_rename_table_not_found() { async fn test_catalog_rename_table_ignore_if_not_exists() { let ctx = setup_catalog(vec!["default"]).await; - let from = Identifier::new("default", "non_existent").unwrap(); - let to = Identifier::new("default", "new_name").unwrap(); + let from = Identifier::new("default", "non_existent"); + let to = Identifier::new("default", "new_name"); // Rename with ignore_if_not_exists=true should succeed let result = ctx.catalog.rename_table(&from, &to, true).await; @@ -468,7 +468,7 @@ async fn test_catalog_rename_table_ignore_if_not_exists() { async fn test_catalog_alter_table_unsupported() { let ctx = setup_catalog(vec!["default"]).await; - let identifier = Identifier::new("default", "some_table").unwrap(); + let identifier = Identifier::new("default", "some_table"); // alter_table should return Unsupported error let result = ctx.catalog.alter_table(&identifier, vec![], false).await;