From 96a8ffafdb84b9bf1963e063178c0bca753bf731 Mon Sep 17 00:00:00 2001 From: mvaal Date: Fri, 10 Mar 2023 15:16:19 -0600 Subject: [PATCH] Replace `openpolicyagent.org/kube-mgmt-retries` annotation with in-memory map --- pkg/configmap/configmap.go | 60 ++++++++++++++++++-------------------- 1 file changed, 29 insertions(+), 31 deletions(-) diff --git a/pkg/configmap/configmap.go b/pkg/configmap/configmap.go index 4ae24147..6ed2270e 100644 --- a/pkg/configmap/configmap.go +++ b/pkg/configmap/configmap.go @@ -12,7 +12,6 @@ import ( "sort" "strings" "time" - "strconv" "github.com/open-policy-agent/kube-mgmt/pkg/opa" "github.com/sirupsen/logrus" @@ -31,9 +30,8 @@ import ( ) const ( - defaultRetries = 2 + defaultRetries = 2 statusAnnotationKey = "openpolicyagent.org/kube-mgmt-status" - retriesAnnotationKey = "openpolicyagent.org/kube-mgmt-retries" // Special namespace in Kubernetes federation that holds scheduling policies. // commented because staticcheck: 'const kubeFederationSchedulingPolicy is unused (U1000)' // kubeFederationSchedulingPolicy = "kube-federation-scheduling-policy" @@ -42,8 +40,12 @@ const ( syncResetBackoffMax = time.Second * 30 ) +var ( + kubeMgmtRetries = make(map[string]int) +) + // Label validator -func CustomLabel(key, value string) error { +func CustomLabel(key, value string) error { _, err := labels.NewRequirement(key, selection.Equals, []string{value}) if err != nil { return err @@ -135,7 +137,7 @@ func (s *Sync) Run(namespaces []string) (chan struct{}, error) { } quit := make(chan struct{}) - logrus.Infof("Policy/data ConfigMap processor connected to K8s: namespaces=%v", namespaces) + logrus.Infof("Policy/data ConfigMap processor connected to K8s: namespaces=%v", namespaces) for _, namespace := range namespaces { if namespace == "*" { namespace = v1.NamespaceAll @@ -162,7 +164,7 @@ func (s *Sync) Run(namespaces []string) (chan struct{}, error) { func (s *Sync) add(obj interface{}) { cm := obj.(*v1.ConfigMap) if match, isPolicy := s.matcher(cm); match { - logrus.Debugf("OnAdd cm=%v/%v, isPolicy=%v", cm.Namespace, cm.Name, isPolicy) + logrus.Debugf("OnAdd cm=%v/%v, isPolicy=%v", cm.Namespace, cm.Name, isPolicy) s.syncAdd(cm, isPolicy) } } @@ -170,13 +172,13 @@ func (s *Sync) add(obj interface{}) { func (s *Sync) update(oldObj, obj interface{}) { oldCm, cm := oldObj.(*v1.ConfigMap), obj.(*v1.ConfigMap) if match, isPolicy := s.matcher(cm); match { - logrus.Debugf("OnUpdate cm=%v/%v, isPolicy=%v, oldVer=%v, newVer=%v", - cm.Namespace, cm.Name, isPolicy, oldCm.GetResourceVersion(), cm.GetResourceVersion()) + logrus.Debugf("OnUpdate cm=%v/%v, isPolicy=%v, oldVer=%v, newVer=%v", + cm.Namespace, cm.Name, isPolicy, oldCm.GetResourceVersion(), cm.GetResourceVersion()) if cm.GetResourceVersion() != oldCm.GetResourceVersion() { newFp, oldFp := fingerprint(cm), fingerprint(oldCm) - rtrVal := cm.Annotations[retriesAnnotationKey] + rtrVal := kubeMgmtRetries[cm.Name] logrus.Debugf("OnUpdate cm=%v/%v, retries=%v, oldFp=%v, newFp=%v", cm.Namespace, cm.Name, rtrVal, oldFp, newFp) - if newFp != oldFp || rtrVal != "0" { + if newFp != oldFp || rtrVal != 0 { s.syncAdd(cm, isPolicy) } } @@ -194,7 +196,7 @@ func (s *Sync) delete(obj interface{}) { } cm := obj.(*v1.ConfigMap) if match, isPolicy := s.matcher(cm); match { - logrus.Debugf("OnDelete cm=%v/%v", cm.Namespace, cm.Name) + logrus.Debugf("OnDelete cm=%v/%v", cm.Namespace, cm.Name) s.syncRemove(cm, isPolicy) } } @@ -215,7 +217,7 @@ func (s *Sync) syncAdd(cm *v1.ConfigMap, isPolicy bool) { var err error if isPolicy { err = s.opa.InsertPolicy(id, []byte(value)) - logrus.Infof("Added policy %v, err=%v", id, err) + logrus.Infof("Added policy %v, err=%v", id, err) } else { // We don't need to know the JSON structure, just pass it // directly to the OPA data store. @@ -232,22 +234,17 @@ func (s *Sync) syncAdd(cm *v1.ConfigMap, isPolicy bool) { } } if syncErr != nil { - var retries int = 0 - if isPolicy { - if rStr, ok := cm.Annotations[retriesAnnotationKey]; ok { - r, err := strconv.Atoi(rStr) - if err == nil && r > 0 { - retries = r - 1 - logrus.Debugf("Adding policies error cm=%v, old retry=%v, new retry=%v", path, rStr, retries) - } else if err == nil && r == 0 { - retries = defaultRetries - logrus.Debugf("Adding policies error cm=%v, old retry=%v, new retry=%v", path, rStr, retries) - } - } else { - retries = defaultRetries - logrus.Debugf("Adding policies error cm=%v, no retry annotation, new retry=%v", path, retries) - } - } + var retries int = 0 + if isPolicy { + r := kubeMgmtRetries[cm.Name] + if r > 0 { + retries = r - 1 + logrus.Debugf("Adding policies error cm=%v, old retry=%d, new retry=%d", path, r, retries) + } else if r == 0 { + retries = defaultRetries + logrus.Debugf("Adding policies error cm=%v, old retry=%d, new retry=%d", path, r, retries) + } + } s.setAnnotations(cm, status{ Status: "error", Error: syncErr, @@ -260,7 +257,7 @@ func (s *Sync) syncAdd(cm *v1.ConfigMap, isPolicy bool) { } func (s *Sync) syncRemove(cm *v1.ConfigMap, isPolicy bool) { - logrus.Debugf("Attempting to remove cm=%v/%v, isPolicy=%v", cm.Namespace, cm.Name, isPolicy) + logrus.Debugf("Attempting to remove cm=%v/%v, isPolicy=%v", cm.Namespace, cm.Name, isPolicy) path := fmt.Sprintf("%v/%v", cm.Namespace, cm.Name) for key := range cm.Data { id := fmt.Sprintf("%v/%v", path, key) @@ -275,6 +272,7 @@ func (s *Sync) syncRemove(cm *v1.ConfigMap, isPolicy bool) { } } } + delete(kubeMgmtRetries, cm.Name) } func (s *Sync) setAnnotations(cm *v1.ConfigMap, st status, retries int) { @@ -287,7 +285,6 @@ func (s *Sync) setAnnotations(cm *v1.ConfigMap, st status, retries int) { "metadata": map[string]interface{}{ "annotations": map[string]interface{}{ statusAnnotationKey: string(bs), - retriesAnnotationKey: strconv.Itoa(retries), }, }, } @@ -300,10 +297,11 @@ func (s *Sync) setAnnotations(cm *v1.ConfigMap, st status, retries int) { if err != nil { logrus.Errorf("Failed to %v for %v/%v: %v", statusAnnotationKey, cm.Namespace, cm.Name, err) } + kubeMgmtRetries[cm.Name] = retries } func (s *Sync) syncReset(id string) { - logrus.Debugf("Attempting to reset %v", id) + logrus.Debugf("Attempting to reset %v", id) d := syncResetBackoffMin for { if err := s.opa.PutData("/", map[string]interface{}{}); err != nil {