Skip to content

Commit

Permalink
add Reader.Seek
Browse files Browse the repository at this point in the history
  • Loading branch information
djherbis committed Feb 22, 2020
1 parent 375fb58 commit 668a316
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 11 deletions.
52 changes: 43 additions & 9 deletions reader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stream

import (
"errors"
"io"
"sync"
)
Expand All @@ -22,29 +23,24 @@ func (r *Reader) Name() string { return r.file.Name() }
// ReadAt blocks while waiting for the requested section of the Stream to be written,
// unless the Stream is closed in which case it will always return immediately.
func (r *Reader) ReadAt(p []byte, off int64) (n int, err error) {
return r.read(p, func(p []byte) (int, error) {
return r.file.ReadAt(p, off)
}, &off)
return r.read(p, &off)
}

// Read reads from the Stream. If the end of an open Stream is reached, Read
// blocks until more data is written or the Stream is Closed.
func (r *Reader) Read(p []byte) (n int, err error) {
r.readMu.Lock()
defer r.readMu.Unlock()

return r.read(p, r.file.Read, &r.readOff)
return r.read(p, &r.readOff)
}

type readerFunc func([]byte) (int, error)

func (r *Reader) read(p []byte, readFunc readerFunc, off *int64) (n int, err error) {
func (r *Reader) read(p []byte, off *int64) (n int, err error) {
for {
var m int
m, err = r.s.b.UseHandle(func() (int, error) {
r.fileMu.RLock()
defer r.fileMu.RUnlock()
return readFunc(p[n:])
return r.file.ReadAt(p[n:], *off)
})
n += m
*off += int64(m)
Expand Down Expand Up @@ -90,3 +86,41 @@ func (r *Reader) Close() error {
func (r *Reader) Size() (int64, bool) {
return r.s.b.Size()
}

// Seek changes the offset of the next Read in the stream.
// Seeking to Start/Current does not block for the stream to reach that position,
// so it cannot guarantee that position exists.
// Seeking to End will block until the stream is closed and then seek to that position.
// Seek is safe to call concurrently with all other methods, though calling it
// concurrently with Read will lead to an undefined order of the calls
// (ex. may Seek then Read or Read than Seek, changing which bytes are Read).
func (r *Reader) Seek(offset int64, whence int) (int64, error) {
r.readMu.Lock()
defer r.readMu.Unlock()

switch whence {
default:
return 0, errWhence
case io.SeekStart:
case io.SeekCurrent:
offset += r.readOff
case io.SeekEnd:
if err := r.s.b.Wait(r, maxInt64); err != nil && err != io.EOF {
return 0, err
}
size, _ := r.s.b.Size() // we most be closed to reach here due to ^
offset += size
}
if offset < 0 {
return 0, errOffset
}
r.readOff = offset
return r.readOff, nil
}

var (
errWhence = errors.New("Seek: invalid whence")
errOffset = errors.New("Seek: invalid offset")
)

const maxInt64 = 1<<63 - 1
109 changes: 109 additions & 0 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,24 @@ func TestMemFs(t *testing.T) {
t.Error(err)
t.FailNow()
}


want := "hello"
w, _ := fs.Create("file")
io.WriteString(w, want)

r, _ := fs.Open("file")
data, _ := ioutil.ReadAll(r)
got := string(data)

if want != got {
t.Errorf("Want/got: %v/%v", want, got)
}

r.Close()
if _, err := ioutil.ReadAll(r); err != os.ErrClosed {
t.Errorf("wanted ErrClosed got %v", err)
}
}

func TestSingletonFs(t *testing.T) {
Expand Down Expand Up @@ -421,11 +439,13 @@ func testReadAtWait(t *testing.T, fs FileSystem) {
t.Error(err)
t.FailNow()
}
defer f.Remove()
r, err := f.NextReader()
if err != nil {
t.Error(err)
t.FailNow()
}
defer r.Close()
io.WriteString(f, "hello")
go func() {
<-time.After(50 * time.Millisecond)
Expand Down Expand Up @@ -605,3 +625,92 @@ func cleanup(f *Stream, t *testing.T) {
t.Error("error while removing file: ", err)
}
}

func TestSeeker(t *testing.T) {
for _, fs := range GetFilesystems() {
testSeeker(t, fs)
}
}

func testSeeker(t *testing.T, fs FileSystem) {
f, err := NewStream(t.Name()+".txt", fs)
if err != nil {
t.Error(err)
t.FailNow()
}
defer f.Remove()
output := "012345"
go func() {
io.WriteString(f, output)
<-time.After(5 * time.Millisecond)
f.Close()
}()
r, _ := f.NextReader()

n := int64(len(output))
for i := int64(-1); i >= -n; i-- {
off, err := r.Seek(i, io.SeekEnd)
if err != nil {
t.Fatal(err)
}
if got, want := off, n+i; want != got {
t.Errorf("Want/got wrong offset: %v, %v", want, got)
}
data, err := ioutil.ReadAll(io.LimitReader(r, 1))
if err != nil {
t.Fatal(err)
}
if got, want := string(data), output[n+i:n+i+1]; want != got {
t.Errorf("Want/got wrong: %v, %v", want, got)
}
}

for i := int64(0); i < n; i++ {
off, err := r.Seek(i, io.SeekStart)
if err != nil {
t.Fatal(err)
}
if got, want := off, i; want != got {
t.Errorf("Want/got wrong offset: %v, %v", want, got)
}
data, err := ioutil.ReadAll(io.LimitReader(r, 1))
if err != nil {
t.Fatal(err)
}
if got, want := string(data), output[i:i+1]; want != got {
t.Errorf("Want/got wrong: %v, %v", want, got)
}
}

r.Seek(0, io.SeekStart)

for i := int64(1); i < n; i += 2 {
off, err := r.Seek(1, io.SeekCurrent)
if err != nil {
t.Fatal(err)
}
if got, want := off, i; want != got {
t.Errorf("Want/got wrong offset: %v, %v", want, got)
}
data, err := ioutil.ReadAll(io.LimitReader(r, 1))
if err != nil {
t.Fatal(err)
}
if got, want := string(data), output[i:i+1]; want != got {
t.Errorf("Want/got wrong: %v, %v", want, got)
}
}

if _, err := r.Seek(0, 100); err != errWhence {
t.Errorf("Expected errWhence")
}

if _, err := r.Seek(-1, io.SeekStart); err != errOffset {
t.Errorf("Expected errOffset")
}

f.Cancel()
if _, err := r.Seek(0, io.SeekEnd); err != ErrCanceled {
t.Fatal(err)
}
}
3 changes: 1 addition & 2 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ func (b *broadcaster) Wait(r *Reader, off int64) error {
return ErrCanceled

case closedState:
remaining := b.size - off
if remaining == 0 {
if off >= b.size {
return io.EOF
}
}
Expand Down

0 comments on commit 668a316

Please sign in to comment.