From ccc0196472e8066d302da2bc6644884356fd6cc9 Mon Sep 17 00:00:00 2001 From: "Arend-Jan van Hilten (TU)" Date: Wed, 5 Jun 2024 22:45:55 +0200 Subject: [PATCH] style fixes --- .../scripts/ROS_telemetrix_aio_api.py | 21 +++- mirte_telemetrix/scripts/modules/Oled.py | 101 ++++++++++-------- 2 files changed, 73 insertions(+), 49 deletions(-) diff --git a/mirte_telemetrix/scripts/ROS_telemetrix_aio_api.py b/mirte_telemetrix/scripts/ROS_telemetrix_aio_api.py index 4d0c69f2..cf87a896 100755 --- a/mirte_telemetrix/scripts/ROS_telemetrix_aio_api.py +++ b/mirte_telemetrix/scripts/ROS_telemetrix_aio_api.py @@ -26,9 +26,10 @@ # devices = rospy.get_param("mirte/device") global_data = { -"current_soc" : "???" # TODO: change to something better, but for now we communicate SOC from the powerwatcher to the Oled using a global + "current_soc": "???" # TODO: change to something better, but for now we communicate SOC from the powerwatcher to the Oled using a global } + # Until we update our own fork of TelemtrixAIO to the renamed pwm calls # we need to add a simple wrapper async def set_pin_mode_analog_output(board, pin): @@ -783,9 +784,23 @@ def actuators(loop, board, device): if "name" not in oled_settings: oled_settings["name"] = oled if "type" in oled_settings and oled_settings["type"] == "module": - oled_obj = Oled.Oled_module(board, oled, oleds[oled], board_mapping, global_data=global_data, loop=loop ) + oled_obj = Oled.Oled_module( + board, + oled, + oleds[oled], + board_mapping, + global_data=global_data, + loop=loop, + ) else: - oled_obj = Oled.Oled(board,oled, oleds[oled],board_mapping, global_data=global_data, port=oled_id, loop=loop + oled_obj = Oled.Oled( + board, + oled, + oleds[oled], + board_mapping, + global_data=global_data, + port=oled_id, + loop=loop, ) # get_pin_numbers(oleds[oled])) oled_id = oled_id + 1 servers.append(loop.create_task(oled_obj.start())) diff --git a/mirte_telemetrix/scripts/modules/Oled.py b/mirte_telemetrix/scripts/modules/Oled.py index f0369954..6cee1989 100644 --- a/mirte_telemetrix/scripts/modules/Oled.py +++ b/mirte_telemetrix/scripts/modules/Oled.py @@ -13,8 +13,10 @@ from mirte_msgs.srv import SetOLEDImage, SetOLEDImageRequest, SetOLEDImageResponse import subprocess import asyncio + # Extended adafruit _SSD1306 + class Oled_interface: async def start_int(self): self.server = rospy.Service( @@ -25,6 +27,7 @@ async def start_int(self): self.default_image = True self.default_timer = rospy.Timer(rospy.Duration(10), self.show_default) await self.show_default_async() + async def show_default_async(self): if not self.enabled: self.default_timer.shutdown() @@ -50,7 +53,7 @@ async def show_default_async(self): text += f"\nSOC: {self.global_data['current_soc']}%" if "show_time" in self.oled_obj and self.oled_obj["show_time"]: any_show = True - date = subprocess.getoutput('date +\"%Y-%m-%dT%H:%M:%S%:::z\"').strip() + date = subprocess.getoutput('date +"%Y-%m-%dT%H:%M:%S%:::z"').strip() text += f"\n{date}" if len(text) > 0: await self.set_oled_image_service_async( @@ -61,11 +64,14 @@ async def show_default_async(self): await self.show_png( "/usr/local/src/mirte/mirte-oled-images/images/mirte_logo_inv.png" ) # open color image - except Exception as e: # probably that that image does not exist or the oled is broken. + except ( + Exception + ) as e: # probably that that image does not exist or the oled is broken. pass + async def set_oled_image_service_async(self, req): - if req.type =="text": - return await self.set_oled_text_async(req.value.replace('\\n', '\n')) + if req.type == "text": + return await self.set_oled_text_async(req.value.replace("\\n", "\n")) if req.type == "image": return await self.show_png( "/usr/local/src/mirte/mirte-oled-images/images/" + req.value + ".png" @@ -83,13 +89,15 @@ async def set_oled_image_service_async(self, req): ] ) for i in range(number_of_images): - if not (await self.show_png(folder + req.value + "_" + str(i) + ".png")): + if not ( + await self.show_png(folder + req.value + "_" + str(i) + ".png") + ): return False return True - async def set_oled_text_async(self, text): print("todo setoled text async") + def set_oled_image_service(self, req): self.default_image = False if not self.enabled: @@ -107,8 +115,10 @@ def set_oled_image_service(self, req): except Exception as e: print(e) return False + async def start(self): print("todo start") + def show_default(self, event=None): if not self.default_image: self.default_timer.shutdown() @@ -121,30 +131,33 @@ def show_default(self, event=None): ) except Exception as e: print(e) + async def show_png(self, file): print("todo show png") + + # class Oled_module: class Oled_module(Oled_interface): def __init__( - self, - board, - module_name, - module, - board_mapping, - loop, - global_data, # obj with some shared data, like SOC - addr=0x3C, # not yet possible with modules - external_vcc=False, - reset=None, - ): + self, + board, + module_name, + module, + board_mapping, + loop, + global_data, # obj with some shared data, like SOC + addr=0x3C, # not yet possible with modules + external_vcc=False, + reset=None, + ): self.board = board self.oled_obj = module self.addr = addr self.loop = loop self.global_data = global_data - width = 128 # hardcoded in the pico code + width = 128 # hardcoded in the pico code height = 64 self.init_awaits = [] self.enabled = True @@ -172,7 +185,7 @@ def __init__( else: print("oled module probably not supported on your hardware!!!!") self.init_awaits.append(self.board.set_pin_mode_i2c(i2c_port=self.i2c_port)) - + async def start(self): for ev in self.init_awaits: try: # catch set_pin_mode_i2c already for this port @@ -184,20 +197,21 @@ async def start(self): await self.update_functions["send_text"]("Starting ROS...") await self.start_int() - - async def set_oled_text_async(self, text): - if(not self.enabled): + if not self.enabled: return False print("udpate text") - ok = await self.update_functions["send_text"](text.replace('\\n', '\n')) + ok = await self.update_functions["send_text"](text.replace("\\n", "\n")) if not ok: self.enabled = False return ok + def show(self): print("unused?") + async def show_async(self): print("todo show async") + async def show_png(self, file): image_file = Image.open(file) # open color image image_file = image_file.convert("1", dither=Image.NONE) @@ -213,7 +227,7 @@ def __init__( board_mapping, port, loop, - global_data, # obj with some shared data, like SOC + global_data, # obj with some shared data, like SOC addr=0x3C, external_vcc=False, reset=None, @@ -228,7 +242,7 @@ def __init__( self.init_awaits = [] self.write_commands = [] self.global_data = global_data - width = 128 # hardcoded in the pico code + width = 128 # hardcoded in the pico code height = 64 # Add an extra byte to the data buffer to hold an I2C data/command byte # to use hardware-compatible I2C transactions. A memoryview of the @@ -284,7 +298,6 @@ async def start(self): self.enabled = False return await self.start_int() - def show_default(self, event=None): if not self.default_image: @@ -298,26 +311,23 @@ def show_default(self, event=None): except Exception as e: print(e) - async def set_oled_text_async(self, text): - text = text.replace("\\n", "\n") - image = Image.new("1", (128, 64)) - draw = ImageDraw.Draw(image) - split_text = text.splitlines() - lines = [] - for i in split_text: - lines.extend(textwrap.wrap(i, width=20)) - - y_text = 1 - for line in lines: - width, height = font.getsize(line) - draw.text((1, y_text), line, font=font, fill=255) - y_text += height - self.image(image) - await self.show_async() - return self.enabled - - + text = text.replace("\\n", "\n") + image = Image.new("1", (128, 64)) + draw = ImageDraw.Draw(image) + split_text = text.splitlines() + lines = [] + for i in split_text: + lines.extend(textwrap.wrap(i, width=20)) + + y_text = 1 + for line in lines: + width, height = font.getsize(line) + draw.text((1, y_text), line, font=font, fill=255) + y_text += height + self.image(image) + await self.show_async() + return self.enabled def show(self): """Update the display""" @@ -409,4 +419,3 @@ async def show_png(self, file): image_file = image_file.convert("1", dither=Image.NONE) self.image(image_file) await self.show_async() -