Source code for AFL.automation.prepare.OT2HTTPDriver

import requests
import time

from math import ceil
from AFL.automation.APIServer.Driver import Driver
from AFL.automation.shared.utilities import listify

# Well names of a 96-position tip rack (rows A-H, columns 1-12), generated
# column by column: A1, B1, ..., H1, A2, ..., H12.  The driver seeds its
# per-mount tip-tracking lists from this sequence.
TIPRACK_WELLS = [f"{row}{col}" for col in range(1, 13) for row in "ABCDEFGH"]


[docs] class OT2HTTPDriver(Driver): defaults = {} defaults["robot_ip"] = "127.0.0.1" # Default to localhost, should be overridden defaults["robot_port"] = "31950" # Default Opentrons HTTP API port
def __init__(self, overrides=None):
    """Set up driver state and connect to the robot.

    Args:
        overrides: Optional configuration overrides, passed through to
            ``Driver.__init__``.

    Raises:
        ConnectionError: If the robot health check performed by
            ``_initialize_robot`` fails.
        RuntimeError: If the attached pipettes cannot be queried.
    """
    # _log() inspects self.app, so it has to exist before anything logs.
    self.app = None
    Driver.__init__(
        self,
        name="OT2_HTTP_Driver",
        defaults=self.gather_defaults(),
        overrides=overrides,
    )
    self.name = "OT2_HTTP_Driver"

    # Session / protocol bookkeeping
    self.session_id = None
    self.protocol_id = None

    # Transfer limits; populated by _update_pipettes()
    self.max_transfer = None
    self.min_transfer = None
    # Read by _split_up_transfers().  Defined here so they exist even when
    # _update_pipette_ranges() has not run yet.
    self.min_largest_pipette = None
    self.max_smallest_pipette = None

    # Deck / instrument state
    self.prep_targets = []
    self.has_tip = False
    self.last_pipette = None
    self.modules = {}
    self.loaded_labware = {}
    self.loaded_instruments = {}
    self.loaded_modules = {}
    self.pipette_info = {}

    # Tip tracking.  Format: {mount: [(tiprack_id, well_name), ...]}
    self.available_tips = {}

    # Base URL and headers for HTTP requests
    self.base_url = f"http://{self.config['robot_ip']}:{self.config['robot_port']}"
    self.headers = {"Opentrons-Version": "2"}

    # Contact the robot last, once every attribute above exists.
    self._initialize_robot()
def _log(self, level, message):
    """Log *message* at *level* through the app logger, else print it.

    The message goes to ``self.app.logger.<level>`` when that callable
    exists.  In every other case (no app, no logger, or a logger without
    the requested level method) it is printed as ``[LEVEL] message`` so
    that nothing is silently dropped.

    Args:
        level: Name of the logger method to use, e.g. ``"info"``.
        message: Text to log.
    """
    app = self.app
    logger = getattr(app, "logger", None) if app is not None else None
    log_method = getattr(logger, level, None) if logger is not None else None
    if log_method:
        log_method(message)
    else:
        print(f"[{level.upper()}] {message}")
def log_info(self, message):
    """Forward *message* to ``_log`` at the ``info`` level."""
    level = "info"
    self._log(level, message)
def log_error(self, message):
    """Forward *message* to ``_log`` at the ``error`` level."""
    level = "error"
    self._log(level, message)
def log_debug(self, message):
    """Forward *message* to ``_log`` at the ``debug`` level."""
    level = "debug"
    self._log(level, message)
def log_warning(self, message):
    """Forward *message* to ``_log`` at the ``warning`` level."""
    level = "warning"
    self._log(level, message)
def _initialize_robot(self):
    """Check that the robot is reachable and refresh pipette information.

    Raises:
        ConnectionError: If ``/health`` does not answer with HTTP 200 or the
            request itself fails.
        RuntimeError: Propagated from ``_update_pipettes``.
    """
    try:
        # Check if the robot is reachable
        response = requests.get(url=f"{self.base_url}/health", headers=self.headers)
        if response.status_code != 200:
            raise ConnectionError(f"Failed to connect to robot at {self.base_url}")

        # Get attached pipettes
        self._update_pipettes()
    except requests.exceptions.RequestException as e:
        self.log_error(f"Error connecting to robot: {str(e)}")
        raise ConnectionError(
            f"Error connecting to robot at {self.base_url}: {str(e)}"
        ) from e


def _update_pipettes(self):
    """Query ``/instruments`` and rebuild ``pipette_info`` and transfer limits.

    ``pipette_info`` gets one entry per pipette reported by the robot.
    ``min_transfer`` / ``max_transfer`` are recomputed from scratch using
    only pipettes that have an entry in ``loaded_instruments``; a pipette
    whose min or max volume is not reported does not contribute to the
    corresponding limit.

    Raises:
        RuntimeError: If the request fails, returns a non-200 status, or the
            response cannot be processed.
    """
    try:
        if self.app is not None:
            self.log_info("Fetching pipette information from robot")

        # Get basic pipette information
        response = requests.get(
            url=f"{self.base_url}/instruments", headers=self.headers
        )
        if response.status_code != 200:
            raise RuntimeError(f"Failed to get pipettes: {response.text}")

        pipettes_data = response.json()["data"]
        self.pipette_info = {}

        # Limits are rebuilt from the pipettes loaded in this run
        self.min_transfer = None
        self.max_transfer = None

        for pipette in pipettes_data:
            mount = pipette["mount"]
            # The id assigned by loadPipette in this run, if any
            pipette_id = self.loaded_instruments.get(mount, {}).get("pipette_id")
            details = pipette.get("data", {})

            # Store basic pipette info
            self.pipette_info[mount] = {
                "id": pipette_id,
                "name": pipette["instrumentName"],
                "model": pipette["instrumentModel"],
                "serial": pipette["serialNumber"],
                "mount": mount,
                "min_volume": details.get("min_volume", None),
                "max_volume": details.get("max_volume", None),
                "aspirate_flow_rate": details.get("aspirateFlowRate", {}).get(
                    "value", 150
                ),
                "dispense_flow_rate": details.get("dispenseFlowRate", {}).get(
                    "value", 150
                ),
                "channels": details.get("channels", 1),
            }

            if pipette_id is None:
                # Attached but not loaded into the run: ignore for limits
                continue

            min_volume = self.pipette_info[mount]["min_volume"]
            max_volume = self.pipette_info[mount]["max_volume"]

            # A missing volume must neither become the limit nor be compared
            # against a number (TypeError), so skip it.
            if min_volume is not None and (
                self.min_transfer is None or self.min_transfer > min_volume
            ):
                self.min_transfer = min_volume
                if self.app is not None:
                    self.log_info(
                        f"Setting minimum transfer to {self.min_transfer}"
                    )

            if max_volume is not None and (
                self.max_transfer is None or self.max_transfer < max_volume
            ):
                self.max_transfer = max_volume
                if self.app is not None:
                    self.log_info(
                        f"Setting maximum transfer to {self.max_transfer}"
                    )

        if self.app is not None:
            self.log_info(f"Pipette information updated: {self.pipette_info}")

    except Exception as e:
        raise RuntimeError(f"Error getting pipettes: {str(e)}") from e
def reset_prep_targets(self):
    """Replace the queue of prep targets with a new empty list."""
    self.prep_targets = list()
def add_prep_targets(self, targets, reset=False):
    """Append *targets* to the prep-target queue.

    Args:
        targets: Iterable of targets to append, in order.
        reset: When true, the queue is emptied via ``reset_prep_targets``
            before the new targets are appended.
    """
    if reset:
        self.reset_prep_targets()
    queue = self.prep_targets
    queue.extend(targets)
def get_prep_target(self):
    """Remove and return the first queued prep target.

    Raises:
        IndexError: If the queue is empty.
    """
    next_target = self.prep_targets.pop(0)
    return next_target
def status(self):
    """Return a list of human-readable status lines.

    The list covers, in order: the prep-target queue, the tip status from
    ``get_tip_status``, the session state (only when a session id is set),
    the model of each known pipette, and the labware loaded in each slot.

    Returns:
        list: Status strings.
    """
    status = []

    if self.prep_targets:
        status.append(f"Next prep target: {self.prep_targets[0]}")
        status.append(f"Remaining prep targets: {len(self.prep_targets)}")
    else:
        status.append("No prep targets loaded")

    status.append(self.get_tip_status())

    # Get current session status if available
    if self.session_id:
        try:
            response = requests.get(
                url=f"{self.base_url}/sessions/{self.session_id}",
                headers=self.headers,
            )
            if response.status_code == 200:
                session_data = response.json().get("data", {})
                current_state = session_data.get("details", {}).get(
                    "currentState", "unknown"
                )
                status.append(f"Session state: {current_state}")
        except requests.exceptions.RequestException:
            status.append("Unable to get session status")

    # Get pipette information
    for mount, pipette in self.pipette_info.items():
        if pipette:
            status.append(
                f"Pipette on {mount} mount: {pipette.get('model', 'unknown')}"
            )

    # Get loaded labware information.  Entries are
    # (labware_id, name, load_result) tuples, so index instead of unpacking
    # a fixed number of fields.
    for slot, labware_info in self.loaded_labware.items():
        name = labware_info[1]
        status.append(f"Labware in slot {slot}: {name}")

    return status
@Driver.quickbar(
    qb={
        "button_text": "Refill Tipracks",
        "params": {
            "mount": {
                "label": "Which Pipet left/right/both",
                "type": "text",
                "default": "both",
            },
        },
    }
)
def reset_tipracks(self, mount="both"):
    """Mark every tip as available again for the given mount(s).

    Args:
        mount: A mount name, or ``"both"`` to refill every mount that has a
            loaded instrument.  A mount without a loaded instrument is
            skipped.
    """
    self.log_info(f"Resetting tipracks for {mount} mount")

    if mount == "both":
        mounts_to_reset = list(self.loaded_instruments.keys())
    else:
        mounts_to_reset = [mount]

    for m in mounts_to_reset:
        if m not in self.loaded_instruments:
            continue
        # Rebuild the full tip list: every well of every rack on this mount
        racks = self.loaded_instruments[m]["tip_racks"]
        self.available_tips[m] = [
            (tiprack, well) for tiprack in racks for well in TIPRACK_WELLS
        ]
        self.log_info(f"Reset {len(self.available_tips[m])} tips for {m} mount")

    # Reset tip status
    self.has_tip = False
def reset(self):
    """Forget all run state and re-check the robot connection.

    Any active session and uploaded protocol are deleted on the robot on a
    best-effort basis (a failed request is logged, not raised).  The
    driver's record of labware, instruments, modules and available tips is
    cleared, then ``_initialize_robot`` runs again.

    Raises:
        ConnectionError: If the robot cannot be reached afterwards.
        RuntimeError: If the attached pipettes cannot be queried.
    """
    self.log_info("Resetting the protocol context")

    # Delete any active session
    if self.session_id:
        try:
            requests.delete(
                url=f"{self.base_url}/sessions/{self.session_id}",
                headers=self.headers,
            )
        except requests.exceptions.RequestException as e:
            self.log_error(f"Error deleting session: {str(e)}")

    # Delete any uploaded protocol
    if self.protocol_id:
        try:
            requests.delete(
                url=f"{self.base_url}/protocols/{self.protocol_id}",
                headers=self.headers,
            )
        except requests.exceptions.RequestException as e:
            self.log_error(f"Error deleting protocol: {str(e)}")

    # Reset state variables
    self.session_id = None
    self.protocol_id = None
    self.loaded_labware = {}
    self.loaded_instruments = {}
    self.loaded_modules = {}
    self.modules = {}
    # Tip lists hold ids of the tip racks forgotten above; keeping them
    # would let pickUpTip target labware the driver no longer tracks.
    self.available_tips = {}
    self.has_tip = False
    self.last_pipette = None

    # Re-initialize robot connection
    self._initialize_robot()
@Driver.quickbar(qb={"button_text": "Home"})
def home(self, **kwargs):
    """Home all robot axes through the ``/robot/home`` endpoint.

    The request is posted directly to the robot with target ``"robot"``;
    no run id is involved.

    Returns:
        bool: ``True`` when the robot answers with HTTP 200.

    Raises:
        RuntimeError: If the request fails or the robot answers with any
            other status code.
    """
    self.log_info("Homing the robot's axes")

    endpoint = f"{self.base_url}/robot/home"
    payload = {
        "target": "robot",  # Home the entire robot
    }

    try:
        response = requests.post(url=endpoint, headers=self.headers, json=payload)
    except requests.exceptions.RequestException as e:
        self.log_error(f"Error during homing: {str(e)}")
        raise RuntimeError(f"Error during homing: {str(e)}")

    if response.status_code != 200:
        self.log_error(f"Failed to home robot: {response.status_code}")
        self.log_error(f"Response: {response.text}")
        raise RuntimeError(f"Failed to home robot: {response.text}")

    self.log_info("Robot homing completed successfully")
    return True
def parse_well(self, loc):
    """Split a location string into its slot and well parts.

    The split happens at the first alphabetic character: ``"1A1"`` becomes
    ``("1", "A1")`` and ``"10B12"`` becomes ``("10", "B12")``.  A string
    without any alphabetic character is treated as a bare slot, so ``"12"``
    becomes ``("12", "")``.

    Args:
        loc: Location string in "slotwell" form.

    Returns:
        tuple: ``(slot, well)`` as strings.
    """
    # Fall back to len(loc) when no letter is present, so the whole string
    # is the slot instead of being cut at its last character.
    split_at = next(
        (index for index, char in enumerate(loc) if char.isalpha()),
        len(loc),
    )
    return loc[:split_at], loc[split_at:]
def get_wells(self, locs):
    """Convert location strings to well objects and validate every one.

    Args:
        locs: Single location string or list of location strings in format
            "slotwell" (e.g. "1A1").

    Returns:
        list: One ``{"labwareId": ..., "wellName": ...}`` dict per location,
        in input order.  Empty input gives an empty list.

    Raises:
        ValueError: If a slot has no labware, the stored labware record is
            malformed, or the well name is not in the labware definition.
    """
    self.log_debug(f"Converting locations to well objects: {locs}")

    wells = []
    for loc in listify(locs):
        slot, well = self.parse_well(loc)

        # Get labware info from the slot
        labware_info = self.loaded_labware.get(slot)
        if not labware_info:
            raise ValueError(f"No labware found in slot {slot}")
        # Records are (labware_id, name, load_result); the load result holds
        # the definition needed for the well check below.
        if not isinstance(labware_info, tuple) or len(labware_info) < 3:
            raise ValueError(f"Invalid labware info format in slot {slot}")

        labware_id = labware_info[0]
        definition = labware_info[2]["definition"]

        # Validate each location, not only the last one, and raise
        # explicitly so the check also runs under ``python -O``.
        if well not in definition["wells"]:
            raise ValueError(
                f"Well {well} is not a valid well for slot {slot}, "
                f"{definition['metadata']['displayName']}"
            )

        wells.append({"labwareId": labware_id, "wellName": well})

    self.log_debug(f"Created well objects: {wells}")
    return wells
def _check_cmd_success(self, response):
    """Raise if a command request was rejected or the command failed.

    Args:
        response: HTTP response returned by a command POST.

    Raises:
        RuntimeError: If the HTTP status is not 201, or the response body
            reports the command status ``'failed'``.
    """
    accepted = response.status_code == 201
    if not accepted:
        self.log_error(
            f"Failed to execute command : {response.status_code}"
        )
        self.log_error(f"Response: {response.text}")
        raise RuntimeError(
            f"Failed to execute command: {response.text}"
        )

    command_status = response.json()['data']['status']
    if command_status == 'failed':
        self.log_error(
            f"Command returned error : {response.status_code}"
        )
        self.log_error(f"Response: {response.text}")
        raise RuntimeError(
            f"Command returned error: {response.text}"
        )
def load_labware(self, name, slot, module=None, **kwargs):
    """Load labware (containers, tipracks) into the run using the HTTP API.

    Labware already recorded for the slot is first moved off-deck with a
    ``moveLabware`` command.  When a module is recorded for the slot, the
    labware is loaded onto that module instead of the bare slot.

    Args:
        name: Labware load name.  ``"namespace/load_name"`` selects a
            namespace other than the default ``"opentrons"``.
        slot: Deck slot.  Recorded under ``str(slot)`` so that integer and
            string slots refer to the same entry.
        module: Optional module id, added to the command as ``moduleId``.
        **kwargs: Accepted and ignored.

    Returns:
        The labware id reported by the robot.

    Raises:
        RuntimeError: If a command fails, the request fails, or no labware
            id can be found in the response.
    """
    self.log_debug(f"Loading labware '{name}' into slot '{slot}'")

    # Ensure we have a valid run
    run_id = self._ensure_run_exists()
    commands_url = f"{self.base_url}/runs/{run_id}/commands"

    # get_wells() and the module bookkeeping look slots up as strings, so
    # store under the string form even when the caller passes an int.
    slot_key = str(slot)

    try:
        # Check if there's existing labware in the slot
        existing = self.loaded_labware.get(slot_key)
        if existing is not None:
            self.log_info(
                f"Found existing labware in slot {slot}, moving it off-deck first"
            )
            move_command = {
                "data": {
                    "commandType": "moveLabware",
                    "params": {
                        "labwareId": existing[0],
                        "newLocation": "offDeck",
                        # Allow user to manually move the labware
                        "strategy": "manualMoveWithoutPause",
                    },
                    "intent": "setup",
                }
            }
            move_response = requests.post(
                url=commands_url,
                headers=self.headers,
                params={"waitUntilComplete": True},
                json=move_command,
            )
            self._check_cmd_success(move_response)

            # Remove from our tracking
            del self.loaded_labware[slot_key]

        if slot_key in self.loaded_modules:
            # we need to load into a module, not a slot
            location = {"moduleId": self.loaded_modules[slot_key][0]}
        else:
            location = {"slotName": slot_key}

        # Determine namespace and version.  For custom labware, the name
        # may include namespace info (e.g. "custom/my_plate").
        namespace = "opentrons"  # default namespace
        version = 1  # default version
        if "/" in name:
            namespace, name = name.split("/", 1)

        # Prepare the loadLabware command
        command_dict = {
            "data": {
                "commandType": "loadLabware",
                "params": {
                    "location": location,
                    "loadName": name,
                    "namespace": namespace,
                    "version": version,
                },
                "intent": "setup",
            }
        }

        # If this is a module, we need to specify the moduleId
        if module:
            command_dict["data"]["params"]["moduleId"] = module

        # Execute the command
        response = requests.post(
            url=commands_url,
            headers=self.headers,
            params={"waitUntilComplete": True},
            json=command_dict,
        )
        self._check_cmd_success(response)

        response_data = response.json()
        self.log_debug(f"Load labware response: {response_data}")

        # Handle different response structures that might occur
        try:
            if "data" in response_data and "result" in response_data["data"]:
                labware_id = response_data["data"]["result"]["labwareId"]
            elif "data" in response_data and "labwareId" in response_data["data"]:
                labware_id = response_data["data"]["labwareId"]
            elif "data" in response_data and "id" in response_data["data"]:
                labware_id = response_data["data"]["id"]
            else:
                # Try to find labware ID in any structure
                self.log_warning(f"Unexpected response structure: {response_data}")
                for key, value in response_data.items():
                    if isinstance(value, dict) and "labwareId" in value:
                        labware_id = value["labwareId"]
                        break
                else:
                    raise KeyError("Could not find labwareId in response")
        except KeyError as e:
            self.log_error(f"Error extracting labware ID from response: {str(e)}")
            self.log_error(f"Response data: {response_data}")
            raise RuntimeError(
                f"Failed to extract labware ID from response: {str(e)}"
            ) from e

        # Keep the load result too: it carries the labware definition that
        # get_wells() reads.  Use .get() so the fallback branches above are
        # not defeated by a hard ["result"] lookup.
        result = response_data.get("data", {}).get("result", {})

        # Store the labware information
        self.loaded_labware[slot_key] = (labware_id, name, result)

        # If this is a module, store it
        if module:
            self.modules[slot] = module

        self.log_info(
            f"Successfully loaded labware '{name}' in slot {slot} with ID {labware_id}"
        )
        return labware_id

    except (requests.exceptions.RequestException, KeyError) as e:
        self.log_error(f"Error loading labware: {str(e)}")
        raise RuntimeError(f"Error loading labware: {str(e)}") from e
def load_module(self, name, slot, **kwargs):
    """Load a module (heater-shaker, tempdeck) into the run using the HTTP API.

    Args:
        name: Module model string sent as ``model`` in the ``loadModule``
            command.
        slot: Deck slot.  Modules are recorded under ``str(slot)``.
        **kwargs: Accepted and ignored.

    Returns:
        The module id reported by the robot.

    Raises:
        RuntimeError: If a module is already recorded for the slot, the
            command or request fails, or no module id can be found in the
            response.
    """
    self.log_debug(f"Loading module '{name}' into slot '{slot}'")

    # Ensure we have a valid run
    run_id = self._ensure_run_exists()

    # Modules are stored under str(slot), so the duplicate check and the
    # error message must use the same key.
    slot_key = str(slot)
    if slot_key in self.loaded_modules:
        # todo: check if same module
        raise RuntimeError(
            f"Module already loaded in slot {slot}: "
            f"{self.loaded_modules[slot_key]}. Overwrite not supported."
        )

    try:
        # Prepare the loadModule command
        command_dict = {
            "data": {
                "commandType": "loadModule",
                "params": {
                    "location": {"slotName": slot_key},
                    "model": name,
                },
                "intent": "setup",
            }
        }

        # Execute the command
        response = requests.post(
            url=f"{self.base_url}/runs/{run_id}/commands",
            headers=self.headers,
            params={"waitUntilComplete": True},
            json=command_dict,
        )
        self._check_cmd_success(response)

        response_data = response.json()
        self.log_debug(f"Load module response: {response_data}")

        # Handle different response structures that might occur
        try:
            if "data" in response_data and "result" in response_data["data"]:
                module_id = response_data["data"]["result"]["moduleId"]
            elif "data" in response_data and "moduleId" in response_data["data"]:
                module_id = response_data["data"]["moduleId"]
            elif "data" in response_data and "id" in response_data["data"]:
                module_id = response_data["data"]["id"]
            else:
                # Try to find module ID in any structure
                self.log_warning(f"Unexpected response structure: {response_data}")
                for key, value in response_data.items():
                    if isinstance(value, dict) and "moduleId" in value:
                        module_id = value["moduleId"]
                        break
                else:
                    raise KeyError("Could not find moduleId in response")
        except KeyError as e:
            self.log_error(f"Error extracting module ID from response: {str(e)}")
            self.log_error(f"Response data: {response_data}")
            raise RuntimeError(
                f"Failed to extract module ID from response: {str(e)}"
            ) from e

        # Store the module information
        self.loaded_modules[slot_key] = (module_id, name)

        self.log_info(
            f"Successfully loaded module '{name}' in slot {slot} with ID {module_id}"
        )
        return module_id

    except (requests.exceptions.RequestException, KeyError) as e:
        self.log_error(f"Error loading module: {str(e)}")
        raise RuntimeError(f"Error loading module: {str(e)}") from e
def load_instrument(self, name, mount, tip_rack_slots, **kwargs):
    """Load a pipette and record its tip racks using the HTTP API.

    Args:
        name: Pipette name sent as ``pipetteName``.
        mount: Mount the pipette is attached to.
        tip_rack_slots: Slot, or list of slots, holding this pipette's tip
            racks.  Each slot must already have labware loaded.
        **kwargs: Accepted and ignored.

    Returns:
        The pipette id reported by the robot.

    Raises:
        RuntimeError: If a tip rack slot has no labware, the command or
            request fails, or the response lacks the pipette id.
    """
    self.log_debug(
        f"Loading pipette '{name}' on '{mount}' mount with tip_racks in slots {tip_rack_slots}"
    )

    # Ensure we have a valid run
    run_id = self._ensure_run_exists()

    try:
        # Resolve the tip racks once.  listify() makes a single slot
        # behave like a one-element list; each slot is looked up as given
        # and as a string, since labware may be keyed either way.
        tip_racks = []
        for slot in listify(tip_rack_slots):
            labware_info = self.loaded_labware.get(slot)
            if labware_info is None:
                labware_info = self.loaded_labware.get(str(slot))
            if not (
                labware_info
                and isinstance(labware_info, tuple)
                and len(labware_info) >= 1
            ):
                raise RuntimeError(f"No labware loaded in tip rack slot {slot}")
            tip_racks.append(labware_info[0])

        if not tip_racks:
            self.log_warning(f"No valid tip racks found in slots {tip_rack_slots}")

        # First, load the pipette using the HTTP API
        command_dict = {
            "data": {
                "commandType": "loadPipette",
                "params": {
                    "pipetteName": name,
                    "mount": mount,
                    "tip_racks": tip_racks,
                },
                "intent": "setup",
            }
        }

        # Execute the loadPipette command
        response = requests.post(
            url=f"{self.base_url}/runs/{run_id}/commands",
            headers=self.headers,
            params={"waitUntilComplete": True},
            json=command_dict,
        )
        self._check_cmd_success(response)

        # Get the pipette ID from the response
        response_data = response.json()
        self.log_debug(f"loadPipette response: {response_data}")
        pipette_id = response_data["data"]["result"]["pipetteId"]

        # Register the instrument BEFORE refreshing pipette data:
        # _update_pipettes() only folds pipettes present in
        # loaded_instruments into min_transfer / max_transfer.
        self.loaded_instruments[mount] = {
            "name": name,
            "pipette_id": pipette_id,
            "tip_racks": tip_racks,
        }

        # Initialize available tips for this mount
        self.available_tips[mount] = [
            (tiprack, well) for tiprack in tip_racks for well in TIPRACK_WELLS
        ]

        # Make sure we have the latest pipette information
        self._update_pipettes()

        # Verify that there's actually a pipette in this mount
        if self.pipette_info.get(mount) is None:
            self.log_warning(
                f"No physical pipette detected in {mount} mount, but pipette information stored"
            )
        else:
            # Keep the id from this run on the refreshed record
            self.pipette_info[mount]["id"] = pipette_id

        # Update min/max values for largest and smallest pipettes
        self._update_pipette_ranges()

        self.log_info(
            f"Successfully loaded pipette '{name}' on {mount} mount with ID {pipette_id}"
        )
        return pipette_id

    except (requests.exceptions.RequestException, KeyError) as e:
        self.log_error(f"Error loading pipette: {str(e)}")
        raise RuntimeError(f"Error loading pipette: {str(e)}") from e
def _update_pipette_ranges(self):
    """Recompute ``min_largest_pipette`` and ``max_smallest_pipette``.

    "Largest" and "smallest" are decided by ``max_volume``.  Afterwards
    ``min_largest_pipette`` is the minimum volume of the largest pipette and
    ``max_smallest_pipette`` is the maximum volume of the smallest one.
    Pipettes without a known min or max volume are ignored; both attributes
    are ``None`` when no pipette qualifies.
    """
    self.min_largest_pipette = None
    self.max_smallest_pipette = None

    # {mount: (min_volume, max_volume)} for pipettes with a known range.
    # None volumes are skipped: they cannot be ordered against numbers.
    volume_ranges = {
        mount: (info["min_volume"], info["max_volume"])
        for mount, info in self.pipette_info.items()
        if info
        and info.get("min_volume") is not None
        and info.get("max_volume") is not None
    }
    if not volume_ranges:
        return

    largest_pipette_mount = max(volume_ranges, key=lambda m: volume_ranges[m][1])
    smallest_pipette_mount = min(volume_ranges, key=lambda m: volume_ranges[m][1])

    self.min_largest_pipette = volume_ranges[largest_pipette_mount][0]
    self.log_info(f"Setting min_largest_pipette to {self.min_largest_pipette}")

    self.max_smallest_pipette = volume_ranges[smallest_pipette_mount][1]
    self.log_info(f"Setting max_smallest_pipette to {self.max_smallest_pipette}")
def mix(self, volume, location, repetitions=1, **kwargs):
    """Mix in a well by repeated aspirate/dispense cycles.

    A tip is picked up first when ``has_tip`` is false.  Each repetition
    aspirates and then dispenses ``volume`` at the bottom of the well.

    Args:
        volume: Volume per stroke; also used to choose the pipette.
        location: Location string (or list; only the first well is used).
        repetitions: Number of aspirate/dispense cycles.
        **kwargs: Accepted and ignored.

    Raises:
        ValueError: If no id is known for the chosen pipette, or the
            location resolves to no wells.
    """
    self.log_info(f"Mixing {volume}uL {repetitions} times at {location}")

    # Get pipette based on volume
    pipette = self.get_pipette(volume)
    pipette_mount = pipette["mount"]

    # Get the pipette ID recorded for that mount
    mount_info = self.pipette_info.get(pipette_mount)
    pipette_id = mount_info.get("id") if mount_info else None
    if not pipette_id:
        raise ValueError(f"Could not find ID for pipette on {pipette_mount} mount")

    # Get well location
    wells = self.get_wells(location)
    if not wells:
        raise ValueError("Invalid location")
    well = wells[0]

    # Pick up tip if needed
    if not self.has_tip:
        self._execute_atomic_command(
            "pickUpTip",
            {
                "pipetteId": pipette_id,
                "pipetteMount": pipette_mount,
                "wellLocation": None,  # Use next available tip in rack
            },
        )
        self.has_tip = True

    def stroke_params():
        # A fresh dict per command, as each call receives its own params
        return {
            "pipetteId": pipette_id,
            "volume": volume,
            "labwareId": well["labwareId"],
            "wellName": well["wellName"],
            "wellLocation": {
                "origin": "bottom",
                "offset": {"x": 0, "y": 0, "z": 0},
            },
        }

    # Execute mix by performing repetitions of aspirate/dispense
    for _ in range(repetitions):
        for command in ("aspirate", "dispense"):
            self._execute_atomic_command(command, stroke_params())
def _split_up_transfers(self, vol):
    """Split a volume into sub-transfers that respect pipette limits.

    Each sub-transfer is at most ``max_transfer``.  Two adjustments apply:

    * If the final remainder is below ``min_transfer`` and the previous
      sub-transfer is at least twice ``min_transfer``, volume is moved from
      the previous sub-transfer so the last one equals ``min_transfer``.
    * If a sub-transfer falls strictly between ``max_smallest_pipette`` and
      ``min_largest_pipette``, it is reduced to ``max_smallest_pipette`` and
      the rest is carried into the following sub-transfers.

    Args:
        vol: Total volume to transfer.

    Returns:
        list: Sub-transfer volumes summing to at least ``vol``; empty when
        ``max_transfer`` is unknown or ``vol`` is not positive.

    Raises:
        ValueError: If the limits would produce a non-positive sub-transfer
            (the loop could otherwise never finish).
    """
    transfers = []
    if self.max_transfer is None or vol <= 0:
        return transfers

    min_transfer = self.min_transfer or 0
    # These are only set once pipette ranges have been computed; treat
    # them as unknown instead of failing when they are absent.
    min_largest = getattr(self, "min_largest_pipette", None)
    max_smallest = getattr(self, "max_smallest_pipette", None)

    while True:
        planned = sum(transfers)
        # Exit condition - we've reached the target volume
        if planned >= vol:
            break

        transfer = min(self.max_transfer, vol - planned)

        # Handle case where remaining volume is less than minimum transfer
        if (
            transfer < min_transfer
            and transfers
            and transfers[-1] >= 2 * min_transfer
        ):
            transfers[-1] -= min_transfer - transfer
            transfer = min_transfer

        # Handle "valley of death" case - when transfer is between pipette
        # ranges, use the smaller pipette at max capacity
        if (
            min_largest is not None
            and max_smallest is not None
            and max_smallest < transfer < min_largest
        ):
            transfer = max_smallest

        if transfer <= 0:
            raise ValueError(
                f"Cannot split {vol} into transfers: computed a non-positive "
                f"step ({transfer}) from the current pipette limits"
            )

        transfers.append(transfer)

    return transfers


def _slot_by_labware_uuid(self, target_uuid):
    """Return the slot whose labware id equals *target_uuid*, else ``None``.

    Args:
        target_uuid: Labware id to look for.
    """
    # Records are (labware_id, name, load_result); read the id by index so
    # the lookup does not depend on the number of fields in the tuple.
    for slot, labware_info in self.loaded_labware.items():
        if labware_info[0] == target_uuid:
            return slot
    return None
@Driver.quickbar(
    qb={
        "button_text": "Transfer",
        "params": {
            "source": {"label": "Source Well", "type": "text", "default": "1A1"},
            "dest": {"label": "Dest Well", "type": "text", "default": "1A1"},
            "volume": {"label": "Volume (uL)", "type": "float", "default": 300},
        },
    }
)
def transfer(
    self,
    source,
    dest,
    volume,
    mix_before=None,
    mix_after=None,
    air_gap=0,
    aspirate_rate=None,
    dispense_rate=None,
    mix_aspirate_rate=None,
    mix_dispense_rate=None,
    blow_out=False,
    post_aspirate_delay=0.0,
    aspirate_equilibration_delay=0.0,
    post_dispense_delay=0.0,
    drop_tip=True,
    force_new_tip=False,
    to_top=True,
    to_center=False,
    to_top_z_offset=0,
    fast_mixing=False,
    **kwargs,
):
    """Transfer fluid from one well to another using atomic HTTP API commands.

    The volume is split by ``_split_up_transfers``.  For every sub-transfer
    the sequence is: pick up tip, pause a heater-shaker if the source or
    destination sits on one, optional mix at the source, aspirate, optional
    delays and air gap, dispense, optional delay, optional mix at the
    destination, optional blow-out, resume shaking, optional tip drop.

    Args:
        source: Source location; must resolve to exactly one well.
        dest: Destination location; must resolve to exactly one well.
        volume: Total volume to move.
        mix_before: ``(repetitions, volume)`` to mix at the source, or None.
        mix_after: ``(repetitions, volume)`` to mix at the destination, or
            None.
        air_gap: Volume of air aspirated after the liquid; it is added to
            the dispense volume.
        aspirate_rate: Aspirate rate passed to ``set_aspirate_rate``.
        dispense_rate: Dispense rate passed to ``set_dispense_rate``.
        mix_aspirate_rate: Aspirate rate used while mixing.
        mix_dispense_rate: Dispense rate used while mixing.
        blow_out: Blow out at the destination after dispensing.
        post_aspirate_delay: Seconds to wait after aspirating.
        aspirate_equilibration_delay: Additional seconds to wait after
            aspirating.
        post_dispense_delay: Seconds to wait after dispensing.
        drop_tip: Drop the tip over the fixed trash after each sub-transfer.
        force_new_tip: Accepted for interface compatibility; not used here.
        to_top: Dispense relative to the top of the destination well.
        to_center: Dispense relative to the center of the destination well.
        to_top_z_offset: Z offset applied when dispensing to the top.
        fast_mixing: Accepted for interface compatibility; not used here.
        **kwargs: Accepted and ignored.

    Raises:
        ValueError: If no pipette id is known, more than one source or
            destination well is given, or both ``to_top`` and ``to_center``
            are set.
        RuntimeError: Propagated from failed robot commands.
    """
    self.log_info(f"Transferring {volume}uL from {source} to {dest}")

    # Set flow rates if specified
    if aspirate_rate is not None:
        self.set_aspirate_rate(aspirate_rate)
    if dispense_rate is not None:
        self.set_dispense_rate(dispense_rate)

    # Get pipette based on volume, then the id recorded for its mount
    pipette = self.get_pipette(volume)
    pipette_mount = pipette["mount"]
    mount_info = self.pipette_info.get(pipette_mount)
    pipette_id = mount_info.get("id") if mount_info else None
    if not pipette_id:
        raise ValueError(f"Could not find ID for pipette on {pipette_mount} mount")

    # Get source and destination wells
    source_wells = self.get_wells(source)
    if len(source_wells) > 1:
        raise ValueError("Transfer only accepts one source well at a time!")
    source_well = source_wells[0]

    dest_wells = self.get_wells(dest)
    if len(dest_wells) > 1:
        raise ValueError("Transfer only accepts one dest well at a time!")
    dest_well = dest_wells[0]

    # Handle special cases for well positions
    source_position = "bottom"  # Default position
    dest_position = "bottom"  # Default position
    if to_top and to_center:
        raise ValueError("Cannot dispense to_top and to_center simultaneously")
    elif to_top:
        dest_position = "top"
    elif to_center:
        dest_position = "center"

    # Split transfers if needed
    transfers = self._split_up_transfers(volume)
    if not transfers:
        # Nothing would be sent to the robot; say so instead of returning
        # silently as if the transfer had happened.
        self.log_warning(
            f"No transfers planned for {volume}uL from {source} to {dest}; "
            "nothing was moved"
        )
        self.last_pipette = pipette
        return

    def liquid_params(well, origin, liquid_volume, rate_key, offset=None):
        # Params shared by aspirate and dispense.  The flow rate is read at
        # call time so rate changes made while mixing take effect.
        if offset is None:
            offset = {"x": 0, "y": 0, "z": 0}
        return {
            "pipetteId": pipette_id,
            "volume": liquid_volume,
            "labwareId": well["labwareId"],
            "wellName": well["wellName"],
            "wellLocation": {"origin": origin, "offset": offset},
            "flowRate": self.pipette_info[pipette_mount][rate_key],
        }

    def mix_in_well(well, origin, mix_spec):
        # Aspirate/dispense cycles in one well, with temporary mix rates
        n_mixes, mix_volume = mix_spec

        if mix_aspirate_rate is not None:
            self.set_aspirate_rate(mix_aspirate_rate, pipette_mount)
        if mix_dispense_rate is not None:
            self.set_dispense_rate(mix_dispense_rate, pipette_mount)

        for _ in range(n_mixes):
            self._execute_atomic_command(
                "aspirate",
                liquid_params(well, origin, mix_volume, "aspirate_flow_rate"),
            )
            self._execute_atomic_command(
                "dispense",
                liquid_params(well, origin, mix_volume, "dispense_flow_rate"),
            )

        # Restore the transfer rates, but only those explicitly specified
        if mix_aspirate_rate is not None or mix_dispense_rate is not None:
            if aspirate_rate is not None:
                self.set_aspirate_rate(aspirate_rate, pipette_mount)
            if dispense_rate is not None:
                self.set_dispense_rate(dispense_rate, pipette_mount)

    # Whether either well sits on a heater-shaker does not change between
    # sub-transfers, so work it out once.
    dest_well_slot = self._slot_by_labware_uuid(dest_well["labwareId"])
    source_well_slot = self._slot_by_labware_uuid(source_well["labwareId"])
    heater_shaker_slots = [
        slot
        for (slot, (uuid, name)) in self.loaded_modules.items()
        if "heaterShaker" in name
    ]
    on_heater_shaker = (
        dest_well_slot in heater_shaker_slots
        or source_well_slot in heater_shaker_slots
    )

    # Dispense and blow-out share this offset
    offset = {
        "x": 0,
        "y": 0,
        "z": (
            to_top_z_offset
            if dest_position == "top" and to_top_z_offset != 0
            else 0
        ),
    }

    for sub_volume in transfers:
        # 1. Always pick up a new tip for each transfer.  The tip location
        # is filled in by _execute_atomic_command.
        self._execute_atomic_command(
            "pickUpTip",
            {
                "pipetteId": pipette_id,
                "pipetteMount": pipette_mount,
                "wellLocation": None,
            },
        )

        # 1a. If a well is on a heater-shaker, latch it and stop shaking
        was_shaking = False
        shake_rpm = None
        if on_heater_shaker:
            self.latch_shaker()
            # Read the state once so the status and the rpm to restore
            # come from the same reading.
            shake_state = self.get_shake_rpm()
            if shake_state[0] != 'idle':
                shake_rpm = shake_state[2]
                was_shaking = True
                self.stop_shake()

        # 2. Mix before if specified
        if mix_before is not None:
            mix_in_well(source_well, source_position, mix_before)

        # 3. Aspirate
        self._execute_atomic_command(
            "aspirate",
            liquid_params(
                source_well, source_position, sub_volume, "aspirate_flow_rate"
            ),
        )

        # NOTE(review): confirm that "delay" and "airGap" are command types
        # accepted by the robot's /runs/{id}/commands endpoint.

        # 4. Post-aspirate delay
        if post_aspirate_delay > 0:
            self._execute_atomic_command("delay", {"seconds": post_aspirate_delay})

        # 5. Aspirate equilibration delay
        if aspirate_equilibration_delay > 0:
            self._execute_atomic_command(
                "delay", {"seconds": aspirate_equilibration_delay}
            )

        # 6. Air gap if specified
        if air_gap > 0:
            self._execute_atomic_command(
                "airGap", {"pipetteId": pipette_id, "volume": air_gap}
            )

        # 7. Dispense (the air gap is included in the dispense volume)
        self._execute_atomic_command(
            "dispense",
            liquid_params(
                dest_well,
                dest_position,
                sub_volume + air_gap,
                "dispense_flow_rate",
                offset,
            ),
        )

        # 8. Post-dispense delay
        if post_dispense_delay > 0:
            self._execute_atomic_command("delay", {"seconds": post_dispense_delay})

        # 9. Mix after if specified
        if mix_after is not None:
            mix_in_well(dest_well, dest_position, mix_after)

        # 10. Blow out if specified
        if blow_out:
            self._execute_atomic_command(
                "blowOut",
                {
                    "pipetteId": pipette_id,
                    "labwareId": dest_well["labwareId"],
                    "wellName": dest_well["wellName"],
                    "wellLocation": {"origin": dest_position, "offset": offset},
                },
            )

        # Resume shaking at the speed recorded before the transfer
        if was_shaking:
            self.set_shake(shake_rpm)

        # 11. Drop tip if specified.  Workaround for
        # https://github.com/Opentrons/opentrons/issues/14590: move over the
        # fixed trash, then drop the tip in place.
        if drop_tip:
            self._execute_atomic_command(
                "moveToAddressableAreaForDropTip",
                {
                    "pipetteId": pipette_id,
                    "addressableAreaName": "fixedTrash",
                    "offset": {"x": 0, "y": 0, "z": 10},
                    "alternateDropLocation": False,
                },
            )
            self._execute_atomic_command(
                "dropTipInPlace",
                {
                    "pipetteId": pipette_id,
                },
            )

    # Update last pipette
    self.last_pipette = pipette
def _execute_atomic_command(
    self, command_type, params=None, wait_until_complete=True, timeout=None
):
    """Execute a single atomic command using the HTTP API.

    The command is POSTed to ``{base_url}/runs/{run_id}/commands`` with
    intent ``"setup"``. For ``"pickUpTip"`` commands the ``pipetteMount``
    entry of ``params`` is consumed here: the next tracked tip for that
    mount is taken via ``self.get_tip`` and the request is rewritten to
    address that tip's labware and well.

    Args:
        command_type: Command type string sent as ``commandType``.
        params: Command parameters. The dict passed in is not modified;
            a shallow copy is edited and sent.
        wait_until_complete: Sent as the ``waitUntilComplete`` query
            parameter.
        timeout: Optional value sent as the ``timeout`` query parameter.

    Returns:
        ``True`` when ``wait_until_complete`` is set and the reported
        status is ``"succeeded"``; otherwise the command id.

    Raises:
        RuntimeError: No tip is tracked for the requested mount, the
            command reports ``"failed"``/``"error"``, or the HTTP request
            itself raised a ``requests`` exception (chained as the cause).
    """
    # Work on a copy so the caller's dict keeps its "pipetteMount" key and
    # does not silently gain labware/well entries.
    params = {} if params is None else dict(params)

    # Track tip usage for pick up commands
    if command_type == "pickUpTip":
        mount = params.get("pipetteMount")
        has_tip_available = (
            mount and mount in self.available_tips and self.available_tips[mount]
        )
        if not has_tip_available:
            raise RuntimeError(f"No tips available for {mount} mount")

        tiprack_id, well = self.get_tip(mount)
        self.log_debug(
            f"Using tip from {tiprack_id} well {well} for {mount} mount"
        )
        # Update the params to specify the exact tip location
        params["labwareId"] = tiprack_id
        params["wellName"] = well
        params["wellLocation"] = {
            "origin": "top",
            "offset": {"x": 0, "y": 0, "z": 0},
        }
        del params["pipetteMount"]

    self.log_debug(
        f"Executing atomic command: {command_type} with params: {params}"
    )

    # Ensure we have a valid run
    run_id = self._ensure_run_exists()

    # Build the query parameters
    query_params = {"waitUntilComplete": wait_until_complete}
    if timeout is not None:
        query_params["timeout"] = timeout

    try:
        # Send the command
        command_response = requests.post(
            url=f"{self.base_url}/runs/{run_id}/commands",
            params=query_params,
            headers=self.headers,
            json={
                "data": {
                    "commandType": command_type,
                    "params": params,
                    "intent": "setup",
                }
            },
        )

        # NOTE(review): _check_cmd_success is defined outside this block;
        # presumably it raises when the response reports a failure - confirm.
        self._check_cmd_success(command_response)

        command_data = command_response.json()["data"]
        command_id = command_data["id"]

        self.log_debug(
            f"Command {command_id} executed with status: {command_data['status']}"
        )

        # If wait_until_complete is True, the command has already completed
        if wait_until_complete:
            if command_data["status"] == "succeeded":
                return True
            elif command_data["status"] in ["failed", "error"]:
                error_info = command_data.get("error", "Unknown error")
                self.log_error(f"Command failed: {error_info}")
                raise RuntimeError(f"Command failed: {error_info}")

        # If we're not waiting or the command is still running, return the
        # command ID for tracking
        return command_id

    except requests.exceptions.RequestException as e:
        self.log_error(f"Error executing command: {str(e)}")
        # Chain the original exception so the transport failure stays
        # visible in the traceback.
        raise RuntimeError(f"Error executing command: {str(e)}") from e
def set_aspirate_rate(self, rate=150, pipette=None):
    """Set aspirate rate in uL/s. Default is 150 uL/s.

    Args:
        rate: Flow rate stored under ``aspirate_flow_rate``.
        pipette: ``'left'`` or ``'right'`` to update one mount, or ``None``
            to update every mount that currently has a pipette entry.
            Any other value updates nothing.

    Raises:
        KeyError, TypeError: An explicitly named mount has no usable entry
            in ``self.pipette_info``.
    """
    self.log_info(f"Setting aspirate rate to {rate} uL/s")

    if pipette is None:
        # "All pipettes" means all mounts that actually hold one: a missing
        # or None entry for an empty mount is skipped instead of raising.
        for mount in ("left", "right"):
            mount_info = self.pipette_info.get(mount)
            if mount_info is not None:
                mount_info["aspirate_flow_rate"] = rate
        return

    if pipette in ("left", "right"):
        self.pipette_info[pipette]["aspirate_flow_rate"] = rate
def set_dispense_rate(self, rate=300, pipette=None):
    """Set dispense rate in uL/s. Default is 300 uL/s.

    Args:
        rate: Flow rate stored under ``dispense_flow_rate``.
        pipette: ``'left'`` or ``'right'`` to update one mount, or ``None``
            to update every mount that currently has a pipette entry.
            Any other value updates nothing.

    Raises:
        KeyError, TypeError: An explicitly named mount has no usable entry
            in ``self.pipette_info``.
    """
    self.log_info(f"Setting dispense rate to {rate} uL/s")

    if pipette is None:
        # "All pipettes" means all mounts that actually hold one: a missing
        # or None entry for an empty mount is skipped instead of raising.
        for mount in ("left", "right"):
            mount_info = self.pipette_info.get(mount)
            if mount_info is not None:
                mount_info["dispense_flow_rate"] = rate
        return

    if pipette in ("left", "right"):
        self.pipette_info[pipette]["dispense_flow_rate"] = rate
def set_gantry_speed(self, speed=400):
    """Set movement speed of gantry. Default is 400 mm/s.

    Only logs the request and a warning; no robot setting is changed here.
    """
    requested = f"Setting gantry speed to {speed} mm/s"
    self.log_info(requested)

    # Placeholder: a real implementation would have to update robot
    # settings through the HTTP API, which this driver does not do.
    not_implemented = "Setting gantry speed is not fully implemented in HTTP API mode"
    self.log_warning(not_implemented)
def get_pipette(self, volume, method="min_transfers"):
    """Choose a loaded pipette for transferring ``volume``.

    Pipette data is refreshed with ``self._update_pipettes()`` first. A
    pipette is a candidate when ``volume`` is at least its ``min_volume``
    (default 1); the number of transfers is ``ceil(volume / max_volume)``
    (``max_volume`` default 300).

    Args:
        volume: Volume to transfer.
        method: ``"min_transfers"`` picks, among the candidates needing the
            fewest transfers, the one with the smallest ``max_volume``;
            ``"uncertainty"`` picks the smallest ``uncertainty`` value
            (currently ``ntransfers * 0.1``).

    Returns:
        dict: The chosen candidate with keys ``mount``, ``min_volume``,
        ``max_volume``, ``name``, ``model``, ``channels``, ``pipette_id``,
        ``ntransfers`` and ``uncertainty``.

    Raises:
        ValueError: No pipette qualifies, or ``method`` is not recognized.
    """
    self.log_debug(f"Looking for a pipette for volume {volume}")

    # Make sure we have the latest pipette information
    self._update_pipettes()

    pipettes = []
    for mount, pipette_data in self.pipette_info.items():
        if not pipette_data:
            continue  # mount without a pipette

        min_volume = pipette_data.get("min_volume", 1)
        max_volume = pipette_data.get("max_volume", 300)
        if volume < min_volume:
            continue

        pipettes.append(
            {
                "mount": mount,  # Use mount as the identifier
                "min_volume": min_volume,
                "max_volume": max_volume,
                "name": pipette_data.get("name"),
                "model": pipette_data.get("model"),
                "channels": pipette_data.get("channels", 1),
                "pipette_id": pipette_data.get("id"),
            }
        )

    if not pipettes:
        raise ValueError("No suitable pipettes found!\n")

    # Calculate transfers and uncertainties. The per-transfer volume is not
    # needed for the choice, so it is not computed (doing so divided by zero
    # whenever ntransfers was 0).
    for pipette in pipettes:
        ntransfers = ceil(volume / pipette["max_volume"])
        pipette["ntransfers"] = ntransfers
        pipette["uncertainty"] = ntransfers * 0.1  # simplified uncertainty

    if self.data is not None:
        self.data["transfer_method"] = method
        self.data["pipette_options"] = str(pipettes)

    # Choose pipette based on method
    if method == "uncertainty":
        pipette = min(pipettes, key=lambda x: x["uncertainty"])
    elif method == "min_transfers":
        min_xfers = min(p["ntransfers"] for p in pipettes)
        acceptable_pipettes = [p for p in pipettes if p["ntransfers"] == min_xfers]
        pipette = min(acceptable_pipettes, key=lambda x: x["max_volume"])
    else:
        raise ValueError(f"Pipette selection method {method} was not recognized.")

    self.log_debug(f"Chosen pipette: {pipette}")
    if self.data is not None:
        self.data["chosen_pipette"] = str(pipette)

    return pipette
def get_aspirate_rate(self, pipette=None):
    """Get current aspirate rate for a pipette.

    Args:
        pipette: Mount name to look up. When ``None``, the first mount in
            ``self.pipette_info`` that has pipette data is used.

    Returns:
        The stored ``aspirate_flow_rate`` of that mount, ``150`` when the
        mount has no data or no stored rate, or ``None`` when ``pipette``
        is ``None`` and no mount has pipette data.
    """
    if pipette is None:
        # Fall back to the first mount that actually holds a pipette
        pipette = next(
            (mount for mount, data in self.pipette_info.items() if data), None
        )
        if pipette is None:
            return None

    # Plain dict access: nothing here talks to the robot, so no request
    # exception handling is needed.
    pipette_data = self.pipette_info.get(pipette)
    if pipette_data:
        return pipette_data.get("aspirate_flow_rate", 150)

    return 150  # Default value
def get_dispense_rate(self, pipette=None):
    """Get current dispense rate for a pipette.

    Args:
        pipette: Mount name to look up. When ``None``, the first mount in
            ``self.pipette_info`` that has pipette data is used.

    Returns:
        The stored ``dispense_flow_rate`` of that mount, ``300`` when the
        mount has no data or no stored rate, or ``None`` when ``pipette``
        is ``None`` and no mount has pipette data.
    """
    if pipette is None:
        # Fall back to the first mount that actually holds a pipette
        pipette = next(
            (mount for mount, data in self.pipette_info.items() if data), None
        )
        if pipette is None:
            return None

    # Plain dict access: nothing here talks to the robot, so no request
    # exception handling is needed.
    pipette_data = self.pipette_info.get(pipette)
    if pipette_data:
        return pipette_data.get("dispense_flow_rate", 300)

    return 300  # Default value
# HTTP API communication with heater-shaker module
def set_shake(self, rpm, module_id=None):
    """Send a ``heaterShaker/setAndWaitForShakeSpeed`` command.

    Args:
        rpm: Shake speed sent as ``rpm``.
        module_id: Target module id. When ``None``, it is looked up with
            ``self._find_module_by_type("heaterShaker")``.
    """
    self.log_info(f"Setting heater-shaker speed to {rpm} RPM")

    target_module = (
        self._find_module_by_type("heaterShaker") if module_id is None else module_id
    )
    command_params = {"moduleId": target_module, "rpm": rpm}
    self._execute_atomic_command(
        "heaterShaker/setAndWaitForShakeSpeed", params=command_params
    )
def stop_shake(self, module_id=None):
    """Send a ``heaterShaker/deactivateShaker`` command.

    Args:
        module_id: Target module id. When ``None``, it is looked up with
            ``self._find_module_by_type("heaterShaker")``.
    """
    self.log_info("Stopping heater-shaker")

    target_module = (
        self._find_module_by_type("heaterShaker") if module_id is None else module_id
    )
    command_params = {"moduleId": target_module}
    self._execute_atomic_command(
        "heaterShaker/deactivateShaker", params=command_params
    )
def set_shaker_temp(self, temp, module_id=None):
    """Send a ``heaterShaker/setTargetTemperature`` command.

    Args:
        temp: Target temperature sent as ``celsius``.
        module_id: Target module id. When ``None``, it is looked up with
            ``self._find_module_by_type("heaterShaker")``.
    """
    # The degree sign is written as an escape so the log text does not
    # depend on the source file's encoding (it was previously garbled).
    self.log_info(f"Setting heater-shaker temperature to {temp}\u00b0C")
    if module_id is None:
        module_id = self._find_module_by_type("heaterShaker")
    self._execute_atomic_command(
        "heaterShaker/setTargetTemperature",
        params={
            "moduleId": module_id,
            "celsius": temp,
        },
    )
def stop_shaker_heat(self, module_id=None):
    """Send a ``heaterShaker/deactivateHeater`` command.

    Args:
        module_id: Target module id. When ``None``, it is looked up with
            ``self._find_module_by_type("heaterShaker")``.
    """
    self.log_info("Deactivating heater-shaker heating")

    target_module = (
        self._find_module_by_type("heaterShaker") if module_id is None else module_id
    )
    command_params = {"moduleId": target_module}
    self._execute_atomic_command(
        "heaterShaker/deactivateHeater", params=command_params
    )
def unlatch_shaker(self, module_id=None):
    """Send a ``heaterShaker/openLabwareLatch`` command.

    Args:
        module_id: Target module id. When ``None``, it is looked up with
            ``self._find_module_by_type("heaterShaker")``.
    """
    self.log_info("Unlatching heater-shaker")

    target_module = (
        self._find_module_by_type("heaterShaker") if module_id is None else module_id
    )
    command_params = {"moduleId": target_module}
    self._execute_atomic_command(
        "heaterShaker/openLabwareLatch", params=command_params
    )
def latch_shaker(self, module_id=None):
    """Send a ``heaterShaker/closeLabwareLatch`` command.

    Args:
        module_id: Target module id. When ``None``, it is looked up with
            ``self._find_module_by_type("heaterShaker")``.
    """
    self.log_info("Latching heater-shaker")

    target_module = (
        self._find_module_by_type("heaterShaker") if module_id is None else module_id
    )
    command_params = {"moduleId": target_module}
    self._execute_atomic_command(
        "heaterShaker/closeLabwareLatch", params=command_params
    )
def _find_module_by_type(self, partial_name):
    """Return the id of a loaded module whose type contains ``partial_name``.

    Each value of ``self.loaded_modules`` is indexed as ``[0]`` for the id
    and ``[1]`` for the type string. When several modules match, the last
    one in iteration order wins; ``None`` is returned when none match.
    """
    matching_ids = [
        entry[0]
        for entry in self.loaded_modules.values()
        if partial_name in entry[1]
    ]
    return matching_ids[-1] if matching_ids else None
def get_shaker_temp(self):
    """Read the heater-shaker temperature from the robot's modules endpoint.

    Issues ``GET {base_url}/modules`` and uses the first module whose
    ``moduleModel`` contains ``"heaterShaker"``.

    Returns:
        tuple: ``(current_temp, target_temp)`` taken from the module's
        ``data`` (``currentTemp`` / ``targetTemp``) on success.
        str: An error description when the request does not return 200,
        no heater-shaker is listed, or any exception occurs. Errors are
        reported through the return value, not raised.
    """
    self.log_info("Getting heater-shaker temperature")

    # For get operations, we still need to use the modules API directly
    try:
        modules_response = requests.get(
            url=f"{self.base_url}/modules", headers=self.headers
        )

        if modules_response.status_code != 200:
            self.log_error(f"Failed to get modules: {modules_response.status_code}")
            return f"Error getting modules: {modules_response.status_code}"

        modules = modules_response.json().get("modules", [])
        # `or ""` lets a record without a moduleModel be skipped instead of
        # aborting the whole lookup with a TypeError.
        heater_shaker_module = next(
            (m for m in modules if "heaterShaker" in (m.get("moduleModel") or "")),
            None,
        )

        if not heater_shaker_module:
            self.log_error("No heater-shaker module found")
            return "No heater-shaker module found"

        # Routed through the logger instead of a bare print()
        self.log_debug(f"Heater-shaker module record: {heater_shaker_module}")

        module_data = heater_shaker_module.get("data", {})
        current_temp = module_data.get("currentTemp")
        target_temp = module_data.get("targetTemp")

        # Degree signs are escapes so the text does not depend on the
        # source file's encoding (they were previously garbled).
        self.log_info(
            f"Heater-shaker temperature - Current: {current_temp}\u00b0C, "
            f"Target: {target_temp}\u00b0C"
        )
        return (current_temp, target_temp)

    except Exception as e:
        # Deliberate best-effort: callers get an error string, not a raise.
        self.log_error(f"Error getting temperature: {str(e)}")
        return f"Error: {str(e)}"
def get_shake_rpm(self):
    """Read the heater-shaker speed from the robot's modules endpoint.

    Issues ``GET {base_url}/modules`` and uses the first module whose
    ``moduleModel`` contains ``"heaterShaker"``.

    Returns:
        tuple: ``(status, current_rpm, target_rpm)`` taken from the
        module's ``data`` (``speedStatus`` / ``currentSpeed`` /
        ``targetSpeed``) on success.
        str: An error description when the request does not return 200,
        no heater-shaker is listed, or any exception occurs. Errors are
        reported through the return value, not raised.
    """
    # For get operations, we just use the modules API
    try:
        modules_response = requests.get(
            url=f"{self.base_url}/modules", headers=self.headers
        )

        if modules_response.status_code != 200:
            self.log_error(f"Failed to get modules: {modules_response.status_code}")
            return f"Error getting modules: {modules_response.status_code}"

        modules = modules_response.json().get("modules", [])
        # `or ""` lets a record without a moduleModel be skipped instead of
        # aborting the whole lookup with a TypeError.
        heater_shaker_module = next(
            (m for m in modules if "heaterShaker" in (m.get("moduleModel") or "")),
            None,
        )

        if not heater_shaker_module:
            self.log_error("No heater-shaker module found")
            return "No heater-shaker module found"

        module_data = heater_shaker_module.get("data", {})
        current_rpm = module_data.get("currentSpeed")
        target_rpm = module_data.get("targetSpeed")
        status = module_data.get("speedStatus")

        return (status, current_rpm, target_rpm)

    except Exception as e:
        # Deliberate best-effort: callers get an error string, not a raise.
        self.log_error(f"Error getting RPM: {str(e)}")
        return f"Error: {str(e)}"
def get_shake_latch_status(self):
    """Read the heater-shaker labware latch status from the modules endpoint.

    Issues ``GET {base_url}/modules`` and uses the first module whose
    ``moduleModel`` contains ``"heaterShaker"``.

    Returns:
        The module's ``data["labwareLatchStatus"]`` value on success
        (``None`` if the field is absent), or an error description string
        when the request does not return 200, no heater-shaker is listed,
        or any exception occurs. Errors are reported through the return
        value, not raised.
    """
    # For get operations, we just use the modules API
    try:
        modules_response = requests.get(
            url=f"{self.base_url}/modules", headers=self.headers
        )

        if modules_response.status_code != 200:
            self.log_error(f"Failed to get modules: {modules_response.status_code}")
            return f"Error getting modules: {modules_response.status_code}"

        modules = modules_response.json().get("modules", [])
        # `or ""` lets a record without a moduleModel be skipped instead of
        # aborting the whole lookup with a TypeError.
        heater_shaker_module = next(
            (m for m in modules if "heaterShaker" in (m.get("moduleModel") or "")),
            None,
        )

        if not heater_shaker_module:
            self.log_error("No heater-shaker module found")
            return "No heater-shaker module found"

        return heater_shaker_module.get("data", {}).get("labwareLatchStatus")

    except Exception as e:
        # Deliberate best-effort: callers get an error string, not a raise.
        # The log text previously said "RPM" (copied from get_shake_rpm).
        self.log_error(f"Error getting latch status: {str(e)}")
        return f"Error: {str(e)}"
def _create_run(self):
    """Create a run on the robot for executing commands.

    POSTs to ``{base_url}/runs`` and stores the new id in ``self.run_id``.

    Returns:
        The id of the created run.

    Raises:
        RuntimeError: The response status is not 201, or the request
            raised a ``requests`` exception (chained as the cause).
    """
    self.log_info("Creating a new run for commands")

    try:
        run_response = requests.post(
            url=f"{self.base_url}/runs",
            headers=self.headers,
        )

        if run_response.status_code != 201:
            self.log_error(f"Failed to create run: {run_response.status_code}")
            self.log_error(f"Response: {run_response.text}")
            raise RuntimeError(f"Failed to create run: {run_response.text}")

        self.run_id = run_response.json()["data"]["id"]
        self.log_debug(f"Created run: {self.run_id}")
        return self.run_id

    except requests.exceptions.RequestException as e:
        self.log_error(f"Error creating run: {str(e)}")
        # Chain the original exception so the transport failure stays
        # visible in the traceback.
        raise RuntimeError(f"Error creating run: {str(e)}") from e


def _ensure_run_exists(self):
    """Ensure a run exists for executing commands, creating one if needed.

    A new run is created when no run id is stored, when the stored run
    cannot be fetched with status 200, when its status is one of
    ``failed``/``error``/``succeeded``/``stopped``, or when the check
    itself raises a ``requests`` exception.

    Returns:
        The id of a usable run.
    """
    # run_id is only set by _create_run, so it may not exist yet
    if not getattr(self, "run_id", None):
        return self._create_run()

    # Check if the run is still valid
    try:
        response = requests.get(
            url=f"{self.base_url}/runs/{self.run_id}", headers=self.headers
        )

        if response.status_code != 200:
            # Run doesn't exist, create a new one
            return self._create_run()

        current_state = response.json()["data"].get("status")
        if current_state in ["failed", "error", "succeeded", "stopped"]:
            # Run is in a terminal state, create a new one
            return self._create_run()

        return self.run_id

    except requests.exceptions.RequestException:
        # Error checking run, create a new one
        return self._create_run()
def get_tip(self, mount):
    """Remove and return the first tracked tip for ``mount``.

    The entry is popped from the front of ``self.available_tips[mount]``,
    so the tip is no longer listed as available afterwards.
    """
    remaining_tips = self.available_tips[mount]
    next_tip = remaining_tips.pop(0)
    return next_tip
def get_tip_status(self, mount=None):
    """Get the current tip usage status.

    Args:
        mount: Mount to report on. When falsy, the status lines of every
            mount in ``self.available_tips`` are joined with newlines.

    Returns:
        str: ``"<available>/<total> tips available on <mount> mount"``,
        where the total is the tiprack well count times the number of tip
        racks assigned to that mount, or a "No tipracks loaded" message
        when the mount has no tracked tips entry.
    """
    if not mount:
        # Report every tracked mount, one line each
        return "\n".join(self.get_tip_status(m) for m in self.available_tips)

    if mount not in self.available_tips:
        return f"No tipracks loaded for {mount} mount"

    capacity = len(TIPRACK_WELLS) * len(self.loaded_instruments[mount]["tip_racks"])
    remaining = len(self.available_tips[mount])
    return f"{remaining}/{capacity} tips available on {mount} mount"
@Driver.unqueued(render_hint='html')
def visualize_deck(self, **kwargs):
    """
    Generate HTML visualization of OT-2 deck layout with detailed well layouts.

    The page is built from ``self.loaded_labware``, ``self.loaded_modules``,
    ``self.loaded_instruments`` and ``self.available_tips``: one card per
    deck slot (with an SVG of the wells where the labware definition has
    any), a fixed trash card, and a list of loaded pipettes.

    Args:
        **kwargs: Accepted but not used by this method.

    Returns:
        str: HTML string for deck visualization
    """
    # Non-ASCII characters (emoji, multiplication sign) are written as
    # escapes throughout so the page does not depend on the source file's
    # encoding; they were previously garbled.

    # OT-2 deck slot layout (11 slots + trash)
    slot_layout = [
        [10, 11, "Trash"],
        [7, 8, 9],
        [4, 5, 6],
        [1, 2, 3],
    ]

    def generate_well_layout_svg(labware_data, width=120, height=90, labware_uuid=None):
        """Generate SVG representation of well layout for labware."""
        if not labware_data:
            return ""

        definition = labware_data.get('definition', {})
        wells = definition.get('wells', {})
        dimensions = definition.get('dimensions', {})
        if not wells:
            return ""

        # Calculate scaling factors based on labware dimensions
        labware_width = dimensions.get('xDimension', 127.76)
        labware_height = dimensions.get('yDimension', 85.48)
        scale_x = width / labware_width
        scale_y = height / labware_height

        # Check if this is a tiprack
        labware_type = definition.get('metadata', {}).get('displayCategory', 'default')
        is_tiprack = (
            labware_type == 'tipRack'
            or 'tiprack' in definition.get('parameters', {}).get('loadName', '').lower()
        )

        # Wells of this labware that still hold a tip, across all mounts
        available_tips_for_labware = set()
        if is_tiprack and labware_uuid and hasattr(self, 'available_tips'):
            available_tips_for_labware = {
                well_name
                for mount_tips in self.available_tips.values()
                for tip_labware_uuid, well_name in mount_tips
                if tip_labware_uuid == labware_uuid
            }

        well_colors = {
            'tipRack_available': '#4caf50',  # Green for available tips
            'tipRack_used': '#f44336',       # Red for used tips
            'tipRack_default': '#ffa726',    # Orange fallback
            'wellPlate': '#42a5f5',
            'reservoir': '#66bb6a',
            'default': '#90a4ae',
        }

        svg_elements = []
        for well_name, well_info in wells.items():
            x = well_info.get('x', 0) * scale_x
            y = (labware_height - well_info.get('y', 0)) * scale_y  # Flip Y coordinate
            shape = well_info.get('shape', 'circular')

            # Determine color based on tip availability for tipracks
            if is_tiprack and labware_uuid:
                if well_name in available_tips_for_labware:
                    well_color = well_colors['tipRack_available']
                    tip_status = "Available"
                else:
                    well_color = well_colors['tipRack_used']
                    tip_status = "Used"
                tooltip = f"{well_name} - {tip_status}"
            else:
                well_color = well_colors.get(labware_type, well_colors['default'])
                tooltip = well_name

            if shape == 'circular':
                diameter = well_info.get('diameter', 5) * min(scale_x, scale_y)
                radius = diameter / 2
                svg_elements.append(
                    f'<circle cx="{x:.1f}" cy="{y:.1f}" r="{radius:.1f}" '
                    f'fill="{well_color}" stroke="#333" stroke-width="0.5" opacity="0.8">'
                    f'<title>{tooltip}</title></circle>'
                )
            elif shape == 'rectangular':
                well_width = well_info.get('xDimension', 8) * scale_x
                well_height = well_info.get('yDimension', 8) * scale_y
                rect_x = x - well_width / 2
                rect_y = y - well_height / 2
                svg_elements.append(
                    f'<rect x="{rect_x:.1f}" y="{rect_y:.1f}" '
                    f'width="{well_width:.1f}" height="{well_height:.1f}" '
                    f'fill="{well_color}" stroke="#333" stroke-width="0.5" opacity="0.8">'
                    f'<title>{tooltip}</title></rect>'
                )
            # Other shapes are not drawn

        if not svg_elements:
            return ""

        drawn_wells = "".join(svg_elements)
        return (
            f' <svg width="{width}" height="{height}" style="margin: 5px 0;"> '
            f'{drawn_wells}'
            ' </svg> '
        )

    def get_well_count_summary(labware_data):
        """Get a summary of wells for display."""
        if not labware_data:
            return ""

        definition = labware_data.get('definition', {})
        wells = definition.get('wells', {})
        if not wells:
            return ""

        well_count = len(wells)

        # Try to determine a rows x columns format from the column ordering
        ordering = definition.get('ordering', [])
        if ordering:
            rows = len(ordering[0]) if ordering[0] else 0
            cols = len(ordering)
            if rows > 0 and cols > 0 and rows * cols == well_count:
                return f"{rows}\u00d7{cols} ({well_count} wells)"

        return f"{well_count} wells"

    def get_slot_content(slot_num):
        """Collect display name, details, color and SVG for one deck slot."""
        slot_str = str(slot_num)
        content = {
            'type': 'empty',
            'name': 'Empty',
            'details': '',
            'color': '#f0f0f0',
            'svg': '',
            'well_info': '',
        }

        # Check if slot has labware
        labware_on_slot = None
        if slot_str in self.loaded_labware:
            labware_id, labware_type, labware_data = self.loaded_labware[slot_str]
            labware_on_slot = labware_data
            metadata = labware_data.get('definition', {}).get('metadata', {})
            labware_name = metadata.get('displayName', labware_type)

            content.update({
                'type': 'labware',
                'name': labware_name,
                'details': f"Type: {labware_type}<br>ID: {labware_id}",
                'color': '#e3f2fd',
                'svg': generate_well_layout_svg(labware_data, labware_uuid=labware_id),
                'well_info': get_well_count_summary(labware_data),
            })

            # Special coloring for tip racks
            if 'tiprack' in labware_type.lower() or metadata.get('displayCategory') == 'tipRack':
                content['color'] = '#fff3e0'

        # Check if slot has a module
        if slot_str in self.loaded_modules:
            module_id, module_type = self.loaded_modules[slot_str]
            module_name = module_type.replace('ModuleV1', ' Module V1').replace('V1', ' V1')

            if labware_on_slot:
                # Module with labware: 'svg' and 'well_info' were already
                # filled in by the labware branch above, so they are kept
                # rather than generated a second time.
                content.update({
                    'type': 'module_with_labware',
                    'name': f"{module_name}",
                    'details': (
                        f"Module: {module_type}<br>ID: {module_id}<br><br>"
                        f"Labware: {labware_name}<br>ID: {labware_id}"
                    ),
                    'color': '#e8f5e8',
                })
            else:
                # Module only
                content.update({
                    'type': 'module',
                    'name': module_name,
                    'details': f"Module: {module_type}<br>ID: {module_id}",
                    'color': '#f3e5f5',
                })

        return content

    def get_pipette_info():
        """Describe each loaded pipette and the slots holding its tip racks."""
        pipettes = []
        for mount, pipette_data in self.loaded_instruments.items():
            pipette_name = pipette_data.get('name', 'Unknown Pipette')
            pipette_id = pipette_data.get('pipette_id', 'Unknown ID')
            tip_racks = pipette_data.get('tip_racks', [])

            # Find which slots contain the tip racks
            tip_rack_slots = [
                slot
                for slot, (labware_id, _, _) in self.loaded_labware.items()
                if labware_id in tip_racks
            ]

            pipettes.append({
                'mount': mount.title(),
                'name': pipette_name.replace('_', ' ').title(),
                'id': pipette_id,
                'tip_racks': tip_rack_slots,
            })
        return pipettes

    # Static page head: stylesheet, title and legend. Plain (non-f) strings,
    # so the CSS braces are literal.
    page_head = (
        ' <!DOCTYPE html> <html> <head> <style> '
        'body { font-family: Arial, sans-serif; margin: 20px; background-color: #fafafa; } '
        '.deck-container { max-width: 900px; margin: 0 auto; background: white; '
        'border-radius: 10px; padding: 20px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); } '
        '.deck-title { text-align: center; color: #333; margin-bottom: 20px; '
        'font-size: 24px; font-weight: bold; } '
        '.deck-grid { display: grid; grid-template-columns: repeat(3, 1fr); gap: 15px; '
        'margin-bottom: 20px; } '
        '.deck-slot { border: 2px solid #ddd; border-radius: 8px; padding: 12px; '
        'text-align: center; display: flex; flex-direction: column; '
        'justify-content: flex-start; align-items: center; position: relative; '
        'transition: transform 0.2s; min-height: 180px; } '
        '.deck-slot:hover { transform: translateY(-2px); '
        'box-shadow: 0 4px 12px rgba(0,0,0,0.15); } '
        '.slot-number { position: absolute; top: 5px; left: 8px; font-weight: bold; '
        'font-size: 14px; color: #666; background: rgba(255,255,255,0.8); '
        'padding: 2px 4px; border-radius: 3px; } '
        '.slot-content { font-size: 13px; font-weight: bold; margin: 15px 0 8px 0; '
        'text-align: center; line-height: 1.2; } '
        '.well-layout { flex-grow: 1; display: flex; justify-content: center; '
        'align-items: center; margin: 5px 0; } '
        '.well-info { font-size: 10px; color: #666; margin: 5px 0; font-style: italic; } '
        '.slot-details { font-size: 9px; color: #666; text-align: center; '
        'margin-top: auto; padding-top: 5px; border-top: 1px solid rgba(0,0,0,0.1); '
        'width: 100%; } '
        '.pipettes-section { margin-top: 20px; padding: 15px; background: #f8f9fa; '
        'border-radius: 8px; } '
        '.pipettes-title { font-size: 18px; font-weight: bold; margin-bottom: 10px; '
        'color: #333; } '
        '.pipette-item { background: white; padding: 10px; margin: 5px 0; '
        'border-radius: 5px; border-left: 4px solid #2196f3; } '
        '.trash-slot { background: #ffebee !important; '
        'border-color: #e57373 !important; } '
        '.legend { display: flex; justify-content: center; gap: 20px; margin: 15px 0; '
        'font-size: 12px; } '
        '.legend-item { display: flex; align-items: center; gap: 5px; } '
        '.legend-color { width: 12px; height: 12px; border-radius: 2px; } '
        'svg circle:hover, svg rect:hover { stroke-width: 2 !important; '
        'stroke: #ff5722 !important; } '
        '</style> </head> <body> <div class="deck-container"> '
        # U+1F9EA TEST TUBE
        '<div class="deck-title">\U0001F9EA Opentrons OT-2 Deck Layout</div> '
        '<div class="legend"> '
        '<div class="legend-item"> '
        '<div class="legend-color" style="background: #4caf50;"></div> '
        '<span>Available Tips</span> </div> '
        '<div class="legend-item"> '
        '<div class="legend-color" style="background: #f44336;"></div> '
        '<span>Used Tips</span> </div> '
        '<div class="legend-item"> '
        '<div class="legend-color" style="background: #42a5f5;"></div> '
        '<span>Plate Wells</span> </div> '
        '<div class="legend-item"> '
        '<div class="legend-color" style="background: #66bb6a;"></div> '
        '<span>Reservoir Wells</span> </div> '
        '</div> <div class="deck-grid"> '
    )

    trash_card = (
        ' <div class="deck-slot trash-slot"> '
        '<div class="slot-number">Trash</div> '
        # U+1F5D1 WASTEBASKET + U+FE0F emoji presentation selector
        '<div class="slot-content">\U0001F5D1\uFE0F Waste</div> '
        '<div class="well-layout"> <svg width="60" height="45"> '
        '<rect x="10" y="10" width="40" height="25" fill="#f44336" stroke="#333" '
        'stroke-width="1" rx="3"/> '
        '<text x="30" y="25" text-anchor="middle" font-size="8" fill="white">TRASH</text> '
        '</svg> </div> '
        '<div class="slot-details">Fixed trash bin</div> </div> '
    )

    no_wells_placeholder = (
        '<div style="height: 90px; display: flex; align-items: center; '
        'justify-content: center; color: #ccc; font-style: italic;">No wells</div>'
    )

    # Fragments are collected and joined once at the end
    parts = [page_head]

    # Generate deck slots
    for row in slot_layout:
        for slot in row:
            if slot == "Trash":
                parts.append(trash_card)
                continue

            content = get_slot_content(slot)
            svg_content = content['svg'] if content['svg'] else no_wells_placeholder
            well_info_display = (
                f'<div class="well-info">{content["well_info"]}</div>'
                if content['well_info']
                else ''
            )
            slot_color = content['color']
            slot_name = content['name']
            slot_details = content['details']

            parts.append(
                f' <div class="deck-slot" style="background-color: {slot_color};"> '
                f'<div class="slot-number">{slot}</div> '
                f'<div class="slot-content">{slot_name}</div> '
                f'<div class="well-layout">{svg_content}</div> '
                f'{well_info_display} '
                f'<div class="slot-details">{slot_details}</div> </div> '
            )

    parts.append(' </div> ')  # closes .deck-grid

    # Add pipettes section
    pipettes = get_pipette_info()
    if pipettes:
        parts.append(
            ' <div class="pipettes-section"> '
            # U+1F527 WRENCH
            '<div class="pipettes-title">\U0001F527 Loaded Pipettes</div> '
        )
        for pipette in pipettes:
            tip_rack_text = (
                f"Tip racks in slots: {', '.join(pipette['tip_racks'])}"
                if pipette['tip_racks']
                else "No tip racks assigned"
            )
            mount_label = pipette['mount']
            pipette_label = pipette['name']
            pipette_ident = pipette['id']
            parts.append(
                f' <div class="pipette-item"> '
                f'<strong>{mount_label} Mount:</strong> {pipette_label}<br> '
                f'<small>ID: {pipette_ident}</small><br> '
                f'<small>{tip_rack_text}</small> </div> '
            )
        parts.append(' </div> ')

    parts.append(
        ' <div style="margin-top: 15px; padding: 10px; background: #e3f2fd; '
        'border-radius: 5px; font-size: 11px; color: #1565c0;"> '
        # U+1F4A1 LIGHT BULB, U+1F7E2 GREEN CIRCLE, U+1F534 RED CIRCLE
        '\U0001F4A1 <strong>Tip:</strong> Hover over wells to see names. '
        'For tipracks: \U0001F7E2 = Available tips, \U0001F534 = Used tips '
        '</div> </div> </body> </html> '
    )

    return "".join(parts)
# Alternative compact version with well layouts
@Driver.unqueued(render_hint='html')
def visualize_deck_simple(self, **kwargs):
    """
    Generate a simple HTML snippet for OT-2 deck visualization with well layouts.

    Each deck slot becomes one grid cell showing the labware and/or module
    name and, for labware with wells, a small grid-style SVG. A one-line
    pipette summary follows when any instruments are loaded.

    Returns:
        str: HTML snippet for deck visualization
    """
    deck_rows = [
        [10, 11, "Trash"],
        [7, 8, 9],
        [4, 5, 6],
        [1, 2, 3],
    ]

    def generate_mini_well_svg(labware_data, size=50, labware_uuid=None):
        """Generate compact SVG for well layout."""
        if not labware_data:
            return ""

        definition = labware_data.get('definition', {})
        wells = definition.get('wells', {})
        if not wells:
            return ""

        # Tiprack detection: by display category or by load name
        labware_type = definition.get('metadata', {}).get('displayCategory', 'default')
        is_tiprack = (
            labware_type == 'tipRack'
            or 'tiprack' in definition.get('parameters', {}).get('loadName', '').lower()
        )

        # Wells of this labware that still hold a tip, across all mounts
        available_tips_for_labware = set()
        if is_tiprack and labware_uuid and hasattr(self, 'available_tips'):
            for mount_tips in self.available_tips.values():
                available_tips_for_labware.update(
                    well for rack_uuid, well in mount_tips if rack_uuid == labware_uuid
                )

        # Simple grid representation for compact view
        well_count = len(wells)
        if well_count <= 8:
            # Single row
            cols, rows = well_count, 1
        elif well_count <= 24:
            # 2-4 rows
            cols, rows = 6, (well_count + 5) // 6
        elif well_count <= 96:
            # Standard 96-well format
            cols, rows = 12, 8
        else:
            cols, rows = 12, (well_count + 11) // 12

        cell_width = size / max(cols, 6)
        cell_height = size / max(rows, 4)
        dot_radius = min(cell_width, cell_height) / 3

        # Color based on labware type and tip availability
        colors = {
            'tipRack_available': '#4caf50',
            'tipRack_used': '#f44336',
            'tipRack': '#ffa726',
            'wellPlate': '#42a5f5',
            'reservoir': '#66bb6a',
        }

        dots = []
        drawn_names = list(wells.keys())[:min(well_count, rows * cols)]
        for index, well_name in enumerate(drawn_names):
            # Wells are laid out column by column
            col, row = divmod(index, rows)
            x = col * cell_width + cell_width / 4
            y = row * cell_height + cell_height / 4

            # Determine color for tipracks based on availability
            if is_tiprack and labware_uuid:
                if well_name in available_tips_for_labware:
                    color, status = colors['tipRack_available'], "Available"
                else:
                    color, status = colors['tipRack_used'], "Used"
                tooltip = f"{well_name} - {status}"
            else:
                color = colors.get(labware_type, '#90a4ae')
                tooltip = well_name

            dots.append(
                f'<circle cx="{x:.1f}" cy="{y:.1f}" r="{dot_radius:.1f}" '
                f'fill="{color}" stroke="#333" stroke-width="0.3">'
                f'<title>{tooltip}</title></circle>'
            )

        joined_dots = "".join(dots)
        return (
            f'<svg width="{size}" height="{size}" '
            f'style="border: 1px solid #ddd; border-radius: 3px;">{joined_dots}</svg>'
        )

    def get_slot_info(slot_num):
        """Collect name, type, color and mini SVG for one deck slot."""
        if slot_num == "Trash":
            return {"name": "Trash", "type": "trash", "color": "#ffcdd2", "svg": ""}

        slot_key = str(slot_num)
        info = {"name": "Empty", "type": "empty", "color": "#f5f5f5", "svg": ""}

        # Check for labware
        if slot_key in self.loaded_labware:
            labware_id, labware_type, labware_data = self.loaded_labware[slot_key]
            metadata = labware_data.get('definition', {}).get('metadata', {})
            display_name = metadata.get('displayName', labware_type)
            # Names longer than 20 characters are cut and marked with "..."
            suffix = "..." if len(display_name) > 20 else ""

            info["name"] = display_name[:20] + suffix
            info["type"] = "labware"
            info["color"] = "#bbdefb"
            info["svg"] = generate_mini_well_svg(labware_data, labware_uuid=labware_id)

        # Check for modules
        if slot_key in self.loaded_modules:
            module_id, module_type = self.loaded_modules[slot_key]
            module_name = module_type.replace('ModuleV1', '').replace('Module', ' Mod')

            if info["type"] == "labware":
                info["name"] = f"{module_name}<br><small>{info['name']}</small>"
                info["color"] = "#c8e6c9"
            else:
                info["name"] = module_name
                info["type"] = "module"
                info["color"] = "#e1bee7"

        return info

    # Fragments are collected and joined once at the end
    fragments = [
        '<div style="display: grid; grid-template-columns: repeat(3, 1fr); '
        'gap: 10px; max-width: 650px; font-family: Arial, sans-serif;">'
    ]

    for deck_row in deck_rows:
        for slot in deck_row:
            info = get_slot_info(slot)
            slot_label = "T" if slot == "Trash" else str(slot)
            if info["svg"]:
                svg_display = f'<div style="margin: 5px 0;">{info["svg"]}</div>'
            else:
                svg_display = ""
            cell_color = info['color']
            cell_name = info['name']

            fragments.append(
                f' <div style=" background: {cell_color}; border: 1px solid #ccc; '
                'border-radius: 6px; padding: 8px; text-align: center; '
                'min-height: 100px; display: flex; flex-direction: column; '
                'justify-content: flex-start; align-items: center; '
                'position: relative; font-size: 11px; "> '
                '<div style="position: absolute; top: 2px; left: 4px; '
                'font-weight: bold; font-size: 10px; '
                'background: rgba(255,255,255,0.8); padding: 1px 3px; '
                'border-radius: 2px;"> '
                f'{slot_label} </div> '
                '<div style="margin: 12px 0 5px 0; font-weight: 500; line-height: 1.1;"> '
                f'{cell_name} </div> '
                f'{svg_display} </div> '
            )

    fragments.append('</div>')

    # Add pipette summary
    if hasattr(self, 'loaded_instruments') and self.loaded_instruments:
        pipette_summary = [
            f"{mount.title()}: {data.get('name', 'Unknown').replace('_', ' ').title()}"
            for mount, data in self.loaded_instruments.items()
        ]
        summary_text = ' | '.join(pipette_summary)
        fragments.append(
            ' <div style="margin-top: 10px; padding: 8px; background: #f0f0f0; '
            'border-radius: 4px; font-size: 12px;"> '
            f'<strong>Pipettes:</strong> {summary_text} </div> '
        )

    return "".join(fragments)
# Script entry point: only runs when this file is executed directly.
if __name__ == "__main__":
    # Star-import the shared launcher module.
    # NOTE(review): presumably the launcher's import-time code is what starts
    # the driver's server, which is why nothing is called here - confirm.
    from AFL.automation.shared.launcher import *