diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index b02215129e5da..5cccc4634e1ce 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -337,6 +337,18 @@ SELECT * FROM TO_CHANGELOG( -- op_code values are 'I' and 'U' instead of full names ``` +#### Upsert stream + +```sql +SELECT * FROM TO_CHANGELOG( + input => TABLE upsert_source PARTITION BY id, + op_mapping => MAP['INSERT, UPDATE_AFTER', 'u', 'DELETE', 'd'] +) +-- INSERT and UPDATE_AFTER produce op='u' (upsert) +-- DELETE produces op='d' +-- UPDATE_BEFORE is omitted +``` + #### Deletion flag pattern ```sql diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 6d17bf880e841..f207b5067292b 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -60,6 +60,7 @@ import java.util.EnumSet; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -806,9 +807,10 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) .name("TO_CHANGELOG") .kind(PROCESS_TABLE) .staticArguments( - // Row semantics (no PARTITION BY). - // With PARTITION BY, switches to set - // semantics for co-located parallel execution. + // Row semantics by default; PARTITION BY switches to set semantics. + // REQUIRE_UPDATE_BEFORE / REQUIRE_FULL_DELETE are conditional so the + // planner can skip ChangelogNormalize for upsert sources whose + // op_mapping does not emit UB or full deletes. StaticArgument.table( "input", Row.class, @@ -816,12 +818,42 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) EnumSet.of( StaticArgumentTrait.TABLE, StaticArgumentTrait.ROW_SEMANTIC_TABLE, - StaticArgumentTrait.SUPPORT_UPDATES, - StaticArgumentTrait.REQUIRE_UPDATE_BEFORE, - StaticArgumentTrait.REQUIRE_FULL_DELETE)) + StaticArgumentTrait.SUPPORT_UPDATES)) .withConditionalTrait( StaticArgumentTrait.SET_SEMANTIC_TABLE, - TraitCondition.hasPartitionBy()), + TraitCondition.hasPartitionBy()) + .withConditionalTrait( + StaticArgumentTrait.REQUIRE_UPDATE_BEFORE, + TraitCondition.or( + // op_mapping omitted: default mapping includes + // UPDATE_BEFORE. + TraitCondition.not( + TraitCondition.argIsPresent( + "op_mapping")), + TraitCondition.argMatches( + "op_mapping", + Map.class, + mapping -> + opMappingContainsKey( + (Map) + mapping, + "UPDATE_BEFORE")))) + .withConditionalTrait( + StaticArgumentTrait.REQUIRE_FULL_DELETE, + TraitCondition.or( + // op_mapping omitted: default mapping includes + // DELETE. + TraitCondition.not( + TraitCondition.argIsPresent( + "op_mapping")), + TraitCondition.argMatches( + "op_mapping", + Map.class, + mapping -> + opMappingContainsKey( + (Map) + mapping, + "DELETE")))), StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true), StaticArgument.scalar( "op_mapping", @@ -3377,6 +3409,22 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) public static final List ORDERING = Arrays.asList(ORDER_ASC, ORDER_DESC); + /** + * True when {@code key} appears among the {@code op_mapping} keys. Each map key may itself be a + * comma-separated list (e.g. {@code "INSERT,UPDATE_AFTER"}) - each part is trimmed and compared + * against {@code key}, so one entry can cover multiple change kinds. + * + *

Used by {@code TO_CHANGELOG} conditional traits to inspect the user-provided {@code + * op_mapping} argument. + */ + private static boolean opMappingContainsKey( + final Map opMapping, final String key) { + return opMapping.keySet().stream() + .flatMap(k -> Arrays.stream(k.split(","))) + .map(String::trim) + .anyMatch(key::equals); + } + @Internal public static List getDefinitions() { final Field[] fields = BuiltInFunctionDefinitions.class.getFields(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java index 2a06191ad13ab..ba87cc0559dd6 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java @@ -37,7 +37,10 @@ final class BuiltInCondition implements TraitCondition { enum Kind { HAS_PARTITION_BY, ARG_IS_EQUAL_TO, - NOT + ARG_IS_PRESENT, + NOT, + OR, + ARG_MATCHES } private final Kind kind; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java index 93bdc5c8ce479..14734db11f4b6 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving; import java.util.List; +import java.util.function.Predicate; /** * A condition that determines whether a conditional trait on a {@link StaticArgument} should be @@ -32,6 +33,9 @@ *

Implementations must implement {@code hashCode} and {@code equals} for {@link * StaticArgument#equals}/{@link StaticArgument#hashCode} to work correctly. The built-in factories * below return value-comparable instances; user-supplied lambdas do not - prefer the factories. + * {@link #argMatches} accepts a caller-supplied {@code Predicate} and is therefore only value-equal + * when the same {@code Predicate} reference is reused; build the predicate once and cache it if + * equality matters. * *

{@code
  * import static org.apache.flink.table.types.inference.TraitCondition.*;
@@ -68,4 +72,39 @@ static TraitCondition not(final TraitCondition condition) {
         return new BuiltInCondition(
                 BuiltInCondition.Kind.NOT, List.of(condition), ctx -> !condition.test(ctx));
     }
+
+    /**
+     * True when either the {@code left} or the {@code right} {@link TraitCondition} evaluates to
+     * true.
+     */
+    static TraitCondition or(final TraitCondition left, final TraitCondition right) {
+        return new BuiltInCondition(
+                BuiltInCondition.Kind.OR,
+                List.of(left, right),
+                ctx -> left.test(ctx) || right.test(ctx));
+    }
+
+    /** True when the named scalar argument was provided by the caller. */
+    static TraitCondition argIsPresent(final String argName) {
+        return new BuiltInCondition(
+                BuiltInCondition.Kind.ARG_IS_PRESENT,
+                List.of(argName),
+                ctx -> ctx.hasScalarArgument(argName));
+    }
+
+    /**
+     * True when the named scalar argument is present and its value matches {@code predicate}. False
+     * when the argument is absent, cannot be resolved as a literal of {@code argClass}, or {@code
+     * predicate} evaluates to {@code false}.
+     *
+     * 

Use this for ad-hoc conditions on scalar literals. Prefer the named factories above when + * one fits. + */ + static TraitCondition argMatches( + final String argName, final Class argClass, final Predicate predicate) { + return new BuiltInCondition( + BuiltInCondition.Kind.ARG_MATCHES, + List.of(argName, argClass, predicate), + ctx -> ctx.getScalarArgument(argName, argClass).stream().anyMatch(predicate)); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitContext.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitContext.java index e58a4b36dceea..1c8d1a9b8bf5d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitContext.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitContext.java @@ -46,6 +46,12 @@ public interface TraitContext { */ Optional getScalarArgument(String name, Class clazz); + /** + * Whether the named scalar argument was provided by the caller. Returns false when the argument + * was omitted, or when an argument with the given name is not declared by the function. + */ + boolean hasScalarArgument(String name); + /** * Builds a {@link TraitContext} from validation-time inputs. * @@ -76,6 +82,17 @@ public Optional getScalarArgument(final String name, final Class clazz } return Optional.empty(); } + + @Override + public boolean hasScalarArgument(final String name) { + for (int i = 0; i < staticArgs.size(); i++) { + final StaticArgument arg = staticArgs.get(i); + if (arg.is(StaticArgumentTrait.SCALAR) && arg.getName().equals(name)) { + return !callContext.isArgumentNull(i); + } + } + return false; + } }; } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java index 059fcd6184d69..2c393adf875ec 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java @@ -315,6 +315,17 @@ public Optional getScalarArgument(String name, Class clazz) { } return Optional.empty(); } + + @Override + public boolean hasScalarArgument(String name) { + for (int i = 0; i < declared.size(); i++) { + final StaticArgument arg = declared.get(i); + if (arg.is(StaticArgumentTrait.SCALAR) && arg.getName().equals(name)) { + return !callContext.isArgumentNull(i); + } + } + return false; + } }; } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java index e038850fb8d17..bfffa8bd67d04 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java @@ -42,6 +42,8 @@ public List programs() { ToChangelogTestPrograms.INSERT, ToChangelogTestPrograms.RETRACT, ToChangelogTestPrograms.UPSERT, + ToChangelogTestPrograms.UPSERT_PARTITION_BY, + ToChangelogTestPrograms.UPSERT_PARTITION_BY_KEY_ONLY_DELETES, ToChangelogTestPrograms.RETRACT_PARTITION_BY, ToChangelogTestPrograms.CUSTOM_OP_MAPPING, ToChangelogTestPrograms.CUSTOM_OP_NAME, @@ -52,6 +54,7 @@ public List programs() { ToChangelogTestPrograms.DELETION_FLAG, ToChangelogTestPrograms.INVALID_DESCRIPTOR, ToChangelogTestPrograms.INVALID_OP_MAPPING, + ToChangelogTestPrograms.OP_MAPPING_REFERENCES_UNSUPPORTED_KIND, ToChangelogTestPrograms.DUPLICATE_ROW_KIND); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java index 7f33cfc141952..250ad1f1d8fbe 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java @@ -184,6 +184,68 @@ public class ToChangelogTestPrograms { .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)") .build(); + public static final TableTestProgram UPSERT_PARTITION_BY = + TableTestProgram.of( + "to-changelog-upsert-partition-by", + "PARTITION BY upsert key + mapping without UB skips ChangelogNormalize") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.upsert()) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 10L), + Row.ofKind(RowKind.INSERT, "Bob", 20L), + Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L), + Row.ofKind(RowKind.DELETE, "Bob", 20L)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("name STRING", "op STRING", "score BIGINT") + .consumedValues( + "+I[Alice, C, 10]", + "+I[Bob, C, 20]", + "+I[Alice, C, 30]", + "+I[Bob, D, 20]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t PARTITION BY name, " + + "op => DESCRIPTOR(op), " + + "op_mapping => MAP['INSERT,UPDATE_AFTER', 'C', 'DELETE', 'D'])") + .build(); + + public static final TableTestProgram UPSERT_PARTITION_BY_KEY_ONLY_DELETES = + TableTestProgram.of( + "to-changelog-upsert-partition-by-key-only-deletes", + "PARTITION BY upsert key + mapping without UB/DELETE handles key-only deletes") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.upsert(true)) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 10L), + Row.ofKind(RowKind.INSERT, "Bob", 20L), + Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L), + // Key-only delete: source only knows the key. + Row.ofKind(RowKind.DELETE, "Bob", null)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("name STRING", "op STRING", "score BIGINT") + .consumedValues( + "+I[Alice, U, 10]", + "+I[Bob, U, 20]", + "+I[Alice, U, 30]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t PARTITION BY name, " + + "op => DESCRIPTOR(op), " + + "op_mapping => MAP['INSERT,UPDATE_AFTER', 'U'])") + .build(); + public static final TableTestProgram CUSTOM_OP_MAPPING = TableTestProgram.of( "to-changelog-custom-op-mapping", @@ -511,6 +573,19 @@ public class ToChangelogTestPrograms { "Unknown change operation: 'INVALID_KIND'") .build(); + public static final TableTestProgram OP_MAPPING_REFERENCES_UNSUPPORTED_KIND = + TableTestProgram.of( + "to-changelog-op-mapping-references-unsupported-kind", + "fails when op_mapping references a change operation the input cannot produce") + .setupTableSource(SIMPLE_SOURCE) + .runFailingSql( + "SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t, " + + "op_mapping => MAP['INSERT', 'I', 'DELETE', 'D'])", + ValidationException.class, + "the input table only produces [INSERT] and does not produce [DELETE]") + .build(); + public static final TableTestProgram DUPLICATE_ROW_KIND = TableTestProgram.of( "to-changelog-duplicate-rowkind", diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java index 1279031868820..7f6f93849c282 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java @@ -104,4 +104,44 @@ void testRetractPartitionBy() { "SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)", CHANGELOG_MODE); } + + @Test + void testUpsertPartitionBy() { + util.tableEnv() + .executeSql( + "CREATE TABLE upsert_source (" + + " id INT," + + " name STRING," + + " PRIMARY KEY (id) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'changelog-mode' = 'I,UA,D'" + + ")"); + util.verifyRelPlan( + "SELECT * FROM TO_CHANGELOG(" + + "input => TABLE upsert_source PARTITION BY id, " + + "op => DESCRIPTOR(op), " + + "op_mapping => MAP['INSERT,UPDATE_AFTER', 'C', 'DELETE', 'D'])", + CHANGELOG_MODE); + } + + @Test + void testUpsertPartitionByNoUpdateBeforeAndDelete() { + util.tableEnv() + .executeSql( + "CREATE TABLE upsert_source (" + + " id INT," + + " name STRING," + + " PRIMARY KEY (id) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'changelog-mode' = 'I,UA,D'" + + ")"); + util.verifyRelPlan( + "SELECT * FROM TO_CHANGELOG(" + + "input => TABLE upsert_source PARTITION BY id, " + + "op => DESCRIPTOR(op), " + + "op_mapping => MAP['INSERT,UPDATE_AFTER', 'C'])", + CHANGELOG_MODE); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml index b477039f2da3c..7cea1058e5c82 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml @@ -71,6 +71,46 @@ LogicalProject(op=[$0], id=[$1], name=[$2]) + + + + + TABLE upsert_source PARTITION BY id, op => DESCRIPTOR(op), op_mapping => MAP['INSERT,UPDATE_AFTER', 'C', 'DELETE', 'D'])]]> + + + + + + + + + + + TABLE upsert_source PARTITION BY id, op => DESCRIPTOR(op), op_mapping => MAP['INSERT,UPDATE_AFTER', 'C'])]]> + + + + + + diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java index 36ed1a615bbc7..cf0547dbab0c8 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java @@ -19,6 +19,8 @@ package org.apache.flink.table.runtime.functions.ptf; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.MapData; import org.apache.flink.table.data.RowData; @@ -37,6 +39,7 @@ import javax.annotation.Nullable; import java.util.EnumMap; +import java.util.List; import java.util.Map; /** @@ -77,6 +80,9 @@ public ToChangelogFunction(final SpecializedContext context) { final Map opMapping = callContext.getArgumentValue(2, Map.class).orElse(null); this.rawOpMap = buildOpMap(opMapping); + if (opMapping != null) { + validateOpMap(this.rawOpMap, tableSemantics); + } this.outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics); } @@ -108,6 +114,34 @@ private static Map buildOpMap(@Nullable final MapLives here rather than in the input type strategy because {@link + * TableSemantics#changelogMode()} returns empty during type inference and is only populated at + * specialization time, which is when this constructor runs. + */ + private static void validateOpMap( + final Map mapping, final TableSemantics tableSemantics) { + final ChangelogMode inputMode = tableSemantics.changelogMode().orElse(null); + if (inputMode == null) { + return; + } + final List unsupported = + mapping.keySet().stream().filter(kind -> !inputMode.contains(kind)).toList(); + if (!unsupported.isEmpty()) { + throw new ValidationException( + String.format( + "Invalid 'op_mapping' for TO_CHANGELOG: the input table only produces " + + "%s and does not produce %s. Remove those entries from the " + + "mapping.", + inputMode.getContainedKinds(), unsupported)); + } + } + public void eval( final Context ctx, final RowData input,