"""Source code for AFL.automation.prepare.OT2HTTPDriver."""

import requests
import time
import logging

import copy
import hashlib
import json
import shutil
from pathlib import Path


from math import ceil
from AFL.automation.APIServer.Driver import Driver
from AFL.automation.prepare.OT2DeckWebAppMixin import OT2DeckWebAppMixin
from AFL.automation.shared.utilities import listify

# All 96 well positions of a standard Opentrons tiprack, ordered
# column-major (A1..H1, then A2..H2, ...) to match tip pick-up order.
TIPRACK_WELLS = [f"{row}{col}" for col in range(1, 13) for row in "ABCDEFGH"]

class OT2HTTPDriver(OT2DeckWebAppMixin, Driver):
    """Driver for an Opentrons OT-2 robot controlled through its HTTP API."""

    # Map user-supplied pipette names onto canonical Opentrons load names.
    PIPETTE_NAME_ALIASES = {
        "p10": "p10_single",
        "p10_single": "p10_single",
        "p10_single_gen1": "p10_single",
        "p300": "p300_single",
        "p300_single": "p300_single",
        "p1000": "p1000_single",
        "p1000_single": "p1000_single",
    }

    # Substring expected in a tiprack's load name for each pipette model.
    EXPECTED_TIPRACK_TOKEN = {
        "p10_single": "10ul",
        "p300_single": "300ul",
        "p1000_single": "1000ul",
    }

    # Config defaults consumed by Driver.gather_defaults().
    defaults = {
        "robot_ip": "127.0.0.1",  # Default to localhost, should be overridden
        "robot_port": "31950",  # Default Opentrons HTTP API port
        "loaded_labware": {},  # Persistent storage for loaded labware
        "loaded_instruments": {},  # Persistent storage for loaded instruments
        "loaded_modules": {},  # Persistent storage for loaded modules
        # Persistent storage for available tips,
        # Format: {mount: [(tiprack_id, well_name), ...]}
        "available_tips": {},
        "prep_targets": [],  # Persistent storage for prep target well locations
    }
def __init__(self, overrides=None):
    """Construct the driver and immediately connect to the robot.

    Args:
        overrides: Optional mapping of config values that override the
            class-level ``defaults`` (merged via ``self.gather_defaults()``).

    Note:
        ``_initialize_robot()`` performs HTTP requests, so constructing this
        object requires the robot (or simulator) to be reachable at
        ``robot_ip:robot_port``.
    """
    self.app = None  # APIServer app; set externally, used here only for logging
    Driver.__init__(
        self,
        name="OT2_HTTP_Driver",
        defaults=self.gather_defaults(),
        overrides=overrides,
    )
    self.name = "OT2_HTTP_Driver"
    # Initialize state variables
    self.session_id = None  # active robot session id, if any
    self.protocol_id = None  # uploaded protocol id, if any
    self.max_transfer = None  # largest single-transfer volume across pipettes
    self.min_transfer = None  # smallest single-transfer volume across pipettes
    self.has_tip = False  # whether the active pipette currently holds a tip
    self.last_pipette = None  # most recently used pipette
    self.modules = {}  # slot -> module id for labware loaded onto modules
    self.pipette_info = {}  # mount -> attached-pipette metadata
    # Custom labware handling
    self.custom_labware_files = {}  # "namespace/loadName" -> definition file path
    self.sent_custom_labware = {}  # "namespace/loadName" -> upload info dict
    self.custom_labware_dir = self._get_custom_labware_dir()
    self._load_custom_labware_defs()
    # Base URL for HTTP requests
    self.base_url = f"http://{self.config['robot_ip']}:{self.config['robot_port']}"
    self.headers = {"Opentrons-Version": "2"}
    # Initialize the robot connection
    self._initialize_robot()
    self.useful_links['View Deck'] = '/visualize_deck'
def _log(self, level, message): """Safe logging that checks if app exists before logging""" if self.app is not None and hasattr(self.app, "logger"): log_method = getattr(self.app.logger, level, None) if log_method: log_method(message) else: print(f"[{level.upper()}] {message}")
[docs] def log_info(self, message): """Log info message safely""" self._log("info", message)
[docs] def log_error(self, message): """Log error message safely""" self._log("error", message)
[docs] def log_debug(self, message): """Log debug message safely""" self._log("debug", message)
[docs] def log_warning(self, message): """Log warning message safely""" self._log("warning", message)
def _get_custom_labware_dir(self) -> Path:
    """Return the user-scoped custom labware directory."""
    custom_labware_dir = Path.home() / ".afl" / "opentrons_labware"
    self._bootstrap_custom_labware_dir(custom_labware_dir)
    return custom_labware_dir

def _get_seed_custom_labware_dir(self):
    """Find the packaged labware definitions used to seed first-run state."""
    # Walk up from this file looking for a support/labware directory.
    current = Path(__file__).resolve()
    for parent in current.parents:
        candidate = parent / "support" / "labware"
        if candidate.is_dir():
            return candidate
    # Fall back to the current working directory.
    candidate = Path.cwd() / "support" / "labware"
    if candidate.is_dir():
        return candidate
    return None

def _bootstrap_custom_labware_dir(self, custom_labware_dir: Path):
    """Create the user labware directory and seed it once from packaged files."""
    # An existing directory means seeding already happened (or the user manages it).
    if custom_labware_dir.exists():
        return
    custom_labware_dir.mkdir(parents=True, exist_ok=True)
    seed_dir = self._get_seed_custom_labware_dir()
    if seed_dir is None:
        self.log_warning(
            f"Custom labware seed directory not found; leaving {custom_labware_dir} empty"
        )
        return
    # Copy every packaged definition; copy2 preserves file metadata.
    for json_file in sorted(seed_dir.glob("*.json")):
        shutil.copy2(json_file, custom_labware_dir / json_file.name)

def _load_custom_labware_defs(self):
    """Record available custom labware definitions in the user directory."""
    self.custom_labware_dir.mkdir(parents=True, exist_ok=True)
    self.custom_labware_files = {}
    duplicates = []
    for json_file in self.custom_labware_dir.glob("*.json"):
        try:
            with open(json_file, "r") as f:
                definition = json.load(f)
            _, _, key = self._custom_labware_key(definition)
            existing_path = self.custom_labware_files.get(key)
            if existing_path is not None and existing_path != json_file:
                # Same namespace/loadName from two different files: record and skip.
                duplicates.append((key, existing_path, json_file))
                continue
            self.custom_labware_files[key] = json_file
        except Exception:
            # Unreadable / malformed definition files are ignored silently.
            continue
    if duplicates:
        details = "; ".join(
            f"{key}: {first} vs {second}" for key, first, second in duplicates
        )
        raise ValueError(
            f"Duplicate custom labware definitions found in {self.custom_labware_dir}: {details}"
        )

def _custom_labware_key(self, labware_def):
    """Return (namespace, loadName, "namespace/loadName") for a definition dict.

    Raises:
        ValueError: if the definition has no parameters.loadName.
    """
    ns = labware_def.get("namespace", "custom_beta")
    load_name = labware_def.get("parameters", {}).get("loadName")
    if not load_name:
        raise ValueError("labware_def missing parameters.loadName")
    return ns, load_name, f"{ns}/{load_name}"

def _canonical_labware_def(self, labware_def):
    """Serialize a definition deterministically, ignoring its version field."""
    canonical_def = copy.deepcopy(labware_def)
    # Version is excluded so re-uploads of identical content hash the same.
    canonical_def.pop("version", None)
    return json.dumps(canonical_def, sort_keys=True, separators=(",", ":"))

def _hash_labware_def(self, labware_def):
    """Return a sha256 hex digest of the canonicalized definition."""
    canonical = self._canonical_labware_def(labware_def)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

def _labware_upload_info(self, key):
    """Return the recorded upload-info dict for *key*, or None if absent/invalid."""
    info = self.sent_custom_labware.get(key)
    if isinstance(info, dict):
        return info
    return None

def _next_custom_labware_version(self, key, labware_def):
    """Pick the version number to use when (re)uploading a definition."""
    existing = self._labware_upload_info(key)
    # `or 1` guards against version being 0/None in the definition.
    requested_version = int(labware_def.get("version", 1) or 1)
    if existing is None:
        return requested_version
    # Always advance past the previously uploaded version.
    return max(requested_version, int(existing["version"]) + 1)

def _custom_labware_file_path(self, labware_def):
    """Return the on-disk path where this definition is persisted."""
    _, load_name, _ = self._custom_labware_key(labware_def)
    return self.custom_labware_dir / f"{load_name}.json"

def _loaded_labware_key(self, name, labware_data):
    """Derive (namespace, loadName) for an entry of config['loaded_labware'].

    Prefers the stored definition; falls back to parsing *name* as
    "namespace/loadName", then to assuming the "opentrons" namespace.
    """
    definition = {}
    if isinstance(labware_data, dict):
        definition = labware_data.get("definition", {}) or {}
    try:
        namespace, load_name, _ = self._custom_labware_key(definition)
        return namespace, load_name
    except ValueError:
        if "/" in str(name):
            namespace, load_name = str(name).split("/", 1)
            return namespace, load_name
        return "opentrons", str(name)

def _remap_tip_availability(self, mount, old_available_tips, old_uuid_to_slot):
    """Rebuild config['available_tips'][mount] after tipracks got new UUIDs.

    Tips whose old tiprack no longer maps to loaded labware are dropped.
    """
    remapped = []
    for tiprack_uuid, well in old_available_tips.get(mount, []):
        slot = old_uuid_to_slot.get(tiprack_uuid)
        if slot is None or slot not in self.config["loaded_labware"]:
            continue
        new_uuid = self.config["loaded_labware"][slot][0]
        remapped.append((new_uuid, well))
    self.config["available_tips"][mount] = remapped

def _reload_matching_labware_definition(
    self, labware_def, run_id=None, check_run_status=True
):
    """Re-load any on-deck labware (and dependent pipettes) matching *labware_def*.

    Called after a custom definition is re-uploaded so already-placed labware
    picks up the new version. Pipettes whose tipracks were reloaded are
    reloaded too, and their tip availability is remapped to the new UUIDs.
    """
    namespace, load_name, _ = self._custom_labware_key(labware_def)
    matching_slots = []
    original_labware = copy.deepcopy(self.config["loaded_labware"])
    for slot, (labware_id, name, labware_data) in original_labware.items():
        loaded_namespace, loaded_name = self._loaded_labware_key(name, labware_data)
        if loaded_namespace == namespace and loaded_name == load_name:
            matching_slots.append(str(slot))
    if not matching_slots:
        return
    if run_id is None:
        # NOTE: _ensure_run_exists is defined elsewhere in this class.
        run_id = self._ensure_run_exists(check_run_status=check_run_status)
    original_instruments = copy.deepcopy(self.config["loaded_instruments"])
    old_available_tips = copy.deepcopy(self.config.get("available_tips", {}))
    old_uuid_to_slot = {}
    affected_mounts = {}
    for mount, instrument in original_instruments.items():
        tiprack_slots = []
        for tiprack_uuid in instrument.get("tip_racks", []):
            # NOTE: _slot_by_labware_uuid is defined elsewhere in this class.
            slot = self._slot_by_labware_uuid(tiprack_uuid)
            if slot is not None:
                slot = str(slot)
            old_uuid_to_slot[tiprack_uuid] = slot
            tiprack_slots.append(slot)
        if any(slot in matching_slots for slot in tiprack_slots):
            affected_mounts[mount] = {
                "name": instrument["name"],
                "tiprack_slots": tiprack_slots,
            }
    for slot in matching_slots:
        module_id = None
        if slot in self.config["loaded_modules"]:
            module_id = self.config["loaded_modules"][slot][0]
        self.log_info(
            f"Reloading active labware '{namespace}/{load_name}' in slot {slot}"
        )
        self.load_labware(
            f"{namespace}/{load_name}",
            slot,
            module=module_id,
            labware_json=labware_def,
            check_run_status=False,
        )
    for mount, instrument in affected_mounts.items():
        self.log_info(
            f"Reloading pipette '{instrument['name']}' on {mount} mount after tiprack update"
        )
        self.load_instrument(
            instrument["name"],
            mount,
            instrument["tiprack_slots"],
            reload=True,
            check_run_status=False,
        )
        self._remap_tip_availability(mount, old_available_tips, old_uuid_to_slot)
    if affected_mounts:
        # Tip state is unknown after reloading pipettes; force re-pickup.
        self.has_tip = False
        self.last_pipette = None

def _initialize_robot(self):
    """Initialize the connection to the robot and get basic information"""
    try:
        # Check if the robot is reachable
        response = requests.get(url=f"{self.base_url}/health", headers=self.headers)
        if response.status_code != 200:
            raise ConnectionError(f"Failed to connect to robot at {self.base_url}")
        # Get attached pipettes
        self._update_pipettes()
    except requests.exceptions.RequestException as e:
        self.log_error(f"Error connecting to robot: {str(e)}")
        raise ConnectionError(
            f"Error connecting to robot at {self.base_url}: {str(e)}"
        )

def _update_pipettes(self):
    """Get information about attached pipettes and their settings"""
    try:
        if self.app is not None:
            self.log_info("Fetching pipette information from robot")
        # Get basic pipette information
        response = requests.get(
            url=f"{self.base_url}/instruments", headers=self.headers
        )
        if response.status_code != 200:
            raise RuntimeError(f"Failed to get pipettes: {response.text}")
        pipettes_data = response.json()['data']
        self.pipette_info = {}
        # Update min/max transfer values based on attached pipettes
        self.min_transfer = None
        self.max_transfer = None
        for pipette in pipettes_data:
            mount = pipette['mount']
            try:
                pipette_id = self.config["loaded_instruments"][mount]["pipette_id"]  # the id from this run
            except KeyError:
                pipette_id = None
            # Store basic pipette info
            self.pipette_info[mount] = {
                "id": pipette_id,
                "name": pipette["instrumentName"],
                "model": pipette["instrumentModel"],
                "serial": pipette["serialNumber"],
                "mount": mount,
                "min_volume": pipette.get("data", {}).get("min_volume", None),
                "max_volume": pipette.get("data", {}).get("max_volume", None),
                # Flow rates default to 150 when the robot omits them.
                "aspirate_flow_rate": pipette.get("data", {}).get(
                    "aspirateFlowRate", {}
                ).get("value", 150),
                "dispense_flow_rate": pipette.get("data", {}).get(
                    "dispenseFlowRate", {}
                ).get("value", 150),
                "channels": pipette.get("data", {}).get("channels", 1),
            }
            # Pipettes not loaded into the current run don't affect transfer limits.
            if pipette_id is None:
                continue
            # Update global min/max transfer values
            min_volume = self.pipette_info[mount]['min_volume']
            max_volume = self.pipette_info[mount]['max_volume']
            if (self.min_transfer is None) or (self.min_transfer > min_volume):
                self.min_transfer = min_volume
                if self.app is not None:
                    self.log_info(
                        f"Setting minimum transfer to {self.min_transfer}"
                    )
            if (self.max_transfer is None) or (self.max_transfer < max_volume):
                self.max_transfer = max_volume
                if self.app is not None:
                    self.log_info(
                        f"Setting maximum transfer to {self.max_transfer}"
                    )
        if self.app is not None:
            self.log_info(f"Pipette information updated: {self.pipette_info}")
    except Exception as e:
        raise RuntimeError(f"Error getting pipettes: {str(e)}")

def _get_active_pipettes(self):
    """Return pipettes that are both physically attached and loaded into the active run."""
    active_pipettes = {}
    loaded_instruments = self.config.get("loaded_instruments", {})
    for mount, info in self.pipette_info.items():
        if not info:
            continue
        if mount not in loaded_instruments:
            continue
        if info.get("id") is None:
            continue
        active_pipettes[mount] = info
    return active_pipettes

def _get_active_pipette_info(self, mount):
    """Return the active-pipette info dict for *mount*.

    Raises:
        ValueError: if no loaded pipette exists on that mount.
    """
    mount = str(mount).strip().lower()
    info = self._get_active_pipettes().get(mount)
    if info is None:
        raise ValueError(f"No loaded pipette available on {mount} mount")
    return info
[docs] def reset_prep_targets(self): """Clear the list of preparation targets stored in the config.""" self.config["prep_targets"] = []
def add_prep_targets(self, targets, reset=False):
    """Add well locations to the preparation target list.

    Args:
        targets: A single location string or an iterable of them
            (normalized via listify).
        reset: If True, clear the existing target list first.
    """
    if reset:
        self.reset_prep_targets()
    # setdefault ensures the key exists; the extend below mutates the stored list.
    self.config.setdefault("prep_targets", [])
    self.config["prep_targets"].extend(listify(targets))
    # Persist the change through the config's history mechanism.
    self.config._update_history()
[docs] def get_prep_target(self): return self.config["prep_targets"].pop(0)
def status(self):
    """Assemble a human-readable status summary for the API server.

    Returns:
        list[str]: lines covering prep targets, tip state, session state,
        active pipettes, and loaded labware.
    """
    status = []
    prep_targets = self.config.get("prep_targets", [])
    if len(prep_targets) > 0:
        status.append(f"Next prep target: {prep_targets[0]}")
        status.append(f"Remaining prep targets: {len(prep_targets)}")
    else:
        status.append("No prep targets loaded")
    # NOTE: get_tip_status is defined elsewhere in this class.
    status.append(self.get_tip_status())
    # Get current session status if available
    if self.session_id:
        try:
            response = requests.get(
                url=f"{self.base_url}/sessions/{self.session_id}",
                headers=self.headers,
            )
            if response.status_code == 200:
                session_data = response.json().get("data", {})
                current_state = session_data.get("details", {}).get(
                    "currentState", "unknown"
                )
                status.append(f"Session state: {current_state}")
        except requests.exceptions.RequestException:
            status.append("Unable to get session status")
    # Get pipette information for mounts that are loaded into the active run
    for mount, pipette in self._get_active_pipettes().items():
        if pipette:
            status.append(
                f"Pipette on {mount} mount: {pipette.get('model', 'unknown')}"
            )
    # Get loaded labware information
    try:
        for slot, (labware_id, name, _) in self.config["loaded_labware"].items():
            status.append(f"Labware in slot {slot}: {name}")
    except Exception:
        # Best-effort: dump the raw structure if it isn't in the expected shape.
        print(self.config["loaded_labware"])
    return status
@Driver.quickbar(
    qb={
        "button_text": "Refill Tipracks",
        "params": {
            "mount": {
                "label": "Which Pipet left/right/both",
                "type": "text",
                "default": "both",
            },
        },
    }
)
def reset_tipracks(self, mount="both"):
    """Mark every tip position as available again for the given mount(s).

    Args:
        mount: "left", "right", or "both" (default). Mounts without a
            loaded instrument are ignored.
    """
    self.log_info(f"Resetting tipracks for {mount} mount")
    if mount == "both":
        mounts_to_reset = list(self.config["loaded_instruments"].keys())
    else:
        mounts_to_reset = [mount]
    for m in mounts_to_reset:
        if m not in self.config["loaded_instruments"]:
            continue
        # Rebuild the full tip list: every well of every tiprack, tiprack-major.
        refreshed = [
            (tiprack, well)
            for tiprack in self.config["loaded_instruments"][m]["tip_racks"]
            for well in TIPRACK_WELLS
        ]
        self.config["available_tips"][m] = refreshed
        self.log_info(f"Reset {len(self.config['available_tips'][m])} tips for {m} mount")
    # Reset tip status
    self.has_tip = False
def reset(self):
    """Tear down robot-side session/protocol state, clear the deck, and reconnect."""
    self.log_info("Resetting the protocol context")
    # Delete any active session
    if self.session_id:
        try:
            requests.delete(
                url=f"{self.base_url}/sessions/{self.session_id}",
                headers=self.headers,
            )
        except requests.exceptions.RequestException as e:
            # Best-effort cleanup: log and continue with the rest of the reset.
            self.log_error(f"Error deleting session: {str(e)}")
    # Delete any uploaded protocol
    if self.protocol_id:
        try:
            requests.delete(
                url=f"{self.base_url}/protocols/{self.protocol_id}",
                headers=self.headers,
            )
        except requests.exceptions.RequestException as e:
            self.log_error(f"Error deleting protocol: {str(e)}")
    # Reset state variables
    self.session_id = None
    self.protocol_id = None
    self.has_tip = False
    self.last_pipette = None
    # Reset deck configuration too
    self.reset_deck()
    # Re-initialize robot connection
    self._initialize_robot()
[docs] def reset_deck(self): """Reset the deck configuration, clearing loaded labware, instruments, and modules""" self.log_info("Resetting the deck configuration") # Clear the deck configuration self.config["loaded_labware"] = {} self.config["loaded_instruments"] = {} self.config["loaded_modules"] = {} self.config["available_tips"] = {} self.config["prep_targets"] = [] # Clear internal state variables self.modules = {} self.sent_custom_labware = {} self.run_id = None
@Driver.quickbar(qb={"button_text": "Home"})
def home(self, **kwargs):
    """Home all robot axes via the dedicated /robot/home endpoint.

    This is a direct control endpoint, so no run needs to be created first.

    Returns:
        True on success.

    Raises:
        RuntimeError: if the request fails or the robot replies non-200.
    """
    self.log_info("Homing the robot's axes")
    try:
        # Call the dedicated home endpoint
        reply = requests.post(
            url=f"{self.base_url}/robot/home",
            headers=self.headers,
            json={
                "target": "robot",  # Home the entire robot
            },
        )
    except requests.exceptions.RequestException as e:
        self.log_error(f"Error during homing: {str(e)}")
        raise RuntimeError(f"Error during homing: {str(e)}")
    if reply.status_code != 200:
        self.log_error(f"Failed to home robot: {reply.status_code}")
        self.log_error(f"Response: {reply.text}")
        raise RuntimeError(f"Failed to home robot: {reply.text}")
    self.log_info("Robot homing completed successfully")
    return True
def parse_well(self, loc):
    """Split a location string like "1A1" into its slot and well parts.

    The slot is everything before the first alphabetic character and the
    well is everything from that character on (e.g. "10H12" -> ("10", "H12")).
    """
    split_at = 0  # default split point when loc is empty
    for split_at, ch in enumerate(loc):
        if ch.isalpha():
            break
    return loc[:split_at], loc[split_at:]
def get_wells(self, locs):
    """Convert location strings to well objects with proper labware IDs, and check that wells are valid.

    Args:
        locs: Single location string or list of location strings in format "slotwell" (e.g. "1A1")

    Returns:
        List of well objects with labwareId and wellName

    Raises:
        ValueError: If labware is not found in the specified slot
    """
    self.log_debug(f"Converting locations to well objects: {locs}")
    wells = []
    for loc in listify(locs):
        slot, well = self.parse_well(loc)
        # Get labware info from the slot
        labware_info = self.config['loaded_labware'].get(slot)
        if not labware_info:
            raise ValueError(f"No labware found in slot {slot}")
        if not isinstance(labware_info, tuple) or len(labware_info) < 1:
            raise ValueError(f"Invalid labware info format in slot {slot}")
        labware_id = labware_info[0]
        wells.append({"labwareId": labware_id, "wellName": well})
        self.log_debug(f"Created well objects: {wells}")
        # Check well validity here
        # NOTE(review): these asserts are stripped when Python runs with -O;
        # consider raising ValueError instead if this validation must hold.
        assert slot in self.config["loaded_labware"].keys(), f"Slot {slot} does not have any loaded labware"
        assert well in self.config["loaded_labware"][slot][2]['definition']['wells'].keys(), f"Well {well} is not a valid well for slot {slot}, {self.config['loaded_labware'][slot][2]['definition']['metadata']['displayName']}"
    return wells
def _check_cmd_success(self, response): if response.status_code != 201: self.log_error( f"Failed to execute command : {response.status_code}" ) self.log_error(f"Response: {response.text}") raise RuntimeError( f"Failed to execute command: {response.text}" ) if 'status' in response.json()['data'].keys(): if response.json()['data']['status'] == 'failed': self.log_error( f"Command returned error : {response.status_code}" ) self.log_error(f"Response: {response.text}") raise RuntimeError( f"Command returned error: {response.text}" )
def send_labware(
    self, labware_def, check_run_status=True, reload_loaded_labware=True
):
    """Send a custom labware definition to the current run and persist it.

    Args:
        labware_def: Opentrons labware definition dict (must contain
            parameters.loadName).
        check_run_status: If False, skip HTTP GET check when ensuring run exists.
                          Passed to _ensure_run_exists for optimization.
        reload_loaded_labware: If True, re-load any already-placed labware that
            matches this definition so it picks up the new version.

    Returns:
        dict with keys definition_uri, version, content_hash.

    Raises:
        RuntimeError: if the upload request fails.
    """
    self.log_debug(f"Sending custom labware definition: {labware_def}")
    ns, load_name, key = self._custom_labware_key(labware_def)
    content_hash = self._hash_labware_def(labware_def)
    # Persist the definition for future use
    self.custom_labware_dir.mkdir(parents=True, exist_ok=True)
    file_path = self._custom_labware_file_path(labware_def)
    with open(file_path, "w") as f:
        json.dump(labware_def, f, indent=2)
    self._load_custom_labware_defs()
    # Skip the upload entirely if identical content was already sent.
    existing = self._labware_upload_info(key)
    if existing is not None and existing.get("content_hash") == content_hash:
        self.log_debug(
            f"Labware {key} already sent to robot as version {existing['version']}"
        )
        return copy.deepcopy(existing)
    # Ensure we have a valid run
    run_id = self._ensure_run_exists(check_run_status=check_run_status)
    try:
        upload_def = copy.deepcopy(labware_def)
        # Bump the version past anything previously uploaded for this key.
        upload_version = self._next_custom_labware_version(key, upload_def)
        upload_def["version"] = upload_version
        command_dict = {"data": upload_def}
        response = requests.post(
            url=f"{self.base_url}/runs/{run_id}/labware_definitions",
            headers=self.headers,
            params={"waitUntilComplete": True},
            json=command_dict,
        )
        self._check_cmd_success(response)
        response_data = response.json()
        labware_name = response_data["data"]["definitionUri"]
        upload_info = {
            "definition_uri": labware_name,
            "version": upload_version,
            "content_hash": content_hash,
        }
        self.sent_custom_labware[key] = upload_info
        if reload_loaded_labware:
            self._reload_matching_labware_definition(
                upload_def, run_id=run_id, check_run_status=False
            )
        self.log_info(
            f"Successfully sent custom labware with name/URI {labware_name}"
        )
        return copy.deepcopy(upload_info)
    except (requests.exceptions.RequestException, KeyError) as e:
        self.log_error(f"Error sending custom labware: {str(e)}")
        raise RuntimeError(f"Error sending custom labware: {str(e)}")
def load_labware(self, name, slot, module=None, check_run_status=True, **kwargs):
    """Load labware (containers, tipracks) into the protocol using HTTP API

    Args:
        name: Labware load name, optionally prefixed "namespace/".
        slot: Deck slot to load into.
        module: Optional moduleId to attach the labware to.
        check_run_status: If False, skip HTTP GET check when ensuring run exists.
                          Passed to _ensure_run_exists and send_labware for optimization.
        **kwargs: may include labware_json, a full custom labware definition
            dict which is uploaded first and whose loadName overrides *name*.

    Returns:
        The labwareId assigned by the robot.

    Raises:
        RuntimeError: if any HTTP command fails or no labwareId can be
            extracted from the response.
    """
    self.log_debug(f"Loading labware '{name}' into slot '{slot}'")
    # Ensure we have a valid run
    run_id = self._ensure_run_exists(check_run_status=check_run_status)
    labware_json = kwargs.pop("labware_json", None)
    version = 1
    if labware_json is not None:
        # Explicit definition supplied: upload it (if custom) and use its loadName.
        namespace = labware_json.get("namespace", "custom_beta")
        load_name = labware_json.get("parameters", {}).get("loadName")
        if not load_name:
            raise ValueError("labware_json missing parameters.loadName")
        name = load_name
        version = int(labware_json.get("version", 1) or 1)
        if namespace != "opentrons":
            upload_info = self.send_labware(
                labware_json,
                check_run_status=check_run_status,
                reload_loaded_labware=False,
            )
            version = int(upload_info["version"])
    else:
        # No definition supplied: resolve namespace from the name or from
        # the locally known custom labware files.
        if "/" in name:
            namespace, load_name = name.split("/", 1)
            name = load_name
        else:
            load_name = name
            if f"custom_beta/{load_name}" in self.custom_labware_files:
                namespace = "custom_beta"
            elif f"opentrons/{load_name}" in self.custom_labware_files:
                namespace = "opentrons"
            else:
                namespace = "opentrons"
        key = f"{namespace}/{load_name}"
        if namespace != "opentrons":
            # Custom namespace: upload the stored definition before loading.
            path = self.custom_labware_files.get(key)
            if path and Path(path).exists():
                with open(path, "r") as f:
                    definition = json.load(f)
                upload_info = self.send_labware(
                    definition,
                    check_run_status=check_run_status,
                    reload_loaded_labware=False,
                )
                version = int(upload_info["version"])
            else:
                self.log_warning(f"Custom labware definition not found for {key}")
    try:
        # Check if there's existing labware in the slot
        if slot in self.config["loaded_labware"]:
            self.log_info(
                f"Found existing labware in slot {slot}, moving it off-deck first"
            )
            existing_labware_id = self.config["loaded_labware"][slot][
                0
            ]  # Get the ID of existing labware
            # Create command to move existing labware off-deck
            move_command = {
                "data": {
                    "commandType": "moveLabware",
                    "params": {
                        "labwareId": existing_labware_id,
                        "newLocation": "offDeck",
                        "strategy": "manualMoveWithoutPause",  # Allow user to manually move the labware
                    },
                    "intent": "setup",
                }
            }
            # Execute the move command
            move_response = requests.post(
                url=f"{self.base_url}/runs/{run_id}/commands",
                headers=self.headers,
                params={"waitUntilComplete": True},
                json=move_command,
            )
            self._check_cmd_success(move_response)
            # Remove from our tracking
            del self.config["loaded_labware"][slot]
        if str(slot) in self.config["loaded_modules"].keys():
            # we need to load into a module, not a slot
            location = {"moduleId": self.config["loaded_modules"][str(slot)][0]}
        else:
            location = {"slotName": str(slot)}
        # Prepare the loadLabware command
        command_dict = {
            "data": {
                "commandType": "loadLabware",
                "params": {
                    "location": location,
                    "loadName": name,
                    "namespace": namespace,
                    "version": version,
                },
                "intent": "setup",
            }
        }
        # If this is a module, we need to specify the moduleId
        if module:
            command_dict["data"]["params"]["moduleId"] = module
        # Execute the command
        response = requests.post(
            url=f"{self.base_url}/runs/{run_id}/commands",
            headers=self.headers,
            params={"waitUntilComplete": True},
            json=command_dict,
        )
        self._check_cmd_success(response)
        # Get the labware ID from the response
        response_data = response.json()
        # Debug log the response structure
        self.log_debug(f"Load labware response: {response_data}")
        # Handle different response structures that might occur
        try:
            if "data" in response_data and "result" in response_data["data"]:
                labware_id = response_data["data"]["result"]["labwareId"]
            elif "data" in response_data and "labwareId" in response_data["data"]:
                labware_id = response_data["data"]["labwareId"]
            elif "data" in response_data and "id" in response_data["data"]:
                labware_id = response_data["data"]["id"]
            else:
                # Try to find labware ID in any structure
                self.log_warning(f"Unexpected response structure: {response_data}")
                for key, value in response_data.items():
                    if isinstance(value, dict) and "labwareId" in value:
                        labware_id = value["labwareId"]
                        break
                else:
                    raise KeyError("Could not find labwareId in response")
        except KeyError as e:
            self.log_error(f"Error extracting labware ID from response: {str(e)}")
            self.log_error(f"Response data: {response_data}")
            raise RuntimeError(
                f"Failed to extract labware ID from response: {str(e)}"
            )
        # NOTE(review): this assumes data.result exists even when labware_id was
        # found via a fallback path above; if it is missing, the KeyError is
        # re-raised as RuntimeError by the outer except clause.
        result = response_data["data"]["result"]
        # Store the labware information directly in config
        self.config["loaded_labware"][slot] = (labware_id, name, result)
        # If this is a module, store it
        if module:
            self.modules[slot] = module
        self.log_info(
            f"Successfully loaded labware '{name}' in slot {slot} with ID {labware_id}"
        )
        self.config._update_history()
        return labware_id
    except (requests.exceptions.RequestException, KeyError) as e:
        self.log_error(f"Error loading labware: {str(e)}")
        raise RuntimeError(f"Error loading labware: {str(e)}")
def load_module(self, name, slot, check_run_status=True, **kwargs):
    """Load modules (heater-shaker, tempdeck) into the protocol using HTTP API

    Args:
        name: Opentrons module model string (e.g. "temperatureModuleV2").
        slot: Deck slot for the module; stored internally keyed by str(slot).
        check_run_status: If False, skip HTTP GET check when ensuring run exists.
                          Passed to _ensure_run_exists for optimization.

    Returns:
        The moduleId assigned by the robot.

    Raises:
        RuntimeError: if a module already occupies the slot, the HTTP command
            fails, or no moduleId can be extracted from the response.
    """
    self.log_debug(f"Loading module '{name}' into slot '{slot}'")
    # Ensure we have a valid run
    run_id = self._ensure_run_exists(check_run_status=check_run_status)
    try:
        # loaded_modules is keyed by str(slot); normalize before checking so an
        # int slot argument still detects an existing module. (Previously the
        # raise below indexed the literal key 'slot', which itself raised
        # KeyError instead of the intended RuntimeError.)
        if str(slot) in self.config["loaded_modules"]:
            # todo: check if same module
            raise RuntimeError(
                f"Module already loaded in slot {slot}: "
                f"{self.config['loaded_modules'][str(slot)]}. Overwrite not supported."
            )
        # Prepare the loadModule command
        command_dict = {
            "data": {
                "commandType": "loadModule",
                "params": {
                    "location": {"slotName": str(slot)},
                    "model": name,
                },
                "intent": "setup",
            }
        }
        # Execute the command
        response = requests.post(
            url=f"{self.base_url}/runs/{run_id}/commands",
            headers=self.headers,
            params={"waitUntilComplete": True},
            json=command_dict,
        )
        self._check_cmd_success(response)
        # Get the module ID from the response
        response_data = response.json()
        # Debug log the response structure
        self.log_debug(f"Load labware response: {response_data}")
        # Handle different response structures that might occur
        try:
            if "data" in response_data and "result" in response_data["data"]:
                module_id = response_data["data"]["result"]["moduleId"]
            elif "data" in response_data and "moduleId" in response_data["data"]:
                module_id = response_data["data"]["moduleId"]
            elif "data" in response_data and "id" in response_data["data"]:
                module_id = response_data["data"]["id"]
            else:
                # Try to find the module ID in any structure
                self.log_warning(f"Unexpected response structure: {response_data}")
                for key, value in response_data.items():
                    if isinstance(value, dict) and "moduleId" in value:
                        module_id = value["moduleId"]
                        break
                else:
                    raise KeyError("Could not find moduleId in response")
        except KeyError as e:
            self.log_error(f"Error extracting module ID from response: {str(e)}")
            self.log_error(f"Response data: {response_data}")
            raise RuntimeError(
                f"Failed to extract module ID from response: {str(e)}"
            )
        # Store the module information directly in config
        self.config["loaded_modules"][str(slot)] = (module_id, name)
        self.log_info(
            f"Successfully loaded module '{name}' in slot {slot} with ID {module_id}"
        )
        self.config._update_history()
        return module_id
    except (requests.exceptions.RequestException, KeyError) as e:
        self.log_error(f"Error loading module: {str(e)}")
        raise RuntimeError(f"Error loading module: {str(e)}")
def load_instrument(self, name, mount, tip_rack_slots, reload=False, check_run_status=True, update_pipettes=True, **kwargs):
    """Load pipette and store tiprack information using HTTP API.

    Args:
        name: Pipette name; aliases are normalized via _normalize_pipette_name.
        mount: "left" or "right" (case/whitespace-insensitive).
        tip_rack_slots: One or more deck slots holding already-loaded tipracks.
        reload: If True, keep the existing available-tips bookkeeping instead
            of re-initializing it to a full set of tips.
        check_run_status: If False, skip HTTP GET check when ensuring run exists.
                          Passed to _ensure_run_exists for optimization.
        update_pipettes: If False, skip calling _update_pipettes (for optimization during reload).

    Returns:
        The pipetteId assigned by the robot.

    Raises:
        ValueError: for an invalid mount, empty/unloaded tiprack slots.
        RuntimeError: if the HTTP loadPipette command fails.
    """
    pipette_name = self._normalize_pipette_name(name)
    mount = str(mount).strip().lower()
    if mount not in {"left", "right"}:
        raise ValueError(f"Mount must be 'left' or 'right'. Received: {mount!r}")
    tip_rack_slots = [str(slot) for slot in listify(tip_rack_slots)]
    if len(tip_rack_slots) == 0:
        raise ValueError("At least one tip rack slot must be provided.")
    for slot in tip_rack_slots:
        if slot not in self.config["loaded_labware"]:
            raise ValueError(
                f"Tip rack slot {slot!r} is not loaded. Load a tiprack first."
            )
        labware_name = str(self.config["loaded_labware"][slot][1]).lower()
        if "tiprack" not in labware_name:
            self.log_warning(
                f"Slot {slot} contains labware '{labware_name}', which may not be a tiprack."
            )
    self._warn_on_tiprack_mismatch(pipette_name, tip_rack_slots)
    self.log_debug(
        f"Loading pipette '{pipette_name}' on '{mount}' mount with tip_racks in slots {tip_rack_slots}"
    )
    # Ensure we have a valid run
    run_id = self._ensure_run_exists(check_run_status=check_run_status)
    try:
        # First, load the pipette using the HTTP API
        command_dict = {
            "data": {
                "commandType": "loadPipette",
                "params": {
                    "pipetteName": pipette_name,
                    "mount": mount,
                    # NOTE(review): "tip_racks" is not a documented loadPipette
                    # param in the Opentrons HTTP API — presumably ignored by
                    # the robot; verify against the server version in use.
                    "tip_racks": [self.config["loaded_labware"][str(slot)][0] for slot in tip_rack_slots],
                },
                "intent": "setup",
            }
        }
        # Execute the loadPipette command
        response = requests.post(
            url=f"{self.base_url}/runs/{run_id}/commands",
            headers=self.headers,
            params={"waitUntilComplete": True},
            json=command_dict,
        )
        self._check_cmd_success(response)
        # Get the pipette ID from the response
        response_data = response.json()
        logging.debug(f'loadPipette response: {response_data}')
        pipette_id = response_data["data"]["result"]["pipetteId"]
        # Make sure we have the latest pipette information (unless disabled for optimization)
        if update_pipettes:
            self._update_pipettes()
        # Ensure pipette_info entry exists before patching
        if mount not in self.pipette_info:
            self.pipette_info[mount] = {}
        self.pipette_info[mount][
            "id"
        ] = pipette_id  # patch the correct pipette id to the pipette_info dict
        # Get the tip rack IDs - note that loaded_labware now stores tuples of (id, name)
        tip_racks = []
        for slot in listify(tip_rack_slots):
            labware_info = self.config["loaded_labware"].get(slot)
            if (
                labware_info
                and isinstance(labware_info, tuple)
                and len(labware_info) >= 1
            ):
                tip_racks.append(labware_info[0])
        if not tip_racks:
            self.log_warning(f"No valid tip racks found in slots {tip_rack_slots}")
        # Store the instrument information
        self.config["loaded_instruments"][mount] = {
            "name": pipette_name,
            "pipette_id": pipette_id,
            "tip_racks": tip_racks,
        }
        # If not reloading, initialize available tips for this mount
        if not reload:
            self.config["available_tips"][mount] = []
            for tiprack in tip_racks:
                for well in TIPRACK_WELLS:
                    self.config["available_tips"][mount].append((tiprack, well))
        # Verify that there's actually a pipette in this mount
        if mount not in self.pipette_info or self.pipette_info[mount] is None:
            self.log_warning(
                f"No physical pipette detected in {mount} mount, but pipette information stored"
            )
        # Update min/max values for largest and smallest pipettes
        self._update_pipette_ranges()
        self.log_info(
            f"Successfully loaded pipette '{pipette_name}' on {mount} mount with ID {pipette_id}"
        )
        self.config._update_history()
        return pipette_id
    except (requests.exceptions.RequestException, KeyError) as e:
        self.log_error(f"Error loading pipette: {str(e)}")
        raise RuntimeError(f"Error loading pipette: {str(e)}")
def _normalize_pipette_name(self, name): key = str(name).strip().lower() normalized = self.PIPETTE_NAME_ALIASES.get(key, key) return normalized def _warn_on_tiprack_mismatch(self, pipette_name, tip_rack_slots): token = self.EXPECTED_TIPRACK_TOKEN.get(pipette_name) if token is None: return mismatched_slots = [] for slot in tip_rack_slots: labware_info = self.config["loaded_labware"].get(str(slot)) if labware_info is None: continue labware_name = str(labware_info[1]).lower() if "tiprack" in labware_name and token not in labware_name: mismatched_slots.append(slot) if mismatched_slots: self.log_warning( f"Loaded pipette '{pipette_name}' with potentially mismatched tipracks in slots {mismatched_slots}. " f"Expected tiprack names containing '{token}'." ) def _update_pipette_ranges(self): """Update the min/max values for largest and smallest pipettes""" self.min_largest_pipette = None self.max_smallest_pipette = None # Get all available pipettes with their volumes available_pipettes = self._get_active_pipettes() if available_pipettes: # Get min and max volumes for each pipette min_vols = { mount: info.get("min_volume", float("inf")) for mount, info in available_pipettes.items() } max_vols = { mount: info.get("max_volume", 0) for mount, info in available_pipettes.items() } # Find the smallest and largest pipettes if max_vols: # Use list and regular max/min functions with a key function mounts = list(max_vols.keys()) if mounts: largest_pipette_mount = max( mounts, key=lambda m: max_vols.get(m, 0) ) smallest_pipette_mount = min( mounts, key=lambda m: max_vols.get(m, float("inf")) ) # Set global min/max values if min_vols and largest_pipette_mount in min_vols: self.min_largest_pipette = min_vols[largest_pipette_mount] self.log_info( f"Setting min_largest_pipette to {self.min_largest_pipette}" ) if max_vols and smallest_pipette_mount in max_vols: self.max_smallest_pipette = max_vols[smallest_pipette_mount] self.log_info( f"Setting max_smallest_pipette to 
{self.max_smallest_pipette}" )
def mix(self, volume, location, repetitions=1, **kwargs):
    """Mix *volume* uL *repetitions* times in the first well matching *location*.

    Picks up a tip first if none is attached; each repetition is one
    aspirate/dispense pair at the well bottom.
    """
    self.log_info(f"Mixing {volume}uL {repetitions} times at {location}")

    # One run check up front; the atomic commands below skip their own checks.
    self._ensure_run_exists()

    # Choose a pipette for this volume and resolve its command id.
    mount = self.get_pipette(volume)["mount"]
    mount_info = self.pipette_info.get(mount)
    pipette_id = mount_info.get("id") if mount_info else None
    if not pipette_id:
        raise ValueError(f"Could not find ID for pipette on {mount} mount")

    wells = self.get_wells(location)
    if not wells:
        raise ValueError("Invalid location")
    target = wells[0]

    if not self.has_tip:
        self._execute_atomic_command(
            "pickUpTip",
            {
                "pipetteId": pipette_id,
                "pipetteMount": mount,
                "wellLocation": None,  # Use next available tip in rack
            },
            check_run_status=False,
        )
        self.has_tip = True

    # Each mix cycle is an aspirate followed by a dispense at the well bottom.
    for _ in range(repetitions):
        for operation in ("aspirate", "dispense"):
            self._execute_atomic_command(
                operation,
                {
                    "pipetteId": pipette_id,
                    "volume": volume,
                    "labwareId": target["labwareId"],
                    "wellName": target["wellName"],
                    "wellLocation": {
                        "origin": "bottom",
                        "offset": {"x": 0, "y": 0, "z": 0},
                    },
                },
                check_run_status=False,
            )
def _split_up_transfers(self, vol): """Split up transfer volumes based on pipette constraints""" transfers = [] if self.max_transfer is None or vol <= 0: return transfers while sum(transfers) < vol: transfer = min(self.max_transfer, vol - sum(transfers)) # Handle case where remaining volume is less than minimum transfer if ( transfer < (self.min_transfer or 0) and len(transfers) > 0 and transfers[-1] >= (2 * (self.min_transfer or 0)) ): transfers[-1] -= (self.min_transfer or 0) - transfer transfer = self.min_transfer or 0 # Handle "valley of death" case - when transfer is between pipette ranges if ( self.min_largest_pipette is not None and self.max_smallest_pipette is not None and transfer < self.min_largest_pipette and transfer > self.max_smallest_pipette ): transfer = ( self.max_smallest_pipette ) # Use smaller pipette at max capacity if transfer <= 0: self.log_warning( f"Computed nonpositive transfer volume {transfer}uL while splitting {vol}uL; stopping split." ) break transfers.append(transfer) # Exit condition - we've reached the target volume if sum(transfers) >= vol: break return transfers def _slot_by_labware_uuid(self,target_uuid): for slot, (uuid,name,_) in self.config["loaded_labware"].items(): if uuid == target_uuid: return slot return None
@Driver.quickbar(
    qb={
        "button_text": "Transfer",
        "params": {
            "source": {"label": "Source Well", "type": "text", "default": "1A1"},
            "dest": {"label": "Dest Well", "type": "text", "default": "1A1"},
            "volume": {"label": "Volume (uL)", "type": "float", "default": 300},
        },
    }
)
def transfer(
    self,
    source,
    dest,
    volume,
    mix_before=None,
    mix_after=None,
    air_gap=0,
    aspirate_rate=None,
    dispense_rate=None,
    mix_aspirate_rate=None,
    mix_dispense_rate=None,
    blow_out=False,
    post_aspirate_delay=0.0,
    aspirate_equilibration_delay=0.0,
    post_dispense_delay=0.0,
    drop_tip=True,
    force_new_tip=False,
    to_top=True,
    to_center=False,
    to_top_z_offset=0,
    fast_mixing=False,
    touch_tip=False,
    **kwargs,
):
    """Transfer fluid from one location to another using atomic HTTP API commands

    Args:
        source, dest: well specifiers resolved via get_wells (exactly one well each).
        volume: total volume in uL; split into sub-transfers by pipette limits.
        mix_before, mix_after: optional (n_mixes, mix_volume) pairs.
        air_gap: uL of air aspirated at the source well top after the main aspirate.
        aspirate_rate, dispense_rate: flow rates (uL/s) applied for the transfer.
        mix_aspirate_rate, mix_dispense_rate: flow rates used only during mixes.
        blow_out: blow out at the destination after dispensing.
        post_aspirate_delay, aspirate_equilibration_delay, post_dispense_delay:
            pauses in seconds around the aspirate/dispense steps.
        drop_tip: drop the tip after the final sub-transfer.
        force_new_tip: drop any held tip and pick a fresh one per sub-transfer.
        to_top, to_center: dispense position in the destination well (default: top).
        to_top_z_offset: z offset (mm) applied when dispensing at the well top.
        fast_mixing: accepted for interface compatibility; not used in this body.
        touch_tip: touch the tip to the destination well edge after dispensing.
        **kwargs: accepts "blowout" / "touchTip" aliases from other callers.

    Returns:
        dict record of the transfer (volumes, wells, options, status).
    """
    self.log_info(f"Transferring {volume}uL from {source} to {dest}")

    # Accept common aliases used by different callers.
    if "blowout" in kwargs and not blow_out:
        blow_out = bool(kwargs["blowout"])
    if "touchTip" in kwargs and not touch_tip:
        touch_tip = bool(kwargs["touchTip"])

    volume_ul = float(volume)
    if volume_ul <= 0:
        # Nothing to move; return a "skipped" record rather than raising.
        self.log_info(f"Skipping transfer with nonpositive volume {volume_ul}uL from {source} to {dest}")
        return {
            "source": source,
            "dest": dest,
            "requested_volume_ul": volume_ul,
            "subtransfers_ul": [],
            "status": "skipped_nonpositive_volume",
        }

    # Verify run exists once at the start, then skip checks for all atomic commands
    self._ensure_run_exists()

    # Set flow rates if specified
    if aspirate_rate is not None:
        self.set_aspirate_rate(aspirate_rate)
    if dispense_rate is not None:
        self.set_dispense_rate(dispense_rate)

    # Get pipette based on volume
    pipette = self.get_pipette(volume_ul)
    pipette_mount = pipette["mount"]  # Get the mount from the pipette object

    # Get the pipette ID
    pipette_id = None
    for mount, data in self.pipette_info.items():
        if mount == pipette_mount and data:
            pipette_id = data.get("id")
            break
    if not pipette_id:
        raise ValueError(f"Could not find ID for pipette on {pipette_mount} mount")

    # Get source and destination wells
    source_wells = self.get_wells(source)
    if len(source_wells) > 1:
        raise ValueError("Transfer only accepts one source well at a time!")
    source_well = source_wells[0]

    dest_wells = self.get_wells(dest)
    if len(dest_wells) > 1:
        raise ValueError("Transfer only accepts one dest well at a time!")
    dest_well = dest_wells[0]

    # Handle special cases for well positions
    source_position = "bottom"  # Default position
    dest_position = "bottom"  # Default position
    if to_top and to_center:
        raise ValueError("Cannot dispense to_top and to_center simultaneously")
    elif to_top:
        dest_position = "top"
    elif to_center:
        dest_position = "center"

    # Split transfers if needed
    transfers = self._split_up_transfers(volume_ul)

    # Full provenance record returned to the caller.
    transfer_record = {
        "source": source,
        "dest": dest,
        "requested_volume_ul": volume_ul,
        "subtransfers_ul": [],
        "subtransfer_count": 0,
        "pipette_mount": pipette_mount,
        "pipette_name": pipette.get("name"),
        "pipette_id": pipette_id,
        "source_well": {
            "labware_id": source_well["labwareId"],
            "well_name": source_well["wellName"],
            "position": source_position,
        },
        "dest_well": {
            "labware_id": dest_well["labwareId"],
            "well_name": dest_well["wellName"],
            "position": dest_position,
            "offset": {"x": 0, "y": 0, "z": to_top_z_offset if dest_position == "top" else 0},
        },
        "options": {
            "mix_before": list(mix_before) if mix_before is not None else None,
            "mix_after": list(mix_after) if mix_after is not None else None,
            "air_gap": air_gap,
            "aspirate_rate": aspirate_rate,
            "dispense_rate": dispense_rate,
            "mix_aspirate_rate": mix_aspirate_rate,
            "mix_dispense_rate": mix_dispense_rate,
            "blow_out": blow_out,
            "post_aspirate_delay": post_aspirate_delay,
            "aspirate_equilibration_delay": aspirate_equilibration_delay,
            "post_dispense_delay": post_dispense_delay,
            "drop_tip": drop_tip,
            "force_new_tip": force_new_tip,
            "to_top": to_top,
            "to_center": to_center,
            "to_top_z_offset": to_top_z_offset,
            "fast_mixing": fast_mixing,
            "touch_tip": touch_tip,
        },
        "status": "executed",
    }

    for i, sub_volume in enumerate(transfers):
        if sub_volume <= 0:
            self.log_warning(
                f"Skipping nonpositive sub-transfer volume {sub_volume}uL from {source} to {dest}"
            )
            continue
        transfer_record["subtransfers_ul"].append(float(sub_volume))

        # Intermediate split transfers should not drop the tip unless explicitly forced.
        is_last_subtransfer = i == (len(transfers) - 1)
        effective_drop_tip = drop_tip if (is_last_subtransfer or force_new_tip) else False

        # Keep tip handling consistent with the non-HTTP driver:
        # reuse the current tip across split transfers unless force_new_tip is set.
        if force_new_tip and self.has_tip:
            # Drop whichever tip is currently held (possibly on another mount).
            tip_mount = self.last_pipette if self.last_pipette is not None else pipette_mount
            tip_pipette_id = self.pipette_info.get(tip_mount, {}).get("id", pipette_id)
            self._execute_atomic_command(
                "moveToAddressableAreaForDropTip",
                {
                    "pipetteId": tip_pipette_id,
                    "addressableAreaName": "fixedTrash",
                    "offset": {"x": 0, "y": 0, "z": 10},
                    "alternateDropLocation": False,
                },
                check_run_status=False,
            )
            self._execute_atomic_command(
                "dropTipInPlace",
                {"pipetteId": tip_pipette_id},
                check_run_status=False,
            )
            self.has_tip = False

        # If a tip is on a different mount, drop it before switching mounts.
        if self.has_tip and self.last_pipette not in (None, pipette_mount):
            tip_pipette_id = self.pipette_info.get(self.last_pipette, {}).get("id", pipette_id)
            self._execute_atomic_command(
                "moveToAddressableAreaForDropTip",
                {
                    "pipetteId": tip_pipette_id,
                    "addressableAreaName": "fixedTrash",
                    "offset": {"x": 0, "y": 0, "z": 10},
                    "alternateDropLocation": False,
                },
                check_run_status=False,
            )
            self._execute_atomic_command(
                "dropTipInPlace",
                {"pipetteId": tip_pipette_id},
                check_run_status=False,
            )
            self.has_tip = False

        if not self.has_tip:
            self._execute_atomic_command(
                "pickUpTip",
                {
                    "pipetteId": pipette_id,
                    "pipetteMount": pipette_mount,
                    "wellLocation": None,  # Use next available tip in rack, will be updated in _execute_atomic_command
                },
                check_run_status=False,
            )
            self.has_tip = True
            self.last_pipette = pipette_mount

        # 1a. If destination is on a heater-shaker, stop the shaking and latch the latch pre-flight
        was_shaking = False
        dest_well_slot = self._slot_by_labware_uuid(dest_well['labwareId'])
        source_well_slot = self._slot_by_labware_uuid(source_well['labwareId'])
        heater_shaker_slots = [slot for (slot, (uuid, name)) in self.config["loaded_modules"].items() if "heaterShaker" in name]
        if dest_well_slot in heater_shaker_slots or source_well_slot in heater_shaker_slots:
            # latch heater-shaker
            # this is contextual, maybe - seems to not cause trouble to run without conditional
            # if 'closed' not in self.get_shake_latch_status():
            self.latch_shaker()

            # store current shake rpm and stop shake
            if self.get_shake_rpm()[0] != 'idle':
                shake_rpm = self.get_shake_rpm()[2]
                was_shaking = True
                self.stop_shake()

        # 2. Mix before if specified
        if mix_before is not None:
            n_mixes, mix_volume = mix_before
            # Set mix aspirate rate if specified
            if mix_aspirate_rate is not None:
                self.set_aspirate_rate(mix_aspirate_rate, pipette_mount)
            # Set mix dispense rate if specified
            if mix_dispense_rate is not None:
                self.set_dispense_rate(mix_dispense_rate, pipette_mount)

            # Mix before transfer - implement by executing multiple aspirate/dispense
            for _ in range(n_mixes):
                self._execute_atomic_command(
                    "aspirate",
                    {
                        "pipetteId": pipette_id,
                        "volume": mix_volume,
                        "labwareId": source_well["labwareId"],
                        "wellName": source_well["wellName"],
                        "wellLocation": {
                            "origin": source_position,
                            "offset": {"x": 0, "y": 0, "z": 0},
                        },
                        "flowRate": self.pipette_info[pipette_mount]['aspirate_flow_rate'],
                    },
                    check_run_status=False,
                )
                self._execute_atomic_command(
                    "dispense",
                    {
                        "pipetteId": pipette_id,
                        "volume": mix_volume,
                        "labwareId": source_well["labwareId"],
                        "wellName": source_well["wellName"],
                        "wellLocation": {
                            "origin": source_position,
                            "offset": {"x": 0, "y": 0, "z": 0},
                        },
                        "flowRate": self.pipette_info[pipette_mount]['dispense_flow_rate'],
                    },
                    check_run_status=False,
                )

            # Restore original rates
            if mix_aspirate_rate is not None or mix_dispense_rate is not None:
                # Reset rates to default or specified rates
                if aspirate_rate is not None:
                    self.set_aspirate_rate(aspirate_rate, pipette_mount)
                if dispense_rate is not None:
                    self.set_dispense_rate(dispense_rate, pipette_mount)

        # 3. Aspirate
        self._execute_atomic_command(
            "aspirate",
            {
                "pipetteId": pipette_id,
                "volume": sub_volume,
                "labwareId": source_well["labwareId"],
                "wellName": source_well["wellName"],
                "wellLocation": {
                    "origin": source_position,
                    "offset": {"x": 0, "y": 0, "z": 0},
                },
                "flowRate": self.pipette_info[pipette_mount]['aspirate_flow_rate'],
            },
            check_run_status=False,
        )

        # 4. Aspirate equilibration delay (while tip is in liquid)
        if aspirate_equilibration_delay > 0:
            time.sleep(aspirate_equilibration_delay)
            # self._execute_atomic_command("delay", {"seconds": aspirate_equilibration_delay})

        # 5. Move tip above liquid and post-aspirate delay (tip above liquid)
        self._execute_atomic_command(
            "moveToWell",
            {
                "pipetteId": pipette_id,
                "labwareId": source_well["labwareId"],
                "wellName": source_well["wellName"],
                "wellLocation": {
                    "origin": "top",
                    "offset": {"x": 0, "y": 0, "z": 0},
                },
            },
            check_run_status=False,
        )
        if post_aspirate_delay > 0:
            time.sleep(post_aspirate_delay)
            # self._execute_atomic_command("delay", {"seconds": post_aspirate_delay})

        # 6. Air gap if specified
        if air_gap > 0:
            # Air gap is implemented as aspirate at the top of the source well
            self._execute_atomic_command(
                "aspirate",
                {
                    "pipetteId": pipette_id,
                    "volume": air_gap,
                    "labwareId": source_well["labwareId"],
                    "wellName": source_well["wellName"],
                    "wellLocation": {
                        "origin": "top",
                        "offset": {"x": 0, "y": 0, "z": 0},
                    },
                    "flowRate": self.pipette_info[pipette_mount]['aspirate_flow_rate'],
                },
                check_run_status=False,
            )

        # 7. Dispense
        offset = {
            "x": 0,
            "y": 0,
            "z": (
                to_top_z_offset
                if dest_position == "top" and to_top_z_offset != 0
                else 0
            ),
        }
        self._execute_atomic_command(
            "dispense",
            {
                "pipetteId": pipette_id,
                "volume": sub_volume + air_gap,  # Include air gap in dispense volume
                "labwareId": dest_well["labwareId"],
                "wellName": dest_well["wellName"],
                "wellLocation": {"origin": dest_position, "offset": offset},
                "flowRate": self.pipette_info[pipette_mount]['dispense_flow_rate'],
            },
            check_run_status=False,
        )

        # 8. Post-dispense delay
        if post_dispense_delay > 0:
            time.sleep(post_dispense_delay)
            # self._execute_atomic_command("delay", {"seconds": post_dispense_delay})

        # 9. Mix after if specified
        if mix_after is not None:
            n_mixes, mix_volume = mix_after
            # Set mix aspirate rate if specified
            if mix_aspirate_rate is not None:
                self.set_aspirate_rate(mix_aspirate_rate, pipette_mount)
            # Set mix dispense rate if specified
            if mix_dispense_rate is not None:
                self.set_dispense_rate(mix_dispense_rate, pipette_mount)

            # Mix after transfer should be performed from the bottom of the destination well
            mix_well_location = {
                "origin": "bottom",
                "offset": {"x": 0, "y": 0, "z": 0},
            }

            # Mix after transfer - implement by executing multiple aspirate/dispense
            for _ in range(n_mixes):
                self._execute_atomic_command(
                    "aspirate",
                    {
                        "pipetteId": pipette_id,
                        "volume": mix_volume,
                        "labwareId": dest_well["labwareId"],
                        "wellName": dest_well["wellName"],
                        "wellLocation": mix_well_location,
                        "flowRate": self.pipette_info[pipette_mount]['aspirate_flow_rate'],
                    },
                    check_run_status=False,
                )
                self._execute_atomic_command(
                    "dispense",
                    {
                        "pipetteId": pipette_id,
                        "volume": mix_volume,
                        "labwareId": dest_well["labwareId"],
                        "wellName": dest_well["wellName"],
                        "wellLocation": mix_well_location,
                        "flowRate": self.pipette_info[pipette_mount]['dispense_flow_rate'],
                    },
                    check_run_status=False,
                )

            # Restore original rates
            if mix_aspirate_rate is not None or mix_dispense_rate is not None:
                # Reset rates to default or specified rates
                if aspirate_rate is not None:
                    self.set_aspirate_rate(aspirate_rate, pipette_mount)
                if dispense_rate is not None:
                    self.set_dispense_rate(dispense_rate, pipette_mount)

        # 10. Blow out if specified
        if blow_out:
            self._execute_atomic_command(
                "blowOut",
                {
                    "pipetteId": pipette_id,
                    "labwareId": dest_well["labwareId"],
                    "wellName": dest_well["wellName"],
                    "wellLocation": {"origin": dest_position, "offset": offset},
                },
                check_run_status=False,
            )

        # 10b. Optionally touch the tip to destination well edge.
        if touch_tip:
            self._touch_tip_well(pipette_id=pipette_id, well=dest_well)

        if was_shaking:
            self.set_shake(shake_rpm)  # back to running :)

        # 11. Drop tip if specified
        if effective_drop_tip:
            # see https://github.com/Opentrons/opentrons/issues/14590 for the absolute bullshit that led to this.
            # in it: Opentrons incompetence
            self._execute_atomic_command(
                "moveToAddressableAreaForDropTip",
                {
                    "pipetteId": pipette_id,
                    "addressableAreaName": "fixedTrash",
                    "offset": {"x": 0, "y": 0, "z": 10},
                    "alternateDropLocation": False,
                },
                check_run_status=False,
            )
            self._execute_atomic_command(
                "dropTipInPlace",
                {"pipetteId": pipette_id, },
                check_run_status=False,
            )
            self.has_tip = False

    # Update last pipette
    self.last_pipette = pipette_mount

    transfer_record["subtransfer_count"] = len(transfer_record["subtransfers_ul"])
    return transfer_record
def _touch_tip_well(self, pipette_id, well):
    """Touch tip at a well edge if supported by robot-server; otherwise move to well top."""
    params = {
        "pipetteId": pipette_id,
        "labwareId": well["labwareId"],
        "wellName": well["wellName"],
        "wellLocation": {"origin": "top", "offset": {"x": 0, "y": 0, "z": 0}},
        "mmFromEdge": 1,
    }
    try:
        self._execute_atomic_command("touchTip", params, check_run_status=False)
    except RuntimeError as exc:
        # Older robot-server versions reject "touchTip"; approximate it by
        # moving just below the well top instead.
        self.log_warning(
            f"touchTip command unavailable; using moveToWell fallback. Error: {exc}"
        )
        self._execute_atomic_command(
            "moveToWell",
            {
                "pipetteId": pipette_id,
                "labwareId": well["labwareId"],
                "wellName": well["wellName"],
                "wellLocation": {"origin": "top", "offset": {"x": 0, "y": 0, "z": -2}},
            },
            check_run_status=False,
        )

def _execute_atomic_command(
    self, command_type, params=None, wait_until_complete=True, timeout=None, check_run_status=True
):
    """Execute a single atomic command using the HTTP API

    Args:
        command_type: Opentrons command type string (e.g. "aspirate", "pickUpTip").
        params: command parameter dict; mutated in place for "pickUpTip" to point
            at the next available tip for the requested mount.
        wait_until_complete: ask the server to block until the command finishes.
        timeout: optional server-side wait timeout, forwarded as a query param.
        check_run_status: If False, skip HTTP GET check when ensuring run exists.
            Passed to _ensure_run_exists for optimization.

    Returns:
        True when the command completed with status "succeeded"; otherwise the
        command id (for tracking commands that are still running).

    Raises:
        RuntimeError: when no tips are available for a pickUpTip, when the
            command reports failed/error status, or on any HTTP request error.
    """
    if params is None:
        params = {}

    # Track tip usage for pick up and drop commands
    if command_type == "pickUpTip":
        mount = params.get("pipetteMount")
        if mount and mount in self.config["available_tips"] and self.config["available_tips"][mount]:
            tiprack_id, well = self.get_tip(mount)
            self.log_debug(
                f"Using tip from {tiprack_id} well {well} for {mount} mount"
            )
            # Update the params to specify the exact tip location
            params["labwareId"] = tiprack_id
            params["wellName"] = well
            params["wellLocation"] = {
                "origin": "top",
                "offset": {"x": 0, "y": 0, "z": 0},
            }
            # pipetteMount is a local bookkeeping key, not a valid API param.
            del params["pipetteMount"]
        else:
            raise RuntimeError(f"No tips available for {mount} mount")

    self.log_debug(
        f"Executing atomic command: {command_type} with params: {params}"
    )

    # Ensure we have a valid run
    run_id = self._ensure_run_exists(check_run_status=check_run_status)

    # Build the query parameters
    query_params = {"waitUntilComplete": wait_until_complete}
    if timeout is not None:
        query_params["timeout"] = timeout

    try:
        # Send the command
        command_response = requests.post(
            url=f"{self.base_url}/runs/{run_id}/commands",
            params=query_params,
            headers=self.headers,
            json={
                "data": {
                    "commandType": command_type,
                    "params": params,
                    "intent": "setup",
                }
            },
        )
        self._check_cmd_success(command_response)
        command_data = command_response.json()["data"]
        command_id = command_data["id"]
        self.log_debug(
            f"Command {command_id} executed with status: {command_data['status']}"
        )

        # If wait_until_complete is True, the command has already completed
        if wait_until_complete:
            if command_data["status"] == "succeeded":
                return True
            elif command_data["status"] in ["failed", "error"]:
                error_info = command_data.get("error", "Unknown error")
                self.log_error(f"Command failed: {error_info}")
                raise RuntimeError(f"Command failed: {error_info}")

        # If we're not waiting or the command is still running, return the command ID for tracking
        return command_id
    except requests.exceptions.RequestException as e:
        self.log_error(f"Error executing command: {str(e)}")
        raise RuntimeError(f"Error executing command: {str(e)}")
def set_aspirate_rate(self, rate=150, pipette=None):
    """Set aspirate rate in uL/s. Default is 150 uL/s

    When *pipette* is None the rate is applied to every active pipette;
    otherwise only the named mount is updated.
    """
    self.log_info(f"Setting aspirate rate to {rate} uL/s")
    if pipette is not None:
        self._get_active_pipette_info(pipette)["aspirate_flow_rate"] = rate
        return
    targets = self._get_active_pipettes()
    if not targets:
        self.log_warning("No loaded pipettes available to update aspirate rate")
        return
    for entry in targets.values():
        entry["aspirate_flow_rate"] = rate
def set_dispense_rate(self, rate=300, pipette=None):
    """Set dispense rate in uL/s. Default is 300 uL/s

    When *pipette* is None the rate is applied to every active pipette;
    otherwise only the named mount is updated.
    """
    self.log_info(f"Setting dispense rate to {rate} uL/s")
    if pipette is not None:
        self._get_active_pipette_info(pipette)["dispense_flow_rate"] = rate
        return
    targets = self._get_active_pipettes()
    if not targets:
        self.log_warning("No loaded pipettes available to update dispense rate")
        return
    for entry in targets.values():
        entry["dispense_flow_rate"] = rate
def set_gantry_speed(self, speed=400):
    """Set movement speed of gantry. Default is 400 mm/s

    NOTE(review): the Opentrons HTTP API exposes no direct equivalent of the
    protocol-API gantry speed setting, so this currently only logs.
    """
    self.log_info(f"Setting gantry speed to {speed} mm/s")
    # Placeholder: robot settings updates over HTTP are not wired up yet.
    self.log_warning(
        "Setting gantry speed is not fully implemented in HTTP API mode"
    )
def get_pipette(self, volume, method="min_transfers"):
    """Select a loaded pipette able to deliver *volume* uL.

    method: "min_transfers" picks the fewest strokes (smallest capable
    pipette breaks ties); "uncertainty" picks the lowest estimated
    uncertainty. Raises ValueError when nothing fits or method is unknown.
    """
    self.log_debug(f"Looking for a pipette for volume {volume}")

    # Refresh pipette state from the robot before deciding.
    self._update_pipettes()

    candidates = []
    for mount, info in self._get_active_pipettes().items():
        if not info:
            continue
        low = info.get("min_volume", 1)
        high = info.get("max_volume", 300)
        if volume < low:
            continue
        candidates.append(
            {
                "mount": mount,  # Use mount as the identifier
                "min_volume": low,
                "max_volume": high,
                "name": info.get("name"),
                "model": info.get("model"),
                "channels": info.get("channels", 1),
                "pipette_id": info.get("id"),
            }
        )

    if not candidates:
        raise ValueError("No suitable loaded pipettes found!\n")

    # Annotate each candidate with stroke count and a simplified uncertainty
    # that grows linearly with the number of strokes.
    for cand in candidates:
        strokes = ceil(volume / cand["max_volume"])
        cand["ntransfers"] = strokes
        cand["uncertainty"] = strokes * 0.1

    if self.data is not None:
        self.data["transfer_method"] = method
        self.data["pipette_options"] = str(candidates)

    if method == "uncertainty":
        chosen = min(candidates, key=lambda c: c["uncertainty"])
    elif method == "min_transfers":
        fewest = min(c["ntransfers"] for c in candidates)
        tied = [c for c in candidates if c["ntransfers"] == fewest]
        chosen = min(tied, key=lambda c: c["max_volume"])
    else:
        raise ValueError(f"Pipette selection method {method} was not recognized.")

    self.log_debug(f"Chosen pipette: {chosen}")
    if self.data is not None:
        self.data["chosen_pipette"] = str(chosen)
    return chosen
def get_aspirate_rate(self, pipette=None):
    """Return the aspirate flow rate (uL/s) for *pipette* mount; 150 when unknown,
    None when no pipettes are loaded."""
    active = self._get_active_pipettes()
    if pipette is None:
        # Fall back to the first mount that actually has a pipette.
        pipette = next((m for m, d in active.items() if d), None)
        if pipette is None:
            return None
    try:
        for mount, data in active.items():
            if mount == pipette and data:
                return data.get("aspirate_flow_rate", 150)
    except requests.exceptions.RequestException:
        # Defensive: mirrors the other rate getters; plain dict iteration
        # cannot raise this, so this path is effectively dead.
        pass
    return 150  # Default value
def get_dispense_rate(self, pipette=None):
    """Return the dispense flow rate (uL/s) for *pipette* mount; 300 when unknown,
    None when no pipettes are loaded."""
    active = self._get_active_pipettes()
    if pipette is None:
        # Fall back to the first mount that actually has a pipette.
        pipette = next((m for m, d in active.items() if d), None)
        if pipette is None:
            return None
    try:
        for mount, data in active.items():
            if mount == pipette and data:
                return data.get("dispense_flow_rate", 300)
    except requests.exceptions.RequestException:
        # Defensive: mirrors the other rate getters; plain dict iteration
        # cannot raise this, so this path is effectively dead.
        pass
    return 300  # Default value
# Heater-shaker module control via the Opentrons HTTP API (atomic heaterShaker/* commands and /modules queries)
def set_shake(self, rpm, module_id=None):
    """Spin the heater-shaker at *rpm* RPM, blocking until the speed is reached."""
    self.log_info(f"Setting heater-shaker speed to {rpm} RPM")
    if module_id is None:
        # Default to the loaded heater-shaker found on the deck.
        module_id = self._find_module_by_type("heaterShaker")
    self._execute_atomic_command(
        "heaterShaker/setAndWaitForShakeSpeed",
        params={"moduleId": module_id, "rpm": rpm},
    )
def stop_shake(self, module_id=None):
    """Deactivate the heater-shaker's shaking (heating and latch are unaffected)."""
    self.log_info("Stopping heater-shaker")
    if module_id is None:
        # Default to the loaded heater-shaker found on the deck.
        module_id = self._find_module_by_type("heaterShaker")
    self._execute_atomic_command(
        "heaterShaker/deactivateShaker",
        params={"moduleId": module_id},
    )
def set_shaker_temp(self, temp, module_id=None):
    """Set the heater-shaker's target temperature to *temp* °C."""
    self.log_info(f"Setting heater-shaker temperature to {temp}°C")
    if module_id is None:
        # Default to the loaded heater-shaker found on the deck.
        module_id = self._find_module_by_type("heaterShaker")
    self._execute_atomic_command(
        "heaterShaker/setTargetTemperature",
        params={"moduleId": module_id, "celsius": temp},
    )
def stop_shaker_heat(self, module_id=None):
    """Deactivate the heater-shaker's heater (shaking and latch are unaffected)."""
    self.log_info("Deactivating heater-shaker heating")
    if module_id is None:
        # Default to the loaded heater-shaker found on the deck.
        module_id = self._find_module_by_type("heaterShaker")
    self._execute_atomic_command(
        "heaterShaker/deactivateHeater",
        params={"moduleId": module_id},
    )
def unlatch_shaker(self, module_id=None):
    """Open the heater-shaker's labware latch."""
    self.log_info("Unlatching heater-shaker")
    if module_id is None:
        # Default to the loaded heater-shaker found on the deck.
        module_id = self._find_module_by_type("heaterShaker")
    self._execute_atomic_command(
        "heaterShaker/openLabwareLatch",
        params={"moduleId": module_id},
    )
def latch_shaker(self, module_id=None):
    """Close the heater-shaker's labware latch."""
    self.log_info("Latching heater-shaker")
    if module_id is None:
        # Default to the loaded heater-shaker found on the deck.
        module_id = self._find_module_by_type("heaterShaker")
    self._execute_atomic_command(
        "heaterShaker/closeLabwareLatch",
        params={"moduleId": module_id},
    )
def _find_module_by_type(self,partial_name): module_id = None for module in self.config["loaded_modules"].values(): if partial_name in module[1]: module_id = module[0] return module_id
def get_shaker_temp(self):
    """Return (current_temp, target_temp) in °C for the heater-shaker.

    Reads state directly from the robot's /modules endpoint (temperature
    queries have no atomic run command). Returns an error string when the
    request fails or no heater-shaker is attached.
    """
    self.log_info("Getting heater-shaker temperature")
    # For get operations, we still need to use the modules API directly
    try:
        # Get modules to find the heater-shaker module
        modules_response = requests.get(
            url=f"{self.base_url}/modules", headers=self.headers
        )
        if modules_response.status_code != 200:
            self.log_error(f"Failed to get modules: {modules_response.status_code}")
            return f"Error getting modules: {modules_response.status_code}"

        modules = modules_response.json().get("modules", [])
        heater_shaker_module = next(
            # Default "" guards against entries without a moduleModel key,
            # which previously raised TypeError and was masked as "Error: ...".
            (m for m in modules if "heaterShaker" in m.get("moduleModel", "")),
            None,
        )
        if not heater_shaker_module:
            self.log_error("No heater-shaker module found")
            return "No heater-shaker module found"

        logging.debug(heater_shaker_module)
        current_temp = heater_shaker_module.get("data", {}).get("currentTemp")
        target_temp = heater_shaker_module.get("data", {}).get("targetTemp")
        self.log_info(
            f"Heater-shaker temperature - Current: {current_temp}°C, Target: {target_temp}°C"
        )
        return (current_temp, target_temp)
    except Exception as e:
        self.log_error(f"Error getting temperature: {str(e)}")
        return f"Error: {str(e)}"
def get_shake_rpm(self):
    """Return (speed_status, current_rpm, target_rpm) for the heater-shaker.

    Reads state directly from the robot's /modules endpoint. Returns an error
    string when the request fails or no heater-shaker is attached.
    """
    # For get operations, we just use the modules API
    try:
        # Get modules to find the heater-shaker module
        modules_response = requests.get(
            url=f"{self.base_url}/modules", headers=self.headers
        )
        if modules_response.status_code != 200:
            self.log_error(f"Failed to get modules: {modules_response.status_code}")
            return f"Error getting modules: {modules_response.status_code}"

        modules = modules_response.json().get("modules", [])
        heater_shaker_module = next(
            # Default "" guards against entries without a moduleModel key,
            # which previously raised TypeError and was masked as "Error: ...".
            (m for m in modules if "heaterShaker" in m.get("moduleModel", "")),
            None,
        )
        if not heater_shaker_module:
            self.log_error("No heater-shaker module found")
            return "No heater-shaker module found"

        current_rpm = heater_shaker_module.get("data", {}).get("currentSpeed")
        target_rpm = heater_shaker_module.get("data", {}).get("targetSpeed")
        status = heater_shaker_module.get("data", {}).get("speedStatus")
        return (status, current_rpm, target_rpm)
    except Exception as e:
        self.log_error(f"Error getting RPM: {str(e)}")
        return f"Error: {str(e)}"
def get_shake_latch_status(self):
    """Return the heater-shaker's labwareLatchStatus string.

    Reads state directly from the robot's /modules endpoint. Returns an error
    string when the request fails or no heater-shaker is attached.
    """
    # For get operations, we just use the modules API
    try:
        # Get modules to find the heater-shaker module
        modules_response = requests.get(
            url=f"{self.base_url}/modules", headers=self.headers
        )
        if modules_response.status_code != 200:
            self.log_error(f"Failed to get modules: {modules_response.status_code}")
            return f"Error getting modules: {modules_response.status_code}"

        modules = modules_response.json().get("modules", [])
        heater_shaker_module = next(
            # Default "" guards against entries without a moduleModel key,
            # which previously raised TypeError and was masked as "Error: ...".
            (m for m in modules if "heaterShaker" in m.get("moduleModel", "")),
            None,
        )
        if not heater_shaker_module:
            self.log_error("No heater-shaker module found")
            return "No heater-shaker module found"

        status = heater_shaker_module.get("data", {}).get("labwareLatchStatus")
        return status
    except Exception as e:
        # Fixed copy-paste error: this method reads latch status, not RPM.
        self.log_error(f"Error getting latch status: {str(e)}")
        return f"Error: {str(e)}"
def _create_run(self):
    """Create a fresh run on the robot for executing commands.

    POSTs to the robot's ``/runs`` endpoint, stores the new run id on
    ``self.run_id``, and then replays the persisted deck configuration
    (modules, labware, instruments) into the new run via
    :meth:`_reload_deck_configuration`.

    Returns:
        str: the new run id.

    Raises:
        RuntimeError: if the robot rejects the run creation or the HTTP
            request fails.
    """
    self.log_info("Creating a new run for commands")
    try:
        # Clear custom labware tracking so definitions are re-uploaded for the new run
        self.sent_custom_labware = {}

        # Create a run (removed an unused `import datetime` that was here)
        run_response = requests.post(
            url=f"{self.base_url}/runs",
            headers=self.headers,
        )
        if run_response.status_code != 201:
            self.log_error(f"Failed to create run: {run_response.status_code}")
            self.log_error(f"Response: {run_response.text}")
            raise RuntimeError(f"Failed to create run: {run_response.text}")

        self.run_id = run_response.json()["data"]["id"]
        self.log_debug(f"Created run: {self.run_id}")

        # Reload previously configured labware, instruments, and modules
        self._reload_deck_configuration()

        return self.run_id
    except requests.exceptions.RequestException as e:
        self.log_error(f"Error creating run: {str(e)}")
        raise RuntimeError(f"Error creating run: {str(e)}")

def _reload_deck_configuration(self):
    """Reload the deck configuration (modules, labware, instruments) from persistent storage.

    Replays the persisted config into the (just-created) run in dependency
    order: modules first, then labware (which may sit on modules), then
    instruments (which reference tipracks by labware id). Because the robot
    assigns *new* UUIDs on every load, previously saved ``available_tips``
    entries are remapped old-uuid -> slot -> new-uuid so tip bookkeeping
    survives the reload.

    Returns:
        bool: True on success; False if any step failed (in which case the
        original config entries are restored).
    """
    self.log_info("Reloading previously configured deck setup")

    # Store original configuration for recovery if needed
    original_modules = self.config["loaded_modules"].copy()
    original_labware = self.config["loaded_labware"].copy()
    original_instruments = self.config["loaded_instruments"].copy()

    # Record each old tiprack uuid's slot before the ids are replaced,
    # so available tips can be remapped onto the new uuids afterwards.
    old_uuid_to_slot = {}
    tiprack_slots = {}
    for (mount, instrument) in original_instruments.items():
        tiprack_slots[mount] = [self._slot_by_labware_uuid(uuid) for uuid in instrument['tip_racks']]
        old_uuid_to_slot.update({uuid: self._slot_by_labware_uuid(uuid) for uuid in instrument['tip_racks']})

    # Clear current state for reloading
    self.config["loaded_modules"] = {}
    self.config["loaded_labware"] = {}
    self.config["loaded_instruments"] = {}

    try:
        # Step 1: Load modules first
        # We know the run exists because _create_run just created it, so skip status checks
        self.log_info("Reloading modules")
        for slot, (_, module_name) in original_modules.items():
            try:
                self.log_info(f"Reloading module {module_name} in slot {slot}")
                self.load_module(module_name, slot, check_run_status=False)
                # New module ID will be stored in config["loaded_modules"]
            except Exception as e:
                self.log_error(f"Error reloading module {module_name} in slot {slot}: {str(e)}")
                raise

        # Step 2: Load labware
        # We know the run exists because _create_run just created it, so skip status checks
        self.log_info("Reloading labware")
        for slot, (_, labware_name, labware_data) in original_labware.items():
            # Check if this labware is on a module
            module_id = None
            if str(slot) in self.config["loaded_modules"]:
                module_id = self.config["loaded_modules"][str(slot)][0]  # Get new module ID

            try:
                self.log_info(f"Reloading labware {labware_name} in slot {slot}")
                self.load_labware(labware_name, slot, module=module_id, check_run_status=False)
                # New labware ID will be stored in config["loaded_labware"]
            except Exception as e:
                self.log_error(f"Error reloading labware {labware_name} in slot {slot}: {str(e)}")
                raise

        # Step 3: Load instruments
        # We know the run exists because _create_run just created it, so skip status checks
        # Also skip pipette updates since _initialize_robot already fetched pipette info
        self.log_info("Reloading instruments")
        for mount, instrument_data in original_instruments.items():
            instrument_name = instrument_data['name']
            try:
                self.log_info(f"Reloading instrument {instrument_name} on {mount} mount")
                self.load_instrument(instrument_name, mount, tiprack_slots[mount],
                                     reload=True, check_run_status=False, update_pipettes=False)
                # New instrument ID will be stored in config["loaded_instruments"]
            except Exception as e:
                self.log_error(f"Error reloading instrument {instrument_name} on {mount} mount: {str(e)}")
                raise

        self.log_info("Deck configuration successfully reloaded")

        # Update tiprack lists
        # Build slot->new_uuid mapping from new loaded_instruments
        slot_to_new_tiprack_uuid = {}
        for instrument in self.config["loaded_instruments"].values():
            for new_uuid in instrument.get('tip_racks', []):
                slot = self._slot_by_labware_uuid(new_uuid)
                slot_to_new_tiprack_uuid[slot] = new_uuid

        # Remap available tips: a saved (old_uuid, well) entry survives only
        # if a new tiprack now occupies the same slot.
        old_available_tips = self.config.get("available_tips", {})
        new_available_tips = {}
        for mount in self.config["loaded_instruments"].keys():
            new_available_tips[mount] = []
            for tiprack_uuid, well in old_available_tips.get(mount, []):
                slot = old_uuid_to_slot.get(tiprack_uuid)
                new_uuid = slot_to_new_tiprack_uuid.get(slot)
                if new_uuid is not None:
                    new_available_tips[mount].append((new_uuid, well))
            self.log_info(f"Remapped {len(new_available_tips[mount])} available tips for {mount} mount after reload.")
        self.config["available_tips"] = new_available_tips

        return True
    except Exception as e:
        self.log_error(f"Failed to reload deck configuration: {str(e)}")
        # Restore original configuration in config
        self.config["loaded_modules"] = original_modules
        self.config["loaded_labware"] = original_labware
        self.config["loaded_instruments"] = original_instruments
        return False

def _ensure_run_exists(self, check_run_status=True):
    """Ensure a run exists for executing commands, creating one if needed.

    Args:
        check_run_status: If False, skip the HTTP GET check and return
            ``run_id`` if it already exists (optimization for bulk
            operations). If True, verify the run status via HTTP GET and
            recreate the run if it is missing or in a terminal state.

    Returns:
        str: a valid run id.
    """
    if not hasattr(self, "run_id") or not self.run_id:
        return self._create_run()

    # Skip status check if requested (optimization for bulk operations)
    if not check_run_status:
        return self.run_id

    # Check if the run is still valid
    try:
        response = requests.get(
            url=f"{self.base_url}/runs/{self.run_id}", headers=self.headers
        )
        if response.status_code != 200:
            # Run doesn't exist, create a new one
            return self._create_run()

        # Check run state; terminal states require a fresh run
        run_data = response.json()["data"]
        current_state = run_data.get("status")
        if current_state in ["failed", "error", "succeeded", "stopped"]:
            # Run is in a terminal state, create a new one
            return self._create_run()

        return self.run_id
    except requests.exceptions.RequestException:
        # Error checking run, create a new one
        return self._create_run()
def get_tip(self, mount):
    """Pop and return the next available tip for *mount*.

    Returns:
        tuple: ``(tiprack_labware_id, well_name)`` identifying the tip that
        should be picked up next.
    """
    mount_tips = self.config["available_tips"][mount]
    next_tip = mount_tips.pop(0)
    # Persist the mutated tip inventory so the state survives a restart.
    self.config._update_history()
    return next_tip
def get_tip_status(self, mount=None):
    """Get the current tip usage status.

    Args:
        mount: mount to report on; when falsy, reports every mount that has
            tipracks loaded (one line per mount).

    Returns:
        str: human-readable tip availability summary.
    """
    if not mount:
        # Aggregate the per-mount reports, one per line.
        return "\n".join(self.get_tip_status(m) for m in self.config["available_tips"])

    # Guard clauses: a mount needs both tipracks and an instrument defined.
    if mount not in self.config["available_tips"]:
        return f"No tipracks loaded for {mount} mount"
    if mount not in self.config["loaded_instruments"]:
        return f"No instrument defined for {mount} mount"

    rack_count = len(self.config["loaded_instruments"][mount]["tip_racks"])
    total_tips = len(TIPRACK_WELLS) * rack_count
    available_tips = len(self.config["available_tips"][mount])
    return f"{available_tips}/{total_tips} tips available on {mount} mount"
def make_align_script(self, filename: str):
    """
    Generate an Opentrons Python Protocol API script to verify alignment.

    The script recreates the current deck state (modules, labware,
    instruments) and moves each pipette to the top of well A1 of each
    non-tiprack labware, allowing visual verification of calibration and
    alignment.

    Args:
        filename: path the generated protocol script is written to.
    """
    script = []

    # Header
    script.append("from opentrons import protocol_api")
    script.append("")
    script.append("metadata = {")
    script.append("    'protocolName': 'Alignment Check',")
    script.append("    'author': 'AFL Auto-Generated',")
    script.append("    'description': 'Script for aligning and testing deck configuration',")
    script.append("    'apiLevel': '2.13'")
    script.append("}")
    script.append("")
    script.append("def run(protocol: protocol_api.ProtocolContext):")
    indent = "    "

    # Track labware variable names by ID
    labware_var_by_id = {}

    # 1. Modules (sorted by slot; non-numeric slots sort last)
    loaded_modules = self.config.get("loaded_modules", {})
    if loaded_modules:
        script.append(f"{indent}# Modules")
        for slot in sorted(loaded_modules.keys(), key=lambda x: int(x) if x.isdigit() else 99):
            module_id, module_name = loaded_modules[slot]
            script.append(f"{indent}module_{slot} = protocol.load_module('{module_name}', '{slot}')")
        script.append("")

    # 2. Labware
    loaded_labware = self.config.get("loaded_labware", {})
    regular_labware_vars = []
    if loaded_labware:
        script.append(f"{indent}# Labware")
        for slot in sorted(loaded_labware.keys(), key=lambda x: int(x) if x.isdigit() else 99):
            labware_id, _, labware_data = loaded_labware[slot]

            # Get precise load info from definition if available
            definition = labware_data.get('definition', {})
            params = definition.get('parameters', {})
            load_name = params.get('loadName', 'unknown_labware')
            namespace = definition.get('namespace', 'opentrons')
            version = definition.get('version', 1)

            # Determine if tiprack (tipracks are loaded but not visited)
            labware_type = definition.get('metadata', {}).get('displayCategory', 'default')
            is_tiprack = (
                params.get('isTiprack')
                or labware_type == 'tipRack'
                or 'tiprack' in load_name.lower()
            )

            var_name = f"labware_{slot}"
            labware_var_by_id[labware_id] = var_name
            if not is_tiprack:
                regular_labware_vars.append(var_name)

            # Labware sitting on a module loads from the module, not the deck
            if str(slot) in loaded_modules:
                parent = f"module_{slot}"
                script.append(f"{indent}{var_name} = {parent}.load_labware('{load_name}', namespace='{namespace}', version={version})")
            else:
                script.append(f"{indent}{var_name} = protocol.load_labware('{load_name}', '{slot}', namespace='{namespace}', version={version})")
        script.append("")

    # 3. Pipettes
    loaded_instruments = self.config.get("loaded_instruments", {})
    pipette_vars = []
    if loaded_instruments:
        script.append(f"{indent}# Pipettes")
        for mount, instrument_data in loaded_instruments.items():
            name = instrument_data['name']
            tip_rack_ids = instrument_data.get('tip_racks', [])
            # Resolve tip rack variables
            tip_rack_vars = [labware_var_by_id[tid] for tid in tip_rack_ids if tid in labware_var_by_id]
            tip_racks_arg = f"[{', '.join(tip_rack_vars)}]"

            var_name = f"pipette_{mount}"
            pipette_vars.append(var_name)
            script.append(f"{indent}{var_name} = protocol.load_instrument('{name}', '{mount}', tip_racks={tip_racks_arg})")
        script.append("")

    # 4. Alignment Moves
    if pipette_vars and regular_labware_vars:
        script.append(f"{indent}# Alignment Verification")
        script.append(f"{indent}# Move to top of A1 for each labware")
        for pip in pipette_vars:
            for lab in regular_labware_vars:
                # Fixed: previously emitted a stray f-prefix on an already
                # fully-substituted comment string.
                script.append(f"{indent}protocol.comment('Checking {lab} with {pip}')")
                script.append(f"{indent}{pip}.move_to({lab}['A1'].top())")
                script.append(f"{indent}protocol.delay(seconds=0.5)")

    # Write to file
    with open(filename, 'w') as f:
        f.write('\n'.join(script))

    # Fixed: log message previously lost the filename placeholder.
    self.log_info(f"Generated alignment script at {filename}")
# NOTE(review): star-import of the shared launcher appears to perform the
# module-level startup that serves this driver as an APIServer process —
# confirm against AFL.automation.shared.launcher.
if __name__ == "__main__":
    from AFL.automation.shared.launcher import *