[flink] Remove scan.push-down support which should be supported by Flink
JingsongLi committed Sep 19, 2024
1 parent 626b320 commit 933ba7d
Showing 11 changed files with 58 additions and 376 deletions.
@@ -116,12 +116,6 @@
<td>Integer</td>
<td>Define a custom parallelism for the scan source. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. If user enable the scan.infer-parallelism, the planner will derive the parallelism by inferred parallelism.</td>
</tr>
<tr>
<td><h5>scan.push-down</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>If true, flink will push down projection, filters, limit to the source. The cost is that it is difficult to reuse the source in a job. With flink 1.18 or higher version, it is possible to reuse the source even with projection push down.</td>
</tr>
<tr>
<td><h5>scan.remove-normalize</h5></td>
<td style="word-wrap: break-word;">false</td>
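For context, the table row removed above documented a per-table option that users could also toggle at query time. A minimal sketch of how the option was typically set before this commit, assuming a Flink Table API program; the table and column names below are made up for illustration, and after this commit the key is no longer defined by the connector.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ScanPushDownHintExample {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Dynamic table options hint (may require table.dynamic-table-options.enabled
        // on older Flink versions). Disabling push-down made the Paimon source easier
        // to reuse within one job, at the price of reading more data.
        env.executeSql(
                "SELECT a, b FROM t /*+ OPTIONS('scan.push-down' = 'false') */ WHERE a > 10");
    }
}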
@@ -28,9 +28,6 @@
import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.flink.source.DataTableSource;
import org.apache.paimon.flink.source.SystemTableSource;
import org.apache.paimon.flink.source.table.PushedRichTableSource;
import org.apache.paimon.flink.source.table.PushedTableSource;
import org.apache.paimon.flink.source.table.RichTableSource;
import org.apache.paimon.lineage.LineageMeta;
import org.apache.paimon.lineage.LineageMetaFactory;
import org.apache.paimon.lineage.TableLineageEntity;
@@ -83,7 +80,6 @@
import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT_FULL;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
import static org.apache.paimon.flink.FlinkConnectorOptions.NONE;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PUSH_DOWN;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.flink.log.LogStoreTableFactory.discoverLogStoreFactory;

@@ -100,11 +96,10 @@ public DynamicTableSource createDynamicTableSource(Context context) {
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
if (origin instanceof SystemCatalogTable) {
return new PushedTableSource(
new SystemTableSource(
((SystemCatalogTable) origin).table(),
isStreamingMode,
context.getObjectIdentifier()));
return new SystemTableSource(
((SystemCatalogTable) origin).table(),
isStreamingMode,
context.getObjectIdentifier());
} else {
Table table = buildPaimonTable(context);
if (table instanceof FileStoreTable) {
@@ -120,16 +115,12 @@ public DynamicTableSource createDynamicTableSource(Context context) {
}
});
}
DataTableSource source =
new DataTableSource(
context.getObjectIdentifier(),
table,
isStreamingMode,
context,
createOptionalLogStoreFactory(context).orElse(null));
return new Options(table.options()).get(SCAN_PUSH_DOWN)
? new PushedRichTableSource(source)
: new RichTableSource(source);
return new DataTableSource(
context.getObjectIdentifier(),
table,
isStreamingMode,
context,
createOptionalLogStoreFactory(context).orElse(null));
}
}

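With the wrappers gone, the factory hands Flink an unwrapped source, so sharing one physical source between several consumers in a job is decided entirely by Flink's planner. A hedged illustration using a standard Flink optimizer option, not part of this commit; per the removed description, Flink 1.18+ can reuse a source even when projection push-down has been applied.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SourceReuseConfigExample {
    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Flink's own optimizer switch for source reuse; shown only to indicate where
        // the trade-off described by the removed scan.push-down option now lives.
        env.getConfig().set("table.optimizer.reuse-source-enabled", "true");
    }
}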
@@ -244,15 +244,6 @@ public class FlinkConnectorOptions {
"Weight of managed memory for RocksDB in cross-partition update, Flink will compute the memory size "
+ "according to the weight, the actual memory used depends on the running environment.");

public static final ConfigOption<Boolean> SCAN_PUSH_DOWN =
ConfigOptions.key("scan.push-down")
.booleanType()
.defaultValue(true)
.withDescription(
"If true, flink will push down projection, filters, limit to the source. The cost is that it "
+ "is difficult to reuse the source in a job. With flink 1.18 or higher version, it "
+ "is possible to reuse the source even with projection push down.");

public static final ConfigOption<Boolean> SOURCE_CHECKPOINT_ALIGN_ENABLED =
ConfigOptions.key("source.checkpoint-align.enabled")
.booleanType()
@@ -36,10 +36,14 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.LookupTableSource.LookupContext;
import org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
import org.apache.flink.table.connector.source.abilities.SupportsStatisticReport;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.plan.stats.TableStats;
@@ -66,13 +70,13 @@

/**
* Table source to create {@link StaticFileStoreSource} or {@link ContinuousFileStoreSource} under
* batch mode or change-tracking is disabled. For streaming mode with change-tracking enabled and
* FULL scan mode, it will create a {@link
* org.apache.flink.connector.base.source.hybrid.HybridSource} of {@code
* LogHybridSourceFactory.FlinkHybridFirstSource} and kafka log source created by {@link
* LogSourceProvider}.
* batch mode or streaming mode.
*/
public class DataTableSource extends FlinkTableSource {
public class DataTableSource extends FlinkTableSource
implements LookupTableSource,
SupportsWatermarkPushDown,
SupportsStatisticReport,
SupportsDynamicFiltering {

private final ObjectIdentifier tableIdentifier;
private final boolean streaming;
@@ -225,7 +229,7 @@ public DataTableSource copy() {
}

@Override
public void pushWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
this.watermarkStrategy = watermarkStrategy;
}

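DataTableSource now declares the optional Flink ability interfaces itself (these were previously provided through the now-deleted wrapper sources), and the watermark hook is renamed to match SupportsWatermarkPushDown. A minimal, hedged sketch of that contract in isolation; this is an illustrative class, not the actual Paimon source.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;

// The planner calls applyWatermark(...) during optimization; the source is expected
// to attach the strategy to the runtime provider it later builds.
class WatermarkAwareSourceSketch implements SupportsWatermarkPushDown {
    private WatermarkStrategy<RowData> watermarkStrategy;

    @Override
    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }
}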
@@ -30,18 +30,13 @@
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.LookupTableSource.LookupContext;
import org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +50,11 @@
import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;

/** A Flink {@link ScanTableSource} for paimon. */
public abstract class FlinkTableSource {
public abstract class FlinkTableSource
implements ScanTableSource,
SupportsFilterPushDown,
SupportsProjectionPushDown,
SupportsLimitPushDown {

private static final Logger LOG = LoggerFactory.getLogger(FlinkTableSource.class);

@@ -85,8 +84,8 @@ public FlinkTableSource(
this.limit = limit;
}

/** @return The unconsumed filters. */
public List<ResolvedExpression> pushFilters(List<ResolvedExpression> filters) {
@Override
public Result applyFilters(List<ResolvedExpression> filters) {
List<String> partitionKeys = table.partitionKeys();
RowType rowType = LogicalTypeConversion.toLogicalType(table.rowType());

@@ -115,35 +114,24 @@ public List<ResolvedExpression> pushFilters(List<ResolvedExpression> filters) {
predicate = converted.isEmpty() ? null : PredicateBuilder.and(converted);
LOG.info("Consumed filters: {} of {}", consumedFilters, filters);

return unConsumedFilters;
return Result.of(filters, unConsumedFilters);
}

public void pushProjection(int[][] projectedFields) {
@Override
public boolean supportsNestedProjection() {
return false;
}

@Override
public void applyProjection(int[][] projectedFields) {
this.projectFields = projectedFields;
}

public void pushLimit(long limit) {
@Override
public void applyLimit(long limit) {
this.limit = limit;
}

public abstract ChangelogMode getChangelogMode();

public abstract ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);

public abstract void pushWatermark(WatermarkStrategy<RowData> watermarkStrategy);

public abstract LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);

public abstract TableStats reportStatistics();

public abstract FlinkTableSource copy();

public abstract String asSummaryString();

public abstract List<String> listAcceptedFilterFields();

public abstract void applyDynamicFiltering(List<String> candidateFilterFields);

public abstract boolean isStreaming();

@Nullable
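With FlinkTableSource implementing Flink's push-down interfaces directly, the filter hook follows Flink's contract: applyFilters returns Result.of(acceptedFilters, remainingFilters), and the planner re-applies the remaining filters after the source, so passing every filter as accepted while reporting the unconsumed ones as remaining (as the change above does) keeps results correct even when the source filters only best-effort. A stripped-down sketch of that contract; illustrative only, not the Paimon implementation, and canPushDown below is a hypothetical placeholder.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

class FilterPushDownSketch implements SupportsFilterPushDown {
    private final List<ResolvedExpression> pushedFilters = new ArrayList<>();

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        List<ResolvedExpression> remaining = new ArrayList<>();
        for (ResolvedExpression filter : filters) {
            if (canPushDown(filter)) {
                pushedFilters.add(filter); // evaluated by the source, best-effort
            } else {
                remaining.add(filter); // planner keeps a filter operator for these
            }
        }
        return Result.of(filters, remaining);
    }

    private boolean canPushDown(ResolvedExpression filter) {
        // Placeholder: a real source converts the expression to its own predicate type
        // and only treats it as pushed if the conversion succeeds.
        return false;
    }
}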
@@ -32,17 +32,12 @@
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.plan.stats.TableStats;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;

/** A {@link FlinkTableSource} for system table. */
public class SystemTableSource extends FlinkTableSource {

@@ -127,36 +122,6 @@ public String asSummaryString() {
return "Paimon-SystemTable-Source";
}

@Override
public void pushWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
throw new UnsupportedOperationException();
}

@Override
public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(
LookupTableSource.LookupContext context) {
throw new UnsupportedOperationException();
}

@Override
public TableStats reportStatistics() {
throw new UnsupportedOperationException();
}

@Override
public List<String> listAcceptedFilterFields() {
// system table doesn't support dynamic filtering
return Collections.emptyList();
}

@Override
public void applyDynamicFiltering(List<String> candidateFilterFields) {
throw new UnsupportedOperationException(
String.format(
"Cannot apply dynamic filtering to Paimon system table '%s'.",
table.name()));
}

@Override
public boolean isStreaming() {
return isStreamingMode;
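The overrides deleted above existed only because the old abstract FlinkTableSource forced every subclass to answer for every ability; with the abilities now expressed as separate interfaces, SystemTableSource simply does not implement the ones it cannot support. A simplified sketch of why that is sufficient, assuming nothing beyond public Flink APIs; this is not actual planner code, which lives in Flink's optimizer rules.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;

final class AbilityProbeSketch {
    // Abilities are discovered with instanceof checks, so a source that omits an
    // interface is silently skipped; no UnsupportedOperationException stubs needed.
    static void tryPushWatermark(DynamicTableSource source, WatermarkStrategy<RowData> strategy) {
        if (source instanceof SupportsWatermarkPushDown) {
            ((SupportsWatermarkPushDown) source).applyWatermark(strategy);
        }
    }
}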

This file was deleted.

This file was deleted.
