diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationFunctionDistinctIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationFunctionDistinctIT.java new file mode 100644 index 000000000000..8f9b5c1eac4c --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationFunctionDistinctIT.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.relational.it.query.recent; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData; +import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest; + +/** + * In this Class, we construct the scenario using DistinctAccumulator for + * AggregationFunctionDistinct. Other cases are covered by {@link IoTDBTableAggregationIT}. + */ +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBTableAggregationFunctionDistinctIT { + + private static final String DATABASE_NAME = "test"; + private static final String[] createSqls = + new String[] { + "CREATE DATABASE " + DATABASE_NAME, + "USE " + DATABASE_NAME, + "CREATE TABLE table1(device_id STRING TAG, s1 INT32 FIELD, s2 INT64 FIELD, s3 FLOAT FIELD, s4 DOUBLE FIELD, s5 BOOLEAN FIELD, s6 TEXT FIELD, s7 STRING FIELD, s8 BLOB FIELD, s9 TIMESTAMP FIELD, s10 DATE FIELD)", + "INSERT INTO table1(time,device_id,s1,s3,s6,s8,s9) values (2024-09-24T06:15:30.000+00:00,'d01',30,30.0,'shanghai_huangpu_red_A_d01_30', X'cafebabe30',2024-09-24T06:15:30.000+00:00)", + "INSERT INTO table1(time,device_id,s2,s3,s4,s6,s7,s9,s10) values (2024-09-24T06:15:35.000+00:00,'d01',35000,35.0,35.0,'shanghai_huangpu_red_A_d01_35','shanghai_huangpu_red_A_d01_35',2024-09-24T06:15:35.000+00:00,'2024-09-24')", + "INSERT INTO table1(time,device_id,s1,s3,s5,s7,s9) values (2024-09-24T06:15:40.000+00:00,'d01',40,40.0,true,'shanghai_huangpu_red_A_d01_40',2024-09-24T06:15:40.000+00:00)", + "INSERT INTO table1(time,device_id,s2,s5,s9,s10) values 
(2024-09-24T06:15:50.000+00:00,'d01',50000,false,2024-09-24T06:15:50.000+00:00,'2024-09-24')", + "INSERT INTO table1(time,device_id,s1,s2,s3,s4,s6,s7,s9,s10) values (2024-09-24T06:15:35.000+00:00,'d01',30,35000,35.0,35.0,'shanghai_huangpu_red_A_d01_35','shanghai_huangpu_red_A_d01_35',2024-09-24T06:15:35.000+00:00,'2024-09-24')", + "FLUSH", + "CLEAR ATTRIBUTE CACHE", + }; + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().getConfig().getCommonConfig().setSortBufferSize(128 * 1024); + EnvFactory.getEnv().getConfig().getCommonConfig().setMaxTsBlockSizeInByte(4 * 1024); + EnvFactory.getEnv().initClusterEnvironment(); + prepareTableData(createSqls); + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void countDistinctTest() { + String[] expectedHeader = + new String[] { + "device_id", + "_col1", + "_col2", + "_col3", + "_col4", + "_col5", + "_col6", + "_col7", + "_col8", + "_col9", + "_col10" + }; + String[] retArray = new String[] {"d01,2,2,3,1,2,2,2,1,4,1,"}; + + tableResultSetEqualTest( + "select device_id, count(distinct s1), count(distinct s2), count(distinct s3), count(distinct s4), count(distinct s5), count(distinct s6), count(distinct s7), count(distinct s8), count(distinct s9), count(distinct s10) from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void countIfDistinctTest() { + String[] expectedHeader = + new String[] { + "device_id", + "_col1", + "_col2", + "_col3", + "_col4", + "_col5", + "_col6", + "_col7", + "_col8", + "_col9", + "_col10" + }; + String[] retArray = new String[] {"d01,0,1,1,1,1,1,1,1,1,1,"}; + tableResultSetEqualTest( + "select device_id, count_if(distinct s1 < 0), count_if(distinct s2 is not null), count_if(distinct s3 is not null), count_if(distinct s4 is not null), count_if(distinct s5 is not null), count_if(distinct s6 is not null), count_if(distinct s7 is 
not null), count_if(distinct s8 is not null), count_if(distinct s9 is not null), count_if(distinct s10 is not null) " + + "from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void avgDistinctTest() { + String[] expectedHeader = new String[] {"device_id", "_col1", "_col2", "_col3", "_col4"}; + String[] retArray = + new String[] { + "d01,35.0,42500.0,35.0,35.0,", + }; + tableResultSetEqualTest( + "select device_id, avg(distinct s1), avg(distinct s2), avg(distinct s3), avg(distinct s4) from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void sumDistinctTest() { + String[] expectedHeader = new String[] {"device_id", "_col1", "_col2", "_col3", "_col4"}; + String[] retArray = new String[] {"d01,70.0,85000.0,105.0,35.0,"}; + tableResultSetEqualTest( + "select device_id, sum(distinct s1), sum(distinct s2), sum(distinct s3), sum(distinct s4) from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void minDistinctTest() { + String[] expectedHeader = + new String[] { + "device_id", + "_col1", + "_col2", + "_col3", + "_col4", + "_col5", + "_col6", + "_col7", + "_col8", + "_col9", + "_col10", + "_col11" + }; + String[] retArray = + new String[] { + "d01,d01,30,35000,30.0,35.0,false,shanghai_huangpu_red_A_d01_30,shanghai_huangpu_red_A_d01_35,0xcafebabe30,2024-09-24T06:15:30.000Z,2024-09-24," + }; + tableResultSetEqualTest( + "select device_id, min(distinct device_id), min(distinct s1), min(distinct s2), min(distinct s3), min(distinct s4), min(distinct s5), min(distinct s6), min(distinct s7), min(distinct s8), min(distinct s9), min(distinct s10) from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void minByDistinctTest() { + String[] expectedHeader = + new String[] { + "device_id", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8" + }; + String[] 
retArray = + new String[] { + "d01,2024-09-24T06:15:30.000Z,2024-09-24T06:15:35.000Z,2024-09-24T06:15:30.000Z,2024-09-24T06:15:35.000Z,2024-09-24T06:15:50.000Z,2024-09-24T06:15:30.000Z,2024-09-24T06:15:30.000Z,2024-09-24T06:15:35.000Z,", + }; + + tableResultSetEqualTest( + "select device_id, min_by(distinct time, s1), min_by(distinct time, s2), min_by(distinct time, s3), min_by(distinct time, s4), min_by(distinct time, s5), min_by(distinct time, s6), min_by(distinct time, s9), min_by(distinct time, s10) " + + "from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void maxDistinctTest() { + String[] expectedHeader = + new String[] { + "device_id", + "_col1", + "_col2", + "_col3", + "_col4", + "_col5", + "_col6", + "_col7", + "_col8", + "_col9", + "_col10", + "_col11" + }; + String[] retArray = + new String[] { + "d01,d01,40,50000,40.0,35.0,true,shanghai_huangpu_red_A_d01_35,shanghai_huangpu_red_A_d01_40,0xcafebabe30,2024-09-24T06:15:50.000Z,2024-09-24," + }; + tableResultSetEqualTest( + "select device_id,max(distinct device_id), max(distinct s1), max(distinct s2), max(distinct s3), max(distinct s4), max(distinct s5), max(distinct s6), max(distinct s7), max(distinct s8), max(distinct s9), max(distinct s10) from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void maxByDistinctTest() { + String[] expectedHeader = + new String[] { + "device_id", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8" + }; + String[] retArray = + new String[] { + "d01,2024-09-24T06:15:40.000Z,2024-09-24T06:15:50.000Z,2024-09-24T06:15:40.000Z,2024-09-24T06:15:35.000Z,2024-09-24T06:15:40.000Z,2024-09-24T06:15:35.000Z,2024-09-24T06:15:50.000Z,2024-09-24T06:15:35.000Z,", + }; + + tableResultSetEqualTest( + "select device_id, max_by(distinct time, s1), max_by(distinct time, s2), max_by(distinct time, s3), max_by(distinct time, s4), max_by(distinct time, s5), 
max_by(distinct time, s6), max_by(distinct time, s9), max_by(distinct time, s10) " + + "from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void firstDistinctTest() { + String[] expectedHeader = + new String[] { + "device_id", + "_col1", + "_col2", + "_col3", + "_col4", + "_col5", + "_col6", + "_col7", + "_col8", + "_col9", + "_col10" + }; + String[] retArray = + new String[] { + "d01,30,35000,30.0,35.0,true,shanghai_huangpu_red_A_d01_30,shanghai_huangpu_red_A_d01_35,0xcafebabe30,2024-09-24T06:15:30.000Z,2024-09-24," + }; + tableResultSetEqualTest( + "select device_id, first(distinct s1), first(distinct s2), first(distinct s3), first(distinct s4), first(distinct s5), first(distinct s6), first(distinct s7), first(distinct s8), first(distinct s9), first(distinct s10) " + + "from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void firstByDistinctTest() { + String[] expectedHeader = + new String[] { + "device_id", + "_col1", + "_col2", + "_col3", + "_col4", + "_col5", + "_col6", + "_col7", + "_col8", + "_col9", + "_col10" + }; + String[] retArray = + new String[] { + "d01,2024-09-24T06:15:30.000Z,2024-09-24T06:15:35.000Z,2024-09-24T06:15:30.000Z,2024-09-24T06:15:35.000Z,2024-09-24T06:15:40.000Z,2024-09-24T06:15:30.000Z,2024-09-24T06:15:35.000Z,2024-09-24T06:15:30.000Z,2024-09-24T06:15:30.000Z,2024-09-24T06:15:35.000Z," + }; + tableResultSetEqualTest( + "select device_id, first_by(distinct time, s1), first_by(distinct time, s2), first_by(distinct time, s3), first_by(distinct time, s4), first_by(distinct time, s5), first_by(distinct time, s6), first_by(distinct time, s7), first_by(distinct time, s8), first_by(distinct time, s9), first_by(distinct time, s10) " + + "from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void lastDistinctTest() { + String[] expectedHeader = + new String[] { + "device_id", + 
"_col1", + "_col2", + "_col3", + "_col4", + "_col5", + "_col6", + "_col7", + "_col8", + "_col9", + "_col10" + }; + String[] retArray = + new String[] { + "d01,40,50000,40.0,35.0,false,shanghai_huangpu_red_A_d01_35,shanghai_huangpu_red_A_d01_40,0xcafebabe30,2024-09-24T06:15:50.000Z,2024-09-24," + }; + tableResultSetEqualTest( + "select device_id, last(distinct s1), last(distinct s2), last(distinct s3), last(distinct s4), last(distinct s5), last(distinct s6), last(distinct s7), last(distinct s8), last(distinct s9), last(distinct s10) " + + "from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void lastByDistinctTest() { + String[] expectedHeader = + new String[] { + "device_id", + "_col1", + "_col2", + "_col3", + "_col4", + "_col5", + "_col6", + "_col7", + "_col8", + "_col9", + "_col10" + }; + String[] retArray = + new String[] { + "d01,2024-09-24T06:15:40.000Z,2024-09-24T06:15:50.000Z,2024-09-24T06:15:40.000Z,2024-09-24T06:15:35.000Z,2024-09-24T06:15:50.000Z,2024-09-24T06:15:35.000Z,2024-09-24T06:15:40.000Z,2024-09-24T06:15:30.000Z,2024-09-24T06:15:50.000Z,2024-09-24T06:15:50.000Z," + }; + tableResultSetEqualTest( + "select device_id, last_by(distinct time, s1), last_by(distinct time, s2), last_by(distinct time, s3), last_by(distinct time, s4), last_by(distinct time, s5), last_by(distinct time, s6), last_by(distinct time, s7), last_by(distinct time, s8), last_by(distinct time, s9), last_by(distinct time, s10) " + + "from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void extremeDistinctTest() { + String[] expectedHeader = new String[] {"device_id", "_col1", "_col2", "_col3", "_col4"}; + String[] retArray = new String[] {"d01,40,50000,40.0,35.0,"}; + tableResultSetEqualTest( + "select device_id, extreme(distinct s1), extreme(distinct s2), extreme(distinct s3), extreme(distinct s4) from table1 group by 1 order by 1", + expectedHeader, + retArray, +
DATABASE_NAME); + } + + @Test + public void varianceDistinctTest() { + String[] expectedHeader = new String[] {"device_id", "_col1", "_col2", "_col3", "_col4"}; + String[] retArray = new String[] {"d01,25.0,5.625E7,16.7,0.0,"}; + tableResultSetEqualTest( + "select device_id, round(VAR_POP(distinct s1),1), round(VAR_POP(distinct s2),1), round(VAR_POP(distinct s3),1), round(VAR_POP(distinct s4),1) from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java index 25dc1e29f57b..0e1157b92384 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java @@ -4220,6 +4220,7 @@ public void exceptionTest1() { // ================================================================== @Test public void countDistinctTest() { + // test MarkDistinct String[] expectedHeader = new String[] { "_col0", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9", @@ -4265,10 +4266,30 @@ public void countDistinctTest() { expectedHeader, retArray, DATABASE_NAME); + + // test GroupByDistinctAccumulator + expectedHeader = + new String[] { + "region", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9", + "_col10" + }; + retArray = + new String[] { + "chaoyang,5,5,5,5,2,6,8,5,10,1,", + "haidian,5,5,5,5,2,6,8,5,10,1,", + "huangpu,5,5,5,5,2,6,8,5,10,1,", + "pudong,5,5,5,5,2,6,8,5,10,1," + }; + tableResultSetEqualTest( + "select region, count(distinct s1), count(distinct s2), count(distinct s3), count(distinct s4), count(distinct s5), count(distinct s6), count(distinct s7), count(distinct s8), count(distinct s9), count(distinct s10) from 
table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); } @Test public void countIfDistinctTest() { + // test MarkDistinct String[] expectedHeader = new String[] { "_col0", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9", @@ -4315,10 +4336,31 @@ public void countIfDistinctTest() { expectedHeader, retArray, DATABASE_NAME); + + // test GroupByDistinctAccumulator + expectedHeader = + new String[] { + "region", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9", + "_col10" + }; + retArray = + new String[] { + "chaoyang,0,1,1,1,1,1,1,1,1,1,", + "haidian,0,1,1,1,1,1,1,1,1,1,", + "huangpu,0,1,1,1,1,1,1,1,1,1,", + "pudong,0,1,1,1,1,1,1,1,1,1," + }; + tableResultSetEqualTest( + "select region, count_if(distinct s1 < 0), count_if(distinct s2 is not null), count_if(distinct s3 is not null), count_if(distinct s4 is not null), count_if(distinct s5 is not null), count_if(distinct s6 is not null), count_if(distinct s7 is not null), count_if(distinct s8 is not null), count_if(distinct s9 is not null), count_if(distinct s10 is not null) " + + "from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); } @Test public void avgDistinctTest() { + // test MarkDistinct String[] expectedHeader = new String[] {"_col0", "_col1", "_col2", "_col3"}; String[] retArray = new String[] { @@ -4346,10 +4388,26 @@ public void avgDistinctTest() { expectedHeader, retArray, DATABASE_NAME); + + // test GroupByDistinctAccumulator + expectedHeader = new String[] {"region", "_col1", "_col2", "_col3", "_col4"}; + retArray = + new String[] { + "chaoyang,40.4,40400.0,39.4,42.4,", + "haidian,40.4,40400.0,39.4,42.4,", + "huangpu,40.4,40400.0,39.4,42.4,", + "pudong,40.4,40400.0,39.4,42.4," + }; + tableResultSetEqualTest( + "select region, avg(distinct s1), avg(distinct s2), avg(distinct s3), avg(distinct s4) from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); } @Test public 
void sumDistinctTest() { + // test MarkDistinct String[] expectedHeader = new String[] {"_col0", "_col1", "_col2", "_col3"}; String[] retArray = new String[] { @@ -4377,10 +4435,26 @@ public void sumDistinctTest() { expectedHeader, retArray, DATABASE_NAME); + + // test GroupByDistinctAccumulator + expectedHeader = new String[] {"region", "_col1", "_col2", "_col3", "_col4"}; + retArray = + new String[] { + "chaoyang,202.0,202000.0,197.0,212.0,", + "haidian,202.0,202000.0,197.0,212.0,", + "huangpu,202.0,202000.0,197.0,212.0,", + "pudong,202.0,202000.0,197.0,212.0," + }; + tableResultSetEqualTest( + "select region, sum(distinct s1), sum(distinct s2), sum(distinct s3), sum(distinct s4) from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); } @Test public void minDistinctTest() { + // test MarkDistinct String[] expectedHeader = new String[] { "_col0", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9", @@ -4427,17 +4501,36 @@ public void minDistinctTest() { expectedHeader, retArray, DATABASE_NAME); + + // test GroupByDistinctAccumulator + expectedHeader = + new String[] { + "region", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9", + "_col10", "_col11" + }; + retArray = + new String[] { + "chaoyang,d09,30,31000,30.0,35.0,false,beijing_chaoyang_red_A_d09_30,beijing_chaoyang_red_A_d09_35,0xcafebabe30,2024-09-24T06:15:30.000Z,2024-09-24,", + "haidian,d13,30,31000,30.0,35.0,false,beijing_haidian_red_A_d13_30,beijing_haidian_red_A_d13_35,0xcafebabe30,2024-09-24T06:15:30.000Z,2024-09-24,", + "huangpu,d01,30,31000,30.0,35.0,false,shanghai_huangpu_red_A_d01_30,shanghai_huangpu_red_A_d01_35,0xcafebabe30,2024-09-24T06:15:30.000Z,2024-09-24,", + "pudong,d05,30,31000,30.0,35.0,false,shanghai_pudong_red_A_d05_30,shanghai_pudong_red_A_d05_35,0xcafebabe30,2024-09-24T06:15:30.000Z,2024-09-24," + }; + tableResultSetEqualTest( + "select region, min(distinct device_id), min(distinct s1), min(distinct 
s2), min(distinct s3), min(distinct s4), min(distinct s5), min(distinct s6), min(distinct s7), min(distinct s8), min(distinct s9), min(distinct s10) from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); } @Test public void minByDistinctTest() { + // test MarkDistinct String[] expectedHeader = new String[] {"_col0", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7"}; String[] retArray = new String[] { "2024-09-24T06:15:36.000Z,2024-09-24T06:15:31.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:36.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:31.000Z,2024-09-24T06:15:36.000Z,", }; - // global Aggregation tableResultSetEqualTest( "select min_by(distinct time, s1), min_by(distinct time, s2), min_by(distinct time, s3), min_by(distinct time, s4), min_by(distinct time, s5), min_by(distinct time, s6), min_by(distinct time, s9), min_by(distinct time, s10) " + "from table1 where device_id='d11'", @@ -4454,7 +4547,8 @@ public void minByDistinctTest() { "d03,2024-09-24T06:15:36.000Z,2024-09-24T06:15:31.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:36.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:31.000Z,2024-09-24T06:15:36.000Z,", "d11,2024-09-24T06:15:36.000Z,2024-09-24T06:15:31.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:36.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:31.000Z,2024-09-24T06:15:36.000Z," }; - // group by Aggregation + + // test GroupByDistinctAccumulator tableResultSetEqualTest( "select device_id, min_by(distinct time, s1), min_by(distinct time, s2), min_by(distinct time, s3), min_by(distinct time, s4), min_by(distinct time, s5), min_by(distinct time, s6), min_by(distinct time, s9), min_by(distinct time, s10) " + "from table1 where device_id='d11' or device_id='d03' group by 1 order by 1", @@ -4465,6 +4559,7 @@ public void minByDistinctTest() { @Test public void maxDistinctTest() { + // test MarkDistinct String[] expectedHeader = new 
String[] { "_col0", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9", @@ -4511,17 +4606,36 @@ public void maxDistinctTest() { expectedHeader, retArray, DATABASE_NAME); + + // test GroupByDistinctAccumulator + expectedHeader = + new String[] { + "region", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9", + "_col10", "_col11" + }; + retArray = + new String[] { + "chaoyang,d12,55,50000,51.0,55.0,true,beijing_chaoyang_yellow_B_d12_55,beijing_chaoyang_yellow_B_d12_30,0xcafebabe55,2024-09-24T06:15:55.000Z,2024-09-24,", + "haidian,d16,55,50000,51.0,55.0,true,beijing_haidian_yellow_B_d16_55,beijing_haidian_yellow_B_d16_30,0xcafebabe55,2024-09-24T06:15:55.000Z,2024-09-24,", + "huangpu,d04,55,50000,51.0,55.0,true,shanghai_huangpu_yellow_B_d04_55,shanghai_huangpu_yellow_B_d04_30,0xcafebabe55,2024-09-24T06:15:55.000Z,2024-09-24,", + "pudong,d08,55,50000,51.0,55.0,true,shanghai_pudong_yellow_B_d08_55,shanghai_pudong_yellow_B_d08_30,0xcafebabe55,2024-09-24T06:15:55.000Z,2024-09-24," + }; + tableResultSetEqualTest( + "select region,max(distinct device_id), max(distinct s1), max(distinct s2), max(distinct s3), max(distinct s4), max(distinct s5), max(distinct s6), max(distinct s7), max(distinct s8), max(distinct s9), max(distinct s10) from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); } @Test public void maxByDistinctTest() { + // test MarkDistinct String[] expectedHeader = new String[] {"_col0", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7"}; String[] retArray = new String[] { "2024-09-24T06:15:41.000Z,2024-09-24T06:15:46.000Z,2024-09-24T06:15:51.000Z,2024-09-24T06:15:46.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:51.000Z,2024-09-24T06:15:51.000Z,2024-09-24T06:15:36.000Z,", }; - // global Aggregation tableResultSetEqualTest( "select max_by(distinct time, s1), max_by(distinct time, s2), max_by(distinct time, s3), max_by(distinct time, s4), max_by(distinct time, s5), 
max_by(distinct time, s6), max_by(distinct time, s9), max_by(distinct time, s10) " + "from table1 where device_id='d11'", @@ -4538,7 +4652,8 @@ public void maxByDistinctTest() { "d03,2024-09-24T06:15:41.000Z,2024-09-24T06:15:46.000Z,2024-09-24T06:15:51.000Z,2024-09-24T06:15:46.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:51.000Z,2024-09-24T06:15:51.000Z,2024-09-24T06:15:36.000Z,", "d11,2024-09-24T06:15:41.000Z,2024-09-24T06:15:46.000Z,2024-09-24T06:15:51.000Z,2024-09-24T06:15:46.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:51.000Z,2024-09-24T06:15:51.000Z,2024-09-24T06:15:36.000Z," }; - // group by Aggregation + + // test GroupByDistinctAccumulator tableResultSetEqualTest( "select device_id, max_by(distinct time, s1), max_by(distinct time, s2), max_by(distinct time, s3), max_by(distinct time, s4), max_by(distinct time, s5), max_by(distinct time, s6), max_by(distinct time, s9), max_by(distinct time, s10) " + "from table1 where device_id='d11' or device_id='d03' group by 1 order by 1", @@ -4549,6 +4664,7 @@ public void maxByDistinctTest() { @Test public void firstDistinctTest() { + // test MarkDistinct String[] expectedHeader = new String[] { "_col0", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9" @@ -4593,10 +4709,29 @@ public void firstDistinctTest() { expectedHeader, retArray, DATABASE_NAME); + + // test GroupByDistinctAccumulator + expectedHeader = + new String[] { + "region", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9", + "_col10" + }; + retArray = + new String[] { + "chaoyang,36,31000,41.0,36.0,false,beijing_chaoyang_yellow_A_d11_41,beijing_chaoyang_yellow_A_d11_36,0xcafebabe31,2024-09-24T06:15:31.000Z,2024-09-24,", + "huangpu,36,31000,41.0,36.0,false,shanghai_huangpu_yellow_A_d03_41,shanghai_huangpu_yellow_A_d03_36,0xcafebabe31,2024-09-24T06:15:31.000Z,2024-09-24," + }; + tableResultSetEqualTest( + "select region, first(distinct s1), first(distinct s2), first(distinct s3), first(distinct 
s4), first(distinct s5), first(distinct s6), first(distinct s7), first(distinct s8), first(distinct s9), first(distinct s10) " + + "from table1 where device_id='d11' or device_id='d03' group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); } @Test public void firstByDistinctTest() { + // test MarkDistinct String[] expectedHeader = new String[] { "_col0", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9" @@ -4641,10 +4776,29 @@ public void firstByDistinctTest() { expectedHeader, retArray, DATABASE_NAME); + + // test GroupByDistinctAccumulator + expectedHeader = + new String[] { + "region", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9", + "_col10" + }; + retArray = + new String[] { + "chaoyang,2024-09-24T06:15:36.000Z,2024-09-24T06:15:31.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:36.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:36.000Z,2024-09-24T06:15:31.000Z,2024-09-24T06:15:31.000Z,2024-09-24T06:15:36.000Z,", + "huangpu,2024-09-24T06:15:36.000Z,2024-09-24T06:15:31.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:36.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:36.000Z,2024-09-24T06:15:31.000Z,2024-09-24T06:15:31.000Z,2024-09-24T06:15:36.000Z,", + }; + tableResultSetEqualTest( + "select region, first_by(distinct time, s1), first_by(distinct time, s2), first_by(distinct time, s3), first_by(distinct time, s4), first_by(distinct time, s5), first_by(distinct time, s6), first_by(distinct time, s7), first_by(distinct time, s8), first_by(distinct time, s9), first_by(distinct time, s10) " + + "from table1 where device_id='d11' or device_id='d03' group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); } @Test public void lastDistinctTest() { + // test MarkDistinct String[] expectedHeader = new String[] { "_col0", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9" @@ -4689,10 +4843,29 @@ public 
void lastDistinctTest() { expectedHeader, retArray, DATABASE_NAME); + + // test GroupByDistinctAccumulator + expectedHeader = + new String[] { + "region", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9", + "_col10" + }; + retArray = + new String[] { + "chaoyang,41,46000,51.0,46.0,false,beijing_chaoyang_yellow_A_d11_51,beijing_chaoyang_yellow_A_d11_46,0xcafebabe41,2024-09-24T06:15:51.000Z,2024-09-24,", + "huangpu,41,46000,51.0,46.0,false,shanghai_huangpu_yellow_A_d03_51,shanghai_huangpu_yellow_A_d03_46,0xcafebabe41,2024-09-24T06:15:51.000Z,2024-09-24," + }; + tableResultSetEqualTest( + "select region, last(distinct s1), last(distinct s2), last(distinct s3), last(distinct s4), last(distinct s5), last(distinct s6), last(distinct s7), last(distinct s8), last(distinct s9), last(distinct s10) " + + "from table1 where device_id='d11' or device_id='d03' group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); } @Test public void lastByDistinctTest() { + // test MarkDistinct String[] expectedHeader = new String[] { "_col0", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9" @@ -4737,10 +4910,29 @@ public void lastByDistinctTest() { expectedHeader, retArray, DATABASE_NAME); + + // test GroupByDistinctAccumulator + expectedHeader = + new String[] { + "region", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9", + "_col10" + }; + retArray = + new String[] { + "chaoyang,2024-09-24T06:15:41.000Z,2024-09-24T06:15:46.000Z,2024-09-24T06:15:51.000Z,2024-09-24T06:15:46.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:51.000Z,2024-09-24T06:15:46.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:51.000Z,2024-09-24T06:15:36.000Z,", + 
"huangpu,2024-09-24T06:15:41.000Z,2024-09-24T06:15:46.000Z,2024-09-24T06:15:51.000Z,2024-09-24T06:15:46.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:51.000Z,2024-09-24T06:15:46.000Z,2024-09-24T06:15:41.000Z,2024-09-24T06:15:51.000Z,2024-09-24T06:15:36.000Z,", + }; + tableResultSetEqualTest( + "select region, last_by(distinct time, s1), last_by(distinct time, s2), last_by(distinct time, s3), last_by(distinct time, s4), last_by(distinct time, s5), last_by(distinct time, s6), last_by(distinct time, s7), last_by(distinct time, s8), last_by(distinct time, s9), first_by(distinct time, s10) " + + "from table1 where device_id='d11' or device_id='d03' group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); } @Test public void extremeDistinctTest() { + // test MarkDistinct String[] expectedHeader = new String[] {"_col0", "_col1", "_col2", "_col3"}; String[] retArray = new String[] { @@ -4768,6 +4960,21 @@ public void extremeDistinctTest() { expectedHeader, retArray, DATABASE_NAME); + + // test GroupByDistinctAccumulator + expectedHeader = new String[] {"region", "_col1", "_col2", "_col3", "_col4"}; + retArray = + new String[] { + "chaoyang,55,50000,51.0,55.0,", + "haidian,55,50000,51.0,55.0,", + "huangpu,55,50000,51.0,55.0,", + "pudong,55,50000,51.0,55.0," + }; + tableResultSetEqualTest( + "select region, extreme(distinct s1), extreme(distinct s2), extreme(distinct s3), extreme(distinct s4) from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); } @Test @@ -4778,7 +4985,6 @@ public void varianceDistinctTest() { new String[] { "68.2,4.824E7,49.0,54.6,", }; - // global Aggregation tableResultSetEqualTest( "select round(VAR_POP(distinct s1),1), round(VAR_POP(distinct s2),1), round(VAR_POP(distinct s3),1), round(VAR_POP(distinct s4),1) from table1", expectedHeader, @@ -4794,12 +5000,26 @@ public void varianceDistinctTest() { "shanghai,shanghai,huangpu,68.2,4.824E7,49.0,54.6,", "shanghai,shanghai,pudong,68.2,4.824E7,49.0,54.6," }; - 
// group by Aggregation tableResultSetEqualTest( "select province,city,region, round(VAR_POP(distinct s1),1), round(VAR_POP(distinct s2),1), round(VAR_POP(distinct s3),1), round(VAR_POP(distinct s4),1) from table1 group by 1,2,3 order by 1,2,3", expectedHeader, retArray, DATABASE_NAME); + + // test GroupByDistinctAccumulator + expectedHeader = new String[] {"region", "_col1", "_col2", "_col3", "_col4"}; + retArray = + new String[] { + "chaoyang,68.2,4.824E7,49.0,54.6,", + "haidian,68.2,4.824E7,49.0,54.6,", + "huangpu,68.2,4.824E7,49.0,54.6,", + "pudong,68.2,4.824E7,49.0,54.6," + }; + tableResultSetEqualTest( + "select region, round(VAR_POP(distinct s1),1), round(VAR_POP(distinct s2),1), round(VAR_POP(distinct s3),1), round(VAR_POP(distinct s4),1) from table1 group by 1 order by 1", + expectedHeader, + retArray, + DATABASE_NAME); } @Test @@ -4843,6 +5063,35 @@ public void mixedTest() { DATABASE_NAME); } + @Test + public void singleInputDistinctAggregationTest() { + String[] expectedHeader = new String[] {"_col0", "_col1"}; + String[] retArray = + new String[] { + "5,40.4,", + }; + + tableResultSetEqualTest( + "select count(distinct s1), avg(distinct s1) from table1", + expectedHeader, + retArray, + DATABASE_NAME); + + expectedHeader = new String[] {"province", "city", "region", "_col3", "_col4"}; + retArray = + new String[] { + "beijing,beijing,chaoyang,5,40.4,", + "beijing,beijing,haidian,5,40.4,", + "shanghai,shanghai,huangpu,5,40.4,", + "shanghai,shanghai,pudong,5,40.4," + }; + tableResultSetEqualTest( + "select province,city,region,count(distinct s1), avg(distinct s1) from table1 group by 1,2,3 order by 1,2,3", + expectedHeader, + retArray, + DATABASE_NAME); + } + @Test public void exceptionTest2() { tableAssertTestFail( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MarkDistinctOperator.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MarkDistinctOperator.java new file mode 100644 index 000000000000..4794f97c885d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MarkDistinctOperator.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.UpdateMemory; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.MarkDistinctHash; +import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager; + +import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.type.Type; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.List; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.CURRENT_USED_MEMORY; + +public class MarkDistinctOperator implements ProcessOperator { + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(MarkDistinctOperator.class); + private final OperatorContext operatorContext; + + private final Operator child; + + private final MarkDistinctHash markDistinctHash; + private final int[] markDistinctChannels; + + private final MemoryReservationManager memoryReservationManager; + // memory already occupied by MarkDistinctHash + private long previousRetainedSize = 0; + + private final int maxResultLines = + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(); + + public MarkDistinctOperator( + 
OperatorContext operatorContext, + Operator child, + List<Type> types, + List<Integer> markDistinctChannels, + Optional<Integer> hashChannel) { + this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); + this.child = requireNonNull(child, "child is null"); + + requireNonNull(hashChannel, "hashChannel is null"); + requireNonNull(markDistinctChannels, "markDistinctChannels is null"); + + ImmutableList.Builder<Type> distinctTypes = ImmutableList.builder(); + for (int channel : markDistinctChannels) { + distinctTypes.add(types.get(channel)); + } + if (hashChannel.isPresent()) { + this.markDistinctChannels = new int[markDistinctChannels.size() + 1]; + for (int i = 0; i < markDistinctChannels.size(); i++) { + this.markDistinctChannels[i] = markDistinctChannels.get(i); + } + this.markDistinctChannels[markDistinctChannels.size()] = hashChannel.get(); + } else { + this.markDistinctChannels = Ints.toArray(markDistinctChannels); + } + + this.markDistinctHash = + new MarkDistinctHash(distinctTypes.build(), hashChannel.isPresent(), UpdateMemory.NOOP); + this.memoryReservationManager = + operatorContext + .getDriverContext() + .getFragmentInstanceContext() + .getMemoryReservationContext(); + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public ListenableFuture<?> isBlocked() { + return child.isBlocked(); + } + + @Override + public TsBlock next() throws Exception { + TsBlock input = child.nextWithTimer(); + if (input == null) { + return null; + } + + Column[] markColumn = + new Column[] {markDistinctHash.markDistinctRows(input.getColumns(markDistinctChannels))}; + updateOccupiedMemorySize(); + return input.appendValueColumns(markColumn); + } + + private void updateOccupiedMemorySize() { + long memorySize = markDistinctHash.getEstimatedSize(); + operatorContext.recordSpecifiedInfo(CURRENT_USED_MEMORY, Long.toString(memorySize)); + long delta = memorySize - previousRetainedSize; + if (delta > 0) { + memoryReservationManager.reserveMemoryCumulatively(delta); 
+ } else if (delta < 0) { + memoryReservationManager.releaseMemoryCumulatively(-delta); + } + previousRetainedSize = memorySize; + } + + @Override + public boolean hasNext() throws Exception { + return child.hasNextWithTimer(); + } + + @Override + public void close() throws Exception { + child.close(); + } + + @Override + public boolean isFinished() throws Exception { + return child.isFinished(); + } + + @Override + public long calculateMaxPeekMemory() { + return child.calculateMaxPeekMemory() + maxResultLines; + } + + @Override + public long calculateMaxReturnSize() { + // all positions are non-null, so size of each position in the mask column is 1 + return child.calculateMaxReturnSize() + maxResultLines; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return child.calculateRetainedSizeAfterCallingNext(); + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(child) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableAggregator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableAggregator.java index f4e6472c9781..ea4095e418ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableAggregator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableAggregator.java @@ -76,6 +76,10 @@ public void processBlock(TsBlock block) { // Use select-all AggregationMask here because filter of Agg-Function is not supported now AggregationMask mask = AggregationMask.createSelectAll(block.getPositionCount()); + if (maskChannel.isPresent()) { + 
mask.applyMaskBlock(block.getColumn(maskChannel.getAsInt())); + } + accumulator.addInput(arguments, mask); } else { accumulator.addIntermediate(arguments[0]); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/MarkDistinctHash.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/MarkDistinctHash.java index ef8a12fea31b..7001bdf056e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/MarkDistinctHash.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/MarkDistinctHash.java @@ -45,7 +45,7 @@ public MarkDistinctHash(List types, boolean hasPrecomputedHash, UpdateMemo } public long getEstimatedSize() { - return groupByHash.getEstimatedSize(); + return groupByHash.getEstimatedSize() + Long.BYTES; } public Column markDistinctRows(Column[] columns) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index e94136cf74c6..967345567b92 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -82,6 +82,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.source.relational.DefaultAggTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.InformationSchemaTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.LastQueryAggTableScanOperator; +import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.MarkDistinctOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.MergeSortFullOuterJoinOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.MergeSortInnerJoinOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.MergeSortSemiJoinOperator; @@ -140,6 +141,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; @@ -1764,12 +1766,17 @@ private TableAggregator buildAggregator( timeColumnName, aggregation.isDistinct()); + OptionalInt maskChannel = OptionalInt.empty(); + if (aggregation.hasMask()) { + maskChannel = OptionalInt.of(childLayout.get(aggregation.getMask().get())); + } + return new TableAggregator( accumulator, step, getTSDataType(typeProvider.getTableModelType(symbol)), argumentChannels, - OptionalInt.empty()); + maskChannel); } private Operator planGroupByAggregation( @@ -1940,12 +1947,17 @@ private GroupedAggregator buildGroupByAggregator( true, aggregation.isDistinct()); + OptionalInt maskChannel = OptionalInt.empty(); + if (aggregation.hasMask()) { + maskChannel = OptionalInt.of(childLayout.get(aggregation.getMask().get())); + } + return new GroupedAggregator( accumulator, step, getTSDataType(typeProvider.getTableModelType(symbol)), argumentChannels, - OptionalInt.empty()); + maskChannel); } @Override @@ -2418,4 +2430,29 @@ private boolean canUseLastCacheOptimize( return true; } + + 
@Override + public Operator visitMarkDistinct(MarkDistinctNode node, LocalExecutionPlanContext context) { + Operator child = node.getChild().accept(this, context); + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + MarkDistinctOperator.class.getSimpleName()); + + TypeProvider typeProvider = context.getTypeProvider(); + Map<Symbol, Integer> childLayout = + makeLayoutFromOutputSymbols(node.getChild().getOutputSymbols()); + + return new MarkDistinctOperator( + operatorContext, + child, + node.getChild().getOutputSymbols().stream() + .map(typeProvider::getTableModelType) + .collect(Collectors.toList()), + node.getDistinctSymbols().stream().map(childLayout::get).collect(Collectors.toList()), + Optional.empty()); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index 442d59f8be33..ea7c315504b9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -65,12 +65,14 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.IntoPathDescriptor; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode; import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; @@ -684,8 +686,18 @@ public List visitAggregation( int i = 0; for (org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Aggregation aggregation : node.getAggregations().values()) { - boxValue.add( - String.format("Aggregator-%d: %s", i++, aggregation.getResolvedFunction().toString())); + StringBuilder aggregator = + new StringBuilder( + String.format( + "Aggregator-%d: %s", i++, aggregation.getResolvedFunction().toString())); + if (aggregation.hasMask()) { + aggregator.append(String.format(" mask: %s", aggregation.getMask().get())); + } + + if (aggregation.isDistinct()) { + aggregator.append(" distinct: true"); + } + boxValue.add(aggregator.toString()); } boxValue.add(String.format("GroupingKeys: %s", node.getGroupingKeys())); if (node.isStreamable()) { @@ -707,8 +719,18 @@ public List visitAggregationTableScan( int i = 0; for (org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Aggregation aggregation : node.getAggregations().values()) { - boxValue.add( - String.format("Aggregator-%d: %s", i++, aggregation.getResolvedFunction().toString())); + StringBuilder aggregator = + new StringBuilder( + String.format( + "Aggregator-%d: %s", i++, aggregation.getResolvedFunction().toString())); + if (aggregation.hasMask()) { + aggregator.append(String.format(" mask: %s", aggregation.getMask().get())); + } + + if (aggregation.isDistinct()) { + aggregator.append(" 
distinct: true"); + } + boxValue.add(aggregator.toString()); } boxValue.add(String.format("GroupingKeys: %s", node.getGroupingKeys())); if (node.isStreamable()) { @@ -749,6 +771,17 @@ public List visitAggregationTableScan( return render(node, boxValue, context); } + @Override + public List visitMarkDistinct(MarkDistinctNode node, GraphContext context) { + List boxValue = new ArrayList<>(); + boxValue.add(String.format("MarkDistinct-%s", node.getPlanNodeId())); + boxValue.add(String.format("MarkerSymbol-%s", node.getMarkerSymbol())); + boxValue.add(String.format("DistinctSymbols-%s", node.getDistinctSymbols())); + Optional hashSymbol = node.getHashSymbol(); + hashSymbol.ifPresent(symbol -> boxValue.add(String.format("HashSymbol-%s", symbol))); + return render(node, boxValue, context); + } + @Override public List visitFilter( org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode node, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 3e86d189e1ec..d734ac7f858f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -121,6 +121,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeAlignedDeviceViewScanNode; @@ -290,6 +291,7 @@ public enum PlanNodeType { TREE_ALIGNED_DEVICE_VIEW_SCAN_NODE((short) 1023), TREE_NONALIGNED_DEVICE_VIEW_SCAN_NODE((short) 1024), TABLE_SEMI_JOIN_NODE((short) 1025), + MARK_DISTINCT_NODE((short) 1026), RELATIONAL_INSERT_TABLET((short) 2000), RELATIONAL_INSERT_ROW((short) 2001), @@ -657,6 +659,8 @@ public static PlanNode deserialize(ByteBuffer buffer, short nodeType) { return TreeNonAlignedDeviceViewScanNode.deserialize(buffer); case 1025: return SemiJoinNode.deserialize(buffer); + case 1026: + return MarkDistinctNode.deserialize(buffer); case 2000: return RelationalInsertTabletNode.deserialize(buffer); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index d04c57e77fbd..8660e6145407 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -126,6 +126,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; @@ -795,4 +796,8 @@ public R visitTreeAlignedDeviceViewScan(TreeAlignedDeviceViewScanNode node, C co public R 
visitTreeNonAlignedDeviceViewScan(TreeNonAlignedDeviceViewScanNode node, C context) { return visitTreeDeviceViewScan(node, context); } + + public R visitMarkDistinct(MarkDistinctNode node, C context) { + return visitSingleChildProcess(node, context); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index a90c07eaaf85..fb479eb44748 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -53,6 +53,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; @@ -701,8 +702,10 @@ public List visitAggregation(AggregationNode node, PlanContext context } // We cannot do multi-stage Aggregate if any aggregation-function is distinct. + // For Aggregation with mask, there is no need to do multi-stage Aggregate because the + // MarkDistinctNode will merge all data from different child. 
if (node.getAggregations().values().stream() - .anyMatch(AggregationNode.Aggregation::isDistinct)) { + .anyMatch(aggregation -> aggregation.isDistinct() || aggregation.hasMask())) { node.setChild( mergeChildrenViaCollectOrMergeSort( nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId()), childrenNodes)); @@ -834,6 +837,18 @@ public List visitEnforceSingleRow(EnforceSingleRowNode node, PlanConte return Collections.singletonList(node); } + @Override + public List visitMarkDistinct(MarkDistinctNode node, PlanContext context) { + List childrenNodes = node.getChild().accept(this, context); + OrderingScheme childOrdering = nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId()); + if (childOrdering != null) { + nodeOrderingMap.put(node.getPlanNodeId(), childOrdering); + } + + node.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, childrenNodes)); + return Collections.singletonList(node); + } + private void buildRegionNodeMap( AggregationTableScanNode originalAggTableScanNode, List> regionReplicaSetsList, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MultipleDistinctAggregationToMarkDistinct.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MultipleDistinctAggregationToMarkDistinct.java new file mode 100644 index 000000000000..90914832dd25 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MultipleDistinctAggregationToMarkDistinct.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; + +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static java.util.stream.Collectors.toSet; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.aggregation; +import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN; + +/** + * Implements distinct aggregations with different inputs by transforming plans of the following + * shape: + * + * <pre>
+ * - Aggregation
+ *        GROUP BY (k)
+ *        F1(DISTINCT a0, a1, ...)
+ *        F2(DISTINCT b0, b1, ...)
+ *        F3(c0, c1, ...)
+ *     - X
+ * </pre>
+ * + * into + * + * <pre>
+ * - Aggregation
+ *        GROUP BY (k)
+ *        F1(a0, a1, ...) mask ($0)
+ *        F2(b0, b1, ...) mask ($1)
+ *        F3(c0, c1, ...)
+ *     - MarkDistinct (k, a0, a1, ...) -> $0
+ *          - MarkDistinct (k, b0, b1, ...) -> $1
+ *              - X
+ * </pre>
+ */ +public class MultipleDistinctAggregationToMarkDistinct implements Rule<AggregationNode> { + private static final Pattern<AggregationNode> PATTERN = + aggregation() + .matching( + Predicates.and( + MultipleDistinctAggregationToMarkDistinct::hasNoDistinctWithFilterOrMask, + Predicates.or( + MultipleDistinctAggregationToMarkDistinct::hasMultipleDistincts, + MultipleDistinctAggregationToMarkDistinct::hasMixedDistinctAndNonDistincts))); + + private static boolean hasNoDistinctWithFilterOrMask(AggregationNode aggregationNode) { + return aggregationNode.getAggregations().values().stream() + .noneMatch( + aggregation -> + aggregation.isDistinct() + && (aggregation.getFilter().isPresent() || aggregation.getMask().isPresent())); + } + + private static boolean hasMultipleDistincts(AggregationNode aggregationNode) { + return aggregationNode.getAggregations().values().stream() + .filter(AggregationNode.Aggregation::isDistinct) + .map(AggregationNode.Aggregation::getArguments) + .map(HashSet::new) + .distinct() + .count() + > 1; + } + + private static boolean hasMixedDistinctAndNonDistincts(AggregationNode aggregationNode) { + long distincts = + aggregationNode.getAggregations().values().stream() + .filter(AggregationNode.Aggregation::isDistinct) + .count(); + + return distincts > 0 && distincts < aggregationNode.getAggregations().size(); + } + + @Override + public Pattern<AggregationNode> getPattern() { + return PATTERN; + } + + @Override + public Result apply(AggregationNode parent, Captures captures, Context context) { + if (!shouldAddMarkDistinct(parent, context)) { + return Result.empty(); + } + + // the distinct marker for the given set of input columns + Map<Set<Symbol>, Symbol> markers = new HashMap<>(); + + Map<Symbol, AggregationNode.Aggregation> newAggregations = new HashMap<>(); + PlanNode subPlan = parent.getChild(); + + for (Map.Entry<Symbol, AggregationNode.Aggregation> entry : + parent.getAggregations().entrySet()) { + AggregationNode.Aggregation aggregation = entry.getValue(); + + if (aggregation.isDistinct() + && !aggregation.getFilter().isPresent() + && !aggregation.getMask().isPresent()) 
{ + Set<Symbol> inputs = aggregation.getArguments().stream().map(Symbol::from).collect(toSet()); + + Symbol marker = markers.get(inputs); + if (marker == null) { + marker = + context + .getSymbolAllocator() + .newSymbol(Iterables.getLast(inputs).getName(), BOOLEAN, "distinct"); + markers.put(inputs, marker); + + ImmutableSet.Builder<Symbol> distinctSymbols = + ImmutableSet.<Symbol>builder().addAll(parent.getGroupingKeys()).addAll(inputs); + parent.getGroupIdSymbol().ifPresent(distinctSymbols::add); + + subPlan = + new MarkDistinctNode( + context.getIdAllocator().genPlanNodeId(), + subPlan, + marker, + ImmutableList.copyOf(distinctSymbols.build()), + Optional.empty()); + } + + // remove the distinct flag and set the distinct marker + newAggregations.put( + entry.getKey(), + new AggregationNode.Aggregation( + aggregation.getResolvedFunction(), + aggregation.getArguments(), + false, + aggregation.getFilter(), + aggregation.getOrderingScheme(), + Optional.of(marker))); + } else { + newAggregations.put(entry.getKey(), aggregation); + } + } + + return Result.ofPlanNode( + AggregationNode.builderFrom(parent) + .setSource(subPlan) + .setAggregations(newAggregations) + .setPreGroupedSymbols(ImmutableList.of()) + .build()); + } + + private boolean shouldAddMarkDistinct(AggregationNode aggregationNode, Context context) { + if (aggregationNode.getGroupingKeys().isEmpty()) { + // global distinct aggregation is computed using a single thread. MarkDistinct will help + // parallelize the execution. + return true; + } + if (aggregationNode.getGroupingKeys().size() > 1) { + // NDV stats for multiple grouping keys are unreliable, let's keep MarkDistinct for this case + // to avoid significant slowdown or OOM/too big hash table issues in case of + // overestimation of very small NDV with big number of distinct values inside the groups. 
+ return true; + } + + return false; + } + + private static boolean hasSingleDistinctAndNonDistincts(AggregationNode aggregationNode) { + long distincts = + aggregationNode.getAggregations().values().stream() + .filter(AggregationNode.Aggregation::isDistinct) + .count(); + + return distincts == 1 && distincts < aggregationNode.getAggregations().size(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneMarkDistinctColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneMarkDistinctColumns.java new file mode 100644 index 000000000000..42a522c4e540 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneMarkDistinctColumns.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; + +import com.google.common.collect.Streams; + +import java.util.Optional; +import java.util.Set; +import java.util.stream.Stream; + +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.restrictChildOutputs; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.markDistinct; + +public class PruneMarkDistinctColumns extends ProjectOffPushDownRule { + public PruneMarkDistinctColumns() { + super(markDistinct()); + } + + @Override + protected Optional pushDownProjectOff( + Context context, MarkDistinctNode markDistinctNode, Set referencedOutputs) { + if (!referencedOutputs.contains(markDistinctNode.getMarkerSymbol())) { + return Optional.of(markDistinctNode.getChild()); + } + + Set requiredInputs = + Streams.concat( + referencedOutputs.stream() + .filter(symbol -> !symbol.equals(markDistinctNode.getMarkerSymbol())), + markDistinctNode.getDistinctSymbols().stream(), + markDistinctNode.getHashSymbol().map(Stream::of).orElseGet(Stream::empty)) + .collect(toImmutableSet()); + + return restrictChildOutputs(context.getIdAllocator(), markDistinctNode, requiredInputs); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SingleDistinctAggregationToGroupBy.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SingleDistinctAggregationToGroupBy.java new file mode 100644 index 000000000000..d9a342bb7167 --- /dev/null +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SingleDistinctAggregationToGroupBy.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static com.google.common.base.Preconditions.checkArgument; +import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleAggregation; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleGroupingSet; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.aggregation; + +/** + * Implements distinct aggregations with similar inputs by transforming plans of the following + * shape: + * + * <pre>
+ * - Aggregation
+ *        GROUP BY (k)
+ *        F1(DISTINCT s0, s1, ...),
+ *        F2(DISTINCT s0, s1, ...),
+ *     - X
+ * </pre>
+ * + * into + * + * <pre>
+ * - Aggregation
+ *          GROUP BY (k)
+ *          F1(s0, s1, ...)
+ *          F2(s0, s1, ...)
+ *      - Aggregation
+ *             GROUP BY (k, s0, s1, ...)
+ *          - X
+ * </pre>
+ * + * <p>

Assumes s0, s1, ... are symbol references (i.e., complex expressions have been pre-projected) + */ +public class SingleDistinctAggregationToGroupBy implements Rule { + private static final Pattern PATTERN = + aggregation() + .matching(SingleDistinctAggregationToGroupBy::hasSingleDistinctInput) + .matching(SingleDistinctAggregationToGroupBy::allDistinctAggregates) + .matching(SingleDistinctAggregationToGroupBy::noFilters) + .matching(SingleDistinctAggregationToGroupBy::noOrdering) + .matching(SingleDistinctAggregationToGroupBy::noMasks); + + private static boolean hasSingleDistinctInput(AggregationNode aggregationNode) { + return extractArgumentSets(aggregationNode).count() == 1; + } + + private static boolean allDistinctAggregates(AggregationNode aggregationNode) { + return aggregationNode.getAggregations().values().stream() + .allMatch(AggregationNode.Aggregation::isDistinct); + } + + private static boolean noFilters(AggregationNode aggregationNode) { + return aggregationNode.getAggregations().values().stream() + .noneMatch(aggregation -> aggregation.getFilter().isPresent()); + } + + private static boolean noOrdering(AggregationNode aggregationNode) { + return aggregationNode.getAggregations().values().stream() + .noneMatch(aggregation -> aggregation.getOrderingScheme().isPresent()); + } + + private static boolean noMasks(AggregationNode aggregationNode) { + return aggregationNode.getAggregations().values().stream() + .noneMatch(aggregation -> aggregation.getMask().isPresent()); + } + + private static Stream> extractArgumentSets(AggregationNode aggregationNode) { + return aggregationNode.getAggregations().values().stream() + .filter(AggregationNode.Aggregation::isDistinct) + .map(AggregationNode.Aggregation::getArguments) + .>map(HashSet::new) + .distinct(); + } + + @Override + public Pattern getPattern() { + return PATTERN; + } + + @Override + public Result apply(AggregationNode aggregation, Captures captures, Context context) { + List> argumentSets = + 
extractArgumentSets(aggregation).collect(Collectors.toList()); + + Set symbols = + Iterables.getOnlyElement(argumentSets).stream() + .map(Symbol::from) + .collect(Collectors.toSet()); + + return Result.ofPlanNode( + AggregationNode.builderFrom(aggregation) + .setSource( + singleAggregation( + context.getIdAllocator().genPlanNodeId(), + aggregation.getChild(), + ImmutableMap.of(), + singleGroupingSet( + ImmutableList.builder() + .addAll(aggregation.getGroupingKeys()) + .addAll(symbols) + .build()))) + .setAggregations( + // remove DISTINCT flag from function calls + aggregation.getAggregations().entrySet().stream() + .collect( + Collectors.toMap(Map.Entry::getKey, e -> removeDistinct(e.getValue())))) + .setPreGroupedSymbols(ImmutableList.of()) + .build()); + } + + private static AggregationNode.Aggregation removeDistinct( + AggregationNode.Aggregation aggregation) { + checkArgument(aggregation.isDistinct(), "Expected aggregation to have DISTINCT input"); + + return new AggregationNode.Aggregation( + aggregation.getResolvedFunction(), + aggregation.getArguments(), + false, + aggregation.getFilter(), + aggregation.getOrderingScheme(), + aggregation.getMask()); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationNode.java index 98c9caf9e610..59bc3df472f4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationNode.java @@ -175,7 +175,7 @@ public PlanNode clone() { @Override public List getOutputColumnNames() { - return null; + throw new UnsupportedOperationException(); } @Override @@ -577,6 +577,10 @@ public Optional getMask() { return mask; } + public boolean hasMask() { + return mask.isPresent(); 
+ } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MarkDistinctNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MarkDistinctNode.java new file mode 100644 index 000000000000..e8d13ae3a168 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MarkDistinctNode.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +public class MarkDistinctNode extends SingleChildProcessNode { + private final Symbol markerSymbol; + + private final Optional hashSymbol; + private final List distinctSymbols; + + public MarkDistinctNode( + PlanNodeId id, + PlanNode child, + Symbol markerSymbol, + List distinctSymbols, + Optional hashSymbol) { + super(id); + this.child = child; + this.markerSymbol = requireNonNull(markerSymbol, "markerSymbol is null"); + this.hashSymbol = requireNonNull(hashSymbol, "hashSymbol is null"); + requireNonNull(distinctSymbols, "distinctSymbols is null"); + checkArgument(!distinctSymbols.isEmpty(), "distinctSymbols cannot be empty"); + this.distinctSymbols = ImmutableList.copyOf(distinctSymbols); + } + + @Override + public List getOutputSymbols() { + return ImmutableList.builder() + .addAll(child.getOutputSymbols()) + .add(markerSymbol) + .build(); + } + + @Override + public List getChildren() { + return ImmutableList.of(child); + } + + @Override + public PlanNode clone() { + return new 
MarkDistinctNode(id, child, markerSymbol, distinctSymbols, hashSymbol); + } + + @Override + public List getOutputColumnNames() { + throw new UnsupportedOperationException(); + } + + public Symbol getMarkerSymbol() { + return markerSymbol; + } + + public List getDistinctSymbols() { + return distinctSymbols; + } + + public Optional getHashSymbol() { + return hashSymbol; + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitMarkDistinct(this, context); + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.MARK_DISTINCT_NODE.serialize(byteBuffer); + Symbol.serialize(markerSymbol, byteBuffer); + if (hashSymbol.isPresent()) { + ReadWriteIOUtils.write(true, byteBuffer); + Symbol.serialize(hashSymbol.get(), byteBuffer); + } else { + ReadWriteIOUtils.write(false, byteBuffer); + } + ReadWriteIOUtils.write(distinctSymbols.size(), byteBuffer); + for (Symbol distinctSymbol : distinctSymbols) { + Symbol.serialize(distinctSymbol, byteBuffer); + ; + } + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.MARK_DISTINCT_NODE.serialize(stream); + Symbol.serialize(markerSymbol, stream); + if (hashSymbol.isPresent()) { + ReadWriteIOUtils.write(true, stream); + Symbol.serialize(hashSymbol.get(), stream); + } else { + ReadWriteIOUtils.write(false, stream); + } + ReadWriteIOUtils.write(distinctSymbols.size(), stream); + for (Symbol distinctSymbol : distinctSymbols) { + Symbol.serialize(distinctSymbol, stream); + } + } + + public static MarkDistinctNode deserialize(ByteBuffer byteBuffer) { + Symbol markerSymbol = Symbol.deserialize(byteBuffer); + Optional hashSymbol = Optional.empty(); + if (ReadWriteIOUtils.readBool(byteBuffer)) { + hashSymbol = Optional.of(Symbol.deserialize(byteBuffer)); + } + int size = ReadWriteIOUtils.readInt(byteBuffer); + List distinctSymbols = new ArrayList<>(size); + while (size-- > 0) { + 
distinctSymbols.add(Symbol.deserialize(byteBuffer)); + } + + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + return new MarkDistinctNode(planNodeId, null, markerSymbol, distinctSymbols, hashSymbol); + } + + @Override + public PlanNode replaceChildren(List newChildren) { + return new MarkDistinctNode( + getPlanNodeId(), + Iterables.getOnlyElement(newChildren), + markerSymbol, + distinctSymbols, + hashSymbol); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java index 604e2b06a3a3..5b16f6f03bf7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java @@ -125,10 +125,9 @@ public static Pattern limit() { return typeOf(LimitNode.class); } - /*public static Pattern markDistinct() - { - return typeOf(MarkDistinctNode.class); - }*/ + public static Pattern markDistinct() { + return typeOf(MarkDistinctNode.class); + } public static Pattern output() { return typeOf(OutputNode.class); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java index b3766383c180..33450a79b866 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ + package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; @@ -30,6 +31,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitOverProjectWithSort; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitWithSort; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimits; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MultipleDistinctAggregationToMarkDistinct; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneAggregationColumns; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneAggregationSourceColumns; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneApplyColumns; @@ -45,6 +47,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneJoinChildrenColumns; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneJoinColumns; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneLimitColumns; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneMarkDistinctColumns; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneOffsetColumns; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneOutputSourceColumns; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneProjectColumns; @@ -59,6 +62,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveTrivialFilters; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveUnreferencedScalarSubqueries; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.SimplifyExpressions; +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.SingleDistinctAggregationToGroupBy; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.TransformUncorrelatedInPredicateSubqueryToSemiJoin; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.TransformUncorrelatedSubqueryToJoin; @@ -93,6 +97,7 @@ public LogicalOptimizeFactory(PlannerContext plannerContext) { new PruneGapFillColumns(), new PruneFillColumns(), new PruneLimitColumns(), + new PruneMarkDistinctColumns(), new PruneOffsetColumns(), new PruneOutputSourceColumns(), new PruneProjectColumns(), @@ -173,10 +178,14 @@ public LogicalOptimizeFactory(PlannerContext plannerContext) { // new ReplaceRedundantJoinWithSource(), // new RemoveRedundantJoin(), // new ReplaceRedundantJoinWithProject(), - new RemoveRedundantEnforceSingleRowNode() + new RemoveRedundantEnforceSingleRowNode(), // new RemoveRedundantExists(), // new RemoveRedundantWindow(), - // new SingleDistinctAggregationToGroupBy(), + new SingleDistinctAggregationToGroupBy(), + // Our AggregationPushDown does not support AggregationNode with distinct, + // so there is no need to put it after AggregationPushDown, + // put it here to avoid extra ColumnPruning. 
+ new MultipleDistinctAggregationToMarkDistinct() // new MergeLimitWithDistinct(), // new PruneCountAggregationOverScalar(metadata), // new SimplifyCountOverConstant(plannerContext), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java index 2b4144389837..092a53b9d1a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java @@ -41,6 +41,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; @@ -352,6 +353,26 @@ public PlanAndMappings visitExplainAnalyze(ExplainAnalyzeNode node, UnaliasConte rewrittenSource.getMappings()); } + @Override + public PlanAndMappings visitMarkDistinct(MarkDistinctNode node, UnaliasContext context) { + PlanAndMappings rewrittenSource = node.getChild().accept(this, context); + Map mapping = new HashMap<>(rewrittenSource.getMappings()); + SymbolMapper mapper = symbolMapper(mapping); + + Symbol newMarkerSymbol = mapper.map(node.getMarkerSymbol()); + List newDistinctSymbols = mapper.mapAndDistinct(node.getDistinctSymbols()); + Optional newHashSymbol = 
node.getHashSymbol().map(mapper::map); + + return new PlanAndMappings( + new MarkDistinctNode( + node.getPlanNodeId(), + rewrittenSource.getRoot(), + newMarkerSymbol, + newDistinctSymbols, + newHashSymbol), + mapping); + } + @Override public PlanAndMappings visitLimit(LimitNode node, UnaliasContext context) { PlanAndMappings rewrittenSource = node.getChild().accept(this, context); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/DistinctTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/DistinctTest.java index 0a1ce1e3c35f..b0c40afd8a10 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/DistinctTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/DistinctTest.java @@ -32,10 +32,12 @@ import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregation; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregationFunction; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregationTableScan; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.collect; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.distinctAggregationFunction; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.markDistinct; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.mergeSort; import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project; @@ -207,4 +209,120 @@ public void simpleAggFunctionDistinctTest() { ImmutableSet.of("s1", "s2", "s3")), exchange()))))); } + + /** + * All aggregations are DISTINCT over the same column: the expected logical plan is a + * non-distinct count/avg grouped by tag1 on top of a function-less aggregation grouped by + * (tag1, s1). + */ + @Test + public void optimizer1Test() { + // Hit optimizer SingleDistinctAggregationToGroupBy + PlanTester planTester = new PlanTester(); + LogicalQueryPlan logicalQueryPlan = + planTester.createPlan( + "select count(distinct s1), avg(distinct s1) from table1 group by tag1"); + assertPlan( + logicalQueryPlan, + output( + project( + aggregation( + singleGroupingSet("tag1"), + ImmutableMap.of( + Optional.of("count"), + aggregationFunction("count", ImmutableList.of("s1")), + Optional.of("avg"), + aggregationFunction("avg", ImmutableList.of("s1"))), + Optional.empty(), + SINGLE, + aggregation( + singleGroupingSet("tag1", "s1"), + ImmutableMap.of(), + ImmutableList.of("tag1"), + Optional.empty(), + SINGLE, + tableScan( + "testdb.table1", + ImmutableList.of("tag1", "s1"), + ImmutableSet.of("s1", "tag1"))))))); + } + + /** + * One DISTINCT aggregation mixed with a non-DISTINCT one: the expected plans keep a single + * aggregation masked by "s1$distinct" on top of a markDistinct node. Covers the global + * aggregation, the two-grouping-key aggregation, and fragment 0 of the distributed plan for the + * latter. + */ + @Test + public void optimizer2Test() { + // Hit optimizer MultipleDistinctAggregationToMarkDistinct + // case1: global Agg + PlanTester planTester = new PlanTester(); + LogicalQueryPlan logicalQueryPlan = + planTester.createPlan("select count(distinct s1), count(s2) from table1"); + assertPlan( + logicalQueryPlan, + output( + aggregation( + singleGroupingSet(), + ImmutableMap.of( + Optional.of("count"), + aggregationFunction("count", ImmutableList.of("s1")), + Optional.of("count_0"), + aggregationFunction("count", ImmutableList.of("s2"))), + ImmutableList.of(), + ImmutableList.of("s1$distinct"), + Optional.empty(), + SINGLE, + markDistinct( + "s1$distinct", + ImmutableList.of("s1"), + tableScan( + "testdb.table1", + ImmutableList.of("s1", "s2"), + ImmutableSet.of("s1", "s2")))))); + + // case2: groupingKeys are more than one + 
logicalQueryPlan = + planTester.createPlan( + "select count(distinct s1), count(s2) from table1 group by tag1, tag2"); + assertPlan( + logicalQueryPlan, + output( + project( + aggregation( + singleGroupingSet("tag1", "tag2"), + ImmutableMap.of( + Optional.of("count"), + aggregationFunction("count", ImmutableList.of("s1")), + Optional.of("count_0"), + aggregationFunction("count", ImmutableList.of("s2"))), + ImmutableList.of("tag1", "tag2"), + ImmutableList.of("s1$distinct"), + Optional.empty(), + SINGLE, + markDistinct( + "s1$distinct", + ImmutableList.of("tag1", "tag2", "s1"), + tableScan( + "testdb.table1", + ImmutableList.of("tag1", "tag2", "s1", "s2"), + ImmutableSet.of("tag1", "tag2", "s1", "s2"))))))); + + // DistributionPlanTest + assertPlan( + planTester.getFragmentPlan(0), + output( + project( + aggregation( + singleGroupingSet("tag1", "tag2"), + ImmutableMap.of( + Optional.of("count"), + aggregationFunction("count", ImmutableList.of("s1")), + Optional.of("count_0"), + aggregationFunction("count", ImmutableList.of("s2"))), + ImmutableList.of("tag1", "tag2"), + ImmutableList.of("s1$distinct"), + Optional.empty(), + SINGLE, + markDistinct( + "s1$distinct", + ImmutableList.of("tag1", "tag2", "s1"), + mergeSort( + exchange(), + tableScan( + "testdb.table1", + ImmutableList.of("tag1", "tag2", "s1", "s2"), + ImmutableSet.of("tag1", "tag2", "s1", "s2")), + exchange())))))); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/MarkDistinctMatcher.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/MarkDistinctMatcher.java new file mode 100644 index 000000000000..27c74560f8d6 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/MarkDistinctMatcher.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.assertions; + +import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.util.List; +import java.util.Optional; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.MatchResult.NO_MATCH; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.MatchResult.match; + +public class MarkDistinctMatcher implements Matcher { + private final PlanTestSymbol markerSymbol; + private final List distinctSymbols; + private final Optional hashSymbol; + + public MarkDistinctMatcher( + PlanTestSymbol markerSymbol, + List distinctSymbols, + Optional hashSymbol) { + 
this.markerSymbol = requireNonNull(markerSymbol, "markerSymbol is null"); + this.distinctSymbols = ImmutableList.copyOf(distinctSymbols); + this.hashSymbol = requireNonNull(hashSymbol, "hashSymbol is null"); + } + + @Override + public boolean shapeMatches(PlanNode node) { + return node instanceof MarkDistinctNode; + } + + @Override + public MatchResult detailMatches( + PlanNode node, SessionInfo session, Metadata metadata, SymbolAliases symbolAliases) { + checkState( + shapeMatches(node), + "Plan testing framework error: shapeMatches returned false in detailMatches in %s", + this.getClass().getName()); + MarkDistinctNode markDistinctNode = (MarkDistinctNode) node; + + if (!markDistinctNode + .getHashSymbol() + .equals(hashSymbol.map(alias -> alias.toSymbol(symbolAliases)))) { + return NO_MATCH; + } + + if (!ImmutableSet.copyOf(markDistinctNode.getDistinctSymbols()) + .equals( + distinctSymbols.stream() + .map(alias -> alias.toSymbol(symbolAliases)) + .collect(toImmutableSet()))) { + return NO_MATCH; + } + + return match(markerSymbol.toString(), markDistinctNode.getMarkerSymbol().toSymbolReference()); + } + + @Override + public String toString() { + return toStringHelper(this) + .add("markerSymbol", markerSymbol) + .add("distinctSymbols", distinctSymbols) + .add("hashSymbol", hashSymbol) + .toString(); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java index edc87ae7efe7..73e25f17648d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java @@ -36,6 +36,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; @@ -393,6 +394,27 @@ public static PlanMatchPattern aggregationTableScan( return result; } + public static PlanMatchPattern markDistinct( + String markerSymbol, List distinctSymbols, PlanMatchPattern source) { + return node(MarkDistinctNode.class, source) + .with( + new MarkDistinctMatcher( + new SymbolAlias(markerSymbol), toSymbolAliases(distinctSymbols), Optional.empty())); + } + + public static PlanMatchPattern markDistinct( + String markerSymbol, + List distinctSymbols, + String hashSymbol, + PlanMatchPattern source) { + return node(MarkDistinctNode.class, source) + .with( + new MarkDistinctMatcher( + new SymbolAlias(markerSymbol), + toSymbolAliases(distinctSymbols), + Optional.of(new SymbolAlias(hashSymbol)))); + } + /* public static PlanMatchPattern distinctLimit(long limit, List distinctSymbols, PlanMatchPattern source) {