Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: tapo discovery #168

Merged
merged 2 commits into from
Dec 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions plugp100/discovery/arp_lookup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import asyncio
import functools
import ipaddress
import logging
from math import floor
from typing import Optional

from scapy.layers.l2 import Ether, ARP
from scapy.sendrecv import srp

from plugp100.common.functional.tri import Try, Failure

logger = logging.getLogger(__name__)


class ArpLookup:
@staticmethod
async def lookup(
mac: str,
network: str,
timeout: Optional[int] = None,
allow_promiscuous: bool = True,
) -> Try[Optional[str]]:
"""
Perform an ARP scan on a network to find the IP address associated with a given MAC address.
Args:
mac (str): The target MAC address for which to find the corresponding IP address.
network (str): The network IP range to scan, e.g., "192.168.1.0/24".
timeout (int): Timeout to wait for ARP response
Returns:
Optional[str]: The IP address corresponding to the provided MAC address if found, or None if not found.
Note:
This function disables promiscuous mode, sends an ARP request to the specified network using the provided
MAC address, and captures responses to identify the IP address associated with the MAC address.
It constructs ARP packets using the Scapy library.
Example:
network = "192.168.1.0/24"
target_mac = "00:11:22:33:44:55"
ip_address = LocalDeviceFinder.scan_one(network, target_mac)
if ip_address:
print(f"The IP address of {target_mac} is {ip_address}")
else:
print(f"No IP address found for {target_mac}")
"""
try:
from scapy.all import conf as scapyconf

scapyconf.sniff_promisc = 1 if allow_promiscuous else 0
arp_request = Ether(dst=mac) / ARP(pdst=network)
timeout = ArpLookup._estimate_timeout(network) if timeout is None else timeout
result, _ = await asyncio.get_event_loop().run_in_executor(
None,
functools.partial(srp, arp_request, timeout=timeout, verbose=False),
)
for sent, received in result:
return Try.of(received[ARP].psrc)
return Try.of(None)
except Exception as e:
logger.warning("Failed to scan network %s error: %s", network, str(e))
return Failure(e)

@staticmethod
def _estimate_timeout(network: str) -> int:
network = ipaddress.IPv4Network(network, strict=False)
ip_count = network.num_addresses
return floor((ip_count / 128) * 1)


def run_async(f):
@functools.wraps(f)
def inner(*args, **kwargs):
loop = asyncio.get_running_loop()
return loop.run_in_executor(None, lambda: f(*args, **kwargs))

return inner
44 changes: 44 additions & 0 deletions plugp100/discovery/discovered_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import dataclasses
from typing import Optional, Any


@dataclasses.dataclass
class DiscoveredDevice:
device_type: str
device_model: str
ip: str
mac: str
mgt_encrypt_schm: "EncryptionScheme"

device_id: Optional[str] = None
owner: Optional[str] = None
hw_ver: Optional[str] = None
is_support_iot_cloud: Optional[bool] = None
obd_src: Optional[str] = None
factory_default: Optional[bool] = None

@staticmethod
def from_dict(values: dict[str, Any]) -> "DiscoveredDevice":
return DiscoveredDevice(
device_type=values.get("device_type", values.get("device_type_text")),
device_model=values.get("device_model", values.get("model")),
ip=values.get("ip", values.get("alias")),
mac=values.get("mac"),
device_id=values.get("device_id", values.get("device_id_hash", None)),
owner=values.get("owner", values.get("device_owner_hash", None)),
hw_ver=values.get("hw_ver", None),
is_support_iot_cloud=values.get("is_support_iot_cloud", None),
obd_src=values.get("obd_src", None),
factory_default=values.get("factory_default", None),
mgt_encrypt_schm=EncryptionScheme(**values.get("mgt_encrypt_schm")),
)


@dataclasses.dataclass
class EncryptionScheme:
"""Base model for encryption scheme of discovery result."""

is_support_https: Optional[bool] = None
encrypt_type: Optional[str] = None
http_port: Optional[int] = None
lv: Optional[int] = 1
131 changes: 0 additions & 131 deletions plugp100/discovery/local_device_finder.py

This file was deleted.

Loading