Skip to content

Commit

Permalink
Setonix changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Ben Schroeter committed Jan 9, 2025
1 parent 8f69bfb commit 1a33e73
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 34 deletions.
50 changes: 37 additions & 13 deletions hpcpy/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,25 @@
class BaseClient:
"""A base class from which all others inherit."""

def __init__(self, cmd_templates, statuses, status_attribute, job_script_expiry="1H"):
    """Constructor.

    Parameters
    ----------
    cmd_templates : dict
        Dictionary of command templates. The methods of this class look up
        the 'submit', 'status' and 'delete' entries and fill them in with
        ``str.format``.
    statuses : list
        List of statuses.
    status_attribute : str
        Attribute to use for status lookup.
    job_script_expiry : str, optional
        Job script expiry interval, by default "1H"
    """

    # Set the command templates
    self.cmd_templates = cmd_templates
    self.job_script_expiry = job_script_expiry

    # Keep the status table together with the name of the attribute that
    # _lookup_status() compares against; without status_attribute stored
    # here every lookup would fail with AttributeError.
    self.statuses = statuses
    self.status_attribute = status_attribute

def _clean_rendered_job_scripts(self) -> None:
"""Clean the rendered job scripts from the JOB_SCRIPT_DIR."""
Expand Down Expand Up @@ -98,7 +97,7 @@ def submit(
context["directives"] = self._render_directives(directives)

context["job_script"] = _job_script
cmd = self._tmp_submit.format(**context)
cmd = self.cmd_templates['submit'].format(**context)

# Just return the command string for the user without submitting
if dry_run:
Expand All @@ -115,7 +114,7 @@ def status(self, job_id):
job_id : str
Job ID.
"""
cmd = self._tmp_status.format(job_id=job_id)
cmd = self.cmd_templates['status'].format(job_id=job_id)
result = self._shell(cmd)
return result

Expand All @@ -127,7 +126,7 @@ def delete(self, job_id):
job_id : str
Job ID.
"""
cmd = self._tmp_delete.format(job_id=job_id)
cmd = self.cmd_templates['delete'].format(job_id=job_id)
result = self._shell(cmd)
return result

Expand Down Expand Up @@ -254,3 +253,28 @@ def _render_directives(self, directives):
return ""

return " " + " ".join(directives)

def _lookup_status(self, status):
    """Find the status object that matches a raw scheduler status code.

    Each entry of ``self.statuses`` is compared, in order, through the
    attribute named by ``self.status_attribute``; the first match wins.

    Parameters
    ----------
    status : str
        Raw status code from the scheduler.

    Returns
    -------
    hpcpy.status.Status
        Status object.

    Raises
    ------
    ValueError
        When the status is not found in the statuses list.
    """

    # A private sentinel distinguishes "no match" from any real entry.
    _no_match = object()

    found = next(
        (
            entry
            for entry in self.statuses
            if getattr(entry, self.status_attribute) == status
        ),
        _no_match,
    )

    if found is _no_match:
        raise ValueError(f"Status {status} not found in statuses.")

    return found
24 changes: 22 additions & 2 deletions hpcpy/client/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,25 @@ def __init__(self, *args, **kwargs):

# Set up the templates
super().__init__(
tmp_submit=hc.PBS_SUBMIT, tmp_status=hc.PBS_STATUS, tmp_delete=hc.PBS_DELETE
cmd_templates=hc.PBS_COMMANDS,
statuses=hc.PBS_STATUSES,
status_attribute="short",
*args, **kwargs
)

def status(self, job_id):
"""Get the status of a job.
Parameters
----------
job_id : str
Job ID.
Returns
-------
str
Generic status code.
"""

# Get the raw response
raw = super().status(job_id=job_id)
Expand All @@ -26,7 +41,12 @@ def status(self, job_id):

# Get the status out of the job ID
_status = parsed.get("Jobs").get(job_id).get("job_state")
return hc.PBS_STATUSES[_status]

# Get the native status by looking it up using the parent class.
status_native = super()._lookup_status(_status)

# Return the generic status
return status_native.generic

def _render_variables(self, variables):
"""Render the variables flag for PBS.
Expand Down
96 changes: 77 additions & 19 deletions hpcpy/constants.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
"""Constants."""

from pathlib import Path
from hpcpy.status import Status

# Location for rendered job scripts
JOB_SCRIPT_DIR = Path.home() / ".hpcpy" / "job_scripts"
JOB_SCRIPT_DIR.mkdir(parents=True, exist_ok=True)

# Statuses
# Generic Statuses
STATUS_CYCLE_HARVESTING = "U"
STATUS_EXITING = "E"
STATUS_FINISHED = "F"
Expand All @@ -20,23 +21,80 @@
STATUS_SUSPENDED = "S"
STATUS_WAITING = "W"

# PBS status translation
PBS_STATUSES = dict(
B=STATUS_HAS_SUBJOB,
E=STATUS_EXITING,
F=STATUS_FINISHED,
H=STATUS_HELD,
M=STATUS_MOVED,
Q=STATUS_QUEUED,
R=STATUS_RUNNING,
S=STATUS_SUSPENDED,
T=STATUS_MOVING,
U=STATUS_CYCLE_HARVESTING,
W=STATUS_WAITING,
X=STATUS_SUBJOB_COMPLETED,
)
# # PBS status translation
# PBS_STATUSES = dict(
# B=STATUS_HAS_SUBJOB,
# E=STATUS_EXITING,
# F=STATUS_FINISHED,
# H=STATUS_HELD,
# M=STATUS_MOVED,
# Q=STATUS_QUEUED,
# R=STATUS_RUNNING,
# S=STATUS_SUSPENDED,
# T=STATUS_MOVING,
# U=STATUS_CYCLE_HARVESTING,
# W=STATUS_WAITING,
# X=STATUS_SUBJOB_COMPLETED,
# )

# PBS command templates
PBS_SUBMIT = "qsub{directives} {job_script}"
PBS_STATUS = "qstat -f -F json {job_id}"
PBS_DELETE = "qdel {job_id}"
# Keyed by action; the {placeholders} are filled in with str.format.
PBS_COMMANDS = {
    "submit": "qsub{directives} {job_script}",
    "status": "qstat -f -F json {job_id}",
    "delete": "qdel {job_id}",
    "hold": "qhold {job_id}",
    "release": "qrls {job_id}",
}

# SLURM command templates, keyed by action; the {placeholders} are
# str.format fields.
SLURM_COMMANDS = {
    "submit": "sbatch{directives} {job_script}",
    "status": "squeue -j {job_id} --json",
    "delete": "scancel {job_id}",
    "hold": "scontrol hold {job_id}",
    "release": "scontrol release {job_id}",
}

# SLURM status codes
# NOTE: this must be a list literal. list() accepts at most one iterable
# argument, so list(Status(...), Status(...), ...) raises TypeError as soon
# as this module is imported.
# NOTE(review): the positional arguments look like (short code, long name,
# description) -- confirm against hpcpy.status.Status. Entries without an
# explicit generic= rely on whatever default Status provides.
SLURM_STATUSES = [
    Status("BF", "BOOT_FAIL", "Job terminated due to launch failure, typically due to a hardware failure (e.g. unable to boot the node or block and the job can not be requeued)."),
    Status("CA", "CANCELLED", "Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated."),
    Status("CD", "COMPLETED", "Job has terminated all processes on all nodes with an exit code of zero.", generic=STATUS_FINISHED),
    Status("CF", "CONFIGURING", "Job has been allocated resources, but are waiting for them to become ready for use (e.g. booting)."),
    Status("CG", "COMPLETING", "Job is in the process of completing. Some processes on some nodes may still be active."),
    Status("DL", "DEADLINE", "Job terminated on deadline."),
    Status("F", "FAILED", "Job terminated with non-zero exit code or other failure condition."),
    Status("NF", "NODE_FAIL", "Job terminated due to failure of one or more allocated nodes."),
    Status("OOM", "OUT_OF_MEMORY", "Job experienced out of memory error."),
    Status("PD", "PENDING", "Job is awaiting resource allocation.", generic=STATUS_QUEUED),
    Status("PR", "PREEMPTED", "Job terminated due to preemption."),
    Status("R", "RUNNING", "Job currently has an allocation.", generic=STATUS_RUNNING),
    Status("RD", "RESV_DEL_HOLD", "Job is being held after requested reservation was deleted."),
    Status("RF", "REQUEUE_FED", "Job is being requeued by a federation."),
    Status("RH", "REQUEUE_HOLD", "Held job is being requeued."),
    Status("RQ", "REQUEUED", "Completing job is being requeued."),
    Status("RS", "RESIZING", "Job is about to change size."),
    Status("RV", "REVOKED", "Sibling was removed from cluster due to other cluster starting the job."),
    Status("SI", "SIGNALING", "Job is being signaled."),
    Status("SE", "SPECIAL_EXIT", "The job was requeued in a special state. This state can be set by users, typically in EpilogSlurmctld, if the job has terminated with a particular exit value."),
    Status("SO", "STAGE_OUT", "Job is staging out files."),
    Status("ST", "STOPPED", "Job has an allocation, but execution has been stopped with SIGSTOP signal. CPUS have been retained by this job."),
    Status("S", "SUSPENDED", "Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.", generic=STATUS_SUSPENDED),
    Status("TO", "TIMEOUT", "Job terminated upon reaching its time limit."),
]

# PBS status codes
# NOTE: this must be a list literal. list() accepts at most one iterable
# argument, so list(Status(...), Status(...), ...) raises TypeError as soon
# as this module is imported.
# NOTE(review): the first positional argument appears to be the short code
# that the PBS client matches on (status_attribute="short"), and the second
# (None here) a long name -- confirm against hpcpy.status.Status.
PBS_STATUSES = [
    Status("B", None, "Array job has at least one subjob running", generic=STATUS_HAS_SUBJOB),
    Status("E", None, "Job is exiting after having run", generic=STATUS_EXITING),
    Status("F", None, "Job is finished", generic=STATUS_FINISHED),
    Status("H", None, "Job is held", generic=STATUS_HELD),
    Status("M", None, "Job was moved to another server", generic=STATUS_MOVED),
    Status("Q", None, "Job is queued", generic=STATUS_QUEUED),
    Status("R", None, "Job is running", generic=STATUS_RUNNING),
    Status("S", None, "Job is suspended", generic=STATUS_SUSPENDED),
    Status("T", None, "Job is being moved to new location", generic=STATUS_MOVING),
    Status("U", None, "Cycle-harvesting job is suspended due to keyboard activity", generic=STATUS_CYCLE_HARVESTING),
    Status("W", None, "Job is waiting for its submitter-assigned start time to be reached", generic=STATUS_WAITING),
    Status("X", None, "Subjob has completed execution or has been deleted", generic=STATUS_SUBJOB_COMPLETED),
]

0 comments on commit 1a33e73

Please sign in to comment.