Skip to content

Commit

Permalink
Merge pull request #9 from TouK/bump_to_flink_1_14
Browse files Browse the repository at this point in the history
Bump to flink 1 14
  • Loading branch information
jedrz authored Nov 30, 2023
2 parents ddc49c6 + 828501f commit 477d3eb
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 106 deletions.
12 changes: 10 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ under the License.

<properties>
<java.version>11</java.version>
<flink.version>1.11.2</flink.version>
<flink.version>1.14.6</flink.version>
<log4j.version>2.12.1</log4j.version>
<scala.binary.version>2.11</scala.binary.version>
<ignite.version>2.10.0</ignite.version>
Expand Down Expand Up @@ -128,7 +128,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand All @@ -154,6 +154,14 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<!-- Ignite needs this version, as per https://stackoverflow.com/a/68774254/3132741 -->
<version>1.4.197</version>
<scope>test</scope>
</dependency>

</dependencies>

<distributionManagement>
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/pl/touk/flink/ignite/dialect/IgniteDialect.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ public JdbcRowConverter getRowConverter(RowType rowType) {
return new IgniteRowConverter(rowType);
}

@Override
public String getLimitClause(long l) {
return "LIMIT " + l;
}

@Override
public Optional<String> defaultDriverName() {
return Optional.of("org.apache.ignite.IgniteJdbcThinDriver");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
Expand Down Expand Up @@ -109,14 +109,11 @@ public DynamicTableSource createDynamicTableSource(Context context) {
// validate all options
helper.validate();

JdbcOptions jdbcOptions = getJdbcOptions(config);
JdbcConnectorOptions jdbcOptions = getJdbcOptions(config);
JdbcDatePartitionReadOptions readOptions = getJdbcReadOptions(config).orElse(null);

// get table schema
TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());

// table source
return new IgniteDynamicTableSource(jdbcOptions, readOptions, physicalSchema);
return new IgniteDynamicTableSource(jdbcOptions, readOptions, context.getCatalogTable().getResolvedSchema());

}

Expand All @@ -125,9 +122,9 @@ public DynamicTableSink createDynamicTableSink(Context context) {
throw new NotImplementedException("Ignite dynamic sink not implemented yet");
}

private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
private JdbcConnectorOptions getJdbcOptions(ReadableConfig readableConfig) {
final String url = readableConfig.get(URL);
final JdbcOptions.Builder builder = JdbcOptions.builder()
final JdbcConnectorOptions.Builder builder = JdbcConnectorOptions.builder()
.setDriverName(DRIVER_NAME)
.setDBUrl(url)
.setTableName(readableConfig.get(TABLE_NAME))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
package pl.touk.flink.ignite.table;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
import org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import java.time.LocalDate;
import java.time.ZoneId;

public class IgniteDynamicTableSource implements ScanTableSource {

private final JdbcOptions options;
private final JdbcConnectorOptions options;
private final JdbcDatePartitionReadOptions readOptions;
private final TableSchema tableSchema;
private final ResolvedSchema tableSchema;

public IgniteDynamicTableSource(JdbcOptions options, JdbcDatePartitionReadOptions readOptions, TableSchema tableSchema) {
public IgniteDynamicTableSource(JdbcConnectorOptions options, JdbcDatePartitionReadOptions readOptions, ResolvedSchema tableSchema) {
this.options = options;
this.readOptions = readOptions;
this.tableSchema = tableSchema;
Expand All @@ -39,9 +38,10 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
final JdbcDialect dialect = options.getDialect();

String query = dialect.getSelectFromStatement(
options.getTableName(), tableSchema.getFieldNames(), new String[0]);
options.getTableName(), tableSchema.getColumnNames().toArray(new String[0]), new String[0]);

final RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
DataType rowDataType = tableSchema.toPhysicalRowDataType();
final RowType rowType = (RowType) rowDataType.getLogicalType();

final JdbcRowDataInputFormat.Builder builder = JdbcRowDataInputFormat.builder()
.setDrivername(options.getDriverName())
Expand All @@ -50,8 +50,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
.setPassword(options.getPassword().orElse(null))
.setQuery(query)
.setRowConverter(dialect.getRowConverter(rowType))
.setRowDataTypeInfo((TypeInformation<RowData>) runtimeProviderContext
.createTypeInformation(tableSchema.toRowDataType()));
.setRowDataTypeInfo(runtimeProviderContext.createTypeInformation(rowDataType));

if (readOptions != null) {
LocalDate lowerBound = readOptions.getPartitionLowerBound();
Expand All @@ -66,7 +65,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
}

builder.setQuery(query);

return InputFormatProvider.of(builder.build());

}
Expand Down
10 changes: 6 additions & 4 deletions src/test/java/pl/touk/flink/ignite/FlinkConnectorIgniteIT.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package pl.touk.flink.ignite;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.ignite.Ignite;
Expand Down Expand Up @@ -42,10 +46,8 @@ public class FlinkConnectorIgniteIT {

@BeforeAll
static void setUp() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
tableEnv = StreamTableEnvironmentUtil.create(
env, EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
);
tableEnv = StreamTableEnvironmentUtil.create();


ignitePort = PortFinder.getAvailablePort();
File igniteWorkDir = Files.createTempDirectory("igniteSpec").toFile();
Expand Down
88 changes: 8 additions & 80 deletions src/test/java/pl/touk/flink/ignite/StreamTableEnvironmentUtil.java
Original file line number Diff line number Diff line change
@@ -1,95 +1,23 @@
package pl.touk.flink.ignite;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.module.ModuleManager;

import java.lang.reflect.Method;
import java.util.Map;

/*
Workaround for StreamTableEnvironmentImpl.create check :
if (!settings.isStreamingMode()) {
throw new TableException(
"StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
}
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Conversion-of-Table-Blink-batch-to-DataStream-tc34080.html#a34090
*/
public class StreamTableEnvironmentUtil {

public static StreamTableEnvironment create(
StreamExecutionEnvironment executionEnvironment,
EnvironmentSettings settings) {

// temporary solution until FLINK-15635 is fixed
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

ModuleManager moduleManager = new ModuleManager();

CatalogManager catalogManager = CatalogManager.newBuilder()
.classLoader(classLoader)
.config(new Configuration())
.defaultCatalog(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(
settings.getBuiltInCatalogName(),
settings.getBuiltInDatabaseName()))
.executionConfig(executionEnvironment.getConfig())
.build();

public static StreamTableEnvironment create() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
TableConfig tableConfig = new TableConfig();
FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);

Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = lookupExecutor(executorProperties, executionEnvironment);

Map<String, String> plannerProperties = settings.toPlannerProperties();
Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
.create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);

return new StreamTableEnvironmentImpl(
catalogManager,
moduleManager,
functionCatalog,
tableConfig,
executionEnvironment,
planner,
executor,
settings.isStreamingMode(),
classLoader
);
}

private static Executor lookupExecutor(
Map<String, String> executorProperties,
StreamExecutionEnvironment executionEnvironment) {
try {
ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
Method createMethod = executorFactory.getClass()
.getMethod("create", Map.class, StreamExecutionEnvironment.class);
tableConfig.getConfiguration().set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();

return (Executor) createMethod.invoke(
executorFactory,
executorProperties,
executionEnvironment);
} catch (Exception e) {
throw new TableException(
"Could not instantiate the executor. Make sure a planner module is on the classpath",
e);
}
return StreamTableEnvironmentImpl.create(env, settings, tableConfig);
}

}

0 comments on commit 477d3eb

Please sign in to comment.