Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add monitor for pins and reading loop for MCU #99

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 76 additions & 1 deletion OpenCatPythonAPI/PetoiRobot/ardSerial.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def serialWriteByte(port, var=None):
var.insert(1, var[0][1:])
var[1:] = list(map(int, var[1:]))
in_str = token.encode() + struct.pack('b' * (len(var) - 1), *var[1:]) + '~'.encode()
elif token == 'w' or token == 'k' or token == 'X':
elif token == 'w' or token == 'k' or token == 'X' or token == 'g':
in_str = var[0] + '\n'
else:
in_str = token + '\n'
Expand Down Expand Up @@ -859,6 +859,59 @@ def refreshBox(ls):
tk.messagebox.showwarning(title=txt('Warning'), message=txt('Manual mode'))
window.mainloop()

def monitoringVoltage(ports, VoltagePin, timer, callback):
while True and len(ports):
time.sleep(timer)
voltage = send(ports, ["R", [97, VoltagePin], 0])
if callback is not None:
callback(voltage)
else:
print("Current Voltage:" + str(voltage))

def monitoringDistance(ports, trigerPin, echoPin, timer, callback):
while True and len(ports):
time.sleep(timer)
distance = send(ports, ["XU", [trigerPin, echoPin], 0])
if callback is not None:
callback(distance)
else:
print("Current Distance:" + str(distance))

def monitoringJoint(ports, jointIndex, timer, callback):
while True and len(ports):
time.sleep(timer)
if jointIndex == 0:
angel = send(ports, ["j", jointIndex])
else:
angel = send(ports, ["j", [jointIndex], 0])
if callback is not None:
callback(angel)
else:
print("Current Angel:" + str(angel))

def read_MCU_loop(PortList, callback=None):
result = send(PortList, ['gP', 0])
print("send results " + str(result))
p = list(PortList.keys())
serialObject = p[0]
while True:
try:
if PortList:
data = serialObject.main_engine.readline()
if data:
try:
decoded_data = data.decode('ISO-8859-1').strip()
if callback is not None:
callback(decoded_data)
else:
print(str(decoded_data))
except Exception as e:
logger.error(f"Error decoding serial port data: {e}")
time.sleep(0.005) # avoid high CPU usage
except Exception as e:
logger.error(f"Error reading serial port data: {e}")
break

#if need to open serial port, use objects goodPorts
goodPorts = {} # goodPorts is a dictionary, the structure is {SerialPort Object(<class 'SerialCommunication.Communication'>): portName(string), ...}

Expand All @@ -870,11 +923,33 @@ def refreshBox(ls):
returnValue = ''
timePassed = 0

'''
# Monitor callback usage sample
def voltageHanle(voltage):
if voltage <0.5:
print("Low Power Warning") # do something to handle low power

def distanceHanle(distance):
if distance <0.5:
print("Small Distance Warning") # do something to handle small distance
'''


if __name__ == '__main__':
try:
connectPort(goodPorts)
t = threading.Thread(target=keepCheckingPort, args=(goodPorts,), daemon=True)
t.start()
t1=threading.Thread(target=read_MCU_loop, args=(goodPorts, None))
t1.start()
### Monitor Threads
# t_monitor_voltage = threading.Thread(target=monitoringVoltage, args=(goodPorts, 0xA7, 60, voltageHanle), daemon=True)
# t_monitor_voltage.start()
# t_monitor_distance = threading.Thread(target=monitoringDistance, args=(goodPorts, 16, 17, 0.5, distanceHanle), daemon=True)
# t_monitor_distance.start()
# t_monitor_joint = threading.Thread(target=monitoringJoint, args=(goodPorts, 0 , 0.5, None), daemon=True)
# t_monitor_joint.start()

if len(sys.argv) >= 2:
if len(sys.argv) == 2:
cmd = sys.argv[1]
Expand Down
77 changes: 76 additions & 1 deletion serialMaster/ardSerial.py
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,59 @@ def refreshBox(ls):
tk.messagebox.showwarning(title=txt('Warning'), message=txt('Manual mode'))
window.mainloop()

def monitoringVoltage(ports, VoltagePin, timer, callback):
while True and len(ports):
time.sleep(timer)
voltage = send(ports, ["R", [97, VoltagePin], 0])
if callback is not None:
callback(voltage)
else:
print("Current Voltage:" + str(voltage))

def monitoringDistance(ports, trigerPin, echoPin, timer, callback):
while True and len(ports):
time.sleep(timer)
distance = send(ports, ["XU", [trigerPin, echoPin], 0])
if callback is not None:
callback(distance)
else:
print("Current Distance:" + str(distance))

def monitoringJoint(ports, jointIndex, timer, callback):
while True and len(ports):
time.sleep(timer)
if jointIndex == 0:
angel = send(ports, ["j", jointIndex])
else:
angel = send(ports, ["j", [jointIndex], 0])
if callback is not None:
callback(angel)
else:
print("Current Angel:" + str(angel))

def read_MCU_loop(PortList, callback=None):
result = send(PortList, ['gP', 0])
print("send results " + str(result))
p = list(PortList.keys())
serialObject = p[0]
while True:
try:
if PortList:
data = serialObject.main_engine.readline()
if data:
try:
decoded_data = data.decode('ISO-8859-1').strip()
if callback is not None:
callback(decoded_data)
else:
print(str(decoded_data))
except Exception as e:
logger.error(f"Error decoding serial port data: {e}")
time.sleep(0.005) # avoid high CPU usage
except Exception as e:
logger.error(f"Error reading serial port data: {e}")
break

#if need to open serial port, use objects goodPorts
goodPorts = {} # goodPorts is a dictionary, the structure is {SerialPort Object(<class 'SerialCommunication.Communication'>): portName(string), ...}

Expand All @@ -928,11 +981,33 @@ def refreshBox(ls):
returnValue = ''
timePassed = 0

'''
# Monitor callback usage sample
def voltageHanle(voltage):
if voltage <0.5:
print("Low Power Warning") # do something to handle low power

def distanceHanle(distance):
if distance <0.5:
print("Small Distance Warning") # do something to handle small distance
'''


if __name__ == '__main__':
try:
connectPort(goodPorts)
t = threading.Thread(target=keepCheckingPort, args=(goodPorts,))
t = threading.Thread(target=keepCheckingPort, args=(goodPorts,), daemon=True)
t.start()
t1=threading.Thread(target=read_MCU_loop, args=(goodPorts, None), daemon=True)
t1.start()
### Monitor Threads
# t_monitor_voltage = threading.Thread(target=monitoringVoltage, args=(goodPorts, 0xA7, 60, voltageHanle), daemon=True)
# t_monitor_voltage.start()
# t_monitor_distance = threading.Thread(target=monitoringDistance, args=(goodPorts, 16, 17, 0.5, distanceHanle), daemon=True)
# t_monitor_distance.start()
# t_monitor_joint = threading.Thread(target=monitoringJoint, args=(goodPorts, 0 , 0.5, None), daemon=True)
# t_monitor_joint.start()

if len(sys.argv) >= 2:
if len(sys.argv) == 2:
cmd = sys.argv[1]
Expand Down