-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: port context-aware cond to gobox #325
base: main
Are you sure you want to change the base?
Changes from all commits
20ecdf3
a01271b
b9e6386
cfe8e42
b8c877d
f1e7e83
7715469
1a6a488
b6f91b4
4bd0dcf
4605647
5a99073
e8bffc4
63c9e51
8a5c784
3eabeff
47297a3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,129 @@ | ||||||||||||||||||
// Copyright 2023 Outreach Corporation. All Rights Reserved. | ||||||||||||||||||
|
||||||||||||||||||
// Description: Cond.go provides a context respecting sync condition | ||||||||||||||||||
|
||||||||||||||||||
package async | ||||||||||||||||||
|
||||||||||||||||||
import ( | ||||||||||||||||||
"context" | ||||||||||||||||||
"sync" | ||||||||||||||||||
"sync/atomic" | ||||||||||||||||||
) | ||||||||||||||||||
|
||||||||||||||||||
// Cond mimics sync.Cond in purpose, with the added goals of easing usability slightly and respecting context expiry. | ||||||||||||||||||
// | ||||||||||||||||||
// It provides functionality similar sync.Cond | ||||||||||||||||||
// - there is no signal method for waking a single Waiter | ||||||||||||||||||
// - the Wait method exits with error if the context cancels. | ||||||||||||||||||
// - it provides WaitForCondition, which intends to encapsulate the common pattern of acquiring a lock, checking | ||||||||||||||||||
// a condition, and releasing the lock before waiting for a state change if the condition is not met. | ||||||||||||||||||
type Cond struct { | ||||||||||||||||||
pointer atomic.Pointer[chan struct{}] | ||||||||||||||||||
Mu sync.Mutex | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// ch returns the channel that Waiters are waiting on, | ||||||||||||||||||
// possibly creating one if it hasn't been initialized | ||||||||||||||||||
func (c *Cond) ch() chan struct{} { | ||||||||||||||||||
// non atomic check for nil channel | ||||||||||||||||||
load := c.pointer.Load() | ||||||||||||||||||
if load == nil { | ||||||||||||||||||
t := make(chan struct{}) | ||||||||||||||||||
c.pointer.CompareAndSwap(nil, &t) | ||||||||||||||||||
return t | ||||||||||||||||||
Comment on lines
+31
to
+33
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't handle the race where two threads enter this if block at the same time. You should return the value of Correct verbose version:
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably should close |
||||||||||||||||||
} | ||||||||||||||||||
return *load | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// Wait waits for the state change Broadcast until context ends. | ||||||||||||||||||
AndrewWinterman marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
// If the returned error is non-nil, then the context ended before the state change Broadcast. | ||||||||||||||||||
func (c *Cond) Wait(ctx context.Context) error { | ||||||||||||||||||
ch := c.ch() | ||||||||||||||||||
select { | ||||||||||||||||||
case <-ch: | ||||||||||||||||||
return nil | ||||||||||||||||||
case <-ctx.Done(): | ||||||||||||||||||
return ctx.Err() | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// Broadcast signals the state change to all Waiters. | ||||||||||||||||||
func (c *Cond) Broadcast() { | ||||||||||||||||||
// swap in a new channel, close the old one | ||||||||||||||||||
newChan := make(chan struct{}) | ||||||||||||||||||
ch := c.pointer.Swap(&newChan) | ||||||||||||||||||
if ch != nil { | ||||||||||||||||||
close(*ch) | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// WaitForCondition acquires Cond's lock, then checks if the condition is true. If the condition is not true, | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Clarify what the lock actually is (and use the receiver name):
Suggested change
|
||||||||||||||||||
// or the lock was not available, it releases the locker and waits for the state change Broadcast. | ||||||||||||||||||
// If the context ends during any of these operations, the context error is returned. | ||||||||||||||||||
// | ||||||||||||||||||
// WaitForCondition returns an unlock function that should always be called to unlock the locker. | ||||||||||||||||||
// unlock is safe to call regardless of error. | ||||||||||||||||||
// Error should only be returned if the context ends before the condition is met. | ||||||||||||||||||
// | ||||||||||||||||||
// If it returns without error, it also locks the provided locker and the caller must call the returned function | ||||||||||||||||||
// to unlock it. Until they call unlock, the state should not be changed. | ||||||||||||||||||
// | ||||||||||||||||||
// This method encapsulates the instructions in sync.Cond.Wait: | ||||||||||||||||||
// | ||||||||||||||||||
// """ | ||||||||||||||||||
// Because c.L is not locked while Wait is waiting, the caller | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Update There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yah, thanks! |
||||||||||||||||||
// typically cannot assume that the condition is true when | ||||||||||||||||||
// Wait returns. Instead, the caller should Wait in a loop: | ||||||||||||||||||
// | ||||||||||||||||||
// c.L.Lock() | ||||||||||||||||||
// for !condition() { | ||||||||||||||||||
// c.Wait() | ||||||||||||||||||
// } | ||||||||||||||||||
// ... make use of condition ... | ||||||||||||||||||
// c.L.Unlock() | ||||||||||||||||||
// | ||||||||||||||||||
// """ | ||||||||||||||||||
// | ||||||||||||||||||
// Instead, you can do the following: | ||||||||||||||||||
// | ||||||||||||||||||
// var c Cond | ||||||||||||||||||
// | ||||||||||||||||||
// unlock, err := c.WaitForCondition(ctx, func() bool { | ||||||||||||||||||
// // check condition | ||||||||||||||||||
// return true | ||||||||||||||||||
// }) | ||||||||||||||||||
// if err != nil { | ||||||||||||||||||
// // context expired before condition was met | ||||||||||||||||||
// } | ||||||||||||||||||
// ... make use of condition ... | ||||||||||||||||||
// unlock() | ||||||||||||||||||
func (c *Cond) WaitForCondition(ctx context.Context, condition func() bool) (unlock func(), | ||||||||||||||||||
AndrewWinterman marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
err error) { | ||||||||||||||||||
for { | ||||||||||||||||||
locked := c.Mu.TryLock() | ||||||||||||||||||
// we have the lock, we can safely check the condition | ||||||||||||||||||
ok := locked && condition() | ||||||||||||||||||
|
||||||||||||||||||
if !ok { | ||||||||||||||||||
// condition not met | ||||||||||||||||||
if locked { | ||||||||||||||||||
// but we acquired the lock. so unlock it... | ||||||||||||||||||
c.Mu.Unlock() | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// either way, wait for the next signal | ||||||||||||||||||
waitErr := c.Wait(ctx) | ||||||||||||||||||
if waitErr != nil { | ||||||||||||||||||
return func() {}, waitErr | ||||||||||||||||||
} | ||||||||||||||||||
} else { | ||||||||||||||||||
// condition met, return the unlock function and nil error. | ||||||||||||||||||
// client must call the unlock function to unlock the mutex | ||||||||||||||||||
// client guaranteed the condition holds while mutex lock is held. | ||||||||||||||||||
return func() { | ||||||||||||||||||
c.Mu.Unlock() | ||||||||||||||||||
c.Broadcast() | ||||||||||||||||||
}, nil | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Document public field?