HTGS  v2.0
The Hybrid Task Graph Scheduler
TaskManager.hpp
Go to the documentation of this file.
1 
2 // NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the software in any medium, provided that you keep intact this entire notice. You may improve, modify and create derivative works of the software or any portion of the software, and you may copy and distribute such modifications or works. Modified works should carry a notice stating that you changed the software and should note the date and nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the source of the software.
3 // NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE.
4 // You are solely responsible for determining the appropriateness of using and distributing the software and you assume all risks associated with its use, including but not limited to the risks and costs of program errors, compliance with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of operation. This software is not intended to be used in any situation where a failure could cause risk of injury or damage to property. The software developed by NIST employees is not subject to copyright protection within the United States.
5 
13 #ifndef HTGS_TASKMANAGER_HPP
14 #define HTGS_TASKMANAGER_HPP
15 
16 #include <chrono>
17 #include <vector>
18 #include <memory>
19 #include <unordered_map>
20 #include <mutex>
21 #include <sstream>
22 
24 #include <htgs/api/ITask.hpp>
25 
26 #ifdef USE_NVTX
27 #include <nvtx3/nvToolsExt.h>
28 #ifdef __linux__
29 #include <syscall.h>
30 #elif __APPLE__
31 #include <sys/syscall.h>
32 #elif _WIN32
33 #include <windows.h>
34 #endif
35 #endif
36 
37 namespace htgs {
38 
39 template<class T, class U>
40 class ITask;
41 
56 template<class T, class U>
57 class TaskManager : public AnyTaskManager {
58  static_assert(std::is_base_of<IData, T>::value, "T must derive from IData");
59  static_assert(std::is_base_of<IData, U>::value, "U must derive from IData");
60 
61  public:
72  size_t numThreads,
73  bool isStartTask,
74  size_t pipelineId,
75  size_t numPipelines,
76  std::string address) :
77  super(numThreads, isStartTask, pipelineId, numPipelines, address),
78  inputConnector(nullptr), outputConnector(nullptr), taskFunction(taskFunction), runtimeThread(nullptr) {
79  taskFunction->setTaskManager(this);
80  }
81 
93  TaskManager(ITask<T, U> *taskFunction, size_t numThreads, bool isStartTask, bool poll, size_t microTimeoutTime,
94  size_t pipelineId, size_t numPipelines, std::string address) : super(numThreads,
95  isStartTask,
96  poll,
97  microTimeoutTime,
98  pipelineId,
99  numPipelines,
100  address),
101  inputConnector(nullptr),
102  outputConnector(nullptr),
103  taskFunction(taskFunction),
104  runtimeThread(nullptr) {
105  taskFunction->setTaskManager(this);
106  }
107 
108 
112 
113  ~TaskManager() override {
114  HTGS_DEBUG_VERBOSE("TaskManager: " << this << " is deleting task function " << taskFunction);
115  delete taskFunction;
116  taskFunction = nullptr;
117  }
118 
119  std::shared_ptr<AnyConnector> getInputConnector() override { return this->inputConnector; }
120 
121  std::shared_ptr<AnyConnector> getOutputConnector() override { return this->outputConnector; }
122 
123  void initialize() override {
124  HTGS_DEBUG("initializing: " << this->prefix() << " " << this->getName() << std::endl);
125 #ifdef USE_NVTX
126  nvtxThreadName = this->getName() + " (" + std::to_string(this->getThreadId()) + ")";
127 #ifdef __linux__
128  nvtxNameOsThreadA(static_cast<uint32_t>(syscall(SYS_gettid)), nvtxThreadName.c_str());
129 #elif __APPLE__
130  nvtxNameOsThreadA(syscall(SYS_thread_selfid), nvtxThreadName.c_str());
131 #elif _WIN32
132  nvtxNameOsThread(GetCurrentThreadId(), nvtxThreadName.c_str());
133 #endif
134  nvtxRangeId_t rangeId = this->getProfiler()->startRangeInitializing();
135 #endif
136 
137  this->taskFunction->initialize(this->getPipelineId(), this->getNumPipelines(), this);
138 
139 #ifdef USE_NVTX
140  this->getProfiler()->endRangeInitializing(rangeId);
141 #endif
142 
143  this->setInitialized(true);
144  }
145 
146  void setRuntimeThread(TaskManagerThread *runtimeThread) override { this->runtimeThread = runtimeThread; }
147 
149  return this->taskFunction;
150  }
151 
152  AnyTaskManager *copy(bool deep) override {
153  ITask<T, U> *iTask = this->taskFunction->copyITask(deep);
154 
156  *newTask =
157  new TaskManager<T, U>(iTask, this->getNumThreads(), this->isStartTask(), this->isPoll(), this->getTimeout(),
158  this->getPipelineId(), this->getNumPipelines(), this->getAddress());
159  if (deep) {
160  newTask->setInputConnector(this->getInputConnector());
161  newTask->setOutputConnector(this->getOutputConnector());
162  }
163  return (AnyTaskManager *) newTask;
164  }
165 
166  void executeTask() override {
167 #ifdef USE_NVTX
168  nvtxRangeId_t rangeId;
169 #endif
170  std::shared_ptr<T> data = nullptr;
171 
172  HTGS_DEBUG_VERBOSE(prefix() << "Running task: " << this->getName());
173 
174  if (this->isStartTask()) {
175  HTGS_DEBUG_VERBOSE(prefix() << this->getName() << " is a start task");
176  this->setStartTask(false);
177 #ifdef PROFILE
178  auto start = std::chrono::high_resolution_clock::now();
179 #endif
180 #ifdef WS_PROFILE
181  this->sendWSProfileUpdate(StatusCode::EXECUTE);
182 #endif
183 #ifdef USE_NVTX
184  rangeId = this->getProfiler()->startRangeExecuting();
185 #endif
186  this->taskFunction->executeTask(nullptr);
187 
188 #ifdef USE_NVTX
189  this->getProfiler()->endRangeExecuting(rangeId);
190 #endif
191 
192 #ifdef WS_PROFILE
193  this->sendWSProfileUpdate(StatusCode::WAITING);
194 #endif
195 
196 #ifdef PROFILE
197  auto finish = std::chrono::high_resolution_clock::now();
198  this->incTaskComputeTime(std::chrono::duration_cast<std::chrono::microseconds>(finish - start).count());
199 #endif
200  return;
201  } else if (this->taskFunction->canTerminate(this->inputConnector)) {
202 
203  HTGS_DEBUG(prefix() << this->getName() << " task function is terminated");
204  this->processTaskFunctionTerminated();
205 
206  return;
207  }
208 #ifdef PROFILE
209  auto start = std::chrono::high_resolution_clock::now();
210 #endif
211 
212 #ifdef WS_PROFILE
213  this->sendWSProfileUpdate(StatusCode::WAITING);
214 #endif
215 
216 #ifdef USE_NVTX
217  rangeId = this->getProfiler()->startRangeWaiting(this->inputConnector->getQueueSize());
218 #endif
219 
220  if (this->isPoll())
221  data = this->inputConnector->pollConsumeData(this->getTimeout());
222  else
223  data = this->inputConnector->consumeData();
224 
225 #ifdef USE_NVTX
226  this->getProfiler()->endRangeWaiting(rangeId);
227 #endif
228 
229 
230 #ifdef PROFILE
231  auto finish = std::chrono::high_resolution_clock::now();
232 #endif
233 
234 #if defined (WS_PROFILE) && defined (VERBOSE_WS_PROFILE)
235  auto waitTime = std::chrono::duration_cast<std::chrono::microseconds>(finish - start);
236 #endif
237 #ifdef PROFILE
238  this->incWaitTime(std::chrono::duration_cast<std::chrono::microseconds>(finish - start).count());
239 #endif
240 
241  HTGS_DEBUG_VERBOSE(prefix() << this->getName() << " received data: " << data << " from " << inputConnector);
242 
243  if (data != nullptr || this->isPoll()) {
244 #ifdef PROFILE
245  start = std::chrono::high_resolution_clock::now();
246 #endif
247 #ifdef WS_PROFILE
248 // sendWSProfileUpdate(this->inputConnector.get(), StatusCode::CONSUME_DATA);
249  this->sendWSProfileUpdate(StatusCode::EXECUTE);
250 #endif
251 #ifdef USE_NVTX
252  rangeId = this->getProfiler()->startRangeExecuting();
253 #endif
254 
255  this->taskFunction->executeTask(data);
256 
257 #ifdef USE_NVTX
258  this->getProfiler()->endRangeExecuting(rangeId);
259 #endif
260 
261 #ifdef PROFILE
262  finish = std::chrono::high_resolution_clock::now();
263  this->incTaskComputeTime(std::chrono::duration_cast<std::chrono::microseconds>(finish - start).count());
264 #endif
265 
266 
267 
268 #ifdef WS_PROFILE
269  // Produce meta data for task
270  std::string metaDataString = this->taskFunction->profileStr();
271 #ifdef VERBOSE_WS_PROFILE
272  // Send compute time and wait time meta
273  metaDataString = metaDataString + ";waitTime:" + std::to_string(waitTime.count()) + ";computeTime:" + std::to_string(std::chrono::duration_cast<std::chrono::microseconds>(finish - start).count());
274 
275  // Send input connector queue size
276  sendWSMetaProfileUpdate(this->inputConnector.get(),
277  "queueSize: " + std::to_string(this->inputConnector->getQueueSize()) +
278  ";maxQueueSize: " + std::to_string(this->inputConnector->getMaxQueueSize()));
279 #endif
280  if (metaDataString != "")
281  {
282  sendWSMetaProfileUpdate(metaDataString);
283  }
284 #endif
285 
286  }
287 
288  }
289 
290  size_t getThreadsRemaining() override {
291  return this->runtimeThread->getThreadsRemaining();
292  }
293 
294 
298 
299  void gatherProfileData(std::map<AnyTaskManager *, TaskManagerProfile *> *taskManagerProfiles) override {
300 #ifdef WS_PROFILE
301  if (this->getName() == "WebSocketProfiler")
302  return;
303 #endif
304  // Create profile data for this task
306  *profileData = new TaskManagerProfile(this->getComputeTime(), this->getWaitTime(), this->getMaxQueueSize(), taskFunction->getMemoryWaitTime());
307  taskManagerProfiles->insert(std::pair<AnyTaskManager *, TaskManagerProfile *>(this, profileData));
308 
309  // Pass gatherProfileData to ITask for further processing
310  this->taskFunction->gatherProfileData(taskManagerProfiles);
311 
312  }
313 
318  void setInputConnector(std::shared_ptr<AnyConnector> connector) override {
319  if (connector != nullptr)
320  this->inputConnector = std::static_pointer_cast<Connector<T>>(connector);
321  else
322  this->inputConnector = nullptr;
323 
324  }
325 
330  void setOutputConnector(std::shared_ptr<AnyConnector> connector) override {
331  if (connector != nullptr)
332  this->outputConnector = std::static_pointer_cast<Connector<U>>(connector);
333  else
334  this->outputConnector = nullptr;
335  }
336 
341  void addResult(std::shared_ptr<U> result) {
342  if (this->outputConnector != nullptr) {
343  this->outputConnector->produceData(result);
344 #ifdef WS_PROFILE
345  if (result != nullptr)
346  sendWSProfileUpdate(this->outputConnector.get(), StatusCode::PRODUCE_DATA);
347 #endif
348  }
349  }
350 
355  void terminateConnections() override
356  {
357 #ifdef WS_PROFILE
358  // Update output connector, this task is no longer producing for it
359  this->sendWSProfileUpdate(this->getOutputConnector().get(), StatusCode::DECREMENT);
360 #endif
361  if (this->getOutputConnector() != nullptr) {
362  this->getOutputConnector()->producerFinished();
363 
364 // if (this->getOutputConnector()->isInputTerminated())
365  this->getOutputConnector()->wakeupConsumer();
366  }
367 
368  auto memManagerConnectorMap = this->getTaskFunction()->getReleaseMemoryEdges();
369 
370  HTGS_DEBUG(prefix() << " " << this->getName() << " Shutting down " << memManagerConnectorMap->size()
371  << " memory releasers");
372  for (auto nameManagerPair : *memManagerConnectorMap) {
373  HTGS_DEBUG(prefix() << " " << this->getName() << " Shutting down memory manager: " << nameManagerPair.first);
374 
375 
376  std::shared_ptr<AnyConnector> connector = nameManagerPair.second;
377 #ifdef WS_PROFILE
378  // TODO: This might not be necessary
379  // Update memory manager connector, this task is no longer producing for it
380  this->sendWSProfileUpdate(connector.get(), StatusCode::DECREMENT);
381 #endif
382  connector->producerFinished();
383 
384  if (connector->isInputTerminated())
385  connector->wakeupConsumer();
386  }
387  }
388 
389  private:
390 
392  void processTaskFunctionTerminated() {
393  // Task is now terminated, so it is no longer alive
394  this->setAlive(false);
395 
396  // Wake up the threads for this task
397  if (this->getInputConnector() != nullptr)
398  this->getInputConnector()->wakeupConsumer();
399 
400  // If there is a runtime thread, then begin termination
401  if (this->runtimeThread != nullptr) {
402  this->runtimeThread->terminate();
403 
404 #ifdef WS_PROFILE
405  this->sendWSProfileUpdate(StatusCode::SHUTDOWN);
406 #endif
407 
408  // If this is the last thread for this task then execute the task a final time (only the last thread will call this)
410 
411  auto start = std::chrono::high_resolution_clock::now();
412  // Final execution for the task
413 #ifdef USE_NVTX
414  nvtxRangeId_t rangeId = this->getProfiler()->startRangeExecuting();
415 #endif
416  this->taskFunction->executeTaskFinal();
417 
418 #ifdef USE_NVTX
419  this->getProfiler()->endRangeExecuting(rangeId);
420 #endif
421  auto finish = std::chrono::high_resolution_clock::now();
422  this->incTaskComputeTime(std::chrono::duration_cast<std::chrono::microseconds>(finish - start).count());
423  }
424  } else {
425  if (this->getOutputConnector() != nullptr) {
426  this->getOutputConnector()->producerFinished();
427 
428  if (this->getOutputConnector()->isInputTerminated()) {
429  this->getOutputConnector()->wakeupConsumer();
430  }
431  }
432  }
433  }
434 
435 #ifdef WS_PROFILE
436  void sendWSProfileUpdate(StatusCode code)
437  {
438  if (this->getName() == "WebSocketProfiler" || this->getName() == "Execution Pipeline")
439  return;
440  std::shared_ptr<ProfileData> updateStatus(new ChangeStatusProfile(this->getTaskFunction(), code));
441  std::shared_ptr<DataPacket> dataPacket(new DataPacket(this->getName(), this->getAddress(), "WebSocketProfiler", "0", updateStatus));
442  this->sendDataPacket(dataPacket);
443  }
444 
445  void sendWSProfileUpdate(void * addr, StatusCode code)
446  {
447  if (this->getName() == "WebSocketProfiler" || this->getName() == "Execution Pipeline")
448  return;
449  std::shared_ptr<ProfileData> updateStatus(new ChangeStatusProfile(addr, code));
450  std::shared_ptr<DataPacket> dataPacket(new DataPacket(this->getName(), this->getAddress(), "WebSocketProfiler", "0", updateStatus));
451  this->sendDataPacket(dataPacket);
452  }
453 
454 
455  void sendWSMetaProfileUpdate(std::string metaData)
456  {
457  if (this->getName() == "WebSocketProfiler" || this->getName() == "Execution Pipeline")
458  return;
459  std::shared_ptr<ProfileData> updateStatus(new UpdateMetadataProfile(this->getTaskFunction(), metaData));
460  std::shared_ptr<DataPacket> dataPacket(new DataPacket(this->getName(), this->getAddress(), "WebSocketProfiler", "0", updateStatus));
461  this->sendDataPacket(dataPacket);
462  }
463 
464 
465  void sendWSMetaProfileUpdate(void *addr, std::string metaData)
466  {
467  if (this->getName() == "WebSocketProfiler" || this->getName() == "Execution Pipeline")
468  return;
469  std::shared_ptr<ProfileData> updateStatus(new UpdateMetadataProfile(addr, metaData));
470  std::shared_ptr<DataPacket> dataPacket(new DataPacket(this->getName(), this->getAddress(), "WebSocketProfiler", "0", updateStatus));
471  this->sendDataPacket(dataPacket);
472  }
473 
474 #endif
475 
476 #ifdef USE_NVTX
477  std::string nvtxThreadName;
478 #endif
479 
480 
481  typedef AnyTaskManager super;
483 
484  std::shared_ptr<Connector<T>> inputConnector;
485  std::shared_ptr<Connector<U>> outputConnector;
488 };
489 }
490 
491 #endif //HTGS_TASKMANAGER_HPP
std::shared_ptr< AnyConnector > getInputConnector() override
Gets the input Connector.
Definition: TaskManager.hpp:119
Implements the parent class for a Task to remove the template arguments and the TaskManagerThread to ...
void gatherProfileData(std::map< AnyTaskManager *, TaskManagerProfile *> *taskManagerProfiles) override
Gathers profiling data for the TaskProfiler.
Definition: TaskManager.hpp:299
size_t getNumThreads() const
Gets the number of threads associated with this TaskManager.
Definition: AnyTaskManager.hpp:298
size_t getThreadId()
Gets the thread id associated with the TaskManager.
Definition: AnyTaskManager.hpp:433
size_t getNumPipelines()
Gets the number of pipelines that this task manager belongs too.
Definition: AnyTaskManager.hpp:277
void addResult(std::shared_ptr< U > result)
Adds the result data to the output connector.
Definition: TaskManager.hpp:341
size_t getThreadsRemaining()
Gets the number of threads remaining.
Definition: AnyTaskManager.hpp:647
std::string getAddress()
Gets the address of the task manager.
Definition: AnyTaskManager.hpp:260
#define HTGS_DEBUG_VERBOSE(msg)
Prints a debug message to std:cerr with VERBOSE level.
Definition: debug_message.hpp:75
unsigned long long int getWaitTime()
Gets the wait time for the task manager.
Definition: AnyTaskManager.hpp:467
void incTaskComputeTime(int64_t val)
Increments the compute time profile value.
Definition: AnyTaskManager.hpp:363
size_t getMaxQueueSize()
Gets the maximum size the input queue became during execution.
Definition: AnyTaskManager.hpp:480
void setTaskManager(TaskManager< T, U > *ownerTask)
Sets the owner task manager for this ITask.
Definition: ITask.hpp:519
std::string getName()
Gets the name of the ITask.
Definition: AnyTaskManager.hpp:390
std::shared_ptr< AnyConnector > getOutputConnector() override
Gets the output Connector.
Definition: TaskManager.hpp:121
void executeTask() override
Executes the TaskManager.
Definition: TaskManager.hpp:166
ITask< T, U > * taskFunction
The task that is managed by the manager.
Definition: TaskManager.hpp:486
Manages the input/output of IData between Tasks.
Definition: Connector.hpp:62
size_t numPipelines
The number of execution pipelines.
Definition: AnyTaskManager.hpp:552
size_t getPipelineId()
Gets the pipeline identifer for this task from 0 to number of pipelines - 1.
Definition: AnyTaskManager.hpp:292
Implements a task manager profile that holds profiling data for a task manager.
Definition: TaskManagerProfile.hpp:27
bool isPoll()
Gets whether the task manager is polling for data or not.
Definition: AnyTaskManager.hpp:351
void incWaitTime(int64_t val)
Increments the wait time profile value.
Definition: AnyTaskManager.hpp:369
unsigned long long int getComputeTime()
Gets the compute time for the task manager, removing the memory wait time.
Definition: AnyTaskManager.hpp:442
std::shared_ptr< Connector< U > > outputConnector
The output connector for the manager (queue to send data)
Definition: TaskManager.hpp:485
void setStartTask(bool val)
Sets whether this task manager is a start task or not, which will immediately begin executing by send...
Definition: AnyTaskManager.hpp:335
std::string address
The address of the task graph this manager belongs too.
Definition: AnyTaskManager.hpp:553
bool isStartTask()
Gets whether this task manager will begin executing immediately with nullptr data or not...
Definition: AnyTaskManager.hpp:343
size_t getThreadsRemaining() override
Gets the number of threads that are still running for the task.
Definition: TaskManager.hpp:290
void setRuntimeThread(TaskManagerThread *runtimeThread) override
Sets the thread that is executing this TaskManager.
Definition: TaskManager.hpp:146
The parent class for a Task that removes the template arguments.
Definition: AnyTaskManager.hpp:45
#define HTGS_DEBUG(msg)
Prints a debug message to std::cerr with standard level If DEBUG_FLAG is not defined, this equates to a no op Each message includes the file and line number for where the debug is called.
Definition: debug_message.hpp:65
void setAlive(bool val)
Sets the alive state for this task manager.
Definition: AnyTaskManager.hpp:304
ITask< T, U > * getTaskFunction() override
Gets the ITask function associated with the TaskManager.
Definition: TaskManager.hpp:148
size_t getTimeout()
Gets the timeout period in microseconds for the task when the task is polling for data...
Definition: AnyTaskManager.hpp:357
An interface to process input data and forward results within a TaskGraph.
Definition: ITask.hpp:165
void initialize() override
Initializes the TaskManager.
Definition: TaskManager.hpp:123
TaskManager(ITask< T, U > *taskFunction, size_t numThreads, bool isStartTask, size_t pipelineId, size_t numPipelines, std::string address)
Constructs a TaskManager with an ITask as the task function and specific runtime parameters.
Definition: TaskManager.hpp:71
Manages a TaskManager that is bound to a thread for execution.
Definition: AnyTaskManager.hpp:573
bool decrementAndCheckNumThreadsRemaining()
Decrements the number of threads and checks if there are no threads remaining in a single operation...
Definition: AnyTaskManager.hpp:660
Encapsulates an ITask to interact with an ITask&#39;s functionality.
Definition: ITask.hpp:39
Implements a data packet that is transmitted to the TaskGraphCommunicator.
bool poll
Whether the manager should poll for data.
Definition: AnyTaskManager.hpp:542
void setInputConnector(std::shared_ptr< AnyConnector > connector) override
Sets the input BaseConnector.
Definition: TaskManager.hpp:318
size_t numThreads
The number of threads spawned for the manager.
Definition: AnyTaskManager.hpp:549
void setOutputConnector(std::shared_ptr< AnyConnector > connector) override
Sets the output BaseConnector.
Definition: TaskManager.hpp:330
void terminateConnections() override
Terminates all Connector edges.
Definition: TaskManager.hpp:355
AnyTaskManager * copy(bool deep) override
Copies the TaskManager.
Definition: TaskManager.hpp:152
void setInitialized(bool val)
Sets the initialized state for the task manager.
Definition: AnyTaskManager.hpp:319
TaskManagerThread * runtimeThread
The thread that is executing this task&#39;s runtime.
Definition: TaskManager.hpp:487
An interface to process input data and forward results within a TaskGraph.
size_t pipelineId
The execution pipeline id.
Definition: AnyTaskManager.hpp:551
Definition: Bookkeeper.hpp:23
void terminate()
Indicates that the thread is ready to be terminated.
Definition: AnyTaskManager.hpp:680
TaskManager(ITask< T, U > *taskFunction, size_t numThreads, bool isStartTask, bool poll, size_t microTimeoutTime, size_t pipelineId, size_t numPipelines, std::string address)
Constructs a TaskManager with an ITask as the task function and specific runtime parameters.
Definition: TaskManager.hpp:93
std::shared_ptr< Connector< T > > inputConnector
The input connector for the manager (queue to get data from)
Definition: TaskManager.hpp:484