import sys
import warnings
from enum import Enum
from typing import Dict, Optional


class StartupMode(str, Enum):
    """Values accepted by the ``scan.mode`` table option.

    Every member is also a ``str``, so a member compares equal to its
    option string (``StartupMode.LATEST == "latest"``) and can be looked
    up from it (``StartupMode("latest")``).
    """

    # Effective mode is resolved from the other scan options.
    DEFAULT = "default"
    # Read the latest snapshot, then stream changes.
    LATEST_FULL = "latest-full"
    # Deprecated; CoreOptions.startup_mode() reports it as LATEST_FULL.
    FULL = "full"
    # Stream changes only, without an initial snapshot.
    LATEST = "latest"
    # Declared for completeness; rejected as unsupported by TableScan.
    COMPACTED_FULL = "compacted-full"
    # Read from a specific timestamp.
    FROM_TIMESTAMP = "from-timestamp"
    # Read from a specific snapshot (id, tag name or watermark).
    FROM_SNAPSHOT = "from-snapshot"
    # Like FROM_SNAPSHOT, but TableScan only accepts scan.snapshot-id with it.
    FROM_SNAPSHOT_FULL = "from-snapshot-full"
    # Declared for completeness; rejected as unsupported by TableScan.
    FROM_CREATION_TIMESTAMP = "from-creation-timestamp"
    # Declared for completeness; rejected as unsupported by TableScan.
    FROM_FILE_CREATION_TIME = "from-file-creation-time"
    # Read incremental changes between two snapshots/tags.
    INCREMENTAL = "incremental"
# Members added to ``CoreOptions`` by this change. The class header lies
# outside the visible hunks, so the definitions are shown without it; they
# belong in the class body.

# Scan options
SCAN_MODE: ConfigOption[StartupMode] = (
    ConfigOptions.key("scan.mode")
    .enum_type(StartupMode)
    .default_value(StartupMode.DEFAULT)
    .with_description(
        "Scan startup mode for the table. "
        "'default' resolves the actual mode from other scan options. "
        "'latest-full' reads the latest snapshot then streams changes. "
        "'latest' only streams changes without an initial snapshot. "
        "'from-timestamp' reads from a specific timestamp. "
        "'from-snapshot' reads from a specific snapshot. "
        "'incremental' reads incremental changes between two snapshots/tags."
    )
)

SCAN_FILE_CREATION_TIME_MILLIS: ConfigOption[int] = (
    ConfigOptions.key("scan.file-creation-time-millis")
    .long_type()
    .no_default_value()
    .with_description(
        "After configuring this time, only the data files created after this time will be read."
    )
)

SCAN_CREATION_TIME_MILLIS: ConfigOption[int] = (
    ConfigOptions.key("scan.creation-time-millis")
    .long_type()
    .no_default_value()
    .with_description(
        "Optional timestamp used in case of 'from-creation-timestamp' scan mode."
    )
)


def scan_mode(self, default=None):
    """Return the configured ``scan.mode`` value.

    ``default`` is handed unchanged to the underlying options lookup.
    """
    return self.options.get(CoreOptions.SCAN_MODE, default)


def startup_mode(self) -> 'StartupMode':
    """Return the effective startup mode for scans.

    An explicitly configured mode is returned unchanged, except that the
    deprecated ``full`` is reported as ``latest-full`` after emitting a
    ``DeprecationWarning``. When ``scan.mode`` is ``default`` the mode is
    inferred from whichever companion option is present, checked in this
    order: timestamp, snapshot id / tag name / watermark, incremental,
    file creation time, creation time. With none of them present the
    result is ``latest-full``.

    NOTE(review): intended to mirror Java's ``CoreOptions.startupMode()``;
    that correspondence cannot be confirmed from this file alone.
    """
    configured = self.scan_mode()

    if configured == StartupMode.FULL:
        # stacklevel=2 attributes the warning to the caller of startup_mode().
        warnings.warn(
            "scan.mode 'full' is deprecated, use 'latest-full' instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return StartupMode.LATEST_FULL

    if configured != StartupMode.DEFAULT:
        return configured

    # Ordered by precedence: the first group with any option set wins.
    inference_order = (
        ((CoreOptions.SCAN_TIMESTAMP_MILLIS, CoreOptions.SCAN_TIMESTAMP),
         StartupMode.FROM_TIMESTAMP),
        ((CoreOptions.SCAN_SNAPSHOT_ID, CoreOptions.SCAN_TAG_NAME,
          CoreOptions.SCAN_WATERMARK),
         StartupMode.FROM_SNAPSHOT),
        ((CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP,),
         StartupMode.INCREMENTAL),
        ((CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,),
         StartupMode.FROM_FILE_CREATION_TIME),
        ((CoreOptions.SCAN_CREATION_TIME_MILLIS,),
         StartupMode.FROM_CREATION_TIMESTAMP),
    )
    for companions, inferred in inference_order:
        if any(self.options.contains(option) for option in companions):
            return inferred
    return StartupMode.LATEST_FULL
def _validate_scan_mode(self):
    """Reject scan options that do not fit the resolved ``scan.mode``.

    Method of ``TableScan``; called from ``_create_file_scanner``. The
    effective mode comes from ``self.table.options.startup_mode()``. Each
    mode has a whitelist of companion scan keys: a mode that needs a
    companion fails when none is set, and any scan key outside the
    whitelist is reported as a conflict.

    NOTE(review): meant to follow Java's SchemaValidation rules; not
    verifiable from this file.

    Raises:
        ValueError: if both timestamp options are set, if a required
            companion option is missing, if the mode is not supported by
            pypaimon, or if a scan key conflicts with the mode.
    """
    from pypaimon.common.options.core_options import StartupMode

    core_options = self.table.options
    mode = core_options.startup_mode()
    options = core_options.options

    snapshot_id = CoreOptions.SCAN_SNAPSHOT_ID
    tag_name = CoreOptions.SCAN_TAG_NAME
    watermark = CoreOptions.SCAN_WATERMARK
    timestamp_millis = CoreOptions.SCAN_TIMESTAMP_MILLIS
    timestamp = CoreOptions.SCAN_TIMESTAMP
    incremental = CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP
    file_creation_time = CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS
    creation_time = CoreOptions.SCAN_CREATION_TIME_MILLIS

    # Option key -> whether it is set. Insertion order fixes the order in
    # which conflicting keys are listed in the final error message.
    is_set = {
        option.key(): options.contains(option)
        for option in (
            snapshot_id,
            tag_name,
            watermark,
            timestamp_millis,
            timestamp,
            incremental,
            file_creation_time,
            creation_time,
        )
    }
    configured = [key for key, present in is_set.items() if present]

    # scan.timestamp-millis and scan.timestamp are mutually exclusive
    if is_set[timestamp_millis.key()] and is_set[timestamp.key()]:
        raise ValueError(
            "scan.timestamp-millis and scan.timestamp cannot both be set."
        )

    # (mode, allowed companion options, message when none of them is set)
    rules = (
        (
            StartupMode.FROM_TIMESTAMP,
            (timestamp_millis, timestamp),
            "scan.mode is 'from-timestamp' but neither "
            "scan.timestamp-millis nor scan.timestamp is set.",
        ),
        (
            StartupMode.FROM_SNAPSHOT_FULL,
            (snapshot_id,),
            "scan.mode is 'from-snapshot-full' but scan.snapshot-id is not set.",
        ),
        (
            StartupMode.FROM_SNAPSHOT,
            (snapshot_id, tag_name, watermark),
            "scan.mode is 'from-snapshot' but none of "
            "scan.snapshot-id, scan.tag-name, or scan.watermark is set.",
        ),
        (
            StartupMode.INCREMENTAL,
            (incremental,),
            "scan.mode is 'incremental' but "
            "incremental-between-timestamp is not set.",
        ),
    )

    # Modes without a rule (latest-full, latest, anything else) allow no keys.
    allowed = set()
    for rule_mode, companions, missing_message in rules:
        if mode != rule_mode:
            continue
        allowed = {option.key() for option in companions}
        if not any(is_set[key] for key in allowed):
            raise ValueError(missing_message)
        break
    else:
        unsupported = (
            StartupMode.COMPACTED_FULL,
            StartupMode.FROM_CREATION_TIMESTAMP,
            StartupMode.FROM_FILE_CREATION_TIME,
        )
        if mode in unsupported:
            raise ValueError(
                f"scan.mode '{mode.value}' is not yet supported in pypaimon."
            )

    # Reject any scan key that's not in the whitelist for this mode
    disallowed = [key for key in configured if key not in allowed]
    if disallowed:
        raise ValueError(
            f"scan.mode '{mode.value}' conflicts with: {disallowed}. "
            f"Only {sorted(allowed) if allowed else 'no scan keys'} "
            f"are allowed for this mode."
        )
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""Tests for scan.mode resolution and validation in ``TableScan``."""

import re
import unittest
import warnings
from unittest.mock import Mock

from pypaimon.common.options.core_options import CoreOptions, StartupMode
from pypaimon.read.table_scan import TableScan


def _scan(options):
    """Build a ``TableScan`` without running ``__init__``.

    Only ``scan.table.options`` is populated (from the ``options`` dict),
    which is all that ``_validate_scan_mode`` reads.
    """
    scan = TableScan.__new__(TableScan)
    scan.table = Mock()
    scan.table.options = CoreOptions.from_dict(options)
    return scan


class TableScanModeTest(unittest.TestCase):
    """Covers ``CoreOptions.startup_mode`` and ``TableScan._validate_scan_mode``."""

    def test_from_timestamp_requires_timestamp_option(self):
        scan = _scan({
            CoreOptions.SCAN_MODE.key(): StartupMode.FROM_TIMESTAMP.value,
        })

        # re.escape: the option names contain '.', which would otherwise
        # match any character.
        with self.assertRaisesRegex(
                ValueError,
                re.escape("neither scan.timestamp-millis nor scan.timestamp is set")):
            scan._validate_scan_mode()

    def test_latest_conflicts_with_snapshot_id(self):
        scan = _scan({
            CoreOptions.SCAN_MODE.key(): StartupMode.LATEST.value,
            CoreOptions.SCAN_SNAPSHOT_ID.key(): "1",
        })

        with self.assertRaisesRegex(ValueError, re.escape("scan.snapshot-id")):
            scan._validate_scan_mode()

    def test_default_with_timestamp_millis_resolves_to_from_timestamp(self):
        options = CoreOptions.from_dict({
            CoreOptions.SCAN_MODE.key(): StartupMode.DEFAULT.value,
            CoreOptions.SCAN_TIMESTAMP_MILLIS.key(): "123",
        })

        self.assertEqual(options.startup_mode(), StartupMode.FROM_TIMESTAMP)
        # Validation must accept the inferred mode without raising.
        _scan(options.options.to_map())._validate_scan_mode()

    def test_default_with_snapshot_id_resolves_to_from_snapshot(self):
        options = CoreOptions.from_dict({
            CoreOptions.SCAN_MODE.key(): StartupMode.DEFAULT.value,
            CoreOptions.SCAN_SNAPSHOT_ID.key(): "1",
        })

        self.assertEqual(options.startup_mode(), StartupMode.FROM_SNAPSHOT)
        # Validation must accept the inferred mode without raising.
        _scan(options.options.to_map())._validate_scan_mode()

    def test_unsupported_scan_modes_raise_value_error(self):
        scan = _scan({
            CoreOptions.SCAN_MODE.key(): StartupMode.COMPACTED_FULL.value,
        })

        with self.assertRaisesRegex(ValueError, "not yet supported"):
            scan._validate_scan_mode()

    def test_full_mode_maps_to_latest_full_with_deprecation_warning(self):
        options = CoreOptions.from_dict({
            CoreOptions.SCAN_MODE.key(): StartupMode.FULL.value,
        })

        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            self.assertEqual(options.startup_mode(), StartupMode.LATEST_FULL)

        # Count only DeprecationWarnings so an unrelated warning recorded in
        # the same block cannot make this test fail spuriously.
        deprecations = [
            w for w in caught if issubclass(w.category, DeprecationWarning)
        ]
        self.assertEqual(len(deprecations), 1)
        self.assertIn("latest-full", str(deprecations[0].message))


if __name__ == '__main__':
    unittest.main()