labbench¶
- class labbench.CSVLogger(path: Path, *, append: bool = False, text_relational_min: int = 1024, force_relational: list[str] = ['host_log'], nonscalar_file_type: str = 'csv', tar: bool = False)¶
Bases:
ParamAttrLogger
Manage logging of experimental data and methods into CSV files.
Explicit save methods are exposed for arbitrary custom data. Automatic logging is performed for:
- access to parameters of labbench.Device objects that are defined with labbench.paramattr.value
- function calls to methods of labbench.Rack
- metadata
- Parameters:
path – Base path to use for the root database
append – Whether to append to the root database if it already exists (otherwise, overwrite it)
text_relational_min – Text with at least this many characters is stored as a relational text file instead of directly in the database
force_relational – A list of columns that should always be stored as relational data instead of directly in the database
nonscalar_file_type – The data type to use in non-scalar (tabular, vector, etc.) relational data
tar – Whether to store the relational data within directories in a tar file, instead of subdirectories
- INPUT_FILE_NAME = 'inputs.csv'¶
- OUTPUT_FILE_NAME = 'outputs.csv'¶
- ROOT_FILE_NAME = 'outputs.csv'¶
- nonscalar_file_type = 'csv'¶
- open()¶
Instead of calling open directly, consider using with statements to guarantee proper disconnection if there is an error. For example, the following sets up a connected instance:
with CSVLogger('my.csv') as db:
    # do the data acquisition here
    pass

This instantiates a CSVLogger, guarantees a final attempt to write any unwritten data, and closes the file when the with block exits, even if an exception is raised.
- output_index = 0¶
- tables = {}¶
- class labbench.Call(func: callable, *args, **kws)¶
Bases:
object
Wrap a function and its arguments for threaded calls via concurrently. A user can pass this directly to provide arguments; otherwise, calls are wrapped in it automatically inside concurrently to keep track of call metadata during execution.
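A hedged usage sketch (the function and arguments are hypothetical):

import labbench as lb

def measure(average_count, delay=0):
    # hypothetical measurement function
    lb.sleep(delay)
    return average_count

# wrap the function together with its arguments for execution by concurrently
results = lb.concurrently(lb.Call(measure, 10, delay=0.5))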
- rename(name)¶
- set_queue(queue)¶
Set the queue object used to communicate between threads
- class labbench.Device¶
Bases:
DeviceDataClass
base class for labbench device wrappers.
Drivers that subclass Device share:
standardized connection management via context blocks (the with statement)
hooks for automatic data logging and heads-up displays
API style consistency
bounds checking and casting for typed attributes
Note
This Device base class has convenience functions for device control, but no implementation.
Some labbench Device subclasses wrap particular backend APIs:
VISADevice: pyvisa,
ShellBackend: binaries and scripts
Serial: pyserial
DotNetDevice: pythonnet
(and others). If you are implementing a driver that uses one of these backends, inherit from the corresponding class above, not Device.
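For a driver that does not use one of these backends, a minimal hedged sketch of a Device subclass (the class name, attribute, and connection logic are illustrative):

import labbench as lb
from labbench import paramattr as attr

class HypotheticalSensor(lb.Device):
    # a typed, bounds-checked value attribute
    port: int = attr.value.int(8080, min=1)

    def open(self):
        # acquire the underlying connection to the resource here
        self.backend = None  # placeholder for a real connection object

    def close(self):
        # release the connection here
        pass

# standardized connection management via a with block
with HypotheticalSensor(port=8081) as sensor:
    print(sensor.isopen)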
- close()¶
Backend implementations must overload this to disconnect an existing connection to the resource encapsulated in the object. This will be called without super().close().
- property isopen: bool¶
True if the backend is ready for use
Cannot be set after device creation
Logging events are triggered on each access, and are stored as a key or column
- open()¶
Backend implementations may overload this to open a backend connection to the resource. This will be called without super().open().
- class labbench.DotNetDevice¶
Bases:
Device
Base class for .NET library wrappers based on pythonnet.
To implement a DotNetDevice subclass:
import labbench as lb

class MyLibraryWrapper(
    lb.DotNetDevice,
    library=<imported python module colocated with dll>,
    dll_name='mylibrary.dll'
):
    ...
When a DotNetDevice is instantiated, it attempts to load the dll named dll_name from the location of the python module given by library.
- backend – is None after open and is available for replacement by the subclass
- dll_name: str | None = None¶
Cannot be set after device creation
Logging events are triggered on each access, and are stored as a key or column
Case sensitive
- property isopen: bool¶
True if the backend is ready for use
Cannot be set after device creation
Logging events are triggered on each access, and are stored as a key or column
- library: object | None = None¶
Cannot be set after device creation
Logging events are triggered on each access, and are stored as a key or column
- open()¶
dynamically import a .net CLR as a python module at self.dll
- class labbench.Email(resource: str = 'smtp.nist.gov', *, port: int = 25, sender: str = 'myemail@nist.gov', recipients: list = ['myemail@nist.gov'], success_message: str | None = 'Test finished normally', failure_message: str | None = 'Exception ended test early')¶
Bases:
Device
Sends a notification message on disconnection. If an exception was raised, the subject line is the failure message and the body contains traceback information; otherwise, the subject line is the success message. Captured stderr is also included.
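A hedged usage sketch (the SMTP server, addresses, and test function are placeholders):

import labbench as lb

def run_test():
    pass  # hypothetical experiment

# a success or failure notification email is sent automatically when the with block exits
with lb.Email(resource='smtp.example.gov', recipients=['me@example.gov']):
    run_test()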
- close()¶
- failure_message: str | None = 'Exception ended test early'¶
subject line for test failure emails (None to suppress the emails)
Logging event stored in metadata log after first access
Case sensitive
- property isopen: bool¶
True if the backend is ready for use
Cannot be set after device creation
Logging events are triggered on each access, and are stored as a key or column
- open()¶
- port: int = 25¶
TCP/IP port
Logging event stored in metadata log after first access
Minimum: 1
- recipients: list = ['myemail@nist.gov']¶
list of email addresses of recipients
Logging event stored in metadata log after first access
- resource: str = 'smtp.nist.gov'¶
smtp server to use
Logging event stored in metadata log after first access
Case sensitive
- send_summary()¶
Send the email containing the final property trait of the test.
- sender: str = 'myemail@nist.gov'¶
email address of the sender
Logging event stored in metadata log after first access
Case sensitive
- success_message: str | None = 'Test finished normally'¶
subject line for test success emails (None to suppress the emails)
Logging event stored in metadata log after first access
Case sensitive
- class labbench.LabviewSocketInterface(resource: str = '127.0.0.1', *, tx_port: int = 61551, rx_port: int = 61552, delay: float = 1, timeout: float = 2, rx_buffer_size: int = 1024)¶
Bases:
Device
Base class demonstrating simple sockets-based control of a LabView VI.
Keyed get/set with attr.property is implemented by sending simple 'command value' strings. Subclasses can therefore add support for specific LabView VIs, similar to VISA commands, by assigning keys that match the commands implemented in the corresponding LabView VI.
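A hedged usage sketch (the host address and command string are hypothetical):

import labbench as lb

with lb.LabviewSocketInterface('10.0.0.10') as vi:
    vi.write('center_frequency 1e9')  # send a 'command value' string over the tx socket
    reply = vi.read()                 # receive from the rx socket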
- backend – connection object mapping {'rx': rxsock, 'tx': txsock}
- clear()¶
Clear any data present in the read socket buffer.
- close()¶
- delay: float = 1¶
time to wait after each property trait write or query
Minimum: 0
Logging events are triggered on each access, and are stored as a key or column
- property isopen: bool¶
True if the backend is ready for use
Cannot be set after device creation
Logging events are triggered on each access, and are stored as a key or column
- open()¶
- read(convert_func=None)¶
Receive from the rx socket until self.rx_buffer_size samples are received or timeout happens after self.timeout seconds.
Optionally, apply the conversion function to the value after it is received.
- resource: str = '127.0.0.1'¶
LabView VI host address
Logging events are triggered on each access, and are stored as a key or column
Case sensitive
- rx_buffer_size: int = 1024¶
Logging events are triggered on each access, and are stored as a key or column
Minimum: 1
- rx_port: int = 61552¶
RX port for receiving from the LabView VI
Logging events are triggered on each access, and are stored as a key or column
- timeout: float = 2¶
maximum time to wait for replies before raising TimeoutError (s)
Minimum: 0
Logging events are triggered on each access, and are stored as a key or column
- tx_port: int = 61551¶
TX port to send to the LabView VI
Logging events are triggered on each access, and are stored as a key or column
- write(msg)¶
Send a string over the tx socket.
- class labbench.Rack¶
Bases:
Owner, Ownable
A Rack provides context management and methods for groups of Device instances.
The Rack object provides connection management for all of its devices and data managers within a with block:

with Rack() as testbed:
    # use the testbed here
    pass

For functional validation, it is also possible to open only a subset of devices like this:

testbed = Rack()
with testbed.dev1, testbed.dev2:
    # use testbed.dev1 and testbed.dev2 here
    pass
The following syntax creates a new Rack class for an experiment:
import labbench as lb
class MyRack(lb.Rack):
    db = lb.SQLiteManager()
    sa = MySpectrumAnalyzer()

    spectrogram = Spectrogram(db=db, sa=sa)
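Because SQLiteManager, MySpectrumAnalyzer, and Spectrogram above are placeholders, the following is a more self-contained (still hypothetical) sketch of a Rack that owns a Device and exposes an experiment method:

import labbench as lb

class HypotheticalInstrument(lb.Device):
    # placeholder device used only for illustration
    def open(self):
        pass

    def close(self):
        pass

class Measurement(lb.Rack):
    inst = HypotheticalInstrument()

    def single(self, duration: float = 1.0):
        # an experiment step; Rack methods like this can be sequenced
        lb.sleep(duration)
        return {'duration': duration}

with Measurement() as meas:
    meas.single(duration=0.5)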
- class labbench.SQLiteLogger(path: Path, *, append: bool = False, text_relational_min: int = 1024, force_relational: list[str] = ['host_log'], nonscalar_file_type: str = 'csv', tar: bool = False)¶
Bases:
ParamAttrLogger
Store data and property traits to disk in an SQLite database.
This extends Aggregator to support:
- queuing aggregate property traits of devices by lists of dictionaries;
- custom metadata in each queued aggregate property trait entry; and
- custom responses to non-scalar data (such as relational databasing).
- Parameters:
path – Base path to use for the root database
append – Whether to append to the root database if it already exists (otherwise, overwrite it)
text_relational_min – Text with at least this many characters is stored as a relational text file instead of directly in the database
force_relational – A list of columns that should always be stored as relational data instead of directly in the database
nonscalar_file_type – The data type to use in non-scalar (tabular, vector, etc.) relational data
tar – Whether to store the relational data within directories in a tar file, instead of subdirectories
- INDEX_LABEL¶
- OUTPUT_TABLE_NAME¶
- ROOT_FILE_NAME¶
- close()¶
- committed¶
- inprogress¶
- key(name, attr)¶
The key determines the SQL column name. df.to_sql does not seem to support column names that include spaces
- open()¶
Instead of calling open directly, consider using with statements to guarantee proper disconnection if there is an error. For example, the following sets up a connected instance:
with SQLiteLogger('my.db') as db:
    # do the data acquisition here
    pass

This instantiates a SQLiteLogger, guarantees a final attempt to write any unwritten data, and closes the file when the with block exits, even if an exception is raised.
- output_index¶
- class labbench.Sequence(*specification, shared_names=[], input_table=None)¶
Bases:
Ownable
An experimental procedure defined from methods in Rack instances. The input is a specification for sequencing these steps, including support for threading.
Sequences are meant to be defined as attributes of Rack subclasses, referring to methods of the Rack instances owned by that subclass.
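A hedged sketch of this pattern (the child racks and step methods are hypothetical, and grouping concurrent steps in a tuple is an assumption based on the threading support noted above):

import labbench as lb

class Source(lb.Rack):
    def setup(self):
        pass

    def start(self):
        pass

class Analyzer(lb.Rack):
    def acquire(self):
        pass

    def save(self):
        pass

class Testbed(lb.Rack):
    source = Source()
    analyzer = Analyzer()

    # steps run in order; the tuple is assumed to group steps for threaded execution
    run = lb.Sequence(
        source.setup,
        (source.start, analyzer.acquire),
        analyzer.save,
    )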
- access_spec¶
- cleanup_func¶
- exception_allowlist¶
alias of
NeverRaisedException
- return_on_exceptions(exception_or_exceptions, cleanup_func=None)¶
Configures calls to the bound Sequence to swallow the specified exceptions raised by constituent steps. If an exception is swallowed, subsequent steps in the Sequence are not executed. The dictionary of return values from each step is returned with an additional 'exception' key indicating the type of the exception that occurred.
- class labbench.SerialDevice(resource: str, *, timeout: float | None = None, write_timeout: float | None = None, baud_rate: int = 9600, parity: str = 'N', stopbits: float | None = None, xonxoff: bool = False, rtscts: bool | None = False, dsrdtr: bool | None = False, bytesize: int | None = 8)¶
Bases:
Device
Base class for wrappers that communicate via pyserial.
This implementation is very sparse because there is in general no messaging string format for serial devices.
- - backend
control object, after open
- Type:
serial.Serial
- baud_rate: int = 9600¶
data rate of the physical serial connection. (bytes/s)
Logging events are triggered on each access, and are stored as a key or column
Minimum: 1 bytes/s
- bytesize: int | None = 8¶
(bits)
Allowed values are {None, 5, 6, 8, 7}
Logging events are triggered on each access, and are stored as a key or column
- close()¶
Disconnect the serial instrument
- dsrdtr: bool | None = False¶
whether to enable hardware (DSR/DTR) flow control on open
Logging events are triggered on each access, and are stored as a key or column
- classmethod from_hwid(hwid=None, *args, **connection_params) SerialDevice ¶
Instantiate a new SerialDevice from a windows 'hwid' string instead of a comport resource. A hwid string in windows might look something like:
r'PCI\VEN_8086&DEV_9D3D&SUBSYS_06DC1028&REV_21\3&11583659&1&B3'
- classmethod from_url(url, **kws) SerialDevice ¶
- property isopen: bool¶
True if the backend is ready for use
Cannot be set after device creation
Logging events are triggered on each access, and are stored as a key or column
- static list_ports(hwid=None)¶
List USB serial devices on the computer
- Returns:
list of port resource information
- open()¶
Connect to the serial device using the resource string defined in self.resource
- parity: str = 'N'¶
parity in the physical serial connection.
Allowed values are {‘E’, ‘M’, ‘O’, ‘S’, ‘N’}
Logging events are triggered on each access, and are stored as a key or column
Case sensitive
- resource: str¶
platform-dependent serial port address or URL
Logging event stored in metadata log after first access
Case sensitive
- rtscts: bool | None = False¶
whether to enable hardware (RTS/CTS) flow control on open
Logging events are triggered on each access, and are stored as a key or column
- stopbits: float | None = None¶
(bits)
Allowed values are {1, 2, None, 1.5}
Logging events are triggered on each access, and are stored as a key or column
- timeout: float | None = None¶
max wait time on reads before raising TimeoutError (s)
Minimum: 0 s
Logging events are triggered on each access, and are stored as a key or column
- write_timeout: float | None = None¶
max wait time on writes before raising TimeoutError. (s)
Minimum: 0 s
Logging events are triggered on each access, and are stored as a key or column
- xonxoff: bool = False¶
whether to enable software flow control on open
Logging events are triggered on each access, and are stored as a key or column
- class labbench.SerialLoggingDevice(resource: str, *, timeout: float | None = None, write_timeout: float | None = None, baud_rate: int = 9600, parity: str = 'N', stopbits: float | None = None, xonxoff: bool = False, rtscts: bool | None = False, dsrdtr: bool | None = False, bytesize: int | None = 8, poll_rate: float = 0.1, stop_timeout: float = 0.5, max_queue_size: int = 100000)¶
Bases:
SerialDevice
Manage connection, acquisition, and data retrieval on a device that streams logs over serial in a background thread, blocking only during setup and stop command execution.
Listener objects must implement an attach method with one argument consisting of the queue that the device manager uses to push data from the serial port.
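A hedged usage sketch (the port name is illustrative):

import labbench as lb

with lb.SerialLoggingDevice('COM4') as logdev:
    logdev.start()         # begin background acquisition into the queue
    lb.sleep(5)            # let log data accumulate for 5 s
    data = logdev.fetch()  # retrieve and clear any buffered log bytes
    logdev.stop()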
- baud_rate: int = 9600¶
data rate of the physical serial connection. (bytes/s)
Logging events are triggered on each access, and are stored as a key or column
Minimum: 1 bytes/s
- bytesize: int | None = 8¶
(bits)
Allowed values are {None, 5, 6, 8, 7}
Logging events are triggered on each access, and are stored as a key or column
- clear()¶
Throw away any log data in the buffer.
- close()¶
- dsrdtr: bool | None = False¶
whether to enable hardware (DSR/DTR) flow control on open
Logging events are triggered on each access, and are stored as a key or column
- fetch()¶
Retrieve and return any log data in the buffer.
- Returns:
any bytes in the buffer
- property isopen: bool¶
True if the backend is ready for use
Cannot be set after device creation
Logging events are triggered on each access, and are stored as a key or column
- max_queue_size: int = 100000¶
bytes to allocate in the data retrieval buffer
Logging events are triggered on each access, and are stored as a key or column
Minimum: 1
- parity: str = 'N'¶
parity in the physical serial connection.
Allowed values are {‘E’, ‘M’, ‘O’, ‘S’, ‘N’}
Logging events are triggered on each access, and are stored as a key or column
Case sensitive
- poll_rate: float = 0.1¶
interval between data retrievals from the device (s)
Minimum: 0
Logging events are triggered on each access, and are stored as a key or column
- resource: str¶
platform-dependent serial port address or URL
Logging event stored in metadata log after first access
Case sensitive
- rtscts: bool | None = False¶
whether to enable hardware (RTS/CTS) flow control on open
Logging events are triggered on each access, and are stored as a key or column
- running()¶
Check whether the logger is running.
- Returns:
True if the logger is running
- start()¶
Start a background thread that acquires log data into a queue.
- Returns:
None
- stop()¶
Stops the logger acquisition if it is running. Returns silently otherwise.
- Returns:
None
- stop_timeout: float = 0.5¶
delay after stop before terminating run thread
Minimum: 0
Logging events are triggered on each access, and are stored as a key or column
- stopbits: float | None = None¶
(bits)
Allowed values are {1, 2, None, 1.5}
Logging events are triggered on each access, and are stored as a key or column
- timeout: float | None = None¶
max wait time on reads before raising TimeoutError (s)
Minimum: 0 s
Logging events are triggered on each access, and are stored as a key or column
- write_timeout: float | None = None¶
max wait time on writes before raising TimeoutError. (s)
Minimum: 0 s
Logging events are triggered on each access, and are stored as a key or column
- xonxoff: bool = False¶
whether to enable software flow control on open
Logging events are triggered on each access, and are stored as a key or column
- class labbench.ShellBackend(*, background_timeout: float = 1)¶
Bases:
Device
A wrapper for running shell commands.
This is a thin wrapper around the subprocess module. Data can be captured from the standard output and standard error pipes, and the command can optionally be run in a background thread.
After opening, backend is None. On a call to run(background=True), backend becomes a subprocess instance. When EOF is reached on the executable's stdout, backend resets to None.
When run is called, the program runs in a subprocess. Output piped to standard output is queued in a background thread. Call read_stdout() to retrieve (and clear) this queued stdout.
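A hedged usage sketch (the command and its arguments are illustrative and platform-dependent):

import labbench as lb

with lb.ShellBackend() as sh:
    # blocking run
    sh.run('ping', '-c', '1', '127.0.0.1')

    # background run: stdout is queued by a background thread
    sh.run('ping', '-c', '4', '127.0.0.1', background=True)
    lb.sleep(1)
    print(sh.read_stdout())  # pop (and clear) the queued stdout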
- background_timeout: float = 1¶
wait time after close before killing background processes (s)
Minimum: 0 s
Logging event stored in metadata log after first access
- clear_stdout()¶
Clear queued standard output. Subsequent calls to self.read_stdout() will return ‘’.
- close()¶
- property isopen: bool¶
True if the backend is ready for use
Cannot be set after device creation
Logging events are triggered on each access, and are stored as a key or column
- kill()¶
If a process is running in the background, kill it. Sends a console warning if no process is running.
- open()¶
The open() method implements opening in the Device object protocol. Call the execute() method when open to execute the binary.
- read_stdout(wait_for=0)¶
Pop any standard output that has been queued by a background run (see run). Afterward, the queue is cleared. Starting another background run also clears the queue.
- Returns:
stdout
- run(*argv, pipe=True, background=False, check_return=True, raise_on_stderr=False, respawn=False, timeout=None)¶
- running()¶
Check whether a background process is running.
- Returns:
True if running, otherwise False
- write_stdin(text)¶
Write characters to stdin if a background process is running. Raises Exception if no background process is running.
- class labbench.TelnetDevice(resource: str, *, timeout: float = 2)¶
Bases:
Device
A general base class for devices that communicate via telnet. Unlike (for example) VISA instruments, there is no standardized command format like SCPI. The implementation is therefore limited to open and close, which open or close a telnet connection object: the backend attribute. Subclasses can read or write with the backend attribute like they would any other telnetlib instance.
A TelnetDevice resource string is an IP address. The port is specified by port. These can be set when you instantiate the TelnetDevice or by setting them afterward in value traits.
Subclassed devices that need property trait descriptors will need to implement get_key and set_key methods to implement the property trait set and get operations (as appropriate).
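A hedged sketch of such a subclass (the host, message framing, and command are hypothetical):

import labbench as lb

class HypotheticalTelnetInstrument(lb.TelnetDevice):
    def send_command(self, msg: str) -> bytes:
        # self.backend behaves like a telnetlib.Telnet instance after open()
        self.backend.write(msg.encode() + b'\r\n')
        return self.backend.read_until(b'\n', timeout=self.timeout)

with HypotheticalTelnetInstrument('192.168.0.5') as dev:
    print(dev.send_command('*IDN?'))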
- close()¶
Disconnect the telnet connection
- property isopen: bool¶
True if the backend is ready for use
Cannot be set after device creation
Logging events are triggered on each access, and are stored as a key or column
- open()¶
Open a telnet connection to the host defined by the string in self.resource
- resource: str¶
server host address
Logging event stored in metadata log after first access
Case sensitive
- timeout: float = 2¶
connection timeout (s)
Minimum: 0 s
Logging events are triggered on each access, and are stored as a key or column
- labbench.Undefined¶
alias of
_empty
- class labbench.VISADevice(resource: str | None = None, *, read_termination: str = '\n', write_termination: str = '\n', open_timeout: float | None = None, timeout: float | None = None)¶
Bases:
Device
A wrapper for VISA device automation.
This exposes pyvisa instrument automation capabilities in a labbench object. Automatic connection based on make and model is supported.
Customized operation for specific instruments should be implemented in subclasses.
Examples
Connect to a VISA device using a known resource string:
with VISADevice('USB0::0x2A8D::0x1E01::SG56360004::INSTR') as instr:
    print(instr)
Probe available connections and print valid VISADevice constructors:
print(visa_probe_devices())
Probe details of available connections and identity strings on the command line:
labbench visa-probe
Connect to instrument with serial number ‘SG56360004’ and query ‘:FETCH?’ CSV:
with VISADevice('SG56360004') as instr:
    print(instr.query_ascii_values(':FETCH?'))
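Customized subclasses typically attach SCPI-keyed properties. The following is a hedged sketch (the instrument class, SCPI key, and resource string are illustrative; the paramattr.property usage is an assumption):

import labbench as lb
from labbench import paramattr as attr

class HypotheticalSpectrumAnalyzer(lb.VISADevice):
    # keyed property: setting it sends ':SENS:FREQ:CENT <value>', getting it queries ':SENS:FREQ:CENT?'
    center_frequency: float = attr.property.float(key=':SENS:FREQ:CENT', min=0)

with HypotheticalSpectrumAnalyzer('USB0::0x2A8D::0x1E01::SG56360004::INSTR') as sa:
    sa.center_frequency = 1e9
    print(sa.query('*IDN?'))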
See also
- Pure python backend installation:
https://pyvisa.readthedocs.io/projects/pyvisa-py/en/latest/installation.html
- Proprietary backend installation:
https://pyvisa.readthedocs.io/en/latest/faq/getting_nivisa.html#faq-getting-nivisa
- Resource strings and basic configuration:
- backend¶
instance of a pyvisa instrument object (when open)
- Type:
pyvisa.Resource
- close()¶
closes the instrument.
When managing device connection through a with context, this is called automatically and does not need to be invoked.
- property isopen: bool¶
True if the backend is ready for use
Cannot be set after device creation
Logging events are triggered on each access, and are stored as a key or column
- make: str | None = None¶
device manufacturer name used to autodetect resource string
Logging event stored in metadata log after first access
Case sensitive
- model: str | None = None¶
device model used to autodetect resource string
Logging event stored in metadata log after first access
Case sensitive
- open()¶
opens the instrument.
When managing device connection through a with context, this is called automatically and does not need to be invoked.
- open_timeout: float | None = None¶
timeout for opening a connection to the instrument (s)
Logging events are triggered on each access, and are stored as a key or column
- overlap_and_block(timeout: float = None, quiet: bool = False, query_func: callable = None)¶
context manager that sends ‘*OPC’ on entry, and performs a blocking ‘*OPC?’ query on exit.
By convention, these SCPI commands give a hint to the instrument that commands sent inside this block may be executed concurrently. The context exit then blocks until all of the commands have completed.
Example:
with inst.overlap_and_block():
    inst.write('long running command 1')
    inst.write('long running command 2')
- query(msg: str, timeout=None, remap: bool = False, kws: dict[str, Any] = {}) str ¶
queries the device with an SCPI message and returns its reply.
Handles debug logging and adjustments when in overlap_and_block contexts as appropriate.
- Parameters:
msg – the SCPI message to send
- query_ascii_values(msg: str, converter='f', separator=', ', container=<class 'list'>, delay=None, timeout=None)¶
- read_termination: str = '\n'¶
end-of-line string to delineate the end of ascii query replies
Logging event stored in metadata log after first access
Case sensitive
- resource: str | None = None¶
VISA resource addressing string for device connection
Logging event stored in metadata log after first access
Case sensitive
- property serial: str¶
device-reported serial number
Cannot be set after device creation
Logging event stored in metadata log after first access
Case sensitive
- property status_byte: dict¶
instrument status decoded from ‘*STB?’
Cannot be set after device creation
Logging events are triggered on each access, and are stored as a key or column
- class suppress_timeout(*exceptions)¶
Bases:
suppress
context manager that suppresses timeout exceptions on write or query.
Example:
with inst.suppress_timeout():
    inst.write('long command 1')
    inst.write('long command 2')

If command 1 raises a timeout exception, command 2 will not execute, the context block completes, and the exception from command 1 is swallowed.
- timeout: float | None = None¶
message response timeout (s)
Logging event stored in metadata log after first access
- write(msg: str, kws: dict[str, Any] = {})¶
sends an SCPI message to the device.
Wraps self.backend.write, and handles debug logging and adjustments when in overlap_and_block contexts as appropriate.
- Parameters:
msg – the SCPI command to send
- Returns:
None
- write_termination: str = '\n'¶
end-of-line string to send after writes
Logging event stored in metadata log after first access
Case sensitive
- class labbench.Win32ComDevice¶
Bases:
Device
Basic support for calling win32 COM APIs.
A dedicated background thread manages access to the COM object. Set concurrency to decide whether this thread-support wrapper is applied to the dispatched Win32Com object.
- com_object: str = ''¶
the win32com object string
Cannot be set after device creation
Logging events are triggered on each access, and are stored as a key or column
Case sensitive
- concurrency: bool = True¶
if False, enforces locking for single-threaded access
Cannot be set after device creation
Logging events are triggered on each access, and are stored as a key or column
- property isopen: bool¶
True if the backend is ready for use
Cannot be set after device creation
Logging events are triggered on each access, and are stored as a key or column
- open()¶
Connect to the win32 com object
- labbench.concurrently(*objs, **kws)¶
- If *objs are callable (like functions), call each of *objs in concurrent threads. If *objs are context managers (such as Device instances to be connected), enter each context in concurrent threads.
Multiple references to the same function in objs result in only one call. The catch and nones arguments may themselves be callables, in which case they are executed as well (and the corresponding flag values take their defaults).
- Parameters:
objs – each argument may be a callable (function or class that defines a __call__ method), or context manager (such as a Device instance)
catch – if False (the default), a ConcurrentException is raised if any of objs raise an exception; otherwise, any remaining successful calls are returned as normal
nones – if not callable and evaluates as True, includes entries for calls that return None (default is False)
flatten – if True, results of callables that returns a dictionary are merged into the return dictionary with update (instead of passed through as dictionaries)
traceback_delay – if False, immediately show traceback information on a thread exception; if True (the default), wait until all threads finish
operation – if ‘auto’ (default), try to determine automatically; otherwise, ‘context’ or ‘call’
- Returns:
the values returned by each call
- Return type:
dictionary keyed by function name
Here are some examples:
- Example:
Call the functions do_something_1 and do_something_2, each with no arguments:

>>> def do_something_1():
...     time.sleep(0.5)
...     return 1
>>> def do_something_2():
...     time.sleep(1)
...     return 2
>>> rets = concurrently(do_something_1, do_something_2)
>>> rets['do_something_1']
1
- Example:
To pass arguments, use the Call wrapper
>>> def do_something_3(a, b, c):
...     time.sleep(2)
...     return a, b, c
>>> rets = concurrently(do_something_1, Call(do_something_3, a, b, c=c))
>>> rets['do_something_3']
(a, b, c)
Caveats
Because the calls are in different threads, not different processes, this should be used for IO-bound functions (not CPU-intensive functions).
Be careful about thread-safety.
- labbench.dump_rack(rack: Rack, output_path: Path, sourcepath: Path, pythonpath: Path | None = None, exist_ok: bool = False, with_defaults: bool = False, skip_tables: bool = False)¶
- labbench.find_owned_rack_by_type(parent_rack: Rack, target_type: Rack, include_parent: bool = True)¶
return a rack instance of target_type owned by parent_rack. If there is not exactly one match of target_type, TypeError is raised.
- labbench.import_as_rack(import_string: str, *, cls_name: str | None = None, append_path: list = [], base_cls: type = <class 'labbench.Rack'>, replace_attrs: list = ['__doc__', '__module__'])¶
Creates a Rack subclass with the specified module’s contents. Ownable objects are annotated by type, allowing the resulting class to be instantiated.
- Parameters:
import_string – for the module that contains the Rack to import
cls_name – the name of the Rack subclass to import from the module (or None to build a new subclass with the module contents)
base_cls – the base class to use for the new subclass
append_path – list of paths to append to sys.path before import
replace_attrs – attributes of base_cls to replace from the module
- Exceptions:
NameError: if there is an attribute name conflict between the module and base_cls
- Returns:
A dynamically created subclass of base_cls
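A hedged usage sketch (the module path is hypothetical):

import labbench as lb

# build a Rack subclass from the contents of a module on the python path
TestbedRack = lb.import_as_rack('my_experiment.testbed')
testbed = TestbedRack()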
- labbench.load_rack(output_path: str, defaults: dict = {}, apply: bool = True) Rack ¶
instantiates a Rack object from a config directory created by dump_rack.
After instantiation, the current working directory is changed to output_path.
- class labbench.rack_input_table(table_path: str)¶
Bases:
MethodTaggerDataclass
tag a method defined in a Rack to support execution from a flat table.
In practice, this often means a very long argument list.
- Parameters:
table_path – location of the input table
- table_path: str¶
- class labbench.rack_kwargs_skip(*arg_names)¶
Bases:
MethodTaggerDataclass
tag a method defined in a Rack to omit the named arguments when its **kwargs argument is expanded (see rack_kwargs_template).
In practice, this is useful when the expanded argument list would otherwise be very long.
- Parameters:
callable_template – replace variable keyword arguments (**kwargs) with the keyword arguments defined in this callable
skip – list of column names to omit
- skip: list¶
- class labbench.rack_kwargs_template(template: callable | None = None)¶
Bases:
MethodTaggerDataclass
tag a method defined in a Rack to replace a **kwargs argument using the signature of the specified callable.
In practice, this often means a very long argument list.
- Parameters:
callable_template – replace variable keyword arguments (**kwargs) with the keyword arguments defined in this callable
skip – list of column names to omit
- template: callable¶
- labbench.read(path_or_buf: str, columns: list[str] | None = None, nrows: int | None = None, format: str = 'auto', **kws)¶
Read tabular data from a file in one of various formats using pandas.
- Parameters:
path_or_buf – path to the data file.
columns – a column or iterable of multiple columns to return from the data file, or None (the default) to return all columns
nrows – number of rows to read at the beginning of the table, or None (the default) to read all rows
format – data file format, one of ['pickle', 'feather', 'csv', 'json'], or 'auto' (the default) to guess from the file extension
kws – additional keyword arguments to pass to the pandas read_<ext> function matching the file extension
- Returns:
pandas.DataFrame instance containing data read from file
- labbench.read_relational(path: str | Path, expand_col: str, root_cols: list[str] | None = None, target_cols: list[str] | None = None, root_nrows: int | None = None, root_format: str = 'auto', prepend_column_name: bool = True) DataFrame ¶
Flatten a relational database table by loading the table located at each row of root[expand_col]. The value of each column in that row is copied into the loaded table. The columns in the resulting table generated for each row are downselected according to root_cols and target_cols. Each of the resulting tables is concatenated and returned.
The expanded dataframe may be very large, making downselecting a practical necessity in some scenarios.
- Parameters:
path – file location of the root data table
expand_col – name of the column in the root data file to expand with relational data
root_cols – the root columns to include in the expanded dataframe, or None (the default) to pass all columns from root
target_cols – the columns from each loaded table to include in the expanded dataframe, or None (the default) to pass all columns loaded from each root[expand_col]
prepend_column_name – whether to prepend the name of the expanded column from the root table
- Returns:
the expanded dataframe
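A hedged usage sketch (the file path and column names are illustrative):

import labbench as lb

# expand the relational 'spectrogram' column of a root table produced by CSVLogger
df = lb.read_relational(
    'data/outputs.csv',
    expand_col='spectrogram',
    root_cols=['start_time', 'sweep_index'],
)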
- labbench.retry(exception_or_exceptions: Union[BaseException, typing.Iterable[BaseException]], tries: int = 4, *, delay: float = 0, backoff: float = 0, exception_func=<function <lambda>>, log: bool = True) callable[_Tfunc, _Tfunc] ¶
calls to the decorated function are repeated, suppressing specified exception(s), until a maximum number of retries has been attempted.
If the function raises the exception the specified number of times, the underlying exception is raised. Otherwise, return the result of the function call.
Example
The following retries the telnet connection 5 times on ConnectionRefusedError:
import telnetlib

# Retry a telnet connection 5 times if the telnet library raises ConnectionRefusedError
@retry(ConnectionRefusedError, tries=5)
def open(host, port):
    t = telnetlib.Telnet()
    t.open(host, port, 5)
    return t
Inspired by https://github.com/saltycrane/retry-decorator which is released under the BSD license.
- Parameters:
exception_or_exceptions – Exception (sub)class (or tuple of exception classes) to watch for
tries – number of times to try before giving up
delay – initial delay between retries in seconds
backoff – backoff to multiply to the delay for each retry
exception_func – function to call on exception before the next retry
log – whether to emit a log message on the first retry
- labbench.sequentially(*objs, **kws)¶
- If *objs are callable (like functions), call each of *objs in the given order. If *objs are context managers (such as Device instances to be connected), enter each context in the given order and return a context manager suited for a with statement. This is the sequential implementation of the concurrently function, with a compatible convention of returning dictionaries.
Multiple references to the same function in objs result in only one call. The nones argument may itself be a callable, in which case it is executed as well (and the corresponding flag value takes its default).
- Parameters:
objs – callables or context managers or Device instances for connections
kws – dictionary of additional callables or Device instances for connections
nones – True to include dictionary entries for calls that return None (default: False)
flatten – True to flatten any dict return values into the return dictionary
- Returns:
a dictionary keyed on the object name containing the return value of each function
- Return type:
dictionary keyed by function name
Here are some examples:
- Example:
Call the functions do_something_1 and do_something_2, each with no arguments:

>>> def do_something_1():
...     time.sleep(0.5)
...     return 1
>>> def do_something_2():
...     time.sleep(1)
...     return 2
>>> rets = sequentially(do_something_1, do_something_2)
>>> rets['do_something_1']
1
- Example:
To pass arguments, use the Call wrapper
>>> def do_something_3(a, b, c):
...     time.sleep(2)
...     return a, b, c
>>> rets = sequentially(do_something_1, Call(do_something_3, a, b, c=c))
>>> rets['do_something_3']
(a, b, c)
Caveats
Unlike concurrently, an exception in a context manager’s __enter__ means that any remaining context managers will not be entered.
- labbench.shell_options_from_keyed_values(device: ~labbench.Device, skip_none=True, hide_false: bool = False, join_str: typing_extensions.Literal[False] | str = False, remap: dict = {}, converter: callable = <class 'str'>) list[str] ¶
generate a list of command line argument strings based on labbench.paramattr.value descriptors in device.
Each of these descriptors defined with key may be treated as a command line option. Value descriptors are ignored when key is unset. The value for each option is determined by fetching the corresponding attribute from device.
The returned list of strings can be used to build the argv needed to run shell commands using ShellBackend or the subprocess module.
- Parameters:
device – the device containing values to broadcast into command-line arguments
skip_none – if True, no command-line argument string is generated for values that are unset in device
hide_false – if True, boolean options are treated as flags (argument strings are omitted for False values)
join_str – a string to use to join option (name, value) pairs, or False to generate as separate strings
remap – a dictionary of {python_value: string_value} pairs to accommodate special cases in string conversion
converter – function to use to convert the values to strings
Example
Simple boolean options and flags:
>>> import labbench as lb
>>> from labbench import paramattr as attr
>>> class ShellCopy(lb.ShellBackend):
...     recursive: bool = attr.value.bool(False, key='-R')
>>> cp = ShellCopy(recursive=True)
>>> print(shell_options_from_keyed_values(cp, hide_false=True))
['-R']
>>> print(shell_options_from_keyed_values(cp, remap={True: 'yes', False: 'no'}))
['-R', 'yes']
Example
A non-boolean option:
>>> class DiskDuplicate(lb.ShellBackend):
...     block_size: str = attr.value.str('1M', key='bs')
>>> dd = DiskDuplicate()
>>> dd.block_size = '1024k'
>>> print(shell_options_from_keyed_values(dd))
['bs', '1024k']
>>> print(shell_options_from_keyed_values(dd, join_str='='))
['bs=1024k']
- labbench.show_messages(minimum_level: typing_extensions.Literal[debug] | typing_extensions.Literal[warning] | typing_extensions.Literal[error] | typing_extensions.Literal[info] | typing_extensions.Literal[critical] | typing_extensions.Literal[False] | None, colors: bool = True)¶
filters logging messages displayed to the console by importance
- Parameters:
minimum_level – ‘debug’, ‘info’, ‘warning’, ‘error’, ‘critical’, or None (to disable all output)
- Returns:
None
- labbench.sleep(seconds: float, tick=1.0)¶
Drop-in replacement for time.sleep that raises ConcurrentException if another thread requests that all threads stop.
- labbench.stopwatch(desc: str = '', threshold: float = 0, logger_level: typing_extensions.Literal[debug] | typing_extensions.Literal[warning] | typing_extensions.Literal[error] | typing_extensions.Literal[info] | typing_extensions.Literal[critical] = 'info')¶
Time a block of code using a with statement like this:
>>> with stopwatch('sleep statement'):
...     time.sleep(2)
sleep statement time elapsed 1.999s.
- Parameters:
desc – text for display that describes the event being timed
threshold – only show timing if at least this much time (in s) elapsed
- Returns:
context manager
- labbench.timeout_iter(duration)¶
sets a timer for duration seconds, yields time elapsed as long as timeout has not been reached
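A hedged usage sketch (the readiness check is hypothetical):

import labbench as lb

def measurement_is_ready() -> bool:
    return False  # hypothetical readiness check

# poll for up to 10 s, giving up when the timeout elapses
for elapsed in lb.timeout_iter(10):
    if measurement_is_ready():
        break
    lb.sleep(0.1)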
- labbench.until_timeout(exception_or_exceptions: Union[BaseException, typing.Iterable[BaseException]], timeout: float, delay: float = 0, backoff: float = 0, exception_func: callable = <function <lambda>>) callable[_Tfunc, _Tfunc] ¶
calls to the decorated function are repeated, suppressing specified exception(s), until the specified timeout period has expired.
If the timeout expires, the underlying exception is raised.
Otherwise, return the result of the function call.
Inspired by https://github.com/saltycrane/retry-decorator which is released under the BSD license.
Example
The following retries the telnet connection for 5 seconds on ConnectionRefusedError:
import telnetlib

@until_timeout(ConnectionRefusedError, 5)
def open(host, port):
    t = telnetlib.Telnet()
    t.open(host, port, 5)
    return t
- Parameters:
exception_or_exceptions – Exception (sub)class (or tuple of exception classes) to watch for
timeout – time in seconds to continue calling the decorated function while suppressing exception_or_exceptions
delay – initial delay between retries in seconds
backoff – backoff to multiply to the delay for each retry
exception_func – function to call on exception before the next retry
- labbench.visa_default_resource_manager(name: str | None = None)¶
set the pyvisa resource manager used by labbench.
- Parameters:
name – the name of the resource manager, such as ‘@py’, ‘@sim-labbench’, or ‘@ivi’
- labbench.visa_list_resources(resourcemanager: str | None = None) list[str] ¶
autodetects and returns a list of valid VISADevice resource strings
- labbench.visa_probe_devices(target: VISADevice | None = None, skip_interfaces: list[str] = [], open_timeout: float = 0.5, timeout: float = 0.25) list[VISADevice] ¶
discover devices available for communication and their required connection settings.
Each returned VISADevice is set with a combination of resource, read_termination, and write_termination that establishes communication, together with the make and model reported by the instrument. The pyvisa resource manager is set by the most recent call to visa_default_resource_manager.
The probe mechanism is to open a connection to each available resource and, if successful, query the instrument identity ('*IDN?'). Discovery will fail for devices that do not support this message.
- Parameters:
target – if specified, return only devices that match target.make and target.model
skip_interfaces – do not probe interfaces that begin with these strings (case-insensitive)
open_timeout – timeout on resource open (in s)
timeout – timeout on identity query (in s)
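A hedged sketch combining the VISA helper functions above (the '@py' backend requires the pyvisa-py package):

import labbench as lb

lb.visa_default_resource_manager('@py')  # use the pure-python pyvisa backend
print(lb.visa_list_resources())          # resource strings seen by the resource manager

for dev in lb.visa_probe_devices():
    print(dev.make, dev.model, dev.resource)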