Source code for AFL.automation.prepare.PrepareDriver

import warnings
from typing import Dict, Optional

from AFL.automation.mixcalc.MassBalance import MassBalance
from AFL.automation.mixcalc.MassBalanceDriver import MassBalanceDriver
from AFL.automation.mixcalc.Solution import Solution
from AFL.automation.shared.PersistentConfig import PersistentConfig
from AFL.automation.shared.utilities import listify


[docs] class PrepareDriver(MassBalanceDriver): """Base class for prepare drivers. Subclasses provide transport/backend-specific execution while this class handles shared target conditioning and mass-balance feasibility/solve flow. """ defaults = { "stocks": [], "fixed_compositions": {}, "enable_multistep_dilution": False, "multistep_max_steps": 2, "multistep_diluent_policy": "primary_solvent", }
[docs] def __init__(self, driver_name: str, overrides=None): MassBalanceDriver.__init__(self, overrides=overrides) self.name = driver_name self.filepath = self.path / (self.name + ".config.json") self.config = PersistentConfig( path=self.filepath, defaults=self.gather_defaults(), overrides=overrides, max_history=100, max_history_size_mb=50, write_debounce_seconds=0.5, compact_json=True, ) self.stocks = [] self.targets = [] self.process_stocks()
[docs] def status(self): status = [] status.append(f"AFL Server Stocks: {self.config['stocks']}") status.extend(self._status_lines()) return status
def _status_lines(self) -> list[str]: """Subclass hook for additional status lines.""" return []
[docs] def is_feasible( self, targets: dict | list[dict], enable_multistep_dilution: bool | None = None, ) -> list[dict | None]: targets_to_check = listify(targets) self.process_stocks() minimum_volume = self.config.get("minimum_volume", "100 ul") if enable_multistep_dilution is None: enable_multistep_dilution = bool(self.config.get("enable_multistep_dilution", False)) results: list[dict | None] = [] for target in targets_to_check: try: mb = MassBalance(minimum_volume=minimum_volume) for stock in self.stocks: mb.stocks.append(stock) target_with_fixed = self.apply_fixed_comps(target.copy()) target_solution = Solution(**target_with_fixed) mb.targets.append(target_solution) mb.balance( tol=self.config.get("tol", 1e-3), enable_multistep_dilution=bool(enable_multistep_dilution), multistep_max_steps=int(self.config.get("multistep_max_steps", 2)), multistep_diluent_policy=str(self.config.get("multistep_diluent_policy", "primary_solvent")), ) if ( mb.balanced and len(mb.balanced) > 0 and mb.balanced[0].get("balanced_target") is not None ): results.append(mb.balanced[0]["balanced_target"].to_dict()) else: results.append(None) except Exception as e: warnings.warn( f"Exception during feasibility check for target " f"{target.get('name', 'Unnamed')}: {str(e)}", stacklevel=2, ) results.append(None) return results
[docs] def apply_fixed_comps(self, target: dict) -> dict: result = target.copy() fixed_comps = self.config.get("fixed_compositions", {}) if not fixed_comps: return result for prop_type in ["masses", "volumes", "concentrations", "mass_fractions"]: if prop_type not in result: result[prop_type] = {} if prop_type in fixed_comps: for comp_name, comp_value in fixed_comps[prop_type].items(): if comp_name not in result[prop_type]: result[prop_type][comp_name] = comp_value for prop in ["total_mass", "total_volume", "name", "location"]: if prop in fixed_comps and prop not in result: result[prop] = fixed_comps[prop] if "solutes" in fixed_comps: if "solutes" not in result: result["solutes"] = fixed_comps["solutes"].copy() else: for solute in fixed_comps["solutes"]: if solute not in result["solutes"]: result["solutes"].append(solute) return result
[docs] def before_balance(self, target: dict) -> None: """Subclass hook to perform backend-specific checks before solving."""
[docs] def resolve_destination(self, dest: Optional[str]) -> str: """Return destination identifier for this backend.""" raise NotImplementedError("PrepareDriver subclasses must implement resolve_destination().")
[docs] def execute_preparation(self, target: dict, balanced_target: Solution, destination: str) -> bool: """Execute backend-specific prepare actions. Returns False for handled, non-fatal failures where caller should return (None, None). Raise for hard failures. """ raise NotImplementedError("PrepareDriver subclasses must implement execute_preparation().")
[docs] def execute_preparation_plan( self, target: dict, balanced_target: Solution, destination: str, procedure_plan: dict, intermediate_destinations: list[str], ) -> bool: if procedure_plan.get("required_intermediate_targets", 0) > 0: raise NotImplementedError( "This prepare backend does not implement multi-step execution." ) return self.execute_preparation(target, balanced_target, destination)
[docs] def on_prepare_exception(self, destination: str, dest_was_none: bool) -> None: """Subclass hook to rollback destination bookkeeping on exceptions."""
[docs] def build_prepare_result(self, feasible_result: dict, balanced_target: Solution) -> dict: """Build return payload for prepare().""" return feasible_result
def _destination_queue_key(self) -> str | None: if "prep_targets" in self.config: return "prep_targets" if "mixing_locations" in self.config: return "mixing_locations" return None def _reserve_destinations( self, dest: str | None, required_intermediate_targets: int, ) -> tuple[str, list[str], list[str], str | None]: if required_intermediate_targets <= 0: destination = self.resolve_destination(dest) return destination, [], [], None queue_key = self._destination_queue_key() if queue_key is None: raise ValueError( "Multi-step prepare requires a configured destination queue (prep_targets or mixing_locations)." ) queue = list(self.config.get(queue_key, [])) needed = required_intermediate_targets + (0 if dest is not None else 1) if len(queue) < needed: raise ValueError( f"Not enough {queue_key} entries for multi-step preparation. " f"Need {needed}, found {len(queue)}." ) consumed = queue[:needed] self.config[queue_key] = queue[needed:] intermediate_destinations = consumed[:required_intermediate_targets] destination = dest if dest is not None else consumed[required_intermediate_targets] return destination, intermediate_destinations, consumed, queue_key def _restore_reserved_destinations(self, queue_key: str | None, consumed: list[str]) -> None: if not queue_key or not consumed: return queue = list(self.config.get(queue_key, [])) self.config[queue_key] = consumed + queue
[docs] def prepare( self, target: dict, dest: str | None = None, enable_multistep_dilution: bool | None = None, ) -> tuple[dict, str] | tuple[None, None]: target = self.apply_fixed_comps(target) if enable_multistep_dilution is None: enable_multistep_dilution = bool(self.config.get("enable_multistep_dilution", False)) feasibility_results = self.is_feasible( target, enable_multistep_dilution=bool(enable_multistep_dilution), ) if not feasibility_results or feasibility_results[0] is None: warnings.warn( f"Target composition {target.get('name', 'Unnamed target')} is not feasible " f"based on mass balance calculations", stacklevel=2, ) return None, None feasible_result = feasibility_results[0] self.before_balance(target) self.reset_targets() self.add_target(target) self.balance(enable_multistep_dilution=bool(enable_multistep_dilution)) if not self.balanced or not self.balanced[0].get("balanced_target"): warnings.warn( f"No suitable mass balance found for target: {target.get('name', 'Unnamed target')}", stacklevel=2, ) return None, None balanced_target = self.balanced[0]["balanced_target"] procedure_plan = self.balanced[0].get("procedure_plan") or {} required_intermediate_targets = int(procedure_plan.get("required_intermediate_targets", 0)) destination, intermediate_destinations, consumed, queue_key = self._reserve_destinations( dest=dest, required_intermediate_targets=required_intermediate_targets, ) try: if required_intermediate_targets > 0: success = self.execute_preparation_plan( target=target, balanced_target=balanced_target, destination=destination, procedure_plan=procedure_plan, intermediate_destinations=intermediate_destinations, ) else: success = self.execute_preparation(target, balanced_target, destination) if success is False: return None, None except Exception: if required_intermediate_targets > 0: self._restore_reserved_destinations(queue_key=queue_key, consumed=consumed) else: self.on_prepare_exception(destination=destination, dest_was_none=(dest is None)) raise return self.build_prepare_result(feasible_result, balanced_target), destination