diff --git a/paimon-python/pypaimon/operation/__init__.py b/paimon-python/pypaimon/operation/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/paimon-python/pypaimon/operation/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/paimon-python/pypaimon/operation/repair.py b/paimon-python/pypaimon/operation/repair.py new file mode 100644 index 000000000000..5bd044e19030 --- /dev/null +++ b/paimon-python/pypaimon/operation/repair.py @@ -0,0 +1,396 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Repair operation for Paimon tables. + +This module verifies the metadata consistency chain: + snapshot -> manifest list -> manifest files -> data files + +For filesystem catalogs (no external metastore), verification focuses on: + - Verifying snapshot files are readable and valid + - Verifying manifest list files referenced by snapshots exist and are readable + - Verifying manifest files referenced by manifest lists exist and are readable + - Verifying data files referenced by manifest entries exist + - Detecting dangling snapshot references (LATEST pointing to missing snapshot) +""" + +import logging +import re +from dataclasses import dataclass, field +from typing import List, Optional + +from pypaimon.common.file_io import FileIO +from pypaimon.common.json_util import JSON +from pypaimon.snapshot.snapshot import Snapshot + +logger = logging.getLogger(__name__) + + +@dataclass +class RepairIssue: + """Represents a single issue found during repair verification.""" + level: str # "error" or "warning" + category: str # e.g. "snapshot", "manifest_list", "manifest", "data_file" + message: str + path: Optional[str] = None + + +@dataclass +class RepairReport: + """Report generated by a repair operation.""" + table_path: str + issues: List[RepairIssue] = field(default_factory=list) + snapshots_checked: int = 0 + manifest_lists_checked: int = 0 + manifest_files_checked: int = 0 + data_files_checked: int = 0 + fixes_applied: List[str] = field(default_factory=list) + + @property + def has_errors(self) -> bool: + return any(issue.level == "error" for issue in self.issues) + + @property + def is_healthy(self) -> bool: + return len(self.issues) == 0 + + def summary(self) -> str: + lines = [f"Repair report for: {self.table_path}"] + lines.append(f" Snapshots checked: {self.snapshots_checked}") + lines.append(f" Manifest lists checked: {self.manifest_lists_checked}") + lines.append(f" Manifest files checked: {self.manifest_files_checked}") + lines.append(f" Data files checked: {self.data_files_checked}") + + if self.is_healthy: + lines.append(" Status: HEALTHY - no issues found") + else: + errors = [i for i in self.issues if i.level == "error"] + warnings = [i for i in self.issues if i.level == "warning"] + lines.append(f" Issues: {len(errors)} error(s), {len(warnings)} warning(s)") + for issue in self.issues: + prefix = "ERROR" if issue.level == "error" else "WARN" + path_str = f" [{issue.path}]" if issue.path else "" + lines.append(f" [{prefix}] ({issue.category}) {issue.message}{path_str}") + + if self.fixes_applied: + lines.append(f" Fixes applied: {len(self.fixes_applied)}") + for fix in self.fixes_applied: + lines.append(f" - {fix}") + + return "\n".join(lines) + + +class TableRepair: + """ + Verifies the metadata chain of a Paimon table. + + The metadata chain is: + LATEST file -> snapshot -> manifest list files -> manifest files -> data files + + This class checks each link in the chain and reports issues. + """ + + def __init__(self, file_io: FileIO, table_path: str, branch: Optional[str] = None): + from pypaimon.branch.branch_manager import BranchManager + from pypaimon.schema.schema_manager import SchemaManager + + self.file_io = file_io + self.table_path = table_path.rstrip('/') + self.branch = BranchManager.normalize_branch(branch) + + branch_root = BranchManager.branch_path(self.table_path, self.branch) + self.snapshot_dir = f"{branch_root}/snapshot" + self.manifest_dir = f"{self.table_path}/manifest" + self.latest_file = f"{self.snapshot_dir}/LATEST" + + schema_manager = SchemaManager(file_io, self.table_path, self.branch) + table_schema = schema_manager.latest() + if table_schema is not None: + field_dict = {f.name: f for f in table_schema.fields} + self.partition_keys_fields = [field_dict[name] for name in table_schema.partition_keys] + self.default_part_value = table_schema.options.get( + "partition.default-name", "__DEFAULT_PARTITION__") + else: + self.partition_keys_fields = [] + self.default_part_value = "__DEFAULT_PARTITION__" + + def verify(self, check_data_files: bool = False) -> RepairReport: + """ + Verify the metadata consistency chain. + + Args: + check_data_files: If True, also verify that data files referenced + in manifests actually exist on disk. Time complexity + is O(total_data_files) — proportional to the number + of data file entries across all manifests. + + Returns: + RepairReport with all issues found. + """ + if check_data_files: + logger.info("Verifying data files for table %s (this may be slow for large tables)", + self.table_path) + report = RepairReport(table_path=self.table_path) + + # Step 1: Check snapshot directory exists + if not self.file_io.exists(self.snapshot_dir): + return report + + # Step 2: Find all snapshot files and the LATEST hint + snapshot_ids = self._list_snapshot_ids() + latest_id = self._read_latest_id() + + if not snapshot_ids and latest_id is None: + report.issues.append(RepairIssue( + level="warning", + category="snapshot", + message="No snapshots found - table may be empty or newly created", + )) + return report + + # Step 3: Check LATEST consistency + if latest_id is not None and latest_id not in snapshot_ids: + report.issues.append(RepairIssue( + level="error", + category="snapshot", + message=f"LATEST file points to snapshot-{latest_id} which does not exist", + path=self.latest_file, + )) + + if latest_id is None and snapshot_ids: + report.issues.append(RepairIssue( + level="warning", + category="snapshot", + message="LATEST file is missing but snapshot files exist", + path=self.latest_file, + )) + + # Step 4: Verify each snapshot and its manifest chain + checked_files = set() + for sid in sorted(snapshot_ids): + self._verify_snapshot(sid, report, check_data_files, checked_files) + + return report + + def _list_snapshot_ids(self) -> List[int]: + """List all snapshot IDs found in the snapshot directory.""" + try: + statuses = self.file_io.list_status(self.snapshot_dir) + except Exception: + return [] + + snapshot_pattern = re.compile(r'^snapshot-(\d+)$') + ids = [] + for status in statuses: + name = status.base_name + match = snapshot_pattern.match(name) + if match: + ids.append(int(match.group(1))) + return sorted(ids) + + def _read_latest_id(self) -> Optional[int]: + """Read the LATEST hint file and return the snapshot ID, or None.""" + try: + if not self.file_io.exists(self.latest_file): + return None + content = self.file_io.read_file_utf8(self.latest_file).strip() + if not content: + return None + return int(content) + except (ValueError, Exception): + return None + + def _try_read_snapshot(self, snapshot_id: int) -> Optional[Snapshot]: + """Try to read and parse a snapshot file. Returns None on failure.""" + snapshot_path = f"{self.snapshot_dir}/snapshot-{snapshot_id}" + try: + content = self.file_io.read_file_utf8(snapshot_path) + return JSON.from_json(content, Snapshot) + except Exception: + return None + + def _verify_snapshot(self, snapshot_id: int, report: RepairReport, + check_data_files: bool, checked_files: set): + """Verify a single snapshot and its downstream manifest chain.""" + report.snapshots_checked += 1 + snapshot_path = f"{self.snapshot_dir}/snapshot-{snapshot_id}" + + snapshot = self._try_read_snapshot(snapshot_id) + if snapshot is None: + report.issues.append(RepairIssue( + level="error", + category="snapshot", + message=f"Snapshot file is unreadable or corrupted (snapshot-{snapshot_id})", + path=snapshot_path, + )) + return + + # Verify base manifest list + self._verify_manifest_list( + snapshot.base_manifest_list, f"snapshot-{snapshot_id}/baseManifestList", + report, check_data_files, checked_files + ) + + # Verify delta manifest list + self._verify_manifest_list( + snapshot.delta_manifest_list, f"snapshot-{snapshot_id}/deltaManifestList", + report, check_data_files, checked_files + ) + + # Verify changelog manifest list (optional) + if snapshot.changelog_manifest_list: + self._verify_manifest_list( + snapshot.changelog_manifest_list, + f"snapshot-{snapshot_id}/changelogManifestList", + report, check_data_files, checked_files + ) + + def _verify_manifest_list(self, manifest_list_name: str, context: str, + report: RepairReport, check_data_files: bool, + checked_files: set): + """Verify a manifest list file exists and is readable.""" + if not manifest_list_name: + return + + report.manifest_lists_checked += 1 + manifest_list_path = f"{self.manifest_dir}/{manifest_list_name}" + + if not self.file_io.exists(manifest_list_path): + report.issues.append(RepairIssue( + level="error", + category="manifest_list", + message=f"Manifest list file missing (referenced by {context})", + path=manifest_list_path, + )) + return + + # Try to read it + try: + from io import BytesIO + import fastavro + with self.file_io.new_input_stream(manifest_list_path) as input_stream: + avro_bytes = input_stream.read() + buffer = BytesIO(avro_bytes) + reader = fastavro.reader(buffer) + manifest_file_names = [] + for record in reader: + manifest_file_names.append(record['_FILE_NAME']) + except Exception as e: + report.issues.append(RepairIssue( + level="error", + category="manifest_list", + message=f"Manifest list file is corrupted: {e} (referenced by {context})", + path=manifest_list_path, + )) + return + + # Verify each manifest file referenced in this list + for mf_name in manifest_file_names: + self._verify_manifest_file(mf_name, manifest_list_name, report, + check_data_files, checked_files) + + def _verify_manifest_file(self, manifest_file_name: str, parent_list: str, + report: RepairReport, check_data_files: bool, + checked_files: set): + """Verify a manifest file exists and is readable.""" + report.manifest_files_checked += 1 + manifest_path = f"{self.manifest_dir}/{manifest_file_name}" + + if not self.file_io.exists(manifest_path): + report.issues.append(RepairIssue( + level="error", + category="manifest", + message=f"Manifest file missing (referenced by {parent_list})", + path=manifest_path, + )) + return + + # Always validate the manifest is readable Avro + try: + from io import BytesIO + import fastavro + with self.file_io.new_input_stream(manifest_path) as input_stream: + avro_bytes = input_stream.read() + buffer = BytesIO(avro_bytes) + reader = fastavro.reader(buffer) + records = list(reader) + except Exception as e: + report.issues.append(RepairIssue( + level="error", + category="manifest", + message=f"Manifest file is corrupted: {e}", + path=manifest_path, + )) + return + + if check_data_files: + from pypaimon.table.row.generic_row import GenericRowDeserializer + from pypaimon.utils.file_store_path_factory import _is_null_or_whitespace_only + + for record in records: + if record.get('_KIND', 0) != 0: + continue + file_dict = record.get('_FILE') + if not file_dict: + continue + file_name = file_dict.get('_FILE_NAME') + if not file_name: + continue + + external_path = file_dict.get('_EXTERNAL_PATH') + if external_path: + data_file_path = external_path + else: + # Build path: table_path//bucket-/ + path_builder = self.table_path + if self.partition_keys_fields: + try: + partition = GenericRowDeserializer.from_bytes( + record['_PARTITION'], self.partition_keys_fields) + partition_dict = partition.to_dict() + for field_name, field_value in partition_dict.items(): + part_value = (self.default_part_value + if _is_null_or_whitespace_only(field_value) + else str(field_value)) + path_builder = f"{path_builder}/{field_name}={part_value}" + except Exception as e: + report.issues.append(RepairIssue( + level="warning", + category="partition", + message=(f"Failed to deserialize partition for {file_name}: {e}. " + "Data file path may be incorrect."), + path=manifest_path, + )) + bucket = record.get('_BUCKET', 0) + data_file_path = f"{path_builder}/bucket-{bucket}/{file_name}" + + if data_file_path in checked_files: + continue + checked_files.add(data_file_path) + + report.data_files_checked += 1 + if report.data_files_checked % 1000 == 0: + logger.info("Checked %d data files so far...", report.data_files_checked) + + if not self.file_io.exists(data_file_path): + report.issues.append(RepairIssue( + level="error", + category="data_file", + message=f"Data file missing (referenced by {manifest_file_name})", + path=data_file_path, + )) diff --git a/paimon-python/pypaimon/tests/repair_test.py b/paimon-python/pypaimon/tests/repair_test.py new file mode 100644 index 000000000000..e1e9932b4ee9 --- /dev/null +++ b/paimon-python/pypaimon/tests/repair_test.py @@ -0,0 +1,716 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Unit tests for the repair verification operation.""" + +import json +import os +import shutil +import tempfile +import unittest + +from pypaimon import CatalogFactory, Schema +from pypaimon.operation.repair import ( + RepairIssue, + RepairReport, + TableRepair, +) +from pypaimon.schema.data_types import AtomicType, DataField + + +class TestRepairReport(unittest.TestCase): + """Tests for RepairReport data class.""" + + def test_empty_report_is_healthy(self): + report = RepairReport(table_path="/test/path") + self.assertTrue(report.is_healthy) + self.assertFalse(report.has_errors) + self.assertIn("HEALTHY", report.summary()) + + def test_report_with_warning(self): + report = RepairReport(table_path="/test/path") + report.issues.append(RepairIssue( + level="warning", category="snapshot", message="Something minor" + )) + self.assertFalse(report.is_healthy) + self.assertFalse(report.has_errors) + self.assertIn("WARN", report.summary()) + + def test_report_with_error(self): + report = RepairReport(table_path="/test/path") + report.issues.append(RepairIssue( + level="error", category="manifest_list", message="File missing", + path="/some/path" + )) + self.assertFalse(report.is_healthy) + self.assertTrue(report.has_errors) + self.assertIn("ERROR", report.summary()) + self.assertIn("/some/path", report.summary()) + + def test_report_summary_counts(self): + report = RepairReport(table_path="/test/path") + report.snapshots_checked = 3 + report.manifest_lists_checked = 6 + report.manifest_files_checked = 12 + report.data_files_checked = 0 + summary = report.summary() + self.assertIn("Snapshots checked: 3", summary) + self.assertIn("Manifest lists checked: 6", summary) + self.assertIn("Manifest files checked: 12", summary) + + def test_report_with_fixes(self): + report = RepairReport(table_path="/test/path") + report.fixes_applied.append("Updated LATEST file") + summary = report.summary() + self.assertIn("Fixes applied: 1", summary) + self.assertIn("Updated LATEST file", summary) + + +class TestTableRepairVerify(unittest.TestCase): + """Integration tests for TableRepair.verify() with actual filesystem.""" + + def setUp(self): + self.temp_dir = tempfile.mkdtemp(prefix="repair_test_") + self.warehouse = os.path.join(self.temp_dir, "warehouse") + + def tearDown(self): + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def _create_catalog_and_table(self): + """Helper to create a catalog with a test table containing data.""" + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("test_db", False) + + schema = Schema( + fields=[ + DataField(0, "id", AtomicType("INT")), + DataField(1, "name", AtomicType("STRING")), + ], + partition_keys=[], + primary_keys=["id"], + options={}, + comment="" + ) + catalog.create_table("test_db.test_table", schema, False) + return catalog + + def _get_table_repair(self, table_path, branch=None): + """Create a TableRepair instance using PyArrow filesystem.""" + from pypaimon.common.file_io import FileIO + file_io = FileIO.get(table_path) + return TableRepair(file_io, table_path, branch=branch) + + def test_verify_empty_table_no_snapshots(self): + """A newly created table with no data should report no issues.""" + self._create_catalog_and_table() + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + repairer = self._get_table_repair(table_path) + report = repairer.verify() + self.assertFalse(report.has_errors) + + def test_verify_healthy_table_with_snapshot(self): + """A table with a valid snapshot should be reported as healthy.""" + self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-base-1")) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + snapshot_data = { + "version": 3, + "id": 1, + "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 0, + "deltaRecordCount": 0, + "commitUser": "test", + "commitIdentifier": 1, + "commitKind": "APPEND", + "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + repairer = self._get_table_repair(table_path) + report = repairer.verify() + self.assertEqual(report.snapshots_checked, 1) + self.assertEqual(report.manifest_lists_checked, 2) + self.assertTrue(report.is_healthy) + + def test_verify_detects_missing_manifest_list(self): + """Should detect when a manifest list referenced by snapshot is missing.""" + self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + # Create only delta manifest list, not base + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + snapshot_data = { + "version": 3, + "id": 1, + "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 0, + "deltaRecordCount": 0, + "commitUser": "test", + "commitIdentifier": 1, + "commitKind": "APPEND", + "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + repairer = self._get_table_repair(table_path) + report = repairer.verify() + self.assertTrue(report.has_errors) + error_messages = [i.message for i in report.issues if i.level == "error"] + self.assertTrue(any("manifest-list-base-1" in m or "missing" in m.lower() + for m in error_messages)) + + def test_verify_detects_dangling_latest(self): + """Should detect when LATEST points to a non-existent snapshot.""" + self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + os.makedirs(snapshot_dir, exist_ok=True) + + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(manifest_dir, exist_ok=True) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-base-1")) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + snapshot_data = { + "version": 3, + "id": 1, + "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 0, + "deltaRecordCount": 0, + "commitUser": "test", + "commitIdentifier": 1, + "commitKind": "APPEND", + "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + # LATEST points to snapshot-5, but only snapshot-1 exists + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("5") + + repairer = self._get_table_repair(table_path) + report = repairer.verify() + self.assertTrue(report.has_errors) + error_messages = [i.message for i in report.issues if i.level == "error"] + self.assertTrue(any("snapshot-5" in m for m in error_messages)) + + def test_verify_corrupted_snapshot_file(self): + """Should detect unreadable/corrupted snapshot files.""" + self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + os.makedirs(snapshot_dir, exist_ok=True) + + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + f.write("this is not valid json{{{") + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + repairer = self._get_table_repair(table_path) + report = repairer.verify() + self.assertTrue(report.has_errors) + error_messages = [i.message for i in report.issues if i.level == "error"] + self.assertTrue(any("corrupted" in m.lower() or "unreadable" in m.lower() + for m in error_messages)) + + def test_check_data_files_detects_missing(self): + """check_data_files=True should detect missing data files.""" + self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + partition_bytes = self._serialize_partition([], []) + + self._write_manifest_with_data_file( + os.path.join(manifest_dir, "manifest-1"), + partition_bytes, bucket=0, file_name="data-abc.orc" + ) + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-1"), "manifest-1" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + snapshot_data = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 10, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + repairer = self._get_table_repair(table_path) + + # Without check_data_files: should be healthy + report = repairer.verify(check_data_files=False) + self.assertEqual(report.data_files_checked, 0) + self.assertFalse(report.has_errors) + + # With check_data_files: should find the missing data file + report = repairer.verify(check_data_files=True) + self.assertGreater(report.data_files_checked, 0) + self.assertTrue(report.has_errors) + data_file_issues = [i for i in report.issues if i.category == "data_file"] + self.assertEqual(len(data_file_issues), 1) + self.assertIn("data-abc.orc", data_file_issues[0].path) + + def test_check_data_files_skips_delete_entries(self): + """DELETE entries for removed files should not be flagged as missing.""" + self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + partition_bytes = self._serialize_partition([], []) + + self._write_manifest_with_data_file( + os.path.join(manifest_dir, "manifest-1"), + partition_bytes, bucket=0, file_name="deleted-file.orc", + kind=1 # DELETE + ) + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-1"), "manifest-1" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + snapshot_data = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 0, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + repairer = self._get_table_repair(table_path) + report = repairer.verify(check_data_files=True) + self.assertEqual(report.data_files_checked, 0) + self.assertFalse(report.has_errors) + + def test_check_data_files_passes_when_file_exists(self): + """check_data_files=True should pass when data files exist.""" + self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + partition_bytes = self._serialize_partition([], []) + + self._write_manifest_with_data_file( + os.path.join(manifest_dir, "manifest-1"), + partition_bytes, bucket=0, file_name="data-abc.orc" + ) + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-1"), "manifest-1" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + # Create the data file at the expected location + data_file_dir = os.path.join(table_path, "bucket-0") + os.makedirs(data_file_dir, exist_ok=True) + with open(os.path.join(data_file_dir, "data-abc.orc"), 'wb') as f: + f.write(b"fake data") + + snapshot_data = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 10, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + repairer = self._get_table_repair(table_path) + report = repairer.verify(check_data_files=True) + self.assertGreater(report.data_files_checked, 0) + self.assertFalse(report.has_errors) + + def test_check_data_files_with_partition(self): + """check_data_files should construct correct path for partitioned tables.""" + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("test_db", False) + + schema = Schema( + fields=[ + DataField(0, "id", AtomicType("INT")), + DataField(1, "dt", AtomicType("STRING")), + DataField(2, "name", AtomicType("STRING")), + ], + partition_keys=["dt"], + primary_keys=["id"], + options={}, + comment="" + ) + catalog.create_table("test_db.part_table", schema, False) + + table_path = os.path.join(self.warehouse, "test_db.db", "part_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + dt_field = DataField(1, "dt", AtomicType("STRING")) + partition_bytes = self._serialize_partition(["2024-01-01"], [dt_field]) + + self._write_manifest_with_data_file( + os.path.join(manifest_dir, "manifest-1"), + partition_bytes, bucket=0, file_name="data-part.orc" + ) + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-1"), "manifest-1" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + # Create data file at partitioned path + data_file_dir = os.path.join(table_path, "dt=2024-01-01", "bucket-0") + os.makedirs(data_file_dir, exist_ok=True) + with open(os.path.join(data_file_dir, "data-part.orc"), 'wb') as f: + f.write(b"fake data") + + snapshot_data = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 10, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + repairer = self._get_table_repair(table_path) + report = repairer.verify(check_data_files=True) + self.assertGreater(report.data_files_checked, 0) + self.assertFalse(report.has_errors) + + def test_check_data_files_custom_partition_default_name(self): + """Tables with custom partition.default-name use that value for null partitions.""" + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("test_db", False) + + schema = Schema( + fields=[ + DataField(0, "id", AtomicType("INT")), + DataField(1, "region", AtomicType("STRING")), + DataField(2, "name", AtomicType("STRING")), + ], + partition_keys=["region"], + primary_keys=["id"], + options={"partition.default-name": "UNSET"}, + comment="" + ) + catalog.create_table("test_db.custom_part", schema, False) + + table_path = os.path.join(self.warehouse, "test_db.db", "custom_part") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + region_field = DataField(1, "region", AtomicType("STRING")) + partition_bytes = self._serialize_partition([None], [region_field]) + + self._write_manifest_with_data_file( + os.path.join(manifest_dir, "manifest-1"), + partition_bytes, bucket=0, file_name="data-null.orc" + ) + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-1"), "manifest-1" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + + # Create data file at custom default partition path + data_file_dir = os.path.join(table_path, "region=UNSET", "bucket-0") + os.makedirs(data_file_dir, exist_ok=True) + with open(os.path.join(data_file_dir, "data-null.orc"), 'wb') as f: + f.write(b"fake data") + + snapshot_data = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 10, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_data, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + repairer = self._get_table_repair(table_path) + report = repairer.verify(check_data_files=True) + self.assertGreater(report.data_files_checked, 0) + self.assertFalse(report.has_errors) + + def test_verify_branch_table(self): + """Verifying a branch-qualified table checks the branch snapshot dir.""" + self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(manifest_dir, exist_ok=True) + + # Create main branch snapshot (healthy) + main_snapshot_dir = os.path.join(table_path, "snapshot") + os.makedirs(main_snapshot_dir, exist_ok=True) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-base-main")) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-main")) + snapshot_main = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-main", + "deltaManifestList": "manifest-list-delta-main", + "totalRecordCount": 0, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(main_snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_main, f) + with open(os.path.join(main_snapshot_dir, "LATEST"), 'w') as f: + f.write("1") + + # Create branch "b1" with a dangling LATEST + branch_snapshot_dir = os.path.join(table_path, "branch", "branch-b1", "snapshot") + os.makedirs(branch_snapshot_dir, exist_ok=True) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-base-b1")) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-b1")) + snapshot_b1 = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-b1", + "deltaManifestList": "manifest-list-delta-b1", + "totalRecordCount": 0, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(branch_snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_b1, f) + with open(os.path.join(branch_snapshot_dir, "LATEST"), 'w') as f: + f.write("99") + + # Verify main branch - should be healthy + repairer_main = self._get_table_repair(table_path) + report_main = repairer_main.verify() + self.assertFalse(report_main.has_errors) + + # Verify branch b1 - should detect dangling LATEST + repairer_branch = self._get_table_repair(table_path, branch="b1") + report_branch = repairer_branch.verify() + self.assertTrue(report_branch.has_errors) + error_messages = [i.message for i in report_branch.issues if i.level == "error"] + self.assertTrue(any("snapshot-99" in m for m in error_messages)) + + def test_check_data_files_shared_manifests_no_double_count(self): + """Multiple snapshots sharing manifest files should not double-count data files.""" + self._create_catalog_and_table() + + table_path = os.path.join(self.warehouse, "test_db.db", "test_table") + snapshot_dir = os.path.join(table_path, "snapshot") + manifest_dir = os.path.join(table_path, "manifest") + os.makedirs(snapshot_dir, exist_ok=True) + os.makedirs(manifest_dir, exist_ok=True) + + partition_bytes = self._serialize_partition([], []) + + # Create one manifest file referenced by both snapshots + self._write_manifest_with_data_file( + os.path.join(manifest_dir, "manifest-shared"), + partition_bytes, bucket=0, file_name="data-shared.orc" + ) + + # Snapshot 1: base uses shared manifest + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-1"), "manifest-shared" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-1")) + snapshot_1 = { + "version": 3, "id": 1, "schemaId": 0, + "baseManifestList": "manifest-list-base-1", + "deltaManifestList": "manifest-list-delta-1", + "totalRecordCount": 10, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 1, + "commitKind": "APPEND", "timeMillis": 1000000 + } + with open(os.path.join(snapshot_dir, "snapshot-1"), 'w') as f: + json.dump(snapshot_1, f) + + # Snapshot 2: base also uses the same shared manifest + self._write_manifest_list_with_entry( + os.path.join(manifest_dir, "manifest-list-base-2"), "manifest-shared" + ) + self._write_empty_avro(os.path.join(manifest_dir, "manifest-list-delta-2")) + snapshot_2 = { + "version": 3, "id": 2, "schemaId": 0, + "baseManifestList": "manifest-list-base-2", + "deltaManifestList": "manifest-list-delta-2", + "totalRecordCount": 10, "deltaRecordCount": 0, + "commitUser": "test", "commitIdentifier": 2, + "commitKind": "APPEND", "timeMillis": 2000000 + } + with open(os.path.join(snapshot_dir, "snapshot-2"), 'w') as f: + json.dump(snapshot_2, f) + with open(os.path.join(snapshot_dir, "LATEST"), 'w') as f: + f.write("2") + + # Create the data file + data_file_dir = os.path.join(table_path, "bucket-0") + os.makedirs(data_file_dir, exist_ok=True) + with open(os.path.join(data_file_dir, "data-shared.orc"), 'wb') as f: + f.write(b"fake data") + + repairer = self._get_table_repair(table_path) + report = repairer.verify(check_data_files=True) + self.assertFalse(report.has_errors) + self.assertEqual(report.snapshots_checked, 2) + # Data file should only be counted once despite being referenced by both snapshots + self.assertEqual(report.data_files_checked, 1) + + def _write_empty_avro(self, path: str): + """Write an empty Avro file (valid manifest list with no records).""" + import fastavro + from io import BytesIO + from pypaimon.manifest.schema.manifest_file_meta import MANIFEST_FILE_META_SCHEMA + + buffer = BytesIO() + fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, []) + with open(path, 'wb') as f: + f.write(buffer.getvalue()) + + def _write_manifest_list_with_entry(self, path: str, manifest_file_name: str): + """Write a manifest list Avro referencing a single manifest file.""" + import fastavro + from io import BytesIO + from pypaimon.manifest.schema.manifest_file_meta import MANIFEST_FILE_META_SCHEMA + + record = { + "_VERSION": 1, + "_FILE_NAME": manifest_file_name, + "_FILE_SIZE": 100, + "_NUM_ADDED_FILES": 1, + "_NUM_DELETED_FILES": 0, + "_PARTITION_STATS": {"_MIN_VALUES": b"", "_MAX_VALUES": b"", "_NULL_COUNTS": None}, + "_SCHEMA_ID": 0, + "_MIN_ROW_ID": None, + "_MAX_ROW_ID": None, + } + buffer = BytesIO() + fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, [record]) + with open(path, 'wb') as f: + f.write(buffer.getvalue()) + + def _write_manifest_with_data_file(self, path: str, partition_bytes: bytes, + bucket: int, file_name: str, + external_path=None, kind=0): + """Write a manifest Avro with a single data file entry.""" + import fastavro + from io import BytesIO + from pypaimon.manifest.schema.manifest_entry import MANIFEST_ENTRY_SCHEMA + + record = { + "_VERSION": 1, + "_KIND": kind, + "_PARTITION": partition_bytes, + "_BUCKET": bucket, + "_TOTAL_BUCKETS": 1, + "_FILE": { + "_FILE_NAME": file_name, + "_FILE_SIZE": 1024, + "_ROW_COUNT": 10, + "_MIN_KEY": b"", + "_MAX_KEY": b"", + "_KEY_STATS": {"_MIN_VALUES": b"", "_MAX_VALUES": b"", "_NULL_COUNTS": None}, + "_VALUE_STATS": {"_MIN_VALUES": b"", "_MAX_VALUES": b"", "_NULL_COUNTS": None}, + "_MIN_SEQUENCE_NUMBER": 0, + "_MAX_SEQUENCE_NUMBER": 0, + "_SCHEMA_ID": 0, + "_LEVEL": 0, + "_EXTRA_FILES": [], + "_CREATION_TIME": None, + "_DELETE_ROW_COUNT": None, + "_EMBEDDED_FILE_INDEX": None, + "_FILE_SOURCE": None, + "_VALUE_STATS_COLS": None, + "_EXTERNAL_PATH": external_path, + "_FIRST_ROW_ID": None, + "_WRITE_COLS": None, + } + } + buffer = BytesIO() + fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, [record]) + with open(path, 'wb') as f: + f.write(buffer.getvalue()) + + def _serialize_partition(self, values, fields): + """Serialize partition values using GenericRowSerializer.""" + from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer + row = GenericRow(values, fields) + return GenericRowSerializer.to_bytes(row) + + +if __name__ == '__main__': + unittest.main()