Skip to content

Commit

Permalink
feat: More http client configuration options (#192)
Browse files Browse the repository at this point in the history

Some other changes:
- refactor agentVersion to set it as global var in config.go
- print requestID header on HTTP failures
- use proper User-Agent header
  • Loading branch information
KarolisL authored Oct 16, 2024
1 parent fddae45 commit d8236ba
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 20 deletions.
4 changes: 2 additions & 2 deletions cmd/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func run(ctx context.Context) error {
remoteLogger := logrus.New()
remoteLogger.SetLevel(logrus.Level(cfg.Log.Level))
remoteLogger.SetFormatter(&textFormatter)
log := remoteLogger.WithField("version", ctx.Value("agentVersion").(*config.AgentVersion).Version)
log := remoteLogger.WithField("version", config.VersionInfo.Version)
if podName := os.Getenv("SELF_POD_NAME"); podName != "" {
log = log.WithField("component_pod_name", podName)
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func runAgentMode(ctx context.Context, castaiclient castai.Client, log *logrus.E
ctx, ctxCancel := context.WithCancel(ctx)
defer ctxCancel()

agentVersion := ctx.Value("agentVersion").(*config.AgentVersion)
agentVersion := config.VersionInfo

// buffer will allow for all senders to push, even though we will only read first error and cancel context after it;
// all errors from exitCh are logged
Expand Down
2 changes: 1 addition & 1 deletion cmd/dump/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func run(ctx context.Context) error {

logger := logrus.New()
logger.SetLevel(logrus.Level(cfg.Log.Level))
log := logger.WithField("version", ctx.Value("agentVersion").(*config.AgentVersion).Version)
log := logger.WithField("version", config.VersionInfo.Version)

log.Infof("starting dump of cluster snapshot")

Expand Down
2 changes: 1 addition & 1 deletion cmd/monitor/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func run(ctx context.Context) error {

remoteLogger := logrus.New()
remoteLogger.SetLevel(logrus.Level(cfg.Log.Level))
log := remoteLogger.WithField("version", ctx.Value("agentVersion").(*config.AgentVersion).Version)
log := remoteLogger.WithField("version", config.VersionInfo.Version)

localLog := logrus.New()
localLog.SetLevel(logrus.DebugLevel)
Expand Down
33 changes: 26 additions & 7 deletions internal/castai/castai.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ import (

const (
defaultRetryCount = 3
defaultTimeout = 10 * time.Second
sendDeltaReadTimeout = 2 * time.Minute
totalSendDeltaTimeout = 5 * time.Minute
headerAPIKey = "X-Api-Key"
headerContinuityToken = "Continuity-Token"
headerContentType = "Content-Type"
headerContentEncoding = "Content-Encoding"
headerUserAgent = "User-Agent"

respHeaderRequestID = "X-Castai-Request-Id"
)

var (
Expand Down Expand Up @@ -73,13 +73,18 @@ func NewDefaultRestyClient() (*resty.Client, error) {
}

restyClient := resty.NewWithClient(&http.Client{
Timeout: defaultTimeout,
Timeout: cfg.Timeout,
Transport: clientTransport,
})

restyClient.SetBaseURL(cfg.URL)
restyClient.SetRetryCount(defaultRetryCount)
restyClient.Header.Set(headerAPIKey, cfg.Key)
restyClient.Header.Set(headerUserAgent, fmt.Sprintf("castai-agent/%s", config.VersionInfo.Version))
if host := cfg.HostHeaderOverride; host != "" {
restyClient.Header.Set("Host", host)
}
addUA(restyClient.Header)

return restyClient, nil
}
Expand All @@ -93,7 +98,7 @@ func NewDefaultDeltaHTTPClient() (*http.Client, error) {
}

return &http.Client{
Timeout: sendDeltaReadTimeout,
Timeout: config.Get().API.DeltaReadTimeout,
Transport: clientTransport,
}, nil
}
Expand Down Expand Up @@ -194,7 +199,7 @@ func (c *client) SendDelta(ctx context.Context, clusterID string, delta *Delta)
}
}()

ctx, cancel := context.WithTimeout(ctx, totalSendDeltaTimeout)
ctx, cancel := context.WithTimeout(ctx, cfg.TotalSendDeltaTimeout)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri.String(), pipeReader)
Expand All @@ -206,6 +211,11 @@ func (c *client) SendDelta(ctx context.Context, clusterID string, delta *Delta)
req.Header.Set(headerContentEncoding, "gzip")
req.Header.Set(headerAPIKey, cfg.Key)
req.Header.Set(headerContinuityToken, c.continuityToken)
addUA(req.Header)

if host := cfg.HostHeaderOverride; host != "" {
req.Header.Set("Host", host)
}

var resp *http.Response

Expand Down Expand Up @@ -253,7 +263,8 @@ func (c *client) SendDelta(ctx context.Context, clusterID string, delta *Delta)
if strings.Contains(buf.String(), ErrInvalidContinuityToken.Error()) {
return ErrInvalidContinuityToken
}
return fmt.Errorf("delta request error status_code=%d body=%s", resp.StatusCode, buf.String())
reqID := resp.Header.Get(respHeaderRequestID)
return fmt.Errorf("delta request error request_id=%q status_code=%d body=%s", reqID, resp.StatusCode, buf.String())
}
c.continuityToken = resp.Header.Get(headerContinuityToken)
log.Infof("delta upload finished")
Expand Down Expand Up @@ -312,3 +323,11 @@ func (c *client) ExchangeAgentTelemetry(ctx context.Context, clusterID string, r

return body, nil
}

func addUA(header http.Header) {
version := "unknown"
if vi := config.VersionInfo; vi != nil {
version = vi.Version
}
header.Set(headerUserAgent, fmt.Sprintf("castai-agent/%s", version))
}
12 changes: 10 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,12 @@ type Pod struct {
}

type API struct {
Key string `mapstructure:"key"`
URL string `mapstructure:"url"`
Key string `mapstructure:"key"`
URL string `mapstructure:"url"`
HostHeaderOverride string `mapstructure:"host_header_override"`
Timeout time.Duration `mapstructure:"timeout"`
DeltaReadTimeout time.Duration `mapstructure:"delta_read_timeout"`
TotalSendDeltaTimeout time.Duration `mapstructure:"total_send_delta_timeout"`
}

type EKS struct {
Expand Down Expand Up @@ -144,6 +148,10 @@ func Get() Config {
return *cfg
}

viper.SetDefault("api.timeout", 10*time.Second)
viper.SetDefault("api.delta_read_timeout", 2*time.Minute)
viper.SetDefault("api.total_send_delta_timeout", 5*time.Minute)

viper.SetDefault("controller.interval", 15*time.Second)
viper.SetDefault("controller.prep_timeout", 10*time.Minute)
viper.SetDefault("controller.memory_pressure_interval", 3*time.Second)
Expand Down
2 changes: 2 additions & 0 deletions internal/config/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package config

import "fmt"

var VersionInfo *AgentVersion

type AgentVersion struct {
GitCommit, GitRef, Version string
}
Expand Down
13 changes: 10 additions & 3 deletions internal/services/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,12 @@ func CollectSingleSnapshot(ctx context.Context,

defer queue.ShutDown()

agentVersion := ctx.Value("agentVersion").(*config.AgentVersion)
agentVersion := "unknown"
if vi := config.VersionInfo; vi != nil {
agentVersion = vi.Version
}

d := delta.New(log, clusterID, v.Full(), agentVersion.Version)
d := delta.New(log, clusterID, v.Full(), agentVersion)
go func() {
for {
i, _ := queue.Get()
Expand Down Expand Up @@ -649,7 +652,11 @@ func throttleLog(ctx context.Context, log logrus.FieldLogger, objType string, wa
} else {
log.Infof("Informer cache for %v synced after %v", objType, time.Since(waitStartedAt))
}
time.Sleep(window)
select {
case <-time.After(window):
case <-ctx.Done():
return
}
}
}
}()
Expand Down
6 changes: 2 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
_ "net/http/pprof"

"github.com/KimMachineGun/automemlimit/memlimit"
Expand Down Expand Up @@ -30,12 +29,11 @@ var (
)

func main() {

ctx := signals.SetupSignalHandler()
ctx = context.WithValue(ctx, "agentVersion", &config.AgentVersion{
config.VersionInfo = &config.AgentVersion{
GitCommit: GitCommit,
GitRef: GitRef,
Version: Version,
})
}
cmd.Execute(ctx)
}

0 comments on commit d8236ba

Please sign in to comment.