Source code for AFL.automation.sample.SampleDriver

import itertools
import datetime
import pathlib
import shutil
import traceback
import uuid
from typing import Optional, Dict, List
import warnings
import os
import time
import logging
import copy
import pandas as pd

import h5py  # type: ignore
import numpy as np
import requests  # type: ignore
import xarray as xr 
from tiled.client import from_uri  # type: ignore
from tiled.queries import Eq  # type: ignore
from scipy.spatial.distance import cdist

import AFL.automation.prepare  # type: ignore
from AFL.automation.APIServer.Client import Client  # type: ignore
from AFL.automation.APIServer.Driver import Driver  # type: ignore
from AFL.automation.shared.units import units  # type: ignore


[docs] class SampleDriver(Driver): """ PersistentConfig Values ----------------------- client: dict Contains APIServer uris (url:port) where the keys will be used as the accessor names. instrument: dict Description and execution/access/location information of each instrument to be used ternary: bool If true, process coordinates as ternary, Barycentric values data_tag: str Label for current measurements or active learning run """ defaults = {} defaults['client'] = {} defaults['instrument'] = {} defaults['ternary'] = False defaults['data_tag'] = 'default' defaults['data_path'] = './' defaults['components'] = [] defaults['AL_components'] = [] defaults['snapshot_directory'] = str(pathlib.Path.home() / 'snaps') defaults['max_sample_transmission'] = 0.6 defaults['mix_order'] = [] defaults['custom_stock_settings'] = [] defaults['composition_var_name'] = 'comps' defaults['concat_dim'] = 'sample' defaults['sample_composition_tol'] = 0.0 defaults['next_samples_variable'] = 'next_samples' defaults['camera_urls'] = [] defaults['grid_file'] = None defaults['grid_blank_interval'] = None defaults['grid_blank_sample'] = None
[docs] def __init__( self, camera_urls: Optional[List[str]] = None, snapshot_directory: Optional[str] = None, overrides: Optional[Dict] = None, ): """ Parameters ----------- """ Driver.__init__(self, name='SampleDriver', defaults=self.gather_defaults(), overrides=overrides) self.AL_campaign_name = None self.deck = None self.sample_name: Optional[str] = None self.app = None self.name = 'SampleDriver' if camera_urls is not None: self.config['camera_urls'] = camera_urls if snapshot_directory is not None: self.config['snapshot_directory'] = snapshot_directory #initialize client dict self.client = {} self.uuid = {'rinse': None, 'prep': None, 'catch': None, 'agent': None} self.status_str = 'Fresh Server!' self.wait_time = 30.0 # seconds self.catch_protocol = None self.AL_status_str = '' self.grid_sample_count = 0 self.grid_data = None self.stop_grid = False # XXX need to make deck inside this object because of 'different registries error in Pint self.reset_deck()
[docs] def validate_config(self): required_keys = [ 'client', 'instrument', 'ternary', 'data_tag', 'data_path', 'components', 'AL_components', 'snapshot_directory', 'max_sample_transmission', 'mix_order', 'custom_stock_settings', 'composition_var_name', 'concat_dim', 'sample_composition_tol', 'camera_urls' ] missing_keys = [key for key in required_keys if key not in self.config] if missing_keys: raise KeyError(f"The following required keys are missing from self.config: {', '.join(missing_keys)}") # Validate client configuration if not isinstance(self.config['client'], dict): raise TypeError("self.config['client'] must be a dictionary") if 'load' not in self.config['client']: raise KeyError("'load' client must be configured in self.config['client']") if 'prep' not in self.config['client']: raise KeyError("'prep' client must be configured in self.config['client']") # Validate instrument configuration if not isinstance(self.config['instrument'], list): raise TypeError("self.config['instrument'] must be a list") if len(self.config['instrument']) == 0: raise ValueError("At least one instrument must be configured in self.config['instrument']") for i, instrument in enumerate(self.config['instrument']): required_instrument_keys = ['name', 'client_name', 'data', 'measure_base_kw', 'empty_base_kw', 'sample_dim', 'sample_comps_variable'] missing_instrument_keys = [key for key in required_instrument_keys if key not in instrument] if missing_instrument_keys: raise KeyError(f"Instrument {i} is missing the following required keys: {', '.join(missing_instrument_keys)}") if not isinstance(instrument['data'], list): raise TypeError(f"Instrument {i}: 'data' must be a list") for j, data_item in enumerate(instrument['data']): required_data_keys = ['data_name', 'tiled_array_name'] missing_data_keys = [key for key in required_data_keys if key not in data_item] if missing_data_keys: raise KeyError(f"Instrument {i}, data item {j} is missing the following required keys: {', '.join(missing_data_keys)}") # Validate other list types list_keys = ['components', 'AL_components', 'mix_order', 'camera_urls'] for key in list_keys: if not isinstance(self.config[key], list): raise TypeError(f"self.config['{key}'] must be a list") # Validate dicts if not isinstance(self.config['custom_stock_settings'],dict): raise TypeError("self.config['custom_stock_settings'] must be a dict") # Validate other dict types list_keys = ['custom_stock_settings'] for key in list_keys: if not isinstance(self.config[key], dict): raise TypeError(f"self.config['{key}'] must be a dict") # Validate types of other keys if not isinstance(self.config['ternary'], bool): raise TypeError("self.config['ternary'] must be a boolean") if not isinstance(self.config['data_tag'], str): raise TypeError("self.config['data_tag'] must be a string") if not isinstance(self.config['data_path'], (str, pathlib.Path)): raise TypeError("self.config['data_path'] must be a string or pathlib.Path") if not isinstance(self.config['snapshot_directory'], (str, pathlib.Path)): raise TypeError("self.config['snapshot_directory'] must be a string or pathlib.Path") if not isinstance(self.config['max_sample_transmission'], (int, float)): raise TypeError("self.config['max_sample_transmission'] must be a number") if not isinstance(self.config['composition_var_name'], str): raise TypeError("self.config['composition_var_name'] must be a string") if not isinstance(self.config['concat_dim'], str): raise TypeError("self.config['concat_dim'] must be a string") if not isinstance(self.config['sample_composition_tol'], (int, float)): raise TypeError("self.config['sample_composition_tol'] must be a number") logging.info("Configuration validation passed successfully.")
[docs] def validate_config_grid(self): """Validate configuration specific to grid-based sample processing.""" # Basic client validation if not isinstance(self.config['client'], dict): raise TypeError("self.config['client'] must be a dictionary") # Instrument validation - simplified compared to regular validate_config if not isinstance(self.config['instrument'], list): raise TypeError("self.config['instrument'] must be a list") if len(self.config['instrument']) == 0: raise ValueError("At least one instrument must be configured in self.config['instrument']") for i, instrument in enumerate(self.config['instrument']): # Minimal required keys for grid mode required_instrument_keys = ['client_name', 'data', 'measure_base_kw', 'empty_base_kw'] missing_instrument_keys = [key for key in required_instrument_keys if key not in instrument] if missing_instrument_keys: raise KeyError(f"Instrument {i} is missing the following required keys: {', '.join(missing_instrument_keys)}") if not isinstance(instrument['data'], list): raise TypeError(f"Instrument {i}: 'data' must be a list") for j, data_item in enumerate(instrument['data']): required_data_keys = ['data_name', 'tiled_array_name'] missing_data_keys = [key for key in required_data_keys if key not in data_item] if missing_data_keys: raise KeyError(f"Instrument {i}, data item {j} is missing the following required keys: {', '.join(missing_data_keys)}") # Validate grid-specific configuration items if self.config['grid_file'] is not None and not isinstance(self.config['grid_file'], (str, pathlib.Path)): raise TypeError("self.config['grid_file'] must be a string or pathlib.Path") if self.config['grid_blank_interval'] is not None and not isinstance(self.config['grid_blank_interval'], int): raise TypeError("self.config['grid_blank_interval'] must be an integer") if self.config['grid_blank_sample'] is not None and not isinstance(self.config['grid_blank_sample'], dict): raise TypeError("self.config['grid_blank_sample'] must be a dictionary") # Validate data-related settings if not isinstance(self.config['data_path'], (str, pathlib.Path)): raise TypeError("self.config['data_path'] must be a string or pathlib.Path") if not isinstance(self.config['data_tag'], str): raise TypeError("self.config['data_tag'] must be a string") # Validate instrument configuration if not isinstance(self.config['instrument'], list): raise TypeError("self.config['instrument'] must be a list") if len(self.config['instrument']) == 0: raise ValueError("At least one instrument must be configured in self.config['instrument']") for i, instrument in enumerate(self.config['instrument']): required_instrument_keys = ['client_name', 'data', 'measure_base_kw', 'empty_base_kw', 'sample_dim', 'sample_comps_variable'] missing_instrument_keys = [key for key in required_instrument_keys if key not in instrument] if missing_instrument_keys: raise KeyError(f"Instrument {i} is missing the following required keys: {', '.join(missing_instrument_keys)}") if not isinstance(instrument['data'], list): raise TypeError(f"Instrument {i}: 'data' must be a list") for j, data_item in enumerate(instrument['data']): required_data_keys = ['data_name', 'tiled_array_name'] missing_data_keys = [key for key in required_data_keys if key not in data_item] if missing_data_keys: raise KeyError(f"Instrument {i}, data item {j} is missing the following required keys: {', '.join(missing_data_keys)}") logging.info("Grid configuration validation passed successfully.")
@property def tiled_client(self): # start tiled catalog connection if self.data is None: raise ValueError("No DataTiled object added to this class...was it instantiated correctly?") return self.data.tiled_client
[docs] def status(self): status = [] status.append(f'Snapshots: {self.config["snapshot_directory"]}') status.append(f'Cameras: {self.config["camera_urls"]}') status.append(f'{len(self.deck.stocks)} stocks loaded!') status.append(self.status_str) status.append(self.AL_status_str) if self.grid_data: status.append(f'Grid Dims: {self.grid_data.sizes}') return status
[docs] def update_status(self, value): self.status_str = value self.app.logger.info(value)
[docs] def get_client(self,name,refresh=False): try: client = self.client[name] except KeyError: if name not in self.config['client']: raise ValueError(( f"""Could not find client url for '{name}' in config. Current client dict in config is """ f"""self.config['client'] = {self.config['client']}""" )) url = self.config['client'][name] client = Client(url.split(':')[0], port=url.split(':')[1]) self.client[name] = client refresh = True if refresh: client.login("SampleServer") client.debug(False) return client
[docs] def take_snapshot(self, prefix): now = datetime.datetime.now().strftime('%y%m%d-%H:%M:%S') for i, cam_url in enumerate(self.config['camera_urls']): fname = pathlib.Path(self.config['snapshot_directory']) / ( f"{prefix}-{i}-{now}.jpg" ) try: r = requests.get(cam_url, stream=True) if r.status_code == 200: with open(fname, 'wb') as f: r.raw.decode_content = True shutil.copyfileobj(r.raw, f) except Exception as error: output_str = f'take_snapshot failed with error: {error.__repr__()}\n\n' + traceback.format_exc() + '\n\n' self.app.logger.warning(output_str)
######################## ## DECK CONFIGURATION ## ########################
[docs] def reset_deck(self): self.deck = AFL.automation.prepare.Deck()
[docs] def add_container(self, name, slot): self.deck.add_container(name, slot)
[docs] def add_catch(self, name, slot): self.deck.add_catch(name, slot) self.catch_loc = f"{slot}A1"
[docs] def add_pipette(self, name, mount, tipracks): self.deck.add_pipette(name, mount, tipracks=tipracks)
[docs] def send_deck_config(self, home=True): self.deck.init_remote_connection( self.get_client('prep').ip, home=home ) self.deck.send_deck_config()
[docs] def add_stock(self, stock_dict, loc): soln = AFL.automation.prepare.Solution.from_dict(stock_dict) self.deck.add_stock(soln, loc)
[docs] def set_catch_protocol(self, **kwargs): self.catch_protocol = AFL.automation.prepare.PipetteAction(**kwargs)
[docs] def fix_protocol_order(self, mix_order: List, custom_stock_settings: Dict): mix_order = [self.deck.get_stock(i) for i in mix_order] mix_order_map = {loc: new_index for new_index, (stock, loc) in enumerate(mix_order)} for sample, validated in self.deck.sample_series: # if not validated: # continue old_protocol = sample.protocol ordered_indices = list(map(lambda x: mix_order_map.get(x.source), sample.protocol)) argsort = np.argsort(ordered_indices) new_protocol = list(map(sample.protocol.__getitem__, argsort)) time_patched_protocol = [] for entry in new_protocol: if entry.source in custom_stock_settings: for setting, value in custom_stock_settings[entry.source].items(): entry.kwargs[setting] = value time_patched_protocol.append(entry) sample.protocol = time_patched_protocol
[docs] def mfrac_to_mass(self, mass_fractions:Dict, fixed_conc: Dict, sample_volume, output_units:str='mg'): """Convert ternary/Barycentric mass fractions to mass""" if not (len(mass_fractions) == 3): raise ValueError('Only ternaries are currently supported. Need to pass three mass fractions') if len(fixed_conc) > 1: raise ValueError('Only one concentration should be fixed!') specified_component = list(fixed_conc.keys())[0] components = list(mass_fractions.keys()) components.remove(specified_component) xB = mass_fractions[components[0]] * units('') xC = mass_fractions[components[1]] * units('') XB = xB / (1 - xB) XC = xC / (1 - xC) mA = (fixed_conc[specified_component] * sample_volume) mC = mA * (XC + XB * XC) / (1 - XB * XC) mB = XB * (mA + mC) mass_dict = {} mass_dict[specified_component] = mA.to(output_units) mass_dict[components[0]] = mB.to(output_units) mass_dict[components[1]] = mC.to(output_units) return mass_dict
[docs] def process_sample( self, composition: Dict, sample_volume: Dict, fixed_concs: Dict, prepare_mfrac_split: Optional[Dict]=None, predict_combine_comps: Optional[Dict]=None, predict_next: bool = False, enqueue_next: bool = False, calibrate_sensor: bool = False, name: Optional[str] = None, sample_uuid: Optional[str] = None, AL_campaign_name: Optional[str] = None, AL_uuid: Optional[str] = None, ): """Make protocol for, mix, load, and measure sample. Potentially query Agent for next sample and enqueue Parameters ---------- composition: Dict Dict should be of the form composition["component_name"] = {"value":value, "units":units} sample_volume: dict Dict should be of the form sample_volume = {"value":value, "units":units} fixed_concs: Optional[Dict] Dict should be of the form fixed_concs[component] = {"value":value, "units":units} mfrac_split: Dict Dict should be of the form mfrac_split = {'component_to_split':{'component_A':'mfrac_A','component_B':'mfrac_B'}} predict_next: bool If True, will trigger predict call to the agent enqueue_next: bool If True, will pull the next sample from the dropbox of the agent calibrate_sensor: bool If True, trigger a load stopper sensor recalibration before the next measurement name: str The name of the sample, if not generated, it will be auto generated from the self.config['data_tag'] and uuid sample_uuid: str uuid of sample, if not specified it will be auto-generated AL_uuid: str uuid of AL campaign AL_campaign_name: str name of AL campaign """ assert len(self.config['instrument'])>0, ( """No instruments loaded in config for this server! Use client.set_config(instrument=[xyz])""" ) assert ('load' in self.config['client']), ( f"No client url for 'load'! self.config['client']={self.config['client']}" ) assert ('prep' in self.config['client']), ( f"No client url for 'prep'! self.config['client']={self.config['client']}" ) if predict_next or enqueue_next: assert ('agent' in self.config['client']), ( f"No client url for 'agent'! self.config['client']={self.config['client']}" ) # do this now, so that we fail early if we're missing something self.validate_config() if sample_uuid is None: self.uuid['sample'] = 'SAM-' + str(uuid.uuid4()) else: self.uuid['sample'] = sample_uuid if name is None: self.sample_name = f"{self.config['data_tag']}_{self.uuid['sample'][-8:]}" else: self.sample_name = name if predict_next and AL_uuid is None: self.uuid['AL'] = 'AL-' + str(uuid.uuid4()) else: self.uuid['AL'] = AL_uuid if predict_next and AL_campaign_name is None: self.AL_campaign_name = f"{self.config['data_tag']}_{self.uuid['AL'][-8:]}" else: self.AL_campaign_name = AL_campaign_name logging.info(f'Composition: {composition}') if composition: # composition is not empty prep_protocol, catch_protocol = self.compute_prep_protocol( composition = composition, fixed_concs = fixed_concs, mfrac_split = prepare_mfrac_split, sample_volume = sample_volume ) # configure all servers to this sample name and uuid sample_composition_realized = { k:{'value':v.magnitude ,'units':str(v.units)} for k,v in self.sample.target_check.concentration.items() } self.data['sample_composition_target'] = composition self.data['sample_composition_realized'] = sample_composition_realized sample_data = self.set_sample( sample_name = self.sample_name, sample_uuid = self.uuid['sample'], AL_campaign_name = self.AL_campaign_name, AL_uuid = self.uuid['AL'], AL_components = self.config['AL_components'], sample_composition = sample_composition_realized, ) for client_name in self.config['client'].keys(): if client_name not in self.config['tiled_exclusion_list']: self.get_client(client_name).enqueue(task_name='set_sample', **sample_data) # START NEW INDENT prep_protocol, catch_protocol = self.compute_prep_protocol( composition = composition, fixed_concs = fixed_concs, mfrac_split = prepare_mfrac_split, sample_volume = sample_volume ) # configure all servers to this sample name and uuid sample_composition_realized = { k:{'value':v.magnitude ,'units':str(v.units)} for k,v in self.sample.target_check.concentration.items() } self.data['sample_composition_target'] = composition self.data['sample_composition_realized'] = sample_composition_realized sample_data = self.set_sample( sample_name = self.sample_name, sample_uuid = self.uuid['sample'], AL_campaign_name = self.AL_campaign_name, AL_uuid = self.uuid['AL'], AL_components = self.config['AL_components'], sample_composition = sample_composition_realized, ) for client_name in self.config['client'].keys(): if client_name not in self.config['tiled_exclusion_list']: self.get_client(client_name).enqueue(task_name='set_sample', **sample_data) self.make_and_measure(name=self.sample_name, prep_protocol=prep_protocol, catch_protocol=catch_protocol, calibrate_sensor=calibrate_sensor) self.construct_datasets(combine_comps=predict_combine_comps) # END NEW INDENT if enqueue_next or predict_next: if composition:#assume we made/measured a sample and append self.add_new_data_to_agent() self.predict_next_sample() # Look away ... here be dragons ... if enqueue_next: ag_result = self.get_client('agent').retrieve_obj(uid=self.uuid['agent']) next_samples = ag_result[self.config['next_samples_variable']] new_composition = next_samples.to_pandas().squeeze().to_dict() new_composition = {k:{'value':v,'units':'milligram / milliliter'} for k,v in new_composition.items()} task = { 'task_name':'process_sample', 'composition': new_composition, 'sample_volume': sample_volume, 'fixed_concs':fixed_concs, 'prepare_mfrac_split':prepare_mfrac_split, 'predict_combine_comps': predict_combine_comps, 'predict_next':predict_next, 'enqueue_next':enqueue_next, 'AL_campaign_name':self.AL_campaign_name, 'AL_uuid': self.uuid['AL'], } task_uuid = 'QD-' + str(uuid.uuid4()) package = {'task':task,'meta':{},'uuid':task_uuid} package['meta']['queued'] = datetime.datetime.now().strftime('%m/%d/%y %H:%M:%S-%f') queue_loc = self._queue.qsize() #append at end of queue self._queue.put(package,queue_loc)
[docs] def compute_prep_protocol(self,composition: Dict, sample_volume: Dict, fixed_concs: Dict, mfrac_split:Optional[Dict]=None): """ Parameters ---------- composition: Dict Dict should be of the form composition["component_name"] = {"value":value, "units":units} sample_volume: Dict Dict should be of the form sample_volume = {"value":value, "units":units} mfrac_split: Dict Dict should be of the form mfrac_split = {'component_to_split':{'component_A':'mfrac_A','component_B':'mfrac_B'}} """ sample_volume = sample_volume['value'] * units(sample_volume['units']) if self.config['ternary']: assert len(composition)==3, ( f"Number of composition variables should be 3! You have composition = {composition}" ) fixed_concs_units = {} for name, value in fixed_concs.items(): fixed_concs_units[name] = value['value'] * units(value['units']) mass_dict = self.mfrac_to_mass( mass_fractions=composition, fixed_conc=fixed_concs_units, sample_volume=sample_volume, output_units='mg' ) else: # assume concs for now... if len(composition) < (len(self.config['components']) - len(fixed_concs) - 1): raise ValueError('System under specified...') mass_dict = {} for name, comp in composition.items(): mass_dict[name] = (comp['value'] * units(comp['units']) * sample_volume).to('mg') for name, comp in fixed_concs.items(): mass_dict[name] = (comp['value'] * units(comp['units']) * sample_volume).to('mg') logging.info(mass_dict) if mfrac_split is not None: for split_component, split_def in mfrac_split.items(): target_mass = mass_dict[split_component] for component,mfrac in split_def.items(): mass_dict[component] = target_mass*mfrac del mass_dict[split_component] # for component in self.config['components']: # if component not in mass_dict: # mass_dict[component] = 0.0*units('mg') self.target = AFL.automation.prepare.Solution('target', self.config['components']) #self.target = AFL.automation.prepare.Solution('target', list(composition.keys())) self.target.volume = sample_volume for k, v in mass_dict.items(): self.target[k].mass = v self.target.volume = sample_volume self.deck.reset_targets() self.deck.add_target(self.target, name='target') self.deck.make_sample_series(reset_sample_series=True) self.deck.validate_sample_series(tolerance=self.config['sample_composition_tol']) self.deck.make_protocol(only_validated=False) self.fix_protocol_order(self.config['mix_order'], self.config['custom_stock_settings']) self.sample, self.validated = self.deck.sample_series[0] self.app.logger.info(self.deck.validation_report) if self.validated: self.app.logger.info(f'Validation PASSED') self.AL_status_str = 'Last sample validation PASSED' else: self.app.logger.info(f'Validation FAILED') self.AL_status_str = 'Last sample validation FAILED' self.app.logger.info(f'Making next sample with mass fraction: {self.sample.target_check.mass_fraction}') self.catch_protocol.source = self.sample.target_loc self.protocol = self.sample.emit_protocol() return self.sample.emit_protocol(), [self.catch_protocol.emit_protocol()]
[docs] def make_and_measure( self, name: str, prep_protocol: dict, catch_protocol: dict, calibrate_sensor: bool = False, ): self.update_status(f'starting make and measure for {name}') targets = set() for task in prep_protocol: if 'target' in task['source'].lower(): targets.add(task['source']) if 'target' in task['dest'].lower(): targets.add(task['dest']) for task in catch_protocol: if 'target' in task['source'].lower(): targets.add(task['source']) if 'target' in task['dest'].lower(): targets.add(task['dest']) target_map = {} for t in targets: prep_target = self.get_client('prep').enqueue(task_name='get_prep_target', interactive=True)['return_val'] target_map[t] = prep_target for i, task in enumerate(prep_protocol): # if the well isn't in the map, just use the well task['source'] = target_map.get(task['source'], task['source']) task['dest'] = target_map.get(task['dest'], task['dest']) if i == 0: task['force_new_tip'] = True if i == (len(prep_protocol) - 1): # last prepare task['drop_tip'] = True# false to conserve tips, not working with OT2HTTP False self.uuid['prep'] = self.get_client('prep').enqueue(task_name='transfer',**task) if self.uuid['rinse'] is not None: self.update_status(f'Waiting for rinse...') self.get_client('load').wait(self.uuid['rinse'],for_history=False) self.update_status(f'Rinse done!') if calibrate_sensor: # calibrate sensor to avoid drift self.get_client('load').enqueue(task_name='calibrate_sensor') #XXX need to work out measure loop self.update_status(f'Cell is clean, measuring empty cell scattering...') self.measure(name=name, empty=True,wait=True) if self.uuid['prep'] is not None: self.get_client('prep').wait(self.uuid['prep'],for_history=False) self.take_snapshot(prefix=f'02-after-prep-{name}') self.update_status(f'Queueing sample {name} load into syringe loader') for task in catch_protocol: # if the well isn't in the map, just use the well task['source'] = target_map.get(task['source'], task['source']) task['dest'] = target_map.get(task['dest'], task['dest']) self.uuid['catch'] = self.get_client('prep').enqueue(task_name='transfer',**task) if self.uuid['catch'] is not None: self.update_status(f"Waiting for sample prep/catch of {name} to finish: {self.uuid['catch'][-8:]}") self.get_client('prep').wait(self.uuid['catch']) self.take_snapshot(prefix=f'03-after-catch-{name}') # homing robot to try to mitigate drift problems self.get_client('prep').enqueue(task_name='home') # do the sample measurement train self.update_status(f"Measuring sample with all loaded instruments...") self.measure(name=name, empty=False, wait=True) self.update_status(f'Cleaning up sample {name}...') self.uuid['rinse'] = self.get_client('load').enqueue(task_name='rinseCell') self.take_snapshot(prefix=f'07-after-measure-{name}') self.reset_sample_env(wait=False) self.update_status(f'All done for {name}!')
[docs] def measure(self, name: str, empty: bool = False, wait: bool = True): # need to iterate over instrument dict # - instrument dict will specify where to load sample to, how to call instrument, and any kwargs """ instrument = dict ( load_kw = {'load_dest_label':'AfterSANS'} client_name = 'larmor' measure_base_kw = {'task_name': expose, block:True, exposure: 3600} empty_base_kw = {'task_name': expose, block:True, exposure: 3600} ) """ assert len(self.config['instrument'])>0, 'No instruments loaded in config for this server!' if empty: name = 'MT-' + name instrument=None for i,instrument in enumerate(self.config['instrument']): self.update_status(f'Measuring using instrument #{i}') if not empty: load_kw = {} if i==0: load_kw['task_name'] = 'loadSample' else: load_kw['task_name'] = 'advanceSample' load_kw['load_dest_label'] = instrument.get('load_dest_label','') self.uuid['load'] = self.get_client('load').enqueue(**load_kw) self.get_client('load').wait(self.uuid['load']) self.take_snapshot(prefix=f'05-after-load-{instrument["name"]}-{name}') if empty: measure_kw = instrument['empty_base_kw'] else: measure_kw = instrument['measure_base_kw'] measure_kw['name'] = name if 'sample_env' in instrument.keys() and not empty: """ schema: 'sample_env': { 'client_name' = 'tempdeck', 'move_base_kw' = {'task_name': 'move_temp'}, 'move_swept_kw' = {'temperature': [15,20,25,30]}, } """ params = [] vals = [] for param,conds in instrument['sample_env']['move_swept_kw'].items(): params.append(param) vals.append(conds) conditions = [{i:j for i,j in zip(params,vallist)} for vallist in itertools.product(*vals)] # this ravels the list of conditions above in n-dimensional space, e.g.: # 'move_swept_kw' = {'temperature': [15,20,25,30], 'vibes': ['harsh', 'mid', 'cool']} # conditions = [{'temperature': 15, 'vibes': 'harsh'}, {'temperature': 15, 'vibes': 'mid'} ...] starting_condition = conditions[0] sample_data = self.get_sample() base_sample_name = sample_data["sample_name"] for i,cond in enumerate(conditions): sample_env_kw = {} sample_env_kw.update(cond) sample_env_kw.update(instrument['sample_env']['move_base_kw']) sample_data = self.get_sample() sample_data['sample_env_conditions'] = cond sample_data['sample_name'] = base_sample_name + f'_{str(i).zfill(3)}' # track up to 1000 conditions self.get_client(instrument['sample_env']['client_name']).enqueue(task_name='set_sample',**sample_data) self.get_client(instrument['client_name']).enqueue(task_name='set_sample',**sample_data) self.update_status(f'Moving sample env {instrument["sample_env"]["client_name"]}...') self.uuid['move_sample_env'] = self.get_client(instrument['sample_env']['client_name']).enqueue(**sample_env_kw) self.get_client(instrument['sample_env']['client_name']).wait(self.uuid['move_sample_env']) self.update_status(f'Measuring on instrument {instrument["client_name"]}') measure_kw['name'] = sample_data['sample_name'] self.uuid['measure'] = self.get_client(instrument['client_name']).enqueue(**measure_kw) self.get_client(instrument['client_name']).wait(self.uuid['measure']) # # move sample environment to initial starting state to prepare for next measurement # sample_env_kw = {} # sample_env_kw.update(starting_condition) # sample_env_kw.update(instrument['sample_env']['move_base_kw']) # self.uuid['move_sample_env'] = self.get_client(instrument['sample_env']['client_name']).enqueue(**sample_env_kw) else: self.uuid['measure'] = self.get_client(instrument['client_name']).enqueue(**measure_kw) if wait: self.get_client(instrument['client_name']).wait(self.uuid['measure'])
[docs] def reset_sample_env(self, wait: bool = True): for i,instrument in enumerate(self.config['instrument']): if 'sample_env' in instrument.keys(): """ schema: 'sample_env': { 'client_name' = 'tempdeck', 'move_base_kw' = {'task_name': 'move_temp'}, 'move_swept_kw' = {'temperature': [15,20,25,30]}, } """ params = [] vals = [] for param,conds in instrument['sample_env']['move_swept_kw'].items(): params.append(param) vals.append(conds) conditions = [{i:j for i,j in zip(params,vallist)} for vallist in itertools.product(*vals)] # this ravels the list of conditions above in n-dimensional space, e.g.: # 'move_swept_kw' = {'temperature': [15,20,25,30], 'vibes': ['harsh', 'mid', 'cool']} # conditions = [{'temperature': 15, 'vibes': 'harsh'}, {'temperature': 15, 'vibes': 'mid'} ...] starting_condition = conditions[0] sample_env_kw = {} sample_env_kw.update(starting_condition) sample_env_kw.update(instrument['sample_env']['move_base_kw']) self.uuid['move_sample_env'] = self.get_client(instrument['sample_env']['client_name']).enqueue(**sample_env_kw) if wait: self.get_client(instrument['sample_env']['client_name']).wait(self.uuid['move_sample_env'])
[docs] def construct_datasets(self,combine_comps=None): """Construct AL manifest from measurement and call predict""" data_path = pathlib.Path(self.config['data_path']) # if len(self.config['instrument'])>1: # raise NotImplementedError if self.tiled_client is None: self.tiled_client = self.data.tiled_client # this needs to be here, because in the constructor, we don't have the datapacket attached self.new_data = xr.Dataset() for i, instrument in enumerate(self.config['instrument']): for instrument_data in instrument['data']: tiled_result = ( self.tiled_client .search(Eq('sample_uuid', self.uuid['sample'])) .search(Eq('array_name', instrument_data['tiled_array_name'])) ) if len(tiled_result) == 0: raise ValueError(f"Could not find tiled entry for measurement sample_uuid={self.uuid['sample']}") # handle Python None and "None" depending on how json deserialization works out if (instrument_data['data_dim'] is not None) and (instrument_data['data_dim'] != 'None'): dims = instrument_data['data_dim'] coords = {instrument_data['data_dim']: tiled_result.items()[-1][-1].metadata[ instrument_data['tiled_metadata_dim']]} else: dims = None coords = None if 'sample_env' in instrument.keys(): sample_env_dims = list(instrument['sample_env']['move_swept_kw'].keys()) measurement_list = [] for _,tiled_data in tiled_result.items(): if 'MT-' in tiled_data.metadata['name']: continue measurement_list.append(xr.DataArray(tiled_data[()], dims=dims, coords=coords)) measurement = xr.concat(measurement_list, dim=instrument['sample_dim']) for key,values in instrument['sample_env']['move_swept_kw'].items(): measurement[key] = (instrument['sample_dim'],values) self.new_data[instrument_data['data_name']] = measurement logging.debug(self.new_data) logging.debug(self.new_data) else: tiled_data = tiled_result.items()[-1][-1] measurement = xr.DataArray(tiled_data[()], dims=dims, coords=coords) self.new_data[instrument_data['data_name']] = measurement if 'quality_metric' in instrument.keys(): quality_metric = instrument['quality_metric'] instrument_value = tiled_data.metadata[quality_metric['tiled_metadata_key']] if quality_metric['comparison'].lower() in ['<', 'lt']: accept = instrument_value < quality_metric['threshold'] elif quality_metric['comparison'].lower() in ['>', 'gt']: accept = instrument_value > quality_metric['threshold'] elif quality_metric['comparison'].lower() in ['==', '=', 'eq']: accept = instrument_value == quality_metric['threshold'] else: raise ValueError( f'Cannot recognize comparison for quality_metric. You passed {quality_metric["comparison"]}') else: accept = True self.new_data[instrument_data['data_name']].attrs['accept'] = int(accept) self.new_data['validated'] = self.validated self.new_data['sample_uuid'] = self.uuid['sample'] sample_composition = {} if self.config['ternary']: total = 0 for component in self.config['AL_components']: mf = self.sample.target_check.mass_fraction[component].magnitude self.new_data[component] = mf total += mf for component in self.config['AL_components']: self.new_data[component] = self.new_data[component] / total # for tiled sample_composition['ternary_mfrac_' + component] = self.sample.target_check.concentration[ component].to("mg/ml").magnitude else: for component in self.config['AL_components']: try: self.new_data[component] = self.sample.target_check.concentration[component].to("mg/ml").magnitude self.new_data[component].attrs['units'] = 'mg/ml' # for tiled sample_composition['conc_' + component] = self.sample.target_check.concentration[component].to( "mg/ml").magnitude except KeyError: warnings.warn(f"Skipping component {component} in AL_components") for component in self.config['components']: self.new_data['mfrac_' + component] = self.sample.target_check.mass_fraction[component].magnitude self.new_data['mass_' + component] = self.sample.target_check[component].mass.to('mg').magnitude self.new_data['mass_' + component].attrs['units'] = 'mg' if self.sample.target_check[component].volume is not None: self.new_data['volume_' + component] = self.sample.target_check[component].volume.to('ml').magnitude self.new_data['volume_' + component].attrs['units'] = 'ml' # for tiled sample_composition['mfrac_' + component] = self.sample.target_check.mass_fraction[component].magnitude sample_composition['mass_' + component] = self.sample.target_check[component].mass.to('mg').magnitude if combine_comps is not None: for new_component,combine_list in combine_comps.items(): conc = 0 * units('mg/ml') mass = 0 * units('mg') volume = 0 * units('ul') for component in combine_list: conc += self.sample.target_check.concentration[component].to("mg/ml") mass += self.sample.target_check[component].mass.to("mg") volume += self.sample.target_check[component].volume.to("ul") self.new_data[new_component] = conc.to("mg/ml").magnitude #include as main AL variable self.new_data['conc_'+new_component] = conc.to("mg/ml").magnitude self.new_data['mass_'+new_component] = mass.to("mg").magnitude self.new_data['volume_'+new_component] = volume.to("ul").magnitude self.new_data.to_netcdf(data_path / (self.sample_name + '.nc')) sample_composition['components'] = self.config['components'] sample_composition['conc_units'] = 'mg/ml' sample_composition['mass_units'] = 'mg' sample_composition = {str(k):v for k,v in sample_composition.items()} if self.data is not None: self.data['sample_composition'] = sample_composition self.data['time'] = datetime.datetime.now().strftime('%m/%d/%y %H:%M:%S-%f %Z%z')
#self.data.finalize() #I don't think we want this with the new sampel_server style
[docs] def add_new_data_to_agent(self,combine_comps=None): """Construct AL manifest from measurement and call predict""" for i,instrument in enumerate(self.config['instrument']): self.ds_append = xr.Dataset() self.ds_append[instrument['sample_comps_variable']] = self.new_data[self.config['AL_components']].to_array('component').transpose(...,'component') data_added=0 for instrument_data in instrument['data']: if bool(self.new_data[instrument_data['data_name']].attrs['accept']):#because xArray and json is the worst self.ds_append[instrument_data['data_name']] = self.new_data[instrument_data['data_name']] data_added+=1 else: continue if data_added>0: self.ds_append = self.ds_append.reset_coords() if instrument['sample_dim'] not in self.ds_append: self.ds_append = self.ds_append.expand_dims(instrument['sample_dim']) db_uuid = self.get_client('agent').deposit_obj(obj=self.ds_append) self.get_client('agent').enqueue(task_name='append',db_uuid=db_uuid,concat_dim=instrument['sample_dim'])
[docs] def predict_next_sample(self): self.uuid['agent'] = self.get_client('agent').enqueue( task_name='predict', sample_uuid=self.uuid['sample'], AL_campaign_name=self.AL_campaign_name, interactive=True )['return_val']
[docs] def validate_measurements(self): data_path = self.config['data_path'] h5_path = data_path / (self.sample_name + '.h5') with h5py.File(h5_path, 'r') as h5: self.SAS_transmission = h5['entry/sample/transmission'][()] if self.SAS_transmission > self.config['max_sample_transmission']: self.update_status(f'Last sample missed! (Transmission={self.SAS_transmission})') self.app.logger.info('Dropping this sample from AL and hoping the next one hits...') transmission_validated = False else: self.update_status(f'Last Sample success! (Transmission={self.SAS_transmission})') transmission_validated = True return transmission_validated
[docs] def process_sample_grid( self, sample, name: Optional[str] = None, sample_uuid: Optional[str] = None, AL_campaign_name: Optional[str] = None, AL_uuid: Optional[str] = None, predict_next: bool = False, enqueue_next: bool = False, reset_grid: bool = False, ): """Process a sample from a grid of samples. Parameters ---------- sample: Dict, optional Dictionary containing sample coordinates or properties. If None, will be fetched from the grid or agent. name: str, optional The name of the sample. If not provided, will be auto-generated. sample_uuid: str, optional UUID for the sample. If not provided, will be auto-generated. AL_campaign_name: str, optional Name of the active learning campaign. AL_uuid: str, optional UUID for the active learning campaign. predict_next: bool If True, triggers a predict call to the agent. enqueue_next: bool If True, will enqueue the next sample for measurement. """ # Validate config for grid processing self.validate_config_grid() if reset_grid or self.grid_data is None: self.reset_grid() # Handle sample UUID generation if sample_uuid is None: self.uuid['sample'] = 'SAM-' + str(uuid.uuid4()) else: self.uuid['sample'] = sample_uuid # Handle AL UUID if predict_next and AL_uuid is None: self.uuid['AL'] = 'AL-' + str(uuid.uuid4()) else: self.uuid['AL'] = AL_uuid # Handle campaign name if predict_next and AL_campaign_name is None: self.AL_campaign_name = f"{self.config['data_tag']}_{self.uuid['AL'][-8:]}" else: self.AL_campaign_name = AL_campaign_name # Check if we should measure a blank if (self.config['grid_blank_interval'] is not None and self.config['grid_blank_sample'] is not None and self.grid_sample_count > 0 and self.grid_sample_count % self.config['grid_blank_interval'] == 0): self.update_status(f"Measuring blank sample (scheduled interval {self.config['grid_blank_interval']})") blank_sample = self.config['grid_blank_sample'] blank_name = f"blank_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}" # Configure blank measurement self.measure_grid_sample(blank_sample, name=blank_name, empty=True) # Handle the next sample based on mode if sample is not None: # find the closest sample in the grid by euclidean distance available = self.grid_data[self.config['components']].to_array('component').transpose(...,'component') selected = xr.Dataset(sample).to_array('component') dist = (selected - available).pipe(np.square).sum('component') #no sqrt needed for just min distance sample_index = dist.argmin() # Get the data variables from grid_data and add them individually to sample dict grid_sample = self.grid_data.isel(sample=sample_index).reset_coords() for var_name in grid_sample.data_vars: sample[var_name] = grid_sample[var_name].item() self.update_status(f"Found closest sample to be {grid_sample}") # Generate sample name if not provided if name is None: self.sample_name = f"{self.config['data_tag']}_{self.uuid['sample'][-8:]}" else: self.sample_name = name # Measure the sample self.measure_grid_sample(sample, name=self.sample_name, empty=False) self.construct_grid_datasets(sample) # update sample manifest and grid data self.num_samples = self.grid_data.sizes['sample']#update num samples self.grid_data = self.grid_data.drop_isel(sample=sample_index) self.grid_sample_count += 1 # Predict next sample if requested if predict_next: if sample is not None: self.add_new_data_to_agent() self.predict_next_sample() # Enqueue next sample if requested if enqueue_next: ag_result = self.get_client('agent').retrieve_obj(uid=self.uuid['agent']) next_samples = ag_result[self.config['next_samples_variable']] new_composition = next_samples.to_pandas().squeeze().to_dict() new_composition = {k:v for k,v in new_composition.items()} task = { 'task_name': 'process_sample_grid', 'sample': new_composition, 'predict_next': predict_next, 'enqueue_next': enqueue_next, 'AL_campaign_name': self.AL_campaign_name, 'AL_uuid': self.uuid['AL'], } task_uuid = 'QD-' + str(uuid.uuid4()) package = {'task': task, 'meta': {}, 'uuid': task_uuid} package['meta']['queued'] = datetime.datetime.now().strftime('%m/%d/%y %H:%M:%S-%f') queue_loc = self._queue.qsize() # append at end of queue self._queue.put(package, queue_loc)
[docs] def measure_grid_sample(self, sample, name, empty=False): """Measure a sample using the grid-based workflow. instrument = { 'client_name': 'APSUSAXS', 'empty_base_kw': {'task_name': 'expose'}, 'measure_base_kw': {'task_name': 'expose'}, 'select_sample_base_kw': {'task_name': 'setPosition', 'y_offset': 2}, 'sample_select_kwargs': ['plate', 'row', 'col'], 'sample_comps_variable': 'sample_composition' } Parameters ---------- sample: Dict Dictionary containing sample coordinates and properties name: str Sample name for the measurement """ self.update_status(f"Starting measurement of {name}") # Set sample information in all clients sample_data = self.set_sample( sample_name=name, sample_uuid=self.uuid['sample'], AL_campaign_name=self.AL_campaign_name, AL_uuid=self.uuid['AL'], AL_components=self.config['AL_components'], sample_composition=sample, ) for client_name in self.config['client'].keys(): self.get_client(client_name).enqueue(task_name='set_sample', **sample_data) # Move to the sample position based on instrument configuration for i, instrument in enumerate(self.config['instrument']): self.update_status(f"Moving to sample position using global command template") move_cmd_kwargs = {k:sample[k] for k in instrument['sample_select_kwargs']} move_cmd_kwargs.update(instrument['select_sample_base_kw']) self.uuid['move'] = self.get_client(instrument['client_name']).enqueue(**move_cmd_kwargs) self.get_client(instrument['client_name']).wait(self.uuid['move']) self.update_status(f"Measuring sample with {instrument['client_name']}") measure_cmd_kwargs = instrument['measure_base_kw'] if not empty else instrument['empty_base_kw'] measure_cmd_kwargs['name'] = name self.uuid['measure'] = self.get_client(instrument['client_name']).enqueue(**measure_cmd_kwargs) self.get_client(instrument['client_name']).wait(self.uuid['measure']) self.update_status(f'All done for {name}!')
[docs] def construct_grid_datasets(self, sample: dict): """Construct datasets from grid-based measurements""" data_path = pathlib.Path(self.config['data_path']) if self.tiled_client is None: self.tiled_client = self.data.tiled_client # this needs to be here, because in the constructor, we don't have the datapacket attached self.new_data = xr.Dataset() for i, instrument in enumerate(self.config['instrument']): for k in instrument['sample_select_kwargs']: self.new_data[k] = sample[k] self.new_data[k].attrs['description'] = 'sample coordinate' for instrument_data in instrument['data']: tiled_result = ( self.tiled_client .search(Eq('sample_uuid', self.uuid['sample'])) .search(Eq('array_name', instrument_data['tiled_array_name'])) ) if len(tiled_result) == 0: raise ValueError(f"Could not find tiled entry for measurement sample_uuid={self.uuid['sample']}") # handle Python None and "None" depending on how json deserialization works out if (instrument_data['data_dim'] is not None) and (instrument_data['data_dim'] != 'None'): # first try to read this as an array entry in tiled, if not found, look in metadata tiled_result_dim = ( self.tiled_client .search(Eq('sample_uuid', self.uuid['sample'])) .search(Eq('array_name', instrument_data['data_dim'])) ) if len(tiled_result_dim)>0: dims = instrument_data['data_dim'] coords = {instrument_data['data_dim']: tiled_result_dim.values()[-1].read()} else: dims = instrument_data['data_dim'] coords = {instrument_data['data_dim']: tiled_result.items()[-1][-1].metadata[ instrument_data['tiled_metadata_dim']]} else: dims = None coords = None if 'sample_env' in instrument.keys(): sample_env_dims = list(instrument['sample_env']['move_swept_kw'].keys()) measurement_list = [] for _,tiled_data in tiled_result.items(): if 'MT-' in tiled_data.metadata['name']: continue measurement_list.append(xr.DataArray(tiled_data[()], dims=dims, coords=coords)) measurement = xr.concat(measurement_list, dim=instrument['sample_dim']) for key,values in instrument['sample_env']['move_swept_kw'].items(): measurement[key] = (instrument['sample_dim'],values) self.new_data[instrument_data['data_name']] = measurement else: tiled_data = tiled_result.items()[-1][-1] measurement = xr.DataArray(tiled_data[()], dims=dims, coords=coords) self.new_data[instrument_data['data_name']] = measurement if 'quality_metric' in instrument.keys(): quality_metric = instrument['quality_metric'] instrument_value = tiled_data.metadata[quality_metric['tiled_metadata_key']] if quality_metric['comparison'].lower() in ['<', 'lt']: accept = instrument_value < quality_metric['threshold'] elif quality_metric['comparison'].lower() in ['>', 'gt']: accept = instrument_value > quality_metric['threshold'] elif quality_metric['comparison'].lower() in ['==', '=', 'eq']: accept = instrument_value == quality_metric['threshold'] else: raise ValueError( f'Cannot recognize comparison for quality_metric. You passed {quality_metric["comparison"]}') else: accept = True self.new_data[instrument_data['data_name']].attrs['accept'] = int(accept) self.new_data['sample_uuid'] = self.uuid['sample'] sample_composition = {} for component in self.config['components']: try: self.new_data[component] = sample[component] # for tiled sample_composition[component] = sample[component] except KeyError: warnings.warn(f"Skipping component {component} in AL_components") self.new_data.to_netcdf(data_path / (self.sample_name + '.nc')) sample_composition['components'] = self.config['components'] if self.data is not None: self.data['sample_composition'] = sample_composition self.data['time'] = datetime.datetime.now().strftime('%m/%d/%y %H:%M:%S-%f %Z%z')
[docs] def set_sample(self, sample_name: str, sample_uuid: str, AL_campaign_name: Optional[str] = None, AL_uuid: Optional[str] = None, AL_components: Optional[List] = None, sample_composition: Optional[Dict] = None): """Set sample information for all clients Parameters ---------- sample_name: str Name of the sample sample_uuid: str UUID of the sample AL_campaign_name: str, optional Name of the AL campaign AL_uuid: str, optional UUID of the AL campaign AL_components: List, optional List of components for AL sample_composition: Dict, optional Composition of the sample Returns ------- Dict Sample data for client communication """ sample_data = { 'sample_name': sample_name, 'sample_uuid': sample_uuid, } if AL_campaign_name is not None: sample_data['AL_campaign_name'] = AL_campaign_name if AL_uuid is not None: sample_data['AL_uuid'] = AL_uuid if AL_components is not None: sample_data['AL_components'] = AL_components if sample_composition is not None: sample_data['sample_composition'] = sample_composition return sample_data
# New grid reset methods
[docs] @Driver.quickbar(qb={'button_text':'Reset Grid'}) def reset_grid(self): """Reload the grid from file and reset grid_sample_count.""" if self.config['grid_file'] is None: self.app.logger.info("No grid file specified in configuration.") self.grid_data = None self.grid_sample_count = 0 else: self.grid_data = xr.load_dataset(self.config['grid_file']) self.grid_sample_count = 0 self.app.logger.info(f"Grid reloaded: {self.grid_data.sizes['sample']} samples available.")
_DEFAULT_CUSTOM_CONFIG = { '_classname': 'AFL.automation.sample.SampleDriver.SampleDriver', } if __name__ == '__main__': from AFL.automation.shared.launcher import *