From 0117c381e6a893f0b2de47534f74da4bf43f0d0e Mon Sep 17 00:00:00 2001
From: Sebastian Hoffmann
Date: Fri, 3 Jan 2025 15:08:23 +0100
Subject: [PATCH] fix: cleanup in reverse order (RAII-like)

---
 dmlcloud/core/pipeline.py | 22 ++++++++++------------
 1 file changed, 10 insertions(+), 12 deletions(-)

diff --git a/dmlcloud/core/pipeline.py b/dmlcloud/core/pipeline.py
index cf0011b..e2ee322 100644
--- a/dmlcloud/core/pipeline.py
+++ b/dmlcloud/core/pipeline.py
@@ -68,7 +68,7 @@ def __exit__(self, exc_type, exc_value, traceback):
         callbacks += self.pipe.current_stage.callbacks
         callbacks += self.pipe.callbacks
 
-        for callback in callbacks:
+        for callback in reversed(callbacks):
             callback.cleanup(self.pipe, exc_type, exc_value, traceback)
 
         return suppress_exception
@@ -104,8 +104,6 @@ def __init__(self, config: Optional[Union[OmegaConf, Dict]] = None, name: Option
         self.name = name
 
         self.checkpoint_dir = None
-        self.gloo_group = None
-        self.io_redirector = None
         self.resumed = None
         self.start_time = None
         self.stop_time = None
@@ -116,6 +114,11 @@ def __init__(self, config: Optional[Union[OmegaConf, Dict]] = None, name: Option
         self.stages = []
         self.callbacks = []
 
+        if dist.is_gloo_available():
+            self.gloo_group = dist.new_group(backend='gloo')
+        else:
+            warnings.warn('Gloo backend not available. Barriers will not use custom timeouts.')
+
     @property
     def checkpointing_enabled(self):
         return self.checkpoint_dir is not None
@@ -197,16 +200,15 @@ def run(self):
         if len(self.stages) == 0:
             raise ValueError('No stages defined. Use append() to add stages to the pipeline.')
 
-        if dist.is_gloo_available():
-            self.gloo_group = dist.new_group(backend='gloo')
-        else:
-            warnings.warn('Gloo backend not available. Barriers will not use custom timeouts.')
-
         for stage in self.stages:
             stage.add_callback(_ForwardCallback())  # forward callbacks to pipeline callbacks
 
         self.add_callback(DiagnosticsCallback())
 
+        # make sure everything is set up before starting the run
+        # important to prevent checkpoint dir creation before all processes searched for it
+        self.barrier(timeout=10 * 60)
+
         with _RunGuard(self):
             self._pre_run()
             for stage in self.stages:
@@ -238,10 +240,6 @@ def device(self):
         return torch.device('cpu')
 
     def _pre_run(self):
-        # make sure everything is set up before starting the run
-        # important to prevent checkpoint dir creation before all processes searched for it
-        self.barrier(timeout=10 * 60)
-
         self.start_time = datetime.now()
 
         header = '\n' + experiment_header(self.name, self.checkpoint_dir, self.start_time)
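
The "RAII-like" note in the subject refers to C++ destruction semantics: resources acquired first are released last, so tearing callbacks down in reverse registration order lets later callbacks safely depend on earlier ones during cleanup. A minimal standalone sketch of that invariant, using an illustrative Callback stand-in (the names below are hypothetical, not the actual dmlcloud classes):

    class Callback:
        """Illustrative stand-in for a pipeline callback with paired setup/cleanup."""

        def __init__(self, name):
            self.name = name

        def setup(self):
            print(f'setup   {self.name}')

        def cleanup(self, exc_type, exc_value, traceback):
            print(f'cleanup {self.name}')


    callbacks = [Callback('io_redirect'), Callback('checkpointing'), Callback('diagnostics')]

    for cb in callbacks:            # set up in registration order
        cb.setup()

    for cb in reversed(callbacks):  # torn down in reverse, like C++ destructors
        cb.cleanup(None, None, None)

    # Output:
    # setup   io_redirect
    # setup   checkpointing
    # setup   diagnostics
    # cleanup diagnostics
    # cleanup checkpointing
    # cleanup io_redirect

The standard library encodes the same LIFO guarantee in contextlib.ExitStack, which unwinds its registered context managers in reverse order of entry.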