Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Revert eager forwarding logic #148

Merged
merged 4 commits into from
Jul 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 65 additions & 17 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"errors"
"math"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -200,7 +201,7 @@ func (agg *aggregator) AddForwarded(
callEnd := agg.nowFn()
agg.metrics.addForwarded.ReportSuccess(callEnd.Sub(callStart))
forwardingDelay := time.Duration(callEnd.UnixNano() - metric.TimeNanos)
agg.metrics.addForwarded.latencies.RecordDuration(
agg.metrics.addForwarded.ReportForwardingLatency(
metadata.StoragePolicy.Resolution().Window,
metadata.NumForwardedTimes,
forwardingDelay,
Expand Down Expand Up @@ -661,46 +662,87 @@ func (m *aggregatorAddUntimedMetrics) ReportError(err error) {
m.aggregatorAddMetricMetrics.ReportError(err)
}

// latencyBucketKey identifies one forwarding-latency histogram: a separate
// histogram is kept per (storage-policy resolution, number of times the
// metric has been forwarded) pair.
type latencyBucketKey struct {
	resolution        time.Duration
	numForwardedTimes int
}

// aggregatorAddForwardedMetrics tracks metrics for AddForwarded calls. In
// addition to the shared add-metric metrics, it lazily maintains one
// forwarding-latency histogram per (resolution, numForwardedTimes) pair,
// guarded by the embedded RWMutex (see ReportForwardingLatency).
//
// NOTE(review): the stale removed-side diff line
// `latencies *ForwardingLatencyHistograms` has been dropped — this revert
// replaces it with the lazily-built forwardingLatency map, and nothing in the
// post-revert code path references it.
type aggregatorAddForwardedMetrics struct {
	sync.RWMutex
	aggregatorAddMetricMetrics

	// scope is retained so latency histograms can be created on demand with
	// per-key tags.
	scope tally.Scope
	// maxAllowedForwardingDelayFn determines the bucket range for each
	// newly-created latency histogram.
	maxAllowedForwardingDelayFn MaxAllowedForwardingDelayFn
	// forwardingLatency caches lazily-created histograms keyed by
	// (resolution, numForwardedTimes).
	forwardingLatency map[latencyBucketKey]tally.Histogram
}

// newAggregatorAddForwardedMetrics constructs the metrics bundle for
// AddForwarded calls. Forwarding-latency histograms are created on demand in
// ReportForwardingLatency, so only the empty cache map is allocated here.
//
// NOTE(review): the removed-side diff lines (latencyScope, bucketsFn, and the
// `latencies:` initializer referencing ForwardingLatencyHistograms) have been
// dropped — they belong to the code this revert deletes and duplicate the
// lazy-histogram initialization below.
func newAggregatorAddForwardedMetrics(
	scope tally.Scope,
	samplingRate float64,
	maxAllowedForwardingDelayFn MaxAllowedForwardingDelayFn,
) aggregatorAddForwardedMetrics {
	return aggregatorAddForwardedMetrics{
		aggregatorAddMetricMetrics:  newAggregatorAddMetricMetrics(scope, samplingRate),
		scope:                       scope,
		maxAllowedForwardingDelayFn: maxAllowedForwardingDelayFn,
		forwardingLatency:           make(map[latencyBucketKey]tally.Histogram),
	}
}

type tickMetricsForIncomingMetricType struct {
// ReportForwardingLatency records the observed forwarding delay for a metric
// at the given resolution and forwarding depth. One histogram is kept per
// (resolution, numForwardedTimes) pair and created lazily via double-checked
// locking: a read-locked lookup on the fast path, then a write-locked
// re-check before creating and caching a new histogram.
func (m *aggregatorAddForwardedMetrics) ReportForwardingLatency(
	resolution time.Duration,
	numForwardedTimes int,
	duration time.Duration,
) {
	key := latencyBucketKey{
		resolution:        resolution,
		numForwardedTimes: numForwardedTimes,
	}

	// Fast path: the histogram for this key already exists.
	m.RLock()
	histogram, found := m.forwardingLatency[key]
	m.RUnlock()

	if !found {
		m.Lock()
		// Re-check under the write lock in case another goroutine created
		// the histogram between our RUnlock and Lock.
		if histogram, found = m.forwardingLatency[key]; !found {
			// Buckets cover [0, scaleFactor × max allowed delay) linearly.
			maxDelay := m.maxAllowedForwardingDelayFn(resolution, numForwardedTimes)
			bucketSize := maxDelay * maxLatencyBucketLimitScaleFactor / time.Duration(numLatencyBuckets)
			buckets := tally.MustMakeLinearDurationBuckets(0, bucketSize, numLatencyBuckets)
			histogram = m.scope.Tagged(map[string]string{
				"bucket-version":      strconv.Itoa(latencyBucketVersion),
				"resolution":          resolution.String(),
				"num-forwarded-times": strconv.Itoa(numForwardedTimes),
			}).Histogram("forwarding-latency", buckets)
			m.forwardingLatency[key] = histogram
		}
		m.Unlock()
	}

	// Record outside the lock to keep the critical section minimal.
	histogram.RecordDuration(duration)
}

// tickMetricsForMetricCategory holds the tick metrics reported for a single
// metric category (standard or forwarded — see aggregatorTickMetrics).
type tickMetricsForMetricCategory struct {
	scope          tally.Scope
	activeEntries  tally.Gauge
	expiredEntries tally.Counter
	// activeElems holds one gauge per resolution; Report iterates the tick
	// result's activeElems, so entries are presumably created on demand
	// there — confirm against the full Report body (cut off in this view).
	activeElems map[time.Duration]tally.Gauge
}

func newTickMetricsForIncomingMetricType(scope tally.Scope) tickMetricsForIncomingMetricType {
return tickMetricsForIncomingMetricType{
// newTickMetricsForMetricCategory builds the per-category tick metrics rooted
// at the given scope, starting with an empty per-resolution gauge map.
func newTickMetricsForMetricCategory(scope tally.Scope) tickMetricsForMetricCategory {
	m := tickMetricsForMetricCategory{scope: scope}
	m.activeEntries = scope.Gauge("active-entries")
	m.expiredEntries = scope.Counter("expired-entries")
	m.activeElems = make(map[time.Duration]tally.Gauge)
	return m
}

func (m tickMetricsForIncomingMetricType) Report(tickResult tickResultForIncomingMetricType) {
func (m tickMetricsForMetricCategory) Report(tickResult tickResultForMetricCategory) {
m.activeEntries.Update(float64(tickResult.activeEntries))
m.expiredEntries.Inc(int64(tickResult.expiredEntries))
for dur, val := range tickResult.activeElems {
Expand All @@ -718,8 +760,8 @@ func (m tickMetricsForIncomingMetricType) Report(tickResult tickResultForIncomin
// aggregatorTickMetrics records metrics emitted during an aggregator tick:
// flush-times retrieval errors, the tick duration, and per-category
// (standard vs forwarded) entry/element counts.
//
// NOTE(review): the scraped diff interleaved both sides of the rename,
// leaving duplicate `standard`/`forwarded` fields typed as the old
// tickMetricsForIncomingMetricType — invalid Go. Only the added-side fields
// (tickMetricsForMetricCategory) are kept, matching newAggregatorTickMetrics.
type aggregatorTickMetrics struct {
	flushTimesErrors tally.Counter
	duration         tally.Timer
	standard         tickMetricsForMetricCategory
	forwarded        tickMetricsForMetricCategory
}

func newAggregatorTickMetrics(scope tally.Scope) aggregatorTickMetrics {
Expand All @@ -728,8 +770,8 @@ func newAggregatorTickMetrics(scope tally.Scope) aggregatorTickMetrics {
return aggregatorTickMetrics{
flushTimesErrors: scope.Counter("flush-times-errors"),
duration: scope.Timer("duration"),
standard: newTickMetricsForIncomingMetricType(standardScope),
forwarded: newTickMetricsForIncomingMetricType(forwardedScope),
standard: newTickMetricsForMetricCategory(standardScope),
forwarded: newTickMetricsForMetricCategory(forwardedScope),
}
}

Expand Down Expand Up @@ -850,3 +892,9 @@ const (
)

// sleepFn has the signature of time.Sleep; presumably declared as a named
// type so sleeping can be stubbed in tests — confirm against usages
// elsewhere in the file (not visible in this view).
type sleepFn func(d time.Duration)

const (
	// latencyBucketVersion is emitted as the "bucket-version" tag on
	// forwarding-latency histograms so queries can distinguish series
	// recorded with different bucket layouts.
	latencyBucketVersion = 2
	// numLatencyBuckets is the number of linear duration buckets in each
	// forwarding-latency histogram.
	numLatencyBuckets = 40
	// maxLatencyBucketLimitScaleFactor scales the maximum allowed forwarding
	// delay to obtain the upper limit covered by the histogram buckets.
	maxLatencyBucketLimitScaleFactor = 2
)
Loading