ray_scheduler: workspace + fixed no role logging (pytorch#492)
Summary:
This updates the Ray scheduler to have proper workspace support (an illustrative usage sketch follows the list below).

* `-c working_dir=...` is deprecated in favor of `torchx run --workspace=...`
* `-c requirements=...` is now optional; `requirements.txt` is automatically read from the root of the workspace if present
* `torchx log ray://foo/bar` works without requiring the `/ray/0` role suffix
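
As an illustrative before/after of the CLI usage (the module name, `-j 2x1` sizing, and workspace path mirror the test plan below; treat them as placeholders):

# Before: working_dir passed as a scheduler cfg option (now deprecated)
torchx run -s ray -c working_dir=. dist.ddp -j 2x1 -m scripts.compute_world_size

# After: --workspace builds a patched image from the given directory and
# picks up requirements.txt at the workspace root automatically
torchx run -s ray --workspace="file://$PWD" dist.ddp -j 2x1 -m scripts.compute_world_size

# To opt out of workspaces entirely
torchx run -s ray --workspace="" dist.ddp -j 2x1 -m scripts.compute_world_size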

Pull Request resolved: pytorch#492

Test Plan:
(torchx) tristanr@tristanr-arch2 ~/D/t/e/ray (ray)> torchx run -s ray --wait --log dist.ddp --env LOGLEVEL=INFO -j 2x1 -m scripts.compute_world_size
torchx 2022-05-18 16:55:31 INFO     Checking for changes in workspace `file:///home/tristanr/Developer/torchrec/examples/ray`...
torchx 2022-05-18 16:55:31 INFO     To disable workspaces pass: --workspace="" from CLI or workspace=None programmatically.
torchx 2022-05-18 16:55:31 INFO     Built new image `/tmp/torchx_workspacebe6331jv` based on original image `ghcr.io/pytorch/torchx:0.2.0dev0` and changes in workspace `file:///home/tristanr/Developer/torchrec/examples/ray` for role[0]=compute_world_size.
torchx 2022-05-18 16:55:31 WARNING  The Ray scheduler does not support port mapping.
torchx 2022-05-18 16:55:31 INFO     Uploading package gcs://_ray_pkg_63a39f7096dfa0bd.zip.
torchx 2022-05-18 16:55:31 INFO     Creating a file package for local directory '/tmp/torchx_workspacebe6331jv'.
ray://torchx/127.0.0.1:8265-compute_world_size-mpr03nzqvvg3td
torchx 2022-05-18 16:55:31 INFO     Launched app: ray://torchx/127.0.0.1:8265-compute_world_size-mpr03nzqvvg3td
torchx 2022-05-18 16:55:31 INFO     AppStatus:
  msg: PENDING
  num_restarts: -1
  roles:
  - replicas:
    - hostname: <NONE>
      id: 0
      role: ray
      state: !!python/object/apply:torchx.specs.api.AppState
      - 2
      structured_error_msg: <NONE>
    role: ray
  state: PENDING (2)
  structured_error_msg: <NONE>
  ui_url: null

torchx 2022-05-18 16:55:31 INFO     Job URL: None
torchx 2022-05-18 16:55:31 INFO     Waiting for the app to finish...
torchx 2022-05-18 16:55:31 INFO     Waiting for app to start before logging...
torchx 2022-05-18 16:55:43 INFO     Job finished: SUCCEEDED
(torchx) tristanr@tristanr-arch2 ~/D/t/e/ray (ray)> torchx log ray://torchx/127.0.0.1:8265-compute_world_size-mpr03nzqvvg3td
ray/0 Waiting for placement group to start.
ray/0 running ray.wait on [ObjectRef(8f2664c081ffc268e1c4275021ead9801a8d33861a00000001000000), ObjectRef(afe9f14f5a927c04b8e247b9daca5a9348ef61061a00000001000000)]
ray/0 (CommandActor pid=494377) INFO:torch.distributed.launcher.api:Starting elastic_operator with launch configs:
ray/0 (CommandActor pid=494377)   entrypoint       : scripts.compute_world_size
ray/0 (CommandActor pid=494377)   min_nodes        : 2
ray/0 (CommandActor pid=494377)   max_nodes        : 2
ray/0 (CommandActor pid=494377)   nproc_per_node   : 1
ray/0 (CommandActor pid=494377)   run_id           : compute_world_size-mpr03nzqvvg3td
ray/0 (CommandActor pid=494377)   rdzv_backend     : c10d
ray/0 (CommandActor pid=494377)   rdzv_endpoint    : localhost:29500
ray/0 (CommandActor pid=494377)   rdzv_configs     : {'timeout': 900}
ray/0 (CommandActor pid=494377)   max_restarts     : 0
ray/0 (CommandActor pid=494377)   monitor_interval : 5
ray/0 (CommandActor pid=494377)   log_dir          : None
ray/0 (CommandActor pid=494377)   metrics_cfg      : {}
ray/0 (CommandActor pid=494377)
ray/0 (CommandActor pid=494377) INFO:torch.distributed.elastic.agent.server.local_elastic_agent:log directory set to: /tmp/torchelastic_vyq136c_/compute_world_size-mpr03nzqvvg3td_nu4r0f6t
ray/0 (CommandActor pid=494377) INFO:torch.distributed.elastic.agent.server.api:[] starting workers for entrypoint: python
ray/0 (CommandActor pid=494377) INFO:torch.distributed.elastic.agent.server.api:[] Rendezvous'ing worker group
ray/0 (CommandActor pid=494406) INFO:torch.distributed.launcher.api:Starting elastic_operator with launch configs:
ray/0 (CommandActor pid=494406)   entrypoint       : scripts.compute_world_size
ray/0 (CommandActor pid=494406)   min_nodes        : 2
ray/0 (CommandActor pid=494406)   max_nodes        : 2
ray/0 (CommandActor pid=494406)   nproc_per_node   : 1
ray/0 (CommandActor pid=494406)   run_id           : compute_world_size-mpr03nzqvvg3td
ray/0 (CommandActor pid=494406)   rdzv_backend     : c10d
ray/0 (CommandActor pid=494406)   rdzv_endpoint    : 172.26.20.254:29500
ray/0 (CommandActor pid=494406)   rdzv_configs     : {'timeout': 900}
ray/0 (CommandActor pid=494406)   max_restarts     : 0
ray/0 (CommandActor pid=494406)   monitor_interval : 5
ray/0 (CommandActor pid=494406)   log_dir          : None
ray/0 (CommandActor pid=494406)   metrics_cfg      : {}
ray/0 (CommandActor pid=494406)
ray/0 (CommandActor pid=494406) INFO:torch.distributed.elastic.agent.server.local_elastic_agent:log directory set to: /tmp/torchelastic_t38mo11i/compute_world_size-mpr03nzqvvg3td_ehvp80_p
ray/0 (CommandActor pid=494406) INFO:torch.distributed.elastic.agent.server.api:[] starting workers for entrypoint: python
ray/0 (CommandActor pid=494406) INFO:torch.distributed.elastic.agent.server.api:[] Rendezvous'ing worker group
ray/0 (CommandActor pid=494377) INFO:torch.distributed.elastic.agent.server.api:[] Rendezvous complete for workers. Result:
ray/0 (CommandActor pid=494377)   restart_count=0
ray/0 (CommandActor pid=494377)   master_addr=tristanr-arch2
ray/0 (CommandActor pid=494377)   master_port=48089
ray/0 (CommandActor pid=494377)   group_rank=1
ray/0 (CommandActor pid=494377)   group_world_size=2
ray/0 (CommandActor pid=494377)   local_ranks=[0]
ray/0 (CommandActor pid=494377)   role_ranks=[1]
ray/0 (CommandActor pid=494377)   global_ranks=[1]
ray/0 (CommandActor pid=494377)   role_world_sizes=[2]
ray/0 (CommandActor pid=494377)   global_world_sizes=[2]
ray/0 (CommandActor pid=494377)
ray/0 (CommandActor pid=494377) INFO:torch.distributed.elastic.agent.server.api:[] Starting worker group
ray/0 (CommandActor pid=494377) INFO:torch.distributed.elastic.multiprocessing:Setting worker0 reply file to: /tmp/torchelastic_vyq136c_/compute_world_size-mpr03nzqvvg3td_nu4r0f6t/attempt_0/0/error.json
ray/0 (CommandActor pid=494406) INFO:torch.distributed.elastic.agent.server.api:[] Rendezvous complete for workers. Result:
ray/0 (CommandActor pid=494406)   restart_count=0
ray/0 (CommandActor pid=494406)   master_addr=tristanr-arch2
ray/0 (CommandActor pid=494406)   master_port=48089
ray/0 (CommandActor pid=494406)   group_rank=0
ray/0 (CommandActor pid=494406)   group_world_size=2
ray/0 (CommandActor pid=494406)   local_ranks=[0]
ray/0 (CommandActor pid=494406)   role_ranks=[0]
ray/0 (CommandActor pid=494406)   global_ranks=[0]
ray/0 (CommandActor pid=494406)   role_world_sizes=[2]
ray/0 (CommandActor pid=494406)   global_world_sizes=[2]
ray/0 (CommandActor pid=494406)
ray/0 (CommandActor pid=494406) INFO:torch.distributed.elastic.agent.server.api:[] Starting worker group
ray/0 (CommandActor pid=494406) INFO:torch.distributed.elastic.multiprocessing:Setting worker0 reply file to: /tmp/torchelastic_t38mo11i/compute_world_size-mpr03nzqvvg3td_ehvp80_p/attempt_0/0/error.json
ray/0 (CommandActor pid=494377) INFO:torch.distributed.elastic.agent.server.api:[] worker group successfully finished. Waiting 300 seconds for other agents to finish.
ray/0 (CommandActor pid=494377) INFO:torch.distributed.elastic.agent.server.api:Local worker group finished (SUCCEEDED). Waiting 300 seconds for other agents to finish
ray/0 (CommandActor pid=494377) INFO:torch.distributed.elastic.agent.server.api:Done waiting for other agents. Elapsed: 0.000942230224609375 seconds
ray/0 (CommandActor pid=494406) INFO:torch.distributed.elastic.agent.server.api:[] worker group successfully finished. Waiting 300 seconds for other agents to finish.
ray/0 (CommandActor pid=494406) INFO:torch.distributed.elastic.agent.server.api:Local worker group finished (SUCCEEDED). Waiting 300 seconds for other agents to finish
ray/0 (CommandActor pid=494406) INFO:torch.distributed.elastic.agent.server.api:Done waiting for other agents. Elapsed: 0.0013003349304199219 seconds
ray/0 (CommandActor pid=494377) [0]:initializing `gloo` process group
ray/0 (CommandActor pid=494377) [0]:successfully initialized process group
ray/0 (CommandActor pid=494377) [0]:rank: 1, actual world_size: 2, computed world_size: 2
ray/0 (CommandActor pid=494406) [0]:initializing `gloo` process group
ray/0 (CommandActor pid=494406) [0]:successfully initialized process group
ray/0 (CommandActor pid=494406) [0]:rank: 0, actual world_size: 2, computed world_size: 2
ray/0 running ray.wait on [ObjectRef(afe9f14f5a927c04b8e247b9daca5a9348ef61061a00000001000000)]

Reviewed By: kiukchung, msaroufim

Differential Revision: D36500237

Pulled By: d4l3k

fbshipit-source-id: 9ecf85b7860a7220262f0146890012cc88630cd2
d4l3k authored and facebook-github-bot committed May 20, 2022
1 parent d0392f1 commit cb1d04a
Showing 8 changed files with 153 additions and 51 deletions.
1 change: 1 addition & 0 deletions .torchxignore
4 changes: 3 additions & 1 deletion scripts/component_integration_tests.py
@@ -101,8 +101,9 @@ def main() -> None:
],
"image": torchx_image,
"cfg": {
"working_dir": ".",
"requirements": "",
},
"workspace": f"file://{os.getcwd()}",
},
}

@@ -115,6 +116,7 @@ def main() -> None:
image=params["image"],
cfg=params["cfg"],
dryrun=dryrun,
workspace=params.get("workspace"),
)


7 changes: 5 additions & 2 deletions torchx/components/integration_tests/integ_tests.py
@@ -10,7 +10,7 @@
from dataclasses import asdict
from json import dumps
from types import ModuleType
from typing import Callable, cast, Dict, List, Type
from typing import Callable, cast, Dict, List, Optional, Type

from pyre_extensions import none_throws
from torchx.cli.cmd_log import get_logs
@@ -45,6 +45,7 @@ def run_components(
scheduler: str,
cfg: Dict[str, CfgVal],
dryrun: bool = False,
workspace: Optional[str] = None,
) -> None:
component_providers = [
cast(Callable[..., ComponentProvider], cls)(scheduler, image)
@@ -68,7 +69,9 @@
log.info(f"Submitting AppDef... (dryrun={dryrun})")
# get the dryrun info to log the scheduler request
# then use the schedule (intead of the run API) for job submission
dryrun_info = runner.dryrun(app_def, scheduler, cfg=cfg)
dryrun_info = runner.dryrun(
app_def, scheduler, cfg=cfg, workspace=workspace
)
log.info(f"\nAppDef:\n{dumps(asdict(app_def), indent=4)}")
log.info(f"\nScheduler Request:\n{dryrun_info}")

102 changes: 62 additions & 40 deletions torchx/schedulers/ray_scheduler.py
@@ -8,24 +8,26 @@
import json
import logging
import os
import tempfile
import time
from dataclasses import dataclass, field
from datetime import datetime
from shutil import copy2, copytree, rmtree
from tempfile import mkdtemp
from typing import Any, cast, Dict, List, Mapping, Optional, Set, Type # noqa
from shutil import copy2, rmtree
from typing import Any, cast, Dict, Iterable, List, Mapping, Optional, Set, Type # noqa

from torchx.schedulers.api import (
AppDryRunInfo,
AppState,
DescribeAppResponse,
filter_regex,
Scheduler,
split_lines,
Stream,
)
from torchx.schedulers.ids import make_unique
from torchx.schedulers.ray.ray_common import RayActor, TORCHX_RANK0_HOST
from torchx.specs import AppDef, macros, NONE, ReplicaStatus, Role, RoleStatus, runopts
from torchx.workspace.dir_workspace import TmpDirWorkspace
from typing_extensions import TypedDict


@@ -92,7 +94,7 @@ class RayJob:
dashboard_address:
The existing dashboard IP address to connect to
working_dir:
The working directory to copy to the cluster
The working directory to copy to the cluster
requirements:
The libraries to install on the cluster per requirements.txt
actors:
@@ -102,15 +104,24 @@
"""

app_id: str
working_dir: str
cluster_config_file: Optional[str] = None
cluster_name: Optional[str] = None
dashboard_address: Optional[str] = None
working_dir: Optional[str] = None
requirements: Optional[str] = None
actors: List[RayActor] = field(default_factory=list)

class RayScheduler(Scheduler[RayOpts]):
class RayScheduler(Scheduler[RayOpts], TmpDirWorkspace):
"""
RayScheduler is a TorchX scheduling interface to Ray. The job def
    workers will be launched as Ray actors.

The job environment is specified by the TorchX workspace. Any files in
the workspace will be present in the Ray job unless specified in
``.torchxignore``. Python dependencies will be read from the
``requirements.txt`` file located at the root of the workspace unless
it's overridden via ``-c ...,requirements=foo/requirements.txt``.
**Config Options**
.. runopts::
@@ -122,12 +133,15 @@ class RayScheduler(Scheduler[RayOpts]):
type: scheduler
features:
cancel: true
logs: true
logs: |
Partial support. Ray only supports a single log stream so
only a dummy "ray/0" combined log role is supported.
Tailing and time seeking are not supported.
distributed: true
describe: |
Partial support. RayScheduler will return job status but
does not provide the complete original AppSpec.
workspaces: false
workspaces: true
mounts: false
"""
@@ -156,11 +170,6 @@ def run_opts(self) -> runopts:
default="127.0.0.1:8265",
help="Use ray status to get the dashboard address you will submit jobs against",
)
opts.add(
"working_dir",
type_=str,
help="Copy the the working directory containing the Python scripts to the cluster.",
)
opts.add("requirements", type_=str, help="Path to requirements.txt")
return opts

@@ -169,7 +178,7 @@ def schedule(self, dryrun_info: AppDryRunInfo[RayJob]) -> str:

# Create serialized actors for ray_driver.py
actors = cfg.actors
dirpath = mkdtemp()
dirpath = cfg.working_dir
serialize(actors, dirpath)

job_submission_addr: str = ""
@@ -189,41 +198,46 @@
f"http://{job_submission_addr}"
)

# 1. Copy working directory
if cfg.working_dir:
copytree(cfg.working_dir, dirpath, dirs_exist_ok=True)

# 2. Copy Ray driver utilities
# 1. Copy Ray driver utilities
current_directory = os.path.dirname(os.path.abspath(__file__))
copy2(os.path.join(current_directory, "ray", "ray_driver.py"), dirpath)
copy2(os.path.join(current_directory, "ray", "ray_common.py"), dirpath)

# 3. Parse requirements.txt
reqs: List[str] = []
if cfg.requirements: # pragma: no cover
with open(cfg.requirements) as f:
for line in f:
reqs.append(line.strip())
runtime_env = {"working_dir": dirpath}
if cfg.requirements:
runtime_env["pip"] = cfg.requirements

# 4. Submit Job via the Ray Job Submission API
# 1. Submit Job via the Ray Job Submission API
try:
job_id: str = client.submit_job(
job_id=cfg.app_id,
# we will pack, hash, zip, upload, register working_dir in GCS of ray cluster
# and use it to configure your job execution.
entrypoint="python3 ray_driver.py",
runtime_env={"working_dir": dirpath, "pip": reqs},
runtime_env=runtime_env,
)

finally:
rmtree(dirpath)
if dirpath.startswith(tempfile.gettempdir()):
rmtree(dirpath)

# Encode job submission client in job_id
return f"{job_submission_addr}-{job_id}"

def _submit_dryrun(self, app: AppDef, cfg: RayOpts) -> AppDryRunInfo[RayJob]:
app_id = make_unique(app.name)
requirements = cfg.get("requirements")

working_dir = app.roles[0].image
if not os.path.exists(working_dir):
raise RuntimeError(
f"Role image must be a valid directory, got: {working_dir} "
)

requirements: Optional[str] = cfg.get("requirements")
if requirements is None:
workspace_reqs = os.path.join(working_dir, "requirements.txt")
if os.path.exists(workspace_reqs):
requirements = workspace_reqs

cluster_cfg = cfg.get("cluster_config_file")
if cluster_cfg:
Expand All @@ -234,8 +248,9 @@ def _submit_dryrun(self, app: AppDef, cfg: RayOpts) -> AppDryRunInfo[RayJob]:

job: RayJob = RayJob(
app_id,
cluster_cfg,
cluster_config_file=cluster_cfg,
requirements=requirements,
working_dir=working_dir,
)

else: # pragma: no cover
@@ -244,9 +259,9 @@ def _submit_dryrun(self, app: AppDef, cfg: RayOpts) -> AppDryRunInfo[RayJob]:
app_id=app_id,
dashboard_address=dashboard_address,
requirements=requirements,
working_dir=working_dir,
)
job.cluster_name = cfg.get("cluster_name")
job.working_dir = cfg.get("working_dir")

for role in app.roles:
for replica_id in range(role.num_replicas):
@@ -298,12 +313,10 @@ def wait_until_finish(self, app_id: str, timeout: int = 30) -> None:
with a given timeout. This is intended for testing. Programmatic
usage should use the runner wait method instead.
"""
addr, _, app_id = app_id.partition("-")

client = JobSubmissionClient(f"http://{addr}")
start = time.time()
while time.time() - start <= timeout:
status_info = client.get_job_status(app_id)
status_info = self._get_job_status(app_id)
status = status_info
if status in {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}:
break
@@ -314,12 +327,18 @@ def _cancel_existing(self, app_id: str) -> None:  # pragma: no cover
client = JobSubmissionClient(f"http://{addr}")
client.stop_job(app_id)

def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
def _get_job_status(self, app_id: str) -> JobStatus:
addr, _, app_id = app_id.partition("-")
client = JobSubmissionClient(f"http://{addr}")
job_status_info = client.get_job_status(app_id)
status = client.get_job_status(app_id)
if isinstance(status, str):
return cast(JobStatus, status)
return status.status

def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
job_status_info = self._get_job_status(app_id)
state = _ray_status_to_torchx_appstate[job_status_info]
roles = [Role(name="ray", num_replicas=-1, image="<N/A>")]
roles = [Role(name="ray", num_replicas=1, image="<N/A>")]

# get ip_address and put it in hostname

@@ -354,12 +373,15 @@ def log_iter(
until: Optional[datetime] = None,
should_tail: bool = False,
streams: Optional[Stream] = None,
) -> List[str]:
# TODO: support regex, tailing, streams etc..
) -> Iterable[str]:
# TODO: support tailing, streams etc..
addr, _, app_id = app_id.partition("-")
client: JobSubmissionClient = JobSubmissionClient(f"http://{addr}")
logs: str = client.get_job_logs(app_id)
return split_lines(logs)
iterator = split_lines(logs)
if regex:
return filter_regex(regex, iterator)
return iterator

def create_scheduler(session_name: str, **kwargs: Any) -> RayScheduler:
if not has_ray(): # pragma: no cover
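To make the docstring and feature notes above concrete, a hypothetical session showing the requirements override and the single combined log stream (the `foo/requirements.txt` path is a placeholder from the docstring; the app handle is taken from the test plan):

# Override the requirements.txt that would otherwise be auto-detected at the workspace root
torchx run -s ray --workspace="file://$PWD" -c requirements=foo/requirements.txt dist.ddp -j 2x1 -m scripts.compute_world_size

# Fetch logs: the /ray/0 role suffix is no longer required, and Ray returns
# one combined stream for the whole job
torchx log ray://torchx/127.0.0.1:8265-compute_world_size-mpr03nzqvvg3td
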
2 changes: 2 additions & 0 deletions torchx/schedulers/test/.gitignore
@@ -1 +1,3 @@
actors.json
ray_common.py
ray_driver.py
