Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend): move comp logic to workflow params #10979

Merged
merged 7 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 88 additions & 26 deletions backend/src/v2/compiler/argocompiler/argo.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/v2/compiler"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
k8score "k8s.io/api/core/v1"
Expand Down Expand Up @@ -63,7 +64,7 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
if err != nil {
return nil, err
}
// fill root component default paramters to PipelineJob
// fill root component default parameters to PipelineJob
specParams := spec.GetRoot().GetInputDefinitions().GetParameters()
for name, param := range specParams {
_, ok := job.RuntimeConfig.ParameterValues[name]
Expand Down Expand Up @@ -108,6 +109,9 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
"pipelines.kubeflow.org/v2_component": "true",
},
},
Arguments: wfapi.Arguments{
Parameters: []wfapi.Parameter{},
},
ServiceAccountName: "pipeline-runner",
Entrypoint: tmplEntrypoint,
},
Expand Down Expand Up @@ -180,69 +184,127 @@ func (c *workflowCompiler) templateName(componentName string) string {
return componentName
}

// WIP: store component spec, task spec and executor spec in annotations

const (
annotationComponents = "pipelines.kubeflow.org/components-"
annotationContainers = "pipelines.kubeflow.org/implementations-"
annotationKubernetesSpec = "pipelines.kubeflow.org/kubernetes-"
argumentsComponents = "components-"
argumentsContainers = "implementations-"
argumentsKubernetesSpec = "kubernetes-"
)

func (c *workflowCompiler) saveComponentSpec(name string, spec *pipelinespec.ComponentSpec) error {
return c.saveProtoToAnnotation(annotationComponents+name, spec)
functionName := c.extractFunctionName(name)

return c.saveProtoToArguments(argumentsComponents+functionName, spec)
}

// useComponentSpec returns a placeholder we can refer to the component spec
// in argo workflow fields.
func (c *workflowCompiler) useComponentSpec(name string) (string, error) {
return c.annotationPlaceholder(annotationComponents + name)
functionName := c.extractFunctionName(name)

return c.argumentsPlaceholder(argumentsComponents + functionName)
}

func (c *workflowCompiler) saveComponentImpl(name string, msg proto.Message) error {
return c.saveProtoToAnnotation(annotationContainers+name, msg)
functionName := c.extractFunctionName(name)

return c.saveProtoToArguments(argumentsContainers+functionName, msg)
}

func (c *workflowCompiler) useComponentImpl(name string) (string, error) {
return c.annotationPlaceholder(annotationContainers + name)
functionName := c.extractFunctionName(name)

return c.argumentsPlaceholder(argumentsContainers + functionName)
}

func (c *workflowCompiler) saveKubernetesSpec(name string, spec *structpb.Struct) error {
return c.saveProtoToAnnotation(annotationKubernetesSpec+name, spec)
return c.saveProtoToArguments(argumentsKubernetesSpec+name, spec)
}

func (c *workflowCompiler) useKubernetesImpl(name string) (string, error) {
return c.annotationPlaceholder(annotationKubernetesSpec + name)
return c.argumentsPlaceholder(argumentsKubernetesSpec + name)
}
zazulam marked this conversation as resolved.
Show resolved Hide resolved

// TODO(Bobgy): sanitize component name
func (c *workflowCompiler) saveProtoToAnnotation(name string, msg proto.Message) error {
// saveProtoToArguments saves a proto message to the workflow arguments. The
// message is serialized to JSON and stored in the workflow arguments and then
// referenced by the workflow templates using AWF templating syntax. The reason
// for storing it in the workflow arguments is because there is a 1-many
// relationship between components and tasks that reference them. The workflow
// arguments allow us to deduplicate the component logic (implementation & spec
// in IR), significantly reducing the size of the argo workflow manifest.
func (c *workflowCompiler) saveProtoToArguments(componentName string, msg proto.Message) error {
if c == nil {
return fmt.Errorf("compiler is nil")
}
if c.wf.Annotations == nil {
c.wf.Annotations = make(map[string]string)
if c.wf.Spec.Arguments.Parameters == nil {
c.wf.Spec.Arguments = wfapi.Arguments{Parameters: []wfapi.Parameter{}}
}
if _, alreadyExists := c.wf.Annotations[name]; alreadyExists {
return fmt.Errorf("annotation %q already exists", name)
if c.wf.Spec.Arguments.GetParameterByName(componentName) != nil {
return nil
zazulam marked this conversation as resolved.
Show resolved Hide resolved
}
json, err := stablyMarshalJSON(msg)
if err != nil {
return fmt.Errorf("saving component spec of %q to annotations: %w", name, err)
return fmt.Errorf("saving component spec of %q to arguments: %w", componentName, err)
}
// TODO(Bobgy): verify name adheres to Kubernetes annotation restrictions: https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/#syntax-and-character-set
c.wf.Annotations[name] = json
c.wf.Spec.Arguments.Parameters = append(c.wf.Spec.Arguments.Parameters, wfapi.Parameter{
Name: componentName,
Value: wfapi.AnyStringPtr(json),
})
return nil
}

func (c *workflowCompiler) annotationPlaceholder(name string) (string, error) {
// argumentsPlaceholder checks for the unique component name within the workflow
// arguments and returns a template tag that references the component in the
// workflow arguments.
func (c *workflowCompiler) argumentsPlaceholder(componentName string) (string, error) {
if c == nil {
return "", fmt.Errorf("compiler is nil")
}
if _, exists := c.wf.Annotations[name]; !exists {
return "", fmt.Errorf("using component spec: failed to find annotation %q", name)
if c.wf.Spec.Arguments.GetParameterByName(componentName) == nil {
return "", fmt.Errorf("using component spec: failed to find workflow parameter %q", componentName)
}

return workflowParameter(componentName), nil
}

// extractFunctionName extracts the function name of a component by looking it
// up in the pipeline spec.
func (c *workflowCompiler) extractFunctionName(componentName string) string {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not every component has a visible function name. Why not just use the original name as-is?

Copy link
Contributor

@droctothorpe droctothorpe Aug 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, Chen!

So this PR tackles two separate problems:

  1. This PR moves the component logic from the metadata annotations (which have a 256KB limit) to the spec (which has a much higher, configurable limit defined by the etcd request size limit). This in and of itself solves a major regression in v2.

  2. In addition, this PR deduplicates component logic in the compiled AWF manifest. This duplication is a long-standing issue that was carried over from v1 and was reported as early as 2020. Since the IR compiler appends an incrementing index to duplicate tasks, and the IR -> AWF compiler doesn't properly account for this, the underlying component logic gets duplicated in full in the compiled AWF manifest for each instance. This is kind of like copying a function in full every time you want to invoke it. In a sense, we're kind of using the function name as a key in a set instead of adding each task's logic to a list. Since components can get quite large, with all this duplicate logic, it's easy to hit even the etcd requests size limit. Several of our internal model development teams have run into this exact problem and have been forced to partition large pipelines or minify compiled manifests (i.e. strip verbose annotations and comments), but minification only works in v1 where compilation happens client-side.

Not every component has a visible function name. Why not just use the original name as-is?

If the --function_to_execute is not specified in the executor, then extractFunctionName just returns the original (indexed) component name, i.e. "the original name as-is." So that's precisely what happens when the function name is not specified.

At a bare minimum, this PR circumvents the 256KB limit, but it also substantially reduces the size of the compiled manifest (when possible) thanks to deduplication (we saw a 70% file size reduction in our tests).

Apologies if I'm over-explaining anything!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the --function_to_execute is not specified in the executor, then extractFunctionName just returns the original (indexed) component name, i.e. "the original name as-is." So that's precisely what happens when the function name is not specified.

Exactly as this function checks multiple cases, but doesn't guarantee any improvement over the original, so it feels to me like over-engineering that doesn't help much. I would suggest we remove it and use the original name as-is for simplicity; if the goal is to guarantee a short length and/or unique name, then we should use some hashing method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning the original name as is would guarantee redundancy in the compiled manifest. Hashing is an interesting approach! We could hash the IR.deploymentSpec.executors.<executor>.container.command and use that as the key. One concern with that approach is that it significantly reduces the human readability of the AWF manifest. Also, could hashing hundreds of large components increase AWF compilation time?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not every component has a visible function name. Why not just use the original name as-is?

QQ: Under what conditions does a component not include --function_to_execute?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For hashing, I was thinking hash the entire ComponentSpec (and possibly the expanded ExecutorSpec when applicable). That would guarantee uniqueness. I don't think it would add noticable AWF compilation time, but admit I didn't do any experiment.

QQ: Under what conditions does a component not include --function_to_execute?

Exactly what I was planning to explain. :)
The issue with the current extractFunctionName implementation is that it only makes a difference for maybe 50%-60% of the time. IMO, it's not worth the added complexity and it may even hurt AWF readability--if we value that--one needs to look into this implementation to figure out where the name comes from.

I can list a number of examples where the current approach fails to improve over original name as-is:

  • condition, loop, and exit handler all produce DAG component with no executor. e.g.:
    comp-condition-13:
    dag:
    tasks:
    print-text-8:
    cachingOptions:
    enableCache: true
    componentRef:
    name: comp-print-text-8
    inputs:
    parameters:
    msg:
    runtimeValue:
    constant: '1'
    taskInfo:
    name: print-text-8
    inputDefinitions:
    parameters:
    pipelinechannel--flip-coin-op-Output:
    parameterType: STRING
    pipelinechannel--loop-item-param-11:
    parameterType: STRING
  • pipeline can be used as component, which is another DAG component without executor, e.g.:
    comp-inner-pipeline:
    dag:
    tasks:
    condition-1:
    componentRef:
    name: comp-condition-1
    dependentTasks:
    - print-op1
    inputs:
    parameters:
    pipelinechannel--print-op1-Output:
    taskOutputParameter:
    outputParameterKey: Output
    producerTask: print-op1
    taskInfo:
    name: condition-1
    triggerPolicy:
    condition: inputs.parameter_values['pipelinechannel--print-op1-Output']
    == 'Hello'
  • Custom container component doesn't have --function_to_execute, and may come with a very generic entry point like main, run, etc. And a large enterprise may sometimes build a shared container with a single entrypoint that takes arguments to execute different component functions. e.g:
    exec-container-io:
    container:
    args:
    - --output_path
    - '{{$.outputs.parameters[''output_path''].output_file}}'
    command:
    - my_program
    - '{{$.inputs.parameters[''text'']}}'
    image: python:3.7

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So my suggestion for this PR is to simply remove the extractFunctionName part and use original component name as-is. I think the major concern on the size limit has been solved by moving the spec from annotation to argument. If further size reduction is needed, we can then look into a hashing solution that would work for 100%.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you so much for the thorough response and additional context, @chensun. We really appreciate it.

We POCed the hash-based approach that you suggested before we saw your comment and pushed it up since the work was done anyway. It works like a charm!

We tested against the three edge cases that you posted. Hashing successfully deduplicates all of the corresponding component logic. It's true that duplicate conditionals, DAGs, and nested pipelines are not deduplicated per se, but we confirmed that their underlying shared components, which are what typically inflate the manifest file size beyond the 1.5 MB etcd limit, are in fact deduplicated.

For example, pipeline_in_pipeline_complex.yaml references print_op1 three times: once at the root of my_pipeline and twice in the graph_component sub_pipeline (since it's parallelized).

We confirmed that the compiled AWF manifest consolidates these various tasks under the shared key, implementations-83cf3e248270dfedd9adf4317961878cb8ebdf352ee8121750c35bfee5d408ae, and therefore only stores the logic for the corresponding function once.

We think the change may be small enough in scope and lines of code and the underlying problem significant enough to warrant keeping it in this PR. That being said, we don't mind tackling the hash approach in a discrete PR.

Copy link
Contributor

@droctothorpe droctothorpe Aug 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I modified the component_io.yaml example slightly to demonstrate deduplication (with hashing) for container components that don't list the function name:

from kfp import Client
from kfp import dsl


@dsl.container_component
def container_io(arg: str, output_path: dsl.OutputPath(str)):
    return dsl.ContainerSpec(
        image="python:3.7",
        command=["echo", arg],
        args=["--output_path", output_path],
    )


@dsl.pipeline
def pipeline():
    task1 = container_io(arg="foo")
    task2 = container_io(arg="foo")
    task3 = container_io(arg="bar")


client = Client()
client.create_run_from_pipeline_func(pipeline)

The resulting AWF manifest looks like this:

Spec:
  Arguments:
    Parameters:
      Name:    components-fc838d49ffb7211dd93e608a1671731bc81e22197bbdb2e423df0ac72d451bc9
      Value:   {"executorLabel":"exec-container-io","inputDefinitions":{"parameters":{"arg":{"parameterType":"STRING"}}},"outputDefinitions":{"parameters":{"output_path":{"parameterType":"STRING"}}}}
      Name:    implementations-fc838d49ffb7211dd93e608a1671731bc81e22197bbdb2e423df0ac72d451bc9
      Value:   {"args":["--output_path","{{$.outputs.parameters['output_path'].output_file}}"],"command":["echo","{{$.inputs.parameters['arg']}}"],"image":"python:3.7"}
      Name:    components-root
      Value:   {"dag":{"tasks":{"container-io":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-container-io"},"inputs":{"parameters":{"arg":{"runtimeValue":{"constant":"foo"}}}},"taskInfo":{"name":"container-io"}},"container-io-2":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-container-io-2"},"inputs":{"parameters":{"arg":{"runtimeValue":{"constant":"foo"}}}},"taskInfo":{"name":"container-io-2"}},"container-io-3":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-container-io-3"},"inputs":{"parameters":{"arg":{"runtimeValue":{"constant":"bar"}}}},"taskInfo":{"name":"container-io-3"}}}}}

The tasks that share logic share the same hash in the Spec.Arguments.Parameters so the component logic is not duplicated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With respect to the pipelines_with_loops_and_conditions.yaml edge case you mentioned: it has 10 instances of the print_text component.

Thanks to the deduplication (through hashing), it stores just one copy of the component logic!

go run backend/src/v2/cmd/compiler/main.go -spec sdk/python/test_data/pipelines/pipeline_with_loops_and_conditions.yaml | grep 'print_text(msg' | wc -l
       1

print_text is a small component (though there's plenty of fluff around even one line components in the form of pip installs and what not), but it's not uncommon to parallelize massive components; that 10x increase in storage consumption makes reaching the etcd max-request-bytes threshold inevitable for our more complex models.

log.Debug("componentName: ", componentName)
// The root component is a DAG and therefore doesn't have a corresponding
// executor or function name. The final return statement in this function
// would cover this edge case, but this saves us some unnecessary iteration.
if componentName == "root" {
return componentName
}
// Reference: https://argoproj.github.io/argo-workflows/variables/
return fmt.Sprintf("{{workflow.annotations.%s}}", name), nil
executorLabel := c.spec.Components[componentName].GetExecutorLabel()
log.Debug("executorLabel: ", executorLabel)
// There are more nested conditionals here than we would prefer, but we
// don't want to make any assumptions about the presence of specific fields
// in the IR.
if c.executors != nil {
for executorName, executorValue := range c.executors {
log.Debug("executorName: ", executorName)
if executorName == executorLabel {
args := executorValue.GetContainer().GetArgs()
if args != nil {
if len(args) > 1 {
penultimateArg := args[len(args)-2]
if penultimateArg == "--function_to_execute" {
componentFunctionName := args[len(args)-1]
log.Debug("componentFunctionName: ", componentFunctionName)
return componentFunctionName
}
}
}
}
}
}

log.Debug("No corresponding executor for component: ", componentName)
// We could theoretically return an error here, but since the only
// consequence of not finding a matching executor is reduced deduplication,
// this doesn't result in application failure and we therefore continue.
return componentName
}

const (
Expand Down
6 changes: 4 additions & 2 deletions backend/src/v2/compiler/argocompiler/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,11 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string {
},
}
// Update pod metadata if it defined in the Kubernetes Spec
if kubernetesConfigString, ok := c.wf.Annotations[annotationKubernetesSpec+refName]; ok {
kubernetesConfigParam := c.wf.Spec.Arguments.GetParameterByName(argumentsKubernetesSpec + refName)

if kubernetesConfigParam != nil {
k8sExecCfg := &kubernetesplatform.KubernetesExecutorConfig{}
if err := jsonpb.UnmarshalString(kubernetesConfigString, k8sExecCfg); err == nil {
if err := jsonpb.UnmarshalString(string(*kubernetesConfigParam.Value), k8sExecCfg); err == nil {
extendPodMetadata(&executor.Metadata, k8sExecCfg)
}
}
Expand Down
Loading
Loading