From cf77a8c247c63f9e70b479555cc9db4003d157ad Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Paw=C5=82owski?=
Date: Wed, 29 Jan 2025 11:26:11 +0000
Subject: [PATCH] use gcsfuse for a3ultra and a3 mega
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Piotr Pawłowski
---
 src/xpk/blueprints/a3mega/storage_crd.yaml    | 52 +++++++++++++++++++
 src/xpk/blueprints/a3ultra/storage_crd.yaml   | 52 +++++++++++++++++++
 src/xpk/commands/workload.py                  | 18 +++++--
 src/xpk/core/blueprint/blueprint_generator.py | 13 +++--
 src/xpk/core/storage.py                       |  1 +
 .../workload_decorators/rdma_decorator.py     | 29 +++++++++++
 .../workload_decorators/tcpxo_decorator.py    | 30 ++++++++++-
 src/xpk/parser/cluster.py                     |  3 +-
 8 files changed, 187 insertions(+), 11 deletions(-)
 create mode 100644 src/xpk/blueprints/a3mega/storage_crd.yaml
 create mode 100644 src/xpk/blueprints/a3ultra/storage_crd.yaml

diff --git a/src/xpk/blueprints/a3mega/storage_crd.yaml b/src/xpk/blueprints/a3mega/storage_crd.yaml
new file mode 100644
index 000000000..03c47f55a
--- /dev/null
+++ b/src/xpk/blueprints/a3mega/storage_crd.yaml
@@ -0,0 +1,52 @@
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+  name: storages.xpk.x-k8s.io
+spec:
+  group: xpk.x-k8s.io
+  versions:
+  - name: v1
+    served: true
+    storage: true
+    schema:
+      openAPIV3Schema:
+        type: object
+        properties:
+          spec:
+            type: object
+            properties:
+              type:
+                type: string
+              cluster:
+                type: string
+              auto_mount:
+                type: boolean
+              mount_point:
+                type: string
+              readonly:
+                type: boolean
+              manifest:
+                type: string
+              pv:
+                type: string
+              pvc:
+                type: string
+            required:
+            - type
+            - cluster
+            - auto_mount
+            - mount_point
+            - readonly
+            - manifest
+            - pvc
+            - pv
+            x-kubernetes-validations:
+            - message: Value is immutable
+              rule: self == oldSelf
+  scope: Cluster
+  names:
+    plural: storages
+    singular: storage
+    kind: Storage
+    shortNames:
+    - stg
\ No newline at end of file
diff --git a/src/xpk/blueprints/a3ultra/storage_crd.yaml b/src/xpk/blueprints/a3ultra/storage_crd.yaml
new file mode 100644
index 000000000..03c47f55a
--- /dev/null
+++ b/src/xpk/blueprints/a3ultra/storage_crd.yaml
@@ -0,0 +1,52 @@
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+  name: storages.xpk.x-k8s.io
+spec:
+  group: xpk.x-k8s.io
+  versions:
+  - name: v1
+    served: true
+    storage: true
+    schema:
+      openAPIV3Schema:
+        type: object
+        properties:
+          spec:
+            type: object
+            properties:
+              type:
+                type: string
+              cluster:
+                type: string
+              auto_mount:
+                type: boolean
+              mount_point:
+                type: string
+              readonly:
+                type: boolean
+              manifest:
+                type: string
+              pv:
+                type: string
+              pvc:
+                type: string
+            required:
+            - type
+            - cluster
+            - auto_mount
+            - mount_point
+            - readonly
+            - manifest
+            - pvc
+            - pv
+            x-kubernetes-validations:
+            - message: Value is immutable
+              rule: self == oldSelf
+  scope: Cluster
+  names:
+    plural: storages
+    singular: storage
+    kind: Storage
+    shortNames:
+    - stg
\ No newline at end of file
diff --git a/src/xpk/commands/workload.py b/src/xpk/commands/workload.py
index d30584101..007f43547 100644
--- a/src/xpk/commands/workload.py
+++ b/src/xpk/commands/workload.py
@@ -134,6 +134,7 @@
 kind: JobSet
 metadata:
   name: {args.workload}
+  annotations: {storage_annotations}
   labels:
     kueue.x-k8s.io/queue-name: multislice-queue # Name of the LocalQueue
     xpk.google.com/workload: {args.workload}
@@ -209,9 +210,6 @@
   - name: slice-job
     replicas: 1
     template:
-      metadata:
-        annotations:
-          {storage_annotations}
       spec:
        parallelism: {args.num_nodes}
        completions: {args.num_nodes}
@@ -485,6 +483,7 @@ def workload_create(args) -> None:
     xpk_exit(return_code)
 
   storages: list[Storage] = get_storages_to_mount(k8s_api_client, args.storage)
+  xpk_print('storages:', storages)
   gcs_fuse_storages = list(
       filter(lambda storage: storage.type == GCS_FUSE_TYPE, storages)
   )
@@ -510,18 +509,29 @@ def workload_create(args) -> None:
 
   if system.device_type in cluster_gcluster.supported_device_types:
     yml_string = a3_gpu_workload_create_yaml.format(
-        args=args, container=container
+        args=args,
+        container=container,
+        service_account=XPK_SA,
+        storage_volumes=get_storage_volumes_yaml_for_gpu(gcs_fuse_storages),
     )
 
     if args.device_type == cluster_gcluster.a3mega_device_type:
       sub_networks = [f'{args.cluster}-gpunet-{i}-subnet' for i in range(8)]
       yml_string = tcpxo_decorator.decorate_jobset(yml_string, sub_networks)
+      if len(gcs_fuse_storages) > 0:
+        yml_string = tcpxo_decorator.decorate_jobset_with_storages(
+            yml_string, gcs_fuse_storages
+        )
 
     if args.device_type == cluster_gcluster.a3ultra_device_type:
       sub_networks = [f'{args.cluster}-sub-1'] + [
           f'{args.cluster}-rdma-sub-{i}' for i in range(8)
       ]
       yml_string = rdma_decorator.decorate_jobset(yml_string, sub_networks)
+      if len(gcs_fuse_storages) > 0:
+        yml_string = rdma_decorator.decorate_jobset_with_storages(
+            yml_string, gcs_fuse_storages
+        )
   else:
     yml_string = gpu_workload_create_yaml.format(
         args=args,
diff --git a/src/xpk/core/blueprint/blueprint_generator.py b/src/xpk/core/blueprint/blueprint_generator.py
index 27c8dd7de..004bed4e4 100644
--- a/src/xpk/core/blueprint/blueprint_generator.py
+++ b/src/xpk/core/blueprint/blueprint_generator.py
@@ -14,7 +14,6 @@
 limitations under the License.
 """
 
-from logging import config
 import shutil
 from typing import Optional
 from ruamel import yaml
@@ -200,9 +199,9 @@ def generate_a3_mega_blueprint(
               "config_template_vars": {"num_chips": f"{num_chips}"},
           },
           "jobset": {"install": True, "version": "v0.7.2"},
-          "apply_manifests": [
-              {"source": f'$(ghpc_stage("{blueprint_name}"))/storage_crd.yaml'}
-          ],
+          "apply_manifests": [{
+              "source": f'$(ghpc_stage("{blueprint_name}"))/storage_crd.yaml'
+          }],
       },
   )
 
@@ -588,7 +587,11 @@ def generate_a3_ultra_blueprint(
           "apply_manifests": [
              {"source": nccl_installer_path},
              {"source": mlgru_disable_path},
-             {"source": f'$(ghpc_stage("{blueprint_name}"))/storage_crd.yaml'}
+             {
+                 "source": (
+                     f'$(ghpc_stage("{blueprint_name}"))/storage_crd.yaml'
+                 )
+             },
           ],
       },
   )
diff --git a/src/xpk/core/storage.py b/src/xpk/core/storage.py
index 485abde04..1c846bc13 100644
--- a/src/xpk/core/storage.py
+++ b/src/xpk/core/storage.py
@@ -194,6 +194,7 @@ def get_storages(
   Returns:
     A list of Storage objects matching the given names.
   """
+  xpk_print("requested_storages: ", requested_storages)
   all_storages = list_storages(k8s_api_client)
   all_storage_names = {storage.name for storage in all_storages}
 
diff --git a/src/xpk/core/workload_decorators/rdma_decorator.py b/src/xpk/core/workload_decorators/rdma_decorator.py
index 290d3add5..a375ed2d4 100644
--- a/src/xpk/core/workload_decorators/rdma_decorator.py
+++ b/src/xpk/core/workload_decorators/rdma_decorator.py
@@ -16,6 +16,7 @@
 import yaml
 
 from ...utils.yaml import literal_string
+from ...core.storage import GCS_FUSE_TYPE
 
 
 def decorate_jobset(jobset_manifest_str, sub_networks) -> str:
@@ -52,6 +53,34 @@ def decorate_jobset(jobset_manifest_str, sub_networks) -> str:
   return yaml.dump(manifest, sort_keys=False)
 
 
+def decorate_jobset_with_storages(jobset_manifest_str, storages) -> str:
+  """
+  Decorates a JobSet manifest with the necessary storages.
+
+  Args:
+    jobset_manifest_str: The JobSet manifest as a YAML string.
+
+  Returns:
+    The modified JobSet manifest as a YAML string.
+  """
+
+  manifest = yaml.safe_load(jobset_manifest_str)
+
+  for job in manifest['spec']['replicatedJobs']:
+    job_manifest = job['template']
+    add_storage_annotations(job_manifest, storages)
+
+  return yaml.dump(manifest, sort_keys=False)
+
+
+def add_storage_annotations(job_manifest, storages):
+  """Adds or updates storage annotations in the Pod template."""
+  annotations = job_manifest['spec']['template']['metadata']['annotations']
+  gcs_present = any(storage.type == GCS_FUSE_TYPE for storage in storages)
+  if gcs_present:
+    annotations.update({'gke-gcsfuse/volumes': 'true'})
+
+
 def add_annotations(job_manifest, sub_networks):
   """Adds or updates annotations in the Pod template."""
   annotations = job_manifest['spec']['template']['metadata']['annotations']
diff --git a/src/xpk/core/workload_decorators/tcpxo_decorator.py b/src/xpk/core/workload_decorators/tcpxo_decorator.py
index 641ca4a82..8bdb1bac6 100644
--- a/src/xpk/core/workload_decorators/tcpxo_decorator.py
+++ b/src/xpk/core/workload_decorators/tcpxo_decorator.py
@@ -16,7 +16,7 @@
 import yaml
 
 from ...utils.yaml import literal_string
-
+from ...core.storage import GCS_FUSE_TYPE
 
 # Component version
 rxdm = 'v1.0.12'
@@ -56,6 +56,34 @@ def decorate_jobset(jobset_manifest_str, sub_networks) -> str:
   return yaml.dump(manifest, sort_keys=False)
 
 
+def decorate_jobset_with_storages(jobset_manifest_str, storages) -> str:
+  """
+  Decorates a JobSet manifest with the necessary storages.
+
+  Args:
+    jobset_manifest_str: The JobSet manifest as a YAML string.
+
+  Returns:
+    The modified JobSet manifest as a YAML string.
+  """
+
+  manifest = yaml.safe_load(jobset_manifest_str)
+
+  for job in manifest['spec']['replicatedJobs']:
+    job_manifest = job['template']
+    add_storage_annotations(job_manifest, storages)
+
+  return yaml.dump(manifest, sort_keys=False)
+
+
+def add_storage_annotations(job_manifest, storages):
+  """Adds or updates storage annotations in the Pod template."""
+  annotations = job_manifest['spec']['template']['metadata']['annotations']
+  gcs_present = any(storage.type == GCS_FUSE_TYPE for storage in storages)
+  if gcs_present:
+    annotations.update({'gke-gcsfuse/volumes': 'true'})
+
+
 def add_annotations(job_manifest, sub_networks):
   """Adds or updates annotations in the Pod template."""
   annotations = job_manifest['spec']['template']['metadata']['annotations']
diff --git a/src/xpk/parser/cluster.py b/src/xpk/parser/cluster.py
index 7c65a67cc..066f6a45e 100644
--- a/src/xpk/parser/cluster.py
+++ b/src/xpk/parser/cluster.py
@@ -560,7 +560,8 @@ def add_shared_cluster_create_optional_arguments(args_parsers):
       action='store_true',
       help=(
           'Enable GSCFuse driver on the cluster. This enables Workload'
-          ' Identity Federation. When using A3 ultra/A3 mega Workload Identity is enabled by default.'
+          ' Identity Federation. When using A3 ultra/A3 mega Workload'
+          ' Identity is enabled by default.'
       ),
   )
 
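
A minimal, self-contained sketch of the behaviour the new
decorate_jobset_with_storages() helpers add in rdma_decorator.py and
tcpxo_decorator.py: when at least one requested storage is of the GCS FUSE
type, the Pod template of every replicated job is annotated with
gke-gcsfuse/volumes: "true", which is what tells GKE to inject the gcsfuse
sidecar container. The GCS_FUSE_TYPE value, the FakeStorage class and the
trimmed JobSet manifest below are illustrative stand-ins rather than the real
xpk objects; only PyYAML is required to run it.

    import textwrap
    from dataclasses import dataclass

    import yaml

    GCS_FUSE_TYPE = 'gcsfuse'  # illustrative stand-in for xpk.core.storage.GCS_FUSE_TYPE


    @dataclass
    class FakeStorage:
      """Illustrative stand-in for xpk's Storage object; only .type is used here."""

      type: str


    JOBSET = textwrap.dedent("""\
        apiVersion: jobset.x-k8s.io/v1alpha2
        kind: JobSet
        spec:
          replicatedJobs:
          - name: slice-job
            template:
              spec:
                template:
                  metadata:
                    annotations: {}
        """)


    def decorate_jobset_with_storages(jobset_manifest_str, storages):
      # Mirrors the helpers added by this patch: walk every replicated job and
      # annotate its Pod template when a GCS FUSE storage was requested.
      manifest = yaml.safe_load(jobset_manifest_str)
      gcs_present = any(s.type == GCS_FUSE_TYPE for s in storages)
      for job in manifest['spec']['replicatedJobs']:
        annotations = job['template']['spec']['template']['metadata']['annotations']
        if gcs_present:
          annotations['gke-gcsfuse/volumes'] = 'true'
      return yaml.dump(manifest, sort_keys=False)


    print(decorate_jobset_with_storages(JOBSET, [FakeStorage(type=GCS_FUSE_TYPE)]))
    # The printed manifest now carries gke-gcsfuse/volumes: 'true' on the Pod
    # template of slice-job, matching what the decorators do to the real
    # a3_gpu_workload_create_yaml JobSet before it is applied.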