Skip to content

Commit

Permalink
optimize reservoirTimer by utilizing a ring buffer
Browse files Browse the repository at this point in the history
optimize reservoirTimer by allocating an reusing memory
fix bug with sampleRate calculation
optimize FlushAggregatedTimer a bit
expand and fix test coverage
  • Loading branch information
maciuszek committed Jan 14, 2025
1 parent 0d3fb45 commit ea5ae6a
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 28 deletions.
3 changes: 2 additions & 1 deletion net_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ func (s *netSink) FlushTimer(name string, value float64) {
}

func (s *netSink) FlushAggregatedTimer(name string, value, sampleRate float64) {
suffix := fmt.Sprintf("|ms|@%.1f\n", sampleRate)
// todo: this can be further optimized by strconv.AppendFloat directly to the buffer in flush(Uint|Float)64 however we would need more conditions or code duplication
suffix := "|ms|@" + strconv.FormatFloat(sampleRate, 'f', 2, 64) + "\n" // todo: deteremine how many decimal places we need
s.flushFloatOptimized(name, suffix, value)
}

Expand Down
63 changes: 45 additions & 18 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stats

import (
"context"
"math/bits"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -216,7 +217,7 @@ type StatGenerator interface {
func NewStore(sink Sink, _ bool) Store {
return &statStore{
sink: sink,
conf: GetSettings(), // todo: right now the environment is being loaded in multiple places and can be made more efficient
conf: GetSettings(), // todo: right now the environment is being loaded in multiple places and can be made more efficient by computing it once and storing for subsequent gets
}
}

Expand Down Expand Up @@ -306,6 +307,7 @@ type timer interface {
AddDuration(time.Duration)
AddValue(float64)
AllocateSpan() Timespan
ResetValue(int, float64)
CollectedValue() []float64
SampleRate() float64
}
Expand All @@ -332,6 +334,8 @@ func (t *standardTimer) AllocateSpan() Timespan {
return &timespan{timer: t, start: time.Now()}
}

func (t *standardTimer) ResetValue(_ int, _ float64) {}

func (t *standardTimer) CollectedValue() []float64 {
return nil // since we flush right away nothing will be collected
}
Expand All @@ -341,12 +345,14 @@ func (t *standardTimer) SampleRate() float64 {
}

type reservoirTimer struct {
mu sync.Mutex
base time.Duration
name string
capacity int
ringSize int
ringMask int
values []float64
count int
mu sync.Mutex
overflow int
}

func (t *reservoirTimer) time(dur time.Duration) {
Expand All @@ -361,10 +367,11 @@ func (t *reservoirTimer) AddValue(value float64) {
t.mu.Lock()
defer t.mu.Unlock()

if t.count < t.capacity {
t.values = append(t.values, value)
} else {
t.values = append(t.values[1:], value) // discard the oldest value when the reservoir is full, this can probably be smarter
t.values[t.overflow&t.ringMask] = value
t.overflow++

if t.overflow == t.ringSize {
t.overflow = 0
}

t.count++
Expand All @@ -374,18 +381,31 @@ func (t *reservoirTimer) AllocateSpan() Timespan {
return &timespan{timer: t, start: time.Now()}
}

func (t *reservoirTimer) ResetValue(index int, value float64) {
t.mu.Lock()
defer t.mu.Unlock()

// todo: we mistakingly record timers as floats, if we could confidently make this an int we optimize the reset with some bitwise/xor
if t.values[index] == value {
t.values[index] = 0
}
}

func (t *reservoirTimer) CollectedValue() []float64 {
t.mu.Lock()
defer t.mu.Unlock()

// return a copy of the values slice to avoid data races
values := make([]float64, len(t.values))
values := make([]float64, t.ringSize) // todo: is it worth it to use t.ringSize instead of computing len of values worth it?
copy(values, t.values)
return values
}

func (t *reservoirTimer) SampleRate() float64 {
return float64(len(t.values)) / float64(t.count)
if t.count <= t.ringSize {
return 1.0
}
return float64(t.ringSize) / float64(t.count) // todo: is it worth it to use t.ringSize instead of computing len of values worth it?
}

type timespan struct {
Expand All @@ -404,7 +424,7 @@ func (ts *timespan) CompleteWithDuration(value time.Duration) {
}

type statStore struct {
// todo: no idea how memory is managed here, when are the map entries ever deleted?
// slots in this maps are reused as stats names are stable over the lifetime of the process
counters sync.Map
gauges sync.Map
timers sync.Map
Expand Down Expand Up @@ -451,8 +471,6 @@ func (s *statStore) Flush() {
}
s.mu.RUnlock()

// todo: if we're not deleting the data we flush from these maps, won't we just keep resending them?

s.counters.Range(func(key, v interface{}) bool {
// do not flush counters that are set to zero
if value := v.(*counter).latch(); value != 0 {
Expand All @@ -469,10 +487,17 @@ func (s *statStore) Flush() {
s.timers.Range(func(key, v interface{}) bool {
if timer, ok := v.(*reservoirTimer); ok {
sampleRate := timer.SampleRate()
for _, value := range timer.CollectedValue() {
s.sink.FlushAggregatedTimer(key.(string), value, sampleRate)
if sampleRate == 0.0 {
return true // todo: may provide reduce some processing but not sure if it's worth the complexity
}

for i, value := range timer.CollectedValue() {
// todo: i think i need to preprocess what we flush as skipping should affect the sample rate
if value != 0.0 {
s.sink.FlushAggregatedTimer(key.(string), value, sampleRate)
timer.ResetValue(i, value)
}
}
s.timers.Delete(key) // delete it from the map so it's not flushed again
}

return true
Expand Down Expand Up @@ -582,12 +607,14 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer {

var t timer
if s.conf.isTimerReservoirEnabled() {
capacity := s.conf.TimerReservoirSize
capacityRoundedToTheNextPowerOfTwo := 1 << bits.Len(uint(capacity))
t = &reservoirTimer{
name: serializedName,
base: base,
capacity: s.conf.TimerReservoirSize,
values: make([]float64, 0, s.conf.TimerReservoirSize),
count: 0,
ringSize: capacity,
ringMask: capacityRoundedToTheNextPowerOfTwo - 1,
values: make([]float64, capacity),
}
} else {
t = &standardTimer{
Expand Down
166 changes: 157 additions & 9 deletions stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,24 @@ func TestTimerReservoir_Disabled(t *testing.T) {

time.Sleep(1001 * time.Millisecond)

statCount := len(strings.Split(ts.String(), "\n")) - 1 // there will be 1 extra new line character at the end of the buffer
stats := strings.Split(ts.String(), "\n")
statCount := len(stats) - 1 // there will be 1 extra new line character at the end of the buffer
if statCount != expectedStatCount {
t.Errorf("Not all stats were written\ngot:\n%d\nwanted:\n%d\n", statCount, expectedStatCount)
t.Errorf("Not all stats were written\ngot: %d\nwanted: %d", statCount, expectedStatCount)
}

stats = stats[:statCount]
for _, stat := range stats {
value := strings.Split(stat, ":")[1]
if strings.Contains(value, "|@") {
t.Errorf("A stat was written with a sample rate when it shouldn't have any: %s", stat)
}
}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
}

func TestTimerReservoir(t *testing.T) {
func TestTimerReservoir_Overflow(t *testing.T) {
err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100")
if err != nil {
t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err)
Expand All @@ -171,7 +180,7 @@ func TestTimerReservoir(t *testing.T) {
store := NewStore(sink, true)

for i := 0; i < 1000; i++ {
store.NewTimer("test").AddValue(float64(i % 10))
store.NewTimer("test").AddValue(float64(i%10) + 1) // don't create timers with 0 values to make the count deterministic
}

if ts.String() != "" {
Expand All @@ -182,9 +191,101 @@ func TestTimerReservoir(t *testing.T) {

time.Sleep(1001 * time.Millisecond)

statCount := len(strings.Split(ts.String(), "\n")) - 1 // there will be 1 extra new line character at the end of the buffer
stats := strings.Split(ts.String(), "\n")
statCount := len(stats) - 1 // there will be 1 extra new line character at the end of the buffer
if statCount != expectedStatCount {
t.Errorf("Not all stats were written\ngot:\n%d\nwanted:\n%d\n", statCount, expectedStatCount)
t.Errorf("Not all stats were written\ngot: %d\nwanted: %d", statCount, expectedStatCount)
}

stats = stats[:statCount]
for _, stat := range stats {
value := strings.Split(stat, ":")[1]
sampleRate := strings.Split(value, ("|@"))[1]
if sampleRate != "0.10" {
t.Errorf("A stat was written without a 0.10 sample rate: %s", stat)
}
}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
}

func TestTimerReservoir_Full(t *testing.T) {
err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100")
if err != nil {
t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err)
}

expectedStatCount := 100

ts, sink := setupTestNetSink(t, "tcp", false)
store := NewStore(sink, true)

for i := 0; i < 100; i++ {
store.NewTimer("test").AddValue(float64(i%10) + 1) // don't create timers with 0 values to make the count deterministic
}

if ts.String() != "" {
t.Errorf("Stats were written pre flush potentially clearing the reservoir too early")
}

store.Flush()

time.Sleep(1001 * time.Millisecond)

stats := strings.Split(ts.String(), "\n")
statCount := len(stats) - 1 // there will be 1 extra new line character at the end of the buffer
if statCount != expectedStatCount {
t.Errorf("Not all stats were written\ngot: %d\nwanted: %d", statCount, expectedStatCount)
}

stats = stats[:statCount]
for _, stat := range stats {
value := strings.Split(stat, ":")[1]
sampleRate := strings.Split(value, ("|@"))[1]
if sampleRate != "1.00" {
t.Errorf("A stat was written without a 1.00 sample rate: %s", stat)
}
}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
}

func TestTimerReservoir_NotFull(t *testing.T) {
err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100")
if err != nil {
t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err)
}

expectedStatCount := 50

ts, sink := setupTestNetSink(t, "tcp", false)
store := NewStore(sink, true)

for i := 0; i < 50; i++ {
store.NewTimer("test").AddValue(float64(i%10) + 1) // don't create timers with 0 values to make the count deterministic
}

if ts.String() != "" {
t.Errorf("Stats were written pre flush potentially clearing the reservoir too early")
}

store.Flush()

time.Sleep(1001 * time.Millisecond)

stats := strings.Split(ts.String(), "\n")
statCount := len(stats) - 1 // there will be 1 extra new line character at the end of the buffer
if statCount != expectedStatCount {
t.Errorf("Not all stats were written\ngot: %d\nwanted: %d", statCount, expectedStatCount)
}

stats = stats[:statCount]
for _, stat := range stats {
value := strings.Split(stat, ":")[1]
sampleRate := strings.Split(value, ("|@"))[1]
if sampleRate != "1.00" {
t.Errorf("A stat was written without a 1.00 sample rate: %s", stat)
}
}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
Expand All @@ -202,7 +303,7 @@ func TestTimerReservoir_IndependantReservoirs(t *testing.T) {
store := NewStore(sink, true)

for i := 0; i < 1000; i++ {
store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10)) // use different names so that we don't conflate the metrics into the same reservoir
store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i%10) + 1) // use different names so that we don't conflate the metrics into the same reservoir
}

if ts.String() != "" {
Expand All @@ -213,9 +314,56 @@ func TestTimerReservoir_IndependantReservoirs(t *testing.T) {

time.Sleep(1001 * time.Millisecond)

statCount := len(strings.Split(ts.String(), "\n")) - 1 // there will be 1 extra new line character at the end of the buffer
stats := strings.Split(ts.String(), "\n")
statCount := len(stats) - 1 // there will be 1 extra new line character at the end of the buffer
if statCount != expectedStatCount {
t.Errorf("Not all stats were written\ngot:\n%d\nwanted:\n%d\n", statCount, expectedStatCount)
t.Errorf("Not all stats were written\ngot: %d\nwanted: %d", statCount, expectedStatCount)
}

stats = stats[:statCount]
for _, stat := range stats {
value := strings.Split(stat, ":")[1]
sampleRate := strings.Split(value, ("|@"))[1]
if sampleRate != "1.00" {
t.Errorf("A stat was written without a 1.00 sample rate: %s", stat)
}
}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
}

func TestTimerReservoir_FilteredZeros(t *testing.T) {
err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100")
if err != nil {
t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err)
}

ts, sink := setupTestNetSink(t, "tcp", false)
store := NewStore(sink, true)

for i := 0; i < 1000; i++ {
store.NewTimer("test").AddValue(float64(i % 10))
}

if ts.String() != "" {
t.Errorf("Stats were written pre flush potentially clearing the reservoir too early")
}
store.Flush()

time.Sleep(1001 * time.Millisecond)

stats := strings.Split(ts.String(), "\n")
stats = stats[:len(stats)-1] // there will be 1 extra new line character at the end of the buffer
for _, stat := range stats {
value := strings.Split(stat, ":")[1]
sampleRate := strings.Split(value, ("|@"))[1]
value = strings.Split(value, ("|ms"))[0] // strip value and remove suffix and get raw number
if value == "0" {
t.Errorf("A stat was written with a zero value: %s", stat)
}
if sampleRate != "0.10" {
t.Errorf("A stat was written without a 0.10 sample rate: %s", stat)
}
}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
Expand Down

0 comments on commit ea5ae6a

Please sign in to comment.