Skip to content

[FLINK-39720][table] SubQueryDecorrelator produces incorrect plans for correlated EXISTS with HAVING on aggregate outputs#28217

Open
lincoln-lil wants to merge 1 commit into
apache:masterfrom
lincoln-lil:FLINK-39720
Open

[FLINK-39720][table] SubQueryDecorrelator produces incorrect plans for correlated EXISTS with HAVING on aggregate outputs#28217
lincoln-lil wants to merge 1 commit into
apache:masterfrom
lincoln-lil:FLINK-39720

Conversation

@lincoln-lil
Copy link
Copy Markdown
Contributor

What is the purpose of the change

This pull request fixes a silent wrong-result bug in SubQueryDecorrelator: correlated EXISTS / NOT EXISTS sub-queries that combine a correlated WHERE with a HAVING
clause on aggregate output produced plans where the HAVING predicate pointed at the wrong column.

When decorrelateRel(LogicalAggregate) rewrites the inner aggregate, it injects the correlated columns into the group key, which shifts the position of the original
aggregate-output fields. The Frame records this shift in oldToNewOutputs. But decorrelateRel(LogicalFilter) then reattaches the non-correlated remainder of the
Filter condition (the HAVING clause, for example SUM(r.e) >= 3) to the rewritten input without remapping its RexInputRefs through frame.oldToNewOutputs. Every
other decorrelateRel(...) method in the same file (Project, Aggregate, Join, Correlate) already calls adjustInputRefs on surviving expressions before reattaching
them — the Filter path was the only one missing the call.

The result is a structurally valid but semantically wrong plan: HAVING SUM(r.e) >= 3 ends up applied to the injected correlated column (e.g. r.d) instead of the
SUM(r.e) output. This restores symmetry with Calcite's bottom-up RelDecorrelator.decorrelateRel(Filter), which walks every RexInputRef through oldToNewOutputs in a
single pass.

Brief change log

SubQueryDecorrelator.decorrelateRel(LogicalFilter): re-index remainingCondition (the non-correlated remainder built from nonCorConditions) through
adjustInputRefs(remainingCondition, frame.oldToNewOutputs, frame.r.getRowType()) between RexUtil.composeConjunction(...) and LogicalFilter.create(...). The
defensive null guard mirrors surrounding style; composeConjunction(..., /nullOnEmpty=/false) returns TRUE for an empty list, and adjustInputRefs(TRUE, ...) is a
no-op, so the all-correlated case is unaffected.

Verifying this change

SubQuerySemiJoinTest — five new test cases

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @public(Evolving): (no)
  • The serializers: (no )
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

…r correlated EXISTS with HAVING on aggregate outputs
@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented May 21, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants