Source code for AFL.automation.instrument.PySpecClient

import lazy_loader as lazy

import sys
import time


[docs] class PySpecClient(): """ This class will interact with the spec server on the hutch IDB3 computer and allow SARA pass the appropriate commands """
[docs] def __init__(self, address='id3b.classe.cornell.edu', port='spec'): self.address = address self.port = port self.connected = False self.output = [] self.last_output = None self.channel = {} self.spec = None self.SpecConnection = lazy.load("pyspec.client.SpecConnection", require="AFL-automation[pyspec]") self.SpecCounter = lazy.load("pyspec.client.SpecCounter", require="AFL-automation[pyspec]") self.SpecConnectionsManager = lazy.load("pyspec.client.SpecConnectionsManager", require="AFL-automation[pyspec]") self.SpecWaitObject = lazy.load("pyspec.client.SpecWaitObject", require="AFL-automation[pyspec]") self.SpecVariable = lazy.load("pyspec.client.SpecVariable", require="AFL-automation[pyspec]") self.SpecMotor = lazy.load("pyspec.client.SpecMotor", require="AFL-automation[pyspec]") self.SpecCommand = lazy.load("pyspec.client.SpecCommand", require="AFL-automation[pyspec]")
def _update_output(self, value, channel): self.output.append(value) self.last_output = self.output[-1] print(self.last_output) def _update_channel(self, value, channel): print(f"{channel}: {value}")
[docs] def connect(self): ''' Connect to the external spec server at connection name conn. This should be an object that can be passed to the other functions that talk to a spec server ''' self.conn = f'{self.address}:{self.port}' print('') print(self.conn) # self.spec = SpecConnectionsManager_mod.SpecConnectionsManager().getConnection(self.conn) self.spec = self.SpecConnection(self.conn) #hard coded connection while not self.spec.is_connected(): pass if self.spec.is_connected(): print(f'established connection to {self.conn}') #need to register callbacks our desired channels: #this channel can be read as if spec is busy or not?? self.spec.register('status/ready',self._update_channel) #this channel returns the output from the cmd line and passes it into the output list self.spec.register('output/tty',self._update_output) print('List of registered channels') for ch in list(self.spec.reg_channels): print(ch) print("") return
[docs] def run_cmd(self,cmd,block=True,timeout=1800): cmd = self.SpecCommand(self.spec,cmd,timeout=timeout) if not block: cmd.synchronous = False cmd()
[docs] def cd(self, path): """ A generic mv command. Moves the Spec directory to the specified path """ self.spec.run_cmd(f'cd {path}')
[docs] def mkdir(self, path): """A generic mkdir command. makes the specified directory or set of directories """ self.spec.run_cmd(f'u mkdir {path}')
[docs] def get_variable(self,name): return self.spec.getVariable(name)
[docs] def get_motor(self,name): return self.spec.getMotor(name)
[docs] def get_counter(self,name): return self.spec.getChannel(name)
[docs] def block_for_ready(self,timeout=300): ready = self.spec.getChannel('status/ready') if ready.read()==1: return else: w = self.SpecWaitObject(self.spec) w.waitChannelUpdate('status/ready', waitValue = 1,timeout=timeout*1000)
[docs] def block_for_count(self,timeout=300): w = self.SpecWaitObject(self.spec) w.waitChannelUpdate('scaler/.all./count', waitValue = 0,timeout=timeout*1000)
[docs] def count(self, name,time,wait_on_time=True): """Count up to a certain time or monitor count Arguments: time -- count time """ counter = self.spec.getChannel('scaler/.all./count') channel = self.spec.getChannel(f'scaler/{name}/value') index = self.spec.getChannel(f'var/{name}').read() if index == 1: #MONITOR time = -time counter.write(time) self.block_for_count() return channel.read()
if __name__=="__main__": #from AFL.automation.instrument.PySpecClient import PySpecClient client = PySpecClient("localhost","spec")