Skip to content

Commit

Permalink
feat: Start Lora Spectrum
Browse files Browse the repository at this point in the history
  • Loading branch information
JahazielLem committed Jan 20, 2025
1 parent 4136116 commit c6cea72
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@
*.pyc
*.hex
*.txt
build/
*.egg-info
101 changes: 101 additions & 0 deletions catlora_sdr/catlora.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import sys
import time
import threading
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from modules.hardware import Board as Catsniffer

DEFAULT_COLOR_MAP = "viridis"
DEFAULT_RSSI_OFFSET = -11
SCAN_WIDTH = 33 # Asumiendo que hay 33 muestras por escaneo

DEFAULT_START_FERQ = 868
DEFAULT_END_FREQ = 928

class SpectrumScan:
def __init__(self):
self.catsniffer = Catsniffer()
self.recv_running = False
self.no_bytes_count = 0
self.fig, self.ax = plt.subplots(figsize=(12, 6))
self.frequency_list = []
self.data_matrix = np.zeros((33, 600)) # Ajusta el tamaño según tu necesidad
self.im = None
self.start_freq = DEFAULT_START_FERQ
self.end_freq = DEFAULT_END_FREQ
self.num_ticks = 50

def __data_dissector(self, plot_data):
raw_data = plot_data[len("@S"):-len("@E")]
freq, data = raw_data.split(";")
freq = float(freq)
if freq not in self.frequency_list:
self.frequency_list.append(freq)

data = list(map(int, data.split(",")[:-1]))
index = int(round((freq - self.start_freq) / (self.end_freq - self.start_freq) * (self.num_ticks*12 - 1)))
self.data_matrix[:, index] = data

def stop_task(self):
self.recv_running = False

def recv_task(self):
print("Collecting")
while self.recv_running:
bytestream = self.catsniffer.recv()
if bytestream == b"":
self.no_bytes_count += 1
if self.no_bytes_count > 5:
self.no_bytes_count = 0
print("No com")
self.stop_task()
continue

bytestream = bytestream.decode().strip()
if bytestream.startswith("@S"):
print(f"> {bytestream}")
self.__data_dissector(bytestream)

def create_plot(self):
self.ax.set_ylabel('RSSI [dBm]')
self.ax.set_xlabel('Frequency (Hz)')
self.ax.set_xlim(self.start_freq-5, self.end_freq)
precise_ticks = np.linspace(self.start_freq, self.end_freq, self.num_ticks)
self.ax.set_xticks(precise_ticks)
self.ax.set_xticklabels([f"{tick:.1f}" for tick in precise_ticks], rotation=45)
self.ax.set_aspect('auto')
self.fig.suptitle("Catsniffer LoRa Spectral Scan")
self.fig.canvas.manager.set_window_title("PWNLabs - ElectroniCats")
self.im = self.ax.imshow(self.data_matrix, cmap=DEFAULT_COLOR_MAP, aspect='auto', extent=[self.start_freq, self.end_freq, -4*34, DEFAULT_RSSI_OFFSET])
self.fig.colorbar(self.im)

def show_plot(self, i):
self.im.set_data(self.data_matrix)
self.ax.relim()
self.ax.autoscale_view()

def main(self):
self.recv_running = True
serial_path = self.catsniffer.find_catsniffer_serial_port()
print(serial_path)
self.catsniffer.set_serial_path(serial_path)
recv_worker = threading.Thread(target=self.recv_task, daemon=True)
recv_worker.start()
self.create_plot()
ani = animation.FuncAnimation(self.fig, self.show_plot, interval=100)
plt.show()
while self.recv_running:
time.sleep(0.1)

recv_worker.join()
self.catsniffer.close()
print("Exited")

if __name__ == "__main__":
sc = SpectrumScan()
try:
sc.main()
except KeyboardInterrupt:
sc.stop_task()
sys.exit(0)
Empty file added catlora_sdr/modules/__init__.py
Empty file.
90 changes: 90 additions & 0 deletions catlora_sdr/modules/hardware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import serial
import logging
import platform
import serial.tools.list_ports

BOARD_BAUDRATE = 921600
CATSNIFFER_VID = 11914
CATSNIFFER_PID = 192

if platform.system() == "Windows":
DEFAULT_COMPORT = "COM1"
elif platform.system() == "Darwin":
DEFAULT_COMPORT = "/dev/tty.usbmodem0001"
else:
DEFAULT_COMPORT = "/dev/ttyACM0"

class SerialError(Exception):
pass

class Board:
def __init__(self):
self.serial_worker = serial.Serial()
self.serial_path = None
self.serial_worker.baudrate = BOARD_BAUDRATE
self.serial_worker.timeout = 2

def __del__(self):
self.serial_worker.close()

def __str__(self):
return "Board(serial_path={}, baudrate={})".format(
self.serial_path, self.serial_worker.baudrate
)

def set_serial_path(self, serial_path):
self.serial_path = serial_path
self.serial_worker.port = self.serial_path

def set_serial_baudrate(self, baudrate):
self.serial_worker.baudrate = baudrate

def reset_buffer(self):
self.serial_worker.reset_input_buffer()
self.serial_worker.reset_output_buffer()

def write(self, data):
if self.serial_worker.is_open:
self.serial_worker.write(data)

def is_connected(self):
return self.serial_worker.is_open

def open(self) -> bool:
if not self.is_connected():
try:
self.serial_worker.open()
self.reset_buffer()
except serial.SerialException as e:
print("Error opening serial port: %s", e)
raise SerialError("Error opening serial port")

def close(self):
if self.is_connected():
try:
self.reset_buffer()
self.serial_worker.close()
except serial.SerialException as e:
print("Error closing serial port: %s", e)
raise e

def recv(self):
if not self.is_connected():
self.open()

try:
bytestream = self.serial_worker.readline()
return bytestream
except serial.SerialTimeoutException:
return "TIMEOUT"
except serial.SerialException as e:
print("Error reading from serial port: %s", e)
raise e

@staticmethod
def find_catsniffer_serial_port():
ports = serial.tools.list_ports.comports()
for port in ports:
if port.vid == CATSNIFFER_VID and port.pid == CATSNIFFER_PID:
return port.device
return DEFAULT_COMPORT

0 comments on commit c6cea72

Please sign in to comment.