HTGS  v2.0
The Hybrid Task Graph Scheduler
ITask.hpp
Go to the documentation of this file.
1 
2 // NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the software in any medium, provided that you keep intact this entire notice. You may improve, modify and create derivative works of the software or any portion of the software, and you may copy and distribute such modifications or works. Modified works should carry a notice stating that you changed the software and should note the date and nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the source of the software.
3 // NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE.
4 // You are solely responsible for determining the appropriateness of using and distributing the software and you assume all risks associated with its use, including but not limited to the risks and costs of program errors, compliance with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of operation. This software is not intended to be used in any situation where a failure could cause risk of injury or damage to property. The software developed by NIST employees is not subject to copyright protection within the United States.
5 
14 #ifndef HTGS_ITASK_HPP
15 #define HTGS_ITASK_HPP
16 
#include <assert.h>

#include <chrono>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <type_traits>
#include <typeinfo>
#include <unordered_map>
#include <vector>
27 
28 #ifdef USE_NVTX
29 #include <nvtx3/nvToolsExt.h>
30 #endif
31 
32 #if defined( __GLIBCXX__ ) || defined( __GLIBCPP__ )
33 #include <cxxabi.h>
34 #endif
35 
36 namespace htgs {
37 
// Forward declaration: ITask only stores and dereferences a TaskManager<T, U> pointer,
// so the full definition is not required at this point in the header.
template<class T, class U>
class TaskManager;
164 template<class T, class U>
165 class ITask : public AnyITask {
166  static_assert(std::is_base_of<IData, T>::value, "T must derive from IData");
167  static_assert(std::is_base_of<IData, U>::value, "U must derive from IData");
168 
169  public:
170 
174  ITask() : super() {}
175 
180  ITask(size_t numThreads) : super(numThreads) {}
181 
190  ITask(size_t numThreads, bool isStartTask, bool poll, size_t microTimeoutTime) : super(numThreads,
191  isStartTask,
192  poll,
193  microTimeoutTime) {}
194 
195 
199 
200  virtual ~ITask() override {}
201 
202  virtual void initialize() override {}
203 
210  virtual void executeTask(std::shared_ptr<T> data) = 0;
211 
215  virtual bool canTerminate(std::shared_ptr<AnyConnector> inputConnector) override {
216  if (inputConnector == nullptr)
217  return true;
218 
219  return inputConnector->isInputTerminated();
220  }
221 
225  virtual void shutdown() override {}
226 
230  virtual void executeTaskFinal() override {}
231 
232 
236  virtual std::string getName() override {
237  return "UnnamedITask";
238  }
239 
243  virtual std::string getDotLabelName() override {
244  return this->getName();
245  }
246 
250  virtual std::string getDotShapeColor() override {
251  return "black";
252  }
253 
257  virtual std::string getDotFillColor() override {
258  return "white";
259  }
260 
264  virtual std::string getDotShape() override {
265  return "box";
266  }
267 
272  virtual std::string getDotCustomProfile() override {
273  return "";
274  }
275 
279  virtual void printProfile() override {}
280 
284  virtual ITask<T, U> *copy() = 0;
285 
290  virtual size_t getNumGraphsSpawned() { return 0; }
291 
292  virtual std::string genDotProducerEdgeToTask(std::map<std::shared_ptr<AnyConnector>, AnyITask *> &inputConnectorDotMap, int dotFlags) override
293  {
294  auto connectorPair = inputConnectorDotMap.find(this->ownerTask->getOutputConnector());
295  if (connectorPair != inputConnectorDotMap.end())
296  {
297  auto consumerIds = connectorPair->second->getConsumerDotIds();
298  if (consumerIds != "")
299  return this->getDotId() + " -> " + connectorPair->second->getConsumerDotIds() + ";\n";
300  }
301 
302  return "";
303  }
304 
305 
306  virtual std::string genDotConsumerEdgeFromConnector(std::shared_ptr<AnyConnector> connector, int flags) override
307  {
308  if (this->ownerTask->getInputConnector() != nullptr &&
309  connector != nullptr && this->ownerTask->getInputConnector() == connector)
310  {
311  auto consumerIds = this->getConsumerDotIds();
312  if (consumerIds != "")
313  return connector->getDotId() + " -> " + this->getConsumerDotIds() + ";\n";
314  }
315  return "";
316  }
317 
318  virtual std::string genDotProducerEdgeFromConnector(std::shared_ptr<AnyConnector> connector, int flags)
319  {
320  if (this->ownerTask->getOutputConnector() != nullptr &&
321  connector != nullptr && this->ownerTask->getOutputConnector() == connector)
322  {
323  // TODO: Use a new getProducerDotIds ?
324  return this->getDotId() + " -> " + connector->getDotId() + ";\n";
325  }
326 
327  return "";
328  }
329 
330 
331 
335 
341  ITask<T, U> *copyITask(bool deep) override {
342  ITask<T, U> *iTaskCopy = copy();
343 
344  HTGS_ASSERT(iTaskCopy != nullptr, "Copying Task '" << this->getName() << "' resulted in nullptr. Make sure you have the 'copy' function implemented (see https://pages.nist.gov/HTGS/doxygen/classhtgs_1_1_i_task.html#acaedf1466b238036d880efcbf1feafe6)");
345 
346 
347  if (deep)
348  copyMemoryEdges(iTaskCopy);
349 
350  return iTaskCopy;
351  }
352 
357  void addResult(std::shared_ptr<U> result) {
358  this->ownerTask->addResult(result);
359  }
360 
366  void addResult(U *result) {
367  this->ownerTask->addResult(std::shared_ptr<U>(result));
368  }
369 
378  void initialize(size_t pipelineId, size_t numPipeline, TaskManager<T, U> *ownerTask) {
379  this->ownerTask = ownerTask;
380  super::initialize(pipelineId, numPipeline);
381  }
382 
383 
398  template<class V>
399  m_data_t<V> getMemory(std::string name, IMemoryReleaseRule *releaseRule) {
400  return getMemory<V>(name, releaseRule, MMType::Static, 0);
401  }
402 
418  template<class V>
419  m_data_t<V> getDynamicMemory(std::string name, IMemoryReleaseRule *releaseRule, size_t numElems) {
420  return getMemory<V>(name, releaseRule, MMType::Dynamic, numElems);
421  }
422 
429  template<class V>
430  [[gnu::deprecated("Replaced by calling 'releaseMemory' directory with htgs::MemoryData (or m_data_t)")]]
431  void releaseMemory(m_data_t<V> memory) {
432  memory->releaseMemory();
433  // TODO: Delete or Add #ifdef
434 // std::shared_ptr<DataPacket> dataPacket = std::shared_ptr<DataPacket>(new DataPacket(this->getName(),
435 // this->getAddress(),
436 // memory->getMemoryManagerName(),
437 // memory->getAddress(),
438 // memory));
439 #ifdef USE_NVTX
440  this->getOwnerTaskManager()->getProfiler()->addReleaseMarker();
441 #endif
442 // this->getTaskGraphCommunicator()->produceDataPacket(dataPacket);
443  }
444 
445 
450  {
451  this->ownerTask->resetProfile();
452  }
453 
458  size_t getThreadID()
459  {
460  return this->ownerTask->getThreadId();
461  }
462 
467  unsigned long long int getTaskComputeTime() const {
468  return this->ownerTask->getTaskComputeTime();
469  }
470 
471 
475  std::string inTypeName() override final {
476 #if defined( __GLIBCXX__ ) || defined( __GLIBCPP__ )
477  int status;
478  char *realName = abi::__cxa_demangle(typeid(T).name(), 0, 0, &status);
479  std::string ret(realName);
480 
481  free(realName);
482 
483  return ret;
484 #else
485  return typeid(T).name();
486 #endif
487 
488  }
489 
493  std::string outTypeName() override final {
494 #if defined( __GLIBCXX__ ) || defined( __GLIBCPP__ )
495  int status;
496  char *realName = abi::__cxa_demangle(typeid(U).name(), 0, 0, &status);
497  std::string ret(realName);
498 
499  free(realName);
500 
501  return ret;
502 #else
503  return typeid(U).name();
504 #endif
505 
506  }
507 
511  std::string getAddress() override final {
512  return ownerTask->getAddress();
513  }
514 
520  this->ownerTask = ownerTask;
521  }
522 
528  return this->ownerTask;
529  }
530 
535  virtual void gatherProfileData(std::map<AnyTaskManager *, TaskManagerProfile *> *taskManagerProfiles) {}
536 
537  private:
539  typedef AnyITask super;
540 
541 
542  template<class V>
543  m_data_t<V> getMemory(std::string name, IMemoryReleaseRule *releaseRule, MMType type, size_t nElem) {
544  HTGS_ASSERT(this->getMemoryEdges()->find(name) != this->getMemoryEdges()->end(), "Task '" << this->getName() << "' cannot getMemory as it does not have the memory edge '" << name << "'" );
545 
546  auto conn = getMemoryEdges()->find(name)->second;
547  auto connector = std::dynamic_pointer_cast<Connector<MemoryData<V>>>(conn);
548 
549 #ifdef WS_PROFILE
550  sendWSProfileUpdate(StatusCode::WAITING_FOR_MEM);
551 #endif
552 
553 #ifdef USE_NVTX
554  nvtxRangeId_t rangeId = this->getOwnerTaskManager()->getProfiler()->startRangeWaitingForMemory();
555 #endif
556 
557 #ifdef PROFILE
558  auto start = std::chrono::high_resolution_clock::now();
559 #endif
560  m_data_t<V> memory = connector->consumeData();
561 
562 #ifdef USE_NVTX
563  this->getOwnerTaskManager()->getProfiler()->endRangeWaitingForMem(rangeId);
564 #endif
565 
566 #ifdef PROFILE
567  auto finish = std::chrono::high_resolution_clock::now();
568  this->incMemoryWaitTime(std::chrono::duration_cast<std::chrono::microseconds>(finish - start).count());
569 #endif
570 
571 #ifdef WS_PROFILE
572  sendWSProfileUpdate(StatusCode::EXECUTE);
573 #endif
574 
575 
576 
577  memory->setMemoryReleaseRule(releaseRule);
578 
579  if (memory->getType() != type) {
580  std::cerr
581  << "Error: Incorrect usage of getMemory. Dynamic memory managers use 'getDynamicMemory', Static memory managers use 'getMemory' for task '"
582  << this->getName() << "' on memory edge " << name << std::endl;
583  exit(-1);
584  }
585 
586  if (type == MMType::Dynamic)
587  memory->memAlloc(nElem);
588 
589  return memory;
590  }
591 
593 
595 
596 
597 };
598 
599 }
600 
601 #endif //HTGS_ITASK_HPP
virtual void printProfile() override
Prints the profile data to std::cout.
Definition: ITask.hpp:279
virtual void executeTaskFinal() override
Virtual function that is called just before the task has shutdown.
Definition: ITask.hpp:230
Implements the parent ITask, which removes the template arguments of an ITask.
Definition: AnyITask.hpp:48
virtual std::string getDotShapeColor() override
Gets the color of the shape for graphviz dot.
Definition: ITask.hpp:250
std::string getDotId()
Gets the id used for dot nodes.
Definition: AnyITask.hpp:497
virtual void initialize() override
Virtual function that is called when an ITask is being initialized by its owner thread.
Definition: ITask.hpp:202
virtual void shutdown() override
Virtual function that is called when an ITask is being shutdown by its owner thread.
Definition: ITask.hpp:225
virtual bool canTerminate(std::shared_ptr< AnyConnector > inputConnector) override
Virtual function that is called when an ITask is checking if it can be terminated.
Definition: ITask.hpp:215
virtual std::string getDotShape() override
Gets the shape for graphviz dot.
Definition: ITask.hpp:264
MMType
The memory manager types.
Definition: MMType.hpp:38
TaskManager< T, U > * ownerTask
The owner task for this ITask.
Definition: ITask.hpp:594
Provides the Connector class for managing input/output of AbsData between Tasks.
void setTaskManager(TaskManager< T, U > *ownerTask)
Sets the owner task manager for this ITask.
Definition: ITask.hpp:519
void incMemoryWaitTime(unsigned long long int val)
Increments memory wait time.
Definition: AnyITask.hpp:574
size_t getThreadID()
Gets the thread ID associated with this task.
Definition: ITask.hpp:458
void addResult(std::shared_ptr< U > result)
Adds results to the output list to be sent to the next connected ITask in a TaskGraph.
Definition: ITask.hpp:357
Manages the input/output of IData between Tasks.
Definition: Connector.hpp:62
virtual void gatherProfileData(std::map< AnyTaskManager *, TaskManagerProfile *> *taskManagerProfiles)
Gathers profile data.
Definition: ITask.hpp:535
size_t numThreads
The number of threads to be used with this ITask (forms a thread pool) used when creating a TaskManag...
Definition: AnyITask.hpp:600
virtual void executeTask(std::shared_ptr< T > data)=0
Pure virtual function that is called when an ITask's thread is to execute on data.
m_data_t< V > getDynamicMemory(std::string name, IMemoryReleaseRule *releaseRule, size_t numElems)
Retrieves memory from a memory edge.
Definition: ITask.hpp:419
std::string outTypeName() override final
Gets the demangled output type name of the connector.
Definition: ITask.hpp:493
void initialize(size_t pipelineId, size_t numPipeline, TaskManager< T, U > *ownerTask)
Function that is called when an ITask is being initialized by its owner thread.
Definition: ITask.hpp:378
ITask< T, U > * copyITask(bool deep) override
Copies the ITask (including a copy of all memory edges)
Definition: ITask.hpp:341
std::string getAddress() override final
Gets the address from the owner task, which is the address of the task graph.
Definition: ITask.hpp:511
Abstract class that describes when memory can be released/reused.
Definition: IMemoryReleaseRule.hpp:73
ITask(size_t numThreads)
Constructs an ITask with a specified number of threads.
Definition: ITask.hpp:180
virtual ITask< T, U > * copy()=0
Pure virtual function to copy an ITask.
ITask(size_t numThreads, bool isStartTask, bool poll, size_t microTimeoutTime)
Constructs an ITask with a specified number of threads as well as additional scheduling options...
Definition: ITask.hpp:190
bool isStartTask() const
Gets whether this ITask is a starting task.
Definition: AnyITask.hpp:422
TaskManager< T, U > * getOwnerTaskManager()
Gets the owner task manager for this ITask.
Definition: ITask.hpp:527
virtual size_t getNumGraphsSpawned()
Gets the number of graphs spawned by this ITask.
Definition: ITask.hpp:290
An interface to process input data and forward results within a TaskGraph.
Definition: ITask.hpp:165
ITask()
Creates an ITask with number of threads equal to 1.
Definition: ITask.hpp:174
const std::shared_ptr< ConnectorMap > & getMemoryEdges() const
Gets the memory edges for the task.
Definition: AnyITask.hpp:515
bool poll
Whether the ITask should poll for data used when creating a TaskManager.
Definition: AnyITask.hpp:602
Encapsulates an ITask to interact with an ITask's functionality.
Definition: ITask.hpp:39
size_t microTimeoutTime
The timeout time for polling in microseconds used when creating a TaskManager.
Definition: AnyITask.hpp:603
virtual std::string getDotLabelName() override
Virtual function to get the label name used for dot graph viz.
Definition: ITask.hpp:243
void copyMemoryEdges(AnyITask *iTaskCopy)
Copies the memory edges from this AnyITask to another AnyITask.
Definition: AnyITask.hpp:448
virtual std::string getDotCustomProfile() override
Adds the string text to the profiling of this task in the graphviz dot visualization.
Definition: ITask.hpp:272
Implements a TaskManager that interacts with an ITask and holds the input and output Connector for th...
#define HTGS_ASSERT(condition, message)
Prints a more meaningful assertion message and terminates if the condition fails. ...
Definition: debug_message.hpp:25
void addResult(U *result)
Adds results to the output list to be sent to the next connected ITask in a TaskGraph.
Definition: ITask.hpp:366
m_data_t< V > getMemory(std::string name, IMemoryReleaseRule *releaseRule)
Retrieves memory from a memory edge.
Definition: ITask.hpp:399
std::string inTypeName() override final
Gets the demangled input type name of the connector.
Definition: ITask.hpp:475
std::shared_ptr< MemoryData< V > > m_data_t
Defines a shared pointer to htgs::MemoryData.
Definition: Types.hpp:101
void resetProfile()
Resets profile data.
Definition: ITask.hpp:449
size_t pipelineId
The execution pipeline id for the ITask.
Definition: AnyITask.hpp:604
void releaseMemory(m_data_t< V > memory)
Releases memory onto a memory edge, which is transferred by the graph communicator.
Definition: ITask.hpp:431
unsigned long long int getTaskComputeTime() const
Gets the task's compute time.
Definition: ITask.hpp:467
Definition: Bookkeeper.hpp:23
virtual std::string getDotFillColor() override
Gets the color for filling the shape for graphviz dot.
Definition: ITask.hpp:257
virtual std::string getName() override
Virtual function to get the name of an ITask.
Definition: ITask.hpp:236