Source code for AFL.automation.sample.CastingServer

from AFL.automation.APIServer.Client import Client
from AFL.automation.prepare.OT2Client import OT2Client
from AFL.automation.shared.utilities import listify
from AFL.automation.APIServer.Driver import Driver

from math import ceil,sqrt
import json
import datetime,time
import requests
import shutil
import datetime
import traceback

import xarray as xr


class CastingServer(Driver):
    """Driver that runs liquid transfers on a remote prep server and logs them.

    Transfers are issued through an ``OT2Client`` (``self.prep_client``).  The
    dict returned by each transfer is tagged with sample metadata and may be
    appended to one of three xarray manifests (``'manifest'``,
    ``'manifest_prep'``, ``'manifest_cast'``) whose file paths are read from
    ``self.config``.
    """

    # Config keys holding the on-disk location of each manifest.
    defaults = {
        'manifest': '/home/afl642/',
        'manifest_prep': '/home/afl642/',
        'manifest_cast': '/home/afl642/',
    }

    def __init__(self, prep_url, overrides=None):
        """Connect to the prep server and load (or create) the manifests.

        Args:
            prep_url: Address of the prep server as ``'<ip>:<port>'``.
            overrides: Passed through unchanged to ``Driver.__init__``.

        Raises:
            ValueError: If ``prep_url`` does not contain exactly one ``':'``.
        """
        Driver.__init__(
            self,
            name='CastingServer_SampleDriver',
            defaults=self.gather_defaults(),
            overrides=overrides,
        )
        self.app = None
        self.name = 'CastingServer_SampleDriver'

        url_parts = prep_url.split(':')
        if len(url_parts) != 2:
            # The original raised the undefined name ``ArgumentError``, which
            # surfaced as a NameError and hid this message.
            raise ValueError('Need to specify both ip and port on prep_url')
        prep_host, prep_port = url_parts

        # prepare samples
        self.prep_client = OT2Client(prep_host, port=prep_port)
        self.prep_client.login('SampleServer_PrepClient')
        self.prep_client.debug(False)

        # sample_name --> location mapping
        self.prepare_locs = {}
        self.cast_locs = {}

        self.prep_uuid = None

        self.status_str = 'Fresh Server!'
        self.wait_time = 30.0  # seconds

        self.init_casting_manifest()

    def init_casting_manifest(self, attrs=None, overwrite=False):
        """(Re)build ``self.manifests`` from disk or as empty datasets.

        Args:
            attrs: Optional mapping merged into the ``attrs`` of every
                manifest after it is loaded or created.
            overwrite: When true, skip loading and start every manifest as an
                empty ``xr.Dataset``.
        """
        self.manifests = {}
        for manifest in ['manifest', 'manifest_prep', 'manifest_cast']:
            print(f'Trying to load {self.config[manifest]}')
            if overwrite:
                self.manifests[manifest] = xr.Dataset()
                print('Starting new manifest')
            else:
                try:
                    loaded = xr.load_dataset(self.config[manifest])
                except (FileNotFoundError, ValueError):
                    self.manifests[manifest] = xr.Dataset()
                    print('Not Found...starting new manifest')
                else:
                    self.manifests[manifest] = loaded
                    print('Loaded!')

            if attrs is not None:
                self.manifests[manifest].attrs.update(attrs)

    def status(self):
        """Return the current status message wrapped in a one-element list."""
        return [self.status_str]

    def update_status(self, value):
        """Store ``value`` as the current status and log it when possible.

        ``self.app`` is ``None`` until something outside this class assigns
        it, so logging is skipped in that state instead of raising
        ``AttributeError``.
        """
        self.status_str = value
        if self.app is not None:
            self.app.logger.info(value)

    @staticmethod
    def _csv_path(nc_path):
        """Return the CSV companion path for a manifest path.

        Only a trailing ``.nc`` suffix is swapped.  A blanket
        ``replace('nc', 'csv')`` would also rewrite ``nc`` inside directory or
        file names, and would return the netCDF path itself when no ``nc`` is
        present.
        """
        suffix = '.nc'
        if nc_path.endswith(suffix):
            return nc_path[:-len(suffix)] + '.csv'
        return nc_path + '.csv'

    def log_event(self, manifest_key, event, write=True):
        """Append ``event`` to a manifest along the ``'event'`` dimension.

        Args:
            manifest_key: Key into ``self.manifests`` / ``self.config``.
            event: Mapping of variable name to value for the new entry.
            write: When true, write the updated manifest to netCDF at the
                configured path and to a CSV file next to it.
        """
        ds_event = xr.Dataset()
        for key, value in event.items():
            # NOTE(review): each value is assigned with a single 'event' dim,
            # so it is presumably expected to be 1-D -- confirm with callers.
            ds_event[key] = ('event', value)

        self.manifests[manifest_key] = xr.concat(
            [self.manifests[manifest_key], ds_event],
            dim='event',
        )

        if write:
            nc_path = self.config[manifest_key]
            self.manifests[manifest_key].to_netcdf(nc_path)
            self.manifests[manifest_key].to_pandas().to_csv(self._csv_path(nc_path))

    def assign_targets(self, protocols, target_map=None):
        """Replace placeholder target names in ``protocols`` with real ones.

        Args:
            protocols: Mapping of protocol name to a list of task dicts, each
                with ``'source'`` and ``'dest'`` entries.  The task dicts are
                modified in place.
            target_map: Optional mapping of placeholder name to location.
                When ``None``, every source/dest whose lower-cased name
                contains ``'target'`` is collected and one
                ``'get_prep_target'`` task is enqueued on the prep client per
                placeholder to obtain its location.

        Returns:
            The same ``protocols`` object, after substitution.
        """
        if target_map is None:
            self.targets = set()
            for protocol in protocols.values():
                for task in protocol:
                    for endpoint in ('source', 'dest'):
                        if 'target' in task[endpoint].lower():
                            self.targets.add(task[endpoint])

            self.target_map = {}
            for placeholder in self.targets:
                reply = self.prep_client.enqueue(
                    task_name='get_prep_target', interactive=True)
                self.target_map[placeholder] = reply['return_val']
        else:
            self.target_map = target_map

        for protocol in protocols.values():
            for task in protocol:
                # Names missing from the map are left unchanged.
                task['source'] = self.target_map.get(task['source'], task['source'])
                task['dest'] = self.target_map.get(task['dest'], task['dest'])

        return protocols

    def _transfer_and_tag(self, task, event_type, *, record_time=True, **labels):
        """Run one transfer and tag its result; shared by the public methods.

        Args:
            task: Keyword arguments for ``prep_client.transfer``; must include
                ``'volume'``, ``'source'`` and ``'dest'`` for the status text.
            event_type: Stored under ``'event_type'`` in the result.
            record_time: When true, copy the result's ``'ended'`` value into
                ``self.last_prep_time``.
            **labels: Extra entries (e.g. ``sample_name``) added to the result.

        Returns:
            The tagged result dict, which is also stored as ``self.last_prep``.
        """
        self.update_status(f'Transferring {task["volume"]} uL from {task["source"]} to {task["dest"]}')
        self.last_prep = self.prep_client.transfer(interactive=True, **task)
        if record_time:
            self.last_prep_time = self.last_prep['ended']

        self.last_prep['event_type'] = event_type
        self.last_prep.update(labels)
        # Task parameters are merged last, so on a key clash they win.
        self.last_prep.update(**task)
        return self.last_prep

    def prepare_casting_stocks(self, **spec):
        """Run the prep transfers for a stock and log each to 'manifest_prep'.

        ``spec`` must contain ``stock_name``, ``sample_names`` and
        ``protocol``.  ``sample_names`` and ``protocol`` are paired with
        ``zip``, so iteration stops at the shorter of the two.
        """
        self.init_casting_manifest()

        stock_name = spec['stock_name']
        sample_names = spec['sample_names']
        protocol = spec['protocol']

        for sample_name, task in zip(sample_names, protocol):
            event = self._transfer_and_tag(
                task, 'prep', sample_name=sample_name, stock_name=stock_name)
            self.log_event('manifest_prep', event)

    def bulk_cast_films(self, **spec):
        """Run one cast transfer per sample and log each to 'manifest_cast'.

        ``spec`` must contain ``sample_names``, ``plate_names`` and
        ``protocol``, all of the same length.

        Raises:
            ValueError: If the three sequences differ in length.
        """
        self.init_casting_manifest()

        sample_names = spec['sample_names']
        plate_names = spec['plate_names']
        protocol = spec['protocol']

        if not (len(sample_names) == len(plate_names) == len(protocol)):
            raise ValueError('Number of plate_names,sample_names, and protocols must be equal')

        for sample_name, plate_name, task in zip(sample_names, plate_names, protocol):
            event = self._transfer_and_tag(
                task, 'cast', sample_name=sample_name, plate_name=plate_name)
            self.log_event('manifest_cast', event)

    def prepare_and_cast(self, **sample):
        """Prepare one sample and then cast it onto each listed plate.

        ``sample`` must contain ``sample_name``, ``plate_names``,
        ``prep_protocol`` and ``cast_protocol``.  ``plate_names`` and
        ``cast_protocol`` are paired with ``zip``.

        NOTE: manifest logging and ``min_mix_time`` handling are not active in
        this method; results are only kept in ``self.last_prep``.
        """
        self.init_casting_manifest()

        sample_name = sample['sample_name']
        plate_names = sample['plate_names']
        protocols = {k: sample[k] for k in ['prep_protocol', 'cast_protocol']}

        self.update_status(f'Preparing sample: {sample_name}')
        for task in protocols['prep_protocol']:
            self._transfer_and_tag(task, 'prep', sample_name=sample_name)

        # shouldn't have to wait, but let's do this for safety
        if self.prep_uuid is not None:
            self.prep_client.wait(self.prep_uuid)

        # Pre-bind so the closing status message cannot hit an unbound name
        # when there is nothing to cast.
        plate_name = None
        for plate_name, task in zip(plate_names, protocols['cast_protocol']):
            # The cast phase does not refresh last_prep_time.
            self._transfer_and_tag(
                task, 'cast', record_time=False,
                sample_name=sample_name, plate_name=plate_name)
            self.update_status(f'Cast {self.last_prep["sample_name"]}!')

        self.update_status(f'All done for sample {sample_name} on plate {plate_name}!')
# Default configuration for launching this driver: the fully qualified class
# name and the prep server address given to CastingServer as ``prep_url``.
# NOTE(review): presumably read by AFL.automation.shared.launcher -- confirm.
_DEFAULT_CUSTOM_CONFIG = {
    '_classname': 'AFL.automation.sample.CastingServer.CastingServer',
    'prep_url': 'localhost:5000'
}

# NOTE(review): looks like the port this server is launched on -- confirm
# against the launcher module.
_DEFAULT_CUSTOM_PORT = 5050

if __name__ == '__main__':
    # Importing the launcher is the only action taken when run as a script.
    from AFL.automation.shared.launcher import *