Skip to content

Commit

Permalink
support tidb cdc
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Aug 11, 2024
1 parent 9ccf84d commit f6d647e
Show file tree
Hide file tree
Showing 19 changed files with 2,159 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.enumerator.TiDBSourceCheckpointState;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.enumerator.TiDBSourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.reader.TiDBSourceReader;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split.TiDBSourceSplit;

/**
 * Unbounded SeaTunnel source that reads change data for a single TiDB table.
 *
 * <p>The class itself only wires components together: the constructor copies the relevant options
 * into a {@link TiDBSourceConfig}, and the factory methods pass that config (and, for the reader,
 * the table's {@link CatalogTable}) to {@link TiDBSourceReader} and {@link
 * TiDBSourceSplitEnumerator}.
 */
public class TiDBSource
        implements SeaTunnelSource<SeaTunnelRow, TiDBSourceSplit, TiDBSourceCheckpointState>,
                SupportParallelism,
                SupportColumnProjection {

    /** Plugin name of this source; {@code TiDBSourceFactory} reports the same identifier. */
    static final String IDENTIFIER = "TIDB-CDC";

    // Both fields are assigned exactly once in the constructor, so both are final.
    private final TiDBSourceConfig config;
    private final CatalogTable catalogTable;

    /**
     * Builds the source from user options and the resolved table metadata.
     *
     * @param config user options; startup mode, database name, table name and the TiKV client
     *     settings are read from it.
     * @param catalogTable metadata of the table to capture.
     */
    public TiDBSource(ReadonlyConfig config, CatalogTable catalogTable) {
        this.config =
                TiDBSourceConfig.builder()
                        .startupMode(config.get(TiDBSourceOptions.STARTUP_MODE))
                        .databaseName(config.get(TiDBSourceOptions.DATABASE_NAME))
                        .tableName(config.get(TiDBSourceOptions.TABLE_NAME))
                        .tiConfiguration(TiDBSourceOptions.getTiConfiguration(config))
                        .build();
        this.catalogTable = catalogTable;
    }

    /**
     * Returns the plugin name of this source.
     *
     * @return {@link #IDENTIFIER}.
     */
    @Override
    public String getPluginName() {
        return IDENTIFIER;
    }

    /**
     * Get the boundedness of this source.
     *
     * @return always {@link Boundedness#UNBOUNDED}.
     */
    @Override
    public Boundedness getBoundedness() {
        return Boundedness.UNBOUNDED;
    }

    /**
     * Returns the table this source produces rows for.
     *
     * <p>NOTE(review): added on the assumption that the default {@code
     * SeaTunnelSource#getProducedCatalogTables()} throws {@code UnsupportedOperationException};
     * confirm against the API version this module builds with.
     *
     * @return a single-element list holding the table passed to the constructor.
     */
    @Override
    public java.util.List<CatalogTable> getProducedCatalogTables() {
        return java.util.Collections.singletonList(catalogTable);
    }

    /**
     * Create source reader, used to produce data.
     *
     * @param context reader context.
     * @return a new {@link TiDBSourceReader} for the configured table.
     * @throws Exception when create reader failed.
     */
    @Override
    public SourceReader<SeaTunnelRow, TiDBSourceSplit> createReader(SourceReader.Context context)
            throws Exception {
        return new TiDBSourceReader(context, config, catalogTable);
    }

    /**
     * Create source split enumerator for a fresh start (no checkpoint state).
     *
     * @param context enumerator context.
     * @return a new {@link TiDBSourceSplitEnumerator}.
     * @throws Exception when create enumerator failed.
     */
    @Override
    public SourceSplitEnumerator<TiDBSourceSplit, TiDBSourceCheckpointState> createEnumerator(
            SourceSplitEnumerator.Context<TiDBSourceSplit> context) throws Exception {
        return new TiDBSourceSplitEnumerator(context, config);
    }

    /**
     * Create source split enumerator from previously checkpointed state.
     *
     * @param context enumerator context.
     * @param checkpointState checkpoint state handed to the new enumerator.
     * @return a new {@link TiDBSourceSplitEnumerator} initialised with {@code checkpointState}.
     * @throws Exception when create enumerator failed.
     */
    @Override
    public SourceSplitEnumerator<TiDBSourceSplit, TiDBSourceCheckpointState> restoreEnumerator(
            SourceSplitEnumerator.Context<TiDBSourceSplit> context,
            TiDBSourceCheckpointState checkpointState)
            throws Exception {
        return new TiDBSourceSplitEnumerator(context, config, checkpointState);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalog;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalogFactory;

import com.google.auto.service.AutoService;

import java.io.Serializable;

/** Factory that exposes the TiDB CDC source and builds {@link TiDBSource} instances. */
@AutoService(Factory.class)
public class TiDBSourceFactory implements TableSourceFactory {

    /**
     * Returns the identifier of this factory.
     *
     * @return the same value as {@code TiDBSource.IDENTIFIER}.
     */
    @Override
    public String factoryIdentifier() {
        return TiDBSource.IDENTIFIER;
    }

    /**
     * Returns the rule for options.
     *
     * <p>Starts from {@code TiDBSourceOptions.getBaseRule()}, then requires the database name,
     * table name and PD addresses, and accepts the TiKV tuning options and the startup mode as
     * optional.
     */
    @Override
    public OptionRule optionRule() {
        return TiDBSourceOptions.getBaseRule()
                .required(
                        TiDBSourceOptions.DATABASE_NAME,
                        TiDBSourceOptions.TABLE_NAME,
                        TiDBSourceOptions.PD_ADDRESSES)
                .optional(
                        TiDBSourceOptions.TIKV_BATCH_GET_CONCURRENCY,
                        TiDBSourceOptions.TIKV_BATCH_SCAN_CONCURRENCY,
                        TiDBSourceOptions.TIKV_GRPC_SCAN_TIMEOUT,
                        TiDBSourceOptions.TIKV_GRPC_TIMEOUT,
                        TiDBSourceOptions.STARTUP_MODE)
                .build();
    }

    /**
     * TODO: Implement SupportParallelism in the TableSourceFactory instead of the SeaTunnelSource,
     * Then deprecated the method
     */
    @Override
    public Class<? extends SeaTunnelSource> getSourceClass() {
        return TiDBSource.class;
    }

    /**
     * Creates a lazy {@link TableSource} that, when invoked, looks up the configured table in a
     * TiDB catalog and builds a {@link TiDBSource} for it.
     *
     * @param context factory context; its options supply the catalog settings, database name and
     *     table name.
     * @return a table source whose {@code createSource()} performs the catalog lookup.
     */
    @SuppressWarnings("unchecked")
    @Override
    public <T, SplitT extends SourceSplit, StateT extends Serializable>
            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
        return () -> {
            ReadonlyConfig config = context.getOptions();
            TablePath tablePath =
                    TablePath.of(
                            config.get(TiDBSourceOptions.DATABASE_NAME),
                            config.get(TiDBSourceOptions.TABLE_NAME));

            CatalogTable catalogTable;
            // The catalog is only needed for this one metadata lookup; close it afterwards so
            // whatever it opened to reach TiDB is released instead of leaking.
            try (TiDBCatalog catalog =
                    (TiDBCatalog)
                            new TiDBCatalogFactory()
                                    .createCatalog(factoryIdentifier(), config)) {
                catalogTable = catalog.getTable(tablePath);
            }
            return (SeaTunnelSource<T, SplitT, StateT>) new TiDBSource(config, catalogTable);
        };
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config;

import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;

import org.tikv.common.TiConfiguration;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

/**
 * Settings of the TiDB CDC source: which table to capture, how to start, and the TiKV client
 * configuration.
 *
 * <p>Serializable because {@code TiDBSource} keeps an instance as a field. NOTE(review): this
 * assumes sources are shipped by Java serialization and that {@link TiConfiguration} and {@link
 * StartupMode} are themselves serializable; confirm against the API and TiKV client versions in
 * use.
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode
public class TiDBSourceConfig implements java.io.Serializable {
    private static final long serialVersionUID = 1L;

    /** Name of the database that contains the captured table. */
    private String databaseName;

    /** Name of the captured table. */
    private String tableName;

    /** Startup mode selected for the source. */
    private StartupMode startupMode;

    /** TiKV client configuration. */
    private TiConfiguration tiConfiguration;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.SingleChoiceOption;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;

import org.tikv.common.ConfigUtils;
import org.tikv.common.TiConfiguration;

import java.util.Arrays;

/**
 * Option definitions for the TiDB CDC source, plus a helper that turns them into a TiKV client
 * configuration.
 *
 * <p>Only {@link #STARTUP_MODE} is built with {@code singleChoice(...)} and is therefore a {@link
 * SingleChoiceOption}. The other options come from the typed builders ({@code stringType()},
 * {@code longType()}, {@code intType()}) and are declared as plain {@link Option}s; downcasting
 * them to {@link SingleChoiceOption} would fail when this class is initialised.
 */
public class TiDBSourceOptions extends JdbcSourceOptions {

    public static final Option<String> DATABASE_NAME =
            Options.key("database-name")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Database name of the TiDB server to monitor.");

    public static final Option<String> TABLE_NAME =
            Options.key("table-name")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Table name of the database to monitor.");

    /** Startup mode; restricted to INITIAL and SPECIFIC, defaulting to INITIAL. */
    public static final SingleChoiceOption<StartupMode> STARTUP_MODE =
            Options.key(SourceOptions.STARTUP_MODE_KEY)
                    .singleChoice(
                            StartupMode.class,
                            Arrays.asList(StartupMode.INITIAL, StartupMode.SPECIFIC))
                    .defaultValue(StartupMode.INITIAL)
                    .withDescription(
                            "Optional startup mode for CDC source, valid enumerations are "
                                    + "\"initial\" or \"specific\"");

    public static final Option<String> PD_ADDRESSES =
            Options.key("pd-addresses")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("TiKV cluster's PD address");

    public static final Option<Long> TIKV_GRPC_TIMEOUT =
            Options.key(ConfigUtils.TIKV_GRPC_TIMEOUT)
                    .longType()
                    .noDefaultValue()
                    .withDescription("TiKV GRPC timeout in ms");

    public static final Option<Long> TIKV_GRPC_SCAN_TIMEOUT =
            Options.key(ConfigUtils.TIKV_GRPC_SCAN_TIMEOUT)
                    .longType()
                    .noDefaultValue()
                    .withDescription("TiKV GRPC scan timeout in ms");

    public static final Option<Integer> TIKV_BATCH_GET_CONCURRENCY =
            Options.key(ConfigUtils.TIKV_BATCH_GET_CONCURRENCY)
                    .intType()
                    .noDefaultValue()
                    .withDescription("TiKV GRPC batch get concurrency");

    public static final Option<Integer> TIKV_BATCH_SCAN_CONCURRENCY =
            Options.key(ConfigUtils.TIKV_BATCH_SCAN_CONCURRENCY)
                    .intType()
                    .noDefaultValue()
                    .withDescription("TiKV GRPC batch scan concurrency");

    /**
     * Builds a TiKV client configuration from the user options.
     *
     * <p>Starts from {@code TiConfiguration.createDefault(...)} for the configured PD addresses
     * and overrides a setting only when the matching option is present.
     *
     * @param configuration user options to read from.
     * @return the resulting TiKV client configuration.
     */
    public static TiConfiguration getTiConfiguration(final ReadonlyConfig configuration) {
        final String pdAddrsStr = configuration.get(PD_ADDRESSES);
        final TiConfiguration tiConf = TiConfiguration.createDefault(pdAddrsStr);

        configuration.getOptional(TIKV_GRPC_TIMEOUT).ifPresent(tiConf::setTimeout);
        configuration.getOptional(TIKV_GRPC_SCAN_TIMEOUT).ifPresent(tiConf::setScanTimeout);
        configuration
                .getOptional(TIKV_BATCH_GET_CONCURRENCY)
                .ifPresent(tiConf::setBatchGetConcurrency);
        configuration
                .getOptional(TIKV_BATCH_SCAN_CONCURRENCY)
                .ifPresent(tiConf::setBatchScanConcurrency);
        return tiConf;
    }
}
Loading

0 comments on commit f6d647e

Please sign in to comment.