Source code for AFL.automation.loading.Sensor
import threading
import numpy as np
[docs]
class Sensor():
[docs]
def __init__(self,address=1,channel=0):
raise NotImplementedError
[docs]
def calibrate(self):
raise NotImplementedError
[docs]
def read(self):
raise NotImplementedError
[docs]
class DummySensor1(threading.Thread):
[docs]
def __init__(self,period=2,minval=-5,maxval=5):
threading.Thread.__init__(self, name='DummySensor1', daemon=True)
self.period = period
self.minval = minval
self.maxval = maxval
self._stop = False
self._lock = threading.Lock()
self._value = 0
[docs]
def read(self):
with self._lock:
return self._value
[docs]
def terminate(self):
self._stop = True
[docs]
def run(self):
print('Starting runloop for DummySensor1')
while not self._stop:
with self._lock:
self._value = (self.maxval-self.minval)*np.random.random()+self.minval
time.sleep(self.period)
[docs]
class DummySensor2(threading.Thread):
[docs]
def __init__(self,period=0.1,hi_value=5,lo_time=15,hi_time=2):
threading.Thread.__init__(self, name='DummySensor2', daemon=True)
self.period = period
self.lo_value = 0
self.hi_value = hi_value
self.lo_time = datetime.timedelta(seconds=lo_time)
self.hi_time = datetime.timedelta(seconds=hi_time)
self.hi_state = False
self._stop = False
self._lock = threading.Lock()
self._value = 0
[docs]
def driver_status(self):
if self.hi_state:
return ['State: IDLE']
else:
return ['State: LOAD IN PROGRESS']
[docs]
def read(self):
with self._lock:
return self._value
[docs]
def terminate(self):
self._stop = True
[docs]
def run(self):
start = datetime.datetime.now()
print('Starting runloop for DummySensor2')
while not self._stop:
if ((datetime.datetime.now()-start)<self.lo_time):
value = np.random.normal(self.lo_value,0.2)
else:
self.hi_state = True
value = np.random.normal(self.hi_value,0.2)
if ((datetime.datetime.now()-start)>(self.lo_time+self.hi_time)):
start = datetime.datetime.now()
self.hi_state = False
with self._lock:
self._value = value
time.sleep(self.period)