[flink] Remove scan.push-down support which should be supported by Flink (#4224)
JingsongLi authored Sep 23, 2024
1 parent 350990a commit a710041
Showing 15 changed files with 382 additions and 617 deletions.
@@ -116,12 +116,6 @@
<td>Integer</td>
<td>Define a custom parallelism for the scan source. By default, if this option is not defined, the planner derives the parallelism for each statement individually, also taking the global configuration into account. If the user enables scan.infer-parallelism, the planner derives the parallelism from the inferred parallelism.</td>
</tr>
<tr>
<td><h5>scan.push-down</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>If true, Flink will push down projection, filters, and limit to the source. The cost is that it is difficult to reuse the source in a job. With Flink 1.18 or higher, it is possible to reuse the source even with projection push-down.</td>
</tr>
<tr>
<td><h5>scan.remove-normalize</h5></td>
<td style="word-wrap: break-word;">false</td>
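Both options documented above are ordinary table options, so before this change 'scan.push-down' (like 'scan.parallelism') could also be overridden per query through Flink's dynamic table option hints. The sketch below is illustrative only: the catalog 'paimon' and table 'orders' are assumptions, and 'scan.push-down' no longer exists after this commit.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ScanOptionsHintDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        // Only needed on older Flink versions where OPTIONS hints are disabled by default.
        tEnv.getConfig().getConfiguration().setString("table.dynamic-table-options.enabled", "true");
        // Assumes a Paimon catalog named 'paimon' that contains a table 'orders'.
        tEnv.executeSql("USE CATALOG paimon");
        // Per-query overrides via dynamic table options; 'scan.push-down' = 'false' was the
        // pre-commit way to keep the source reusable at the cost of push-down.
        tEnv.executeSql(
                        "SELECT id, amount FROM orders "
                                + "/*+ OPTIONS('scan.parallelism' = '4', 'scan.push-down' = 'false') */ "
                                + "WHERE amount > 10")
                .print();
    }
}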
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.source;

import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.Table;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;

/** A {@link BaseDataTableSource} for Flink 1.15. */
public class DataTableSource extends BaseDataTableSource {

    public DataTableSource(
            ObjectIdentifier tableIdentifier,
            Table table,
            boolean streaming,
            DynamicTableFactory.Context context,
            @Nullable LogStoreTableFactory logStoreTableFactory) {
        this(
                tableIdentifier,
                table,
                streaming,
                context,
                logStoreTableFactory,
                null,
                null,
                null,
                null);
    }

    public DataTableSource(
            ObjectIdentifier tableIdentifier,
            Table table,
            boolean streaming,
            DynamicTableFactory.Context context,
            @Nullable LogStoreTableFactory logStoreTableFactory,
            @Nullable Predicate predicate,
            @Nullable int[][] projectFields,
            @Nullable Long limit,
            @Nullable WatermarkStrategy<RowData> watermarkStrategy) {
        super(
                tableIdentifier,
                table,
                streaming,
                context,
                logStoreTableFactory,
                predicate,
                projectFields,
                limit,
                watermarkStrategy);
    }

    @Override
    public DataTableSource copy() {
        return new DataTableSource(
                tableIdentifier,
                table,
                streaming,
                context,
                logStoreTableFactory,
                predicate,
                projectFields,
                limit,
                watermarkStrategy);
    }

    @Override
    protected List<String> dynamicPartitionFilteringFields() {
        return Collections.emptyList();
    }
}

This file was deleted.

@@ -28,9 +28,6 @@
import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.flink.source.DataTableSource;
import org.apache.paimon.flink.source.SystemTableSource;
import org.apache.paimon.flink.source.table.PushedRichTableSource;
import org.apache.paimon.flink.source.table.PushedTableSource;
import org.apache.paimon.flink.source.table.RichTableSource;
import org.apache.paimon.lineage.LineageMeta;
import org.apache.paimon.lineage.LineageMetaFactory;
import org.apache.paimon.lineage.TableLineageEntity;
@@ -83,7 +80,6 @@
import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT_FULL;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
import static org.apache.paimon.flink.FlinkConnectorOptions.NONE;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PUSH_DOWN;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.flink.log.LogStoreTableFactory.discoverLogStoreFactory;

@@ -100,11 +96,10 @@ public DynamicTableSource createDynamicTableSource(Context context) {
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
if (origin instanceof SystemCatalogTable) {
return new PushedTableSource(
new SystemTableSource(
((SystemCatalogTable) origin).table(),
isStreamingMode,
context.getObjectIdentifier()));
return new SystemTableSource(
((SystemCatalogTable) origin).table(),
isStreamingMode,
context.getObjectIdentifier());
} else {
Table table = buildPaimonTable(context);
if (table instanceof FileStoreTable) {
@@ -120,16 +115,12 @@ public DynamicTableSource createDynamicTableSource(Context context) {
}
});
}
DataTableSource source =
new DataTableSource(
context.getObjectIdentifier(),
table,
isStreamingMode,
context,
createOptionalLogStoreFactory(context).orElse(null));
return new Options(table.options()).get(SCAN_PUSH_DOWN)
? new PushedRichTableSource(source)
: new RichTableSource(source);
return new DataTableSource(
context.getObjectIdentifier(),
table,
isStreamingMode,
context,
createOptionalLogStoreFactory(context).orElse(null));
}
}

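With the branch on SCAN_PUSH_DOWN gone, the factory always hands the planner a plain DataTableSource and leaves push-down and source-reuse decisions to Flink itself. A rough way to observe the effect, sketched below with an assumed 'paimon' catalog, an 'orders' table, and 'sink_ids'/'sink_amounts' sinks that are not part of this commit, is to explain a statement set that scans the same table twice with different projections; on Flink 1.18+ the plan should reuse a single scan.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class SourceReuseExplainDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        // Assumes a Paimon catalog 'paimon' with a table 'orders' and two existing sinks.
        tEnv.executeSql("USE CATALOG paimon");
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO sink_ids SELECT id FROM orders");
        set.addInsertSql("INSERT INTO sink_amounts SELECT amount FROM orders");
        // On Flink 1.18+ the explained plan should show a single scan of 'orders' being
        // reused, even though the two branches project different columns.
        System.out.println(set.explain());
    }
}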
@@ -244,15 +244,6 @@ public class FlinkConnectorOptions {
"Weight of managed memory for RocksDB in cross-partition update, Flink will compute the memory size "
+ "according to the weight, the actual memory used depends on the running environment.");

public static final ConfigOption<Boolean> SCAN_PUSH_DOWN =
ConfigOptions.key("scan.push-down")
.booleanType()
.defaultValue(true)
.withDescription(
"If true, flink will push down projection, filters, limit to the source. The cost is that it "
+ "is difficult to reuse the source in a job. With flink 1.18 or higher version, it "
+ "is possible to reuse the source even with projection push down.");

public static final ConfigOption<Boolean> SOURCE_CHECKPOINT_ALIGN_ENABLED =
ConfigOptions.key("source.checkpoint-align.enabled")
.booleanType()
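For reference, the removed SCAN_PUSH_DOWN option followed the standard Flink ConfigOption pattern: declared once here, then resolved against the table's option map in the factory (see the AbstractFlinkTableFactory change above). The snippet below re-creates that lookup in isolation using only Flink's configuration API; the class name and the re-declared option are hypothetical and exist only for this illustration.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

import java.util.HashMap;
import java.util.Map;

public class ConfigOptionReadDemo {
    // Hypothetical re-declaration of the removed option, for illustration only.
    static final ConfigOption<Boolean> SCAN_PUSH_DOWN =
            ConfigOptions.key("scan.push-down")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("If true, Flink will push down projection, filters, and limit.");

    public static void main(String[] args) {
        // Table options as they would arrive from the catalog.
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("scan.push-down", "false");
        // Missing keys fall back to the option's default value (true).
        boolean pushDown = Configuration.fromMap(tableOptions).get(SCAN_PUSH_DOWN);
        System.out.println("push-down enabled: " + pushDown); // prints: push-down enabled: false
    }
}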
