Skip to content

Commit

Permalink
Multirun system (#308)
Browse files Browse the repository at this point in the history
* ROCm changes

* Update ping

* -

* Cleanup the rocm script

* use rocm branch

* -

* New multi run system

* multinode tweaks

* make sure system config is applied before running

* Update matrix run

* Tweaks

---------

Co-authored-by: Your Name <[email protected]>
  • Loading branch information
Delaunay and Your Name authored Nov 21, 2024
1 parent 0b93d59 commit 491505f
Show file tree
Hide file tree
Showing 15 changed files with 226 additions and 37 deletions.
1 change: 0 additions & 1 deletion benchmarks/llm/recipes/full_finetune_distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ class FullFinetuneRecipeDistributed(FTRecipeInterface):
"""

def __init__(self, cfg: DictConfig) -> None:

import os
self._device = acc.fetch_device(int(os.getenv("LOCAL_RANK", "0")))
self._dtype = utils.get_dtype(cfg.dtype, device=self._device)
Expand Down
2 changes: 1 addition & 1 deletion config/base.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ resnet50-noio:
inherits: _torchvision
voir:
options:
stop: 1000
stop: 500
interval: "1s"

tags:
Expand Down
30 changes: 30 additions & 0 deletions config/examples/system.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,33 @@ system:
ip: 192.168.11.13
main: false
user: username




# Example multirun configuration: each entry under `runs` expands its
# `matrix` into a cartesian product of concrete runs; the `name` template is
# formatted with the matrix values (dotted keys become attribute access,
# e.g. "{sizer.batch_size}").
multirun:
  runs:
    # Force batch size to populate the sizing model
    # (one run per batch size; results saved into scaling.yaml)
    - name: "bs{sizer.batch_size}"
      matrix:
        sizer.auto: 1
        sizer.batch_size: [1, 2, 4, 8, 16, 32, 64, 128]
        sizer.save: ["scaling.yaml"]

    # Matrix run
    # Sweep worker counts x memory capacities.
    # NOTE(review): "4Go" etc. look like gigabyte capacities and "All" the
    # full GPU memory — confirm against the sizer's capacity parser.
    - name: "c{sizer.capacity}_m{sizer.multiple}_w{cpu.n_workers}"
      matrix:
        cpu.auto: 1
        cpu.n_workers: [2, 4, 8, 16, 32]
        sizer.auto: 1
        sizer.capacity: [4Go, 8Go, 16Go, 32Go, 64Go, All]
        sizer.multiple: 8
        sizer.save: ["scaling.yaml"]

    # Auto run: let the sizer pick everything automatically (single run)
    - name: "auto"
      matrix:
        cpu.auto: 1
        sizer.auto: 1
        sizer.multiple: 8
        sizer.save: ["scaling.yaml"]
2 changes: 2 additions & 0 deletions config/scaling.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,9 @@ lightning-gpus:
112: 16776.25 MiB
128: 15858 MiB
240: 28942.25 MiB
256: 77822 MiB
504: 54100.25 MiB
616: 93571 MiB
624: 65386.25 MiB
optimized: 16
llama: {}
Expand Down
7 changes: 4 additions & 3 deletions milabench/_version.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""This file is generated, do not modify"""

__tag__ = "v1.0.0_RC1-13-gde92a7e"
__commit__ = "de92a7ea9dea1da24e8105e4566d5e6daef8464c"
__date__ = "2024-10-03 15:48:10 +0000"
__tag__ = "v1.0.0_RC1-18-g784b38e"
__commit__ = "784b38e77b90116047e3de893c22c2f7d3225179"
__date__ = "2024-10-18 15:58:46 +0000"

2 changes: 2 additions & 0 deletions milabench/alt_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ def run(argv, setsid=None, process_accumulator=None, info={}, **kwargs):
destroy(*mx.processes)
yield entry

# mx.close()


def proceed(coro):
loop = FeedbackEventLoop()
Expand Down
42 changes: 29 additions & 13 deletions milabench/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from ..report import make_report
from ..sizer import MemoryUsageExtractor
from ..summary import make_summary
from ..system import multirun, apply_system, SizerOptions, option


# fmt: off
Expand Down Expand Up @@ -72,26 +73,15 @@ def _fetch_arch(mp):
return None


@tooled
def cli_run(args=None):
"""Run the benchmarks."""
if args is None:
args = arguments()

def run(mp, args, name):
layers = validation_names(args.validations)

dash_class = {
"short": ShortDashFormatter,
"long": LongDashFormatter,
"no": None,
}.get(args.dash, None)

mp = get_multipack(run_name=args.run_name)
arch = _fetch_arch(mp)

# Initialize the backend here so we can retrieve GPU stats
init_arch(arch)


success = run_with_loggers(
mp.do_run(repeat=args.repeat),
loggers=[
Expand Down Expand Up @@ -136,3 +126,29 @@ def cli_run(args=None):
)

return success


@tooled
def cli_run(args=None):
    """Run the benchmarks.

    Executes one benchmark run per entry produced by ``multirun()``.  When no
    multirun configuration is loaded, ``multirun()`` yields a single
    ``(None, {})`` pair, so this degenerates to one plain run.

    Returns the accumulated result of ``run`` over all runs (summed).
    """
    if args is None:
        args = arguments()

    # Load the configuration and system
    mp = get_multipack(run_name=args.run_name)
    arch = _fetch_arch(mp)

    # Initialize the backend here so we can retrieve GPU stats
    init_arch(arch)

    success = 0
    for name, conf in multirun():
        # Fall back to the CLI-provided name when the multirun entry has none.
        run_name = name or args.run_name

        # Note that this function overrides the system config
        # (rebuilt per run so each run gets its own run_name / system state)
        mp = get_multipack(run_name=run_name)

        with apply_system(conf):
            success += run(mp, args, run_name)

    return success
8 changes: 7 additions & 1 deletion milabench/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,11 @@ def _find_node_config(self) -> Dict:
return {}

def is_local(self):
    """Return whether this command targets the local node.

    Delegates to ``_is_local``.  A debug ``print`` that leaked into this
    method (spamming stdout on every call) has been removed.
    """
    return self._is_local()

def _is_local(self):
localnode = self.pack.config["system"]["self"]

if localnode is not None:
Expand Down Expand Up @@ -581,7 +586,7 @@ def node_address(node):
"""Favour Hostname as it is the most consistent name across machines"""
host = node.get("hostname")
ip = node.get("ip")
return host or ip
return ip or hostname


class ForeachNode(ListCommand):
Expand Down Expand Up @@ -637,6 +642,7 @@ def executors(self):
**self.options
)

print(rank, node, node_address(node))
worker = SSHCommand(
host=node_address(node),
user=node["user"],
Expand Down
2 changes: 2 additions & 0 deletions milabench/compare.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def fetch_runs(folder, filter):

runs = []
ignored = 0

for run in os.listdir(folder):
if run.startswith("install") or run.startswith("prepare"):
continue
Expand All @@ -43,6 +44,7 @@ def fetch_runs(folder, filter):
date = retrieve_datetime_from_name(date)
else:
name = run
date = None

if date is None:
date = datetime.fromtimestamp(os.path.getmtime(pth))
Expand Down
10 changes: 7 additions & 3 deletions milabench/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,15 @@ def combine_args(args, kwargs):
yield kwargs
else:
key, values = args.popitem()
for value in values:
kwargs[key] = value

try:
for value in values:
kwargs[key] = value
yield from combine_args(deepcopy(args), kwargs)
except:
kwargs[key] = values
yield from combine_args(deepcopy(args), kwargs)


def expand_matrix(name, bench_config):
if "matrix" not in bench_config:
return [(name, bench_config)]
Expand Down
4 changes: 2 additions & 2 deletions milabench/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,15 @@ def milabench_remote_setup_plan(pack, setup_for="worker") -> SequenceCommand:

nodes = pack.config["system"]["nodes"]
copy = []
node_packs = []

copy_source = copy_folder(pack, INSTALL_FOLDER, setup_for)

install = []

for i, node in enumerate(nodes):
if should_run_for(node, setup_for):
install.append(pip_install_milabench(node_packs[i], node, INSTALL_FOLDER))
node_pack = worker_pack(pack, node)
install.append(pip_install_milabench(node_pack, node, INSTALL_FOLDER))

return SequenceCommand(
copy_source,
Expand Down
25 changes: 17 additions & 8 deletions milabench/sizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,18 @@ def to_octet(value: str) -> float:
class Sizer:
"""Automatically scale the batch size to match GPU spec"""

def __init__(self, options=SizerOptions(), scaling_config=None):
self.options = options
def __init__(self, scaling_config=None):
self.path = scaling_config

if scaling_config is None:
scaling_config = default_scaling_config

with open(scaling_config, "r") as sconf:
self.scaling_config = yaml.safe_load(sconf)

@property
def options(self):
return SizerOptions()

def benchscaling(self, benchmark):
# key
Expand Down Expand Up @@ -165,6 +168,10 @@ def find_batch_size(self, benchmark, event):
return -1

def argv(self, benchmark, capacity, argv):
    """Public entry point: delegate batch-size rewriting to ``_argv``."""
    return self._argv(benchmark, capacity, argv)

def _argv(self, benchmark, capacity, argv):
"""Find the batch size and override it with a new value"""

config = self.benchscaling(benchmark)
Expand Down Expand Up @@ -214,11 +221,12 @@ def argv(self, benchmark, capacity, argv):


def batch_sizer() -> Sizer:
    """Construct and return a fresh :class:`Sizer`.

    NOTE(review): a previous version cached the instance in ``sizer_global``;
    that caching was disabled, so every call now builds a new Sizer from the
    default scaling configuration.
    """
    return Sizer()


def get_batch_size(config, start_event):
Expand All @@ -242,8 +250,9 @@ class MemoryUsageExtractor(ValidationLayer):
"""Extract max memory usage per benchmark to populate the memory model"""

def __init__(self):

self.filepath = option("sizer.save", str, None)
sizer = batch_sizer()
self.filepath = sizer.options.save
self.memory = deepcopy(sizer.scaling_config)
self.scaling = None
self.benchname = None
Expand Down
63 changes: 61 additions & 2 deletions milabench/system.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import contextvars
from copy import deepcopy
import ipaddress
import os
import socket
Expand All @@ -15,7 +16,7 @@
from .merge import merge

system_global = contextvars.ContextVar("system", default=None)

multirun_global = contextvars.ContextVar("multirun", default=None)

def get_gpu_capacity(strict=False):
try:
Expand Down Expand Up @@ -79,6 +80,60 @@ def as_environment_variable(name):
return "MILABENCH_" + "_".join(map(str.upper, frags))


def multirun():
    """Yield ``(run_name, overrides)`` pairs for each configured run.

    Reads the multirun configuration set by ``build_system_config``.  When no
    configuration is present, yields a single ``(None, {})`` pair so callers
    always get at least one run.

    Each run's matrix (flat ``"a.b": value`` keys) is expanded with
    ``combine_args``; the run name template is formatted with the unflattened
    values plus a ``time`` timestamp.

    Fixes:
    - Previously, when the multirun config was missing, the function yielded
      the default pair and then fell through to ``multirun.get(...)`` on
      ``None``, raising AttributeError; it now returns.
    - ``unflatten`` crashed on dotless keys (``setattr`` on a dict) and on
      keys nested three levels or more (``setdefault`` on a SimpleNamespace).
    """
    multirun = multirun_global.get()

    if multirun is None or len(multirun) == 0:
        yield None, dict()
        return  # nothing more to expand

    runs = multirun.get("runs", dict())

    from .config import combine_args
    import time
    from types import SimpleNamespace

    def unflatten(dct):
        # Turn {"a.b.c": v} into {"a": SimpleNamespace(b=SimpleNamespace(c=v))}
        # so name templates can use attribute access: "{a.b.c}".format(**result)
        result = {}
        for key, value in dct.items():
            *parents, leaf = key.split(".")

            if not parents:
                # dotless key: plain dict entry
                result[key] = value
                continue

            node = result.setdefault(parents[0], SimpleNamespace())
            for frag in parents[1:]:
                if not hasattr(node, frag):
                    setattr(node, frag, SimpleNamespace())
                node = getattr(node, frag)
            setattr(node, leaf, value)

        return result

    for run_matrix in runs:
        arguments = run_matrix["matrix"]

        for run in combine_args(arguments, dict()):
            template_name = run_matrix["name"]

            ctx = unflatten(run)
            ctx["time"] = int(time.time())
            run_name = template_name.format(**ctx)

            yield run_name, run


@contextmanager
def apply_system(config: dict):
    """Temporarily apply flat ``"a.b.c": value`` overrides to the system options.

    Saves a deep copy of the current system config, writes each override under
    ``system["options"]`` (nested dicts created as needed), yields, then
    restores the saved state.

    Fix: the restore now lives in a ``finally`` block — previously an
    exception raised inside the ``with`` body skipped ``system_global.set(old)``
    and leaked the overrides into subsequent runs.
    """
    system = system_global.get()
    old = deepcopy(system)

    try:
        for key, value in config.items():
            *parents, leaf = key.split(".")

            lookup = system.setdefault("options", {})
            for frag in parents:
                lookup = lookup.setdefault(frag, {})
            lookup[leaf] = value

        yield
    finally:
        # Always restore, even when the run raised.
        system_global.set(old)


def option(name, etype, default=None):
options = dict()
system = system_global.get()
Expand Down Expand Up @@ -401,11 +456,12 @@ def gethostname(host):
def resolve_hostname(ip):
try:
hostname, _, iplist = socket.gethostbyaddr(ip)

for ip in iplist:
if is_loopback(ip):
return hostname, True

# FIXME
return socket.gethostname(), hostname.startswith(socket.gethostname())
return hostname, hostname == socket.gethostname()

Expand Down Expand Up @@ -465,6 +521,9 @@ def build_system_config(config_file, defaults=None, gpu=True):
config = merge(defaults, config)

system = config.get("system", {})
multirun = config.get("multirun", {})

multirun_global.set(multirun)
system_global.set(system)

# capacity is only required if batch resizer is enabled
Expand Down
Loading

0 comments on commit 491505f

Please sign in to comment.