From 7dc13f82781ec6d0258e398281f847217d252513 Mon Sep 17 00:00:00 2001 From: Anna Kapuscinska Date: Mon, 17 Feb 2025 21:32:13 +0000 Subject: [PATCH] crdwatcher: Reuse watcher.K8sWatcher to watch TracingPolicy Before, to watch TracingPolicy(Namespaced) custom resources, we were reading Kubernetes config, creating an informer factory and starting it, all in the WatchTracePolicy function. As now the watcher.K8sWatcher struct initializes informer factories, we can reuse it instead of configuring everything from scratch. This commit replaces the WatchTracePolicy function with AddTracingPolicyInformer, which only adds informers to the K8sWatcher. Initialization and start of the K8sWatcher happen in main. Signed-off-by: Anna Kapuscinska --- cmd/tetragon/main.go | 9 +++-- .../observer_test_helper.go | 35 +++++++++--------- pkg/watcher/crdwatcher/tracingpolicy.go | 36 +++++++++++-------- 3 files changed, 44 insertions(+), 36 deletions(-) diff --git a/cmd/tetragon/main.go b/cmd/tetragon/main.go index eb138e06f09..47afb25281c 100644 --- a/cmd/tetragon/main.go +++ b/cmd/tetragon/main.go @@ -431,6 +431,12 @@ func tetragonExecuteCtx(ctx context.Context, cancel context.CancelFunc, ready fu if err != nil { return err } + if option.Config.EnableTracingPolicyCRD { + err := crdwatcher.AddTracingPolicyInformer(ctx, k8sWatcher, observer.GetSensorManager()) + if err != nil { + return err + } + } } else { log.Info("Disabling Kubernetes API") k8sWatcher = watcher.NewFakeK8sWatcher(nil) @@ -504,9 +510,6 @@ func tetragonExecuteCtx(ctx context.Context, cancel context.CancelFunc, ready fu log.WithField("enabled", option.Config.ExportFilename != "").WithField("fileName", option.Config.ExportFilename).Info("Exporter configuration") obs.AddListener(pm) saveInitInfo() - if option.Config.EnableK8s && option.Config.EnableTracingPolicyCRD { - go crdwatcher.WatchTracePolicy(ctx, observer.GetSensorManager()) - } obs.LogPinnedBpf(observerDir) diff --git a/pkg/observer/observertesthelper/observer_test_helper.go b/pkg/observer/observertesthelper/observer_test_helper.go index e580d184a95..251de16b8a5 100644 --- a/pkg/observer/observertesthelper/observer_test_helper.go +++ b/pkg/observer/observertesthelper/observer_test_helper.go @@ -19,24 +19,28 @@ import ( "testing" "time" - "github.com/cilium/tetragon/pkg/cgrouprate" - "github.com/cilium/tetragon/pkg/defaults" - "github.com/cilium/tetragon/pkg/encoder" - "github.com/cilium/tetragon/pkg/k8s/client/informers/externalversions" - "github.com/cilium/tetragon/pkg/metricsconfig" - "github.com/cilium/tetragon/pkg/observer" - "github.com/cilium/tetragon/pkg/policyfilter" - "github.com/cilium/tetragon/pkg/tracingpolicy" "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" "github.com/cilium/tetragon/api/v1/tetragon" "github.com/cilium/tetragon/pkg/bpf" "github.com/cilium/tetragon/pkg/btf" "github.com/cilium/tetragon/pkg/bugtool" + "github.com/cilium/tetragon/pkg/cgrouprate" + "github.com/cilium/tetragon/pkg/defaults" + "github.com/cilium/tetragon/pkg/encoder" "github.com/cilium/tetragon/pkg/exporter" tetragonGrpc "github.com/cilium/tetragon/pkg/grpc" + "github.com/cilium/tetragon/pkg/k8s/client/informers/externalversions" "github.com/cilium/tetragon/pkg/logger" + "github.com/cilium/tetragon/pkg/metricsconfig" + "github.com/cilium/tetragon/pkg/observer" "github.com/cilium/tetragon/pkg/option" + "github.com/cilium/tetragon/pkg/policyfilter" "github.com/cilium/tetragon/pkg/process" "github.com/cilium/tetragon/pkg/reader/namespace" "github.com/cilium/tetragon/pkg/rthooks" @@ -44,14 +48,9 @@ import ( "github.com/cilium/tetragon/pkg/sensors/base" "github.com/cilium/tetragon/pkg/sensors/exec/procevents" "github.com/cilium/tetragon/pkg/testutils" + "github.com/cilium/tetragon/pkg/tracingpolicy" "github.com/cilium/tetragon/pkg/watcher" "github.com/cilium/tetragon/pkg/watcher/crdwatcher" - - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - k8stypes "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/informers" - "k8s.io/client-go/tools/cache" ) var ( @@ -374,7 +373,7 @@ func getDefaultSensors(tb testing.TB, initialSensor *sensors.Sensor, opts ...Tes } func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, opts *testExporterOptions, oo *testObserverOptions) error { - watcher := opts.watcher + k8sWatcher := opts.watcher processCacheSize := 32768 dataCacheSize := 1024 procCacheGCInterval := defaults.DefaultProcessCacheGCInterval @@ -391,7 +390,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op }) if oo.crd { - crdwatcher.WatchTracePolicy(ctx, sensorManager) + crdwatcher.AddTracingPolicyInformer(ctx, k8sWatcher, sensorManager) } if err := btf.InitCachedBTF(option.Config.HubbleLib, ""); err != nil { @@ -402,7 +401,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op procCacheGCInterval = oo.procCacheGCInterval } - if err := process.InitCache(watcher, processCacheSize, procCacheGCInterval); err != nil { + if err := process.InitCache(k8sWatcher, processCacheSize, procCacheGCInterval); err != nil { return err } @@ -413,7 +412,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op var cancelWg sync.WaitGroup // use an empty hooks runner - hookRunner := (&rthooks.Runner{}).WithWatcher(watcher) + hookRunner := (&rthooks.Runner{}).WithWatcher(k8sWatcher) // For testing we disable the eventcache and cilium cache by default. If we // enable these then every tests would need to wait for the 1.5 mimutes needed diff --git a/pkg/watcher/crdwatcher/tracingpolicy.go b/pkg/watcher/crdwatcher/tracingpolicy.go index 42a50316a5b..a5ef31c5270 100644 --- a/pkg/watcher/crdwatcher/tracingpolicy.go +++ b/pkg/watcher/crdwatcher/tracingpolicy.go @@ -11,16 +11,13 @@ import ( "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "github.com/cilium/tetragon/pkg/k8s/apis/cilium.io/v1alpha1" - "github.com/cilium/tetragon/pkg/k8s/client/clientset/versioned" - "github.com/cilium/tetragon/pkg/k8s/client/informers/externalversions" "github.com/cilium/tetragon/pkg/logger" "github.com/cilium/tetragon/pkg/sensors" "github.com/cilium/tetragon/pkg/tracingpolicy" - k8sconf "github.com/cilium/tetragon/pkg/watcher/conf" + "github.com/cilium/tetragon/pkg/watcher" ) // Log "missing tracing policy" message once. @@ -182,16 +179,18 @@ func updateTracingPolicy(ctx context.Context, log logrus.FieldLogger, s *sensors } } -func WatchTracePolicy(ctx context.Context, s *sensors.Manager) { +func AddTracingPolicyInformer(ctx context.Context, w watcher.Watcher, s *sensors.Manager) error { log := logger.GetLogger() - conf, err := k8sconf.K8sConfig() - if err != nil { - log.WithError(err).Fatal("couldn't get cluster config") + if w == nil { + return fmt.Errorf("k8s watcher not initialized") + } + factory := w.GetCRDInformerFactory() + if factory == nil { + return fmt.Errorf("CRD informer factory not initialized") } - client := versioned.NewForConfigOrDie(conf) - factory := externalversions.NewSharedInformerFactory(client, 0) - factory.Cilium().V1alpha1().TracingPolicies().Informer().AddEventHandler( + tpInformer := factory.Cilium().V1alpha1().TracingPolicies().Informer() + tpInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { addTracingPolicy(ctx, log, s, obj) @@ -202,8 +201,13 @@ func WatchTracePolicy(ctx context.Context, s *sensors.Manager) { UpdateFunc: func(oldObj interface{}, newObj interface{}) { updateTracingPolicy(ctx, log, s, oldObj, newObj) }}) + err := w.AddInformer("TracingPolicy", tpInformer, nil) + if err != nil { + return fmt.Errorf("failed to add TracingPolicy informer: %w", err) + } - factory.Cilium().V1alpha1().TracingPoliciesNamespaced().Informer().AddEventHandler( + tpnInformer := factory.Cilium().V1alpha1().TracingPoliciesNamespaced().Informer() + tpnInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { addTracingPolicy(ctx, log, s, obj) @@ -214,8 +218,10 @@ func WatchTracePolicy(ctx context.Context, s *sensors.Manager) { UpdateFunc: func(oldObj interface{}, newObj interface{}) { updateTracingPolicy(ctx, log, s, oldObj, newObj) }}) + err = w.AddInformer("TracingPolicyNamespaced", tpnInformer, nil) + if err != nil { + return fmt.Errorf("failed to add TracingPolicyNamespaced informer: %w", err) + } - go factory.Start(wait.NeverStop) - factory.WaitForCacheSync(wait.NeverStop) - log.Info("Started watching tracing policies") + return nil }