diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java index 219b995864fcd..fbc777db46e8f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java @@ -106,6 +106,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -522,6 +523,7 @@ private static RelNode convertUpdate( tableModify, contextResolvedTable, updateInfo, + updatedColumns, tableDebugName, dataTypeFactory, typeFactory); @@ -534,6 +536,20 @@ private static RelNode convertUpdate( return updateRelNodeAndRequireIndices.f0; } + /** Append updated columns that are missing from the sink-declared required columns. */ + private static List mergeRequiredAndUpdatedColumns( + List requiredColumns, List updatedColumns) { + Set existingNames = + requiredColumns.stream().map(Column::getName).collect(Collectors.toSet()); + List merged = new ArrayList<>(requiredColumns); + for (Column updated : updatedColumns) { + if (!existingNames.contains(updated.getName())) { + merged.add(updated); + } + } + return merged; + } + private static List getUpdatedColumns( LogicalTableModify tableModify, ResolvedSchema resolvedSchema) { List updatedColumns = new ArrayList<>(); @@ -709,13 +725,17 @@ private static Tuple2 convertToRowLevelUpdate( LogicalTableModify tableModify, ContextResolvedTable contextResolvedTable, SupportsRowLevelUpdate.RowLevelUpdateInfo rowLevelUpdateInfo, + List updatedColumns, String tableDebugName, DataTypeFactory dataTypeFactory, FlinkTypeFactory typeFactory) { // get the required columns ResolvedSchema resolvedSchema = contextResolvedTable.getResolvedSchema(); Optional> optionalColumns = rowLevelUpdateInfo.requiredColumns(); - List requiredColumns = optionalColumns.orElse(resolvedSchema.getColumns()); + List requiredColumns = + optionalColumns + .map(cols -> mergeRequiredAndUpdatedColumns(cols, updatedColumns)) + .orElse(resolvedSchema.getColumns()); // get the root table scan which we may need rewrite it LogicalTableScan tableScan = getSourceTableScan(tableModify); Tuple2, List> colsIndexAndExtraMetaCols = diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java index 7101e129918e8..377b90609633f 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java @@ -76,6 +76,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.apache.flink.table.data.RowData.createFieldGetter; @@ -437,7 +438,9 @@ public Optional> requiredColumns() { resolvedCatalogTable.getResolvedSchema()); } requiredColumnIndices = - getRequiredColumnIndexes(resolvedCatalogTable, requiredCols); + getRequiredColumnIndexes( + resolvedCatalogTable, + mergeRequiredWithUpdatedColumns(requiredCols, updatedColumns)); return Optional.ofNullable(requiredCols); } @@ -750,6 +753,22 @@ public void executeTruncation() { } } + private static List mergeRequiredWithUpdatedColumns( + @Nullable List requiredColumns, List updatedColumns) { + if (requiredColumns == null) { + return null; + } + Set existingNames = + requiredColumns.stream().map(Column::getName).collect(Collectors.toSet()); + List merged = new ArrayList<>(requiredColumns); + for (Column updated : updatedColumns) { + if (!existingNames.contains(updated.getName())) { + merged.add(updated); + } + } + return merged; + } + private static int[] getRequiredColumnIndexes( ResolvedCatalogTable resolvedCatalogTable, @Nullable List columns) { if (columns == null) { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.java index 57f22238d0540..568d43e5b7c71 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.java @@ -143,6 +143,21 @@ void testUpdateWithCustomColumns() { util.verifyExplainInsert("UPDATE t SET b = 'v2' WHERE b = '123'", explainDetails); } + @TestTemplate + void testUpdateWithRequiredColumnsExcludingUpdatedColumns() { + util.tableEnv() + .executeSql( + String.format( + "CREATE TABLE t (a int PRIMARY KEY NOT ENFORCED, b string, c double) WITH" + + " (" + + "'connector' = 'test-update-delete', " + + "'required-columns-for-update' = 'a', " + + "'update-mode' = '%s'" + + ") ", + updateMode)); + util.verifyExplainInsert("UPDATE t SET b = 'v2' WHERE a = 1", explainDetails); + } + @TestTemplate void testUpdateWithMetaColumns() { util.tableEnv() diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java index e4e85545ea94e..78dd45e4ad617 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java @@ -124,6 +124,27 @@ void testPartialUpdate() throws Exception { .isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 2.0], +I[2, uaa, 4.0]]"); } + @TestTemplate + void testUpdateWithRequiredColumnsExcludingUpdatedColumns() throws Exception { + // Sink declares only the primary key as required while SET updates a non-required column. + String dataId = registerData(); + tEnv().executeSql( + String.format( + "CREATE TABLE t (" + + " a int PRIMARY KEY NOT ENFORCED," + + " b string not null," + + " c double not null) WITH" + + " ('connector' = 'test-update-delete', " + + "'data-id' = '%s'," + + " 'required-columns-for-update' = 'a', " + + " 'update-mode' = '%s')", + dataId, updateMode)); + tEnv().executeSql("UPDATE t SET b = 'uaa' WHERE a >= 1").await(); + List rows = toSortedResults(tEnv().executeSql("SELECT * FROM t")); + assertThat(rows.toString()) + .isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 2.0], +I[2, uaa, 4.0]]"); + } + @TestTemplate void testStatementSetContainUpdateAndInsert() { tEnv().executeSql( diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml index 399fcb0dd5e48..5e2ece1cb48b5 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml @@ -583,6 +583,142 @@ Sink(table=[default_catalog.default_database.t], targetColumns=[[1]], fields=[me "side" : "second" } ] } ] +}]]> + + + + + + + + + +