Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add reservoir timers #171

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
72bc838
prototype a simple reservoir for timers
maciuszek Jan 8, 2025
f67aca4
clarify comment
maciuszek Jan 8, 2025
5cc34d9
Move s.timerCount inside timer.Range
bshramin Jan 8, 2025
f7fc55e
fix test messages
maciuszek Jan 8, 2025
4a0662a
fix spelling
maciuszek Jan 8, 2025
f35471d
Merge remote-tracking branch 'origin/master' into mattkuzminski/add-t…
maciuszek Jan 10, 2025
57ef42b
re-design with timer reservoirs correctly independent per mertic
maciuszek Jan 10, 2025
4610f55
add some more todos
maciuszek Jan 10, 2025
cc908b5
clean up redundant code
maciuszek Jan 10, 2025
b1a2def
some more clean up
maciuszek Jan 11, 2025
8eb942d
address todos
maciuszek Jan 13, 2025
5dd8757
fix comment
maciuszek Jan 13, 2025
0d3fb45
ensure memory and flush management for timers
maciuszek Jan 13, 2025
ea5ae6a
optimize reservoirTimer by utilizing a ring buffer
maciuszek Jan 14, 2025
e81d603
correct how we flush reusable timer entries
maciuszek Jan 15, 2025
74a26a1
add test for reused timer map after flushing
maciuszek Jan 15, 2025
6d2687c
correct the ring buffer implementation to utilize bitwise benefits
maciuszek Jan 15, 2025
d067744
improve reservoirTimer property access
maciuszek Jan 15, 2025
7e5a451
make reservoir tests more dynamic
maciuszek Jan 16, 2025
a54db1a
improve comments
maciuszek Jan 16, 2025
18c0e57
optimize reservoir timer flush
maciuszek Jan 16, 2025
bf0ef63
block never flush edge cases when stores are constructed outside of N…
maciuszek Jan 16, 2025
8dad5ed
fix typo in comment
maciuszek Jan 16, 2025
9762152
add test for reservoir automatic flushing
maciuszek Jan 16, 2025
2641924
add test for concurrent reservoir writes and flushing
maciuszek Jan 16, 2025
858a3fd
fix typo in comment
maciuszek Jan 16, 2025
4e9611d
protect writes while flushing
maciuszek Jan 16, 2025
70cc61c
dont export controls that can result in a deadlock or datarace
maciuszek Jan 16, 2025
b352a4f
add critical optimization todo
maciuszek Jan 16, 2025
ef8cf0b
simplify reservoir processing
maciuszek Jan 17, 2025
8cceead
unexport RingSize and document immutability
maciuszek Jan 17, 2025
4cae9ac
print to stdout for testing
maciuszek Jan 17, 2025
7fe2893
improve test logging
maciuszek Jan 17, 2025
c2738fa
temporarily make logging a bit better at the sacrifice of performance
maciuszek Jan 20, 2025
236e2cc
remove test logging
maciuszek Jan 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions logging_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (s *loggingSink) FlushGauge(name string, value uint64) { s.log(name, "gauge

func (s *loggingSink) FlushTimer(name string, value float64) { s.log(name, "timer", value) }

// FlushAggregatedTimer logs a sampled timer value exactly as FlushTimer does.
// The sample rate argument is discarded, so it is not part of what gets logged.
func (s *loggingSink) FlushAggregatedTimer(name string, value, _ float64) {
s.FlushTimer(name, value)
}

func (s *loggingSink) Flush() { s.log("", "all stats", 0) }

// Logger
Expand Down
6 changes: 6 additions & 0 deletions mock/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ func (s *Sink) FlushTimer(name string, val float64) {
atomic.AddInt64(&p.count, 1)
}

// FlushAggregatedTimer implements the stats.Sink.FlushAggregatedTimer method.
// It forwards name and val to FlushTimer; the sample rate argument is ignored,
// so sampled and unsampled timer values are recorded the same way by the mock.
func (s *Sink) FlushAggregatedTimer(name string, val, _ float64) {
s.FlushTimer(name, val)
}

// LoadCounter returns the value for stat name and if it was found.
func (s *Sink) LoadCounter(name string) (uint64, bool) {
v, ok := s.counters().Load(name)
Expand Down
21 changes: 15 additions & 6 deletions net_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,24 @@ func (s *netSink) FlushGauge(name string, value uint64) {
}

// FlushTimer hands a timer value to flushFloatOptimized with the "|ms\n"
// suffix and no sample rate.
func (s *netSink) FlushTimer(name string, value float64) {
// The choice between integer and float formatting is made inside
// flushFloatOptimized.
s.flushFloatOptimized(name, "|ms\n", value)
}

// FlushAggregatedTimer hands a sampled timer value to flushFloatOptimized,
// using the suffix "|ms|@<sampleRate>\n".
//
// The sample rate is written with the shortest decimal representation that
// round-trips. A fixed two-decimal format would print every rate below 0.005
// as "0.00", which reports the samples as representing nothing.
func (s *netSink) FlushAggregatedTimer(name string, value, sampleRate float64) {
	// todo: this can be further optimized by strconv.AppendFloat directly to the buffer in flush(Uint|Float)64 however we would need more conditions or code duplication
	suffix := "|ms|@" + strconv.FormatFloat(sampleRate, 'f', -1, 64) + "\n"
	s.flushFloatOptimized(name, suffix, value)
}

// flushFloatOptimized writes value for name followed by suffix, choosing the
// cheaper integer formatter when value is a non-negative whole number that
// fits in a uint64, and the float formatter otherwise. Exactly one of
// flushUint64 / flushFloat64 is called per invocation.
func (s *netSink) flushFloatOptimized(name, suffix string, value float64) {
	// Since we historically used floating point values to represent time
	// durations, metrics (particularly timers) are often recorded as an
	// integer encoded as a float. Formatting integers is much faster (>2x)
	// than formatting floats, so we should convert to an integer whenever
	// possible.
	if 0 <= value && value < math.MaxUint64 && math.Trunc(value) == value {
		s.flushUint64(name, suffix, uint64(value))
		return
	}
	s.flushFloat64(name, suffix, value)
}

Expand Down
6 changes: 6 additions & 0 deletions net_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ func (s *testStatSink) FlushTimer(name string, value float64) {
s.Unlock()
}

// FlushAggregatedTimer appends one "name:value|ms|@rate" line to the sink's
// record while holding the sink's lock.
func (s *testStatSink) FlushAggregatedTimer(name string, value, sampleRate float64) {
	line := fmt.Sprintf("%s:%f|ms|@%f\n", name, value, sampleRate)

	s.Lock()
	defer s.Unlock()
	s.record += line
}

func TestCreateTimer(t *testing.T) {
sink := &testStatSink{}
store := NewStore(sink, true)
Expand Down
8 changes: 8 additions & 0 deletions net_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,14 @@ func (s *netTestSink) String() string {
return str
}

// Pull returns everything currently held in the sink's buffer and empties the
// buffer, both under the sink's mutex.
func (s *netTestSink) Pull() string {
	s.mu.Lock()
	defer s.mu.Unlock()

	contents := s.buf.String()
	s.buf.Reset()
	return contents
}

func (s *netTestSink) Host(t testing.TB) string {
t.Helper()
host, _, err := net.SplitHostPort(s.conn.Address().String())
Expand Down
2 changes: 2 additions & 0 deletions null_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ func (s nullSink) FlushGauge(name string, value uint64) {} //nolint:revive

// FlushTimer discards the timer value; the body is intentionally empty.
func (s nullSink) FlushTimer(name string, value float64) {} //nolint:revive

// FlushAggregatedTimer discards the timer value and its sample rate; the body
// is intentionally empty.
func (s nullSink) FlushAggregatedTimer(name string, value, sampleRate float64) {} //nolint:revive

// Flush does nothing.
func (s nullSink) Flush() {}
11 changes: 11 additions & 0 deletions settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
DefaultFlushIntervalS = 5
// DefaultLoggingSinkDisabled is the default behavior of logging sink suppression, default is false.
DefaultLoggingSinkDisabled = false
DefaultTimerReservoirSize = 0
)

// The Settings type is used to configure gostats. gostats uses environment
Expand All @@ -38,6 +39,7 @@ type Settings struct {
// Disable the LoggingSink when USE_STATSD is false and use the NullSink instead.
// This will cause all stats to be silently dropped.
LoggingSinkDisabled bool `envconfig:"GOSTATS_LOGGING_SINK_DISABLED" default:"false"`
TimerReservoirSize int `envconfig:"GOSTATS_TIMER_RESERVOIR_SIZE" default:"0"`
}

// An envError is an error that occurred parsing an environment variable
Expand Down Expand Up @@ -101,17 +103,26 @@ func GetSettings() Settings {
if err != nil {
panic(err)
}
timerReservoirSize, err := envInt("GOSTATS_TIMER_RESERVOIR_SIZE", DefaultTimerReservoirSize)
if err != nil {
panic(err)
}
return Settings{
UseStatsd: useStatsd,
StatsdHost: envOr("STATSD_HOST", DefaultStatsdHost),
StatsdProtocol: envOr("STATSD_PROTOCOL", DefaultStatsdProtocol),
StatsdPort: statsdPort,
FlushIntervalS: flushIntervalS,
LoggingSinkDisabled: loggingSinkDisabled,
TimerReservoirSize: timerReservoirSize,
}
}

// FlushInterval converts the FlushIntervalS setting, a number of seconds,
// into a time.Duration.
func (s *Settings) FlushInterval() time.Duration {
	seconds := time.Duration(s.FlushIntervalS)
	return seconds * time.Second
}

// isTimerReservoirEnabled reports whether TimerReservoirSize is positive.
// Zero (the default) and negative sizes both count as disabled.
func (s *Settings) isTimerReservoirEnabled() bool {
return s.TimerReservoirSize > 0
}
1 change: 1 addition & 0 deletions sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type Sink interface {
FlushCounter(name string, value uint64)
FlushGauge(name string, value uint64)
FlushTimer(name string, value float64)
FlushAggregatedTimer(name string, value, sampleRate float64)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of this we'll need a new major release.

The library usage should be backwards compatible, and additionally there is a feature flag to control the new behaviour, however if anything is implementing this interface it'll break.

}

// FlushableSink is an extension of Sink that provides a Flush() function that
Expand Down
164 changes: 153 additions & 11 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stats

import (
"context"
"math/bits"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -214,7 +215,10 @@ type StatGenerator interface {
// NewStore returns an empty Store that flushes to the Sink passed as an
// argument. The store's configuration is read via GetSettings at construction
// time.
// Note: the export argument is unused.
func NewStore(sink Sink, _ bool) Store {
	return &statStore{
		sink: sink,
		conf: GetSettings(), // todo: right now the environment is being loaded in multiple places and can be made more efficient by computing it once and storing for subsequent gets
	}
}

// NewDefaultStore returns a Store with a TCP statsd sink, and a running flush timer.
Expand Down Expand Up @@ -298,30 +302,130 @@ func (c *gauge) Value() uint64 {
return atomic.LoadUint64(&c.value)
}

type timer struct {
type timer interface {
time(time.Duration)
AddDuration(time.Duration)
AddValue(float64)
AllocateSpan() Timespan
GetValue(int) float64
ValueCount() int
SampleRate() float64
Reset()
}

// standardTimer is the timer implementation that forwards every value to its
// sink immediately instead of buffering it.
type standardTimer struct {
// base is the unit durations are divided by in AddDuration.
base time.Duration
// name is the stat name passed to the sink on every flush.
name string
// sink receives each value through FlushTimer.
sink Sink
}

func (t *timer) time(dur time.Duration) {
func (t *standardTimer) time(dur time.Duration) {
t.AddDuration(dur)
}

func (t *timer) AddDuration(dur time.Duration) {
func (t *standardTimer) AddDuration(dur time.Duration) {
t.AddValue(float64(dur / t.base))
}

func (t *timer) AddValue(value float64) {
func (t *standardTimer) AddValue(value float64) {
t.sink.FlushTimer(t.name, value)
}

func (t *timer) AllocateSpan() Timespan {
func (t *standardTimer) AllocateSpan() Timespan {
return &timespan{timer: t, start: time.Now()}
}

// GetValue always returns 0.0, whatever the index: a standardTimer buffers
// nothing.
func (t *standardTimer) GetValue(_ int) float64 {
return 0.0 // since we flush right away nothing will be collected
}

// ValueCount always returns 0: a standardTimer buffers nothing.
func (t *standardTimer) ValueCount() int {
return 0 // since we flush right away nothing will be collected
}

// SampleRate always returns 1.0 because every value is forwarded to the sink.
func (t *standardTimer) SampleRate() float64 {
return 1.0 // metrics which are not sampled have an implicit sample rate 1.0
}

// Reset is a no-op: nothing is persisted in memory for this timer.
func (t *standardTimer) Reset() {}

// reservoirTimer is the timer implementation that keeps recorded values in a
// fixed-size buffer until they are read back with GetValue and cleared with
// Reset.
type reservoirTimer struct {
// mu is held by AddValue, GetValue, ValueCount, SampleRate and Reset.
mu sync.Mutex
// base is the unit durations are divided by in AddDuration.
base time.Duration
// name is the stat name given at construction; no method of this type reads it.
name string
// ringSize is compared against count and overflow to bound reads and writes.
ringSize int
// ringMask is ANDed with overflow when indexing values in AddValue.
ringMask int
// values is the sample buffer written by AddValue and read by GetValue.
values []float64
// count is the number of AddValue calls since the last Reset; it may exceed
// ringSize.
count int
// overflow is the index of the next slot AddValue writes to; it wraps to 0
// when it reaches ringSize.
overflow int
}

// time records dur by forwarding it to AddDuration.
func (t *reservoirTimer) time(dur time.Duration) {
t.AddDuration(dur)
}

// AddDuration scales dur to the timer's base unit and records it via AddValue.
// The division is integer division on time.Duration, so any fraction of a
// base unit is truncated before the conversion to float64.
func (t *reservoirTimer) AddDuration(dur time.Duration) {
t.AddValue(float64(dur / t.base))
}

func (t *reservoirTimer) AddValue(value float64) {
t.mu.Lock()
defer t.mu.Unlock()

t.values[t.overflow&t.ringMask] = value
t.overflow++

// todo: can i optimize this with xor?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need to reset the overflow since you're masking (taking only the lower bits is equivalent to a modulo operation). Also using an unsigned int here would be a bit safer / clearer (makes the value always positive and unsigned overflow is well defined).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah this implementation here is wrong

if t.overflow == t.ringSize {
t.overflow = 0
}

t.count++
}

// AllocateSpan returns a new timespan bound to this timer whose start is the
// current time.
func (t *reservoirTimer) AllocateSpan() Timespan {
return &timespan{timer: t, start: time.Now()}
}

// GetValue returns the buffered value stored at index, read under the timer's
// lock. index is used as-is: it is not checked against ValueCount, and an
// index outside the values slice panics.
func (t *reservoirTimer) GetValue(index int) float64 {
t.mu.Lock()
// deferred so the lock is released even if the index expression panics
defer t.mu.Unlock()

return t.values[index]
}

// ValueCount returns how many buffered values are available: the number of
// values recorded since the last Reset, capped at ringSize.
func (t *reservoirTimer) ValueCount() int {
	// todo: could probably convert locks like this to atomic.LoadUint64
	t.mu.Lock()
	defer t.mu.Unlock()

	stored := t.count
	if stored > t.ringSize {
		stored = t.ringSize
	}
	return stored
}

// SampleRate returns the fraction of values recorded since the last Reset that
// are still buffered: ringSize/count once count exceeds ringSize, and 1.0
// otherwise (including when nothing has been recorded).
func (t *reservoirTimer) SampleRate() float64 {
	t.mu.Lock()
	defer t.mu.Unlock()

	// todo: is using t.ringSize worth it compared with computing len(t.values)?
	if t.count > t.ringSize {
		return float64(t.ringSize) / float64(t.count)
	}

	// todo: a 0 count should probably not be a 1.0 sample rate
	return 1.0
}

// Reset starts a new collection window by zeroing count and the write index,
// under the timer's lock. The values slice is not cleared; old samples stay in
// place, but ValueCount reports 0 until new values are added.
func (t *reservoirTimer) Reset() {
t.mu.Lock()
defer t.mu.Unlock()

t.count = 0 // note: SampleRate reports 1.0, not 0.0, while count is 0
t.overflow = 0
}

// timespan pairs a timer with the moment a measurement started.
type timespan struct {
	// timer is the timer this span belongs to; it is the timer interface so
	// that any timer implementation can allocate spans.
	timer timer
	// start is the time the span was allocated.
	start time.Time
}

Expand All @@ -336,6 +440,7 @@ func (ts *timespan) CompleteWithDuration(value time.Duration) {
}

type statStore struct {
// these maps may grow indefinitely; however, slots in these maps are reused because stat names are stable over the lifetime of the process
counters sync.Map
gauges sync.Map
timers sync.Map
Expand All @@ -344,6 +449,8 @@ type statStore struct {
statGenerators []StatGenerator

sink Sink

conf Settings
}

// ReservedTagWords is a set keyed by tag name; every listed name maps to true.
// NOTE(review): presumably these names get special handling when used as tag
// keys; the code that consults this map is not visible here, so confirm.
var ReservedTagWords = map[string]bool{"asg": true, "az": true, "backend": true, "canary": true, "host": true, "period": true, "region": true, "shard": true, "window": true, "source": true, "project": true, "facet": true, "envoyservice": true}
Expand Down Expand Up @@ -393,6 +500,21 @@ func (s *statStore) Flush() {
return true
})

s.timers.Range(func(key, v interface{}) bool {
if timer, ok := v.(*reservoirTimer); ok {
sampleRate := timer.SampleRate()

// since the map memory is reused, only process what we accumulated in the current processing iteration
for i := 0; i < timer.ValueCount(); i++ {
s.sink.FlushAggregatedTimer(key.(string), timer.GetValue(i), sampleRate)
}

timer.Reset()
}

return true
})

flushableSink, ok := s.sink.(FlushableSink)
if ok {
flushableSink.Flush()
Expand Down Expand Up @@ -490,14 +612,34 @@ func (s *statStore) NewPerInstanceGauge(name string, tags map[string]string) Gau
return s.newGaugeWithTagSet(name, tagspkg.TagSet(nil).MergePerInstanceTags(tags))
}

func (s *statStore) newTimer(serializedName string, base time.Duration) *timer {
func (s *statStore) newTimer(serializedName string, base time.Duration) timer {
if v, ok := s.timers.Load(serializedName); ok {
return v.(*timer)
return v.(timer)
}
t := &timer{name: serializedName, sink: s.sink, base: base}

var t timer
if s.conf.isTimerReservoirEnabled() {
capacity := s.conf.TimerReservoirSize
capacityRoundedToTheNextPowerOfTwo := 1 << bits.Len(uint(capacity))
t = &reservoirTimer{
name: serializedName,
base: base,
ringSize: capacity,
ringMask: capacityRoundedToTheNextPowerOfTwo - 1,
values: make([]float64, capacity),
}
} else {
t = &standardTimer{
name: serializedName,
sink: s.sink,
base: base,
}
}

if v, loaded := s.timers.LoadOrStore(serializedName, t); loaded {
return v.(*timer)
return v.(timer)
}

return t
}

Expand Down
Loading
Loading