Source code for AFL.automation.loading.PiPlatesRelay

import lazy_loader as lazy
from AFL.automation.loading.MultiChannelRelay import MultiChannelRelay
import atexit
import warnings
import time
import threading

[docs] class PiPlatesRelay(MultiChannelRelay):
[docs] def __init__(self,relaylabels,board_id=0): ''' Init connection to a labeled Pi-Plates Relay module. Params: relaylabels (dict): mapping of port id to load name, e.g. {0:'arm_up',1:'arm_down'} board_id (int, default 0): board ID to connect to, set via jumpers on the board. ''' self.threadlock = threading.Lock() self.RELAYplate = lazy.load("piplates.RELAYplate", require="AFL-automation[piplates]") with self.threadlock: conn = self.RELAYplate.getID(board_id) print(f'Got connection response from board: {conn}') with self.threadlock: self.RELAYplate.RESET(board_id) self.state = [False]*7 self.board_id = board_id #Sanitize labels: for port_id in range(1,7): if port_id not in [int(x) for x in relaylabels.keys()]: relaylabels[port_id] = f'UNUSED{port_id}' self.labels = {int(key):val for key,val in relaylabels.items()} self.ids = {val:key for key,val in self.labels.items()} atexit.register(self.setAllChannelsOff)
[docs] def setAllChannelsOff(self): with self.threadlock: self.RELAYplate.relayALL(self.board_id,0) self.state = [False]*7
[docs] def setChannels(self,channels,verify=True): ''' Write a value (True, False) to the channels specified in channels Parameters: channels (dict): dict of channels, keys as either str (name) or int (id) to write to, vals as the value to write ''' #print(f'RUNNING SET CHANNELS WITH INPUT {channels}') channels_to_set = {} for key,val in channels.items(): if type(key)==str: channels_to_set[self.ids[key]]=val #del channels[key] else: channels_to_set[key] = val print(f'Relay state change, CHANNELS TO SET = {channels_to_set} and CHANNELS = {channels}') for key,val in channels_to_set.items(): self.state[key-1]=val self._refresh_board_state() ''' if val==True: self.RELAYplate.relayON(self.board_id,key) print(f'SET RELAY # {key} ON') elif val==False: self.RELAYplate.relayOFF(self.board_id,key) print(f'SET RELAY # {key} OFF') else: raise KeyError('Improper value for relay set.') verify version if verify: for entry,state in channels.items(): trycounter = 0 while self.getChannels()[entry] != state and trycounter < 6: self.setChannels({entry:state},verify=False) trycounter = trycounter + 1 if trycounter >= 6: raise Exception(f'Relay ERROR while attempting to set the state of {entry} to {state}') relayALL(addr,value) – used to control the state of all relays with a single command. “value” is a 7 bit number with each bit corresponding to a relay. Bit 0 is relay 1, bit 1 is relay 2, and so on. To turn all the relays on at once, use the number 127 for the value. relaySTATE(addr) – Returns a 7-bit number with the current state of each relay. Bit 0 is relay 1, bit 1 is relay 2, and so on. A “1” in a bit position means that the relay is on and zero means that it’s off. '''
def _refresh_board_state(self): val_to_send = 0 for pos,val in enumerate(self.state): if val: val_to_send = val_to_send | 2**(pos) with self.threadlock: self.RELAYplate.relayALL(self.board_id,val_to_send) with self.threadlock: readback = self.RELAYplate.relaySTATE(self.board_id) if readback != val_to_send: retries = 0 warnings.warn(f'ERROR: attempted relay set to {val_to_send} but readback was {readback}.') while retries<60: with self.threadlock: self.RELAYplate.relayALL(self.board_id,val_to_send) time.sleep(0.01) with self.threadlock: readback = self.RELAYplate.relaySTATE(self.board_id) if readback == val_to_send: print(f'Success after {retries} tries.') break else: retries = retries + 1 if readback != val_to_send: raise Exception(f'Relay failed on cmd {self.state}, after {retries} attempts to correct')
[docs] def getChannels(self,asid=False): ''' Read the current state of all channels Parameters: asid (bool,default false): Dict keys should simply be the id, not the name. Returns: (dict) key:value mappings of state. ''' with self.threadlock: allchannels = self.RELAYplate.relaySTATE(self.board_id) retval = {} for portid,name in self.labels.items(): if asid: retval[portid] = bool(allchannels & 2**(portid-1)) else: retval[name] = bool(allchannels & 2**(portid-1)) return retval
[docs] def toggleChannels(self,channels): ids = [] for port in channels: if type(port)==str: ids.append(self.ids[port]) else: ids.append(port) for port in ids: with self.threadlock: self.RELAYplate.relayTOGGLE(self.board_id,port)