From 84490621fe92226c4b81a979ccd948eeaed9b777 Mon Sep 17 00:00:00 2001 From: "Ricardo M. Oliveira" Date: Tue, 19 Nov 2024 18:11:32 -0300 Subject: [PATCH] Backport fixes in kubeflow/pipelines#11075 Introduced back the functions to convert k8s size values to float, but moved to kfp.dsl.utils Signed-off-by: Ricardo M. Oliveira --- .../kfp/compiler/pipeline_spec_builder.py | 22 +++--- sdk/python/kfp/dsl/utils.py | 78 +++++++++++++++++++ 2 files changed, 90 insertions(+), 10 deletions(-) diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index afc014530fa2..8214d0acc7e0 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -71,8 +71,9 @@ def to_protobuf_value(value: type_utils.PARAMETER_TYPES) -> struct_pb2.Value: return struct_pb2.Value(number_value=value) elif isinstance(value, dict): return struct_pb2.Value( - struct_value=struct_pb2.Struct( - fields={k: to_protobuf_value(v) for k, v in value.items()})) + struct_value=struct_pb2.Struct(fields={ + k: to_protobuf_value(v) for k, v in value.items() + })) elif isinstance(value, list): return struct_pb2.Value( list_value=struct_pb2.ListValue( @@ -645,19 +646,20 @@ def convert_to_placeholder(input_value: str) -> str: if task.container_spec.resources is not None: if task.container_spec.resources.cpu_request is not None: - container_spec.resources.resource_cpu_request = ( - convert_to_placeholder( + container_spec.resources.cpu_request = utils.validate_cpu_request_limit_to_float( + cpu=convert_to_placeholder( task.container_spec.resources.cpu_request)) if task.container_spec.resources.cpu_limit is not None: - container_spec.resources.resource_cpu_limit = ( - convert_to_placeholder(task.container_spec.resources.cpu_limit)) + container_spec.resources.cpu_limit = utils.validate_cpu_request_limit_to_float( + cpu=convert_to_placeholder( + task.container_spec.resources.cpu_limit)) if task.container_spec.resources.memory_request is not None: - container_spec.resources.resource_memory_request = ( - convert_to_placeholder( + container_spec.resources.memory_request = utils.validate_memory_request_limit_to_float( + memory=convert_to_placeholder( task.container_spec.resources.memory_request)) if task.container_spec.resources.memory_limit is not None: - container_spec.resources.resource_memory_limit = ( - convert_to_placeholder( + container_spec.resources.memory_limit = utils.validate_memory_request_limit_to_float( + memory=convert_to_placeholder( task.container_spec.resources.memory_limit)) if task.container_spec.resources.accelerator_count is not None: container_spec.resources.accelerator.CopyFrom( diff --git a/sdk/python/kfp/dsl/utils.py b/sdk/python/kfp/dsl/utils.py index 4400bca89424..a91ce50d047a 100644 --- a/sdk/python/kfp/dsl/utils.py +++ b/sdk/python/kfp/dsl/utils.py @@ -20,6 +20,8 @@ import types from typing import List +from kfp.dsl import constants + COMPONENT_NAME_PREFIX = 'comp-' _EXECUTOR_LABEL_PREFIX = 'exec-' @@ -126,3 +128,79 @@ def validate_pipeline_name(name: str) -> None: 'Please specify a pipeline name that matches the regular ' 'expression "^[a-z0-9][a-z0-9-]{0,127}$" using ' '`dsl.pipeline(name=...)` decorator.' % name) + + +def validate_cpu_request_limit_to_float(cpu: str) -> float: + """Validates cpu request/limit string and converts to its numeric float + value. + + Args: + cpu: CPU requests or limits. This string should be a number or a + number followed by an "m" to indicate millicores (1/1000). For + more information, see `Specify a CPU Request and a CPU Limit + `_. + + Raises: + ValueError if the cpu request/limit string value is invalid. + + Returns: + The numeric value (float) of the cpu request/limit. + """ + if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None: + raise ValueError( + 'Invalid cpu string. Should be float or integer, or integer' + ' followed by "m".') + + return float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu) + + +def validate_memory_request_limit_to_float(memory: str) -> float: + """Validates memory request/limit string and converts to its numeric value. + + Args: + memory: Memory requests or limits. This string should be a number or + a number followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", + "Gi", "M", "Mi", "K", or "Ki". + + Raises: + ValueError if the memory request/limit string value is invalid. + + Returns: + The numeric value (float) of the memory request/limit. + """ + if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$', + memory) is None: + raise ValueError( + 'Invalid memory string. Should be a number or a number ' + 'followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", ' + '"Gi", "M", "Mi", "K", "Ki".') + + if memory.endswith('E'): + memory = float(memory[:-1]) * constants._E / constants._G + elif memory.endswith('Ei'): + memory = float(memory[:-2]) * constants._EI / constants._G + elif memory.endswith('P'): + memory = float(memory[:-1]) * constants._P / constants._G + elif memory.endswith('Pi'): + memory = float(memory[:-2]) * constants._PI / constants._G + elif memory.endswith('T'): + memory = float(memory[:-1]) * constants._T / constants._G + elif memory.endswith('Ti'): + memory = float(memory[:-2]) * constants._TI / constants._G + elif memory.endswith('G'): + memory = float(memory[:-1]) + elif memory.endswith('Gi'): + memory = float(memory[:-2]) * constants._GI / constants._G + elif memory.endswith('M'): + memory = float(memory[:-1]) * constants._M / constants._G + elif memory.endswith('Mi'): + memory = float(memory[:-2]) * constants._MI / constants._G + elif memory.endswith('K'): + memory = float(memory[:-1]) * constants._K / constants._G + elif memory.endswith('Ki'): + memory = float(memory[:-2]) * constants._KI / constants._G + else: + # By default interpret as a plain integer, in the unit of Bytes. + memory = float(memory) / constants._G + + return memory