Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions bindings/python/python/pypaimon_rust/datafusion.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,59 @@ class TableSchema:
def options(self) -> Dict[str, str]: ...
def comment(self) -> Optional[str]: ...

class Snapshot:
    """Read-only view of a single table snapshot."""

    def id(self) -> int:
        """Return the snapshot ID."""
        ...
    def commit_time_ms(self) -> int:
        """Return the snapshot's time in milliseconds."""
        ...
    def total_record_count(self) -> Optional[int]:
        """Return the snapshot's total record count, or None when it is not available."""
        ...
    def delta_record_count(self) -> Optional[int]:
        """Return the snapshot's delta record count, or None when it is not available."""
        ...
    def commit_kind(self) -> str:
        """Return the snapshot's commit kind rendered as a string."""
        ...

class Tag:
    """A named tag together with the ID of the snapshot it was listed with."""

    def name(self) -> str:
        """Return the tag name."""
        ...
    def snapshot_id(self) -> int:
        """Return the ID of the snapshot associated with this tag."""
        ...

class PartitionStat:
    """Statistics reported for one partition of a table."""

    def partition(self) -> Dict[str, str]:
        """Return the partition this entry describes as a string-to-string mapping."""
        ...
    def record_count(self) -> int:
        """Return the record count reported for the partition."""
        ...
    def file_count(self) -> int:
        """Return the file count reported for the partition."""
        ...
    def total_size_bytes(self) -> int:
        """Return the total size, in bytes, reported for the partition."""
        ...

class Table:
    """Table handle exposing catalog metadata plus snapshot, tag and partition queries."""

    # NOTE(review): the "panic" warnings below presumably mirror Tokio's rule that
    # block_on must not be called from inside a Tokio runtime. A Python asyncio
    # event loop is not a Tokio runtime, so these calls may simply block the loop
    # rather than panic — confirm and reword if so.
    def identifier(self) -> str:
        """Return the table identifier formatted as "<database>.<object>"."""
        ...
    def location(self) -> str:
        """Return the table location as a string."""
        ...
    def schema(self) -> TableSchema:
        """Return the table schema."""
        ...
    def latest_snapshot(self) -> Optional[Snapshot]:
        """
        Returns the latest snapshot, or None when there is none.

        Warning: This method blocks on a DataFusion runtime.
        Calling this from an active asyncio event loop will result in a panic.
        """
        ...
    def list_snapshots(self) -> List[Snapshot]:
        """
        Returns all snapshots ordered newest first (descending by ID).

        Warning: This method blocks on a DataFusion runtime.
        Calling this from an active asyncio event loop will result in a panic.
        """
        ...
    def list_tags(self) -> List[Tag]:
        """
        Returns every tag of the table, each paired with its snapshot ID.

        Warning: This method blocks on a DataFusion runtime.
        Calling this from an active asyncio event loop will result in a panic.
        """
        ...
    def list_partitions(self) -> List[Dict[str, str]]:
        """
        Returns the partition mapping of every entry that partition_stats() reports.

        Warning: This method blocks on a DataFusion runtime.
        Calling this from an active asyncio event loop will result in a panic.
        """
        ...
    def partition_stats(self) -> List[PartitionStat]:
        """
        Returns one PartitionStat per partition reported for the table.

        Warning: This method blocks on a DataFusion runtime.
        Calling this from an active asyncio event loop will result in a panic.
        """
        ...

class PaimonCatalog:
def __init__(self, catalog_options: Dict[str, str]) -> None: ...
Expand Down
3 changes: 3 additions & 0 deletions bindings/python/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()>
this.add_class::<PyPythonScalarUDFObject>()?;
this.add_class::<PySQLContext>()?;
this.add_function(wrap_pyfunction!(udf, &this)?)?;
this.add_class::<crate::snapshot::PySnapshot>()?;
this.add_class::<crate::tag::PyTag>()?;
this.add_class::<crate::partition::PyPartitionStat>()?;
m.add_submodule(&this)?;
py.import("sys")?
.getattr("modules")?
Expand Down
4 changes: 4 additions & 0 deletions bindings/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ mod error;
mod schema;
mod table;
mod udf;
// ---- #285: observability ----
mod partition;
mod snapshot;
mod tag;

#[pymodule]
fn pypaimon_rust(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
Expand Down
51 changes: 51 additions & 0 deletions bindings/python/src/partition.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use paimon::table::PartitionStat;
use pyo3::prelude::*;

/// Python-facing wrapper around a `PartitionStat`, exposed to Python as
/// `pypaimon_rust.datafusion.PartitionStat`.
#[pyclass(name = "PartitionStat", module = "pypaimon_rust.datafusion")]
pub struct PyPartitionStat {
    inner: PartitionStat,
}

impl From<PartitionStat> for PyPartitionStat {
    /// Wraps `stat` as-is; the value is moved into the wrapper, not copied.
    fn from(stat: PartitionStat) -> Self {
        PyPartitionStat { inner: stat }
    }
}

#[pymethods]
impl PyPartitionStat {
    /// Returns a copy of the wrapped stat's `partition` map.
    fn partition(&self) -> HashMap<String, String> {
        let Self { inner: stat } = self;
        stat.partition.clone()
    }

    /// Returns the wrapped stat's `record_count`.
    fn record_count(&self) -> i64 {
        let Self { inner: stat } = self;
        stat.record_count
    }

    /// Returns the wrapped stat's `file_count`.
    fn file_count(&self) -> u64 {
        let Self { inner: stat } = self;
        stat.file_count
    }

    /// Returns the wrapped stat's `total_size_bytes`.
    fn total_size_bytes(&self) -> u64 {
        let Self { inner: stat } = self;
        stat.total_size_bytes
    }
}
53 changes: 53 additions & 0 deletions bindings/python/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use paimon::spec::Snapshot;
use pyo3::prelude::*;

/// Python-facing wrapper around a `Snapshot`, exposed to Python as
/// `pypaimon_rust.datafusion.Snapshot`.
#[pyclass(name = "Snapshot", module = "pypaimon_rust.datafusion")]
pub struct PySnapshot {
    inner: Snapshot,
}

impl PySnapshot {
    /// Wraps `snapshot` as-is; the value is moved into the wrapper, not copied.
    pub fn new(snapshot: Snapshot) -> Self {
        PySnapshot { inner: snapshot }
    }
}

#[pymethods]
impl PySnapshot {
    /// Returns the wrapped snapshot's ID.
    fn id(&self) -> i64 {
        let Self { inner: snapshot } = self;
        snapshot.id()
    }

    /// Returns the wrapped snapshot's `time_millis()` value.
    fn commit_time_ms(&self) -> u64 {
        let Self { inner: snapshot } = self;
        snapshot.time_millis()
    }

    /// Returns the wrapped snapshot's total record count, if it has one.
    fn total_record_count(&self) -> Option<i64> {
        let Self { inner: snapshot } = self;
        snapshot.total_record_count()
    }

    /// Returns the wrapped snapshot's delta record count, if it has one.
    fn delta_record_count(&self) -> Option<i64> {
        let Self { inner: snapshot } = self;
        snapshot.delta_record_count()
    }

    /// Returns the wrapped snapshot's commit kind rendered via `to_string()`.
    fn commit_kind(&self) -> String {
        let kind = self.inner.commit_kind();
        kind.to_string()
    }
}
55 changes: 55 additions & 0 deletions bindings/python/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::sync::Arc;

use paimon::table::{SnapshotManager, TagManager};
use paimon_datafusion::runtime::runtime;
use pyo3::prelude::*;

use crate::error::to_py_err;
use crate::partition::PyPartitionStat;
use crate::schema::PyTableSchema;
use crate::snapshot::PySnapshot;
use crate::tag::PyTag;

#[pyclass(name = "Table", module = "pypaimon_rust.datafusion")]
pub struct PyTable {
Expand All @@ -34,6 +41,7 @@ impl PyTable {

#[pymethods]
impl PyTable {
// ---------------- #284: catalog metadata ----------------
fn identifier(&self) -> String {
let id = self.inner.identifier();
format!("{}.{}", id.database(), id.object())
Expand All @@ -46,4 +54,51 @@ impl PyTable {
fn schema(&self) -> PyTableSchema {
PyTableSchema::new(self.inner.schema().clone())
}

// ---------------- #285: observability ----------------
/// Returns the latest snapshot reported by the table's snapshot manager,
/// or `None` when it reports none.
///
/// Blocks the calling thread on the shared runtime until the lookup finishes.
fn latest_snapshot(&self) -> PyResult<Option<PySnapshot>> {
    let file_io = self.inner.file_io().clone();
    let location = self.inner.location().to_string();
    let manager = SnapshotManager::new(file_io, location);

    let rt = runtime();
    let latest = rt
        .block_on(manager.get_latest_snapshot())
        .map_err(to_py_err)?;
    Ok(latest.map(PySnapshot::new))
}

/// Returns every snapshot of the table, newest first (descending snapshot ID).
///
/// Blocks the calling thread on the shared runtime until the listing finishes.
fn list_snapshots(&self) -> PyResult<Vec<PySnapshot>> {
    let sm = SnapshotManager::new(
        self.inner.file_io().clone(),
        self.inner.location().to_string(),
    );
    let mut snaps = runtime().block_on(sm.list_all()).map_err(to_py_err)?;
    // Sort explicitly instead of reversing: the newest-first contract then
    // holds no matter in which order `list_all` yields the snapshots.
    snaps.sort_unstable_by_key(|snap| std::cmp::Reverse(snap.id()));
    Ok(snaps.into_iter().map(PySnapshot::new).collect())
}

/// Returns every tag reported by the table's tag manager, each paired with
/// the ID of its snapshot.
///
/// Blocks the calling thread on the shared runtime until the listing finishes.
fn list_tags(&self) -> PyResult<Vec<PyTag>> {
    let file_io = self.inner.file_io().clone();
    let location = self.inner.location().to_string();
    let manager = TagManager::new(file_io, location);

    let rt = runtime();
    let listed = rt.block_on(manager.list_all()).map_err(to_py_err)?;
    let py_tags: Vec<PyTag> = listed
        .into_iter()
        .map(|(tag_name, snapshot)| PyTag::new(tag_name, snapshot.id()))
        .collect();
    Ok(py_tags)
}

/// Returns the `partition` map of every entry produced by the table's
/// `partition_stats()`; the remaining statistics are discarded.
///
/// Blocks the calling thread on the shared runtime until the stats are ready.
fn list_partitions(&self) -> PyResult<Vec<HashMap<String, String>>> {
    let rt = runtime();
    let all_stats = rt
        .block_on(self.inner.partition_stats())
        .map_err(to_py_err)?;
    let partitions: Vec<HashMap<String, String>> =
        all_stats.into_iter().map(|stat| stat.partition).collect();
    Ok(partitions)
}

/// Returns one Python-facing wrapper per entry produced by the table's
/// `partition_stats()`.
///
/// Blocks the calling thread on the shared runtime until the stats are ready.
fn partition_stats(&self) -> PyResult<Vec<PyPartitionStat>> {
    let rt = runtime();
    let all_stats = rt
        .block_on(self.inner.partition_stats())
        .map_err(to_py_err)?;
    let wrapped: Vec<PyPartitionStat> = all_stats.into_iter().map(Into::into).collect();
    Ok(wrapped)
}
}
41 changes: 41 additions & 0 deletions bindings/python/src/tag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use pyo3::prelude::*;

/// A tag name paired with a snapshot ID, exposed to Python as
/// `pypaimon_rust.datafusion.Tag`.
#[pyclass(name = "Tag", module = "pypaimon_rust.datafusion")]
pub struct PyTag {
    name: String,
    snapshot_id: i64,
}

impl PyTag {
    /// Builds a tag entry from its name and the ID of its snapshot.
    pub fn new(name: String, snapshot_id: i64) -> Self {
        PyTag {
            name: name,
            snapshot_id: snapshot_id,
        }
    }
}

#[pymethods]
impl PyTag {
    /// Returns the tag name.
    fn name(&self) -> &str {
        // Borrow rather than clone: PyO3 copies the text into a Python `str`
        // during conversion, so an intermediate `String` is a wasted allocation.
        &self.name
    }

    /// Returns the snapshot ID stored for this tag.
    fn snapshot_id(&self) -> i64 {
        self.snapshot_id
    }
}
37 changes: 37 additions & 0 deletions bindings/python/tests/test_datafusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,3 +685,40 @@ def test_list_databases_and_tables():
# simple_log_table is non-partitioned, so partition keys are empty.
assert schema.partition_keys() == []

# ---------------- #285: observability ----------------
def test_snapshots_for_simple_table():
    """Check latest_snapshot()/list_snapshots() and every Snapshot accessor."""
    catalog = PaimonCatalog({"warehouse": WAREHOUSE})
    table = catalog.get_table("default.simple_log_table")

    snap = table.latest_snapshot()
    assert snap is not None
    assert snap.id() >= 1
    assert snap.commit_time_ms() > 0
    assert snap.commit_kind() in {"APPEND", "COMPACT", "OVERWRITE", "ANALYZE"}
    # Both counts are Optional[int] in the stub; exercise them so a broken
    # binding for either accessor is caught here.
    for count in (snap.total_record_count(), snap.delta_record_count()):
        assert count is None or isinstance(count, int)

    snaps = table.list_snapshots()
    assert len(snaps) >= 1
    # Newest first: verify the whole documented ordering (descending by ID),
    # not just the head of the list.
    ids = [s.id() for s in snaps]
    assert ids == sorted(ids, reverse=True)
    assert ids[0] == snap.id()


def test_partitions_and_tags_smoke():
    """Smoke-test list_partitions(), partition_stats() and list_tags() result shapes."""
    catalog = PaimonCatalog({"warehouse": WAREHOUSE})
    table = catalog.get_table("default.simple_log_table")

    # Non-partitioned, non-tagged table: results may be empty but must be well-typed.
    parts = table.list_partitions()
    stats = table.partition_stats()
    tags = table.list_tags()

    assert isinstance(parts, list)
    assert isinstance(stats, list)
    assert isinstance(tags, list)
    # simple_log_table has no partition keys -> partition_stats yields a single
    # empty-partition bucket or zero buckets depending on how the snapshot was
    # written. Either is acceptable; we just check the shape.
    # list_partitions() is derived from partition_stats(), so both must report
    # the same number of entries.
    assert len(parts) == len(stats)
    for p in parts:
        assert isinstance(p, dict)
    for s in stats:
        # Exercise every PartitionStat accessor so a broken binding is caught.
        assert isinstance(s.partition(), dict)
        assert isinstance(s.record_count(), int)
        assert isinstance(s.file_count(), int)
        assert isinstance(s.total_size_bytes(), int)
    for t in tags:
        assert isinstance(t.name(), str)
        assert isinstance(t.snapshot_id(), int)
2 changes: 2 additions & 0 deletions crates/paimon/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub(crate) mod global_index_scanner;
mod kv_file_reader;
mod kv_file_writer;
mod partition_filter;
mod partition_stat;
mod postpone_file_writer;
mod prepared_files;
mod read_builder;
Expand Down Expand Up @@ -67,6 +68,7 @@ pub use data_evolution_writer::DataEvolutionWriter;
#[cfg(feature = "fulltext")]
pub use full_text_search_builder::FullTextSearchBuilder;
use futures::stream::BoxStream;
pub use partition_stat::PartitionStat;
pub use read_builder::ReadBuilder;
pub use rest_env::RESTEnv;
pub use schema_manager::SchemaManager;
Expand Down
Loading
Loading