From baa46adc7b9cb641eb8b30cc15f92a4d34b169d1 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Fri, 26 Jul 2024 23:44:31 +0800 Subject: [PATCH] support tidb cdc --- .idea/vcs.xml | 22 +- config/plugin_config | 1 + plugin-mapping.properties | 1 + .../connector-cdc/connector-cdc-tidb/pom.xml | 70 +++ .../seatunnel/cdc/tidb/source/TiDBSource.java | 131 +++++ .../cdc/tidb/source/TiDBSourceFactory.java | 105 ++++ .../tidb/source/config/TiDBSourceConfig.java | 40 ++ .../tidb/source/config/TiDBSourceOptions.java | 101 ++++ .../tidb/source/converter/DataConverter.java | 29 + .../converter/DefaultDataConverter.java | 215 ++++++++ .../AbstractSeaTunnelRowDeserializer.java | 39 ++ ...eaTunnelRowSnapshotRecordDeserializer.java | 55 ++ ...aTunnelRowStreamingRecordDeserializer.java | 85 +++ .../enumerator/TiDBSourceCheckpointState.java | 36 ++ .../enumerator/TiDBSourceSplitEnumerator.java | 209 +++++++ .../cdc/tidb/source/reader/RowKeyWithTs.java | 74 +++ .../tidb/source/reader/TiDBSourceReader.java | 296 ++++++++++ .../tidb/source/split/TiDBSourceSplit.java | 59 ++ .../tidb/source/utils/TableKeyRangeUtils.java | 74 +++ .../src/main/java/org/tikv/cdc/CDCClient.java | 251 +++++++++ .../src/main/java/org/tikv/cdc/CDCEvent.java | 86 +++ .../java/org/tikv/cdc/RegionCDCClient.java | 257 +++++++++ seatunnel-connectors-v2/connector-cdc/pom.xml | 1 + .../connector-jdbc/pom.xml | 13 + seatunnel-dist/pom.xml | 12 + .../connector-cdc-tidb-e2e/pom.xml | 65 +++ .../e2e/connector/tidb/TiDBCDCIT.java | 157 ++++++ .../e2e/connector/tidb/TiDBTestBase.java | 206 +++++++ .../src/test/resources/config/pd.toml | 101 ++++ .../src/test/resources/config/tidb.toml | 233 ++++++++ .../src/test/resources/config/tikv.toml | 513 ++++++++++++++++++ .../test/resources/ddl/column_all_type.sql | 119 ++++ .../src/test/resources/ddl/inventory.sql | 50 ++ .../tidb/all_types_tidb_source_to_sink.conf | 56 ++ .../resources/tidb/tidb_source_to_sink.conf | 56 ++ .../seatunnel-connector-v2-e2e/pom.xml | 1 + 36 files changed, 3800 insertions(+), 19 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/pom.xml create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceConfig.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceOptions.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DataConverter.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DefaultDataConverter.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/AbstractSeaTunnelRowDeserializer.java create mode 100644 
seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/SeaTunnelRowSnapshotRecordDeserializer.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/SeaTunnelRowStreamingRecordDeserializer.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceCheckpointState.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/reader/RowKeyWithTs.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/reader/TiDBSourceReader.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/split/TiDBSourceSplit.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/utils/TableKeyRangeUtils.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/CDCClient.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/CDCEvent.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/RegionCDCClient.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/pom.xml create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBTestBase.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/config/pd.toml create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/config/tidb.toml create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/config/tikv.toml create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/ddl/column_all_type.sql create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/ddl/inventory.sql create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/tidb/all_types_tidb_source_to_sink.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/tidb/tidb_source_to_sink.conf diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 73b278131839..310737b0c14c 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,24 +1,5 @@ - - - - + + + \ No newline at end of file diff --git a/config/plugin_config b/config/plugin_config index f6549168d6da..cb632e73460f 100644 --- a/config/plugin_config +++ b/config/plugin_config @@ -28,6 +28,7 @@ connector-cdc-mongodb connector-cdc-sqlserver connector-cdc-postgres connector-cdc-oracle 
+connector-cdc-tidb connector-clickhouse connector-datahub connector-dingtalk diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 1942f875d7c9..b20a28e03cc9 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -103,6 +103,7 @@ seatunnel.source.Maxcompute = connector-maxcompute seatunnel.sink.Maxcompute = connector-maxcompute seatunnel.source.MySQL-CDC = connector-cdc-mysql seatunnel.source.MongoDB-CDC = connector-cdc-mongodb +seatunnel.source.TiDB-CDC = connector-cdc-tidb seatunnel.sink.S3Redshift = connector-s3-redshift seatunnel.source.Web3j = connector-web3j seatunnel.source.TDengine = connector-tdengine diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/pom.xml b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/pom.xml new file mode 100644 index 000000000000..7f5bdb721150 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/pom.xml @@ -0,0 +1,70 @@ + + + + 4.0.0 + + org.apache.seatunnel + connector-cdc + ${revision} + + + connector-cdc-tidb + SeaTunnel : Connectors V2 : CDC : TIDB + + + + + org.apache.seatunnel + connector-cdc-base + ${project.version} + compile + + + + org.apache.seatunnel + connector-jdbc + ${project.version} + pom + import + + + + + + + + org.apache.seatunnel + connector-cdc-base + + + + org.apache.seatunnel + connector-jdbc + ${project.version} + + + + org.tikv + tikv-client-java + + + + diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java new file mode 100644 index 000000000000..ddfea4ccce79 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSource.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.source.SupportColumnProjection; +import org.apache.seatunnel.api.source.SupportParallelism; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceOptions; +import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.enumerator.TiDBSourceCheckpointState; +import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.enumerator.TiDBSourceSplitEnumerator; +import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.reader.TiDBSourceReader; +import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split.TiDBSourceSplit; + +import java.util.Collections; +import java.util.List; + +public class TiDBSource + implements SeaTunnelSource, + SupportParallelism, + SupportColumnProjection { + + static final String IDENTIFIER = "TiDB-CDC"; + + private TiDBSourceConfig config; + private final CatalogTable catalogTable; + + public TiDBSource(ReadonlyConfig config, CatalogTable catalogTable) { + + this.config = + TiDBSourceConfig.builder() + .startupMode(config.get(TiDBSourceOptions.STARTUP_MODE)) + .databaseName(config.get(TiDBSourceOptions.DATABASE_NAME)) + .tableName(config.get(TiDBSourceOptions.TABLE_NAME)) + .tiConfiguration(TiDBSourceOptions.getTiConfiguration(config)) + .build(); + this.catalogTable = catalogTable; + } + + /** + * Returns a unique identifier among same factory interfaces. + * + *
<p>
For consistency, an identifier should be declared as one lower case word (e.g. {@code
+     * kafka}). If multiple factories exist for different versions, a version should be appended
+     * using "-" (e.g. {@code elasticsearch-7}).
+     */
+    @Override
+    public String getPluginName() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Get the boundedness of this source.
+     *
+     * @return the boundedness of this source.
+     */
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.UNBOUNDED;
+    }
+
+    /**
+     * Create source reader, used to produce data.
+     *
+     * @param context reader context.
+     * @return source reader.
+     * @throws Exception when creating the reader fails.
+     */
+    @Override
+    public SourceReader<SeaTunnelRow, TiDBSourceSplit> createReader(SourceReader.Context context)
+            throws Exception {
+        return new TiDBSourceReader(context, config, catalogTable);
+    }
+
+    /**
+     * Create source split enumerator, used to generate splits. This method will be called only
+     * once when the source starts.
+     *
+     * @param context enumerator context.
+     * @return source split enumerator.
+     * @throws Exception when creating the enumerator fails.
+     */
+    @Override
+    public SourceSplitEnumerator<TiDBSourceSplit, TiDBSourceCheckpointState> createEnumerator(
+            SourceSplitEnumerator.Context<TiDBSourceSplit> context) throws Exception {
+        return new TiDBSourceSplitEnumerator(context, config);
+    }
+
+    /**
+     * Create source split enumerator, used to generate splits. This method will be called when
+     * restoring from a checkpoint.
+     *
+     * @param context enumerator context.
+     * @param checkpointState checkpoint state.
+     * @return source split enumerator.
+     * @throws Exception when creating the enumerator fails.
+     */
+    @Override
+    public SourceSplitEnumerator<TiDBSourceSplit, TiDBSourceCheckpointState> restoreEnumerator(
+            SourceSplitEnumerator.Context<TiDBSourceSplit> context,
+            TiDBSourceCheckpointState checkpointState)
+            throws Exception {
+        return new TiDBSourceSplitEnumerator(context, config, checkpointState);
+    }
+
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java
new file mode 100644
index 000000000000..39d8b03739b9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/TiDBSourceFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
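For orientation, a minimal sketch of wiring the source above up by hand, outside the factory that follows. It assumes ReadonlyConfig.fromMap is available, as elsewhere in SeaTunnel; the option values are placeholders, and the CatalogTable lookup through TiDBCatalog is elided.

import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import java.util.HashMap;
import java.util.Map;

class TiDBSourceWiringSketch {
    static ReadonlyConfig exampleConfig() {
        Map<String, Object> options = new HashMap<>();
        options.put("pd-addresses", "pd0:2379");   // placeholder PD endpoint
        options.put("database-name", "inventory"); // placeholder database
        options.put("table-name", "products");     // placeholder table
        return ReadonlyConfig.fromMap(options);
        // new TiDBSource(config, catalogTable) additionally needs a CatalogTable,
        // normally resolved through TiDBCatalog as TiDBSourceFactory does below.
    }
}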
+ */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.connector.TableSource; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; +import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceOptions; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalog; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalogFactory; + +import com.google.auto.service.AutoService; + +import java.io.Serializable; + +@AutoService(Factory.class) +public class TiDBSourceFactory implements TableSourceFactory { + /** + * Returns a unique identifier among same factory interfaces. + * + *
<p>
For consistency, an identifier should be declared as one lower case word (e.g. {@code + * kafka}). If multiple factories exist for different versions, a version should be appended + * using "-" (e.g. {@code elasticsearch-7}). + */ + @Override + public String factoryIdentifier() { + return TiDBSource.IDENTIFIER; + } + + /** + * Returns the rule for options. + * + *
<p>
1. Used to verify whether the parameters configured by the user conform to the rules of + * the options; + * + *
<p>
2. Used by the Web UI to prompt the user to configure option values;
+     */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(
+                        TiDBSourceOptions.DATABASE_NAME,
+                        TiDBSourceOptions.TABLE_NAME,
+                        TiDBSourceOptions.PD_ADDRESSES)
+                .optional(
+                        TiDBSourceOptions.TIKV_BATCH_GET_CONCURRENCY,
+                        TiDBSourceOptions.TIKV_BATCH_SCAN_CONCURRENCY,
+                        TiDBSourceOptions.TIKV_GRPC_SCAN_TIMEOUT,
+                        TiDBSourceOptions.TIKV_GRPC_TIMEOUT,
+                        TiDBSourceOptions.STARTUP_MODE)
+                .build();
+    }
+
+    /**
+     * TODO: Implement SupportParallelism in the TableSourceFactory instead of the
+     * SeaTunnelSource, then deprecate this method.
+     */
+    @Override
+    public Class<? extends SeaTunnelSource> getSourceClass() {
+        return TiDBSource.class;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+        return () -> {
+            ReadonlyConfig config = context.getOptions();
+            TiDBCatalogFactory catalogFactory = new TiDBCatalogFactory();
+            // Build the TiDB catalog.
+            TiDBCatalog catalog =
+                    (TiDBCatalog) catalogFactory.createCatalog(factoryIdentifier(), config);
+
+            TablePath tablePath =
+                    TablePath.of(
+                            config.get(TiDBSourceOptions.DATABASE_NAME),
+                            config.get(TiDBSourceOptions.TABLE_NAME));
+            CatalogTable catalogTable = catalog.getTable(tablePath);
+            return (SeaTunnelSource<T, SplitT, StateT>)
+                    new TiDBSource(context.getOptions(), catalogTable);
+        };
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceConfig.java
new file mode 100644
index 000000000000..faa174ae2ca1
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
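To make the rule above concrete, a hedged sketch of validating a user config against it; ConfigValidator is assumed to be the usual SeaTunnel entry point for applying an OptionRule, so a missing required option such as pd-addresses would surface here.

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceOptions;

class OptionRuleSketch {
    static void validate(ReadonlyConfig config) {
        OptionRule rule =
                OptionRule.builder()
                        .required(
                                TiDBSourceOptions.DATABASE_NAME,
                                TiDBSourceOptions.TABLE_NAME,
                                TiDBSourceOptions.PD_ADDRESSES)
                        .build();
        ConfigValidator.of(config).validate(rule); // throws on missing/invalid options
    }
}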
+ */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config; + +import org.apache.seatunnel.connectors.cdc.base.option.StartupMode; + +import org.tikv.common.TiConfiguration; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +@EqualsAndHashCode +public class TiDBSourceConfig { + private String databaseName; + private String tableName; + private StartupMode startupMode; + private TiConfiguration tiConfiguration; +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceOptions.java new file mode 100644 index 000000000000..0313c794f10e --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/config/TiDBSourceOptions.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
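Since TiDBSourceConfig above is Lombok-generated, a short sketch of assembling it directly through the builder, exactly as TiDBSource does; the PD address and names are placeholders.

import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceConfig;

import org.tikv.common.TiConfiguration;

class SourceConfigSketch {
    static TiDBSourceConfig example() {
        return TiDBSourceConfig.builder()
                .databaseName("inventory") // placeholder
                .tableName("products")     // placeholder
                .startupMode(StartupMode.INITIAL)
                .tiConfiguration(TiConfiguration.createDefault("pd0:2379"))
                .build();
    }
}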
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+
+import org.tikv.common.ConfigUtils;
+import org.tikv.common.TiConfiguration;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/** TiDB source options. */
+public class TiDBSourceOptions implements Serializable {
+
+    public static final Option<String> DATABASE_NAME =
+            Options.key("database-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Database name of the TiDB server to monitor.");
+
+    public static final Option<String> TABLE_NAME =
+            Options.key("table-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Table name of the database to monitor.");
+
+    public static final Option<StartupMode> STARTUP_MODE =
+            Options.key(SourceOptions.STARTUP_MODE_KEY)
+                    .singleChoice(
+                            StartupMode.class,
+                            Arrays.asList(StartupMode.INITIAL, StartupMode.SPECIFIC))
+                    .defaultValue(StartupMode.INITIAL)
+                    .withDescription(
+                            "Optional startup mode for the CDC source; valid enumerations are "
+                                    + "\"initial\" or \"specific\".");
+
+    public static final Option<String> PD_ADDRESSES =
+            Options.key("pd-addresses")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("TiKV cluster's PD address");
+
+    public static final Option<Long> TIKV_GRPC_TIMEOUT =
+            Options.key(ConfigUtils.TIKV_GRPC_TIMEOUT)
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription("TiKV GRPC timeout in ms");
+
+    public static final Option<Long> TIKV_GRPC_SCAN_TIMEOUT =
+            Options.key(ConfigUtils.TIKV_GRPC_SCAN_TIMEOUT)
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription("TiKV GRPC scan timeout in ms");
+
+    public static final Option<Integer> TIKV_BATCH_GET_CONCURRENCY =
+            Options.key(ConfigUtils.TIKV_BATCH_GET_CONCURRENCY)
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("TiKV GRPC batch get concurrency");
+
+    public static final Option<Integer> TIKV_BATCH_SCAN_CONCURRENCY =
+            Options.key(ConfigUtils.TIKV_BATCH_SCAN_CONCURRENCY)
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("TiKV GRPC batch scan concurrency");
+
+    public static TiConfiguration getTiConfiguration(final ReadonlyConfig configuration) {
+        final String pdAddrsStr = configuration.get(PD_ADDRESSES);
+        final TiConfiguration tiConf = TiConfiguration.createDefault(pdAddrsStr);
+        configuration.getOptional(TIKV_GRPC_TIMEOUT).ifPresent(tiConf::setTimeout);
+        configuration.getOptional(TIKV_GRPC_SCAN_TIMEOUT).ifPresent(tiConf::setScanTimeout);
+        configuration
+                .getOptional(TIKV_BATCH_GET_CONCURRENCY)
+                .ifPresent(tiConf::setBatchGetConcurrency);
+
+        configuration
+                .getOptional(TIKV_BATCH_SCAN_CONCURRENCY)
+                .ifPresent(tiConf::setBatchScanConcurrency);
+        return tiConf;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DataConverter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DataConverter.java
new file mode 100644
index 000000000000..1f1208917c6f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DataConverter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.converter; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import org.tikv.common.meta.TiTableInfo; + +public interface DataConverter { + + SeaTunnelRow convert(T object, TiTableInfo tableInfo, SeaTunnelRowType rowType) + throws Exception; +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DefaultDataConverter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DefaultDataConverter.java new file mode 100644 index 000000000000..733bb50a280b --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/converter/DefaultDataConverter.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
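For orientation, the getTiConfiguration plumbing above reduces to the following direct TiKV client setup; the endpoint and tuning values are illustrative only, and the setters are the same ones the option bindings use.

import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;

class TiConfigSketch {
    static void open() throws Exception {
        TiConfiguration conf = TiConfiguration.createDefault("pd0:2379"); // placeholder PD
        conf.setTimeout(60_000L);     // tikv.grpc.timeout_in_ms
        conf.setScanTimeout(20_000L); // tikv.grpc.scan_timeout_in_ms
        conf.setBatchGetConcurrency(20);
        conf.setBatchScanConcurrency(1);
        TiSession session = TiSession.create(conf);
        try {
            // the session backs both snapshot scans and CDC streaming
        } finally {
            session.close();
        }
    }
}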
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.converter;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+
+import org.tikv.common.meta.TiTableInfo;
+import org.tikv.common.types.DataType;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DefaultDataConverter implements DataConverter<Object> {
+
+    @Override
+    public SeaTunnelRow convert(Object value, TiTableInfo tableInfo, SeaTunnelRowType rowType)
+            throws Exception {
+        Object[] fields = new Object[rowType.getTotalFields()];
+        Map<String, DataType> fromTypes = new HashMap<>();
+        tableInfo
+                .getColumns()
+                .forEach(
+                        tiColumnInfo -> {
+                            fromTypes.put(tiColumnInfo.getName(), tiColumnInfo.getType());
+                        });
+        for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) {
+            SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex);
+            String fieldName = rowType.getFieldName(fieldIndex);
+            // decodeObjects yields one value per table column; pick this field's value by
+            // position (assumed to line up with the physical row type).
+            Object field = ((Object[]) value)[fieldIndex];
+            if (field == null) {
+                fields[fieldIndex] = null;
+                continue;
+            }
+            switch (seaTunnelDataType.getSqlType()) {
+                case NULL:
+                    fields[fieldIndex] = null;
+                    break;
+                case STRING:
+                    fields[fieldIndex] = convertToString(field);
+                    break;
+                case BOOLEAN:
+                    fields[fieldIndex] = convertToBoolean(field);
+                    break;
+                case TINYINT:
+                    fields[fieldIndex] = Byte.parseByte(field.toString());
+                    break;
+                case SMALLINT:
+                    fields[fieldIndex] = Short.parseShort(field.toString());
+                    break;
+                case INT:
+                    fields[fieldIndex] = convertToInt(field, fromTypes.get(fieldName));
+                    break;
+                case BIGINT:
+                    fields[fieldIndex] = convertToLong(field);
+                    break;
+                case FLOAT:
+                    fields[fieldIndex] = convertToFloat(field);
+                    break;
+                case DOUBLE:
+                    fields[fieldIndex] = convertToDouble(field);
+                    break;
+                case DECIMAL:
+                    fields[fieldIndex] = convertToDecimal(field);
+                    break;
+                case DATE:
+                    fields[fieldIndex] = LocalDate.class.cast(field);
+                    break;
+                case TIME:
+                    fields[fieldIndex] = LocalTime.class.cast(field);
+                    break;
+                case TIMESTAMP:
+                    fields[fieldIndex] = LocalDateTime.class.cast(field);
+                    break;
+                case BYTES:
+                    fields[fieldIndex] = convertToBinary(field);
+                    break;
+                case ARRAY:
+                    fields[fieldIndex] = convertToArray(field);
+                    break;
+                case MAP:
+                case ROW:
+                default:
+                    throw CommonError.unsupportedDataType(
+                            "SeaTunnel", seaTunnelDataType.getSqlType().toString(), fieldName);
+            }
+        }
+        return new SeaTunnelRow(fields);
+    }
+
+    private static Object convertToBoolean(Object object) {
+        if (object instanceof Boolean) {
+            return object;
+        } else if (object instanceof Long) {
+            return (Long) object == 1;
+        } else if (object instanceof Byte) {
+            return (byte) object == 1;
+        } else if (object instanceof Short) {
+            return (short) object == 1;
+        } else {
+            return Boolean.parseBoolean(object.toString());
+        }
+    }
+
+    private static Object convertToInt(Object value, DataType dataType) {
+        if (value instanceof Integer) {
+            return value;
+        } else if (value instanceof Long) {
+            return dataType.isUnsigned()
+                    ? Integer.valueOf(Short.toUnsignedInt(((Long) value).shortValue()))
+                    : ((Long) value).intValue();
+        } else {
+            return Integer.parseInt(value.toString());
+        }
+    }
+
+    private static Object convertToLong(Object value) {
+        if (value instanceof Integer) {
+            return ((Integer) value).longValue();
+        } else if (value instanceof Long) {
+            return value;
+        } else {
+            return Long.parseLong(value.toString());
+        }
+    }
+
+    private static Object convertToDouble(Object value) {
+        if (value instanceof Float) {
+            return ((Float) value).doubleValue();
+        } else if (value instanceof Double) {
+            return value;
+        } else {
+            return Double.parseDouble(value.toString());
+        }
+    }
+
+    private static Object convertToFloat(Object value) {
+        if (value instanceof Float) {
+            return value;
+        } else if (value instanceof Double) {
+            return ((Double) value).floatValue();
+        } else {
+            return Float.parseFloat(value.toString());
+        }
+    }
+
+    private static Object convertToDecimal(Object value) {
+        BigDecimal result;
+        if (value instanceof String) {
+            result = new BigDecimal((String) value);
+        } else if (value instanceof Long) {
+            // A Long cannot be cast to String; go through BigDecimal.valueOf instead.
+            result = BigDecimal.valueOf((Long) value);
+        } else if (value instanceof Double) {
+            result = BigDecimal.valueOf((Double) value);
+        } else if (value instanceof BigDecimal) {
+            result = (BigDecimal) value;
+        } else {
+            throw new IllegalArgumentException(
+                    "Unable to convert to decimal from unexpected value '"
+                            + value
+                            + "' of type "
+                            + value.getClass());
+        }
+        return result;
+    }
+
+    public Object[] convertToArray(Object value) throws SQLException {
+        // String#split never returns null, so no null check is needed here.
+        return ((String) value).split(",");
+    }
+
+    private static Object convertToBinary(Object value) {
+        if (value instanceof byte[]) {
+            return value;
+        } else if (value instanceof String) {
+            return ((String) value).getBytes();
+        } else if (value instanceof ByteBuffer) {
+            ByteBuffer byteBuffer = (ByteBuffer) value;
+            byte[] bytes = new byte[byteBuffer.remaining()];
+            byteBuffer.get(bytes);
+            return bytes;
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported BYTES value type: " + value.getClass().getSimpleName());
+        }
+    }
+
+    private static Object convertToString(Object value) {
+        if (value instanceof byte[]) {
+            return new String((byte[]) value);
+        }
+        return value;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/AbstractSeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/AbstractSeaTunnelRowDeserializer.java
new file mode 100644
index 000000000000..77654239d1e5
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/AbstractSeaTunnelRowDeserializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
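A small illustration of the coercion rules above; the helpers are private, so this only restates two of the mappings: TiKV decodes MySQL integer-like columns as Long, and DECIMAL values arriving as Long must go through BigDecimal.valueOf rather than a String cast.

import java.math.BigDecimal;

class ConverterSemanticsSketch {
    public static void main(String[] args) {
        Object decoded = 1L;                       // BOOLEAN columns arrive as Long
        boolean flag = (Long) decoded == 1;        // mirrors convertToBoolean's Long branch
        BigDecimal dec = BigDecimal.valueOf(123L); // mirrors convertToDecimal's Long branch
        System.out.println(flag + " " + dec);
    }
}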
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import org.tikv.common.meta.TiTableInfo; + +public abstract class AbstractSeaTunnelRowDeserializer { + protected final TiTableInfo tableInfo; + protected final SeaTunnelRowType rowType; + protected final CatalogTable catalogTable; + + protected AbstractSeaTunnelRowDeserializer(TiTableInfo tableInfo, CatalogTable catalogTable) { + this.tableInfo = tableInfo; + this.rowType = catalogTable.getTableSchema().toPhysicalRowDataType(); + this.catalogTable = catalogTable; + } + + abstract void deserialize(Input record, Collector output) throws Exception; +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/SeaTunnelRowSnapshotRecordDeserializer.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/SeaTunnelRowSnapshotRecordDeserializer.java new file mode 100644 index 000000000000..06616b6c4ff6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/SeaTunnelRowSnapshotRecordDeserializer.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
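The base class above fixes the table metadata and the target row type for both concrete deserializers. A hypothetical subclass, assumed to sit in the same package since deserialize is package-private, that simply forwards pre-built rows while counting them:

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import org.tikv.common.meta.TiTableInfo;

class CountingRowDeserializer extends AbstractSeaTunnelRowDeserializer<SeaTunnelRow> {
    private long seen; // simple record counter

    CountingRowDeserializer(TiTableInfo tableInfo, CatalogTable catalogTable) {
        super(tableInfo, catalogTable);
    }

    @Override
    void deserialize(SeaTunnelRow record, Collector<SeaTunnelRow> output) {
        seen++;
        output.collect(record);
    }
}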
+ */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.converter.DataConverter; +import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.converter.DefaultDataConverter; + +import org.tikv.common.key.RowKey; +import org.tikv.common.meta.TiTableInfo; +import org.tikv.kvproto.Kvrpcpb; + +import static org.tikv.common.codec.TableCodec.decodeObjects; + +/** Deserialize snapshot data */ +public class SeaTunnelRowSnapshotRecordDeserializer + extends AbstractSeaTunnelRowDeserializer { + + private final DataConverter converter; + + public SeaTunnelRowSnapshotRecordDeserializer( + TiTableInfo tableInfo, CatalogTable catalogTable) { + super(tableInfo, catalogTable); + this.converter = new DefaultDataConverter(); + } + + @Override + public void deserialize(Kvrpcpb.KvPair record, Collector output) + throws Exception { + Object[] values = + decodeObjects( + record.getValue().toByteArray(), + RowKey.decode(record.getKey().toByteArray()).getHandle(), + tableInfo); + SeaTunnelRow row = converter.convert(values, tableInfo, rowType); + output.collect(row); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/SeaTunnelRowStreamingRecordDeserializer.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/SeaTunnelRowStreamingRecordDeserializer.java new file mode 100644 index 000000000000..5e286e591e27 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/deserializer/SeaTunnelRowStreamingRecordDeserializer.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
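Isolating the decode path used by the snapshot deserializer above: both calls are the TiKV client APIs already imported in this patch, and the byte arrays would come from a scanned KvPair.

import org.tikv.common.key.RowKey;
import org.tikv.common.meta.TiTableInfo;

import static org.tikv.common.codec.TableCodec.decodeObjects;

class SnapshotDecodeSketch {
    static Object[] decode(byte[] key, byte[] value, TiTableInfo tableInfo) {
        long handle = RowKey.decode(key).getHandle(); // row handle embedded in the key
        return decodeObjects(value, handle, tableInfo); // one Object per table column
    }
}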
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.converter.DataConverter;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.converter.DefaultDataConverter;
+
+import org.tikv.common.key.RowKey;
+import org.tikv.common.meta.TiTableInfo;
+import org.tikv.kvproto.Cdcpb;
+
+import static org.tikv.common.codec.TableCodec.decodeObjects;
+
+public class SeaTunnelRowStreamingRecordDeserializer
+        extends AbstractSeaTunnelRowDeserializer<Cdcpb.Event.Row> {
+
+    private final DataConverter<Object> converter;
+
+    public SeaTunnelRowStreamingRecordDeserializer(
+            TiTableInfo tableInfo, CatalogTable catalogTable) {
+        super(tableInfo, catalogTable);
+        converter = new DefaultDataConverter();
+    }
+
+    @Override
+    public void deserialize(Cdcpb.Event.Row row, Collector<SeaTunnelRow> output) throws Exception {
+
+        final RowKey rowKey = RowKey.decode(row.getKey().toByteArray());
+        final long handle = rowKey.getHandle();
+        Object[] values;
+        switch (row.getOpType()) {
+            case DELETE:
+                values = decodeObjects(row.getOldValue().toByteArray(), handle, tableInfo);
+                SeaTunnelRow record = converter.convert(values, tableInfo, rowType);
+                record.setRowKind(RowKind.DELETE);
+                output.collect(record);
+                break;
+            case PUT:
+                try {
+                    // The handle was already decoded from the key above; reuse it.
+                    values = decodeObjects(row.getValue().toByteArray(), handle, tableInfo);
+                    if (row.getOldValue() == null || row.getOldValue().isEmpty()) {
+                        SeaTunnelRow insert = converter.convert(values, tableInfo, rowType);
+                        insert.setRowKind(RowKind.INSERT);
+                        output.collect(insert);
+                    } else {
+                        SeaTunnelRow update = converter.convert(values, tableInfo, rowType);
+                        update.setRowKind(RowKind.UPDATE_AFTER);
+                        output.collect(update);
+                    }
+                    break;
+                } catch (final RuntimeException e) {
+                    throw new RuntimeException(
+                            String.format(
+                                    "Failed to deserialize row: %s, table: %s",
+                                    row, tableInfo.getId()),
+                            e);
+                }
+            default:
+                throw new IllegalArgumentException("Unknown Row Op Type: " + row.getOpType());
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceCheckpointState.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceCheckpointState.java
new file mode 100644
index 000000000000..94b1b3a41624
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceCheckpointState.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
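Condensing the op-type decision above: a delete maps to DELETE, while a PUT is an insert exactly when no old value accompanies it, and otherwise the after-image of an update.

import org.apache.seatunnel.api.table.type.RowKind;

import org.tikv.kvproto.Cdcpb;

class RowKindSketch {
    static RowKind classify(Cdcpb.Event.Row row) {
        switch (row.getOpType()) {
            case DELETE:
                return RowKind.DELETE;
            case PUT:
                return row.getOldValue() == null || row.getOldValue().isEmpty()
                        ? RowKind.INSERT
                        : RowKind.UPDATE_AFTER;
            default:
                throw new IllegalArgumentException("Unknown Row Op Type: " + row.getOpType());
        }
    }
}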
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.enumerator; + +import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split.TiDBSourceSplit; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +import java.io.Serializable; +import java.util.Map; + +@Getter +@Setter +@AllArgsConstructor +@ToString +public class TiDBSourceCheckpointState implements Serializable { + private Map assignedSplit; +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java new file mode 100644 index 000000000000..bc6c1b7602d9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.enumerator;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split.TiDBSourceSplit;
+import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.utils.TableKeyRangeUtils;
+
+import org.tikv.common.TiSession;
+import org.tikv.kvproto.Coprocessor;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Slf4j
+public class TiDBSourceSplitEnumerator
+        implements SourceSplitEnumerator<TiDBSourceSplit, TiDBSourceCheckpointState> {
+
+    private final TiDBSourceConfig sourceConfig;
+    private final Map<Integer, TiDBSourceSplit> assignedSplit;
+    private final Map<Integer, TiDBSourceSplit> pendingSplit;
+    private final Context<TiDBSourceSplit> context;
+    private TiSession tiSession;
+    private long tableId;
+
+    public TiDBSourceSplitEnumerator(
+            @NonNull Context<TiDBSourceSplit> context, @NonNull TiDBSourceConfig sourceConfig) {
+        this(context, sourceConfig, null);
+    }
+
+    public TiDBSourceSplitEnumerator(
+            @NonNull Context<TiDBSourceSplit> context,
+            @NonNull TiDBSourceConfig sourceConfig,
+            TiDBSourceCheckpointState restoreState) {
+        this.context = context;
+        this.sourceConfig = sourceConfig;
+        this.assignedSplit = new HashMap<>();
+        this.pendingSplit = new HashMap<>();
+        if (restoreState != null) {
+            this.assignedSplit.putAll(restoreState.getAssignedSplit());
+        }
+    }
+
+    @Override
+    public void open() {
+        this.tiSession = TiSession.create(sourceConfig.getTiConfiguration());
+        this.tableId =
+                this.tiSession
+                        .getCatalog()
+                        .getTable(sourceConfig.getDatabaseName(), sourceConfig.getTableName())
+                        .getId();
+    }
+
+    /** The method is executed by the engine only once. */
+    @Override
+    public void run() throws Exception {
+        Set<Integer> readers = context.registeredReaders();
+        addPendingSplit(getTiDBSourceSplit());
+        fetchAssignedSplit();
+        assignSplit(readers);
+        log.debug(
+                "No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers);
+        readers.forEach(context::signalNoMoreSplits);
+    }
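Before the assignment helpers that follow, a quick illustration of how splits are spread over readers: the owner is the split ID's sign-masked hash modulo the reader count, so the same split always lands on the same reader. The split IDs here are invented.

class SplitOwnerSketch {
    static int getSplitOwner(String splitId, int numReaders) {
        // Mask the sign bit so negative hash codes still land in [0, numReaders).
        return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders;
    }

    public static void main(String[] args) {
        for (String id : new String[] {"split-0", "split-1", "split-2"}) {
            System.out.println(id + " -> reader " + getSplitOwner(id, 2));
        }
    }
}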
+
+    private void fetchAssignedSplit() {
+        for (Map.Entry<Integer, TiDBSourceSplit> split : pendingSplit.entrySet()) {
+            if (assignedSplit.containsKey(split.getKey())) {
+                // Override the freshly generated split with the one restored from the
+                // checkpoint so that its progress is kept.
+                pendingSplit.put(split.getKey(), assignedSplit.get(split.getKey()));
+            }
+        }
+    }
+
+    private synchronized void addPendingSplit(List<TiDBSourceSplit> splits) {
+        splits.forEach(
+                split -> {
+                    pendingSplit.put(
+                            getSplitOwner(split.splitId(), context.currentParallelism()), split);
+                });
+    }
+
+    private void assignSplit(Collection<Integer> readers) {
+        for (Integer reader : readers) {
+            final TiDBSourceSplit assignmentForReader = pendingSplit.remove(reader);
+            if (assignmentForReader != null) {
+                log.debug("Assign splits {} to reader {}", assignmentForReader, reader);
+                context.assignSplit(reader, assignmentForReader);
+            }
+        }
+    }
+
+    private static int getSplitOwner(String splitId, int numReaders) {
+        return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    private List<TiDBSourceSplit> getTiDBSourceSplit() {
+        List<TiDBSourceSplit> sourceSplits = Lists.newArrayList();
+        List<Coprocessor.KeyRange> keyRanges =
+                TableKeyRangeUtils.getTableKeyRanges(this.tableId, context.currentParallelism());
+        for (Coprocessor.KeyRange keyRange : keyRanges) {
+            sourceSplits.add(
+                    new TiDBSourceSplit(
+                            sourceConfig.getDatabaseName(),
+                            sourceConfig.getTableName(),
+                            keyRange,
+                            sourceConfig.getStartupMode() == StartupMode.INITIAL ? -1 : 0,
+                            keyRange.getStart(),
+                            false));
+        }
+        return sourceSplits;
+    }
+
+    /**
+     * Called to close the enumerator, in case it holds on to any resources, like threads or
+     * network connections.
+     */
+    @Override
+    public void close() throws IOException {
+        if (this.tiSession != null) {
+            try {
+                this.tiSession.close();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Add a split back to the split enumerator. It will only happen when a {@link SourceReader}
+     * fails and there are splits assigned to it after the last successful checkpoint.
+     *
+     * @param splits The split to add back to the enumerator for reassignment.
+     * @param subtaskId The id of the subtask to which the returned splits belong.
+     */
+    @Override
+    public void addSplitsBack(List<TiDBSourceSplit> splits, int subtaskId) {
+        log.debug("Add back splits {} to TiDBSourceSplitEnumerator.", splits);
+        if (!splits.isEmpty()) {
+            addPendingSplit(splits);
+            if (context.registeredReaders().contains(subtaskId)) {
+                assignSplit(Collections.singletonList(subtaskId));
+            } else {
+                log.warn(
+                        "Reader {} is not registered. Pending splits {} are not assigned.",
+                        subtaskId,
+                        splits);
+            }
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplit.size();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {}
+
+    @Override
+    public void registerReader(int subtaskId) {
+        log.debug("Register reader {} to TiDBSourceSplitEnumerator.", subtaskId);
+        if (!pendingSplit.isEmpty()) {
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    /**
+     * If the source is bounded, checkpoint is not triggered.
+ * + * @param checkpointId + */ + @Override + public TiDBSourceCheckpointState snapshotState(long checkpointId) throws Exception { + return new TiDBSourceCheckpointState(pendingSplit); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + // do nothing + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/reader/RowKeyWithTs.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/reader/RowKeyWithTs.java new file mode 100644 index 000000000000..8e36a62fd93b --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/reader/RowKeyWithTs.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.reader; + +import org.tikv.common.key.RowKey; +import org.tikv.kvproto.Cdcpb; + +import lombok.Data; + +import java.util.Objects; + +@Data +public class RowKeyWithTs implements Comparable { + private final long timestamp; + private final RowKey rowKey; + + private RowKeyWithTs(final long timestamp, final RowKey rowKey) { + this.timestamp = timestamp; + this.rowKey = rowKey; + } + + private RowKeyWithTs(final long timestamp, final byte[] key) { + this(timestamp, RowKey.decode(key)); + } + + @Override + public int compareTo(final RowKeyWithTs that) { + int res = Long.compare(this.timestamp, that.timestamp); + if (res == 0) { + res = Long.compare(this.rowKey.getTableId(), that.rowKey.getTableId()); + } + if (res == 0) { + res = Long.compare(this.rowKey.getHandle(), that.rowKey.getHandle()); + } + return res; + } + + @Override + public int hashCode() { + return Objects.hash(this.timestamp, this.rowKey.getTableId(), this.rowKey.getHandle()); + } + + @Override + public boolean equals(final Object thatObj) { + if (thatObj instanceof RowKeyWithTs) { + final RowKeyWithTs that = (RowKeyWithTs) thatObj; + return this.timestamp == that.timestamp && this.rowKey.equals(that.rowKey); + } + return false; + } + + static RowKeyWithTs ofStart(final Cdcpb.Event.Row row) { + return new RowKeyWithTs(row.getStartTs(), row.getKey().toByteArray()); + } + + static RowKeyWithTs ofCommit(final Cdcpb.Event.Row row) { + return new RowKeyWithTs(row.getCommitTs(), row.getKey().toByteArray()); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/reader/TiDBSourceReader.java 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/reader/TiDBSourceReader.java new file mode 100644 index 000000000000..c2902cef8185 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/reader/TiDBSourceReader.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.reader; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.cdc.base.option.StartupMode; +import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer.SeaTunnelRowSnapshotRecordDeserializer; +import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer.SeaTunnelRowStreamingRecordDeserializer; +import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split.TiDBSourceSplit; +import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.utils.TableKeyRangeUtils; + +import org.tikv.cdc.CDCClient; +import org.tikv.common.TiSession; +import org.tikv.common.key.RowKey; +import org.tikv.common.meta.TiTableInfo; +import org.tikv.kvproto.Cdcpb; +import org.tikv.kvproto.Coprocessor; +import org.tikv.kvproto.Kvrpcpb; +import org.tikv.shade.com.google.protobuf.ByteString; +import org.tikv.txn.KVClient; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +@Slf4j +public class TiDBSourceReader implements SourceReader { + + private static final long STREAMING_VERSION_START_EPOCH = 0L; + private final SourceReader.Context context; + private final TiDBSourceConfig config; + private final Deque sourceSplits; + private final Set snapshotCompleted; + + private final Map cacheCDCClient; + private final Map resolvedTsStates; + private final Map snapshotOffsets; + + private SeaTunnelRowSnapshotRecordDeserializer snapshotRecordDeserializer; + private SeaTunnelRowStreamingRecordDeserializer streamingRecordDeserializer; + + /** Task local variables. 
*/
+    private transient TiSession session = null;
+
+    private transient TreeMap<RowKeyWithTs, Cdcpb.Event.Row> prewrites = null;
+    private transient TreeMap<RowKeyWithTs, Cdcpb.Event.Row> commits = null;
+    private transient BlockingQueue<Cdcpb.Event.Row> committedEvents = null;
+
+    private CatalogTable catalogTable;
+
+    public TiDBSourceReader(Context context, TiDBSourceConfig config, CatalogTable catalogTable) {
+        this.context = context;
+        this.config = config;
+        this.sourceSplits = new LinkedList<>();
+        this.snapshotCompleted = new HashSet<>();
+
+        this.cacheCDCClient = new HashMap<>();
+        this.resolvedTsStates = new HashMap<>();
+        this.snapshotOffsets = new HashMap<>();
+
+        this.prewrites = new TreeMap<>();
+        this.commits = new TreeMap<>();
+        // CDC events would be lost if the pull blocked while a region splits, so a queue
+        // decouples reading from writing and keeps the pull non-blocking.
+        // Since the JDBC sink is slow, a capacity of roughly 50 million events should be a
+        // safe size; an unbounded queue is used here.
+        this.committedEvents = new LinkedBlockingQueue<>();
+
+        this.catalogTable = catalogTable;
+    }
+
+    /** Open the source reader. */
+    @Override
+    public void open() throws Exception {
+        this.session = TiSession.create(config.getTiConfiguration());
+        TiTableInfo tableInfo =
+                session.getCatalog().getTable(config.getDatabaseName(), config.getTableName());
+        this.snapshotRecordDeserializer =
+                new SeaTunnelRowSnapshotRecordDeserializer(tableInfo, catalogTable);
+        this.streamingRecordDeserializer =
+                new SeaTunnelRowStreamingRecordDeserializer(tableInfo, catalogTable);
+    }
+
+    /**
+     * Called to close the reader, in case it holds on to any resources, like threads or network
+     * connections.
+     */
+    @Override
+    public void close() throws IOException {
+        if (this.session != null) {
+            try {
+                this.session.close();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Generate the next batch of records.
+     *
+     * <p>In {@link StartupMode#INITIAL} mode, every pending split is snapshot-read to completion
+     * before streaming events are emitted.
+     *
+     * @param output output collector.
+     * @throws Exception if error occurs.
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        if (config.getStartupMode() == StartupMode.INITIAL) {
+            for (TiDBSourceSplit sourceSplit : sourceSplits) {
+                if (!sourceSplit.isSnapshotCompleted()) {
+                    snapshotEvents(sourceSplit, output);
+                }
+                // mark the snapshot phase of this split as completed
+                snapshotCompleted.add(sourceSplit);
+            }
+        }
+        for (TiDBSourceSplit sourceSplit : sourceSplits) {
+            captureStreamingEvents(sourceSplit, output);
+        }
+    }
+
+    protected void snapshotEvents(TiDBSourceSplit split, Collector<SeaTunnelRow> output)
+            throws Exception {
+        log.info("read snapshot events");
+        Coprocessor.KeyRange keyRange = split.getKeyRange();
+        long resolvedTs = split.getResolvedTs();
+        try (KVClient scanClient = session.createKVClient()) {
+            long startTs = session.getTimestamp().getVersion();
+            ByteString start = split.getSnapshotStart();
+            while (true) {
+                final List<Kvrpcpb.KvPair> segment =
+                        scanClient.scan(start, keyRange.getEnd(), startTs);
+                if (segment.isEmpty()) {
+                    resolvedTs = startTs;
+                    resolvedTsStates.put(split, resolvedTs);
+                    break;
+                }
+                for (final Kvrpcpb.KvPair pair : segment) {
+                    if (TableKeyRangeUtils.isRecordKey(pair.getKey().toByteArray())) {
+                        snapshotRecordDeserializer.deserialize(pair, output);
+                    }
+                }
+                start =
+                        RowKey.toRawKey(segment.get(segment.size() - 1).getKey())
+                                .next()
+                                .toByteString();
+                snapshotOffsets.put(split, start);
+            }
+        }
+    }
+
+    protected void captureStreamingEvents(TiDBSourceSplit split, Collector<SeaTunnelRow> output)
+            throws Exception {
+        long resolvedTs = resolvedTsStates.get(split);
+        if (resolvedTs >= STREAMING_VERSION_START_EPOCH) {
+            log.info("Capture streaming event from resolvedTs:{}", resolvedTs);
+            long finalResolvedTs = resolvedTs;
+            CDCClient cdcClient =
+                    cacheCDCClient.computeIfAbsent(
+                            split,
+                            k -> {
+                                CDCClient client = new CDCClient(session, k.getKeyRange());
+                                client.start(finalResolvedTs);
+                                return client;
+                            });
+            final Cdcpb.Event.Row row = cdcClient.get();
+            if (row == null) {
+                return;
+            }
+            handleRow(row);
+            resolvedTs = cdcClient.getMaxResolvedTs();
+            if (!commits.isEmpty()) {
+                flushRows(resolvedTs);
+            }
+        }
+        // output the buffered committed rows
+        while (!committedEvents.isEmpty()) {
+            Cdcpb.Event.Row row = committedEvents.take();
+            this.streamingRecordDeserializer.deserialize(row, output);
+        }
+    }
+
+    /**
+     * Get the current split checkpoint state by checkpointId.
+     *
+     * <p>

If the source is bounded, checkpoint is not triggered. + * + * @param checkpointId checkpoint Id. + * @return split checkpoint state. + * @throws Exception if error occurs. + */ + @Override + public List snapshotState(long checkpointId) throws Exception { + for (TiDBSourceSplit sourceSplit : sourceSplits) { + if (resolvedTsStates.containsKey(sourceSplit)) { + sourceSplit.setResolvedTs(resolvedTsStates.get(sourceSplit)); + } + if (snapshotCompleted.contains(sourceSplit)) { + sourceSplit.setSnapshotCompleted(true); + } + if (snapshotOffsets.containsKey(sourceSplit)) { + sourceSplit.setSnapshotStart(snapshotOffsets.get(sourceSplit)); + } + } + return new ArrayList<>(sourceSplits); + } + + /** + * Add the split checkpoint state to reader. + * + * @param splits split checkpoint state. + */ + @Override + public void addSplits(List splits) { + for (TiDBSourceSplit split : splits) { + this.resolvedTsStates.put(split, split.getResolvedTs()); + this.snapshotOffsets.put(split, split.getSnapshotStart()); + } + sourceSplits.addAll(splits); + } + + /** + * This method is called when the reader is notified that it will not receive any further + * splits. + * + *

<p>It is triggered when the enumerator calls {@link
+     * SourceSplitEnumerator.Context#signalNoMoreSplits(int)} with the reader's parallel subtask.
+     */
+    @Override
+    public void handleNoMoreSplits() {}
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+
+    private void handleRow(final Cdcpb.Event.Row row) {
+        if (!TableKeyRangeUtils.isRecordKey(row.getKey().toByteArray())) {
+            // Don't handle index keys for now
+            return;
+        }
+        log.debug("binlog record, type: {}, data: {}", row.getType(), row);
+        switch (row.getType()) {
+            case COMMITTED:
+                prewrites.put(RowKeyWithTs.ofStart(row), row);
+                commits.put(RowKeyWithTs.ofCommit(row), row);
+                break;
+            case COMMIT:
+                commits.put(RowKeyWithTs.ofCommit(row), row);
+                break;
+            case PREWRITE:
+                prewrites.put(RowKeyWithTs.ofStart(row), row);
+                break;
+            case ROLLBACK:
+                prewrites.remove(RowKeyWithTs.ofStart(row));
+                break;
+            default:
+                log.warn("Unsupported row type: {}", row.getType());
+        }
+    }
+
+    protected void flushRows(final long resolvedTs) throws Exception {
+        while (!commits.isEmpty() && commits.firstKey().getTimestamp() <= resolvedTs) {
+            final Cdcpb.Event.Row commitRow = commits.pollFirstEntry().getValue();
+            final Cdcpb.Event.Row prewriteRow = prewrites.remove(RowKeyWithTs.ofStart(commitRow));
+            // The matching PREWRITE may have been lost if the pull blocked during a region
+            // split; skip such commits instead of buffering null.
+            if (prewriteRow != null) {
+                committedEvents.offer(prewriteRow);
+            }
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/split/TiDBSourceSplit.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/split/TiDBSourceSplit.java
new file mode 100644
index 000000000000..91a202c2d2fb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/split/TiDBSourceSplit.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import org.tikv.kvproto.Coprocessor;
+import org.tikv.shade.com.google.protobuf.ByteString;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+@AllArgsConstructor
+@Getter
+@Setter
+public class TiDBSourceSplit implements SourceSplit {
+
+    private static final long serialVersionUID = -9043797960947110643L;
+    private String database;
+    private String table;
+    private Coprocessor.KeyRange keyRange;
+    private long resolvedTs;
+    private ByteString snapshotStart;
+    private boolean snapshotCompleted;
+
+    /**
+     * Get the split id of this source split.
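+     * <p>The id embeds the database, table, and key range, so it stays stable across
+     * checkpoints and restarts.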
+ * + * @return id of this source split. + */ + @Override + public String splitId() { + return String.format( + "%s:%s:%s-%s", database, table, keyRange.getStart(), keyRange.getEnd()); + } + + @Override + public String toString() { + return String.format( + "TiDBSourceSplit: %s.%s,start=%s,end=%s", + getDatabase(), getTable(), getKeyRange().getStart(), getKeyRange().getEnd()); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/utils/TableKeyRangeUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/utils/TableKeyRangeUtils.java new file mode 100644 index 000000000000..5db48e272e01 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/utils/TableKeyRangeUtils.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.utils; + +import org.apache.seatunnel.shade.com.google.common.base.Preconditions; +import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList; + +import org.tikv.common.key.RowKey; +import org.tikv.common.util.KeyRangeUtils; +import org.tikv.kvproto.Coprocessor.KeyRange; + +import java.math.BigInteger; +import java.util.List; + +/** Utils to obtain the keyRange of table. */ +public class TableKeyRangeUtils { + public static KeyRange getTableKeyRange(final long tableId) { + return KeyRangeUtils.makeCoprocRange( + RowKey.createMin(tableId).toByteString(), + RowKey.createBeyondMax(tableId).toByteString()); + } + + public static List getTableKeyRanges(final long tableId, final int num) { + Preconditions.checkArgument(num > 0, "Illegal value of num"); + + if (num == 1) { + return ImmutableList.of(getTableKeyRange(tableId)); + } + + final long delta = + BigInteger.valueOf(Long.MAX_VALUE) + .subtract(BigInteger.valueOf(Long.MIN_VALUE + 1)) + .divide(BigInteger.valueOf(num)) + .longValueExact(); + final ImmutableList.Builder builder = ImmutableList.builder(); + for (int i = 0; i < num; i++) { + final RowKey startKey = + (i == 0) + ? RowKey.createMin(tableId) + : RowKey.toRowKey(tableId, Long.MIN_VALUE + delta * i); + final RowKey endKey = + (i == num - 1) + ? 
RowKey.createBeyondMax(tableId) + : RowKey.toRowKey(tableId, Long.MIN_VALUE + delta * (i + 1)); + builder.add( + KeyRangeUtils.makeCoprocRange(startKey.toByteString(), endKey.toByteString())); + } + return builder.build(); + } + + public static KeyRange getTableKeyRange(final long tableId, final int num, final int idx) { + Preconditions.checkArgument(idx >= 0 && idx < num, "Illegal value of idx"); + return getTableKeyRanges(tableId, num).get(idx); + } + + public static boolean isRecordKey(final byte[] key) { + return key[9] == '_' && key[10] == 'r'; + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/CDCClient.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/CDCClient.java new file mode 100644 index 000000000000..b7e663361bdf --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/CDCClient.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.cdc; + +import org.apache.seatunnel.shade.com.google.common.base.Preconditions; +import org.apache.seatunnel.shade.com.google.common.collect.Range; +import org.apache.seatunnel.shade.com.google.common.collect.TreeMultiset; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.TiSession; +import org.tikv.common.key.Key; +import org.tikv.common.region.TiRegion; +import org.tikv.common.util.RangeSplitter; +import org.tikv.common.util.RangeSplitter.RegionTask; +import org.tikv.kvproto.Cdcpb.Event.Row; +import org.tikv.kvproto.Coprocessor.KeyRange; +import org.tikv.kvproto.Kvrpcpb; +import org.tikv.shade.io.grpc.ManagedChannel; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Consumer; + +public class CDCClient implements AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(CDCClient.class); + + private final TiSession session; + private final KeyRange keyRange; + private final CDCConfig config; + + private final BlockingQueue eventsBuffer; + private final ConcurrentHashMap regionClients = + new ConcurrentHashMap<>(); + private final Map regionToResolvedTs = new HashMap<>(); + private final TreeMultiset resolvedTsSet = TreeMultiset.create(); + + private boolean started = false; + + private Consumer eventConsumer; + + public CDCClient(final TiSession session, final KeyRange keyRange) { + this(session, keyRange, new CDCConfig()); + } + + public CDCClient(final TiSession session, final KeyRange keyRange, final CDCConfig config) 
{
+        Preconditions.checkState(
+                session.getConf().getIsolationLevel().equals(Kvrpcpb.IsolationLevel.SI),
+                "Unsupported Isolation Level"); // only support SI for now
+        this.session = session;
+        this.keyRange = keyRange;
+        this.config = config;
+        eventsBuffer = new LinkedBlockingQueue<>(config.getEventBufferSize());
+        // fix: fall back to queue.put() when offer() fails, otherwise events would be lost
+        eventConsumer =
+                (event) -> {
+                    // Try a non-blocking offer() twice first.
+                    for (int i = 0; i < 2; i++) {
+                        if (eventsBuffer.offer(event)) {
+                            return;
+                        }
+                    }
+                    // Otherwise block with put() so the event is not dropped.
+                    try {
+                        eventsBuffer.put(event);
+                    } catch (InterruptedException e) {
+                        // Restore the interrupt flag instead of swallowing it.
+                        Thread.currentThread().interrupt();
+                    }
+                };
+    }
+
+    public synchronized void start(final long startTs) {
+        Preconditions.checkState(!started, "Client is already started");
+        applyKeyRange(keyRange, startTs);
+        started = true;
+    }
+
+    public synchronized Row get() throws InterruptedException {
+        final CDCEvent event = eventsBuffer.poll();
+        if (event != null) {
+            switch (event.eventType) {
+                case ROW:
+                    return event.row;
+                case RESOLVED_TS:
+                    handleResolvedTs(event.regionId, event.resolvedTs);
+                    break;
+                case ERROR:
+                    handleErrorEvent(event.regionId, event.error, event.resolvedTs);
+                    break;
+            }
+        }
+        return null;
+    }
+
+    public synchronized long getMinResolvedTs() {
+        return resolvedTsSet.firstEntry().getElement();
+    }
+
+    public synchronized long getMaxResolvedTs() {
+        return resolvedTsSet.lastEntry().getElement();
+    }
+
+    public synchronized void close() {
+        removeRegions(regionClients.keySet());
+    }
+
+    private synchronized void applyKeyRange(final KeyRange keyRange, final long timestamp) {
+        final RangeSplitter splitter = RangeSplitter.newSplitter(session.getRegionManager());
+
+        final Iterator<TiRegion> newRegionsIterator =
+                splitter.splitRangeByRegion(Arrays.asList(keyRange)).stream()
+                        .map(RegionTask::getRegion)
+                        .sorted((a, b) -> Long.compare(a.getId(), b.getId()))
+                        .iterator();
+        final Iterator<RegionCDCClient> oldRegionsIterator = regionClients.values().iterator();
+
+        final ArrayList<TiRegion> regionsToAdd = new ArrayList<>();
+        final ArrayList<Long> regionsToRemove = new ArrayList<>();
+
+        TiRegion newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null;
+        RegionCDCClient oldRegionClient =
+                oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null;
+
+        while (newRegion != null && oldRegionClient != null) {
+            if (newRegion.getId() == oldRegionClient.getRegion().getId()) {
+                // check if we should refresh the region
+                if (!oldRegionClient.isRunning()) {
+                    regionsToRemove.add(newRegion.getId());
+                    regionsToAdd.add(newRegion);
+                }
+
+                newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null;
+                oldRegionClient = oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null;
+            } else if (newRegion.getId() < oldRegionClient.getRegion().getId()) {
+                regionsToAdd.add(newRegion);
+                newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null;
+            } else {
+                regionsToRemove.add(oldRegionClient.getRegion().getId());
+                oldRegionClient = oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null;
+            }
+        }
+
+        while (newRegion != null) {
+            regionsToAdd.add(newRegion);
+            newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null;
+        }
+
+        while (oldRegionClient != null) {
+            regionsToRemove.add(oldRegionClient.getRegion().getId());
+            oldRegionClient = oldRegionsIterator.hasNext() ?
oldRegionsIterator.next() : null; + } + + removeRegions(regionsToRemove); + addRegions(regionsToAdd, timestamp); + LOGGER.info("keyRange applied"); + } + + private synchronized void addRegions(final Iterable regions, final long timestamp) { + LOGGER.info("add regions: {}, timestamp: {}", regions, timestamp); + for (final TiRegion region : regions) { + if (overlapWithRegion(region)) { + final String address = + session.getRegionManager() + .getStoreById(region.getLeader().getStoreId()) + .getStore() + .getAddress(); + final ManagedChannel channel = + session.getChannelFactory() + .getChannel(address, session.getPDClient().getHostMapping()); + try { + final RegionCDCClient client = + new RegionCDCClient(region, keyRange, channel, eventConsumer, config); + regionClients.put(region.getId(), client); + regionToResolvedTs.put(region.getId(), timestamp); + resolvedTsSet.add(timestamp); + client.start(timestamp); + } catch (final Exception e) { + LOGGER.error( + "failed to add region(regionId: {}, reason: {})", region.getId(), e); + throw new RuntimeException(e); + } + } + } + } + + private synchronized void removeRegions(final Iterable regionIds) { + LOGGER.info("remove regions: {}", regionIds); + for (final long regionId : regionIds) { + final RegionCDCClient regionClient = regionClients.remove(regionId); + if (regionClient != null) { + try { + regionClient.close(); + } catch (final Exception e) { + LOGGER.error( + "failed to close region client, region id: {}, error: {}", regionId, e); + } finally { + resolvedTsSet.remove(regionToResolvedTs.remove(regionId)); + regionToResolvedTs.remove(regionId); + } + } + } + } + + private boolean overlapWithRegion(final TiRegion region) { + final Range regionRange = + Range.closedOpen( + Key.toRawKey(region.getStartKey()), Key.toRawKey(region.getEndKey())); + final Range clientRange = + Range.closedOpen( + Key.toRawKey(keyRange.getStart()), Key.toRawKey(keyRange.getEnd())); + final Range intersection = regionRange.intersection(clientRange); + return !intersection.isEmpty(); + } + + private void handleResolvedTs(final long regionId, final long resolvedTs) { + LOGGER.info("handle resolvedTs: {}, regionId: {}", resolvedTs, regionId); + resolvedTsSet.remove(regionToResolvedTs.replace(regionId, resolvedTs)); + resolvedTsSet.add(resolvedTs); + } + + public void handleErrorEvent(final long regionId, final Throwable error, long resolvedTs) { + LOGGER.info("handle error: {}, regionId: {}", error, regionId); + final TiRegion region = regionClients.get(regionId).getRegion(); + session.getRegionManager() + .onRequestFail(region); // invalidate cache for corresponding region + + removeRegions(Arrays.asList(regionId)); + applyKeyRange(keyRange, resolvedTs); // reapply the whole keyRange + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/CDCEvent.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/CDCEvent.java new file mode 100644 index 000000000000..8bccc47314ac --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/CDCEvent.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.cdc; + +import org.tikv.kvproto.Cdcpb.Event.Row; + +class CDCEvent { + enum CDCEventType { + ROW, + RESOLVED_TS, + ERROR + } + + public final long regionId; + + public final CDCEventType eventType; + + public final long resolvedTs; + + public final Row row; + + public final Throwable error; + + private CDCEvent( + final long regionId, + final CDCEventType eventType, + final long resolvedTs, + final Row row, + final Throwable error) { + this.regionId = regionId; + this.eventType = eventType; + this.resolvedTs = resolvedTs; + this.row = row; + this.error = error; + } + + public static CDCEvent rowEvent(final long regionId, final Row row) { + return new CDCEvent(regionId, CDCEventType.ROW, 0, row, null); + } + + public static CDCEvent resolvedTsEvent(final long regionId, final long resolvedTs) { + return new CDCEvent(regionId, CDCEventType.RESOLVED_TS, resolvedTs, null, null); + } + + public static CDCEvent error(final long regionId, final Throwable error) { + return new CDCEvent(regionId, CDCEventType.ERROR, 0, null, error); + } + + // add new CDCEvent constructor + public static CDCEvent error(final long regionId, final Throwable error, long resolvedTs) { + return new CDCEvent(regionId, CDCEventType.ERROR, resolvedTs, null, error); + } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + builder.append("CDCEvent[").append(eventType.toString()).append("] {"); + switch (eventType) { + case ERROR: + builder.append("error=").append(error.getMessage()); + break; + case RESOLVED_TS: + builder.append("resolvedTs=").append(resolvedTs); + break; + case ROW: + builder.append("row=").append(row); + break; + } + return builder.append("}").toString(); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/RegionCDCClient.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/RegionCDCClient.java new file mode 100644 index 000000000000..fd2a9918eeab --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/tikv/cdc/RegionCDCClient.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.tikv.cdc; + +import org.apache.seatunnel.shade.com.google.common.base.Preconditions; +import org.apache.seatunnel.shade.com.google.common.collect.ImmutableSet; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.region.TiRegion; +import org.tikv.common.util.FastByteComparisons; +import org.tikv.common.util.KeyRangeUtils; +import org.tikv.kvproto.Cdcpb; +import org.tikv.kvproto.Cdcpb.ChangeDataEvent; +import org.tikv.kvproto.Cdcpb.ChangeDataRequest; +import org.tikv.kvproto.Cdcpb.Event.LogType; +import org.tikv.kvproto.Cdcpb.Event.Row; +import org.tikv.kvproto.Cdcpb.Header; +import org.tikv.kvproto.Cdcpb.ResolvedTs; +import org.tikv.kvproto.ChangeDataGrpc; +import org.tikv.kvproto.ChangeDataGrpc.ChangeDataStub; +import org.tikv.kvproto.Coprocessor.KeyRange; +import org.tikv.shade.io.grpc.ManagedChannel; +import org.tikv.shade.io.grpc.stub.StreamObserver; + +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +public class RegionCDCClient implements AutoCloseable, StreamObserver { + private static final Logger LOGGER = LoggerFactory.getLogger(RegionCDCClient.class); + private static final AtomicLong REQ_ID_COUNTER = new AtomicLong(0); + private static final Set ALLOWED_LOGTYPE = + ImmutableSet.of(LogType.PREWRITE, LogType.COMMIT, LogType.COMMITTED, LogType.ROLLBACK); + + private TiRegion region; + private final KeyRange keyRange; + private final KeyRange regionKeyRange; + private final ManagedChannel channel; + private final ChangeDataStub asyncStub; + private final Consumer eventConsumer; + private final CDCConfig config; + private final Predicate rowFilter; + + private final AtomicBoolean running = new AtomicBoolean(false); + + private final boolean started = false; + + private long resolvedTs = 0; + + public RegionCDCClient( + final TiRegion region, + final KeyRange keyRange, + final ManagedChannel channel, + final Consumer eventConsumer, + final CDCConfig config) { + this.region = region; + this.keyRange = keyRange; + this.channel = channel; + this.asyncStub = ChangeDataGrpc.newStub(channel); + this.eventConsumer = eventConsumer; + this.config = config; + + this.regionKeyRange = + KeyRange.newBuilder() + .setStart(region.getStartKey()) + .setEnd(region.getEndKey()) + .build(); + + this.rowFilter = + regionEnclosed() + ? 
((row) -> true)
+                        : new Predicate<Row>() {
+                            final byte[] buffer = new byte[config.getMaxRowKeySize()];
+
+                            final byte[] start = keyRange.getStart().toByteArray();
+                            final byte[] end = keyRange.getEnd().toByteArray();
+
+                            @Override
+                            public boolean test(final Row row) {
+                                final int len = row.getKey().size();
+                                row.getKey().copyTo(buffer, 0);
+                                return (FastByteComparisons.compareTo(
+                                                        buffer, 0, len, start, 0, start.length)
+                                                >= 0)
+                                        && (FastByteComparisons.compareTo(
+                                                        buffer, 0, len, end, 0, end.length)
+                                                < 0);
+                            }
+                        };
+    }
+
+    public synchronized void start(final long startTs) {
+        // fix: the final 'started' field can never become true, so guard double-start with
+        // the running flag instead.
+        Preconditions.checkState(
+                running.compareAndSet(false, true), "RegionCDCClient has already started");
+        resolvedTs = startTs;
+        LOGGER.info("start streaming region: {}, running: {}", region.getId(), running.get());
+        final ChangeDataRequest request =
+                ChangeDataRequest.newBuilder()
+                        .setRequestId(REQ_ID_COUNTER.incrementAndGet())
+                        .setHeader(Header.newBuilder().setTicdcVersion("5.0.0").build())
+                        .setRegionId(region.getId())
+                        .setCheckpointTs(startTs)
+                        .setStartKey(keyRange.getStart())
+                        .setEndKey(keyRange.getEnd())
+                        .setRegionEpoch(region.getRegionEpoch())
+                        .setExtraOp(config.getExtraOp())
+                        .build();
+        final StreamObserver<ChangeDataRequest> requestObserver = asyncStub.eventFeed(this);
+        HashMap<String, Object> params = new HashMap<>();
+        params.put("requestId", request.getRequestId());
+        params.put("header", request.getHeader());
+        params.put("regionId", request.getRegionId());
+        params.put("checkpointTs", request.getCheckpointTs());
+        params.put("startKey", request.getStartKey().toString());
+        params.put("endKey", request.getEndKey().toString());
+        params.put("regionEpoch", request.getRegionEpoch());
+        params.put("extraOp", request.getExtraOp());
+        // fix: the collected parameters were never used; log them for debugging.
+        LOGGER.debug("eventFeed request params: {}", params);
+        requestObserver.onNext(request);
+    }
+
+    public TiRegion getRegion() {
+        return region;
+    }
+
+    public void setRegion(TiRegion region) {
+        this.region = region;
+    }
+
+    public KeyRange getKeyRange() {
+        return keyRange;
+    }
+
+    public KeyRange getRegionKeyRange() {
+        return regionKeyRange;
+    }
+
+    public boolean regionEnclosed() {
+        return KeyRangeUtils.makeRange(keyRange.getStart(), keyRange.getEnd())
+                .encloses(
+                        KeyRangeUtils.makeRange(
+                                regionKeyRange.getStart(), regionKeyRange.getEnd()));
+    }
+
+    public boolean isRunning() {
+        return running.get();
+    }
+
+    @Override
+    public void close() throws Exception {
+        LOGGER.info("close (region: {})", region.getId());
+        running.set(false);
+        // fix: shutting down the gRPC channel here would also stop the client thread pool.
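+        // The channel comes from the session's shared ChannelFactory (see
+        // CDCClient#addRegions) and may still serve other region clients, so it is left
+        // open; the original shutdown sequence is kept below for reference.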
+ /* + synchronized (this) { + channel.shutdown(); + } + try { + LOGGER.debug("awaitTermination (region: {})", region.getId()); + channel.awaitTermination(60, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + LOGGER.error("Failed to shutdown channel(regionId: {})", region.getId()); + Thread.currentThread().interrupt(); + synchronized (this) { + channel.shutdownNow(); + } + } + */ + LOGGER.info("terminated (region: {})", region.getId()); + } + + @Override + public void onCompleted() { + // should never been called + onError(new IllegalStateException("RegionCDCClient should never complete")); + } + + @Override + public void onError(final Throwable error) { + onError(error, this.resolvedTs); + } + + private void onError(final Throwable error, long resolvedTs) { + LOGGER.error( + "region CDC error: region: {}, resolvedTs:{}, error: {}", + region.getId(), + resolvedTs, + error); + running.set(false); + eventConsumer.accept(CDCEvent.error(region.getId(), error, resolvedTs)); + } + + @Override + public void onNext(final ChangeDataEvent event) { + try { + if (running.get()) { + // fix: miss to process error event + onErrorEventHandle(event); + event.getEventsList().stream() + .flatMap(ev -> ev.getEntries().getEntriesList().stream()) + .filter(row -> ALLOWED_LOGTYPE.contains(row.getType())) + .filter(this.rowFilter) + .map(row -> CDCEvent.rowEvent(region.getId(), row)) + .forEach(this::submitEvent); + + if (event.hasResolvedTs()) { + final ResolvedTs resolvedTs = event.getResolvedTs(); + this.resolvedTs = resolvedTs.getTs(); + if (resolvedTs.getRegionsList().indexOf(region.getId()) >= 0) { + submitEvent(CDCEvent.resolvedTsEvent(region.getId(), resolvedTs.getTs())); + } + } + } + } catch (final Exception e) { + onError(e, resolvedTs); + } + } + + // error event handle + private void onErrorEventHandle(final ChangeDataEvent event) { + List errorEvents = + event.getEventsList().stream() + .filter(errEvent -> errEvent.hasError()) + .collect(Collectors.toList()); + if (errorEvents != null && errorEvents.size() > 0) { + onError( + new RuntimeException( + "regionCDC error:" + errorEvents.get(0).getError().toString()), + this.resolvedTs); + } + } + + private void submitEvent(final CDCEvent event) { + LOGGER.debug("submit event: {}", event); + eventConsumer.accept(event); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/pom.xml b/seatunnel-connectors-v2/connector-cdc/pom.xml index 44916d35caa2..a5d908dd9e5d 100644 --- a/seatunnel-connectors-v2/connector-cdc/pom.xml +++ b/seatunnel-connectors-v2/connector-cdc/pom.xml @@ -36,6 +36,7 @@ connector-cdc-mongodb connector-cdc-postgres connector-cdc-oracle + connector-cdc-tidb diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml index cfe0ed44ecdb..6298518e2ae4 100644 --- a/seatunnel-connectors-v2/connector-jdbc/pom.xml +++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml @@ -52,6 +52,7 @@ 2.4.3 12.2.0 3.0.0 + 3.2.0 @@ -202,6 +203,13 @@ ${iris.jdbc.version} provided + + + org.tikv + tikv-client-java + ${tikv.version} + provided + @@ -308,6 +316,11 @@ com.intersystems intersystems-jdbc + + + org.tikv + tikv-client-java + diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index a16d86cad5ae..61129cc88be1 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -109,6 +109,7 @@ 3.1.3 3.4.1 1.1 + 3.3.5 @@ -510,6 +511,12 @@ ${project.version} provided + + org.apache.seatunnel + connector-cdc-tidb + ${project.version} + provided + org.apache.seatunnel 
connector-cdc-postgres @@ -690,6 +697,11 @@ ${snowflake.version} provided + + org.tikv + tikv-client-java + ${tidb.version} + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/pom.xml new file mode 100644 index 000000000000..300d2f47b9f7 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/pom.xml @@ -0,0 +1,65 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-connector-v2-e2e + ${revision} + + + connector-cdc-tidb-e2e + SeaTunnel : E2E : Connector V2 : TiDB + + + 8 + 8 + + + + + org.apache.seatunnel + connector-console + ${project.version} + test + + + org.apache.seatunnel + connector-cdc-tidb + ${project.version} + test + + + org.apache.seatunnel + connector-jdbc + ${project.version} + test + + + mysql + mysql-connector-java + 8.0.27 + test + + + com.alibaba + dns-cache-manipulator + 1.8.0 + test + + + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java new file mode 100644 index 000000000000..86552eb23b3b --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.seatunnel.e2e.connector.tidb;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK},
+        disabledReason = "Currently SPARK does not support CDC")
+public class TiDBCDCIT extends TiDBTestBase implements TestResource {
+
+    private final String SQL_TEMPLATE = "select id,name,description,weight from %s.%s";
+
+    private String driverUrl() {
+        return "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
+    }
+
+    @TestContainerExtension
+    protected final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/TiDB-CDC/lib && cd "
+                                        + "/tmp/seatunnel/plugins/TiDB-CDC/lib && wget "
+                                        + driverUrl());
+                Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
+            };
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        startContainers();
+    }
+
+    @TestTemplate
+    public void testAllEvents(TestContainer container) throws Exception {
+        initializeTidbTable("inventory");
+        Container.ExecResult execResult = container.executeJob("/tidb/tidb_source_to_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        try (Connection connection = getJdbcConnection("inventory");
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    "UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
+            statement.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+            statement.execute(
+                    "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
+            statement.execute(
+                    "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
+            statement.execute(
+                    "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
+            statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
+            statement.execute("DELETE FROM products WHERE id=111;");
+        }
+
+        // stream stage
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertIterableEquals(
+                                    query(getQuerySQL("inventory", "products")),
+                                    query(getQuerySQL("inventory", "products_sink")));
+                        });
+    }
+
+    @TestTemplate
+    public void testAllTypes(TestContainer container) throws Exception {
+        initializeTidbTable("column_all_type");
+        Container.ExecResult execResult =
+                container.executeJob("/tidb/all_types_tidb_source_to_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        try (Connection connection = getJdbcConnection("column_type_test");
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    "UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
+        }
+        // stream stage
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertIterableEquals(
+                                    query(getQuerySQL("column_type_test", "full_types")),
+                                    query(getQuerySQL("column_type_test", "full_types_sink")));
+                        });
+    }
+
+    private List<List<Object>> query(String sql) {
+        try (Connection connection = getJdbcConnection("inventory")) {
+            ResultSet resultSet = connection.createStatement().executeQuery(sql);
+            List<List<Object>> result = new ArrayList<>();
+            int columnCount = resultSet.getMetaData().getColumnCount();
+            while (resultSet.next()) {
+                ArrayList<Object> objects = new ArrayList<>();
+                for (int i = 1; i <= columnCount; i++) {
+                    objects.add(resultSet.getObject(i));
+                }
+                log.debug("TiDB-CDC query, sql: {}, data: {}", sql, objects);
+                result.add(objects);
+            }
+            return result;
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private String getQuerySQL(String database, String tableName) {
+        return String.format(SQL_TEMPLATE, database, tableName);
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        stopContainers();
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBTestBase.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBTestBase.java
new file mode 100644
index 000000000000..44835036d178
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBTestBase.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.seatunnel.e2e.connector.tidb; + +import org.apache.seatunnel.e2e.common.TestSuiteBase; + +import org.apache.commons.lang3.RandomUtils; + +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; +import org.testcontainers.containers.FixedHostPortGenericContainer; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import com.alibaba.dcm.DnsCacheManipulator; +import lombok.extern.slf4j.Slf4j; + +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertNotNull; + +/** Utility class for tidb tests. */ +@Slf4j +public class TiDBTestBase extends TestSuiteBase { + private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); + + public static final String PD_CONTAINER_HOST = "pd-e2e"; + public static final String TIKV_CONTAINER_HOST = "tikv-e2e"; + public static final String TIDB_CONTAINER_HOST = "tidb-e2e"; + + public static final String TIDB_USER = "root"; + public static final String TIDB_PASSWORD = "seatunnel"; + + public static final int TIDB_PORT = 4000; + public static final int TIKV_PORT_ORIGIN = 20160; + public static final int PD_PORT_ORIGIN = 2379; + public static int pdPort = PD_PORT_ORIGIN + RandomUtils.nextInt(0, 1000); + + public static final GenericContainer PD = + new FixedHostPortGenericContainer<>("pingcap/pd:v6.1.0") + .withFileSystemBind("src/test/resources/config/pd.toml", "/pd.toml") + .withFixedExposedPort(pdPort, PD_PORT_ORIGIN) + .withCommand( + "--name=pd0", + "--client-urls=http://0.0.0.0:" + pdPort + ",http://0.0.0.0:2379", + "--peer-urls=http://0.0.0.0:2380", + "--advertise-client-urls=http://pd0:" + pdPort + ",http://pd0:2379", + "--advertise-peer-urls=http://pd0:2380", + "--initial-cluster=pd0=http://pd0:2380", + "--data-dir=/data/pd0", + "--config=/pd.toml", + "--log-file=/logs/pd0.log") + .withNetwork(NETWORK) + .withNetworkAliases(PD_CONTAINER_HOST) + .withStartupTimeout(Duration.ofSeconds(120)) + .withLogConsumer(new Slf4jLogConsumer(log)); + + public static final GenericContainer TIKV = + new FixedHostPortGenericContainer<>("pingcap/tikv:v6.1.0") + .withFixedExposedPort(TIKV_PORT_ORIGIN, TIKV_PORT_ORIGIN) + .withFileSystemBind("src/test/resources/config/tikv.toml", "/tikv.toml") + .withCommand( + "--addr=0.0.0.0:20160", + "--advertise-addr=tikv0:20160", + "--data-dir=/data/tikv0", + "--pd=pd0:2379", + "--config=/tikv.toml", + "--log-file=/logs/tikv0.log") + .withNetwork(NETWORK) + .dependsOn(PD) + .withNetworkAliases(TIKV_CONTAINER_HOST) + .withStartupTimeout(Duration.ofSeconds(120)) + .withLogConsumer(new Slf4jLogConsumer(log)); + + public static final GenericContainer TIDB = + new GenericContainer<>("pingcap/tidb:v6.1.0") + .withExposedPorts(TIDB_PORT) + .withFileSystemBind("src/test/resources/config/tidb.toml", "/tidb.toml") + .withCommand( + "--store=tikv", + "--path=pd0:2379", + "--config=/tidb.toml", + "--advertise-address=tidb0") + .withNetwork(NETWORK) + .dependsOn(TIKV) + .withNetworkAliases(TIDB_CONTAINER_HOST) + 
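// PD and TiKV publish fixed host ports plus JVM DNS-cache aliases so the host-side
+                    // TiKV client can reach them; TiDB itself only needs its SQL port mapped.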
.withStartupTimeout(Duration.ofSeconds(120)) + .withLogConsumer(new Slf4jLogConsumer(log)); + + public static void startContainers() throws Exception { + // Add jvm dns cache for flink to invoke pd interface. + DnsCacheManipulator.setDnsCache(PD_CONTAINER_HOST, "127.0.0.1"); + DnsCacheManipulator.setDnsCache(TIKV_CONTAINER_HOST, "127.0.0.1"); + log.info("Starting containers..."); + Startables.deepStart(Stream.of(PD, TIKV, TIDB)).join(); + log.info("Containers are started."); + } + + public static void stopContainers() { + DnsCacheManipulator.removeDnsCache(PD_CONTAINER_HOST); + DnsCacheManipulator.removeDnsCache(TIKV_CONTAINER_HOST); + Stream.of(TIKV, PD, TIDB).forEach(GenericContainer::stop); + } + + public String getJdbcUrl(String databaseName) { + return "jdbc:mysql://" + + TIDB.getContainerIpAddress() + + ":" + + TIDB.getMappedPort(TIDB_PORT) + + "/" + + databaseName; + } + + protected Connection getJdbcConnection(String databaseName) throws SQLException { + return DriverManager.getConnection(getJdbcUrl(databaseName), TIDB_USER, TIDB_PASSWORD); + } + + private static void dropTestDatabase(Connection connection, String databaseName) + throws SQLException { + try { + Awaitility.await(String.format("Dropping database %s", databaseName)) + .atMost(120, TimeUnit.SECONDS) + .until( + () -> { + try { + String sql = + String.format( + "DROP DATABASE IF EXISTS %s", databaseName); + connection.createStatement().execute(sql); + return true; + } catch (SQLException e) { + log.warn( + String.format( + "DROP DATABASE %s failed: {}", databaseName), + e.getMessage()); + return false; + } + }); + } catch (ConditionTimeoutException e) { + throw new IllegalStateException("Failed to drop test database", e); + } + } + + /** + * Executes a JDBC statement using the default jdbc config without autocommitting the + * connection. + */ + protected void initializeTidbTable(String sqlFile) { + final String ddlFile = String.format("ddl/%s.sql", sqlFile); + final URL ddlTestFile = TiDBTestBase.class.getClassLoader().getResource(ddlFile); + assertNotNull("Cannot locate " + ddlFile, ddlTestFile); + try (Connection connection = getJdbcConnection(""); + Statement statement = connection.createStatement()) { + dropTestDatabase(connection, sqlFile); + final List statements = + Arrays.stream( + Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream() + .map(String::trim) + .filter(x -> !x.startsWith("--") && !x.isEmpty()) + .map( + x -> { + final Matcher m = + COMMENT_PATTERN.matcher(x); + return m.matches() ? m.group(1) : x; + }) + .collect(Collectors.joining("\n")) + .split(";")) + .collect(Collectors.toList()); + for (String stmt : statements) { + statement.execute(stmt); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/config/pd.toml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/config/pd.toml new file mode 100644 index 000000000000..d4d23f5465a3 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/config/pd.toml @@ -0,0 +1,101 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# PD Configuration. + +name = "pd" +data-dir = "default.pd" + +client-urls = "http://127.0.0.1:2379" +# if not set, use ${client-urls} +advertise-client-urls = "" + +peer-urls = "http://127.0.0.1:2380" +# if not set, use ${peer-urls} +advertise-peer-urls = "" + +initial-cluster = "pd=http://127.0.0.1:2380" +initial-cluster-state = "new" + +lease = 3 +tso-save-interval = "3s" + +[security] +# Path of file that contains list of trusted SSL CAs. if set, following four settings shouldn't be empty +cacert-path = "" +# Path of file that contains X509 certificate in PEM format. +cert-path = "" +# Path of file that contains X509 key in PEM format. +key-path = "" + +[log] +level = "error" + +# log format, one of json, text, console +#format = "text" + +# disable automatic timestamps in output +#disable-timestamp = false + +# file logging +[log.file] +#filename = "" +# max log file size in MB +#max-size = 300 +# max log file keep days +#max-days = 28 +# maximum number of old log files to retain +#max-backups = 7 +# rotate log by day +#log-rotate = true + +[metric] +# prometheus client push interval, set "0s" to disable prometheus. +interval = "15s" +# prometheus pushgateway address, leaves it empty will disable prometheus. +address = "pushgateway:9091" + +[schedule] +max-merge-region-size = 0 +split-merge-interval = "1h" +max-snapshot-count = 3 +max-pending-peer-count = 16 +max-store-down-time = "30m" +leader-schedule-limit = 4 +region-schedule-limit = 4 +replica-schedule-limit = 8 +merge-schedule-limit = 8 +tolerant-size-ratio = 5.0 + +# customized schedulers, the format is as below +# if empty, it will use balance-leader, balance-region, hot-region as default +# [[schedule.schedulers]] +# type = "evict-leader" +# args = ["1"] + +[replication] +# The number of replicas for each region. +max-replicas = 3 +# The label keys specified the location of a store. +# The placement priorities is implied by the order of label keys. +# For example, ["zone", "rack"] means that we should place replicas to +# different zones first, then to different racks if we don't have enough zones. +location-labels = [] + +[label-property] +# Do not assign region leaders to stores that have these tags. +# [[label-property.reject-leader]] +# key = "zone" +# value = "cn1 diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/config/tidb.toml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/config/tidb.toml new file mode 100644 index 000000000000..742eb935d1c4 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/config/tidb.toml @@ -0,0 +1,233 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# TiDB Configuration. + +# TiDB server host. +host = "0.0.0.0" + +# TiDB server port. +port = 4000 + +# Registered store name, [tikv, mocktikv] +store = "tikv" + +# TiDB storage path. +path = "/tmp/tidb" + +# The socket file to use for connection. +socket = "" + +# Run ddl worker on this tidb-server. +run-ddl = true + +# Schema lease duration, very dangerous to change only if you know what you do. +lease = "0" + +# When create table, split a separated region for it. It is recommended to +# turn off this option if there will be a large number of tables created. +split-table = true + +# The limit of concurrent executed sessions. +token-limit = 1000 + +# Only print a log when out of memory quota. +# Valid options: ["log", "cancel"] +oom-action = "log" + +# Set the memory quota for a query in bytes. Default: 32GB +mem-quota-query = 34359738368 + +# Set system variable 'lower_case_table_names' +lower-case-table-names = 2 + +[log] +# Log level: debug, info, warn, error, fatal. +level = "error" + +# Log format, one of json, text, console. +format = "text" + +# Disable automatic timestamp in output +disable-timestamp = false + +# Stores slow query log into separated files. +slow-query-file = "" + +# Queries with execution time greater than this value will be logged. (Milliseconds) +slow-threshold = 300 + +# Queries with internal result greater than this value will be logged. +expensive-threshold = 10000 + +# Maximum query length recorded in log. +query-log-max-len = 2048 + +# File logging. +[log.file] +# Log file name. +filename = "" + +# Max log file size in MB (upper limit to 4096MB). +max-size = 300 + +# Max log file keep days. No clean up by default. +max-days = 0 + +# Maximum number of old log files to retain. No clean up by default. +max-backups = 0 + +[security] +# Path of file that contains list of trusted SSL CAs for connection with mysql client. +ssl-ca = "" + +# Path of file that contains X509 certificate in PEM format for connection with mysql client. +ssl-cert = "" + +# Path of file that contains X509 key in PEM format for connection with mysql client. +ssl-key = "" + +# Path of file that contains list of trusted SSL CAs for connection with cluster components. +cluster-ssl-ca = "" + +# Path of file that contains X509 certificate in PEM format for connection with cluster components. +cluster-ssl-cert = "" + +# Path of file that contains X509 key in PEM format for connection with cluster components. +cluster-ssl-key = "" + +[status] +# If enable status report HTTP service. +report-status = true + +# TiDB status port. +status-port = 10080 + +# Prometheus client push interval in second, set \"0\" to disable prometheus push. +metrics-interval = 15 + +[performance] +# Max CPUs to use, 0 use number of CPUs in the machine. +max-procs = 0 +# StmtCountLimit limits the max count of statement inside a transaction. +stmt-count-limit = 5000 + +# Set keep alive option for tcp connection. +tcp-keep-alive = true + +# Whether support cartesian product. +cross-join = true + +# Stats lease duration, which influences the time of analyze and stats load. 
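+# Smaller values reload statistics more often at the cost of extra background work.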
+stats-lease = "3s" + +# Run auto analyze worker on this tidb-server. +run-auto-analyze = true + +# Probability to use the query feedback to update stats, 0 or 1 for always false/true. +feedback-probability = 0.0 + +# The max number of query feedback entries cached in memory. +query-feedback-limit = 1024 + +# Pseudo stats will be used if the ratio between the modify count and +# row count in statistics of a table is greater than this value. +pseudo-estimate-ratio = 0.7 + +[proxy-protocol] +# PROXY protocol acceptable client networks. +# Empty string means disable PROXY protocol, * means all networks. +networks = "" + +# PROXY protocol header read timeout, in seconds +header-timeout = 5 + +[opentracing] +# Enable opentracing. +enable = false + +# Whether to enable the rpc metrics. +rpc-metrics = false + +[opentracing.sampler] +# Type specifies the type of the sampler: const, probabilistic, rateLimiting, or remote +type = "const" + +# Param is a value passed to the sampler. +# Valid values for Param field are: +# - for "const" sampler, 0 or 1 for always false/true respectively +# - for "probabilistic" sampler, a probability between 0 and 1 +# - for "rateLimiting" sampler, the number of spans per second +# - for "remote" sampler, param is the same as for "probabilistic" +# and indicates the initial sampling rate before the actual one +# is received from the mothership +param = 1.0 + +# SamplingServerURL is the address of jaeger-agent's HTTP sampling server +sampling-server-url = "" + +# MaxOperations is the maximum number of operations that the sampler +# will keep track of. If an operation is not tracked, a default probabilistic +# sampler will be used rather than the per operation specific sampler. +max-operations = 0 + +# SamplingRefreshInterval controls how often the remotely controlled sampler will poll +# jaeger-agent for the appropriate sampling strategy. +sampling-refresh-interval = 0 + +[opentracing.reporter] +# QueueSize controls how many spans the reporter can keep in memory before it starts dropping +# new spans. The queue is continuously drained by a background go-routine, as fast as spans +# can be sent out of process. +queue-size = 0 + +# BufferFlushInterval controls how often the buffer is force-flushed, even if it's not full. +# It is generally not useful, as it only matters for very low traffic services. +buffer-flush-interval = 0 + +# LogSpans, when true, enables LoggingReporter that runs in parallel with the main reporter +# and logs all submitted spans. Main Configuration.Logger must be initialized in the code +# for this option to have any effect. +log-spans = false + +# LocalAgentHostPort instructs reporter to send spans to jaeger-agent at this address +local-agent-host-port = "" + +[tikv-client] +# Max gRPC connections that will be established with each tikv-server. +grpc-connection-count = 16 + +# After a duration of this time in seconds if the client doesn't see any activity it pings +# the server to see if the transport is still alive. +grpc-keepalive-time = 10 + +# After having pinged for keepalive check, the client waits for a duration of Timeout in seconds +# and if no activity is seen even after that the connection is closed. +grpc-keepalive-timeout = 3 + +# Max time for the commit command; must be at least twice the raft election timeout. +commit-timeout = "41s" + +[binlog] + +# Socket file to write binlog. +binlog-socket = "" + +# WriteTimeout specifies how long it will wait for writing binlog to pump.
+write-timeout = "15s" + +# If IgnoreError is true, when writing binlog meets an error, TiDB will stop writing binlog, +# but still provide service. +ignore-error = false diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/config/tikv.toml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/config/tikv.toml new file mode 100644 index 000000000000..372ab91ac9d4 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/config/tikv.toml @@ -0,0 +1,513 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# TiKV config template +# Human-readable big numbers: +# File size(based on byte): KB, MB, GB, TB, PB +# e.g.: 1_048_576 = "1MB" +# Time(based on ms): ms, s, m, h +# e.g.: 78_000 = "1.3m" + +# log level: trace, debug, info, warn, error, off. +log-level = "error" +# file to store log, write to stderr if it's empty. +# log-file = "" +log-rotation-size="500MB" + +[readpool.storage] +# size of thread pool for high-priority operations +# high-concurrency = 4 +# size of thread pool for normal-priority operations +# normal-concurrency = 4 +# size of thread pool for low-priority operations +# low-concurrency = 4 +# max running high-priority operations, reject if exceeded +# max-tasks-high = 8000 +# max running normal-priority operations, reject if exceeded +# max-tasks-normal = 8000 +# max running low-priority operations, reject if exceeded +# max-tasks-low = 8000 +# stack size for each thread pool +# stack-size = "10MB" + +[readpool.coprocessor] +# Notice: if CPU_NUM > 8, default thread pool size for coprocessors +# will be set to CPU_NUM * 0.8. + +# high-concurrency = 8 +# normal-concurrency = 8 +# low-concurrency = 8 +# max-tasks-high = 16000 +# max-tasks-normal = 16000 +# max-tasks-low = 16000 +# stack-size = "10MB" + +[server] +# set listening address. +# addr = "127.0.0.1:20160" +# set advertise listening address for client communication, if not set, use addr instead. +# advertise-addr = "" +# notify capacity, 40960 is suitable for about 7000 regions. +# notify-capacity = 40960 +# maximum number of messages that can be processed in one tick. +# messages-per-tick = 4096 + +# compression type for grpc channel, available values are no, deflate and gzip. +# grpc-compression-type = "no" +# size of thread pool for grpc server. +# grpc-concurrency = 4 +# The number of max concurrent streams/requests on a client connection. +# grpc-concurrent-stream = 1024 +# The number of connections with each tikv server to send raft messages. +# grpc-raft-conn-num = 10 +# Amount to read ahead on individual grpc streams. +# grpc-stream-initial-window-size = "2MB" + +# How many snapshots can be sent concurrently.
+# concurrent-send-snap-limit = 32 +# How many snapshots can be received concurrently. +# concurrent-recv-snap-limit = 32 + +# max count of tasks being handled, new tasks will be rejected. +# end-point-max-tasks = 2000 + +# max recursion level allowed when decoding dag expression +# end-point-recursion-limit = 1000 + +# max time to handle coprocessor request before timeout +# end-point-request-max-handle-duration = "60s" + +# the max bytes that snapshot can be written to disk in one second, +# should be set based on your disk performance +# snap-max-write-bytes-per-sec = "100MB" + +# set attributes about this server, e.g. { zone = "us-west-1", disk = "ssd" }. +# labels = {} + +[storage] +# set the path to rocksdb directory. +# data-dir = "/tmp/tikv/store" + +# notify capacity of scheduler's channel +# scheduler-notify-capacity = 10240 + +# maximum number of messages that can be processed in one tick +# scheduler-messages-per-tick = 1024 + +# the number of slots in scheduler latches, concurrency control for write. +# scheduler-concurrency = 2048000 + +# scheduler's worker pool size, should increase it in heavy write cases, +# and should also be less than the total cpu cores. +# scheduler-worker-pool-size = 4 + +# When the pending write bytes exceed this threshold, +# the "scheduler too busy" error is displayed. +# scheduler-pending-write-threshold = "100MB" + +[pd] +# pd endpoints +# endpoints = [] + +[metric] +# the Prometheus client push interval. Setting the value to 0s stops Prometheus client from pushing. +# interval = "15s" +# the Prometheus pushgateway address. Leaving it empty stops Prometheus client from pushing. +address = "pushgateway:9091" +# the Prometheus client push job name. Note: A node id will automatically append, e.g., "tikv_1". +# job = "tikv" + +[raftstore] +# true (default value) for high reliability, this can prevent data loss in case of power failure. +# sync-log = true + +# set the path to raftdb directory, default value is data-dir/raft +# raftdb-path = "" + +# set store capacity, if not set, use disk capacity. +# capacity = 0 + +# notify capacity, 40960 is suitable for about 7000 regions. +# notify-capacity = 40960 + +# maximum number of messages that can be processed in one tick. +# messages-per-tick = 4096 + +# Region heartbeat tick interval for reporting to pd. +# pd-heartbeat-tick-interval = "60s" +# Store heartbeat tick interval for reporting to pd. +# pd-store-heartbeat-tick-interval = "10s" + +# When the region size change exceeds region-split-check-diff, we should check +# whether the region should be split or not. +# region-split-check-diff = "6MB" + +# Interval to check whether a region needs to be split or not. +# split-region-check-tick-interval = "10s" + +# When a raft entry exceeds the max size, refuse to propose the entry. +# raft-entry-max-size = "8MB" + +# Interval to gc unnecessary raft log. +# raft-log-gc-tick-interval = "10s" +# A threshold to gc stale raft log, must be >= 1. +# raft-log-gc-threshold = 50 +# When the entry count exceeds this value, gc will be forcibly triggered. +# raft-log-gc-count-limit = 72000 +# When the approximate size of raft log entries exceeds this value, gc will be forcibly triggered. +# It's recommended to set it to 3/4 of region-split-size. +# raft-log-gc-size-limit = "72MB" + +# When a peer hasn't been active for max-peer-down-duration, +# we will consider this peer to be down and report it to pd.
+# max-peer-down-duration = "5m" + +# Interval to check whether to start a manual compaction for a region. +# region-compact-check-interval = "5m" +# Number of regions to check each time. +# region-compact-check-step = 100 +# The minimum number of delete tombstones to trigger manual compaction. +# region-compact-min-tombstones = 10000 +# Interval to check whether a manual compaction should be started for the lock column family; +# if written bytes reach lock-cf-compact-threshold for the lock column family, a manual +# compaction for the lock column family will be fired. +# lock-cf-compact-interval = "10m" +# lock-cf-compact-bytes-threshold = "256MB" + +# Interval (s) to check whether region data is consistent. +# consistency-check-interval = 0 + +# Use delete range to drop a large number of continuous keys. +# use-delete-range = false + +# delay time before deleting a stale peer +# clean-stale-peer-delay = "10m" + +# Interval to cleanup import sst files. +# cleanup-import-sst-interval = "10m" + +[coprocessor] +# When it is true, it will try to split a region with table prefix if +# that region crosses tables. It is recommended to turn off this option +# if there will be a large number of tables created. +# split-region-on-table = true +# When the region's size exceeds region-max-size, we will split the region +# into two, where the left region's size will be region-split-size or a little +# bit smaller. +# region-max-size = "144MB" +# region-split-size = "96MB" + +[rocksdb] +# Maximum number of concurrent background jobs (compactions and flushes) +# max-background-jobs = 8 + +# This value represents the maximum number of threads that will concurrently perform a +# compaction job by breaking it into multiple, smaller ones that are run simultaneously. +# Default: 1 (i.e. no subcompactions) +# max-sub-compactions = 1 + +# Number of open files that can be used by the DB. You may need to +# increase this if your database has a large working set. Value -1 means +# files opened are always kept open. You can estimate number of files based +# on target_file_size_base and target_file_size_multiplier for level-based +# compaction. +# If max-open-files = -1, RocksDB will prefetch index and filter blocks into +# block cache at startup, so if your database has a large working set, it will +# take several minutes to open the db. +max-open-files = 1024 + +# Max size of rocksdb's MANIFEST file. +# For detailed explanation please refer to https://github.com/facebook/rocksdb/wiki/MANIFEST +# max-manifest-file-size = "20MB" + +# If true, the database will be created if it is missing. +# create-if-missing = true + +# rocksdb wal recovery mode +# 0 : TolerateCorruptedTailRecords, tolerate incomplete record in trailing data on all logs; +# 1 : AbsoluteConsistency, We don't expect to find any corruption in the WAL; +# 2 : PointInTimeRecovery, Recover to point-in-time consistency; +# 3 : SkipAnyCorruptedRecords, Recovery after a disaster; +# wal-recovery-mode = 2 + +# rocksdb write-ahead logs dir path +# This specifies the absolute dir path for write-ahead logs (WAL). +# If it is empty, the log files will be in the same dir as data. +# When you set the path to rocksdb directory in memory like in /dev/shm, you may want to set +# wal-dir to a directory on a persistent storage. +# See https://github.com/facebook/rocksdb/wiki/How-to-persist-in-memory-RocksDB-database +# wal-dir = "/tmp/tikv/store" + +# The following two fields affect how archived write-ahead logs will be deleted. +# 1.
If both are set to 0, logs will be deleted asap and will not get into the archive. +# 2. If wal-ttl-seconds is 0 and wal-size-limit is not 0, +# WAL files will be checked every 10 min and if total size is greater +# than wal-size-limit, they will be deleted starting with the +# earliest until size_limit is met. All empty files will be deleted. +# 3. If wal-ttl-seconds is not 0 and wal-size-limit is 0, then +# WAL files will be checked every wal-ttl-seconds / 2 and those that +# are older than wal-ttl-seconds will be deleted. +# 4. If both are not 0, WAL files will be checked every 10 min and both +# checks will be performed with ttl being first. +# When you set the path to rocksdb directory in memory like in /dev/shm, you may want to set +# wal-ttl-seconds to a value greater than 0 (like 86400) and backup your db on a regular basis. +# See https://github.com/facebook/rocksdb/wiki/How-to-persist-in-memory-RocksDB-database +# wal-ttl-seconds = 0 +# wal-size-limit = 0 + +# rocksdb max total wal size +# max-total-wal-size = "4GB" + +# Rocksdb Statistics provides cumulative stats over time. +# Turning statistics on will introduce about 5%-10% overhead for RocksDB, +# but it is worth it to know the internal status of RocksDB. +# enable-statistics = true + +# Dump statistics periodically in information logs. +# Same as rocksdb's default value (10 min). +# stats-dump-period = "10m" + +# Due to Rocksdb FAQ: https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ, +# if you want to use rocksdb on multiple disks or spinning disks, you should set the value to at +# least 2MB; +# compaction-readahead-size = 0 + +# This is the maximum buffer size that is used by WritableFileWrite +# writable-file-max-buffer-size = "1MB" + +# Use O_DIRECT for both reads and writes in background flush and compactions +# use-direct-io-for-flush-and-compaction = false + +# Limit the disk IO of compaction and flush. Compaction and flush can cause +# terrible spikes if they exceed a certain threshold. Consider setting this to +# 50% ~ 80% of the disk throughput for a more stable result. However, in heavy +# write workload, limiting compaction and flush speed can cause write stalls too. +# rate-bytes-per-sec = 0 + +# Enable or disable the pipelined write +# enable-pipelined-write = true + +# Allows OS to incrementally sync files to disk while they are being +# written, asynchronously, in the background. +# bytes-per-sync = "0MB" + +# Allows OS to incrementally sync WAL to disk while it is being written. +# wal-bytes-per-sync = "0KB" + +# Specify the maximal size of the Rocksdb info log file. If the log file +# is larger than `max_log_file_size`, a new info log file will be created. +# If max_log_file_size == 0, all logs will be written to one log file. +# Default: 1GB +# info-log-max-size = "1GB" + +# Time for the Rocksdb info log file to roll (in seconds). +# If specified with non-zero value, log file will be rolled +# if it has been active longer than `log_file_time_to_roll`. +# Default: 0 (disabled) +# info-log-roll-time = "0" + +# Maximal Rocksdb info log files to be kept. +# Default: 10 +# info-log-keep-log-file-num = 10 + +# This specifies the Rocksdb info LOG dir. +# If it is empty, the log files will be in the same dir as data. +# If it is non-empty, the log files will be in the specified dir, +# and the db data dir's absolute path will be used as the log file +# name's prefix. +# Default: empty +# info-log-dir = "" + +# Column Family default, used to store the actual data of the database.
+[rocksdb.defaultcf] +# compression method (if any) is used to compress a block. +# no: kNoCompression +# snappy: kSnappyCompression +# zlib: kZlibCompression +# bzip2: kBZip2Compression +# lz4: kLZ4Compression +# lz4hc: kLZ4HCCompression +# zstd: kZSTD + +# per level compression +# compression-per-level = ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"] + +# Approximate size of user data packed per block. Note that the +# block size specified here corresponds to uncompressed data. +# block-size = "64KB" + +# If you're doing point lookups you definitely want to turn bloom filters on. We use +# bloom filters to avoid unnecessary disk reads. Default bits_per_key is 10, which +# yields ~1% false positive rate. Larger bits_per_key values will reduce false positive +# rate, but increase memory usage and space amplification. +# bloom-filter-bits-per-key = 10 + +# false means one bloom filter per sst file, true means every block has a corresponding bloom filter +# block-based-bloom-filter = false + +# level0-file-num-compaction-trigger = 4 + +# Soft limit on number of level-0 files. We start slowing down writes at this point. +# level0-slowdown-writes-trigger = 20 + +# Maximum number of level-0 files. We stop writes at this point. +# level0-stop-writes-trigger = 36 + +# Amount of data to build up in memory (backed by an unsorted log +# on disk) before converting to a sorted on-disk file. +# write-buffer-size = "128MB" + +# The maximum number of write buffers that are built up in memory. +# max-write-buffer-number = 5 + +# The minimum number of write buffers that will be merged together +# before writing to storage. +# min-write-buffer-number-to-merge = 1 + +# Control maximum total data size for base level (level 1). +# max-bytes-for-level-base = "512MB" + +# Target file size for compaction. +# target-file-size-base = "8MB" + +# Max bytes for compaction.max_compaction_bytes +# max-compaction-bytes = "2GB" + +# There are four different algorithms to pick files to compact. +# 0 : ByCompensatedSize +# 1 : OldestLargestSeqFirst +# 2 : OldestSmallestSeqFirst +# 3 : MinOverlappingRatio +# compaction-pri = 3 + +# block-cache is used to cache uncompressed blocks; a big block-cache can speed up reads. +# in normal cases it should be tuned to 30%-50% of the system's total memory. +# block-cache-size = "1GB" + +# Indicates whether to put index/filter blocks into the block cache. +# If not specified, each "table reader" object will pre-load index/filter block +# during table initialization. +# cache-index-and-filter-blocks = true + +# Pin level0 filter and index blocks in cache. +# pin-l0-filter-and-index-blocks = true + +# Enable read amplification statistics. +# value => memory usage (percentage of loaded blocks memory) +# 1 => 12.50 % +# 2 => 06.25 % +# 4 => 03.12 % +# 8 => 01.56 % +# 16 => 00.78 % +# read-amp-bytes-per-bit = 0 + +# Pick target size of each level dynamically. +# dynamic-level-bytes = true + +# Options for Column Family write +# Column Family write is used to store commit information in the MVCC model +[rocksdb.writecf] +# compression-per-level = ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"] +# block-size = "64KB" +# write-buffer-size = "128MB" +# max-write-buffer-number = 5 +# min-write-buffer-number-to-merge = 1 +# max-bytes-for-level-base = "512MB" +# target-file-size-base = "8MB" + +# in normal cases it should be tuned to 10%-30% of the system's total memory.
+# block-cache-size = "256MB" +# level0-file-num-compaction-trigger = 4 +# level0-slowdown-writes-trigger = 20 +# level0-stop-writes-trigger = 36 +# cache-index-and-filter-blocks = true +# pin-l0-filter-and-index-blocks = true +# compaction-pri = 3 +# read-amp-bytes-per-bit = 0 +# dynamic-level-bytes = true + +[rocksdb.lockcf] +# compression-per-level = ["no", "no", "no", "no", "no", "no", "no"] +# block-size = "16KB" +# write-buffer-size = "128MB" +# max-write-buffer-number = 5 +# min-write-buffer-number-to-merge = 1 +# max-bytes-for-level-base = "128MB" +# target-file-size-base = "8MB" +# block-cache-size = "256MB" +# level0-file-num-compaction-trigger = 1 +# level0-slowdown-writes-trigger = 20 +# level0-stop-writes-trigger = 36 +# cache-index-and-filter-blocks = true +# pin-l0-filter-and-index-blocks = true +# compaction-pri = 0 +# read-amp-bytes-per-bit = 0 +# dynamic-level-bytes = true + +[raftdb] +# max-sub-compactions = 1 +max-open-files = 1024 +# max-manifest-file-size = "20MB" +# create-if-missing = true + +# enable-statistics = true +# stats-dump-period = "10m" + +# compaction-readahead-size = 0 +# writable-file-max-buffer-size = "1MB" +# use-direct-io-for-flush-and-compaction = false +# enable-pipelined-write = true +# allow-concurrent-memtable-write = false +# bytes-per-sync = "0MB" +# wal-bytes-per-sync = "0KB" + +# info-log-max-size = "1GB" +# info-log-roll-time = "0" +# info-log-keep-log-file-num = 10 +# info-log-dir = "" + +[raftdb.defaultcf] +# compression-per-level = ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"] +# block-size = "64KB" +# write-buffer-size = "128MB" +# max-write-buffer-number = 5 +# min-write-buffer-number-to-merge = 1 +# max-bytes-for-level-base = "512MB" +# target-file-size-base = "8MB" + +# should be tuned to 256MB~2GB. +# block-cache-size = "256MB" +# level0-file-num-compaction-trigger = 4 +# level0-slowdown-writes-trigger = 20 +# level0-stop-writes-trigger = 36 +# cache-index-and-filter-blocks = true +# pin-l0-filter-and-index-blocks = true +# compaction-pri = 0 +# read-amp-bytes-per-bit = 0 +# dynamic-level-bytes = true + +[security] +# set the path for certificates. Empty string means disabling secure connections. +# ca-path = "" +# cert-path = "" +# key-path = "" + +[import] +# the directory to store importing kv data. +# import-dir = "/tmp/tikv/import" +# number of threads to handle RPC requests. +# num-threads = 8 +# stream channel window size; the stream will be blocked when the channel is full. +# stream-channel-window = 128 diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/ddl/column_all_type.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/ddl/column_all_type.sql new file mode 100644 index 000000000000..d1276baca7c9 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/ddl/column_all_type.sql @@ -0,0 +1,119 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License.
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: column_type_test +-- ---------------------------------------------------------------------------------------------------------------- +DROP DATABASE IF EXISTS column_type_test; +CREATE DATABASE column_type_test; +USE column_type_test; + +CREATE TABLE full_types ( + id INT AUTO_INCREMENT NOT NULL, + tiny_c TINYINT, + tiny_un_c TINYINT UNSIGNED , + small_c SMALLINT, + small_un_c SMALLINT UNSIGNED, + medium_c MEDIUMINT, + medium_un_c MEDIUMINT UNSIGNED, + int_c INTEGER , + int_un_c INTEGER UNSIGNED, + int11_c INT(11) , + big_c BIGINT, + big_un_c BIGINT UNSIGNED, + varchar_c VARCHAR(255), + char_c CHAR(3), + real_c REAL, + float_c FLOAT, + double_c DOUBLE, + decimal_c DECIMAL(8, 4), + numeric_c NUMERIC(6, 0), + big_decimal_c DECIMAL(65, 1), + bit1_c BIT, + tiny1_c TINYINT(1), + boolean_c BOOLEAN, + date_c DATE, + time_c TIME(0), + datetime3_c DATETIME(3), + datetime6_c DATETIME(6), + timestamp_c TIMESTAMP, + file_uuid BINARY(16), + bit_c BIT(64), + text_c TEXT, + tiny_blob_c TINYBLOB, + blob_c BLOB, + medium_blob_c MEDIUMBLOB, + long_blob_c LONGBLOB, + year_c YEAR, + enum_c enum('red', 'white') default 'red', + set_c SET('a', 'b'), + json_c JSON, + PRIMARY KEY (id) +) DEFAULT CHARSET=utf8; + +INSERT INTO full_types VALUES ( + DEFAULT, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 4294967295, 2147483647, 9223372036854775807, + 18446744073709551615, + 'Hello World', 'abc', 123.102, 123.102, 404.4443, 123.4567, 345.6, 34567892.1, 0, 1, true, + '2020-07-17', '18:00:22', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22', + unhex(replace('651aed08-390f-4893-b2f1-36923e7b7400','-','')), b'0000010000000100000001000000010000000100000001000000010000000100', + 'text',UNHEX(HEX(16)),UNHEX(HEX(16)),UNHEX(HEX(16)),UNHEX(HEX(16)), 2021, + 'red', 'a,b,a', '{"key1": "value1"}' + ); + +CREATE TABLE full_types_sink ( + id INT AUTO_INCREMENT NOT NULL, + tiny_c TINYINT, + tiny_un_c TINYINT UNSIGNED , + small_c SMALLINT, + small_un_c SMALLINT UNSIGNED, + medium_c MEDIUMINT, + medium_un_c MEDIUMINT UNSIGNED, + int_c INTEGER , + int_un_c INTEGER UNSIGNED, + int11_c INT(11) , + big_c BIGINT, + big_un_c BIGINT UNSIGNED, + varchar_c VARCHAR(255), + char_c CHAR(3), + real_c REAL, + float_c FLOAT, + double_c DOUBLE, + decimal_c DECIMAL(8, 4), + numeric_c NUMERIC(6, 0), + big_decimal_c DECIMAL(65, 1), + bit1_c BIT, + tiny1_c TINYINT(1), + boolean_c BOOLEAN, + date_c DATE, + time_c TIME(0), + datetime3_c DATETIME(3), + datetime6_c DATETIME(6), + timestamp_c TIMESTAMP, + file_uuid BINARY(16), + bit_c BIT(64), + text_c TEXT, + tiny_blob_c TINYBLOB, + blob_c BLOB, + medium_blob_c MEDIUMBLOB, + long_blob_c LONGBLOB, + year_c YEAR, + enum_c enum('red', 'white') default 'red', + set_c SET('a', 'b'), + json_c JSON, + PRIMARY KEY (id) +) DEFAULT CHARSET=utf8; + + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/ddl/inventory.sql 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/ddl/inventory.sql new file mode 100644 index 000000000000..d910a2c4ec55 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/ddl/inventory.sql @@ -0,0 +1,50 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: inventory +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE inventory; + +USE inventory; + +-- Create and populate our products using a single insert with many rows +CREATE TABLE products +( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'seatunnel', + description VARCHAR(512), + weight DECIMAL(20, 10) +); +ALTER TABLE products AUTO_INCREMENT = 101; + +CREATE TABLE products_sink +( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'seatunnel', + description VARCHAR(512), + weight DECIMAL(20, 10) +); + +INSERT INTO products +VALUES (default, "scooter", "Small 2-wheel scooter", 3.14), + (default, "car battery", "12V car battery", 8.1), + (default, "12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3", 0.8), + (default, "hammer", "12oz carpenter's hammer", 0.75), + (default, "hammer", "14oz carpenter's hammer", 0.875), + (default, "hammer", "16oz carpenter's hammer", 1.0), + (default, "rocks", "box of assorted rocks", 5.3), + (default, "jacket", "water resistent black wind breaker", 0.1), + (default, "spare tire", "24 inch spare tire", 22.2); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/tidb/all_types_tidb_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/tidb/all_types_tidb_source_to_sink.conf new file mode 100644 index 000000000000..2985731e87eb --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/tidb/all_types_tidb_source_to_sink.conf @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 2 + job.mode = "BATCH" +} + +source { + # This is an example source plugin **only for testing and demonstrating the source plugin feature** + TiDB-CDC { + result_table_name = "trans_tidb_all_type_cdc" + base-url = "jdbc:mysql://tidb-e2e:4000/column_type_test" + driver = "com.mysql.cj.jdbc.Driver" + tikv.grpc.timeout_in_ms = 20000 + pd-addresses = "pd-e2e:2379" + username = "root" + password = "seatunnel" + database-name = "column_type_test" + table-name = "full_types" + } +} + +transform { +} + +sink { + jdbc { + source_table_name = "trans_tidb_all_type_cdc" + url = "jdbc:mysql://tidb-e2e:4000/column_type_test" + driver = "com.mysql.cj.jdbc.Driver" + user = "root" + password = "seatunnel" + database = "column_type_test" + table = "full_types_sink" + generate_sink_sql = true + primary_keys = ["id"] + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/tidb/tidb_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/tidb/tidb_source_to_sink.conf new file mode 100644 index 000000000000..a1eda3a053ff --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/resources/tidb/tidb_source_to_sink.conf @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 2 + job.mode = "BATCH" +} + +source { + # This is an example source plugin **only for testing and demonstrating the source plugin feature** + TiDB-CDC { + result_table_name = "customers_tidb_cdc" + base-url = "jdbc:mysql://tidb-e2e:4000/inventory" + driver = "com.mysql.cj.jdbc.Driver" + tikv.grpc.timeout_in_ms = 20000 + pd-addresses = "pd-e2e:2379" + username = "root" + password = "seatunnel" + database-name = "inventory" + table-name = "products" + } +} + +transform { +} + +sink { + jdbc { + # Must reference the source's result_table_name, which registers the table as "customers_tidb_cdc". + source_table_name = "customers_tidb_cdc" + url = "jdbc:mysql://tidb-e2e:4000/inventory" + driver = "com.mysql.cj.jdbc.Driver" + user = "root" + password = "seatunnel" + database = "inventory" + table = "products_sink" + generate_sink_sql = true + primary_keys = ["id"] + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index 2db67f88147c..78c8969fb64c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -76,6 +76,7 @@ connector-hudi-e2e connector-milvus-e2e connector-activemq-e2e + connector-cdc-tidb-e2e
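
Reviewer note (not part of the patch): when running the tidb_source_to_sink.conf job against the docker-compose services by hand, the sink table can be verified with a plain JDBC query. The sketch below is a hedged illustration only; the TidbSinkSmokeCheck class name and the row-count comparison are assumptions, while the JDBC URL, user, and password are copied from tidb_source_to_sink.conf. It assumes the MySQL Connector/J driver is on the classpath, as the job configs already require.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class TidbSinkSmokeCheck {
    public static void main(String[] args) throws SQLException {
        // Connection settings mirror tidb_source_to_sink.conf; adjust host/port for your environment.
        String url = "jdbc:mysql://tidb-e2e:4000/inventory";
        try (Connection conn = DriverManager.getConnection(url, "root", "seatunnel");
                Statement stmt = conn.createStatement()) {
            long sourceCount = count(stmt, "products");
            long sinkCount = count(stmt, "products_sink");
            // A simple count comparison; a real check would also diff row contents.
            System.out.printf("source=%d sink=%d match=%b%n",
                    sourceCount, sinkCount, sourceCount == sinkCount);
        }
    }

    private static long count(Statement stmt, String table) throws SQLException {
        try (ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM " + table)) {
            rs.next();
            return rs.getLong(1);
        }
    }
}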