From b006f673636f830636be96f53fd4c29c3d79fb18 Mon Sep 17 00:00:00 2001 From: Noritaka Sekiyama Date: Fri, 1 May 2026 11:32:29 +0900 Subject: [PATCH 1/2] Feat: support agg_columns=["*"] for COUNT(*) in aggregate reconcile The aggregate query builder treats every entry of agg_columns as a column identifier and pushes it through the dialect normalizer, which wraps non-identifier characters in backticks. Passing the literal star, e.g. Aggregate(agg_columns=["*"], type="count"), produces SELECT count(`*`) AS `source_count_*` FROM :tbl which fails at analysis with UNRESOLVED_COLUMN. There is currently no way to ask the reconcile aggregate engine to compute COUNT(*) (a true row count), even though the rules table already accepts and stores agg_column = "*". Special-case the literal star with type == "count" inside _get_mapping_cols_with_alias and emit a sqlglot Star expression instead of pushing "*" through the column-name normalizer. The downstream formatter in _agg_query_cols_with_alias then produces SELECT count(*) AS `source_count_*` FROM :tbl NormalizeReconConfigService rewrites entries in agg_columns to their ansi-normalized form before this builder is invoked, so the incoming value may be either the raw "*" or the wrapped "`*`". The check uses DialectUtils.unnormalize_identifier so both forms match. The change is bounded to count + literal star, so non-count aggregates and ordinary column names take the existing path unchanged. 
Co-authored-by: Isaac --- .../reconcile/query_builder/aggregate_query.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/databricks/labs/lakebridge/reconcile/query_builder/aggregate_query.py b/src/databricks/labs/lakebridge/reconcile/query_builder/aggregate_query.py index b4a802f833..3f516e3fec 100644 --- a/src/databricks/labs/lakebridge/reconcile/query_builder/aggregate_query.py +++ b/src/databricks/labs/lakebridge/reconcile/query_builder/aggregate_query.py @@ -58,6 +58,22 @@ def _get_mapping_cols_with_alias(self, cols_list: list[str], agg_type: str): """ cols_with_mapping: list[exp.Expression] = [] for col in cols_list: + # Special-case the literal star for COUNT(*). + # The column-name normalizer would render "*" as the backtick-quoted + # identifier `*`, producing COUNT(`*`) which fails at SQL analysis. + # NormalizeReconConfigService also rewrites entries in agg_columns to + # the ansi-normalized form, so by the time we get here the value can + # be the literal "*" or the wrapped "`*`". Bypass identifier handling + # in either case and emit a Star expression so the downstream builder + # produces COUNT(*). 
+ if DialectUtils.unnormalize_identifier(col) == "*" and agg_type.lower() == "count": + cols_with_mapping.append( + exp.Alias( + this=exp.Star(), + alias=exp.Identifier(this=f"{agg_type.lower()}<#>*", quoted=False), + ) + ) + continue column_expr = build_column( this=( self._build_column_name_source_normalized(self._get_mapping_col(col)) From 7210cae7e6ceebfa407ef956b5e9510d0119e887 Mon Sep 17 00:00:00 2001 From: Noritaka Sekiyama Date: Fri, 1 May 2026 13:33:16 +0900 Subject: [PATCH 2/2] Add unit tests for COUNT(*) aggregate reconcile support Cover the AggregateQueryBuilder behavior introduced in this branch: - Aggregate(agg_columns=["*"], type="count") emits SELECT count(*) - The same holds when agg_columns has already been ansi-normalized to ["`*`"] by NormalizeReconConfigService - COUNT(*) coexists with COUNT(column) inside a single aggregate query - Star outside of count (e.g. type="sum") keeps the existing path so the special-case is bounded to the COUNT(*) use case Without the fix in this branch the first three cases produce SELECT count(`*`) ... and fail at SQL analysis with UNRESOLVED_COLUMN. Verified locally by reverting src/.../aggregate_query.py to origin/main: tests fail; with the patch applied: tests pass. 
Co-authored-by: Isaac --- .../query_builder/test_aggregate_query.py | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 tests/unit/reconcile/query_builder/test_aggregate_query.py diff --git a/tests/unit/reconcile/query_builder/test_aggregate_query.py b/tests/unit/reconcile/query_builder/test_aggregate_query.py new file mode 100644 index 0000000000..b08810ce46 --- /dev/null +++ b/tests/unit/reconcile/query_builder/test_aggregate_query.py @@ -0,0 +1,71 @@ +from databricks.labs.lakebridge.reconcile.query_builder.aggregate_query import AggregateQueryBuilder +from databricks.labs.lakebridge.reconcile.recon_config import Aggregate, Table +from databricks.labs.lakebridge.transpiler.sqlglot.dialect_utils import get_dialect + + +def _build_table(aggregates: list[Aggregate]) -> Table: + return Table(source_name="supplier", target_name="target_supplier", aggregates=aggregates) + + +def test_count_star_emits_unquoted_star(fake_databricks_datasource, normalize_config_service): + """Aggregate(agg_columns=["*"], type="count") must produce COUNT(*), not COUNT(`*`).""" + table_conf = _build_table([Aggregate(agg_columns=["*"], type="count")]) + normalized = normalize_config_service.normalize_recon_table_config(table_conf) + + rules = AggregateQueryBuilder( + normalized, [], "source", get_dialect("databricks"), fake_databricks_datasource + ).build_queries() + + assert len(rules) == 1 + sql = rules[0].query + assert "count(*)" in sql.lower() + # Regression: the column-name normalizer must not emit COUNT(`*`) + assert "count(`*`)" not in sql.lower() + # Alias of the aggregate column survives normalization + assert "source_count_*" in sql.lower() + + +def test_count_star_normalized_input_emits_unquoted_star(fake_databricks_datasource, normalize_config_service): + """The same fix must hold when agg_columns is already in ansi-normalized form (`*`).""" + table_conf = _build_table([Aggregate(agg_columns=["`*`"], type="count")]) + normalized = 
normalize_config_service.normalize_recon_table_config(table_conf) + + rules = AggregateQueryBuilder( + normalized, [], "target", get_dialect("databricks"), fake_databricks_datasource + ).build_queries() + + assert len(rules) == 1 + sql = rules[0].query + assert "count(*)" in sql.lower() + assert "count(`*`)" not in sql.lower() + + +def test_count_star_alongside_named_column(fake_databricks_datasource, normalize_config_service): + """COUNT(*) and COUNT(column) must coexist in a single aggregate query.""" + table_conf = _build_table([Aggregate(agg_columns=["*", "s_acctbal"], type="count")]) + normalized = normalize_config_service.normalize_recon_table_config(table_conf) + + rules = AggregateQueryBuilder( + normalized, [], "source", get_dialect("databricks"), fake_databricks_datasource + ).build_queries() + + sql = rules[0].query.lower() + assert "count(*)" in sql + assert "count(`s_acctbal`)" in sql + # The * branch must not pollute the named-column branch with backticks around * + assert "count(`*`)" not in sql + + +def test_star_with_non_count_aggregate_is_unchanged(fake_databricks_datasource, normalize_config_service): + """The fast-path must only apply to type='count'. Other aggregates keep the existing behavior.""" + table_conf = _build_table([Aggregate(agg_columns=["*"], type="sum")]) + normalized = normalize_config_service.normalize_recon_table_config(table_conf) + + rules = AggregateQueryBuilder( + normalized, [], "source", get_dialect("databricks"), fake_databricks_datasource + ).build_queries() + + sql = rules[0].query.lower() + # Existing path quotes the identifier; we must not silently turn this into sum(*) + assert "sum(*)" not in sql + assert "sum(`*`)" in sql