diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java index 6ad0cc3fcbb9..f7f747a170cf 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java @@ -219,11 +219,12 @@ public void testMinQuery() { public void testTimeSeriesSumQuery() { TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100); ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderAmount"); - TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, - TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */, valueExpression, new AggInfo("SUM")); + TimeSeriesContext timeSeriesContext = + new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, + 0L /* offsetSeconds */, valueExpression, new AggInfo("SUM", null)); QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext); - ServerQueryRequest serverQueryRequest = new ServerQueryRequest( - queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); + ServerQueryRequest serverQueryRequest = + new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS); assertTrue(instanceResponse.getResultsBlock() instanceof TimeSeriesResultsBlock); TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock(); @@ -235,11 +236,12 @@ public void testTimeSeriesSumQuery() { public void testTimeSeriesMaxQuery() { TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100); ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderItemCount"); - TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, - TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */, valueExpression, new AggInfo("MAX")); + TimeSeriesContext timeSeriesContext = + new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, + 0L /* offsetSeconds */, valueExpression, new AggInfo("MAX", null)); QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext); - ServerQueryRequest serverQueryRequest = new ServerQueryRequest( - queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); + ServerQueryRequest serverQueryRequest = + new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS); assertTrue(instanceResponse.getResultsBlock() instanceof TimeSeriesResultsBlock); TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock(); @@ -253,9 +255,8 @@ public void testTimeSeriesMaxQuery() { if (timeSeries.getTagValues()[0].equals("New York")) { assertFalse(foundNewYork, "Found multiple time-series for New York"); foundNewYork = true; - Optional maxValue = Arrays.stream(timeSeries.getValues()) - .filter(Objects::nonNull) - .max(Comparator.naturalOrder()); + Optional maxValue = + Arrays.stream(timeSeries.getValues()).filter(Objects::nonNull).max(Comparator.naturalOrder()); assertTrue(maxValue.isPresent()); assertEquals(maxValue.get().longValue(), 4L); } @@ -267,11 +268,12 @@ public void testTimeSeriesMaxQuery() { public void testTimeSeriesMinQuery() { TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100); ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderItemCount"); - TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, - TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */, valueExpression, new AggInfo("MIN")); + TimeSeriesContext timeSeriesContext = + new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, + 0L /* offsetSeconds */, valueExpression, new AggInfo("MIN", null)); QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext); - ServerQueryRequest serverQueryRequest = new ServerQueryRequest( - queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); + ServerQueryRequest serverQueryRequest = + new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS); assertTrue(instanceResponse.getResultsBlock() instanceof TimeSeriesResultsBlock); TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock(); @@ -285,9 +287,8 @@ public void testTimeSeriesMinQuery() { if (timeSeries.getTagValues()[0].equals("Chicago")) { assertFalse(foundChicago, "Found multiple time-series for Chicago"); foundChicago = true; - Optional minValue = Arrays.stream(timeSeries.getValues()) - .filter(Objects::nonNull) - .min(Comparator.naturalOrder()); + Optional minValue = + Arrays.stream(timeSeries.getValues()).filter(Objects::nonNull).min(Comparator.naturalOrder()); assertTrue(minValue.isPresent()); assertEquals(minValue.get().longValue(), 0L); } diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java index 0d0254128f1d..aa31692a337c 100644 --- a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java @@ -47,8 +47,8 @@ public void init(PinotConfiguration pinotConfiguration) { @Override public TimeSeriesLogicalPlanResult plan(RangeTimeSeriesRequest request) { if (!request.getLanguage().equals(Constants.LANGUAGE)) { - throw new IllegalArgumentException(String.format("Invalid engine id: %s. Expected: %s", request.getLanguage(), - Constants.LANGUAGE)); + throw new IllegalArgumentException( + String.format("Invalid engine id: %s. Expected: %s", request.getLanguage(), Constants.LANGUAGE)); } // Step-1: Parse and create a logical plan tree. BaseTimeSeriesPlanNode planNode = planQuery(request); @@ -61,16 +61,16 @@ public BaseTimeSeriesPlanNode planQuery(RangeTimeSeriesRequest request) { PlanIdGenerator planIdGenerator = new PlanIdGenerator(); Tokenizer tokenizer = new Tokenizer(request.getQuery()); List> commands = tokenizer.tokenize(); - Preconditions.checkState(commands.size() > 1, "At least two commands required. " - + "Query should start with a fetch followed by an aggregation."); + Preconditions.checkState(commands.size() > 1, + "At least two commands required. " + "Query should start with a fetch followed by an aggregation."); BaseTimeSeriesPlanNode lastNode = null; AggInfo aggInfo = null; List groupByColumns = new ArrayList<>(); BaseTimeSeriesPlanNode rootNode = null; for (int commandId = commands.size() - 1; commandId >= 0; commandId--) { String command = commands.get(commandId).get(0); - Preconditions.checkState((command.equals("fetch") && commandId == 0) - || (!command.equals("fetch") && commandId > 0), + Preconditions.checkState( + (command.equals("fetch") && commandId == 0) || (!command.equals("fetch") && commandId > 0), "fetch should be the first command"); List children = new ArrayList<>(); BaseTimeSeriesPlanNode currentNode = null; @@ -82,10 +82,9 @@ public BaseTimeSeriesPlanNode planQuery(RangeTimeSeriesRequest request) { case "sum": case "min": case "max": - Preconditions.checkState(commandId == 1, - "Aggregation should be the second command (fetch should be first)"); + Preconditions.checkState(commandId == 1, "Aggregation should be the second command (fetch should be first)"); Preconditions.checkState(aggInfo == null, "Aggregation already set. Only single agg allowed."); - aggInfo = new AggInfo(command.toUpperCase(Locale.ENGLISH)); + aggInfo = new AggInfo(command.toUpperCase(Locale.ENGLISH), null); if (commands.get(commandId).size() > 1) { String[] cols = commands.get(commandId).get(1).split(","); groupByColumns = Stream.of(cols).map(String::trim).collect(Collectors.toList()); @@ -152,7 +151,7 @@ public BaseTimeSeriesPlanNode handleFetchNode(String planId, List tokens Preconditions.checkNotNull(timeColumn, "Time column not set. Set via time_col="); Preconditions.checkNotNull(timeUnit, "Time unit not set. Set via time_unit="); Preconditions.checkNotNull(valueExpr, "Value expression not set. Set via value="); - return new LeafTimeSeriesPlanNode(planId, children, tableName, timeColumn, timeUnit, 0L, filter, valueExpr, - aggInfo, groupByColumns); + return new LeafTimeSeriesPlanNode(planId, children, tableName, timeColumn, timeUnit, 0L, filter, valueExpr, aggInfo, + groupByColumns); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java index 81b03fa131c0..929e2669cf6a 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java @@ -27,7 +27,8 @@ import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode; import org.testng.annotations.Test; -import static org.testng.Assert.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; public class PhysicalTimeSeriesPlanVisitorTest { @@ -36,7 +37,7 @@ public void testCompileQueryContext() { final String planId = "id"; final String tableName = "orderTable"; final String timeColumn = "orderTime"; - final AggInfo aggInfo = new AggInfo("SUM"); + final AggInfo aggInfo = new AggInfo("SUM", null); final String filterExpr = "cityName = 'Chicago'"; // Case-1: Without offset, simple column based group-by expression, simple column based value, and non-empty filter. { diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java index 03d9cc8aa96c..0dc3e0502def 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java @@ -21,24 +21,49 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import java.util.Collections; +import java.util.Map; +import javax.annotation.Nullable; /** - * AggInfo is used to represent the aggregation function. Aggregation functions are simply stored as a string, - * since time-series languages are allowed to implement their own aggregation functions. - * TODO: We will likely be adding more parameters to this. One candidate is partial/full aggregation information or - * aggregation result type to allow for intermediate result types. + * AggInfo is used to represent the aggregation function and its parameters. + * Aggregation functions are stored as a string, since time-series languages + * are allowed to implement their own aggregation functions. + * + * The class now includes a map of parameters, which can store various + * configuration options for the aggregation function. This allows for + * more flexibility in defining and customizing aggregations. + * + * Common parameters might include: + * - window: Defines the time window for aggregation + * + * Example usage: + * Map params = new HashMap<>(); + * params.put("window", "5m"); + * AggInfo aggInfo = new AggInfo("rate", params); */ public class AggInfo { private final String _aggFunction; + private final Map _params; @JsonCreator - public AggInfo(@JsonProperty("aggFunction") String aggFunction) { + public AggInfo(@JsonProperty("aggFunction") String aggFunction, + @JsonProperty("params") @Nullable Map params) { Preconditions.checkNotNull(aggFunction, "Received null aggFunction in AggInfo"); _aggFunction = aggFunction; + _params = params != null ? params : Collections.emptyMap(); } public String getAggFunction() { return _aggFunction; } + + public Map getParams() { + return Collections.unmodifiableMap(_params); + } + + public String getParam(String key) { + return _params.get(key); + } } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java index 82694e19a019..ece46c332a9a 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java @@ -25,7 +25,7 @@ import org.apache.pinot.tsdb.spi.TimeBuckets; import org.testng.annotations.Test; -import static org.testng.Assert.*; +import static org.testng.Assert.assertEquals; public class LeafTimeSeriesPlanNodeTest { @@ -42,34 +42,34 @@ public void testGetEffectiveFilter() { final String nonEmptyFilter = "cityName = 'Chicago'"; // Case-1: No offset, and empty filter. { - LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, - TIME_UNIT, 0L, "", "value_col", new AggInfo("SUM"), - Collections.singletonList("cityName")); + LeafTimeSeriesPlanNode planNode = + new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 0L, "", "value_col", + new AggInfo("SUM", null), Collections.singletonList("cityName")); assertEquals(planNode.getEffectiveFilter(timeBuckets), "orderTime >= " + expectedStartTimeInFilter + " AND orderTime <= " + expectedEndTimeInFilter); } // Case-2: Offset, but empty filter { - LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, - TIME_UNIT, 123L, "", "value_col", new AggInfo("SUM"), - Collections.singletonList("cityName")); + LeafTimeSeriesPlanNode planNode = + new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 123L, "", "value_col", + new AggInfo("SUM", null), Collections.singletonList("cityName")); assertEquals(planNode.getEffectiveFilter(timeBuckets), "orderTime >= " + (expectedStartTimeInFilter - 123) + " AND orderTime <= " + (expectedEndTimeInFilter - 123)); } // Case-3: Offset and non-empty filter { - LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, - TIME_UNIT, 123L, nonEmptyFilter, "value_col", new AggInfo("SUM"), - Collections.singletonList("cityName")); + LeafTimeSeriesPlanNode planNode = + new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 123L, nonEmptyFilter, + "value_col", new AggInfo("SUM", null), Collections.singletonList("cityName")); assertEquals(planNode.getEffectiveFilter(timeBuckets), String.format("(%s) AND (orderTime >= %s AND orderTime <= %s)", nonEmptyFilter, (expectedStartTimeInFilter - 123), (expectedEndTimeInFilter - 123))); } // Case-4: Offset, and non-empty filter, and time-unit that is not seconds { - LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, - TimeUnit.MILLISECONDS, 123L, nonEmptyFilter, "value_col", new AggInfo("SUM"), - Collections.singletonList("cityName")); + LeafTimeSeriesPlanNode planNode = + new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TimeUnit.MILLISECONDS, 123L, + nonEmptyFilter, "value_col", new AggInfo("SUM", null), Collections.singletonList("cityName")); assertEquals(planNode.getEffectiveFilter(timeBuckets), String.format("(%s) AND (orderTime >= %s AND orderTime <= %s)", nonEmptyFilter, (expectedStartTimeInFilter * 1000 - 123 * 1000), (expectedEndTimeInFilter * 1000 - 123 * 1000))); diff --git a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java index df66ea8fd94b..4bd5c37a5ae5 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java @@ -19,23 +19,28 @@ package org.apache.pinot.tsdb.spi.plan.serde; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.pinot.tsdb.spi.AggInfo; import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode; import org.testng.annotations.Test; -import static org.testng.Assert.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; public class TimeSeriesPlanSerdeTest { @Test public void testSerdeForScanFilterProjectNode() { - LeafTimeSeriesPlanNode leafTimeSeriesPlanNode = new LeafTimeSeriesPlanNode( - "sfp#0", new ArrayList<>(), "myTable", "myTimeColumn", TimeUnit.MILLISECONDS, - 0L, "myFilterExpression", "myValueExpression", - new AggInfo("SUM"), new ArrayList<>() - ); + Map aggParams = new HashMap<>(); + aggParams.put("window", "5m"); + + LeafTimeSeriesPlanNode leafTimeSeriesPlanNode = + new LeafTimeSeriesPlanNode("sfp#0", new ArrayList<>(), "myTable", "myTimeColumn", TimeUnit.MILLISECONDS, 0L, + "myFilterExpression", "myValueExpression", new AggInfo("SUM", aggParams), new ArrayList<>()); BaseTimeSeriesPlanNode planNode = TimeSeriesPlanSerde.deserialize(TimeSeriesPlanSerde.serialize(leafTimeSeriesPlanNode)); assertTrue(planNode instanceof LeafTimeSeriesPlanNode); @@ -47,6 +52,8 @@ public void testSerdeForScanFilterProjectNode() { assertEquals(deserializedNode.getFilterExpression(), "myFilterExpression"); assertEquals(deserializedNode.getValueExpression(), "myValueExpression"); assertNotNull(deserializedNode.getAggInfo()); + assertNotNull(deserializedNode.getAggInfo().getParams()); + assertEquals(deserializedNode.getAggInfo().getParams().get("window"), "5m"); assertEquals(deserializedNode.getGroupByExpressions().size(), 0); } }