diff --git a/config/plugin_config b/config/plugin_config
index f6549168d6da..cb632e73460f 100644
--- a/config/plugin_config
+++ b/config/plugin_config
@@ -28,6 +28,7 @@ connector-cdc-mongodb
connector-cdc-sqlserver
connector-cdc-postgres
connector-cdc-oracle
+connector-cdc-tidb
connector-clickhouse
connector-datahub
connector-dingtalk
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 1942f875d7c9..b20a28e03cc9 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -103,6 +103,7 @@ seatunnel.source.Maxcompute = connector-maxcompute
seatunnel.sink.Maxcompute = connector-maxcompute
seatunnel.source.MySQL-CDC = connector-cdc-mysql
seatunnel.source.MongoDB-CDC = connector-cdc-mongodb
+seatunnel.source.TiDB-CDC = connector-cdc-tidb
seatunnel.sink.S3Redshift = connector-s3-redshift
seatunnel.source.Web3j = connector-web3j
seatunnel.source.TDengine = connector-tdengine
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/pom.xml b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/pom.xml
index 1f779cd1a7b8..7f5bdb721150 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/pom.xml
@@ -65,7 +65,6 @@
org.tikvtikv-client-java
-
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBDialect.java
deleted file mode 100644
index 46ebd9596152..000000000000
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBDialect.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source;
-
-import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
-import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
-import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
-import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
-import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-
-import io.debezium.jdbc.JdbcConnection;
-import io.debezium.relational.TableId;
-import io.debezium.relational.history.TableChanges;
-
-import java.util.List;
-
-public class TiDBDialect implements JdbcDataSourceDialect {
- /** Get the name of dialect. */
- @Override
- public String getName() {
- return null;
- }
-
- /**
- * Check if the CollectionId is case-sensitive or not.
- *
- * @param sourceConfig
- */
- @Override
- public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) {
- return false;
- }
-
- /**
- * Returns the {@link ChunkSplitter} which used to split collection to splits.
- *
- * @param sourceConfig
- */
- @Override
- public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
- return null;
- }
-
- /**
- * Discovers the list of table to capture.
- *
- * @param sourceConfig
- */
- @Override
- public List discoverDataCollections(JdbcSourceConfig sourceConfig) {
- return null;
- }
-
- /**
- * Creates and opens a new {@link JdbcConnection} backing connection pool.
- *
- * @param sourceConfig a basic source configuration.
- * @return a utility that simplifies using a JDBC connection.
- */
- @Override
- public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
- return null;
- }
-
- /**
- * Query and build the schema of table.
- *
- * @param jdbc
- * @param tableId
- */
- @Override
- public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
- return null;
- }
-
- @Override
- public FetchTask createFetchTask(SourceSplitBase sourceSplitBase) {
- return null;
- }
-
- @Override
- public JdbcSourceFetchTaskContext createFetchTaskContext(
- SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
- return null;
- }
-}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBIncrementalSource.java
deleted file mode 100644
index 1e1a7c07b61b..000000000000
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBIncrementalSource.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
-import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
-import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
-import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
-import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
-import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
-import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
-
-import java.io.Serializable;
-
-public class TiDBIncrementalSource extends IncrementalSource {
-
- static final String IDENTIFIER = "TIDB-CDC";
- /**
- * Returns a unique identifier among same factory interfaces.
- *
- *
For consistency, an identifier should be declared as one lower case word (e.g. {@code
- * kafka}). If multiple factories exist for different versions, a version should be appended
- * using "-" (e.g. {@code elasticsearch-7}).
- */
- @Override
- public String getPluginName() {
- return IDENTIFIER;
- }
-
- /**
- * Create source split enumerator, used to generate splits. This method will be called only once
- * when start a source.
- *
- * @param enumeratorContext enumerator context.
- * @return source split enumerator.
- * @throws Exception when create enumerator failed.
- */
- @Override
- public SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext)
- throws Exception {
- return null;
- }
-
- /**
- * Create source split enumerator, used to generate splits. This method will be called when
- * restore from checkpoint.
- *
- * @param enumeratorContext enumerator context.
- * @param checkpointState checkpoint state.
- * @return source split enumerator.
- * @throws Exception when create enumerator failed.
- */
- @Override
- public SourceSplitEnumerator restoreEnumerator(
- SourceSplitEnumerator.Context enumeratorContext, Serializable checkpointState)
- throws Exception {
- return null;
- }
-
- @Override
- public Option getStartupModeOption() {
- return null;
- }
-
- @Override
- public Option getStopModeOption() {
- return null;
- }
-
- @Override
- public SourceConfig.Factory createSourceConfigFactory(ReadonlyConfig config) {
- return null;
- }
-
- @Override
- public DebeziumDeserializationSchema createDebeziumDeserializationSchema(
- ReadonlyConfig config) {
- return null;
- }
-
- @Override
- public DataSourceDialect createDataSourceDialect(ReadonlyConfig config) {
- return null;
- }
-
- @Override
- public OffsetFactory createOffsetFactory(ReadonlyConfig config) {
- return null;
- }
-}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBIncrementalSourceFactory.java
deleted file mode 100644
index 60d42347ef60..000000000000
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBIncrementalSourceFactory.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source;
-
-import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(Factory.class)
-public class TiDBIncrementalSourceFactory implements TableSourceFactory {
- /**
- * Returns a unique identifier among same factory interfaces.
- *
- *
For consistency, an identifier should be declared as one lower case word (e.g. {@code
- * kafka}). If multiple factories exist for different versions, a version should be appended
- * using "-" (e.g. {@code elasticsearch-7}).
- */
- @Override
- public String factoryIdentifier() {
- return TiDBIncrementalSource.IDENTIFIER;
- }
-
- /**
- * Returns the rule for options.
- *
- *
1. Used to verify whether the parameters configured by the user conform to the rules of
- * the options;
- *
- *
2. Used for Web-UI to prompt user to configure option value;
- */
- @Override
- public OptionRule optionRule() {
- return null;
- }
-
- /**
- * TODO: Implement SupportParallelism in the TableSourceFactory instead of the SeaTunnelSource,
- * Then deprecated the method
- */
- @Override
- public Class extends SeaTunnelSource> getSourceClass() {
- return null;
- }
-}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java
new file mode 100644
index 000000000000..ddfea4ccce79
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.enumerator.TiDBSourceCheckpointState;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.enumerator.TiDBSourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.reader.TiDBSourceReader;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split.TiDBSourceSplit;
+
+import java.util.Collections;
+import java.util.List;
+
+public class TiDBSource
+ implements SeaTunnelSource,
+ SupportParallelism,
+ SupportColumnProjection {
+
+ static final String IDENTIFIER = "TiDB-CDC";
+
+ private TiDBSourceConfig config;
+ private final CatalogTable catalogTable;
+
+ public TiDBSource(ReadonlyConfig config, CatalogTable catalogTable) {
+
+ this.config =
+ TiDBSourceConfig.builder()
+ .startupMode(config.get(TiDBSourceOptions.STARTUP_MODE))
+ .databaseName(config.get(TiDBSourceOptions.DATABASE_NAME))
+ .tableName(config.get(TiDBSourceOptions.TABLE_NAME))
+ .tiConfiguration(TiDBSourceOptions.getTiConfiguration(config))
+ .build();
+ this.catalogTable = catalogTable;
+ }
+
+ /**
+ * Returns a unique identifier among same factory interfaces.
+ *
+ *
For consistency, an identifier should be declared as one lower case word (e.g. {@code
+ * kafka}). If multiple factories exist for different versions, a version should be appended
+ * using "-" (e.g. {@code elasticsearch-7}).
+ */
+ @Override
+ public String getPluginName() {
+ return IDENTIFIER;
+ }
+
+ /**
+ * Get the boundedness of this source.
+ *
+ * @return the boundedness of this source.
+ */
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.UNBOUNDED;
+ }
+
+ /**
+ * Create source reader, used to produce data.
+ *
+ * @param context reader context.
+ * @return source reader.
+ * @throws Exception when create reader failed.
+ */
+ @Override
+ public SourceReader createReader(SourceReader.Context context)
+ throws Exception {
+ return new TiDBSourceReader(context, config, catalogTable);
+ }
+
+ /**
+ * Create source split enumerator, used to generate splits. This method will be called only once
+ * when start a source.
+ *
+ * @param context enumerator context.
+ * @return source split enumerator.
+ * @throws Exception when create enumerator failed.
+ */
+ @Override
+ public SourceSplitEnumerator createEnumerator(
+ SourceSplitEnumerator.Context context) throws Exception {
+ return new TiDBSourceSplitEnumerator(context, config);
+ }
+
+ /**
+ * Create source split enumerator, used to generate splits. This method will be called when
+ * restore from checkpoint.
+ *
+ * @param context enumerator context.
+ * @param checkpointState checkpoint state.
+ * @return source split enumerator.
+ * @throws Exception when create enumerator failed.
+ */
+ @Override
+ public SourceSplitEnumerator restoreEnumerator(
+ SourceSplitEnumerator.Context context,
+ TiDBSourceCheckpointState checkpointState)
+ throws Exception {
+ return new TiDBSourceSplitEnumerator(context, config, checkpointState);
+ }
+
+ @Override
+ public List getProducedCatalogTables() {
+ return Collections.singletonList(catalogTable);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java
new file mode 100644
index 000000000000..39d8b03739b9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalog;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalogFactory;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+@AutoService(Factory.class)
+public class TiDBSourceFactory implements TableSourceFactory {
+ /**
+ * Returns a unique identifier among same factory interfaces.
+ *
+ *
For consistency, an identifier should be declared as one lower case word (e.g. {@code
+ * kafka}). If multiple factories exist for different versions, a version should be appended
+ * using "-" (e.g. {@code elasticsearch-7}).
+ */
+ @Override
+ public String factoryIdentifier() {
+ return TiDBSource.IDENTIFIER;
+ }
+
+ /**
+ * Returns the rule for options.
+ *
+ *
1. Used to verify whether the parameters configured by the user conform to the rules of
+ * the options;
+ *
+ *
2. Used for Web-UI to prompt user to configure option value;
+ */
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(
+ TiDBSourceOptions.DATABASE_NAME,
+ TiDBSourceOptions.TABLE_NAME,
+ TiDBSourceOptions.PD_ADDRESSES)
+ .optional(
+ TiDBSourceOptions.TIKV_BATCH_GET_CONCURRENCY,
+ TiDBSourceOptions.TIKV_BATCH_SCAN_CONCURRENCY,
+ TiDBSourceOptions.TIKV_GRPC_SCAN_TIMEOUT,
+ TiDBSourceOptions.TIKV_GRPC_TIMEOUT,
+ TiDBSourceOptions.STARTUP_MODE)
+ .build();
+ }
+
+ /**
+ * TODO: Implement SupportParallelism in the TableSourceFactory instead of the SeaTunnelSource,
+ * Then deprecated the method
+ */
+ @Override
+ public Class extends SeaTunnelSource> getSourceClass() {
+ return TiDBSource.class;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public
+ TableSource createSource(TableSourceFactoryContext context) {
+ return () -> {
+ ReadonlyConfig config = context.getOptions();
+ TiDBCatalogFactory catalogFactory = new TiDBCatalogFactory();
+ // Build tidb catalog.
+ TiDBCatalog catalog =
+ (TiDBCatalog) catalogFactory.createCatalog(factoryIdentifier(), config);
+
+ TablePath tablePath =
+ TablePath.of(
+ config.get(TiDBSourceOptions.DATABASE_NAME),
+ config.get(TiDBSourceOptions.TABLE_NAME));
+ CatalogTable catalogTable = catalog.getTable(tablePath);
+ return (SeaTunnelSource)
+ new TiDBSource(context.getOptions(), catalogTable);
+ };
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceOptions.java
deleted file mode 100644
index ce1348471e23..000000000000
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceOptions.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source;
-
-import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.configuration.SingleChoiceOption;
-
-import org.tikv.common.ConfigUtils;
-import org.tikv.common.TiConfiguration;
-
-import java.util.Map;
-
-/** TiDB source options */
-public class TiDBSourceOptions {
-
- public static final SingleChoiceOption DATABASE_NAME =
- (SingleChoiceOption)
- Options.key("database-name")
- .stringType()
- .noDefaultValue()
- .withDescription("Database name of the TiDB server to monitor.");
-
- public static final SingleChoiceOption TABLE_NAME =
- (SingleChoiceOption)
- Options.key("table-name")
- .stringType()
- .noDefaultValue()
- .withDescription("Table name of the TiDB database to monitor.");
-
- public static final SingleChoiceOption SCAN_STARTUP_MODE =
- (SingleChoiceOption)
- Options.key("scan.startup.mode")
- .stringType()
- .defaultValue("initial")
- .withDescription(
- "Optional startup mode for TiDB CDC consumer, valid enumerations are "
- + "\"initial\", \"latest-offset\"");
-
- public static final SingleChoiceOption PD_ADDRESSES =
- (SingleChoiceOption)
- Options.key("pd-addresses")
- .stringType()
- .noDefaultValue()
- .withDescription("TiKV cluster's PD address");
-
- public static final SingleChoiceOption TIKV_GRPC_TIMEOUT =
- (SingleChoiceOption)
- Options.key(ConfigUtils.TIKV_GRPC_TIMEOUT)
- .longType()
- .noDefaultValue()
- .withDescription("TiKV GRPC timeout in ms");
-
- public static final SingleChoiceOption TIKV_GRPC_SCAN_TIMEOUT =
- (SingleChoiceOption)
- Options.key(ConfigUtils.TIKV_GRPC_SCAN_TIMEOUT)
- .longType()
- .noDefaultValue()
- .withDescription("TiKV GRPC scan timeout in ms");
-
- public static final SingleChoiceOption TIKV_BATCH_GET_CONCURRENCY =
- (SingleChoiceOption)
- Options.key(ConfigUtils.TIKV_BATCH_GET_CONCURRENCY)
- .intType()
- .noDefaultValue()
- .withDescription("TiKV GRPC batch get concurrency");
-
- public static final SingleChoiceOption TIKV_BATCH_SCAN_CONCURRENCY =
- (SingleChoiceOption)
- Options.key(ConfigUtils.TIKV_BATCH_SCAN_CONCURRENCY)
- .intType()
- .noDefaultValue()
- .withDescription("TiKV GRPC batch scan concurrency");
-
- public static TiConfiguration getTiConfiguration(
- final String pdAddrsStr, final Map options) {
-
- final ReadonlyConfig configuration = ReadonlyConfig.fromMap(options);
-
- final TiConfiguration tiConf = TiConfiguration.createDefault(pdAddrsStr);
- configuration.getOptional(TIKV_GRPC_TIMEOUT).ifPresent(tiConf::setTimeout);
- configuration.getOptional(TIKV_GRPC_SCAN_TIMEOUT).ifPresent(tiConf::setScanTimeout);
- configuration
- .getOptional(TIKV_BATCH_GET_CONCURRENCY)
- .ifPresent(tiConf::setBatchGetConcurrency);
-
- configuration
- .getOptional(TIKV_BATCH_SCAN_CONCURRENCY)
- .ifPresent(tiConf::setBatchScanConcurrency);
- return tiConf;
- }
-}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceConfig.java
new file mode 100644
index 000000000000..faa174ae2ca1
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config;
+
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+
+import org.tikv.common.TiConfiguration;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@EqualsAndHashCode
+public class TiDBSourceConfig {
+ private String databaseName;
+ private String tableName;
+ private StartupMode startupMode;
+ private TiConfiguration tiConfiguration;
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceOptions.java
new file mode 100644
index 000000000000..0313c794f10e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceOptions.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+
+import org.tikv.common.ConfigUtils;
+import org.tikv.common.TiConfiguration;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/** TiDB source options */
+public class TiDBSourceOptions implements Serializable {
+
+ public static final Option DATABASE_NAME =
+ Options.key("database-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Database name of the TiDB server to monitor.");
+
+ public static final Option TABLE_NAME =
+ Options.key("table-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Table name of the database to monitor.");
+
+ public static final Option STARTUP_MODE =
+ Options.key(SourceOptions.STARTUP_MODE_KEY)
+ .singleChoice(
+ StartupMode.class,
+ Arrays.asList(StartupMode.INITIAL, StartupMode.SPECIFIC))
+ .defaultValue(StartupMode.INITIAL)
+ .withDescription(
+ "Optional startup mode for CDC source, valid enumerations are "
+ + "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n or \"specific\"");
+
+ public static final Option PD_ADDRESSES =
+ Options.key("pd-addresses")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("TiKV cluster's PD address");
+
+ public static final Option TIKV_GRPC_TIMEOUT =
+ Options.key(ConfigUtils.TIKV_GRPC_TIMEOUT)
+ .longType()
+ .noDefaultValue()
+ .withDescription("TiKV GRPC timeout in ms");
+
+ public static final Option TIKV_GRPC_SCAN_TIMEOUT =
+ Options.key(ConfigUtils.TIKV_GRPC_SCAN_TIMEOUT)
+ .longType()
+ .noDefaultValue()
+ .withDescription("TiKV GRPC scan timeout in ms");
+
+ public static final Option TIKV_BATCH_GET_CONCURRENCY =
+ Options.key(ConfigUtils.TIKV_BATCH_GET_CONCURRENCY)
+ .intType()
+ .noDefaultValue()
+ .withDescription("TiKV GRPC batch get concurrency");
+
+ public static final Option TIKV_BATCH_SCAN_CONCURRENCY =
+ Options.key(ConfigUtils.TIKV_BATCH_SCAN_CONCURRENCY)
+ .intType()
+ .noDefaultValue()
+ .withDescription("TiKV GRPC batch scan concurrency");
+
+ public static TiConfiguration getTiConfiguration(final ReadonlyConfig configuration) {
+ final String pdAddrsStr = configuration.get(PD_ADDRESSES);
+ final TiConfiguration tiConf = TiConfiguration.createDefault(pdAddrsStr);
+ configuration.getOptional(TIKV_GRPC_TIMEOUT).ifPresent(tiConf::setTimeout);
+ configuration.getOptional(TIKV_GRPC_SCAN_TIMEOUT).ifPresent(tiConf::setScanTimeout);
+ configuration
+ .getOptional(TIKV_BATCH_GET_CONCURRENCY)
+ .ifPresent(tiConf::setBatchGetConcurrency);
+
+ configuration
+ .getOptional(TIKV_BATCH_SCAN_CONCURRENCY)
+ .ifPresent(tiConf::setBatchScanConcurrency);
+ return tiConf;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DataConverter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DataConverter.java
new file mode 100644
index 000000000000..1f1208917c6f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DataConverter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.converter;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.tikv.common.meta.TiTableInfo;
+
+public interface DataConverter {
+
+ SeaTunnelRow convert(T object, TiTableInfo tableInfo, SeaTunnelRowType rowType)
+ throws Exception;
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DefaultDataConverter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DefaultDataConverter.java
new file mode 100644
index 000000000000..733bb50a280b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DefaultDataConverter.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.converter;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+
+import org.tikv.common.meta.TiTableInfo;
+import org.tikv.common.types.DataType;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DefaultDataConverter implements DataConverter