Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion paimon-python/pypaimon/table/file_store_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,31 @@ def create_tag(
tag_mgr = self.tag_manager()
tag_mgr.create_tag(snapshot, tag_name, ignore_if_exists)

def create_tag_from_timestamp(self, tag_name: str, timestamp_millis: int,
ignore_if_exists: bool = False) -> None:
"""
Create a tag from the latest snapshot at or before the given timestamp.

Args:
tag_name: Name for the tag
timestamp_millis: The timestamp in milliseconds. The tag will be
created from the latest snapshot with commit time <= this value.
ignore_if_exists: If True, don't raise error if tag already exists

Raises:
ValueError: If no snapshot exists at or before the given timestamp,
or tag already exists (when ignore_if_exists=False)
"""
snapshot_mgr = self.snapshot_manager()
snapshot = snapshot_mgr.earlier_or_equal_time_mills(timestamp_millis)
if snapshot is None:
raise ValueError(
f"No snapshot found at or before {timestamp_millis}ms."
)

tag_mgr = self.tag_manager()
tag_mgr.create_tag(snapshot, tag_name, ignore_if_exists)

def delete_tag(self, tag_name: str) -> bool:
"""
Delete a tag.
Expand Down Expand Up @@ -311,7 +336,7 @@ def rollback_to_timestamp(self, timestamp_millis: int) -> None:
snapshot = snapshot_mgr.earlier_or_equal_time_mills(timestamp_millis)
if snapshot is None:
raise ValueError(
f"No snapshot found with timestamp earlier than or equal to {timestamp_millis}ms."
f"No snapshot found at or before {timestamp_millis}ms."
)
self.rollback_to(snapshot.id)

Expand Down
30 changes: 30 additions & 0 deletions paimon-python/pypaimon/tests/filesystem_catalog_tag_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,5 +235,35 @@ def test_replace_tag_snapshot_not_exists_raises(self):
self.assertIn("doesn't exist", str(cm.exception))


# -- create_tag_from_timestamp ---------------------------------------------

def test_create_tag_from_timestamp(self):
table = self.catalog.get_table(self.identifier)
# Create a second snapshot
wb = table.new_batch_write_builder()
w = wb.new_write()
w.write_arrow(pa.Table.from_pydict(
{"id": [10, 11], "value": ["x", "y"]},
schema=self.pa_schema,
))
wb.new_commit().commit(w.prepare_commit())
w.close()

# Use a timestamp far in the future to capture the latest snapshot
import time
future_ts = int(time.time() * 1000) + 100_000
table.create_tag_from_timestamp("ts_tag", future_ts)

tag = table.tag_manager().get("ts_tag")
self.assertEqual(tag.trim_to_snapshot().id, 2)

def test_create_tag_from_timestamp_no_snapshot_raises(self):
table = self.catalog.get_table(self.identifier)
# Use timestamp 0 — no snapshot can be at or before epoch
with self.assertRaises(ValueError) as cm:
table.create_tag_from_timestamp("never", 0)
self.assertIn("at or before", str(cm.exception))


if __name__ == "__main__":
unittest.main()
Loading