From 43e0e5cf9b832854ea228fb093077872e3a311b6 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Tue, 5 Mar 2024 10:17:57 +0800 Subject: [PATCH] [Improve] support multi database sync (#322) --- .../doris/flink/tools/cdc/DatabaseSync.java | 46 +++++++++++-------- .../doris/flink/tools/cdc/SourceSchema.java | 4 ++ .../tools/cdc/mysql/MysqlDatabaseSync.java | 38 +++++++++------ .../flink/tools/cdc/mysql/MysqlSchema.java | 5 ++ .../flink/tools/cdc/oracle/OracleSchema.java | 5 ++ .../tools/cdc/postgres/PostgresSchema.java | 5 ++ .../tools/cdc/sqlserver/SqlServerSchema.java | 5 ++ .../flink/tools/cdc/MockSourceSchema.java | 36 +++++++++++++++ 8 files changed, 112 insertions(+), 32 deletions(-) create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MockSourceSchema.java diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index d71335567..3291b4ca1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -17,6 +17,7 @@ package org.apache.doris.flink.tools.cdc; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; @@ -115,14 +116,14 @@ public void build() throws Exception { List schemaList = getSchemaList(); Preconditions.checkState(!schemaList.isEmpty(), "No tables to be synchronized."); - if (!dorisSystem.databaseExists(database)) { + + if (!StringUtils.isNullOrWhitespaceOnly(database) + && !dorisSystem.databaseExists(database)) { LOG.info("database {} not exist, created", database); dorisSystem.createDatabase(database); } - List syncTables = new ArrayList<>(); - List dorisTables = new 
ArrayList<>(); - + List> dorisTables = new ArrayList<>(); Map tableBucketsMap = null; if (tableConfig.containsKey("table-buckets")) { tableBucketsMap = getTableBuckets(tableConfig.get("table-buckets")); @@ -130,23 +131,32 @@ public void build() throws Exception { Set bucketsTable = new HashSet<>(); for (SourceSchema schema : schemaList) { syncTables.add(schema.getTableName()); + String targetDb = database; + // Synchronize multiple databases using the src database name + if (StringUtils.isNullOrWhitespaceOnly(targetDb)) { + targetDb = schema.getDatabaseName(); + } + if (StringUtils.isNullOrWhitespaceOnly(database) + && !dorisSystem.databaseExists(targetDb)) { + LOG.info("database {} not exist, created", targetDb); + dorisSystem.createDatabase(targetDb); + } String dorisTable = converter.convert(schema.getTableName()); - // Calculate the mapping relationship between upstream and downstream tables tableMapping.put( - schema.getTableIdentifier(), String.format("%s.%s", database, dorisTable)); - if (!dorisSystem.tableExists(database, dorisTable)) { + schema.getTableIdentifier(), String.format("%s.%s", targetDb, dorisTable)); + if (!dorisSystem.tableExists(targetDb, dorisTable)) { TableSchema dorisSchema = schema.convertTableSchema(tableConfig); // set doris target database - dorisSchema.setDatabase(database); + dorisSchema.setDatabase(targetDb); dorisSchema.setTable(dorisTable); if (tableBucketsMap != null) { setTableSchemaBuckets(tableBucketsMap, dorisSchema, dorisTable, bucketsTable); } dorisSystem.createTable(dorisSchema); } - if (!dorisTables.contains(dorisTable)) { - dorisTables.add(dorisTable); + if (!dorisTables.contains(Tuple2.of(targetDb, dorisTable))) { + dorisTables.add(Tuple2.of(targetDb, dorisTable)); } } if (createTableOnly) { @@ -160,18 +170,18 @@ public void build() throws Exception { } else { SingleOutputStreamOperator parsedStream = streamSource.process(buildProcessFunction()); - for (String table : dorisTables) { + for (Tuple2 dbTbl : dorisTables) { 
OutputTag recordOutputTag = - ParsingProcessFunction.createRecordOutputTag(table); + ParsingProcessFunction.createRecordOutputTag(dbTbl.f1); DataStream sideOutput = parsedStream.getSideOutput(recordOutputTag); int sinkParallel = sinkConfig.getInteger( DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism()); sideOutput - .sinkTo(buildDorisSink(table)) + .sinkTo(buildDorisSink(dbTbl.f0 + "." + dbTbl.f1)) .setParallelism(sinkParallel) - .name(table) - .uid(table); + .name(dbTbl.f1) + .uid(dbTbl.f1); } } } @@ -205,7 +215,7 @@ public ParsingProcessFunction buildProcessFunction() { } /** create doris sink. */ - public DorisSink buildDorisSink(String table) { + public DorisSink buildDorisSink(String tableIdentifier) { String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES); String benodes = sinkConfig.getString(DorisConfigOptions.BENODES); String user = sinkConfig.getString(DorisConfigOptions.USERNAME); @@ -225,8 +235,8 @@ public DorisSink buildDorisSink(String table) { .ifPresent(dorisBuilder::setAutoRedirect); // single sink not need table identifier - if (!singleSink && !StringUtils.isNullOrWhitespaceOnly(table)) { - dorisBuilder.setTableIdentifier(database + "." 
+ table); + if (!singleSink && !StringUtils.isNullOrWhitespaceOnly(tableIdentifier)) { + dorisBuilder.setTableIdentifier(tableIdentifier); } Properties pro = new Properties(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java index e09eb00e2..cbef59fa2 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java @@ -48,6 +48,10 @@ public SourceSchema( this.tableComment = tableComment; } + public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale); + + public abstract String getCdcTableName(); + public String getTableIdentifier() { return getString(databaseName, schemaName, tableName); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java index f00acfc71..8da57f6e8 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java @@ -100,24 +100,34 @@ public Connection getConnection() throws SQLException { @Override public List getSchemaList() throws Exception { String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME); + List schemaList = new ArrayList<>(); try (Connection conn = getConnection()) { DatabaseMetaData metaData = conn.getMetaData(); - try (ResultSet tables = - metaData.getTables(databaseName, null, "%", new String[] {"TABLE"})) { - while (tables.next()) { - String tableName = tables.getString("TABLE_NAME"); - String tableComment = tables.getString("REMARKS"); - if (!isSyncNeeded(tableName)) { - continue; + try (ResultSet catalogs = 
metaData.getCatalogs()) { + while (catalogs.next()) { + String tableCatalog = catalogs.getString("TABLE_CAT"); + if (tableCatalog.matches(databaseName)) { + try (ResultSet tables = + metaData.getTables( + tableCatalog, null, "%", new String[] {"TABLE"})) { + while (tables.next()) { + String tableName = tables.getString("TABLE_NAME"); + String tableComment = tables.getString("REMARKS"); + if (!isSyncNeeded(tableName)) { + continue; + } + SourceSchema sourceSchema = + new MysqlSchema( + metaData, tableCatalog, tableName, tableComment); + sourceSchema.setModel( + !sourceSchema.primaryKeys.isEmpty() + ? DataModel.UNIQUE + : DataModel.DUPLICATE); + schemaList.add(sourceSchema); + } + } } - SourceSchema sourceSchema = - new MysqlSchema(metaData, databaseName, tableName, tableComment); - sourceSchema.setModel( - !sourceSchema.primaryKeys.isEmpty() - ? DataModel.UNIQUE - : DataModel.DUPLICATE); - schemaList.add(sourceSchema); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java index f84ca9431..3a9ffbd36 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java @@ -32,4 +32,9 @@ public MysqlSchema( public String convertToDorisType(String fieldType, Integer precision, Integer scale) { return MysqlType.toDorisType(fieldType, precision, scale); } + + @Override + public String getCdcTableName() { + return databaseName + "\\." 
+ tableName; + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java index f843b6d25..e05918127 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java @@ -37,4 +37,9 @@ public OracleSchema( public String convertToDorisType(String fieldType, Integer precision, Integer scale) { return OracleType.toDorisType(fieldType, precision, scale); } + + @Override + public String getCdcTableName() { + return schemaName + "\\." + tableName; + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java index 32081164a..a431c4159 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java @@ -37,4 +37,9 @@ public PostgresSchema( public String convertToDorisType(String fieldType, Integer precision, Integer scale) { return PostgresType.toDorisType(fieldType, precision, scale); } + + @Override + public String getCdcTableName() { + return schemaName + "\\." 
+ tableName; + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java index 6d5ab9aac..18131ce9a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java @@ -37,4 +37,9 @@ public SqlServerSchema( public String convertToDorisType(String fieldType, Integer precision, Integer scale) { return SqlServerType.toDorisType(fieldType, precision, scale); } + + @Override + public String getCdcTableName() { + return schemaName + "\\." + tableName; + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MockSourceSchema.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MockSourceSchema.java new file mode 100644 index 000000000..3be56e148 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MockSourceSchema.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.flink.tools.cdc; + +public class MockSourceSchema extends SourceSchema { /* Test-only concrete SourceSchema: supplies trivial bodies for the two abstract methods so a schema can be created from plain names. */ + + public MockSourceSchema(String databaseName, String schemaName, String tableName) + throws Exception { + super(databaseName, schemaName, tableName, ""); /* fourth argument is an empty string; presumably the table comment - confirm against the SourceSchema constructor */ + } + + @Override + public String convertToDorisType(String fieldType, Integer precision, Integer scale) { + return null; /* stub: performs no type mapping and ignores all three arguments */ + } + + @Override + public String getCdcTableName() { + return databaseName + "\\." + tableName; /* database and table name joined by a backslash-escaped dot */ + } +}