Skip to content

Commit

Permalink
feat: handle create event action (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
r0kas authored Nov 23, 2021
1 parent a921277 commit 57ce9fb
Show file tree
Hide file tree
Showing 5 changed files with 302 additions and 11 deletions.
7 changes: 4 additions & 3 deletions actions/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ func NewService(
cfg: cfg,
castaiClient: castaiClient,
actionHandlers: map[reflect.Type]ActionHandler{
reflect.TypeOf(&castai.ActionDeleteNode{}): newDeleteNodeHandler(log, clientset),
reflect.TypeOf(&castai.ActionDrainNode{}): newDrainNodeHandler(log, clientset),
reflect.TypeOf(&castai.ActionPatchNode{}): newPatchNodeHandler(log, clientset),
reflect.TypeOf(&castai.ActionDeleteNode{}): newDeleteNodeHandler(log, clientset),
reflect.TypeOf(&castai.ActionDrainNode{}): newDrainNodeHandler(log, clientset),
reflect.TypeOf(&castai.ActionPatchNode{}): newPatchNodeHandler(log, clientset),
reflect.TypeOf(&castai.ActionCreateEvent{}): newCreateEventHandler(log, clientset),
},
}
}
Expand Down
140 changes: 140 additions & 0 deletions actions/create_event_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package actions

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes"

"github.com/castai/cluster-controller/castai"
)

func newCreateEventHandler(log logrus.FieldLogger, clientset kubernetes.Interface) ActionHandler {
return &createEventHandler{
log: log,
clientset: clientset,
}
}

type createEventHandler struct {
log logrus.FieldLogger
clientset kubernetes.Interface
}

func (h *createEventHandler) Handle(ctx context.Context, data interface{}) error {
req, ok := data.(*castai.ActionCreateEvent)
if !ok {
return fmt.Errorf("unexpected type %T for create event handler", data)
}

namespace := req.ObjectRef.Namespace
if namespace == "" {
namespace = v1.NamespaceDefault
}

event := &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", req.ObjectRef.Name, req.EventTime.Unix()),
Namespace: namespace,
},
EventTime: metav1.NewMicroTime(req.EventTime),
ReportingController: req.Reporter,
ReportingInstance: req.Reporter,
InvolvedObject: req.ObjectRef,
Type: req.EventType,
Reason: req.Reason,
Action: req.Action,
Message: req.Message,
FirstTimestamp: metav1.NewTime(req.EventTime),
LastTimestamp: metav1.NewTime(req.EventTime),
Count: 1,
}

similarEvent, err := h.searchSimilarEvent(&req.ObjectRef, event)
if err != nil {
return fmt.Errorf("searching for similar event for ref %v: %w", req.ObjectRef, err)
}

if similarEvent != nil {
event.Name = similarEvent.Name
event.ResourceVersion = similarEvent.ResourceVersion
event.FirstTimestamp = similarEvent.FirstTimestamp
event.Count = similarEvent.Count + 1

newData, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("marshaling new event for ref %v: %w", req.ObjectRef, err)
}

oldData, err := json.Marshal(similarEvent)
if err != nil {
return fmt.Errorf("marshaling old event for ref %v: %w", req.ObjectRef, err)
}

patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
if err != nil {
return fmt.Errorf("creating event merge patch for ref %v: %w", req.ObjectRef, err)
}

_, err = h.clientset.CoreV1().
Events(event.Namespace).
Patch(ctx, similarEvent.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("patching event for ref %v: %w", req.ObjectRef, err)
}
// Patching might fail if the event was removed while we were constructing the request body, so just
// recreate the event.
if err == nil {
return nil
}
}

if _, err := h.clientset.CoreV1().Events(event.Namespace).Create(ctx, event, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("creating event for ref %v: %w", req.ObjectRef, err)
}

return nil
}

func (h *createEventHandler) searchSimilarEvent(ref *v1.ObjectReference, event *v1.Event) (*v1.Event, error) {
// Scheme is not needed when searching for reference. Scheme is needed only for raw runtime objects.
resp, err := h.clientset.CoreV1().Events(event.Namespace).Search(nil, ref)
if err != nil {
return nil, fmt.Errorf("searching events for ref %+v: %w", ref, err)
}

key := getEventKey(event)

for i := range resp.Items {
if getEventKey(&resp.Items[i]) != key {
continue
}
return &resp.Items[i], nil
}

return nil, nil
}

func getEventKey(event *v1.Event) string {
return strings.Join([]string{
event.InvolvedObject.Kind,
event.InvolvedObject.Namespace,
event.InvolvedObject.Name,
event.InvolvedObject.FieldPath,
string(event.InvolvedObject.UID),
event.InvolvedObject.APIVersion,
event.Type,
event.Reason,
event.Action,
event.Message,
event.ReportingController,
}, "")
}
135 changes: 135 additions & 0 deletions actions/create_event_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package actions

import (
"context"
"fmt"
"testing"
"time"

"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"

"github.com/castai/cluster-controller/castai"
)

func TestCreateEvent(t *testing.T) {
r := require.New(t)
id := types.UID(uuid.New().String())

tests := []struct {
name string
action *castai.ActionCreateEvent
actionCount int
object runtime.Object
expectedEvent *corev1.Event
}{
{
name: "create single pod event",
action: &castai.ActionCreateEvent{
Reporter: "autoscaler.cast.ai",
ObjectRef: podObjReference(testPod(id)),
EventTime: time.Now(),
EventType: "Normal",
Reason: "Just because!",
Action: "During node creation.",
Message: "Oh common, you can do better.",
},
actionCount: 1,
object: testPod(id),
expectedEvent: &corev1.Event{
ObjectMeta: metav1.ObjectMeta{Namespace: "castai"},
Type: "Normal",
Reason: "Just because!",
Action: "During node creation.",
Message: "Oh common, you can do better.",
ReportingController: "autoscaler.cast.ai",
ReportingInstance: "autoscaler.cast.ai",
},
},
{
name: "create several pod events",
action: &castai.ActionCreateEvent{
Reporter: "provisioning.cast.ai",
ObjectRef: podObjReference(testPod(id)),
EventTime: time.Now(),
EventType: "Warning",
Reason: "Just because!",
Action: "During node creation.",
Message: "Oh common, you can do better.",
},
actionCount: 6,
object: testPod(id),
expectedEvent: &corev1.Event{
ObjectMeta: metav1.ObjectMeta{Namespace: "castai"},
Type: "Warning",
Reason: "Just because!",
Action: "During node creation.",
Message: "Oh common, you can do better.",
ReportingController: "provisioning.cast.ai",
ReportingInstance: "provisioning.cast.ai",
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
clientSet := fake.NewSimpleClientset(test.object)
h := createEventHandler{
log: logrus.New(),
clientset: clientSet,
}

ctx := context.Background()
for i := 0; i < test.actionCount; i++ {
err := h.Handle(ctx, test.action)
r.NoError(err)
}

eventTime := test.action.EventTime
testEventName := fmt.Sprintf("%v.%x", test.action.ObjectRef.Name, eventTime.Unix())

testEvent, err := clientSet.CoreV1().
Events(test.expectedEvent.Namespace).
Get(ctx, testEventName, metav1.GetOptions{})
r.NoError(err)

r.Equal(test.expectedEvent.Type, testEvent.Type)
r.Equal(test.expectedEvent.Reason, testEvent.Reason)
r.Equal(test.expectedEvent.Action, testEvent.Action)
r.Equal(test.expectedEvent.Message, testEvent.Message)
r.Equal(test.expectedEvent.ReportingController, testEvent.ReportingController)
r.Equal(test.expectedEvent.ReportingInstance, testEvent.ReportingInstance)
r.EqualValues(test.actionCount, testEvent.Count)
})
}
}

func testPod(id types.UID) *corev1.Pod {
return &corev1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "testPod",
UID: id,
Namespace: "castai",
},
}
}

func podObjReference(p *corev1.Pod) corev1.ObjectReference {
return corev1.ObjectReference{
Kind: p.Kind,
Namespace: p.Namespace,
Name: p.Name,
UID: p.UID,
APIVersion: p.APIVersion,
ResourceVersion: p.ResourceVersion,
}
}
29 changes: 22 additions & 7 deletions castai/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
)

type GetClusterActionsResponse struct {
Expand All @@ -15,13 +16,14 @@ type AckClusterActionRequest struct {
}

type ClusterAction struct {
ID string `json:"id"`
ActionDeleteNode *ActionDeleteNode `json:"actionDeleteNode,omitempty"`
ActionDrainNode *ActionDrainNode `json:"actionDrainNode,omitempty"`
ActionPatchNode *ActionPatchNode `json:"actionPatchNode,omitempty"`
CreatedAt time.Time `json:"createdAt"`
DoneAt *time.Time `json:"doneAt,omitempty"`
Error *string `json:"error,omitempty"`
ID string `json:"id"`
ActionDeleteNode *ActionDeleteNode `json:"actionDeleteNode,omitempty"`
ActionDrainNode *ActionDrainNode `json:"actionDrainNode,omitempty"`
ActionPatchNode *ActionPatchNode `json:"actionPatchNode,omitempty"`
ActionCreateEvent *ActionCreateEvent `json:"actionCreateEvent,omitempty"`
CreatedAt time.Time `json:"createdAt"`
DoneAt *time.Time `json:"doneAt,omitempty"`
Error *string `json:"error,omitempty"`
}

func (c *ClusterAction) Data() interface{} {
Expand All @@ -34,6 +36,9 @@ func (c *ClusterAction) Data() interface{} {
if c.ActionPatchNode != nil {
return c.ActionPatchNode
}
if c.ActionCreateEvent != nil {
return c.ActionCreateEvent
}
return nil
}

Expand Down Expand Up @@ -65,3 +70,13 @@ type NodeTaint struct {
Key string `json:"key"`
Value string `json:"value"`
}

type ActionCreateEvent struct {
Reporter string `json:"reportingComponent"`
ObjectRef v1.ObjectReference `json:"objectReference"`
EventTime time.Time `json:"eventTime"`
EventType string `json:"eventType"`
Reason string `json:"reason"`
Action string `json:"action"`
Message string `json:"message"`
}
2 changes: 1 addition & 1 deletion log/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("k8s.io/klog/v2.(*loggingT).flushDaemon"))
}

func TestLogExporter(t *testing.T) {
Expand Down

0 comments on commit 57ce9fb

Please sign in to comment.