From cc701a1affdb4d29b4f48fdfb5dad719192597ec Mon Sep 17 00:00:00 2001 From: Joe McGovern Date: Mon, 26 Jul 2021 11:45:23 -0500 Subject: [PATCH] feat(controller): opt-in to sending pod node events as pod (#6377) * feat(controller): opt-in to sending pod node events as pod Because of how kubernetes aggregates events and how Argo sends both workflow and node events with the same involved object (the workflow), some workflow and node events can get merged into a single event, without specifying which nodes are merged into one. This makes it challenging to respond to phase changes for nodes that have a particular annotation because kubernetes ignores annotations when checking if an event is similar. Creating an opt-in feature to send node events for nodes of type "Pod" with the involved object set to the associated pod will force kubernetes to send an event for each pod instead of aggregated several events into a single event and thus make it possible build a system solely that responds to node phase changes using Argo Workflows and Argo Events. An alternative solution to this would be creating a `WorkflowNode` custom resource and sending events with the workflow node itself as the involved object, but that seemed like a bigger change than necessary. Signed-off-by: Joseph McGovern * feat(controller): Get pod by node more efficiently Rather than fetching and iterating over all pods, we can fetch the pod we are interested in by a key. Even better, we can use an existing function to do this instead of writing a new implementation. Signed-off-by: Joseph McGovern * feat(controller): Only send one event for pod nodes When the workflow controller is configured to send events with the `involvedObject` set to the pod for pod workflow nodes, it could either only send the event with the pod as the involved object, or it could send two events: one with the pod and the other as the workflow as the involved object. The design decision was made to only send a single event in order to reduce unnecessary event noise. Users that opt-in to this feature must be aware that pod nodes will no longer emit events with the Workflow as the involved object. Signed-off-by: Joseph McGovern --- USERS.md | 3 +- config/config.go | 2 +- config/node_events.go | 3 +- workflow/controller/operator.go | 21 +++- workflow/controller/operator_test.go | 137 +++++++++++++++++++++++++++ 5 files changed, 162 insertions(+), 4 deletions(-) diff --git a/USERS.md b/USERS.md index 59653fc51c47..a622975b74ff 100644 --- a/USERS.md +++ b/USERS.md @@ -127,11 +127,12 @@ Currently, the following organizations are **officially** using Argo Workflows: 1. [Tulip](https://tulip.com/) 1. [UFirstGroup](https://www.ufirstgroup.com) 1. [Vispera](https://www.vispera.co) +1. [VMware](https://www.vmware.com/) 1. [Wavefront](https://www.wavefront.com/) 1. [Wellcome Trust](https://wellcome.ac.uk/) -1. [VMware](https://www.vmware.com/) 1. [WooliesX](https://wooliesx.com.au/) 1. [Woolworths Group](https://www.woolworthsgroup.com.au/) +1. [Workiva](https://www.workiva.com/) 1. [Zhihu](https://www.zhihu.com/) ### Projects Using Argo diff --git a/config/config.go b/config/config.go index 07102132680e..16b22639582f 100644 --- a/config/config.go +++ b/config/config.go @@ -22,7 +22,7 @@ type ResourceRateLimit struct { // Config contain the configuration settings for the workflow controller type Config struct { - // NodeEvents configures how node events are omitted + // NodeEvents configures how node events are emitted NodeEvents NodeEvents `json:"nodeEvents,omitempty"` // ExecutorImage is the image name of the executor to use when running pods diff --git a/config/node_events.go b/config/node_events.go index 51c76e263fd2..bda0a7ffef20 100644 --- a/config/node_events.go +++ b/config/node_events.go @@ -1,7 +1,8 @@ package config type NodeEvents struct { - Enabled *bool `json:"enabled,omitempty"` + Enabled *bool `json:"enabled,omitempty"` + SendAsPod bool `json:"sendAsPod,omitempty"` } func (e NodeEvents) IsEnabled() bool { diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 659d17c91da9..50cf9aa78377 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -27,6 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" @@ -2115,6 +2116,13 @@ func (woc *wfOperationCtx) markNodePhase(nodeName string, phase wfv1.NodePhase, return node } +func (woc *wfOperationCtx) getPodByNode(node *wfv1.NodeStatus) (*apiv1.Pod, error) { + if node.Type != wfv1.NodeTypePod { + return nil, fmt.Errorf("Expected node type %s, got %s", wfv1.NodeTypePod, node.Type) + } + return woc.controller.getPod(woc.wf.GetNamespace(), node.ID) +} + func (woc *wfOperationCtx) recordNodePhaseEvent(node *wfv1.NodeStatus) { message := fmt.Sprintf("%v node %s", node.Phase, node.Name) if node.Message != "" { @@ -2125,8 +2133,19 @@ func (woc *wfOperationCtx) recordNodePhaseEvent(node *wfv1.NodeStatus) { case wfv1.NodeSucceeded, wfv1.NodeRunning: eventType = apiv1.EventTypeNormal } + eventConfig := woc.controller.Config.NodeEvents + var involvedObject runtime.Object = woc.wf + if eventConfig.SendAsPod { + pod, err := woc.getPodByNode(node) + if err != nil { + woc.log.Infof("Error getting pod from workflow node: %s", err) + } + if pod != nil { + involvedObject = pod + } + } woc.eventRecorder.AnnotatedEventf( - woc.wf, + involvedObject, map[string]string{ common.AnnotationKeyNodeType: string(node.Type), common.AnnotationKeyNodeName: node.Name, diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index ea959853f1af..7083939d8fd9 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -3394,6 +3394,102 @@ spec: } } +func TestEventNodeEventsAsPod(t *testing.T) { + for manifest, want := range map[string][]string{ + // Invalid spec + ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: invalid-spec +spec: + entrypoint: 123 +`: { + "Warning WorkflowFailed invalid spec: template name '123' undefined", + }, + // DAG + ` +metadata: + name: dag-events +spec: + entrypoint: main + templates: + - name: main + dag: + tasks: + - name: a + template: whalesay + - name: whalesay + container: + image: docker/whalesay:latest +`: { + "Normal WorkflowRunning Workflow Running", + "Normal WorkflowNodeRunning Running node dag-events", + "Normal WorkflowNodeRunning Running node dag-events.a", + "Normal WorkflowNodeSucceeded Succeeded node dag-events.a", + "Normal WorkflowNodeSucceeded Succeeded node dag-events", + "Normal WorkflowSucceeded Workflow completed", + }, + // steps + ` +metadata: + name: steps-events +spec: + entrypoint: main + templates: + - name: main + steps: + - - name: a + template: whalesay + - name: whalesay + container: + image: docker/whalesay:latest +`: { + "Normal WorkflowRunning Workflow Running", + "Normal WorkflowNodeRunning Running node steps-events", + "Normal WorkflowNodeRunning Running node steps-events[0]", + "Normal WorkflowNodeRunning Running node steps-events[0].a", + "Normal WorkflowNodeSucceeded Succeeded node steps-events[0].a", + "Normal WorkflowNodeSucceeded Succeeded node steps-events[0]", + "Normal WorkflowNodeSucceeded Succeeded node steps-events", + "Normal WorkflowSucceeded Workflow completed", + }, + // no DAG or steps + ` +metadata: + name: no-dag-or-steps +spec: + entrypoint: whalesay + templates: + - name: whalesay + container: + image: docker/whalesay:latest + command: [cowsay] + args: ["hello world"] +`: { + "Normal WorkflowRunning Workflow Running", + "Normal WorkflowNodeRunning Running node no-dag-or-steps", + "Normal WorkflowNodeSucceeded Succeeded node no-dag-or-steps", + "Normal WorkflowSucceeded Workflow completed", + }, + } { + wf := wfv1.MustUnmarshalWorkflow(manifest) + ctx := context.Background() + t.Run(wf.Name, func(t *testing.T) { + cancel, controller := newController(wf) + defer cancel() + controller.Config.NodeEvents = config.NodeEvents{SendAsPod: true} + woc := newWorkflowOperationCtx(wf, controller) + createRunningPods(ctx, woc) + woc.operate(ctx) + makePodsPhase(ctx, woc, apiv1.PodSucceeded) + woc = newWorkflowOperationCtx(woc.wf, controller) + woc.operate(ctx) + assert.ElementsMatch(t, want, getEvents(controller, len(want))) + }) + } +} + func getEvents(controller *WorkflowController, num int) []string { c := controller.eventRecorderManager.(*testEventRecorderManager).eventRecorder.Events events := make([]string, num) @@ -3403,6 +3499,47 @@ func getEvents(controller *WorkflowController, num int) []string { return events } +func TestGetPodByNode(t *testing.T) { + workflowText := ` +metadata: + name: dag-events +spec: + entrypoint: main + templates: + - name: main + dag: + tasks: + - name: a + template: whalesay + - name: whalesay + container: + image: docker/whalesay:latest +` + wf := wfv1.MustUnmarshalWorkflow(workflowText) + wf.Namespace = "argo" + ctx := context.Background() + cancel, controller := newController(wf) + defer cancel() + woc := newWorkflowOperationCtx(wf, controller) + createRunningPods(ctx, woc) + woc.operate(ctx) + // Parent dag node has no pod + parentNode := woc.wf.GetNodeByName("dag-events") + pod, err := woc.getPodByNode(parentNode) + assert.Nil(t, pod) + assert.Error(t, err, "Expected node type Pod, got DAG") + // Pod node should return a pod + podNode := woc.wf.GetNodeByName("dag-events.a") + pod, err = woc.getPodByNode(podNode) + assert.NotNil(t, pod) + assert.Nil(t, err) + // Invalid node should not return a pod + invalidNode := wfv1.NodeStatus{Type: wfv1.NodeTypePod, Name: "doesnt-exist"} + pod, err = woc.getPodByNode(&invalidNode) + assert.Nil(t, pod) + assert.Nil(t, err) +} + var pdbwf = ` apiVersion: argoproj.io/v1alpha1 kind: Workflow