fix: cleanup in reverse order (RAII-like)
sehoffmann committed Jan 3, 2025
1 parent f9823dd commit 0117c38
Showing 1 changed file with 10 additions and 12 deletions.
22 changes: 10 additions & 12 deletions dmlcloud/core/pipeline.py
@@ -68,7 +68,7 @@ def __exit__(self, exc_type, exc_value, traceback):
         callbacks += self.pipe.current_stage.callbacks
         callbacks += self.pipe.callbacks
 
-        for callback in callbacks:
+        for callback in reversed(callbacks):
             callback.cleanup(self.pipe, exc_type, exc_value, traceback)
 
         return suppress_exception
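
The switch to reversed() applies the usual RAII/LIFO rule: whatever was set up last is torn down first, so each callback can still rely on everything registered before it during its own cleanup. A minimal sketch of the same guarantee using only the standard library (illustrative names, not dmlcloud code):

    # Cleanup runs in reverse registration order, like contextlib.ExitStack.
    from contextlib import ExitStack

    def acquire(name, log):
        log.append(f'setup {name}')
        return lambda: log.append(f'cleanup {name}')

    log = []
    with ExitStack() as stack:
        for name in ('checkpoint_dir', 'io_redirector', 'gloo_group'):
            stack.callback(acquire(name, log))

    # cleanup order is gloo_group, io_redirector, checkpoint_dir (LIFO)
    print(log)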
@@ -104,8 +104,6 @@ def __init__(self, config: Optional[Union[OmegaConf, Dict]] = None, name: Option
         self.name = name
 
         self.checkpoint_dir = None
-        self.gloo_group = None
-        self.io_redirector = None
         self.resumed = None
         self.start_time = None
         self.stop_time = None
@@ -116,6 +114,11 @@ def __init__(self, config: Optional[Union[OmegaConf, Dict]] = None, name: Option
         self.stages = []
         self.callbacks = []
 
+        if dist.is_gloo_available():
+            self.gloo_group = dist.new_group(backend='gloo')
+        else:
+            warnings.warn('Gloo backend not available. Barriers will not use custom timeouts.')
+
     @property
     def checkpointing_enabled(self):
         return self.checkpoint_dir is not None
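
Creating the gloo group in __init__ (instead of lazily in run()) makes it available to any barrier the pipeline issues, and the warning spells out the trade-off: without gloo, per-call barrier timeouts are not possible. A rough sketch of how such a group can back a timed barrier; the helper name is hypothetical and this is an assumption about the mechanism, not the dmlcloud implementation:

    # Assumes torch.distributed is already initialized.
    from datetime import timedelta
    import torch.distributed as dist

    def barrier_with_timeout(gloo_group, seconds):
        if gloo_group is not None:
            # monitored_barrier is gloo-only and accepts a per-call timeout,
            # reporting which ranks failed to reach the barrier in time.
            dist.monitored_barrier(group=gloo_group, timeout=timedelta(seconds=seconds))
        else:
            # Fallback: the default barrier honors only the process group's
            # global timeout, not a per-call one.
            dist.barrier()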
@@ -197,16 +200,15 @@ def run(self):
         if len(self.stages) == 0:
             raise ValueError('No stages defined. Use append() to add stages to the pipeline.')
 
-        if dist.is_gloo_available():
-            self.gloo_group = dist.new_group(backend='gloo')
-        else:
-            warnings.warn('Gloo backend not available. Barriers will not use custom timeouts.')
-
         for stage in self.stages:
             stage.add_callback(_ForwardCallback())  # forward callbacks to pipeline callbacks
 
         self.add_callback(DiagnosticsCallback())
 
+        # make sure everything is set up before starting the run
+        # important to prevent checkpoint dir creation before all processes searched for it
+        self.barrier(timeout=10 * 60)
+
         with _RunGuard(self):
             self._pre_run()
             for stage in self.stages:
@@ -238,10 +240,6 @@ def device(self):
             return torch.device('cpu')
 
     def _pre_run(self):
-        # make sure everything is set up before starting the run
-        # important to prevent checkpoint dir creation before all processes searched for it
-        self.barrier(timeout=10 * 60)
-
         self.start_time = datetime.now()
 
         header = '\n' + experiment_header(self.name, self.checkpoint_dir, self.start_time)
