Skip to content

Commit

Permalink
fix: Server-side v1.Event filtering (#191)
Browse files Browse the repository at this point in the history
* fix: Server-side v1.Event filtering

* use '0' as patch if we fail to get GitVersion
  • Loading branch information
KarolisL authored Oct 15, 2024
1 parent 2e43d48 commit fddae45
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 14 deletions.
33 changes: 24 additions & 9 deletions internal/services/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type Controller struct {
discovery discovery.DiscoveryInterface
metricsClient versioned.Interface
informerFactory informers.SharedInformerFactory
// independentInformers are not handled by informerFactory.
// Initial use-case for them are Event informers:
// they need independent ListOptions
independentInformers map[string]cache.SharedIndexInformer

delta *delta.Delta
deltaMu sync.Mutex
Expand Down Expand Up @@ -197,15 +201,17 @@ func New(

queue := workqueue.NewNamed("castai-agent")

f := informers.NewSharedInformerFactory(clientset, 0)
df := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0)
defaultResync := 0 * time.Second
f := informers.NewSharedInformerFactory(clientset, defaultResync)
df := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, defaultResync)
discovery := clientset.Discovery()

defaultInformers := getDefaultInformers(f, castwareNamespace)
conditionalInformers := getConditionalInformers(clientset, cfg, f, df, metricsClient, log)
additionalTransformers := createAdditionalTransformers(cfg)

handledInformers := map[string]*custominformers.HandledInformer{}
independentInformers := map[string]cache.SharedIndexInformer{}
for typ, i := range defaultInformers {
name := typ.String()
if !informerEnabled(cfg, name) {
Expand All @@ -217,10 +223,12 @@ func New(
eventType := reflect.TypeOf(&corev1.Event{})
autoscalerEvents := fmt.Sprintf("%s:autoscaler", eventType)
if informerEnabled(cfg, autoscalerEvents) {
informer := createEventInformer(clientset, defaultResync, v, autoscalerevents.ListOpts)
independentInformers[autoscalerEvents] = informer
handledInformers[autoscalerEvents] = custominformers.NewHandledInformer(
log,
queue,
createEventInformer(f, v, autoscalerevents.ListOpts),
informer,
eventType,
filters.Filters{
{
Expand All @@ -232,10 +240,12 @@ func New(
}
oomEvents := fmt.Sprintf("%s:oom", eventType)
if informerEnabled(cfg, oomEvents) {
informer := createEventInformer(clientset, defaultResync, v, oomevents.ListOpts)
independentInformers[oomEvents] = informer
handledInformers[oomEvents] = custominformers.NewHandledInformer(
log,
queue,
createEventInformer(f, v, oomevents.ListOpts),
informer,
eventType,
filters.Filters{
{
Expand All @@ -255,6 +265,7 @@ func New(
delta: delta.New(log, clusterID, v.Full(), agentVersion.Version),
queue: queue,
informers: handledInformers,
independentInformers: independentInformers,
agentVersion: agentVersion,
healthzProvider: healthzProvider,
metricsClient: metricsClient,
Expand Down Expand Up @@ -648,6 +659,9 @@ func throttleLog(ctx context.Context, log logrus.FieldLogger, objType string, wa

func (c *Controller) Start(done <-chan struct{}) {
c.informerFactory.Start(done)
for _, informer := range c.independentInformers {
go informer.Run(done)
}
}
func extractGroupVersionsFromApiResourceError(log logrus.FieldLogger, err error) map[schema.GroupVersion]bool {
cleanedString := strings.Split(err.Error(), "unable to retrieve the complete list of server APIs: ")[1]
Expand Down Expand Up @@ -889,11 +903,12 @@ func getDefaultInformers(f informers.SharedInformerFactory, castwareNamespace st
}
}

func createEventInformer(f informers.SharedInformerFactory, v version.Interface, listOptions func(*metav1.ListOptions, version.Interface)) cache.SharedIndexInformer {
return f.InformerFor(&corev1.Event{}, func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return v1.NewFilteredEventInformer(client, corev1.NamespaceAll, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, func(options *metav1.ListOptions) {
listOptions(options, v)
})
// createEventInformer creates a new event informer with the given list options.
// We can't use sharedInformerFactory because it would reuse the first registered ListOptions
// for all the informers.
func createEventInformer(client kubernetes.Interface, resyncPeriod time.Duration, v version.Interface, listOptions func(*metav1.ListOptions, version.Interface)) cache.SharedIndexInformer {
return v1.NewFilteredEventInformer(client, corev1.NamespaceAll, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, func(options *metav1.ListOptions) {
listOptions(options, v)
})
}

Expand Down
7 changes: 6 additions & 1 deletion internal/services/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"regexp"
"strconv"
"strings"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/version"
Expand Down Expand Up @@ -43,7 +44,11 @@ type Version struct {
}

func (v *Version) Full() string {
return v.v.Major + "." + v.v.Minor
if v.v.GitVersion != "" {
return strings.TrimPrefix(v.v.GitVersion, "v")
}
// We should rarely, if ever, have empty GitVersion, but this is just in case
return v.v.Major + "." + v.v.Minor + ".0"
}

func (v *Version) MinorInt() int {
Expand Down
9 changes: 5 additions & 4 deletions internal/services/version/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (

func Test(t *testing.T) {
v := version.Info{
Major: "1",
Minor: "21+",
GitCommit: "2812f9fb0003709fc44fc34166701b377020f1c9",
Major: "1",
Minor: "21+",
GitVersion: "v1.21.0",
GitCommit: "2812f9fb0003709fc44fc34166701b377020f1c9",
}
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
b, err := json.Marshal(v)
Expand All @@ -39,6 +40,6 @@ func Test(t *testing.T) {
}

require.NoError(t, err)
require.Equal(t, "1.21+", got.Full())
require.Equal(t, "1.21.0", got.Full())
require.Equal(t, 21, got.MinorInt())
}

0 comments on commit fddae45

Please sign in to comment.