diff --git a/paimon-python/pypaimon/catalog/catalog.py b/paimon-python/pypaimon/catalog/catalog.py index 4a364b06aab5..9c0fe74c835d 100644 --- a/paimon-python/pypaimon/catalog/catalog.py +++ b/paimon-python/pypaimon/catalog/catalog.py @@ -401,3 +401,52 @@ def list_tags_paged( raise NotImplementedError( "list_tags_paged is not supported by this catalog." ) + + def repair_table(self, identifier: Union[str, Identifier], + check_data_files: bool = False, dry_run: bool = True): + """Repair the metadata of a single table. + + Verifies the consistency of the snapshot -> manifest list -> manifest + -> data file chain and optionally fixes issues found. + + Args: + identifier: Table identifier (Identifier or string). + check_data_files: If True, verify data file existence (slow). + dry_run: If True, only report issues without fixing. + + Returns: + RepairReport describing issues found and fixes applied. + """ + raise NotImplementedError( + "repair_table is not supported by this catalog." + ) + + def repair_database(self, database_name: str, + check_data_files: bool = False, dry_run: bool = True): + """Repair all tables in a database. + + Args: + database_name: Name of the database. + check_data_files: If True, verify data file existence (slow). + dry_run: If True, only report issues without fixing. + + Returns: + List of RepairReport, one per table. + """ + raise NotImplementedError( + "repair_database is not supported by this catalog." + ) + + def repair_catalog(self, check_data_files: bool = False, dry_run: bool = True): + """Repair all tables in all databases in the catalog. + + Args: + check_data_files: If True, verify data file existence (slow). + dry_run: If True, only report issues without fixing. + + Returns: + List of RepairReport, one per table. + """ + raise NotImplementedError( + "repair_catalog is not supported by this catalog." + ) diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py index 86e2f775e769..29d5b4e8876c 100644 --- a/paimon-python/pypaimon/catalog/filesystem_catalog.py +++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py @@ -584,3 +584,35 @@ def list_branches( identifier = Identifier.from_string(identifier) table = self.get_table(identifier) return table.branch_manager().branches() + + def repair_table(self, identifier: Union[str, Identifier], + check_data_files: bool = False, dry_run: bool = True): + """Repair the metadata of a single table.""" + from pypaimon.operation.repair import repair_table as _repair_table + + if not isinstance(identifier, Identifier): + identifier = Identifier.from_string(identifier) + table_path = self.get_table_path(identifier) + if not self.file_io.exists(table_path): + raise TableNotExistException(identifier) + return _repair_table(self.file_io, table_path, branch=identifier.get_branch_name(), + check_data_files=check_data_files, dry_run=dry_run) + + def repair_database(self, database_name: str, + check_data_files: bool = False, dry_run: bool = True): + """Repair all tables in a database.""" + from pypaimon.operation.repair import repair_database as _repair_database + + try: + self.get_database(database_name) + except DatabaseNotExistException: + raise + return _repair_database(self.file_io, self.warehouse, database_name, + check_data_files=check_data_files, dry_run=dry_run) + + def repair_catalog(self, check_data_files: bool = False, dry_run: bool = True): + """Repair all tables in all databases in the catalog.""" + from pypaimon.operation.repair import repair_catalog as _repair_catalog + + return _repair_catalog(self.file_io, self.warehouse, + check_data_files=check_data_files, dry_run=dry_run) diff --git a/paimon-python/pypaimon/cli/cli_table.py b/paimon-python/pypaimon/cli/cli_table.py index ba8446fcf9c4..5ff676c5c2d8 100644 --- a/paimon-python/pypaimon/cli/cli_table.py +++ b/paimon-python/pypaimon/cli/cli_table.py @@ -818,6 +818,40 @@ def cmd_table_list_partitions(args): sys.exit(1) +def cmd_table_repair(args): + """ + Execute the 'table repair' command. + + Verifies and optionally repairs the metadata consistency chain of a table. + """ + from pypaimon.cli.cli import load_catalog_config, create_catalog + + config_path = args.config + config = load_catalog_config(config_path) + catalog = create_catalog(config) + + table_identifier = args.table + parts = table_identifier.split('.') + if len(parts) != 2: + print(f"Error: Invalid table identifier '{table_identifier}'. " + f"Expected format: 'database.table'", file=sys.stderr) + sys.exit(1) + + dry_run = not args.fix + + try: + report = catalog.repair_table( + table_identifier, + dry_run=dry_run + ) + print(report.summary()) + if report.has_errors and dry_run: + print("\nRun with --fix to apply available fixes.") + except Exception as e: + print(f"Error: Failed to repair table '{table_identifier}': {e}", file=sys.stderr) + sys.exit(1) + + def cmd_table_drop_partition(args): """ Execute the 'table drop-partition' command. @@ -1170,3 +1204,16 @@ def add_table_subcommands(table_parser): update_comment_parser = alter_subparsers.add_parser('update-comment', help='Update table comment') update_comment_parser.add_argument('--comment', '-c', required=True, help='New table comment') update_comment_parser.set_defaults(func=cmd_table_alter) + + # table repair command + repair_parser = table_subparsers.add_parser( + 'repair', help='Verify and repair table metadata consistency') + repair_parser.add_argument( + 'table', + help='Table identifier in format: database.table' + ) + repair_parser.add_argument( + '--fix', action='store_true', + help='Apply fixes (default is dry-run/report only)' + ) + repair_parser.set_defaults(func=cmd_table_repair) diff --git a/paimon-python/pypaimon/operation/__init__.py b/paimon-python/pypaimon/operation/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/paimon-python/pypaimon/operation/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/paimon-python/pypaimon/operation/repair.py b/paimon-python/pypaimon/operation/repair.py new file mode 100644 index 000000000000..466292850e73 --- /dev/null +++ b/paimon-python/pypaimon/operation/repair.py @@ -0,0 +1,537 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Repair operation for Paimon tables. + +This module verifies and repairs the metadata consistency chain: + snapshot -> manifest list -> manifest files -> data files + +For filesystem catalogs (no external metastore), repair focuses on: + - Verifying snapshot files are readable and valid + - Verifying manifest list files referenced by snapshots exist and are readable + - Verifying manifest files referenced by manifest lists exist and are readable + - Verifying data files referenced by manifest entries exist + - Removing dangling snapshot references (LATEST pointing to missing snapshot) + - Optionally rewriting the LATEST hint file to the newest valid snapshot +""" + +import logging +import re +from dataclasses import dataclass, field +from typing import List, Optional + +from pypaimon.common.file_io import FileIO +from pypaimon.common.json_util import JSON +from pypaimon.snapshot.snapshot import Snapshot + +logger = logging.getLogger(__name__) + + +@dataclass +class RepairIssue: + """Represents a single issue found during repair verification.""" + level: str # "error" or "warning" + category: str # e.g. "snapshot", "manifest_list", "manifest", "data_file" + message: str + path: Optional[str] = None + + +@dataclass +class RepairReport: + """Report generated by a repair operation.""" + table_path: str + issues: List[RepairIssue] = field(default_factory=list) + snapshots_checked: int = 0 + manifest_lists_checked: int = 0 + manifest_files_checked: int = 0 + data_files_checked: int = 0 + fixes_applied: List[str] = field(default_factory=list) + + @property + def has_errors(self) -> bool: + return any(issue.level == "error" for issue in self.issues) + + @property + def is_healthy(self) -> bool: + return len(self.issues) == 0 + + def summary(self) -> str: + lines = [f"Repair report for: {self.table_path}"] + lines.append(f" Snapshots checked: {self.snapshots_checked}") + lines.append(f" Manifest lists checked: {self.manifest_lists_checked}") + lines.append(f" Manifest files checked: {self.manifest_files_checked}") + lines.append(f" Data files checked: {self.data_files_checked}") + + if self.is_healthy: + lines.append(" Status: HEALTHY - no issues found") + else: + errors = [i for i in self.issues if i.level == "error"] + warnings = [i for i in self.issues if i.level == "warning"] + lines.append(f" Issues: {len(errors)} error(s), {len(warnings)} warning(s)") + for issue in self.issues: + prefix = "ERROR" if issue.level == "error" else "WARN" + path_str = f" [{issue.path}]" if issue.path else "" + lines.append(f" [{prefix}] ({issue.category}) {issue.message}{path_str}") + + if self.fixes_applied: + lines.append(f" Fixes applied: {len(self.fixes_applied)}") + for fix in self.fixes_applied: + lines.append(f" - {fix}") + + return "\n".join(lines) + + +class TableRepair: + """ + Verifies and repairs the metadata chain of a Paimon table. + + The metadata chain is: + LATEST file -> snapshot -> manifest list files -> manifest files -> data files + + This class checks each link in the chain and reports issues. In fix mode, + it can repair certain problems (e.g., LATEST pointing to a missing snapshot). + """ + + def __init__(self, file_io: FileIO, table_path: str, branch: Optional[str] = None): + from pypaimon.branch.branch_manager import BranchManager + from pypaimon.schema.schema_manager import SchemaManager + + self.file_io = file_io + self.table_path = table_path.rstrip('/') + self.branch = BranchManager.normalize_branch(branch) + + branch_root = BranchManager.branch_path(self.table_path, self.branch) + self.snapshot_dir = f"{branch_root}/snapshot" + self.manifest_dir = f"{self.table_path}/manifest" + self.latest_file = f"{self.snapshot_dir}/LATEST" + + schema_manager = SchemaManager(file_io, self.table_path, self.branch) + table_schema = schema_manager.latest() + if table_schema is not None: + field_dict = {f.name: f for f in table_schema.fields} + self.partition_keys_fields = [field_dict[name] for name in table_schema.partition_keys] + self.default_part_value = table_schema.options.get( + "partition.default-name", "__DEFAULT_PARTITION__") + else: + self.partition_keys_fields = [] + self.default_part_value = "__DEFAULT_PARTITION__" + + def verify(self, check_data_files: bool = False) -> RepairReport: + """ + Verify the metadata consistency chain. + + Args: + check_data_files: If True, also verify that data files referenced + in manifests actually exist on disk. This can be + slow for large tables. + + Returns: + RepairReport with all issues found. + """ + report = RepairReport(table_path=self.table_path) + + # Step 1: Check snapshot directory exists + if not self.file_io.exists(self.snapshot_dir): + # No snapshot directory is normal for a newly created table with no data + return report + + # Step 2: Find all snapshot files and the LATEST hint + snapshot_ids = self._list_snapshot_ids() + latest_id = self._read_latest_id() + + if not snapshot_ids and latest_id is None: + report.issues.append(RepairIssue( + level="warning", + category="snapshot", + message="No snapshots found - table may be empty or newly created", + )) + return report + + # Step 3: Check LATEST consistency + if latest_id is not None and latest_id not in snapshot_ids: + report.issues.append(RepairIssue( + level="error", + category="snapshot", + message=f"LATEST file points to snapshot-{latest_id} which does not exist", + path=self.latest_file, + )) + + if latest_id is None and snapshot_ids: + report.issues.append(RepairIssue( + level="warning", + category="snapshot", + message="LATEST file is missing but snapshot files exist", + path=self.latest_file, + )) + + # Step 4: Verify each snapshot and its manifest chain + for sid in sorted(snapshot_ids): + self._verify_snapshot(sid, report, check_data_files) + + return report + + def repair(self, check_data_files: bool = False, dry_run: bool = True) -> RepairReport: + """ + Verify and optionally repair the metadata chain. + + Args: + check_data_files: If True, also verify data files exist. + dry_run: If True, only report issues without making changes. + If False, apply fixes where possible. + + Returns: + RepairReport with issues found and fixes applied. + """ + report = self.verify(check_data_files=check_data_files) + + if report.is_healthy: + return report + + if dry_run: + return report + + # Apply fixes + self._fix_latest_file(report, check_data_files) + + return report + + def _fix_latest_file(self, report: RepairReport, check_data_files: bool = False): + """Fix the LATEST file to point to the newest valid snapshot.""" + snapshot_ids = self._list_snapshot_ids() + if not snapshot_ids: + return + + # Find the newest snapshot whose full manifest chain is healthy + for sid in sorted(snapshot_ids, reverse=True): + if not self._is_snapshot_healthy(sid, check_data_files): + continue + + current_latest = self._read_latest_id() + if current_latest == sid: + return + + # Write LATEST atomically + latest_content = str(sid) + try: + success = self.file_io.try_to_write_atomic(self.latest_file, latest_content) + if not success: + self.file_io.delete(self.latest_file) + self.file_io.overwrite_file_utf8(self.latest_file, latest_content) + report.fixes_applied.append( + f"Updated LATEST file to point to snapshot-{sid}" + ) + except Exception as e: + report.issues.append(RepairIssue( + level="error", + category="snapshot", + message=f"Failed to write LATEST file: {e}", + path=self.latest_file, + )) + return + + def _is_snapshot_healthy(self, snapshot_id: int, check_data_files: bool = False) -> bool: + """Check whether a snapshot and its manifest chain are intact.""" + tmp_report = RepairReport(table_path=self.table_path) + self._verify_snapshot(snapshot_id, tmp_report, check_data_files=check_data_files) + return not tmp_report.has_errors + + def _list_snapshot_ids(self) -> List[int]: + """List all snapshot IDs found in the snapshot directory.""" + try: + statuses = self.file_io.list_status(self.snapshot_dir) + except Exception: + return [] + + snapshot_pattern = re.compile(r'^snapshot-(\d+)$') + ids = [] + for status in statuses: + name = status.base_name if hasattr(status, 'base_name') else "" + match = snapshot_pattern.match(name) + if match: + ids.append(int(match.group(1))) + return sorted(ids) + + def _read_latest_id(self) -> Optional[int]: + """Read the LATEST hint file and return the snapshot ID, or None.""" + try: + if not self.file_io.exists(self.latest_file): + return None + content = self.file_io.read_file_utf8(self.latest_file).strip() + if not content: + return None + return int(content) + except (ValueError, Exception): + return None + + def _try_read_snapshot(self, snapshot_id: int) -> Optional[Snapshot]: + """Try to read and parse a snapshot file. Returns None on failure.""" + snapshot_path = f"{self.snapshot_dir}/snapshot-{snapshot_id}" + try: + content = self.file_io.read_file_utf8(snapshot_path) + return JSON.from_json(content, Snapshot) + except Exception: + return None + + def _verify_snapshot(self, snapshot_id: int, report: RepairReport, check_data_files: bool): + """Verify a single snapshot and its downstream manifest chain.""" + report.snapshots_checked += 1 + snapshot_path = f"{self.snapshot_dir}/snapshot-{snapshot_id}" + + snapshot = self._try_read_snapshot(snapshot_id) + if snapshot is None: + report.issues.append(RepairIssue( + level="error", + category="snapshot", + message=f"Snapshot file is unreadable or corrupted (snapshot-{snapshot_id})", + path=snapshot_path, + )) + return + + # Verify base manifest list + self._verify_manifest_list( + snapshot.base_manifest_list, f"snapshot-{snapshot_id}/baseManifestList", + report, check_data_files + ) + + # Verify delta manifest list + self._verify_manifest_list( + snapshot.delta_manifest_list, f"snapshot-{snapshot_id}/deltaManifestList", + report, check_data_files + ) + + # Verify changelog manifest list (optional) + if snapshot.changelog_manifest_list: + self._verify_manifest_list( + snapshot.changelog_manifest_list, + f"snapshot-{snapshot_id}/changelogManifestList", + report, check_data_files + ) + + def _verify_manifest_list(self, manifest_list_name: str, context: str, + report: RepairReport, check_data_files: bool): + """Verify a manifest list file exists and is readable.""" + if not manifest_list_name: + return + + report.manifest_lists_checked += 1 + manifest_list_path = f"{self.manifest_dir}/{manifest_list_name}" + + if not self.file_io.exists(manifest_list_path): + report.issues.append(RepairIssue( + level="error", + category="manifest_list", + message=f"Manifest list file missing (referenced by {context})", + path=manifest_list_path, + )) + return + + # Try to read it + try: + from io import BytesIO + import fastavro + with self.file_io.new_input_stream(manifest_list_path) as input_stream: + avro_bytes = input_stream.read() + buffer = BytesIO(avro_bytes) + reader = fastavro.reader(buffer) + manifest_file_names = [] + for record in reader: + manifest_file_names.append(record['_FILE_NAME']) + except Exception as e: + report.issues.append(RepairIssue( + level="error", + category="manifest_list", + message=f"Manifest list file is corrupted: {e} (referenced by {context})", + path=manifest_list_path, + )) + return + + # Verify each manifest file referenced in this list + for mf_name in manifest_file_names: + self._verify_manifest_file(mf_name, manifest_list_name, report, check_data_files) + + def _verify_manifest_file(self, manifest_file_name: str, parent_list: str, + report: RepairReport, check_data_files: bool): + """Verify a manifest file exists and is readable.""" + report.manifest_files_checked += 1 + manifest_path = f"{self.manifest_dir}/{manifest_file_name}" + + if not self.file_io.exists(manifest_path): + report.issues.append(RepairIssue( + level="error", + category="manifest", + message=f"Manifest file missing (referenced by {parent_list})", + path=manifest_path, + )) + return + + # Always validate the manifest is readable Avro + try: + from io import BytesIO + import fastavro + with self.file_io.new_input_stream(manifest_path) as input_stream: + avro_bytes = input_stream.read() + buffer = BytesIO(avro_bytes) + reader = fastavro.reader(buffer) + records = list(reader) + except Exception as e: + report.issues.append(RepairIssue( + level="error", + category="manifest", + message=f"Manifest file is corrupted: {e}", + path=manifest_path, + )) + return + + if check_data_files: + from pypaimon.table.row.generic_row import GenericRowDeserializer + from pypaimon.utils.file_store_path_factory import _is_null_or_whitespace_only + + for record in records: + if record.get('_KIND', 0) != 0: + continue + file_dict = record.get('_FILE') + if not file_dict: + continue + file_name = file_dict.get('_FILE_NAME') + if not file_name: + continue + report.data_files_checked += 1 + + external_path = file_dict.get('_EXTERNAL_PATH') + if external_path: + data_file_path = external_path + else: + # Build path: table_path//bucket-/ + path_builder = self.table_path + if self.partition_keys_fields: + try: + partition = GenericRowDeserializer.from_bytes( + record['_PARTITION'], self.partition_keys_fields) + partition_dict = partition.to_dict() + for field_name, field_value in partition_dict.items(): + part_value = (self.default_part_value + if _is_null_or_whitespace_only(field_value) + else str(field_value)) + path_builder = f"{path_builder}/{field_name}={part_value}" + except Exception: + pass + bucket = record.get('_BUCKET', 0) + data_file_path = f"{path_builder}/bucket-{bucket}/{file_name}" + + if not self.file_io.exists(data_file_path): + report.issues.append(RepairIssue( + level="error", + category="data_file", + message=f"Data file missing (referenced by {manifest_file_name})", + path=data_file_path, + )) + + +def repair_table(file_io: FileIO, table_path: str, branch: Optional[str] = None, + check_data_files: bool = False, dry_run: bool = True) -> RepairReport: + """ + Repair a single Paimon table. + + This is the main entry point for repairing a table's metadata chain. + It verifies the consistency of: + snapshot -> manifest list -> manifest files -> data files + + Args: + file_io: FileIO instance for accessing the table's storage. + table_path: Root path of the table. + branch: Branch name (None for main branch). + check_data_files: Whether to verify data file existence (slow). + dry_run: If True, only report issues without fixing. + + Returns: + RepairReport with all findings and any fixes applied. + """ + repairer = TableRepair(file_io, table_path, branch) + return repairer.repair(check_data_files=check_data_files, dry_run=dry_run) + + +def repair_database(file_io: FileIO, warehouse: str, database_name: str, + check_data_files: bool = False, dry_run: bool = True) -> List[RepairReport]: + """ + Repair all tables in a database. + + Args: + file_io: FileIO instance for accessing storage. + warehouse: Warehouse root path. + database_name: Database name to repair. + check_data_files: Whether to verify data file existence. + dry_run: If True, only report issues without fixing. + + Returns: + List of RepairReport, one per table. + """ + import pyarrow.fs as pafs + + db_path = f"{warehouse.rstrip('/')}/{database_name}.db" + if not file_io.exists(db_path): + raise ValueError(f"Database directory does not exist: {db_path}") + + reports = [] + statuses = file_io.list_status(db_path) + for status in statuses: + is_directory = hasattr(status, 'type') and status.type == pafs.FileType.Directory + name = status.base_name if hasattr(status, 'base_name') else "" + if is_directory and name and not name.startswith("."): + table_path = f"{db_path}/{name}" + report = repair_table(file_io, table_path, check_data_files=check_data_files, + dry_run=dry_run) + reports.append(report) + + return reports + + +def repair_catalog(file_io: FileIO, warehouse: str, + check_data_files: bool = False, dry_run: bool = True) -> List[RepairReport]: + """ + Repair all tables in all databases in the catalog. + + Args: + file_io: FileIO instance for accessing storage. + warehouse: Warehouse root path. + check_data_files: Whether to verify data file existence. + dry_run: If True, only report issues without fixing. + + Returns: + List of RepairReport, one per table. + """ + import pyarrow.fs as pafs + + warehouse = warehouse.rstrip('/') + reports = [] + + statuses = file_io.list_status(warehouse) + for status in statuses: + is_directory = hasattr(status, 'type') and status.type == pafs.FileType.Directory + name = status.base_name if hasattr(status, 'base_name') else "" + if is_directory and name and name.endswith(".db"): + database_name = name[:-3] + try: + db_reports = repair_database( + file_io, warehouse, database_name, + check_data_files=check_data_files, dry_run=dry_run + ) + reports.extend(db_reports) + except Exception as e: + logger.warning(f"Failed to repair database '{database_name}': {e}") + + return reports diff --git a/paimon-python/pypaimon/tests/repair_test.py b/paimon-python/pypaimon/tests/repair_test.py new file mode 100644 index 000000000000..48a944ef4f4f --- /dev/null +++ b/paimon-python/pypaimon/tests/repair_test.py @@ -0,0 +1,807 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Unit tests for the repair operation.""" + +import json +import os +import shutil +import tempfile +import unittest + +from pypaimon import CatalogFactory, Schema +from pypaimon.operation.repair import ( # noqa: F401 + RepairIssue, + RepairReport, + TableRepair, + repair_table, + repair_database, + repair_catalog, +) +from pypaimon.schema.data_types import AtomicType, DataField + + +class TestRepairReport(unittest.TestCase): + """Tests for RepairReport data class.""" + + def test_empty_report_is_healthy(self): + report = RepairReport(table_path="/test/path") + self.assertTrue(report.is_healthy) + self.assertFalse(report.has_errors) + self.assertIn("HEALTHY", report.summary()) + + def test_report_with_warning(self): + report = RepairReport(table_path="/test/path") + report.issues.append(RepairIssue( + level="warning", category="snapshot", message="Something minor" + )) + self.assertFalse(report.is_healthy) + self.assertFalse(report.has_errors) + self.assertIn("WARN", report.summary()) + + def test_report_with_error(self): + report = RepairReport(table_path="/test/path") + report.issues.append(RepairIssue( + level="error", category="manifest_list", message="File missing", + path="/some/path" + )) + self.assertFalse(report.is_healthy) + self.assertTrue(report.has_errors) + self.assertIn("ERROR", report.summary()) + self.assertIn("/some/path", report.summary()) + + def test_report_summary_counts(self): + report = RepairReport(table_path="/test/path") + report.snapshots_checked = 3 + report.manifest_lists_checked = 6 + report.manifest_files_checked = 12 + report.data_files_checked = 0 + summary = report.summary() + self.assertIn("Snapshots checked: 3", summary) + self.assertIn("Manifest lists checked: 6", summary) + self.assertIn("Manifest files checked: 12", summary) + + def test_report_with_fixes(self): + report = RepairReport(table_path="/test/path") + report.fixes_applied.append("Updated LATEST file") + summary = report.summary() + self.assertIn("Fixes applied: 1", summary) + self.assertIn("Updated LATEST file", summary) + + +class TestTableRepairWithFilesystem(unittest.TestCase): + """Integration tests for TableRepair with actual filesystem.""" + + def setUp(self): + self.temp_dir = tempfile.mkdtemp(prefix="repair_test_") + self.warehouse = os.path.join(self.temp_dir, "warehouse") + + def tearDown(self): + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def _create_catalog_and_table(self): + """Helper to create a catalog with a test table containing data.""" + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("test_db", False) + + schema = Schema( + fields=[ + DataField(0, "id", AtomicType("INT")), + DataField(1, "name", AtomicType("STRING")), + ], + partition_keys=[], + primary_keys=["id"], + options={}, + comment="" + ) + catalog.create_table("test_db.test_table", schema, False) + return catalog + + def test_repair_empty_table_no_snapshots(self): + """A newly created table with no data should report 'no snapshots'.""" + catalog = self._create_catalog_and_table() + report = catalog.repair_table("test_db.test_table") + # Newly created tables have no snapshot dir content + self.assertFalse(report.has_errors) + + def test_repair_healthy_table_with_snapshot(self): + """A table with a valid snapshot should be reported as healthy.""" + catalog = self._create_catalog_and_table() + catalog.get_table("test_db.test_table") # noqa: F841 + + # Create a minimal snapshot manually + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + # Create empty manifest list files + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-base-1")) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + # Create a snapshot + snapshot_data = { + "version": 3, + "id": 1, + "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 0, + "deltaRecordCount": 0, + "commitUser": "test", + "commitIdentifier": 1, + "commitKind": "APPEND", + "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + + # Write LATEST + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + report = catalog.repair_table("test_db.test_table") + self.assertEqual(report.snapshots_checked, 1) + self.assertEqual(report.manifest_lists_checked, 2) + self.assertTrue(report.is_healthy) + + def test_repair_detects_missing_manifest_list(self): + """Should detect when a manifest list referenced by snapshot is missing.""" + catalog = self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + # Create only delta manifest list, not base + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + snapshot_data = { + "version": 3, + "id": 1, + "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 0, + "deltaRecordCount": 0, + "commitUser": "test", + "commitIdentifier": 1, + "commitKind": "APPEND", + "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + report = catalog.repair_table("test_db.test_table") + self.assertTrue(report.has_errors) + error_messages = [i.message for i in report.issues if i.level == "error"] + self.assertTrue(any("manifest-list-base-1" in m or "missing" in m.lower() + for m in error_messages)) + + def test_repair_detects_dangling_latest(self): + """Should detect when LATEST points to a non-existent snapshot.""" + catalog = self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + os.makedirs(snapshot_dir, exist_ok=True) + + # LATEST points to snapshot-5, but only snapshot-1 exists + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(manifest_dir, exist_ok=True) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-base-1")) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + snapshot_data = { + "version": 3, + "id": 1, + "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 0, + "deltaRecordCount": 0, + "commitUser": "test", + "commitIdentifier": 1, + "commitKind": "APPEND", + "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("5") + + report = catalog.repair_table("test_db.test_table") + self.assertTrue(report.has_errors) + error_messages = [i.message for i in report.issues if i.level == "error"] + self.assertTrue(any("snapshot-5" in m for m in error_messages)) + + def test_repair_fix_dangling_latest(self): + """Should fix LATEST to point to the newest valid snapshot.""" + catalog = self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-base-1")) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + snapshot_data = { + "version": 3, + "id": 1, + "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 0, + "deltaRecordCount": 0, + "commitUser": "test", + "commitIdentifier": 1, + "commitKind": "APPEND", + "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("99") + + # Repair with fix=True + report = catalog.repair_table("test_db.test_table", dry_run=False) + self.assertTrue(len(report.fixes_applied) > 0) + self.assertIn("snapshot-1", report.fixes_applied[0]) + + # Verify LATEST is now correct + with open(os.path.join(snapshot_dir, "LATEST"), 'r') as f: + self.assertEqual(f.read().strip(), "1") + + def test_repair_corrupted_snapshot_file(self): + """Should detect unreadable/corrupted snapshot files.""" + catalog = self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + os.makedirs(snapshot_dir, exist_ok=True) + + # Write garbage to the snapshot file + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + f.write("this is not valid json{{{") + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + report = catalog.repair_table("test_db.test_table") + self.assertTrue(report.has_errors) + error_messages = [i.message for i in report.issues if i.level == "error"] + self.assertTrue(any("corrupted" in m.lower() or "unreadable" in m.lower() + for m in error_messages)) + + def test_repair_database_level(self): + """Should repair all tables in a database.""" + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("mydb", False) + + schema = Schema( + fields=[DataField(0, "id", AtomicType("INT"))], + partition_keys=[], + primary_keys=["id"], + options={}, + comment="" + ) + catalog.create_table("mydb.t1", schema, False) + catalog.create_table("mydb.t2", schema, False) + + reports = catalog.repair_database("mydb") + self.assertEqual(len(reports), 2) + + def test_repair_catalog_level(self): + """Should repair all tables in all databases.""" + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("db1", False) + catalog.create_database("db2", False) + + schema = Schema( + fields=[DataField(0, "id", AtomicType("INT"))], + partition_keys=[], + primary_keys=["id"], + options={}, + comment="" + ) + catalog.create_table("db1.t1", schema, False) + catalog.create_table("db2.t2", schema, False) + + reports = catalog.repair_catalog() + self.assertEqual(len(reports), 2) + + def test_dry_run_does_not_modify(self): + """Dry run should not change any files.""" + catalog = self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + os.makedirs(snapshot_dir, exist_ok=True) + + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("99") + + # Dry run - should not fix + report = catalog.repair_table("test_db.test_table", dry_run=True) + self.assertEqual(len(report.fixes_applied), 0) + + # Verify LATEST was NOT changed + with open(os.path.join(snapshot_dir, "LATEST"), 'r') as f: + self.assertEqual(f.read().strip(), "99") + + def _write_empty_avro(self, path: str): + """Write an empty Avro file (valid manifest list with no records).""" + import fastavro + from io import BytesIO + from pypaimon.manifest.schema.manifest_file_meta import MANIFEST_FILE_META_SCHEMA + + buffer = BytesIO() + fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, []) + with open(path, 'wb') as f: + f.write(buffer.getvalue()) + + def _write_manifest_list_with_entry(self, path: str, manifest_file_name: str): + """Write a manifest list Avro referencing a single manifest file.""" + import fastavro + from io import BytesIO + from pypaimon.manifest.schema.manifest_file_meta import MANIFEST_FILE_META_SCHEMA + + record = { + "_VERSION": 1, + "_FILE_NAME": manifest_file_name, + "_FILE_SIZE": 100, + "_NUM_ADDED_FILES": 1, + "_NUM_DELETED_FILES": 0, + "_PARTITION_STATS": {"_MIN_VALUES": b"", "_MAX_VALUES": b"", "_NULL_COUNTS": None}, + "_SCHEMA_ID": 0, + "_MIN_ROW_ID": None, + "_MAX_ROW_ID": None, + } + buffer = BytesIO() + fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, [record]) + with open(path, 'wb') as f: + f.write(buffer.getvalue()) + + def _write_manifest_with_data_file(self, path: str, partition_bytes: bytes, + bucket: int, file_name: str, + external_path=None, kind=0): + """Write a manifest Avro with a single data file entry.""" + import fastavro + from io import BytesIO + from pypaimon.manifest.schema.manifest_entry import MANIFEST_ENTRY_SCHEMA + + record = { + "_VERSION": 1, + "_KIND": kind, + "_PARTITION": partition_bytes, + "_BUCKET": bucket, + "_TOTAL_BUCKETS": 1, + "_FILE": { + "_FILE_NAME": file_name, + "_FILE_SIZE": 1024, + "_ROW_COUNT": 10, + "_MIN_KEY": b"", + "_MAX_KEY": b"", + "_KEY_STATS": {"_MIN_VALUES": b"", "_MAX_VALUES": b"", "_NULL_COUNTS": None}, + "_VALUE_STATS": {"_MIN_VALUES": b"", "_MAX_VALUES": b"", "_NULL_COUNTS": None}, + "_MIN_SEQUENCE_NUMBER": 0, + "_MAX_SEQUENCE_NUMBER": 0, + "_SCHEMA_ID": 0, + "_LEVEL": 0, + "_EXTRA_FILES": [], + "_CREATION_TIME": None, + "_DELETE_ROW_COUNT": None, + "_EMBEDDED_FILE_INDEX": None, + "_FILE_SOURCE": None, + "_VALUE_STATS_COLS": None, + "_EXTERNAL_PATH": external_path, + "_FIRST_ROW_ID": None, + "_WRITE_COLS": None, + } + } + buffer = BytesIO() + fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, [record]) + with open(path, 'wb') as f: + f.write(buffer.getvalue()) + + def _serialize_partition(self, values, fields): + """Serialize partition values using GenericRowSerializer.""" + from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer + row = GenericRow(values, fields) + return GenericRowSerializer.to_bytes(row) + + def test_check_data_files_detects_missing(self): + """check_data_files=True should detect missing data files.""" + catalog = self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + # Non-partitioned table: partition is an empty row (0 fields) + partition_bytes = self._serialize_partition([], []) + + # Write manifest file referencing a data file that doesn't exist + self._write_manifest_with_data_file( + os.path.join(manifest_dir, "manifest-1"), + partition_bytes, bucket=0, file_name="data-abc.orc" + ) + + # Write manifest list referencing the manifest + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-1"), "manifest-1" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + # Create snapshot + snapshot_data = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 10, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + # Without check_data_files: should be healthy + report = catalog.repair_table("test_db.test_table", check_data_files=False) + self.assertEqual(report.data_files_checked, 0) + self.assertFalse(report.has_errors) + + # With check_data_files: should find the missing data file + report = catalog.repair_table("test_db.test_table", check_data_files=True) + self.assertGreater(report.data_files_checked, 0) + self.assertTrue(report.has_errors) + data_file_issues = [i for i in report.issues if i.category == "data_file"] + self.assertEqual(len(data_file_issues), 1) + self.assertIn("data-abc.orc", data_file_issues[0].path) + + def test_check_data_files_skips_delete_entries(self): + """DELETE entries for removed files should not be flagged as missing.""" + catalog = self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + partition_bytes = self._serialize_partition([], []) + + # Write manifest with a DELETE entry (_KIND=1) for a file that doesn't exist + self._write_manifest_with_data_file( + os.path.join(manifest_dir, "manifest-1"), + partition_bytes, bucket=0, file_name="deleted-file.orc", + kind=1 # DELETE + ) + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-1"), "manifest-1" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + snapshot_data = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 0, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + report = catalog.repair_table("test_db.test_table", check_data_files=True) + self.assertEqual(report.data_files_checked, 0) + self.assertFalse(report.has_errors) + + def test_check_data_files_passes_when_file_exists(self): + """check_data_files=True should pass when data files exist.""" + catalog = self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + partition_bytes = self._serialize_partition([], []) + + self._write_manifest_with_data_file( + os.path.join(manifest_dir, "manifest-1"), + partition_bytes, bucket=0, file_name="data-abc.orc" + ) + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-1"), "manifest-1" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + # Create the data file at the expected location + data_file_dir = os.path.join(table_path, "bucket-0") + os.makedirs(data_file_dir, exist_ok=True) + with open(os.path.join(data_file_dir, "data-abc.orc"), 'wb') as f: + f.write(b"fake data") + + snapshot_data = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 10, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + report = catalog.repair_table("test_db.test_table", check_data_files=True) + self.assertGreater(report.data_files_checked, 0) + self.assertFalse(report.has_errors) + + def test_check_data_files_with_partition(self): + """check_data_files should construct correct path for partitioned tables.""" + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("test_db", False) + + schema = Schema( + fields=[ + DataField(0, "id", AtomicType("INT")), + DataField(1, "dt", AtomicType("STRING")), + DataField(2, "name", AtomicType("STRING")), + ], + partition_keys=["dt"], + primary_keys=["id"], + options={}, + comment="" + ) + catalog.create_table("test_db.part_table", schema, False) + + table_path = os.path.join(self.warehouse, "test_db.db", "part_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + # Serialize partition with dt="2024-01-01" + dt_field = DataField(1, "dt", AtomicType("STRING")) + partition_bytes = self._serialize_partition(["2024-01-01"], [dt_field]) + + self._write_manifest_with_data_file( + os.path.join(manifest_dir, "manifest-1"), + partition_bytes, bucket=0, file_name="data-part.orc" + ) + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-1"), "manifest-1" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + # Create data file at partitioned path + data_file_dir = os.path.join(table_path, "dt=2024-01-01", "bucket-0") + os.makedirs(data_file_dir, exist_ok=True) + with open(os.path.join(data_file_dir, "data-part.orc"), 'wb') as f: + f.write(b"fake data") + + snapshot_data = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 10, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + report = catalog.repair_table("test_db.part_table", check_data_files=True) + self.assertGreater(report.data_files_checked, 0) + self.assertFalse(report.has_errors) + + def test_check_data_files_custom_partition_default_name(self): + """Tables with custom partition.default-name use that value for null partitions.""" + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("test_db", False) + + schema = Schema( + fields=[ + DataField(0, "id", AtomicType("INT")), + DataField(1, "region", AtomicType("STRING")), + DataField(2, "name", AtomicType("STRING")), + ], + partition_keys=["region"], + primary_keys=["id"], + options={"partition.default-name": "UNSET"}, + comment="" + ) + catalog.create_table("test_db.custom_part", schema, False) + + table_path = os.path.join(self.warehouse, "test_db.db", "custom_part") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + # Serialize partition with region=None (null) + region_field = DataField(1, "region", AtomicType("STRING")) + partition_bytes = self._serialize_partition([None], [region_field]) + + self._write_manifest_with_data_file( + os.path.join(manifest_dir, "manifest-1"), + partition_bytes, bucket=0, file_name="data-null.orc" + ) + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-1"), "manifest-1" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + # Create data file at custom default partition path + data_file_dir = os.path.join(table_path, "region=UNSET", "bucket-0") + os.makedirs(data_file_dir, exist_ok=True) + with open(os.path.join(data_file_dir, "data-null.orc"), 'wb') as f: + f.write(b"fake data") + + snapshot_data = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 10, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + report = catalog.repair_table("test_db.custom_part", check_data_files=True) + self.assertGreater(report.data_files_checked, 0) + self.assertFalse(report.has_errors) + + def test_repair_branch_table(self): + """Repairing a branch-qualified identifier checks the branch snapshot dir.""" + catalog = self._create_catalog_and_table() + catalog.get_table("test_db.test_table") # noqa: F841 + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(manifest_dir, exist_ok=True) + + # Create main branch snapshot (healthy) + main_snapshot_dir = os.path.join(table_path, "snapshot") + os.makedirs(main_snapshot_dir, exist_ok=True) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-base-main")) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-main")) + snapshot_main = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-main", + "deltaManifestList": "manifest-list-delta-main", + "totalRecordCount": 0, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(main_snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_main, f) + with open(os.path.join(main_snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + # Create branch "b1" with a dangling LATEST + branch_snapshot_dir = os.path.join(table_path, "branch", "branch-b1", "snapshot") + os.makedirs(branch_snapshot_dir, exist_ok=True) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-base-b1")) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-b1")) + snapshot_b1 = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-b1", + "deltaManifestList": "manifest-list-delta-b1", + "totalRecordCount": 0, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(branch_snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_b1, f) + # LATEST points to non-existent snapshot-99 + with open(os.path.join(branch_snapshot_dir, "LATEST"), 'w') as f: + f.write("99") + + # Repair main branch - should be healthy + report_main = catalog.repair_table("test_db.test_table") + self.assertFalse(report_main.has_errors) + + # Repair branch b1 - should detect dangling LATEST + report_branch = catalog.repair_table("test_db.test_table$branch_b1") + self.assertTrue(report_branch.has_errors) + error_messages = [i.message for i in report_branch.issues if i.level == "error"] + self.assertTrue(any("snapshot-99" in m for m in error_messages)) + + def test_fix_latest_respects_check_data_files(self): + """_fix_latest_file should not select a snapshot with missing data files + when check_data_files=True.""" + catalog = self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + partition_bytes = self._serialize_partition([], []) + + # Snapshot 1: healthy (empty manifests) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-base-1")) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + snapshot1 = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 0, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot1, f) + + # Snapshot 2: references a missing data file + self._write_manifest_with_data_file( + os.path.join(manifest_dir, "manifest-2"), + partition_bytes, bucket=0, file_name="missing-file.orc" + ) + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-2"), "manifest-2" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-2")) + snapshot2 = { + "version": 3, "id": 2, "schemaId": 0, + "baseManifestList": "manifest-list-base-2", + "deltaManifestList": "manifest-list-delta-2", + "totalRecordCount": 10, "deltaRecordCount": 10, + "commitUser": "test", "commitIdentifier": 2, + "commitKind": "APPEND", "timeMillis": 2000000 + } + with open(os.path.join(snapshot_dir, "snapshot-2"), 'w') as f: + json.dump(snapshot2, f) + + # LATEST points to non-existent snapshot-99 + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("99") + + # Repair with check_data_files=True: should skip snapshot-2 and pick snapshot-1 + report = catalog.repair_table("test_db.test_table", + check_data_files=True, dry_run=False) + self.assertTrue(any("snapshot-1" in fix for fix in report.fixes_applied)) + + with open(os.path.join(snapshot_dir, "LATEST"), 'r') as f: + self.assertEqual(f.read().strip(), "1") + + +if __name__ == '__main__': + unittest.main()