Skip to content

Commit

Permalink
Refactor ReverseMatch signature to allow for iterator reuse (#4305)
Browse files Browse the repository at this point in the history
commit-id:5b2feb4b
  • Loading branch information
andrewmains12 authored Oct 22, 2024
1 parent 6950e31 commit 4feb762
Show file tree
Hide file tree
Showing 12 changed files with 247 additions and 78 deletions.
3 changes: 0 additions & 3 deletions src/metrics/filters/tags_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,6 @@ type TagsFilterOptions struct {

// Function to extract name and tags from an id.
NameAndTagsFn id.NameAndTagsFn

// Function to create a new sorted tag iterator from id tags.
SortedTagIteratorFn id.SortedTagIteratorFn
}

// tagsFilter contains a list of tag filters.
Expand Down
17 changes: 2 additions & 15 deletions src/metrics/matcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/m3db/m3/src/metrics/filters"
"github.com/m3db/m3/src/metrics/matcher/cache"
"github.com/m3db/m3/src/metrics/matcher/namespace"
"github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/metric/id/m3"
"github.com/m3db/m3/src/metrics/rules"
"github.com/m3db/m3/src/x/clock"
Expand Down Expand Up @@ -101,21 +100,9 @@ func (cfg *Configuration) NewOptions(
}

// Configure rules options.
scope := instrumentOpts.MetricsScope().SubScope("sorted-tag-iterator-pool")
poolOpts := cfg.SortedTagIteratorPool.NewObjectPoolOptions(instrumentOpts.SetMetricsScope(scope))
sortedTagIteratorPool := id.NewSortedTagIteratorPool(poolOpts)
sortedTagIteratorPool.Init(func() id.SortedTagIterator {
return m3.NewPooledSortedTagIterator(nil, sortedTagIteratorPool)
})
sortedTagIteratorFn := func(tagPairs []byte) id.SortedTagIterator {
it := sortedTagIteratorPool.Get()
it.Reset(tagPairs)
return it
}
tagsFilterOptions := filters.TagsFilterOptions{
NameTagKey: []byte(cfg.NameTagKey),
NameAndTagsFn: m3.NameAndTags,
SortedTagIteratorFn: sortedTagIteratorFn,
NameTagKey: []byte(cfg.NameTagKey),
NameAndTagsFn: m3.NameAndTags,
}

isRollupIDFn := func(name []byte, tags []byte) bool {
Expand Down
30 changes: 26 additions & 4 deletions src/metrics/matcher/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,27 @@ func (m *matcher) ForwardMatch(

func (m *matcher) ReverseMatch(
id id.ID,
fromNanos, toNanos int64,
fromNanos int64,
toNanos int64,
mt metric.Type,
at aggregation.Type,
isMultiAggregationTypesAllowed bool,
aggTypesOpts aggregation.TypesOptions,
matchOpts rules.MatchOptions,
) (rules.MatchResult, error) {
sw := m.metrics.matchLatency.Start()
defer sw.Stop()
// Cache does not support reverse matching.
return m.namespaces.ReverseMatch(id, fromNanos, toNanos, mt, at, isMultiAggregationTypesAllowed, aggTypesOpts)
return m.namespaces.ReverseMatch(
id,
fromNanos,
toNanos,
mt,
at,
isMultiAggregationTypesAllowed,
aggTypesOpts,
matchOpts,
)
}

func (m *matcher) Close() error {
Expand Down Expand Up @@ -156,15 +167,26 @@ func (m *noCacheMatcher) ForwardMatch(

func (m *noCacheMatcher) ReverseMatch(
id id.ID,
fromNanos, toNanos int64,
fromNanos int64,
toNanos int64,
mt metric.Type,
at aggregation.Type,
isMultiAggregationTypesAllowed bool,
aggTypesOpts aggregation.TypesOptions,
matchOpts rules.MatchOptions,
) (rules.MatchResult, error) {
sw := m.metrics.matchLatency.Start()
defer sw.Stop()
return m.namespaces.ReverseMatch(id, fromNanos, toNanos, mt, at, isMultiAggregationTypesAllowed, aggTypesOpts)
return m.namespaces.ReverseMatch(
id,
fromNanos,
toNanos,
mt,
at,
isMultiAggregationTypesAllowed,
aggTypesOpts,
matchOpts,
)
}

func (m *noCacheMatcher) Close() error {
Expand Down
10 changes: 5 additions & 5 deletions src/metrics/matcher/matcher_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions src/metrics/matcher/namespaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,18 +202,29 @@ func (n *namespaces) ForwardMatch(id id.ID, fromNanos, toNanos int64,

func (n *namespaces) ReverseMatch(
id id.ID,
fromNanos, toNanos int64,
fromNanos int64,
toNanos int64,
mt metric.Type,
at aggregation.Type,
isMultiAggregationTypesAllowed bool,
aggTypesOpts aggregation.TypesOptions,
matchOpts rules.MatchOptions,
) (rules.MatchResult, error) {
namespace := n.nsResolver.Resolve(id)
ruleSet, exists := n.ruleSet(namespace)
if !exists {
return rules.EmptyMatchResult, nil
}
return ruleSet.ReverseMatch(id, fromNanos, toNanos, mt, at, isMultiAggregationTypesAllowed, aggTypesOpts)
return ruleSet.ReverseMatch(
id,
fromNanos,
toNanos,
mt,
at,
isMultiAggregationTypesAllowed,
aggTypesOpts,
matchOpts,
)
}

func (n *namespaces) ruleSet(namespace []byte) (RuleSet, bool) {
Expand Down
15 changes: 13 additions & 2 deletions src/metrics/matcher/ruleset.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,13 @@ func (r *ruleSet) ForwardMatch(id id.ID, fromNanos, toNanos int64, opts rules.Ma

func (r *ruleSet) ReverseMatch(
id id.ID,
fromNanos, toNanos int64,
fromNanos int64,
toNanos int64,
mt metric.Type,
at aggregation.Type,
isMultiAggregationTypesAllowed bool,
aggTypesOpts aggregation.TypesOptions,
matchOpts rules.MatchOptions,
) (rules.MatchResult, error) {
callStart := r.nowFn()
r.RLock()
Expand All @@ -196,7 +198,16 @@ func (r *ruleSet) ReverseMatch(
r.metrics.nilMatcher.Inc(1)
return rules.EmptyMatchResult, nil
}
res, err := r.activeSet.ReverseMatch(id, fromNanos, toNanos, mt, at, isMultiAggregationTypesAllowed, aggTypesOpts)
res, err := r.activeSet.ReverseMatch(
id,
fromNanos,
toNanos,
mt,
at,
isMultiAggregationTypesAllowed,
aggTypesOpts,
matchOpts,
)
r.RUnlock()
if err != nil {
return rules.MatchResult{}, err
Expand Down
136 changes: 118 additions & 18 deletions src/metrics/matcher/ruleset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,16 @@ func TestRuleSetReverseMatchWithMatcher(t *testing.T) {
aggTypesOpts = aggregation.NewTypesOptions()
)

res, err := rs.ReverseMatch(testID("foo"), fromNanos, toNanos, metric.CounterType, aggregation.Sum,
isMultiAggregationTypesAllowed, aggTypesOpts)
res, err := rs.ReverseMatch(
testID("foo"),
fromNanos,
toNanos,
metric.CounterType,
aggregation.Sum,
isMultiAggregationTypesAllowed,
aggTypesOpts,
rules.MatchOptions{},
)
require.NoError(t, err)
require.Equal(t, mockMatcher.res, res)
require.Equal(t, []byte("foo"), mockMatcher.id)
Expand Down Expand Up @@ -148,11 +156,41 @@ func TestToRuleSetSuccess(t *testing.T) {
func TestRuleSetProcessNamespaceNotRegistered(t *testing.T) {
var (
inputs = []rules.RuleSet{
&mockRuleSet{namespace: "ns1", version: 1, cutoverNanos: 1234, tombstoned: false, matcher: &mockMatcher{}},
&mockRuleSet{namespace: "ns2", version: 2, cutoverNanos: 1235, tombstoned: true, matcher: &mockMatcher{}},
&mockRuleSet{namespace: "ns3", version: 3, cutoverNanos: 1236, tombstoned: false, matcher: &mockMatcher{}},
&mockRuleSet{namespace: "ns4", version: 4, cutoverNanos: 1237, tombstoned: true, matcher: &mockMatcher{}},
&mockRuleSet{namespace: "ns5", version: 5, cutoverNanos: 1238, tombstoned: false, matcher: &mockMatcher{}},
&mockRuleSet{
namespace: "ns1",
version: 1,
cutoverNanos: 1234,
tombstoned: false,
matcher: &mockMatcher{},
},
&mockRuleSet{
namespace: "ns2",
version: 2,
cutoverNanos: 1235,
tombstoned: true,
matcher: &mockMatcher{},
},
&mockRuleSet{
namespace: "ns3",
version: 3,
cutoverNanos: 1236,
tombstoned: false,
matcher: &mockMatcher{},
},
&mockRuleSet{
namespace: "ns4",
version: 4,
cutoverNanos: 1237,
tombstoned: true,
matcher: &mockMatcher{},
},
&mockRuleSet{
namespace: "ns5",
version: 5,
cutoverNanos: 1238,
tombstoned: false,
matcher: &mockMatcher{},
},
}
)

Expand All @@ -173,11 +211,41 @@ func TestRuleSetProcessNamespaceNotRegistered(t *testing.T) {
func TestRuleSetProcessStaleUpdate(t *testing.T) {
var (
inputs = []rules.RuleSet{
&mockRuleSet{namespace: "ns1", version: 1, cutoverNanos: 1234, tombstoned: false, matcher: &mockMatcher{}},
&mockRuleSet{namespace: "ns2", version: 2, cutoverNanos: 1235, tombstoned: true, matcher: &mockMatcher{}},
&mockRuleSet{namespace: "ns3", version: 3, cutoverNanos: 1236, tombstoned: false, matcher: &mockMatcher{}},
&mockRuleSet{namespace: "ns4", version: 4, cutoverNanos: 1237, tombstoned: true, matcher: &mockMatcher{}},
&mockRuleSet{namespace: "ns5", version: 5, cutoverNanos: 1238, tombstoned: false, matcher: &mockMatcher{}},
&mockRuleSet{
namespace: "ns1",
version: 1,
cutoverNanos: 1234,
tombstoned: false,
matcher: &mockMatcher{},
},
&mockRuleSet{
namespace: "ns2",
version: 2,
cutoverNanos: 1235,
tombstoned: true,
matcher: &mockMatcher{},
},
&mockRuleSet{
namespace: "ns3",
version: 3,
cutoverNanos: 1236,
tombstoned: false,
matcher: &mockMatcher{},
},
&mockRuleSet{
namespace: "ns4",
version: 4,
cutoverNanos: 1237,
tombstoned: true,
matcher: &mockMatcher{},
},
&mockRuleSet{
namespace: "ns5",
version: 5,
cutoverNanos: 1238,
tombstoned: false,
matcher: &mockMatcher{},
},
}
)

Expand All @@ -203,11 +271,41 @@ func TestRuleSetProcessStaleUpdate(t *testing.T) {
func TestRuleSetProcessSuccess(t *testing.T) {
var (
inputs = []rules.RuleSet{
&mockRuleSet{namespace: "ns1", version: 1, cutoverNanos: 1234, tombstoned: false, matcher: &mockMatcher{}},
&mockRuleSet{namespace: "ns2", version: 2, cutoverNanos: 1235, tombstoned: true, matcher: &mockMatcher{}},
&mockRuleSet{namespace: "ns3", version: 3, cutoverNanos: 1236, tombstoned: false, matcher: &mockMatcher{}},
&mockRuleSet{namespace: "ns4", version: 4, cutoverNanos: 1237, tombstoned: true, matcher: &mockMatcher{}},
&mockRuleSet{namespace: "ns5", version: 5, cutoverNanos: 1238, tombstoned: false, matcher: &mockMatcher{}},
&mockRuleSet{
namespace: "ns1",
version: 1,
cutoverNanos: 1234,
tombstoned: false,
matcher: &mockMatcher{},
},
&mockRuleSet{
namespace: "ns2",
version: 2,
cutoverNanos: 1235,
tombstoned: true,
matcher: &mockMatcher{},
},
&mockRuleSet{
namespace: "ns3",
version: 3,
cutoverNanos: 1236,
tombstoned: false,
matcher: &mockMatcher{},
},
&mockRuleSet{
namespace: "ns4",
version: 4,
cutoverNanos: 1237,
tombstoned: true,
matcher: &mockMatcher{},
},
&mockRuleSet{
namespace: "ns5",
version: 5,
cutoverNanos: 1238,
tombstoned: false,
matcher: &mockMatcher{},
},
}
)

Expand Down Expand Up @@ -261,11 +359,13 @@ func (mm *mockMatcher) ForwardMatch(

func (mm *mockMatcher) ReverseMatch(
id id.ID,
fromNanos, toNanos int64,
fromNanos int64,
toNanos int64,
mt metric.Type,
at aggregation.Type,
isMultiAggregationTypesAllowed bool,
aggTypesOpts aggregation.TypesOptions,
matchOpts rules.MatchOptions,
) (rules.MatchResult, error) {
mm.id = id.Bytes()
mm.fromNanos = fromNanos
Expand Down
Loading

0 comments on commit 4feb762

Please sign in to comment.