12 #ifndef HTGS_ANYTASKMANAGER_HPP 13 #define HTGS_ANYTASKMANAGER_HPP 25 #include <nvtx3/nvToolsExt.h> 29 #include <htgs/core/graph/profile/ProfileData.hpp> 30 #include <htgs/core/graph/profile/CustomProfile.hpp> 34 class TaskManagerThread;
84 size_t microTimeoutTime,
91 this->
timeout = microTimeoutTime;
188 virtual void gatherProfileData(std::map<AnyTaskManager *, TaskManagerProfile *> *taskManagerProfiles) = 0;
204 std::cout <<
"===================== " << this->
getName() <<
" " << prefix() <<
" ===================" << std::endl;
208 std::cout <<
"Input connector: ";
216 std::cout <<
"-------------------------- " << this->
getName() <<
" (thread: " << this->
getThreadId()
217 <<
") -------------------------- " << std::endl << std::endl;
375 HTGS_DEBUG(
"shutting down: " << this->prefix() <<
" " << this->
getName() << std::endl);
377 nvtxRangeId_t rangeId = this->nvtxProfiler->startRangeShuttingDown();
382 this->nvtxProfiler->endRangeShuttingDown(rangeId);
511 std::string prefix() {
513 "Thread id: " + std::to_string(this->
threadId) +
" (out of " + std::to_string(this->
numThreads)
514 +
"); Pipeline id " + std::to_string(this->
pipelineId) +
" (out of " + std::to_string(this->
numPipelines));
521 void setProfiler(NVTXProfiler *profiler) {
522 this->nvtxProfiler = profiler;
524 NVTXProfiler *getProfiler()
const {
528 void releaseProfiler() {
530 nvtxProfiler =
nullptr;
558 NVTXProfiler *nvtxProfiler;
586 this->terminated =
false;
590 this->numThreadsAfterDecrement = *this->
numThreads;
591 this->taskGraphInitializeCond = taskGraphInitializeCond;
592 this->taskGraphInitializeMutex = taskGraphInitializeMutex;
614 HTGS_DEBUG(
"Starting Thread for task : " << task->getName());
615 this->task->initialize();
618 std::unique_lock<std::mutex> lock(*this->taskGraphInitializeMutex);
619 this->taskGraphInitializeCond->notify_all();
622 while (!this->terminated) {
623 this->task->executeTask();
625 this->task->shutdown();
628 if (numThreadsAfterDecrement == 0)
630 this->task->terminateConnections();
634 if(hasNoThreadsRemaining())
636 this->task->releaseProfiler();
662 size_t current = this->
numThreads->fetch_sub(1) - 1;
663 numThreadsAfterDecrement = current;
686 size_t numThreadsAfterDecrement;
693 #endif //HTGS_ANYTASKMANAGER_HPP virtual void terminateConnections()=0
Terminates all Connector edges.
virtual void executeTask()=0
Executes the TaskManager.
virtual AnyTaskManager * copy(bool deep)=0
Copies the TaskManager.
virtual std::string getName()=0
Virtual function to get the name of an ITask.
size_t getNumThreads() const
Gets the number of threads associated with this TaskManager.
Definition: AnyTaskManager.hpp:298
virtual void printProfile()=0
Prints the profile data to std::cout.
virtual void debug()
Virtual function that is called to debug the ITask.
Definition: AnyITask.hpp:276
Implements the parent ITask, which removes the template arguments of an ITask.
Definition: AnyITask.hpp:48
std::string getNameWithPipelineId()
Gets the name of the ITask with its pipeline ID.
Definition: AnyITask.hpp:509
volatile bool terminated
Whether the thread is ready to be terminated or not.
Definition: AnyTaskManager.hpp:683
size_t getThreadId()
Gets the thread id associated with the TaskManager.
Definition: AnyTaskManager.hpp:433
size_t getNumPipelines()
Gets the number of pipelines that this task manager belongs to.
Definition: AnyTaskManager.hpp:277
void shutdown()
Shuts down the TaskManager.
Definition: AnyTaskManager.hpp:374
NVTX Profiler uses NVIDIA's NVTX API to produce profiling metrics that are visualized with Nsight Sys...
virtual void setInputConnector(std::shared_ptr< AnyConnector > connector)=0
Sets the input BaseConnector.
size_t getThreadsRemaining()
Gets the number of threads remaining.
Definition: AnyTaskManager.hpp:647
AnyTaskManager * task
The TaskManager that is called from the thread.
Definition: AnyTaskManager.hpp:685
AnyTaskManager(size_t numThreads, bool isStartTask, size_t pipelineId, size_t numPipelines, std::string address)
Constructs an AnyTaskManager with an ITask as the task function and specific runtime parameters...
Definition: AnyTaskManager.hpp:56
std::string getAddress()
Gets the address of the task manager.
Definition: AnyTaskManager.hpp:260
unsigned long long int getExecuteTime()
Gets the total execution time for the task manager, including any waiting for memory within the execu...
Definition: AnyTaskManager.hpp:454
#define HTGS_DEBUG_VERBOSE(msg)
Prints a debug message to std::cerr with VERBOSE level.
Definition: debug_message.hpp:75
unsigned long long int getWaitTime()
Gets the wait time for the task manager.
Definition: AnyTaskManager.hpp:467
void profileITask()
Provides profile output for the ITask.
Definition: AnyITask.hpp:482
bool startTask
Whether the task should start immediately.
Definition: AnyTaskManager.hpp:544
void incTaskComputeTime(int64_t val)
Increments the compute time profile value.
Definition: AnyTaskManager.hpp:363
std::string getDot(int flags)
Gets the dot notation for this TaskManager.
Definition: AnyTaskManager.hpp:411
int run()
Executes the task until the underlying Task has been terminated.
Definition: AnyTaskManager.hpp:607
void decrementNumThreadsRemaining()
Decrements the number of threads remaining by one.
Definition: AnyTaskManager.hpp:652
size_t getMaxQueueSize()
Gets the maximum size the input queue became during execution.
Definition: AnyTaskManager.hpp:480
void resetProfile()
Resets the profile data for this task.
Definition: AnyTaskManager.hpp:491
bool isInitialized()
Gets whether the TaskManager has initialized or not.
Definition: AnyTaskManager.hpp:327
unsigned long long int getTaskComputeTime()
Gets the task's compute time.
Definition: AnyTaskManager.hpp:505
virtual AnyITask * getTaskFunction()=0
Gets the ITask function associated with the TaskManager.
std::string getName()
Gets the name of the ITask.
Definition: AnyTaskManager.hpp:390
Implements parent ITask, removing template arguments.
unsigned long long int taskWaitTime
The total wait time for the task.
Definition: AnyTaskManager.hpp:539
size_t numPipelines
The number of execution pipelines.
Definition: AnyTaskManager.hpp:552
size_t getPipelineId()
Gets the pipeline identifier for this task from 0 to number of pipelines - 1.
Definition: AnyTaskManager.hpp:292
virtual ~AnyTaskManager()
Destructor.
Definition: AnyTaskManager.hpp:109
unsigned long long int getMemoryWaitTime() const
Gets the amount of time the task was waiting for memory.
Definition: AnyITask.hpp:566
bool isPoll()
Gets whether the task manager is polling for data or not.
Definition: AnyTaskManager.hpp:351
void debug()
Provides debug output.
Definition: AnyTaskManager.hpp:396
void incWaitTime(int64_t val)
Increments the wait time profile value.
Definition: AnyTaskManager.hpp:369
virtual void initialize()=0
Initializes the TaskManager.
unsigned long long int getComputeTime()
Gets the compute time for the task manager, removing the memory wait time.
Definition: AnyTaskManager.hpp:442
virtual std::string genDot(int flags, std::string dotId, std::shared_ptr< htgs::AnyConnector > input, std::shared_ptr< htgs::AnyConnector > output)
Virtual function that generates the input/output and per-task dot notation.
Definition: AnyITask.hpp:202
void updateAddressAndPipelines(std::string address, size_t pipelineId, size_t numPipelines)
Sets the task graph communicator.
Definition: AnyTaskManager.hpp:249
virtual std::shared_ptr< AnyConnector > getInputConnector()=0
Gets the input Connector.
void setStartTask(bool val)
Sets whether this task manager is a start task or not, which will immediately begin executing by send...
Definition: AnyTaskManager.hpp:335
std::string address
The address of the task graph this manager belongs to.
Definition: AnyTaskManager.hpp:553
virtual void shutdown()=0
Virtual function that is called when an ITask is being shut down by its owner thread.
bool isStartTask()
Gets whether this task manager will begin executing immediately with nullptr data or not...
Definition: AnyTaskManager.hpp:343
std::string getNameWithPipelineId()
Gets the name of the ITask with its pipeline ID.
Definition: AnyTaskManager.hpp:406
bool initialized
Whether the task has been initialized or not (called initialize function)
Definition: AnyTaskManager.hpp:546
void setNumPipelines(size_t numPipelines)
Sets the number of pipelines that this ITask belongs to.
Definition: AnyITask.hpp:378
The parent class for a Task that removes the template arguments.
Definition: AnyTaskManager.hpp:45
virtual void setOutputConnector(std::shared_ptr< AnyConnector > connector)=0
Sets the output BaseConnector.
~TaskManagerThread()
Destructor.
Definition: AnyTaskManager.hpp:598
bool isAlive()
Gets whether the TaskManager is alive or not.
Definition: AnyTaskManager.hpp:312
#define HTGS_DEBUG(msg)
Prints a debug message to std::cerr with standard level. If DEBUG_FLAG is not defined, this equates to a no-op. Each message includes the file and line number for where the debug is called.
Definition: debug_message.hpp:65
void setAlive(bool val)
Sets the alive state for this task manager.
Definition: AnyTaskManager.hpp:304
TaskManagerThread(size_t threadId, AnyTaskManager *task, std::shared_ptr< std::atomic_size_t > numThreads, std::condition_variable *taskGraphInitializeCond, std::mutex *taskGraphInitializeMutex)
Constructs a TaskManagerThread with a specified AnyTaskManager and atomic number of threads that is s...
Definition: AnyTaskManager.hpp:584
std::mutex * taskGraphInitializeMutex
The mutex used to notify the task has been initialized.
Definition: AnyTaskManager.hpp:688
size_t getTimeout()
Gets the timeout period in microseconds for the task when the task is polling for data...
Definition: AnyTaskManager.hpp:357
AnyTaskManager(size_t numThreads, bool isStartTask, bool poll, size_t microTimeoutTime, size_t pipelineId, size_t numPipelines, std::string address)
Constructs an AnyTaskManager with an ITask as the task function and specific runtime parameters...
Definition: AnyTaskManager.hpp:81
void printProfile()
Prints the profiling data to std::cout.
Definition: AnyTaskManager.hpp:203
std::condition_variable * taskGraphInitializeCond
The condition variable that is used by the owner task graph for checking if all tasks have been initi...
Definition: AnyTaskManager.hpp:687
Defines common types used throughout the HTGS API, some of which are used by users of HTGS su...
unsigned long long int taskComputeTime
The total compute time for the task.
Definition: AnyTaskManager.hpp:538
virtual void gatherProfileData(std::map< AnyTaskManager *, TaskManagerProfile *> *taskManagerProfiles)=0
Gathers profiling data for the TaskProfiler.
Manages a TaskManager that is bound to a thread for execution.
Definition: AnyTaskManager.hpp:573
bool decrementAndCheckNumThreadsRemaining()
Decrements the number of threads and checks if there are no threads remaining in a single operation...
Definition: AnyTaskManager.hpp:660
bool alive
Whether the task is still alive.
Definition: AnyTaskManager.hpp:545
bool poll
Whether the manager should poll for data.
Definition: AnyTaskManager.hpp:542
virtual void setRuntimeThread(TaskManagerThread *runtimeThread)=0
Sets the thread that is executing this TaskManager.
Implements the TaskManagerProfile class that is used to gather profile data for a task manager...
size_t numThreads
The number of threads spawned for the manager.
Definition: AnyTaskManager.hpp:549
size_t timeout
The timeout time for polling in microseconds.
Definition: AnyTaskManager.hpp:541
void setPipelineId(size_t pipelineId)
Sets the pipeline Id for this ITask.
Definition: AnyITask.hpp:359
#define DOTGEN_FLAG_SHOW_ALL_THREADING
Shows all threading fully expanded during dot generation.
Definition: TaskGraphDotGenFlags.hpp:26
virtual std::shared_ptr< AnyConnector > getOutputConnector()=0
Gets the output Connector.
Implements the task graph communicator task that communicates from each task to all other tasks in a ...
void setNumPipelines(size_t numPipelines)
Sets the number of pipelines associated with the TaskManager.
Definition: AnyTaskManager.hpp:268
void setInitialized(bool val)
Sets the initialized state for the task manager.
Definition: AnyTaskManager.hpp:319
virtual size_t getThreadsRemaining()=0
Gets the number of threads that are still running for the task.
void setPipelineId(size_t id)
Sets the pipeline Id associated with the TaskManager.
Definition: AnyTaskManager.hpp:283
size_t pipelineId
The execution pipeline id.
Definition: AnyTaskManager.hpp:551
size_t threadId
The thread id for the task (set after initialization)
Definition: AnyTaskManager.hpp:548
void setThreadId(size_t id)
Sets the thread id associated with the TaskManager.
Definition: AnyTaskManager.hpp:425
Definition: Bookkeeper.hpp:23
void terminate()
Indicates that the thread is ready to be terminated.
Definition: AnyTaskManager.hpp:680
bool hasNoThreadsRemaining()
Checks if there are no more threads executing a Task.
Definition: AnyTaskManager.hpp:673
std::shared_ptr< std::atomic_size_t > numThreads
The number of total threads managing the TaskManager.
Definition: AnyTaskManager.hpp:684