
Commit

release code for v0.7.0
Merge pull request !10 from yh_cc/auto-1850475-master-1634268275819
yh_cc authored and gitee-org committed Oct 15, 2021
2 parents 099a144 + 9881258 commit 3cb01d1
Showing 31 changed files with 484 additions and 92 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -6,7 +6,7 @@
![Hex.pm](https://img.shields.io/hexpm/l/plug.svg)
[![Documentation Status](https://readthedocs.org/projects/fastnlp/badge/?version=latest)](http://fastnlp.readthedocs.io/?badge=latest)

fastNLP is a lightweight natural language processing (NLP) toolkit whose goal is to implement NLP tasks and build complex models quickly.
fastNLP is a lightweight framework for natural language processing (NLP) whose goal is to implement NLP tasks and build complex models quickly.

fastNLP has the following features:

6 changes: 4 additions & 2 deletions fastNLP/core/dataset.py
@@ -474,8 +474,8 @@ def __getitem__(self, idx):
if idx.start is not None and (idx.start >= len(self) or idx.start <= -len(self)):
raise RuntimeError(f"Start index {idx.start} out of range 0-{len(self) - 1}")
data_set = DataSet()
for field in self.field_arrays.values():
data_set.add_field(field_name=field.name, fields=field.content[idx], padder=field.padder,
for field_name, field in self.field_arrays.items():
data_set.add_field(field_name=field_name, fields=field.content[idx], padder=field.padder,
is_input=field.is_input, is_target=field.is_target, ignore_type=field.ignore_type)
data_set.collater = self.collater.copy_from(self.collater)
return data_set
@@ -616,6 +616,7 @@ def add_fieldarray(self, field_name, fieldarray):
if len(self) != len(fieldarray):
raise RuntimeError(f"The field to add must have the same size as dataset. "
f"Dataset size {len(self)} != field size {len(fieldarray)}")
fieldarray.name = field_name
self.field_arrays[field_name] = fieldarray

def add_field(self, field_name, fields, padder=AutoPadder(), is_input=False, is_target=False, ignore_type=False):
@@ -673,6 +674,7 @@ def copy_field(self, field_name, new_field_name):
if not self.has_field(field_name):
raise KeyError(f"Field:{field_name} not found in DataSet.")
fieldarray = deepcopy(self.get_field(field_name))
fieldarray.name = new_field_name
self.add_fieldarray(field_name=new_field_name, fieldarray=fieldarray)
return self
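
Taken together, the two dataset.py hunks make a copied or sliced field keep a consistent name. A minimal sketch of the behaviour being fixed, assuming fastNLP's public `DataSet` API (the field names and values are invented for illustration):

```python
from fastNLP import DataSet

# Toy dataset with two instances (illustrative values only).
ds = DataSet({"words": [["a", "b"], ["c"]], "label": [0, 1]})

# copy_field now also renames the copied FieldArray, so its internal
# name matches the key it is stored under.
ds.copy_field("label", "label_copy")

# Slicing iterates field_arrays.items(), so each field in the new
# DataSet is re-added under its dictionary key instead of a possibly
# stale FieldArray.name.
sub = ds[:1]
print(sub.get_field("label_copy").name)  # expected: "label_copy"
```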

161 changes: 92 additions & 69 deletions fastNLP/core/dist_trainer.py
@@ -32,6 +32,7 @@
from .utils import _build_fp16_env
from .utils import _get_func_signature
from .utils import _move_dict_value_to_device
from .sampler import Sampler

__all__ = [
'get_local_rank',
@@ -54,7 +55,7 @@ def get_local_rank():
raise RuntimeError('Please use "python -m torch.distributed.launch --nproc_per_node=N train_script.py"')


class DistTrainer():
class DistTrainer:
r"""
A distributed Trainer that supports distributed training and mixed-precision training. For details of the underlying implementation, please read the official PyTorch documentation.
@@ -68,11 +69,11 @@ def __init__(self, train_data, model, optimizer=None, loss=None,
dev_data=None, metrics=None, metric_key=None,
update_every=1, print_every=10, validate_every=-1,
save_path=None, device='auto',
fp16=False, use_tqdm=True, **kwargs):
fp16=False, use_tqdm=True, sampler=None, **kwargs):
r"""
:param train_data: the training set, a :class:`~fastNLP.DataSet`.
:param nn.modules model: the model to train
:param nn.modules, DDP model: the model to train
:param optimizer: a `torch.optim.Optimizer` optimizer. If None, the Trainer uses the default Adam(model.parameters(), lr=4e-3) optimizer
:param loss: the :class:`~fastNLP.core.losses.LossBase` object to use. When None, :class:`~fastNLP.LossInForward` is used by default
:param list callbacks_all: callback functions used to adjust the training process, applied in all training processes.
@@ -101,13 +102,18 @@ def __init__(self, train_data, model, optimizer=None, loss=None,
:param str device: the device to use; one of 'auto', 'cuda' or 'cpu'
:param bool fp16: whether to use half-precision (mixed-precision) training.
:param bool use_tqdm: whether to use tqdm to display training progress; if False, the loss is printed to the terminal instead.
:param Sampler sampler: the sampler to use. If not given, a DistributedSampler is used by default. This parameter is typically needed when the
Dataset on each rank has been deliberately modified, so that every rank holds the same number of samples but the samples themselves differ.
:param kwargs: optional configuration parameters
bool test_use_tqdm: whether to enable tqdm when validating on the dev set
Sampler test_sampler: the sampler used during evaluation
int dev_batch_size: the batch size used during evaluation
bool test_use_fp16: use fp16 during testing
bool set_grad_to_none: when zeroing gradients, set grad to None instead of 0
GradScaler gradscaler: a custom gradient scaler
GradScaler grad_scaler: a custom gradient scaler
bool pin_memory: whether to put the produced tensors in pinned memory, which may speed up data transfer; the gain usually shows when there are many tensors or the tensors are large.
bool find_unused_parameters: forwarded when wrapping the model into DistributedDataParallel; unless the model really has
parameters that are unused in forward, this parameter should not be needed.
"""
assert device in ['auto', 'cuda', 'cpu'], "Please set correct device in ['auto', 'cuda', 'cpu']"
if device == 'auto':
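
A hedged usage sketch of the new `sampler` argument and the kwargs documented above; it assumes the constructor parameters shown in this diff, placeholder objects (`model`, `train_ds`, `dev_ds`, `metric`, `my_sampler`) defined elsewhere, and the usual launch command `python -m torch.distributed.launch --nproc_per_node=N train_script.py`:

```python
from fastNLP.core.dist_trainer import DistTrainer

# Placeholder objects: model is an nn.Module (or an already-wrapped DDP model),
# train_ds/dev_ds are fastNLP DataSets, metric is a fastNLP metric, and
# my_sampler is a fastNLP Sampler or torch.utils.data.Sampler.
trainer = DistTrainer(
    train_data=train_ds,
    model=model,
    dev_data=dev_ds,
    metrics=metric,
    fp16=True,                     # mixed precision; a custom grad_scaler may be passed via kwargs
    sampler=my_sampler,            # replaces the default DistributedSampler
    find_unused_parameters=False,  # forwarded to DistributedDataParallel
    pin_memory=True,               # forwarded to DataSetIter / tensor transfer
)
trainer.train()
```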
@@ -126,6 +132,9 @@ def __init__(self, train_data, model, optimizer=None, loss=None,
self.rank = dist.get_rank() # unique id for each process

self.train_data = train_data
self.kwargs = kwargs
if kwargs.get('batch_size', None):
batch_size_per_gpu = int(kwargs.get('batch_size'))
self.batch_size_per_gpu = int(batch_size_per_gpu)
self.n_epochs = int(n_epochs)
self.num_data_workers = int(num_workers)
@@ -137,61 +146,74 @@ def __init__(self, train_data, model, optimizer=None, loss=None,
self.losser = _prepare_losser(loss)
self.fp16 = fp16
self.local_rank = get_local_rank()
self._forward_func = model.forward
self.callback_manager = DistCallbackManager(
env={"trainer": self}, callbacks_all=callbacks_all,
callbacks_master=callbacks_master)
self.test_manager = DistCallbackManager(env={'trainer': self})
self.metric_key = metric_key
self.use_tqdm = use_tqdm

model.to(self.device)

# init fp16, must before DataParallel init
autocast, GradScaler = _build_fp16_env(dummy=not self.fp16)
self.auto_cast = autocast
user_grad_scaler = getattr(kwargs, 'gradscaler', None)
user_grad_scaler = kwargs.get('grad_scaler', None)
if user_grad_scaler is not None:
assert self.fp16, "must set fp16=True to enable gradscaler"
assert self.fp16, "must set fp16=True to enable grad_scaler"
grad_scaler = user_grad_scaler
else:
grad_scaler = GradScaler()
self.grad_scaler = grad_scaler

self.set_grad_to_none = getattr(kwargs, 'set_grad_to_none', True)

self.set_grad_to_none = kwargs.get('set_grad_to_none', False)
# init DataParallel
if parse_version(torch.__version__)>=parse_version('1.1'):
self.ddp_model = DDP(model, device_ids=[self.local_rank],
output_device=self.local_rank, find_unused_parameters=True)
if isinstance(model, DDP):
self.ddp_model = model
else:
self.ddp_model = DDP(model, device_ids=[self.local_rank],
output_device=self.local_rank)
model.to(self.device)
if parse_version(torch.__version__)>=parse_version('1.1'):
self.ddp_model = DDP(model, device_ids=[self.local_rank],
output_device=self.local_rank,
find_unused_parameters=kwargs.get('find_unused_parameters', False))
else:
self.ddp_model = DDP(model, device_ids=[self.local_rank],
output_device=self.local_rank)
self.model = self.ddp_model.module

self._forward_func = self.model.forward
self.model.to(self.device)

optimizer = self._get_optimizer(optimizer)
self.optimizer = optimizer
if isinstance(self.train_data, DataSet):
self.sampler = DistributedSampler(self.train_data)
if sampler is None:
self.sampler = DistributedSampler(self.train_data)
else:
# sampler check
if sampler is not None and not isinstance(sampler, (Sampler, torch.utils.data.Sampler)):
raise ValueError(
f"The type of sampler should be fastNLP.BaseSampler or pytorch's Sampler, got {type(sampler)}")
elif hasattr(sampler, 'set_batch_size'):
sampler.set_batch_size(batch_size_per_gpu)
self.sampler = sampler
# concerning issue from https://github.com/pytorch/pytorch/issues/57273
self.pin_memory = kwargs.get('pin_memory', False if parse_version(torch.__version__)==parse_version('1.9') else True)
self.data_iterator = self._get_data_iter(self.train_data)
self.batch_size = self.world_size * self.batch_size_per_gpu
self.n_steps = self._get_n_steps()

self.dev_data = dev_data
self.metrics = metrics
self.test_use_tqdm = True
self.kwargs = kwargs
self.test_use_tqdm = kwargs.get('test_use_tqdm', self.use_tqdm)
dev_batch_size = kwargs.get('dev_batch_size', batch_size_per_gpu)

# for evaluation, only run eval on master proc
if dev_data and metrics:
cb = _TesterCallback(
dev_data, model, metrics,
dev_data, self.model, metrics,
batch_size=dev_batch_size, num_workers=num_workers, sampler=kwargs.get('test_sampler', None),
use_tqdm=self.test_use_tqdm)
self.test_manager.add_callback([cb], master=True)

# Setup logging
# synchronize start_time
sync_time = torch.tensor(time.time(), dtype=torch.double).to(self.device)
Expand All @@ -211,29 +233,14 @@ def __init__(self, train_data, model, optimizer=None, loss=None,
self.logger.info("Num of processes: {}".format(self.world_size))
self.logger.info("Use device: {}".format(device))

def _maybe_no_sync(self):
"""
Whenever *samples* contains more than one mini-batch, we
want to accumulate gradients locally and only call
all-reduce in the last backwards pass.
"""
i = self.step % self.update_every
if (
self.world_size > 1
and hasattr(self.ddp_model, "no_sync")
and i != 0
):
return self.ddp_model.no_sync()
else:
return contextlib.ExitStack() # dummy contextmanager

def _get_n_steps(self):
return len(self.data_iterator) * self.n_epochs

def _get_data_iter(self, dataset):
if isinstance(dataset, DataSet):
return DataSetIter(dataset=dataset, batch_size=self.batch_size_per_gpu, sampler=self.sampler,
num_workers=self.num_data_workers, drop_last=self.drop_last)
num_workers=self.num_data_workers, drop_last=self.drop_last,
pin_memory=self.pin_memory)
elif isinstance(dataset, BatchIter):
return dataset
else:
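
The `pin_memory` option above pairs page-locked host memory with the non-blocking device copies used later in `_train` (`non_blocking=self.pin_memory`). A minimal plain-PyTorch sketch of that pattern, independent of fastNLP (dataset and sizes are illustrative):

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

device = torch.device("cuda")
dataset = TensorDataset(torch.randn(1024, 16), torch.randint(0, 2, (1024,)))

# pin_memory=True puts each batch in page-locked host memory, which is what
# allows the copies below to be truly asynchronous.
loader = DataLoader(dataset, batch_size=64, pin_memory=True, num_workers=2)

for x, y in loader:
    # non_blocking=True lets the host-to-device copy overlap with computation.
    x = x.to(device, non_blocking=True)
    y = y.to(device, non_blocking=True)
    # ... forward / backward ...
```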
@@ -339,45 +346,50 @@ def _train(self):
avg_loss = 0
data_iterator = self.data_iterator
self.ddp_model.zero_grad()
self.batch_per_epoch = self.data_iterator.num_batches
for epoch in range(1, self.n_epochs + 1):
self.epoch = epoch
pbar.set_description_str(desc="Epoch {}/{}".format(epoch, self.n_epochs))
# early stopping
self.callback_manager.on_epoch_begin()
for batch_x, batch_y in data_iterator:
self.step += 1
self.ddp_model.train()
_move_dict_value_to_device(batch_x, batch_y, device=self.device)
indices = data_iterator.get_batch_indices()
# negative sampling; replace unknown; re-weight batch_y
self.callback_manager.on_batch_begin(batch_x, batch_y, indices)
with self.auto_cast():
prediction = self._data_forward(self.ddp_model, batch_x)
# edit prediction
self.callback_manager.on_loss_begin(batch_y, prediction)
loss = self._compute_loss(prediction, batch_y)

avg_loss += loss.detach()

# Is loss NaN or inf? requires_grad = False
self.callback_manager.on_backward_begin(loss)
self.grad_scaler.scale(loss).backward()
self.callback_manager.on_backward_end()
if self.step % self.update_every == 0:
if self.step%self.update_every!=0:
no_sync = self.ddp_model.no_sync
else:
no_sync = contextlib.ExitStack
with no_sync():
self.ddp_model.train()
_move_dict_value_to_device(batch_x, batch_y, device=self.device, non_blocking=self.pin_memory)
indices = data_iterator.get_batch_indices()
# negative sampling; replace unknown; re-weight batch_y
self.callback_manager.on_batch_begin(batch_x, batch_y, indices)
with self.auto_cast():
prediction = self._data_forward(self.ddp_model, batch_x)
# edit prediction
self.callback_manager.on_loss_begin(batch_y, prediction)
loss = self._compute_loss(prediction, batch_y)

avg_loss += loss.detach()

# Is loss NaN or inf? requires_grad = False
self.callback_manager.on_backward_begin(loss)
self._grad_backward(loss)
self.callback_manager.on_backward_end()
self._update()
self.callback_manager.on_step_end()
self.callback_manager.on_step_end()

if self.step % self.print_every == 0:
avg_loss = float(avg_loss) / self.print_every
print_output = "loss:{:<6.5f}".format(avg_loss)
pbar.update(self.print_every)
pbar.set_postfix_str(print_output)
avg_loss = 0
if self.step % self.print_every == 0:
avg_loss = float(avg_loss) / self.print_every
print_output = "loss:{:<6.5f}".format(avg_loss)
pbar.update(self.print_every)
pbar.set_postfix_str(print_output)
avg_loss = 0

self.callback_manager.on_batch_end()
self.callback_manager.on_batch_end()

if (self.validate_every > 0 and self.step % self.validate_every == 0) and len(self.test_manager.callbacks):
self._do_validation()
if (self.validate_every > 0 and self.step % self.validate_every == 0) and len(self.test_manager.callbacks):
self._do_validation()

# ================= mini-batch end ==================== #
if self.validate_every < 0 and len(self.test_manager.callbacks):
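
The rewritten loop above enters `ddp_model.no_sync()` on every step except the one that triggers an optimizer update, so gradients are accumulated locally and all-reduced only once per `update_every` mini-batches. A stripped-down sketch of that pattern in plain PyTorch (the DDP model, loader, criterion and optimizer are assumed to exist):

```python
import contextlib

update_every = 4  # accumulate gradients over 4 mini-batches per optimizer step

for step, (batch_x, batch_y) in enumerate(loader, start=1):
    # Suppress the gradient all-reduce except on the update step.
    sync_ctx = contextlib.ExitStack() if step % update_every == 0 else ddp_model.no_sync()
    with sync_ctx:
        loss = criterion(ddp_model(batch_x), batch_y) / update_every
        loss.backward()
    if step % update_every == 0:
        optimizer.step()
        optimizer.zero_grad(set_to_none=True)
```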
@@ -390,7 +402,7 @@ def _train(self):
self.pbar = None
# ============ tqdm end ============== #

def _clear_grad_opt(self, optimizer):
def _clear_grad(self, optimizer):
if self.set_grad_to_none:
for group in optimizer.param_groups:
for p in group['params']:
@@ -399,13 +411,24 @@ def _clear_grad_opt(self, optimizer):
else:
optimizer.zero_grad()

def _grad_backward(self, loss):
r"""Compute gradient with link rules.
:param loss: a scalar where back-prop starts
For PyTorch, just do "loss.backward()"
"""
if (self.step-1) % self.update_every == 0:
self._clear_grad(self.optimizer)
self.grad_scaler.scale(loss).backward()

def _update(self):
r"""Perform weight update on a model.
"""
self.grad_scaler.step(self.optimizer)
self.grad_scaler.update()
self._clear_grad_opt(self.optimizer)
if self.step % self.update_every == 0:
self.grad_scaler.step(self.optimizer)
self.grad_scaler.update()

def _data_forward(self, network, x):
x = _build_args(self._forward_func, **x)
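
`_grad_backward` and `_update` above follow the standard `torch.cuda.amp` recipe: scale the loss before backward, clear gradients only at the start of each accumulation window, and call `scaler.step` / `scaler.update` only on update steps. A minimal standalone sketch with the stock PyTorch AMP API (model, loader, criterion, optimizer and `update_every` assumed defined):

```python
import torch

scaler = torch.cuda.amp.GradScaler()

for step, (x, y) in enumerate(loader, start=1):
    with torch.cuda.amp.autocast():
        loss = criterion(model(x), y)
    # Scale the loss so small fp16 gradients do not underflow during backward.
    scaler.scale(loss).backward()
    if step % update_every == 0:
        scaler.step(optimizer)   # unscales gradients and skips the step on inf/NaN
        scaler.update()
        # Mirrors set_grad_to_none: free gradient memory instead of zero-filling it.
        optimizer.zero_grad(set_to_none=True)
```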
4 changes: 2 additions & 2 deletions fastNLP/core/losses.py
@@ -320,7 +320,7 @@ class BCEWithLogits(LossBase):
:param pred: the mapping for `pred` in the parameter map; None means the mapping is `pred` -> `pred`
:param target: the mapping for `target` in the parameter map; None means the mapping is `target` -> `target`
:param int class_in_dim: in sequence labeling, pred may have shape (batch_size, max_len, num_classes)
or (batch_size, num_classes, max_len); CrossEntropyLoss needs to know which dimension is the class dimension to compute the loss. If set to -1, pred's
or (batch_size, num_classes, max_len); BCEWithLogits needs to know which dimension is the class dimension to compute the loss. If set to -1, pred's
second dimension is compared with target's second dimension to decide whether pred's second and third dimensions should be swapped, because target's second dimension is the length dimension; if pred matches it on that dimension,
then pred's second dimension may also be the length dimension (misjudgment is possible; if it occurs, set this value explicitly). Any other value greater than 0 is treated as the class dimension.
:param str reduction: supports `mean`, `sum` and `none`.
@@ -340,7 +340,7 @@ def get_loss(self, pred, target):
pred = pred.transpose(1, 2)
else:
pred = pred.transpose(-1, self.class_in_dim)
pred = pred.reshape(-1, pred.size(-1))
pred = pred.reshape(-1)
target = target.reshape(-1)

return F.binary_cross_entropy_with_logits(input=pred, target=target, reduction=self.reduction)
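
The switch to `pred.reshape(-1)` matters because `F.binary_cross_entropy_with_logits` expects `pred` and `target` to have the same shape, so both must be flattened to 1-D rather than leaving a trailing class dimension on `pred` alone. A small shape sketch (sizes are illustrative):

```python
import torch
import torch.nn.functional as F

batch_size, max_len, num_classes = 2, 5, 3
pred = torch.randn(batch_size, max_len, num_classes)                     # logits
target = torch.randint(0, 2, (batch_size, max_len, num_classes)).float()

# Flatten both tensors so every logit is paired with exactly one binary target.
loss = F.binary_cross_entropy_with_logits(
    input=pred.reshape(-1), target=target.reshape(-1), reduction="mean"
)
```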

