rmellipse.workflows.workflowtree
================================

.. py:module:: rmellipse.workflows.workflowtree


Classes
-------

.. autoapisummary::

   rmellipse.workflows.workflowtree.DataPointer
   rmellipse.workflows.workflowtree.Job
   rmellipse.workflows.workflowtree.WorkflowTree


Functions
---------

.. autoapisummary::

   rmellipse.workflows.workflowtree.execute_concurrent_jobs
   rmellipse.workflows.workflowtree.validate_datapointers


Module Contents
---------------

.. py:class:: DataPointer(o: dict | pathlib.Path)

   Bases: :py:obj:`dict`

   Represents a dataset in a workflow tree. Conceptually it connects jobs
   (an edge in the job-only view), but in a :py:class:`WorkflowTree` graph it
   is stored as a node, and every edge joins a DataPointer to a Job.

   .. py:property:: name


   .. py:property:: path


.. py:class:: Job(job_dict: dict, project_settings: rmellipse.workflows._settings.ProjectSettings)

   Bases: :py:obj:`dict`

   Initialize a Job object.

   :Parameters:

       **job_dict** : dict | io.FileIO | Path
           Dictionary, open file, or path describing the job.

       **project_settings** : ProjectSettings
           Project settings object.

   :Raises:

       TypeError

       ValueError

   .. py:attribute:: logfile


   .. py:attribute:: config


   .. py:attribute:: result


   .. py:attribute:: started
      :value: False


   .. py:property:: thread


   .. py:property:: name


   .. py:property:: commands


   .. py:method:: run()
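
   This page does not document how ``run`` uses the ``thread``, ``commands``,
   ``started``, and ``result`` members. The sketch below shows one plausible
   shape, assuming a job runs its commands as subprocesses on a background
   thread. ``SketchJob`` and its ``"name"`` and ``"commands"`` keys are
   hypothetical and are not part of rmellipse.

   .. code-block:: python

      import subprocess
      import sys
      import threading


      class SketchJob(dict):
          """Hypothetical stand-in for Job: a dict with 'name' and 'commands' keys."""

          def __init__(self, job_dict: dict):
              super().__init__(job_dict)
              self.started = False
              self.result = None  # CompletedProcess of the last command run
              self._thread = threading.Thread(target=self._work, name=self.name)

          @property
          def name(self) -> str:
              return self["name"]

          @property
          def commands(self) -> list:
              return self["commands"]

          @property
          def thread(self) -> threading.Thread:
              return self._thread

          def _work(self) -> None:
              # Run each command in its own subprocess; stop at the first failure.
              for cmd in self.commands:
                  self.result = subprocess.run(cmd, capture_output=True, text=True)
                  if self.result.returncode != 0:
                      break

          def run(self) -> None:
              # Start the background thread; callers join job.thread to wait.
              self.started = True
              self._thread.start()


      job = SketchJob({"name": "hello", "commands": [[sys.executable, "-c", "print('hi')"]]})
      job.run()
      job.thread.join()
      print(job.result.stdout.strip())  # prints "hi"
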
.. py:class:: WorkflowTree(d: dict = None)

   Bases: :py:obj:`dict`

   .. py:property:: release

      Release information about the workflow tree.

   .. py:property:: nodes

      Access all the nodes in the graph representation.

   .. py:property:: jobs

      All the Job nodes.

   .. py:property:: data_pointers

      All the DataPointer nodes.

   .. py:property:: edges

      All the edges in the tree.

   .. py:method:: add_data_pointer(data_pointer: DataPointer)

      Add a data pointer to the tree.

   .. py:method:: add_job(job: Job)

      Add a job to the tree.

   .. py:method:: add_edge(node1: DataPointer | Job, node2: DataPointer | Job, attrs: dict = None)

      Add an edge to the workflow tree.

      Edges always join a DataPointer and a Job, so exactly one of ``node1``
      and ``node2`` must be a DataPointer and the other a Job.

      :Parameters:

          **node1** : DataPointer | Job
              DataPointer or Job.

          **node2** : DataPointer | Job
              DataPointer or Job; must be the other type from ``node1``.

          **attrs** : dict, optional
              Edge metadata, by default None.

   .. py:method:: iter_data_pointer_paths()

      Iterate over the paths of all data pointers.

      :Yields:

          Path
              Path to each data pointer.

   .. py:method:: iter_topological_groups()

      Iterate over groups in a topological sorting of jobs and datasets.

      :Yields:

          list
              List of nodes in the next level of the topological sorting.
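
   ``iter_topological_groups`` yields one list of nodes per level. As an
   illustration only, the standard-library ``graphlib.TopologicalSorter``
   produces the same kind of grouping for a small hypothetical tree in which
   every edge joins a data pointer and a job. The node names and the
   ``topological_groups`` helper are invented, and this is not necessarily how
   ``WorkflowTree`` computes its groups.

   .. code-block:: python

      from graphlib import TopologicalSorter

      # Hypothetical tree. Each key maps a node to the set of nodes it
      # depends on (its predecessors); edges always join data and jobs.
      deps = {
          "job:clean": {"data:raw"},
          "data:clean": {"job:clean"},
          "job:fit": {"data:clean"},
          "job:plot": {"data:clean"},
          "data:fit": {"job:fit"},
          "data:plot": {"job:plot"},
      }


      def topological_groups(graph):
          """Yield lists of nodes whose predecessors are all in earlier groups."""
          sorter = TopologicalSorter(graph)
          sorter.prepare()
          while sorter.is_active():
              group = sorted(sorter.get_ready())
              yield group
              sorter.done(*group)  # unlocks the next level


      groups = list(topological_groups(deps))
      for level, group in enumerate(groups):
          print(level, group)
      # 0 ['data:raw']
      # 1 ['job:clean']
      # 2 ['data:clean']
      # 3 ['job:fit', 'job:plot']
      # 4 ['data:fit', 'data:plot']

   Nodes that share a group do not depend on one another, so jobs in the same
   group (here ``job:fit`` and ``job:plot``) are candidates for concurrent
   execution, which appears to be the purpose of ``execute_concurrent_jobs``
   given its ``level`` argument.
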
   .. py:method:: from_workflow_config(project_settings: rmellipse.workflows._settings.ProjectSettings, wf_config: rmellipse.workflows._settings.WorkflowConfig)
      :classmethod:


.. py:function:: execute_concurrent_jobs(jobs: list[Job], level: int, max_threads: int = None)

   Execute a set of jobs from a workflow tree concurrently.

   :Parameters:

       **jobs** : list[Job]
           List of jobs to execute concurrently.

       **level** : int
           Execution level in the DAG, used for printing.

       **max_threads** : int, optional
           Maximum number of threads (i.e. subprocesses) that can run at a time.

   :Raises:

       TypeError
           If non-jobs are passed.

       Exception
           When a job fails.


.. py:function:: validate_datapointers(datapointers: list[DataPointer], project_settings: rmellipse.workflows._settings.ProjectSettings, level: int)

   Validate that datapointers exist within a project.

   :Parameters:

       **datapointers** : list[DataPointer]
           List of DataPointer objects.

       **project_settings** : ProjectSettings
           Project settings object.

       **level** : int
           Execution level in the DAG.

   :Raises:

       FileNotFoundError
           If a dataset is missing.
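
A minimal sketch of the two steps these functions describe, assuming each job
reduces to a command-line argument list: ``execute_group`` runs one level
concurrently with a thread pool capped at ``max_threads``, and
``validate_paths`` raises ``FileNotFoundError`` for missing datasets. Both
helpers (and ``run_command``) are hypothetical stand-ins that work on plain
argument lists and paths rather than on ``Job`` and ``DataPointer`` objects.

.. code-block:: python

   from __future__ import annotations

   import subprocess
   import sys
   from concurrent.futures import ThreadPoolExecutor
   from pathlib import Path


   def validate_paths(paths: list[Path], level: int) -> None:
       """Hypothetical helper: raise FileNotFoundError if any dataset path is missing."""
       missing = [str(p) for p in paths if not Path(p).exists()]
       if missing:
           raise FileNotFoundError(f"Level {level}: missing datasets {missing}")


   def run_command(cmd: list[str]) -> str:
       # check=True turns a non-zero exit status into CalledProcessError.
       done = subprocess.run(cmd, capture_output=True, text=True, check=True)
       return done.stdout


   def execute_group(commands: list[list[str]], level: int,
                     max_threads: int | None = None) -> list[str]:
       """Hypothetical helper: run one topological level of commands concurrently."""
       if not all(isinstance(cmd, list) for cmd in commands):
           raise TypeError("Expected a list of argument lists.")
       print(f"Running level {level} ({len(commands)} jobs)")
       # Each worker thread blocks on its own subprocess, so at most
       # max_threads subprocesses run at once.
       with ThreadPoolExecutor(max_workers=max_threads) as pool:
           futures = [pool.submit(run_command, cmd) for cmd in commands]
           # result() re-raises the exception of a failed command.
           return [future.result() for future in futures]


   commands = [[sys.executable, "-c", f"print({i})"] for i in range(3)]
   print(execute_group(commands, level=1, max_threads=2))  # ['0\n', '1\n', '2\n']
   validate_paths([Path(sys.executable)], level=1)  # passes: the interpreter exists

Passing ``max_workers=None`` lets ``ThreadPoolExecutor`` choose its default
pool size; treating ``max_threads=None`` the same way is an assumption of this
sketch, not documented behavior of ``execute_concurrent_jobs``.
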