From 642aae6b1a13cb6fd3f74d69e138ff40814a6f0e Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Tue, 19 May 2026 14:21:46 +0200 Subject: [PATCH 1/7] [FLINK-39708][table] Add TraitCondition.mapArgIncludesKey factory --- .../types/inference/BuiltInCondition.java | 3 +- .../table/types/inference/TraitCondition.java | 28 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java index 2a06191ad13ab..bc7383a1cfcd1 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java @@ -37,7 +37,8 @@ final class BuiltInCondition implements TraitCondition { enum Kind { HAS_PARTITION_BY, ARG_IS_EQUAL_TO, - NOT + NOT, + MAP_ARG_INCLUDES_KEY } private final Kind kind; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java index 93bdc5c8ce479..5519a3dc60979 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java @@ -20,7 +20,9 @@ import org.apache.flink.annotation.PublicEvolving; +import java.util.Arrays; import java.util.List; +import java.util.Map; /** * A condition that determines whether a conditional trait on a {@link StaticArgument} should be @@ -68,4 +70,30 @@ static TraitCondition not(final TraitCondition condition) { return new BuiltInCondition( BuiltInCondition.Kind.NOT, List.of(condition), ctx -> !condition.test(ctx)); } + + /** + * True when the named {@code MAP} scalar argument has a key that, after + * splitting on comma and trimming each part, equals {@code key}. Returns true when the argument + * is omitted, on the assumption that an absent argument means the function falls back to a + * default that includes all keys. + */ + @SuppressWarnings("rawtypes") + static TraitCondition mapArgIncludesKey(final String argName, final String key) { + return new BuiltInCondition( + BuiltInCondition.Kind.MAP_ARG_INCLUDES_KEY, + List.of(argName, key), + ctx -> + ctx.getScalarArgument(argName, Map.class) + .map(map -> mapKeysContain(map, key)) + .orElse(true)); + } + + /** True when any key in {@code map}, split on comma and trimmed, equals {@code expected}. */ + private static boolean mapKeysContain(final Map map, final String expected) { + return map.keySet().stream() + .map(String.class::cast) + .flatMap(k -> Arrays.stream(k.split(","))) + .map(String::trim) + .anyMatch(expected::equals); + } } From efd5e4a1c2acd838b31b6521a7a70e7c76349692 Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Tue, 19 May 2026 14:21:52 +0200 Subject: [PATCH 2/7] [FLINK-39708][table] Support TO_CHANGELOG retract/upsert stream -> upsert stream with set semantics --- .../functions/BuiltInFunctionDefinitions.java | 21 ++++++--- .../exec/stream/ToChangelogSemanticTests.java | 2 + .../exec/stream/ToChangelogTestPrograms.java | 44 +++++++++++++++++++ .../plan/stream/sql/ToChangelogTest.java | 40 +++++++++++++++++ .../plan/stream/sql/ToChangelogTest.xml | 40 +++++++++++++++++ .../functions/ptf/ToChangelogFunction.java | 39 ++++++++++++++++ 6 files changed, 179 insertions(+), 7 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 6d17bf880e841..bf614705e6741 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -806,9 +806,10 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) .name("TO_CHANGELOG") .kind(PROCESS_TABLE) .staticArguments( - // Row semantics (no PARTITION BY). - // With PARTITION BY, switches to set - // semantics for co-located parallel execution. + // Row semantics by default; PARTITION BY switches to set semantics. + // REQUIRE_UPDATE_BEFORE / REQUIRE_FULL_DELETE are conditional so the + // planner can skip ChangelogNormalize for upsert sources whose + // op_mapping does not emit UB or full deletes. StaticArgument.table( "input", Row.class, @@ -816,12 +817,18 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) EnumSet.of( StaticArgumentTrait.TABLE, StaticArgumentTrait.ROW_SEMANTIC_TABLE, - StaticArgumentTrait.SUPPORT_UPDATES, - StaticArgumentTrait.REQUIRE_UPDATE_BEFORE, - StaticArgumentTrait.REQUIRE_FULL_DELETE)) + StaticArgumentTrait.SUPPORT_UPDATES)) .withConditionalTrait( StaticArgumentTrait.SET_SEMANTIC_TABLE, - TraitCondition.hasPartitionBy()), + TraitCondition.hasPartitionBy()) + .withConditionalTrait( + StaticArgumentTrait.REQUIRE_UPDATE_BEFORE, + TraitCondition.mapArgIncludesKey( + "op_mapping", "UPDATE_BEFORE")) + .withConditionalTrait( + StaticArgumentTrait.REQUIRE_FULL_DELETE, + TraitCondition.mapArgIncludesKey( + "op_mapping", "DELETE")), StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true), StaticArgument.scalar( "op_mapping", diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java index e038850fb8d17..6fe7d9522dcd5 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java @@ -42,6 +42,7 @@ public List programs() { ToChangelogTestPrograms.INSERT, ToChangelogTestPrograms.RETRACT, ToChangelogTestPrograms.UPSERT, + ToChangelogTestPrograms.UPSERT_PARTITION_BY, ToChangelogTestPrograms.RETRACT_PARTITION_BY, ToChangelogTestPrograms.CUSTOM_OP_MAPPING, ToChangelogTestPrograms.CUSTOM_OP_NAME, @@ -52,6 +53,7 @@ public List programs() { ToChangelogTestPrograms.DELETION_FLAG, ToChangelogTestPrograms.INVALID_DESCRIPTOR, ToChangelogTestPrograms.INVALID_OP_MAPPING, + ToChangelogTestPrograms.OP_MAPPING_REFERENCES_UNSUPPORTED_KIND, ToChangelogTestPrograms.DUPLICATE_ROW_KIND); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java index 7f33cfc141952..cb91ec3a5aa47 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java @@ -184,6 +184,37 @@ public class ToChangelogTestPrograms { .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)") .build(); + public static final TableTestProgram UPSERT_PARTITION_BY = + TableTestProgram.of( + "to-changelog-upsert-partition-by", + "PARTITION BY upsert key + mapping without UB skips ChangelogNormalize") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.upsert()) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 10L), + Row.ofKind(RowKind.INSERT, "Bob", 20L), + Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L), + Row.ofKind(RowKind.DELETE, "Bob", 20L)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("name STRING", "op STRING", "score BIGINT") + .consumedValues( + "+I[Alice, C, 10]", + "+I[Bob, C, 20]", + "+I[Alice, C, 30]", + "+I[Bob, D, 20]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t PARTITION BY name, " + + "op => DESCRIPTOR(op), " + + "op_mapping => MAP['INSERT,UPDATE_AFTER', 'C', 'DELETE', 'D'])") + .build(); + public static final TableTestProgram CUSTOM_OP_MAPPING = TableTestProgram.of( "to-changelog-custom-op-mapping", @@ -511,6 +542,19 @@ public class ToChangelogTestPrograms { "Unknown change operation: 'INVALID_KIND'") .build(); + public static final TableTestProgram OP_MAPPING_REFERENCES_UNSUPPORTED_KIND = + TableTestProgram.of( + "to-changelog-op-mapping-references-unsupported-kind", + "fails when op_mapping references a change operation the input cannot produce") + .setupTableSource(SIMPLE_SOURCE) + .runFailingSql( + "SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t, " + + "op_mapping => MAP['INSERT', 'I', 'DELETE', 'D'])", + ValidationException.class, + "the input table only produces [INSERT] and does not produce [DELETE]") + .build(); + public static final TableTestProgram DUPLICATE_ROW_KIND = TableTestProgram.of( "to-changelog-duplicate-rowkind", diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java index 1279031868820..ec9ea8dcdbd12 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java @@ -104,4 +104,44 @@ void testRetractPartitionBy() { "SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)", CHANGELOG_MODE); } + + @Test + void testUpsertPartitionBy() { + util.tableEnv() + .executeSql( + "CREATE TABLE upsert_source (" + + " id INT," + + " name STRING," + + " PRIMARY KEY (id) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'changelog-mode' = 'I,UA,D'" + + ")"); + util.verifyRelPlan( + "SELECT * FROM TO_CHANGELOG(" + + "input => TABLE upsert_source PARTITION BY id, " + + "op => DESCRIPTOR(op), " + + "op_mapping => MAP['INSERT,UPDATE_AFTER', 'C', 'DELETE', 'D'])", + CHANGELOG_MODE); + } + + @Test + void testUpsertPartitionByNoUpdateBeforeOrDelete() { + util.tableEnv() + .executeSql( + "CREATE TABLE upsert_source (" + + " id INT," + + " name STRING," + + " PRIMARY KEY (id) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'changelog-mode' = 'I,UA,D'" + + ")"); + util.verifyRelPlan( + "SELECT * FROM TO_CHANGELOG(" + + "input => TABLE upsert_source PARTITION BY id, " + + "op => DESCRIPTOR(op), " + + "op_mapping => MAP['INSERT,UPDATE_AFTER', 'C'])", + CHANGELOG_MODE); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml index b477039f2da3c..6c31dfb6f1c23 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml @@ -71,6 +71,46 @@ LogicalProject(op=[$0], id=[$1], name=[$2]) + + + + + TABLE upsert_source PARTITION BY id, op => DESCRIPTOR(op), op_mapping => MAP['INSERT,UPDATE_AFTER', 'C', 'DELETE', 'D'])]]> + + + + + + + + + + + TABLE upsert_source PARTITION BY id, op => DESCRIPTOR(op), op_mapping => MAP['INSERT,UPDATE_AFTER', 'C'])]]> + + + + + + diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java index 36ed1a615bbc7..0ed96cfe41f71 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java @@ -19,6 +19,8 @@ package org.apache.flink.table.runtime.functions.ptf; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.MapData; import org.apache.flink.table.data.RowData; @@ -38,6 +40,9 @@ import java.util.EnumMap; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; /** * Runtime implementation of {@link BuiltInFunctionDefinitions#TO_CHANGELOG}. @@ -77,6 +82,11 @@ public ToChangelogFunction(final SpecializedContext context) { final Map opMapping = callContext.getArgumentValue(2, Map.class).orElse(null); this.rawOpMap = buildOpMap(opMapping); + if (opMapping != null) { + // Only user-supplied mappings are validated. The default mapping covers all kinds by + // design and is harmless for insert-only or upsert inputs. + validateAgainstInputChangelogMode(this.rawOpMap, tableSemantics); + } this.outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics); } @@ -108,6 +118,35 @@ private static Map buildOpMap(@Nullable final MapLives here rather than in the input type strategy because {@link + * TableSemantics#changelogMode()} returns empty during type inference and is only populated at + * specialization time, which is when this constructor runs. + */ + private static void validateAgainstInputChangelogMode( + final Map mapping, final TableSemantics tableSemantics) { + final ChangelogMode inputMode = tableSemantics.changelogMode().orElse(null); + if (inputMode == null) { + return; + } + final Set unsupported = + mapping.keySet().stream() + .filter(kind -> !inputMode.contains(kind)) + .collect(Collectors.toCollection(TreeSet::new)); + if (!unsupported.isEmpty()) { + throw new ValidationException( + String.format( + "Invalid 'op_mapping' for TO_CHANGELOG: the input table only produces " + + "%s and does not produce %s. Remove those entries from the " + + "mapping.", + inputMode.getContainedKinds(), unsupported)); + } + } + public void eval( final Context ctx, final RowData input, From 3d05a3e0b0b836de92a37820254a535e25bcb9d4 Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Tue, 19 May 2026 14:21:56 +0200 Subject: [PATCH 3/7] [FLINK-39708][docs] Document avoiding ChangelogNormalize for upsert sources in TO_CHANGELOG --- .../docs/sql/reference/queries/changelog.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index b02215129e5da..07a34903808d1 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -337,6 +337,18 @@ SELECT * FROM TO_CHANGELOG( -- op_code values are 'I' and 'U' instead of full names ``` +#### Upsert stream + +```sql +SELECT * FROM TO_CHANGELOG( + input => TABLE upsert_source PARTITION BY id, + op_mapping => MAP['INSERT, UPDATE_AFTER', 'u', 'DELETE', 'd'] +) +-- INSERT and UPDATE_AFTER produce op='u' (upsert) +-- DELETE produces op='d' +-- UPDATE_BEFORE is omitted +``` + #### Deletion flag pattern ```sql @@ -369,6 +381,13 @@ When `PARTITION BY` is provided, **the output schema changes**. The partition ke Prefer row semantics, when possible. `PARTITION BY` is only necessary when downstream operators are keyed on that column and you want to co-locate rows for the same key in the same parallel operator instance. +#### Avoiding ChangelogNormalize for upsert sources + +When the input is an upsert source (emits `UPDATE_AFTER` but no `UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This operator is stateful and can be expensive. When `PARTITION BY` is provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit the corresponding kinds: + +* Omit `UPDATE_BEFORE` from `op_mapping` to skip `UPDATE_BEFORE` materialization. +* If the source emits partial `DELETE` events (only the keys flow through, common with Flink's `upsert-kafka` connector or other key-compacted topics), it's necessary to omit `DELETE` from `op_mapping` to skip the full-`DELETE` materialization step that also happens in `ChangelogNormalize`. + #### Table API ```java From 52f9d2440cbe3146652edca8814ea6584f87b5b5 Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Wed, 20 May 2026 15:12:11 +0200 Subject: [PATCH 4/7] [FLINK-39708][table] Address review on TO_CHANGELOG validation, test, and doc --- .../docs/sql/reference/queries/changelog.md | 7 ----- .../exec/stream/ToChangelogSemanticTests.java | 1 + .../exec/stream/ToChangelogTestPrograms.java | 31 +++++++++++++++++++ .../functions/ptf/ToChangelogFunction.java | 10 ++---- 4 files changed, 35 insertions(+), 14 deletions(-) diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index 07a34903808d1..5cccc4634e1ce 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -381,13 +381,6 @@ When `PARTITION BY` is provided, **the output schema changes**. The partition ke Prefer row semantics, when possible. `PARTITION BY` is only necessary when downstream operators are keyed on that column and you want to co-locate rows for the same key in the same parallel operator instance. -#### Avoiding ChangelogNormalize for upsert sources - -When the input is an upsert source (emits `UPDATE_AFTER` but no `UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This operator is stateful and can be expensive. When `PARTITION BY` is provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit the corresponding kinds: - -* Omit `UPDATE_BEFORE` from `op_mapping` to skip `UPDATE_BEFORE` materialization. -* If the source emits partial `DELETE` events (only the keys flow through, common with Flink's `upsert-kafka` connector or other key-compacted topics), it's necessary to omit `DELETE` from `op_mapping` to skip the full-`DELETE` materialization step that also happens in `ChangelogNormalize`. - #### Table API ```java diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java index 6fe7d9522dcd5..bfffa8bd67d04 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java @@ -43,6 +43,7 @@ public List programs() { ToChangelogTestPrograms.RETRACT, ToChangelogTestPrograms.UPSERT, ToChangelogTestPrograms.UPSERT_PARTITION_BY, + ToChangelogTestPrograms.UPSERT_PARTITION_BY_KEY_ONLY_DELETES, ToChangelogTestPrograms.RETRACT_PARTITION_BY, ToChangelogTestPrograms.CUSTOM_OP_MAPPING, ToChangelogTestPrograms.CUSTOM_OP_NAME, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java index cb91ec3a5aa47..250ad1f1d8fbe 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java @@ -215,6 +215,37 @@ public class ToChangelogTestPrograms { + "op_mapping => MAP['INSERT,UPDATE_AFTER', 'C', 'DELETE', 'D'])") .build(); + public static final TableTestProgram UPSERT_PARTITION_BY_KEY_ONLY_DELETES = + TableTestProgram.of( + "to-changelog-upsert-partition-by-key-only-deletes", + "PARTITION BY upsert key + mapping without UB/DELETE handles key-only deletes") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.upsert(true)) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 10L), + Row.ofKind(RowKind.INSERT, "Bob", 20L), + Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L), + // Key-only delete: source only knows the key. + Row.ofKind(RowKind.DELETE, "Bob", null)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("name STRING", "op STRING", "score BIGINT") + .consumedValues( + "+I[Alice, U, 10]", + "+I[Bob, U, 20]", + "+I[Alice, U, 30]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t PARTITION BY name, " + + "op => DESCRIPTOR(op), " + + "op_mapping => MAP['INSERT,UPDATE_AFTER', 'U'])") + .build(); + public static final TableTestProgram CUSTOM_OP_MAPPING = TableTestProgram.of( "to-changelog-custom-op-mapping", diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java index 0ed96cfe41f71..ca1476e77451b 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java @@ -39,10 +39,8 @@ import javax.annotation.Nullable; import java.util.EnumMap; +import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.TreeSet; -import java.util.stream.Collectors; /** * Runtime implementation of {@link BuiltInFunctionDefinitions#TO_CHANGELOG}. @@ -133,10 +131,8 @@ private static void validateAgainstInputChangelogMode( if (inputMode == null) { return; } - final Set unsupported = - mapping.keySet().stream() - .filter(kind -> !inputMode.contains(kind)) - .collect(Collectors.toCollection(TreeSet::new)); + final List unsupported = + mapping.keySet().stream().filter(kind -> !inputMode.contains(kind)).toList(); if (!unsupported.isEmpty()) { throw new ValidationException( String.format( From cb8dbe8e005a8b2eeb08dc904b292b0c3b30f81a Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Wed, 20 May 2026 15:12:16 +0200 Subject: [PATCH 5/7] [FLINK-39708][table] Add argMatches, argIsPresent, or, and hasScalarArgument --- .../functions/BuiltInFunctionDefinitions.java | 16 +++-- .../types/inference/BuiltInCondition.java | 4 +- .../table/types/inference/TraitCondition.java | 64 ++++++++++++++----- .../table/types/inference/TraitContext.java | 17 +++++ .../bridging/BridgingSqlFunction.java | 11 ++++ 5 files changed, 91 insertions(+), 21 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index bf614705e6741..dd843b22eb1b5 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -823,12 +823,20 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) TraitCondition.hasPartitionBy()) .withConditionalTrait( StaticArgumentTrait.REQUIRE_UPDATE_BEFORE, - TraitCondition.mapArgIncludesKey( - "op_mapping", "UPDATE_BEFORE")) + TraitCondition.or( + TraitCondition.not( + TraitCondition.argIsPresent( + "op_mapping")), + TraitCondition.mapArgIncludesKey( + "op_mapping", "UPDATE_BEFORE"))) .withConditionalTrait( StaticArgumentTrait.REQUIRE_FULL_DELETE, - TraitCondition.mapArgIncludesKey( - "op_mapping", "DELETE")), + TraitCondition.or( + TraitCondition.not( + TraitCondition.argIsPresent( + "op_mapping")), + TraitCondition.mapArgIncludesKey( + "op_mapping", "DELETE"))), StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true), StaticArgument.scalar( "op_mapping", diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java index bc7383a1cfcd1..ba87cc0559dd6 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java @@ -37,8 +37,10 @@ final class BuiltInCondition implements TraitCondition { enum Kind { HAS_PARTITION_BY, ARG_IS_EQUAL_TO, + ARG_IS_PRESENT, NOT, - MAP_ARG_INCLUDES_KEY + OR, + ARG_MATCHES } private final Kind kind; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java index 5519a3dc60979..4504a6abd4be8 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.function.Predicate; /** * A condition that determines whether a conditional trait on a {@link StaticArgument} should be @@ -34,6 +35,9 @@ *

Implementations must implement {@code hashCode} and {@code equals} for {@link * StaticArgument#equals}/{@link StaticArgument#hashCode} to work correctly. The built-in factories * below return value-comparable instances; user-supplied lambdas do not - prefer the factories. + * {@link #argMatches} accepts a caller-supplied {@code Predicate} and is therefore only value-equal + * when the same {@code Predicate} reference is reused; build the predicate once and cache it if + * equality matters. * *

{@code
  * import static org.apache.flink.table.types.inference.TraitCondition.*;
@@ -71,29 +75,57 @@ static TraitCondition not(final TraitCondition condition) {
                 BuiltInCondition.Kind.NOT, List.of(condition), ctx -> !condition.test(ctx));
     }
 
+    /** True when either {@code left} or {@code right} evaluates to true. */
+    static TraitCondition or(final TraitCondition left, final TraitCondition right) {
+        return new BuiltInCondition(
+                BuiltInCondition.Kind.OR,
+                List.of(left, right),
+                ctx -> left.test(ctx) || right.test(ctx));
+    }
+
+    /** True when the named scalar argument was provided by the caller. */
+    static TraitCondition argIsPresent(final String argName) {
+        return new BuiltInCondition(
+                BuiltInCondition.Kind.ARG_IS_PRESENT,
+                List.of(argName),
+                ctx -> ctx.hasScalarArgument(argName));
+    }
+
     /**
-     * True when the named {@code MAP} scalar argument has a key that, after
-     * splitting on comma and trimming each part, equals {@code key}. Returns true when the argument
-     * is omitted, on the assumption that an absent argument means the function falls back to a
-     * default that includes all keys.
+     * True when the named scalar argument is present and its value matches {@code predicate}. False
+     * when the argument is absent or cannot be resolved as a literal of {@code argClass}.
+     *
+     * 

Use this for ad-hoc conditions on scalar literals. Prefer the named factories above when + * one fits. */ - @SuppressWarnings("rawtypes") - static TraitCondition mapArgIncludesKey(final String argName, final String key) { + static TraitCondition argMatches( + final String argName, final Class argClass, final Predicate predicate) { return new BuiltInCondition( - BuiltInCondition.Kind.MAP_ARG_INCLUDES_KEY, - List.of(argName, key), - ctx -> - ctx.getScalarArgument(argName, Map.class) - .map(map -> mapKeysContain(map, key)) - .orElse(true)); + BuiltInCondition.Kind.ARG_MATCHES, + List.of(argName, argClass, predicate), + ctx -> ctx.getScalarArgument(argName, argClass).stream().anyMatch(predicate)); + } + + /** + * True when the named {@code MAP} scalar argument is present and contains + * {@code key} among its keys. False when the argument is absent or cannot be resolved as a + * literal {@link Map}. + * + *

Also matches compound keys: if a key contains commas (e.g. {@code "INSERT,UPDATE_AFTER"}), + * each comma-separated part is trimmed and compared against {@code key} - useful for mappings + * where one entry covers multiple kinds. + */ + @SuppressWarnings({"rawtypes", "unchecked"}) + static TraitCondition mapArgIncludesKey(final String argName, final String key) { + return argMatches( + argName, Map.class, map -> mapKeysContain((Map) map, key)); } - /** True when any key in {@code map}, split on comma and trimmed, equals {@code expected}. */ - private static boolean mapKeysContain(final Map map, final String expected) { + /** True when any key in {@code map}, split on comma and trimmed, equals {@code key}. */ + private static boolean mapKeysContain(final Map map, final String key) { return map.keySet().stream() - .map(String.class::cast) .flatMap(k -> Arrays.stream(k.split(","))) .map(String::trim) - .anyMatch(expected::equals); + .anyMatch(key::equals); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitContext.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitContext.java index e58a4b36dceea..1c8d1a9b8bf5d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitContext.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitContext.java @@ -46,6 +46,12 @@ public interface TraitContext { */ Optional getScalarArgument(String name, Class clazz); + /** + * Whether the named scalar argument was provided by the caller. Returns false when the argument + * was omitted, or when an argument with the given name is not declared by the function. + */ + boolean hasScalarArgument(String name); + /** * Builds a {@link TraitContext} from validation-time inputs. * @@ -76,6 +82,17 @@ public Optional getScalarArgument(final String name, final Class clazz } return Optional.empty(); } + + @Override + public boolean hasScalarArgument(final String name) { + for (int i = 0; i < staticArgs.size(); i++) { + final StaticArgument arg = staticArgs.get(i); + if (arg.is(StaticArgumentTrait.SCALAR) && arg.getName().equals(name)) { + return !callContext.isArgumentNull(i); + } + } + return false; + } }; } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java index 059fcd6184d69..2c393adf875ec 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java @@ -315,6 +315,17 @@ public Optional getScalarArgument(String name, Class clazz) { } return Optional.empty(); } + + @Override + public boolean hasScalarArgument(String name) { + for (int i = 0; i < declared.size(); i++) { + final StaticArgument arg = declared.get(i); + if (arg.is(StaticArgumentTrait.SCALAR) && arg.getName().equals(name)) { + return !callContext.isArgumentNull(i); + } + } + return false; + } }; } From 2e87ebe4c0c144712065725e222c94ca6b3c149c Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Thu, 21 May 2026 12:20:23 +0200 Subject: [PATCH 6/7] [FLINK-39708][table] Address review: drop mapArgIncludesKey, inline TO_CHANGELOG conditional traits --- .../functions/BuiltInFunctionDefinitions.java | 41 +++++++++++++++++-- .../table/types/inference/TraitCondition.java | 28 +------------ 2 files changed, 39 insertions(+), 30 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index dd843b22eb1b5..2268aa9e3d8c5 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -60,6 +60,7 @@ import java.util.EnumSet; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -824,19 +825,35 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) .withConditionalTrait( StaticArgumentTrait.REQUIRE_UPDATE_BEFORE, TraitCondition.or( + // op_mapping omitted: default mapping includes + // UPDATE_BEFORE. TraitCondition.not( TraitCondition.argIsPresent( "op_mapping")), - TraitCondition.mapArgIncludesKey( - "op_mapping", "UPDATE_BEFORE"))) + TraitCondition.argMatches( + "op_mapping", + Map.class, + mapping -> + opMappingContainsKey( + (Map) + mapping, + "UPDATE_BEFORE")))) .withConditionalTrait( StaticArgumentTrait.REQUIRE_FULL_DELETE, TraitCondition.or( + // op_mapping omitted: default mapping includes + // DELETE. TraitCondition.not( TraitCondition.argIsPresent( "op_mapping")), - TraitCondition.mapArgIncludesKey( - "op_mapping", "DELETE"))), + TraitCondition.argMatches( + "op_mapping", + Map.class, + mapping -> + opMappingContainsKey( + (Map) + mapping, + "DELETE")))), StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true), StaticArgument.scalar( "op_mapping", @@ -3392,6 +3409,22 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) public static final List ORDERING = Arrays.asList(ORDER_ASC, ORDER_DESC); + /** + * True when {@code key} appears among the {@code op_mapping} keys. Each map key may itself be a + * comma-separated list (e.g. {@code "INSERT,UPDATE_AFTER"}) - each part is trimmed and compared + * against {@code key}, so one entry can cover multiple change kinds. + * + *

Used by {@code TO_CHANGELOG} conditional traits to inspect the + * user-provided {@code op_mapping} argument. + */ + private static boolean opMappingContainsKey( + final Map opMapping, final String key) { + return opMapping.keySet().stream() + .flatMap(k -> Arrays.stream(k.split(","))) + .map(String::trim) + .anyMatch(key::equals); + } + @Internal public static List getDefinitions() { final Field[] fields = BuiltInFunctionDefinitions.class.getFields(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java index 4504a6abd4be8..2a5a17b436b05 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java @@ -20,9 +20,7 @@ import org.apache.flink.annotation.PublicEvolving; -import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.function.Predicate; /** @@ -93,7 +91,8 @@ static TraitCondition argIsPresent(final String argName) { /** * True when the named scalar argument is present and its value matches {@code predicate}. False - * when the argument is absent or cannot be resolved as a literal of {@code argClass}. + * when the argument is absent, cannot be resolved as a literal of {@code argClass}, + * or {@code predicate} evaluates to `false`. * *

Use this for ad-hoc conditions on scalar literals. Prefer the named factories above when * one fits. @@ -105,27 +104,4 @@ static TraitCondition argMatches( List.of(argName, argClass, predicate), ctx -> ctx.getScalarArgument(argName, argClass).stream().anyMatch(predicate)); } - - /** - * True when the named {@code MAP} scalar argument is present and contains - * {@code key} among its keys. False when the argument is absent or cannot be resolved as a - * literal {@link Map}. - * - *

Also matches compound keys: if a key contains commas (e.g. {@code "INSERT,UPDATE_AFTER"}), - * each comma-separated part is trimmed and compared against {@code key} - useful for mappings - * where one entry covers multiple kinds. - */ - @SuppressWarnings({"rawtypes", "unchecked"}) - static TraitCondition mapArgIncludesKey(final String argName, final String key) { - return argMatches( - argName, Map.class, map -> mapKeysContain((Map) map, key)); - } - - /** True when any key in {@code map}, split on comma and trimmed, equals {@code key}. */ - private static boolean mapKeysContain(final Map map, final String key) { - return map.keySet().stream() - .flatMap(k -> Arrays.stream(k.split(","))) - .map(String::trim) - .anyMatch(key::equals); - } } From f768f895a8b62c407b247723ad0b46949a4fcc5e Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Thu, 21 May 2026 13:54:50 +0200 Subject: [PATCH 7/7] [FLINK-39708][table] Address review nits: rename test, javadoc tweaks, generic type T --- .../table/functions/BuiltInFunctionDefinitions.java | 4 ++-- .../flink/table/types/inference/TraitCondition.java | 13 ++++++++----- .../planner/plan/stream/sql/ToChangelogTest.java | 2 +- .../planner/plan/stream/sql/ToChangelogTest.xml | 2 +- .../runtime/functions/ptf/ToChangelogFunction.java | 13 ++++++------- 5 files changed, 18 insertions(+), 16 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 2268aa9e3d8c5..f207b5067292b 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -3414,8 +3414,8 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) * comma-separated list (e.g. {@code "INSERT,UPDATE_AFTER"}) - each part is trimmed and compared * against {@code key}, so one entry can cover multiple change kinds. * - *

Used by {@code TO_CHANGELOG} conditional traits to inspect the - * user-provided {@code op_mapping} argument. + *

Used by {@code TO_CHANGELOG} conditional traits to inspect the user-provided {@code + * op_mapping} argument. */ private static boolean opMappingContainsKey( final Map opMapping, final String key) { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java index 2a5a17b436b05..14734db11f4b6 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java @@ -73,7 +73,10 @@ static TraitCondition not(final TraitCondition condition) { BuiltInCondition.Kind.NOT, List.of(condition), ctx -> !condition.test(ctx)); } - /** True when either {@code left} or {@code right} evaluates to true. */ + /** + * True when either the {@code left} or the {@code right} {@link TraitCondition} evaluates to + * true. + */ static TraitCondition or(final TraitCondition left, final TraitCondition right) { return new BuiltInCondition( BuiltInCondition.Kind.OR, @@ -91,14 +94,14 @@ static TraitCondition argIsPresent(final String argName) { /** * True when the named scalar argument is present and its value matches {@code predicate}. False - * when the argument is absent, cannot be resolved as a literal of {@code argClass}, - * or {@code predicate} evaluates to `false`. + * when the argument is absent, cannot be resolved as a literal of {@code argClass}, or {@code + * predicate} evaluates to {@code false}. * *

Use this for ad-hoc conditions on scalar literals. Prefer the named factories above when * one fits. */ - static TraitCondition argMatches( - final String argName, final Class argClass, final Predicate predicate) { + static TraitCondition argMatches( + final String argName, final Class argClass, final Predicate predicate) { return new BuiltInCondition( BuiltInCondition.Kind.ARG_MATCHES, List.of(argName, argClass, predicate), diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java index ec9ea8dcdbd12..7f6f93849c282 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java @@ -126,7 +126,7 @@ void testUpsertPartitionBy() { } @Test - void testUpsertPartitionByNoUpdateBeforeOrDelete() { + void testUpsertPartitionByNoUpdateBeforeAndDelete() { util.tableEnv() .executeSql( "CREATE TABLE upsert_source (" diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml index 6c31dfb6f1c23..7cea1058e5c82 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml @@ -94,7 +94,7 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DESCRI ]]> - + TABLE upsert_source PARTITION BY id, op => DESCRIPTOR(op), op_mapping => MAP['INSERT,UPDATE_AFTER', 'C'])]]> diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java index ca1476e77451b..cf0547dbab0c8 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java @@ -81,9 +81,7 @@ public ToChangelogFunction(final SpecializedContext context) { callContext.getArgumentValue(2, Map.class).orElse(null); this.rawOpMap = buildOpMap(opMapping); if (opMapping != null) { - // Only user-supplied mappings are validated. The default mapping covers all kinds by - // design and is harmless for insert-only or upsert inputs. - validateAgainstInputChangelogMode(this.rawOpMap, tableSemantics); + validateOpMap(this.rawOpMap, tableSemantics); } this.outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics); } @@ -117,15 +115,16 @@ private static Map buildOpMap(@Nullable final MapLives here rather than in the input type strategy because {@link * TableSemantics#changelogMode()} returns empty during type inference and is only populated at * specialization time, which is when this constructor runs. */ - private static void validateAgainstInputChangelogMode( + private static void validateOpMap( final Map mapping, final TableSemantics tableSemantics) { final ChangelogMode inputMode = tableSemantics.changelogMode().orElse(null); if (inputMode == null) {