import requests,uuid,time,copy,inspect
from AFL.automation.shared import serialization
try:
from AFL.automation.shared.ServerDiscovery import ServerDiscovery
except ModuleNotFoundError:
pass
[docs]
class Client:
'''
Communicate with APIServer
This class provides an interface to generate HTTP REST requests that are sent to
an APIServer, monitor the status of those requests, and retrieve the results of
those requests. It is intended to be used as a client to the APIServer class.
'''
[docs]
def __init__(self,ip=None,port='5000',username=None,interactive=False):
if ip is None:
raise ValueError('ip (server address) must be specified')
#trim trailing slash if present
if ip[-1] == '/':
ip = ip[:-1]
self.ip = ip
self.port = port
self.url = f'http://{ip}:{port}'
self.interactive=interactive
self.cached_queue = None
self.queue_iteration = None
self.supports_queue_iteration = False
self.headers = {}
try:
import AFL.automation.shared.widgetui
import IPython
except ImportError:
pass
else:
#Client.ui = AFL.automation.shared.widgetui.client_construct_ui
setattr(Client,'ui',AFL.automation.shared.widgetui.client_construct_ui)
if username is not None:
self.login(username)
[docs]
@classmethod
def from_server_name(cls,server_name,**kwargs):
sd = ServerDiscovery()
address = ServerDiscovery.sa_discover_server_by_name(server_name)[0]
(address,port) = address.split(':')
return cls(ip=address,port=port,**kwargs)
[docs]
def logged_in(self):
url = self.url + '/login_test'
response = requests.get(url,headers=self.headers)
if response.status_code == 200:
return True
else:
print(response.content)
return False
[docs]
def login(self,username,populate_commands=True):
url = self.url + '/login'
response = requests.post(url,json={'username':username,'password':'domo_arigato'})
if not (response.status_code == 200):
raise RuntimeError(f'Client login failed with status code {response.status_code}:\n{response.content}')
# headers should be included in all HTTP requests
self.token = response.json()['token']
self.headers = {'Authorization':'Bearer {}'.format(self.token)}
if populate_commands:
self.get_queued_commands()
self.get_unqueued_commands()
try:
response = requests.post(self.url + '/get_queue_iteration',headers=self.headers)
self.supports_queue_iteration = True
except Exception as e:
self.supports_queue_iteration = False
[docs]
def driver_status(self):
response = requests.get(self.url+'/driver_status',headers=self.headers)
if response.status_code != 200:
raise RuntimeError(f'API call to driver_status command failed with status_code {response.status_code}\n{response.text}')
return response.json()
[docs]
def get_queue(self):
if self.supports_queue_iteration:
server_queue_iteration = requests.get(self.url+'/get_queue_iteration',headers=self.headers).json()
if server_queue_iteration != self.queue_iteration:
# the queue in our store is not so fresh, need to update it
self.cached_queue = requests.get(self.url + '/get_queue?with_iteration=1',headers=self.headers).json()
self.queue_iteration = self.cached_queue.pop(0)
return self.cached_queue
else:
response = requests.get(self.url+'/get_queue',headers=self.headers)
if response.status_code != 200:
raise RuntimeError(f'API call to set_queue_mode command failed with status_code {response.status_code}\n{response.text}')
return response.json()
[docs]
def wait(self,target_uuid=None,interval=0.1,for_history=True,first_check_delay=5.0):
time.sleep(first_check_delay)
while True:
try:
response = requests.get(self.url+'/get_queue',headers=self.headers,timeout=15)
except (TimeoutError,requests.exceptions.ConnectionError) as e:
continue
history,running,queued = response.json()
if target_uuid is not None:
if for_history:
if any([str(task['uuid'])==str(target_uuid) for task in history]):
break
else:
if not any([str(task['uuid'])==str(target_uuid) for task in running+queued]):
break
else:
if len(running+queued)==0:
break
time.sleep(interval)
#check the return info of the command we waited on
return history[-1]['meta']
[docs]
def get_quickbar(self):
response = requests.get(self.url+'/get_quickbar',headers=self.headers)
if response.status_code != 200:
raise RuntimeError(f'API call to get_queued_commands command failed with status_code {response.status_code}\n{response.text}')
return response.json()
[docs]
def server_cmd(self,cmd,**kwargs):
json=kwargs
response = requests.get(self.url+'/'+cmd,headers=self.headers,json=json)
if response.status_code != 200:
raise RuntimeError(f'API call to server command failed with status_code {response.status_code}\n{response.text}')
return response.json()
[docs]
def enqueued_base(self,**kwargs):
return self.enqueue(**kwargs)
[docs]
def unqueued_base(self,**kwargs):
response = requests.get(self.url+'/'+kwargs['endpoint'],headers=self.headers)
if response.status_code != 200:
raise RuntimeError(f'API call to set_queue_mode command failed with status_code {response.status_code}\n{response.text}')
return response.json()
[docs]
def get_unqueued_commands(self,inherit_commands=True):
response = requests.get(self.url+'/get_unqueued_commands',headers=self.headers)
if response.status_code != 200:
raise RuntimeError(f'API call to get_queued_commands command failed with status_code {response.status_code}\n{response.text}')
if inherit_commands:
#XXX Need to find a cleaner way to do this. It works reasonbly
#XXX well, but it doesn't support tab completions
for function_name,info in response.json().items():
parameters = []
for parameter_name,default in info['kwargs']:
p = inspect.Parameter(parameter_name,inspect.Parameter.KEYWORD_ONLY,default=default)
parameters.append(p)
function = lambda **kwargs: self.unqueued_base(endpoint=function_name,**kwargs)
function.__name__ = function_name
function.__doc__ = info['doc']
function.__signature__ = inspect.signature(self.enqueued_base).replace(parameters=parameters)
setattr(self,function_name,function)
return response.json()
[docs]
def get_queued_commands(self,inherit_commands=True):
response = requests.get(self.url+'/get_queued_commands',headers=self.headers)
if response.status_code != 200:
raise RuntimeError(f'API call to get_queued_commands command failed with status_code {response.status_code}\n{response.text}')
if inherit_commands:
#XXX Need to find a cleaner way to do this. It works reasonbly
#XXX well, but it doesn't support tab completions
for function_name,info in response.json().items():
parameters = []
for parameter_name,default in info['kwargs']:
p = inspect.Parameter(parameter_name,inspect.Parameter.KEYWORD_ONLY,default=default)
parameters.append(p)
function = lambda **kwargs: self.enqueued_base(task_name=function_name,**kwargs)
function.__name__ = function_name
function.__doc__ = info['doc']
function.__signature__ = inspect.signature(self.enqueued_base).replace(parameters=parameters)
setattr(self,function_name,function)
return response.json()
[docs]
def enqueue(self,interactive=None,**kwargs):
if interactive is None:
interactive = self.interactive
if 'params' in kwargs:
additional_kwargs = kwargs['params']()
del kwargs['params']
kwargs.update(additional_kwargs)
json=kwargs
response = requests.post(self.url+'/enqueue',headers=self.headers,json=json)
if response.status_code != 200:
raise RuntimeError(f'API call to enqueue command failed with status_code {response.status_code}\n{response.text}')
task_uuid = str(response.text)
if interactive:
meta = self.wait(target_uuid=task_uuid,first_check_delay=0.5)
if meta['exit_state']=='Error!':
print(meta['return_val'])
return meta
else:
return task_uuid
[docs]
def set_config(self,interactive=None,**kwargs):
return self.enqueue(interactive=interactive,task_name='set_config',**kwargs)
[docs]
def get_config(self,name,print_console=True,interactive=None):
if name == 'all':
return self.enqueue(interactive=interactive,task_name='get_configs',print_console=print_console)
else:
return self.enqueue(interactive=interactive,task_name='get_config',name=name,print_console=print_console)
[docs]
def get_server_time(self):
response = requests.get(self.url+'/get_server_time',headers=self.headers)
if response.status_code != 200:
raise RuntimeError(f'API call to enqueue command failed with status_code {response.status_code}\n{response.text}')
return response.text
[docs]
def query_driver(self,**kwargs):
json=kwargs
response = requests.get(self.url+'/query_driver',headers=self.headers,json=json)
if response.status_code != 200:
raise RuntimeError(f'API call to query_driver command failed with status_code {response.status_code}\n{response.text}')
return response.text
[docs]
def reset_queue_daemon(self):
response = requests.post(self.url+'/reset_queue_daemon',headers=self.headers)
if response.status_code != 200:
raise RuntimeError(f'API call to reset_queue_daemon command failed with status_code {response.status_code}\n{response.text}')
[docs]
def pause(self,state):
json={'state':state}
response = requests.post(self.url+'/pause',headers=self.headers,json=json)
if response.status_code != 200:
raise RuntimeError(f'API call to pause command failed with status_code {response.status_code}\n{response.text}')
[docs]
def clear_queue(self):
response = requests.post(self.url+'/clear_queue',headers=self.headers)
if response.status_code != 200:
raise RuntimeError(f'API call to clear_queue command failed with status_code {response.status_code}\n{response.text}')
[docs]
def clear_history(self):
response = requests.post(self.url+'/clear_history',headers=self.headers)
if response.status_code != 200:
raise RuntimeError(f'API call to clear_history command failed with status_code {response.status_code}\n{response.text}')
[docs]
def debug(self,state):
json={'state':state}
response = requests.post(self.url+'/debug',headers=self.headers,json=json)
if response.status_code != 200:
raise RuntimeError(f'API call to debug command failed with status_code {response.status_code}\n{response.text}')
[docs]
def halt(self):
response = requests.post(self.url+'/halt',headers=self.headers,json={})
if response.status_code != 200:
raise RuntimeError(f'API call to halt command failed with status_code {response.status_code}\n{response.content}')
[docs]
def queue_state(self):
response = requests.get(self.url+'/queue_state',headers=self.headers,json={})
if response.status_code != 200:
raise RuntimeError(f'API call to queue_state command failed with status_code {response.status_code}\n{response.content}')
return response
[docs]
def remove_item(self,uuid):
response = requests.post(self.url+'/remove_item',headers=self.headers,json={'uuid':uuid})
if response.status_code != 200:
raise RuntimeError(f'API call to remove_item command failed with status_code {response.status_code}\n{response.content}')
return response
[docs]
def move_item(self,uuid,pos):
response = requests.post(self.url+'/move_item',headers=self.headers,json={'uuid':uuid,'pos':pos})
if response.status_code != 200:
raise RuntimeError(f'API call to move_item command failed with status_code {response.status_code}\n{response.content}')
return response
[docs]
def set_driver_object(self,**kw):
json = {}
for name,value in kw.items():
value = serialization.serialize(value)
json[name] = value
response = requests.post(self.url+'/set_driver_object',headers=self.headers,json=json)
return response
[docs]
def get_driver_object(self,name):
json = {'name':name}
response = requests.get(self.url+'/get_driver_object',headers=self.headers,json=json)
return serialization.deserialize(response.json()['obj'])
[docs]
def deposit_obj(self, obj, uid=None):
'''
Deposit an object in the dropbox
obj : object, the object to deposit
id : str, the uuid to deposit the object under
if not specified, a new uuid will be generated
'''
json = {}
if uid is None:
uid = 'DB-' + str(uuid.uuid4())
json['uuid'] = uid
json['obj'] = serialization.serialize(obj)
# print(json)
response = requests.post(self.url + '/deposit_obj', headers=self.headers, json=json)
return response.content.decode('UTF-8')
[docs]
def retrieve_obj(self, uid,delete=True):
'''
Retrieve an object from the dropbox
id : str, the uuid of the object to retrieve
delete : bool, if True, delete the object after retrieving
'''
json = {'uuid':uid,'delete':delete}
response = requests.get(self.url + '/retrieve_obj', headers=self.headers, json=json)
if response.status_code == 404:
raise KeyError('invalid uuid')
elif response.status_code != 200:
raise Exception(f'server-side error: {response.status_code}')
else:
return serialization.deserialize(response.json()['obj'])
[docs]
def set_object(self,serialize=True,**kw):
json = {}
json['task_name'] = 'set_object'
if serialize:
json['serialized'] = True
for name,value in kw.items():
if serialize:
value = serialization.serialize(value)
json[name] = value
self.enqueue(**json)
[docs]
def get_object(self,name,serialize=True):
json = {}
json['task_name'] = 'get_object'
json['name'] = name
json['interactive'] = True
json['serialize'] = serialize
retval = self.enqueue(**json)
if serialize:
obj = serialization.deserialize(retval['return_val'])
else:
obj = retval['return_val']
return obj
def __str__(self):
if self.logged_in():
return f'APIServer Client(ip={self.ip},port={self.port}), connected'
else:
return f'APIServer Client(ip={self.ip},port={self.port}), disconnected'