"""Defines the NemoConnector class that is used to interface with the NEMO API."""
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Union
from urllib.parse import parse_qs, urljoin, urlparse
from pytz import timezone as pytz_timezone
from nexusLIMS.db.session_handler import Session, SessionLog, db_query
from nexusLIMS.instruments import get_instr_from_api_url, instrument_db
from nexusLIMS.utils import nexus_req
logger = logging.getLogger(__name__)
[docs]class NemoConnector:
"""
A connection to an instance of the API of the NEMO laboratory management software.
Provides helper methods for fetching data from the API.
Parameters
----------
base_url : str
The "root" of the API including a trailing slash;
e.g. 'https://nemo.url.com/api/'
token : str
An authentication token for this NEMO instance
strftime_fmt : str
The "date format" to use when encoding dates to send as filters to the
NEMO API. Should follow the same convention as
:ref:`strftime-strptime-behavior`. If ``None``, ISO 8601 format
will be used.
strptime_fmt : str
The "date format" to use when decoding date strings received in the
response from the API. Should follow the same convention as
:ref:`strftime-strptime-behavior`. If ``None``, ISO 8601 format
will be used.
timezone : str
The timezone to use when decoding date strings received in the
response from the API. Should be a IANA time zone database string; e.g.
"America/New_York". Useful if no timezone information is returned
from an instance of the NEMO API. If ``None``, no timezone setting will
be done and the code will use whatever was returned from the server
as is.
"""
tools: Dict[int, Dict]
users: Dict[int, Dict]
users_by_username: Dict[str, Dict]
projects: Dict[int, Dict]
def __init__( # pylint: disable=too-many-arguments # noqa: PLR0913
self,
base_url: str,
token: str,
strftime_fmt: Optional[str] = None,
strptime_fmt: Optional[str] = None,
timezone: Optional[str] = None,
):
self.config = {
"base_url": base_url,
"token": token,
"strftime_fmt": strftime_fmt,
"strptime_fmt": strptime_fmt,
"timezone": timezone,
}
# these attributes are used for "memoization" of NEMO content,
# so it can be remembered and used for a cache lookup
# keys should be NEMO internal IDs and values should be the
# dictionary returned by the API
self.tools = {}
self.users = {}
self.users_by_username = {}
self.projects = {}
def __repr__(self):
"""Return custom representation of a NemoConnector."""
return f"Connection to NEMO API at {self.config['base_url']}"
[docs] def strftime(self, date_dt) -> str:
"""
Convert datetime to appropriate string format for this connector.
Using the settings for this NemoConnector, convert a datetime object
to a string that will be understood by the API. If the ``strftime_fmt``
attribute for this NemoConnector is ``None``, ISO 8601 format will be
used.
Parameters
----------
date_dt
The date to be converted as a datetime object
Returns
-------
date_str : str
The date formatted as a string that will be understandable by the
API for this NemoConnector
"""
if self.config["strftime_fmt"] is None:
return date_dt.isoformat()
if (
"%z" in self.config["strftime_fmt"] or "%Z" in self.config["strftime_fmt"]
) and date_dt.tzinfo is None:
# make sure datetime is timezone aware if timezone is
# indicated in strftime_fmt. Use NEMO_tz setting if present,
# otherwise use local server timezone
if self.config["timezone"]:
date_dt = pytz_timezone(self.config["timezone"]).localize(date_dt)
else:
date_dt = date_dt.astimezone()
return date_dt.strftime(self.config["strftime_fmt"])
[docs] def strptime(self, date_str) -> datetime:
"""
Convert string to datetime using this connector's API date format.
Using the settings for this NemoConnector, convert a datetime string
representation from the API into a datetime object that can be used
in Python. If the ``strptime_fmt`` attribute for this NemoConnector
is ``None``, ISO 8601 format will be assumed. If a timezone is
specified for this server, the resulting datetime will be coerced to
that timezone.
Parameters
----------
date_str
The date formatted as a string that is returned by the
API for this NemoConnector
Returns
-------
date_dt : ~datetime.datetime
The date to be converted as a datetime object
"""
if self.config["strptime_fmt"] is None:
date_dt = datetime.fromisoformat(date_str)
else:
# to be defensive here, try without microseconds as well if ".%f"
# is in strptime_fmt and it fails (since sometimes NEMO doesn't
# write microseconds for every time, even if it's supposed to
try:
date_dt = datetime.strptime( # noqa: DTZ007
date_str,
self.config["strptime_fmt"],
)
except ValueError as exception:
if ".%f" in self.config["strptime_fmt"]:
date_dt = datetime.strptime( # noqa: DTZ007
date_str,
self.config["strptime_fmt"].replace(".%f", ""),
)
else:
raise ValueError(str(exception)) from exception # pragma: no cover
if self.config["timezone"]:
# strip any timezone information from the datetime, then localize
# with pytz to whatever timezone specified
date_dt = date_dt.replace(tzinfo=None)
date_dt = pytz_timezone(self.config["timezone"]).localize(date_dt)
return date_dt
[docs] def get_users(self, user_id: Optional[Union[int, List[int]]] = None) -> List[Dict]:
"""
Get a list of one or more users from the NEMO API in a dictionary.
The results will be cached in the NemoConnector instance to prevent multiple
API queries if not necessary.
Parameters
----------
user_id
The user(s) to fetch, as indexed by the NEMO instance (i.e.
``user_id`` should be the internal primary key used by NEMO to
identify the user. If an empty list or None is given, all users
will be returned.
Returns
-------
users : List[Dict]
A list (could be empty) of users that match the ids and/or
usernames given
Raises
------
requests.exceptions.HTTPError
Raised if the request to the API did not go through correctly
"""
params = {}
# list of user ids
if hasattr(user_id, "__iter__"):
params["id__in"] = ",".join([str(i) for i in user_id])
if all(u_id in self.users for u_id in user_id):
logger.debug(
"Using cached user info for users with id in %s: %s",
user_id,
[self.users[u]["username"] for u in user_id],
)
return [self.users[u_id] for u_id in user_id]
# single user id
else:
params["id"] = user_id
if user_id in self.users:
logger.debug(
'Using cached user info for user id %s: "%s"',
user_id,
self.users[user_id]["username"],
)
return [self.users[user_id]]
return self._get_users_helper(params)
[docs] def get_users_by_username(self, username=None) -> List:
"""
Get a list of one or more users from the NEMO API in a dictionary.
The results will be cached in the NemoConnector
instance to prevent multiple API queries if not necessary.
Parameters
----------
username : str or :obj:`list` of :obj:`str`
The user(s) to fetch, as indexed by their usernames in the NEMO
instance. If an empty list or None is given, all users
will be returned.
Returns
-------
users : list
A list (could be empty) of users that match the ids and/or
usernames given
Raises
------
requests.exceptions.HTTPError
Raised if the request to the API did not go through correctly
"""
params = {}
if isinstance(username, str):
params["username__iexact"] = username
if username in self.users_by_username:
logger.debug('Using cached user info for username "%s"', username)
return [self.users_by_username[username]]
else:
params["username__in"] = ",".join(username)
if all(uname in self.users_by_username for uname in username):
logger.debug("Using cached user info for users with id in %s", username)
return [self.users_by_username[uname] for uname in username]
return self._get_users_helper(params)
[docs] def get_projects(self, proj_id: Union[int, List[int]]) -> List[Dict]:
"""
Get a list of one or more projects from the NEMO API in a dictionary.
The local cache will be checked prior to fetching
from the API to save a network request if possible.
Parameters
----------
proj_id
The project(s) to fetch, as indexed by the NEMO instance (i.e.
``proj_id`` should be the internal primary key used by NEMO to
identify the project. If an empty list is given, all projects
will be returned.
Returns
-------
projects : List[Dict]
A list (could be empty) of projects that match the id (or ids) given
in ``proj_id``
Raises
------
requests.exceptions.HTTPError
Raised if the request to the API did not go through correctly
"""
if hasattr(proj_id, "__iter__"):
if all(p_id in self.projects for p_id in proj_id):
logger.debug(
"Using cached project info for projects %s: %s",
proj_id,
[self.projects[p]["name"] for p in proj_id],
)
return [self.projects[p_id] for p_id in proj_id]
params = {"id__in": ",".join([str(i) for i in proj_id])}
else:
if proj_id in self.projects:
logger.debug(
'Using cached project info for project %s: "%s"',
proj_id,
self.projects[proj_id]["name"],
)
return [self.projects[proj_id]]
params = {"id": proj_id}
projects = self._api_caller("GET", "projects/", params)
for params in projects:
# expand the only_allow_tools node
if "only_allow_tools" in params:
params.update(
{
"only_allow_tools": [
self.get_tools(t)[0] for t in params["only_allow_tools"]
],
},
)
self.projects[params["id"]] = params
return projects
[docs] def get_reservations(
self,
dt_from: datetime = None,
dt_to: datetime = None,
tool_id: Optional[Union[int, List[int]]] = None,
*,
cancelled: Optional[bool] = False,
) -> List[Dict]:
"""
Get reservations from the NEMO API filtered in various ways.
Return a list of reservations from the API, filtered by date
(inclusive). If only one argument is provided, the API will return all
reservations either before or after the parameter. With no arguments,
the method will return all reservations. The method will
"auto-expand" linked information such as user, project, tool, etc. so
results will have a full dictionary for each of those fields, rather
than just the index (as returned from the API).
Parameters
----------
dt_from
The "starting point" of the time range; only reservations at or
after this point in time will be returned
dt_to
The "ending point" of the time range; only reservations at
or prior to this point in time will be returned
tool_id
A tool identifier (or list of them) to limit the scope of the
reservation search (this should be the NEMO internal integer ID)
cancelled
Whether to get canceled or active reservations in the response
(default is False -- meaning non-cancelled active reservations
are returned by default). Set to None to include all reservations
regardless of whether they are cancelled or active.
Returns
-------
reservations : List[Dict]
A list (could be empty) of reservations that match the date range
supplied
"""
params = {}
if dt_from:
params["start__gte"] = self.strftime(dt_from)
if dt_to:
params["end__lte"] = self.strftime(dt_to)
if cancelled is not None:
params["cancelled"] = cancelled
if tool_id is not None:
if isinstance(tool_id, list):
params.update({"tool_id__in": ",".join([str(i) for i in tool_id])})
else:
params.update({"tool_id": str(tool_id)})
reservations = self._api_caller("GET", "reservations/", params)
parsed_reservations = []
for reservation in reservations:
parsed_reservations.append(self._parse_reservation(reservation))
return parsed_reservations
def _parse_reservation(self, reservation: dict) -> dict:
# expand various fields within the reservation data
if reservation["user"]:
user = self.get_users(reservation["user"])
if user:
reservation.update({"user": user[0]})
if reservation["creator"]:
user = self.get_users(reservation["creator"])
if user:
reservation.update({"creator": user[0]})
if reservation["tool"]:
tool = self.get_tools(reservation["tool"])
if tool:
reservation.update({"tool": tool[0]})
if reservation["project"]:
params = self.get_projects(reservation["project"])
if params:
reservation.update({"project": params[0]})
if reservation["cancelled_by"]:
user = self.get_users(reservation["cancelled_by"])
if user:
reservation.update({"cancelled_by": user[0]})
return reservation
[docs] def get_usage_events(
self,
event_id: Optional[Union[int, List[int]]] = None,
user: Optional[Union[str, int]] = None,
dt_range: Optional[Tuple[Optional[datetime], Optional[datetime]]] = None,
tool_id: Optional[Union[int, List[int]]] = None,
) -> List:
"""
Get usage events from the NEMO API filtered in various ways.
Return a list of usage events from the API, filtered by date
(inclusive). If only one argument is provided, the API will return all
reservations either before or after the parameter. With no arguments,
the method will return all reservations. The method will
"auto-expand" linked information such as user, project, tool, etc. so
results will have a full dictionary for each of those fields, rather
than just the index (as returned from the API).
Parameters
----------
event_id
The NEMO integer identifier (or a list of them) to fetch. If
``None``, the returned usage events will not be filtered by ID
number
user
The user for which to fetch usage events, as either an integer
representing their id in the NEMO instance, or as a string
containing their username. If ``None`` is given, usage events
from all users will be returned.
dt_range
The "starting" and "end" points of the time range; only usage events
starting between these points in time will be returned
tool_id
A tool identifier (or list of them) to limit the scope of the
usage event search (this should be the NEMO internal integer ID).
Regardless of what value is given, this method will always limit
the API query to tools specified in the NexusLIMS DB for this
harvester
Returns
-------
usage_events : List
A list (could be empty) of usage events that match the filters
supplied
"""
params = self._parse_dt_range(dt_range, {})
if event_id is not None:
if hasattr(event_id, "__iter__"):
params.update({"id__in": ",".join([str(i) for i in event_id])})
else:
params.update({"id": event_id})
if user:
if isinstance(user, str):
u_id = self.get_users_by_username(user)[0]["id"]
else:
u_id = user
params["user_id"] = u_id
# filtering for tool; if at the end of this block tool_id is an empty
# list, we should just immediately return an empty list, since either
# there were no tools for this connector in our DB, or the tools
# specified were not found in the DB, so we know there are no
# usage_events of interest
this_connectors_tools = self.get_known_tool_ids()
if tool_id is None:
# by default (no tool_id specified), we should fetch events from
# only the tools known to the NexusLIMS DB for this connector
tool_id = this_connectors_tools
if isinstance(tool_id, int):
# coerce tool_id to list to make subsequent processing easier
tool_id = [tool_id]
# limit tool_id to values that are present in this_connectors_tools
tool_id = [i for i in tool_id if i in this_connectors_tools]
# if tool_id is empty, we should just return
if not tool_id:
return []
params.update({"tool_id__in": ",".join([str(i) for i in tool_id])})
usage_events = self._api_caller("GET", "usage_events/", params)
parsed_events = []
for event in usage_events:
parsed_events.append(self._parse_event(event))
return parsed_events
def _parse_dt_range(
self,
dt_range: Optional[Tuple[Optional[datetime], Optional[datetime]]],
params: Dict,
) -> Dict:
if dt_range is not None:
dt_from, dt_to = dt_range
if dt_from is not None:
params["start__gte"] = self.strftime(dt_from)
if dt_to is not None:
params["end__lte"] = self.strftime(dt_to)
return params
def _parse_event(self, event: dict) -> dict:
# expand various fields within the usage event data
if event["user"]:
user = self.get_users(event["user"])
if user:
event.update({"user": user[0]})
if event["operator"]:
user = self.get_users(event["operator"])
if user:
event.update({"operator": user[0]})
if event["project"]:
proj = self.get_projects(event["project"])
if proj:
event.update({"project": proj[0]})
if event["tool"]:
tool = self.get_tools(event["tool"])
if tool:
event.update({"tool": tool[0]})
return event
[docs] def write_usage_event_to_session_log(self, event_id: int) -> None:
"""
Write a usage event to the NexusLIMS database session log.
Inserts two rows (if needed) into the ``session_log`` (marking the start
and end of a usage event), only for instruments recognized by
NexusLIMS (i.e. that have a row in the ``instruments`` table of the DB).
If the usage event has not ended yet, no action is performed.
Parameters
----------
event_id
The NEMO id number for the event to insert
"""
event = self.get_usage_events(event_id=event_id)
if event:
# get_usage_events returns list, so pick out first one
event = event[0]
tool_api_url = f"{self.config['base_url']}tools/?id={event['tool']['id']}"
instr = get_instr_from_api_url(tool_api_url)
if instr is None: # pragma: no cover
# this shouldn't happen since we limit our usage event API call
# only to instruments contained in our DB, but we can still
# defend against it regardless
logger.warning(
"Usage event %s was for an instrument (%s) not known "
"to NexusLIMS, so no records will be added to DB.",
event_id,
tool_api_url,
)
return
if event["end"] is None:
logger.warning(
"Usage event %s has not yet ended, so no records "
"will be added to DB.",
event_id,
)
return
session_id = f"{self.config['base_url']}usage_events/?id={event['id']}"
# try to insert start log
res = db_query(
"SELECT * FROM session_log WHERE session_identifier "
"= ? AND event_type = ?",
(session_id, "START"),
)
if len(res[1]) > 0:
# there was already a start log, so warn and don't do anything:
logger.warning(
"A 'START' log with session id \"%s\" was found in the the DB, "
"so a new one will not be inserted for this event",
session_id,
)
else:
start_log = SessionLog(
session_identifier=session_id,
instrument=instr.name,
# make sure to coerce format to ISO before putting in DB
timestamp=self.strptime(event["start"]).isoformat(),
event_type="START",
user=event["user"]["username"],
record_status="TO_BE_BUILT",
)
start_log.insert_log()
# try to insert end log
res = db_query(
"SELECT * FROM session_log WHERE session_identifier "
"= ? AND event_type = ?",
(session_id, "END"),
)
if len(res[1]) > 0:
# there was already an end log, so warn and don't do anything:
logger.warning(
"An 'END' log with session id \"%s\" was found in the the DB, "
"so a new one will not be inserted for this event",
session_id,
)
else:
end_log = SessionLog(
session_identifier=session_id,
instrument=instr.name,
# make sure to coerce format to ISO before putting in DB
timestamp=self.strptime(event["end"]).isoformat(),
event_type="END",
user=event["user"]["username"],
record_status="TO_BE_BUILT",
)
end_log.insert_log()
else:
logger.warning(
"No usage event with id = %s was found for %s",
event_id,
self,
)
[docs] def get_session_from_usage_event(self, event_id: int) -> Optional[Session]:
"""
Get a Session representation of a usage event.
Get a :py:class:`~nexusLIMS.db.session_handler.Session`
representation of a usage event for use in dry runs of the record
builder.
Parameters
----------
event_id
The NEMO id number for the event to insert
Returns
-------
session : ~nexusLIMS.db.session_handler.Session
A representation of the usage_event from NEMO as a
:py:class:`~nexusLIMS.db.session_handler.Session` object
"""
event = self.get_usage_events(event_id=event_id)
if event:
event = event[0]
# we cannot reliably test an unended event, so exlcude from coverage
if event["start"] is not None and event["end"] is None: # pragma: no cover
logger.warning(
"Usage event with id = %s has not yet ended for '%s'",
event_id,
self,
)
return None
instr = get_instr_from_api_url(
f"{self.config['base_url']}tools/?id={event['tool']['id']}",
)
session_id = f"{self.config['base_url']}usage_events/?id={event_id}"
session = Session(
session_identifier=session_id,
instrument=instr,
dt_range=(self.strptime(event["start"]), self.strptime(event["end"])),
user=event["user"]["username"],
)
return session
logger.warning("No usage event with id = %s was found for '%s'", event_id, self)
return None
def _get_users_helper(self, params: Dict[str, str]) -> list:
"""
Call the users API with certain parameters.
Parameters
----------
params
A dictionary of query parameters for the API query
Returns
-------
users
A list (could be empty) of users that match the ids and/or
usernames given
"""
users = self._api_caller("GET", "users/", params)
for user in users:
# cache the users response by ID and username
self.users[user["id"]] = user
self.users_by_username[user["username"]] = user
return users
def _api_caller(
self,
verb: str,
endpoint: str,
params: Dict[str, str],
) -> List[Dict[str, Any]]:
"""
Make a call to the NEMO API.
Helper function to deduplicate actual calls to the API. Takes care of
building the full URL from the base URL and specific endpoint,
as well as authentication, passing of parameters, and parsing the
results.
Parameters
----------
verb
The ``requests`` verb (``'POST'``, ``'GET'``,
``'PATCH'``, etc.) to use for the API request
endpoint
The API endpoint to use. Should be formatted with a trailing
slash and no leading slash. i.e. the endpoint for Projects data
should be supplied as ``'projects/'``
params
The URL parameters to pass along with the request. These are
generally filters for the API data such as ``id`` or a date range
or something similar.
Returns
-------
results
The API response, formatted as a list of dict objects
"""
url = urljoin(self.config["base_url"], endpoint)
logger.info("getting data from %s with parameters %s", url, params)
response = nexus_req(url, verb, token_auth=self.config["token"], params=params)
response.raise_for_status()
return response.json()