Skip to content

Commit

Permalink
reads/writes to files occur concurrently now, reducing critical secti…
Browse files Browse the repository at this point in the history
…on time
  • Loading branch information
djherbis committed Jul 24, 2016
1 parent 11c8248 commit fc45bf8
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 56 deletions.
63 changes: 28 additions & 35 deletions reader.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package stream

import "io"
import (
"io"
"sync"
)

// Reader is a concurrent-safe Stream Reader.
type Reader struct {
s *Stream
file File
s *Stream
file File
mu sync.Mutex
readOff int64
}

// Name returns the name of the underlying File in the FileSystem.
Expand All @@ -15,28 +20,22 @@ func (r *Reader) Name() string { return r.file.Name() }
// ReadAt blocks while waiting for the requested section of the Stream to be written,
// unless the Stream is closed in which case it will always return immediately.
func (r *Reader) ReadAt(p []byte, off int64) (n int, err error) {
r.s.b.RLock()
defer r.s.b.RUnlock()

var m int

for {

m, err = r.file.ReadAt(p[n:], off+int64(n))
m, err = r.file.ReadAt(p[n:], off)
n += m
off += int64(m)

if r.s.b.IsOpen() {

switch {
case n != 0 && err == nil:
return n, err
case err == io.EOF:
r.s.b.Wait()
case err != nil:
return n, err
switch {
case n != 0 && err == nil:
return n, err
case err == io.EOF:
if v, open := r.s.b.Wait(off); v == 0 && !open {
return n, io.EOF
}

} else {
case err != nil:
return n, err
}

Expand All @@ -46,31 +45,25 @@ func (r *Reader) ReadAt(p []byte, off int64) (n int, err error) {
// Read reads from the Stream. If the end of an open Stream is reached, Read
// blocks until more data is written or the Stream is Closed.
func (r *Reader) Read(p []byte) (n int, err error) {
r.s.b.RLock()
defer r.s.b.RUnlock()

r.mu.Lock()
defer r.mu.Unlock()
var m int

for {

m, err = r.file.Read(p[n:])
n += m

if r.s.b.IsOpen() {

switch {
case n != 0 && err == nil:
return n, err
case err == io.EOF:
r.s.b.Wait()
case err != nil:
return n, err
r.readOff += int64(m)

switch {
case n != 0 && err == nil:
return n, nil
case err == io.EOF:
if v, open := r.s.b.Wait(r.readOff); v == 0 && !open {
return n, io.EOF
}

} else {
case err != nil:
return n, err
}

}
}

Expand Down
21 changes: 12 additions & 9 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ var ErrRemoving = errors.New("cannot open a new reader while removing file")

// Stream is used to concurrently Write and Read from a File.
type Stream struct {
mu sync.Mutex
grp sync.WaitGroup
b *broadcaster
file File
Expand Down Expand Up @@ -41,20 +42,22 @@ func (s *Stream) Name() string { return s.file.Name() }

// Write writes p to the Stream. It's concurrent safe to be called with Stream's other methods.
func (s *Stream) Write(p []byte) (int, error) {
defer s.b.Broadcast()
s.b.Lock()
defer s.b.Unlock()
return s.file.Write(p)
s.mu.Lock()
defer s.mu.Unlock()
n, err := s.file.Write(p)
s.b.Wrote(n)
return n, err
}

// Close will close the active stream. This will cause Readers to return EOF once they have
// read the entire stream.
func (s *Stream) Close() error {
defer s.dec()
defer s.b.Close()
s.b.Lock()
defer s.b.Unlock()
return s.file.Close()
s.mu.Lock()
defer s.mu.Unlock()
err := s.file.Close()
s.b.Close()
s.dec()
return err
}

// Remove will block until the Stream and all its Readers have been Closed,
Expand Down
22 changes: 22 additions & 0 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,28 @@ func TestMem(t *testing.T) {
testFile(f, t)
}

func TestReadAtWait(t *testing.T) {
f, err := NewStream("test.txt", NewMemFS())
if err != nil {
t.Error(err)
t.FailNow()
}
r, err := f.NextReader()
if err != nil {
t.Error(err)
t.FailNow()
}
go func() {
<-time.After(10 * time.Millisecond)
f.Close()
}()
data := make([]byte, 10)
n, err := r.ReadAt(data, 0)
if n != 0 || err != io.EOF {
t.Errorf("Unexpected, should be empty: %d, %s", n, err)
}
}

func TestRemove(t *testing.T) {
f, err := NewStream("test.txt", NewMemFS())
if err != nil {
Expand Down
35 changes: 23 additions & 12 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,44 @@ package stream

import (
"sync"
"sync/atomic"
)

type broadcaster struct {
sync.RWMutex
closed uint32
*sync.Cond
mu sync.RWMutex
cond *sync.Cond
closed bool
size int64
}

func newBroadcaster() *broadcaster {
var b broadcaster
b.Cond = sync.NewCond(b.RWMutex.RLocker())
b.cond = sync.NewCond(b.mu.RLocker())
return &b
}

func (b *broadcaster) Wait() {
if b.IsOpen() {
b.Cond.Wait()
// Wait blocks until we've written past the given offset, or until closed.
func (b *broadcaster) Wait(off int64) (n int64, open bool) {
b.mu.RLock()
defer b.mu.RUnlock()
for !b.closed && off >= b.size {
b.cond.Wait()
}
return b.size - off, !b.closed
}

func (b *broadcaster) IsOpen() bool {
return atomic.LoadUint32(&b.closed) == 0
func (b *broadcaster) Wrote(n int) {
if n > 0 {
b.mu.Lock()
b.size += int64(n)
b.mu.Unlock()
b.cond.Broadcast()
}
}

func (b *broadcaster) Close() error {
atomic.StoreUint32(&b.closed, 1)
b.Cond.Broadcast()
b.mu.Lock()
b.closed = true
b.mu.Unlock()
b.cond.Broadcast()
return nil
}

0 comments on commit fc45bf8

Please sign in to comment.