From 5d77be93cebc44cfe9a93f9688063006794910d5 Mon Sep 17 00:00:00 2001 From: Alex Hamlin Date: Wed, 28 Aug 2024 22:42:09 -0700 Subject: [PATCH 1/9] Restore and revamp the watch benchmarks I took this out when I added type parameterization to watch.Value, calling it "ridiculous and useless" because "the results probably just reflect lock contention." Well, I'm about to change the locking strategy for watch.Value in a big way, so a head-to-head comparison of Value.Set implementations in the presence of watchers has some meaning, even if it's all about lock contention. My original implementation really was awful, though. Every setter incrementing the same atomic integer? Really? This version introduces three big changes from the original: 1. Adding a version with 1,000 watchers. 2. Using type parameters. 3. Using random numbers (from the math/rand/v2 default source) to generate the values to set. The random part is still fuzzy to me. It's not like I've proven that using a constant is worse than this, so the idea that the setters should do "something" is basically just vibes. But the goal is to compare implementations, not look at absolute numbers for their own sake, so the work done by setters isn't *that* important either way. --- internal/watch/watch_test.go | 47 ++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/internal/watch/watch_test.go b/internal/watch/watch_test.go index fbf3856..61dc283 100644 --- a/internal/watch/watch_test.go +++ b/internal/watch/watch_test.go @@ -1,6 +1,7 @@ package watch import ( + "math/rand/v2" "runtime" "sync" "testing" @@ -347,3 +348,49 @@ func assertBlocked(t *testing.T, ch <-chan struct{}) { default: } } + +func BenchmarkSet1Watcher(b *testing.B) { + benchmarkSetWithWatchers(b, 1) +} + +func BenchmarkSet10Watchers(b *testing.B) { + benchmarkSetWithWatchers(b, 10) +} + +func BenchmarkSet100Watchers(b *testing.B) { + benchmarkSetWithWatchers(b, 100) +} + +func BenchmarkSet1000Watchers(b *testing.B) { + benchmarkSetWithWatchers(b, 1000) +} + +func benchmarkSetWithWatchers(b *testing.B, nWatchers int) { + v := NewValue(uint64(0)) + watchers := make([]Watch, nWatchers) + for i := range watchers { + var sum uint64 + watchers[i] = v.Watch(func(x uint64) { sum += x }) + } + + b.Cleanup(func() { + for _, w := range watchers { + w.Cancel() + w.Wait() + } + }) + + b.RunParallel(func(pb *testing.PB) { + // The choice to set random values is somewhat arbitrary. In practice, the + // cost of lock contention probably outweighs any strategy for generating + // these values--even setting a constant every time (unless there were ever + // an optimization to not trigger watches when the value doesn't change). + // Having the setters do work that the handlers can't predict feels vaguely + // more realistic, though, and it's not a huge difference either way since + // the goal is to compare different watcher implementations (that is, the + // work just needs to be the same on both sides of the comparison). + for pb.Next() { + v.Set(rand.Uint64()) + } + }) +} From 8764fe3dabfa812dcb945b90ad9cb4eb4c304ab2 Mon Sep 17 00:00:00 2001 From: Alex Hamlin Date: Wed, 28 Aug 2024 22:10:21 -0700 Subject: [PATCH 2/9] Watch with fewer goroutines I've wanted for some time to see if I could limit the use of goroutines in the watch package. The original version uses up to 2 goroutines per watch: 1. A long-running goroutine to constantly poll an updates channel. 2. A temporary goroutine to insulate (1) from runtime.Goexit. 
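In other words, the old run loop looked roughly like this (reproduced from the
code removed in the diff below, with comments added here for illustration):

    // (1) One long-running goroutine per watch drains the buffered
    // "pending" channel for as long as the watch is alive.
    func (w *watch[T]) run() {
        var wg sync.WaitGroup
        defer close(w.done)
        for next := range w.pending {
            x := next
            wg.Add(1)
            // (2) A short-lived goroutine runs the handler, so a handler
            // that calls runtime.Goexit cannot terminate the loop in (1).
            go func() {
                defer wg.Done()
                w.handler(x)
            }()
            wg.Wait()
        }
    }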
This version instead spawns up to 1 goroutine, which runs only while updates are being handled and still protects against runtime.Goexit. Unlike the old "channel as an atomic buffer" concept, this uses a single mutex to track the new value and handler invariants together. In fact, I've been careful to avoid any use of channels, so no allocations are required beyond the original watch struct. Benchmark results are fuzzier than I'd like, but the numbers seem noticeably better at first glance. Allocation size in particular gets cut roughly in half, presumably for the reasons stated above (with a grain of salt; I haven't profiled the memory). One concern: mutation testing shows "w.unregister(w)" can be removed without failing unit tests. The old implementation's channel closure would implicitly guarantee a panic on unintended future updates, but I'll need to find another answer now. --- internal/watch/watch.go | 86 ++++++++++++++++++++++++----------------- 1 file changed, 50 insertions(+), 36 deletions(-) diff --git a/internal/watch/watch.go b/internal/watch/watch.go index 3642ecc..918eb31 100644 --- a/internal/watch/watch.go +++ b/internal/watch/watch.go @@ -98,61 +98,75 @@ type Watch interface { type watch[T any] struct { handler func(T) unregister func(*watch[T]) - pending chan T - done chan struct{} + + mu sync.Mutex + wg sync.WaitGroup + next T + ok bool // There is a valid value in next. + running bool // There is (or will be) a goroutine responsible for handling values. + cancel bool // The WaitGroup must be canceled as soon as running == false. } func newWatch[T any](handler func(T), unregister func(*watch[T])) *watch[T] { w := &watch[T]{ handler: handler, unregister: unregister, - pending: make(chan T, 1), - done: make(chan struct{}), } - go w.run() + w.wg.Add(1) return w } -func (w *watch[T]) run() { - var wg sync.WaitGroup - defer close(w.done) - - for next := range w.pending { - x := next - wg.Add(1) - // Insulate the handler from the main loop, e.g. if it calls runtime.Goexit - // it should not terminate this loop and break the processing of new values. - go func() { - defer wg.Done() - w.handler(x) - }() - wg.Wait() +func (w *watch[T]) update(x T) { + w.mu.Lock() + start := !w.running + w.next, w.ok, w.running = x, true, true + w.mu.Unlock() + if start { + go w.run() } } -func (w *watch[T]) update(x T) { - // It's important that this call not block, so we assume w.pending is buffered - // and drop a pending update to free space if necessary. - select { - case <-w.pending: - w.pending <- x - case w.pending <- x: +func (w *watch[T]) run() { + var cancel, unwind bool + defer func() { + if cancel { + w.wg.Done() + } + if unwind { + go w.run() // Only possible if w.running == true, so we must maintain the invariant. + } + }() + + for { + w.mu.Lock() + cancel = w.cancel + next := w.next + stop := !w.ok || cancel + w.next, w.ok = *new(T), false + w.running = !stop + w.mu.Unlock() + + if stop { + return + } + + unwind = true + w.handler(next) // May panic or call runtime.Goexit. + unwind = false } } func (w *watch[T]) Cancel() { - w.unregister(w) - w.clearPending() - close(w.pending) -} - -func (w *watch[T]) clearPending() { - select { - case <-w.pending: - default: + w.unregister(w) // After this, we are guaranteed no new w.update calls. 
+ w.mu.Lock() + w.cancel = true + done := !w.running + w.mu.Unlock() + if done { + w.wg.Done() } } func (w *watch[T]) Wait() { - <-w.done + w.wg.Wait() } From ee7dc372d257ed38b2c7ab11c4f3c41411364d79 Mon Sep 17 00:00:00 2001 From: Alex Hamlin Date: Wed, 28 Aug 2024 23:55:27 -0700 Subject: [PATCH 3/9] Use a regular mutex to protect watch.Value Turns out I only call watch.Value.Get in the unit tests. I won't remove it, since it serves useful purposes there, but RWMutex seems like a useless premature optimization. I also did some visual tidying throughout. --- internal/watch/watch.go | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/internal/watch/watch.go b/internal/watch/watch.go index 918eb31..54085d6 100644 --- a/internal/watch/watch.go +++ b/internal/watch/watch.go @@ -8,12 +8,10 @@ import "sync" // // The zero value of a Value is valid and stores the zero value of T. type Value[T any] struct { - // Invariant: Every Watch must receive one update call for every value of the - // Value from the time it is added to the watchers set to the time it is - // removed. - // - // mu protects this invariant, and prevents data races on value. - mu sync.RWMutex + // mu prevents data races on the value, and protects the invariant that every + // Watch receives one update call for every value of the Value from the time + // it's added to the watchers set to the time it's removed. + mu sync.Mutex value T watchers map[*watch[T]]struct{} } @@ -25,8 +23,8 @@ func NewValue[T any](x T) *Value[T] { // Get returns the current value stored in v. func (v *Value[T]) Get() T { - v.mu.RLock() - defer v.mu.RUnlock() + v.mu.Lock() + defer v.mu.Unlock() return v.value } @@ -57,20 +55,19 @@ func (v *Value[T]) Set(x T) { // execution has finished. func (v *Value[T]) Watch(handler func(x T)) Watch { w := newWatch(handler, v.unregisterWatch) - v.updateAndRegisterWatch(w) + v.registerAndUpdateWatch(w) return w } -func (v *Value[T]) updateAndRegisterWatch(w *watch[T]) { +func (v *Value[T]) registerAndUpdateWatch(w *watch[T]) { v.mu.Lock() defer v.mu.Unlock() - w.update(v.value) - if v.watchers == nil { v.watchers = make(map[*watch[T]]struct{}) } v.watchers[w] = struct{}{} + w.update(v.value) } func (v *Value[T]) unregisterWatch(w *watch[T]) { From ce7dfe70a831b59c0e6da83a2475b68c82a17c3f Mon Sep 17 00:00:00 2001 From: Alex Hamlin Date: Thu, 29 Aug 2024 22:06:40 -0700 Subject: [PATCH 4/9] Make it safe to call Watch.Cancel twice I may or may not document this, though if this were a public package Hyrum's Law would guarantee it as a permanent feature of the API. --- internal/watch/watch.go | 4 ++-- internal/watch/watch_test.go | 8 ++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/internal/watch/watch.go b/internal/watch/watch.go index 54085d6..31caab1 100644 --- a/internal/watch/watch.go +++ b/internal/watch/watch.go @@ -156,10 +156,10 @@ func (w *watch[T]) run() { func (w *watch[T]) Cancel() { w.unregister(w) // After this, we are guaranteed no new w.update calls. w.mu.Lock() + finish := !w.running && !w.cancel w.cancel = true - done := !w.running w.mu.Unlock() - if done { + if finish { w.wg.Done() } } diff --git a/internal/watch/watch_test.go b/internal/watch/watch_test.go index 61dc283..209aee3 100644 --- a/internal/watch/watch_test.go +++ b/internal/watch/watch_test.go @@ -208,6 +208,14 @@ func TestGoexitFromHandler(t *testing.T) { assertWatchTerminates(t, w) } +func TestDoubleCancel(t *testing.T) { + // This one is simple: calling Cancel twice should not panic. 
+ v := NewValue("alice") + w := v.Watch(func(x string) {}) + w.Cancel() + w.Cancel() +} + func TestCancelBlockedWatcher(t *testing.T) { // A specific test for canceling a watch while it is handling a notification. From 4fc6ebee2677c789db68e1f42070e481e5c7d99a Mon Sep 17 00:00:00 2001 From: Alex Hamlin Date: Thu, 29 Aug 2024 22:09:17 -0700 Subject: [PATCH 5/9] Simplify cancellation from the watch handler goroutine I think it's reasonable to keep anything out of the defer that doesn't strictly need to be there. --- internal/watch/watch.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/watch/watch.go b/internal/watch/watch.go index 31caab1..4b9fce0 100644 --- a/internal/watch/watch.go +++ b/internal/watch/watch.go @@ -124,25 +124,25 @@ func (w *watch[T]) update(x T) { } func (w *watch[T]) run() { - var cancel, unwind bool + var unwind bool defer func() { - if cancel { - w.wg.Done() - } if unwind { - go w.run() // Only possible if w.running == true, so we must maintain the invariant. + // Only possible if w.running == true, so we must maintain the invariant. + go w.run() } }() for { w.mu.Lock() - cancel = w.cancel - next := w.next + next, cancel := w.next, w.cancel stop := !w.ok || cancel - w.next, w.ok = *new(T), false w.running = !stop + w.next, w.ok = *new(T), false w.mu.Unlock() + if cancel { + w.wg.Done() + } if stop { return } From ab62e5d71906821ec16988ae93f13c4fde821504 Mon Sep 17 00:00:00 2001 From: Alex Hamlin Date: Thu, 29 Aug 2024 22:27:50 -0700 Subject: [PATCH 6/9] Test cancellation with an inactive handler This is the case that mutation testing revealed. Making the same mutation myself results in a panic during this test, due to the WaitGroup getting canceled twice. If that somehow starts working, the blocked channel test is ready to take over. --- internal/watch/watch_test.go | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/internal/watch/watch_test.go b/internal/watch/watch_test.go index 209aee3..b1a9827 100644 --- a/internal/watch/watch_test.go +++ b/internal/watch/watch_test.go @@ -216,6 +216,26 @@ func TestDoubleCancel(t *testing.T) { w.Cancel() } +func TestCancelInactiveHandler(t *testing.T) { + // The usual case of canceling a watch, where no handler is active at the time + // of cancellation. Once we cancel, no further handler calls should be made. + + v := NewValue("alice") + notify := make(chan string, 1) + w := v.Watch(func(x string) { + select { + case notify <- x: + default: + } + }) + + assertNextReceive(t, notify, "alice") + + w.Cancel() + v.Set("bob") + assertBlocked(t, notify) +} + func TestCancelBlockedWatcher(t *testing.T) { // A specific test for canceling a watch while it is handling a notification. @@ -338,10 +358,10 @@ func assertWatchTerminates(t *testing.T, w Watch) { } } -func assertBlocked(t *testing.T, ch <-chan struct{}) { +func assertBlocked[T any](t *testing.T, ch <-chan T) { t.Helper() - // If any background routines are going to close ch when they should not, + // If any background routines are going to send on ch when they should not, // let's make a best effort to help them along. gomaxprocs := runtime.GOMAXPROCS(1) defer runtime.GOMAXPROCS(gomaxprocs) From 5c8ec2ff51fc22ac573a64674940ece298b54b49 Mon Sep 17 00:00:00 2001 From: Alex Hamlin Date: Thu, 29 Aug 2024 22:49:24 -0700 Subject: [PATCH 7/9] Test multi-cancellation more robustly I had to kill another mutant. 
My original double-cancel test didn't wait for the initial handler to finish before setting the cancel flag, and the scheduling of the goroutines worked out to avoid double-cancelling the WaitGroup. Now, double-cancellation tests for both active and inactive handlers should be more robust (though in the former case I merely tweaked an existing test). --- internal/watch/watch_test.go | 50 ++++++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/internal/watch/watch_test.go b/internal/watch/watch_test.go index b1a9827..94a32c1 100644 --- a/internal/watch/watch_test.go +++ b/internal/watch/watch_test.go @@ -208,14 +208,6 @@ func TestGoexitFromHandler(t *testing.T) { assertWatchTerminates(t, w) } -func TestDoubleCancel(t *testing.T) { - // This one is simple: calling Cancel twice should not panic. - v := NewValue("alice") - w := v.Watch(func(x string) {}) - w.Cancel() - w.Cancel() -} - func TestCancelInactiveHandler(t *testing.T) { // The usual case of canceling a watch, where no handler is active at the time // of cancellation. Once we cancel, no further handler calls should be made. @@ -230,12 +222,26 @@ func TestCancelInactiveHandler(t *testing.T) { }) assertNextReceive(t, notify, "alice") + forceRuntimeProgress() // Try to ensure the handler has fully terminated. w.Cancel() v.Set("bob") assertBlocked(t, notify) } +func TestDoubleCancelInactiveHandler(t *testing.T) { + // A specific test for calling Cancel twice on an inactive handler, and + // ensuring we don't panic. + + v := NewValue("alice") + w := v.Watch(func(x string) {}) + forceRuntimeProgress() // Try to ensure the initial handler has fully terminated. + + w.Cancel() + w.Cancel() + assertWatchTerminates(t, w) +} + func TestCancelBlockedWatcher(t *testing.T) { // A specific test for canceling a watch while it is handling a notification. @@ -269,9 +275,10 @@ func TestCancelBlockedWatcher(t *testing.T) { assertWatchTerminates(t, w) } -func TestCancelFromHandler(t *testing.T) { +func TestDoubleCancelFromHandler(t *testing.T) { // This is a special case of Cancel being called while a handler is blocked, - // as the caller of Cancel is the handler itself. + // as the caller of Cancel is the handler itself. We also call Cancel twice, + // to make sure multi-cancellation works in the active handler case. v := NewValue("alice") @@ -286,6 +293,7 @@ func TestCancelFromHandler(t *testing.T) { v.Set("bob") w := <-watchCh w.Cancel() + w.Cancel() canceled = true }) @@ -361,15 +369,7 @@ func assertWatchTerminates(t *testing.T, w Watch) { func assertBlocked[T any](t *testing.T, ch <-chan T) { t.Helper() - // If any background routines are going to send on ch when they should not, - // let's make a best effort to help them along. - gomaxprocs := runtime.GOMAXPROCS(1) - defer runtime.GOMAXPROCS(gomaxprocs) - n := runtime.NumGoroutine() - for i := 0; i < n; i++ { - runtime.Gosched() - } - + forceRuntimeProgress() select { case <-ch: t.Fatal("progress was not blocked") @@ -377,6 +377,18 @@ func assertBlocked[T any](t *testing.T, ch <-chan T) { } } +// forceRuntimeProgress makes a best-effort attempt to force the Go runtime to +// make progress on all other goroutines in the system, ideally to the point at +// which they will next block if not preempted. It works best if no other +// goroutines are CPU-intensive or change GOMAXPROCS. 
+func forceRuntimeProgress() { + gomaxprocs := runtime.GOMAXPROCS(1) + defer runtime.GOMAXPROCS(gomaxprocs) + for range runtime.NumGoroutine() { + runtime.Gosched() + } +} + func BenchmarkSet1Watcher(b *testing.B) { benchmarkSetWithWatchers(b, 1) } From 8d6ba9219915fda8a54f579c8e9d5267f24fe04e Mon Sep 17 00:00:00 2001 From: Alex Hamlin Date: Thu, 29 Aug 2024 23:06:58 -0700 Subject: [PATCH 8/9] Clean up various parts of the watch tests Reworded a few messages, and took advantage of Go 1.22 loop features (proper scoping, range over int) in a couple spots. --- internal/watch/watch_test.go | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/internal/watch/watch_test.go b/internal/watch/watch_test.go index 94a32c1..de53fd5 100644 --- a/internal/watch/watch_test.go +++ b/internal/watch/watch_test.go @@ -10,7 +10,7 @@ import ( const timeout = 2 * time.Second -func TestValue(t *testing.T) { +func TestValueStress(t *testing.T) { // A stress test meant to be run with the race detector enabled. This test // ensures that all access to a Value is synchronized, that handlers run // serially, and that handlers are properly notified of the most recent state. @@ -25,11 +25,9 @@ func TestValue(t *testing.T) { var handlerGroup sync.WaitGroup handlerGroup.Add(nWatchers) - for i := 0; i < nWatchers; i++ { - var ( - sum int - sawFinal bool - ) + for i := range nWatchers { + var sum int + var sawFinal bool watches[i] = v.Watch(func(x int) { // This will quickly make the race detector complain if more than one // instance of a handler runs at once. @@ -51,10 +49,7 @@ func TestValue(t *testing.T) { for i := 1; i <= nWrites-1; i++ { // This will quickly make the race detector complain if Set is not properly // synchronized. - go func(i int) { - defer setGroup.Done() - v.Set(i) - }(i) + go func() { defer setGroup.Done(); v.Set(i) }() } setGroup.Wait() @@ -69,7 +64,7 @@ func TestValue(t *testing.T) { select { case <-done: case <-time.After(timeout): - t.Fatalf("reached %v timeout before all watchers saw final state", timeout) + t.Fatalf("not all watchers saw final state within %v", timeout) } for _, w := range watches { @@ -98,7 +93,7 @@ func TestWatchZeroValue(t *testing.T) { t.Errorf("watch on zero value of Value got %v; want nil", x) } case <-time.After(timeout): - t.Fatalf("reached %v timeout before watcher was notified", timeout) + t.Fatalf("watcher not notified within %v", timeout) } w.Cancel() @@ -346,7 +341,7 @@ func assertNextReceive[T comparable](t *testing.T, ch chan T, want T) { t.Fatalf("got %v from channel, want %v", got, want) } case <-time.After(timeout): - t.Fatalf("reached %v timeout before watcher was notified", timeout) + t.Fatalf("watcher not notified within %v", timeout) } } @@ -362,7 +357,7 @@ func assertWatchTerminates(t *testing.T, w Watch) { select { case <-done: case <-time.After(timeout): - t.Fatalf("watch not terminated after %v", timeout) + t.Fatalf("watch still active after %v", timeout) } } @@ -416,6 +411,8 @@ func benchmarkSetWithWatchers(b *testing.B, nWatchers int) { b.Cleanup(func() { for _, w := range watchers { w.Cancel() + } + for _, w := range watchers { w.Wait() } }) From b9306058f3cf2755e366fd4ee47db5708299362d Mon Sep 17 00:00:00 2001 From: Alex Hamlin Date: Thu, 29 Aug 2024 23:38:23 -0700 Subject: [PATCH 9/9] Add links to watch documentation --- internal/watch/watch.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/watch/watch.go b/internal/watch/watch.go index 4b9fce0..5e61a2a 100644 --- 
a/internal/watch/watch.go +++ b/internal/watch/watch.go @@ -44,14 +44,14 @@ func (v *Value[T]) Set(x T) { // // Each active watch executes up to one instance of handler at a time in a new // goroutine, first with the value stored in v upon creation of the watch, then -// with subsequent values stored in v by calls to Set. If the value stored in v -// changes while a handler execution is in flight, handler will be called once -// more with the latest value stored in v following its current execution. -// Intermediate updates preceding the latest value will be dropped. +// with subsequent values stored in v by calls to [Value.Set]. If the value +// stored in v changes while a handler execution is in flight, handler will be +// called once more with the latest value stored in v following its current +// execution. Intermediate updates preceding the latest value will be dropped. // // Values are not recovered by the garbage collector until all of their // associated watches have terminated. A watch is terminated after it has been -// canceled by a call to Watch.Cancel, and any pending or in-flight handler +// canceled by a call to [Watch.Cancel], and any pending or in-flight handler // execution has finished. func (v *Value[T]) Watch(handler func(x T)) Watch { w := newWatch(handler, v.unregisterWatch) @@ -77,7 +77,7 @@ func (v *Value[T]) unregisterWatch(w *watch[T]) { delete(v.watchers, w) } -// Watch represents a single watch on a Value. See Value.Watch for details. +// Watch represents a single watch on a Value. See [Value.Watch] for details. type Watch interface { // Cancel requests that this watch be terminated as soon as possible, // potentially after a pending or in-flight handler execution has finished.
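
For context, here is a minimal usage sketch of the API these doc comments
describe, written as if it lived alongside the package's own tests (the
handler wiring mirrors the notify-channel pattern used in watch_test.go; the
snippet is illustrative and not part of any patch above):

    // watchExample stores a value, observes updates to it, then cancels the
    // watch and waits for any in-flight handler execution to finish.
    func watchExample() {
        v := NewValue("alice")
        notify := make(chan string, 1)
        w := v.Watch(func(x string) { // at most one execution at a time
            select {
            case notify <- x:
            default:
            }
        })

        <-notify     // the handler first runs with the value stored at Watch time
        v.Set("bob") // the handler sees the latest value; intermediate updates may be dropped
        <-notify

        w.Cancel() // request termination; safe to call more than once
        w.Wait()   // returns once any pending or in-flight handler execution has finished
    }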