catalogTables) {
- super(options, dataType, catalogTables);
- }
-
- /**
- * Returns a unique identifier among same factory interfaces.
- *
- * For consistency, an identifier should be declared as one lower case word (e.g. {@code
- * kafka}). If multiple factories exist for different versions, a version should be appended
- * using "-" (e.g. {@code elasticsearch-7}).
- */
- @Override
- public String getPluginName() {
- return IDENTIFIER;
- }
-
- @Override
- public Option getStartupModeOption() {
- return TiDBSourceOptions.STARTUP_MODE;
- }
-
- @Override
- public Option getStopModeOption() {
- return TiDBSourceOptions.STOP_MODE;
- }
-
- @Override
- public SourceConfig.Factory createSourceConfigFactory(
- @Nonnull ReadonlyConfig config) {
- TiDBSourceConfigProvider.Builder builder = new TiDBSourceConfigProvider.Builder();
- builder.databaseName(config.get(TiDBSourceOptions.DATABASE_NAME));
- builder.tableName(config.get(TiDBSourceOptions.TABLE_NAME));
- builder.startupConfig(startupConfig);
- builder.stopConfig(stopConfig);
- TiConfiguration configuration =
- TiDBSourceOptions.getTiConfiguration(
- config.get(TiDBSourceOptions.PD_ADDRESSES), config);
- builder.tiConfiguration(configuration);
- builder.parallelism(incrementalParallelism);
- return builder;
- }
-
- @Override
- public DebeziumDeserializationSchema createDebeziumDeserializationSchema(
- ReadonlyConfig config) {
- return null;
- }
-
- @Override
- public DataSourceDialect createDataSourceDialect(ReadonlyConfig config) {
- return new TiDBDialect();
- }
-
- @Override
- public OffsetFactory createOffsetFactory(ReadonlyConfig config) {
- return null;
- }
-}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBIncrementalSourceFactory.java
deleted file mode 100644
index 7d0f6d4b8b3a..000000000000
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBIncrementalSourceFactory.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source;
-
-import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceSplit;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.connector.TableSource;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceOptions;
-
-import com.google.auto.service.AutoService;
-
-import java.io.Serializable;
-import java.util.List;
-
-@AutoService(Factory.class)
-public class TiDBIncrementalSourceFactory implements TableSourceFactory {
- /**
- * Returns a unique identifier among same factory interfaces.
- *
- * For consistency, an identifier should be declared as one lower case word (e.g. {@code
- * kafka}). If multiple factories exist for different versions, a version should be appended
- * using "-" (e.g. {@code elasticsearch-7}).
- */
- @Override
- public String factoryIdentifier() {
- return TiDBIncrementalSource.IDENTIFIER;
- }
-
- /**
- * Returns the rule for options.
- *
- *
1. Used to verify whether the parameters configured by the user conform to the rules of
- * the options;
- *
- *
2. Used for Web-UI to prompt user to configure option value;
- */
- @Override
- public OptionRule optionRule() {
- return TiDBSourceOptions.getBaseRule()
- .required(
- TiDBSourceOptions.DATABASE_NAME,
- TiDBSourceOptions.TABLE_NAME,
- TiDBSourceOptions.PD_ADDRESSES)
- .optional(
- TiDBSourceOptions.TIKV_BATCH_GET_CONCURRENCY,
- TiDBSourceOptions.TIKV_BATCH_SCAN_CONCURRENCY,
- TiDBSourceOptions.TIKV_GRPC_SCAN_TIMEOUT,
- TiDBSourceOptions.TIKV_GRPC_TIMEOUT,
- TiDBSourceOptions.STARTUP_MODE,
- TiDBSourceOptions.STOP_MODE)
- .build();
- }
-
- /**
- * TODO: Implement SupportParallelism in the TableSourceFactory instead of the SeaTunnelSource,
- * Then deprecated the method
- */
- @Override
- public Class extends SeaTunnelSource> getSourceClass() {
- return TiDBIncrementalSource.class;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public
- TableSource createSource(TableSourceFactoryContext context) {
- return () -> {
- List catalogTables =
- CatalogTableUtil.getCatalogTables(
- context.getOptions(), context.getClassLoader());
- SeaTunnelDataType dataType =
- CatalogTableUtil.convertToMultipleRowType(catalogTables);
- return (SeaTunnelSource)
- new TiDBIncrementalSource<>(context.getOptions(), dataType, catalogTables);
- };
- }
-}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java
index 257cbbfe0cec..ddfea4ccce79 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java
@@ -33,12 +33,15 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.reader.TiDBSourceReader;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split.TiDBSourceSplit;
+import java.util.Collections;
+import java.util.List;
+
public class TiDBSource
implements SeaTunnelSource,
SupportParallelism,
SupportColumnProjection {
- static final String IDENTIFIER = "TIDB-CDC";
+ static final String IDENTIFIER = "TiDB-CDC";
private TiDBSourceConfig config;
private final CatalogTable catalogTable;
@@ -120,4 +123,9 @@ public SourceSplitEnumerator restore
throws Exception {
return new TiDBSourceSplitEnumerator(context, config, checkpointState);
}
+
+ @Override
+ public List getProducedCatalogTables() {
+ return Collections.singletonList(catalogTable);
+ }
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java
index 1660dd3e0f97..39d8b03739b9 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java
@@ -59,7 +59,7 @@ public String factoryIdentifier() {
*/
@Override
public OptionRule optionRule() {
- return TiDBSourceOptions.getBaseRule()
+ return OptionRule.builder()
.required(
TiDBSourceOptions.DATABASE_NAME,
TiDBSourceOptions.TABLE_NAME,
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceConfigProvider.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceConfigProvider.java
deleted file mode 100644
index b14d0128cfc3..000000000000
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceConfigProvider.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config;
-
-import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
-import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
-import org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
-
-import org.tikv.common.TiConfiguration;
-
-import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
-
-public class TiDBSourceConfigProvider {
- private TiDBSourceConfigProvider() {}
-
- public static Builder newBuilder() {
- return new Builder();
- }
-
- public static class Builder implements SourceConfig.Factory {
- private String databaseName;
- private String tableName;
- private StartupConfig startupConfig;
- private StopConfig stopConfig;
- private int parallelism;
- private TiConfiguration tiConfiguration;
-
- public Builder databaseName(String databaseName) {
- this.databaseName = databaseName;
- return this;
- }
-
- public Builder tableName(String tableName) {
- this.tableName = tableName;
- return this;
- }
-
- public Builder startupConfig(StartupConfig startupConfig) {
- this.startupConfig = startupConfig;
- return this;
- }
-
- public Builder stopConfig(StopConfig stopConfig) {
- this.stopConfig = stopConfig;
- return this;
- }
-
- public Builder parallelism(int parallelism) {
- this.parallelism = parallelism;
- return this;
- }
-
- public Builder tiConfiguration(TiConfiguration tiConfiguration) {
- this.tiConfiguration = tiConfiguration;
- return this;
- }
-
- public Builder validate() {
- checkNotNull(databaseName, "databaseName must be provided");
- checkNotNull(tableName, "tableName must be provided");
- checkNotNull(tiConfiguration, "tiConfiguration must be provided");
- return this;
- }
-
- @Override
- public TiDBSourceConfig create(int subtask) {
- return new TiDBSourceConfig(
- databaseName,
- tableName,
- startupConfig,
- stopConfig,
- parallelism,
- tiConfiguration);
- }
- }
-}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceOptions.java
index 5a0646a14aaa..0313c794f10e 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceOptions.java
@@ -17,36 +17,34 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config;
+import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.configuration.SingleChoiceOption;
-import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.tikv.common.ConfigUtils;
import org.tikv.common.TiConfiguration;
+import java.io.Serializable;
import java.util.Arrays;
/** TiDB source options */
-public class TiDBSourceOptions extends JdbcSourceOptions {
+public class TiDBSourceOptions implements Serializable {
- public static final SingleChoiceOption DATABASE_NAME =
- (SingleChoiceOption)
- Options.key("database-name")
- .stringType()
- .noDefaultValue()
- .withDescription("Database name of the TiDB server to monitor.");
+ public static final Option DATABASE_NAME =
+ Options.key("database-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Database name of the TiDB server to monitor.");
- public static final SingleChoiceOption TABLE_NAME =
- (SingleChoiceOption)
- Options.key("table-name")
- .stringType()
- .noDefaultValue()
- .withDescription("Table name of the database to monitor.");
+ public static final Option TABLE_NAME =
+ Options.key("table-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Table name of the database to monitor.");
- public static final SingleChoiceOption STARTUP_MODE =
+ public static final Option STARTUP_MODE =
Options.key(SourceOptions.STARTUP_MODE_KEY)
.singleChoice(
StartupMode.class,
@@ -56,40 +54,35 @@ public class TiDBSourceOptions extends JdbcSourceOptions {
"Optional startup mode for CDC source, valid enumerations are "
+ "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n or \"specific\"");
- public static final SingleChoiceOption PD_ADDRESSES =
- (SingleChoiceOption)
- Options.key("pd-addresses")
- .stringType()
- .noDefaultValue()
- .withDescription("TiKV cluster's PD address");
+ public static final Option PD_ADDRESSES =
+ Options.key("pd-addresses")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("TiKV cluster's PD address");
- public static final SingleChoiceOption TIKV_GRPC_TIMEOUT =
- (SingleChoiceOption)
- Options.key(ConfigUtils.TIKV_GRPC_TIMEOUT)
- .longType()
- .noDefaultValue()
- .withDescription("TiKV GRPC timeout in ms");
+ public static final Option TIKV_GRPC_TIMEOUT =
+ Options.key(ConfigUtils.TIKV_GRPC_TIMEOUT)
+ .longType()
+ .noDefaultValue()
+ .withDescription("TiKV GRPC timeout in ms");
- public static final SingleChoiceOption TIKV_GRPC_SCAN_TIMEOUT =
- (SingleChoiceOption)
- Options.key(ConfigUtils.TIKV_GRPC_SCAN_TIMEOUT)
- .longType()
- .noDefaultValue()
- .withDescription("TiKV GRPC scan timeout in ms");
+ public static final Option TIKV_GRPC_SCAN_TIMEOUT =
+ Options.key(ConfigUtils.TIKV_GRPC_SCAN_TIMEOUT)
+ .longType()
+ .noDefaultValue()
+ .withDescription("TiKV GRPC scan timeout in ms");
- public static final SingleChoiceOption TIKV_BATCH_GET_CONCURRENCY =
- (SingleChoiceOption)
- Options.key(ConfigUtils.TIKV_BATCH_GET_CONCURRENCY)
- .intType()
- .noDefaultValue()
- .withDescription("TiKV GRPC batch get concurrency");
+ public static final Option TIKV_BATCH_GET_CONCURRENCY =
+ Options.key(ConfigUtils.TIKV_BATCH_GET_CONCURRENCY)
+ .intType()
+ .noDefaultValue()
+ .withDescription("TiKV GRPC batch get concurrency");
- public static final SingleChoiceOption TIKV_BATCH_SCAN_CONCURRENCY =
- (SingleChoiceOption)
- Options.key(ConfigUtils.TIKV_BATCH_SCAN_CONCURRENCY)
- .intType()
- .noDefaultValue()
- .withDescription("TiKV GRPC batch scan concurrency");
+ public static final Option TIKV_BATCH_SCAN_CONCURRENCY =
+ Options.key(ConfigUtils.TIKV_BATCH_SCAN_CONCURRENCY)
+ .intType()
+ .noDefaultValue()
+ .withDescription("TiKV GRPC batch scan concurrency");
public static TiConfiguration getTiConfiguration(final ReadonlyConfig configuration) {
final String pdAddrsStr = configuration.get(PD_ADDRESSES);
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/dialect/TiDBDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/dialect/TiDBDialect.java
deleted file mode 100644
index 682b61a2340c..000000000000
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/dialect/TiDBDialect.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.dialect;
-
-import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
-import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
-import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceConfig;
-import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.fetch.TiDBFetchTaskContext;
-import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.fetch.TiDBScanFetchTask;
-import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.fetch.TiDBStreamFetchTask;
-import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.splitter.TiDBChunkSplitter;
-import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.utils.TableDiscoveryUtils;
-
-import org.tikv.common.TiSession;
-
-import io.debezium.relational.TableId;
-
-import java.util.List;
-
-import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier.TIDB;
-
-public class TiDBDialect implements DataSourceDialect {
-
- /** Get the name of dialect. */
- @Override
- public String getName() {
- return TIDB;
- }
-
- /**
- * Discovers the list of data collection to capture.
- *
- * @param sourceConfig
- */
- @Override
- public List discoverDataCollections(TiDBSourceConfig sourceConfig) {
- TiSession tiSession = getTiSession(sourceConfig);
- return TableDiscoveryUtils.listTables(tiSession, sourceConfig);
- }
-
- private TiSession getTiSession(TiDBSourceConfig sourceConfig) {
- return TiSession.create(sourceConfig.getTiConfiguration());
- }
-
- /**
- * Check if the CollectionId is case-sensitive or not.
- *
- * @param sourceConfig
- */
- @Override
- public boolean isDataCollectionIdCaseSensitive(TiDBSourceConfig sourceConfig) {
- return true;
- }
-
- /**
- * Returns the {@link ChunkSplitter} which used to split collection to splits.
- *
- * @param sourceConfig
- */
- @Override
- public ChunkSplitter createChunkSplitter(TiDBSourceConfig sourceConfig) {
- return new TiDBChunkSplitter(sourceConfig);
- }
-
- /**
- * The fetch task used to fetch data of a snapshot split or incremental split.
- *
- * @param sourceSplitBase
- */
- @Override
- public FetchTask createFetchTask(SourceSplitBase sourceSplitBase) {
- if (sourceSplitBase.isSnapshotSplit()) {
- return new TiDBScanFetchTask(sourceSplitBase.asSnapshotSplit());
- } else {
- return new TiDBStreamFetchTask(sourceSplitBase.asIncrementalSplit());
- }
- }
-
- /**
- * The task context used for fetch task to fetch data from external systems.
- *
- * @param sourceSplitBase
- * @param sourceConfig
- */
- @Override
- public FetchTask.Context createFetchTaskContext(
- SourceSplitBase sourceSplitBase, TiDBSourceConfig sourceConfig) {
- return new TiDBFetchTaskContext();
- }
-}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/fetch/TiDBFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/fetch/TiDBFetchTaskContext.java
deleted file mode 100644
index dfc31ccf5ee7..000000000000
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/fetch/TiDBFetchTaskContext.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.fetch;
-
-import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
-import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.source.SourceRecord;
-
-import io.debezium.connector.base.ChangeEventQueue;
-import io.debezium.pipeline.DataChangeEvent;
-import io.debezium.relational.TableId;
-import io.debezium.relational.Tables;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-public class TiDBFetchTaskContext implements FetchTask.Context {
-
- @Override
- public void configure(SourceSplitBase sourceSplitBase) {}
-
- @Override
- public ChangeEventQueue getQueue() {
- return null;
- }
-
- @Override
- public TableId getTableId(SourceRecord record) {
- return null;
- }
-
- @Override
- public Tables.TableFilter getTableFilter() {
- return null;
- }
-
- @Override
- public boolean isExactlyOnce() {
- return false;
- }
-
- @Override
- public Offset getStreamOffset(SourceRecord record) {
- return null;
- }
-
- @Override
- public boolean isDataChangeRecord(SourceRecord record) {
- return false;
- }
-
- @Override
- public boolean isRecordBetween(SourceRecord record, Object[] splitStart, Object[] splitEnd) {
- return false;
- }
-
- @Override
- public void rewriteOutputBuffer(
- Map outputBuffer, SourceRecord changeRecord) {}
-
- @Override
- public List formatMessageTimestamp(Collection snapshotRecords) {
- return null;
- }
-
- @Override
- public void close() {}
-}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/fetch/TiDBScanFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/fetch/TiDBScanFetchTask.java
deleted file mode 100644
index fc36100e2c07..000000000000
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/fetch/TiDBScanFetchTask.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.fetch;
-
-import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-
-public class TiDBScanFetchTask implements FetchTask {
-
- private final SnapshotSplit snapshotSplit;
-
- private volatile boolean taskRunning = false;
-
- public TiDBScanFetchTask(SnapshotSplit snapshotSplit) {
- this.snapshotSplit = snapshotSplit;
- }
- /**
- * Execute current task.
- *
- * @param context
- */
- @Override
- public void execute(Context context) throws Exception {
- TiDBFetchTaskContext taskContext = (TiDBFetchTaskContext) context;
- }
-
- /** Returns current task is running or not. */
- @Override
- public boolean isRunning() {
- return taskRunning;
- }
-
- /** Close this task */
- @Override
- public void shutdown() {}
-
- /** Returns the split that the task used. */
- @Override
- public SourceSplitBase getSplit() {
- return null;
- }
-}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/fetch/TiDBStreamFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/fetch/TiDBStreamFetchTask.java
deleted file mode 100644
index 42dbe84f4772..000000000000
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/fetch/TiDBStreamFetchTask.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.fetch;
-
-import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
-import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-
-public class TiDBStreamFetchTask implements FetchTask {
-
- private final IncrementalSplit incrementalSplit;
-
- public TiDBStreamFetchTask(IncrementalSplit incrementalSplit) {
- this.incrementalSplit = incrementalSplit;
- }
- /**
- * Execute current task.
- *
- * @param context
- */
- @Override
- public void execute(Context context) throws Exception {}
-
- /** Returns current task is running or not. */
- @Override
- public boolean isRunning() {
- return false;
- }
-
- /** Close this task */
- @Override
- public void shutdown() {}
-
- /** Returns the split that the task used. */
- @Override
- public SourceSplitBase getSplit() {
- return null;
- }
-}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/reader/RowKeyWithTs.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/reader/RowKeyWithTs.java
index 84dbcff57eda..8e36a62fd93b 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/reader/RowKeyWithTs.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/reader/RowKeyWithTs.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.reader;
import org.tikv.common.key.RowKey;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/splitter/TiDBChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/splitter/TiDBChunkSplitter.java
deleted file mode 100644
index 34e3db76434e..000000000000
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/splitter/TiDBChunkSplitter.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.splitter;
-
-import org.apache.seatunnel.shade.com.google.common.collect.Lists;
-
-import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
-import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceConfig;
-import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.utils.TableKeyRangeUtils;
-
-import org.tikv.common.TiSession;
-import org.tikv.common.meta.TiTableInfo;
-import org.tikv.kvproto.Coprocessor;
-
-import io.debezium.relational.TableId;
-
-import javax.annotation.Nonnull;
-
-import java.util.Collection;
-import java.util.List;
-
-public class TiDBChunkSplitter implements ChunkSplitter {
-
- private final TiDBSourceConfig sourceConfig;
-
- private transient TiSession session;
-
- public TiDBChunkSplitter(TiDBSourceConfig sourceConfig) {
- this.sourceConfig = sourceConfig;
- session = TiSession.create(sourceConfig.getTiConfiguration());
- }
-
- /**
- * Generates all snapshot splits (chunks) for the give data collection.
- *
- * @param tableId
- */
- @Override
- public Collection generateSplits(TableId tableId) {
- TiTableInfo tableInfo =
- session.getCatalog()
- .getTable(sourceConfig.getDatabaseName(), sourceConfig.getTableName());
- List keyRanges =
- TableKeyRangeUtils.getTableKeyRanges(
- tableInfo.getId(), sourceConfig.getParallelism());
- return generateSnapshotSplits(tableId, keyRanges);
- }
-
- private Collection generateSnapshotSplits(
- TableId tableId, List keyRanges) {
- List snapshotSplits = Lists.newArrayList();
- for (Coprocessor.KeyRange keyRange : keyRanges) {
- snapshotSplits.add(
- new SnapshotSplit(
- splitId(tableId, keyRange),
- tableId,
- null,
- new Object[] {keyRange},
- new Object[] {keyRange}));
- }
- return snapshotSplits;
- }
-
- private String splitId(@Nonnull TableId tableId, Coprocessor.KeyRange keyRange) {
- return String.format(
- "%s:%s:%s", tableId.identifier(), keyRange.getStart(), keyRange.getEnd());
- }
-}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/utils/TableDiscoveryUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/utils/TableDiscoveryUtils.java
deleted file mode 100644
index 4e9602566713..000000000000
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/utils/TableDiscoveryUtils.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.utils;
-
-import org.apache.seatunnel.shade.com.google.common.collect.Lists;
-
-import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceConfig;
-
-import org.tikv.common.TiSession;
-import org.tikv.common.meta.TiDBInfo;
-import org.tikv.common.meta.TiTableInfo;
-
-import io.debezium.relational.TableId;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class TableDiscoveryUtils {
-
- public static List listTables(TiSession tiSession, TiDBSourceConfig sourceConfig) {
- List databases =
- tiSession.getCatalog().listDatabases().stream()
- .filter(
- tiDBInfo ->
- tiDBInfo.getName().equals(sourceConfig.getDatabaseName()))
- .collect(Collectors.toList());
- List tableIds = Lists.newArrayList();
- for (TiDBInfo tiDBInfo : databases) {
- for (TiTableInfo tiTableInfo : tiDBInfo.getTables()) {
- if (tiTableInfo.getName().equals(sourceConfig.getTableName())) {
- tableIds.add(new TableId(tiDBInfo.getName(), null, tiTableInfo.getName()));
- }
- }
- }
- return tableIds;
- }
-}
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index a16d86cad5ae..61129cc88be1 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -109,6 +109,7 @@
3.1.3
3.4.1
1.1
+ 3.3.5
@@ -510,6 +511,12 @@
${project.version}
provided
+
+ org.apache.seatunnel
+ connector-cdc-tidb
+ ${project.version}
+ provided
+
org.apache.seatunnel
connector-cdc-postgres
@@ -690,6 +697,11 @@
${snowflake.version}
provided
+
+ org.tikv
+ tikv-client-java
+ ${tidb.version}
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/pom.xml
similarity index 97%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/pom.xml
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/pom.xml
index 5c7def860a1f..171c708a145e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/pom.xml
@@ -22,7 +22,7 @@
${revision}
- connector-tidb-e2e
+ connector-cdc-tidb-e2e
SeaTunnel : E2E : Connector V2 : TiDB
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java
similarity index 95%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java
index ed3bfa807ca0..78979b5b85b9 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java
@@ -19,7 +19,9 @@
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.junit.jupiter.api.AfterAll;
@@ -41,6 +43,10 @@
import static org.awaitility.Awaitility.await;
@Slf4j
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK},
+ disabledReason = "Currently SPARK do not support cdc")
public class TiDBCDCIT extends TiDBTestBase implements TestResource {
private final String SQL_TEMPLATE = "select id,name,description,weight from %s.%s";
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBTestBase.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBTestBase.java
similarity index 100%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBTestBase.java
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBTestBase.java
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/config/pd.toml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/config/pd.toml
similarity index 100%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/config/pd.toml
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/config/pd.toml
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/config/tidb.toml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/config/tidb.toml
similarity index 100%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/config/tidb.toml
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/config/tidb.toml
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/config/tikv.toml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/config/tikv.toml
similarity index 100%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/config/tikv.toml
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/config/tikv.toml
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/ddl/column_all_type.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/ddl/column_all_type.sql
similarity index 100%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/ddl/column_all_type.sql
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/ddl/column_all_type.sql
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/ddl/inventory.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/ddl/inventory.sql
similarity index 100%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/ddl/inventory.sql
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/ddl/inventory.sql
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/tidb/all_types_tidb_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/tidb/all_types_tidb_source_to_sink.conf
similarity index 96%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/tidb/all_types_tidb_source_to_sink.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/tidb/all_types_tidb_source_to_sink.conf
index fc8dfc2b802b..647091b18a9e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/tidb/all_types_tidb_source_to_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/tidb/all_types_tidb_source_to_sink.conf
@@ -29,6 +29,8 @@ source {
result_table_name = "trans_tidb_all_type_cdc"
base-url = "jdbc:mysql://tidb-e2e:4000/column_type_test"
driver = "com.mysql.cj.jdbc.Driver"
+ tikv.grpc.timeout_in_ms = 20000
+ pd-addresses = "pd-e2e:2379"
user = "root"
password = "seatunnel"
database-name = "column_type_test"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/tidb/tidb_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/tidb/tidb_source_to_sink.conf
similarity index 95%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/tidb/tidb_source_to_sink.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/tidb/tidb_source_to_sink.conf
index b3032b8071c5..4f42866ed603 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tidb-e2e/src/test/resources/tidb/tidb_source_to_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/tidb/tidb_source_to_sink.conf
@@ -29,6 +29,8 @@ source {
result_table_name = "customers_tidb_cdc"
base-url = "jdbc:mysql://tidb-e2e:4000/inventory"
driver = "com.mysql.cj.jdbc.Driver"
+ tikv.grpc.timeout_in_ms = 20000
+ pd-addresses = "pd-e2e:2379"
user = "root"
password = "seatunnel"
database-name = "inventory"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index ca333ff648fd..78c8969fb64c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -76,7 +76,7 @@
connector-hudi-e2e
connector-milvus-e2e
connector-activemq-e2e
- connector-tidb-e2e
+ connector-cdc-tidb-e2e