feat: delete all pods when node is deleted (#48)
* feat: delete all pods when node is deleted

* chore: add CODEOWNERS
aldor007 authored Nov 7, 2022
1 parent c6d54ee commit 6754391
Showing 3 changed files with 105 additions and 8 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
@@ -0,0 +1 @@
+* @castai/cast-core
68 changes: 63 additions & 5 deletions actions/delete_node_handler.go
@@ -8,30 +8,39 @@ import (
 
 	"github.com/cenkalti/backoff/v4"
 	"github.com/sirupsen/logrus"
+	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/kubernetes"
 
 	"github.com/castai/cluster-controller/castai"
 )
 
 type deleteNodeConfig struct {
-	deleteRetries   uint64
-	deleteRetryWait time.Duration
+	deleteRetries       uint64
+	deleteRetryWait     time.Duration
+	podsTerminationWait time.Duration
 }
 
 func newDeleteNodeHandler(log logrus.FieldLogger, clientset kubernetes.Interface) ActionHandler {
 	return &deleteNodeHandler{
 		log:       log,
 		clientset: clientset,
 		cfg: deleteNodeConfig{
-			deleteRetries:   5,
-			deleteRetryWait: 1 * time.Second,
+			deleteRetries:       5,
+			deleteRetryWait:     1 * time.Second,
+			podsTerminationWait: 5 * time.Second,
 		},
+		drainNodeHandler: drainNodeHandler{
+			log:       log,
+			clientset: clientset,
+		},
 	}
 }
 
 type deleteNodeHandler struct {
+	drainNodeHandler
 	log       logrus.FieldLogger
 	clientset kubernetes.Interface
 	cfg       deleteNodeConfig
@@ -52,12 +61,61 @@ func (h *deleteNodeHandler) Handle(ctx context.Context, action *castai.ClusterAc
 	log.Info("deleting kubernetes node")
 
 	b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(h.cfg.deleteRetryWait), h.cfg.deleteRetries), ctx)
-	return backoff.Retry(func() error {
+	err := backoff.Retry(func() error {
 		err := h.clientset.CoreV1().Nodes().Delete(ctx, req.NodeName, metav1.DeleteOptions{})
 		if apierrors.IsNotFound(err) {
 			log.Info("node not found, skipping delete")
 			return nil
 		}
 		return err
 	}, b)
+
+	if err != nil {
+		return fmt.Errorf("error removing node %w", err)
+	}
+
+	podsListing := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(h.cfg.podsTerminationWait), h.cfg.deleteRetries), ctx)
+	var pods []v1.Pod
+	err = backoff.Retry(func() error {
+		podList, err := h.clientset.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
+			FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": req.NodeName}).String(),
+		})
+		if err != nil {
+			return err
+		}
+		pods = podList.Items
+		return nil
+
+	}, podsListing)
+
+	if err != nil {
+		return fmt.Errorf("listing node pods %w", err)
+	}
+
+	log.Infof("node has %d pods - removing", len(pods))
+
+	// Create delete options with grace period 0 - force delete.
+	deleteOptions := metav1.NewDeleteOptions(0)
+	deletePod := func(ctx context.Context, pod v1.Pod) error {
+		return h.deletePod(ctx, *deleteOptions, pod)
+	}
+
+	if err := h.sendPodsRequests(ctx, pods, deletePod); err != nil {
+		return fmt.Errorf("sending delete pods requests: %w", err)
+	}
+
+	// Cleanup of pods for which node has been removed. It should take a few seconds but added retry in case of network errors.
+	podsWait := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(h.cfg.podsTerminationWait), h.cfg.deleteRetries), ctx)
+	return backoff.Retry(func() error {
+		pods, err := h.clientset.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
+			FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": req.NodeName}).String(),
+		})
+		if err != nil {
+			return fmt.Errorf("unable to list pods for node %q err: %w", req.NodeName, err)
+		}
+		if len(pods.Items) > 0 {
+			return fmt.Errorf("waiting for %d pods to be terminated on node %v", len(pods.Items), req.NodeName)
+		}
+		return nil
+	}, podsWait)
 }
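
Side note on the force delete above: metav1.NewDeleteOptions(0) only sets GracePeriodSeconds to zero before the request is passed to the embedded drainNodeHandler's deletePod, which is not part of this diff. A minimal client-go sketch of such a zero-grace-period delete, using a hypothetical forceDeletePod helper as a stand-in:

package sketch

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// forceDeletePod is a hypothetical helper illustrating the grace-period-0 delete
// used by the handler; the real deletePod comes from the embedded drainNodeHandler.
func forceDeletePod(ctx context.Context, clientset kubernetes.Interface, namespace, name string) error {
	grace := int64(0) // zero grace period: delete without waiting for graceful termination
	opts := metav1.DeleteOptions{GracePeriodSeconds: &grace}
	if err := clientset.CoreV1().Pods(namespace).Delete(ctx, name, opts); err != nil {
		return fmt.Errorf("force deleting pod %s/%s: %w", namespace, name, err)
	}
	return nil
}
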
44 changes: 41 additions & 3 deletions actions/delete_node_handler_test.go
@@ -2,9 +2,11 @@ package actions
 
 import (
 	"context"
-	"github.com/google/uuid"
 	"testing"
 
+	"github.com/google/uuid"
+	"k8s.io/apimachinery/pkg/fields"
+
 	"github.com/sirupsen/logrus"
 	"github.com/stretchr/testify/require"
 	v1 "k8s.io/api/core/v1"
@@ -16,12 +18,11 @@ import (
 )
 
 func TestDeleteNodeHandler(t *testing.T) {
-	r := require.New(t)
-
 	log := logrus.New()
 	log.SetLevel(logrus.DebugLevel)
 
 	t.Run("delete successfully", func(t *testing.T) {
+		r := require.New(t)
 		nodeName := "node1"
 		node := &v1.Node{
 			ObjectMeta: metav1.ObjectMeta{
@@ -52,6 +53,7 @@ func TestDeleteNodeHandler(t *testing.T) {
 	})
 
 	t.Run("skip delete when node not found", func(t *testing.T) {
+		r := require.New(t)
 		nodeName := "node1"
 		node := &v1.Node{
 			ObjectMeta: metav1.ObjectMeta{
@@ -79,4 +81,40 @@ func TestDeleteNodeHandler(t *testing.T) {
 		_, err = clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
 		r.NoError(err)
 	})
+
+	t.Run("delete node with pods", func(t *testing.T) {
+		r := require.New(t)
+		nodeName := "node1"
+		podName := "pod1"
+		clientset := setupFakeClientWithNodePodEviction(nodeName, podName)
+
+		action := &castai.ClusterAction{
+			ID: uuid.New().String(),
+			ActionDeleteNode: &castai.ActionDeleteNode{
+				NodeName: nodeName,
+			},
+		}
+
+		h := deleteNodeHandler{
+			log:       log,
+			clientset: clientset,
+			cfg: deleteNodeConfig{
+				podsTerminationWait: 1,
+			},
+			drainNodeHandler: drainNodeHandler{clientset: clientset},
+		}
+
+		err := h.Handle(context.Background(), action)
+		r.NoError(err)
+
+		_, err = clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
+		r.Error(err)
+		r.True(apierrors.IsNotFound(err))
+
+		pods, err := h.clientset.CoreV1().Pods(metav1.NamespaceAll).List(context.Background(), metav1.ListOptions{
+			FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}).String(),
+		})
+		r.NoError(err)
+		r.Len(pods.Items, 0)
+	})
 }
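
The setupFakeClientWithNodePodEviction helper used by the new test is defined elsewhere in the package and is not part of this diff. As a rough, hypothetical sketch, assuming it mainly seeds a fake clientset with one node and one pod scheduled onto it (the repository's real helper may also register eviction reactors), it could look like this:

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

// Hypothetical stand-in for the real setupFakeClientWithNodePodEviction helper,
// assuming it only needs a fake clientset pre-populated with a node and a pod bound to it.
func setupFakeClientWithNodePod(nodeName, podName string) *fake.Clientset {
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: podName, Namespace: metav1.NamespaceDefault},
		Spec:       v1.PodSpec{NodeName: nodeName}, // schedule the pod onto the node under test
	}
	// fake.NewSimpleClientset pre-populates the fake API server with the given objects.
	return fake.NewSimpleClientset(node, pod)
}
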
