From a783acea0219fb5179f20ea1950451614b172bd0 Mon Sep 17 00:00:00 2001 From: djherbis Date: Wed, 3 Feb 2016 22:24:30 -0800 Subject: [PATCH] fixing broadcaster to broadcast atomically, and close to be goroutine safe with Write() --- reader.go | 4 ---- stream.go | 2 ++ sync.go | 43 ++++++++++++++----------------------------- 3 files changed, 16 insertions(+), 33 deletions(-) diff --git a/reader.go b/reader.go index 22bd3b2..8321270 100644 --- a/reader.go +++ b/reader.go @@ -31,9 +31,7 @@ func (r *Reader) ReadAt(p []byte, off int64) (n int, err error) { case n != 0 && err == nil: return n, err case err == io.EOF: - r.s.b.RUnlock() r.s.b.Wait() - r.s.b.RLock() case err != nil: return n, err } @@ -64,9 +62,7 @@ func (r *Reader) Read(p []byte) (n int, err error) { case n != 0 && err == nil: return n, err case err == io.EOF: - r.s.b.RUnlock() r.s.b.Wait() - r.s.b.RLock() case err != nil: return n, err } diff --git a/stream.go b/stream.go index 7ebd213..a0b3e1a 100644 --- a/stream.go +++ b/stream.go @@ -52,6 +52,8 @@ func (s *Stream) Write(p []byte) (int, error) { func (s *Stream) Close() error { defer s.dec() defer s.b.Close() + s.b.Lock() + defer s.b.Unlock() return s.file.Close() } diff --git a/sync.go b/sync.go index 6d507a6..26096ed 100644 --- a/sync.go +++ b/sync.go @@ -1,49 +1,34 @@ package stream -import "sync" - -type closer chan struct{} - -func (a closer) Close() error { - close(a) - return nil -} - -func (a closer) IsOpen() bool { - select { - case <-a: - return false - default: - return true - } -} - -type noLock struct{} - -func (l noLock) Lock() {} -func (l noLock) Unlock() {} +import ( + "sync" + "sync/atomic" +) type broadcaster struct { sync.RWMutex - closer + closed uint32 *sync.Cond } func newBroadcaster() *broadcaster { - return &broadcaster{ - Cond: sync.NewCond(noLock{}), - closer: make(closer), - } + var b broadcaster + b.Cond = sync.NewCond(b.RWMutex.RLocker()) + return &b } func (b *broadcaster) Wait() { - if b.closer.IsOpen() { + if b.IsOpen() { b.Cond.Wait() } } +func (b *broadcaster) IsOpen() bool { + return atomic.LoadUint32(&b.closed) == 0 +} + func (b *broadcaster) Close() error { - b.closer.Close() + atomic.StoreUint32(&b.closed, 1) b.Cond.Broadcast() return nil }