diff --git a/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/TimechartTransformation.java b/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/TimechartTransformation.java
index 2ce11d7a..7333cfb2 100644
--- a/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/TimechartTransformation.java
+++ b/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/TimechartTransformation.java
@@ -47,19 +47,15 @@
 import com.teragrep.pth10.ast.*;
 import com.teragrep.pth10.ast.bo.*;
-import com.teragrep.pth10.ast.bo.Token.Type;
 import com.teragrep.pth10.ast.commands.aggregate.AggregateFunction;
 import com.teragrep.pth10.steps.timechart.TimechartStep;
 import com.teragrep.pth_03.antlr.DPLParser;
 import com.teragrep.pth_03.antlr.DPLParserBaseVisitor;
-import com.teragrep.pth_03.shaded.org.antlr.v4.runtime.tree.ParseTree;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.functions;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
 
 import java.util.*;
 import java.util.regex.Matcher;
@@ -67,34 +63,24 @@
 
 /**
  * Class that contains the visitor methods for the timechart command
- * Provides a pivoted dataset, making it easier to form time-field graphs in the UI
- * Dataset.groupBy("_time").pivot(aggregateField).sum(fieldname)
  */
-public class TimechartTransformation extends DPLParserBaseVisitor<Node> {
+public final class TimechartTransformation extends DPLParserBaseVisitor<Node> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(TimechartTransformation.class);
-    private DPLParserCatalystContext catCtx = null;
-    private DPLParserCatalystVisitor catVisitor;
-    private Document doc;
+    private final DPLParserCatalystContext catCtx;
 
-    EvalTransformation evalTransformation;
-    AggregateFunction aggregateFunction;
-    private String aggregateField = null;
+    private final AggregateFunction aggregateFunction;
 
-    public TimechartStep timechartStep = null;
+    // fields set in visit functions
+    private Column span;
+    private final ArrayList<Column> aggFunCols = new ArrayList<>();
+    private final ArrayList<String> divByInsts = new ArrayList<>();
 
-    public TimechartTransformation(DPLParserCatalystContext catCtx, DPLParserCatalystVisitor catVisitor) {
-        this.doc = null;
+    public TimechartTransformation(final DPLParserCatalystContext catCtx) {
         this.catCtx = catCtx;
-        this.catVisitor = catVisitor;
-        this.evalTransformation = new EvalTransformation(catCtx);
         this.aggregateFunction = new AggregateFunction(catCtx);
     }
 
-    public String getAggregateField() {
-        return this.aggregateField;
-    }
-
     /**
      * timechartTransformation : COMMAND_MODE_TIMECHART (t_timechart_sepParameter)? (t_timechart_formatParameter)?
     * (t_timechart_fixedrangeParameter)? (t_timechart_partialParameter)? (t_timechart_contParameter)?
@@ -108,70 +94,20 @@ public Node visitTimechartTransformation(DPLParser.TimechartTransformationContex
     }
 
     private Node timechartTransformationEmitCatalyst(DPLParser.TimechartTransformationContext ctx) {
-        this.timechartStep = new TimechartStep();
-
-        Column span = null;
-
-        if (ctx.t_timechart_binOptParameter() != null && !ctx.t_timechart_binOptParameter().isEmpty()) {
-            LOGGER.info("Timechart Optional parameters: <[{}]>", ctx.t_timechart_binOptParameter().get(0).getText());
-
-            ColumnNode spanNode = (ColumnNode) visit(ctx.t_timechart_binOptParameter().get(0));
-            if (spanNode != null) {
-                span = spanNode.getColumn();
-            }
-        }
-
-        Column funCol = null;
-        List<Column> listOfAggFunCols = new ArrayList<>();
-        List<String> listOfDivideByInst = new ArrayList<>();
-        for (int i = 0; i < ctx.getChildCount(); i++) {
-            ParseTree child = ctx.getChild(i);
-
-            if (child instanceof DPLParser.AggregateFunctionContext) {
-                // go through each agg. function
-                DPLParser.AggregateFunctionContext aggFunCtx = (DPLParser.AggregateFunctionContext) child;
-                Node funNode = visit(aggFunCtx);
-                if (funNode != null) {
-                    if (funCol != null) {
-                        listOfAggFunCols.add(funCol);
-                    }
-                    funCol = ((ColumnNode) funNode).getColumn();
-                }
-            }
-            else if (child instanceof DPLParser.T_timechart_divideByInstructionContext) {
-                String divByInst = ((StringNode) visitT_timechart_divideByInstruction(
-                        (DPLParser.T_timechart_divideByInstructionContext) child
-                )).toString();
-                listOfDivideByInst.add(divByInst);
-            }
-            else if (child instanceof DPLParser.T_timechart_fieldRenameInstructionContext) {
-                if (funCol != null) {
-                    funCol = funCol.as(visit(child).toString());
-                }
-            }
-        }
-        listOfAggFunCols.add(funCol); // need to add last one; for loop above only adds if there's a new one coming
+        span = createDefaultSpan();
 
-        if (span == null) {
-            span = createDefaultSpan();
-        }
+        visitChildren(ctx); // visit all the parameters
 
-        timechartStep.setHdfsPath(this.catVisitor.getHdfsPath());
-        timechartStep.setCatCtx(catCtx);
-        timechartStep.setSpan(span);
-        timechartStep.setAggCols(listOfAggFunCols);
-        timechartStep.setDivByInsts(listOfDivideByInst);
+        TimechartStep timechartStep = new TimechartStep(aggFunCols, divByInsts, span);
 
         // span
         this.catCtx.setTimeChartSpanSeconds(getSpanSeconds(span));
 
-        LOGGER.debug("span= <[{}]>", timechartStep.getSpan().toString());
-        LOGGER.debug("aggcols= <[{}]>", Arrays.toString(timechartStep.getAggCols().toArray()));
-        LOGGER.debug("divby= <[{}]>", Arrays.toString(timechartStep.getDivByInsts().toArray()));
+        LOGGER.debug("span= <[{}]>", span);
+        LOGGER.debug("aggcols= <[{}]>", aggFunCols);
+        LOGGER.debug("divby= <[{}]>", divByInsts);
 
         return new StepNode(timechartStep);
-
-        //throw new RuntimeException("Chart transformation operation not supported yet");
     }
 
     /**
@@ -209,47 +145,35 @@ else if (isWithinNumber && spanChar != ' ') {
 
     @Override
     public Node visitAggregateFunction(DPLParser.AggregateFunctionContext ctx) {
-        Node rv = aggregateFunction.visitAggregateFunction(ctx);
-        if (aggregateField == null)
-            aggregateField = aggregateFunction.getAggregateField();
-        return aggregateFunction.visitAggregateFunction(ctx);
+        ColumnNode aggCol = (ColumnNode) aggregateFunction.visitAggregateFunction(ctx);
+        aggFunCols.add(aggCol.getColumn());
+        return new NullNode();
     }
 
     @Override
     public Node visitT_timechart_divideByInstruction(DPLParser.T_timechart_divideByInstructionContext ctx) {
-        // LOGGER.info(ctx.getChildCount()+"--visitT_chart_divideByInstruction incoming{}", ctx.getText());
-        if (ctx.getChildCount() == 0) {
-            return null;
-        }
-        String target = ctx.getChild(1).getChild(0).toString();
+        String field = ctx.fieldType().getChild(0).toString();
+        divByInsts.add(field);
 
-        if (doc != null) {
-            Element el = doc.createElement("divideBy");
-            el.setAttribute("field", target);
-            return new ElementNode(el);
-        }
-        else {
-            return new StringNode(new Token(Type.STRING, target));
-        }
+        return new NullNode();
     }
 
     @Override
     public Node visitT_timechart_fieldRenameInstruction(DPLParser.T_timechart_fieldRenameInstructionContext ctx) {
-        String field = ctx.getChild(1).getText();
-        if (doc != null) {
-            Element el = doc.createElement("fieldRename");
-            el.setAttribute("field", field);
-            return new ElementNode(el);
-        }
-        else {
-            return new StringNode(new Token(Type.STRING, field));
+        String rename = ctx.getChild(1).getText();
+        if (!aggFunCols.isEmpty()) {
+            Column latestAgg = aggFunCols.remove(aggFunCols.size() - 1);
+            aggFunCols.add(latestAgg.as(rename)); // rename the newest visited aggregation column
         }
+
+        return new NullNode();
     }
 
     @Override
     public Node visitT_timechart_binOptParameter(DPLParser.T_timechart_binOptParameterContext ctx) {
         LOGGER.info("visitT_timechart_binOptParameter:<{}>", ctx.getText());
-        return visitChildren(ctx);
+        span = ((ColumnNode) visitChildren(ctx)).getColumn();
+        return new NullNode();
     }
 
     @Override
@@ -268,10 +192,9 @@ public Node visitT_timechart_binSpanParameter(DPLParser.T_timechart_binSpanParam
      * @return
      */
     private Column createDefaultSpan() {
-        long sec = 0;
+        final long sec;
+        final String duration;
         TimeRange tr = TimeRange.ONE_HOUR;
-        String duration = "1 days"; // Default duration
-        // LOGGER.info("createDefaultSpan="+catCtx.getTimeRange());
         DPLParserConfig pConf = catCtx.getParserConfig();
         if (pConf != null) {
             tr = pConf.getTimeRange();
@@ -332,7 +255,6 @@ private CalendarInterval getSpanLength(String value) {
         // default timescale is sec
         String timescale = "sec";
         int numericalValue;
-        int month = 0;
        long sec = 0;
         Pattern p = Pattern.compile("\\d+");
         Matcher m = p.matcher(value);
@@ -400,7 +322,7 @@ private CalendarInterval getSpanLength(String value) {
                 break;
             }
         }
-        return new CalendarInterval(month, 0, sec * 1000 * 1000L);
+        return new CalendarInterval(0, 0, sec * 1000 * 1000L);
     }
 
     @Override
@@ -483,33 +405,8 @@ public Node visitT_timechart_evaledField(DPLParser.T_timechart_evaledFieldContex
         return visitChildren(ctx);
     }
 
-    /*@Override public Node visitT_timechart_singleAggregation(DPLParser.T_timechart_singleAggregationContext ctx) {
-        String oper = ctx.getText();
-        String defaultField="*";
-        Node rv = null;
-        Column col = null;
-        if(oper.equalsIgnoreCase("count") || oper.equalsIgnoreCase("c")) {
-            aggregateField = "count"; // use default name
-            col = org.apache.spark.sql.functions.count(defaultField);
-            // LOGGER.info("T_timechart_singleAggregation (Catalyst):{}", col.expr().sql()+" default field="+defaultField);
-            traceBuffer.add("Visit AggregateMethodCount(Catalyst):{}", col.expr().sql());
-            rv = new ColumnNode(col);
-        }else {
-            rv = this.aggregateFunction.visitAggregateFunction(ctx.aggregateFunction());
-            this.aggregateField = aggregateFunction.getAggregateField();
-        }
-        // Check whether field needs to be renamed
-        if(ctx.t_timechart_fieldRenameInstruction() != null){
-            Node renameCmd = visitT_timechart_fieldRenameInstruction(ctx.t_timechart_fieldRenameInstruction());
-            aggregateField = renameCmd.toString();
-            // rv = new ColumnNode(((ColumnNode) rv).getColumn().as(renameCmd.toString()));
-        }
-        return rv;
-    }*/
-
     @Override
     public Node visitSpanType(DPLParser.SpanTypeContext ctx) {
-        // LOGGER.info("visitSpanType:"+ctx.getText());
         return visitChildren(ctx);
     }
diff --git a/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/TransformStatement.java b/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/TransformStatement.java
index 81387858..1db84020 100644
--- a/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/TransformStatement.java
+++ b/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/TransformStatement.java
@@ -176,7 +176,7 @@ public Node visitChartTransformation(DPLParser.ChartTransformationContext ctx) {
     @Override
     public Node visitTimechartTransformation(DPLParser.TimechartTransformationContext ctx) {
         // timechart command
-        return new TimechartTransformation(catCtx, catVisitor).visitTimechartTransformation(ctx);
+        return new TimechartTransformation(catCtx).visitTimechartTransformation(ctx);
     }
 
     @Override
diff --git a/src/main/java/com/teragrep/pth10/steps/timechart/AbstractTimechartStep.java b/src/main/java/com/teragrep/pth10/steps/timechart/AbstractTimechartStep.java
deleted file mode 100644
index 28bf1c8a..00000000
--- a/src/main/java/com/teragrep/pth10/steps/timechart/AbstractTimechartStep.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
- * Copyright (C) 2019-2024 Suomen Kanuuna Oy
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <https://www.gnu.org/licenses/>.
- *
- *
- * Additional permission under GNU Affero General Public License version 3
- * section 7
- *
- * If you modify this Program, or any covered work, by linking or combining it
- * with other code, such other code is not for that reason alone subject to any
- * of the requirements of the GNU Affero GPL version 3 as long as this Program
- * is the same Program as licensed from Suomen Kanuuna Oy without any additional
- * modifications.
- *
- * Supplemented terms under GNU Affero General Public License version 3
- * section 7
- *
- * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
- * versions must be marked as "Modified version of" The Program.
- *
- * Names of the licensors and authors may not be used for publicity purposes.
- *
- * No rights are granted for use of trade names, trademarks, or service marks
- * which are in The Program if any.
- *
- * Licensee must indemnify licensors and authors for any liability that these
- * contractual assumptions impose on licensors and authors.
- *
- * To the extent this program is licensed as part of the Commercial versions of
- * Teragrep, the applicable Commercial License may apply to this file if you as
- * a licensee so wish it.
- */
-package com.teragrep.pth10.steps.timechart;
-
-import com.teragrep.pth10.ast.DPLParserCatalystContext;
-import com.teragrep.pth10.steps.AbstractStep;
-import org.apache.spark.sql.Column;
-
-import java.util.List;
-
-public abstract class AbstractTimechartStep extends AbstractStep {
-
-    protected List<Column> aggCols = null;
-    protected List<String> divByInsts = null;
-    protected Column span = null;
-    protected String aggregateField = null;
-    protected DPLParserCatalystContext catCtx = null;
-    protected String hdfsPath = null;
-
-    public AbstractTimechartStep() {
-        super();
-    }
-
-    public void setAggCols(List<Column> aggCols) {
-        this.aggCols = aggCols;
-    }
-
-    public List<Column> getAggCols() {
-        return aggCols;
-    }
-
-    public void setDivByInsts(List<String> divByInsts) {
-        this.divByInsts = divByInsts;
-    }
-
-    public List<String> getDivByInsts() {
-        return divByInsts;
-    }
-
-    public void setAggregateField(String field) {
-        this.aggregateField = field;
-    }
-
-    public String getAggregateField() {
-        return this.aggregateField;
-    }
-
-    public void setSpan(Column span) {
-        this.span = span;
-    }
-
-    public Column getSpan() {
-        return this.span;
-    }
-
-    public void setCatCtx(DPLParserCatalystContext catCtx) {
-        this.catCtx = catCtx;
-    }
-
-    public DPLParserCatalystContext getCatCtx() {
-        return catCtx;
-    }
-
-    public void setHdfsPath(String hdfsPath) {
-        this.hdfsPath = hdfsPath;
-    }
-
-    public String getHdfsPath() {
-        return hdfsPath;
-    }
-}
diff --git a/src/main/java/com/teragrep/pth10/steps/timechart/TimechartStep.java b/src/main/java/com/teragrep/pth10/steps/timechart/TimechartStep.java
index 9849094d..06bbdd1f 100644
--- a/src/main/java/com/teragrep/pth10/steps/timechart/TimechartStep.java
+++ b/src/main/java/com/teragrep/pth10/steps/timechart/TimechartStep.java
@@ -45,18 +45,27 @@
  */
 package com.teragrep.pth10.steps.timechart;
 
+import com.teragrep.pth10.steps.AbstractStep;
 import org.apache.spark.sql.*;
 import scala.collection.JavaConversions;
 import scala.collection.Seq;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
-public final class TimechartStep extends AbstractTimechartStep {
+public final class TimechartStep extends AbstractStep {
 
-    public TimechartStep() {
+    private final List<Column> aggCols;
+    private final List<String> divByInsts;
+    private final Column span;
+
+    public TimechartStep(final List<Column> aggCols, final List<String> divByInsts, final Column span) {
         super();
+        this.aggCols = aggCols;
+        this.divByInsts = divByInsts;
+        this.span = span;
         this.properties.add(CommandProperty.AGGREGATE);
     }
 
@@ -66,18 +75,17 @@ public Dataset<Row> get(Dataset<Row> dataset) {
             return null;
         }
 
-        if (this.getAggCols() == null || this.getAggCols().isEmpty()) {
+        if (aggCols.isEmpty()) {
             throw new RuntimeException("Aggregate columns not present in TimechartStep, cannot proceed");
         }
 
         // .agg has funky arguments; just giving a Seq of columns is no good, first arg needs to be a column
-        Column firstAggCol = this.aggCols.get(0);
-        Seq<Column> seqOfAggColsExceptFirst = JavaConversions
-                .asScalaBuffer(this.aggCols.subList(1, this.aggCols.size()));
+        Column firstAggCol = aggCols.get(0);
+        Seq<Column> seqOfAggColsExceptFirst = JavaConversions.asScalaBuffer(aggCols.subList(1, aggCols.size()));
 
         List<Column> allGroupBys = new ArrayList<>();
-        allGroupBys.add(this.span);
-        allGroupBys.addAll(this.divByInsts.stream().map(functions::col).collect(Collectors.toList()));
+        allGroupBys.add(span);
+        allGroupBys.addAll(divByInsts.stream().map(functions::col).collect(Collectors.toList()));
 
         Seq<Column> seqOfAllGroupBys = JavaConversions.asScalaBuffer(allGroupBys);
@@ -90,4 +98,20 @@ public Dataset<Row> get(Dataset<Row> dataset) {
                 .drop("window")
                 .orderBy("_time");
     }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final TimechartStep that = (TimechartStep) o;
+        // Column.equals() doesn't work, using String representation instead to see that the same operations are done on both
+        return Objects.equals(aggCols.toString(), that.aggCols.toString())
+                && Objects.equals(divByInsts, that.divByInsts) && Objects.equals(span.toString(), that.span.toString());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(aggCols, divByInsts, span);
+    }
 }
diff --git a/src/test/java/com/teragrep/pth10/translationTests/TimechartTest.java b/src/test/java/com/teragrep/pth10/translationTests/TimechartTest.java
index 4a394255..c9c1abd4 100644
--- a/src/test/java/com/teragrep/pth10/translationTests/TimechartTest.java
+++ b/src/test/java/com/teragrep/pth10/translationTests/TimechartTest.java
@@ -46,8 +46,11 @@
 package com.teragrep.pth10.translationTests;
 
 import com.teragrep.pth10.ast.DPLParserCatalystContext;
-import com.teragrep.pth10.ast.DPLParserCatalystVisitor;
+import com.teragrep.pth10.ast.bo.ColumnNode;
+import com.teragrep.pth10.ast.bo.StepNode;
+import com.teragrep.pth10.ast.commands.aggregate.AggregateFunction;
 import com.teragrep.pth10.ast.commands.transformstatement.TimechartTransformation;
+import com.teragrep.pth10.steps.AbstractStep;
 import com.teragrep.pth10.steps.timechart.TimechartStep;
 import com.teragrep.pth_03.antlr.DPLLexer;
 import com.teragrep.pth_03.antlr.DPLParser;
@@ -55,95 +58,147 @@
 import com.teragrep.pth_03.shaded.org.antlr.v4.runtime.CharStreams;
 import com.teragrep.pth_03.shaded.org.antlr.v4.runtime.CommonTokenStream;
 import com.teragrep.pth_03.shaded.org.antlr.v4.runtime.tree.ParseTree;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.functions;
+import org.apache.spark.unsafe.types.CalendarInterval;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class TimechartTest {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(TimechartTest.class);
-
     @Test
     void testTimeChartTranslation() {
-        String query = "| timechart span=5min sum(sales) as sales by product";
+        String rename = "sales";
+        String byField = "product";
+        String query = "| timechart span=5min sum(sales) as " + rename + " by " + byField;
+
+        DPLParserCatalystContext ctx = new DPLParserCatalystContext(null);
+
+        // create parse tree with PTH-03
         CharStream inputStream = CharStreams.fromString(query);
         DPLLexer lexer = new DPLLexer(inputStream);
         DPLParser parser = new DPLParser(new CommonTokenStream(lexer));
         ParseTree tree = parser.root();
-        LOGGER.debug(tree.toStringTree(parser));
+        DPLParser.AggregateMethodSumContext aggContext = (DPLParser.AggregateMethodSumContext) tree
+                .getChild(1)
+                .getChild(0)
+                .getChild(2)
+                .getChild(0);
+        DPLParser.TimechartTransformationContext timechartContext = (DPLParser.TimechartTransformationContext) tree
+                .getChild(1)
+                .getChild(0);
 
-        DPLParserCatalystContext ctx = new DPLParserCatalystContext(null);
-        // Use this file for dataset initialization
-        String testFile = "src/test/resources/timechartTestData.jsonl";
         ctx.setEarliest("-1w");
 
-        DPLParserCatalystVisitor visitor = new DPLParserCatalystVisitor(ctx);
+        // traverse the tree in PTH-10 and create TimechartStep
+        TimechartTransformation tct = new TimechartTransformation(ctx);
+        StepNode timechartNode = (StepNode) tct.visitTimechartTransformation(timechartContext);
+        AbstractStep tcs = timechartNode.get();
 
-        TimechartTransformation tct = new TimechartTransformation(ctx, visitor);
-        tct.visitTimechartTransformation((DPLParser.TimechartTransformationContext) tree.getChild(1).getChild(0));
-        TimechartStep tcs = tct.timechartStep;
+        // expected BY clause
+        List<String> divByInsts = new ArrayList<>();
+        divByInsts.add(byField);
 
-        Assertions.assertEquals("window(_time, 300000000, 300000000, 0) AS window", tcs.getSpan().toString());
-        Assertions
-                .assertEquals(
-                        "sumaggregator(encodeusingserializer(input[0, java.lang.Object, true], false) AS value, decodeusingserializer(input[0, binary, true], com.teragrep.pth10.ast.commands.aggregate.UDAFs.BufferClasses.SumBuffer, false), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false, true)) AS `sum(sales)` AS sales",
-                        tcs.getAggCols().get(0).toString()
-                );
-        Assertions.assertEquals("product", tcs.getDivByInsts().get(0));
+        // expected aggregations
+        AggregateFunction aggregateFunction = new AggregateFunction(ctx);
+        ColumnNode aggColNode = (ColumnNode) aggregateFunction.visitAggregateMethodSum(aggContext); // "sum(sales)" aggregation
+        Column aggCol = aggColNode.getColumn().as(rename); // "as sales"
+        List<Column> aggCols = new ArrayList<>();
+        aggCols.add(aggCol);
+
+        // expected span
+        CalendarInterval ival = new CalendarInterval(0, 0, 5 * 60 * 1000 * 1000);
+        Column span = functions.window(new Column("_time"), String.valueOf(ival), "5 minutes", "0 minutes");
+
+        TimechartStep expected = new TimechartStep(aggCols, divByInsts, span);
+
+        Assertions.assertEquals(expected, tcs);
     }
 
     @Test
     void testTimeChartTranslation_NoByClause() {
-        String query = "| timechart span=5min sum(sales) as sales";
+        String rename = "sales";
+        String query = "| timechart span=5min sum(sales) as " + rename;
+
+        // create parse tree with PTH-03
         CharStream inputStream = CharStreams.fromString(query);
         DPLLexer lexer = new DPLLexer(inputStream);
         DPLParser parser = new DPLParser(new CommonTokenStream(lexer));
         ParseTree tree = parser.root();
-        LOGGER.debug(tree.toStringTree(parser));
+
+        DPLParser.AggregateMethodSumContext aggContext = (DPLParser.AggregateMethodSumContext) tree
+                .getChild(1)
+                .getChild(0)
+                .getChild(2)
+                .getChild(0);
+        DPLParser.TimechartTransformationContext timechartContext = (DPLParser.TimechartTransformationContext) tree
+                .getChild(1)
+                .getChild(0);
 
         DPLParserCatalystContext ctx = new DPLParserCatalystContext(null);
         ctx.setEarliest("-1w");
-        DPLParserCatalystVisitor visitor = new DPLParserCatalystVisitor(ctx);
-
-        TimechartTransformation tct = new TimechartTransformation(ctx, visitor);
-        tct.visitTimechartTransformation((DPLParser.TimechartTransformationContext) tree.getChild(1).getChild(0));
-        TimechartStep tcs = tct.timechartStep;
-
-        Assertions.assertEquals("window(_time, 300000000, 300000000, 0) AS window", tcs.getSpan().toString());
-        Assertions
-                .assertEquals(
-                        "sumaggregator(encodeusingserializer(input[0, java.lang.Object, true], false) AS value, decodeusingserializer(input[0, binary, true], com.teragrep.pth10.ast.commands.aggregate.UDAFs.BufferClasses.SumBuffer, false), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false, true)) AS `sum(sales)` AS sales",
-                        tcs.getAggCols().get(0).toString()
-                );
-        Assertions.assertEquals(0, tcs.getDivByInsts().size());
+
+        // traverse the tree in PTH-10 and create TimechartStep
+        TimechartTransformation tct = new TimechartTransformation(ctx);
+        StepNode timechartNode = (StepNode) tct.visitTimechartTransformation(timechartContext);
+        AbstractStep tcs = timechartNode.get();
+
+        // expected aggregations
+        AggregateFunction aggregateFunction = new AggregateFunction(ctx);
+        ColumnNode aggColNode = (ColumnNode) aggregateFunction.visitAggregateMethodSum(aggContext); // "sum(sales)" aggregation
+        Column aggCol = aggColNode.getColumn().as(rename); // "as sales"
+        List<Column> aggCols = new ArrayList<>();
+        aggCols.add(aggCol);
+
+        // expected span
+        CalendarInterval ival = new CalendarInterval(0, 0, 5 * 60 * 1000 * 1000);
+        Column span = functions.window(new Column("_time"), String.valueOf(ival), "5 minutes", "0 minutes");
+
+        TimechartStep expected = new TimechartStep(aggCols, new ArrayList<>(), span);
+
+        Assertions.assertEquals(expected, tcs);
     }
 
     @Test
     void testTimeChartTranslationBasic() {
         String query = "| timechart count";
+
+        // create parse tree with PTH-03
         CharStream inputStream = CharStreams.fromString(query);
         DPLLexer lexer = new DPLLexer(inputStream);
         DPLParser parser = new DPLParser(new CommonTokenStream(lexer));
         ParseTree tree = parser.root();
-        LOGGER.debug(tree.toStringTree(parser));
+        DPLParser.AggregateFunctionContext aggContext = (DPLParser.AggregateFunctionContext) tree
+                .getChild(1)
+                .getChild(0)
+                .getChild(1);
 
         DPLParserCatalystContext ctx = new DPLParserCatalystContext(null);
         ctx.setEarliest("-1w");
-        DPLParserCatalystVisitor visitor = new DPLParserCatalystVisitor(ctx);
-
-        TimechartTransformation tct = new TimechartTransformation(ctx, visitor);
-        tct.visitTimechartTransformation((DPLParser.TimechartTransformationContext) tree.getChild(1).getChild(0));
-        TimechartStep tcs = tct.timechartStep;
-
-        Assertions.assertEquals("window(_time, 86400000000, 86400000000, 0) AS window", tcs.getSpan().toString());
-        Assertions
-                .assertEquals(
-                        "countaggregator(input[0, java.lang.Long, true].longValue AS value, staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, input[0, bigint, true], true, false, true), input[0, java.lang.Long, true].longValue) AS count",
-                        tcs.getAggCols().get(0).toString()
-                );
-        Assertions.assertEquals(0, tcs.getDivByInsts().size());
+
+        // traverse the tree in PTH-10 and create TimechartStep
+        TimechartTransformation tct = new TimechartTransformation(ctx);
+        StepNode timechartNode = (StepNode) tct
+                .visitTimechartTransformation((DPLParser.TimechartTransformationContext) tree.getChild(1).getChild(0));
+        AbstractStep tcs = timechartNode.get();
+
+        // expected aggregations
+        AggregateFunction aggregateFunction = new AggregateFunction(ctx);
+        ColumnNode aggColNode = (ColumnNode) aggregateFunction.visitAggregateFunction(aggContext); // "count" aggregation
+        List<Column> aggCols = new ArrayList<>();
+        aggCols.add(aggColNode.getColumn());
+
+        // expected default span of 1 day when "span=" parameter is not specified
+        CalendarInterval ival = new CalendarInterval(0, 1, 0);
+        Column span = functions.window(new Column("_time"), String.valueOf(ival), "1 day", "0 minutes");
+
+        TimechartStep expected = new TimechartStep(aggCols, new ArrayList<>(), span);
+
+        Assertions.assertEquals(expected, tcs);
     }
 }