Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add reservoir timers #171

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions logging_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (s *loggingSink) FlushGauge(name string, value uint64) { s.log(name, "gauge

func (s *loggingSink) FlushTimer(name string, value float64) { s.log(name, "timer", value) }

func (s *loggingSink) FlushAggregatedTimer(name string, value, _ float64) {
s.FlushTimer(name, value)
}

func (s *loggingSink) Flush() { s.log("", "all stats", 0) }

// Logger
Expand Down
6 changes: 6 additions & 0 deletions mock/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ func (s *Sink) FlushTimer(name string, val float64) {
atomic.AddInt64(&p.count, 1)
}

// FlushAggregatedTimer implements the stats.Sink.FlushAggregatedTimer method and adds val to
// stat name.
func (s *Sink) FlushAggregatedTimer(name string, val, _ float64) {
s.FlushTimer(name, val)
}

// LoadCounter returns the value for stat name and if it was found.
func (s *Sink) LoadCounter(name string) (uint64, bool) {
v, ok := s.counters().Load(name)
Expand Down
21 changes: 15 additions & 6 deletions net_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,24 @@ func (s *netSink) FlushGauge(name string, value uint64) {
}

func (s *netSink) FlushTimer(name string, value float64) {
// Since we mistakenly use floating point values to represent time
// durations this method is often passed an integer encoded as a
s.flushFloatOptimized(name, "|ms\n", value)
}

func (s *netSink) FlushAggregatedTimer(name string, value, sampleRate float64) {
// todo: this can be further optimized by strconv.AppendFloat directly to the buffer in flush(Uint|Float)64 however we would need more conditions or code duplication
suffix := "|ms|@" + strconv.FormatFloat(sampleRate, 'f', 2, 64) + "\n" // todo: deteremine how many decimal places we need
s.flushFloatOptimized(name, suffix, value)
}

func (s *netSink) flushFloatOptimized(name, suffix string, value float64) {
// Since we historitically used floating point values to represent time
// durations, metrics (particularly timers) are often recorded as an integer encoded as a
// float. Formatting integers is much faster (>2x) than formatting
// floats so use integer formatting whenever possible.
//
// floats, so we should convert to an integer whenever possible.
if 0 <= value && value < math.MaxUint64 && math.Trunc(value) == value {
s.flushUint64(name, "|ms\n", uint64(value))
s.flushUint64(name, suffix, uint64(value))
} else {
s.flushFloat64(name, "|ms\n", value)
s.flushFloat64(name, suffix, value)
}
}

Expand Down
6 changes: 6 additions & 0 deletions net_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ func (s *testStatSink) FlushTimer(name string, value float64) {
s.Unlock()
}

func (s *testStatSink) FlushAggregatedTimer(name string, value, sampleRate float64) {
s.Lock()
s.record += fmt.Sprintf("%s:%f|ms|@%f\n", name, value, sampleRate)
s.Unlock()
}

func TestCreateTimer(t *testing.T) {
sink := &testStatSink{}
store := NewStore(sink, true)
Expand Down
8 changes: 8 additions & 0 deletions net_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,14 @@ func (s *netTestSink) String() string {
return str
}

func (s *netTestSink) Pull() string {
s.mu.Lock()
str := s.buf.String()
s.buf.Reset()
s.mu.Unlock()
return str
}

func (s *netTestSink) Host(t testing.TB) string {
t.Helper()
host, _, err := net.SplitHostPort(s.conn.Address().String())
Expand Down
2 changes: 2 additions & 0 deletions null_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ func (s nullSink) FlushGauge(name string, value uint64) {} //nolint:revive

func (s nullSink) FlushTimer(name string, value float64) {} //nolint:revive

func (s nullSink) FlushAggregatedTimer(name string, value, sampleRate float64) {} //nolint:revive

func (s nullSink) Flush() {}
11 changes: 11 additions & 0 deletions settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
DefaultFlushIntervalS = 5
// DefaultLoggingSinkDisabled is the default behavior of logging sink suppression, default is false.
DefaultLoggingSinkDisabled = false
DefaultTimerReservoirSize = 0
)

// The Settings type is used to configure gostats. gostats uses environment
Expand All @@ -38,6 +39,7 @@ type Settings struct {
// Disable the LoggingSink when USE_STATSD is false and use the NullSink instead.
// This will cause all stats to be silently dropped.
LoggingSinkDisabled bool `envconfig:"GOSTATS_LOGGING_SINK_DISABLED" default:"false"`
TimerReservoirSize int `envconfig:"GOSTATS_TIMER_RESERVOIR_SIZE" default:"0"`
}

// An envError is an error that occurred parsing an environment variable
Expand Down Expand Up @@ -101,17 +103,26 @@ func GetSettings() Settings {
if err != nil {
panic(err)
}
timerReservoirSize, err := envInt("GOSTATS_TIMER_RESERVOIR_SIZE", DefaultTimerReservoirSize)
if err != nil {
panic(err)
}
return Settings{
UseStatsd: useStatsd,
StatsdHost: envOr("STATSD_HOST", DefaultStatsdHost),
StatsdProtocol: envOr("STATSD_PROTOCOL", DefaultStatsdProtocol),
StatsdPort: statsdPort,
FlushIntervalS: flushIntervalS,
LoggingSinkDisabled: loggingSinkDisabled,
TimerReservoirSize: timerReservoirSize,
}
}

// FlushInterval returns the flush interval duration.
func (s *Settings) FlushInterval() time.Duration {
return time.Duration(s.FlushIntervalS) * time.Second
}

func (s *Settings) isTimerReservoirEnabled() bool {
return s.TimerReservoirSize > 0
}
1 change: 1 addition & 0 deletions sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type Sink interface {
FlushCounter(name string, value uint64)
FlushGauge(name string, value uint64)
FlushTimer(name string, value float64)
FlushAggregatedTimer(name string, value, sampleRate float64)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of this we'll need a new major release.

The library usage should be backwards compatible, and additionally there is a feature flag to control the new behaviour, however if anything is implementing this interface it'll break.

}

// FlushableSink is an extension of Sink that provides a Flush() function that
Expand Down
164 changes: 153 additions & 11 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stats

import (
"context"
"math/bits"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -214,7 +215,10 @@ type StatGenerator interface {
// NewStore returns an Empty store that flushes to Sink passed as an argument.
// Note: the export argument is unused.
func NewStore(sink Sink, _ bool) Store {
return &statStore{sink: sink}
return &statStore{
sink: sink,
conf: GetSettings(), // todo: right now the environment is being loaded in multiple places and can be made more efficient by computing it once and storing for subsequent gets
}
}

// NewDefaultStore returns a Store with a TCP statsd sink, and a running flush timer.
Expand Down Expand Up @@ -298,30 +302,130 @@ func (c *gauge) Value() uint64 {
return atomic.LoadUint64(&c.value)
}

type timer struct {
type timer interface {
time(time.Duration)
AddDuration(time.Duration)
AddValue(float64)
AllocateSpan() Timespan
GetValue(int) float64
ValueCount() int
SampleRate() float64
Reset()
}

type standardTimer struct {
base time.Duration
name string
sink Sink
}

func (t *timer) time(dur time.Duration) {
func (t *standardTimer) time(dur time.Duration) {
t.AddDuration(dur)
}

func (t *timer) AddDuration(dur time.Duration) {
func (t *standardTimer) AddDuration(dur time.Duration) {
t.AddValue(float64(dur / t.base))
}

func (t *timer) AddValue(value float64) {
func (t *standardTimer) AddValue(value float64) {
t.sink.FlushTimer(t.name, value)
}

func (t *timer) AllocateSpan() Timespan {
func (t *standardTimer) AllocateSpan() Timespan {
return &timespan{timer: t, start: time.Now()}
}

func (t *standardTimer) GetValue(_ int) float64 {
return 0.0 // since we flush right away nothing will be collected
}

func (t *standardTimer) ValueCount() int {
return 0 // since we flush right away nothing will be collected
}

func (t *standardTimer) SampleRate() float64 {
return 1.0 // metrics which are not sampled have an implicit sample rate 1.0
}

// nothing to persisted in memroy for this timer
func (t *standardTimer) Reset() {}

type reservoirTimer struct {
mu sync.Mutex
base time.Duration
name string
ringSize int
ringMask int
values []float64
count int
overflow int
}

func (t *reservoirTimer) time(dur time.Duration) {
t.AddDuration(dur)
}

func (t *reservoirTimer) AddDuration(dur time.Duration) {
t.AddValue(float64(dur / t.base))
}

func (t *reservoirTimer) AddValue(value float64) {
t.mu.Lock()
defer t.mu.Unlock()

t.values[t.overflow&t.ringMask] = value
t.overflow++

// todo: can i optimize this with xor?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need to reset the overflow since you're masking (taking only the lower bits is equivalent to a modulo operation). Also using an unsigned int here would be a bit safer / clearer (makes the value always positive and unsigned overflow is well defined).

if t.overflow == t.ringSize {
t.overflow = 0
}

t.count++
}

func (t *reservoirTimer) AllocateSpan() Timespan {
return &timespan{timer: t, start: time.Now()}
}

func (t *reservoirTimer) GetValue(index int) float64 {
t.mu.Lock()
defer t.mu.Unlock()

return t.values[index]
}

func (t *reservoirTimer) ValueCount() int {
t.mu.Lock() // todo: could probably convert locks like this to atomic.LoadUint64
defer t.mu.Unlock()

if t.count > t.ringSize {
return t.ringSize
}
return t.count
}

func (t *reservoirTimer) SampleRate() float64 {
t.mu.Lock()
defer t.mu.Unlock()

// todo: a 0 count should probably not be a 1.0 sample rate
if t.count <= t.ringSize {
return 1.0
}
return float64(t.ringSize) / float64(t.count) // todo: is it worth it to use t.ringSize instead of computing len of values worth it?
}

func (t *reservoirTimer) Reset() {
t.mu.Lock()
defer t.mu.Unlock()

t.count = 0 // this will imply a 0.0 sample rate until it's increased
t.overflow = 0
}

type timespan struct {
timer *timer
timer timer
start time.Time
}

Expand All @@ -336,6 +440,7 @@ func (ts *timespan) CompleteWithDuration(value time.Duration) {
}

type statStore struct {
// these maps may grow indefinitely however slots in this maps are reused as stats names are stable over the lifetime of the process
counters sync.Map
gauges sync.Map
timers sync.Map
Expand All @@ -344,6 +449,8 @@ type statStore struct {
statGenerators []StatGenerator

sink Sink

conf Settings
}

var ReservedTagWords = map[string]bool{"asg": true, "az": true, "backend": true, "canary": true, "host": true, "period": true, "region": true, "shard": true, "window": true, "source": true, "project": true, "facet": true, "envoyservice": true}
Expand Down Expand Up @@ -393,6 +500,21 @@ func (s *statStore) Flush() {
return true
})

s.timers.Range(func(key, v interface{}) bool {
if timer, ok := v.(*reservoirTimer); ok {
sampleRate := timer.SampleRate()

// since the map memory is reused only process what we accumulated in the current processing itteration
for i := 0; i < timer.ValueCount(); i++ {
s.sink.FlushAggregatedTimer(key.(string), timer.GetValue(i), sampleRate)
}

timer.Reset()
}

return true
})

flushableSink, ok := s.sink.(FlushableSink)
if ok {
flushableSink.Flush()
Expand Down Expand Up @@ -490,14 +612,34 @@ func (s *statStore) NewPerInstanceGauge(name string, tags map[string]string) Gau
return s.newGaugeWithTagSet(name, tagspkg.TagSet(nil).MergePerInstanceTags(tags))
}

func (s *statStore) newTimer(serializedName string, base time.Duration) *timer {
func (s *statStore) newTimer(serializedName string, base time.Duration) timer {
if v, ok := s.timers.Load(serializedName); ok {
return v.(*timer)
return v.(timer)
}
t := &timer{name: serializedName, sink: s.sink, base: base}

var t timer
if s.conf.isTimerReservoirEnabled() {
capacity := s.conf.TimerReservoirSize
capacityRoundedToTheNextPowerOfTwo := 1 << bits.Len(uint(capacity))
t = &reservoirTimer{
name: serializedName,
base: base,
ringSize: capacity,
ringMask: capacityRoundedToTheNextPowerOfTwo - 1,
values: make([]float64, capacity),
}
} else {
t = &standardTimer{
name: serializedName,
sink: s.sink,
base: base,
}
}

if v, loaded := s.timers.LoadOrStore(serializedName, t); loaded {
return v.(*timer)
return v.(timer)
}

return t
}

Expand Down
Loading
Loading