Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@
package org.apache.doris.job.extensions.insert.streaming;

import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.common.DataSourceType;
import org.apache.doris.nereids.trees.plans.commands.LoadCommand;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;

import java.util.Map;
import java.util.Set;

public class DataSourceConfigValidator {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static final Set<String> ALLOW_SOURCE_KEYS = Sets.newHashSet(
DataSourceConfigKeys.JDBC_URL,
DataSourceConfigKeys.USER,
Expand All @@ -51,7 +56,8 @@ public class DataSourceConfigValidator {

private static final String TABLE_LEVEL_PREFIX = DataSourceConfigKeys.TABLE + ".";

public static void validateSource(Map<String, String> input) throws IllegalArgumentException {
public static void validateSource(Map<String, String> input,
String dataSourceType) throws IllegalArgumentException {
for (Map.Entry<String, String> entry : input.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
Expand Down Expand Up @@ -79,7 +85,7 @@ public static void validateSource(Map<String, String> input) throws IllegalArgum
throw new IllegalArgumentException("Unexpected key: '" + key + "'");
}

if (!isValidValue(key, value)) {
if (!isValidValue(key, value, dataSourceType)) {
throw new IllegalArgumentException("Invalid value for key '" + key + "': " + value);
}
}
Expand All @@ -103,18 +109,51 @@ public static void validateTarget(Map<String, String> input) throws IllegalArgum
}
}

private static boolean isValidValue(String key, String value) {
private static boolean isValidValue(String key, String value, String dataSourceType) {
if (value == null || value.isEmpty()) {
return false;
}

if (key.equals(DataSourceConfigKeys.OFFSET)
&& !(value.equals(DataSourceConfigKeys.OFFSET_INITIAL)
|| value.equals(DataSourceConfigKeys.OFFSET_LATEST)
|| value.equals(DataSourceConfigKeys.OFFSET_SNAPSHOT))) {
return false;
if (key.equals(DataSourceConfigKeys.OFFSET)) {
return isValidOffset(value, dataSourceType);
}
return true;
}

/**
 * Decides whether an offset string is acceptable for the given data source type.
 *
 * <p>Accepted forms: the named modes initial / latest / snapshot (any source),
 * the named mode earliest (MySQL only), or a JSON object describing a concrete
 * position (e.g. a binlog file/pos or an LSN).
 *
 * @param offset         the offset string to check; null/empty is rejected
 * @param dataSourceType the data source type name, compared case-insensitively
 * @return true if the offset is valid for that source type
 */
public static boolean isValidOffset(String offset, String dataSourceType) {
    if (offset == null || offset.isEmpty()) {
        return false;
    }
    boolean isNamedMode = DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(offset)
            || DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(offset)
            || DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(offset);
    if (isNamedMode) {
        return true;
    }
    // "earliest" is only meaningful for MySQL.
    if (DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(offset)) {
        return DataSourceType.MYSQL.name().equalsIgnoreCase(dataSourceType);
    }
    // Otherwise the offset must be a JSON object holding a specific position.
    return isJsonOffset(offset);
}

/**
 * Reports whether the given offset string parses as a JSON object.
 *
 * <p>Only JSON objects count (e.g. {@code {"file":"binlog.000001","pos":"154"}});
 * JSON scalars and arrays, blank input, and unparseable text all return false.
 *
 * @param offset the candidate offset string; may be null
 * @return true if the string is a well-formed JSON object
 */
public static boolean isJsonOffset(String offset) {
    if (offset == null || offset.trim().isEmpty()) {
        return false;
    }
    try {
        return OBJECT_MAPPER.readTree(offset).isObject();
    } catch (Exception parseError) {
        // Not parseable as JSON at all — treat as "not a JSON offset".
        return false;
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.tablefunction.S3TableValuedFunction;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import org.apache.doris.transaction.TransactionException;
Expand Down Expand Up @@ -306,9 +305,8 @@ private void initInsertJob() {
this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
this.offsetProvider.ensureInitialized(getJobId(), originTvfProps);
this.offsetProvider.initOnCreate();
// validate offset props, only for s3 cause s3 tvf no offset prop
if (jobProperties.getOffsetProperty() != null
&& S3TableValuedFunction.NAME.equalsIgnoreCase(tvfType)) {
// validate offset props
if (jobProperties.getOffsetProperty() != null) {
Offset offset = validateOffset(jobProperties.getOffsetProperty());
this.offsetProvider.updateOffset(offset);
}
Expand Down Expand Up @@ -780,8 +778,16 @@ public void replayOnUpdated(StreamingInsertJob replayJob) {
*/
private void modifyPropertiesInternal(Map<String, String> inputProperties) throws AnalysisException, JobException {
StreamingJobProperties inputStreamProps = new StreamingJobProperties(inputProperties);
if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty())
&& S3TableValuedFunction.NAME.equalsIgnoreCase(this.tvfType)) {
if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty())) {
// For CDC jobs, ALTER only supports JSON specific offset (e.g. binlog position or LSN),
// named modes like initial/latest/snapshot are only valid at CREATE time.
if (offsetProvider instanceof JdbcSourceOffsetProvider
&& !DataSourceConfigValidator.isJsonOffset(inputStreamProps.getOffsetProperty())) {
throw new AnalysisException(
"ALTER JOB for CDC only supports JSON specific offset, "
+ "e.g. '{\"file\":\"binlog.000001\",\"pos\":\"154\"}' for MySQL "
+ "or '{\"lsn\":\"12345678\"}' for PostgreSQL");
}
Offset offset = validateOffset(inputStreamProps.getOffsetProperty());
this.offsetProvider.updateOffset(offset);
if (Config.isCloudMode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ public class JdbcOffset implements Offset {

@Override
public String toSerializedJson() {
    // An offset with no splits carries no position information; return null so
    // callers can distinguish "nothing to persist" from a concrete position.
    if (splits == null || splits.isEmpty()) {
        return null;
    }
    return new Gson().toJson(splits);
}

@Override
Expand All @@ -58,7 +61,7 @@ public boolean isEmpty() {

@Override
public boolean isValidOffset() {
    // A JDBC offset is only usable when it carries at least one split.
    return splits != null && !splits.isEmpty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.job.cdc.split.SnapshotSplit;
import org.apache.doris.job.common.DataSourceType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.streaming.DataSourceConfigValidator;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
import org.apache.doris.job.offset.Offset;
Expand Down Expand Up @@ -360,7 +361,28 @@ public Offset deserializeOffset(String offset) {

/**
 * Turns a user-supplied offset property into a JdbcOffset.
 *
 * <p>Named modes (initial / snapshot / earliest / latest) are stored in
 * sourceProperties.offset and read directly by the CDC client, so a
 * placeholder JdbcOffset is returned just to satisfy validateOffset().
 * A JSON object (e.g. {@code {"file":"binlog.000003","pos":154}} or
 * {@code {"lsn":"123456"}}) is parsed into a BinlogSplit carrying that map.
 * Anything else — including unparseable JSON — yields null.
 *
 * @param offset the raw offset property value; may be null or blank
 * @return a JdbcOffset, or null when the input is absent or unusable
 */
@Override
public Offset deserializeOffsetProperty(String offset) {
    if (offset == null || offset.trim().isEmpty()) {
        return null;
    }
    // Named modes: placeholder offset, actual positioning happens downstream.
    if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(offset)
            || DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(offset)
            || DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(offset)
            || DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(offset)) {
        return new JdbcOffset(Collections.singletonList(new BinlogSplit()));
    }
    // JSON specific offset: parse into a string map and wrap in a BinlogSplit.
    if (DataSourceConfigValidator.isJsonOffset(offset)) {
        try {
            Map<String, String> offsetMap = objectMapper.readValue(offset,
                    new TypeReference<Map<String, String>>() {});
            return new JdbcOffset(Collections.singletonList(new BinlogSplit(offsetMap)));
        } catch (Exception e) {
            // Report parse failure as "no offset" rather than throwing; the
            // caller treats null as an invalid offset property.
            log.warn("Failed to parse JSON offset: {}", offset, e);
            return null;
        }
    }
    // Neither a known named mode nor a JSON object.
    return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ private void validate() throws Exception {
boolean sourcePropModified =
isPropertiesModified(streamingJob.getSourceProperties(), this.getSourceProperties());
if (sourcePropModified) {
DataSourceConfigValidator.validateSource(this.getSourceProperties());
DataSourceConfigValidator.validateSource(this.getSourceProperties(),
streamingJob.getSourceType());
checkUnmodifiableSourceProperties(streamingJob.getSourceProperties());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ private void validate() throws JobException {
}

if (StringUtils.isNotEmpty(createJobInfo.getSourceType())) {
DataSourceConfigValidator.validateSource(createJobInfo.getSourceProperties());
DataSourceConfigValidator.validateSource(createJobInfo.getSourceProperties(),
createJobInfo.getSourceType());
DataSourceConfigValidator.validateTarget(createJobInfo.getTargetProperties());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,8 +720,10 @@ private Offset getStartOffsetFromConfig(JdbcSourceConfig sourceConfig) {
case EARLIEST_OFFSET:
startingOffset = createInitialOffset();
break;
case TIMESTAMP:
case SPECIFIC_OFFSETS:
startingOffset = createOffset(startupOptions.getOffset());
break;
case TIMESTAMP:
case COMMITTED_OFFSETS:
default:
throw new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,16 @@ private PostgresSourceConfig generatePostgresConfig(
} else if (DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(startupMode)) {
configFactory.startupOptions(StartupOptions.latest());
} else if (ConfigUtil.isJson(startupMode)) {
throw new RuntimeException("Unsupported json offset " + startupMode);
Map<String, String> offsetMap = ConfigUtil.toStringMap(startupMode);
if (offsetMap == null || !offsetMap.containsKey(SourceInfo.LSN_KEY)) {
throw new RuntimeException("JSON offset for PostgreSQL must contain 'lsn' key, got: " + startupMode);
}
// Ensure ts_usec is present (required by PostgresOffset)
if (!offsetMap.containsKey(SourceInfo.TIMESTAMP_USEC_KEY)) {
offsetMap.put(SourceInfo.TIMESTAMP_USEC_KEY,
String.valueOf(Conversions.toEpochMicros(Instant.MIN)));
}
configFactory.startupOptions(StartupOptions.specificOffset(offsetMap));
} else if (ConfigUtil.is13Timestamp(startupMode)) {
// start from timestamp
Long ts = Long.parseLong(startupMode);
Expand Down
Loading
Loading