Commit
fix adapter trainer ext test
lenglaender committed Nov 16, 2023
1 parent b54d127 commit 9030a87
Showing 1 changed file with 126 additions and 81 deletions.
207 changes: 126 additions & 81 deletions tests_adapters/extended/test_adapter_trainer_ext.py
@@ -16,27 +16,30 @@
import os
import re
import sys
import unittest
from pathlib import Path
from typing import Tuple
from unittest.mock import patch

from parameterized import parameterized
from transformers.integrations import is_fairscale_available

from transformers.testing_utils import (
CaptureStderr,
ExtendSysPath,
TestCasePlus,
backend_device_count,
execute_subprocess_async,
get_gpu_count,
get_torch_dist_unique_port,
require_apex,
require_bitsandbytes,
require_torch,
require_torch_gpu,
require_torch_multi_gpu,
require_torch_non_multi_gpu,
require_torch_multi_accelerator,
require_torch_non_multi_accelerator,
slow,
torch_device,
)
from transformers.trainer_callback import TrainerState
from transformers.trainer_utils import set_seed
from transformers.utils import is_apex_available


bindir = os.path.abspath(os.path.dirname(__file__))
@@ -49,28 +52,6 @@
MBART_TINY = "sshleifer/tiny-mbart"


# a candidate for testing_utils
def require_fairscale(test_case):
"""
Decorator marking a test that requires fairscale
"""
if not is_fairscale_available():
return unittest.skip("test requires fairscale")(test_case)
else:
return test_case


# a candidate for testing_utils
def require_apex(test_case):
"""
Decorator marking a test that requires apex
"""
if not is_apex_available():
return unittest.skip("test requires apex")(test_case)
else:
return test_case


@require_torch
class TestTrainerExt(TestCasePlus):
def run_seq2seq_quick(
@@ -109,46 +90,20 @@ def run_seq2seq_quick(
assert isinstance(last_step_stats["eval_bleu"], float)
assert not math.isnan(float(last_step_stats["eval_loss"])), "eval_loss must not be `nan`"

@require_torch_non_multi_gpu
@require_torch_non_multi_accelerator
def test_run_seq2seq_no_dist(self):
self.run_seq2seq_quick()

# verify that the trainer can handle non-distributed with n_gpu > 1
@require_torch_multi_gpu
@require_torch_multi_accelerator
def test_run_seq2seq_dp(self):
self.run_seq2seq_quick(distributed=False)

# verify that the trainer can handle distributed with n_gpu > 1
@require_torch_multi_gpu
@require_torch_multi_accelerator
def test_run_seq2seq_ddp(self):
self.run_seq2seq_quick(distributed=True)

# test --sharded_ddp w/o --fp16
@require_torch_multi_gpu
@require_fairscale
def test_run_seq2seq_sharded_ddp(self):
self.run_seq2seq_quick(distributed=True, extra_args_str="--sharded_ddp simple")

# test --sharded_ddp w/ --fp16
@require_torch_multi_gpu
@require_fairscale
def test_run_seq2seq_sharded_ddp_fp16(self):
self.run_seq2seq_quick(distributed=True, extra_args_str="--sharded_ddp simple --fp16")

# test --sharded_ddp zero_dp_2 w/o --fp16
@require_torch_multi_gpu
@require_fairscale
def test_run_seq2seq_fully_sharded_ddp(self):
self.run_seq2seq_quick(distributed=True, extra_args_str="--sharded_ddp zero_dp_2", predict_with_generate=False)

# test --sharded_ddp zero_dp_2 w/ --fp16
@require_torch_multi_gpu
@require_fairscale
def test_run_seq2seq_fully_sharded_ddp_fp16(self):
self.run_seq2seq_quick(
distributed=True, extra_args_str="--sharded_ddp zero_dp_2 --fp16", predict_with_generate=False
)

@require_apex
@require_torch_gpu
def test_run_seq2seq_apex(self):
@@ -166,32 +121,32 @@ def test_run_seq2seq_apex(self):
self.run_seq2seq_quick(distributed=True, extra_args_str="--fp16 --fp16_backend=apex")

@parameterized.expand(["base", "low", "high", "mixed"])
@require_torch_multi_gpu
@require_torch_multi_accelerator
def test_trainer_log_level_replica(self, experiment_id):
# as each sub-test is slow-ish split into multiple sub-tests to avoid CI timeout
experiments = dict(
experiments = {
# test with the default log_level - should be info and thus log info once
base=dict(extra_args_str="", n_matches=1),
"base": {"extra_args_str": "", "n_matches": 1},
# test with low log_level and log_level_replica - should be noisy on all processes
# now the info string should appear twice on 2 processes
low=dict(extra_args_str="--log_level debug --log_level_replica debug", n_matches=2),
"low": {"extra_args_str": "--log_level debug --log_level_replica debug", "n_matches": 2},
# test with high log_level and low log_level_replica
# now the info string should appear once only on the replica
high=dict(extra_args_str="--log_level error --log_level_replica debug", n_matches=1),
"high": {"extra_args_str": "--log_level error --log_level_replica debug", "n_matches": 1},
# test with high log_level and log_level_replica - should be quiet on all processes
mixed=dict(extra_args_str="--log_level error --log_level_replica error", n_matches=0),
)
"mixed": {"extra_args_str": "--log_level error --log_level_replica error", "n_matches": 0},
}

data = experiments[experiment_id]
kwargs = dict(distributed=True, predict_with_generate=False, do_eval=False, do_predict=False)
kwargs = {"distributed": True, "predict_with_generate": False, "do_eval": False, "do_predict": False}
log_info_string = "Running training"
with CaptureStderr() as cl:
self.run_seq2seq_quick(**kwargs, extra_args_str=data["extra_args_str"])
n_matches = len(re.findall(log_info_string, cl.err))
self.assertEqual(n_matches, data["n_matches"])

@slow
def test_run_seq2seq_slow(self):
def test_run_seq2seq(self):
output_dir = self.run_trainer(
eval_steps=2,
max_len=128,
@@ -216,21 +171,105 @@ def test_run_seq2seq_slow(self):
assert "generated_predictions.txt" in contents
assert "predict_results.json" in contents

@slow
@require_bitsandbytes
def test_run_seq2seq_bnb(self):
from transformers.training_args import OptimizerNames

def train_and_return_metrics(optim: str) -> Tuple[int, int, float]:
extra_args = "--skip_memory_metrics 0"

output_dir = self.run_trainer(
max_len=128,
model_name=MARIAN_MODEL,
learning_rate=3e-4,
num_train_epochs=1,
optim=optim,
distributed=True, # force run in a new process
extra_args_str=extra_args,
do_eval=False,
do_predict=False,
n_gpus_to_use=1, # to allow deterministic fixed memory usage
)

# Check metrics
logs = TrainerState.load_from_json(Path(output_dir, "trainer_state.json")).log_history
gpu_peak_mem_mb = int(logs[0]["train_mem_gpu_peaked_delta"] / 2**20)
gpu_alloc_mem_mb = int(logs[0]["train_mem_gpu_alloc_delta"] / 2**20)

loss = logs[0]["train_loss"]
return gpu_peak_mem_mb, gpu_alloc_mem_mb, loss

gpu_peak_mem_orig, gpu_alloc_mem_orig, loss_orig = train_and_return_metrics(OptimizerNames.ADAMW_TORCH.value)
gpu_peak_mem_bnb, gpu_alloc_mem_bnb, loss_bnb = train_and_return_metrics(OptimizerNames.ADAMW_BNB.value)

gpu_alloc_mem_diff = gpu_alloc_mem_orig - gpu_alloc_mem_bnb

gpu_total_mem_orig = gpu_peak_mem_orig + gpu_alloc_mem_orig
gpu_total_mem_bnb = gpu_peak_mem_bnb + gpu_alloc_mem_bnb
gpu_total_mem_diff = gpu_total_mem_orig - gpu_total_mem_bnb

# sshleifer/student_marian_en_ro_6_1 has 54M parameters, 29M of which are `nn.Embedding` weights that
# don't get quantized and remain in fp32. Therefore only 25M parameters are quantized
# to 2 bytes each, and the difference in optimizer memory usage is derived as follows:
#
# - normal 25*8=~200MB (8 bytes per param)
# - bnb 25*2= ~50MB (2 bytes per param)
#
# Thus we should expect ~150MB total memory saved.
#
# Peak memory should be the same - the total should be different by about that same margin
#
# After leaving a small margin to accommodate differences between GPUs, let's check
# that we have at least 120MB in savings
expected_savings = 120

# uncomment the following if this test starts failing - requires py38 for a new print feature
# gpu_peak_mem_diff = gpu_peak_mem_orig - gpu_peak_mem_bnb
# print(f"{gpu_alloc_mem_orig=}MB {gpu_peak_mem_orig=}MB {gpu_alloc_mem_orig+gpu_peak_mem_orig=}MB")
# print(f" {gpu_alloc_mem_bnb=}MB {gpu_peak_mem_bnb=}MB {gpu_alloc_mem_bnb+gpu_peak_mem_bnb=}MB")
# print(f"{gpu_alloc_mem_diff=}MB")
# print(f"{gpu_peak_mem_diff=}MB")
# print(f"{gpu_total_mem_orig=}MB, {gpu_total_mem_bnb=}MB")
# print(f"{gpu_total_mem_diff=}MB, {gpu_total_mem_diff=}MB")

self.assertGreater(
gpu_alloc_mem_diff,
expected_savings,
"should use ~150MB less alloc gpu memory with BNB, compared to without it for this model but got"
f" a difference of {gpu_alloc_mem_diff}MB, with gpu_alloc_mem_orig={gpu_alloc_mem_orig}MB and"
f" gpu_alloc_mem_bnb={gpu_alloc_mem_bnb}MB",
)

self.assertGreater(
gpu_total_mem_diff,
expected_savings,
"should use ~150MB less total gpu memory with BNB, compared to without it for this model but got"
f" a difference of {gpu_total_mem_diff}MB, with gpu_total_mem_orig={gpu_total_mem_orig}MB and"
f" gpu_total_mem_bnb={gpu_total_mem_bnb}MB",
)

self.assertEqual(
loss_orig, loss_bnb, f"loss should be the same, but got loss_orig={loss_orig}, loss_bnb={loss_bnb}"
)

def run_trainer(
self,
eval_steps: int,
max_len: int,
model_name: str,
num_train_epochs: int,
learning_rate: float = 3e-3,
optim: str = "adafactor",
distributed: bool = False,
extra_args_str: str = None,
eval_steps: int = 0,
predict_with_generate: bool = True,
do_train: bool = True,
do_eval: bool = True,
do_predict: bool = True,
n_gpus_to_use: int = None,
):
data_dir = "./hf_transformers/tests/fixtures/tests_samples/wmt_en_ro"
data_dir = self.test_file_dir / "../../hf_transformers/tests/fixtures/tests_samples/wmt_en_ro"
output_dir = self.get_auto_remove_tmp_dir()
args_train = f"""
--model_name_or_path {model_name}
@@ -243,7 +282,6 @@ def run_trainer(
--max_source_length {max_len}
--max_target_length {max_len}
--do_train
--train_adapter
--num_train_epochs {str(num_train_epochs)}
--per_device_train_batch_size 4
--learning_rate {learning_rate}
Expand All @@ -253,26 +291,26 @@ def run_trainer(
--save_steps {str(eval_steps)}
--group_by_length
--label_smoothing_factor 0.1
--adafactor
--target_lang ro_RO
--source_lang en_XX
"""
--train_adapter
""".split()

args_eval = f"""
--do_eval
--train_adapter
--per_device_eval_batch_size 4
--max_eval_samples 8
--val_max_target_length {max_len}
--evaluation_strategy steps
--eval_steps {str(eval_steps)}
"""
--train_adapter
""".split()

args_predict = """
--do_predict
"""
""".split()

args = ""
args = []
if do_train:
args += args_train

@@ -283,23 +321,30 @@
args += args_predict

if predict_with_generate:
args += "--predict_with_generate"
args += "--predict_with_generate".split()

args = args.split()
if do_train:
if optim == "adafactor":
args += "--adafactor".split()
else:
args += f"--optim {optim}".split()

if extra_args_str is not None:
args.extend(extra_args_str.split())
args += extra_args_str.split()

if distributed:
n_gpu = get_gpu_count()
if n_gpus_to_use is None:
n_gpus_to_use = backend_device_count(torch_device)
master_port = get_torch_dist_unique_port()
distributed_args = f"""
-m torch.distributed.launch
--nproc_per_node={n_gpu}
-m torch.distributed.run
--nproc_per_node={n_gpus_to_use}
--master_port={master_port}
{self.examples_dir_str}/pytorch/translation/run_translation.py
""".split()
cmd = [sys.executable] + distributed_args + args
# keep for quick debug
# print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die
execute_subprocess_async(cmd, env=self.get_env())
else:
testargs = ["run_translation.py"] + args
