Skip to content
Merged
12 changes: 12 additions & 0 deletions docs/content/docs/sql/reference/queries/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,18 @@ SELECT * FROM TO_CHANGELOG(
-- op_code values are 'I' and 'U' instead of full names
```

#### Upsert stream

```sql
SELECT * FROM TO_CHANGELOG(
input => TABLE upsert_source PARTITION BY id,
op_mapping => MAP['INSERT, UPDATE_AFTER', 'u', 'DELETE', 'd']
)
-- INSERT and UPDATE_AFTER produce op='u' (upsert)
-- DELETE produces op='d'
-- UPDATE_BEFORE is omitted
```

#### Deletion flag pattern

```sql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -806,22 +807,53 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.name("TO_CHANGELOG")
.kind(PROCESS_TABLE)
.staticArguments(
// Row semantics (no PARTITION BY).
// With PARTITION BY, switches to set
// semantics for co-located parallel execution.
// Row semantics by default; PARTITION BY switches to set semantics.
// REQUIRE_UPDATE_BEFORE / REQUIRE_FULL_DELETE are conditional so the
// planner can skip ChangelogNormalize for upsert sources whose
// op_mapping does not emit UB or full deletes.
StaticArgument.table(
"input",
Row.class,
false,
EnumSet.of(
StaticArgumentTrait.TABLE,
StaticArgumentTrait.ROW_SEMANTIC_TABLE,
StaticArgumentTrait.SUPPORT_UPDATES,
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE,
StaticArgumentTrait.REQUIRE_FULL_DELETE))
StaticArgumentTrait.SUPPORT_UPDATES))
.withConditionalTrait(
StaticArgumentTrait.SET_SEMANTIC_TABLE,
TraitCondition.hasPartitionBy()),
TraitCondition.hasPartitionBy())
.withConditionalTrait(
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE,
TraitCondition.or(
// op_mapping omitted: default mapping includes
// UPDATE_BEFORE.
TraitCondition.not(
TraitCondition.argIsPresent(
"op_mapping")),
TraitCondition.argMatches(
"op_mapping",
Map.class,
mapping ->
opMappingContainsKey(
(Map<String, String>)
mapping,
"UPDATE_BEFORE"))))
.withConditionalTrait(
StaticArgumentTrait.REQUIRE_FULL_DELETE,
TraitCondition.or(
// op_mapping omitted: default mapping includes
// DELETE.
TraitCondition.not(
TraitCondition.argIsPresent(
"op_mapping")),
TraitCondition.argMatches(
"op_mapping",
Map.class,
mapping ->
opMappingContainsKey(
(Map<String, String>)
mapping,
"DELETE")))),
StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true),
StaticArgument.scalar(
"op_mapping",
Expand Down Expand Up @@ -3377,6 +3409,22 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)

public static final List<FunctionDefinition> ORDERING = Arrays.asList(ORDER_ASC, ORDER_DESC);

/**
* True when {@code key} appears among the {@code op_mapping} keys. Each map key may itself be a
* comma-separated list (e.g. {@code "INSERT,UPDATE_AFTER"}) - each part is trimmed and compared
* against {@code key}, so one entry can cover multiple change kinds.
*
* <p>Used by {@code TO_CHANGELOG} conditional traits to inspect the user-provided {@code
* op_mapping} argument.
*/
private static boolean opMappingContainsKey(
final Map<String, String> opMapping, final String key) {
return opMapping.keySet().stream()
.flatMap(k -> Arrays.stream(k.split(",")))
.map(String::trim)
.anyMatch(key::equals);
}

@Internal
public static List<BuiltInFunctionDefinition> getDefinitions() {
final Field[] fields = BuiltInFunctionDefinitions.class.getFields();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ final class BuiltInCondition implements TraitCondition {
enum Kind {
HAS_PARTITION_BY,
ARG_IS_EQUAL_TO,
NOT
ARG_IS_PRESENT,
NOT,
OR,
ARG_MATCHES
}

private final Kind kind;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.PublicEvolving;

import java.util.List;
import java.util.function.Predicate;

/**
* A condition that determines whether a conditional trait on a {@link StaticArgument} should be
Expand All @@ -32,6 +33,9 @@
* <p>Implementations must implement {@code hashCode} and {@code equals} for {@link
* StaticArgument#equals}/{@link StaticArgument#hashCode} to work correctly. The built-in factories
* below return value-comparable instances; user-supplied lambdas do not - prefer the factories.
* {@link #argMatches} accepts a caller-supplied {@code Predicate} and is therefore only value-equal
* when the same {@code Predicate} reference is reused; build the predicate once and cache it if
* equality matters.
*
* <pre>{@code
* import static org.apache.flink.table.types.inference.TraitCondition.*;
Expand Down Expand Up @@ -68,4 +72,39 @@ static TraitCondition not(final TraitCondition condition) {
return new BuiltInCondition(
BuiltInCondition.Kind.NOT, List.of(condition), ctx -> !condition.test(ctx));
}

/**
* True when either the {@code left} or the {@code right} {@link TraitCondition} evaluates to
* true.
*/
static TraitCondition or(final TraitCondition left, final TraitCondition right) {
return new BuiltInCondition(
BuiltInCondition.Kind.OR,
List.of(left, right),
ctx -> left.test(ctx) || right.test(ctx));
}

/** True when the named scalar argument was provided by the caller. */
static TraitCondition argIsPresent(final String argName) {
return new BuiltInCondition(
BuiltInCondition.Kind.ARG_IS_PRESENT,
List.of(argName),
ctx -> ctx.hasScalarArgument(argName));
}

/**
* True when the named scalar argument is present and its value matches {@code predicate}. False
* when the argument is absent, cannot be resolved as a literal of {@code argClass}, or {@code
* predicate} evaluates to {@code false}.
*
* <p>Use this for ad-hoc conditions on scalar literals. Prefer the named factories above when
* one fits.
*/
static <T> TraitCondition argMatches(
final String argName, final Class<T> argClass, final Predicate<T> predicate) {
return new BuiltInCondition(
BuiltInCondition.Kind.ARG_MATCHES,
List.of(argName, argClass, predicate),
ctx -> ctx.getScalarArgument(argName, argClass).stream().anyMatch(predicate));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ public interface TraitContext {
*/
<T> Optional<T> getScalarArgument(String name, Class<T> clazz);

/**
* Whether the named scalar argument was provided by the caller. Returns false when the argument
* was omitted, or when an argument with the given name is not declared by the function.
*/
boolean hasScalarArgument(String name);

/**
* Builds a {@link TraitContext} from validation-time inputs.
*
Expand Down Expand Up @@ -76,6 +82,17 @@ public <T> Optional<T> getScalarArgument(final String name, final Class<T> clazz
}
return Optional.empty();
}

@Override
public boolean hasScalarArgument(final String name) {
for (int i = 0; i < staticArgs.size(); i++) {
final StaticArgument arg = staticArgs.get(i);
if (arg.is(StaticArgumentTrait.SCALAR) && arg.getName().equals(name)) {
return !callContext.isArgumentNull(i);
}
}
return false;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,17 @@ public <T> Optional<T> getScalarArgument(String name, Class<T> clazz) {
}
return Optional.empty();
}

@Override
public boolean hasScalarArgument(String name) {
for (int i = 0; i < declared.size(); i++) {
final StaticArgument arg = declared.get(i);
if (arg.is(StaticArgumentTrait.SCALAR) && arg.getName().equals(name)) {
return !callContext.isArgumentNull(i);
}
}
return false;
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public List<TableTestProgram> programs() {
ToChangelogTestPrograms.INSERT,
ToChangelogTestPrograms.RETRACT,
ToChangelogTestPrograms.UPSERT,
ToChangelogTestPrograms.UPSERT_PARTITION_BY,
ToChangelogTestPrograms.UPSERT_PARTITION_BY_KEY_ONLY_DELETES,
ToChangelogTestPrograms.RETRACT_PARTITION_BY,
ToChangelogTestPrograms.CUSTOM_OP_MAPPING,
ToChangelogTestPrograms.CUSTOM_OP_NAME,
Expand All @@ -52,6 +54,7 @@ public List<TableTestProgram> programs() {
ToChangelogTestPrograms.DELETION_FLAG,
ToChangelogTestPrograms.INVALID_DESCRIPTOR,
ToChangelogTestPrograms.INVALID_OP_MAPPING,
ToChangelogTestPrograms.OP_MAPPING_REFERENCES_UNSUPPORTED_KIND,
ToChangelogTestPrograms.DUPLICATE_ROW_KIND);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,68 @@ public class ToChangelogTestPrograms {
.runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)")
.build();

public static final TableTestProgram UPSERT_PARTITION_BY =
Comment thread
fhueske marked this conversation as resolved.
TableTestProgram.of(
"to-changelog-upsert-partition-by",
"PARTITION BY upsert key + mapping without UB skips ChangelogNormalize")
.setupTableSource(
SourceTestStep.newBuilder("t")
.addSchema(
"name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
.addMode(ChangelogMode.upsert())
.producedValues(
Row.ofKind(RowKind.INSERT, "Alice", 10L),
Row.ofKind(RowKind.INSERT, "Bob", 20L),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
Row.ofKind(RowKind.DELETE, "Bob", 20L))
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink")
.addSchema("name STRING", "op STRING", "score BIGINT")
.consumedValues(
"+I[Alice, C, 10]",
"+I[Bob, C, 20]",
"+I[Alice, C, 30]",
"+I[Bob, D, 20]")
.build())
.runSql(
"INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+ "input => TABLE t PARTITION BY name, "
+ "op => DESCRIPTOR(op), "
+ "op_mapping => MAP['INSERT,UPDATE_AFTER', 'C', 'DELETE', 'D'])")
.build();

public static final TableTestProgram UPSERT_PARTITION_BY_KEY_ONLY_DELETES =
TableTestProgram.of(
"to-changelog-upsert-partition-by-key-only-deletes",
"PARTITION BY upsert key + mapping without UB/DELETE handles key-only deletes")
.setupTableSource(
SourceTestStep.newBuilder("t")
.addSchema(
"name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
.addMode(ChangelogMode.upsert(true))
.producedValues(
Row.ofKind(RowKind.INSERT, "Alice", 10L),
Row.ofKind(RowKind.INSERT, "Bob", 20L),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
// Key-only delete: source only knows the key.
Row.ofKind(RowKind.DELETE, "Bob", null))
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink")
.addSchema("name STRING", "op STRING", "score BIGINT")
.consumedValues(
"+I[Alice, U, 10]",
"+I[Bob, U, 20]",
"+I[Alice, U, 30]")
Comment thread
fhueske marked this conversation as resolved.
.build())
.runSql(
"INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+ "input => TABLE t PARTITION BY name, "
+ "op => DESCRIPTOR(op), "
+ "op_mapping => MAP['INSERT,UPDATE_AFTER', 'U'])")
.build();

public static final TableTestProgram CUSTOM_OP_MAPPING =
TableTestProgram.of(
"to-changelog-custom-op-mapping",
Expand Down Expand Up @@ -511,6 +573,19 @@ public class ToChangelogTestPrograms {
"Unknown change operation: 'INVALID_KIND'")
.build();

public static final TableTestProgram OP_MAPPING_REFERENCES_UNSUPPORTED_KIND =
TableTestProgram.of(
"to-changelog-op-mapping-references-unsupported-kind",
"fails when op_mapping references a change operation the input cannot produce")
.setupTableSource(SIMPLE_SOURCE)
.runFailingSql(
"SELECT * FROM TO_CHANGELOG("
+ "input => TABLE t, "
+ "op_mapping => MAP['INSERT', 'I', 'DELETE', 'D'])",
ValidationException.class,
"the input table only produces [INSERT] and does not produce [DELETE]")
.build();

public static final TableTestProgram DUPLICATE_ROW_KIND =
TableTestProgram.of(
"to-changelog-duplicate-rowkind",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,44 @@ void testRetractPartitionBy() {
"SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)",
CHANGELOG_MODE);
}

@Test
void testUpsertPartitionBy() {
util.tableEnv()
.executeSql(
"CREATE TABLE upsert_source ("
+ " id INT,"
+ " name STRING,"
+ " PRIMARY KEY (id) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'changelog-mode' = 'I,UA,D'"
+ ")");
util.verifyRelPlan(
"SELECT * FROM TO_CHANGELOG("
+ "input => TABLE upsert_source PARTITION BY id, "
+ "op => DESCRIPTOR(op), "
+ "op_mapping => MAP['INSERT,UPDATE_AFTER', 'C', 'DELETE', 'D'])",
CHANGELOG_MODE);
}

@Test
void testUpsertPartitionByNoUpdateBeforeAndDelete() {
util.tableEnv()
.executeSql(
"CREATE TABLE upsert_source ("
+ " id INT,"
+ " name STRING,"
+ " PRIMARY KEY (id) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'changelog-mode' = 'I,UA,D'"
+ ")");
util.verifyRelPlan(
"SELECT * FROM TO_CHANGELOG("
+ "input => TABLE upsert_source PARTITION BY id, "
+ "op => DESCRIPTOR(op), "
+ "op_mapping => MAP['INSERT,UPDATE_AFTER', 'C'])",
CHANGELOG_MODE);
}
}
Loading