From 0b42dac00a310884a20d63dd9d7db96f4e676671 Mon Sep 17 00:00:00 2001 From: Gleb Stepanov Date: Tue, 27 Aug 2024 12:24:33 +0200 Subject: [PATCH] KUBE-424: Migrate from core/v1 events to events.k8s.io/v1 (#120) KUBE-424: migrate to events.k8s.io/v1 from core/v1 events Co-authored-by: gleb --- actions/create_event_handler.go | 145 ++++++++------------------- actions/create_event_handler_test.go | 121 ++++++++++++++++++---- go.mod | 1 + 3 files changed, 144 insertions(+), 123 deletions(-) diff --git a/actions/create_event_handler.go b/actions/create_event_handler.go index cc5643dd..4fb9915a 100644 --- a/actions/create_event_handler.go +++ b/actions/create_event_handler.go @@ -2,31 +2,43 @@ package actions import ( "context" - "encoding/json" "fmt" - "strings" + "sync" "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/kubernetes" + typedv1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" "github.com/castai/cluster-controller/castai" ) func newCreateEventHandler(log logrus.FieldLogger, clientset kubernetes.Interface) ActionHandler { + factory := func(ns string) (record.EventBroadcaster, record.EventRecorder) { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: clientset.CoreV1().Events(ns)}) + eventBroadcaster.StartStructuredLogging(0) + log.Debug("create new broadcaster and recorder for namespace: %s", ns) + // Create an event recorder + return eventBroadcaster, eventBroadcaster.NewRecorder(nil, v1.EventSource{}) + } return &createEventHandler{ - log: log, - clientset: clientset, + log: log, + clientSet: clientset, + recorderFactory: factory, + eventNsBroadcaster: map[string]record.EventBroadcaster{}, + eventNsRecorder: map[string]record.EventRecorder{}, } } type createEventHandler struct { - log logrus.FieldLogger - clientset kubernetes.Interface + log logrus.FieldLogger + clientSet kubernetes.Interface + recorderFactory func(string) (record.EventBroadcaster, record.EventRecorder) + mu sync.RWMutex + eventNsBroadcaster map[string]record.EventBroadcaster + eventNsRecorder map[string]record.EventRecorder } func (h *createEventHandler) Handle(ctx context.Context, action *castai.ClusterAction) error { @@ -34,107 +46,32 @@ func (h *createEventHandler) Handle(ctx context.Context, action *castai.ClusterA if !ok { return fmt.Errorf("unexpected type %T for create event handler", action.Data()) } - namespace := req.ObjectRef.Namespace if namespace == "" { namespace = v1.NamespaceDefault } - - event := &v1.Event{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%v.%x", req.ObjectRef.Name, req.EventTime.Unix()), - Namespace: namespace, - }, - EventTime: metav1.NewMicroTime(req.EventTime), - ReportingController: req.Reporter, - ReportingInstance: req.Reporter, - InvolvedObject: req.ObjectRef, - Type: req.EventType, - Reason: req.Reason, - Action: req.Action, - Message: req.Message, - FirstTimestamp: metav1.NewTime(req.EventTime), - LastTimestamp: metav1.NewTime(req.EventTime), - Count: 1, - } - - similarEvent, err := h.searchSimilarEvent(&req.ObjectRef, event) - if err != nil { - return fmt.Errorf("searching for similar event for ref %v: %w", req.ObjectRef, err) - } - - if similarEvent != nil { - event.Name = similarEvent.Name - event.ResourceVersion = similarEvent.ResourceVersion - event.FirstTimestamp = similarEvent.FirstTimestamp - event.Count = similarEvent.Count + 1 - - newData, err := json.Marshal(event) - if err != nil { - return fmt.Errorf("marshaling new event for ref %v: %w", req.ObjectRef, err) - } - - oldData, err := json.Marshal(similarEvent) - if err != nil { - return fmt.Errorf("marshaling old event for ref %v: %w", req.ObjectRef, err) - } - - patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, event) - if err != nil { - return fmt.Errorf("creating event merge patch for ref %v: %w", req.ObjectRef, err) - } - - _, err = h.clientset.CoreV1(). - Events(event.Namespace). - Patch(ctx, similarEvent.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}) - if err != nil && !errors.IsNotFound(err) { - return fmt.Errorf("patching event for ref %v: %w", req.ObjectRef, err) - } - // Patching might fail if the event was removed while we were constructing the request body, so just - // recreate the event. - if err == nil { - return nil - } - } - - if _, err := h.clientset.CoreV1().Events(event.Namespace).Create(ctx, event, metav1.CreateOptions{}); err != nil { - return fmt.Errorf("creating event for ref %v: %w", req.ObjectRef, err) - } - + h.handleEventV1(ctx, req, namespace) return nil } -func (h *createEventHandler) searchSimilarEvent(ref *v1.ObjectReference, event *v1.Event) (*v1.Event, error) { - // Scheme is not needed when searching for reference. Scheme is needed only for raw runtime objects. - resp, err := h.clientset.CoreV1().Events(event.Namespace).Search(nil, ref) - if err != nil { - return nil, fmt.Errorf("searching events for ref %+v: %w", ref, err) - } - - key := getEventKey(event) - - for i := range resp.Items { - if getEventKey(&resp.Items[i]) != key { - continue +func (h *createEventHandler) handleEventV1(_ context.Context, req *castai.ActionCreateEvent, namespace string) { + h.mu.RLock() + h.log.Debug("handling create event action: %s type: %s", req.Action, req.EventType) + if recorder, ok := h.eventNsRecorder[namespace]; ok { + recorder.Eventf(&req.ObjectRef, v1.EventTypeNormal, req.Reason, req.Action, req.Message) + h.mu.RUnlock() + } else { + h.mu.RUnlock() + h.mu.Lock() + // Double check after acquiring the lock. + if recorder, ok := h.eventNsRecorder[namespace]; !ok { + broadcaster, rec := h.recorderFactory(namespace) + h.eventNsBroadcaster[namespace] = broadcaster + h.eventNsRecorder[namespace] = rec + rec.Eventf(&req.ObjectRef, v1.EventTypeNormal, req.Reason, req.Action, req.Message) + } else { + recorder.Eventf(&req.ObjectRef, v1.EventTypeNormal, req.Reason, req.Action, req.Message) } - return &resp.Items[i], nil + h.mu.Unlock() } - - return nil, nil -} - -func getEventKey(event *v1.Event) string { - return strings.Join([]string{ - event.InvolvedObject.Kind, - event.InvolvedObject.Namespace, - event.InvolvedObject.Name, - event.InvolvedObject.FieldPath, - string(event.InvolvedObject.UID), - event.InvolvedObject.APIVersion, - event.Type, - event.Reason, - event.Action, - event.Message, - event.ReportingController, - }, "") } diff --git a/actions/create_event_handler_test.go b/actions/create_event_handler_test.go index 50455acd..bdc2c78e 100644 --- a/actions/create_event_handler_test.go +++ b/actions/create_event_handler_test.go @@ -3,6 +3,7 @@ package actions import ( "context" "fmt" + "sync" "testing" "time" @@ -10,10 +11,12 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/record" "github.com/castai/cluster-controller/castai" ) @@ -85,33 +88,113 @@ func TestCreateEvent(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { clientSet := fake.NewSimpleClientset(test.object) + recorder := record.NewFakeRecorder(test.actionCount) + broadCaster := record.NewBroadcasterForTests(time.Second * 10) h := createEventHandler{ log: logrus.New(), - clientset: clientSet, + clientSet: clientSet, + eventNsRecorder: map[string]record.EventRecorder{ + "castai": recorder, + }, + eventNsBroadcaster: map[string]record.EventBroadcaster{ + "castai": broadCaster, + }, } - ctx := context.Background() + wg := sync.WaitGroup{} + wg.Add(test.actionCount) for i := 0; i < test.actionCount; i++ { - err := h.Handle(ctx, test.action) - r.NoError(err) + go func() { + err := h.Handle(ctx, test.action) + r.NoError(err) + wg.Done() + }() } + wg.Wait() + events := make([]string, 0, test.actionCount) + for i := 0; i < test.actionCount; i++ { + select { + case event := <-recorder.Events: + events = append(events, event) + default: + t.Errorf("not enough events expected %d actual %d", test.actionCount, i) + continue + } + } + for i := 0; i < test.actionCount; i++ { + r.Contains(events[i], v1.EventTypeNormal) + r.Contains(events[i], test.expectedEvent.Action) + r.Contains(events[i], test.expectedEvent.Reason) + r.Contains(events[i], test.expectedEvent.Message) + } + broadCaster.Shutdown() + }) + } +} - eventTime := test.action.ActionCreateEvent.EventTime - testEventName := fmt.Sprintf("%v.%x", test.action.ActionCreateEvent.ObjectRef.Name, eventTime.Unix()) - - testEvent, err := clientSet.CoreV1(). - Events(test.expectedEvent.Namespace). - Get(ctx, testEventName, metav1.GetOptions{}) +func TestRandomNs(t *testing.T) { + t.Parallel() + r := require.New(t) + actionCount := 10 + clientSet := fake.NewSimpleClientset(testPod(types.UID(uuid.New().String()))) + recorders := make([]*record.FakeRecorder, 0, actionCount) + h := createEventHandler{ + log: logrus.New(), + clientSet: clientSet, + recorderFactory: func(ns string) (record.EventBroadcaster, record.EventRecorder) { + broadcaster := record.NewBroadcasterForTests(time.Second * 10) + rec := record.NewFakeRecorder(actionCount) + recorders = append(recorders, rec) + return broadcaster, rec + }, + eventNsRecorder: map[string]record.EventRecorder{}, + eventNsBroadcaster: map[string]record.EventBroadcaster{}, + } + ctx := context.Background() + wg := sync.WaitGroup{} + wg.Add(actionCount) + for i := 0; i < actionCount; i++ { + go func() { + err := h.Handle(ctx, &castai.ClusterAction{ + ID: uuid.New().String(), + ActionCreateEvent: &castai.ActionCreateEvent{ + ObjectRef: podObjReference( + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("testPod-%s", uuid.NewString()[:4]), + Namespace: uuid.NewString(), + }, + }), + Reporter: "provisioning.cast.ai", + EventTime: time.Now(), + EventType: "Warning", + Reason: "Just because!", + Action: "During node creation.", + Message: "Oh common, you can do better.", + }, + }) r.NoError(err) - - r.Equal(test.expectedEvent.Type, testEvent.Type) - r.Equal(test.expectedEvent.Reason, testEvent.Reason) - r.Equal(test.expectedEvent.Action, testEvent.Action) - r.Equal(test.expectedEvent.Message, testEvent.Message) - r.Equal(test.expectedEvent.ReportingController, testEvent.ReportingController) - r.Equal(test.expectedEvent.ReportingInstance, testEvent.ReportingInstance) - r.EqualValues(test.actionCount, testEvent.Count) - }) + wg.Done() + }() + } + wg.Wait() + events := make([]string, 0, actionCount) + for i := range recorders { + select { + case event := <-recorders[i].Events: + events = append(events, event) + default: + t.Errorf("not enough events expected %d actual %d", actionCount, i) + continue + } + } + for ns, broadCaster := range h.eventNsBroadcaster { + t.Logf("shutting down broadcaster for ns %s", ns) + broadCaster.Shutdown() + } + r.Len(events, actionCount) + for i := 0; i < actionCount; i++ { + r.Contains(events[i], v1.EventTypeNormal) } } diff --git a/go.mod b/go.mod index 5f05daa5..040a6a95 100644 --- a/go.mod +++ b/go.mod @@ -67,6 +67,7 @@ require ( github.com/go-openapi/swag v0.22.3 // indirect github.com/gobwas/glob v0.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/btree v1.0.1 // indirect github.com/google/gnostic-models v0.6.8 // indirect