From d282374ebac711eb8d22a8c3f9f46b74717826f1 Mon Sep 17 00:00:00 2001 From: mprahl Date: Mon, 6 Jan 2025 17:17:07 -0500 Subject: [PATCH] feat(sdk/backend): Add support for placeholders in resource limits The API introduced new fields prefixed with Resource (e.g. ResourceCpuLimit) to replace the old fields without the prefix. The Driver hadn't been updated to honor those fields but the SDK started using them which led to unexpected behavior. The Driver now honors both fields but prioritizes the new fields. The SDK now only sets the new fields. The outcome is that resource limits/requests can now use input parameters. Note that pipeline_spec_builder.py was doing some validation on the limits/requests being set, but that's already handled in the user facing method (e.g. set_cpu_limit). Resolves: https://github.com/kubeflow/pipelines/issues/11500 Signed-off-by: mprahl --- backend/src/v2/driver/driver.go | 131 +++++++++++++---- backend/src/v2/driver/driver_test.go | 132 ++++++++++++++++++ backend/third_party_licenses/apiserver.csv | 2 +- backend/third_party_licenses/driver.csv | 2 +- backend/third_party_licenses/launcher.csv | 2 +- go.mod | 2 +- go.sum | 5 +- sdk/python/kfp/compiler/compiler_test.py | 18 --- sdk/python/kfp/compiler/compiler_utils.py | 77 ---------- .../kfp/compiler/pipeline_spec_builder.py | 35 ++--- .../pipeline_with_resource_spec.yaml | 14 +- 11 files changed, 250 insertions(+), 170 deletions(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 7afc1a6b892..c30edb5fd94 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -362,6 +362,35 @@ func Container(ctx context.Context, opts Options, mlmd *metadata.Client, cacheCl return execution, nil } +// getPodResource will accept the new field that accepts placeholders (e.g. resourceMemoryLimit) and the old float64 +// field (e.g. memoryLimit) and return the resolved value as a Quantity. If the returned Quantity is nil, it was not set +// by the user. If the new field is set, the old field is ignored. +func getPodResource( + new string, old float64, executorInput *pipelinespec.ExecutorInput, oldFmtStr string, +) (*k8sres.Quantity, error) { + var resolved string + + if new != "" { + var err error + + resolved, err = resolvePodSpecInputRuntimeParameter(new, executorInput) + if err != nil { + return nil, fmt.Errorf("failed to resolve executor input when retrieving pod resource: %w", err) + } + } else if old != 0 { + resolved = fmt.Sprintf(oldFmtStr, old) + } else { + return nil, nil + } + + q, err := k8sres.ParseQuantity(resolved) + if err != nil { + return nil, err + } + + return &q, nil +} + // initPodSpecPatch generates a strategic merge patch for pod spec, it is merged // to container base template generated in compiler/container.go. Therefore, only // dynamic values are patched here. The volume mounts / configmap mounts are @@ -414,46 +443,86 @@ func initPodSpecPatch( Limits: map[k8score.ResourceName]k8sres.Quantity{}, Requests: map[k8score.ResourceName]k8sres.Quantity{}, } - memoryLimit := container.GetResources().GetMemoryLimit() - if memoryLimit != 0 { - q, err := k8sres.ParseQuantity(fmt.Sprintf("%vG", memoryLimit)) - if err != nil { - return nil, fmt.Errorf("failed to init podSpecPatch: %w", err) - } - res.Limits[k8score.ResourceMemory] = q + + memoryLimit, err := getPodResource( + container.GetResources().GetResourceMemoryLimit(), + container.GetResources().GetMemoryLimit(), + executorInput, + "%vG", + ) + if err != nil { + return nil, fmt.Errorf("failed to init podSpecPatch: %w", err) } - memoryRequest := container.GetResources().GetMemoryRequest() - if memoryRequest != 0 { - q, err := k8sres.ParseQuantity(fmt.Sprintf("%vG", memoryRequest)) - if err != nil { - return nil, err - } - res.Requests[k8score.ResourceMemory] = q + if memoryLimit != nil { + res.Limits[k8score.ResourceMemory] = *memoryLimit } - cpuLimit := container.GetResources().GetCpuLimit() - if cpuLimit != 0 { - q, err := k8sres.ParseQuantity(fmt.Sprintf("%v", cpuLimit)) - if err != nil { - return nil, fmt.Errorf("failed to init podSpecPatch: %w", err) - } - res.Limits[k8score.ResourceCPU] = q + + memoryRequest, err := getPodResource( + container.GetResources().GetResourceMemoryRequest(), + container.GetResources().GetMemoryRequest(), + executorInput, + "%vG", + ) + if err != nil { + return nil, fmt.Errorf("failed to init podSpecPatch: %w", err) } - cpuRequest := container.GetResources().GetCpuRequest() - if cpuRequest != 0 { - q, err := k8sres.ParseQuantity(fmt.Sprintf("%v", cpuRequest)) - if err != nil { - return nil, err - } - res.Requests[k8score.ResourceCPU] = q + if memoryRequest != nil { + res.Requests[k8score.ResourceMemory] = *memoryRequest + } + + cpuLimit, err := getPodResource( + container.GetResources().GetResourceCpuLimit(), + container.GetResources().GetCpuLimit(), + executorInput, + "%v", + ) + if err != nil { + return nil, fmt.Errorf("failed to init podSpecPatch: %w", err) + } + if cpuLimit != nil { + res.Limits[k8score.ResourceCPU] = *cpuLimit + } + + cpuRequest, err := getPodResource( + container.GetResources().GetResourceCpuRequest(), + container.GetResources().GetCpuRequest(), + executorInput, + "%v", + ) + if err != nil { + return nil, fmt.Errorf("failed to init podSpecPatch: %w", err) } + if cpuRequest != nil { + res.Requests[k8score.ResourceCPU] = *cpuRequest + } + accelerator := container.GetResources().GetAccelerator() if accelerator != nil { - if accelerator.GetType() != "" && accelerator.GetCount() > 0 { - acceleratorType, err := resolvePodSpecInputRuntimeParameter(accelerator.GetType(), executorInput) + var acceleratorType string + if accelerator.GetResourceType() != "" { + acceleratorType, err = resolvePodSpecInputRuntimeParameter(accelerator.GetResourceType(), executorInput) if err != nil { return nil, fmt.Errorf("failed to init podSpecPatch: %w", err) } - q, err := k8sres.ParseQuantity(fmt.Sprintf("%v", accelerator.GetCount())) + } else if accelerator.GetType() != "" { + acceleratorType = accelerator.GetType() + } + + var acceleratorCount string + + if accelerator.GetResourceCount() != "" { + var err error + + acceleratorCount, err = resolvePodSpecInputRuntimeParameter(accelerator.GetResourceCount(), executorInput) + if err != nil { + return nil, fmt.Errorf("failed to init podSpecPatch: %w", err) + } + } else if accelerator.Count > 0 { + acceleratorCount = fmt.Sprintf("%v", accelerator.GetCount()) + } + + if acceleratorType != "" && acceleratorCount != "" { + q, err := k8sres.ParseQuantity(acceleratorCount) if err != nil { return nil, fmt.Errorf("failed to init podSpecPatch: %w", err) } diff --git a/backend/src/v2/driver/driver_test.go b/backend/src/v2/driver/driver_test.go index d2f4553a9b5..c84896c41de 100644 --- a/backend/src/v2/driver/driver_test.go +++ b/backend/src/v2/driver/driver_test.go @@ -17,6 +17,7 @@ import ( "encoding/json" "testing" + "google.golang.org/protobuf/types/known/structpb" k8sres "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -259,6 +260,137 @@ func Test_initPodSpecPatch_acceleratorConfig(t *testing.T) { } } +func Test_initPodSpecPatch_resource_placeholders(t *testing.T) { + containerSpec := &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec{ + Image: "python:3.9", + Args: []string{"--function_to_execute", "add"}, + Command: []string{"sh", "-ec", "python3 -m kfp.components.executor_main"}, + Resources: &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec_ResourceSpec{ + ResourceCpuRequest: "{{$.inputs.parameters['pipelinechannel--cpu_request']}}", + ResourceCpuLimit: "{{$.inputs.parameters['pipelinechannel--cpu_limit']}}", + ResourceMemoryRequest: "{{$.inputs.parameters['pipelinechannel--memory_request']}}", + ResourceMemoryLimit: "{{$.inputs.parameters['pipelinechannel--memory_limit']}}", + Accelerator: &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec_ResourceSpec_AcceleratorConfig{ + ResourceType: "{{$.inputs.parameters['pipelinechannel--accelerator_type']}}", + ResourceCount: "{{$.inputs.parameters['pipelinechannel--accelerator_count']}}", + }, + }, + } + componentSpec := &pipelinespec.ComponentSpec{} + executorInput := &pipelinespec.ExecutorInput{ + Inputs: &pipelinespec.ExecutorInput_Inputs{ + ParameterValues: map[string]*structpb.Value{ + "cpu_request": { + Kind: &structpb.Value_StringValue{ + StringValue: "{{$.inputs.parameters['pipelinechannel--cpu_request']}}", + }, + }, + "pipelinechannel--cpu_request": { + Kind: &structpb.Value_StringValue{ + StringValue: "200m", + }, + }, + "cpu_limit": { + Kind: &structpb.Value_StringValue{ + StringValue: "{{$.inputs.parameters['pipelinechannel--cpu_limit']}}", + }, + }, + "pipelinechannel--cpu_limit": { + Kind: &structpb.Value_StringValue{ + StringValue: "400m", + }, + }, + "memory_request": { + Kind: &structpb.Value_StringValue{ + StringValue: "{{$.inputs.parameters['pipelinechannel--memory_request']}}", + }, + }, + "pipelinechannel--memory_request": { + Kind: &structpb.Value_StringValue{ + StringValue: "100Mi", + }, + }, + "memory_limit": { + Kind: &structpb.Value_StringValue{ + StringValue: "{{$.inputs.parameters['pipelinechannel--memory_limit']}}", + }, + }, + "pipelinechannel--memory_limit": { + Kind: &structpb.Value_StringValue{ + StringValue: "500Mi", + }, + }, + "accelerator_type": { + Kind: &structpb.Value_StringValue{ + StringValue: "{{$.inputs.parameters['pipelinechannel--accelerator_type']}}", + }, + }, + "pipelinechannel--accelerator_type": { + Kind: &structpb.Value_StringValue{ + StringValue: "nvidia.com/gpu", + }, + }, + "accelerator_count": { + Kind: &structpb.Value_StringValue{ + StringValue: "{{$.inputs.parameters['pipelinechannel--accelerator_count']}}", + }, + }, + "pipelinechannel--accelerator_count": { + Kind: &structpb.Value_StringValue{ + StringValue: "1", + }, + }, + }, + }, + } + + podSpec, err := initPodSpecPatch( + containerSpec, componentSpec, executorInput, 27, "test", "0254beba-0be4-4065-8d97-7dc5e3adf300", + ) + assert.Nil(t, err) + assert.Len(t, podSpec.Containers, 1) + + res := podSpec.Containers[0].Resources + assert.Equal(t, k8sres.MustParse("200m"), res.Requests[k8score.ResourceCPU]) + assert.Equal(t, k8sres.MustParse("400m"), res.Limits[k8score.ResourceCPU]) + assert.Equal(t, k8sres.MustParse("100Mi"), res.Requests[k8score.ResourceMemory]) + assert.Equal(t, k8sres.MustParse("500Mi"), res.Limits[k8score.ResourceMemory]) + assert.Equal(t, k8sres.MustParse("1"), res.Limits[k8score.ResourceName("nvidia.com/gpu")]) +} + +func Test_initPodSpecPatch_legacy_resources(t *testing.T) { + containerSpec := &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec{ + Image: "python:3.9", + Args: []string{"--function_to_execute", "add"}, + Command: []string{"sh", "-ec", "python3 -m kfp.components.executor_main"}, + Resources: &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec_ResourceSpec{ + CpuRequest: 200, + CpuLimit: 400, + ResourceMemoryRequest: "100Mi", + ResourceMemoryLimit: "500Mi", + Accelerator: &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec_ResourceSpec_AcceleratorConfig{ + Type: "nvidia.com/gpu", + Count: 1, + }, + }, + } + componentSpec := &pipelinespec.ComponentSpec{} + executorInput := &pipelinespec.ExecutorInput{} + + podSpec, err := initPodSpecPatch( + containerSpec, componentSpec, executorInput, 27, "test", "0254beba-0be4-4065-8d97-7dc5e3adf300", + ) + assert.Nil(t, err) + assert.Len(t, podSpec.Containers, 1) + + res := podSpec.Containers[0].Resources + assert.Equal(t, k8sres.MustParse("200"), res.Requests[k8score.ResourceCPU]) + assert.Equal(t, k8sres.MustParse("400"), res.Limits[k8score.ResourceCPU]) + assert.Equal(t, k8sres.MustParse("100Mi"), res.Requests[k8score.ResourceMemory]) + assert.Equal(t, k8sres.MustParse("500Mi"), res.Limits[k8score.ResourceMemory]) + assert.Equal(t, k8sres.MustParse("1"), res.Limits[k8score.ResourceName("nvidia.com/gpu")]) +} + func Test_makeVolumeMountPatch(t *testing.T) { type args struct { pvcMount []*kubernetesplatform.PvcMount diff --git a/backend/third_party_licenses/apiserver.csv b/backend/third_party_licenses/apiserver.csv index 0081303485d..73a9148ef64 100644 --- a/backend/third_party_licenses/apiserver.csv +++ b/backend/third_party_licenses/apiserver.csv @@ -83,7 +83,7 @@ github.com/klauspost/pgzip,https://github.com/klauspost/pgzip/blob/v1.2.6/LICENS github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop,https://github.com/kubeflow/kfp-tekton/blob/0b894195443c/tekton-catalog/pipeline-loops/LICENSE,Apache-2.0 github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-exithandler/pkg/apis/exithandler,https://github.com/kubeflow/kfp-tekton/blob/0b894195443c/tekton-catalog/tekton-exithandler/LICENSE,Apache-2.0 github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask/pkg/apis/kfptask,https://github.com/kubeflow/kfp-tekton/blob/0b894195443c/tekton-catalog/tekton-kfptask/LICENSE,Apache-2.0 -github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/da804407ad31/api/LICENSE,Apache-2.0 +github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/873e9dedd766/api/LICENSE,Apache-2.0 github.com/kubeflow/pipelines/backend,https://github.com/kubeflow/pipelines/blob/HEAD/LICENSE,Apache-2.0 github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/d911c8b73b49/kubernetes_platform/LICENSE,Apache-2.0 github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata,https://github.com/kubeflow/pipelines/blob/da804407ad31/third_party/ml-metadata/LICENSE,Apache-2.0 diff --git a/backend/third_party_licenses/driver.csv b/backend/third_party_licenses/driver.csv index be1af93f94f..4b872f93fd7 100644 --- a/backend/third_party_licenses/driver.csv +++ b/backend/third_party_licenses/driver.csv @@ -71,7 +71,7 @@ github.com/josharian/intern,https://github.com/josharian/intern/blob/v1.0.0/lice github.com/json-iterator/go,https://github.com/json-iterator/go/blob/v1.1.12/LICENSE,MIT github.com/klauspost/compress/flate,https://github.com/klauspost/compress/blob/v1.16.7/LICENSE,Apache-2.0 github.com/klauspost/pgzip,https://github.com/klauspost/pgzip/blob/v1.2.6/LICENSE,MIT -github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/da804407ad31/api/LICENSE,Apache-2.0 +github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/873e9dedd766/api/LICENSE,Apache-2.0 github.com/kubeflow/pipelines/backend,https://github.com/kubeflow/pipelines/blob/HEAD/LICENSE,Apache-2.0 github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/d911c8b73b49/kubernetes_platform/LICENSE,Apache-2.0 github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata,https://github.com/kubeflow/pipelines/blob/da804407ad31/third_party/ml-metadata/LICENSE,Apache-2.0 diff --git a/backend/third_party_licenses/launcher.csv b/backend/third_party_licenses/launcher.csv index bed574d8d78..bda51c275bc 100644 --- a/backend/third_party_licenses/launcher.csv +++ b/backend/third_party_licenses/launcher.csv @@ -71,7 +71,7 @@ github.com/josharian/intern,https://github.com/josharian/intern/blob/v1.0.0/lice github.com/json-iterator/go,https://github.com/json-iterator/go/blob/v1.1.12/LICENSE,MIT github.com/klauspost/compress/flate,https://github.com/klauspost/compress/blob/v1.16.7/LICENSE,Apache-2.0 github.com/klauspost/pgzip,https://github.com/klauspost/pgzip/blob/v1.2.6/LICENSE,MIT -github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/da804407ad31/api/LICENSE,Apache-2.0 +github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/873e9dedd766/api/LICENSE,Apache-2.0 github.com/kubeflow/pipelines/backend,https://github.com/kubeflow/pipelines/blob/HEAD/LICENSE,Apache-2.0 github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata,https://github.com/kubeflow/pipelines/blob/da804407ad31/third_party/ml-metadata/LICENSE,Apache-2.0 github.com/lestrrat-go/strftime,https://github.com/lestrrat-go/strftime/blob/v1.0.4/LICENSE,MIT diff --git a/go.mod b/go.mod index 92b0f5ecdb4..aeb6603c893 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops v0.0.0-20240417221339-0b894195443c github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-exithandler v0.0.0-20240417221339-0b894195443c github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask v0.0.0-20240417221339-0b894195443c - github.com/kubeflow/pipelines/api v0.0.0-20240416215826-da804407ad31 + github.com/kubeflow/pipelines/api v0.0.0-20250102152816-873e9dedd766 github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240725205754-d911c8b73b49 github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20240416215826-da804407ad31 github.com/lestrrat-go/strftime v1.0.4 diff --git a/go.sum b/go.sum index c7088e69cab..1d53c021611 100644 --- a/go.sum +++ b/go.sum @@ -664,8 +664,8 @@ github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-exithandler v0.0.0-20240417 github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-exithandler v0.0.0-20240417221339-0b894195443c/go.mod h1:a6DSo/UxoG4hkJXJMo4nSmHURDEEiEVremmXy4GwdII= github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask v0.0.0-20240417221339-0b894195443c h1:/BevRDQhW+0ULFnDhYTq6JKiml0rORDW3qTnnOgGPnc= github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask v0.0.0-20240417221339-0b894195443c/go.mod h1:PGwbV4v8F6E8RXuvtJ9qpnIiESpK8LL3JDSsGo6nT+o= -github.com/kubeflow/pipelines/api v0.0.0-20240416215826-da804407ad31 h1:I4DTAdIkzWlSuVXlIxjJhwK0TxYH2/CEWP7DO7RGSJg= -github.com/kubeflow/pipelines/api v0.0.0-20240416215826-da804407ad31/go.mod h1:T7TOQB36gGe97yUdfVAnYK5uuT0+uQbLNHDUHxYkmE4= +github.com/kubeflow/pipelines/api v0.0.0-20250102152816-873e9dedd766 h1:ZXz+Ki+hxez8vsr5hWHsjYWpa9V/dkp3iI4XTtMBH7A= +github.com/kubeflow/pipelines/api v0.0.0-20250102152816-873e9dedd766/go.mod h1:puPWVUzB1VCb8lVq4g06IR2rIXuSpejDTd/FkSr6nvc= github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240725205754-d911c8b73b49 h1:Xf1qun8x4ZJj/nLZpUSIaphDK04NKeV7WME31qIC8Xo= github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240725205754-d911c8b73b49/go.mod h1:UhJrmpVSWawsAsSr1OzZfsEZTvQce+GvGwcZ58ULEhM= github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20240416215826-da804407ad31 h1:t1G2SexX+SwtYiaFrwH1lzGRSiXYMjd2QDT9842Ytpc= @@ -1378,7 +1378,6 @@ google.golang.org/genproto v0.0.0-20210420162539-3c870d7478d2/go.mod h1:P3QM42oQ google.golang.org/genproto v0.0.0-20210423144448-3a41ef94ed2b/go.mod h1:P3QM42oQyzQSnHPnZ/vqoCdDmzH28fzWByN9asMeM8A= google.golang.org/genproto v0.0.0-20210429181445-86c259c2b4ab/go.mod h1:P3QM42oQyzQSnHPnZ/vqoCdDmzH28fzWByN9asMeM8A= google.golang.org/genproto v0.0.0-20210506142907-4a47615972c2/go.mod h1:P3QM42oQyzQSnHPnZ/vqoCdDmzH28fzWByN9asMeM8A= -google.golang.org/genproto v0.0.0-20211026145609-4688e4c4e024/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211221231510-d629cc9a93d5/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 h1:YJ5pD9rF8o9Qtta0Cmy9rdBwkSjrTCT6XTiUQVOtIos= google.golang.org/genproto v0.0.0-20231212172506-995d672761c0/go.mod h1:l/k7rMz0vFTBPy+tFSGvXEd3z+BcoG1k7EHbqm+YBsY= diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 898187e36c3..b09b1b52c5d 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -3501,9 +3501,6 @@ def simple_pipeline(): self.assertEqual( '5', dict_format['deploymentSpec']['executors']['exec-return-1-2'] ['container']['resources']['resourceCpuLimit']) - self.assertEqual( - 5.0, dict_format['deploymentSpec']['executors']['exec-return-1-2'] - ['container']['resources']['cpuLimit']) self.assertNotIn( 'memoryLimit', dict_format['deploymentSpec']['executors'] ['exec-return-1-2']['container']['resources']) @@ -3511,9 +3508,6 @@ def simple_pipeline(): self.assertEqual( '50G', dict_format['deploymentSpec']['executors']['exec-return-1-3'] ['container']['resources']['resourceMemoryLimit']) - self.assertEqual( - 50.0, dict_format['deploymentSpec']['executors']['exec-return-1-3'] - ['container']['resources']['memoryLimit']) self.assertNotIn( 'cpuLimit', dict_format['deploymentSpec']['executors'] ['exec-return-1-3']['container']['resources']) @@ -3521,27 +3515,15 @@ def simple_pipeline(): self.assertEqual( '2', dict_format['deploymentSpec']['executors']['exec-return-1-4'] ['container']['resources']['resourceCpuRequest']) - self.assertEqual( - 2.0, dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['cpuRequest']) self.assertEqual( '5', dict_format['deploymentSpec']['executors']['exec-return-1-4'] ['container']['resources']['resourceCpuLimit']) - self.assertEqual( - 5.0, dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['cpuLimit']) self.assertEqual( '4G', dict_format['deploymentSpec']['executors']['exec-return-1-4'] ['container']['resources']['resourceMemoryRequest']) - self.assertEqual( - 4.0, dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['memoryRequest']) self.assertEqual( '50G', dict_format['deploymentSpec']['executors']['exec-return-1-4'] ['container']['resources']['resourceMemoryLimit']) - self.assertEqual( - 50.0, dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['memoryLimit']) class TestPlatformConfig(unittest.TestCase): diff --git a/sdk/python/kfp/compiler/compiler_utils.py b/sdk/python/kfp/compiler/compiler_utils.py index dc10665944f..6aae6b8054f 100644 --- a/sdk/python/kfp/compiler/compiler_utils.py +++ b/sdk/python/kfp/compiler/compiler_utils.py @@ -15,7 +15,6 @@ import collections import copy -import re from typing import DefaultDict, Dict, List, Mapping, Set, Tuple, Union from kfp import dsl @@ -805,79 +804,3 @@ def recursive_replace_placeholders(data: Union[Dict, List], old_value: str, if isinstance(data, pipeline_channel.PipelineChannel): data = str(data) return new_value if data == old_value else data - - -def validate_cpu_request_limit_to_float(cpu: str) -> float: - """Validates cpu request/limit string and converts to its numeric float - value. - - Args: - cpu: CPU requests or limits. This string should be a number or a - number followed by an "m" to indicate millicores (1/1000). For - more information, see `Specify a CPU Request and a CPU Limit - `_. - - Raises: - ValueError if the cpu request/limit string value is invalid. - - Returns: - The numeric value (float) of the cpu request/limit. - """ - if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None: - raise ValueError( - 'Invalid cpu string. Should be float or integer, or integer' - ' followed by "m".') - - return float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu) - - -def validate_memory_request_limit_to_float(memory: str) -> float: - """Validates memory request/limit string and converts to its numeric value. - - Args: - memory: Memory requests or limits. This string should be a number or - a number followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", - "Gi", "M", "Mi", "K", or "Ki". - - Raises: - ValueError if the memory request/limit string value is invalid. - - Returns: - The numeric value (float) of the memory request/limit. - """ - if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$', - memory) is None: - raise ValueError( - 'Invalid memory string. Should be a number or a number ' - 'followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", ' - '"Gi", "M", "Mi", "K", "Ki".') - - if memory.endswith('E'): - memory = float(memory[:-1]) * constants._E / constants._G - elif memory.endswith('Ei'): - memory = float(memory[:-2]) * constants._EI / constants._G - elif memory.endswith('P'): - memory = float(memory[:-1]) * constants._P / constants._G - elif memory.endswith('Pi'): - memory = float(memory[:-2]) * constants._PI / constants._G - elif memory.endswith('T'): - memory = float(memory[:-1]) * constants._T / constants._G - elif memory.endswith('Ti'): - memory = float(memory[:-2]) * constants._TI / constants._G - elif memory.endswith('G'): - memory = float(memory[:-1]) - elif memory.endswith('Gi'): - memory = float(memory[:-2]) * constants._GI / constants._G - elif memory.endswith('M'): - memory = float(memory[:-1]) * constants._M / constants._G - elif memory.endswith('Mi'): - memory = float(memory[:-2]) * constants._MI / constants._G - elif memory.endswith('K'): - memory = float(memory[:-1]) * constants._K / constants._G - elif memory.endswith('Ki'): - memory = float(memory[:-2]) * constants._KI / constants._G - else: - # By default interpret as a plain integer, in the unit of Bytes. - memory = float(memory) / constants._G - - return memory diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index 3061faab5e4..c72836d9a07 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -650,32 +650,17 @@ def convert_to_placeholder(input_value: str) -> str: if task.container_spec.resources is not None: if task.container_spec.resources.cpu_request is not None: - container_spec.resources.resource_cpu_request = ( - convert_to_placeholder( - task.container_spec.resources.cpu_request)) - container_spec.resources.cpu_request = compiler_utils.validate_cpu_request_limit_to_float( - cpu=convert_to_placeholder( - task.container_spec.resources.cpu_request)) + container_spec.resources.resource_cpu_request = convert_to_placeholder( + task.container_spec.resources.cpu_request) if task.container_spec.resources.cpu_limit is not None: - container_spec.resources.resource_cpu_limit = ( - convert_to_placeholder(task.container_spec.resources.cpu_limit)) - container_spec.resources.cpu_limit = compiler_utils.validate_cpu_request_limit_to_float( - cpu=convert_to_placeholder( - task.container_spec.resources.cpu_limit)) + container_spec.resources.resource_cpu_limit = convert_to_placeholder( + task.container_spec.resources.cpu_limit) if task.container_spec.resources.memory_request is not None: - container_spec.resources.resource_memory_request = ( - convert_to_placeholder( - task.container_spec.resources.memory_request)) - container_spec.resources.memory_request = compiler_utils.validate_memory_request_limit_to_float( - memory=convert_to_placeholder( - task.container_spec.resources.memory_request)) + container_spec.resources.resource_memory_request = convert_to_placeholder( + task.container_spec.resources.memory_request) if task.container_spec.resources.memory_limit is not None: - container_spec.resources.resource_memory_limit = ( - convert_to_placeholder( - task.container_spec.resources.memory_limit)) - container_spec.resources.memory_limit = compiler_utils.validate_memory_request_limit_to_float( - memory=convert_to_placeholder( - task.container_spec.resources.memory_limit)) + container_spec.resources.resource_memory_limit = convert_to_placeholder( + task.container_spec.resources.memory_limit) if task.container_spec.resources.accelerator_count is not None: container_spec.resources.accelerator.CopyFrom( pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec @@ -684,10 +669,6 @@ def convert_to_placeholder(input_value: str) -> str: task.container_spec.resources.accelerator_type), resource_count=convert_to_placeholder( task.container_spec.resources.accelerator_count), - type=convert_to_placeholder( - task.container_spec.resources.accelerator_type), - count=convert_to_placeholder( - int(task.container_spec.resources.accelerator_count)), )) return container_spec diff --git a/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml b/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml index 81595db1803..d7f0921e59c 100644 --- a/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml +++ b/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml @@ -61,18 +61,12 @@ deploymentSpec: image: gcr.io/my-project/my-fancy-trainer resources: accelerator: - count: '1' - type: 'tpu-v3' resourceCount: '1' - resourceType: 'tpu-v3' - cpuLimit: 4.0 - cpuRequest: 2.0 - memoryLimit: 15.032385536 - memoryRequest: 4.294967296 + resourceType: tpu-v3 resourceCpuLimit: '4' resourceCpuRequest: '2' - resourceMemoryLimit: '14Gi' - resourceMemoryRequest: '4Gi' + resourceMemoryLimit: 14Gi + resourceMemoryRequest: 4Gi pipelineInfo: description: A linear two-step pipeline with resource specification. name: two-step-pipeline-with-resource-spec @@ -125,4 +119,4 @@ root: isOptional: true parameterType: STRING schemaVersion: 2.1.0 -sdkVersion: kfp-2.8.0 +sdkVersion: kfp-2.11.0