Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rough dynamic liquid tracking #17167

Closed
wants to merge 10 commits into from
16 changes: 16 additions & 0 deletions api/src/opentrons/hardware_control/motion_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,22 @@ def target_position_from_plunger(
return all_axes_pos


def target_positions_from_plunger_tracking(
mount: Union[Mount, OT3Mount],
plunger_delta: float,
z_delta: float,
current_position: Dict[Axis, float],
) -> "OrderedDict[Axis, float]":
"""Create a target position for machine axes including plungers.

The x/y axis remain constant but the plunger and Z move to create a tracking action.
"""
all_axes_pos = target_position_from_plunger(mount, plunger_delta, current_position)
z_ax = Axis.by_mount(mount)
all_axes_pos[z_ax] = current_position[z_ax] + z_delta
return all_axes_pos


def deck_point_from_machine_point(
machine_point: Point, attitude: AttitudeMatrix, offset: Point
) -> Point:
Expand Down
90 changes: 89 additions & 1 deletion api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
target_position_from_absolute,
target_position_from_relative,
target_position_from_plunger,
target_positions_from_plunger_tracking,
offset_for_mount,
deck_from_machine,
machine_from_deck,
Expand Down Expand Up @@ -2750,7 +2751,7 @@ async def liquid_probe( # noqa: C901
if not probe_settings:
probe_settings = deepcopy(self.config.liquid_sense)

# We need to significatly slow down the 96 channel liquid probe
# We need to significantly slow down the 96 channel liquid probe
if self.gantry_load == GantryLoad.HIGH_THROUGHPUT:
max_plunger_speed = self.config.motion_settings.max_speed_discontinuity[
GantryLoad.HIGH_THROUGHPUT
Expand Down Expand Up @@ -2964,6 +2965,93 @@ async def capacitive_sweep(

AMKey = TypeVar("AMKey")

async def aspirate_while_tracking(
self,
mount: Union[top_types.Mount, OT3Mount],
z_distance: float,
flow_rate: float,
volume: float,
) -> None:
"""
Aspirate a volume of liquid (in microliters/uL) using this pipette."""
realmount = OT3Mount.from_mount(mount)
aspirate_spec = self._pipette_handler.plan_check_aspirate(
realmount, volume, flow_rate
)
if not aspirate_spec:
return

target_pos = target_positions_from_plunger_tracking(
realmount,
aspirate_spec.plunger_distance,
z_distance,
self._current_position,
)

try:
await self._backend.set_active_current(
{aspirate_spec.axis: aspirate_spec.current}
)
async with self.restore_system_constrants():
await self.set_system_constraints_for_plunger_acceleration(
realmount, aspirate_spec.acceleration
)
await self._move(
target_pos,
speed=aspirate_spec.speed,
home_flagged_axes=False,
)
except Exception:
self._log.exception("Aspirate failed")
aspirate_spec.instr.set_current_volume(0)
raise
else:
aspirate_spec.instr.add_current_volume(aspirate_spec.volume)

async def dispense_while_tracking(
self,
mount: Union[top_types.Mount, OT3Mount],
z_distance: float,
flow_rate: float,
volume: float,
push_out: Optional[float],
) -> None:
"""
Aspirate a volume of liquid (in microliters/uL) using this pipette."""
realmount = OT3Mount.from_mount(mount)
dispense_spec = self._pipette_handler.plan_check_dispense(
realmount, volume, flow_rate, push_out
)
if not dispense_spec:
return

target_pos = target_positions_from_plunger_tracking(
realmount,
dispense_spec.plunger_distance,
z_distance,
self._current_position,
)

try:
await self._backend.set_active_current(
{dispense_spec.axis: dispense_spec.current}
)
async with self.restore_system_constrants():
await self.set_system_constraints_for_plunger_acceleration(
realmount, dispense_spec.acceleration
)
await self._move(
target_pos,
speed=dispense_spec.speed,
home_flagged_axes=False,
)
except Exception:
self._log.exception("dispense failed")
dispense_spec.instr.set_current_volume(0)
raise
else:
dispense_spec.instr.remove_current_volume(dispense_spec.volume)

@property
def attached_subsystems(self) -> Dict[SubSystem, SubSystemState]:
"""Get a view of the state of the currently-attached subsystems."""
Expand Down
19 changes: 19 additions & 0 deletions api/src/opentrons/hardware_control/protocols/liquid_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,22 @@ async def liquid_probe(
max_z_dist : maximum depth to probe for liquid
"""
...

async def aspirate_while_tracking(
self,
mount: MountArgType,
z_distance: float,
flow_rate: float,
volume: float,
) -> None:
...

async def dispense_while_tracking(
self,
mount: MountArgType,
z_distance: float,
flow_rate: float,
volume: float,
push_out: Optional[float],
) -> None:
...
136 changes: 109 additions & 27 deletions api/src/opentrons/protocol_api/core/engine/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@

from __future__ import annotations

from typing import Optional, TYPE_CHECKING, cast, Union, List
from opentrons.types import Location, Mount, NozzleConfigurationType, NozzleMapInterface
from typing import Optional, TYPE_CHECKING, cast, Union, List, Tuple
from opentrons.types import (
Location,
Mount,
NozzleConfigurationType,
NozzleMapInterface,
MeniscusTracking,
)
from opentrons.hardware_control import SyncHardwareAPI
from opentrons.hardware_control.dev_types import PipetteDict
from opentrons.protocols.api_support.util import FlowRates, find_value_for_api_version
Expand Down Expand Up @@ -134,7 +140,7 @@ def aspirate(
rate: float,
flow_rate: float,
in_place: bool,
is_meniscus: Optional[bool] = None,
meniscus_tracking: Optional[MeniscusTracking] = None,
) -> None:
"""Aspirate a given volume of liquid from the specified location.
Args:
Expand All @@ -144,6 +150,7 @@ def aspirate(
rate: Not used in this core.
flow_rate: The flow rate in µL/s to aspirate at.
in_place: whether this is a in-place command.
meniscus_tracking: Optional data about where to aspirate from.
"""
if well_core is None:
if not in_place:
Expand Down Expand Up @@ -173,27 +180,44 @@ def aspirate(
labware_id=labware_id,
well_name=well_name,
absolute_point=location.point,
is_meniscus=is_meniscus,
meniscus_tracking=meniscus_tracking,
)
if well_location.origin == WellOrigin.MENISCUS:
well_location.volumeOffset = "operationVolume"
dynamic_liquid_tracking = False
# caila bookmark
if meniscus_tracking:
if meniscus_tracking.target == "end":
well_location.volumeOffset = "operationVolume"
elif meniscus_tracking.target == "dynamic_meniscus":
dynamic_liquid_tracking = True
pipette_movement_conflict.check_safe_for_pipette_movement(
engine_state=self._engine_client.state,
pipette_id=self._pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)
self._engine_client.execute_command(
cmd.AspirateParams(
pipetteId=self._pipette_id,
labwareId=labware_id,
wellName=well_name,
wellLocation=well_location,
volume=volume,
flowRate=flow_rate,
if dynamic_liquid_tracking:
self._engine_client.execute_command(
cmd.AspirateWhileTrackingParams(
pipetteId=self._pipette_id,
labwareId=labware_id,
wellName=well_name,
wellLocation=well_location,
volume=volume,
flowRate=flow_rate,
)
)
else:
self._engine_client.execute_command(
cmd.AspirateParams(
pipetteId=self._pipette_id,
labwareId=labware_id,
wellName=well_name,
wellLocation=well_location,
volume=volume,
flowRate=flow_rate,
)
)
)

self._protocol_core.set_last_location(location=location, mount=self.get_mount())

Expand All @@ -206,7 +230,7 @@ def dispense(
flow_rate: float,
in_place: bool,
push_out: Optional[float],
is_meniscus: Optional[bool] = None,
meniscus_tracking: Optional[MeniscusTracking] = None,
) -> None:
"""Dispense a given volume of liquid into the specified location.
Args:
Expand All @@ -217,7 +241,9 @@ def dispense(
flow_rate: The flow rate in µL/s to dispense at.
in_place: whether this is a in-place command.
push_out: The amount to push the plunger below bottom position.
meniscus_tracking: Optional data about where to dispense from.
"""
# raise ValueError(f"well location = {location}")
if self._protocol_core.api_version < _DISPENSE_VOLUME_VALIDATION_ADDED_IN:
# In older API versions, when you try to dispense more than you can,
# it gets clamped.
Expand Down Expand Up @@ -266,26 +292,45 @@ def dispense(
labware_id=labware_id,
well_name=well_name,
absolute_point=location.point,
is_meniscus=is_meniscus,
meniscus_tracking=meniscus_tracking,
)
dynamic_liquid_tracking = False
if meniscus_tracking:
if meniscus_tracking.target == "end":
well_location.volumeOffset = "operationVolume"
elif meniscus_tracking.target == "dynamic_meniscus":
dynamic_liquid_tracking = True
pipette_movement_conflict.check_safe_for_pipette_movement(
engine_state=self._engine_client.state,
pipette_id=self._pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)
self._engine_client.execute_command(
cmd.DispenseParams(
pipetteId=self._pipette_id,
labwareId=labware_id,
wellName=well_name,
wellLocation=well_location,
volume=volume,
flowRate=flow_rate,
pushOut=push_out,
if dynamic_liquid_tracking:
self._engine_client.execute_command(
cmd.DispenseWhileTrackingParams(
pipetteId=self._pipette_id,
labwareId=labware_id,
wellName=well_name,
wellLocation=well_location,
volume=volume,
flowRate=flow_rate,
pushOut=push_out,
)
)
else:
self._engine_client.execute_command(
cmd.DispenseParams(
pipetteId=self._pipette_id,
labwareId=labware_id,
wellName=well_name,
wellLocation=well_location,
volume=volume,
flowRate=flow_rate,
pushOut=push_out,
)
)
)

if isinstance(location, (TrashBin, WasteChute)):
self._protocol_core.set_last_location(location=None, mount=self.get_mount())
Expand Down Expand Up @@ -968,6 +1013,43 @@ def liquid_probe_with_recovery(self, well_core: WellCore, loc: Location) -> None

self._protocol_core.set_last_location(location=loc, mount=self.get_mount())

def liquid_probe_testing_data(
self,
well_core: WellCore,
loc: Location,
operation_volume: float,
) -> Tuple[float, float, float]:
labware_id = well_core.labware_id
well_name = well_core.get_name()
well_location = WellLocation(
origin=WellOrigin.TOP, offset=WellOffset(x=0, y=0, z=2)
)
result = self._engine_client.execute_command_without_recovery(
cmd.LiquidProbeParams(
labwareId=labware_id,
wellName=well_name,
wellLocation=well_location,
pipetteId=self.pipette_id,
)
)

self._protocol_core.set_last_location(location=loc, mount=self.get_mount())
projected_current_volume = (
self._engine_client.state.geometry.get_well_volume_at_height(
labware_id=labware_id, well_name=well_name, height=result.z_position
)
)
projected_final_height = (
self._engine_client.state.geometry.get_well_height_after_volume(
labware_id=labware_id,
well_name=well_name,
initial_height=result.z_position,
volume=operation_volume,
)
)

return result.z_position, projected_current_volume, projected_final_height

def liquid_probe_without_recovery(
self, well_core: WellCore, loc: Location
) -> float:
Expand Down
Loading
Loading