-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #31 from astrobytec/main
feat: Add cativity
- Loading branch information
Showing
11 changed files
with
1,570 additions
and
0 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
# Cativity Detector | ||
CativityDetector is a tool designed to analyze channel activity in Zigbee networks. It uses a CatSniffer device to monitor Zigbee channels, captures packets, and displays data related to network activity. The tool also provides graphical representations of the activity, helping users analyze the traffic over various Zigbee channels. | ||
|
||
## Installation | ||
To use *CatitivityDetector*, yo must have the required dependencies installed. You can install them with `pip`: | ||
```shell | ||
pip install -r requirements.txt | ||
``` | ||
You will also need to connect the CatSniffer device to your computer, as it is the primary hardware used by this tool. | ||
|
||
## Usage | ||
You can run the tool via the command line: | ||
```shell | ||
python cativity.py catsniffer_path [options] | ||
``` | ||
**Options**: | ||
|
||
- **catsniffer**: The serial path to the CatSniffer device. The default path is automatically detected. | ||
- **channel**: The Zigbee channel to start sniffing on. If not provided, the tool will hop through channels 11 to 26. | ||
- **topology**: Show the topology of the network. | ||
|
||
> [!NOTE] | ||
> The automatically function may fail in some cases where the operative systems not recognize the vendor ID, and if you have two catnsniffer connected, the first one will be the returned port | ||
### Examples | ||
Run with a automatucally detect catsniffer and hopping channel: | ||
```shell | ||
python cativity.py | ||
|
||
____ _ _ _ _ ____ _ _ | ||
/ ___|__ _| |_(_)_ _(_) |_ _ _| _ \ ___| |_ ___ ___| |_ ___ _ __ | ||
| | / _` | __| \ \ / / | __| | | | | | |/ _ \ __/ _ \/ __| __/ _ \| '__| | ||
| |__| (_| | |_| |\ V /| | |_| |_| | |_| | __/ || __/ (__| || (_) | | | ||
\____\__,_|\__|_| \_/ |_|\__|\__, |____/ \___|\__\___|\___|\__\___/|_| | ||
|___/ | ||
A tool to analyze the channel activity fro Zigbee Networks | ||
Author: astrobyte | ||
Version: 1.0 | ||
Channel Activity | ||
┏━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━┓ | ||
┃ Current ┃ Channel ┃ Activity ┃ Packets ┃ | ||
┡━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━┩ | ||
│ │ 11 │ │ 0 │ | ||
│ │ 12 │ ❚ │ 1 │ | ||
│ │ 13 │ │ 0 │ | ||
│ │ 14 │ │ 0 │ | ||
│ │ 15 │ │ 0 │ | ||
│ │ 16 │ │ 0 │ | ||
│ │ 17 │ ❚ │ 1 │ | ||
│ │ 18 │ │ 0 │ | ||
│ │ 19 │ │ 0 │ | ||
│ ----> │ 20 │ │ 0 │ | ||
│ │ 21 │ │ 0 │ | ||
│ │ 22 │ │ 0 │ | ||
│ │ 23 │ │ 0 │ | ||
│ │ 24 │ │ 0 │ | ||
│ │ 25 │ ❚❚ │ 2 │ | ||
│ │ 26 │ ❚❚ │ 2 │ | ||
└─────────┴─────────┴──────────┴─────────┘ | ||
Channel Hopping Activity | ||
``` | ||
Run with a explicit path and fixed channel | ||
```shell | ||
python cativity.py /dev/ttyACM0 --channel 25 | ||
____ _ _ _ _ ____ _ _ | ||
/ ___|__ _| |_(_)_ _(_) |_ _ _| _ \ ___| |_ ___ ___| |_ ___ _ __ | ||
| | / _` | __| \ \ / / | __| | | | | | |/ _ \ __/ _ \/ __| __/ _ \| '__| | ||
| |__| (_| | |_| |\ V /| | |_| |_| | |_| | __/ || __/ (__| || (_) | | | ||
\____\__,_|\__|_| \_/ |_|\__|\__, |____/ \___|\__\___|\___|\__\___/|_| | ||
|___/ | ||
A tool to analyze the channel activity fro Zigbee Networks | ||
Author: astrobyte | ||
Version: 1.0 | ||
Channel Activity | ||
┏━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━┓ | ||
┃ Current ┃ Channel ┃ Activity ┃ Packets ┃ | ||
┡━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━┩ | ||
│ ----> │ 25 │ ❚❚ │ 2 │ | ||
└─────────┴─────────┴──────────┴─────────┘ | ||
Channel Hopping Activity | ||
``` | ||
Run a topology recognize | ||
```shell | ||
python cativity.py /dev/ttyACM0 --channel 25 --topology | ||
____ _ _ _ _ ____ _ _ | ||
/ ___|__ _| |_(_)_ _(_) |_ _ _| _ \ ___| |_ ___ ___| |_ ___ _ __ | ||
| | / _` | __| \ \ / / | __| | | | | | |/ _ \ __/ _ \/ __| __/ _ \| '__| | ||
| |__| (_| | |_| |\ V /| | |_| |_| | |_| | __/ || __/ (__| || (_) | | | ||
\____\__,_|\__|_| \_/ |_|\__|\__, |____/ \___|\__\___|\___|\__\___/|_| | ||
|___/ | ||
A tool to analyze the channel activity fro Zigbee Networks | ||
Author: astrobyte | ||
Version: 1.0 | ||
Network Topology - 0 | ||
┏━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┓ | ||
┃ Children ┃ Ext. Source ┃ | ||
┡━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━┩ | ||
│ 0x3fba │ 74:4d:bd:ff:fe:60:30:fd │ | ||
└──────────┴─────────────────────────┘ | ||
Zigbee Network Topology | ||
``` | ||
## Functionality | ||
- Sniffing Zigbee Channels: The tool listens to Zigbee channels (11 to 26) and collects packet data. It can either hop between channels automatically or remain fixed on a user-specified channel. | ||
- Channel Hopping: The tool hops between Zigbee channels with a default interval of 3.5 seconds. It collects and analyzes packet data for each channel. | ||
- Data Collection: As packets are received, the tool processes them using the Sniffer class, which decodes the Zigbee frames. It uses the TISnifferPacket class to handle packet payloads. | ||
- Graphing Activity: The tool visualizes the channel activity using the Graphs class. It continuously updates the graph based on the number of packets received for each channel. | ||
- Topology: Show the network childs of the network as the packet will be detected. | ||
- Threading: The tool runs two background threads: | ||
- One for handling the channel hopping and activity collection. | ||
- One for updating and displaying the graphical representation of the channel activity. | ||
- Logging: The tool logs key events and errors to both the console and a log file (catbee.log). The default logging level is set to "WARNING", but this can be adjusted in the logging configuration. | ||
# Acknowledgements | ||
Special thanks to @kevlem97 for the catbee repository, which served as the foundation for this project. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
""" | ||
CativityDetector - A tool to analyze the channel activity for Zigbee Networks | ||
Developed by astrobyte | ||
Thanks to @kevlem97 for the catbee repository, which was used as a reference for this project. | ||
GNU General Public License v3.0 | ||
Usage: cativity.py catsniffer [options] | ||
""" | ||
import os | ||
import sys | ||
import logging | ||
import threading | ||
import queue | ||
import time | ||
import typer | ||
from modules.utils import UsageError | ||
from modules.catsniffer import Sniffer, TISnifferPacket | ||
from modules.graphs import Graphs | ||
from modules.network import Network | ||
|
||
CHANNEL_HOPPING_INTERVAL = 3.5 | ||
SCRIPT_NAME = os.path.basename(sys.argv[0]) | ||
|
||
logging.basicConfig( | ||
handlers=[logging.FileHandler("cativity.log"), logging.StreamHandler()], | ||
level="WARNING", | ||
format="%(asctime)s %(levelname)-8s %(message)s", | ||
datefmt="%Y-%m-%d %H:%M:%S", | ||
) | ||
|
||
|
||
class Cativity: | ||
def __init__(self): | ||
self.app = typer.Typer( | ||
name="CativityDetector", | ||
help="A tool to analyze the channel activity for Zigbee Networks", | ||
add_completion=False, | ||
no_args_is_help=True, | ||
epilog="Hack the Bee!", | ||
) | ||
self.app.command()(self.main) | ||
self.logger = logging.getLogger(SCRIPT_NAME) | ||
|
||
self.catsniffer = Sniffer(logger=logging.getLogger("CatSniffer")) | ||
self.grapher = Graphs() | ||
self.network = Network() | ||
self.capture_started = True | ||
self.packet_received = queue.Queue() | ||
self.channel_activity = {} | ||
self.fixed_channel = False | ||
self.__init_channel_map() | ||
|
||
def __init_channel_map(self): | ||
for channel in range(11, 27): | ||
self.channel_activity[channel] = 0 | ||
|
||
def __print_banner(self): | ||
typer.secho( | ||
""" | ||
____ _ _ _ _ ____ _ _ | ||
/ ___|__ _| |_(_)_ _(_) |_ _ _| _ \ ___| |_ ___ ___| |_ ___ _ __ | ||
| | / _` | __| \ \ / / | __| | | | | | |/ _ \ __/ _ \/ __| __/ _ \| '__| | ||
| |__| (_| | |_| |\ V /| | |_| |_| | |_| | __/ || __/ (__| || (_) | | | ||
\____\__,_|\__|_| \_/ |_|\__|\__, |____/ \___|\__\___|\___|\__\___/|_| | ||
|___/ | ||
""", | ||
fg=typer.colors.BRIGHT_YELLOW, | ||
) | ||
typer.secho( | ||
"A tool to analyze the channel activity fro Zigbee Networks", | ||
fg=typer.colors.BRIGHT_CYAN, | ||
) | ||
typer.secho("Author: astrobyte", fg=typer.colors.BRIGHT_CYAN) | ||
typer.secho("Version: 1.0", fg=typer.colors.BRIGHT_CYAN) | ||
typer.secho("\n") | ||
|
||
def channel_handler(self): | ||
while self.capture_started: | ||
if self.fixed_channel: | ||
if len(self.packet_received.queue) > 0: | ||
self.channel_activity[self.catsniffer.channel] = len( | ||
self.packet_received.queue | ||
) | ||
self.grapher.update_graph_value(self.channel_activity) | ||
else: | ||
for channel in range(11, 27): | ||
self.catsniffer.change_channel(channel) | ||
self.grapher.update_channel(channel) | ||
time.sleep(CHANNEL_HOPPING_INTERVAL) | ||
if self.channel_activity[channel] == 0: | ||
self.channel_activity[channel] = len(self.packet_received.queue) | ||
else: | ||
self.channel_activity[channel] += len( | ||
self.packet_received.queue | ||
) | ||
|
||
self.grapher.update_graph_value(self.channel_activity) | ||
self.packet_received.queue.clear() | ||
|
||
def main( | ||
self, | ||
catsniffer: str = typer.Argument( | ||
help="Serial path to the CatSniffer", | ||
default=Sniffer.find_catsniffer_serial_port(), | ||
), | ||
channel: int = typer.Option( | ||
None, help="Channel to start the sniffer", show_default=True | ||
), | ||
topology: bool = typer.Option( | ||
False, help="Show the network topology", show_default=True | ||
), | ||
): | ||
if catsniffer is None: | ||
raise UsageError("Please provide the serial path to the CatSniffer") | ||
self.catsniffer.set_serial_path(catsniffer) | ||
|
||
if channel is not None: | ||
if channel < 11 or channel > 26: | ||
raise UsageError( | ||
"Invalid channel. Please provide a channel between 11 and 26" | ||
) | ||
self.catsniffer.set_channel(channel) | ||
if not topology: | ||
self.grapher.update_channel(channel) | ||
self.fixed_channel = True | ||
self.channel_activity = {} | ||
self.channel_activity[channel] = 0 | ||
|
||
self.__print_banner() | ||
self.catsniffer.start_sniffer() | ||
|
||
if topology: | ||
typer.secho( | ||
"Starting network topology analysis...", fg=typer.colors.BRIGHT_YELLOW | ||
) | ||
typer.secho( | ||
f"Channel sniffing: {self.catsniffer.channel}\n", | ||
fg=typer.colors.BRIGHT_YELLOW, | ||
) | ||
topology_threat = threading.Thread( | ||
target=self.grapher.create_topology_graph, daemon=True | ||
) | ||
topology_threat.start() | ||
else: | ||
channel_threat = threading.Thread(target=self.channel_handler, daemon=True) | ||
channel_threat.start() | ||
grapher_threat = threading.Thread( | ||
target=self.grapher.create_channel_graph, daemon=True | ||
) | ||
grapher_threat.start() | ||
|
||
self.grapher.update_graph_value(self.channel_activity) | ||
|
||
while self.capture_started: | ||
packet = self.catsniffer.recv() | ||
if packet is not None: | ||
tisniffer_packet = TISnifferPacket(packet) | ||
if tisniffer_packet.is_command_response(): | ||
continue | ||
self.packet_received.put(tisniffer_packet.payload) | ||
if topology: | ||
dissected_packet = self.network.dissect_packet( | ||
tisniffer_packet.payload | ||
) | ||
if dissected_packet is not None: | ||
self.grapher.update_topology_packets(dissected_packet) | ||
|
||
if topology: | ||
topology_threat.join() | ||
else: | ||
channel_threat.join() | ||
self.grapher.stop() | ||
grapher_threat.join() | ||
|
||
def stop(self): | ||
self.capture_started = False | ||
self.catsniffer.stop_sniffer() | ||
typer.secho("\nExiting...", fg=typer.colors.BRIGHT_RED) | ||
typer.secho("Happy Hacking!", fg=typer.colors.BRIGHT_YELLOW) | ||
|
||
|
||
if __name__ == "__main__": | ||
catbee = Cativity() | ||
try: | ||
catbee.app() | ||
except UsageError as e: | ||
os._exit(1) | ||
except KeyboardInterrupt: | ||
catbee.stop() | ||
os._exit(0) | ||
except Exception as e: | ||
typer.echo(f"Error: {e}") | ||
os._exit(1) |
Empty file.
Oops, something went wrong.