From af61d36f5f20a7f3b07b8430a400073eb20c411c Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Thu, 1 Sep 2022 14:22:09 +0200 Subject: [PATCH] operator: Use hive and the k8s-client This switches the operator to use the k8s-client provided via the k8s-client cell instead of going via the global variables in pkg/k8s. Code is refactored to pass around the clientset. Signed-off-by: Jussi Maki --- .../cmdref/cilium-operator-alibabacloud.md | 2 +- Documentation/cmdref/cilium-operator-aws.md | 2 +- Documentation/cmdref/cilium-operator-azure.md | 2 +- .../cmdref/cilium-operator-generic.md | 2 +- Documentation/cmdref/cilium-operator.md | 2 +- operator/cmd/ccnp_event.go | 5 +- operator/cmd/cilium_node.go | 31 +-- operator/cmd/cnp_event.go | 5 +- operator/cmd/flags.go | 18 -- operator/cmd/k8s_cep_gc.go | 16 +- operator/cmd/k8s_identity.go | 23 +- operator/cmd/k8s_pod_controller.go | 10 +- operator/cmd/root.go | 210 ++++++++---------- .../pkg/ciliumendpointslice/endpointslice.go | 12 +- operator/pkg/ingress/endpoint.go | 6 +- operator/pkg/ingress/envoy_config.go | 6 +- operator/pkg/ingress/ingress.go | 28 ++- operator/pkg/ingress/secret.go | 7 +- operator/pkg/ingress/service.go | 5 +- operator/watchers/bgp.go | 5 +- operator/watchers/cilium_endpoint.go | 10 +- operator/watchers/cilium_node_gc.go | 8 +- operator/watchers/k8s_service_sync.go | 20 +- operator/watchers/node_taint.go | 34 +-- operator/watchers/pod.go | 10 +- pkg/k8s/client/cell.go | 6 +- pkg/k8s/client/config.go | 5 + pkg/option/config.go | 23 -- test/controlplane/suite/agent.go | 14 +- test/controlplane/suite/testcase.go | 43 ++-- 30 files changed, 258 insertions(+), 312 deletions(-) diff --git a/Documentation/cmdref/cilium-operator-alibabacloud.md b/Documentation/cmdref/cilium-operator-alibabacloud.md index a71c68064bf21..2cec09d1327a1 100644 --- a/Documentation/cmdref/cilium-operator-alibabacloud.md +++ b/Documentation/cmdref/cilium-operator-alibabacloud.md @@ -35,7 +35,7 @@ cilium-operator-alibabacloud [flags] --enable-k8s-endpoint-slice Enables k8s EndpointSlice feature into Cilium-Operator if the k8s cluster supports it (default true) --enable-k8s-event-handover Enable k8s event handover to kvstore for improved scalability --enable-metrics Enable Prometheus metrics - --gops-port int Port for gops server to listen on (default 9891) + --gops-port uint16 Port for gops server to listen on (default 9890) -h, --help help for cilium-operator-alibabacloud --identity-allocation-mode string Method to use for identity allocation (default "kvstore") --identity-gc-interval duration GC interval for security identities (default 15m0s) diff --git a/Documentation/cmdref/cilium-operator-aws.md b/Documentation/cmdref/cilium-operator-aws.md index c032b924638c4..b9da5493d154e 100644 --- a/Documentation/cmdref/cilium-operator-aws.md +++ b/Documentation/cmdref/cilium-operator-aws.md @@ -41,7 +41,7 @@ cilium-operator-aws [flags] --enable-metrics Enable Prometheus metrics --eni-tags map ENI tags in the form of k1=v1 (multiple k/v pairs can be passed by repeating the CLI flag) --excess-ip-release-delay int Number of seconds operator would wait before it releases an IP previously marked as excess (default 180) - --gops-port int Port for gops server to listen on (default 9891) + --gops-port uint16 Port for gops server to listen on (default 9890) -h, --help help for cilium-operator-aws --identity-allocation-mode string Method to use for identity allocation (default "kvstore") --identity-gc-interval duration GC interval for security identities (default 15m0s) diff
--git a/Documentation/cmdref/cilium-operator-azure.md b/Documentation/cmdref/cilium-operator-azure.md index 2281c1529caba..f413814a6fcc7 100644 --- a/Documentation/cmdref/cilium-operator-azure.md +++ b/Documentation/cmdref/cilium-operator-azure.md @@ -38,7 +38,7 @@ cilium-operator-azure [flags] --enable-k8s-endpoint-slice Enables k8s EndpointSlice feature into Cilium-Operator if the k8s cluster supports it (default true) --enable-k8s-event-handover Enable k8s event handover to kvstore for improved scalability --enable-metrics Enable Prometheus metrics - --gops-port int Port for gops server to listen on (default 9891) + --gops-port uint16 Port for gops server to listen on (default 9890) -h, --help help for cilium-operator-azure --identity-allocation-mode string Method to use for identity allocation (default "kvstore") --identity-gc-interval duration GC interval for security identities (default 15m0s) diff --git a/Documentation/cmdref/cilium-operator-generic.md b/Documentation/cmdref/cilium-operator-generic.md index 2b91e89878e7b..11736908fae9b 100644 --- a/Documentation/cmdref/cilium-operator-generic.md +++ b/Documentation/cmdref/cilium-operator-generic.md @@ -34,7 +34,7 @@ cilium-operator-generic [flags] --enable-k8s-endpoint-slice Enables k8s EndpointSlice feature into Cilium-Operator if the k8s cluster supports it (default true) --enable-k8s-event-handover Enable k8s event handover to kvstore for improved scalability --enable-metrics Enable Prometheus metrics - --gops-port int Port for gops server to listen on (default 9891) + --gops-port uint16 Port for gops server to listen on (default 9890) -h, --help help for cilium-operator-generic --identity-allocation-mode string Method to use for identity allocation (default "kvstore") --identity-gc-interval duration GC interval for security identities (default 15m0s) diff --git a/Documentation/cmdref/cilium-operator.md b/Documentation/cmdref/cilium-operator.md index af955f2069d3f..64c9e815ee085 100644 --- a/Documentation/cmdref/cilium-operator.md +++ b/Documentation/cmdref/cilium-operator.md @@ -46,7 +46,7 @@ cilium-operator [flags] --enable-metrics Enable Prometheus metrics --eni-tags map ENI tags in the form of k1=v1 (multiple k/v pairs can be passed by repeating the CLI flag) --excess-ip-release-delay int Number of seconds operator would wait before it releases an IP previously marked as excess (default 180) - --gops-port int Port for gops server to listen on (default 9891) + --gops-port uint16 Port for gops server to listen on (default 9890) -h, --help help for cilium-operator --identity-allocation-mode string Method to use for identity allocation (default "kvstore") --identity-gc-interval duration GC interval for security identities (default 15m0s) diff --git a/operator/cmd/ccnp_event.go b/operator/cmd/ccnp_event.go index d5f5c09e499cc..12a1384070877 100644 --- a/operator/cmd/ccnp_event.go +++ b/operator/cmd/ccnp_event.go @@ -14,6 +14,7 @@ import ( "github.com/cilium/cilium/pkg/controller" "github.com/cilium/cilium/pkg/k8s" cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" "github.com/cilium/cilium/pkg/k8s/informer" "github.com/cilium/cilium/pkg/k8s/utils" "github.com/cilium/cilium/pkg/k8s/watchers/resources" @@ -31,7 +32,7 @@ func k8sEventMetric(scope, action string) { // clusterwide policies. Since, internally Clusterwide policies are implemented // using CiliumNetworkPolicy itself, the entire implementation uses the methods // associcated with CiliumNetworkPolicy. 
-func enableCCNPWatcher() error { +func enableCCNPWatcher(clientset k8sClient.Clientset) error { enableCNPStatusUpdates := kvstoreEnabled() && option.Config.K8sEventHandover && !option.Config.DisableCNPStatusUpdates if enableCNPStatusUpdates { log.Info("Starting a CCNP Status handover from kvstore to k8s") @@ -63,7 +64,7 @@ func enableCCNPWatcher() error { } ciliumV2Controller := informer.NewInformerWithStore( - utils.ListerWatcherFromTyped[*cilium_v2.CiliumClusterwideNetworkPolicyList](ciliumK8sClient.CiliumV2().CiliumClusterwideNetworkPolicies()), + utils.ListerWatcherFromTyped[*cilium_v2.CiliumClusterwideNetworkPolicyList](clientset.CiliumV2().CiliumClusterwideNetworkPolicies()), &cilium_v2.CiliumClusterwideNetworkPolicy{}, 0, cache.ResourceEventHandlerFuncs{ diff --git a/operator/cmd/cilium_node.go b/operator/cmd/cilium_node.go index 75690d815bab7..2028461984645 100644 --- a/operator/cmd/cilium_node.go +++ b/operator/cmd/cilium_node.go @@ -23,6 +23,7 @@ import ( "github.com/cilium/cilium/pkg/ipam/allocator" "github.com/cilium/cilium/pkg/k8s" cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" v2 "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned/typed/cilium.io/v2" "github.com/cilium/cilium/pkg/k8s/informer" v1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/meta/v1" @@ -50,7 +51,7 @@ var ( k8sCiliumNodesCacheSynced = make(chan struct{}) ) -func startSynchronizingCiliumNodes(ctx context.Context, nodeManager allocator.NodeEventHandler, withKVStore bool) error { +func startSynchronizingCiliumNodes(ctx context.Context, clientset k8sClient.Clientset, nodeManager allocator.NodeEventHandler, withKVStore bool) error { var ( ciliumNodeKVStore *store.SharedStore err error @@ -208,7 +209,7 @@ func startSynchronizingCiliumNodes(ctx context.Context, nodeManager allocator.No // introducing a slim version of it. 
var ciliumNodeInformer cache.Controller ciliumNodeStore, ciliumNodeInformer = informer.NewInformer( - utils.ListerWatcherFromTyped[*cilium_v2.CiliumNodeList](ciliumK8sClient.CiliumV2().CiliumNodes()), + utils.ListerWatcherFromTyped[*cilium_v2.CiliumNodeList](clientset.CiliumV2().CiliumNodes()), &cilium_v2.CiliumNode{}, 0, resourceEventHandler, @@ -313,39 +314,41 @@ func processNextWorkItem(queue workqueue.RateLimitingInterface, syncHandler func return true } -type ciliumNodeUpdateImplementation struct{} +type ciliumNodeUpdateImplementation struct { + clientset k8sClient.Clientset +} func (c *ciliumNodeUpdateImplementation) Create(node *cilium_v2.CiliumNode) (*cilium_v2.CiliumNode, error) { - return ciliumK8sClient.CiliumV2().CiliumNodes().Create(context.TODO(), node, meta_v1.CreateOptions{}) + return c.clientset.CiliumV2().CiliumNodes().Create(context.TODO(), node, meta_v1.CreateOptions{}) } func (c *ciliumNodeUpdateImplementation) Get(node string) (*cilium_v2.CiliumNode, error) { - return ciliumK8sClient.CiliumV2().CiliumNodes().Get(context.TODO(), node, meta_v1.GetOptions{}) + return c.clientset.CiliumV2().CiliumNodes().Get(context.TODO(), node, meta_v1.GetOptions{}) } func (c *ciliumNodeUpdateImplementation) UpdateStatus(origNode, node *cilium_v2.CiliumNode) (*cilium_v2.CiliumNode, error) { if origNode == nil || !origNode.Status.DeepEqual(&node.Status) { - return ciliumK8sClient.CiliumV2().CiliumNodes().UpdateStatus(context.TODO(), node, meta_v1.UpdateOptions{}) + return c.clientset.CiliumV2().CiliumNodes().UpdateStatus(context.TODO(), node, meta_v1.UpdateOptions{}) } return nil, nil } func (c *ciliumNodeUpdateImplementation) Update(origNode, node *cilium_v2.CiliumNode) (*cilium_v2.CiliumNode, error) { if origNode == nil || !origNode.Spec.DeepEqual(&node.Spec) { - return ciliumK8sClient.CiliumV2().CiliumNodes().Update(context.TODO(), node, meta_v1.UpdateOptions{}) + return c.clientset.CiliumV2().CiliumNodes().Update(context.TODO(), node, meta_v1.UpdateOptions{}) } return nil, nil } -func RunCNPNodeStatusGC(nodeStore cache.Store) { - go runCNPNodeStatusGC("cnp-node-gc", false, nodeStore) - go runCNPNodeStatusGC("ccnp-node-gc", true, nodeStore) +func RunCNPNodeStatusGC(clientset k8sClient.Clientset, nodeStore cache.Store) { + go runCNPNodeStatusGC("cnp-node-gc", false, clientset, nodeStore) + go runCNPNodeStatusGC("ccnp-node-gc", true, clientset, nodeStore) } // runCNPNodeStatusGC runs the node status garbage collector for cilium network // policies. The policy corresponds to CiliumClusterwideNetworkPolicy if the clusterwide // parameter is true and CiliumNetworkPolicy otherwise. 
-func runCNPNodeStatusGC(name string, clusterwide bool, nodeStore cache.Store) { +func runCNPNodeStatusGC(name string, clusterwide bool, clientset k8sClient.Clientset, nodeStore cache.Store) { parallelRequests := 4 removeNodeFromCNP := make(chan func(), 50) for i := 0; i < parallelRequests; i++ { @@ -369,7 +372,7 @@ func runCNPNodeStatusGC(name string, clusterwide bool, nodeStore cache.Store) { var cnpItemsList []cilium_v2.CiliumNetworkPolicy if clusterwide { - ccnpList, err := ciliumK8sClient.CiliumV2().CiliumClusterwideNetworkPolicies().List(ctx, + ccnpList, err := clientset.CiliumV2().CiliumClusterwideNetworkPolicies().List(ctx, meta_v1.ListOptions{ Limit: 10, Continue: continueID, @@ -386,7 +389,7 @@ func runCNPNodeStatusGC(name string, clusterwide bool, nodeStore cache.Store) { } continueID = ccnpList.Continue } else { - cnpList, err := ciliumK8sClient.CiliumV2().CiliumNetworkPolicies(core_v1.NamespaceAll).List(ctx, + cnpList, err := clientset.CiliumV2().CiliumNetworkPolicies(core_v1.NamespaceAll).List(ctx, meta_v1.ListOptions{ Limit: 10, Continue: continueID, @@ -421,7 +424,7 @@ func runCNPNodeStatusGC(name string, clusterwide bool, nodeStore cache.Store) { wg.Add(1) cnpCpy := cnp.DeepCopy() removeNodeFromCNP <- func() { - updateCNP(ciliumK8sClient.CiliumV2(), cnpCpy, nodesToDelete) + updateCNP(clientset.CiliumV2(), cnpCpy, nodesToDelete) wg.Done() } } diff --git a/operator/cmd/cnp_event.go b/operator/cmd/cnp_event.go index 58977e13f40e2..493dc1f92c648 100644 --- a/operator/cmd/cnp_event.go +++ b/operator/cmd/cnp_event.go @@ -14,6 +14,7 @@ import ( "github.com/cilium/cilium/pkg/controller" "github.com/cilium/cilium/pkg/k8s" cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" "github.com/cilium/cilium/pkg/k8s/informer" "github.com/cilium/cilium/pkg/k8s/utils" "github.com/cilium/cilium/pkg/k8s/watchers/resources" @@ -36,7 +37,7 @@ func init() { // enableCNPWatcher waits for the CiliumNetowrkPolicy CRD availability and then // garbage collects stale CiliumNetowrkPolicy status field entries. -func enableCNPWatcher() error { +func enableCNPWatcher(clientset k8sClient.Clientset) error { enableCNPStatusUpdates := kvstoreEnabled() && option.Config.K8sEventHandover && !option.Config.DisableCNPStatusUpdates if enableCNPStatusUpdates { log.Info("Starting CNP Status handover from kvstore to k8s") @@ -68,7 +69,7 @@ func enableCNPWatcher() error { } ciliumV2Controller := informer.NewInformerWithStore( - utils.ListerWatcherFromTyped[*cilium_v2.CiliumNetworkPolicyList](ciliumK8sClient.CiliumV2().CiliumNetworkPolicies("")), + utils.ListerWatcherFromTyped[*cilium_v2.CiliumNetworkPolicyList](clientset.CiliumV2().CiliumNetworkPolicies("")), &cilium_v2.CiliumNetworkPolicy{}, 0, cache.ResourceEventHandlerFuncs{ diff --git a/operator/cmd/flags.go b/operator/cmd/flags.go index 5bdd58334c96b..2712102639826 100644 --- a/operator/cmd/flags.go +++ b/operator/cmd/flags.go @@ -239,24 +239,12 @@ func init() { option.KVStoreOpt, "Key-value store options e.g. 
etcd.address=127.0.0.1:4001") option.BindEnv(Vp, option.KVStoreOpt) - flags.String(option.K8sAPIServer, "", "Kubernetes API server URL") - option.BindEnv(Vp, option.K8sAPIServer) - - flags.Float32(option.K8sClientQPSLimit, defaults.K8sClientQPSLimit, "Queries per second limit for the K8s client") - flags.Int(option.K8sClientBurst, defaults.K8sClientBurst, "Burst value allowed for the K8s client") - flags.Bool(option.K8sEnableEndpointSlice, defaults.K8sEnableEndpointSlice, "Enables k8s EndpointSlice feature into Cilium-Operator if the k8s cluster supports it") option.BindEnv(Vp, option.K8sEnableEndpointSlice) - flags.Bool(option.K8sEnableAPIDiscovery, defaults.K8sEnableAPIDiscovery, "Enable discovery of Kubernetes API groups and resources with the discovery API") - option.BindEnv(Vp, option.K8sEnableAPIDiscovery) - flags.String(option.K8sNamespaceName, "", "Name of the Kubernetes namespace in which Cilium Operator is deployed in") option.BindEnv(Vp, option.K8sNamespaceName) - flags.String(option.K8sKubeConfigPath, "", "Absolute path of the kubernetes kubeconfig file") - option.BindEnv(Vp, option.K8sKubeConfigPath) - flags.Duration(operatorOption.NodesGCInterval, 5*time.Minute, "GC interval for CiliumNodes") option.BindEnv(Vp, operatorOption.NodesGCInterval) @@ -288,12 +276,6 @@ func init() { flags.MarkHidden(option.CMDRef) option.BindEnv(Vp, option.CMDRef) - flags.Int(option.GopsPort, defaults.GopsPortOperator, "Port for gops server to listen on") - option.BindEnv(Vp, option.GopsPort) - - flags.Duration(option.K8sHeartbeatTimeout, 30*time.Second, "Configures the timeout for api-server heartbeat, set to 0 to disable") - option.BindEnv(Vp, option.K8sHeartbeatTimeout) - flags.Duration(operatorOption.LeaderElectionLeaseDuration, 15*time.Second, "Duration that non-leader operator candidates will wait before forcing to acquire leadership") option.BindEnv(Vp, operatorOption.LeaderElectionLeaseDuration) diff --git a/operator/cmd/k8s_cep_gc.go b/operator/cmd/k8s_cep_gc.go index ae9cce93205aa..55a476ea94bf2 100644 --- a/operator/cmd/k8s_cep_gc.go +++ b/operator/cmd/k8s_cep_gc.go @@ -15,8 +15,8 @@ import ( operatorOption "github.com/cilium/cilium/operator/option" "github.com/cilium/cilium/operator/watchers" "github.com/cilium/cilium/pkg/controller" - "github.com/cilium/cilium/pkg/k8s" cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1" k8sUtils "github.com/cilium/cilium/pkg/k8s/utils" "github.com/cilium/cilium/pkg/logging/logfields" @@ -32,7 +32,7 @@ import ( // delete CEP if the corresponding pod does not exist // // CiliumEndpoint objects have the same name as the pod they represent -func enableCiliumEndpointSyncGC(once bool) { +func enableCiliumEndpointSyncGC(clientset k8sClient.Clientset, once bool) { var ( controllerName = "to-k8s-ciliumendpoint-gc" scopedLog = log.WithField("controller", controllerName) @@ -40,8 +40,6 @@ func enableCiliumEndpointSyncGC(once bool) { stopCh = make(chan struct{}) ) - ciliumClient := ciliumK8sClient.CiliumV2() - if once { log.Info("Running the garbage collector only once to clean up leftover CiliumEndpoint custom resources") gcInterval = 0 @@ -51,12 +49,12 @@ func enableCiliumEndpointSyncGC(once bool) { } // This functions will block until the resources are synced with k8s. 
- watchers.CiliumEndpointsInit(ciliumClient, stopCh) + watchers.CiliumEndpointsInit(clientset, stopCh) if !once { // If we are running this function "once" it means that we // will delete all CEPs in the cluster regardless of the pod // state. - watchers.PodsInit(k8s.WatcherClient(), stopCh) + watchers.PodsInit(clientset, stopCh) } <-k8sCiliumNodesCacheSynced @@ -65,13 +63,13 @@ func enableCiliumEndpointSyncGC(once bool) { controller.ControllerParams{ RunInterval: gcInterval, DoFunc: func(ctx context.Context) error { - return doCiliumEndpointSyncGC(ctx, once, stopCh, scopedLog) + return doCiliumEndpointSyncGC(ctx, clientset, once, stopCh, scopedLog) }, }) } -func doCiliumEndpointSyncGC(ctx context.Context, once bool, stopCh chan struct{}, scopedLog *logrus.Entry) error { - ciliumClient := ciliumK8sClient.CiliumV2() +func doCiliumEndpointSyncGC(ctx context.Context, clientset k8sClient.Clientset, once bool, stopCh chan struct{}, scopedLog *logrus.Entry) error { + ciliumClient := clientset.CiliumV2() // For each CEP we fetched, check if we know about it for _, cepObj := range watchers.CiliumEndpointStore.List() { cep, ok := cepObj.(*cilium_v2.CiliumEndpoint) diff --git a/operator/cmd/k8s_identity.go b/operator/cmd/k8s_identity.go index f4259e9f1e598..53ecd88366fb6 100644 --- a/operator/cmd/k8s_identity.go +++ b/operator/cmd/k8s_identity.go @@ -18,6 +18,7 @@ import ( "github.com/cilium/cilium/operator/watchers" "github.com/cilium/cilium/pkg/controller" v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" "github.com/cilium/cilium/pkg/k8s/identitybackend" "github.com/cilium/cilium/pkg/k8s/informer" "github.com/cilium/cilium/pkg/k8s/utils" @@ -28,13 +29,13 @@ var identityStore cache.Store // deleteIdentity deletes an identity. It includes the resource version and // will error if the object has since been changed. -func deleteIdentity(ctx context.Context, identity *v2.CiliumIdentity) error { +func deleteIdentity(ctx context.Context, clientset k8sClient.Clientset, identity *v2.CiliumIdentity) error { // Wait until we can delete an identity err := identityRateLimiter.Wait(ctx) if err != nil { return err } - err = ciliumK8sClient.CiliumV2().CiliumIdentities().Delete( + err = clientset.CiliumV2().CiliumIdentities().Delete( ctx, identity.Name, metav1.DeleteOptions{ @@ -52,8 +53,8 @@ func deleteIdentity(ctx context.Context, identity *v2.CiliumIdentity) error { return err } -func updateIdentity(ctx context.Context, identity *v2.CiliumIdentity) error { - _, err := ciliumK8sClient.CiliumV2().CiliumIdentities().Update( +func updateIdentity(ctx context.Context, clientset k8sClient.Clientset, identity *v2.CiliumIdentity) error { + _, err := clientset.CiliumV2().CiliumIdentities().Update( ctx, identity, metav1.UpdateOptions{}) @@ -77,7 +78,7 @@ var ( // identityGCIteration is a single iteration of a garbage collection. 
It will // delete identities that have not had its heartbeat lifesign updated since // option.Config.IdentityHeartbeatTimeout -func identityGCIteration(ctx context.Context) { +func identityGCIteration(ctx context.Context, clientset k8sClient.Clientset) { log.Debug("Running CRD identity garbage collector") if identityStore == nil { @@ -118,7 +119,7 @@ func identityGCIteration(ctx context.Context) { } log.WithField(logfields.Identity, identity).Info("Marking identity for later deletion") identity.Annotations[identitybackend.HeartBeatAnnotation] = timeNow.Format(time.RFC3339Nano) - err := updateIdentity(ctx, identity) + err := updateIdentity(ctx, clientset, identity) if err != nil { log.WithError(err). WithField(logfields.Identity, identity). @@ -130,7 +131,7 @@ func identityGCIteration(ctx context.Context) { log.WithFields(logrus.Fields{ logfields.Identity: identity, }).Debugf("Deleting unused identity; marked for deletion at %s", ts) - if err := deleteIdentity(ctx, identity); err != nil { + if err := deleteIdentity(ctx, clientset, identity); err != nil { log.WithError(err).WithFields(logrus.Fields{ logfields.Identity: identity, }).Error("Deleting unused identity") @@ -160,7 +161,7 @@ func identityGCIteration(ctx context.Context) { identityHeartbeat.GC() } -func startCRDIdentityGC() { +func startCRDIdentityGC(clientset k8sClient.Clientset) { if operatorOption.Config.EndpointGCInterval == 0 { log.Fatal("The CiliumIdentity garbage collector requires the CiliumEndpoint garbage collector to be enabled") } @@ -171,18 +172,18 @@ func startCRDIdentityGC() { controller.ControllerParams{ RunInterval: operatorOption.Config.IdentityGCInterval, DoFunc: func(ctx context.Context) error { - identityGCIteration(ctx) + identityGCIteration(ctx, clientset) return ctx.Err() }, }) } -func startManagingK8sIdentities() { +func startManagingK8sIdentities(clientset k8sClient.Clientset) { identityHeartbeat = identity.NewIdentityHeartbeatStore(operatorOption.Config.IdentityHeartbeatTimeout) identityStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) identityInformer := informer.NewInformerWithStore( - utils.ListerWatcherFromTyped[*v2.CiliumIdentityList](ciliumK8sClient.CiliumV2().CiliumIdentities()), + utils.ListerWatcherFromTyped[*v2.CiliumIdentityList](clientset.CiliumV2().CiliumIdentities()), &v2.CiliumIdentity{}, 0, cache.ResourceEventHandlerFuncs{ diff --git a/operator/cmd/k8s_pod_controller.go b/operator/cmd/k8s_pod_controller.go index e3de67cc80255..760a261420c05 100644 --- a/operator/cmd/k8s_pod_controller.go +++ b/operator/cmd/k8s_pod_controller.go @@ -15,7 +15,7 @@ import ( operatorOption "github.com/cilium/cilium/operator/option" "github.com/cilium/cilium/operator/watchers" "github.com/cilium/cilium/pkg/controller" - "github.com/cilium/cilium/pkg/k8s" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1" "github.com/cilium/cilium/pkg/logging/logfields" ) @@ -29,10 +29,10 @@ var ( lastPodRestart = map[string]time.Time{} ) -func enableUnmanagedKubeDNSController() { +func enableUnmanagedKubeDNSController(clientset k8sClient.Clientset) { // These functions will block until the resources are synced with k8s. 
- watchers.CiliumEndpointsInit(k8s.CiliumClient().CiliumV2(), wait.NeverStop) - watchers.UnmanagedKubeDNSPodsInit(k8s.WatcherClient()) + watchers.CiliumEndpointsInit(clientset, wait.NeverStop) + watchers.UnmanagedKubeDNSPodsInit(clientset) controller.NewManager().UpdateController("restart-unmanaged-kube-dns", controller.ControllerParams{ @@ -77,7 +77,7 @@ func enableUnmanagedKubeDNSController() { } log.WithField(logfields.K8sPodName, podID).Infof("Restarting unmanaged kube-dns pod, started %s ago", age) - if err := k8s.Client().CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil { + if err := clientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil { log.WithError(err).WithField(logfields.K8sPodName, podID).Warning("Unable to restart pod") } else { lastPodRestart[podID] = time.Now() diff --git a/operator/cmd/root.go b/operator/cmd/root.go index b1f34697dab94..4169c4929d172 100644 --- a/operator/cmd/root.go +++ b/operator/cmd/root.go @@ -9,45 +9,44 @@ import ( "os" "os/signal" "path/filepath" - "sync" "sync/atomic" "time" - gops "github.com/google/gops/agent" - "golang.org/x/sys/unix" - - "github.com/cilium/cilium/operator/api" - operatorMetrics "github.com/cilium/cilium/operator/metrics" - ces "github.com/cilium/cilium/operator/pkg/ciliumendpointslice" - - "github.com/cilium/cilium/pkg/components" - ipamOption "github.com/cilium/cilium/pkg/ipam/option" - "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/client" - clientset "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned" - k8sversion "github.com/cilium/cilium/pkg/k8s/version" - "github.com/cilium/cilium/pkg/logging" - "github.com/cilium/cilium/pkg/metrics" - "github.com/cilium/cilium/pkg/pprof" - "github.com/cilium/cilium/pkg/rand" - "github.com/cilium/cilium/pkg/version" - "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "go.uber.org/fx" + "golang.org/x/sys/unix" "google.golang.org/grpc" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" + "github.com/cilium/cilium/operator/api" + operatorMetrics "github.com/cilium/cilium/operator/metrics" operatorOption "github.com/cilium/cilium/operator/option" + ces "github.com/cilium/cilium/operator/pkg/ciliumendpointslice" "github.com/cilium/cilium/operator/pkg/ingress" operatorWatchers "github.com/cilium/cilium/operator/watchers" + + "github.com/cilium/cilium/pkg/components" + "github.com/cilium/cilium/pkg/gops" + "github.com/cilium/cilium/pkg/hive" "github.com/cilium/cilium/pkg/ipam/allocator" + ipamOption "github.com/cilium/cilium/pkg/ipam/option" "github.com/cilium/cilium/pkg/k8s" + "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/client" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" + k8sversion "github.com/cilium/cilium/pkg/k8s/version" "github.com/cilium/cilium/pkg/kvstore" + "github.com/cilium/cilium/pkg/logging" "github.com/cilium/cilium/pkg/logging/logfields" + "github.com/cilium/cilium/pkg/metrics" "github.com/cilium/cilium/pkg/option" + "github.com/cilium/cilium/pkg/pprof" + "github.com/cilium/cilium/pkg/rand" "github.com/cilium/cilium/pkg/rate" + "github.com/cilium/cilium/pkg/version" ) var ( @@ -64,27 +63,12 @@ var ( genMarkdown(cobraCmd, cmdRefDir) os.Exit(0) } - - // Open socket for using gops to get stacktraces of the agent. 
- addr := fmt.Sprintf("127.0.0.1:%d", Vp.GetInt(option.GopsPort)) - addrField := logrus.Fields{"address": addr} - if err := gops.Listen(gops.Options{ - Addr: addr, - ReuseSocketAddrAndPort: true, - }); err != nil { - log.WithError(err).WithFields(addrField).Fatal("Cannot start gops server") - } - log.WithFields(addrField).Info("Started gops server") - - initEnv() - runOperator() + operatorHive.Run() }, } shutdownSignal = make(chan struct{}) - ciliumK8sClient clientset.Interface - leaderElectionResourceLockName = "cilium-operator-resource-lock" // Use a Go context so we can tell the leaderelection code when we @@ -109,7 +93,7 @@ var ( // elected leader. Otherwise, it is false. IsLeader atomic.Value - doOnce sync.Once + operatorHive *hive.Hive ) func Execute() { @@ -118,7 +102,6 @@ func Execute() { go func() { <-signals - doCleanup(0) }() if err := rootCmd.Execute(); err != nil { @@ -127,15 +110,45 @@ func Execute() { } } +var operatorCell = hive.NewCell( + "operator", + fx.Invoke(registerOperatorHooks), +) + +func registerOperatorHooks(lc fx.Lifecycle, clientset k8sClient.Clientset, shutdowner fx.Shutdowner) { + k8s.SetClients(clientset, clientset.Slim(), clientset, clientset) + initEnv() + + lc.Append(fx.Hook{ + OnStart: func(context.Context) error { + go runOperator(clientset, shutdowner) + return nil + }, + OnStop: func(context.Context) error { + doCleanup() + return nil + }, + }) +} + func init() { rootCmd.AddCommand(MetricsCmd) - Populate() + + operatorHive = hive.New( + Vp, + rootCmd.Flags(), + + gops.Cell, + k8sClient.Cell, + operatorCell, + ) } func initEnv() { // Prepopulate option.Config with options from CLI. option.Config.Populate(Vp) operatorOption.Config.Populate(Vp) + operatorAddr = Vp.GetString(operatorOption.OperatorAPIServeAddr) // add hooks after setting up metrics in the option.Confog logging.DefaultLogger.Hooks.Add(metrics.NewLoggingHook(components.CiliumOperatortName)) @@ -146,45 +159,19 @@ func initEnv() { } option.LogRegisteredOptions(Vp, log) + // Enable fallback to direct API probing to check for support of Leases in // case Discovery API fails. - //option.Config.EnableK8sLeasesFallbackDiscovery() + Vp.Set(option.K8sEnableAPIDiscovery, true) } -func initK8s(k8sInitDone chan struct{}) { - /* TODO: Fixed in next commit. - k8s.Configure( - option.Config.K8sAPIServer, - option.Config.K8sKubeConfigPath, - float32(option.Config.K8sClientQPSLimit), - option.Config.K8sClientBurst, - ) - - if err := k8s.Init(option.Config); err != nil { - log.WithError(err).Fatal("Unable to connect to Kubernetes apiserver") - }*/ +func doCleanup() { + IsLeader.Store(false) + close(shutdownSignal) - close(k8sInitDone) -} - -func doCleanup(exitCode int) { - // We run the cleanup logic only once. The operator is assumed to exit - // once the cleanup logic is executed. - doOnce.Do(func() { - IsLeader.Store(false) - gops.Close() - close(shutdownSignal) - - // Cancelling this conext here makes sure that if the operator hold the - // leader lease, it will be released. - leaderElectionCtxCancel() - - // If the exit code is set to 0, then we assume that the operator will - // exit gracefully once the lease has been released. - if exitCode != 0 { - os.Exit(exitCode) - } - }) + // Cancelling this conext here makes sure that if the operator hold the + // leader lease, it will be released. 
+ leaderElectionCtxCancel() } func getAPIServerAddr() []string { @@ -196,7 +183,7 @@ func getAPIServerAddr() []string { // checkStatus checks the connection status to the kvstore and // k8s apiserver and returns an error if any of them is unhealthy -func checkStatus() error { +func checkStatus(clientset k8sClient.Clientset) error { if kvstoreEnabled() { // We check if we are the leader here because only the leader has // access to the kvstore client. Otherwise, the kvstore client check @@ -212,7 +199,7 @@ func checkStatus() error { } } - if _, err := k8s.Client().Discovery().ServerVersion(); err != nil { + if _, err := clientset.Discovery().ServerVersion(); err != nil { return err } @@ -222,19 +209,20 @@ func checkStatus() error { // runOperator implements the logic of leader election for cilium-operator using // built-in leader election capbility in kubernetes. // See: https://github.com/kubernetes/client-go/blob/master/examples/leader-election/main.go -func runOperator() { +func runOperator(clientset k8sClient.Clientset, shutdowner fx.Shutdowner) { log.Infof("Cilium Operator %s", version.Version) - k8sInitDone := make(chan struct{}) + allSystemsGo := make(chan struct{}) IsLeader.Store(false) // Configure API server for the operator. - srv, err := api.NewServer(shutdownSignal, k8sInitDone, getAPIServerAddr()...) + srv, err := api.NewServer(shutdownSignal, allSystemsGo, getAPIServerAddr()...) if err != nil { log.WithError(err).Fatalf("Unable to create operator apiserver") } + close(allSystemsGo) go func() { - err = srv.WithStatusCheckFunc(checkStatus).StartServer() + err = srv.WithStatusCheckFunc(func() error { return checkStatus(clientset) }).StartServer() if err != nil { log.WithError(err).Fatalf("Unable to start operator apiserver") } @@ -248,8 +236,6 @@ func runOperator() { pprof.Enable(operatorOption.Config.PProfPort) } - initK8s(k8sInitDone) - capabilities := k8sversion.Capabilities() if !capabilities.MinimalVersionMet { log.Fatalf("Minimal kubernetes version not met: %s < %s", @@ -271,7 +257,7 @@ func runOperator() { // See docs on capabilities.LeasesResourceLock for more context. if !capabilities.LeasesResourceLock { log.Info("Support for coordination.k8s.io/v1 not present, fallback to non HA mode") - onOperatorStart(leaderElectionCtx) + onOperatorStart(leaderElectionCtx, clientset) return } @@ -295,7 +281,7 @@ func runOperator() { Name: leaderElectionResourceLockName, Namespace: ns, }, - Client: k8s.Client().CoordinationV1(), + Client: clientset.CoordinationV1(), LockConfig: resourcelock.ResourceLockConfig{ // Identity name of the lock holder Identity: operatorID, @@ -314,11 +300,13 @@ func runOperator() { RetryPeriod: operatorOption.Config.LeaderElectionRetryPeriod, Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: onOperatorStart, + OnStartedLeading: func(ctx context.Context) { + onOperatorStart(ctx, clientset) + }, OnStoppedLeading: func() { log.WithField("operator-id", operatorID).Info("Leader election lost") // Cleanup everything here, and exit. - doCleanup(1) + shutdowner.Shutdown() }, OnNewLeader: func(identity string) { if identity == operatorID { @@ -334,19 +322,14 @@ func runOperator() { }) } -func onOperatorStart(ctx context.Context) { - OnOperatorStartLeading(ctx) +func onOperatorStart(ctx context.Context, clientset k8sClient.Clientset) { + OnOperatorStartLeading(ctx, clientset) <-shutdownSignal // graceful exit log.Info("Received termination signal. Shutting down") } -// Populate options required by cilium-operator command line only. 
-func Populate() { - operatorAddr = Vp.GetString(operatorOption.OperatorAPIServeAddr) -} - func kvstoreEnabled() bool { if option.Config.KVStore == "" { return false @@ -359,25 +342,21 @@ func kvstoreEnabled() bool { // OnOperatorStartLeading is the function called once the operator starts leading // in HA mode. -func OnOperatorStartLeading(ctx context.Context) { +func OnOperatorStartLeading(ctx context.Context, clientset k8sClient.Clientset) { IsLeader.Store(true) - ciliumK8sClient = k8s.CiliumClient() - // If CiliumEndpointSlice feature is enabled, create CESController, start CEP watcher and run controller. if !option.Config.DisableCiliumEndpointCRD && option.Config.EnableCiliumEndpointSlice { log.Info("Create and run CES controller, start CEP watcher") // Initialize the CES controller - cesController := ces.NewCESController(k8s.CiliumClient(), + cesController := ces.NewCESController(clientset, operatorOption.Config.CESMaxCEPsInCES, operatorOption.Config.CESSlicingMode, - 0.0, 0, - /* TODO: Fixed in next commit. - option.Config.K8sClientQPSLimit, - option.Config.K8sClientBurst)*/) + float64(clientset.Config().K8sClientQPS), + clientset.Config().K8sClientBurst) stopCh := make(chan struct{}) // Start CEP watcher - operatorWatchers.CiliumEndpointsSliceInit(k8s.CiliumClient().CiliumV2(), cesController) + operatorWatchers.CiliumEndpointsSliceInit(clientset, cesController) // Start the CES controller, after current CEPs are synced locally in cache. go cesController.Run(operatorWatchers.CiliumEndpointStore, stopCh) } @@ -390,7 +369,7 @@ func OnOperatorStartLeading(ctx context.Context) { if option.Config.DisableCiliumEndpointCRD { log.Infof("KubeDNS unmanaged pods controller disabled as %q option is set to 'disabled' in Cilium ConfigMap", option.DisableCiliumEndpointCRDName) } else if operatorOption.Config.UnmanagedPodWatcherInterval != 0 { - go enableUnmanagedKubeDNSController() + go enableUnmanagedKubeDNSController(clientset) } var ( @@ -412,7 +391,7 @@ func OnOperatorStartLeading(ctx context.Context) { log.WithError(err).Fatalf("Unable to init %s allocator", ipamMode) } - nm, err := alloc.Start(ctx, &ciliumNodeUpdateImplementation{}) + nm, err := alloc.Start(ctx, &ciliumNodeUpdateImplementation{clientset}) if err != nil { log.WithError(err).Fatalf("Unable to start %s allocator", ipamMode) } @@ -422,12 +401,12 @@ func OnOperatorStartLeading(ctx context.Context) { if operatorOption.Config.BGPAnnounceLBIP { log.Info("Starting LB IP allocator") - operatorWatchers.StartLBIPAllocator(ctx, option.Config) + operatorWatchers.StartLBIPAllocator(ctx, option.Config, clientset) } if kvstoreEnabled() { if operatorOption.Config.SyncK8sServices { - operatorWatchers.StartSynchronizingServices(true, option.Config) + operatorWatchers.StartSynchronizingServices(clientset, true, option.Config) } var goopts *kvstore.ExtraOptions @@ -461,7 +440,7 @@ func OnOperatorStartLeading(ctx context.Context) { logfields.ServiceName: name, logfields.ServiceNamespace: namespace, }).Info("Retrieving service spec from k8s to perform automatic etcd service translation") - k8sSvc, err := k8s.Client().CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{}) + k8sSvc, err := clientset.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{}) switch { case err == nil: // Create another service cache that contains the @@ -517,19 +496,19 @@ func OnOperatorStartLeading(ctx context.Context) { "set-cilium-is-up-condition": operatorOption.Config.SetCiliumIsUpCondition, }).Info("Removing Cilium Node Taints or Setting Cilium Is 
Up Condition for Kubernetes Nodes") - operatorWatchers.HandleNodeTolerationAndTaints(stopCh) + operatorWatchers.HandleNodeTolerationAndTaints(clientset, stopCh) } - if err := startSynchronizingCiliumNodes(ctx, nodeManager, withKVStore); err != nil { + if err := startSynchronizingCiliumNodes(ctx, clientset, nodeManager, withKVStore); err != nil { log.WithError(err).Fatal("Unable to setup node watcher") } if operatorOption.Config.CNPNodeStatusGCInterval != 0 { - RunCNPNodeStatusGC(ciliumNodeStore) + RunCNPNodeStatusGC(clientset, ciliumNodeStore) } if operatorOption.Config.NodesGCInterval != 0 { - operatorWatchers.RunCiliumNodeGC(ctx, ciliumNodeStore, operatorOption.Config.NodesGCInterval) + operatorWatchers.RunCiliumNodeGC(ctx, clientset, ciliumNodeStore, operatorOption.Config.NodesGCInterval) } if option.Config.IPAM == ipamOption.IPAMClusterPool || option.Config.IPAM == ipamOption.IPAMClusterPoolV2 { @@ -562,10 +541,10 @@ func OnOperatorStartLeading(ctx context.Context) { log.Fatal("CRD Identity allocation mode requires k8s to be configured.") } - startManagingK8sIdentities() + startManagingK8sIdentities(clientset) if operatorOption.Config.IdentityGCInterval != 0 { - go startCRDIdentityGC() + go startCRDIdentityGC(clientset) } case option.IdentityAllocationModeKVstore: if operatorOption.Config.IdentityGCInterval != 0 { @@ -574,21 +553,21 @@ func OnOperatorStartLeading(ctx context.Context) { } if operatorOption.Config.EndpointGCInterval != 0 { - enableCiliumEndpointSyncGC(false) + enableCiliumEndpointSyncGC(clientset, false) } else { // Even if the EndpointGC is disabled we still want it to run at least // once. This is to prevent leftover CEPs from populating ipcache with // stale entries. - enableCiliumEndpointSyncGC(true) + enableCiliumEndpointSyncGC(clientset, true) } - err = enableCNPWatcher() + err = enableCNPWatcher(clientset) if err != nil { log.WithError(err).WithField(logfields.LogSubsys, "CNPWatcher").Fatal( "Cannot connect to Kubernetes apiserver ") } - err = enableCCNPWatcher() + err = enableCCNPWatcher(clientset) if err != nil { log.WithError(err).WithField(logfields.LogSubsys, "CCNPWatcher").Fatal( "Cannot connect to Kubernetes apiserver ") @@ -596,6 +575,7 @@ func OnOperatorStartLeading(ctx context.Context) { if operatorOption.Config.EnableIngressController { ingressController, err := ingress.NewIngressController( + clientset, ingress.WithHTTPSEnforced(operatorOption.Config.EnforceIngressHTTPS), ingress.WithSecretsSyncEnabled(operatorOption.Config.EnableIngressSecretsSync), ingress.WithSecretsNamespace(operatorOption.Config.IngressSecretsNamespace), diff --git a/operator/pkg/ciliumendpointslice/endpointslice.go b/operator/pkg/ciliumendpointslice/endpointslice.go index f4409c9c0ee1e..d82e647b9cfdf 100644 --- a/operator/pkg/ciliumendpointslice/endpointslice.go +++ b/operator/pkg/ciliumendpointslice/endpointslice.go @@ -15,9 +15,9 @@ import ( "github.com/cilium/cilium/operator/metrics" operatorOption "github.com/cilium/cilium/operator/option" - "github.com/cilium/cilium/pkg/k8s" "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1" capi_v2a1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" csv2 "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned/typed/cilium.io/v2" csv2a1 "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned/typed/cilium.io/v2alpha1" "github.com/cilium/cilium/pkg/k8s/informer" @@ -93,7 +93,7 @@ func GetCEPNameFromCCEP(cep *capi_v2a1.CoreCiliumEndpoint, namespace string) str } // 
NewCESController, creates and initializes the CES controller -func NewCESController(client *k8s.K8sCiliumClient, +func NewCESController(clientset k8sClient.Clientset, maxCEPsInCES int, slicingMode string, qpsLimit float64, @@ -124,7 +124,7 @@ func NewCESController(client *k8s.K8sCiliumClient, if slicingMode == cesIdentityBasedSlicing { manager = newCESManagerIdentity(rlQueue, maxCEPsInCES) } - cesStore := ciliumEndpointSliceInit(client.CiliumV2alpha1(), wait.NeverStop) + cesStore := ciliumEndpointSliceInit(clientset.CiliumV2alpha1(), wait.NeverStop) // List all existing CESs from the api-server and cache it locally. // This sync should happen before starting CEP watcher, because CEP watcher @@ -134,9 +134,9 @@ func NewCESController(client *k8s.K8sCiliumClient, // to sync existing CESs before starting a CEP watcher. syncCESsInLocalCache(cesStore, manager) return &CiliumEndpointSliceController{ - clientV2: client.CiliumV2(), - clientV2a1: client.CiliumV2alpha1(), - reconciler: newReconciler(client.CiliumV2alpha1(), manager), + clientV2: clientset.CiliumV2(), + clientV2a1: clientset.CiliumV2alpha1(), + reconciler: newReconciler(clientset.CiliumV2alpha1(), manager), Manager: manager, queue: rlQueue, ciliumEndpointSliceStore: cesStore, diff --git a/operator/pkg/ingress/endpoint.go b/operator/pkg/ingress/endpoint.go index 2cb1ccf6ea7b7..2a54dfa7f5441 100644 --- a/operator/pkg/ingress/endpoint.go +++ b/operator/pkg/ingress/endpoint.go @@ -11,7 +11,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" - "github.com/cilium/cilium/pkg/k8s" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" "github.com/cilium/cilium/pkg/k8s/informer" slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1" slim_networkingv1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/networking/v1" @@ -24,7 +24,7 @@ type endpointManager struct { maxRetries int } -func newEndpointManager(maxRetries int) (*endpointManager, error) { +func newEndpointManager(clientset k8sClient.Clientset, maxRetries int) (*endpointManager, error) { manager := &endpointManager{ maxRetries: maxRetries, } @@ -32,7 +32,7 @@ func newEndpointManager(maxRetries int) (*endpointManager, error) { // setup store and informer only for endpoints having label cilium.io/ingress manager.store, manager.informer = informer.NewInformer( utils.ListerWatcherWithModifier( - utils.ListerWatcherFromTyped[*slim_corev1.EndpointsList](k8s.WatcherClient().CoreV1().Endpoints("")), + utils.ListerWatcherFromTyped[*slim_corev1.EndpointsList](clientset.Slim().CoreV1().Endpoints("")), func(options *metav1.ListOptions) { options.LabelSelector = ciliumIngressLabelKey }), diff --git a/operator/pkg/ingress/envoy_config.go b/operator/pkg/ingress/envoy_config.go index d4d8f8df7b83e..96f8a3d85e61d 100644 --- a/operator/pkg/ingress/envoy_config.go +++ b/operator/pkg/ingress/envoy_config.go @@ -30,8 +30,8 @@ import ( "github.com/cilium/cilium/operator/pkg/ingress/annotations" "github.com/cilium/cilium/pkg/envoy" - "github.com/cilium/cilium/pkg/k8s" v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" "github.com/cilium/cilium/pkg/k8s/informer" slim_networkingv1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/networking/v1" "github.com/cilium/cilium/pkg/k8s/utils" @@ -48,14 +48,14 @@ type envoyConfigManager struct { maxRetries int } -func newEnvoyConfigManager(maxRetries int) (*envoyConfigManager, error) { +func newEnvoyConfigManager(clientset k8sClient.Clientset, maxRetries int) 
(*envoyConfigManager, error) { manager := &envoyConfigManager{ maxRetries: maxRetries, } // setup store and informer only for endpoints having label cilium.io/ingress manager.store, manager.informer = informer.NewInformer( - utils.ListerWatcherFromTyped[*v2.CiliumEnvoyConfigList](k8s.CiliumClient().CiliumV2().CiliumEnvoyConfigs(corev1.NamespaceAll)), + utils.ListerWatcherFromTyped[*v2.CiliumEnvoyConfigList](clientset.CiliumV2().CiliumEnvoyConfigs(corev1.NamespaceAll)), &v2.CiliumEnvoyConfig{}, 0, cache.ResourceEventHandlerFuncs{}, diff --git a/operator/pkg/ingress/ingress.go b/operator/pkg/ingress/ingress.go index 5fbc1f8a99fb7..f05498db37136 100644 --- a/operator/pkg/ingress/ingress.go +++ b/operator/pkg/ingress/ingress.go @@ -17,6 +17,7 @@ import ( "k8s.io/client-go/util/workqueue" "github.com/cilium/cilium/pkg/k8s" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" "github.com/cilium/cilium/pkg/k8s/informer" slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1" slim_networkingv1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/networking/v1" @@ -58,6 +59,8 @@ type ingressServiceUpdatedEvent struct { // 3. Manage synced TLS secrets in given namespace // - TLS secrets type IngressController struct { + clientset k8sClient.Clientset + ingressInformer cache.Controller ingressStore cache.Store @@ -76,7 +79,7 @@ type IngressController struct { } // NewIngressController returns a controller for ingress objects having ingressClassName as cilium -func NewIngressController(options ...Option) (*IngressController, error) { +func NewIngressController(clientset k8sClient.Clientset, options ...Option) (*IngressController, error) { opts := DefaultIngressOptions for _, opt := range options { if err := opt(&opts); err != nil { @@ -85,6 +88,7 @@ func NewIngressController(options ...Option) (*IngressController, error) { } ic := &IngressController{ + clientset: clientset, queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), maxRetries: opts.MaxRetries, enforcedHTTPS: opts.EnforcedHTTPS, @@ -93,7 +97,7 @@ func NewIngressController(options ...Option) (*IngressController, error) { lbAnnotationPrefixes: opts.LBAnnotationPrefixes, } ic.ingressStore, ic.ingressInformer = informer.NewInformer( - utils.ListerWatcherFromTyped[*slim_networkingv1.IngressList](k8s.WatcherClient().NetworkingV1().Ingresses(corev1.NamespaceAll)), + utils.ListerWatcherFromTyped[*slim_networkingv1.IngressList](clientset.Slim().NetworkingV1().Ingresses(corev1.NamespaceAll)), &slim_networkingv1.Ingress{}, 0, cache.ResourceEventHandlerFuncs{ @@ -104,19 +108,19 @@ func NewIngressController(options ...Option) (*IngressController, error) { nil, ) - serviceManager, err := newServiceManager(ic.queue, opts.MaxRetries) + serviceManager, err := newServiceManager(clientset, ic.queue, opts.MaxRetries) if err != nil { return nil, err } ic.serviceManager = serviceManager - endpointManager, err := newEndpointManager(opts.MaxRetries) + endpointManager, err := newEndpointManager(clientset, opts.MaxRetries) if err != nil { return nil, err } ic.endpointManager = endpointManager - envoyConfigManager, err := newEnvoyConfigManager(opts.MaxRetries) + envoyConfigManager, err := newEnvoyConfigManager(clientset, opts.MaxRetries) if err != nil { return nil, err } @@ -124,7 +128,7 @@ func NewIngressController(options ...Option) (*IngressController, error) { ic.secretManager = newNoOpsSecretManager() if ic.enabledSecretsSync { - secretManager, err := newSyncSecretsManager(opts.SecretsNamespace, opts.MaxRetries) + secretManager, 
err := newSyncSecretsManager(clientset, opts.SecretsNamespace, opts.MaxRetries) if err != nil { return nil, err } @@ -243,7 +247,7 @@ func (ic *IngressController) handleIngressServiceUpdatedEvent(ingressServiceUpda } newIngressStatus := getIngressForStatusUpdate(ingress, service.Status.LoadBalancer) - _, err = k8s.Client().NetworkingV1().Ingresses(ingress.Namespace).UpdateStatus(context.Background(), newIngressStatus, metav1.UpdateOptions{}) + _, err = ic.clientset.NetworkingV1().Ingresses(ingress.Namespace).UpdateStatus(context.Background(), newIngressStatus, metav1.UpdateOptions{}) if err != nil { scopedLog.WithError(err).Warn("Failed to update ingress status") return err @@ -348,7 +352,7 @@ func (ic *IngressController) createLoadBalancer(ingress *slim_networkingv1.Ingre return nil } - _, err = k8s.Client().CoreV1().Services(ingress.Namespace).Create(context.Background(), svc, metav1.CreateOptions{}) + _, err = ic.clientset.CoreV1().Services(ingress.Namespace).Create(context.Background(), svc, metav1.CreateOptions{}) if err != nil { log.WithError(err).WithField(logfields.Ingress, ingress.Name).Error("Failed to create a service for ingress") return err @@ -377,7 +381,7 @@ func (ic *IngressController) createEndpoints(ingress *slim_networkingv1.Ingress) return nil } - _, err = k8s.Client().CoreV1().Endpoints(ingress.Namespace).Create(context.Background(), endpoints, metav1.CreateOptions{}) + _, err = ic.clientset.CoreV1().Endpoints(ingress.Namespace).Create(context.Background(), endpoints, metav1.CreateOptions{}) if err != nil { log.WithError(err).WithField(logfields.Ingress, ingress.Name).Error("Failed to create endpoints for ingress") return err @@ -414,7 +418,7 @@ func (ic *IngressController) createEnvoyConfig(ingress *slim_networkingv1.Ingres // Update existing CEC newEnvoyConfig := existingEnvoyConfig.DeepCopy() newEnvoyConfig.Spec = desired.Spec - _, err = k8s.CiliumClient().CiliumV2().CiliumEnvoyConfigs(ingress.Namespace).Update(context.Background(), newEnvoyConfig, metav1.UpdateOptions{}) + _, err = ic.clientset.CiliumV2().CiliumEnvoyConfigs(ingress.Namespace).Update(context.Background(), newEnvoyConfig, metav1.UpdateOptions{}) if err != nil { scopedLog.WithError(err).Error("Failed to update CiliumEnvoyConfig for ingress") return err @@ -422,7 +426,7 @@ func (ic *IngressController) createEnvoyConfig(ingress *slim_networkingv1.Ingres scopedLog.Debug("Updated CiliumEnvoyConfig for ingress") return nil } - _, err = k8s.CiliumClient().CiliumV2().CiliumEnvoyConfigs(ingress.Namespace).Create(context.Background(), desired, metav1.CreateOptions{}) + _, err = ic.clientset.CiliumV2().CiliumEnvoyConfigs(ingress.Namespace).Create(context.Background(), desired, metav1.CreateOptions{}) if err != nil { scopedLog.WithError(err).Error("Failed to create CiliumEnvoyConfig for ingress") return err @@ -472,7 +476,7 @@ func (ic *IngressController) deleteCiliumEnvoyConfig(ingress *slim_networkingv1. scopedLog.Debug("CiliumEnvoyConfig already deleted. 
Continuing...") return nil } - err = k8s.CiliumClient().CiliumV2().CiliumEnvoyConfigs(ingress.Namespace).Delete(context.Background(), resourceName, metav1.DeleteOptions{}) + err = ic.clientset.CiliumV2().CiliumEnvoyConfigs(ingress.Namespace).Delete(context.Background(), resourceName, metav1.DeleteOptions{}) if err != nil && !k8serrors.IsNotFound(err) { scopedLog.Error("Failed to delete CiliumEnvoyConfig for ingress") return err diff --git a/operator/pkg/ingress/secret.go b/operator/pkg/ingress/secret.go index 14de44dee54aa..622c0e93d2675 100644 --- a/operator/pkg/ingress/secret.go +++ b/operator/pkg/ingress/secret.go @@ -17,6 +17,7 @@ import ( "k8s.io/client-go/util/workqueue" "github.com/cilium/cilium/pkg/k8s" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" "github.com/cilium/cilium/pkg/k8s/informer" slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1" slim_networkingv1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/networking/v1" @@ -77,18 +78,18 @@ func (n noOpsSecretManager) Run() {} func (n noOpsSecretManager) Add(event interface{}) {} // newSyncSecretsManager constructs a new secret manager instance -func newSyncSecretsManager(namespace string, maxRetries int) (secretManager, error) { +func newSyncSecretsManager(clientset k8sClient.Clientset, namespace string, maxRetries int) (secretManager, error) { manager := &syncSecretManager{ queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), namespace: namespace, maxRetries: maxRetries, - client: k8s.Client().CoreV1().Secrets(namespace), + client: clientset.CoreV1().Secrets(namespace), watchedSecretMap: map[string]string{}, } manager.store, manager.informer = informer.NewInformer( utils.ListerWatcherWithModifier( - utils.ListerWatcherFromTyped[*slim_corev1.SecretList](k8s.WatcherClient().CoreV1().Secrets(corev1.NamespaceAll)), + utils.ListerWatcherFromTyped[*slim_corev1.SecretList](clientset.Slim().CoreV1().Secrets(corev1.NamespaceAll)), // only watch TLS secret func(options *metav1.ListOptions) { options.FieldSelector = tlsFieldSelector diff --git a/operator/pkg/ingress/service.go b/operator/pkg/ingress/service.go index 9af3dfdaa708c..c2208a6cb601c 100644 --- a/operator/pkg/ingress/service.go +++ b/operator/pkg/ingress/service.go @@ -15,6 +15,7 @@ import ( "k8s.io/client-go/util/workqueue" "github.com/cilium/cilium/pkg/k8s" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" "github.com/cilium/cilium/pkg/k8s/informer" slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1" slim_networkingv1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/networking/v1" @@ -45,7 +46,7 @@ type serviceManager struct { ingressQueue workqueue.RateLimitingInterface } -func newServiceManager(ingressQueue workqueue.RateLimitingInterface, maxRetries int) (*serviceManager, error) { +func newServiceManager(clientset k8sClient.Clientset, ingressQueue workqueue.RateLimitingInterface, maxRetries int) (*serviceManager, error) { manager := &serviceManager{ queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), ingressQueue: ingressQueue, @@ -54,7 +55,7 @@ func newServiceManager(ingressQueue workqueue.RateLimitingInterface, maxRetries manager.store, manager.informer = informer.NewInformer( utils.ListerWatcherWithModifier( - utils.ListerWatcherFromTyped[*slim_corev1.ServiceList](k8s.WatcherClient().CoreV1().Services("")), + utils.ListerWatcherFromTyped[*slim_corev1.ServiceList](clientset.Slim().CoreV1().Services("")), func(options *metav1.ListOptions) { options.LabelSelector = 
ciliumIngressLabelKey }), diff --git a/operator/watchers/bgp.go b/operator/watchers/bgp.go index 5dc1cb9b71cee..5415790f2657d 100644 --- a/operator/watchers/bgp.go +++ b/operator/watchers/bgp.go @@ -7,6 +7,7 @@ import ( "context" "github.com/cilium/cilium/pkg/bgp/manager" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" "github.com/cilium/cilium/pkg/k8s/utils" "github.com/cilium/cilium/pkg/lock" ) @@ -14,7 +15,7 @@ import ( // StartLBIPAllocator starts the service watcher if it hasn't already and looks // for service of type LoadBalancer. Once it finds a service of that type, it // will try to allocate an external IP (LoadBalancerIP) for it. -func StartLBIPAllocator(ctx context.Context, cfg ServiceSyncConfiguration) { +func StartLBIPAllocator(ctx context.Context, cfg ServiceSyncConfiguration, clientset k8sClient.Clientset) { optsModifier, err := utils.GetServiceListOptionsModifier(cfg) if err != nil { log.WithError(err).Fatal("Error creating service option modifier") @@ -22,7 +23,7 @@ func StartLBIPAllocator(ctx context.Context, cfg ServiceSyncConfiguration) { swgSvcs := lock.NewStoppableWaitGroup() swgEps := lock.NewStoppableWaitGroup() - InitServiceWatcher(cfg, swgSvcs, swgEps, optsModifier) + InitServiceWatcher(cfg, clientset, swgSvcs, swgEps, optsModifier) m, err := manager.New(ctx, serviceIndexer) if err != nil { diff --git a/operator/watchers/cilium_endpoint.go b/operator/watchers/cilium_endpoint.go index 19ca31cf6a5ff..5f300dac56d38 100644 --- a/operator/watchers/cilium_endpoint.go +++ b/operator/watchers/cilium_endpoint.go @@ -16,7 +16,7 @@ import ( ces "github.com/cilium/cilium/operator/pkg/ciliumendpointslice" "github.com/cilium/cilium/pkg/k8s" cilium_api_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" - cilium_cli "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned/typed/cilium.io/v2" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" "github.com/cilium/cilium/pkg/k8s/informer" "github.com/cilium/cilium/pkg/k8s/utils" "github.com/cilium/cilium/pkg/logging/logfields" @@ -51,10 +51,10 @@ var ( ) // CiliumEndpointsSliceInit starts a CiliumEndpointWatcher and caches cesController locally. -func CiliumEndpointsSliceInit(ciliumNPClient cilium_cli.CiliumV2Interface, +func CiliumEndpointsSliceInit(clientset k8sClient.Clientset, cbController *ces.CiliumEndpointSliceController) { cesController = cbController - CiliumEndpointsInit(ciliumNPClient, wait.NeverStop) + CiliumEndpointsInit(clientset, wait.NeverStop) } // identityIndexFunc index identities by ID. 
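[Editor's note] The watcher changes in this part of the patch all follow one pattern: functions such as CiliumEndpointsSliceInit, CiliumEndpointsInit, and nodesInit now take the Clientset as a parameter and build their informers from it, instead of reaching for the package-level k8s.CiliumClient()/k8s.WatcherClient() globals. The sketch below illustrates the same idea with plain client-go rather than Cilium's informer and slim helpers; the podsInit name and the kubeconfig path are illustrative assumptions, not Cilium code.

```go
// Sketch only: an informer driven by an injected clientset instead of a
// package-level global, analogous to the watcher refactoring in this patch.
package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

// podsInit receives the clientset as a parameter rather than consulting a global.
func podsInit(clientset kubernetes.Interface, stopCh <-chan struct{}) cache.Store {
	factory := informers.NewSharedInformerFactory(clientset, 30*time.Minute)
	informer := factory.Core().V1().Pods().Informer()
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if pod, ok := obj.(*corev1.Pod); ok {
				fmt.Println("pod added:", pod.Namespace+"/"+pod.Name)
			}
		},
	})
	factory.Start(stopCh)
	// Block until the local cache is synced, like the watchers in this patch.
	cache.WaitForCacheSync(stopCh, informer.HasSynced)
	return informer.GetStore()
}

func main() {
	// Assumes a reachable cluster via the default kubeconfig (hypothetical setup).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	stopCh := make(chan struct{})
	defer close(stopCh)
	_ = podsInit(clientset, stopCh)
	time.Sleep(time.Minute) // keep the sketch alive briefly to observe events
}
```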
@@ -71,7 +71,7 @@ func identityIndexFunc(obj interface{}) ([]string, error) { } // CiliumEndpointsInit starts a CiliumEndpointWatcher -func CiliumEndpointsInit(ciliumNPClient cilium_cli.CiliumV2Interface, stopCh <-chan struct{}) { +func CiliumEndpointsInit(clientset k8sClient.Clientset, stopCh <-chan struct{}) { once.Do(func() { CiliumEndpointStore = cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, indexers) @@ -104,7 +104,7 @@ func CiliumEndpointsInit(ciliumNPClient cilium_cli.CiliumV2Interface, stopCh <-c } ciliumEndpointInformer := informer.NewInformerWithStore( - utils.ListerWatcherFromTyped[*cilium_api_v2.CiliumEndpointList](k8s.CiliumClient().CiliumV2().CiliumEndpoints("")), + utils.ListerWatcherFromTyped[*cilium_api_v2.CiliumEndpointList](clientset.CiliumV2().CiliumEndpoints("")), &cilium_api_v2.CiliumEndpoint{}, 0, cacheResourceHandler, diff --git a/operator/watchers/cilium_node_gc.go b/operator/watchers/cilium_node_gc.go index 87f8d70827268..6550c45871b2f 100644 --- a/operator/watchers/cilium_node_gc.go +++ b/operator/watchers/cilium_node_gc.go @@ -12,8 +12,8 @@ import ( "k8s.io/client-go/tools/cache" "github.com/cilium/cilium/pkg/controller" - "github.com/cilium/cilium/pkg/k8s" cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" ciliumv2 "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned/typed/cilium.io/v2" "github.com/cilium/cilium/pkg/lock" "github.com/cilium/cilium/pkg/logging/logfields" @@ -52,8 +52,8 @@ func (c *ciliumNodeGCCandidate) Delete(nodeName string) { } // RunCiliumNodeGC performs garbage collector for cilium node resource -func RunCiliumNodeGC(ctx context.Context, ciliumNodeStore cache.Store, interval time.Duration) { - nodesInit(k8s.WatcherClient(), ctx.Done()) +func RunCiliumNodeGC(ctx context.Context, clientset k8sClient.Clientset, ciliumNodeStore cache.Store, interval time.Duration) { + nodesInit(clientset.Slim(), ctx.Done()) // wait for k8s nodes synced is done select { @@ -70,7 +70,7 @@ func RunCiliumNodeGC(ctx context.Context, ciliumNodeStore cache.Store, interval controller.ControllerParams{ Context: ctx, DoFunc: func(ctx context.Context) error { - return performCiliumNodeGC(ctx, k8s.CiliumClient().CiliumV2().CiliumNodes(), ciliumNodeStore, + return performCiliumNodeGC(ctx, clientset.CiliumV2().CiliumNodes(), ciliumNodeStore, nodeGetter{}, interval, candidateStore) }, RunInterval: interval, diff --git a/operator/watchers/k8s_service_sync.go b/operator/watchers/k8s_service_sync.go index e4a918e9df690..2871f484b4b80 100644 --- a/operator/watchers/k8s_service_sync.go +++ b/operator/watchers/k8s_service_sync.go @@ -16,6 +16,7 @@ import ( "k8s.io/client-go/tools/cache" "github.com/cilium/cilium/pkg/k8s" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" "github.com/cilium/cilium/pkg/k8s/informer" slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1" slim_discover_v1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1" @@ -109,7 +110,7 @@ type ServiceSyncConfiguration interface { // 'shared' specifies whether only shared services are synchronized. If 'false' then all services // will be synchronized. For clustermesh we only need to synchronize shared services, while for // VM support we need to sync all the services. 
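The node GC change follows the same pattern: RunCiliumNodeGC now receives the clientset it uses to list and delete resources. Below is a rough analogue of such a two-phase (mark as candidate, confirm on the next tick) GC loop, written against plain client-go Nodes purely for illustration; the label used as the staleness marker is invented and not part of this patch.

    package main

    import (
        "context"
        "time"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/kubernetes/fake"
    )

    // runNodeGC periodically lists nodes and deletes the ones that are still
    // stale on the following tick, so a transiently stale node is not removed.
    func runNodeGC(ctx context.Context, client kubernetes.Interface, interval time.Duration) {
        candidates := map[string]struct{}{}

        wait.UntilWithContext(ctx, func(ctx context.Context) {
            nodes, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
            if err != nil {
                return
            }
            for _, node := range nodes.Items {
                if _, stale := node.Labels["example.io/stale"]; !stale {
                    delete(candidates, node.Name)
                    continue
                }
                if _, seen := candidates[node.Name]; !seen {
                    // First sighting: mark as a candidate and re-check next tick.
                    candidates[node.Name] = struct{}{}
                    continue
                }
                // Still stale after a full interval: delete it.
                _ = client.CoreV1().Nodes().Delete(ctx, node.Name, metav1.DeleteOptions{})
                delete(candidates, node.Name)
            }
        }, interval)
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
        defer cancel()
        runNodeGC(ctx, fake.NewSimpleClientset(), time.Second)
    }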
-func StartSynchronizingServices(shared bool, cfg ServiceSyncConfiguration) { +func StartSynchronizingServices(clientset k8sClient.Clientset, shared bool, cfg ServiceSyncConfiguration) { log.Info("Starting to synchronize k8s services to kvstore") sharedOnly = shared @@ -122,11 +123,15 @@ func StartSynchronizingServices(shared bool, cfg ServiceSyncConfiguration) { go func() { store, err := store.JoinSharedStore(store.Configuration{ - Prefix: serviceStore.ServiceStorePrefix, + Prefix: serviceStore.ServiceStorePrefix, + SynchronizationInterval: 5 * time.Minute, + SharedKeyDeleteDelay: 0, KeyCreator: func() store.Key { return &serviceStore.ClusterService{} }, - SynchronizationInterval: 5 * time.Minute, + Backend: nil, + Observer: nil, + Context: nil, }) if err != nil { @@ -142,7 +147,7 @@ func StartSynchronizingServices(shared bool, cfg ServiceSyncConfiguration) { serviceSubscribers.Register(newServiceCacheSubscriber(swgSvcs, swgEps)) - InitServiceWatcher(cfg, swgSvcs, swgEps, serviceOptsModifier) + InitServiceWatcher(cfg, clientset, swgSvcs, swgEps, serviceOptsModifier) var ( endpointController cache.Controller @@ -152,7 +157,7 @@ func StartSynchronizingServices(shared bool, cfg ServiceSyncConfiguration) { switch { case k8s.SupportsEndpointSlice(): var endpointSliceEnabled bool - endpointController, endpointSliceEnabled = endpointSlicesInit(k8s.WatcherClient(), swgEps) + endpointController, endpointSliceEnabled = endpointSlicesInit(clientset.Slim(), swgEps) // the cluster has endpoint slices so we should not check for v1.Endpoints if endpointSliceEnabled { // endpointController has been kicked off already inside @@ -167,7 +172,7 @@ func StartSynchronizingServices(shared bool, cfg ServiceSyncConfiguration) { } fallthrough default: - endpointController = endpointsInit(k8s.WatcherClient(), swgEps, serviceOptsModifier) + endpointController = endpointsInit(clientset.Slim(), swgEps, serviceOptsModifier) go endpointController.Run(wait.NeverStop) } @@ -187,6 +192,7 @@ func StartSynchronizingServices(shared bool, cfg ServiceSyncConfiguration) { // changes and push changes into ServiceCache. func InitServiceWatcher( cfg ServiceSyncConfiguration, + clientset k8sClient.Clientset, swgSvcs, swgEps *lock.StoppableWaitGroup, optsModifier func(options *v1meta.ListOptions), ) { @@ -194,7 +200,7 @@ func InitServiceWatcher( // Watch for v1.Service changes and push changes into ServiceCache. 
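InitServiceWatcher now builds its informer from the injected clientset's slim Services client. A simplified stand-alone version of that construction, using client-go's cache.ListWatch with a list-options modifier in place of Cilium's ListerWatcherWithModifier helper, might look like the sketch below; the label selector is hypothetical.

    package main

    import (
        "context"
        "fmt"
        "time"

        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/watch"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/kubernetes/fake"
        "k8s.io/client-go/tools/cache"
    )

    // newServiceInformer builds a Service informer from whatever client it is
    // handed, applying a list-options modifier to both list and watch calls.
    func newServiceInformer(client kubernetes.Interface, modify func(*metav1.ListOptions)) (cache.Store, cache.Controller) {
        lw := &cache.ListWatch{
            ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
                modify(&opts)
                return client.CoreV1().Services(metav1.NamespaceAll).List(context.Background(), opts)
            },
            WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
                modify(&opts)
                return client.CoreV1().Services(metav1.NamespaceAll).Watch(context.Background(), opts)
            },
        }
        return cache.NewInformer(lw, &corev1.Service{}, 0, cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                fmt.Println("service added:", obj.(*corev1.Service).Name)
            },
        })
    }

    func main() {
        client := fake.NewSimpleClientset()
        _, controller := newServiceInformer(client, func(opts *metav1.ListOptions) {
            opts.LabelSelector = "example.io/managed=true" // hypothetical selector
        })

        stop := make(chan struct{})
        go controller.Run(stop)
        time.Sleep(time.Second)
        close(stop)
    }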
serviceIndexer, serviceController = informer.NewInformer( utils.ListerWatcherWithModifier( - utils.ListerWatcherFromTyped[*slim_corev1.ServiceList](k8s.WatcherClient().CoreV1().Services("")), + utils.ListerWatcherFromTyped[*slim_corev1.ServiceList](clientset.Slim().CoreV1().Services("")), optsModifier), &slim_corev1.Service{}, 0, diff --git a/operator/watchers/node_taint.go b/operator/watchers/node_taint.go index d71645c2d5fce..8cb7c6777f705 100644 --- a/operator/watchers/node_taint.go +++ b/operator/watchers/node_taint.go @@ -9,25 +9,25 @@ import ( "errors" "fmt" + "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + k8sErrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sTypes "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "github.com/cilium/cilium/operator/option" "github.com/cilium/cilium/pkg/controller" "github.com/cilium/cilium/pkg/k8s" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" "github.com/cilium/cilium/pkg/k8s/informer" slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1" slim_metav1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/meta/v1" - slimclientset "github.com/cilium/cilium/pkg/k8s/slim/k8s/client/clientset/versioned" k8sUtils "github.com/cilium/cilium/pkg/k8s/utils" "github.com/cilium/cilium/pkg/logging/logfields" pkgOption "github.com/cilium/cilium/pkg/option" - - "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" - k8sErrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - k8sTypes "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" ) const ( @@ -107,13 +107,13 @@ func checkAndMarkNode(c kubernetes.Interface, nodeGetter slimNodeGetter, nodeNam } // ciliumPodsWatcher starts up a pod watcher to handle pod events. -func ciliumPodsWatcher(slimClient slimclientset.Interface, stopCh <-chan struct{}) { +func ciliumPodsWatcher(clientset k8sClient.Clientset, stopCh <-chan struct{}) { ciliumQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cilium-pod-queue") ciliumPodInformer := informer.NewInformerWithStore( k8sUtils.ListerWatcherWithModifier( k8sUtils.ListerWatcherFromTyped[*slim_corev1.PodList]( - slimClient.CoreV1().Pods(option.Config.CiliumK8sNamespace), + clientset.Slim().CoreV1().Pods(option.Config.CiliumK8sNamespace), ), func(options *metav1.ListOptions) { options.LabelSelector = option.Config.CiliumPodLabels @@ -140,7 +140,7 @@ func ciliumPodsWatcher(slimClient slimclientset.Interface, stopCh <-chan struct{ // Do not use the k8sClient provided by the nodesInit function since we // need a k8s client that can update node structures and not simply // watch for node events. 
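node_taint.go keeps its workqueue-driven workers but now hands them the injected clientset, since they need a client that can update Node objects rather than only watch them. A compact sketch of that worker shape, written against upstream client-go and patching an invented label, could look like this.

    package main

    import (
        "context"
        "fmt"

        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        k8sTypes "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/kubernetes/fake"
        "k8s.io/client-go/util/workqueue"
    )

    // processNextNode pops one node name off the queue and patches a label on
    // the node with the injected client; in the operator the queue is filled
    // by an informer's event handlers.
    func processNextNode(ctx context.Context, client kubernetes.Interface, queue workqueue.RateLimitingInterface) bool {
        key, quit := queue.Get()
        if quit {
            return false
        }
        defer queue.Done(key)

        nodeName := key.(string)
        patch := []byte(`{"metadata":{"labels":{"example.io/checked":"true"}}}`)
        _, err := client.CoreV1().Nodes().Patch(ctx, nodeName, k8sTypes.StrategicMergePatchType, patch, metav1.PatchOptions{})
        if err != nil {
            // Retry with backoff instead of dropping the item.
            queue.AddRateLimited(key)
            return true
        }
        queue.Forget(key)
        fmt.Println("patched node", nodeName)
        return true
    }

    func main() {
        client := fake.NewSimpleClientset(&corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}})
        queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node-queue")
        queue.Add("node-1")
        queue.ShutDown() // remaining items are still drained before Get reports shutdown

        for processNextNode(context.Background(), client, queue) {
        }
    }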
- for processNextCiliumPodItem(k8s.Client(), nodeGetter, ciliumQueue) { + for processNextCiliumPodItem(clientset, nodeGetter, ciliumQueue) { } }() @@ -404,20 +404,20 @@ func markNode(c kubernetes.Interface, nodeGetter slimNodeGetter, nodeName string } // HandleNodeTolerationAndTaints remove node -func HandleNodeTolerationAndTaints(stopCh <-chan struct{}) { +func HandleNodeTolerationAndTaints(clientset k8sClient.Clientset, stopCh <-chan struct{}) { mno = markNodeOptions{ RemoveNodeTaint: option.Config.RemoveCiliumNodeTaints, SetCiliumIsUpCondition: option.Config.SetCiliumIsUpCondition, } - nodesInit(k8s.WatcherClient(), stopCh) + nodesInit(clientset.Slim(), stopCh) go func() { // Do not use the k8sClient provided by the nodesInit function since we // need a k8s client that can update node structures and not simply // watch for node events. - for checkTaintForNextNodeItem(k8s.Client(), &nodeGetter{}, nodeQueue) { + for checkTaintForNextNodeItem(clientset, &nodeGetter{}, nodeQueue) { } }() - ciliumPodsWatcher(k8s.WatcherClient(), stopCh) + ciliumPodsWatcher(clientset, stopCh) } diff --git a/operator/watchers/pod.go b/operator/watchers/pod.go index c76cd20126476..bd5dac61f32d4 100644 --- a/operator/watchers/pod.go +++ b/operator/watchers/pod.go @@ -9,7 +9,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" - "github.com/cilium/cilium/pkg/k8s" + k8sClient "github.com/cilium/cilium/pkg/k8s/client" "github.com/cilium/cilium/pkg/k8s/informer" slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1" slim_metav1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/meta/v1" @@ -47,14 +47,14 @@ func podNodeNameIndexFunc(obj interface{}) ([]string, error) { return []string{}, nil } -func PodsInit(slimClient *k8s.K8sSlimClient, stopCh <-chan struct{}) { +func PodsInit(clientset k8sClient.Clientset, stopCh <-chan struct{}) { var podInformer cache.Controller PodStore = cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{ PodNodeNameIndex: podNodeNameIndexFunc, }) podInformer = informer.NewInformerWithStore( k8sUtils.ListerWatcherWithFields( - k8sUtils.ListerWatcherFromTyped[*slim_corev1.PodList](slimClient.CoreV1().Pods("")), + k8sUtils.ListerWatcherFromTyped[*slim_corev1.PodList](clientset.Slim().CoreV1().Pods("")), fields.Everything()), &slim_corev1.Pod{}, 0, @@ -120,11 +120,11 @@ func convertToPod(obj interface{}) interface{} { } } -func UnmanagedKubeDNSPodsInit(slimClient *k8s.K8sSlimClient) { +func UnmanagedKubeDNSPodsInit(clientset k8sClient.Clientset) { var unmanagedPodInformer cache.Controller UnmanagedKubeDNSPodStore, unmanagedPodInformer = informer.NewInformer( k8sUtils.ListerWatcherWithModifier( - k8sUtils.ListerWatcherFromTyped[*slim_corev1.PodList](slimClient.CoreV1().Pods("")), + k8sUtils.ListerWatcherFromTyped[*slim_corev1.PodList](clientset.Slim().CoreV1().Pods("")), func(options *metav1.ListOptions) { options.LabelSelector = "k8s-app=kube-dns" options.FieldSelector = "status.phase=Running" diff --git a/pkg/k8s/client/cell.go b/pkg/k8s/client/cell.go index fe96f4ab58d1d..3c1c0426adddb 100644 --- a/pkg/k8s/client/cell.go +++ b/pkg/k8s/client/cell.go @@ -421,13 +421,13 @@ type FakeClientset struct { *CiliumFakeClientset *APIExtFakeClientset - SlimClientset *SlimFakeClientset + SlimFakeClientset *SlimFakeClientset } var _ Clientset = &FakeClientset{} func (c *FakeClientset) Slim() slim_clientset.Interface { - return c.SlimClientset + return c.SlimFakeClientset } func (c *FakeClientset) Discovery() discovery.DiscoveryInterface { @@ 
-448,7 +448,7 @@ func (c *FakeClientset) Config() Config { func NewFakeClientset() (*FakeClientset, Clientset) { client := FakeClientset{ - SlimClientset: slim_fake.NewSimpleClientset(), + SlimFakeClientset: slim_fake.NewSimpleClientset(), CiliumFakeClientset: cilium_fake.NewSimpleClientset(), APIExtFakeClientset: apiext_fake.NewSimpleClientset(), KubernetesFakeClientset: fake.NewSimpleClientset(), diff --git a/pkg/k8s/client/config.go b/pkg/k8s/client/config.go index d3e893e5f3fee..f8fda8fe7c717 100644 --- a/pkg/k8s/client/config.go +++ b/pkg/k8s/client/config.go @@ -39,10 +39,15 @@ func (cfg Config) CellFlags(flags *pflag.FlagSet) { flags.Bool(option.K8sEnableAPIDiscovery, defaults.K8sEnableAPIDiscovery, "Enable discovery of Kubernetes API groups and resources with the discovery API") } +// K8sAPIDiscoveryEnabled returns true if API discovery of API groups and +// resources is enabled func (cfg Config) K8sAPIDiscoveryEnabled() bool { return cfg.EnableK8sAPIDiscovery } +// K8sLeasesFallbackDiscoveryEnabled returns true if we should fallback to direct API +// probing when checking for support of Leases in case Discovery API fails to discover +// required groups. func (cfg Config) K8sLeasesFallbackDiscoveryEnabled() bool { return cfg.K8sAPIDiscoveryEnabled() } diff --git a/pkg/option/config.go b/pkg/option/config.go index 82fa9a137d3a9..7cc5fa4b70f84 100644 --- a/pkg/option/config.go +++ b/pkg/option/config.go @@ -2113,8 +2113,6 @@ type DaemonConfig struct { // map. SizeofSockRevElement int - K8sEnableAPIDiscovery bool - // k8sEnableLeasesFallbackDiscovery enables k8s to fallback to API probing to check // for the support of Leases in Kubernetes when there is an error in discovering // API groups using Discovery API. @@ -2276,7 +2274,6 @@ var ( AllowICMPFragNeeded: defaults.AllowICMPFragNeeded, EnableWellKnownIdentities: defaults.EnableWellKnownIdentities, K8sEnableK8sEndpointSlice: defaults.K8sEnableEndpointSlice, - K8sEnableAPIDiscovery: defaults.K8sEnableAPIDiscovery, AllocatorListTimeout: defaults.AllocatorListTimeout, EnableICMPRules: defaults.EnableICMPRules, @@ -2519,24 +2516,11 @@ func (c *DaemonConfig) AgentNotReadyNodeTaintValue() string { } } -// K8sAPIDiscoveryEnabled returns true if API discovery of API groups and -// resources is enabled -func (c *DaemonConfig) K8sAPIDiscoveryEnabled() bool { - return c.K8sEnableAPIDiscovery -} - // K8sIngressControllerEnabled returns true if ingress controller feature is enabled in Cilium func (c *DaemonConfig) K8sIngressControllerEnabled() bool { return c.EnableIngressController } -// K8sLeasesFallbackDiscoveryEnabled returns true if we should fallback to direct API -// probing when checking for support of Leases in case Discovery API fails to discover -// required groups. -func (c *DaemonConfig) K8sLeasesFallbackDiscoveryEnabled() bool { - return c.K8sEnableAPIDiscovery -} - // DirectRoutingDeviceRequired return whether the Direct Routing Device is needed under // the current configuration. func (c *DaemonConfig) DirectRoutingDeviceRequired() bool { @@ -2546,12 +2530,6 @@ func (c *DaemonConfig) DirectRoutingDeviceRequired() bool { return (c.EnableNodePort || BPFHostRoutingEnabled) && !c.TunnelingEnabled() } -// EnableK8sLeasesFallbackDiscovery enables using direct API probing as a fallback to check -// for the support of Leases when discovering API groups is not possible. 
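The API-discovery settings move off the global DaemonConfig and onto the k8s-client cell's own Config, which registers its flags and exposes accessor methods. As a rough illustration of that per-cell config pattern: the flag name, struct, and accessor below are invented for the example, and the real cell is populated through hive/viper rather than by parsing flags directly.

    package main

    import (
        "fmt"

        "github.com/spf13/pflag"
    )

    // Config is a minimal stand-in for a per-cell configuration struct: the
    // cell declares its own flags and exposes accessors, so the setting no
    // longer needs to live on a global daemon configuration.
    type Config struct {
        EnableAPIDiscovery bool
    }

    // Flags registers the cell's flags on the given flag set.
    func (cfg *Config) Flags(flags *pflag.FlagSet) {
        flags.BoolVar(&cfg.EnableAPIDiscovery, "enable-api-discovery", false,
            "Enable discovery of Kubernetes API groups and resources")
    }

    // APIDiscoveryEnabled reports whether API discovery was requested.
    func (cfg *Config) APIDiscoveryEnabled() bool {
        return cfg.EnableAPIDiscovery
    }

    func main() {
        cfg := &Config{}
        flags := pflag.NewFlagSet("k8s-client", pflag.ContinueOnError)
        cfg.Flags(flags)

        // Simulate command-line input.
        _ = flags.Parse([]string{"--enable-api-discovery=true"})
        fmt.Println("discovery enabled:", cfg.APIDiscoveryEnabled())
    }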
-func (c *DaemonConfig) EnableK8sLeasesFallbackDiscovery() { - c.K8sEnableAPIDiscovery = true -} - func (c *DaemonConfig) validateIPv6ClusterAllocCIDR() error { ip, cidr, err := net.ParseCIDR(c.IPv6ClusterAllocCIDR) if err != nil { @@ -2856,7 +2834,6 @@ func (c *DaemonConfig) Populate(vp *viper.Viper) { c.IPv6ServiceRange = vp.GetString(IPv6ServiceRange) c.JoinCluster = vp.GetBool(JoinClusterName) c.K8sEnableK8sEndpointSlice = vp.GetBool(K8sEnableEndpointSlice) - c.K8sEnableAPIDiscovery = vp.GetBool(K8sEnableAPIDiscovery) c.K8sRequireIPv4PodCIDR = vp.GetBool(K8sRequireIPv4PodCIDRName) c.K8sRequireIPv6PodCIDR = vp.GetBool(K8sRequireIPv6PodCIDRName) c.K8sServiceCacheSize = uint(vp.GetInt(K8sServiceCacheSize)) diff --git a/test/controlplane/suite/agent.go b/test/controlplane/suite/agent.go index 38e998cffd677..74a90f8bcd64d 100644 --- a/test/controlplane/suite/agent.go +++ b/test/controlplane/suite/agent.go @@ -9,14 +9,9 @@ import ( "io/ioutil" "os" - fakeApiExt "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" - "k8s.io/client-go/kubernetes/fake" - "github.com/cilium/cilium/daemon/cmd" fakeDatapath "github.com/cilium/cilium/pkg/datapath/fake" "github.com/cilium/cilium/pkg/endpoint" - fakeCilium "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned/fake" - fakeSlim "github.com/cilium/cilium/pkg/k8s/slim/k8s/client/clientset/versioned/fake" agentOption "github.com/cilium/cilium/pkg/option" ) @@ -34,7 +29,7 @@ func (h *agentHandle) tearDown() { os.RemoveAll(h.tempDir) } -func startCiliumAgent(nodeName string, clients fakeClients) (*fakeDatapath.FakeDatapath, agentHandle, error) { +func startCiliumAgent(nodeName string) (*fakeDatapath.FakeDatapath, agentHandle, error) { var handle agentHandle handle.tempDir = setupTestDirectories() @@ -85,10 +80,3 @@ func (k8sConfig) K8sAPIDiscoveryEnabled() bool { func (k8sConfig) K8sLeasesFallbackDiscoveryEnabled() bool { return false } - -type fakeClients struct { - core *fake.Clientset - slim *fakeSlim.Clientset - cilium *fakeCilium.Clientset - apiext *fakeApiExt.Clientset -} diff --git a/test/controlplane/suite/testcase.go b/test/controlplane/suite/testcase.go index 2df1e91f0add5..cbfae3d764b51 100644 --- a/test/controlplane/suite/testcase.go +++ b/test/controlplane/suite/testcase.go @@ -14,7 +14,6 @@ import ( corev1 "k8s.io/api/core/v1" discov1 "k8s.io/api/discovery/v1" discov1beta1 "k8s.io/api/discovery/v1beta1" - fakeApiExt "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -24,7 +23,6 @@ import ( versionapi "k8s.io/apimachinery/pkg/version" "k8s.io/apimachinery/pkg/watch" fakediscovery "k8s.io/client-go/discovery/fake" - "k8s.io/client-go/kubernetes/fake" k8sTesting "k8s.io/client-go/testing" operatorOption "github.com/cilium/cilium/operator/option" @@ -39,11 +37,11 @@ import ( ipamOption "github.com/cilium/cilium/pkg/ipam/option" "github.com/cilium/cilium/pkg/k8s" cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" - fakeCilium "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned/fake" slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1" - fakeSlim "github.com/cilium/cilium/pkg/k8s/slim/k8s/client/clientset/versioned/fake" "github.com/cilium/cilium/pkg/node/types" "github.com/cilium/cilium/pkg/proxy" + + k8sClient "github.com/cilium/cilium/pkg/k8s/client" ) const ( @@ -58,7 +56,7 @@ type trackerAndDecoder struct { type ControlPlaneTest struct 
{ t *testing.T nodeName string - clients fakeClients + clients *k8sClient.FakeClientset trackers []trackerAndDecoder agentHandle *agentHandle operatorHandle *operatorHandle @@ -66,28 +64,27 @@ type ControlPlaneTest struct { } func NewControlPlaneTest(t *testing.T, nodeName string, k8sVersion string) *ControlPlaneTest { - clients := fakeClients{ - core: addFieldSelection(fake.NewSimpleClientset()), - slim: addFieldSelection(fakeSlim.NewSimpleClientset()), - cilium: addFieldSelection(fakeCilium.NewSimpleClientset()), - apiext: addFieldSelection(fakeApiExt.NewSimpleClientset()), - } - fd := clients.core.Discovery().(*fakediscovery.FakeDiscovery) + clients, _ := k8sClient.NewFakeClientset() + clients.KubernetesFakeClientset = addFieldSelection(clients.KubernetesFakeClientset) + clients.SlimFakeClientset = addFieldSelection(clients.SlimFakeClientset) + clients.CiliumFakeClientset = addFieldSelection(clients.CiliumFakeClientset) + clients.APIExtFakeClientset = addFieldSelection(clients.APIExtFakeClientset) + fd := clients.KubernetesFakeClientset.Discovery().(*fakediscovery.FakeDiscovery) fd.FakedServerVersion = toVersionInfo(k8sVersion) resources, ok := apiResources[k8sVersion] if !ok { panic(fmt.Sprintf("k8s version %s not found in apiResources", k8sVersion)) } - clients.core.Resources = resources - clients.slim.Resources = resources - clients.cilium.Resources = resources - clients.apiext.Resources = resources + clients.KubernetesFakeClientset.Resources = resources + clients.SlimFakeClientset.Resources = resources + clients.CiliumFakeClientset.Resources = resources + clients.APIExtFakeClientset.Resources = resources trackers := []trackerAndDecoder{ - {clients.core.Tracker(), coreDecoder}, - {clients.slim.Tracker(), slimDecoder}, - {clients.cilium.Tracker(), ciliumDecoder}, + {clients.KubernetesFakeClientset.Tracker(), coreDecoder}, + {clients.SlimFakeClientset.Tracker(), slimDecoder}, + {clients.CiliumFakeClientset.Tracker(), ciliumDecoder}, } return &ControlPlaneTest{ @@ -106,8 +103,8 @@ func (cpt *ControlPlaneTest) SetupEnvironment(modConfig func(*agentOption.Daemon // Configure k8s and perform capability detection with the fake client. k8s.Configure("dummy", "dummy", 10.0, 10) - version.Update(cpt.clients.core, &k8sConfig{}) - k8s.SetClients(cpt.clients.core, cpt.clients.slim, cpt.clients.cilium, cpt.clients.apiext) + version.Update(cpt.clients, &k8sConfig{}) + k8s.SetClients(cpt.clients, cpt.clients.Slim(), cpt.clients, cpt.clients) proxy.DefaultDNSProxy = fqdnproxy.MockFQDNProxy{} @@ -142,7 +139,7 @@ func (cpt *ControlPlaneTest) StartAgent() *ControlPlaneTest { if cpt.agentHandle != nil { cpt.t.Fatal("StartAgent() already called") } - datapath, agentHandle, err := startCiliumAgent(cpt.nodeName, cpt.clients) + datapath, agentHandle, err := startCiliumAgent(cpt.nodeName) if err != nil { cpt.t.Fatalf("Failed to start cilium agent: %s", err) } @@ -170,7 +167,7 @@ func (cpt *ControlPlaneTest) StartOperator() *ControlPlaneTest { cpt.operatorHandle = &operatorHandle{ cancel: cancel, } - operatorCmd.OnOperatorStartLeading(context) + operatorCmd.OnOperatorStartLeading(context, cpt.clients) return cpt }
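The control-plane test suite drops its ad-hoc fakeClients bundle in favour of k8sClient.NewFakeClientset(), whose per-API fake clientsets expose the usual object trackers and fake discovery. The same idea, reduced to upstream client-go's fake clientset so the example is self-contained, looks roughly like this.

    package main

    import (
        "context"
        "fmt"

        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/version"
        fakediscovery "k8s.io/client-go/discovery/fake"
        "k8s.io/client-go/kubernetes/fake"
    )

    func main() {
        // A fake clientset plays the role the agent/operator normally gets
        // from the k8s-client cell; tests seed it through its object tracker.
        clients := fake.NewSimpleClientset()

        // Fake the server version the same way the control-plane tests do.
        fd := clients.Discovery().(*fakediscovery.FakeDiscovery)
        fd.FakedServerVersion = &version.Info{Major: "1", Minor: "24", GitVersion: "v1.24.0"}

        // Seed an object via the tracker, then read it back through the API.
        _ = clients.Tracker().Add(&corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}})

        node, err := clients.CoreV1().Nodes().Get(context.Background(), "node-1", metav1.GetOptions{})
        if err != nil {
            panic(err)
        }
        v, _ := clients.Discovery().ServerVersion()
        fmt.Println("got", node.Name, "from a cluster reporting", v.GitVersion)
    }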