import warnings
import time
import threading
from typing import List, Union, Dict, Any
from AFL.automation.mixing.MassBalance import MassBalance
from AFL.automation.mixing.MassBalanceDriver import MassBalanceDriver
from AFL.automation.APIServer.Driver import Driver
from AFL.automation.shared.utilities import listify
from AFL.automation.shared.mock_eic_client import MockEICClient
from AFL.automation.shared.PersistentConfig import PersistentConfig
import lazy_loader as lazy
epics = lazy.load("epics", require="AFL-automation[neutron-scattering]")
# Global list of driver commands to inject as no-ops.
NOOP_COMMANDS = [
'transfer',
'transfer_to_catch',
'loadSample',
'advanceSample',
'calibrate_sensor',
'home',
'rinseCell',
]
def _noop(*args, **kwargs):
"""Default no-op handler for injected commands."""
return None
def _inject_noop_methods(cls, command_names):
"""Inject no-op methods for any missing command names on the class."""
for name in command_names:
if not hasattr(cls, name):
setattr(cls, name, _noop)
# Optional import for EIC client
try:
from eic_client.EICClient import EICClient
except ImportError:
EICClient = None
[docs]
class BioSANSPrepare(MassBalanceDriver):
defaults = {
'mixing_locations': [],
'catch_volume': '10 ul',
'exposure': 600,
'cfenable_timeout_s': 1800.0,
'stocks': [],
'fixed_compositions': {},
'eic_token': '1',
'ipts_number': '1234',
'beamline': 'CG3',
'mock_mode': False,
}
[docs]
def __init__(self, overrides=None):
MassBalanceDriver.__init__(self, overrides=overrides,)
self.name = 'BioSANSPrepare'
self.filepath = self.path / (self.name + '.config.json')
self.config = PersistentConfig(
path=self.filepath,
defaults=self.gather_defaults(),
overrides=overrides,
max_history=100,
max_history_size_mb=50,
write_debounce_seconds=0.5,
compact_json=True,
)
self._client = None
self.last_scan_id = None
self.stock_pv_map = {}
self.stocks = self.config.get('stocks', [])
self.targets = []
self.process_stocks()
[docs]
def status(self):
"""
Get the status of the BioSANSPrepareDriver.
"""
status = []
status.append(f'AFL Server Stocks: {self.config["stocks"]}')
status.append(f'BioSANS Stock PVs: {list(self.stock_pv_map.keys())}')
status.append(f'{len(self.config["mixing_locations"])} mixing locations left')
return status
[docs]
def is_feasible(self, targets: dict | list[dict]) -> list[dict | None]:
"""
Check if the target composition(s) is/are feasible for preparation using mass balance.
If feasible, returns the balanced target solution dictionary. Otherwise, returns None.
This implementation creates a local MassBalance instance for each feasibility check
to avoid modifying the driver's state.
Parameters
----------
targets : Union[dict, List[dict]]
Either a single target dictionary or a list of target dictionaries.
Returns
-------
List[Union[dict, None]]
A list containing the balanced target dictionary for each feasible target,
or None for infeasible targets.
"""
targets_to_check = listify(targets)
# Always process stocks from config to avoid stale in-memory state.
self.process_stocks()
# Get the minimum volume configuration
minimum_volume = self.config.get('minimum_volume', '100 ul')
results = []
for target in targets_to_check:
try:
# Create a local MassBalance instance
mb = MassBalance(minimum_volume=minimum_volume)
# Configure the same stocks as in the driver
for stock in self.stocks:
mb.stocks.append(stock)
# Apply any fixed compositions from config
target_with_fixed = self.apply_fixed_comps(target.copy())
# Create a Solution from the target and add it to the MassBalance instance
from AFL.automation.mixing.Solution import Solution
target_solution = Solution(**target_with_fixed)
mb.targets.append(target_solution)
# Calculate mass balance
mb.balance(tol=self.config.get('tol', 1e-3))
# Check if balance was successful for this target
if (mb.balanced and
len(mb.balanced) > 0 and
mb.balanced[0].get('balanced_target') is not None):
results.append(mb.balanced[0]['balanced_target'].to_dict())
else:
results.append(None)
except Exception as e:
# If an exception occurs, indicate failure
warnings.warn(f"Exception during feasibility check for target {target.get('name', 'Unnamed')}: {str(e)}", stacklevel=2)
results.append(None)
return results
[docs]
def apply_fixed_comps(self, target: dict) -> dict:
"""
Apply fixed compositions to a target dictionary without overwriting existing values.
Parameters
----------
target : dict
The target solution dictionary
Returns
-------
dict
A new target dictionary with fixed compositions applied
"""
# Create a copy to avoid modifying the original
result = target.copy()
# Get fixed compositions from config
fixed_comps = self.config.get('fixed_compositions', {})
if not fixed_comps:
return result
# For each component property type that might exist in the target
for prop_type in ['masses', 'volumes', 'concentrations', 'mass_fractions']:
# Initialize property dictionaries if they don't exist
if prop_type not in result:
result[prop_type] = {}
# If this property exists in fixed compositions
if prop_type in fixed_comps:
# Add each component from fixed compositions that doesn't already exist
for comp_name, comp_value in fixed_comps[prop_type].items():
if comp_name not in result[prop_type]:
result[prop_type][comp_name] = comp_value
# Handle simpler properties that might not be dictionaries
for prop in ['total_mass', 'total_volume', 'name', 'location']:
if prop in fixed_comps and prop not in result:
result[prop] = fixed_comps[prop]
# Handle solutes list
if 'solutes' in fixed_comps:
if 'solutes' not in result:
result['solutes'] = fixed_comps['solutes'].copy()
else:
# Add any solutes that aren't already in the list
for solute in fixed_comps['solutes']:
if solute not in result['solutes']:
result['solutes'].append(solute)
return result
[docs]
def prepare(self, target: dict, dest: str | None = None) -> tuple[dict, str] | tuple[None, None]:
"""Prepare the target solution. The dest argument is currently not used by this implementation."""
# Apply fixed compositions without overwriting existing values
target = self.apply_fixed_comps(target)
# Check if the target is feasible before attempting preparation
feasibility_results = self.is_feasible(target)
if not feasibility_results or feasibility_results[0] is None:
warnings.warn(f'Target composition {target.get("name", "Unnamed target")} is not feasible based on mass balance calculations', stacklevel=2)
return None, None
balanced_target_dict_from_feasible = feasibility_results[0]
if not self.config['mock_mode']:
timeout_s = float(self.config.get('cfenable_timeout_s', 1800.0))
self._wait_for_cfenable_cycle(timeout_s=timeout_s)
self.reset_targets()
# We need to re-add the original target, not the dict from is_feasible
self.add_target(target)
self.balance()
if not self.balanced or not self.balanced[0].get('balanced_target'):
warnings.warn(f'No suitable mass balance found for target: {target.get("name", "Unnamed target")}',stacklevel=2)
return None, None
# This is the Solution object containing the protocol
balanced_target_solution_object = self.balanced[0]['balanced_target']
self.make_stock_pv_map()
# Configure the destination for the preparation
if not self.config.get('mixing_locations'):
raise ValueError("No mixing locations configured. Cannot select a destination PV.")
# Pop the PV name that will be used to select the specific mixing destination/station
if dest is None:
# need to pop and then resend the locations list so that the persistant config triggers a write
mixing_locations = self.config['mixing_locations']
destination = mixing_locations.pop(0)
self.config['mixing_locations'] = mixing_locations
popped_destination = True
else:
destination = dest
popped_destination = False
try:
# Build stock volumes dict defaulting all 8 slots to 0
stock_volumes = {f'CG3:SE:CMP:S{i}Vol': 0 for i in range(1, 9)}
# Fill non-zero values from balanced_target protocol using stock_pv_map
for pipette_action in balanced_target_solution_object.protocol:
source_stock_name = pipette_action.source
if source_stock_name not in self.stock_pv_map:
raise ValueError(
f"Stock PV for '{source_stock_name}' not found in stock_pv_map. "
f"Available stocks in map: {list(self.stock_pv_map.keys())}"
)
stock_volumes[self.stock_pv_map[source_stock_name]] = pipette_action.volume
catch_volume = self.config.get('catch_volume')
if catch_volume is None:
raise ValueError("Catch volume ('catch_volume') is not configured.")
target_name = target.get('name', 'Unnamed target')
headers = [
'Title',
'CG3:SE:CMP:S1Vol', 'CG3:SE:CMP:S2Vol', 'CG3:SE:CMP:S3Vol', 'CG3:SE:CMP:S4Vol',
'CG3:SE:CMP:S5Vol', 'CG3:SE:CMP:S6Vol', 'CG3:SE:CMP:S7Vol', 'CG3:SE:CMP:S8Vol',
'CG3:SE:URMPI:143', 'CG3:SE:URPI:MixFinalVolume', 'CG3:SE:URPI:ChangeTips',
'RobotProcess', 'URMPI147Wait', 'Delay',
'CG3:Mot:AttnPos:Menu',
'Wait For', 'Value',
'CG3:Mot:AttnPos:Menu',
]
row = [
target_name,
stock_volumes['CG3:SE:CMP:S1Vol'], stock_volumes['CG3:SE:CMP:S2Vol'],
stock_volumes['CG3:SE:CMP:S3Vol'], stock_volumes['CG3:SE:CMP:S4Vol'],
stock_volumes['CG3:SE:CMP:S5Vol'], stock_volumes['CG3:SE:CMP:S6Vol'],
stock_volumes['CG3:SE:CMP:S7Vol'], stock_volumes['CG3:SE:CMP:S8Vol'],
destination, catch_volume, 0,
1, 1, 5,
2,
'seconds', self.config['exposure'],
6
]
success, self.last_scan_id, response_data = self.client.submit_table_scan(
parms={
'run_mode': 0,
'headers': headers,
'rows': [row],
},
desc=f'AFL prepare table scan for {target_name}',
simulate_only=False,
)
if not success:
raise RuntimeError(f'Error in EIC table scan: {response_data}')
self.blockForTableScan()
# Submit rinse/MT-cell scan immediately after measurement, but don't block
# rinse_headers = [
# 'Title', 'CG3:SE:URMPI:Mom144', 'CG3:SE:CMP:CFEnable', 'Delay',
# 'URMPI145Wait', 'CG3:SE:CMP:CFEnable', 'Wait For', 'Value',
# ]
# rinse_row = ['Clean cell and measure MT cell', 1, 1, 5, 1, 0, 'seconds', 10]
# Submit rinse/MT-cell scan immediately after measurement, but don't block
rinse_headers = [
'Title', 'CG3:SE:URMPI:Mom144', 'CG3:SE:CMP:CFEnable', 'Delay',
'URMPI145Wait', 'CG3:SE:CMP:CFEnable'
]
rinse_row = ['Clean cell and measure MT cell', 1, 1, 5, 1, 0]
self.client.submit_table_scan(
parms={
'run_mode': 0,
'headers': rinse_headers,
'rows': [rinse_row],
},
desc='AFL rinse cell table scan',
simulate_only=False,
)
except Exception:
if popped_destination:
mixing_locations = self.config['mixing_locations']
mixing_locations.insert(0, destination)
self.config['mixing_locations'] = mixing_locations
raise
return balanced_target_dict_from_feasible, destination
[docs]
def blockForTableScan(self):
"""Block until the last submitted table scan is complete."""
status_success, is_done, state, status_response_data = self.client.get_scan_status(self.last_scan_id)
while not is_done:
time.sleep(0.1)
status_success, is_done, state, status_response_data = self.client.get_scan_status(self.last_scan_id)
def _wait_for_cfenable_cycle(self, timeout_s: float) -> None:
pv_name = "CG3:SE:CMP:CFEnable"
start_wait = time.monotonic()
low_event = threading.Event()
def _on_change(value=None, **_kwargs):
try:
v = int(value)
except Exception:
return
if v == 0:
low_event.set()
pv = None
callback_index = None
try:
pv = epics.PV(pv_name)
callback_index = pv.add_callback(_on_change)
try:
current = pv.get()
except Exception as e:
raise RuntimeError(f"Failed to read {pv_name}: {e}")
try:
current_int = int(current)
except Exception:
current_int = None
elapsed = time.monotonic() - start_wait
remaining = max(0.0, timeout_s - elapsed)
if remaining == 0.0:
raise TimeoutError(f"Timed out waiting for {pv_name} cycle after {timeout_s} seconds")
if current_int == 1:
# Already high: wait for it to drop to 0
if not low_event.wait(timeout=remaining):
raise TimeoutError(f"Timed out waiting for {pv_name} to drop after {timeout_s} seconds")
return
if current_int == 0:
return
raise RuntimeError(f"Unexpected value for {pv_name}: {current}")
finally:
if pv is not None:
if callback_index is not None:
try:
pv.remove_callback(callback_index)
except Exception:
pass
try:
pv.disconnect()
except Exception:
pass
[docs]
def reset(self):
"""Reset the driver state/configuration."""
# Placeholder: implement reset logic
self.reset_targets()
self.reset_stocks()
self.stocks = []
self.targets = []
@property
def client(self):
"""
Property that returns the EIC client instance.
If the client doesn't exist yet, it instantiates a new EICClient
using the token and beamline from the configuration.
Returns
-------
EICClient
The client instance for communicating with the instrument.
"""
if self._client is None:
if self.config['mock_mode']:
self._client = MockEICClient(
ipts_number=str(self.config['ipts_number']),
eic_token=self.config['eic_token'],
beamline=self.config['beamline']
)
else:
if EICClient is None:
raise ImportError("eic_client is not available and mock_mode is False")
self._client = EICClient(
ipts_number=str(self.config['ipts_number']),
eic_token=self.config['eic_token'],
beamline=self.config['beamline']
)
return self._client
[docs]
def make_stock_pv_map(self):
"""
Make a map of the stock locations to the stock solutions.
"""
self.stock_pv_map = {}
self.process_stocks()
num_stocks = 8
desc_to_pv = {}
for i in range(num_stocks):
pv_name = f'CG3:SE:CMP:S{i+1}Vol'
stock_desc = str(self.get_pv(pv_name + '.DESC')).strip()
if stock_desc:
desc_to_pv[stock_desc] = pv_name
self.stock_pv_map[stock_desc] = pv_name
for stock in self.stocks:
pv_name = desc_to_pv.get(stock.name)
if pv_name is None:
continue
if stock.location is not None:
self.stock_pv_map[str(stock.location)] = pv_name
self.stock_pv_map[str(stock.name)] = pv_name
[docs]
def set_pv(self, pv_name, value,timeout=10,wait=True):
success_set, response_data_set = self.client.set_pv(pv_name, value,timeout,wait)
if not success_set:
raise ValueError(f'Failed to set PV {pv_name}')
[docs]
def get_pv(self, pv_name, timeout=10):
success_get, pv_value_read, response_data_get = self.client.get_pv(pv_name, timeout)
if not success_get:
raise ValueError(f'Failed to get PV {pv_name}: {response_data_get}')
return pv_value_read
_inject_noop_methods(BioSANSPrepare, NOOP_COMMANDS)
_DEFAULT_PORT=5002
if __name__ == '__main__':
from AFL.automation.shared.launcher import *