Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions docs/lakebridge/docs/reconcile/configuration.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,83 @@ Place the file in `.lakebridge/` in your Databricks workspace home folder.

---

## Fingerprint Pre-check (Experimental)

The fingerprint pre-check is an opt-in optimisation that, when source and target are
already in sync, replaces the per-row hash-and-join pipeline with a sub-bucket-level
aggregate comparison. On a 1 M-row Redshift fixture in MATCH state, the precheck
short-circuits the recon at roughly **30–45 % of the v3 baseline wall-clock**; on
MISMATCH it surgically fetches only the differing rows instead of streaming the full
column set across JDBC. When neither the MATCH nor the surgical-fetch path applies the
recon falls open to the full pipeline so correctness is never compromised.

### When to enable

- The source / target tables are expected to be **mostly identical** (post-migration,
ongoing CDC) — the precheck pays off most when MATCH is the common outcome.
- The runtime targets **DBR 17.3 or later**. The Stage-2 source-side fetch uses
Databricks' `remote_query()` table-valued function which requires DBR 17.3+. On
earlier runtimes, leave the flag off — the eligibility gate doesn't yet check DBR
version, so the JDBC call would fail mid-fetch and trigger the fail-open path.
- The source dialect has a registered fingerprint query builder. Today only
`redshift` is wired. Other dialects fall through to the full pipeline silently.

### Configuration

Add to `recon_config_*.json` at the top level:

```yaml
fingerprint_precheck: true
# Optional. False (default) keeps '' distinct from NULL in fingerprint hashing,
# matching the row-hash compare path in expression_generator. Flip to True only if
# your data treats '' and NULL as the same value AND you have audited the impact;
# the flag is wired symmetrically across both source-side Redshift SQL and the
# target-side Spark Stage-1 / Stage-2 serialisers so the two cannot drift.
fingerprint_treat_empty_as_null: false
# Optional. When set, overrides the target Delta DESCRIBE DETAIL numRecords
# lookup used to pick the sub-bucket tier. Use this when the target is non-Delta
# (DESCRIBE DETAIL returns no numRecords) or when Delta stats are stale and the
# heuristic lands on a tier that is too coarse / too fine for your workload.
# Values <= 0 are treated as unset.
fingerprint_row_count_override: 250000000
```

### Eligibility rules

The pre-check declines (and the recon proceeds via the full pipeline) when **any** of
the conditions below hold. These reasons are recorded in
`recon_metrics.fingerprint_metrics.ineligibility_reason` so adoption queries can
distinguish "feature off" from "feature on but table ineligible".

| Reason value | Meaning |
| --- | --- |
| `flag_disabled` | `fingerprint_precheck` is false (default). |
| `unsupported_dialect` | The source dialect has no registered fingerprint query builder (today: anything other than `redshift`). |
| `report_type_not_data` | `report_type` is `schema` (precheck operates on data-level reconciles). |
| `no_join_columns` | `join_columns` is empty. The precheck needs primary-key columns to disambiguate culprit rows during Stage-2. |
| `filters_configured` | `filters.source` or `filters.target` is set. The precheck does not project filter predicates into Stage-1 aggregates yet. |
| `transforms_configured` | `transformations` is set. Custom transformations are not supported on the fingerprint hash path. |
| `column_thresholds_configured` | `column_thresholds` is set. Threshold semantics conflict with exact-hash comparison. |
| `table_thresholds_configured` | `table_thresholds` is set. Same rationale as column thresholds. |

A separate runtime gate validates that every `column_mapping.target_name` resolves to
a real target column before issuing the source-side scan — a typo in the mapping is
caught at eligibility time, not after Stage-1 has already pulled across JDBC.

### Tuning and observability

- The pre-check selects a sub-bucket / bucket count adaptively from the target's
Delta `numRecords` (DESCRIBE DETAIL). On a non-Delta target, or when the metric
is missing, it falls back to a static `(1 048 576, 32 768)` pair. Override
explicitly via `ReconcileConfig.fingerprint_row_count_override` (an
approximate target row count) when the heuristic picks a tier that you can
show is wrong for your workload.
- Every recon writes a `fingerprint_metrics` named-struct to
`recon_metrics.fingerprint_metrics` regardless of eligibility, so adoption,
fall-open rate, and verdict distribution can be tracked from one query.

---

## TABLE Config Schema

<Tabs>
Expand Down
12 changes: 12 additions & 0 deletions docs/lakebridge/docs/reconcile/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,18 @@ The User configuring reconcile must have permission to:
- `USE CATALOG` and `CREATE SCHEMA` on the target catalog
- `CREATE VOLUME` if using a pre-existing schema on a serverless cluster

### Runtime requirements

- The job / interactive cluster running reconcile must be on **DBR 15.4 LTS or later**
for the standard data-comparison path.
- If `fingerprint_precheck` is enabled (see
[Configuration Reference → Fingerprint Pre-check](/docs/reconcile/configuration#fingerprint-pre-check-experimental)),
the cluster must be on **DBR 17.3 or later**. The Stage-2 source-side fetch uses
Databricks' `remote_query()` table-valued function, which became available on DBR
17.3. On earlier runtimes, leave the flag off; otherwise the JDBC call fails
mid-fetch and the recon falls open to the full pipeline (correct, but the precheck
buys you nothing while paying for itself in cluster time).

### Serverless cluster support

Reconcile automatically detects the cluster type and optimizes intermediate data persistence accordingly:
Expand Down
35 changes: 34 additions & 1 deletion src/databricks/labs/lakebridge/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,13 +280,27 @@ class ReconcileJobConfig:
@dataclass
class ReconcileConfig:
__file__ = "reconcile.yml"
__version__ = 2
__version__ = 3

report_type: str
source: SourceConnectionConfig
target: TargetConnectionConfig
metadata_config: ReconcileMetadataConfig
job_overrides: ReconcileJobConfig | None = None
fingerprint_precheck: bool = False
# When True, fingerprint hashing collapses '' to NULL on BOTH source and target
# sides — flipped here so the Stage-1 / Stage-2 serialisers cannot drift apart
# (without this knob, target call sites silently kept the function default of
# False while source picked up a constant override). Staying False matches the
# row-hash compare path in ``expression_generator``.
fingerprint_treat_empty_as_null: bool = False
# Optional explicit row count for fingerprint tier selection. When set,
# overrides Delta ``DESCRIBE DETAIL`` numRecords lookup so customers whose
# target is non-Delta (or whose Delta stats are stale) can pick the right
# sub-bucket tier without waiting for a full COUNT(*). ``None`` keeps the
# default heuristic. Values ``<= 0`` are treated as "unset" by
# ``fetch_target_row_count``.
fingerprint_row_count_override: int | None = None

@classmethod
def v1_migrate(cls, raw: dict[str, Any]) -> dict[str, Any]:
Expand Down Expand Up @@ -314,6 +328,25 @@ def v1_migrate(cls, raw: dict[str, Any]) -> dict[str, Any]:
raw["version"] = 2
return raw

@classmethod
def v2_migrate(cls, raw: dict[str, Any]) -> dict[str, Any]:
"""v2 → v3: introduce the source-agnostic ``fingerprint_precheck`` flag.

Older field names (``redshift_fingerprint_precheck``, ``use_fingerprint_precheck``)
from internal pre-v2 deployments are folded into the new flag if present;
otherwise the field defaults to ``False`` so existing v2 configs keep their
current behaviour.
"""
if "fingerprint_precheck" not in raw:
for legacy in ("redshift_fingerprint_precheck", "use_fingerprint_precheck"):
if legacy in raw:
raw["fingerprint_precheck"] = raw.pop(legacy)
break
for legacy in ("redshift_fingerprint_precheck", "use_fingerprint_precheck"):
raw.pop(legacy, None)
raw["version"] = 3
return raw

@property
def database_config(self) -> DatabaseConfig:
"""TODO remove. this was kept for backwards compatibility while migrating to ReconcileConfig v2"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Fingerprint-accelerated reconciliation."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""Shared constants and SQL helpers for fingerprint detection and row filtering."""

from __future__ import annotations

# Must match the row-hash path's NULL stand-in (``'_null_recon_'`` literal in
# ``reconcile/query_builder/expression_generator.py``). Fingerprint and row-hash
# both encode NULLs into the per-column hash payload before MD5/SHA — picking a
# different stand-in here would alias real data ``'_null_recon_'`` with NULL on
# only one side and produce the inverse alias on the other, so any row that
# happens to carry either literal would be silently misclassified by Stage-1.
# A unit test pins this to the row-hash literal so a future drift fails CI
# rather than the reconcile.
NULL_SENTINEL = "_null_recon_"

# chr(1) — column separator inside the MD5 concat. Rendered three ways:
# Redshift SQL: CHR(1)
# Spark SQL: CHAR(1)
# Python / Spark DataFrame: "\x01"
SEPARATOR_PYTHON = "\x01"
SEPARATOR_REDSHIFT_SQL = "CHR(1)"
SEPARATOR_SPARK_SQL = "CHAR(1)"

# Static defaults retained for backwards compatibility and as the fallback when the
# adaptive selector has no row count to work with.
SUB_BUCKET_COUNT = 1_048_576 # 1M sub-buckets
BUCKET_COUNT = 32_768

# Adaptive tier table. Each entry: (max_row_count_inclusive, sub_bucket_count, bucket_count).
# Last entry's max_row_count is None and clamps everything larger. Sub-bucket counts are
# powers of 2 to keep MOD distribution clean; bucket count = sub_bucket_count / 1024.
SUB_BUCKET_TIERS: tuple[tuple[int | None, int, int], ...] = (
(50_000, 16_384, 128), # < 50K
(500_000, 262_144, 512), # 50K – 500K
(50_000_000, 1_048_576, 1_024), # 500K – 50M
(500_000_000, 2_097_152, 2_048), # 50M – 500M
(5_000_000_000, 4_194_304, 4_096), # 500M – 5B
(50_000_000_000, 8_388_608, 8_192), # 5B – 50B
(None, 16_777_216, 16_384), # 50B+
)


def pick_sub_bucket_count(row_count: int | None) -> tuple[int, int]:
"""Select (sub_bucket_count, bucket_count) for ``row_count``.

Falls back to (SUB_BUCKET_COUNT, BUCKET_COUNT) when the count is unknown or
non-positive, so callers can pass None safely.

>>> pick_sub_bucket_count(10_000)
(16384, 128)
>>> pick_sub_bucket_count(100_000_000)
(2097152, 2048)
>>> pick_sub_bucket_count(None)
(1048576, 32768)
"""
if row_count is None or row_count <= 0:
return SUB_BUCKET_COUNT, BUCKET_COUNT
for max_row_count, sub_buckets, buckets in SUB_BUCKET_TIERS:
if max_row_count is None or row_count <= max_row_count:
return sub_buckets, buckets
return SUB_BUCKET_COUNT, BUCKET_COUNT


def build_fingerprint_where_clause(
sb_expr: str,
rh1_expr: str,
solved_hashes: dict[int, list[int]],
unsolved_sb_ids: list[int],
) -> str:
"""Build the WHERE body (no ``WHERE``, no trailing alias) for a filtered fetch.

Emits the union form ``(sb_expr IN (..) AND rh1_expr IN (..)) [OR sb_expr IN (..)]``.
The form is mathematically equivalent to per-sub-bucket disjuncts because
``sb_id = ABS(MOD(rh1, N))`` is invariant, but stays ``O(|sb_expr| + |IN list|)``
instead of ``O(k · |sb_expr|)`` so it stays under Redshift's 16 MB statement
limit even on workloads with millions of solved sub-buckets.

Raises ``ValueError`` when both filter inputs are empty: callers must gate the
fetch (eligibility check in the orchestrator) before reaching this helper. An
empty result here would interpolate to ``WHERE )`` downstream — fail-loud beats
silently emitting a syntactically broken query that fail-open would mask.
"""
if not solved_hashes and not unsolved_sb_ids:
raise ValueError(
"build_fingerprint_where_clause requires at least one of solved_hashes "
"or unsolved_sb_ids to be non-empty; the empty case must be filtered "
"out by the caller before issuing a fetch."
)
conditions: list[str] = []
# Sort all IN-list operands for deterministic SQL across dict / list iteration
# orders — helps query-plan caching and unit-test diffing.
if solved_hashes:
sb_list = ", ".join(str(sb_id) for sb_id in sorted(solved_hashes))
hash_list = ", ".join(str(h) for h in sorted({h for hs in solved_hashes.values() for h in hs}))
conditions.append(f"({sb_expr} IN ({sb_list}) AND {rh1_expr} IN ({hash_list}))")
if unsolved_sb_ids:
sb_list = ", ".join(str(sb_id) for sb_id in sorted(unsolved_sb_ids))
conditions.append(f"{sb_expr} IN ({sb_list})")
return " OR ".join(conditions)
Loading