Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions crates/paimon/src/catalog/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ impl Catalog for FileSystemCatalog {
ignore_if_exists: bool,
properties: HashMap<String, String>,
) -> Result<()> {
Identifier::validate_database_name(name)?;

if properties.contains_key(DB_LOCATION_PROP) {
return Err(Error::ConfigInvalid {
message: "Cannot specify location for a database when using fileSystem catalog."
Expand All @@ -239,6 +241,8 @@ impl Catalog for FileSystemCatalog {
}

async fn get_database(&self, name: &str) -> Result<Database> {
Identifier::validate_database_name(name)?;

if !self.database_exists(name).await? {
return Err(Error::DatabaseNotExist {
database: name.to_string(),
Expand All @@ -254,6 +258,8 @@ impl Catalog for FileSystemCatalog {
ignore_if_not_exists: bool,
cascade: bool,
) -> Result<()> {
Identifier::validate_database_name(name)?;

let path = self.database_path(name);

let database_exists = self.database_exists(name).await?;
Expand All @@ -279,6 +285,8 @@ impl Catalog for FileSystemCatalog {
}

async fn get_table(&self, identifier: &Identifier) -> Result<Table> {
identifier.validate()?;

let table_path = self.table_path(identifier);

if !self.table_exists(identifier).await? {
Expand All @@ -304,6 +312,8 @@ impl Catalog for FileSystemCatalog {
}

async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> {
Identifier::validate_database_name(database_name)?;

let path = self.database_path(database_name);

if !self.database_exists(database_name).await? {
Expand All @@ -321,6 +331,8 @@ impl Catalog for FileSystemCatalog {
creation: Schema,
ignore_if_exists: bool,
) -> Result<()> {
identifier.validate()?;

let table_path = self.table_path(identifier);

let table_exists = self.table_exists(identifier).await?;
Expand Down Expand Up @@ -348,6 +360,8 @@ impl Catalog for FileSystemCatalog {
}

async fn drop_table(&self, identifier: &Identifier, ignore_if_not_exists: bool) -> Result<()> {
identifier.validate()?;

let table_path = self.table_path(identifier);

let table_exists = self.table_exists(identifier).await?;
Expand All @@ -372,6 +386,9 @@ impl Catalog for FileSystemCatalog {
to: &Identifier,
ignore_if_not_exists: bool,
) -> Result<()> {
from.validate()?;
to.validate()?;

let from_path = self.table_path(from);
let to_path = self.table_path(to);

Expand Down Expand Up @@ -403,6 +420,8 @@ impl Catalog for FileSystemCatalog {
changes: Vec<crate::spec::SchemaChange>,
ignore_if_not_exists: bool,
) -> Result<()> {
identifier.validate()?;

let table_path = self.table_path(identifier);
if !self.table_exists(identifier).await? {
if ignore_if_not_exists {
Expand Down Expand Up @@ -511,6 +530,31 @@ mod tests {
assert!(!catalog.database_exists("db1").await.unwrap());
}

#[tokio::test]
async fn test_create_database_should_reject_path_traversal_name() {
let (temp_dir, catalog) = create_test_catalog();
let escaped_path = temp_dir.path().join("escaped.db");

let result = catalog
.create_database("../escaped", false, HashMap::new())
.await;

assert!(matches!(result, Err(Error::IdentifierInvalid { .. })));
assert!(!escaped_path.exists());
}

#[tokio::test]
async fn test_drop_database_should_reject_path_traversal_name() {
let (temp_dir, catalog) = create_test_catalog();
let escaped_path = temp_dir.path().join("escaped.db");
std::fs::create_dir(&escaped_path).unwrap();

let result = catalog.drop_database("../escaped", false, true).await;

assert!(matches!(result, Err(Error::IdentifierInvalid { .. })));
assert!(escaped_path.exists());
}

#[tokio::test]
async fn test_table_operations() {
let (_temp_dir, catalog) = create_test_catalog();
Expand Down Expand Up @@ -568,6 +612,54 @@ mod tests {
assert!(!tables.contains(&"table1".to_string()));
}

#[tokio::test]
async fn test_create_table_should_reject_path_traversal_object_name() {
let (temp_dir, catalog) = create_test_catalog();
catalog
.create_database("db1", false, HashMap::new())
.await
.unwrap();
let escaped_path = temp_dir.path().join("table_escape");

let result = catalog
.create_table(
&Identifier::new("db1", "../../table_escape"),
testing_schema(),
false,
)
.await;

assert!(matches!(result, Err(Error::IdentifierInvalid { .. })));
assert!(!escaped_path.exists());
}

#[tokio::test]
async fn test_rename_table_should_reject_path_traversal_target_name() {
let (temp_dir, catalog) = create_test_catalog();
catalog
.create_database("db1", false, HashMap::new())
.await
.unwrap();
let source = Identifier::new("db1", "source");
catalog
.create_table(&source, testing_schema(), false)
.await
.unwrap();
let escaped_path = temp_dir.path().join("renamed_escape");

let result = catalog
.rename_table(
&source,
&Identifier::new("db1", "../../renamed_escape"),
false,
)
.await;

assert!(matches!(result, Err(Error::IdentifierInvalid { .. })));
assert!(!escaped_path.exists());
assert!(catalog.get_table(&source).await.is_ok());
}

#[tokio::test]
async fn test_list_partitions_default_table_not_found_errors() {
let (_temp_dir, catalog) = create_test_catalog();
Expand Down
85 changes: 84 additions & 1 deletion crates/paimon/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ mod rest;
use std::collections::HashMap;
use std::fmt;

use crate::{Error, Result};
pub use database::*;
pub use factory::*;
pub use filesystem::*;
Expand Down Expand Up @@ -76,6 +77,22 @@ impl Identifier {
}
}

/// Check both components of this identifier.
///
/// The database name is checked first; the object name is only checked when
/// the database name passes, so the first failure is the one returned.
pub(crate) fn validate(&self) -> Result<()> {
    Self::validate_database_name(&self.database)
        .and_then(|()| Self::validate_object_name(&self.object))
}

/// Check that `name` is acceptable as a database name.
///
/// Delegates to `validate_identifier_name`, labelling the name as a
/// "database" in any resulting error message.
pub(crate) fn validate_database_name(name: &str) -> Result<()> {
    const KIND: &str = "database";
    validate_identifier_name(KIND, name)
}

/// Check that `name` is acceptable as an object name.
///
/// Delegates to `validate_identifier_name`, labelling the name as an
/// "object" in any resulting error message.
pub(crate) fn validate_object_name(name: &str) -> Result<()> {
    const KIND: &str = "object";
    validate_identifier_name(KIND, name)
}

/// Database name.
pub fn database(&self) -> &str {
&self.database
Expand All @@ -97,6 +114,28 @@ impl Identifier {
}
}

fn validate_identifier_name(kind: &str, name: &str) -> Result<()> {
let invalid = if name.trim().is_empty() {
Some("cannot be empty or whitespace")
} else if matches!(name, "." | "..") {
Some("cannot be '.' or '..'")
} else if name.contains('/') || name.contains('\\') {
Some("cannot contain path separators")
} else if name.chars().any(char::is_control) {
Some("cannot contain control characters")
} else {
None
};

if let Some(reason) = invalid {
return Err(Error::IdentifierInvalid {
message: format!("{kind} name {reason}: {name:?}"),
});
}

Ok(())
}

impl fmt::Display for Identifier {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.full_name())
Expand All @@ -119,7 +158,6 @@ use async_trait::async_trait;
use crate::api::PagedList;
use crate::spec::{Partition, Schema, SchemaChange};
use crate::table::Table;
use crate::Result;

/// Catalog API for reading and writing metadata (databases, tables) in Paimon.
///
Expand Down Expand Up @@ -257,3 +295,48 @@ pub trait Catalog: Send + Sync {
))
}
}

#[cfg(test)]
mod tests {
    use super::*;

    // Each case pairs one bad name with an otherwise valid counterpart, so a
    // rejection can only come from the bad component.
    #[test]
    fn test_identifier_validate_should_reject_path_control_names() {
        // Bad database names, each paired with the valid object name "table".
        let bad_databases = [
            "",
            " ",
            ".",
            "..",
            "../escaped",
            "db\\escaped",
            "db\nescaped",
        ];
        // Bad object names, each paired with the valid database name "db".
        let bad_objects = [
            "",
            " ",
            ".",
            "..",
            "../escaped",
            "nested/table",
            "nested\\table",
            "table\0name",
        ];

        let cases = bad_databases
            .iter()
            .map(|&database| (database, "table"))
            .chain(bad_objects.iter().map(|&object| ("db", object)));

        for (database, object) in cases {
            let result = Identifier::new(database, object).validate();
            assert!(
                matches!(result, Err(Error::IdentifierInvalid { .. })),
                "expected invalid identifier for database={database:?}, object={object:?}, got {result:?}"
            );
        }
    }

    // A `$` in the object name and non-ASCII names must pass validation, and
    // the accessors must return the names unchanged.
    #[test]
    fn test_identifier_validate_should_allow_system_suffix_and_unicode_names() {
        for (database, object) in [("analytics", "orders$snapshots"), ("数据", "订单")] {
            let identifier = Identifier::new(database, object);
            identifier.validate().unwrap();
            assert_eq!(identifier.database(), database);
            assert_eq!(identifier.object(), object);
        }
    }
}
Loading