Skip to content

Commit

Permalink
feat(backend): move comp logic to workflow params
Browse files Browse the repository at this point in the history
Signed-off-by: zazulam <[email protected]>
Co-authored-by: droctothorpe <[email protected]>
Co-authored-by: andreafehrman <[email protected]>
Co-authored-by: MonicaZhang1 <[email protected]>
Co-authored-by: kylekaminky <[email protected]>
  • Loading branch information
5 people committed Jul 3, 2024
1 parent 2fb4922 commit 3d348d8
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 136 deletions.
83 changes: 57 additions & 26 deletions backend/src/v2/compiler/argocompiler/argo.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package argocompiler

import (
"fmt"
"strconv"
"strings"

wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
Expand Down Expand Up @@ -63,7 +64,7 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
if err != nil {
return nil, err
}
// fill root component default paramters to PipelineJob
// fill root component default parameters to PipelineJob
specParams := spec.GetRoot().GetInputDefinitions().GetParameters()
for name, param := range specParams {
_, ok := job.RuntimeConfig.ParameterValues[name]
Expand Down Expand Up @@ -108,6 +109,9 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
"pipelines.kubeflow.org/v2_component": "true",
},
},
Arguments: wfapi.Arguments{
Parameters: []wfapi.Parameter{},
},
ServiceAccountName: "pipeline-runner",
Entrypoint: tmplEntrypoint,
},
Expand Down Expand Up @@ -180,69 +184,96 @@ func (c *workflowCompiler) templateName(componentName string) string {
return componentName
}

// WIP: store component spec, task spec and executor spec in annotations

const (
annotationComponents = "pipelines.kubeflow.org/components-"
annotationContainers = "pipelines.kubeflow.org/implementations-"
annotationKubernetesSpec = "pipelines.kubeflow.org/kubernetes-"
argumentsComponents = "components-"
argumentsContainers = "implementations-"
argumentsKubernetesSpec = "kubernetes-"
)

func (c *workflowCompiler) saveComponentSpec(name string, spec *pipelinespec.ComponentSpec) error {
return c.saveProtoToAnnotation(annotationComponents+name, spec)
baseComponentName := ExtractBaseComponentName(argumentsComponents + name)
return c.saveProtoToArguments(baseComponentName, spec)
}

// useComponentSpec returns a placeholder we can refer to the component spec
// in argo workflow fields.
func (c *workflowCompiler) useComponentSpec(name string) (string, error) {
return c.annotationPlaceholder(annotationComponents + name)
baseComponentName := ExtractBaseComponentName(argumentsComponents + name)
return c.argumentsPlaceholder(baseComponentName)
}

func (c *workflowCompiler) saveComponentImpl(name string, msg proto.Message) error {
return c.saveProtoToAnnotation(annotationContainers+name, msg)
baseComponentName := ExtractBaseComponentName(argumentsContainers + name)
return c.saveProtoToArguments(baseComponentName, msg)
}

func (c *workflowCompiler) useComponentImpl(name string) (string, error) {
return c.annotationPlaceholder(annotationContainers + name)
baseComponentName := ExtractBaseComponentName(argumentsContainers + name)
return c.argumentsPlaceholder(baseComponentName)
}

func (c *workflowCompiler) saveKubernetesSpec(name string, spec *structpb.Struct) error {
return c.saveProtoToAnnotation(annotationKubernetesSpec+name, spec)
return c.saveProtoToArguments(argumentsKubernetesSpec+name, spec)
}

func (c *workflowCompiler) useKubernetesImpl(name string) (string, error) {
return c.annotationPlaceholder(annotationKubernetesSpec + name)
return c.argumentsPlaceholder(argumentsKubernetesSpec + name)
}

// TODO(Bobgy): sanitize component name
func (c *workflowCompiler) saveProtoToAnnotation(name string, msg proto.Message) error {
// saveProtoToArguments saves a proto message to the workflow arguments. The
// message is serialized to JSON and stored in the workflow arguments and then
// referenced by the workflow templates using AWF templating syntax. The reason
// for storing it in the workflow arguments is because there is a 1-many
// relationship between components and tasks that reference them. The workflow
// arguments allow us to deduplicate the component logic (implementation & spec
// in IR), significantly reducing the size of the argo workflow manifest.
func (c *workflowCompiler) saveProtoToArguments(componentName string, msg proto.Message) error {
if c == nil {
return fmt.Errorf("compiler is nil")
}
if c.wf.Annotations == nil {
c.wf.Annotations = make(map[string]string)
if c.wf.Spec.Arguments.Parameters == nil {
c.wf.Spec.Arguments = wfapi.Arguments{Parameters: []wfapi.Parameter{}}
}
if _, alreadyExists := c.wf.Annotations[name]; alreadyExists {
return fmt.Errorf("annotation %q already exists", name)
if c.wf.Spec.Arguments.GetParameterByName(componentName) != nil {
return nil
}
json, err := stablyMarshalJSON(msg)
if err != nil {
return fmt.Errorf("saving component spec of %q to annotations: %w", name, err)
return fmt.Errorf("saving component spec of %q to arguments: %w", componentName, err)
}
// TODO(Bobgy): verify name adheres to Kubernetes annotation restrictions: https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/#syntax-and-character-set
c.wf.Annotations[name] = json
c.wf.Spec.Arguments.Parameters = append(c.wf.Spec.Arguments.Parameters, wfapi.Parameter{
Name: componentName,
Value: wfapi.AnyStringPtr(json),
})
return nil
}

func (c *workflowCompiler) annotationPlaceholder(name string) (string, error) {
// argumentsPlaceholder checks for the unique component name within the workflow
// arguments and returns a template tag that references the component in the
// workflow arguments.
func (c *workflowCompiler) argumentsPlaceholder(componentName string) (string, error) {
if c == nil {
return "", fmt.Errorf("compiler is nil")
}
if _, exists := c.wf.Annotations[name]; !exists {
return "", fmt.Errorf("using component spec: failed to find annotation %q", name)
if c.wf.Spec.Arguments.GetParameterByName(componentName) == nil {
return "", fmt.Errorf("using component spec: failed to find workflow parameter %q", componentName)
}
// Reference: https://argoproj.github.io/argo-workflows/variables/
return fmt.Sprintf("{{workflow.annotations.%s}}", name), nil

return workflowParameter(componentName), nil
}

// extractBaseComponentName removes the iteration suffix that the IR compiler
// adds to the component name.
func ExtractBaseComponentName(componentName string) string {
baseComponentName := componentName
componentNameArray := strings.Split(componentName, "-")

if _, err := strconv.Atoi(componentNameArray[len(componentNameArray)-1]); err == nil {
baseComponentName = strings.Join(componentNameArray[:len(componentNameArray)-1], "-")

}

return baseComponentName
}

const (
Expand Down
27 changes: 27 additions & 0 deletions backend/src/v2/compiler/argocompiler/argo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,30 @@ func load(t *testing.T, path string, platformSpecPath string) (*pipelinespec.Pip
}
return job, nil
}

func Test_extractBaseComponentName(t *testing.T) {
tests := []struct {
name string
componentName string
expectedBaseName string
}{
{
name: "With dash and int",
componentName: "component-2",
expectedBaseName: "component",
},
{
name: "Without dash and int",
componentName: "component",
expectedBaseName: "component",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := argocompiler.ExtractBaseComponentName(tt.componentName)
if result != tt.expectedBaseName {
t.Errorf("Expected: %s, Got: %s", tt.expectedBaseName, result)
}
})
}
}
6 changes: 4 additions & 2 deletions backend/src/v2/compiler/argocompiler/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,11 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string {
},
}
// Update pod metadata if it defined in the Kubernetes Spec
if kubernetesConfigString, ok := c.wf.Annotations[annotationKubernetesSpec+refName]; ok {
kubernetesConfigParam := c.wf.Spec.Arguments.GetParameterByName(argumentsKubernetesSpec + refName)

if kubernetesConfigParam != nil {
k8sExecCfg := &kubernetesplatform.KubernetesExecutorConfig{}
if err := jsonpb.UnmarshalString(kubernetesConfigString, k8sExecCfg); err == nil {
if err := jsonpb.UnmarshalString(string(*kubernetesConfigParam.Value), k8sExecCfg); err == nil {
extendPodMetadata(&executor.Metadata, k8sExecCfg)
}
}
Expand Down
Loading

0 comments on commit 3d348d8

Please sign in to comment.