diff --git a/diagnostic_common_diagnostics/diagnostic_common_diagnostics/hd_monitor.py b/diagnostic_common_diagnostics/diagnostic_common_diagnostics/hd_monitor.py index 93a9f6ac..2eecf3dc 100755 --- a/diagnostic_common_diagnostics/diagnostic_common_diagnostics/hd_monitor.py +++ b/diagnostic_common_diagnostics/diagnostic_common_diagnostics/hd_monitor.py @@ -1,4 +1,5 @@ #! /usr/bin/env python3 +"""Hard Drive (or any other memory) monitor. Contains a the monitor node and its main function.""" # -*- coding: utf-8 -*- # # Software License Agreement (BSD License) @@ -36,60 +37,78 @@ # \author Kevin Watts # \author Antoine Lima -from typing import List from pathlib import Path -from socket import gethostname from shutil import disk_usage +from socket import gethostname +from typing import List -import rclpy +from diagnostic_msgs.msg import DiagnosticStatus, KeyValue +from diagnostic_updater import Updater from rcl_interfaces.msg import SetParametersResult +import rclpy from rclpy.node import Node -from diagnostic_updater import Updater -from diagnostic_msgs.msg import DiagnosticStatus, KeyValue FREE_PERCENT_LOW = 0.05 FREE_PERCENT_CRIT = 0.01 DICT_STATUS = { - DiagnosticStatus.OK: "OK", - DiagnosticStatus.WARN: "Warning", - DiagnosticStatus.ERROR: "Error", + DiagnosticStatus.OK: 'OK', + DiagnosticStatus.WARN: 'Warning', + DiagnosticStatus.ERROR: 'Error', } DICT_USAGE = { - DiagnosticStatus.OK: "OK", - DiagnosticStatus.WARN: "Low Disk Space", - DiagnosticStatus.ERROR: "Very Low Disk Space", + DiagnosticStatus.OK: 'OK', + DiagnosticStatus.WARN: 'Low Disk Space', + DiagnosticStatus.ERROR: 'Very Low Disk Space', } class HDMonitor(Node): + """Diagnostic node checking the remaining space on the specified hard drive. + + Three ROS parameters: + - path: Path on the filesystem to check (string, default: home directory) + - free_percent_low: Percentage at which to consider the space left as low + - free_percent_crit: Percentage at which to consider the space left as critical + """ + def __init__(self): - hostname = gethostname().replace(".", "_").replace("-", "_") - super().__init__(f"hd_monitor_{hostname}") + hostname = gethostname().replace('.', '_').replace('-', '_') + super().__init__(f'hd_monitor_{hostname}') self.add_on_set_parameters_callback(self.callback_config) - self.declare_parameter("path", "~") - self.declare_parameter("free_percent_low", 0.05) - self.declare_parameter("free_percent_crit", 0.01) + self.declare_parameter('path', '~') + self.declare_parameter('free_percent_low', 0.05) + self.declare_parameter('free_percent_crit', 0.01) + self._path = self._free_percent_low = self._free_percent_crit = None self._updater = Updater(self) self._updater.setHardwareID(hostname) - self._updater.add(f"{hostname} HD Usage", self.check_disk_usage) + self._updater.add(f'{hostname} HD Usage', self.check_disk_usage) def callback_config(self, params: List[rclpy.Parameter]): + """Retrieve ROS parameters. + + see the class documentation for declared parameters. + """ for param in params: match param.name: - case "path": + case 'path': self._path = str( Path(param.value).expanduser().resolve(strict=True) ) - case "free_percent_low": + case 'free_percent_low': self._free_percent_low = param.value - case "free_percent_crit": + case 'free_percent_crit': self._free_percent_crit = param.value + return SetParametersResult(successful=True) def check_disk_usage(self, diag: DiagnosticStatus) -> DiagnosticStatus: + """Compute the disk usage and derive a status from it. + + Task periodically ran by the diagnostic updater. + """ diag.level = DiagnosticStatus.OK total, _, free = disk_usage(self._path) @@ -102,13 +121,13 @@ def check_disk_usage(self, diag: DiagnosticStatus) -> DiagnosticStatus: else: diag.level = DiagnosticStatus.ERROR - total_Go = total // (1024 * 1024) + total_go = total // (1024 * 1024) diag.values.extend( [ - KeyValue(key="Name", value=self._path), - KeyValue(key="Status", value=DICT_STATUS[diag.level]), - KeyValue(key="Total (Go)", value=str(total_Go)), - KeyValue(key="Available (%)", value=str(round(percent, 2))), + KeyValue(key='Name', value=self._path), + KeyValue(key='Status', value=DICT_STATUS[diag.level]), + KeyValue(key='Total (Go)', value=str(total_go)), + KeyValue(key='Available (%)', value=str(round(percent, 2))), ] ) @@ -117,6 +136,7 @@ def check_disk_usage(self, diag: DiagnosticStatus) -> DiagnosticStatus: def main(args=None): + """Run the HDMonitor class.""" rclpy.init(args=args) node = HDMonitor() @@ -126,5 +146,5 @@ def main(args=None): pass -if __name__ == "__main__": +if __name__ == '__main__': main()