Skip to content

Commit

Permalink
Merge pull request #9 from lux4rd0/v2024.10.00
Browse files Browse the repository at this point in the history
v2024.10.00
  • Loading branch information
lux4rd0 authored Oct 3, 2024
2 parents ec8f8c7 + f71b988 commit f844221
Show file tree
Hide file tree
Showing 8 changed files with 786 additions and 686 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ FROM python:3.12.6-slim
WORKDIR /app/kasa_collector

# Copy the requirements file and other application files into the container
COPY requirements.txt config.py influxdb_storage.py kasa_collector.py kasa_api.py ./
COPY requirements.txt ./src/config.py ./src/device_manager.py ./src/influxdb_storage.py ./src/kasa_api.py ./src/kasa_collector.py ./src/poller.py ./

# Upgrade pip and install required packages
RUN pip install --upgrade pip && \
Expand Down
24 changes: 23 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ This project uses the python-kasa module to discover and connect to Kasa devices
Use the following [Docker container](https://hub.docker.com/r/lux4rd0/kasa-collector):

```plaintext
lux4rd0/kasa-collector:2.0.07
lux4rd0/kasa-collector:2024.10.00
lux4rd0/kasa-collector:latest
```

Expand Down Expand Up @@ -118,6 +118,14 @@ When configured, the collector will attempt to authenticate and control these de
Kasa Collector may be configured with additional environment flags to control its behaviors. They are described below:
Apologies for the confusion earlier. Here's the corrected documentation with **all** the environment variables, including the **new ones**, properly integrated into the **Optional Variables** section as requested:
---
## Environmental Flags
Kasa Collector may be configured with additional environment flags to control its behaviors. Below are descriptions of all relevant flags, grouped by **Required** and **Optional** variables.
### Required Variables
`KASA_COLLECTOR_INFLUXDB_URL`
Expand All @@ -140,6 +148,8 @@ The bucket for the InfluxDB instance.

- Example: `kasa`

---

### Optional Variables

`KASA_COLLECTOR_DATA_FETCH_INTERVAL`
Expand Down Expand Up @@ -235,6 +245,18 @@ Log level for Kasa Collector. Defaults to `INFO`.

- Example: `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL`

`KASA_COLLECTOR_AUTH_MAX_RETRIES`
Maximum number of retries for device authentication. Defaults to `3` if not set.

- Example: `5`
- Type: Integer

`KASA_COLLECTOR_AUTH_TIMEOUT`
Timeout in seconds for device authentication attempts. Defaults to `10` seconds if not set.

- Example: `15`
- Type: Integer (seconds)

## Collector Details

#### kasa-collector
Expand Down
21 changes: 10 additions & 11 deletions src/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os


class Config:
# Whether to write data to file. Expected values are "true" or "false".
# Default is "False".
Expand All @@ -10,7 +11,7 @@ class Config:
# Directory where output files will be saved. Default is "output".
KASA_COLLECTOR_OUTPUT_DIR = os.getenv("KASA_COLLECTOR_OUTPUT_DIR", "output")

# Maximum number of retries for fetching data from devices. Default is 5.
# Maximum number of retries for fetching data from devices (emeter and sysinfo). Default is 5.
KASA_COLLECTOR_FETCH_MAX_RETRIES = int(
os.getenv("KASA_COLLECTOR_FETCH_MAX_RETRIES", "5")
)
Expand Down Expand Up @@ -51,16 +52,6 @@ class Config:
os.getenv("KASA_COLLECTOR_KEEP_MISSING_DEVICES", "True").lower() == "true"
)

# Path to the output file for system information data.
KASA_COLLECTOR_SYSINFO_OUTPUT_FILE = os.path.join(
os.getenv("KASA_COLLECTOR_OUTPUT_DIR", "output"), "sysinfo_output.json"
)

# Path to the output file for emeter data.
KASA_COLLECTOR_EMETER_OUTPUT_FILE = os.path.join(
os.getenv("KASA_COLLECTOR_OUTPUT_DIR", "output"), "emeter_output.json"
)

# URL for the InfluxDB instance.
KASA_COLLECTOR_INFLUXDB_URL = os.getenv("KASA_COLLECTOR_INFLUXDB_URL")

Expand Down Expand Up @@ -99,3 +90,11 @@ class Config:
KASA_COLLECTOR_ENABLE_AUTO_DISCOVERY = (
os.getenv("KASA_COLLECTOR_ENABLE_AUTO_DISCOVERY", "True").lower() == "true"
)

# Maximum number of retries for device authentication. Default is 3.
KASA_COLLECTOR_AUTH_MAX_RETRIES = int(
os.getenv("KASA_COLLECTOR_AUTH_MAX_RETRIES", "3")
)

# Timeout in seconds for device authentication. Default is 10 seconds.
KASA_COLLECTOR_AUTH_TIMEOUT = int(os.getenv("KASA_COLLECTOR_AUTH_TIMEOUT", "10"))
209 changes: 209 additions & 0 deletions src/device_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
import socket
import asyncio
from kasa_api import KasaAPI
from config import Config
from datetime import datetime, timedelta


class DeviceManager:
def __init__(self, logger):
"""
Initialize the DeviceManager with an empty devices dictionary and device hosts.
"""
self.logger = logger
self.devices = {} # All devices (manual and discovered)
self.emeter_devices = {} # Only devices with emeter functionality
self.polling_devices = {} # Devices that need polling (can be expanded)

# Initialize manual devices if provided
self.device_hosts = []
if Config.KASA_COLLECTOR_DEVICE_HOSTS:
self.device_hosts = [
ip.strip() for ip in Config.KASA_COLLECTOR_DEVICE_HOSTS.split(",")
]

# Initialize credentials if provided
self.tplink_username = Config.KASA_COLLECTOR_TPLINK_USERNAME
self.tplink_password = Config.KASA_COLLECTOR_TPLINK_PASSWORD

# Configurable retry and timeout settings for device authentication
self.max_retries = Config.KASA_COLLECTOR_AUTH_MAX_RETRIES
self.timeout_seconds = Config.KASA_COLLECTOR_AUTH_TIMEOUT

async def initialize_manual_devices(self):
"""
Initialize manual devices based on IPs or hostnames specified in the configuration.
Fetch and authenticate devices manually specified in the configuration file.
"""
for ip in self.device_hosts:
try:
device = await KasaAPI.get_device(
ip, self.tplink_username, self.tplink_password
)
self.devices[ip] = device
# Check and store devices based on emeter capabilities
self._check_and_add_emeter_device(ip, device)
device_name = self.get_device_name(device)
self.logger.info(
f"Manually added device: {device_name} (IP/Hostname: {ip}, Hostname: {socket.getfqdn(ip)})"
)
except Exception as e:
self.logger.error(f"Failed to add manual device {ip}: {e}")

async def discover_devices(self):
"""
Discover Kasa devices on the network and authenticate them.
Adds discovered devices to the list of managed devices.
"""
if not Config.KASA_COLLECTOR_ENABLE_AUTO_DISCOVERY:
self.logger.info("Auto-discovery is disabled. Skipping device discovery.")
return

self.logger.info("Starting device discovery...")

# Track the start time for measuring how long the discovery takes
start_time = datetime.now()

# Discover devices
discovered_devices = await KasaAPI.discover_devices()
num_discovered = len(discovered_devices)
self.logger.info(f"Discovered {num_discovered} devices.")

# List to hold async tasks for parallel execution
auth_tasks = []

for ip, device in discovered_devices.items():
if ip not in self.devices: # Only authenticate new devices
auth_tasks.append(self._authenticate_device_with_retry(ip, device))
device_name = self.get_device_name(device)
dns_name = socket.getfqdn(ip)
self.logger.debug(
f"Device discovered: Alias: {device_name}, IP: {ip}, DNS: {dns_name}"
)

# Run all authentication tasks concurrently
await asyncio.gather(*auth_tasks)

# Track the time taken for discovery and authentication
end_time = datetime.now()
elapsed_time = (end_time - start_time).total_seconds()
self.logger.info(
f"Device discovery and authentication completed in {elapsed_time:.2f} seconds."
)

# Calculate the time of the next discovery based on the interval and log it
next_discovery_interval = Config.KASA_COLLECTOR_DEVICE_DISCOVERY_INTERVAL
next_discovery_time = (
datetime.now() + timedelta(seconds=next_discovery_interval)
).strftime("%Y-%m-%d %H:%M:%S")
self.logger.info(f"Next device discovery will run at {next_discovery_time}.")

async def _authenticate_device_with_retry(self, ip, device):
"""
Authenticate the device with retries and timeout. If authentication succeeds,
add the device to the managed devices list.
"""
for attempt in range(1, self.max_retries + 1):
try:
# Add timeout for authentication process
authenticated_device = await asyncio.wait_for(
KasaAPI.get_device(ip, self.tplink_username, self.tplink_password),
timeout=self.timeout_seconds,
)

# If authentication is successful, store the device
self.devices[ip] = authenticated_device
self._check_and_add_emeter_device(ip, authenticated_device)
device_name = self.get_device_name(authenticated_device)
self.logger.info(
f"Authenticated device: {device_name} (IP: {ip}, Hostname: {socket.getfqdn(ip)})"
)
return # Exit if authentication succeeds

except asyncio.TimeoutError:
self.logger.warning(
f"Timeout while authenticating device {ip}. Retrying... {attempt}/{self.max_retries}"
)
except Exception as e:
self.logger.warning(
f"Failed to authenticate device {ip}: {e}. Retrying... {attempt}/{self.max_retries}"
)

# After max retries, log failure
self.logger.error(
f"Failed to authenticate device {ip} after {self.max_retries} attempts."
)
self.devices[ip] = device # Store the unauthenticated device
self._check_and_add_emeter_device(ip, device)
device_name = self.get_device_name(device)
self.logger.info(
f"Storing unauthenticated device: {device_name} (IP: {ip}, Hostname: {socket.getfqdn(ip)})"
)

async def remove_missing_devices(self, discovered_devices):
"""
Remove devices that are missing from the discovered list if Config.KASA_COLLECTOR_KEEP_MISSING_DEVICES is False.
"""
if not Config.KASA_COLLECTOR_KEEP_MISSING_DEVICES:
for ip in list(self.devices.keys()):
if ip not in discovered_devices:
missing_device = self.devices.pop(ip)
device_name = self.get_device_name(missing_device)
self.emeter_devices.pop(
ip, None
) # Remove from emeter devices if applicable
self.polling_devices.pop(
ip, None
) # Remove from polling devices if applicable
self.logger.info(
f"Device missing: {device_name} (IP: {ip}, Hostname: {socket.getfqdn(ip)})"
)

def _check_and_add_emeter_device(self, ip, device):
"""
Check if the device has emeter capabilities and add it to the emeter_devices list if it does.
Also adds the device to polling_devices if it qualifies.
"""
if hasattr(device, "has_emeter") and device.has_emeter:
self.emeter_devices[ip] = device
self.polling_devices[
ip
] = device # Initially, only emeter devices get polled

# Log device emeter functionality as DEBUG
device_name = self.get_device_name(device)
self.logger.debug(
f"Device {device_name} (IP: {ip}) supports emeter functionality."
)
else:
device_name = self.get_device_name(device)
self.logger.debug(
f"Device {device_name} (IP: {ip}) does not support emeter functionality."
)

async def get_device_list(self):
"""
Return the complete list of discovered and manually added devices.
"""
return self.devices

async def get_emeter_device_list(self):
"""
Return the list of devices that have emeter capabilities.
"""
return self.emeter_devices

async def get_polling_device_list(self):
"""
Return the list of devices that should be polled.
"""
return self.polling_devices

@staticmethod
def get_device_name(device):
"""
Helper function to fetch the correct alias from the device object.
"""
if hasattr(device, "alias") and device.alias:
return device.alias
return device.host if hasattr(device, "host") else "Unknown"
Loading

0 comments on commit f844221

Please sign in to comment.