Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Return to read immediately after a successful read. #894

Merged
merged 1 commit into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion internal/tailer/logstream/dgramstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
ds.lastReadTime = time.Now()
ds.mu.Unlock()
ds.staleTimer = time.AfterFunc(time.Hour*24, ds.cancel)

// No error implies more to read, so restart the loop.
if err == nil && ctx.Err() == nil {
continue
}
}

if err != nil && IsEndOrCancel(err) {
Expand All @@ -125,12 +130,13 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
glog.V(2).Infof("stream(%s:%s): waiting", ds.scheme, ds.address)
select {
case <-ctx.Done():
// Exit after next read attempt.
// We may have started waiting here when the stop signal
// arrives, but since that wait the file may have been
// written to. The file is not technically yet at EOF so
// we need to go back and try one more read. We'll exit
// the stream in the zero byte handler above.
glog.V(2).Infof("stream(%s:%s): Stopping after next zero byte read", ds.scheme, ds.address)
glog.V(2).Infof("stream(%s): context cancelled, exiting after next zero byte read", ds.scheme, ds.address)
case <-waker.Wake():
// sleep until next Wake()
glog.V(2).Infof("stream(%s:%s): Wake received", ds.scheme, ds.address)
Expand Down
9 changes: 4 additions & 5 deletions internal/tailer/logstream/dgramstream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// Stream is not shut down with cancel in this test
defer cancel()
waker, awaken := waker.NewTest(ctx, 1, "stream")
waker := waker.NewTestAlways()

sockName := scheme + "://" + addr
ds, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotEnabled)
Expand All @@ -59,8 +59,6 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
_, err = s.Write([]byte("1\n"))
testutil.FatalIfErr(t, err)

awaken(0, 0) // sync past read

// "Close" the socket by sending zero bytes, which in oneshot mode tells the stream to act as if we're done.
_, err = s.Write([]byte{})
testutil.FatalIfErr(t, err)
Expand Down Expand Up @@ -94,7 +92,7 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {
}

ctx, cancel := context.WithCancel(context.Background())
waker, awaken := waker.NewTest(ctx, 1, "stream")
waker := waker.NewTestAlways()

sockName := scheme + "://" + addr
ds, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotDisabled)
Expand All @@ -111,7 +109,8 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {
_, err = s.Write([]byte("1\n"))
testutil.FatalIfErr(t, err)

awaken(0, 0) // Synchronise past read.
// Yield to give the stream a chance to read.
time.Sleep(10 * time.Millisecond)

cancel() // This cancellation should cause the stream to shut down.
wg.Wait()
Expand Down
14 changes: 7 additions & 7 deletions internal/tailer/logstream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
fs.lastReadTime = time.Now()
fs.mu.Unlock()
fs.staleTimer = time.AfterFunc(time.Hour*24, fs.cancel)

// No error implies there is more to read so restart the loop.
if err == nil && ctx.Err() == nil {
continue
}
}

if err != nil && err != io.EOF {
Expand Down Expand Up @@ -206,12 +211,6 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
}
}

// No error implies there is more to read in this file so go
// straight back to read unless it looks like context is Done.
if err == nil && ctx.Err() == nil {
continue
}

Sleep:
// If we get here it's because we've stalled. First test to see if it's
// time to exit.
Expand Down Expand Up @@ -243,6 +242,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
glog.V(2).Infof("stream(%s): waiting", fs.pathname)
select {
case <-ctx.Done():
// Exit after next read attempt.
// We may have started waiting here when the cancellation
// arrives, but since that wait the file may have been
// written to. The file is not technically yet at EOF so
Expand All @@ -251,7 +251,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
// could argue exiting immediately is less surprising.
// Assumption is that this doesn't make a difference in
// production.
glog.V(2).Infof("stream(%s): Cancelled after next read", fs.pathname)
glog.V(2).Infof("stream(%s): context cancelled, exiting after next read timeout", fs.pathname)
case <-waker.Wake():
// sleep until next Wake()
glog.V(2).Infof("stream(%s): Wake received", fs.pathname)
Expand Down
14 changes: 8 additions & 6 deletions internal/tailer/logstream/pipestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,8 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
}
logCloses.Add(ps.pathname, 1)
close(ps.lines)
ps.cancel()
}()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
SetReadDeadlineOnDone(ctx, fd)

for {
Expand All @@ -105,6 +104,11 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
ps.lastReadTime = time.Now()
ps.mu.Unlock()
ps.staleTimer = time.AfterFunc(time.Hour*24, ps.cancel)

// No error implies there is more to read so restart the loop.
if err == nil && ctx.Err() == nil {
continue
}
}

// Test to see if we should exit.
Expand All @@ -120,10 +124,8 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
glog.V(2).Infof("stream(%s): waiting", ps.pathname)
select {
case <-ctx.Done():
// Exit immediately; cancelled context is going to cause the
// next read to be interrupted and exit, so don't bother going
// around the loop again.
return
// Exit after next read attempt.
glog.V(2).Infof("stream(%s): context cancelled, exiting after next read timeout", ps.pathname)
case <-waker.Wake():
// sleep until next Wake()
glog.V(2).Infof("stream(%s): Wake received", ps.pathname)
Expand Down
10 changes: 4 additions & 6 deletions internal/tailer/logstream/pipestream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) {
testutil.FatalIfErr(t, unix.Mkfifo(name, 0o666))

ctx, cancel := context.WithCancel(context.Background())
waker, awaken := waker.NewTest(ctx, 1, "stream")
waker := waker.NewTestAlways()

f, err := os.OpenFile(name, os.O_RDWR, os.ModeNamedPipe)
testutil.FatalIfErr(t, err)
Expand All @@ -87,9 +87,6 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) {

testutil.WriteString(t, f, "1\n")

// Avoid a race with cancellation if we can synchronise with waker.Wake()
awaken(0, 0)

cancel() // Cancellation here should cause the stream to shut down.
wg.Wait()

Expand Down Expand Up @@ -155,7 +152,7 @@ func TestPipeStreamReadStdin(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// The stream is not shut down by cancel in this test.
defer cancel()
waker, awaken := waker.NewTest(ctx, 1, "stream")
waker := waker.NewTestAlways()

ps, err := logstream.New(ctx, &wg, waker, "-", logstream.OneShotDisabled)
testutil.FatalIfErr(t, err)
Expand All @@ -165,7 +162,8 @@ func TestPipeStreamReadStdin(t *testing.T) {
}
checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ps.Lines())

awaken(0, 0)
// Give the stream a chance to wake and read
time.Sleep(10 * time.Millisecond)

testutil.FatalIfErr(t, f.Close())

Expand Down
7 changes: 6 additions & 1 deletion internal/tailer/logstream/socketstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake
ss.lastReadTime = time.Now()
ss.mu.Unlock()
ss.staleTimer = time.AfterFunc(time.Hour*24, ss.cancel)

// No error implies more to read, so restart the loop.
if err == nil && ctx.Err() == nil {
continue
}
}

if err != nil && IsEndOrCancel(err) {
Expand All @@ -150,7 +155,7 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake
glog.V(2).Infof("stream(%s:%s): waiting", ss.scheme, ss.address)
select {
case <-ctx.Done():
// Cancelled context will cause the next read to be interrupted and exit.
// Exit after next read attempt.
glog.V(2).Infof("stream(%s:%s): context cancelled, exiting after next read timeout", ss.scheme, ss.address)
case <-waker.Wake():
// sleep until next Wake()
Expand Down
9 changes: 4 additions & 5 deletions internal/tailer/logstream/socketstream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// The stream is not shut down with cancel in this test.
defer cancel()
waker, awaken := waker.NewTest(ctx, 1, "stream")
waker := waker.NewTestAlways()

sockName := scheme + "://" + addr
ss, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotEnabled)
Expand All @@ -57,8 +57,6 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) {
_, err = s.Write([]byte("1\n"))
testutil.FatalIfErr(t, err)

awaken(0, 0) // Sync past read

// Close the socket to signal to the socketStream to shut down.
testutil.FatalIfErr(t, s.Close())

Expand Down Expand Up @@ -91,7 +89,7 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) {
}

ctx, cancel := context.WithCancel(context.Background())
waker, awaken := waker.NewTest(ctx, 1, "stream")
waker := waker.NewTestAlways()

sockName := scheme + "://" + addr
ss, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotDisabled)
Expand All @@ -108,7 +106,8 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) {
_, err = s.Write([]byte("1\n"))
testutil.FatalIfErr(t, err)

awaken(0, 0) // Sync past read to ensure we read
// Yield to give the stream a chance to read.
time.Sleep(10 * time.Millisecond)

cancel() // This cancellation should cause the stream to shut down immediately.
wg.Wait()
Expand Down
Loading