Skip to content

Commit

Permalink
[Improve] add partial limit push down (#553)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Feb 12, 2025
1 parent 3c7bd66 commit efe53f3
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class DorisReadOptions implements Serializable {
private boolean useOldApi;
private boolean useFlightSql;
private Integer flightSqlPort;
// for flink sql limit push down
private Long rowLimit;

public DorisReadOptions(
String readFields,
Expand All @@ -54,7 +56,8 @@ public DorisReadOptions(
Boolean deserializeArrowAsync,
boolean useOldApi,
boolean useFlightSql,
Integer flightSqlPort) {
Integer flightSqlPort,
Long rowLimit) {
this.readFields = readFields;
this.filterQuery = filterQuery;
this.requestTabletSize = requestTabletSize;
Expand All @@ -69,6 +72,7 @@ public DorisReadOptions(
this.useOldApi = useOldApi;
this.useFlightSql = useFlightSql;
this.flightSqlPort = flightSqlPort;
this.rowLimit = rowLimit;
}

public String getReadFields() {
Expand Down Expand Up @@ -135,6 +139,14 @@ public Integer getFlightSqlPort() {
return flightSqlPort;
}

public Long getRowLimit() {
return rowLimit;
}

public void setRowLimit(Long rowLimit) {
this.rowLimit = rowLimit;
}

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -165,7 +177,8 @@ public boolean equals(Object o) {
&& Objects.equals(deserializeQueueSize, that.deserializeQueueSize)
&& Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync)
&& Objects.equals(useFlightSql, that.useFlightSql)
&& Objects.equals(flightSqlPort, that.flightSqlPort);
&& Objects.equals(flightSqlPort, that.flightSqlPort)
&& Objects.equals(rowLimit, that.rowLimit);
}

@Override
Expand All @@ -184,7 +197,8 @@ public int hashCode() {
deserializeArrowAsync,
useOldApi,
useFlightSql,
flightSqlPort);
flightSqlPort,
rowLimit);
}

public DorisReadOptions copy() {
Expand All @@ -202,7 +216,8 @@ public DorisReadOptions copy() {
deserializeArrowAsync,
useOldApi,
useFlightSql,
flightSqlPort);
flightSqlPort,
rowLimit);
}

/** Builder of {@link DorisReadOptions}. */
Expand All @@ -227,6 +242,7 @@ public static class Builder {
private Boolean useOldApi = false;
private Boolean useFlightSql = ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT;
private Integer flightSqlPort;
private Long rowLimit;

/**
* Sets the readFields for doris table to push down projection, such as name,age.
Expand Down Expand Up @@ -406,7 +422,8 @@ public DorisReadOptions build() {
deserializeArrowAsync,
useOldApi,
useFlightSql,
flightSqlPort);
flightSqlPort,
rowLimit);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ private String parseFlightSql(
if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
sql += " WHERE " + readOptions.getFilterQuery();
}

if (readOptions.getRowLimit() != null) {
sql += " LIMIT " + readOptions.getRowLimit();
}

logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
return sql;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class DorisValueReader extends ValueReader implements AutoCloseable {
protected String contextId;
protected Schema schema;
protected boolean asyncThreadStarted;
private long readRowCount = 0L;

public DorisValueReader(
PartitionDefinition partition, DorisOptions options, DorisReadOptions readOptions) {
Expand Down Expand Up @@ -210,6 +211,10 @@ protected boolean asyncThreadStarted() {
* @return true if hax next value
*/
public boolean hasNext() {
if (readOptions.getRowLimit() != null && readRowCount >= readOptions.getRowLimit()) {
return false;
}

boolean hasNext = false;
if (deserializeArrowToRowBatchAsync && asyncThreadStarted) {
// support deserialize Arrow to RowBatch asynchronously
Expand Down Expand Up @@ -275,6 +280,7 @@ public List next() {
LOG.error(SHOULD_NOT_HAPPEN_MESSAGE);
throw new ShouldNeverHappenException();
}
readRowCount++;
return rowBatch.next();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
Expand Down Expand Up @@ -58,7 +59,8 @@ public final class DorisDynamicTableSource
implements ScanTableSource,
LookupTableSource,
SupportsFilterPushDown,
SupportsProjectionPushDown {
SupportsProjectionPushDown,
SupportsLimitPushDown {

private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicTableSource.class);
private final DorisOptions options;
Expand Down Expand Up @@ -256,4 +258,10 @@ public int hashCode() {
resolvedFilterQuery,
physicalRowDataType);
}

@Override
public void applyLimit(long limit) {
// partial limit push down to reduce the amount of data scanned
readOptions.setRowLimit(limit);
}
}

0 comments on commit efe53f3

Please sign in to comment.