Source code for AFL.automation.APIServer.data.DataPacket

import copy
from collections.abc import MutableMapping

import numpy as np
import pandas as pd

# Size cutoff used when sanitizing metadata: a list/tuple whose len(), or a NumPy
# array whose .size, is strictly greater than this value is replaced by a short
# summary string instead of being stored element by element.
METADATA_ARRAY_LENGTH_CUTOFF = 25


class DataPacket(MutableMapping):
    '''
    A DataPacket is a container for data that is to be transmitted to a data store.

    It is a dictionary-like object that stores data in three different ways,
    selected by key:

    - system data (keys listed in ``PROTECTED_SYSTEM_KEYS``), which is cleared
      by neither ``reset()`` nor ``reset_sample()``
    - sample data (keys listed in ``PROTECTED_SAMPLE_KEYS``), which is cleared
      only by ``reset_sample()``
    - transient data (every other key), which is cleared by ``reset()``

    ``finalize()`` transmits the packet and then clears the transient data.
    ``transmit()`` and ``add_array()`` raise ``NotImplementedError`` here and
    are meant to be provided by subclasses.
    '''

    # Keys routed to the system dictionary on write/delete.
    PROTECTED_SYSTEM_KEYS = [
        'driver_name',
        'driver_config',
        'platform_serial',
    ]

    # Keys routed to the sample dictionary on write/delete.
    PROTECTED_SAMPLE_KEYS = [
        'sample_name',
        'sample_uuid',
        'sample_composition',
        'AL_components',
        'AL_campaign_name',
        'AL_uuid',
    ]

    def __init__(self):
        '''Create an empty packet with no transient, system or sample data.'''
        self._transient_dict = {}
        self._system_dict = {}
        self._sample_dict = {}

    def _store_for(self, key):
        '''Return the internal dictionary that ``key`` is routed to on write/delete.'''
        if key in self.PROTECTED_SYSTEM_KEYS:
            return self._system_dict
        if key in self.PROTECTED_SAMPLE_KEYS:
            return self._sample_dict
        return self._transient_dict

    def __getitem__(self, key):
        # Lookup order is system, then sample, then transient; a missing key
        # surfaces as the KeyError raised by the transient dictionary.
        for store in (self._system_dict, self._sample_dict):
            if key in store:
                return store[key]
        return self._transient_dict[key]

    def __setitem__(self, key, value):
        self._store_for(key)[key] = value

    def __delitem__(self, key):
        del self._store_for(key)[key]

    def __len__(self):
        return (
            len(self._system_dict)
            + len(self._sample_dict)
            + len(self._transient_dict)
        )

    def __iter__(self):
        yield from self._system_dict
        yield from self._sample_dict
        yield from self._transient_dict

    def _dict(self):
        '''
        Return a single new dictionary that contains all values stored in the packet.

        N.B.: writes to the returned dictionary itself are lost, because it is a
        separate object. Transient values are deep-copied; sample and system
        values are merged in by reference, so mutating one of those values in
        place also changes the packet. On a key clash, system data wins over
        sample data, which wins over transient data.
        '''
        merged = copy.deepcopy(self._transient_dict)
        merged.update(self._sample_dict)
        merged.update(self._system_dict)
        return merged

    def _core_sanitize(self, to_sanitize):
        '''
        Inner worker function that returns a JSON-friendly copy of a dictionary.

        Keys are converted with ``str()``. Values are handled by type:

        - list/tuple: converted to a list, or to a summary string when longer
          than ``METADATA_ARRAY_LENGTH_CUTOFF`` (elements are not inspected)
        - int/float/str/bool: kept as they are
        - numpy.ndarray: ``tolist()``, or a summary string when its size
          exceeds ``METADATA_ARRAY_LENGTH_CUTOFF``
        - pandas.DataFrame: ``to_json()``
        - dict: sanitized recursively
        - anything else (including ``None``): ``str(value)``

        The input dictionary is not modified. The result is always a plain dict.
        '''
        # Build the output from scratch rather than deep-copying the input:
        # every non-primitive value is replaced anyway, and a deep copy would
        # duplicate large arrays and fail on objects that cannot be copied.
        sanitized = {}
        for raw_key, value in to_sanitize.items():
            # Read the value through the original key; only the output key is
            # stringified, so non-string keys no longer raise KeyError.
            key = str(raw_key)
            if isinstance(value, (list, tuple)):
                if len(value) > METADATA_ARRAY_LENGTH_CUTOFF:
                    sanitized[key] = f"<{type(value).__name__} of {len(value)} elements>"
                else:
                    sanitized[key] = list(value)
            elif isinstance(value, (int, float, str, bool)):
                sanitized[key] = value
            elif isinstance(value, np.ndarray):
                if value.size > METADATA_ARRAY_LENGTH_CUTOFF:
                    sanitized[key] = f"<ndarray of shape {value.shape}, dtype {value.dtype}>"
                else:
                    sanitized[key] = value.tolist()
            elif isinstance(value, pd.DataFrame):
                sanitized[key] = value.to_json()
            elif isinstance(value, dict):
                sanitized[key] = self._core_sanitize(value)
            else:
                sanitized[key] = str(value)
        return sanitized

    def _print_dict_member_types(self, input, prefix=''):
        '''Debug helper: print the type of every value in ``input``, descending into dicts.'''
        for key, value in input.items():
            print(f'{prefix} {key} is of type {type(value)}')
            # Exact type check: only plain dicts are descended into.
            if type(value) is dict:
                self._print_dict_member_types(value, prefix=f'{prefix} -->')

    def _sanitize(self):
        '''
        Sanitize the contents of the packet to be JSON-serializable.

        Each internal dictionary is replaced by its sanitized copy.
        '''
        self._transient_dict = self._core_sanitize(self._transient_dict)
        self._sample_dict = self._core_sanitize(self._sample_dict)
        self._system_dict = self._core_sanitize(self._system_dict)

    def reset(self):
        '''
        Clears all transient data.
        '''
        self._transient_dict = {}

    def keys(self):
        '''Return the union of the keys of all three internal dictionaries as a set.'''
        return (
            self._transient_dict.keys()
            | self._sample_dict.keys()
            | self._system_dict.keys()
        )

    def setupDefaults(self):
        '''Hook that does nothing in this class.'''
        pass

    def finalize(self):
        '''Transmit the packet, then clear the transient data.'''
        self.transmit()
        self.reset()

    def reset_sample(self):
        '''Clears all sample data.'''
        self._sample_dict = {}

    def transmit(self):
        '''Abstract method that sends the packet to the data store.'''
        raise NotImplementedError

    def add_array(self):
        '''Abstract method adding arrays that need special handling'''
        raise NotImplementedError