Skip to content

Commit

Permalink
[FLINK-34572] Support OceanBase Jdbc Catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Jun 28, 2024
1 parent 1bab533 commit b8dca26
Show file tree
Hide file tree
Showing 20 changed files with 1,242 additions and 39 deletions.
45 changes: 41 additions & 4 deletions docs/content.zh/docs/connectors/table/jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -433,10 +433,9 @@ JDBC Catalog

`JdbcCatalog` 允许用户通过 JDBC 协议将 Flink 连接到关系数据库。

目前,JDBC Catalog 有两个实现,即 Postgres Catalog 和 MySQL Catalog。目前支持如下 catalog 方法。其他方法目前尚不支持。
目前,JDBC Catalog 有这几个实现:Postgres Catalog、MySQL Catalog、CrateDB Catalog 和 OceanBase Catalog。目前支持如下 catalog 方法。其他方法目前尚不支持。

```java
// Postgres Catalog & MySQL Catalog 支持的方法
databaseExists(String databaseName);
listDatabases();
getDatabase(String databaseName);
Expand All @@ -451,17 +450,19 @@ tableExists(ObjectPath tablePath);

### JDBC Catalog 的使用

本小节主要描述如果创建并使用 Postgres Catalog 或 MySQL Catalog。
本小节主要描述如何创建并使用 Jdbc Catalog。
请参阅 [Dependencies](#dependencies) 部分了解如何配置 JDBC 连接器和相应的驱动。

JDBC catalog 支持以下参数:
- `name`:必填,catalog 的名称。
- `default-database`:必填,默认要连接的数据库。
- `username`:必填,Postgres/MySQL 账户的用户名
- `username`:必填,数据库账户的用户名
- `password`:必填,账户的密码。
- `base-url`:必填,(不应该包含数据库名)
- 对于 Postgres Catalog `base-url` 应为 `"jdbc:postgresql://<ip>:<port>"` 的格式。
- 对于 MySQL Catalog `base-url` 应为 `"jdbc:mysql://<ip>:<port>"` 的格式。
- 对于 OceanBase Catalog `base-url` 应为 `"jdbc:oceanbase://<ip>:<port>"` 的格式。
- `compatible-mode`: 选填,数据库的兼容模式。

{{< tabs "10bd8bfb-674c-46aa-8a36-385537df5791" >}}
{{< tab "SQL" >}}
Expand Down Expand Up @@ -657,6 +658,42 @@ SELECT * FROM mycatalog.crate.`custom_schema.test_table2`
SELECT * FROM crate.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
```

<a name="jdbc-catalog-for-oceanbase"></a>

### JDBC Catalog for OceanBase

<a name="oceanbase-metaspace-mapping"></a>

#### OceanBase 元空间映射

OceanBase 数据库支持多租户管理,每个租户可以工作在 MySQL 兼容模式或 Oracle 兼容模式。在 OceanBase 的 MySQL 模式上,一个租户中有数据库和表,就像 MySQL 数据库中的数据库和表一样,但没有 schema。在 OceanBase 的 Oracle 模式下,一个租户中有 schema 和表,就像 Oracle 数据库中的 schema 和表一样,但没有数据库。

在 Flink 中,查询 OceanBase Catalog 注册的表时,OceanBase MySQL 模式下可以使用 `database.table_name` 或只使用 `table_name`,OceanBase Oracle 模式下可以使用 `schema.table_name` 或只使用 `table_name`

因此,Flink Catalog 和 OceanBase catalog 之间的元空间映射如下:

| Flink Catalog Metaspace Structure | OceanBase Metaspace Structure (MySQL Mode) | OceanBase Metaspace Structure (Oracle Mode) |
|:-------------------------------------|:-------------------------------------------|---------------------------------------------|
| catalog name (defined in Flink only) | N/A | N/A |
| database name | database name | schema name |
| table name | table name | table name |

Flink 中的 OceanBase 表的完整路径应该是 ``"`<catalog>`.`<db or schema>`.`<table>`"``

这里提供了一些访问 OceanBase 表的例子:

```sql
-- 扫描 默认数据库 'mydb' 中的 'test_table' 表
SELECT * FROM mysql_catalog.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;

-- 扫描 'given_database' 数据库中的 'test_table2' 表,
SELECT * FROM mysql_catalog.given_database.test_table2;
SELECT * FROM given_database.test_table2;
```

<a name="data-type-mapping"></a>

数据类型映射
Expand Down
40 changes: 36 additions & 4 deletions docs/content/docs/connectors/table/jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -441,10 +441,9 @@ JDBC Catalog

The `JdbcCatalog` enables users to connect Flink to relational databases over JDBC protocol.

Currently, there are two JDBC catalog implementations, Postgres Catalog and MySQL Catalog. They support the following catalog methods. Other methods are currently not supported.
Currently, there are following JDBC catalog implementations: Postgres Catalog, MySQL Catalog, CrateDB Catalog and OceanBase Catalog. They support the following catalog methods. Other methods are currently not supported.

```java
// The supported methods by Postgres & MySQL Catalog.
databaseExists(String databaseName);
listDatabases();
getDatabase(String databaseName);
Expand All @@ -457,17 +456,19 @@ Other `Catalog` methods are currently not supported.

### Usage of JDBC Catalog

The section mainly describes how to create and use a Postgres Catalog or MySQL Catalog.
The section mainly describes how to create and use a Jdbc Catalog.
Please refer to [Dependencies](#dependencies) section for how to setup a JDBC connector and the corresponding driver.

The JDBC catalog supports the following options:
- `name`: required, name of the catalog.
- `default-database`: required, default database to connect to.
- `username`: required, username of Postgres/MySQL account.
- `username`: required, username of database account.
- `password`: required, password of the account.
- `base-url`: required (should not contain the database name)
- for Postgres Catalog this should be `"jdbc:postgresql://<ip>:<port>"`
- for MySQL Catalog this should be `"jdbc:mysql://<ip>:<port>"`
- for OceanBase Catalog this should be `jdbc:oceanbase://<ip>:<port>`
- `compatible-mode`: optional, the compatible mode of database.

{{< tabs "10bd8bfb-674c-46aa-8a36-385537df5791" >}}
{{< tab "SQL" >}}
Expand Down Expand Up @@ -654,6 +655,37 @@ SELECT * FROM crate.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
```

### JDBC Catalog for OceanBase

#### OceanBase Metaspace Mapping

OceanBase database supports multiple tenant management, and each tenant can work at MySQL compatible mode or Oracle compatible mode. On MySQL mode of OceanBase, there are databases and tables but no schema in one tenant, these objects just like databases and tables in the MySQL database. On Oracle mode of OceanBase, there are schemas and tables but no database in one tenant, these objects just like schemas and tables in the Oracle database.

In Flink, when querying tables registered by OceanBase Catalog, users can use either `database.table_name` or just `table_name` on OceanBase MySQL mode, or use either `schema.table_name` or just `table_name` on OceanBase Oracle mode.

Therefore, the metaspace mapping between Flink Catalog and OceanBase is as following:

| Flink Catalog Metaspace Structure | OceanBase Metaspace Structure (MySQL Mode) | OceanBase Metaspace Structure (Oracle Mode) |
|:-------------------------------------|:-------------------------------------------|---------------------------------------------|
| catalog name (defined in Flink only) | N/A | N/A |
| database name | database name | schema name |
| table name | table name | table name |

The full path of OceanBase table in Flink should be "`<catalog>`.`<db or schema>`.`<table>`".

Here are some examples to access OceanBase tables:

```sql
-- scan table 'test_table', the default database or schema is 'mydb'.
SELECT * FROM mysql_catalog.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;

-- scan table 'test_table' with the given database or schema.
SELECT * FROM mysql_catalog.given_database.test_table2;
SELECT * FROM given_database.test_table2;
```

Data Type Mapping
----------------
Flink supports connect to several databases which uses dialect like MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2 and OceanBase. The Derby dialect usually used for testing purpose. The field data type mappings from relational databases data types to Flink SQL data types are listed in the following table, the mapping table can help define JDBC table in Flink easily.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,20 +298,23 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
primaryKey.ifPresent(
pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
Schema tableSchema = schemaBuilder.build();

Map<String, String> props = new HashMap<>();
props.put(CONNECTOR.key(), IDENTIFIER);
props.put(URL.key(), dbUrl);
props.put(USERNAME.key(), connectionProperties.getProperty(USER_KEY));
props.put(PASSWORD.key(), connectionProperties.getProperty(PASSWORD_KEY));
props.put(TABLE_NAME.key(), getSchemaTableName(tablePath));
return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);
return CatalogTable.of(tableSchema, null, Lists.newArrayList(), getOptions(tablePath));
} catch (Exception e) {
throw new CatalogException(
String.format("Failed getting table %s", tablePath.getFullName()), e);
}
}

protected Map<String, String> getOptions(ObjectPath tablePath) {
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR.key(), IDENTIFIER);
props.put(URL.key(), baseUrl + tablePath.getDatabaseName());
props.put(USERNAME.key(), connectionProperties.getProperty(USER_KEY));
props.put(PASSWORD.key(), connectionProperties.getProperty(PASSWORD_KEY));
props.put(TABLE_NAME.key(), getSchemaTableName(tablePath));
return props;
}

@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialect;
import org.apache.flink.connector.jdbc.databases.mysql.catalog.MySqlCatalog;
import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
import org.apache.flink.connector.jdbc.databases.oceanbase.catalog.OceanBaseCatalog;
import org.apache.flink.connector.jdbc.databases.oceanbase.dialect.OceanBaseDialect;
import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresCatalog;
import org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
Expand Down Expand Up @@ -82,6 +84,14 @@ public static AbstractJdbcCatalog createCatalog(
} else if (dialect instanceof MySqlDialect) {
return new MySqlCatalog(
userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties);
} else if (dialect instanceof OceanBaseDialect) {
return new OceanBaseCatalog(
userClassLoader,
catalogName,
compatibleMode,
defaultDatabase,
baseUrl,
connectionProperties);
} else {
throw new UnsupportedOperationException(
String.format("Catalog for '%s' is not supported yet.", dialect));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.jdbc.databases.oceanbase.catalog;

import org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeMapper;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;

import org.apache.commons.lang3.StringUtils;

import java.sql.DatabaseMetaData;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;

import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.COMPATIBLE_MODE;

/** Catalog for OceanBase. */
public class OceanBaseCatalog extends AbstractJdbcCatalog {

private static final Set<String> builtinDatabases =
new HashSet<String>() {
{
add("__public");
add("information_schema");
add("mysql");
add("oceanbase");
add("LBACSYS");
add("ORAAUDITOR");
}
};

private final String compatibleMode;
private final JdbcDialectTypeMapper dialectTypeMapper;

public OceanBaseCatalog(
ClassLoader userClassLoader,
String catalogName,
String compatibleMode,
String defaultDatabase,
String baseUrl,
Properties connectionProperties) {
super(userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties);
this.compatibleMode = compatibleMode;
this.dialectTypeMapper = new OceanBaseTypeMapper(compatibleMode);
}

private boolean isMySQLMode() {
return "mysql".equalsIgnoreCase(compatibleMode);
}

private String getConnUrl() {
return isMySQLMode() ? baseUrl : defaultUrl;
}

@Override
public List<String> listDatabases() throws CatalogException {
String query =
isMySQLMode()
? "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`"
: "SELECT USERNAME FROM ALL_USERS";
return extractColumnValuesBySQL(
getConnUrl(), query, 1, dbName -> !builtinDatabases.contains(dbName));
}

@Override
public List<String> listTables(String databaseName)
throws DatabaseNotExistException, CatalogException {
Preconditions.checkState(
StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
if (!databaseExists(databaseName)) {
throw new DatabaseNotExistException(getName(), databaseName);
}
String sql =
isMySQLMode()
? "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?"
: "SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = ?";
return extractColumnValuesBySQL(getConnUrl(), sql, 1, null, databaseName);
}

@Override
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
String query =
isMySQLMode()
? "SELECT TABLE_NAME FROM information_schema.`TABLES` "
+ "WHERE TABLE_SCHEMA = ? and TABLE_NAME = ?"
: "SELECT TABLE_NAME FROM ALL_TABLES "
+ "WHERE OWNER = ? and TABLE_NAME = ?";
return !extractColumnValuesBySQL(
getConnUrl(),
query,
1,
null,
tablePath.getDatabaseName(),
tablePath.getObjectName())
.isEmpty();
}

@Override
protected Optional<UniqueConstraint> getPrimaryKey(
DatabaseMetaData metaData, String database, String schema, String table)
throws SQLException {
if (isMySQLMode()) {
return super.getPrimaryKey(metaData, database, null, table);
} else {
return super.getPrimaryKey(metaData, null, database, table);
}
}

@Override
protected Map<String, String> getOptions(ObjectPath tablePath) {
Map<String, String> options = super.getOptions(tablePath);
options.put(COMPATIBLE_MODE.key(), compatibleMode);
return options;
}

/** Converts OceanBase type to Flink {@link DataType}. */
@Override
protected DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
throws SQLException {
return dialectTypeMapper.mapping(tablePath, metadata, colIndex);
}

@Override
protected String getTableName(ObjectPath tablePath) {
return tablePath.getObjectName();
}

@Override
protected String getSchemaName(ObjectPath tablePath) {
return null;
}

@Override
protected String getSchemaTableName(ObjectPath tablePath) {
return tablePath.getObjectName();
}
}
Loading

0 comments on commit b8dca26

Please sign in to comment.