Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add reservoir timers #171

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions logging_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (s *loggingSink) FlushGauge(name string, value uint64) { s.log(name, "gauge

func (s *loggingSink) FlushTimer(name string, value float64) { s.log(name, "timer", value) }

// FlushAggregatedTimer logs value as a timer stat by delegating to FlushTimer.
// The third argument (the sample rate on the Sink interface) is ignored, so it
// never appears in the log output.
func (s *loggingSink) FlushAggregatedTimer(name string, value, _ float64) {
s.FlushTimer(name, value)
}

func (s *loggingSink) Flush() { s.log("", "all stats", 0) }

// Logger
Expand Down
6 changes: 6 additions & 0 deletions mock/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ func (s *Sink) FlushTimer(name string, val float64) {
atomic.AddInt64(&p.count, 1)
}

// FlushAggregatedTimer implements the stats.Sink.FlushAggregatedTimer method and adds val to
// stat name by delegating to FlushTimer. The sample-rate argument is ignored.
func (s *Sink) FlushAggregatedTimer(name string, val, _ float64) {
s.FlushTimer(name, val)
}

// LoadCounter returns the value for stat name and if it was found.
func (s *Sink) LoadCounter(name string) (uint64, bool) {
v, ok := s.counters().Load(name)
Expand Down
20 changes: 14 additions & 6 deletions net_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,23 @@ func (s *netSink) FlushGauge(name string, value uint64) {
}

func (s *netSink) FlushTimer(name string, value float64) {
// Since we mistakenly use floating point values to represent time
// durations this method is often passed an integer encoded as a
s.flushFloatOptimized(name, "|ms\n", value)
}

func (s *netSink) FlushAggregatedTimer(name string, value, sampleRate float64) {
suffix := fmt.Sprintf("|ms|@%.1f\n", sampleRate)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For performance reasons it would be great to avoid fmt.Sprintf here if possible.

s.flushFloatOptimized(name, suffix, value)
}

func (s *netSink) flushFloatOptimized(name, suffix string, value float64) {
// Since we historically used floating point values to represent time
// durations, metrics (particularly timers) are often recorded as an integer encoded as a
// float. Formatting integers is much faster (>2x) than formatting
// floats so use integer formatting whenever possible.
//
// floats, so we should convert to an integer whenever possible.
if 0 <= value && value < math.MaxUint64 && math.Trunc(value) == value {
s.flushUint64(name, "|ms\n", uint64(value))
s.flushUint64(name, suffix, uint64(value))
} else {
s.flushFloat64(name, "|ms\n", value)
s.flushFloat64(name, suffix, value)
}
}

Expand Down
6 changes: 6 additions & 0 deletions net_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ func (s *testStatSink) FlushTimer(name string, value float64) {
s.Unlock()
}

// FlushAggregatedTimer appends one "name:value|ms|@sampleRate" line to the
// sink's record while holding the sink's lock.
func (s *testStatSink) FlushAggregatedTimer(name string, value, sampleRate float64) {
	line := fmt.Sprintf("%s:%f|ms|@%f\n", name, value, sampleRate)

	s.Lock()
	defer s.Unlock()
	s.record += line
}

func TestCreateTimer(t *testing.T) {
sink := &testStatSink{}
store := NewStore(sink, true)
Expand Down
2 changes: 2 additions & 0 deletions null_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ func (s nullSink) FlushGauge(name string, value uint64) {} //nolint:revive

func (s nullSink) FlushTimer(name string, value float64) {} //nolint:revive

// FlushAggregatedTimer is a no-op: the empty body discards the value and sample rate.
func (s nullSink) FlushAggregatedTimer(name string, value, sampleRate float64) {} //nolint:revive

func (s nullSink) Flush() {}
11 changes: 11 additions & 0 deletions settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
DefaultFlushIntervalS = 5
// DefaultLoggingSinkDisabled is the default behavior of logging sink suppression, default is false.
DefaultLoggingSinkDisabled = false
DefaultTimerReservoirSize = 0
)

// The Settings type is used to configure gostats. gostats uses environment
Expand All @@ -38,6 +39,7 @@ type Settings struct {
// Disable the LoggingSink when USE_STATSD is false and use the NullSink instead.
// This will cause all stats to be silently dropped.
LoggingSinkDisabled bool `envconfig:"GOSTATS_LOGGING_SINK_DISABLED" default:"false"`
TimerReservoirSize int `envconfig:"GOSTATS_TIMER_RESERVOIR_SIZE" default:"0"`
}

// An envError is an error that occurred parsing an environment variable
Expand Down Expand Up @@ -101,17 +103,26 @@ func GetSettings() Settings {
if err != nil {
panic(err)
}
timerReservoirSize, err := envInt("GOSTATS_TIMER_RESERVOIR_SIZE", DefaultTimerReservoirSize)
if err != nil {
panic(err)
}
return Settings{
UseStatsd: useStatsd,
StatsdHost: envOr("STATSD_HOST", DefaultStatsdHost),
StatsdProtocol: envOr("STATSD_PROTOCOL", DefaultStatsdProtocol),
StatsdPort: statsdPort,
FlushIntervalS: flushIntervalS,
LoggingSinkDisabled: loggingSinkDisabled,
TimerReservoirSize: timerReservoirSize,
}
}

// FlushInterval converts the FlushIntervalS setting, which is expressed in
// seconds, into a time.Duration.
func (s *Settings) FlushInterval() time.Duration {
	seconds := time.Duration(s.FlushIntervalS)
	return seconds * time.Second
}

// isTimerReservoirEnabled reports whether a positive TimerReservoirSize has
// been configured.
func (s *Settings) isTimerReservoirEnabled() bool {
	// TimerReservoirSize is an int, so ">= 1" is the same test as "> 0".
	return s.TimerReservoirSize >= 1
}
1 change: 1 addition & 0 deletions sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type Sink interface {
FlushCounter(name string, value uint64)
FlushGauge(name string, value uint64)
FlushTimer(name string, value float64)
FlushAggregatedTimer(name string, value, sampleRate float64)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of this we'll need a new major release.

The library usage should be backwards compatible, and additionally there is a feature flag to control the new behaviour, however if anything is implementing this interface it'll break.

}

// FlushableSink is an extension of Sink that provides a Flush() function that
Expand Down
134 changes: 123 additions & 11 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,30 +298,100 @@ func (c *gauge) Value() uint64 {
return atomic.LoadUint64(&c.value)
}

type timer struct {
// timer is the behavior shared by the timer implementations in this file
// (standardTimer and reservoirTimer).
type timer interface {
// time records dur; both implementations forward it to AddDuration.
time(time.Duration)
// AddDuration records a duration; both implementations divide it by their
// base unit and pass the result to AddValue.
AddDuration(time.Duration)
// AddValue records a raw value. standardTimer flushes it to its sink
// immediately; reservoirTimer stores it until it is collected.
AddValue(float64)
// AllocateSpan returns a Timespan whose start is the current time.
AllocateSpan() Timespan
// CollectedValue returns the values held by the timer: nil for
// standardTimer, a copy of the stored values for reservoirTimer.
CollectedValue() []float64
// SampleRate returns the rate that is reported alongside collected values.
SampleRate() float64
}

// standardTimer is the timer implementation that forwards every recorded
// value straight to its sink instead of storing it.
type standardTimer struct {
base time.Duration // unit that durations are divided by in AddDuration
name string // stat name passed to sink.FlushTimer
sink Sink // destination of every value passed to AddValue
}

func (t *timer) time(dur time.Duration) {
// time records dur by forwarding it to AddDuration.
func (t *standardTimer) time(dur time.Duration) {
t.AddDuration(dur)
}

func (t *timer) AddDuration(dur time.Duration) {
// AddDuration expresses dur in units of the timer's base and records the
// result through AddValue.
func (t *standardTimer) AddDuration(dur time.Duration) {
	// Duration division is integer division, so any remainder smaller than
	// one base unit is dropped before the conversion to float64.
	units := dur / t.base
	t.AddValue(float64(units))
}

func (t *timer) AddValue(value float64) {
// AddValue passes value to the sink's FlushTimer under the timer's name;
// nothing is stored on the timer itself.
func (t *standardTimer) AddValue(value float64) {
t.sink.FlushTimer(t.name, value)
}

func (t *timer) AllocateSpan() Timespan {
// AllocateSpan returns a new timespan bound to this timer whose start is the
// current time.
func (t *standardTimer) AllocateSpan() Timespan {
return &timespan{timer: t, start: time.Now()}
}

// CollectedValue always returns nil: a standardTimer hands each value to its
// sink in AddValue and keeps nothing to collect.
func (t *standardTimer) CollectedValue() []float64 {
return nil
}

// SampleRate always returns 0.0 for a standardTimer.
func (t *standardTimer) SampleRate() float64 {
return 0.0 // todo: using zero value of float64. the correct value would be 1.0 given 1 stat, however that 1 stat is never stored, just flushed right away
}

type reservoirTimer struct {
base time.Duration
name string
capacity int
values []float64
fill int // todo: the only purpose of this is to be faster than calculating len(values), is it worht it?
count int
mu sync.Mutex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit, make the Mutex the first field (it's advantageous performance wise to make the most accessed field first)

}

// time records dur by forwarding it to AddDuration.
func (t *reservoirTimer) time(dur time.Duration) {
t.AddDuration(dur)
}

// AddDuration expresses dur in units of the timer's base and records the
// result through AddValue.
func (t *reservoirTimer) AddDuration(dur time.Duration) {
	// Duration division is integer division, so any remainder smaller than
	// one base unit is dropped before the conversion to float64.
	units := dur / t.base
	t.AddValue(float64(units))
}

// AddValue records value in the reservoir and counts the observation.
//
// While fewer than capacity values are held, value is appended. Once the
// reservoir is full, the oldest value is dropped to make room for the new one.
// count is incremented on every call, including calls whose value cannot be
// retained. A non-positive capacity retains nothing instead of panicking on
// an out-of-range slice expression.
func (t *reservoirTimer) AddValue(value float64) {
	t.mu.Lock()
	defer t.mu.Unlock()

	t.count++

	switch {
	case t.capacity <= 0:
		// Nothing can be retained; values[1:] on an empty slice would panic.
		return
	case t.fill < t.capacity:
		t.values = append(t.values, value)
		t.fill++
	default:
		// todo: discarding the oldest value when the reservoir is full, this can probably be smarter
		// fill is left unchanged: one value leaves and one value enters.
		t.values = append(t.values[1:], value)
	}
}

// AllocateSpan returns a new timespan bound to this timer whose start is the
// current time.
func (t *reservoirTimer) AllocateSpan() Timespan {
return &timespan{timer: t, start: time.Now()}
}

// CollectedValue returns a snapshot of the values currently held by the
// reservoir. The snapshot is a freshly allocated slice taken under the lock,
// so the caller can read it without racing against later AddValue calls.
func (t *reservoirTimer) CollectedValue() []float64 {
	t.mu.Lock()
	snapshot := append(make([]float64, 0, len(t.values)), t.values...)
	t.mu.Unlock()

	return snapshot
}

// SampleRate returns the fraction of observed values that the reservoir
// currently retains, i.e. fill divided by count.
//
// fill and count are read under the same mutex that guards them in AddValue,
// so a concurrent AddValue cannot race with this read. When nothing has been
// observed yet (count == 0) it returns 1.0 rather than the NaN that 0/0 would
// produce; no values exist to be flushed with that rate in that state.
func (t *reservoirTimer) SampleRate() float64 {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.count == 0 {
		return 1.0
	}
	return float64(t.fill) / float64(t.count) // todo: is it faster to store these values as float64 instead of converting here
}

type timespan struct {
timer *timer
timer timer
start time.Time
}

Expand All @@ -338,7 +408,7 @@ func (ts *timespan) CompleteWithDuration(value time.Duration) {
type statStore struct {
counters sync.Map
gauges sync.Map
timers sync.Map
timers sync.Map // todo: should be control this count, especially for reservoirs we will be storing a lot of these in memory before flushing them

mu sync.RWMutex
statGenerators []StatGenerator
Expand Down Expand Up @@ -393,6 +463,23 @@ func (s *statStore) Flush() {
return true
})

settings := GetSettings() // todo: move this to some shared memory
// todo: I'm not sure whether we need this condition or whether it can be assumed implicitly; to my understanding s.timers
// will retain/store data even if it's unused in the case of standardTimer. In any case this should provide some optimization.
if settings.isTimerReservoirEnabled() {
s.timers.Range(func(key, v interface{}) bool {
timer := v.(timer)
sampleRate := timer.SampleRate()
// CollectedValue() should be nil unless reservoirTimer
for _, value := range timer.CollectedValue() {
s.sink.FlushAggregatedTimer(key.(string), value, sampleRate)
}

s.timers.Delete(key) // todo: not sure if this cleanup is necessary
return true
})
}

flushableSink, ok := s.sink.(FlushableSink)
if ok {
flushableSink.Flush()
Expand Down Expand Up @@ -490,14 +577,39 @@ func (s *statStore) NewPerInstanceGauge(name string, tags map[string]string) Gau
return s.newGaugeWithTagSet(name, tagspkg.TagSet(nil).MergePerInstanceTags(tags))
}

func (s *statStore) newTimer(serializedName string, base time.Duration) *timer {
func (s *statStore) newTimer(serializedName string, base time.Duration) timer {
if v, ok := s.timers.Load(serializedName); ok {
return v.(*timer)
return v.(timer)
}

var t timer
settings := GetSettings() // todo: move this to some shared memory
if settings.isTimerReservoirEnabled() {
// todo: if s.timers gets to a certain size, we can flush all timers and delete them from the map
// todo: no idea how memory was managed here before did we just expect the map of s.timers to just be replaced after it's filled?

// todo: have defaults defined in a shared location
t = &reservoirTimer{
name: serializedName,
base: base,
capacity: 100,
values: make([]float64, 0, 100),
fill: 0,
count: 0,
}
} else {
t = &standardTimer{
name: serializedName,
sink: s.sink,
base: base,
}
}
t := &timer{name: serializedName, sink: s.sink, base: base}

// todo: why would the timer ever be replaced, will this hurt reservoirs or benefit them? or is it just redundant since we load above?
if v, loaded := s.timers.LoadOrStore(serializedName, t); loaded {
return v.(*timer)
return v.(timer)
}

return t
}

Expand Down
Loading
Loading