rmellipse.workflows.workflowtree
Classes
| DataPointer | Represents an edge in a job tree. |
| Job | A job node in a workflow tree. |
| WorkflowTree | A directed acyclic graph (DAG) of Job and DataPointer nodes. |
Functions
| execute_concurrent_jobs | Execute a set of concurrent jobs in a workflow tree. |
| validate_datapointers | Validate that data pointers exist within a project. |
Module Contents
- class rmellipse.workflows.workflowtree.DataPointer(o: dict | pathlib.Path)
Bases: dict
Represents an edge in a job tree.
- property name
- property path
- class rmellipse.workflows.workflowtree.Job(job_dict: dict, project_settings: rmellipse.workflows._settings.ProjectSettings)
Bases: dict
A job node in a workflow tree.
Initialize a Job object.
- Parameters:
- job_dict : dict | io.FileIO | Path
- project_settings : ProjectSettings
Project settings object.
- Raises:
- TypeError
- ValueError
- logfile
- config
- result
- started = False
- property thread
- property name
- property commands
- run()
- class rmellipse.workflows.workflowtree.WorkflowTree(d: dict = None)
Bases: dict
A directed acyclic graph (DAG) of Job and DataPointer nodes.
- property release
Release information about the workflow tree.
- property nodes
All nodes (jobs and data pointers) in the graph representation.
- property jobs
All the Job nodes.
- property data_pointers
All the DataPointer nodes.
- property edges
All the edges in the tree.
- add_data_pointer(data_pointer: DataPointer)
Add a data pointer to the tree.
- add_edge(node1: DataPointer | Job, node2: DataPointer | Job, attrs: dict = None)
Add an edge to the workflow tree.
Edges always connect a DataPointer and a Job.
- Parameters:
- node1 : DataPointer | Job
First endpoint of the edge.
- node2 : DataPointer | Job
Second endpoint of the edge, of the opposite type to node1.
- attrs : dict, optional
Dictionary of edge metadata, by default None.
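The rule that every edge joins one DataPointer and one Job can be illustrated with a small, self-contained sketch. DataNode, JobNode and BipartiteTree are hypothetical stand-ins, not the rmellipse classes, and raising TypeError on a same-type pair is an assumption; the documentation above does not say how add_edge handles an invalid pair.

```python
from dataclasses import dataclass, field


# Hypothetical stand-ins for DataPointer and Job (not the rmellipse classes).
@dataclass(frozen=True)
class DataNode:
    name: str


@dataclass(frozen=True)
class JobNode:
    name: str


@dataclass
class BipartiteTree:
    """Hypothetical graph that only accepts data-node/job-node edges."""

    nodes: set = field(default_factory=set)
    edges: dict = field(default_factory=dict)  # (node1, node2) -> metadata

    def add_edge(self, node1, node2, attrs=None):
        # Exactly one endpoint must be a DataNode and the other a JobNode.
        if {type(node1), type(node2)} != {DataNode, JobNode}:
            raise TypeError("an edge must join a DataNode and a JobNode")
        self.nodes.update((node1, node2))
        self.edges[(node1, node2)] = dict(attrs or {})

    @property
    def jobs(self):
        # Filter the node set by type, analogous to the documented jobs property.
        return [n for n in self.nodes if isinstance(n, JobNode)]

    @property
    def data_nodes(self):
        # Analogous to the documented data_pointers property.
        return [n for n in self.nodes if isinstance(n, DataNode)]
```

Keying the metadata by the (node1, node2) pair mirrors the optional attrs dictionary that add_edge accepts; an edge such as `tree.add_edge(DataNode("raw.csv"), JobNode("fit"), {"role": "input"})` is accepted, while two DataNode endpoints are rejected.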
- iter_data_pointer_paths()
Iterate over the paths of all data pointers in the tree.
- Yields:
- Path
Path of a data pointer.
- iter_topological_groups()
Iterate over groups in a topological sorting of jobs and data pointers.
- Yields:
- list
List of nodes in the next level of the topological sorting.
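Grouping a DAG into topological levels is commonly done with Kahn's algorithm: repeatedly emit every node whose predecessors have all been emitted. The sketch below takes plain (source, target) pairs and only illustrates that technique; whether iter_topological_groups is implemented this way is not stated here. Because a node joins a group only after all of its predecessors have been emitted, no two nodes in the same group are connected by an edge.

```python
from collections import defaultdict


def iter_topological_groups(edges):
    """Yield lists of nodes level by level (Kahn's algorithm).

    edges: iterable of (source, target) pairs describing a DAG.
    Raises ValueError if the graph contains a cycle.
    """
    successors = defaultdict(list)
    indegree = defaultdict(int)
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1
        indegree.setdefault(src, 0)  # register sources with no inputs

    # The first group holds every node without incoming edges.
    group = sorted(n for n, d in indegree.items() if d == 0)
    emitted = 0
    while group:
        yield group
        emitted += len(group)
        next_group = []
        for node in group:
            for dst in successors[node]:
                indegree[dst] -= 1
                if indegree[dst] == 0:  # all predecessors emitted
                    next_group.append(dst)
        group = sorted(next_group)

    # Nodes left over were never freed, so they sit on a cycle.
    if emitted != len(indegree):
        raise ValueError("graph contains a cycle")
```

For example, with edges raw.csv→job_a, raw.csv→job_b, job_a→a.out and job_b→b.out the groups are [raw.csv], [job_a, job_b] and [a.out, b.out], so both jobs land in the same level and could be run concurrently.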
- classmethod from_workflow_config(project_settings: rmellipse.workflows._settings.ProjectSettings, wf_config: rmellipse.workflows._settings.WorkflowConfig)
- rmellipse.workflows.workflowtree.execute_concurrent_jobs(jobs: list[Job], level: int, max_threads: int = None)
Execute a set of concurrent jobs in a workflow tree.
- Parameters:
- jobs : list[Job]
List of jobs to execute concurrently.
- level : int
Execution level in the DAG, used for printing.
- max_threads : int, optional
Maximum number of threads (i.e. subprocesses) that can run at the same time.
- Raises:
- TypeError
If non-jobs are passed.
- Exception
When a job fails.
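One way to meet the documented contract (reject non-jobs with TypeError, cap concurrency at max_threads, raise when a job fails) is a concurrent.futures.ThreadPoolExecutor. In this sketch SketchJob is a hypothetical stand-in for Job whose run() calls a Python function instead of launching a subprocess, and chaining the job's own error onto a generic Exception is an assumption about how a failure might be reported.

```python
from concurrent.futures import ThreadPoolExecutor


class SketchJob:
    """Hypothetical stand-in for Job: wraps a callable and stores its result."""

    def __init__(self, name, func):
        self.name = name
        self.func = func
        self.result = None

    def run(self):
        self.result = self.func()
        return self.result


def execute_concurrent_jobs(jobs, level, max_threads=None):
    # Reject anything that is not a job before starting any work.
    for job in jobs:
        if not isinstance(job, SketchJob):
            raise TypeError(f"expected a job, got {type(job).__name__}")
    print(f"level {level}: running {len(jobs)} job(s)")
    # max_workers=None lets the executor choose its default pool size.
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        futures = {pool.submit(job.run): job for job in jobs}
        for future, job in futures.items():
            exc = future.exception()  # blocks until this job finishes
            if exc is not None:
                raise Exception(f"job {job.name!r} failed") from exc
```

Because the executor is used as a context manager, raising inside the block still waits for the pool to shut down, so the other jobs submitted for the same level run to completion before the exception reaches the caller.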
- rmellipse.workflows.workflowtree.validate_datapointers(datapointers: list[DataPointer], project_settings: rmellipse.workflows._settings.ProjectSettings, level: int)
Validate that the given data pointers exist within a project.
- Parameters:
- datapointers : list[DataPointer]
List of DataPointer objects.
- project_settings : ProjectSettings
Project settings object.
- level : int
Execution level in the DAG.
- Raises:
- FileNotFoundError
If a dataset is missing.
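A check of this kind can be sketched with pathlib. The sketch assumes, hypothetically, that each data pointer reduces to a path relative to a project root; the real function receives DataPointer objects and a ProjectSettings instance instead. Collecting every missing path before raising is a design choice that reports all problems at once rather than stopping at the first.

```python
from pathlib import Path


def validate_datapointers(paths, project_root, level):
    """Raise FileNotFoundError if any path is missing under project_root.

    paths: iterable of relative paths, a hypothetical stand-in for
    DataPointer objects.
    """
    root = Path(project_root)
    # Gather every missing path so the error lists all of them.
    missing = [str(p) for p in paths if not (root / p).exists()]
    if missing:
        raise FileNotFoundError(
            f"level {level}: missing data pointer(s): {', '.join(missing)}"
        )
```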