Add fingerprint pre-check to Recon (Redshift)#2490
Open
ameersalman33 wants to merge 1 commit into
Open
Conversation
|
|
10 tasks
## Changes ### What does this PR do? Adds an opt-in fingerprint pre-check to Recon. When `fingerprint_precheck=True` and the source has a registered query builder, Recon runs a sketch-based detection pass (MD5-sub-bucketed aggregates over both sides) before the row-hash compare pipeline. - MATCH -> Recon short-circuits in seconds; no full table scan, no JOIN. - MISMATCH -> an algebraic solver returns the differing row hashes; a surgical Stage-2 fetch pulls just those rows and feeds them into the existing `compare.reconcile_data` flow. If the mismatch is systemic (>15% of sub-buckets), the precheck defers to the existing pipeline. - Ineligible -> falls through silently. The flag defaults to False; existing behaviour is unchanged. The algorithm is byte-identical to the dataprint sketch-based reconciliation library; this is the first dataprint-into-lakebridge integration. Redshift is the first dialect — adding Snowflake / Oracle / TSQL is one `FingerprintQueryBuilder` subclass plus one registry entry. ### Relevant implementation details - `trigger_recon_service._run_fingerprint_or_reconcile_data` is the single decision point. Static eligibility centralised in `classify_ineligibility`; the schema-dependent `unmapped_target_column_mapping` reason is raised by `align_columns` as a typed exception and routed through `FingerprintRunMetadata.ineligible(...)`. Every reason maps to an `IneligibilityReason` enum value and is recorded on `recon_metrics.fingerprint_metrics.ineligibility_reason`. - Source-side reads use upstream's `RemoteQueryReader` / `remote_query()` TVF unmodified; Stage-1 aggregation pushdown verified empirically on a 1 M-row Redshift fixture (DBR 17.3). - Stage-1 detection is parallelised across source / target via a 2-thread pool; failure semantics match the serial version. - Three new fields on `ReconcileConfig`: `fingerprint_precheck`, `fingerprint_treat_empty_as_null`, `fingerprint_row_count_override`. - Config version bumps 2 -> 3 with a `v2_migrate` that folds two legacy spellings (`redshift_fingerprint_precheck`, `use_fingerprint_precheck`) into the new flag. Existing deployments upgrade automatically. ### Pre-existing fixes that ride along (upstream PR databrickslabs#2339) Two correctness bugs in the upstream Redshift connector MR (databrickslabs#2339) surfaced during the dataprint integration P0 / P1 runs against a real cluster. Both crash the existing row-hash recon path on real customer schemas and are unrelated to dataprint, but they sat in the integration path so they are fixed inline. Both fixes live in `reconcile/query_builder/expression_generator.py` and are pinned by regression tests in `test_expression_generator.py`. - **Databricks block missing TIMESTAMP / TIMESTAMPTZ handler.** Redshift's source-side transform emits `COALESCE(TO_CHAR(ts, 'YYYY-MM-DD HH24:MI:SS.US'), '_null_recon_')` (always 6 fractional digits), but the Databricks block had no override, so the target side fell through to the universal default `TRIM(COALESCE(col, '_null_recon_'))` — Spark emits a variable-length fractional component, omitted entirely for zero-microsecond timestamps. The byte-width drift made per-row SHA2 disagree for every TIMESTAMP / TIMESTAMPTZ row in any Redshift -> Databricks reconcile. Fix: add `COALESCE(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:ss.SSSSSS'), '_null_recon_')` so source and target are byte-identical. - **Redshift block missing BOOLEAN handler.** The Redshift block defined overrides only for SUPER / DATE / TIMESTAMP / TIMESTAMPTZ and had no dialect-level `default`. BOOLEAN columns fell through to the universal default `TRIM(COALESCE(col, '_null_recon_'))`, which Redshift rejects during output schema resolution with `function pg_catalog.btrim(boolean) does not exist`. Any customer schema containing a single BOOLEAN column crashes row-hash recon end-to-end. Fix: explicit `COALESCE(CASE WHEN col THEN 'true' WHEN NOT col THEN 'false' ELSE NULL END, '_null_recon_')` so the rendered string matches Spark's `cast(boolean AS string)` byte-for-byte. ### Hardening from the internal review round After the initial internal review on the contributor's fork, three substantive code changes landed before pushing upstream: - **Pin TZ-aware Spark target columns to UTC** before formatting in `fingerprint/spark_target.py`. The Redshift side already pinned UTC via `TO_CHAR(_ AT TIME ZONE 'UTC', _)`; the Spark side was using `DATE_FORMAT(ts, _)` which renders in `spark.sql.session.timeZone`. On a non-UTC cluster the same instant rendered different bytes on the two sides. Fix splits LTZ vs NTZ handling and routes LTZ through `TO_UTC_TIMESTAMP(_, CURRENT_TIMEZONE())`. NTZ behaviour unchanged. - **Stage-2 build failures fall through to the full pipeline** in `trigger_recon_service.py` instead of marking the table failed. Every other non-MATCH branch already does this; the `build_mismatch_output` exception path was the one inconsistency. Metadata still records `fallback_to_full_pipeline=True` for observability. - **Cast Redshift strings to `VARCHAR(65535)`** in `fingerprint/query_builders/redshift.py` instead of bare `VARCHAR`, whose default 256-byte width truncated long text. `VARCHAR(65535)` is Redshift's maximum and matches Spark's unbounded string semantics. Smaller cleanups: dropped unused `ColumnAlignment.exclude_columns`; reverted a no-op reorder in `connectors/source_adapter.py`; replaced a flaky wall-clock assertion in `test_fetch_parallel.py` with a deterministic distinct-thread-id assertion; pinned the exact rendered SQL on each dialect in `test_expression_generator.py` (instead of substring-checking two different patterns) and added a regression test for the Redshift `BOOLEAN` handler. ### Caveats - DBR 17.3+ required for source-side reads via `remote_query()` (inherited from upstream's `RemoteQueryReader` adoption). - MISMATCH-state cost at 1 M scale currently exceeds row-hash-only mode by 16-94 s because Stage-2 still feeds the existing JOIN. MATCH is the headline win (38.7% on 1 M rows); billion-row scale is the production motivation. Stage-1 hash persistence as Stage-2 input is filed as a follow-up. - Pre-existing `success_count` formula in `verify_successful_reconciliation` (upstream PR databrickslabs#2259, commit `e56c79c3d`) is mathematically wrong; sits next to fingerprint code in `trigger_recon_service.py`. Not fixed here to keep scope contained; filed separately. ### Tests - All unit tests on the touched surface pass; `tests/unit/reconcile/` runs 282 tests in <1 s. The 6 `test_cli_analyze.py` failures are pre-existing on main and unrelated. - Regression tests added for every review-round fix (UTC pin, fallback path, VARCHAR(65535)); `test_expression_generator.py` pins the exact rendered SQL on each dialect for the two pre-existing fixes. - Correctness validated end-to-end on a 1 M-row Redshift / Delta fixture across the 20-scenario dual-mode parity matrix: 39/40 cells PASS, 1 scenario shows a known fingerprint-solver fallback edge with verdict agreement on both sides — only the cap-bounded `mismatch` count differs (fingerprint reports the true 10000, normal reports the cap-50 sample). - Linter clean: pylint 10.00/10 on touched src; ruff, black, mypy green. - Integration coverage to follow alongside the recon e2e cluster fixture (databrickslabs#2453).
667494c to
282740a
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Changes
What does this PR do?
Adds an opt-in fingerprint pre-check to Recon. When
fingerprint_precheck=Trueand the source has a registered query builder, Recon runs a sketch-based detection pass (MD5-sub-bucketed aggregates over both sides) before the row-hash compare pipeline.compare.reconcile_dataflow. If the mismatch is systemic (>15% of sub-buckets), the precheck defers to the existing row-hash pipeline.The flag defaults to
False; existing behaviour is unchanged.The algorithm is byte-identical to the dataprint sketch-based reconciliation library (Databricks-internal); this MR is the first dataprint-into-lakebridge integration. Redshift is the first source dialect — adding Snowflake / Oracle / TSQL is one
FingerprintQueryBuildersubclass plus one registry entry, with no orchestrator changes.Relevant implementation details
trigger_recon_service._run_fingerprint_or_reconcile_datais the single decision point. Static eligibility (flag off, unsupported dialect, report type not data, no join columns, filters / transformations / thresholds configured) is centralised inclassify_ineligibility(fingerprint/orchestrator.py) and runs before any source-side compute. One config-time reason —unmapped_target_column_mapping— is detected later, inside the precheck (because it requires the target schema), and surfaces via the typedUnmappedTargetColumnMappingErrorraised byalign_columns; the trigger catches it before the runtime-failure branch and routes it throughFingerprintRunMetadata.ineligible(...). Every reason is one of the values on theIneligibilityReasonenum and is recorded onrecon_metrics.fingerprint_metrics.ineligibility_reason, so adoption queries can count each cause without needing to grep cluster logs.RemoteQueryReader/remote_query()TVF unmodified — no new connector code. Stage-1 aggregation pushdown was verified empirically against a 1 M-row Redshift fixture on a DBR 17.3 cluster: both direct JDBC (4.28 s median) and the upstreamremote_query()TVF path (5.44 s median) push the GROUP BY to Redshift and return identical aggregated row counts.ThreadPoolExecutor(max_workers=2, thread_name_prefix="fp-stage2"). Failure semantics match the serial version; the trigger-layer catch wraps the whole block.ReconcileConfig:fingerprint_precheck: bool = False(the main flag)fingerprint_treat_empty_as_null: bool = False(collapse''to NULL in the per-column hash payload, end-to-end on both sides)fingerprint_row_count_override: int | None = None(pin the adaptive sub-bucket / bucket tier whenDESCRIBE DETAILcannot readnumRecords)ReconcileConfig.__version__bumps from2 -> 3. The newv2_migrateintroducesfingerprint_precheckand folds two legacy spellings (redshift_fingerprint_precheck,use_fingerprint_precheck) into it. Combined with upstream'sv1_migrate, the chainv1 -> v2 -> v3runs automatically viaInstallation.load(ReconcileConfig). Existing deployments upgrade with zero user action.Pre-existing fixes that ride along (upstream PR #2339)
While integrating dataprint end-to-end against Redshift on a real cluster, two pre-existing correctness bugs in the upstream Redshift connector MR (PR #2339) surfaced. Neither is dataprint-specific — both crash the existing row-hash recon path on real customer schemas — but they sat directly in the integration path, so they are fixed inline. Both fixes live in
reconcile/query_builder/expression_generator.pyand are pinned by regression tests.TIMESTAMP/TIMESTAMPTZtransform handlers to the Databricks dialect (target side)COALESCE(TO_CHAR(ts, 'YYYY-MM-DD HH24:MI:SS.US'), '_null_recon_')(always 6 fractional digits, fixed-width). The Databricks dialect block had no override forTIMESTAMP/TIMESTAMPTZ, so the target side fell through to the universal defaultTRIM(COALESCE(col, '_null_recon_')), where Spark's implicitcast(timestamp AS string)emits a variable-length fractional component (omitted entirely for zero-microsecond timestamps;'2023-10-02 18:08:43'vs source'2023-10-02 18:08:43.000000'). The byte-width drift made the per-row SHA2 disagree for every logically-identicalTIMESTAMP/TIMESTAMPTZrow in any Redshift -> Databricks reconcile. Fix: addCOALESCE(DATE_FORMAT(_, 'yyyy-MM-dd HH:mm:ss.SSSSSS'), '_null_recon_')for both types so the two sides are byte-identical.BOOLEANtransform handler to the Redshift dialect (source side)SUPER/DATE/TIMESTAMP/TIMESTAMPTZand had no dialect-leveldefault. AnyBOOLEANcolumn fell through to the universal defaultTRIM(COALESCE(col, '_null_recon_')), which Redshift rejects during output schema resolution (before any rows are read) withfunction pg_catalog.btrim(boolean) does not exist. Result: any customer schema containing a singleBOOLEANcolumn crashes row-hash recon end-to-end on the Redshift connector shipped in PR #2339. Fix: explicitCOALESCE(CASE WHEN col THEN 'true' WHEN NOT col THEN 'false' ELSE NULL END, '_null_recon_')so the rendered string matches Spark'scast(boolean AS string)byte-for-byte. The existing unit suite did not catch this because the test fixtures hand-build query strings and never exercise the dialect transform-mapping path.Hardening from internal review
Before pushing upstream this PR went through one round of internal review on the contributor's fork. Three substantive code changes landed: pinning timezone-aware target columns to UTC in
fingerprint/spark_target.pyso a non-UTCspark.sql.session.timeZonedoes not produce Stage-1 false-mismatches; falling through to the full pipeline on abuild_mismatch_outputexception intrigger_recon_service.pyto match the fail-open contract every other branch already follows; and casting Redshift strings toVARCHAR(65535)instead of bareVARCHAR(which truncates at 256). The remaining changes are cleanups (unusedColumnAlignment.exclude_columnsremoved, no-op reorder inconnectors/source_adapter.pyreverted, flaky wall-clock assertion intest_fetch_parallel.pyreplaced with a deterministic distinct-thread-id check, andtest_expression_generator.pypins exact rendered SQL on each dialect).Caveats/things to watch out for when reviewing
remote_query()require classic clusters on DBR 17.3+ or SQL warehouses (Pro / Serverless) on version 2025.35+. This is inherited from upstream'sRemoteQueryReaderadoption, not a fingerprint-specific limitation; the runtime requirement is documented indocs/lakebridge/docs/reconcile/index.mdx.compare.reconcile_dataJOIN rather than replacing it, so on MISMATCH it costs an extra 16-94 s vs. row-hash-only mode. The MATCH path is the headline win (38.7% wall-clock improvement on 1 M rows: 55.5 s vs 90.6 s); the production motivation is billion-row scale, where the row-hash pipeline's full-table JOIN is what becomes intractable. Stage-1 hash persistence as Stage-2 input is filed as a follow-up.expression_generatortest file modified istests/unit/test_install.py(six"version": 2 -> 3fixture bumps, a direct consequence of the migration step we own). Not scope creep.success_countformula inverify_successful_reconciliationis mathematically wrong (yields counts greater thantotal). Pre-existing in upstream PR Improve reconciliation result handling and logging #2259 (commite56c79c3d); sits right next to fingerprint code intrigger_recon_service.py. Not fixed in this MR to keep scope contained; filed as a separate issue.Linked issues
Resolves #..
Functionality
docs/lakebridge/docs/reconcile/configuration.mdxFingerprint Pre-check (Experimental) section +docs/lakebridge/docs/reconcile/index.mdxruntime requirements)databricks labs lakebridge ...ReconcileConfig.fingerprint_precheck(defaultFalse)Tests
v1 -> v2 -> v3config migration, recon-capture typed schema, fingerprint dispatch, Stage-1 / Stage-2 contract symmetry)tests/unit/reconcile/alone runs 307 reconcile tests in 1.13 s. The 6 failingtest_cli_analyze.pycases (Informatica analyzer binary) are pre-existing onmain(verified on stash-clean HEAD) and unrelated.test_expression_generator.pypins handler presence, format string, and source / target byte alignment forTIMESTAMP/TIMESTAMPTZ, plus theBOOLEANCASE WHENrendering on Redshift.mismatchcount differs (fingerprint reports the true 10000, normal reports the cap-50 sample).reconcile/surface plusconfig.py.