import gc
import time
import datetime
from AFL.automation.APIServer.Driver import Driver
from AFL.automation.instrument.ScatteringInstrument import ScatteringInstrument
import numpy as np # for return types in get data
import h5py #for Nexus file writing
import os
import pathlib
import PIL
import warnings
import re,telnetlib #for sics telnet comms
import json
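# [docs]  (link label apparently left over from a rendered documentation page; commented out so it is not evaluated as code)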
class SINQSANS(ScatteringInstrument,Driver):
    '''
    Driver for the SANS instrument at PSI SINQ, controlled through SICS over telnet.

    Two SICSTelnetClient connections are opened in __init__: ``client`` carries
    commands and parameter queries, ``status_monitor_client`` is used for status
    polling (getStatus, blockForIdle) and as a fallback reader in getAutoExposure.

    NOTE(review): the indentation of this class was reconstructed from a source
    listing that had lost it; the nesting inside expose() and _simple_expose()
    should be confirmed against the upstream file.
    '''

    defaults = {
        'sics_host': 'sans.psi.ch',
        'sics_port': 1301,
        'user_login': 'User 22lns1',  # User 23lns1
        'empty transmission': 1,
        'transmission strategy': 'sum',
        'reduced_data_dir': '/mnt/home/chess_id3b/beaucage/211012-data',
        'exposure': 1.,
        'absolute_calibration_factor': 1,
        'data_path': '/home/afl642',
        'pixel1': 0.075,  # pixel y size in m
        'pixel2': 0.075,  # pixel x size in m
        'num_pixel1': 128,
        'num_pixel2': 128,
    }

    # Bounds applied by getAutoExposure to the exposure it proposes.
    _MAX_AUTO_EXPOSURE = 500
    _MIN_AUTO_EXPOSURE = 5

    def __init__(self,overrides=None):
        '''
        Initialise the driver and connect to the SICS server.

        Opens two telnet sessions to ``sics_host``:``sics_port`` (logging in with
        ``user_login``) and, when ``reduced_data_dir`` is configured, changes the
        process working directory to it.

        :param overrides: passed through to Driver.__init__ unchanged.
        '''
        self.app = None
        Driver.__init__(self,name='SINQSANS',defaults=self.gather_defaults(),overrides=overrides)
        ScatteringInstrument.__init__(self)

        host = self.config['sics_host']
        port = self.config['sics_port']
        login = self.config['user_login']
        self.client = SICSTelnetClient(host,port,login)
        self.status_monitor_client = SICSTelnetClient(host,port,login)

        if self.config['reduced_data_dir'] is not None:
            os.chdir(self.config['reduced_data_dir'])

        self.__instrument_name__ = 'PSI SINQ SANS instrument'

        self.status_txt = 'Just started...'
        # (normalized transmission, monitor counts, sample counts, empty transmission)
        self.last_measured_transmission = [0,0,0,0]

    def pre_execute(self,**kwargs):
        '''
        Hook that intentionally does nothing for this instrument.
        '''
        pass

    def setReducedDataDir(self,path):
        '''
        Store ``path`` as ``reduced_data_dir`` and change the working directory to it.
        '''
        self.config['reduced_data_dir'] = path
        os.chdir(path)

    @Driver.quickbar(qb={'button_text':'Measure Transmission',
        'params':{
        'set_empty_transmission':{'label':'Set Empty Trans?','type':'boolean','default':False}
        }})
    def measureTransmission(self,set_empty_transmission=False,return_full=False):
        '''
        Run a blocking transmission count and record the result.

        The count is started through _simple_expose(mode='trans'); the detector
        sum is then read with the SICS query 'banana sum 40 80 40 80' (retried
        once if the reply is not an integer). The monitor count is currently the
        hard-coded value 10.

        :param set_empty_transmission: when true, store this measurement as the
            'empty transmission' config value before normalising.
        :param return_full: when true, return the full 4-tuple
            (normalized, monitor_cts, cts, empty transmission) instead of only the
            normalized transmission.
        :return: normalized transmission, or the 4-tuple described above.
        '''
        # NOTE(review): exposure=2 only affects the status text; _simple_expose
        # sends the configured exposure to SICS. Confirm this is intended.
        self._simple_expose(exposure=2,block=True,mode='trans')

        query = 'banana sum 40 80 40 80'
        try:
            cts = int(self.client.ask_param(query))
        except ValueError:
            # a non-numeric reply is retried exactly once
            cts = int(self.client.ask_param(query))

        monitor_cts = 10
        trans = cts/monitor_cts

        if set_empty_transmission:
            self.config['empty transmission'] = trans

        empty = self.config['empty transmission']
        normalized = trans/empty
        self.last_measured_transmission = (normalized,monitor_cts,cts,empty)

        return self.last_measured_transmission if return_full else normalized

    def lastMeasuredTransmission(self):
        '''
        Return the stored result of the most recent transmission measurement.
        '''
        return self.last_measured_transmission

    @Driver.unqueued()
    def getExposure(self):
        '''
        get the currently set exposure time
        '''
        return self.config['exposure']

    @Driver.unqueued()
    def getFilename(self):
        '''
        get the currently set file name (the SICS 'sample' parameter)
        '''
        return self.client.ask_param('sample')

    @Driver.unqueued()
    def getLastFilePath(self,**kwargs):
        '''
        Build the path of the data file for the current SICS data number.

        Queries 'sicsdatanumber' until the reply parses as an integer, waiting
        0.5 s between failed reads.

        :return: pathlib.Path under ``data_path`` named sans2023n<6-digit number>.hdf
        :raises ValueError: if the data number still cannot be parsed after the
            retries are exhausted.
        '''
        failed_reads = 0
        while True:
            sicsdatanumber = self.client.ask_param('sicsdatanumber')
            try:
                run_number = int(sicsdatanumber)
                break
            except ValueError:
                if failed_reads>10:
                    raise ValueError('Could not read sicsdatanumber!')
                time.sleep(0.5)
            failed_reads += 1

        # NOTE(review): the 'sans2023n' prefix hard-codes the year.
        filepath = pathlib.Path(self.config['data_path'])/f'sans2023n{run_number:06d}.hdf'

        message = f'Last file found to be {filepath}'
        if self.app is not None:
            self.app.logger.debug(message)
        else:
            print(message)
        return filepath

    def setExposure(self,exposure):
        '''
        Store ``exposure`` in the driver config; nothing is sent to SICS here.
        '''
        if self.app is not None:
            self.app.logger.debug(f'Setting exposure time to {exposure}')
        self.config['exposure'] = exposure

    def setFilename(self,name):
        '''
        Set the SICS 'sample' parameter to ``name`` after removing the
        characters backslash, '/', ':' and '%'.
        '''
        if self.app is not None:
            self.app.logger.debug(f'Setting filename to {name}')

        for forbidden in ('\\','/',':','%'):
            name = name.replace(forbidden,'')
        self.client.set_param('sample',name)

    def getElapsedTime(self):
        '''
        Not implemented for this instrument.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    def readH5(self,filepath,update_config=False,**kwargs):
        '''
        Read the detector counts from a data file.

        :param filepath: path of the HDF5 file to open read-only.
        :param update_config: currently unused (the code that consumed it is
            commented out below).
        :return: dict with the single key 'counts' holding the dataset at
            'entry1/data1/counts'.
        '''
        out_dict = {}
        with h5py.File(filepath,'r') as h5:
            out_dict['counts'] = h5['entry1/data1/counts'][()]
            # out_dict['name'] = h5['entry1/sample/name'][()]
            # out_dict['dist'] = h5['entry1/SANS/detector/x_position'][()]/1000
            # out_dict['wavelength'] = h5['entry1/data1/lambda'][()]*1e-9,
            # out_dict['beam_center_x'] = h5['entry1/SANS/detector/beam_center_x'][()]
            # out_dict['beam_center_y'] = h5['entry1/SANS/detector/beam_center_y'][()]
            # out_dict['poni2'] = h5['entry1/SANS/detector/beam_center_x'][()]*self.config['pixel1']
            # out_dict['poni1 '] = h5['entry1/SANS/detector/beam_center_y'][()]*self.config['pixel2']
        # if update_config:
        #     self.config['wavelength'] = out_dict['wavelength']
        #     self.config['dist'] = out_dict['dist']
        #     self.config['poni1'] = out_dict['poni1']
        #     self.config['poni2'] = out_dict['poni2']
        return out_dict

    @Driver.unqueued(render_hint='2d_img',log_image=True)
    def getData(self,**kwargs):
        '''
        Load the counts array of the most recent data file.

        The file is looked up and read up to 30 times, sleeping 1 s before each
        retry, because it may not be readable yet right after a count.

        :return: the counts with NaNs replaced via numpy.nan_to_num.
        :raises FileNotFoundError: if the file cannot be read on the 30th try.
        '''
        read_errors = (FileNotFoundError,OSError,KeyError)
        max_tries = 30

        for attempt in range(1,max_tries+1):
            if attempt>1:
                time.sleep(1.0)
            try:
                filepath = self.getLastFilePath()
                data = self.readH5(filepath)['counts']
            except read_errors:
                if attempt == max_tries:
                    raise FileNotFoundError(f'could not locate file after {attempt} tries')
                if attempt>1:
                    # the very first failure is retried without a warning
                    warnings.warn(f'failed to load file, trying again, this is try {attempt}')
            else:
                return np.nan_to_num(data)

    def _simple_expose(self,name=None,exposure=None,mode='scatt',aects=1e6,block=False):
        '''
        Start a count without any data handling.

        :param name: sample name to set; when None the current name is read back.
        :param exposure: value reported in the status/log text; when None it is
            obtained from getAutoExposure(desired_counts=aects).
        :param mode: 'scatt' sends 'MLscatt', anything else sends 'MLtrans'.
        :param aects: desired counts handed to getAutoExposure.
        :param block: when true, wait for the count to complete and flush the
            client buffer afterwards.
        '''
        if name is None:
            name=self.getFilename()
        else:
            self.setFilename(name)

        if exposure is None:
            exposure=self.getAutoExposure(desired_counts = aects)

        # NOTE(review): `exposure` is only reported below; the command always
        # uses self.config["exposure"] (a setExposure call here was disabled
        # upstream). Confirm which of the two is intended.
        #self.setExposure(exposure)

        self.status_txt = f'Starting {exposure} moni count named {name}'
        if self.app is not None:
            self.app.logger.debug(f'Starting exposure with name {name} for {exposure} moni cts')

        command = 'MLscatt' if mode == 'scatt' else 'MLtrans'
        self.client.send_cmd(f'{command} {self.config["exposure"]}')

        if block:
            time.sleep(2)
            self.blockForCompleted()
            time.sleep(0.5)
            self.client.flush_buffer()

    def getAutoExposure(self,short_count_dur=2,desired_counts = 1000000):
        '''
        Propose an exposure from a short test count.

        Sends 'count moni <short_count_dur> n', waits for the instrument to go
        idle, reads the detector sum ('banana sum 0 127 0 127') and scales the
        duration so that ``desired_counts`` would be reached. The result is
        clamped to the range 5..500.

        :param short_count_dur: duration given to the test count.
        :param desired_counts: target number of detector counts.
        :return: proposed exposure; 500 when the test count saw no counts at all.
        '''
        self.client.send_cmd(f'count moni {short_count_dur} n')
        time.sleep(1)
        self.blockForIdle()
        self.client.conn.read_lazy()
        time.sleep(1)

        query = 'banana sum 0 127 0 127'
        try:
            counts = int(self.client.ask_param(query))
        except ValueError:
            # fall back to the second connection if the reply was not numeric
            counts = int(self.status_monitor_client.ask_param(query))

        if counts <= 0:
            # No counts means no usable rate; avoid dividing by zero and use
            # the same cap that applies to any over-long proposal.
            print(f'with {counts} in a {short_count_dur} exposure, no count rate can be estimated.')
            print('setting to 500 instead.')
            return self._MAX_AUTO_EXPOSURE

        count_rate = counts / short_count_dur
        proposed_time = (desired_counts / count_rate)

        if proposed_time>self._MAX_AUTO_EXPOSURE:
            print(f'with {counts} in a {short_count_dur} exposure, I would like to measure {proposed_time}, but that seems too high.')
            print('setting to 500 instead.')
            proposed_time = self._MAX_AUTO_EXPOSURE
        if proposed_time < self._MIN_AUTO_EXPOSURE:
            print(f'with {counts} in a {short_count_dur} exposure, I would like to measure {proposed_time}, but that seems too low.')
            print('setting to 5 instead.')
            proposed_time = self._MIN_AUTO_EXPOSURE
        return proposed_time

    def _transmissionSummary(self):
        '''
        Return the last transmission measurement as a dict of named values.
        '''
        normalized,open_flux,sample_flux,empty_cell = self.last_measured_transmission
        return {
            'normalized_sample_transmission': normalized,
            'open_flux': open_flux,
            'sample_flux': sample_flux,
            'empty_cell_transmission': empty_cell,
            'sample_transmission': normalized*empty_cell,
        }

    def _resyncClient(self):
        '''
        Issue a throw-away detector-sum query after a count, flushing the
        client buffer and retrying once if the reply is not an integer.
        '''
        query = 'banana sum 40 80 40 80'
        try:
            int(self.client.ask_param(query))
        except ValueError:
            try:
                self.client.flush_buffer()
                int(self.client.ask_param(query))
            except IndexError:
                self.client.flush_buffer()

    @Driver.quickbar(qb={'button_text':'Expose',
        'params':{
        'name':{'label':'Name','type':'text','default':'test_exposure'},
        'exposure':{'label':'Exposure (s)','type':'float','default':5},
        'reduce_data':{'label':'Reduce?','type':'bool','default':True},
        'measure_transmission':{'label':'Measure Trans?','type':'bool','default':True}
        }})
    def expose(self,name=None,exposure=None,block=True,reduce_data=True,measure_transmission=True,save_nexus=True):
        '''
        Run a scattering count and optionally save/reduce the result.

        :param name: sample name to set; when None the current name is read back.
        :param exposure: stored via setExposure when given; when None the
            already configured exposure is used.
        :param block: wait for the count to complete.
        :param reduce_data: reduce the data, write '<name>_chosen_r1d.csv' and
            '<name>_trans.json'. Implies waiting for completion.
        :param measure_transmission: run measureTransmission() first.
        :param save_nexus: write the raw (and, if reduced, the reduced) data to
            a Nexus file. Implies waiting for completion.
        '''
        if name is None:
            name=self.getFilename()
        else:
            self.setFilename(name)

        if measure_transmission:
            self.measureTransmission()

        if exposure is not None:
            self.setExposure(exposure)
        # Report the value that is actually sent, so that exposure=None does
        # not show up as "Starting None moni count".
        configured_exposure = self.config['exposure']

        pre_sicsdatanumber = self.client.ask_param('sicsdatanumber')
        if self.data is not None:
            self.data['pre_sicsdatanumber'] = pre_sicsdatanumber

        self.status_txt = f'Starting {configured_exposure} moni count named {name}'
        if self.app is not None:
            self.app.logger.debug(f'Starting exposure with name {name} for {configured_exposure} moni cts')

        self.client.send_cmd(f'MLscatt {self.config["exposure"]}')

        if not (block or reduce_data or save_nexus):
            return

        self.blockForCompleted()
        self.client.flush_buffer()
        self._resyncClient()

        data = self.getData()

        if save_nexus:
            self.status_txt = 'Writing Nexus'
            normalized_sample_transmission = self.last_measured_transmission[0]
            if self.data is not None:
                self.data['raw_data'] = data
                self.data['normalized_sample_transmission'] = normalized_sample_transmission
            self._writeNexus(data,name,name,self.last_measured_transmission)

        if reduce_data:
            self.status_txt = 'Reducing Data'
            reduced = self.getReducedData(write_data=True,filename=name)
            if self.data is not None:
                self.data['reduced_data'] = reduced
            np.savetxt(f'{name}_chosen_r1d.csv',np.transpose(reduced),delimiter=',')

            summary = self._transmissionSummary()
            if self.data is not None:
                for key,value in summary.items():
                    self.data[key] = value

            if save_nexus:
                self._appendReducedToNexus(reduced,name,name)

            with open(f'{name}_trans.json','w') as f:
                json.dump(summary,f)

        self.status_txt = 'Instrument Idle'

    def blockForIdle(self):
        '''
        Send 'success' on the status connection and wait for its reply line.
        '''
        self.status_monitor_client.success()

    def blockForCompleted(self,timeout=1800):
        '''
        Read lines from the command connection until one contains 'ML completed'.

        :param timeout: overall limit in seconds. Each individual read waits at
            most 300 s and never longer than the time remaining, so the overall
            limit is honoured. A warning is issued if the limit is reached
            before completion is seen; no exception is raised.
        '''
        deadline = time.monotonic() + timeout
        completed = False
        while not completed:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            resp = self.client.conn.read_until(b'\r\n',timeout=min(300,remaining)).decode()
            print(resp)
            completed = 'ML completed' in resp
            time.sleep(0.1)

        if not completed:
            warnings.warn(f"timed out after {timeout} s waiting for 'ML completed'")

    def getStatus(self):
        '''
        Return the reply to the SICS query 'el737 RS' from the status connection.
        '''
        return self.status_monitor_client.ask_param('el737 RS')

    def isCounting(self):
        '''
        Return True when the status value, read as an integer, is non-zero.
        '''
        return bool(int(self.getStatus()))

    def status(self):
        '''
        Return a list of status lines: the last transmission measurement, the
        current status text and links to the live-data endpoints.
        '''
        scaled,open_counts,sample_counts,empty_trans = self.last_measured_transmission
        #lmj = self._getLabviewValue("LMJ Status")
        #status.append(f'LMJ status: {"running, power on target = "+str(lmj[0]*lmj[1])+"W" if lmj[6]==1 else "not running"}')
        #status.append(f'Vacuum (mbar): {self._getLabviewValue("Pressure (mbar)")}')
        return [
            f'Last Measured Transmission: scaled={scaled} using empty cell trans of {empty_trans} with {open_counts} raw counts in open/ {sample_counts} sample',
            f'Status: {self.status_txt}',
            '<a href="getData" target="_blank">Live Data (2D)</a>',
            '<a href="getReducedData" target="_blank">Live Data (1D)</a>',
            '<a href="getReducedData?render_hint=2d_img&reduce_type=2d">Live Data (2D, reduced)</a>',
        ]
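# [docs]  (link label apparently left over from a rendered documentation page; commented out so it is not evaluated as code)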
class SICSTelnetClient():
    '''
    Minimal line-oriented telnet client for a SICS server.

    Every command is sent as UTF-8 text terminated by CRLF. Progress is
    reported with print().

    NOTE(review): telnetlib was removed from the standard library in
    Python 3.13, so this class needs an older interpreter or a replacement
    transport.
    '''

    def __init__(self,host,port,login):
        '''
        Connect to ``host``:``port`` and log in.

        :param host: SICS server host name.
        :param port: SICS server port.
        :param login: text sent after 'sicslogin'.
        :raises Exception: if the reply read one second after the login does
            not start with 'OK'; the connection is closed first.
        '''
        self.host = host
        self.port = port
        # Telnet(host, port) already connects; calling open() again on the
        # connected instance would create a second socket and leak the first.
        self.conn = telnetlib.Telnet(host,port)
        self.conn.write(f'sicslogin {login}\r\n'.encode('utf-8'))
        time.sleep(1)
        resp = self.conn.read_very_eager().decode()
        if resp[0:2] != 'OK':
            self.conn.close()
            raise Exception(f'received unexpected answer from SICS: {resp}')

    def ask_param(self,param,tries = 0):
        '''
        Query ``param`` and return the value from the first '<name> = <value>' reply line.

        A reply containing 'ERROR: Busy' is retried after 5 s for as long as
        the server stays busy, and restarts the parse-failure count. A reply
        that cannot be parsed is retried after 2 s.

        :param param: text to send.
        :param tries: number of parse failures already counted.
        :return: the value part of the reply, as a string.
        :raises IndexError: when the reply still cannot be parsed once
            ``tries`` exceeds 4.
        '''
        cmd = f'{param}\r\n'.encode('utf-8')
        # A loop instead of recursion, so a long busy period cannot hit the
        # interpreter's recursion limit.
        while True:
            self.conn.write(cmd)
            print(f'sending {cmd} to server')
            time.sleep(1)
            response = self.conn.read_very_eager().decode()

            if 'ERROR: Busy' in response:
                time.sleep(5)
                print('rcvd busy response; retrying')
                tries = 0
                continue

            matches = re.findall(r'(.*?) = (.*?)\r\n',response)
            if matches:
                return matches[0][1]

            self.conn.read_lazy()
            if tries>4:
                raise IndexError(f'weird answer from server: {response}, cannot parse after 4 tries.')
            time.sleep(2) #cool down incase you were just too fast
            tries += 1

    def set_param(self,param,val):
        '''
        Send '<param> <val>' and return whatever reply is already available,
        with CRLF sequences removed.
        '''
        cmd = f'{param} {val}\r\n'.encode('utf-8')
        print(f'sending {cmd} to server')
        self.conn.write(cmd)
        response = self.conn.read_very_eager()
        # '\r\n' must not be a raw string here: the raw form only matches a
        # literal backslash sequence and leaves the real CRLF in place.
        return response.decode().replace('\r\n','')

    def send_cmd(self,cmd):
        '''
        Send ``cmd`` and return whatever reply is already available, decoded
        but otherwise unmodified.
        '''
        cmd = f'{cmd}\r\n'.encode('utf-8')
        print(f'sending {cmd} to server')
        self.conn.write(cmd)
        response = self.conn.read_very_eager().decode()
        print(f'head back: {response}')
        return response

    def success(self):
        '''
        Send 'success' and block until one CRLF-terminated reply line arrives.

        :return: the decoded reply line.
        '''
        cmd = 'success\r\n'.encode('utf-8')
        print(f'sending {cmd} to server')
        self.conn.write(cmd)
        return self.conn.read_until(b'\r\n').decode()

    def __del__(self):
        # conn is missing when the constructor failed before connecting
        conn = getattr(self,'conn',None)
        if conn is not None:
            conn.close()

    def flush_buffer(self):
        '''
        Discard buffered input: read (and print) until a read returns nothing,
        pausing 0.2 s after every read.
        '''
        while True:
            resp = self.conn.read_lazy()
            print(resp)
            time.sleep(0.2)
            if resp == b'':
                break
# Disabled manual-test entry point, kept as an unused string literal.
'''
if __name__ == '__main__':
import matplotlib.pyplot as plt
sans = SINQSANS()
'''
# When the file is run as a script, the shared launcher module is star-imported.
# NOTE(review): presumably the launcher starts the server for this driver as an
# import side effect -- confirm against AFL.automation.shared.launcher.
if __name__ == '__main__':
    from AFL.automation.shared.launcher import *