Skip to content

Commit

Permalink
Improve exception handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Blanca-Fuentes committed Jan 17, 2025
1 parent 914ddf1 commit 460ad1d
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 45 deletions.
4 changes: 2 additions & 2 deletions reframe/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def is_severe(exc_type, exc_value, tb):
def is_warning(exc_type, exc_value, tb):
    '''Check whether this exception can be treated as warning.

    Only ``exc_value`` is inspected; ``exc_type`` and ``tb`` are accepted
    so that the signature matches the ``(type, value, traceback)`` triple
    used by the neighbouring helpers.

    :arg exc_type: The exception type (unused).
    :arg exc_value: The exception instance to classify.
    :arg tb: The traceback (unused).
    :returns: :obj:`True` if ``exc_value`` is a session timeout or a
        user-initiated cancellation, :obj:`False` otherwise.
    '''

    # A single return: a second `return` after the first one would be
    # unreachable, and `KeyboardError` would never be treated as a warning.
    return isinstance(
        exc_value, (RunSessionTimeout, KeyboardInterrupt, KeyboardError)
    )


def what(exc_type, exc_value, tb):
Expand All @@ -364,7 +364,7 @@ def what(exc_type, exc_value, tb):
reason = utility.decamelize(exc_type.__name__, ' ')

# We need frame information for user type and value errors
if isinstance(exc_value, KeyboardInterrupt):
if isinstance(exc_value, KeyboardInterrupt) or isinstance(exc_value, KeyboardError):
reason = 'cancelled by user'
elif isinstance(exc_value, AbortTaskError):
reason = f'aborted due to {type(exc_value.__cause__).__name__}'
Expand Down
7 changes: 5 additions & 2 deletions reframe/frontend/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ def __exit__(this, exc_type, exc_value, traceback):
self.skip()
raise TaskExit from e
except ABORT_REASONS:
self.fail()
# self.fail()
raise
except BaseException as e:
self.fail()
Expand Down Expand Up @@ -431,7 +431,7 @@ def __exit__(this, exc_type, exc_value, traceback):
self.skip()
raise TaskExit from e
except ABORT_REASONS:
self.fail()
# self.fail()
raise
except BaseException as e:
self.fail()
Expand Down Expand Up @@ -557,6 +557,9 @@ def abort(self, cause=None):
try:
if not self.zombie and self.check.job:
self.check.job.cancel()
# The abort can also happen during a compile job
if self.check.build_job:
self.check.build_job.cancel()
except JobNotStartedError:
self.fail((type(exc), exc, None), 'on_task_abort')
except BaseException:
Expand Down
134 changes: 93 additions & 41 deletions reframe/frontend/executors/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,14 @@ def __init__(self):
self._retired_tasks = []
self.task_listeners.append(self)

async def _runcase(self, case):
async def _runcase(self, case, task):
check, partition, _ = case
task = RegressionTask(case, self.task_listeners)
if check.is_dry_run():
self.printer.status('DRY', task.info())
else:
self.printer.status('RUN', task.info())

self._task_index[case] = task
self.stats.add_task(task)
try:
# Do not run test if any of its dependencies has failed
# NOTE: Restored dependencies are not in the task_index
Expand Down Expand Up @@ -307,14 +305,25 @@ def execute(self, testcases):

for case in testcases:
try:
loop.run_until_complete(self._runcase(case))
task = RegressionTask(case, self.task_listeners)
self.stats.add_task(task)
loop.run_until_complete(self._runcase(case, task))
except (Exception, KeyboardInterrupt) as e:
if type(e) in ABORT_REASONS:
# A KeyboardInterrupt raised while inside asyncio.sleep() ends up here
if not task.aborted:
# Make sure that the task is aborted in that case
task.abort(e)
for task in all_tasks(loop):
if isinstance(task, asyncio.tasks.Task):
task.cancel()
loop.close()
raise e
# In case we still receive the KeyboardInterrupt because it happened inside
# asyncio.sleep()
if isinstance(e, KeyboardInterrupt):
raise KeyboardError
else:
raise e
else:
getlogger().info(f"Execution stopped due to an error: {e}")
break
Expand Down Expand Up @@ -353,6 +362,42 @@ def __init__(self):
self._retired_tasks = []
self.task_listeners.append(self)

def _init_pipeline_progress(self, num_tasks):
    '''Reset the bookkeeping used to track tasks per pipeline state.

    Every state gets a history list of ``(count, timestamp)`` tuples.
    All ``num_tasks`` tasks start in the ``'startup'`` state at
    timestamp 0; every other state starts empty.

    :arg num_tasks: The number of tasks initially placed in ``'startup'``.
    '''
    state_names = (
        'startup',
        'ready_compile',
        'compiling',
        'ready_run',
        'running',
        'completing',
        'retired',
        'completed',
        'fail',
        'skip',
    )

    # Each state needs its own list, since the histories grow independently
    progress = {name: [(0, 0)] for name in state_names}
    progress['startup'] = [(num_tasks, 0)]

    self._pipeline_progress = progress
    self._pipeline_step = 0

    # Reference point for the relative timestamps of later updates
    self._t_pipeline_start = time.time()

def _update_pipeline_progress(self, old_state, new_state, num_tasks=1):
    '''Record a new progress step after tasks change pipeline state.

    A ``(count, timestamp)`` entry is appended to the history of *every*
    state, so all histories stay the same length. The counts are carried
    over from the current step, except that ``num_tasks`` is moved from
    ``old_state`` to ``new_state`` when the two differ.

    :arg old_state: The state the tasks are leaving.
    :arg new_state: The state the tasks are entering.
    :arg num_tasks: How many tasks make the transition.
    '''
    elapsed = time.time() - self._t_pipeline_start
    step = self._pipeline_step
    is_transition = old_state != new_state

    for name, history in self._pipeline_progress.items():
        ntasks = history[step][0]
        if is_transition and name == old_state:
            ntasks -= num_tasks
        elif is_transition and name == new_state:
            ntasks += num_tasks

        history.append((ntasks, elapsed))

    self._pipeline_step = step + 1

def _dump_pipeline_progress(self, filename):
    '''Write the recorded pipeline progress to a file.

    The ``_pipeline_progress`` mapping is serialised with
    ``reframe.utility.jsonext.dump`` using an indentation of 2.

    :arg filename: Path of the file to (over)write.
    '''
    # Imported locally, as in the rest of this method's history
    import reframe.utility.jsonext as jsonext

    progress = self._pipeline_progress
    with open(filename, 'w') as outfile:
        jsonext.dump(progress, outfile, indent=2)

async def _runcase(self, case, task):
# I added the task here as an argument because,
# I wanted to initialize it
Expand Down Expand Up @@ -424,6 +469,7 @@ async def _runcase(self, case, task):
await asyncio.sleep(2)
self._partition_tasks[partname].add(task)
await task.compile()
# If RunOnly, no polling for run jobs
if task.check.build_job:
# Pick the right scheduler
if task.check.build_locally:
Expand All @@ -444,6 +490,11 @@ async def _runcase(self, case, task):
await self._pollctl.snooze()
if task.compile_complete():
break
# We need to check the timeout inside the while loop
if self.timeout_expired():
raise RunSessionTimeout(
'maximum session duration exceeded'
)
await task.compile_wait()
self._partition_tasks[partname].remove(task)
partname = _get_partition_name(task, phase='run')
Expand All @@ -452,27 +503,31 @@ async def _runcase(self, case, task):
await asyncio.sleep(2)
self._partition_tasks[partname].add(task)
await task.run()
# If CompileOnly, no polling for run jobs
if task.check.job:
# Pick the right scheduler
if task.check.local:
sched = self.local_scheduler
else:
sched = partition.scheduler

# Pick the right scheduler
if task.check.local:
sched = self.local_scheduler
else:
sched = partition.scheduler

while True:
if not self.dry_run_mode:
if getpollcontroller().is_time_to_poll():
getpollcontroller().reset_time_to_poll()
await sched.poll(*getpollcontroller()._jobs_pool[
sched.registered_name
])

if task.run_complete():
break
await self._pollctl.snooze()
if task.run_complete():
break
while True:
if not self.dry_run_mode:
if getpollcontroller().is_time_to_poll():
getpollcontroller().reset_time_to_poll()
await sched.poll(*getpollcontroller()._jobs_pool[
sched.registered_name
])

if task.run_complete():
break
await self._pollctl.snooze()
if task.run_complete():
break
if self.timeout_expired():
raise RunSessionTimeout(
'maximum session duration exceeded'
)
await task.run_wait()
self._partition_tasks[partname].remove(task)
if not self.skip_sanity_check:
Expand Down Expand Up @@ -555,6 +610,16 @@ def _abortall(self, cause):
for task in self._current_tasks:
with contextlib.suppress(FailureLimitError):
task.abort(cause)
# Cancel the asyncio tasks in the event loop as well; otherwise they keep
# switching control (this is done after cancelling all the jobs spawned by each task).
# WARNING: sometimes the jobs are not killed correctly.
for task in all_tasks(asyncio.get_event_loop()):
if isinstance(task, asyncio.tasks.Task):
try:
task.cancel()
except RuntimeError:
pass


def on_task_setup(self, task):
pass
Expand Down Expand Up @@ -676,20 +741,18 @@ def execute(self, testcases):
all_cases = []
for t in testcases:
task = RegressionTask(t, self.task_listeners)

self.stats.add_task(task)
# Add the tasks outside the asyncio handling so that all tasks are aborted;
# otherwise the task in the TestStats is not updated accordingly
self._current_tasks.add(task)
all_cases.append(asyncio.ensure_future(self._runcase(t, task)))
try:
# Wait for tasks until the first failure
loop.run_until_complete(self._execute_until_failure(all_cases))
loop.run_until_complete(asyncio.gather(*all_cases, return_exceptions=False))
except (Exception, KeyboardInterrupt) as e:
if type(e) in ABORT_REASONS:
# Try to cancel them again in case they were not cancelled properly
loop.run_until_complete(_cancel_gracefully(all_cases))
try:
raise AbortTaskError
except AbortTaskError as exc:
self._abortall(exc)
loop.close()
raise e
else:
Expand All @@ -699,17 +762,6 @@ def execute(self, testcases):
loop.close()
self._exit()

async def _execute_until_failure(self, all_cases):
    '''Await the given tasks, bailing out on the first one that failed.

    The tasks are awaited in rounds with ``asyncio.wait()`` using
    ``FIRST_COMPLETED``. After each round, the exception of the first
    finished task found to have one is re-raised; the tasks still
    pending are left untouched.

    :arg all_cases: The awaitable tasks/futures to wait for.
    :raises: Whatever exception a finished task ended with.
    '''
    pending = all_cases
    while pending:
        finished, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for fut in finished:
            error = fut.exception()
            if error:
                # Exit if aborted
                raise error


async def _cancel_gracefully(all_cases):
for case in all_cases:
Expand Down

0 comments on commit 460ad1d

Please sign in to comment.