From de86e92c8e2710235caba31a54a5e314750dc912 Mon Sep 17 00:00:00 2001 From: M Abulazm Date: Mon, 1 Jun 2026 10:31:04 +0200 Subject: [PATCH 1/2] Make profiler extract location user-configurable via --output-folder Surfaces the profiler's DuckDB extract path as a CLI choice instead of a hardcoded value in per-source pipeline_config.yml files with the default `~/.databricks/labs/lakebridge_profilers/_assessment`. Mirrors the shape of the transpile command's `--output-folder` flag. Closes #2461. --- labs.yml | 2 + .../labs/lakebridge/assessments/pipeline.py | 9 +-- .../labs/lakebridge/assessments/profiler.py | 24 ++++---- .../lakebridge/assessments/profiler_config.py | 1 - .../assessments/profiler_validator.py | 18 ------ src/databricks/labs/lakebridge/cli.py | 27 +++++++-- .../legacy_synapse/pipeline_config.yml | 1 - .../assessments/mssql/pipeline_config.yml | 1 - .../assessments/oracle/pipeline_config.yml | 1 - .../assessments/synapse/pipeline_config.yml | 1 - .../integration/assessments/test_pipeline.py | 55 ++++++++++--------- .../integration/assessments/test_profiler.py | 36 +++++------- .../assessments/test_profiler_validator.py | 13 ----- .../resources/assessments/pipeline_config.yml | 2 - .../assessments/pipeline_config_absolute.yml | 2 - .../pipeline_config_empty_result.yml | 1 - .../pipeline_config_failure_dependency.yml | 4 -- .../assessments/pipeline_config_main.yml | 3 - .../pipeline_config_python_failure.yml | 3 - .../pipeline_config_sql_failure.yml | 2 - .../pipeline_config_with_combined_ddl.yml | 4 +- .../assessments/pipeline_config_with_ddl.yml | 4 +- tests/unit/assessment/test_pipeline_config.py | 2 +- tests/unit/assessment/test_profiler_config.py | 1 - 24 files changed, 88 insertions(+), 129 deletions(-) diff --git a/labs.yml b/labs.yml index 12b6ff362a..a2b66c64be 100644 --- a/labs.yml +++ b/labs.yml @@ -115,6 +115,8 @@ commands: flags: - name: source-tech description: (Optional) The technology/platform of the sources to Profile + - name: output-folder + description: (Optional) Folder where the extracted DuckDB file is written; if omitted, prompts with a smart default - name: test-profiler-connection description: (Internal) Test the connection to the source database for profiling diff --git a/src/databricks/labs/lakebridge/assessments/pipeline.py b/src/databricks/labs/lakebridge/assessments/pipeline.py index cd23c7f5fc..85c71d7b31 100644 --- a/src/databricks/labs/lakebridge/assessments/pipeline.py +++ b/src/databricks/labs/lakebridge/assessments/pipeline.py @@ -36,10 +36,10 @@ class StepExecutionResult: class PipelineClass: - def __init__(self, config: PipelineConfig, executor: DatabaseManager | None): + def __init__(self, config: PipelineConfig, executor: DatabaseManager | None, output_folder: Path): self.config = config self.executor = executor - self._db_path_prefix = Path(config.extract_folder).expanduser() + self._db_path_prefix = output_folder.expanduser() self._create_dir(self._db_path_prefix) self._db_path = str(self._db_path_prefix / DB_NAME) @@ -145,7 +145,6 @@ def _execute_ddl_step(self, step: Step): raise RuntimeError(f"DDL execution failed: {str(e)}") from e def _execute_python_step(self, step: Step): - logging.debug(f"Executing Python script: {step.extract_source}") credential_config = str(cred_file("lakebridge")) venv_path_prefix = Path.home() / ".databricks" / "labs" / "lakebridge_profilers" @@ -328,7 +327,9 @@ def load_config_from_yaml(file_path: str | Path) -> PipelineConfig: data = yaml.safe_load(file) steps = [Step(**step) for step in data['steps']] return PipelineConfig( - name=data['name'], version=data['version'], extract_folder=data['extract_folder'], steps=steps + name=data['name'], + version=data['version'], + steps=steps, ) @staticmethod diff --git a/src/databricks/labs/lakebridge/assessments/profiler.py b/src/databricks/labs/lakebridge/assessments/profiler.py index eb601a35fa..f4763a7957 100644 --- a/src/databricks/labs/lakebridge/assessments/profiler.py +++ b/src/databricks/labs/lakebridge/assessments/profiler.py @@ -18,6 +18,10 @@ logger = logging.getLogger(__name__) +def default_output_folder(platform: str) -> Path: + return Path.home() / ".databricks" / "labs" / "lakebridge_profilers" / f"{platform}_assessment" + + class Profiler: def __init__(self, platform: str, pipeline_configs: PipelineConfig | None = None): @@ -26,17 +30,11 @@ def __init__(self, platform: str, pipeline_configs: PipelineConfig | None = None @classmethod def create(cls, platform: str) -> "Profiler": - pipeline_config_path = PLATFORM_TO_SOURCE_TECHNOLOGY_CFG.get(platform, None) - pipeline_config = None - if pipeline_config_path: - pipeline_config_absolute_path = Profiler._locate_config(pipeline_config_path) - pipeline_config = Profiler.path_modifier(config_file=pipeline_config_absolute_path) + pipeline_config_path = PLATFORM_TO_SOURCE_TECHNOLOGY_CFG[platform] + pipeline_config_absolute_path = Profiler._locate_config(pipeline_config_path) + pipeline_config = Profiler.path_modifier(config_file=pipeline_config_absolute_path) return cls(platform, pipeline_config) - @classmethod - def supported_platforms(cls) -> list[str]: - return list(PLATFORM_TO_SOURCE_TECHNOLOGY_CFG.keys()) - @staticmethod def path_modifier(*, config_file: str | Path, path_prefix: Path = PRODUCT_PATH_PREFIX) -> PipelineConfig: # TODO: Choose a better name for this. @@ -49,13 +47,15 @@ def profile( *, extractor: DatabaseManager | None = None, pipeline_config: PipelineConfig | None = None, + output_folder: Path | None = None, ) -> None: platform = self._platform.lower() if not pipeline_config: if not self._pipeline_config: raise ValueError(f"Cannot Proceed without a valid pipeline configuration for {platform}") pipeline_config = self._pipeline_config - self._execute(platform, pipeline_config, extractor) + resolved_output_folder = output_folder or default_output_folder(platform) + self._execute(platform, pipeline_config, resolved_output_folder, extractor) @staticmethod def _setup_extractor(platform: str) -> DatabaseManager | None: @@ -65,12 +65,12 @@ def _setup_extractor(platform: str) -> DatabaseManager | None: connect_config = cred_manager.get_credentials(platform) return DatabaseManager(platform, connect_config) - def _execute(self, platform: str, pipeline_config: PipelineConfig, extractor=None) -> None: + def _execute(self, platform: str, pipeline_config: PipelineConfig, output_folder: Path, extractor=None) -> None: try: if extractor is None: extractor = Profiler._setup_extractor(platform) - result = PipelineClass(pipeline_config, extractor).execute() + result = PipelineClass(pipeline_config, extractor, output_folder).execute() logger.info(f"Profile execution has completed successfully for {platform} for more info check: {result}.") except FileNotFoundError as e: logger.error(f"Configuration file not found for source {platform}: {e}") diff --git a/src/databricks/labs/lakebridge/assessments/profiler_config.py b/src/databricks/labs/lakebridge/assessments/profiler_config.py index 7d0a4b4d7a..d87daabd39 100644 --- a/src/databricks/labs/lakebridge/assessments/profiler_config.py +++ b/src/databricks/labs/lakebridge/assessments/profiler_config.py @@ -75,7 +75,6 @@ def copy(self, /, **changes) -> "Step": class PipelineConfig: name: str version: str - extract_folder: str comment: str | None = None steps: list[Step] = field(default_factory=list) diff --git a/src/databricks/labs/lakebridge/assessments/profiler_validator.py b/src/databricks/labs/lakebridge/assessments/profiler_validator.py index 66a7e60f49..05d3223c14 100644 --- a/src/databricks/labs/lakebridge/assessments/profiler_validator.py +++ b/src/databricks/labs/lakebridge/assessments/profiler_validator.py @@ -1,14 +1,10 @@ -import os from dataclasses import dataclass from collections.abc import Sequence -from pathlib import Path import yaml from duckdb import DuckDBPyConnection, CatalogException, ParserException, Error from pyspark.sql import DataFrame, SparkSession -from databricks.labs.lakebridge.assessments.pipeline import PipelineClass - PROFILER_DB_NAME = "profiler_extract.db" @@ -202,20 +198,6 @@ def validate(self, connection) -> ValidationOutcome: ) -def get_profiler_extract_path(pipeline_config_path: Path) -> Path: - """ - Returns the filesystem path of the profiler extract database. - input: - pipeline_config_path: the location of the pipeline definition .yml file - returns: - the filesystem path to the profiler extract database - """ - pipeline_config = PipelineClass.load_config_from_yaml(pipeline_config_path) - normalized_db_path = os.path.normpath(os.path.expanduser(pipeline_config.extract_folder)) - database_path = Path(normalized_db_path) / PROFILER_DB_NAME - return database_path - - def build_validation_report( validations: Sequence[ValidationStrategy], connection: DuckDBPyConnection ) -> list[ValidationOutcome]: diff --git a/src/databricks/labs/lakebridge/cli.py b/src/databricks/labs/lakebridge/cli.py index f571305be3..bd499042e2 100644 --- a/src/databricks/labs/lakebridge/cli.py +++ b/src/databricks/labs/lakebridge/cli.py @@ -23,7 +23,7 @@ from databricks.labs.lakebridge.assessments.configure_assessment import create_assessment_configurator from databricks.labs.lakebridge.assessments import PROFILER_SOURCE_SYSTEM, PRODUCT_NAME -from databricks.labs.lakebridge.assessments.profiler import Profiler +from databricks.labs.lakebridge.assessments.profiler import Profiler, default_output_folder from databricks.labs.lakebridge.config import TranspileConfig, LSPConfigOptionV1 from databricks.labs.lakebridge.contexts.application import ApplicationContext @@ -1001,7 +1001,11 @@ def llm_transpile( @lakebridge.command() -def execute_database_profiler(w: WorkspaceClient, source_tech: str | None = None) -> None: +def execute_database_profiler( + w: WorkspaceClient, + source_tech: str | None = None, + output_folder: str | None = None, +) -> None: """Execute the Profiler Extraction for the given source technology""" ctx = ApplicationContext(w) ctx.add_user_agent_extra("cmd", "execute-profiler") @@ -1024,10 +1028,25 @@ def execute_database_profiler(w: WorkspaceClient, source_tech: str | None = None f"Connection details not found. Please run `databricks labs lakebridge configure-database-profiler` " f"to set up connection details for {source_tech}." ) - profiler = Profiler.create(source_tech) + if output_folder is None: + output_folder = prompts.question( + "Enter the output folder where the profile extract will be written", + default=str(default_output_folder(source_tech)), + ).strip() + _validate_output_folder(Path(output_folder)) + + profiler = Profiler.create(source_tech) # TODO: Add extractor logic to ApplicationContext instead of creating inside the Profiler class - profiler.profile() + profiler.profile(output_folder=Path(output_folder)) + + +def _validate_output_folder(output_folder: Path) -> None: + if not output_folder.parts: + raise ValueError("Output folder cannot be empty") + parent = output_folder.expanduser().parent + if not parent.exists(): + raise ValueError(f"Invalid path for '--output-folder', parent does not exist for: {output_folder}") @lakebridge.command() diff --git a/src/databricks/labs/lakebridge/resources/assessments/legacy_synapse/pipeline_config.yml b/src/databricks/labs/lakebridge/resources/assessments/legacy_synapse/pipeline_config.yml index b31824c8c7..d74d1f5d9b 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/legacy_synapse/pipeline_config.yml +++ b/src/databricks/labs/lakebridge/resources/assessments/legacy_synapse/pipeline_config.yml @@ -1,6 +1,5 @@ name: legacy_synapse_assessment version: "1.0" -extract_folder: "/tmp/data/legacy_synapse_assessment" steps: - name: columns type: sql diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml b/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml index 8c689a75fb..79a914b594 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml @@ -1,6 +1,5 @@ name: mssql_assessment version: "1.0" -extract_folder: "/tmp/data/mssql_assessment" steps: - name: sys_info type: ddl diff --git a/src/databricks/labs/lakebridge/resources/assessments/oracle/pipeline_config.yml b/src/databricks/labs/lakebridge/resources/assessments/oracle/pipeline_config.yml index 5da43c5ff1..9ed1954882 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/oracle/pipeline_config.yml +++ b/src/databricks/labs/lakebridge/resources/assessments/oracle/pipeline_config.yml @@ -1,6 +1,5 @@ name: oracle_assessment version: "1.0" -extract_folder: "/tmp/data/oracle_assessment" steps: - name: config_containers type: ddl diff --git a/src/databricks/labs/lakebridge/resources/assessments/synapse/pipeline_config.yml b/src/databricks/labs/lakebridge/resources/assessments/synapse/pipeline_config.yml index 812351223d..f591f1c1c7 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/synapse/pipeline_config.yml +++ b/src/databricks/labs/lakebridge/resources/assessments/synapse/pipeline_config.yml @@ -1,6 +1,5 @@ name: synapse_assessment version: "1.0" -extract_folder: "~/.databricks/labs/lakebridge_profilers/synapse_assessment" steps: - name: workspace_info type: python diff --git a/tests/integration/assessments/test_pipeline.py b/tests/integration/assessments/test_pipeline.py index cc07f4e20b..8c48f414ac 100644 --- a/tests/integration/assessments/test_pipeline.py +++ b/tests/integration/assessments/test_pipeline.py @@ -20,12 +20,10 @@ @pytest.fixture -def pipeline_configuration_loader(test_resources: Path, project_path: Path, tmp_path: Path) -> _Loader: +def pipeline_configuration_loader(test_resources: Path) -> _Loader: def _load(resource_name: Path) -> PipelineConfig: config_path = test_resources / "assessments" / resource_name - return Profiler.path_modifier(config_file=config_path, path_prefix=test_resources).copy( - extract_folder=str(tmp_path / "pipeline_output") - ) + return Profiler.path_modifier(config_file=config_path, path_prefix=test_resources) return _load @@ -63,9 +61,10 @@ def python_failure_config(pipeline_configuration_loader: _Loader) -> PipelineCon @pytest.fixture(scope="module") def empty_result_config() -> PipelineConfig: prefix = Path(__file__).parent - config_path = f"{prefix}/../../resources/assessments/pipeline_config_empty_result.yml" + config_path = prefix / ".." / ".." / "resources" / "assessments" / "pipeline_config_empty_result.yml" config: PipelineConfig = PipelineClass.load_config_from_yaml(config_path) - updated_steps = [step.copy(extract_source=f"{prefix}/../../{step.extract_source}") for step in config.steps] + test_root = prefix / ".." / ".." + updated_steps = [step.copy(extract_source=str(test_root / step.extract_source)) for step in config.steps] return config.copy(steps=updated_steps) @@ -73,8 +72,9 @@ def test_run_pipeline( sandbox_sqlserver: DatabaseManager, pipeline_config: PipelineConfig, get_logger: Logger, + tmp_path: Path, ) -> None: - pipeline = PipelineClass(config=pipeline_config, executor=sandbox_sqlserver) + pipeline = PipelineClass(config=pipeline_config, executor=sandbox_sqlserver, output_folder=tmp_path) results = pipeline.execute() # Verify all steps completed successfully @@ -84,15 +84,16 @@ def test_run_pipeline( StepExecutionStatus.SKIPPED, ), f"Step {result.step_name} failed with status {result.status}" - assert verify_output(get_logger, Path(pipeline_config.extract_folder)) + assert verify_output(get_logger, tmp_path) def test_run_sql_failure_pipeline( sandbox_sqlserver: DatabaseManager, sql_failure_config: PipelineConfig, get_logger: Logger, + tmp_path: Path, ) -> None: - pipeline = PipelineClass(config=sql_failure_config, executor=sandbox_sqlserver) + pipeline = PipelineClass(config=sql_failure_config, executor=sandbox_sqlserver, output_folder=tmp_path) with pytest.raises(RuntimeError) as e: pipeline.execute() @@ -104,8 +105,9 @@ def test_run_python_failure_pipeline( sandbox_sqlserver: DatabaseManager, python_failure_config: PipelineConfig, get_logger: Logger, + tmp_path: Path, ) -> None: - pipeline = PipelineClass(config=python_failure_config, executor=sandbox_sqlserver) + pipeline = PipelineClass(config=python_failure_config, executor=sandbox_sqlserver, output_folder=tmp_path) with pytest.raises(RuntimeError) as e: pipeline.execute() @@ -117,8 +119,9 @@ def test_run_python_dep_failure_pipeline( sandbox_sqlserver: DatabaseManager, pipeline_dep_failure_config: PipelineConfig, get_logger: Logger, + tmp_path: Path, ): - pipeline = PipelineClass(config=pipeline_dep_failure_config, executor=sandbox_sqlserver) + pipeline = PipelineClass(config=pipeline_dep_failure_config, executor=sandbox_sqlserver, output_folder=tmp_path) with pytest.raises(RuntimeError) as e: pipeline.execute() @@ -126,12 +129,16 @@ def test_run_python_dep_failure_pipeline( assert "Pipeline execution failed due to errors in steps: package_status" in str(e.value) -def test_skipped_steps(sandbox_sqlserver: DatabaseManager, pipeline_config: PipelineConfig) -> None: +def test_skipped_steps( + sandbox_sqlserver: DatabaseManager, + pipeline_config: PipelineConfig, + tmp_path: Path, +) -> None: # Modify config to have some inactive steps inactive_steps = [step.copy(flag="inactive") for step in pipeline_config.steps] pipeline_config = pipeline_config.copy(steps=inactive_steps) - pipeline = PipelineClass(config=pipeline_config, executor=sandbox_sqlserver) + pipeline = PipelineClass(config=pipeline_config, executor=sandbox_sqlserver, output_folder=tmp_path) results = pipeline.execute() # Verify all steps are marked as skipped @@ -166,12 +173,9 @@ def test_pipeline_config_comments() -> None: pipeline_w_comments = PipelineConfig( name="warehouse_profiler", version="1.0", - extract_folder="/the/output/path", comment="A pipeline for extracting warehouse usage.", ) - pipeline_wo_comments = PipelineConfig( - name="another_warehouse_profiler", version="1.0", extract_folder="/the/output/path" - ) + pipeline_wo_comments = PipelineConfig(name="another_warehouse_profiler", version="1.0") assert pipeline_w_comments.comment == "A pipeline for extracting warehouse usage." assert pipeline_wo_comments.comment is None @@ -202,8 +206,9 @@ def test_run_empty_result_pipeline( sandbox_sqlserver: DatabaseManager, empty_result_config: PipelineConfig, get_logger: Logger, + tmp_path: Path, ) -> None: - pipeline = PipelineClass(config=empty_result_config, executor=sandbox_sqlserver) + pipeline = PipelineClass(config=empty_result_config, executor=sandbox_sqlserver, output_folder=tmp_path) results = pipeline.execute() # Verify step completed successfully despite empty results @@ -213,7 +218,7 @@ def test_run_empty_result_pipeline( ] # Verify that no table was created (processing was skipped for empty resultset) - with duckdb.connect(str(Path(empty_result_config.extract_folder)) + "/" + DB_NAME) as conn: + with duckdb.connect(str(tmp_path / DB_NAME)) as conn: tables = conn.execute("SHOW TABLES").fetchall() table_names = [table[0] for table in tables] @@ -225,9 +230,10 @@ def test_run_pipeline_with_ddl( sandbox_sqlserver: DatabaseManager, pipeline_config_with_ddl: PipelineConfig, get_logger: Logger, + tmp_path: Path, ) -> None: """Test pipeline execution with DDL steps that create tables with proper data types.""" - pipeline = PipelineClass(config=pipeline_config_with_ddl, executor=sandbox_sqlserver) + pipeline = PipelineClass(config=pipeline_config_with_ddl, executor=sandbox_sqlserver, output_folder=tmp_path) results = pipeline.execute() # Verify all steps completed successfully @@ -238,8 +244,7 @@ def test_run_pipeline_with_ddl( ), f"Step {result.step_name} failed with status {result.status}" # Verify tables exist and have proper data types - db_path = str(Path(pipeline_config_with_ddl.extract_folder)) + "/" + DB_NAME - with duckdb.connect(db_path) as conn: + with duckdb.connect(str(tmp_path / DB_NAME)) as conn: # Check inventory table schema (created from DDL) inventory_schema = conn.execute("DESCRIBE inventory").fetchall() get_logger.info(f"Inventory schema: {inventory_schema}") @@ -268,9 +273,10 @@ def test_run_pipeline_with_combined_ddl( sandbox_sqlserver: DatabaseManager, pipeline_config_combined_ddl: PipelineConfig, get_logger: Logger, + tmp_path: Path, ) -> None: """Test pipeline execution with a single DDL file containing multiple CREATE TABLE statements.""" - pipeline = PipelineClass(config=pipeline_config_combined_ddl, executor=sandbox_sqlserver) + pipeline = PipelineClass(config=pipeline_config_combined_ddl, executor=sandbox_sqlserver, output_folder=tmp_path) results = pipeline.execute() # Verify all steps completed successfully @@ -281,8 +287,7 @@ def test_run_pipeline_with_combined_ddl( ), f"Step {result.step_name} failed with status {result.status}" # Verify all tables from combined DDL were created - db_path = str(Path(pipeline_config_combined_ddl.extract_folder)) + "/" + DB_NAME - with duckdb.connect(db_path) as conn: + with duckdb.connect(str(tmp_path / DB_NAME)) as conn: # Check that all three tables exist tables = conn.execute("SHOW TABLES").fetchall() table_names = [table[0] for table in tables] diff --git a/tests/integration/assessments/test_profiler.py b/tests/integration/assessments/test_profiler.py index 1c96b99c53..5175916158 100644 --- a/tests/integration/assessments/test_profiler.py +++ b/tests/integration/assessments/test_profiler.py @@ -8,46 +8,37 @@ from databricks.labs.lakebridge.assessments.profiler import Profiler -def test_supported_source_technologies() -> None: - """Test that supported source technologies are correctly returned""" - profiler = Profiler("synapse", None) - supported_platforms = profiler.supported_platforms() - assert supported_platforms - - def test_profile_missing_platform_config() -> None: - """Test that profiling an unsupported platform raises ValueError""" + """Constructing Profiler directly with no pipeline_config and no override raises.""" with pytest.raises(ValueError, match="Cannot Proceed without a valid pipeline configuration for synapse"): - profiler = Profiler("synapse", None) + profiler = Profiler("synapse") profiler.profile() def test_profile_execution(test_resources: Path, tmp_path: Path) -> None: """Test successful profiling execution using actual pipeline configuration""" - profiler = Profiler("synapse") config_file = test_resources / "assessments" / "pipeline_config_main.yml" - extract_folder = tmp_path / "profiler_main" - config = profiler.path_modifier(config_file=config_file, path_prefix=test_resources).copy( - extract_folder=str(extract_folder) - ) - profiler.profile(pipeline_config=config) - assert (extract_folder / "profiler_extract.db").exists(), "Profiler extract database should be created" + output_folder = tmp_path / "profiler_main" + profiler = Profiler("synapse") + config = Profiler.path_modifier(config_file=config_file, path_prefix=test_resources) + profiler.profile(pipeline_config=config, output_folder=output_folder) + assert (output_folder / "profiler_extract.db").exists(), "Profiler extract database should be created" -def test_profile_execution_with_invalid_config(test_resources: Path) -> None: +def test_profile_execution_with_invalid_config(test_resources: Path, tmp_path: Path) -> None: """Test profiling execution with invalid configuration""" profiler = Profiler("synapse") with pytest.raises(FileNotFoundError): config_file = test_resources / "assessments" / "invalid_pipeline_config.yml" - pipeline_config = profiler.path_modifier(config_file=config_file, path_prefix=test_resources) - profiler.profile(pipeline_config=pipeline_config) + pipeline_config = Profiler.path_modifier(config_file=config_file, path_prefix=test_resources) + profiler.profile(pipeline_config=pipeline_config, output_folder=tmp_path / "out") def test_profile_execution_config_override(test_resources: Path, tmp_path: Path) -> None: """Test successful profiling execution using actual pipeline configuration with config file override""" config_dir = tmp_path / "config_dir" config_dir.mkdir() - extract_folder = tmp_path / "profiler_absolute" + output_folder = tmp_path / "profiler_absolute" # Copy the YAML file and Python script to the temp directory config_file_src = test_resources / "assessments" / "pipeline_config_absolute.yml" config_file_dest = config_dir / config_file_src.name @@ -57,7 +48,6 @@ def test_profile_execution_config_override(test_resources: Path, tmp_path: Path) with open(config_file_src, 'r', encoding="utf-8") as file: config_data = yaml.safe_load(file) - config_data['extract_folder'] = str(extract_folder) for step in config_data['steps']: step['extract_source'] = str(script_dest) with open(config_file_dest, 'w', encoding="utf-8") as file: @@ -65,5 +55,5 @@ def test_profile_execution_config_override(test_resources: Path, tmp_path: Path) profiler = Profiler("synapse") pipeline_config = PipelineClass.load_config_from_yaml(config_file_dest) - profiler.profile(pipeline_config=pipeline_config) - assert (extract_folder / "profiler_extract.db").exists(), "Profiler extract database should be created" + profiler.profile(pipeline_config=pipeline_config, output_folder=output_folder) + assert (output_folder / "profiler_extract.db").exists(), "Profiler extract database should be created" diff --git a/tests/integration/assessments/test_profiler_validator.py b/tests/integration/assessments/test_profiler_validator.py index 592cb7a407..30f2a1e342 100644 --- a/tests/integration/assessments/test_profiler_validator.py +++ b/tests/integration/assessments/test_profiler_validator.py @@ -6,7 +6,6 @@ import pytest from databricks.labs.lakebridge.assessments.profiler_validator import ( - get_profiler_extract_path, EmptyTableValidationCheck, build_validation_report, NullValidationCheck, @@ -37,18 +36,6 @@ def mock_synapse_profiler_extract() -> Generator[Path]: yield synapse_extract_path -def test_get_profiler_extract_path(pipeline_config_path: Path, failure_pipeline_config_path: Path) -> None: - # Parse `extract_folder` **with** a trailing "/" character - expected_db_path = Path("/replaced/after/loading/profiler_extract.db") - profiler_db_path = get_profiler_extract_path(pipeline_config_path) - assert profiler_db_path == expected_db_path - - # Parse `extract_folder` **without** a trailing "/" character - expected_db_path = Path("/replaced/after/loading/profiler_extract.db") - profiler_db_path = get_profiler_extract_path(failure_pipeline_config_path) - assert profiler_db_path == expected_db_path - - def test_validate_non_empty_tables(mock_synapse_profiler_extract: Path) -> None: with duckdb.connect(database=mock_synapse_profiler_extract) as duck_conn: validation_checks = [] diff --git a/tests/resources/assessments/pipeline_config.yml b/tests/resources/assessments/pipeline_config.yml index c464e190e1..938a725843 100644 --- a/tests/resources/assessments/pipeline_config.yml +++ b/tests/resources/assessments/pipeline_config.yml @@ -1,7 +1,5 @@ name: ExamplePipeline version: "1.0" -# Value replaced prior to actual use: -extract_folder: /replaced/after/loading/ steps: # Paths for extract_source are relative to the tests/resources/ directory within the project. - name: inventory diff --git a/tests/resources/assessments/pipeline_config_absolute.yml b/tests/resources/assessments/pipeline_config_absolute.yml index 45a7f1667a..3486a7f4fa 100644 --- a/tests/resources/assessments/pipeline_config_absolute.yml +++ b/tests/resources/assessments/pipeline_config_absolute.yml @@ -1,7 +1,5 @@ name: ExamplePipeline version: "1.0" -# Value replaced prior to actual use: -extract_folder: /replaced/after/loading/ steps: - name: random_data type: python diff --git a/tests/resources/assessments/pipeline_config_empty_result.yml b/tests/resources/assessments/pipeline_config_empty_result.yml index d0c83ff175..c4fd472f9f 100644 --- a/tests/resources/assessments/pipeline_config_empty_result.yml +++ b/tests/resources/assessments/pipeline_config_empty_result.yml @@ -1,6 +1,5 @@ name: test_empty_result_pipeline version: 1.0 -extract_folder: /tmp/lakebridge_test_empty_result steps: - name: empty_result_step type: sql diff --git a/tests/resources/assessments/pipeline_config_failure_dependency.yml b/tests/resources/assessments/pipeline_config_failure_dependency.yml index 53802671c0..fe483c47a7 100644 --- a/tests/resources/assessments/pipeline_config_failure_dependency.yml +++ b/tests/resources/assessments/pipeline_config_failure_dependency.yml @@ -1,7 +1,5 @@ name: ExamplePipeline version: "1.0" -# Value replaced prior to actual use: -extract_folder: /replaced/after/loading/ steps: - name: package_status type: python @@ -12,5 +10,3 @@ steps: flag: active dependencies: - databricks_labs_ucx>=0.50.0 - - diff --git a/tests/resources/assessments/pipeline_config_main.yml b/tests/resources/assessments/pipeline_config_main.yml index f3d7e7717b..8ec369ad23 100644 --- a/tests/resources/assessments/pipeline_config_main.yml +++ b/tests/resources/assessments/pipeline_config_main.yml @@ -1,8 +1,5 @@ name: ExamplePipeline version: "1.0" -# Value replaced prior to actual use. -# Note: test_get_profiler_extract_path() requires a trailing '/'. -extract_folder: /replaced/after/loading/ steps: - name: random_data type: python diff --git a/tests/resources/assessments/pipeline_config_python_failure.yml b/tests/resources/assessments/pipeline_config_python_failure.yml index 5adbe40860..f3bc375cac 100644 --- a/tests/resources/assessments/pipeline_config_python_failure.yml +++ b/tests/resources/assessments/pipeline_config_python_failure.yml @@ -1,8 +1,5 @@ name: "Python Failure Pipeline" version: "1.0" -# Value replaced prior to actual use. -# Note: test_get_profiler_extract_path() requires _no_ trailing '/'. -extract_folder: /replaced/after/loading steps: - name: invalid_python_step type: python diff --git a/tests/resources/assessments/pipeline_config_sql_failure.yml b/tests/resources/assessments/pipeline_config_sql_failure.yml index e9a002853d..4871a8501b 100644 --- a/tests/resources/assessments/pipeline_config_sql_failure.yml +++ b/tests/resources/assessments/pipeline_config_sql_failure.yml @@ -1,7 +1,5 @@ name: "SQL Failure Pipeline" version: "1.0" -# Value replaced prior to actual use. -extract_folder: /replaced/after/loading/ steps: - name: invalid_sql_step type: sql diff --git a/tests/resources/assessments/pipeline_config_with_combined_ddl.yml b/tests/resources/assessments/pipeline_config_with_combined_ddl.yml index 0a9dc7c0c3..08566cf3ba 100644 --- a/tests/resources/assessments/pipeline_config_with_combined_ddl.yml +++ b/tests/resources/assessments/pipeline_config_with_combined_ddl.yml @@ -1,7 +1,5 @@ name: PipelineWithCombinedDDL version: "1.0" -# Value replaced prior to actual use: -extract_folder: /replaced/after/loading/ steps: # Single DDL step with multiple CREATE TABLE statements - name: combined_schema @@ -22,4 +20,4 @@ steps: extract_source: assessments/usage.sql mode: append frequency: weekly - flag: active \ No newline at end of file + flag: active diff --git a/tests/resources/assessments/pipeline_config_with_ddl.yml b/tests/resources/assessments/pipeline_config_with_ddl.yml index 70b0a45b16..72f91c95f6 100644 --- a/tests/resources/assessments/pipeline_config_with_ddl.yml +++ b/tests/resources/assessments/pipeline_config_with_ddl.yml @@ -1,7 +1,5 @@ name: PipelineWithDDL version: "1.0" -# Value replaced prior to actual use: -extract_folder: /replaced/after/loading/ steps: # DDL step creates table with proper data types - name: inventory @@ -17,7 +15,7 @@ steps: mode: append frequency: daily flag: active - # Regular SQL step without DDL (backwards compatibility) + # Regular SQL step without DDL - name: usage type: sql extract_source: assessments/usage.sql diff --git a/tests/unit/assessment/test_pipeline_config.py b/tests/unit/assessment/test_pipeline_config.py index 567a16d998..0a1bb49a80 100644 --- a/tests/unit/assessment/test_pipeline_config.py +++ b/tests/unit/assessment/test_pipeline_config.py @@ -7,7 +7,7 @@ def _config(*step_specs: tuple[str, str, str]) -> PipelineConfig: steps = [Step(name=n, type=t, extract_source="dummy.sql", flag=f) for n, t, f in step_specs] - return PipelineConfig(name="test", version="1.0", extract_folder="/tmp", steps=steps) + return PipelineConfig(name="test", version="1.0", steps=steps) @pytest.mark.parametrize( diff --git a/tests/unit/assessment/test_profiler_config.py b/tests/unit/assessment/test_profiler_config.py index 0b65e73e78..a062eaf596 100644 --- a/tests/unit/assessment/test_profiler_config.py +++ b/tests/unit/assessment/test_profiler_config.py @@ -145,7 +145,6 @@ def test_pipeline_config_with_valid_steps() -> None: config = PipelineConfig( name="TestPipeline", version="1.0", - extract_folder="/tmp/test", steps=steps, ) From c5afeb955d539f05f725b9425e1dbaacbd900278 Mon Sep 17 00:00:00 2001 From: Noopur Nigam Date: Fri, 5 Jun 2026 14:01:41 +0530 Subject: [PATCH 2/2] Validate Oracle profiler DuckDB->Delta ingestion and fix timestamp type fidelity Add the missing oracle_extract_schema.yml required by the profiler ingestion validation step (without it the job raised FileNotFoundError for Oracle before reaching ingestion), and fix _ingest_table to derive an explicit Spark schema from the DuckDB column types instead of relying on pandas inference. DuckDB's timezone-naive TIMESTAMP was inferred as Spark TIMESTAMP (LTZ), which shifts wall-clock values by the session offset. It now maps to TIMESTAMP_NTZ, preserving them exactly. Unmapped types (e.g. nested STRUCT from other sources) fall back to inference, so other sources are unaffected. Tests: unit tests for the DuckDB->Spark type mapping (incl. TIMESTAMP->NTZ and STRUCT fallback). An integration test validating a DDL-built mock Oracle extract against the shipped schema YAML and a manual end-to-end fidelity check (not run in CI). --- .../assessments/dashboards/execute.py | 77 +++++++++- .../validation/oracle_extract_schema.yml | 138 ++++++++++++++++++ .../assessments/manual_oracle_e2e_check.py | 112 ++++++++++++++ .../assessments/profiler_extract_utils.py | 51 +++++++ .../assessments/test_oracle_validation.py | 103 +++++++++++++ tests/unit/assessment/test_ingest_schema.py | 57 ++++++++ 6 files changed, 533 insertions(+), 5 deletions(-) create mode 100644 src/databricks/labs/lakebridge/resources/assessments/validation/oracle_extract_schema.yml create mode 100644 tests/integration/assessments/manual_oracle_e2e_check.py create mode 100644 tests/integration/assessments/test_oracle_validation.py create mode 100644 tests/unit/assessment/test_ingest_schema.py diff --git a/src/databricks/labs/lakebridge/assessments/dashboards/execute.py b/src/databricks/labs/lakebridge/assessments/dashboards/execute.py index bc8bce5690..a1e4531df0 100644 --- a/src/databricks/labs/lakebridge/assessments/dashboards/execute.py +++ b/src/databricks/labs/lakebridge/assessments/dashboards/execute.py @@ -8,6 +8,22 @@ import duckdb import yaml from pyspark.sql import SparkSession +from pyspark.sql.types import ( + BooleanType, + ByteType, + DataType, + DateType, + DoubleType, + FloatType, + IntegerType, + LongType, + ShortType, + StringType, + StructField, + StructType, + TimestampNTZType, + TimestampType, +) from yaml.parser import ParserError from yaml.scanner import ScannerError @@ -32,6 +48,48 @@ class ExtractIngestionError(Exception): """Raised when the profiler extract ingestion fails due to unexpected errors.""" +# Maps DuckDB column types (as reported by a DuckDB relation) to Spark types so that the +# DuckDB -> pandas -> Spark -> Delta hop preserves types exactly instead of relying on Spark's +# pandas-based schema inference. The important case is TIMESTAMP: DuckDB's TIMESTAMP is timezone +# *naive* (a wall-clock value), but inference yields a timezone-dependent TIMESTAMP (LTZ) that +# reinterprets the value in the session time zone and shifts it. Mapping it to TIMESTAMP_NTZ keeps +# the original wall-clock value. TIMESTAMP WITH TIME ZONE stays LTZ, which is correct for instants. +_DUCKDB_TO_SPARK_TYPE: dict[str, DataType] = { + "BOOLEAN": BooleanType(), + "TINYINT": ByteType(), + "SMALLINT": ShortType(), + "INTEGER": IntegerType(), + "BIGINT": LongType(), + "FLOAT": FloatType(), + "DOUBLE": DoubleType(), + "VARCHAR": StringType(), + "DATE": DateType(), + "TIMESTAMP": TimestampNTZType(), + "TIMESTAMP WITH TIME ZONE": TimestampType(), +} + + +def build_spark_schema(columns: Sequence[str], duckdb_types: Sequence[object]) -> StructType | None: + """ + Build an explicit Spark schema from a DuckDB relation's column names and types. + + Returns ``None`` if any column has a DuckDB type that is not in ``_DUCKDB_TO_SPARK_TYPE`` + (e.g. nested STRUCT/LIST/MAP columns produced by other profilers). In that case the caller + falls back to Spark's schema inference for the whole table, preserving existing behavior. + """ + fields: list[StructField] = [] + for name, duck_type in zip(columns, duckdb_types): + spark_type = _DUCKDB_TO_SPARK_TYPE.get(str(duck_type).upper()) + if spark_type is None: + logger.info( + f"Column '{name}' has unmapped DuckDB type '{duck_type}'; " + "falling back to Spark schema inference for this table." + ) + return None + fields.append(StructField(name, spark_type, True)) + return StructType(fields) + + def main(*argv: str) -> None: """Lakeview Jobs task entry point: profiler_dashboards""" initialize_logging() @@ -169,15 +227,24 @@ def _ingest_profiler_tables(catalog_name: str, schema_name: str, extract_locatio def _ingest_table(extract_location: str, source_table_name: str, target_table_name: str) -> None: """ Ingest a table from a DuckDB profiler extract into a managed Delta table in Unity Catalog. + + An explicit Spark schema is derived from the DuckDB column types so that values are preserved + exactly across the DuckDB -> pandas -> Spark -> Delta hop. In particular, DuckDB's timezone-naive + TIMESTAMP is mapped to Spark TIMESTAMP_NTZ to keep the wall-clock value; relying on schema + inference would produce a timezone-dependent TIMESTAMP (LTZ) and shift the values. For tables + containing column types we do not map (e.g. nested STRUCT columns from other profilers) we fall + back to schema inference. """ + spark = SparkSession.builder.getOrCreate() try: with duckdb.connect(database=extract_location, read_only=True) as duck_conn: - query = f"SELECT * FROM {source_table_name}" - pdf = duck_conn.execute(query).df() - # Save table as a managed Delta table in Unity Catalog + relation = duck_conn.sql(f"SELECT * FROM {source_table_name}") + # An explicit schema preserves types exactly; when a column type is unmapped it is None + # and we fall back to Spark's pandas schema inference. + spark_schema = build_spark_schema(relation.columns, relation.types) + pdf = relation.df() logger.info(f"Saving profiler table '{target_table_name}' to Unity Catalog.") - spark = SparkSession.builder.getOrCreate() - df = spark.createDataFrame(pdf) + df = spark.createDataFrame(pdf, schema=spark_schema) if spark_schema else spark.createDataFrame(pdf) df.write.format("delta").mode("overwrite").saveAsTable(target_table_name) except duckdb.CatalogException as e: logger.error(f"Could not find source table '{source_table_name}' in profiler extract: {e}") diff --git a/src/databricks/labs/lakebridge/resources/assessments/validation/oracle_extract_schema.yml b/src/databricks/labs/lakebridge/resources/assessments/validation/oracle_extract_schema.yml new file mode 100644 index 0000000000..6c2cc48361 --- /dev/null +++ b/src/databricks/labs/lakebridge/resources/assessments/validation/oracle_extract_schema.yml @@ -0,0 +1,138 @@ +source_tech: oracle +version: 0.1 + +schemas: + main: + tables: + config_containers: + columns: + - name: inst_ids + type: VARCHAR + - name: name + type: VARCHAR + - name: open_mode + type: VARCHAR + - name: pdb_count + type: INTEGER + config_db_features: + columns: + - name: scope + type: VARCHAR + - name: inst_id + type: INTEGER + - name: stat_name + type: VARCHAR + - name: name + type: VARCHAR + - name: value + type: VARCHAR + config_instance: + columns: + - name: inst_id + type: INTEGER + - name: instance_name + type: VARCHAR + - name: version + type: VARCHAR + - name: database_type + type: VARCHAR + config_memory_evolution: + columns: + - name: con_name + type: VARCHAR + - name: instance_number + type: INTEGER + - name: snap_time + type: TIMESTAMP + - name: parameter_name + type: VARCHAR + - name: value + type: VARCHAR + config_pdb_objects: + columns: + - name: pdb_name + type: VARCHAR + - name: owner + type: VARCHAR + - name: object_type + type: VARCHAR + - name: cnt + type: INTEGER + config_pdb_partitions: + columns: + - name: pdb_name + type: VARCHAR + - name: owner + type: VARCHAR + - name: object_type + type: VARCHAR + - name: cnt + type: INTEGER + config_storage: + columns: + - name: con_name + type: VARCHAR + - name: tablespace_type + type: VARCHAR + - name: gb + type: DOUBLE + - name: freegb + type: DOUBLE + - name: maxgb + type: DOUBLE + perf_cpu_waits: + columns: + - name: pdb_name + type: VARCHAR + - name: instance_number + type: INTEGER + - name: mtime + type: TIMESTAMP + - name: event + type: VARCHAR + - name: wait_class + type: VARCHAR + - name: total_wait_time + type: BIGINT + perf_fgd_session_evolution: + columns: + - name: name + type: VARCHAR + - name: instance_number + type: INTEGER + - name: username + type: VARCHAR + - name: snap_time + type: TIMESTAMP + - name: foregd_session_cnt + type: INTEGER + perf_heatmap: + columns: + - name: mtime + type: VARCHAR + - name: pdb_name + type: VARCHAR + - name: instance_number + type: INTEGER + - name: hour + type: VARCHAR + - name: core_nb + type: INTEGER + - name: value + type: DOUBLE + perf_sqltext: + columns: + - name: dt + type: TIMESTAMP + - name: pdb_name + type: VARCHAR + - name: command + type: VARCHAR + - name: parsing_schema_name + type: VARCHAR + - name: instance_number + type: INTEGER + - name: cnt + type: INTEGER + - name: total_run_time_secs + type: DOUBLE diff --git a/tests/integration/assessments/manual_oracle_e2e_check.py b/tests/integration/assessments/manual_oracle_e2e_check.py new file mode 100644 index 0000000000..aef8778c69 --- /dev/null +++ b/tests/integration/assessments/manual_oracle_e2e_check.py @@ -0,0 +1,112 @@ +""" +Manual end-to-end fidelity check for the Oracle profiler DuckDB -> Delta ingest path. + +This is NOT a pytest test (the ``manual_`` prefix keeps pytest from collecting it). It requires a real +Databricks runtime (it uses ``databricks-connect``) and a real Oracle ``profiler_extract.db``, so it is +run by hand rather than in CI. It exercises the *shipped* ingest logic (``build_spark_schema`` + +``spark.createDataFrame``) against every table in the extract and asserts: + + * row counts match between DuckDB and Spark, + * DuckDB ``TIMESTAMP`` columns land as Spark ``TIMESTAMP_NTZ`` with wall-clock values unchanged, + * numeric (DOUBLE/BIGINT/INTEGER) column values round-trip value-exact. + +Usage (from the project root, using the project venv so the package + databricks-connect are importable): + + .venv/bin/python tests/integration/assessments/manual_oracle_e2e_check.py \ + --profile \ + --extract /path/to/profiler_extract.db + +Exit code is 0 on full fidelity, 1 otherwise. +""" + +import argparse +import sys +from pathlib import Path + +import duckdb +from pyspark.sql.types import DoubleType, FloatType, LongType, IntegerType, TimestampNTZType + +from databricks.connect import DatabricksSession +from databricks.labs.lakebridge.assessments.dashboards.execute import build_spark_schema + +# Column types whose values we compare for exact round-trip fidelity. +_NUMERIC_TYPES = (DoubleType, FloatType, LongType, IntegerType) + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--profile", required=True, help="Databricks CLI profile to connect with.") + parser.add_argument( + "--extract", + required=True, + help="Path to the Oracle profiler_extract.db DuckDB file.", + ) + parser.add_argument( + "--no-serverless", + action="store_true", + help="Use the profile's configured cluster instead of serverless compute.", + ) + return parser.parse_args() + + +def _spark_session(profile: str, serverless: bool) -> DatabricksSession: + builder = DatabricksSession.builder.profile(profile) + if serverless: + builder = builder.serverless(True) + return builder.getOrCreate() + + +def _sorted_non_null(values: list) -> list: + return sorted(v for v in values if v is not None) + + +def _check_table(spark, con: duckdb.DuckDBPyConnection, table: str) -> bool: + """Ingest one table via the shipped path and compare it to the DuckDB source. Returns True if faithful.""" + relation = con.sql(f"SELECT * FROM {table}") + df = spark.createDataFrame(relation.df(), schema=build_spark_schema(relation.columns, relation.types)) + + duck_rows = con.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] + spark_rows = df.count() + rows_ok = duck_rows == spark_rows + + ntz_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, TimestampNTZType)] + numeric_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, _NUMERIC_TYPES)] + + value_mismatches = [] + for col in ntz_cols + numeric_cols: + duck_vals = _sorted_non_null([r[0] for r in con.execute(f"SELECT {col} FROM {table}").fetchall()]) + spark_vals = _sorted_non_null([r[col] for r in df.select(col).collect()]) + exact = len(duck_vals) == len(spark_vals) and all(repr(a) == repr(b) for a, b in zip(duck_vals, spark_vals)) + if not exact: + value_mismatches.append(col) + + is_faithful = rows_ok and not value_mismatches + status = "OK" if is_faithful else "MISMATCH" + print(f"{table:30s} rows {duck_rows}=={spark_rows} {status} ntz={ntz_cols}") + if not rows_ok: + print(f" row count mismatch: duck={duck_rows} spark={spark_rows}") + for col in value_mismatches: + print(f" value mismatch in column: {col}") + return is_faithful + + +def main() -> int: + args = _parse_args() + extract_path = Path(args.extract).expanduser() + if not extract_path.exists(): + print(f"Extract not found: {extract_path}", file=sys.stderr) + return 1 + + spark = _spark_session(args.profile, serverless=not args.no_serverless) + all_ok = True + with duckdb.connect(database=str(extract_path), read_only=True) as con: + tables = [row[2] for row in con.execute("SHOW ALL TABLES").fetchall()] + for table in tables: + all_ok &= _check_table(spark, con, table) + + print(f"\nE2E fidelity: {'PASS' if all_ok else 'FAIL'} ({len(tables)} tables)") + return 0 if all_ok else 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/integration/assessments/profiler_extract_utils.py b/tests/integration/assessments/profiler_extract_utils.py index af0eb35067..ea67e59d0c 100644 --- a/tests/integration/assessments/profiler_extract_utils.py +++ b/tests/integration/assessments/profiler_extract_utils.py @@ -1,11 +1,14 @@ from collections.abc import Callable from dataclasses import dataclass from datetime import datetime +from importlib import resources from pathlib import Path import duckdb from faker import Faker +ORACLE_RESOURCE_PACKAGE = "databricks.labs.lakebridge.resources.assessments.oracle" + @dataclass(frozen=True) class MockTableDefinition: @@ -181,3 +184,51 @@ def build_mock_synapse_extract(extract_db_name: str, path_prefix: Path) -> Path: builder.create_sample_data() builder.shutdown() return full_synapse_extract_path + + +def _oracle_sample_value(duckdb_type: str): + """Return a type-appropriate sample value for a DuckDB column type.""" + duckdb_type = duckdb_type.upper() + if duckdb_type == "TIMESTAMP": + return datetime(2026, 4, 3, 13, 55, 26) + if duckdb_type in {"INTEGER", "BIGINT"}: + return 1 + if duckdb_type == "DOUBLE": + return 1.5 + return "sample" # VARCHAR / fallback + + +def build_mock_oracle_extract(extract_db_name: str, path_prefix: Path) -> Path: + """ + Build a mock Oracle profiler extract whose table schemas come directly from the shipped Oracle + DDL resources. Because the tables are created from the same DDL the real profiler uses, validating + this extract against the shipped ``oracle_extract_schema.yml`` exercises DDL <-> schema-definition + consistency without needing a live Oracle database. + + Each table is seeded with rows so empty-table checks pass. ``config_db_features.inst_id`` gets a row + with a NULL value to mirror the real extract's nullable-integer edge case. + """ + oracle_extract_path = path_prefix + oracle_extract_path.mkdir(parents=True, exist_ok=False) + full_oracle_extract_path = oracle_extract_path / f"{extract_db_name}.db" + + ddl_files = sorted( + (f for f in resources.files(ORACLE_RESOURCE_PACKAGE).iterdir() if f.name.endswith("_ddl.sql")), + key=lambda f: f.name, + ) + with duckdb.connect(database=str(full_oracle_extract_path)) as conn: + for ddl_file in ddl_files: + conn.execute(ddl_file.read_text(encoding="utf-8")) + + for (table_name,) in conn.execute("SHOW TABLES").fetchall(): + columns = conn.execute( + "SELECT column_name, data_type FROM information_schema.columns " + f"WHERE table_name = '{table_name}' ORDER BY ordinal_position" + ).fetchall() + row = [_oracle_sample_value(data_type) for _, data_type in columns] + # Mirror the real extract: config_db_features.inst_id (INTEGER) contains NULLs. + null_row = [None if name == "inst_id" else value for (name, _), value in zip(columns, row)] + placeholders = ", ".join(["?"] * len(row)) + conn.execute(f"INSERT INTO {table_name} VALUES ({placeholders})", row) + conn.execute(f"INSERT INTO {table_name} VALUES ({placeholders})", null_row) + return full_oracle_extract_path diff --git a/tests/integration/assessments/test_oracle_validation.py b/tests/integration/assessments/test_oracle_validation.py new file mode 100644 index 0000000000..3e49c0f675 --- /dev/null +++ b/tests/integration/assessments/test_oracle_validation.py @@ -0,0 +1,103 @@ +""" +Oracle profiler extract validation tests. + +These build a mock Oracle extract from the *shipped* Oracle DDL resources and validate it against the +*shipped* ``oracle_extract_schema.yml``. They therefore guard two things at once: that the validation +schema definition stays consistent with the extract DDL, and that the validator accepts a conformant +Oracle extract. They are pure-DuckDB (no Spark / workspace required). +""" + +import tempfile +from collections.abc import Generator +from importlib import resources +from pathlib import Path + +import duckdb +import pytest + +import databricks.labs.lakebridge.resources.assessments as assessment_resources +from databricks.labs.lakebridge.assessments.profiler_validator import ( + EmptyTableValidationCheck, + ExtractSchemaValidationCheck, + build_validation_report, +) +from .profiler_extract_utils import build_mock_oracle_extract + +ORACLE_TABLES = [ + "config_containers", + "config_db_features", + "config_instance", + "config_memory_evolution", + "config_pdb_objects", + "config_pdb_partitions", + "config_storage", + "perf_cpu_waits", + "perf_fgd_session_evolution", + "perf_heatmap", + "perf_sqltext", +] + + +@pytest.fixture(scope="session") +def mock_oracle_profiler_extract() -> Generator[Path]: + with tempfile.TemporaryDirectory(prefix="lakebridge_test_") as temp_dir: + extract_dir = Path(temp_dir) / "oracle_assessment" + yield build_mock_oracle_extract("mock_oracle_extract", path_prefix=extract_dir) + + +def _oracle_schema_def() -> resources.abc.Traversable: + return resources.files(assessment_resources).joinpath("validation/oracle_extract_schema.yml") + + +def test_oracle_extract_matches_shipped_schema(mock_oracle_profiler_extract: Path) -> None: + """Every Oracle table created from the shipped DDL conforms to the shipped schema definition.""" + with ( + resources.as_file(_oracle_schema_def()) as schema_path, + duckdb.connect(database=mock_oracle_profiler_extract) as duck_conn, + ): + checks = [ + ExtractSchemaValidationCheck( + "main", + table, + source_tech="oracle", + extract_path=str(mock_oracle_profiler_extract), + schema_path=str(schema_path), + ) + for table in ORACLE_TABLES + ] + report = build_validation_report(checks, duck_conn) + + failures = [r for r in report if r.outcome == "FAIL"] + assert not failures, f"Unexpected schema validation failures: {failures}" + assert len(report) == len(ORACLE_TABLES) + + +def test_oracle_tables_are_not_empty(mock_oracle_profiler_extract: Path) -> None: + with duckdb.connect(database=mock_oracle_profiler_extract) as duck_conn: + checks = [EmptyTableValidationCheck(f"main.{table}") for table in ORACLE_TABLES] + report = build_validation_report(checks, duck_conn) + assert all(r.outcome == "PASS" for r in report), [r for r in report if r.outcome != "PASS"] + + +def test_oracle_schema_mismatch_is_detected() -> None: + """A column whose type drifts from the schema definition must be reported as FAIL.""" + with tempfile.TemporaryDirectory(prefix="lakebridge_test_") as temp_dir: + extract = build_mock_oracle_extract("mock_oracle_mismatch", path_prefix=Path(temp_dir) / "oracle_assessment") + with ( + resources.as_file(_oracle_schema_def()) as schema_path, + duckdb.connect(database=extract) as duck_conn, + ): + # config_instance.inst_id is INTEGER in the schema definition; force a mismatch. + duck_conn.execute("ALTER TABLE config_instance ALTER inst_id TYPE VARCHAR") + check = ExtractSchemaValidationCheck( + "main", + "config_instance", + source_tech="oracle", + extract_path=str(extract), + schema_path=str(schema_path), + ) + report = build_validation_report([check], duck_conn) + + assert len(report) == 1 + assert report[0].outcome == "FAIL" + assert report[0].summary == "Unexpected column data type" diff --git a/tests/unit/assessment/test_ingest_schema.py b/tests/unit/assessment/test_ingest_schema.py new file mode 100644 index 0000000000..6941282e58 --- /dev/null +++ b/tests/unit/assessment/test_ingest_schema.py @@ -0,0 +1,57 @@ +""" +Unit tests for the DuckDB -> Spark schema mapping used when ingesting a profiler extract into Delta. + +The key behaviour these lock in: DuckDB's timezone-naive ``TIMESTAMP`` maps to Spark ``TIMESTAMP_NTZ`` +(preserving the wall-clock value) rather than the timezone-dependent ``TIMESTAMP`` (LTZ) that Spark's +pandas schema inference would otherwise pick. Unmapped types fall back to inference (return ``None``). +""" + +import duckdb +from pyspark.sql.types import ( + DoubleType, + IntegerType, + LongType, + StringType, + StructType, + TimestampNTZType, + TimestampType, +) + +from databricks.labs.lakebridge.assessments.dashboards.execute import build_spark_schema + + +def _schema_from_sql(select_sql: str) -> StructType | None: + with duckdb.connect() as conn: + relation = conn.sql(select_sql) + return build_spark_schema(relation.columns, relation.types) + + +def test_oracle_scalar_types_map_to_expected_spark_types() -> None: + schema = _schema_from_sql(""" + SELECT + CAST('x' AS VARCHAR) AS v, + CAST(1 AS INTEGER) AS i, + CAST(1 AS BIGINT) AS b, + CAST(1.5 AS DOUBLE) AS d, + CAST('2026-04-03 13:55:26' AS TIMESTAMP) AS t + """) + assert schema is not None + by_name = {f.name: f.dataType for f in schema.fields} + assert isinstance(by_name["v"], StringType) + assert isinstance(by_name["i"], IntegerType) + assert isinstance(by_name["b"], LongType) + assert isinstance(by_name["d"], DoubleType) + # The fidelity-critical mapping: naive TIMESTAMP -> TIMESTAMP_NTZ, not LTZ. + assert isinstance(by_name["t"], TimestampNTZType) + + +def test_timestamptz_maps_to_ltz() -> None: + schema = _schema_from_sql("SELECT CAST('2026-04-03 13:55:26+00' AS TIMESTAMP WITH TIME ZONE) AS t") + assert schema is not None + assert isinstance(schema.fields[0].dataType, TimestampType) + + +def test_unmapped_type_falls_back_to_inference() -> None: + # A nested STRUCT column has no explicit mapping, so the whole table falls back to inference. + schema = _schema_from_sql("SELECT {'a': 1}::STRUCT(a INTEGER) AS s") + assert schema is None