Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] add partial limit push down #553

Merged
merged 1 commit into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class DorisReadOptions implements Serializable {
private boolean useOldApi;
private boolean useFlightSql;
private Integer flightSqlPort;
// Row limit for Flink SQL limit push down; null means no limit is applied
private Long rowLimit;

public DorisReadOptions(
String readFields,
Expand All @@ -54,7 +56,8 @@ public DorisReadOptions(
Boolean deserializeArrowAsync,
boolean useOldApi,
boolean useFlightSql,
Integer flightSqlPort) {
Integer flightSqlPort,
Long rowLimit) {
this.readFields = readFields;
this.filterQuery = filterQuery;
this.requestTabletSize = requestTabletSize;
Expand All @@ -69,6 +72,7 @@ public DorisReadOptions(
this.useOldApi = useOldApi;
this.useFlightSql = useFlightSql;
this.flightSqlPort = flightSqlPort;
this.rowLimit = rowLimit;
}

public String getReadFields() {
Expand Down Expand Up @@ -135,6 +139,14 @@ public Integer getFlightSqlPort() {
return flightSqlPort;
}

/**
 * Returns the row limit used for limit push down.
 *
 * @return the configured row limit, or {@code null} if none has been set
 */
public Long getRowLimit() {
return rowLimit;
}

/**
 * Sets the row limit used for limit push down.
 *
 * @param rowLimit maximum number of rows to read; {@code null} means no limit is applied
 */
public void setRowLimit(Long rowLimit) {
this.rowLimit = rowLimit;
}

/**
 * Creates a new {@link Builder} for assembling a {@link DorisReadOptions} instance.
 *
 * @return a fresh builder
 */
public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -165,7 +177,8 @@ public boolean equals(Object o) {
&& Objects.equals(deserializeQueueSize, that.deserializeQueueSize)
&& Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync)
&& Objects.equals(useFlightSql, that.useFlightSql)
&& Objects.equals(flightSqlPort, that.flightSqlPort);
&& Objects.equals(flightSqlPort, that.flightSqlPort)
&& Objects.equals(rowLimit, that.rowLimit);
}

@Override
Expand All @@ -184,7 +197,8 @@ public int hashCode() {
deserializeArrowAsync,
useOldApi,
useFlightSql,
flightSqlPort);
flightSqlPort,
rowLimit);
}

public DorisReadOptions copy() {
Expand All @@ -202,7 +216,8 @@ public DorisReadOptions copy() {
deserializeArrowAsync,
useOldApi,
useFlightSql,
flightSqlPort);
flightSqlPort,
rowLimit);
}

/** Builder of {@link DorisReadOptions}. */
Expand All @@ -227,6 +242,7 @@ public static class Builder {
private Boolean useOldApi = false;
private Boolean useFlightSql = ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT;
private Integer flightSqlPort;
private Long rowLimit;

/**
* Sets the readFields for doris table to push down projection, such as name,age.
Expand Down Expand Up @@ -406,7 +422,8 @@ public DorisReadOptions build() {
deserializeArrowAsync,
useOldApi,
useFlightSql,
flightSqlPort);
flightSqlPort,
rowLimit);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ private String parseFlightSql(
if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
sql += " WHERE " + readOptions.getFilterQuery();
}

if (readOptions.getRowLimit() != null) {
sql += " LIMIT " + readOptions.getRowLimit();
}

logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
return sql;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class DorisValueReader extends ValueReader implements AutoCloseable {
protected String contextId;
protected Schema schema;
protected boolean asyncThreadStarted;
private long readRowCount = 0L;

public DorisValueReader(
PartitionDefinition partition, DorisOptions options, DorisReadOptions readOptions) {
Expand Down Expand Up @@ -210,6 +211,10 @@ protected boolean asyncThreadStarted() {
* @return true if there is a next value
*/
public boolean hasNext() {
if (readOptions.getRowLimit() != null && readRowCount >= readOptions.getRowLimit()) {
return false;
}

boolean hasNext = false;
if (deserializeArrowToRowBatchAsync && asyncThreadStarted) {
// support deserialize Arrow to RowBatch asynchronously
Expand Down Expand Up @@ -275,6 +280,7 @@ public List next() {
LOG.error(SHOULD_NOT_HAPPEN_MESSAGE);
throw new ShouldNeverHappenException();
}
readRowCount++;
return rowBatch.next();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
Expand Down Expand Up @@ -58,7 +59,8 @@ public final class DorisDynamicTableSource
implements ScanTableSource,
LookupTableSource,
SupportsFilterPushDown,
SupportsProjectionPushDown {
SupportsProjectionPushDown,
SupportsLimitPushDown {

private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicTableSource.class);
private final DorisOptions options;
Expand Down Expand Up @@ -253,4 +255,10 @@ public int hashCode() {
resolvedFilterQuery,
physicalRowDataType);
}

/**
 * Stores the pushed-down limit in the read options so readers can stop early.
 *
 * @param limit maximum number of rows requested by the query
 */
@Override
public void applyLimit(long limit) {
// partial limit push down to reduce the amount of data scanned
// NOTE(review): presumably the final LIMIT is still enforced by Flink downstream — confirm
readOptions.setRowLimit(limit);
}
}