Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions paimon-python/pypaimon/common/options/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

import sys
import warnings
from enum import Enum
from typing import Dict, Optional

Expand Down Expand Up @@ -56,6 +57,23 @@ class MergeEngine(str, Enum):
FIRST_ROW = "first-row"


class StartupMode(str, Enum):
    """
    Startup mode for scan operations.

    Each member's value is the string accepted by the ``scan.mode`` option.
    Because the enum also subclasses ``str``, members compare equal to their
    raw string value.
    """
    # Placeholder: the effective mode is resolved from the other scan options.
    DEFAULT = "default"
    # Read the latest snapshot, then stream changes.
    LATEST_FULL = "latest-full"
    # Deprecated spelling; CoreOptions.startup_mode() maps it to LATEST_FULL
    # and emits a DeprecationWarning.
    FULL = "full"
    # Stream changes only, without an initial snapshot.
    LATEST = "latest"
    # NOTE: TableScan._validate_scan_mode currently rejects this mode as
    # not yet supported in pypaimon.
    COMPACTED_FULL = "compacted-full"
    # Read from a specific timestamp (scan.timestamp-millis / scan.timestamp).
    FROM_TIMESTAMP = "from-timestamp"
    # Read from a specific snapshot (snapshot id, tag name or watermark).
    FROM_SNAPSHOT = "from-snapshot"
    # Validation requires scan.snapshot-id for this mode.
    FROM_SNAPSHOT_FULL = "from-snapshot-full"
    # Companion option: scan.creation-time-millis.
    # NOTE: currently rejected by scan validation as not yet supported.
    FROM_CREATION_TIMESTAMP = "from-creation-timestamp"
    # Companion option: scan.file-creation-time-millis.
    # NOTE: currently rejected by scan validation as not yet supported.
    FROM_FILE_CREATION_TIME = "from-file-creation-time"
    # Read incremental changes between two snapshots/tags.
    INCREMENTAL = "incremental"


class CoreOptions:
"""Core options for Paimon tables."""
# File format constants
Expand Down Expand Up @@ -240,6 +258,21 @@ class CoreOptions:
.with_description("Specify the file name prefix of data files.")
)
# Scan options
# The default value is the DEFAULT placeholder; CoreOptions.startup_mode()
# turns it into a concrete mode by inspecting the other scan options.
SCAN_MODE: ConfigOption[StartupMode] = (
    ConfigOptions.key("scan.mode")
    .enum_type(StartupMode)
    .default_value(StartupMode.DEFAULT)
    .with_description(
        "Scan startup mode for the table. "
        "'default' resolves the actual mode from other scan options. "
        "'latest-full' reads the latest snapshot then streams changes. "
        "'latest' only streams changes without an initial snapshot. "
        "'from-timestamp' reads from a specific timestamp. "
        "'from-snapshot' reads from a specific snapshot. "
        "'incremental' reads incremental changes between two snapshots/tags."
    )
)

SCAN_FALLBACK_BRANCH: ConfigOption[str] = (
ConfigOptions.key("scan.fallback-branch")
.string_type()
Expand Down Expand Up @@ -301,6 +334,24 @@ class CoreOptions:
)
)

# Lower bound on data-file creation time (milliseconds, per the key name).
# With scan.mode left at 'default', CoreOptions.startup_mode() can resolve to
# FROM_FILE_CREATION_TIME when this is set and no higher-precedence scan
# option (timestamp, snapshot, incremental) is present.
SCAN_FILE_CREATION_TIME_MILLIS: ConfigOption[int] = (
    ConfigOptions.key("scan.file-creation-time-millis")
    .long_type()
    .no_default_value()
    .with_description(
        "After configuring this time, only the data files created after this time will be read."
    )
)

# Companion option for the 'from-creation-timestamp' scan mode. It is the
# last option consulted when CoreOptions.startup_mode() resolves 'default'.
SCAN_CREATION_TIME_MILLIS: ConfigOption[int] = (
    ConfigOptions.key("scan.creation-time-millis")
    .long_type()
    .no_default_value()
    .with_description(
        "Optional timestamp used in case of 'from-creation-timestamp' scan mode."
    )
)

SOURCE_SPLIT_TARGET_SIZE: ConfigOption[MemorySize] = (
ConfigOptions.key("source.split.target-size")
.memory_type()
Expand Down Expand Up @@ -655,6 +706,42 @@ def blob_target_file_size(self, default=None):
def data_file_prefix(self, default=None):
    """Look up the DATA_FILE_PREFIX option.

    ``default`` is passed straight through to the underlying options lookup.
    """
    prefix = self.options.get(CoreOptions.DATA_FILE_PREFIX, default)
    return prefix

def scan_mode(self, default=None):
    """Look up the raw ``scan.mode`` option, without resolving 'default'.

    ``default`` is passed straight through to the underlying options lookup.
    Use ``startup_mode()`` to obtain the effective mode.
    """
    configured = self.options.get(CoreOptions.SCAN_MODE, default)
    return configured

def startup_mode(self) -> 'StartupMode':
    """Return the effective startup mode for a scan.

    An explicit mode is returned unchanged, with two exceptions:

    * ``DEFAULT`` is replaced by the first mode whose trigger options are
      set, falling back to ``LATEST_FULL`` when none are.
    * The deprecated ``FULL`` is reported as ``LATEST_FULL`` and a
      ``DeprecationWarning`` is emitted.
    """
    configured = self.scan_mode()

    if configured == StartupMode.DEFAULT:
        # Precedence-ordered: the first rule with any trigger option set wins.
        inference_rules = (
            (StartupMode.FROM_TIMESTAMP, (
                CoreOptions.SCAN_TIMESTAMP_MILLIS,
                CoreOptions.SCAN_TIMESTAMP,
            )),
            (StartupMode.FROM_SNAPSHOT, (
                CoreOptions.SCAN_SNAPSHOT_ID,
                CoreOptions.SCAN_TAG_NAME,
                CoreOptions.SCAN_WATERMARK,
            )),
            (StartupMode.INCREMENTAL, (
                CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP,
            )),
            (StartupMode.FROM_FILE_CREATION_TIME, (
                CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
            )),
            (StartupMode.FROM_CREATION_TIMESTAMP, (
                CoreOptions.SCAN_CREATION_TIME_MILLIS,
            )),
        )
        for inferred, triggers in inference_rules:
            if any(self.options.contains(trigger) for trigger in triggers):
                return inferred
        return StartupMode.LATEST_FULL

    if configured == StartupMode.FULL:
        # Warn from this frame so stacklevel=2 points at the caller.
        warnings.warn(
            "scan.mode 'full' is deprecated, use 'latest-full' instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return StartupMode.LATEST_FULL

    return configured

def scan_fallback_branch(self, default=None):
    """Look up the SCAN_FALLBACK_BRANCH option.

    ``default`` is passed straight through to the underlying options lookup.
    """
    branch = self.options.get(CoreOptions.SCAN_FALLBACK_BRANCH, default)
    return branch

Expand Down
103 changes: 103 additions & 0 deletions paimon-python/pypaimon/read/table_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ def _create_file_scanner(self) -> FileScanner:
snapshot_manager = self.table.snapshot_manager()
manifest_list_manager = ManifestListManager(self.table)

self._validate_scan_mode()

from pypaimon.snapshot.time_travel_util import TimeTravelUtil, SCAN_KEYS
has_time_travel = any(options.contains_key(key) for key in SCAN_KEYS)
has_incremental = options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)
Expand Down Expand Up @@ -158,3 +160,104 @@ def with_slice(self, start_pos, end_pos) -> 'TableScan':
def with_global_index_result(self, result) -> 'TableScan':
    """Forward ``result`` to the file scanner and return this scan for chaining."""
    scanner = self.file_scanner
    scanner.with_global_index_result(result)
    return self

def _validate_scan_mode(self):
    """Check that the resolved scan mode agrees with the scan options that are set.

    The effective mode comes from ``CoreOptions.startup_mode()``. Every mode
    has a whitelist of companion scan keys: a mode with a non-empty
    whitelist needs at least one of those keys, and any tracked scan key
    outside the whitelist is rejected.

    Raises:
        ValueError: if scan.timestamp-millis and scan.timestamp are both
            set, if the mode is not yet supported in pypaimon, if a mode's
            companion key is missing, or if a scan key the mode does not
            allow is present.
    """
    from pypaimon.common.options.core_options import StartupMode

    table_options = self.table.options
    resolved = table_options.startup_mode()
    raw = table_options.options

    # Order matters: conflicting keys are reported in this order.
    tracked = (
        CoreOptions.SCAN_SNAPSHOT_ID,
        CoreOptions.SCAN_TAG_NAME,
        CoreOptions.SCAN_WATERMARK,
        CoreOptions.SCAN_TIMESTAMP_MILLIS,
        CoreOptions.SCAN_TIMESTAMP,
        CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP,
        CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
        CoreOptions.SCAN_CREATION_TIME_MILLIS,
    )
    present_keys = [option.key() for option in tracked if raw.contains(option)]

    snapshot_id_key = CoreOptions.SCAN_SNAPSHOT_ID.key()
    tag_name_key = CoreOptions.SCAN_TAG_NAME.key()
    watermark_key = CoreOptions.SCAN_WATERMARK.key()
    timestamp_millis_key = CoreOptions.SCAN_TIMESTAMP_MILLIS.key()
    timestamp_key = CoreOptions.SCAN_TIMESTAMP.key()
    incremental_key = CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key()

    # The two timestamp spellings exclude each other regardless of mode.
    if timestamp_millis_key in present_keys and timestamp_key in present_keys:
        raise ValueError(
            "scan.timestamp-millis and scan.timestamp cannot both be set."
        )

    unsupported_modes = (
        StartupMode.COMPACTED_FULL,
        StartupMode.FROM_CREATION_TIMESTAMP,
        StartupMode.FROM_FILE_CREATION_TIME,
    )
    if resolved in unsupported_modes:
        raise ValueError(
            f"scan.mode '{resolved.value}' is not yet supported in pypaimon."
        )

    # mode -> (allowed companion keys, error raised when none of them is set)
    companion_rules = {
        StartupMode.FROM_TIMESTAMP: (
            {timestamp_millis_key, timestamp_key},
            "scan.mode is 'from-timestamp' but neither "
            "scan.timestamp-millis nor scan.timestamp is set.",
        ),
        StartupMode.FROM_SNAPSHOT_FULL: (
            {snapshot_id_key},
            "scan.mode is 'from-snapshot-full' but scan.snapshot-id is not set.",
        ),
        StartupMode.FROM_SNAPSHOT: (
            {snapshot_id_key, tag_name_key, watermark_key},
            "scan.mode is 'from-snapshot' but none of "
            "scan.snapshot-id, scan.tag-name, or scan.watermark is set.",
        ),
        StartupMode.INCREMENTAL: (
            {incremental_key},
            "scan.mode is 'incremental' but "
            "incremental-between-timestamp is not set.",
        ),
    }
    # Modes without a rule (e.g. latest / latest-full) allow no scan keys.
    allowed, missing_message = companion_rules.get(resolved, (set(), None))
    if missing_message is not None and allowed.isdisjoint(present_keys):
        raise ValueError(missing_message)

    # Reject every tracked key that is not whitelisted for this mode.
    disallowed = [key for key in present_keys if key not in allowed]
    if disallowed:
        raise ValueError(
            f"scan.mode '{resolved.value}' conflicts with: {disallowed}. "
            f"Only {sorted(allowed) if allowed else 'no scan keys'} "
            f"are allowed for this mode."
        )
94 changes: 94 additions & 0 deletions paimon-python/pypaimon/tests/table_scan_mode_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest
import warnings
from unittest.mock import Mock

from pypaimon.common.options.core_options import CoreOptions, StartupMode
from pypaimon.read.table_scan import TableScan


def _scan(options):
    """Build a TableScan without running ``__init__``, backed by a mock table.

    Only ``table.options`` is populated (from the given option dict), which
    is all that ``_validate_scan_mode`` reads.
    """
    table = Mock()
    table.options = CoreOptions.from_dict(options)

    bare_scan = TableScan.__new__(TableScan)
    bare_scan.table = table
    return bare_scan


class TableScanModeTest(unittest.TestCase):
    """Tests for scan.mode resolution and TableScan scan-mode validation."""

    def test_from_timestamp_requires_timestamp_option(self):
        scan = _scan({
            CoreOptions.SCAN_MODE.key(): StartupMode.FROM_TIMESTAMP.value,
        })

        with self.assertRaisesRegex(
                ValueError,
                "neither scan.timestamp-millis nor scan.timestamp is set"):
            scan._validate_scan_mode()

    def test_latest_conflicts_with_snapshot_id(self):
        scan = _scan({
            CoreOptions.SCAN_MODE.key(): StartupMode.LATEST.value,
            CoreOptions.SCAN_SNAPSHOT_ID.key(): "1",
        })

        with self.assertRaisesRegex(ValueError, "scan.snapshot-id"):
            scan._validate_scan_mode()

    def test_default_with_timestamp_millis_resolves_to_from_timestamp(self):
        options = CoreOptions.from_dict({
            CoreOptions.SCAN_MODE.key(): StartupMode.DEFAULT.value,
            CoreOptions.SCAN_TIMESTAMP_MILLIS.key(): "123",
        })

        self.assertEqual(options.startup_mode(), StartupMode.FROM_TIMESTAMP)
        # Must not raise: the inferred mode whitelists the key that is set.
        _scan(options.options.to_map())._validate_scan_mode()

    def test_default_with_snapshot_id_resolves_to_from_snapshot(self):
        options = CoreOptions.from_dict({
            CoreOptions.SCAN_MODE.key(): StartupMode.DEFAULT.value,
            CoreOptions.SCAN_SNAPSHOT_ID.key(): "1",
        })

        self.assertEqual(options.startup_mode(), StartupMode.FROM_SNAPSHOT)
        # Must not raise: the inferred mode whitelists the key that is set.
        _scan(options.options.to_map())._validate_scan_mode()

    def test_unsupported_scan_modes_raise_value_error(self):
        # Cover every mode the validator rejects, not just one of them.
        unsupported_modes = (
            StartupMode.COMPACTED_FULL,
            StartupMode.FROM_CREATION_TIMESTAMP,
            StartupMode.FROM_FILE_CREATION_TIME,
        )
        for mode in unsupported_modes:
            with self.subTest(mode=mode.value):
                scan = _scan({CoreOptions.SCAN_MODE.key(): mode.value})

                with self.assertRaisesRegex(ValueError, "not yet supported"):
                    scan._validate_scan_mode()

    def test_full_mode_maps_to_latest_full_with_deprecation_warning(self):
        options = CoreOptions.from_dict({
            CoreOptions.SCAN_MODE.key(): StartupMode.FULL.value,
        })

        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            self.assertEqual(options.startup_mode(), StartupMode.LATEST_FULL)

        # Count only deprecation warnings so that unrelated warnings raised
        # during option parsing cannot break this test.
        deprecations = [
            w for w in caught if issubclass(w.category, DeprecationWarning)
        ]
        self.assertEqual(len(deprecations), 1)
        self.assertIn("latest-full", str(deprecations[0].message))

# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()
Loading