Skip to content

Commit

Permalink
Merge pull request #877 from MolSSI/manager_idle
Browse files Browse the repository at this point in the history
Option to shut down managers if idle for too long
  • Loading branch information
bennybp authored Jan 9, 2025
2 parents f5b076e + 36406bb commit 90883d3
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 20 deletions.
4 changes: 4 additions & 0 deletions qcarchivetesting/qcarchivetesting/testing_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ def reset(self):
self._stop_compute()
self._all_completed = set()
self._qcf_config = self._original_config.copy(deep=True)

if self._api_proc is None:
self.start_api()

self.pg_harness.recreate_database()

def get_storage_socket(self) -> SQLAlchemySocket:
Expand Down
52 changes: 42 additions & 10 deletions qcfractalcompute/qcfractalcompute/compute_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ def __init__(self, config: FractalComputeConfig):
# Number of failed heartbeats. After missing a bunch, we will shutdown
self._failed_heartbeats = 0

# Time at which the worker started idling (no jobs being run)
self._idle_start_time = None

@staticmethod
def _get_max_workers(executor: ParslExecutor) -> int:
"""
Expand Down Expand Up @@ -296,6 +299,10 @@ def scheduler_heartbeat():
self.logger.info("Compute Manager successfully started.")

self._failed_heartbeats = 0

# Start the idle timer to be right now, since we aren't doing anything
self._idle_start_time = time.time()

self.scheduler.enter(0, 1, scheduler_update)
self.scheduler.enter(0, 2, scheduler_heartbeat)

Expand Down Expand Up @@ -502,16 +509,7 @@ def _update_deferred_tasks(self) -> Dict[int, TaskReturnMetadata]:
self._deferred_tasks = new_deferred_tasks
return ret

def update(self, new_tasks) -> None:
"""Examines the queue for completed tasks and adds successful completions to the database
while unsuccessful are logged for future inspection.
Parameters
----------
new_tasks
Try to get new tasks from the server
"""

def _update(self, new_tasks) -> None:
# First, try pushing back any stale results
deferred_return_info = self._update_deferred_tasks()

Expand Down Expand Up @@ -660,6 +658,40 @@ def update(self, new_tasks) -> None:
self.preprocess_new_tasks(new_task_info)
self._submit_tasks(executor_label, new_task_info)

def update(self, new_tasks) -> None:
"""Examines the queue for completed tasks and adds successful completions to the database
while unsuccessful are logged for future inspection.
Parameters
----------
new_tasks
Try to get new tasks from the server
"""

self._update(new_tasks=new_tasks)

if self.manager_config.max_idle_time is None:
return

# Check if we are idle. If we are beyond the max idle time, then shut down
is_idle = (self.n_total_active_tasks == 0) and (self.n_deferred_tasks == 0)

if is_idle and self._idle_start_time is None:
self._idle_start_time = time.time()

if not is_idle:
self._idle_start_time = None
else:
idle_time = time.time() - self._idle_start_time
if idle_time >= self.manager_config.max_idle_time:
self.logger.warning(
f"Manager has been idle for {idle_time:.2f} seconds - max is "
f"{self.manager_config.max_idle_time}, shutting down"
)
self.stop()
else:
self.logger.info(f"Manager has been idle for {idle_time:.2f} seconds")

def preprocess_new_tasks(self, new_tasks: List[RecordTask]):
"""
Any processing to do to the new tasks
Expand Down
8 changes: 7 additions & 1 deletion qcfractalcompute/qcfractalcompute/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ class FractalComputeConfig(BaseModel):
gt=0,
)

max_idle_time: Optional[int] = Field(
None,
description="Maximum consecutive time in seconds that the manager "
"should be allowed to run. If this is reached, the manager will shutdown.",
)

parsl_run_dir: str = "parsl_run_dir"
parsl_usage_tracking: int = 0

Expand All @@ -220,7 +226,7 @@ def _check_logfile(cls, v, values):
def _check_run_dir(cls, v, values):
return _make_abs_path(v, values["base_folder"], "parsl_run_dir")

@validator("update_frequency", pre=True)
@validator("update_frequency", "max_idle_time", pre=True)
def _convert_durations(cls, v):
return duration_to_seconds(v)

Expand Down
38 changes: 38 additions & 0 deletions qcfractalcompute/qcfractalcompute/test_compute_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,41 @@ def test_manager_missed_heartbeats_shutdown(snowflake: QCATestingSnowflake):

compute_thread._compute_thread.join(5)
assert compute_thread.is_alive() is False


def test_manager_idle_shutdown_0(snowflake: QCATestingSnowflake):
add_config = {"max_idle_time": 0}
compute_thread = QCATestingComputeThread(snowflake._qcf_config, additional_manager_config=add_config)
compute_thread.start(manual_updates=False)

for i in range(10):
time.sleep(1)
if not compute_thread.is_alive():
break
else:
raise RuntimeError("Compute thread did not stop in 10 seconds")

compute_thread._compute_thread.join(5)
assert compute_thread.is_alive() is False


def test_manager_idle_shutdown_5(snowflake: QCATestingSnowflake):
storage_socket = snowflake.get_storage_socket()

add_config = {"max_idle_time": 5}
compute_thread = QCATestingComputeThread(snowflake._qcf_config, additional_manager_config=add_config)
compute_thread.start(manual_updates=False)

time.sleep(2)
assert compute_thread.is_alive()

populate_db(storage_socket)

time.sleep(9)
assert compute_thread.is_alive()

time.sleep(10)
assert not compute_thread.is_alive()

compute_thread._compute_thread.join(5)
assert compute_thread.is_alive() is False
8 changes: 8 additions & 0 deletions qcfractalcompute/qcfractalcompute/test_manager_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,25 @@ def test_manager_config_durations(tmp_path):
base_config = copy.deepcopy(_base_config)

base_config["update_frequency"] = "900"
base_config["max_idle_time"] = "3600"
manager_config = FractalComputeConfig(base_folder=base_folder, **base_config)
assert manager_config.update_frequency == 900
assert manager_config.max_idle_time == 3600

base_config["update_frequency"] = 900
base_config["max_idle_time"] = 3600
manager_config = FractalComputeConfig(base_folder=base_folder, **base_config)
assert manager_config.update_frequency == 900
assert manager_config.max_idle_time == 3600

base_config["update_frequency"] = "3d4h80m09s"
base_config["max_idle_time"] = "1d8h99m77s"
manager_config = FractalComputeConfig(base_folder=base_folder, **base_config)
assert manager_config.update_frequency == 278409
assert manager_config.max_idle_time == 121217

base_config["update_frequency"] = "3:04:80:9"
base_config["max_idle_time"] = "1:8:99:77"
manager_config = FractalComputeConfig(base_folder=base_folder, **base_config)
assert manager_config.update_frequency == 278409
assert manager_config.max_idle_time == 121217
54 changes: 45 additions & 9 deletions qcfractalcompute/qcfractalcompute/testing_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,41 @@
import tempfile
import threading
import weakref
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Any

import parsl

from qcarchivetesting.testing_classes import _activated_manager_programs
from qcfractal.components.optimization.testing_helpers import submit_test_data as submit_opt_test_data
from qcfractal.components.singlepoint.testing_helpers import submit_test_data as submit_sp_test_data
from qcfractal.config import FractalConfig
from qcfractal.config import update_nested_dict
from qcfractal.db_socket import SQLAlchemySocket
from qcfractalcompute.apps.models import AppTaskResult
from qcfractalcompute.compress import compress_result
from qcfractalcompute.compute_manager import ComputeManager
from qcfractalcompute.config import FractalComputeConfig, FractalServerSettings, LocalExecutorConfig
from qcportal.all_results import AllResultTypes
from qcportal.all_results import AllResultTypes, FailedOperation
from qcportal.record_models import PriorityEnum, RecordTask

failed_op = FailedOperation(
input_data=None,
error={"error_type": "internal_error", "error_message": "This is a test error message"},
)


class MockTestingAppManager:
def all_program_info(self, executor_label: Optional[str] = None) -> Dict[str, List[str]]:
return _activated_manager_programs


class MockTestingComputeManager(ComputeManager):
def __init__(self, qcf_config: FractalConfig, result_data: Dict[int, AllResultTypes]):
def __init__(
self,
qcf_config: FractalConfig,
result_data: Optional[Dict[int, AllResultTypes]] = None,
additional_manager_config: Optional[Dict[str, Any]] = None,
):
self._qcf_config = qcf_config

host = self._qcf_config.api.host
Expand All @@ -43,7 +54,7 @@ def __init__(self, qcf_config: FractalConfig, result_data: Dict[int, AllResultTy
os.makedirs(parsl_run_dir, exist_ok=True)
os.makedirs(compute_scratch_dir, exist_ok=True)

self._compute_config = FractalComputeConfig(
config_dict = dict(
base_folder=tmpdir.name,
parsl_run_dir=parsl_run_dir,
cluster="mock_compute",
Expand All @@ -63,6 +74,11 @@ def __init__(self, qcf_config: FractalConfig, result_data: Dict[int, AllResultTy
},
)

if additional_manager_config:
config_dict = update_nested_dict(config_dict, additional_manager_config)

self._compute_config = FractalComputeConfig(**config_dict)

# Set the app manager already
self.app_manager = MockTestingAppManager()
ComputeManager.__init__(self, self._compute_config)
Expand All @@ -81,15 +97,26 @@ def cleanup(d):
def _submit_tasks(self, executor_label: str, tasks: List[RecordTask]):
# A mock app that just returns the result data given to it after two seconds
@parsl.python_app(data_flow_kernel=self.dflow_kernel)
def _mock_app(result: AllResultTypes) -> AppTaskResult:
def _mock_app(result: Optional[AllResultTypes]) -> AppTaskResult:
import time

time.sleep(2)
return AppTaskResult(success=result.success, walltime=2.0, result_compressed=compress_result(result.dict()))

if result is None:
return AppTaskResult(success=False, walltime=2.0, result_compressed=compress_result(failed_op.dict()))
else:
return AppTaskResult(
success=result.success, walltime=2.0, result_compressed=compress_result(result.dict())
)

for task in tasks:
self._record_id_map[task.id] = task.record_id
task_future = _mock_app(self._result_data[task.record_id])

if self._result_data:
task_future = _mock_app(self._result_data.get(task.record_id, None))
else:
task_future = _mock_app(None)

self._task_futures[executor_label][task.id] = task_future


Expand All @@ -98,10 +125,17 @@ class QCATestingComputeThread:
Runs a compute manager in a separate process
"""

def __init__(self, qcf_config: FractalConfig, result_data: Dict[int, AllResultTypes] = None):
def __init__(
self,
qcf_config: FractalConfig,
result_data: Dict[int, AllResultTypes] = None,
additional_manager_config: Optional[Dict[str, Any]] = None,
):
self._qcf_config = qcf_config
self._result_data = result_data

self._additional_manager_config = additional_manager_config

self._compute: Optional[MockTestingComputeManager] = None
self._compute_thread = None

Expand All @@ -117,7 +151,7 @@ def _stop(cls, compute, compute_thread):
def start(self, manual_updates) -> None:
if self._compute is not None:
raise RuntimeError("Compute manager already started")
self._compute = MockTestingComputeManager(self._qcf_config, self._result_data)
self._compute = MockTestingComputeManager(self._qcf_config, self._result_data, self._additional_manager_config)
self._compute_thread = threading.Thread(
target=self._compute.start, kwargs={"manual_updates": manual_updates}, daemon=True
)
Expand All @@ -138,6 +172,8 @@ def stop(self) -> None:
self._compute_thread = None

def is_alive(self) -> bool:
if self._compute_thread is None:
return False
return self._compute_thread.is_alive()


Expand Down

0 comments on commit 90883d3

Please sign in to comment.