Skip to content

Commit

Permalink
feat: add healthz endpoint and graceful shutdown (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
saumas authored Jun 8, 2022
1 parent 6081620 commit 535015b
Show file tree
Hide file tree
Showing 8 changed files with 353 additions and 77 deletions.
10 changes: 7 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ type Static struct {
}

type Controller struct {
Interval time.Duration `mapstructure:"interval"`
PrepTimeout time.Duration `mapstructure:"prep_timeout"`
InitialSleepDuration time.Duration `mapstructure:"initial_sleep_duration"`
Interval time.Duration `mapstructure:"interval"`
PrepTimeout time.Duration `mapstructure:"prep_timeout"`
InitialSleepDuration time.Duration `mapstructure:"initial_sleep_duration"`
HealthySnapshotIntervalLimit time.Duration `mapstructure:"healthy_snapshot_interval_limit"`
InitializationTimeoutExtension time.Duration `mapstructure:"initialization_timeout_extension"`
}

var cfg *Config
Expand All @@ -95,6 +97,8 @@ func Get() Config {
viper.SetDefault("controller.interval", 15*time.Second)
viper.SetDefault("controller.prep_timeout", 10*time.Minute)
viper.SetDefault("controller.initial_sleep_duration", 30*time.Second)
viper.SetDefault("controller.healthy_snapshot_interval_limit", 10*time.Minute)
viper.SetDefault("controller.initialization_timeout_extension", 5*time.Minute)

viper.AutomaticEnv()
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
Expand Down
35 changes: 23 additions & 12 deletions internal/services/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ type Controller struct {
delta *delta
deltaMu sync.Mutex

agentVersion *config.AgentVersion
agentVersion *config.AgentVersion
healthzProvider *HealthzProvider
}

func New(
Expand All @@ -59,7 +60,10 @@ func New(
cfg *config.Controller,
v version.Interface,
agentVersion *config.AgentVersion,
healthzProvider *HealthzProvider,
) *Controller {
healthzProvider.Initializing()

typeInformerMap := map[reflect.Type]cache.SharedInformer{
reflect.TypeOf(&corev1.Node{}): f.Core().V1().Nodes().Informer(),
reflect.TypeOf(&corev1.Pod{}): f.Core().V1().Pods().Informer(),
Expand All @@ -86,15 +90,16 @@ func New(
}

c := &Controller{
log: log,
clusterID: clusterID,
castaiclient: castaiclient,
provider: provider,
cfg: cfg,
delta: newDelta(log, clusterID, v.Full()),
queue: workqueue.NewNamed("castai-agent"),
informers: typeInformerMap,
agentVersion: agentVersion,
log: log,
clusterID: clusterID,
castaiclient: castaiclient,
provider: provider,
cfg: cfg,
delta: newDelta(log, clusterID, v.Full()),
queue: workqueue.NewNamed("castai-agent"),
informers: typeInformerMap,
agentVersion: agentVersion,
healthzProvider: healthzProvider,
}

c.registerEventHandlers()
Expand Down Expand Up @@ -228,7 +233,7 @@ func removeSensitiveEnvVars(obj interface{}) {
}
}

func (c *Controller) Run(ctx context.Context) {
func (c *Controller) Run(ctx context.Context) error {
defer c.queue.ShutDown()

ctx, cancel := context.WithCancel(ctx)
Expand All @@ -243,7 +248,7 @@ func (c *Controller) Run(ctx context.Context) {
c.log.Info("waiting for informers cache to sync")
if !cache.WaitForCacheSync(ctx.Done(), syncs...) {
c.log.Error("failed to sync")
return
return fmt.Errorf("failed to wait for cache sync")
}
c.log.Infof("informers cache synced after %v", time.Since(waitStartedAt))

Expand Down Expand Up @@ -282,6 +287,8 @@ func (c *Controller) Run(ctx context.Context) {
c.log.Infof("sleeping for %s before starting to send cluster deltas", c.cfg.InitialSleepDuration)
time.Sleep(c.cfg.InitialSleepDuration)

c.healthzProvider.Initialized()

c.log.Infof("sending cluster deltas every %s", c.cfg.Interval)
wait.Until(func() {
c.send(ctx)
Expand All @@ -294,6 +301,8 @@ func (c *Controller) Run(ctx context.Context) {
}()

c.pollQueueUntilShutdown()

return nil
}

// collectInitialSnapshot is used to add a time buffer to collect the initial snapshot which is larger than periodic
Expand Down Expand Up @@ -389,6 +398,8 @@ func (c *Controller) send(ctx context.Context) {
return
}

c.healthzProvider.SnapshotSent()

c.delta.clear()
}

Expand Down
23 changes: 8 additions & 15 deletions internal/services/controller/controller_exclude_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,24 +121,17 @@ func TestController_ShouldKeepDeltaAfterDelete(t *testing.T) {

log := logrus.New()
log.SetLevel(logrus.DebugLevel)
ctrl := New(
log,
f,
castaiclient,
provider,
clusterID.String(),
&config.Controller{
Interval: 2 * time.Second,
PrepTimeout: 2 * time.Second,
InitialSleepDuration: 10 * time.Millisecond,
},
version,
agentVersion,
)
ctrl := New(log, f, castaiclient, provider, clusterID.String(), &config.Controller{
Interval: 2 * time.Second,
PrepTimeout: 2 * time.Second,
InitialSleepDuration: 10 * time.Millisecond,
}, version, agentVersion, NewHealthzProvider(defaultHealthzCfg))

f.Start(ctx.Done())

go ctrl.Run(ctx)
go func() {
require.NoError(t, ctrl.Run(ctx))
}()

wait.Until(func() {
if atomic.LoadInt64(&invocations) >= 3 {
Expand Down
31 changes: 16 additions & 15 deletions internal/services/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ import (
"castai-agent/pkg/labels"
)

var defaultHealthzCfg = config.Config{Controller: &config.Controller{
Interval: 15 * time.Second,
PrepTimeout: 10 * time.Minute,
InitialSleepDuration: 30 * time.Second,
HealthySnapshotIntervalLimit: 10 * time.Minute,
InitializationTimeoutExtension: 5 * time.Minute,
}}

func TestMain(m *testing.M) {
goleak.VerifyTestMain(
m,
Expand Down Expand Up @@ -101,23 +109,16 @@ func TestController_HappyPath(t *testing.T) {
f := informers.NewSharedInformerFactory(clientset, 0)
log := logrus.New()
log.SetLevel(logrus.DebugLevel)
ctrl := New(
log,
f,
castaiclient,
provider,
clusterID.String(),
&config.Controller{
Interval: 15 * time.Second,
PrepTimeout: 2 * time.Second,
InitialSleepDuration: 10 * time.Millisecond,
},
version,
agentVersion,
)
ctrl := New(log, f, castaiclient, provider, clusterID.String(), &config.Controller{
Interval: 15 * time.Second,
PrepTimeout: 2 * time.Second,
InitialSleepDuration: 10 * time.Millisecond,
}, version, agentVersion, NewHealthzProvider(defaultHealthzCfg))
f.Start(ctx.Done())

go ctrl.Run(ctx)
go func() {
require.NoError(t, ctrl.Run(ctx))
}()

wait.Until(func() {
if atomic.LoadInt64(&invocations) >= 1 {
Expand Down
67 changes: 67 additions & 0 deletions internal/services/controller/healthz.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package controller

import (
"fmt"
"net/http"
"time"

"castai-agent/internal/config"
)

func NewHealthzProvider(cfg config.Config) *HealthzProvider {
return &HealthzProvider{
cfg: cfg,
initHardTimeout: cfg.Controller.PrepTimeout + cfg.Controller.InitialSleepDuration + cfg.Controller.InitializationTimeoutExtension,
}
}

type HealthzProvider struct {
cfg config.Config
initHardTimeout time.Duration

initializeStartedAt *time.Time
lastHealthyActionAt *time.Time
}

func (h *HealthzProvider) Check(_ *http.Request) error {
if h.lastHealthyActionAt != nil {
if time.Since(*h.lastHealthyActionAt) > h.cfg.Controller.HealthySnapshotIntervalLimit {
return fmt.Errorf("time since initialization or last snapshot sent is over the considered healthy limit of %s", h.cfg.Controller.HealthySnapshotIntervalLimit)
}
return nil
}

if h.initializeStartedAt != nil {
if time.Since(*h.initializeStartedAt) > h.initHardTimeout {
return fmt.Errorf("controller initialization is taking longer than the hard timeout of %s", h.initHardTimeout)
}
return nil
}

return fmt.Errorf("healthz not initialized")
}

func (h *HealthzProvider) Initializing() {
if h.initializeStartedAt == nil {
h.initializeStartedAt = nowPtr()
h.lastHealthyActionAt = nil
}
}

func (h *HealthzProvider) Initialized() {
h.healthyAction()
}

func (h *HealthzProvider) SnapshotSent() {
h.healthyAction()
}

func (h *HealthzProvider) healthyAction() {
h.initializeStartedAt = nil
h.lastHealthyActionAt = nowPtr()
}

func nowPtr() *time.Time {
now := time.Now()
return &now
}
74 changes: 74 additions & 0 deletions internal/services/controller/healthz_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package controller

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"castai-agent/internal/config"
)

func TestNewHealthzProvider(t *testing.T) {
t.Run("unhealthy statuses", func(t *testing.T) {
cfg := config.Config{Controller: &config.Controller{
Interval: 15 * time.Second,
PrepTimeout: time.Millisecond,
InitialSleepDuration: time.Millisecond,
InitializationTimeoutExtension: time.Millisecond,
HealthySnapshotIntervalLimit: time.Millisecond,
}}

h := NewHealthzProvider(cfg)

t.Run("should return not initialized error", func(t *testing.T) {
require.Error(t, h.Check(nil))
})

t.Run("should return initialize timeout error", func(t *testing.T) {
h.Initializing()

time.Sleep(5 * time.Millisecond)

require.Error(t, h.Check(nil))
})

t.Run("should return snapshot timeout error", func(t *testing.T) {
h.healthyAction()

time.Sleep(5 * time.Millisecond)

require.Error(t, h.Check(nil))
})
})

t.Run("healthy statuses", func(t *testing.T) {
cfg := config.Config{Controller: &config.Controller{
Interval: 15 * time.Second,
PrepTimeout: 10 * time.Minute,
InitialSleepDuration: 30 * time.Second,
InitializationTimeoutExtension: 5 * time.Minute,
HealthySnapshotIntervalLimit: 10 * time.Minute,
}}

h := NewHealthzProvider(cfg)

t.Run("should return no error when still initializing", func(t *testing.T) {
h.Initializing()

require.NoError(t, h.Check(nil))
})

t.Run("should return no error when timeout after initialization has not yet passed", func(t *testing.T) {
h.Initialized()

require.NoError(t, h.Check(nil))
})

t.Run("should return no error when time since last snapshot has not been long", func(t *testing.T) {
h.SnapshotSent()

require.NoError(t, h.Check(nil))
})
})
}
Loading

0 comments on commit 535015b

Please sign in to comment.