Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: scheduler #275

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Previous commit
Next commit
add scheduler cli, query scheduler automatically
superstar54 committed Aug 27, 2024
commit ed47f0b6391bf3679f4d4f310530bedbacbba6ad
3 changes: 2 additions & 1 deletion aiida_workgraph/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
from aiida_workgraph.cli import cmd_graph
from aiida_workgraph.cli import cmd_web
from aiida_workgraph.cli import cmd_task
from aiida_workgraph.cli import cmd_scheduler


__all__ = ["cmd_graph", "cmd_web", "cmd_task"]
__all__ = ["cmd_graph", "cmd_web", "cmd_task", "cmd_scheduler"]
125 changes: 125 additions & 0 deletions aiida_workgraph/cli/cmd_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from aiida_workgraph.cli.cmd_workgraph import workgraph
from aiida import orm
import click
import os
from pathlib import Path
from aiida.cmdline.utils import echo
from .cmd_graph import REPAIR_INSTRUCTIONS


# NOTE(review): none of the scheduler commands visible in this module reference
# this constant; presumably carried over from the web CLI — confirm before keeping.
REACT_PORT = "3000"


def get_package_root():
    """Return the directory that contains this module's source file."""
    # The parent of this file's own path is the directory it lives in.
    return Path(__file__).parent


def get_pid_file_path():
    """Return the path of the scheduler PID file inside the AiiDA config folder."""
    # Imported at call time rather than at module import.
    from aiida.manage.configuration.settings import AIIDA_CONFIG_FOLDER

    pid_file_name = "scheduler_processes.pid"
    return AIIDA_CONFIG_FOLDER / pid_file_name


# Registers "scheduler" as a sub-group of the imported ``workgraph`` command
# group; the ``start``/``stop``/``status`` commands below attach to it.
@workgraph.group("scheduler")
def scheduler() -> None:
    """Commands to manage the scheduler process."""


@scheduler.command()
def start():
    """Start the scheduler application."""
    # Flow:
    #   1. If a PID file exists, load every node recorded in it; if one of
    #      them is not sealed, report it as already running and return.
    #   2. Otherwise submit a new ``WorkGraphScheduler`` and record its pk in
    #      the PID file as a ``Scheduler:<pk>`` line.
    from aiida_workgraph.engine.scheduler import WorkGraphScheduler
    from aiida.engine import submit

    click.echo("Starting the scheduler process...")

    pid_file_path = get_pid_file_path()
    # if the PID file already exists, check if the process is running
    if pid_file_path.exists():
        with open(pid_file_path, "r") as pid_file:
            for line in pid_file:
                # ``partition`` never raises, so blank or malformed lines are
                # skipped instead of crashing on tuple unpacking.
                _, _, pid = line.strip().partition(":")
                if not pid:
                    continue
                try:
                    node = orm.load_node(pid)
                    sealed = node.is_sealed
                except Exception:
                    # Best effort: a stale or unreadable entry must not
                    # prevent a fresh scheduler from being started.
                    click.echo(
                        "PID file exists but no running scheduler process found."
                    )
                    continue
                if sealed:
                    click.echo(
                        "PID file exists but no running scheduler process found."
                    )
                else:
                    click.echo(
                        f"Scheduler process with PID {node.pk} already running."
                    )
                    return

    # Submit before touching the PID file: opening it in "w" mode first would
    # leave an empty, truncated file behind if the submission raised.
    node = submit(WorkGraphScheduler)
    with open(pid_file_path, "w") as pid_file:
        pid_file.write(f"Scheduler:{node.pk}\n")
    click.echo(f"Scheduler process started with PID {node.pk}.")


@scheduler.command()
def stop():
    """Stop the scheduler application."""
    # Flow: for every ``<label>:<pk>`` entry in the PID file, load the node,
    # ask for confirmation and kill it; afterwards delete the PID file.
    # Declining the confirmation aborts the command (``abort=True``) and
    # leaves the PID file in place.
    from aiida.engine.processes import control

    pid_file_path = get_pid_file_path()

    if not pid_file_path.exists():
        click.echo("No running scheduler application found.")
        return

    with open(pid_file_path, "r") as pid_file:
        for line in pid_file:
            # ``partition`` never raises, so blank or malformed lines are
            # skipped instead of crashing on tuple unpacking.
            _, _, pid = line.strip().partition(":")
            if not pid:
                continue
            try:
                process = orm.load_node(pid)
            except Exception:
                # A stale entry (node cannot be loaded) used to raise here and
                # skip the clean-up below, so ``stop`` kept failing until the
                # PID file was deleted by hand. Report it and carry on.
                click.echo("PID file exists but no running scheduler process found.")
                continue
            # Only ask once there is an actual process node to kill.
            click.confirm(
                "Are you sure you want to kill the scheduler process?", abort=True
            )
            try:
                message = "Killed through `verdi process kill`"
                control.kill_processes(
                    [process],
                    timeout=5,
                    wait=True,
                    message=message,
                )
            except control.ProcessTimeoutException as exception:
                echo.echo_critical(f"{exception}\n{REPAIR_INSTRUCTIONS}")
    # Remove the PID file only after its handle has been closed.
    os.remove(pid_file_path)


@scheduler.command()
def status():
    """Check the status of the scheduler application."""
    from aiida.orm import QueryBuilder
    from aiida_workgraph.engine.scheduler import WorkGraphScheduler

    # Select nodes whose ``sealed`` attribute is either False or missing.
    unsealed = {
        "or": [
            {"attributes.sealed": False},
            {"attributes": {"!has_key": "sealed"}},
        ]
    }
    query = QueryBuilder()
    query.append(
        WorkGraphScheduler,
        filters=unsealed,
        project=["id"],
        tag="process",
    )
    rows = query.all()
    if rows:
        # Each row holds the single projected column; report the first match.
        click.echo(f"Scheduler process is running with PID: {rows[0][0]}")
        return
    click.echo("No scheduler found. Please start the scheduler first.")
20 changes: 20 additions & 0 deletions aiida_workgraph/engine/utils.py
Original file line number Diff line number Diff line change
@@ -139,3 +139,23 @@ def prepare_for_shell_task(task: dict, kwargs: dict) -> dict:
"metadata": metadata or {},
}
return inputs


def get_scheduler():
    """Return the id of the first unsealed ``WorkGraphScheduler`` node found.

    Queries for ``WorkGraphScheduler`` nodes whose ``sealed`` attribute is
    False or missing and returns the projected ``id`` of the first result.

    Raises:
        ValueError: if the query returns no matching node.
    """
    from aiida.orm import QueryBuilder
    from aiida_workgraph.engine.scheduler import WorkGraphScheduler

    unsealed = {
        "or": [
            {"attributes.sealed": False},
            {"attributes": {"!has_key": "sealed"}},
        ]
    }
    query = QueryBuilder()
    query.append(WorkGraphScheduler, filters=unsealed, project=["id"], tag="process")
    rows = query.all()
    if not rows:
        raise ValueError("No scheduler found. Please start the scheduler first.")
    # Each row holds the single projected column.
    return rows[0][0]
9 changes: 8 additions & 1 deletion aiida_workgraph/workgraph.py
Original file line number Diff line number Diff line change
@@ -115,6 +115,7 @@ def submit(
wait: bool = False,
timeout: int = 60,
metadata: Optional[Dict[str, Any]] = None,
to_scheduler: bool = False,
) -> aiida.orm.ProcessNode:
"""Submit the AiiDA workgraph process and optionally wait for it to finish.
Args:
@@ -123,6 +124,8 @@ def submit(
restart (bool): Restart the process, and reset the modified tasks, then only re-run the modified tasks.
new (bool): Submit a new process.
"""
from aiida_workgraph.engine.utils import get_scheduler

# set task inputs
if inputs is not None:
for name, input in inputs.items():
@@ -134,7 +137,11 @@ def submit(
self.save(metadata=metadata)
if self.process.process_state.value.upper() not in ["CREATED"]:
raise ValueError(f"Process {self.process.pk} has already been submitted.")
self.continue_process()
if to_scheduler:
scheduler_pk = get_scheduler()
self.continue_process_in_scheduler(scheduler_pk)
else:
self.continue_process()
# as long as we submit the process, it is a new submission, we should set restart_process to None
self.restart_process = None
if wait:
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -83,7 +83,7 @@ workgraph = "aiida_workgraph.cli.cmd_workgraph:workgraph"

[project.entry-points.'aiida.workflows']
"workgraph.engine" = "aiida_workgraph.engine.workgraph:WorkGraphEngine"
"workgraph.scheduler" = "aiida_workgraph.engine.workgraph:WorkGraphScheduler"
"workgraph.scheduler" = "aiida_workgraph.engine.scheduler:WorkGraphScheduler"

[project.entry-points."aiida.data"]
"workgraph.general" = "aiida_workgraph.orm.general_data:GeneralData"