Skip to content

Commit

Permalink
set distance and speed for z and plunger axes
Browse files Browse the repository at this point in the history
  • Loading branch information
caila-marashaj committed Jan 7, 2025
1 parent 12181dd commit 606fc96
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 21 deletions.
14 changes: 14 additions & 0 deletions api/src/opentrons/hardware_control/backends/flex_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
Tuple,
Set,
TypeVar,
Literal,
Union,
)
from opentrons_shared_data.pipette.types import (
PipetteName,
Expand Down Expand Up @@ -462,3 +464,15 @@ async def set_hepa_uv_state(self, light_on: bool, uv_duration_s: int) -> bool:

async def get_hepa_uv_state(self) -> Optional[HepaUVState]:
...

async def aspirate_while_tracking(
self,
mount: OT3Mount,
z_distance: float,
z_speed: float,
plunger_distance: float,
plunger_speed: float,
direction: Union[Literal[1], Literal[-1]],
duration: float,
) -> None:
...
12 changes: 8 additions & 4 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -727,8 +727,10 @@ def _build_move_gear_axis_runner(
async def aspirate_while_tracking(
self,
mount: OT3Mount,
distance: float,
speed: float,
z_distance: float,
z_speed: float,
plunger_distance: float,
plunger_speed: float,
direction: Union[Literal[1], Literal[-1]],
duration: float,
) -> None:
Expand All @@ -739,8 +741,10 @@ async def aspirate_while_tracking(
messenger=self._messenger,
tool=tool,
head_node=head_node,
distance=distance,
speed=speed,
z_distance=z_distance,
z_speed=z_speed,
plunger_distance=plunger_distance,
plunger_speed=plunger_speed,
direction=direction,
duration=duration,
)
Expand Down
22 changes: 13 additions & 9 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2967,16 +2967,19 @@ async def capacitive_sweep(
async def aspirate_while_tracking(
self,
mount: Union[top_types.Mount, OT3Mount],
distance: float,
rate: float,
volume: Optional[float] = None,
z_distance: float,
flow_rate: float,
volume: float,
) -> None:
"""
Aspirate a volume of liquid (in microliters/uL) using this pipette."""
realmount = OT3Mount.from_mount(mount)
aspirate_spec = self._pipette_handler.plan_check_aspirate(
realmount, volume, rate
realmount, volume, flow_rate
)
# what is aspirate_spec.volume for
aspirate_duration = volume / flow_rate
z_speed = z_distance / aspirate_duration
if not aspirate_spec:
return

Expand All @@ -2989,12 +2992,13 @@ async def aspirate_while_tracking(
realmount, aspirate_spec.acceleration
)
await self._backend.aspirate_while_tracking(
mount=mount,
distance=distance,
speed=rate,
mount=realmount,
z_distance=z_distance,
z_speed=z_speed,
plunger_distance=aspirate_spec.plunger_distance,
plunger_speed=aspirate_spec.speed,
direction=-1,
# have to actually determine duration here
duration=0.0,
duration=aspirate_duration,
)
except Exception:
self._log.exception("Aspirate failed")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,12 @@ async def liquid_probe(
max_z_dist : maximum depth to probe for liquid
"""
...

async def aspirate_while_tracking(
self,
mount: MountArgType,
z_distance: float,
flow_rate: float,
volume: float,
) -> None:
...
2 changes: 2 additions & 0 deletions api/src/opentrons/protocol_engine/commands/aspirate.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
if params.is_tracking:
aspirate_result = await aspirate_while_tracking(
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
volume=params.volume,
flow_rate=params.flowRate,
location_if_error={
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,17 +228,21 @@ async def aspirate_in_place(

async def aspirate_while_tracking(
pipette_id: str,
labware_id: str,
well_name: str,
volume: float,
flow_rate: float,
location_if_error: ErrorLocationInfo,
command_note_adder: CommandNoteAdder,
pipetting: PipettingHandler,
model_utils: ModelUtils,
) -> SuccessData[BaseLiquidHandlingResult] | DefinedErrorData[OverpressureError]:
"""Execute an aspirate in place microoperation."""
"""Execute an aspirate while tracking microoperation."""
try:
volume_aspirated = await pipetting.aspirate_while_tracking(
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
volume=volume,
flow_rate=flow_rate,
command_note_adder=command_note_adder,
Expand Down
28 changes: 27 additions & 1 deletion api/src/opentrons/protocol_engine/execution/pipetting.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ async def aspirate_in_place(
async def aspirate_while_tracking(
self,
pipette_id: str,
labware_id: str,
well_name: str,
volume: float,
flow_rate: float,
command_note_adder: CommandNoteAdder,
Expand Down Expand Up @@ -114,6 +116,7 @@ def get_hw_aspirate_params(
volume: float,
command_note_adder: CommandNoteAdder,
) -> Tuple[HardwarePipette, float]:
"""Get params for hardware aspirate."""
_adjusted_volume = _validate_aspirate_volume(
state_view=self._state_view,
pipette_id=pipette_id,
Expand All @@ -129,6 +132,8 @@ def get_hw_aspirate_params(
async def aspirate_while_tracking(
self,
pipette_id: str,
labware_id: str,
well_name: str,
volume: float,
flow_rate: float,
command_note_adder: CommandNoteAdder,
Expand All @@ -142,9 +147,18 @@ async def aspirate_while_tracking(
hw_pipette, adjusted_volume = self.get_hw_aspirate_params(
pipette_id, volume, command_note_adder
)

aspirate_z_distance = self._state_view.geometry.get_liquid_handling_z_change(
labware_id=labware_id,
well_name=well_name, # make sure the protocol engine actually has the well name atp ?
operation_volume=volume,
)
with self._set_flow_rate(pipette=hw_pipette, aspirate_flow_rate=flow_rate):
await self._hardware_api.aspirate_while_tracking(
mount=hw_pipette.mount, volume=adjusted_volume
mount=hw_pipette.mount,
z_distance=aspirate_z_distance,
flow_rate=flow_rate,
volume=adjusted_volume,
)

return adjusted_volume
Expand Down Expand Up @@ -346,6 +360,18 @@ def _validate_tip_attached(self, pipette_id: str, command_name: str) -> None:
f"Cannot perform {command_name} without a tip attached"
)

async def aspirate_while_tracking(
self,
pipette_id: str,
labware_id: str,
well_name: str,
volume: float,
flow_rate: float,
command_note_adder: CommandNoteAdder,
) -> float:
"""Aspirate while moving the z stage with the liquid meniscus."""
return 0.0


def create_pipetting_handler(
state_view: StateView, hardware_api: HardwareControlAPI
Expand Down
19 changes: 19 additions & 0 deletions api/src/opentrons/protocol_engine/state/geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1406,6 +1406,25 @@ def get_offset_location(self, labware_id: str) -> Optional[LabwareOffsetLocation

return None

def get_liquid_handling_z_change(
self,
labware_id: str,
well_name: str,
operation_volume: float,
) -> float:
"""Get the change in height from a liquid handling operation."""
initial_handling_height = self.get_meniscus_height(
labware_id=labware_id, well_name=well_name
)
final_height = self.get_well_height_after_volume(
labware_id=labware_id,
well_name=well_name,
initial_height=initial_handling_height,
volume=operation_volume,
)
# make sure we handle aspirate and dispense both directions
return initial_handling_height - final_height

def get_well_offset_adjustment(
self,
labware_id: str,
Expand Down
15 changes: 9 additions & 6 deletions hardware/opentrons_hardware/hardware_control/tool_sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,23 +258,26 @@ async def finalize_logs(
for listener in listeners.values():
await listener.wait_for_complete()


async def move_plunger_while_tracking_z(
messenger: CanMessenger,
tool: PipetteProbeTarget,
head_node: NodeId,
distance: float,
speed: float,
z_distance: float,
z_speed: float,
plunger_distance: float,
plunger_speed: float,
direction: Union[Literal[1], Literal[-1]],
duration: float,
) -> Dict[NodeId, MotorPositionStatus]:
liquid_action_step = create_step(
distance={
tool: float64(abs(distance) * direction),
head_node: float64(abs(distance) * direction)
tool: float64(abs(plunger_distance) * direction),
head_node: float64(abs(z_distance) * direction)
},
velocity={
tool: float64(abs(speed) * direction),
head_node: float64(abs(speed) * direction)
tool: float64(abs(plunger_speed) * direction),
head_node: float64(abs(z_speed) * direction)
},
acceleration={},
duration=float64(duration),
Expand Down

0 comments on commit 606fc96

Please sign in to comment.