Skip to content

Commit

Permalink
use gcsfuse for a3ultra and a3 mega
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Pawłowski <[email protected]>
  • Loading branch information
pawloch00 committed Jan 29, 2025
1 parent 8bb0c30 commit cf77a8c
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 11 deletions.
52 changes: 52 additions & 0 deletions src/xpk/blueprints/a3mega/storage_crd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: storages.xpk.x-k8s.io
spec:
group: xpk.x-k8s.io
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
type:
type: string
cluster:
type: string
auto_mount:
type: boolean
mount_point:
type: string
readonly:
type: boolean
manifest:
type: string
pv:
type: string
pvc:
type: string
required:
- type
- cluster
- auto_mount
- mount_point
- readonly
- manifest
- pvc
- pv
x-kubernetes-validations:
- message: Value is immutable
rule: self == oldSelf
scope: Cluster
names:
plural: storages
singular: storage
kind: Storage
shortNames:
- stg
52 changes: 52 additions & 0 deletions src/xpk/blueprints/a3ultra/storage_crd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: storages.xpk.x-k8s.io
spec:
group: xpk.x-k8s.io
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
type:
type: string
cluster:
type: string
auto_mount:
type: boolean
mount_point:
type: string
readonly:
type: boolean
manifest:
type: string
pv:
type: string
pvc:
type: string
required:
- type
- cluster
- auto_mount
- mount_point
- readonly
- manifest
- pvc
- pv
x-kubernetes-validations:
- message: Value is immutable
rule: self == oldSelf
scope: Cluster
names:
plural: storages
singular: storage
kind: Storage
shortNames:
- stg
18 changes: 14 additions & 4 deletions src/xpk/commands/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
kind: JobSet
metadata:
name: {args.workload}
annotations: {storage_annotations}
labels:
kueue.x-k8s.io/queue-name: multislice-queue # Name of the LocalQueue
xpk.google.com/workload: {args.workload}
Expand Down Expand Up @@ -209,9 +210,6 @@
- name: slice-job
replicas: 1
template:
metadata:
annotations:
{storage_annotations}
spec:
parallelism: {args.num_nodes}
completions: {args.num_nodes}
Expand Down Expand Up @@ -485,6 +483,7 @@ def workload_create(args) -> None:
xpk_exit(return_code)

storages: list[Storage] = get_storages_to_mount(k8s_api_client, args.storage)
xpk_print('storages:', storages)
gcs_fuse_storages = list(
filter(lambda storage: storage.type == GCS_FUSE_TYPE, storages)
)
Expand All @@ -510,18 +509,29 @@ def workload_create(args) -> None:

if system.device_type in cluster_gcluster.supported_device_types:
yml_string = a3_gpu_workload_create_yaml.format(
args=args, container=container
args=args,
container=container,
service_account=XPK_SA,
storage_volumes=get_storage_volumes_yaml_for_gpu(gcs_fuse_storages),
)

if args.device_type == cluster_gcluster.a3mega_device_type:
sub_networks = [f'{args.cluster}-gpunet-{i}-subnet' for i in range(8)]
yml_string = tcpxo_decorator.decorate_jobset(yml_string, sub_networks)
if len(gcs_fuse_storages) > 0:
yml_string = tcpxo_decorator.decorate_jobset_with_storages(
yml_string, gcs_fuse_storages
)

if args.device_type == cluster_gcluster.a3ultra_device_type:
sub_networks = [f'{args.cluster}-sub-1'] + [
f'{args.cluster}-rdma-sub-{i}' for i in range(8)
]
yml_string = rdma_decorator.decorate_jobset(yml_string, sub_networks)
if len(gcs_fuse_storages) > 0:
yml_string = rdma_decorator.decorate_jobset_with_storages(
yml_string, gcs_fuse_storages
)
else:
yml_string = gpu_workload_create_yaml.format(
args=args,
Expand Down
13 changes: 8 additions & 5 deletions src/xpk/core/blueprint/blueprint_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
limitations under the License.
"""

from logging import config
import shutil
from typing import Optional
from ruamel import yaml
Expand Down Expand Up @@ -200,9 +199,9 @@ def generate_a3_mega_blueprint(
"config_template_vars": {"num_chips": f"{num_chips}"},
},
"jobset": {"install": True, "version": "v0.7.2"},
"apply_manifests": [
{"source": f'$(ghpc_stage("{blueprint_name}"))/storage_crd.yaml'}
],
"apply_manifests": [{
"source": f'$(ghpc_stage("{blueprint_name}"))/storage_crd.yaml'
}],
},
)

Expand Down Expand Up @@ -588,7 +587,11 @@ def generate_a3_ultra_blueprint(
"apply_manifests": [
{"source": nccl_installer_path},
{"source": mlgru_disable_path},
{"source": f'$(ghpc_stage("{blueprint_name}"))/storage_crd.yaml'}
{
"source": (
f'$(ghpc_stage("{blueprint_name}"))/storage_crd.yaml'
)
},
],
},
)
Expand Down
1 change: 1 addition & 0 deletions src/xpk/core/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ def get_storages(
Returns:
A list of Storage objects matching the given names.
"""
xpk_print("requested_storages: ", requested_storages)
all_storages = list_storages(k8s_api_client)
all_storage_names = {storage.name for storage in all_storages}

Expand Down
29 changes: 29 additions & 0 deletions src/xpk/core/workload_decorators/rdma_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import yaml
from ...utils.yaml import literal_string
from ...core.storage import GCS_FUSE_TYPE


def decorate_jobset(jobset_manifest_str, sub_networks) -> str:
Expand Down Expand Up @@ -52,6 +53,34 @@ def decorate_jobset(jobset_manifest_str, sub_networks) -> str:
return yaml.dump(manifest, sort_keys=False)


def decorate_jobset_with_storages(jobset_manifest_str, storages) -> str:
"""
Decorates a JobSet manifest with the necessary storages.
Args:
jobset_manifest_str: The JobSet manifest as a YAML string.
Returns:
The modified JobSet manifest as a YAML string.
"""

manifest = yaml.safe_load(jobset_manifest_str)

for job in manifest['spec']['replicatedJobs']:
job_manifest = job['template']
add_storage_annotations(job_manifest, storages)

return yaml.dump(manifest, sort_keys=False)


def add_storage_annotations(job_manifest, storages):
"""Adds or updates storage annotations in the Pod template."""
annotations = job_manifest['spec']['template']['metadata']['annotations']
gcs_present = [storage.type == GCS_FUSE_TYPE for storage in storages]
if gcs_present:
annotations.update({'gke-gcsfuse/volumes': 'true'})


def add_annotations(job_manifest, sub_networks):
"""Adds or updates annotations in the Pod template."""
annotations = job_manifest['spec']['template']['metadata']['annotations']
Expand Down
30 changes: 29 additions & 1 deletion src/xpk/core/workload_decorators/tcpxo_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import yaml
from ...utils.yaml import literal_string

from ...core.storage import GCS_FUSE_TYPE
# Component version
rxdm = 'v1.0.12'

Expand Down Expand Up @@ -56,6 +56,34 @@ def decorate_jobset(jobset_manifest_str, sub_networks) -> str:
return yaml.dump(manifest, sort_keys=False)


def decorate_jobset_with_storages(jobset_manifest_str, storages) -> str:
"""
Decorates a JobSet manifest with the necessary storages.
Args:
jobset_manifest_str: The JobSet manifest as a YAML string.
Returns:
The modified JobSet manifest as a YAML string.
"""

manifest = yaml.safe_load(jobset_manifest_str)

for job in manifest['spec']['replicatedJobs']:
job_manifest = job['template']
add_storage_annotations(job_manifest, storages)

return yaml.dump(manifest, sort_keys=False)


def add_storage_annotations(job_manifest, storages):
"""Adds or updates storage annotations in the Pod template."""
annotations = job_manifest['spec']['template']['metadata']['annotations']
gcs_present = [storage.type == GCS_FUSE_TYPE for storage in storages]
if gcs_present:
annotations.update({'gke-gcsfuse/volumes': 'true'})


def add_annotations(job_manifest, sub_networks):
"""Adds or updates annotations in the Pod template."""
annotations = job_manifest['spec']['template']['metadata']['annotations']
Expand Down
3 changes: 2 additions & 1 deletion src/xpk/parser/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,8 @@ def add_shared_cluster_create_optional_arguments(args_parsers):
action='store_true',
help=(
'Enable GSCFuse driver on the cluster. This enables Workload'
' Identity Federation. When using A3 ultra/A3 mega Workload Identity is enabled by default.'
' Identity Federation. When using A3 ultra/A3 mega Workload'
' Identity is enabled by default.'
),
)

Expand Down

0 comments on commit cf77a8c

Please sign in to comment.