Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[HD Monitor] Fix pylint/flake8 errors (import order, docstrings, quot…
Browse files Browse the repository at this point in the history
…es, ...)
limaanto committed Jun 27, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 248a242 commit 0e97de9
Showing 1 changed file with 46 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#! /usr/bin/env python3
"""Hard Drive (or any other memory) monitor. Contains a the monitor node and its main function."""
# -*- coding: utf-8 -*-
#
# Software License Agreement (BSD License)
@@ -36,60 +37,78 @@
# \author Kevin Watts
# \author Antoine Lima

from typing import List
from pathlib import Path
from socket import gethostname
from shutil import disk_usage
from socket import gethostname
from typing import List

import rclpy
from diagnostic_msgs.msg import DiagnosticStatus, KeyValue
from diagnostic_updater import Updater
from rcl_interfaces.msg import SetParametersResult
import rclpy
from rclpy.node import Node
from diagnostic_updater import Updater
from diagnostic_msgs.msg import DiagnosticStatus, KeyValue


FREE_PERCENT_LOW = 0.05
FREE_PERCENT_CRIT = 0.01
DICT_STATUS = {
DiagnosticStatus.OK: "OK",
DiagnosticStatus.WARN: "Warning",
DiagnosticStatus.ERROR: "Error",
DiagnosticStatus.OK: 'OK',
DiagnosticStatus.WARN: 'Warning',
DiagnosticStatus.ERROR: 'Error',
}
DICT_USAGE = {
DiagnosticStatus.OK: "OK",
DiagnosticStatus.WARN: "Low Disk Space",
DiagnosticStatus.ERROR: "Very Low Disk Space",
DiagnosticStatus.OK: 'OK',
DiagnosticStatus.WARN: 'Low Disk Space',
DiagnosticStatus.ERROR: 'Very Low Disk Space',
}


class HDMonitor(Node):
"""Diagnostic node checking the remaining space on the specified hard drive.
Three ROS parameters:
- path: Path on the filesystem to check (string, default: home directory)
- free_percent_low: Percentage at which to consider the space left as low
- free_percent_crit: Percentage at which to consider the space left as critical
"""

def __init__(self):
hostname = gethostname().replace(".", "_").replace("-", "_")
super().__init__(f"hd_monitor_{hostname}")
hostname = gethostname().replace('.', '_').replace('-', '_')
super().__init__(f'hd_monitor_{hostname}')

self.add_on_set_parameters_callback(self.callback_config)
self.declare_parameter("path", "~")
self.declare_parameter("free_percent_low", 0.05)
self.declare_parameter("free_percent_crit", 0.01)
self.declare_parameter('path', '~')
self.declare_parameter('free_percent_low', 0.05)
self.declare_parameter('free_percent_crit', 0.01)
self._path = self._free_percent_low = self._free_percent_crit = None

self._updater = Updater(self)
self._updater.setHardwareID(hostname)
self._updater.add(f"{hostname} HD Usage", self.check_disk_usage)
self._updater.add(f'{hostname} HD Usage', self.check_disk_usage)

def callback_config(self, params: List[rclpy.Parameter]):
"""Retrieve ROS parameters.
see the class documentation for declared parameters.
"""
for param in params:
match param.name:
case "path":
case 'path':
self._path = str(
Path(param.value).expanduser().resolve(strict=True)
)
case "free_percent_low":
case 'free_percent_low':
self._free_percent_low = param.value
case "free_percent_crit":
case 'free_percent_crit':
self._free_percent_crit = param.value

return SetParametersResult(successful=True)

def check_disk_usage(self, diag: DiagnosticStatus) -> DiagnosticStatus:
"""Compute the disk usage and derive a status from it.
Task periodically ran by the diagnostic updater.
"""
diag.level = DiagnosticStatus.OK

total, _, free = disk_usage(self._path)
@@ -102,13 +121,13 @@ def check_disk_usage(self, diag: DiagnosticStatus) -> DiagnosticStatus:
else:
diag.level = DiagnosticStatus.ERROR

total_Go = total // (1024 * 1024)
total_go = total // (1024 * 1024)
diag.values.extend(
[
KeyValue(key="Name", value=self._path),
KeyValue(key="Status", value=DICT_STATUS[diag.level]),
KeyValue(key="Total (Go)", value=str(total_Go)),
KeyValue(key="Available (%)", value=str(round(percent, 2))),
KeyValue(key='Name', value=self._path),
KeyValue(key='Status', value=DICT_STATUS[diag.level]),
KeyValue(key='Total (Go)', value=str(total_go)),
KeyValue(key='Available (%)', value=str(round(percent, 2))),
]
)

@@ -117,6 +136,7 @@ def check_disk_usage(self, diag: DiagnosticStatus) -> DiagnosticStatus:


def main(args=None):
"""Run the HDMonitor class."""
rclpy.init(args=args)

node = HDMonitor()
@@ -126,5 +146,5 @@ def main(args=None):
pass


if __name__ == "__main__":
if __name__ == '__main__':
main()

0 comments on commit 0e97de9

Please sign in to comment.