13 #ifndef HTGS_TASKMANAGER_HPP 14 #define HTGS_TASKMANAGER_HPP 19 #include <unordered_map> 27 #include <nvtx3/nvToolsExt.h> 31 #include <sys/syscall.h> 39 template<
class T,
class U>
56 template<
class T,
class U>
57 class TaskManager :
public AnyTaskManager {
58 static_assert(std::is_base_of<IData, T>::value,
"T must derive from IData");
59 static_assert(std::is_base_of<IData, U>::value,
"U must derive from IData");
77 super(numThreads, isStartTask, pipelineId, numPipelines, address),
103 taskFunction(taskFunction),
124 HTGS_DEBUG(
"initializing: " << this->prefix() <<
" " << this->
getName() << std::endl);
128 nvtxNameOsThreadA(static_cast<uint32_t>(syscall(SYS_gettid)), nvtxThreadName.c_str());
130 nvtxNameOsThreadA(syscall(SYS_thread_selfid), nvtxThreadName.c_str());
132 nvtxNameOsThread(GetCurrentThreadId(), nvtxThreadName.c_str());
134 nvtxRangeId_t rangeId = this->getProfiler()->startRangeInitializing();
140 this->getProfiler()->endRangeInitializing(rangeId);
168 nvtxRangeId_t rangeId;
170 std::shared_ptr<T> data =
nullptr;
178 auto start = std::chrono::high_resolution_clock::now();
181 this->sendWSProfileUpdate(StatusCode::EXECUTE);
184 rangeId = this->getProfiler()->startRangeExecuting();
189 this->getProfiler()->endRangeExecuting(rangeId);
193 this->sendWSProfileUpdate(StatusCode::WAITING);
197 auto finish = std::chrono::high_resolution_clock::now();
198 this->
incTaskComputeTime(std::chrono::duration_cast<std::chrono::microseconds>(finish - start).count());
201 }
else if (this->
taskFunction->canTerminate(this->inputConnector)) {
204 this->processTaskFunctionTerminated();
209 auto start = std::chrono::high_resolution_clock::now();
213 this->sendWSProfileUpdate(StatusCode::WAITING);
217 rangeId = this->getProfiler()->startRangeWaiting(this->
inputConnector->getQueueSize());
226 this->getProfiler()->endRangeWaiting(rangeId);
231 auto finish = std::chrono::high_resolution_clock::now();
234 #if defined (WS_PROFILE) && defined (VERBOSE_WS_PROFILE) 235 auto waitTime = std::chrono::duration_cast<std::chrono::microseconds>(finish - start);
238 this->
incWaitTime(std::chrono::duration_cast<std::chrono::microseconds>(finish - start).count());
243 if (data !=
nullptr || this->
isPoll()) {
245 start = std::chrono::high_resolution_clock::now();
249 this->sendWSProfileUpdate(StatusCode::EXECUTE);
252 rangeId = this->getProfiler()->startRangeExecuting();
258 this->getProfiler()->endRangeExecuting(rangeId);
262 finish = std::chrono::high_resolution_clock::now();
263 this->
incTaskComputeTime(std::chrono::duration_cast<std::chrono::microseconds>(finish - start).count());
270 std::string metaDataString = this->
taskFunction->profileStr();
271 #ifdef VERBOSE_WS_PROFILE 273 metaDataString = metaDataString +
";waitTime:" + std::to_string(waitTime.count()) +
";computeTime:" + std::to_string(std::chrono::duration_cast<std::chrono::microseconds>(finish - start).count());
277 "queueSize: " + std::to_string(this->
inputConnector->getQueueSize()) +
278 ";maxQueueSize: " + std::to_string(this->
inputConnector->getMaxQueueSize()));
280 if (metaDataString !=
"")
282 sendWSMetaProfileUpdate(metaDataString);
299 void gatherProfileData(std::map<AnyTaskManager *, TaskManagerProfile *> *taskManagerProfiles)
override {
301 if (this->
getName() ==
"WebSocketProfiler")
307 taskManagerProfiles->insert(std::pair<AnyTaskManager *, TaskManagerProfile *>(
this, profileData));
310 this->
taskFunction->gatherProfileData(taskManagerProfiles);
319 if (connector !=
nullptr)
331 if (connector !=
nullptr)
345 if (result !=
nullptr)
346 sendWSProfileUpdate(this->
outputConnector.get(), StatusCode::PRODUCE_DATA);
368 auto memManagerConnectorMap = this->
getTaskFunction()->getReleaseMemoryEdges();
370 HTGS_DEBUG(prefix() <<
" " << this->
getName() <<
" Shutting down " << memManagerConnectorMap->size()
371 <<
" memory releasers");
372 for (
auto nameManagerPair : *memManagerConnectorMap) {
373 HTGS_DEBUG(prefix() <<
" " << this->
getName() <<
" Shutting down memory manager: " << nameManagerPair.first);
376 std::shared_ptr<AnyConnector> connector = nameManagerPair.second;
380 this->sendWSProfileUpdate(connector.get(), StatusCode::DECREMENT);
382 connector->producerFinished();
384 if (connector->isInputTerminated())
385 connector->wakeupConsumer();
392 void processTaskFunctionTerminated() {
405 this->sendWSProfileUpdate(StatusCode::SHUTDOWN);
411 auto start = std::chrono::high_resolution_clock::now();
414 nvtxRangeId_t rangeId = this->getProfiler()->startRangeExecuting();
419 this->getProfiler()->endRangeExecuting(rangeId);
421 auto finish = std::chrono::high_resolution_clock::now();
422 this->
incTaskComputeTime(std::chrono::duration_cast<std::chrono::microseconds>(finish - start).count());
436 void sendWSProfileUpdate(StatusCode code)
438 if (this->
getName() ==
"WebSocketProfiler" || this->
getName() ==
"Execution Pipeline")
440 std::shared_ptr<ProfileData> updateStatus(
new ChangeStatusProfile(this->
getTaskFunction(), code));
441 std::shared_ptr<DataPacket> dataPacket(
new DataPacket(this->
getName(), this->
getAddress(),
"WebSocketProfiler",
"0", updateStatus));
442 this->sendDataPacket(dataPacket);
445 void sendWSProfileUpdate(
void * addr, StatusCode code)
447 if (this->
getName() ==
"WebSocketProfiler" || this->
getName() ==
"Execution Pipeline")
449 std::shared_ptr<ProfileData> updateStatus(
new ChangeStatusProfile(addr, code));
450 std::shared_ptr<DataPacket> dataPacket(
new DataPacket(this->
getName(), this->
getAddress(),
"WebSocketProfiler",
"0", updateStatus));
451 this->sendDataPacket(dataPacket);
455 void sendWSMetaProfileUpdate(std::string metaData)
457 if (this->
getName() ==
"WebSocketProfiler" || this->
getName() ==
"Execution Pipeline")
459 std::shared_ptr<ProfileData> updateStatus(
new UpdateMetadataProfile(this->
getTaskFunction(), metaData));
460 std::shared_ptr<DataPacket> dataPacket(
new DataPacket(this->
getName(), this->
getAddress(),
"WebSocketProfiler",
"0", updateStatus));
461 this->sendDataPacket(dataPacket);
465 void sendWSMetaProfileUpdate(
void *addr, std::string metaData)
467 if (this->
getName() ==
"WebSocketProfiler" || this->
getName() ==
"Execution Pipeline")
469 std::shared_ptr<ProfileData> updateStatus(
new UpdateMetadataProfile(addr, metaData));
470 std::shared_ptr<DataPacket> dataPacket(
new DataPacket(this->
getName(), this->
getAddress(),
"WebSocketProfiler",
"0", updateStatus));
471 this->sendDataPacket(dataPacket);
477 std::string nvtxThreadName;
491 #endif //HTGS_TASKMANAGER_HPP std::shared_ptr< AnyConnector > getInputConnector() override
Gets the input Connector.
Definition: TaskManager.hpp:119
Implements the parent class for a Task to remove the template arguments and the TaskManagerThread to ...
void gatherProfileData(std::map< AnyTaskManager *, TaskManagerProfile *> *taskManagerProfiles) override
Gathers profiling data for the TaskProfiler.
Definition: TaskManager.hpp:299
size_t getNumThreads() const
Gets the number of threads associated with this TaskManager.
Definition: AnyTaskManager.hpp:298
size_t getThreadId()
Gets the thread id associated with the TaskManager.
Definition: AnyTaskManager.hpp:433
size_t getNumPipelines()
Gets the number of pipelines that this task manager belongs too.
Definition: AnyTaskManager.hpp:277
void addResult(std::shared_ptr< U > result)
Adds the result data to the output connector.
Definition: TaskManager.hpp:341
size_t getThreadsRemaining()
Gets the number of threads remaining.
Definition: AnyTaskManager.hpp:647
std::string getAddress()
Gets the address of the task manager.
Definition: AnyTaskManager.hpp:260
#define HTGS_DEBUG_VERBOSE(msg)
Prints a debug message to std:cerr with VERBOSE level.
Definition: debug_message.hpp:75
unsigned long long int getWaitTime()
Gets the wait time for the task manager.
Definition: AnyTaskManager.hpp:467
void incTaskComputeTime(int64_t val)
Increments the compute time profile value.
Definition: AnyTaskManager.hpp:363
size_t getMaxQueueSize()
Gets the maximum size the input queue became during execution.
Definition: AnyTaskManager.hpp:480
void setTaskManager(TaskManager< T, U > *ownerTask)
Sets the owner task manager for this ITask.
Definition: ITask.hpp:519
std::string getName()
Gets the name of the ITask.
Definition: AnyTaskManager.hpp:390
std::shared_ptr< AnyConnector > getOutputConnector() override
Gets the output Connector.
Definition: TaskManager.hpp:121
void executeTask() override
Executes the TaskManager.
Definition: TaskManager.hpp:166
ITask< T, U > * taskFunction
The task that is managed by the manager.
Definition: TaskManager.hpp:486
Manages the input/output of IData between Tasks.
Definition: Connector.hpp:62
size_t numPipelines
The number of execution pipelines.
Definition: AnyTaskManager.hpp:552
size_t getPipelineId()
Gets the pipeline identifer for this task from 0 to number of pipelines - 1.
Definition: AnyTaskManager.hpp:292
Implements a task manager profile that holds profiling data for a task manager.
Definition: TaskManagerProfile.hpp:27
bool isPoll()
Gets whether the task manager is polling for data or not.
Definition: AnyTaskManager.hpp:351
void incWaitTime(int64_t val)
Increments the wait time profile value.
Definition: AnyTaskManager.hpp:369
unsigned long long int getComputeTime()
Gets the compute time for the task manager, removing the memory wait time.
Definition: AnyTaskManager.hpp:442
std::shared_ptr< Connector< U > > outputConnector
The output connector for the manager (queue to send data)
Definition: TaskManager.hpp:485
void setStartTask(bool val)
Sets whether this task manager is a start task or not, which will immediately begin executing by send...
Definition: AnyTaskManager.hpp:335
std::string address
The address of the task graph this manager belongs too.
Definition: AnyTaskManager.hpp:553
bool isStartTask()
Gets whether this task manager will begin executing immediately with nullptr data or not...
Definition: AnyTaskManager.hpp:343
size_t getThreadsRemaining() override
Gets the number of threads that are still running for the task.
Definition: TaskManager.hpp:290
void setRuntimeThread(TaskManagerThread *runtimeThread) override
Sets the thread that is executing this TaskManager.
Definition: TaskManager.hpp:146
The parent class for a Task that removes the template arguments.
Definition: AnyTaskManager.hpp:45
#define HTGS_DEBUG(msg)
Prints a debug message to std::cerr with standard level If DEBUG_FLAG is not defined, this equates to a no op Each message includes the file and line number for where the debug is called.
Definition: debug_message.hpp:65
void setAlive(bool val)
Sets the alive state for this task manager.
Definition: AnyTaskManager.hpp:304
ITask< T, U > * getTaskFunction() override
Gets the ITask function associated with the TaskManager.
Definition: TaskManager.hpp:148
size_t getTimeout()
Gets the timeout period in microseconds for the task when the task is polling for data...
Definition: AnyTaskManager.hpp:357
An interface to process input data and forward results within a TaskGraph.
Definition: ITask.hpp:165
void initialize() override
Initializes the TaskManager.
Definition: TaskManager.hpp:123
TaskManager(ITask< T, U > *taskFunction, size_t numThreads, bool isStartTask, size_t pipelineId, size_t numPipelines, std::string address)
Constructs a TaskManager with an ITask as the task function and specific runtime parameters.
Definition: TaskManager.hpp:71
Manages a TaskManager that is bound to a thread for execution.
Definition: AnyTaskManager.hpp:573
bool decrementAndCheckNumThreadsRemaining()
Decrements the number of threads and checks if there are no threads remaining in a single operation...
Definition: AnyTaskManager.hpp:660
Encapsulates an ITask to interact with an ITask's functionality.
Definition: ITask.hpp:39
Implements a data packet that is transmitted to the TaskGraphCommunicator.
bool poll
Whether the manager should poll for data.
Definition: AnyTaskManager.hpp:542
void setInputConnector(std::shared_ptr< AnyConnector > connector) override
Sets the input BaseConnector.
Definition: TaskManager.hpp:318
size_t numThreads
The number of threads spawned for the manager.
Definition: AnyTaskManager.hpp:549
void setOutputConnector(std::shared_ptr< AnyConnector > connector) override
Sets the output BaseConnector.
Definition: TaskManager.hpp:330
void terminateConnections() override
Terminates all Connector edges.
Definition: TaskManager.hpp:355
AnyTaskManager * copy(bool deep) override
Copies the TaskManager.
Definition: TaskManager.hpp:152
void setInitialized(bool val)
Sets the initialized state for the task manager.
Definition: AnyTaskManager.hpp:319
TaskManagerThread * runtimeThread
The thread that is executing this task's runtime.
Definition: TaskManager.hpp:487
An interface to process input data and forward results within a TaskGraph.
size_t pipelineId
The execution pipeline id.
Definition: AnyTaskManager.hpp:551
Definition: Bookkeeper.hpp:23
void terminate()
Indicates that the thread is ready to be terminated.
Definition: AnyTaskManager.hpp:680
TaskManager(ITask< T, U > *taskFunction, size_t numThreads, bool isStartTask, bool poll, size_t microTimeoutTime, size_t pipelineId, size_t numPipelines, std::string address)
Constructs a TaskManager with an ITask as the task function and specific runtime parameters.
Definition: TaskManager.hpp:93
std::shared_ptr< Connector< T > > inputConnector
The input connector for the manager (queue to get data from)
Definition: TaskManager.hpp:484