Skip to content

Commit

Permalink
KUBE-424: Migrate from core/v1 events to events.k8s.io/v1 (#120)
Browse files Browse the repository at this point in the history
KUBE-424: migrate to events.k8s.io/v1 from core/v1 events

Co-authored-by: gleb <[email protected]>
  • Loading branch information
stgleb and gleb authored Aug 27, 2024
1 parent 39f7e66 commit 0b42dac
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 123 deletions.
145 changes: 41 additions & 104 deletions actions/create_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,139 +2,76 @@ package actions

import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes"
typedv1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"

"github.com/castai/cluster-controller/castai"
)

func newCreateEventHandler(log logrus.FieldLogger, clientset kubernetes.Interface) ActionHandler {
factory := func(ns string) (record.EventBroadcaster, record.EventRecorder) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: clientset.CoreV1().Events(ns)})
eventBroadcaster.StartStructuredLogging(0)
log.Debug("create new broadcaster and recorder for namespace: %s", ns)
// Create an event recorder
return eventBroadcaster, eventBroadcaster.NewRecorder(nil, v1.EventSource{})
}
return &createEventHandler{
log: log,
clientset: clientset,
log: log,
clientSet: clientset,
recorderFactory: factory,
eventNsBroadcaster: map[string]record.EventBroadcaster{},
eventNsRecorder: map[string]record.EventRecorder{},
}
}

type createEventHandler struct {
log logrus.FieldLogger
clientset kubernetes.Interface
log logrus.FieldLogger
clientSet kubernetes.Interface
recorderFactory func(string) (record.EventBroadcaster, record.EventRecorder)
mu sync.RWMutex
eventNsBroadcaster map[string]record.EventBroadcaster
eventNsRecorder map[string]record.EventRecorder
}

func (h *createEventHandler) Handle(ctx context.Context, action *castai.ClusterAction) error {
req, ok := action.Data().(*castai.ActionCreateEvent)
if !ok {
return fmt.Errorf("unexpected type %T for create event handler", action.Data())
}

namespace := req.ObjectRef.Namespace
if namespace == "" {
namespace = v1.NamespaceDefault
}

event := &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", req.ObjectRef.Name, req.EventTime.Unix()),
Namespace: namespace,
},
EventTime: metav1.NewMicroTime(req.EventTime),
ReportingController: req.Reporter,
ReportingInstance: req.Reporter,
InvolvedObject: req.ObjectRef,
Type: req.EventType,
Reason: req.Reason,
Action: req.Action,
Message: req.Message,
FirstTimestamp: metav1.NewTime(req.EventTime),
LastTimestamp: metav1.NewTime(req.EventTime),
Count: 1,
}

similarEvent, err := h.searchSimilarEvent(&req.ObjectRef, event)
if err != nil {
return fmt.Errorf("searching for similar event for ref %v: %w", req.ObjectRef, err)
}

if similarEvent != nil {
event.Name = similarEvent.Name
event.ResourceVersion = similarEvent.ResourceVersion
event.FirstTimestamp = similarEvent.FirstTimestamp
event.Count = similarEvent.Count + 1

newData, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("marshaling new event for ref %v: %w", req.ObjectRef, err)
}

oldData, err := json.Marshal(similarEvent)
if err != nil {
return fmt.Errorf("marshaling old event for ref %v: %w", req.ObjectRef, err)
}

patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
if err != nil {
return fmt.Errorf("creating event merge patch for ref %v: %w", req.ObjectRef, err)
}

_, err = h.clientset.CoreV1().
Events(event.Namespace).
Patch(ctx, similarEvent.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("patching event for ref %v: %w", req.ObjectRef, err)
}
// Patching might fail if the event was removed while we were constructing the request body, so just
// recreate the event.
if err == nil {
return nil
}
}

if _, err := h.clientset.CoreV1().Events(event.Namespace).Create(ctx, event, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("creating event for ref %v: %w", req.ObjectRef, err)
}

h.handleEventV1(ctx, req, namespace)
return nil
}

func (h *createEventHandler) searchSimilarEvent(ref *v1.ObjectReference, event *v1.Event) (*v1.Event, error) {
// Scheme is not needed when searching for reference. Scheme is needed only for raw runtime objects.
resp, err := h.clientset.CoreV1().Events(event.Namespace).Search(nil, ref)
if err != nil {
return nil, fmt.Errorf("searching events for ref %+v: %w", ref, err)
}

key := getEventKey(event)

for i := range resp.Items {
if getEventKey(&resp.Items[i]) != key {
continue
func (h *createEventHandler) handleEventV1(_ context.Context, req *castai.ActionCreateEvent, namespace string) {
h.mu.RLock()
h.log.Debug("handling create event action: %s type: %s", req.Action, req.EventType)
if recorder, ok := h.eventNsRecorder[namespace]; ok {
recorder.Eventf(&req.ObjectRef, v1.EventTypeNormal, req.Reason, req.Action, req.Message)
h.mu.RUnlock()
} else {
h.mu.RUnlock()
h.mu.Lock()
// Double check after acquiring the lock.
if recorder, ok := h.eventNsRecorder[namespace]; !ok {
broadcaster, rec := h.recorderFactory(namespace)
h.eventNsBroadcaster[namespace] = broadcaster
h.eventNsRecorder[namespace] = rec
rec.Eventf(&req.ObjectRef, v1.EventTypeNormal, req.Reason, req.Action, req.Message)
} else {
recorder.Eventf(&req.ObjectRef, v1.EventTypeNormal, req.Reason, req.Action, req.Message)
}
return &resp.Items[i], nil
h.mu.Unlock()
}

return nil, nil
}

// getEventKey builds a deduplication key for an event by concatenating the
// fields that identify "the same" logical event across repeated occurrences:
// the involved object's identity plus the event's type, reason, action,
// message, and reporting controller. Two events with equal keys are treated
// as duplicates (the caller bumps Count instead of creating a new object).
func getEventKey(event *v1.Event) string {
	ref := event.InvolvedObject
	parts := []string{
		ref.Kind,
		ref.Namespace,
		ref.Name,
		ref.FieldPath,
		string(ref.UID),
		ref.APIVersion,
		event.Type,
		event.Reason,
		event.Action,
		event.Message,
		event.ReportingController,
	}
	// Join with an empty separator, matching the original key format.
	return strings.Join(parts, "")
}
121 changes: 102 additions & 19 deletions actions/create_event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@ package actions
import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"

"github.com/castai/cluster-controller/castai"
)
Expand Down Expand Up @@ -85,33 +88,113 @@ func TestCreateEvent(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
clientSet := fake.NewSimpleClientset(test.object)
recorder := record.NewFakeRecorder(test.actionCount)
broadCaster := record.NewBroadcasterForTests(time.Second * 10)
h := createEventHandler{
log: logrus.New(),
clientset: clientSet,
clientSet: clientSet,
eventNsRecorder: map[string]record.EventRecorder{
"castai": recorder,
},
eventNsBroadcaster: map[string]record.EventBroadcaster{
"castai": broadCaster,
},
}

ctx := context.Background()
wg := sync.WaitGroup{}
wg.Add(test.actionCount)
for i := 0; i < test.actionCount; i++ {
err := h.Handle(ctx, test.action)
r.NoError(err)
go func() {
err := h.Handle(ctx, test.action)
r.NoError(err)
wg.Done()
}()
}
wg.Wait()
events := make([]string, 0, test.actionCount)
for i := 0; i < test.actionCount; i++ {
select {
case event := <-recorder.Events:
events = append(events, event)
default:
t.Errorf("not enough events expected %d actual %d", test.actionCount, i)
continue
}
}
for i := 0; i < test.actionCount; i++ {
r.Contains(events[i], v1.EventTypeNormal)
r.Contains(events[i], test.expectedEvent.Action)
r.Contains(events[i], test.expectedEvent.Reason)
r.Contains(events[i], test.expectedEvent.Message)
}
broadCaster.Shutdown()
})
}
}

eventTime := test.action.ActionCreateEvent.EventTime
testEventName := fmt.Sprintf("%v.%x", test.action.ActionCreateEvent.ObjectRef.Name, eventTime.Unix())

testEvent, err := clientSet.CoreV1().
Events(test.expectedEvent.Namespace).
Get(ctx, testEventName, metav1.GetOptions{})
func TestRandomNs(t *testing.T) {
t.Parallel()
r := require.New(t)
actionCount := 10
clientSet := fake.NewSimpleClientset(testPod(types.UID(uuid.New().String())))
recorders := make([]*record.FakeRecorder, 0, actionCount)
h := createEventHandler{
log: logrus.New(),
clientSet: clientSet,
recorderFactory: func(ns string) (record.EventBroadcaster, record.EventRecorder) {
broadcaster := record.NewBroadcasterForTests(time.Second * 10)
rec := record.NewFakeRecorder(actionCount)
recorders = append(recorders, rec)
return broadcaster, rec
},
eventNsRecorder: map[string]record.EventRecorder{},
eventNsBroadcaster: map[string]record.EventBroadcaster{},
}
ctx := context.Background()
wg := sync.WaitGroup{}
wg.Add(actionCount)
for i := 0; i < actionCount; i++ {
go func() {
err := h.Handle(ctx, &castai.ClusterAction{
ID: uuid.New().String(),
ActionCreateEvent: &castai.ActionCreateEvent{
ObjectRef: podObjReference(
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("testPod-%s", uuid.NewString()[:4]),
Namespace: uuid.NewString(),
},
}),
Reporter: "provisioning.cast.ai",
EventTime: time.Now(),
EventType: "Warning",
Reason: "Just because!",
Action: "During node creation.",
Message: "Oh common, you can do better.",
},
})
r.NoError(err)

r.Equal(test.expectedEvent.Type, testEvent.Type)
r.Equal(test.expectedEvent.Reason, testEvent.Reason)
r.Equal(test.expectedEvent.Action, testEvent.Action)
r.Equal(test.expectedEvent.Message, testEvent.Message)
r.Equal(test.expectedEvent.ReportingController, testEvent.ReportingController)
r.Equal(test.expectedEvent.ReportingInstance, testEvent.ReportingInstance)
r.EqualValues(test.actionCount, testEvent.Count)
})
wg.Done()
}()
}
wg.Wait()
events := make([]string, 0, actionCount)
for i := range recorders {
select {
case event := <-recorders[i].Events:
events = append(events, event)
default:
t.Errorf("not enough events expected %d actual %d", actionCount, i)
continue
}
}
for ns, broadCaster := range h.eventNsBroadcaster {
t.Logf("shutting down broadcaster for ns %s", ns)
broadCaster.Shutdown()
}
r.Len(events, actionCount)
for i := 0; i < actionCount; i++ {
r.Contains(events[i], v1.EventTypeNormal)
}
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ require (
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
Expand Down

0 comments on commit 0b42dac

Please sign in to comment.