Skip to content

Commit

Permalink
- remove defaults from LimitSwitchStaus and PlatformStatus
Browse files Browse the repository at this point in the history
- added HopperDoorState and LatchState to FlexStacker module
- replaced hopperDoorClosed with hopperDoorState in live data
- replaced axisStateL with LatchState in live data
- use move_axis instead of home_axis for open_latch and close_latch
- set find_dfu_device expected_device_count to 3
- cleanup
  • Loading branch information
vegano1 committed Jan 9, 2025
1 parent 83b5b7d commit a6703e2
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 44 deletions.
14 changes: 7 additions & 7 deletions api/src/opentrons/drivers/flex_stacker/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ def distance(self, distance: float) -> float:
class LimitSwitchStatus:
"""Stacker Limit Switch Statuses."""

XE: bool = False
XR: bool = False
ZE: bool = False
ZR: bool = False
LR: bool = False
XE: bool
XR: bool
ZE: bool
ZR: bool
LR: bool

@classmethod
def get_fields(cls) -> List[str]:
Expand All @@ -128,8 +128,8 @@ def get(self, axis: StackerAxis, direction: Direction) -> bool:
class PlatformStatus:
"""Stacker Platform Statuses."""

E: bool = False
R: bool = False
E: bool
R: bool

@classmethod
def get_fields(cls) -> List[str]:
Expand Down
74 changes: 60 additions & 14 deletions api/src/opentrons/hardware_control/modules/flex_stacker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from opentrons.hardware_control.modules import mod_abc, update
from opentrons.hardware_control.modules.types import (
FlexStackerStatus,
HopperDoorState,
LatchState,
ModuleDisconnectedCallback,
ModuleType,
PlatformState,
Expand All @@ -37,6 +39,9 @@

DFU_PID = "df11"

# Distance in mm the latch can travel to open/close
LATCH_TRAVEL = 25.0


class FlexStacker(mod_abc.AbstractModule):
"""Hardware control interface for an attached Flex-Stacker module."""
Expand Down Expand Up @@ -125,6 +130,7 @@ def __init__(
self._driver = driver
self._reader = reader
self._poller = poller
self._stacker_status = FlexStackerStatus.IDLE

async def cleanup(self) -> None:
"""Stop the poller task"""
Expand All @@ -148,30 +154,34 @@ def _model_from_revision(revision: Optional[str]) -> str:
def model(self) -> str:
return self._model_from_revision(self._device_info.get("model"))

@property
def latch_state(self) -> LatchState:
"""The state of the latch."""
return LatchState.from_state(self.limit_switch_status[StackerAxis.L])

@property
def platform_state(self) -> PlatformState:
"""The state of the platform."""
return self._reader.platform_state

@property
def hopper_door_state(self) -> HopperDoorState:
"""The status of the hopper door."""
return HopperDoorState.from_state(self._reader.hopper_door_closed)

@property
def limit_switch_status(self) -> Dict[StackerAxis, StackerAxisState]:
"""The status of the Limit switches."""
return self._reader.limit_switch_status

@property
def hopper_door_closed(self) -> bool:
"""The status of the hopper door."""
return self._reader.hopper_door_closed

@property
def device_info(self) -> Mapping[str, str]:
return self._device_info

@property
def status(self) -> FlexStackerStatus:
"""Module status or error state details."""
# TODO: Implement getting device status from the stacker
return FlexStackerStatus.IDLE
return self._stacker_status

@property
def is_simulated(self) -> bool:
Expand All @@ -182,11 +192,11 @@ def live_data(self) -> LiveData:
return {
"status": self.status.value,
"data": {
"latchState": self.latch_state.value,
"platformState": self.platform_state.value,
"hopperDoorState": self.hopper_door_state.value,
"axisStateX": self.limit_switch_status[StackerAxis.X].value,
"axisStateZ": self.limit_switch_status[StackerAxis.Z].value,
"axisStateL": self.limit_switch_status[StackerAxis.L].value,
"hopperDoorClosed": self.hopper_door_closed,
"errorDetails": self._reader.error,
},
}
Expand All @@ -195,7 +205,8 @@ async def prep_for_update(self) -> str:
await self._poller.stop()
await self._driver.stop_motors()
await self._driver.enter_programming_mode()
dfu_info = await update.find_dfu_device(pid=DFU_PID, expected_device_count=2)
# flex stacker has three unique "devices" over DFU
dfu_info = await update.find_dfu_device(pid=DFU_PID, expected_device_count=3)
return dfu_info

def bootloader(self) -> UploadFunction:
Expand Down Expand Up @@ -252,11 +263,24 @@ async def close_latch(
acceleration: Optional[float] = None,
) -> bool:
"""Close the latch, dropping any labware its holding."""
# Dont move the latch if its already closed.
if self.limit_switch_status[StackerAxis.L] == StackerAxisState.EXTENDED:
return True
motion_params = STACKER_MOTION_CONFIG[StackerAxis.L]["move"]
speed = velocity or motion_params.max_speed
accel = acceleration or motion_params.acceleration
return await self.home_axis(
StackerAxis.L, Direction.RETRACT, speed=speed, acceleration=accel
success = await self.move_axis(
StackerAxis.L,
Direction.RETRACT,
distance=LATCH_TRAVEL,
speed=speed,
acceleration=accel,
)
# Check that the latch is closed.
await self._reader.get_limit_switch_status()
return (
success
and self.limit_switch_status[StackerAxis.L] == StackerAxisState.EXTENDED
)

async def open_latch(
Expand All @@ -265,12 +289,34 @@ async def open_latch(
acceleration: Optional[float] = None,
) -> bool:
"""Open the latch."""
# Dont move the latch if its already opened.
if self.limit_switch_status[StackerAxis.L] == StackerAxisState.RETRACTED:
return True
motion_params = STACKER_MOTION_CONFIG[StackerAxis.L]["move"]
speed = velocity or motion_params.max_speed
accel = acceleration or motion_params.acceleration
return await self.home_axis(
StackerAxis.L, Direction.EXTENT, speed=speed, acceleration=accel
success = await self.move_axis(
StackerAxis.L,
Direction.EXTENT,
distance=LATCH_TRAVEL,
speed=speed,
acceleration=accel,
)
# Check that the latch is opened.
await self._reader.get_limit_switch_status()
return (
success
and self.limit_switch_status[StackerAxis.L] == StackerAxisState.RETRACTED
)

# NOTE: We are defining the interface, will implement in seperate pr.
async def dispense(self) -> bool:
"""Dispenses the next labware in the stacker."""
return True

async def store(self) -> bool:
"""Stores a labware in the stacker."""
return True


class FlexStackerReader(Reader):
Expand Down
45 changes: 31 additions & 14 deletions api/src/opentrons/hardware_control/modules/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ class LidStatus(str, Enum):

class FlexStackerStatus(str, Enum):
IDLE = "idle"
RUNNING = "running"
DISPENSING = "dispensing"
STORING = "storing"
ERROR = "error"


Expand All @@ -259,10 +260,10 @@ class PlatformState(str, Enum):
def from_status(cls, status: PlatformStatus) -> "PlatformState":
"""Get the state from the platform status."""
if status.E and not status.R:
return PlatformState.EXTENDED
return cls.EXTENDED
if status.R and not status.E:
return PlatformState.RETRACTED
return PlatformState.UNKNOWN
return cls.RETRACTED
return cls.UNKNOWN


class StackerAxisState(str, Enum):
Expand All @@ -278,18 +279,34 @@ def from_status(
match axis:
case StackerAxis.X:
if status.XE and not status.XR:
return StackerAxisState.EXTENDED
return cls.EXTENDED
if status.XR and not status.XE:
return StackerAxisState.RETRACTED
return cls.RETRACTED
case StackerAxis.Z:
if status.ZE and not status.ZR:
return StackerAxisState.EXTENDED
return cls.EXTENDED
if status.ZR and not status.ZE:
return StackerAxisState.RETRACTED
return cls.RETRACTED
case StackerAxis.L:
return (
StackerAxisState.EXTENDED
if status.LR
else StackerAxisState.RETRACTED
)
return StackerAxisState.UNKNOWN
return cls.EXTENDED if status.LR else cls.RETRACTED
return cls.UNKNOWN


class LatchState(str, Enum):
CLOSED = "closed"
OPENED = "opened"

@classmethod
def from_state(cls, state: StackerAxisState) -> "LatchState":
"""Get the latch state from the axis state."""
return cls.CLOSED if state == StackerAxisState.EXTENDED else cls.OPENED


class HopperDoorState(str, Enum):
CLOSED = "closed"
OPENED = "opened"

@classmethod
def from_state(cls, state: bool) -> "HopperDoorState":
"""Get the hopper door state from the door state boolean."""
return cls.CLOSED if state else cls.OPENED
7 changes: 5 additions & 2 deletions robot-server/robot_server/modules/module_data_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
)
from opentrons.drivers.rpi_drivers.types import USBPort as HardwareUSBPort

from opentrons.hardware_control.modules.types import HopperDoorState, LatchState
from opentrons.protocol_engine import ModuleModel, DeckType

from .module_identifier import ModuleIdentity
Expand Down Expand Up @@ -164,13 +165,15 @@ def map_data(
module_cls = FlexStackerModule
module_data = FlexStackerModuleData(
status=FlexStackerStatus(live_data["status"]),
latchState=cast(LatchState, live_data["data"].get("latchState")),
platformState=cast(
PlatformState, live_data["data"].get("platformState")
),
hopperDoorState=cast(
HopperDoorState, live_data["data"].get("hopperDoorState")
),
axisStateX=cast(StackerAxisState, live_data["data"].get("axisStateX")),
axisStateZ=cast(StackerAxisState, live_data["data"].get("axisStateZ")),
axisStateL=cast(StackerAxisState, live_data["data"].get("axisStateL")),
hopperDoorClosed=cast(bool, live_data["data"].get("hopperDoorClosed")),
)
else:
assert False, f"Invalid module type {module_type}"
Expand Down
12 changes: 5 additions & 7 deletions robot-server/robot_server/modules/module_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
AbsorbanceReaderLidStatus,
AbsorbanceReaderPlatePresence,
)
from opentrons.hardware_control.modules.types import HopperDoorState, LatchState
from opentrons.protocol_engine import ModuleModel
from opentrons.protocol_engine.types import Vec3f

Expand Down Expand Up @@ -355,24 +356,21 @@ class AbsorbanceReaderModule(
class FlexStackerModuleData(BaseModel):
"""Live data from a Flex Stacker module."""

# TODO: add rest of data
status: str = Field(
...,
description="Overall status of the module.",
)
latchState: LatchState = Field(..., description="The state of the labware latch.")
platformState: PlatformState = Field(..., description="The state of the platform.")
hopperDoorState: HopperDoorState = Field(
..., description="The state of the hopper door."
)
axisStateX: StackerAxisState = Field(
..., description="The state of the X axis limit switches."
)
axisStateZ: StackerAxisState = Field(
..., description="The state of the Z axis limit switches."
)
axisStateL: StackerAxisState = Field(
..., description="The state of the L axis limit switches."
)
hopperDoorClosed: bool = Field(
..., description="Whether the hopper door is closed."
)


class FlexStackerModule(
Expand Down

0 comments on commit a6703e2

Please sign in to comment.