From 86f36478f0c04ae66debfc1e9d4427d72dfd7ae2 Mon Sep 17 00:00:00 2001 From: JunRuiLee Date: Sun, 24 May 2026 11:05:45 +0800 Subject: [PATCH 1/2] [python] Add table repair verification logic for pypaimon Implement read-only metadata consistency verification for Paimon tables. This verifies the chain: LATEST -> snapshot -> manifest list -> manifest files -> data files, and reports any broken links or corrupted files. Key components: - RepairIssue/RepairReport: data classes for structured issue reporting - TableRepair.verify(): walks the metadata chain and detects issues - Support for branch-qualified tables and partitioned data file paths - Respects custom partition.default-name configuration - Progress logging every 1000 data files when check_data_files=True - Documented time complexity: O(total_data_files) --- paimon-python/pypaimon/operation/__init__.py | 16 + paimon-python/pypaimon/operation/repair.py | 380 +++++++++++ paimon-python/pypaimon/tests/repair_test.py | 651 +++++++++++++++++++ 3 files changed, 1047 insertions(+) create mode 100644 paimon-python/pypaimon/operation/__init__.py create mode 100644 paimon-python/pypaimon/operation/repair.py create mode 100644 paimon-python/pypaimon/tests/repair_test.py diff --git a/paimon-python/pypaimon/operation/__init__.py b/paimon-python/pypaimon/operation/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/paimon-python/pypaimon/operation/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/paimon-python/pypaimon/operation/repair.py b/paimon-python/pypaimon/operation/repair.py new file mode 100644 index 000000000000..d95bcccb8cef --- /dev/null +++ b/paimon-python/pypaimon/operation/repair.py @@ -0,0 +1,380 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Repair operation for Paimon tables. 
import logging
import re
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set

if TYPE_CHECKING:
    # Needed for annotations only; runtime imports of project modules are
    # deferred to the methods that use them (as __init__ already does).
    from pypaimon.common.file_io import FileIO
    from pypaimon.snapshot.snapshot import Snapshot

logger = logging.getLogger(__name__)

# File names inside the snapshot directory that denote a snapshot.
_SNAPSHOT_FILE_PATTERN = re.compile(r'^snapshot-(\d+)$')


@dataclass
class RepairIssue:
    """Represents a single issue found during repair verification."""
    level: str  # "error" or "warning"
    category: str  # e.g. "snapshot", "manifest_list", "manifest", "data_file"
    message: str  # human-readable description of the problem
    path: Optional[str] = None  # file the issue refers to, when there is one


@dataclass
class RepairReport:
    """Report generated by a repair operation.

    The ``*_checked`` counters are incremented by ``TableRepair`` while it
    walks the metadata chain; ``issues`` collects everything it found.
    """
    table_path: str
    issues: List[RepairIssue] = field(default_factory=list)
    snapshots_checked: int = 0
    manifest_lists_checked: int = 0
    manifest_files_checked: int = 0
    data_files_checked: int = 0
    fixes_applied: List[str] = field(default_factory=list)

    @property
    def has_errors(self) -> bool:
        """True when at least one issue has level ``"error"``."""
        return any(issue.level == "error" for issue in self.issues)

    @property
    def is_healthy(self) -> bool:
        """True when no issue of any level was recorded."""
        return len(self.issues) == 0

    def summary(self) -> str:
        """Render the counters, issues and applied fixes as multi-line text."""
        lines = [f"Repair report for: {self.table_path}"]
        lines.append(f" Snapshots checked: {self.snapshots_checked}")
        lines.append(f" Manifest lists checked: {self.manifest_lists_checked}")
        lines.append(f" Manifest files checked: {self.manifest_files_checked}")
        lines.append(f" Data files checked: {self.data_files_checked}")

        if self.is_healthy:
            lines.append(" Status: HEALTHY - no issues found")
        else:
            errors = [i for i in self.issues if i.level == "error"]
            warnings = [i for i in self.issues if i.level == "warning"]
            lines.append(f" Issues: {len(errors)} error(s), {len(warnings)} warning(s)")
            for issue in self.issues:
                prefix = "ERROR" if issue.level == "error" else "WARN"
                path_str = f" [{issue.path}]" if issue.path else ""
                lines.append(f" [{prefix}] ({issue.category}) {issue.message}{path_str}")

        if self.fixes_applied:
            lines.append(f" Fixes applied: {len(self.fixes_applied)}")
            for fix in self.fixes_applied:
                lines.append(f" - {fix}")

        return "\n".join(lines)


class TableRepair:
    """
    Verifies the metadata chain of a Paimon table.

    The metadata chain is:
        LATEST file -> snapshot -> manifest list files -> manifest files -> data files

    This class checks each link in the chain and reports issues. The checks
    only use the ``exists``, ``list_status``, ``read_file_utf8`` and
    ``new_input_stream`` operations of the supplied ``file_io``.
    """

    def __init__(self, file_io: "FileIO", table_path: str, branch: Optional[str] = None):
        """
        Args:
            file_io: FileIO used for every filesystem access.
            table_path: Root path of the table; trailing slashes are removed.
            branch: Optional branch name. Snapshots are read from the branch
                path, while manifests are resolved under the table root.
        """
        from pypaimon.branch.branch_manager import BranchManager
        from pypaimon.schema.schema_manager import SchemaManager

        self.file_io = file_io
        self.table_path = table_path.rstrip('/')
        self.branch = BranchManager.normalize_branch(branch)

        branch_root = BranchManager.branch_path(self.table_path, self.branch)
        self.snapshot_dir = f"{branch_root}/snapshot"
        self.manifest_dir = f"{self.table_path}/manifest"
        self.latest_file = f"{self.snapshot_dir}/LATEST"

        # Partition fields and the null-partition placeholder are taken from
        # the latest schema; both are needed to rebuild data file paths.
        table_schema = SchemaManager(file_io, self.table_path, self.branch).latest()
        if table_schema is not None:
            field_dict = {f.name: f for f in table_schema.fields}
            self.partition_keys_fields = [field_dict[name] for name in table_schema.partition_keys]
            self.default_part_value = table_schema.options.get(
                "partition.default-name", "__DEFAULT_PARTITION__")
        else:
            self.partition_keys_fields = []
            self.default_part_value = "__DEFAULT_PARTITION__"

        self._reset_run_state()

    def _reset_run_state(self) -> None:
        """Forget which manifest lists / manifest files were already verified."""
        self._seen_manifest_lists: Set[str] = set()
        self._seen_manifest_files: Set[str] = set()

    def verify(self, check_data_files: bool = False) -> RepairReport:
        """
        Verify the metadata consistency chain.

        Every manifest list and manifest file name is verified at most once
        per call, even when several snapshots reference it, so the
        ``manifest_*_checked`` counters count distinct files and an issue is
        reported once (against the first reference encountered).

        Args:
            check_data_files: If True, also verify that data files referenced
                in manifests actually exist on disk. Time complexity is then
                proportional to the number of data file entries across all
                distinct manifests.

        Returns:
            RepairReport with all issues found.

        NOTE(review): ADD entries are checked individually; they are not
        netted against DELETE entries in other manifests. If a data file can
        be physically removed while an ADD entry for it is still reachable,
        this would report it as missing - confirm against the expiration
        semantics.
        """
        if check_data_files:
            logger.info("Verifying data files for table %s (this may be slow for large tables)",
                        self.table_path)
        report = RepairReport(table_path=self.table_path)
        # The cache must not leak between runs: a previous run may have
        # skipped the data-file check for the very same manifests.
        self._reset_run_state()

        # Step 1: Check snapshot directory exists
        if not self.file_io.exists(self.snapshot_dir):
            return report

        # Step 2: Find all snapshot files and the LATEST hint
        snapshot_ids = self._list_snapshot_ids()
        latest_id = self._read_latest_id()

        if not snapshot_ids and latest_id is None:
            report.issues.append(RepairIssue(
                level="warning",
                category="snapshot",
                message="No snapshots found - table may be empty or newly created",
            ))
            return report

        # Step 3: Check LATEST consistency
        if latest_id is None:
            # snapshot_ids is non-empty here. Distinguish an absent hint from
            # one that exists but could not be read or parsed.
            if self._latest_file_present():
                message = "LATEST file is unreadable or does not contain a valid snapshot id"
            else:
                message = "LATEST file is missing but snapshot files exist"
            report.issues.append(RepairIssue(
                level="warning",
                category="snapshot",
                message=message,
                path=self.latest_file,
            ))
        elif latest_id not in snapshot_ids:
            report.issues.append(RepairIssue(
                level="error",
                category="snapshot",
                message=f"LATEST file points to snapshot-{latest_id} which does not exist",
                path=self.latest_file,
            ))

        # Step 4: Verify each snapshot and its manifest chain
        # (_list_snapshot_ids already returns the ids in ascending order).
        for sid in snapshot_ids:
            self._verify_snapshot(sid, report, check_data_files)

        return report

    def _latest_file_present(self) -> bool:
        """Return True if the LATEST hint exists; False if absent or unknown."""
        try:
            return bool(self.file_io.exists(self.latest_file))
        except Exception as e:  # best effort: only used to pick a message
            logger.debug("Could not stat %s: %s", self.latest_file, e)
            return False

    def _list_snapshot_ids(self) -> List[int]:
        """List all snapshot IDs found in the snapshot directory, ascending.

        Returns an empty list when the directory cannot be listed; the
        failure is logged rather than raised.
        """
        try:
            statuses = self.file_io.list_status(self.snapshot_dir)
        except Exception as e:
            logger.warning("Failed to list snapshot directory %s: %s", self.snapshot_dir, e)
            return []

        ids = []
        for status in statuses:
            # Entries without a base_name are treated as non-snapshot files.
            name = getattr(status, 'base_name', None) or ""
            match = _SNAPSHOT_FILE_PATTERN.match(name)
            if match:
                ids.append(int(match.group(1)))
        return sorted(ids)

    def _read_latest_id(self) -> Optional[int]:
        """Read the LATEST hint file and return the snapshot ID, or None.

        None is returned when the file is absent, empty, unreadable, or does
        not contain an integer; the last two cases are logged.
        """
        try:
            if not self.file_io.exists(self.latest_file):
                return None
            content = self.file_io.read_file_utf8(self.latest_file).strip()
        except Exception as e:
            logger.warning("Failed to read LATEST hint %s: %s", self.latest_file, e)
            return None

        if not content:
            return None
        try:
            return int(content)
        except ValueError:
            logger.warning("LATEST hint %s does not contain a snapshot id: %r",
                           self.latest_file, content)
            return None

    def _try_read_snapshot(self, snapshot_id: int) -> Optional["Snapshot"]:
        """Try to read and parse a snapshot file. Returns None on failure."""
        from pypaimon.common.json_util import JSON
        from pypaimon.snapshot.snapshot import Snapshot

        snapshot_path = f"{self.snapshot_dir}/snapshot-{snapshot_id}"
        try:
            content = self.file_io.read_file_utf8(snapshot_path)
            return JSON.from_json(content, Snapshot)
        except Exception as e:
            logger.debug("Could not read snapshot %s: %s", snapshot_path, e)
            return None

    def _verify_snapshot(self, snapshot_id: int, report: RepairReport, check_data_files: bool):
        """Verify a single snapshot and its downstream manifest chain."""
        report.snapshots_checked += 1
        snapshot_path = f"{self.snapshot_dir}/snapshot-{snapshot_id}"

        snapshot = self._try_read_snapshot(snapshot_id)
        if snapshot is None:
            report.issues.append(RepairIssue(
                level="error",
                category="snapshot",
                message=f"Snapshot file is unreadable or corrupted (snapshot-{snapshot_id})",
                path=snapshot_path,
            ))
            return

        # Base and delta lists first, then the optional changelog list.
        # _verify_manifest_list ignores empty names.
        manifest_lists = (
            (snapshot.base_manifest_list, "baseManifestList"),
            (snapshot.delta_manifest_list, "deltaManifestList"),
            (snapshot.changelog_manifest_list, "changelogManifestList"),
        )
        for manifest_list_name, label in manifest_lists:
            self._verify_manifest_list(
                manifest_list_name, f"snapshot-{snapshot_id}/{label}",
                report, check_data_files
            )

    def _read_avro_records(self, path: str) -> List[Dict[str, Any]]:
        """Read the Avro file at ``path`` fully and return its records.

        Raises whatever the FileIO or fastavro raise; callers translate that
        into a "corrupted" issue.
        """
        from io import BytesIO
        import fastavro

        with self.file_io.new_input_stream(path) as input_stream:
            avro_bytes = input_stream.read()
        return list(fastavro.reader(BytesIO(avro_bytes)))

    def _verify_manifest_list(self, manifest_list_name: str, context: str,
                              report: RepairReport, check_data_files: bool):
        """Verify a manifest list file exists and is readable.

        A name that was already verified during the current run is skipped.
        """
        if not manifest_list_name:
            return
        if manifest_list_name in self._seen_manifest_lists:
            return
        self._seen_manifest_lists.add(manifest_list_name)

        report.manifest_lists_checked += 1
        manifest_list_path = f"{self.manifest_dir}/{manifest_list_name}"

        if not self.file_io.exists(manifest_list_path):
            report.issues.append(RepairIssue(
                level="error",
                category="manifest_list",
                message=f"Manifest list file missing (referenced by {context})",
                path=manifest_list_path,
            ))
            return

        # Try to read it
        try:
            records = self._read_avro_records(manifest_list_path)
            manifest_file_names = [record['_FILE_NAME'] for record in records]
        except ImportError:
            # A missing fastavro is an environment problem, not file corruption.
            raise
        except Exception as e:
            report.issues.append(RepairIssue(
                level="error",
                category="manifest_list",
                message=f"Manifest list file is corrupted: {e} (referenced by {context})",
                path=manifest_list_path,
            ))
            return

        # Verify each manifest file referenced in this list
        for mf_name in manifest_file_names:
            self._verify_manifest_file(mf_name, manifest_list_name, report, check_data_files)

    def _verify_manifest_file(self, manifest_file_name: str, parent_list: str,
                              report: RepairReport, check_data_files: bool):
        """Verify a manifest file exists and is readable.

        A name that was already verified during the current run is skipped.
        When ``check_data_files`` is True the data files of its ADD entries
        are checked as well.
        """
        if manifest_file_name in self._seen_manifest_files:
            return
        self._seen_manifest_files.add(manifest_file_name)

        report.manifest_files_checked += 1
        manifest_path = f"{self.manifest_dir}/{manifest_file_name}"

        if not self.file_io.exists(manifest_path):
            report.issues.append(RepairIssue(
                level="error",
                category="manifest",
                message=f"Manifest file missing (referenced by {parent_list})",
                path=manifest_path,
            ))
            return

        # Always validate the manifest is readable Avro
        try:
            records = self._read_avro_records(manifest_path)
        except ImportError:
            raise
        except Exception as e:
            report.issues.append(RepairIssue(
                level="error",
                category="manifest",
                message=f"Manifest file is corrupted: {e}",
                path=manifest_path,
            ))
            return

        if check_data_files:
            self._verify_data_files(records, manifest_file_name, report)

    def _verify_data_files(self, records: List[Dict[str, Any]], manifest_file_name: str,
                           report: RepairReport):
        """Check that the data file of every ADD entry in ``records`` exists."""
        for record in records:
            # Only entries whose _KIND is 0 are checked; other kinds are skipped.
            if record.get('_KIND', 0) != 0:
                continue
            file_dict = record.get('_FILE')
            if not file_dict:
                continue
            file_name = file_dict.get('_FILE_NAME')
            if not file_name:
                continue

            report.data_files_checked += 1
            if report.data_files_checked % 1000 == 0:
                logger.info("Checked %d data files so far...", report.data_files_checked)

            data_file_path = file_dict.get('_EXTERNAL_PATH')
            if not data_file_path:
                try:
                    data_file_path = self._data_file_path(record, file_name)
                except ImportError:
                    raise
                except Exception as e:
                    # Without the partition the real location is unknown;
                    # report that instead of probing a guessed path.
                    report.issues.append(RepairIssue(
                        level="error",
                        category="data_file",
                        message=(f"Cannot resolve path of data file {file_name}: "
                                 f"partition could not be decoded: {e} "
                                 f"(referenced by {manifest_file_name})"),
                        path=f"{self.manifest_dir}/{manifest_file_name}",
                    ))
                    continue

            if not self.file_io.exists(data_file_path):
                report.issues.append(RepairIssue(
                    level="error",
                    category="data_file",
                    message=f"Data file missing (referenced by {manifest_file_name})",
                    path=data_file_path,
                ))

    def _data_file_path(self, record: Dict[str, Any], file_name: str) -> str:
        """Build ``table_path[/key=value ...]/bucket-N/file_name`` for an entry.

        Raises if the entry's ``_PARTITION`` bytes cannot be decoded with the
        table's partition fields.
        """
        path_builder = self.table_path
        if self.partition_keys_fields:
            from pypaimon.table.row.generic_row import GenericRowDeserializer
            from pypaimon.utils.file_store_path_factory import _is_null_or_whitespace_only

            partition = GenericRowDeserializer.from_bytes(
                record['_PARTITION'], self.partition_keys_fields)
            for field_name, field_value in partition.to_dict().items():
                part_value = (self.default_part_value
                              if _is_null_or_whitespace_only(field_value)
                              else str(field_value))
                path_builder = f"{path_builder}/{field_name}={part_value}"
        bucket = record.get('_BUCKET', 0)
        return f"{path_builder}/bucket-{bucket}/{file_name}"
class TestRepairReport(unittest.TestCase):
    """Tests for the RepairReport data class and its text summary."""

    def test_empty_report_is_healthy(self):
        """A report without issues is healthy and says so in the summary."""
        report = RepairReport(table_path="/test/path")
        self.assertTrue(report.is_healthy)
        self.assertFalse(report.has_errors)
        self.assertIn("HEALTHY", report.summary())

    def test_report_with_warning(self):
        """A warning makes the report unhealthy without counting as an error."""
        report = RepairReport(table_path="/test/path")
        report.issues.append(RepairIssue(
            level="warning", category="snapshot", message="Something minor"
        ))
        self.assertFalse(report.is_healthy)
        self.assertFalse(report.has_errors)
        summary = report.summary()
        self.assertIn("WARN", summary)
        self.assertIn("0 error(s), 1 warning(s)", summary)

    def test_report_with_error(self):
        """An error is flagged and its path is shown in the summary."""
        report = RepairReport(table_path="/test/path")
        report.issues.append(RepairIssue(
            level="error", category="manifest_list", message="File missing",
            path="/some/path"
        ))
        self.assertFalse(report.is_healthy)
        self.assertTrue(report.has_errors)
        self.assertIn("ERROR", report.summary())
        self.assertIn("/some/path", report.summary())

    def test_report_summary_counts(self):
        """Every counter, including a zero one, appears in the summary."""
        report = RepairReport(table_path="/test/path")
        report.snapshots_checked = 3
        report.manifest_lists_checked = 6
        report.manifest_files_checked = 12
        report.data_files_checked = 0
        summary = report.summary()
        self.assertIn("Snapshots checked: 3", summary)
        self.assertIn("Manifest lists checked: 6", summary)
        self.assertIn("Manifest files checked: 12", summary)
        # The data-file counter was set above but never verified before.
        self.assertIn("Data files checked: 0", summary)

    def test_report_with_fixes(self):
        """Applied fixes are counted and listed in the summary."""
        report = RepairReport(table_path="/test/path")
        report.fixes_applied.append("Updated LATEST file")
        summary = report.summary()
        self.assertIn("Fixes applied: 1", summary)
        self.assertIn("Updated LATEST file", summary)
report = repairer.verify() + self.assertFalse(report.has_errors) + + def test_verify_healthy_table_with_snapshot(self): + """A table with a valid snapshot should be reported as healthy.""" + self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-base-1")) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + snapshot_data = { + "version": 3, + "id": 1, + "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 0, + "deltaRecordCount": 0, + "commitUser": "test", + "commitIdentifier": 1, + "commitKind": "APPEND", + "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + repairer = self._get_table_repair(table_path) + report = repairer.verify() + self.assertEqual(report.snapshots_checked, 1) + self.assertEqual(report.manifest_lists_checked, 2) + self.assertTrue(report.is_healthy) + + def test_verify_detects_missing_manifest_list(self): + """Should detect when a manifest list referenced by snapshot is missing.""" + self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + # Create only delta manifest list, not base + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + snapshot_data = { + "version": 3, + "id": 1, + "schemaId": 0, + 
"baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 0, + "deltaRecordCount": 0, + "commitUser": "test", + "commitIdentifier": 1, + "commitKind": "APPEND", + "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + repairer = self._get_table_repair(table_path) + report = repairer.verify() + self.assertTrue(report.has_errors) + error_messages = [i.message for i in report.issues if i.level == "error"] + self.assertTrue(any("manifest-list-base-1" in m or "missing" in m.lower() + for m in error_messages)) + + def test_verify_detects_dangling_latest(self): + """Should detect when LATEST points to a non-existent snapshot.""" + self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + os.makedirs(snapshot_dir, exist_ok=True) + + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(manifest_dir, exist_ok=True) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-base-1")) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + snapshot_data = { + "version": 3, + "id": 1, + "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 0, + "deltaRecordCount": 0, + "commitUser": "test", + "commitIdentifier": 1, + "commitKind": "APPEND", + "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + # LATEST points to snapshot-5, but only snapshot-1 exists + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("5") + + repairer = self._get_table_repair(table_path) + report = repairer.verify() + self.assertTrue(report.has_errors) + error_messages = [i.message for i in 
report.issues if i.level == "error"] + self.assertTrue(any("snapshot-5" in m for m in error_messages)) + + def test_verify_corrupted_snapshot_file(self): + """Should detect unreadable/corrupted snapshot files.""" + self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + os.makedirs(snapshot_dir, exist_ok=True) + + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + f.write("this is not valid json{{{") + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + repairer = self._get_table_repair(table_path) + report = repairer.verify() + self.assertTrue(report.has_errors) + error_messages = [i.message for i in report.issues if i.level == "error"] + self.assertTrue(any("corrupted" in m.lower() or "unreadable" in m.lower() + for m in error_messages)) + + def test_check_data_files_detects_missing(self): + """check_data_files=True should detect missing data files.""" + self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + partition_bytes = self._serialize_partition([], []) + + self._write_manifest_with_data_file( + os.path.join(manifest_dir, "manifest-1"), + partition_bytes, bucket=0, file_name="data-abc.orc" + ) + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-1"), "manifest-1" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + snapshot_data = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 10, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 
1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + repairer = self._get_table_repair(table_path) + + # Without check_data_files: should be healthy + report = repairer.verify(check_data_files=False) + self.assertEqual(report.data_files_checked, 0) + self.assertFalse(report.has_errors) + + # With check_data_files: should find the missing data file + report = repairer.verify(check_data_files=True) + self.assertGreater(report.data_files_checked, 0) + self.assertTrue(report.has_errors) + data_file_issues = [i for i in report.issues if i.category == "data_file"] + self.assertEqual(len(data_file_issues), 1) + self.assertIn("data-abc.orc", data_file_issues[0].path) + + def test_check_data_files_skips_delete_entries(self): + """DELETE entries for removed files should not be flagged as missing.""" + self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + partition_bytes = self._serialize_partition([], []) + + self._write_manifest_with_data_file( + os.path.join(manifest_dir, "manifest-1"), + partition_bytes, bucket=0, file_name="deleted-file.orc", + kind=1 # DELETE + ) + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-1"), "manifest-1" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + snapshot_data = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 0, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, 
"snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + repairer = self._get_table_repair(table_path) + report = repairer.verify(check_data_files=True) + self.assertEqual(report.data_files_checked, 0) + self.assertFalse(report.has_errors) + + def test_check_data_files_passes_when_file_exists(self): + """check_data_files=True should pass when data files exist.""" + self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + partition_bytes = self._serialize_partition([], []) + + self._write_manifest_with_data_file( + os.path.join(manifest_dir, "manifest-1"), + partition_bytes, bucket=0, file_name="data-abc.orc" + ) + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-1"), "manifest-1" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + # Create the data file at the expected location + data_file_dir = os.path.join(table_path, "bucket-0") + os.makedirs(data_file_dir, exist_ok=True) + with open(os.path.join(data_file_dir, "data-abc.orc"), 'wb') as f: + f.write(b"fake data") + + snapshot_data = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 10, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + repairer = self._get_table_repair(table_path) + report = repairer.verify(check_data_files=True) + 
self.assertGreater(report.data_files_checked, 0) + self.assertFalse(report.has_errors) + + def test_check_data_files_with_partition(self): + """check_data_files should construct correct path for partitioned tables.""" + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("test_db", False) + + schema = Schema( + fields=[ + DataField(0, "id", AtomicType("INT")), + DataField(1, "dt", AtomicType("STRING")), + DataField(2, "name", AtomicType("STRING")), + ], + partition_keys=["dt"], + primary_keys=["id"], + options={}, + comment="" + ) + catalog.create_table("test_db.part_table", schema, False) + + table_path = os.path.join(self.warehouse, "test_db.db", "part_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + dt_field = DataField(1, "dt", AtomicType("STRING")) + partition_bytes = self._serialize_partition(["2024-01-01"], [dt_field]) + + self._write_manifest_with_data_file( + os.path.join(manifest_dir, "manifest-1"), + partition_bytes, bucket=0, file_name="data-part.orc" + ) + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-1"), "manifest-1" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + # Create data file at partitioned path + data_file_dir = os.path.join(table_path, "dt=2024-01-01", "bucket-0") + os.makedirs(data_file_dir, exist_ok=True) + with open(os.path.join(data_file_dir, "data-part.orc"), 'wb') as f: + f.write(b"fake data") + + snapshot_data = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 10, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + 
json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + repairer = self._get_table_repair(table_path) + report = repairer.verify(check_data_files=True) + self.assertGreater(report.data_files_checked, 0) + self.assertFalse(report.has_errors) + + def test_check_data_files_custom_partition_default_name(self): + """Tables with custom partition.default-name use that value for null partitions.""" + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("test_db", False) + + schema = Schema( + fields=[ + DataField(0, "id", AtomicType("INT")), + DataField(1, "region", AtomicType("STRING")), + DataField(2, "name", AtomicType("STRING")), + ], + partition_keys=["region"], + primary_keys=["id"], + options={"partition.default-name": "UNSET"}, + comment="" + ) + catalog.create_table("test_db.custom_part", schema, False) + + table_path = os.path.join(self.warehouse, "test_db.db", "custom_part") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + region_field = DataField(1, "region", AtomicType("STRING")) + partition_bytes = self._serialize_partition([None], [region_field]) + + self._write_manifest_with_data_file( + os.path.join(manifest_dir, "manifest-1"), + partition_bytes, bucket=0, file_name="data-null.orc" + ) + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-1"), "manifest-1" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + # Create data file at custom default partition path + data_file_dir = os.path.join(table_path, "region=UNSET", "bucket-0") + os.makedirs(data_file_dir, exist_ok=True) + with open(os.path.join(data_file_dir, "data-null.orc"), 'wb') as f: + f.write(b"fake data") + + snapshot_data = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": 
"manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 10, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + repairer = self._get_table_repair(table_path) + report = repairer.verify(check_data_files=True) + self.assertGreater(report.data_files_checked, 0) + self.assertFalse(report.has_errors) + + def test_verify_branch_table(self): + """Verifying a branch-qualified table checks the branch snapshot dir.""" + self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(manifest_dir, exist_ok=True) + + # Create main branch snapshot (healthy) + main_snapshot_dir = os.path.join(table_path, "snapshot") + os.makedirs(main_snapshot_dir, exist_ok=True) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-base-main")) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-main")) + snapshot_main = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-main", + "deltaManifestList": "manifest-list-delta-main", + "totalRecordCount": 0, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(main_snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_main, f) + with open(os.path.join(main_snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + # Create branch "b1" with a dangling LATEST + branch_snapshot_dir = os.path.join(table_path, "branch", "branch-b1", "snapshot") + os.makedirs(branch_snapshot_dir, exist_ok=True) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-base-b1")) + 
self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-b1")) + snapshot_b1 = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-b1", + "deltaManifestList": "manifest-list-delta-b1", + "totalRecordCount": 0, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(branch_snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_b1, f) + with open(os.path.join(branch_snapshot_dir, "LATEST"), 'w') as f: + f.write("99") + + # Verify main branch - should be healthy + repairer_main = self._get_table_repair(table_path) + report_main = repairer_main.verify() + self.assertFalse(report_main.has_errors) + + # Verify branch b1 - should detect dangling LATEST + repairer_branch = self._get_table_repair(table_path, branch="b1") + report_branch = repairer_branch.verify() + self.assertTrue(report_branch.has_errors) + error_messages = [i.message for i in report_branch.issues if i.level == "error"] + self.assertTrue(any("snapshot-99" in m for m in error_messages)) + + def _write_empty_avro(self, path: str): + """Write an empty Avro file (valid manifest list with no records).""" + import fastavro + from io import BytesIO + from pypaimon.manifest.schema.manifest_file_meta import MANIFEST_FILE_META_SCHEMA + + buffer = BytesIO() + fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, []) + with open(path, 'wb') as f: + f.write(buffer.getvalue()) + + def _write_manifest_list_with_entry(self, path: str, manifest_file_name: str): + """Write a manifest list Avro referencing a single manifest file.""" + import fastavro + from io import BytesIO + from pypaimon.manifest.schema.manifest_file_meta import MANIFEST_FILE_META_SCHEMA + + record = { + "_VERSION": 1, + "_FILE_NAME": manifest_file_name, + "_FILE_SIZE": 100, + "_NUM_ADDED_FILES": 1, + "_NUM_DELETED_FILES": 0, + "_PARTITION_STATS": {"_MIN_VALUES": b"", "_MAX_VALUES": b"", "_NULL_COUNTS": None}, + 
"_SCHEMA_ID": 0, + "_MIN_ROW_ID": None, + "_MAX_ROW_ID": None, + } + buffer = BytesIO() + fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, [record]) + with open(path, 'wb') as f: + f.write(buffer.getvalue()) + + def _write_manifest_with_data_file(self, path: str, partition_bytes: bytes, + bucket: int, file_name: str, + external_path=None, kind=0): + """Write a manifest Avro with a single data file entry.""" + import fastavro + from io import BytesIO + from pypaimon.manifest.schema.manifest_entry import MANIFEST_ENTRY_SCHEMA + + record = { + "_VERSION": 1, + "_KIND": kind, + "_PARTITION": partition_bytes, + "_BUCKET": bucket, + "_TOTAL_BUCKETS": 1, + "_FILE": { + "_FILE_NAME": file_name, + "_FILE_SIZE": 1024, + "_ROW_COUNT": 10, + "_MIN_KEY": b"", + "_MAX_KEY": b"", + "_KEY_STATS": {"_MIN_VALUES": b"", "_MAX_VALUES": b"", "_NULL_COUNTS": None}, + "_VALUE_STATS": {"_MIN_VALUES": b"", "_MAX_VALUES": b"", "_NULL_COUNTS": None}, + "_MIN_SEQUENCE_NUMBER": 0, + "_MAX_SEQUENCE_NUMBER": 0, + "_SCHEMA_ID": 0, + "_LEVEL": 0, + "_EXTRA_FILES": [], + "_CREATION_TIME": None, + "_DELETE_ROW_COUNT": None, + "_EMBEDDED_FILE_INDEX": None, + "_FILE_SOURCE": None, + "_VALUE_STATS_COLS": None, + "_EXTERNAL_PATH": external_path, + "_FIRST_ROW_ID": None, + "_WRITE_COLS": None, + } + } + buffer = BytesIO() + fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, [record]) + with open(path, 'wb') as f: + f.write(buffer.getvalue()) + + def _serialize_partition(self, values, fields): + """Serialize partition values using GenericRowSerializer.""" + from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer + row = GenericRow(values, fields) + return GenericRowSerializer.to_bytes(row) + + +if __name__ == '__main__': + unittest.main() From 25edb3de7a1bdf7042ad5f99986f99323e693609 Mon Sep 17 00:00:00 2001 From: JunRuiLee Date: Sun, 24 May 2026 11:35:29 +0800 Subject: [PATCH 2/2] [python] Add table repair fix mode and catalog integration for pypaimon Add the ability to fix metadata 
inconsistencies found during verification. Currently supports fixing the LATEST hint file to point to the newest valid snapshot when it references a missing one. Key additions: - TableRepair.repair(dry_run=False): applies fixes after verification - repair_table/repair_database/repair_catalog module-level entry points - Catalog.repair_table/repair_database/repair_catalog API with type annotations - FileSystemCatalog implementation delegating to repair module - Fix mode selects newest snapshot with intact manifest chain - check_data_files is respected when choosing which snapshot to fix to - Per-table error isolation in repair_database (continues on failure) - Idempotent fix operations (safe to re-run after interruption) --- paimon-python/pypaimon/catalog/catalog.py | 49 +++ .../pypaimon/catalog/filesystem_catalog.py | 32 ++ paimon-python/pypaimon/operation/repair.py | 169 ++++++++++ paimon-python/pypaimon/tests/repair_test.py | 289 ++++++++++++++++++ 4 files changed, 539 insertions(+) diff --git a/paimon-python/pypaimon/catalog/catalog.py b/paimon-python/pypaimon/catalog/catalog.py index 4a364b06aab5..1ba5b4718079 100644 --- a/paimon-python/pypaimon/catalog/catalog.py +++ b/paimon-python/pypaimon/catalog/catalog.py @@ -401,3 +401,52 @@ def list_tags_paged( raise NotImplementedError( "list_tags_paged is not supported by this catalog." ) + + def repair_table(self, identifier: Union[str, Identifier], + check_data_files: bool = False, dry_run: bool = True) -> 'RepairReport': + """Repair the metadata of a single table. + + Verifies the consistency of the snapshot -> manifest list -> manifest + -> data file chain and optionally fixes issues found. + + Args: + identifier: Table identifier (Identifier or string). + check_data_files: If True, verify data file existence (slow). + dry_run: If True, only report issues without fixing. + + Returns: + RepairReport describing issues found and fixes applied. 
+ """ + raise NotImplementedError( + "repair_table is not supported by this catalog." + ) + + def repair_database(self, database_name: str, + check_data_files: bool = False, dry_run: bool = True) -> 'List[RepairReport]': + """Repair all tables in a database. + + Args: + database_name: Name of the database. + check_data_files: If True, verify data file existence (slow). + dry_run: If True, only report issues without fixing. + + Returns: + List of RepairReport, one per table. + """ + raise NotImplementedError( + "repair_database is not supported by this catalog." + ) + + def repair_catalog(self, check_data_files: bool = False, dry_run: bool = True) -> 'List[RepairReport]': + """Repair all tables in all databases in the catalog. + + Args: + check_data_files: If True, verify data file existence (slow). + dry_run: If True, only report issues without fixing. + + Returns: + List of RepairReport, one per table. + """ + raise NotImplementedError( + "repair_catalog is not supported by this catalog." 
+ ) diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py index 86e2f775e769..a83dff8e62f5 100644 --- a/paimon-python/pypaimon/catalog/filesystem_catalog.py +++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py @@ -584,3 +584,35 @@ def list_branches( identifier = Identifier.from_string(identifier) table = self.get_table(identifier) return table.branch_manager().branches() + + def repair_table(self, identifier: Union[str, Identifier], + check_data_files: bool = False, dry_run: bool = True) -> 'RepairReport': + """Repair the metadata of a single table.""" + from pypaimon.operation.repair import repair_table as _repair_table + + if not isinstance(identifier, Identifier): + identifier = Identifier.from_string(identifier) + table_path = self.get_table_path(identifier) + if not self.file_io.exists(table_path): + raise TableNotExistException(identifier) + return _repair_table(self.file_io, table_path, branch=identifier.get_branch_name(), + check_data_files=check_data_files, dry_run=dry_run) + + def repair_database(self, database_name: str, + check_data_files: bool = False, dry_run: bool = True) -> 'List[RepairReport]': + """Repair all tables in a database.""" + from pypaimon.operation.repair import repair_database as _repair_database + + try: + self.get_database(database_name) + except DatabaseNotExistException: + raise + return _repair_database(self.file_io, self.warehouse, database_name, + check_data_files=check_data_files, dry_run=dry_run) + + def repair_catalog(self, check_data_files: bool = False, dry_run: bool = True) -> 'List[RepairReport]': + """Repair all tables in all databases in the catalog.""" + from pypaimon.operation.repair import repair_catalog as _repair_catalog + + return _repair_catalog(self.file_io, self.warehouse, + check_data_files=check_data_files, dry_run=dry_run) diff --git a/paimon-python/pypaimon/operation/repair.py b/paimon-python/pypaimon/operation/repair.py index 
d95bcccb8cef..bd59dc7d9a66 100644 --- a/paimon-python/pypaimon/operation/repair.py +++ b/paimon-python/pypaimon/operation/repair.py @@ -378,3 +378,172 @@ def _verify_manifest_file(self, manifest_file_name: str, parent_list: str, message=f"Data file missing (referenced by {manifest_file_name})", path=data_file_path, )) + + def repair(self, check_data_files: bool = False, dry_run: bool = True) -> RepairReport: + """ + Verify and optionally repair the metadata chain. + + Args: + check_data_files: If True, also verify data files exist. + dry_run: If True, only report issues without making changes. + If False, apply fixes where possible. + + Returns: + RepairReport with issues found and fixes applied. + """ + report = self.verify(check_data_files=check_data_files) + + if report.is_healthy: + return report + + if dry_run: + return report + + # Apply fixes + self._fix_latest_file(report, check_data_files) + + return report + + def _fix_latest_file(self, report: RepairReport, check_data_files: bool = False): + """Fix the LATEST file to point to the newest valid snapshot. + + This operation is idempotent: it first attempts an atomic write of the + LATEST file and, if that attempt reports failure, falls back to deleting + and rewriting it. The fallback is not atomic, so an interruption there can + leave LATEST missing. Re-running on an already-fixed table is a no-op.
+ """ + snapshot_ids = self._list_snapshot_ids() + if not snapshot_ids: + return + + # Find the newest snapshot whose full manifest chain is healthy + for sid in sorted(snapshot_ids, reverse=True): + if not self._is_snapshot_healthy(sid, check_data_files): + continue + + current_latest = self._read_latest_id() + if current_latest == sid: + return + + # Write LATEST atomically + latest_content = str(sid) + try: + success = self.file_io.try_to_write_atomic(self.latest_file, latest_content) + if not success: + self.file_io.delete(self.latest_file) + self.file_io.overwrite_file_utf8(self.latest_file, latest_content) + report.fixes_applied.append( + f"Updated LATEST file to point to snapshot-{sid}" + ) + except Exception as e: + report.issues.append(RepairIssue( + level="error", + category="snapshot", + message=f"Failed to write LATEST file: {e}", + path=self.latest_file, + )) + return + + def _is_snapshot_healthy(self, snapshot_id: int, check_data_files: bool = False) -> bool: + """Check whether a snapshot and its manifest chain are intact.""" + tmp_report = RepairReport(table_path=self.table_path) + self._verify_snapshot(snapshot_id, tmp_report, check_data_files=check_data_files) + return not tmp_report.has_errors + + +def repair_table(file_io: FileIO, table_path: str, branch: Optional[str] = None, + check_data_files: bool = False, dry_run: bool = True) -> RepairReport: + """ + Repair a single Paimon table. + + This is the main entry point for repairing a table's metadata chain. + It verifies the consistency of: + snapshot -> manifest list -> manifest files -> data files + + Args: + file_io: FileIO instance for accessing the table's storage. + table_path: Root path of the table. + branch: Branch name (None for main branch). + check_data_files: Whether to verify data file existence (slow). + dry_run: If True, only report issues without fixing. + + Returns: + RepairReport with all findings and any fixes applied. 
+ """ + repairer = TableRepair(file_io, table_path, branch) + return repairer.repair(check_data_files=check_data_files, dry_run=dry_run) + + +def repair_database(file_io: FileIO, warehouse: str, database_name: str, + check_data_files: bool = False, dry_run: bool = True) -> List[RepairReport]: + """ + Repair all tables in a database. + + Args: + file_io: FileIO instance for accessing storage. + warehouse: Warehouse root path. + database_name: Database name to repair. + check_data_files: Whether to verify data file existence. + dry_run: If True, only report issues without fixing. + + Returns: + List of RepairReport, one per table processed (tables whose repair raises are logged and skipped). + """ + import pyarrow.fs as pafs + + db_path = f"{warehouse.rstrip('/')}/{database_name}.db" + if not file_io.exists(db_path): + raise ValueError(f"Database directory does not exist: {db_path}") + + reports = [] + statuses = file_io.list_status(db_path) + for status in statuses: + is_directory = hasattr(status, 'type') and status.type == pafs.FileType.Directory + name = status.base_name if hasattr(status, 'base_name') else "" + if is_directory and name and not name.startswith("."): + table_path = f"{db_path}/{name}" + try: + report = repair_table(file_io, table_path, check_data_files=check_data_files, + dry_run=dry_run) + reports.append(report) + except Exception as e: + logger.warning(f"Failed to repair table '{name}' in database '{database_name}': {e}") + + return reports + + +def repair_catalog(file_io: FileIO, warehouse: str, + check_data_files: bool = False, dry_run: bool = True) -> List[RepairReport]: + """ + Repair all tables in all databases in the catalog. + + Args: + file_io: FileIO instance for accessing storage. + warehouse: Warehouse root path. + check_data_files: Whether to verify data file existence. + dry_run: If True, only report issues without fixing. + + Returns: + List of RepairReport, one per table processed (failing tables and databases are logged and skipped).
+ """ + import pyarrow.fs as pafs + + warehouse = warehouse.rstrip('/') + reports = [] + + statuses = file_io.list_status(warehouse) + for status in statuses: + is_directory = hasattr(status, 'type') and status.type == pafs.FileType.Directory + name = status.base_name if hasattr(status, 'base_name') else "" + if is_directory and name and name.endswith(".db"): + database_name = name[:-3] + try: + db_reports = repair_database( + file_io, warehouse, database_name, + check_data_files=check_data_files, dry_run=dry_run + ) + reports.extend(db_reports) + except Exception as e: + logger.warning(f"Failed to repair database '{database_name}': {e}") + + return reports diff --git a/paimon-python/pypaimon/tests/repair_test.py b/paimon-python/pypaimon/tests/repair_test.py index 712976b0fcb1..fea35b51cf38 100644 --- a/paimon-python/pypaimon/tests/repair_test.py +++ b/paimon-python/pypaimon/tests/repair_test.py @@ -28,6 +28,9 @@ RepairIssue, RepairReport, TableRepair, + repair_table, + repair_database, + repair_catalog, ) from pypaimon.schema.data_types import AtomicType, DataField @@ -647,5 +650,291 @@ def _serialize_partition(self, values, fields): return GenericRowSerializer.to_bytes(row) +class TestTableRepairFixMode(unittest.TestCase): + """Tests for TableRepair fix mode and catalog integration.""" + + def setUp(self): + self.temp_dir = tempfile.mkdtemp(prefix="repair_fix_test_") + self.warehouse = os.path.join(self.temp_dir, "warehouse") + + def tearDown(self): + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def _create_catalog_and_table(self): + """Helper to create a catalog with a test table.""" + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("test_db", False) + schema = Schema( + fields=[ + DataField(0, "id", AtomicType("INT")), + DataField(1, "name", AtomicType("STRING")), + ], + partition_keys=[], + primary_keys=["id"], + options={}, + comment="" + ) + catalog.create_table("test_db.test_table", schema, False) + return 
catalog + + def _write_empty_avro(self, path: str): + """Write an empty Avro file (valid manifest list with no records).""" + import fastavro + from io import BytesIO + from pypaimon.manifest.schema.manifest_file_meta import MANIFEST_FILE_META_SCHEMA + buffer = BytesIO() + fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, []) + with open(path, 'wb') as f: + f.write(buffer.getvalue()) + + def _write_manifest_list_with_entry(self, path: str, manifest_file_name: str): + """Write a manifest list Avro referencing a single manifest file.""" + import fastavro + from io import BytesIO + from pypaimon.manifest.schema.manifest_file_meta import MANIFEST_FILE_META_SCHEMA + record = { + "_VERSION": 1, + "_FILE_NAME": manifest_file_name, + "_FILE_SIZE": 100, + "_NUM_ADDED_FILES": 1, + "_NUM_DELETED_FILES": 0, + "_PARTITION_STATS": {"_MIN_VALUES": b"", "_MAX_VALUES": b"", "_NULL_COUNTS": None}, + "_SCHEMA_ID": 0, + "_MIN_ROW_ID": None, + "_MAX_ROW_ID": None, + } + buffer = BytesIO() + fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, [record]) + with open(path, 'wb') as f: + f.write(buffer.getvalue()) + + def _write_manifest_with_data_file(self, path: str, partition_bytes: bytes, + bucket: int, file_name: str, + external_path=None, kind=0): + """Write a manifest Avro with a single data file entry.""" + import fastavro + from io import BytesIO + from pypaimon.manifest.schema.manifest_entry import MANIFEST_ENTRY_SCHEMA + record = { + "_VERSION": 1, + "_KIND": kind, + "_PARTITION": partition_bytes, + "_BUCKET": bucket, + "_TOTAL_BUCKETS": 1, + "_FILE": { + "_FILE_NAME": file_name, + "_FILE_SIZE": 1024, + "_ROW_COUNT": 10, + "_MIN_KEY": b"", + "_MAX_KEY": b"", + "_KEY_STATS": {"_MIN_VALUES": b"", "_MAX_VALUES": b"", "_NULL_COUNTS": None}, + "_VALUE_STATS": {"_MIN_VALUES": b"", "_MAX_VALUES": b"", "_NULL_COUNTS": None}, + "_MIN_SEQUENCE_NUMBER": 0, + "_MAX_SEQUENCE_NUMBER": 0, + "_SCHEMA_ID": 0, + "_LEVEL": 0, + "_EXTRA_FILES": [], + "_CREATION_TIME": None, + "_DELETE_ROW_COUNT": 
None, + "_EMBEDDED_FILE_INDEX": None, + "_FILE_SOURCE": None, + "_VALUE_STATS_COLS": None, + "_EXTERNAL_PATH": external_path, + "_FIRST_ROW_ID": None, + "_WRITE_COLS": None, + } + } + buffer = BytesIO() + fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, [record]) + with open(path, 'wb') as f: + f.write(buffer.getvalue()) + + def _serialize_partition(self, values, fields): + from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer + row = GenericRow(values, fields) + return GenericRowSerializer.to_bytes(row) + + def test_repair_fix_dangling_latest(self): + """Should fix LATEST to point to the newest valid snapshot.""" + catalog = self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-base-1")) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + snapshot_data = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 0, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("99") + + report = catalog.repair_table("test_db.test_table", dry_run=False) + self.assertTrue(len(report.fixes_applied) > 0) + self.assertIn("snapshot-1", report.fixes_applied[0]) + + with open(os.path.join(snapshot_dir, "LATEST"), 'r') as f: + self.assertEqual(f.read().strip(), "1") + + def test_dry_run_does_not_modify(self): + """Dry run should not change any files.""" + catalog = 
self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + os.makedirs(snapshot_dir, exist_ok=True) + + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("99") + + report = catalog.repair_table("test_db.test_table", dry_run=True) + self.assertEqual(len(report.fixes_applied), 0) + + with open(os.path.join(snapshot_dir, "LATEST"), 'r') as f: + self.assertEqual(f.read().strip(), "99") + + def test_repair_database_level(self): + """Should repair all tables in a database.""" + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("mydb", False) + + schema = Schema( + fields=[DataField(0, "id", AtomicType("INT"))], + partition_keys=[], + primary_keys=["id"], + options={}, + comment="" + ) + catalog.create_table("mydb.t1", schema, False) + catalog.create_table("mydb.t2", schema, False) + + reports = catalog.repair_database("mydb") + self.assertEqual(len(reports), 2) + + def test_repair_catalog_level(self): + """Should repair all tables in all databases.""" + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("db1", False) + catalog.create_database("db2", False) + + schema = Schema( + fields=[DataField(0, "id", AtomicType("INT"))], + partition_keys=[], + primary_keys=["id"], + options={}, + comment="" + ) + catalog.create_table("db1.t1", schema, False) + catalog.create_table("db2.t2", schema, False) + + reports = catalog.repair_catalog() + self.assertEqual(len(reports), 2) + + def test_fix_latest_respects_check_data_files(self): + """_fix_latest_file should not select a snapshot with missing data files + when check_data_files=True.""" + catalog = self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + 
os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + partition_bytes = self._serialize_partition([], []) + + # Snapshot 1: healthy (empty manifests) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-base-1")) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + snapshot1 = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 0, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot1, f) + + # Snapshot 2: references a missing data file + self._write_manifest_with_data_file( + os.path.join(manifest_dir, "manifest-2"), + partition_bytes, bucket=0, file_name="missing-file.orc" + ) + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-2"), "manifest-2" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-2")) + snapshot2 = { + "version": 3, "id": 2, "schemaId": 0, + "baseManifestList": "manifest-list-base-2", + "deltaManifestList": "manifest-list-delta-2", + "totalRecordCount": 10, "deltaRecordCount": 10, + "commitUser": "test", "commitIdentifier": 2, + "commitKind": "APPEND", "timeMillis": 2000000 + } + with open(os.path.join(snapshot_dir, "snapshot-2"), 'w') as f: + json.dump(snapshot2, f) + + # LATEST points to non-existent snapshot-99 + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("99") + + # Repair with check_data_files=True: should skip snapshot-2 and pick snapshot-1 + report = catalog.repair_table("test_db.test_table", + check_data_files=True, dry_run=False) + self.assertTrue(any("snapshot-1" in fix for fix in report.fixes_applied)) + + with open(os.path.join(snapshot_dir, "LATEST"), 'r') as f: + self.assertEqual(f.read().strip(), "1") + + def 
test_repair_is_idempotent(self): + """Running repair twice should converge: second run is a no-op.""" + catalog = self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-base-1")) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + snapshot_data = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 0, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("99") + + # First repair fixes the issue + report1 = catalog.repair_table("test_db.test_table", dry_run=False) + self.assertTrue(len(report1.fixes_applied) > 0) + + # Second repair finds no issues — idempotent + report2 = catalog.repair_table("test_db.test_table", dry_run=False) + self.assertTrue(report2.is_healthy) + self.assertEqual(len(report2.fixes_applied), 0) + + if __name__ == '__main__': unittest.main()