From 974c9ef9a8881509f0e9499af79e0a816130f0e8 Mon Sep 17 00:00:00 2001 From: Sebastian Hoffmann Date: Fri, 27 Dec 2024 11:25:19 +0100 Subject: [PATCH] fix: unit tests --- dmlcloud/core/stage.py | 2 +- test/conftest.py | 7 +- test/test_data.py | 2 +- test/test_metrics.py | 208 ----------------------------------------- test/test_smoke.py | 53 ++++++----- 5 files changed, 38 insertions(+), 234 deletions(-) delete mode 100644 test/test_metrics.py diff --git a/dmlcloud/core/stage.py b/dmlcloud/core/stage.py index 32ef480..22cd41d 100644 --- a/dmlcloud/core/stage.py +++ b/dmlcloud/core/stage.py @@ -84,7 +84,7 @@ def add_callback(self, callback: StageCallback): self.callbacks.append(callback) def log(self, name: str, value: Any, reduction: str = 'mean', prefixed: bool = True): - if prefixed: + if prefixed and self.metric_prefix: name = f'{self.metric_prefix}/{name}' self.tracker.log(name, value, reduction) diff --git a/test/conftest.py b/test/conftest.py index 0242a2a..ad9bb12 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,9 +1,10 @@ import pytest -from dmlcloud.core.distributed import deinitialize_torch_distributed, init_process_group_dummy +import torch +from dmlcloud.core.distributed import init @pytest.fixture def torch_distributed(): - init_process_group_dummy() + init(kind='dummy') yield - deinitialize_torch_distributed() + torch.distributed.destroy_process_group() diff --git a/test/test_data.py b/test/test_data.py index 7ac9aad..f3dff8e 100644 --- a/test/test_data.py +++ b/test/test_data.py @@ -5,7 +5,7 @@ import pytest import torch import xarray as xr -from dmlcloud.util.data import interleave_batches, shard_indices, sharded_xr_dataset, ShardedXrDataset +from dmlcloud.data import interleave_batches, shard_indices, sharded_xr_dataset, ShardedXrDataset from numpy.testing import assert_array_equal from torch.utils.data import DataLoader, IterableDataset diff --git a/test/test_metrics.py b/test/test_metrics.py deleted file mode 100644 index 4c938bf..0000000 --- a/test/test_metrics.py +++ /dev/null @@ -1,208 +0,0 @@ -import sys - -import pytest -import torch -from dmlcloud.core.metrics import MetricReducer, MetricTracker, Reduction - - -class TestMetricReducer: - def test_local_reduction(self): - reducer = MetricReducer(reduction=Reduction.MIN, globally=False) - reducer.append(torch.tensor([1, 2, 3], dtype=torch.float)) - reducer.append(torch.tensor([-1, -2, -3], dtype=torch.float)) - reducer.append(torch.tensor([1, 7, 10], dtype=torch.float)) - - assert reducer.reduce_locally().item() == -3 - assert reducer.reduce_globally().item() == -3 - - reducer.reduction = Reduction.MAX - assert reducer.reduce_locally().item() == 10 - assert reducer.reduce_globally().item() == 10 - - reducer.reduction = Reduction.SUM - assert reducer.reduce_locally().item() == 18 - assert reducer.reduce_globally().item() == 18 - - reducer.reduction = Reduction.MEAN - assert reducer.reduce_locally().item() == 2 - assert reducer.reduce_globally().item() == 2 - - def test_global_reduction(self, torch_distributed): - reducer = MetricReducer(reduction=Reduction.MIN, globally=True) - reducer.append(torch.tensor([1, 2, 3], dtype=torch.float)) - reducer.append(torch.tensor([-1, -2, -3], dtype=torch.float)) - reducer.append(torch.tensor([1, 7, 10], dtype=torch.float)) - - assert reducer.reduce_locally().item() == -3 - assert reducer.reduce_globally().item() == -3 - - reducer.reduction = Reduction.MAX - assert reducer.reduce_locally().item() == 10 - assert reducer.reduce_globally().item() == 10 - - reducer.reduction = Reduction.SUM - assert reducer.reduce_locally().item() == 18 - assert reducer.reduce_globally().item() == 18 - - reducer.reduction = Reduction.MEAN - assert reducer.reduce_locally().item() == 2 - assert reducer.reduce_globally().item() == 2 - - def test_partial_reduction(self): - tensor = torch.tensor([[[1, 2, 3], [4, 5, 6]], [[1, 2, 3], [4, 5, 6]]], dtype=torch.float) # shape: 2x2x3 - print(tensor.shape) - - reducer = MetricReducer(reduction=Reduction.MIN, globally=False, dim=[1, 2]) - reducer.append(tensor) - result = reducer.reduce_locally() - assert result.shape == (2,) - assert result[0].item() == 1 - assert result[1].item() == 1 - - reducer = MetricReducer(reduction=Reduction.SUM, globally=False, dim=2) - reducer.append(tensor) - result = reducer.reduce_locally() - assert result.shape == (2, 2) - assert result[0, 0].item() == 6 - assert result[0, 1].item() == 15 - assert result[1, 0].item() == 6 - assert result[1, 1].item() == 15 - - def test_serialization(self): - reducer = MetricReducer(reduction=Reduction.MIN, dim=(1, 2, 3)) - reducer.append(torch.tensor([1, 2, 3])) - state_dict = reducer.state_dict() - - new_reducer = MetricReducer() - new_reducer.load_state_dict(state_dict) - assert new_reducer.reduction == Reduction.MIN - assert new_reducer.dim == [1, 2, 3] - assert new_reducer.values == reducer.values - - def test_empty_reduction(self, torch_distributed): - reducer = MetricReducer(reduction=Reduction.MIN, globally=True) - result = reducer.reduce_locally() - assert result is None - - result = reducer.reduce_globally() - assert result is None - - -class TestMetricTracker: - def test_dictionary(self): - tracker = MetricTracker() - assert len(tracker) == 0 - - tracker.register_metric('A') - tracker.register_metric('B', reduction=Reduction.MEAN, globally=False) - assert len(tracker) == 2 - - assert 'A' in tracker - assert 'B' in tracker - assert 'C' not in tracker - - assert isinstance(tracker['A'], list) - assert len(tracker['A']) == 0 - - def test_is_reduced_metric(self): - tracker = MetricTracker() - tracker.register_metric('A') - tracker.register_metric('B', reduction=Reduction.MEAN, globally=False) - - assert not tracker.is_reduced_metric('A') - assert tracker.is_reduced_metric('B') - - def test_epoch_filling(self): - tracker = MetricTracker() - tracker.register_metric('A') - - tracker.next_epoch() - assert len(tracker['A']) == 1 and tracker['A'][0] is None - assert tracker.epoch == 2 - - tracker.next_epoch() - assert len(tracker['A']) == 2 and tracker['A'][1] is None - assert tracker.epoch == 3 - - tracker.register_metric('B', reduction=Reduction.MEAN, globally=False) - assert len(tracker['B']) == 2 and tracker['B'][1] is None - - def test_track(self): - tracker = MetricTracker() - tracker.register_metric('A') - - tracker.track('A', 1) - with pytest.raises(ValueError): # haven't progressed the epoch yet - tracker.track('A', 42) - tracker.next_epoch() - - tracker.track('A', 42) - - tracker.register_metric('B', reduction=Reduction.MEAN, globally=False) - tracker.track('B', 2.0) - tracker.track('B', 4.0) - tracker.track('B', 1.0) - tracker.track('B', 1.0) - - tracker.next_epoch() - assert tracker['A'] == [1, 42] - assert tracker['B'] == [None, torch.tensor(2.0)] - - def test_str(self): - tracker = MetricTracker() - tracker.register_metric('A') - tracker.register_metric('B', reduction=Reduction.MEAN, globally=False) - tracker.track('A', 1) - print(str(tracker)) - - def test_manual_reduction(self): - tracker = MetricTracker() - tracker.register_metric('A') - tracker.register_metric('B', reduction=Reduction.SUM, globally=False) - tracker.track('B', 1.0) - tracker.track('B', 2.0) - tracker.track('B', 3.0) - tracker.reduce_all(prefix='B') - - assert tracker.has_value('B') - assert not tracker.has_value('A') - assert tracker.current_value('B').item() == 6.0 - assert tracker.current_value('A') is None - assert tracker['B'] == [] - - with pytest.raises(ValueError): - tracker.reduce_all(prefix='B') - - # does not throw, nor modify value - tracker.reduce_all(prefix='B', strict=False) - assert tracker.current_value('B').item() == 6.0 - assert tracker['B'] == [] - - # advances epoch - tracker.next_epoch() - assert tracker['B'] == [torch.tensor(6.0)] - assert tracker['A'] == [None] - assert tracker.current_value('B') is None - - def test_serialization(self): - tracker1 = MetricTracker() - tracker1.register_metric('A') - tracker1.register_metric('B', reduction=Reduction.MEAN, globally=False) - - tracker1.track('A', 1) - tracker1.track('B', torch.randn(3, 2)) - tracker1.next_epoch() - tracker1.track('A', 2) - tracker1.track('B', torch.randn(3, 2)) - - state_dict = tracker1.state_dict() - tracker2 = MetricTracker() - tracker2.load_state_dict(state_dict) - assert tracker2.epoch == tracker1.epoch - assert 'A' in tracker2 and 'B' in tracker2 - assert tracker2['A'] == tracker1['A'] - assert tracker2['B'] == tracker1['B'] - - -if __name__ == '__main__': - sys.exit(pytest.main([__file__])) diff --git a/test/test_smoke.py b/test/test_smoke.py index c58a373..1111e2c 100644 --- a/test/test_smoke.py +++ b/test/test_smoke.py @@ -1,45 +1,56 @@ import sys +import dmlcloud as dml import pytest import torch -from dmlcloud.core.pipeline import TrainingPipeline -from dmlcloud.core.stage import TrainValStage class DummyDataset(torch.utils.data.Dataset): def __len__(self): - return 8 + return 256 def __getitem__(self, idx): - return torch.randn(10), torch.randint(0, 10, size=(1,)).item() + x = torch.randn(10) + y = x.sum() * 0.1 + return x, y -class DummyStage(TrainValStage): +class DummyStage(dml.Stage): def pre_stage(self): - self.model = torch.nn.Linear(10, 10) - self.pipeline.register_model('linear', self.model) + self.train_dl = torch.utils.data.DataLoader(DummyDataset(), batch_size=32) - self.optimizer = torch.optim.SGD(self.model.parameters(), lr=1e-3) - self.pipeline.register_optimizer('sgd', self.optimizer) + model = torch.nn.Sequential( + torch.nn.Linear(10, 32), + torch.nn.Linear(32, 1), + ) + self.model = dml.wrap_ddp(model, self.device) + self.optim = torch.optim.Adam(self.model.parameters(), lr=dml.scale_lr(1e-2)) + self.loss = torch.nn.L1Loss() - self.pipeline.register_dataset('train', torch.utils.data.DataLoader(DummyDataset(), batch_size=4)) - self.pipeline.register_dataset('val', torch.utils.data.DataLoader(DummyDataset(), batch_size=4)) + def run_epoch(self): + for x, y in self.train_dl: + self.optim.zero_grad() - self.loss = torch.nn.CrossEntropyLoss() + x, y = x.to(self.device), y.to(self.device) + output = self.model(x) + loss = self.loss(output[:, 0], y) + loss.backward() - def step(self, batch): - x, y = batch - x, y = x.to(self.device), y.to(self.device) - output = self.model(x) - loss = self.loss(output, y) - return loss + self.optim.step() + + self.log('train/loss', loss) class TestSmoke: def test_smoke(self, torch_distributed): - pipeline = TrainingPipeline() - pipeline.append_stage(DummyStage(), max_epochs=1) - pipeline.run() + pipe = dml.Pipeline() + stage = DummyStage(epochs=3) + pipe.append(stage) + pipe.run() + + assert stage.current_epoch == 3 + assert 'train/loss' in stage.history + assert stage.history.last()['train/loss'] < 0.1 if __name__ == '__main__':