HTGS  v2.0
The Hybrid Task Graph Scheduler
htgs::ExecutionPipeline< T, U > Class Template Reference

The ExecutionPipeline class is used to duplicate task graphs, such that each duplicate executes concurrently. More...

#include <htgs/api/ExecutionPipeline.hpp>

Inheritance diagram for htgs::ExecutionPipeline< T, U >:
Inheritance graph
Collaboration diagram for htgs::ExecutionPipeline< T, U >:
Collaboration graph

Public Member Functions

 ExecutionPipeline (size_t numPipelines, TaskGraphConf< T, U > *graph, std::string name="Execution Pipeline", bool waitForInit=true)
 Creates an execution pipeline, which encapsulates a graph and duplicates it numPipelines times. More...
 
 ExecutionPipeline (size_t numPipelines, TaskGraphConf< T, U > *graph, std::shared_ptr< IRuleList< T, T >> rules, std::string name="Execution Pipeline", bool waitForInit=true)
 Creates an execution pipeline, which encapsulates a graph and duplicates it numPipelines times. More...
 
 ~ExecutionPipeline ()
 Destructor that frees all memory allocated by the execution pipeline.
 
size_t getNumGraphsSpawned () override
 Gets the number of graphs spawned by this ITask. More...
 
void addInputRule (IRule< T, T > *rule)
 Adds an input rule, which can be used for domain decomposition. More...
 
void addInputRule (std::shared_ptr< IRule< T, T >> rule)
 Adds an input rule, which can be used for domain decomposition. More...
 
void initialize ()
 Initializes the execution pipeline and duplicates the task graph based on the number of pipelines. More...
 
void shutdown ()
 Shuts down the execution pipeline. More...
 
void shutdownParallel (int id)
 Waits for all runtimes in parallel to update overall runtime of each graph. More...
 
void executeTask (std::shared_ptr< T > data)
 Executes the execution pipeline task on data and forwards that data to the input rules. More...
 
std::string getName ()
 Gets the name for the execution pipeline. More...
 
ITask< T, U > * copy ()
 Makes a copy of the execution pipeline. More...
 
void debug ()
 Provides debugging output for the execution pipeline. More...
 
virtual void gatherProfileData (std::map< AnyTaskManager *, TaskManagerProfile *> *taskManagerProfiles) override
 Gathers profile data. More...
 
void printProfile () override
 Prints the profile data to std::out. More...
 
std::string genCustomDot (ProfileUtils *profileUtils, int colorFlag) override
 Virtual function to generate customized dot file. More...
 
virtual std::string genDotProducerEdgeToTask (std::map< std::shared_ptr< AnyConnector >, AnyITask *> &inputConnectorDotMap, int dotFlags) override
 
virtual std::string genDotProducerEdgeFromConnector (std::shared_ptr< AnyConnector > connector, int flags)
 
virtual std::string genDotConsumerEdgeFromConnector (std::shared_ptr< AnyConnector > connector, int flags) override
 
std::string getConsumerDotIds () override
 
std::string genDot (int flags, std::string dotId, std::shared_ptr< AnyConnector > input, std::shared_ptr< AnyConnector > output) override
 Virtual function that generates the input/output and per-task dot notation. More...
 
- Public Member Functions inherited from htgs::ITask< T, U >
 ITask ()
 Creates an ITask with number of threads equal to 1.
 
 ITask (size_t numThreads)
 Constructs an ITask with a specified number of threads. More...
 
 ITask (size_t numThreads, bool isStartTask, bool poll, size_t microTimeoutTime)
 Constructs an ITask with a specified number of threads as well as additional scheduling options. More...
 
virtual bool canTerminate (std::shared_ptr< AnyConnector > inputConnector) override
 Virtual function that is called when an ITask is checking if it can be terminated. More...
 
virtual void executeTaskFinal () override
 Virtual function that is called just before the task has shutdown. More...
 
virtual std::string getDotLabelName () override
 Virtual function to get the label name used for dot graph viz. More...
 
virtual std::string getDotShapeColor () override
 Gets the color of the shape for graphviz dot. More...
 
virtual std::string getDotFillColor () override
 Gets the color for filling the shape for graphviz dot. More...
 
virtual std::string getDotShape () override
 Gets the shape for graphviz dot. More...
 
virtual std::string getDotCustomProfile () override
 Adds the string text to the profiling of this task in the graphviz dot visualization. More...
 
ITask< T, U > * copyITask (bool deep) override
 Copies the ITask (including a copy of all memory edges) More...
 
void addResult (std::shared_ptr< U > result)
 Adds results to the output list to be sent to the next connected ITask in a TaskGraph. More...
 
void addResult (U *result)
 Adds results to the output list to be sent to the next connected ITask in a TaskGraph. More...
 
void initialize (size_t pipelineId, size_t numPipeline, TaskManager< T, U > *ownerTask)
 Function that is called when an ITask is being initialized by it's owner thread. More...
 
template<class V >
m_data_t< V > getMemory (std::string name, IMemoryReleaseRule *releaseRule)
 Retrieves memory from a memory edge. More...
 
template<class V >
m_data_t< V > getDynamicMemory (std::string name, IMemoryReleaseRule *releaseRule, size_t numElems)
 Retrieves memory from a memory edge. More...
 
template<class V >
void releaseMemory (m_data_t< V > memory)
 Releases memory onto a memory edge, which is transferred by the graph communicator. More...
 
void resetProfile ()
 Resets profile data.
 
size_t getThreadID ()
 Gets the thread ID associated with this task. More...
 
unsigned long long int getTaskComputeTime () const
 Gets the task's compute time. More...
 
std::string inTypeName () override final
 Gets the demangled input type name of the connector. More...
 
std::string outTypeName () override final
 Gets the demangled output type name of the connector. More...
 
std::string getAddress () override final
 Gets the address from the owner task, which is the address of the task graph. More...
 
void setTaskManager (TaskManager< T, U > *ownerTask)
 Sets the owner task manager for this ITask. More...
 
TaskManager< T, U > * getOwnerTaskManager ()
 Gets the owner task manager for this ITask. More...
 
- Public Member Functions inherited from htgs::AnyITask
 AnyITask ()
 Creates an ITask with number of threads equal to 1.
 
 AnyITask (size_t numThreads)
 Constructs an ITask with a specified number of threads. More...
 
 AnyITask (size_t numThreads, bool isStartTask, bool poll, size_t microTimeoutTime)
 Constructs an ITask with a specified number of threads as well as additional scheduling options. More...
 
virtual ~AnyITask ()
 Destructor.
 
virtual std::string genDot (int flags, std::string dotId, std::shared_ptr< htgs::AnyConnector > input, std::shared_ptr< htgs::AnyConnector > output)
 Virtual function that generates the input/output and per-task dot notation. More...
 
virtual std::string getProducerDotIds ()
 
virtual std::string genDot (int flags, std::string dotId)
 Virtual function that adds additional dot attributes to this node. More...
 
virtual std::string debugDotNode ()
 Provides debug output for a node in the dot graph. More...
 
virtual void profile ()
 Virtual function that is called to provide profile output for the ITask. More...
 
virtual std::string profileStr ()
 Virtual function that is called after executionTask is called. More...
 
void initialize (size_t pipelineId, size_t numPipeline)
 Virtual function that is called when an ITask is being initialized by it's owner thread. More...
 
void setPipelineId (size_t pipelineId)
 Sets the pipeline Id for this ITask. More...
 
size_t getPipelineId ()
 Gets the pipeline ID. More...
 
void setNumPipelines (size_t numPipelines)
 Sets the number of pipelines that this ITask belongs too. More...
 
size_t getNumPipelines () const
 Sets the task graph communicator. More...
 
size_t getNumThreads () const
 Gets the number of threads associated with this ITask. More...
 
bool isStartTask () const
 Gets whether this ITask is a starting task. More...
 
bool isPoll () const
 Gets whether this ITask is polling for data or not. More...
 
size_t getMicroTimeoutTime () const
 Gets the timeout time for polling. More...
 
void copyMemoryEdges (AnyITask *iTaskCopy)
 Copies the memory edges from this AnyITask to another AnyITask. More...
 
std::string genDot (int flags, std::shared_ptr< AnyConnector > input, std::shared_ptr< AnyConnector > output)
 Creates a dot notation representation for this task. More...
 
void profileITask ()
 Provides profile output for the ITask,. More...
 
std::string getDotId ()
 Gets the id used for dot nodes. More...
 
std::string getNameWithPipelineId ()
 Gets the name of the ITask with it's pipeline ID. More...
 
const std::shared_ptr< ConnectorMap > & getMemoryEdges () const
 Gets the memory edges for the task. More...
 
const std::shared_ptr< ConnectorMap > & getReleaseMemoryEdges () const
 Gets the memory edges for releasing memory for the memory manager, used to shutdown the memory manager. More...
 
bool hasMemoryEdge (std::string name)
 Checks whether this ITask contains a memory edge for a specified name. More...
 
void attachMemoryEdge (std::string name, std::shared_ptr< AnyConnector > getMemoryConnector, std::shared_ptr< AnyConnector > releaseMemoryConnector, MMType type)
 Attaches a memory edge to this ITask to get memory. More...
 
unsigned long long int getMemoryWaitTime () const
 Gets the amount of time the task was waiting for memory. More...
 
void incMemoryWaitTime (unsigned long long int val)
 Increments memory wait time. More...
 

Private Member Functions

std::string cleanupVisualization (TaskGraphConf< T, U > *graph, std::string str)
 Moves the output connector outside of the execution pipeline graphs to cleanup how the graph looks during graph visualization. More...
 

Private Attributes

size_t numPipelinesExec
 The number of pipelines that will spawn from the ExecutionPipeline.
 
Bookkeeper< T > * inputBk
 The input Bookkeeper for the ExecutionPipeline.
 
TaskGraphConf< T, U > * graph
 The graph that the ExecutionPipeline manages, duplicates, and executes.
 
std::shared_ptr< IRuleList< T, T > > inputRules
 The rules associated with the input Bookkeeper for decomposition.
 
std::vector< TaskGraphRuntime * > * runtimes
 The list of Runtimes that will execute the TaskGraphs (one for each duplicate TaskGraph)
 
std::vector< TaskGraphConf< T, U > * > * graphs
 The list of duplicate TaskGraphs.
 
bool waitForInit
 Flag whether to wait for initialization of sub-graphs to complete or not.
 
std::string name
 The name given to the execution pipeline task.
 

Detailed Description

template<class T, class U>
class htgs::ExecutionPipeline< T, U >

The ExecutionPipeline class is used to duplicate task graphs, such that each duplicate executes concurrently.

Each task within the task graph is an exact duplicate of the original's. The only difference is the pipelineId passed to each task. The pipelineId indicates which task graph the task belongs too. This value can be used as a rank to distribute computation and determine if additional functionality is needed to process data (such as copying data from one GPU to another).

An ExecutionPipeline can be used to execute across multiple GPUs on a single system. Any ICudaTask will automatically be bound to a separate GPU based on the CUContext array passed to the ICudaTask. The size of the CUContext array should match the number of execution pipelines specified. If data exists between two GPUs, then additional copying may be required, see ICudaTask for details and example usage for copying data between GPUs.

The execution pipeline can be used to distribute data among each task graph by adding a rule that uses the pipelineId parameter in an IRule. See addInputRule(IRule <T, T> *rule)

If you wish to share a rule with multiple execution pipelines or bookkeepers, you must wrap the rule into a std::shared_ptr prior to calling the addInputRule function.

Example usage:

... build subGraph
int numPipelines = 3;
execPipeline->addInputRule(new DecompRule());
PostProcessTask *taskOutsideExecPipeline = new PostProcessTask();
htgs::TaskGraph<MatrixData, VoidData> *mainGraph = new htgs::TaskGraph<MatrixData, VoidData>();
// Declares that the execPipeline connects to the input of the mainGraph
mainGraph->setGraphConsumerTask(execPipeline);
mainGraph->addEdge(execPipeline, taskOutsideExecPipeline);
while(hasDataToAdd)
{
// Add data to graph
mainGraph->produceData(data);
}
mainGraph->finishProducingData();
// Executes the main graph (spawns sub graphs for execPipeline)
runTime->executeAndWaitForRunTime();
Template Parameters
Tthe input type for the ExecutionPipeline ITask, must derive from IData.
Uthe output type for the ExecutionPipeline ITask, must derive from IData.

Constructor & Destructor Documentation

◆ ExecutionPipeline() [1/2]

template<class T, class U>
htgs::ExecutionPipeline< T, U >::ExecutionPipeline ( size_t  numPipelines,
TaskGraphConf< T, U > *  graph,
std::string  name = "Execution Pipeline",
bool  waitForInit = true 
)
inline

Creates an execution pipeline, which encapsulates a graph and duplicates it numPipelines times.

Parameters
numPipelinesthe number of times to duplicate the graph
graphthe graph that the execution pipeline manages
namethe name of the execution pipeline
waitForInitIndicates that the execution pipeline will wait for its underlying graphs to initialize prior to marking that it is initialized.

◆ ExecutionPipeline() [2/2]

template<class T, class U>
htgs::ExecutionPipeline< T, U >::ExecutionPipeline ( size_t  numPipelines,
TaskGraphConf< T, U > *  graph,
std::shared_ptr< IRuleList< T, T >>  rules,
std::string  name = "Execution Pipeline",
bool  waitForInit = true 
)
inline

Creates an execution pipeline, which encapsulates a graph and duplicates it numPipelines times.

Parameters
numPipelinesthe number of times to duplicate the graph
graphthe graph that the execution pipeline manages
rulesthe list of decomposition rules that will be used for this pipeline
namethe name of the execution pipeline
waitForInitIndicates that the execution pipeline will wait for its underlying graphs to initialize prior to marking that it is initialized.

Member Function Documentation

◆ addInputRule() [1/2]

template<class T, class U>
void htgs::ExecutionPipeline< T, U >::addInputRule ( IRule< T, T > *  rule)
inline

Adds an input rule, which can be used for domain decomposition.

This rule should use the pipelineId parameter in the IRule::applyRule function to aid in distributing data to the appropriate execution pipeline TaskGraph

Parameters
rulethe rule to handle distributing data between pipelines

◆ addInputRule() [2/2]

template<class T, class U>
void htgs::ExecutionPipeline< T, U >::addInputRule ( std::shared_ptr< IRule< T, T >>  rule)
inline

Adds an input rule, which can be used for domain decomposition.

This rule should use the pipelineId parameter in the IRule::applyRule function to aid in distributing data to the appropriate execution pipeline TaskGraph This variant should be used if the rule is intended to be shared with multiple bookkeepers or other execution pipelines.

Parameters
rulethe rule as a shared_ptr to handle distributing data between pipelines

◆ cleanupVisualization()

template<class T, class U>
std::string htgs::ExecutionPipeline< T, U >::cleanupVisualization ( TaskGraphConf< T, U > *  graph,
std::string  str 
)
inlineprivate

Moves the output connector outside of the execution pipeline graphs to cleanup how the graph looks during graph visualization.

Parameters
graphthe graph
strthe dot file string to be cleaned up
Returns
the improved dot file text

◆ copy()

template<class T, class U>
ITask<T, U>* htgs::ExecutionPipeline< T, U >::copy ( )
inlinevirtual

Makes a copy of the execution pipeline.

Copies the graph and reuses the same input decomposition rules

Returns
the copied execution pipeline task
Note
This function should only be called by the HTGS API

Implements htgs::ITask< T, U >.

◆ debug()

template<class T, class U>
void htgs::ExecutionPipeline< T, U >::debug ( )
inlinevirtual

Provides debugging output for the execution pipeline.

Note
#define DEBUG_FLAG to enable debugging

Reimplemented from htgs::AnyITask.

◆ executeTask()

template<class T, class U>
void htgs::ExecutionPipeline< T, U >::executeTask ( std::shared_ptr< T >  data)
inlinevirtual

Executes the execution pipeline task on data and forwards that data to the input rules.

The input rules should parse the data and determine the correct pipelineId to forward the data to.

Parameters
datathe data to be forwarded to the proper execution pipeline
Note
This function should only be called by the HTGS API

Implements htgs::ITask< T, U >.

◆ gatherProfileData()

template<class T, class U>
virtual void htgs::ExecutionPipeline< T, U >::gatherProfileData ( std::map< AnyTaskManager *, TaskManagerProfile *> *  taskManagerProfiles)
inlineoverridevirtual

Gathers profile data.

Parameters
taskManagerProfilesthe mapping between the task manager and the profile data for the task manager.

Reimplemented from htgs::ITask< T, U >.

◆ genCustomDot()

template<class T, class U>
std::string htgs::ExecutionPipeline< T, U >::genCustomDot ( ProfileUtils profileUtils,
int  colorFlag 
)
inlineoverridevirtual

Virtual function to generate customized dot file.

Parameters
profilerthe profiler that contains color maps
Returns
the dot file text that will be added near the end of the dot file

Reimplemented from htgs::AnyITask.

◆ genDot()

template<class T, class U>
std::string htgs::ExecutionPipeline< T, U >::genDot ( int  flags,
std::string  dotId,
std::shared_ptr< AnyConnector input,
std::shared_ptr< AnyConnector output 
)
inlineoverride

Virtual function that generates the input/output and per-task dot notation.

Parameters
flagsthe DOTGEN flags
dotIdthe id for this task
inputthe input connector for this task
outputthe output connector for this task
Returns
the dot that represents the interaction between the input/output and the internal custom dot notation
Note
This function will generate the dot notation for all sub-graphs within the execution pipeline.
If the dot notation is generated prior to execution, then a virtual pipeline is created. Generating the dot notation after execution will show the actual sub-graphs.

◆ getName()

template<class T, class U>
std::string htgs::ExecutionPipeline< T, U >::getName ( )
inlinevirtual

Gets the name for the execution pipeline.

Returns
the name of the execution pipeline

Reimplemented from htgs::ITask< T, U >.

◆ getNumGraphsSpawned()

template<class T, class U>
size_t htgs::ExecutionPipeline< T, U >::getNumGraphsSpawned ( )
inlineoverridevirtual

Gets the number of graphs spawned by this ITask.

Returns
the number of graphs spawned

Reimplemented from htgs::ITask< T, U >.

◆ initialize()

template<class T, class U>
void htgs::ExecutionPipeline< T, U >::initialize ( )
inlinevirtual

Initializes the execution pipeline and duplicates the task graph based on the number of pipelines.

If wait for initialization is set to true, then this function will only return once all threads from all sub-graphs have been spawned and initialized.

Note
This function should only be called by the HTGS API

Reimplemented from htgs::ITask< T, U >.

◆ printProfile()

template<class T, class U>
void htgs::ExecutionPipeline< T, U >::printProfile ( )
inlineoverridevirtual

Prints the profile data to std::out.

Reimplemented from htgs::ITask< T, U >.

◆ shutdown()

template<class T, class U>
void htgs::ExecutionPipeline< T, U >::shutdown ( )
inlinevirtual

Shuts down the execution pipeline.

Note
This function should only be called by the HTGS API

Reimplemented from htgs::ITask< T, U >.

◆ shutdownParallel()

template<class T, class U>
void htgs::ExecutionPipeline< T, U >::shutdownParallel ( int  id)
inline

Waits for all runtimes in parallel to update overall runtime of each graph.

Parameters
idthe thread ID

The documentation for this class was generated from the following file: