Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions labs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ commands:
flags:
- name: source-tech
description: (Optional) The technology/platform of the sources to Profile
- name: output-folder
description: (Optional) Folder where the extracted DuckDB file is written; if omitted, prompts with a smart default

- name: test-profiler-connection
description: (Internal) Test the connection to the source database for profiling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@
import duckdb
import yaml
from pyspark.sql import SparkSession
from pyspark.sql.types import (
BooleanType,
ByteType,
DataType,
DateType,
DoubleType,
FloatType,
IntegerType,
LongType,
ShortType,
StringType,
StructField,
StructType,
TimestampNTZType,
TimestampType,
)
from yaml.parser import ParserError
from yaml.scanner import ScannerError

Expand All @@ -32,6 +48,48 @@ class ExtractIngestionError(Exception):
"""Raised when the profiler extract ingestion fails due to unexpected errors."""


# Maps DuckDB column types (as reported by a DuckDB relation) to Spark types so that the
# DuckDB -> pandas -> Spark -> Delta hop preserves types exactly instead of relying on Spark's
# pandas-based schema inference. Keys are upper-case type names; build_spark_schema() upper-cases
# the string form of each reported type before looking it up here.
#
# The important case is TIMESTAMP: DuckDB's TIMESTAMP is timezone *naive* (a wall-clock value),
# but inference yields a timezone-dependent TIMESTAMP (LTZ) that reinterprets the value in the
# session time zone and shifts it. Mapping it to TIMESTAMP_NTZ keeps the original wall-clock
# value. TIMESTAMP WITH TIME ZONE stays LTZ, which is correct for instants.
#
# A type that is absent from this table is not an error: build_spark_schema() returns None for
# the whole table and the caller falls back to Spark's schema inference.
_DUCKDB_TO_SPARK_TYPE: dict[str, DataType] = {
    "BOOLEAN": BooleanType(),
    "TINYINT": ByteType(),
    "SMALLINT": ShortType(),
    "INTEGER": IntegerType(),
    "BIGINT": LongType(),
    "FLOAT": FloatType(),
    "DOUBLE": DoubleType(),
    "VARCHAR": StringType(),
    "DATE": DateType(),
    "TIMESTAMP": TimestampNTZType(),
    "TIMESTAMP WITH TIME ZONE": TimestampType(),
}


def build_spark_schema(columns: Sequence[str], duckdb_types: Sequence[object]) -> StructType | None:
    """
    Derive an explicit, all-nullable Spark schema from DuckDB column names and types.

    Each DuckDB type is converted to its upper-cased string form and looked up in
    ``_DUCKDB_TO_SPARK_TYPE``. Names and types are paired positionally.

    Returns the resulting ``StructType``, or ``None`` as soon as a column with an unmapped
    DuckDB type is encountered (e.g. nested STRUCT/LIST/MAP columns produced by other profilers).
    ``None`` signals the caller to use Spark's schema inference for the whole table instead.
    """
    struct_fields: list[StructField] = []
    for column_name, column_type in zip(columns, duckdb_types):
        type_key = str(column_type).upper()
        if type_key not in _DUCKDB_TO_SPARK_TYPE:
            # One unmapped column is enough to abandon the explicit schema for the table.
            logger.info(
                f"Column '{column_name}' has unmapped DuckDB type '{column_type}'; "
                "falling back to Spark schema inference for this table."
            )
            return None
        mapped_type = _DUCKDB_TO_SPARK_TYPE[type_key]
        struct_fields.append(StructField(column_name, mapped_type, True))
    return StructType(struct_fields)


def main(*argv: str) -> None:
"""Lakeview Jobs task entry point: profiler_dashboards"""
initialize_logging()
Expand Down Expand Up @@ -169,15 +227,24 @@ def _ingest_profiler_tables(catalog_name: str, schema_name: str, extract_locatio
def _ingest_table(extract_location: str, source_table_name: str, target_table_name: str) -> None:
"""
Ingest a table from a DuckDB profiler extract into a managed Delta table in Unity Catalog.

An explicit Spark schema is derived from the DuckDB column types so that values are preserved
exactly across the DuckDB -> pandas -> Spark -> Delta hop. In particular, DuckDB's timezone-naive
TIMESTAMP is mapped to Spark TIMESTAMP_NTZ to keep the wall-clock value; relying on schema
inference would produce a timezone-dependent TIMESTAMP (LTZ) and shift the values. For tables
containing column types we do not map (e.g. nested STRUCT columns from other profilers) we fall
back to schema inference.
"""
spark = SparkSession.builder.getOrCreate()
try:
with duckdb.connect(database=extract_location, read_only=True) as duck_conn:
query = f"SELECT * FROM {source_table_name}"
pdf = duck_conn.execute(query).df()
# Save table as a managed Delta table in Unity Catalog
relation = duck_conn.sql(f"SELECT * FROM {source_table_name}")
# An explicit schema preserves types exactly; when a column type is unmapped it is None
# and we fall back to Spark's pandas schema inference.
spark_schema = build_spark_schema(relation.columns, relation.types)
pdf = relation.df()
logger.info(f"Saving profiler table '{target_table_name}' to Unity Catalog.")
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(pdf)
df = spark.createDataFrame(pdf, schema=spark_schema) if spark_schema else spark.createDataFrame(pdf)
df.write.format("delta").mode("overwrite").saveAsTable(target_table_name)
except duckdb.CatalogException as e:
logger.error(f"Could not find source table '{source_table_name}' in profiler extract: {e}")
Expand Down
9 changes: 5 additions & 4 deletions src/databricks/labs/lakebridge/assessments/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ class StepExecutionResult:


class PipelineClass:
def __init__(self, config: PipelineConfig, executor: DatabaseManager | None):
def __init__(self, config: PipelineConfig, executor: DatabaseManager | None, output_folder: Path):
self.config = config
self.executor = executor
self._db_path_prefix = Path(config.extract_folder).expanduser()
self._db_path_prefix = output_folder.expanduser()
self._create_dir(self._db_path_prefix)
self._db_path = str(self._db_path_prefix / DB_NAME)

Expand Down Expand Up @@ -145,7 +145,6 @@ def _execute_ddl_step(self, step: Step):
raise RuntimeError(f"DDL execution failed: {str(e)}") from e

def _execute_python_step(self, step: Step):

logging.debug(f"Executing Python script: {step.extract_source}")
credential_config = str(cred_file("lakebridge"))
venv_path_prefix = Path.home() / ".databricks" / "labs" / "lakebridge_profilers"
Expand Down Expand Up @@ -328,7 +327,9 @@ def load_config_from_yaml(file_path: str | Path) -> PipelineConfig:
data = yaml.safe_load(file)
steps = [Step(**step) for step in data['steps']]
return PipelineConfig(
name=data['name'], version=data['version'], extract_folder=data['extract_folder'], steps=steps
name=data['name'],
version=data['version'],
steps=steps,
)

@staticmethod
Expand Down
24 changes: 12 additions & 12 deletions src/databricks/labs/lakebridge/assessments/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
logger = logging.getLogger(__name__)


def default_output_folder(platform: str) -> Path:
    """Return the default extract folder for ``platform`` under the user's home directory.

    The result is ``~/.databricks/labs/lakebridge_profilers/<platform>_assessment`` with ``~``
    already resolved via ``Path.home()``. The folder is not created here.
    """
    leaf_name = f"{platform}_assessment"
    return Path.home().joinpath(".databricks", "labs", "lakebridge_profilers", leaf_name)


class Profiler:

def __init__(self, platform: str, pipeline_configs: PipelineConfig | None = None):
Expand All @@ -26,17 +30,11 @@ def __init__(self, platform: str, pipeline_configs: PipelineConfig | None = None

@classmethod
def create(cls, platform: str) -> "Profiler":
pipeline_config_path = PLATFORM_TO_SOURCE_TECHNOLOGY_CFG.get(platform, None)
pipeline_config = None
if pipeline_config_path:
pipeline_config_absolute_path = Profiler._locate_config(pipeline_config_path)
pipeline_config = Profiler.path_modifier(config_file=pipeline_config_absolute_path)
pipeline_config_path = PLATFORM_TO_SOURCE_TECHNOLOGY_CFG[platform]
pipeline_config_absolute_path = Profiler._locate_config(pipeline_config_path)
pipeline_config = Profiler.path_modifier(config_file=pipeline_config_absolute_path)
return cls(platform, pipeline_config)

@classmethod
def supported_platforms(cls) -> list[str]:
return list(PLATFORM_TO_SOURCE_TECHNOLOGY_CFG.keys())

@staticmethod
def path_modifier(*, config_file: str | Path, path_prefix: Path = PRODUCT_PATH_PREFIX) -> PipelineConfig:
# TODO: Choose a better name for this.
Expand All @@ -49,13 +47,15 @@ def profile(
*,
extractor: DatabaseManager | None = None,
pipeline_config: PipelineConfig | None = None,
output_folder: Path | None = None,
) -> None:
platform = self._platform.lower()
if not pipeline_config:
if not self._pipeline_config:
raise ValueError(f"Cannot Proceed without a valid pipeline configuration for {platform}")
pipeline_config = self._pipeline_config
self._execute(platform, pipeline_config, extractor)
resolved_output_folder = output_folder or default_output_folder(platform)
self._execute(platform, pipeline_config, resolved_output_folder, extractor)

@staticmethod
def _setup_extractor(platform: str) -> DatabaseManager | None:
Expand All @@ -65,12 +65,12 @@ def _setup_extractor(platform: str) -> DatabaseManager | None:
connect_config = cred_manager.get_credentials(platform)
return DatabaseManager(platform, connect_config)

def _execute(self, platform: str, pipeline_config: PipelineConfig, extractor=None) -> None:
def _execute(self, platform: str, pipeline_config: PipelineConfig, output_folder: Path, extractor=None) -> None:
try:
if extractor is None:
extractor = Profiler._setup_extractor(platform)

result = PipelineClass(pipeline_config, extractor).execute()
result = PipelineClass(pipeline_config, extractor, output_folder).execute()
logger.info(f"Profile execution has completed successfully for {platform} for more info check: {result}.")
except FileNotFoundError as e:
logger.error(f"Configuration file not found for source {platform}: {e}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ def copy(self, /, **changes) -> "Step":
class PipelineConfig:
name: str
version: str
extract_folder: str
comment: str | None = None
steps: list[Step] = field(default_factory=list)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
import os
from dataclasses import dataclass
from collections.abc import Sequence
from pathlib import Path

import yaml
from duckdb import DuckDBPyConnection, CatalogException, ParserException, Error
from pyspark.sql import DataFrame, SparkSession

from databricks.labs.lakebridge.assessments.pipeline import PipelineClass

PROFILER_DB_NAME = "profiler_extract.db"


Expand Down Expand Up @@ -202,20 +198,6 @@ def validate(self, connection) -> ValidationOutcome:
)


def get_profiler_extract_path(pipeline_config_path: Path) -> Path:
"""
Returns the filesystem path of the profiler extract database.
input:
pipeline_config_path: the location of the pipeline definition .yml file
returns:
the filesystem path to the profiler extract database
"""
pipeline_config = PipelineClass.load_config_from_yaml(pipeline_config_path)
normalized_db_path = os.path.normpath(os.path.expanduser(pipeline_config.extract_folder))
database_path = Path(normalized_db_path) / PROFILER_DB_NAME
return database_path


def build_validation_report(
validations: Sequence[ValidationStrategy], connection: DuckDBPyConnection
) -> list[ValidationOutcome]:
Expand Down
27 changes: 23 additions & 4 deletions src/databricks/labs/lakebridge/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from databricks.labs.lakebridge.assessments.configure_assessment import create_assessment_configurator
from databricks.labs.lakebridge.assessments import PROFILER_SOURCE_SYSTEM, PRODUCT_NAME
from databricks.labs.lakebridge.assessments.profiler import Profiler
from databricks.labs.lakebridge.assessments.profiler import Profiler, default_output_folder

from databricks.labs.lakebridge.config import TranspileConfig, LSPConfigOptionV1
from databricks.labs.lakebridge.contexts.application import ApplicationContext
Expand Down Expand Up @@ -1001,7 +1001,11 @@ def llm_transpile(


@lakebridge.command()
def execute_database_profiler(w: WorkspaceClient, source_tech: str | None = None) -> None:
def execute_database_profiler(
w: WorkspaceClient,
source_tech: str | None = None,
output_folder: str | None = None,
) -> None:
"""Execute the Profiler Extraction for the given source technology"""
ctx = ApplicationContext(w)
ctx.add_user_agent_extra("cmd", "execute-profiler")
Expand All @@ -1024,10 +1028,25 @@ def execute_database_profiler(w: WorkspaceClient, source_tech: str | None = None
f"Connection details not found. Please run `databricks labs lakebridge configure-database-profiler` "
f"to set up connection details for {source_tech}."
)
profiler = Profiler.create(source_tech)

if output_folder is None:
output_folder = prompts.question(
"Enter the output folder where the profile extract will be written",
default=str(default_output_folder(source_tech)),
).strip()
_validate_output_folder(Path(output_folder))

profiler = Profiler.create(source_tech)
# TODO: Add extractor logic to ApplicationContext instead of creating inside the Profiler class
profiler.profile()
profiler.profile(output_folder=Path(output_folder))


def _validate_output_folder(output_folder: Path) -> None:
    """Check that ``output_folder`` can be used as the profiler extract destination.

    The folder itself does not have to exist yet, but its parent must already be a directory
    so that the folder can be created inside it. ``~`` is expanded before the filesystem is
    consulted; the error messages show the path as it was supplied.

    Raises:
        ValueError: if the path is empty (``Path("")`` and ``Path(".")`` both have no parts),
            if the parent is missing or is not a directory, or if the path already exists
            but is not a directory.
    """
    if not output_folder.parts:
        raise ValueError("Output folder cannot be empty")
    expanded = output_folder.expanduser()
    parent = expanded.parent
    if not parent.exists():
        raise ValueError(f"Invalid path for '--output-folder', parent does not exist for: {output_folder}")
    # A regular file as parent passes exists() but can never hold the output folder.
    if not parent.is_dir():
        raise ValueError(f"Invalid path for '--output-folder', parent is not a directory for: {output_folder}")
    # The extract is written *inside* this folder, so an existing non-directory is unusable.
    if expanded.exists() and not expanded.is_dir():
        raise ValueError(f"Invalid path for '--output-folder', not a directory: {output_folder}")


@lakebridge.command()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
name: legacy_synapse_assessment
version: "1.0"
extract_folder: "/tmp/data/legacy_synapse_assessment"
steps:
- name: columns
type: sql
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
name: mssql_assessment
version: "1.0"
extract_folder: "/tmp/data/mssql_assessment"
steps:
- name: sys_info
type: ddl
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
name: oracle_assessment
version: "1.0"
extract_folder: "/tmp/data/oracle_assessment"
steps:
- name: config_containers
type: ddl
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
name: synapse_assessment
version: "1.0"
extract_folder: "~/.databricks/labs/lakebridge_profilers/synapse_assessment"
steps:
- name: workspace_info
type: python
Expand Down
Loading
Loading