import requests
import time
import logging
import json
import tomllib
from pathlib import Path
from math import ceil
from AFL.automation.APIServer.Driver import Driver
from AFL.automation.prepare.OT2DeckWebAppMixin import OT2DeckWebAppMixin
from AFL.automation.shared.utilities import listify
# Well names of an 8-row x 12-column tip rack in column-major order
# (A1, B1, ..., H1, A2, ..., H12); used to (re)fill the per-mount tip lists.
TIPRACK_WELLS = [f"{row}{col}" for col in range(1, 13) for row in "ABCDEFGH"]
[docs]
class OT2HTTPDriver(OT2DeckWebAppMixin, Driver):
# Accepted (lower-case) pipette spellings mapped to the canonical pipette name;
# consulted by _normalize_pipette_name.
PIPETTE_NAME_ALIASES = {
"p10": "p10_single",
"p10_single": "p10_single",
"p10_single_gen1": "p10_single",
"p300": "p300_single",
"p300_single": "p300_single",
"p1000": "p1000_single",
"p1000_single": "p1000_single",
}
# Substring expected in a tiprack's labware name for each canonical pipette
# name; _warn_on_tiprack_mismatch warns when a loaded tiprack name lacks it.
EXPECTED_TIPRACK_TOKEN = {
"p10_single": "10ul",
"p300_single": "300ul",
"p1000_single": "1000ul",
}
# Default configuration values. __init__ passes self.gather_defaults() to
# Driver.__init__ — presumably that collects this dict; confirm in Driver.
defaults = {}
defaults["robot_ip"] = "127.0.0.1" # Default to localhost, should be overridden
defaults["robot_port"] = "31950" # Default Opentrons HTTP API port
defaults["loaded_labware"] = {} # Persistent storage for loaded labware
defaults["loaded_instruments"] = {} # Persistent storage for loaded instruments
defaults["loaded_modules"] = {} # Persistent storage for loaded modules
defaults["available_tips"] = {} # Persistent storage for available tips, Format: {mount: [(tiprack_id, well_name), ...]}
defaults["prep_targets"] = [] # Persistent storage for prep target well locations
[docs]
def __init__(self, overrides=None):
    """Set up driver state, index custom labware and contact the robot.

    Args:
        overrides: Optional configuration overrides forwarded to
            ``Driver.__init__``.

    Raises:
        ConnectionError: If the robot health check fails (raised by
            ``_initialize_robot``).
        RuntimeError: If the attached pipettes cannot be queried (raised by
            ``_update_pipettes``).
    """
    # Must be set before anything logs: _log falls back to print() when
    # no app is attached.
    self.app = None
    Driver.__init__(
        self,
        name="OT2_HTTP_Driver",
        defaults=self.gather_defaults(),
        overrides=overrides,
    )
    self.name = "OT2_HTTP_Driver"
    # Initialize state variables
    self.session_id = None
    self.protocol_id = None
    self.max_transfer = None
    self.min_transfer = None
    # Read by _split_up_transfers; previously only assigned inside
    # _update_pipette_ranges, so they could be missing before the first
    # load_instrument call.
    self.min_largest_pipette = None
    self.max_smallest_pipette = None
    self.has_tip = False
    self.last_pipette = None
    self.modules = {}
    self.pipette_info = {}
    # Custom labware handling
    self.custom_labware_files = {}
    self.sent_custom_labware = set()
    self.custom_labware_dir = self._get_custom_labware_dir()
    self._load_custom_labware_defs()
    # Base URL for HTTP requests
    self.base_url = f"http://{self.config['robot_ip']}:{self.config['robot_port']}"
    self.headers = {"Opentrons-Version": "2"}
    # Initialize the robot connection
    self._initialize_robot()
    self.useful_links['View Deck'] = '/visualize_deck'
def _log(self, level, message):
"""Safe logging that checks if app exists before logging"""
if self.app is not None and hasattr(self.app, "logger"):
log_method = getattr(self.app.logger, level, None)
if log_method:
log_method(message)
else:
print(f"[{level.upper()}] {message}")
[docs]
def log_info(self, message):
    """Emit ``message`` at the "info" level through ``_log``."""
    self._log(level="info", message=message)
[docs]
def log_error(self, message):
    """Emit ``message`` at the "error" level through ``_log``."""
    self._log(level="error", message=message)
[docs]
def log_debug(self, message):
    """Emit ``message`` at the "debug" level through ``_log``."""
    self._log(level="debug", message=message)
[docs]
def log_warning(self, message):
    """Emit ``message`` at the "warning" level through ``_log``."""
    self._log(level="warning", message=message)
def _get_custom_labware_dir(self) -> Path:
"""Return the path to the custom labware directory."""
current = Path(__file__).resolve()
for parent in current.parents:
if (parent / "pyproject.toml").exists():
pyproject = parent / "pyproject.toml"
try:
with open(pyproject, "rb") as f:
data = tomllib.load(f)
custom_dir = (
data.get("tool", {})
.get("afl", {})
.get("custom_labware_dir")
)
if custom_dir:
return (parent / custom_dir).expanduser()
except Exception:
pass
return parent / "support" / "labware"
return Path.cwd() / "support" / "labware"
def _load_custom_labware_defs(self):
"""Record available custom labware definitions in the support directory."""
self.custom_labware_dir.mkdir(parents=True, exist_ok=True)
for json_file in self.custom_labware_dir.glob("*.json"):
try:
with open(json_file, "r") as f:
definition = json.load(f)
ns = definition.get("namespace", "custom_beta")
load = definition.get("parameters", {}).get("loadName")
if ns and load:
key = f"{ns}/{load}"
self.custom_labware_files[key] = json_file
except Exception:
continue
def _initialize_robot(self):
    """Check that the robot answers on ``/health`` and refresh pipette info.

    Raises:
        ConnectionError: If the health endpoint does not return 200, or the
            request fails or times out.
        RuntimeError: Propagated from ``_update_pipettes``.
    """
    try:
        # A timeout keeps construction/reset from blocking indefinitely
        # when the robot is powered off or unreachable.
        response = requests.get(
            url=f"{self.base_url}/health", headers=self.headers, timeout=10
        )
        if response.status_code != 200:
            raise ConnectionError(f"Failed to connect to robot at {self.base_url}")
        # Get attached pipettes
        self._update_pipettes()
    except requests.exceptions.RequestException as e:
        self.log_error(f"Error connecting to robot: {str(e)}")
        raise ConnectionError(
            f"Error connecting to robot at {self.base_url}: {str(e)}"
        ) from e
def _update_pipettes(self):
    """Query ``/instruments`` and rebuild ``pipette_info`` and transfer limits.

    ``self.pipette_info`` is rebuilt with one entry per reported
    instrument, keyed by mount. ``self.min_transfer`` / ``self.max_transfer``
    are recomputed from the pipettes that have an id recorded in
    ``config["loaded_instruments"]`` (i.e. loaded in this run). Progress is
    only logged when an app is attached.

    Raises:
        RuntimeError: If the request fails, returns a non-200 status, or
            the response cannot be processed.
    """
    verbose = self.app is not None
    try:
        if verbose:
            self.log_info("Fetching pipette information from robot")
        # Get basic pipette information
        response = requests.get(
            url=f"{self.base_url}/instruments", headers=self.headers
        )
        if response.status_code != 200:
            raise RuntimeError(f"Failed to get pipettes: {response.text}")
        pipettes_data = response.json()['data']
        self.pipette_info = {}
        # Update min/max transfer values based on attached pipettes
        self.min_transfer = None
        self.max_transfer = None
        for pipette in pipettes_data:
            mount = pipette['mount']
            try:
                # the id from this run
                pipette_id = self.config["loaded_instruments"][mount]["pipette_id"]
            except KeyError:
                pipette_id = None
            details = pipette.get("data", {})
            # Store basic pipette info
            self.pipette_info[mount] = {
                "id": pipette_id,
                "name": pipette["instrumentName"],
                "model": pipette["instrumentModel"],
                "serial": pipette["serialNumber"],
                "mount": mount,
                "min_volume": details.get("min_volume", None),
                "max_volume": details.get("max_volume", None),
                "aspirate_flow_rate": details.get(
                    "aspirateFlowRate", {}
                ).get("value", 150),
                "dispense_flow_rate": details.get(
                    "dispenseFlowRate", {}
                ).get("value", 150),
                "channels": details.get("channels", 1),
            }
            if pipette_id is None:
                # Not loaded in this run: does not contribute to limits.
                continue
            # Update global min/max transfer values. Volumes the robot did
            # not report are None and must not be compared numerically.
            min_volume = self.pipette_info[mount]['min_volume']
            max_volume = self.pipette_info[mount]['max_volume']
            if min_volume is not None and (
                self.min_transfer is None or self.min_transfer > min_volume
            ):
                self.min_transfer = min_volume
                if verbose:
                    self.log_info(
                        f"Setting minimum transfer to {self.min_transfer}"
                    )
            if max_volume is not None and (
                self.max_transfer is None or self.max_transfer < max_volume
            ):
                self.max_transfer = max_volume
                if verbose:
                    self.log_info(
                        f"Setting maximum transfer to {self.max_transfer}"
                    )
        if verbose:
            self.log_info(f"Pipette information updated: {self.pipette_info}")
    except Exception as e:
        raise RuntimeError(f"Error getting pipettes: {str(e)}") from e
[docs]
def reset_prep_targets(self):
    """Replace the stored prep-target list with a fresh empty list."""
    self.config["prep_targets"] = list()
[docs]
def add_prep_targets(self, targets, reset=False):
    """Append one or more well locations to the stored prep-target list.

    Args:
        targets: A single location or a list of locations (passed through
            ``listify``).
        reset: When true, the existing list is cleared first.
    """
    if reset:
        self.reset_prep_targets()
    config = self.config
    config.setdefault("prep_targets", [])
    additions = listify(targets)
    config["prep_targets"].extend(additions)
    # The list was mutated in place, so record the change explicitly.
    config._update_history()
[docs]
def get_prep_target(self):
    """Remove and return the next prep target from the stored queue.

    Returns:
        The first entry of ``config["prep_targets"]``.

    Raises:
        IndexError: If no prep targets remain.
    """
    targets = self.config["prep_targets"]
    if not targets:
        raise IndexError("No prep targets remaining")
    target = targets.pop(0)
    # pop() mutates the stored list in place; record the change the same
    # way add_prep_targets does after extend().
    self.config._update_history()
    return target
[docs]
def status(self):
    """Return a list of human-readable status lines for the driver.

    The list covers, in order: the prep-target queue, the tip status from
    ``get_tip_status``, the session state (only when ``session_id`` is
    set), the pipette model per mount, and the labware name per slot.
    """
    status = []
    prep_targets = self.config.get("prep_targets", [])
    if prep_targets:
        status.append(f"Next prep target: {prep_targets[0]}")
        status.append(f"Remaining prep targets: {len(prep_targets)}")
    else:
        status.append("No prep targets loaded")
    status.append(self.get_tip_status())
    # Get current session status if available
    if self.session_id:
        try:
            response = requests.get(
                url=f"{self.base_url}/sessions/{self.session_id}",
                headers=self.headers,
            )
            if response.status_code == 200:
                session_data = response.json().get("data", {})
                current_state = session_data.get("details", {}).get(
                    "currentState", "unknown"
                )
                status.append(f"Session state: {current_state}")
        except requests.exceptions.RequestException:
            status.append("Unable to get session status")
    # Get pipette information
    for mount, pipette in self.pipette_info.items():
        if pipette:
            status.append(
                f"Pipette on {mount} mount: {pipette.get('model', 'unknown')}"
            )
    # Get loaded labware information
    try:
        for slot, (labware_id, name, _) in self.config["loaded_labware"].items():
            status.append(f"Labware in slot {slot}: {name}")
    except Exception as exc:
        # A malformed entry must not break status reporting; report it
        # through the driver's logger instead of a bare print().
        self.log_warning(
            f"Could not summarise loaded labware ({exc}): "
            f"{self.config['loaded_labware']}"
        )
    return status
[docs]
@Driver.quickbar(
    qb={
        "button_text": "Refill Tipracks",
        "params": {
            "mount": {
                "label": "Which Pipet left/right/both",
                "type": "text",
                "default": "both",
            },
        },
    }
)
def reset_tipracks(self, mount="both"):
    """Mark every tip in the tipracks of the given mount(s) as available.

    Args:
        mount: A mount name, or ``"both"`` for every mount that has a
            loaded instrument. Mounts without a loaded instrument are
            skipped with a warning.
    """
    self.log_info(f"Resetting tipracks for {mount} mount")
    instruments = self.config["loaded_instruments"]
    mounts_to_reset = list(instruments.keys()) if mount == "both" else [mount]
    for m in mounts_to_reset:
        if m not in instruments:
            self.log_warning(
                f"No instrument loaded on {m} mount; tipracks not reset"
            )
            continue
        # Reinitialize available tips for this mount
        self.config["available_tips"][m] = [
            (tiprack, well)
            for tiprack in instruments[m]["tip_racks"]
            for well in TIPRACK_WELLS
        ]
        self.log_info(f"Reset {len(self.config['available_tips'][m])} tips for {m} mount")
    # Reset tip status
    self.has_tip = False
    # available_tips was mutated in place; record it like load_instrument does.
    self.config._update_history()
[docs]
def reset(self):
    """Tear down the session/protocol, clear the deck and reconnect.

    Deletion failures on the robot are logged and otherwise ignored; the
    local state is cleared regardless, then ``reset_deck`` and
    ``_initialize_robot`` are called.
    """
    self.log_info("Resetting the protocol context")

    # Remove the active session from the robot, if one is tracked.
    if self.session_id:
        session_url = f"{self.base_url}/sessions/{self.session_id}"
        try:
            requests.delete(url=session_url, headers=self.headers)
        except requests.exceptions.RequestException as exc:
            self.log_error(f"Error deleting session: {str(exc)}")

    # Remove the uploaded protocol from the robot, if one is tracked.
    if self.protocol_id:
        protocol_url = f"{self.base_url}/protocols/{self.protocol_id}"
        try:
            requests.delete(url=protocol_url, headers=self.headers)
        except requests.exceptions.RequestException as exc:
            self.log_error(f"Error deleting protocol: {str(exc)}")

    # Forget everything tied to the previous context.
    self.session_id = None
    self.protocol_id = None
    self.has_tip = False
    self.last_pipette = None

    # Deck configuration goes too, then re-establish the connection.
    self.reset_deck()
    self._initialize_robot()
[docs]
def reset_deck(self):
    """Forget all loaded labware, instruments, modules, tips and prep targets.

    Also clears the in-memory module map, the set of custom labware already
    sent to the robot, and the current run id.
    """
    self.log_info("Resetting the deck configuration")
    # Each dict-valued section gets its own fresh dict.
    for section in (
        "loaded_labware",
        "loaded_instruments",
        "loaded_modules",
        "available_tips",
    ):
        self.config[section] = {}
    self.config["prep_targets"] = []
    # In-memory state tied to the previous deck/run.
    self.modules = {}
    self.sent_custom_labware = set()
    self.run_id = None
[docs]
@Driver.quickbar(qb={"button_text": "Home"})
def home(self, **kwargs):
    """Home the whole robot through the ``/robot/home`` endpoint.

    The request is posted directly to the robot; no run is created or
    looked up by this method.

    Returns:
        True when the robot answers with status 200.

    Raises:
        RuntimeError: On a non-200 response or a failed request.
    """
    self.log_info("Homing the robot's axes")
    payload = {
        "target": "robot",  # Home the entire robot
    }
    try:
        response = requests.post(
            url=f"{self.base_url}/robot/home",
            headers=self.headers,
            json=payload,
        )
        if response.status_code == 200:
            self.log_info("Robot homing completed successfully")
            return True
        self.log_error(f"Failed to home robot: {response.status_code}")
        self.log_error(f"Response: {response.text}")
        raise RuntimeError(f"Failed to home robot: {response.text}")
    except requests.exceptions.RequestException as e:
        self.log_error(f"Error during homing: {str(e)}")
        raise RuntimeError(f"Error during homing: {str(e)}")
[docs]
def parse_well(self, loc):
    """Split a location string such as ``"10B12"`` into ``(slot, well)``.

    The slot is everything before the first alphabetic character and the
    well is the remainder. When the string has no alphabetic character the
    whole string is returned as the slot with an empty well (previously
    the last character was split off as the well, e.g. ``"12"`` gave
    ``("1", "2")``).

    Args:
        loc: Location string in "slotwell" form.

    Returns:
        Tuple ``(slot, well)`` of strings.
    """
    for index, char in enumerate(loc):
        if char.isalpha():
            return loc[:index], loc[index:]
    return loc, ""
[docs]
def get_wells(self, locs):
    """Convert location strings to well objects and validate every one.

    Args:
        locs: Single location string or list of location strings in format
            "slotwell" (e.g. "1A1").

    Returns:
        List of dicts with ``labwareId`` and ``wellName``, one per location.

    Raises:
        ValueError: If a slot has no labware, its stored labware record is
            malformed, or the well does not exist in that labware's
            definition.
    """
    self.log_debug(f"Converting locations to well objects: {locs}")
    wells = []
    for loc in listify(locs):
        slot, well = self.parse_well(loc)
        # Get labware info from the slot
        labware_info = self.config['loaded_labware'].get(slot)
        if not labware_info:
            raise ValueError(f"No labware found in slot {slot}")
        # Records are (labware_id, name, load_result). Lists are accepted
        # too — presumably what a tuple becomes if the config is
        # round-tripped through JSON; confirm against the config store.
        if not isinstance(labware_info, (tuple, list)) or len(labware_info) < 3:
            raise ValueError(f"Invalid labware info format in slot {slot}")
        # Validate each well as it is parsed. Previously the check ran once
        # after the loop, so only the last location was verified and an
        # empty input failed on an unbound variable.
        definition = labware_info[2]['definition']
        if well not in definition['wells']:
            raise ValueError(
                f"Well {well} is not a valid well for slot {slot}, {definition['metadata']['displayName']}"
            )
        wells.append({"labwareId": labware_info[0], "wellName": well})
    self.log_debug(f"Created well objects: {wells}")
    return wells
def _check_cmd_success(self, response):
if response.status_code != 201:
self.log_error(
f"Failed to execute command : {response.status_code}"
)
self.log_error(f"Response: {response.text}")
raise RuntimeError(
f"Failed to execute command: {response.text}"
)
if 'status' in response.json()['data'].keys():
if response.json()['data']['status'] == 'failed':
self.log_error(
f"Command returned error : {response.status_code}"
)
self.log_error(f"Response: {response.text}")
raise RuntimeError(
f"Command returned error: {response.text}"
)
[docs]
def send_labware(self, labware_def, check_run_status=True):
    """Persist a custom labware definition and upload it to the current run.

    The definition is always written to ``custom_labware_dir`` as
    ``<namespace>_<loadName>.json``. It is only posted to the robot when
    its ``namespace/loadName`` key has not been sent before.

    Args:
        labware_def: Labware definition dict; must contain
            ``parameters.loadName``. ``namespace`` defaults to
            ``"custom_beta"``.
        check_run_status: If False, skip HTTP GET check when ensuring run
            exists. Passed to _ensure_run_exists for optimization.

    Returns:
        The ``namespace/loadName`` key when the definition was already
        sent, otherwise the ``definitionUri`` reported by the robot.

    Raises:
        ValueError: If the definition has no load name.
        RuntimeError: If the upload fails.
    """
    self.log_debug(f"Sending custom labware definition: {labware_def}")
    ns = labware_def.get("namespace", "custom_beta")
    load_name = labware_def.get("parameters", {}).get("loadName")
    if not load_name:
        raise ValueError("labware_def missing parameters.loadName")
    key = f"{ns}/{load_name}"

    # Keep a copy on disk so the definition can be re-sent in later runs.
    self.custom_labware_dir.mkdir(parents=True, exist_ok=True)
    file_path = self.custom_labware_dir / f"{ns}_{load_name}.json"
    with open(file_path, "w") as handle:
        json.dump(labware_def, handle, indent=2)
    self.custom_labware_files[key] = file_path

    # Nothing more to do when the robot already has this definition.
    if key in self.sent_custom_labware:
        self.log_debug(f"Labware {key} already sent to robot")
        return key

    run_id = self._ensure_run_exists(check_run_status=check_run_status)
    try:
        response = requests.post(
            url=f"{self.base_url}/runs/{run_id}/labware_definitions",
            headers=self.headers,
            params={"waitUntilComplete": True},
            json={"data": labware_def},
        )
        self._check_cmd_success(response)
        labware_name = response.json()["data"]["definitionUri"]
        self.sent_custom_labware.add(key)
        self.log_info(
            f"Successfully sent custom labware with name/URI {labware_name}"
        )
        return labware_name
    except (requests.exceptions.RequestException, KeyError) as e:
        self.log_error(f"Error sending custom labware: {str(e)}")
        raise RuntimeError(f"Error sending custom labware: {str(e)}")
[docs]
def load_labware(self, name, slot, module=None, check_run_status=True, **kwargs):
    """Load labware (containers, tipracks) into the current run via the HTTP API.

    If labware is already tracked for ``slot`` it is first moved off-deck
    with a ``moveLabware`` command. When the slot holds a loaded module the
    labware is placed on that module instead of the bare slot.

    Args:
        name: Labware load name, optionally prefixed with ``namespace/``.
            Replaced by the definition's load name when ``labware_json``
            is supplied.
        slot: Deck slot. Normalised to ``str`` so the stored key matches
            the string keys used by ``load_instrument``,
            ``_warn_on_tiprack_mismatch`` and ``get_wells``.
        module: Optional module id; added to the command params as
            ``moduleId`` and recorded in ``self.modules``.
        check_run_status: If False, skip HTTP GET check when ensuring run
            exists. Passed to _ensure_run_exists and send_labware for
            optimization.
        **kwargs: ``labware_json`` may carry a custom labware definition
            dict, which is sent to the robot before loading.

    Returns:
        The labware id reported by the robot.

    Raises:
        ValueError: If ``labware_json`` has no ``parameters.loadName``.
        RuntimeError: If a command fails or the response lacks the
            expected fields.
    """
    # Keys of loaded_labware are looked up as strings everywhere else, so
    # an int slot would otherwise create an entry nothing else can find.
    slot = str(slot)
    self.log_debug(f"Loading labware '{name}' into slot '{slot}'")
    # Ensure we have a valid run
    run_id = self._ensure_run_exists(check_run_status=check_run_status)
    labware_json = kwargs.pop("labware_json", None)
    if labware_json is not None:
        namespace = labware_json.get("namespace", "custom_beta")
        load_name = labware_json.get("parameters", {}).get("loadName")
        if not load_name:
            raise ValueError("labware_json missing parameters.loadName")
        name = load_name
        self.send_labware(labware_json, check_run_status=check_run_status)
    else:
        if "/" in name:
            namespace, load_name = name.split("/", 1)
            name = load_name
        else:
            load_name = name
            # Prefer a locally known custom definition; otherwise assume
            # the standard opentrons namespace.
            if f"custom_beta/{load_name}" in self.custom_labware_files:
                namespace = "custom_beta"
            else:
                namespace = "opentrons"
        key = f"{namespace}/{load_name}"
        if namespace != "opentrons" and key not in self.sent_custom_labware:
            path = self.custom_labware_files.get(key)
            if path and Path(path).exists():
                with open(path, "r") as f:
                    definition = json.load(f)
                self.send_labware(definition, check_run_status=check_run_status)
            else:
                self.log_warning(f"Custom labware definition not found for {key}")
    try:
        # Check if there's existing labware in the slot
        if slot in self.config["loaded_labware"]:
            self.log_info(
                f"Found existing labware in slot {slot}, moving it off-deck first"
            )
            existing_labware_id = self.config["loaded_labware"][slot][0]
            # Create command to move existing labware off-deck
            move_command = {
                "data": {
                    "commandType": "moveLabware",
                    "params": {
                        "labwareId": existing_labware_id,
                        "newLocation": "offDeck",
                        # Allow user to manually move the labware
                        "strategy": "manualMoveWithoutPause",
                    },
                    "intent": "setup",
                }
            }
            # Execute the move command
            move_response = requests.post(
                url=f"{self.base_url}/runs/{run_id}/commands",
                headers=self.headers,
                params={"waitUntilComplete": True},
                json=move_command,
            )
            self._check_cmd_success(move_response)
            # Remove from our tracking
            del self.config["loaded_labware"][slot]
        if slot in self.config["loaded_modules"].keys():
            # we need to load into a module, not a slot
            location = {"moduleId": self.config["loaded_modules"][slot][0]}
        else:
            location = {"slotName": slot}
        version = 1  # default version
        # Prepare the loadLabware command
        command_dict = {
            "data": {
                "commandType": "loadLabware",
                "params": {
                    "location": location,
                    "loadName": name,
                    "namespace": namespace,
                    "version": version,
                },
                "intent": "setup",
            }
        }
        # If this is a module, we need to specify the moduleId
        if module:
            command_dict["data"]["params"]["moduleId"] = module
        # Execute the command
        response = requests.post(
            url=f"{self.base_url}/runs/{run_id}/commands",
            headers=self.headers,
            params={"waitUntilComplete": True},
            json=command_dict,
        )
        self._check_cmd_success(response)
        # Get the labware ID from the response
        response_data = response.json()
        # Debug log the response structure
        self.log_debug(f"Load labware response: {response_data}")
        # Handle different response structures that might occur
        try:
            if "data" in response_data and "result" in response_data["data"]:
                labware_id = response_data["data"]["result"]["labwareId"]
            elif "data" in response_data and "labwareId" in response_data["data"]:
                labware_id = response_data["data"]["labwareId"]
            elif "data" in response_data and "id" in response_data["data"]:
                labware_id = response_data["data"]["id"]
            else:
                # Try to find labware ID in any structure
                self.log_warning(f"Unexpected response structure: {response_data}")
                for _field, value in response_data.items():
                    if isinstance(value, dict) and "labwareId" in value:
                        labware_id = value["labwareId"]
                        break
                else:
                    raise KeyError("Could not find labwareId in response")
        except KeyError as e:
            self.log_error(f"Error extracting labware ID from response: {str(e)}")
            self.log_error(f"Response data: {response_data}")
            raise RuntimeError(
                f"Failed to extract labware ID from response: {str(e)}"
            )
        # The full result (including the labware definition) is stored so
        # get_wells can validate well names later.
        result = response_data["data"]["result"]
        # Store the labware information directly in config
        self.config["loaded_labware"][slot] = (labware_id, name, result)
        # If this is a module, store it
        if module:
            self.modules[slot] = module
        self.log_info(
            f"Successfully loaded labware '{name}' in slot {slot} with ID {labware_id}"
        )
        self.config._update_history()
        return labware_id
    except (requests.exceptions.RequestException, KeyError) as e:
        self.log_error(f"Error loading labware: {str(e)}")
        raise RuntimeError(f"Error loading labware: {str(e)}")
[docs]
def load_module(self, name, slot, check_run_status=True, **kwargs):
    """Load a module (heater-shaker, tempdeck) into the run via the HTTP API.

    Args:
        name: Module model string sent as ``model`` in the ``loadModule``
            command.
        slot: Deck slot; stored and looked up as ``str(slot)``.
        check_run_status: If False, skip HTTP GET check when ensuring run
            exists. Passed to _ensure_run_exists for optimization.

    Returns:
        The module id reported by the robot.

    Raises:
        RuntimeError: If a module is already loaded in the slot, the
            command fails, or the response lacks a module id.
    """
    self.log_debug(f"Loading module '{name}' into slot '{slot}'")
    # Ensure we have a valid run
    run_id = self._ensure_run_exists(check_run_status=check_run_status)
    # Modules are stored under str(slot), so the duplicate check must use
    # the same key (an int slot was never detected before), and the message
    # must index by the slot value rather than the literal 'slot'.
    slot_key = str(slot)
    if slot_key in self.config["loaded_modules"]:
        # todo: check if same module
        raise RuntimeError(
            f"Module already loaded in slot {slot}: "
            f"{self.config['loaded_modules'][slot_key]}. Overwrite not supported."
        )
    try:
        # Prepare the loadModule command
        command_dict = {
            "data": {
                "commandType": "loadModule",
                "params": {
                    "location": {"slotName": slot_key},
                    "model": name,
                },
                "intent": "setup",
            }
        }
        # Execute the command
        response = requests.post(
            url=f"{self.base_url}/runs/{run_id}/commands",
            headers=self.headers,
            params={"waitUntilComplete": True},
            json=command_dict,
        )
        self._check_cmd_success(response)
        # Get the module ID from the response
        response_data = response.json()
        # Debug log the response structure
        self.log_debug(f"Load module response: {response_data}")
        # Handle different response structures that might occur
        try:
            if "data" in response_data and "result" in response_data["data"]:
                module_id = response_data["data"]["result"]["moduleId"]
            elif "data" in response_data and "moduleId" in response_data["data"]:
                module_id = response_data["data"]["moduleId"]
            elif "data" in response_data and "id" in response_data["data"]:
                module_id = response_data["data"]["id"]
            else:
                # Try to find module ID in any structure
                self.log_warning(f"Unexpected response structure: {response_data}")
                for _field, value in response_data.items():
                    if isinstance(value, dict) and "moduleId" in value:
                        module_id = value["moduleId"]
                        break
                else:
                    raise KeyError("Could not find moduleId in response")
        except KeyError as e:
            self.log_error(f"Error extracting module ID from response: {str(e)}")
            self.log_error(f"Response data: {response_data}")
            raise RuntimeError(
                f"Failed to extract module ID from response: {str(e)}"
            )
        # Store the module information directly in config
        self.config["loaded_modules"][slot_key] = (module_id, name)
        self.log_info(
            f"Successfully loaded module '{name}' in slot {slot} with ID {module_id}"
        )
        self.config._update_history()
        return module_id
    except (requests.exceptions.RequestException, KeyError) as e:
        self.log_error(f"Error loading module: {str(e)}")
        raise RuntimeError(f"Error loading module: {str(e)}")
[docs]
def load_instrument(self, name, mount, tip_rack_slots, reload=False, check_run_status=True, update_pipettes=True, **kwargs):
    """Load a pipette into the run and record its tipracks.

    Args:
        name: Pipette name or alias; normalised via
            ``_normalize_pipette_name``.
        mount: ``"left"`` or ``"right"`` (case/whitespace insensitive).
        tip_rack_slots: One slot or a list of slots holding already-loaded
            tipracks.
        reload: When False, the available-tip list for the mount is
            re-initialised with every well of every tiprack.
        check_run_status: If False, skip HTTP GET check when ensuring run
            exists. Passed to _ensure_run_exists for optimization.
        update_pipettes: If False, skip calling _update_pipettes (for
            optimization during reload).

    Returns:
        The pipette id reported by the robot.

    Raises:
        ValueError: For an invalid mount, no tiprack slots, or a slot
            without loaded labware.
        RuntimeError: If the command fails or the response is malformed.
    """
    pipette_name = self._normalize_pipette_name(name)
    mount = str(mount).strip().lower()
    if mount != "left" and mount != "right":
        raise ValueError(f"Mount must be 'left' or 'right'. Received: {mount!r}")

    tip_rack_slots = [str(slot) for slot in listify(tip_rack_slots)]
    if not tip_rack_slots:
        raise ValueError("At least one tip rack slot must be provided.")

    # Every slot must already hold labware; non-tiprack names only warn.
    for slot in tip_rack_slots:
        if slot not in self.config["loaded_labware"]:
            raise ValueError(
                f"Tip rack slot {slot!r} is not loaded. Load a tiprack first."
            )
        labware_name = str(self.config["loaded_labware"][slot][1]).lower()
        if "tiprack" not in labware_name:
            self.log_warning(
                f"Slot {slot} contains labware '{labware_name}', which may not be a tiprack."
            )
    self._warn_on_tiprack_mismatch(pipette_name, tip_rack_slots)
    self.log_debug(
        f"Loading pipette '{pipette_name}' on '{mount}' mount with tip_racks in slots {tip_rack_slots}"
    )

    run_id = self._ensure_run_exists(check_run_status=check_run_status)
    try:
        rack_ids_for_command = [
            self.config["loaded_labware"][str(slot)][0] for slot in tip_rack_slots
        ]
        command_dict = {
            "data": {
                "commandType": "loadPipette",
                "params": {
                    "pipetteName": pipette_name,
                    "mount": mount,
                    "tip_racks": rack_ids_for_command,
                },
                "intent": "setup",
            }
        }
        response = requests.post(
            url=f"{self.base_url}/runs/{run_id}/commands",
            headers=self.headers,
            params={"waitUntilComplete": True},
            json=command_dict,
        )
        self._check_cmd_success(response)

        response_data = response.json()
        logging.debug(f'loadPipette response: {response_data}')
        pipette_id = response_data["data"]["result"]["pipetteId"]

        # Refresh hardware info first (unless disabled), then patch in the
        # id from this run so the refresh cannot overwrite it.
        if update_pipettes:
            self._update_pipettes()
        if mount not in self.pipette_info:
            self.pipette_info[mount] = {}
        self.pipette_info[mount]["id"] = pipette_id

        # Collect tiprack ids from records stored as non-empty tuples.
        tip_racks = []
        for slot in listify(tip_rack_slots):
            record = self.config["loaded_labware"].get(slot)
            if record and isinstance(record, tuple) and len(record) >= 1:
                tip_racks.append(record[0])
        if not tip_racks:
            self.log_warning(f"No valid tip racks found in slots {tip_rack_slots}")

        self.config["loaded_instruments"][mount] = {
            "name": pipette_name,
            "pipette_id": pipette_id,
            "tip_racks": tip_racks,
        }

        # A fresh load (not a reload) starts with every tip available.
        if not reload:
            self.config["available_tips"][mount] = [
                (tiprack, well) for tiprack in tip_racks for well in TIPRACK_WELLS
            ]

        if mount not in self.pipette_info or self.pipette_info[mount] is None:
            self.log_warning(
                f"No physical pipette detected in {mount} mount, but pipette information stored"
            )

        self._update_pipette_ranges()
        self.log_info(
            f"Successfully loaded pipette '{pipette_name}' on {mount} mount with ID {pipette_id}"
        )
        self.config._update_history()
        return pipette_id
    except (requests.exceptions.RequestException, KeyError) as e:
        self.log_error(f"Error loading pipette: {str(e)}")
        raise RuntimeError(f"Error loading pipette: {str(e)}")
def _normalize_pipette_name(self, name):
key = str(name).strip().lower()
normalized = self.PIPETTE_NAME_ALIASES.get(key, key)
return normalized
def _warn_on_tiprack_mismatch(self, pipette_name, tip_rack_slots):
token = self.EXPECTED_TIPRACK_TOKEN.get(pipette_name)
if token is None:
return
mismatched_slots = []
for slot in tip_rack_slots:
labware_info = self.config["loaded_labware"].get(str(slot))
if labware_info is None:
continue
labware_name = str(labware_info[1]).lower()
if "tiprack" in labware_name and token not in labware_name:
mismatched_slots.append(slot)
if mismatched_slots:
self.log_warning(
f"Loaded pipette '{pipette_name}' with potentially mismatched tipracks in slots {mismatched_slots}. "
f"Expected tiprack names containing '{token}'."
)
def _update_pipette_ranges(self):
"""Update the min/max values for largest and smallest pipettes"""
self.min_largest_pipette = None
self.max_smallest_pipette = None
# Get all available pipettes with their volumes
available_pipettes = {
mount: info for mount, info in self.pipette_info.items() if info is not None
}
if available_pipettes:
# Get min and max volumes for each pipette
min_vols = {
mount: info.get("min_volume", float("inf"))
for mount, info in available_pipettes.items()
}
max_vols = {
mount: info.get("max_volume", 0)
for mount, info in available_pipettes.items()
}
# Find the smallest and largest pipettes
if max_vols:
# Use list and regular max/min functions with a key function
mounts = list(max_vols.keys())
if mounts:
largest_pipette_mount = max(
mounts, key=lambda m: max_vols.get(m, 0)
)
smallest_pipette_mount = min(
mounts, key=lambda m: max_vols.get(m, float("inf"))
)
# Set global min/max values
if min_vols and largest_pipette_mount in min_vols:
self.min_largest_pipette = min_vols[largest_pipette_mount]
self.log_info(
f"Setting min_largest_pipette to {self.min_largest_pipette}"
)
if max_vols and smallest_pipette_mount in max_vols:
self.max_smallest_pipette = max_vols[smallest_pipette_mount]
self.log_info(
f"Setting max_smallest_pipette to {self.max_smallest_pipette}"
)
[docs]
def mix(self, volume, location, repetitions=1, **kwargs):
    """Mix at a well by repeated aspirate/dispense of ``volume``.

    Args:
        volume: Volume in uL for each aspirate and dispense.
        location: Location string (or list; only the first well is used).
        repetitions: Number of aspirate/dispense cycles.

    Raises:
        ValueError: If the chosen pipette has no id or the location yields
            no wells.
    """
    self.log_info(f"Mixing {volume}uL {repetitions} times at {location}")
    # The run is verified once here; the atomic commands below skip the check.
    self._ensure_run_exists()

    pipette_mount = self.get_pipette(volume)["mount"]
    mount_data = self.pipette_info.get(pipette_mount)
    pipette_id = mount_data.get("id") if mount_data else None
    if not pipette_id:
        raise ValueError(f"Could not find ID for pipette on {pipette_mount} mount")

    wells = self.get_wells(location)
    if not wells:
        raise ValueError("Invalid location")
    target = wells[0]

    if not self.has_tip:
        self._execute_atomic_command(
            "pickUpTip",
            {
                "pipetteId": pipette_id,
                "pipetteMount": pipette_mount,
                "wellLocation": None,  # Use next available tip in rack
            },
            check_run_status=False,
        )
        self.has_tip = True

    def _liquid_params():
        # A fresh dict per command, so no payload object is shared.
        return {
            "pipetteId": pipette_id,
            "volume": volume,
            "labwareId": target["labwareId"],
            "wellName": target["wellName"],
            "wellLocation": {
                "origin": "bottom",
                "offset": {"x": 0, "y": 0, "z": 0},
            },
        }

    for _ in range(repetitions):
        self._execute_atomic_command(
            "aspirate", _liquid_params(), check_run_status=False
        )
        self._execute_atomic_command(
            "dispense", _liquid_params(), check_run_status=False
        )
def _split_up_transfers(self, vol):
"""Split up transfer volumes based on pipette constraints"""
transfers = []
if self.max_transfer is None or vol <= 0:
return transfers
while sum(transfers) < vol:
transfer = min(self.max_transfer, vol - sum(transfers))
# Handle case where remaining volume is less than minimum transfer
if (
transfer < (self.min_transfer or 0)
and len(transfers) > 0
and transfers[-1] >= (2 * (self.min_transfer or 0))
):
transfers[-1] -= (self.min_transfer or 0) - transfer
transfer = self.min_transfer or 0
# Handle "valley of death" case - when transfer is between pipette ranges
if (
self.min_largest_pipette is not None
and self.max_smallest_pipette is not None
and transfer < self.min_largest_pipette
and transfer > self.max_smallest_pipette
):
transfer = (
self.max_smallest_pipette
) # Use smaller pipette at max capacity
if transfer <= 0:
self.log_warning(
f"Computed nonpositive transfer volume {transfer}uL while splitting {vol}uL; stopping split."
)
break
transfers.append(transfer)
# Exit condition - we've reached the target volume
if sum(transfers) >= vol:
break
return transfers
def _slot_by_labware_uuid(self,target_uuid):
for slot, (uuid,name,_) in self.config["loaded_labware"].items():
if uuid == target_uuid:
return slot
return None
[docs]
@Driver.quickbar(
    qb={
        "button_text": "Transfer",
        "params": {
            "source": {"label": "Source Well", "type": "text", "default": "1A1"},
            "dest": {"label": "Dest Well", "type": "text", "default": "1A1"},
            "volume": {"label": "Volume (uL)", "type": "float", "default": 300},
        },
    }
)
def transfer(
    self,
    source,
    dest,
    volume,
    mix_before=None,
    mix_after=None,
    air_gap=0,
    aspirate_rate=None,
    dispense_rate=None,
    mix_aspirate_rate=None,
    mix_dispense_rate=None,
    blow_out=False,
    post_aspirate_delay=0.0,
    aspirate_equilibration_delay=0.0,
    post_dispense_delay=0.0,
    drop_tip=True,
    force_new_tip=False,
    to_top=True,
    to_center=False,
    to_top_z_offset=0,
    fast_mixing=False,
    touch_tip=False,
    **kwargs,
):
    """Transfer fluid from one location to another using atomic HTTP API commands.

    The requested volume is split with ``_split_up_transfers`` and every
    resulting sub-transfer runs the same sequence: tip handling, optional
    heater-shaker pause, optional pre-mix, aspirate, optional air gap,
    dispense, optional post-mix, optional blow-out / touch-tip, optional
    heater-shaker resume, optional tip drop.

    Args:
        source: Location string resolved by ``get_wells``; must resolve to one well.
        dest: Location string resolved by ``get_wells``; must resolve to one well.
        volume: Volume in uL (converted with ``float``). Nonpositive volumes
            are logged and skipped.
        mix_before: Optional ``(n_mixes, mix_volume)`` mixed in the source well
            before aspirating.
        mix_after: Optional ``(n_mixes, mix_volume)`` mixed at the bottom of
            the destination well after dispensing.
        air_gap: Extra volume aspirated at the top of the source well and
            dispensed together with the liquid.
        aspirate_rate: If given, stored as the aspirate flow rate for all mounts.
        dispense_rate: If given, stored as the dispense flow rate for all mounts.
        mix_aspirate_rate: Aspirate flow rate used only while mixing.
        mix_dispense_rate: Dispense flow rate used only while mixing.
        blow_out: Issue a ``blowOut`` at the destination after dispensing.
            The keyword alias ``blowout`` is also accepted.
        post_aspirate_delay: Seconds to wait after the tip is raised to the
            top of the source well.
        aspirate_equilibration_delay: Seconds to wait right after aspirating,
            before the tip is moved.
        post_dispense_delay: Seconds to wait after dispensing.
        drop_tip: Drop the tip after the last sub-transfer (or after every
            sub-transfer when ``force_new_tip`` is set).
        force_new_tip: Discard any held tip before each sub-transfer.
        to_top: Dispense at the top of the destination well.
        to_center: Dispense at the center of the destination well.
        to_top_z_offset: Z offset applied to the dispense / blow-out location
            when dispensing at the top.
        fast_mixing: Accepted for interface compatibility; not used here.
        touch_tip: Touch the tip to the destination well after dispensing.
            The keyword alias ``touchTip`` is also accepted.

    Raises:
        ValueError: No pipette id is known for the chosen mount, ``source`` or
            ``dest`` resolves to more than one well, or both ``to_top`` and
            ``to_center`` are set.
    """
    self.log_info(f"Transferring {volume}uL from {source} to {dest}")

    # Accept common aliases used by different callers.
    if "blowout" in kwargs and not blow_out:
        blow_out = bool(kwargs["blowout"])
    if "touchTip" in kwargs and not touch_tip:
        touch_tip = bool(kwargs["touchTip"])

    volume_ul = float(volume)
    if volume_ul <= 0:
        self.log_info(
            f"Skipping transfer with nonpositive volume {volume_ul}uL from {source} to {dest}"
        )
        return

    # Verify run exists once at the start, then skip checks for all atomic commands
    self._ensure_run_exists()

    # Set flow rates if specified
    if aspirate_rate is not None:
        self.set_aspirate_rate(aspirate_rate)
    if dispense_rate is not None:
        self.set_dispense_rate(dispense_rate)

    # Pick the pipette for this volume and resolve its id from the mount.
    pipette = self.get_pipette(volume_ul)
    pipette_mount = pipette["mount"]
    pipette_id = (self.pipette_info.get(pipette_mount) or {}).get("id")
    if not pipette_id:
        raise ValueError(f"Could not find ID for pipette on {pipette_mount} mount")

    # Resolve source and destination wells; exactly one of each is supported.
    source_wells = self.get_wells(source)
    if len(source_wells) > 1:
        raise ValueError("Transfer only accepts one source well at a time!")
    source_well = source_wells[0]
    dest_wells = self.get_wells(dest)
    if len(dest_wells) > 1:
        raise ValueError("Transfer only accepts one dest well at a time!")
    dest_well = dest_wells[0]

    # Aspiration always happens from the bottom of the source well; the
    # dispense height depends on to_top / to_center.
    source_position = "bottom"
    if to_top and to_center:
        raise ValueError("Cannot dispense to_top and to_center simultaneously")
    if to_top:
        dest_position = "top"
    elif to_center:
        dest_position = "center"
    else:
        dest_position = "bottom"

    def at(origin, z=0):
        """Build a fresh wellLocation payload for ``origin`` with a z offset."""
        return {"origin": origin, "offset": {"x": 0, "y": 0, "z": z}}

    dispense_z = (
        to_top_z_offset if dest_position == "top" and to_top_z_offset != 0 else 0
    )

    # Split transfers if needed
    transfers = self._split_up_transfers(volume_ul)
    last_index = len(transfers) - 1
    for i, sub_volume in enumerate(transfers):
        if sub_volume <= 0:
            self.log_warning(
                f"Skipping nonpositive sub-transfer volume {sub_volume}uL from {source} to {dest}"
            )
            continue

        # Intermediate split transfers should not drop the tip unless explicitly forced.
        effective_drop_tip = drop_tip and (i == last_index or force_new_tip)

        # Keep tip handling consistent with the non-HTTP driver:
        # reuse the current tip across split transfers unless force_new_tip is set.
        if force_new_tip and self.has_tip:
            tip_mount = self.last_pipette if self.last_pipette is not None else pipette_mount
            self._transfer_drop_tip(
                (self.pipette_info.get(tip_mount) or {}).get("id", pipette_id)
            )

        # If a tip is on a different mount, drop it before switching mounts.
        if self.has_tip and self.last_pipette not in (None, pipette_mount):
            self._transfer_drop_tip(
                (self.pipette_info.get(self.last_pipette) or {}).get("id", pipette_id)
            )

        if not self.has_tip:
            self._execute_atomic_command(
                "pickUpTip",
                {
                    "pipetteId": pipette_id,
                    "pipetteMount": pipette_mount,
                    # Filled in with the next available tip by _execute_atomic_command.
                    "wellLocation": None,
                },
                check_run_status=False,
            )
            self.has_tip = True
            self.last_pipette = pipette_mount

        # 1a. If either well sits on a heater-shaker, latch it and pause shaking.
        was_shaking = False
        shake_rpm = None
        dest_well_slot = self._slot_by_labware_uuid(dest_well["labwareId"])
        source_well_slot = self._slot_by_labware_uuid(source_well["labwareId"])
        heater_shaker_slots = [
            slot
            for slot, (_module_uuid, name) in self.config["loaded_modules"].items()
            if "heaterShaker" in name
        ]
        if dest_well_slot in heater_shaker_slots or source_well_slot in heater_shaker_slots:
            # Latching unconditionally has not caused trouble in practice.
            self.latch_shaker()
            # Query once: get_shake_rpm returns (status, current, target) on
            # success but a plain error string on failure, which must not be
            # indexed as if it were the tuple.
            shake_state = self.get_shake_rpm()
            if isinstance(shake_state, tuple):
                status, _current_rpm, target_rpm = shake_state
                if status != "idle":
                    shake_rpm = target_rpm
                    was_shaking = True
                    self.stop_shake()
            else:
                self.log_warning(
                    f"Could not read heater-shaker state ({shake_state}); "
                    "stopping shaker and not scheduling a restart."
                )
                self.stop_shake()

        # 2. Mix before if specified (in the source well)
        if mix_before is not None:
            n_mixes, mix_volume = mix_before
            self._transfer_mix(
                pipette_id,
                pipette_mount,
                source_well,
                n_mixes,
                mix_volume,
                mix_aspirate_rate=mix_aspirate_rate,
                mix_dispense_rate=mix_dispense_rate,
            )

        # 3. Aspirate
        self._transfer_liquid_step(
            "aspirate", pipette_id, pipette_mount, source_well, sub_volume, at(source_position)
        )

        # 4. Aspirate equilibration delay (while tip is in liquid)
        if aspirate_equilibration_delay > 0:
            time.sleep(aspirate_equilibration_delay)

        # 5. Move tip above liquid and post-aspirate delay (tip above liquid)
        self._execute_atomic_command(
            "moveToWell",
            {
                "pipetteId": pipette_id,
                "labwareId": source_well["labwareId"],
                "wellName": source_well["wellName"],
                "wellLocation": at("top"),
            },
            check_run_status=False,
        )
        if post_aspirate_delay > 0:
            time.sleep(post_aspirate_delay)

        # 6. Air gap: an extra aspirate at the top of the source well
        if air_gap > 0:
            self._transfer_liquid_step(
                "aspirate", pipette_id, pipette_mount, source_well, air_gap, at("top")
            )

        # 7. Dispense (air gap volume is pushed out together with the liquid)
        self._transfer_liquid_step(
            "dispense",
            pipette_id,
            pipette_mount,
            dest_well,
            sub_volume + air_gap,
            at(dest_position, dispense_z),
        )

        # 8. Post-dispense delay
        if post_dispense_delay > 0:
            time.sleep(post_dispense_delay)

        # 9. Mix after if specified (from the bottom of the destination well)
        if mix_after is not None:
            n_mixes, mix_volume = mix_after
            self._transfer_mix(
                pipette_id,
                pipette_mount,
                dest_well,
                n_mixes,
                mix_volume,
                mix_aspirate_rate=mix_aspirate_rate,
                mix_dispense_rate=mix_dispense_rate,
            )

        # 10. Blow out if specified
        if blow_out:
            self._execute_atomic_command(
                "blowOut",
                {
                    "pipetteId": pipette_id,
                    "labwareId": dest_well["labwareId"],
                    "wellName": dest_well["wellName"],
                    "wellLocation": at(dest_position, dispense_z),
                },
                check_run_status=False,
            )

        # 10b. Optionally touch the tip to destination well edge.
        if touch_tip:
            self._touch_tip_well(pipette_id=pipette_id, well=dest_well)

        # Resume shaking at the previous target speed.
        if was_shaking:
            self.set_shake(shake_rpm)

        # 11. Drop tip if specified
        if effective_drop_tip:
            self._transfer_drop_tip(pipette_id)

        # Update last pipette
        self.last_pipette = pipette_mount

def _transfer_drop_tip(self, pipette_id):
    """Move ``pipette_id`` over the fixed trash, drop its tip and clear ``has_tip``.

    The move + dropTipInPlace pair is used instead of a single drop command;
    see https://github.com/Opentrons/opentrons/issues/14590.
    """
    self._execute_atomic_command(
        "moveToAddressableAreaForDropTip",
        {
            "pipetteId": pipette_id,
            "addressableAreaName": "fixedTrash",
            "offset": {"x": 0, "y": 0, "z": 10},
            "alternateDropLocation": False,
        },
        check_run_status=False,
    )
    self._execute_atomic_command(
        "dropTipInPlace",
        {"pipetteId": pipette_id},
        check_run_status=False,
    )
    self.has_tip = False

def _transfer_liquid_step(self, command, pipette_id, pipette_mount, well, volume, well_location):
    """Send one ``aspirate`` or ``dispense`` command at the mount's stored flow rate."""
    rate_key = "aspirate_flow_rate" if command == "aspirate" else "dispense_flow_rate"
    self._execute_atomic_command(
        command,
        {
            "pipetteId": pipette_id,
            "volume": volume,
            "labwareId": well["labwareId"],
            "wellName": well["wellName"],
            "wellLocation": well_location,
            "flowRate": self.pipette_info[pipette_mount][rate_key],
        },
        check_run_status=False,
    )

def _transfer_mix(
    self,
    pipette_id,
    pipette_mount,
    well,
    n_mixes,
    mix_volume,
    mix_aspirate_rate=None,
    mix_dispense_rate=None,
):
    """Mix ``n_mixes`` times at the bottom of ``well``, then restore flow rates.

    The flow rates stored for ``pipette_mount`` are snapshotted before the mix
    overrides are applied and put back afterwards (even if a mix command
    fails), so a mix-only rate never leaks into the main aspirate/dispense.
    """
    stored_rates = self.pipette_info[pipette_mount]
    saved_aspirate = stored_rates.get("aspirate_flow_rate")
    saved_dispense = stored_rates.get("dispense_flow_rate")

    if mix_aspirate_rate is not None:
        self.set_aspirate_rate(mix_aspirate_rate, pipette_mount)
    if mix_dispense_rate is not None:
        self.set_dispense_rate(mix_dispense_rate, pipette_mount)
    try:
        for _ in range(n_mixes):
            for command in ("aspirate", "dispense"):
                self._transfer_liquid_step(
                    command,
                    pipette_id,
                    pipette_mount,
                    well,
                    mix_volume,
                    {"origin": "bottom", "offset": {"x": 0, "y": 0, "z": 0}},
                )
    finally:
        # Only undo what was overridden, and only when there was a prior value.
        if mix_aspirate_rate is not None and saved_aspirate is not None:
            self.set_aspirate_rate(saved_aspirate, pipette_mount)
        if mix_dispense_rate is not None and saved_dispense is not None:
            self.set_dispense_rate(saved_dispense, pipette_mount)
def _touch_tip_well(self, pipette_id, well):
"""Touch tip at a well edge if supported by robot-server; otherwise move to well top."""
params = {
"pipetteId": pipette_id,
"labwareId": well["labwareId"],
"wellName": well["wellName"],
"wellLocation": {"origin": "top", "offset": {"x": 0, "y": 0, "z": 0}},
"mmFromEdge": 1,
}
try:
self._execute_atomic_command("touchTip", params, check_run_status=False)
except RuntimeError as exc:
self.log_warning(
f"touchTip command unavailable; using moveToWell fallback. Error: {exc}"
)
self._execute_atomic_command(
"moveToWell",
{
"pipetteId": pipette_id,
"labwareId": well["labwareId"],
"wellName": well["wellName"],
"wellLocation": {"origin": "top", "offset": {"x": 0, "y": 0, "z": -2}},
},
check_run_status=False,
)
def _execute_atomic_command(
self, command_type, params=None, wait_until_complete=True, timeout=None, check_run_status=True
):
"""Execute a single atomic command using the HTTP API
Args:
check_run_status: If False, skip HTTP GET check when ensuring run exists.
Passed to _ensure_run_exists for optimization.
"""
if params is None:
params = {}
# Track tip usage for pick up and drop commands
if command_type == "pickUpTip":
mount = params.get("pipetteMount")
if mount and mount in self.config["available_tips"] and self.config["available_tips"][mount]:
tiprack_id, well = self.get_tip(mount)
self.log_debug(
f"Using tip from {tiprack_id} well {well} for {mount} mount"
)
# Update the params to specify the exact tip location
params["labwareId"] = tiprack_id
params["wellName"] = well
params["wellLocation"] = {
"origin": "top",
"offset": {"x": 0, "y": 0, "z": 0},
}
del params["pipetteMount"]
else:
raise RuntimeError(f"No tips available for {mount} mount")
self.log_debug(
f"Executing atomic command: {command_type} with params: {params}"
)
# Ensure we have a valid run
run_id = self._ensure_run_exists(check_run_status=check_run_status)
# Build the query parameters
query_params = {"waitUntilComplete": wait_until_complete}
if timeout is not None:
query_params["timeout"] = timeout
try:
# Send the command
command_response = requests.post(
url=f"{self.base_url}/runs/{run_id}/commands",
params=query_params,
headers=self.headers,
json={
"data": {
"commandType": command_type,
"params": params,
"intent": "setup",
}
},
)
self._check_cmd_success(command_response)
command_data = command_response.json()["data"]
command_id = command_data["id"]
self.log_debug(
f"Command {command_id} executed with status: {command_data['status']}"
)
# If wait_until_complete is True, the command has already completed
if wait_until_complete:
if command_data["status"] == "succeeded":
return True
elif command_data["status"] in ["failed", "error"]:
error_info = command_data.get("error", "Unknown error")
self.log_error(f"Command failed: {error_info}")
raise RuntimeError(f"Command failed: {error_info}")
# If we're not waiting or the command is still running, return the command ID for tracking
return command_id
except requests.exceptions.RequestException as e:
self.log_error(f"Error executing command: {str(e)}")
raise RuntimeError(f"Error executing command: {str(e)}")
[docs]
def set_aspirate_rate(self, rate=150, pipette=None):
    """Set aspirate rate in uL/s. Default is 150 uL/s.

    Args:
        rate: Flow rate stored under ``aspirate_flow_rate``.
        pipette: ``'left'`` or ``'right'`` to update one mount, or None to
            update every mount that has a pipette entry. Other values are
            ignored.

    A mount without an entry in ``self.pipette_info`` is skipped instead of
    raising; when that mount was requested explicitly a warning is logged.
    """
    self.log_info(f"Setting aspirate rate to {rate} uL/s")
    targets = ("left", "right") if pipette is None else (pipette,)
    for mount in targets:
        if mount not in ("left", "right"):
            continue
        info = self.pipette_info.get(mount)
        if info is None:
            # Single-pipette robots have an empty mount; that is not an error
            # for the "all mounts" default.
            if pipette is not None:
                self.log_warning(
                    f"No pipette on {mount} mount; aspirate rate not set"
                )
            continue
        info["aspirate_flow_rate"] = rate
[docs]
def set_dispense_rate(self, rate=300, pipette=None):
    """Set dispense rate in uL/s. Default is 300 uL/s.

    Args:
        rate: Flow rate stored under ``dispense_flow_rate``.
        pipette: ``'left'`` or ``'right'`` to update one mount, or None to
            update every mount that has a pipette entry. Other values are
            ignored.

    A mount without an entry in ``self.pipette_info`` is skipped instead of
    raising; when that mount was requested explicitly a warning is logged.
    """
    self.log_info(f"Setting dispense rate to {rate} uL/s")
    targets = ("left", "right") if pipette is None else (pipette,)
    for mount in targets:
        if mount not in ("left", "right"):
            continue
        info = self.pipette_info.get(mount)
        if info is None:
            # Single-pipette robots have an empty mount; that is not an error
            # for the "all mounts" default.
            if pipette is not None:
                self.log_warning(
                    f"No pipette on {mount} mount; dispense rate not set"
                )
            continue
        info["dispense_flow_rate"] = rate
[docs]
def set_gantry_speed(self, speed=400):
    """Set movement speed of gantry. Default is 400 mm/s.

    Placeholder in HTTP API mode: the request is logged and a warning is
    emitted; no command is sent to the robot.
    """
    announcement = f"Setting gantry speed to {speed} mm/s"
    caveat = "Setting gantry speed is not fully implemented in HTTP API mode"
    self.log_info(announcement)
    self.log_warning(caveat)
[docs]
def get_pipette(self, volume, method="min_transfers"):
    """Choose the pipette to use for ``volume``.

    Pipette data is refreshed with ``_update_pipettes``; every mount whose
    ``min_volume`` does not exceed ``volume`` becomes a candidate, annotated
    with the number of transfers it would need and a simplified uncertainty
    (0.1 per transfer).

    Args:
        volume: Requested volume.
        method: ``"min_transfers"`` picks the fewest transfers, breaking ties
            by the smallest ``max_volume``; ``"uncertainty"`` picks the lowest
            uncertainty.

    Returns:
        dict: the chosen candidate (``mount``, ``min_volume``, ``max_volume``,
        ``name``, ``model``, ``channels``, ``pipette_id``, ``ntransfers``,
        ``uncertainty``).

    Raises:
        ValueError: No mount can handle ``volume`` or ``method`` is unknown.
    """
    self.log_debug(f"Looking for a pipette for volume {volume}")

    # Make sure we have the latest pipette information
    self._update_pipettes()

    pipettes = []
    for mount, pipette_data in self.pipette_info.items():
        if not pipette_data:
            continue
        min_volume = pipette_data.get("min_volume", 1)
        max_volume = pipette_data.get("max_volume", 300)
        if volume < min_volume:
            continue
        pipettes.append(
            {
                "mount": mount,  # Use mount as the identifier
                "min_volume": min_volume,
                "max_volume": max_volume,
                "name": pipette_data.get("name"),
                "model": pipette_data.get("model"),
                "channels": pipette_data.get("channels", 1),
                "pipette_id": pipette_data.get("id"),
            }
        )

    if not pipettes:
        raise ValueError("No suitable pipettes found!\n")

    # Annotate each candidate. No per-transfer volume is computed here: it was
    # never used and dividing by a zero transfer count would raise.
    for candidate in pipettes:
        ntransfers = ceil(volume / candidate["max_volume"])
        candidate["ntransfers"] = ntransfers
        candidate["uncertainty"] = ntransfers * 0.1  # Simplified uncertainty

    if self.data is not None:
        self.data["transfer_method"] = method
        self.data["pipette_options"] = str(pipettes)

    # Choose pipette based on method
    if method == "uncertainty":
        pipette = min(pipettes, key=lambda x: x["uncertainty"])
    elif method == "min_transfers":
        min_xfers = min(p["ntransfers"] for p in pipettes)
        acceptable_pipettes = [p for p in pipettes if p["ntransfers"] == min_xfers]
        pipette = min(acceptable_pipettes, key=lambda x: x["max_volume"])
    else:
        raise ValueError(f"Pipette selection method {method} was not recognized.")

    self.log_debug(f"Chosen pipette: {pipette}")
    if self.data is not None:
        self.data["chosen_pipette"] = str(pipette)
    return pipette
[docs]
def get_aspirate_rate(self, pipette=None):
    """Get current aspirate rate for a pipette.

    Args:
        pipette: Mount name. When None, the first mount in
            ``self.pipette_info`` that has pipette data is used.

    Returns:
        The stored ``aspirate_flow_rate`` of that mount, 150 when the mount
        has no stored rate or no pipette data, or None when ``pipette`` is
        None and no mount has pipette data.
    """
    if pipette is None:
        pipette = next(
            (mount for mount, data in self.pipette_info.items() if data), None
        )
        if pipette is None:
            return None

    # Pure dict lookup: no HTTP request happens here, so there is nothing
    # network-related to catch.
    pipette_data = self.pipette_info.get(pipette)
    if pipette_data:
        return pipette_data.get("aspirate_flow_rate", 150)
    return 150  # Default value
[docs]
def get_dispense_rate(self, pipette=None):
    """Get current dispense rate for a pipette.

    Args:
        pipette: Mount name. When None, the first mount in
            ``self.pipette_info`` that has pipette data is used.

    Returns:
        The stored ``dispense_flow_rate`` of that mount, 300 when the mount
        has no stored rate or no pipette data, or None when ``pipette`` is
        None and no mount has pipette data.
    """
    if pipette is None:
        pipette = next(
            (mount for mount, data in self.pipette_info.items() if data), None
        )
        if pipette is None:
            return None

    # Pure dict lookup: no HTTP request happens here, so there is nothing
    # network-related to catch.
    pipette_data = self.pipette_info.get(pipette)
    if pipette_data:
        return pipette_data.get("dispense_flow_rate", 300)
    return 300  # Default value
# HTTP API communication with heater-shaker module
[docs]
def set_shake(self, rpm, module_id = None):
    """Send ``heaterShaker/setAndWaitForShakeSpeed`` with the given ``rpm``.

    When ``module_id`` is None the target is looked up with
    ``_find_module_by_type("heaterShaker")``.
    """
    self.log_info(f"Setting heater-shaker speed to {rpm} RPM")
    target_module = (
        self._find_module_by_type("heaterShaker") if module_id is None else module_id
    )
    command_params = {"moduleId": target_module, "rpm": rpm}
    self._execute_atomic_command(
        "heaterShaker/setAndWaitForShakeSpeed", params=command_params
    )
[docs]
def stop_shake(self, module_id = None):
    """Send ``heaterShaker/deactivateShaker``.

    When ``module_id`` is None the target is looked up with
    ``_find_module_by_type("heaterShaker")``.
    """
    self.log_info("Stopping heater-shaker")
    target_module = (
        self._find_module_by_type("heaterShaker") if module_id is None else module_id
    )
    command_params = {"moduleId": target_module}
    self._execute_atomic_command(
        "heaterShaker/deactivateShaker", params=command_params
    )
[docs]
def set_shaker_temp(self, temp, module_id = None):
    """Send ``heaterShaker/setTargetTemperature`` with ``temp`` as ``celsius``.

    When ``module_id`` is None the target is looked up with
    ``_find_module_by_type("heaterShaker")``.
    """
    self.log_info(f"Setting heater-shaker temperature to {temp}°C")
    target_module = (
        self._find_module_by_type("heaterShaker") if module_id is None else module_id
    )
    command_params = {"moduleId": target_module, "celsius": temp}
    self._execute_atomic_command(
        "heaterShaker/setTargetTemperature", params=command_params
    )
[docs]
def stop_shaker_heat(self, module_id = None):
    """Send ``heaterShaker/deactivateHeater``.

    When ``module_id`` is None the target is looked up with
    ``_find_module_by_type("heaterShaker")``.
    """
    self.log_info("Deactivating heater-shaker heating")
    target_module = (
        self._find_module_by_type("heaterShaker") if module_id is None else module_id
    )
    command_params = {"moduleId": target_module}
    self._execute_atomic_command(
        "heaterShaker/deactivateHeater", params=command_params
    )
[docs]
def unlatch_shaker(self, module_id = None):
    """Send ``heaterShaker/openLabwareLatch``.

    When ``module_id`` is None the target is looked up with
    ``_find_module_by_type("heaterShaker")``.
    """
    self.log_info("Unlatching heater-shaker")
    target_module = (
        self._find_module_by_type("heaterShaker") if module_id is None else module_id
    )
    command_params = {"moduleId": target_module}
    self._execute_atomic_command(
        "heaterShaker/openLabwareLatch", params=command_params
    )
[docs]
def latch_shaker(self, module_id = None):
    """Send ``heaterShaker/closeLabwareLatch``.

    When ``module_id`` is None the target is looked up with
    ``_find_module_by_type("heaterShaker")``.
    """
    self.log_info("Latching heater-shaker")
    target_module = (
        self._find_module_by_type("heaterShaker") if module_id is None else module_id
    )
    command_params = {"moduleId": target_module}
    self._execute_atomic_command(
        "heaterShaker/closeLabwareLatch", params=command_params
    )
def _find_module_by_type(self,partial_name):
module_id = None
for module in self.config["loaded_modules"].values():
if partial_name in module[1]:
module_id = module[0]
return module_id
[docs]
def get_shaker_temp(self):
    """Read the heater-shaker temperatures from the robot's ``/modules`` endpoint.

    Returns:
        tuple: ``(currentTemp, targetTemp)`` taken from the module's ``data``
        on success. On any failure (non-200 response, no heater-shaker in the
        response, or an exception) an error message string is returned
        instead of raising.
    """
    self.log_info("Getting heater-shaker temperature")
    # For get operations, we still need to use the modules API directly
    try:
        modules_response = requests.get(
            url=f"{self.base_url}/modules", headers=self.headers
        )
        if modules_response.status_code != 200:
            self.log_error(f"Failed to get modules: {modules_response.status_code}")
            return f"Error getting modules: {modules_response.status_code}"

        modules = modules_response.json().get("modules", [])
        # A module entry without "moduleModel" must not abort the search.
        heater_shaker_module = next(
            (m for m in modules if "heaterShaker" in (m.get("moduleModel") or "")),
            None,
        )
        if not heater_shaker_module:
            self.log_error("No heater-shaker module found")
            return "No heater-shaker module found"

        logging.debug(heater_shaker_module)
        module_data = heater_shaker_module.get("data") or {}
        current_temp = module_data.get("currentTemp")
        target_temp = module_data.get("targetTemp")
        self.log_info(
            f"Heater-shaker temperature - Current: {current_temp}°C, Target: {target_temp}°C"
        )
        return (current_temp,target_temp)
    except Exception as e:
        # Best effort by design: callers receive the error as a string.
        self.log_error(f"Error getting temperature: {str(e)}")
        return f"Error: {str(e)}"
[docs]
def get_shake_rpm(self):
    """Read the heater-shaker speed state from the robot's ``/modules`` endpoint.

    Returns:
        tuple: ``(speedStatus, currentSpeed, targetSpeed)`` taken from the
        module's ``data`` on success. On any failure (non-200 response, no
        heater-shaker in the response, or an exception) an error message
        string is returned instead of raising, so callers must check the type
        before indexing.
    """
    # For get operations, we just use the modules API
    try:
        modules_response = requests.get(
            url=f"{self.base_url}/modules", headers=self.headers
        )
        if modules_response.status_code != 200:
            self.log_error(f"Failed to get modules: {modules_response.status_code}")
            return f"Error getting modules: {modules_response.status_code}"

        modules = modules_response.json().get("modules", [])
        # A module entry without "moduleModel" must not abort the search.
        heater_shaker_module = next(
            (m for m in modules if "heaterShaker" in (m.get("moduleModel") or "")),
            None,
        )
        if not heater_shaker_module:
            self.log_error("No heater-shaker module found")
            return "No heater-shaker module found"

        module_data = heater_shaker_module.get("data") or {}
        current_rpm = module_data.get("currentSpeed")
        target_rpm = module_data.get("targetSpeed")
        status = module_data.get("speedStatus")
        return (status,current_rpm,target_rpm)
    except Exception as e:
        # Best effort by design: callers receive the error as a string.
        self.log_error(f"Error getting RPM: {str(e)}")
        return f"Error: {str(e)}"
[docs]
def get_shake_latch_status(self):
    """Read the heater-shaker latch state from the robot's ``/modules`` endpoint.

    Returns:
        The module's ``labwareLatchStatus`` value on success. On any failure
        (non-200 response, no heater-shaker in the response, or an exception)
        an error message string is returned instead of raising.
    """
    # For get operations, we just use the modules API
    try:
        modules_response = requests.get(
            url=f"{self.base_url}/modules", headers=self.headers
        )
        if modules_response.status_code != 200:
            self.log_error(f"Failed to get modules: {modules_response.status_code}")
            return f"Error getting modules: {modules_response.status_code}"

        modules = modules_response.json().get("modules", [])
        # A module entry without "moduleModel" must not abort the search.
        heater_shaker_module = next(
            (m for m in modules if "heaterShaker" in (m.get("moduleModel") or "")),
            None,
        )
        if not heater_shaker_module:
            self.log_error("No heater-shaker module found")
            return "No heater-shaker module found"

        module_data = heater_shaker_module.get("data") or {}
        return module_data.get("labwareLatchStatus")
    except Exception as e:
        # Best effort by design: callers receive the error as a string.
        self.log_error(f"Error getting latch status: {str(e)}")
        return f"Error: {str(e)}"
def _create_run(self):
    """Create a run on the robot for executing commands.

    Posts to ``/runs``, stores the new id in ``self.run_id``, and replays the
    stored deck configuration into the new run via
    ``_reload_deck_configuration``.

    Returns:
        The id of the newly created run.

    Raises:
        RuntimeError: The robot did not answer with HTTP 201, or the request
            itself failed.
    """
    self.log_info("Creating a new run for commands")
    try:
        # Clear custom labware tracking so definitions are re-uploaded for the new run
        self.sent_custom_labware = set()

        run_response = requests.post(
            url=f"{self.base_url}/runs",
            headers=self.headers,
        )
        if run_response.status_code != 201:
            self.log_error(f"Failed to create run: {run_response.status_code}")
            self.log_error(f"Response: {run_response.text}")
            raise RuntimeError(f"Failed to create run: {run_response.text}")

        self.run_id = run_response.json()["data"]["id"]
        self.log_debug(f"Created run: {self.run_id}")

        # Reload previously configured labware, instruments, and modules.
        # A failed reload is reported (not raised) so behaviour for callers is
        # unchanged, but it no longer passes silently.
        if self._reload_deck_configuration() is False:
            self.log_warning(
                f"Run {self.run_id} was created but the deck configuration could not be "
                "reloaded; stored labware/module/instrument IDs may not exist in this run."
            )
        return self.run_id
    except requests.exceptions.RequestException as e:
        self.log_error(f"Error creating run: {str(e)}")
        raise RuntimeError(f"Error creating run: {str(e)}") from e
def _reload_deck_configuration(self):
    """Reload the deck configuration (modules, labware, instruments) from persistent storage.

    The entries currently stored in ``self.config`` are snapshotted, the three
    ``loaded_*`` entries are cleared, and ``load_module`` / ``load_labware`` /
    ``load_instrument`` are called again for every snapshotted entry, in that
    order. Afterwards ``self.config["available_tips"]`` is rewritten so each
    remaining tip points at the tiprack id now registered for the same slot.

    Returns:
        True when every step succeeded. False when any step raised; in that
        case the error is logged and the three ``loaded_*`` config entries are
        put back to their snapshotted values (``available_tips`` is left
        untouched).
    """
    self.log_info("Reloading previously configured deck setup")
    # Store original configuration for recovery if needed
    original_modules = self.config["loaded_modules"].copy()
    original_labware = self.config["loaded_labware"].copy()
    original_instruments = self.config["loaded_instruments"].copy()
    # Resolve tiprack ids to slots now, while loaded_labware still holds the
    # old ids; _slot_by_labware_uuid reads self.config["loaded_labware"].
    old_uuid_to_slot = {}
    tiprack_slots = {}
    for (mount,instrument) in original_instruments.items():
        tiprack_slots[mount] = [self._slot_by_labware_uuid(uuid) for uuid in instrument['tip_racks']]
        old_uuid_to_slot.update({uuid:self._slot_by_labware_uuid(uuid) for uuid in instrument['tip_racks']})
    # Clear current state for reloading
    self.config["loaded_modules"] = {}
    self.config["loaded_labware"] = {}
    self.config["loaded_instruments"] = {}
    try:
        # Step 1: Load modules first
        # We know the run exists because _create_run just created it, so skip status checks
        self.log_info("Reloading modules")
        for slot, (_, module_name) in original_modules.items():
            try:
                self.log_info(f"Reloading module {module_name} in slot {slot}")
                self.load_module(module_name, slot, check_run_status=False)
                # New module ID will be stored in config["loaded_modules"]
            except Exception as e:
                # Log with context, then let the outer handler roll back.
                self.log_error(f"Error reloading module {module_name} in slot {slot}: {str(e)}")
                raise
        # Step 2: Load labware
        # We know the run exists because _create_run just created it, so skip status checks
        self.log_info("Reloading labware")
        for slot, (_, labware_name, labware_data) in original_labware.items():
            # Check if this labware is on a module
            module_id = None
            if str(slot) in self.config["loaded_modules"]:
                module_id = self.config["loaded_modules"][str(slot)][0] # Get new module ID
            try:
                self.log_info(f"Reloading labware {labware_name} in slot {slot}")
                self.load_labware(labware_name, slot, module=module_id, check_run_status=False)
                # New labware ID will be stored in config["loaded_labware"]
            except Exception as e:
                # Log with context, then let the outer handler roll back.
                self.log_error(f"Error reloading labware {labware_name} in slot {slot}: {str(e)}")
                raise
        # Step 3: Load instruments
        # We know the run exists because _create_run just created it, so skip status checks
        # Also skip pipette updates since _initialize_robot already fetched pipette info
        self.log_info("Reloading instruments")
        for mount, instrument_data in original_instruments.items():
            instrument_name = instrument_data['name']
            try:
                self.log_info(f"Reloading instrument {instrument_name} on {mount} mount")
                # Tipracks are passed as the slots resolved before the config was cleared.
                self.load_instrument(instrument_name, mount, tiprack_slots[mount], reload=True, check_run_status=False, update_pipettes=False)
                # New instrument ID will be stored in config["loaded_instruments"]
            except Exception as e:
                # Log with context, then let the outer handler roll back.
                self.log_error(f"Error reloading instrument {instrument_name} on {mount} mount: {str(e)}")
                raise
        self.log_info("Deck configuration successfully reloaded")
        # Update tiprack lists
        # Build slot->new_uuid mapping from new loaded_instruments
        slot_to_new_tiprack_uuid = {}
        for instrument in self.config["loaded_instruments"].values():
            for new_uuid in instrument.get('tip_racks', []):
                slot = self._slot_by_labware_uuid(new_uuid)
                slot_to_new_tiprack_uuid[slot] = new_uuid
        # Remap available tips: old tiprack id -> slot -> new tiprack id.
        old_available_tips = self.config.get("available_tips", {})
        new_available_tips = {}
        for mount in self.config["loaded_instruments"].keys():
            new_available_tips[mount] = []
            for tiprack_uuid, well in old_available_tips.get(mount, []):
                slot = old_uuid_to_slot.get(tiprack_uuid)
                new_uuid = slot_to_new_tiprack_uuid.get(slot)
                # Tips whose rack has no counterpart after the reload are dropped.
                if new_uuid is not None:
                    new_available_tips[mount].append((new_uuid, well))
            self.log_info(f"Remapped {len(new_available_tips[mount])} available tips for {mount} mount after reload.")
        self.config["available_tips"] = new_available_tips
        return True
    except Exception as e:
        self.log_error(f"Failed to reload deck configuration: {str(e)}")
        # Restore original configuration in config
        self.config["loaded_modules"] = original_modules
        self.config["loaded_labware"] = original_labware
        self.config["loaded_instruments"] = original_instruments
        return False
def _ensure_run_exists(self, check_run_status=True):
"""Ensure a run exists for executing commands, creating one if needed
Args:
check_run_status: If False, skip HTTP GET check and return run_id if it exists.
If True, verify run status via HTTP GET request.
"""
if not hasattr(self, "run_id") or not self.run_id:
return self._create_run()
# Skip status check if requested (optimization for bulk operations)
if not check_run_status:
return self.run_id
# Check if the run is still valid
try:
response = requests.get(
url=f"{self.base_url}/runs/{self.run_id}", headers=self.headers
)
if response.status_code != 200:
# Run doesn't exist, create a new one
return self._create_run()
# Check run state
run_data = response.json()["data"]
current_state = run_data.get("status")
if current_state in ["failed", "error", "succeeded", "stopped"]:
# Run is in a terminal state, create a new one
return self._create_run()
return self.run_id
except requests.exceptions.RequestException:
# Error checking run, create a new one
return self._create_run()
[docs]
def get_tip(self, mount):
    """Take the next available tip for ``mount`` off the front of its queue.

    The entry is removed from ``self.config["available_tips"][mount]`` in
    place and ``self.config._update_history()`` is called afterwards.

    Returns:
        The removed entry (stored as ``(tiprack_id, well_name)``).
    """
    tip_queue = self.config["available_tips"][mount]
    next_tip = tip_queue.pop(0)
    # The list was mutated in place, so the config is told explicitly.
    self.config._update_history()
    return next_tip
[docs]
def get_tip_status(self, mount=None):
    """Get the current tip usage status.

    Args:
        mount: Mount to report on. When omitted (or falsy), one status line is
            produced for every mount listed in ``self.config["available_tips"]``.

    Returns:
        str: ``"<available>/<total> tips available on <mount> mount"`` where
        the total is ``len(TIPRACK_WELLS)`` times the number of tipracks
        recorded for the mount's instrument, or a message saying that no
        tipracks / no instrument are known for the mount. For the all-mounts
        form the per-mount lines are joined with newlines.
    """
    tips_by_mount = self.config["available_tips"]

    if not mount:
        # Return status for all mounts
        return "\n".join(self.get_tip_status(each) for each in tips_by_mount)

    if mount not in tips_by_mount:
        return f"No tipracks loaded for {mount} mount"

    instruments = self.config["loaded_instruments"]
    if mount not in instruments:
        return f"No instrument defined for {mount} mount"

    capacity = len(TIPRACK_WELLS) * len(instruments[mount]["tip_racks"])
    remaining = len(tips_by_mount[mount])
    return f"{remaining}/{capacity} tips available on {mount} mount"
[docs]
def make_align_script(self, filename: str):
    """
    Generate an Opentrons Python Protocol API script to verify alignment.

    This script recreates the current deck state (modules, labware, instruments)
    as recorded in ``self.config`` and performs a movement to the top of well A1
    for each non-tiprack labware using each pipette. This allows for visual
    verification of calibration and alignment.

    When nothing is recorded in the config the generated ``run`` function
    contains a single ``pass`` so the script is still valid Python.

    Args:
        filename: Path the script is written to (UTF-8, overwritten if present).
    """
    indent = " "

    def slot_order(slot):
        """Sort numeric slot names numerically; anything else goes last."""
        return int(slot) if slot.isdigit() else 99

    # Header
    script = [
        "from opentrons import protocol_api",
        "",
        "metadata = {",
        " 'protocolName': 'Alignment Check',",
        " 'author': 'AFL Auto-Generated',",
        " 'description': 'Script for aligning and testing deck configuration',",
        " 'apiLevel': '2.13'",
        "}",
        "",
        "def run(protocol: protocol_api.ProtocolContext):",
    ]
    body_start = len(script)

    # Track labware variable names by ID
    labware_var_by_id = {}

    # 1. Modules
    loaded_modules = self.config.get("loaded_modules", {})
    if loaded_modules:
        script.append(f"{indent}# Modules")
        for slot in sorted(loaded_modules.keys(), key=slot_order):
            _module_id, module_name = loaded_modules[slot]
            script.append(f"{indent}module_{slot} = protocol.load_module('{module_name}', '{slot}')")
        script.append("")

    # 2. Labware
    loaded_labware = self.config.get("loaded_labware", {})
    regular_labware_vars = []
    if loaded_labware:
        script.append(f"{indent}# Labware")
        for slot in sorted(loaded_labware.keys(), key=slot_order):
            labware_id, _, labware_data = loaded_labware[slot]

            # Get precise load info from definition if available
            definition = labware_data.get('definition', {})
            params = definition.get('parameters', {})
            load_name = params.get('loadName', 'unknown_labware')
            namespace = definition.get('namespace', 'opentrons')
            version = definition.get('version', 1)

            # Tipracks are loaded but excluded from the alignment moves.
            labware_type = definition.get('metadata', {}).get('displayCategory', 'default')
            is_tiprack = (
                params.get('isTiprack') or
                labware_type == 'tipRack' or
                'tiprack' in load_name.lower()
            )

            var_name = f"labware_{slot}"
            labware_var_by_id[labware_id] = var_name
            if not is_tiprack:
                regular_labware_vars.append(var_name)

            # Labware sharing a slot with a module is loaded onto that module.
            if str(slot) in loaded_modules:
                parent = f"module_{slot}"
                script.append(f"{indent}{var_name} = {parent}.load_labware('{load_name}', namespace='{namespace}', version={version})")
            else:
                script.append(f"{indent}{var_name} = protocol.load_labware('{load_name}', '{slot}', namespace='{namespace}', version={version})")
        script.append("")

    # 3. Pipettes
    loaded_instruments = self.config.get("loaded_instruments", {})
    pipette_vars = []
    if loaded_instruments:
        script.append(f"{indent}# Pipettes")
        for mount, instrument_data in loaded_instruments.items():
            name = instrument_data['name']
            tip_rack_ids = instrument_data.get('tip_racks', [])
            # Resolve tip rack variables; ids without a loaded labware are skipped.
            tip_rack_vars = [labware_var_by_id[tid] for tid in tip_rack_ids if tid in labware_var_by_id]
            tip_racks_arg = f"[{', '.join(tip_rack_vars)}]"
            var_name = f"pipette_{mount}"
            pipette_vars.append(var_name)
            script.append(f"{indent}{var_name} = protocol.load_instrument('{name}', '{mount}', tip_racks={tip_racks_arg})")
        script.append("")

    # 4. Alignment Moves
    if pipette_vars and regular_labware_vars:
        script.append(f"{indent}# Alignment Verification")
        script.append(f"{indent}# Move to top of A1 for each labware")
        for pip in pipette_vars:
            for lab in regular_labware_vars:
                script.append(f"{indent}protocol.comment(f'Checking {lab} with {pip}')")
                script.append(f"{indent}{pip}.move_to({lab}['A1'].top())")
                script.append(f"{indent}protocol.delay(seconds=0.5)")

    # A function body made only of blank lines/comments is a SyntaxError.
    has_statement = any(
        line.strip() and not line.strip().startswith("#")
        for line in script[body_start:]
    )
    if not has_statement:
        script.append(f"{indent}pass")

    # Write to file
    with open(filename, 'w', encoding="utf-8") as f:
        f.write('\n'.join(script))
    self.log_info(f"Generated alignment script at {filename}")
# Script entry point: runs only when this file is executed directly, not when
# it is imported as a module.
# NOTE(review): the star import is used purely for its import-time effects;
# presumably the launcher module starts the driver server — confirm against
# AFL.automation.shared.launcher.
if __name__ == "__main__":
    from AFL.automation.shared.launcher import *