Skip to content

Commit

Permalink
style fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ArendJan committed Jun 5, 2024
1 parent 29d2bb1 commit ccc0196
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 49 deletions.
21 changes: 18 additions & 3 deletions mirte_telemetrix/scripts/ROS_telemetrix_aio_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
# devices = rospy.get_param("mirte/device")

global_data = {
"current_soc" : "???" # TODO: change to something better, but for now we communicate SOC from the powerwatcher to the Oled using a global
"current_soc": "???" # TODO: change to something better, but for now we communicate SOC from the powerwatcher to the Oled using a global
}


# Until we update our own fork of TelemtrixAIO to the renamed pwm calls
# we need to add a simple wrapper
async def set_pin_mode_analog_output(board, pin):
Expand Down Expand Up @@ -783,9 +784,23 @@ def actuators(loop, board, device):
if "name" not in oled_settings:
oled_settings["name"] = oled
if "type" in oled_settings and oled_settings["type"] == "module":
oled_obj = Oled.Oled_module(board, oled, oleds[oled], board_mapping, global_data=global_data, loop=loop )
oled_obj = Oled.Oled_module(
board,
oled,
oleds[oled],
board_mapping,
global_data=global_data,
loop=loop,
)
else:
oled_obj = Oled.Oled(board,oled, oleds[oled],board_mapping, global_data=global_data, port=oled_id, loop=loop
oled_obj = Oled.Oled(
board,
oled,
oleds[oled],
board_mapping,
global_data=global_data,
port=oled_id,
loop=loop,
) # get_pin_numbers(oleds[oled]))
oled_id = oled_id + 1
servers.append(loop.create_task(oled_obj.start()))
Expand Down
101 changes: 55 additions & 46 deletions mirte_telemetrix/scripts/modules/Oled.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
from mirte_msgs.srv import SetOLEDImage, SetOLEDImageRequest, SetOLEDImageResponse
import subprocess
import asyncio

# Extended adafruit _SSD1306


class Oled_interface:
async def start_int(self):
self.server = rospy.Service(
Expand All @@ -25,6 +27,7 @@ async def start_int(self):
self.default_image = True
self.default_timer = rospy.Timer(rospy.Duration(10), self.show_default)
await self.show_default_async()

async def show_default_async(self):
if not self.enabled:
self.default_timer.shutdown()
Expand All @@ -50,7 +53,7 @@ async def show_default_async(self):
text += f"\nSOC: {self.global_data['current_soc']}%"
if "show_time" in self.oled_obj and self.oled_obj["show_time"]:
any_show = True
date = subprocess.getoutput('date +\"%Y-%m-%dT%H:%M:%S%:::z\"').strip()
date = subprocess.getoutput('date +"%Y-%m-%dT%H:%M:%S%:::z"').strip()
text += f"\n{date}"
if len(text) > 0:
await self.set_oled_image_service_async(
Expand All @@ -61,11 +64,14 @@ async def show_default_async(self):
await self.show_png(
"/usr/local/src/mirte/mirte-oled-images/images/mirte_logo_inv.png"
) # open color image
except Exception as e: # probably that that image does not exist or the oled is broken.
except (
Exception
) as e: # probably that that image does not exist or the oled is broken.
pass

async def set_oled_image_service_async(self, req):
if req.type =="text":
return await self.set_oled_text_async(req.value.replace('\\n', '\n'))
if req.type == "text":
return await self.set_oled_text_async(req.value.replace("\\n", "\n"))
if req.type == "image":
return await self.show_png(
"/usr/local/src/mirte/mirte-oled-images/images/" + req.value + ".png"
Expand All @@ -83,13 +89,15 @@ async def set_oled_image_service_async(self, req):
]
)
for i in range(number_of_images):
if not (await self.show_png(folder + req.value + "_" + str(i) + ".png")):
if not (
await self.show_png(folder + req.value + "_" + str(i) + ".png")
):
return False
return True


async def set_oled_text_async(self, text):
print("todo setoled text async")

def set_oled_image_service(self, req):
self.default_image = False
if not self.enabled:
Expand All @@ -107,8 +115,10 @@ def set_oled_image_service(self, req):
except Exception as e:
print(e)
return False

async def start(self):
print("todo start")

def show_default(self, event=None):
if not self.default_image:
self.default_timer.shutdown()
Expand All @@ -121,30 +131,33 @@ def show_default(self, event=None):
)
except Exception as e:
print(e)

async def show_png(self, file):
print("todo show png")


# class Oled_module:


class Oled_module(Oled_interface):
def __init__(
self,
board,
module_name,
module,
board_mapping,
loop,
global_data, # obj with some shared data, like SOC
addr=0x3C, # not yet possible with modules
external_vcc=False,
reset=None,
):
self,
board,
module_name,
module,
board_mapping,
loop,
global_data, # obj with some shared data, like SOC
addr=0x3C, # not yet possible with modules
external_vcc=False,
reset=None,
):
self.board = board
self.oled_obj = module
self.addr = addr
self.loop = loop
self.global_data = global_data
width = 128 # hardcoded in the pico code
width = 128 # hardcoded in the pico code
height = 64
self.init_awaits = []
self.enabled = True
Expand Down Expand Up @@ -172,7 +185,7 @@ def __init__(
else:
print("oled module probably not supported on your hardware!!!!")
self.init_awaits.append(self.board.set_pin_mode_i2c(i2c_port=self.i2c_port))

async def start(self):
for ev in self.init_awaits:
try: # catch set_pin_mode_i2c already for this port
Expand All @@ -184,20 +197,21 @@ async def start(self):
await self.update_functions["send_text"]("Starting ROS...")
await self.start_int()



async def set_oled_text_async(self, text):
if(not self.enabled):
if not self.enabled:
return False
print("udpate text")
ok = await self.update_functions["send_text"](text.replace('\\n', '\n'))
ok = await self.update_functions["send_text"](text.replace("\\n", "\n"))
if not ok:
self.enabled = False
return ok

def show(self):
print("unused?")

async def show_async(self):
print("todo show async")

async def show_png(self, file):
image_file = Image.open(file) # open color image
image_file = image_file.convert("1", dither=Image.NONE)
Expand All @@ -213,7 +227,7 @@ def __init__(
board_mapping,
port,
loop,
global_data, # obj with some shared data, like SOC
global_data, # obj with some shared data, like SOC
addr=0x3C,
external_vcc=False,
reset=None,
Expand All @@ -228,7 +242,7 @@ def __init__(
self.init_awaits = []
self.write_commands = []
self.global_data = global_data
width = 128 # hardcoded in the pico code
width = 128 # hardcoded in the pico code
height = 64
# Add an extra byte to the data buffer to hold an I2C data/command byte
# to use hardware-compatible I2C transactions. A memoryview of the
Expand Down Expand Up @@ -284,7 +298,6 @@ async def start(self):
self.enabled = False
return
await self.start_int()


def show_default(self, event=None):
if not self.default_image:
Expand All @@ -298,26 +311,23 @@ def show_default(self, event=None):
except Exception as e:
print(e)


async def set_oled_text_async(self, text):
text = text.replace("\\n", "\n")
image = Image.new("1", (128, 64))
draw = ImageDraw.Draw(image)
split_text = text.splitlines()
lines = []
for i in split_text:
lines.extend(textwrap.wrap(i, width=20))

y_text = 1
for line in lines:
width, height = font.getsize(line)
draw.text((1, y_text), line, font=font, fill=255)
y_text += height
self.image(image)
await self.show_async()
return self.enabled


text = text.replace("\\n", "\n")
image = Image.new("1", (128, 64))
draw = ImageDraw.Draw(image)
split_text = text.splitlines()
lines = []
for i in split_text:
lines.extend(textwrap.wrap(i, width=20))

y_text = 1
for line in lines:
width, height = font.getsize(line)
draw.text((1, y_text), line, font=font, fill=255)
y_text += height
self.image(image)
await self.show_async()
return self.enabled

def show(self):
"""Update the display"""
Expand Down Expand Up @@ -409,4 +419,3 @@ async def show_png(self, file):
image_file = image_file.convert("1", dither=Image.NONE)
self.image(image_file)
await self.show_async()

0 comments on commit ccc0196

Please sign in to comment.