From fa4d03cb9e1ebfb48866510d60148d14954a36f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Han?= Date: Wed, 27 Nov 2024 16:52:09 +0100 Subject: [PATCH] fix(sdk/backend): allow using cluster's default sc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When using the SDK's function `CreatePVC`, leaving the `storage_class_name` field empty will result in the cluster’s default storage class being applied. To enhance modularity and clarity, the logic for building the PVC definition has been refactored into a dedicated function. Error messages have been updated to align with this change, and unit tests have been implemented to cover all required and optional fields. In the code handling annotations, the `GetFields` method has replaced the use of the `AsMap` method. This approach is more convenient and eliminates the need for type conversion to `structpb.Value`. Resolves: #11396 Signed-off-by: Sébastien Han --- CHANGELOG.md | 2 +- backend/src/v2/driver/driver.go | 152 +++++++++-------- backend/src/v2/driver/driver_test.go | 156 ++++++++++++++++++ kubernetes_platform/python/README.md | 2 +- .../python/kfp/kubernetes/volume.py | 2 +- .../data/create_dynamic_pvc_default_sc.py | 32 ++++ .../data/create_dynamic_pvc_default_sc.yaml | 99 +++++++++++ 7 files changed, 369 insertions(+), 76 deletions(-) create mode 100644 kubernetes_platform/python/test/snapshot/data/create_dynamic_pvc_default_sc.py create mode 100644 kubernetes_platform/python/test/snapshot/data/create_dynamic_pvc_default_sc.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ad9eaebd03..54f9a470420 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -65,7 +65,7 @@ * **kubernetes_platform:** fix api-generator docker mount for SELinux ([\#10890](https://github.com/kubeflow/pipelines/issues/10890)) ([e69078b](https://github.com/kubeflow/pipelines/commit/e69078b2b65c0e34fd56499bbe34da882dc6e009)) * **manifests:** Move metacontroller to the top in kustmization.yaml 
([\#10669](https://github.com/kubeflow/pipelines/issues/10669)) ([4e9fe75](https://github.com/kubeflow/pipelines/commit/4e9fe75d4564bbcdde7cd358298361e94d4a20be)) * **sdk:** Throw 'exit_task cannot depend on any other tasks.' error when an ExitHandler has a parameter dependent on other task ([\#11005](https://github.com/kubeflow/pipelines/issues/11005)) ([08185e7](https://github.com/kubeflow/pipelines/commit/08185e71717ef628be3cbe2cdeb1fd55b25581d4)) - +* **sdk/backend**: Use the cluster's default storage class when creating a PVC with an empty storage class ([\#11396](https://github.com/kubeflow/pipelines/issues/11396)) ### Other Pull Requests diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 960ec6148e8..f066066defe 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -1657,63 +1657,6 @@ func createPVC( inputs := execution.ExecutorInput.Inputs glog.Infof("Input parameter values: %+v", inputs.ParameterValues) - // Requied input: access_modes - accessModeInput, ok := inputs.ParameterValues["access_modes"] - if !ok || accessModeInput == nil { - return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to create pvc: parameter access_modes not provided") - } - var accessModes []k8score.PersistentVolumeAccessMode - for _, value := range accessModeInput.GetListValue().GetValues() { - accessModes = append(accessModes, accessModeMap[value.GetStringValue()]) - } - - // Optional input: pvc_name and pvc_name_suffix - // Can only provide at most one of these two parameters. - // If neither is provided, PVC name is a randomly generated UUID. 
- pvcNameSuffixInput := inputs.ParameterValues["pvc_name_suffix"] - pvcNameInput := inputs.ParameterValues["pvc_name"] - if pvcNameInput.GetStringValue() != "" && pvcNameSuffixInput.GetStringValue() != "" { - return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to create pvc: at most one of pvc_name and pvc_name_suffix can be non-empty") - } else if pvcNameSuffixInput.GetStringValue() != "" { - pvcName = uuid.NewString() + pvcNameSuffixInput.GetStringValue() - // Add pvcName to the executor input for fingerprint generation - execution.ExecutorInput.Inputs.ParameterValues[pvcName] = structpb.NewStringValue(pvcName) - } else if pvcNameInput.GetStringValue() != "" { - pvcName = pvcNameInput.GetStringValue() - } else { - pvcName = uuid.NewString() - // Add pvcName to the executor input for fingerprint generation - execution.ExecutorInput.Inputs.ParameterValues[pvcName] = structpb.NewStringValue(pvcName) - } - - // Required input: size - volumeSizeInput, ok := inputs.ParameterValues["size"] - if !ok || volumeSizeInput == nil { - return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to create pvc: parameter volumeSize not provided") - } - - // Optional input: storage_class_name - // When not provided, use default value `standard` - storageClassNameInput, ok := inputs.ParameterValues["storage_class_name"] - var storageClassName string - if !ok { - storageClassName = "standard" - } else { - storageClassName = storageClassNameInput.GetStringValue() - } - - // Optional input: annotations - pvcAnnotationsInput := inputs.ParameterValues["annotations"] - pvcAnnotations := make(map[string]string) - for key, val := range pvcAnnotationsInput.GetStructValue().AsMap() { - typedVal := val.(structpb.Value) - pvcAnnotations[key] = typedVal.GetStringValue() - } - - // Optional input: volume_name - volumeNameInput := inputs.ParameterValues["volume_name"] - volumeName := volumeNameInput.GetStringValue() - // Get execution fingerprint and MLMD ID for caching 
// If pvcName includes a randomly generated UUID, it is added in the execution input as a key-value pair for this purpose only // The original execution is not changed. @@ -1761,22 +1704,10 @@ func createPVC( return pvcName, createdExecution, pb.Execution_CACHED, nil } - // Create a PersistentVolumeClaim object - pvc := &k8score.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: pvcName, - Annotations: pvcAnnotations, - }, - Spec: k8score.PersistentVolumeClaimSpec{ - AccessModes: accessModes, - Resources: k8score.ResourceRequirements{ - Requests: k8score.ResourceList{ - k8score.ResourceStorage: k8sres.MustParse(volumeSizeInput.GetStringValue()), - }, - }, - StorageClassName: &storageClassName, - VolumeName: volumeName, - }, + // Build the PVC object + pvc, err := buildPVC(inputs) + if err != nil { + return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to build pvc definition: %w", err) } // Create the PVC in the cluster @@ -1801,6 +1732,81 @@ func createPVC( return createdPVC.ObjectMeta.Name, createdExecution, pb.Execution_COMPLETE, nil } +// buildPVC creates a PersistentVolumeClaim object based on the provided inputs. +func buildPVC(inputs *pipelinespec.ExecutorInput_Inputs) (*k8score.PersistentVolumeClaim, error) { + // Required input: access_modes + accessModeInput, ok := inputs.ParameterValues["access_modes"] + if !ok || accessModeInput == nil { + return nil, fmt.Errorf("parameter access_modes not provided") + } + var accessModes []k8score.PersistentVolumeAccessMode + for _, value := range accessModeInput.GetListValue().GetValues() { + accessModes = append(accessModes, accessModeMap[value.GetStringValue()]) + } + + // Optional input: pvc_name and pvc_name_suffix + // Can only provide at most one of these two parameters. + // If neither is provided, PVC name is a randomly generated UUID. 
+ pvcNameSuffixInput := inputs.ParameterValues["pvc_name_suffix"] + pvcNameInput := inputs.ParameterValues["pvc_name"] + var pvcName string + if pvcNameInput.GetStringValue() != "" && pvcNameSuffixInput.GetStringValue() != "" { + return nil, fmt.Errorf("at most one of pvc_name and pvc_name_suffix can be non-empty") + } else if pvcNameSuffixInput.GetStringValue() != "" { + pvcName = uuid.NewString() + pvcNameSuffixInput.GetStringValue() + // Add pvcName to the executor input for fingerprint generation + inputs.ParameterValues[pvcName] = structpb.NewStringValue(pvcName) + } else if pvcNameInput.GetStringValue() != "" { + pvcName = pvcNameInput.GetStringValue() + } else { + pvcName = uuid.NewString() + // Add pvcName to the executor input for fingerprint generation + inputs.ParameterValues[pvcName] = structpb.NewStringValue(pvcName) + } + + // Required input: size + volumeSizeInput, ok := inputs.ParameterValues["size"] + if !ok || volumeSizeInput == nil { + return nil, fmt.Errorf("parameter size not provided") + } + + // Optional input: annotations + pvcAnnotationsInput := inputs.ParameterValues["annotations"] + pvcAnnotations := make(map[string]string) + for key, val := range pvcAnnotationsInput.GetStructValue().GetFields() { + pvcAnnotations[key] = val.GetStringValue() + } + + // Optional input: volume_name + volumeNameInput := inputs.ParameterValues["volume_name"] + volumeName := volumeNameInput.GetStringValue() + + // Create a PersistentVolumeClaim object + pvc := &k8score.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: pvcName, + Annotations: pvcAnnotations, + }, + Spec: k8score.PersistentVolumeClaimSpec{ + AccessModes: accessModes, + Resources: k8score.ResourceRequirements{ + Requests: k8score.ResourceList{ + k8score.ResourceStorage: k8sres.MustParse(volumeSizeInput.GetStringValue()), + }, + }, + VolumeName: volumeName, + }, + } + + // Optional input: storage_class_name + if storageClassNameInput, ok := 
inputs.ParameterValues["storage_class_name"]; ok { + storageClassName := storageClassNameInput.GetStringValue() + pvc.Spec.StorageClassName = &storageClassName + } + + return pvc, nil +} + func deletePVC( ctx context.Context, k8sClient kubernetes.Interface, diff --git a/backend/src/v2/driver/driver_test.go b/backend/src/v2/driver/driver_test.go index 130291f3cac..aec84ca4f0f 100644 --- a/backend/src/v2/driver/driver_test.go +++ b/backend/src/v2/driver/driver_test.go @@ -17,6 +17,7 @@ import ( "encoding/json" "testing" + "google.golang.org/protobuf/types/known/structpb" k8sres "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -1621,3 +1622,158 @@ func Test_extendPodSpecPatch_GenericEphemeralVolume(t *testing.T) { }) } } + +func Test_buildPVC(t *testing.T) { + type args struct { + inputs *pipelinespec.ExecutorInput_Inputs + } + accessModesList, _ := structpb.NewList([]interface{}{"ReadWriteOnce"}) + accessModes := structpb.NewListValue(accessModesList) + tests := []struct { + name string + args args + wantErr bool + errMsg string + }{ + { + "valid case", + args{ + &pipelinespec.ExecutorInput_Inputs{ + ParameterValues: map[string]*structpb.Value{ + "access_modes": accessModes, + "pvc_name": structpb.NewStringValue("my-pvc"), + "size": structpb.NewStringValue("5Gi"), + "storage_class_name": structpb.NewStringValue("standard"), + "volume_name": structpb.NewStringValue("volume-name"), + }, + }, + }, + false, + "", + }, + { + "missing required access_modes", + args{ + &pipelinespec.ExecutorInput_Inputs{ + ParameterValues: map[string]*structpb.Value{ + "access_modes": nil, + "pvc_name": structpb.NewStringValue("my-pvc-2"), + "size": structpb.NewStringValue("5Gi"), + "storage_class_name": structpb.NewStringValue("standard"), + "volume_name": structpb.NewStringValue("volume-name"), + }, + }, + }, + true, + "parameter access_modes not provided", + }, + { + "both pvc_name_suffix and pvc_name provided", + args{ + 
&pipelinespec.ExecutorInput_Inputs{ + ParameterValues: map[string]*structpb.Value{ + "access_modes": accessModes, + "pvc_name": structpb.NewStringValue("my-pvc-3"), + "pvc_name_suffix": structpb.NewStringValue("-my-pvc"), + "size": structpb.NewStringValue("5Gi"), + "storage_class_name": structpb.NewStringValue("standard"), + "volume_name": structpb.NewStringValue("volume-name"), + }, + }, + }, + true, + "at most one of pvc_name and pvc_name_suffix can be non-empty", + }, + { + "missing required size", + args{ + &pipelinespec.ExecutorInput_Inputs{ + ParameterValues: map[string]*structpb.Value{ + "access_modes": accessModes, + "pvc_name": structpb.NewStringValue("my-pvc"), + "storage_class_name": structpb.NewStringValue("standard"), + "volume_name": structpb.NewStringValue("volume-name"), + }, + }, + }, + true, + "parameter size not provided", + }, + { + "annotations provided", + args{ + &pipelinespec.ExecutorInput_Inputs{ + ParameterValues: map[string]*structpb.Value{ + "access_modes": accessModes, + "pvc_name": structpb.NewStringValue("my-pvc"), + "size": structpb.NewStringValue("5Gi"), + "annotations": structpb.NewStructValue( + &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "key1": structpb.NewStringValue("value1"), + "key2": structpb.NewStringValue("value2"), + }, + }, + ), + "volume_name": structpb.NewStringValue("volume-name"), + }, + }, + }, + false, + "", + }, + { + "storage_class_name not provided", + args{ + &pipelinespec.ExecutorInput_Inputs{ + ParameterValues: map[string]*structpb.Value{ + "access_modes": accessModes, + "pvc_name": structpb.NewStringValue("my-pvc"), + "size": structpb.NewStringValue("5Gi"), + "volume_name": structpb.NewStringValue("volume-name"), + }, + }, + }, + false, + "", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pvc, err := buildPVC(tt.args.inputs) + if tt.wantErr { + assert.NotNil(t, err) + assert.Nil(t, pvc) + assert.Contains(t, err.Error(), tt.errMsg) + } else { + assert.Nil(t, err) 
+ assert.NotNil(t, pvc) + // Test the PVC fields + if storageClassNameInput, ok := tt.args.inputs.ParameterValues["storage_class_name"]; ok { + assert.Equal(t, storageClassNameInput.GetStringValue(), *pvc.Spec.StorageClassName) + } else { + assert.Nil(t, pvc.Spec.StorageClassName) + } + if annotationsInput, ok := tt.args.inputs.ParameterValues["annotations"]; ok { + annotations := annotationsInput.GetStructValue().GetFields() + assert.Equal(t, len(annotations), len(pvc.Annotations)) + for key, value := range annotations { + assert.Equal(t, value.GetStringValue(), pvc.Annotations[key]) + } + } + if pvcNameInput, ok := tt.args.inputs.ParameterValues["pvc_name"]; ok { + assert.Equal(t, pvcNameInput.GetStringValue(), pvc.Name) + } + if pvcNameSuffixInput, ok := tt.args.inputs.ParameterValues["pvc_name_suffix"]; ok { + assert.Equal(t, pvc.Name, pvcNameSuffixInput.GetStringValue()) + } + if sizeInput, ok := tt.args.inputs.ParameterValues["size"]; ok { + storage := pvc.Spec.Resources.Requests[k8score.ResourceStorage] + assert.Equal(t, sizeInput.GetStringValue(), storage.String()) + } + assert.Equal(t, k8score.ReadWriteOnce, pvc.Spec.AccessModes[0]) + assert.Equal(t, "volume-name", pvc.Spec.VolumeName) + } + }) + } +} diff --git a/kubernetes_platform/python/README.md b/kubernetes_platform/python/README.md index 5f89848d6ae..21e4e46a983 100644 --- a/kubernetes_platform/python/README.md +++ b/kubernetes_platform/python/README.md @@ -166,7 +166,7 @@ def my_pipeline(): pvc_name_suffix='-my-pvc', access_modes=['ReadWriteOnce'], size='5Gi', - storage_class_name='standard', + storage_class_name='standard', # optional - do not specify to use the default storage class ) task1 = make_data() diff --git a/kubernetes_platform/python/kfp/kubernetes/volume.py b/kubernetes_platform/python/kfp/kubernetes/volume.py index 4535d45bcd9..1d3716e64e0 100644 --- a/kubernetes_platform/python/kfp/kubernetes/volume.py +++ b/kubernetes_platform/python/kfp/kubernetes/volume.py @@ -49,7 +49,7 @@ def 
CreatePVC( of ``pvc_name`` and ``pvc_name_suffix`` can be provided. storage_class_name: Name of StorageClass from which to provision the PV to back the PVC. ``None`` indicates to use the cluster's default - storage_class_name. Set to ``''`` for a statically specified PVC. + storage class. Set to ``''`` for a statically specified PVC. volume_name: Pre-existing PersistentVolume that should back the provisioned PersistentVolumeClaim. Used for statically specified PV only. Corresponds to `PersistentVolumeClaim.spec.volumeName `_. diff --git a/kubernetes_platform/python/test/snapshot/data/create_dynamic_pvc_default_sc.py b/kubernetes_platform/python/test/snapshot/data/create_dynamic_pvc_default_sc.py new file mode 100644 index 00000000000..8e004ba5c34 --- /dev/null +++ b/kubernetes_platform/python/test/snapshot/data/create_dynamic_pvc_default_sc.py @@ -0,0 +1,32 @@ +# Copyright 2024 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from kfp import dsl +from kfp import kubernetes + + +@dsl.pipeline +def my_pipeline(): + kubernetes.CreatePVC( + pvc_name_suffix="-my-pvc", + access_modes=["ReadWriteOnce"], + size="5Mi", + annotations={"foo": "bar"}, + ) + + +if __name__ == "__main__": + from kfp import compiler + + compiler.Compiler().compile(my_pipeline, __file__.replace(".py", ".yaml")) diff --git a/kubernetes_platform/python/test/snapshot/data/create_dynamic_pvc_default_sc.yaml b/kubernetes_platform/python/test/snapshot/data/create_dynamic_pvc_default_sc.yaml new file mode 100644 index 00000000000..d5bfac20c41 --- /dev/null +++ b/kubernetes_platform/python/test/snapshot/data/create_dynamic_pvc_default_sc.yaml @@ -0,0 +1,99 @@ +# PIPELINE DEFINITION +# Name: my-pipeline +components: + comp-createpvc: + executorLabel: exec-createpvc + inputDefinitions: + parameters: + access_modes: + description: 'AccessModes to request for the provisioned PVC. May + + be one or more of ``''ReadWriteOnce''``, ``''ReadOnlyMany''``, ``''ReadWriteMany''``, + or + + ``''ReadWriteOncePod''``. Corresponds to `PersistentVolumeClaim.spec.accessModes + `_.' + parameterType: LIST + annotations: + description: Annotations for the PVC's metadata. Corresponds to `PersistentVolumeClaim.metadata.annotations + `_. + isOptional: true + parameterType: STRUCT + pvc_name: + description: 'Name of the PVC. Corresponds to `PersistentVolumeClaim.metadata.name + `_. + Only one of ``pvc_name`` and ``pvc_name_suffix`` can + + be provided.' + isOptional: true + parameterType: STRING + pvc_name_suffix: + description: 'Prefix to use for a dynamically generated name, which + + will take the form ``-``. Only one + + of ``pvc_name`` and ``pvc_name_suffix`` can be provided.' + isOptional: true + parameterType: STRING + size: + description: The size of storage requested by the PVC that will be provisioned. + For example, ``'5Gi'``. Corresponds to `PersistentVolumeClaim.spec.resources.requests.storage + `_. 
+ parameterType: STRING + storage_class_name: + defaultValue: '' + description: 'Name of StorageClass from which to provision the PV + + to back the PVC. ``None`` indicates to use the cluster''s default + + storage class. Set to ``''''`` for a statically specified PVC.' + isOptional: true + parameterType: STRING + volume_name: + description: 'Pre-existing PersistentVolume that should back the + + provisioned PersistentVolumeClaim. Used for statically + + specified PV only. Corresponds to `PersistentVolumeClaim.spec.volumeName + `_.' + isOptional: true + parameterType: STRING + outputDefinitions: + parameters: + name: + parameterType: STRING +deploymentSpec: + executors: + exec-createpvc: + container: + image: argostub/createpvc +pipelineInfo: + name: my-pipeline +root: + dag: + tasks: + createpvc: + cachingOptions: + enableCache: true + componentRef: + name: comp-createpvc + inputs: + parameters: + access_modes: + runtimeValue: + constant: + - ReadWriteOnce + annotations: + runtimeValue: + constant: + foo: bar + pvc_name_suffix: + runtimeValue: + constant: -my-pvc + size: + runtimeValue: + constant: 5Mi + taskInfo: + name: createpvc +schemaVersion: 2.1.0 +sdkVersion: kfp-2.10.1