Skip to content

Commit

Permalink
feat(controller): opt-in to sending pod node events as pod (argoproj#…
Browse files Browse the repository at this point in the history
…6377)

* feat(controller): opt-in to sending pod node events as pod

Because of how kubernetes aggregates events and how Argo sends both
workflow and node events with the same involved object (the workflow),
some workflow and node events can get merged into a single event, without
specifying which nodes are merged into one. This makes it challenging to
respond to phase changes for nodes that have a particular annotation
because kubernetes ignores annotations when checking if an event is
similar.

Creating an opt-in feature to send node events for nodes of type "Pod"
with the involved object set to the associated pod will force kubernetes
to send an event for each pod instead of aggregated several events into
a single event and thus make it possible build a system solely that
responds to node phase changes using Argo Workflows and Argo Events.

An alternative solution to this would be creating a `WorkflowNode`
custom resource and sending events with the workflow node itself as the
involved object, but that seemed like a bigger change than necessary.

Signed-off-by: Joseph McGovern <[email protected]>

* feat(controller): Get pod by node more efficiently

Rather than fetching and iterating over all pods, we can fetch the pod
we are interested in by a key. Even better, we can use an existing
function to do this instead of writing a new implementation.

Signed-off-by: Joseph McGovern <[email protected]>

* feat(controller): Only send one event for pod nodes

When the workflow controller is configured to send events with the
`involvedObject` set to the pod for pod workflow nodes, it could either
only send the event with the pod as the involved object, or it could
send two events: one with the pod and the other as the workflow as the
involved object.

The design decision was made to only send a single event in order to
reduce unnecessary event noise. Users that opt-in to this feature must
be aware that pod nodes will no longer emit events with the Workflow as
the involved object.

Signed-off-by: Joseph McGovern <[email protected]>
  • Loading branch information
josephmcgovern-wf authored Jul 26, 2021
1 parent 959ce6b commit cc701a1
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 4 deletions.
3 changes: 2 additions & 1 deletion USERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,12 @@ Currently, the following organizations are **officially** using Argo Workflows:
1. [Tulip](https://tulip.com/)
1. [UFirstGroup](https://www.ufirstgroup.com)
1. [Vispera](https://www.vispera.co)
1. [VMware](https://www.vmware.com/)
1. [Wavefront](https://www.wavefront.com/)
1. [Wellcome Trust](https://wellcome.ac.uk/)
1. [VMware](https://www.vmware.com/)
1. [WooliesX](https://wooliesx.com.au/)
1. [Woolworths Group](https://www.woolworthsgroup.com.au/)
1. [Workiva](https://www.workiva.com/)
1. [Zhihu](https://www.zhihu.com/)

### Projects Using Argo
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type ResourceRateLimit struct {
// Config contain the configuration settings for the workflow controller
type Config struct {

// NodeEvents configures how node events are omitted
// NodeEvents configures how node events are emitted
NodeEvents NodeEvents `json:"nodeEvents,omitempty"`

// ExecutorImage is the image name of the executor to use when running pods
Expand Down
3 changes: 2 additions & 1 deletion config/node_events.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package config

type NodeEvents struct {
Enabled *bool `json:"enabled,omitempty"`
Enabled *bool `json:"enabled,omitempty"`
SendAsPod bool `json:"sendAsPod,omitempty"`
}

func (e NodeEvents) IsEnabled() bool {
Expand Down
21 changes: 20 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -2115,6 +2116,13 @@ func (woc *wfOperationCtx) markNodePhase(nodeName string, phase wfv1.NodePhase,
return node
}

func (woc *wfOperationCtx) getPodByNode(node *wfv1.NodeStatus) (*apiv1.Pod, error) {
if node.Type != wfv1.NodeTypePod {
return nil, fmt.Errorf("Expected node type %s, got %s", wfv1.NodeTypePod, node.Type)
}
return woc.controller.getPod(woc.wf.GetNamespace(), node.ID)
}

func (woc *wfOperationCtx) recordNodePhaseEvent(node *wfv1.NodeStatus) {
message := fmt.Sprintf("%v node %s", node.Phase, node.Name)
if node.Message != "" {
Expand All @@ -2125,8 +2133,19 @@ func (woc *wfOperationCtx) recordNodePhaseEvent(node *wfv1.NodeStatus) {
case wfv1.NodeSucceeded, wfv1.NodeRunning:
eventType = apiv1.EventTypeNormal
}
eventConfig := woc.controller.Config.NodeEvents
var involvedObject runtime.Object = woc.wf
if eventConfig.SendAsPod {
pod, err := woc.getPodByNode(node)
if err != nil {
woc.log.Infof("Error getting pod from workflow node: %s", err)
}
if pod != nil {
involvedObject = pod
}
}
woc.eventRecorder.AnnotatedEventf(
woc.wf,
involvedObject,
map[string]string{
common.AnnotationKeyNodeType: string(node.Type),
common.AnnotationKeyNodeName: node.Name,
Expand Down
137 changes: 137 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3394,6 +3394,102 @@ spec:
}
}

func TestEventNodeEventsAsPod(t *testing.T) {
for manifest, want := range map[string][]string{
// Invalid spec
`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: invalid-spec
spec:
entrypoint: 123
`: {
"Warning WorkflowFailed invalid spec: template name '123' undefined",
},
// DAG
`
metadata:
name: dag-events
spec:
entrypoint: main
templates:
- name: main
dag:
tasks:
- name: a
template: whalesay
- name: whalesay
container:
image: docker/whalesay:latest
`: {
"Normal WorkflowRunning Workflow Running",
"Normal WorkflowNodeRunning Running node dag-events",
"Normal WorkflowNodeRunning Running node dag-events.a",
"Normal WorkflowNodeSucceeded Succeeded node dag-events.a",
"Normal WorkflowNodeSucceeded Succeeded node dag-events",
"Normal WorkflowSucceeded Workflow completed",
},
// steps
`
metadata:
name: steps-events
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: a
template: whalesay
- name: whalesay
container:
image: docker/whalesay:latest
`: {
"Normal WorkflowRunning Workflow Running",
"Normal WorkflowNodeRunning Running node steps-events",
"Normal WorkflowNodeRunning Running node steps-events[0]",
"Normal WorkflowNodeRunning Running node steps-events[0].a",
"Normal WorkflowNodeSucceeded Succeeded node steps-events[0].a",
"Normal WorkflowNodeSucceeded Succeeded node steps-events[0]",
"Normal WorkflowNodeSucceeded Succeeded node steps-events",
"Normal WorkflowSucceeded Workflow completed",
},
// no DAG or steps
`
metadata:
name: no-dag-or-steps
spec:
entrypoint: whalesay
templates:
- name: whalesay
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
`: {
"Normal WorkflowRunning Workflow Running",
"Normal WorkflowNodeRunning Running node no-dag-or-steps",
"Normal WorkflowNodeSucceeded Succeeded node no-dag-or-steps",
"Normal WorkflowSucceeded Workflow completed",
},
} {
wf := wfv1.MustUnmarshalWorkflow(manifest)
ctx := context.Background()
t.Run(wf.Name, func(t *testing.T) {
cancel, controller := newController(wf)
defer cancel()
controller.Config.NodeEvents = config.NodeEvents{SendAsPod: true}
woc := newWorkflowOperationCtx(wf, controller)
createRunningPods(ctx, woc)
woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)
assert.ElementsMatch(t, want, getEvents(controller, len(want)))
})
}
}

func getEvents(controller *WorkflowController, num int) []string {
c := controller.eventRecorderManager.(*testEventRecorderManager).eventRecorder.Events
events := make([]string, num)
Expand All @@ -3403,6 +3499,47 @@ func getEvents(controller *WorkflowController, num int) []string {
return events
}

func TestGetPodByNode(t *testing.T) {
workflowText := `
metadata:
name: dag-events
spec:
entrypoint: main
templates:
- name: main
dag:
tasks:
- name: a
template: whalesay
- name: whalesay
container:
image: docker/whalesay:latest
`
wf := wfv1.MustUnmarshalWorkflow(workflowText)
wf.Namespace = "argo"
ctx := context.Background()
cancel, controller := newController(wf)
defer cancel()
woc := newWorkflowOperationCtx(wf, controller)
createRunningPods(ctx, woc)
woc.operate(ctx)
// Parent dag node has no pod
parentNode := woc.wf.GetNodeByName("dag-events")
pod, err := woc.getPodByNode(parentNode)
assert.Nil(t, pod)
assert.Error(t, err, "Expected node type Pod, got DAG")
// Pod node should return a pod
podNode := woc.wf.GetNodeByName("dag-events.a")
pod, err = woc.getPodByNode(podNode)
assert.NotNil(t, pod)
assert.Nil(t, err)
// Invalid node should not return a pod
invalidNode := wfv1.NodeStatus{Type: wfv1.NodeTypePod, Name: "doesnt-exist"}
pod, err = woc.getPodByNode(&invalidNode)
assert.Nil(t, pod)
assert.Nil(t, err)
}

var pdbwf = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down

0 comments on commit cc701a1

Please sign in to comment.