Source code for AFL.automation.APIServer.data.DataTiled
from .DataPacket import DataPacket
import datetime
import json
import os
import tiled.client
from tiled.client.xarray import write_xarray_dataset
import numpy as np
import xarray as xr
import copy
[docs]
class DataTiled(DataPacket):
'''
A DataPacket implementation that serializes its data to Tiled
with backup to JSON, named according to the current time.
'''
[docs]
def __init__(self,server,api_key,backup_path):
self.backup_path = backup_path
self.tiled_client = tiled.client.from_uri(server,api_key=api_key)
super().__init__()
self.arrays = {}
[docs]
def finalize(self):
self.transmit()
self.reset()
[docs]
def add_array(self,array_name,array):
self.arrays[array_name] = array
[docs]
def subtransmit_array(self,array_name,array):
'''
Transmits a numpy array along with all data in container and then clears the array from the container. All other data is preserved (no reset).
'''
self._transient_dict['main_array'] = array
self._transient_dict['array_name'] = array_name
self._transmit()
for name in ['main_array','array_name']:
if name in self._transient_dict:
del (self._transient_dict[name])
[docs]
def transmit(self):
if 'main_dataset' in self._dict().keys():
# main_dataset is handled directly in _transmit, no need to move to arrays dict
self._transmit()
return
if 'main_array' in self._dict().keys():
self.arrays['main_array'] = self._transient_dict['main_array']
del(self._transient_dict['main_array'])
if self.arrays:
for array_name,array in self.arrays.items():
# print(f'Attempting to add {array_name} of dtype {array.dtype} and size {array.size}')
self.subtransmit_array(array_name,array)
self.arrays = {}
else:
self._transmit()
def _transmit(self):
'''
Transmits the data inside this container to Tiled.
If a tiled connection fails or is not possible, serializes to JSON,
named according to the current time.
'''
try:
if 'main_dataset' in self._dict().keys():
main_data = copy.deepcopy(self._dict()['main_dataset'])
del(self._transient_dict['main_dataset'])
# Sanitize internal dicts first
self._sanitize()
# Get sanitized metadata from DataPacket
metadata = self._dict()
# Merge DataPacket metadata into dataset.attrs so it becomes searchable
# write_xarray_dataset stores dataset.attrs in metadata['attrs']
if not hasattr(main_data, 'attrs'):
main_data.attrs = {}
# Update dataset attrs with DataPacket metadata
main_data.attrs.update(metadata)
# Write using native Tiled xarray support
write_xarray_dataset(self.tiled_client, main_data)
elif 'main_array' in self._dict().keys():
main_data = copy.deepcopy(self._dict()['main_array'])
array_name = self._transient_dict.get('array_name', 'main_array')
del(self._transient_dict['main_array'])
if 'array_name' in self._transient_dict:
del(self._transient_dict['array_name'])
# Convert numpy array to xarray Dataset
# Create dimension names based on array shape
dims = [f'dim_{i}' for i in range(main_data.ndim)]
dataset = xr.Dataset({array_name: (dims, main_data)})
# Sanitize internal dicts first
self._sanitize()
# Get sanitized metadata from DataPacket
metadata = self._dict()
# Merge DataPacket metadata into dataset.attrs so it becomes searchable
# write_xarray_dataset stores dataset.attrs in metadata['attrs']
if not hasattr(dataset, 'attrs'):
dataset.attrs = {}
# Update dataset attrs with DataPacket metadata
dataset.attrs.update(metadata)
# Write using native Tiled xarray support
write_xarray_dataset(self.tiled_client, dataset)
elif 'main_dataframe' in self._dict().keys():
main_data = copy.deepcopy(self._dict()['main_dataframe'])
del(self._transient_dict['main_dataframe'])
fxn = self.tiled_client.write_dataframe
self._sanitize()
fxn(main_data, metadata=self._dict())
else:
main_data = [np.nan]
fxn = self.tiled_client.write_array
self._sanitize()
fxn(main_data, metadata=self._dict())
except Exception as e:
print(f'Exception while transmitting to Tiled! {e}. Saving data in backup store.')
if 'main_data' not in locals():
if 'main_dataset' in self._dict().keys():
# For datasets, try to serialize key info
try:
dataset = self._dict()['main_dataset']
self['main_data'] = {'type': 'xarray.Dataset', 'variables': list(dataset.data_vars.keys())}
except:
self['main_data'] = 'xarray.Dataset'
elif 'main_array' in self._dict().keys():
main_data = self._dict()['main_array']
if isinstance(main_data, np.ndarray):
self['main_data'] = main_data.tolist()
else:
self['main_data'] = str(main_data)
self._sanitize()
filename = str(datetime.datetime.now()).replace(' ','-')
with open(f'{self.backup_path}/{filename}.json','w') as f:
json.dump(self._dict(),f)