From 94411f2ae441a37a419d3b77edcede95b0863ad3 Mon Sep 17 00:00:00 2001 From: Damian Czaja Date: Mon, 4 Nov 2024 13:24:35 +0100 Subject: [PATCH] KUBE-644: use informers to watch CSRs (#154) --- cmd/controller/run.go | 7 +- internal/actions/csr/csr.go | 188 ++++++++++++++----------------- internal/actions/csr/csr_test.go | 159 +++++++++++++++++++------- internal/actions/csr/svc.go | 42 +++++-- internal/actions/csr/svc_test.go | 8 +- 5 files changed, 242 insertions(+), 162 deletions(-) diff --git a/cmd/controller/run.go b/cmd/controller/run.go index d70fda14..0f463345 100644 --- a/cmd/controller/run.go +++ b/cmd/controller/run.go @@ -189,9 +189,12 @@ func runController( } if isGKE { - log.Info("auto approve csr started as running on GKE") csrMgr := csr.NewApprovalManager(log, clientset) - csrMgr.Start(ctx) + if err := csrMgr.Start(ctx); err != nil { + log.WithError(err).Fatal("failed to start approval manager") + } + + log.Info("auto approve csr started as running on GKE") } svc.Run(ctx) diff --git a/internal/actions/csr/csr.go b/internal/actions/csr/csr.go index e05e7d28..b863a089 100644 --- a/internal/actions/csr/csr.go +++ b/internal/actions/csr/csr.go @@ -17,16 +17,20 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - - "github.com/castai/cluster-controller/internal/waitext" + "k8s.io/client-go/tools/cache" ) const ( ReasonApproved = "AutoApproved" approvedMessage = "This CSR was approved by CAST AI" csrTTL = time.Hour + + // We should approve CSRs, when they are created, so resync can be high. + // Resync plays back all events (create, update, delete), which are in informer cache. + // This does not involve talking to API server, it is not relist. + csrInformerResyncPeriod = 12 * time.Hour ) var ErrNodeCertificateNotFound = errors.New("node certificate not found") @@ -66,6 +70,34 @@ func (c *Certificate) Approved() bool { return false } +// Outdated returns, whether the certificate request is old and should not be processed by cluster-controller. +// It has nothing to do with certificate expiration. +func (c *Certificate) Outdated() bool { + if c.v1Beta1 != nil { + return c.v1Beta1.CreationTimestamp.Add(csrTTL).Before(time.Now()) + } + return c.v1.CreationTimestamp.Add(csrTTL).Before(time.Now()) +} + +func (c *Certificate) ForCASTAINode() bool { + if c.Name == "" { + return false + } + + if strings.HasPrefix(c.Name, "system:node") && strings.Contains(c.Name, "cast-pool") { + return true + } + + return false +} + +func (c *Certificate) NodeBootstrap() bool { + // Since we only have one handler per CSR/certificate name, + // which is the node name, we can process the controller's certificates and kubelet-bootstrap`s. + // This covers the case when the controller restarts but the bootstrap certificate was deleted without our own certificate being approved. + return c.RequestingUser == "kubelet-bootstrap" || c.RequestingUser == "system:serviceaccount:castai-agent:castai-cluster-controller" +} + func isAlreadyApproved(err error) bool { if err == nil { return false @@ -147,6 +179,17 @@ func (c *Certificate) NewCSR(ctx context.Context, client kubernetes.Interface) ( return &Certificate{v1: resp}, nil } +func startInformer(ctx context.Context, log logrus.FieldLogger, factory informers.SharedInformerFactory) { + stopCh := make(chan struct{}) + defer close(stopCh) + + factory.Start(stopCh) + log.Info("watching for new node csr") + + <-ctx.Done() + log.WithField("context", ctx.Err()).Info("finished watching for new node csr") +} + func get(ctx context.Context, client kubernetes.Interface, cert *Certificate) (*Certificate, error) { if cert.v1Beta1 != nil { v1beta1req, err := client.CertificatesV1beta1().CertificateSigningRequests().Get(ctx, cert.v1Beta1.Name, metav1.GetOptions{}) @@ -273,144 +316,81 @@ func getNodeCSRV1Beta1(ctx context.Context, client kubernetes.Interface, nodeNam return nil, ErrNodeCertificateNotFound } -func WatchCastAINodeCSRs(ctx context.Context, log logrus.FieldLogger, client kubernetes.Interface, c chan *Certificate) { - var w watch.Interface - var err error - b := waitext.DefaultExponentialBackoff() - err = waitext.Retry( - ctx, - b, - waitext.Forever, - func(ctx context.Context) (bool, error) { - w, err = getWatcher(ctx, client) - // Context canceled is when the cluster-controller is stopped. - // In that case context.Canceled is not an error. - if errors.Is(err, context.Canceled) { - return false, err - } - if err != nil { - return true, fmt.Errorf("getWatcher: %w", err) - } - return false, nil - }, - func(err error) { - log.Warnf("retrying: %v", err) - }, +func createInformer(ctx context.Context, client kubernetes.Interface) (informers.SharedInformerFactory, cache.SharedIndexInformer, error) { + var ( + errv1 error + errv1beta1 error ) - if err != nil { - log.Warnf("finished: %v", err) - return - } - - defer w.Stop() - log.Info("watching for new node csr") + if _, errv1 = client.CertificatesV1().CertificateSigningRequests().List(ctx, metav1.ListOptions{}); errv1 == nil { + v1Factory := informers.NewSharedInformerFactoryWithOptions(client, csrInformerResyncPeriod, + informers.WithTweakListOptions(func(opts *metav1.ListOptions) { + opts.FieldSelector = getOptions(certv1.KubeAPIServerClientKubeletSignerName).FieldSelector + })) + v1Informer := v1Factory.Certificates().V1().CertificateSigningRequests().Informer() + return v1Factory, v1Informer, nil + } - for { - select { - case <-ctx.Done(): - return - case event, ok := <-w.ResultChan(): - if !ok { - log.Info("watcher closed") - go WatchCastAINodeCSRs(ctx, log, client, c) // start over in case of any error. - return - } + if _, errv1beta1 = client.CertificatesV1beta1().CertificateSigningRequests().List(ctx, metav1.ListOptions{}); errv1beta1 == nil { + v1Factory := informers.NewSharedInformerFactoryWithOptions(client, csrInformerResyncPeriod, + informers.WithTweakListOptions(func(opts *metav1.ListOptions) { + opts.FieldSelector = getOptions(certv1beta1.KubeAPIServerClientKubeletSignerName).FieldSelector + })) + v1Informer := v1Factory.Certificates().V1beta1().CertificateSigningRequests().Informer() + return v1Factory, v1Informer, nil + } - cert, err := toCertificate(event) - if err != nil { - log.Warnf("toCertificate: skipping csr event: %v", err) - continue - } + return nil, nil, fmt.Errorf("failed to create informer: v1: %w, v1beta1: %w", errv1, errv1beta1) +} - if cert == nil { - continue - } +var errUnexpectedObjectType = errors.New("unexpected object type") - if cert.Approved() { - continue - } +func processCSREvent(ctx context.Context, c chan<- *Certificate, csrObj interface{}) error { + cert, err := toCertificate(csrObj) + if err != nil { + return err + } - sendCertificate(ctx, c, cert) - } + if cert == nil { + return nil } -} -func getWatcher(ctx context.Context, client kubernetes.Interface) (watch.Interface, error) { - w, err := client.CertificatesV1().CertificateSigningRequests().Watch(ctx, getOptions(certv1.KubeAPIServerClientKubeletSignerName)) - if err != nil { - w, err = client.CertificatesV1beta1().CertificateSigningRequests().Watch(ctx, getOptions(certv1beta1.KubeAPIServerClientKubeletSignerName)) - if err != nil { - return nil, fmt.Errorf("fail to open v1 and v1beta watching client: %w", err) - } + if cert.Approved() || !cert.ForCASTAINode() || !cert.NodeBootstrap() || cert.Outdated() { + return nil } - return w, nil -} -var ( - errUnexpectedObjectType = errors.New("unexpected object type") - errCSRTooOld = errors.New("csr is too old") - errOwner = errors.New("owner is not bootstrap") - errNonCastAINode = errors.New("not a castai node") -) + sendCertificate(ctx, c, cert) + return nil +} -func toCertificate(event watch.Event) (cert *Certificate, err error) { +func toCertificate(obj interface{}) (cert *Certificate, err error) { var name string var request []byte - isOutdated := false - switch e := event.Object.(type) { + switch e := obj.(type) { case *certv1.CertificateSigningRequest: name = e.Name request = e.Spec.Request cert = &Certificate{Name: name, v1: e, RequestingUser: e.Spec.Username} - isOutdated = e.CreationTimestamp.Add(csrTTL).Before(time.Now()) case *certv1beta1.CertificateSigningRequest: name = e.Name request = e.Spec.Request cert = &Certificate{Name: name, v1Beta1: e, RequestingUser: e.Spec.Username} - isOutdated = e.CreationTimestamp.Add(csrTTL).Before(time.Now()) default: return nil, errUnexpectedObjectType } - if isOutdated { - return nil, fmt.Errorf("csr with certificate Name: %v RequestingUser: %v %w", cert.Name, cert.RequestingUser, errCSRTooOld) - } - - // Since we only have one handler per CSR/certificate name, - // which is the node name, we can process the controller's certificates and kubelet-bootstrap`s. - // This covers the case when the controller restarts but the bootstrap certificate was deleted without our own certificate being approved. - if cert.RequestingUser != "kubelet-bootstrap" && cert.RequestingUser != "system:serviceaccount:castai-agent:castai-cluster-controller" { - return nil, fmt.Errorf("csr with certificate Name: %v RequestingUser: %v %w", cert.Name, cert.RequestingUser, errOwner) - } - cn, err := getSubjectCommonName(name, request) if err != nil { return nil, fmt.Errorf("getSubjectCommonName: Name: %v RequestingUser: %v request: %v %w", cert.Name, cert.RequestingUser, string(request), err) } - if !isCastAINodeCsr(cn) { - return nil, fmt.Errorf("csr with certificate Name: %v RequestingUser: %v cn: %v %w", cert.Name, cert.RequestingUser, cn, errNonCastAINode) - } cert.Name = cn return cert, nil } -func isCastAINodeCsr(subjectCommonName string) bool { - if subjectCommonName == "" { - return false - } - - if strings.HasPrefix(subjectCommonName, "system:node") && strings.Contains(subjectCommonName, "cast-pool") { - return true - } - - return false -} - -func sendCertificate(ctx context.Context, c chan *Certificate, cert *Certificate) { +func sendCertificate(ctx context.Context, c chan<- *Certificate, cert *Certificate) { select { case c <- cert: case <-ctx.Done(): diff --git a/internal/actions/csr/csr_test.go b/internal/actions/csr/csr_test.go index 0d6f9f35..b60a571b 100644 --- a/internal/actions/csr/csr_test.go +++ b/internal/actions/csr/csr_test.go @@ -3,14 +3,13 @@ package csr import ( "context" "path/filepath" - "reflect" "testing" "time" "github.com/stretchr/testify/require" certv1 "k8s.io/api/certificates/v1" + certv1beta1 "k8s.io/api/certificates/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "k8s.io/client-go/tools/clientcmd" @@ -94,8 +93,82 @@ func Test_isCastAINodeCsr(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := isCastAINodeCsr(tt.args.subjectCommonName) - require.Equal(t, tt.want, got) + cert := &Certificate{ + Name: tt.args.subjectCommonName, + } + + require.Equal(t, tt.want, cert.ForCASTAINode()) + }) + } +} + +func Test_outdatedCertificate(t *testing.T) { + tt := map[string]struct { + createTimestamp time.Time + want bool + }{ + "Outdated": { + createTimestamp: time.Now().Add(-csrTTL).Add(-time.Second), + want: true, + }, + "Not outdated": { + createTimestamp: time.Now(), + want: false, + }, + "Outdated, right before": { + createTimestamp: time.Now().Add(-csrTTL).Add(2 * time.Second), + want: false, + }, + } + + for name, tc := range tt { + t.Run(name, func(t *testing.T) { + cert := &Certificate{ + v1: &certv1.CertificateSigningRequest{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.NewTime(tc.createTimestamp), + }, + }, + } + require.Equal(t, tc.want, cert.Outdated()) + + certBeta := &Certificate{ + v1Beta1: &certv1beta1.CertificateSigningRequest{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.NewTime(tc.createTimestamp), + }, + }, + } + require.Equal(t, tc.want, certBeta.Outdated()) + }) + } +} + +func Test_nodeBootstrap(t *testing.T) { + tt := map[string]struct { + reqUser string + want bool + }{ + "other one": { + reqUser: "dummy-user", + want: false, + }, + "kubelet-bootstrap": { + reqUser: "kubelet-bootstrap", + want: true, + }, + "castai-cluster-controller": { + reqUser: "system:serviceaccount:castai-agent:castai-cluster-controller", + want: true, + }, + } + + for name, tc := range tt { + t.Run(name, func(t *testing.T) { + cert := &Certificate{ + RequestingUser: tc.reqUser, + } + require.Equal(t, tc.want, cert.NodeBootstrap()) }) } } @@ -104,88 +177,88 @@ func Test_toCertificate(t *testing.T) { testCSRv1 := getCSRv1("node-csr", "kubelet-bootstrap") testCSRv1beta1 := getCSRv1betav1("node-csr", "kubelet-bootstrap") type args struct { - event watch.Event + obj interface{} } tests := []struct { - name string - args args - wantCert *Certificate - wantErr bool + name string + args args + checkFunc func(t *testing.T, cert *Certificate) + wantErr bool }{ { name: "empty event", args: args{ - event: watch.Event{}, + obj: nil, }, wantErr: true, }, { name: "outdated event", args: args{ - event: watch.Event{ - Object: &certv1.CertificateSigningRequest{ - ObjectMeta: metav1.ObjectMeta{ - CreationTimestamp: metav1.Time{Time: time.Now().Add(-csrTTL)}, - }, + obj: &certv1.CertificateSigningRequest{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.Time{Time: time.Now().Add(-csrTTL)}, }, + Spec: testCSRv1.Spec, }, }, - wantErr: true, + checkFunc: func(t *testing.T, cert *Certificate) { + require.True(t, cert.Outdated()) + }, + wantErr: false, }, { name: "bad owner", args: args{ - event: watch.Event{ - Object: &certv1.CertificateSigningRequest{ - Spec: certv1.CertificateSigningRequestSpec{ - Username: "test", - }, - ObjectMeta: metav1.ObjectMeta{ - CreationTimestamp: metav1.Time{Time: time.Now().Add(csrTTL)}, - }, + obj: &certv1.CertificateSigningRequest{ + Spec: certv1.CertificateSigningRequestSpec{ + Username: "test", + }, + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.Time{Time: time.Now().Add(csrTTL)}, }, }, }, - wantErr: true, + checkFunc: func(t *testing.T, cert *Certificate) { + require.False(t, cert.NodeBootstrap()) + }, + wantErr: false, }, { name: "ok v1", args: args{ - event: watch.Event{ - Object: testCSRv1, - }, + obj: testCSRv1, }, - wantErr: false, - wantCert: &Certificate{ - Name: "system:node:gke-dev-master-cast-pool-cb53177b", - RequestingUser: "kubelet-bootstrap", - v1: testCSRv1, + checkFunc: func(t *testing.T, cert *Certificate) { + require.Equal(t, "system:node:gke-dev-master-cast-pool-cb53177b", cert.Name) + require.Equal(t, "kubelet-bootstrap", cert.RequestingUser) + require.Equal(t, testCSRv1, cert.v1) }, + wantErr: false, }, { name: "ok v1beta1", args: args{ - event: watch.Event{ - Object: testCSRv1beta1, - }, + obj: testCSRv1beta1, }, wantErr: false, - wantCert: &Certificate{ - Name: "system:node:gke-dev-master-cast-pool-cb53177b", - RequestingUser: "kubelet-bootstrap", - v1Beta1: testCSRv1beta1, + checkFunc: func(t *testing.T, cert *Certificate) { + require.Equal(t, "system:node:gke-dev-master-cast-pool-cb53177b", cert.Name) + require.Equal(t, "kubelet-bootstrap", cert.RequestingUser) + require.Equal(t, testCSRv1beta1, cert.v1Beta1) }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotCert, err := toCertificate(tt.args.event) + gotCert, err := toCertificate(tt.args.obj) if (err != nil) != tt.wantErr { t.Errorf("toCertificate() error = %v, wantErr %v", err, tt.wantErr) return } - if !reflect.DeepEqual(gotCert, tt.wantCert) { - t.Errorf("toCertificate() gotCert = %v, want %v", gotCert, tt.wantCert) + + if tt.checkFunc != nil { + tt.checkFunc(t, gotCert) } }) } diff --git a/internal/actions/csr/svc.go b/internal/actions/csr/svc.go index bb13609d..ae18b3fb 100644 --- a/internal/actions/csr/svc.go +++ b/internal/actions/csr/svc.go @@ -11,6 +11,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "github.com/castai/cluster-controller/internal/waitext" ) @@ -35,8 +36,35 @@ type ApprovalManager struct { m sync.Mutex // Used to make sure there is just one watcher running. } -func (h *ApprovalManager) Start(ctx context.Context) { - go h.runAutoApproveForCastAINodes(ctx) +func (h *ApprovalManager) Start(ctx context.Context) error { + informerFactory, csrInformer, err := createInformer(ctx, h.clientset) + if err != nil { + return fmt.Errorf("while creating informer: %w", err) + } + + c := make(chan *Certificate, 1) + + handlerFuncs := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if err := processCSREvent(ctx, c, obj); err != nil { + h.log.WithError(err).Warn("failed to process csr add event") + } + }, + } + + if _, err := csrInformer.AddEventHandler(handlerFuncs); err != nil { + return fmt.Errorf("adding csr informer event handlers: %w", err) + } + + ctx, cancel := context.WithCancel(ctx) + if !h.startAutoApprove(cancel) { + return nil + } + + go startInformer(ctx, h.log, informerFactory) + go h.runAutoApproveForCastAINodes(ctx, c) + + return nil } func (h *ApprovalManager) Stop() { @@ -99,18 +127,10 @@ func (h *ApprovalManager) handle(ctx context.Context, log logrus.FieldLogger, ce return errCSRNotApproved } -func (h *ApprovalManager) runAutoApproveForCastAINodes(ctx context.Context) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - if !h.startAutoApprove(cancel) { - return // already running. - } +func (h *ApprovalManager) runAutoApproveForCastAINodes(ctx context.Context, c <-chan *Certificate) { defer h.stopAutoApproveForCastAINodes() log := h.log.WithField("RunAutoApprove", "auto-approve-csr") - c := make(chan *Certificate, 1) - go WatchCastAINodeCSRs(ctx, log, h.clientset, c) for { select { diff --git a/internal/actions/csr/svc_test.go b/internal/actions/csr/svc_test.go index 3ccdab62..4f592acc 100644 --- a/internal/actions/csr/svc_test.go +++ b/internal/actions/csr/svc_test.go @@ -85,7 +85,9 @@ func TestCSRApprove(t *testing.T) { wg.Add(2) go func() { defer wg.Done() - s.Start(ctx) + if err := s.Start(ctx); err != nil { + t.Logf("failed to start approval manager: %s", err.Error()) + } }() go func() { defer wg.Done() @@ -118,7 +120,9 @@ func TestCSRApprove(t *testing.T) { wg.Add(2) go func() { defer wg.Done() - s.Start(ctx) + if err := s.Start(ctx); err != nil { + t.Logf("failed to start approval manager: %s", err.Error()) + } }() go func() { defer wg.Done()