From a6703e24a50ec820cad2dc278167feee2c08e003 Mon Sep 17 00:00:00 2001 From: vegano1 Date: Thu, 9 Jan 2025 13:36:35 -0500 Subject: [PATCH] - remove defaults from LimitSwitchStaus and PlatformStatus - added HopperDoorState and LatchState to FlexStacker module - replaced hopperDoorClosed with hopperDoorState in live data - replaced axisStateL with LatchState in live data - use move_axis instead of home_axis for open_latch and close_latch - set find_dfu_device expected_device_count to 3 - cleanup --- .../opentrons/drivers/flex_stacker/types.py | 14 ++-- .../hardware_control/modules/flex_stacker.py | 74 +++++++++++++++---- .../hardware_control/modules/types.py | 45 +++++++---- .../modules/module_data_mapper.py | 7 +- .../robot_server/modules/module_models.py | 12 ++- 5 files changed, 108 insertions(+), 44 deletions(-) diff --git a/api/src/opentrons/drivers/flex_stacker/types.py b/api/src/opentrons/drivers/flex_stacker/types.py index 2e5befd3085..9f8e8825b93 100644 --- a/api/src/opentrons/drivers/flex_stacker/types.py +++ b/api/src/opentrons/drivers/flex_stacker/types.py @@ -102,11 +102,11 @@ def distance(self, distance: float) -> float: class LimitSwitchStatus: """Stacker Limit Switch Statuses.""" - XE: bool = False - XR: bool = False - ZE: bool = False - ZR: bool = False - LR: bool = False + XE: bool + XR: bool + ZE: bool + ZR: bool + LR: bool @classmethod def get_fields(cls) -> List[str]: @@ -128,8 +128,8 @@ def get(self, axis: StackerAxis, direction: Direction) -> bool: class PlatformStatus: """Stacker Platform Statuses.""" - E: bool = False - R: bool = False + E: bool + R: bool @classmethod def get_fields(cls) -> List[str]: diff --git a/api/src/opentrons/hardware_control/modules/flex_stacker.py b/api/src/opentrons/hardware_control/modules/flex_stacker.py index 380e671d26c..5ded3b391b3 100644 --- a/api/src/opentrons/hardware_control/modules/flex_stacker.py +++ b/api/src/opentrons/hardware_control/modules/flex_stacker.py @@ -22,6 +22,8 @@ from opentrons.hardware_control.modules import mod_abc, update from opentrons.hardware_control.modules.types import ( FlexStackerStatus, + HopperDoorState, + LatchState, ModuleDisconnectedCallback, ModuleType, PlatformState, @@ -37,6 +39,9 @@ DFU_PID = "df11" +# Distance in mm the latch can travel to open/close +LATCH_TRAVEL = 25.0 + class FlexStacker(mod_abc.AbstractModule): """Hardware control interface for an attached Flex-Stacker module.""" @@ -125,6 +130,7 @@ def __init__( self._driver = driver self._reader = reader self._poller = poller + self._stacker_status = FlexStackerStatus.IDLE async def cleanup(self) -> None: """Stop the poller task""" @@ -148,21 +154,26 @@ def _model_from_revision(revision: Optional[str]) -> str: def model(self) -> str: return self._model_from_revision(self._device_info.get("model")) + @property + def latch_state(self) -> LatchState: + """The state of the latch.""" + return LatchState.from_state(self.limit_switch_status[StackerAxis.L]) + @property def platform_state(self) -> PlatformState: """The state of the platform.""" return self._reader.platform_state + @property + def hopper_door_state(self) -> HopperDoorState: + """The status of the hopper door.""" + return HopperDoorState.from_state(self._reader.hopper_door_closed) + @property def limit_switch_status(self) -> Dict[StackerAxis, StackerAxisState]: """The status of the Limit switches.""" return self._reader.limit_switch_status - @property - def hopper_door_closed(self) -> bool: - """The status of the hopper door.""" - return self._reader.hopper_door_closed - @property def device_info(self) -> Mapping[str, str]: return self._device_info @@ -170,8 +181,7 @@ def device_info(self) -> Mapping[str, str]: @property def status(self) -> FlexStackerStatus: """Module status or error state details.""" - # TODO: Implement getting device status from the stacker - return FlexStackerStatus.IDLE + return self._stacker_status @property def is_simulated(self) -> bool: @@ -182,11 +192,11 @@ def live_data(self) -> LiveData: return { "status": self.status.value, "data": { + "latchState": self.latch_state.value, "platformState": self.platform_state.value, + "hopperDoorState": self.hopper_door_state.value, "axisStateX": self.limit_switch_status[StackerAxis.X].value, "axisStateZ": self.limit_switch_status[StackerAxis.Z].value, - "axisStateL": self.limit_switch_status[StackerAxis.L].value, - "hopperDoorClosed": self.hopper_door_closed, "errorDetails": self._reader.error, }, } @@ -195,7 +205,8 @@ async def prep_for_update(self) -> str: await self._poller.stop() await self._driver.stop_motors() await self._driver.enter_programming_mode() - dfu_info = await update.find_dfu_device(pid=DFU_PID, expected_device_count=2) + # flex stacker has three unique "devices" over DFU + dfu_info = await update.find_dfu_device(pid=DFU_PID, expected_device_count=3) return dfu_info def bootloader(self) -> UploadFunction: @@ -252,11 +263,24 @@ async def close_latch( acceleration: Optional[float] = None, ) -> bool: """Close the latch, dropping any labware its holding.""" + # Dont move the latch if its already closed. + if self.limit_switch_status[StackerAxis.L] == StackerAxisState.EXTENDED: + return True motion_params = STACKER_MOTION_CONFIG[StackerAxis.L]["move"] speed = velocity or motion_params.max_speed accel = acceleration or motion_params.acceleration - return await self.home_axis( - StackerAxis.L, Direction.RETRACT, speed=speed, acceleration=accel + success = await self.move_axis( + StackerAxis.L, + Direction.RETRACT, + distance=LATCH_TRAVEL, + speed=speed, + acceleration=accel, + ) + # Check that the latch is closed. + await self._reader.get_limit_switch_status() + return ( + success + and self.limit_switch_status[StackerAxis.L] == StackerAxisState.EXTENDED ) async def open_latch( @@ -265,12 +289,34 @@ async def open_latch( acceleration: Optional[float] = None, ) -> bool: """Open the latch.""" + # Dont move the latch if its already opened. + if self.limit_switch_status[StackerAxis.L] == StackerAxisState.RETRACTED: + return True motion_params = STACKER_MOTION_CONFIG[StackerAxis.L]["move"] speed = velocity or motion_params.max_speed accel = acceleration or motion_params.acceleration - return await self.home_axis( - StackerAxis.L, Direction.EXTENT, speed=speed, acceleration=accel + success = await self.move_axis( + StackerAxis.L, + Direction.EXTENT, + distance=LATCH_TRAVEL, + speed=speed, + acceleration=accel, ) + # Check that the latch is opened. + await self._reader.get_limit_switch_status() + return ( + success + and self.limit_switch_status[StackerAxis.L] == StackerAxisState.RETRACTED + ) + + # NOTE: We are defining the interface, will implement in seperate pr. + async def dispense(self) -> bool: + """Dispenses the next labware in the stacker.""" + return True + + async def store(self) -> bool: + """Stores a labware in the stacker.""" + return True class FlexStackerReader(Reader): diff --git a/api/src/opentrons/hardware_control/modules/types.py b/api/src/opentrons/hardware_control/modules/types.py index 52aaefd2312..a43c7046a6b 100644 --- a/api/src/opentrons/hardware_control/modules/types.py +++ b/api/src/opentrons/hardware_control/modules/types.py @@ -246,7 +246,8 @@ class LidStatus(str, Enum): class FlexStackerStatus(str, Enum): IDLE = "idle" - RUNNING = "running" + DISPENSING = "dispensing" + STORING = "storing" ERROR = "error" @@ -259,10 +260,10 @@ class PlatformState(str, Enum): def from_status(cls, status: PlatformStatus) -> "PlatformState": """Get the state from the platform status.""" if status.E and not status.R: - return PlatformState.EXTENDED + return cls.EXTENDED if status.R and not status.E: - return PlatformState.RETRACTED - return PlatformState.UNKNOWN + return cls.RETRACTED + return cls.UNKNOWN class StackerAxisState(str, Enum): @@ -278,18 +279,34 @@ def from_status( match axis: case StackerAxis.X: if status.XE and not status.XR: - return StackerAxisState.EXTENDED + return cls.EXTENDED if status.XR and not status.XE: - return StackerAxisState.RETRACTED + return cls.RETRACTED case StackerAxis.Z: if status.ZE and not status.ZR: - return StackerAxisState.EXTENDED + return cls.EXTENDED if status.ZR and not status.ZE: - return StackerAxisState.RETRACTED + return cls.RETRACTED case StackerAxis.L: - return ( - StackerAxisState.EXTENDED - if status.LR - else StackerAxisState.RETRACTED - ) - return StackerAxisState.UNKNOWN + return cls.EXTENDED if status.LR else cls.RETRACTED + return cls.UNKNOWN + + +class LatchState(str, Enum): + CLOSED = "closed" + OPENED = "opened" + + @classmethod + def from_state(cls, state: StackerAxisState) -> "LatchState": + """Get the latch state from the axis state.""" + return cls.CLOSED if state == StackerAxisState.EXTENDED else cls.OPENED + + +class HopperDoorState(str, Enum): + CLOSED = "closed" + OPENED = "opened" + + @classmethod + def from_state(cls, state: bool) -> "HopperDoorState": + """Get the hopper door state from the door state boolean.""" + return cls.CLOSED if state else cls.OPENED diff --git a/robot-server/robot_server/modules/module_data_mapper.py b/robot-server/robot_server/modules/module_data_mapper.py index 6e61cff6f14..b425c547c22 100644 --- a/robot-server/robot_server/modules/module_data_mapper.py +++ b/robot-server/robot_server/modules/module_data_mapper.py @@ -25,6 +25,7 @@ ) from opentrons.drivers.rpi_drivers.types import USBPort as HardwareUSBPort +from opentrons.hardware_control.modules.types import HopperDoorState, LatchState from opentrons.protocol_engine import ModuleModel, DeckType from .module_identifier import ModuleIdentity @@ -164,13 +165,15 @@ def map_data( module_cls = FlexStackerModule module_data = FlexStackerModuleData( status=FlexStackerStatus(live_data["status"]), + latchState=cast(LatchState, live_data["data"].get("latchState")), platformState=cast( PlatformState, live_data["data"].get("platformState") ), + hopperDoorState=cast( + HopperDoorState, live_data["data"].get("hopperDoorState") + ), axisStateX=cast(StackerAxisState, live_data["data"].get("axisStateX")), axisStateZ=cast(StackerAxisState, live_data["data"].get("axisStateZ")), - axisStateL=cast(StackerAxisState, live_data["data"].get("axisStateL")), - hopperDoorClosed=cast(bool, live_data["data"].get("hopperDoorClosed")), ) else: assert False, f"Invalid module type {module_type}" diff --git a/robot-server/robot_server/modules/module_models.py b/robot-server/robot_server/modules/module_models.py index 44fd524b471..fb56e9e3fbf 100644 --- a/robot-server/robot_server/modules/module_models.py +++ b/robot-server/robot_server/modules/module_models.py @@ -21,6 +21,7 @@ AbsorbanceReaderLidStatus, AbsorbanceReaderPlatePresence, ) +from opentrons.hardware_control.modules.types import HopperDoorState, LatchState from opentrons.protocol_engine import ModuleModel from opentrons.protocol_engine.types import Vec3f @@ -355,24 +356,21 @@ class AbsorbanceReaderModule( class FlexStackerModuleData(BaseModel): """Live data from a Flex Stacker module.""" - # TODO: add rest of data status: str = Field( ..., description="Overall status of the module.", ) + latchState: LatchState = Field(..., description="The state of the labware latch.") platformState: PlatformState = Field(..., description="The state of the platform.") + hopperDoorState: HopperDoorState = Field( + ..., description="The state of the hopper door." + ) axisStateX: StackerAxisState = Field( ..., description="The state of the X axis limit switches." ) axisStateZ: StackerAxisState = Field( ..., description="The state of the Z axis limit switches." ) - axisStateL: StackerAxisState = Field( - ..., description="The state of the L axis limit switches." - ) - hopperDoorClosed: bool = Field( - ..., description="Whether the hopper door is closed." - ) class FlexStackerModule(