diff --git a/qcarchivetesting/qcarchivetesting/testing_classes.py b/qcarchivetesting/qcarchivetesting/testing_classes.py index 0869d0f5d..cc7250cba 100644 --- a/qcarchivetesting/qcarchivetesting/testing_classes.py +++ b/qcarchivetesting/qcarchivetesting/testing_classes.py @@ -191,6 +191,10 @@ def reset(self): self._stop_compute() self._all_completed = set() self._qcf_config = self._original_config.copy(deep=True) + + if self._api_proc is None: + self.start_api() + self.pg_harness.recreate_database() def get_storage_socket(self) -> SQLAlchemySocket: diff --git a/qcfractalcompute/qcfractalcompute/compute_manager.py b/qcfractalcompute/qcfractalcompute/compute_manager.py index beb23d94a..e04604bc3 100644 --- a/qcfractalcompute/qcfractalcompute/compute_manager.py +++ b/qcfractalcompute/qcfractalcompute/compute_manager.py @@ -203,6 +203,9 @@ def __init__(self, config: FractalComputeConfig): # Number of failed heartbeats. After missing a bunch, we will shutdown self._failed_heartbeats = 0 + # Time at which the worker started idling (no jobs being run) + self._idle_start_time = None + @staticmethod def _get_max_workers(executor: ParslExecutor) -> int: """ @@ -296,6 +299,10 @@ def scheduler_heartbeat(): self.logger.info("Compute Manager successfully started.") self._failed_heartbeats = 0 + + # Start the idle timer to be right now, since we aren't doing anything + self._idle_start_time = time.time() + self.scheduler.enter(0, 1, scheduler_update) self.scheduler.enter(0, 2, scheduler_heartbeat) @@ -502,16 +509,7 @@ def _update_deferred_tasks(self) -> Dict[int, TaskReturnMetadata]: self._deferred_tasks = new_deferred_tasks return ret - def update(self, new_tasks) -> None: - """Examines the queue for completed tasks and adds successful completions to the database - while unsuccessful are logged for future inspection. - - Parameters - ---------- - new_tasks - Try to get new tasks from the server - """ - + def _update(self, new_tasks) -> None: # First, try pushing back any stale results deferred_return_info = self._update_deferred_tasks() @@ -660,6 +658,40 @@ def update(self, new_tasks) -> None: self.preprocess_new_tasks(new_task_info) self._submit_tasks(executor_label, new_task_info) + def update(self, new_tasks) -> None: + """Examines the queue for completed tasks and adds successful completions to the database + while unsuccessful are logged for future inspection. + + Parameters + ---------- + new_tasks + Try to get new tasks from the server + """ + + self._update(new_tasks=new_tasks) + + if self.manager_config.max_idle_time is None: + return + + # Check if we are idle. If we are beyond the max idle time, then shut down + is_idle = (self.n_total_active_tasks == 0) and (self.n_deferred_tasks == 0) + + if is_idle and self._idle_start_time is None: + self._idle_start_time = time.time() + + if not is_idle: + self._idle_start_time = None + else: + idle_time = time.time() - self._idle_start_time + if idle_time >= self.manager_config.max_idle_time: + self.logger.warning( + f"Manager has been idle for {idle_time:.2f} seconds - max is " + f"{self.manager_config.max_idle_time}, shutting down" + ) + self.stop() + else: + self.logger.info(f"Manager has been idle for {idle_time:.2f} seconds") + def preprocess_new_tasks(self, new_tasks: List[RecordTask]): """ Any processing to do to the new tasks diff --git a/qcfractalcompute/qcfractalcompute/config.py b/qcfractalcompute/qcfractalcompute/config.py index 319ebc7b1..11b27633b 100644 --- a/qcfractalcompute/qcfractalcompute/config.py +++ b/qcfractalcompute/qcfractalcompute/config.py @@ -201,6 +201,12 @@ class FractalComputeConfig(BaseModel): gt=0, ) + max_idle_time: Optional[int] = Field( + None, + description="Maximum consecutive time in seconds that the manager " + "should be allowed to run. If this is reached, the manager will shutdown.", + ) + parsl_run_dir: str = "parsl_run_dir" parsl_usage_tracking: int = 0 @@ -220,7 +226,7 @@ def _check_logfile(cls, v, values): def _check_run_dir(cls, v, values): return _make_abs_path(v, values["base_folder"], "parsl_run_dir") - @validator("update_frequency", pre=True) + @validator("update_frequency", "max_idle_time", pre=True) def _convert_durations(cls, v): return duration_to_seconds(v) diff --git a/qcfractalcompute/qcfractalcompute/test_compute_manager.py b/qcfractalcompute/qcfractalcompute/test_compute_manager.py index 2b5cdd02e..31bc0a18d 100644 --- a/qcfractalcompute/qcfractalcompute/test_compute_manager.py +++ b/qcfractalcompute/qcfractalcompute/test_compute_manager.py @@ -236,3 +236,41 @@ def test_manager_missed_heartbeats_shutdown(snowflake: QCATestingSnowflake): compute_thread._compute_thread.join(5) assert compute_thread.is_alive() is False + + +def test_manager_idle_shutdown_0(snowflake: QCATestingSnowflake): + add_config = {"max_idle_time": 0} + compute_thread = QCATestingComputeThread(snowflake._qcf_config, additional_manager_config=add_config) + compute_thread.start(manual_updates=False) + + for i in range(10): + time.sleep(1) + if not compute_thread.is_alive(): + break + else: + raise RuntimeError("Compute thread did not stop in 10 seconds") + + compute_thread._compute_thread.join(5) + assert compute_thread.is_alive() is False + + +def test_manager_idle_shutdown_5(snowflake: QCATestingSnowflake): + storage_socket = snowflake.get_storage_socket() + + add_config = {"max_idle_time": 5} + compute_thread = QCATestingComputeThread(snowflake._qcf_config, additional_manager_config=add_config) + compute_thread.start(manual_updates=False) + + time.sleep(2) + assert compute_thread.is_alive() + + populate_db(storage_socket) + + time.sleep(9) + assert compute_thread.is_alive() + + time.sleep(10) + assert not compute_thread.is_alive() + + compute_thread._compute_thread.join(5) + assert compute_thread.is_alive() is False diff --git a/qcfractalcompute/qcfractalcompute/test_manager_config.py b/qcfractalcompute/qcfractalcompute/test_manager_config.py index 8efba9838..79a0496a6 100644 --- a/qcfractalcompute/qcfractalcompute/test_manager_config.py +++ b/qcfractalcompute/qcfractalcompute/test_manager_config.py @@ -56,17 +56,25 @@ def test_manager_config_durations(tmp_path): base_config = copy.deepcopy(_base_config) base_config["update_frequency"] = "900" + base_config["max_idle_time"] = "3600" manager_config = FractalComputeConfig(base_folder=base_folder, **base_config) assert manager_config.update_frequency == 900 + assert manager_config.max_idle_time == 3600 base_config["update_frequency"] = 900 + base_config["max_idle_time"] = 3600 manager_config = FractalComputeConfig(base_folder=base_folder, **base_config) assert manager_config.update_frequency == 900 + assert manager_config.max_idle_time == 3600 base_config["update_frequency"] = "3d4h80m09s" + base_config["max_idle_time"] = "1d8h99m77s" manager_config = FractalComputeConfig(base_folder=base_folder, **base_config) assert manager_config.update_frequency == 278409 + assert manager_config.max_idle_time == 121217 base_config["update_frequency"] = "3:04:80:9" + base_config["max_idle_time"] = "1:8:99:77" manager_config = FractalComputeConfig(base_folder=base_folder, **base_config) assert manager_config.update_frequency == 278409 + assert manager_config.max_idle_time == 121217 diff --git a/qcfractalcompute/qcfractalcompute/testing_helpers.py b/qcfractalcompute/qcfractalcompute/testing_helpers.py index f315f587e..64ddf241d 100644 --- a/qcfractalcompute/qcfractalcompute/testing_helpers.py +++ b/qcfractalcompute/qcfractalcompute/testing_helpers.py @@ -4,7 +4,7 @@ import tempfile import threading import weakref -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Any import parsl @@ -12,14 +12,20 @@ from qcfractal.components.optimization.testing_helpers import submit_test_data as submit_opt_test_data from qcfractal.components.singlepoint.testing_helpers import submit_test_data as submit_sp_test_data from qcfractal.config import FractalConfig +from qcfractal.config import update_nested_dict from qcfractal.db_socket import SQLAlchemySocket from qcfractalcompute.apps.models import AppTaskResult from qcfractalcompute.compress import compress_result from qcfractalcompute.compute_manager import ComputeManager from qcfractalcompute.config import FractalComputeConfig, FractalServerSettings, LocalExecutorConfig -from qcportal.all_results import AllResultTypes +from qcportal.all_results import AllResultTypes, FailedOperation from qcportal.record_models import PriorityEnum, RecordTask +failed_op = FailedOperation( + input_data=None, + error={"error_type": "internal_error", "error_message": "This is a test error message"}, +) + class MockTestingAppManager: def all_program_info(self, executor_label: Optional[str] = None) -> Dict[str, List[str]]: @@ -27,7 +33,12 @@ def all_program_info(self, executor_label: Optional[str] = None) -> Dict[str, Li class MockTestingComputeManager(ComputeManager): - def __init__(self, qcf_config: FractalConfig, result_data: Dict[int, AllResultTypes]): + def __init__( + self, + qcf_config: FractalConfig, + result_data: Optional[Dict[int, AllResultTypes]] = None, + additional_manager_config: Optional[Dict[str, Any]] = None, + ): self._qcf_config = qcf_config host = self._qcf_config.api.host @@ -43,7 +54,7 @@ def __init__(self, qcf_config: FractalConfig, result_data: Dict[int, AllResultTy os.makedirs(parsl_run_dir, exist_ok=True) os.makedirs(compute_scratch_dir, exist_ok=True) - self._compute_config = FractalComputeConfig( + config_dict = dict( base_folder=tmpdir.name, parsl_run_dir=parsl_run_dir, cluster="mock_compute", @@ -63,6 +74,11 @@ def __init__(self, qcf_config: FractalConfig, result_data: Dict[int, AllResultTy }, ) + if additional_manager_config: + config_dict = update_nested_dict(config_dict, additional_manager_config) + + self._compute_config = FractalComputeConfig(**config_dict) + # Set the app manager already self.app_manager = MockTestingAppManager() ComputeManager.__init__(self, self._compute_config) @@ -81,15 +97,26 @@ def cleanup(d): def _submit_tasks(self, executor_label: str, tasks: List[RecordTask]): # A mock app that just returns the result data given to it after two seconds @parsl.python_app(data_flow_kernel=self.dflow_kernel) - def _mock_app(result: AllResultTypes) -> AppTaskResult: + def _mock_app(result: Optional[AllResultTypes]) -> AppTaskResult: import time time.sleep(2) - return AppTaskResult(success=result.success, walltime=2.0, result_compressed=compress_result(result.dict())) + + if result is None: + return AppTaskResult(success=False, walltime=2.0, result_compressed=compress_result(failed_op.dict())) + else: + return AppTaskResult( + success=result.success, walltime=2.0, result_compressed=compress_result(result.dict()) + ) for task in tasks: self._record_id_map[task.id] = task.record_id - task_future = _mock_app(self._result_data[task.record_id]) + + if self._result_data: + task_future = _mock_app(self._result_data.get(task.record_id, None)) + else: + task_future = _mock_app(None) + self._task_futures[executor_label][task.id] = task_future @@ -98,10 +125,17 @@ class QCATestingComputeThread: Runs a compute manager in a separate process """ - def __init__(self, qcf_config: FractalConfig, result_data: Dict[int, AllResultTypes] = None): + def __init__( + self, + qcf_config: FractalConfig, + result_data: Dict[int, AllResultTypes] = None, + additional_manager_config: Optional[Dict[str, Any]] = None, + ): self._qcf_config = qcf_config self._result_data = result_data + self._additional_manager_config = additional_manager_config + self._compute: Optional[MockTestingComputeManager] = None self._compute_thread = None @@ -117,7 +151,7 @@ def _stop(cls, compute, compute_thread): def start(self, manual_updates) -> None: if self._compute is not None: raise RuntimeError("Compute manager already started") - self._compute = MockTestingComputeManager(self._qcf_config, self._result_data) + self._compute = MockTestingComputeManager(self._qcf_config, self._result_data, self._additional_manager_config) self._compute_thread = threading.Thread( target=self._compute.start, kwargs={"manual_updates": manual_updates}, daemon=True ) @@ -138,6 +172,8 @@ def stop(self) -> None: self._compute_thread = None def is_alive(self) -> bool: + if self._compute_thread is None: + return False return self._compute_thread.is_alive()