From a710041f0ecddc9a0c22244a3e24f0fb7c06450d Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Mon, 23 Sep 2024 14:24:56 +0800 Subject: [PATCH] [flink] Remove scan.push-down support which should be supported by Flink (#4224) --- .../flink_connector_configuration.html | 6 - .../paimon/flink/source/DataTableSource.java | 96 ++++++++ .../flink/source/table/RichTableSource.java | 53 ----- .../flink/AbstractFlinkTableFactory.java | 29 +-- .../paimon/flink/FlinkConnectorOptions.java | 9 - .../flink/source/BaseDataTableSource.java | 221 ++++++++++++++++++ .../paimon/flink/source/DataTableSource.java | 186 ++------------- .../paimon/flink/source/FlinkTableSource.java | 52 ++--- .../flink/source/SystemTableSource.java | 35 --- .../flink/source/table/BaseTableSource.java | 49 ---- .../source/table/PushedRichTableSource.java | 65 ------ .../flink/source/table/PushedTableSource.java | 65 ------ .../flink/source/table/RichTableSource.java | 76 ------ .../flink/ContinuousFileStoreITCase.java | 27 --- .../flink/source/FlinkTableSourceTest.java | 30 +-- 15 files changed, 382 insertions(+), 617 deletions(-) create mode 100644 paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java delete mode 100644 paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java delete mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/BaseTableSource.java delete mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedRichTableSource.java delete mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedTableSource.java delete mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html index e96f2f65e812..cf4c7c1da692 100644 --- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html +++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html @@ -116,12 +116,6 @@ Integer Define a custom parallelism for the scan source. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. If user enable the scan.infer-parallelism, the planner will derive the parallelism by inferred parallelism. - -
scan.push-down</h5></td>
-            <td style="word-wrap: break-word;">true</td>
-            <td>Boolean</td>
-            <td>If true, flink will push down projection, filters, limit to the source. The cost is that it is difficult to reuse the source in a job. With flink 1.18 or higher version, it is possible to reuse the source even with projection push down.</td>
-        </tr>
scan.remove-normalize
false diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java new file mode 100644 index 000000000000..f870d837019f --- /dev/null +++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +import org.apache.paimon.flink.log.LogStoreTableFactory; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.table.Table; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableFactory; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; + +/** A {@link BaseDataTableSource} for Flink 1.15. */ +public class DataTableSource extends BaseDataTableSource { + + public DataTableSource( + ObjectIdentifier tableIdentifier, + Table table, + boolean streaming, + DynamicTableFactory.Context context, + @Nullable LogStoreTableFactory logStoreTableFactory) { + this( + tableIdentifier, + table, + streaming, + context, + logStoreTableFactory, + null, + null, + null, + null); + } + + public DataTableSource( + ObjectIdentifier tableIdentifier, + Table table, + boolean streaming, + DynamicTableFactory.Context context, + @Nullable LogStoreTableFactory logStoreTableFactory, + @Nullable Predicate predicate, + @Nullable int[][] projectFields, + @Nullable Long limit, + @Nullable WatermarkStrategy watermarkStrategy) { + super( + tableIdentifier, + table, + streaming, + context, + logStoreTableFactory, + predicate, + projectFields, + limit, + watermarkStrategy); + } + + @Override + public DataTableSource copy() { + return new DataTableSource( + tableIdentifier, + table, + streaming, + context, + logStoreTableFactory, + predicate, + projectFields, + limit, + watermarkStrategy); + } + + @Override + protected List dynamicPartitionFilteringFields() { + return Collections.emptyList(); + } +} diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java deleted file mode 100644 index 50c8dc5ffca9..000000000000 --- a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.source.table; - -import org.apache.paimon.flink.source.FlinkTableSource; - -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.table.connector.source.LookupTableSource; -import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; -import org.apache.flink.table.data.RowData; - -/** The {@link BaseTableSource} with Lookup and watermark. */ -public class RichTableSource extends BaseTableSource - implements LookupTableSource, SupportsWatermarkPushDown { - - private final FlinkTableSource source; - - public RichTableSource(FlinkTableSource source) { - super(source); - this.source = source; - } - - @Override - public RichTableSource copy() { - return new RichTableSource(source.copy()); - } - - @Override - public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { - return source.getLookupRuntimeProvider(context); - } - - @Override - public void applyWatermark(WatermarkStrategy watermarkStrategy) { - source.pushWatermark(watermarkStrategy); - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java index 86a22dc07c9e..e469044f5f4f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java @@ -28,9 +28,6 @@ import org.apache.paimon.flink.sink.FlinkTableSink; import org.apache.paimon.flink.source.DataTableSource; import org.apache.paimon.flink.source.SystemTableSource; -import org.apache.paimon.flink.source.table.PushedRichTableSource; -import org.apache.paimon.flink.source.table.PushedTableSource; -import org.apache.paimon.flink.source.table.RichTableSource; import org.apache.paimon.lineage.LineageMeta; import org.apache.paimon.lineage.LineageMetaFactory; import org.apache.paimon.lineage.TableLineageEntity; @@ -83,7 +80,6 @@ import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT_FULL; import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM; import static org.apache.paimon.flink.FlinkConnectorOptions.NONE; -import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PUSH_DOWN; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; import static org.apache.paimon.flink.log.LogStoreTableFactory.discoverLogStoreFactory; @@ -100,11 +96,10 @@ public DynamicTableSource createDynamicTableSource(Context context) { context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; if (origin instanceof SystemCatalogTable) { - return new PushedTableSource( - new SystemTableSource( - 
((SystemCatalogTable) origin).table(), - isStreamingMode, - context.getObjectIdentifier())); + return new SystemTableSource( + ((SystemCatalogTable) origin).table(), + isStreamingMode, + context.getObjectIdentifier()); } else { Table table = buildPaimonTable(context); if (table instanceof FileStoreTable) { @@ -120,16 +115,12 @@ public DynamicTableSource createDynamicTableSource(Context context) { } }); } - DataTableSource source = - new DataTableSource( - context.getObjectIdentifier(), - table, - isStreamingMode, - context, - createOptionalLogStoreFactory(context).orElse(null)); - return new Options(table.options()).get(SCAN_PUSH_DOWN) - ? new PushedRichTableSource(source) - : new RichTableSource(source); + return new DataTableSource( + context.getObjectIdentifier(), + table, + isStreamingMode, + context, + createOptionalLogStoreFactory(context).orElse(null)); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index 2a8f5f4cd7cd..d181d7b5a0c6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -244,15 +244,6 @@ public class FlinkConnectorOptions { "Weight of managed memory for RocksDB in cross-partition update, Flink will compute the memory size " + "according to the weight, the actual memory used depends on the running environment."); - public static final ConfigOption SCAN_PUSH_DOWN = - ConfigOptions.key("scan.push-down") - .booleanType() - .defaultValue(true) - .withDescription( - "If true, flink will push down projection, filters, limit to the source. The cost is that it " - + "is difficult to reuse the source in a job. With flink 1.18 or higher version, it " - + "is possible to reuse the source even with projection push down."); - public static final ConfigOption SOURCE_CHECKPOINT_ALIGN_ENABLED = ConfigOptions.key("source.checkpoint-align.enabled") .booleanType() diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java new file mode 100644 index 000000000000..8775ab8f5486 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.source; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.CoreOptions.ChangelogProducer; +import org.apache.paimon.CoreOptions.LogChangelogMode; +import org.apache.paimon.CoreOptions.LogConsistency; +import org.apache.paimon.flink.FlinkConnectorOptions.WatermarkEmitStrategy; +import org.apache.paimon.flink.PaimonDataStreamScanProvider; +import org.apache.paimon.flink.log.LogSourceProvider; +import org.apache.paimon.flink.log.LogStoreTableFactory; +import org.apache.paimon.flink.lookup.FileStoreLookupFunction; +import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory; +import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.table.Table; +import org.apache.paimon.utils.Projection; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.LookupTableSource; +import org.apache.flink.table.connector.source.LookupTableSource.LookupContext; +import org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider; +import org.apache.flink.table.connector.source.ScanTableSource.ScanContext; +import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider; +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableFactory; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.List; +import java.util.stream.IntStream; + +import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER; +import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE; +import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY; +import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC; +import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC_THREAD_NUMBER; +import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_REMOVE_NORMALIZE; +import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP; +import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT; +import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL; +import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY; +import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT; + +/** + * Table source to create {@link StaticFileStoreSource} or {@link ContinuousFileStoreSource} under + * batch mode or streaming mode. 
+ */ +public abstract class BaseDataTableSource extends FlinkTableSource + implements LookupTableSource, SupportsWatermarkPushDown { + + protected final ObjectIdentifier tableIdentifier; + protected final boolean streaming; + protected final DynamicTableFactory.Context context; + @Nullable protected final LogStoreTableFactory logStoreTableFactory; + + @Nullable protected WatermarkStrategy watermarkStrategy; + + public BaseDataTableSource( + ObjectIdentifier tableIdentifier, + Table table, + boolean streaming, + DynamicTableFactory.Context context, + @Nullable LogStoreTableFactory logStoreTableFactory, + @Nullable Predicate predicate, + @Nullable int[][] projectFields, + @Nullable Long limit, + @Nullable WatermarkStrategy watermarkStrategy) { + super(table, predicate, projectFields, limit); + this.tableIdentifier = tableIdentifier; + this.streaming = streaming; + this.context = context; + this.logStoreTableFactory = logStoreTableFactory; + this.predicate = predicate; + this.projectFields = projectFields; + this.limit = limit; + this.watermarkStrategy = watermarkStrategy; + } + + @Override + public ChangelogMode getChangelogMode() { + if (!streaming) { + // batch merge all, return insert only + return ChangelogMode.insertOnly(); + } + + if (table.primaryKeys().isEmpty()) { + return ChangelogMode.insertOnly(); + } else { + Options options = Options.fromMap(table.options()); + + if (new CoreOptions(options).mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW) { + return ChangelogMode.insertOnly(); + } + + if (options.get(SCAN_REMOVE_NORMALIZE)) { + return ChangelogMode.all(); + } + + if (logStoreTableFactory == null + && options.get(CHANGELOG_PRODUCER) != ChangelogProducer.NONE) { + return ChangelogMode.all(); + } + + // optimization: transaction consistency and all changelog mode avoid the generation of + // normalized nodes. See FlinkTableSink.getChangelogMode validation. + return options.get(LOG_CONSISTENCY) == LogConsistency.TRANSACTIONAL + && options.get(LOG_CHANGELOG_MODE) == LogChangelogMode.ALL + ? 
ChangelogMode.all() + : ChangelogMode.upsert(); + } + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + LogSourceProvider logSourceProvider = null; + if (logStoreTableFactory != null) { + logSourceProvider = + logStoreTableFactory.createSourceProvider(context, scanContext, projectFields); + } + + WatermarkStrategy watermarkStrategy = this.watermarkStrategy; + Options options = Options.fromMap(table.options()); + if (watermarkStrategy != null) { + WatermarkEmitStrategy emitStrategy = options.get(SCAN_WATERMARK_EMIT_STRATEGY); + if (emitStrategy == WatermarkEmitStrategy.ON_EVENT) { + watermarkStrategy = new OnEventWatermarkStrategy(watermarkStrategy); + } + Duration idleTimeout = options.get(SCAN_WATERMARK_IDLE_TIMEOUT); + if (idleTimeout != null) { + watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout); + } + String watermarkAlignGroup = options.get(SCAN_WATERMARK_ALIGNMENT_GROUP); + if (watermarkAlignGroup != null) { + watermarkStrategy = + WatermarkAlignUtils.withWatermarkAlignment( + watermarkStrategy, + watermarkAlignGroup, + options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT), + options.get(SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL)); + } + } + + FlinkSourceBuilder sourceBuilder = + new FlinkSourceBuilder(table) + .sourceName(tableIdentifier.asSummaryString()) + .sourceBounded(!streaming) + .logSourceProvider(logSourceProvider) + .projection(projectFields) + .predicate(predicate) + .limit(limit) + .watermarkStrategy(watermarkStrategy) + .dynamicPartitionFilteringFields(dynamicPartitionFilteringFields()); + + return new PaimonDataStreamScanProvider( + !streaming, + env -> + sourceBuilder + .sourceParallelism(inferSourceParallelism(env)) + .env(env) + .build()); + } + + protected abstract List dynamicPartitionFilteringFields(); + + @Override + public void applyWatermark(WatermarkStrategy watermarkStrategy) { + this.watermarkStrategy = watermarkStrategy; + } + + @Override + public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { + if (limit != null) { + throw new RuntimeException( + "Limit push down should not happen in Lookup source, but it is " + limit); + } + int[] projection = + projectFields == null + ? 
IntStream.range(0, table.rowType().getFieldCount()).toArray() + : Projection.of(projectFields).toTopLevelIndexes(); + int[] joinKey = Projection.of(context.getKeys()).toTopLevelIndexes(); + Options options = new Options(table.options()); + boolean enableAsync = options.get(LOOKUP_ASYNC); + int asyncThreadNumber = options.get(LOOKUP_ASYNC_THREAD_NUMBER); + return LookupRuntimeProviderFactory.create( + new FileStoreLookupFunction(table, projection, joinKey, predicate), + enableAsync, + asyncThreadNumber); + } + + @Override + public String asSummaryString() { + return "Paimon-DataSource"; + } + + @Override + public boolean isStreaming() { + return streaming; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java index c4544426fdc6..200550c884d9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java @@ -18,68 +18,31 @@ package org.apache.paimon.flink.source; -import org.apache.paimon.CoreOptions; -import org.apache.paimon.CoreOptions.ChangelogProducer; -import org.apache.paimon.CoreOptions.LogChangelogMode; -import org.apache.paimon.CoreOptions.LogConsistency; -import org.apache.paimon.flink.FlinkConnectorOptions.WatermarkEmitStrategy; -import org.apache.paimon.flink.PaimonDataStreamScanProvider; -import org.apache.paimon.flink.log.LogSourceProvider; import org.apache.paimon.flink.log.LogStoreTableFactory; -import org.apache.paimon.flink.lookup.FileStoreLookupFunction; -import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory; -import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.Table; -import org.apache.paimon.utils.Projection; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.table.catalog.ObjectIdentifier; -import org.apache.flink.table.connector.ChangelogMode; -import org.apache.flink.table.connector.source.LookupTableSource.LookupContext; -import org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider; -import org.apache.flink.table.connector.source.ScanTableSource.ScanContext; -import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider; +import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering; +import org.apache.flink.table.connector.source.abilities.SupportsStatisticReport; import org.apache.flink.table.data.RowData; import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.table.plan.stats.TableStats; import javax.annotation.Nullable; -import java.time.Duration; import java.util.Collections; import java.util.List; -import java.util.stream.IntStream; -import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER; -import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE; -import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY; -import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC; -import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC_THREAD_NUMBER; -import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_REMOVE_NORMALIZE; -import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP; -import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT;
-import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
-import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
-import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
import static org.apache.paimon.utils.Preconditions.checkState;

/**
- * Table source to create {@link StaticFileStoreSource} or {@link ContinuousFileStoreSource} under
- * batch mode or change-tracking is disabled. For streaming mode with change-tracking enabled and
- * FULL scan mode, it will create a {@link
- * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@code
- * LogHybridSourceFactory.FlinkHybridFirstSource} and kafka log source created by {@link
- * LogSourceProvider}.
+ * A {@link BaseDataTableSource} that implements {@link SupportsStatisticReport} and {@link
+ * SupportsDynamicFiltering}.
 */
-public class DataTableSource extends FlinkTableSource {
-
-    private final ObjectIdentifier tableIdentifier;
-    private final boolean streaming;
-    private final DynamicTableFactory.Context context;
-    @Nullable private final LogStoreTableFactory logStoreTableFactory;
-
-    @Nullable private WatermarkStrategy<RowData> watermarkStrategy;
+public class DataTableSource extends BaseDataTableSource
+        implements SupportsStatisticReport, SupportsDynamicFiltering {

    @Nullable private List<String> dynamicPartitionFilteringFields;

@@ -113,102 +76,19 @@ public DataTableSource(
            @Nullable Long limit,
            @Nullable WatermarkStrategy<RowData> watermarkStrategy,
            @Nullable List<String> dynamicPartitionFilteringFields) {
-        super(table, predicate, projectFields, limit);
-        this.tableIdentifier = tableIdentifier;
-        this.streaming = streaming;
-        this.context = context;
-        this.logStoreTableFactory = logStoreTableFactory;
-        this.predicate = predicate;
-        this.projectFields = projectFields;
-        this.limit = limit;
-        this.watermarkStrategy = watermarkStrategy;
+        super(
+                tableIdentifier,
+                table,
+                streaming,
+                context,
+                logStoreTableFactory,
+                predicate,
+                projectFields,
+                limit,
+                watermarkStrategy);
        this.dynamicPartitionFilteringFields = dynamicPartitionFilteringFields;
    }

-    @Override
-    public ChangelogMode getChangelogMode() {
-        if (!streaming) {
-            // batch merge all, return insert only
-            return ChangelogMode.insertOnly();
-        }
-
-        if (table.primaryKeys().isEmpty()) {
-            return ChangelogMode.insertOnly();
-        } else {
-            Options options = Options.fromMap(table.options());
-
-            if (new CoreOptions(options).mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW) {
-                return ChangelogMode.insertOnly();
-            }
-
-            if (options.get(SCAN_REMOVE_NORMALIZE)) {
-                return ChangelogMode.all();
-            }
-
-            if (logStoreTableFactory == null
-                    && options.get(CHANGELOG_PRODUCER) != ChangelogProducer.NONE) {
-                return ChangelogMode.all();
-            }
-
-            // optimization: transaction consistency and all changelog mode avoid the generation of
-            // normalized nodes. See FlinkTableSink.getChangelogMode validation.
-            return options.get(LOG_CONSISTENCY) == LogConsistency.TRANSACTIONAL
-                            && options.get(LOG_CHANGELOG_MODE) == LogChangelogMode.ALL
-                    ?
ChangelogMode.all() - : ChangelogMode.upsert(); - } - } - - @Override - public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { - LogSourceProvider logSourceProvider = null; - if (logStoreTableFactory != null) { - logSourceProvider = - logStoreTableFactory.createSourceProvider(context, scanContext, projectFields); - } - - WatermarkStrategy watermarkStrategy = this.watermarkStrategy; - Options options = Options.fromMap(table.options()); - if (watermarkStrategy != null) { - WatermarkEmitStrategy emitStrategy = options.get(SCAN_WATERMARK_EMIT_STRATEGY); - if (emitStrategy == WatermarkEmitStrategy.ON_EVENT) { - watermarkStrategy = new OnEventWatermarkStrategy(watermarkStrategy); - } - Duration idleTimeout = options.get(SCAN_WATERMARK_IDLE_TIMEOUT); - if (idleTimeout != null) { - watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout); - } - String watermarkAlignGroup = options.get(SCAN_WATERMARK_ALIGNMENT_GROUP); - if (watermarkAlignGroup != null) { - watermarkStrategy = - WatermarkAlignUtils.withWatermarkAlignment( - watermarkStrategy, - watermarkAlignGroup, - options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT), - options.get(SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL)); - } - } - - FlinkSourceBuilder sourceBuilder = - new FlinkSourceBuilder(table) - .sourceName(tableIdentifier.asSummaryString()) - .sourceBounded(!streaming) - .logSourceProvider(logSourceProvider) - .projection(projectFields) - .predicate(predicate) - .limit(limit) - .watermarkStrategy(watermarkStrategy) - .dynamicPartitionFilteringFields(dynamicPartitionFilteringFields); - - return new PaimonDataStreamScanProvider( - !streaming, - env -> - sourceBuilder - .sourceParallelism(inferSourceParallelism(env)) - .env(env) - .build()); - } - @Override public DataTableSource copy() { return new DataTableSource( @@ -224,31 +104,6 @@ public DataTableSource copy() { dynamicPartitionFilteringFields); } - @Override - public void pushWatermark(WatermarkStrategy watermarkStrategy) { - this.watermarkStrategy = watermarkStrategy; - } - - @Override - public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { - if (limit != null) { - throw new RuntimeException( - "Limit push down should not happen in Lookup source, but it is " + limit); - } - int[] projection = - projectFields == null - ? 
IntStream.range(0, table.rowType().getFieldCount()).toArray() - : Projection.of(projectFields).toTopLevelIndexes(); - int[] joinKey = Projection.of(context.getKeys()).toTopLevelIndexes(); - Options options = new Options(table.options()); - boolean enableAsync = options.get(LOOKUP_ASYNC); - int asyncThreadNumber = options.get(LOOKUP_ASYNC_THREAD_NUMBER); - return LookupRuntimeProviderFactory.create( - new FileStoreLookupFunction(table, projection, joinKey, predicate), - enableAsync, - asyncThreadNumber); - } - @Override public TableStats reportStatistics() { if (streaming) { @@ -259,11 +114,6 @@ public TableStats reportStatistics() { return new TableStats(splitStatistics.totalRowCount()); } - @Override - public String asSummaryString() { - return "Paimon-DataSource"; - } - @Override public List listAcceptedFilterFields() { // note that streaming query doesn't support dynamic filtering @@ -286,7 +136,7 @@ public void applyDynamicFiltering(List candidateFilterFields) { } @Override - public boolean isStreaming() { - return streaming; + protected List dynamicPartitionFilteringFields() { + return dynamicPartitionFilteringFields; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java index 7254eefaa435..920a1ba140ed 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java @@ -30,18 +30,13 @@ import org.apache.paimon.table.Table; import org.apache.paimon.table.source.Split; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.connector.ChangelogMode; -import org.apache.flink.table.connector.source.LookupTableSource.LookupContext; -import org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider; import org.apache.flink.table.connector.source.ScanTableSource; -import org.apache.flink.table.connector.source.ScanTableSource.ScanContext; -import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider; -import org.apache.flink.table.data.RowData; +import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; +import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; +import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.expressions.ResolvedExpression; -import org.apache.flink.table.plan.stats.TableStats; import org.apache.flink.table.types.logical.RowType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +50,11 @@ import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX; /** A Flink {@link ScanTableSource} for paimon. */ -public abstract class FlinkTableSource { +public abstract class FlinkTableSource + implements ScanTableSource, + SupportsFilterPushDown, + SupportsProjectionPushDown, + SupportsLimitPushDown { private static final Logger LOG = LoggerFactory.getLogger(FlinkTableSource.class); @@ -85,8 +84,8 @@ public FlinkTableSource( this.limit = limit; } - /** @return The unconsumed filters. 
*/
-    public List<ResolvedExpression> pushFilters(List<ResolvedExpression> filters) {
+    @Override
+    public Result applyFilters(List<ResolvedExpression> filters) {
        List<String> partitionKeys = table.partitionKeys();
        RowType rowType = LogicalTypeConversion.toLogicalType(table.rowType());

@@ -115,35 +114,24 @@ public List<ResolvedExpression> pushFilters(List<ResolvedExpression> filters) {
        predicate = converted.isEmpty() ? null : PredicateBuilder.and(converted);

        LOG.info("Consumed filters: {} of {}", consumedFilters, filters);

-        return unConsumedFilters;
+        return Result.of(filters, unConsumedFilters);
    }

-    public void pushProjection(int[][] projectedFields) {
+    @Override
+    public boolean supportsNestedProjection() {
+        return false;
+    }
+
+    @Override
+    public void applyProjection(int[][] projectedFields) {
        this.projectFields = projectedFields;
    }

-    public void pushLimit(long limit) {
+    @Override
+    public void applyLimit(long limit) {
        this.limit = limit;
    }

-    public abstract ChangelogMode getChangelogMode();
-
-    public abstract ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);
-
-    public abstract void pushWatermark(WatermarkStrategy<RowData> watermarkStrategy);
-
-    public abstract LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
-
-    public abstract TableStats reportStatistics();
-
-    public abstract FlinkTableSource copy();
-
-    public abstract String asSummaryString();
-
-    public abstract List<String> listAcceptedFilterFields();
-
-    public abstract void applyDynamicFiltering(List<String> candidateFilterFields);
-
    public abstract boolean isStreaming();

    @Nullable
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index 49ed0c0b8368..e914c617fffb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -32,17 +32,12 @@
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
 import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.plan.stats.TableStats;

 import javax.annotation.Nullable;

-import java.util.Collections;
-import java.util.List;
-
/** A {@link FlinkTableSource} for system table. 
*/ public class SystemTableSource extends FlinkTableSource { @@ -127,36 +122,6 @@ public String asSummaryString() { return "Paimon-SystemTable-Source"; } - @Override - public void pushWatermark(WatermarkStrategy watermarkStrategy) { - throw new UnsupportedOperationException(); - } - - @Override - public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider( - LookupTableSource.LookupContext context) { - throw new UnsupportedOperationException(); - } - - @Override - public TableStats reportStatistics() { - throw new UnsupportedOperationException(); - } - - @Override - public List listAcceptedFilterFields() { - // system table doesn't support dynamic filtering - return Collections.emptyList(); - } - - @Override - public void applyDynamicFiltering(List candidateFilterFields) { - throw new UnsupportedOperationException( - String.format( - "Cannot apply dynamic filtering to Paimon system table '%s'.", - table.name())); - } - @Override public boolean isStreaming() { return isStreamingMode; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/BaseTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/BaseTableSource.java deleted file mode 100644 index 57c4b01a2876..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/BaseTableSource.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.source.table; - -import org.apache.paimon.flink.source.FlinkTableSource; - -import org.apache.flink.table.connector.ChangelogMode; -import org.apache.flink.table.connector.source.ScanTableSource; - -/** The {@link ScanTableSource} with push down interfaces. 
*/ -public abstract class BaseTableSource implements ScanTableSource { - - private final FlinkTableSource source; - - public BaseTableSource(FlinkTableSource source) { - this.source = source; - } - - @Override - public ChangelogMode getChangelogMode() { - return source.getChangelogMode(); - } - - @Override - public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { - return source.getScanRuntimeProvider(runtimeProviderContext); - } - - @Override - public String asSummaryString() { - return source.asSummaryString(); - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedRichTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedRichTableSource.java deleted file mode 100644 index 7e55b83f6b8b..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedRichTableSource.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.source.table; - -import org.apache.paimon.flink.source.FlinkTableSource; - -import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; -import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; -import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; -import org.apache.flink.table.expressions.ResolvedExpression; - -import java.util.List; - -/** A {@link RichTableSource} with push down. 
*/ -public class PushedRichTableSource extends RichTableSource - implements SupportsFilterPushDown, SupportsProjectionPushDown, SupportsLimitPushDown { - - private final FlinkTableSource source; - - public PushedRichTableSource(FlinkTableSource source) { - super(source); - this.source = source; - } - - @Override - public PushedRichTableSource copy() { - return new PushedRichTableSource(source.copy()); - } - - @Override - public Result applyFilters(List filters) { - return Result.of(filters, source.pushFilters(filters)); - } - - @Override - public void applyLimit(long limit) { - source.pushLimit(limit); - } - - @Override - public boolean supportsNestedProjection() { - return false; - } - - @Override - public void applyProjection(int[][] projectedFields) { - source.pushProjection(projectedFields); - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedTableSource.java deleted file mode 100644 index a1389b5bfa56..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedTableSource.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.source.table; - -import org.apache.paimon.flink.source.FlinkTableSource; - -import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; -import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; -import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; -import org.apache.flink.table.expressions.ResolvedExpression; - -import java.util.List; - -/** The {@link BaseTableSource} with push down. 
*/ -public class PushedTableSource extends BaseTableSource - implements SupportsFilterPushDown, SupportsProjectionPushDown, SupportsLimitPushDown { - - private final FlinkTableSource source; - - public PushedTableSource(FlinkTableSource source) { - super(source); - this.source = source; - } - - @Override - public PushedTableSource copy() { - return new PushedTableSource(source.copy()); - } - - @Override - public Result applyFilters(List filters) { - return Result.of(filters, source.pushFilters(filters)); - } - - @Override - public void applyLimit(long limit) { - source.pushLimit(limit); - } - - @Override - public boolean supportsNestedProjection() { - return false; - } - - @Override - public void applyProjection(int[][] projectedFields) { - source.pushProjection(projectedFields); - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java deleted file mode 100644 index 4bf0c169b71c..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.source.table; - -import org.apache.paimon.flink.source.FlinkTableSource; - -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.table.connector.source.LookupTableSource; -import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering; -import org.apache.flink.table.connector.source.abilities.SupportsStatisticReport; -import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.plan.stats.TableStats; - -import java.util.List; - -/** The {@link BaseTableSource} with lookup, watermark, statistic and dynamic filtering. 
*/ -public class RichTableSource extends BaseTableSource - implements LookupTableSource, - SupportsWatermarkPushDown, - SupportsStatisticReport, - SupportsDynamicFiltering { - - private final FlinkTableSource source; - - public RichTableSource(FlinkTableSource source) { - super(source); - this.source = source; - } - - @Override - public RichTableSource copy() { - return new RichTableSource(source.copy()); - } - - @Override - public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { - return source.getLookupRuntimeProvider(context); - } - - @Override - public void applyWatermark(WatermarkStrategy watermarkStrategy) { - source.pushWatermark(watermarkStrategy); - } - - @Override - public TableStats reportStatistics() { - return source.reportStatistics(); - } - - @Override - public List listAcceptedFilterFields() { - return source.listAcceptedFilterFields(); - } - - @Override - public void applyDynamicFiltering(List candidateFilterFields) { - source.applyDynamicFiltering(candidateFilterFields); - } -} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java index a0ce8be3af7f..cf97f7b67d4d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java @@ -57,33 +57,6 @@ protected List ddl() { + " WITH ('changelog-producer'='input', 'bucket' = '1')"); } - @Test - public void testSourceReuseWithoutScanPushDown() { - sEnv.executeSql("CREATE TEMPORARY TABLE print1 (a STRING) WITH ('connector'='print')"); - sEnv.executeSql("CREATE TEMPORARY TABLE print2 (b STRING) WITH ('connector'='print')"); - - StatementSet statementSet = sEnv.createStatementSet(); - statementSet.addInsertSql( - "INSERT INTO print1 SELECT a FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */"); - statementSet.addInsertSql( - "INSERT INTO print2 SELECT b FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */"); - assertThat(statementSet.compilePlan().explain()).contains("Reused"); - - statementSet = sEnv.createStatementSet(); - statementSet.addInsertSql( - "INSERT INTO print1 SELECT a FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */ WHERE b = 'Apache'"); - statementSet.addInsertSql( - "INSERT INTO print2 SELECT b FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */ WHERE a = 'Paimon'"); - assertThat(statementSet.compilePlan().explain()).contains("Reused"); - - statementSet = sEnv.createStatementSet(); - statementSet.addInsertSql( - "INSERT INTO print1 SELECT a FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */ WHERE b = 'Apache' LIMIT 5"); - statementSet.addInsertSql( - "INSERT INTO print2 SELECT b FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */ WHERE a = 'Paimon' LIMIT 10"); - assertThat(statementSet.compilePlan().explain()).contains("Reused"); - } - @Test public void testSourceReuseWithScanPushDown() { // source can be reused with projection applied diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkTableSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkTableSourceTest.java index 95f5721c4651..cff9ab6f4d25 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkTableSourceTest.java +++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkTableSourceTest.java @@ -52,13 +52,14 @@ public void testApplyFilterNonPartitionTable() throws Exception { Schema schema = Schema.newBuilder().column("col1", DataTypes.INT()).build(); TableSchema tableSchema = new SchemaManager(fileIO, tablePath).createTable(schema); Table table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema); - FlinkTableSource tableSource = + DataTableSource tableSource = new DataTableSource( ObjectIdentifier.of("catalog1", "db1", "T"), table, false, null, null); // col1 = 1 List filters = ImmutableList.of(col1Equal1()); - Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters); + Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters()) + .isEqualTo(filters); } @Test @@ -81,54 +82,57 @@ public void testApplyPartitionTable() throws Exception { // col1 = 1 && p1 = 1 => [p1 = 1] List filters = ImmutableList.of(col1Equal1(), p1Equal1()); - Assertions.assertThat(tableSource.pushFilters(filters)) + Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters()) .isEqualTo(ImmutableList.of(filters.get(0))); // col1 = 1 && p2 like '%a' => None filters = ImmutableList.of(col1Equal1(), p2Like("%a")); - Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters); + Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters()) + .isEqualTo(filters); // col1 = 1 && p2 like 'a%' => [p2 like 'a%'] filters = ImmutableList.of(col1Equal1(), p2Like("a%")); - Assertions.assertThat(tableSource.pushFilters(filters)) + Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters()) .isEqualTo(ImmutableList.of(filters.get(0))); // rand(42) > 0.1 => None filters = ImmutableList.of(rand()); - Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters); + Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters()) + .isEqualTo(filters); // upper(p1) = "A" => [upper(p1) = "A"] filters = ImmutableList.of(upperP2EqualA()); - Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters); + Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters()) + .isEqualTo(filters); // col1 = 1 && (p2 like 'a%' or p1 = 1) => [p2 like 'a%' or p1 = 1] filters = ImmutableList.of(col1Equal1(), or(p2Like("a%"), p1Equal1())); - Assertions.assertThat(tableSource.pushFilters(filters)) + Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters()) .isEqualTo(ImmutableList.of(filters.get(0))); // col1 = 1 && (p2 like '%a' or p1 = 1) => None filters = ImmutableList.of(col1Equal1(), or(p2Like("%a"), p1Equal1())); - Assertions.assertThat(tableSource.pushFilters(filters)) + Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters()) .containsExactlyInAnyOrder(filters.toArray(new ResolvedExpression[0])); // col1 = 1 && (p2 like 'a%' && p1 = 1) => [p2 like 'a%' && p1 = 1] filters = ImmutableList.of(col1Equal1(), and(p2Like("a%"), p1Equal1())); - Assertions.assertThat(tableSource.pushFilters(filters)) + Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters()) .isEqualTo(ImmutableList.of(filters.get(0))); // col1 = 1 && (p2 like '%a' && p1 = 1) => None filters = ImmutableList.of(col1Equal1(), and(p2Like("%a"), p1Equal1())); - Assertions.assertThat(tableSource.pushFilters(filters)) + Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters()) 
.containsExactlyInAnyOrder(filters.toArray(new ResolvedExpression[0])); // p2 like 'a%' && (col1 = 1 or p1 = 1) => [col1 = 1 or p1 = 1] filters = ImmutableList.of(p2Like("a%"), or(col1Equal1(), p1Equal1())); - Assertions.assertThat(tableSource.pushFilters(filters)) + Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters()) .isEqualTo(ImmutableList.of(filters.get(1))); // p2 like 'a%' && (col1 = 1 && p1 = 1) => [col1 = 1 && p1 = 1] filters = ImmutableList.of(p2Like("a%"), and(col1Equal1(), p1Equal1())); - Assertions.assertThat(tableSource.pushFilters(filters)) + Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters()) .isEqualTo(ImmutableList.of(filters.get(1))); }
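
As a quick illustration of what this consolidation means for users: the planner now
negotiates projection, filter, and limit push-down directly with the Paimon source,
since FlinkTableSource itself implements SupportsFilterPushDown,
SupportsProjectionPushDown, and SupportsLimitPushDown. Below is a minimal sketch of
a query that exercises all three abilities; the warehouse path, table name, and
column names are illustrative assumptions, not part of this patch.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PushDownExample {
    public static void main(String[] args) {
        // Batch mode: getChangelogMode() returns insert-only for batch scans.
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inBatchMode().build());

        // Hypothetical Paimon catalog; adjust the warehouse path to your setup.
        tEnv.executeSql(
                "CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse' = 'file:///tmp/paimon')");
        tEnv.executeSql("USE CATALOG paimon");

        // Assumes a table `T` with columns (a INT, dt STRING). The planner calls
        // applyProjection for the SELECT list, applyFilters for the WHERE clause
        // (partition predicates are consumed; the rest stay as remaining filters),
        // and applyLimit for the LIMIT clause, with no wrapper table source involved.
        tEnv.executeSql("SELECT a FROM T WHERE dt = '2024-09-23' LIMIT 10").print();
    }
}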