diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 9d2c1a2c2c..5ea1949431 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -157,6 +157,25 @@ func sumSeriesWithWildcards( return combineSeriesWithWildcards(ctx, series, positions, sumSpecificationFunc, ts.Sum) } +// aggregateWithWildcards splits the given set of series into sub-groupings +// based on wildcard matches in the hierarchy, then aggregate the values in +// each grouping based on the given function. +func aggregateWithWildcards( + ctx *common.Context, + series singlePathSpec, + fname string, + positions ...int, +) (ts.SeriesList, error) { + f, fexists := summarizeFuncs[fname] + if !fexists { + err := errors.NewInvalidParamsError(fmt.Errorf( + "invalid func %s", fname)) + return ts.NewSeriesList(), err + } + + return combineSeriesWithWildcards(ctx, series, positions, f.specificationFunc, f.consolidationFunc) +} + // combineSeriesWithWildcards splits the given set of series into sub-groupings // based on wildcard matches in the hierarchy, then combines the values in each // sub-grouping according to the provided consolidation function diff --git a/src/query/graphite/native/aggregation_functions_test.go b/src/query/graphite/native/aggregation_functions_test.go index fae525aa30..cb19232d5b 100644 --- a/src/query/graphite/native/aggregation_functions_test.go +++ b/src/query/graphite/native/aggregation_functions_test.go @@ -331,6 +331,56 @@ func TestSumSeriesWithWildcards(t *testing.T) { } } +func TestAggregateWithWildcards(t *testing.T) { + var ( + start, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:41:19 GMT") + end, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:43:19 GMT") + ctx = common.NewContext(common.ContextOptions{Start: start, End: end}) + inputs = []*ts.Series{ + ts.NewSeries(ctx, "servers.foo-1.pod1.status.500", start, + ts.NewConstantValues(ctx, 2, 12, 10000)), + ts.NewSeries(ctx, "servers.foo-2.pod1.status.500", start, + ts.NewConstantValues(ctx, 4, 12, 10000)), + ts.NewSeries(ctx, "servers.foo-3.pod1.status.500", start, + ts.NewConstantValues(ctx, 6, 12, 10000)), + ts.NewSeries(ctx, "servers.foo-1.pod2.status.500", start, + ts.NewConstantValues(ctx, 8, 12, 10000)), + ts.NewSeries(ctx, "servers.foo-2.pod2.status.500", start, + ts.NewConstantValues(ctx, 10, 12, 10000)), + + ts.NewSeries(ctx, "servers.foo-1.pod1.status.400", start, + ts.NewConstantValues(ctx, 20, 12, 10000)), + ts.NewSeries(ctx, "servers.foo-2.pod1.status.400", start, + ts.NewConstantValues(ctx, 30, 12, 10000)), + ts.NewSeries(ctx, "servers.foo-3.pod2.status.400", start, + ts.NewConstantValues(ctx, 40, 12, 10000)), + } + ) + defer ctx.Close() + + outSeries, err := aggregateWithWildcards(ctx, singlePathSpec{ + Values: inputs, + }, "sum", 1, 2) + require.NoError(t, err) + require.Equal(t, 2, len(outSeries.Values)) + + outSeries, _ = sortByName(ctx, singlePathSpec(outSeries)) + + expectedOutputs := []struct { + name string + sumOfVals float64 + }{ + {"servers.status.400", 90 * 12}, + {"servers.status.500", 30 * 12}, + } + + for i, expected := range expectedOutputs { + series := outSeries.Values[i] + assert.Equal(t, expected.name, series.Name()) + assert.Equal(t, expected.sumOfVals, series.SafeSum()) + } +} + func TestGroupByNode(t *testing.T) { var ( start, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:41:19 GMT") diff --git a/src/query/graphite/native/builtin_functions.go b/src/query/graphite/native/builtin_functions.go index a8541e7c0f..40bce34c6a 100644 --- a/src/query/graphite/native/builtin_functions.go +++ b/src/query/graphite/native/builtin_functions.go @@ -1961,6 +1961,7 @@ func init() { }) MustRegisterFunction(sumSeries) MustRegisterFunction(sumSeriesWithWildcards) + MustRegisterFunction(aggregateWithWildcards) MustRegisterFunction(sustainedAbove) MustRegisterFunction(sustainedBelow) MustRegisterFunction(threshold).WithDefaultParams(map[uint8]interface{}{