From 31fd08f89cc6a235532d19ca4c83616114d01327 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Tue, 28 Jan 2025 16:16:15 +0000 Subject: [PATCH] Get poll to return task failure if job/log has been removed. added unit tests for JobRunnerMgr._jobs_poll_status_files test the task_job_mgr end --- changes.d/6577.fix.md | 1 + cylc/flow/job_runner_mgr.py | 6 ++ cylc/flow/task_job_mgr.py | 9 +++ tests/integration/test_task_job_mgr.py | 24 ++++++++ tests/unit/test_job_runner_mgr.py | 77 ++++++++++++++++++++++++++ 5 files changed, 117 insertions(+) create mode 100644 changes.d/6577.fix.md create mode 100644 tests/unit/test_job_runner_mgr.py diff --git a/changes.d/6577.fix.md b/changes.d/6577.fix.md new file mode 100644 index 00000000000..ab3276aa70a --- /dev/null +++ b/changes.d/6577.fix.md @@ -0,0 +1 @@ +Cylc poll will now return a task failure if the job log directory has been deleted whilst the task is active, fixing a bug where premature housekeeping left tasks permanently submitted or running. 
diff --git a/cylc/flow/job_runner_mgr.py b/cylc/flow/job_runner_mgr.py index 558a3d158b2..9e7deccb406 100644 --- a/cylc/flow/job_runner_mgr.py +++ b/cylc/flow/job_runner_mgr.py @@ -439,6 +439,12 @@ def _filter_submit_output(cls, st_file_path, job_runner, out, err): def _jobs_poll_status_files(self, job_log_root, job_log_dir): """Helper 1 for self.jobs_poll(job_log_root, job_log_dirs).""" ctx = JobPollContext(job_log_dir) + # If the log directory has been deleted prematurely, return a task + # failure and an explanation: + if not os.path.exists(os.path.join(job_log_root, ctx.job_log_dir)): + ctx.run_status = 1 + ctx.run_signal = 'ERR/Job files have been removed' + return ctx try: with open( os.path.join(job_log_root, ctx.job_log_dir, JOB_LOG_STATUS) diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 7e1cfa02cc4..784d6956b5b 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -753,6 +753,15 @@ def _manip_task_jobs_callback( or (ctx.ret_code and ctx.ret_code != 255) ): LOG.error(ctx) + # A polling task lets us know that a task has failed because its + # log folder has been deleted whilst the task was active: + if ( + getattr(ctx, 'out', None) + and 'Job files have been removed' in ctx.out + ): + LOG.error( + f'Task {ctx.cmd[-1]} failed because task log directory' + f'\n {"/".join(ctx.cmd[-2:])}\n has been removed.') # A dict for easy reference of (CYCLE, NAME, SUBMIT_NUM) -> TaskProxy # # Note for "reload": A TaskProxy instance may be replaced on reload, so diff --git a/tests/integration/test_task_job_mgr.py b/tests/integration/test_task_job_mgr.py index b1cf1347071..715ccbc068b 100644 --- a/tests/integration/test_task_job_mgr.py +++ b/tests/integration/test_task_job_mgr.py @@ -16,6 +16,7 @@ from contextlib import suppress import logging +from types import SimpleNamespace from typing import Any as Fixture from cylc.flow import CYLC_LOG @@ -233,3 +234,26 @@ async def test_broadcast_platform_change( assert 
schd.pool.get_tasks()[0].platform['name'] == 'foo' # ... and that remote init failed because all hosts bad: assert log_filter(regex=r"platform: foo .*\(no hosts were reachable\)") + + +async def test_poll_job_deleted_log_folder( + one_conf, flow, scheduler, start, caplog +): + """Capture a task error caused by polling finding the job log dir deleted. + + https://github.com/cylc/cylc-flow/issues/6425 + """ + ctx = SimpleNamespace() + ctx.out = 'ERR/Job files have been removed' + ctx.ret_code = None + ctx.cmd = ['foo', 'bar'] + + schd = scheduler(flow(one_conf), run_mode='live', paused_start=False) + async with start(schd): + schd.task_job_mgr._manip_task_jobs_callback(ctx, '', [], '') + + assert ( + 'Task bar failed because task log directory' + '\n foo/bar\n has been removed.' + in caplog.messages + ) diff --git a/tests/unit/test_job_runner_mgr.py b/tests/unit/test_job_runner_mgr.py new file mode 100644 index 00000000000..403964ed3ea --- /dev/null +++ b/tests/unit/test_job_runner_mgr.py @@ -0,0 +1,77 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ +from cylc.flow.job_runner_mgr import JobRunnerManager + +jrm = JobRunnerManager() + + +SAMPLE_STATUS = """ +ignore me, I have no = sign +CYLC_JOB_RUNNER_NAME=pbs +CYLC_JOB_ID=2361713 +CYLC_JOB_RUNNER_SUBMIT_TIME=2025-01-28T14:46:04Z +CYLC_JOB_PID=2361713 +CYLC_JOB_INIT_TIME=2025-01-28T14:46:05Z +CYLC_MESSAGE=2025-01-28T14:46:05Z|INFO|sleep 31 +CYLC_JOB_RUNNER_EXIT_POLLED=2025-01-28T14:46:08Z +CYLC_JOB_EXIT=SUCCEEDED +CYLC_JOB_EXIT_TIME=2025-01-28T14:46:38Z +""" + + +def test__job_poll_status_files(tmp_path): + """Good Path: A valid job.status file exists""" + (tmp_path / 'sub').mkdir() + (tmp_path / 'sub' / 'job.status').write_text(SAMPLE_STATUS) + ctx = jrm._jobs_poll_status_files(str(tmp_path), 'sub') + assert ctx.job_runner_name == 'pbs' + assert ctx.job_id == '2361713' + assert ctx.job_runner_exit_polled == 1 + assert ctx.pid == '2361713' + assert ctx.time_submit_exit == '2025-01-28T14:46:04Z' + assert ctx.time_run == '2025-01-28T14:46:05Z' + assert ctx.time_run_exit == '2025-01-28T14:46:38Z' + assert ctx.run_status == 0 + assert ctx.messages == ['2025-01-28T14:46:05Z|INFO|sleep 31'] + + +def test__job_poll_status_files_task_failed(tmp_path): + """The job.status file records a failed (non-SUCCEEDED) job exit""" + (tmp_path / 'sub').mkdir() + (tmp_path / 'sub' / 'job.status').write_text("CYLC_JOB_EXIT=FOO") + ctx = jrm._jobs_poll_status_files(str(tmp_path), 'sub') + assert ctx.run_status == 1 + assert ctx.run_signal == 'FOO' + + +def test__job_poll_status_files_deleted_logdir(): + """The log dir has been deleted whilst the task is still active. + Return the context with the message that the task has failed. + """ + ctx = jrm._jobs_poll_status_files('foo', 'bar') + assert ctx.run_signal == 'ERR/Job files have been removed' + assert ctx.run_status == 1 + + +def test__job_poll_status_files_ioerror(tmp_path, capsys): + """There is no readable file. 
+ """ + (tmp_path / 'sub').mkdir() + jrm._jobs_poll_status_files(str(tmp_path), 'sub') + cap = capsys.readouterr() + assert '[Errno 2] No such file or directory' in cap.err +