Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests(internal/session): add more unit tests with smaller functions #359

Merged
merged 2 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 90 additions & 61 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@

enableAutoUpdate bool
autoUpdateExitCode int

lastPackageTimestampMu sync.RWMutex
lastPackageTimestamp time.Time
}

type closeOnce struct {
Expand Down Expand Up @@ -169,6 +172,34 @@
}
}

func createHTTPClient() *http.Client {
return &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
FallbackDelay: 300 * time.Millisecond,
}).DialContext,
MaxIdleConns: 10,
IdleConnTimeout: 30 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 5 * time.Second,
DisableKeepAlives: true,
},
}
}

func createSessionRequest(ctx context.Context, endpoint, machineID, sessionType string, body io.Reader) (*http.Request, error) {
req, err := http.NewRequestWithContext(ctx, "POST", endpoint, body)
if err != nil {
return nil, err
}
req.Header.Set("machine_id", machineID)
req.Header.Set("session_type", sessionType)
return req, nil
}

func (s *Session) startWriter(ctx context.Context, writerExit chan any) {
pipeFinishCh := make(chan any)
goroutineCloseCh := make(chan any)
Expand All @@ -181,29 +212,13 @@
reader, writer := io.Pipe()
go s.handleWriterPipe(writer, goroutineCloseCh, pipeFinishCh)

req, err := http.NewRequestWithContext(ctx, "POST", s.endpoint, reader)
req, err := createSessionRequest(ctx, s.endpoint, s.machineID, "write", reader)
if err != nil {
log.Logger.Debugf("session writer: error creating request: %v", err)
return
}
req.Header.Set("machine_id", s.machineID)
req.Header.Set("session_type", "write")

client := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
FallbackDelay: 300 * time.Millisecond,
}).DialContext,
MaxIdleConns: 10,
IdleConnTimeout: 30 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 5 * time.Second,
DisableKeepAlives: true,
},
}
client := createHTTPClient()
resp, err := client.Do(req)
if err != nil {
log.Logger.Debugf("session writer: error making request: %v", err)
Expand All @@ -221,28 +236,33 @@
case <-s.closer.Done():
log.Logger.Debug("session writer: session closed, closing pipe handler")
return

case <-closec:
log.Logger.Debug("session writer: request finished, closing pipe handler")
return

case body := <-s.writer:
bytes, err := json.Marshal(body)
if err != nil {
log.Logger.Errorf("session writer: failed to marshal body: %v", err)
continue
}
if _, err := writer.Write(bytes); err != nil {
log.Logger.Errorf("session writer: failed to write to pipe: %v", err)
if err := s.writeBodyToPipe(writer, body); err != nil {
if errors.Is(err, io.ErrClosedPipe) {
return
}
}
log.Logger.Debug("session writer: body written to pipe")
}
}
}

func (s *Session) writeBodyToPipe(writer *io.PipeWriter, body Body) error {
bytes, err := json.Marshal(body)
if err != nil {
log.Logger.Errorf("session writer: failed to marshal body: %v", err)
return err
}

Check warning on line 257 in internal/session/session.go

View check run for this annotation

Codecov / codecov/patch

internal/session/session.go#L255-L257

Added lines #L255 - L257 were not covered by tests
if _, err := writer.Write(bytes); err != nil {
log.Logger.Errorf("session writer: failed to write to pipe: %v", err)
return err
}

Check warning on line 261 in internal/session/session.go

View check run for this annotation

Codecov / codecov/patch

internal/session/session.go#L259-L261

Added lines #L259 - L261 were not covered by tests
log.Logger.Debug("session writer: body written to pipe")
return nil
}

func (s *Session) startReader(ctx context.Context, readerExit chan any) {
goroutineCloseCh := make(chan any)
pipeFinishCh := make(chan any)
Expand All @@ -252,30 +272,15 @@
<-pipeFinishCh
close(readerExit)
}()
req, err := http.NewRequestWithContext(ctx, "POST", s.endpoint, nil)

req, err := createSessionRequest(ctx, s.endpoint, s.machineID, "read", nil)
if err != nil {
log.Logger.Debugf("session reader: error creating request: %v", err)
close(pipeFinishCh)
return
}
req.Header.Set("machine_id", s.machineID)
req.Header.Set("session_type", "read")

client := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
FallbackDelay: 300 * time.Millisecond,
}).DialContext,
MaxIdleConns: 10,
IdleConnTimeout: 30 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 5 * time.Second,
DisableKeepAlives: true,
},
}
client := createHTTPClient()
resp, err := client.Do(req)
if err != nil {
log.Logger.Debugf("session reader: error making request: %v, retrying", err)
Expand All @@ -288,31 +293,55 @@
return
}

lastPackageTimestamp := &time.Time{}
*lastPackageTimestamp = time.Now()
s.processReaderResponse(resp, goroutineCloseCh, pipeFinishCh)
}

func (s *Session) setLastPackageTimestamp(t time.Time) {
s.lastPackageTimestampMu.Lock()
defer s.lastPackageTimestampMu.Unlock()
s.lastPackageTimestamp = t
}

func (s *Session) getLastPackageTimestamp() time.Time {
s.lastPackageTimestampMu.RLock()
defer s.lastPackageTimestampMu.RUnlock()
return s.lastPackageTimestamp
}

func (s *Session) processReaderResponse(resp *http.Response, goroutineCloseCh, pipeFinishCh chan any) {
s.setLastPackageTimestamp(time.Now())

decoder := json.NewDecoder(resp.Body)
go s.handleReaderPipe(resp.Body, lastPackageTimestamp, goroutineCloseCh, pipeFinishCh)
go s.handleReaderPipe(resp.Body, goroutineCloseCh, pipeFinishCh)

for {
var content Body
err = decoder.Decode(&content)
if err != nil {
if err := decoder.Decode(&content); err != nil {
log.Logger.Errorf("session reader: error decoding response: %v", err)
break
}
select {
case <-s.closer.Done():
log.Logger.Debug("session reader: session closed, dropping message")
if !s.tryWriteToReader(content) {
return
case s.reader <- content:
*lastPackageTimestamp = time.Now()
log.Logger.Debug("session reader: request received and written to pipe")
default:
log.Logger.Errorw("session reader: reader channel full, dropping message")
}
}
}

func (s *Session) handleReaderPipe(respBody io.ReadCloser, lastPackageTimestamp *time.Time, closec, finish chan any) {
func (s *Session) tryWriteToReader(content Body) bool {
select {
case <-s.closer.Done():
log.Logger.Debug("session reader: session closed, dropping message")
return false
case s.reader <- content:
s.setLastPackageTimestamp(time.Now())
log.Logger.Debug("session reader: request received and written to pipe")
return true
default:
log.Logger.Errorw("session reader: reader channel full, dropping message")
return true

Check warning on line 340 in internal/session/session.go

View check run for this annotation

Codecov / codecov/patch

internal/session/session.go#L338-L340

Added lines #L338 - L340 were not covered by tests
}
}

func (s *Session) handleReaderPipe(respBody io.ReadCloser, closec, finish chan any) {
defer close(finish)
log.Logger.Debug("session reader: pipe handler started")
threshold := 2 * time.Minute
Expand All @@ -330,7 +359,7 @@
log.Logger.Debug("session reader: request finished, closing read pipe handler")
return
case <-ticker.C:
if time.Since(*lastPackageTimestamp) > threshold {
if time.Since(s.getLastPackageTimestamp()) > threshold {

Check warning on line 362 in internal/session/session.go

View check run for this annotation

Codecov / codecov/patch

internal/session/session.go#L362

Added line #L362 was not covered by tests
log.Logger.Debugf("session reader: exceed read wait timeout, closing read pipe handler")
return
}
Expand Down
Loading
Loading