Source code for AFL.automation.prepare.BioSANSPrepare

import threading
import time
from typing import Dict

import lazy_loader as lazy

from AFL.automation.prepare.PrepareDriver import PrepareDriver
from AFL.automation.shared.mock_eic_client import MockEICClient

epics = lazy.load("epics", require="AFL-automation[neutron-scattering]")


# Global list of driver commands to inject as no-ops.
NOOP_COMMANDS = [
    "transfer",
    "transfer_to_catch",
    "loadSample",
    "advanceSample",
    "calibrate_sensor",
    "home",
    "rinseCell",
]


def _noop(*args, **kwargs):
    """Default no-op handler for injected commands."""
    return None


def _inject_noop_methods(cls, command_names):
    """Inject no-op methods for any missing command names on the class."""
    for name in command_names:
        if not hasattr(cls, name):
            setattr(cls, name, _noop)


try:
    from eic_client.EICClient import EICClient
except ImportError:
    EICClient = None


[docs] class BioSANSPrepare(PrepareDriver): defaults = { "mixing_locations": [], "catch_volume": "10 ul", "exposure": 600, "cfenable_timeout_s": 1800.0, "stocks": [], "fixed_compositions": {}, "eic_token": "1", "ipts_number": "1234", "beamline": "CG3", "mock_mode": False, }
[docs] def __init__(self, overrides=None): PrepareDriver.__init__(self, driver_name="BioSANSPrepare", overrides=overrides) self._client = None self.last_scan_id = None self.stock_pv_map = {} self._popped_destination = None
def _status_lines(self): status = [] status.append(f"BioSANS Stock PVs: {list(self.stock_pv_map.keys())}") status.append(f"{len(self.config['mixing_locations'])} mixing locations left") return status
[docs] def before_balance(self, target: dict) -> None: if not self.config["mock_mode"]: timeout_s = float(self.config.get("cfenable_timeout_s", 1800.0)) self._wait_for_cfenable_cycle(timeout_s=timeout_s)
[docs] def resolve_destination(self, dest): self._popped_destination = None if dest is not None: return dest if not self.config.get("mixing_locations"): raise ValueError("No mixing locations configured. Cannot select a destination PV.") mixing_locations = self.config["mixing_locations"] destination = mixing_locations.pop(0) self.config["mixing_locations"] = mixing_locations self._popped_destination = destination return destination
[docs] def on_prepare_exception(self, destination, dest_was_none): if dest_was_none and self._popped_destination is not None: mixing_locations = self.config["mixing_locations"] mixing_locations.insert(0, self._popped_destination) self.config["mixing_locations"] = mixing_locations self._popped_destination = None
[docs] def execute_preparation(self, target: dict, balanced_target, destination: str) -> bool: self.make_stock_pv_map() stock_volumes = {f"CG3:SE:CMP:S{i}Vol": 0 for i in range(1, 9)} for pipette_action in balanced_target.protocol: source_stock_name = pipette_action.source if source_stock_name not in self.stock_pv_map: raise ValueError( f"Stock PV for '{source_stock_name}' not found in stock_pv_map. " f"Available stocks in map: {list(self.stock_pv_map.keys())}" ) stock_volumes[self.stock_pv_map[source_stock_name]] = pipette_action.volume catch_volume = self.config.get("catch_volume") if catch_volume is None: raise ValueError("Catch volume ('catch_volume') is not configured.") target_name = target.get("name", "Unnamed target") headers = [ "Title", "CG3:SE:CMP:S1Vol", "CG3:SE:CMP:S2Vol", "CG3:SE:CMP:S3Vol", "CG3:SE:CMP:S4Vol", "CG3:SE:CMP:S5Vol", "CG3:SE:CMP:S6Vol", "CG3:SE:CMP:S7Vol", "CG3:SE:CMP:S8Vol", "CG3:SE:URMPI:143", "CG3:SE:URPI:MixFinalVolume", "CG3:SE:URPI:ChangeTips", "RobotProcess", "URMPI147Wait", "Delay", "CG3:Mot:AttnPos:Menu", "Wait For", "Value", "CG3:Mot:AttnPos:Menu", ] row = [ target_name, stock_volumes["CG3:SE:CMP:S1Vol"], stock_volumes["CG3:SE:CMP:S2Vol"], stock_volumes["CG3:SE:CMP:S3Vol"], stock_volumes["CG3:SE:CMP:S4Vol"], stock_volumes["CG3:SE:CMP:S5Vol"], stock_volumes["CG3:SE:CMP:S6Vol"], stock_volumes["CG3:SE:CMP:S7Vol"], stock_volumes["CG3:SE:CMP:S8Vol"], destination, catch_volume, 0, 1, 1, 5, 2, "seconds", self.config["exposure"], 6, ] success, self.last_scan_id, response_data = self.client.submit_table_scan( parms={"run_mode": 0, "headers": headers, "rows": [row]}, desc=f"AFL prepare table scan for {target_name}", simulate_only=False, ) if not success: raise RuntimeError(f"Error in EIC table scan: {response_data}") self.blockForTableScan() rinse_headers = [ "Title", "CG3:SE:URMPI:Mom144", "CG3:SE:CMP:CFEnable", "Delay", "URMPI145Wait", "CG3:SE:CMP:CFEnable", ] rinse_row = ["Clean cell and measure MT cell", 1, 1, 5, 1, 0] self.client.submit_table_scan( parms={"run_mode": 0, "headers": rinse_headers, "rows": [rinse_row]}, desc="AFL rinse cell table scan", simulate_only=False, ) return True
[docs] def blockForTableScan(self): """Block until the last submitted table scan is complete.""" status_success, is_done, state, status_response_data = self.client.get_scan_status(self.last_scan_id) while not is_done: time.sleep(0.1) status_success, is_done, state, status_response_data = self.client.get_scan_status(self.last_scan_id)
def _wait_for_cfenable_cycle(self, timeout_s: float) -> None: pv_name = "CG3:SE:CMP:CFEnable" start_wait = time.monotonic() low_event = threading.Event() def _on_change(value=None, **_kwargs): try: v = int(value) except Exception: return if v == 0: low_event.set() pv = None callback_index = None try: pv = epics.PV(pv_name) callback_index = pv.add_callback(_on_change) try: current = pv.get() except Exception as e: raise RuntimeError(f"Failed to read {pv_name}: {e}") try: current_int = int(current) except Exception: current_int = None elapsed = time.monotonic() - start_wait remaining = max(0.0, timeout_s - elapsed) if remaining == 0.0: raise TimeoutError(f"Timed out waiting for {pv_name} cycle after {timeout_s} seconds") if current_int == 1: if not low_event.wait(timeout=remaining): raise TimeoutError(f"Timed out waiting for {pv_name} to drop after {timeout_s} seconds") return if current_int == 0: return raise RuntimeError(f"Unexpected value for {pv_name}: {current}") finally: if pv is not None: if callback_index is not None: try: pv.remove_callback(callback_index) except Exception: pass try: pv.disconnect() except Exception: pass
[docs] def reset(self): self.reset_targets() self.reset_stocks() self.stocks = [] self.targets = []
@property def client(self): if self._client is None: if self.config["mock_mode"]: self._client = MockEICClient( ipts_number=str(self.config["ipts_number"]), eic_token=self.config["eic_token"], beamline=self.config["beamline"], ) else: if EICClient is None: raise ImportError("eic_client is not available and mock_mode is False") self._client = EICClient( ipts_number=str(self.config["ipts_number"]), eic_token=self.config["eic_token"], beamline=self.config["beamline"], ) return self._client
[docs] def make_stock_pv_map(self): self.stock_pv_map = {} self.process_stocks() desc_to_pv: Dict[str, str] = {} for i in range(8): pv_name = f"CG3:SE:CMP:S{i+1}Vol" stock_desc = str(self.get_pv(pv_name + ".DESC")).strip() if stock_desc: desc_to_pv[stock_desc] = pv_name self.stock_pv_map[stock_desc] = pv_name for stock in self.stocks: pv_name = desc_to_pv.get(stock.name) if pv_name is None: continue if stock.location is not None: self.stock_pv_map[str(stock.location)] = pv_name self.stock_pv_map[str(stock.name)] = pv_name
[docs] def set_pv(self, pv_name, value, timeout=10, wait=True): success_set, response_data_set = self.client.set_pv(pv_name, value, timeout, wait) if not success_set: raise ValueError(f"Failed to set PV {pv_name}")
[docs] def get_pv(self, pv_name, timeout=10): success_get, pv_value_read, response_data_get = self.client.get_pv(pv_name, timeout) if not success_get: raise ValueError(f"Failed to get PV {pv_name}: {response_data_get}") return pv_value_read
_inject_noop_methods(BioSANSPrepare, NOOP_COMMANDS) _DEFAULT_PORT = 5002 if __name__ == "__main__": from AFL.automation.shared.launcher import *