Skip to content

Commit

Permalink
[Improve] support multi database sync (#322)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Mar 5, 2024
1 parent 182c274 commit 43e0e5c
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.flink.tools.cdc;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
Expand Down Expand Up @@ -115,38 +116,47 @@ public void build() throws Exception {

List<SourceSchema> schemaList = getSchemaList();
Preconditions.checkState(!schemaList.isEmpty(), "No tables to be synchronized.");
if (!dorisSystem.databaseExists(database)) {

if (!StringUtils.isNullOrWhitespaceOnly(database)
&& !dorisSystem.databaseExists(database)) {
LOG.info("database {} not exist, created", database);
dorisSystem.createDatabase(database);
}

List<String> syncTables = new ArrayList<>();
List<String> dorisTables = new ArrayList<>();

List<Tuple2<String, String>> dorisTables = new ArrayList<>();
Map<String, Integer> tableBucketsMap = null;
if (tableConfig.containsKey("table-buckets")) {
tableBucketsMap = getTableBuckets(tableConfig.get("table-buckets"));
}
Set<String> bucketsTable = new HashSet<>();
for (SourceSchema schema : schemaList) {
syncTables.add(schema.getTableName());
String targetDb = database;
// Synchronize multiple databases using the src database name
if (StringUtils.isNullOrWhitespaceOnly(targetDb)) {
targetDb = schema.getDatabaseName();
}
if (StringUtils.isNullOrWhitespaceOnly(database)
&& !dorisSystem.databaseExists(targetDb)) {
LOG.info("database {} not exist, created", targetDb);
dorisSystem.createDatabase(targetDb);
}
String dorisTable = converter.convert(schema.getTableName());

// Calculate the mapping relationship between upstream and downstream tables
tableMapping.put(
schema.getTableIdentifier(), String.format("%s.%s", database, dorisTable));
if (!dorisSystem.tableExists(database, dorisTable)) {
schema.getTableIdentifier(), String.format("%s.%s", targetDb, dorisTable));
if (!dorisSystem.tableExists(targetDb, dorisTable)) {
TableSchema dorisSchema = schema.convertTableSchema(tableConfig);
// set doris target database
dorisSchema.setDatabase(database);
dorisSchema.setDatabase(targetDb);
dorisSchema.setTable(dorisTable);
if (tableBucketsMap != null) {
setTableSchemaBuckets(tableBucketsMap, dorisSchema, dorisTable, bucketsTable);
}
dorisSystem.createTable(dorisSchema);
}
if (!dorisTables.contains(dorisTable)) {
dorisTables.add(dorisTable);
if (!dorisTables.contains(Tuple2.of(targetDb, dorisTable))) {
dorisTables.add(Tuple2.of(targetDb, dorisTable));
}
}
if (createTableOnly) {
Expand All @@ -160,18 +170,18 @@ public void build() throws Exception {
} else {
SingleOutputStreamOperator<Void> parsedStream =
streamSource.process(buildProcessFunction());
for (String table : dorisTables) {
for (Tuple2<String, String> dbTbl : dorisTables) {
OutputTag<String> recordOutputTag =
ParsingProcessFunction.createRecordOutputTag(table);
ParsingProcessFunction.createRecordOutputTag(dbTbl.f1);
DataStream<String> sideOutput = parsedStream.getSideOutput(recordOutputTag);
int sinkParallel =
sinkConfig.getInteger(
DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism());
sideOutput
.sinkTo(buildDorisSink(table))
.sinkTo(buildDorisSink(dbTbl.f0 + "." + dbTbl.f1))
.setParallelism(sinkParallel)
.name(table)
.uid(table);
.name(dbTbl.f1)
.uid(dbTbl.f1);
}
}
}
Expand Down Expand Up @@ -205,7 +215,7 @@ public ParsingProcessFunction buildProcessFunction() {
}

/** create doris sink. */
public DorisSink<String> buildDorisSink(String table) {
public DorisSink<String> buildDorisSink(String tableIdentifier) {
String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
Expand All @@ -225,8 +235,8 @@ public DorisSink<String> buildDorisSink(String table) {
.ifPresent(dorisBuilder::setAutoRedirect);

// single sink not need table identifier
if (!singleSink && !StringUtils.isNullOrWhitespaceOnly(table)) {
dorisBuilder.setTableIdentifier(database + "." + table);
if (!singleSink && !StringUtils.isNullOrWhitespaceOnly(tableIdentifier)) {
dorisBuilder.setTableIdentifier(tableIdentifier);
}

Properties pro = new Properties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public SourceSchema(
this.tableComment = tableComment;
}

public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale);

public abstract String getCdcTableName();

public String getTableIdentifier() {
return getString(databaseName, schemaName, tableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,24 +100,34 @@ public Connection getConnection() throws SQLException {
@Override
public List<SourceSchema> getSchemaList() throws Exception {
String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);

List<SourceSchema> schemaList = new ArrayList<>();
try (Connection conn = getConnection()) {
DatabaseMetaData metaData = conn.getMetaData();
try (ResultSet tables =
metaData.getTables(databaseName, null, "%", new String[] {"TABLE"})) {
while (tables.next()) {
String tableName = tables.getString("TABLE_NAME");
String tableComment = tables.getString("REMARKS");
if (!isSyncNeeded(tableName)) {
continue;
try (ResultSet catalogs = metaData.getCatalogs()) {
while (catalogs.next()) {
String tableCatalog = catalogs.getString("TABLE_CAT");
if (tableCatalog.matches(databaseName)) {
try (ResultSet tables =
metaData.getTables(
tableCatalog, null, "%", new String[] {"TABLE"})) {
while (tables.next()) {
String tableName = tables.getString("TABLE_NAME");
String tableComment = tables.getString("REMARKS");
if (!isSyncNeeded(tableName)) {
continue;
}
SourceSchema sourceSchema =
new MysqlSchema(
metaData, tableCatalog, tableName, tableComment);
sourceSchema.setModel(
!sourceSchema.primaryKeys.isEmpty()
? DataModel.UNIQUE
: DataModel.DUPLICATE);
schemaList.add(sourceSchema);
}
}
}
SourceSchema sourceSchema =
new MysqlSchema(metaData, databaseName, tableName, tableComment);
sourceSchema.setModel(
!sourceSchema.primaryKeys.isEmpty()
? DataModel.UNIQUE
: DataModel.DUPLICATE);
schemaList.add(sourceSchema);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,9 @@ public MysqlSchema(
public String convertToDorisType(String fieldType, Integer precision, Integer scale) {
return MysqlType.toDorisType(fieldType, precision, scale);
}

@Override
public String getCdcTableName() {
return databaseName + "\\." + tableName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@ public OracleSchema(
public String convertToDorisType(String fieldType, Integer precision, Integer scale) {
return OracleType.toDorisType(fieldType, precision, scale);
}

@Override
public String getCdcTableName() {
return schemaName + "\\." + tableName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@ public PostgresSchema(
public String convertToDorisType(String fieldType, Integer precision, Integer scale) {
return PostgresType.toDorisType(fieldType, precision, scale);
}

@Override
public String getCdcTableName() {
return schemaName + "\\." + tableName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@ public SqlServerSchema(
public String convertToDorisType(String fieldType, Integer precision, Integer scale) {
return SqlServerType.toDorisType(fieldType, precision, scale);
}

@Override
public String getCdcTableName() {
return schemaName + "\\." + tableName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.tools.cdc;

public class MockSourceSchema extends SourceSchema {

public MockSourceSchema(String databaseName, String schemaName, String tableName)
throws Exception {
super(databaseName, schemaName, tableName, "");
}

@Override
public String convertToDorisType(String fieldType, Integer precision, Integer scale) {
return null;
}

@Override
public String getCdcTableName() {
return databaseName + "\\." + tableName;
}
}

0 comments on commit 43e0e5c

Please sign in to comment.