Skip to content

Commit

Permalink
fix(backend): Use an Argo Workflow exit lifecycle hook for exit handlers
Browse files Browse the repository at this point in the history
As described in #10917, exit handlers were implemented as dependent
tasks that always ran within an Argo Workflow. The issue is that this
caused the pipeline to have a succeeded status regardless of if the
tasks within the exit handlers all succeeded.

This commit changes exit handlers to be exit lifecycle hooks on an
Argo Workflow so that the overall pipeline status is not impacted.

Resolves:
#11405

Signed-off-by: mprahl <[email protected]>
  • Loading branch information
mprahl committed Jan 2, 2025
1 parent 2ebb853 commit a3ec631
Show file tree
Hide file tree
Showing 10 changed files with 782 additions and 53 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/e2e-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ jobs:
run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name sequential --results-gcs-dir output

- name: Basic sample tests - exit_handler
run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name exit_handler --results-gcs-dir output
run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name exit_handler --expected-result failed --results-gcs-dir output

- name: Collect test results
if: always()
Expand Down
5 changes: 5 additions & 0 deletions backend/src/v2/compiler/argocompiler/argo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ func Test_argo_compiler(t *testing.T) {
platformSpecPath: "../testdata/create_pod_metadata.json",
argoYAMLPath: "testdata/create_pod_metadata.yaml",
},
{
jobPath: "../testdata/exit_handler.json",
platformSpecPath: "",
argoYAMLPath: "testdata/exit_handler.yaml",
},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%+v", tt), func(t *testing.T) {
Expand Down
27 changes: 26 additions & 1 deletion backend/src/v2/compiler/argocompiler/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package argocompiler

import k8score "k8s.io/api/core/v1"
import (
wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
k8score "k8s.io/api/core/v1"
)

// env vars in metadata-grpc-configmap is defined in component package
var metadataConfigIsOptional bool = true
Expand Down Expand Up @@ -42,3 +45,25 @@ var commonEnvs = []k8score.EnvVar{{
},
},
}}

// addExitTask adds an exit lifecycle hook to a task if exitTemplate is not empty.
func addExitTask(task *wfapi.DAGTask, exitTemplate string, parentDagID string) {
if exitTemplate == "" {
return
}

var args wfapi.Arguments

if parentDagID != "" {
args = wfapi.Arguments{Parameters: []wfapi.Parameter{
{Name: paramParentDagID, Value: wfapi.AnyStringPtr(parentDagID)},
}}
}

task.Hooks = wfapi.LifecycleHooks{
wfapi.ExitLifecycleEvent: wfapi.LifecycleHook{
Template: exitTemplate,
Arguments: args,
},
}
}
11 changes: 10 additions & 1 deletion backend/src/v2/compiler/argocompiler/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ type containerExecutorInputs struct {
cachedDecision string
// if false, the container will be skipped.
condition string
// if provided, this will be the template the Argo Workflow exit lifecycle hook will execute.
exitTemplate string
// if provided along with exitTemplate, this will be provided as the parent-dag-id input to the Argo Workflow exit
// lifecycle hook.
hookParentDagID string
}

// containerExecutorTask returns an argo workflows DAGTask.
Expand All @@ -192,7 +197,7 @@ func (c *workflowCompiler) containerExecutorTask(name string, inputs containerEx
if inputs.condition != "" {
when = inputs.condition + " != false"
}
return &wfapi.DAGTask{
task := &wfapi.DAGTask{
Name: name,
Template: c.addContainerExecutorTemplate(refName),
When: when,
Expand All @@ -203,6 +208,10 @@ func (c *workflowCompiler) containerExecutorTask(name string, inputs containerEx
},
},
}

addExitTask(task, inputs.exitTemplate, inputs.hookParentDagID)

return task
}

// addContainerExecutorTemplate adds a generic container executor template for
Expand Down
141 changes: 98 additions & 43 deletions backend/src/v2/compiler/argocompiler/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,6 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
if err != nil {
return err
}
dag := &wfapi.Template{
Name: c.templateName(name),
Inputs: wfapi.Inputs{
Parameters: []wfapi.Parameter{
{Name: paramParentDagID},
},
},
DAG: &wfapi.DAGTemplate{},
}
tasks := dagSpec.GetTasks()
// Iterate through tasks in deterministic order to facilitate testing.
// Note, order doesn't affect compiler with real effect right now.
Expand All @@ -58,6 +49,10 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
keys = append(keys, key)
}
sort.Strings(keys)

taskToExitTemplate := map[string]string{}

// First process exit tasks since those need to be set as lifecycle hooks on the the exit handler sub DAG.
for _, taskName := range keys {
kfpTask := dagSpec.GetTasks()[taskName]
if kfpTask.GetParameterIterator() != nil && kfpTask.GetArtifactIterator() != nil {
Expand All @@ -66,14 +61,81 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
if kfpTask.GetArtifactIterator() != nil {
return fmt.Errorf("artifact iterator not implemented yet")
}

if kfpTask.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" {
// Skip tasks that aren't exit tasks.
continue
}

tasks, err := c.task(taskName, kfpTask, taskInputs{
parentDagID: inputParameter(paramParentDagID),
})
if err != nil {
return err
}

// Generate the template name in the format of "exit-hook-<DAG name>-<task name>"
// (e.g. exit-hook-root-print-op).
name := fmt.Sprintf("exit-hook-%s-%s", name, taskName)

deps := kfpTask.GetDependentTasks()

for _, dep := range deps {
taskToExitTemplate[dep] = name
}

exitDag := &wfapi.Template{
Name: name,
Inputs: wfapi.Inputs{
Parameters: []wfapi.Parameter{
{Name: paramParentDagID},
},
},
DAG: &wfapi.DAGTemplate{
Tasks: tasks,
},
}

_, err = c.addTemplate(exitDag, name)
if err != nil {
return fmt.Errorf("DAG: %w", err)
}
}

dag := &wfapi.Template{
Name: c.templateName(name),
Inputs: wfapi.Inputs{
Parameters: []wfapi.Parameter{
{Name: paramParentDagID},
},
},
DAG: &wfapi.DAGTemplate{},
}

for _, taskName := range keys {
kfpTask := dagSpec.GetTasks()[taskName]
if kfpTask.GetTriggerPolicy().GetStrategy().String() == "ALL_UPSTREAM_TASKS_COMPLETED" {
// Skip already processed exit tasks.
continue
}

exitTemplate := taskToExitTemplate[taskName]

tasks, err := c.task(
taskName, kfpTask, taskInputs{parentDagID: inputParameter(paramParentDagID), exitTemplate: exitTemplate},
)
if err != nil {
return err
}
dag.DAG.Tasks = append(dag.DAG.Tasks, tasks...)
}

// The compilation should fail before this point, but add it as an extra precaution to guard against an orphaned
// exit task.
if len(dag.DAG.Tasks) == 0 {
return fmt.Errorf("DAG %s must contain one or more non-exit tasks", name)
}

_, err = c.addTemplate(dag, name)
if err != nil {
return fmt.Errorf("DAG: %w", err)
Expand Down Expand Up @@ -116,26 +178,37 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
type dagInputs struct {
// placeholder for parent DAG execution ID
parentDagID string
condition string
// if provided along with exitTemplate, this will be provided as the parent-dag-id input to the Argo Workflow exit
// lifecycle hook.
hookParentDagID string
// if provided, this will be the template the Argo Workflow exit lifecycle hook will execute.
exitTemplate string
condition string
}

// dagTask generates task for a DAG component.
// name: task name
// componentName: DAG component name
func (c *workflowCompiler) dagTask(name string, componentName string, inputs dagInputs) *wfapi.DAGTask {
return &wfapi.DAGTask{
task := &wfapi.DAGTask{
Name: name,
Template: c.templateName(componentName),
Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{
{Name: paramParentDagID, Value: wfapi.AnyStringPtr(inputs.parentDagID)},
{Name: paramCondition, Value: wfapi.AnyStringPtr(inputs.condition)},
}},
}

addExitTask(task, inputs.exitTemplate, inputs.hookParentDagID)

return task
}

type taskInputs struct {
parentDagID string
iterationIndex string
// if provided, this will be the template the Argo Workflow exit lifecycle hook will execute.
exitTemplate string
}

// parentDagID: placeholder for parent DAG execution ID
Expand Down Expand Up @@ -177,12 +250,15 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
return nil, err
}
// iterations belong to a sub-DAG, no need to add dependent tasks
if inputs.iterationIndex == "" {
// Also skip adding dependencies when it's an exit hook
if inputs.iterationIndex == "" && task.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" {
driver.Depends = depends(task.GetDependentTasks())
}
dag := c.dagTask(name, componentName, dagInputs{
parentDagID: driverOutputs.executionID,
condition: driverOutputs.condition,
parentDagID: driverOutputs.executionID,
exitTemplate: inputs.exitTemplate,
hookParentDagID: inputs.parentDagID,
condition: driverOutputs.condition,
})
dag.Depends = depends([]string{driverTaskName})
if task.GetTriggerPolicy().GetCondition() != "" {
Expand Down Expand Up @@ -215,23 +291,23 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
driverOutputs.condition = ""
}
// iterations belong to a sub-DAG, no need to add dependent tasks
if inputs.iterationIndex == "" {
// Also skip adding dependencies when it's an exit hook
if inputs.iterationIndex == "" && task.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" {
driver.Depends = depends(task.GetDependentTasks())
}
// Handle exit handler dependency
if task.GetTriggerPolicy().GetStrategy().String() == "ALL_UPSTREAM_TASKS_COMPLETED" {
driver.Depends = depends_exit_handler(task.GetDependentTasks())
}

// When using a dummy image, this means this task is for Kubernetes configs.
// In this case skip executor(launcher).
if dummyImages[e.Container.GetImage()] {
driver.Name = name
return []wfapi.DAGTask{*driver}, nil
}
executor := c.containerExecutorTask(name, containerExecutorInputs{
podSpecPatch: driverOutputs.podSpecPatch,
cachedDecision: driverOutputs.cached,
condition: driverOutputs.condition,
podSpecPatch: driverOutputs.podSpecPatch,
cachedDecision: driverOutputs.cached,
condition: driverOutputs.condition,
exitTemplate: inputs.exitTemplate,
hookParentDagID: inputs.parentDagID,
}, task.GetComponentRef().GetName())
executor.Depends = depends([]string{driverTaskName})
return []wfapi.DAGTask{*driver, *executor}, nil
Expand Down Expand Up @@ -572,24 +648,3 @@ func depends(deps []string) string {
}
return builder.String()
}

// Exit handler task happens no matter the state of the upstream tasks
func depends_exit_handler(deps []string) string {
if len(deps) == 0 {
return ""
}
var builder strings.Builder
for index, dep := range deps {
if index > 0 {
builder.WriteString(" || ")
}
for inner_index, task_status := range []string{".Succeeded", ".Skipped", ".Failed", ".Errored"} {
if inner_index > 0 {
builder.WriteString(" || ")
}
builder.WriteString(dep)
builder.WriteString(task_status)
}
}
return builder.String()
}
Loading

0 comments on commit a3ec631

Please sign in to comment.