forked from mayeranalytics/pySX127x
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbeacon.py
130 lines (92 loc) · 3.26 KB
/
beacon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python
""" Beacon Class """
""" Thomas Verbeke - [email protected]
Send out data + counter value
Tested with Embed LoRa Module (SX1276 Semtech chip)
"""
import sys
from time import sleep
from SX127x.LoRa import *
from SX127x.board_config import BOARD, GPIO
BOARD.setup() # setup GPIO's
class LoRaBeacon(LoRa):
tx_counter = 0
def __init__(self, verbose=False):
super(LoRaBeacon, self).__init__(verbose)
self.set_mode(MODE.SLEEP)
self.set_dio_mapping([1,0,0,0,0,0]) #DIO0 is set to TxDone
""" DIO Mapping DIO5 DIO4 DIO3 DIO2 DIO1 DIO0
ModeReady CadDetected CadDone FhssChangeChannel RxTimeout TxDone
"""
def on_rx_done(self): # will not be called trough DIO (because it is mapped to TxDone)
print("\n(RxDone) Packet Received")
print(self.get_irq_flags())
print('num bytes payload', self.get_rx_nb_bytes())
payload = self.read_payload(nocheck=True)
print('Payload:', payload,map(hex, payload))
decoded_msg_str = ''.join(map(chr,payload)) # convert byte to string and join
print (decoded_msg_str)
def on_tx_done(self):
global msg
print("\n(TxDone) Interrupt generated")
self.tx_counter +=1
sleep(2)
self.set_mode(MODE.STDBY)
sleep(0.001)
self.clear_irq_flag_TxDone()
sys.stdout.flush()
b_msg = list(bytearray(msg,'utf-8'))
self.set_payload_length(len(b_msg)) # set message length
base_addr = self.get_fifo_tx_base_addr()
self.set_fifo_addr_ptr(base_addr)
self.spi.xfer([REG.LORA.FIFO | 0x80] + b_msg)[1:]
print "Sending Message %s" % self.tx_counter
self.set_mode(MODE.TX) # send Message
def on_cad_done(self):
print("\non_CadDone")
print(self.get_irq_flags())
def on_rx_timeout(self):
print("\non_RxTimeout")
print(self.get_irq_flags())
def on_valid_header(self):
print("\non_ValidHeader")
print(self.get_irq_flags())
def on_payload_crc_error(self):
print("\non_PayloadCrcError")
print(self.get_irq_flags())
def on_fhss_change_channel(self):
print("\non_FhssChangeChannel")
print(self.get_irq_flags())
def start(self):
global msg
sys.stdout.write("\rstart")
self.tx_counter = 0
b_msg = list(bytearray(msg,'utf-8'))
self.set_payload_length(len(b_msg)) # set message length
base_addr = self.get_fifo_tx_base_addr()
self.set_fifo_addr_ptr(base_addr)
self.spi.xfer([REG.LORA.FIFO | 0x80] + b_msg)[1:]
print "\rSending Message %s" % self.tx_counter
self.set_mode(MODE.TX) # send Message
while True:
sleep(0.5)
msg = "Hello"
lora = LoRaBeacon(verbose=False)
lora.set_pa_config(pa_select=0)
lora.set_coding_rate(CODING_RATE.CR4_6)
lora.set_implicit_header_mode(False)
print(lora)
try: input("Press enter to start...")
except: pass
try:
lora.start()
except KeyboardInterrupt:
sys.stdout.flush()
print("")
sys.stderr.write("KeyboardInterrupt\n")
finally:
sys.stdout.flush() # Calling sys.stdout.flush() forces it to "flush" the buffer, meaning that it will write everything in the buffer to the terminal
print("")
lora.set_mode(MODE.SLEEP)
print(lora)
BOARD.teardown() # Cleanup GPIO and SpiDev