Skip to content

Commit

Permalink
Fix utility tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Blanca-Fuentes committed Jan 14, 2025
1 parent ec10732 commit 9f020c5
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 34 deletions.
1 change: 0 additions & 1 deletion reframe/core/schedulers/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ async def _poll_job(self, job):
job._state = 'SUCCESS'
else:
job._state = 'FAILURE'
# print("finished", job)

exec_proc = job.steps['exec']
if exec_proc.started():
Expand Down
14 changes: 10 additions & 4 deletions reframe/utility/osext.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def kill(self, signum):
except (ProcessLookupError, PermissionError,
psutil.NoSuchProcess):
self.log(f'child pid {child.pid} already dead')
self._completed = True

def terminate(self):
'''Terminate the spawned process by sending ``SIGTERM``.'''
Expand Down Expand Up @@ -239,9 +240,10 @@ async def exception(self):
if self._proc.returncode == 0:
return

stdout, stderr = await self._proc.communicate()
return SpawnedProcessError(self._proc.args,
self._proc.stdout.read(),
self._proc.stderr.read(),
stdout.decode(),
stderr.decode(),
self._proc.returncode)

async def communicate(self):
Expand Down Expand Up @@ -359,18 +361,22 @@ async def run_command_asyncio_alone(cmd,

if shell:
# Call create_subprocess_shell
return await asyncio.create_subprocess_shell(
process = await asyncio.create_subprocess_shell(
cmd, stdout=stdout,
stderr=stderr,
**kwargs
)
process.args = cmd
else:
# Call create_subprocess_exec
return await asyncio.create_subprocess_exec(
process = await asyncio.create_subprocess_exec(
cmd, stdout=stdout,
stderr=stderr,
**kwargs
)
process.args = cmd

return process


async def run_command_asyncio(cmd,
Expand Down
61 changes: 32 additions & 29 deletions unittests/test_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def test_command_futures():

# Check that some operations cannot be performed on an unstarted future
with pytest.raises(osext.UnstartedProcError):
proc.done()
test_util.asyncio_run(proc.done)

with pytest.raises(osext.UnstartedProcError):
proc.cancel()
Expand All @@ -99,28 +99,29 @@ def test_command_futures():
proc.terminate()

with pytest.raises(osext.UnstartedProcError):
proc.wait()
test_util.asyncio_run(proc.wait)

assert not proc.started()
proc.start()
test_util.asyncio_run(proc.start)
assert proc.started()
assert proc.pid is not None

# By default a process is not started as a new session
assert not proc.is_session()

# stdout must block
assert proc.stdout().read() == 'hello\n'
stdout, stderr = test_util.asyncio_run(proc.communicate)
assert stdout.decode() == 'hello\n'
assert proc.exitcode == 0
assert proc.signal is None

# Additional wait() should have no effect
proc.wait()
proc.wait()
test_util.asyncio_run(proc.wait)
test_util.asyncio_run(proc.wait)

assert proc.done()
assert test_util.asyncio_run(proc.done)
assert not proc.cancelled()
assert proc.exception() is None
assert test_util.asyncio_run(proc.exception) is None


def test_command_futures_callbacks():
Expand All @@ -135,13 +136,13 @@ def _callback(_):
with pytest.raises(ValueError):
proc.add_done_callback(lambda: 1)

proc.start()
while not proc.done():
test_util.asyncio_run(proc.start)
while not test_util.asyncio_run(proc.done):
pass

# Call explicitly more times
proc.done()
proc.done()
test_util.asyncio_run(proc.done)
test_util.asyncio_run(proc.done)
assert num_called == 1


Expand All @@ -152,13 +153,13 @@ def _checked_cmd(request):

def test_command_futures_error(_checked_cmd):
proc = osext.run_command_async2("false", shell=True, check=_checked_cmd)
proc.start()
test_util.asyncio_run(proc.start)

# exception() blocks until the process is finished
if _checked_cmd:
assert isinstance(proc.exception(), SpawnedProcessError)
assert isinstance(test_util.asyncio_run(proc.exception), SpawnedProcessError)
else:
assert proc.exception() is None
assert test_util.asyncio_run(proc.exception) is None

assert proc.exitcode == 1
assert proc.signal is None
Expand All @@ -178,16 +179,16 @@ def _signal(request):

def test_command_futures_signal(_checked_cmd, _signal):
proc = osext.run_command_async2('sleep 3', shell=True, check=_checked_cmd)
proc.start()
test_util.asyncio_run(proc.start)
if _signal == signal.SIGTERM:
proc.terminate()
elif _signal == signal.SIGKILL:
proc.cancel()
else:
proc.kill(_signal)

proc.wait()
assert proc.done()
test_util.asyncio_run(proc.wait)
assert test_util.asyncio_run(proc.done)
if _signal == signal.SIGKILL:
assert proc.cancelled()
else:
Expand All @@ -196,9 +197,10 @@ def test_command_futures_signal(_checked_cmd, _signal):
assert proc.signal == _signal
assert proc.exitcode is None
if _checked_cmd:
assert isinstance(proc.exception(), SpawnedProcessError)
assert isinstance(test_util.asyncio_run(proc.exception),
SpawnedProcessError)
else:
assert proc.exception() is None
assert test_util.asyncio_run(proc.exception) is None


def test_command_futures_chain(tmp_path):
Expand All @@ -211,13 +213,14 @@ def test_command_futures_chain(tmp_path):
proc0.then(proc2).then(proc3)
all_procs = [proc0, proc1, proc2, proc3]
t_start = time.time()
proc0.start()
while not all(p.done() for p in all_procs if p.started()):
test_util.asyncio_run(proc0.start)
while not all(test_util.asyncio_run(p.done) for p in all_procs
if p.started()):
pass

t_elapsed = time.time() - t_start
assert t_elapsed < 2
assert all(p.done() for p in all_procs)
assert all(test_util.asyncio_run(p.done) for p in all_procs)

with open(tmp_path / 'stdout.txt') as fp:
assert fp.read() == 'hello\nworld\n'
Expand All @@ -244,13 +247,13 @@ def cond(proc):
with pytest.raises(ValueError):
proc0.then(proc1, when=lambda: False)

proc0.start()
proc0.wait()
proc1.wait()
test_util.asyncio_run(proc0.start)
test_util.asyncio_run(proc0.wait)
test_util.asyncio_run(proc1.wait)
if _chain_policy == 'fail_on_error':
assert not proc2.started()
else:
proc2.wait()
test_util.asyncio_run(proc2.wait)

with open(tmp_path / 'stdout.txt') as fp:
if _chain_policy == 'fail_on_error':
Expand All @@ -264,8 +267,8 @@ def test_command_futures_chain_cancel():
proc1 = osext.run_command_async2('sleep 1', shell=True)
proc2 = osext.run_command_async2('echo world', shell=True)
proc0.then(proc1).then(proc2)
proc0.start()
while not proc0.done():
test_util.asyncio_run(proc0.start)
while not test_util.asyncio_run(proc0.done):
pass

assert proc1.started()
Expand Down

0 comments on commit 9f020c5

Please sign in to comment.