From 5f597cb065d27b0874d63a9ef3f85010f65f1c11 Mon Sep 17 00:00:00 2001 From: Jakub Dutkowski Date: Wed, 13 Oct 2021 13:09:45 +0200 Subject: [PATCH] Use Flink 1.14.0 --- pom.xml | 12 ++- .../flink/ignite/dialect/IgniteDialect.java | 5 ++ .../table/IgniteDynamicTableFactory.java | 13 ++- .../table/IgniteDynamicTableSource.java | 22 +++-- .../flink/ignite/FlinkConnectorIgniteIT.java | 10 ++- .../ignite/StreamTableEnvironmentUtil.java | 88 ++----------------- 6 files changed, 44 insertions(+), 106 deletions(-) diff --git a/pom.xml b/pom.xml index fa17b0f..f5e3b34 100644 --- a/pom.xml +++ b/pom.xml @@ -36,7 +36,7 @@ under the License. 11 - 1.11.2 + 1.14.0 2.12.1 2.11 2.10.0 @@ -128,7 +128,7 @@ under the License. org.apache.flink - flink-table-planner-blink_${scala.binary.version} + flink-table-planner_${scala.binary.version} ${flink.version} test @@ -154,6 +154,14 @@ under the License. test + + com.h2database + h2 + + 1.4.197 + test + + diff --git a/src/main/java/pl/touk/flink/ignite/dialect/IgniteDialect.java b/src/main/java/pl/touk/flink/ignite/dialect/IgniteDialect.java index e232753..b8c9aba 100644 --- a/src/main/java/pl/touk/flink/ignite/dialect/IgniteDialect.java +++ b/src/main/java/pl/touk/flink/ignite/dialect/IgniteDialect.java @@ -26,6 +26,11 @@ public JdbcRowConverter getRowConverter(RowType rowType) { return new IgniteRowConverter(rowType); } + @Override + public String getLimitClause(long l) { + return "LIMIT " + l; + } + @Override public Optional defaultDriverName() { return Optional.of("org.apache.ignite.IgniteJdbcThinDriver"); diff --git a/src/main/java/pl/touk/flink/ignite/table/IgniteDynamicTableFactory.java b/src/main/java/pl/touk/flink/ignite/table/IgniteDynamicTableFactory.java index 0a27d0d..d3447ce 100644 --- a/src/main/java/pl/touk/flink/ignite/table/IgniteDynamicTableFactory.java +++ b/src/main/java/pl/touk/flink/ignite/table/IgniteDynamicTableFactory.java @@ -4,7 +4,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.connector.jdbc.internal.options.JdbcOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; @@ -109,14 +109,11 @@ public DynamicTableSource createDynamicTableSource(Context context) { // validate all options helper.validate(); - JdbcOptions jdbcOptions = getJdbcOptions(config); + JdbcConnectorOptions jdbcOptions = getJdbcOptions(config); JdbcDatePartitionReadOptions readOptions = getJdbcReadOptions(config).orElse(null); - // get table schema - TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); - // table source - return new IgniteDynamicTableSource(jdbcOptions, readOptions, physicalSchema); + return new IgniteDynamicTableSource(jdbcOptions, readOptions, context.getCatalogTable().getResolvedSchema()); } @@ -125,9 +122,9 @@ public DynamicTableSink createDynamicTableSink(Context context) { throw new NotImplementedException("Ignite dynamic sink not implemented yet"); } - private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) { + private JdbcConnectorOptions getJdbcOptions(ReadableConfig readableConfig) { final String url = readableConfig.get(URL); - final JdbcOptions.Builder builder = JdbcOptions.builder() + final JdbcConnectorOptions.Builder builder = JdbcConnectorOptions.builder() .setDriverName(DRIVER_NAME) .setDBUrl(url) .setTableName(readableConfig.get(TABLE_NAME)) diff --git a/src/main/java/pl/touk/flink/ignite/table/IgniteDynamicTableSource.java b/src/main/java/pl/touk/flink/ignite/table/IgniteDynamicTableSource.java index 5c8ae7b..2b5c405 100644 --- a/src/main/java/pl/touk/flink/ignite/table/IgniteDynamicTableSource.java +++ b/src/main/java/pl/touk/flink/ignite/table/IgniteDynamicTableSource.java @@ -1,15 +1,14 @@ package pl.touk.flink.ignite.table; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.internal.options.JdbcOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions; import org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat; -import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.InputFormatProvider; import org.apache.flink.table.connector.source.ScanTableSource; -import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import java.time.LocalDate; @@ -17,11 +16,11 @@ public class IgniteDynamicTableSource implements ScanTableSource { - private final JdbcOptions options; + private final JdbcConnectorOptions options; private final JdbcDatePartitionReadOptions readOptions; - private final TableSchema tableSchema; + private final ResolvedSchema tableSchema; - public IgniteDynamicTableSource(JdbcOptions options, JdbcDatePartitionReadOptions readOptions, TableSchema tableSchema) { + public IgniteDynamicTableSource(JdbcConnectorOptions options, JdbcDatePartitionReadOptions readOptions, ResolvedSchema tableSchema) { this.options = options; this.readOptions = readOptions; this.tableSchema = tableSchema; @@ -39,9 +38,10 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon final JdbcDialect dialect = options.getDialect(); String query = dialect.getSelectFromStatement( - options.getTableName(), tableSchema.getFieldNames(), new String[0]); + options.getTableName(), tableSchema.getColumnNames().toArray(new String[0]), new String[0]); - final RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType(); + DataType rowDataType = tableSchema.toPhysicalRowDataType(); + final RowType rowType = (RowType) rowDataType.getLogicalType(); final JdbcRowDataInputFormat.Builder builder = JdbcRowDataInputFormat.builder() .setDrivername(options.getDriverName()) @@ -50,8 +50,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon .setPassword(options.getPassword().orElse(null)) .setQuery(query) .setRowConverter(dialect.getRowConverter(rowType)) - .setRowDataTypeInfo((TypeInformation) runtimeProviderContext - .createTypeInformation(tableSchema.toRowDataType())); + .setRowDataTypeInfo(runtimeProviderContext.createTypeInformation(rowDataType)); if (readOptions != null) { LocalDate lowerBound = readOptions.getPartitionLowerBound(); @@ -66,7 +65,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon } builder.setQuery(query); - return InputFormatProvider.of(builder.build()); } diff --git a/src/test/java/pl/touk/flink/ignite/FlinkConnectorIgniteIT.java b/src/test/java/pl/touk/flink/ignite/FlinkConnectorIgniteIT.java index 8393829..c348c3d 100644 --- a/src/test/java/pl/touk/flink/ignite/FlinkConnectorIgniteIT.java +++ b/src/test/java/pl/touk/flink/ignite/FlinkConnectorIgniteIT.java @@ -1,9 +1,13 @@ package pl.touk.flink.ignite; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; import org.apache.ignite.Ignite; @@ -42,10 +46,8 @@ public class FlinkConnectorIgniteIT { @BeforeAll static void setUp() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - tableEnv = StreamTableEnvironmentUtil.create( - env, EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build() - ); + tableEnv = StreamTableEnvironmentUtil.create(); + ignitePort = PortFinder.getAvailablePort(); File igniteWorkDir = Files.createTempDirectory("igniteSpec").toFile(); diff --git a/src/test/java/pl/touk/flink/ignite/StreamTableEnvironmentUtil.java b/src/test/java/pl/touk/flink/ignite/StreamTableEnvironmentUtil.java index 8e40812..b7cf074 100644 --- a/src/test/java/pl/touk/flink/ignite/StreamTableEnvironmentUtil.java +++ b/src/test/java/pl/touk/flink/ignite/StreamTableEnvironmentUtil.java @@ -1,95 +1,23 @@ package pl.touk.flink.ignite; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; -import org.apache.flink.table.catalog.CatalogManager; -import org.apache.flink.table.catalog.FunctionCatalog; -import org.apache.flink.table.catalog.GenericInMemoryCatalog; -import org.apache.flink.table.delegation.Executor; -import org.apache.flink.table.delegation.ExecutorFactory; -import org.apache.flink.table.delegation.Planner; -import org.apache.flink.table.delegation.PlannerFactory; -import org.apache.flink.table.factories.ComponentFactoryService; -import org.apache.flink.table.module.ModuleManager; -import java.lang.reflect.Method; -import java.util.Map; - -/* - Workaround for StreamTableEnvironmentImpl.create check : - if (!settings.isStreamingMode()) { - throw new TableException( - "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment."); - } - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Conversion-of-Table-Blink-batch-to-DataStream-tc34080.html#a34090 - */ public class StreamTableEnvironmentUtil { - public static StreamTableEnvironment create( - StreamExecutionEnvironment executionEnvironment, - EnvironmentSettings settings) { - - // temporary solution until FLINK-15635 is fixed - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - - ModuleManager moduleManager = new ModuleManager(); - - CatalogManager catalogManager = CatalogManager.newBuilder() - .classLoader(classLoader) - .config(new Configuration()) - .defaultCatalog( - settings.getBuiltInCatalogName(), - new GenericInMemoryCatalog( - settings.getBuiltInCatalogName(), - settings.getBuiltInDatabaseName())) - .executionConfig(executionEnvironment.getConfig()) - .build(); - + public static StreamTableEnvironment create() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); TableConfig tableConfig = new TableConfig(); - FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager); - - Map executorProperties = settings.toExecutorProperties(); - Executor executor = lookupExecutor(executorProperties, executionEnvironment); - - Map plannerProperties = settings.toPlannerProperties(); - Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) - .create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager); - - return new StreamTableEnvironmentImpl( - catalogManager, - moduleManager, - functionCatalog, - tableConfig, - executionEnvironment, - planner, - executor, - settings.isStreamingMode(), - classLoader - ); - } - - private static Executor lookupExecutor( - Map executorProperties, - StreamExecutionEnvironment executionEnvironment) { - try { - ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties); - Method createMethod = executorFactory.getClass() - .getMethod("create", Map.class, StreamExecutionEnvironment.class); + tableConfig.getConfiguration().set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH); + EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); - return (Executor) createMethod.invoke( - executorFactory, - executorProperties, - executionEnvironment); - } catch (Exception e) { - throw new TableException( - "Could not instantiate the executor. Make sure a planner module is on the classpath", - e); - } + return StreamTableEnvironmentImpl.create(env, settings, tableConfig); } }