Source code for AFL.automation.instrument.OpticalTurbidity

import pathlib,glob,os,datetime
import matplotlib.pyplot as plt
import numpy as np
import lazy_loader as lazy
cv2 = lazy.load("cv2", require="AFL-automation[vision]")
from PIL import Image
from skimage import data, color
from skimage.transform import hough_circle, hough_circle_peaks,hough_ellipse
from skimage.feature import canny
from skimage.draw import circle_perimeter
from skimage.util import img_as_ubyte
from skimage.color import rgb2gray
from AFL.automation.APIServer.Driver import Driver
import time


[docs] class OpticalTurbidity(Driver): defaults = {} defaults['hough_radii'] = 98 defaults['row_crop'] = [0,479] defaults['col_crop'] = [0,479] defaults['camera_id'] = 5 #0 is the opentrons camera defaults['save_path'] = '/home/afl642/2305_SINQ_TurbidityImages/'
[docs] def __init__(self,camera,overrides=None): ''' Initalize OPticalTurbidity calculator driver ''' self.camera = camera self.empty_img = None Driver.__init__(self,name='OpticalTurbidity',defaults=self.gather_defaults(),overrides=overrides)
[docs] def measure(self,set_empty=False, plotting=False,**kwargs): """ This is an optical turbidity measurement observing the sans cell ------------------------ empty_fn: (str) is the path to the empty SANS cell image filled_fn: (str) is the path to the filled SANS cell image """ row_crop = self.config['row_crop'] col_crop = self.config['col_crop'] hough_radii = self.config['hough_radii'] print('crops: ',row_crop,col_crop) print('hough_radii', hough_radii) name = '' if 'name' in kwargs.keys(): name = kwargs['name'] del kwargs['name'] print("attempting to collect camera image") #loaded SANS cell image self.camera.camera_reset() time.sleep(0.2) collected, img = self.camera.collect(**kwargs) print(collected, img) if collected: print('collected image') measurement_img = img_as_ubyte(rgb2gray(img)) else: self.camera.camera_reset() print("trying to reset camera connection") collected, img = self.camera.collect(**kwargs) if collected: measurement_img = img_as_ubyte(rgb2gray(img)) print('success on retry') else: print('failed to recollect') #measurement_img = img_as_ubyte(rgb2gray(np.array(Image.open(filled_fn))))#[:,:650] if set_empty: self.empty_img = measurement_img print('setting empty image', measurement_img) return 0,[0,0] else: print('measuring turbidity') measurement_img = measurement_img[row_crop[0]:row_crop[1], col_crop[0]:col_crop[1]] print('measurement image: ', measurement_img.shape, type(measurement_img)) #empty SANS cell image for masking if self.empty_img is not None: empty_img = self.empty_img[row_crop[0]:row_crop[1], col_crop[0]:col_crop[1]]#[:,:650] else: raise ValueError('need to set empty before measuring') #Place to fix edge detection edges = canny(empty_img, sigma=2, low_threshold=10, high_threshold=50) # Detect radii hough_radii = [hough_radii] hough_res = hough_circle(edges, hough_radii) # Select the most prominent circle accums, cx, cy, radii = hough_circle_peaks(hough_res, hough_radii, total_num_peaks=1) #mask grid y = np.arange(empty_img.shape[0]) x = np.arange(empty_img.shape[1]) X,Y = np.meshgrid(x,y) XX = X-cx YY = Y-cy R = np.sqrt(XX*XX+YY*YY) mask = R<radii # print(mask.shape, np.min(mask),np.max(mask)) # print(empty_img.shape, np.min(empty_img),np.max(empty_img)) # plt.imshow(image) # plt.imshow(np.where(mask,0.0,np.nan)) # # Draw them # fig, ax = plt.subplots(ncols=1, nrows=1, figsize=(10, 4)) # image = color.gray2rgb(image) # for center_y, center_x, radius in zip(cy, cx, radii): # circy, circx = circle_perimeter(center_y, center_x, radius, # shape=image.shape) # image[circy, circx] = (220, 20, 20) empty_intensity = empty_img[mask] filled_intensity = measurement_img[mask] # print(empty_intensity.shape) # print(filled_intensity.shape) # print(type(empty_intensity)) # print(type(filled_intensity)) # print(np.sum(empty_intensity<0)) # print(np.sum(filled_intensity<0)) # print(np.min(empty_intensity)) # print(np.max(empty_intensity)) pedestal = np.abs(np.min(empty_intensity))+1 # print(pedestal) # print(np.min(empty_intensity)+pedestal) #normalize the filled cell intensity by the empty cell intensity norm_intensity = (np.array(filled_intensity, dtype=float)+pedestal)/(np.array(empty_intensity,dtype=float)+pedestal) norm_intensity = np.nan_to_num(norm_intensity,nan=1) turbidity_metric = np.average(norm_intensity) if type(turbidity_metric) == np.ndarray: turbidity_metric = turbidity_metric.tolist() if type(cx) == np.ndarray: cx = cx.tolist() if type(cy) == np.ndarray: cy = cy.tolist() #plotting if plotting: # cv2.imwrite( str(pathlib.Path(self.config['save_path'])/f'{datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}-turbidity0.png'),empty_img) # cv2.imwrite(str(pathlib.Path(self.config['save_path'])/f'{datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}-turbidity1.png'),measurement_img) fig,ax = plt.subplots(1,2) ax[0].imshow(empty_img) ax[0].imshow(np.where(mask,0.0,np.nan)) ax[1].imshow(measurement_img) ax[1].imshow(np.where(mask,0.0,np.nan)) fig.suptitle(f'Turbidity metric {np.round(turbidity_metric,2)}') plt.savefig(pathlib.Path(self.config['save_path'])/f'{datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}-turbidity0.png') plt.close(fig) fig,ax = plt.subplots(1,2) ax[0].imshow(empty_img) ax[0].imshow(np.where(~mask,0.0,np.nan)) ax[1].imshow(measurement_img) ax[1].imshow(np.where(~mask,0.0,np.nan)) fig.suptitle(f'Turbidity metric {np.round(turbidity_metric,2)}') plt.savefig(pathlib.Path(self.config['save_path'])/f'{datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}-turbidity1.png') # plt.show() plt.close(fig) if self.data is not None: self.data['located_center'] = [cx,cy] self.data['name'] = name self.data.add_array('turbidity',[turbidity_metric]) #returns the turbidity value in normalized units, the center of the circular mask and the radius as a list return turbidity_metric, [cx, cy]
_DEFAULT_CUSTOM_CONFIG = { '_classname': 'AFL.automation.instrument.OpticalTurbidity.OpticalTurbidity', '_args': [ {'_classname': 'AFL.automation.instrument.NetworkCamera.NetworkCamera', '_args' : [ 'http://afl-video:8081/103/current' ] } ] } _DEFAULT_CUSTOM_PORT = 5001 if __name__ == '__main__': from AFL.automation.shared.launcher import *