"""
Extract metadata from various electron microscopy file types.
Extractors should return a dictionary containing the values to be displayed
in NexusLIMS as a sub-dictionary under the key ``nx_meta``. The remaining keys
will be for the metadata as extracted. Under ``nx_meta``, a few keys are
expected (although not enforced):
* ``'Creation Time'`` - ISO format date and time as a string
* ``'Data Type'`` - a human-readable description of the data type separated by
underscores - e.g. "STEM_Imaging", "TEM_EDS", etc.
* ``'DatasetType'`` - determines the value of the Type attribute for the dataset
(defined in the schema)
* ``'Data Dimensions'`` - dimensions of the dataset, surrounded by parentheses,
separated by commas as a string- e.g. '(12, 1024, 1024)'
* ``'Instrument ID'`` - instrument PID pulled from the instrument database
"""
import base64
import inspect
import json
import logging
import shutil
from collections import abc
from datetime import datetime as dt
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Tuple
import hyperspy.api_nogui as hs
import numpy as np
from nexusLIMS.instruments import get_instr_from_filepath
from nexusLIMS.utils import current_system_tz, replace_mmf_path
from nexusLIMS.version import __version__
from .basic_metadata import get_basic_metadata
from .digital_micrograph import get_dm3_metadata
from .edax import get_msa_metadata, get_spc_metadata
from .fei_emi import get_ser_metadata
from .quanta_tif import get_quanta_metadata
from .thumbnail_generator import (
down_sample_image,
image_to_square_thumbnail,
sig_to_thumbnail,
text_to_thumbnail,
)
logger = logging.getLogger(__name__)
# static fallback image copied into place when preview generation fails
PLACEHOLDER_PREVIEW = Path(__file__).parent / "extractor_error.png"
# maps a file extension (without the dot) to the dedicated metadata
# extractor function for that file type
extension_reader_map = {
    "dm3": get_dm3_metadata,
    "dm4": get_dm3_metadata,
    "tif": get_quanta_metadata,
    "ser": get_ser_metadata,
    "spc": get_spc_metadata,
    "msa": get_msa_metadata,
}
# filetypes that will only have basic metadata extracted but will nonetheless
# have a custom preview image generated
unextracted_preview_map = {
    "txt": text_to_thumbnail,
    "png": image_to_square_thumbnail,
    "tiff": image_to_square_thumbnail,
    "bmp": image_to_square_thumbnail,
    "gif": image_to_square_thumbnail,
    "jpg": image_to_square_thumbnail,
    "jpeg": image_to_square_thumbnail,
}
def _add_extraction_details(
    nx_meta: Dict,
    extractor_module: Callable,
) -> Dict[str, str]:
    """
    Add extraction details to the NexusLIMS metadata.

    Adds metadata about the extraction process, given an extractor module,
    to the ``nx_meta`` metadata dictionary under the ``'NexusLIMS Extraction'``
    sub-key. The ``'Module'`` metadata key will contain the fully
    qualified path of a given extractor, e.g.
    ``nexusLIMS.extractors.basic_metadata``.

    Note
    ----
    If the ``'NexusLIMS Extraction'`` key already exists in the ``nx_meta``
    metadata dictionary, this method *will* overwrite its value.

    Parameters
    ----------
    nx_meta
        The metadata dictionary as returned by :py:meth:`parse_metadata`
    extractor_module
        The (callable) module for a specific metadata extractor from the
        :py:mod:`~nexusLIMS.extractors` module.

    Returns
    -------
    dict
        An updated ``nx_meta`` dictionary, containing extraction details
    """
    module = inspect.getmodule(extractor_module)
    # inspect.getmodule can return None (e.g. for callables defined
    # dynamically); fall back to the callable's __module__ in that case
    if module is not None:
        module_name = module.__name__
    else:
        module_name = getattr(extractor_module, "__module__", str(extractor_module))
    nx_meta["nx_meta"]["NexusLIMS Extraction"] = {
        "Date": dt.now(tz=current_system_tz()).isoformat(),
        "Module": module_name,
        "Version": __version__,
    }
    return nx_meta
def create_preview(fname: Path, *, overwrite: bool) -> Optional[Path]:  # noqa: PLR0912
    """
    Generate a preview image for a given file using one of a few different methods.

    For most files, this method will try to load the file using HyperSpy and
    generate a preview using that library's capabilities. ``.tif`` files and
    the formats listed in ``unextracted_preview_map`` use dedicated preview
    generators instead.

    Parameters
    ----------
    fname
        The filename from which to read data
    overwrite
        Whether to overwrite the .json metadata file and thumbnail
        image if either exists (only consulted for files previewed via
        HyperSpy; the other branches always regenerate the preview)

    Returns
    -------
    preview_fname : Optional[pathlib.Path]
        The filename of the generated preview image; if None, a preview could not be
        successfully generated.
    """
    preview_fname = replace_mmf_path(fname, ".thumb.png")
    extension = fname.suffix[1:]
    if extension == "tif":
        instr = get_instr_from_filepath(fname)
        instr_name = instr.name if instr is not None else None
        if instr_name == "FEI-Quanta200-ESEM-633137_n":
            # we know the output size we want for the Quanta
            output_size = (512, 471)
            down_sample_image(fname, out_path=preview_fname, output_size=output_size)
        else:
            factor = 2
            down_sample_image(fname, out_path=preview_fname, factor=factor)
    elif extension in unextracted_preview_map:
        # use preview generation function from the map of functions defined
        # at the top of this file (unextracted_preview_map)
        preview_return = unextracted_preview_map[extension](
            f=fname,
            out_path=preview_fname,
            output_size=500,
        )
        # handle the case where PIL cannot open an image
        if preview_return is False:
            preview_fname = None
        return preview_fname
    else:
        load_options = {"lazy": True}
        if extension == "ser":
            load_options["only_valid_data"] = True
        # noinspection PyBroadException
        try:
            s = hs.load(fname, **load_options)
        except Exception:  # pylint: disable=broad-exception-caught
            logger.warning(
                "Signal could not be loaded by HyperSpy. "
                "Using placeholder image for preview.",
            )
            # fall back to the bundled placeholder image
            shutil.copyfile(PLACEHOLDER_PREVIEW, preview_fname)
            return preview_fname
        # If s is a list of signals, use just the first one for
        # our purposes
        if isinstance(s, list):
            num_sigs = len(s)
            fname = s[0].metadata.General.original_filename
            s = s[0]
            s.metadata.General.title = (
                s.metadata.General.title
                + f' (1 of {num_sigs} total signals in file "{fname}")'
            )
        elif not s.metadata.General.title:
            # fall back to the filename (minus extension) as the title
            s.metadata.General.title = s.metadata.General.original_filename.replace(
                extension,
                "",
            ).strip(".")
        # only generate the preview if it doesn't exist, or overwrite
        # parameter is explicitly provided
        if not preview_fname.is_file() or overwrite:
            logger.info("Generating preview: %s", preview_fname)
            # Create the directory for the thumbnail, if needed
            preview_fname.parent.mkdir(
                parents=True,
                exist_ok=True,
            )
            # Generate the thumbnail (lazy signal must be computed first)
            s.compute(show_progressbar=False)
            sig_to_thumbnail(s, out_path=preview_fname)
        else:
            logger.info("Preview already exists: %s", preview_fname)
    return preview_fname
def flatten_dict(_dict, parent_key="", separator=" "):
    """
    Flatten a nested dictionary into a single level.

    Utility method to take a nested dictionary structure and flatten it into a
    single level, separating the levels by a string as specified by
    ``separator``.

    Cribbed from: https://stackoverflow.com/a/6027615/1435788

    Parameters
    ----------
    _dict : dict
        The dictionary to flatten
    parent_key : str
        The "root" key to add to the existing keys
    separator : str
        The string to use to separate values in the flattened keys (i.e.
        {'a': {'b': 'c'}} would become {'a' + sep + 'b': 'c'})

    Returns
    -------
    flattened_dict : dict
        The dictionary with depth one, with nested dictionaries flattened
        into root-level keys
    """
    items = []
    for key, value in _dict.items():
        # only prefix with the parent key once we are below the root level
        new_key = parent_key + separator + key if parent_key else key
        if isinstance(value, abc.MutableMapping):
            # recurse into nested mappings, carrying the accumulated key
            items.extend(flatten_dict(value, new_key, separator=separator).items())
        else:
            items.append((new_key, value))
    return dict(items)
class _CustomEncoder(json.JSONEncoder):
"""
Allow non-serializable types to be written in a JSON format.
A custom JSON Encoder class that will allow certain types to be serialized that are
not able to be by default (taken from https://stackoverflow.com/a/27050186).
"""
def default(self, o):
if isinstance(o, np.integer):
return int(o)
if isinstance(o, np.floating):
return float(o)
if isinstance(o, np.ndarray):
return o.tolist()
if isinstance(o, np.bytes_):
return o.decode()
if isinstance(o, np.void):
# np.void array may contain arbitary binary, so base64 encode it
return base64.b64encode(o.tolist()).decode("utf-8")
return super().default(o)