From ac940a790e9d57337d6f37c36d350c5e22640bd3 Mon Sep 17 00:00:00 2001 From: Au_Miner <358671982@qq.com> Date: Tue, 19 May 2026 14:41:14 +0800 Subject: [PATCH 1/2] [FLINK-36735][table] Include updated columns when sink declares partial required columns for row-level update --- .../planner/connectors/DynamicSinkUtils.java | 22 ++- .../TestUpdateDeleteTableFactory.java | 21 ++- .../plan/batch/sql/RowLevelUpdateTest.java | 15 ++ .../runtime/batch/sql/UpdateTableITCase.java | 21 +++ .../plan/batch/sql/RowLevelUpdateTest.xml | 136 ++++++++++++++++++ 5 files changed, 213 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java index 219b995864fcd..fbc777db46e8f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java @@ -106,6 +106,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -522,6 +523,7 @@ private static RelNode convertUpdate( tableModify, contextResolvedTable, updateInfo, + updatedColumns, tableDebugName, dataTypeFactory, typeFactory); @@ -534,6 +536,20 @@ private static RelNode convertUpdate( return updateRelNodeAndRequireIndices.f0; } + /** Append updated columns that are missing from the sink-declared required columns. */ + private static List mergeRequiredAndUpdatedColumns( + List requiredColumns, List updatedColumns) { + Set existingNames = + requiredColumns.stream().map(Column::getName).collect(Collectors.toSet()); + List merged = new ArrayList<>(requiredColumns); + for (Column updated : updatedColumns) { + if (!existingNames.contains(updated.getName())) { + merged.add(updated); + } + } + return merged; + } + private static List getUpdatedColumns( LogicalTableModify tableModify, ResolvedSchema resolvedSchema) { List updatedColumns = new ArrayList<>(); @@ -709,13 +725,17 @@ private static Tuple2 convertToRowLevelUpdate( LogicalTableModify tableModify, ContextResolvedTable contextResolvedTable, SupportsRowLevelUpdate.RowLevelUpdateInfo rowLevelUpdateInfo, + List updatedColumns, String tableDebugName, DataTypeFactory dataTypeFactory, FlinkTypeFactory typeFactory) { // get the required columns ResolvedSchema resolvedSchema = contextResolvedTable.getResolvedSchema(); Optional> optionalColumns = rowLevelUpdateInfo.requiredColumns(); - List requiredColumns = optionalColumns.orElse(resolvedSchema.getColumns()); + List requiredColumns = + optionalColumns + .map(cols -> mergeRequiredAndUpdatedColumns(cols, updatedColumns)) + .orElse(resolvedSchema.getColumns()); // get the root table scan which we may need rewrite it LogicalTableScan tableScan = getSourceTableScan(tableModify); Tuple2, List> colsIndexAndExtraMetaCols = diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java index 7101e129918e8..377b90609633f 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java @@ -76,6 +76,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.apache.flink.table.data.RowData.createFieldGetter; @@ -437,7 +438,9 @@ public Optional> requiredColumns() { resolvedCatalogTable.getResolvedSchema()); } requiredColumnIndices = - getRequiredColumnIndexes(resolvedCatalogTable, requiredCols); + getRequiredColumnIndexes( + resolvedCatalogTable, + mergeRequiredWithUpdatedColumns(requiredCols, updatedColumns)); return Optional.ofNullable(requiredCols); } @@ -750,6 +753,22 @@ public void executeTruncation() { } } + private static List mergeRequiredWithUpdatedColumns( + @Nullable List requiredColumns, List updatedColumns) { + if (requiredColumns == null) { + return null; + } + Set existingNames = + requiredColumns.stream().map(Column::getName).collect(Collectors.toSet()); + List merged = new ArrayList<>(requiredColumns); + for (Column updated : updatedColumns) { + if (!existingNames.contains(updated.getName())) { + merged.add(updated); + } + } + return merged; + } + private static int[] getRequiredColumnIndexes( ResolvedCatalogTable resolvedCatalogTable, @Nullable List columns) { if (columns == null) { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.java index 57f22238d0540..568d43e5b7c71 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.java @@ -143,6 +143,21 @@ void testUpdateWithCustomColumns() { util.verifyExplainInsert("UPDATE t SET b = 'v2' WHERE b = '123'", explainDetails); } + @TestTemplate + void testUpdateWithRequiredColumnsExcludingUpdatedColumns() { + util.tableEnv() + .executeSql( + String.format( + "CREATE TABLE t (a int PRIMARY KEY NOT ENFORCED, b string, c double) WITH" + + " (" + + "'connector' = 'test-update-delete', " + + "'required-columns-for-update' = 'a', " + + "'update-mode' = '%s'" + + ") ", + updateMode)); + util.verifyExplainInsert("UPDATE t SET b = 'v2' WHERE a = 1", explainDetails); + } + @TestTemplate void testUpdateWithMetaColumns() { util.tableEnv() diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java index e4e85545ea94e..78dd45e4ad617 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java @@ -124,6 +124,27 @@ void testPartialUpdate() throws Exception { .isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 2.0], +I[2, uaa, 4.0]]"); } + @TestTemplate + void testUpdateWithRequiredColumnsExcludingUpdatedColumns() throws Exception { + // Sink declares only the primary key as required while SET updates a non-required column. + String dataId = registerData(); + tEnv().executeSql( + String.format( + "CREATE TABLE t (" + + " a int PRIMARY KEY NOT ENFORCED," + + " b string not null," + + " c double not null) WITH" + + " ('connector' = 'test-update-delete', " + + "'data-id' = '%s'," + + " 'required-columns-for-update' = 'a', " + + " 'update-mode' = '%s')", + dataId, updateMode)); + tEnv().executeSql("UPDATE t SET b = 'uaa' WHERE a >= 1").await(); + List rows = toSortedResults(tEnv().executeSql("SELECT * FROM t")); + assertThat(rows.toString()) + .isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 2.0], +I[2, uaa, 4.0]]"); + } + @TestTemplate void testStatementSetContainUpdateAndInsert() { tEnv().executeSql( diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml index 399fcb0dd5e48..5e2ece1cb48b5 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml @@ -583,6 +583,142 @@ Sink(table=[default_catalog.default_database.t], targetColumns=[[1]], fields=[me "side" : "second" } ] } ] +}]]> + + + + + + + + + + From 22fbe6acc8f5debe49f797f37e84a52d2249eacf Mon Sep 17 00:00:00 2001 From: Au_Miner <358671982@qq.com> Date: Tue, 19 May 2026 10:05:53 +0800 Subject: [PATCH 2/2] [FLINK-36736][table] Fix target columns for row-level updates --- .../planner/connectors/DynamicSinkUtils.java | 27 +++-- .../TestUpdateDeleteTableFactory.java | 29 ++++- .../plan/batch/sql/RowLevelUpdateTest.java | 2 +- .../runtime/batch/sql/UpdateTableITCase.java | 35 +++++- .../plan/batch/sql/RowLevelUpdateTest.xml | 100 +++++++++--------- 5 files changed, 130 insertions(+), 63 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java index fbc777db46e8f..1157ec3cb11c3 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java @@ -300,8 +300,7 @@ private static RelNode convertSinkToRel( isOverwrite, sink, contextResolvedTable.getResolvedTable(), - sinkAbilitySpecs, - targetColumns); + sinkAbilitySpecs); // rewrite rel node for delete if (isDelete) { @@ -315,7 +314,7 @@ private static RelNode convertSinkToRel( typeFactory, sinkAbilitySpecs); } else if (isUpdate) { - input = + Tuple2 updateResult = convertUpdate( (LogicalTableModify) input, sink, @@ -324,7 +323,12 @@ private static RelNode convertSinkToRel( dataTypeFactory, typeFactory, sinkAbilitySpecs); + input = updateResult.f0; + // align target columns with the projected row delivered to the sink + targetColumns = toNestedIndexPaths(updateResult.f1); } + // apply target columns after UPDATE rewrite so the sink sees the final column set + validateAndApplyTargetColumns(sink, targetColumns, sinkAbilitySpecs); sinkAbilitySpecs.forEach(spec -> spec.apply(sink)); @@ -491,7 +495,7 @@ private static RelNode convertDelete( return deleteRelNodeAndRequireIndices.f0; } - private static RelNode convertUpdate( + private static Tuple2 convertUpdate( LogicalTableModify tableModify, DynamicTableSink sink, ContextResolvedTable contextResolvedTable, @@ -533,7 +537,7 @@ private static RelNode convertUpdate( updateInfo.getRowLevelUpdateMode(), context, updateRelNodeAndRequireIndices.f1)); - return updateRelNodeAndRequireIndices.f0; + return updateRelNodeAndRequireIndices; } /** Append updated columns that are missing from the sink-declared required columns. */ @@ -606,6 +610,14 @@ private static Tuple2 convertToRowLevelDelete( getPhysicalColumnIndices(colIndexes, resolvedSchema)); } + private static int[][] toNestedIndexPaths(int[] columnIndices) { + int[][] result = new int[columnIndices.length][]; + for (int i = 0; i < columnIndices.length; i++) { + result[i] = new int[] {columnIndices[i]}; + } + return result; + } + /** Return the indices from {@param colIndexes} that belong to physical column. */ private static int[] getPhysicalColumnIndices(List colIndexes, ResolvedSchema schema) { return colIndexes.stream() @@ -1046,8 +1058,7 @@ private static void prepareDynamicSink( boolean isOverwrite, DynamicTableSink sink, ResolvedCatalogTable table, - List sinkAbilitySpecs, - int[][] targetColumns) { + List sinkAbilitySpecs) { table.getDistribution() .ifPresent( distribution -> @@ -1059,8 +1070,6 @@ private static void prepareDynamicSink( validateAndApplyOverwrite(tableDebugName, isOverwrite, sink, sinkAbilitySpecs); validateAndApplyMetadata(tableDebugName, sink, table.getResolvedSchema(), sinkAbilitySpecs); - - validateAndApplyTargetColumns(sink, targetColumns, sinkAbilitySpecs); } /** diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java index 377b90609633f..0569895dd4a37 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java @@ -43,6 +43,7 @@ import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown; import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete; import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate; +import org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting; import org.apache.flink.table.connector.sink.abilities.SupportsTruncate; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; @@ -157,12 +158,31 @@ public class TestUpdateDeleteTableFactory private static final AtomicInteger idCounter = new AtomicInteger(0); private static final Map> registeredRowData = new HashMap<>(); + private static final Map> capturedUpdateTargetColumns = + new HashMap<>(); + + private static final Map capturedAppliedTargetColumns = + new HashMap<>(); + public static String registerRowData(Collection data) { String id = String.valueOf(idCounter.incrementAndGet()); registeredRowData.put(id, data); return id; } + public static Optional getCapturedUpdateTargetColumns(ObjectIdentifier id) { + return capturedUpdateTargetColumns.get(id); + } + + public static int[][] getCapturedAppliedTargetColumns(ObjectIdentifier id) { + return capturedAppliedTargetColumns.get(id); + } + + public static void clearCapturedTargetColumns(ObjectIdentifier id) { + capturedUpdateTargetColumns.remove(id); + capturedAppliedTargetColumns.remove(id); + } + @Override public DynamicTableSink createDynamicTableSink(Context context) { FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); @@ -324,7 +344,7 @@ private static class TestScanContext implements RowLevelModificationScanContext /** A sink that supports row-level update. */ private static class SupportsRowLevelUpdateSink - implements DynamicTableSink, SupportsRowLevelUpdate { + implements DynamicTableSink, SupportsRowLevelUpdate, SupportsTargetColumnWriting { protected final ObjectIdentifier tableIdentifier; protected final ResolvedCatalogTable resolvedCatalogTable; @@ -377,6 +397,7 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { @Override public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + capturedUpdateTargetColumns.put(tableIdentifier, context.getTargetColumns()); return new DataStreamSinkProvider() { @Override @@ -418,6 +439,12 @@ public String asSummaryString() { return "SupportsRowLevelUpdateSink"; } + @Override + public boolean applyTargetColumns(int[][] targetColumns) { + capturedAppliedTargetColumns.put(tableIdentifier, targetColumns); + return false; + } + @Override public RowLevelUpdateInfo applyRowLevelUpdate( List updatedColumns, @Nullable RowLevelModificationScanContext context) { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.java index 568d43e5b7c71..9eeeeeb13d0f7 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.java @@ -144,7 +144,7 @@ void testUpdateWithCustomColumns() { } @TestTemplate - void testUpdateWithRequiredColumnsExcludingUpdatedColumns() { + void testUpdateColumnDisjointFromRequired() { util.tableEnv() .executeSql( String.format( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java index 78dd45e4ad617..011262bbf29b7 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java @@ -21,6 +21,7 @@ import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate; import org.apache.flink.table.data.GenericRowData; @@ -41,6 +42,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -62,6 +64,8 @@ private static Collection data() { @TestTemplate void testUpdate() throws Exception { String dataId = registerData(); + ObjectIdentifier tableId = ObjectIdentifier.of("default_catalog", "default_database", "t"); + TestUpdateDeleteTableFactory.clearCapturedTargetColumns(tableId); tEnv().executeSql( String.format( "CREATE TABLE t (" @@ -77,6 +81,14 @@ void testUpdate() throws Exception { assertThat(rows.toString()) .isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 4.0], +I[2, uaa, 16.0]]"); + // Sink receives the full row, so both targetColumns paths should be [a,b,c]. + Optional captured = + TestUpdateDeleteTableFactory.getCapturedUpdateTargetColumns(tableId); + assertThat(captured).isPresent(); + assertThat(captured.get()).isEqualTo(new int[][] {{0}, {1}, {2}}); + int[][] applied = TestUpdateDeleteTableFactory.getCapturedAppliedTargetColumns(tableId); + assertThat(applied).isEqualTo(new int[][] {{0}, {1}, {2}}); + tEnv().executeSql("UPDATE t SET b = 'uab' WHERE a > (SELECT count(1) FROM t WHERE a > 1)") .await(); rows = toSortedResults(tEnv().executeSql("SELECT * FROM t")); @@ -87,6 +99,8 @@ void testUpdate() throws Exception { @TestTemplate void testPartialUpdate() throws Exception { String dataId = registerData(); + ObjectIdentifier tableId = ObjectIdentifier.of("default_catalog", "default_database", "t"); + TestUpdateDeleteTableFactory.clearCapturedTargetColumns(tableId); tEnv().executeSql( String.format( "CREATE TABLE t (" @@ -103,6 +117,14 @@ void testPartialUpdate() throws Exception { assertThat(rows.toString()) .isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 2.0], +I[2, uaa, 4.0]]"); + // Sink-required columns [a,b] should drive both targetColumns paths. + Optional captured = + TestUpdateDeleteTableFactory.getCapturedUpdateTargetColumns(tableId); + assertThat(captured).isPresent(); + assertThat(captured.get()).isEqualTo(new int[][] {{0}, {1}}); + int[][] applied = TestUpdateDeleteTableFactory.getCapturedAppliedTargetColumns(tableId); + assertThat(applied).isEqualTo(new int[][] {{0}, {1}}); + // test partial update with requiring partial primary keys dataId = registerData(); tEnv().executeSql( @@ -125,9 +147,11 @@ void testPartialUpdate() throws Exception { } @TestTemplate - void testUpdateWithRequiredColumnsExcludingUpdatedColumns() throws Exception { - // Sink declares only the primary key as required while SET updates a non-required column. + void testUpdateColumnDisjointFromRequired() throws Exception { + // The planner must merge updated columns into required columns for targetColumns. String dataId = registerData(); + ObjectIdentifier tableId = ObjectIdentifier.of("default_catalog", "default_database", "t"); + TestUpdateDeleteTableFactory.clearCapturedTargetColumns(tableId); tEnv().executeSql( String.format( "CREATE TABLE t (" @@ -143,6 +167,13 @@ void testUpdateWithRequiredColumnsExcludingUpdatedColumns() throws Exception { List rows = toSortedResults(tEnv().executeSql("SELECT * FROM t")); assertThat(rows.toString()) .isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 2.0], +I[2, uaa, 4.0]]"); + + Optional captured = + TestUpdateDeleteTableFactory.getCapturedUpdateTargetColumns(tableId); + assertThat(captured).isPresent(); + assertThat(captured.get()).isEqualTo(new int[][] {{0}, {1}}); + int[][] applied = TestUpdateDeleteTableFactory.getCapturedAppliedTargetColumns(tableId); + assertThat(applied).isEqualTo(new int[][] {{0}, {1}}); } @TestTemplate diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml index 5e2ece1cb48b5..6148f841a2de3 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml @@ -19,17 +19,17 @@ limitations under the License. ($2, 123), 123, $0)], b=[IF(>($2, 123), _UTF-16LE'v2', $1)], c=[IF(>($2, 123), +($2, 1), $2)]) +- LogicalTableScan(table=[[default_catalog, default_database, t]]) == Optimized Physical Plan == -Sink(table=[default_catalog.default_database.t], targetColumns=[[1],[0],[2]], fields=[a, b, c]) +Sink(table=[default_catalog.default_database.t], targetColumns=[[0],[1],[2]], fields=[a, b, c]) +- Calc(select=[IF(>(c, 123), 123, a) AS a, IF(>(c, 123), 'v2', b) AS b, IF(>(c, 123), +(c, 1), c) AS c]) +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c]) == Optimized Execution Plan == -Sink(table=[default_catalog.default_database.t], targetColumns=[[1],[0],[2]], fields=[a, b, c]) +Sink(table=[default_catalog.default_database.t], targetColumns=[[0],[1],[2]], fields=[a, b, c]) +- Calc(select=[IF((c > 123), 123, a) AS a, IF((c > 123), 'v2', b) AS b, IF((c > 123), (c + 1), c) AS c]) +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c]) @@ -70,18 +70,18 @@ Sink(table=[default_catalog.default_database.t], targetColumns=[[1],[0],[2]], fi ($2, 123)]) +- LogicalTableScan(table=[[default_catalog, default_database, t]]) == Optimized Physical Plan == -Sink(table=[default_catalog.default_database.t], targetColumns=[[1],[0],[2]], fields=[a, b, c]) +Sink(table=[default_catalog.default_database.t], targetColumns=[[0],[1],[2]], fields=[a, b, c]) +- Calc(select=[123 AS a, 'v2' AS b, +(c, 1) AS c], where=[>(c, 123)]) +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c]) == Optimized Execution Plan == -Sink(table=[default_catalog.default_database.t], targetColumns=[[1],[0],[2]], fields=[a, b, c]) +Sink(table=[default_catalog.default_database.t], targetColumns=[[0],[1],[2]], fields=[a, b, c]) +- Calc(select=[123 AS a, 'v2' AS b, (c + 1) AS c], where=[(c > 123)]) +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c]) @@ -133,17 +133,17 @@ Sink(table=[default_catalog.default_database.t], targetColumns=[[1],[0],[2]], fi ($4, 123), 123, $2)], b=[IF(>($4, 123), _UTF-16LE'v2', $3)], c=[IF(>($4, 123), +($4, 1), $4)]) +- LogicalTableScan(table=[[default_catalog, default_database, t]]) == Optimized Physical Plan == -Sink(table=[default_catalog.default_database.t], targetColumns=[[3],[2],[4]], fields=[a, b, c]) +Sink(table=[default_catalog.default_database.t], targetColumns=[[2],[3],[4]], fields=[a, b, c]) +- Calc(select=[IF(>(c, 123), 123, a) AS a, IF(>(c, 123), 'v2', b) AS b, IF(>(c, 123), +(c, 1), c) AS c]) +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[f0, f1, a, b, c, f2, f3]) == Optimized Execution Plan == -Sink(table=[default_catalog.default_database.t], targetColumns=[[3],[2],[4]], fields=[a, b, c]) +Sink(table=[default_catalog.default_database.t], targetColumns=[[2],[3],[4]], fields=[a, b, c]) +- Calc(select=[IF((c > 123), 123, a) AS a, IF((c > 123), 'v2', b) AS b, IF((c > 123), (c + 1), c) AS c]) +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[f0, f1, a, b, c, f2, f3]) @@ -184,18 +184,18 @@ Sink(table=[default_catalog.default_database.t], targetColumns=[[3],[2],[4]], fi ($4, 123)]) +- LogicalTableScan(table=[[default_catalog, default_database, t]]) == Optimized Physical Plan == -Sink(table=[default_catalog.default_database.t], targetColumns=[[3],[2],[4]], fields=[a, b, c]) +Sink(table=[default_catalog.default_database.t], targetColumns=[[2],[3],[4]], fields=[a, b, c]) +- Calc(select=[123 AS a, 'v2' AS b, +(c, 1) AS c], where=[>(c, 123)]) +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[f0, f1, a, b, c, f2, f3]) == Optimized Execution Plan == -Sink(table=[default_catalog.default_database.t], targetColumns=[[3],[2],[4]], fields=[a, b, c]) +Sink(table=[default_catalog.default_database.t], targetColumns=[[2],[3],[4]], fields=[a, b, c]) +- Calc(select=[123 AS a, 'v2' AS b, (c + 1) AS c], where=[(c > 123)]) +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[f0, f1, a, b, c, f2, f3]) @@ -247,17 +247,17 @@ Sink(table=[default_catalog.default_database.t], targetColumns=[[3],[2],[4]], fi - + - +