From efe53f304274dc4a6cea3c52b6a18e1b3e48bce4 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Wed, 12 Feb 2025 10:32:54 +0800 Subject: [PATCH] [Improve] add partial limit push down (#553) --- .../doris/flink/cfg/DorisReadOptions.java | 27 +++++++++++++++---- .../source/reader/DorisFlightValueReader.java | 5 ++++ .../flink/source/reader/DorisValueReader.java | 6 +++++ .../flink/table/DorisDynamicTableSource.java | 10 ++++++- 4 files changed, 42 insertions(+), 6 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java index 22a77b83f..1889ace5e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java @@ -39,6 +39,8 @@ public class DorisReadOptions implements Serializable { private boolean useOldApi; private boolean useFlightSql; private Integer flightSqlPort; + // for flink sql limit push down + private Long rowLimit; public DorisReadOptions( String readFields, @@ -54,7 +56,8 @@ public DorisReadOptions( Boolean deserializeArrowAsync, boolean useOldApi, boolean useFlightSql, - Integer flightSqlPort) { + Integer flightSqlPort, + Long rowLimit) { this.readFields = readFields; this.filterQuery = filterQuery; this.requestTabletSize = requestTabletSize; @@ -69,6 +72,7 @@ public DorisReadOptions( this.useOldApi = useOldApi; this.useFlightSql = useFlightSql; this.flightSqlPort = flightSqlPort; + this.rowLimit = rowLimit; } public String getReadFields() { @@ -135,6 +139,14 @@ public Integer getFlightSqlPort() { return flightSqlPort; } + public Long getRowLimit() { + return rowLimit; + } + + public void setRowLimit(Long rowLimit) { + this.rowLimit = rowLimit; + } + public static Builder builder() { return new Builder(); } @@ -165,7 +177,8 @@ public boolean equals(Object o) { && Objects.equals(deserializeQueueSize, that.deserializeQueueSize) && Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync) && Objects.equals(useFlightSql, that.useFlightSql) - && Objects.equals(flightSqlPort, that.flightSqlPort); + && Objects.equals(flightSqlPort, that.flightSqlPort) + && Objects.equals(rowLimit, that.rowLimit); } @Override @@ -184,7 +197,8 @@ public int hashCode() { deserializeArrowAsync, useOldApi, useFlightSql, - flightSqlPort); + flightSqlPort, + rowLimit); } public DorisReadOptions copy() { @@ -202,7 +216,8 @@ public DorisReadOptions copy() { deserializeArrowAsync, useOldApi, useFlightSql, - flightSqlPort); + flightSqlPort, + rowLimit); } /** Builder of {@link DorisReadOptions}. */ @@ -227,6 +242,7 @@ public static class Builder { private Boolean useOldApi = false; private Boolean useFlightSql = ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT; private Integer flightSqlPort; + private Long rowLimit; /** * Sets the readFields for doris table to push down projection, such as name,age. @@ -406,7 +422,8 @@ public DorisReadOptions build() { deserializeArrowAsync, useOldApi, useFlightSql, - flightSqlPort); + flightSqlPort, + rowLimit); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java index fbf050501..0234be0f0 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java @@ -132,6 +132,11 @@ private String parseFlightSql( if (!StringUtils.isEmpty(readOptions.getFilterQuery())) { sql += " WHERE " + readOptions.getFilterQuery(); } + + if (readOptions.getRowLimit() != null) { + sql += " LIMIT " + readOptions.getRowLimit(); + } + logger.info("Query SQL Sending to Doris FE is: '{}'.", sql); return sql; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java index 2db4f7984..e55bb7cad 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java @@ -74,6 +74,7 @@ public class DorisValueReader extends ValueReader implements AutoCloseable { protected String contextId; protected Schema schema; protected boolean asyncThreadStarted; + private long readRowCount = 0L; public DorisValueReader( PartitionDefinition partition, DorisOptions options, DorisReadOptions readOptions) { @@ -210,6 +211,10 @@ protected boolean asyncThreadStarted() { * @return true if hax next value */ public boolean hasNext() { + if (readOptions.getRowLimit() != null && readRowCount >= readOptions.getRowLimit()) { + return false; + } + boolean hasNext = false; if (deserializeArrowToRowBatchAsync && asyncThreadStarted) { // support deserialize Arrow to RowBatch asynchronously @@ -275,6 +280,7 @@ public List next() { LOG.error(SHOULD_NOT_HAPPEN_MESSAGE); throw new ShouldNeverHappenException(); } + readRowCount++; return rowBatch.next(); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index a68cf1892..e4f27d8bc 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -29,6 +29,7 @@ import org.apache.flink.table.connector.source.SourceProvider; import org.apache.flink.table.connector.source.TableFunctionProvider; import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; +import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.data.RowData; import org.apache.flink.table.expressions.ResolvedExpression; @@ -58,7 +59,8 @@ public final class DorisDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsFilterPushDown, - SupportsProjectionPushDown { + SupportsProjectionPushDown, + SupportsLimitPushDown { private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicTableSource.class); private final DorisOptions options; @@ -256,4 +258,10 @@ public int hashCode() { resolvedFilterQuery, physicalRowDataType); } + + @Override + public void applyLimit(long limit) { + // partial limit push down to reduce the amount of data scanned + readOptions.setRowLimit(limit); + } }