Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -522,6 +523,7 @@ private static RelNode convertUpdate(
tableModify,
contextResolvedTable,
updateInfo,
updatedColumns,
tableDebugName,
dataTypeFactory,
typeFactory);
Expand All @@ -534,6 +536,20 @@ private static RelNode convertUpdate(
return updateRelNodeAndRequireIndices.f0;
}

/** Append updated columns that are missing from the sink-declared required columns. */
private static List<Column> mergeRequiredAndUpdatedColumns(
List<Column> requiredColumns, List<Column> updatedColumns) {
Set<String> existingNames =
requiredColumns.stream().map(Column::getName).collect(Collectors.toSet());
List<Column> merged = new ArrayList<>(requiredColumns);
for (Column updated : updatedColumns) {
if (!existingNames.contains(updated.getName())) {
merged.add(updated);
}
}
return merged;
}

private static List<Column> getUpdatedColumns(
LogicalTableModify tableModify, ResolvedSchema resolvedSchema) {
List<Column> updatedColumns = new ArrayList<>();
Expand Down Expand Up @@ -709,13 +725,17 @@ private static Tuple2<RelNode, int[]> convertToRowLevelUpdate(
LogicalTableModify tableModify,
ContextResolvedTable contextResolvedTable,
SupportsRowLevelUpdate.RowLevelUpdateInfo rowLevelUpdateInfo,
List<Column> updatedColumns,
String tableDebugName,
DataTypeFactory dataTypeFactory,
FlinkTypeFactory typeFactory) {
// get the required columns
ResolvedSchema resolvedSchema = contextResolvedTable.getResolvedSchema();
Optional<List<Column>> optionalColumns = rowLevelUpdateInfo.requiredColumns();
List<Column> requiredColumns = optionalColumns.orElse(resolvedSchema.getColumns());
List<Column> requiredColumns =
optionalColumns
.map(cols -> mergeRequiredAndUpdatedColumns(cols, updatedColumns))
.orElse(resolvedSchema.getColumns());
// get the root table scan which we may need rewrite it
LogicalTableScan tableScan = getSourceTableScan(tableModify);
Tuple2<List<Integer>, List<MetadataColumn>> colsIndexAndExtraMetaCols =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.table.data.RowData.createFieldGetter;
Expand Down Expand Up @@ -437,7 +438,9 @@ public Optional<List<Column>> requiredColumns() {
resolvedCatalogTable.getResolvedSchema());
}
requiredColumnIndices =
getRequiredColumnIndexes(resolvedCatalogTable, requiredCols);
getRequiredColumnIndexes(
resolvedCatalogTable,
mergeRequiredWithUpdatedColumns(requiredCols, updatedColumns));
return Optional.ofNullable(requiredCols);
}

Expand Down Expand Up @@ -750,6 +753,22 @@ public void executeTruncation() {
}
}

private static List<Column> mergeRequiredWithUpdatedColumns(
@Nullable List<Column> requiredColumns, List<Column> updatedColumns) {
if (requiredColumns == null) {
return null;
}
Set<String> existingNames =
requiredColumns.stream().map(Column::getName).collect(Collectors.toSet());
List<Column> merged = new ArrayList<>(requiredColumns);
for (Column updated : updatedColumns) {
if (!existingNames.contains(updated.getName())) {
merged.add(updated);
}
}
return merged;
}

private static int[] getRequiredColumnIndexes(
ResolvedCatalogTable resolvedCatalogTable, @Nullable List<Column> columns) {
if (columns == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,21 @@ void testUpdateWithCustomColumns() {
util.verifyExplainInsert("UPDATE t SET b = 'v2' WHERE b = '123'", explainDetails);
}

@TestTemplate
void testUpdateWithRequiredColumnsExcludingUpdatedColumns() {
util.tableEnv()
.executeSql(
String.format(
"CREATE TABLE t (a int PRIMARY KEY NOT ENFORCED, b string, c double) WITH"
+ " ("
+ "'connector' = 'test-update-delete', "
+ "'required-columns-for-update' = 'a', "
+ "'update-mode' = '%s'"
+ ") ",
updateMode));
util.verifyExplainInsert("UPDATE t SET b = 'v2' WHERE a = 1", explainDetails);
}

@TestTemplate
void testUpdateWithMetaColumns() {
util.tableEnv()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,27 @@ void testPartialUpdate() throws Exception {
.isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 2.0], +I[2, uaa, 4.0]]");
}

@TestTemplate
void testUpdateWithRequiredColumnsExcludingUpdatedColumns() throws Exception {
// Sink declares only the primary key as required while SET updates a non-required column.
String dataId = registerData();
tEnv().executeSql(
String.format(
"CREATE TABLE t ("
+ " a int PRIMARY KEY NOT ENFORCED,"
+ " b string not null,"
+ " c double not null) WITH"
+ " ('connector' = 'test-update-delete', "
+ "'data-id' = '%s',"
+ " 'required-columns-for-update' = 'a', "
+ " 'update-mode' = '%s')",
dataId, updateMode));
tEnv().executeSql("UPDATE t SET b = 'uaa' WHERE a >= 1").await();
List<String> rows = toSortedResults(tEnv().executeSql("SELECT * FROM t"));
assertThat(rows.toString())
.isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 2.0], +I[2, uaa, 4.0]]");
}

@TestTemplate
void testStatementSetContainUpdateAndInsert() {
tEnv().executeSql(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,142 @@ Sink(table=[default_catalog.default_database.t], targetColumns=[[1]], fields=[me
"side" : "second"
} ]
} ]
}]]>
</Resource>
</TestCase>
<TestCase name="testUpdateWithRequiredColumnsExcludingUpdatedColumns[updateMode = ALL_ROWS]">
<Resource name="explain">
<![CDATA[== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.t], targetColumns=[[1]], fields=[a, b])
+- LogicalProject(a=[$0], b=[IF(=($0, 1), _UTF-16LE'v2', $1)])
+- LogicalTableScan(table=[[default_catalog, default_database, t]])

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.t], targetColumns=[[1]], fields=[a, b])
+- Calc(select=[a, IF(=(a, 1), 'v2', b) AS b])
+- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.t], targetColumns=[[1]], fields=[a, b])
+- Calc(select=[a, IF((a = 1), 'v2', b) AS b])
+- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])

== Physical Execution Plan ==
{
"nodes" : [ {
"id" : ,
"type" : "Source: t[]",
"pact" : "Data Source",
"contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])",
"parallelism" : 1
}, {
"id" : ,
"type" : "Calc[]",
"pact" : "Operator",
"contents" : "[]:Calc(select=[a, IF((a = 1), 'v2', b) AS b])",
"parallelism" : 1,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : ,
"type" : "ConstraintEnforcer[]",
"pact" : "Operator",
"contents" : "[]:ConstraintEnforcer[NotNullEnforcer(fields=[a])]",
"parallelism" : 1,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : ,
"type" : "Sink: Unnamed",
"pact" : "Data Sink",
"contents" : "Sink: Unnamed",
"parallelism" : 1,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
}]]>
</Resource>
</TestCase>
<TestCase name="testUpdateWithRequiredColumnsExcludingUpdatedColumns[updateMode = UPDATED_ROWS]">
<Resource name="explain">
<![CDATA[== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.t], targetColumns=[[1]], fields=[a, b])
+- LogicalProject(a=[$0], b=[_UTF-16LE'v2'])
+- LogicalFilter(condition=[=($0, 1)])
+- LogicalTableScan(table=[[default_catalog, default_database, t]])

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.t], targetColumns=[[1]], fields=[a, b])
+- Calc(select=[1 AS a, 'v2' AS b], where=[=(a, 1)])
+- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.t], targetColumns=[[1]], fields=[a, b])
+- Calc(select=[1 AS a, 'v2' AS b], where=[(a = 1)])
+- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])

== Physical Execution Plan ==
{
"nodes" : [ {
"id" : ,
"type" : "Source: t[]",
"pact" : "Data Source",
"contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])",
"parallelism" : 1
}, {
"id" : ,
"type" : "Calc[]",
"pact" : "Operator",
"contents" : "[]:Calc(select=[1 AS a, 'v2' AS b], where=[(a = 1)])",
"parallelism" : 1,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : ,
"type" : "ConstraintEnforcer[]",
"pact" : "Operator",
"contents" : "[]:ConstraintEnforcer[NotNullEnforcer(fields=[a])]",
"parallelism" : 1,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : ,
"type" : "RowKindSetter[]",
"pact" : "Operator",
"contents" : "[]:RowKindSetter(TargetRowKind=[UPDATE_AFTER])",
"parallelism" : 1,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : ,
"type" : "Sink: Unnamed",
"pact" : "Data Sink",
"contents" : "Sink: Unnamed",
"parallelism" : 1,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
}]]>
</Resource>
</TestCase>
Expand Down