diff --git a/README.md b/README.md index 470593fa..08fb0b98 100644 --- a/README.md +++ b/README.md @@ -640,8 +640,6 @@ Check out [MaxText example](https://github.com/google/maxtext/pull/570) on how t ``` * Workload List supports waiting for the completion of a specific job. XPK will follow an existing job until it has finished or the `timeout`, if provided, has been reached and then list the job. If no `timeout` is specified, the default value is set to the max value, 1 week. You may also set `timeout=0` to poll the job once. -(Note: `restart-on-user-code-failure` must be set -when creating the workload otherwise the workload will always finish with `Completed` status.) Wait for a job to complete. diff --git a/src/xpk/commands/workload.py b/src/xpk/commands/workload.py index f2f744a7..77ca78f3 100644 --- a/src/xpk/commands/workload.py +++ b/src/xpk/commands/workload.py @@ -36,6 +36,7 @@ get_gpu_scheduler, get_gpu_tcp_volume, get_gpu_volume, + get_main_container_docker_image, get_user_workload_container, get_volumes, parse_env_config, @@ -80,6 +81,7 @@ spec: ttlSecondsAfterFinished: {args.ttl_seconds_after_finished} failurePolicy: + {failure_policy_rules} maxRestarts: {args.max_restarts} replicatedJobs: - name: slice-job @@ -89,6 +91,7 @@ parallelism: {system.vms_per_slice} # Equal to the number of VMs per slice completions: {system.vms_per_slice} # Same as the above. backoffLimit: 0 # When any pod fails, the job is failed + {pod_failure_policy} template: metadata: labels: @@ -122,6 +125,7 @@ spec: ttlSecondsAfterFinished: {args.ttl_seconds_after_finished} failurePolicy: + {failure_policy_rules} maxRestarts: {args.max_restarts} replicatedJobs: - name: slice-job @@ -131,6 +135,7 @@ parallelism: {args.num_nodes} completions: {args.num_nodes} backoffLimit: 0 # When any pod fails, the job is failed + {pod_failure_policy} template: metadata: labels: @@ -180,6 +185,7 @@ spec: ttlSecondsAfterFinished: {args.ttl_seconds_after_finished} failurePolicy: + {failure_policy_rules} maxRestarts: {args.max_restarts} replicatedJobs: - name: slice-job @@ -189,6 +195,7 @@ parallelism: {args.num_nodes} completions: {args.num_nodes} backoffLimit: 0 # When any pod fails, the job is failed + {pod_failure_policy} template: metadata: labels: @@ -217,130 +224,131 @@ spec: ttlSecondsAfterFinished: {args.ttl_seconds_after_finished} failurePolicy: + {failure_policy_rules} maxRestarts: {args.max_restarts} successPolicy: operator: "All" targetReplicatedJobs: - {args.targetReplicatedJob} replicatedJobs: - - name: worker - replicas: {args.num_slices} - template: - metadata: - annotations: - alpha.jobset.sigs.k8s.io/exclusive-topology: cloud.google.com/gke-nodepool - labels: - xpk.google.com/workload: {args.workload} - spec: - backoffLimit: {backoff_limit} - completions: {system.vms_per_slice} - parallelism: {system.vms_per_slice} - template: - spec: - terminationGracePeriodSeconds: {args.termination_grace_period_seconds} - containers: - - args: - {pathways_worker_args} - image: {args.server_image} - imagePullPolicy: Always - name: pathways-worker - ports: - - containerPort: 29001 - - containerPort: 8471 - - containerPort: 8080 - resources: - limits: - {resource_type}: {system.chips_per_vm} - securityContext: - privileged: true - volumeMounts: - - mountPath: /tmp + - name: worker + replicas: {args.num_slices} + template: + metadata: + annotations: + alpha.jobset.sigs.k8s.io/exclusive-topology: cloud.google.com/gke-nodepool + labels: + xpk.google.com/workload: {args.workload} + spec: + backoffLimit: {backoff_limit} + completions: {system.vms_per_slice} + parallelism: {system.vms_per_slice} + template: + spec: + terminationGracePeriodSeconds: {args.termination_grace_period_seconds} + containers: + - args: + {pathways_worker_args} + image: {args.server_image} + imagePullPolicy: Always + name: pathways-worker + ports: + - containerPort: 29001 + - containerPort: 8471 + - containerPort: 8080 + resources: + limits: + {resource_type}: {system.chips_per_vm} + securityContext: + privileged: true + volumeMounts: + - mountPath: /tmp + name: shared-tmp + {pathways_sidecar_container} + nodeSelector: + {accelerator_label} + {machine_label} + {autoprovisioning_args} + priorityClassName: {args.priority} + hostNetwork: true + dnsPolicy: ClusterFirstWithHostNet + volumes: + - hostPath: + path: /tmp + type: DirectoryOrCreate name: shared-tmp - {pathways_sidecar_container} - nodeSelector: - {accelerator_label} - {machine_label} - {autoprovisioning_args} - priorityClassName: {args.priority} - hostNetwork: true - dnsPolicy: ClusterFirstWithHostNet - volumes: - - hostPath: - path: /tmp - type: DirectoryOrCreate - name: shared-tmp - - name: rm - replicas: 1 - template: - metadata: - labels: - xpk.google.com/workload: {args.workload} - spec: - backoffLimit: 0 - completions: 1 - parallelism: 1 - template: - spec: - containers: - - args: - {pathways_rm_args} - env: - - name: REPLICATED_JOB_NAME - valueFrom: - fieldRef: - fieldPath: metadata.annotations['jobset.sigs.k8s.io/replicatedjob-name'] - - name: JOBSET_NAME - valueFrom: - fieldRef: - fieldPath: metadata.annotations['jobset.sigs.k8s.io/jobset-name'] - - name: HOST_ADDRESS - value: $(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-0-0.$(JOBSET_NAME) - - name: TPU_SKIP_MDS_QUERY - value: "true" - image: {args.server_image} - imagePullPolicy: Always - name: pathways-rm - ports: - - containerPort: 29001 - securityContext: - privileged: true - volumeMounts: - - mountPath: /tmp + - name: rm + replicas: 1 + template: + metadata: + labels: + xpk.google.com/workload: {args.workload} + spec: + backoffLimit: 0 + completions: 1 + parallelism: 1 + template: + spec: + containers: + - args: + {pathways_rm_args} + env: + - name: REPLICATED_JOB_NAME + valueFrom: + fieldRef: + fieldPath: metadata.annotations['jobset.sigs.k8s.io/replicatedjob-name'] + - name: JOBSET_NAME + valueFrom: + fieldRef: + fieldPath: metadata.annotations['jobset.sigs.k8s.io/jobset-name'] + - name: HOST_ADDRESS + value: $(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-0-0.$(JOBSET_NAME) + - name: TPU_SKIP_MDS_QUERY + value: "true" + image: {args.server_image} + imagePullPolicy: Always + name: pathways-rm + ports: + - containerPort: 29001 + securityContext: + privileged: true + volumeMounts: + - mountPath: /tmp + name: shared-tmp + nodeSelector: + cloud.google.com/gke-nodepool: cpu-rm-np + hostNetwork: true + dnsPolicy: ClusterFirstWithHostNet + volumes: + - hostPath: + path: /tmp + type: DirectoryOrCreate name: shared-tmp - nodeSelector: - cloud.google.com/gke-nodepool: cpu-rm-np - hostNetwork: true - dnsPolicy: ClusterFirstWithHostNet - volumes: - - hostPath: - path: /tmp - type: DirectoryOrCreate - name: shared-tmp - - name: proxy - replicas: 1 - template: - metadata: - labels: - xpk.google.com/workload: {args.workload} - spec: - backoffLimit: 0 - completions: 1 - parallelism: 1 - template: - spec: - containers: - - args: - {pathways_proxy_args} - image: {args.proxy_server_image} - imagePullPolicy: Always - name: pathways-proxy - ports: - - containerPort: 29000 - hostNetwork: true - dnsPolicy: ClusterFirstWithHostNet - nodeSelector: - cloud.google.com/gke-nodepool: cpu-proxy-np - {user_workload} + - name: proxy + replicas: 1 + template: + metadata: + labels: + xpk.google.com/workload: {args.workload} + spec: + backoffLimit: 0 + completions: 1 + parallelism: 1 + template: + spec: + containers: + - args: + {pathways_proxy_args} + image: {args.proxy_server_image} + imagePullPolicy: Always + name: pathways-proxy + ports: + - containerPort: 29000 + hostNetwork: true + dnsPolicy: ClusterFirstWithHostNet + nodeSelector: + cloud.google.com/gke-nodepool: cpu-proxy-np + {user_workload} """ @@ -451,6 +459,21 @@ def workload_create(args) -> None: if return_code != 0: xpk_exit(return_code) + failure_policy_rules = """rules: + - action: FailJobSet + onJobFailureReasons: + - PodFailurePolicy""" + restart_on_exit_codes = get_restart_exit_codes(args) + restart_on_exit_codes = ','.join(map(str, restart_on_exit_codes)) + pod_failure_policy = f""" + podFailurePolicy: + rules: + - action: FailJob + onExitCodes: + containerName: {get_main_container_docker_image(args, system)} + operator: NotIn + values: [{restart_on_exit_codes}]""" + # Create the workload file based on accelerator type or workload type. if system.accelerator_type == AcceleratorType['GPU']: container, debugging_dashboard_id = get_user_workload_container( @@ -464,7 +487,10 @@ def workload_create(args) -> None: if system.device_type in cluster_gcluster.supported_device_types: yml_string = a3_gpu_workload_create_yaml.format( - args=args, container=container + args=args, + container=container, + failure_policy_rules=failure_policy_rules, + pod_failure_policy=pod_failure_policy, ) if args.device_type == cluster_gcluster.a3mega_device_type: @@ -487,6 +513,8 @@ def workload_create(args) -> None: gpu_rxdm_image=get_gpu_rxdm_image(system), gpu_rxdm_cmd=get_gpu_rxdm_cmd(system), gpu_tcp_volume=get_gpu_tcp_volume(system), + failure_policy_rules=failure_policy_rules, + pod_failure_policy=pod_failure_policy, ) elif args.use_pathways and ensure_pathways_workload_prerequisites( args, system @@ -502,13 +530,17 @@ def workload_create(args) -> None: pathways_worker_args=get_pathways_worker_args(args), pathways_proxy_args=get_pathways_proxy_args(args), pathways_sidecar_container=get_pathways_sidecar_container(args), - user_workload=get_user_workload_for_pathways(args, system), + user_workload=get_user_workload_for_pathways( + args, system, pod_failure_policy + ), resource_type=AcceleratorTypeToAcceleratorCharacteristics[ system.accelerator_type ].resource_type, local_queue_name=LOCAL_QUEUE_NAME, autoprovisioning_args=autoprovisioning_args, backoff_limit=system.vms_per_slice * 4, + failure_policy_rules=failure_policy_rules, + pod_failure_policy=pod_failure_policy, ) else: container, debugging_dashboard_id = get_user_workload_container( @@ -526,6 +558,8 @@ def workload_create(args) -> None: local_queue_name=LOCAL_QUEUE_NAME, autoprovisioning_args=autoprovisioning_args, volumes=get_volumes(args, system), + failure_policy_rules=failure_policy_rules, + pod_failure_policy=pod_failure_policy, ) tmp = write_tmp_file(yml_string) command = f'kubectl apply -f {str(tmp.file.name)}' @@ -596,6 +630,23 @@ def workload_create(args) -> None: xpk_exit(0) +def get_restart_exit_codes(args) -> list: + exit_codes = [42] + exit_codes.extend(range(127, 256, 1)) + + if args.restart_on_exit_codes is not None: + items = args.restart_on_exit_codes.split(',') + for item in items: + item = item.strip() + if '-' in item: + start, end = map(int, item.split('-')) + exit_codes.extend(range(start, end + 1)) + else: + exit_codes.append(int(item)) + + return exit_codes + + def workload_delete(args) -> None: """Function around workload delete. diff --git a/src/xpk/core/core.py b/src/xpk/core/core.py index b8196efa..94e2bd4e 100644 --- a/src/xpk/core/core.py +++ b/src/xpk/core/core.py @@ -1989,15 +1989,6 @@ def get_main_container(args, system, docker_image, resource_type) -> str: 'touch /shared-volume/stacktrace_signal; ' ) - xpk_return_user_exit_code = '' - if args.restart_on_user_code_failure: - if int(args.max_restarts) <= 0: - xpk_print( - f'Warning: --max-restarts, is set to {args.max_restarts}. Will not' - ' restart on user failure.' - ) - xpk_return_user_exit_code = 'exit $EXIT_CODE' - yaml = """- name: {docker_name} image: {docker_image} {image_pull_policy} @@ -2026,10 +2017,7 @@ def get_main_container(args, system, docker_image, resource_type) -> str: echo EXIT_CODE=$EXIT_CODE; {tpu_stacktrace_terminate_command} {gpu_workload_terminate_command} - if [ "$EXIT_CODE" = 143 ]; then - exit $EXIT_CODE - fi - {xpk_return_user_exit_code} + exit $EXIT_CODE resources: limits: {resources} @@ -2056,7 +2044,6 @@ def get_main_container(args, system, docker_image, resource_type) -> str: xpk_internal_commands=xpk_internal_commands, resources=get_main_container_resources(args, system, resource_type), volume_mounts=volume_mounts, - xpk_return_user_exit_code=xpk_return_user_exit_code, ) diff --git a/src/xpk/core/pathways.py b/src/xpk/core/pathways.py index 875b3e18..fb972166 100644 --- a/src/xpk/core/pathways.py +++ b/src/xpk/core/pathways.py @@ -41,8 +41,8 @@ def get_pathways_worker_args(args) -> str: str: yaml containing arguments for the Pathways workers. """ yaml = """- --server_port=29001 - - --resource_manager_address={rm_address} - - --gcs_scratch_location={args.pathways_gcs_location}""" + - --resource_manager_address={rm_address} + - --gcs_scratch_location={args.pathways_gcs_location}""" if args.use_pathways: return yaml.format(args=args, rm_address=get_rm_address(args)) else: @@ -58,8 +58,8 @@ def get_pathways_proxy_args(args) -> str: str: yaml containing arguments for the Pathways proxy. """ yaml = """- --server_port=29000 - - --resource_manager_address={rm_address} - - --gcs_scratch_location={args.pathways_gcs_location}""" + - --resource_manager_address={rm_address} + - --gcs_scratch_location={args.pathways_gcs_location}""" if args.use_pathways: return yaml.format(args=args, rm_address=get_rm_address(args)) @@ -200,9 +200,6 @@ def ensure_pathways_workload_prerequisites(args, system) -> bool: # Set the job which determines the life of other Pathways jobs args.targetReplicatedJob = 'proxy' if args.headless else 'main' - # Always report user code failures back to JobSet. - args.restart_on_user_code_failure = True - return True @@ -232,10 +229,10 @@ def get_pathways_rm_args(args, system: SystemCharacteristics) -> str: str: yaml containing arguments for the Pathways resource manager. """ yaml = """- --server_port=29001 - - --gcs_scratch_location={args.pathways_gcs_location} - - --node_type=resource_manager - - --instance_count={instance_count} - - --instance_type={instance_type}""" + - --gcs_scratch_location={args.pathways_gcs_location} + - --node_type=resource_manager + - --instance_count={instance_count} + - --instance_type={instance_type}""" if args.use_pathways: return yaml.format( args=args, @@ -246,7 +243,9 @@ def get_pathways_rm_args(args, system: SystemCharacteristics) -> str: return '' -def get_user_workload_for_pathways(args, system: SystemCharacteristics) -> str: +def get_user_workload_for_pathways( + args, system: SystemCharacteristics, pod_failure_policy +) -> str: """ Create a user workload container for Pathways. Don't create one for Pathways headless mode. @@ -261,32 +260,35 @@ def get_user_workload_for_pathways(args, system: SystemCharacteristics) -> str: Pathways server port as a YAML string """ user_workload_yaml = """- name: main - replicas: 1 - template: - metadata: - labels: - xpk.google.com/workload: {args.workload} - spec: - backoffLimit: 0 - completions: 1 - parallelism: 1 - template: - spec: - containers: + replicas: 1 + template: + metadata: + labels: + xpk.google.com/workload: {args.workload} + spec: + backoffLimit: 0 + completions: 1 + parallelism: 1 + {pod_failure_policy} + template: + spec: + containers: {container} - nodeSelector: - cloud.google.com/gke-nodepool: cpu-user-np - restartPolicy: OnFailure - volumes: - - hostPath: - path: /tmp - type: DirectoryOrCreate - name: shared-tmp""" + nodeSelector: + cloud.google.com/gke-nodepool: cpu-user-np + restartPolicy: Never + volumes: + - hostPath: + path: /tmp + type: DirectoryOrCreate + name: shared-tmp""" if args.headless: return '' else: container, _ = get_user_workload_container(args, system) - return user_workload_yaml.format(args=args, container=container) + return user_workload_yaml.format( + args=args, container=container, pod_failure_policy=pod_failure_policy + ) def get_rm_address(args) -> str: diff --git a/src/xpk/parser/workload.py b/src/xpk/parser/workload.py index efbadc13..264edc42 100644 --- a/src/xpk/parser/workload.py +++ b/src/xpk/parser/workload.py @@ -518,14 +518,16 @@ def add_shared_workload_create_optional_arguments(args_parsers): ), ) custom_parser.add_argument( - '--restart-on-user-code-failure', - action='store_true', + '--restart-on-exit-codes', + type=str, + default=None, help=( - 'Adding this argument will return user failures back to the jobset' - ' manager allowing restarts on user code when --max-restarts is set' - ' greater than 0. By default, this is not enabled, and workloads' - ' will not restart from user code failures. This is enabled by' - ' default on Pathways workloads.' + 'Adding this argument specifies additional user-defined exit codes' + ' that allow restarting the workload when --max-restarts is set to' + ' a value greater than 0. By default, workloads restart on exit' + ' codes 42 and 127-255. Any exit codes provided through this flag' + ' will be included alongside the default codes for restarting' + ' conditions.' ), ) custom_parser.add_argument(