rmellipse.workflows.archive_interface

Exceptions

ReleaseRecordNotFoundError

Raised when a requested record doesn't exist.

Classes

ArchiveInterface

Interface for interacting with archives, used by the command line utility functions.

CDCSArchive

Archive Interface for a CDCS instance.

FileSystemArchive

Archive Interface for a file system archive.

Functions

is_url(input_string)

chunked_sha(f[, chunk])

Generate a SHA1 for a file in chunks.

get_credentials(host[, user, password])

Get credentials for a given host and user.

get_interface(host[, user, password, resolve_relative_paths_to]) → ArchiveInterface

Log in to an archive and return an ArchiveInterface.

Module Contents

exception rmellipse.workflows.archive_interface.ReleaseRecordNotFoundError

Bases: Exception

Raised when a requested record doesn’t exist.

rmellipse.workflows.archive_interface.is_url(input_string)

rmellipse.workflows.archive_interface.chunked_sha(f: io.BytesIO, chunk: int = 2**25)

Generate a SHA1 for a file in chunks.

Parameters:
  • f (io.BytesIO) – Binary file object to hash.

  • chunk (int, optional) – Number of bytes to read at a time, by default 2**25 (32 MiB)

Returns:

SHA1 hash as a string.

Return type:

str
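The chunked approach can be sketched with the standard library's hashlib. This is a minimal sketch, not necessarily the module's implementation; it assumes the function reads f until end of file and returns the hexadecimal digest (hexdigest), which the description above does not specify.

```python
import hashlib
import io
from typing import BinaryIO


def chunked_sha(f: BinaryIO, chunk: int = 2**25) -> str:
    """Return the SHA1 hex digest of f, reading at most `chunk` bytes at a time."""
    sha = hashlib.sha1()
    while True:
        block = f.read(chunk)
        if not block:  # b"" signals end of file
            break
        sha.update(block)
    return sha.hexdigest()


data = b"hello world" * 1000
# Small chunks produce the same digest as hashing the whole buffer at once.
assert chunked_sha(io.BytesIO(data), chunk=7) == hashlib.sha1(data).hexdigest()
```

Memory use stays bounded by chunk regardless of file size, which is the point of hashing large blobs this way.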

rmellipse.workflows.archive_interface.get_credentials(host: str, user: str = None, password: str = None)

Get credentials for a given host and user.

Prompts the user for a missing username or password. Credentials provided by prompt or as function arguments are stored on the system.

Parameters:
  • host (str) – System host name, typically a URL.

  • user (str, optional) – Username, by default None

  • password (str, optional) – Password, by default None

Returns:

  • user – str

  • password – str

rmellipse.workflows.archive_interface.get_interface(host: str | pathlib.Path, user: str = None, password: str = None, resolve_relative_paths_to: str | pathlib.Path = Path.cwd()) → ArchiveInterface

Log in to an archive and return an ArchiveInterface.

Parameters:
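  • host (str | Path) – Location of the archive: a URL or a file path.

  • user (str, optional) – Username for the archive, if required.

  • password (str, optional) – Password for the archive, if required.

  • resolve_relative_paths_to (str | Path, optional) – Directory against which relative paths are resolved, by default Path.cwd() (evaluated once, when the module is imported).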

Returns:

Object that implements the ArchiveInterface.

Return type:

ArchiveInterface
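How get_interface picks a backend is not spelled out above. A plausible reading, given the is_url helper and the two concrete classes, is that URLs map to CDCSArchive and file paths to FileSystemArchive. The sketch below assumes that dispatch rule; the is_url heuristic and the stand-in classes are hypothetical simplifications, not the module's code.

```python
from __future__ import annotations

from pathlib import Path
from urllib.parse import urlparse


def is_url(input_string: str) -> bool:
    """Heuristic: treat http(s) strings with a network location as URLs."""
    parsed = urlparse(input_string)
    return parsed.scheme in ("http", "https") and bool(parsed.netloc)


class CDCSArchive:
    """Stand-in for the real CDCS backend."""
    def __init__(self, host: str, user: str | None = None, password: str | None = None):
        self.host, self.user, self.password = host, user, password


class FileSystemArchive:
    """Stand-in for the real file system backend."""
    def __init__(self, host: str, user: str | None = None):
        self.host, self.user = host, user


def get_interface(host: str | Path, user: str | None = None, password: str | None = None,
                  resolve_relative_paths_to: str | Path | None = None):
    """Return a CDCS backend for URLs and a file system backend for paths."""
    if is_url(str(host)):
        return CDCSArchive(str(host), user, password)
    # Resolve the default at call time rather than at import time.
    base = Path.cwd() if resolve_relative_paths_to is None else Path(resolve_relative_paths_to)
    path = Path(host)
    if not path.is_absolute():
        path = base / path
    return FileSystemArchive(str(path), user)
```

The sketch defaults resolve_relative_paths_to to None and calls Path.cwd() inside the function. A default written as Path.cwd() in the def line, as in the documented signature, is evaluated once when the module is imported, so a later os.chdir does not change it.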

class rmellipse.workflows.archive_interface.ArchiveInterface(host: str | pathlib.Path, user: str = None, password: str = None)

Bases: abc.ABC

Interface for interacting with archives, used by the command line utility functions.

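Every archive backend should implement this interface.

Parameters:
  • host (str | Path) – Location of the archive: a URL or a file path.

  • user (str, optional) – Username for the archive, if required.

  • password (str, optional) – Password for the archive, if required.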

host
user = None
get_release_records
download_release(record: dict, workspace: str, max_threads: int, release_cache_folder: str | pathlib.Path, download_chunk_bytes: int, progress_bar: bool = True, file_status: bool = True)

Download a release from the archive to release_cache_folder.

Generates worker threads that download blobs with the download_blob method. Provides progress bars if requested.

Parameters:
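  • record (dict) – Release record of the release to download.

  • workspace (str) – Workspace containing the release.

  • max_threads (int) – Maximum number of download threads.

  • release_cache_folder (str | Path) – Local folder the release is downloaded into.

  • download_chunk_bytes (int) – Chunk size in bytes used when downloading each blob.

  • progress_bar (bool, optional) – Show progress bars, by default True.

  • file_status (bool, optional) – Report the status of individual files, by default True.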
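The thread-per-blob download pattern can be sketched with concurrent.futures.ThreadPoolExecutor. In this hypothetical sketch, a release is reduced to a {relative path: blob PID} mapping, blobs come from an in-memory BLOBS dictionary instead of an archive, and each worker reports progress through its own update_dict, which the main thread polls where a progress bar would be drawn.

```python
from __future__ import annotations

import io
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

# Hypothetical in-memory archive contents: blob PID -> bytes.
BLOBS = {"pid-a": b"a" * 10_000, "pid-b": b"b" * 25_000}


def download_blob(blob_pid: str, target_file: Path, update_dict: dict, chunk_size: int) -> None:
    """Copy one blob to target_file, reporting progress through update_dict."""
    source = io.BytesIO(BLOBS[blob_pid])
    target_file.parent.mkdir(parents=True, exist_ok=True)
    with open(target_file, "wb") as out:
        while True:
            block = source.read(chunk_size)
            if not block:
                break
            out.write(block)
            update_dict["size"] += len(block)
    update_dict["finished"] = True


def download_release(files: dict[str, str], cache_folder: Path, max_threads: int, chunk_size: int) -> dict[str, dict]:
    """Download every {relative path: blob PID} entry in files with a thread pool."""
    # One progress dict per file; each is written by exactly one worker thread.
    status = {rel: {"size": 0, "finished": False} for rel in files}
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        futures = [
            pool.submit(download_blob, pid, cache_folder / rel, status[rel], chunk_size)
            for rel, pid in files.items()
        ]
        while not all(f.done() for f in futures):
            # A progress bar would be redrawn from `status` here.
            time.sleep(0.05)
        for f in futures:
            f.result()  # re-raise any exception raised inside a worker
    return status


with tempfile.TemporaryDirectory() as tmp:
    status = download_release({"data/a.bin": "pid-a", "b.bin": "pid-b"}, Path(tmp), max_threads=2, chunk_size=4096)
    b_contents = (Path(tmp) / "b.bin").read_bytes()
```

Polling the futures rather than the finished flags means a failed worker cannot leave the main loop waiting forever.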

abstract download_blob(blob_pid: str, target_file: pathlib.Path, update_dict: dict, expected_size: int, release_record=dict, workspace=str, chunk_size=2**25)

Stream a blob stored in the archive to your local computer.

Parameters:
  • blob_pid (str) – PID of the blob.

  • target_file (Path) – Target file in local path.

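  • update_dict (dict) – Dictionary that is updated with the download progress; used to monitor the download when it runs in a worker thread.

  • expected_size (int) – Expected size of the blob in bytes.

  • release_record (dict) – Full release record.

  • workspace (str) – Workspace to look through.

  • chunk_size (int, optional) – Chunk size in bytes for downloading, by default 2**25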

abstract process_and_upload_blob(posix_rel_path, working_dir, release_title_versionless, release_title, workspace_title, chunk_size, verbose=False) → Tuple[str, int]

Upload a blob to the archive.

Returns the blob PID and the number of bytes uploaded.

Parameters:
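  • posix_rel_path (Path) – Path of the file to upload, relative to working_dir, in POSIX form.

  • working_dir (Path) – Working directory of the release; all paths are relative to it.

  • release_title_versionless (str) – Name of the release without the version code.

  • release_title (str) – Name of the release with the version code.

  • workspace_title (str) – Name of the workspace to upload to.

  • chunk_size (int) – Chunk size in bytes for uploading.

  • verbose (bool, optional) – Print information, by default False.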

Returns:

  • blob_pid (str) – The PID of the blob (i.e. file) that was uploaded

  • nbytes (int) – The size of the uploaded blob in bytes.

upload_blobs_and_update_mapping(release_title: str, release_title_versionless: str, workspace: str, project_mapping: dict, project_directory: str | pathlib.Path, max_threads: int, chunk_size: int, no_blobs: bool = False)

Upload blobs to the archive and update the project mapping.

The project mapping metadata for each blob is updated with information that is only determined at upload time (e.g. the PID and size in bytes). Should be called before upload_release_record.

Parameters:
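  • release_title (str) – Name of the release with the version code.

  • release_title_versionless (str) – Name of the release without the version code.

  • workspace (str) – Name of the workspace to upload to.

  • project_mapping (dict) – Project mapping dictionary.

  • project_directory (str | Path) – Root directory of the project.

  • max_threads (int) – Maximum number of threads for upload processes. Each blob gets its own process.

  • chunk_size (int) – Maximum chunk size in bytes for uploading.

  • no_blobs (bool, optional) – Don't upload blobs, by default False. For debugging purposes only.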
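A minimal sketch of updating the project mapping with upload-time metadata, assuming the mapping is keyed by relative file path and that the PID and size are stored under hypothetical 'pid' and 'bytes' keys. upload_one is a hypothetical stand-in for process_and_upload_blob.

```python
from __future__ import annotations

import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def upload_one(path: Path) -> tuple[str, int]:
    """Hypothetical uploader returning (blob_pid, nbytes), like process_and_upload_blob."""
    return f"pid:{path.name}", len(path.read_bytes())


def upload_blobs_and_update_mapping(project_mapping: dict, project_directory: Path, max_threads: int, no_blobs: bool = False) -> dict:
    """Upload each mapped file and store its upload-time PID and size in the mapping."""
    if no_blobs:
        return project_mapping  # debugging mode: nothing is uploaded
    rel_paths = list(project_mapping)
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        results = list(pool.map(lambda rel: upload_one(project_directory / rel), rel_paths))
    # Only the main thread mutates the mapping, after all uploads have finished.
    for rel, (pid, nbytes) in zip(rel_paths, results):
        project_mapping[rel]["pid"] = pid
        project_mapping[rel]["bytes"] = nbytes
    return project_mapping


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "a.txt").write_bytes(b"12345")
    mapping = upload_blobs_and_update_mapping({"a.txt": {}}, root, max_threads=2)

print(mapping)  # {'a.txt': {'pid': 'pid:a.txt', 'bytes': 5}}
```

Once the mapping carries every PID, it can be embedded in the release record that upload_release_record sends, which is why the order of the two calls matters.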

abstract upload_release_record(release_record: dict) → str

Upload a release record.

The record should be assigned a PID during the upload process.

Parameters:
  • release_record (dict) – Release record dictionary.

Returns:

PID of uploaded object.

Return type:

str
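Because ArchiveInterface is an abc.ABC, a new backend is written by subclassing it and implementing the abstract methods. The sketch below uses a self-contained stand-in base class (ArchiveInterfaceSketch, hypothetical) that mirrors the documented abstract methods, and a toy InMemoryArchive whose PID formats are invented for illustration.

```python
from abc import ABC, abstractmethod
from pathlib import Path


class ArchiveInterfaceSketch(ABC):
    """Hypothetical stand-in that mirrors the abstract methods documented above."""

    def __init__(self, host, user=None, password=None):
        self.host = host
        self.user = user

    @abstractmethod
    def download_blob(self, blob_pid, target_file, update_dict, expected_size,
                      release_record, workspace, chunk_size=2**25): ...

    @abstractmethod
    def process_and_upload_blob(self, posix_rel_path, working_dir, release_title_versionless,
                                release_title, workspace_title, chunk_size, verbose=False): ...

    @abstractmethod
    def upload_release_record(self, release_record): ...


class InMemoryArchive(ArchiveInterfaceSketch):
    """Toy backend that keeps blobs and release records in dictionaries."""

    def __init__(self, host, user=None, password=None):
        super().__init__(host, user, password)
        self.blobs = {}
        self.records = {}

    def download_blob(self, blob_pid, target_file, update_dict, expected_size,
                      release_record, workspace, chunk_size=2**25):
        data = self.blobs[blob_pid]
        Path(target_file).write_bytes(data)
        update_dict.update(size=len(data), finished=True)

    def process_and_upload_blob(self, posix_rel_path, working_dir, release_title_versionless,
                                release_title, workspace_title, chunk_size, verbose=False):
        data = Path(working_dir, posix_rel_path).read_bytes()
        blob_pid = f"mem://{workspace_title}/{release_title}/{posix_rel_path}"
        self.blobs[blob_pid] = data
        return blob_pid, len(data)

    def upload_release_record(self, release_record):
        # The PID is assigned during upload, as the interface requires.
        pid = f"mem://records/{len(self.records)}"
        self.records[pid] = dict(release_record)
        return pid


archive = InMemoryArchive("memory")
record_pid = archive.upload_release_record({"title": "demo-v0.1.0"})
```

Python refuses to instantiate a subclass that leaves any abstract method unimplemented (it raises TypeError), which is what enforces the interface. When calling process_and_upload_blob, keyword arguments are the safer choice: the CDCSArchive signature documented below lists release_title before release_title_versionless, while the base class and FileSystemArchive use the opposite order.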

class rmellipse.workflows.archive_interface.CDCSArchive(host: str, user: str, password: str = None)

Bases: ArchiveInterface

Archive Interface for a CDCS instance.

curator
supports_repeat_releases = True
get_release_records(title_versionless: str, version_expressions: list[str], workspace='Global Public Workspace') → dict

Get release records matching the given version expressions.

Parameters:
  • title_versionless (str) – Title of the release without the version.

  • version_expressions (list[str]) – Version expressions, e.g. [">=0.1.0", "<0.2.0"] or ["==0.1.2"].

  • workspace (str, optional) – Workspace to look through, by default “Global Public Workspace”

Returns:

Dictionary of release records keyed in {title}-v{version} format, sorted from oldest to most recent version according to semantic versioning (semver).

Return type:

dict
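Version filtering and semver ordering can be sketched as below. This sketch handles only plain MAJOR.MINOR.PATCH versions and the operators >=, <=, ==, !=, > and <; the real method may rely on a full semver library that also handles pre-release tags. The input shape (records keyed by version string) and the helper names are hypothetical.

```python
from __future__ import annotations

import operator
import re

_OPS = {">=": operator.ge, "<=": operator.le, "==": operator.eq,
        "!=": operator.ne, ">": operator.gt, "<": operator.lt}


def parse_version(text: str) -> tuple[int, int, int]:
    """Parse a plain MAJOR.MINOR.PATCH string (no pre-release tags)."""
    major, minor, patch = (int(part) for part in text.split("."))
    return major, minor, patch


def matches(version: str, expressions: list[str]) -> bool:
    """True if version satisfies every expression, e.g. [">=0.1.0", "<0.2.0"]."""
    for expr in expressions:
        m = re.fullmatch(r"(>=|<=|==|!=|>|<)\s*(\d+\.\d+\.\d+)", expr.strip())
        if m is None:
            raise ValueError(f"unsupported version expression: {expr!r}")
        op, bound = m.groups()
        if not _OPS[op](parse_version(version), parse_version(bound)):
            return False
    return True


def select_release_records(title_versionless: str, records: dict[str, dict],
                           version_expressions: list[str]) -> dict[str, dict]:
    """Keep matching versions, keyed as '{title}-v{version}' and ordered oldest first."""
    selected = sorted((v for v in records if matches(v, version_expressions)), key=parse_version)
    return {f"{title_versionless}-v{v}": records[v] for v in selected}


records = {
    "0.2.0": {"version": "0.2.0"},
    "0.1.10": {"version": "0.1.10"},
    "0.1.2": {"version": "0.1.2"},
}
result = select_release_records("demo", records, [">=0.1.0", "<0.2.0"])
print(list(result))  # ['demo-v0.1.2', 'demo-v0.1.10']
```

Sorting on integer tuples matters here: a plain string sort would place 0.1.10 before 0.1.2.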

download_blob(blob_pid: str, target_file: str | pathlib.Path, update_dict: dict, expected_size: int, release_record=dict, workspace=str, chunk_size: int = 2**25)

Download the blob identified by a PID to a local file.

Parameters:
  • blob_pid (str) – PID of the blob; should be a URL.

  • target_file (str | Path) – Target path to download to.

  • update_dict (dict) – Dictionary of the form {'size': 0, 'finished': False}, used to monitor the download when it runs in a worker thread.

  • expected_size (int) – Expected size of the blob in bytes.

  • chunk_size (int, optional) – Chunk size for downloading, by default 2**25
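Chunked streaming with progress reporting can be sketched as follows. For a CDCS instance the source would be an HTTP response body; here an io.BytesIO stands in for it, and the size check against expected_size is an assumption about how that parameter is used. stream_to_file is a hypothetical name.

```python
from __future__ import annotations

import io
import tempfile
from pathlib import Path
from typing import BinaryIO


def stream_to_file(source: BinaryIO, target_file: str | Path, update_dict: dict,
                   expected_size: int, chunk_size: int = 2**25) -> None:
    """Copy source into target_file chunk by chunk, publishing progress in update_dict."""
    update_dict["size"] = 0
    update_dict["finished"] = False
    with open(target_file, "wb") as out:
        while True:
            block = source.read(chunk_size)
            if not block:
                break
            out.write(block)
            # Another thread can poll this value to drive a progress bar.
            update_dict["size"] += len(block)
    if update_dict["size"] != expected_size:
        raise OSError(f"expected {expected_size} bytes, received {update_dict['size']}")
    update_dict["finished"] = True


payload = b"x" * 100_000
status = {"size": 0, "finished": False}
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "blob.bin"
    stream_to_file(io.BytesIO(payload), target, status, expected_size=len(payload), chunk_size=4096)
    copied = target.read_bytes()
```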

process_and_upload_blob(posix_rel_path: pathlib.Path, working_dir: pathlib.Path, release_title: str, release_title_versionless: str, workspace_title: str, chunk_size: int, verbose: bool = False) → tuple[str]

Upload a file to a CDCS workspace.

Parameters:
  • posix_rel_path (Path) – Path of the file to upload, relative to working_dir, in POSIX form.

  • working_dir (Path) – Working directory of the release, from which all paths are relative.

  • release_title (str) – Name of the release with the version code. Not required by the CDCS archive, but included for interface compatibility.

  • release_title_versionless (str) – Name of the release without the version code.

  • chunk_size (int) – Chunk size. Not used by the CDCS archive.

  • workspace_title (str) – Name of the workspace to upload to.

Returns:

  • blob_pid (str) – PID of the uploaded blob.

  • nbytes (int) – Size of the file in bytes.

upload_release_record(release_record: dict, workspace: str) → str

Upload a release record.

The record should be assigned a PID during the upload process.

Parameters:
  • release_record (dict) – Release record dictionary.

  • workspace (str) – Name of the workspace to upload the record to.

Returns:

PID of uploaded object.

Return type:

str

class rmellipse.workflows.archive_interface.FileSystemArchive(host: str, user: str)

Bases: ArchiveInterface

Archive Interface for a file system archive.

A file system archive is stored in a directory with a standard layout. Access permissions are governed by the file system's own permissions.

archive_path
host = ''
get_release_records(title_versionless: str, version_expressions: list[str], workspace='Global Public Workspace') → dict

Get release records matching a version code.

Parameters:
  • title_versionless (str) – Title of the release without the version.

  • version_expressions (list[str]) – Version expressions, e.g. [">=0.1.0", "<0.2.0"] or ["==0.1.2"].

  • workspace (str, optional) – Workspace to look through, by default “Global Public Workspace”

Returns:

Dictionary of release records keyed in {title}-v{version} format, sorted from oldest to most recent version according to semantic versioning (semver).

Return type:

dict

download_blob(blob_pid: str, target_file: str | pathlib.Path, update_dict: dict, expected_size: int, release_record: dict, workspace: str, chunk_size: int = 2**25)

Download the blob identified by a PID to a local file.

Parameters:
  • blob_pid (str) – PID of the blob; should be a URL.

  • target_file (str | Path) – Target path to download to.

  • update_dict (dict) – Dictionary of the form {'size': 0, 'finished': False}, used to monitor the download when it runs in a worker thread.

  • expected_size (int) – Expected size of the blob in bytes.

  • release_record (dict) – Full release record.

  • workspace (str) – Workspace to look through.

  • chunk_size (int, optional) – Chunk size for downloading, by default 2**25

process_and_upload_blob(posix_rel_path: pathlib.Path, working_dir: pathlib.Path, release_title_versionless: str, release_title: str, workspace_title: str, chunk_size: int, verbose: bool = False) → tuple[str]

Upload a file to the file system archive.

Parameters:
  • posix_rel_path (Path) – Path of the file to upload, relative to working_dir, in POSIX form.

  • working_dir (Path) – Working directory of the project.

  • release_title_versionless (str) – Name of the release without the version code.

  • release_title (str) – Name of the release with the version code.

  • verbose (bool, optional) – Print information, by default False.

  • workspace_title (str) – Name of the workspace.

  • chunk_size (int) – Chunk size in bytes for uploading.

Returns:

  • blob_pid (str) – PID of the uploaded blob.

  • nbytes (int) – Size of the file in bytes.
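A file system archive can store a blob by copying the file into the archive directory. The layout below (<archive>/<workspace>/<release title>/<relative path>) and the use of the archive-relative path as the PID are assumptions for illustration; the standard layout used by FileSystemArchive is not described here. upload_to_directory is a hypothetical name.

```python
from __future__ import annotations

import shutil
import tempfile
from pathlib import Path, PurePosixPath


def upload_to_directory(posix_rel_path: str, working_dir: str | Path, archive_path: str | Path,
                        workspace_title: str, release_title: str) -> tuple[str, int]:
    """Copy one project file into a hypothetical archive layout; return (blob_pid, nbytes)."""
    parts = PurePosixPath(posix_rel_path).parts
    source = Path(working_dir, *parts)
    archive_path = Path(archive_path)
    # Hypothetical layout: <archive>/<workspace>/<release title>/<relative path>
    target = archive_path.joinpath(workspace_title, release_title, *parts)
    target.parent.mkdir(parents=True, exist_ok=True)
    shutil.copyfile(source, target)
    # In this sketch the PID is the archive-relative POSIX path of the copy.
    blob_pid = target.relative_to(archive_path).as_posix()
    return blob_pid, target.stat().st_size


with tempfile.TemporaryDirectory() as tmp:
    project = Path(tmp, "project")
    (project / "data").mkdir(parents=True)
    (project / "data" / "s11.csv").write_bytes(b"1,2,3\n")
    blob_pid, nbytes = upload_to_directory("data/s11.csv", project, Path(tmp, "archive"),
                                           "ws", "demo-v0.1.0")
```

Splitting the POSIX path into parts before joining keeps the sketch portable, since the stored relative paths use forward slashes on every platform.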

upload_release_record(release_record: dict, workspace_title: str) → str

Upload a release record.

The record should be assigned a PID during the upload process.

Parameters:
  • release_record (dict) – Release record dictionary.

  • workspace_title (str) – Name of the workspace to upload the record to.

Returns:

PID of uploaded object.

Return type:

str