Source code for AFL.automation.loading.SensorPollingThread

import numpy as np
import threading
import time
import datetime

[docs] class SensorPollingThread(threading.Thread):
[docs] def __init__(self,sensor,period=0.1,callback=None,hv_pipe=None,window=None,filename=None,daemon=True,data=None): threading.Thread.__init__(self, name='SignalPollingThread', daemon=daemon) self.app = None self.data = None self.sensor = sensor self.callback = callback self.window = window self.hv_pipe = hv_pipe self.period = period self.filename = filename self._stop = False self._lock = threading.Lock() self._buffer_rolling_start = None self._buffer_rolling = [] self._buffer_load = [] self._buffer_load_timeout = datetime.timedelta(seconds=180) self._buffer_load_start = None
[docs] def read(self): with self._lock: return self._buffer_rolling
[docs] def read_load_buffer(self): with self._lock: return self._buffer_load
[docs] def reset_load_buffer(self): with self._lock: self._buffer_load = [] self._buffer_load_start = datetime.datetime.now()
[docs] def terminate(self): self._stop = True
[docs] def alive(self): return self._stop
[docs] def run(self): i=0 self._buffer_rolling_start = datetime.datetime.now() print(f'Starting runloop for PollingThread:') while not self._stop: value = self.sensor.read() with self._lock: now = datetime.datetime.now() self._buffer_rolling.append([now.timestamp(),value]) if self.window is not None: self._buffer_rolling = self._buffer_rolling[-self.window:] if (self._buffer_load_start is not None): buffer_dt = now-self._buffer_load_start if (buffer_dt<self._buffer_load_timeout): self._buffer_load.append([buffer_dt.total_seconds(),value]) if self.hv_pipe is not None: self.hv_pipe.send(np.array([[i,value]])) if self.filename is not None: datestr = datetime.datetime.strftime(datetime.datetime.now(),'%y%m%d-%H:%M:%S-%fus') with open(filename,'a') as f: f.write(f'{datestr},{i},{value}\n') if self.callback is not None: self.callback(self._buffer_rolling) time.sleep(self.period) i+=1