labbench

class labbench.CSVLogger(path: Path, *, append: bool = False, text_relational_min: int = 1024, force_relational: list[str] = ['host_log'], nonscalar_file_type: str = 'csv', tar: bool = False)

Bases: ParamAttrLogger

Manage logging of experimental data and methods into CSV files.

Explicit save methods are exposed for arbitrary custom data. Automatic logging is performed for

  1. Access to parameters of labbench.Device objects that are defined with labbench.paramattr.value

  2. Function calls to methods of labbench.Rack

  3. Metadata

Parameters:
  • path – Base path to use for the root database

  • append – Whether to append to the root database if it already exists

  • text_relational_min – Text with at least this many characters is stored as a relational text file instead of directly in the database

  • force_relational – A list of columns that should always be stored as relational data instead of directly in the database

  • nonscalar_file_type – The data type to use in non-scalar (tabular, vector, etc.) relational data

  • tar – Whether to store the relational data within directories in a tar file, instead of subdirectories
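As a rough illustration of how text_relational_min and force_relational might interact, the sketch below decides whether a value stays in the root table or is written to its own file. The helper name is_relational and its rule for non-scalar values are assumptions made for this example, not labbench's implementation.

```python
def is_relational(column, value, text_relational_min=1024,
                  force_relational=("host_log",)):
    """Hypothetical helper: True if `value` would be stored in its own file."""
    if column in force_relational:
        # forced columns always go to relational files
        return True
    if isinstance(value, str):
        # text with at least text_relational_min characters leaves the root table
        return len(value) >= text_relational_min
    # assumption: non-scalar containers are stored relationally
    return isinstance(value, (list, tuple, dict))


short_text = is_relational("note", "ok")
long_text = is_relational("note", "x" * 1024)
forced = is_relational("host_log", "ok")
```

Here short_text is False, while long_text and forced are both True.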

INPUT_FILE_NAME = 'inputs.csv'
OUTPUT_FILE_NAME = 'outputs.csv'
ROOT_FILE_NAME = 'outputs.csv'
nonscalar_file_type = 'csv'
open()

Instead of calling open directly, consider using with statements to guarantee proper disconnection if there is an error. For example, the following sets up a connected instance:

with CSVLogger('my.csv') as db:
    ### do the data acquisition here
    pass

This instantiates a CSVLogger, guarantees a final attempt to write any unwritten data, and closes the file when exiting the with block, even if there is an exception.
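The guarantee described here is the usual context manager contract. A minimal sketch with the standard library follows, assuming a hypothetical SketchCSVLogger class that buffers rows in a pending list; it is a stand-in for the idea, not the labbench class.

```python
import csv
import os
import tempfile


class SketchCSVLogger:
    """Hypothetical stand-in that mimics the open/close guarantee."""

    def __init__(self, path):
        self.path = path
        self.pending = []
        self.file = None

    def __enter__(self):
        self.file = open(self.path, "w", newline="")
        return self

    def __exit__(self, exc_type, exc, tb):
        # final attempt to write pending rows, then always close the file
        try:
            csv.writer(self.file).writerows(self.pending)
        finally:
            self.file.close()
        return False  # do not swallow the exception


path = os.path.join(tempfile.mkdtemp(), "my.csv")
try:
    with SketchCSVLogger(path) as db:
        db.pending.append(["frequency", 1e9])
        raise RuntimeError("acquisition failed")
except RuntimeError:
    pass

with open(path, newline="") as f:
    rows = list(csv.reader(f))
```

The row is written and the file is closed even though the block raised.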

output_index = 0
tables = {}
class labbench.Call(func: callable, *args, **kws)

Bases: object

Wrap a function to apply arguments for threaded calls to concurrently. This can be passed in directly by a user in order to provide arguments; otherwise, it will automatically be wrapped inside concurrently to keep track of some call metadata during execution.
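The idea of binding arguments to a function for later execution in a thread can be sketched as follows. BoundCall is a hypothetical class written for this example; the real Call also tracks metadata that is not modeled here.

```python
class BoundCall:
    """Hypothetical stand-in for the idea behind labbench.Call."""

    def __init__(self, func, *args, **kws):
        self.func = func
        self.args = args
        self.kws = kws
        self.name = func.__name__

    def rename(self, name):
        self.name = name
        return self

    def __call__(self):
        # apply the stored arguments when a worker thread runs the call
        return self.func(*self.args, **self.kws)


def scale(x, factor=1):
    return x * factor


call = BoundCall(scale, 3, factor=2)
result = call()
```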

rename(name)
set_queue(queue)

Set the queue object used to communicate between threads

classmethod wrap_list_to_dict(name_func_pairs: dict[str, callable]) → dict[str, Call]

adjusts naming and wraps callables with Call

class labbench.Device

Bases: DeviceDataClass

base class for labbench device wrappers.

Drivers that subclass Device share

  • standardized connection management via context blocks (the with statement)

  • hooks for automatic data logging and heads-up displays

  • API style consistency

  • bounds checking and casting for typed attributes
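Bounds checking and casting for typed attributes can be pictured with a plain Python descriptor. The sketch below is an assumption-laden illustration: BoundedValue and SketchDevice are hypothetical names, and labbench's own paramattr machinery is not shown.

```python
class BoundedValue:
    """Hypothetical descriptor: casts on assignment and enforces a minimum."""

    def __init__(self, type_, default, minimum=None):
        self.type_ = type_
        self.default = default
        self.minimum = minimum

    def __set_name__(self, owner, name):
        self.name = "_" + name

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        return getattr(obj, self.name, self.default)

    def __set__(self, obj, value):
        value = self.type_(value)  # cast to the declared type
        if self.minimum is not None and value < self.minimum:
            raise ValueError(f"{value} is below the minimum {self.minimum}")
        setattr(obj, self.name, value)


class SketchDevice:
    port = BoundedValue(int, default=25, minimum=1)


dev = SketchDevice()
dev.port = "8025"  # cast from str to int
try:
    dev.port = 0  # below the minimum
    rejected = False
except ValueError:
    rejected = True
```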

Note

This Device base class has convenience functions for device control, but no implementation.

Some labbench Device subclasses wrap particular APIs:

  • VISADevice: pyvisa

  • ShellBackend: binaries and scripts

  • SerialDevice: pyserial

  • DotNetDevice: pythonnet

(and others). If you are implementing a driver that uses one of these backends, inherit from the corresponding class above, not Device.

close()

Backend implementations must overload this to disconnect an existing connection to the resource encapsulated in the object. This will be called without super().close().

property isopen: bool

True if the backend is ready for use

  • Cannot be set after device creation

  • Logging events are triggered on each access, and are stored as a key or column

open()

Backend implementations may overload this to open a backend connection to the resource. This will be called without super().open().

class labbench.DotNetDevice

Bases: Device

Base class for .NET library wrappers based on pythonnet.

To implement a DotNetDevice subclass:

import labbench as lb

class MyLibraryWrapper(lb.DotNetDevice, library=<imported python module colocated with dll>, dll_name='mylibrary.dll'):
    ...

When a DotNetDevice is instantiated, it looks to load the dll from the location of the python module and dll_name.

- `backend` is None after open and is available for replacement by the subclass
dll_name: str | None = None
  • Cannot be set after device creation

  • Logging events are triggered on each access, and are stored as a key or column

  • Case sensitive

property isopen: bool

True if the backend is ready for use

  • Cannot be set after device creation

  • Logging events are triggered on each access, and are stored as a key or column

library: object | None = None
  • Cannot be set after device creation

  • Logging events are triggered on each access, and are stored as a key or column

open()

dynamically import a .net CLR as a python module at self.dll

class labbench.Email(resource: str = 'smtp.nist.gov', *, port: int = 25, sender: str = 'myemail@nist.gov', recipients: list = ['myemail@nist.gov'], success_message: str | None = 'Test finished normally', failure_message: str | None = 'Exception ended test early')

Bases: Device

Sends a notification message on disconnection. If an exception was thrown, the subject line is the failure message and the main body contains traceback information. Otherwise, the subject line is the success message. Stderr is also sent.

close()
failure_message: str | None = 'Exception ended test early'

subject line for test failure emails (None to suppress the emails)

  • Logging event stored in metadata log after first access

  • Case sensitive

property isopen: bool

True if the backend is ready for use

  • Cannot be set after device creation

  • Logging events are triggered on each access, and are stored as a key or column

open()
port: int = 25

TCP/IP port

  • Logging event stored in metadata log after first access

  • Minimum: 1

recipients: list = ['myemail@nist.gov']

list of email addresses of recipients

  • Logging event stored in metadata log after first access

resource: str = 'smtp.nist.gov'

smtp server to use

  • Logging event stored in metadata log after first access

  • Case sensitive

send_summary()

Send the email containing the final property trait of the test.

sender: str = 'myemail@nist.gov'

email address of the sender

  • Logging event stored in metadata log after first access

  • Case sensitive

success_message: str | None = 'Test finished normally'

subject line for test success emails (None to suppress the emails)

  • Logging event stored in metadata log after first access

  • Case sensitive

class labbench.LabviewSocketInterface(resource: str = '127.0.0.1', *, tx_port: int = 61551, rx_port: int = 61552, delay: float = 1, timeout: float = 2, rx_buffer_size: int = 1024)

Bases: Device

Base class demonstrating simple sockets-based control of a LabView VI.

Keyed get/set with attr.property are implemented as simple ‘command value’ messages. Subclasses can therefore support the commands of a specific LabView VI, similar to VISA commands, by assigning the commands implemented in that VI.

- backend

connection object mapping {‘rx’: rxsock, ‘tx’: txsock}

clear()

Clear any data present in the read socket buffer.

close()
delay: float = 1

time to wait after each property trait write or query

  • Minimum: 0

  • Logging events are triggered on each access, and are stored as a key or column

property isopen: bool

True if the backend is ready for use

  • Cannot be set after device creation

  • Logging events are triggered on each access, and are stored as a key or column

open()
read(convert_func=None)

Receive from the rx socket until self.rx_buffer_size samples are received or timeout happens after self.timeout seconds.

Optionally, apply the conversion function to the value after it is received.

resource: str = '127.0.0.1'

LabView VI host address

  • Logging events are triggered on each access, and are stored as a key or column

  • Case sensitive

rx_buffer_size: int = 1024
  • Logging events are triggered on each access, and are stored as a key or column

  • Minimum: 1

rx_port: int = 61552

RX port to receive from the LabView VI

  • Logging events are triggered on each access, and are stored as a key or column

timeout: float = 2

maximum wait time for replies before raising TimeoutError

  • Minimum: 0

  • Logging events are triggered on each access, and are stored as a key or column

tx_port: int = 61551

TX port to send to the LabView VI

  • Logging events are triggered on each access, and are stored as a key or column

write(msg)

Send a string over the tx socket.

class labbench.Rack

Bases: Owner, Ownable

A Rack provides context management and methods for groups of Device instances.

The Rack object provides connection management for all devices and data managers in a with block:

with Rack() as testbed:
    # use the testbed here
    pass

For functional validation, it is also possible to open only a subset of devices like this:

testbed = Rack()
with testbed.dev1, testbed.dev2:
    # use the testbed.dev1 and testbed.dev2 here
    pass
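Opening a group of context managers together, and closing them in reverse order, can be sketched with contextlib.ExitStack from the standard library. SketchInstrument is a hypothetical device used only to record the order of events; this shows the general pattern, not how Rack is implemented.

```python
from contextlib import ExitStack


class SketchInstrument:
    """Hypothetical device that records open/close order."""

    log = []

    def __init__(self, name):
        self.name = name

    def __enter__(self):
        SketchInstrument.log.append(f"open {self.name}")
        return self

    def __exit__(self, *exc_info):
        SketchInstrument.log.append(f"close {self.name}")
        return False


dev1, dev2 = SketchInstrument("dev1"), SketchInstrument("dev2")

with ExitStack() as stack:
    for dev in (dev1, dev2):
        stack.enter_context(dev)
    SketchInstrument.log.append("measure")
```

After the block, the log shows both devices opened before the measurement and closed afterward in last-in, first-out order.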

The following syntax creates a new Rack class for an experiment:

import labbench as lb

class MyRack(lb.Rack):
    db = lb.SQLiteManager()
    sa = MySpectrumAnalyzer()

    spectrogram = Spectrogram(db=db, sa=sa)

class labbench.SQLiteLogger(path: Path, *, append: bool = False, text_relational_min: int = 1024, force_relational: list[str] = ['host_log'], nonscalar_file_type: str = 'csv', tar: bool = False)

Bases: ParamAttrLogger

Store data and property traits to disk in an SQLite database.

This extends Aggregator to support

  1. queuing aggregate property trait of devices by lists of dictionaries;

  2. custom metadata in each queued aggregate property trait entry; and

  3. custom response to non-scalar data (such as relational databasing).

Parameters:
  • path – Base path to use for the root database

  • append – Whether to append to the root database if it already exists

  • text_relational_min – Text with at least this many characters is stored as a relational text file instead of directly in the database

  • force_relational – A list of columns that should always be stored as relational data instead of directly in the database

  • nonscalar_file_type – The data type to use in non-scalar (tabular, vector, etc.) relational data

  • tar – Whether to store the relational data within directories in a tar file, instead of subdirectories

INDEX_LABEL
OUTPUT_TABLE_NAME
ROOT_FILE_NAME
close()
committed
inprogress
key(name, attr)

The key determines the SQL column name. df.to_sql does not seem to support column names that include spaces

open()

Instead of calling open directly, consider using with statements to guarantee proper disconnection if there is an error. For example, the following sets up a connected instance:

with SQLiteLogger('my.db') as db:
    ### do the data acquisition here
    pass

This instantiates a SQLiteLogger, guarantees a final attempt to write any unwritten data, and closes the file when exiting the with block, even if there is an exception.

output_index
class labbench.Sequence(*specification, shared_names=[], input_table=None)

Bases: Ownable

An experimental procedure defined with methods in Rack instances. The input is a specification for sequencing these steps, including support for threading.

Sequences are meant to be defined as attributes of Rack subclasses and used through instances of those subclasses.

access_spec
cleanup_func
exception_allowlist

alias of NeverRaisedException

return_on_exceptions(exception_or_exceptions, cleanup_func=None)

Configures calls to the bound Sequence to swallow the specified exceptions raised by constituent steps. If an exception is swallowed, subsequent steps in the Sequence are not executed. The dictionary of return values from each Step is returned with an additional ‘exception’ key indicating the type of the exception that occurred.
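The swallowing behavior can be sketched with a plain loop over named steps. The run_steps function is hypothetical, and storing the exception type by name under the ‘exception’ key is an assumption made for this example.

```python
def run_steps(steps, swallow=()):
    """Hypothetical runner: `steps` maps names to zero-argument callables."""
    results = {}
    for name, step in steps.items():
        try:
            results[name] = step()
        except swallow as exc:
            # stop here; report the type of the swallowed exception
            results["exception"] = type(exc).__name__
            break
    return results


def fail():
    raise TimeoutError("no reply")


results = run_steps(
    {"setup": lambda: 1, "acquire": fail, "teardown": lambda: 3},
    swallow=(TimeoutError,),
)
```

The teardown step never runs, and results holds the return value of setup plus the ‘exception’ entry.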

class labbench.SerialDevice(resource: str, *, timeout: float | None = None, write_timeout: float | None = None, baud_rate: int = 9600, parity: str = 'N', stopbits: float | None = None, xonxoff: bool = False, rtscts: bool | None = False, dsrdtr: bool | None = False, bytesize: int | None = 8)

Bases: Device

Base class for wrappers that communicate via pyserial.

This implementation is very sparse because there is in general no messaging string format for serial devices.

- backend

control object, after open

Type:

serial.Serial

baud_rate: int = 9600

data rate of the physical serial connection. (bytes/s)

  • Logging events are triggered on each access, and are stored as a key or column

  • Minimum: 1 bytes/s

bytesize: int | None = 8

(bits)

  • Allowed values are {None, 5, 6, 8, 7}

  • Logging events are triggered on each access, and are stored as a key or column

close()

Disconnect the serial instrument

dsrdtr: bool | None = False

whether to enable hardware (DSR/DTR) flow control on open

  • Logging events are triggered on each access, and are stored as a key or column

classmethod from_hwid(hwid=None, *args, **connection_params) → SerialDevice

Instantiate a new SerialDevice from a Windows ‘hwid’ string instead of a COM port resource. A hwid string in Windows might look something like:

r'PCIVEN_8086&DEV_9D3D&SUBSYS_06DC1028&REV_213&11583659&1&B3'

classmethod from_url(url, **kws) → SerialDevice
property isopen: bool

True if the backend is ready for use

  • Cannot be set after device creation

  • Logging events are triggered on each access, and are stored as a key or column

static list_ports(hwid=None)

List USB serial devices on the computer

Returns:

list of port resource information

open()

Connect to the serial device at the port address or URL defined in self.resource

parity: str = 'N'

parity in the physical serial connection.

  • Allowed values are {‘E’, ‘M’, ‘O’, ‘S’, ‘N’}

  • Logging events are triggered on each access, and are stored as a key or column

  • Case sensitive

resource: str

platform-dependent serial port address or URL

  • Logging event stored in metadata log after first access

  • Case sensitive

rtscts: bool | None = False

whether to enable hardware (RTS/CTS) flow control on open

  • Logging events are triggered on each access, and are stored as a key or column

stopbits: float | None = None

(bits)

  • Allowed values are {1, 2, None, 1.5}

  • Logging events are triggered on each access, and are stored as a key or column

timeout: float | None = None

max wait time on reads before raising TimeoutError (s)

  • Minimum: 0 s

  • Logging events are triggered on each access, and are stored as a key or column

write_timeout: float | None = None

max wait time on writes before raising TimeoutError. (s)

  • Minimum: 0 s

  • Logging events are triggered on each access, and are stored as a key or column

xonxoff: bool = False

whether to enable software flow control on open

  • Logging events are triggered on each access, and are stored as a key or column

class labbench.SerialLoggingDevice(resource: str, *, timeout: float | None = None, write_timeout: float | None = None, baud_rate: int = 9600, parity: str = 'N', stopbits: float | None = None, xonxoff: bool = False, rtscts: bool | None = False, dsrdtr: bool | None = False, bytesize: int | None = 8, poll_rate: float = 0.1, stop_timeout: float = 0.5, max_queue_size: int = 100000)

Bases: SerialDevice

Manage connection, acquisition, and data retrieval on a device that streams logs over serial. Acquisition runs in a background thread, and setup or stop commands block during execution.

Listener objects must implement an attach method with one argument consisting of the queue that the device manager uses to push data from the serial port.
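The background acquisition pattern can be sketched with a thread that polls a data source and pushes chunks into a queue. SketchSerialLogger and its source callable are hypothetical stand-ins for the serial port; only the start, stop, running, and fetch method names follow this class.

```python
import queue
import threading
import time


class SketchSerialLogger:
    """Hypothetical poller; `source` stands in for reads from a serial port."""

    def __init__(self, source, poll_rate=0.01):
        self.source = source
        self.poll_rate = poll_rate
        self.queue = queue.Queue()
        self._stop_event = threading.Event()
        self._thread = None

    def start(self):
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while not self._stop_event.is_set():
            data = self.source()
            if data:
                self.queue.put(data)
            time.sleep(self.poll_rate)

    def running(self):
        return self._thread is not None and self._thread.is_alive()

    def stop(self, stop_timeout=0.5):
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(stop_timeout)

    def fetch(self):
        # drain everything queued so far and return it as one bytes object
        chunks = []
        while True:
            try:
                chunks.append(self.queue.get_nowait())
            except queue.Empty:
                break
        return b"".join(chunks)


pending = [b"line 1\n", b"line 2\n"]
logger = SketchSerialLogger(lambda: pending.pop(0) if pending else b"")
logger.start()
deadline = time.monotonic() + 2
while pending and time.monotonic() < deadline:
    time.sleep(0.01)  # wait until the poller has consumed the fake data
logger.stop()
data = logger.fetch()
```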

baud_rate: int = 9600

data rate of the physical serial connection. (bytes/s)

  • Logging events are triggered on each access, and are stored as a key or column

  • Minimum: 1 bytes/s

bytesize: int | None = 8

(bits)

  • Allowed values are {None, 5, 6, 8, 7}

  • Logging events are triggered on each access, and are stored as a key or column

clear()

Throw away any log data in the buffer.

close()
dsrdtr: bool | None = False

whether to enable hardware (DSR/DTR) flow control on open

  • Logging events are triggered on each access, and are stored as a key or column

fetch()

Retrieve and return any log data in the buffer.

Returns:

any bytes in the buffer

property isopen: bool

True if the backend is ready for use

  • Cannot be set after device creation

  • Logging events are triggered on each access, and are stored as a key or column

max_queue_size: int = 100000

bytes to allocate in the data retrieval buffer

  • Logging events are triggered on each access, and are stored as a key or column

  • Minimum: 1

parity: str = 'N'

parity in the physical serial connection.

  • Allowed values are {‘E’, ‘M’, ‘O’, ‘S’, ‘N’}

  • Logging events are triggered on each access, and are stored as a key or column

  • Case sensitive

poll_rate: float = 0.1

Data retrieval rate from the device (in seconds)

  • Minimum: 0

  • Logging events are triggered on each access, and are stored as a key or column

resource: str

platform-dependent serial port address or URL

  • Logging event stored in metadata log after first access

  • Case sensitive

rtscts: bool | None = False

whether to enable hardware (RTS/CTS) flow control on open

  • Logging events are triggered on each access, and are stored as a key or column

running()

Check whether the logger is running.

Returns:

True if the logger is running

start()

Start a background thread that acquires log data into a queue.

Returns:

None

stop()

Stops the logger acquisition if it is running. Returns silently otherwise.

Returns:

None

stop_timeout: float = 0.5

delay after stop before terminating run thread

  • Minimum: 0

  • Logging events are triggered on each access, and are stored as a key or column

stopbits: float | None = None

(bits)

  • Allowed values are {1, 2, None, 1.5}

  • Logging events are triggered on each access, and are stored as a key or column

timeout: float | None = None

max wait time on reads before raising TimeoutError (s)

  • Minimum: 0 s

  • Logging events are triggered on each access, and are stored as a key or column

write_timeout: float | None = None

max wait time on writes before raising TimeoutError. (s)

  • Minimum: 0 s

  • Logging events are triggered on each access, and are stored as a key or column

xonxoff: bool = False

whether to enable software flow control on open

  • Logging events are triggered on each access, and are stored as a key or column

class labbench.ShellBackend(*, background_timeout: float = 1)

Bases: Device

A wrapper for running shell commands.

This is a thin wrapper around the subprocess module. Data can be captured from the standard output and standard error pipes, and the command can optionally run in the background.

After opening, backend is None. On a call to run(background=True), backend becomes a subprocess instance. When EOF is reached on the executable’s stdout, the backend resets to None.

When run is called, the program runs in a subprocess. The output piped to the command line standard output is queued in a background thread. Call read_stdout() to retrieve (and clear) this queued stdout.
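The queueing of standard output described above can be sketched directly with subprocess, threading, and queue. This is a minimal illustration under stated assumptions: the child process is a short Python command used as a portable stand-in for a binary, and the pump and read_stdout functions are written for this example rather than taken from ShellBackend.

```python
import queue
import subprocess
import sys
import threading

# run a short Python child process as a portable stand-in for a binary
proc = subprocess.Popen(
    [sys.executable, "-c", "print('ready'); print('done')"],
    stdout=subprocess.PIPE,
    text=True,
)
lines = queue.Queue()


def pump(stream, q):
    # copy each stdout line into the queue until EOF
    for line in stream:
        q.put(line)
    stream.close()


reader = threading.Thread(target=pump, args=(proc.stdout, lines), daemon=True)
reader.start()
proc.wait()
reader.join()


def read_stdout(q):
    """Pop everything queued so far, leaving the queue empty."""
    out = []
    while not q.empty():
        out.append(q.get_nowait())
    return "".join(out)


first = read_stdout(lines)
second = read_stdout(lines)
```

The first read returns both lines, and the second returns an empty string because the queue was cleared.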

background_timeout: float = 1

wait time after close before killing background processes (s)

  • Minimum: 0 s

  • Logging event stored in metadata log after first access

clear_stdout()

Clear queued standard output. Subsequent calls to self.read_stdout() will return ‘’.

close()
property isopen: bool

True if the backend is ready for use

  • Cannot be set after device creation

  • Logging events are triggered on each access, and are stored as a key or column

kill()

If a process is running in the background, kill it. Sends a console warning if no process is running.

open()

The open() method implements opening in the Device object protocol. Call the run() method when open to execute the binary.

read_stdout(wait_for=0)

Pop any standard output that has been queued by a background run (see run). Afterward, the queue is cleared. Starting another background run also clears the queue.

Returns:

stdout

run(*argv, pipe=True, background=False, check_return=True, raise_on_stderr=False, respawn=False, timeout=None)
running()

Check whether a background process is running.

Returns:

True if running, otherwise False

write_stdin(text)

Write characters to stdin if a background process is running. Raises Exception if no background process is running.

class labbench.TelnetDevice(resource: str, *, timeout: float = 2)

Bases: Device

A general base class for communicating with devices via telnet. Unlike (for example) VISA instruments, there is no standardized command format like SCPI. The implementation is therefore limited to open and close, which open or close a telnetlib connection object: the backend attribute. Subclasses can read or write with the backend attribute like they would any other telnetlib instance.

A TelnetDevice resource string is an IP address. The port is specified by port. These can be set when you instantiate the TelnetDevice or by setting them afterward in value traits.

Subclassed devices that need property trait descriptors will need to implement get_key and set_key methods to implement the property trait set and get operations (as appropriate).

close()

Disconnect the telnet connection

property isopen: bool

True if the backend is ready for use

  • Cannot be set after device creation

  • Logging events are triggered on each access, and are stored as a key or column

open()

Open a telnet connection to the host defined by the string in self.resource

resource: str

server host address

  • Logging event stored in metadata log after first access

  • Case sensitive

timeout: float = 2

connection timeout (s)

  • Minimum: 0 s

  • Logging events are triggered on each access, and are stored as a key or column

labbench.Undefined

alias of _empty

class labbench.VISADevice(resource: str | None = None, *, read_termination: str = '\n', write_termination: str = '\n', open_timeout: float | None = None, timeout: float | None = None)

Bases: Device

A wrapper for VISA device automation.

This exposes pyvisa instrument automation capabilities in a labbench object. Automatic connection based on make and model is supported.

Customized operation for specific instruments should be implemented in subclasses.

Examples

Connect to a VISA device using a known resource string:

with VISADevice('USB0::0x2A8D::0x1E01::SG56360004::INSTR') as instr:
    print(instr)

Probe available connections and print valid VISADevice constructors:

print(visa_probe_devices())

Probe details of available connections and identity strings on the command line:

labbench visa-probe

Connect to instrument with serial number ‘SG56360004’ and query ‘:FETCH?’ CSV:

with VISADevice('SG56360004') as instr:
    print(instr.query_ascii_values(':FETCH?'))
backend

instance of a pyvisa instrument object (when open)

Type:

pyvisa.Resource

close()

closes the instrument.

When managing device connection through a with context, this is called automatically and does not need to be invoked.

property isopen: bool

True if the backend is ready for use

  • Cannot be set after device creation

  • Logging events are triggered on each access, and are stored as a key or column

make: str | None = None

device manufacturer name used to autodetect resource string

  • Logging event stored in metadata log after first access

  • Case sensitive

model: str | None = None

device model used to autodetect resource string

  • Logging event stored in metadata log after first access

  • Case sensitive

open()

opens the instrument.

When managing device connection through a with context, this is called automatically and does not need to be invoked.

open_timeout: float | None = None

timeout for opening a connection to the instrument (s)

  • Logging events are triggered on each access, and are stored as a key or column

overlap_and_block(timeout: float = None, quiet: bool = False, query_func: callable = None)

context manager that sends ‘*OPC’ on entry, and performs a blocking ‘*OPC?’ query on exit.

By convention, these SCPI commands give a hint to the instrument that commands sent inside this block may be executed concurrently. The context exit then blocks until all of the commands have completed.

Example:

with inst.overlap_and_block():
    inst.write('long running command 1')
    inst.write('long running command 2')
Parameters:
  • timeout – maximum time to wait for ‘*OPC?’ reply, or None to use self.backend.timeout

  • quiet – Suppress timeout exceptions if this evaluates as True

Raises:

TimeoutError – on ‘*OPC?’ query timeout
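The entry and exit behavior can be sketched with contextlib.contextmanager and a fake instrument that records its messages. FakeInstrument is hypothetical, and sending the ‘*OPC?’ query even when the block raises is an assumption of this sketch.

```python
from contextlib import contextmanager


class FakeInstrument:
    """Hypothetical instrument that records the messages it receives."""

    def __init__(self):
        self.sent = []

    def write(self, msg):
        self.sent.append(msg)

    def query(self, msg):
        self.sent.append(msg)
        return "1"  # SCPI '*OPC?' replies '1' once operations complete


@contextmanager
def overlap_and_block(inst):
    inst.write("*OPC")  # on entry
    try:
        yield
    finally:
        # on exit; assumption: the query is also sent if the block raises
        inst.query("*OPC?")


inst = FakeInstrument()
with overlap_and_block(inst):
    inst.write("long running command 1")
    inst.write("long running command 2")
```

The recorded messages are bracketed by ‘*OPC’ at the start and ‘*OPC?’ at the end.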

preset()

sends ‘*RST’ to reset the instrument to preset

query(msg: str, timeout=None, remap: bool = False, kws: dict[str, Any] = {}) → str

queries the device with an SCPI message and returns its reply.

Handles debug logging and adjustments when in overlap_and_block contexts as appropriate.

Parameters:

msg – the SCPI message to send

query_ascii_values(msg: str, converter='f', separator=', ', container=<class 'list'>, delay=None, timeout=None)
read_termination: str = '\n'

end-of-line string to delineate the end of ascii query replies

  • Logging event stored in metadata log after first access

  • Case sensitive

resource: str | None = None

VISA resource addressing string for device connection

  • Logging event stored in metadata log after first access

  • Case sensitive

property serial: str

device-reported serial number

  • Cannot be set after device creation

  • Logging event stored in metadata log after first access

  • Case sensitive

property status_byte: dict

instrument status decoded from ‘*STB?’

  • Cannot be set after device creation

  • Logging events are triggered on each access, and are stored as a key or column

class suppress_timeout(*exceptions)

Bases: suppress

context manager that suppresses timeout exceptions on write or query.

Example:

with inst.suppress_timeout():
    inst.write('long command 1')
    inst.write('long command 2')

If command 1 raises a timeout exception, command 2 will not execute, the context block is complete, and the exception from command 1 is swallowed.
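Because this class derives from suppress, the control flow matches contextlib.suppress in the standard library. A minimal sketch follows, assuming a hypothetical write function that can be made to time out.

```python
from contextlib import suppress

executed = []


def write(msg, fail=False):
    """Hypothetical write that can time out."""
    if fail:
        raise TimeoutError(f"timeout on {msg!r}")
    executed.append(msg)


with suppress(TimeoutError):
    write("long command 1", fail=True)
    write("long command 2")  # skipped: the block exits at the first timeout

write("next command")  # execution continues after the block
```

Only the command issued after the block ends up in executed.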
timeout: float | None = None

message response timeout (s)

  • Logging event stored in metadata log after first access

wait()

sends ‘*WAI’ to wait for all commands to complete before continuing

write(msg: str, kws: dict[str, Any] = {})

sends an SCPI message to the device.

Wraps self.backend.write, and handles debug logging and adjustments when in overlap_and_block contexts as appropriate.

Parameters:

msg – the SCPI command to send

Returns:

None

write_termination: str = '\n'

end-of-line string to send after writes

  • Logging event stored in metadata log after first access

  • Case sensitive

class labbench.Win32ComDevice

Bases: Device

Basic support for calling win32 COM APIs.

a dedicated background thread. Set concurrency=True to decide whether this thread support wrapper is applied to the dispatched Win32Com object.

com_object: str = ''

the win32com object string

  • Cannot be set after device creation

  • Logging events are triggered on each access, and are stored as a key or column

  • Case sensitive

concurrency: bool = True

if False, enforces locking for single-threaded access

  • Cannot be set after device creation

  • Logging events are triggered on each access, and are stored as a key or column

property isopen: bool

True if the backend is ready for use

  • Cannot be set after device creation

  • Logging events are triggered on each access, and are stored as a key or column

open()

Connect to the win32 com object

labbench.concurrently(*objs, **kws)
If *objs are callable (like functions), call each of *objs in concurrent threads. If *objs are context managers (such as Device instances to be connected), enter each context in concurrent threads.

Multiple references to the same function in objs only result in one call. The catch and nones arguments may be callables, in which case they are executed (and each flag value is treated as defaults).

Parameters:
  • objs – each argument may be a callable (function or class that defines a __call__ method), or context manager (such as a Device instance)

  • catch – if False (the default), a ConcurrentException is raised if any of funcs raise an exception; otherwise, any remaining successful calls are returned as normal

  • nones – if not callable and evaluates as True, includes entries for calls that return None (default is False)

  • flatten – if True, results of callables that return a dictionary are merged into the return dictionary with update (instead of passed through as dictionaries)

  • traceback_delay – if False, immediately show traceback information on a thread exception; if True (the default), wait until all threads finish

  • operation – if ‘auto’ (default), try to determine automatically; otherwise, ‘context’ or ‘call’

Returns:

the values returned by each call

Return type:

dictionary keyed by function name

Here are some examples:

Example:

Call each function do_something_1 and do_something_2, each with no arguments:

>>> import time
>>> def do_something_1():
...     time.sleep(0.5)
...     return 1
>>> def do_something_2():
...     time.sleep(1)
...     return 2
>>> rets = concurrently(do_something_1, do_something_2)
>>> rets['do_something_1']

Example:

To pass arguments, use the Call wrapper:

>>> def do_something_3(a, b, c):
...     time.sleep(2)
...     return a, b, c
>>> rets = concurrently(do_something_1, Call(do_something_3, a, b, c=c))
>>> rets['do_something_3']
a, b, c

Caveats

  • Because the calls are in different threads, not different processes, this should be used for IO-bound functions (not CPU-intensive functions).

  • Be careful about thread-safety.
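The threaded call pattern, with results returned in a dictionary keyed by function name, can be approximated with concurrent.futures from the standard library. The run_concurrently function is a hypothetical stand-in; it does not model the catch, nones, flatten, or context manager behavior described above.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def do_something_1():
    time.sleep(0.05)
    return 1


def do_something_2():
    time.sleep(0.1)
    return 2


def run_concurrently(*funcs):
    """Hypothetical stand-in: run each callable in its own thread."""
    with ThreadPoolExecutor(max_workers=len(funcs)) as pool:
        futures = {func.__name__: pool.submit(func) for func in funcs}
        # result() blocks until each call finishes and re-raises its exception
        return {name: future.result() for name, future in futures.items()}


rets = run_concurrently(do_something_1, do_something_2)
```

Since both functions mostly sleep, which is an IO-like wait, the two calls overlap in time rather than running back to back.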

labbench.dump_rack(rack: Rack, output_path: Path, sourcepath: Path, pythonpath: Path | None = None, exist_ok: bool = False, with_defaults: bool = False, skip_tables: bool = False)
labbench.find_owned_rack_by_type(parent_rack: Rack, target_type: Rack, include_parent: bool = True)

Return a rack instance of target_type owned by parent_rack. If there is not exactly one instance of target_type, TypeError is raised.

labbench.import_as_rack(import_string: str, *, cls_name: str | None = None, append_path: list = [], base_cls: type = <class 'labbench.Rack'>, replace_attrs: list = ['__doc__', '__module__'])

Creates a Rack subclass with the specified module’s contents. Ownable objects are annotated by type, allowing the resulting class to be instantiated.

Parameters:
  • import_string – import string for the module that contains the Rack to import

  • cls_name – the name of the Rack subclass to import from the module (or None to build a new subclass with the module contents)

  • base_cls – the base class to use for the new subclass

  • append_path – list of paths to append to sys.path before import

  • replace_attrs – attributes of base_cls to replace from the module

Exceptions:

NameError: if there is an attribute name conflict between the module and base_cls

Returns:

A dynamically created subclass of base_cls

labbench.load_rack(output_path: str, defaults: dict = {}, apply: bool = True) → Rack

instantiates a Rack object from a config directory created by dump_rack.

After instantiation, the current working directory is changed to output_path.

class labbench.rack_input_table(table_path: str)

Bases: MethodTaggerDataclass

tag a method defined in a Rack to support execution from a flat table.

In practice, this often means a very long argument list.

Parameters:

table_path – location of the input table

table_path: str
class labbench.rack_kwargs_skip(*arg_names)

Bases: MethodTaggerDataclass

tag a method defined in a Rack to replace a **kwargs argument using the signature of the specified callable.

In practice, this often means a very long argument list.

Parameters:
  • callable_template – replace variable keyword arguments (**kwargs) with the keyword arguments defined in this callable

  • skip – list of column names to omit

skip: list
class labbench.rack_kwargs_template(template: callable | None = None)

Bases: MethodTaggerDataclass

tag a method defined in a Rack to replace a **kwargs argument using the signature of the specified callable.

In practice, this often means a very long argument list.

Parameters:
  • template – replace variable keyword arguments (**kwargs) with the keyword arguments defined in this callable

  • skip – list of column names to omit

template: callable
labbench.read(path_or_buf: str, columns: list[str] | None = None, nrows: int | None = None, format: str = 'auto', **kws)

Read tabular data from a file in one of various formats using pandas.

Parameters:
  • path_or_buf – path to the data file.

  • columns – a column or iterable of multiple columns to return from the data file, or None (the default) to return all columns

  • nrows – number of rows to read at the beginning of the table, or None (the default) to read all rows

  • format – data file format, one of 'pickle', 'feather', 'csv', or 'json', or 'auto' (the default) to guess from the file extension

  • kws – additional keyword arguments to pass to the pandas read_<ext> function matching the file extension

Returns:

pandas.DataFrame instance containing data read from file

labbench.read_relational(path: str | Path, expand_col: str, root_cols: list[str] | None = None, target_cols: list[str] | None = None, root_nrows: int | None = None, root_format: str = 'auto', prepend_column_name: bool = True) DataFrame

Flatten a relational database table by loading the table located at each row of root[expand_col]. The value of each column in that root row is copied to the loaded table. The columns of each resulting table are downselected according to root_cols and target_cols. The resulting tables are then concatenated and returned.

The expanded dataframe may be very large, making downselecting a practical necessity in some scenarios.

Parameters:
  • path – file location of the root data table

  • expand_col – name of the column of the root data file to expand with relational data

  • root_cols – the root columns to include in the expanded dataframe, or None (the default) to pass all columns from root

  • target_cols – the columns of the loaded relational tables to include in the expanded dataframe, or None (the default) to pass all columns loaded from each root[expand_col]

  • prepend_column_name – whether to prepend the name of the expanded column from the root table

Returns:

the expanded dataframe
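
The flattening step described above can be sketched with the standard library csv module. This is an illustration under stated assumptions, not labbench's implementation: flatten_relational and the file names are hypothetical, each entry of the expanded column is assumed to be a path relative to the root file, and column downselection and name prefixing are omitted.

```python
import csv
import tempfile
from pathlib import Path


def flatten_relational(root_path, expand_col):
    """Expand each root row with the rows of the table it points to."""
    root_path = Path(root_path)
    flat = []
    with open(root_path, newline="") as root_file:
        for root_row in csv.DictReader(root_file):
            # assumption: the entry is a path relative to the root file
            child_path = root_path.parent / root_row[expand_col]
            with open(child_path, newline="") as child_file:
                for child_row in csv.DictReader(child_file):
                    # copy the root row's values onto every row of the child table
                    flat.append({**root_row, **child_row})
    return flat


with tempfile.TemporaryDirectory() as tmp:
    tmp = Path(tmp)
    (tmp / "trace_0.csv").write_text("frequency,power\n1e9,-10\n2e9,-12\n")
    (tmp / "trace_1.csv").write_text("frequency,power\n1e9,-20\n")
    (tmp / "root.csv").write_text("attenuation,trace\n0,trace_0.csv\n10,trace_1.csv\n")
    rows = flatten_relational(tmp / "root.csv", "trace")
```

Two root rows pointing at tables of two rows and one row produce three flattened rows, which shows how quickly the expanded table can grow.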

labbench.retry(exception_or_exceptions: Union[BaseException, typing.Iterable[BaseException]], tries: int = 4, *, delay: float = 0, backoff: float = 0, exception_func=<function <lambda>>, log: bool = True) callable[_Tfunc, _Tfunc]

calls to the decorated function are repeated, suppressing specified exception(s), until a maximum number of retries has been attempted.

If the function raises the exception the specified number of times, the underlying exception is raised. Otherwise, return the result of the function call.

Example

The following retries the telnet connection 5 times on ConnectionRefusedError:

import telnetlib

from labbench import retry


# Retry a telnet connection 5 times if the telnet library raises ConnectionRefusedError
@retry(ConnectionRefusedError, tries=5)
def open(host, port):
    t = telnetlib.Telnet()
    t.open(host, port, 5)
    return t

Inspired by https://github.com/saltycrane/retry-decorator which is released under the BSD license.

Parameters:
  • exception_or_exceptions – Exception (sub)class (or tuple of exception classes) to watch for

  • tries – number of times to try before giving up

  • delay – initial delay between retries in seconds

  • backoff – factor by which the delay is multiplied after each retry

  • exception_func – function to call on exception before the next retry

  • log – whether to emit a log message on the first retry
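
The retry behavior described above can be sketched with the standard library alone. This is a minimal illustration, not labbench's implementation: simple_retry and flaky are hypothetical names, and multiplying the delay by backoff after each failure is an assumption about the intended semantics.

```python
import functools
import time


def simple_retry(exceptions, tries=4, delay=0.0, backoff=1.0):
    """Hypothetical stand-in illustrating retry semantics (not labbench.retry)."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kws):
            wait = delay
            for attempt in range(1, tries + 1):
                try:
                    return func(*args, **kws)
                except exceptions:
                    if attempt == tries:
                        # out of attempts: let the underlying exception propagate
                        raise
                    time.sleep(wait)
                    wait *= backoff

        return wrapper

    return decorator


attempts = []


@simple_retry(ConnectionRefusedError, tries=5)
def flaky():
    # fails twice, then succeeds on the third call
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionRefusedError("not ready")
    return "connected"


result = flaky()
```

Exceptions other than those listed are not caught, so unrelated failures still surface on the first attempt.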

labbench.sequentially(*objs, **kws)

If *objs are callable (like functions), call each of *objs in the given order. If *objs are context managers (such as Device instances to be connected), enter each context in the given order, and return a context manager suited for a with statement. This is the sequential implementation of the concurrently function, with a compatible convention of returning dictionaries.

Multiple references to the same function in objs only result in one call. If the nones argument is given as a callable, it is executed like the other objects (and the flag takes its default value).

Parameters:
  • objs – callables or context managers or Device instances for connections

  • kws – dictionary of additional callables or Device instances for connections

  • nones – True to include dictionary entries for calls that return None (default: False)

  • flatten – True to flatten any dict return values into the return dictionary

Returns:

a dictionary keyed on the object name containing the return value of each function

Return type:

dictionary keyed by function name

Here are some examples:

Example:

Call each function do_something_1 and do_something_2, each with no arguments:

>>> import time
>>> def do_something_1():
...     time.sleep(0.5)
...     return 1
>>> def do_something_2():
...     time.sleep(1)
...     return 2
>>> rets = sequentially(do_something_1, do_something_2)
>>> rets['do_something_1']
1

Example:

To pass arguments, use the Call wrapper:

>>> def do_something_3(a, b, c):
...     time.sleep(2)
...     return a, b, c
>>> rets = sequentially(do_something_1, Call(do_something_3, 'a', 'b', c='c'))
>>> rets['do_something_3']
('a', 'b', 'c')

Caveats

  • Unlike concurrently, an exception in a context manager’s __enter__ means that any remaining context managers will not be entered.
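
The calling convention for plain functions can be sketched as follows. This is an illustration only, not labbench's implementation: call_in_order is a hypothetical name, context managers and the flatten option are omitted, and keying the results by function name is an assumption based on the Returns description above.

```python
def call_in_order(*funcs, nones=False):
    """Hypothetical stand-in: call each function once, in the given order."""
    results = {}
    seen = set()
    for func in funcs:
        if func in seen:
            # repeated references to the same function are called only once
            continue
        seen.add(func)
        ret = func()
        if ret is not None or nones:
            results[func.__name__] = ret
    return results


order = []


def do_something_1():
    order.append(1)
    return 1


def do_something_2():
    order.append(2)
    return None


rets = call_in_order(do_something_1, do_something_2, do_something_1)
rets_with_nones = call_in_order(do_something_1, do_something_2, nones=True)
```

In the first call, do_something_2 runs but leaves no entry in the result because it returns None and nones is False.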

labbench.shell_options_from_keyed_values(device: ~labbench.Device, skip_none=True, hide_false: bool = False, join_str: typing_extensions.Literal[False] | str = False, remap: dict = {}, converter: callable = <class 'str'>) list[str]

generate a list of command line argument strings based on labbench.paramattr.value descriptors in device.

Each of these descriptors defined with key may be treated as a command line option. Value descriptors are ignored when key is unset. The value for each option is determined by fetching the corresponding attribute from device.

The returned list of strings can be used to build the argv needed to run shell commands using ShellBackend or the subprocess module.

Parameters:
  • device – the device containing values to broadcast into command-line arguments

  • skip_none – if True, no command-line argument string is generated for values that are unset in device

  • hide_false – if True, boolean options are treated as flags (i.e., argument strings are omitted for False values)

  • join_str – a string to use to join option (name, value) pairs, or False to generate as separate strings

  • remap – a dictionary of {python_value: string_value} pairs to accommodate special cases in string conversion

  • converter – function to use to convert the values to strings

Example

Simple boolean options and flags:

>>> import labbench as lb
>>> from labbench import paramattr as attr
>>> class ShellCopy(lb.ShellBackend):
...     recursive: bool = attr.value.bool(False, key='-R')
>>> cp = ShellCopy(recursive=True)
>>> print(lb.shell_options_from_keyed_values(cp, hide_false=True))
['-R']
>>> print(lb.shell_options_from_keyed_values(cp, remap={True: 'yes', False: 'no'}))
['-R', 'yes']

Example

A non-boolean option:

>>> class DiskDuplicate(lb.ShellBackend):
...     block_size: str = attr.value.str('1M', key='bs')
>>> dd = DiskDuplicate()
>>> dd.block_size = '1024k'
>>> print(lb.shell_options_from_keyed_values(dd))
['bs', '1024k']
>>> print(lb.shell_options_from_keyed_values(dd, join_str='='))
['bs=1024k']
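
The option-building rules can also be sketched without a Device, using a plain dictionary of option keys and values. This is an illustration of the documented parameters, not labbench's implementation; options_from_pairs is a hypothetical name.

```python
def options_from_pairs(pairs, skip_none=True, hide_false=False,
                       join_str=False, remap=None, converter=str):
    """Hypothetical stand-in operating on a plain {option_key: value} dict."""
    remap = remap or {}
    argv = []
    for key, value in pairs.items():
        if value is None and skip_none:
            continue
        if hide_false and isinstance(value, bool):
            # flag-style option: emit only the key, and only when True
            if value:
                argv.append(key)
            continue
        text = remap[value] if value in remap else converter(value)
        if join_str:
            argv.append(f"{key}{join_str}{text}")
        else:
            argv.extend([key, text])
    return argv


flags = options_from_pairs({"-R": True, "-v": False}, hide_false=True)
remapped = options_from_pairs({"-R": True}, remap={True: "yes", False: "no"})
separate = options_from_pairs({"bs": "1024k"})
joined = options_from_pairs({"bs": "1024k"}, join_str="=")
unset = options_from_pairs({"bs": None})
```

A list built this way can be appended to a command name to form the argv passed to the subprocess module.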
labbench.show_messages(minimum_level: typing_extensions.Literal[debug] | typing_extensions.Literal[warning] | typing_extensions.Literal[error] | typing_extensions.Literal[info] | typing_extensions.Literal[critical] | typing_extensions.Literal[False] | None, colors: bool = True)

filters logging messages displayed to the console by importance

Parameters:

minimum_level – one of 'debug', 'info', 'warning', 'error', or 'critical', or None (to disable all output)

Returns:

None

labbench.sleep(seconds: float, tick=1.0)

Drop-in replacement for time.sleep that raises ConcurrentException if another thread requests that all threads stop.
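
An interruptible sleep of this kind can be sketched with threading.Event. This is an illustration only, not labbench's implementation: stop_requested, StopRequested, and interruptible_sleep are hypothetical names, and treating tick as the interval between checks of the stop flag is an assumption.

```python
import threading
import time

stop_requested = threading.Event()  # hypothetical shared stop flag


class StopRequested(Exception):
    """Hypothetical stand-in for the exception raised when a stop is requested."""


def interruptible_sleep(seconds, tick=1.0):
    """Sleep in steps of at most tick seconds, checking the stop flag between steps."""
    deadline = time.perf_counter() + seconds
    while True:
        if stop_requested.is_set():
            raise StopRequested("another thread requested a stop")
        remaining = deadline - time.perf_counter()
        if remaining <= 0:
            return
        time.sleep(min(tick, remaining))


# request a stop from another thread shortly after the sleep begins
timer = threading.Timer(0.05, stop_requested.set)
timer.start()

start = time.perf_counter()
try:
    interruptible_sleep(10, tick=0.01)
    interrupted = False
except StopRequested:
    interrupted = True
elapsed = time.perf_counter() - start
timer.join()
```

A smaller tick shortens the time between a stop request and the exception, at the cost of more frequent wakeups.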

labbench.stopwatch(desc: str = '', threshold: float = 0, logger_level: typing_extensions.Literal[debug] | typing_extensions.Literal[warning] | typing_extensions.Literal[error] | typing_extensions.Literal[info] | typing_extensions.Literal[critical] = 'info')

Time a block of code using a with statement like this:

>>> with stopwatch('sleep statement'):
...     time.sleep(2)
sleep statement time elapsed 1.999s.

Parameters:
  • desc – text for display that describes the event being timed

  • threshold – only show timing if at least this much time (in s) elapsed

  • logger_level – the logging level at which the timing message is emitted

Returns:

context manager
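
A timing context manager with a reporting threshold can be sketched with contextlib. This is an illustration only, not labbench's implementation: simple_stopwatch and its report parameter are hypothetical, and messages are collected in a list here instead of being sent to a logger.

```python
import contextlib
import time


@contextlib.contextmanager
def simple_stopwatch(desc="", threshold=0.0, report=print):
    """Hypothetical stand-in for a timing context manager (not labbench.stopwatch)."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        # report only when the block took at least threshold seconds
        if elapsed >= threshold:
            report(f"{desc} time elapsed {elapsed:0.3f}s.")


messages = []

with simple_stopwatch("short sleep", report=messages.append):
    time.sleep(0.01)

# nothing is reported here, since the block finishes well under the threshold
with simple_stopwatch("fast block", threshold=60, report=messages.append):
    pass
```

The finally clause means the elapsed time is still reported when the timed block raises an exception.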

labbench.timeout_iter(duration)

sets a timer for duration seconds, yields time elapsed as long as timeout has not been reached
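
A generator with this behavior can be sketched as follows. This is an illustration only, not labbench's implementation; simple_timeout_iter is a hypothetical name.

```python
import time


def simple_timeout_iter(duration):
    """Hypothetical stand-in: yield elapsed seconds until duration has passed."""
    start = time.perf_counter()
    while True:
        elapsed = time.perf_counter() - start
        if elapsed >= duration:
            return
        yield elapsed


samples = []
for elapsed in simple_timeout_iter(0.05):
    # poll something here; the loop ends once the timeout is reached
    samples.append(elapsed)
    time.sleep(0.01)
```

This pattern suits polling loops, where the loop body checks a condition and breaks early on success.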

labbench.until_timeout(exception_or_exceptions: Union[BaseException, typing.Iterable[BaseException]], timeout: float, delay: float = 0, backoff: float = 0, exception_func: callable = <function <lambda>>) callable[_Tfunc, _Tfunc]

calls to the decorated function are repeated, suppressing specified exception(s), until the specified timeout period has expired.

  • If the timeout expires, the underlying exception is raised.

  • Otherwise, return the result of the function call.

Inspired by https://github.com/saltycrane/retry-decorator which is released under the BSD license.

Example

The following retries the telnet connection for 5 seconds on ConnectionRefusedError:

import telnetlib

from labbench import until_timeout


@until_timeout(ConnectionRefusedError, 5)
def open(host, port):
    t = telnetlib.Telnet()
    t.open(host, port, 5)
    return t

Parameters:
  • exception_or_exceptions – Exception (sub)class (or tuple of exception classes) to watch for

  • timeout – time in seconds to continue calling the decorated function while suppressing exception_or_exceptions

  • delay – initial delay between retries in seconds

  • backoff – factor by which the delay is multiplied after each retry

  • exception_func – function to call on exception before the next retry
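
The timeout-based variant can be sketched in the same spirit. This is a minimal illustration, not labbench's implementation: simple_until_timeout, connect, and never_connects are hypothetical names, and backoff and exception_func are omitted.

```python
import functools
import time


def simple_until_timeout(exceptions, timeout, delay=0.0):
    """Hypothetical stand-in illustrating the semantics (not labbench.until_timeout)."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kws):
            deadline = time.perf_counter() + timeout
            while True:
                try:
                    return func(*args, **kws)
                except exceptions:
                    if time.perf_counter() >= deadline:
                        # timeout expired: re-raise the underlying exception
                        raise
                    time.sleep(delay)

        return wrapper

    return decorator


calls = []


@simple_until_timeout(ConnectionRefusedError, timeout=5, delay=0.01)
def connect():
    # fails twice, then succeeds on the third call
    calls.append(time.perf_counter())
    if len(calls) < 3:
        raise ConnectionRefusedError("not ready")
    return "connected"


outcome = connect()


@simple_until_timeout(ConnectionRefusedError, timeout=0.05, delay=0.01)
def never_connects():
    raise ConnectionRefusedError("still refused")


try:
    never_connects()
    timed_out = False
except ConnectionRefusedError:
    timed_out = True
```

Unlike a fixed number of tries, the number of calls here depends on how long each attempt takes.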

labbench.visa_default_resource_manager(name: str | None = None)

set the pyvisa resource manager used by labbench.

Parameters:

name – the name of the resource manager, such as '@py', '@sim-labbench', or '@ivi'

labbench.visa_list_resources(resourcemanager: str | None = None) list[str]

autodetects and returns a list of valid VISADevice resource strings

labbench.visa_probe_devices(target: VISADevice | None = None, skip_interfaces: list[str] = [], open_timeout: float = 0.5, timeout: float = 0.25) list[VISADevice]

discover devices available for communication and their required connection settings.

Each returned VISADevice is set with a combination of resource, read_termination, and write_termination that establishes communication, and the make and model reported by the instrument. The pyvisa resource manager is set by the most recent call to visa_default_resource_manager.

The probe mechanism is to open a connection to each available resource, and if successful, query the instrument identity ('*IDN?'). Discovery will fail for devices that do not support this message.

Parameters:
  • target – if specified, return only devices that match target.make and target.model

  • skip_interfaces – do not probe interfaces that begin with these strings (case-insensitive)

  • open_timeout – timeout on resource open (in s)

  • timeout – timeout on identity query (in s)