Source code for AFL.automation.APIServer.QueueDaemon
import time
import functools
import threading
import time
import datetime
import os
import sys
import traceback
import subprocess
import json
import pathlib
import numpy as np
import pandas as pd
from AFL.automation.shared.serialization import is_serialized
from AFL.automation.APIServer.data.DataTrashcan import DataTrashcan
[docs]
class QueueDaemon(threading.Thread):
'''
'''
[docs]
def __init__(self, app, driver, task_queue, history, debug=False, data = None):
app.logger.info('Creating QueueDaemon thread')
threading.Thread.__init__(self, name='QueueDaemon', daemon=True)
self.driver = driver
self.app = app
self.task_queue = task_queue
self.history = history #history local to this server restart
self.running_task = []
self.stop = False
self.debug = debug
self.paused = False
self.busy = False # flag denotes if a task is being processed
if data is None:
self.data = DataTrashcan()
else:
self.data = data
self.data['driver_name'] = driver.name
self.data['driver_config'] = driver.config.config
try:
self.data['platform_serial'] = os.environ['AFL_SYSTEM_SERIAL']
except KeyError:
self.data['platform_serial'] = 'not_set'
try:
self.data['afl_automation_version'] = subprocess.check_output(["git", "describe", "--always"]).strip().decode()
except Exception:
self.data['afl_automation_version'] = 'could_not_determine'
[docs]
def terminate(self):
self.app.logger.info('Terminating QueueDaemon thread')
self.stop = True
self.task_queue.put(None)
[docs]
def check_if_paused(self):
# pause queue but notify user of state every minute
count = 600
while self.paused:
time.sleep(0.1)
count+=1
if count>600:
self.app.logger.info((
'Queued is paused. '
'Set paused state to false to continue execution'
))
count = 0
[docs]
def mask_serialized_objs(self,package):
masked_package = {'task':{}}
if 'meta' in package:
masked_package['meta'] = package['meta']
if 'uuid' in package:
masked_package['uuid'] = str(package['uuid'])
for k,v in package['task'].items():
if is_serialized(v):
masked_package['task'][k] = 'serialized_object'
else:
masked_package['task'][k] = v
return masked_package
[docs]
def run(self):
self.app.logger.info('Initializing QueueDaemon run-loop')
while not self.stop:
self.check_if_paused()
self.app.logger.debug('Getting item from queue')
package = self.task_queue.get(block=True, timeout=None)
self.app.logger.debug('Got item from queue')
# If the task object is None, break the queue-loop
if package is None: # stop the queue execution
self.terminate()
break
self.busy = True
task = package['task']
self.app.logger.info(f'Running task {task}')
start_time = datetime.datetime.now()
masked_package = self.mask_serialized_objs(package)
#masked_package['meta']['started'] = start_time.strftime('%H:%M:%S')
masked_package['meta']['started'] = start_time.strftime('%m/%d/%y %H:%M:%S-%f %Z%z')
self.running_task = [masked_package]
self.check_if_paused()
# if debug_mode, pop and wait but don't execute
if self.debug:
time.sleep(3.0)
return_val = None
exit_state = 'Debug Mode!'
else:
try:
self.driver.pre_execute(**task)
self.data['driver_config'].update(self.driver.config.config)
self.data.update(task)
self.data['status_before'] = self.driver.status()
#ops_thread = threading.Thread(target=self.driver.execute,kwargs=task)
return_val = self.driver.execute(**task)
self.driver.post_execute(**task)
exit_state = 'Success!'
except Exception as error:
return_val = f'Error: {error.__repr__()}\n\n' + traceback.format_exc() + '\n\n'
return_val += 'Exception encountered in driver.execute, pausing queue...'
exit_state = 'Error!'
self.app.logger.error(return_val)
self.paused = True
self.data['status_after'] = self.driver.status()
end_time = datetime.datetime.now()
run_time = end_time - start_time
masked_package['meta']['ended'] = end_time.strftime('%m/%d/%y %H:%M:%S-%f %Z%z')
masked_package['meta']['run_time_seconds'] = run_time.seconds
masked_package['meta']['run_time_minutes'] = run_time.seconds/60
masked_package['meta']['exit_state'] = exit_state
if isinstance(return_val,np.ndarray):
masked_package['meta']['return_val'] = return_val.tolist()
elif isinstance(return_val,pd.Series):
masked_package['meta']['return_val'] = return_val.tolist()
else:
masked_package['meta']['return_val'] = return_val
masked_package['uuid'] = str(masked_package['uuid'])
self.running_task = []
self.data.update(masked_package)
# the following block names the return value a special name
# so that DataTiled can store it as the main data element
if type(return_val) is np.ndarray:
self.data['main_array'] = return_val
if type(return_val) is np.ndarray:
self.data['main_array'] = return_val
elif type(return_val) is pd.DataFrame:
self.data['main_dataframe'] = return_val
elif type(return_val) is pd.Series:
self.data['main_dataframe'] = return_val.to_frame()
self.data.finalize()
self.history.append(masked_package)#history for this server restart
self.task_queue.iteration_id = time.time()
# mark queue iteration as changed
self.busy = False
time.sleep(0.1)
self.app.logger.info('QueueDaemon runloop exiting')