# Source code for AFL.automation.APIServer.Client

import requests,uuid,time,copy,inspect
from AFL.automation.shared import serialization
try:
    from AFL.automation.shared.ServerDiscovery import ServerDiscovery
except ModuleNotFoundError:
    pass
class Client:
    '''Communicate with APIServer

    This class provides an interface to generate HTTP REST requests that are
    sent to an APIServer, monitor the status of those requests, and retrieve
    the results of those requests. It is intended to be used as a client to
    the APIServer class.
    '''

    def __init__(self, ip=None, port='5000', username=None, interactive=False):
        '''Store the connection settings and optionally log in.

        ip : str, server address (a single trailing '/' is removed)
        port : str or int, server port
        username : str or None, if given, login(username) is called
        interactive : bool, default blocking behaviour used by enqueue()

        Raises ValueError if ip is None or empty.
        '''
        # `not ip` also rejects '' which previously crashed with IndexError
        # on ip[-1].
        if not ip:
            raise ValueError('ip (server address) must be specified')

        # trim trailing slash if present
        if ip.endswith('/'):
            ip = ip[:-1]

        self.ip = ip
        self.port = port
        self.url = f'http://{ip}:{port}'
        self.interactive = interactive
        self.cached_queue = None
        self.queue_iteration = None
        self.supports_queue_iteration = False
        self.headers = {}
        self.token = None  # set by login()

        # The widget UI is optional: attach it to the class only when both
        # the widget module and IPython can be imported.
        try:
            import AFL.automation.shared.widgetui
            import IPython
        except ImportError:
            pass
        else:
            setattr(Client, 'ui', AFL.automation.shared.widgetui.client_construct_ui)

        if username is not None:
            self.login(username)

    @classmethod
    def from_server_name(cls, server_name, **kwargs):
        '''Alternate constructor: look the server up by name via ServerDiscovery.

        Extra keyword arguments are forwarded to the normal constructor.
        '''
        # NOTE(review): this instance is never used below; it is kept in case
        # constructing ServerDiscovery has side effects discovery relies on.
        sd = ServerDiscovery()
        address = ServerDiscovery.sa_discover_server_by_name(server_name)[0]
        # Split on the LAST colon so an address that itself contains colons
        # does not break the unpacking.
        (address, port) = address.rsplit(':', 1)
        return cls(ip=address, port=port, **kwargs)

    @staticmethod
    def _require_ok(response, command, raw=False):
        '''Raise RuntimeError unless `response` has HTTP status 200.

        command : str, name inserted into the error message
        raw : bool, report response.content instead of response.text
        '''
        if response.status_code != 200:
            detail = response.content if raw else response.text
            raise RuntimeError(f'API call to {command} command failed with status_code {response.status_code}\n{detail}')

    def logged_in(self):
        '''Return True if the server accepts the current credentials.

        On failure the response body is printed and False is returned.
        '''
        url = self.url + '/login_test'
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return True
        print(response.content)
        return False

    def login(self, username, populate_commands=True):
        '''Log in and store the bearer token used by all later requests.

        username : str
        populate_commands : bool, if True, fetch the server's queued and
            unqueued commands and attach them to this instance as methods

        Raises RuntimeError if the server rejects the login.
        '''
        url = self.url + '/login'
        # NOTE(review): the password is a hard-coded constant.
        response = requests.post(url, json={'username': username, 'password': 'domo_arigato'})
        if not (response.status_code == 200):
            raise RuntimeError(f'Client login failed with status code {response.status_code}:\n{response.content}')

        # headers should be included in all HTTP requests
        self.token = response.json()['token']
        self.headers = {'Authorization': 'Bearer {}'.format(self.token)}

        if populate_commands:
            self.get_queued_commands()
            self.get_unqueued_commands()

        # Probe with the same verb get_queue() uses and require a 200: an
        # error response does not raise, so "no exception" alone does not
        # show that the endpoint exists.
        try:
            response = requests.get(self.url + '/get_queue_iteration', headers=self.headers)
        except requests.exceptions.RequestException:
            self.supports_queue_iteration = False
        else:
            self.supports_queue_iteration = response.status_code == 200

    def driver_status(self):
        '''Return the decoded JSON reply of the /driver_status endpoint.'''
        response = requests.get(self.url + '/driver_status', headers=self.headers)
        self._require_ok(response, 'driver_status')
        return response.json()

    def get_queue(self):
        '''Return the server queue as decoded JSON.

        When queue iterations are supported, the cached copy is reused as
        long as the server's iteration value matches the cached one.
        Raises RuntimeError on any non-200 reply.
        '''
        if self.supports_queue_iteration:
            response = requests.get(self.url + '/get_queue_iteration', headers=self.headers)
            self._require_ok(response, 'get_queue_iteration')
            server_queue_iteration = response.json()
            if server_queue_iteration != self.queue_iteration:
                # the queue in our store is not so fresh, need to update it
                response = requests.get(self.url + '/get_queue?with_iteration=1', headers=self.headers)
                self._require_ok(response, 'get_queue')
                self.cached_queue = response.json()
                # with_iteration=1 puts the iteration value first
                self.queue_iteration = self.cached_queue.pop(0)
            return self.cached_queue

        response = requests.get(self.url + '/get_queue', headers=self.headers)
        self._require_ok(response, 'get_queue')
        return response.json()

    def wait(self, target_uuid=None, interval=0.1, for_history=True, first_check_delay=5.0):
        '''Block until a task (or the whole queue) has finished.

        target_uuid : uuid of the task to wait for; if None, wait until
            nothing is running or queued
        interval : float, seconds between polls
        for_history : bool, if True wait until the task appears in the
            history, otherwise until it is no longer running or queued
        first_check_delay : float, seconds to sleep before the first poll

        Returns the 'meta' entry of the awaited task when it is found in the
        history, otherwise that of the most recent history entry, or None if
        the history is empty.
        '''
        time.sleep(first_check_delay)
        target = None if target_uuid is None else str(target_uuid)

        while True:
            try:
                response = requests.get(self.url + '/get_queue', headers=self.headers, timeout=15)
            except (TimeoutError, requests.exceptions.Timeout, requests.exceptions.ConnectionError):
                # requests reports timeouts as requests.exceptions.Timeout,
                # not the builtin TimeoutError. Sleep before retrying so an
                # unreachable server is not polled in a tight loop.
                time.sleep(interval)
                continue

            history, running, queued = response.json()
            if target is None:
                done = len(running + queued) == 0
            elif for_history:
                done = any(str(task['uuid']) == target for task in history)
            else:
                done = not any(str(task['uuid']) == target for task in running + queued)
            if done:
                break
            time.sleep(interval)

        # check the return info of the command we waited on; search from the
        # newest entry because other tasks may have finished after it
        if target is not None:
            for task in reversed(history):
                if str(task['uuid']) == target:
                    return task['meta']
        return history[-1]['meta'] if history else None

    def get_quickbar(self):
        '''Return the decoded JSON reply of the /get_quickbar endpoint.'''
        response = requests.get(self.url + '/get_quickbar', headers=self.headers)
        self._require_ok(response, 'get_quickbar')
        return response.json()

    def server_cmd(self, cmd, **kwargs):
        '''GET the endpoint `cmd`, sending kwargs as the JSON body; return decoded JSON.'''
        response = requests.get(self.url + '/' + cmd, headers=self.headers, json=kwargs)
        self._require_ok(response, 'server')
        return response.json()

    def enqueued_base(self, **kwargs):
        '''Target of the dynamically attached queued commands; forwards to enqueue().'''
        return self.enqueue(**kwargs)

    def unqueued_base(self, **kwargs):
        '''Target of the dynamically attached unqueued commands.

        GETs the endpoint named by kwargs['endpoint'] and returns the decoded
        JSON reply.
        '''
        # NOTE(review): keyword arguments other than 'endpoint' are not sent
        # to the server - confirm whether that is intended.
        response = requests.get(self.url + '/' + kwargs['endpoint'], headers=self.headers)
        self._require_ok(response, kwargs['endpoint'])
        return response.json()

    @staticmethod
    def _make_command(base, key, function_name, doc, parameters):
        '''Build one callable that forwards to `base` with {key: function_name}.'''
        # A factory gives every command its own binding of function_name; a
        # lambda created directly in the loop would see only the last name.
        def function(**kwargs):
            return base(**{key: function_name}, **kwargs)

        function.__name__ = function_name
        function.__doc__ = doc
        function.__signature__ = inspect.Signature(parameters=parameters)
        return function

    def _install_commands(self, commands, base, key):
        '''Attach one method per entry of `commands` to this instance.

        commands : dict mapping a command name to a dict with 'kwargs'
            (iterable of (name, default) pairs) and 'doc'
        base : callable every generated method forwards to
        key : str, keyword under which the command name is passed to `base`
        '''
        # XXX Need to find a cleaner way to do this. It works reasonably
        # XXX well, but it doesn't support tab completions
        for function_name, info in commands.items():
            parameters = [
                inspect.Parameter(parameter_name, inspect.Parameter.KEYWORD_ONLY, default=default)
                for parameter_name, default in info['kwargs']
            ]
            function = self._make_command(base, key, function_name, info['doc'], parameters)
            setattr(self, function_name, function)

    def get_unqueued_commands(self, inherit_commands=True):
        '''Fetch the server's unqueued commands and return them as decoded JSON.

        inherit_commands : bool, if True, also attach each command to this
            instance as a method that calls unqueued_base()
        '''
        response = requests.get(self.url + '/get_unqueued_commands', headers=self.headers)
        self._require_ok(response, 'get_unqueued_commands')
        commands = response.json()
        if inherit_commands:
            self._install_commands(commands, self.unqueued_base, 'endpoint')
        return commands

    def get_queued_commands(self, inherit_commands=True):
        '''Fetch the server's queued commands and return them as decoded JSON.

        inherit_commands : bool, if True, also attach each command to this
            instance as a method that calls enqueued_base()
        '''
        response = requests.get(self.url + '/get_queued_commands', headers=self.headers)
        self._require_ok(response, 'get_queued_commands')
        commands = response.json()
        if inherit_commands:
            self._install_commands(commands, self.enqueued_base, 'task_name')
        return commands

    def enqueue(self, interactive=None, **kwargs):
        '''POST a task to /enqueue.

        interactive : bool or None; None means use self.interactive
        kwargs : sent as the JSON body; if a 'params' entry is present it is
            called and the dict it returns is merged into kwargs

        Returns the task uuid (str) when not interactive, otherwise waits
        for the task and returns what wait() returns.
        '''
        if interactive is None:
            interactive = self.interactive

        if 'params' in kwargs:
            additional_kwargs = kwargs.pop('params')()
            kwargs.update(additional_kwargs)

        response = requests.post(self.url + '/enqueue', headers=self.headers, json=kwargs)
        self._require_ok(response, 'enqueue')
        task_uuid = str(response.text)

        if interactive:
            meta = self.wait(target_uuid=task_uuid, first_check_delay=0.5)
            if meta['exit_state'] == 'Error!':
                print(meta['return_val'])
            return meta
        return task_uuid

    def set_config(self, interactive=None, **kwargs):
        '''Enqueue a 'set_config' task with the given keyword arguments.'''
        return self.enqueue(interactive=interactive, task_name='set_config', **kwargs)

    def get_config(self, name, print_console=True, interactive=None):
        '''Enqueue 'get_configs' when name == 'all', else 'get_config' for `name`.'''
        if name == 'all':
            return self.enqueue(interactive=interactive, task_name='get_configs', print_console=print_console)
        return self.enqueue(interactive=interactive, task_name='get_config', name=name, print_console=print_console)

    def get_server_time(self):
        '''Return the text body of the /get_server_time endpoint.'''
        response = requests.get(self.url + '/get_server_time', headers=self.headers)
        self._require_ok(response, 'get_server_time')
        return response.text

    def query_driver(self, **kwargs):
        '''GET /query_driver with kwargs as the JSON body; return the text reply.'''
        response = requests.get(self.url + '/query_driver', headers=self.headers, json=kwargs)
        self._require_ok(response, 'query_driver')
        return response.text

    def reset_queue_daemon(self):
        '''POST to /reset_queue_daemon; raise RuntimeError on a non-200 reply.'''
        response = requests.post(self.url + '/reset_queue_daemon', headers=self.headers)
        self._require_ok(response, 'reset_queue_daemon')

    def pause(self, state):
        '''POST {'state': state} to /pause; raise RuntimeError on a non-200 reply.'''
        response = requests.post(self.url + '/pause', headers=self.headers, json={'state': state})
        self._require_ok(response, 'pause')

    def clear_queue(self):
        '''POST to /clear_queue; raise RuntimeError on a non-200 reply.'''
        response = requests.post(self.url + '/clear_queue', headers=self.headers)
        self._require_ok(response, 'clear_queue')

    def clear_history(self):
        '''POST to /clear_history; raise RuntimeError on a non-200 reply.'''
        response = requests.post(self.url + '/clear_history', headers=self.headers)
        self._require_ok(response, 'clear_history')

    def debug(self, state):
        '''POST {'state': state} to /debug; raise RuntimeError on a non-200 reply.'''
        response = requests.post(self.url + '/debug', headers=self.headers, json={'state': state})
        self._require_ok(response, 'debug')

    def halt(self):
        '''POST to /halt; raise RuntimeError on a non-200 reply.'''
        response = requests.post(self.url + '/halt', headers=self.headers, json={})
        self._require_ok(response, 'halt', raw=True)

    def queue_state(self):
        '''GET /queue_state and return the raw response object.'''
        response = requests.get(self.url + '/queue_state', headers=self.headers, json={})
        self._require_ok(response, 'queue_state', raw=True)
        return response

    def remove_item(self, uuid):
        '''POST {'uuid': uuid} to /remove_item and return the raw response object.'''
        response = requests.post(self.url + '/remove_item', headers=self.headers, json={'uuid': uuid})
        self._require_ok(response, 'remove_item', raw=True)
        return response

    def move_item(self, uuid, pos):
        '''POST {'uuid': uuid, 'pos': pos} to /move_item and return the raw response object.'''
        response = requests.post(self.url + '/move_item', headers=self.headers, json={'uuid': uuid, 'pos': pos})
        self._require_ok(response, 'move_item', raw=True)
        return response

    def set_driver_object(self, **kw):
        '''Serialize every keyword value and POST them to /set_driver_object.

        Returns the raw response object; the status code is not checked here.
        '''
        json = {name: serialization.serialize(value) for name, value in kw.items()}
        response = requests.post(self.url + '/set_driver_object', headers=self.headers, json=json)
        return response

    def get_driver_object(self, name):
        '''Fetch the object called `name` from /get_driver_object and deserialize it.

        Raises RuntimeError on a non-200 reply.
        '''
        response = requests.get(self.url + '/get_driver_object', headers=self.headers, json={'name': name})
        self._require_ok(response, 'get_driver_object')
        return serialization.deserialize(response.json()['obj'])

    def deposit_obj(self, obj, uid=None):
        '''
        Deposit an object in the dropbox

        obj : object, the object to deposit
        uid : str, the uuid to deposit the object under
              if not specified, a new uuid will be generated

        Returns the server's reply body decoded as UTF-8.
        '''
        if uid is None:
            uid = 'DB-' + str(uuid.uuid4())
        json = {'uuid': uid, 'obj': serialization.serialize(obj)}
        response = requests.post(self.url + '/deposit_obj', headers=self.headers, json=json)
        return response.content.decode('UTF-8')

    def retrieve_obj(self, uid, delete=True):
        '''
        Retrieve an object from the dropbox

        uid : str, the uuid of the object to retrieve
        delete : bool, if True, delete the object after retrieving

        Raises KeyError on a 404 reply and Exception on any other non-200
        reply.
        '''
        json = {'uuid': uid, 'delete': delete}
        response = requests.get(self.url + '/retrieve_obj', headers=self.headers, json=json)
        if response.status_code == 404:
            raise KeyError('invalid uuid')
        if response.status_code != 200:
            raise Exception(f'server-side error: {response.status_code}')
        return serialization.deserialize(response.json()['obj'])

    def set_object(self, serialize=True, **kw):
        '''Enqueue a 'set_object' task carrying the given keyword values.

        serialize : bool, if True each value is serialized and the task is
            flagged with 'serialized': True
        '''
        json = {'task_name': 'set_object'}
        if serialize:
            json['serialized'] = True
        for name, value in kw.items():
            if serialize:
                value = serialization.serialize(value)
            json[name] = value
        self.enqueue(**json)

    def get_object(self, name, serialize=True):
        '''Enqueue a blocking 'get_object' task and return the fetched object.

        serialize : bool, if True the task's return value is deserialized
            before being returned
        '''
        retval = self.enqueue(task_name='get_object', name=name, interactive=True, serialize=serialize)
        if serialize:
            return serialization.deserialize(retval['return_val'])
        return retval['return_val']

    def __str__(self):
        '''Describe the client and whether the server currently accepts it.'''
        # Rendering a string must not raise: treat an unreachable server as
        # disconnected.
        try:
            connected = self.logged_in()
        except requests.exceptions.RequestException:
            connected = False
        if connected:
            return f'APIServer Client(ip={self.ip},port={self.port}), connected'
        return f'APIServer Client(ip={self.ip},port={self.port}), disconnected'