Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT LAND] trying amortizing allocs in ReverseMatch #4306

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/metrics/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,11 @@ type TagMatchOptions struct {
// Function to extract name and tags from an id.
NameAndTagsFn id.NameAndTagsFn

// Function to get a sorted tag iterator from id tags.
// The caller of Matches is the owner of the Iterator and is responsible for closing it, this allows reusing the
// same Iterator across many Matches.
SortedTagIteratorFn id.SortedTagIteratorFn
// SortedTagIterator is an iterator for use in interpreting an []byte ID.
// Match functions will mutate this iterator. The caller passes this down in order to allow reuse of
// the iterator across calls, without pooling (which was expensive in previous versions of this code).
// N.B.: this is an iteration of https://github.com/m3db/m3/pull/3988.
SortedTagIterator id.SortedTagIterator
}

type filter interface {
Expand Down
9 changes: 7 additions & 2 deletions src/metrics/filters/mock_filter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions src/metrics/filters/tags_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,12 @@ func (f *tagsFilter) String() string {
return buf.String()
}

func (f *tagsFilter) Matches(id []byte, opts TagMatchOptions) (bool, error) {
func (f *tagsFilter) Matches(idBytes []byte, opts TagMatchOptions) (bool, error) {
if f.nameFilter == nil && len(f.tagFilters) == 0 {
return true, nil
}

name, tags, err := opts.NameAndTagsFn(id)
name, tags, err := opts.NameAndTagsFn(idBytes)
if err != nil {
return false, err
}
Expand All @@ -205,7 +205,8 @@ func (f *tagsFilter) Matches(id []byte, opts TagMatchOptions) (bool, error) {
}
}

iter := opts.SortedTagIteratorFn(tags)
iter := opts.SortedTagIterator
iter.Reset(tags)

currIdx := 0

Expand Down
82 changes: 52 additions & 30 deletions src/metrics/rules/active_ruleset.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ type activeRuleSet struct {
includeTagKeys map[uint64]struct{}
}

type optimizedMatchOptions struct {
NameAndTagsFn metricid.NameAndTagsFn
SortedTagIterator metricid.SortedTagIterator
}

func newActiveRuleSet(
version int,
mappingRules []*mappingRule,
Expand Down Expand Up @@ -111,7 +116,12 @@ func (as *activeRuleSet) ForwardMatch(
fromNanos, toNanos int64,
opts MatchOptions,
) (MatchResult, error) {
currMatchRes, err := as.forwardMatchAt(id.Bytes(), fromNanos, opts)
optimizedOpts := optimizedMatchOptions{
NameAndTagsFn: opts.NameAndTagsFn,
SortedTagIterator: opts.SortedTagIteratorFn(nil),
}

currMatchRes, err := as.forwardMatchAt(id.Bytes(), fromNanos, optimizedOpts)
if err != nil {
return MatchResult{}, err
}
Expand All @@ -124,7 +134,7 @@ func (as *activeRuleSet) ForwardMatch(
)

for nextIdx < len(as.cutoverTimesAsc) && nextCutoverNanos < toNanos {
nextMatchRes, err := as.forwardMatchAt(id.Bytes(), nextCutoverNanos, opts)
nextMatchRes, err := as.forwardMatchAt(id.Bytes(), nextCutoverNanos, optimizedOpts)
if err != nil {
return MatchResult{}, err
}
Expand Down Expand Up @@ -164,6 +174,11 @@ func (as *activeRuleSet) ReverseMatch(
keepOriginal bool
)

optimizedMatchOpts := optimizedMatchOptions{
NameAndTagsFn: as.tagsFilterOpts.NameAndTagsFn,
SortedTagIterator: as.tagsFilterOpts.SortedTagIteratorFn(nil),
}

// Determine whether the ID is a rollup metric ID.
name, tags, err := as.tagsFilterOpts.NameAndTagsFn(id.Bytes())
if err == nil {
Expand All @@ -180,6 +195,7 @@ func (as *activeRuleSet) ReverseMatch(
at,
isMultiAggregationTypesAllowed,
aggTypesOpts,
optimizedMatchOpts,
)
if err != nil {
return MatchResult{}, err
Expand All @@ -202,6 +218,7 @@ func (as *activeRuleSet) ReverseMatch(
at,
isMultiAggregationTypesAllowed,
aggTypesOpts,
optimizedMatchOpts,
)
if err != nil {
return MatchResult{}, err
Expand Down Expand Up @@ -229,13 +246,13 @@ func (as *activeRuleSet) ReverseMatch(
func (as *activeRuleSet) forwardMatchAt(
id []byte,
timeNanos int64,
matchOpts MatchOptions,
optimizedMatchOpts optimizedMatchOptions,
) (forwardMatchResult, error) {
mappingResults, err := as.mappingsForNonRollupID(id, timeNanos, matchOpts)
mappingResults, err := as.mappingsForNonRollupID(id, timeNanos, optimizedMatchOpts)
if err != nil {
return forwardMatchResult{}, err
}
rollupResults, err := as.rollupResultsFor(id, timeNanos, matchOpts)
rollupResults, err := as.rollupResultsFor(id, timeNanos, optimizedMatchOpts)
if err != nil {
return forwardMatchResult{}, err
}
Expand Down Expand Up @@ -263,7 +280,7 @@ func (as *activeRuleSet) forwardMatchAt(
func (as *activeRuleSet) mappingsForNonRollupID(
id []byte,
timeNanos int64,
matchOpts MatchOptions,
optimizedMatchOpts optimizedMatchOptions,
) (mappingResults, error) {
var (
cutoverNanos int64
Expand All @@ -275,8 +292,9 @@ func (as *activeRuleSet) mappingsForNonRollupID(
continue
}
matches, err := snapshot.filter.Matches(id, filters.TagMatchOptions{
SortedTagIteratorFn: matchOpts.SortedTagIteratorFn,
NameAndTagsFn: matchOpts.NameAndTagsFn,
NameAndTagsFn: optimizedMatchOpts.NameAndTagsFn,
// TODO: this needs the change to not use the alloc
SortedTagIterator: optimizedMatchOpts.SortedTagIterator,
})
if err != nil {
return mappingResults{}, err
Expand Down Expand Up @@ -336,7 +354,11 @@ func (as *activeRuleSet) LatestRollupRules(_ []byte, timeNanos int64) ([]view.Ro
return out, nil
}

func (as *activeRuleSet) rollupResultsFor(id []byte, timeNanos int64, matchOpts MatchOptions) (rollupResults, error) {
func (as *activeRuleSet) rollupResultsFor(
id []byte,
timeNanos int64,
optimizedMatchOpts optimizedMatchOptions,
) (rollupResults, error) {
var (
cutoverNanos int64
rollupTargets []rollupTarget
Expand All @@ -350,8 +372,9 @@ func (as *activeRuleSet) rollupResultsFor(id []byte, timeNanos int64, matchOpts
continue
}
match, err := snapshot.filter.Matches(id, filters.TagMatchOptions{
NameAndTagsFn: matchOpts.NameAndTagsFn,
SortedTagIteratorFn: matchOpts.SortedTagIteratorFn,
// TODO: same comment, different line
NameAndTagsFn: optimizedMatchOpts.NameAndTagsFn,
SortedTagIterator: optimizedMatchOpts.SortedTagIterator,
})
if err != nil {
return rollupResults{}, err
Expand Down Expand Up @@ -382,7 +405,7 @@ func (as *activeRuleSet) rollupResultsFor(id []byte, timeNanos int64, matchOpts
}
}
// NB: could log the matching error here if needed.
res, _ := as.toRollupResults(id, cutoverNanos, rollupTargets, keepOriginal, tags, matchOpts)
res, _ := as.toRollupResults(id, cutoverNanos, rollupTargets, keepOriginal, tags, optimizedMatchOpts)
return res, nil
}

Expand All @@ -399,15 +422,15 @@ func (as *activeRuleSet) toRollupResults(
targets []rollupTarget,
keepOriginal bool,
tags [][]models.Tag,
matchOpts MatchOptions,
optimizedMatchOpts optimizedMatchOptions,
) (rollupResults, error) {
if len(targets) == 0 {
return rollupResults{}, nil
}

// If we cannot extract tags from the id, this is likely an invalid
// metric and we bail early.
_, sortedTagPairBytes, err := matchOpts.NameAndTagsFn(id)
_, sortedTagPairBytes, err := optimizedMatchOpts.NameAndTagsFn(id)
if err != nil {
return rollupResults{}, err
}
Expand Down Expand Up @@ -456,7 +479,7 @@ func (as *activeRuleSet) toRollupResults(
tagPairs,
tags[idx],
matchRollupTargetOptions{generateRollupID: true},
matchOpts)
optimizedMatchOpts)
if err != nil {
multiErr = multiErr.Add(err)
continue
Expand All @@ -473,7 +496,7 @@ func (as *activeRuleSet) toRollupResults(
continue
}
tagPairs = tagPairs[:0]
applied, err := as.applyIDToPipeline(sortedTagPairBytes, toApply, tagPairs, tags[idx], matchOpts)
applied, err := as.applyIDToPipeline(sortedTagPairBytes, toApply, tagPairs, tags[idx], optimizedMatchOpts)
if err != nil {
err = fmt.Errorf("failed to apply id %s to pipeline %v: %v", id, toApply, err)
multiErr = multiErr.Add(err)
Expand Down Expand Up @@ -518,7 +541,7 @@ func (as *activeRuleSet) matchRollupTarget(
tagPairs []metricid.TagPair, // buffer for reuse to generate rollup ID across calls
tags []models.Tag,
targetOpts matchRollupTargetOptions,
matchOpts MatchOptions,
optimizedMatchOpts optimizedMatchOptions,
) ([]byte, bool, error) {
if rollupOp.Type == mpipeline.ExcludeByRollupType && !targetOpts.generateRollupID {
// Exclude by tag always matches, if not generating rollup ID
Expand All @@ -528,12 +551,13 @@ func (as *activeRuleSet) matchRollupTarget(

var (
rollupTags = rollupOp.Tags
sortedTagIter = matchOpts.SortedTagIteratorFn(sortedTagPairBytes)
matchTagIdx = 0
sortedTagIter = optimizedMatchOpts.SortedTagIterator
nameTagName = as.tagsFilterOpts.NameTagKey
nameTagValue []byte
includeTagNames = as.includeTagKeys
)
sortedTagIter.Reset(sortedTagPairBytes)

switch rollupOp.Type {
case mpipeline.GroupByRollupType:
Expand Down Expand Up @@ -653,7 +677,7 @@ func (as *activeRuleSet) applyIDToPipeline(
pipeline mpipeline.Pipeline,
tagPairs []metricid.TagPair, // buffer for reuse across calls
tags []models.Tag,
matchOpts MatchOptions,
optimizedMatchOpts optimizedMatchOptions,
) (applied.Pipeline, error) {
operations := make([]applied.OpUnion, 0, pipeline.Len())
for i := 0; i < pipeline.Len(); i++ {
Expand All @@ -674,7 +698,7 @@ func (as *activeRuleSet) applyIDToPipeline(
tagPairs,
tags,
matchRollupTargetOptions{generateRollupID: true},
matchOpts)
optimizedMatchOpts)
if err != nil {
return applied.Pipeline{}, err
}
Expand All @@ -694,6 +718,7 @@ func (as *activeRuleSet) applyIDToPipeline(
return applied.NewPipeline(operations), nil
}

// amainsd: hi
func (as *activeRuleSet) reverseMappingsFor(
id, name, tags []byte,
isRollupID bool,
Expand All @@ -702,11 +727,12 @@ func (as *activeRuleSet) reverseMappingsFor(
at aggregation.Type,
isMultiAggregationTypesAllowed bool,
aggTypesOpts aggregation.TypesOptions,
optimizedMatchOpts optimizedMatchOptions,
) (reverseMatchResult, bool, error) {
if !isRollupID {
return as.reverseMappingsForNonRollupID(id, timeNanos, mt, at, aggTypesOpts)
return as.reverseMappingsForNonRollupID(id, timeNanos, mt, at, aggTypesOpts, optimizedMatchOpts)
}
return as.reverseMappingsForRollupID(name, tags, timeNanos, mt, at, isMultiAggregationTypesAllowed, aggTypesOpts)
return as.reverseMappingsForRollupID(name, tags, timeNanos, mt, at, isMultiAggregationTypesAllowed, aggTypesOpts, optimizedMatchOpts)
}

type reverseMatchResult struct {
Expand All @@ -722,11 +748,9 @@ func (as *activeRuleSet) reverseMappingsForNonRollupID(
mt metric.Type,
at aggregation.Type,
aggTypesOpts aggregation.TypesOptions,
optimizedMatchOpts optimizedMatchOptions,
) (reverseMatchResult, bool, error) {
mapping, err := as.mappingsForNonRollupID(id, timeNanos, MatchOptions{
NameAndTagsFn: as.tagsFilterOpts.NameAndTagsFn,
SortedTagIteratorFn: as.tagsFilterOpts.SortedTagIteratorFn,
})
mapping, err := as.mappingsForNonRollupID(id, timeNanos, optimizedMatchOpts)
if err != nil {
return reverseMatchResult{}, false, err
}
Expand Down Expand Up @@ -769,6 +793,7 @@ func (as *activeRuleSet) reverseMappingsForRollupID(
at aggregation.Type,
isMultiAggregationTypesAllowed bool,
aggTypesOpts aggregation.TypesOptions,
optimizedMatchOpts optimizedMatchOptions,
) (reverseMatchResult, bool, error) {
for _, rollupRule := range as.rollupRules {
snapshot := rollupRule.activeSnapshot(timeNanos)
Expand All @@ -792,10 +817,7 @@ func (as *activeRuleSet) reverseMappingsForRollupID(
nil,
nil,
matchRollupTargetOptions{generateRollupID: false},
MatchOptions{
NameAndTagsFn: as.tagsFilterOpts.NameAndTagsFn,
SortedTagIteratorFn: as.tagsFilterOpts.SortedTagIteratorFn,
},
optimizedMatchOpts,
)
if err != nil {
return reverseMatchResult{}, false, err
Expand Down
7 changes: 7 additions & 0 deletions src/metrics/rules/active_ruleset_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package rules

import "testing"

func BenchmarkActiveRuleSet(b *testing.B) {

}