rmellipse.workflows.workflowtree

Classes

DataPointer

Represents a data node in a workflow tree; edges connect DataPointers to Jobs.

Job

A job node in a workflow tree.

WorkflowTree

A workflow represented as a graph of Job and DataPointer nodes.

Functions

execute_concurrent_jobs(jobs, level[, max_threads])

Execute a set of jobs from a workflow tree concurrently.

validate_datapointers(datapointers, project_settings, ...)

Validate that datapointers exist within a project.

Module Contents

class rmellipse.workflows.workflowtree.DataPointer(o: dict | pathlib.Path)

Bases: dict

Represents a data node in a workflow tree; edges connect DataPointers to Jobs.


property name
property path
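DataPointer subclasses dict and its constructor accepts either a dict or a pathlib.Path. As a rough illustration, the sketch below shows one way such a class could expose read-only name and path properties. The class name DataPointerSketch, the "name"/"path" keys, and the rule that a bare Path becomes a pointer named after the file stem are all assumptions, not rmellipse's actual schema.

```python
from __future__ import annotations

from pathlib import Path


class DataPointerSketch(dict):
    """Hypothetical stand-in for DataPointer (keys are assumptions)."""

    def __init__(self, o: dict | Path):
        if isinstance(o, Path):
            # Assumed behaviour: a bare path becomes a pointer named after its stem.
            o = {"name": o.stem, "path": str(o)}
        super().__init__(o)

    @property
    def name(self) -> str:
        return self["name"]

    @property
    def path(self) -> Path:
        # Stored as a string in the dict, returned as a Path object.
        return Path(self["path"])


dp = DataPointerSketch(Path("data/raw.csv"))
```

Because the object is still a dict, it can be serialized or compared like any mapping while the properties give typed access.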
class rmellipse.workflows.workflowtree.Job(job_dict: dict, project_settings: rmellipse.workflows._settings.ProjectSettings)

Bases: dict


Initialize a Job object.

Parameters:
job_dict : dict | io.FileIO | Path

project_settings : ProjectSettings

Project settings object.

Raises:
TypeError

ValueError

logfile
config
result
started = False
property thread
property name
property commands
run()
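Job exposes commands, started, result and a run() method. The sketch below is a hypothetical illustration of how a dict-based job might run its commands as subprocesses and record the outcome. The class name JobSketch, the "name"/"commands" keys, the argv-list command format, and the stop-at-first-failure policy are assumptions; the real Job also manages a logfile, config and thread, which are omitted here.

```python
from __future__ import annotations

import subprocess
import sys


class JobSketch(dict):
    """Hypothetical job node that runs its commands in order."""

    def __init__(self, job_dict: dict):
        super().__init__(job_dict)
        self.started = False
        self.result = None  # return code of the last command run

    @property
    def name(self) -> str:
        return self["name"]

    @property
    def commands(self) -> list[list[str]]:
        return self["commands"]

    def run(self) -> int | None:
        self.started = True
        for cmd in self.commands:
            # Each command is an argv list executed as a subprocess.
            proc = subprocess.run(cmd, capture_output=True, text=True)
            self.result = proc.returncode
            if proc.returncode != 0:
                break  # assumed policy: stop at the first failing command
        return self.result


hello = JobSketch({"name": "hello",
                   "commands": [[sys.executable, "-c", "print('hi')"]]})
hello.run()
```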
class rmellipse.workflows.workflowtree.WorkflowTree(d: dict = None)

Bases: dict


property release

Release information about the workflow tree.

property nodes

Access all the nodes in the graph representation.

property jobs

All the job nodes.

property data_pointers

All the data_pointer nodes.

property edges

All the edges in the tree.

add_data_pointer(data_pointer: DataPointer)

Add a data pointer to the tree.

add_job(job: Job)

Add a job to the tree.

add_edge(node1: DataPointer | Job, node2: DataPointer | Job, attrs: dict = None)

Add an edge to the workflow tree.

Edges are always between a DataPointer and a Job.

Parameters:
node1 : DataPointer | Job

First node of the edge.

node2 : DataPointer | Job

Second node of the edge.

attrs : dict, optional

Dict of edge metadata, by default None.
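Since edges always join a DataPointer and a Job, the tree is a bipartite graph. The sketch below is a minimal, hypothetical illustration of that constraint together with jobs and data_pointers views filtered by node type. DataNode, JobNode, TreeSketch, keying nodes by a "name" entry, and raising TypeError for a same-kind edge are assumptions made for illustration, not the library's implementation.

```python
from __future__ import annotations


class DataNode(dict):
    """Hypothetical data-pointer node."""


class JobNode(dict):
    """Hypothetical job node."""


class TreeSketch:
    """Minimal bipartite workflow graph; nodes are keyed by their "name"."""

    def __init__(self):
        self._nodes: dict[str, dict] = {}
        self.edges: list[tuple[str, str, dict]] = []

    def add_data_pointer(self, dp: DataNode) -> None:
        self._nodes[dp["name"]] = dp

    def add_job(self, job: JobNode) -> None:
        self._nodes[job["name"]] = job

    @property
    def jobs(self) -> list[JobNode]:
        return [n for n in self._nodes.values() if isinstance(n, JobNode)]

    @property
    def data_pointers(self) -> list[DataNode]:
        return [n for n in self._nodes.values() if isinstance(n, DataNode)]

    def add_edge(self, node1, node2, attrs: dict | None = None) -> None:
        # Only data -> job or job -> data edges are allowed.
        ok = (isinstance(node1, DataNode) and isinstance(node2, JobNode)) or \
             (isinstance(node1, JobNode) and isinstance(node2, DataNode))
        if not ok:
            raise TypeError("edges must connect a data node and a job node")
        self.edges.append((node1["name"], node2["name"], attrs or {}))


tree = TreeSketch()
raw, clean = DataNode(name="raw"), JobNode(name="clean")
tree.add_data_pointer(raw)
tree.add_job(clean)
tree.add_edge(raw, clean)  # the "raw" data feeds the "clean" job
```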

iter_data_pointer_paths()

Iterate over all the data_pointer paths.

Yields:
Path

Path to data_pointers

iter_topological_groups()

Iterate over groups in a topological sorting of jobs and datasets.

Yields:
list

List of nodes in the next level of the topological sorting.
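Grouping a topological sort by level is what lets independent nodes be processed together. As a sketch of the general technique (level-by-level Kahn's algorithm, here via the standard library's graphlib.TopologicalSorter, not necessarily what rmellipse uses), assuming the graph is given as a dict mapping each node name to the set of nodes it depends on; the node names are made up for the example:

```python
from graphlib import TopologicalSorter


def iter_topological_groups(graph: dict[str, set[str]]):
    """Yield lists of nodes level by level.

    graph maps each node to its predecessors (the nodes it depends on).
    """
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    while sorter.is_active():
        # Every node whose predecessors are all done forms the next level.
        ready = list(sorter.get_ready())
        yield ready
        sorter.done(*ready)


# Hypothetical workflow: two input files feed a "clean" job, whose output
# file feeds a "plot" job. Data and job levels alternate.
graph = {
    "clean": {"raw.csv", "cal.csv"},
    "clean.csv": {"clean"},
    "plot": {"clean.csv"},
}
groups = [sorted(g) for g in iter_topological_groups(graph)]
```

Within a group the order is unspecified; the useful property is that no node in a group depends on another node in the same group, so a group of jobs can run concurrently.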

classmethod from_workflow_config(project_settings: rmellipse.workflows._settings.ProjectSettings, wf_config: rmellipse.workflows._settings.WorkflowConfig)
rmellipse.workflows.workflowtree.execute_concurrent_jobs(jobs: list[Job], level: int, max_threads: int = None)

Execute a set of jobs from a workflow tree concurrently.

Parameters:
jobs : list[Job]

List of jobs to execute concurrently.

level : int

Execution level in the DAG.

max_threads : int, optional

Maximum number of threads (i.e. subprocesses) that can run at a time.

Raises:
TypeError

If any element of jobs is not a Job.

Exception

If a job fails.
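The behaviour described above (bounded concurrency, a TypeError for invalid inputs, and failures surfacing to the caller) can be sketched with the standard library's ThreadPoolExecutor. This is a hypothetical illustration, not rmellipse's implementation: execute_concurrently is an illustrative name, and jobs are modelled as zero-argument callables instead of Job objects.

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def execute_concurrently(jobs: list[Callable[[], object]], level: int,
                         max_threads: int | None = None) -> list[object]:
    # Hypothetical stand-in: jobs are zero-argument callables, not Job objects.
    if not all(callable(job) for job in jobs):
        raise TypeError("every job must be callable")
    print(f"level {level}: running {len(jobs)} job(s)")
    # max_workers=None lets ThreadPoolExecutor choose its default pool size.
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        futures = [pool.submit(job) for job in jobs]
        # result() waits for each job and re-raises its exception, so the
        # first failing job in submission order propagates to the caller.
        return [future.result() for future in futures]
```

Threads suit this case because each job mostly waits on a subprocess, so the Python threads themselves do little CPU work.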

rmellipse.workflows.workflowtree.validate_datapointers(datapointers: list[DataPointer], project_settings: rmellipse.workflows._settings.ProjectSettings, level: int)

Validate that datapointers exist within a project.

Parameters:
datapointers : list[DataPointer]

List of DataPointer objects.

project_settings : ProjectSettings

Project settings object.

level : int

Execution level in the DAG.

Raises:
FileNotFoundError

If a dataset is missing.
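A check like this can be sketched in a few lines with pathlib. The version below is hypothetical: validate_paths is an illustrative name, and it assumes each data pointer reduces to a path resolved against a project root directory, which stands in for whatever ProjectSettings actually provides. It reports every missing path at once rather than stopping at the first.

```python
from __future__ import annotations

from pathlib import Path


def validate_paths(paths: list[Path], project_root: Path, level: int) -> None:
    # Assumption: relative paths are resolved against the project root;
    # joining an absolute path with "/" simply returns the absolute path.
    missing = [p for p in paths if not (project_root / p).exists()]
    if missing:
        names = ", ".join(str(p) for p in missing)
        raise FileNotFoundError(f"level {level}: missing data: {names}")
```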