From 16e6417a54815c072010b9e1ea507b493c1d712c Mon Sep 17 00:00:00 2001 From: JunRuiLee Date: Tue, 19 May 2026 17:41:38 +0800 Subject: [PATCH 1/2] [python] Add create_tag_from_timestamp to FileStoreTable Allow creating a tag from the latest snapshot at or before a given timestamp (in milliseconds), aligning with Java's CreateTagFromTimestampProcedure behavior. --- .../pypaimon/table/file_store_table.py | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py index 67be2587b7e8..baddb0e71a8e 100644 --- a/paimon-python/pypaimon/table/file_store_table.py +++ b/paimon-python/pypaimon/table/file_store_table.py @@ -183,6 +183,31 @@ def create_tag( tag_mgr = self.tag_manager() tag_mgr.create_tag(snapshot, tag_name, ignore_if_exists) + def create_tag_from_timestamp(self, tag_name: str, timestamp_millis: int, + ignore_if_exists: bool = False) -> None: + """ + Create a tag from the latest snapshot at or before the given timestamp. + + Args: + tag_name: Name for the tag + timestamp_millis: The timestamp in milliseconds. The tag will be + created from the latest snapshot with commit time <= this value. + ignore_if_exists: If True, don't raise error if tag already exists + + Raises: + ValueError: If no snapshot exists at or before the given timestamp, + or tag already exists (when ignore_if_exists=False) + """ + snapshot_mgr = self.snapshot_manager() + snapshot = snapshot_mgr.earlier_or_equal_time_mills(timestamp_millis) + if snapshot is None: + raise ValueError( + f"No snapshot found with timestamp earlier than or equal to {timestamp_millis}ms." + ) + + tag_mgr = self.tag_manager() + tag_mgr.create_tag(snapshot, tag_name, ignore_if_exists) + def delete_tag(self, tag_name: str) -> bool: """ Delete a tag. From 8f72e99c8789a1d07caa9723ea37bc5ff4ee87fc Mon Sep 17 00:00:00 2001 From: JunRuiLee Date: Sun, 24 May 2026 21:09:45 +0800 Subject: [PATCH 2/2] [python] Address review feedback on create_tag_from_timestamp - Align error message wording to "at or before" - Add unit tests for create_tag_from_timestamp --- .../pypaimon/table/file_store_table.py | 4 +-- .../tests/filesystem_catalog_tag_test.py | 30 +++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py index baddb0e71a8e..bcad062fe8fb 100644 --- a/paimon-python/pypaimon/table/file_store_table.py +++ b/paimon-python/pypaimon/table/file_store_table.py @@ -202,7 +202,7 @@ def create_tag_from_timestamp(self, tag_name: str, timestamp_millis: int, snapshot = snapshot_mgr.earlier_or_equal_time_mills(timestamp_millis) if snapshot is None: raise ValueError( - f"No snapshot found with timestamp earlier than or equal to {timestamp_millis}ms." + f"No snapshot found at or before {timestamp_millis}ms." ) tag_mgr = self.tag_manager() @@ -336,7 +336,7 @@ def rollback_to_timestamp(self, timestamp_millis: int) -> None: snapshot = snapshot_mgr.earlier_or_equal_time_mills(timestamp_millis) if snapshot is None: raise ValueError( - f"No snapshot found with timestamp earlier than or equal to {timestamp_millis}ms." + f"No snapshot found at or before {timestamp_millis}ms." ) self.rollback_to(snapshot.id) diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_tag_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_tag_test.py index a721dec3d442..c5208fd0b683 100644 --- a/paimon-python/pypaimon/tests/filesystem_catalog_tag_test.py +++ b/paimon-python/pypaimon/tests/filesystem_catalog_tag_test.py @@ -235,5 +235,35 @@ def test_replace_tag_snapshot_not_exists_raises(self): self.assertIn("doesn't exist", str(cm.exception)) + # -- create_tag_from_timestamp --------------------------------------------- + + def test_create_tag_from_timestamp(self): + table = self.catalog.get_table(self.identifier) + # Create a second snapshot + wb = table.new_batch_write_builder() + w = wb.new_write() + w.write_arrow(pa.Table.from_pydict( + {"id": [10, 11], "value": ["x", "y"]}, + schema=self.pa_schema, + )) + wb.new_commit().commit(w.prepare_commit()) + w.close() + + # Use a timestamp far in the future to capture the latest snapshot + import time + future_ts = int(time.time() * 1000) + 100_000 + table.create_tag_from_timestamp("ts_tag", future_ts) + + tag = table.tag_manager().get("ts_tag") + self.assertEqual(tag.trim_to_snapshot().id, 2) + + def test_create_tag_from_timestamp_no_snapshot_raises(self): + table = self.catalog.get_table(self.identifier) + # Use timestamp 0 — no snapshot can be at or before epoch + with self.assertRaises(ValueError) as cm: + table.create_tag_from_timestamp("never", 0) + self.assertIn("at or before", str(cm.exception)) + + if __name__ == "__main__": unittest.main()