rmellipse.workflows.archive_interface

Exceptions

ReleaseRecordNotFoundError

Raised when a record is requested but does not exist.

Classes

ArchiveInterface

Interface for interacting with archives, used by the command line utility functions.

CDCSArchive

Archive Interface for a CDCS instance.

FileSystemArchive

Archive Interface for a file system archive.

Functions

is_url(input_string)

chunked_sha(f[, chunk])

Generate a SHA1 for a file in chunks.

get_credentials(host[, user, password])

Get credentials for a given host and user.

get_interface(host[, user, password, resolve_relative_paths_to]) → ArchiveInterface

Log in to an archive and return an ArchiveInterface.

Module Contents

exception rmellipse.workflows.archive_interface.ReleaseRecordNotFoundError

Bases: Exception

Raised when a record is requested but does not exist.

rmellipse.workflows.archive_interface.is_url(input_string)
rmellipse.workflows.archive_interface.chunked_sha(f: io.BytesIO, chunk: int = 2**25)

Generate a SHA1 for a file in chunks.

Parameters:
f: io.BytesIO
chunk: int, optional

Number of bytes to read per chunk, by default 2**25 (32 MiB).

Returns:
str

SHA1 hash as a string.
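
The chunked hashing described above can be sketched as follows. This is a minimal illustration built on the standard library's hashlib, not the library's own code; the name chunked_sha1_sketch and the hexadecimal output format are assumptions.

```python
import hashlib
import io
from typing import BinaryIO


def chunked_sha1_sketch(f: BinaryIO, chunk: int = 2**25) -> str:
    """Hash a binary stream chunk by chunk (illustrative, hypothetical helper)."""
    digest = hashlib.sha1()
    while True:
        block = f.read(chunk)
        if not block:  # an empty read marks the end of the stream
            break
        digest.update(block)
    return digest.hexdigest()


# Hashing in small chunks gives the same result as hashing everything at once.
data = b"example payload" * 1000
assert chunked_sha1_sketch(io.BytesIO(data), chunk=64) == hashlib.sha1(data).hexdigest()
```

Reading in fixed-size chunks keeps memory use bounded by the chunk size, which matters for large blobs.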

rmellipse.workflows.archive_interface.get_credentials(host: str, user: str = None, password: str = None)

Get credentials for a given host and user.

Prompts user for missing username or password. Stores credentials in the system when provided by prompt or as function arguments.

Parameters:
host: str

System host name, typically a URL.

user: str, optional

Username, by default None.

password: str, optional

Password, by default None.

Returns:
user: str

password: str

rmellipse.workflows.archive_interface.get_interface(host: str | pathlib.Path, user: str = None, password: str = None, resolve_relative_paths_to: str | pathlib.Path = Path.cwd()) → ArchiveInterface

Log in to an archive and return an ArchiveInterface.

Parameters:
host: str | Path

Path to the archive, either a URL or a file path.

user: str, optional

User name for the archive (if required).

password: str, optional

Password for the archive (if required).

Returns:
ArchiveInterface

Object that implements the ArchiveInterface.
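
Since host may be a URL or a file path, one plausible way to choose between a remote archive and a file system archive is sketched below. The dispatch rule, the helper names looks_like_url and choose_archive_kind, and the handling of relative paths are all assumptions made for illustration; the source does not state how get_interface decides.

```python
from pathlib import Path
from typing import Tuple, Union
from urllib.parse import urlparse


def looks_like_url(text: str) -> bool:
    """Hypothetical URL test: an http(s) scheme plus a network location."""
    parts = urlparse(text)
    return parts.scheme in ("http", "https") and bool(parts.netloc)


def choose_archive_kind(
    host: Union[str, Path],
    resolve_relative_paths_to: Union[str, Path] = ".",
) -> Tuple[str, Union[str, Path]]:
    """Classify host as a remote URL or a (resolved) file system path."""
    if isinstance(host, str) and looks_like_url(host):
        return ("remote", host)
    path = Path(host)
    if not path.is_absolute():
        # Relative paths are anchored at the given base directory.
        path = Path(resolve_relative_paths_to) / path
    return ("filesystem", path)
```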

class rmellipse.workflows.archive_interface.ArchiveInterface(host: str | pathlib.Path, user: str = None, password: str = None)

Bases: abc.ABC

Interface for interacting with archives, used by the command line utility functions.

Every archive backend should implement this interface.

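Parameters:
host: str | Path

Path to the archive, either a URL or a file path.

user: str, optional

User name for the archive (if required).

password: str, optional

Password for the archive (if required).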

host
user = None
get_release_records
download_release(record: dict, workspace: str, max_threads: int, release_cache_folder: str | pathlib.Path, download_chunk_bytes: int, progress_bar: bool = True, file_status: bool = True)

Download a release from the archive to release_cache_folder.

Generates worker threads to download blobs using the download_blob function. Provides progress bars if requested.

Parameters:
record: dict

Release record to download.

max_threads: int

Maximum number of worker threads.

progress_bar: bool, optional

Show progress bars, by default True.
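
The worker-thread pattern described for download_release can be sketched with a thread pool. This is an illustration only: fake_download_blob and download_all are hypothetical names, the blob sizes are made up, and the progress dictionaries follow the {'size': 0, 'finished': False} form documented for the concrete download_blob implementations.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict


def fake_download_blob(blob_pid: str, update_dict: dict, expected_size: int, chunk_size: int = 4) -> None:
    """Hypothetical stand-in for download_blob: 'receives' bytes chunk by chunk."""
    while update_dict["size"] < expected_size:
        update_dict["size"] = min(update_dict["size"] + chunk_size, expected_size)
    update_dict["finished"] = True


def download_all(blobs: Dict[str, int], max_threads: int) -> Dict[str, dict]:
    """Download every blob in a pool of worker threads and return the progress dicts."""
    progress = {pid: {"size": 0, "finished": False} for pid in blobs}
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        futures = [
            pool.submit(fake_download_blob, pid, progress[pid], size)
            for pid, size in blobs.items()
        ]
        for future in futures:
            future.result()  # re-raise any exception from a worker
    return progress
```

Because each worker only writes to its own progress dictionary, a monitoring thread can poll these dictionaries to drive a progress bar without extra locking.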

abstract download_blob(blob_pid: str, target_file: pathlib.Path, update_dict: dict, expected_size: int, release_record=dict, workspace=str, chunk_size=2**25)

Stream a blob stored in the archive to your local computer.

Parameters:
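blob_pid: str

PID of the blob.

target_file: Path

Local path of the target file.

update_dict: dict

Dictionary updated with the download progress, used to monitor the download from other threads.

expected_size: int

Expected size of the blob in bytes.

chunk_size: int, optional

Chunk size for downloading, by default 2**25.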

abstract process_and_upload_blob(posix_rel_path, working_dir, release_title_versionless, release_title, workspace_title, chunk_size, verbose=False) → Tuple[str, int]

Upload a blob to the archive.

Returns the blob_PID and the number of bytes.

Parameters:
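posix_rel_path: Path

Path of the file to upload, relative to working_dir.

working_dir: Path

Working directory of the release, from which all paths are relative.

release_title_versionless: str

Name of the release without the version code.

release_title: str

Name of the release with the version code.

workspace_title: str

Name of the workspace to upload to.

chunk_size: int

Size of the upload chunks.

verbose: bool, optional

Print information, by default False.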

Returns:
blob_pid: str

The PID of the blob (i.e. file) that was uploaded.

nbytes: int

The size of the uploaded blob in bytes.

upload_blobs_and_update_mapping(release_title: str, release_title_versionless: str, workspace: str, project_mapping: dict, project_directory: str | pathlib.Path, max_threads: int, chunk_size: int, no_blobs: bool = False)

Upload blobs to the archive and update the project mapping.

The project mapping metadata for each blob is updated with information that is only determined at upload time (i.e. the PID and the size in bytes). Should be called before upload_release_record.

Parameters:
project_mapping: dict

Project mapping dictionary.

project_directory: str | Path

Root directory of the project.

max_threads: int

Max threads for upload processes. Each blob gets its own process.

chunk_size: int

Max size of chunks for uploading.

no_blobs: bool, optional

Don't upload blobs; for debugging purposes only. By default False.
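
The update step described above, where each blob's metadata gains its PID and size once the upload has happened, might be sketched as follows. The layout of the project mapping (relative path to metadata dictionary), the keys "pid" and "bytes", and the function name update_mapping_sketch are assumptions for illustration, and threads are used here in place of whatever workers the library starts.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Tuple


def update_mapping_sketch(
    project_mapping: Dict[str, dict],
    upload: Callable[[str], Tuple[str, int]],
    max_threads: int,
    no_blobs: bool = False,
) -> Dict[str, dict]:
    """Upload every mapped file and record its PID and size in the mapping."""
    if no_blobs:
        return project_mapping  # debugging mode: leave the mapping untouched
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        # pool.map preserves input order, so results line up with the keys.
        results = dict(zip(project_mapping, pool.map(upload, project_mapping)))
    for rel_path, (blob_pid, nbytes) in results.items():
        project_mapping[rel_path]["pid"] = blob_pid
        project_mapping[rel_path]["bytes"] = nbytes
    return project_mapping
```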

abstract upload_release_record(release_record: dict) → str

Upload a release record.

The record should be assigned a PID during the upload process.

Parameters:
title: str

Title of the release with version code.

release_record: dict

Release record dictionary.

Returns:
str

PID of uploaded object.
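
To illustrate what implementing the interface involves, the sketch below uses a reduced, hypothetical stand-in (MiniArchive) with a single abstract method, plus an in-memory backend that assigns a PID at upload time. Neither class exists in rmellipse, and the "mem-N" PID scheme is invented for the example.

```python
from abc import ABC, abstractmethod


class MiniArchive(ABC):
    """Hypothetical, reduced stand-in for ArchiveInterface."""

    @abstractmethod
    def upload_release_record(self, release_record: dict) -> str:
        """Store the record and return its PID."""


class InMemoryArchive(MiniArchive):
    """Toy backend that keeps release records in a dictionary."""

    def __init__(self) -> None:
        self.records = {}

    def upload_release_record(self, release_record: dict) -> str:
        # The PID is assigned during the upload, as the interface requires.
        pid = f"mem-{len(self.records) + 1}"
        self.records[pid] = dict(release_record, pid=pid)
        return pid
```

A subclass that leaves an abstract method unimplemented cannot be instantiated, so a missing method surfaces when the backend object is created rather than in the middle of an upload.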

class rmellipse.workflows.archive_interface.CDCSArchive(host: str, user: str, password: str = None)

Bases: ArchiveInterface

Archive Interface for a CDCS instance.

curator
supports_repeat_releases = True
get_release_records(title_versionless: str, version_expressions: list[str], workspace='Global Public Workspace') → dict

Get release records matching a version code.

Parameters:
title_versionless: str

Title of the release without the version.

version_expressions: list[str]

Version expressions to match (e.g. [">=0.1.0", "<0.2.0"] or ["==0.1.2"]).

workspace: str, optional

Workspace to look through, by default "Global Public Workspace".

Returns:
dict

Dictionary of release records. Keys use the {title}-v{version} format and are sorted from oldest to most recent version following semver.
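
The matching and ordering described above can be sketched in plain Python. This is an illustration, not the library's matching logic: the helper names are hypothetical, the input layout (version string to record) is assumed, and only plain MAJOR.MINOR.PATCH versions are handled, without semver pre-release or build tags.

```python
import operator
import re

_OPS = {
    "==": operator.eq, "!=": operator.ne,
    ">=": operator.ge, "<=": operator.le,
    ">": operator.gt, "<": operator.lt,
}
_EXPR = re.compile(r"^(==|!=|>=|<=|>|<)\s*(\d+)\.(\d+)\.(\d+)$")


def parse_version(text: str) -> tuple:
    """Turn '0.1.10' into (0, 1, 10) so versions compare numerically."""
    return tuple(int(part) for part in text.split("."))


def matches(version: str, expressions: list) -> bool:
    """True if the version satisfies every expression in the list."""
    candidate = parse_version(version)
    for expression in expressions:
        found = _EXPR.match(expression.strip())
        if found is None:
            raise ValueError(f"unsupported version expression: {expression!r}")
        op, *numbers = found.groups()
        if not _OPS[op](candidate, tuple(int(n) for n in numbers)):
            return False
    return True


def select_records(records: dict, title: str, expressions: list) -> dict:
    """Keep matching versions, ordered oldest to newest, keyed as {title}-v{version}."""
    chosen = sorted((v for v in records if matches(v, expressions)), key=parse_version)
    return {f"{title}-v{v}": records[v] for v in chosen}
```

Comparing integer tuples rather than strings is what places 0.1.10 after 0.1.2.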

download_blob(blob_pid: str, target_file: str | pathlib.Path, update_dict: dict, expected_size: int, release_record=dict, workspace=str, chunk_size: int = 2**25)

Download a blob from a PID to a local file.

Parameters:
blob_pid: str

PID of the blob; should be a URL.

target_file: str | Path

Target path to download to.

update_dict: dict

Dictionary of the form {'size': 0, 'finished': False}, used to monitor the download progress when downloads are spun up into threads.

expected_size: int

Expected size of the blob in bytes.

chunk_size: int, optional

Chunk size for downloading, by default 2**25.
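
A chunked download that reports progress through update_dict could be sketched as below. The function stream_to_file is hypothetical and reads from any binary stream rather than from a CDCS server, and the size check at the end is an assumption about how expected_size might be used.

```python
import io
from pathlib import Path
from typing import BinaryIO, Union


def stream_to_file(
    source: BinaryIO,
    target_file: Union[str, Path],
    update_dict: dict,
    expected_size: int,
    chunk_size: int = 2**25,
) -> None:
    """Copy source to target_file chunk by chunk, recording progress as it goes."""
    with open(target_file, "wb") as out:
        while True:
            block = source.read(chunk_size)
            if not block:
                break
            out.write(block)
            update_dict["size"] += len(block)  # visible to a monitoring thread
    if update_dict["size"] != expected_size:
        raise IOError(f"expected {expected_size} bytes, received {update_dict['size']}")
    update_dict["finished"] = True
```

A caller would pass a fresh {'size': 0, 'finished': False} dictionary for each blob and poll it from another thread while the copy runs.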

process_and_upload_blob(posix_rel_path: pathlib.Path, working_dir: pathlib.Path, release_title: str, release_title_versionless: str, workspace_title: str, chunk_size: int, verbose: bool = False) → tuple[str]

Upload a file to a CDCS workspace.

Parameters:
posix_rel_path: Path

Path of the file to upload, relative to working_dir.

working_dir: Path

Working directory of the release, from which all paths are relative.

release_title: str

Name of the release with the version code. Not required for the CDCS archive, but included for interface compatibility.

release_title_versionless: str

Name of the release without the version code.

chunk_size: int

Chunking size. Not used.

workspace_title: str

Name of the workspace to upload to.

Returns:
blob_pid: str

PID of the uploaded blob.

nbytes: int

Size of the file in bytes.

upload_release_record(release_record: dict, workspace: str) → str

Upload a release record.

The record should be assigned a PID during the upload process.

Parameters:
title: str

Title of the release with version code.

release_record: dict

Release record dictionary.

Returns:
str

PID of uploaded object.

class rmellipse.workflows.archive_interface.FileSystemArchive(host: str, user: str)

Bases: ArchiveInterface

Archive Interface for a file system archive.

A file system archive is an archive stored in a directory with a standard layout. Access permissions are based on the file system permissions of the system.

archive_path
host = ''
get_release_records(title_versionless: str, version_expressions: list[str], workspace='Global Public Workspace') → dict

Get release records matching a version code.

Parameters:
title_versionless: str

Title of the release without the version.

version_expressions: list[str]

Version expressions to match (e.g. [">=0.1.0", "<0.2.0"] or ["==0.1.2"]).

workspace: str, optional

Workspace to look through, by default "Global Public Workspace".

Returns:
dict

Dictionary of release records. Keys use the {title}-v{version} format and are sorted from oldest to most recent version following semver.

download_blob(blob_pid: str, target_file: str | pathlib.Path, update_dict: dict, expected_size: int, release_record: dict, workspace: str, chunk_size: int = 2**25)

Download a blob from a PID to a local file.

Parameters:
blob_pid: str

PID of the blob; should be a URL.

target_file: str | Path

Target path to download to.

update_dict: dict

Dictionary of the form {'size': 0, 'finished': False}, used to monitor the download progress when downloads are spun up into threads.

expected_size: int

Expected size of the blob in bytes.

release_record: dict

Full release record.

workspace: str

Workspace to look through.

chunk_size: int, optional

Chunk size for downloading, by default 2**25.

process_and_upload_blob(posix_rel_path: pathlib.Path, working_dir: pathlib.Path, release_title_versionless: str, release_title: str, workspace_title: str, chunk_size: int, verbose: bool = False) → tuple[str]

Upload a file to a workspace in the file system archive.

Parameters:
posix_rel_path: Path

Path of the file to upload, relative to working_dir.

working_dir: Path

Working directory of the project.

release_title_versionless: str

Name of the release without the version code.

release_title: str

Name of the release with the version code.

verbose: bool, optional

Print information, by default False.

workspace_title: str

Name of the workspace.

chunk_size: int

Size of the upload chunks.

Returns:
blob_pid: str

PID of the uploaded blob.

nbytes: int

Size of the file in bytes.

upload_release_record(release_record: dict, workspace_title: str) → str

Upload a release record.

The record should be assigned a PID during the upload process.

Parameters:
title: str

Title of the release with version code.

release_record: dict

Release record dictionary.

Returns:
str

PID of uploaded object.