Skip to content

Commit

Permalink
feat(sdk/backend): Add support for placeholders in resource limits
Browse files Browse the repository at this point in the history
The API introduced new fields prefixed with Resource (e.g.
ResourceCpuLimit) to replace the old fields without the prefix. The
Driver hadn't been updated to honor those fields but the SDK started
using them which led to unexpected behavior.

The Driver now honors both fields but prioritizes the new fields. The
SDK now only sets the new fields.

The outcome is that resource limits/requests can now use input
parameters.

Note that pipeline_spec_builder.py was doing some validation on the
limits/requests being set, but that's already handled in the user facing
method (e.g. set_cpu_limit).

Resolves:
#11500

Signed-off-by: mprahl <[email protected]>
  • Loading branch information
mprahl committed Jan 9, 2025
1 parent e89d2d5 commit d282374
Show file tree
Hide file tree
Showing 11 changed files with 250 additions and 170 deletions.
131 changes: 100 additions & 31 deletions backend/src/v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,35 @@ func Container(ctx context.Context, opts Options, mlmd *metadata.Client, cacheCl
return execution, nil
}

// getPodResource will accept the new field that accepts placeholders (e.g. resourceMemoryLimit) and the old float64
// field (e.g. memoryLimit) and return the resolved value as a Quantity. If the returned Quantity is nil, it was not set
// by the user. If the new field is set, the old field is ignored.
func getPodResource(
new string, old float64, executorInput *pipelinespec.ExecutorInput, oldFmtStr string,
) (*k8sres.Quantity, error) {
var resolved string

if new != "" {
var err error

resolved, err = resolvePodSpecInputRuntimeParameter(new, executorInput)
if err != nil {
return nil, fmt.Errorf("failed to resolve executor input when retrieving pod resource: %w", err)
}
} else if old != 0 {
resolved = fmt.Sprintf(oldFmtStr, old)
} else {
return nil, nil
}

q, err := k8sres.ParseQuantity(resolved)
if err != nil {
return nil, err
}

return &q, nil
}

// initPodSpecPatch generates a strategic merge patch for pod spec, it is merged
// to container base template generated in compiler/container.go. Therefore, only
// dynamic values are patched here. The volume mounts / configmap mounts are
Expand Down Expand Up @@ -414,46 +443,86 @@ func initPodSpecPatch(
Limits: map[k8score.ResourceName]k8sres.Quantity{},
Requests: map[k8score.ResourceName]k8sres.Quantity{},
}
memoryLimit := container.GetResources().GetMemoryLimit()
if memoryLimit != 0 {
q, err := k8sres.ParseQuantity(fmt.Sprintf("%vG", memoryLimit))
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
res.Limits[k8score.ResourceMemory] = q

memoryLimit, err := getPodResource(
container.GetResources().GetResourceMemoryLimit(),
container.GetResources().GetMemoryLimit(),
executorInput,
"%vG",
)
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
memoryRequest := container.GetResources().GetMemoryRequest()
if memoryRequest != 0 {
q, err := k8sres.ParseQuantity(fmt.Sprintf("%vG", memoryRequest))
if err != nil {
return nil, err
}
res.Requests[k8score.ResourceMemory] = q
if memoryLimit != nil {
res.Limits[k8score.ResourceMemory] = *memoryLimit
}
cpuLimit := container.GetResources().GetCpuLimit()
if cpuLimit != 0 {
q, err := k8sres.ParseQuantity(fmt.Sprintf("%v", cpuLimit))
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
res.Limits[k8score.ResourceCPU] = q

memoryRequest, err := getPodResource(
container.GetResources().GetResourceMemoryRequest(),
container.GetResources().GetMemoryRequest(),
executorInput,
"%vG",
)
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
cpuRequest := container.GetResources().GetCpuRequest()
if cpuRequest != 0 {
q, err := k8sres.ParseQuantity(fmt.Sprintf("%v", cpuRequest))
if err != nil {
return nil, err
}
res.Requests[k8score.ResourceCPU] = q
if memoryRequest != nil {
res.Requests[k8score.ResourceMemory] = *memoryRequest
}

cpuLimit, err := getPodResource(
container.GetResources().GetResourceCpuLimit(),
container.GetResources().GetCpuLimit(),
executorInput,
"%v",
)
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
if cpuLimit != nil {
res.Limits[k8score.ResourceCPU] = *cpuLimit
}

cpuRequest, err := getPodResource(
container.GetResources().GetResourceCpuRequest(),
container.GetResources().GetCpuRequest(),
executorInput,
"%v",
)
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
if cpuRequest != nil {
res.Requests[k8score.ResourceCPU] = *cpuRequest
}

accelerator := container.GetResources().GetAccelerator()
if accelerator != nil {
if accelerator.GetType() != "" && accelerator.GetCount() > 0 {
acceleratorType, err := resolvePodSpecInputRuntimeParameter(accelerator.GetType(), executorInput)
var acceleratorType string
if accelerator.GetResourceType() != "" {
acceleratorType, err = resolvePodSpecInputRuntimeParameter(accelerator.GetResourceType(), executorInput)
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
q, err := k8sres.ParseQuantity(fmt.Sprintf("%v", accelerator.GetCount()))
} else if accelerator.GetType() != "" {
acceleratorType = accelerator.GetType()
}

var acceleratorCount string

if accelerator.GetResourceCount() != "" {
var err error

acceleratorCount, err = resolvePodSpecInputRuntimeParameter(accelerator.GetResourceCount(), executorInput)
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
} else if accelerator.Count > 0 {
acceleratorCount = fmt.Sprintf("%v", accelerator.GetCount())
}

if acceleratorType != "" && acceleratorCount != "" {
q, err := k8sres.ParseQuantity(acceleratorCount)
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
Expand Down
132 changes: 132 additions & 0 deletions backend/src/v2/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"encoding/json"
"testing"

"google.golang.org/protobuf/types/known/structpb"
k8sres "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -259,6 +260,137 @@ func Test_initPodSpecPatch_acceleratorConfig(t *testing.T) {
}
}

func Test_initPodSpecPatch_resource_placeholders(t *testing.T) {
containerSpec := &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec{
Image: "python:3.9",
Args: []string{"--function_to_execute", "add"},
Command: []string{"sh", "-ec", "python3 -m kfp.components.executor_main"},
Resources: &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec_ResourceSpec{
ResourceCpuRequest: "{{$.inputs.parameters['pipelinechannel--cpu_request']}}",
ResourceCpuLimit: "{{$.inputs.parameters['pipelinechannel--cpu_limit']}}",
ResourceMemoryRequest: "{{$.inputs.parameters['pipelinechannel--memory_request']}}",
ResourceMemoryLimit: "{{$.inputs.parameters['pipelinechannel--memory_limit']}}",
Accelerator: &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec_ResourceSpec_AcceleratorConfig{
ResourceType: "{{$.inputs.parameters['pipelinechannel--accelerator_type']}}",
ResourceCount: "{{$.inputs.parameters['pipelinechannel--accelerator_count']}}",
},
},
}
componentSpec := &pipelinespec.ComponentSpec{}
executorInput := &pipelinespec.ExecutorInput{
Inputs: &pipelinespec.ExecutorInput_Inputs{
ParameterValues: map[string]*structpb.Value{
"cpu_request": {
Kind: &structpb.Value_StringValue{
StringValue: "{{$.inputs.parameters['pipelinechannel--cpu_request']}}",
},
},
"pipelinechannel--cpu_request": {
Kind: &structpb.Value_StringValue{
StringValue: "200m",
},
},
"cpu_limit": {
Kind: &structpb.Value_StringValue{
StringValue: "{{$.inputs.parameters['pipelinechannel--cpu_limit']}}",
},
},
"pipelinechannel--cpu_limit": {
Kind: &structpb.Value_StringValue{
StringValue: "400m",
},
},
"memory_request": {
Kind: &structpb.Value_StringValue{
StringValue: "{{$.inputs.parameters['pipelinechannel--memory_request']}}",
},
},
"pipelinechannel--memory_request": {
Kind: &structpb.Value_StringValue{
StringValue: "100Mi",
},
},
"memory_limit": {
Kind: &structpb.Value_StringValue{
StringValue: "{{$.inputs.parameters['pipelinechannel--memory_limit']}}",
},
},
"pipelinechannel--memory_limit": {
Kind: &structpb.Value_StringValue{
StringValue: "500Mi",
},
},
"accelerator_type": {
Kind: &structpb.Value_StringValue{
StringValue: "{{$.inputs.parameters['pipelinechannel--accelerator_type']}}",
},
},
"pipelinechannel--accelerator_type": {
Kind: &structpb.Value_StringValue{
StringValue: "nvidia.com/gpu",
},
},
"accelerator_count": {
Kind: &structpb.Value_StringValue{
StringValue: "{{$.inputs.parameters['pipelinechannel--accelerator_count']}}",
},
},
"pipelinechannel--accelerator_count": {
Kind: &structpb.Value_StringValue{
StringValue: "1",
},
},
},
},
}

podSpec, err := initPodSpecPatch(
containerSpec, componentSpec, executorInput, 27, "test", "0254beba-0be4-4065-8d97-7dc5e3adf300",
)
assert.Nil(t, err)
assert.Len(t, podSpec.Containers, 1)

res := podSpec.Containers[0].Resources
assert.Equal(t, k8sres.MustParse("200m"), res.Requests[k8score.ResourceCPU])
assert.Equal(t, k8sres.MustParse("400m"), res.Limits[k8score.ResourceCPU])
assert.Equal(t, k8sres.MustParse("100Mi"), res.Requests[k8score.ResourceMemory])
assert.Equal(t, k8sres.MustParse("500Mi"), res.Limits[k8score.ResourceMemory])
assert.Equal(t, k8sres.MustParse("1"), res.Limits[k8score.ResourceName("nvidia.com/gpu")])
}

func Test_initPodSpecPatch_legacy_resources(t *testing.T) {
containerSpec := &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec{
Image: "python:3.9",
Args: []string{"--function_to_execute", "add"},
Command: []string{"sh", "-ec", "python3 -m kfp.components.executor_main"},
Resources: &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec_ResourceSpec{
CpuRequest: 200,
CpuLimit: 400,
ResourceMemoryRequest: "100Mi",
ResourceMemoryLimit: "500Mi",
Accelerator: &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec_ResourceSpec_AcceleratorConfig{
Type: "nvidia.com/gpu",
Count: 1,
},
},
}
componentSpec := &pipelinespec.ComponentSpec{}
executorInput := &pipelinespec.ExecutorInput{}

podSpec, err := initPodSpecPatch(
containerSpec, componentSpec, executorInput, 27, "test", "0254beba-0be4-4065-8d97-7dc5e3adf300",
)
assert.Nil(t, err)
assert.Len(t, podSpec.Containers, 1)

res := podSpec.Containers[0].Resources
assert.Equal(t, k8sres.MustParse("200"), res.Requests[k8score.ResourceCPU])
assert.Equal(t, k8sres.MustParse("400"), res.Limits[k8score.ResourceCPU])
assert.Equal(t, k8sres.MustParse("100Mi"), res.Requests[k8score.ResourceMemory])
assert.Equal(t, k8sres.MustParse("500Mi"), res.Limits[k8score.ResourceMemory])
assert.Equal(t, k8sres.MustParse("1"), res.Limits[k8score.ResourceName("nvidia.com/gpu")])
}

func Test_makeVolumeMountPatch(t *testing.T) {
type args struct {
pvcMount []*kubernetesplatform.PvcMount
Expand Down
2 changes: 1 addition & 1 deletion backend/third_party_licenses/apiserver.csv
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ github.com/klauspost/pgzip,https://github.com/klauspost/pgzip/blob/v1.2.6/LICENS
github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop,https://github.com/kubeflow/kfp-tekton/blob/0b894195443c/tekton-catalog/pipeline-loops/LICENSE,Apache-2.0
github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-exithandler/pkg/apis/exithandler,https://github.com/kubeflow/kfp-tekton/blob/0b894195443c/tekton-catalog/tekton-exithandler/LICENSE,Apache-2.0
github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask/pkg/apis/kfptask,https://github.com/kubeflow/kfp-tekton/blob/0b894195443c/tekton-catalog/tekton-kfptask/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/da804407ad31/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/873e9dedd766/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/backend,https://github.com/kubeflow/pipelines/blob/HEAD/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/d911c8b73b49/kubernetes_platform/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata,https://github.com/kubeflow/pipelines/blob/da804407ad31/third_party/ml-metadata/LICENSE,Apache-2.0
Expand Down
2 changes: 1 addition & 1 deletion backend/third_party_licenses/driver.csv
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ github.com/josharian/intern,https://github.com/josharian/intern/blob/v1.0.0/lice
github.com/json-iterator/go,https://github.com/json-iterator/go/blob/v1.1.12/LICENSE,MIT
github.com/klauspost/compress/flate,https://github.com/klauspost/compress/blob/v1.16.7/LICENSE,Apache-2.0
github.com/klauspost/pgzip,https://github.com/klauspost/pgzip/blob/v1.2.6/LICENSE,MIT
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/da804407ad31/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/873e9dedd766/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/backend,https://github.com/kubeflow/pipelines/blob/HEAD/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/d911c8b73b49/kubernetes_platform/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata,https://github.com/kubeflow/pipelines/blob/da804407ad31/third_party/ml-metadata/LICENSE,Apache-2.0
Expand Down
2 changes: 1 addition & 1 deletion backend/third_party_licenses/launcher.csv
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ github.com/josharian/intern,https://github.com/josharian/intern/blob/v1.0.0/lice
github.com/json-iterator/go,https://github.com/json-iterator/go/blob/v1.1.12/LICENSE,MIT
github.com/klauspost/compress/flate,https://github.com/klauspost/compress/blob/v1.16.7/LICENSE,Apache-2.0
github.com/klauspost/pgzip,https://github.com/klauspost/pgzip/blob/v1.2.6/LICENSE,MIT
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/da804407ad31/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/873e9dedd766/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/backend,https://github.com/kubeflow/pipelines/blob/HEAD/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata,https://github.com/kubeflow/pipelines/blob/da804407ad31/third_party/ml-metadata/LICENSE,Apache-2.0
github.com/lestrrat-go/strftime,https://github.com/lestrrat-go/strftime/blob/v1.0.4/LICENSE,MIT
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops v0.0.0-20240417221339-0b894195443c
github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-exithandler v0.0.0-20240417221339-0b894195443c
github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask v0.0.0-20240417221339-0b894195443c
github.com/kubeflow/pipelines/api v0.0.0-20240416215826-da804407ad31
github.com/kubeflow/pipelines/api v0.0.0-20250102152816-873e9dedd766
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240725205754-d911c8b73b49
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20240416215826-da804407ad31
github.com/lestrrat-go/strftime v1.0.4
Expand Down
5 changes: 2 additions & 3 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d282374

Please sign in to comment.