diff --git a/docs/content.zh/docs/connectors/table/jdbc.md b/docs/content.zh/docs/connectors/table/jdbc.md index f8f517bfe..0887b6e0b 100644 --- a/docs/content.zh/docs/connectors/table/jdbc.md +++ b/docs/content.zh/docs/connectors/table/jdbc.md @@ -432,10 +432,9 @@ JDBC Catalog `JdbcCatalog` 允许用户通过 JDBC 协议将 Flink 连接到关系数据库。 -目前,JDBC Catalog 有两个实现,即 Postgres Catalog 和 MySQL Catalog。目前支持如下 catalog 方法。其他方法目前尚不支持。 +目前,JDBC Catalog 有四个实现:Postgres Catalog、MySQL Catalog、CrateDB Catalog 和 OceanBase Catalog。目前支持如下 catalog 方法。其他方法目前尚不支持。 ```java -// Postgres Catalog & MySQL Catalog 支持的方法 databaseExists(String databaseName); listDatabases(); getDatabase(String databaseName); @@ -450,17 +449,19 @@ tableExists(ObjectPath tablePath); ### JDBC Catalog 的使用 -本小节主要描述如果创建并使用 Postgres Catalog 或 MySQL Catalog。 +本小节主要描述如何创建并使用 JDBC Catalog。 请参阅 [依赖](#依赖) 部分了解如何配置 JDBC 连接器和相应的驱动。 JDBC catalog 支持以下参数: - `name`:必填,catalog 的名称。 - `default-database`:必填,默认要连接的数据库。 -- `username`:必填,Postgres/MySQL 账户的用户名。 +- `username`:必填,数据库账户的用户名。 - `password`:必填,账户的密码。 - `base-url`:必填,(不应该包含数据库名) - 对于 Postgres Catalog `base-url` 应为 `"jdbc:postgresql://:"` 的格式。 - 对于 MySQL Catalog `base-url` 应为 `"jdbc:mysql://:"` 的格式。 + - 对于 OceanBase Catalog `base-url` 应为 `"jdbc:oceanbase://:"` 的格式。 +- `compatible-mode`: 选填,数据库的兼容模式。 {{< tabs "10bd8bfb-674c-46aa-8a36-385537df5791" >}} {{< tab "SQL" >}} @@ -656,6 +657,42 @@ SELECT * FROM mycatalog.crate.`custom_schema.test_table2` SELECT * FROM crate.`custom_schema.test_table2`; SELECT * FROM `custom_schema.test_table2`; ``` + + + +### JDBC Catalog for OceanBase + + + +#### OceanBase 元空间映射 + +OceanBase 数据库支持多租户管理,每个租户可以工作在 MySQL 兼容模式或 Oracle 兼容模式。在 OceanBase 的 MySQL 模式上,一个租户中有数据库和表,就像 MySQL 数据库中的数据库和表一样,但没有 schema。在 OceanBase 的 Oracle 模式下,一个租户中有 schema 和表,就像 Oracle 数据库中的 schema 和表一样,但没有数据库。 + +在 Flink 中,查询 OceanBase Catalog 注册的表时,OceanBase MySQL 模式下可以使用 `database.table_name` 或只使用 `table_name`,OceanBase Oracle 模式下可以使用 `schema.table_name` 或只使用 `table_name`。 + +因此,Flink Catalog 和 
OceanBase catalog 之间的元空间映射如下: + +| Flink Catalog Metaspace Structure | OceanBase Metaspace Structure (MySQL Mode) | OceanBase Metaspace Structure (Oracle Mode) | +|:-------------------------------------|:-------------------------------------------|---------------------------------------------| +| catalog name (defined in Flink only) | N/A | N/A | +| database name | database name | schema name | +| table name | table name | table name | + +Flink 中的 OceanBase 表的完整路径应该是 ``"``.``.``"``。 + +这里提供了一些访问 OceanBase 表的例子: + +```sql +-- 扫描 默认数据库 'mydb' 中的 'test_table' 表 +SELECT * FROM oceanbase_catalog.mydb.test_table; +SELECT * FROM mydb.test_table; +SELECT * FROM test_table; + +-- 扫描 'given_database' 数据库中的 'test_table2' 表, +SELECT * FROM oceanbase_catalog.given_database.test_table2; +SELECT * FROM given_database.test_table2; +``` + 数据类型映射 diff --git a/docs/content/docs/connectors/table/jdbc.md b/docs/content/docs/connectors/table/jdbc.md index 056f2ab59..258e996cc 100644 --- a/docs/content/docs/connectors/table/jdbc.md +++ b/docs/content/docs/connectors/table/jdbc.md @@ -441,10 +441,9 @@ JDBC Catalog The `JdbcCatalog` enables users to connect Flink to relational databases over JDBC protocol. -Currently, there are two JDBC catalog implementations, Postgres Catalog and MySQL Catalog. They support the following catalog methods. Other methods are currently not supported. +Currently, there are following JDBC catalog implementations: Postgres Catalog, MySQL Catalog, CrateDB Catalog and OceanBase Catalog. They support the following catalog methods. Other methods are currently not supported. ```java -// The supported methods by Postgres & MySQL Catalog. databaseExists(String databaseName); listDatabases(); getDatabase(String databaseName); @@ -457,17 +456,19 @@ Other `Catalog` methods are currently not supported. ### Usage of JDBC Catalog -The section mainly describes how to create and use a Postgres Catalog or MySQL Catalog. 
+The section mainly describes how to create and use a JDBC Catalog.
Please refer to [Dependencies](#dependencies) section for how to setup a JDBC connector and the corresponding driver.

The JDBC catalog supports the following options:
- `name`: required, name of the catalog.
- `default-database`: required, default database to connect to.
-- `username`: required, username of Postgres/MySQL account.
+- `username`: required, username of the database account.
- `password`: required, password of the account.
- `base-url`: required (should not contain the database name)
  - for Postgres Catalog this should be `"jdbc:postgresql://<ip>:<port>"`
  - for MySQL Catalog this should be `"jdbc:mysql://<ip>:<port>"`
+  - for OceanBase Catalog this should be `"jdbc:oceanbase://<ip>:<port>"`
+- `compatible-mode`: optional, the compatible mode of the database.

{{< tabs "10bd8bfb-674c-46aa-8a36-385537df5791" >}}
{{< tab "SQL" >}}
@@ -654,6 +655,37 @@ SELECT * FROM crate.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
```

+### JDBC Catalog for OceanBase
+
+#### OceanBase Metaspace Mapping
+
+The OceanBase database supports multi-tenant management, and each tenant can work in MySQL compatible mode or Oracle compatible mode. In MySQL mode of OceanBase, a tenant contains databases and tables but no schemas, just like a MySQL database. In Oracle mode of OceanBase, a tenant contains schemas and tables but no databases, just like an Oracle database.
+
+In Flink, when querying tables registered by the OceanBase Catalog, users can use either `database.table_name` or just `table_name` in OceanBase MySQL mode, and either `schema.table_name` or just `table_name` in OceanBase Oracle mode.
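+
+For example, a catalog working against a MySQL-mode tenant could be registered with SQL DDL like the following (a sketch only; host, port, credentials and the database name are placeholders):
+
+```sql
+CREATE CATALOG oceanbase_catalog WITH (
+    -- identifier of the JDBC catalog factory
+    'type' = 'jdbc',
+    'default-database' = 'mydb',
+    'username' = '<username>',
+    'password' = '<password>',
+    -- the base URL must not contain the database name
+    'base-url' = 'jdbc:oceanbase://<host>:<port>',
+    -- 'mysql' or 'oracle', matching the tenant's compatible mode
+    'compatible-mode' = 'mysql'
+);
+```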
+
+Therefore, the metaspace mapping between Flink Catalog and OceanBase is as follows:
+
+| Flink Catalog Metaspace Structure    | OceanBase Metaspace Structure (MySQL Mode) | OceanBase Metaspace Structure (Oracle Mode) |
+|:-------------------------------------|:-------------------------------------------|:--------------------------------------------|
+| catalog name (defined in Flink only) | N/A                                        | N/A                                         |
+| database name                        | database name                              | schema name                                 |
+| table name                           | table name                                 | table name                                  |
+
+The full path of an OceanBase table in Flink should be "`<catalog>`.`<db>`.`<table>
`". + +Here are some examples to access OceanBase tables: + +```sql +-- scan table 'test_table', the default database or schema is 'mydb'. +SELECT * FROM oceanbase_catalog.mydb.test_table; +SELECT * FROM mydb.test_table; +SELECT * FROM test_table; + +-- scan table 'test_table' with the given database or schema. +SELECT * FROM oceanbase_catalog.given_database.test_table2; +SELECT * FROM given_database.test_table2; +``` + Data Type Mapping ---------------- Flink supports connect to several databases which uses dialect like MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2 and OceanBase. The Derby dialect usually used for testing purpose. The field data type mappings from relational databases data types to Flink SQL data types are listed in the following table, the mapping table can help define JDBC table in Flink easily. diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java index a05214846..cb692ce31 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.jdbc.catalog; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; @@ -87,6 +88,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** Abstract catalog for any JDBC catalogs. 
*/ +@PublicEvolving public abstract class AbstractJdbcCatalog extends AbstractCatalog { private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class); @@ -298,20 +300,23 @@ public CatalogBaseTable getTable(ObjectPath tablePath) primaryKey.ifPresent( pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns())); Schema tableSchema = schemaBuilder.build(); - - Map props = new HashMap<>(); - props.put(CONNECTOR.key(), IDENTIFIER); - props.put(URL.key(), dbUrl); - props.put(USERNAME.key(), connectionProperties.getProperty(USER_KEY)); - props.put(PASSWORD.key(), connectionProperties.getProperty(PASSWORD_KEY)); - props.put(TABLE_NAME.key(), getSchemaTableName(tablePath)); - return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props); + return CatalogTable.of(tableSchema, null, Lists.newArrayList(), getOptions(tablePath)); } catch (Exception e) { throw new CatalogException( String.format("Failed getting table %s", tablePath.getFullName()), e); } } + protected Map getOptions(ObjectPath tablePath) { + Map props = new HashMap<>(); + props.put(CONNECTOR.key(), IDENTIFIER); + props.put(URL.key(), baseUrl + tablePath.getDatabaseName()); + props.put(USERNAME.key(), connectionProperties.getProperty(USER_KEY)); + props.put(PASSWORD.key(), connectionProperties.getProperty(PASSWORD_KEY)); + props.put(TABLE_NAME.key(), getSchemaTableName(tablePath)); + return props; + } + @Override public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java index a1dbf5af3..218340043 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java @@ 
-18,12 +18,6 @@ package org.apache.flink.connector.jdbc.catalog; -import org.apache.flink.connector.jdbc.databases.cratedb.catalog.CrateDBCatalog; -import org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialect; -import org.apache.flink.connector.jdbc.databases.mysql.catalog.MySqlCatalog; -import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect; -import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresCatalog; -import org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect; import org.apache.flink.connector.jdbc.dialect.JdbcDialect; import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; @@ -73,18 +67,7 @@ public static AbstractJdbcCatalog createCatalog( Properties connectionProperties) { JdbcDialect dialect = JdbcDialectLoader.load(baseUrl, compatibleMode, userClassLoader); - if (dialect instanceof PostgresDialect) { - return new PostgresCatalog( - userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties); - } else if (dialect instanceof CrateDBDialect) { - return new CrateDBCatalog( - userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties); - } else if (dialect instanceof MySqlDialect) { - return new MySqlCatalog( - userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties); - } else { - throw new UnsupportedOperationException( - String.format("Catalog for '%s' is not supported yet.", dialect)); - } + return dialect.createCatalog( + userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties); } } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialect.java index 7592cf2ee..753862a92 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialect.java +++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialect.java @@ -18,10 +18,13 @@ package org.apache.flink.connector.jdbc.databases.cratedb.dialect; +import org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog; +import org.apache.flink.connector.jdbc.databases.cratedb.catalog.CrateDBCatalog; import org.apache.flink.connector.jdbc.dialect.AbstractPostgresCompatibleDialect; import org.apache.flink.table.types.logical.RowType; import java.util.Optional; +import java.util.Properties; /** JDBC dialect for CrateDB. */ public class CrateDBDialect extends AbstractPostgresCompatibleDialect { @@ -33,6 +36,17 @@ public String dialectName() { return "CrateDB"; } + @Override + public AbstractJdbcCatalog createCatalog( + ClassLoader userClassLoader, + String catalogName, + String defaultDatabase, + String baseUrl, + Properties connectionProperties) { + return new CrateDBCatalog( + userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties); + } + @Override public CrateDBRowConverter getRowConverter(RowType rowType) { return new CrateDBRowConverter(rowType); diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlDialect.java index 6d79d3000..0bcec8d48 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlDialect.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlDialect.java @@ -19,7 +19,9 @@ package org.apache.flink.connector.jdbc.databases.mysql.dialect; import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog; import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.databases.mysql.catalog.MySqlCatalog; import 
org.apache.flink.connector.jdbc.dialect.AbstractDialect; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; @@ -27,6 +29,7 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.Optional; +import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; @@ -50,6 +53,17 @@ public class MySqlDialect extends AbstractDialect { // https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-performance-extensions.html#cj-conn-prop_rewriteBatchedStatements private static final String REWRITE_BATCHED_STATEMENTS = "rewriteBatchedStatements"; + @Override + public AbstractJdbcCatalog createCatalog( + ClassLoader userClassLoader, + String catalogName, + String defaultDatabase, + String baseUrl, + Properties connectionProperties) { + return new MySqlCatalog( + userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties); + } + @Override public JdbcRowConverter getRowConverter(RowType rowType) { return new MySQLRowConverter(rowType); diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/catalog/OceanBaseCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/catalog/OceanBaseCatalog.java new file mode 100644 index 000000000..ded0266c0 --- /dev/null +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/catalog/OceanBaseCatalog.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.databases.oceanbase.catalog; + +import org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog; +import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeMapper; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.StringUtils; + +import java.sql.DatabaseMetaData; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.COMPATIBLE_MODE; + +/** Catalog for OceanBase. 
*/ +public class OceanBaseCatalog extends AbstractJdbcCatalog { + + private static final Set builtinDatabases = + new HashSet() { + { + add("__public"); + add("information_schema"); + add("mysql"); + add("oceanbase"); + add("LBACSYS"); + add("ORAAUDITOR"); + } + }; + + private final String compatibleMode; + private final JdbcDialectTypeMapper dialectTypeMapper; + + public OceanBaseCatalog( + ClassLoader userClassLoader, + String catalogName, + String compatibleMode, + String defaultDatabase, + String baseUrl, + Properties connectionProperties) { + super(userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties); + this.compatibleMode = compatibleMode; + this.dialectTypeMapper = new OceanBaseTypeMapper(compatibleMode); + } + + private boolean isMySQLMode() { + return "mysql".equalsIgnoreCase(compatibleMode); + } + + private String getConnUrl() { + return isMySQLMode() ? baseUrl : defaultUrl; + } + + @Override + public List listDatabases() throws CatalogException { + String query = + isMySQLMode() + ? "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`" + : "SELECT USERNAME FROM ALL_USERS"; + return extractColumnValuesBySQL( + getConnUrl(), query, 1, dbName -> !builtinDatabases.contains(dbName)); + } + + @Override + public List listTables(String databaseName) + throws DatabaseNotExistException, CatalogException { + Preconditions.checkState( + StringUtils.isNotBlank(databaseName), "Database name must not be blank."); + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + String sql = + isMySQLMode() + ? "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?" + : "SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = ?"; + return extractColumnValuesBySQL(getConnUrl(), sql, 1, null, databaseName); + } + + @Override + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + String query = + isMySQLMode() + ? 
"SELECT TABLE_NAME FROM information_schema.`TABLES` " + + "WHERE TABLE_SCHEMA = ? and TABLE_NAME = ?" + : "SELECT TABLE_NAME FROM ALL_TABLES " + + "WHERE OWNER = ? and TABLE_NAME = ?"; + return !extractColumnValuesBySQL( + getConnUrl(), + query, + 1, + null, + tablePath.getDatabaseName(), + tablePath.getObjectName()) + .isEmpty(); + } + + @Override + protected Optional getPrimaryKey( + DatabaseMetaData metaData, String database, String schema, String table) + throws SQLException { + if (isMySQLMode()) { + return super.getPrimaryKey(metaData, database, null, table); + } else { + return super.getPrimaryKey(metaData, null, database, table); + } + } + + @Override + protected Map getOptions(ObjectPath tablePath) { + Map options = super.getOptions(tablePath); + options.put(COMPATIBLE_MODE.key(), compatibleMode); + return options; + } + + /** Converts OceanBase type to Flink {@link DataType}. */ + @Override + protected DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex) + throws SQLException { + return dialectTypeMapper.mapping(tablePath, metadata, colIndex); + } + + @Override + protected String getTableName(ObjectPath tablePath) { + return tablePath.getObjectName(); + } + + @Override + protected String getSchemaName(ObjectPath tablePath) { + return null; + } + + @Override + protected String getSchemaTableName(ObjectPath tablePath) { + return tablePath.getObjectName(); + } +} diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/catalog/OceanBaseTypeMapper.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/catalog/OceanBaseTypeMapper.java new file mode 100644 index 000000000..56835ba56 --- /dev/null +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/catalog/OceanBaseTypeMapper.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.databases.oceanbase.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeMapper; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.DecimalType; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Types; + +/** OceanBaseTypeMapper util class. 
*/ +@Internal +public class OceanBaseTypeMapper implements JdbcDialectTypeMapper { + + private static final int RAW_TIME_LENGTH = 10; + private static final int RAW_TIMESTAMP_LENGTH = 19; + + private static final int TYPE_BINARY_FLOAT = 100; + private static final int TYPE_BINARY_DOUBLE = 101; + + private final String compatibleMode; + + public OceanBaseTypeMapper(String compatibleMode) { + this.compatibleMode = compatibleMode; + } + + @Override + public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex) + throws SQLException { + String typeName = metadata.getColumnTypeName(colIndex).toUpperCase(); + int jdbcType = metadata.getColumnType(colIndex); + String columnName = metadata.getColumnName(colIndex); + int precision = metadata.getPrecision(colIndex); + int scale = metadata.getScale(colIndex); + switch (jdbcType) { + case Types.BIT: + return DataTypes.BOOLEAN(); + case Types.TINYINT: + return isUnsignedType(typeName) || precision > 4 + ? DataTypes.SMALLINT() + : DataTypes.TINYINT(); + case Types.SMALLINT: + return isUnsignedType(typeName) ? DataTypes.INT() : DataTypes.SMALLINT(); + case Types.INTEGER: + return !typeName.toUpperCase().startsWith("MEDIUMINT") && isUnsignedType(typeName) + ? DataTypes.BIGINT() + : DataTypes.INT(); + case Types.BIGINT: + return isUnsignedType(typeName) ? DataTypes.DECIMAL(20, 0) : DataTypes.BIGINT(); + case Types.FLOAT: + case Types.NUMERIC: + case Types.DECIMAL: + if ("mysql".equalsIgnoreCase(compatibleMode)) { + return isUnsignedType(typeName) + ? getDecimalType(precision + 1, scale) + : getDecimalType(precision, scale); + } + return getNumericType(precision, scale); + case Types.REAL: + case TYPE_BINARY_FLOAT: + return DataTypes.FLOAT(); + case Types.DOUBLE: + case TYPE_BINARY_DOUBLE: + return DataTypes.DOUBLE(); + case Types.DATE: + return "YEAR".equals(typeName) ? DataTypes.INT() : DataTypes.DATE(); + case Types.TIME: + return isExplicitPrecision(precision, RAW_TIME_LENGTH) + ? 
DataTypes.TIME(precision - RAW_TIME_LENGTH - 1) + : DataTypes.TIME(0); + case Types.TIMESTAMP: + return typeName.equalsIgnoreCase("DATE") + ? DataTypes.DATE() + : isExplicitPrecision(precision, RAW_TIMESTAMP_LENGTH) + ? DataTypes.TIMESTAMP(precision - RAW_TIMESTAMP_LENGTH - 1) + : DataTypes.TIMESTAMP(0); + case Types.CHAR: + case Types.NCHAR: + return DataTypes.CHAR(precision); + case Types.VARCHAR: + case Types.NVARCHAR: + case Types.LONGVARCHAR: + return precision > 0 ? DataTypes.VARCHAR(precision) : DataTypes.STRING(); + case Types.CLOB: + return DataTypes.STRING(); + case Types.BINARY: + case Types.VARBINARY: + case Types.LONGVARBINARY: + case Types.BLOB: + return DataTypes.BYTES(); + default: + throw new UnsupportedOperationException( + String.format( + "Doesn't support type '%s' on column '%s'.", typeName, columnName)); + } + } + + private DataType getNumericType(int precision, int scale) { + if (precision == 0) { + return DataTypes.STRING(); + } + if (scale <= 0) { + int width = precision - scale; + if (width < 3) { + return DataTypes.TINYINT(); + } else if (width < 5) { + return DataTypes.SMALLINT(); + } else if (width < 10) { + return DataTypes.INT(); + } else if (width < 19) { + return DataTypes.BIGINT(); + } + } + return getDecimalType(precision, scale); + } + + private DataType getDecimalType(int precision, int scale) { + if (precision >= DecimalType.MAX_PRECISION || precision == 0) { + return DataTypes.STRING(); + } + return DataTypes.DECIMAL(precision, scale); + } + + private boolean isUnsignedType(String typeName) { + return typeName.toUpperCase().contains("UNSIGNED"); + } + + private boolean isExplicitPrecision(int precision, int defaultPrecision) { + return precision > defaultPrecision + && (precision - defaultPrecision - 1 + <= ("mysql".equalsIgnoreCase(compatibleMode) ? 
6 : 9)); + } +} diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialect.java index 885d33927..a428c12fc 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialect.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialect.java @@ -19,8 +19,10 @@ package org.apache.flink.connector.jdbc.databases.oceanbase.dialect; import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog; import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect; +import org.apache.flink.connector.jdbc.databases.oceanbase.catalog.OceanBaseCatalog; import org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialect; import org.apache.flink.connector.jdbc.dialect.AbstractDialect; import org.apache.flink.table.types.logical.LogicalTypeRoot; @@ -30,6 +32,7 @@ import java.util.EnumSet; import java.util.Optional; +import java.util.Properties; import java.util.Set; /** JDBC dialect for OceanBase. 
*/ @@ -38,9 +41,11 @@ public class OceanBaseDialect extends AbstractDialect { private static final long serialVersionUID = 1L; + private final String compatibleMode; private final AbstractDialect dialect; public OceanBaseDialect(@Nonnull String compatibleMode) { + this.compatibleMode = compatibleMode; switch (compatibleMode.toLowerCase()) { case "mysql": this.dialect = new MySqlDialect(); @@ -59,6 +64,22 @@ public String dialectName() { return "OceanBase"; } + @Override + public AbstractJdbcCatalog createCatalog( + ClassLoader userClassLoader, + String catalogName, + String defaultDatabase, + String baseUrl, + Properties connectionProperties) { + return new OceanBaseCatalog( + userClassLoader, + catalogName, + compatibleMode, + defaultDatabase, + baseUrl, + connectionProperties); + } + @Override public Optional defaultDriverName() { return Optional.of("com.oceanbase.jdbc.Driver"); diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseRowConverter.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseRowConverter.java index 95981217f..d733cd49a 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseRowConverter.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseRowConverter.java @@ -69,7 +69,10 @@ public JdbcDeserializationConverter createInternalConverter(LogicalType type) { case SMALLINT: return val -> val instanceof Number ? ((Number) val).shortValue() : val; case INTEGER: - return val -> val instanceof Number ? ((Number) val).intValue() : val; + return val -> + val instanceof Number + ? ((Number) val).intValue() + : val instanceof Date ? ((Date) val).toLocalDate().getYear() : val; case BIGINT: return val -> val instanceof Number ? 
+                        ((Number) val).longValue()
+                        : val;
+            case DECIMAL:
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java
index d0924aee8..0af2a0d45 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java
@@ -19,10 +19,13 @@
 package org.apache.flink.connector.jdbc.databases.postgres.dialect;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog;
+import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresCatalog;
 import org.apache.flink.connector.jdbc.dialect.AbstractPostgresCompatibleDialect;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Optional;
+import java.util.Properties;
 
 /** JDBC dialect for PostgreSQL. */
 @Internal
@@ -30,6 +33,17 @@
 public class PostgresDialect extends AbstractPostgresCompatibleDialect {
 
     private static final long serialVersionUID = 1L;
 
+    @Override
+    public AbstractJdbcCatalog createCatalog(
+            ClassLoader userClassLoader,
+            String catalogName,
+            String defaultDatabase,
+            String baseUrl,
+            Properties connectionProperties) {
+        return new PostgresCatalog(
+                userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties);
+    }
+
     @Override
     public PostgresRowConverter getRowConverter(RowType rowType) {
         return new PostgresRowConverter(rowType);
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
index 6cc6bbd57..5438a95b5 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
@@ -19,12 +19,14 @@
 package org.apache.flink.connector.jdbc.dialect;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog;
 import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.Serializable;
 import java.util.Optional;
+import java.util.Properties;
 
 /**
  * Represents a dialect of SQL implemented by a particular JDBC system. Dialects should be immutable
@@ -42,6 +44,26 @@ public interface JdbcDialect extends Serializable {
      */
    String dialectName();
 
+    /**
+     * Creates a JDBC catalog instance from the given information.
+     *
+     * @param userClassLoader the classloader used to load the JDBC driver
+     * @param catalogName the registered catalog name
+     * @param defaultDatabase the default database name
+     * @param baseUrl the base URL of the database, e.g. jdbc:mysql://localhost:3306
+     * @param connectionProperties the properties used to connect to the database
+     * @return A JDBC catalog instance.
+     */
+    default AbstractJdbcCatalog createCatalog(
+            ClassLoader userClassLoader,
+            String catalogName,
+            String defaultDatabase,
+            String baseUrl,
+            Properties connectionProperties) {
+        throw new UnsupportedOperationException(
+                String.format("Catalog for '%s' is not supported yet.", dialectName()));
+    }
+
     /**
      * Get converter that convert jdbc object and Flink internal object each other.
      *
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/OceanBaseMysqlTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/OceanBaseMysqlTestBase.java
index 4cac1d60e..036cb5875 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/OceanBaseMysqlTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/OceanBaseMysqlTestBase.java
@@ -31,7 +31,7 @@
 @ExtendWith(OceanBaseDatabase.class)
 public interface OceanBaseMysqlTestBase extends DatabaseTest {
 
-    default TableRow tableRow(String name, TableField... fields) {
+    static TableRow tableRow(String name, TableField... fields) {
         return new OceanBaseTableRow("mysql", name, fields);
     }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/OceanBaseOracleTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/OceanBaseOracleTestBase.java
index 31aad559c..2ceeeeb9a 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/OceanBaseOracleTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/OceanBaseOracleTestBase.java
@@ -31,7 +31,7 @@
 @ExtendWith(OceanBaseTestDatabase.class)
 public interface OceanBaseOracleTestBase extends DatabaseTest {
 
-    default TableRow tableRow(String name, TableField... fields) {
+    static TableRow tableRow(String name, TableField... fields) {
         return new OceanBaseTableRow("oracle", name, fields);
     }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/catalog/OceanBaseCatalogITCaseBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/catalog/OceanBaseCatalogITCaseBase.java
new file mode 100644
index 000000000..f16270d2e
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/catalog/OceanBaseCatalogITCaseBase.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.oceanbase.catalog;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
+import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase;
+import org.apache.flink.connector.jdbc.testutils.TableManaged;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.getBriefAuthProperties;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test base for {@link OceanBaseCatalog}.
+ */
+public abstract class OceanBaseCatalogITCaseBase implements JdbcITCaseBase, DatabaseTest {
+
+    private final String catalogName;
+    private final String compatibleMode;
+    private final String defaultDatabase;
+
+    public OceanBaseCatalogITCaseBase(
+            String catalogName, String compatibleMode, String defaultDatabase) {
+        this.catalogName = catalogName;
+        this.compatibleMode = compatibleMode;
+        this.defaultDatabase = defaultDatabase;
+    }
+
+    protected OceanBaseCatalog catalog;
+    protected TableEnvironment tEnv;
+
+    protected abstract TableRow allTypesSourceTable();
+
+    protected abstract TableRow allTypesSinkTable();
+
+    protected abstract List<Row> allTypesTableRows();
+
+    protected void before() {}
+
+    protected void after() {}
+
+    @BeforeEach
+    void setup() {
+        before();
+
+        try (Connection conn = getMetadata().getConnection()) {
+            allTypesSourceTable().insertIntoTableValues(conn, allTypesTableRows());
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+
+        catalog =
+                new OceanBaseCatalog(
+                        Thread.currentThread().getContextClassLoader(),
+                        catalogName,
+                        compatibleMode,
+                        defaultDatabase,
+                        getMetadata()
+                                .getJdbcUrl()
+                                .substring(0, getMetadata().getJdbcUrl().lastIndexOf("/")),
+                        getBriefAuthProperties(
+                                getMetadata().getUsername(), getMetadata().getPassword()));
+
+        tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        tEnv.registerCatalog(catalogName, catalog);
+        tEnv.useCatalog(catalogName);
+    }
+
+    @AfterEach
+    void cleanup() {
+        after();
+    }
+
+    @Test
+    void testGetDb_DatabaseNotExistException() {
+        String databaseNotExist = "nonexistent";
+        assertThatThrownBy(() -> catalog.getDatabase(databaseNotExist))
+                .satisfies(
+                        anyCauseMatches(
+                                DatabaseNotExistException.class,
+                                String.format(
+                                        "Database %s does not exist in Catalog",
+                                        databaseNotExist)));
+    }
+
+    @Test
+    void testDbExists() {
+        String databaseNotExist = "nonexistent";
+        assertThat(catalog.databaseExists(databaseNotExist)).isFalse();
+        assertThat(catalog.databaseExists(defaultDatabase)).isTrue();
+    }
+
+    // ------ tables ------
+
+    @Test
+    void testListTables() throws DatabaseNotExistException {
+        List<String> actual = catalog.listTables(defaultDatabase);
+        assertThat(actual)
+                .containsAll(
+                        getManagedTables().stream()
+                                .map(TableManaged::getTableName)
+                                .collect(Collectors.toList()));
+    }
+
+    @Test
+    void testListTables_DatabaseNotExistException() {
+        String anyDatabase = "anyDatabase";
+        assertThatThrownBy(() -> catalog.listTables(anyDatabase))
+                .satisfies(
+                        anyCauseMatches(
+                                DatabaseNotExistException.class,
+                                String.format(
+                                        "Database %s does not exist in Catalog", anyDatabase)));
+    }
+
+    @Test
+    void testTableExists() {
+        String tableNotExist = "nonexist";
+        assertThat(catalog.tableExists(new ObjectPath(defaultDatabase, tableNotExist))).isFalse();
+        assertThat(
+                        catalog.tableExists(
+                                new ObjectPath(
+                                        defaultDatabase, allTypesSourceTable().getTableName())))
+                .isTrue();
+    }
+
+    @Test
+    void testGetTables_TableNotExistException() {
+        String anyTableNotExist = "anyTable";
+        assertThatThrownBy(
+                        () -> catalog.getTable(new ObjectPath(defaultDatabase, anyTableNotExist)))
+                .satisfies(
+                        anyCauseMatches(
+                                TableNotExistException.class,
+                                String.format(
+                                        "Table (or view) %s.%s does not exist in Catalog",
+                                        defaultDatabase, anyTableNotExist)));
+    }
+
+    @Test
+    void testGetTables_TableNotExistException_NoDb() {
+        String databaseNotExist = "nonexistdb";
+        String tableNotExist = "anyTable";
+        assertThatThrownBy(() -> catalog.getTable(new ObjectPath(databaseNotExist, tableNotExist)))
+                .satisfies(
+                        anyCauseMatches(
+                                TableNotExistException.class,
+                                String.format(
+                                        "Table (or view) %s.%s does not exist in Catalog",
+                                        databaseNotExist, tableNotExist)));
+    }
+
+    @Test
+    void testGetTable() throws TableNotExistException {
+        CatalogBaseTable table =
+                catalog.getTable(
+                        new ObjectPath(defaultDatabase, allTypesSourceTable().getTableName()));
+        assertThat(table.getUnresolvedSchema().getColumns())
+                .isEqualTo(allTypesSourceTable().getTableSchema().getColumns());
+    }
+
+    // ------ test select query. ------
+
+    @Test
+    void testWithoutCatalogDB() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(
+                                        String.format(
+                                                "select * from %s",
+                                                allTypesSourceTable().getTableName()))
+                                .execute()
+                                .collect());
+
+        assertThat(results).isEqualTo(allTypesTableRows());
+    }
+
+    @Test
+    void testWithoutCatalog() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(
+                                        String.format(
+                                                "select * from `%s`.`%s`",
+                                                defaultDatabase,
+                                                allTypesSourceTable().getTableName()))
+                                .execute()
+                                .collect());
+        assertThat(results).isEqualTo(allTypesTableRows());
+    }
+
+    @Test
+    void testFullPath() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(
+                                        String.format(
+                                                "select * from %s.%s.`%s`",
+                                                catalogName,
+                                                defaultDatabase,
+                                                allTypesSourceTable().getTableName()))
+                                .execute()
+                                .collect());
+        assertThat(results).isEqualTo(allTypesTableRows());
+    }
+
+    @Test
+    void testSelectToInsert() throws Exception {
+        String sql =
+                String.format(
+                        "insert into `%s` select * from `%s`",
+                        allTypesSinkTable().getTableName(), allTypesSourceTable().getTableName());
+        tEnv.executeSql(sql).await();
+
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(
+                                        String.format(
+                                                "select * from %s",
+                                                allTypesSinkTable().getTableName()))
+                                .execute()
+                                .collect());
+        assertThat(results).isEqualTo(allTypesTableRows());
+    }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/catalog/OceanBaseMysqlCatalogITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/catalog/OceanBaseMysqlCatalogITCase.java
new file mode 100644
index 000000000..e2397c0cb
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/catalog/OceanBaseMysqlCatalogITCase.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.oceanbase.catalog;
+
+import org.apache.flink.connector.jdbc.databases.oceanbase.OceanBaseMysqlTestBase;
+import org.apache.flink.connector.jdbc.testutils.TableManaged;
+import org.apache.flink.connector.jdbc.testutils.tables.TableBuilder;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Time;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.connector.jdbc.databases.oceanbase.OceanBaseMysqlTestBase.tableRow;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** E2E test for {@link OceanBaseCatalog} with OceanBase MySql mode. */
+public class OceanBaseMysqlCatalogITCase extends OceanBaseCatalogITCaseBase
+        implements OceanBaseMysqlTestBase {
+
+    private static final String DEFAULT_DB = "test";
+    private static final String TEST_DB = "catalog_test";
+
+    private static final TableRow TABLE_ALL_TYPES = createTableAllTypeTable("t_all_types");
+    private static final TableRow TABLE_ALL_TYPES_SINK =
+            createTableAllTypeTable("t_all_types_sink");
+    private static final TableRow TABLE_GROUPED_BY_SINK = createGroupedTable("t_grouped_by_sink");
+    private static final TableRow TABLE_PK = createGroupedTable("t_pk");
+    private static final TableRow TABLE_PK2 =
+            TableBuilder.tableRow(
+                    "t_pk",
+                    pkField(
+                            "pid",
+                            dbType("int(11) NOT NULL AUTO_INCREMENT"),
+                            DataTypes.BIGINT().notNull()),
+                    field("col_varchar", dbType("varchar(255)"), DataTypes.BIGINT()));
+
+    private static final List<Row> TABLE_ALL_TYPES_ROWS =
+            Arrays.asList(
+                    Row.ofKind(
+                            RowKind.INSERT,
+                            1L,
+                            -1L,
+                            new BigDecimal(1),
+                            null,
+                            true,
+                            null,
+                            "hello",
+                            Date.valueOf("2021-08-04").toLocalDate(),
+                            2024,
+                            LocalDateTime.parse("2021-08-04T01:54:16"),
+                            new BigDecimal(-1),
+                            new BigDecimal(1),
+                            -1.0d,
+                            1.0d,
+                            "enum2",
+                            -9.1f,
+                            9.1f,
+                            -1,
+                            1L,
+                            -1,
+                            1L,
+                            null,
+                            "col_longtext",
+                            null,
+                            -1,
+                            1,
+                            "col_mediumtext",
+                            new BigDecimal(-99),
+                            new BigDecimal(99),
+                            -1.0d,
+                            1.0d,
+                            "set_ele1",
+                            Short.parseShort("-1"),
+                            1,
+                            "col_text",
+                            Time.valueOf("10:32:34").toLocalTime(),
+                            LocalDateTime.parse("2021-08-04T01:54:16"),
+                            "col_tinytext",
+                            Byte.parseByte("-1"),
+                            Short.parseShort("1"),
+                            null,
+                            "col_varchar",
+                            LocalDateTime.parse("2021-08-04T01:54:16.463"),
+                            Time.valueOf("09:33:43").toLocalTime(),
+                            LocalDateTime.parse("2021-08-04T01:54:16.463"),
+                            null),
+                    Row.ofKind(
+                            RowKind.INSERT,
+                            2L,
+                            -1L,
+                            new BigDecimal(1),
+                            null,
+                            true,
+                            null,
+                            "hello",
+                            Date.valueOf("2021-08-04").toLocalDate(),
+                            2024,
+                            LocalDateTime.parse("2021-08-04T01:53:19"),
+                            new BigDecimal(-1),
+                            new BigDecimal(1),
+                            -1.0d,
+                            1.0d,
+                            "enum2",
+                            -9.1f,
+                            9.1f,
+                            -1,
+                            1L,
+                            -1,
+                            1L,
+                            null,
+                            "col_longtext",
+                            null,
+                            -1,
+                            1,
+                            "col_mediumtext",
+                            new BigDecimal(-99),
+                            new BigDecimal(99),
+                            -1.0d,
+                            1.0d,
+                            "set_ele1,set_ele12",
+                            Short.parseShort("-1"),
+                            1,
+                            "col_text",
+                            Time.valueOf("10:32:34").toLocalTime(),
+                            LocalDateTime.parse("2021-08-04T01:53:19"),
+                            "col_tinytext",
+                            Byte.parseByte("-1"),
+                            Short.parseShort("1"),
+                            null,
+                            "col_varchar",
+                            LocalDateTime.parse("2021-08-04T01:53:19.098"),
+                            Time.valueOf("09:33:43").toLocalTime(),
+                            LocalDateTime.parse("2021-08-04T01:53:19.098"),
+                            null));
+
+    private static TableRow createTableAllTypeTable(String tableName) {
+        return tableRow(
+                tableName,
+                pkField(
+                        "pid",
+                        dbType("bigint(20) NOT NULL AUTO_INCREMENT"),
+                        DataTypes.BIGINT().notNull()),
+                field("col_bigint", dbType("bigint(20)"), DataTypes.BIGINT()),
+                field(
+                        "col_bigint_unsigned",
+                        dbType("bigint(20) unsigned"),
+                        DataTypes.DECIMAL(20, 0)),
+                field("col_binary", dbType("binary(100)"), DataTypes.BYTES()),
+                field("col_bit", dbType("bit(1)"), DataTypes.BOOLEAN()),
+                field("col_blob", dbType("blob"), DataTypes.BYTES()),
+                field("col_char", dbType("char(10)"), DataTypes.CHAR(10)),
+                field("col_date", dbType("date"), DataTypes.DATE()),
+                field("col_year", dbType("year"), DataTypes.INT()),
+                field(
+                        "col_datetime",
+                        dbType("datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"),
+                        DataTypes.TIMESTAMP(0)),
+                field("col_decimal", dbType("decimal(10,0)"), DataTypes.DECIMAL(10, 0)),
+                field(
+                        "col_decimal_unsigned",
+                        dbType("decimal(10,0) unsigned"),
+                        DataTypes.DECIMAL(11, 0)),
+                field("col_double", dbType("double"), DataTypes.DOUBLE()),
+                field("col_double_unsigned", dbType("double unsigned"), DataTypes.DOUBLE()),
+                field("col_enum", dbType("enum('enum1','enum2','enum11')"), DataTypes.VARCHAR(6)),
+                field("col_float", dbType("float"), DataTypes.FLOAT()),
+                field("col_float_unsigned", dbType("float unsigned"), DataTypes.FLOAT()),
+                field("col_int", dbType("int(11)"), DataTypes.INT()),
+                field("col_int_unsigned", dbType("int(10) unsigned"), DataTypes.BIGINT()),
+                field("col_integer", dbType("int(11)"), DataTypes.INT()),
+                field("col_integer_unsigned", dbType("int(10) unsigned"), DataTypes.BIGINT()),
+                field("col_longblob", dbType("longblob"), DataTypes.BYTES()),
+                field(
+                        "col_longtext",
+                        dbType("longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_bin"),
+                        DataTypes.STRING()),
+                field("col_mediumblob", dbType("mediumblob"), DataTypes.BYTES()),
+                field("col_mediumint", dbType("mediumint(9)"), DataTypes.INT()),
+                field("col_mediumint_unsigned", dbType("mediumint(8) unsigned"), DataTypes.INT()),
+                field("col_mediumtext", dbType("mediumtext"), DataTypes.VARCHAR(16777215)),
+                field("col_numeric", dbType("decimal(10,0)"), DataTypes.DECIMAL(10, 0)),
+                field(
+                        "col_numeric_unsigned",
+                        dbType("decimal(10,0) unsigned"),
+                        DataTypes.DECIMAL(11, 0)),
+                field("col_real", dbType("double"), DataTypes.DOUBLE()),
+                field("col_real_unsigned", dbType("double unsigned"), DataTypes.DOUBLE()),
+                field("col_set", dbType("set('set_ele1','set_ele12')"), DataTypes.VARCHAR(18)),
+                field("col_smallint", dbType("smallint(6)"), DataTypes.SMALLINT()),
+                field("col_smallint_unsigned", dbType("smallint(5) unsigned"), DataTypes.INT()),
+                field("col_text", dbType("text"), DataTypes.VARCHAR(65535)),
+                field("col_time", dbType("time"), DataTypes.TIME(0)),
+                field(
+                        "col_timestamp",
+                        dbType(
+                                "timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"),
+                        DataTypes.TIMESTAMP(0)),
+                field("col_tinytext", dbType("tinytext"), DataTypes.VARCHAR(255)),
+                field("col_tinyint", dbType("tinyint"), DataTypes.TINYINT()),
+                field(
+                        "col_tinyint_unsigned",
+                        dbType("tinyint(255) unsigned"),
+                        DataTypes.SMALLINT()),
+                field("col_tinyblob", dbType("tinyblob"), DataTypes.BYTES()),
+                field("col_varchar", dbType("varchar(255)"), DataTypes.VARCHAR(255)),
+                field(
+                        "col_datetime_p3",
+                        dbType(
+                                "datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3)"),
+                        DataTypes.TIMESTAMP(3).notNull()),
+                field("col_time_p3", dbType("time(3)"), DataTypes.TIME(3)),
+                field(
+                        "col_timestamp_p3",
+                        dbType(
+                                "timestamp(3) NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3)"),
+                        DataTypes.TIMESTAMP(3)),
+                field("col_varbinary", dbType("varbinary(255)"), DataTypes.BYTES()));
+    }
+
+    private static TableRow createGroupedTable(String tableName) {
+        return tableRow(
+                tableName,
+                pkField(
+                        "pid",
+                        dbType("bigint(20) NOT NULL AUTO_INCREMENT"),
+                        DataTypes.BIGINT().notNull()),
+                field("col_bigint", dbType("bigint(20)"), DataTypes.BIGINT()));
+    }
+
+    public OceanBaseMysqlCatalogITCase() {
+        super("oceanbase_mysql_catalog", "mysql", DEFAULT_DB);
+    }
+
+    @Override
+    protected TableRow allTypesSourceTable() {
+        return TABLE_ALL_TYPES;
+    }
+
+    @Override
+    protected TableRow allTypesSinkTable() {
+        return TABLE_ALL_TYPES_SINK;
+    }
+
+    @Override
+    protected List<Row> allTypesTableRows() {
+        return TABLE_ALL_TYPES_ROWS;
+    }
+
+    @Override
+    public List<TableManaged> getManagedTables() {
+        return Arrays.asList(
+                TABLE_ALL_TYPES, TABLE_ALL_TYPES_SINK, TABLE_GROUPED_BY_SINK, TABLE_PK);
+    }
+
+    @Override
+    protected void before() {
+        try (Connection conn = getMetadata().getConnection();
+                Statement st = conn.createStatement()) {
+            st.execute(String.format("CREATE DATABASE `%s` CHARSET=utf8", TEST_DB));
+            st.execute(String.format("use `%s`", TEST_DB));
+            st.execute(TABLE_PK2.getCreateQuery());
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected void after() {
+        try (Connection conn = getMetadata().getConnection();
+                Statement st = conn.createStatement()) {
+            st.execute(String.format("DROP DATABASE IF EXISTS `%s`", TEST_DB));
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    void testListDatabases() {
+        List<String> actual = catalog.listDatabases();
+        assertThat(actual).containsExactlyInAnyOrder(DEFAULT_DB, TEST_DB);
+    }
+
+    @Test
+    void testGetTablePrimaryKey() throws TableNotExistException {
+        // test the PK of test.t_user
+        Schema tableSchemaTestPK1 = TABLE_PK.getTableSchema();
+        CatalogBaseTable tablePK1 =
+                catalog.getTable(new ObjectPath(TEST_DB, TABLE_PK.getTableName()));
+        assertThat(tableSchemaTestPK1.getPrimaryKey())
+                .isEqualTo(tablePK1.getUnresolvedSchema().getPrimaryKey());
+
+        // test the PK of TEST_DB.t_user
+        Schema tableSchemaTestPK2 = TABLE_PK2.getTableSchema();
+        CatalogBaseTable tablePK2 =
+                catalog.getTable(new ObjectPath(TEST_DB, TABLE_PK2.getTableName()));
+        assertThat(tableSchemaTestPK2.getPrimaryKey())
+                .isEqualTo(tablePK2.getUnresolvedSchema().getPrimaryKey());
+    }
+
+    @Test
+    void testSelectField() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(
+                                        String.format(
+                                                "select pid from %s",
+                                                TABLE_ALL_TYPES.getTableName()))
+                                .execute()
+                                .collect());
+        assertThat(results)
+                .isEqualTo(
+                        Arrays.asList(
+                                Row.ofKind(RowKind.INSERT, 1L), Row.ofKind(RowKind.INSERT, 2L)));
+    }
+
+    @Test
+    void testGroupByInsert() throws Exception {
+        // Changes primary key for the next record.
+        tEnv.executeSql(
+                        String.format(
+                                "insert into `%s` select max(`pid`) `pid`, `col_bigint` from `%s` "
+                                        + "group by `col_bigint` ",
+                                TABLE_GROUPED_BY_SINK.getTableName(),
+                                TABLE_ALL_TYPES.getTableName()))
+                .await();
+
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(
+                                        String.format(
+                                                "select * from `%s`",
+                                                TABLE_GROUPED_BY_SINK.getTableName()))
+                                .execute()
+                                .collect());
+        assertThat(results)
+                .isEqualTo(Collections.singletonList(Row.ofKind(RowKind.INSERT, 2L, -1L)));
+    }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/catalog/OceanBaseOracleCatalogITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/catalog/OceanBaseOracleCatalogITCase.java
new file mode 100644
index 000000000..08a6816be
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/catalog/OceanBaseOracleCatalogITCase.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.oceanbase.catalog;
+
+import org.apache.flink.connector.jdbc.databases.oceanbase.OceanBaseOracleTestBase;
+import org.apache.flink.connector.jdbc.testutils.TableManaged;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.Disabled;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.connector.jdbc.databases.oceanbase.OceanBaseOracleTestBase.tableRow;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
+
+/** E2E test for {@link OceanBaseCatalog} with OceanBase Oracle mode. */
+@Disabled("OceanBase Oracle mode can only be tested locally.")
+public class OceanBaseOracleCatalogITCase extends OceanBaseCatalogITCaseBase
+        implements OceanBaseOracleTestBase {
+
+    private static final String SCHEMA = "SYS";
+
+    private static final TableRow TABLE_ALL_TYPES = createTableAllTypeTable("T_ALL_TYPES");
+    private static final TableRow TABLE_ALL_TYPES_SINK =
+            createTableAllTypeTable("T_ALL_TYPES_SINK");
+
+    private static final List<Row> TABLE_ALL_TYPES_ROWS =
+            Arrays.asList(
+                    Row.of(
+                            1L,
+                            BigDecimal.valueOf(100.1234),
+                            "1.12345",
+                            1.175E-10F,
+                            1.79769E+40D,
+                            "a",
+                            "abc",
+                            "abcdef",
+                            LocalDate.parse("1997-01-01"),
+                            LocalDateTime.parse("2020-01-01T15:35:00.123456"),
+                            LocalDateTime.parse("2020-01-01T15:35:00.123456789"),
+                            "Hello World"),
+                    Row.of(
+                            2L,
+                            BigDecimal.valueOf(101.1234),
+                            "1.12345",
+                            1.175E-10F,
+                            1.79769E+40D,
+                            "a",
+                            "abc",
+                            "abcdef",
+                            LocalDate.parse("1997-01-02"),
+                            LocalDateTime.parse("2020-01-01T15:36:01.123456"),
+                            LocalDateTime.parse("2020-01-01T15:36:01.123456789"),
+                            "Hey Leonard"));
+
+    private static TableRow createTableAllTypeTable(String tableName) {
+        return tableRow(
+                tableName,
+                pkField("ID", dbType("NUMBER(18,0)"), DataTypes.BIGINT().notNull()),
+                field("DECIMAL_COL", dbType("NUMBER(10,4)"), DataTypes.DECIMAL(10, 4)),
+                field("FLOAT_COL", dbType("FLOAT"), DataTypes.STRING()),
+                field("BINARY_FLOAT_COL", dbType("BINARY_FLOAT"), DataTypes.FLOAT()),
+                field("BINARY_DOUBLE_COL", dbType("BINARY_DOUBLE"), DataTypes.DOUBLE()),
+                field("CHAR_COL", dbType("CHAR"), DataTypes.CHAR(1)),
+                field("NCHAR_COL", dbType("NCHAR(3)"), DataTypes.CHAR(3)),
+                field("VARCHAR2_COL", dbType("VARCHAR2(30)"), DataTypes.VARCHAR(30)),
+                field("DATE_COL", dbType("DATE"), DataTypes.DATE()),
+                field("TIMESTAMP6_COL", dbType("TIMESTAMP"), DataTypes.TIMESTAMP(6)),
+                field("TIMESTAMP9_COL", dbType("TIMESTAMP(9)"), DataTypes.TIMESTAMP(9)),
+                field("CLOB_COL", dbType("CLOB"), DataTypes.STRING()));
+    }
+
+    public OceanBaseOracleCatalogITCase() {
+        super("oceanbase_oracle_catalog", "oracle", SCHEMA);
+    }
+
+    @Override
+    protected TableRow allTypesSourceTable() {
+        return TABLE_ALL_TYPES;
+    }
+
+    @Override
+    protected TableRow allTypesSinkTable() {
+        return TABLE_ALL_TYPES_SINK;
+    }
+
+    @Override
+    protected List<Row> allTypesTableRows() {
+        return TABLE_ALL_TYPES_ROWS;
+    }
+
+    @Override
+    public List<TableManaged> getManagedTables() {
+        return Arrays.asList(TABLE_ALL_TYPES, TABLE_ALL_TYPES_SINK);
+    }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseMySqlDynamicTableSinkITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseMySqlDynamicTableSinkITCase.java
index aecde26a7..c4d8f3f5a 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseMySqlDynamicTableSinkITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseMySqlDynamicTableSinkITCase.java
@@ -25,6 +25,7 @@
 import java.util.Map;
 
+import static org.apache.flink.connector.jdbc.databases.oceanbase.OceanBaseMysqlTestBase.tableRow;
 import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
 import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
 import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseMySqlDynamicTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseMySqlDynamicTableSourceITCase.java
index 5e96be837..48685a8cf 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseMySqlDynamicTableSourceITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseMySqlDynamicTableSourceITCase.java
@@ -30,6 +30,7 @@
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.flink.connector.jdbc.databases.oceanbase.OceanBaseMysqlTestBase.tableRow;
 import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
 import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
 import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseOracleDynamicTableSinkITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseOracleDynamicTableSinkITCase.java
index 7569b7bb5..b2123f826 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseOracleDynamicTableSinkITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseOracleDynamicTableSinkITCase.java
@@ -31,12 +31,13 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.connector.jdbc.databases.oceanbase.OceanBaseOracleTestBase.tableRow;
 import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
 import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
 import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
 
 /** The Table Sink ITCase for OceanBase Oracle mode. */
-@Disabled
+@Disabled("OceanBase Oracle mode can only be tested locally.")
 class OceanBaseOracleDynamicTableSinkITCase extends JdbcDynamicTableSinkITCase
         implements OceanBaseOracleTestBase {
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseOracleDynamicTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseOracleDynamicTableSourceITCase.java
index 9f94fa736..3b3c8899b 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseOracleDynamicTableSourceITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseOracleDynamicTableSourceITCase.java
@@ -32,11 +32,12 @@
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.flink.connector.jdbc.databases.oceanbase.OceanBaseOracleTestBase.tableRow;
 import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
 import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
 
 /** The Table Source ITCase for OceanBase Oracle mode. */
-@Disabled
+@Disabled("OceanBase Oracle mode can only be tested locally.")
 class OceanBaseOracleDynamicTableSourceITCase extends JdbcDynamicTableSourceITCase
         implements OceanBaseOracleTestBase {
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseContainer.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseContainer.java
index ba31718ef..a78262982 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseContainer.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseContainer.java
@@ -19,21 +19,37 @@
 package org.apache.flink.connector.jdbc.testutils.databases.oceanbase;
 
 import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.utility.DockerImageName;
 
 /** {@link JdbcDatabaseContainer} for OceanBase. */
 public class OceanBaseContainer extends JdbcDatabaseContainer<OceanBaseContainer> {
 
-    public static final Integer SQL_PORT = 2881;
+    private static final int SQL_PORT = 2881;
+    private static final String DEFAULT_PASSWORD = "";
+
+    private String password = DEFAULT_PASSWORD;
 
     public OceanBaseContainer(String dockerImageName) {
         this(DockerImageName.parse(dockerImageName));
-
-        addExposedPort(SQL_PORT);
     }
 
     public OceanBaseContainer(DockerImageName dockerImageName) {
         super(dockerImageName);
+
+        addExposedPort(SQL_PORT);
+        setWaitStrategy(Wait.forLogMessage(".*boot success!.*", 1));
+    }
+
+    @Override
+    protected void configure() {
+        if (!DEFAULT_PASSWORD.equals(password)) {
+            addEnv("OB_TENANT_PASSWORD", password);
+        }
+    }
+
+    protected void waitUntilContainerStarted() {
+        getWaitStrategy().waitUntilReady(this);
     }
 
     @Override
@@ -64,11 +80,16 @@ public String getUsername() {
 
     @Override
     public String getPassword() {
-        return "";
+        return password;
     }
 
     @Override
     protected String getTestQueryString() {
         return "SELECT 1";
     }
+
+    public OceanBaseContainer withPassword(String password) {
+        this.password = password;
+        return this;
+    }
 }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseDatabase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseDatabase.java
index 50001f653..da659ff7a 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseDatabase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseDatabase.java
@@ -25,21 +25,33 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.Transferable;
 
-import java.sql.Connection;
-import java.sql.Statement;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 
 /** OceanBase database for testing. */
 public class OceanBaseDatabase extends DatabaseExtension implements OceanBaseImages {
 
     private static final Logger LOG = LoggerFactory.getLogger(OceanBaseDatabase.class);
 
+    private static final String ZONE_OFFSET =
+            DateTimeFormatter.ofPattern("xxx")
+                    .format(ZoneId.systemDefault().getRules().getOffset(Instant.now()));
+
     private static final OceanBaseContainer CONTAINER =
             new OceanBaseContainer(OCEANBASE_CE_4)
                     .withEnv("MODE", "slim")
-                    .withEnv("FASTBOOT", "true")
-                    .withEnv("OB_DATAFILE_SIZE", "1G")
+                    .withEnv("OB_DATAFILE_SIZE", "2G")
                     .withEnv("OB_LOG_DISK_SIZE", "4G")
+                    .withPassword("123456")
+                    .withUrlParam("useSSL", "false")
+                    .withUrlParam("serverTimezone", ZONE_OFFSET)
+                    .withCopyToContainer(
+                            Transferable.of(
+                                    String.format("SET GLOBAL time_zone = '%s';", ZONE_OFFSET)),
+                            "/root/boot/init.d/init.sql")
                     .withLogConsumer(new Slf4jLogConsumer(LOG));
 
     private static OceanBaseMetadata metadata;
@@ -49,7 +61,13 @@ public static OceanBaseMetadata getMetadata() {
             throw new FlinkRuntimeException("Container is stopped.");
         }
         if (metadata == null) {
-            metadata = new OceanBaseMetadata(CONTAINER);
+            metadata =
+                    new OceanBaseMetadata(
+                            CONTAINER.getUsername(),
+                            CONTAINER.getPassword(),
+                            CONTAINER.getJdbcUrl(),
+                            CONTAINER.getDriverClassName(),
+                            CONTAINER.getDockerImageName());
         }
         return metadata;
     }
@@ -57,10 +75,6 @@ public static OceanBaseMetadata getMetadata() {
 
     @Override
     protected DatabaseMetadata startDatabase() throws Exception {
         CONTAINER.start();
-        try (Connection connection = getMetadata().getConnection();
-                Statement statement = connection.createStatement()) {
-            statement.execute("SET GLOBAL time_zone = '+00:00'");
-        }
         return getMetadata();
     }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseImages.java
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseImages.java index 70194da4d..373d8e07f 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseImages.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseImages.java @@ -21,7 +21,7 @@ /** OceanBase docker images. */ public interface OceanBaseImages { - String OCEANBASE_CE_4 = "oceanbase/oceanbase-ce:4.2.1_bp3"; + String OCEANBASE_CE_4 = "oceanbase/oceanbase-ce:4.2.1-lts"; String OCEANBASE_CE_3 = "oceanbase/oceanbase-ce:3.1.4"; } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseMetadata.java index b9df95e5f..8c1e30a3a 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseMetadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseMetadata.java @@ -31,14 +31,6 @@ public class OceanBaseMetadata implements DatabaseMetadata { private final String driver; private final String version; - public OceanBaseMetadata(OceanBaseContainer container) { - this.username = container.getUsername(); - this.password = container.getPassword(); - this.url = container.getJdbcUrl(); - this.driver = container.getDriverClassName(); - this.version = container.getDockerImageName(); - } - public OceanBaseMetadata( String username, String password, String url, String driver, String version) { this.username = username;