Skip to content

Commit

Permalink
operator: Use hive and the k8s-client
Browse files Browse the repository at this point in the history
This switches the operator to use the k8-client provided via the k8s-client cell
instead of going via the global variables in pkg/k8s. Code is refactored to pass
around the clientset.

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki authored and pchaigno committed Sep 15, 2022
1 parent fc41aeb commit af61d36
Show file tree
Hide file tree
Showing 30 changed files with 258 additions and 312 deletions.
2 changes: 1 addition & 1 deletion Documentation/cmdref/cilium-operator-alibabacloud.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Documentation/cmdref/cilium-operator-aws.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Documentation/cmdref/cilium-operator-azure.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Documentation/cmdref/cilium-operator-generic.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Documentation/cmdref/cilium-operator.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions operator/cmd/ccnp_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/k8s"
cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/k8s/informer"
"github.com/cilium/cilium/pkg/k8s/utils"
"github.com/cilium/cilium/pkg/k8s/watchers/resources"
Expand All @@ -31,7 +32,7 @@ func k8sEventMetric(scope, action string) {
// clusterwide policies. Since, internally Clusterwide policies are implemented
// using CiliumNetworkPolicy itself, the entire implementation uses the methods
// associcated with CiliumNetworkPolicy.
func enableCCNPWatcher() error {
func enableCCNPWatcher(clientset k8sClient.Clientset) error {
enableCNPStatusUpdates := kvstoreEnabled() && option.Config.K8sEventHandover && !option.Config.DisableCNPStatusUpdates
if enableCNPStatusUpdates {
log.Info("Starting a CCNP Status handover from kvstore to k8s")
Expand Down Expand Up @@ -63,7 +64,7 @@ func enableCCNPWatcher() error {
}

ciliumV2Controller := informer.NewInformerWithStore(
utils.ListerWatcherFromTyped[*cilium_v2.CiliumClusterwideNetworkPolicyList](ciliumK8sClient.CiliumV2().CiliumClusterwideNetworkPolicies()),
utils.ListerWatcherFromTyped[*cilium_v2.CiliumClusterwideNetworkPolicyList](clientset.CiliumV2().CiliumClusterwideNetworkPolicies()),
&cilium_v2.CiliumClusterwideNetworkPolicy{},
0,
cache.ResourceEventHandlerFuncs{
Expand Down
31 changes: 17 additions & 14 deletions operator/cmd/cilium_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cilium/cilium/pkg/ipam/allocator"
"github.com/cilium/cilium/pkg/k8s"
cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
v2 "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned/typed/cilium.io/v2"
"github.com/cilium/cilium/pkg/k8s/informer"
v1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/meta/v1"
Expand Down Expand Up @@ -50,7 +51,7 @@ var (
k8sCiliumNodesCacheSynced = make(chan struct{})
)

func startSynchronizingCiliumNodes(ctx context.Context, nodeManager allocator.NodeEventHandler, withKVStore bool) error {
func startSynchronizingCiliumNodes(ctx context.Context, clientset k8sClient.Clientset, nodeManager allocator.NodeEventHandler, withKVStore bool) error {
var (
ciliumNodeKVStore *store.SharedStore
err error
Expand Down Expand Up @@ -208,7 +209,7 @@ func startSynchronizingCiliumNodes(ctx context.Context, nodeManager allocator.No
// introducing a slim version of it.
var ciliumNodeInformer cache.Controller
ciliumNodeStore, ciliumNodeInformer = informer.NewInformer(
utils.ListerWatcherFromTyped[*cilium_v2.CiliumNodeList](ciliumK8sClient.CiliumV2().CiliumNodes()),
utils.ListerWatcherFromTyped[*cilium_v2.CiliumNodeList](clientset.CiliumV2().CiliumNodes()),
&cilium_v2.CiliumNode{},
0,
resourceEventHandler,
Expand Down Expand Up @@ -313,39 +314,41 @@ func processNextWorkItem(queue workqueue.RateLimitingInterface, syncHandler func
return true
}

type ciliumNodeUpdateImplementation struct{}
type ciliumNodeUpdateImplementation struct {
clientset k8sClient.Clientset
}

func (c *ciliumNodeUpdateImplementation) Create(node *cilium_v2.CiliumNode) (*cilium_v2.CiliumNode, error) {
return ciliumK8sClient.CiliumV2().CiliumNodes().Create(context.TODO(), node, meta_v1.CreateOptions{})
return c.clientset.CiliumV2().CiliumNodes().Create(context.TODO(), node, meta_v1.CreateOptions{})
}

func (c *ciliumNodeUpdateImplementation) Get(node string) (*cilium_v2.CiliumNode, error) {
return ciliumK8sClient.CiliumV2().CiliumNodes().Get(context.TODO(), node, meta_v1.GetOptions{})
return c.clientset.CiliumV2().CiliumNodes().Get(context.TODO(), node, meta_v1.GetOptions{})
}

func (c *ciliumNodeUpdateImplementation) UpdateStatus(origNode, node *cilium_v2.CiliumNode) (*cilium_v2.CiliumNode, error) {
if origNode == nil || !origNode.Status.DeepEqual(&node.Status) {
return ciliumK8sClient.CiliumV2().CiliumNodes().UpdateStatus(context.TODO(), node, meta_v1.UpdateOptions{})
return c.clientset.CiliumV2().CiliumNodes().UpdateStatus(context.TODO(), node, meta_v1.UpdateOptions{})
}
return nil, nil
}

func (c *ciliumNodeUpdateImplementation) Update(origNode, node *cilium_v2.CiliumNode) (*cilium_v2.CiliumNode, error) {
if origNode == nil || !origNode.Spec.DeepEqual(&node.Spec) {
return ciliumK8sClient.CiliumV2().CiliumNodes().Update(context.TODO(), node, meta_v1.UpdateOptions{})
return c.clientset.CiliumV2().CiliumNodes().Update(context.TODO(), node, meta_v1.UpdateOptions{})
}
return nil, nil
}

func RunCNPNodeStatusGC(nodeStore cache.Store) {
go runCNPNodeStatusGC("cnp-node-gc", false, nodeStore)
go runCNPNodeStatusGC("ccnp-node-gc", true, nodeStore)
func RunCNPNodeStatusGC(clientset k8sClient.Clientset, nodeStore cache.Store) {
go runCNPNodeStatusGC("cnp-node-gc", false, clientset, nodeStore)
go runCNPNodeStatusGC("ccnp-node-gc", true, clientset, nodeStore)
}

// runCNPNodeStatusGC runs the node status garbage collector for cilium network
// policies. The policy corresponds to CiliumClusterwideNetworkPolicy if the clusterwide
// parameter is true and CiliumNetworkPolicy otherwise.
func runCNPNodeStatusGC(name string, clusterwide bool, nodeStore cache.Store) {
func runCNPNodeStatusGC(name string, clusterwide bool, clientset k8sClient.Clientset, nodeStore cache.Store) {
parallelRequests := 4
removeNodeFromCNP := make(chan func(), 50)
for i := 0; i < parallelRequests; i++ {
Expand All @@ -369,7 +372,7 @@ func runCNPNodeStatusGC(name string, clusterwide bool, nodeStore cache.Store) {
var cnpItemsList []cilium_v2.CiliumNetworkPolicy

if clusterwide {
ccnpList, err := ciliumK8sClient.CiliumV2().CiliumClusterwideNetworkPolicies().List(ctx,
ccnpList, err := clientset.CiliumV2().CiliumClusterwideNetworkPolicies().List(ctx,
meta_v1.ListOptions{
Limit: 10,
Continue: continueID,
Expand All @@ -386,7 +389,7 @@ func runCNPNodeStatusGC(name string, clusterwide bool, nodeStore cache.Store) {
}
continueID = ccnpList.Continue
} else {
cnpList, err := ciliumK8sClient.CiliumV2().CiliumNetworkPolicies(core_v1.NamespaceAll).List(ctx,
cnpList, err := clientset.CiliumV2().CiliumNetworkPolicies(core_v1.NamespaceAll).List(ctx,
meta_v1.ListOptions{
Limit: 10,
Continue: continueID,
Expand Down Expand Up @@ -421,7 +424,7 @@ func runCNPNodeStatusGC(name string, clusterwide bool, nodeStore cache.Store) {
wg.Add(1)
cnpCpy := cnp.DeepCopy()
removeNodeFromCNP <- func() {
updateCNP(ciliumK8sClient.CiliumV2(), cnpCpy, nodesToDelete)
updateCNP(clientset.CiliumV2(), cnpCpy, nodesToDelete)
wg.Done()
}
}
Expand Down
5 changes: 3 additions & 2 deletions operator/cmd/cnp_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/k8s"
cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/k8s/informer"
"github.com/cilium/cilium/pkg/k8s/utils"
"github.com/cilium/cilium/pkg/k8s/watchers/resources"
Expand All @@ -36,7 +37,7 @@ func init() {

// enableCNPWatcher waits for the CiliumNetowrkPolicy CRD availability and then
// garbage collects stale CiliumNetowrkPolicy status field entries.
func enableCNPWatcher() error {
func enableCNPWatcher(clientset k8sClient.Clientset) error {
enableCNPStatusUpdates := kvstoreEnabled() && option.Config.K8sEventHandover && !option.Config.DisableCNPStatusUpdates
if enableCNPStatusUpdates {
log.Info("Starting CNP Status handover from kvstore to k8s")
Expand Down Expand Up @@ -68,7 +69,7 @@ func enableCNPWatcher() error {
}

ciliumV2Controller := informer.NewInformerWithStore(
utils.ListerWatcherFromTyped[*cilium_v2.CiliumNetworkPolicyList](ciliumK8sClient.CiliumV2().CiliumNetworkPolicies("")),
utils.ListerWatcherFromTyped[*cilium_v2.CiliumNetworkPolicyList](clientset.CiliumV2().CiliumNetworkPolicies("")),
&cilium_v2.CiliumNetworkPolicy{},
0,
cache.ResourceEventHandlerFuncs{
Expand Down
18 changes: 0 additions & 18 deletions operator/cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,24 +239,12 @@ func init() {
option.KVStoreOpt, "Key-value store options e.g. etcd.address=127.0.0.1:4001")
option.BindEnv(Vp, option.KVStoreOpt)

flags.String(option.K8sAPIServer, "", "Kubernetes API server URL")
option.BindEnv(Vp, option.K8sAPIServer)

flags.Float32(option.K8sClientQPSLimit, defaults.K8sClientQPSLimit, "Queries per second limit for the K8s client")
flags.Int(option.K8sClientBurst, defaults.K8sClientBurst, "Burst value allowed for the K8s client")

flags.Bool(option.K8sEnableEndpointSlice, defaults.K8sEnableEndpointSlice, "Enables k8s EndpointSlice feature into Cilium-Operator if the k8s cluster supports it")
option.BindEnv(Vp, option.K8sEnableEndpointSlice)

flags.Bool(option.K8sEnableAPIDiscovery, defaults.K8sEnableAPIDiscovery, "Enable discovery of Kubernetes API groups and resources with the discovery API")
option.BindEnv(Vp, option.K8sEnableAPIDiscovery)

flags.String(option.K8sNamespaceName, "", "Name of the Kubernetes namespace in which Cilium Operator is deployed in")
option.BindEnv(Vp, option.K8sNamespaceName)

flags.String(option.K8sKubeConfigPath, "", "Absolute path of the kubernetes kubeconfig file")
option.BindEnv(Vp, option.K8sKubeConfigPath)

flags.Duration(operatorOption.NodesGCInterval, 5*time.Minute, "GC interval for CiliumNodes")
option.BindEnv(Vp, operatorOption.NodesGCInterval)

Expand Down Expand Up @@ -288,12 +276,6 @@ func init() {
flags.MarkHidden(option.CMDRef)
option.BindEnv(Vp, option.CMDRef)

flags.Int(option.GopsPort, defaults.GopsPortOperator, "Port for gops server to listen on")
option.BindEnv(Vp, option.GopsPort)

flags.Duration(option.K8sHeartbeatTimeout, 30*time.Second, "Configures the timeout for api-server heartbeat, set to 0 to disable")
option.BindEnv(Vp, option.K8sHeartbeatTimeout)

flags.Duration(operatorOption.LeaderElectionLeaseDuration, 15*time.Second,
"Duration that non-leader operator candidates will wait before forcing to acquire leadership")
option.BindEnv(Vp, operatorOption.LeaderElectionLeaseDuration)
Expand Down
16 changes: 7 additions & 9 deletions operator/cmd/k8s_cep_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
operatorOption "github.com/cilium/cilium/operator/option"
"github.com/cilium/cilium/operator/watchers"
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/k8s"
cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
k8sUtils "github.com/cilium/cilium/pkg/k8s/utils"
"github.com/cilium/cilium/pkg/logging/logfields"
Expand All @@ -32,16 +32,14 @@ import (
// delete CEP if the corresponding pod does not exist
//
// CiliumEndpoint objects have the same name as the pod they represent
func enableCiliumEndpointSyncGC(once bool) {
func enableCiliumEndpointSyncGC(clientset k8sClient.Clientset, once bool) {
var (
controllerName = "to-k8s-ciliumendpoint-gc"
scopedLog = log.WithField("controller", controllerName)
gcInterval time.Duration
stopCh = make(chan struct{})
)

ciliumClient := ciliumK8sClient.CiliumV2()

if once {
log.Info("Running the garbage collector only once to clean up leftover CiliumEndpoint custom resources")
gcInterval = 0
Expand All @@ -51,12 +49,12 @@ func enableCiliumEndpointSyncGC(once bool) {
}

// This functions will block until the resources are synced with k8s.
watchers.CiliumEndpointsInit(ciliumClient, stopCh)
watchers.CiliumEndpointsInit(clientset, stopCh)
if !once {
// If we are running this function "once" it means that we
// will delete all CEPs in the cluster regardless of the pod
// state.
watchers.PodsInit(k8s.WatcherClient(), stopCh)
watchers.PodsInit(clientset, stopCh)
}
<-k8sCiliumNodesCacheSynced

Expand All @@ -65,13 +63,13 @@ func enableCiliumEndpointSyncGC(once bool) {
controller.ControllerParams{
RunInterval: gcInterval,
DoFunc: func(ctx context.Context) error {
return doCiliumEndpointSyncGC(ctx, once, stopCh, scopedLog)
return doCiliumEndpointSyncGC(ctx, clientset, once, stopCh, scopedLog)
},
})
}

func doCiliumEndpointSyncGC(ctx context.Context, once bool, stopCh chan struct{}, scopedLog *logrus.Entry) error {
ciliumClient := ciliumK8sClient.CiliumV2()
func doCiliumEndpointSyncGC(ctx context.Context, clientset k8sClient.Clientset, once bool, stopCh chan struct{}, scopedLog *logrus.Entry) error {
ciliumClient := clientset.CiliumV2()
// For each CEP we fetched, check if we know about it
for _, cepObj := range watchers.CiliumEndpointStore.List() {
cep, ok := cepObj.(*cilium_v2.CiliumEndpoint)
Expand Down
Loading

0 comments on commit af61d36

Please sign in to comment.