[WIP] Slurm agent #3005
@@ -0,0 +1,5 @@
# Flytekit Slurm Plugin

The Slurm agent is designed to integrate Flyte workflows with Slurm-managed high-performance computing (HPC) clusters, enabling users to leverage Slurm's capabilities for compute resource allocation, scheduling, and monitoring.

This [guide](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md) provides a concise overview of the design philosophy behind the Slurm agent and explains how to set up a local environment for testing the agent.
@@ -0,0 +1,113 @@
# Slurm Agent Demo

In this guide, we briefly introduce how to set up an environment to test the Slurm agent locally without running the backend service (e.g., the flyte agent gRPC server). It covers both basic and advanced use cases.

## Table of Contents
* [Overview](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#overview)
* [Setup a Local Test Environment](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#setup-a-local-test-environment)
  * [Flyte Client (Localhost)](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#flyte-client-localhost)
  * [Remote Tiny Slurm Cluster](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#remote-tiny-slurm-cluster)
  * [SSH Configuration](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#ssh-configuration)
* [Run a Demo](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/demo.md#run-a-demo)
## Overview
At the highest level, the Slurm agent has three core methods to interact with a Slurm cluster:
1. `create`: Use `srun` or `sbatch` to run a job on a Slurm cluster
2. `get`: Use `scontrol show job <job_id>` to monitor the Slurm job state
3. `delete`: Use `scancel <job_id>` to cancel the Slurm job (this method is still under test)

In its simplest form, the Slurm agent supports directly running a batch script with `sbatch` on a Slurm cluster, as shown below:

![](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/assets/basic_arch.png)
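The `get` step above boils down to extracting the `JobState` field from `scontrol show job` output. A minimal sketch of that parsing, assuming the typical `key=value` field layout (the sample output below is illustrative, not captured from a real cluster):

```python
def parse_job_state(scontrol_stdout: str) -> str:
    """Extract the JobState value from `scontrol show job` output.

    Falls back to "running" when no JobState field is present,
    a conservative default for in-flight jobs.
    """
    for token in scontrol_stdout.split():
        if token.startswith("JobState="):
            return token.split("=", 1)[1].lower()
    return "running"


# Trimmed-down sample of scontrol output, for illustration only
sample = "JobId=77 JobName=tiny-slurm JobState=COMPLETED ExitCode=0:0"
print(parse_job_state(sample))  # -> completed
```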
## Setup a Local Test Environment
Without running the backend service, we can set up an environment to test the Slurm agent locally. The setup consists of two main components: a client (localhost) and a remote tiny Slurm cluster. We then configure an SSH connection, which relies on `asyncssh`, to facilitate communication between the two.

### Flyte Client (Localhost)
1. Set up a local Flyte cluster following this [official guide](https://docs.flyte.org/en/latest/community/contribute/contribute_code.html#how-to-setup-dev-environment-for-flytekit)
2. Build a virtual environment (e.g., conda) and activate it
3. Clone the Flytekit repo, check out the Slurm agent PR, and install Flytekit
```
git clone https://github.com/flyteorg/flytekit.git
cd flytekit
gh pr checkout 3005
make setup && pip install -e .
```
4. Install the Flytekit Slurm agent plugin
```
cd plugins/flytekit-slurm/
pip install -e .
```
### Remote Tiny Slurm Cluster
To simplify the setup process, we follow this [guide](https://github.com/JiangJiaWei1103/Slurm-101) to configure a single-host Slurm cluster, covering `slurmctld` (the central management daemon) and `slurmd` (the compute node daemon).

### SSH Configuration
To facilitate communication between the Flyte client and the remote Slurm cluster, we set up SSH on the Flyte client side as follows:
1. Create a new authentication key pair
```
ssh-keygen -t rsa -b 4096
```
2. Copy the public key to the remote Slurm cluster
```
ssh-copy-id <username>@<remote_server_ip>
```
3. Enable key-based authentication
```
# ~/.ssh/config
Host <host_alias>
    HostName <remote_server_ip>
    Port <ssh_port>
    User <username>
    IdentityFile <path_to_private_key>
```
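To sanity-check a config entry like the one above, the `Host` block format can be parsed with a few lines of Python. This is a toy parser for illustration only; real clients (including `asyncssh`) read `~/.ssh/config` themselves:

```python
from typing import Dict


def parse_ssh_config(text: str) -> Dict[str, Dict[str, str]]:
    """Parse Host blocks from OpenSSH-style config text into nested dicts."""
    hosts: Dict[str, Dict[str, str]] = {}
    current = None
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments
        key, _, value = line.partition(" ")
        if key == "Host":
            current = value
            hosts[current] = {}
        elif current is not None:
            hosts[current][key] = value
    return hosts


# Hypothetical host alias and address, matching the template above
sample = """\
Host tiny-slurm
    HostName 192.0.2.10
    Port 22
    User ubuntu
    IdentityFile ~/.ssh/id_rsa
"""
print(parse_ssh_config(sample)["tiny-slurm"]["HostName"])  # -> 192.0.2.10
```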
## Run a Demo
Suppose we have a batch script to run on the Slurm cluster:
```
#!/bin/bash

echo "Working!" >> ./remote_touch.txt
```

We use the following Python script to test the Slurm agent on the client side. A crucial part of the task configuration is specifying the target Slurm cluster and designating the batch script's path within the cluster.

```python
import os

from flytekit import workflow
from flytekitplugins.slurm import Slurm, SlurmTask


echo_job = SlurmTask(
    name="echo-job-name",
    task_config=Slurm(
        slurm_host="<host_alias>",
        batch_script_path="<path_to_batch_script_within_cluster>",
        sbatch_conf={
            "partition": "debug",
            "job-name": "tiny-slurm",
        }
    )
)


@workflow
def wf() -> None:
    echo_job()


if __name__ == "__main__":
    from click.testing import CliRunner

    from flytekit.clis.sdk_in_container import pyflyte

    runner = CliRunner()
    path = os.path.realpath(__file__)

    print(">>> LOCAL EXEC <<<")
    result = runner.invoke(pyflyte.main, ["run", path, "wf"])
    print(result.output)
```

After the Slurm job completes, we can find the following result on the Slurm cluster:

![](https://github.com/JiangJiaWei1103/flytekit/blob/slurm-agent-dev/plugins/flytekit-slurm/assets/slurm_basic_result.png)
@@ -0,0 +1,4 @@
```python
from .function.agent import SlurmFunctionAgent
from .function.task import SlurmFunction, SlurmFunctionTask
from .script.agent import SlurmScriptAgent
from .script.task import Slurm, SlurmRemoteScript, SlurmShellTask, SlurmTask
```
@@ -0,0 +1,115 @@
```python
from dataclasses import dataclass
from typing import Dict, Optional

import asyncssh
from asyncssh import SSHClientConnection

from flytekit.extend.backend.base_agent import AgentRegistry, AsyncAgentBase, Resource, ResourceMeta
from flytekit.extend.backend.utils import convert_to_flyte_phase
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate


@dataclass
class SlurmJobMetadata(ResourceMeta):
    """Slurm job metadata.

    Args:
        job_id: Slurm job id.
        slurm_host: Hostname or SSH alias of the target Slurm cluster.
    """

    job_id: str
    slurm_host: str


class SlurmFunctionAgent(AsyncAgentBase):
    name = "Slurm Function Agent"

    # SSH connection pool for multi-host environment
    _conn: Optional[SSHClientConnection] = None

    def __init__(self) -> None:
        super(SlurmFunctionAgent, self).__init__(task_type_name="slurm_fn", metadata_type=SlurmJobMetadata)

    async def create(
        self,
        task_template: TaskTemplate,
        inputs: Optional[LiteralMap] = None,
        **kwargs,
    ) -> SlurmJobMetadata:
        # Retrieve task config
        slurm_host = task_template.custom["slurm_host"]
        srun_conf = task_template.custom["srun_conf"]

        # Construct srun command for Slurm cluster
        cmd = _get_srun_cmd(srun_conf=srun_conf, entrypoint=" ".join(task_template.container.args))

        # Run Slurm job
        if self._conn is None:
            await self._connect(slurm_host)
        res = await self._conn.run(cmd, check=True)

        # Direct return for sbatch
        # job_id = res.stdout.split()[-1]
        # Use echo trick for srun
        job_id = res.stdout.strip()

        return SlurmJobMetadata(job_id=job_id, slurm_host=slurm_host)

    async def get(self, resource_meta: SlurmJobMetadata, **kwargs) -> Resource:
        await self._connect(resource_meta.slurm_host)
        res = await self._conn.run(f"scontrol show job {resource_meta.job_id}", check=True)

        # Determine the current flyte phase from Slurm job state
        job_state = "running"
        for o in res.stdout.split(" "):
            if "JobState" in o:
                job_state = o.split("=")[1].strip().lower()
        cur_phase = convert_to_flyte_phase(job_state)

        return Resource(phase=cur_phase)

    async def delete(self, resource_meta: SlurmJobMetadata, **kwargs) -> None:
        await self._connect(resource_meta.slurm_host)
        _ = await self._conn.run(f"scancel {resource_meta.job_id}", check=True)

    async def _connect(self, slurm_host: str) -> None:
        """Make an SSH client connection."""
        self._conn = await asyncssh.connect(host=slurm_host)


def _get_srun_cmd(srun_conf: Dict[str, str], entrypoint: str) -> str:
    """Construct Slurm srun command.

    Flyte entrypoint, pyflyte-execute, is run within a bash shell process.

    Args:
        srun_conf: Options of srun command.
        entrypoint: Flyte entrypoint.

    Returns:
        cmd: Slurm srun command.
    """
    # Setup srun options
    cmd = ["srun"]
    for opt, val in srun_conf.items():
        cmd.extend([f"--{opt}", str(val)])

    cmd.extend(["bash", "-c"])
    cmd = " ".join(cmd)

    cmd += f""" '# Setup environment variables
    export PATH=$PATH:/opt/anaconda/anaconda3/bin;

    # Run pyflyte-execute in a pre-built conda env
    source activate dev;
    {entrypoint};

    # A trick to show Slurm job id on stdout
    echo $SLURM_JOB_ID;'
    """

    return cmd


AgentRegistry.register(SlurmFunctionAgent())
```

> **Review comment on lines +41 to +42:** can we use `task_template.custom.get("slurm_host")`?
>
> **Reply:** Let me know what you think. Thanks!
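Stripped of the SSH plumbing and the conda/PATH environment setup, the command construction in `_get_srun_cmd` reduces to a pure function, which makes it easy to unit-test in isolation. A simplified sketch (the environment-setup lines from the original are intentionally omitted):

```python
from typing import Dict


def build_srun_cmd(srun_conf: Dict[str, str], entrypoint: str) -> str:
    """Build an srun invocation that runs `entrypoint` in a bash shell
    and echoes the Slurm job id as the last line of stdout."""
    cmd = ["srun"]
    for opt, val in srun_conf.items():
        # Each srun option becomes a --key value pair
        cmd.extend([f"--{opt}", str(val)])
    cmd.extend(["bash", "-c"])
    # Single-quote the shell payload so srun passes it through intact
    return " ".join(cmd) + f" '{entrypoint}; echo $SLURM_JOB_ID;'"


print(build_srun_cmd({"partition": "debug"}, "pyflyte-execute ..."))
# -> srun --partition debug bash -c 'pyflyte-execute ...; echo $SLURM_JOB_ID;'
```

Keeping the `echo $SLURM_JOB_ID` trick inside the quoted payload is what lets `create` recover the job id from `res.stdout` without parsing `srun`'s own output.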
> **Review comment:** why do we need this? Is this for the shell task?
>
> **Reply:** If we define a `SlurmTask` without specifying `container_image` (as in the example Python script provided above), `ctx.serialization_settings` will be `None`. Then an error is raised which describes that `PythonAutoContainerTask` needs an image. I think this is just a temporary workaround for local testing, and I'm still pondering how to better handle this issue.