HTGS  v2.0
The Hybrid Task Graph Scheduler
htgs::ITask< T, U > Class Template Reference (abstract)

An interface to process input data and forward results within a TaskGraph.

#include <htgs/api/ITask.hpp>


Public Member Functions

 ITask ()
 Creates an ITask with the number of threads equal to 1.
 
 ITask (size_t numThreads)
 Constructs an ITask with a specified number of threads.
 
 ITask (size_t numThreads, bool isStartTask, bool poll, size_t microTimeoutTime)
 Constructs an ITask with a specified number of threads as well as additional scheduling options.
 
virtual void initialize () override
 Virtual function that is called when an ITask is being initialized by its owner thread.
 
virtual void executeTask (std::shared_ptr< T > data)=0
 Pure virtual function that is called when an ITask's thread is to execute on data.
 
virtual bool canTerminate (std::shared_ptr< AnyConnector > inputConnector) override
 Virtual function that is called when an ITask is checking if it can be terminated.
 
virtual void shutdown () override
 Virtual function that is called when an ITask is being shut down by its owner thread.
 
virtual void executeTaskFinal () override
 Virtual function that is called just before the task shuts down.
 
virtual std::string getName () override
 Virtual function to get the name of an ITask.
 
virtual std::string getDotLabelName () override
 Virtual function to get the label name used for dot graph viz.
 
virtual std::string getDotShapeColor () override
 Gets the color of the shape for graphviz dot.
 
virtual std::string getDotFillColor () override
 Gets the color for filling the shape for graphviz dot.
 
virtual std::string getDotShape () override
 Gets the shape for graphviz dot.
 
virtual std::string getDotCustomProfile () override
 Adds the string text to the profiling of this task in the graphviz dot visualization.
 
virtual void printProfile () override
 Prints the profile data to std::cout.
 
virtual ITask< T, U > * copy ()=0
 Pure virtual function to copy an ITask.
 
virtual size_t getNumGraphsSpawned ()
 Gets the number of graphs spawned by this ITask.
 
virtual std::string genDotProducerEdgeToTask (std::map< std::shared_ptr< AnyConnector >, AnyITask *> &inputConnectorDotMap, int dotFlags) override
 
virtual std::string genDotConsumerEdgeFromConnector (std::shared_ptr< AnyConnector > connector, int flags) override
 
virtual std::string genDotProducerEdgeFromConnector (std::shared_ptr< AnyConnector > connector, int flags)
 
ITask< T, U > * copyITask (bool deep) override
 Copies the ITask (including a copy of all memory edges).
 
void addResult (std::shared_ptr< U > result)
 Adds results to the output list to be sent to the next connected ITask in a TaskGraph.
 
void addResult (U *result)
 Adds results to the output list to be sent to the next connected ITask in a TaskGraph.
 
void initialize (size_t pipelineId, size_t numPipeline, TaskManager< T, U > *ownerTask)
 Function that is called when an ITask is being initialized by its owner thread.
 
template<class V >
m_data_t< V > getMemory (std::string name, IMemoryReleaseRule *releaseRule)
 Retrieves memory from a memory edge.
 
template<class V >
m_data_t< V > getDynamicMemory (std::string name, IMemoryReleaseRule *releaseRule, size_t numElems)
 Retrieves dynamically sized memory from a memory edge.
 
template<class V >
void releaseMemory (m_data_t< V > memory)
 Releases memory onto a memory edge, which is transferred by the graph communicator.
 
void resetProfile ()
 Resets profile data.
 
size_t getThreadID ()
 Gets the thread ID associated with this task.
 
unsigned long long int getTaskComputeTime () const
 Gets the task's compute time.
 
std::string inTypeName () override final
 Gets the demangled input type name of the connector.
 
std::string outTypeName () override final
 Gets the demangled output type name of the connector.
 
std::string getAddress () override final
 Gets the address from the owner task, which is the address of the task graph.
 
void setTaskManager (TaskManager< T, U > *ownerTask)
 Sets the owner task manager for this ITask.
 
TaskManager< T, U > * getOwnerTaskManager ()
 Gets the owner task manager for this ITask.
 
virtual void gatherProfileData (std::map< AnyTaskManager *, TaskManagerProfile *> *taskManagerProfiles)
 Gathers profile data.
 
- Public Member Functions inherited from htgs::AnyITask
 AnyITask ()
 Creates an ITask with the number of threads equal to 1.
 
 AnyITask (size_t numThreads)
 Constructs an ITask with a specified number of threads.
 
 AnyITask (size_t numThreads, bool isStartTask, bool poll, size_t microTimeoutTime)
 Constructs an ITask with a specified number of threads as well as additional scheduling options.
 
virtual ~AnyITask ()
 Destructor.
 
virtual std::string genDot (int flags, std::string dotId, std::shared_ptr< htgs::AnyConnector > input, std::shared_ptr< htgs::AnyConnector > output)
 Virtual function that generates the input/output and per-task dot notation.
 
virtual std::string getConsumerDotIds ()
 
virtual std::string getProducerDotIds ()
 
virtual std::string genDot (int flags, std::string dotId)
 Virtual function that adds additional dot attributes to this node.
 
virtual std::string genCustomDot (ProfileUtils *profileUtils, int colorFlag)
 Virtual function to generate a customized dot file.
 
virtual void debug ()
 Virtual function that is called to debug the ITask.
 
virtual std::string debugDotNode ()
 Provides debug output for a node in the dot graph.
 
virtual void profile ()
 Virtual function that is called to provide profile output for the ITask.
 
virtual std::string profileStr ()
 Virtual function that is called after executeTask is called.
 
void initialize (size_t pipelineId, size_t numPipeline)
 Virtual function that is called when an ITask is being initialized by its owner thread.
 
void setPipelineId (size_t pipelineId)
 Sets the pipeline ID for this ITask.
 
size_t getPipelineId ()
 Gets the pipeline ID.
 
void setNumPipelines (size_t numPipelines)
 Sets the number of pipelines that this ITask belongs to.
 
size_t getNumPipelines () const
 Gets the number of pipelines that this ITask belongs to.
 
size_t getNumThreads () const
 Gets the number of threads associated with this ITask.
 
bool isStartTask () const
 Gets whether this ITask is a starting task.
 
bool isPoll () const
 Gets whether this ITask is polling for data or not.
 
size_t getMicroTimeoutTime () const
 Gets the timeout time for polling.
 
void copyMemoryEdges (AnyITask *iTaskCopy)
 Copies the memory edges from this AnyITask to another AnyITask.
 
std::string genDot (int flags, std::shared_ptr< AnyConnector > input, std::shared_ptr< AnyConnector > output)
 Creates a dot notation representation for this task.
 
void profileITask ()
 Provides profile output for the ITask.
 
std::string getDotId ()
 Gets the id used for dot nodes.
 
std::string getNameWithPipelineId ()
 Gets the name of the ITask with its pipeline ID.
 
const std::shared_ptr< ConnectorMap > & getMemoryEdges () const
 Gets the memory edges for the task.
 
const std::shared_ptr< ConnectorMap > & getReleaseMemoryEdges () const
 Gets the memory edges for releasing memory for the memory manager, used to shut down the memory manager.
 
bool hasMemoryEdge (std::string name)
 Checks whether this ITask contains a memory edge for a specified name.
 
void attachMemoryEdge (std::string name, std::shared_ptr< AnyConnector > getMemoryConnector, std::shared_ptr< AnyConnector > releaseMemoryConnector, MMType type)
 Attaches a memory edge to this ITask to get memory.
 
unsigned long long int getMemoryWaitTime () const
 Gets the amount of time the task was waiting for memory.
 
void incMemoryWaitTime (unsigned long long int val)
 Increments memory wait time.
 

Private Attributes

TaskManager< T, U > * ownerTask
 The owner task for this ITask.
 

Detailed Description

template<class T, class U>
class htgs::ITask< T, U >

An interface to process input data and forward results within a TaskGraph.

To use the ITask, a new class inherits ITask and defines the pure virtual functions. The ITask is then connected into a TaskGraphConf, which is bound to a TaskGraphRuntime. The ITask contains metadata that describes threading and scheduling rules. Using this metadata, the TaskGraphRuntime spawns threads into a pool for the ITask. Each thread is bound to a separate instance of the ITask, which is generated through the copy function.
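The thread-per-copy model can be illustrated with a small standalone sketch. It does not use HTGS; `SketchTask`, `CountingTask`, and `spawnPool` are hypothetical names. Under the assumption that the original instance serves the first thread and every other thread receives an instance produced by `copy()`, no two threads ever share task state.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for an ITask-like interface (not the HTGS API).
struct SketchTask {
  virtual ~SketchTask() = default;
  virtual SketchTask *copy() = 0;                 // produces a fresh instance for another thread
  virtual void run(std::size_t threadId) = 0;
};

struct CountingTask : SketchTask {
  std::size_t seenThreadId = 0;                   // per-instance state, never shared across threads
  SketchTask *copy() override { return new CountingTask(); }
  void run(std::size_t threadId) override { seenThreadId = threadId; }
};

// Binds one instance to each thread: thread 0 uses the original, the rest use copies.
std::vector<std::unique_ptr<SketchTask>> spawnPool(std::unique_ptr<SketchTask> original,
                                                   std::size_t numThreads) {
  std::vector<std::unique_ptr<SketchTask>> instances;
  instances.push_back(std::move(original));
  for (std::size_t i = 1; i < numThreads; ++i)
    instances.emplace_back(instances.front()->copy());

  std::vector<std::thread> threads;
  for (std::size_t i = 0; i < instances.size(); ++i)
    threads.emplace_back([&instances, i] { instances[i]->run(i); });
  for (auto &t : threads) t.join();
  return instances;
}
```

Because each thread owns its instance, fields such as `seenThreadId` need no locking; only state reached through shared objects (for example, shared memory) requires synchronization.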

The purpose of this interface is to provide the functions necessary to represent computational and logical operations for algorithms, which are added to a TaskGraphConf. Custom behavior for an ITask can be implemented, as demonstrated by other classes that derive from ITask, e.g., Bookkeeper, ExecutionPipeline, and ICudaTask.

An ITask should represent some component of an algorithm, such that multiple threads can concurrently process and stream data through a TaskGraphConf. The main factors that affect performance are: (1) memory (reuse/capacity/locality), (2) data dependencies, and (3) computational complexity.

There are two methods for handling memory.

The first type of memory for an ITask is local memory. This type should be allocated in the initialize() function and freed in the shutdown() function. It is duplicated (one per thread) and should be local to that thread only.
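A minimal sketch of the local-memory lifecycle, assuming a hypothetical `ScratchTask` class written in plain C++ rather than against the HTGS headers: the buffer is allocated in `initialize()` and released in `shutdown()`, and `copy()` duplicates only the configuration, so every thread-bound instance ends up with its own buffer.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical task showing per-thread local memory (not the HTGS API).
class ScratchTask {
 public:
  explicit ScratchTask(std::size_t memorySize) : memorySize(memorySize) {}
  ScratchTask(const ScratchTask &) = delete;             // avoid sharing the raw buffer
  ScratchTask &operator=(const ScratchTask &) = delete;
  ~ScratchTask() { delete[] scratch; }                   // safety net if shutdown() was skipped

  void initialize() { scratch = new double[memorySize](); }  // zero-initialized, one per instance
  void shutdown() { delete[] scratch; scratch = nullptr; }
  double *buffer() const { return scratch; }

  // Copies the configuration only; the copy allocates its own buffer in initialize().
  ScratchTask *copy() const { return new ScratchTask(memorySize); }

 private:
  std::size_t memorySize;
  double *scratch = nullptr;
};
```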

The second type of memory is shared memory, which can be used by other tasks in a TaskGraphConf. One ITask is responsible for getting memory, while any other ITask or the main thread is responsible for releasing that memory. The memory is managed by an external MemoryManager, which allocates the memory and frees it once the TaskGraphConf is finished. Use TaskGraphConf::addMemoryManagerEdge to attach a task that gets memory. The acquired memory should be incorporated into the output data of the ITask and forwarded until it is released by some other task. The memory should be released within the same task graph that acquired it.

An ITask can get and release memory using the ITask::getMemory and ITask::releaseMemory routines, respectively. If the ITask that gets memory might not have a memory edge attached, it can call AnyITask::hasMemoryEdge to verify that the edge exists.

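The blocking behavior of a shared memory edge can be approximated with a fixed-size pool. This is a hypothetical stand-in (`BlockingPool` is not the HTGS MemoryManager): `get()` waits on a condition variable while the pool is empty, and `release()` returns a buffer and wakes one waiter.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical fixed-size pool mimicking a static memory edge.
class BlockingPool {
 public:
  BlockingPool(std::size_t poolSize, std::size_t elemsPerBuffer) {
    for (std::size_t i = 0; i < poolSize; ++i)
      free_.push(std::make_shared<std::vector<int>>(elemsPerBuffer));  // preallocated buffers
  }

  // Blocks until a buffer is available, then hands it out.
  std::shared_ptr<std::vector<int>> get() {
    std::unique_lock<std::mutex> lock(mutex_);
    notEmpty_.wait(lock, [this] { return !free_.empty(); });
    auto buf = free_.front();
    free_.pop();
    return buf;
  }

  // Returns a buffer to the pool and wakes one blocked getter.
  void release(std::shared_ptr<std::vector<int>> buf) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      free_.push(std::move(buf));
    }
    notEmpty_.notify_one();
  }

  std::size_t available() {
    std::lock_guard<std::mutex> lock(mutex_);
    return free_.size();
  }

 private:
  std::mutex mutex_;
  std::condition_variable notEmpty_;
  std::queue<std::shared_ptr<std::vector<int>>> free_;
};
```

If every buffer is held by data that will only be released further downstream, a getter blocked in `get()` cannot make progress; sizing the pool according to the release pattern avoids that stall.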
If there are multiple computational ITasks within a TaskGraph, the number of threads for each ITask should be chosen based on its workload, with the aim of reducing the time every ITask spends waiting (where possible). Doing so balances the compute load across the TaskGraph. The total number of threads used for computation should not exceed the number of logical cores on the system.

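One way to apply this guideline is to split the available cores across compute tasks in proportion to their measured cost. The function below is a hypothetical heuristic, not part of HTGS: every task gets at least one thread, the remaining threads are divided proportionally (rounded down), and any leftover threads go to the most expensive task, so the total equals `cores` as long as `cores` is at least the number of tasks.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <vector>

// Hypothetical allocation heuristic; costs[i] is the relative workload of task i.
std::vector<std::size_t> allocateThreads(const std::vector<double> &costs, std::size_t cores) {
  std::vector<std::size_t> threads(costs.size(), 1);  // every task needs at least one thread
  if (costs.empty() || cores <= costs.size()) return threads;
  double total = std::accumulate(costs.begin(), costs.end(), 0.0);
  if (total <= 0.0) return threads;                    // no cost information to split by

  std::size_t spare = cores - costs.size();
  std::size_t used = 0;
  for (std::size_t i = 0; i < costs.size(); ++i) {
    auto extra = static_cast<std::size_t>(spare * (costs[i] / total));  // floor of the share
    threads[i] += extra;
    used += extra;
  }
  // Threads lost to rounding go to the most expensive task.
  auto maxIt = std::max_element(costs.begin(), costs.end());
  threads[static_cast<std::size_t>(maxIt - costs.begin())] += spare - used;
  return threads;
}
```

In practice `cores` could come from `std::thread::hardware_concurrency()`, which may return 0 when the value is unknown, so a fallback value is needed.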
Example Implementation:

class ReadTask : public htgs::ITask<Data1, Data2>
{
public:
  ReadTask(size_t numThreads, long memorySize) : ITask(numThreads), reusableMemory(nullptr), memorySize(memorySize) {}
  void initialize() override
  {
    // This memory will be allocated multiple times (once for each thread that is bound to the ReadTask ITask)
    reusableMemory = new double[memorySize];
  }
  void shutdown() override { delete [] reusableMemory; }
  void executeTask(std::shared_ptr<Data1> data) override
  {
    // ...
    // Get shared memory from the "readMemory" edge (an MMType::Static edge, since getMemory is used)
    htgs::m_data_t<int> readBuffer = this->getMemory<int>("readMemory", new ReleaseCountRule(1));
    readData(data->getFile(), readBuffer->get());
    // Shared memory release example
    this->releaseMemory(data->getMemory());
    // Add memory to output edge
    addResult(new Data2(readBuffer));
    // ...
  }
  std::string getName() override { return "ReadTask"; }
  htgs::ITask<Data1, Data2> *copy() override { return new ReadTask(this->getNumThreads(), memorySize); }
  // Optional. The default implementation returns inputConnector->isInputTerminated();
  bool canTerminate(std::shared_ptr<htgs::AnyConnector> inputConnector) override { return inputConnector->isInputTerminated(); }
private:
  // reusableMemory is not used in computation, but shows an example of local memory allocation
  double *reusableMemory;
  long memorySize;
};

Usage Example:

int numThreadsPreprocess = 1;
int numThreadsRead = 2;
int numThreadsMultiply = 10;
long readLocalMemorySize = 1024; // size of each ReadTask thread's local buffer (example value)
PreProcessTask *preProcTask = new PreProcessTask(numThreadsPreprocess);
ReadTask *readTask = new ReadTask(numThreadsRead, readLocalMemorySize);
MultiplyTask *mulTask = new MultiplyTask(numThreadsMultiply);
// Add tasks to task graph (each task must be added before using addMemoryManagerEdge)
taskGraph->setGraphConsumerTask(preProcTask);
taskGraph->addEdge(preProcTask, readTask);
taskGraph->addEdge(readTask, mulTask);
taskGraph->addGraphProducerTask(mulTask);
// Add memory edges. The types for the allocator must match the type specified when an ITask uses getMemory
// Memory pool size is specified based on algorithm scheduling and memory release rules.
int otherMemoryPoolSize = 100;
int readMemoryPoolSize = 200;
// Creates the memory edge "otherMemory" with preProcTask as the getter, and readTask as the releaser
// MMType::Static is used because the getters call getMemory (not getDynamicMemory)
taskGraph->addMemoryManagerEdge("otherMemory", preProcTask, new OtherMemoryAllocator(), otherMemoryPoolSize, htgs::MMType::Static);
// Creates the memory edge "readMemory" with readTask as the getter, and mulTask as the releaser
taskGraph->addMemoryManagerEdge("readMemory", readTask, new ReadMemoryAllocator(), readMemoryPoolSize, htgs::MMType::Static);
htgs::TaskGraphRuntime *executeGraph = new htgs::TaskGraphRuntime(taskGraph);
// Launches threads and binds them to ITasks
executeGraph->executeAndWaitForRuntime();
Template Parameters
T: the input data type for the ITask; T must derive from IData.
U: the output data type for the ITask; U must derive from IData.

Constructor & Destructor Documentation

◆ ITask() [1/2]

template<class T, class U>
htgs::ITask< T, U >::ITask ( size_t  numThreads)
inline

Constructs an ITask with a specified number of threads.

Parameters
numThreads: the number of threads associated with this ITask

◆ ITask() [2/2]

template<class T, class U>
htgs::ITask< T, U >::ITask ( size_t  numThreads,
bool  isStartTask,
bool  poll,
size_t  microTimeoutTime 
)
inline

Constructs an ITask with a specified number of threads as well as additional scheduling options.

Parameters
numThreads: the number of threads associated with this ITask
isStartTask: whether this ITask starts executing immediately and passes nullptr to executeTask()
poll: whether this task polls for data; if the timeout period expires, nullptr is passed to executeTask()
microTimeoutTime: the timeout period for checking for data
Note
If the ITask is declared as a start task or is polling, then executeTask() should properly handle nullptr data.

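A start task or polling task must treat `nullptr` as a normal input. The sketch below shows only that guard; `PollingSketch` and `Item` are hypothetical types in plain C++, and the method merely mirrors the shape of `executeTask()`.

```cpp
#include <cassert>
#include <memory>
#include <vector>

struct Item { int value; };  // hypothetical input type

// Hypothetical task body illustrating the nullptr guard (not the HTGS API).
class PollingSketch {
 public:
  void executeTask(std::shared_ptr<Item> data) {
    if (data == nullptr) {  // start-task kick-off or poll timeout: no input this time
      ++emptyWakeups;
      return;
    }
    results.push_back(data->value * 2);  // normal processing path
  }
  int emptyWakeups = 0;
  std::vector<int> results;
};
```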
Member Function Documentation

◆ addResult() [1/2]

template<class T, class U>
void htgs::ITask< T, U >::addResult ( std::shared_ptr< U >  result)
inline

Adds results to the output list to be sent to the next connected ITask in a TaskGraph.

Parameters
result: the result data to be passed

◆ addResult() [2/2]

template<class T, class U>
void htgs::ITask< T, U >::addResult ( U *  result)
inline

Adds results to the output list to be sent to the next connected ITask in a TaskGraph.

The result pointer will be wrapped into a shared smart pointer and then placed in the output list.

Parameters
result: the result data to be passed

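The two overloads differ only in ownership handling. A minimal sketch of that idea, using a hypothetical `OutputList` template rather than the HTGS connector: the raw-pointer overload wraps its argument in a `std::shared_ptr` and forwards to the shared-pointer overload.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical output list illustrating the two addResult overloads.
template <class U>
class OutputList {
 public:
  void addResult(std::shared_ptr<U> result) { out.push_back(std::move(result)); }
  // Takes ownership of the raw pointer by wrapping it in a shared_ptr.
  void addResult(U *result) { addResult(std::shared_ptr<U>(result)); }
  std::vector<std::shared_ptr<U>> out;
};
```

After passing a raw pointer, the caller must not `delete` it; the shared pointer now owns the object.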
◆ canTerminate()

template<class T, class U>
virtual bool htgs::ITask< T, U >::canTerminate ( std::shared_ptr< AnyConnector >  inputConnector)
inline override virtual

Virtual function that is called when an ITask is checking if it can be terminated.

Parameters
inputConnector: the connector responsible for giving data to this task
Returns
whether the ITask can be terminated or not
Return values
TRUE: if the ITask is ready to be terminated
FALSE: if the ITask is not ready to be terminated
Note
By default, this function checks whether the input is no longer sending data, using inputConnector->isInputTerminated().

Implements htgs::AnyITask.

Reimplemented in htgs::TGTask< T, U >, and htgs::Bookkeeper< T >.
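The default termination test can be modeled with a small connector. In this hypothetical sketch (`SketchConnector` is not the HTGS connector class), input counts as terminated only when every producer has signalled completion and the queue has been drained, so a task keeps running while data is still buffered.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical connector: a queue plus a count of producers still running.
class SketchConnector {
 public:
  explicit SketchConnector(std::size_t producers) : activeProducers(producers) {}
  void produce(int v) { std::lock_guard<std::mutex> l(m); q.push_back(v); }
  void producerFinished() {
    std::lock_guard<std::mutex> l(m);
    if (activeProducers > 0) --activeProducers;
  }
  bool consume(int &v) {
    std::lock_guard<std::mutex> l(m);
    if (q.empty()) return false;
    v = q.front();
    q.pop_front();
    return true;
  }
  // Terminated only when no producer remains and no data is left to consume.
  bool isInputTerminated() {
    std::lock_guard<std::mutex> l(m);
    return activeProducers == 0 && q.empty();
  }

 private:
  std::mutex m;
  std::deque<int> q;
  std::size_t activeProducers;
};

// Mirrors the documented default behavior of canTerminate.
bool canTerminate(SketchConnector &input) { return input.isInputTerminated(); }
```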

◆ copy()

template<class T, class U>
virtual ITask<T, U>* htgs::ITask< T, U >::copy ( )
pure virtual

Pure virtual function to copy an ITask.

◆ copyITask()

template<class T, class U>
ITask<T, U>* htgs::ITask< T, U >::copyITask ( bool  deep)
inline override virtual

Copies the ITask (including a copy of all memory edges).

Parameters
deep: whether to do a deep copy and copy the memory managers as well
Returns
the copy of the ITask

Implements htgs::AnyITask.

◆ executeTask()

template<class T, class U>
virtual void htgs::ITask< T, U >::executeTask ( std::shared_ptr< T >  data)
pure virtual

Pure virtual function that is called when an ITask's thread is to execute on data.

Parameters
data: the data to be processed
Note
To send output data use addResult()
If the ITask is a start task or is polling, data might be nullptr

Implemented in htgs::ExecutionPipeline< T, U >, htgs::ICudaTask< T, U >, htgs::TGTask< T, U >, and htgs::Bookkeeper< T >.

◆ executeTaskFinal()

template<class T, class U>
virtual void htgs::ITask< T, U >::executeTaskFinal ( )
inline override virtual

Virtual function that is called just before the task shuts down.

This is the last processing call for the task, made after all executeTask calls. It is called only by the very last thread that is bound to this task.

Example: if multiple threads are bound to a task, this function is called only once, by the last thread.

Implements htgs::AnyITask.
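A common way to guarantee that exactly one thread runs a final hook is an atomic countdown. The sketch below illustrates that general technique and is an assumption, not HTGS's actual TaskManager code: each worker decrements a shared counter when it finishes, and the worker that observes the previous value 1 is the last one and calls the hypothetical `finalHook`.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Runs numThreads workers; only the last one to finish invokes finalHook.
void runWorkers(std::size_t numThreads, std::atomic<long> &sum,
                const std::function<void()> &finalHook) {
  std::atomic<std::size_t> remaining(numThreads);
  std::vector<std::thread> workers;
  for (std::size_t i = 0; i < numThreads; ++i) {
    workers.emplace_back([&, i] {
      sum += static_cast<long>(i);                   // stand-in for executeTask work
      if (remaining.fetch_sub(1) == 1) finalHook();  // previous value 1 means this is the last thread
    });
  }
  for (auto &w : workers) w.join();
}
```

Because every worker adds its contribution before decrementing the counter, the last thread sees all contributions when it runs the hook.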

◆ gatherProfileData()

template<class T, class U>
virtual void htgs::ITask< T, U >::gatherProfileData ( std::map< AnyTaskManager *, TaskManagerProfile *> *  taskManagerProfiles)
inline virtual

Gathers profile data.

Parameters
taskManagerProfiles: the mapping between the task manager and the profile data for the task manager.

Reimplemented in htgs::ExecutionPipeline< T, U >, and htgs::TGTask< T, U >.

◆ getAddress()

template<class T, class U>
std::string htgs::ITask< T, U >::getAddress ( )
inline final override virtual

Gets the address from the owner task, which is the address of the task graph.

Returns
the address

Implements htgs::AnyITask.

◆ getDotCustomProfile()

template<class T, class U>
virtual std::string htgs::ITask< T, U >::getDotCustomProfile ( )
inline override virtual

Adds the string text to the profiling of this task in the graphviz dot visualization.

Returns
the extra content to be added to the task during visualization.

Implements htgs::AnyITask.

◆ getDotFillColor()

template<class T, class U>
virtual std::string htgs::ITask< T, U >::getDotFillColor ( )
inline override virtual

Gets the color for filling the shape for graphviz dot.

Returns
the fill color

Implements htgs::AnyITask.

Reimplemented in htgs::MemoryManager< T >, and htgs::ICudaTask< T, U >.

◆ getDotLabelName()

template<class T, class U>
virtual std::string htgs::ITask< T, U >::getDotLabelName ( )
inline override virtual

Virtual function to get the label name used for dot graph viz.

Returns
the label name used for graphviz

Implements htgs::AnyITask.

Reimplemented in htgs::Bookkeeper< T >.

◆ getDotShape()

template<class T, class U>
virtual std::string htgs::ITask< T, U >::getDotShape ( )
inline override virtual

Gets the shape for graphviz dot.

Returns
the shape

Implements htgs::AnyITask.

◆ getDotShapeColor()

template<class T, class U>
virtual std::string htgs::ITask< T, U >::getDotShapeColor ( )
inline override virtual

Gets the color of the shape for graphviz dot.

Returns
the shape color

Implements htgs::AnyITask.

◆ getDynamicMemory()

template<class T, class U>
template<class V >
m_data_t<V> htgs::ITask< T, U >::getDynamicMemory ( std::string  name,
IMemoryReleaseRule *  releaseRule,
size_t  numElems 
)
inline

Retrieves dynamically sized memory from a memory edge.

Parameters
name: the name of the memory edge
releaseRule: the release rule to be associated with the newly acquired memory
numElems: the number of elements to allocate (uses the internal allocator defined for the memory edge)
Returns
the MemoryData
Template Parameters
V: the MemoryData type
Note
The name specified must have been attached to this ITask as a memory getter using the TaskGraphConf::addMemoryManagerEdge routine, which can be verified using hasMemoryEdge().
This function blocks if no memory is available; ensure the memory pool size is sufficient based on the memory release rules and data flow.
Memory edge must be defined as MMType::Dynamic

◆ getMemory()

template<class T, class U>
template<class V >
m_data_t<V> htgs::ITask< T, U >::getMemory ( std::string  name,
IMemoryReleaseRule *  releaseRule 
)
inline

Retrieves memory from a memory edge.

Parameters
name: the name of the memory edge
releaseRule: the release rule to be associated with the newly acquired memory
Returns
the MemoryData
Template Parameters
V: the MemoryData type
Note
The name specified must have been attached to this ITask as a memory getter using the TaskGraphConf::addMemoryManagerEdge routine, which can be verified using hasMemoryEdge().
This function blocks if no memory is available; ensure the memory pool size is sufficient based on the memory release rules and data flow.
Memory edge must be defined as MMType::Static

◆ getName()

template<class T, class U>
virtual std::string htgs::ITask< T, U >::getName ( )
inline override virtual

Virtual function to get the name of an ITask.

◆ getNumGraphsSpawned()

template<class T, class U>
virtual size_t htgs::ITask< T, U >::getNumGraphsSpawned ( )
inline virtual

Gets the number of graphs spawned by this ITask.

Returns
the number of graphs spawned

Reimplemented in htgs::ExecutionPipeline< T, U >, and htgs::TGTask< T, U >.

◆ getOwnerTaskManager()

template<class T, class U>
TaskManager<T, U>* htgs::ITask< T, U >::getOwnerTaskManager ( )
inline

Gets the owner task manager for this ITask.

Returns
the owner task manager

◆ getTaskComputeTime()

template<class T, class U>
unsigned long long int htgs::ITask< T, U >::getTaskComputeTime ( ) const
inline

Gets the task's compute time.

Returns
the compute time in microseconds.
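A sketch of how a compute-time counter in microseconds can be accumulated, assuming a hypothetical `ComputeTimer` that wraps each unit of work; it uses `std::chrono::steady_clock` so the measurement is not affected by wall-clock adjustments. This is an illustration of the idea, not how HTGS measures time internally.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical accumulator: sums the time spent inside each timed call.
class ComputeTimer {
 public:
  template <class F>
  void timed(F &&work) {
    auto start = std::chrono::steady_clock::now();
    work();
    auto end = std::chrono::steady_clock::now();
    totalUs += static_cast<unsigned long long>(
        std::chrono::duration_cast<std::chrono::microseconds>(end - start).count());
  }
  unsigned long long getTaskComputeTime() const { return totalUs; }  // microseconds

 private:
  unsigned long long totalUs = 0;
};
```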

◆ getThreadID()

template<class T, class U>
size_t htgs::ITask< T, U >::getThreadID ( )
inline

Gets the thread ID associated with this task.

Returns
the thread ID

◆ initialize()

template<class T, class U>
void htgs::ITask< T, U >::initialize ( size_t  pipelineId,
size_t  numPipeline,
TaskManager< T, U > *  ownerTask 
)
inline

Function that is called when an ITask is being initialized by its owner thread.

This initialize function receives the TaskManager associated with the ITask.

Parameters
pipelineId: the pipeline ID; only used if the ITask is inside of an ExecutionPipeline
numPipeline: the number of pipelines; only used if the ITask is inside of an ExecutionPipeline
ownerTask: the owner Task for this ITask ICudaTask's in an execution pipeline

◆ inTypeName()

template<class T, class U>
std::string htgs::ITask< T, U >::inTypeName ( )
inline final override virtual

Gets the demangled input type name of the connector.

Returns
the demangled type name for the input

Implements htgs::AnyITask.

◆ outTypeName()

template<class T, class U>
std::string htgs::ITask< T, U >::outTypeName ( )
inline final override virtual

Gets the demangled output type name of the connector.

Returns
the demangled type name for the output

Implements htgs::AnyITask.

◆ printProfile()

template<class T, class U>
virtual void htgs::ITask< T, U >::printProfile ( )
inline override virtual

Prints the profile data to std::cout.

Implements htgs::AnyITask.

Reimplemented in htgs::ExecutionPipeline< T, U >, and htgs::TGTask< T, U >.

◆ releaseMemory()

template<class T, class U>
template<class V >
void htgs::ITask< T, U >::releaseMemory ( m_data_t< V >  memory)
inline

Releases memory onto a memory edge, which is transferred by the graph communicator.

Parameters
memory: the memory to be released
Template Parameters
V: the MemoryData type
Note
The m_data_t should be acquired from a task using the getMemory function. A reference to this data can be passed along within IData.

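The `ReleaseCountRule(1)` passed in the class example suggests a rule that counts releases. The hypothetical `CountReleaseRule` below models that idea: the memory may go back to its pool only after it has been released `count` times. Its method names are assumptions for illustration and are not guaranteed to match `IMemoryReleaseRule`.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical release rule: memory returns to the pool after `count` releases.
class CountReleaseRule {
 public:
  explicit CountReleaseRule(std::size_t count) : remaining(count) {}
  void memoryUsed() { if (remaining > 0) --remaining; }  // one call per releaseMemory
  bool canReleaseMemory() const { return remaining == 0; }

 private:
  std::size_t remaining;
};
```

A count greater than 1 fits data that several downstream consumers read before the buffer can safely be reused.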
◆ setTaskManager()

template<class T, class U>
void htgs::ITask< T, U >::setTaskManager ( TaskManager< T, U > *  ownerTask)
inline

Sets the owner task manager for this ITask.

Parameters
ownerTask: the task manager that owns this ITask

◆ shutdown()

template<class T, class U>
virtual void htgs::ITask< T, U >::shutdown ( )
inline override virtual

Virtual function that is called when an ITask is being shut down by its owner thread.

This is called once per thread for the task. The output connectors/edges are still active, so it is safe to call ITask::addResult from within this function. This is useful for flushing any final remaining data from the thread to the next task.

Note
If you want to do a parallel reduction such that only the last thread produces data, then override the AnyITask::executeTaskFinal function.

Implements htgs::AnyITask.

Reimplemented in htgs::ICudaTask< T, U >, htgs::ExecutionPipeline< T, U >, htgs::Bookkeeper< T >, htgs::TGTask< T, U >, and htgs::MemoryManager< T >.
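Flushing partial results from `shutdown()` can be sketched without HTGS. In this hypothetical example, `PartialSumTask` folds inputs into a per-instance sum and pushes it to a `downstream` vector when shut down, standing in for `addResult`. With N thread-bound instances, N partial results reach the next task, which must combine them; overriding `executeTaskFinal` instead would let only the last thread emit a result.

```cpp
#include <cassert>
#include <vector>

// Hypothetical per-thread accumulator; `downstream` stands in for the output edge.
// Not thread-safe: a real output connector would synchronize concurrent pushes.
class PartialSumTask {
 public:
  explicit PartialSumTask(std::vector<long> &downstream) : downstream(downstream) {}
  void executeTask(long value) { partial += value; }     // fold input into local state
  void shutdown() { downstream.push_back(partial); }     // flush once per instance

 private:
  std::vector<long> &downstream;
  long partial = 0;
};
```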

