Skip to content

Commit

Permalink
support tidb cdc
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Aug 16, 2024
1 parent 3c78534 commit 7022ea2
Show file tree
Hide file tree
Showing 38 changed files with 3,051 additions and 747 deletions.
1 change: 1 addition & 0 deletions config/plugin_config
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ connector-cdc-mongodb
connector-cdc-sqlserver
connector-cdc-postgres
connector-cdc-oracle
connector-cdc-tidb
connector-clickhouse
connector-datahub
connector-dingtalk
Expand Down
1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ seatunnel.source.Maxcompute = connector-maxcompute
seatunnel.sink.Maxcompute = connector-maxcompute
seatunnel.source.MySQL-CDC = connector-cdc-mysql
seatunnel.source.MongoDB-CDC = connector-cdc-mongodb
seatunnel.source.TiDB-CDC = connector-cdc-tidb
seatunnel.sink.S3Redshift = connector-s3-redshift
seatunnel.source.Web3j = connector-web3j
seatunnel.source.TDengine = connector-tdengine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
<groupId>org.tikv</groupId>
<artifactId>tikv-client-java</artifactId>
</dependency>

</dependencies>

</project>

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.enumerator.TiDBSourceCheckpointState;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.enumerator.TiDBSourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.reader.TiDBSourceReader;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split.TiDBSourceSplit;

import java.util.Collections;
import java.util.List;

public class TiDBSource
implements SeaTunnelSource<SeaTunnelRow, TiDBSourceSplit, TiDBSourceCheckpointState>,
SupportParallelism,
SupportColumnProjection {

static final String IDENTIFIER = "TiDB-CDC";

private TiDBSourceConfig config;
private final CatalogTable catalogTable;

public TiDBSource(ReadonlyConfig config, CatalogTable catalogTable) {

this.config =
TiDBSourceConfig.builder()
.startupMode(config.get(TiDBSourceOptions.STARTUP_MODE))
.databaseName(config.get(TiDBSourceOptions.DATABASE_NAME))
.tableName(config.get(TiDBSourceOptions.TABLE_NAME))
.tiConfiguration(TiDBSourceOptions.getTiConfiguration(config))
.build();
this.catalogTable = catalogTable;
}

/**
* Returns a unique identifier among same factory interfaces.
*
* <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
* kafka}). If multiple factories exist for different versions, a version should be appended
* using "-" (e.g. {@code elasticsearch-7}).
*/
@Override
public String getPluginName() {
return IDENTIFIER;
}

/**
* Get the boundedness of this source.
*
* @return the boundedness of this source.
*/
@Override
public Boundedness getBoundedness() {
return Boundedness.UNBOUNDED;
}

/**
* Create source reader, used to produce data.
*
* @param context reader context.
* @return source reader.
* @throws Exception when create reader failed.
*/
@Override
public SourceReader<SeaTunnelRow, TiDBSourceSplit> createReader(SourceReader.Context context)
throws Exception {
return new TiDBSourceReader(context, config, catalogTable);
}

/**
* Create source split enumerator, used to generate splits. This method will be called only once
* when start a source.
*
* @param context enumerator context.
* @return source split enumerator.
* @throws Exception when create enumerator failed.
*/
@Override
public SourceSplitEnumerator<TiDBSourceSplit, TiDBSourceCheckpointState> createEnumerator(
SourceSplitEnumerator.Context<TiDBSourceSplit> context) throws Exception {
return new TiDBSourceSplitEnumerator(context, config);
}

/**
* Create source split enumerator, used to generate splits. This method will be called when
* restore from checkpoint.
*
* @param context enumerator context.
* @param checkpointState checkpoint state.
* @return source split enumerator.
* @throws Exception when create enumerator failed.
*/
@Override
public SourceSplitEnumerator<TiDBSourceSplit, TiDBSourceCheckpointState> restoreEnumerator(
SourceSplitEnumerator.Context<TiDBSourceSplit> context,
TiDBSourceCheckpointState checkpointState)
throws Exception {
return new TiDBSourceSplitEnumerator(context, config, checkpointState);
}

@Override
public List<CatalogTable> getProducedCatalogTables() {
return Collections.singletonList(catalogTable);
}
}
Loading

0 comments on commit 7022ea2

Please sign in to comment.