"""Source code for AFL.automation.prepare.OT2HTTPDriver."""

import requests
import time
import json
import tomllib
from pathlib import Path
from importlib.resources import files
from jinja2 import Template


from math import ceil
from AFL.automation.APIServer.Driver import Driver
from AFL.automation.shared.utilities import listify

# All 96 well names of a standard tiprack, ordered column by column
# (A1..H1, A2..H2, ..., A12..H12).
TIPRACK_WELLS = [f"{row}{col}" for col in range(1, 13) for row in "ABCDEFGH"]

# Limited set of labware options for quick loading via the dashboard.
# Keys are "namespace/loadName" strings as used by the Opentrons HTTP API;
# values are human-readable labels for display.
# NOTE(review): the last key ("heaterShakerModuleV1") is a module model
# string rather than a namespace/loadName pair — presumably handled
# specially by the dashboard; confirm against the consumer of this dict.
LABWARE_OPTIONS = {
    "opentrons/opentrons_96_tiprack_300ul": "Opentrons 96 Tiprack 300µL",
    "opentrons/opentrons_96_tiprack_1000ul": "Opentrons 96 Tiprack 1000µL",
    "opentrons/corning_96_wellplate_360ul_flat": "Corning 96 Well Plate",
    "opentrons/nest_96_wellplate_2ml_deep": "NEST 2mL 96 Deep Well Plate",
    "custom_beta/nist_pneumatic_loader": "NIST Pneumatic Loader (slot 10 only)",
    "custom_beta/nist_6_20ml_vials": "NIST 6 x 20mL vial carrier",
    "custom_beta/nist_2_100ml_bottles": "NIST 2 x 100mL bottle carrier",
    "heaterShakerModuleV1": "HeaterShaker Module (still needs labware atop it!)"
}


class OT2HTTPDriver(Driver):
    """Driver that controls an Opentrons OT-2 through its HTTP API."""

    # Default configuration; every entry may be overridden at construction.
    defaults = {
        "robot_ip": "127.0.0.1",     # Default to localhost, should be overridden
        "robot_port": "31950",       # Default Opentrons HTTP API port
        "loaded_labware": {},        # Persistent storage for loaded labware
        "loaded_instruments": {},    # Persistent storage for loaded instruments
        "loaded_modules": {},        # Persistent storage for loaded modules
        # Persistent storage for available tips,
        # Format: {mount: [(tiprack_id, well_name), ...]}
        "available_tips": {},
        "prep_targets": [],          # Persistent storage for prep target well locations
    }
    def __init__(self, overrides=None):
        """Connect to an OT-2 and initialize driver state.

        Parameters
        ----------
        overrides : dict, optional
            Config values overriding the class-level ``defaults``.

        Raises
        ------
        ConnectionError
            If the robot at ``robot_ip:robot_port`` is unreachable
            (via ``_initialize_robot``).
        """
        # app is attached later by the APIServer; _log() falls back to
        # print() until then.
        self.app = None
        Driver.__init__(
            self,
            name="OT2_HTTP_Driver",
            defaults=self.gather_defaults(),
            overrides=overrides,
        )
        self.name = "OT2_HTTP_Driver"
        # Initialize state variables
        self.session_id = None
        self.protocol_id = None
        self.max_transfer = None   # largest single-stroke volume (set by _update_pipettes)
        self.min_transfer = None   # smallest single-stroke volume (set by _update_pipettes)
        self.has_tip = False
        self.last_pipette = None
        self.modules = {}
        self.pipette_info = {}
        # Custom labware handling
        self.custom_labware_files = {}
        self.sent_custom_labware = set()
        self.custom_labware_dir = self._get_custom_labware_dir()
        self._load_custom_labware_defs()
        # Base URL for HTTP requests
        self.base_url = f"http://{self.config['robot_ip']}:{self.config['robot_port']}"
        self.headers = {"Opentrons-Version": "2"}
        # Initialize the robot connection
        self._initialize_robot()
        self.useful_links['View Deck'] = '/visualize_deck'
def _log(self, level, message): """Safe logging that checks if app exists before logging""" if self.app is not None and hasattr(self.app, "logger"): log_method = getattr(self.app.logger, level, None) if log_method: log_method(message) else: print(f"[{level.upper()}] {message}")
    # Thin conveniences over _log(); safe to call before the APIServer
    # attaches an app (they fall back to print()).
    def log_info(self, message):
        """Log info message safely"""
        self._log("info", message)

    def log_error(self, message):
        """Log error message safely"""
        self._log("error", message)

    def log_debug(self, message):
        """Log debug message safely"""
        self._log("debug", message)

    def log_warning(self, message):
        """Log warning message safely"""
        self._log("warning", message)
    def _get_custom_labware_dir(self) -> Path:
        """Return the path to the custom labware directory.

        Walks up from this file looking for a ``pyproject.toml``; if one is
        found, honors ``[tool.afl] custom_labware_dir`` when present,
        otherwise uses ``<project root>/support/labware``. Falls back to
        ``<cwd>/support/labware`` when no project root is found.
        """
        current = Path(__file__).resolve()
        for parent in current.parents:
            if (parent / "pyproject.toml").exists():
                pyproject = parent / "pyproject.toml"
                try:
                    with open(pyproject, "rb") as f:
                        data = tomllib.load(f)
                    custom_dir = (
                        data.get("tool", {})
                        .get("afl", {})
                        .get("custom_labware_dir")
                    )
                    if custom_dir:
                        return (parent / custom_dir).expanduser()
                except Exception:
                    # Unreadable/invalid pyproject: fall through to default.
                    pass
                return parent / "support" / "labware"
        return Path.cwd() / "support" / "labware"

    def _load_custom_labware_defs(self):
        """Record available custom labware definitions in the support directory.

        Populates ``self.custom_labware_files`` with
        {"namespace/loadName": Path} entries; malformed JSON files are
        silently skipped (best-effort scan).
        """
        self.custom_labware_dir.mkdir(parents=True, exist_ok=True)
        for json_file in self.custom_labware_dir.glob("*.json"):
            try:
                with open(json_file, "r") as f:
                    definition = json.load(f)
                ns = definition.get("namespace", "custom_beta")
                load = definition.get("parameters", {}).get("loadName")
                if ns and load:
                    key = f"{ns}/{load}"
                    self.custom_labware_files[key] = json_file
            except Exception:
                continue

    def _initialize_robot(self):
        """Initialize the connection to the robot and get basic information.

        Raises
        ------
        ConnectionError
            If the /health endpoint is unreachable or returns non-200.
        """
        try:
            # Check if the robot is reachable
            response = requests.get(url=f"{self.base_url}/health", headers=self.headers)
            if response.status_code != 200:
                raise ConnectionError(f"Failed to connect to robot at {self.base_url}")
            # Get attached pipettes
            self._update_pipettes()
        except requests.exceptions.RequestException as e:
            self.log_error(f"Error connecting to robot: {str(e)}")
            raise ConnectionError(
                f"Error connecting to robot at {self.base_url}: {str(e)}"
            )

    def _update_pipettes(self):
        """Get information about attached pipettes and their settings.

        Rebuilds ``self.pipette_info`` from the robot's /instruments
        endpoint and recomputes ``self.min_transfer``/``self.max_transfer``
        across pipettes that have a pipette_id recorded in the config.

        Raises
        ------
        RuntimeError
            If the request fails or the response cannot be parsed.
        """
        try:
            if self.app is not None:
                self.log_info("Fetching pipette information from robot")
            # Get basic pipette information
            response = requests.get(
                url=f"{self.base_url}/instruments", headers=self.headers
            )
            if response.status_code != 200:
                raise RuntimeError(f"Failed to get pipettes: {response.text}")
            pipettes_data = response.json()['data']
            self.pipette_info = {}
            # Update min/max transfer values based on attached pipettes
            self.min_transfer = None
            self.max_transfer = None
            for pipette in pipettes_data:
                mount = pipette['mount']
                try:
                    pipette_id = self.config["loaded_instruments"][mount]["pipette_id"]  # the id from this run
                except KeyError:
                    pipette_id = None
                # Store basic pipette info
                self.pipette_info[mount] = {
                    "id": pipette_id,
                    "name": pipette["instrumentName"],
                    "model": pipette["instrumentModel"],
                    "serial": pipette["serialNumber"],
                    "mount": mount,
                    "min_volume": pipette.get("data",{}).get("min_volume", None),
                    "max_volume": pipette.get("data",{}).get("max_volume", None),
                    # Flow rates default to 150 when the robot omits them.
                    "aspirate_flow_rate": pipette.get("data",{}).get(
                        "aspirateFlowRate", {}
                    ).get("value",150),
                    "dispense_flow_rate": pipette.get("data",{}).get(
                        "dispenseFlowRate", {}
                    ).get("value",150),
                    "channels": pipette.get("data",{}).get("channels", 1),
                }
                # Only pipettes loaded into this run contribute to the
                # global transfer limits.
                if pipette_id is None:
                    continue
                # Update global min/max transfer values
                min_volume = self.pipette_info[mount]['min_volume']
                max_volume = self.pipette_info[mount]['max_volume']
                if (self.min_transfer is None) or (self.min_transfer > min_volume):
                    self.min_transfer = min_volume
                    if self.app is not None:
                        self.log_info(
                            f"Setting minimum transfer to {self.min_transfer}"
                        )
                if (self.max_transfer is None) or (self.max_transfer < max_volume):
                    self.max_transfer = max_volume
                    if self.app is not None:
                        self.log_info(
                            f"Setting maximum transfer to {self.max_transfer}"
                        )
            if self.app is not None:
                self.log_info(f"Pipette information updated: {self.pipette_info}")
        except Exception as e:
            raise RuntimeError(f"Error getting pipettes: {str(e)}")
[docs] def reset_prep_targets(self): """Clear the list of preparation targets stored in the config.""" self.config["prep_targets"] = []
    def add_prep_targets(self, targets, reset=False):
        """Add well locations to the preparation target list.

        Parameters
        ----------
        targets : str or list of str
            Well location(s) in "slotwell" form (e.g. "1A1") to append.
        reset : bool, default False
            When True, clear any existing targets first.
        """
        if reset:
            self.reset_prep_targets()
        # Ensure the key exists before extending it.
        self.config.setdefault("prep_targets", [])
        self.config["prep_targets"].extend(listify(targets))
        # Persist the change in the config history.
        self.config._update_history()
[docs] def get_prep_target(self): return self.config["prep_targets"].pop(0)
    def status(self):
        """Assemble a list of human-readable status strings for the dashboard.

        Reports prep targets, tip status, active session state (when one
        exists), attached pipettes, and loaded labware.

        Returns
        -------
        list of str
        """
        status = []
        prep_targets = self.config.get("prep_targets", [])
        if len(prep_targets) > 0:
            status.append(f"Next prep target: {prep_targets[0]}")
            status.append(f"Remaining prep targets: {len(prep_targets)}")
        else:
            status.append("No prep targets loaded")
        # get_tip_status is defined elsewhere in this class.
        status.append(self.get_tip_status())
        # Get current session status if available
        if self.session_id:
            try:
                response = requests.get(
                    url=f"{self.base_url}/sessions/{self.session_id}",
                    headers=self.headers,
                )
                if response.status_code == 200:
                    session_data = response.json().get("data", {})
                    current_state = session_data.get("details", {}).get(
                        "currentState", "unknown"
                    )
                    status.append(f"Session state: {current_state}")
            except requests.exceptions.RequestException:
                status.append("Unable to get session status")
        # Get pipette information
        for mount, pipette in self.pipette_info.items():
            if pipette:
                status.append(
                    f"Pipette on {mount} mount: {pipette.get('model', 'unknown')}"
                )
        # Get loaded labware information
        # NOTE(review): entries are expected to be (labware_id, name, result)
        # 3-tuples; anything else lands in the best-effort except below.
        try:
            for slot, (labware_id, name, _) in self.config["loaded_labware"].items():
                status.append(f"Labware in slot {slot}: {name}")
        except Exception:
            print(self.config["loaded_labware"])
        return status
[docs] @Driver.quickbar( qb={ "button_text": "Refill Tipracks", "params": { "mount": { "label": "Which Pipet left/right/both", "type": "text", "default": "both", }, }, } ) def reset_tipracks(self, mount="both"): """Reset the available tips for the specified mount(s)""" self.log_info(f"Resetting tipracks for {mount} mount") mounts_to_reset = [] if mount == "both": mounts_to_reset = list(self.config["loaded_instruments"].keys()) else: mounts_to_reset = [mount] for m in mounts_to_reset: if m in self.config["loaded_instruments"]: # Reinitialize available tips for this mount self.config["available_tips"][m] = [] for tiprack in self.config["loaded_instruments"][m]["tip_racks"]: for well in TIPRACK_WELLS: self.config["available_tips"][m].append((tiprack, well)) self.log_info(f"Reset {len(self.config['available_tips'][m])} tips for {m} mount") # Reset tip status self.has_tip = False
    def reset(self):
        """Tear down the current protocol context and reconnect to the robot.

        Deletes any active session and uploaded protocol (best-effort),
        clears local state and the deck configuration, then re-runs the
        connection handshake.
        """
        self.log_info("Resetting the protocol context")
        # Delete any active session
        if self.session_id:
            try:
                requests.delete(
                    url=f"{self.base_url}/sessions/{self.session_id}",
                    headers=self.headers,
                )
            except requests.exceptions.RequestException as e:
                self.log_error(f"Error deleting session: {str(e)}")
        # Delete any uploaded protocol
        if self.protocol_id:
            try:
                requests.delete(
                    url=f"{self.base_url}/protocols/{self.protocol_id}",
                    headers=self.headers,
                )
            except requests.exceptions.RequestException as e:
                self.log_error(f"Error deleting protocol: {str(e)}")
        # Reset state variables
        self.session_id = None
        self.protocol_id = None
        self.has_tip = False
        self.last_pipette = None
        # Reset deck configuration too
        self.reset_deck()
        # Re-initialize robot connection
        self._initialize_robot()
[docs] def reset_deck(self): """Reset the deck configuration, clearing loaded labware, instruments, and modules""" self.log_info("Resetting the deck configuration") # Clear the deck configuration self.config["loaded_labware"] = {} self.config["loaded_instruments"] = {} self.config["loaded_modules"] = {} self.config["available_tips"] = {} self.config["prep_targets"] = []
[docs] @Driver.quickbar(qb={"button_text": "Home"}) def home(self, **kwargs): """ Home the robot's axes using the dedicated /robot/home endpoint. This endpoint is a direct control endpoint and doesn't require creating a run. It can be used to home all axes at once or specific axes as needed. """ self.log_info("Homing the robot's axes") try: # Call the dedicated home endpoint response = requests.post( url=f"{self.base_url}/robot/home", headers=self.headers, json={ "target": "robot", # Home the entire robot }, ) if response.status_code != 200: self.log_error(f"Failed to home robot: {response.status_code}") self.log_error(f"Response: {response.text}") raise RuntimeError(f"Failed to home robot: {response.text}") self.log_info("Robot homing completed successfully") return True except requests.exceptions.RequestException as e: self.log_error(f"Error during homing: {str(e)}") raise RuntimeError(f"Error during homing: {str(e)}")
[docs] def parse_well(self, loc): """Parse a well location string into slot and well components""" # Default value in case no alphabetic character is found i = 0 for i, loc_part in enumerate(list(loc)): if loc_part.isalpha(): break slot = loc[:i] well = loc[i:] return slot, well
    def get_wells(self, locs):
        """Convert location strings to well objects with proper labware IDs, and check that wells are valid.

        Args:
            locs: Single location string or list of location strings in format "slotwell" (e.g. "1A1")

        Returns:
            List of well objects with labwareId and wellName

        Raises:
            ValueError: If labware is not found in the specified slot
        """
        self.log_debug(f"Converting locations to well objects: {locs}")
        wells = []
        for loc in listify(locs):
            slot, well = self.parse_well(loc)
            # Get labware info from the slot
            labware_info = self.config['loaded_labware'].get(slot)
            if not labware_info:
                raise ValueError(f"No labware found in slot {slot}")
            if not isinstance(labware_info, tuple) or len(labware_info) < 1:
                raise ValueError(f"Invalid labware info format in slot {slot}")
            labware_id = labware_info[0]
            wells.append({"labwareId": labware_id, "wellName": well})
        self.log_debug(f"Created well objects: {wells}")
        # Check well validity here
        # NOTE(review): these asserts use the loop variables and so only
        # validate the *last* parsed slot/well; they are also stripped under
        # `python -O`. Presumably intended to run per-location — confirm.
        assert slot in self.config["loaded_labware"].keys(), f"Slot {slot} does not have any loaded labware"
        assert well in self.config["loaded_labware"][slot][2]['definition']['wells'].keys(), f"Well {well} is not a valid well for slot {slot}, {self.config['loaded_labware'][slot][2]['definition']['metadata']['displayName']}"
        return wells
def _check_cmd_success(self, response): if response.status_code != 201: self.log_error( f"Failed to execute command : {response.status_code}" ) self.log_error(f"Response: {response.text}") raise RuntimeError( f"Failed to execute command: {response.text}" ) if 'status' in response.json()['data'].keys(): if response.json()['data']['status'] == 'failed': self.log_error( f"Command returned error : {response.status_code}" ) self.log_error(f"Response: {response.text}") raise RuntimeError( f"Command returned error: {response.text}" )
    def send_labware(self, labware_def):
        """Send a custom labware definition to the current run and persist it.

        The definition is always written to ``self.custom_labware_dir`` so it
        survives restarts; the upload to the robot is skipped if this key was
        already sent during this session.

        Parameters
        ----------
        labware_def : dict
            Opentrons labware definition; must contain
            ``parameters.loadName`` (``namespace`` defaults to "custom_beta").

        Returns
        -------
        str
            The "namespace/loadName" key when already sent, otherwise the
            definition URI reported by the robot.

        Raises
        ------
        ValueError
            If the definition has no ``parameters.loadName``.
        RuntimeError
            If the upload request fails.
        """
        self.log_debug(f"Sending custom labware definition: {labware_def}")
        ns = labware_def.get("namespace", "custom_beta")
        load_name = labware_def.get("parameters", {}).get("loadName")
        if not load_name:
            raise ValueError("labware_def missing parameters.loadName")
        key = f"{ns}/{load_name}"
        # Persist the definition for future use
        self.custom_labware_dir.mkdir(parents=True, exist_ok=True)
        file_path = self.custom_labware_dir / f"{ns}_{load_name}.json"
        with open(file_path, "w") as f:
            json.dump(labware_def, f, indent=2)
        self.custom_labware_files[key] = file_path
        # Avoid re-sending if already uploaded
        if key in self.sent_custom_labware:
            self.log_debug(f"Labware {key} already sent to robot")
            return key
        # Ensure we have a valid run (helper defined elsewhere in this class)
        run_id = self._ensure_run_exists()
        try:
            command_dict = {"data": labware_def}
            response = requests.post(
                url=f"{self.base_url}/runs/{run_id}/labware_definitions",
                headers=self.headers,
                params={"waitUntilComplete": True},
                json=command_dict,
            )
            self._check_cmd_success(response)
            response_data = response.json()
            labware_name = response_data["data"]["definitionUri"]
            self.sent_custom_labware.add(key)
            self.log_info(
                f"Successfully sent custom labware with name/URI {labware_name}"
            )
            return labware_name
        except (requests.exceptions.RequestException, KeyError) as e:
            self.log_error(f"Error sending custom labware: {str(e)}")
            raise RuntimeError(f"Error sending custom labware: {str(e)}")
    def load_labware(self, name, slot, module=None, **kwargs):
        """Load labware (containers, tipracks) into the protocol using HTTP API

        Parameters
        ----------
        name : str
            Load name, optionally prefixed "namespace/"; ignored when
            ``labware_json`` is supplied.
        slot : int or str
            Deck slot to load into.
        module : optional
            Module ID to place the labware on, if any.
        labware_json : dict, optional (via kwargs)
            Full custom labware definition to upload and load.

        Returns
        -------
        str
            The labware ID assigned by the robot.

        Raises
        ------
        ValueError
            If ``labware_json`` lacks ``parameters.loadName``.
        RuntimeError
            If any HTTP command fails.
        """
        self.log_debug(f"Loading labware '{name}' into slot '{slot}'")
        # Ensure we have a valid run
        run_id = self._ensure_run_exists()
        labware_json = kwargs.pop("labware_json", None)
        version = 1
        if labware_json is not None:
            # Explicit definition supplied: upload it and use its loadName.
            namespace = labware_json.get("namespace", "custom_beta")
            load_name = labware_json.get("parameters", {}).get("loadName")
            if not load_name:
                raise ValueError("labware_json missing parameters.loadName")
            name = load_name
            self.send_labware(labware_json)
        else:
            # Resolve namespace from the name or the known custom definitions.
            if "/" in name:
                namespace, load_name = name.split("/", 1)
                name = load_name
            else:
                load_name = name
                if f"custom_beta/{load_name}" in self.custom_labware_files:
                    namespace = "custom_beta"
                elif f"opentrons/{load_name}" in self.custom_labware_files:
                    namespace = "opentrons"
                else:
                    namespace = "opentrons"
            key = f"{namespace}/{load_name}"
            # Custom (non-opentrons) labware must be uploaded before loading.
            if namespace != "opentrons" and key not in self.sent_custom_labware:
                path = self.custom_labware_files.get(key)
                if path and Path(path).exists():
                    with open(path, "r") as f:
                        definition = json.load(f)
                    self.send_labware(definition)
                else:
                    self.log_warning(f"Custom labware definition not found for {key}")
        try:
            # Check if there's existing labware in the slot
            if slot in self.config["loaded_labware"]:
                self.log_info(
                    f"Found existing labware in slot {slot}, moving it off-deck first"
                )
                existing_labware_id = self.config["loaded_labware"][slot][
                    0
                ]  # Get the ID of existing labware
                # Create command to move existing labware off-deck
                move_command = {
                    "data": {
                        "commandType": "moveLabware",
                        "params": {
                            "labwareId": existing_labware_id,
                            "newLocation": "offDeck",
                            "strategy": "manualMoveWithoutPause",  # Allow user to manually move the labware
                        },
                        "intent": "setup",
                    }
                }
                # Execute the move command
                move_response = requests.post(
                    url=f"{self.base_url}/runs/{run_id}/commands",
                    headers=self.headers,
                    params={"waitUntilComplete": True},
                    json=move_command,
                )
                self._check_cmd_success(move_response)
                # Remove from our tracking
                del self.config["loaded_labware"][slot]
            if str(slot) in self.config["loaded_modules"].keys():
                # we need to load into a module, not a slot
                location = {"moduleId": self.config["loaded_modules"][str(slot)][0]}
            else:
                location = {"slotName": str(slot)}
            # Determine namespace and version
            version = 1  # default version
            # Prepare the loadLabware command
            command_dict = {
                "data": {
                    "commandType": "loadLabware",
                    "params": {
                        "location": location,
                        "loadName": name,
                        "namespace": namespace,
                        "version": version,
                    },
                    "intent": "setup",
                }
            }
            # If this is a module, we need to specify the moduleId
            if module:
                command_dict["data"]["params"]["moduleId"] = module
            # Execute the command
            response = requests.post(
                url=f"{self.base_url}/runs/{run_id}/commands",
                headers=self.headers,
                params={"waitUntilComplete": True},
                json=command_dict,
            )
            self._check_cmd_success(response)
            # Get the labware ID from the response
            response_data = response.json()
            # Debug log the response structure
            self.log_debug(f"Load labware response: {response_data}")
            # Handle different response structures that might occur
            try:
                if "data" in response_data and "result" in response_data["data"]:
                    labware_id = response_data["data"]["result"]["labwareId"]
                elif "data" in response_data and "labwareId" in response_data["data"]:
                    labware_id = response_data["data"]["labwareId"]
                elif "data" in response_data and "id" in response_data["data"]:
                    labware_id = response_data["data"]["id"]
                else:
                    # Try to find labware ID in any structure
                    self.log_warning(f"Unexpected response structure: {response_data}")
                    for key, value in response_data.items():
                        if isinstance(value, dict) and "labwareId" in value:
                            labware_id = value["labwareId"]
                            break
                    else:
                        raise KeyError("Could not find labwareId in response")
            except KeyError as e:
                self.log_error(f"Error extracting labware ID from response: {str(e)}")
                self.log_error(f"Response data: {response_data}")
                raise RuntimeError(
                    f"Failed to extract labware ID from response: {str(e)}"
                )
            # NOTE(review): this unconditional access assumes the
            # data.result structure; it would raise KeyError (caught by the
            # outer except) when one of the fallback branches above was the
            # one that found the labware ID.
            result = response_data["data"]["result"]
            # Store the labware information directly in config
            self.config["loaded_labware"][slot] = (labware_id, name, result)
            # If this is a module, store it
            if module:
                self.modules[slot] = module
            self.log_info(
                f"Successfully loaded labware '{name}' in slot {slot} with ID {labware_id}"
            )
            self.config._update_history()
            return labware_id
        except (requests.exceptions.RequestException, KeyError) as e:
            self.log_error(f"Error loading labware: {str(e)}")
            raise RuntimeError(f"Error loading labware: {str(e)}")
[docs] def load_module(self, name, slot, **kwargs): """Load modules (heater-shaker, tempdeck) into the protocol using HTTP API""" self.log_debug(f"Loading module '{name}' into slot '{slot}'") # Ensure we have a valid run run_id = self._ensure_run_exists() try: if slot in self.config["loaded_modules"].keys(): # todo: check if same module raise RuntimeError(f"Module already loaded in slot {slot}: {self.config['loaded_modules']['slot']}. Overwrite not supported.") # Prepare the loadLabware command command_dict = { "data": { "commandType": "loadModule", "params": { "location": {"slotName": str(slot)}, "model": name, }, "intent": "setup", } } # Execute the command response = requests.post( url=f"{self.base_url}/runs/{run_id}/commands", headers=self.headers, params={"waitUntilComplete": True}, json=command_dict, ) self._check_cmd_success(response) # Get the labware ID from the response response_data = response.json() # Debug log the response structure self.log_debug(f"Load labware response: {response_data}") # Handle different response structures that might occur try: if "data" in response_data and "result" in response_data["data"]: module_id = response_data["data"]["result"]["moduleId"] elif "data" in response_data and "moduleId" in response_data["data"]: module_id = response_data["data"]["moduleId"] elif "data" in response_data and "id" in response_data["data"]: module_id = response_data["data"]["id"] else: # Try to find labware ID in any structure self.log_warning(f"Unexpected response structure: {response_data}") for key, value in response_data.items(): if isinstance(value, dict) and "moduleId" in value: module_id = value["moduleId"] break else: raise KeyError("Could not find moduleId in response") except KeyError as e: self.log_error(f"Error extracting module ID from response: {str(e)}") self.log_error(f"Response data: {response_data}") raise RuntimeError( f"Failed to extract module ID from response: {str(e)}" ) # Store the module information directly in config 
self.config["loaded_modules"][str(slot)] = (module_id, name) self.log_info( f"Successfully loaded module '{name}' in slot {slot} with ID {module_id}" ) self.config._update_history() return module_id except (requests.exceptions.RequestException, KeyError) as e: self.log_error(f"Error loading module: {str(e)}") raise RuntimeError(f"Error loading module: {str(e)}")
    def load_instrument(self, name, mount, tip_rack_slots, reload=False, **kwargs):
        """Load pipette and store tiprack information using HTTP API.

        Parameters
        ----------
        name : str
            Pipette load name (e.g. "p300_single_gen2").
        mount : str
            "left" or "right".
        tip_rack_slots : list
            Deck slots whose previously-loaded labware are this pipette's
            tipracks.
        reload : bool, default False
            When True, keep the existing available-tips queue instead of
            rebuilding it.

        Returns
        -------
        str
            The pipette ID assigned by the robot.

        Raises
        ------
        RuntimeError
            If the HTTP command fails.
        """
        self.log_debug(
            f"Loading pipette '{name}' on '{mount}' mount with tip_racks in slots {tip_rack_slots}"
        )
        # Ensure we have a valid run
        run_id = self._ensure_run_exists()
        try:
            # First, load the pipette using the HTTP API
            # NOTE(review): "tip_racks" is not a documented loadPipette
            # parameter in the Opentrons HTTP API — presumably ignored by the
            # robot; the authoritative tiprack list is stored locally below.
            command_dict = {
                "data": {
                    "commandType": "loadPipette",
                    "params": {
                        "pipetteName": name,
                        "mount": mount,
                        "tip_racks": [self.config["loaded_labware"][str(slot)][0] for slot in tip_rack_slots],
                    },
                    "intent": "setup",
                }
            }
            # Execute the loadPipette command
            response = requests.post(
                url=f"{self.base_url}/runs/{run_id}/commands",
                headers=self.headers,
                params={"waitUntilComplete": True},
                json=command_dict,
            )
            self._check_cmd_success(response)
            # Get the pipette ID from the response
            response_data = response.json()
            print(f'loadPipette response: {response_data}')
            pipette_id = response_data["data"]["result"]["pipetteId"]
            # Make sure we have the latest pipette information
            self._update_pipettes()
            self.pipette_info[mount][
                "id"
            ] = pipette_id  # patch the correct pipette id to the pipette_info dict
            # Get the tip rack IDs - note that loaded_labware now stores tuples of (id, name)
            # NOTE(review): lookup uses the raw slot here but str(slot) above —
            # int slots would silently yield no tip racks; confirm callers
            # always pass strings.
            tip_racks = []
            for slot in listify(tip_rack_slots):
                labware_info = self.config["loaded_labware"].get(slot)
                if (
                    labware_info
                    and isinstance(labware_info, tuple)
                    and len(labware_info) >= 1
                ):
                    tip_racks.append(labware_info[0])
            if not tip_racks:
                self.log_warning(f"No valid tip racks found in slots {tip_rack_slots}")
            # Store the instrument information
            self.config["loaded_instruments"][mount] = {
                "name": name,
                "pipette_id": pipette_id,
                "tip_racks": tip_racks,
            }
            # If not reloading, initialize available tips for this mount
            if not reload:
                self.config["available_tips"][mount] = []
                for tiprack in tip_racks:
                    for well in TIPRACK_WELLS:
                        self.config["available_tips"][mount].append((tiprack, well))
            # Verify that there's actually a pipette in this mount
            if mount not in self.pipette_info or self.pipette_info[mount] is None:
                self.log_warning(
                    f"No physical pipette detected in {mount} mount, but pipette information stored"
                )
            # Update min/max values for largest and smallest pipettes
            self._update_pipette_ranges()
            self.log_info(
                f"Successfully loaded pipette '{name}' on {mount} mount with ID {pipette_id}"
            )
            self.config._update_history()
            return pipette_id
        except (requests.exceptions.RequestException, KeyError) as e:
            self.log_error(f"Error loading pipette: {str(e)}")
            raise RuntimeError(f"Error loading pipette: {str(e)}")
def _update_pipette_ranges(self): """Update the min/max values for largest and smallest pipettes""" self.min_largest_pipette = None self.max_smallest_pipette = None # Get all available pipettes with their volumes available_pipettes = { mount: info for mount, info in self.pipette_info.items() if info is not None } if available_pipettes: # Get min and max volumes for each pipette min_vols = { mount: info.get("min_volume", float("inf")) for mount, info in available_pipettes.items() } max_vols = { mount: info.get("max_volume", 0) for mount, info in available_pipettes.items() } # Find the smallest and largest pipettes if max_vols: # Use list and regular max/min functions with a key function mounts = list(max_vols.keys()) if mounts: largest_pipette_mount = max( mounts, key=lambda m: max_vols.get(m, 0) ) smallest_pipette_mount = min( mounts, key=lambda m: max_vols.get(m, float("inf")) ) # Set global min/max values if min_vols and largest_pipette_mount in min_vols: self.min_largest_pipette = min_vols[largest_pipette_mount] self.log_info( f"Setting min_largest_pipette to {self.min_largest_pipette}" ) if max_vols and smallest_pipette_mount in max_vols: self.max_smallest_pipette = max_vols[smallest_pipette_mount] self.log_info( f"Setting max_smallest_pipette to {self.max_smallest_pipette}" )
    def mix(self, volume, location, repetitions=1, **kwargs):
        """Mix in place by aspirating/dispensing ``volume`` uL ``repetitions`` times.

        Picks up a tip first if the driver does not already hold one; the
        tip is NOT dropped afterwards.

        Parameters
        ----------
        volume : float
            Volume in uL per mix stroke.
        location : str
            Well in "slotwell" form (e.g. "1A1").
        repetitions : int, default 1
            Number of aspirate/dispense cycles.

        Raises
        ------
        ValueError
            If no suitable pipette ID or valid well can be resolved.
        """
        self.log_info(f"Mixing {volume}uL {repetitions} times at {location}")
        # Get pipette based on volume (get_pipette defined elsewhere in class)
        pipette = self.get_pipette(volume)
        pipette_mount = pipette["mount"]
        # Get the pipette ID
        pipette_id = None
        for mount, data in self.pipette_info.items():
            if mount == pipette_mount and data:
                pipette_id = data.get("id")
                break
        if not pipette_id:
            raise ValueError(f"Could not find ID for pipette on {pipette_mount} mount")
        # Get well location
        wells = self.get_wells(location)
        if not wells:
            raise ValueError("Invalid location")
        well = wells[0]
        # Pick up tip if needed
        if not self.has_tip:
            self._execute_atomic_command(
                "pickUpTip",
                {
                    "pipetteId": pipette_id,
                    "pipetteMount": pipette_mount,
                    "wellLocation": None,  # Use next available tip in rack
                },
            )
            self.has_tip = True
        # Execute mix by performing repetitions of aspirate/dispense
        for _ in range(repetitions):
            self._execute_atomic_command(
                "aspirate",
                {
                    "pipetteId": pipette_id,
                    "volume": volume,
                    "labwareId": well["labwareId"],
                    "wellName": well["wellName"],
                    "wellLocation": {
                        "origin": "bottom",
                        "offset": {"x": 0, "y": 0, "z": 0},
                    },
                },
            )
            self._execute_atomic_command(
                "dispense",
                {
                    "pipetteId": pipette_id,
                    "volume": volume,
                    "labwareId": well["labwareId"],
                    "wellName": well["wellName"],
                    "wellLocation": {
                        "origin": "bottom",
                        "offset": {"x": 0, "y": 0, "z": 0},
                    },
                },
            )
def _split_up_transfers(self, vol): """Split up transfer volumes based on pipette constraints""" transfers = [] if self.max_transfer is None or vol <= 0: return transfers while sum(transfers) < vol: transfer = min(self.max_transfer, vol - sum(transfers)) # Handle case where remaining volume is less than minimum transfer if ( transfer < (self.min_transfer or 0) and len(transfers) > 0 and transfers[-1] >= (2 * (self.min_transfer or 0)) ): transfers[-1] -= (self.min_transfer or 0) - transfer transfer = self.min_transfer or 0 # Handle "valley of death" case - when transfer is between pipette ranges if ( self.min_largest_pipette is not None and self.max_smallest_pipette is not None and transfer < self.min_largest_pipette and transfer > self.max_smallest_pipette ): transfer = ( self.max_smallest_pipette ) # Use smaller pipette at max capacity transfers.append(transfer) # Exit condition - we've reached the target volume if sum(transfers) >= vol: break return transfers def _slot_by_labware_uuid(self,target_uuid): for slot, (uuid,name,_) in self.config["loaded_labware"].items(): if uuid == target_uuid: return slot return None
[docs] @Driver.quickbar( qb={ "button_text": "Transfer", "params": { "source": {"label": "Source Well", "type": "text", "default": "1A1"}, "dest": {"label": "Dest Well", "type": "text", "default": "1A1"}, "volume": {"label": "Volume (uL)", "type": "float", "default": 300}, }, } ) def transfer( self, source, dest, volume, mix_before=None, mix_after=None, air_gap=0, aspirate_rate=None, dispense_rate=None, mix_aspirate_rate=None, mix_dispense_rate=None, blow_out=False, post_aspirate_delay=0.0, aspirate_equilibration_delay=0.0, post_dispense_delay=0.0, drop_tip=True, force_new_tip=False, to_top=True, to_center=False, to_top_z_offset=0, fast_mixing=False, **kwargs, ): """Transfer fluid from one location to another using atomic HTTP API commands""" self.log_info(f"Transferring {volume}uL from {source} to {dest}") self._ensure_run_exists() # Set flow rates if specified if aspirate_rate is not None: self.set_aspirate_rate(aspirate_rate) if dispense_rate is not None: self.set_dispense_rate(dispense_rate) # Get pipette based on volume pipette = self.get_pipette(volume) pipette_mount = pipette["mount"] # Get the mount from the pipette object # Get the pipette ID pipette_id = None for mount, data in self.pipette_info.items(): if mount == pipette_mount and data: pipette_id = data.get("id") break if not pipette_id: raise ValueError(f"Could not find ID for pipette on {pipette_mount} mount") # Get source and destination wells source_wells = self.get_wells(source) if len(source_wells) > 1: raise ValueError("Transfer only accepts one source well at a time!") source_well = source_wells[0] dest_wells = self.get_wells(dest) if len(dest_wells) > 1: raise ValueError("Transfer only accepts one dest well at a time!") dest_well = dest_wells[0] # Handle special cases for well positions source_position = "bottom" # Default position dest_position = "bottom" # Default position if to_top and to_center: raise ValueError("Cannot dispense to_top and to_center simultaneously") elif to_top: 
dest_position = "top" elif to_center: dest_position = "center" # Split transfers if needed transfers = self._split_up_transfers(volume) for sub_volume in transfers: # 1. Always pick up a new tip for each transfer self._execute_atomic_command( "pickUpTip", { "pipetteId": pipette_id, "pipetteMount": pipette_mount, "wellLocation": None, # Use next available tip in rack, will be updated in _execute_atomic_command }, ) # 1a. If destination is on a heater-shaker, stop the shaking and latch the latch pre-flight was_shaking = False dest_well_slot = self._slot_by_labware_uuid(dest_well['labwareId']) source_well_slot = self._slot_by_labware_uuid(source_well['labwareId']) heater_shaker_slots = [slot for (slot,(uuid,name)) in self.config["loaded_modules"].items() if "heaterShaker" in name] if dest_well_slot in heater_shaker_slots or source_well_slot in heater_shaker_slots: # latch heater-shaker # this is contextual, maybe - seems to not cause trouble to run without conditional #if 'closed' not in self.get_shake_latch_status(): self.latch_shaker() # store current shake rpm and stop shake if self.get_shake_rpm()[0] != 'idle': shake_rpm = self.get_shake_rpm()[2] was_shaking = True self.stop_shake() # 2. 
Mix before if specified if mix_before is not None: n_mixes, mix_volume = mix_before # Set mix aspirate rate if specified if mix_aspirate_rate is not None: self.set_aspirate_rate(mix_aspirate_rate, pipette_mount) # Set mix dispense rate if specified if mix_dispense_rate is not None: self.set_dispense_rate(mix_dispense_rate, pipette_mount) # Mix before transfer - implement by executing multiple aspirate/dispense for _ in range(n_mixes): self._execute_atomic_command( "aspirate", { "pipetteId": pipette_id, "volume": mix_volume, "labwareId": source_well["labwareId"], "wellName": source_well["wellName"], "wellLocation": { "origin": source_position, "offset": {"x": 0, "y": 0, "z": 0}, }, "flowRate": self.pipette_info[pipette_mount]['aspirate_flow_rate'], }, ) self._execute_atomic_command( "dispense", { "pipetteId": pipette_id, "volume": mix_volume, "labwareId": source_well["labwareId"], "wellName": source_well["wellName"], "wellLocation": { "origin": source_position, "offset": {"x": 0, "y": 0, "z": 0}, }, "flowRate": self.pipette_info[pipette_mount]['dispense_flow_rate'], }, ) # Restore original rates if mix_aspirate_rate is not None or mix_dispense_rate is not None: # Reset rates to default or specified rates if aspirate_rate is not None: self.set_aspirate_rate(aspirate_rate, pipette_mount) if dispense_rate is not None: self.set_dispense_rate(dispense_rate, pipette_mount) # 3. Aspirate self._execute_atomic_command( "aspirate", { "pipetteId": pipette_id, "volume": sub_volume, "labwareId": source_well["labwareId"], "wellName": source_well["wellName"], "wellLocation": { "origin": source_position, "offset": {"x": 0, "y": 0, "z": 0}, }, "flowRate": self.pipette_info[pipette_mount]['aspirate_flow_rate'], }, ) # 4. Aspirate equilibration delay (while tip is in liquid) if aspirate_equilibration_delay > 0: time.sleep(aspirate_equilibration_delay) # self._execute_atomic_command("delay", {"seconds": aspirate_equilibration_delay}) # 5. 
Move tip above liquid and post-aspirate delay (tip above liquid) self._execute_atomic_command( "moveToWellTop", { "pipetteId": pipette_id, "labwareId": source_well["labwareId"], "wellName": source_well["wellName"], }, ) if post_aspirate_delay > 0: time.sleep(post_aspirate_delay) # self._execute_atomic_command("delay", {"seconds": post_aspirate_delay}) # 6. Air gap if specified if air_gap > 0: self._execute_atomic_command( "airGap", {"pipetteId": pipette_id, "volume": air_gap} ) # 7. Dispense offset = { "x": 0, "y": 0, "z": ( to_top_z_offset if dest_position == "top" and to_top_z_offset != 0 else 0 ), } self._execute_atomic_command( "dispense", { "pipetteId": pipette_id, "volume": sub_volume + air_gap, # Include air gap in dispense volume "labwareId": dest_well["labwareId"], "wellName": dest_well["wellName"], "wellLocation": {"origin": dest_position, "offset": offset}, "flowRate": self.pipette_info[pipette_mount]['dispense_flow_rate'], }, ) # 8. Post-dispense delay if post_dispense_delay > 0: time.sleep(post_dispense_delay) # self._execute_atomic_command("delay", {"seconds": post_dispense_delay}) # 9. 
Mix after if specified if mix_after is not None: n_mixes, mix_volume = mix_after # Set mix aspirate rate if specified if mix_aspirate_rate is not None: self.set_aspirate_rate(mix_aspirate_rate, pipette_mount) # Set mix dispense rate if specified if mix_dispense_rate is not None: self.set_dispense_rate(mix_dispense_rate, pipette_mount) # Mix after transfer - implement by executing multiple aspirate/dispense for _ in range(n_mixes): self._execute_atomic_command( "aspirate", { "pipetteId": pipette_id, "volume": mix_volume, "labwareId": dest_well["labwareId"], "wellName": dest_well["wellName"], "wellLocation": { "origin": dest_position, "offset": {"x": 0, "y": 0, "z": 0}, }, "flowRate": self.pipette_info[pipette_mount]['aspirate_flow_rate'], }, ) self._execute_atomic_command( "dispense", { "pipetteId": pipette_id, "volume": mix_volume, "labwareId": dest_well["labwareId"], "wellName": dest_well["wellName"], "wellLocation": { "origin": dest_position, "offset": {"x": 0, "y": 0, "z": 0}, }, "flowRate": self.pipette_info[pipette_mount]['dispense_flow_rate'], }, ) # Restore original rates if mix_aspirate_rate is not None or mix_dispense_rate is not None: # Reset rates to default or specified rates if aspirate_rate is not None: self.set_aspirate_rate(aspirate_rate, pipette_mount) if dispense_rate is not None: self.set_dispense_rate(dispense_rate, pipette_mount) # 10. Blow out if specified if blow_out: self._execute_atomic_command( "blowOut", { "pipetteId": pipette_id, "labwareId": dest_well["labwareId"], "wellName": dest_well["wellName"], "wellLocation": {"origin": dest_position, "offset": offset}, }, ) if was_shaking: self.set_shake(shake_rpm) # back to running :) # 11. Drop tip if specified if drop_tip: # see https://github.com/Opentrons/opentrons/issues/14590 for the absolute bullshit that led to this. 
# in it: Opentrons incompetence self._execute_atomic_command("moveToAddressableAreaForDropTip", { "pipetteId": pipette_id, "addressableAreaName": "fixedTrash", "offset": { "x": 0, "y": 0, "z": 10 }, "alternateDropLocation": False}) self._execute_atomic_command("dropTipInPlace", {"pipetteId": pipette_id, }) # Update last pipette self.last_pipette = pipette
def _execute_atomic_command(
    self, command_type, params=None, wait_until_complete=True, timeout=None
):
    """Execute a single atomic command against the robot's HTTP commands API.

    Parameters
    ----------
    command_type : str
        Opentrons command type (e.g. "aspirate", "pickUpTip",
        "heaterShaker/setAndWaitForShakeSpeed").
    params : dict, optional
        Command parameters forwarded in the request body. For "pickUpTip"
        a "pipetteMount" entry is consumed here and replaced with a concrete
        tiprack labwareId/wellName drawn from the available-tips pool.
    wait_until_complete : bool
        Passed through as the API's waitUntilComplete query parameter; when
        True the POST blocks until the command finishes on-robot.
    timeout : int, optional
        Command timeout forwarded as a query parameter when given.

    Returns
    -------
    True when the command completed successfully (wait_until_complete=True),
    otherwise the command id for later status tracking.

    Raises
    ------
    RuntimeError
        If no tips are available for the requested mount, the command
        reports failure, or the HTTP request itself fails.
    """
    if params is None:
        params = {}

    # Track tip usage for pick up and drop commands: the caller passes only
    # a mount, and we resolve it to the next available (tiprack, well) pair.
    if command_type == "pickUpTip":
        mount = params.get("pipetteMount")
        if mount and mount in self.config["available_tips"] and self.config["available_tips"][mount]:
            # get_tip pops the tip from the pool, so it is consumed even if
            # the robot-side command later fails.
            tiprack_id, well = self.get_tip(mount)
            self.log_debug(
                f"Using tip from {tiprack_id} well {well} for {mount} mount"
            )
            # Update the params to specify the exact tip location; the API
            # expects labwareId/wellName, not a mount.
            params["labwareId"] = tiprack_id
            params["wellName"] = well
            params["wellLocation"] = {
                "origin": "top",
                "offset": {"x": 0, "y": 0, "z": 0},
            }
            del params["pipetteMount"]
        else:
            raise RuntimeError(f"No tips available for {mount} mount")

    self.log_debug(
        f"Executing atomic command: {command_type} with params: {params}"
    )

    # Ensure we have a valid (non-terminal) run to attach the command to
    run_id = self._ensure_run_exists()

    # Build the query parameters
    query_params = {"waitUntilComplete": wait_until_complete}
    if timeout is not None:
        query_params["timeout"] = timeout

    try:
        # Send the command; intent "setup" lets commands run outside a
        # protocol-driven run.
        command_response = requests.post(
            url=f"{self.base_url}/runs/{run_id}/commands",
            params=query_params,
            headers=self.headers,
            json={
                "data": {
                    "commandType": command_type,
                    "params": params,
                    "intent": "setup",
                }
            },
        )
        self._check_cmd_success(command_response)

        command_data = command_response.json()["data"]
        command_id = command_data["id"]

        self.log_debug(
            f"Command {command_id} executed with status: {command_data['status']}"
        )

        # If wait_until_complete is True, the command has already completed
        if wait_until_complete:
            if command_data["status"] == "succeeded":
                return True
            elif command_data["status"] in ["failed", "error"]:
                error_info = command_data.get("error", "Unknown error")
                self.log_error(f"Command failed: {error_info}")
                raise RuntimeError(f"Command failed: {error_info}")

        # If we're not waiting or the command is still running, return the command ID for tracking
        return command_id

    except requests.exceptions.RequestException as e:
        self.log_error(f"Error executing command: {str(e)}")
        raise RuntimeError(f"Error executing command: {str(e)}")
def set_aspirate_rate(self, rate=150, pipette=None):
    """Set aspirate rate in uL/s. Default is 150 uL/s.

    When ``pipette`` is None, the rate is applied to both mounts.
    """
    self.log_info(f"Setting aspirate rate to {rate} uL/s")
    # Apply to the requested mount, or to every mount when none was given
    for mount in ('left', 'right'):
        if pipette in (mount, None):
            self.pipette_info[mount]['aspirate_flow_rate'] = rate
def set_dispense_rate(self, rate=300, pipette=None):
    """Set dispense rate in uL/s. Default is 300 uL/s.

    When ``pipette`` is None, the rate is applied to both mounts.
    """
    self.log_info(f"Setting dispense rate to {rate} uL/s")
    # Apply to the requested mount, or to every mount when none was given
    for mount in ('left', 'right'):
        if pipette in (mount, None):
            self.pipette_info[mount]['dispense_flow_rate'] = rate
def set_gantry_speed(self, speed=400):
    """Set movement speed of gantry. Default is 400 mm/s.

    NOTE(review): the HTTP API does not expose a per-run gantry speed
    setting here, so this method currently only logs the request.
    """
    self.log_info(f"Setting gantry speed to {speed} mm/s")
    self.log_warning(
        "Setting gantry speed is not fully implemented in HTTP API mode"
    )
def get_pipette(self, volume, method="min_transfers"):
    """Choose the attached pipette best suited to transferring ``volume`` uL.

    Parameters
    ----------
    volume : float
        Total volume (uL) to transfer.
    method : str
        'min_transfers' picks the pipette needing the fewest sub-transfers
        (ties broken by smallest max volume); 'uncertainty' minimizes a
        simplified per-transfer uncertainty estimate.

    Returns
    -------
    dict describing the chosen pipette (mount, volume limits, name, model,
    channels, pipette_id, ntransfers, uncertainty).

    Raises
    ------
    ValueError if no pipette can handle the volume or the method is unknown.
    """
    self.log_debug(f"Looking for a pipette for volume {volume}")

    # Refresh cached pipette information from the robot before deciding
    self._update_pipettes()

    candidates = []
    for mount, info in self.pipette_info.items():
        if not info:
            continue
        lo = info.get("min_volume", 1)
        hi = info.get("max_volume", 300)
        # A pipette qualifies as long as the volume reaches its minimum;
        # volumes above its maximum are handled by splitting into transfers.
        if volume < lo:
            continue
        candidates.append(
            {
                "mount": mount,  # Use mount as the identifier
                "min_volume": lo,
                "max_volume": hi,
                "name": info.get("name"),
                "model": info.get("model"),
                "channels": info.get("channels", 1),
                "pipette_id": info.get("id"),
            }
        )

    if not candidates:
        raise ValueError("No suitable pipettes found!\n")

    # Annotate each candidate with the number of sub-transfers required and
    # a simplified uncertainty estimate proportional to that count.
    for cand in candidates:
        n = ceil(volume / cand["max_volume"])
        cand["ntransfers"] = n
        cand["uncertainty"] = n * 0.1  # Simplified uncertainty calculation

    if self.data is not None:
        self.data["transfer_method"] = method
        self.data["pipette_options"] = str(candidates)

    # Select per the requested strategy
    if method == "uncertainty":
        chosen = min(candidates, key=lambda c: c["uncertainty"])
    elif method == "min_transfers":
        fewest = min(c["ntransfers"] for c in candidates)
        chosen = min(
            (c for c in candidates if c["ntransfers"] == fewest),
            key=lambda c: c["max_volume"],
        )
    else:
        raise ValueError(f"Pipette selection method {method} was not recognized.")

    self.log_debug(f"Chosen pipette: {chosen}")
    if self.data is not None:
        self.data["chosen_pipette"] = str(chosen)
    return chosen
def get_aspirate_rate(self, pipette=None):
    """Get the current aspirate flow rate (uL/s) for a pipette.

    Parameters
    ----------
    pipette : str or None
        Mount name ('left' or 'right'). When None, the first mount with
        pipette data is used.

    Returns
    -------
    The stored aspirate flow rate, 150 (the default) when the mount has no
    stored rate, or None when no pipette data is loaded at all.
    """
    # Fix: the original wrapped a pure dict scan in a try/except for
    # requests.exceptions.RequestException, which that code can never raise;
    # the dead handler and the linear scan are replaced by a direct lookup.
    if pipette is None:
        # Fall back to the first mount that actually has pipette data
        pipette = next((m for m, d in self.pipette_info.items() if d), None)
        if pipette is None:
            return None
    data = self.pipette_info.get(pipette)
    if data:
        return data.get("aspirate_flow_rate", 150)
    return 150  # Default value
def get_dispense_rate(self, pipette=None):
    """Get the current dispense flow rate (uL/s) for a pipette.

    Parameters
    ----------
    pipette : str or None
        Mount name ('left' or 'right'). When None, the first mount with
        pipette data is used.

    Returns
    -------
    The stored dispense flow rate, 300 (the default) when the mount has no
    stored rate, or None when no pipette data is loaded at all.
    """
    # Fix: the original wrapped a pure dict scan in a try/except for
    # requests.exceptions.RequestException, which that code can never raise;
    # the dead handler and the linear scan are replaced by a direct lookup.
    if pipette is None:
        # Fall back to the first mount that actually has pipette data
        pipette = next((m for m, d in self.pipette_info.items() if d), None)
        if pipette is None:
            return None
    data = self.pipette_info.get(pipette)
    if data:
        return data.get("dispense_flow_rate", 300)
    return 300  # Default value
# HTTP API communication with heater-shaker module
def set_shake(self, rpm, module_id=None):
    """Spin the heater-shaker up to ``rpm`` RPM (command waits for speed)."""
    self.log_info(f"Setting heater-shaker speed to {rpm} RPM")
    # Resolve the module automatically when the caller did not name one
    if module_id is None:
        module_id = self._find_module_by_type("heaterShaker")
    payload = {"moduleId": module_id, "rpm": rpm}
    self._execute_atomic_command("heaterShaker/setAndWaitForShakeSpeed", params=payload)
def stop_shake(self, module_id=None):
    """Deactivate the heater-shaker's shaker motor."""
    self.log_info("Stopping heater-shaker")
    # Resolve the module automatically when the caller did not name one
    if module_id is None:
        module_id = self._find_module_by_type("heaterShaker")
    payload = {"moduleId": module_id}
    self._execute_atomic_command("heaterShaker/deactivateShaker", params=payload)
def set_shaker_temp(self, temp, module_id=None):
    """Set the heater-shaker's target temperature in °C."""
    self.log_info(f"Setting heater-shaker temperature to {temp}°C")
    # Resolve the module automatically when the caller did not name one
    if module_id is None:
        module_id = self._find_module_by_type("heaterShaker")
    payload = {"moduleId": module_id, "celsius": temp}
    self._execute_atomic_command("heaterShaker/setTargetTemperature", params=payload)
def stop_shaker_heat(self, module_id=None):
    """Deactivate the heater-shaker's heater."""
    self.log_info(f"Deactivating heater-shaker heating")
    # Resolve the module automatically when the caller did not name one
    if module_id is None:
        module_id = self._find_module_by_type("heaterShaker")
    payload = {"moduleId": module_id}
    self._execute_atomic_command("heaterShaker/deactivateHeater", params=payload)
def unlatch_shaker(self, module_id=None):
    """Open the heater-shaker's labware latch."""
    self.log_info("Unlatching heater-shaker")
    # Resolve the module automatically when the caller did not name one
    if module_id is None:
        module_id = self._find_module_by_type("heaterShaker")
    payload = {"moduleId": module_id}
    self._execute_atomic_command("heaterShaker/openLabwareLatch", params=payload)
def latch_shaker(self, module_id=None):
    """Close the heater-shaker's labware latch."""
    self.log_info("Latching heater-shaker")
    # Resolve the module automatically when the caller did not name one
    if module_id is None:
        module_id = self._find_module_by_type("heaterShaker")
    payload = {"moduleId": module_id}
    self._execute_atomic_command("heaterShaker/closeLabwareLatch", params=payload)
def _find_module_by_type(self, partial_name):
    """Return the module ID of the last loaded module whose name contains
    ``partial_name``, or None when no module matches.

    NOTE(review): deliberately keeps scanning after a hit, so with several
    matching modules the last one loaded wins (original behavior).
    """
    found = None
    for module_id, module_name in self.config["loaded_modules"].values():
        if partial_name in module_name:
            found = module_id
    return found
def get_shaker_temp(self):
    """Return the heater-shaker's (current_temp, target_temp) in °C.

    Returns an error string instead of a tuple when the module is missing
    or the HTTP query fails.
    """
    self.log_info("Getting heater-shaker temperature")
    # For get operations, we still need to use the modules API directly
    try:
        # Get modules to find the heater-shaker module
        modules_response = requests.get(
            url=f"{self.base_url}/modules", headers=self.headers
        )
        if modules_response.status_code != 200:
            self.log_error(f"Failed to get modules: {modules_response.status_code}")
            return f"Error getting modules: {modules_response.status_code}"
        modules = modules_response.json().get("modules", [])
        heater_shaker_module = next(
            # Fix: default "" guards against a missing moduleModel key
            # (`"x" in None` would raise TypeError)
            (m for m in modules if "heaterShaker" in m.get("moduleModel", "")),
            None,
        )
        if not heater_shaker_module:
            self.log_error("No heater-shaker module found")
            return "No heater-shaker module found"
        # Fix: removed leftover debug print(heater_shaker_module)
        current_temp = heater_shaker_module.get("data", {}).get("currentTemp")
        target_temp = heater_shaker_module.get("data", {}).get("targetTemp")
        self.log_info(
            f"Heater-shaker temperature - Current: {current_temp}°C, Target: {target_temp}°C"
        )
        return (current_temp, target_temp)
    except Exception as e:
        self.log_error(f"Error getting temperature: {str(e)}")
        return f"Error: {str(e)}"
def get_shake_rpm(self):
    """Return the heater-shaker's (speed_status, current_rpm, target_rpm).

    Returns an error string instead of a tuple when the module is missing
    or the HTTP query fails.
    """
    # For get operations, we just use the modules API
    try:
        # Get modules to find the heater-shaker module
        modules_response = requests.get(
            url=f"{self.base_url}/modules", headers=self.headers
        )
        if modules_response.status_code != 200:
            self.log_error(f"Failed to get modules: {modules_response.status_code}")
            return f"Error getting modules: {modules_response.status_code}"
        modules = modules_response.json().get("modules", [])
        heater_shaker_module = next(
            # Fix: default "" guards against a missing moduleModel key
            # (`"x" in None` would raise TypeError)
            (m for m in modules if "heaterShaker" in m.get("moduleModel", "")),
            None,
        )
        if not heater_shaker_module:
            self.log_error("No heater-shaker module found")
            return "No heater-shaker module found"
        current_rpm = heater_shaker_module.get("data", {}).get("currentSpeed")
        target_rpm = heater_shaker_module.get("data", {}).get("targetSpeed")
        status = heater_shaker_module.get("data", {}).get("speedStatus")
        return (status, current_rpm, target_rpm)
    except Exception as e:
        self.log_error(f"Error getting RPM: {str(e)}")
        return f"Error: {str(e)}"
def get_shake_latch_status(self):
    """Return the heater-shaker's labware latch status string.

    Returns an error string when the module is missing or the HTTP query
    fails.
    """
    # For get operations, we just use the modules API
    try:
        # Get modules to find the heater-shaker module
        modules_response = requests.get(
            url=f"{self.base_url}/modules", headers=self.headers
        )
        if modules_response.status_code != 200:
            self.log_error(f"Failed to get modules: {modules_response.status_code}")
            return f"Error getting modules: {modules_response.status_code}"
        modules = modules_response.json().get("modules", [])
        heater_shaker_module = next(
            # Fix: default "" guards against a missing moduleModel key
            # (`"x" in None` would raise TypeError)
            (m for m in modules if "heaterShaker" in m.get("moduleModel", "")),
            None,
        )
        if not heater_shaker_module:
            self.log_error("No heater-shaker module found")
            return "No heater-shaker module found"
        status = heater_shaker_module.get("data", {}).get("labwareLatchStatus")
        return status
    except Exception as e:
        # Fix: log message previously said "Error getting RPM" — copy/paste
        # leftover from get_shake_rpm
        self.log_error(f"Error getting latch status: {str(e)}")
        return f"Error: {str(e)}"
def _create_run(self):
    """Create a fresh run on the robot for executing setup commands.

    On success stores the new run id in ``self.run_id``, replays the
    previously configured deck (modules, labware, instruments) into the new
    run, and returns the run id.

    Raises
    ------
    RuntimeError on HTTP failure or a non-201 response.
    """
    self.log_info("Creating a new run for commands")
    try:
        # Fix: removed an unused leftover `import datetime` here.
        # Create a run
        run_response = requests.post(
            url=f"{self.base_url}/runs",
            headers=self.headers,
        )
        if run_response.status_code != 201:
            self.log_error(f"Failed to create run: {run_response.status_code}")
            self.log_error(f"Response: {run_response.text}")
            raise RuntimeError(f"Failed to create run: {run_response.text}")
        self.run_id = run_response.json()["data"]["id"]
        self.log_debug(f"Created run: {self.run_id}")
        # Reload previously configured labware, instruments, and modules
        self._reload_deck_configuration()
        return self.run_id
    except requests.exceptions.RequestException as e:
        self.log_error(f"Error creating run: {str(e)}")
        raise RuntimeError(f"Error creating run: {str(e)}")

def _reload_deck_configuration(self):
    """Reload the deck configuration (modules, labware, instruments) from
    persistent storage into the current run.

    Robot-side entity IDs change with every run, so each module, labware,
    and instrument is re-loaded to obtain new IDs, and the available-tips
    pool is remapped (old tiprack UUID -> slot -> new tiprack UUID) so tip
    bookkeeping survives the run change.

    Returns
    -------
    bool: True on success; False on failure (original config is restored).
    """
    self.log_info("Reloading previously configured deck setup")
    # Store original configuration for recovery if needed
    original_modules = self.config["loaded_modules"].copy()
    original_labware = self.config["loaded_labware"].copy()
    original_instruments = self.config["loaded_instruments"].copy()

    # Record where each instrument's tipracks lived, by slot, since slots
    # are stable across runs while labware UUIDs are not.
    old_uuid_to_slot = {}
    tiprack_slots = {}
    for (mount, instrument) in original_instruments.items():
        tiprack_slots[mount] = [self._slot_by_labware_uuid(uuid) for uuid in instrument['tip_racks']]
        old_uuid_to_slot.update({uuid: self._slot_by_labware_uuid(uuid) for uuid in instrument['tip_racks']})

    # Clear current state for reloading
    self.config["loaded_modules"] = {}
    self.config["loaded_labware"] = {}
    self.config["loaded_instruments"] = {}

    try:
        # Step 1: Load modules first (labware may sit on top of them)
        self.log_info("Reloading modules")
        for slot, (_, module_name) in original_modules.items():
            try:
                self.log_info(f"Reloading module {module_name} in slot {slot}")
                self.load_module(module_name, slot)
                # New module ID will be stored in config["loaded_modules"]
            except Exception as e:
                self.log_error(f"Error reloading module {module_name} in slot {slot}: {str(e)}")
                raise

        # Step 2: Load labware
        self.log_info("Reloading labware")
        for slot, (_, labware_name, labware_data) in original_labware.items():
            # Check if this labware is on a module
            module_id = None
            if str(slot) in self.config["loaded_modules"]:
                module_id = self.config["loaded_modules"][str(slot)][0]  # Get new module ID
            try:
                self.log_info(f"Reloading labware {labware_name} in slot {slot}")
                self.load_labware(labware_name, slot, module=module_id)
                # New labware ID will be stored in config["loaded_labware"]
            except Exception as e:
                self.log_error(f"Error reloading labware {labware_name} in slot {slot}: {str(e)}")
                raise

        # Step 3: Load instruments, re-attaching tipracks by slot
        self.log_info("Reloading instruments")
        for mount, instrument_data in original_instruments.items():
            instrument_name = instrument_data['name']
            try:
                self.log_info(f"Reloading instrument {instrument_name} on {mount} mount")
                self.load_instrument(instrument_name, mount, tiprack_slots[mount], reload=True)
                # New instrument ID will be stored in config["loaded_instruments"]
            except Exception as e:
                self.log_error(f"Error reloading instrument {instrument_name} on {mount} mount: {str(e)}")
                raise

        self.log_info("Deck configuration successfully reloaded")

        # Update tiprack lists
        # Build slot->new_uuid mapping from new loaded_instruments
        slot_to_new_tiprack_uuid = {}
        for instrument in self.config["loaded_instruments"].values():
            for new_uuid in instrument.get('tip_racks', []):
                slot = self._slot_by_labware_uuid(new_uuid)
                slot_to_new_tiprack_uuid[slot] = new_uuid

        # Remap available tips: old UUID -> slot -> new UUID, keeping the
        # per-well availability of each tiprack intact.
        old_available_tips = self.config.get("available_tips", {})
        new_available_tips = {}
        for mount in self.config["loaded_instruments"].keys():
            new_available_tips[mount] = []
            for tiprack_uuid, well in old_available_tips.get(mount, []):
                slot = old_uuid_to_slot.get(tiprack_uuid)
                new_uuid = slot_to_new_tiprack_uuid.get(slot)
                if new_uuid is not None:
                    new_available_tips[mount].append((new_uuid, well))
            self.log_info(f"Remapped {len(new_available_tips[mount])} available tips for {mount} mount after reload.")
        self.config["available_tips"] = new_available_tips

        return True
    except Exception as e:
        self.log_error(f"Failed to reload deck configuration: {str(e)}")
        # Restore original configuration in config
        self.config["loaded_modules"] = original_modules
        self.config["loaded_labware"] = original_labware
        self.config["loaded_instruments"] = original_instruments
        return False

def _ensure_run_exists(self):
    """Ensure a usable run exists, creating a new one if needed.

    A run is recreated when none exists, the robot no longer knows it, or
    it has reached a terminal state. Returns the valid run id.
    """
    if not hasattr(self, "run_id") or not self.run_id:
        return self._create_run()
    # Check if the run is still valid
    try:
        response = requests.get(
            url=f"{self.base_url}/runs/{self.run_id}", headers=self.headers
        )
        if response.status_code != 200:
            # Run doesn't exist, create a new one
            return self._create_run()
        # Check run state
        run_data = response.json()["data"]
        current_state = run_data.get("status")
        if current_state in ["failed", "error", "succeeded", "stopped"]:
            # Run is in a terminal state, create a new one
            return self._create_run()
        return self.run_id
    except requests.exceptions.RequestException:
        # Error checking run, create a new one
        return self._create_run()
def get_tip(self, mount):
    """Pop and return the next available (tiprack_id, well) tip for ``mount``.

    Raises KeyError for an unknown mount and IndexError when the pool for
    that mount is empty.
    """
    next_tip = self.config["available_tips"][mount].pop(0)
    # Persist the in-place mutation of the tip pool
    self.config._update_history()
    return next_tip
def get_tip_status(self, mount=None):
    """Report tip availability for one mount, or for all mounts when
    ``mount`` is falsy (one line per mount)."""
    if not mount:
        # Aggregate the per-mount reports
        return "\n".join(
            self.get_tip_status(m) for m in self.config["available_tips"]
        )
    if mount not in self.config["available_tips"]:
        return f"No tipracks loaded for {mount} mount"
    if mount not in self.config["loaded_instruments"]:
        return f"No instrument defined for {mount} mount"
    # Capacity is one full rack of wells per attached tiprack
    racks = self.config["loaded_instruments"][mount]["tip_racks"]
    total_tips = len(TIPRACK_WELLS) * len(racks)
    available_tips = len(self.config["available_tips"][mount])
    return f"{available_tips}/{total_tips} tips available on {mount} mount"
@Driver.unqueued(render_hint='html')
def visualize_deck(self, mode='full', **kwargs):
    """
    Generate HTML visualization of OT-2 deck layout with detailed or compact well layouts.
    mode: 'full' for detailed, 'simple' for compact
    Returns:
        str: HTML string for deck visualization
    """
    import json

    # Physical OT-2 deck arrangement: 4 rows of 3 slots, trash top-right
    slot_layout = [
        [10, 11, "Trash"],
        [7, 8, 9],
        [4, 5, 6],
        [1, 2, 3]
    ]

    # Shared SVG generator: renders the wells of one labware as an inline
    # SVG; tiprack wells are colored by availability (green/red).
    def generate_well_svg(labware_data, size=90, labware_uuid=None, compact=False):
        if not labware_data:
            return ""
        definition = labware_data.get('definition', {})
        wells = definition.get('wells', {})
        if not wells:
            return ""
        labware_type = definition.get('metadata', {}).get('displayCategory', 'default')
        # Tipracks are detected either by display category or by load name
        is_tiprack = labware_type == 'tipRack' or 'tiprack' in definition.get('parameters', {}).get('loadName', '').lower()

        # Collect the well names of still-available tips in this tiprack
        # (from every mount's pool)
        available_tips_for_labware = set()
        if is_tiprack and labware_uuid:
            for mount_tips in self.config["available_tips"].values():
                for tip_labware_uuid, well_name in mount_tips:
                    if tip_labware_uuid == labware_uuid:
                        available_tips_for_labware.add(well_name)

        well_count = len(wells)

        # Compact grid: schematic rows/cols layout, ignoring real geometry
        if compact:
            if well_count <= 8:
                cols = well_count; rows = 1
            elif well_count <= 24:
                cols = 6; rows = (well_count + 5) // 6
            elif well_count <= 96:
                cols = 12; rows = 8
            else:
                cols = 12; rows = (well_count + 11) // 12
            cell_width = size / max(cols, 6)
            cell_height = size / max(rows, 4)
            colors = {
                'tipRack_available': '#4caf50',
                'tipRack_used': '#f44336',
                'tipRack': '#ffa726',
                'wellPlate': '#42a5f5',
                'reservoir': '#66bb6a',
                'default': '#90a4ae'
            }
            svg_elements = []
            well_names = list(wells.keys())
            # Fill column-major (row = i % rows), matching the A1..H1, A2..
            # ordering of tiprack well definitions
            for i, well_name in enumerate(well_names[:min(well_count, rows * cols)]):
                row = i % rows
                col = i // rows
                x = col * cell_width + cell_width/4
                y = row * cell_height + cell_height/4
                if is_tiprack and labware_uuid:
                    if well_name in available_tips_for_labware:
                        color = colors['tipRack_available']; status = "Available"
                    else:
                        color = colors['tipRack_used']; status = "Used"
                    tooltip = f"{well_name} - {status}"
                else:
                    color = colors.get(labware_type, '#90a4ae')
                    tooltip = well_name
                svg_elements.append(
                    f'<circle cx="{x:.1f}" cy="{y:.1f}" r="{min(cell_width, cell_height)/3:.1f}" '
                    f'fill="{color}" stroke="#333" stroke-width="0.3">'
                    f'<title>{tooltip}</title></circle>'
                )
            return f'<svg width="{size}" height="{size}" style="border: 1px solid #ddd; border-radius: 3px;">{"".join(svg_elements)}</svg>'
        # Full SVG: wells drawn at their true x/y positions, scaled from the
        # labware footprint (defaults are the SBS-standard 127.76 x 85.48 mm)
        else:
            labware_width = definition.get('dimensions', {}).get('xDimension', 127.76)
            labware_height = definition.get('dimensions', {}).get('yDimension', 85.48)
            scale_x = size / labware_width
            scale_y = (size * 0.75) / labware_height
            svg_elements = []
            well_colors = {
                'tipRack_available': '#4caf50',
                'tipRack_used': '#f44336',
                'tipRack_default': '#ffa726',
                'wellPlate': '#42a5f5',
                'reservoir': '#66bb6a',
                'default': '#90a4ae'
            }
            for well_name, well_info in wells.items():
                x = well_info.get('x', 0) * scale_x
                # Flip y: labware coordinates are bottom-up, SVG is top-down
                y = (labware_height - well_info.get('y', 0)) * scale_y
                shape = well_info.get('shape', 'circular')
                if is_tiprack and labware_uuid:
                    if well_name in available_tips_for_labware:
                        well_color = well_colors['tipRack_available']; tip_status = "Available"
                    else:
                        well_color = well_colors['tipRack_used']; tip_status = "Used"
                    tooltip = f"{well_name} - {tip_status}"
                else:
                    well_color = well_colors.get(labware_type, well_colors['default'])
                    tooltip = well_name
                if shape == 'circular':
                    diameter = well_info.get('diameter', 5) * min(scale_x, scale_y)
                    radius = diameter / 2
                    svg_elements.append(
                        f'<circle cx="{x:.1f}" cy="{y:.1f}" r="{radius:.1f}" '
                        f'fill="{well_color}" stroke="#333" stroke-width="0.5" opacity="0.8">'
                        f'<title>{tooltip}</title></circle>'
                    )
                elif shape == 'rectangular':
                    well_width = well_info.get('xDimension', 8) * scale_x
                    well_height = well_info.get('yDimension', 8) * scale_y
                    rect_x = x - well_width/2
                    rect_y = y - well_height/2
                    svg_elements.append(
                        f'<rect x="{rect_x:.1f}" y="{rect_y:.1f}" '
                        f'width="{well_width:.1f}" height="{well_height:.1f}" '
                        f'fill="{well_color}" stroke="#333" stroke-width="0.5" opacity="0.8">'
                        f'<title>{tooltip}</title></rect>'
                    )
            if svg_elements:
                return f'<svg width="{size}" height="{int(size*0.75)}" style="margin: 5px 0;">{"".join(svg_elements)}</svg>'
            return ""

    # Slot info helper: builds the display dict (name, color, svg, buttons
    # metadata) for one deck slot
    def get_slot_info(slot_num, compact):
        if slot_num == "Trash":
            return {"name": "Trash", "type": "trash", "color": "#ffcdd2", "svg": ""}
        slot_str = str(slot_num)
        info = {"name": "Empty", "type": "empty", "color": "#f5f5f5", "svg": ""}
        has_labware = slot_str in self.config["loaded_labware"]
        has_module = slot_str in self.config["loaded_modules"]
        if has_labware:
            labware_id, labware_type, labware_data = self.config["loaded_labware"][slot_str]
            definition = labware_data.get('definition', {})
            display_name = definition.get('metadata', {}).get('displayName', labware_type)
            is_tiprack = (
                'tiprack' in labware_type.lower()
                or definition.get('metadata', {}).get('displayCategory') == 'tipRack'
            )
            wells = list(definition.get('wells', {}).keys())
            # sort wells in row-major order (A1, A2, B1, B2, ...)
            def _well_key(w):
                import re
                m = re.match(r"([A-Za-z]+)(\d+)", w)
                if m:
                    row = m.group(1)
                    col = int(m.group(2))
                    return (row, col)
                # Fallback for non-standard well names
                return (w, 0)
            wells = sorted(wells, key=_well_key)
            # Which mounts use this tiprack (for the per-mount Reset button)
            mounts = []
            if is_tiprack:
                for m, d in self.config['loaded_instruments'].items():
                    if labware_id in d.get('tip_racks', []):
                        mounts.append(m)
            info.update({
                "name": display_name[:20] + ("..." if len(display_name) > 20 else ""),
                "type": "labware",
                "color": "#bbdefb",
                "svg": generate_well_svg(labware_data, size=50 if compact else 90, labware_uuid=labware_id, compact=compact)
            })
            if is_tiprack:
                info['tiprack'] = True
                info['mounts'] = mounts
                info['color'] = '#fff3e0'
                info['target_count'] = len(wells)
                # Prep-target identifiers are "<slot><well>", e.g. "1A1"
                info['targets'] = ','.join([f"{slot_str}{w}" for w in wells])
        if has_module:
            module_id, module_type = self.config["loaded_modules"][slot_str]
            module_name = module_type.replace('ModuleV1', '').replace('Module', ' Mod')
            if has_labware:
                # Module with labware on top: show both names stacked
                info["name"] = f"{module_name}<br><small>{info['name']}</small>"
                info["color"] = "#c8e6c9"
            else:
                info.update({
                    "name": module_name,
                    "type": "module_only",  # distinguish module with no labware
                    "color": "#e1bee7"
                })
        return info

    # Build display info for every slot, plus dashboard interactivity:
    # click-to-load on empty slots, tiprack Reset / Manage Targets buttons
    slot_infos = {}
    for row in slot_layout:
        for slot in row:
            info = get_slot_info(slot, compact=(mode == 'simple'))
            slot_label = "T" if slot == "Trash" else str(slot)
            info["slot_label"] = slot_label
            info["click_attr"] = ""
            if info["type"] in ["empty", "module_only"]:
                # Empty (or module-only) slots open the labware picker
                info["click_attr"] = f"onclick=\"showLabwareOptions('{slot}')\" style=\"cursor:pointer;\""
            info["buttons"] = ''.join([
                f"<button style='margin-top:4px;font-size:10px;' onclick=\"resetTipracks('{m}')\">Reset</button>"
                for m in info.get('mounts', [])
            ])
            if info.get('target_count', 0) > 10:
                target_str = info.get('targets', '')
                slot_id = str(slot)
                info["buttons"] += (
                    f"<button style='margin-top:4px;font-size:10px;' "
                    f"onclick=\"openPrepTargetDialog('{slot_id}','{target_str}')\">"
                    "Manage Targets</button>"
                )
            slot_infos[str(slot)] = info

    # Render through the packaged Jinja2 template
    template_path = files('AFL.automation.driver_templates').joinpath('ot2_deck.html')
    template = Template(template_path.read_text())
    html = template.render(
        slot_layout=slot_layout,
        slot_infos=slot_infos,
        loaded_instruments=self.config.get('loaded_instruments', {}),
        mode=mode,
        labware_choices_json=json.dumps(LABWARE_OPTIONS)
    )
    return html
if __name__ == "__main__":
    # Launch this driver through the shared AFL APIServer launcher when the
    # module is executed as a script.
    from AFL.automation.shared.launcher import *