Skip to content

Commit

Permalink
fix lastPkgTimestamp race
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Feb 11, 2025
1 parent 3536965 commit 278c55d
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 32 deletions.
30 changes: 23 additions & 7 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type Session struct {

enableAutoUpdate bool
autoUpdateExitCode int

lastPackageTimestampMu sync.RWMutex
lastPackageTimestamp time.Time
}

type closeOnce struct {
Expand Down Expand Up @@ -293,30 +296,43 @@ func (s *Session) startReader(ctx context.Context, readerExit chan any) {
s.processReaderResponse(resp, goroutineCloseCh, pipeFinishCh)
}

func (s *Session) setLastPackageTimestamp(t time.Time) {
s.lastPackageTimestampMu.Lock()
defer s.lastPackageTimestampMu.Unlock()
s.lastPackageTimestamp = t
}

func (s *Session) getLastPackageTimestamp() time.Time {
s.lastPackageTimestampMu.RLock()
defer s.lastPackageTimestampMu.RUnlock()
return s.lastPackageTimestamp
}

func (s *Session) processReaderResponse(resp *http.Response, goroutineCloseCh, pipeFinishCh chan any) {
lastPackageTimestamp := time.Now()
s.setLastPackageTimestamp(time.Now())

decoder := json.NewDecoder(resp.Body)
go s.handleReaderPipe(resp.Body, &lastPackageTimestamp, goroutineCloseCh, pipeFinishCh)
go s.handleReaderPipe(resp.Body, goroutineCloseCh, pipeFinishCh)

for {
var content Body
if err := decoder.Decode(&content); err != nil {
log.Logger.Errorf("session reader: error decoding response: %v", err)
break
}
if !s.tryWriteToReader(content, &lastPackageTimestamp) {
if !s.tryWriteToReader(content) {
return
}
}
}

func (s *Session) tryWriteToReader(content Body, lastPackageTimestamp *time.Time) bool {
func (s *Session) tryWriteToReader(content Body) bool {
select {
case <-s.closer.Done():
log.Logger.Debug("session reader: session closed, dropping message")
return false
case s.reader <- content:
*lastPackageTimestamp = time.Now()
s.setLastPackageTimestamp(time.Now())
log.Logger.Debug("session reader: request received and written to pipe")
return true
default:
Expand All @@ -325,7 +341,7 @@ func (s *Session) tryWriteToReader(content Body, lastPackageTimestamp *time.Time
}
}

func (s *Session) handleReaderPipe(respBody io.ReadCloser, lastPackageTimestamp *time.Time, closec, finish chan any) {
func (s *Session) handleReaderPipe(respBody io.ReadCloser, closec, finish chan any) {
defer close(finish)
log.Logger.Debug("session reader: pipe handler started")
threshold := 2 * time.Minute
Expand All @@ -343,7 +359,7 @@ func (s *Session) handleReaderPipe(respBody io.ReadCloser, lastPackageTimestamp
log.Logger.Debug("session reader: request finished, closing read pipe handler")
return
case <-ticker.C:
if time.Since(*lastPackageTimestamp) > threshold {
if time.Since(s.getLastPackageTimestamp()) > threshold {

Check warning on line 362 in internal/session/session.go

View check run for this annotation

Codecov / codecov/patch

internal/session/session.go#L362

Added line #L362 was not covered by tests
log.Logger.Debugf("session reader: exceed read wait timeout, closing read pipe handler")
return
}
Expand Down
104 changes: 79 additions & 25 deletions internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import (
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestApplyOpts(t *testing.T) {
Expand Down Expand Up @@ -388,37 +391,35 @@ func TestWriteBodyToPipe(t *testing.T) {

func TestTryWriteToReader(t *testing.T) {
tests := []struct {
name string
setupCloser bool
readerBufferSize int
content Body
preloadMessages int
expectedResult bool
expectTimestampSet bool
name string
setupCloser bool
readerBufferSize int
content Body
preloadMessages int
expectedResult bool
}{
{
name: "successful write",
readerBufferSize: 1,
content: Body{
ReqID: "test",
},
expectedResult: true,
expectTimestampSet: true,
expectedResult: true,
},
{
name: "full channel",
readerBufferSize: 1,
preloadMessages: 1,
name: "closed session",
setupCloser: true,
content: Body{
ReqID: "test",
},
expectedResult: true,
expectTimestampSet: false,
expectedResult: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert := assert.New(t)

s := &Session{
reader: make(chan Body, tt.readerBufferSize),
closer: &closeOnce{closer: make(chan any)},
Expand All @@ -433,15 +434,14 @@ func TestTryWriteToReader(t *testing.T) {
s.reader <- Body{ReqID: "preload"}
}

timestamp := time.Now().Add(-time.Hour) // Old timestamp
result := s.tryWriteToReader(tt.content, &timestamp)
beforeWrite := time.Now()
result := s.tryWriteToReader(tt.content)

if result != tt.expectedResult {
t.Errorf("tryWriteToReader() = %v, want %v", result, tt.expectedResult)
}
assert.Equal(tt.expectedResult, result)

if tt.expectTimestampSet && timestamp.Before(time.Now().Add(-time.Minute)) {
t.Error("Expected timestamp to be updated")
if result && !tt.setupCloser {
timestamp := s.getLastPackageTimestamp()
assert.False(timestamp.Before(beforeWrite), "Expected timestamp to be updated after successful write")
}
})
}
Expand All @@ -468,6 +468,8 @@ func TestHandleReaderPipe(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert := assert.New(t)

s := &Session{
closer: &closeOnce{closer: make(chan any)},
}
Expand All @@ -479,13 +481,15 @@ func TestHandleReaderPipe(t *testing.T) {
reader, writer := io.Pipe()
closec := make(chan any)
finish := make(chan any)
lastPackageTimestamp := time.Now()

go s.handleReaderPipe(reader, &lastPackageTimestamp, closec, finish)
// Set initial timestamp
s.setLastPackageTimestamp(time.Now())

go s.handleReaderPipe(reader, closec, finish)

select {
case <-finish:
t.Error("Pipe handler exited too early")
assert.Fail("Pipe handler exited too early")
case <-time.After(50 * time.Millisecond):
// Expected - handler should still be running
}
Expand All @@ -495,7 +499,7 @@ func TestHandleReaderPipe(t *testing.T) {
case <-finish:
// Expected - handler should exit when session is closed
case <-time.After(tt.expectedExitAfter):
t.Error("Pipe handler didn't exit after session close")
assert.Fail("Pipe handler didn't exit after session close")
}
}

Expand Down Expand Up @@ -559,3 +563,53 @@ func TestSessionKeepAlive(t *testing.T) {
t.Error("Writer channel should be closed")
}
}

func TestLastPackageTimestamp(t *testing.T) {
assert := assert.New(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s := &Session{
ctx: ctx,
cancel: cancel,
closer: &closeOnce{closer: make(chan any)},
}

// Test initial state
initialTime := s.getLastPackageTimestamp()
assert.True(initialTime.IsZero(), "Expected initial timestamp to be zero")

// Test setting and getting timestamp
now := time.Now()
s.setLastPackageTimestamp(now)
gotTime := s.getLastPackageTimestamp()
assert.True(gotTime.Equal(now), "Expected timestamp to match set time")

// Test concurrent access
const numGoroutines = 100
var wg sync.WaitGroup
wg.Add(numGoroutines * 2) // For both readers and writers

// Launch multiple goroutines to test concurrent reads
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
_ = s.getLastPackageTimestamp()
}()
}

// Launch multiple goroutines to test concurrent writes
for i := 0; i < numGoroutines; i++ {
go func(i int) {
defer wg.Done()
s.setLastPackageTimestamp(time.Now().Add(time.Duration(i) * time.Millisecond))
}(i)
}

wg.Wait()

// Verify timestamp was updated during concurrent operations
finalTime := s.getLastPackageTimestamp()
assert.False(finalTime.Equal(now), "Expected timestamp to be updated during concurrent operations")
}

0 comments on commit 278c55d

Please sign in to comment.