Skip to content

Commit

Permalink
Perfect the aggregation queries when there is no devices or no data p…
Browse files Browse the repository at this point in the history
…artitions.
  • Loading branch information
Beyyes authored Oct 25, 2024
1 parent 5d83392 commit 80cd9f7
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -910,14 +910,15 @@ public void groupByDateBinTest() {
};

// TODO(beyyes) test below
// sql = "select count(*) from (\n" +
// "\tselect device, level, date_bin(1d, time) as bin, \n" +
// "\tcount(num) as count_num, count(*) as count_star, count(device) as count_device,
// count(date) as count_date, count(attr1) as count_attr1, count(attr2) as count_attr2,
// count(time) as count_time, avg(num) as avg_num \n" +
// "\tfrom table0 \n" +
// "\tgroup by 3, device, level order by device, level, bin\n" +
// ")\n";
// sql = "select count(*) from (\n" +
// "\tselect device, level, date_bin(1d, time) as bin, \n" +
// "\tcount(num) as count_num, count(*) as count_star, count(device) as
// count_device,
// count(date) as count_date, count(attr1) as count_attr1, count(attr2) as count_attr2,
// count(time) as count_time, avg(num) as avg_num \n" +
// "\tfrom table0 \n" +
// "\tgroup by 3, device, level order by device, level, bin\n" +
// ")\n";
}

@Test
Expand Down Expand Up @@ -1015,6 +1016,27 @@ public void aggregationNoDataTest() {
+ "count(attr1) as count_attr1, count(attr2) as count_attr2, count(time) as count_time, sum(num) as sum_num,"
+ "avg(num) as avg_num from table0 where time=32 or time=1971-04-27T01:46:40.000+08:00 group by 3, device, level order by device, level";
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);

// queried device is not exist
expectedHeader = buildHeaders(3);
sql = "select count(*), count(num), sum(num) from table0 where device='d_not_exist'";
retArray = new String[] {"0,0,null,"};
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
sql =
"select count(*), count(num), sum(num) from table0 where device='d_not_exist1' or device='d_not_exist2'";
retArray = new String[] {"0,0,null,"};
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);

// no data in given time range
sql = "select count(*), count(num), sum(num) from table0 where time>2100-04-26T18:01:40.000";
retArray = new String[] {"0,0,null,"};
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);

// only one device has data in queried time
expectedHeader = buildHeaders(2);
sql = "select count(num),sum(num) from table1 where time=0";
retArray = new String[] {"2,6.0,"};
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
}

@Test
Expand Down Expand Up @@ -1161,6 +1183,11 @@ public void lastByFirstByTest() {
"1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1971-01-01T00:00:00.000Z,1971-01-01T00:00:00.000Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1971-08-20T11:33:20.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,",
};
tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);

expectedHeader = buildHeaders(3);
sql = "select last_by(time,num+1),last_by(num+1,time),last_by(num+1,floatnum+1) from table0";
retArray = new String[] {"1971-08-20T11:33:20.000Z,16,16,"};
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.file.metadata.statistics.StringStatistics;
import org.apache.tsfile.read.common.TimeRange;
Expand All @@ -55,6 +56,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -170,7 +172,7 @@ public TableAggregationTableScanOperator(
this.maxReturnSize = maxReturnSize;
this.maxTsBlockLineNum = maxTsBlockLineNum;

this.seriesScanUtil = constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex));
constructAlignedSeriesScanUtil();
}

@Override
Expand Down Expand Up @@ -253,17 +255,27 @@ private TsBlock buildResultTsBlock() {
return resultTsBlock;
}

private AlignedSeriesScanUtil constructAlignedSeriesScanUtil(DeviceEntry deviceEntry) {
private void constructAlignedSeriesScanUtil() {
DeviceEntry deviceEntry;

if (this.deviceEntries.isEmpty() || this.deviceEntries.get(this.currentDeviceIndex) == null) {
// for device which is not exist
deviceEntry = new DeviceEntry(new StringArrayDeviceID(""), Collections.emptyList());
} else {
deviceEntry = this.deviceEntries.get(this.currentDeviceIndex);
}

AlignedFullPath alignedPath =
constructAlignedPath(deviceEntry, measurementColumnNames, measurementSchemas);

return new AlignedSeriesScanUtil(
alignedPath,
scanOrder,
seriesScanOptions,
operatorContext.getInstanceContext(),
true,
measurementColumnTSDataTypes);
this.seriesScanUtil =
new AlignedSeriesScanUtil(
alignedPath,
scanOrder,
seriesScanOptions,
operatorContext.getInstanceContext(),
true,
measurementColumnTSDataTypes);
}

/** Return true if we have the result of this timeRange. */
Expand Down Expand Up @@ -313,7 +325,7 @@ && readAndCalcFromFile()) {

if (currentDeviceIndex < deviceCount) {
// construct AlignedSeriesScanUtil for next device
this.seriesScanUtil = constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex));
constructAlignedSeriesScanUtil();
queryDataSource.reset();
this.seriesScanUtil.initQueryDataSource(queryDataSource);
}
Expand Down Expand Up @@ -790,8 +802,7 @@ private void checkIfAllAggregatorHasFinalResult() {

if (currentDeviceIndex < deviceCount) {
// construct AlignedSeriesScanUtil for next device
this.seriesScanUtil =
constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex));
constructAlignedSeriesScanUtil();
queryDataSource.reset();
this.seriesScanUtil.initQueryDataSource(queryDataSource);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ public StatementMemorySource visitExplain(
symbolAllocator,
NOOP)
.plan(context.getAnalysis());
if (context.getAnalysis().isEmptyDataSource()) {
return new StatementMemorySource(new TsBlock(0), header);
}
// if (context.getAnalysis().isEmptyDataSource()) {
// return new StatementMemorySource(new TsBlock(0), header);
// }

// Generate table model distributed plan
final TableDistributedPlanGenerator.PlanContext planContext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ public Operator visitSeriesAggregationScan(
boolean canUseStatistics =
!TSDataType.BLOB.equals(node.getSeriesPath().getSeriesType())
|| (aggregationDescriptors.stream()
.noneMatch(o -> !judgeCanUseStatistics(o.getAggregationType(), TSDataType.BLOB)));
.allMatch(o -> judgeCanUseStatistics(o.getAggregationType(), TSDataType.BLOB)));
SeriesAggregationScanOperator aggregateScanOperator =
new SeriesAggregationScanOperator(
node.getPlanNodeId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ public List<String> visitAggregationTableScan(
String.format(
"RegionId: %s",
node.getRegionReplicaSet() == null || node.getRegionReplicaSet().getRegionId() == null
? ""
? "Not Assigned"
: node.getRegionReplicaSet().getRegionId().getId()));
return render(node, boxValue, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.relational.analyzer;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
Expand Down Expand Up @@ -68,8 +69,10 @@
import com.google.common.collect.Multimap;
import com.google.common.collect.Streams;
import com.google.errorprone.annotations.Immutable;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.TimeDuration;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -332,8 +335,9 @@ public List<FunctionCall> getAggregates(QuerySpecification query) {
return aggregates.get(NodeRef.of(query));
}

public boolean hasAggregates() {
return !aggregates.isEmpty();
public boolean noAggregates() {
return aggregates.isEmpty()
|| (aggregates.size() == 1 && aggregates.entrySet().iterator().next().getValue().isEmpty());
}

public void setOrderByAggregates(OrderBy node, List<Expression> aggregates) {
Expand Down Expand Up @@ -809,6 +813,15 @@ public void addEndPointToRedirectNodeList(TEndPoint endPoint) {
redirectNodeList.add(endPoint);
}

public List<TRegionReplicaSet> getDataRegionReplicaSetWithTimeFilter(
String database, IDeviceID deviceId, Filter timeFilter) {
if (dataPartition == null) {
return Collections.singletonList(new TRegionReplicaSet());
} else {
return dataPartition.getDataRegionReplicaSetWithTimeFilter(database, deviceId, timeFilter);
}
}

@Override
public TimePredicate getCovertedTimePredicate() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,22 +566,26 @@ public List<PlanNode> visitAggregation(AggregationNode node, PlanContext context
public List<PlanNode> visitAggregationTableScan(
AggregationTableScanNode node, PlanContext context) {

Map<TRegionReplicaSet, AggregationTableScanNode> tableScanNodeMap = new HashMap<>();
boolean needSplit = false;
List<List<TRegionReplicaSet>> regionReplicaSetsList = new ArrayList<>();
for (DeviceEntry deviceEntry : node.getDeviceEntries()) {
List<TRegionReplicaSet> regionReplicaSets =
analysis
.getDataPartitionInfo()
.getDataRegionReplicaSetWithTimeFilter(
node.getQualifiedObjectName().getDatabaseName(),
deviceEntry.getDeviceID(),
node.getTimeFilter());
analysis.getDataRegionReplicaSetWithTimeFilter(
node.getQualifiedObjectName().getDatabaseName(),
deviceEntry.getDeviceID(),
node.getTimeFilter());
if (regionReplicaSets.size() > 1) {
needSplit = true;
}
regionReplicaSetsList.add(regionReplicaSets);
}

if (regionReplicaSetsList.isEmpty()) {
regionReplicaSetsList =
Collections.singletonList(Collections.singletonList(new TRegionReplicaSet()));
}

Map<TRegionReplicaSet, AggregationTableScanNode> regionNodeMap = new HashMap<>();
// Step is SINGLE, has date_bin(time) and device data in more than one region, we need to split
// this node into two-stage Aggregation
needSplit = needSplit && node.getProjection() != null && node.getStep() == SINGLE;
Expand All @@ -591,78 +595,15 @@ public List<PlanNode> visitAggregationTableScan(
split(node, symbolAllocator, queryId);
finalAggregation = splitResult.left;
AggregationTableScanNode partialAggregation = splitResult.right;
for (int i = 0; i < regionReplicaSetsList.size(); i++) {
for (TRegionReplicaSet regionReplicaSet : regionReplicaSetsList.get(i)) {
AggregationTableScanNode aggregationTableScanNode =
tableScanNodeMap.computeIfAbsent(
regionReplicaSet,
k -> {
AggregationTableScanNode scanNode =
new AggregationTableScanNode(
queryId.genPlanNodeId(),
partialAggregation.getQualifiedObjectName(),
partialAggregation.getOutputSymbols(),
partialAggregation.getAssignments(),
new ArrayList<>(),
partialAggregation.getIdAndAttributeIndexMap(),
partialAggregation.getScanOrder(),
partialAggregation.getTimePredicate().orElse(null),
partialAggregation.getPushDownPredicate(),
partialAggregation.getPushDownLimit(),
partialAggregation.getPushDownOffset(),
partialAggregation.isPushLimitToEachDevice(),
partialAggregation.getProjection(),
partialAggregation.getAggregations(),
partialAggregation.getGroupingSets(),
partialAggregation.getPreGroupedSymbols(),
partialAggregation.getStep(),
partialAggregation.getGroupIdSymbol());
scanNode.setRegionReplicaSet(regionReplicaSet);
return scanNode;
});
aggregationTableScanNode.appendDeviceEntry(node.getDeviceEntries().get(i));
}
}
buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap, partialAggregation);
} else {
for (int i = 0; i < regionReplicaSetsList.size(); i++) {
for (TRegionReplicaSet regionReplicaSet : regionReplicaSetsList.get(i)) {
AggregationTableScanNode aggregationTableScanNode =
tableScanNodeMap.computeIfAbsent(
regionReplicaSet,
k -> {
AggregationTableScanNode scanNode =
new AggregationTableScanNode(
queryId.genPlanNodeId(),
node.getQualifiedObjectName(),
node.getOutputSymbols(),
node.getAssignments(),
new ArrayList<>(),
node.getIdAndAttributeIndexMap(),
node.getScanOrder(),
node.getTimePredicate().orElse(null),
node.getPushDownPredicate(),
node.getPushDownLimit(),
node.getPushDownOffset(),
node.isPushLimitToEachDevice(),
node.getProjection(),
node.getAggregations(),
node.getGroupingSets(),
node.getPreGroupedSymbols(),
node.getStep(),
node.getGroupIdSymbol());
scanNode.setRegionReplicaSet(regionReplicaSet);
return scanNode;
});
aggregationTableScanNode.appendDeviceEntry(node.getDeviceEntries().get(i));
}
}
buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap, node);
}

List<PlanNode> resultTableScanNodeList = new ArrayList<>();
TRegionReplicaSet mostUsedDataRegion = null;
int maxDeviceEntrySizeOfTableScan = 0;
for (Map.Entry<TRegionReplicaSet, AggregationTableScanNode> entry :
tableScanNodeMap.entrySet()) {
for (Map.Entry<TRegionReplicaSet, AggregationTableScanNode> entry : regionNodeMap.entrySet()) {
TRegionReplicaSet regionReplicaSet = entry.getKey();
TableScanNode subTableScanNode = entry.getValue();
subTableScanNode.setPlanNodeId(queryId.genPlanNodeId());
Expand Down Expand Up @@ -698,6 +639,49 @@ public List<PlanNode> visitAggregationTableScan(
return resultTableScanNodeList;
}

private void buildRegionNodeMap(
AggregationTableScanNode originalAggTableScanNode,
List<List<TRegionReplicaSet>> regionReplicaSetsList,
Map<TRegionReplicaSet, AggregationTableScanNode> regionNodeMap,
AggregationTableScanNode partialAggTableScanNode) {
for (int i = 0; i < regionReplicaSetsList.size(); i++) {
for (TRegionReplicaSet regionReplicaSet : regionReplicaSetsList.get(i)) {
AggregationTableScanNode aggregationTableScanNode =
regionNodeMap.computeIfAbsent(
regionReplicaSet,
k -> {
AggregationTableScanNode scanNode =
new AggregationTableScanNode(
queryId.genPlanNodeId(),
partialAggTableScanNode.getQualifiedObjectName(),
partialAggTableScanNode.getOutputSymbols(),
partialAggTableScanNode.getAssignments(),
new ArrayList<>(),
partialAggTableScanNode.getIdAndAttributeIndexMap(),
partialAggTableScanNode.getScanOrder(),
partialAggTableScanNode.getTimePredicate().orElse(null),
partialAggTableScanNode.getPushDownPredicate(),
partialAggTableScanNode.getPushDownLimit(),
partialAggTableScanNode.getPushDownOffset(),
partialAggTableScanNode.isPushLimitToEachDevice(),
partialAggTableScanNode.getProjection(),
partialAggTableScanNode.getAggregations(),
partialAggTableScanNode.getGroupingSets(),
partialAggTableScanNode.getPreGroupedSymbols(),
partialAggTableScanNode.getStep(),
partialAggTableScanNode.getGroupIdSymbol());
scanNode.setRegionReplicaSet(regionReplicaSet);
return scanNode;
});
if (originalAggTableScanNode.getDeviceEntries().size() > i
&& originalAggTableScanNode.getDeviceEntries().get(i) != null) {
aggregationTableScanNode.appendDeviceEntry(
originalAggTableScanNode.getDeviceEntries().get(i));
}
}
}
}

private static OrderingScheme constructOrderingSchema(List<Symbol> symbols) {
Map<Symbol, SortOrder> orderings = new HashMap<>();
symbols.forEach(symbol -> orderings.put(symbol, SortOrder.ASC_NULLS_LAST));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class PushAggregationIntoTableScan implements PlanOptimizer {
@Override
public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) {
if (!(context.getAnalysis().getStatement() instanceof Query)
|| !context.getAnalysis().hasAggregates()) {
|| context.getAnalysis().noAggregates()) {
return plan;
}

Expand Down
Loading

0 comments on commit 80cd9f7

Please sign in to comment.