diff --git a/USERS.md b/USERS.md index 59653fc51c47..a622975b74ff 100644 --- a/USERS.md +++ b/USERS.md @@ -127,11 +127,12 @@ Currently, the following organizations are **officially** using Argo Workflows: 1. [Tulip](https://tulip.com/) 1. [UFirstGroup](https://www.ufirstgroup.com) 1. [Vispera](https://www.vispera.co) +1. [VMware](https://www.vmware.com/) 1. [Wavefront](https://www.wavefront.com/) 1. [Wellcome Trust](https://wellcome.ac.uk/) -1. [VMware](https://www.vmware.com/) 1. [WooliesX](https://wooliesx.com.au/) 1. [Woolworths Group](https://www.woolworthsgroup.com.au/) +1. [Workiva](https://www.workiva.com/) 1. [Zhihu](https://www.zhihu.com/) ### Projects Using Argo diff --git a/config/config.go b/config/config.go index 07102132680e..16b22639582f 100644 --- a/config/config.go +++ b/config/config.go @@ -22,7 +22,7 @@ type ResourceRateLimit struct { // Config contain the configuration settings for the workflow controller type Config struct { - // NodeEvents configures how node events are omitted + // NodeEvents configures how node events are emitted NodeEvents NodeEvents `json:"nodeEvents,omitempty"` // ExecutorImage is the image name of the executor to use when running pods diff --git a/config/node_events.go b/config/node_events.go index 51c76e263fd2..bda0a7ffef20 100644 --- a/config/node_events.go +++ b/config/node_events.go @@ -1,7 +1,8 @@ package config type NodeEvents struct { - Enabled *bool `json:"enabled,omitempty"` + Enabled *bool `json:"enabled,omitempty"` + SendAsPod bool `json:"sendAsPod,omitempty"` } func (e NodeEvents) IsEnabled() bool { diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 659d17c91da9..50cf9aa78377 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -27,6 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" @@ -2115,6 +2116,13 @@ func (woc *wfOperationCtx) markNodePhase(nodeName string, phase wfv1.NodePhase, return node } +func (woc *wfOperationCtx) getPodByNode(node *wfv1.NodeStatus) (*apiv1.Pod, error) { + if node.Type != wfv1.NodeTypePod { + return nil, fmt.Errorf("Expected node type %s, got %s", wfv1.NodeTypePod, node.Type) + } + return woc.controller.getPod(woc.wf.GetNamespace(), node.ID) +} + func (woc *wfOperationCtx) recordNodePhaseEvent(node *wfv1.NodeStatus) { message := fmt.Sprintf("%v node %s", node.Phase, node.Name) if node.Message != "" { @@ -2125,8 +2133,19 @@ func (woc *wfOperationCtx) recordNodePhaseEvent(node *wfv1.NodeStatus) { case wfv1.NodeSucceeded, wfv1.NodeRunning: eventType = apiv1.EventTypeNormal } + eventConfig := woc.controller.Config.NodeEvents + var involvedObject runtime.Object = woc.wf + if eventConfig.SendAsPod { + pod, err := woc.getPodByNode(node) + if err != nil { + woc.log.Infof("Error getting pod from workflow node: %s", err) + } + if pod != nil { + involvedObject = pod + } + } woc.eventRecorder.AnnotatedEventf( - woc.wf, + involvedObject, map[string]string{ common.AnnotationKeyNodeType: string(node.Type), common.AnnotationKeyNodeName: node.Name, diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index ea959853f1af..7083939d8fd9 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -3394,6 +3394,102 @@ spec: } } +func TestEventNodeEventsAsPod(t *testing.T) { + for manifest, want := range map[string][]string{ + // Invalid spec + ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: invalid-spec +spec: + entrypoint: 123 +`: { + "Warning WorkflowFailed invalid spec: template name '123' undefined", + }, + // DAG + ` +metadata: + name: dag-events +spec: + entrypoint: main + templates: + - name: main + dag: + tasks: + - name: a + template: whalesay + - name: whalesay + container: + image: docker/whalesay:latest +`: { + "Normal WorkflowRunning Workflow Running", + "Normal WorkflowNodeRunning Running node dag-events", + "Normal WorkflowNodeRunning Running node dag-events.a", + "Normal WorkflowNodeSucceeded Succeeded node dag-events.a", + "Normal WorkflowNodeSucceeded Succeeded node dag-events", + "Normal WorkflowSucceeded Workflow completed", + }, + // steps + ` +metadata: + name: steps-events +spec: + entrypoint: main + templates: + - name: main + steps: + - - name: a + template: whalesay + - name: whalesay + container: + image: docker/whalesay:latest +`: { + "Normal WorkflowRunning Workflow Running", + "Normal WorkflowNodeRunning Running node steps-events", + "Normal WorkflowNodeRunning Running node steps-events[0]", + "Normal WorkflowNodeRunning Running node steps-events[0].a", + "Normal WorkflowNodeSucceeded Succeeded node steps-events[0].a", + "Normal WorkflowNodeSucceeded Succeeded node steps-events[0]", + "Normal WorkflowNodeSucceeded Succeeded node steps-events", + "Normal WorkflowSucceeded Workflow completed", + }, + // no DAG or steps + ` +metadata: + name: no-dag-or-steps +spec: + entrypoint: whalesay + templates: + - name: whalesay + container: + image: docker/whalesay:latest + command: [cowsay] + args: ["hello world"] +`: { + "Normal WorkflowRunning Workflow Running", + "Normal WorkflowNodeRunning Running node no-dag-or-steps", + "Normal WorkflowNodeSucceeded Succeeded node no-dag-or-steps", + "Normal WorkflowSucceeded Workflow completed", + }, + } { + wf := wfv1.MustUnmarshalWorkflow(manifest) + ctx := context.Background() + t.Run(wf.Name, func(t *testing.T) { + cancel, controller := newController(wf) + defer cancel() + controller.Config.NodeEvents = config.NodeEvents{SendAsPod: true} + woc := newWorkflowOperationCtx(wf, controller) + createRunningPods(ctx, woc) + woc.operate(ctx) + makePodsPhase(ctx, woc, apiv1.PodSucceeded) + woc = newWorkflowOperationCtx(woc.wf, controller) + woc.operate(ctx) + assert.ElementsMatch(t, want, getEvents(controller, len(want))) + }) + } +} + func getEvents(controller *WorkflowController, num int) []string { c := controller.eventRecorderManager.(*testEventRecorderManager).eventRecorder.Events events := make([]string, num) @@ -3403,6 +3499,47 @@ func getEvents(controller *WorkflowController, num int) []string { return events } +func TestGetPodByNode(t *testing.T) { + workflowText := ` +metadata: + name: dag-events +spec: + entrypoint: main + templates: + - name: main + dag: + tasks: + - name: a + template: whalesay + - name: whalesay + container: + image: docker/whalesay:latest +` + wf := wfv1.MustUnmarshalWorkflow(workflowText) + wf.Namespace = "argo" + ctx := context.Background() + cancel, controller := newController(wf) + defer cancel() + woc := newWorkflowOperationCtx(wf, controller) + createRunningPods(ctx, woc) + woc.operate(ctx) + // Parent dag node has no pod + parentNode := woc.wf.GetNodeByName("dag-events") + pod, err := woc.getPodByNode(parentNode) + assert.Nil(t, pod) + assert.Error(t, err, "Expected node type Pod, got DAG") + // Pod node should return a pod + podNode := woc.wf.GetNodeByName("dag-events.a") + pod, err = woc.getPodByNode(podNode) + assert.NotNil(t, pod) + assert.Nil(t, err) + // Invalid node should not return a pod + invalidNode := wfv1.NodeStatus{Type: wfv1.NodeTypePod, Name: "doesnt-exist"} + pod, err = woc.getPodByNode(&invalidNode) + assert.Nil(t, pod) + assert.Nil(t, err) +} + var pdbwf = ` apiVersion: argoproj.io/v1alpha1 kind: Workflow