Source code for AFL.automation.instrument.NicosScriptClient

"""
ToDo
- Add support for reading h5 data written by nicos
    - get list of files, load data by file name or data title
"""
import datetime
import time

import numpy as np
import copy
import warnings

try:
    from nicos.clients.base import ConnectionData, NicosClient
    from nicos.utils.loggers import ACTION, INPUT
    from nicos.protocols.daemon import BREAK_AFTER_LINE, BREAK_AFTER_STEP, \
         STATUS_IDLE, STATUS_IDLEEXC
except ImportError:
    warnings.warn('NICOS import failed- NICOS instrument connections will not work.  Install nicos.',stacklevel=2)
   
#NICOS events to exclude from client
EVENTMASK = ('watch', 'datapoint', 'datacurve', 'clientexec')

MAX_MESSAGE_QUEUE_SIZE = 100000

[docs] class NicosScriptClient(NicosClient): """ A client for interacting with a NICOS server from python scripts or the command line. Attributes ---------- livedata : dict Dictionary to store live data from the NICOS server. This will only show data generated **after** the client has connected. status : str Current status of the client, either 'idle' or 'run'. messages : list Queue to store log messages. """ livedata = {} status = 'idle'
[docs] def __init__(self): """ Initialize the NicosScriptClient. """ self.log = logging.getLogger(__name__) NicosClient.__init__(self, self.log) self.messages = []
[docs] def signal(self, name, data=None, exc=None): """ Handle signals from the NICOS server. Parameters ---------- name : str The name of the signal. data : any, optional The data associated with the signal. exc : any, optional Additional data for the signal. """ accept = ['message', 'processing', 'done'] if name in accept: self.log_func(name, data) elif name == 'livedata': converted_data = [] for desc, ardata in zip(data['datadescs'], exc): npdata = np.frombuffer(ardata, dtype=desc['dtype']) npdata = npdata.reshape(desc['shape']) converted_data.append(npdata) self.livedata[data['det'] + '_live'] = converted_data elif name == 'status': status, _ = data if status == STATUS_IDLE or status == STATUS_IDLEEXC: self.status = 'idle' else: self.status = 'run' else: if name != 'cache': pass
[docs] def log(self, name, txt): """ Log a message. Parameters ---------- name : str The name of the log entry. txt : str The log message. """ self.messages.append((name, txt)) self.messages = self.messages[-MAX_MESSAGE_QUEUE_SIZE:]
[docs] def print_messages(self): """ Print and clear the message queue. """ for msg in self.messages: print(f'{msg[0]}: {msg[1]}') self.messages = []
[docs] def clear_messages(self): """ Clear the message queue. """ self.messages = []
[docs] def connect(self, host, port, user, password): """ Connect to a NICOS server. Parameters ---------- host : str The hostname or IP address of the NICOS server. port : int The port number of the NICOS server. user : str The username for authentication. password : str The password for authentication. Raises ------ RuntimeError If the NICOS server protocol version is incompatible. """ con = ConnectionData(host, port, user, password) NicosClient.connect(self, con, EVENTMASK) if self.daemon_info.get('protocol_version') < 22: raise RuntimeError("incompatible nicos server") state = self.ask('getstatus') self.signal('status', state['status']) self.print_messages() if self.isconnected: print('Successfully connected to %s' % host) else: print('Failed to connect to %s' % host)
[docs] def command(self, line, interactive=False): """ Send a command to the NICOS server. Parameters ---------- line : str The command to send. interactive : bool, optional Whether to run the command interactively. Returns ------- any The result of the command. """ com = "%s" % line.strip() if interactive: return self._interactive(com) else: return self.run(com)
def _interactive(self, com): """ Run a command interactively. Parameters ---------- com : str The command to run. Returns ------- any The result of the command. """ start_detected = False ignore = [ACTION, INPUT] reqID = None if self.status == 'idle': self.run(com) else: return 'NICOS is busy, cannot send commands' while True: if self.messages: work_messages = copy.deepcopy(self.messages) self.messages = [] for name, message in work_messages: if name == 'processing': if message['script'] == com: start_detected = True reqID = message['reqid'] continue if name == 'done' and message['reqid'] == reqID: return if message[2] in ignore: continue if message[0] != 'nicos': messagetxt = message[0] + ' ' + message[3] else: messagetxt = message[3] if start_detected and reqID == message[-1]: print(messagetxt.strip())
[docs] def get(self, parameter): """ Get the value of a parameter or array Parameters ---------- parameter : str The name of the parameter. Returns ------- any The value of the parameter. """ if parameter in self.livedata: return self.livedata[parameter] if parameter == 'scandata': xs, ys, _, names = self.eval( '__import__("nicos").commands.analyze._getData()[:4]') return xs, ys, names if parameter.find('.') > 0: devpar = parameter.split('.') return self.getDeviceParam(devpar[0], devpar[1]) else: return self.getDeviceValue(parameter)
[docs] def blockForIdle(self, timeout=1800, initial_delay=5): """ Block execution until the client status is 'idle' or a timeout occurs. Parameters ---------- timeout : int, optional The maximum time to wait in seconds (default is 1800 seconds). initial_delay : int, optional The initial delay before starting to check the status in seconds (default is 5 seconds). Notes ----- This method will block the execution of the script until the client's status becomes 'idle' or the specified timeout is reached. It checks the status every 0.1 seconds after the initial delay. """ time.sleep(initial_delay) start_time = datetime.datetime.now() end_time = start_time + datetime.timedelta(seconds=timeout) while datetime.datetime.now() < end_time: if self.status == 'idle': return time.sleep(0.1) raise TimeoutError(f"Timeout of {timeout} seconds reached while waiting for idle status")
[docs] def estop(self): """ Emergency stop the client. """ self.tell('emergency')
[docs] def stop(self, after_command=True, after_scan_point=False, emergency=False): """ Stop the client based on the specified conditions. Parameters ---------- after_command : bool, optional If True, stop the client after the current command (default is True). after_scan_point : bool, optional If True, stop the client after the current scan point (default is False). emergency : bool, optional If True, perform an emergency stop (default is False). Notes ----- This method allows stopping the client in different ways: - If `emergency` is True, an emergency stop is performed. - If `after_command` is True, the client stops after the current command. - If `after_scan_point` is True, the client stops after the current scan point. """ if emergency: self.estop() elif after_command: self.tell('stop', BREAK_AFTER_LINE) elif after_scan_point: self.tell('stop', BREAK_AFTER_STEP)