+ implements SeaTunnelSource<SeaTunnelRow, TiDBSourceSplit, TiDBSourceCheckpointState>,
+ SupportParallelism,
+ SupportColumnProjection {
+
+ static final String IDENTIFIER = "TIDB-CDC";
+
+ private TiDBSourceConfig config;
+ private final CatalogTable catalogTable;
+
+ public TiDBSource(ReadonlyConfig config, CatalogTable catalogTable) {
+
+ this.config =
+ TiDBSourceConfig.builder()
+ .startupMode(config.get(TiDBSourceOptions.STARTUP_MODE))
+ .databaseName(config.get(TiDBSourceOptions.DATABASE_NAME))
+ .tableName(config.get(TiDBSourceOptions.TABLE_NAME))
+ .tiConfiguration(TiDBSourceOptions.getTiConfiguration(config))
+ .build();
+ this.catalogTable = catalogTable;
+ }
+
+ /**
+ * Returns a unique identifier among same factory interfaces.
+ *
+ * For consistency, an identifier should be declared as one lower case word (e.g. {@code
+ * kafka}). If multiple factories exist for different versions, a version should be appended
+ * using "-" (e.g. {@code elasticsearch-7}).
+ */
+ @Override
+ public String getPluginName() {
+ return IDENTIFIER;
+ }
+
+ /**
+ * Get the boundedness of this source.
+ *
+ * @return the boundedness of this source.
+ */
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.UNBOUNDED;
+ }
+
+ /**
+ * Create source reader, used to produce data.
+ *
+ * @param context reader context.
+ * @return source reader.
+ * @throws Exception when create reader failed.
+ */
+ @Override
+ public SourceReader<SeaTunnelRow, TiDBSourceSplit> createReader(SourceReader.Context context)
+ throws Exception {
+ return new TiDBSourceReader(context, config, catalogTable);
+ }
+
+ /**
+ * Create source split enumerator, used to generate splits. This method will be called only once
+ * when start a source.
+ *
+ * @param context enumerator context.
+ * @return source split enumerator.
+ * @throws Exception when create enumerator failed.
+ */
+ @Override
+ public SourceSplitEnumerator<TiDBSourceSplit, TiDBSourceCheckpointState> createEnumerator(
+ SourceSplitEnumerator.Context<TiDBSourceSplit> context) throws Exception {
+ return new TiDBSourceSplitEnumerator(context, config);
+ }
+
+ /**
+ * Create source split enumerator, used to generate splits. This method will be called when
+ * restore from checkpoint.
+ *
+ * @param context enumerator context.
+ * @param checkpointState checkpoint state.
+ * @return source split enumerator.
+ * @throws Exception when create enumerator failed.
+ */
+ @Override
+ public SourceSplitEnumerator<TiDBSourceSplit, TiDBSourceCheckpointState> restoreEnumerator(
+ SourceSplitEnumerator.Context<TiDBSourceSplit> context,
+ TiDBSourceCheckpointState checkpointState)
+ throws Exception {
+ return new TiDBSourceSplitEnumerator(context, config, checkpointState);
+ }
+}
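Taken together, `TiDBSource` is little more than a holder for the parsed `TiDBSourceConfig` and the resolved `CatalogTable`; all stateful work lives in the reader and enumerator it creates. A minimal sketch of wiring it up by hand (assuming `ReadonlyConfig.fromMap` from the SeaTunnel API, with a `catalogTable` resolved elsewhere, as the factory below does):

```java
// Sketch only: catalogTable is assumed to come from a catalog lookup,
// as TiDBSourceFactory does below; option values are illustrative.
Map<String, Object> options = new HashMap<>();
options.put("database-name", "test");
options.put("table-name", "orders");
options.put("pd-addresses", "127.0.0.1:2379");

TiDBSource source = new TiDBSource(ReadonlyConfig.fromMap(options), catalogTable);
assert source.getBoundedness() == Boundedness.UNBOUNDED; // CDC sources are unbounded streams
assert "TIDB-CDC".equals(source.getPluginName());
```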
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java
new file mode 100644
index 000000000000..1660dd3e0f97
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalog;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalogFactory;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+@AutoService(Factory.class)
+public class TiDBSourceFactory implements TableSourceFactory {
+ /**
+ * Returns a unique identifier among same factory interfaces.
+ *
+ * For consistency, an identifier should be declared as one lower case word (e.g. {@code
+ * kafka}). If multiple factories exist for different versions, a version should be appended
+ * using "-" (e.g. {@code elasticsearch-7}).
+ */
+ @Override
+ public String factoryIdentifier() {
+ return TiDBSource.IDENTIFIER;
+ }
+
+ /**
+ * Returns the rule for options.
+ *
+ * <p>1. Used to verify whether the parameters configured by the user conform to the rules of
+ * the options;
+ *
+ * <p>2. Used for Web-UI to prompt the user to configure option values;
+ */
+ @Override
+ public OptionRule optionRule() {
+ return TiDBSourceOptions.getBaseRule()
+ .required(
+ TiDBSourceOptions.DATABASE_NAME,
+ TiDBSourceOptions.TABLE_NAME,
+ TiDBSourceOptions.PD_ADDRESSES)
+ .optional(
+ TiDBSourceOptions.TIKV_BATCH_GET_CONCURRENCY,
+ TiDBSourceOptions.TIKV_BATCH_SCAN_CONCURRENCY,
+ TiDBSourceOptions.TIKV_GRPC_SCAN_TIMEOUT,
+ TiDBSourceOptions.TIKV_GRPC_TIMEOUT,
+ TiDBSourceOptions.STARTUP_MODE)
+ .build();
+ }
+
+ /**
+ * TODO: Implement SupportParallelism in the TableSourceFactory instead of the SeaTunnelSource,
+ * Then deprecated the method
+ */
+ @Override
+ public Class<? extends SeaTunnelSource> getSourceClass() {
+ return TiDBSource.class;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+ return () -> {
+ ReadonlyConfig config = context.getOptions();
+ TiDBCatalogFactory catalogFactory = new TiDBCatalogFactory();
+ // Build tidb catalog.
+ TiDBCatalog catalog =
+ (TiDBCatalog) catalogFactory.createCatalog(factoryIdentifier(), config);
+
+ TablePath tablePath =
+ TablePath.of(
+ config.get(TiDBSourceOptions.DATABASE_NAME),
+ config.get(TiDBSourceOptions.TABLE_NAME));
+ CatalogTable catalogTable = catalog.getTable(tablePath);
+ return (SeaTunnelSource<T, SplitT, StateT>)
+ new TiDBSource(config, catalogTable);
+ };
+ }
+}
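Note that `createSource` returns a lambda, so the `TiDBCatalog` is only opened and the table only resolved when the engine finally calls `TableSource.createSource()`. A hedged sketch of that call path (the two-argument `TableSourceFactoryContext` constructor is assumed here):

```java
TiDBSourceFactory factory = new TiDBSourceFactory();
TableSourceFactoryContext ctx =
        new TableSourceFactoryContext(config, Thread.currentThread().getContextClassLoader());

// Cheap: only captures the options; no catalog connection is made yet.
TableSource<SeaTunnelRow, TiDBSourceSplit, TiDBSourceCheckpointState> tableSource =
        factory.createSource(ctx);

// The expensive part happens here: the TiDB catalog is opened and the table resolved.
SeaTunnelSource<SeaTunnelRow, TiDBSourceSplit, TiDBSourceCheckpointState> source =
        tableSource.createSource();
```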
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceConfig.java
new file mode 100644
index 000000000000..faa174ae2ca1
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config;
+
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+
+import org.tikv.common.TiConfiguration;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@EqualsAndHashCode
+public class TiDBSourceConfig {
+ private String databaseName;
+ private String tableName;
+ private StartupMode startupMode;
+ private TiConfiguration tiConfiguration;
+}
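The class is pure Lombok, so the `TiDBSourceConfig.builder()` chain used in `TiDBSource`'s constructor is generated code. A usage sketch (the PD address is illustrative):

```java
TiDBSourceConfig config =
        TiDBSourceConfig.builder()
                .databaseName("test")
                .tableName("orders")
                .startupMode(StartupMode.INITIAL)
                .tiConfiguration(TiConfiguration.createDefault("127.0.0.1:2379"))
                .build();
```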
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceOptions.java
new file mode 100644
index 000000000000..5a0646a14aaa
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceOptions.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config;
+
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.SingleChoiceOption;
+import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+
+import org.tikv.common.ConfigUtils;
+import org.tikv.common.TiConfiguration;
+
+import java.util.Arrays;
+
+/** TiDB source options */
+public class TiDBSourceOptions extends JdbcSourceOptions {
+
+ public static final SingleChoiceOption<String> DATABASE_NAME =
+ (SingleChoiceOption<String>)
+ Options.key("database-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Database name of the TiDB server to monitor.");
+
+ public static final SingleChoiceOption<String> TABLE_NAME =
+ (SingleChoiceOption<String>)
+ Options.key("table-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Table name of the database to monitor.");
+
+ public static final SingleChoiceOption<StartupMode> STARTUP_MODE =
+ Options.key(SourceOptions.STARTUP_MODE_KEY)
+ .singleChoice(
+ StartupMode.class,
+ Arrays.asList(StartupMode.INITIAL, StartupMode.SPECIFIC))
+ .defaultValue(StartupMode.INITIAL)
+ .withDescription(
+ "Optional startup mode for the CDC source, valid enumerations are "
+ + "\"initial\" or \"specific\".");
+
+ public static final SingleChoiceOption<String> PD_ADDRESSES =
+ (SingleChoiceOption<String>)
+ Options.key("pd-addresses")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("TiKV cluster's PD address");
+
+ public static final SingleChoiceOption<Long> TIKV_GRPC_TIMEOUT =
+ (SingleChoiceOption<Long>)
+ Options.key(ConfigUtils.TIKV_GRPC_TIMEOUT)
+ .longType()
+ .noDefaultValue()
+ .withDescription("TiKV GRPC timeout in ms");
+
+ public static final SingleChoiceOption<Long> TIKV_GRPC_SCAN_TIMEOUT =
+ (SingleChoiceOption<Long>)
+ Options.key(ConfigUtils.TIKV_GRPC_SCAN_TIMEOUT)
+ .longType()
+ .noDefaultValue()
+ .withDescription("TiKV GRPC scan timeout in ms");
+
+ public static final SingleChoiceOption<Integer> TIKV_BATCH_GET_CONCURRENCY =
+ (SingleChoiceOption<Integer>)
+ Options.key(ConfigUtils.TIKV_BATCH_GET_CONCURRENCY)
+ .intType()
+ .noDefaultValue()
+ .withDescription("TiKV GRPC batch get concurrency");
+
+ public static final SingleChoiceOption<Integer> TIKV_BATCH_SCAN_CONCURRENCY =
+ (SingleChoiceOption<Integer>)
+ Options.key(ConfigUtils.TIKV_BATCH_SCAN_CONCURRENCY)
+ .intType()
+ .noDefaultValue()
+ .withDescription("TiKV GRPC batch scan concurrency");
+
+ public static TiConfiguration getTiConfiguration(final ReadonlyConfig configuration) {
+ final String pdAddrsStr = configuration.get(PD_ADDRESSES);
+ final TiConfiguration tiConf = TiConfiguration.createDefault(pdAddrsStr);
+ configuration.getOptional(TIKV_GRPC_TIMEOUT).ifPresent(tiConf::setTimeout);
+ configuration.getOptional(TIKV_GRPC_SCAN_TIMEOUT).ifPresent(tiConf::setScanTimeout);
+ configuration
+ .getOptional(TIKV_BATCH_GET_CONCURRENCY)
+ .ifPresent(tiConf::setBatchGetConcurrency);
+
+ configuration
+ .getOptional(TIKV_BATCH_SCAN_CONCURRENCY)
+ .ifPresent(tiConf::setBatchScanConcurrency);
+ return tiConf;
+ }
+}
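To see how these options reach the TiKV client, here is a small sketch feeding an in-memory map through `getTiConfiguration` (assuming `ReadonlyConfig.fromMap` from the SeaTunnel API; keys and values are illustrative):

```java
Map<String, Object> raw = new HashMap<>();
raw.put("pd-addresses", "127.0.0.1:2379");
raw.put(ConfigUtils.TIKV_GRPC_TIMEOUT, 60000L); // optional tuning knob

TiConfiguration tiConf =
        TiDBSourceOptions.getTiConfiguration(ReadonlyConfig.fromMap(raw));
// tiConf now targets the given PD endpoint with a 60s gRPC timeout;
// options left unset keep the client-java defaults.
```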
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DataConverter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DataConverter.java
new file mode 100644
index 000000000000..1f1208917c6f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DataConverter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.converter;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.tikv.common.meta.TiTableInfo;
+
+public interface DataConverter<T> {
+
+ SeaTunnelRow convert(T object, TiTableInfo tableInfo, SeaTunnelRowType rowType)
+ throws Exception;
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DefaultDataConverter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DefaultDataConverter.java
new file mode 100644
index 000000000000..733bb50a280b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DefaultDataConverter.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.converter;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+
+import org.tikv.common.meta.TiTableInfo;
+import org.tikv.common.types.DataType;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DefaultDataConverter implements DataConverter<Object[]> {
+
+ @Override
+ public SeaTunnelRow convert(Object[] value, TiTableInfo tableInfo, SeaTunnelRowType rowType)
+ throws Exception {
+ Object[] fields = new Object[rowType.getTotalFields()];
+ Map<String, DataType> fromTypes = new HashMap<>();
+ tableInfo
+ .getColumns()
+ .forEach(
+ tiColumnInfo -> {
+ fromTypes.put(tiColumnInfo.getName(), tiColumnInfo.getType());
+ });
+ for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) {
+ SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex);
+ String fieldName = rowType.getFieldName(fieldIndex);
+ // decodeObjects yields one value per table column, in declaration order
+ Object field = value[fieldIndex];
+ switch (seaTunnelDataType.getSqlType()) {
+ case NULL:
+ fields[fieldIndex] = null;
+ break;
+ case STRING:
+ fields[fieldIndex] = convertToString(field);
+ break;
+ case BOOLEAN:
+ fields[fieldIndex] = convertToBoolean(field);
+ break;
+ case TINYINT:
+ fields[fieldIndex] = Byte.parseByte(field.toString());
+ break;
+ case SMALLINT:
+ fields[fieldIndex] = Short.parseShort(field.toString());
+ break;
+ case INT:
+ fields[fieldIndex] = convertToInt(field, fromTypes.get(fieldName));
+ break;
+ case BIGINT:
+ fields[fieldIndex] = convertToLong(field);
+ break;
+ case FLOAT:
+ fields[fieldIndex] = convertToFloat(field);
+ break;
+ case DOUBLE:
+ fields[fieldIndex] = convertToDouble(field);
+ break;
+ case DECIMAL:
+ fields[fieldIndex] = convertToDecimal(field);
+ break;
+ case DATE:
+ fields[fieldIndex] = LocalDate.class.cast(field);
+ break;
+ case TIME:
+ fields[fieldIndex] = LocalTime.class.cast(field);
+ break;
+ case TIMESTAMP:
+ fields[fieldIndex] = LocalDateTime.class.cast(field);
+ break;
+ case BYTES:
+ fields[fieldIndex] = convertToBinary(field);
+ break;
+ case ARRAY:
+ fields[fieldIndex] = convertToArray(field);
+ break;
+ case MAP:
+ case ROW:
+ default:
+ throw CommonError.unsupportedDataType(
+ "SeaTunnel", seaTunnelDataType.getSqlType().toString(), fieldName);
+ }
+ }
+ return new SeaTunnelRow(fields);
+ }
+
+ private static Object convertToBoolean(Object object) {
+ if (object instanceof Boolean) {
+ return object;
+ } else if (object instanceof Long) {
+ return (Long) object == 1;
+ } else if (object instanceof Byte) {
+ return (byte) object == 1;
+ } else if (object instanceof Short) {
+ return (short) object == 1;
+ } else {
+ return Boolean.parseBoolean(object.toString());
+ }
+ }
+
+ private static Object convertToInt(Object value, org.tikv.common.types.DataType dataType) {
+ if (value instanceof Integer) {
+ return value;
+ } else if (value instanceof Long) {
+ return dataType.isUnsigned()
+ ? Integer.valueOf(Short.toUnsignedInt(((Long) value).shortValue()))
+ : ((Long) value).intValue();
+ } else {
+ return Integer.parseInt(value.toString());
+ }
+ }
+
+ private static Object convertToLong(Object value) {
+ if (value instanceof Integer) {
+ return ((Integer) value).longValue();
+ } else if (value instanceof Long) {
+ return value;
+ } else {
+ return Long.parseLong(value.toString());
+ }
+ }
+
+ private static Object convertToDouble(Object value) {
+ if (value instanceof Float) {
+ return ((Float) value).doubleValue();
+ } else if (value instanceof Double) {
+ return value;
+ } else {
+ return Double.parseDouble(value.toString());
+ }
+ }
+
+ private static Object convertToFloat(Object value) {
+ if (value instanceof Float) {
+ return value;
+ } else if (value instanceof Double) {
+ return ((Double) value).floatValue();
+ } else {
+ return Float.parseFloat(value.toString());
+ }
+ }
+
+ private static Object convertToDecimal(Object value) {
+ BigDecimal result;
+ if (value instanceof String) {
+ result = new BigDecimal((String) value);
+ } else if (value instanceof Long) {
+ // casting a Long to String would throw ClassCastException; use valueOf instead
+ result = BigDecimal.valueOf((Long) value);
+ } else if (value instanceof Double) {
+ result = BigDecimal.valueOf((Double) value);
+ } else if (value instanceof BigDecimal) {
+ result = (BigDecimal) value;
+ } else {
+ throw new IllegalArgumentException(
+ "Unable to convert to decimal from unexpected value '"
+ + value
+ + "' of type "
+ + value.getClass());
+ }
+ return result;
+ }
+
+ public Object[] convertToArray(Object value) throws SQLException {
+ if (value == null) {
+ return null;
+ }
+ // String.split never returns null, so guard the input instead of the result
+ return ((String) value).split(",");
+ }
+
+ private static Object convertToBinary(Object value) {
+ if (value instanceof byte[]) {
+ return value;
+ } else if (value instanceof String) {
+ return ((String) value).getBytes();
+ } else if (value instanceof ByteBuffer) {
+ ByteBuffer byteBuffer = (ByteBuffer) value;
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ return bytes;
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported BYTES value type: " + value.getClass().getSimpleName());
+ }
+ }
+
+ private static Object convertToString(Object value) {
+ if (value instanceof byte[]) {
+ return new String((byte[]) value);
+ }
+ return value;
+ }
+}
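A worked sketch of the converter on a two-column row (hypothetical inputs; a real `tableInfo` would come from `session.getCatalog().getTable(...)` as in the reader):

```java
SeaTunnelRowType rowType =
        new SeaTunnelRowType(
                new String[] {"id", "name"},
                new SeaTunnelDataType<?>[] {BasicType.LONG_TYPE, BasicType.STRING_TYPE});
// decodeObjects yields one value per column; TiKV hands strings back as bytes.
Object[] decoded = new Object[] {1L, "tidb".getBytes()};

SeaTunnelRow row = new DefaultDataConverter().convert(decoded, tableInfo, rowType);
// row.getField(1) is now the String "tidb" rather than a raw byte[].
```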
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/AbstractSeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/AbstractSeaTunnelRowDeserializer.java
new file mode 100644
index 000000000000..77654239d1e5
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/AbstractSeaTunnelRowDeserializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.tikv.common.meta.TiTableInfo;
+
+public abstract class AbstractSeaTunnelRowDeserializer<Input> {
+ protected final TiTableInfo tableInfo;
+ protected final SeaTunnelRowType rowType;
+ protected final CatalogTable catalogTable;
+
+ protected AbstractSeaTunnelRowDeserializer(TiTableInfo tableInfo, CatalogTable catalogTable) {
+ this.tableInfo = tableInfo;
+ this.rowType = catalogTable.getTableSchema().toPhysicalRowDataType();
+ this.catalogTable = catalogTable;
+ }
+
+ abstract void deserialize(Input record, Collector<SeaTunnelRow> output) throws Exception;
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/SeaTunnelRowSnapshotRecordDeserializer.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/SeaTunnelRowSnapshotRecordDeserializer.java
new file mode 100644
index 000000000000..06616b6c4ff6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/SeaTunnelRowSnapshotRecordDeserializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.converter.DataConverter;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.converter.DefaultDataConverter;
+
+import org.tikv.common.key.RowKey;
+import org.tikv.common.meta.TiTableInfo;
+import org.tikv.kvproto.Kvrpcpb;
+
+import static org.tikv.common.codec.TableCodec.decodeObjects;
+
+/** Deserialize snapshot data */
+public class SeaTunnelRowSnapshotRecordDeserializer
+ extends AbstractSeaTunnelRowDeserializer<Kvrpcpb.KvPair> {
+
+ private final DataConverter<Object[]> converter;
+
+ public SeaTunnelRowSnapshotRecordDeserializer(
+ TiTableInfo tableInfo, CatalogTable catalogTable) {
+ super(tableInfo, catalogTable);
+ this.converter = new DefaultDataConverter();
+ }
+
+ @Override
+ public void deserialize(Kvrpcpb.KvPair record, Collector<SeaTunnelRow> output)
+ throws Exception {
+ Object[] values =
+ decodeObjects(
+ record.getValue().toByteArray(),
+ RowKey.decode(record.getKey().toByteArray()).getHandle(),
+ tableInfo);
+ SeaTunnelRow row = converter.convert(values, tableInfo, rowType);
+ output.collect(row);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/SeaTunnelRowStreamingRecordDeserializer.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/SeaTunnelRowStreamingRecordDeserializer.java
new file mode 100644
index 000000000000..5e286e591e27
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/SeaTunnelRowStreamingRecordDeserializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.converter.DataConverter;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.converter.DefaultDataConverter;
+
+import org.tikv.common.key.RowKey;
+import org.tikv.common.meta.TiTableInfo;
+import org.tikv.kvproto.Cdcpb;
+
+import static org.tikv.common.codec.TableCodec.decodeObjects;
+
+public class SeaTunnelRowStreamingRecordDeserializer
+ extends AbstractSeaTunnelRowDeserializer<Cdcpb.Event.Row> {
+
+ private final DataConverter<Object[]> converter;
+
+ public SeaTunnelRowStreamingRecordDeserializer(
+ TiTableInfo tableInfo, CatalogTable catalogTable) {
+ super(tableInfo, catalogTable);
+ converter = new DefaultDataConverter();
+ }
+
+ @Override
+ public void deserialize(Cdcpb.Event.Row row, Collector<SeaTunnelRow> output) throws Exception {
+
+ final RowKey rowKey = RowKey.decode(row.getKey().toByteArray());
+ final long handle = rowKey.getHandle();
+ Object[] values;
+ switch (row.getOpType()) {
+ case DELETE:
+ values = decodeObjects(row.getOldValue().toByteArray(), handle, tableInfo);
+ SeaTunnelRow record = converter.convert(values, tableInfo, rowType);
+ record.setRowKind(RowKind.DELETE);
+ output.collect(record);
+ break;
+ case PUT:
+ try {
+ // reuse the handle decoded above rather than decoding the key a second time
+ values = decodeObjects(row.getValue().toByteArray(), handle, tableInfo);
+ if (row.getOldValue() == null || row.getOldValue().isEmpty()) {
+ SeaTunnelRow insert = converter.convert(values, tableInfo, rowType);
+ insert.setRowKind(RowKind.INSERT);
+ output.collect(insert);
+ } else {
+ SeaTunnelRow update = converter.convert(values, tableInfo, rowType);
+ update.setRowKind(RowKind.UPDATE_AFTER);
+ output.collect(update);
+ }
+ break;
+ } catch (final RuntimeException e) {
+ throw new RuntimeException(
+ String.format(
+ "Fail to deserialize row: %s, table: %s",
+ row, tableInfo.getId()),
+ e);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown Row Op Type: " + row.getOpType());
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceCheckpointState.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceCheckpointState.java
new file mode 100644
index 000000000000..94b1b3a41624
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceCheckpointState.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.enumerator;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split.TiDBSourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import java.io.Serializable;
+import java.util.Map;
+
+@Getter
+@Setter
+@AllArgsConstructor
+@ToString
+public class TiDBSourceCheckpointState implements Serializable {
+ private Map<Integer, TiDBSourceSplit> assignedSplit;
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java
new file mode 100644
index 000000000000..bc6c1b7602d9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.enumerator;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split.TiDBSourceSplit;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.utils.TableKeyRangeUtils;
+
+import org.tikv.common.TiSession;
+import org.tikv.kvproto.Coprocessor;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Slf4j
+public class TiDBSourceSplitEnumerator
+ implements SourceSplitEnumerator<TiDBSourceSplit, TiDBSourceCheckpointState> {
+
+ private final TiDBSourceConfig sourceConfig;
+ private final Map<Integer, TiDBSourceSplit> assignedSplit;
+ private final Map<Integer, TiDBSourceSplit> pendingSplit;
+ private final Context<TiDBSourceSplit> context;
+ private TiSession tiSession;
+ private long tableId;
+
+ public TiDBSourceSplitEnumerator(
+ @NonNull Context<TiDBSourceSplit> context, @NonNull TiDBSourceConfig sourceConfig) {
+ this(context, sourceConfig, null);
+ }
+
+ public TiDBSourceSplitEnumerator(
+ @NonNull Context<TiDBSourceSplit> context,
+ @NonNull TiDBSourceConfig sourceConfig,
+ TiDBSourceCheckpointState restoreState) {
+ this.context = context;
+ this.sourceConfig = sourceConfig;
+ this.assignedSplit = new HashMap<>();
+ this.pendingSplit = new HashMap<>();
+ if (restoreState != null) {
+ this.assignedSplit.putAll(restoreState.getAssignedSplit());
+ }
+ }
+
+ @Override
+ public void open() {
+ this.tiSession = TiSession.create(sourceConfig.getTiConfiguration());
+ this.tableId =
+ this.tiSession
+ .getCatalog()
+ .getTable(sourceConfig.getDatabaseName(), sourceConfig.getTableName())
+ .getId();
+ }
+
+ /** The method is executed by the engine only once. */
+ @Override
+ public void run() throws Exception {
+ Set<Integer> readers = context.registeredReaders();
+ addPendingSplit(getTiDBSourceSplit());
+ fetchAssignedSplit();
+ assignSplit(readers);
+ log.debug(
+ "No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers);
+ readers.forEach(context::signalNoMoreSplits);
+ }
+
+ private void fetchAssignedSplit() {
+ for (Map.Entry<Integer, TiDBSourceSplit> split : pendingSplit.entrySet()) {
+ if (assignedSplit.containsKey(split.getKey())) {
+ // override with the checkpointed split so restored progress is kept
+ pendingSplit.put(split.getKey(), assignedSplit.get(split.getKey()));
+ }
+ }
+ }
+
+ private synchronized void addPendingSplit(List<TiDBSourceSplit> splits) {
+ splits.forEach(
+ split -> {
+ pendingSplit.put(
+ getSplitOwner(split.splitId(), context.currentParallelism()), split);
+ });
+ }
+
+ private void assignSplit(Collection<Integer> readers) {
+ for (Integer reader : readers) {
+ final TiDBSourceSplit assignmentForReader = pendingSplit.remove(reader);
+ if (assignmentForReader != null) {
+ log.debug("Assign splits {} to reader {}", assignmentForReader, reader);
+ context.assignSplit(reader, assignmentForReader);
+ }
+ }
+ }
+
+ private static int getSplitOwner(String splitId, int numReaders) {
+ return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders;
+ }
+
+ private List<TiDBSourceSplit> getTiDBSourceSplit() {
+ List<TiDBSourceSplit> sourceSplits = Lists.newArrayList();
+ List<Coprocessor.KeyRange> keyRanges =
+ TableKeyRangeUtils.getTableKeyRanges(this.tableId, context.currentParallelism());
+ for (Coprocessor.KeyRange keyRange : keyRanges) {
+ sourceSplits.add(
+ new TiDBSourceSplit(
+ sourceConfig.getDatabaseName(),
+ sourceConfig.getTableName(),
+ keyRange,
+ sourceConfig.getStartupMode() == StartupMode.INITIAL ? -1 : 0,
+ keyRange.getStart(),
+ false));
+ }
+ return sourceSplits;
+ }
+
+ /**
+ * Called to close the enumerator, in case it holds on to any resources, like threads or network
+ * connections.
+ */
+ @Override
+ public void close() throws IOException {
+ if (this.tiSession != null) {
+ try {
+ this.tiSession.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Add a split back to the split enumerator. It will only happen when a {@link SourceReader}
+ * fails and there are splits assigned to it after the last successful checkpoint.
+ *
+ * @param splits The split to add back to the enumerator for reassignment.
+ * @param subtaskId The id of the subtask to which the returned splits belong.
+ */
+ @Override
+ public void addSplitsBack(List<TiDBSourceSplit> splits, int subtaskId) {
+ log.debug("Add back splits {} to TiDBSourceSplitEnumerator.", splits);
+ if (!splits.isEmpty()) {
+ addPendingSplit(splits);
+ if (context.registeredReaders().contains(subtaskId)) {
+ assignSplit(Collections.singletonList(subtaskId));
+ } else {
+ log.warn(
+ "Reader {} is not registered. Pending splits {} are not assigned.",
+ subtaskId,
+ splits);
+ }
+ }
+ }
+
+ @Override
+ public int currentUnassignedSplitSize() {
+ return pendingSplit.size();
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId) {}
+
+ @Override
+ public void registerReader(int subtaskId) {
+ log.debug("Register reader {} to TiDBSourceSplitEnumerator.", subtaskId);
+ if (!pendingSplit.isEmpty()) {
+ assignSplit(Collections.singletonList(subtaskId));
+ }
+ }
+
+ /**
+ * If the source is bounded, checkpoint is not triggered.
+ *
+ * @param checkpointId
+ */
+ @Override
+ public TiDBSourceCheckpointState snapshotState(long checkpointId) throws Exception {
+ return new TiDBSourceCheckpointState(pendingSplit);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ // do nothing
+ }
+}
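Split-to-reader assignment is a pure function of the split id, so it is stable across restarts; a tiny sketch of `getSplitOwner`'s arithmetic:

```java
// splitId() has the shape "db:table:start-end"; the id below is illustrative.
String splitId = "test:orders:0-255";
// Mask the hash to non-negative, then take it modulo the reader count.
int owner = (splitId.hashCode() & Integer.MAX_VALUE) % 4; // always in [0, 3]
```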
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/reader/RowKeyWithTs.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/reader/RowKeyWithTs.java
new file mode 100644
index 000000000000..84dbcff57eda
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/reader/RowKeyWithTs.java
@@ -0,0 +1,57 @@
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.reader;
+
+import org.tikv.common.key.RowKey;
+import org.tikv.kvproto.Cdcpb;
+
+import lombok.Data;
+
+import java.util.Objects;
+
+@Data
+public class RowKeyWithTs implements Comparable<RowKeyWithTs> {
+ private final long timestamp;
+ private final RowKey rowKey;
+
+ private RowKeyWithTs(final long timestamp, final RowKey rowKey) {
+ this.timestamp = timestamp;
+ this.rowKey = rowKey;
+ }
+
+ private RowKeyWithTs(final long timestamp, final byte[] key) {
+ this(timestamp, RowKey.decode(key));
+ }
+
+ @Override
+ public int compareTo(final RowKeyWithTs that) {
+ int res = Long.compare(this.timestamp, that.timestamp);
+ if (res == 0) {
+ res = Long.compare(this.rowKey.getTableId(), that.rowKey.getTableId());
+ }
+ if (res == 0) {
+ res = Long.compare(this.rowKey.getHandle(), that.rowKey.getHandle());
+ }
+ return res;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(this.timestamp, this.rowKey.getTableId(), this.rowKey.getHandle());
+ }
+
+ @Override
+ public boolean equals(final Object thatObj) {
+ if (thatObj instanceof RowKeyWithTs) {
+ final RowKeyWithTs that = (RowKeyWithTs) thatObj;
+ return this.timestamp == that.timestamp && this.rowKey.equals(that.rowKey);
+ }
+ return false;
+ }
+
+ static RowKeyWithTs ofStart(final Cdcpb.Event.Row row) {
+ return new RowKeyWithTs(row.getStartTs(), row.getKey().toByteArray());
+ }
+
+ static RowKeyWithTs ofCommit(final Cdcpb.Event.Row row) {
+ return new RowKeyWithTs(row.getCommitTs(), row.getKey().toByteArray());
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/reader/TiDBSourceReader.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/reader/TiDBSourceReader.java
new file mode 100644
index 000000000000..c2902cef8185
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/reader/TiDBSourceReader.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.reader;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer.SeaTunnelRowSnapshotRecordDeserializer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer.SeaTunnelRowStreamingRecordDeserializer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split.TiDBSourceSplit;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.utils.TableKeyRangeUtils;
+
+import org.tikv.cdc.CDCClient;
+import org.tikv.common.TiSession;
+import org.tikv.common.key.RowKey;
+import org.tikv.common.meta.TiTableInfo;
+import org.tikv.kvproto.Cdcpb;
+import org.tikv.kvproto.Coprocessor;
+import org.tikv.kvproto.Kvrpcpb;
+import org.tikv.shade.com.google.protobuf.ByteString;
+import org.tikv.txn.KVClient;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+@Slf4j
+public class TiDBSourceReader implements SourceReader<SeaTunnelRow, TiDBSourceSplit> {
+
+ private static final long STREAMING_VERSION_START_EPOCH = 0L;
+ private final SourceReader.Context context;
+ private final TiDBSourceConfig config;
+ private final Deque<TiDBSourceSplit> sourceSplits;
+ private final Set<TiDBSourceSplit> snapshotCompleted;
+
+ private final Map<TiDBSourceSplit, CDCClient> cacheCDCClient;
+ private final Map<TiDBSourceSplit, Long> resolvedTsStates;
+ private final Map<TiDBSourceSplit, ByteString> snapshotOffsets;
+
+ private SeaTunnelRowSnapshotRecordDeserializer snapshotRecordDeserializer;
+ private SeaTunnelRowStreamingRecordDeserializer streamingRecordDeserializer;
+
+ /** Task local variables. */
+ private transient TiSession session = null;
+
+ private transient TreeMap<RowKeyWithTs, Cdcpb.Event.Row> prewrites = null;
+ private transient TreeMap<RowKeyWithTs, Cdcpb.Event.Row> commits = null;
+ private transient BlockingQueue<Cdcpb.Event.Row> committedEvents = null;
+
+ private CatalogTable catalogTable;
+
+ public TiDBSourceReader(Context context, TiDBSourceConfig config, CatalogTable catalogTable) {
+ this.context = context;
+ this.config = config;
+ this.sourceSplits = new LinkedList<>();
+ this.snapshotCompleted = new HashSet<>();
+
+ this.cacheCDCClient = new HashMap<>();
+ this.resolvedTsStates = new HashMap<>();
+ this.snapshotOffsets = new HashMap<>();
+
+ this.prewrites = new TreeMap<>();
+ this.commits = new TreeMap<>();
+ // CDC events can be lost if the pull blocks while a region splits,
+ // so a queue separates reading from writing and keeps the pull unblocked.
+ // Since the JDBC sink is slow, a queue of 50,000,000 entries should be a safe size.
+ this.committedEvents = new LinkedBlockingQueue<>();
+
+ this.catalogTable = catalogTable;
+ }
+
+ /** Open the source reader. */
+ @Override
+ public void open() throws Exception {
+ this.session = TiSession.create(config.getTiConfiguration());
+ TiTableInfo tableInfo =
+ session.getCatalog().getTable(config.getDatabaseName(), config.getTableName());
+ this.snapshotRecordDeserializer =
+ new SeaTunnelRowSnapshotRecordDeserializer(tableInfo, catalogTable);
+ this.streamingRecordDeserializer =
+ new SeaTunnelRowStreamingRecordDeserializer(tableInfo, catalogTable);
+ }
+
+ /**
+ * Called to close the reader, in case it holds on to any resources, like threads or network
+ * connections.
+ */
+ @Override
+ public void close() throws IOException {
+ if (this.session != null) {
+ try {
+ this.session.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Generate the next batch of records.
+ *
+ * @param output output collector.
+ * @throws Exception if error occurs.
+ */
+ @Override
+ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+ if (config.getStartupMode() == StartupMode.INITIAL) {
+ for (TiDBSourceSplit sourceSplit : sourceSplits) {
+ if (!sourceSplit.isSnapshotCompleted()) {
+ snapshotEvents(sourceSplit, output);
+ }
+ // completed snapshot
+ snapshotCompleted.add(sourceSplit);
+ }
+ }
+ for (TiDBSourceSplit sourceSplit : sourceSplits) {
+ captureStreamingEvents(sourceSplit, output);
+ }
+ }
+
+ protected void snapshotEvents(TiDBSourceSplit split, Collector<SeaTunnelRow> output)
+ throws Exception {
+ log.info("read snapshot events");
+ Coprocessor.KeyRange keyRange = split.getKeyRange();
+ long resolvedTs = split.getResolvedTs();
+ try (KVClient scanClient = session.createKVClient()) {
+ long startTs = session.getTimestamp().getVersion();
+ ByteString start = split.getSnapshotStart();
+ while (true) {
+ final List<Kvrpcpb.KvPair> segment =
+ scanClient.scan(start, keyRange.getEnd(), startTs);
+ if (segment.isEmpty()) {
+ resolvedTs = startTs;
+ resolvedTsStates.put(split, resolvedTs);
+ break;
+ }
+ for (final Kvrpcpb.KvPair pair : segment) {
+ if (TableKeyRangeUtils.isRecordKey(pair.getKey().toByteArray())) {
+ snapshotRecordDeserializer.deserialize(pair, output);
+ }
+ }
+ start =
+ RowKey.toRawKey(segment.get(segment.size() - 1).getKey())
+ .next()
+ .toByteString();
+ snapshotOffsets.put(split, start);
+ }
+ }
+ }
+
+ protected void captureStreamingEvents(TiDBSourceSplit split, Collector<SeaTunnelRow> output)
+ throws Exception {
+ long resolvedTs = resolvedTsStates.get(split);
+ if (resolvedTs >= STREAMING_VERSION_START_EPOCH) {
+ log.info("Capture streaming event from resolvedTs:{}", resolvedTs);
+ long finalResolvedTs = resolvedTs;
+ CDCClient cdcClient =
+ cacheCDCClient.computeIfAbsent(
+ split,
+ k -> {
+ CDCClient client = new CDCClient(session, k.getKeyRange());
+ client.start(finalResolvedTs);
+ return client;
+ });
+ final Cdcpb.Event.Row row = cdcClient.get();
+ if (row == null) {
+ return;
+ }
+ handleRow(row);
+ resolvedTs = cdcClient.getMaxResolvedTs();
+ if (!commits.isEmpty()) {
+ flushRows(resolvedTs);
+ }
+ }
+ // output data
+ while (!committedEvents.isEmpty()) {
+ Cdcpb.Event.Row row = committedEvents.take();
+ this.streamingRecordDeserializer.deserialize(row, output);
+ }
+ }
+
+ /**
+ * Get the current split checkpoint state by checkpointId.
+ *
+ * If the source is bounded, checkpoint is not triggered.
+ *
+ * @param checkpointId checkpoint Id.
+ * @return split checkpoint state.
+ * @throws Exception if error occurs.
+ */
+ @Override
+ public List<TiDBSourceSplit> snapshotState(long checkpointId) throws Exception {
+ for (TiDBSourceSplit sourceSplit : sourceSplits) {
+ if (resolvedTsStates.containsKey(sourceSplit)) {
+ sourceSplit.setResolvedTs(resolvedTsStates.get(sourceSplit));
+ }
+ if (snapshotCompleted.contains(sourceSplit)) {
+ sourceSplit.setSnapshotCompleted(true);
+ }
+ if (snapshotOffsets.containsKey(sourceSplit)) {
+ sourceSplit.setSnapshotStart(snapshotOffsets.get(sourceSplit));
+ }
+ }
+ return new ArrayList<>(sourceSplits);
+ }
+
+ /**
+ * Add the split checkpoint state to reader.
+ *
+ * @param splits split checkpoint state.
+ */
+ @Override
+ public void addSplits(List<TiDBSourceSplit> splits) {
+ for (TiDBSourceSplit split : splits) {
+ this.resolvedTsStates.put(split, split.getResolvedTs());
+ this.snapshotOffsets.put(split, split.getSnapshotStart());
+ }
+ sourceSplits.addAll(splits);
+ }
+
+ /**
+ * This method is called when the reader is notified that it will not receive any further
+ * splits.
+ *
+ * It is triggered when the enumerator calls {@link
+ * SourceSplitEnumerator.Context#signalNoMoreSplits(int)} with the reader's parallel subtask.
+ */
+ @Override
+ public void handleNoMoreSplits() {}
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+
+ private void handleRow(final Cdcpb.Event.Row row) {
+ if (!TableKeyRangeUtils.isRecordKey(row.getKey().toByteArray())) {
+ // Don't handle index key for now
+ return;
+ }
+ log.debug("binlog record, type: {}, data: {}", row.getType(), row);
+ switch (row.getType()) {
+ case COMMITTED:
+ prewrites.put(RowKeyWithTs.ofStart(row), row);
+ commits.put(RowKeyWithTs.ofCommit(row), row);
+ break;
+ case COMMIT:
+ commits.put(RowKeyWithTs.ofCommit(row), row);
+ break;
+ case PREWRITE:
+ prewrites.put(RowKeyWithTs.ofStart(row), row);
+ break;
+ case ROLLBACK:
+ prewrites.remove(RowKeyWithTs.ofStart(row));
+ break;
+ default:
+ log.warn("Unsupported row type:" + row.getType());
+ }
+ }
+
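+ /**
+ * Emits every transaction whose commit_ts is covered by the given resolved timestamp.
+ * TiKV events follow Percolator-style two-phase commit: a PREWRITE carries the row data
+ * keyed by start_ts, and the matching COMMIT makes it visible at commit_ts.
+ */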
+ protected void flushRows(final long resolvedTs) throws Exception {
+ while (!commits.isEmpty() && commits.firstKey().getTimestamp() <= resolvedTs) {
+ final Cdcpb.Event.Row commitRow = commits.pollFirstEntry().getValue();
+ final Cdcpb.Event.Row prewriteRow = prewrites.remove(RowKeyWithTs.ofStart(commitRow));
+ // offer to the unbounded queue instead of emitting inline, so the CDC pull never blocks
+ committedEvents.offer(prewriteRow);
+ }
+ }
+}
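Putting the reader together: with `startup.mode = initial` each split starts at `resolvedTs = -1`, so `captureStreamingEvents` stays a no-op until `snapshotEvents` completes and records a real timestamp; only then does the CDC tail begin. A condensed sketch of the loop the engine drives (call order assumed from the SeaTunnel reader contract):

```java
TiDBSourceReader reader = new TiDBSourceReader(context, config, catalogTable);
reader.open();                       // resolves TiTableInfo, builds the deserializers
reader.addSplits(assignedSplits);    // seeds resolvedTsStates and snapshotOffsets
while (running) {
    reader.pollNext(collector);      // snapshot phase first, then the CDC tail
}
reader.close();
```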
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/split/TiDBSourceSplit.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/split/TiDBSourceSplit.java
new file mode 100644
index 000000000000..91a202c2d2fb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/split/TiDBSourceSplit.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import org.tikv.kvproto.Coprocessor;
+import org.tikv.shade.com.google.protobuf.ByteString;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+@AllArgsConstructor
+@Getter
+@Setter
+public class TiDBSourceSplit implements SourceSplit {
+
+ private static final long serialVersionUID = -9043797960947110643L;
+ private String database;
+ private String table;
+ private Coprocessor.KeyRange keyRange;
+ private long resolvedTs;
+ private ByteString snapshotStart;
+ private boolean snapshotCompleted;
+
+ /**
+ * Get the split id of this source split.
+ *
+ * @return id of this source split.
+ */
+ @Override
+ public String splitId() {
+ return String.format(
+ "%s:%s:%s-%s", database, table, keyRange.getStart(), keyRange.getEnd());
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "TiDBSourceSplit: %s.%s,start=%s,end=%s",
+ getDatabase(), getTable(), getKeyRange().getStart(), getKeyRange().getEnd());
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/utils/TableDiscoveryUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/utils/TableDiscoveryUtils.java
new file mode 100644
index 000000000000..4e9602566713
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/utils/TableDiscoveryUtils.java
@@ -0,0 +1,35 @@
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.utils;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceConfig;
+
+import org.tikv.common.TiSession;
+import org.tikv.common.meta.TiDBInfo;
+import org.tikv.common.meta.TiTableInfo;
+
+import io.debezium.relational.TableId;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class TableDiscoveryUtils {
+
+ public static List<TableId> listTables(TiSession tiSession, TiDBSourceConfig sourceConfig) {
+ List<TiDBInfo> databases =
+ tiSession.getCatalog().listDatabases().stream()
+ .filter(
+ tiDBInfo ->
+ tiDBInfo.getName().equals(sourceConfig.getDatabaseName()))
+ .collect(Collectors.toList());
+ List<TableId> tableIds = Lists.newArrayList();
+ for (TiDBInfo tiDBInfo : databases) {
+ for (TiTableInfo tiTableInfo : tiDBInfo.getTables()) {
+ if (tiTableInfo.getName().equals(sourceConfig.getTableName())) {
+ tableIds.add(new TableId(tiDBInfo.getName(), null, tiTableInfo.getName()));
+ }
+ }
+ }
+ return tableIds;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/utils/TableKeyRangeUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/utils/TableKeyRangeUtils.java
new file mode 100644
index 000000000000..5db48e272e01
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/utils/TableKeyRangeUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.utils;
+
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;
+
+import org.tikv.common.key.RowKey;
+import org.tikv.common.util.KeyRangeUtils;
+import org.tikv.kvproto.Coprocessor.KeyRange;
+
+import java.math.BigInteger;
+import java.util.List;
+
+/** Utils to obtain the key ranges of a table. */
+public class TableKeyRangeUtils {
+ public static KeyRange getTableKeyRange(final long tableId) {
+ return KeyRangeUtils.makeCoprocRange(
+ RowKey.createMin(tableId).toByteString(),
+ RowKey.createBeyondMax(tableId).toByteString());
+ }
+
+    public static List<KeyRange> getTableKeyRanges(final long tableId, final int num) {
+ Preconditions.checkArgument(num > 0, "Illegal value of num");
+
+ if (num == 1) {
+ return ImmutableList.of(getTableKeyRange(tableId));
+ }
+
+ final long delta =
+ BigInteger.valueOf(Long.MAX_VALUE)
+ .subtract(BigInteger.valueOf(Long.MIN_VALUE + 1))
+ .divide(BigInteger.valueOf(num))
+ .longValueExact();
+        final ImmutableList.Builder<KeyRange> builder = ImmutableList.builder();
+ for (int i = 0; i < num; i++) {
+ final RowKey startKey =
+ (i == 0)
+ ? RowKey.createMin(tableId)
+ : RowKey.toRowKey(tableId, Long.MIN_VALUE + delta * i);
+ final RowKey endKey =
+ (i == num - 1)
+ ? RowKey.createBeyondMax(tableId)
+ : RowKey.toRowKey(tableId, Long.MIN_VALUE + delta * (i + 1));
+ builder.add(
+ KeyRangeUtils.makeCoprocRange(startKey.toByteString(), endKey.toByteString()));
+ }
+ return builder.build();
+ }
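+
+    // Illustrative note (not part of the API): for num = 2 the signed-long handle space
+    // is cut near its midpoint, producing
+    //     [min(tableId), rowKey(tableId, -1))  and  [rowKey(tableId, -1), beyondMax(tableId))
+    // i.e. num contiguous, non-overlapping coprocessor ranges that together cover the table.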
+
+ public static KeyRange getTableKeyRange(final long tableId, final int num, final int idx) {
+ Preconditions.checkArgument(idx >= 0 && idx < num, "Illegal value of idx");
+ return getTableKeyRanges(tableId, num).get(idx);
+ }
+
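+    // A TiKV row key is laid out as t{tableId}_r{rowId}: byte 0 is 't', bytes 1-8 the
+    // table id, and bytes 9-10 the "_r" marker that separates record keys from index
+    // keys ("_i").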
+ public static boolean isRecordKey(final byte[] key) {
+ return key[9] == '_' && key[10] == 'r';
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/CDCClient.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/CDCClient.java
new file mode 100644
index 000000000000..b7e663361bdf
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/CDCClient.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.tikv.cdc;
+
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+import org.apache.seatunnel.shade.com.google.common.collect.Range;
+import org.apache.seatunnel.shade.com.google.common.collect.TreeMultiset;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.tikv.common.TiSession;
+import org.tikv.common.key.Key;
+import org.tikv.common.region.TiRegion;
+import org.tikv.common.util.RangeSplitter;
+import org.tikv.common.util.RangeSplitter.RegionTask;
+import org.tikv.kvproto.Cdcpb.Event.Row;
+import org.tikv.kvproto.Coprocessor.KeyRange;
+import org.tikv.kvproto.Kvrpcpb;
+import org.tikv.shade.io.grpc.ManagedChannel;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Consumer;
+
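+/**
+ * Streams TiKV change data for a key range by fanning out one {@link RegionCDCClient} per
+ * region and funnelling their events into a single bounded buffer.
+ *
+ * <p>Minimal usage sketch (illustrative only; {@code session}, {@code keyRange} and
+ * {@code startTs} are assumed to be obtained elsewhere):
+ *
+ * <pre>{@code
+ * try (CDCClient client = new CDCClient(session, keyRange)) {
+ *     client.start(startTs);
+ *     while (!done) {
+ *         Row row = client.get(); // null if only resolved-ts/error events were pending
+ *         long watermark = client.getMinResolvedTs();
+ *     }
+ * }
+ * }</pre>
+ */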
+public class CDCClient implements AutoCloseable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CDCClient.class);
+
+ private final TiSession session;
+ private final KeyRange keyRange;
+ private final CDCConfig config;
+
+    private final BlockingQueue<CDCEvent> eventsBuffer;
+    private final ConcurrentHashMap<Long, RegionCDCClient> regionClients =
+            new ConcurrentHashMap<>();
+    private final Map<Long, Long> regionToResolvedTs = new HashMap<>();
+    private final TreeMultiset<Long> resolvedTsSet = TreeMultiset.create();
+
+ private boolean started = false;
+
+    private final Consumer<CDCEvent> eventConsumer;
+
+ public CDCClient(final TiSession session, final KeyRange keyRange) {
+ this(session, keyRange, new CDCConfig());
+ }
+
+ public CDCClient(final TiSession session, final KeyRange keyRange, final CDCConfig config) {
+ Preconditions.checkState(
+ session.getConf().getIsolationLevel().equals(Kvrpcpb.IsolationLevel.SI),
+ "Unsupported Isolation Level"); // only support SI for now
+ this.session = session;
+ this.keyRange = keyRange;
+ this.config = config;
+ eventsBuffer = new LinkedBlockingQueue<>(config.getEventBufferSize());
+        // fix: prefer the non-blocking offer(), then fall back to the blocking put() so
+        // that no event is ever dropped when the buffer is full.
+        eventConsumer =
+                (event) -> {
+                    // try offer() twice before blocking.
+                    for (int i = 0; i < 2; i++) {
+                        if (eventsBuffer.offer(event)) {
+                            return;
+                        }
+                    }
+                    // fall back to the blocking put().
+                    try {
+                        eventsBuffer.put(event);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        LOGGER.warn("interrupted while buffering CDC event", e);
+                    }
+                };
+ }
+
+ public synchronized void start(final long startTs) {
+ Preconditions.checkState(!started, "Client is already started");
+ applyKeyRange(keyRange, startTs);
+ started = true;
+ }
+
+ public synchronized Row get() throws InterruptedException {
+ final CDCEvent event = eventsBuffer.poll();
+ if (event != null) {
+ switch (event.eventType) {
+ case ROW:
+ return event.row;
+ case RESOLVED_TS:
+ handleResolvedTs(event.regionId, event.resolvedTs);
+ break;
+ case ERROR:
+ handleErrorEvent(event.regionId, event.error, event.resolvedTs);
+ break;
+ }
+ }
+ return null;
+ }
+
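+    /**
+     * Smallest resolved timestamp across all active regions: the safe watermark below
+     * which every change in the key range has already been emitted.
+     */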
+ public synchronized long getMinResolvedTs() {
+ return resolvedTsSet.firstEntry().getElement();
+ }
+
+ public synchronized long getMaxResolvedTs() {
+ return resolvedTsSet.lastEntry().getElement();
+ }
+
+ public synchronized void close() {
+ removeRegions(regionClients.keySet());
+ }
+
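+    /**
+     * Reconciles the per-region clients with the current region layout of {@code keyRange}:
+     * healthy unchanged regions are kept, dead or vanished regions are closed, and newly
+     * discovered regions are started from {@code timestamp}.
+     */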
+ private synchronized void applyKeyRange(final KeyRange keyRange, final long timestamp) {
+ final RangeSplitter splitter = RangeSplitter.newSplitter(session.getRegionManager());
+
+        final Iterator<TiRegion> newRegionsIterator =
+                splitter.splitRangeByRegion(Arrays.asList(keyRange)).stream()
+                        .map(RegionTask::getRegion)
+                        .sorted(Comparator.comparingLong(TiRegion::getId))
+                        .iterator();
+        // The merge below assumes both sequences are sorted by region id, so sort the
+        // existing clients as well (a ConcurrentHashMap gives no ordering guarantee).
+        final Iterator<RegionCDCClient> oldRegionsIterator =
+                regionClients.values().stream()
+                        .sorted(Comparator.comparingLong(c -> c.getRegion().getId()))
+                        .iterator();
+
+        final ArrayList<TiRegion> regionsToAdd = new ArrayList<>();
+        final ArrayList<Long> regionsToRemove = new ArrayList<>();
+
+ TiRegion newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null;
+ RegionCDCClient oldRegionClient =
+ oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null;
+
+ while (newRegion != null && oldRegionClient != null) {
+ if (newRegion.getId() == oldRegionClient.getRegion().getId()) {
+ // check if should refresh region
+ if (!oldRegionClient.isRunning()) {
+ regionsToRemove.add(newRegion.getId());
+ regionsToAdd.add(newRegion);
+ }
+
+ newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null;
+ oldRegionClient = oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null;
+ } else if (newRegion.getId() < oldRegionClient.getRegion().getId()) {
+ regionsToAdd.add(newRegion);
+ newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null;
+ } else {
+ regionsToRemove.add(oldRegionClient.getRegion().getId());
+ oldRegionClient = oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null;
+ }
+ }
+
+ while (newRegion != null) {
+ regionsToAdd.add(newRegion);
+ newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null;
+ }
+
+ while (oldRegionClient != null) {
+ regionsToRemove.add(oldRegionClient.getRegion().getId());
+ oldRegionClient = oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null;
+ }
+
+ removeRegions(regionsToRemove);
+ addRegions(regionsToAdd, timestamp);
+ LOGGER.info("keyRange applied");
+ }
+
+    private synchronized void addRegions(final Iterable<TiRegion> regions, final long timestamp) {
+ LOGGER.info("add regions: {}, timestamp: {}", regions, timestamp);
+ for (final TiRegion region : regions) {
+ if (overlapWithRegion(region)) {
+ final String address =
+ session.getRegionManager()
+ .getStoreById(region.getLeader().getStoreId())
+ .getStore()
+ .getAddress();
+ final ManagedChannel channel =
+ session.getChannelFactory()
+ .getChannel(address, session.getPDClient().getHostMapping());
+ try {
+ final RegionCDCClient client =
+ new RegionCDCClient(region, keyRange, channel, eventConsumer, config);
+ regionClients.put(region.getId(), client);
+ regionToResolvedTs.put(region.getId(), timestamp);
+ resolvedTsSet.add(timestamp);
+ client.start(timestamp);
+ } catch (final Exception e) {
+                    LOGGER.error("failed to add region (regionId: {})", region.getId(), e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+    private synchronized void removeRegions(final Iterable<Long> regionIds) {
+ LOGGER.info("remove regions: {}", regionIds);
+ for (final long regionId : regionIds) {
+ final RegionCDCClient regionClient = regionClients.remove(regionId);
+ if (regionClient != null) {
+ try {
+ regionClient.close();
+ } catch (final Exception e) {
+                    LOGGER.error("failed to close region client, region id: {}", regionId, e);
+                } finally {
+                    // remove(regionId) both clears the map entry and yields the resolvedTs
+                    // to drop from the multiset; a second map removal is unnecessary.
+                    resolvedTsSet.remove(regionToResolvedTs.remove(regionId));
+ }
+ }
+ }
+ }
+
+    private boolean overlapWithRegion(final TiRegion region) {
+        final Range<Key> regionRange =
+                Range.closedOpen(
+                        Key.toRawKey(region.getStartKey()), Key.toRawKey(region.getEndKey()));
+        final Range<Key> clientRange =
+                Range.closedOpen(
+                        Key.toRawKey(keyRange.getStart()), Key.toRawKey(keyRange.getEnd()));
+        final Range<Key> intersection = regionRange.intersection(clientRange);
+ return !intersection.isEmpty();
+ }
+
+ private void handleResolvedTs(final long regionId, final long resolvedTs) {
+ LOGGER.info("handle resolvedTs: {}, regionId: {}", resolvedTs, regionId);
+ resolvedTsSet.remove(regionToResolvedTs.replace(regionId, resolvedTs));
+ resolvedTsSet.add(resolvedTs);
+ }
+
+    public void handleErrorEvent(final long regionId, final Throwable error, long resolvedTs) {
+        LOGGER.warn("handle error, regionId: {}", regionId, error);
+        final RegionCDCClient client = regionClients.get(regionId);
+        if (client == null) {
+            // stale error for an already-removed region; nothing to recover.
+            return;
+        }
+        session.getRegionManager()
+                .onRequestFail(client.getRegion()); // invalidate cache for corresponding region
+
+ removeRegions(Arrays.asList(regionId));
+ applyKeyRange(keyRange, resolvedTs); // reapply the whole keyRange
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/CDCEvent.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/CDCEvent.java
new file mode 100644
index 000000000000..8bccc47314ac
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/CDCEvent.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.tikv.cdc;
+
+import org.tikv.kvproto.Cdcpb.Event.Row;
+
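+/**
+ * Immutable event handed from a {@link RegionCDCClient} to the {@link CDCClient} buffer:
+ * a data row, a resolved-ts watermark, or an error carrying the ts to resume from.
+ */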
+class CDCEvent {
+ enum CDCEventType {
+ ROW,
+ RESOLVED_TS,
+ ERROR
+ }
+
+ public final long regionId;
+
+ public final CDCEventType eventType;
+
+ public final long resolvedTs;
+
+ public final Row row;
+
+ public final Throwable error;
+
+ private CDCEvent(
+ final long regionId,
+ final CDCEventType eventType,
+ final long resolvedTs,
+ final Row row,
+ final Throwable error) {
+ this.regionId = regionId;
+ this.eventType = eventType;
+ this.resolvedTs = resolvedTs;
+ this.row = row;
+ this.error = error;
+ }
+
+ public static CDCEvent rowEvent(final long regionId, final Row row) {
+ return new CDCEvent(regionId, CDCEventType.ROW, 0, row, null);
+ }
+
+ public static CDCEvent resolvedTsEvent(final long regionId, final long resolvedTs) {
+ return new CDCEvent(regionId, CDCEventType.RESOLVED_TS, resolvedTs, null, null);
+ }
+
+ public static CDCEvent error(final long regionId, final Throwable error) {
+ return new CDCEvent(regionId, CDCEventType.ERROR, 0, null, error);
+ }
+
+    // error event variant that carries the resolvedTs to resume from after recovery.
+ public static CDCEvent error(final long regionId, final Throwable error, long resolvedTs) {
+ return new CDCEvent(regionId, CDCEventType.ERROR, resolvedTs, null, error);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder builder = new StringBuilder();
+ builder.append("CDCEvent[").append(eventType.toString()).append("] {");
+ switch (eventType) {
+ case ERROR:
+ builder.append("error=").append(error.getMessage());
+ break;
+ case RESOLVED_TS:
+ builder.append("resolvedTs=").append(resolvedTs);
+ break;
+ case ROW:
+ builder.append("row=").append(row);
+ break;
+ }
+ return builder.append("}").toString();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/RegionCDCClient.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/RegionCDCClient.java
new file mode 100644
index 000000000000..fd2a9918eeab
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/RegionCDCClient.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.tikv.cdc;
+
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+import org.apache.seatunnel.shade.com.google.common.collect.ImmutableSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.tikv.common.region.TiRegion;
+import org.tikv.common.util.FastByteComparisons;
+import org.tikv.common.util.KeyRangeUtils;
+import org.tikv.kvproto.Cdcpb;
+import org.tikv.kvproto.Cdcpb.ChangeDataEvent;
+import org.tikv.kvproto.Cdcpb.ChangeDataRequest;
+import org.tikv.kvproto.Cdcpb.Event.LogType;
+import org.tikv.kvproto.Cdcpb.Event.Row;
+import org.tikv.kvproto.Cdcpb.Header;
+import org.tikv.kvproto.Cdcpb.ResolvedTs;
+import org.tikv.kvproto.ChangeDataGrpc;
+import org.tikv.kvproto.ChangeDataGrpc.ChangeDataStub;
+import org.tikv.kvproto.Coprocessor.KeyRange;
+import org.tikv.shade.io.grpc.ManagedChannel;
+import org.tikv.shade.io.grpc.stub.StreamObserver;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
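+/**
+ * gRPC client that streams change-data events for a single TiKV region. Rows outside the
+ * requested key range are filtered out before they reach the shared event consumer.
+ */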
+public class RegionCDCClient implements AutoCloseable, StreamObserver<ChangeDataEvent> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RegionCDCClient.class);
+ private static final AtomicLong REQ_ID_COUNTER = new AtomicLong(0);
+    private static final Set<LogType> ALLOWED_LOGTYPE =
+ ImmutableSet.of(LogType.PREWRITE, LogType.COMMIT, LogType.COMMITTED, LogType.ROLLBACK);
+
+ private TiRegion region;
+ private final KeyRange keyRange;
+ private final KeyRange regionKeyRange;
+ private final ManagedChannel channel;
+ private final ChangeDataStub asyncStub;
+    private final Consumer<CDCEvent> eventConsumer;
+    private final CDCConfig config;
+    private final Predicate<Row> rowFilter;
+
+ private final AtomicBoolean running = new AtomicBoolean(false);
+
+    private boolean started = false;
+
+ private long resolvedTs = 0;
+
+ public RegionCDCClient(
+ final TiRegion region,
+ final KeyRange keyRange,
+ final ManagedChannel channel,
+ final Consumer eventConsumer,
+ final CDCConfig config) {
+ this.region = region;
+ this.keyRange = keyRange;
+ this.channel = channel;
+ this.asyncStub = ChangeDataGrpc.newStub(channel);
+ this.eventConsumer = eventConsumer;
+ this.config = config;
+
+ this.regionKeyRange =
+ KeyRange.newBuilder()
+ .setStart(region.getStartKey())
+ .setEnd(region.getEndKey())
+ .build();
+
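+        // If the region is fully enclosed by the requested range every row matches;
+        // otherwise keep only rows whose key k satisfies start <= k < end in
+        // lexicographic byte order.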
+ this.rowFilter =
+ regionEnclosed()
+ ? ((row) -> true)
+                        : new Predicate<Row>() {
+ final byte[] buffer = new byte[config.getMaxRowKeySize()];
+
+ final byte[] start = keyRange.getStart().toByteArray();
+ final byte[] end = keyRange.getEnd().toByteArray();
+
+ @Override
+ public boolean test(final Row row) {
+ final int len = row.getKey().size();
+ row.getKey().copyTo(buffer, 0);
+ return (FastByteComparisons.compareTo(
+ buffer, 0, len, start, 0, start.length)
+ >= 0)
+ && (FastByteComparisons.compareTo(
+ buffer, 0, len, end, 0, end.length)
+ < 0);
+ }
+ };
+ }
+
+ public synchronized void start(final long startTs) {
+        Preconditions.checkState(!started, "RegionCDCClient has already started");
+        started = true;
+        resolvedTs = startTs;
+        running.set(true);
+ LOGGER.info("start streaming region: {}, running: {}", region.getId(), running.get());
+ final ChangeDataRequest request =
+ ChangeDataRequest.newBuilder()
+ .setRequestId(REQ_ID_COUNTER.incrementAndGet())
+ .setHeader(Header.newBuilder().setTicdcVersion("5.0.0").build())
+ .setRegionId(region.getId())
+ .setCheckpointTs(startTs)
+ .setStartKey(keyRange.getStart())
+ .setEndKey(keyRange.getEnd())
+ .setRegionEpoch(region.getRegionEpoch())
+ .setExtraOp(config.getExtraOp())
+ .build();
+        final StreamObserver<ChangeDataRequest> requestObserver = asyncStub.eventFeed(this);
+        LOGGER.debug("start stream request: {}", request);
+        requestObserver.onNext(request);
+ }
+
+ public TiRegion getRegion() {
+ return region;
+ }
+
+ public void setRegion(TiRegion region) {
+ this.region = region;
+ }
+
+ public KeyRange getKeyRange() {
+ return keyRange;
+ }
+
+ public KeyRange getRegionKeyRange() {
+ return regionKeyRange;
+ }
+
+ public boolean regionEnclosed() {
+ return KeyRangeUtils.makeRange(keyRange.getStart(), keyRange.getEnd())
+ .encloses(
+ KeyRangeUtils.makeRange(
+ regionKeyRange.getStart(), regionKeyRange.getEnd()));
+ }
+
+ public boolean isRunning() {
+ return running.get();
+ }
+
+ @Override
+ public void close() throws Exception {
+ LOGGER.info("close (region: {})", region.getId());
+ running.set(false);
+        // fix: shutting down the gRPC channel here would also shut down the shared client
+        // thread pool, so the channel is intentionally left open; the original shutdown
+        // logic is kept below for reference.
+ /*
+ synchronized (this) {
+ channel.shutdown();
+ }
+ try {
+ LOGGER.debug("awaitTermination (region: {})", region.getId());
+ channel.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (final InterruptedException e) {
+ LOGGER.error("Failed to shutdown channel(regionId: {})", region.getId());
+ Thread.currentThread().interrupt();
+ synchronized (this) {
+ channel.shutdownNow();
+ }
+ }
+ */
+ LOGGER.info("terminated (region: {})", region.getId());
+ }
+
+ @Override
+ public void onCompleted() {
+        // should never be called; the server is not expected to complete the stream.
+ onError(new IllegalStateException("RegionCDCClient should never complete"));
+ }
+
+ @Override
+ public void onError(final Throwable error) {
+ onError(error, this.resolvedTs);
+ }
+
+ private void onError(final Throwable error, long resolvedTs) {
+        LOGGER.error(
+                "region CDC error: region: {}, resolvedTs: {}", region.getId(), resolvedTs, error);
+ running.set(false);
+ eventConsumer.accept(CDCEvent.error(region.getId(), error, resolvedTs));
+ }
+
+ @Override
+ public void onNext(final ChangeDataEvent event) {
+ try {
+ if (running.get()) {
+ // fix: miss to process error event
+ onErrorEventHandle(event);
+ event.getEventsList().stream()
+ .flatMap(ev -> ev.getEntries().getEntriesList().stream())
+ .filter(row -> ALLOWED_LOGTYPE.contains(row.getType()))
+ .filter(this.rowFilter)
+ .map(row -> CDCEvent.rowEvent(region.getId(), row))
+ .forEach(this::submitEvent);
+
+ if (event.hasResolvedTs()) {
+ final ResolvedTs resolvedTs = event.getResolvedTs();
+ this.resolvedTs = resolvedTs.getTs();
+                if (resolvedTs.getRegionsList().contains(region.getId())) {
+ submitEvent(CDCEvent.resolvedTsEvent(region.getId(), resolvedTs.getTs()));
+ }
+ }
+ }
+ } catch (final Exception e) {
+ onError(e, resolvedTs);
+ }
+ }
+
+    // Surface the first error embedded in the change stream, if any.
+    private void onErrorEventHandle(final ChangeDataEvent event) {
+        final List<Cdcpb.Event> errorEvents =
+                event.getEventsList().stream()
+                        .filter(Cdcpb.Event::hasError)
+                        .collect(Collectors.toList());
+        if (!errorEvents.isEmpty()) {
+            onError(
+                    new RuntimeException(
+                            "regionCDC error: " + errorEvents.get(0).getError()),
+                    this.resolvedTs);
+        }
+    }
+
+ private void submitEvent(final CDCEvent event) {
+ LOGGER.debug("submit event: {}", event);
+ eventConsumer.accept(event);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/pom.xml b/seatunnel-connectors-v2/connector-cdc/pom.xml
index 44916d35caa2..a5d908dd9e5d 100644
--- a/seatunnel-connectors-v2/connector-cdc/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/pom.xml
@@ -36,6 +36,7 @@
        <module>connector-cdc-mongodb</module>
        <module>connector-cdc-postgres</module>
        <module>connector-cdc-oracle</module>
+        <module>connector-cdc-tidb</module>
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index cfe0ed44ecdb..6298518e2ae4 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -52,6 +52,7 @@
2.4.3
12.2.0
3.0.0
+        <tikv.version>3.2.0</tikv.version>
@@ -202,6 +203,13 @@
            <version>${iris.jdbc.version}</version>
            <scope>provided</scope>
        </dependency>
+
+        <dependency>
+            <groupId>org.tikv</groupId>
+            <artifactId>tikv-client-java</artifactId>
+            <version>${tikv.version}</version>
+            <scope>provided</scope>
+        </dependency>
@@ -308,6 +316,11 @@
                <groupId>com.intersystems</groupId>
                <artifactId>intersystems-jdbc</artifactId>
            </dependency>
+
+            <dependency>
+                <groupId>org.tikv</groupId>
+                <artifactId>tikv-client-java</artifactId>
+            </dependency>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/pom.xml
new file mode 100644
index 000000000000..5c7def860a1f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements. See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License. You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-tidb-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : TiDB</name>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-cdc-tidb</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>8.0.27</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java
new file mode 100644
index 000000000000..ed3bfa807ca0
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.tidb;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+public class TiDBCDCIT extends TiDBTestBase implements TestResource {
+
+    private static final String SQL_TEMPLATE = "select id,name,description,weight from %s.%s";
+
+ private String driverUrl() {
+ return "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
+ }
+
+ @TestContainerExtension
+ protected final ContainerExtendedFactory extendedFactory =
+ container -> {
+ Container.ExecResult extraCommands =
+ container.execInContainer(
+ "bash",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/TiDB-CDC/lib && cd "
+ + "/tmp/seatunnel/plugins/TiDB-CDC/lib && wget "
+ + driverUrl());
+ Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
+ };
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ startContainers();
+ }
+
+ @TestTemplate
+ public void testAllEvents(TestContainer container) throws Exception {
+ Container.ExecResult execResult = container.executeJob("/tidb/tidb_source_to_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ try (Connection connection = getJdbcConnection("inventory");
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ "UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
+ statement.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+ statement.execute(
+ "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
+ statement.execute(
+ "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
+ statement.execute(
+ "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
+ statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
+ statement.execute("DELETE FROM products WHERE id=111;");
+ }
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ query(getQuerySQL("inventory", "products")),
+ query(getQuerySQL("inventory", "products_sink")));
+ });
+ }
+
+ @TestTemplate
+ public void testAllTypes(TestContainer container) throws Exception {
+ Container.ExecResult execResult = container.executeJob("/tidb/tidb_source_to_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ try (Connection connection = getJdbcConnection("column_type_test");
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ "UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
+ }
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ query(getQuerySQL("column_type_test", "full_types")),
+ query(getQuerySQL("column_type_test", "full_types_sink")));
+ });
+ }
+
+    private List<List<Object>> query(String sql) {
+        try (Connection connection = getJdbcConnection("inventory")) {
+            ResultSet resultSet = connection.createStatement().executeQuery(sql);
+            List<List<Object>> result = new ArrayList<>();
+            int columnCount = resultSet.getMetaData().getColumnCount();
+            while (resultSet.next()) {
+                ArrayList<Object> objects = new ArrayList<>();
+                for (int i = 1; i <= columnCount; i++) {
+                    objects.add(resultSet.getObject(i));
+                }
+                log.debug("TiDB-CDC query, sql: {}, data: {}", sql, objects);
+ result.add(objects);
+ }
+ return result;
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String getQuerySQL(String database, String tableName) {
+ return String.format(SQL_TEMPLATE, database, tableName);
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+ stopContainers();
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBTestBase.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBTestBase.java
new file mode 100644
index 000000000000..445e5d47ad91
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBTestBase.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.tidb;
+
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+
+import org.apache.commons.lang3.RandomUtils;
+
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+import org.testcontainers.containers.FixedHostPortGenericContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertNotNull;
+
+/** Utility class for tidb tests. */
+@Slf4j
+public class TiDBTestBase extends TestSuiteBase {
+ private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
+
+ public static final String PD_CONTAINER_HOST = "pd-e2e";
+ public static final String TIKV_CONTAINER_HOST = "tikv-e2e";
+ public static final String TIDB_CONTAINER_HOST = "tidb-e2e";
+
+ public static final String TIDB_USER = "root";
+ public static final String TIDB_PASSWORD = "seatunnel";
+
+ public static final int TIDB_PORT = 4000;
+ public static final int TIKV_PORT_ORIGIN = 20160;
+ public static final int PD_PORT_ORIGIN = 2379;
+ public static int pdPort = PD_PORT_ORIGIN + RandomUtils.nextInt(0, 1000);
+
+ public static final Network NETWORK = Network.newNetwork();
+
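+    // Cluster topology: a single PD (placement driver) node, one TiKV storage node that
+    // registers with PD, and one TiDB SQL node that routes through PD -- all on one
+    // Docker network so they can reach each other by alias.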
+    public static final GenericContainer<?> PD =
+ new FixedHostPortGenericContainer<>("pingcap/pd:v6.1.0")
+ .withFileSystemBind("src/test/resources/config/pd.toml", "/pd.toml")
+ .withFixedExposedPort(pdPort, PD_PORT_ORIGIN)
+ .withCommand(
+ "--name=pd0",
+ "--client-urls=http://0.0.0.0:" + pdPort + ",http://0.0.0.0:2379",
+ "--peer-urls=http://0.0.0.0:2380",
+ "--advertise-client-urls=http://pd0:" + pdPort + ",http://pd0:2379",
+ "--advertise-peer-urls=http://pd0:2380",
+ "--initial-cluster=pd0=http://pd0:2380",
+ "--data-dir=/data/pd0",
+ "--config=/pd.toml",
+ "--log-file=/logs/pd0.log")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(PD_CONTAINER_HOST)
+ .withStartupTimeout(Duration.ofSeconds(120))
+ .withLogConsumer(new Slf4jLogConsumer(log));
+
+    public static final GenericContainer<?> TIKV =
+ new FixedHostPortGenericContainer<>("pingcap/tikv:v6.1.0")
+ .withFixedExposedPort(TIKV_PORT_ORIGIN, TIKV_PORT_ORIGIN)
+ .withFileSystemBind("src/test/resources/config/tikv.toml", "/tikv.toml")
+ .withCommand(
+ "--addr=0.0.0.0:20160",
+ "--advertise-addr=tikv0:20160",
+ "--data-dir=/data/tikv0",
+ "--pd=pd0:2379",
+ "--config=/tikv.toml",
+ "--log-file=/logs/tikv0.log")
+ .withNetwork(NETWORK)
+ .dependsOn(PD)
+ .withNetworkAliases(TIKV_CONTAINER_HOST)
+ .withStartupTimeout(Duration.ofSeconds(120))
+ .withLogConsumer(new Slf4jLogConsumer(log));
+
+    public static final GenericContainer<?> TIDB =
+ new GenericContainer<>("pingcap/tidb:v6.1.0")
+ .withExposedPorts(TIDB_PORT)
+ .withFileSystemBind("src/test/resources/config/tidb.toml", "/tidb.toml")
+ .withCommand(
+ "--store=tikv",
+ "--path=pd0:2379",
+ "--config=/tidb.toml",
+ "--advertise-address=tidb0")
+ .withNetwork(NETWORK)
+ .dependsOn(TIKV)
+ .withNetworkAliases(TIDB_CONTAINER_HOST)
+ .withStartupTimeout(Duration.ofSeconds(120))
+ .withLogConsumer(new Slf4jLogConsumer(log));
+
+ public static void startContainers() throws Exception {
+ log.info("Starting containers...");
+ Startables.deepStart(Stream.of(PD, TIKV, TIDB)).join();
+ log.info("Containers are started.");
+ }
+
+ public static void stopContainers() {
+ Stream.of(TIKV, PD, TIDB).forEach(GenericContainer::stop);
+ }
+
+ public String getJdbcUrl(String databaseName) {
+ return "jdbc:mysql://"
+                + TIDB.getHost()
+ + ":"
+ + TIDB.getMappedPort(TIDB_PORT)
+ + "/"
+ + databaseName;
+ }
+
+ protected Connection getJdbcConnection(String databaseName) throws SQLException {
+ return DriverManager.getConnection(getJdbcUrl(databaseName), TIDB_USER, TIDB_PASSWORD);
+ }
+
+ private static void dropTestDatabase(Connection connection, String databaseName)
+ throws SQLException {
+ try {
+ Awaitility.await(String.format("Dropping database %s", databaseName))
+ .atMost(120, TimeUnit.SECONDS)
+ .until(
+ () -> {
+ try {
+ String sql =
+ String.format(
+ "DROP DATABASE IF EXISTS %s", databaseName);
+ connection.createStatement().execute(sql);
+ return true;
+ } catch (SQLException e) {
+                                    log.warn(
+                                            "DROP DATABASE {} failed: {}",
+                                            databaseName,
+                                            e.getMessage());
+ return false;
+ }
+ });
+ } catch (ConditionTimeoutException e) {
+ throw new IllegalStateException("Failed to drop test database", e);
+ }
+ }
+
+    /**
+     * Initializes a TiDB database from the DDL script {@code ddl/<sqlFile>.sql}, dropping
+     * any existing database of the same name first.
+     */
+ protected void initializeTidbTable(String sqlFile) {
+ final String ddlFile = String.format("ddl/%s.sql", sqlFile);
+ final URL ddlTestFile = TiDBTestBase.class.getClassLoader().getResource(ddlFile);
+ assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
+ try (Connection connection = getJdbcConnection("");
+ Statement statement = connection.createStatement()) {
+ dropTestDatabase(connection, sqlFile);
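+            // Strip full-line and trailing "--" SQL comments, then split the remaining
+            // text on ';' so each statement executes individually.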
+            final List<String> statements =
+ Arrays.stream(
+ Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
+ .map(String::trim)
+ .filter(x -> !x.startsWith("--") && !x.isEmpty())
+ .map(
+ x -> {
+ final Matcher m =
+ COMMENT_PATTERN.matcher(x);
+ return m.matches() ? m.group(1) : x;
+ })
+ .collect(Collectors.joining("\n"))
+ .split(";"))
+ .collect(Collectors.toList());
+ for (String stmt : statements) {
+ statement.execute(stmt);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/config/pd.toml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/config/pd.toml
new file mode 100644
index 000000000000..d4d23f5465a3
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/config/pd.toml
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# PD Configuration.
+
+name = "pd"
+data-dir = "default.pd"
+
+client-urls = "http://127.0.0.1:2379"
+# if not set, use ${client-urls}
+advertise-client-urls = ""
+
+peer-urls = "http://127.0.0.1:2380"
+# if not set, use ${peer-urls}
+advertise-peer-urls = ""
+
+initial-cluster = "pd=http://127.0.0.1:2380"
+initial-cluster-state = "new"
+
+lease = 3
+tso-save-interval = "3s"
+
+[security]
+# Path of file that contains list of trusted SSL CAs. if set, following four settings shouldn't be empty
+cacert-path = ""
+# Path of file that contains X509 certificate in PEM format.
+cert-path = ""
+# Path of file that contains X509 key in PEM format.
+key-path = ""
+
+[log]
+level = "error"
+
+# log format, one of json, text, console
+#format = "text"
+
+# disable automatic timestamps in output
+#disable-timestamp = false
+
+# file logging
+[log.file]
+#filename = ""
+# max log file size in MB
+#max-size = 300
+# max log file keep days
+#max-days = 28
+# maximum number of old log files to retain
+#max-backups = 7
+# rotate log by day
+#log-rotate = true
+
+[metric]
+# prometheus client push interval, set "0s" to disable prometheus.
+interval = "15s"
+# prometheus pushgateway address; leave it empty to disable prometheus push.
+address = "pushgateway:9091"
+
+[schedule]
+max-merge-region-size = 0
+split-merge-interval = "1h"
+max-snapshot-count = 3
+max-pending-peer-count = 16
+max-store-down-time = "30m"
+leader-schedule-limit = 4
+region-schedule-limit = 4
+replica-schedule-limit = 8
+merge-schedule-limit = 8
+tolerant-size-ratio = 5.0
+
+# customized schedulers, the format is as below
+# if empty, it will use balance-leader, balance-region, hot-region as default
+# [[schedule.schedulers]]
+# type = "evict-leader"
+# args = ["1"]
+
+[replication]
+# The number of replicas for each region.
+max-replicas = 3
+# The label keys specified the location of a store.
+# The placement priorities is implied by the order of label keys.
+# For example, ["zone", "rack"] means that we should place replicas to
+# different zones first, then to different racks if we don't have enough zones.
+location-labels = []
+
+[label-property]
+# Do not assign region leaders to stores that have these tags.
+# [[label-property.reject-leader]]
+# key = "zone"
+# value = "cn1
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/config/tidb.toml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/config/tidb.toml
new file mode 100644
index 000000000000..742eb935d1c4
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/config/tidb.toml
@@ -0,0 +1,233 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# TiDB Configuration.
+
+# TiDB server host.
+host = "0.0.0.0"
+
+# TiDB server port.
+port = 4000
+
+# Registered store name, [tikv, mocktikv]
+store = "tikv"
+
+# TiDB storage path.
+path = "/tmp/tidb"
+
+# The socket file to use for connection.
+socket = ""
+
+# Run ddl worker on this tidb-server.
+run-ddl = true
+
+# Schema lease duration. Very dangerous to change; only do so if you know what you are doing.
+lease = "0"
+
+# When create table, split a separated region for it. It is recommended to
+# turn off this option if there will be a large number of tables created.
+split-table = true
+
+# The limit of concurrent executed sessions.
+token-limit = 1000
+
+# Only print a log when out of memory quota.
+# Valid options: ["log", "cancel"]
+oom-action = "log"
+
+# Set the memory quota for a query in bytes. Default: 32GB
+mem-quota-query = 34359738368
+
+# Set system variable 'lower_case_table_names'
+lower-case-table-names = 2
+
+[log]
+# Log level: debug, info, warn, error, fatal.
+level = "error"
+
+# Log format, one of json, text, console.
+format = "text"
+
+# Disable automatic timestamp in output
+disable-timestamp = false
+
+# Stores slow query log into separated files.
+slow-query-file = ""
+
+# Queries with execution time greater than this value will be logged. (Milliseconds)
+slow-threshold = 300
+
+# Queries with internal result greater than this value will be logged.
+expensive-threshold = 10000
+
+# Maximum query length recorded in log.
+query-log-max-len = 2048
+
+# File logging.
+[log.file]
+# Log file name.
+filename = ""
+
+# Max log file size in MB (upper limit to 4096MB).
+max-size = 300
+
+# Max log file keep days. No clean up by default.
+max-days = 0
+
+# Maximum number of old log files to retain. No clean up by default.
+max-backups = 0
+
+[security]
+# Path of file that contains list of trusted SSL CAs for connection with mysql client.
+ssl-ca = ""
+
+# Path of file that contains X509 certificate in PEM format for connection with mysql client.
+ssl-cert = ""
+
+# Path of file that contains X509 key in PEM format for connection with mysql client.
+ssl-key = ""
+
+# Path of file that contains list of trusted SSL CAs for connection with cluster components.
+cluster-ssl-ca = ""
+
+# Path of file that contains X509 certificate in PEM format for connection with cluster components.
+cluster-ssl-cert = ""
+
+# Path of file that contains X509 key in PEM format for connection with cluster components.
+cluster-ssl-key = ""
+
+[status]
+# If enable status report HTTP service.
+report-status = true
+
+# TiDB status port.
+status-port = 10080
+
+# Prometheus client push interval in second, set \"0\" to disable prometheus push.
+metrics-interval = 15
+
+[performance]
+# Max CPUs to use, 0 use number of CPUs in the machine.
+max-procs = 0
+# StmtCountLimit limits the max count of statement inside a transaction.
+stmt-count-limit = 5000
+
+# Set keep alive option for tcp connection.
+tcp-keep-alive = true
+
+# Whether support cartesian product.
+cross-join = true
+
+# Stats lease duration, which influences the time of analyze and stats load.
+stats-lease = "3s"
+
+# Run auto analyze worker on this tidb-server.
+run-auto-analyze = true
+
+# Probability to use the query feedback to update stats, 0 or 1 for always false/true.
+feedback-probability = 0.0
+
+# The max number of query feedback that cache in memory.
+query-feedback-limit = 1024
+
+# Pseudo stats will be used if the ratio between the modify count and
+# row count in statistics of a table is greater than it.
+pseudo-estimate-ratio = 0.7
+
+[proxy-protocol]
+# PROXY protocol acceptable client networks.
+# Empty string means disable PROXY protocol, * means all networks.
+networks = ""
+
+# PROXY protocol header read timeout, unit is second
+header-timeout = 5
+
+[opentracing]
+# Enable opentracing.
+enable = false
+
+# Whether to enable the rpc metrics.
+rpc-metrics = false
+
+[opentracing.sampler]
+# Type specifies the type of the sampler: const, probabilistic, rateLimiting, or remote
+type = "const"
+
+# Param is a value passed to the sampler.
+# Valid values for Param field are:
+# - for "const" sampler, 0 or 1 for always false/true respectively
+# - for "probabilistic" sampler, a probability between 0 and 1
+# - for "rateLimiting" sampler, the number of spans per second
+# - for "remote" sampler, param is the same as for "probabilistic"
+# and indicates the initial sampling rate before the actual one
+# is received from the mothership
+param = 1.0
+
+# SamplingServerURL is the address of jaeger-agent's HTTP sampling server
+sampling-server-url = ""
+
+# MaxOperations is the maximum number of operations that the sampler
+# will keep track of. If an operation is not tracked, a default probabilistic
+# sampler will be used rather than the per operation specific sampler.
+max-operations = 0
+
+# SamplingRefreshInterval controls how often the remotely controlled sampler will poll
+# jaeger-agent for the appropriate sampling strategy.
+sampling-refresh-interval = 0
+
+[opentracing.reporter]
+# QueueSize controls how many spans the reporter can keep in memory before it starts dropping
+# new spans. The queue is continuously drained by a background go-routine, as fast as spans
+# can be sent out of process.
+queue-size = 0
+
+# BufferFlushInterval controls how often the buffer is force-flushed, even if it's not full.
+# It is generally not useful, as it only matters for very low traffic services.
+buffer-flush-interval = 0
+
+# LogSpans, when true, enables LoggingReporter that runs in parallel with the main reporter
+# and logs all submitted spans. Main Configuration.Logger must be initialized in the code
+# for this option to have any effect.
+log-spans = false
+
+# LocalAgentHostPort instructs reporter to send spans to jaeger-agent at this address
+local-agent-host-port = ""
+
+[tikv-client]
+# Max gRPC connections that will be established with each tikv-server.
+grpc-connection-count = 16
+
+# After a duration of this time in seconds if the client doesn't see any activity it pings
+# the server to see if the transport is still alive.
+grpc-keepalive-time = 10
+
+# After having pinged for keepalive check, the client waits for a duration of Timeout in seconds
+# and if no activity is seen even after that the connection is closed.
+grpc-keepalive-timeout = 3
+
+# max time for commit command, must be twice bigger than raft election timeout.
+commit-timeout = "41s"
+
+[binlog]
+
+# Socket file to write binlog.
+binlog-socket = ""
+
+# WriteTimeout specifies how long it will wait for writing binlog to pump.
+write-timeout = "15s"
+
+# If IgnoreError is true, when writing binlog meets an error, TiDB will stop writing binlog,
+# but still provide service.
+ignore-error = false
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/config/tikv.toml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/config/tikv.toml
new file mode 100644
index 000000000000..372ab91ac9d4
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/config/tikv.toml
@@ -0,0 +1,513 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# TiKV config template
+# Human-readable big numbers:
+# File size(based on byte): KB, MB, GB, TB, PB
+# e.g.: 1_048_576 = "1MB"
+# Time(based on ms): ms, s, m, h
+# e.g.: 78_000 = "1.3m"
+
+# log level: trace, debug, info, warn, error, off.
+log-level = "error"
+# file to store log, write to stderr if it's empty.
+# log-file = ""
+log-rotation-size = "500MB"
+
+[readpool.storage]
+# size of thread pool for high-priority operations
+# high-concurrency = 4
+# size of thread pool for normal-priority operations
+# normal-concurrency = 4
+# size of thread pool for low-priority operations
+# low-concurrency = 4
+# max running high-priority operations, reject if exceed
+# max-tasks-high = 8000
+# max running normal-priority operations, reject if exceed
+# max-tasks-normal = 8000
+# max running low-priority operations, reject if exceed
+# max-tasks-low = 8000
+# size of stack size for each thread pool
+# stack-size = "10MB"
+
+[readpool.coprocessor]
+# Notice: if CPU_NUM > 8, default thread pool size for coprocessors
+# will be set to CPU_NUM * 0.8.
+
+# high-concurrency = 8
+# normal-concurrency = 8
+# low-concurrency = 8
+# max-tasks-high = 16000
+# max-tasks-normal = 16000
+# max-tasks-low = 16000
+# stack-size = "10MB"
+
+[server]
+# set listening address.
+# addr = "127.0.0.1:20160"
+# set advertise listening address for client communication, if not set, use addr instead.
+# advertise-addr = ""
+# notify capacity, 40960 is suitable for about 7000 regions.
+# notify-capacity = 40960
+# maximum number of messages can be processed in one tick.
+# messages-per-tick = 4096
+
+# compression type for grpc channel, available values are no, deflate and gzip.
+# grpc-compression-type = "no"
+# size of thread pool for grpc server.
+# grpc-concurrency = 4
+# The number of max concurrent streams/requests on a client connection.
+# grpc-concurrent-stream = 1024
+# The number of connections with each tikv server to send raft messages.
+# grpc-raft-conn-num = 10
+# Amount to read ahead on individual grpc streams.
+# grpc-stream-initial-window-size = "2MB"
+
+# How many snapshots can be sent concurrently.
+# concurrent-send-snap-limit = 32
+# How many snapshots can be recv concurrently.
+# concurrent-recv-snap-limit = 32
+
+# max count of tasks being handled, new tasks will be rejected.
+# end-point-max-tasks = 2000
+
+# max recursion level allowed when decoding dag expression
+# end-point-recursion-limit = 1000
+
+# max time to handle coprocessor request before timeout
+# end-point-request-max-handle-duration = "60s"
+
+# the max bytes that snapshot can be written to disk in one second,
+# should be set based on your disk performance
+# snap-max-write-bytes-per-sec = "100MB"
+
+# set attributes about this server, e.g. { zone = "us-west-1", disk = "ssd" }.
+# labels = {}
+
+[storage]
+# set the path to rocksdb directory.
+# data-dir = "/tmp/tikv/store"
+
+# notify capacity of scheduler's channel
+# scheduler-notify-capacity = 10240
+
+# maximum number of messages can be processed in one tick
+# scheduler-messages-per-tick = 1024
+
+# the number of slots in scheduler latches, concurrency control for write.
+# scheduler-concurrency = 2048000
+
+# scheduler's worker pool size, should increase it in heavy write cases,
+# also should less than total cpu cores.
+# scheduler-worker-pool-size = 4
+
+# When the pending write bytes exceeds this threshold,
+# the "scheduler too busy" error is displayed.
+# scheduler-pending-write-threshold = "100MB"
+
+[pd]
+# pd endpoints
+# endpoints = []
+
+[metric]
+# the Prometheus client push interval. Setting the value to 0s stops Prometheus client from pushing.
+# interval = "15s"
+# the Prometheus pushgateway address. Leaving it empty stops Prometheus client from pushing.
+address = "pushgateway:9091"
+# the Prometheus client push job name. Note: A node id will automatically append, e.g., "tikv_1".
+# job = "tikv"
+
+[raftstore]
+# true (default value) for high reliability, this can prevent data loss when power failure.
+# sync-log = true
+
+# set the path to raftdb directory, default value is data-dir/raft
+# raftdb-path = ""
+
+# set store capacity, if no set, use disk capacity.
+# capacity = 0
+
+# notify capacity, 40960 is suitable for about 7000 regions.
+# notify-capacity = 40960
+
+# maximum number of messages can be processed in one tick.
+# messages-per-tick = 4096
+
+# Region heartbeat tick interval for reporting to pd.
+# pd-heartbeat-tick-interval = "60s"
+# Store heartbeat tick interval for reporting to pd.
+# pd-store-heartbeat-tick-interval = "10s"
+
+# When region size changes exceeds region-split-check-diff, we should check
+# whether the region should be split or not.
+# region-split-check-diff = "6MB"
+
+# Interval to check region whether need to be split or not.
+# split-region-check-tick-interval = "10s"
+
+# When raft entry exceed the max size, reject to propose the entry.
+# raft-entry-max-size = "8MB"
+
+# Interval to gc unnecessary raft log.
+# raft-log-gc-tick-interval = "10s"
+# A threshold to gc stale raft log, must >= 1.
+# raft-log-gc-threshold = 50
+# When entry count exceed this value, gc will be forced trigger.
+# raft-log-gc-count-limit = 72000
+# When the approximate size of raft log entries exceed this value, gc will be forced trigger.
+# It's recommended to set it to 3/4 of region-split-size.
+# raft-log-gc-size-limit = "72MB"
+
+# When a peer hasn't been active for max-peer-down-duration,
+# we will consider this peer to be down and report it to pd.
+# max-peer-down-duration = "5m"
+
+# Interval to check whether start manual compaction for a region,
+# region-compact-check-interval = "5m"
+# Number of regions for each time to check.
+# region-compact-check-step = 100
+# The minimum number of delete tombstones to trigger manual compaction.
+# region-compact-min-tombstones = 10000
+# Interval to check whether to start a manual compaction for the lock column family;
+# if written bytes reach lock-cf-compact-threshold for the lock column family, a
+# manual compaction for it will be fired.
+# lock-cf-compact-interval = "10m"
+# lock-cf-compact-bytes-threshold = "256MB"
+
+# Interval (s) to check whether a region's data is consistent.
+# consistency-check-interval = 0
+
+# Use delete range to drop a large number of continuous keys.
+# use-delete-range = false
+
+# delay time before deleting a stale peer
+# clean-stale-peer-delay = "10m"
+
+# Interval to cleanup import sst files.
+# cleanup-import-sst-interval = "10m"
+
+[coprocessor]
+# When it is true, it will try to split a region with table prefix if
+# that region crosses tables. It is recommended to turn off this option
+# if there will be a large number of tables created.
+# split-region-on-table = true
+# When the region's size exceeds region-max-size, we will split the region
+# into two, where the left region's size will be region-split-size or a little
+# bit smaller.
+# region-max-size = "144MB"
+# region-split-size = "96MB"
+
+[rocksdb]
+# Maximum number of concurrent background jobs (compactions and flushes)
+# max-background-jobs = 8
+
+# This value represents the maximum number of threads that will concurrently perform a
+# compaction job by breaking it into multiple, smaller ones that are run simultaneously.
+# Default: 1 (i.e. no subcompactions)
+# max-sub-compactions = 1
+
+# Number of open files that can be used by the DB. You may need to
+# increase this if your database has a large working set. Value -1 means
+# files opened are always kept open. You can estimate the number of files based
+# on target_file_size_base and target_file_size_multiplier for level-based
+# compaction.
+# If max-open-files = -1, RocksDB will prefetch index and filter blocks into
+# block cache at startup, so if your database has a large working set, it will
+# take several minutes to open the db.
+max-open-files = 1024
+
+# Max size of rocksdb's MANIFEST file.
+# For detailed explanation please refer to https://github.com/facebook/rocksdb/wiki/MANIFEST
+# max-manifest-file-size = "20MB"
+
+# If true, the database will be created if it is missing.
+# create-if-missing = true
+
+# rocksdb wal recovery mode
+# 0 : TolerateCorruptedTailRecords, tolerate incomplete records in trailing data on all logs;
+# 1 : AbsoluteConsistency, we don't expect to find any corruption in the WAL;
+# 2 : PointInTimeRecovery, Recover to point-in-time consistency;
+# 3 : SkipAnyCorruptedRecords, Recovery after a disaster;
+# wal-recovery-mode = 2
+
+# rocksdb write-ahead logs dir path
+# This specifies the absolute dir path for write-ahead logs (WAL).
+# If it is empty, the log files will be in the same dir as data.
+# When you set the path to rocksdb directory in memory like in /dev/shm, you may want to set
+# wal-dir to a directory on a persistent storage.
+# See https://github.com/facebook/rocksdb/wiki/How-to-persist-in-memory-RocksDB-database
+# wal-dir = "/tmp/tikv/store"
+
+# The following two fields affect how archived write-ahead logs will be deleted.
+# 1. If both set to 0, logs will be deleted asap and will not get into the archive.
+# 2. If wal-ttl-seconds is 0 and wal-size-limit is not 0,
+#    WAL files will be checked every 10 min and if the total size is greater
+#    than wal-size-limit, they will be deleted starting with the
+#    earliest until wal-size-limit is met. All empty files will be deleted.
+# 3. If wal-ttl-seconds is not 0 and wal-size-limit is 0, then
+# WAL files will be checked every wal-ttl-seconds / 2 and those that
+# are older than wal-ttl-seconds will be deleted.
+# 4. If both are not 0, WAL files will be checked every 10 min and both
+# checks will be performed with ttl being first.
+# When you set the path to rocksdb directory in memory like in /dev/shm, you may want to set
+# wal-ttl-seconds to a value greater than 0 (like 86400) and backup your db on a regular basis.
+# See https://github.com/facebook/rocksdb/wiki/How-to-persist-in-memory-RocksDB-database
+# wal-ttl-seconds = 0
+# wal-size-limit = 0
+
+# rocksdb max total wal size
+# max-total-wal-size = "4GB"
+
+# Rocksdb Statistics provides cumulative stats over time.
+# Turning statistics on introduces about 5%-10% overhead for RocksDB,
+# but it is worthwhile for knowing the internal status of RocksDB.
+# enable-statistics = true
+
+# Dump statistics periodically in information logs.
+# Same as rocksdb's default value (10 min).
+# stats-dump-period = "10m"
+
+# Per the Rocksdb FAQ (https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ),
+# if you want to use rocksdb on multiple disks or spinning disks, you should set the
+# value to at least 2MB.
+# compaction-readahead-size = 0
+
+# This is the maximum buffer size that is used by WritableFileWriter
+# writable-file-max-buffer-size = "1MB"
+
+# Use O_DIRECT for both reads and writes in background flush and compactions
+# use-direct-io-for-flush-and-compaction = false
+
+# Limit the disk IO of compaction and flush. Compaction and flush can cause
+# terrible spikes if they exceed a certain threshold. Consider setting this to
+# 50% ~ 80% of the disk throughput for a more stable result. However, in heavy
+# write workload, limiting compaction and flush speed can cause write stalls too.
+# rate-bytes-per-sec = 0
+
+# Enable or disable the pipelined write
+# enable-pipelined-write = true
+
+# Allows OS to incrementally sync files to disk while they are being
+# written, asynchronously, in the background.
+# bytes-per-sync = "0MB"
+
+# Allows OS to incrementally sync WAL to disk while it is being written.
+# wal-bytes-per-sync = "0KB"
+
+# Specify the maximal size of the Rocksdb info log file. If the log file
+# is larger than `max_log_file_size`, a new info log file will be created.
+# If max_log_file_size == 0, all logs will be written to one log file.
+# Default: 1GB
+# info-log-max-size = "1GB"
+
+# Time for the Rocksdb info log file to roll (in seconds).
+# If specified with non-zero value, log file will be rolled
+# if it has been active longer than `log_file_time_to_roll`.
+# Default: 0 (disabled)
+# info-log-roll-time = "0"
+
+# Maximal number of Rocksdb info log files to be kept.
+# Default: 10
+# info-log-keep-log-file-num = 10
+
+# This specifies the Rocksdb info LOG dir.
+# If it is empty, the log files will be in the same dir as data.
+# If it is non-empty, the log files will be in the specified dir,
+# and the db data dir's absolute path will be used as the log file
+# name's prefix.
+# Default: empty
+# info-log-dir = ""
+
+# The default Column Family, used to store the actual data of the database.
+[rocksdb.defaultcf]
+# The compression method (if any) used to compress a block.
+# no: kNoCompression
+# snappy: kSnappyCompression
+# zlib: kZlibCompression
+# bzip2: kBZip2Compression
+# lz4: kLZ4Compression
+# lz4hc: kLZ4HCCompression
+# zstd: kZSTD
+
+# per level compression
+# compression-per-level = ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"]
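+# (One entry per RocksDB level: the seven values above configure levels 0 through 6,
+# keeping the hottest levels uncompressed while compressing the coldest with zstd.)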
+
+# Approximate size of user data packed per block. Note that the
+# block size specified here corresponds to uncompressed data.
+# block-size = "64KB"
+
+# If you're doing point lookups, you definitely want to turn bloom filters on; we use
+# bloom filters to avoid unnecessary disk reads. Default bits_per_key is 10, which
+# yields ~1% false positive rate. Larger bits_per_key values will reduce false positive
+# rate, but increase memory usage and space amplification.
+# bloom-filter-bits-per-key = 10
+
+# false means one bloom filter per sst file; true means every block has a corresponding bloom filter
+# block-based-bloom-filter = false
+
+# level0-file-num-compaction-trigger = 4
+
+# Soft limit on number of level-0 files. We start slowing down writes at this point.
+# level0-slowdown-writes-trigger = 20
+
+# Maximum number of level-0 files. We stop writes at this point.
+# level0-stop-writes-trigger = 36
+
+# Amount of data to build up in memory (backed by an unsorted log
+# on disk) before converting to a sorted on-disk file.
+# write-buffer-size = "128MB"
+
+# The maximum number of write buffers that are built up in memory.
+# max-write-buffer-number = 5
+
+# The minimum number of write buffers that will be merged together
+# before writing to storage.
+# min-write-buffer-number-to-merge = 1
+
+# Control maximum total data size for base level (level 1).
+# max-bytes-for-level-base = "512MB"
+
+# Target file size for compaction.
+# target-file-size-base = "8MB"
+
+# Max bytes for compaction (max_compaction_bytes).
+# max-compaction-bytes = "2GB"
+
+# There are four different algorithms to pick files to compact.
+# 0 : ByCompensatedSize
+# 1 : OldestLargestSeqFirst
+# 2 : OldestSmallestSeqFirst
+# 3 : MinOverlappingRatio
+# compaction-pri = 3
+
+# block-cache is used to cache uncompressed blocks; a big block-cache can speed up reads.
+# In normal cases it should be tuned to 30%-50% of the system's total memory.
+# block-cache-size = "1GB"
+
+# Indicates whether to put index/filter blocks into the block cache.
+# If not specified, each "table reader" object will pre-load index/filter block
+# during table initialization.
+# cache-index-and-filter-blocks = true
+
+# Pin level0 filter and index blocks in cache.
+# pin-l0-filter-and-index-blocks = true
+
+# Enable read amplification statistics.
+# value => memory usage (percentage of loaded blocks memory)
+# 1 => 12.50 %
+# 2 => 06.25 %
+# 4 => 03.12 %
+# 8 => 01.56 %
+# 16 => 00.78 %
+# read-amp-bytes-per-bit = 0
+
+# Pick target size of each level dynamically.
+# dynamic-level-bytes = true
+
+# Options for the write Column Family,
+# which is used to store commit information in the MVCC model.
+[rocksdb.writecf]
+# compression-per-level = ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"]
+# block-size = "64KB"
+# write-buffer-size = "128MB"
+# max-write-buffer-number = 5
+# min-write-buffer-number-to-merge = 1
+# max-bytes-for-level-base = "512MB"
+# target-file-size-base = "8MB"
+
+# in normal cases it should be tuned to 10%-30% of the system's total memory.
+# block-cache-size = "256MB"
+# level0-file-num-compaction-trigger = 4
+# level0-slowdown-writes-trigger = 20
+# level0-stop-writes-trigger = 36
+# cache-index-and-filter-blocks = true
+# pin-l0-filter-and-index-blocks = true
+# compaction-pri = 3
+# read-amp-bytes-per-bit = 0
+# dynamic-level-bytes = true
+
+[rocksdb.lockcf]
+# compression-per-level = ["no", "no", "no", "no", "no", "no", "no"]
+# block-size = "16KB"
+# write-buffer-size = "128MB"
+# max-write-buffer-number = 5
+# min-write-buffer-number-to-merge = 1
+# max-bytes-for-level-base = "128MB"
+# target-file-size-base = "8MB"
+# block-cache-size = "256MB"
+# level0-file-num-compaction-trigger = 1
+# level0-slowdown-writes-trigger = 20
+# level0-stop-writes-trigger = 36
+# cache-index-and-filter-blocks = true
+# pin-l0-filter-and-index-blocks = true
+# compaction-pri = 0
+# read-amp-bytes-per-bit = 0
+# dynamic-level-bytes = true
+
+[raftdb]
+# max-sub-compactions = 1
+max-open-files = 1024
+# max-manifest-file-size = "20MB"
+# create-if-missing = true
+
+# enable-statistics = true
+# stats-dump-period = "10m"
+
+# compaction-readahead-size = 0
+# writable-file-max-buffer-size = "1MB"
+# use-direct-io-for-flush-and-compaction = false
+# enable-pipelined-write = true
+# allow-concurrent-memtable-write = false
+# bytes-per-sync = "0MB"
+# wal-bytes-per-sync = "0KB"
+
+# info-log-max-size = "1GB"
+# info-log-roll-time = "0"
+# info-log-keep-log-file-num = 10
+# info-log-dir = ""
+
+[raftdb.defaultcf]
+# compression-per-level = ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"]
+# block-size = "64KB"
+# write-buffer-size = "128MB"
+# max-write-buffer-number = 5
+# min-write-buffer-number-to-merge = 1
+# max-bytes-for-level-base = "512MB"
+# target-file-size-base = "8MB"
+
+# should be tuned to 256MB~2GB.
+# block-cache-size = "256MB"
+# level0-file-num-compaction-trigger = 4
+# level0-slowdown-writes-trigger = 20
+# level0-stop-writes-trigger = 36
+# cache-index-and-filter-blocks = true
+# pin-l0-filter-and-index-blocks = true
+# compaction-pri = 0
+# read-amp-bytes-per-bit = 0
+# dynamic-level-bytes = true
+
+[security]
+# set the path for certificates. An empty string means secure connections are disabled.
+# ca-path = ""
+# cert-path = ""
+# key-path = ""
+
+[import]
+# the directory to store kv data being imported.
+# import-dir = "/tmp/tikv/import"
+# number of threads to handle RPC requests.
+# num-threads = 8
+# stream channel window size; the stream will be blocked when the channel is full.
+# stream-channel-window = 128
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/ddl/column_all_type.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/ddl/column_all_type.sql
new file mode 100644
index 000000000000..d1276baca7c9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/ddl/column_all_type.sql
@@ -0,0 +1,119 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: column_type_test
+-- ----------------------------------------------------------------------------------------------------------------
+DROP DATABASE IF EXISTS column_type_test;
+CREATE DATABASE column_type_test;
+USE column_type_test;
+
+CREATE TABLE full_types (
+ id INT AUTO_INCREMENT NOT NULL,
+ tiny_c TINYINT,
+ tiny_un_c TINYINT UNSIGNED ,
+ small_c SMALLINT,
+ small_un_c SMALLINT UNSIGNED,
+ medium_c MEDIUMINT,
+ medium_un_c MEDIUMINT UNSIGNED,
+ int_c INTEGER ,
+ int_un_c INTEGER UNSIGNED,
+ int11_c INT(11) ,
+ big_c BIGINT,
+ big_un_c BIGINT UNSIGNED,
+ varchar_c VARCHAR(255),
+ char_c CHAR(3),
+ real_c REAL,
+ float_c FLOAT,
+ double_c DOUBLE,
+ decimal_c DECIMAL(8, 4),
+ numeric_c NUMERIC(6, 0),
+ big_decimal_c DECIMAL(65, 1),
+ bit1_c BIT,
+ tiny1_c TINYINT(1),
+ boolean_c BOOLEAN,
+ date_c DATE,
+ time_c TIME(0),
+ datetime3_c DATETIME(3),
+ datetime6_c DATETIME(6),
+ timestamp_c TIMESTAMP,
+ file_uuid BINARY(16),
+ bit_c BIT(64),
+ text_c TEXT,
+ tiny_blob_c TINYBLOB,
+ blob_c BLOB,
+ medium_blob_c MEDIUMBLOB,
+ long_blob_c LONGBLOB,
+ year_c YEAR,
+ enum_c enum('red', 'white') default 'red',
+ set_c SET('a', 'b'),
+ json_c JSON,
+ PRIMARY KEY (id)
+) DEFAULT CHARSET=utf8;
+
+INSERT INTO full_types VALUES (
+ DEFAULT, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 4294967295, 2147483647, 9223372036854775807,
+ 18446744073709551615,
+ 'Hello World', 'abc', 123.102, 123.102, 404.4443, 123.4567, 345.6, 34567892.1, 0, 1, true,
+ '2020-07-17', '18:00:22', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22',
+ unhex(replace('651aed08-390f-4893-b2f1-36923e7b7400','-','')), b'0000010000000100000001000000010000000100000001000000010000000100',
+ 'text',UNHEX(HEX(16)),UNHEX(HEX(16)),UNHEX(HEX(16)),UNHEX(HEX(16)), 2021,
+ 'red', 'a,b,a', '{"key1": "value1"}'
+ );
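+
+-- Illustrative sanity check (not part of the test fixtures): the single row above
+-- should be visible before the CDC job starts.
+-- SELECT COUNT(*) FROM full_types; -- expect 1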
+
+CREATE TABLE full_types_sink (
+ id INT AUTO_INCREMENT NOT NULL,
+ tiny_c TINYINT,
+ tiny_un_c TINYINT UNSIGNED ,
+ small_c SMALLINT,
+ small_un_c SMALLINT UNSIGNED,
+ medium_c MEDIUMINT,
+ medium_un_c MEDIUMINT UNSIGNED,
+ int_c INTEGER ,
+ int_un_c INTEGER UNSIGNED,
+ int11_c INT(11) ,
+ big_c BIGINT,
+ big_un_c BIGINT UNSIGNED,
+ varchar_c VARCHAR(255),
+ char_c CHAR(3),
+ real_c REAL,
+ float_c FLOAT,
+ double_c DOUBLE,
+ decimal_c DECIMAL(8, 4),
+ numeric_c NUMERIC(6, 0),
+ big_decimal_c DECIMAL(65, 1),
+ bit1_c BIT,
+ tiny1_c TINYINT(1),
+ boolean_c BOOLEAN,
+ date_c DATE,
+ time_c TIME(0),
+ datetime3_c DATETIME(3),
+ datetime6_c DATETIME(6),
+ timestamp_c TIMESTAMP,
+ file_uuid BINARY(16),
+ bit_c BIT(64),
+ text_c TEXT,
+ tiny_blob_c TINYBLOB,
+ blob_c BLOB,
+ medium_blob_c MEDIUMBLOB,
+ long_blob_c LONGBLOB,
+ year_c YEAR,
+ enum_c enum('red', 'white') default 'red',
+ set_c SET('a', 'b'),
+ json_c JSON,
+ PRIMARY KEY (id)
+) DEFAULT CHARSET=utf8;
+
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/ddl/inventory.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/ddl/inventory.sql
new file mode 100644
index 000000000000..d910a2c4ec55
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/ddl/inventory.sql
@@ -0,0 +1,50 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: inventory
+-- ----------------------------------------------------------------------------------------------------------------
+CREATE DATABASE inventory;
+
+USE inventory;
+
+-- Create and populate our products using a single insert with many rows
+CREATE TABLE products
+(
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ name VARCHAR(255) NOT NULL DEFAULT 'seatunnel',
+ description VARCHAR(512),
+ weight DECIMAL(20, 10)
+);
+ALTER TABLE products AUTO_INCREMENT = 101;
+
+CREATE TABLE products_sink
+(
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ name VARCHAR(255) NOT NULL DEFAULT 'seatunnel',
+ description VARCHAR(512),
+ weight DECIMAL(20, 10)
+);
+
+INSERT INTO products
+VALUES (default, "scooter", "Small 2-wheel scooter", 3.14),
+ (default, "car battery", "12V car battery", 8.1),
+ (default, "12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3", 0.8),
+ (default, "hammer", "12oz carpenter's hammer", 0.75),
+ (default, "hammer", "14oz carpenter's hammer", 0.875),
+ (default, "hammer", "16oz carpenter's hammer", 1.0),
+ (default, "rocks", "box of assorted rocks", 5.3),
+ (default, "jacket", "water resistent black wind breaker", 0.1),
+ (default, "spare tire", "24 inch spare tire", 22.2);
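+
+-- Illustrative sanity check (not part of the test fixtures): AUTO_INCREMENT starts
+-- at 101, so the nine rows above get ids 101..109.
+-- SELECT COUNT(*) FROM products; -- expect 9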
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/tidb/all_types_tidb_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/tidb/all_types_tidb_source_to_sink.conf
new file mode 100644
index 000000000000..fc8dfc2b802b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/tidb/all_types_tidb_source_to_sink.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of TiDB-CDC processing in SeaTunnel config
+######
+
+env {
+ parallelism = 2
+ job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin, **only for testing and demonstrating the source plugin feature**
+ TiDB-CDC {
+ result_table_name = "trans_tidb_all_type_cdc"
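+    # result_table_name registers this source's output so the sink below can pick
+    # it up via source_table_name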
+ base-url = "jdbc:mysql://tidb-e2e:4000/column_type_test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "seatunnel"
+ database-name = "column_type_test"
+ table-name = "full_types"
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc {
+ source_table_name = "trans_tidb_all_type_cdc"
+ url = "jdbc:mysql://tidb-e2e:4000/column_type_test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "seatunnel"
+ database = "column_type_test"
+ table = "full_types_sink"
+ generate_sink_sql = true
+ primary_keys = ["id"]
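+    # generate_sink_sql lets the JDBC sink build its insert/update/delete statements
+    # automatically from the CDC row kind, keyed on primary_keys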
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/tidb/tidb_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/tidb/tidb_source_to_sink.conf
new file mode 100644
index 000000000000..b3032b8071c5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/tidb/tidb_source_to_sink.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of TiDB-CDC processing in SeaTunnel config
+######
+
+env {
+ parallelism = 2
+ job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin, **only for testing and demonstrating the source plugin feature**
+ TiDB-CDC {
+ result_table_name = "customers_tidb_cdc"
+ base-url = "jdbc:mysql://tidb-e2e:4000/inventory"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "seatunnel"
+ database-name = "inventory"
+ table-name = "products"
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc {
+ source_table_name = "trans_tidb_cdc"
+ url = "jdbc:mysql://tidb-e2e:4000/inventory"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "seatunnel"
+ database = "inventory"
+ table = "products_sink"
+ generate_sink_sql = true
+ primary_keys = ["id"]
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 2db67f88147c..ca333ff648fd 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -76,6 +76,7 @@
connector-hudi-e2e
connector-milvus-e2e
connector-activemq-e2e
+ connector-tidb-e2e