Source code for AFL.automation.loading.LabJackGPIO

import lazy_loader as lazy
import threading
import time

class LabJackGPIO:
    """
    Monitors LabJack digital lines from a background thread.

    An LJM stream is started on the configured pins; a daemon thread reads
    it and caches the most recent value of every pin under its logical name
    so that `read()` never blocks on the device.
    """

    def __init__(self, channels, pull_dir="UP", devicetype="ANY", connection="ANY",
                 deviceident="ANY", scan_rate=5, shared_device=None,
                 intermittent_device_handle=False):
        """
        Initializes LabJack digital input monitoring.

        Params:
        channels (dict): Mapping of pin id to name, e.g. {0: 'arm_up', 1: 'arm_down'}.
            Keys are converted with int(), so string ids such as "0" are accepted.
        pull_dir (str or dict, default 'UP'): 'UP' or 'DOWN' pull direction for all pins,
            or a dict per pin (keyed by int or str pin id). Pins missing from the dict
            default to 'UP'; any value other than 'UP' is treated as 'DOWN'.
        devicetype (str, default "ANY"): LabJack device type ("T4", "T7", etc.).
        connection (str, default "ANY"): "USB", "TCP", "ETHERNET", or "WIFI".
        deviceident (str, default "ANY"): Serial number, IP, device name, or "ANY".
        scan_rate (int, default 5): Frequency in Hz to poll pin states.
        shared_device (LabJack class, optional): Object whose `device_handle` is reused
            instead of opening a new device.
        intermittent_device_handle (bool, default False): Stored on the instance.
            NOTE(review): originally documented as "closes the device after each
            operation", but nothing in this class acts on it -- confirm intent.
        """
        self.channels = {int(key): val for key, val in channels.items()}
        self.ids = {val: key for key, val in self.channels.items()}
        self.state = {name: None for name in self.channels.values()}  # Last value per name
        self.running = True

        self.ljm = lazy.load("labjack.ljm", require="AFL-automation[labjack]")

        # Open or share LabJack device
        if shared_device is not None:
            self.shared_device = True
            self.device_handle = shared_device.device_handle
        else:
            self.shared_device = False
            self.device_handle = self.ljm.openS(devicetype, connection, deviceident)

        self.devicetype = devicetype
        self.connection = connection
        self.deviceident = deviceident
        self.intermittent_device_handle = intermittent_device_handle

        # Write 1 (UP) or 0 (DOWN) to every monitored pin's register.
        # NOTE(review): this is what the original code calls "configuring the
        # pull-up/down setting"; confirm against the LJM digital I/O docs.
        for pin in self.channels:
            self.ljm.eWriteName(
                self.device_handle,
                self._pin_name(pin),
                self._pull_setting(pull_dir, pin),
            )

        # Start the streaming thread
        self.thread = threading.Thread(
            target=self._stream_loop, args=(scan_rate,), daemon=True
        )
        self.thread.start()

    @staticmethod
    def _pin_name(pin: int) -> str:
        """Return the LJM register name for a pin id: FIO<pin> below 8, else EIO<pin - 8>."""
        return f"FIO{pin}" if pin < 8 else f"EIO{pin - 8}"

    @staticmethod
    def _pull_setting(pull_dir, pin: int) -> int:
        """
        Return 1 if `pin` should be pulled 'UP', otherwise 0.

        `pull_dir` is either a single string applied to all pins or a per-pin
        dict. Because channel ids are normalised to int while the caller's
        dict may still use string keys, both key forms are looked up.
        """
        if isinstance(pull_dir, dict):
            direction = pull_dir.get(pin, pull_dir.get(str(pin), "UP"))
        else:
            direction = pull_dir
        return 1 if direction == "UP" else 0

    def _stream_loop(self, scan_rate):
        """
        Continuously reads digital inputs and updates the state dictionary.

        Runs in a background thread to avoid blocking the main thread. When
        the loop ends (via `stop()` or an error), `self.running` is cleared
        and the device stream is stopped on a best-effort basis.
        """
        pins = list(self.channels)
        addresses = [self.ljm.nameToAddress(self._pin_name(pin))[0] for pin in pins]
        # One scan per read, so each read yields one value per address.
        self.ljm.eStreamStart(self.device_handle, 1, len(addresses), addresses, scan_rate)
        try:
            while self.running:
                results = self.ljm.eStreamRead(self.device_handle)[0]  # Latest pin values
                for i, pin in enumerate(pins):
                    self.state[self.channels[pin]] = int(results[i])
                time.sleep(0.01)  # Yield GIL, allow other threads to run
        except self.ljm.LJMError as e:
            print(f"Stream error: {e}")
        finally:
            # Let callers see that the cached state is no longer refreshed.
            self.running = False
            # Leave the device without an active stream so that a later
            # eStreamStart on the same handle can succeed.
            try:
                self.ljm.eStreamStop(self.device_handle)
            except self.ljm.LJMError as e:
                print(f"Stream stop error: {e}")

    def read(self, name):
        """
        Returns the last known state of a pin by its logical name.

        Returns None for an unknown name or before the first successful read.
        """
        return self.state.get(name, None)

    def stop(self):
        """
        Stops the monitoring loop and waits for the background thread to exit.

        The device handle is left open: it may be shared with other objects
        through `shared_device`, so closing it is the owner's responsibility.
        """
        self.running = False
        self.thread.join()