Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding custom params to AggInfo so that series builders can use it. #14173

Merged
merged 3 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,12 @@ public void testMinQuery() {
public void testTimeSeriesSumQuery() {
TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100);
ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderAmount");
TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME,
TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */, valueExpression, new AggInfo("SUM"));
TimeSeriesContext timeSeriesContext =
new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets,
0L /* offsetSeconds */, valueExpression, new AggInfo("SUM", null));
QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext);
ServerQueryRequest serverQueryRequest = new ServerQueryRequest(
queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get());
ServerQueryRequest serverQueryRequest =
new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get());
InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS);
assertTrue(instanceResponse.getResultsBlock() instanceof TimeSeriesResultsBlock);
TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock();
Expand All @@ -235,11 +236,12 @@ public void testTimeSeriesSumQuery() {
public void testTimeSeriesMaxQuery() {
TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100);
ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderItemCount");
TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME,
TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */, valueExpression, new AggInfo("MAX"));
TimeSeriesContext timeSeriesContext =
new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets,
0L /* offsetSeconds */, valueExpression, new AggInfo("MAX", null));
QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext);
ServerQueryRequest serverQueryRequest = new ServerQueryRequest(
queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get());
ServerQueryRequest serverQueryRequest =
new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get());
InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS);
assertTrue(instanceResponse.getResultsBlock() instanceof TimeSeriesResultsBlock);
TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock();
Expand All @@ -253,9 +255,8 @@ public void testTimeSeriesMaxQuery() {
if (timeSeries.getTagValues()[0].equals("New York")) {
assertFalse(foundNewYork, "Found multiple time-series for New York");
foundNewYork = true;
Optional<Double> maxValue = Arrays.stream(timeSeries.getValues())
.filter(Objects::nonNull)
.max(Comparator.naturalOrder());
Optional<Double> maxValue =
Arrays.stream(timeSeries.getValues()).filter(Objects::nonNull).max(Comparator.naturalOrder());
assertTrue(maxValue.isPresent());
assertEquals(maxValue.get().longValue(), 4L);
}
Expand All @@ -267,11 +268,12 @@ public void testTimeSeriesMaxQuery() {
public void testTimeSeriesMinQuery() {
TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100);
ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderItemCount");
TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME,
TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */, valueExpression, new AggInfo("MIN"));
TimeSeriesContext timeSeriesContext =
new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets,
0L /* offsetSeconds */, valueExpression, new AggInfo("MIN", null));
QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext);
ServerQueryRequest serverQueryRequest = new ServerQueryRequest(
queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get());
ServerQueryRequest serverQueryRequest =
new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get());
InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS);
assertTrue(instanceResponse.getResultsBlock() instanceof TimeSeriesResultsBlock);
TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock();
Expand All @@ -285,9 +287,8 @@ public void testTimeSeriesMinQuery() {
if (timeSeries.getTagValues()[0].equals("Chicago")) {
assertFalse(foundChicago, "Found multiple time-series for Chicago");
foundChicago = true;
Optional<Double> minValue = Arrays.stream(timeSeries.getValues())
.filter(Objects::nonNull)
.min(Comparator.naturalOrder());
Optional<Double> minValue =
Arrays.stream(timeSeries.getValues()).filter(Objects::nonNull).min(Comparator.naturalOrder());
assertTrue(minValue.isPresent());
assertEquals(minValue.get().longValue(), 0L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public void init(PinotConfiguration pinotConfiguration) {
@Override
public TimeSeriesLogicalPlanResult plan(RangeTimeSeriesRequest request) {
if (!request.getLanguage().equals(Constants.LANGUAGE)) {
throw new IllegalArgumentException(String.format("Invalid engine id: %s. Expected: %s", request.getLanguage(),
Constants.LANGUAGE));
throw new IllegalArgumentException(
String.format("Invalid engine id: %s. Expected: %s", request.getLanguage(), Constants.LANGUAGE));
}
// Step-1: Parse and create a logical plan tree.
BaseTimeSeriesPlanNode planNode = planQuery(request);
Expand All @@ -61,16 +61,16 @@ public BaseTimeSeriesPlanNode planQuery(RangeTimeSeriesRequest request) {
PlanIdGenerator planIdGenerator = new PlanIdGenerator();
Tokenizer tokenizer = new Tokenizer(request.getQuery());
List<List<String>> commands = tokenizer.tokenize();
Preconditions.checkState(commands.size() > 1, "At least two commands required. "
+ "Query should start with a fetch followed by an aggregation.");
Preconditions.checkState(commands.size() > 1,
"At least two commands required. " + "Query should start with a fetch followed by an aggregation.");
BaseTimeSeriesPlanNode lastNode = null;
AggInfo aggInfo = null;
List<String> groupByColumns = new ArrayList<>();
BaseTimeSeriesPlanNode rootNode = null;
for (int commandId = commands.size() - 1; commandId >= 0; commandId--) {
String command = commands.get(commandId).get(0);
Preconditions.checkState((command.equals("fetch") && commandId == 0)
|| (!command.equals("fetch") && commandId > 0),
Preconditions.checkState(
(command.equals("fetch") && commandId == 0) || (!command.equals("fetch") && commandId > 0),
"fetch should be the first command");
List<BaseTimeSeriesPlanNode> children = new ArrayList<>();
BaseTimeSeriesPlanNode currentNode = null;
Expand All @@ -82,10 +82,9 @@ public BaseTimeSeriesPlanNode planQuery(RangeTimeSeriesRequest request) {
case "sum":
case "min":
case "max":
Preconditions.checkState(commandId == 1,
"Aggregation should be the second command (fetch should be first)");
Preconditions.checkState(commandId == 1, "Aggregation should be the second command (fetch should be first)");
Preconditions.checkState(aggInfo == null, "Aggregation already set. Only single agg allowed.");
aggInfo = new AggInfo(command.toUpperCase(Locale.ENGLISH));
aggInfo = new AggInfo(command.toUpperCase(Locale.ENGLISH), null);
if (commands.get(commandId).size() > 1) {
String[] cols = commands.get(commandId).get(1).split(",");
groupByColumns = Stream.of(cols).map(String::trim).collect(Collectors.toList());
Expand Down Expand Up @@ -152,7 +151,7 @@ public BaseTimeSeriesPlanNode handleFetchNode(String planId, List<String> tokens
Preconditions.checkNotNull(timeColumn, "Time column not set. Set via time_col=");
Preconditions.checkNotNull(timeUnit, "Time unit not set. Set via time_unit=");
Preconditions.checkNotNull(valueExpr, "Value expression not set. Set via value=");
return new LeafTimeSeriesPlanNode(planId, children, tableName, timeColumn, timeUnit, 0L, filter, valueExpr,
aggInfo, groupByColumns);
return new LeafTimeSeriesPlanNode(planId, children, tableName, timeColumn, timeUnit, 0L, filter, valueExpr, aggInfo,
groupByColumns);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
import org.testng.annotations.Test;

import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;


public class PhysicalTimeSeriesPlanVisitorTest {
Expand All @@ -36,7 +37,7 @@ public void testCompileQueryContext() {
final String planId = "id";
final String tableName = "orderTable";
final String timeColumn = "orderTime";
final AggInfo aggInfo = new AggInfo("SUM");
final AggInfo aggInfo = new AggInfo("SUM", null);
final String filterExpr = "cityName = 'Chicago'";
// Case-1: Without offset, simple column based group-by expression, simple column based value, and non-empty filter.
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,47 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;


/**
* AggInfo is used to represent the aggregation function. Aggregation functions are simply stored as a string,
* since time-series languages are allowed to implement their own aggregation functions.
* TODO: We will likely be adding more parameters to this. One candidate is partial/full aggregation information or
* aggregation result type to allow for intermediate result types.
* AggInfo is used to represent the aggregation function and its parameters.
* Aggregation functions are stored as a string, since time-series languages
* are allowed to implement their own aggregation functions.
*
* The class now includes a map of parameters, which can store various
* configuration options for the aggregation function. This allows for
* more flexibility in defining and customizing aggregations.
*
* Common parameters might include:
* - window: Defines the time window for aggregation
*
* Example usage:
* Map<String, String> params = new HashMap<>();
* params.put("window", "5m");
* AggInfo aggInfo = new AggInfo("rate", params);
*/
public class AggInfo {
private final String _aggFunction;
private final Map<String, String> _params;

@JsonCreator
public AggInfo(@JsonProperty("aggFunction") String aggFunction) {
public AggInfo(@JsonProperty("aggFunction") String aggFunction, @JsonProperty("params") Map<String, String> params) {
raghavyadav01 marked this conversation as resolved.
Show resolved Hide resolved
Preconditions.checkNotNull(aggFunction, "Received null aggFunction in AggInfo");
_aggFunction = aggFunction;
_params = params != null ? params : Collections.emptyMap();
}

public String getAggFunction() {
return _aggFunction;
}

public Map<String, String> getParams() {
return Collections.unmodifiableMap(_params);
}

public String getParam(String key) {
return _params.get(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.testng.annotations.Test;

import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;


public class LeafTimeSeriesPlanNodeTest {
Expand All @@ -42,34 +42,34 @@ public void testGetEffectiveFilter() {
final String nonEmptyFilter = "cityName = 'Chicago'";
// Case-1: No offset, and empty filter.
{
LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN,
TIME_UNIT, 0L, "", "value_col", new AggInfo("SUM"),
Collections.singletonList("cityName"));
LeafTimeSeriesPlanNode planNode =
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 0L, "", "value_col",
new AggInfo("SUM", null), Collections.singletonList("cityName"));
assertEquals(planNode.getEffectiveFilter(timeBuckets),
"orderTime >= " + expectedStartTimeInFilter + " AND orderTime <= " + expectedEndTimeInFilter);
}
// Case-2: Offset, but empty filter
{
LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN,
TIME_UNIT, 123L, "", "value_col", new AggInfo("SUM"),
Collections.singletonList("cityName"));
LeafTimeSeriesPlanNode planNode =
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 123L, "", "value_col",
new AggInfo("SUM", null), Collections.singletonList("cityName"));
assertEquals(planNode.getEffectiveFilter(timeBuckets),
"orderTime >= " + (expectedStartTimeInFilter - 123) + " AND orderTime <= " + (expectedEndTimeInFilter - 123));
}
// Case-3: Offset and non-empty filter
{
LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN,
TIME_UNIT, 123L, nonEmptyFilter, "value_col", new AggInfo("SUM"),
Collections.singletonList("cityName"));
LeafTimeSeriesPlanNode planNode =
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 123L, nonEmptyFilter,
"value_col", new AggInfo("SUM", null), Collections.singletonList("cityName"));
assertEquals(planNode.getEffectiveFilter(timeBuckets),
String.format("(%s) AND (orderTime >= %s AND orderTime <= %s)", nonEmptyFilter,
(expectedStartTimeInFilter - 123), (expectedEndTimeInFilter - 123)));
}
// Case-4: Offset, and non-empty filter, and time-unit that is not seconds
{
LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN,
TimeUnit.MILLISECONDS, 123L, nonEmptyFilter, "value_col", new AggInfo("SUM"),
Collections.singletonList("cityName"));
LeafTimeSeriesPlanNode planNode =
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TimeUnit.MILLISECONDS, 123L,
nonEmptyFilter, "value_col", new AggInfo("SUM", null), Collections.singletonList("cityName"));
assertEquals(planNode.getEffectiveFilter(timeBuckets),
String.format("(%s) AND (orderTime >= %s AND orderTime <= %s)", nonEmptyFilter,
(expectedStartTimeInFilter * 1000 - 123 * 1000), (expectedEndTimeInFilter * 1000 - 123 * 1000)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,28 @@
package org.apache.pinot.tsdb.spi.plan.serde;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
import org.testng.annotations.Test;

import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;


public class TimeSeriesPlanSerdeTest {
@Test
public void testSerdeForScanFilterProjectNode() {
LeafTimeSeriesPlanNode leafTimeSeriesPlanNode = new LeafTimeSeriesPlanNode(
"sfp#0", new ArrayList<>(), "myTable", "myTimeColumn", TimeUnit.MILLISECONDS,
0L, "myFilterExpression", "myValueExpression",
new AggInfo("SUM"), new ArrayList<>()
);
Map<String, String> aggParams = new HashMap<>();
aggParams.put("window", "5m");

LeafTimeSeriesPlanNode leafTimeSeriesPlanNode =
new LeafTimeSeriesPlanNode("sfp#0", new ArrayList<>(), "myTable", "myTimeColumn", TimeUnit.MILLISECONDS, 0L,
"myFilterExpression", "myValueExpression", new AggInfo("SUM", aggParams), new ArrayList<>());
BaseTimeSeriesPlanNode planNode =
TimeSeriesPlanSerde.deserialize(TimeSeriesPlanSerde.serialize(leafTimeSeriesPlanNode));
assertTrue(planNode instanceof LeafTimeSeriesPlanNode);
Expand All @@ -47,6 +52,8 @@ public void testSerdeForScanFilterProjectNode() {
assertEquals(deserializedNode.getFilterExpression(), "myFilterExpression");
assertEquals(deserializedNode.getValueExpression(), "myValueExpression");
assertNotNull(deserializedNode.getAggInfo());
assertNotNull(deserializedNode.getAggInfo().getParams());
assertEquals(deserializedNode.getAggInfo().getParams().get("window"), "5m");
assertEquals(deserializedNode.getGroupByExpressions().size(), 0);
}
}
Loading