Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend): move comp logic to workflow params #10979

Merged
merged 7 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 98 additions & 26 deletions backend/src/v2/compiler/argocompiler/argo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
package argocompiler

import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"strings"

wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/v2/compiler"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
k8score "k8s.io/api/core/v1"
Expand Down Expand Up @@ -63,7 +67,7 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
if err != nil {
return nil, err
}
// fill root component default paramters to PipelineJob
// fill root component default parameters to PipelineJob
specParams := spec.GetRoot().GetInputDefinitions().GetParameters()
for name, param := range specParams {
_, ok := job.RuntimeConfig.ParameterValues[name]
Expand Down Expand Up @@ -108,6 +112,9 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
"pipelines.kubeflow.org/v2_component": "true",
},
},
Arguments: wfapi.Arguments{
Parameters: []wfapi.Parameter{},
},
ServiceAccountName: "pipeline-runner",
Entrypoint: tmplEntrypoint,
},
Expand Down Expand Up @@ -180,69 +187,134 @@ func (c *workflowCompiler) templateName(componentName string) string {
return componentName
}

// WIP: store component spec, task spec and executor spec in annotations

const (
annotationComponents = "pipelines.kubeflow.org/components-"
annotationContainers = "pipelines.kubeflow.org/implementations-"
annotationKubernetesSpec = "pipelines.kubeflow.org/kubernetes-"
argumentsComponents = "components-"
argumentsContainers = "implementations-"
argumentsKubernetesSpec = "kubernetes-"
)

func (c *workflowCompiler) saveComponentSpec(name string, spec *pipelinespec.ComponentSpec) error {
return c.saveProtoToAnnotation(annotationComponents+name, spec)
hashedComponent := c.hashComponentContainer(name)

return c.saveProtoToArguments(argumentsComponents+hashedComponent, spec)
}

// useComponentSpec returns a placeholder we can refer to the component spec
// in argo workflow fields.
func (c *workflowCompiler) useComponentSpec(name string) (string, error) {
return c.annotationPlaceholder(annotationComponents + name)
hashedComponent := c.hashComponentContainer(name)

return c.argumentsPlaceholder(argumentsComponents + hashedComponent)
}

func (c *workflowCompiler) saveComponentImpl(name string, msg proto.Message) error {
return c.saveProtoToAnnotation(annotationContainers+name, msg)
hashedComponent := c.hashComponentContainer(name)

return c.saveProtoToArguments(argumentsContainers+hashedComponent, msg)
}

func (c *workflowCompiler) useComponentImpl(name string) (string, error) {
return c.annotationPlaceholder(annotationContainers + name)
hashedComponent := c.hashComponentContainer(name)

return c.argumentsPlaceholder(argumentsContainers + hashedComponent)
}

func (c *workflowCompiler) saveKubernetesSpec(name string, spec *structpb.Struct) error {
return c.saveProtoToAnnotation(annotationKubernetesSpec+name, spec)
return c.saveProtoToArguments(argumentsKubernetesSpec+name, spec)
}

func (c *workflowCompiler) useKubernetesImpl(name string) (string, error) {
return c.annotationPlaceholder(annotationKubernetesSpec + name)
return c.argumentsPlaceholder(argumentsKubernetesSpec + name)
}
zazulam marked this conversation as resolved.
Show resolved Hide resolved

// TODO(Bobgy): sanitize component name
func (c *workflowCompiler) saveProtoToAnnotation(name string, msg proto.Message) error {
// saveProtoToArguments saves a proto message to the workflow arguments. The
// message is serialized to JSON and stored in the workflow arguments and then
// referenced by the workflow templates using AWF templating syntax. The reason
// for storing it in the workflow arguments is because there is a 1-many
// relationship between components and tasks that reference them. The workflow
// arguments allow us to deduplicate the component logic (implementation & spec
// in IR), significantly reducing the size of the argo workflow manifest.
func (c *workflowCompiler) saveProtoToArguments(componentName string, msg proto.Message) error {
if c == nil {
return fmt.Errorf("compiler is nil")
}
if c.wf.Annotations == nil {
c.wf.Annotations = make(map[string]string)
if c.wf.Spec.Arguments.Parameters == nil {
c.wf.Spec.Arguments = wfapi.Arguments{Parameters: []wfapi.Parameter{}}
}
if _, alreadyExists := c.wf.Annotations[name]; alreadyExists {
return fmt.Errorf("annotation %q already exists", name)
if c.wf.Spec.Arguments.GetParameterByName(componentName) != nil {
return nil
zazulam marked this conversation as resolved.
Show resolved Hide resolved
}
json, err := stablyMarshalJSON(msg)
if err != nil {
return fmt.Errorf("saving component spec of %q to annotations: %w", name, err)
return fmt.Errorf("saving component spec of %q to arguments: %w", componentName, err)
}
// TODO(Bobgy): verify name adheres to Kubernetes annotation restrictions: https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/#syntax-and-character-set
c.wf.Annotations[name] = json
c.wf.Spec.Arguments.Parameters = append(c.wf.Spec.Arguments.Parameters, wfapi.Parameter{
Name: componentName,
Value: wfapi.AnyStringPtr(json),
})
return nil
}

func (c *workflowCompiler) annotationPlaceholder(name string) (string, error) {
// argumentsPlaceholder checks for the unique component name within the workflow
// arguments and returns a template tag that references the component in the
// workflow arguments.
func (c *workflowCompiler) argumentsPlaceholder(componentName string) (string, error) {
if c == nil {
return "", fmt.Errorf("compiler is nil")
}
if _, exists := c.wf.Annotations[name]; !exists {
return "", fmt.Errorf("using component spec: failed to find annotation %q", name)
if c.wf.Spec.Arguments.GetParameterByName(componentName) == nil {
return "", fmt.Errorf("using component spec: failed to find workflow parameter %q", componentName)
}

return workflowParameter(componentName), nil
}

// hashComponentContainer serializes and hashes the container field of a given
// component.
func (c *workflowCompiler) hashComponentContainer(componentName string) string {
log.Debug("componentName: ", componentName)
// Return early for root component since it has no command and args.
if componentName == "root" {
return componentName
}
// Reference: https://argoproj.github.io/argo-workflows/variables/
return fmt.Sprintf("{{workflow.annotations.%s}}", name), nil
if c.executors != nil { // Don't bother if there are no executors in the pipeline spec.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: There's opportunity to dedupe DAG component as well. We can hash the entire ComponentSpec (plus ExecutorSpec when available) minus their naming part: component name, executor label, etc.
We can leave it to future improvements, not a blocker for this PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@droctothorpe looks like we didn't go this far on this PR yes? would you be willing to open up a new github issue as a follow up, in case someone wants to pick this up?

// Look up the executorLabel for the component in question.
executorLabel := c.spec.Components[componentName].GetExecutorLabel()
log.Debug("executorLabel: ", executorLabel)
// Iterate through the list of executors.
for executorName, executorValue := range c.executors {
log.Debug("executorName: ", executorName)
// If one of them matches the executorLabel we extracted earlier...
if executorName == executorLabel {
// Get the corresponding container.
container := executorValue.GetContainer()
if container != nil {
containerHash, err := hashValue(container)
if err != nil {
// Do not bubble up since this is not a breaking error
// and we can just return the componentName in full.
log.Debug("Error hashing container: ", err)
}

return containerHash
}
}
}
}

return componentName
}

// hashValue serializes and hashes a provided value.
func hashValue(value interface{}) (string, error) {
bytes, err := json.Marshal(value)
if err != nil {
return "", err
}
h := sha256.New()
h.Write([]byte(bytes))

return hex.EncodeToString(h.Sum(nil)), nil
}

const (
Expand Down
6 changes: 4 additions & 2 deletions backend/src/v2/compiler/argocompiler/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,11 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string {
},
}
// Update pod metadata if it defined in the Kubernetes Spec
if kubernetesConfigString, ok := c.wf.Annotations[annotationKubernetesSpec+refName]; ok {
kubernetesConfigParam := c.wf.Spec.Arguments.GetParameterByName(argumentsKubernetesSpec + refName)

if kubernetesConfigParam != nil {
k8sExecCfg := &kubernetesplatform.KubernetesExecutorConfig{}
if err := jsonpb.UnmarshalString(kubernetesConfigString, k8sExecCfg); err == nil {
if err := jsonpb.UnmarshalString(string(*kubernetesConfigParam.Value), k8sExecCfg); err == nil {
extendPodMetadata(&executor.Metadata, k8sExecCfg)
}
}
Expand Down
Loading
Loading