# NIST Public License - 2019
#
# This software was developed by employees of the National Institute of
# Standards and Technology (NIST), an agency of the Federal Government
# and is being made available as a public service. Pursuant to title 17
# United States Code Section 105, works of NIST employees are not subject
# to copyright protection in the United States. This software may be
# subject to foreign copyright. Permission in the United States and in
# foreign countries, to the extent that NIST may hold copyright, to use,
# copy, modify, create derivative works, and distribute this software and
# its documentation without fee is hereby granted on a non-exclusive basis,
# provided that this notice and disclaimer of warranty appears in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' WITHOUT ANY WARRANTY OF ANY KIND,
# EITHER EXPRESSED, IMPLIED, OR STATUTORY, INCLUDING, BUT NOT LIMITED
# TO, ANY WARRANTY THAT THE SOFTWARE WILL CONFORM TO SPECIFICATIONS, ANY
# IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
# AND FREEDOM FROM INFRINGEMENT, AND ANY WARRANTY THAT THE DOCUMENTATION
# WILL CONFORM TO THE SOFTWARE, OR ANY WARRANTY THAT THE SOFTWARE WILL BE
# ERROR FREE. IN NO EVENT SHALL NIST BE LIABLE FOR ANY DAMAGES, INCLUDING,
# BUT NOT LIMITED TO, DIRECT, INDIRECT, SPECIAL OR CONSEQUENTIAL DAMAGES,
# ARISING OUT OF, RESULTING FROM, OR IN ANY WAY CONNECTED WITH THIS SOFTWARE,
# WHETHER OR NOT BASED UPON WARRANTY, CONTRACT, TORT, OR OTHERWISE, WHETHER
# OR NOT INJURY WAS SUSTAINED BY PERSONS OR PROPERTY OR OTHERWISE, AND
# WHETHER OR NOT LOSS WAS SUSTAINED FROM, OR AROSE OUT OF THE RESULTS OF,
# OR USE OF, THE SOFTWARE OR SERVICES PROVIDED HEREUNDER.
#
"""
Builds NexusLIMS records.
Attributes
----------
XSD_PATH
A string containing the path to the Nexus Experiment schema file,
which is used to validate XML records built by this module
"""
import argparse
import logging
import os
import shutil
import sys
from datetime import datetime as dt
from datetime import timedelta as td
from importlib import import_module, util
from io import BytesIO
from pathlib import Path
from timeit import default_timer
from typing import List, Optional
from uuid import uuid4
from lxml import etree
from nexusLIMS import version
from nexusLIMS.cdcs import upload_record_files
from nexusLIMS.db.session_handler import Session, db_query, get_sessions_to_build
from nexusLIMS.extractors import extension_reader_map as ext_map
from nexusLIMS.harvesters import nemo, sharepoint_calendar
from nexusLIMS.harvesters.nemo import utils as nemo_utils
from nexusLIMS.harvesters.reservation_event import ReservationEvent
from nexusLIMS.schemas import activity
from nexusLIMS.schemas.activity import AcquisitionActivity, cluster_filelist_mtimes
from nexusLIMS.utils import (
current_system_tz,
find_files_by_mtime,
gnu_find_files_by_mtime,
has_delay_passed,
)
# Module-level logger for record-building status messages
logger = logging.getLogger(__name__)

# Path to the Nexus Experiment schema file, used by validate_record() to
# validate XML records built by this module. (Annotated as Path: the value
# assigned is a pathlib.Path, not a str.)
XSD_PATH: Path = Path(activity.__file__).parent / "nexus-experiment.xsd"
def build_record(
    session: Session,
    sample_id: Optional[str] = None,
    *,
    generate_previews: bool = True,
) -> str:
    """
    Build a NexusLIMS XML record of an Experiment.

    Construct an XML document conforming to the NexusLIMS schema for the
    given :py:class:`~nexusLIMS.db.session_handler.Session`. The record is
    composed of a summary section (pulled from the matching reservation via
    the instrument's configured harvester) followed by one element per
    detected acquisition activity.

    Parameters
    ----------
    session
        The :py:class:`~nexusLIMS.db.session_handler.Session` describing
        the instrument, timespan, and user for which to build a record
    sample_id
        A unique identifier pointing to a sample identifier for data
        collected in this record. If None, a UUIDv4 will be generated
    generate_previews
        Whether to create the preview thumbnail images

    Returns
    -------
    xml_record : str
        A formatted string containing a well-formed and valid XML document
        for the data contained in the provided path
    """
    sample_id = str(uuid4()) if sample_id is None else sample_id

    # namespace declarations required by the NexusLIMS schema
    nx_ns = "https://data.nist.gov/od/dm/nexus/experiment/v1.0"
    xsi_ns = "http://www.w3.org/2001/XMLSchema-instance"
    root = etree.Element(
        "Experiment",
        nsmap={None: nx_ns, "xsi": xsi_ns, "nx": nx_ns},
    )

    logger.info(
        "Getting calendar events with instrument: %s, from %s to %s, "
        "user: %s; using harvester: %s",
        session.instrument.name,
        session.dt_from.isoformat(),
        session.dt_to.isoformat(),
        session.user,
        session.instrument.harvester,
    )

    # the summary portion of the record comes from the matching
    # ReservationEvent supplied by the relevant harvester
    reservation = get_reservation_event(session)
    for element in reservation.as_xml():
        root.append(element)

    logger.info(
        "Building acquisition activities for timespan from %s to %s",
        session.dt_from.isoformat(),
        session.dt_to.isoformat(),
    )

    # one AcquisitionActivity element per cluster of file mtimes
    for idx, acq in enumerate(
        build_acq_activities(
            session.instrument,
            session.dt_from,
            session.dt_to,
            generate_previews,
        ),
    ):
        root.append(acq.as_xml(idx, sample_id))

    return etree.tostring(
        root,
        xml_declaration=True,
        encoding="UTF-8",
        pretty_print=True,
    ).decode()
def get_reservation_event(session: Session) -> ReservationEvent:
    """
    Get a ReservationEvent representation of a Session.

    Dispatches to the ``res_event_from_session`` function of whichever
    harvester module is configured for the session's instrument in the
    instrument database (currently NEMO or SharePoint), so callers can use
    one consistent entry point regardless of harvester.

    Parameters
    ----------
    session
        The :py:class:`~nexusLIMS.db.session_handler.Session` for which to
        fetch a matching
        :py:class:`~nexusLIMS.harvesters.reservation_event.ReservationEvent`
        from the relevant harvester

    Returns
    -------
    res_event : ~nexusLIMS.harvesters.reservation_event.ReservationEvent
        A :py:class:`~nexusLIMS.harvesters.reservation_event.ReservationEvent`
        representation of a reservation that matches the instrument and
        timespan specified in ``session``.

    Raises
    ------
    NotImplementedError
        If the configured harvester module does not exist, or does not
        implement ``res_event_from_session``
    """
    harvester_name = session.instrument.harvester

    # bail out early if the configured harvester module cannot be found
    if util.find_spec(f".{harvester_name}", "nexusLIMS.harvesters") is None:
        msg = f"Harvester {harvester_name} not found in nexusLIMS.harvesters"
        raise NotImplementedError(msg)

    # dynamically load the harvester module named in the instrument DB
    harvester = import_module(f".{harvester_name}", "nexusLIMS.harvesters")

    # the harvester must expose res_event_from_session for this dispatch
    if not hasattr(harvester, "res_event_from_session"):
        msg = (
            f"res_event_from_session has not been implemented for {harvester}, "
            f"which is required to use this method."
        )
        raise NotImplementedError(msg)

    return harvester.res_event_from_session(session)
def build_acq_activities(
    instrument,
    dt_from: dt,
    dt_to: dt,
    generate_previews: bool,
) -> List[AcquisitionActivity]:
    """
    Build an XML string representation of each AcquisitionActivity for a session.

    This includes setup parameters and metadata
    associated with each dataset obtained during a microscopy session. Unique
    AcquisitionActivities are delimited via clustering of file collection
    time to detect "long" breaks during a session.

    Parameters
    ----------
    instrument : :py:class:`~nexusLIMS.instruments.Instrument`
        One of the NexusLIMS instruments contained in the
        :py:attr:`~nexusLIMS.instruments.instrument_db` database.
        Controls what instrument calendar is used to get events.
    dt_from : datetime.datetime
        The starting timestamp that will be used to determine which files go
        in this record
    dt_to : datetime.datetime
        The ending timestamp used to determine the last point in time for
        which files should be associated with this record
    generate_previews : bool
        Whether or not to create the preview thumbnail images

    Returns
    -------
    activities : :obj:`list` of
    :obj:`~nexusLIMS.schemas.activity.AcquisitionActivity`:
        The list of :py:class:`~nexusLIMS.schemas.activity.AcquisitionActivity`
        objects generated for the record

    Raises
    ------
    FileNotFoundError
        If no files were found in the given time range
    """
    # silence noisy per-file log output from HyperSpy's DM reader
    logging.getLogger("hyperspy.io_plugins.digital_micrograph").setLevel(
        logging.WARNING,
    )
    start_timer = default_timer()
    path = Path(os.environ["mmfnexus_path"]) / instrument.filestore_path
    # find the files to be included (list of Paths, sorted by mtime)
    files = get_files(path, dt_from, dt_to)
    logger.info(
        "Found %i files in %.2f seconds",
        len(files),
        default_timer() - start_timer,
    )
    # raise error if no file found were found
    if len(files) == 0:
        msg = "No files found in this time range"
        raise FileNotFoundError(msg)
    # get the timestamp boundaries of acquisition activities
    aa_bounds = cluster_filelist_mtimes(files)
    # add the last file's modification time to the boundaries list to make
    # the loop below easier to process (every file then falls at or before
    # some boundary)
    aa_bounds.append(files[-1].stat().st_mtime)
    # pre-size the activity list with one (possibly unused) slot per boundary
    activities: List[Optional[AcquisitionActivity]] = [None] * len(aa_bounds)
    # two-pointer walk: i indexes files (sorted by mtime), aa_idx indexes
    # the activity boundaries; both lists are in ascending time order
    i = 0
    aa_idx = 0
    while i < len(files):
        f = files[i]
        mtime = f.stat().st_mtime
        # check this file's mtime, if it is less than this iteration's value
        # in the AA bounds, then it belongs to this iteration's AA
        # if not, then we should move to the next activity
        if mtime <= aa_bounds[aa_idx]:
            # if current activity index is None, we need to start a new AA:
            if activities[aa_idx] is None:
                activities[aa_idx] = AcquisitionActivity(
                    start=dt.fromtimestamp(mtime, tz=instrument.timezone),
                )
            # add this file to the AA
            logger.info(
                "Adding file %i/%i %s to activity %i",
                i,
                len(files),
                str(f).replace(os.environ["mmfnexus_path"], "").strip("/"),
                aa_idx,
            )
            activities[aa_idx].add_file(fname=f, generate_preview=generate_previews)
            # assume this file is the last one in the activity (this will be
            # true on the last iteration where mtime is <= to the
            # aa_bounds value)
            activities[aa_idx].end = dt.fromtimestamp(mtime, tz=instrument.timezone)
            i += 1
        else:
            # this file's mtime is after the boundary and is thus part of the
            # next activity, so increment AA counter and reprocess file (do
            # not increment i)
            aa_idx += 1
    # Remove any "None" activities from list (boundaries that matched no file)
    activities: List[AcquisitionActivity] = [a for a in activities if a is not None]
    logger.info("Finished detecting activities")
    for i, this_activity in enumerate(activities):
        logger.info("Activity %i: storing setup parameters", i)
        this_activity.store_setup_params()
        logger.info("Activity %i: storing unique metadata values", i)
        this_activity.store_unique_metadata()
    return activities
def get_files(
    path: Path,
    dt_from: dt,
    dt_to: dt,
) -> List[Path]:
    """
    Get files under a path that were last modified between the two given timestamps.

    Tries the fast GNU ``find``-based implementation first, falling back to
    a pure-Python search if that fails. The set of extensions searched is
    controlled by the ``NexusLIMS_file_strategy`` environment variable.

    Parameters
    ----------
    path
        The file path in which to search for files
    dt_from : datetime.datetime
        The starting timestamp that will be used to determine which files go
        in this record
    dt_to : datetime.datetime
        The ending timestamp used to determine the last point in time for
        which files should be associated with this record

    Returns
    -------
    files : List[pathlib.Path]
        A list of the files that have modification times within the
        time range provided (sorted by modification time)
    """
    logger.info("Starting new file-finding in %s", path)

    # "exclusive" limits the search to known readable extensions;
    # "inclusive" picks up every file regardless of extension
    strategy = os.environ.get("NexusLIMS_file_strategy", default="exclusive").lower()
    if strategy not in ("inclusive", "exclusive"):
        logger.warning(
            'File finding strategy (env variable "NexusLIMS_file_strategy") had '
            'an unexpected value: "%s". Setting value to "exclusive".',
            strategy,
        )
        strategy = "exclusive"

    extension_arg = ext_map.keys() if strategy == "exclusive" else None
    try:
        return gnu_find_files_by_mtime(path, dt_from, dt_to, extensions=extension_arg)
    # exclude following from coverage because find_files_by_mtime is deprecated as of
    # 1.2.0 and does not support extensions at all (like the above method)
    except (NotImplementedError, RuntimeError) as exception:  # pragma: no cover
        logger.warning(
            "GNU find returned error: %s\nFalling back to pure Python implementation",
            exception,
        )
        return find_files_by_mtime(path, dt_from, dt_to)
def dump_record(
    session: Session,
    filename: Optional[Path] = None,
    *,
    generate_previews: bool = True,
) -> Path:
    """
    Dump a record to an XML file.

    Writes an XML record for a :py:class:`~nexusLIMS.db.session_handler.Session`
    composed of information pulled from the appropriate reservation system
    as well as metadata extracted from the microscope data (e.g. dm3 or
    other files).

    Parameters
    ----------
    session : nexusLIMS.db.session_handler.Session
        A :py:class:`~nexusLIMS.db.session_handler.Session` object
        representing a unit of time on one of the instruments known to NexusLIMS
    filename : typing.Optional[pathlib.Path]
        The filename of the dumped xml file to write. If None, a default name
        will be generated from the other parameters
    generate_previews : bool
        Whether or not to create the preview thumbnail images

    Returns
    -------
    filename : pathlib.Path
        The name of the created record that was returned
    """
    if filename is None:
        # default name: compiled_record[_instrument]_date[_user].xml
        pieces = ["compiled_record"]
        if session.instrument:
            pieces.append(f"_{session.instrument.name}")
        pieces.append(session.dt_from.strftime("_%Y-%m-%d"))
        if session.user:
            pieces.append(f"_{session.user}")
        pieces.append(".xml")
        filename = Path("".join(pieces))
    filename.parent.mkdir(parents=True, exist_ok=True)
    with filename.open(mode="w", encoding="utf-8") as f:
        # record is built after the file is opened so the Session's XML is
        # written directly into the freshly created file
        f.write(build_record(session=session, generate_previews=generate_previews))
    return filename
def validate_record(xml_filename):
    """
    Validate an .xml record against the Nexus schema.

    Parameters
    ----------
    xml_filename : str or io.StringIO or io.BytesIO
        The path to the xml file to be validated (can also be a file-like
        object like StringIO or BytesIO)

    Returns
    -------
    validates : bool
        Whether the record validates against the Nexus schema
    """
    # compile the schema from the on-disk XSD, then check the record
    schema = etree.XMLSchema(etree.parse(XSD_PATH))  # noqa: S320
    record_doc = etree.parse(xml_filename)  # noqa: S320
    return schema.validate(record_doc)
def build_new_session_records() -> List[Path]:
    """
    Build records for new sessions from the database.

    Uses :py:func:`~nexusLIMS.db.session_handler.get_sessions_to_build`) and builds
    those records using :py:func:`build_record` (saving to the NexusLIMS folder), and
    returns a list of resulting .xml files to be uploaded to CDCS.

    Returns
    -------
    xml_files : typing.List[pathlib.Path]
        A list of record files that were successfully built and saved to
        centralized storage
    """
    # get the list of sessions with 'TO_BE_BUILT' status; does not fetch new
    # usage events from any NEMO instances;
    # nexusLIMS.harvesters.nemo.add_all_usage_events_to_db() must be used
    # first to do so
    sessions = get_sessions_to_build()
    if not sessions:
        sys.exit("No 'TO_BE_BUILT' sessions were found. Exiting.")

    xml_files: List[Path] = []
    for s in sessions:
        # Distinct except clauses replace the previous
        # "except (FileNotFoundError, Exception)" + isinstance dispatch
        # (FileNotFoundError is already a subclass of Exception); clause
        # order matches the original isinstance chain.
        try:
            db_row = s.insert_record_generation_event()
            record_text = build_record(session=s)
        except FileNotFoundError:
            # no files were found for this session log
            _handle_no_files_found(s, db_row)  # pylint: disable=used-before-assignment
        except nemo.exceptions.NoDataConsentError as exception:
            logger.warning(
                "User requested this session not be harvested, "
                "so no record was built. %s",
                exception,
            )
            logger.info('Marking %s as "NO_CONSENT"', s.session_identifier)
            s.update_session_status("NO_CONSENT")
        except nemo.exceptions.NoMatchingReservationError as exception:
            logger.warning(
                "No matching reservation found for this session, "
                "so assuming no consent was given. %s",
                exception,
            )
            logger.info('Marking %s as "NO_RESERVATION"', s.session_identifier)
            s.update_session_status("NO_RESERVATION")
        except Exception:  # pylint: disable=broad-exception-caught
            logger.exception("Could not generate record text")
            # logger.error (not .exception) so the traceback is logged once
            logger.error('Marking %s as "ERROR"', s.session_identifier)
            s.update_session_status("ERROR")
        else:
            xml_files = _record_validation_flow(record_text, s, xml_files)
    return xml_files


def _handle_no_files_found(s: Session, db_row) -> None:
    """
    Handle a session for which no files were found in its time range.

    If the configured record-building delay has passed, the session is marked
    ``NO_FILES_FOUND``; otherwise the previously inserted RECORD_GENERATION
    row is removed so the session will be retried later.
    """
    path = Path(os.environ["mmfnexus_path"]) / s.instrument.filestore_path
    logger.warning(
        "No files found in %s between %s and %s",
        path,
        s.dt_from.isoformat(),
        s.dt_to.isoformat(),
    )
    if has_delay_passed(s.dt_to):
        logger.warning(
            'Marking %s as "NO_FILES_FOUND"',
            s.session_identifier,
        )
        s.update_session_status("NO_FILES_FOUND")
    else:
        # the delay hasn't passed, so log and delete the record
        # generation event we inserted previously
        logger.warning(
            "Configured record building delay has not passed; "
            "Removing previously inserted RECORD_GENERATION row for %s",
            s.session_identifier,
        )
        db_query(
            "DELETE FROM session_log WHERE id_session_log = ?",
            (db_row["id_session_log"],),
        )
def _record_validation_flow(record_text, s, xml_files) -> List[Path]:
    """
    Validate a freshly built record, write it to disk, and update the session.

    On successful validation the record is written under the records folder
    and the session is marked ``COMPLETED``; otherwise nothing is written and
    the session is marked ``ERROR``. Returns the (possibly extended) list of
    generated record files.
    """
    # guard clause: bail out immediately if the record fails schema validation
    if not validate_record(BytesIO(bytes(record_text, "UTF-8"))):
        logger.error('Marking %s as "ERROR"', s.session_identifier)
        logger.error("Could not validate record, did not write to disk")
        s.update_session_status("ERROR")
        return xml_files

    logger.info("Validated newly generated record")
    # generate a unique filename suffix; NEMO session identifiers are URLs
    # of usage events, so extract just the numeric id from them
    if s.instrument.harvester == "nemo":
        unique_suffix = f"{nemo_utils.id_from_url(s.session_identifier)}"
    else:  # pragma: no cover
        # assume session_identifier is a UUID
        unique_suffix = f'{s.session_identifier.split("-")[0]}'

    basename = (
        f'{s.dt_from.strftime("%Y-%m-%d")}_'
        f"{s.instrument.name}_"
        f"{unique_suffix}.xml"
    )
    filename = Path(os.environ["nexusLIMS_path"]).parent / "records" / basename
    filename.parent.mkdir(parents=True, exist_ok=True)

    # write the record to disk and append to list of files generated
    with filename.open(mode="w", encoding="utf-8") as f:
        f.write(record_text)
    logger.info("Wrote record to %s", filename)
    xml_files.append(Path(filename))

    # mark this session as completed in the database
    logger.info('Marking %s as "COMPLETED"', s.session_identifier)
    s.update_session_status("COMPLETED")
    return xml_files
def process_new_records(
    *,
    dry_run: bool = False,
    dt_from: Optional[dt] = None,
    dt_to: Optional[dt] = None,
):
    """
    Process new records (this is the main entrypoint to the record builder).

    Using :py:meth:`build_new_session_records()`, process new records,
    save them to disk, and upload them to the NexusLIMS CDCS instance.

    Parameters
    ----------
    dry_run
        Controls whether or not records will actually be built. If ``True``,
        session harvesting and file finding will be performed, but no preview
        images or records will be built. Can be used to see what _would_ happen
        if ``dry_run`` is set to ``False``.
    dt_from
        The point in time after which sessions will be fetched. If ``None``,
        no date filtering will be performed. This parameter currently only
        has an effect for the NEMO harvester. All SharePoint events will always
        be fetched.
    dt_to
        The point in time before which sessions will be fetched. If ``None``,
        no date filtering will be performed. This parameter currently only
        has an effect for the NEMO harvester. All SharePoint events will always
        be fetched.
    """
    if dry_run:
        logger.info("!!DRY RUN!! Only finding files, not building records")
        # combine 'TO_BE_BUILT' sessions from the DB with Session objects for
        # NEMO usage events (fetched without adding rows to the DB)
        sessions = get_sessions_to_build()
        sessions += nemo_utils.get_usage_events_as_sessions(
            dt_from=dt_from,
            dt_to=dt_to,
        )
        if not sessions:
            logger.warning("No 'TO_BE_BUILT' sessions were found. Exiting.")
            return
        for s in sessions:
            # sessions here can be from any type of harvester;
            # get_reservation_event picks the right one
            logger.info("")
            logger.info("")
            get_reservation_event(s)
            dry_run_file_find(s)
    else:
        # record NEMO usage events in the DB, then build everything pending
        nemo_utils.add_all_usage_events_to_db(dt_from=dt_from, dt_to=dt_to)
        xml_files = build_new_session_records()
        if len(xml_files) == 0:
            logger.warning("No XML files built, so no files uploaded")
        else:
            files_uploaded, _ = upload_record_files(xml_files)
            # archive each uploaded record into an "uploaded" subfolder and
            # remove the original
            for f in files_uploaded:
                uploaded_dir = Path(f).parent / "uploaded"
                uploaded_dir.mkdir(parents=True, exist_ok=True)
                shutil.copy2(f, uploaded_dir)
                Path(f).unlink()
            files_not_uploaded = [f for f in xml_files if f not in files_uploaded]
            if files_not_uploaded:
                logger.error(
                    "Some record files were not uploaded: %s",
                    files_not_uploaded,
                )
    return
def dry_run_get_sharepoint_reservation_event(
    s: Session,
) -> ReservationEvent:  # pragma: no cover
    """
    Get the calendar event that *would* be used based off the supplied session.

    Only implemented for the Sharepoint harvester.

    Parameters
    ----------
    s
        A session read from the database

    Returns
    -------
    res_event : ~nexusLIMS.harvesters.reservation_event.ReservationEvent
        The reservation event that the SharePoint harvester resolves for
        this session's instrument and time span
    """
    # fetch the raw calendar XML for this instrument/timespan, then parse it
    cal_xml = sharepoint_calendar.fetch_xml(s.instrument, s.dt_from, s.dt_to)
    event = sharepoint_calendar.res_event_from_xml(cal_xml)
    logger.info(event)
    return event
def dry_run_file_find(s: Session) -> List[Path]:
    """
    Get the files that *would* be included for a record built for the supplied session.

    Parameters
    ----------
    s : nexusLIMS.db.session_handler.Session
        A session read from the database

    Returns
    -------
    files : typing.List[pathlib.Path]
        A list of Paths containing the files that would be included for the
        record of this session (if it were not a dry run)
    """
    search_path = Path(os.environ["mmfnexus_path"]) / s.instrument.filestore_path
    logger.info(
        "Searching for files for %s in %s between %s and %s",
        s.instrument.name,
        search_path,
        s.dt_from.isoformat(),
        s.dt_to.isoformat(),
    )
    found = get_files(search_path, s.dt_from, s.dt_to)
    logger.info("Results for %s on %s:", s.session_identifier, s.instrument)
    if not found:
        logger.warning("No files found for this session")
        return found

    logger.info("Found %i files for this session", len(found))
    # log each file alongside its modification time in the instrument's TZ
    for f in found:
        mtime = dt.fromtimestamp(
            f.stat().st_mtime,
            tz=s.instrument.timezone,
        ).isoformat()
        logger.info("*mtime* %s - %s", mtime, f)
    return found
if __name__ == "__main__": # pragma: no cover
# If running as a module, process new records (with some control flags)
from nexusLIMS.utils import setup_loggers
parser = argparse.ArgumentParser()
# Optional argument flag which defaults to False
parser.add_argument(
"-n",
"--dry-run",
action="store_true",
dest="dry_run",
default=False,
)
# Optional verbosity counter (eg. -v, -vv, -vvv, etc.)
parser.add_argument(
"-v",
"--verbose",
action="count",
default=0,
help="Verbosity (-v, -vv); corresponds to python logging level. "
"0 is WARN, 1 (-v) is INFO, 2 (-vv) is DEBUG. ERROR and "
"CRITICAL are always shown.",
)
# Specify output of "--version"
parser.add_argument(
"--version",
action="version",
version=f"%(prog)s (version {version})",
)
args = parser.parse_args()
# set up logging
logging_levels = {0: logging.WARNING, 1: logging.INFO, 2: logging.DEBUG}
if args.dry_run and args.verbose <= 0:
logger.warning('Increasing verbosity so output of "dry-run" will be shown')
args.verbose = 1
setup_loggers(logging_levels[args.verbose])
# when running as script, __name__ is "__main__", so we need to set level
# explicitly since the setup_loggers function won't find it
logger.setLevel(logging_levels[args.verbose])
# by default only fetch the last week's worth of data from the NEMO
# harvesters to speed things up
process_new_records(
dry_run=args.dry_run,
dt_from=dt.now(tz=current_system_tz()) - td(weeks=1),
)