Source code for AFL.automation.instrument.OpticalTurbidity

import copy
import datetime
import pathlib
import time
import warnings

import lazy_loader as lazy
import matplotlib.pyplot as plt
import numpy as np
import xarray as xr
from skimage.color import rgb2gray
from skimage.feature import canny
from skimage.transform import hough_circle, hough_circle_peaks
from skimage.util import img_as_ubyte

from AFL.automation.APIServer.Driver import Driver

try:
    from tiled.queries import Eq
except ImportError:
    Eq = None
    warnings.warn("Cannot import from tiled...empty UUID lookup will not work", stacklevel=2)


[docs] class OpticalTurbidity(Driver): defaults = {} defaults["hough_radii"] = 98 defaults["row_crop"] = [0, 479] defaults["col_crop"] = [0, 479] defaults["save_path"] = "/home/afl642/2305_SINQ_TurbidityImages/" defaults["camera_interface"] = "http" defaults["camera_url"] = "http://afl-video:8081/103/current" defaults["camera_index"] = 0 defaults["empty_uuid"] = ""
[docs] def __init__(self, camera=None, overrides=None): """ Initialize OpticalTurbidity calculator driver. Parameters ---------- camera : object, optional Camera object (for example, a NetworkCamera instance). If None, one will be created when using the HTTP interface. overrides : dict, optional Configuration overrides for PersistentConfig. """ self.camera = camera self.empty_img = None self._opencv_capture = None Driver.__init__( self, name="OpticalTurbidity", defaults=self.gather_defaults(), overrides=overrides, ) if self.camera is None and self.config["camera_interface"] == "http": from AFL.automation.instrument.NetworkCamera import NetworkCamera self.camera = NetworkCamera(self.config["camera_url"])
def _collect_image(self, **kwargs): """ Collect an image based on the configured camera interface. Returns ------- tuple `(collected, img)` where `collected` indicates success. """ interface = self.config["camera_interface"] if interface == "http": if self.camera is None: from AFL.automation.instrument.NetworkCamera import NetworkCamera if "camera_url" not in self.config: raise ValueError("camera_url must be set in config when camera_interface='http'") self.camera = NetworkCamera(self.config["camera_url"]) return self.camera.collect(**kwargs) if interface == "opencv": try: cv2_module = lazy.load("cv2", require="AFL-automation[vision]") except Exception as exc: raise ImportError( "opencv-python is required for camera_interface='opencv'. " f"Install with: pip install AFL-automation[vision]. Error: {exc}" ) if "camera_index" not in self.config: raise ValueError("camera_index must be set in config when camera_interface='opencv'") camera_index = self.config["camera_index"] if self._opencv_capture is None: self._opencv_capture = cv2_module.VideoCapture(camera_index) return self._opencv_capture.read() raise ValueError( f"Unsupported camera_interface: '{interface}'. " "Supported values are: 'http', 'opencv'" ) def _reset_camera(self): """Reset the configured camera connection.""" interface = self.config["camera_interface"] if interface == "http": if self.camera is not None: self.camera.camera_reset() return if interface == "opencv": if self._opencv_capture is not None: self._opencv_capture.release() try: cv2_module = lazy.load("cv2", require="AFL-automation[vision]") except Exception as exc: raise ImportError( "opencv-python is required for camera_interface='opencv'. " f"Install with: pip install AFL-automation[vision]. Error: {exc}" ) camera_index = self.config.get("camera_index", 0) self._opencv_capture = cv2_module.VideoCapture(camera_index) def _build_dataset( self, *, name, turbidity_metric, measurement_img, empty_img, mask, cx, cy, empty_from_measurement=False, is_empty_reference=False, ): ds = xr.Dataset() ds.attrs["located_center"] = [cx, cy] ds.attrs["name"] = name ds.attrs["turbidity_metric"] = turbidity_metric ds.attrs["empty_uuid"] = self.config.get("empty_uuid", "") ds.attrs["empty_available"] = not empty_from_measurement ds.attrs["is_empty_reference"] = is_empty_reference ds["turbidity"] = turbidity_metric ds["img"] = (("px", "py"), measurement_img) ds["img_MT"] = (("px", "py"), empty_img) ds["mask"] = (("px", "py"), mask) return ds
[docs] def measure(self, set_empty=False, plotting=False, **kwargs): """ This is an optical turbidity measurement observing the SANS cell. Parameters ---------- set_empty : bool, optional If True, set the current image as the empty reference image. plotting : bool, optional If True, save diagnostic plots. **kwargs : dict Additional arguments passed to image collection. Returns ------- xarray.Dataset Dataset with turbidity measurements and images. When `set_empty=True`, the dataset contains the captured empty image. """ row_crop = self.config["row_crop"] col_crop = self.config["col_crop"] hough_radii = self.config["hough_radii"] print("crops: ", row_crop, col_crop) print("hough_radii", hough_radii) name = kwargs.pop("name", "") print("attempting to collect camera image") self._reset_camera() time.sleep(0.2) collected, img = self._collect_image(**kwargs) print(collected, img) if collected: print("collected image") measurement_img = img_as_ubyte(rgb2gray(img)) else: self._reset_camera() print("trying to reset camera connection") collected, img = self._collect_image(**kwargs) if collected: measurement_img = img_as_ubyte(rgb2gray(img)) print("success on retry") else: raise RuntimeError( "Failed to collect camera image after two attempts. " "Check that the camera is connected and the " f"camera_interface ('{self.config['camera_interface']}') " "settings are correct." ) if set_empty: self.empty_img = measurement_img print("setting empty image", measurement_img) if self.data is not None and "sample_uuid" in self.data: self.config["empty_uuid"] = copy.deepcopy(self.data["sample_uuid"]) return self._build_dataset( name=name, turbidity_metric=1.0, measurement_img=measurement_img, empty_img=measurement_img, mask=np.ones_like(measurement_img, dtype=bool), cx=0, cy=0, empty_from_measurement=False, is_empty_reference=True, ) print("measuring turbidity") measurement_img = measurement_img[row_crop[0] : row_crop[1], col_crop[0] : col_crop[1]] print("measurement image: ", measurement_img.shape, type(measurement_img)) empty_img = None empty_from_measurement = False if self.empty_img is not None: empty_img = self.empty_img[row_crop[0] : row_crop[1], col_crop[0] : col_crop[1]] elif self.config.get("empty_uuid"): if Eq is None: self.log_warning("Cannot load empty image without tiled. Using measurement image for mask.") elif self.data is None or not hasattr(self.data, "tiled_client") or self.data.tiled_client is None: self.log_warning("No tiled client available. Using measurement image for mask.") else: tiled_result = self.data.tiled_client.search(Eq("sample_uuid", self.config["empty_uuid"])) result_items = list(tiled_result.items()) if not result_items: self.log_warning( f"No tiled entry found for empty_uuid={self.config['empty_uuid']}. " "Using measurement image for mask." ) else: item = result_items[-1][-1] try: ds = item.read(optimize_wide_table=False) except TypeError: ds = item.read() if "img_MT" in ds: empty_img = ds["img_MT"].values elif "img" in ds: empty_img = ds["img"].values if empty_img is not None: empty_img = empty_img[row_crop[0] : row_crop[1], col_crop[0] : col_crop[1]] if empty_img is None: self.log_warning("No empty image available. Using measurement image for mask and normalization.") empty_img = measurement_img empty_from_measurement = True edges = canny(empty_img, sigma=2, low_threshold=10, high_threshold=50) hough_radii = [hough_radii] hough_res = hough_circle(edges, hough_radii) _, cx, cy, radii = hough_circle_peaks(hough_res, hough_radii, total_num_peaks=1) y = np.arange(empty_img.shape[0]) x = np.arange(empty_img.shape[1]) X, Y = np.meshgrid(x, y) XX = X - cx YY = Y - cy R = np.sqrt(XX * XX + YY * YY) mask = R < radii empty_intensity = empty_img[mask] filled_intensity = measurement_img[mask] pedestal = np.abs(np.min(empty_intensity)) + 1 norm_intensity = (np.array(filled_intensity, dtype=float) + pedestal) / ( np.array(empty_intensity, dtype=float) + pedestal ) norm_intensity = np.nan_to_num(norm_intensity, nan=1) turbidity_metric = np.average(norm_intensity) if isinstance(turbidity_metric, np.ndarray): turbidity_metric = turbidity_metric.tolist() if isinstance(cx, np.ndarray): cx = cx.tolist() if isinstance(cy, np.ndarray): cy = cy.tolist() if plotting: fig, ax = plt.subplots(1, 2) ax[0].imshow(empty_img) ax[0].imshow(np.where(mask, 0.0, np.nan)) ax[1].imshow(measurement_img) ax[1].imshow(np.where(mask, 0.0, np.nan)) fig.suptitle(f"Turbidity metric {np.round(turbidity_metric, 2)}") plt.savefig( pathlib.Path(self.config["save_path"]) / f'{datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}-turbidity0.png' ) plt.close(fig) fig, ax = plt.subplots(1, 2) ax[0].imshow(empty_img) ax[0].imshow(np.where(~mask, 0.0, np.nan)) ax[1].imshow(measurement_img) ax[1].imshow(np.where(~mask, 0.0, np.nan)) fig.suptitle(f"Turbidity metric {np.round(turbidity_metric, 2)}") plt.savefig( pathlib.Path(self.config["save_path"]) / f'{datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}-turbidity1.png' ) plt.close(fig) return self._build_dataset( name=name, turbidity_metric=turbidity_metric, measurement_img=measurement_img, empty_img=empty_img, mask=mask, cx=cx, cy=cy, empty_from_measurement=empty_from_measurement, is_empty_reference=False, )
_DEFAULT_CUSTOM_CONFIG = { "_classname": "AFL.automation.instrument.OpticalTurbidity.OpticalTurbidity", "_args": [ { "_classname": "AFL.automation.instrument.NetworkCamera.NetworkCamera", "_args": ["http://afl-video:8081/103/current"], } ], } _DEFAULT_CUSTOM_PORT = 5001 if __name__ == "__main__": from AFL.automation.shared.launcher import *