Add worker resolution #225

Merged · 10 commits · Jun 19, 2024

Changes from 1 commit
Add metric gathering utility for batch x worker run
pierre.delaunay committed Jun 14, 2024

Verified: This commit was signed with the committer’s verified signature.
commit 1fad24ca7e02b65e9591c21ee3a0d28ff33e65b9
124 changes: 124 additions & 0 deletions milabench/cli/gather.py
@@ -0,0 +1,124 @@
import argparse
import os
import re
from dataclasses import dataclass, field

import pandas as pd

from ..common import _read_reports
from ..report import make_dataframe, pandas_to_string
from ..summary import make_summary


def default_tags():
    return [
        "worker=w([a-z0-9]*)",
        "multiple=m([0-9]*)",
        "power=p([0-9]*)",
        "capacity=c([A-Za-z0-9]*(Go)?)",
    ]


# fmt: off
@dataclass
class Arguments:
    runs: str
    tags: list = field(default_factory=default_tags)
# fmt: on


def arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--runs",
        type=str,
        help="Run folder",
        default="/home/mila/d/delaunap/batch_x_worker/",
    )
    parser.add_argument(
        "--tags",
        type=str,
        help="Tags defined in run names",
        default=default_tags(),
    )
    return parser.parse_args()  # Arguments()


def get_config(reports):
    k = list(reports.keys())[0]
    config = None
    for line in reports[k]:
        if line["event"] == "config":
            config = line["data"]
            break
    return config


def extract_tags(name, tags):
    for tag, pat in tags.items():
        if m := pat.search(name):
            value = m.group(1)
            yield tag, value
        else:
            print(f"{tag} not found in {name}")
            yield tag, "NA"


def gather_cli(args=None):
    """Gather metrics from runs inside a folder in a neat format.
    It can extract tags/flags from the run name and create new columns to uniquely identify runs.

    Examples
    --------

    >>> python -m milabench.cli.gather --runs /home/mila/d/delaunap/batch_x_worker/
    bench | fail | n | perf | sem% | std% | peak_memory | score | weight | elapsed | name | worker | multiple | power | capacity
    brax | 0 | 1 | 722480.33 | 0.7% | 5.2% | 6448 | 722480.33 | 1.00 | 94 | w16-m8-c4Go | 16 | 8 | NA | 4Go
    dlrm | 0 | 1 | 350641.30 | 0.6% | 4.6% | 7624 | 350641.30 | 1.00 | 124 | w16-m8-c4Go | 16 | 8 | NA | 4Go
    ....
    brax | 0 | 1 | 723867.42 | 0.6% | 4.5% | 6448 | 723867.42 | 1.00 | 94 | w2-m8-c8Go | 2 | 8 | NA | 8Go
    dlrm | 0 | 1 | 403113.36 | 0.7% | 5.1% | 7420 | 403113.36 | 1.00 | 258 | w2-m8-c8Go | 2 | 8 | NA | 8Go
    bf16 | 0 | 8 | 293.08 | 0.3% | 7.5% | 5688 | 2361.09 | 0.00 | 18 | w2-m8-c8Go | 2 | 8 | NA | 8Go
    fp16 | 0 | 8 | 290.58 | 0.2% | 4.9% | 5688 | 2335.63 | 0.00 | 29 | w2-m8-c8Go | 2 | 8 | NA | 8Go

    """
    if args is None:
        args = arguments()

    runs = []
    for folder in os.listdir(args.runs):
        if folder.startswith("prepare"):
            continue

        if folder.startswith("install"):
            continue

        path = f"{args.runs}/{folder}"
        if os.path.isdir(path):
            runs.append(path)

    tags = dict()
    for tag in args.tags:
        name, regex = tag.split("=")
        tags[name] = re.compile(regex)

    query = ("batch_size", "elapsed")
    data = []
    for run in runs:
        reports = _read_reports(run)
        summary = make_summary(reports.values(), query=query)
        df = make_dataframe(summary, None, None, query=query)

        name = run.split("/")[-1]
        df["name"] = name.split(".", maxsplit=1)[0]
        for tag, value in extract_tags(name, tags):
            df[tag] = value

        data.append(df)

    gathered = pd.concat(data)
    print(pandas_to_string(gathered))


if __name__ == "__main__":
    gather_cli()
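As a side note, here is a minimal sketch of how the tag extraction above behaves, reusing `default_tags` and `extract_tags` from this new module. The folder name `w16-m8-c4Go` is a hypothetical example shaped like the run names in the docstring:

```python
import re

# Compile the default tag patterns, keyed by the column name they produce.
tags = {}
for spec in default_tags():
    name, regex = spec.split("=")
    tags[name] = re.compile(regex)

# 16 workers, multiple=8, 4Go capacity; no "p<digits>" fragment is present,
# so extract_tags reports the miss and falls back to "NA" for power.
for tag, value in extract_tags("w16-m8-c4Go", tags):
    print(tag, value)
# worker 16
# multiple 8
# power not found in w16-m8-c4Go
# power NA
# capacity 4Go
```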
17 changes: 12 additions & 5 deletions milabench/cli/matrix.py
@@ -1,12 +1,18 @@
import sys
from dataclasses import dataclass

import yaml
from coleo import Option, tooled

from ..common import (
    build_config,
    deduce_arch,
    get_base_defaults,
    is_selected,
    merge,
)
from ..sizer import resolve_argv, scale_argv
from ..system import build_system_config


# fmt: off
@@ -78,13 +84,14 @@ def cli_matrix_run(args=None):

def resolve_args(conf, argv):
    from ..pack import Package

    pack = Package(conf)

    args = []
    for k, v in argv.items():
        args.append(k)
        args.append(v)

    sized_args = scale_argv(pack, args)
    final_args = resolve_argv(pack, sized_args)

@@ -94,7 +101,7 @@ def resolve_args(conf, argv):
            argv[k] = final_args[i + 1]
            i += 2
            continue

        print(f"Missing resolved argument {k}")

    return argv
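To illustrate the round-trip in `resolve_args` above, a simplified sketch of the flatten-and-write-back logic with hypothetical flags (the real code routes `args` through `scale_argv` and `resolve_argv` between the two steps):

```python
# Flatten the mapping into ["--flag", value, ...] form for the sizer.
argv = {"--batch-size": "auto", "--workers": "8"}
args = []
for k, v in argv.items():
    args.append(k)
    args.append(v)

# Pretend the sizer resolved "auto" to a concrete value.
final_args = ["--batch-size", "128", "--workers", "8"]

# Write the resolved values back under their original keys.
i = 0
while i < len(final_args) - 1:
    if final_args[i] in argv:
        argv[final_args[i]] = final_args[i + 1]
    i += 2

print(argv)  # {'--batch-size': '128', '--workers': '8'}
```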
2 changes: 1 addition & 1 deletion milabench/commands/__init__.py
@@ -666,7 +666,7 @@ def _argv(self, **_) -> List:

        resolver = new_argument_resolver(self.pack)

        cpu_per_process = resolver(str(self.pack.config["argv"]["--cpus_per_gpu"]))
        return [
            # -- Run the command in the right venv
            # This could be inside the SSH Command
2 changes: 1 addition & 1 deletion milabench/common.py
@@ -15,14 +15,14 @@
from milabench.alt_async import proceed
from milabench.utils import available_layers, blabla, multilogger

from .config import build_config
from .fs import XPath
from .log import TerminalFormatter
from .merge import merge
from .multi import MultiPackage
from .report import make_report
from .summary import aggregate, make_summary
from .system import build_system_config


def get_pack(defn):
3 changes: 0 additions & 3 deletions milabench/config.py
@@ -4,14 +4,12 @@
from copy import deepcopy

import psutil
import yaml
from omegaconf import OmegaConf

from .fs import XPath
from .merge import merge


config_global = contextvars.ContextVar("config", default=None)


@@ -112,7 +110,6 @@ def build_matrix_bench(all_configs):

    for name, bench_config in all_configs.items():
        for k, v in expand_matrix(name, bench_config):
            if k in expanded_config:
                raise ValueError("Bench name is not unique")

15 changes: 8 additions & 7 deletions milabench/log.py
@@ -145,10 +145,11 @@ def __call__(self, entry):
            pass

        elif event == "config":

            def _show(k, entry):
                if k in ("meta", "system"):
                    return

                if isinstance(entry, dict):
                    for k2, v in entry.items():
                        _show(f"{k}.{k2}", v)
@@ -302,9 +303,9 @@ def on_data(self, entry, data, row):
                load = int(data.get("load", 0) * 100)
                currm, totalm = data.get("memory", [0, 0])
                temp = int(data.get("temperature", 0))
                row[
                    f"gpu:{gpuid}"
                ] = f"{load}% load | {currm:.0f}/{totalm:.0f} MB | {temp}C"
                row["gpu_load"] = f"{load}%"
                row["gpu_mem"] = f"{currm:.0f}/{totalm:.0f} MB"
                row["gpu_temp"] = f"{temp}C"
@@ -378,9 +379,9 @@ def on_data(self, entry, data, row):
                load = int(data.get("load", 0) * 100)
                currm, totalm = data.get("memory", [0, 0])
                temp = int(data.get("temperature", 0))
                row[
                    f"gpu:{gpuid}"
                ] = f"{load}% load | {currm:.0f}/{totalm:.0f} MB | {temp}C"
        else:
            task = data.pop("task", "")
            units = data.pop("units", "")
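For reference, the reflowed GPU row above renders like this (device readings below are made up):

```python
data = {"load": 0.42, "memory": [2048, 16384], "temperature": 61}

load = int(data.get("load", 0) * 100)
currm, totalm = data.get("memory", [0, 0])
temp = int(data.get("temperature", 0))
print(f"{load}% load | {currm:.0f}/{totalm:.0f} MB | {temp}C")
# 42% load | 2048/16384 MB | 61C
```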
114 changes: 72 additions & 42 deletions milabench/report.py
@@ -13,7 +13,7 @@


@error_guard({})
def _make_row(summary, compare, weights, query=None):
    mkey = "train_rate"
    metric = "mean"
    row = {}
@@ -55,6 +55,12 @@ def _make_row(summary, compare, weights):
row["weight"] = weights.get("weight", summary.get("weight", 0))
# ----

if query is not None:
extra = summary.get("extra", dict())
for q in query:
if v := extra.get(q):
row[q] = v

return row


@@ -185,33 +191,50 @@ def _report_pergpu(entries, measure="50"):
    }


def make_dataframe(summary, compare=None, weights=None, query=None):
    if weights is None:
        weights = dict()

    all_keys = list(
        {
            *(summary.keys() if summary else []),
            *(compare.keys() if compare else []),
            *(weights.keys() if weights else []),
        }
    )

    def sort_by(key):
        """Group similar runs together"""
        if weights:
            return weights[key]["group"]

        if summary:
            return summary[key]["group"]

        return key

    # Sort by name first so benches with similar names end up together;
    # we want benches in the same group with similar names to be close
    all_keys = sorted(all_keys)

    # Then sort by group so benches are grouped together;
    # we want flops benches to be close together no matter what their names are
    all_keys = sorted(all_keys, key=sort_by)

    df = DataFrame(
        {
            key: _make_row(
                summary.get(key, {}),
                compare and compare.get(key, {}),
                weights and weights.get(key, {}),
                query=query,
            )
            for key in all_keys
        }
    ).transpose()

    # Reorder columns
    df = df[sorted(df.columns, key=lambda k: columns_order.get(k, 2000))]

    return df

@@ -301,37 +324,6 @@ def _score(column):
    out.finalize()


_formatters = {
    "fail": "{:4.0f}".format,
    "n": "{:3.0f}".format,
@@ -347,8 +339,10 @@ def fmtcol(col):
"sem%": "{:6.1%}".format,
"iqr%": "{:6.1%}".format,
"score": "{:10.2f}".format,
"weight": "{:5.2f}".format,
"weight": "{:6.2f}".format,
"peak_memory": "{:11.0f}".format,
"elapsed": "{:5.0f}".format,
"batch_size": "{:3.0f}".format,
0: "{:.0%}".format,
1: "{:.0%}".format,
2: "{:.0%}".format,
@@ -368,6 +362,42 @@ def fmtcol(col):
}


def pandas_to_string(df, formatters=_formatters):
    """Default stdout printer does not insert a column separator, which makes it hard to transcribe results elsewhere.
    to_csv does not align the output.
    """
    from collections import defaultdict

    columns = df.columns.tolist()

    sep = " | "
    lines = []
    col_size = defaultdict(int)

    for index, row in df.iterrows():
        line = [f"{index:<30}"]
        for col, val in zip(columns, row):
            fmt = formatters.get(col)

            if fmt is not None:
                val = fmt(val)
                col_size[col] = max(col_size[col], len(val))
            else:
                val = str(val)

            line.append(val)

        lines.append(sep.join(line))

    def fmtcol(col):
        size = col_size[col]
        return f"{col:>{size}}"

    header = sep.join([f"{'bench':<30}"] + [fmtcol(col) for col in columns])

    return "\n".join([header] + lines)


_table_style = H.style(
"""
body {
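A quick sketch of the relocated `pandas_to_string` on a toy frame; only columns with an entry in `_formatters` get numeric formatting, anything else falls back to `str` (the column values here are invented):

```python
import pandas as pd

from milabench.report import pandas_to_string

df = pd.DataFrame(
    {"fail": [0, 0], "n": [1, 8], "elapsed": [94, 29]},
    index=["brax", "fp16"],
)

# Produces a ' | '-separated table with width-padded columns, roughly:
# bench                          | fail |   n | elapsed
# brax                           |    0 |   1 |    94
# fp16                           |    0 |   8 |    29
print(pandas_to_string(df))
```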
38 changes: 32 additions & 6 deletions milabench/sizer.py
@@ -2,14 +2,13 @@
import multiprocessing
import os
from copy import deepcopy

import numpy as np
import yaml
from voir.instruments.gpu import get_gpu_info

from .merge import merge
from .system import CPUOptions, SizerOptions, system_global
from .validation.validation import ValidationLayer

ROOT = os.path.dirname(__file__)
@@ -70,7 +69,7 @@ def benchscaling(self, benchmark):

        # benchmark config
        if isinstance(benchmark, dict) and "name" in benchmark:
            return self.scaling_config.get(benchmark["name"])

        # pack
        return self.scaling_config.get(benchmark.config["name"])
@@ -143,6 +142,27 @@ def size(self, benchmark, capacity):

        return None

    def find_batch_size(self, benchmark, event):
        config = self.benchscaling(benchmark)

        if config is None:
            return None

        argname = config.get("arg")
        if argname is None:
            return -1

        if "event" in event:
            event = event["data"]

        argv = event["command"]

        for i, arg in enumerate(argv):
            if str(arg).endswith(argname):
                return int(argv[i + 1])

        return -1

    def argv(self, benchmark, capacity, argv):
        """Find the batch size and override it with a new value"""

@@ -183,6 +203,11 @@ def batch_sizer() -> Sizer:
    return sizer


def get_batch_size(config, start_event):
    sizer = batch_sizer()
    return sizer.find_batch_size(config, start_event)


def scale_argv(pack, argv):
    sizer = batch_sizer()

@@ -276,8 +301,8 @@ def on_end(self, entry):
    def report(self, *args):
        if self.filepath is not None:
            newdata = self.memory

            if os.path.exists(self.filepath):
                with open(self.filepath, "r") as fp:
                    previous_data = yaml.safe_load(fp)
                newdata = merge(previous_data, self.memory)
@@ -290,7 +315,7 @@ def new_argument_resolver(pack):
    context = deepcopy(system_global.get())
    arch = context.get("arch", "cpu")

    if hasattr(pack, "config"):
        device_count = len(pack.config.get("devices", [0]))
    else:
        device_count = len(get_gpu_info()["gpus"])
@@ -301,6 +326,7 @@ def new_argument_resolver(pack):
        device_count = 1

    options = CPUOptions()

    def auto(value, default):
        if options.enabled:
            return value
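A sketch of the lookup `find_batch_size` performs on a run's start event, inlined with hypothetical values. The scaling config's `arg` entry names the flag to search for, and the value right after the matching flag is taken as the batch size:

```python
# Hypothetical scaling-config entry and start event.
config = {"name": "bert", "arg": "--batch-size"}
event = {"command": ["python", "train.py", "--batch-size", "128", "--lr", "0.01"]}

argname = config.get("arg")
argv = event["command"]

batch_size = -1
for i, arg in enumerate(argv):
    if str(arg).endswith(argname):  # matches the "--batch-size" flag
        batch_size = int(argv[i + 1])  # the value follows the flag
        break

print(batch_size)  # 128
```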
31 changes: 28 additions & 3 deletions milabench/summary.py
@@ -135,8 +135,28 @@ def _metrics(xs):
    return metrics


@error_guard(dict())
def augment(group, query=tuple([])):
    """Optional augmentation steps that will add additional data.
    Usually extracted from the run itself
    """
    data = {}

    if "batch_size" in query:
        from .sizer import get_batch_size

        data["batch_size"] = get_batch_size(group["config"], group["start"])

    if "elapsed" in query:
        start_time = group["start"]["time"]
        end_time = group["end"]["time"]
        data["elapsed"] = end_time - start_time

    return data


@error_guard(None)
def _summarize(group, query):
    agg = group["data"]
    gpudata = defaultdict(lambda: defaultdict(list))

@@ -152,8 +172,12 @@ def _summarize(group):
        per_gpu[device].append(tr)

    config = group["config"]

    additional = augment(group, query)

    return {
        "name": config["name"],
        "group": config["group"],
        "n": len(agg["success"]),
        "successes": sum(agg["success"]),
        "failures": sum(not x for x in agg["success"]),
@@ -170,12 +194,13 @@ def _summarize(group):
            for device, data in gpudata.items()
        },
        "weight": config.get("weight", 0),
        "extra": additional,
    }


def make_summary(runs, query=None):
    aggs = [agg for run in runs if (agg := aggregate(run))]
    classified = _classify(aggs)
    merged = {name: _merge(runs) for name, runs in classified.items()}
    summarized = {name: _summarize(agg, query) for name, agg in merged.items()}
    return summarized
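Putting the new query plumbing together, a sketch of the full flow as gather.py uses it (the run folder path is hypothetical): `make_summary` stores the queried values under `"extra"`, and `make_dataframe` lifts them into their own columns.

```python
from milabench.common import _read_reports
from milabench.report import make_dataframe, pandas_to_string
from milabench.summary import make_summary

# "batch_size" is recovered from the run's command line via sizer.get_batch_size;
# "elapsed" is the difference between the start and end event timestamps.
query = ("batch_size", "elapsed")

reports = _read_reports("/path/to/runs/w16-m8-c4Go")  # hypothetical run folder
summary = make_summary(reports.values(), query=query)

# Each summary entry now carries an "extra" dict; _make_row copies any queried
# key it finds there into its own column.
df = make_dataframe(summary, None, None, query=query)
print(pandas_to_string(df))
```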
20 changes: 12 additions & 8 deletions milabench/system.py
@@ -1,7 +1,7 @@
import contextvars
import os
import socket
from dataclasses import dataclass, field

import psutil
import yaml
@@ -27,20 +27,23 @@ def getenv(name, expected_type):

def print_once(*args, **kwargs):
    printed = 0

    def _print():
        nonlocal printed
        if printed == 0:
            print(*args, **kwargs)
            printed += 1

    return _print


warn_no_config = print_once("No system config found, using defaults")


def option(name, etype, default=None):
    options = dict()
    system = system_global.get()

    if system:
        options = system.get("options", dict())
    else:
@@ -59,7 +62,7 @@ def option(name, etype, default=None):

    if final_value is None:
        return None

    try:
        value = etype(final_value)
        lookup[frags[-1]] = value
@@ -75,6 +78,7 @@ def is_autoscale_enabled():

def default_save_location():
    from pathlib import Path

    return Path.home() / "new_scaling.yaml"


@@ -97,13 +101,13 @@ class CPUOptions:
    cpu_max: int = option("cpu.max", int, 16)

    # min number of CPU per GPU
    cpu_min: int = option("cpu.min", int, 2)

    # reserved CPU cores (i.e not available for the benchmark)
    reserved_cores: int = option("cpu.reserved_cores", int, 0)

    # Number of workers (ignores cpu_max and cpu_min)
    n_workers: int = option("cpu.n_workers", int)


@dataclass
@@ -130,8 +134,8 @@ class Nodes:
class SystemConfig:
    arch: str = getenv("MILABENCH_GPU_ARCH", str)
    sshkey: str = None
    docker_image: str = None
    nodes: list[Nodes] = field(default_factory=list)
    gpu: GPUConfig = None
    options: Options = None
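As a usage note for the `print_once` helper reformatted earlier in this file, the closure bumps its counter on first use, so repeated calls stay silent; a minimal sketch:

```python
warn = print_once("No system config found, using defaults")

warn()  # prints the message once
warn()  # silent: printed is already 1
```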