Source code for AFL.automation.prepare.OT2Prepare

import warnings

from AFL.automation.prepare.OT2HTTPDriver import OT2HTTPDriver
from AFL.automation.prepare.PrepareDriver import PrepareDriver


[docs] class OT2Prepare(OT2HTTPDriver, PrepareDriver): defaults = { "prep_targets": [], "prepare_volume": "900 ul", "catch_volume": "900 ul", "deck": {}, "stocks": [], "stock_mix_order": [], "fixed_compositions": {}, "stock_locations": {}, # Maps stock names to deck positions: {'stockH2O': '3A2'} "stock_transfer_params": {}, # Per-stock transfer parameters: {'stockH2O': {'mix_after': True}} "catch_protocol": {}, # PipetteAction-formatted dict for catch transfer parameters }
[docs] def __init__(self, overrides=None): OT2HTTPDriver.__init__(self, overrides=overrides) PrepareDriver.__init__(self, driver_name="OT2Prepare", overrides=overrides) self.last_target_location = None self.useful_links["View Deck"] = "/visualize_deck"
[docs] def status(self): return PrepareDriver.status(self) + OT2HTTPDriver.status(self)
def _status_lines(self): status = [] status.append(f"Stocks: {len(self.stocks)} configured") status.append(f"Stock locations: {self.config['stock_locations']}") status.append(f"{len(self.config['prep_targets'])} preparation targets available") return status
[docs] def resolve_destination(self, dest): if dest is not None: return dest if not self.config.get("prep_targets"): raise ValueError("No preparation targets configured. Cannot select a destination target.") prep_targets = self.config["prep_targets"] destination = prep_targets.pop(0) self.config["prep_targets"] = prep_targets return destination
[docs] def execute_preparation(self, target, balanced_target, destination): if not hasattr(balanced_target, "protocol") or not balanced_target.protocol: raise ValueError("No protocol generated for the target solution") protocol = self.reorder_protocol(balanced_target.protocol) for step in protocol: source = step.source volume_ul = step.volume if float(volume_ul) <= 0: continue stock_name = self.config.get("deck", {}).get(source) if stock_name is None: raise ValueError(f"No stock name found for deck location: {source}") transfer_params = self.get_transfer_params(stock_name) try: transfer_result = self.transfer( source=source, dest=destination, volume=volume_ul, **transfer_params, ) self._record_prepare_transfer( stage_type="single", source=source, dest=destination, requested_volume_ul=float(volume_ul), source_stock_name=stock_name, transfer_params=transfer_params, transfer_result=transfer_result, planned_transfer={ "source": source, "dest": destination, "source_stock_name": stock_name, }, ) except Exception as e: warnings.warn(f"Transfer failed from {source} to {destination}: {str(e)}", stacklevel=2) return False self.last_target_location = destination return True
def _resolve_stage_source(self, source_location, intermediate_map): if isinstance(source_location, str) and source_location.startswith("@intermediate:"): key = source_location.split(":", 1)[1] if key not in intermediate_map: raise ValueError(f"Unknown intermediate source token: {source_location}") return intermediate_map[key] return source_location def _record_prepare_transfer( self, stage_type, source, dest, requested_volume_ul, source_stock_name, transfer_params, transfer_result, planned_transfer=None, extra=None, ): entry = { "stage_type": stage_type, "source_location": source, "dest_location": dest, "source_stock_name": source_stock_name, "requested_volume_ul": float(requested_volume_ul), "transfer_params": dict(transfer_params or {}), "transfer_result": transfer_result, } if planned_transfer is not None: entry["planned_transfer"] = planned_transfer if extra: entry.update(extra) self._append_prepare_transfer(entry) def _transfer_stage( self, source, dest, volume_ul, stage_type, source_stock_name=None, planned_transfer=None, extra=None, ): if float(volume_ul) <= 0: return stock_name = source_stock_name if stock_name is None: stock_name = self.config.get("deck", {}).get(source) transfer_params = self.get_transfer_params(stock_name) if stock_name is not None else self.get_transfer_params("default") transfer_result = self.transfer(source=source, dest=dest, volume=volume_ul, **transfer_params) self._record_prepare_transfer( stage_type=stage_type, source=source, dest=dest, requested_volume_ul=float(volume_ul), source_stock_name=stock_name, transfer_params=transfer_params, transfer_result=transfer_result, planned_transfer=planned_transfer, extra=extra, )
[docs] def execute_preparation_plan(self, target, balanced_target, destination, procedure_plan, intermediate_destinations): intermediate_ids = procedure_plan.get("intermediate_ids", []) if len(intermediate_ids) != len(intermediate_destinations): raise ValueError( f"Intermediate destination mismatch. Need {len(intermediate_ids)}, got {len(intermediate_destinations)}." ) intermediate_map = { intermediate_id: intermediate_destinations[i] for i, intermediate_id in enumerate(intermediate_ids) } stages = procedure_plan.get("stages", []) for stage in stages: stage_type = stage.get("stage_type") if stage_type == "dilution": dest_token = stage.get("destination_token") if not isinstance(dest_token, str) or not dest_token.startswith("@intermediate:"): raise ValueError(f"Invalid dilution destination token: {dest_token}") intermediate_id = dest_token.split(":", 1)[1] if intermediate_id not in intermediate_map: raise ValueError(f"No destination assigned for intermediate '{intermediate_id}'") stage_dest = intermediate_map[intermediate_id] source_loc = self._resolve_stage_source(stage.get("source_location"), intermediate_map) diluent_loc = self._resolve_stage_source(stage.get("diluent_location"), intermediate_map) source_mass_g = float(stage.get("total_source_mass_g", 0.0)) diluent_mass_g = float(stage.get("total_diluent_mass_g", 0.0)) if source_mass_g > 0: source_stock = self.stocks_by_location(source_loc) source_volume = source_stock.measure_out(f"{source_mass_g} g").volume.to("ul").magnitude self._transfer_stage( source_loc, stage_dest, source_volume, stage_type="dilution", source_stock_name=stage.get("source_stock_name"), planned_transfer={ "required_mass_g": source_mass_g, "source_location": source_loc, "destination_token": dest_token, }, extra={ "intermediate_id": intermediate_id, "destination_token": dest_token, "dilution_factor": stage.get("dilution_factor"), "batches": stage.get("batches"), "transfer_role": "source", "intermediate_location": stage_dest, }, ) if diluent_mass_g > 0: diluent_stock = self.stocks_by_location(diluent_loc) diluent_volume = diluent_stock.measure_out(f"{diluent_mass_g} g").volume.to("ul").magnitude self._transfer_stage( diluent_loc, stage_dest, diluent_volume, stage_type="dilution", source_stock_name=stage.get("diluent_stock_name"), planned_transfer={ "required_mass_g": diluent_mass_g, "source_location": diluent_loc, "destination_token": dest_token, }, extra={ "intermediate_id": intermediate_id, "destination_token": dest_token, "dilution_factor": stage.get("dilution_factor"), "batches": stage.get("batches"), "transfer_role": "diluent", "intermediate_location": stage_dest, }, ) elif stage_type == "final_mix": for transfer in stage.get("transfers", []): source_loc = self._resolve_stage_source(transfer.get("source_location"), intermediate_map) vol_ul = float(transfer.get("required_volume_ul", 0.0)) if vol_ul <= 0: continue self._transfer_stage( source_loc, destination, vol_ul, stage_type="final_mix", source_stock_name=transfer.get("source_stock_name"), planned_transfer=transfer, extra={ "destination_location": destination, }, ) else: raise ValueError(f"Unknown stage type '{stage_type}' in procedure plan") self.last_target_location = destination return True
[docs] def stocks_by_location(self, location): for stock in self.stocks: if stock.location == location: return stock raise ValueError(f"No stock configured at location '{location}'")
[docs] def build_prepare_result(self, feasible_result, balanced_target): result_dict = balanced_target.to_dict() if hasattr(balanced_target, "volume") and balanced_target.volume is not None: result_dict["total_volume"] = f"{balanced_target.volume.to('ul').magnitude} ul" return result_dict
[docs] def process_stocks(self): PrepareDriver.process_stocks(self) self._update_deck_config()
def _update_deck_config(self): deck_config = {} stock_locations = self.config.get("stock_locations", {}) for stock_name, deck_location in stock_locations.items(): deck_config[deck_location] = stock_name self.config["deck"] = deck_config
[docs] def get_transfer_params(self, stock_name): stock_params = self.config.get("stock_transfer_params", {}).get(stock_name, {}) default_params = self.config.get("stock_transfer_params", {}).get("default", {}) params = default_params.copy() params.update(stock_params) return params
[docs] def reorder_protocol(self, protocol): stock_mix_order = self.config.get("stock_mix_order", []) if not stock_mix_order: return protocol steps_by_source = {} for step in protocol: if step.source not in steps_by_source: steps_by_source[step.source] = [] steps_by_source[step.source].append(step) reordered = [] for stock_name in stock_mix_order: if stock_name in steps_by_source: reordered.extend(steps_by_source[stock_name]) del steps_by_source[stock_name] for steps in steps_by_source.values(): reordered.extend(steps) return reordered
[docs] def transfer_to_catch(self, source=None, dest=None, **kwargs): catch_params = self.config.get("catch_protocol", {}).copy() if source is None: if self.last_target_location is None: raise ValueError( "No source specified and no last target location available. " "Call prepare() first or specify source." ) source = self.last_target_location kwargs["source"] = source if dest is not None: kwargs["dest"] = dest catch_params.update(kwargs) if "dest" not in catch_params: raise ValueError("Destination 'dest' must be specified in catch_protocol config or as an argument.") try: transfer_result = self.transfer(**catch_params) self._record_prepare_transfer( stage_type="catch", source=catch_params["source"], dest=catch_params["dest"], requested_volume_ul=float(catch_params.get("volume", 0.0)), source_stock_name=self.config.get("deck", {}).get(catch_params["source"]), transfer_params={k: v for k, v in catch_params.items() if k not in ("source", "dest", "volume")}, transfer_result=transfer_result, ) except Exception as e: dest_val = catch_params.get("dest", "unknown") warnings.warn( f"Transfer to catch failed from {source} to {dest_val} using {catch_params}: {str(e)}", stacklevel=2, ) raise
[docs] def load_gen1_p10(self, mount, tip_rack_slots, **kwargs): """Convenience wrapper for loading a GEN1 P10 single-channel pipette.""" return self.load_instrument( name="p10_single", mount=mount, tip_rack_slots=tip_rack_slots, **kwargs, )
[docs] def reset(self): self.reset_targets() self.reset_stocks()
_DEFAULT_PORT = 5002 if __name__ == "__main__": from AFL.automation.shared.launcher import *