From 0e41cd68bf73a07c29b1d9bcc9566290c55835d5 Mon Sep 17 00:00:00 2001
From: Farhad Sharabiani
Date: Sat, 28 Dec 2024 19:55:02 +0100
Subject: [PATCH] Workloads implemented for A3-Mega and A3-Ultra machines (#306)

* A3-Mega workload implemented

* A3-Ultra workload implemented
---
 .dockerignore | 141 ++++++++++++++++
 README.md | 40 ++++-
 pyproject.toml | 3 +-
 .../blueprints/a3mega/config-map.yaml.tftpl | 11 +-
 .../a3mega/kueue-xpk-configuration.yaml.tftpl | 9 +-
 .../blueprints/a3ultra/config-map.yaml.tftpl | 10 +-
 .../kueue-xpk-configuration.yaml.tftpl | 5 +-
 src/xpk/commands/cluster_gcluster.py | 32 +---
 src/xpk/commands/workload.py | 75 +++++++--
 src/xpk/core/blueprint/blueprint_generator.py | 97 ++++++-----
 src/xpk/core/core.py | 53 ++++--
 src/xpk/core/nap.py | 2 +-
 src/xpk/core/tests/data/a3_mega.yaml | 20 ++-
 src/xpk/core/tests/data/a3_ultra.yaml | 71 ++++----
 .../integration/test_gcluster_a3ultra.py | 7 +-
 src/xpk/core/tests/unit/test_blueprint.py | 6 +-
 src/xpk/core/workload_decorators/__init__.py | 15 ++
 .../workload_decorators/rdma_decorator.py | 109 ++++++++++++
 .../workload_decorators/tcpxo_decorator.py | 157 ++++++++++++++++++
 src/xpk/utils/yaml.py | 30 ++++
 20 files changed, 742 insertions(+), 151 deletions(-)
 create mode 100644 .dockerignore
 create mode 100644 src/xpk/core/workload_decorators/__init__.py
 create mode 100644 src/xpk/core/workload_decorators/rdma_decorator.py
 create mode 100644 src/xpk/core/workload_decorators/tcpxo_decorator.py
 create mode 100644 src/xpk/utils/yaml.py

diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 000000000..ecced5bbd
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,141 @@
+# editor and IDE paraphernalia
+.idea/
+
+*__pycache__*
+tmp/
+.pytype
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+bin/
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+pip-wheel-metadata/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+.python-version
+
+# pipenv
+# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+# However, in case of collaboration, if having platform-specific dependencies or dependencies
+# having no cross-platform support, pipenv may install dependencies that don't work, or not
+# install all needed dependencies.
+#Pipfile.lock
+
+# PEP 582; used by e.g.
github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# DS_Store files
+**/.DS_Store
+
+# XPK/Cluster Toolkit working directory
+xpkclusters/*
\ No newline at end of file
diff --git a/README.md b/README.md
index be5238459..e9c196fdd 100644
--- a/README.md
+++ b/README.md
@@ -46,8 +46,10 @@ xpk supports the following TPU types:
 * Trillium (v6e)
 
 and the following GPU types:
-* a100
-* a3 (h100)
+* A100
+* A3-Highgpu (h100)
+* A3-Mega (h100-mega) - [Create cluster](#provisioning-a3-ultra-and-a3-mega-clusters-gpu-machines), [Create workloads](#workloads-for-a3-ultra-and-a3-mega-clusters-gpu-machines)
+* A3-Ultra (h200) - [Create cluster](#provisioning-a3-ultra-and-a3-mega-clusters-gpu-machines), [Create workloads](#workloads-for-a3-ultra-and-a3-mega-clusters-gpu-machines)
 
 and the following CPU types:
 * n2-standard-32
@@ -397,6 +399,26 @@ will fail the cluster creation process because Vertex AI Tensorboard is not supp
 --tpu-type=v5litepod-16
 ```
 
+## Provisioning A3-Ultra and A3-Mega clusters (GPU machines)
+To create a cluster with A3 machines, run the command below. To create workloads on these clusters, see [here](#workloads-for-a3-ultra-and-a3-mega-clusters-gpu-machines).
+ * For A3-Ultra: --device-type=h200-141gb-8
+ * For A3-Mega: --device-type=h100-mega-80gb-8
+
+ ```shell
+ python3 xpk.py cluster create \
+ --cluster CLUSTER_NAME --device-type=h200-141gb-8 \
+ --zone=$COMPUTE_ZONE --project=$PROJECT_ID \
+ --num-nodes=4 --reservation=$RESERVATION_ID
+ ```
+Currently, the following flags/arguments are supported for A3-Mega and A3-Ultra machines:
+ * --num-nodes
+ * --default-pool-cpu-machine-type
+ * --default-pool-cpu-num-nodes
+ * --reservation
+ * --spot
+ * --on-demand (A3-Mega only)
+
+
 ## Workload Create
 
 * Workload Create (submit training job):
@@ -463,6 +485,20 @@ increase this to a large number, say 50.
 
 Real jobs can be interrupted due to hardware failures and software updates.
 We assume your job has implemented checkpointing so the job restarts near where it
 was interrupted.
 
+### Workloads for A3-Ultra and A3-Mega clusters (GPU machines)
+To submit jobs on a cluster with A3 machines, run the command below. To create a cluster with A3 machines, see [here](#provisioning-a3-ultra-and-a3-mega-clusters-gpu-machines).
+ * For A3-Ultra: --device-type=h200-141gb-8
+ * For A3-Mega: --device-type=h100-mega-80gb-8
+
+ ```shell
+ python3 xpk.py workload create \
+ --workload=$WORKLOAD_NAME --command="echo goodbye" \
+ --cluster=$CLUSTER_NAME --device-type=h200-141gb-8 \
+ --zone=$COMPUTE_ZONE --project=$PROJECT_ID \
+ --num-nodes=$WORKLOAD_NUM_NODES
+ ```
+> The docker image flags/arguments introduced in the [workloads section](#workload-create) can be used with A3 machines as well.
+ ### Workload Priority and Preemption * Set the priority level of your workload with `--priority=LEVEL` diff --git a/pyproject.toml b/pyproject.toml index 70550d18b..cddeec3c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,7 @@ dependencies = [ "cloud-accelerator-diagnostics", "tabulate", "ruamel.yaml", + "pyyaml", "docker" ] @@ -62,7 +63,7 @@ dev = [ version = {attr = "xpk.core.core.__version__"} [tool.setuptools] -packages = ["xpk", "xpk.parser", "xpk.core", "xpk.commands", "xpk.utils", "xpk.core.blueprint"] +packages = ["xpk", "xpk.parser", "xpk.core", "xpk.commands", "xpk.utils", "xpk.core.blueprint", "xpk.core.workload_decorators"] package-dir = {"" = "src"} [tool.pyink] diff --git a/src/xpk/blueprints/a3mega/config-map.yaml.tftpl b/src/xpk/blueprints/a3mega/config-map.yaml.tftpl index 900d30729..bd5cf91d5 100644 --- a/src/xpk/blueprints/a3mega/config-map.yaml.tftpl +++ b/src/xpk/blueprints/a3mega/config-map.yaml.tftpl @@ -1,6 +1,15 @@ kind: ConfigMap apiVersion: v1 metadata: - name: ${name} + name: ${resource_config_name} data: h100-mega-80gb-8: "${num_nodes}" +--- +kind: ConfigMap +apiVersion: v1 +metadata: + name: ${cluster_config_name} +data: + capacity_type: "${capacity_type}" + reservation_id: "${reservation}" + provisioner: gcluster \ No newline at end of file diff --git a/src/xpk/blueprints/a3mega/kueue-xpk-configuration.yaml.tftpl b/src/xpk/blueprints/a3mega/kueue-xpk-configuration.yaml.tftpl index eae15a719..405ac155d 100644 --- a/src/xpk/blueprints/a3mega/kueue-xpk-configuration.yaml.tftpl +++ b/src/xpk/blueprints/a3mega/kueue-xpk-configuration.yaml.tftpl @@ -23,17 +23,18 @@ kind: ClusterQueue metadata: name: cluster-queue spec: - preemption: - reclaimWithinCohort: Never # Don't preempt other queues in the cohort. - withinClusterQueue: LowerPriority namespaceSelector: {} # match all. resourceGroups: - - coveredResources: ["nvidia.com/gpu"] + - coveredResources: ["nvidia.com/gpu", "cpu", "memory"] flavors: - name: 1xh100-mega-80gb-8 resources: - name: "nvidia.com/gpu" nominalQuota: ${num_chips} + - name: "cpu" + nominalQuota: 10000 + - name: "memory" + nominalQuota: 10000Gi --- apiVersion: kueue.x-k8s.io/v1beta1 kind: LocalQueue diff --git a/src/xpk/blueprints/a3ultra/config-map.yaml.tftpl b/src/xpk/blueprints/a3ultra/config-map.yaml.tftpl index eeddce911..fd5780060 100644 --- a/src/xpk/blueprints/a3ultra/config-map.yaml.tftpl +++ b/src/xpk/blueprints/a3ultra/config-map.yaml.tftpl @@ -4,4 +4,12 @@ metadata: name: ${resource_config_name} data: h200-141gb-8: "${num_nodes}" - nvidia-h200-141gb: "${num_nodes}" +--- +kind: ConfigMap +apiVersion: v1 +metadata: + name: ${cluster_config_name} +data: + capacity_type: "${capacity_type}" + reservation_id: "${reservation}" + provisioner: gcluster \ No newline at end of file diff --git a/src/xpk/blueprints/a3ultra/kueue-xpk-configuration.yaml.tftpl b/src/xpk/blueprints/a3ultra/kueue-xpk-configuration.yaml.tftpl index de68aed0a..59d4c88aa 100644 --- a/src/xpk/blueprints/a3ultra/kueue-xpk-configuration.yaml.tftpl +++ b/src/xpk/blueprints/a3ultra/kueue-xpk-configuration.yaml.tftpl @@ -23,14 +23,11 @@ kind: ClusterQueue metadata: name: cluster-queue spec: - preemption: - reclaimWithinCohort: Never # Don't preempt other queues in the cohort. - withinClusterQueue: LowerPriority namespaceSelector: {} # match all. 
resourceGroups: - coveredResources: ["nvidia.com/gpu", "cpu", "memory"] flavors: - - name: 1xh200-141gb-8g + - name: 1xh200-141gb-8 resources: - name: "nvidia.com/gpu" nominalQuota: ${num_chips} diff --git a/src/xpk/commands/cluster_gcluster.py b/src/xpk/commands/cluster_gcluster.py index a0536b016..26b2815d2 100644 --- a/src/xpk/commands/cluster_gcluster.py +++ b/src/xpk/commands/cluster_gcluster.py @@ -17,7 +17,7 @@ from ..core.blueprint.blueprint_generator import BlueprintGenerator, BlueprintGeneratorOutput, supported_device_types, a3mega_device_type, a3ultra_device_type from ..core.docker_manager import DockerManager from ..core.gcluster_manager import GclusterManager -from ..core.core import zone_to_region +from ..core.core import zone_to_region, get_capacity_type from ..utils.console import xpk_exit, xpk_print from ..utils.network import all_IPs_cidr from ..utils.file import ensure_directory_exists @@ -142,7 +142,11 @@ def prepare_blueprint_generator() -> BlueprintGenerator: def generate_blueprint( blueprint_name, args, prefix=None ) -> BlueprintGeneratorOutput: - validate_consumption_args(args) + capacity_type, return_code = get_capacity_type(args) + if return_code != 0: + xpk_print('Capacity type is invalid.') + xpk_exit(return_code) + bpg = prepare_blueprint_generator() if args.device_type in supported_device_types: @@ -157,9 +161,8 @@ def generate_blueprint( zone=args.zone, auth_cidr=all_IPs_cidr, num_nodes=num_nodes, - autoscaling_total_min_nodes=num_nodes, reservation=args.reservation if args.reservation else None, - spot=args.spot if args.spot else False, + capacity_type=capacity_type, system_node_pool_machine_type=args.default_pool_cpu_machine_type, system_node_pool_min_node_count=args.default_pool_cpu_num_nodes, ) @@ -173,27 +176,10 @@ def generate_blueprint( project_id=args.project, zone=args.zone, auth_cidr=all_IPs_cidr, - static_node_count=num_nodes, + num_nodes=num_nodes, reservation=args.reservation if args.reservation else None, - spot=args.spot if args.spot else False, + capacity_type=capacity_type, system_node_pool_machine_type=args.default_pool_cpu_machine_type, system_node_pool_min_node_count=args.default_pool_cpu_num_nodes, ) return None - - -def validate_consumption_args(args): - args_set = [] - if not args.reservation is None: - args_set.append('--reservation') - if not args.spot is None and args.spot: - args_set.append('--spot') - if not args.on_demand is None and args.on_demand: - args_set.append('--on-demand') - - if len(args_set) > 1: - xpk_print( - f"Error: only one of {' or '.join(args_set)} can be set at the same" - ' time.' - ) - xpk_exit(1) diff --git a/src/xpk/commands/workload.py b/src/xpk/commands/workload.py index 9eed3b6fa..4ba7aa765 100644 --- a/src/xpk/commands/workload.py +++ b/src/xpk/commands/workload.py @@ -64,6 +64,8 @@ from ..utils.console import get_user_input, xpk_exit, xpk_print from ..utils.file import write_tmp_file from .cluster import set_cluster_command +from ..core.workload_decorators import tcpxo_decorator, rdma_decorator +from . 
import cluster_gcluster workload_create_yaml = """apiVersion: jobset.x-k8s.io/v1alpha2 kind: JobSet @@ -165,6 +167,42 @@ {container} """ +a3_gpu_workload_create_yaml = """apiVersion: jobset.x-k8s.io/v1alpha2 +kind: JobSet +metadata: + name: {args.workload} + labels: + kueue.x-k8s.io/queue-name: multislice-queue # Name of the LocalQueue + xpk.google.com/workload: {args.workload} +spec: + failurePolicy: + maxRestarts: {args.max_restarts} + replicatedJobs: + - name: slice-job + replicas: 1 + template: + spec: + parallelism: {args.num_nodes} + completions: {args.num_nodes} + backoffLimit: 0 # When any pod fails, the job is failed + template: + metadata: + labels: + xpk.google.com/workload: {args.workload} + annotations: + kueue.x-k8s.io/podset-preferred-topology: "cloud.google.com/gce-topology-host" + spec: + priorityClassName: {args.priority} + restartPolicy: Never + dnsPolicy: ClusterFirstWithHostNet + terminationGracePeriodSeconds: {args.termination_grace_period_seconds} + tolerations: + - operator: "Exists" + key: nvidia.com/gpu + containers: + {container} +""" + pw_workload_create_yaml = """apiVersion: jobset.x-k8s.io/v1alpha2 kind: JobSet metadata: @@ -418,17 +456,32 @@ def workload_create(args) -> None: if return_code != 0: xpk_exit(return_code) - yml_string = gpu_workload_create_yaml.format( - args=args, - container=container, - command=args.command, - chips_per_vm=system.chips_per_vm, - gpu_scheduler=gpu_scheduler, - gpu_volume=get_gpu_volume(system), - gpu_rxdm_image=get_gpu_rxdm_image(system), - gpu_rxdm_cmd=get_gpu_rxdm_cmd(system), - gpu_tcp_volume=get_gpu_tcp_volume(system), - ) + if system.device_type in cluster_gcluster.supported_device_types: + yml_string = a3_gpu_workload_create_yaml.format( + args=args, container=container + ) + + if args.device_type == cluster_gcluster.a3mega_device_type: + sub_networks = [f'{args.cluster}-gpunet-{i}-subnet' for i in range(8)] + yml_string = tcpxo_decorator.decorate_jobset(yml_string, sub_networks) + + if args.device_type == cluster_gcluster.a3ultra_device_type: + sub_networks = [f'{args.cluster}-sub-1'] + [ + f'{args.cluster}-rdma-sub-{i}' for i in range(8) + ] + yml_string = rdma_decorator.decorate_jobset(yml_string, sub_networks) + else: + yml_string = gpu_workload_create_yaml.format( + args=args, + container=container, + command=args.command, + chips_per_vm=system.chips_per_vm, + gpu_scheduler=gpu_scheduler, + gpu_volume=get_gpu_volume(system), + gpu_rxdm_image=get_gpu_rxdm_image(system), + gpu_rxdm_cmd=get_gpu_rxdm_cmd(system), + gpu_tcp_volume=get_gpu_tcp_volume(system), + ) elif args.use_pathways and ensure_pathways_workload_prerequisites( args, system ): diff --git a/src/xpk/core/blueprint/blueprint_generator.py b/src/xpk/core/blueprint/blueprint_generator.py index f900ae4be..1200a7e77 100644 --- a/src/xpk/core/blueprint/blueprint_generator.py +++ b/src/xpk/core/blueprint/blueprint_generator.py @@ -23,11 +23,12 @@ from ..system_characteristics import get_system_characteristics_by_device_type from ...utils.console import xpk_print, xpk_exit from ...utils.file import ensure_directory_exists +from ..core import CapacityType, h100_mega_device_type, h200_device_type yaml = yaml.YAML() -a3mega_device_type = "h100-mega-80gb-8" -a3ultra_device_type = "h200-141gb-8" +a3mega_device_type = h100_mega_device_type +a3ultra_device_type = h200_device_type supported_device_types = {a3mega_device_type, a3ultra_device_type} blueprint_dependencies_dir = { a3mega_device_type: "src/xpk/blueprints/a3mega", @@ -73,10 +74,9 @@ def 
generate_a3_mega_blueprint( primary_vpc_name: str = "network1", gpu_subnets_name: str = "gpunets", group_placement_max_distance: int = 2, - autoscaling_total_min_nodes: int = 2, subnetwork_cidr_suffix: int = 24, reservation: str | None = None, - spot: bool = False, + capacity_type: CapacityType = CapacityType.ON_DEMAND, system_node_pool_min_node_count: int = 2, ) -> BlueprintGeneratorOutput: """Create A3 mega blueprint and directory containing its dependencies. @@ -125,6 +125,7 @@ def generate_a3_mega_blueprint( source="modules/scheduler/gke-cluster", use=[primary_vpc_name, gpu_subnets_name], settings={ + "release_channel": "RAPID", "prefix_with_deployment_name": False, "name_suffix": cluster_name, "enable_private_endpoint": False, @@ -171,13 +172,14 @@ def generate_a3_mega_blueprint( settings={ "name": f"{cluster_name}-a3-megagpu-pool-0", "machine_type": system.gce_machine_type, - "autoscaling_total_min_nodes": autoscaling_total_min_nodes, - "initial_node_count": num_nodes, + "static_node_count": num_nodes, "zones": [zone], "host_maintenance_interval": "PERIODIC", "reservation_affinity": reservation_affinity, "run_workload_script": False, - "spot": spot, + "spot": capacity_type == CapacityType.SPOT, + "max_pods_per_node": 32, + "auto_upgrade": True, }, outputs=["instructions"], ) @@ -189,7 +191,7 @@ def generate_a3_mega_blueprint( settings={ "kueue": { "install": True, - "version": "v0.9.1", # TAS feature-gates is enabled in CT + "version": "v0.10.0", # TAS feature-gates is enabled in CT "config_path": f'$(ghpc_stage("{blueprint_name}"))/kueue-xpk-configuration.yaml.tftpl', "config_template_vars": {"num_chips": f"{num_chips}"}, }, @@ -207,8 +209,13 @@ def generate_a3_mega_blueprint( f'$(ghpc_stage("{blueprint_name}"))/config-map.yaml.tftpl' ), "template_vars": { - "name": "xpk-gke-a3-megagpu-resources-configmap", + "resource_config_name": ( + f"{cluster_name}-resources-configmap" + ), "num_nodes": f"{num_nodes}", + "cluster_config_name": f"{cluster_name}-metadata-configmap", + "capacity_type": f"{capacity_type.value}", + "reservation": f"{reservation}", }, }] }, @@ -382,11 +389,11 @@ def generate_a3_ultra_blueprint( auth_cidr: str, system_node_pool_machine_type: str, reservation: Optional[str | None] = None, - static_node_count: int = 4, + num_nodes: int = 2, prefix: str = "", mtu_size: int = 8896, system_node_pool_min_node_count: int = 2, - spot: bool = False, + capacity_type: CapacityType = CapacityType.ON_DEMAND, ) -> BlueprintGeneratorOutput: """Create A3 ultra blueprint. 
@@ -399,25 +406,26 @@ def generate_a3_ultra_blueprint( f'$(ghpc_stage("{blueprint_name}"))/nccl-installer.yaml' ) mlgru_disable_path = f'$(ghpc_stage("{blueprint_name}"))/mlgru-disable.yaml' - net_0_id = f"{cluster_name}-a3u-net-0" + net_0_id = f"{cluster_name}-net-0" gpu_net_0 = DeploymentModule( id=net_0_id, - source="github.com/GoogleCloudPlatform/cluster-toolkit.git//modules/network/vpc?ref=e0c690b", + source="modules/network/vpc", settings={ - "network_name": f"{cluster_name}-a3u-net-0", + "network_name": f"{cluster_name}-net-0", "subnetworks": [{ - "subnet_name": f"{cluster_name}-a3u-sub-0", + "subnet_name": f"{cluster_name}-sub-0", "subnet_region": region, "subnet_ip": "192.168.0.0/18", }], - "secondary_ranges": { - f"{cluster_name}-a3u-sub-0": [ + "secondary_ranges_list": [{ + "subnetwork_name": f"{cluster_name}-sub-0", + "ranges": [ {"range_name": "pods", "ip_cidr_range": "10.4.0.0/14"}, {"range_name": "services", "ip_cidr_range": "10.0.32.0/20"}, - ] - }, + ], + }], "firewall_rules": [{ - "name": f"{cluster_name}-a3u-internal-0", + "name": f"{cluster_name}-internal-0", "ranges": ["192.168.0.0/16"], "allow": [ {"protocol": "tcp", "ports": ["0-65535"]}, @@ -427,20 +435,20 @@ def generate_a3_ultra_blueprint( }], }, ) - net_1_id = f"{cluster_name}-a3u-net-1" + net_1_id = f"{cluster_name}-net-1" gpu_net_1 = DeploymentModule( id=net_1_id, - source="github.com/GoogleCloudPlatform/cluster-toolkit.git//modules/network/vpc?ref=e0c690b", + source="modules/network/vpc", settings={ - "network_name": f"{cluster_name}-a3u-net-1", + "network_name": f"{cluster_name}-net-1", "mtu": mtu_size, "subnetworks": [{ - "subnet_name": f"{cluster_name}-a3u-sub-1", + "subnet_name": f"{cluster_name}-sub-1", "subnet_region": region, "subnet_ip": "192.168.64.0/18", }], "firewall_rules": [{ - "name": f"{cluster_name}-a3u-internal-1", + "name": f"{cluster_name}-internal-1", "ranges": ["192.168.0.0/16"], "allow": [ {"protocol": "tcp", "ports": ["0-65535"]}, @@ -450,17 +458,17 @@ def generate_a3_ultra_blueprint( }], }, ) - rma_net_id = f"{cluster_name}-a3u-rdma-net" + rma_net_id = f"{cluster_name}-rdma-net" rma_net = DeploymentModule( id=rma_net_id, - source="github.com/GoogleCloudPlatform/cluster-toolkit.git//community/modules/network/rdma-vpc?ref=98c49fe", + source="modules/network/gpu-rdma-vpc", settings={ - "network_name": f"{cluster_name}-a3u-rdma-net", + "network_name": f"{cluster_name}-rdma-net", "mtu": mtu_size, "network_profile": f"https://www.googleapis.com/compute/beta/projects/{project_id}/global/networkProfiles/{zone}-vpc-roce", "network_routing_mode": "REGIONAL", "subnetworks_template": { - "name_prefix": f"{cluster_name}-a3u-rdma-sub", + "name_prefix": f"{cluster_name}-rdma-sub", "count": 8, "ip_range": "192.168.128.0/18", "region": region, @@ -470,7 +478,7 @@ def generate_a3_ultra_blueprint( cluster_id = f"{cluster_name}-a3-ultragpu-cluster" a3_ultra_cluster = DeploymentModule( id=cluster_id, - source="github.com/GoogleCloudPlatform/cluster-toolkit.git//modules/scheduler/gke-cluster?ref=e0c690b", + source="modules/scheduler/gke-cluster", use=[net_0_id], settings={ "release_channel": "RAPID", @@ -489,14 +497,14 @@ def generate_a3_ultra_blueprint( "total_max_nodes": 1000, }, "additional_networks": ( - f"$(concat([{{network={cluster_name}-a3u-net-1.network_name," - f" subnetwork={cluster_name}-a3u-net-1.subnetwork_name," + f"$(concat([{{network={cluster_name}-net-1.network_name," + f" subnetwork={cluster_name}-net-1.subnetwork_name," f' subnetwork_project="{project_id}", nic_type="GVNIC",' " 
queue_count=null, network_ip=null, stack_type=null," " access_config=[{nat_ip=null, public_ptr_domain_name=null," " network_tier=null}], ipv6_access_config=[]," " alias_ip_range=[]}]," - f" {cluster_name}-a3u-rdma-net.subnetwork_interfaces_gke))" + f" {cluster_name}-rdma-net.subnetwork_interfaces_gke))" ), }, outputs=["instructions"], @@ -510,14 +518,14 @@ def generate_a3_ultra_blueprint( xpk_exit(1) gpu_pool = DeploymentModule( id=f"{cluster_name}-a3u-pool", - source="github.com/GoogleCloudPlatform/cluster-toolkit.git//modules/compute/gke-node-pool?ref=e0c690b", + source="modules/compute/gke-node-pool", use=[cluster_id], settings={ "machine_type": system.gce_machine_type, "auto_upgrade": True, "zones": [zone], - "static_node_count": static_node_count, - "spot": spot, + "static_node_count": num_nodes, + "spot": capacity_type == CapacityType.SPOT, "max_pods_per_node": 32, "guest_accelerator": [{ "type": "nvidia-h200-141gb", @@ -527,14 +535,14 @@ def generate_a3_ultra_blueprint( }, }], "additional_networks": ( - f"$(concat([{{network={cluster_name}-a3u-net-1.network_name," - f" subnetwork={cluster_name}-a3u-net-1.subnetwork_name," + f"$(concat([{{network={cluster_name}-net-1.network_name," + f" subnetwork={cluster_name}-net-1.subnetwork_name," f' subnetwork_project="{project_id}", nic_type="GVNIC",' " queue_count=null, network_ip=null, stack_type=null," " access_config=[{nat_ip=null, public_ptr_domain_name=null," " network_tier=null}], ipv6_access_config=[]," " alias_ip_range=[]}]," - f" {cluster_name}-a3u-rdma-net.subnetwork_interfaces_gke))" + f" {cluster_name}-rdma-net.subnetwork_interfaces_gke))" ), }, outputs=["instructions"], @@ -545,16 +553,16 @@ def generate_a3_ultra_blueprint( "specific_reservations": [{"name": reservation}], } - num_chips = static_node_count * system.chips_per_vm + num_chips = num_nodes * system.chips_per_vm workload_manager_install_id = "workload-manager-install" workload_manager_install = DeploymentModule( id=workload_manager_install_id, - source="github.com/GoogleCloudPlatform/cluster-toolkit.git//modules/management/kubectl-apply?ref=e0c690b", + source="modules/management/kubectl-apply", use=[cluster_id], settings={ "kueue": { "install": True, - "version": "v0.9.1", # TAS feature-gates is enabled in CT + "version": "v0.10.0", # TAS feature-gates is enabled in CT "config_path": f'$(ghpc_stage("{blueprint_name}"))/kueue-xpk-configuration.yaml.tftpl', "config_template_vars": {"num_chips": f"{num_chips}"}, }, @@ -579,7 +587,10 @@ def generate_a3_ultra_blueprint( "resource_config_name": ( f"{cluster_name}-resources-configmap" ), - "num_nodes": f"{static_node_count}", + "num_nodes": f"{num_nodes}", + "cluster_config_name": f"{cluster_name}-metadata-configmap", + "capacity_type": f"{capacity_type.value}", + "reservation": f"{reservation}", }, }] }, diff --git a/src/xpk/core/core.py b/src/xpk/core/core.py index c734ef4ff..70ccf9c29 100644 --- a/src/xpk/core/core.py +++ b/src/xpk/core/core.py @@ -65,6 +65,7 @@ h100_device_type = 'h100-80gb-8' h100_mega_device_type = 'h100-mega-80gb-8' +h200_device_type = 'h200-141gb-8' JOBSET_VERSION = 'v0.7.2' @@ -814,6 +815,18 @@ def get_cluster_configmap(args, configmap_name) -> dict[str, str] | None: return config_map +def get_cluster_provisioner(args) -> str: + metadata_configmap_name = f'{args.cluster}-{CLUSTER_METADATA_CONFIGMAP}' + cluster_config_map = get_cluster_configmap(args, metadata_configmap_name) + cluster_provisioner = 'gcloud' + if not cluster_config_map is None: + provisioner = cluster_config_map.get('provisioner') + 
if not provisioner is None: + cluster_provisioner = provisioner + xpk_print(f'Cluster provisioner: {cluster_provisioner}') + return cluster_provisioner + + def create_vertex_tensorboard(args) -> dict: """Creates a Tensorboard instance in Vertex AI. @@ -1772,7 +1785,7 @@ def check_if_workload_can_schedule(args, system: SystemCharacteristics) -> bool: # Check for gke accelerator type: missing_gke_accelerator_type = False - if system.gke_accelerator not in cluster_config_map: + if not cluster_config_map.get(system.gke_accelerator): xpk_print( f'Gke Accelerator Type Check: {args.workload} is requesting' f' {system.gke_accelerator} but cluster only contains' @@ -1960,7 +1973,6 @@ def get_main_container(args, system, docker_image, resource_type) -> str: gpu_workload_terminate_command = '' if system.accelerator_type == AcceleratorType['GPU']: - command = 'cd /deps && bash gpu_multi_process_run.sh' gpu_workload_terminate_command = ( 'echo Main app is done > /usr/share/workload/workload_terminated; ' ) @@ -2019,9 +2031,13 @@ def get_main_container(args, system, docker_image, resource_type) -> str: resources: limits: {resources} +""" + volume_mounts = get_volume_mounts(args, system) + if volume_mounts != '': + yaml += """ volumeMounts: {volume_mounts} - """ +""" return yaml.format( args=args, system=system, @@ -2037,7 +2053,7 @@ def get_main_container(args, system, docker_image, resource_type) -> str: gpu_workload_terminate_command=gpu_workload_terminate_command, xpk_internal_commands=xpk_internal_commands, resources=get_main_container_resources(args, system, resource_type), - volume_mounts=get_volume_mounts(args, system), + volume_mounts=volume_mounts, xpk_return_user_exit_code=xpk_return_user_exit_code, ) @@ -2148,13 +2164,11 @@ def get_volume_mounts(args, system: SystemCharacteristics) -> str: mountPath: /dev/shm - name: workload-terminated-volume mountPath: /usr/share/workload""" - elif system.device_type == h100_mega_device_type: - volume_mount_yaml = """- name: nvidia-install-dir-host - mountPath: /usr/local/nvidia/lib64 - - name: shared-memory - mountPath: /dev/shm - - name: workload-terminated-volume - mountPath: /usr/share/workload""" + elif ( + system.device_type == h100_mega_device_type + or system.device_type == h200_device_type + ): + volume_mount_yaml = '' return volume_mount_yaml @@ -2246,15 +2260,22 @@ def get_env_container(args, system: SystemCharacteristics): value: "{system.chips_per_vm}" - name: JAX_COORDINATOR_PORT value: "6002" - - name: LD_LIBRARY_PATH - value: /usr/local/nvidia/lib64 - name: COMMAND value: "{args.command}" {args.env}""" + if system.accelerator_type == AcceleratorType['GPU']: - gpu_direct_name = ( - 'tcpx' if args.device_type == h100_device_type else 'fastrak' - ) + gpu_direct_name = 'fastrak' + if args.device_type == h100_device_type: + gpu_direct_name = 'tcpx' + gpu_env_yaml += """ + - name: LD_LIBRARY_PATH + value: /usr/local/nvidia/lib64 +""" + elif args.device_type == h100_mega_device_type: + gpu_direct_name = 'tcpxo' + elif args.device_type == h200_device_type: + gpu_direct_name = 'rdma' return gpu_env_yaml.format( args=args, system=system, gpu_direct_name=gpu_direct_name ) diff --git a/src/xpk/core/nap.py b/src/xpk/core/nap.py index 161a93d51..0714e090d 100644 --- a/src/xpk/core/nap.py +++ b/src/xpk/core/nap.py @@ -282,7 +282,7 @@ def is_autoprovisioning_enabled( xpk_print( 'Error: Autoprovisioning not enabled but should be so exiting xpk.' 
f' Value should be {AUTOPROVISIONING_CONFIG_VALUE} but instead found' - f' value of {cluster_config_map[system.accelerator_type]}' + f' value of {autoprovisioning_value}' ) return False, 1 diff --git a/src/xpk/core/tests/data/a3_mega.yaml b/src/xpk/core/tests/data/a3_mega.yaml index 1d1c81f9d..c3a50eb21 100644 --- a/src/xpk/core/tests/data/a3_mega.yaml +++ b/src/xpk/core/tests/data/a3_mega.yaml @@ -50,6 +50,7 @@ deployment_groups: source: modules/scheduler/gke-cluster use: [network1, gpunets] settings: + release_channel: RAPID prefix_with_deployment_name: false name_suffix: bar enable_private_endpoint: false @@ -75,16 +76,17 @@ deployment_groups: settings: name: bar-a3-megagpu-pool-0 machine_type: a3-megagpu-8g - autoscaling_total_min_nodes: 2 - initial_node_count: 2 + static_node_count: 2 zones: [us-central1-c] host_maintenance_interval: PERIODIC reservation_affinity: consume_reservation_type: SPECIFIC_RESERVATION specific_reservations: - name: test-reservation - run_workload_script: False - spot: True + run_workload_script: false + spot: false + max_pods_per_node : 32 + auto_upgrade: true outputs: [instructions] - !DeploymentModule @@ -94,7 +96,7 @@ deployment_groups: settings: kueue: install: true - version: "v0.9.1" + version: "v0.10.0" config_path: $(ghpc_stage("xpk-gke-a3-megagpu"))/kueue-xpk-configuration.yaml.tftpl config_template_vars: {num_chips: "16"} jobset: @@ -107,4 +109,10 @@ deployment_groups: settings: apply_manifests: - source: $(ghpc_stage("xpk-gke-a3-megagpu"))/config-map.yaml.tftpl - template_vars: {name: "xpk-gke-a3-megagpu-resources-configmap", num_nodes: "2"} \ No newline at end of file + template_vars: { + resource_config_name: "bar-resources-configmap", + num_nodes: "2", + cluster_config_name: "bar-metadata-configmap", + capacity_type: "reservation", + reservation: "test-reservation", + } \ No newline at end of file diff --git a/src/xpk/core/tests/data/a3_ultra.yaml b/src/xpk/core/tests/data/a3_ultra.yaml index 5c5ad1baa..869f557de 100644 --- a/src/xpk/core/tests/data/a3_ultra.yaml +++ b/src/xpk/core/tests/data/a3_ultra.yaml @@ -21,22 +21,23 @@ deployment_groups: group: primary modules: - !DeploymentModule - id: gke-a3-ultra-a3u-net-0 - source: github.com/GoogleCloudPlatform/cluster-toolkit.git//modules/network/vpc?ref=e0c690b + id: gke-a3-ultra-net-0 + source: modules/network/vpc settings: - network_name: gke-a3-ultra-a3u-net-0 + network_name: gke-a3-ultra-net-0 subnetworks: - - subnet_name: gke-a3-ultra-a3u-sub-0 + - subnet_name: gke-a3-ultra-sub-0 subnet_region: us-central1 subnet_ip: 192.168.0.0/18 - secondary_ranges: - gke-a3-ultra-a3u-sub-0: - - range_name: pods - ip_cidr_range: 10.4.0.0/14 - - range_name: services - ip_cidr_range: 10.0.32.0/20 + secondary_ranges_list: + - subnetwork_name : gke-a3-ultra-sub-0 + ranges: + - range_name: pods + ip_cidr_range: 10.4.0.0/14 + - range_name: services + ip_cidr_range: 10.0.32.0/20 firewall_rules: - - name: gke-a3-ultra-a3u-internal-0 + - name: gke-a3-ultra-internal-0 ranges: [192.168.0.0/16] allow: - protocol: tcp @@ -46,17 +47,17 @@ deployment_groups: - protocol: icmp - !DeploymentModule - id: gke-a3-ultra-a3u-net-1 - source: github.com/GoogleCloudPlatform/cluster-toolkit.git//modules/network/vpc?ref=e0c690b + id: gke-a3-ultra-net-1 + source: modules/network/vpc settings: - network_name: gke-a3-ultra-a3u-net-1 + network_name: gke-a3-ultra-net-1 mtu: 8896 subnetworks: - - subnet_name: gke-a3-ultra-a3u-sub-1 + - subnet_name: gke-a3-ultra-sub-1 subnet_region: us-central1 subnet_ip: 192.168.64.0/18 firewall_rules: - - 
name: gke-a3-ultra-a3u-internal-1 + - name: gke-a3-ultra-internal-1 ranges: [192.168.0.0/16] allow: - protocol: tcp @@ -66,23 +67,23 @@ deployment_groups: - protocol: icmp - !DeploymentModule - id: gke-a3-ultra-a3u-rdma-net - source: github.com/GoogleCloudPlatform/cluster-toolkit.git//community/modules/network/rdma-vpc?ref=98c49fe + id: gke-a3-ultra-rdma-net + source: modules/network/gpu-rdma-vpc settings: - network_name: gke-a3-ultra-a3u-rdma-net + network_name: gke-a3-ultra-rdma-net mtu: 8896 network_profile: https://www.googleapis.com/compute/beta/projects/foo/global/networkProfiles/us-central1-c-vpc-roce network_routing_mode: REGIONAL subnetworks_template: - name_prefix: gke-a3-ultra-a3u-rdma-sub + name_prefix: gke-a3-ultra-rdma-sub count: 8 ip_range: 192.168.128.0/18 region: us-central1 - !DeploymentModule id: gke-a3-ultra-a3-ultragpu-cluster - source: github.com/GoogleCloudPlatform/cluster-toolkit.git//modules/scheduler/gke-cluster?ref=e0c690b - use: [gke-a3-ultra-a3u-net-0] + source: modules/scheduler/gke-cluster + use: [gke-a3-ultra-net-0] settings: release_channel: "RAPID" prefix_with_deployment_name: false @@ -97,18 +98,18 @@ deployment_groups: system_node_pool_node_count: total_min_nodes: 2 total_max_nodes: 1000 - additional_networks: $(concat([{network=gke-a3-ultra-a3u-net-1.network_name, subnetwork=gke-a3-ultra-a3u-net-1.subnetwork_name, subnetwork_project="foo", nic_type="GVNIC", queue_count=null, network_ip=null, stack_type=null, access_config=[{nat_ip=null, public_ptr_domain_name=null, network_tier=null}], ipv6_access_config=[], alias_ip_range=[]}], gke-a3-ultra-a3u-rdma-net.subnetwork_interfaces_gke)) + additional_networks: $(concat([{network=gke-a3-ultra-net-1.network_name, subnetwork=gke-a3-ultra-net-1.subnetwork_name, subnetwork_project="foo", nic_type="GVNIC", queue_count=null, network_ip=null, stack_type=null, access_config=[{nat_ip=null, public_ptr_domain_name=null, network_tier=null}], ipv6_access_config=[], alias_ip_range=[]}], gke-a3-ultra-rdma-net.subnetwork_interfaces_gke)) outputs: [instructions] - !DeploymentModule id: gke-a3-ultra-a3u-pool - source: github.com/GoogleCloudPlatform/cluster-toolkit.git//modules/compute/gke-node-pool?ref=e0c690b + source: modules/compute/gke-node-pool use: [gke-a3-ultra-a3-ultragpu-cluster] settings: machine_type: a3-ultragpu-8g auto_upgrade: true zones: [us-central1-c] - static_node_count: 4 + static_node_count: 2 spot: false max_pods_per_node: 32 guest_accelerator: @@ -117,24 +118,24 @@ deployment_groups: gpu_driver_installation_config: gpu_driver_version: "LATEST" additional_networks: - $(concat([{network=gke-a3-ultra-a3u-net-1.network_name, subnetwork=gke-a3-ultra-a3u-net-1.subnetwork_name, subnetwork_project="foo", nic_type="GVNIC", queue_count=null, network_ip=null, stack_type=null, access_config=[{nat_ip=null, public_ptr_domain_name=null, network_tier=null}], ipv6_access_config=[], alias_ip_range=[]}], gke-a3-ultra-a3u-rdma-net.subnetwork_interfaces_gke)) + $(concat([{network=gke-a3-ultra-net-1.network_name, subnetwork=gke-a3-ultra-net-1.subnetwork_name, subnetwork_project="foo", nic_type="GVNIC", queue_count=null, network_ip=null, stack_type=null, access_config=[{nat_ip=null, public_ptr_domain_name=null, network_tier=null}], ipv6_access_config=[], alias_ip_range=[]}], gke-a3-ultra-rdma-net.subnetwork_interfaces_gke)) reservation_affinity: consume_reservation_type: SPECIFIC_RESERVATION specific_reservations: - - name: test_reservation + - name: test-reservation outputs: [instructions] - !DeploymentModule id: 
workload-manager-install - source: github.com/GoogleCloudPlatform/cluster-toolkit.git//modules/management/kubectl-apply?ref=e0c690b + source: modules/management/kubectl-apply use: [gke-a3-ultra-a3-ultragpu-cluster] settings: kueue: install: true - version: v0.9.1 # TAS feature-gates is enabled in CT + version: v0.10.0 # TAS feature-gates is enabled in CT config_path: $(ghpc_stage("xpk-gke-a3-ultra"))/kueue-xpk-configuration.yaml.tftpl config_template_vars: - num_chips: "32" + num_chips: "16" jobset: install: true version: v0.7.1 @@ -149,7 +150,11 @@ deployment_groups: settings: apply_manifests: - source: '$(ghpc_stage("xpk-gke-a3-ultra"))/config-map.yaml.tftpl' - template_vars: - resource_config_name: 'gke-a3-ultra-resources-configmap' - num_nodes: '4' + template_vars: { + resource_config_name: "gke-a3-ultra-resources-configmap", + num_nodes: "2", + cluster_config_name: "gke-a3-ultra-metadata-configmap", + capacity_type: "reservation", + reservation: "test-reservation", + } diff --git a/src/xpk/core/tests/integration/test_gcluster_a3ultra.py b/src/xpk/core/tests/integration/test_gcluster_a3ultra.py index a945b11ca..9687bea61 100644 --- a/src/xpk/core/tests/integration/test_gcluster_a3ultra.py +++ b/src/xpk/core/tests/integration/test_gcluster_a3ultra.py @@ -18,6 +18,7 @@ from xpk.core.docker_manager import DockerManager from xpk.core.gcluster_manager import GclusterManager from xpk.core.blueprint.blueprint_generator import BlueprintGenerator +from xpk.core.core import CapacityType import pytest import os import shutil @@ -74,7 +75,7 @@ def test_create_a3_ultra_deployment_files(setup_tests): auth_cidr=auth_cidr, zone=zone, reservation="foo", - static_node_count=1, + num_nodes=1, system_node_pool_machine_type="e2-standard-16", prefix=prefix, ) @@ -138,8 +139,8 @@ def test_create_a3_ultra_deployment(setup_tests): project_id=project_id, auth_cidr=auth_cidr, zone=zone, - spot=True, - static_node_count=1, + capacity_type=CapacityType.SPOT, + num_nodes=1, system_node_pool_machine_type="e2-standard-16", ) blueprint_test_path = os.path.join(bp_path, f"{blueprint_name}.yaml") diff --git a/src/xpk/core/tests/unit/test_blueprint.py b/src/xpk/core/tests/unit/test_blueprint.py index f004ce0f9..184480264 100644 --- a/src/xpk/core/tests/unit/test_blueprint.py +++ b/src/xpk/core/tests/unit/test_blueprint.py @@ -17,6 +17,7 @@ import shutil from xpk.core.blueprint.blueprint_generator import BlueprintGenerator from xpk.core.blueprint.blueprint_definitions import Blueprint +from xpk.core.core import CapacityType import ruamel.yaml import os @@ -50,7 +51,7 @@ def test_generate_a3_mega_blueprint(): zone="us-central1-c", auth_cidr="10.0.0.0/32", reservation="test-reservation", - spot=True, + capacity_type=CapacityType.RESERVATION, system_node_pool_min_node_count=5, ) @@ -88,8 +89,9 @@ def test_generate_a3_ultra_blueprint(): region="us-central1", zone="us-central1-c", auth_cidr="10.0.0.0/32", - reservation="test_reservation", + reservation="test-reservation", system_node_pool_machine_type="e2-standard-16", + capacity_type=CapacityType.RESERVATION, ) with open(a3_ultra_yaml_test_path, encoding="utf-8") as stream: ctk_yaml = yaml.load(stream) diff --git a/src/xpk/core/workload_decorators/__init__.py b/src/xpk/core/workload_decorators/__init__.py new file mode 100644 index 000000000..e7c0b7144 --- /dev/null +++ b/src/xpk/core/workload_decorators/__init__.py @@ -0,0 +1,15 @@ +""" +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in 
compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" diff --git a/src/xpk/core/workload_decorators/rdma_decorator.py b/src/xpk/core/workload_decorators/rdma_decorator.py new file mode 100644 index 000000000..290d3add5 --- /dev/null +++ b/src/xpk/core/workload_decorators/rdma_decorator.py @@ -0,0 +1,109 @@ +""" +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import yaml +from ...utils.yaml import literal_string + + +def decorate_jobset(jobset_manifest_str, sub_networks) -> str: + """ + Decorates a JobSet manifest with the necessary components for rdma-daemon. + + Args: + jobset_manifest_str: The JobSet manifest as a YAML string. + + Returns: + The modified JobSet manifest as a YAML string. + """ + + manifest = yaml.safe_load(jobset_manifest_str) + + for job in manifest['spec']['replicatedJobs']: + job_manifest = job['template'] + job_manifest.setdefault('spec', {}).setdefault('template', {}).setdefault( + 'metadata', {} + ).setdefault('annotations', {}) + spec = ( + job_manifest.setdefault('spec', {}) + .setdefault('template', {}) + .setdefault('spec', {}) + ) + spec.setdefault('tolerations', []) + spec.setdefault('volumes', []) + + add_annotations(job_manifest, sub_networks) + add_volumes(job_manifest) + add_tolerations(job_manifest) + update_gpu_containers(job_manifest) + + return yaml.dump(manifest, sort_keys=False) + + +def add_annotations(job_manifest, sub_networks): + """Adds or updates annotations in the Pod template.""" + annotations = job_manifest['spec']['template']['metadata']['annotations'] + interfaces = [ + '[', + ' {"interfaceName":"eth0","network":"default"},', + *[ + f' {{"interfaceName":"eth{i + 1}","network":"{sub_networks[i]}"}}{"," if i<8 else ""}' + for i in range(9) + ], + ']', + ] + annotations.update({ + 'networking.gke.io/default-interface': 'eth0', + 'networking.gke.io/interfaces': literal_string('\n'.join(interfaces)), + }) + + +def add_volumes(job_manifest): + """Adds volumes to the Pod spec.""" + volumes = job_manifest['spec']['template']['spec']['volumes'] + volumes.append({ + 'name': 'library-dir-host', + 'hostPath': {'path': '/home/kubernetes/bin/nvidia'}, + }) + volumes.append( + {'name': 'gib', 'hostPath': {'path': '/home/kubernetes/bin/gib'}} + ) + + +def add_tolerations(job_manifest): + """Adds tolerations to the Pod spec.""" + tolerations = job_manifest['spec']['template']['spec']['tolerations'] + tolerations.append({ + 'key': 'user-workload', + 'operator': 'Equal', + 'value': 'true', + 'effect': 'NoSchedule', + }) + + +def update_gpu_containers(job_manifest): + for container in job_manifest['spec']['template']['spec']['containers']: + if 'nvidia.com/gpu' in 
container.get('resources', {}).get('limits', {}): + container.setdefault('env', []) + container['env'].append( + {'name': 'LD_LIBRARY_PATH', 'value': '/usr/local/nvidia/lib64'} + ) + container.setdefault('volumeMounts', []) + container['volumeMounts'].append( + {'name': 'library-dir-host', 'mountPath': '/usr/local/nvidia'} + ) + container['volumeMounts'].append( + {'name': 'gib', 'mountPath': '/usr/local/gib'} + ) diff --git a/src/xpk/core/workload_decorators/tcpxo_decorator.py b/src/xpk/core/workload_decorators/tcpxo_decorator.py new file mode 100644 index 000000000..641ca4a82 --- /dev/null +++ b/src/xpk/core/workload_decorators/tcpxo_decorator.py @@ -0,0 +1,157 @@ +""" +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import yaml +from ...utils.yaml import literal_string + +# Component version +rxdm = 'v1.0.12' + + +def decorate_jobset(jobset_manifest_str, sub_networks) -> str: + """ + Decorates a JobSet manifest with the necessary components for tcpxo-daemon. + + Args: + jobset_manifest_str: The JobSet manifest as a YAML string. + + Returns: + The modified JobSet manifest as a YAML string. + """ + + manifest = yaml.safe_load(jobset_manifest_str) + + for job in manifest['spec']['replicatedJobs']: + job_manifest = job['template'] + job_manifest.setdefault('spec', {}).setdefault('template', {}).setdefault( + 'metadata', {} + ).setdefault('annotations', {}) + spec = ( + job_manifest.setdefault('spec', {}) + .setdefault('template', {}) + .setdefault('spec', {}) + ) + spec.setdefault('tolerations', []) + spec.setdefault('volumes', []) + + add_annotations(job_manifest, sub_networks) + add_volumes(job_manifest) + add_tolerations(job_manifest) + add_tcpxo_daemon_container(job_manifest) + update_gpu_containers(job_manifest) + + return yaml.dump(manifest, sort_keys=False) + + +def add_annotations(job_manifest, sub_networks): + """Adds or updates annotations in the Pod template.""" + annotations = job_manifest['spec']['template']['metadata']['annotations'] + interfaces = [ + '[', + ' {"interfaceName":"eth0","network":"default"},', + *[ + f' {{"interfaceName":"eth{i + 1}","network":"{sub_networks[i]}"}}{"," if i<7 else ""}' + for i in range(8) + ], + ']', + ] + annotations.update({ + 'devices.gke.io/container.tcpxo-daemon': literal_string( + '- path: /dev/nvidia0\n' + '- path: /dev/nvidia1\n' + '- path: /dev/nvidia2\n' + '- path: /dev/nvidia3\n' + '- path: /dev/nvidia4\n' + '- path: /dev/nvidia5\n' + '- path: /dev/nvidia6\n' + '- path: /dev/nvidia7\n' + '- path: /dev/nvidiactl\n' + '- path: /dev/nvidia-uvm\n' + '- path: /dev/dmabuf_import_helper\n' + ), + 'networking.gke.io/default-interface': 'eth0', + 'networking.gke.io/interfaces': literal_string('\n'.join(interfaces)), + }) + + +def add_tolerations(job_manifest): + """Adds tolerations to the Pod spec.""" + tolerations = job_manifest['spec']['template']['spec']['tolerations'] + tolerations.append({ + 'key': 'user-workload', + 'operator': 'Equal', + 'value': 'true', + 'effect': 'NoSchedule', + }) + + +def add_volumes(job_manifest): + 
"""Adds volumes to the Pod spec.""" + volumes = job_manifest['spec']['template']['spec']['volumes'] + volumes.append({ + 'name': 'libraries', + 'hostPath': {'path': '/home/kubernetes/bin/nvidia/lib64'}, + }) + volumes.append({'name': 'sys', 'hostPath': {'path': '/sys'}}) + volumes.append({'name': 'proc-sys', 'hostPath': {'path': '/proc/sys'}}) + volumes.append({ + 'name': 'aperture-devices', + 'hostPath': {'path': '/dev/aperture_devices'}, + }) + + +def add_tcpxo_daemon_container(job_manifest): + """Adds the tcpxo-daemon container to the Pod spec.""" + tcpxo_daemon_container = { + 'name': 'tcpxo-daemon', + 'image': f'us-docker.pkg.dev/gce-ai-infra/gpudirect-tcpxo/tcpgpudmarxd-dev:{rxdm}', + 'imagePullPolicy': 'Always', + 'command': ['/bin/sh', '-c'], + 'args': [ + 'set -ex\nchmod 755' + ' /fts/entrypoint_rxdm_container.sh\n/fts/entrypoint_rxdm_container.sh' + ' --num_hops=2 --num_nics=8 --uid= --alsologtostderr' + ], + 'securityContext': { + 'capabilities': {'add': ['NET_ADMIN', 'NET_BIND_SERVICE']} + }, + 'volumeMounts': [ + {'name': 'libraries', 'mountPath': '/usr/local/nvidia'}, + {'name': 'sys', 'mountPath': '/hostsysfs'}, + {'name': 'proc-sys', 'mountPath': '/hostprocsysfs'}, + ], + 'env': [{'name': 'LD_LIBRARY_PATH', 'value': '/usr/local/nvidia/lib64'}], + } + job_manifest['spec']['template']['spec']['containers'].insert( + 0, tcpxo_daemon_container + ) + + +def update_gpu_containers(job_manifest): + for container in job_manifest['spec']['template']['spec']['containers']: + if 'nvidia.com/gpu' in container.get('resources', {}).get('limits', {}): + container.setdefault('env', []) + container['env'].append( + {'name': 'LD_LIBRARY_PATH', 'value': '/usr/local/nvidia/lib64'} + ) + container['env'].append({ + 'name': 'NCCL_FASTRAK_LLCM_DEVICE_DIRECTORY', + 'value': '/dev/aperture_devices', + }) + container.setdefault('volumeMounts', []) + container['volumeMounts'].append( + {'name': 'aperture-devices', 'mountPath': '/dev/aperture_devices'} + ) diff --git a/src/xpk/utils/yaml.py b/src/xpk/utils/yaml.py new file mode 100644 index 000000000..a987066a5 --- /dev/null +++ b/src/xpk/utils/yaml.py @@ -0,0 +1,30 @@ +""" +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import yaml + + +class literal_string(str): + pass + + +def literal_string_representer( + dumper: yaml.Dumper, data +) -> yaml.nodes.ScalarNode: + return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|') + + +yaml.add_representer(literal_string, literal_string_representer)