Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Aug 9, 2024
1 parent 959d79b commit 212b45d
Show file tree
Hide file tree
Showing 9 changed files with 491 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.reader.TiDBSourceReader;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split.TiDBSourceSplit;

import java.util.List;

public class TiDBSource
implements SeaTunnelSource<SeaTunnelRow, TiDBSourceSplit, TiDBSourceCheckpointState>,
SupportParallelism,
Expand All @@ -45,12 +43,13 @@ public class TiDBSource
private JobContext jobContext;
private SeaTunnelDataType<SeaTunnelRow> seaTunnelDataType;
private TiDBSourceConfig config;
private final CatalogTable catalogTable;
static final String IDENTIFIER = "TIDB-CDC";

public TiDBSource(
ReadonlyConfig config,
SeaTunnelDataType<SeaTunnelRow> seaTunnelDataType,
List<CatalogTable> catalogTables) {
CatalogTable catalogTable) {

this.config =
TiDBSourceConfig.builder()
Expand All @@ -60,6 +59,7 @@ public TiDBSource(
.tiConfiguration(TiDBSourceOptions.getTiConfiguration(config))
.build();
this.seaTunnelDataType = seaTunnelDataType;
this.catalogTable = catalogTable;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,27 @@

package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalog;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalogFactory;

import com.google.auto.service.AutoService;

import java.io.Serializable;
import java.util.List;
import java.util.Collections;

@AutoService(Factory.class)
public class TiDBSourceFactory implements TableSourceFactory {
Expand Down Expand Up @@ -87,13 +91,21 @@ public Class<? extends SeaTunnelSource> getSourceClass() {
public <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
return () -> {
List<CatalogTable> catalogTables =
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
ReadonlyConfig config = context.getOptions();
TiDBCatalogFactory catalogFactory = new TiDBCatalogFactory();
// Build tidb catalog.
TiDBCatalog catalog =
(TiDBCatalog) catalogFactory.createCatalog(factoryIdentifier(), config);

TablePath tablePath =
TablePath.of(
config.get(TiDBSourceOptions.DATABASE_NAME),
config.get(TiDBSourceOptions.TABLE_NAME));
CatalogTable catalogTable = catalog.getTable(tablePath);
SeaTunnelDataType<SeaTunnelRow> dataType =
CatalogTableUtil.convertToMultipleRowType(catalogTables);
CatalogTableUtil.convertToDataType(Collections.singletonList(catalogTable));
return (SeaTunnelSource<T, SplitT, StateT>)
new TiDBSource(context.getOptions(), dataType, catalogTables);
new TiDBSource(context.getOptions(), dataType, catalogTable);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.SingleChoiceOption;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;

Expand All @@ -29,7 +30,7 @@
import java.util.Arrays;

/** TiDB source options */
public class TiDBSourceOptions extends SourceOptions {
public class TiDBSourceOptions extends JdbcSourceOptions {

public static final SingleChoiceOption<String> DATABASE_NAME =
(SingleChoiceOption<String>)
Expand Down
Loading

0 comments on commit 212b45d

Please sign in to comment.