diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java index b75e202b1a842e..5ef98668cbab12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java @@ -18,14 +18,19 @@ package org.apache.doris.job.extensions.insert.streaming; import org.apache.doris.job.cdc.DataSourceConfigKeys; +import org.apache.doris.job.common.DataSourceType; import org.apache.doris.nereids.trees.plans.commands.LoadCommand; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; import java.util.Map; import java.util.Set; public class DataSourceConfigValidator { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final Set ALLOW_SOURCE_KEYS = Sets.newHashSet( DataSourceConfigKeys.JDBC_URL, DataSourceConfigKeys.USER, @@ -51,7 +56,8 @@ public class DataSourceConfigValidator { private static final String TABLE_LEVEL_PREFIX = DataSourceConfigKeys.TABLE + "."; - public static void validateSource(Map input) throws IllegalArgumentException { + public static void validateSource(Map input, + String dataSourceType) throws IllegalArgumentException { for (Map.Entry entry : input.entrySet()) { String key = entry.getKey(); String value = entry.getValue(); @@ -79,7 +85,7 @@ public static void validateSource(Map input) throws IllegalArgum throw new IllegalArgumentException("Unexpected key: '" + key + "'"); } - if (!isValidValue(key, value)) { + if (!isValidValue(key, value, dataSourceType)) { throw new IllegalArgumentException("Invalid value for key '" + key + "': " + value); } } @@ -103,18 +109,51 @@ public static void validateTarget(Map input) throws 
IllegalArgum } } - private static boolean isValidValue(String key, String value) { + private static boolean isValidValue(String key, String value, String dataSourceType) { if (value == null || value.isEmpty()) { return false; } - if (key.equals(DataSourceConfigKeys.OFFSET) - && !(value.equals(DataSourceConfigKeys.OFFSET_INITIAL) - || value.equals(DataSourceConfigKeys.OFFSET_LATEST) - || value.equals(DataSourceConfigKeys.OFFSET_SNAPSHOT))) { - return false; + if (key.equals(DataSourceConfigKeys.OFFSET)) { + return isValidOffset(value, dataSourceType); } return true; } + /** + * Check if the offset value is valid for the given data source type. + * Supported: initial, snapshot, latest, JSON binlog/lsn position. + * earliest is only supported for MySQL. + */ + public static boolean isValidOffset(String offset, String dataSourceType) { + if (offset == null || offset.isEmpty()) { + return false; + } + if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(offset) + || DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(offset) + || DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(offset)) { + return true; + } + // earliest only for MySQL + if (DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(offset)) { + return DataSourceType.MYSQL.name().equalsIgnoreCase(dataSourceType); + } + if (isJsonOffset(offset)) { + return true; + } + return false; + } + + public static boolean isJsonOffset(String offset) { + if (offset == null || offset.trim().isEmpty()) { + return false; + } + try { + JsonNode node = OBJECT_MAPPER.readTree(offset); + return node.isObject(); + } catch (Exception e) { + return false; + } + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index d7a325e22f541e..3665cdc98217af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java 
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -73,7 +73,6 @@ import org.apache.doris.qe.ShowResultSetMetaData; import org.apache.doris.rpc.RpcException; import org.apache.doris.service.FrontendOptions; -import org.apache.doris.tablefunction.S3TableValuedFunction; import org.apache.doris.thrift.TCell; import org.apache.doris.thrift.TRow; import org.apache.doris.transaction.TransactionException; @@ -306,9 +305,8 @@ private void initInsertJob() { this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName()); this.offsetProvider.ensureInitialized(getJobId(), originTvfProps); this.offsetProvider.initOnCreate(); - // validate offset props, only for s3 cause s3 tvf no offset prop - if (jobProperties.getOffsetProperty() != null - && S3TableValuedFunction.NAME.equalsIgnoreCase(tvfType)) { + // validate offset props + if (jobProperties.getOffsetProperty() != null) { Offset offset = validateOffset(jobProperties.getOffsetProperty()); this.offsetProvider.updateOffset(offset); } @@ -780,8 +778,16 @@ public void replayOnUpdated(StreamingInsertJob replayJob) { */ private void modifyPropertiesInternal(Map inputProperties) throws AnalysisException, JobException { StreamingJobProperties inputStreamProps = new StreamingJobProperties(inputProperties); - if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty()) - && S3TableValuedFunction.NAME.equalsIgnoreCase(this.tvfType)) { + if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty())) { + // For CDC jobs, ALTER only supports JSON specific offset (e.g. binlog position or LSN), + // named modes like initial/latest/snapshot are only valid at CREATE time. + if (offsetProvider instanceof JdbcSourceOffsetProvider + && !DataSourceConfigValidator.isJsonOffset(inputStreamProps.getOffsetProperty())) { + throw new AnalysisException( + "ALTER JOB for CDC only supports JSON specific offset, " + + "e.g. 
'{\"file\":\"binlog.000001\",\"pos\":\"154\"}' for MySQL " + + "or '{\"lsn\":\"12345678\"}' for PostgreSQL"); + } Offset offset = validateOffset(inputStreamProps.getOffsetProperty()); this.offsetProvider.updateOffset(offset); if (Config.isCloudMode()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcOffset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcOffset.java index dea2c244d6d78c..faf39dbe30899d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcOffset.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcOffset.java @@ -48,7 +48,10 @@ public class JdbcOffset implements Offset { @Override public String toSerializedJson() { - return null; + if (splits == null || splits.isEmpty()) { + return null; + } + return new Gson().toJson(splits); } @Override @@ -58,7 +61,7 @@ public boolean isEmpty() { @Override public boolean isValidOffset() { - return false; + return splits != null && !splits.isEmpty(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java index f6bbdcc5b56386..75a8c880684e74 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java @@ -29,6 +29,7 @@ import org.apache.doris.job.cdc.split.SnapshotSplit; import org.apache.doris.job.common.DataSourceType; import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.extensions.insert.streaming.DataSourceConfigValidator; import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob; import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties; import org.apache.doris.job.offset.Offset; @@ -360,7 +361,28 @@ public Offset deserializeOffset(String offset) { @Override public Offset 
deserializeOffsetProperty(String offset) { - // no need cause cdc_stream has offset property + if (offset == null || offset.trim().isEmpty()) { + return null; + } + // Named modes: stored in sourceProperties.offset, CDC client reads it directly. + // Return a placeholder JdbcOffset so validateOffset() passes. + if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(offset) + || DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(offset) + || DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(offset) + || DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(offset)) { + return new JdbcOffset(Collections.singletonList(new BinlogSplit())); + } + // JSON format: {"file":"binlog.000003","pos":154} or {"lsn":"123456"} + if (DataSourceConfigValidator.isJsonOffset(offset)) { + try { + Map offsetMap = objectMapper.readValue(offset, + new TypeReference>() {}); + return new JdbcOffset(Collections.singletonList(new BinlogSplit(offsetMap))); + } catch (Exception e) { + log.warn("Failed to parse JSON offset: {}", offset, e); + return null; + } + } return null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java index fc63710f93ddf1..029bf932e5cc82 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java @@ -166,7 +166,8 @@ private void validate() throws Exception { boolean sourcePropModified = isPropertiesModified(streamingJob.getSourceProperties(), this.getSourceProperties()); if (sourcePropModified) { - DataSourceConfigValidator.validateSource(this.getSourceProperties()); + DataSourceConfigValidator.validateSource(this.getSourceProperties(), + streamingJob.getDataSourceType().name()); checkUnmodifiableSourceProperties(streamingJob.getSourceProperties()); } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java index 1640143d277f01..ff278293e8216d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java @@ -74,7 +74,8 @@ private void validate() throws JobException { } if (StringUtils.isNotEmpty(createJobInfo.getSourceType())) { - DataSourceConfigValidator.validateSource(createJobInfo.getSourceProperties()); + DataSourceConfigValidator.validateSource(createJobInfo.getSourceProperties(), + createJobInfo.getSourceType()); DataSourceConfigValidator.validateTarget(createJobInfo.getTargetProperties()); } } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java index 3d9167d6124f1d..3c0e6a8d001f8c 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java @@ -720,8 +720,10 @@ private Offset getStartOffsetFromConfig(JdbcSourceConfig sourceConfig) { case EARLIEST_OFFSET: startingOffset = createInitialOffset(); break; - case TIMESTAMP: case SPECIFIC_OFFSETS: + startingOffset = createOffset(startupOptions.getOffset()); + break; + case TIMESTAMP: case COMMITTED_OFFSETS: default: throw new IllegalStateException( diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java index b6d28510613c94..e4fcd44b35015c 100644 --- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java @@ -206,7 +206,16 @@ private PostgresSourceConfig generatePostgresConfig( } else if (DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(startupMode)) { configFactory.startupOptions(StartupOptions.latest()); } else if (ConfigUtil.isJson(startupMode)) { - throw new RuntimeException("Unsupported json offset " + startupMode); + Map offsetMap = ConfigUtil.toStringMap(startupMode); + if (offsetMap == null || !offsetMap.containsKey(SourceInfo.LSN_KEY)) { + throw new RuntimeException("JSON offset for PostgreSQL must contain 'lsn' key, got: " + startupMode); + } + // Ensure ts_usec is present (required by PostgresOffset) + if (!offsetMap.containsKey(SourceInfo.TIMESTAMP_USEC_KEY)) { + offsetMap.put(SourceInfo.TIMESTAMP_USEC_KEY, + String.valueOf(Conversions.toEpochMicros(Instant.MIN))); + } + configFactory.startupOptions(StartupOptions.specificOffset(offsetMap)); } else if (ConfigUtil.is13Timestamp(startupMode)) { // start from timestamp Long ts = Long.parseLong(startupMode); diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset.groovy new file mode 100644 index 00000000000000..c7757e3094f4cc --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset.groovy @@ -0,0 +1,266 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_streaming_mysql_job_special_offset", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { + def jobName = "test_streaming_mysql_job_special_offset" + def currentDb = (sql "select database()")[0][0] + def table1 = "special_offset_mysql_tbl" + def mysqlDb = "test_cdc_db" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + // prepare MySQL source table + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}""" + sql """CREATE TABLE ${mysqlDb}.${table1} ( + `id` int NOT NULL, + `name` varchar(100) DEFAULT NULL, + PRIMARY KEY (`id`) + ) ENGINE=InnoDB""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (1, 'alice'), (2, 'bob')""" + } + + // ===== Test 1: offset = earliest, verify data synced ===== + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = "earliest" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def result = sql """SELECT count(*) FROM ${currentDb}.${table1}""" + return result[0][0] >= 2 + }) + def rows = sql """SELECT * FROM ${currentDb}.${table1} order by id""" + assert rows.size() == 2 + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + // ===== Test 2: offset = latest, then insert new data, verify synced ===== + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = "latest" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + // wait for job to be running + Awaitility.await().atMost(60, SECONDS).pollInterval(2, SECONDS).until({ + def jobStatus = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + return jobStatus.size() == 1 && jobStatus[0][0] == "RUNNING" + }) + // insert new data after job started with latest offset + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (3, 'charlie')""" + } + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def result = sql """SELECT count(*) FROM ${currentDb}.${table1}""" + return result[0][0] >= 1 + }) + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists 
${currentDb}.${table1} force""" + + // ===== Test 3: ALTER JOB with JSON binlog offset via PROPERTIES ===== + // Get current binlog position, then create job with initial + def alterBinlogFile = "" + def alterBinlogPos = "" + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + def masterStatus = sql """SHOW MASTER STATUS""" + alterBinlogFile = masterStatus[0][0] + alterBinlogPos = masterStatus[0][1].toString() + log.info("ALTER test binlog position: file=${alterBinlogFile}, pos=${alterBinlogPos}") + // insert data after this position + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (20, 'alter_test1')""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (21, 'alter_test2')""" + } + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = "latest" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + Awaitility.await().atMost(60, SECONDS).pollInterval(2, SECONDS).until({ + def jobStatus = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + return jobStatus.size() == 1 && jobStatus[0][0] == "RUNNING" + }) + // pause, then alter offset to specific binlog position via PROPERTIES + sql "PAUSE JOB where jobname = '${jobName}'" + Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({ + def jobStatus = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + return jobStatus[0][0] == "PAUSED" + }) + def alterOffsetJson = """{"file":"${alterBinlogFile}","pos":"${alterBinlogPos}"}""" + log.info("ALTER offset: ${alterOffsetJson}") + sql """ALTER JOB ${jobName} + PROPERTIES('offset' = '${alterOffsetJson}') + """ + sql "RESUME JOB where jobname = '${jobName}'" + // after alter to specific 
binlog position, data inserted after that position should sync + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def result = sql """SELECT count(*) FROM ${currentDb}.${table1} WHERE id IN (20, 21)""" + return result[0][0] >= 2 + }) + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + // ===== Test 3b: ALTER with named mode should fail for CDC ===== + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + Awaitility.await().atMost(60, SECONDS).pollInterval(2, SECONDS).until({ + def jobStatus = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + return jobStatus.size() == 1 && jobStatus[0][0] == "RUNNING" + }) + sql "PAUSE JOB where jobname = '${jobName}'" + Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({ + def jobStatus = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + return jobStatus[0][0] == "PAUSED" + }) + test { + sql """ALTER JOB ${jobName} + PROPERTIES('offset' = 'initial') + """ + exception "ALTER JOB for CDC only supports JSON specific offset" + } + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + // ===== Test 4: invalid offset format ===== + test { + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = 
"${table1}", + "offset" = "not_valid_offset" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + exception "Invalid value for key 'offset'" + } + + // ===== Test 5: JSON binlog offset, verify data synced ===== + // Get current binlog position, insert data after it, then create job from that position + def binlogFile = "" + def binlogPos = "" + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + def masterStatus = sql """SHOW MASTER STATUS""" + binlogFile = masterStatus[0][0] + binlogPos = masterStatus[0][1].toString() + log.info("Current binlog position: file=${binlogFile}, pos=${binlogPos}") + // insert data after this binlog position + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (10, 'specific1')""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (11, 'specific2')""" + } + def offsetJson = """{"file":"${binlogFile}","pos":"${binlogPos}"}""" + log.info("Using JSON offset: ${offsetJson}") + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = '${offsetJson}' + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def result = sql """SELECT count(*) FROM ${currentDb}.${table1}""" + return result[0][0] >= 2 + }) + def specificRows = sql """SELECT * FROM ${currentDb}.${table1} WHERE id IN (10, 11) order by id""" + log.info("specificRows: " + specificRows) + assert specificRows.size() == 2 + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + // cleanup MySQL source table + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + 
sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}""" + } + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset_restart_fe.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset_restart_fe.groovy new file mode 100644 index 00000000000000..9b89d43334fa0f --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset_restart_fe.groovy @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_streaming_mysql_job_special_offset_restart_fe", "docker,mysql,external_docker,external_docker_mysql,nondatalake") { + def jobName = "test_streaming_mysql_job_special_offset_restart_fe" + def options = new ClusterOptions() + options.setFeNum(1) + // run in cloud and not cloud + options.cloudMode = null + + docker(options) { + def currentDb = (sql "select database()")[0][0] + def table1 = "special_offset_restart_tbl" + def mysqlDb = "test_cdc_db" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + // prepare MySQL source table and get binlog position + def binlogFile = "" + def binlogPos = "" + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}""" + sql """CREATE TABLE ${mysqlDb}.${table1} ( + `id` int NOT NULL, + `name` varchar(100) DEFAULT NULL, + PRIMARY KEY (`id`) + ) ENGINE=InnoDB""" + // get current binlog position + def masterStatus = sql """SHOW MASTER STATUS""" + binlogFile = masterStatus[0][0] + binlogPos = masterStatus[0][1].toString() + log.info("Binlog position: file=${binlogFile}, pos=${binlogPos}") + // insert data after this position + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (1, 'alice'), (2, 'bob')""" + } + + // create job with 
JSON binlog offset + def offsetJson = """{"file":"${binlogFile}","pos":"${binlogPos}"}""" + log.info("Creating job with offset: ${offsetJson}") + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = '${offsetJson}' + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + // wait for data synced + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("jobSuccendCount: " + jobSuccendCount) + jobSuccendCount.size() == 1 && '2' <= jobSuccendCount.get(0).get(0) + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex; + } + + def jobInfoBefore = sql """ + select loadStatistic, status, currentOffset from jobs("type"="insert") where Name='${jobName}' + """ + log.info("jobInfoBefore: " + jobInfoBefore) + assert jobInfoBefore.get(0).get(1) == "RUNNING" + + // Restart FE + cluster.restartFrontends() + sleep(60000) + context.reconnectFe() + + // check job is consistent after restart + def jobInfoAfter = sql """ + select loadStatistic, status, currentOffset from jobs("type"="insert") where Name='${jobName}' + """ + log.info("jobInfoAfter: " + jobInfoAfter) + assert jobInfoAfter.get(0).get(1) == "RUNNING" + assert jobInfoAfter.get(0).get(2) == jobInfoBefore.get(0).get(2) + + // insert more data and verify job still works after restart + connect("root", 
"123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (3, 'charlie')""" + } + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def result = sql """SELECT count(*) FROM ${currentDb}.${table1}""" + return result[0][0] >= 3 + }) + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}""" + } + } + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy new file mode 100644 index 00000000000000..2dc84e6648c984 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

// Exercises the special offset modes of a streaming CDC job sourced from
// PostgreSQL: initial / latest named modes, rejection of earliest and of
// malformed offsets, rejection of named modes in ALTER JOB, and resuming
// from an explicit JSON LSN offset.
suite("test_streaming_postgres_job_special_offset", "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
    def jobName = "test_streaming_pg_job_special_offset"
    def currentDb = (sql "select database()")[0][0]
    def table1 = "special_offset_pg_tbl"
    def pgDB = "postgres"
    def pgSchema = "cdc_test"
    def pgUser = "postgres"
    def pgPassword = "123456"

    // Start from a clean slate: no leftover job or target table.
    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
    sql """drop table if exists ${currentDb}.${table1} force"""

    String enabled = context.config.otherConfigs.get("enableJdbcTest")
    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        String pg_port = context.config.otherConfigs.get("pg_14_port");
        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
        String s3_endpoint = getS3Endpoint()
        String bucket = getS3BucketName()
        String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"

        // prepare PG source table with two snapshot rows
        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
            sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
                "id" int,
                "name" varchar(100),
                PRIMARY KEY ("id")
            )"""
            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 'alice'), (2, 'bob')"""
        }

        // ===== Test 1: offset = initial, verify data synced =====
        sql """CREATE JOB ${jobName}
            ON STREAMING
            FROM POSTGRES (
                "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
                "driver_url" = "${driver_url}",
                "driver_class" = "org.postgresql.Driver",
                "user" = "${pgUser}",
                "password" = "${pgPassword}",
                "database" = "${pgDB}",
                "schema" = "${pgSchema}",
                "include_tables" = "${table1}",
                "offset" = "initial"
            )
            TO DATABASE ${currentDb} (
                "table.create.properties.replication_num" = "1"
            )
        """
        // initial mode takes a snapshot, so both seed rows must arrive
        Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({
            def result = sql """SELECT count(*) FROM ${currentDb}.${table1}"""
            return result[0][0] >= 2
        })
        def rows = sql """SELECT * FROM ${currentDb}.${table1} order by id"""
        assert rows.size() == 2
        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
        sql """drop table if exists ${currentDb}.${table1} force"""

        // ===== Test 2: offset = latest, then insert new data, verify synced =====
        sql """CREATE JOB ${jobName}
            ON STREAMING
            FROM POSTGRES (
                "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
                "driver_url" = "${driver_url}",
                "driver_class" = "org.postgresql.Driver",
                "user" = "${pgUser}",
                "password" = "${pgPassword}",
                "database" = "${pgDB}",
                "schema" = "${pgSchema}",
                "include_tables" = "${table1}",
                "offset" = "latest"
            )
            TO DATABASE ${currentDb} (
                "table.create.properties.replication_num" = "1"
            )
        """
        Awaitility.await().atMost(60, SECONDS).pollInterval(2, SECONDS).until({
            def jobStatus = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
            return jobStatus.size() == 1 && jobStatus[0][0] == "RUNNING"
        })
        // insert new data after job started with latest offset
        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (3, 'charlie')"""
        }
        // latest skips the snapshot, so only the post-start row is expected
        Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({
            def result = sql """SELECT count(*) FROM ${currentDb}.${table1}"""
            return result[0][0] >= 1
        })
        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
        sql """drop table if exists ${currentDb}.${table1} force"""

        // ===== Test 3: ALTER with named mode should fail for CDC =====
        sql """CREATE JOB ${jobName}
            ON STREAMING
            FROM POSTGRES (
                "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
                "driver_url" = "${driver_url}",
                "driver_class" = "org.postgresql.Driver",
                "user" = "${pgUser}",
                "password" = "${pgPassword}",
                "database" = "${pgDB}",
                "schema" = "${pgSchema}",
                "include_tables" = "${table1}",
                "offset" = "initial"
            )
            TO DATABASE ${currentDb} (
                "table.create.properties.replication_num" = "1"
            )
        """
        Awaitility.await().atMost(60, SECONDS).pollInterval(2, SECONDS).until({
            def jobStatus = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
            return jobStatus.size() == 1 && jobStatus[0][0] == "RUNNING"
        })
        // ALTER is only legal on a paused job
        sql "PAUSE JOB where jobname = '${jobName}'"
        Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({
            def jobStatus = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
            return jobStatus[0][0] == "PAUSED"
        })
        test {
            sql """ALTER JOB ${jobName}
                PROPERTIES('offset' = 'initial')
            """
            exception "ALTER JOB for CDC only supports JSON specific offset"
        }
        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
        sql """drop table if exists ${currentDb}.${table1} force"""

        // ===== Test 4: earliest should fail for PG =====
        test {
            sql """CREATE JOB ${jobName}
                ON STREAMING
                FROM POSTGRES (
                    "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
                    "driver_url" = "${driver_url}",
                    "driver_class" = "org.postgresql.Driver",
                    "user" = "${pgUser}",
                    "password" = "${pgPassword}",
                    "database" = "${pgDB}",
                    "schema" = "${pgSchema}",
                    "include_tables" = "${table1}",
                    "offset" = "earliest"
                )
                TO DATABASE ${currentDb} (
                    "table.create.properties.replication_num" = "1"
                )
            """
            exception "Invalid value for key 'offset'"
        }

        // ===== Test 5: invalid offset format =====
        test {
            sql """CREATE JOB ${jobName}
                ON STREAMING
                FROM POSTGRES (
                    "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
                    "driver_url" = "${driver_url}",
                    "driver_class" = "org.postgresql.Driver",
                    "user" = "${pgUser}",
                    "password" = "${pgPassword}",
                    "database" = "${pgDB}",
                    "schema" = "${pgSchema}",
                    "include_tables" = "${table1}",
                    "offset" = "not_valid"
                )
                TO DATABASE ${currentDb} (
                    "table.create.properties.replication_num" = "1"
                )
            """
            exception "Invalid value for key 'offset'"
        }

        // ===== Test 6: JSON LSN offset via ALTER, verify data synced =====
        // Step 1: Create job with initial to establish replication slot and sync snapshot
        sql """CREATE JOB ${jobName}
            ON STREAMING
            FROM POSTGRES (
                "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
                "driver_url" = "${driver_url}",
                "driver_class" = "org.postgresql.Driver",
                "user" = "${pgUser}",
                "password" = "${pgPassword}",
                "database" = "${pgDB}",
                "schema" = "${pgSchema}",
                "include_tables" = "${table1}",
                "offset" = "initial"
            )
            TO DATABASE ${currentDb} (
                "table.create.properties.replication_num" = "1"
            )
        """
        Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({
            def result = sql """SELECT count(*) FROM ${currentDb}.${table1}"""
            return result[0][0] >= 2
        })

        // Step 2: Get current WAL LSN, then insert new data
        def currentLsn = ""
        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
            def lsnResult = sql """SELECT pg_current_wal_lsn()::text"""
            def lsnStr = lsnResult[0][0].toString()
            // Convert PG LSN format (e.g. "0/1A3B4C0") to numeric: high/low are hex halves
            def parts = lsnStr.split("/")
            def high = Long.parseLong(parts[0], 16)
            def low = Long.parseLong(parts[1], 16)
            currentLsn = String.valueOf((high << 32) + low)
            log.info("Current WAL LSN: ${lsnStr} -> numeric: ${currentLsn}")
            // insert new data after this LSN
            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (20, 'lsn_test1')"""
            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (21, 'lsn_test2')"""
        }

        // Step 3: PAUSE -> ALTER with JSON LSN -> RESUME
        sql "PAUSE JOB where jobname = '${jobName}'"
        Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({
            def jobStatus = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
            return jobStatus[0][0] == "PAUSED"
        })
        def lsnOffsetJson = """{"lsn":"${currentLsn}"}"""
        log.info("Using JSON LSN offset: ${lsnOffsetJson}")
        sql """ALTER JOB ${jobName}
            PROPERTIES('offset' = '${lsnOffsetJson}')
        """
        sql "RESUME JOB where jobname = '${jobName}'"

        // Step 4: Verify new data synced from the explicit LSN onward
        Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({
            def result = sql """SELECT count(*) FROM ${currentDb}.${table1} WHERE id IN (20, 21)"""
            return result[0][0] >= 2
        })
        def lsnRows = sql """SELECT * FROM ${currentDb}.${table1} WHERE id IN (20, 21) order by id"""
        log.info("lsnRows: " + lsnRows)
        assert lsnRows.size() == 2
        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
        sql """drop table if exists ${currentDb}.${table1} force"""

        // cleanup PG source table
        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
        }
    }
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.


import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

/**
 * Test ALTER JOB with JSON binlog offset for cdc_stream TVF path.
 *
 * Scenario:
 *   1. Create job with initial offset, wait for snapshot sync.
 *   2. Get current binlog position, insert new data.
 *   3. PAUSE -> ALTER with JSON binlog offset via PROPERTIES -> RESUME.
 *   4. Verify new data synced.
 */
suite("test_streaming_job_cdc_stream_mysql_alter_offset", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
    def jobName = "test_streaming_job_cdc_stream_mysql_alter_offset_name"
    def currentDb = (sql "select database()")[0][0]
    def dorisTable = "cdc_stream_alter_offset_tbl"
    def mysqlDb = "test_cdc_db"
    def mysqlTable = "cdc_stream_alter_offset_src"

    // Start clean: remove any leftover job/target table from earlier runs.
    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
    sql """drop table if exists ${currentDb}.${dorisTable} force"""

    // Target table is created up front here (TVF path inserts into an
    // existing table, unlike the TO DATABASE auto-create path).
    sql """
        CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
            `id` int NULL,
            `name` varchar(200) NULL
        ) ENGINE=OLAP
        DUPLICATE KEY(`id`)
        DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
        PROPERTIES ("replication_allocation" = "tag.location.default: 1")
    """

    String enabled = context.config.otherConfigs.get("enableJdbcTest")
    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        String mysql_port = context.config.otherConfigs.get("mysql_57_port")
        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
        String s3_endpoint = getS3Endpoint()
        String bucket = getS3BucketName()
        String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"

        // prepare source table with snapshot data
        connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
            sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlTable}"""
            sql """CREATE TABLE ${mysqlDb}.${mysqlTable} (
                `id` int NOT NULL,
                `name` varchar(200) DEFAULT NULL,
                PRIMARY KEY (`id`)
            ) ENGINE=InnoDB"""
            sql """INSERT INTO ${mysqlDb}.${mysqlTable} VALUES (1, 'alice'), (2, 'bob')"""
        }

        // Step 1: Create job with initial offset via cdc_stream TVF
        sql """
            CREATE JOB ${jobName}
            ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (id, name)
            SELECT id, name FROM cdc_stream(
                "type" = "mysql",
                "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
                "driver_url" = "${driver_url}",
                "driver_class" = "com.mysql.cj.jdbc.Driver",
                "user" = "root",
                "password" = "123456",
                "database" = "${mysqlDb}",
                "table" = "${mysqlTable}",
                "offset" = "initial"
            )
        """

        // wait for snapshot sync; on timeout dump job/task state before rethrowing
        try {
            Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({
                def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'"""
                log.info("SucceedTaskCount: " + cnt)
                cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 1
            })
        } catch (Exception ex) {
            log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'"""))
            log.info("tasks: " + (sql """select * from tasks("type"="insert") where JobName='${jobName}'"""))
            throw ex
        }

        // verify snapshot data
        Awaitility.await().atMost(60, SECONDS).pollInterval(2, SECONDS).until({
            def result = sql """SELECT count(*) FROM ${currentDb}.${dorisTable}"""
            return result[0][0] >= 2
        })

        // Step 2: Get current binlog position and insert new data
        def binlogFile = ""
        def binlogPos = ""
        connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
            def masterStatus = sql """SHOW MASTER STATUS"""
            binlogFile = masterStatus[0][0]
            binlogPos = masterStatus[0][1].toString()
            log.info("Binlog position for ALTER: file=${binlogFile}, pos=${binlogPos}")
            sql """INSERT INTO ${mysqlDb}.${mysqlTable} VALUES (10, 'alter_tvf_1')"""
            sql """INSERT INTO ${mysqlDb}.${mysqlTable} VALUES (11, 'alter_tvf_2')"""
        }

        // Step 3: PAUSE -> ALTER with JSON binlog offset -> RESUME
        sql "PAUSE JOB where jobname = '${jobName}'"
        Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({
            def jobStatus = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
            return jobStatus[0][0] == "PAUSED"
        })
        def offsetJson = """{"file":"${binlogFile}","pos":"${binlogPos}"}"""
        log.info("ALTER TVF job offset: ${offsetJson}")
        sql """ALTER JOB ${jobName}
            PROPERTIES('offset' = '${offsetJson}')
        """

        // verify currentOffset changed in show jobs
        def jobInfo = sql """select currentOffset from jobs("type"="insert") where Name='${jobName}'"""
        log.info("currentOffset after ALTER: " + jobInfo[0][0])

        sql "RESUME JOB where jobname = '${jobName}'"

        // Step 4: Verify new data synced
        Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({
            def result = sql """SELECT count(*) FROM ${currentDb}.${dorisTable} WHERE id IN (10, 11)"""
            return result[0][0] >= 2
        })
        def alterRows = sql """SELECT * FROM ${currentDb}.${dorisTable} WHERE id IN (10, 11) order by id"""
        log.info("alterRows: " + alterRows)
        assert alterRows.size() == 2

        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
        sql """drop table if exists ${currentDb}.${dorisTable} force"""

        // cleanup MySQL source table
        connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
            sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlTable}"""
        }
    }
}