diff --git a/bindings/python/python/pypaimon_rust/datafusion.pyi b/bindings/python/python/pypaimon_rust/datafusion.pyi index 172c1fe8..66562937 100644 --- a/bindings/python/python/pypaimon_rust/datafusion.pyi +++ b/bindings/python/python/pypaimon_rust/datafusion.pyi @@ -36,10 +36,59 @@ class TableSchema: def options(self) -> Dict[str, str]: ... def comment(self) -> Optional[str]: ... +class Snapshot: + def id(self) -> int: ... + def commit_time_ms(self) -> int: ... + def total_record_count(self) -> Optional[int]: ... + def delta_record_count(self) -> Optional[int]: ... + def commit_kind(self) -> str: ... + +class Tag: + def name(self) -> str: ... + def snapshot_id(self) -> int: ... + +class PartitionStat: + def partition(self) -> Dict[str, str]: ... + def record_count(self) -> int: ... + def file_count(self) -> int: ... + def total_size_bytes(self) -> int: ... + class Table: def identifier(self) -> str: ... def location(self) -> str: ... def schema(self) -> TableSchema: ... + def latest_snapshot(self) -> Optional[Snapshot]: + """ + Warning: This method blocks on a DataFusion runtime. + Calling this from an active asyncio event loop will result in a panic. + """ + ... + def list_snapshots(self) -> List[Snapshot]: + """ + Returns all snapshots ordered newest first (descending by ID). + + Warning: This method blocks on a DataFusion runtime. + Calling this from an active asyncio event loop will result in a panic. + """ + ... + def list_tags(self) -> List[Tag]: + """ + Warning: This method blocks on a DataFusion runtime. + Calling this from an active asyncio event loop will result in a panic. + """ + ... + def list_partitions(self) -> List[Dict[str, str]]: + """ + Warning: This method blocks on a DataFusion runtime. + Calling this from an active asyncio event loop will result in a panic. + """ + ... + def partition_stats(self) -> List[PartitionStat]: + """ + Warning: This method blocks on a DataFusion runtime. + Calling this from an active asyncio event loop will result in a panic. + """ + ... class PaimonCatalog: def __init__(self, catalog_options: Dict[str, str]) -> None: ... diff --git a/bindings/python/src/context.rs b/bindings/python/src/context.rs index cc855ee8..12b9bbdb 100644 --- a/bindings/python/src/context.rs +++ b/bindings/python/src/context.rs @@ -262,6 +262,9 @@ pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> this.add_class::()?; this.add_class::()?; this.add_function(wrap_pyfunction!(udf, &this)?)?; + this.add_class::()?; + this.add_class::()?; + this.add_class::()?; m.add_submodule(&this)?; py.import("sys")? .getattr("modules")? diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index d0d2002b..0cb97f8f 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -23,6 +23,10 @@ mod error; mod schema; mod table; mod udf; +// ---- #285: observability ---- +mod partition; +mod snapshot; +mod tag; #[pymodule] fn pypaimon_rust(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { diff --git a/bindings/python/src/partition.rs b/bindings/python/src/partition.rs new file mode 100644 index 00000000..f9624438 --- /dev/null +++ b/bindings/python/src/partition.rs @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use paimon::table::PartitionStat; +use pyo3::prelude::*; + +#[pyclass(name = "PartitionStat", module = "pypaimon_rust.datafusion")] +pub struct PyPartitionStat { + inner: PartitionStat, +} + +impl From for PyPartitionStat { + fn from(inner: PartitionStat) -> Self { + Self { inner } + } +} + +#[pymethods] +impl PyPartitionStat { + fn partition(&self) -> HashMap { + self.inner.partition.clone() + } + + fn record_count(&self) -> i64 { + self.inner.record_count + } + + fn file_count(&self) -> u64 { + self.inner.file_count + } + + fn total_size_bytes(&self) -> u64 { + self.inner.total_size_bytes + } +} diff --git a/bindings/python/src/snapshot.rs b/bindings/python/src/snapshot.rs new file mode 100644 index 00000000..58505188 --- /dev/null +++ b/bindings/python/src/snapshot.rs @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use paimon::spec::Snapshot; +use pyo3::prelude::*; + +#[pyclass(name = "Snapshot", module = "pypaimon_rust.datafusion")] +pub struct PySnapshot { + inner: Snapshot, +} + +impl PySnapshot { + pub fn new(inner: Snapshot) -> Self { + Self { inner } + } +} + +#[pymethods] +impl PySnapshot { + fn id(&self) -> i64 { + self.inner.id() + } + + fn commit_time_ms(&self) -> u64 { + self.inner.time_millis() + } + + fn total_record_count(&self) -> Option { + self.inner.total_record_count() + } + + fn delta_record_count(&self) -> Option { + self.inner.delta_record_count() + } + + fn commit_kind(&self) -> String { + self.inner.commit_kind().to_string() + } +} diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 0a0c35c9..e15c89d5 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -15,11 +15,18 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; use std::sync::Arc; +use paimon::table::{SnapshotManager, TagManager}; +use paimon_datafusion::runtime::runtime; use pyo3::prelude::*; +use crate::error::to_py_err; +use crate::partition::PyPartitionStat; use crate::schema::PyTableSchema; +use crate::snapshot::PySnapshot; +use crate::tag::PyTag; #[pyclass(name = "Table", module = "pypaimon_rust.datafusion")] pub struct PyTable { @@ -34,6 +41,7 @@ impl PyTable { #[pymethods] impl PyTable { + // ---------------- #284: catalog metadata ---------------- fn identifier(&self) -> String { let id = self.inner.identifier(); format!("{}.{}", id.database(), id.object()) @@ -46,4 +54,51 @@ impl PyTable { fn schema(&self) -> PyTableSchema { PyTableSchema::new(self.inner.schema().clone()) } + + // ---------------- #285: observability ---------------- + fn latest_snapshot(&self) -> PyResult> { + let sm = SnapshotManager::new( + self.inner.file_io().clone(), + self.inner.location().to_string(), + ); + let snap = runtime() + .block_on(sm.get_latest_snapshot()) + .map_err(to_py_err)?; + Ok(snap.map(PySnapshot::new)) + } + + fn list_snapshots(&self) -> PyResult> { + let sm = SnapshotManager::new( + self.inner.file_io().clone(), + self.inner.location().to_string(), + ); + let snaps = runtime().block_on(sm.list_all()).map_err(to_py_err)?; + Ok(snaps.into_iter().rev().map(PySnapshot::new).collect()) + } + + fn list_tags(&self) -> PyResult> { + let tm = TagManager::new( + self.inner.file_io().clone(), + self.inner.location().to_string(), + ); + let tags = runtime().block_on(tm.list_all()).map_err(to_py_err)?; + Ok(tags + .into_iter() + .map(|(name, snap)| PyTag::new(name, snap.id())) + .collect()) + } + + fn list_partitions(&self) -> PyResult>> { + let stats = runtime() + .block_on(self.inner.partition_stats()) + .map_err(to_py_err)?; + Ok(stats.into_iter().map(|s| s.partition).collect()) + } + + fn partition_stats(&self) -> PyResult> { + let stats = runtime() + .block_on(self.inner.partition_stats()) + .map_err(to_py_err)?; + Ok(stats.into_iter().map(PyPartitionStat::from).collect()) + } } diff --git a/bindings/python/src/tag.rs b/bindings/python/src/tag.rs new file mode 100644 index 00000000..c679dd01 --- /dev/null +++ b/bindings/python/src/tag.rs @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use pyo3::prelude::*; + +#[pyclass(name = "Tag", module = "pypaimon_rust.datafusion")] +pub struct PyTag { + name: String, + snapshot_id: i64, +} + +impl PyTag { + pub fn new(name: String, snapshot_id: i64) -> Self { + Self { name, snapshot_id } + } +} + +#[pymethods] +impl PyTag { + fn name(&self) -> String { + self.name.clone() + } + + fn snapshot_id(&self) -> i64 { + self.snapshot_id + } +} diff --git a/bindings/python/tests/test_datafusion.py b/bindings/python/tests/test_datafusion.py index 236b130f..6cffd814 100644 --- a/bindings/python/tests/test_datafusion.py +++ b/bindings/python/tests/test_datafusion.py @@ -685,3 +685,40 @@ def test_list_databases_and_tables(): # simple_log_table is non-partitioned, so partition keys are empty. assert schema.partition_keys() == [] +# ---------------- #285: observability ---------------- +def test_snapshots_for_simple_table(): + catalog = PaimonCatalog({"warehouse": WAREHOUSE}) + table = catalog.get_table("default.simple_log_table") + + snap = table.latest_snapshot() + assert snap is not None + assert snap.id() >= 1 + assert snap.commit_time_ms() > 0 + assert snap.commit_kind() in {"APPEND", "COMPACT", "OVERWRITE", "ANALYZE"} + + snaps = table.list_snapshots() + assert len(snaps) >= 1 + # Newest first. + assert snaps[0].id() == snap.id() + + +def test_partitions_and_tags_smoke(): + catalog = PaimonCatalog({"warehouse": WAREHOUSE}) + table = catalog.get_table("default.simple_log_table") + + # Non-partitioned, non-tagged table: both should be empty but well-typed. + parts = table.list_partitions() + stats = table.partition_stats() + tags = table.list_tags() + + assert isinstance(parts, list) + assert isinstance(stats, list) + assert isinstance(tags, list) + # simple_log_table has no partition keys -> partition_stats yields a single + # empty-partition bucket or zero buckets depending on how the snapshot was + # written. Either is acceptable; we just check the shape. + for p in parts: + assert isinstance(p, dict) + for t in tags: + assert isinstance(t.name(), str) + assert isinstance(t.snapshot_id(), int) diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index c425a9eb..73482637 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -38,6 +38,7 @@ pub(crate) mod global_index_scanner; mod kv_file_reader; mod kv_file_writer; mod partition_filter; +mod partition_stat; mod postpone_file_writer; mod prepared_files; mod read_builder; @@ -67,6 +68,7 @@ pub use data_evolution_writer::DataEvolutionWriter; #[cfg(feature = "fulltext")] pub use full_text_search_builder::FullTextSearchBuilder; use futures::stream::BoxStream; +pub use partition_stat::PartitionStat; pub use read_builder::ReadBuilder; pub use rest_env::RESTEnv; pub use schema_manager::SchemaManager; diff --git a/crates/paimon/src/table/partition_stat.rs b/crates/paimon/src/table/partition_stat.rs new file mode 100644 index 00000000..893ce092 --- /dev/null +++ b/crates/paimon/src/table/partition_stat.rs @@ -0,0 +1,212 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Per-partition statistics computed by scanning the latest snapshot's manifest entries. +//! +//! Mirrors what Java Paimon exposes via the `$partitions` system table for runtime introspection. + +use std::collections::HashMap; + +use crate::io::FileIO; +use crate::spec::{ + avro::from_avro_bytes_fast, BinaryRow, DataField, DataType, FileKind, ManifestEntry, + ManifestFileMeta, Snapshot, +}; +use crate::table::SnapshotManager; +use crate::table::Table; + +const MANIFEST_DIR: &str = "manifest"; + +/// Per-partition aggregated statistics. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PartitionStat { + /// Partition key/value mapping (e.g. `{"dt": "2024-01-01", "hr": "10"}`). + pub partition: HashMap, + /// Net record count (added rows minus deleted rows) across all live data files. + pub record_count: i64, + /// Net data file count (additions minus deletions). + pub file_count: u64, + /// Net total bytes for live data files. + pub total_size_bytes: u64, +} + +#[derive(Default)] +struct Accum { + record_count: i64, + file_count: i64, + total_size_bytes: i64, +} + +impl Table { + /// Compute per-partition statistics from the latest snapshot. + /// + /// **Warning:** This method reads all manifest lists and entries from the latest snapshot. + /// For tables with a large number of manifests, this operation can be expensive. + /// + /// Returns an empty Vec when the table has no snapshots yet. + pub async fn partition_stats(&self) -> crate::Result> { + let sm = SnapshotManager::new(self.file_io().clone(), self.location().to_string()); + let snapshot = match sm.get_latest_snapshot().await? { + Some(s) => s, + None => return Ok(Vec::new()), + }; + + let entries = read_all_manifest_entries(self.file_io(), self.location(), &snapshot).await?; + let partition_keys = self.schema().partition_keys(); + let partition_fields = self.schema().partition_fields(); + + aggregate_partition_stats(&entries, partition_keys, &partition_fields) + } + + /// List all partition values present in the latest snapshot. + /// + /// **Warning:** This method computes partition statistics which reads all manifest lists + /// and entries. For large tables, this operation can be expensive. + /// + /// Returns an empty Vec when the table has no snapshots yet. + pub async fn list_partitions(&self) -> crate::Result>> { + Ok(self + .partition_stats() + .await? + .into_iter() + .map(|s| s.partition) + .collect()) + } +} + +async fn read_manifest_list( + file_io: &FileIO, + table_path: &str, + list_name: &str, +) -> crate::Result> { + if list_name.is_empty() { + return Ok(Vec::new()); + } + let path = format!( + "{}/{}/{}", + table_path.trim_end_matches('/'), + MANIFEST_DIR, + list_name + ); + let input = file_io.new_input(&path)?; + let bytes = input.read().await?; + from_avro_bytes_fast::(&bytes) +} + +async fn read_all_manifest_entries( + file_io: &FileIO, + table_path: &str, + snapshot: &Snapshot, +) -> crate::Result> { + let mut metas = read_manifest_list(file_io, table_path, snapshot.base_manifest_list()).await?; + metas.extend(read_manifest_list(file_io, table_path, snapshot.delta_manifest_list()).await?); + + let manifest_dir = format!("{}/{}", table_path.trim_end_matches('/'), MANIFEST_DIR); + let mut all_entries = Vec::new(); + for meta in metas { + let path = format!("{}/{}", manifest_dir, meta.file_name()); + let input = file_io.new_input(&path)?; + let bytes = input.read().await?; + let entries = from_avro_bytes_fast::(&bytes)?; + all_entries.extend(entries); + } + Ok(all_entries) +} + +fn aggregate_partition_stats( + entries: &[ManifestEntry], + partition_keys: &[String], + partition_fields: &[DataField], +) -> crate::Result> { + let mut grouped: HashMap, Accum> = HashMap::new(); + for entry in entries { + let bucket = grouped.entry(entry.partition().to_vec()).or_default(); + let file = entry.file(); + let live_rows = file.row_count - file.delete_row_count.unwrap_or(0); + let sign: i64 = match entry.kind() { + FileKind::Add => 1, + FileKind::Delete => -1, + }; + bucket.record_count += sign * live_rows; + bucket.file_count += sign; + bucket.total_size_bytes += sign * file.file_size; + } + + let mut out = Vec::with_capacity(grouped.len()); + for (partition_bytes, accum) in grouped { + if accum.file_count <= 0 { + // Partition has been fully deleted in this snapshot. + continue; + } + let partition = decode_partition(&partition_bytes, partition_keys, partition_fields)?; + out.push(PartitionStat { + partition, + record_count: accum.record_count.max(0), + file_count: accum.file_count.max(0) as u64, + total_size_bytes: accum.total_size_bytes.max(0) as u64, + }); + } + Ok(out) +} + +fn decode_partition( + bytes: &[u8], + keys: &[String], + fields: &[DataField], +) -> crate::Result> { + let mut map = HashMap::new(); + if keys.is_empty() { + return Ok(map); + } + let row = BinaryRow::from_serialized_bytes(bytes)?; + for (i, key) in keys.iter().enumerate() { + let dt = fields + .iter() + .find(|f| f.name() == key) + .map(|f| f.data_type()); + let value = if row.is_null_at(i) { + "null".to_string() + } else { + partition_value_to_string(&row, i, dt) + }; + map.insert(key.clone(), value); + } + Ok(map) +} + +fn partition_value_to_string(row: &BinaryRow, pos: usize, dt: Option<&DataType>) -> String { + let pos_i = pos; + match dt { + Some(DataType::TinyInt(_)) => row.get_byte(pos_i).map(|v| v.to_string()), + Some(DataType::SmallInt(_)) => row.get_short(pos_i).map(|v| v.to_string()), + Some(DataType::Int(_)) | Some(DataType::Date(_)) => { + row.get_int(pos_i).map(|v| v.to_string()) + } + Some(DataType::BigInt(_)) => row.get_long(pos_i).map(|v| v.to_string()), + Some(DataType::Boolean(_)) => row.get_boolean(pos_i).map(|v| v.to_string()), + Some(DataType::Float(_)) => row.get_float(pos_i).map(|v| v.to_string()), + Some(DataType::Double(_)) => row.get_double(pos_i).map(|v| v.to_string()), + Some(DataType::Char(_)) | Some(DataType::VarChar(_)) => { + row.get_string(pos_i).map(|v| v.to_string()) + } + Some(DataType::Binary(_)) | Some(DataType::VarBinary(_)) => { + row.get_binary(pos_i).map(hex::encode) + } + _ => row.get_string(pos_i).map(|v| v.to_string()), + } + .unwrap_or_else(|_| "?".to_string()) +}