Skip to content

Commit

Permalink
fixing broadcaster to broadcast atomically, and close to be goroutine…
Browse files Browse the repository at this point in the history
… safe with Write()
  • Loading branch information
djherbis committed Feb 4, 2016
1 parent 5cba822 commit a783ace
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 33 deletions.
4 changes: 0 additions & 4 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ func (r *Reader) ReadAt(p []byte, off int64) (n int, err error) {
case n != 0 && err == nil:
return n, err
case err == io.EOF:
r.s.b.RUnlock()
r.s.b.Wait()
r.s.b.RLock()
case err != nil:
return n, err
}
Expand Down Expand Up @@ -64,9 +62,7 @@ func (r *Reader) Read(p []byte) (n int, err error) {
case n != 0 && err == nil:
return n, err
case err == io.EOF:
r.s.b.RUnlock()
r.s.b.Wait()
r.s.b.RLock()
case err != nil:
return n, err
}
Expand Down
2 changes: 2 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func (s *Stream) Write(p []byte) (int, error) {
func (s *Stream) Close() error {
defer s.dec()
defer s.b.Close()
s.b.Lock()
defer s.b.Unlock()
return s.file.Close()
}

Expand Down
43 changes: 14 additions & 29 deletions sync.go
Original file line number Diff line number Diff line change
@@ -1,49 +1,34 @@
package stream

import "sync"

type closer chan struct{}

func (a closer) Close() error {
close(a)
return nil
}

func (a closer) IsOpen() bool {
select {
case <-a:
return false
default:
return true
}
}

type noLock struct{}

func (l noLock) Lock() {}
func (l noLock) Unlock() {}
import (
"sync"
"sync/atomic"
)

type broadcaster struct {
sync.RWMutex
closer
closed uint32
*sync.Cond
}

func newBroadcaster() *broadcaster {
return &broadcaster{
Cond: sync.NewCond(noLock{}),
closer: make(closer),
}
var b broadcaster
b.Cond = sync.NewCond(b.RWMutex.RLocker())
return &b
}

func (b *broadcaster) Wait() {
if b.closer.IsOpen() {
if b.IsOpen() {
b.Cond.Wait()
}
}

func (b *broadcaster) IsOpen() bool {
return atomic.LoadUint32(&b.closed) == 0
}

func (b *broadcaster) Close() error {
b.closer.Close()
atomic.StoreUint32(&b.closed, 1)
b.Cond.Broadcast()
return nil
}

0 comments on commit a783ace

Please sign in to comment.