Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perfect the aggregation queries when there is no devices or no data partitions. #13907

Merged
merged 3 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -899,14 +899,15 @@ public void groupByDateBinTest() {
};

// TODO(beyyes) test below
// sql = "select count(*) from (\n" +
// "\tselect device, level, date_bin(1d, time) as bin, \n" +
// "\tcount(num) as count_num, count(*) as count_star, count(device) as count_device,
// count(date) as count_date, count(attr1) as count_attr1, count(attr2) as count_attr2,
// count(time) as count_time, avg(num) as avg_num \n" +
// "\tfrom table0 \n" +
// "\tgroup by 3, device, level order by device, level, bin\n" +
// ")\n";
// sql = "select count(*) from (\n" +
// "\tselect device, level, date_bin(1d, time) as bin, \n" +
// "\tcount(num) as count_num, count(*) as count_star, count(device) as
// count_device,
// count(date) as count_date, count(attr1) as count_attr1, count(attr2) as count_attr2,
// count(time) as count_time, avg(num) as avg_num \n" +
// "\tfrom table0 \n" +
// "\tgroup by 3, device, level order by device, level, bin\n" +
// ")\n";
}

@Test
Expand Down Expand Up @@ -1004,6 +1005,27 @@ public void aggregationNoDataTest() {
+ "count(attr1) as count_attr1, count(attr2) as count_attr2, count(time) as count_time, sum(num) as sum_num,"
+ "avg(num) as avg_num from table0 where time=32 or time=1971-04-27T01:46:40.000+08:00 group by 3, device, level order by device, level";
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);

// queried device is not exist
expectedHeader = buildHeaders(3);
sql = "select count(*), count(num), sum(num) from table0 where device='d_not_exist'";
retArray = new String[] {"0,0,null,"};
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
sql =
"select count(*), count(num), sum(num) from table0 where device='d_not_exist1' or device='d_not_exist2'";
retArray = new String[] {"0,0,null,"};
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);

// no data in given time range
sql = "select count(*), count(num), sum(num) from table0 where time>2100-04-26T18:01:40.000";
retArray = new String[] {"0,0,null,"};
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);

// only one device has data in queried time
expectedHeader = buildHeaders(2);
sql = "select count(num),sum(num) from table1 where time=0";
retArray = new String[] {"2,6.0,"};
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
}

@Test
Expand Down Expand Up @@ -1150,6 +1172,11 @@ public void lastByFirstByTest() {
"1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1971-01-01T00:00:00.000Z,1971-01-01T00:00:00.000Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1971-08-20T11:33:20.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,",
};
tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);

expectedHeader = buildHeaders(3);
sql = "select last_by(time,num+1),last_by(num+1,time),last_by(num+1,floatnum+1) from table0";
retArray = new String[] {"1971-08-20T11:33:20.000Z,16,16,"};
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.file.metadata.statistics.StringStatistics;
import org.apache.tsfile.read.common.TimeRange;
Expand All @@ -55,6 +56,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -170,7 +172,7 @@ public TableAggregationTableScanOperator(
this.maxReturnSize = maxReturnSize;
this.maxTsBlockLineNum = maxTsBlockLineNum;

this.seriesScanUtil = constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex));
constructAlignedSeriesScanUtil();
}

@Override
Expand Down Expand Up @@ -253,17 +255,27 @@ private TsBlock buildResultTsBlock() {
return resultTsBlock;
}

private AlignedSeriesScanUtil constructAlignedSeriesScanUtil(DeviceEntry deviceEntry) {
private void constructAlignedSeriesScanUtil() {
DeviceEntry deviceEntry;

if (this.deviceEntries.isEmpty() || this.deviceEntries.get(this.currentDeviceIndex) == null) {
// for device which is not exist
deviceEntry = new DeviceEntry(new StringArrayDeviceID(""), Collections.emptyList());
} else {
deviceEntry = this.deviceEntries.get(this.currentDeviceIndex);
}

AlignedFullPath alignedPath =
constructAlignedPath(deviceEntry, measurementColumnNames, measurementSchemas);

return new AlignedSeriesScanUtil(
alignedPath,
scanOrder,
seriesScanOptions,
operatorContext.getInstanceContext(),
true,
measurementColumnTSDataTypes);
this.seriesScanUtil =
new AlignedSeriesScanUtil(
alignedPath,
scanOrder,
seriesScanOptions,
operatorContext.getInstanceContext(),
true,
measurementColumnTSDataTypes);
}

/** Return true if we have the result of this timeRange. */
Expand Down Expand Up @@ -313,7 +325,7 @@ && readAndCalcFromFile()) {

if (currentDeviceIndex < deviceCount) {
// construct AlignedSeriesScanUtil for next device
this.seriesScanUtil = constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex));
constructAlignedSeriesScanUtil();
queryDataSource.reset();
this.seriesScanUtil.initQueryDataSource(queryDataSource);
}
Expand Down Expand Up @@ -790,8 +802,7 @@ private void checkIfAllAggregatorHasFinalResult() {

if (currentDeviceIndex < deviceCount) {
// construct AlignedSeriesScanUtil for next device
this.seriesScanUtil =
constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex));
constructAlignedSeriesScanUtil();
queryDataSource.reset();
this.seriesScanUtil.initQueryDataSource(queryDataSource);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ public StatementMemorySource visitExplain(
symbolAllocator,
NOOP)
.plan(context.getAnalysis());
if (context.getAnalysis().isEmptyDataSource()) {
return new StatementMemorySource(new TsBlock(0), header);
}
// if (context.getAnalysis().isEmptyDataSource()) {
// return new StatementMemorySource(new TsBlock(0), header);
// }

// Generate table model distributed plan
final TableDistributedPlanGenerator.PlanContext planContext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ public Operator visitSeriesAggregationScan(
boolean canUseStatistics =
!TSDataType.BLOB.equals(node.getSeriesPath().getSeriesType())
|| (aggregationDescriptors.stream()
.noneMatch(o -> !judgeCanUseStatistics(o.getAggregationType(), TSDataType.BLOB)));
.allMatch(o -> judgeCanUseStatistics(o.getAggregationType(), TSDataType.BLOB)));
SeriesAggregationScanOperator aggregateScanOperator =
new SeriesAggregationScanOperator(
node.getPlanNodeId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ public List<String> visitAggregationTableScan(
String.format(
"RegionId: %s",
node.getRegionReplicaSet() == null || node.getRegionReplicaSet().getRegionId() == null
? ""
? "Not Assigned"
: node.getRegionReplicaSet().getRegionId().getId()));
return render(node, boxValue, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.relational.analyzer;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
Expand Down Expand Up @@ -68,8 +69,10 @@
import com.google.common.collect.Multimap;
import com.google.common.collect.Streams;
import com.google.errorprone.annotations.Immutable;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.TimeDuration;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -332,8 +335,9 @@ public List<FunctionCall> getAggregates(QuerySpecification query) {
return aggregates.get(NodeRef.of(query));
}

public boolean hasAggregates() {
return !aggregates.isEmpty();
public boolean noAggregates() {
return aggregates.isEmpty()
|| (aggregates.size() == 1 && aggregates.entrySet().iterator().next().getValue().isEmpty());
}

public void setOrderByAggregates(OrderBy node, List<Expression> aggregates) {
Expand Down Expand Up @@ -809,6 +813,15 @@ public void addEndPointToRedirectNodeList(TEndPoint endPoint) {
redirectNodeList.add(endPoint);
}

public List<TRegionReplicaSet> getDataRegionReplicaSetWithTimeFilter(
String database, IDeviceID deviceId, Filter timeFilter) {
if (dataPartition == null) {
return Collections.singletonList(new TRegionReplicaSet());
} else {
return dataPartition.getDataRegionReplicaSetWithTimeFilter(database, deviceId, timeFilter);
}
}

@Override
public TimePredicate getCovertedTimePredicate() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,22 +566,26 @@ public List<PlanNode> visitAggregation(AggregationNode node, PlanContext context
public List<PlanNode> visitAggregationTableScan(
AggregationTableScanNode node, PlanContext context) {

Map<TRegionReplicaSet, AggregationTableScanNode> tableScanNodeMap = new HashMap<>();
boolean needSplit = false;
List<List<TRegionReplicaSet>> regionReplicaSetsList = new ArrayList<>();
for (DeviceEntry deviceEntry : node.getDeviceEntries()) {
List<TRegionReplicaSet> regionReplicaSets =
analysis
.getDataPartitionInfo()
.getDataRegionReplicaSetWithTimeFilter(
node.getQualifiedObjectName().getDatabaseName(),
deviceEntry.getDeviceID(),
node.getTimeFilter());
analysis.getDataRegionReplicaSetWithTimeFilter(
node.getQualifiedObjectName().getDatabaseName(),
deviceEntry.getDeviceID(),
node.getTimeFilter());
if (regionReplicaSets.size() > 1) {
needSplit = true;
}
regionReplicaSetsList.add(regionReplicaSets);
}

if (regionReplicaSetsList.isEmpty()) {
regionReplicaSetsList =
Collections.singletonList(Collections.singletonList(new TRegionReplicaSet()));
}

Map<TRegionReplicaSet, AggregationTableScanNode> regionNodeMap = new HashMap<>();
// Step is SINGLE, has date_bin(time) and device data in more than one region, we need to split
// this node into two-stage Aggregation
needSplit = needSplit && node.getProjection() != null && node.getStep() == SINGLE;
Expand All @@ -591,78 +595,15 @@ public List<PlanNode> visitAggregationTableScan(
split(node, symbolAllocator, queryId);
finalAggregation = splitResult.left;
AggregationTableScanNode partialAggregation = splitResult.right;
for (int i = 0; i < regionReplicaSetsList.size(); i++) {
for (TRegionReplicaSet regionReplicaSet : regionReplicaSetsList.get(i)) {
AggregationTableScanNode aggregationTableScanNode =
tableScanNodeMap.computeIfAbsent(
regionReplicaSet,
k -> {
AggregationTableScanNode scanNode =
new AggregationTableScanNode(
queryId.genPlanNodeId(),
partialAggregation.getQualifiedObjectName(),
partialAggregation.getOutputSymbols(),
partialAggregation.getAssignments(),
new ArrayList<>(),
partialAggregation.getIdAndAttributeIndexMap(),
partialAggregation.getScanOrder(),
partialAggregation.getTimePredicate().orElse(null),
partialAggregation.getPushDownPredicate(),
partialAggregation.getPushDownLimit(),
partialAggregation.getPushDownOffset(),
partialAggregation.isPushLimitToEachDevice(),
partialAggregation.getProjection(),
partialAggregation.getAggregations(),
partialAggregation.getGroupingSets(),
partialAggregation.getPreGroupedSymbols(),
partialAggregation.getStep(),
partialAggregation.getGroupIdSymbol());
scanNode.setRegionReplicaSet(regionReplicaSet);
return scanNode;
});
aggregationTableScanNode.appendDeviceEntry(node.getDeviceEntries().get(i));
}
}
buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap, partialAggregation);
} else {
for (int i = 0; i < regionReplicaSetsList.size(); i++) {
for (TRegionReplicaSet regionReplicaSet : regionReplicaSetsList.get(i)) {
AggregationTableScanNode aggregationTableScanNode =
tableScanNodeMap.computeIfAbsent(
regionReplicaSet,
k -> {
AggregationTableScanNode scanNode =
new AggregationTableScanNode(
queryId.genPlanNodeId(),
node.getQualifiedObjectName(),
node.getOutputSymbols(),
node.getAssignments(),
new ArrayList<>(),
node.getIdAndAttributeIndexMap(),
node.getScanOrder(),
node.getTimePredicate().orElse(null),
node.getPushDownPredicate(),
node.getPushDownLimit(),
node.getPushDownOffset(),
node.isPushLimitToEachDevice(),
node.getProjection(),
node.getAggregations(),
node.getGroupingSets(),
node.getPreGroupedSymbols(),
node.getStep(),
node.getGroupIdSymbol());
scanNode.setRegionReplicaSet(regionReplicaSet);
return scanNode;
});
aggregationTableScanNode.appendDeviceEntry(node.getDeviceEntries().get(i));
}
}
buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap, node);
}

List<PlanNode> resultTableScanNodeList = new ArrayList<>();
TRegionReplicaSet mostUsedDataRegion = null;
int maxDeviceEntrySizeOfTableScan = 0;
for (Map.Entry<TRegionReplicaSet, AggregationTableScanNode> entry :
tableScanNodeMap.entrySet()) {
for (Map.Entry<TRegionReplicaSet, AggregationTableScanNode> entry : regionNodeMap.entrySet()) {
TRegionReplicaSet regionReplicaSet = entry.getKey();
TableScanNode subTableScanNode = entry.getValue();
subTableScanNode.setPlanNodeId(queryId.genPlanNodeId());
Expand Down Expand Up @@ -698,6 +639,49 @@ public List<PlanNode> visitAggregationTableScan(
return resultTableScanNodeList;
}

private void buildRegionNodeMap(
AggregationTableScanNode originalAggTableScanNode,
List<List<TRegionReplicaSet>> regionReplicaSetsList,
Map<TRegionReplicaSet, AggregationTableScanNode> regionNodeMap,
AggregationTableScanNode partialAggTableScanNode) {
for (int i = 0; i < regionReplicaSetsList.size(); i++) {
for (TRegionReplicaSet regionReplicaSet : regionReplicaSetsList.get(i)) {
AggregationTableScanNode aggregationTableScanNode =
regionNodeMap.computeIfAbsent(
regionReplicaSet,
k -> {
AggregationTableScanNode scanNode =
new AggregationTableScanNode(
queryId.genPlanNodeId(),
partialAggTableScanNode.getQualifiedObjectName(),
partialAggTableScanNode.getOutputSymbols(),
partialAggTableScanNode.getAssignments(),
new ArrayList<>(),
partialAggTableScanNode.getIdAndAttributeIndexMap(),
partialAggTableScanNode.getScanOrder(),
partialAggTableScanNode.getTimePredicate().orElse(null),
partialAggTableScanNode.getPushDownPredicate(),
partialAggTableScanNode.getPushDownLimit(),
partialAggTableScanNode.getPushDownOffset(),
partialAggTableScanNode.isPushLimitToEachDevice(),
partialAggTableScanNode.getProjection(),
partialAggTableScanNode.getAggregations(),
partialAggTableScanNode.getGroupingSets(),
partialAggTableScanNode.getPreGroupedSymbols(),
partialAggTableScanNode.getStep(),
partialAggTableScanNode.getGroupIdSymbol());
scanNode.setRegionReplicaSet(regionReplicaSet);
return scanNode;
});
if (originalAggTableScanNode.getDeviceEntries().size() > i
&& originalAggTableScanNode.getDeviceEntries().get(i) != null) {
aggregationTableScanNode.appendDeviceEntry(
originalAggTableScanNode.getDeviceEntries().get(i));
}
}
}
}

private static OrderingScheme constructOrderingSchema(List<Symbol> symbols) {
Map<Symbol, SortOrder> orderings = new HashMap<>();
symbols.forEach(symbol -> orderings.put(symbol, SortOrder.ASC_NULLS_LAST));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class PushAggregationIntoTableScan implements PlanOptimizer {
@Override
public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) {
if (!(context.getAnalysis().getStatement() instanceof Query)
|| !context.getAnalysis().hasAggregates()) {
|| context.getAnalysis().noAggregates()) {
return plan;
}

Expand Down
Loading
Loading