From 115ca1c50906004efcec5f53861adb2980d14165 Mon Sep 17 00:00:00 2001 From: Beyyes Date: Thu, 24 Oct 2024 19:09:50 +0800 Subject: [PATCH 1/3] fix subquery --- .../IoTDBMultiIDsWithAttributesTableIT.java | 39 +++++++++++++++---- .../TableAggregationTableScanOperator.java | 36 +++++++++++------ ...ableModelStatementMemorySourceVisitor.java | 6 +-- .../plan/planner/OperatorTreeGenerator.java | 2 +- .../planner/plan/node/PlanGraphPrinter.java | 2 +- .../plan/relational/analyzer/Analysis.java | 12 ++++++ .../TableDistributedPlanGenerator.java | 24 ++++++++---- .../PushPredicateIntoTableScan.java | 8 +++- 8 files changed, 94 insertions(+), 35 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java index ec659eddb426..5f1c6e2437c8 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java @@ -899,14 +899,15 @@ public void groupByDateBinTest() { }; // TODO(beyyes) test below - // sql = "select count(*) from (\n" + - // "\tselect device, level, date_bin(1d, time) as bin, \n" + - // "\tcount(num) as count_num, count(*) as count_star, count(device) as count_device, - // count(date) as count_date, count(attr1) as count_attr1, count(attr2) as count_attr2, - // count(time) as count_time, avg(num) as avg_num \n" + - // "\tfrom table0 \n" + - // "\tgroup by 3, device, level order by device, level, bin\n" + - // ")\n"; + // sql = "select count(*) from (\n" + + // "\tselect device, level, date_bin(1d, time) as bin, \n" + + // "\tcount(num) as count_num, count(*) as count_star, count(device) as + // count_device, + // count(date) as count_date, count(attr1) as count_attr1, count(attr2) as count_attr2, + // count(time) as count_time, avg(num) as avg_num \n" + + // "\tfrom table0 \n" + + // "\tgroup by 3, device, level order by device, level, bin\n" + + // ")\n"; } @Test @@ -1004,6 +1005,23 @@ public void aggregationNoDataTest() { + "count(attr1) as count_attr1, count(attr2) as count_attr2, count(time) as count_time, sum(num) as sum_num," + "avg(num) as avg_num from table0 where time=32 or time=1971-04-27T01:46:40.000+08:00 group by 3, device, level order by device, level"; tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + + // not exist device test + expectedHeader = buildHeaders(3); + sql = "select count(*), count(num), sum(num) from table0 where device='d_not_exist'"; + retArray = new String[] {"0,0,null,"}; + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + + // not exist time range test + sql = "select count(*), count(num), sum(num) from table0 where time>2100-04-26T18:01:40.000"; + retArray = new String[] {"0,0,null,"}; + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + + // only one device in this time + expectedHeader = buildHeaders(2); + sql = "select count(num),sum(num) from table1 where time=0"; + retArray = new String[] {"2,6.0,"}; + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); } @Test @@ -1150,6 +1168,11 @@ public void lastByFirstByTest() { "1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1971-01-01T00:00:00.000Z,1971-01-01T00:00:00.000Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1971-08-20T11:33:20.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,", }; tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME); + + expectedHeader = buildHeaders(3); + sql = "select last_by(time,num+1),last_by(num+1,time),last_by(num+1,floatnum+1) from table0"; + retArray = new String[] {"1971-08-20T11:33:20.000Z,16,16,"}; + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); } @Test diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java index a96240f62b75..51a42e4ce3ad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java @@ -40,6 +40,7 @@ import org.apache.tsfile.block.column.Column; import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.apache.tsfile.file.metadata.statistics.Statistics; import org.apache.tsfile.file.metadata.statistics.StringStatistics; import org.apache.tsfile.read.common.TimeRange; @@ -55,6 +56,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -170,7 +172,7 @@ public TableAggregationTableScanOperator( this.maxReturnSize = maxReturnSize; this.maxTsBlockLineNum = maxTsBlockLineNum; - this.seriesScanUtil = constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex)); + constructAlignedSeriesScanUtil(); } @Override @@ -253,17 +255,28 @@ private TsBlock buildResultTsBlock() { return resultTsBlock; } - private AlignedSeriesScanUtil constructAlignedSeriesScanUtil(DeviceEntry deviceEntry) { + private void constructAlignedSeriesScanUtil() { + DeviceEntry deviceEntry; + + if (this.deviceEntries.size() <= this.currentDeviceIndex + || this.deviceEntries.get(this.currentDeviceIndex) == null) { + // for device which not exist + deviceEntry = new DeviceEntry(new StringArrayDeviceID(""), Collections.emptyList()); + } else { + deviceEntry = this.deviceEntries.get(this.currentDeviceIndex); + } + AlignedFullPath alignedPath = constructAlignedPath(deviceEntry, measurementColumnNames, measurementSchemas); - return new AlignedSeriesScanUtil( - alignedPath, - scanOrder, - seriesScanOptions, - operatorContext.getInstanceContext(), - true, - measurementColumnTSDataTypes); + this.seriesScanUtil = + new AlignedSeriesScanUtil( + alignedPath, + scanOrder, + seriesScanOptions, + operatorContext.getInstanceContext(), + true, + measurementColumnTSDataTypes); } /** Return true if we have the result of this timeRange. */ @@ -313,7 +326,7 @@ && readAndCalcFromFile()) { if (currentDeviceIndex < deviceCount) { // construct AlignedSeriesScanUtil for next device - this.seriesScanUtil = constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex)); + constructAlignedSeriesScanUtil(); queryDataSource.reset(); this.seriesScanUtil.initQueryDataSource(queryDataSource); } @@ -790,8 +803,7 @@ private void checkIfAllAggregatorHasFinalResult() { if (currentDeviceIndex < deviceCount) { // construct AlignedSeriesScanUtil for next device - this.seriesScanUtil = - constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex)); + constructAlignedSeriesScanUtil(); queryDataSource.reset(); this.seriesScanUtil.initQueryDataSource(queryDataSource); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java index ec603a90e836..e87f2d24a2be 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java @@ -75,9 +75,9 @@ public StatementMemorySource visitExplain( symbolAllocator, NOOP) .plan(context.getAnalysis()); - if (context.getAnalysis().isEmptyDataSource()) { - return new StatementMemorySource(new TsBlock(0), header); - } + // if (context.getAnalysis().isEmptyDataSource()) { + // return new StatementMemorySource(new TsBlock(0), header); + // } // Generate table model distributed plan final TableDistributedPlanGenerator.PlanContext planContext = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 5dd0c61bd751..3f70b46398c3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -671,7 +671,7 @@ public Operator visitSeriesAggregationScan( boolean canUseStatistics = !TSDataType.BLOB.equals(node.getSeriesPath().getSeriesType()) || (aggregationDescriptors.stream() - .noneMatch(o -> !judgeCanUseStatistics(o.getAggregationType(), TSDataType.BLOB))); + .allMatch(o -> judgeCanUseStatistics(o.getAggregationType(), TSDataType.BLOB))); SeriesAggregationScanOperator aggregateScanOperator = new SeriesAggregationScanOperator( node.getPlanNodeId(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index 84b289125694..d33f0b244814 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -696,7 +696,7 @@ public List visitAggregationTableScan( String.format( "RegionId: %s", node.getRegionReplicaSet() == null || node.getRegionReplicaSet().getRegionId() == null - ? "" + ? "Not Assigned" : node.getRegionReplicaSet().getRegionId().getId())); return render(node, boxValue, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java index 32f0e5aec920..1b34213f96c4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.analyzer; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.commons.partition.SchemaPartition; @@ -68,8 +69,10 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Streams; import com.google.errorprone.annotations.Immutable; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.type.Type; +import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.utils.TimeDuration; import javax.annotation.Nullable; @@ -809,6 +812,15 @@ public void addEndPointToRedirectNodeList(TEndPoint endPoint) { redirectNodeList.add(endPoint); } + public List getDataRegionReplicaSetWithTimeFilter( + String database, IDeviceID deviceId, Filter timeFilter) { + if (dataPartition == null) { + return Collections.singletonList(new TRegionReplicaSet()); + } else { + return dataPartition.getDataRegionReplicaSetWithTimeFilter(database, deviceId, timeFilter); + } + } + @Override public TimePredicate getCovertedTimePredicate() { return null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 848dbf4c917b..7b35709522e3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -571,17 +571,21 @@ public List visitAggregationTableScan( List> regionReplicaSetsList = new ArrayList<>(); for (DeviceEntry deviceEntry : node.getDeviceEntries()) { List regionReplicaSets = - analysis - .getDataPartitionInfo() - .getDataRegionReplicaSetWithTimeFilter( - node.getQualifiedObjectName().getDatabaseName(), - deviceEntry.getDeviceID(), - node.getTimeFilter()); + analysis.getDataRegionReplicaSetWithTimeFilter( + node.getQualifiedObjectName().getDatabaseName(), + deviceEntry.getDeviceID(), + node.getTimeFilter()); if (regionReplicaSets.size() > 1) { needSplit = true; } regionReplicaSetsList.add(regionReplicaSets); } + + if (regionReplicaSetsList.isEmpty()) { + regionReplicaSetsList = + Collections.singletonList(Collections.singletonList(new TRegionReplicaSet())); + } + // Step is SINGLE, has date_bin(time) and device data in more than one region, we need to split // this node into two-stage Aggregation needSplit = needSplit && node.getProjection() != null && node.getStep() == SINGLE; @@ -620,7 +624,9 @@ public List visitAggregationTableScan( scanNode.setRegionReplicaSet(regionReplicaSet); return scanNode; }); - aggregationTableScanNode.appendDeviceEntry(node.getDeviceEntries().get(i)); + if (node.getDeviceEntries().get(i) != null) { + aggregationTableScanNode.appendDeviceEntry(node.getDeviceEntries().get(i)); + } } } } else { @@ -653,7 +659,9 @@ public List visitAggregationTableScan( scanNode.setRegionReplicaSet(regionReplicaSet); return scanNode; }); - aggregationTableScanNode.appendDeviceEntry(node.getDeviceEntries().get(i)); + if (node.getDeviceEntries().size() > i && node.getDeviceEntries().get(i) != null) { + aggregationTableScanNode.appendDeviceEntry(node.getDeviceEntries().get(i)); + } } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java index 9db2f87cfd94..e3d9e6b3f847 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java @@ -483,7 +483,9 @@ private void getDeviceEntriesWithDataPartitions( .recordPlanCost(TABLE_TYPE, SCHEMA_FETCHER, System.nanoTime() - startTime); if (deviceEntries.isEmpty()) { - analysis.setFinishQueryAfterAnalyze(); + if (!analysis.hasAggregates()) { + analysis.setFinishQueryAfterAnalyze(); + } analysis.setEmptyDataSource(true); } else { Filter timeFilter = @@ -506,7 +508,9 @@ private void getDeviceEntriesWithDataPartitions( } if (dataPartition.getDataPartitionMap().isEmpty()) { - analysis.setFinishQueryAfterAnalyze(); + if (!analysis.hasAggregates()) { + analysis.setFinishQueryAfterAnalyze(); + } analysis.setEmptyDataSource(true); } else { analysis.upsertDataPartition(dataPartition); From 2f007c6142de6f8331c816f9b280be3aeace8595 Mon Sep 17 00:00:00 2001 From: Beyyes Date: Thu, 24 Oct 2024 19:48:45 +0800 Subject: [PATCH 2/3] buildRegionNodeMap --- .../IoTDBMultiIDsWithAttributesTableIT.java | 10 +- .../TableAggregationTableScanOperator.java | 5 +- .../TableDistributedPlanGenerator.java | 118 +++++++----------- .../PushPredicateIntoTableScan.java | 2 + 4 files changed, 58 insertions(+), 77 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java index 5f1c6e2437c8..c4a04273413c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java @@ -1006,18 +1006,22 @@ public void aggregationNoDataTest() { + "avg(num) as avg_num from table0 where time=32 or time=1971-04-27T01:46:40.000+08:00 group by 3, device, level order by device, level"; tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); - // not exist device test + // queried device is not exist expectedHeader = buildHeaders(3); sql = "select count(*), count(num), sum(num) from table0 where device='d_not_exist'"; retArray = new String[] {"0,0,null,"}; tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + sql = + "select count(*), count(num), sum(num) from table0 where device='d_not_exist1' or device='d_not_exist2'"; + retArray = new String[] {"0,0,null,"}; + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); - // not exist time range test + // no data in given time range sql = "select count(*), count(num), sum(num) from table0 where time>2100-04-26T18:01:40.000"; retArray = new String[] {"0,0,null,"}; tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); - // only one device in this time + // only one device has data in queried time expectedHeader = buildHeaders(2); sql = "select count(num),sum(num) from table1 where time=0"; retArray = new String[] {"2,6.0,"}; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java index 51a42e4ce3ad..e5b3e2b1b3f4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java @@ -258,9 +258,8 @@ private TsBlock buildResultTsBlock() { private void constructAlignedSeriesScanUtil() { DeviceEntry deviceEntry; - if (this.deviceEntries.size() <= this.currentDeviceIndex - || this.deviceEntries.get(this.currentDeviceIndex) == null) { - // for device which not exist + if (this.deviceEntries.isEmpty() || this.deviceEntries.get(this.currentDeviceIndex) == null) { + // for device which is not exist deviceEntry = new DeviceEntry(new StringArrayDeviceID(""), Collections.emptyList()); } else { deviceEntry = this.deviceEntries.get(this.currentDeviceIndex); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 7b35709522e3..925bc23fb2ab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -566,7 +566,6 @@ public List visitAggregation(AggregationNode node, PlanContext context public List visitAggregationTableScan( AggregationTableScanNode node, PlanContext context) { - Map tableScanNodeMap = new HashMap<>(); boolean needSplit = false; List> regionReplicaSetsList = new ArrayList<>(); for (DeviceEntry deviceEntry : node.getDeviceEntries()) { @@ -586,6 +585,7 @@ public List visitAggregationTableScan( Collections.singletonList(Collections.singletonList(new TRegionReplicaSet())); } + Map regionNodeMap = new HashMap<>(); // Step is SINGLE, has date_bin(time) and device data in more than one region, we need to split // this node into two-stage Aggregation needSplit = needSplit && node.getProjection() != null && node.getStep() == SINGLE; @@ -595,82 +595,15 @@ public List visitAggregationTableScan( split(node, symbolAllocator, queryId); finalAggregation = splitResult.left; AggregationTableScanNode partialAggregation = splitResult.right; - for (int i = 0; i < regionReplicaSetsList.size(); i++) { - for (TRegionReplicaSet regionReplicaSet : regionReplicaSetsList.get(i)) { - AggregationTableScanNode aggregationTableScanNode = - tableScanNodeMap.computeIfAbsent( - regionReplicaSet, - k -> { - AggregationTableScanNode scanNode = - new AggregationTableScanNode( - queryId.genPlanNodeId(), - partialAggregation.getQualifiedObjectName(), - partialAggregation.getOutputSymbols(), - partialAggregation.getAssignments(), - new ArrayList<>(), - partialAggregation.getIdAndAttributeIndexMap(), - partialAggregation.getScanOrder(), - partialAggregation.getTimePredicate().orElse(null), - partialAggregation.getPushDownPredicate(), - partialAggregation.getPushDownLimit(), - partialAggregation.getPushDownOffset(), - partialAggregation.isPushLimitToEachDevice(), - partialAggregation.getProjection(), - partialAggregation.getAggregations(), - partialAggregation.getGroupingSets(), - partialAggregation.getPreGroupedSymbols(), - partialAggregation.getStep(), - partialAggregation.getGroupIdSymbol()); - scanNode.setRegionReplicaSet(regionReplicaSet); - return scanNode; - }); - if (node.getDeviceEntries().get(i) != null) { - aggregationTableScanNode.appendDeviceEntry(node.getDeviceEntries().get(i)); - } - } - } + buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap, partialAggregation); } else { - for (int i = 0; i < regionReplicaSetsList.size(); i++) { - for (TRegionReplicaSet regionReplicaSet : regionReplicaSetsList.get(i)) { - AggregationTableScanNode aggregationTableScanNode = - tableScanNodeMap.computeIfAbsent( - regionReplicaSet, - k -> { - AggregationTableScanNode scanNode = - new AggregationTableScanNode( - queryId.genPlanNodeId(), - node.getQualifiedObjectName(), - node.getOutputSymbols(), - node.getAssignments(), - new ArrayList<>(), - node.getIdAndAttributeIndexMap(), - node.getScanOrder(), - node.getTimePredicate().orElse(null), - node.getPushDownPredicate(), - node.getPushDownLimit(), - node.getPushDownOffset(), - node.isPushLimitToEachDevice(), - node.getProjection(), - node.getAggregations(), - node.getGroupingSets(), - node.getPreGroupedSymbols(), - node.getStep(), - node.getGroupIdSymbol()); - scanNode.setRegionReplicaSet(regionReplicaSet); - return scanNode; - }); - if (node.getDeviceEntries().size() > i && node.getDeviceEntries().get(i) != null) { - aggregationTableScanNode.appendDeviceEntry(node.getDeviceEntries().get(i)); - } - } - } + buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap, node); } List resultTableScanNodeList = new ArrayList<>(); TRegionReplicaSet mostUsedDataRegion = null; int maxDeviceEntrySizeOfTableScan = 0; - for (Map.Entry entry : - tableScanNodeMap.entrySet()) { + for (Map.Entry entry : regionNodeMap.entrySet()) { TRegionReplicaSet regionReplicaSet = entry.getKey(); TableScanNode subTableScanNode = entry.getValue(); subTableScanNode.setPlanNodeId(queryId.genPlanNodeId()); @@ -706,6 +639,49 @@ public List visitAggregationTableScan( return resultTableScanNodeList; } + private void buildRegionNodeMap( + AggregationTableScanNode originalAggTableScanNode, + List> regionReplicaSetsList, + Map regionNodeMap, + AggregationTableScanNode partialAggTableScanNode) { + for (int i = 0; i < regionReplicaSetsList.size(); i++) { + for (TRegionReplicaSet regionReplicaSet : regionReplicaSetsList.get(i)) { + AggregationTableScanNode aggregationTableScanNode = + regionNodeMap.computeIfAbsent( + regionReplicaSet, + k -> { + AggregationTableScanNode scanNode = + new AggregationTableScanNode( + queryId.genPlanNodeId(), + partialAggTableScanNode.getQualifiedObjectName(), + partialAggTableScanNode.getOutputSymbols(), + partialAggTableScanNode.getAssignments(), + new ArrayList<>(), + partialAggTableScanNode.getIdAndAttributeIndexMap(), + partialAggTableScanNode.getScanOrder(), + partialAggTableScanNode.getTimePredicate().orElse(null), + partialAggTableScanNode.getPushDownPredicate(), + partialAggTableScanNode.getPushDownLimit(), + partialAggTableScanNode.getPushDownOffset(), + partialAggTableScanNode.isPushLimitToEachDevice(), + partialAggTableScanNode.getProjection(), + partialAggTableScanNode.getAggregations(), + partialAggTableScanNode.getGroupingSets(), + partialAggTableScanNode.getPreGroupedSymbols(), + partialAggTableScanNode.getStep(), + partialAggTableScanNode.getGroupIdSymbol()); + scanNode.setRegionReplicaSet(regionReplicaSet); + return scanNode; + }); + if (originalAggTableScanNode.getDeviceEntries().size() > i + && originalAggTableScanNode.getDeviceEntries().get(i) != null) { + aggregationTableScanNode.appendDeviceEntry( + originalAggTableScanNode.getDeviceEntries().get(i)); + } + } + } + } + private static OrderingScheme constructOrderingSchema(List symbols) { Map orderings = new HashMap<>(); symbols.forEach(symbol -> orderings.put(symbol, SortOrder.ASC_NULLS_LAST)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java index e3d9e6b3f847..7e5509f64c32 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java @@ -484,6 +484,7 @@ private void getDeviceEntriesWithDataPartitions( if (deviceEntries.isEmpty()) { if (!analysis.hasAggregates()) { + // no device entries, queries(except aggregation) can be finished analysis.setFinishQueryAfterAnalyze(); } analysis.setEmptyDataSource(true); @@ -509,6 +510,7 @@ private void getDeviceEntriesWithDataPartitions( if (dataPartition.getDataPartitionMap().isEmpty()) { if (!analysis.hasAggregates()) { + // no data partitions, queries(except aggregation) can be finished analysis.setFinishQueryAfterAnalyze(); } analysis.setEmptyDataSource(true); From e66de62fbdad6e095092cb1c9ecf099085288e0d Mon Sep 17 00:00:00 2001 From: Beyyes Date: Thu, 24 Oct 2024 23:02:12 +0800 Subject: [PATCH 3/3] fix aggregators --- .../db/queryengine/plan/relational/analyzer/Analysis.java | 5 +++-- .../planner/optimizations/PushAggregationIntoTableScan.java | 2 +- .../planner/optimizations/PushPredicateIntoTableScan.java | 4 ++-- .../optimizations/TransformAggregationToStreamable.java | 2 +- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java index 1b34213f96c4..76b80d842eb3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java @@ -335,8 +335,9 @@ public List getAggregates(QuerySpecification query) { return aggregates.get(NodeRef.of(query)); } - public boolean hasAggregates() { - return !aggregates.isEmpty(); + public boolean noAggregates() { + return aggregates.isEmpty() + || (aggregates.size() == 1 && aggregates.entrySet().iterator().next().getValue().isEmpty()); } public void setOrderByAggregates(OrderBy node, List aggregates) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java index 07b519220d58..18940ea005f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java @@ -56,7 +56,7 @@ public class PushAggregationIntoTableScan implements PlanOptimizer { @Override public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) { if (!(context.getAnalysis().getStatement() instanceof Query) - || !context.getAnalysis().hasAggregates()) { + || context.getAnalysis().noAggregates()) { return plan; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java index 7e5509f64c32..8e9296b7f488 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java @@ -483,7 +483,7 @@ private void getDeviceEntriesWithDataPartitions( .recordPlanCost(TABLE_TYPE, SCHEMA_FETCHER, System.nanoTime() - startTime); if (deviceEntries.isEmpty()) { - if (!analysis.hasAggregates()) { + if (analysis.noAggregates()) { // no device entries, queries(except aggregation) can be finished analysis.setFinishQueryAfterAnalyze(); } @@ -509,7 +509,7 @@ private void getDeviceEntriesWithDataPartitions( } if (dataPartition.getDataPartitionMap().isEmpty()) { - if (!analysis.hasAggregates()) { + if (analysis.noAggregates()) { // no data partitions, queries(except aggregation) can be finished analysis.setFinishQueryAfterAnalyze(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java index 006fbb4c87d0..1ca5e2bbcd23 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java @@ -50,7 +50,7 @@ public class TransformAggregationToStreamable implements PlanOptimizer { @Override public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) { if (!(context.getAnalysis().getStatement() instanceof Query) - || !context.getAnalysis().hasAggregates()) { + || context.getAnalysis().noAggregates()) { return plan; }