HTGS  v2.0
The Hybrid Task Graph Scheduler
TaskGraphConf.hpp
Go to the documentation of this file.
1 
2 // NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the software in any medium, provided that you keep intact this entire notice. You may improve, modify and create derivative works of the software or any portion of the software, and you may copy and distribute such modifications or works. Modified works should carry a notice stating that you changed the software and should note the date and nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the source of the software.
3 // NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE.
4 // You are solely responsible for determining the appropriateness of using and distributing the software and you assume all risks associated with its use, including but not limited to the risks and costs of program errors, compliance with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of operation. This software is not intended to be used in any situation where a failure could cause risk of injury or damage to property. The software developed by NIST employees is not subject to copyright protection within the United States.
13 #ifndef HTGS_TASKGRAPHCONF_HPP
14 #define HTGS_TASKGRAPHCONF_HPP
15 
17 #include <htgs/core/graph/edge/GraphTaskProducerEdge.hpp>
19 #include <htgs/core/graph/edge/GraphEdge.hpp>
23 
25 #include <htgs/api/TGTask.hpp>
26 #include <htgs/core/graph/edge/GraphRuleProducerEdge.hpp>
27 #include <htgs/core/graph/edge/GraphTaskConsumerEdge.hpp>
28 
29 #ifdef USE_CUDA
31 #endif
32 
33 #ifdef WS_PROFILE
34 #include <htgs/core/graph/profile/CustomProfile.hpp>
35 #include <htgs/core/graph/profile/WebSocketProfiler.hpp>
36 #endif
37 
38 namespace htgs {
39 
40  template <class T, class U>
41  class ExecutionPipeline;
42 
43  template <class T, class U>
44  class TGTask;
45 
143 template<class T, class U>
144 class TaskGraphConf : public AnyTaskGraphConf {
145  static_assert(std::is_base_of<IData, T>::value, "T must derive from IData");
146  static_assert(std::is_base_of<IData, U>::value, "U must derive from IData");
147 
148  public:
149 
154  this->input = std::shared_ptr<Connector<T>>(new Connector<T>());
155  this->output = std::shared_ptr<Connector<U>>(new Connector<U>());
156 
157  this->input->incrementInputTaskCount();
158  this->graphConsumerEdge = nullptr;
159  this->graphProducerEdges = new std::list<GraphEdge<U> *>();
160 
161  this->edges = new std::list<EdgeDescriptor *>();
162 
163  // TODO: Delete or Add #ifdef
164 // this->taskConnectorCommunicator = new TaskGraphCommunicator(nullptr, this->getAddress());
165 
166 #ifdef WS_PROFILE
167  // Create web socket profiler task
168  WebSocketProfiler *profileTask = new WebSocketProfiler();
169 
170  // Create input connector and task manager
171  std::shared_ptr<Connector<ProfileData>> wsConnector(new Connector<ProfileData>());
172  this->wsProfileTaskManager= new TaskManager<ProfileData, VoidData>(profileTask,
173  profileTask->getNumThreads(),
174  profileTask->isStartTask(),
175  profileTask->isPoll(),
176  profileTask->getMicroTimeoutTime(),
177  0,
178  1,
179  "0");
180 
181  this->wsProfileTaskManager->setInputConnector(wsConnector);
182  wsConnector->incrementInputTaskCount();
183 
184  // Add task to communicator
185  TaskNameConnectorPair pair("0:" + this->wsProfileTaskManager->getName(), wsConnector);
186 
187  this->taskConnectorCommunicator->addTaskNameConnectorPair(pair);
188 
189  // Send root graph creation
190  std::shared_ptr<ProfileData> createGraphData(new CreateNodeProfile(this, nullptr, ""));
191  this->sendProfileData(createGraphData);
192 
193  std::cout << "Creating " << this->wsProfileTaskManager->getName() << " with connector addr: " << wsConnector << std::endl;
194 
195 #endif
196 
197  }
198 
206  size_t numPipelines,
207  std::string baseAddress
208  // TODO: Delete or Add #ifdef
209 // TaskGraphCommunicator *parentCommunicator
210 #ifdef WS_PROFILE
211  , TaskManager<ProfileData, VoidData> *wsProfileTaskManager
212 #endif
213  ) : AnyTaskGraphConf(pipelineId, numPipelines, baseAddress) {
214  this->input = std::shared_ptr<Connector<T>>(new Connector<T>());
215  this->output = std::shared_ptr<Connector<U>>(new Connector<U>());
216 
217  this->input->incrementInputTaskCount();
218 
219  graphConsumerEdge = nullptr;
220  graphProducerEdges = new std::list<GraphEdge<U> *>();
221 
222  edges = new std::list<EdgeDescriptor *>();
223 
224  // TODO: Delete or Add #ifdef
225 // taskConnectorCommunicator = new TaskGraphCommunicator(parentCommunicator, this->getAddress());
226 
227 #ifdef WS_PROFILE
228  this->wsProfileTaskManager = wsProfileTaskManager;
229  this->wsProfileThread = nullptr;
230 #endif
231 
232  }
233 
237  ~TaskGraphConf() override {
238  for (auto edge : *edges) {
239  if (edge != nullptr) {
240  delete edge;
241  edge = nullptr;
242  }
243  }
244  delete edges;
245  edges = nullptr;
246 
247 #ifdef WS_PROFILE
248  if (wsProfileThread != nullptr)
249  {
250  this->wsProfileTaskManager->getInputConnector()->producerFinished();
251  this->wsProfileTaskManager->getInputConnector()->wakeupConsumer();
252  this->wsProfileThread->join();
253 
254  delete wsProfileTaskManager;
255  wsProfileTaskManager = nullptr;
256 
257  delete wsProfileThread;
258  wsProfileThread = nullptr;
259 
260  delete runtimeThread;
261  runtimeThread = nullptr;
262  }
263 #endif
264 
265 #ifdef USE_NVTX
266  if (this->getAddress() == "0")
267  {
268  delete profiler;
269  profiler = nullptr;
270  }
271 #endif
272 
273  // TODO: Delete or Add #ifdef
274 // if (taskConnectorCommunicator != nullptr)
275 // taskConnectorCommunicator->terminateGracefully();
276 //
277 // delete taskConnectorCommunicator;
278 // taskConnectorCommunicator = nullptr;
279 
280  if (graphConsumerEdge)
281  {
282  delete graphConsumerEdge;
283  graphConsumerEdge = nullptr;
284  }
285 
286  for (auto edge : *graphProducerEdges) {
287  if (edge != nullptr) {
288  delete edge;
289  edge = nullptr;
290  }
291  }
292 
293  delete graphProducerEdges;
294  graphProducerEdges = nullptr;
295  }
296 
297  AnyTaskGraphConf *copy() override {
298  return copy(this->getPipelineId(), this->getNumPipelines());
299  }
300 
308  return copy(pipelineId, numPipelines, nullptr, nullptr, this->getAddress());
309  }
310 
322  size_t numPipelines,
323  std::shared_ptr<Connector<T>> input,
324  std::shared_ptr<Connector<U>> output,
325  std::string baseAddress) {
326  // TODO: Delete or Add #ifdef
327  //TaskGraphCommunicator *parentCommunicator) {
328 #ifdef WS_PROFILE
329  TaskGraphConf<T, U> *graphCopy = new TaskGraphConf<T, U>(pipelineId, numPipelines, baseAddress, parentCommunicator, this->wsProfileTaskManager);
330 
331  // Send child graph creation
332  std::shared_ptr<ProfileData> createGraphData(new CreateNodeProfile(graphCopy, this, ""));
333  this->sendProfileData(createGraphData);
334 #else
335  TaskGraphConf<T, U> *graphCopy = new TaskGraphConf<T, U>(pipelineId, numPipelines, baseAddress/*TODO: Delete or Add #ifdef , parentCommunicator*/);
336 #endif
337 
338  // Copy the tasks to form lookup between old ITasks and new copies
339  graphCopy->copyTasks(this->getTaskManagers());
340 
341  if (input != nullptr) {
342  graphCopy->setInputConnector(input);
343  }
344 
345  if (output != nullptr) {
346  graphCopy->setOutputConnector(output);
347  }
348 
349  // Copy the graph producer and consumer tasks
350  GraphEdge<T> *consumerEdge = graphConsumerEdge->copy(graphCopy);
351  consumerEdge->applyEdge(graphCopy);
352  graphCopy->setGraphConsumerEdge(consumerEdge);
353 
354  for (auto producerEdge : *this->graphProducerEdges)
355  {
356  GraphEdge<U> *producerEdgeCopy = producerEdge->copy(graphCopy);
357  producerEdgeCopy->applyEdge(graphCopy);
358  graphCopy->addGraphProducerEdge(producerEdgeCopy);
359  }
360 
361 
362 
363 // graphCopy->copyAndUpdateGraphConsumerTask(this->graphConsumerEdge);
364 // graphCopy->copyAndUpdateGraphProducerTasks(this->graphProducerEdges);
365 
366  for (EdgeDescriptor *edgeDescriptor : *edges) {
367  // Copy the edge, using the graph copy as a reference for where to get task copies
368  EdgeDescriptor *edgeCopy = edgeDescriptor->copy(graphCopy);
369 
370  // Apply the edge on the graph copy
371  edgeCopy->applyEdge(graphCopy);
372 
373  graphCopy->addEdgeDescriptor(edgeCopy);
374  }
375 
376  return graphCopy;
377  }
378 
387  template<class V, class W, class X>
388  void addEdge(ITask<V, W> *producer, ITask<W, X> *consumer) {
389  auto pce = new ProducerConsumerEdge<V, W, X>(producer, consumer);
390  pce->applyEdge(this);
391  this->addEdgeDescriptor(pce);
392  }
393 
405  template<class V, class IRuleType, class W, class X>
406  void addRuleEdge(Bookkeeper<V> *bookkeeper, std::shared_ptr<IRuleType> rule, ITask<W, X> *consumer) {
407  static_assert(std::is_base_of<IRule<V, W>, IRuleType>::value,
408  "Type mismatch for IRule<V, W>, V must match the input type of the bookkeeper and W must match the input type of the consumer!");
409  std::shared_ptr<IRule<V, W>> ruleCast = std::static_pointer_cast<IRule<V, W>>(rule);
410  auto re = new RuleEdge<V, W, X>(bookkeeper, ruleCast, consumer);
411  re->applyEdge(this);
412  this->addEdgeDescriptor(re);
413  }
414 
424  template<class V, class W, class X>
425  void addRuleEdge(Bookkeeper<V> *bookkeeper, IRule<V, W> *iRule, ITask<W, X> *consumer) {
426  std::shared_ptr<IRule<V, W>> rule = super::getIRule(iRule);
427 
428  auto re = new RuleEdge<V, W, X>(bookkeeper, rule, consumer);
429  re->applyEdge(this);
430  this->addEdgeDescriptor(re);
431  }
432 
433 #ifdef USE_CUDA
434 
447  template<class V, class IMemoryAllocatorType>
448  void addCudaMemoryManagerEdge(std::string name,
449  AnyITask *getMemoryTask,
450  std::shared_ptr<IMemoryAllocatorType> allocator,
451  size_t memoryPoolSize,
452  MMType type,
453  int *gpuIds) {
454  static_assert(std::is_base_of<IMemoryAllocator<V>, IMemoryAllocatorType>::value,
455  "Type mismatch for allocator, allocator must be a MemoryAllocator!");
456 
457  std::shared_ptr<IMemoryAllocator<V>> memAllocator = std::static_pointer_cast<IMemoryAllocator<V>>(allocator);
458 
459  MemoryManager<V> *memoryManager = new CudaMemoryManager<V>(name, gpuIds, memoryPoolSize, memAllocator, type);
460 
461  MemoryEdge<V> *memEdge = new MemoryEdge<V>(name, getMemoryTask, memoryManager);
462  memEdge->applyEdge(this);
463  this->addEdgeDescriptor(memEdge);
464  }
465 
466 
467 
468 
482  template<class V>
483  void addCudaMemoryManagerEdge(std::string name,
484  AnyITask *getMemoryTask,
485  IMemoryAllocator<V> *allocator,
486  size_t memoryPoolSize,
487  MMType type,
488  int *gpuIds) {
489 
490  std::shared_ptr<IMemoryAllocator<V>> memAllocator = this->getMemoryAllocator(allocator);
491 
492  MemoryManager<V> *memoryManager = new CudaMemoryManager<V>(name, gpuIds, memoryPoolSize, memAllocator, type);
493 
494  MemoryEdge<V> *memEdge = new MemoryEdge<V>(name, getMemoryTask, memoryManager);
495  memEdge->applyEdge(this);
496  this->addEdgeDescriptor(memEdge);
497  }
498 #endif
499 
500 
501  template <class V>
502  void addRuleEdgeAsGraphProducer(Bookkeeper<V> *bookkeeper, IRule<V, U> *iRule) {
503  std::shared_ptr<IRule<V, U>> rule = super::getIRule(iRule);
504 
505  auto gore = new GraphRuleProducerEdge<V, U>(bookkeeper, rule, this->output);
506  gore->applyEdge(this);
507  this->addGraphProducerEdge(gore);
508  }
509 
517  template <class V>
518  void addCustomMemoryManagerEdge(AnyITask *getMemoryTask, MemoryManager<V> *memoryManager)
519  {
520  MemoryEdge<V> *memEdge = new MemoryEdge<V>(memoryManager->getMemoryManagerName(), getMemoryTask, memoryManager);
521  memEdge->applyEdge(this);
522  this->addEdgeDescriptor(memEdge);
523  }
524 
536  template<class V, class IMemoryAllocatorType>
537  void addMemoryManagerEdge(std::string name, AnyITask *getMemoryTask,
538  std::shared_ptr<IMemoryAllocatorType> allocator, size_t memoryPoolSize, MMType type) {
539  static_assert(std::is_base_of<IMemoryAllocator<V>, IMemoryAllocatorType>::value,
540  "Type mismatch for allocator, allocator must be a MemoryAllocator!");
541 
542  std::shared_ptr<IMemoryAllocator<V>> memAllocator = std::static_pointer_cast<IMemoryAllocator<V>>(allocator);
543 
544  MemoryManager<V> *memoryManager = new MemoryManager<V>(name, memoryPoolSize, memAllocator, type);
545 
546  MemoryEdge<V> *memEdge = new MemoryEdge<V>(name, getMemoryTask, memoryManager);
547  memEdge->applyEdge(this);
548  this->addEdgeDescriptor(memEdge);
549  }
550 
561  template<class V>
562  void addMemoryManagerEdge(std::string name,
563  AnyITask *getMemoryTask,
564  IMemoryAllocator<V> *allocator,
565  size_t memoryPoolSize,
566  MMType type) {
567 
568  std::shared_ptr<IMemoryAllocator<V>> memAllocator = super::getMemoryAllocator(allocator);
569 
570  MemoryManager<V> *memoryManager = new MemoryManager<V>(name, memoryPoolSize, memAllocator, type);
571 
572  MemoryEdge<V> *memEdge = new MemoryEdge<V>(name, getMemoryTask, memoryManager);
573  memEdge->applyEdge(this);
574  this->addEdgeDescriptor(memEdge);
575  }
576 
577 // AnyTaskManager *getGraphConsumerTaskManager() override {
578 //
579 // return this->graphConsumerEdge->getTaskManager(this);
580 // }
581 
582 // std::list<AnyTaskManager *> *getGraphProducerTaskManagers() override {
583 // return this->graphProducerEdges;
584 // }
585 
586  std::shared_ptr<AnyConnector> getInputConnector() override {
587  return this->input;
588  }
589 
590  std::shared_ptr<AnyConnector> getOutputConnector() override {
591  return this->output;
592  }
593 
594  // TODO: rename virtual function to capture functionality with initializing the profiler
596  for (auto t : *this->getTaskManagers()) {
597  t->updateAddressAndPipelines(this->getAddress(), this->getPipelineId(), this->getNumPipelines());
598 
599 
600 #ifdef USE_NVTX
601  if (this->getAddress() == "0") {
602  nvtxDomainHandle_t graphDomain = nvtxDomainCreateA(TASK_GRAPH_PREFIX_NAME);
603 #ifdef USE_MINIMAL_NVTX
604  profiler = new NVTXProfiler(TASK_GRAPH_PREFIX_NAME, nullptr, graphDomain, graphDomain, graphDomain, graphDomain, graphDomain, graphDomain);
605 #else
606  profiler = new NVTXProfiler(TASK_GRAPH_PREFIX_NAME, graphDomain, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr);
607 #endif
608  }
609 #endif
610 
611 // std::string taskAddressName = this->address + ":" + t->getName();
612 // this->taskConnectorNameMap->insert(TaskNameConnectorPair(taskAddressName, t->getInputConnector()));
613  }
614  }
615 
616  // TODO: Delete or Add #ifdef
617 // TaskGraphCommunicator *getTaskGraphCommunicator() const { return this->taskConnectorCommunicator; }
618 
619 // void updateCommunicator() override {
620 //
621 // auto taskNameConnectorMap = this->getTaskConnectorNameMap();
622 //
623 // // Send the map to the taskGraphCommunicator
624 // this->taskConnectorCommunicator->addTaskNameConnectorMap(taskNameConnectorMap);
625 //
626 // for (auto t : *this->getTaskManagers()) {
627 // t->setTaskGraphCommunicator(this->taskConnectorCommunicator);
628 // }
629 //
630 //#ifdef WS_PROFILE
631 // if (this->getAddress() == "0") {
632 // std::cout << "Launching threaed" << std::endl;
633 // // Create thread
634 // std::shared_ptr<std::atomic_size_t>
635 // atomicNumThreads = std::shared_ptr<std::atomic_size_t>(new std::atomic_size_t(1));
636 // runtimeThread = new TaskManagerThread(0, this->wsProfileTaskManager, atomicNumThreads);
637 // this->wsProfileThread = new std::thread(&TaskManagerThread::run, runtimeThread);
638 //
639 // WebSocketProfiler *profileTask = (WebSocketProfiler *)this->wsProfileTaskManager->getTaskFunction();
640 // profileTask->waitForConnection();
641 //
642 // }
643 //#endif
644 //
645 //
646 // }
647 
652 // void setInputConnector(std::shared_ptr<Connector<T>> input) {
653 // this->input = input;
654 // }
655 
660 // void setOutputConnector(std::shared_ptr<Connector<U>> output) {
661 // this->output = output;
662 // }
663 
671  this->input->incrementInputTaskCount();
672  }
673 
681  this->input->producerFinished();
682  if (this->input->getProducerCount() == 0) {
683  this->input->wakeupConsumer();
684  }
685 #ifdef WS_PROFILE
686  std::shared_ptr<ProfileData> updateStatus(new ChangeStatusProfile(this->input.get(), StatusCode::DECREMENT));
687  this->sendProfileData(updateStatus);
688 #endif
689  }
690 
699  template<class W>
701  GraphTaskConsumerEdge<T, W> *consumerEdge = new GraphTaskConsumerEdge<T, W>(task, this->input);
702 
703  consumerEdge->applyEdge(this);
704  if (this->graphConsumerEdge)
705  {
706  delete graphConsumerEdge;
707  }
708 
709  this->graphConsumerEdge = consumerEdge;
710 
711 #ifdef WS_PROFILE
712  // Add nodes
713  std::shared_ptr<ProfileData> connectorData(new CreateConnectorProfile(input.get(), this, input->getProducerCount(), "Graph Input"));
714  std::shared_ptr<ProfileData> consumerData(new CreateNodeProfile(task, this, task->getName()));
715 
716  this->sendProfileData(consumerData);
717  this->sendProfileData(connectorData);
718 
719  std::shared_ptr<ProfileData> connectorConsumerData(new CreateEdgeProfile(input.get(), task, "", nullptr));
720 
721  this->sendProfileData(connectorConsumerData);
722 #endif
723 
724  }
725 
732  template<class W>
734  GraphTaskProducerEdge<W, U> *producerEdge = new GraphTaskProducerEdge<W, U>(task, this->output);
735  producerEdge->applyEdge(this);
736 
737  this->graphProducerEdges->push_back(producerEdge);
738 
739 #ifdef WS_PROFILE
740  // Add nodes
741  std::shared_ptr<ProfileData> connectorData(new CreateConnectorProfile(output.get(), this, output->getProducerCount(), "Graph Output"));
742  std::shared_ptr<ProfileData> producerData(new CreateNodeProfile(task, this, task->getName()));
743 
744  this->sendProfileData(producerData);
745  this->sendProfileData(connectorData);
746 
747  std::shared_ptr<ProfileData> connectorProducerData(new CreateEdgeProfile(task, output.get(), "", nullptr));
748 
749  this->sendProfileData(connectorProducerData);
750 #endif
751 
752  }
753 
763  void produceData(T *data) {
764  std::shared_ptr<T> dataPtr = std::shared_ptr<T>(data);
765  this->input->produceData(dataPtr);
766  }
767 
775  void produceData(std::shared_ptr<T> data) {
776  this->input->produceData(data);
777  }
778 
786  void produceData(std::list<std::shared_ptr<T>> *dataList) {
787  this->input->produceData(dataList);
788  if (this->input->isInputTerminated())
789  this->input->wakeupConsumer();
790  }
791 
801  std::shared_ptr<U> consumeData() {
802 #ifdef USE_NVTX
803  nvtxRangeId_t id = profiler->startRangeWaiting(this->output->getQueueSize());
804 #endif
805  std::shared_ptr<U> data = this->output->consumeData();
806 #ifdef USE_NVTX
807  profiler->endRangeWaiting(id);
808 #endif
809  return data;
810  }
811 
817  std::shared_ptr<U> pollData(size_t microTimeout) {
818 #ifdef USE_NVTX
819  nvtxRangeId_t id = profiler->startRangeWaiting(this->output->getQueueSize());
820 #endif
821  std::shared_ptr<U> data = this->output->pollConsumeData(microTimeout);
822 #ifdef USE_NVTX
823  profiler->endRangeWaiting(id);
824 #endif
825  return data;
826  }
827 
835  return this->output->isInputTerminated();
836  }
837 
842  void setOutputConnector(std::shared_ptr<AnyConnector> connector) {
843  if (graphProducerEdges != nullptr) {
844  for (auto producerEdge : *graphProducerEdges)
845  producerEdge->updateEdge(std::dynamic_pointer_cast<Connector<U>>(connector), this);
846  }
847 
848  this->output = std::dynamic_pointer_cast<Connector<U>>(connector);
849 
850  }
851 
856  void setInputConnector(std::shared_ptr<AnyConnector> connector) {
857  if (graphConsumerEdge != nullptr) {
858  graphConsumerEdge->updateEdge(std::dynamic_pointer_cast<Connector<T>>(connector), this);
859  }
860 
861  this->input = std::dynamic_pointer_cast<Connector<T>>(connector);
862 
863  }
864 
865 
872  template<class V>
873  [[gnu::deprecated("Replaced by calling 'releaseMemory' directory with htgs::MemoryData (or m_data_t)")]]
874  void releaseMemory(m_data_t<V> memory) {
875  memory->releaseMemory();
876  // TODO: Delete or Add #ifdef
877 // std::shared_ptr<DataPacket> dataPacket = std::shared_ptr<DataPacket>(new DataPacket("TaskGraph",
878 // this->getAddress(),
879 // memory->getMemoryManagerName(),
880 // memory->getAddress(),
881 // memory));
882 #ifdef USE_NVTX
883  profiler->addReleaseMarker();
884 #endif
885 
886 // this->taskConnectorCommunicator->produceDataPacket(dataPacket);
887 
888 
889  }
890 
897  std::string genCustomDotForTasks(ProfileUtils *profileUtils, int colorFlag)
898  {
899  std::ostringstream oss;
900 
901  for (AnyTaskManager *bTask : *this->getTaskManagers()) {
902  oss << bTask->getTaskFunction()->genCustomDot(profileUtils, colorFlag);
903  }
904  return oss.str();
905  }
906 
912  std::string genDotGraphEdgesWithoutConnectors(std::map<AnyTaskManager *, TaskManagerProfile *> *allTaskManagerProfiles, int flags)
913  {
914 
915 
916  std::map<std::shared_ptr<AnyConnector>, AnyTaskManager *> inputConnectorMap;
917  std::map<std::shared_ptr<AnyConnector>, AnyITask *> inputConnectorDotMap;
918 
919  for (auto tmanProfiles : *allTaskManagerProfiles) {
920  auto bTask = tmanProfiles.first;
921  if (bTask->getThreadId() == 0) {
922  inputConnectorMap.insert(
923  std::pair<std::shared_ptr<AnyConnector>, AnyTaskManager *>(bTask->getInputConnector(), bTask));
924  inputConnectorDotMap.insert(
925  std::pair<std::shared_ptr<AnyConnector>, AnyITask *>(bTask->getInputConnector(), bTask->getTaskFunction()));
926  }
927  }
928 
929  // Add in the graph's input/output connectors as well
930  inputConnectorMap.insert(std::pair<std::shared_ptr<AnyConnector>, AnyTaskManager *>(this->getInputConnector(), nullptr));
931  inputConnectorMap.insert(std::pair<std::shared_ptr<AnyConnector>, AnyTaskManager *>(this->getOutputConnector(), nullptr));
932 
933 // inputConnectorDotMap.insert(std::pair<std::shared_ptr<AnyConnector>, std::string>(this->getInputConnector(), this->getInputConnector()->getDotId()));
934 // inputConnectorDotMap.insert(std::pair<std::shared_ptr<AnyConnector>, std::string>(this->getOutputConnector(), this->getOutputConnector()->getDotId()));
935 
936  std::ostringstream oss;
937 
938  for (auto tmanProfiles : *allTaskManagerProfiles)
939  {
940  auto bTask = tmanProfiles.first;
941 
942  if (bTask->getThreadId() == 0) {
943  oss << bTask->getTaskFunction()->genDotProducerEdgeToTask(inputConnectorDotMap, flags);
944  oss << bTask->getTaskFunction()->genDotConsumerEdgeFromConnector(this->getInputConnector(), flags);
945  oss << bTask->getTaskFunction()->genDotProducerEdgeFromConnector(this->getOutputConnector(), flags);
946  }
947 
948 // auto findConsumer = inputConnectorMap.find(bTask->getOutputConnector());
949 // if (findConsumer != inputConnectorMap.end())
950 // {
951 // // bTask is producing for findConsumer
952 // oss << bTask->getTaskFunction()->genDotProducerEdgeWithoutConnector(findConsumer->second->getTaskFunction()->getDotId(), flags);
953 // }
954 //
955 // if (this->getInputConnector() == bTask->getInputConnector())
956 // {
957  // input connector is connected to the task
958 // oss << this->getInputConnector()->getDotId() << " -> " << bTask->getTaskFunction()->getDotId() << ";" << std::endl;
959 // oss << bTask->getTaskFunction()->genDotConsumerEdgeWithoutConnector(this->getInputConnector()->getDotId(), flags);
960 // }
961 //
962 // if (this->getOutputConnector() == bTask->getOutputConnector())
963 // {
964 // // the output connector for the graph is connected to the task
965 // oss <<bTask->getTaskFunction()->genDotProducerEdgeWithoutConnector(this->getOutputConnector()->getDotId(), flags);
966 // }
967 
968 
969  }
970 
971  return oss.str();
972 
973  }
974 
978  std::string genDotGraph(int flags, int colorFlag, std::string graphTitle = "", std::string customTitleText = "") override {
979 
980  std::ostringstream oss;
981 
982  // Create header info for graphViz dot file
983  oss << "digraph { rankdir=\"TB\"" << std::endl;
984  oss << "forcelabels=true;" << std::endl;
985  oss << "node[shape=record, fontsize=10, fontname=\"Verdana\"];" << std::endl;
986  oss << "edge[fontsize=10, fontname=\"Verdana\"];" << std::endl;
987 
988  std::string graphTitleStr = graphTitle == "" ? "" : (graphTitle + "\\n");
989  std::string computeTimeStr = this->getGraphComputeTime() == 0 ? "" : "Compute time: " + std::to_string((double)this->getGraphComputeTime() / 1000000.0) + " s\\n";
990  std::string createTimeStr = this->getGraphCreationTime() == 0 ? "" : "Creation time: " + std::to_string((double)this->getGraphCreationTime() / 1000000.0) + " s\\n";
991 
992 
993 
994  oss << "graph [compound=true, labelloc=top, labeljust=left, "
995  << "label=\"" << graphTitleStr << computeTimeStr << createTimeStr <<customTitleText << "\",pad=\"0.5\", nodesep=\"0.5\", ranksep=\"0\"];" << std::endl;
996 
997 
998  // Gather profile data
999  TaskGraphProfiler profiler(flags);
1000  profiler.buildProfile(this);
1001 
1002  oss << genDotGraphContent(flags);
1003  if ((flags & DOTGEN_FLAG_SHOW_CONNECTORS) == 0 && (flags & DOTGEN_FLAG_SHOW_CONNECTOR_VERBOSE) == 0) {
1005  }
1006 
1007  oss << profiler.genDotProfile(oss.str(), colorFlag);
1008  oss << genCustomDotForTasks(profiler.getProfileUtils(), colorFlag);
1009 
1010  if (this->graphConsumerEdge != nullptr)
1011  oss << this->getInputConnector()->getDotId() << "[label=\"Graph Input\\n"
1012  << ((flags & DOTGEN_FLAG_SHOW_CONNECTOR_VERBOSE) == 0 ? std::to_string(this->getInputConnector()->getProducerCount()) : "Active Producers: " + std::to_string(this->getInputConnector()->getProducerCount()))
1013  << ((flags & DOTGEN_FLAG_SHOW_CURRENT_Q_SZ) == 0 && (flags & DOTGEN_FLAG_SHOW_CONNECTOR_VERBOSE) == 0 ? "" : "\\nQueue Size: " + std::to_string(this->getInputConnector()->getQueueSize()))
1014  << (((DOTGEN_FLAG_SHOW_IN_OUT_TYPES & flags) != 0) ? ("\\n" + this->getInputConnector()->typeName()) : "")
1015  << "\"];" << std::endl;
1016 
1017 
1018 // if (getGraphProducerTaskManagers()->size() > 0)
1019  if (this->graphProducerEdges->size() > 0)
1020  oss << "{ rank = sink; " << this->getOutputConnector()->getDotId() << "[label=\"Graph Output\\n"
1021  << ((flags & DOTGEN_FLAG_SHOW_CONNECTOR_VERBOSE) == 0 ? std::to_string(this->getInputConnector()->getProducerCount()) : "Active Producers: " + std::to_string(this->getOutputConnector()->getProducerCount()))
1022  << ((flags & DOTGEN_FLAG_SHOW_CURRENT_Q_SZ) == 0 && (flags & DOTGEN_FLAG_SHOW_CONNECTOR_VERBOSE) == 0 ? "" : "\\nQueue Size: " + std::to_string(this->getOutputConnector()->getQueueSize()))
1023  << (((DOTGEN_FLAG_SHOW_IN_OUT_TYPES & flags) != 0) ? ("\\n" + this->getOutputConnector()->typeName()) : "")
1024  << "\"]; }" << std::endl;
1025 
1026  if (oss.str().find("mainThread") != std::string::npos) {
1027  oss << "{ rank = sink; mainThread[label=\"Main Thread\", fillcolor = aquamarine4]; }" << std::endl;
1028  }
1029 
1030  oss << "}" << std::endl;
1031 
1032  return oss.str();
1033  }
1034 
1039  void debug() {
1040  HTGS_DEBUG("-----------------------------------------------");
1041  HTGS_DEBUG("TaskGraphConf -- num vertices: " << this->getTaskManagers()->size() << " -- DETAILS:");
1042 
1043  for (AnyTaskManager *t : *this->getTaskManagers()) {
1044  t->debug();
1045  }
1046  HTGS_DEBUG("-----------------------------------------------");
1047  }
1048 
1049 #ifdef WS_PROFILE
1050  void sendProfileData(std::shared_ptr<ProfileData> profileData) override
1051  {
1052  this->wsProfileTaskManager->getInputConnector()->produceAnyData(profileData);
1053  }
1054 #endif
1055 
1065  ExecutionPipeline<T, U> *createExecutionPipeline(size_t numPipelines, std::string name = "Execution Pipeline", bool waitForInit = true)
1066  {
1067  return new ExecutionPipeline<T, U>(numPipelines, this, name, waitForInit);
1068  }
1069 
1079  TGTask<T, U> *createTaskGraphTask(std::string name = "TGTask", bool waitForInit = true)
1080  {
1081  return new TGTask<T, U>(this, name, waitForInit);
1082  }
1083 
1084  GraphEdge<T> *getGraphConsumerEdge() const {
1085  return graphConsumerEdge;
1086  }
1087 
1088  std::list<GraphEdge<U> *> *getGraphProducerEdges() const {
1089  return graphProducerEdges;
1090  }
1091 
1092  private:
1093 
1095 
1096 
1097 
1098 // void copyAndUpdateGraphConsumerTask(AnyTaskManager *taskManager) {
1099 // if (taskManager != nullptr) {
1100 // AnyTaskManager *copy = this->getTaskManagerCopy(taskManager->getTaskFunction());
1101 // this->graphConsumerEdge = copy;
1102 // this->graphConsumerEdge->setInputConnector(this->input);
1103 // this->addTaskManager(this->graphConsumerEdge);
1104 //
1105 //#ifdef WS_PROFILE
1106 // // TODO: Ensure this is doable with visualizer, this is an update edge and remove node type task
1107 // // Add nodes
1117 //#endif
1118 //
1119 // }
1120 // }
1121 //
1122 // void copyAndUpdateGraphProducerTasks(std::list<AnyTaskManager *> *taskManagers) {
1123 // for (auto taskManager : *taskManagers) {
1124 // if (taskManager != nullptr) {
1125 // AnyTaskManager *copy = this->getTaskManagerCopy(taskManager->getTaskFunction());
1126 //
1127 // copy->setOutputConnector(this->output);
1128 // this->graphProducerEdges->push_back(copy);
1129 //
1130 // this->output->incrementInputTaskCount();
1131 // this->addTaskManager(copy);
1132 //
1133 //#ifdef WS_PROFILE
1134 // // TODO: Ensure this is doable with visualizer, this is an update edge and remove node type task
1135 // // Add nodes
1145 //#endif
1146 // }
1147 // }
1148 // }
1149 
1150  void setGraphConsumerEdge(GraphEdge<T> *consumerEdge)
1151  {
1152  this->graphConsumerEdge = consumerEdge;
1153  }
1154 
1155  void addGraphProducerEdge(GraphEdge<U> *producerEdge)
1156  {
1157  graphProducerEdges->push_back(producerEdge);
1158  }
1159 
1160  void addEdgeDescriptor(EdgeDescriptor *edge) {
1161  this->edges->push_back(edge);
1162  }
1163 
1164  typedef AnyTaskGraphConf super;
1166 
1167  std::list<EdgeDescriptor *> *
1169 
1171  std::list<GraphEdge<U> *> *
1173 
1174  std::shared_ptr<Connector<T>> input;
1175  std::shared_ptr<Connector<U>> output;
1176 
1177  // TODO: Delete or Add #ifdef
1178 // TaskGraphCommunicator *taskConnectorCommunicator; //!< The task graph communicator for the task graph.
1179 
1180 
1181 #ifdef WS_PROFILE
1182  TaskManager<ProfileData, VoidData> *wsProfileTaskManager; // !< The task manager for the web socket profiler
1183  std::thread *wsProfileThread; // !< The thread for the web socket profiler task manager
1184  TaskManagerThread *runtimeThread;
1185 #endif
1186 
1187 #ifdef USE_NVTX
1188  NVTXProfiler *profiler;
1189 #endif
1190 
1191 };
1192 }
1193 
1194 #endif //HTGS_TASKGRAPHCONF_HPP
void incrementGraphProducer()
Increments the input task count on the task graph's input connector.
Definition: TaskGraphConf.hpp:670
ProfileUtils * getProfileUtils()
Gets the profile utility class to obtain color codes based on the total execution time...
Definition: TaskGraphProfiler.hpp:164
size_t getNumThreads() const
Gets the number of threads associated with this TaskManager.
Definition: AnyTaskManager.hpp:298
virtual void applyEdge(AnyTaskGraphConf *graph)=0
Applies an edge to a task graph.
Implements the parent ITask, which removes the template arguments of an ITask.
Definition: AnyITask.hpp:48
TaskGraphConf< T, U > * copy(size_t pipelineId, size_t numPipelines, std::shared_ptr< Connector< T >> input, std::shared_ptr< Connector< U >> output, std::string baseAddress)
Creates a mirror copy of the TaskGraph with the specified pipelineId and number of pipelines...
Definition: TaskGraphConf.hpp:321
void produceData(std::shared_ptr< T > data)
Produces data for the input of the TaskGraph.
Definition: TaskGraphConf.hpp:775
std::string getMemoryManagerName()
Gets the name of the memory manager.
Definition: MemoryManager.hpp:197
Implements the memory edge, which is an edge descriptor.
Implements a MemoryManager that binds the thread responsible for the MemoryManager to a CUDA GPU prio...
Definition: CudaMemoryManager.hpp:34
unsigned long long int getGraphComputeTime() const
Gets the total time the graph was computing.
Definition: AnyTaskGraphConf.hpp:536
MMType
The memory manager types.
Definition: MMType.hpp:38
void setOutputConnector(std::shared_ptr< AnyConnector > connector)
Sets the output connector for the task graph configuration.
Definition: TaskGraphConf.hpp:842
virtual EdgeDescriptor * copy(AnyTaskGraphConf *graph)=0
Creates a copy of the edge descriptor to be added to other graphs, such as those within execution pip...
std::string genDotProfile(std::string curDotGraph, int colorFlag)
Generates the dot profile for the graph.
Definition: TaskGraphProfiler.hpp:102
TGTask is the task graph task, which is used to bundle a graph as a task, which can then be connected...
Definition: TaskGraphConf.hpp:44
void copyTasks(std::list< AnyTaskManager *> *tasks)
Creates a copy of each task from the list of AnyTaskManagers passed as a parameter.
Definition: AnyTaskGraphConf.hpp:576
std::string getAddress()
Gets the address for the task graph.
Definition: AnyTaskGraphConf.hpp:516
unsigned long long int getGraphCreationTime() const
Gets the total time the graph was getting created.
Definition: AnyTaskGraphConf.hpp:543
TaskGraphConf< T, U > * copy(size_t pipelineId, size_t numPipelines)
Creates a mirror copy of the TaskGraph with the specified pipelineId and number of pipelines...
Definition: TaskGraphConf.hpp:307
void debug()
Provides debug output for the TaskGraphConf.
Definition: TaskGraphConf.hpp:1039
Implements the rule edge that is added to the graph.
Definition: RuleEdge.hpp:44
std::list< GraphEdge< U > * > * graphProducerEdges
The list of producers that are outputting data to the TaskGraph's output connector.
Definition: TaskGraphConf.hpp:1172
Holds the TGTask class implementation.
bool isOutputTerminated()
Checks if the output of the TaskGraph has finished producing data.
Definition: TaskGraphConf.hpp:834
std::list< EdgeDescriptor * > * edges
The list of edges for the graph, represented by edge descriptors to define how the edges are copied/a...
Definition: TaskGraphConf.hpp:1168
void updateTaskManagersAddressingAndPipelines()
Updates the task managers addresses, pipelineIds and the number of pipelines for all tasks in the Tas...
Definition: TaskGraphConf.hpp:595
void finishedProducingData()
Decrements the input connector and wakes up any consumer of the graph's input if the input connector ...
Definition: TaskGraphConf.hpp:680
void addEdge(ITask< V, W > *producer, ITask< W, X > *consumer)
Adds an edge to the graph, where one task produces data for a consumer task.
Definition: TaskGraphConf.hpp:388
virtual GraphEdge< T > * copy(AnyTaskGraphConf *graph)=0
Creates a copy of the edge descriptor to be added to other graphs, such as those within execution pip...
void addCustomMemoryManagerEdge(AnyITask *getMemoryTask, MemoryManager< V > *memoryManager)
Adds a custom MemoryManager with the specified name to the TaskGraphConf.
Definition: TaskGraphConf.hpp:518
Implements the memory edge that is added to the graph.
Definition: MemoryEdge.hpp:39
Implements the task graph profiler for gathering and communicating the results via graphviz...
void buildProfile(AnyTaskGraphConf *graphConf)
Builds a profile for the graph, (called after execution is done)
Definition: TaskGraphProfiler.hpp:75
std::map< AnyTaskManager *, TaskManagerProfile * > * getTaskManagerProfiles()
Gets the task manager profiles for all tasks in all graphs and sub-graphs.
Definition: TaskGraphProfiler.hpp:173
Definition: GraphEdge.hpp:16
TaskGraphConf(size_t pipelineId, size_t numPipelines, std::string baseAddress)
Constructs a TaskGraph.
Definition: TaskGraphConf.hpp:205
void addCudaMemoryManagerEdge(std::string name, AnyITask *getMemoryTask, IMemoryAllocator< V > *allocator, size_t memoryPoolSize, MMType type, int *gpuIds)
Adds a CudaMemoryManager edge with the specified name to the TaskGraphConf.
Definition: TaskGraphConf.hpp:483
std::string genDotGraphEdgesWithoutConnectors(std::map< AnyTaskManager *, TaskManagerProfile *> *allTaskManagerProfiles, int flags)
Generates the edges between tasks without connectors.
Definition: TaskGraphConf.hpp:912
Implements the rule edge, which is an edge descriptor.
Manages the input/output of IData between Tasks.
Definition: Connector.hpp:62
Provides an interface to send data along RuleManager edges for processing state and dependencies...
Definition: ExecutionPipeline.hpp:34
Definition: GraphTaskConsumerEdge.hpp:13
void addRuleEdge(Bookkeeper< V > *bookkeeper, std::shared_ptr< IRuleType > rule, ITask< W, X > *consumer)
Creates a rule edge that is managed by a bookkeeper.
Definition: TaskGraphConf.hpp:406
GraphEdge< T > * graphConsumerEdge
The consumer accessing the TaskGraph's input connector.
Definition: TaskGraphConf.hpp:1170
bool isPoll()
Gets whether the task manager is polling for data or not.
Definition: AnyTaskManager.hpp:351
std::list< AnyTaskManager * > * getTaskManagers()
Gets the list of task managers for the task graph.
Definition: AnyTaskGraphConf.hpp:180
std::shared_ptr< Connector< T > > input
The input connector for the TaskGraph.
Definition: TaskGraphConf.hpp:1174
void produceData(T *data)
Produces data for the input of the TaskGraph.
Definition: TaskGraphConf.hpp:763
void applyEdge(AnyTaskGraphConf *graph) override
Applies an edge to a task graph.
Definition: GraphTaskConsumerEdge.hpp:20
ExecutionPipeline< T, U > * createExecutionPipeline(size_t numPipelines, std::string name="Execution Pipeline", bool waitForInit=true)
Wraps this task graph into an execution pipeline task, which will then be used to duplicate and execu...
Definition: TaskGraphConf.hpp:1065
std::string genCustomDotForTasks(ProfileUtils *profileUtils, int colorFlag)
Generates the custom dot profiles for all tasks in this graph.
Definition: TaskGraphConf.hpp:897
bool isStartTask()
Gets whether this task manager will begin executing immediately with nullptr data or not...
Definition: AnyTaskManager.hpp:343
void applyEdge(AnyTaskGraphConf *graph) override
Applies an edge to a task graph.
Definition: GraphTaskProducerEdge.hpp:17
void addRuleEdge(Bookkeeper< V > *bookkeeper, IRule< V, W > *iRule, ITask< W, X > *consumer)
Creates a rule edge that is managed by a bookkeeper.
Definition: TaskGraphConf.hpp:425
TaskGraphConf()
Constructs a TaskGraph.
Definition: TaskGraphConf.hpp:153
void releaseMemory(m_data_t< V > memory)
Releases memory back to its memory manager.
Definition: TaskGraphConf.hpp:874
void applyEdge(AnyTaskGraphConf *graph) override
Applies an edge to a task graph.
Definition: ProducerConsumerEdge.hpp:52
void applyEdge(AnyTaskGraphConf *graph) override
Applies an edge to a task graph.
Definition: RuleEdge.hpp:58
The parent class for a Task that removes the template arguments.
Definition: AnyTaskManager.hpp:45
void addMemoryManagerEdge(std::string name, AnyITask *getMemoryTask, IMemoryAllocator< V > *allocator, size_t memoryPoolSize, MMType type)
Adds a MemoryManager edge with the specified name to the TaskGraphConf.
Definition: TaskGraphConf.hpp:562
std::string genDotGraphContent(int flags)
Generate the content only of the graph (excludes all graph definitions and attributes) ...
Definition: AnyTaskGraphConf.hpp:550
void setInputConnector(std::shared_ptr< AnyConnector > connector)
Sets the input connector for the task graph configuration.
Definition: TaskGraphConf.hpp:856
std::shared_ptr< AnyConnector > getOutputConnector() override
Virtual function that gets the connector used for graph output.
Definition: TaskGraphConf.hpp:590
std::shared_ptr< U > consumeData()
Consumes data from the output of a TaskGraph.
Definition: TaskGraphConf.hpp:801
~TaskGraphConf() override
Destructor, handles releasing all ITask memory that is managed by the TaskGraph.
Definition: TaskGraphConf.hpp:237
std::string genDotGraph(int flags, int colorFlag, std::string graphTitle="", std::string customTitleText="") override
Generates the dot graph as a string.
Definition: TaskGraphConf.hpp:978
#define HTGS_DEBUG(msg)
Prints a debug message to std::cerr with standard level. If DEBUG_FLAG is not defined, this equates to a no-op. Each message includes the file and line number for where the debug is called.
Definition: debug_message.hpp:65
Implements a producer consumer edge, which is a type of edge descriptor.
Processes MemoryData between two ITasks using a memory pool.
Definition: MemoryManager.hpp:47
An interface to process input data and forward results within a TaskGraph.
Definition: ITask.hpp:165
size_t numPipelines
The number of pipelines from this graph.
Definition: AnyTaskGraphConf.hpp:635
Definition: GraphTaskProducerEdge.hpp:11
Abstract class that describes how memory is allocated and freed.
Definition: IMemoryAllocator.hpp:67
Manages a group of connected ITasks and their connections.
Definition: ExecutionPipeline.hpp:37
#define DOTGEN_FLAG_SHOW_IN_OUT_TYPES
Shows input and output types for all tasks.
Definition: TaskGraphDotGenFlags.hpp:32
Manages a TaskManager that is bound to a thread for execution.
Definition: AnyTaskManager.hpp:573
void produceData(std::list< std::shared_ptr< T >> *dataList)
Adds a list of data into the TaskGraph. Must specify the TaskGraph input using addGraphInputConsumer()...
Definition: TaskGraphConf.hpp:786
Encapsulates an ITask to interact with an ITask's functionality.
Definition: ITask.hpp:39
std::shared_ptr< Connector< U > > output
The output connector for the TaskGraph.
Definition: TaskGraphConf.hpp:1175
std::shared_ptr< AnyConnector > getInputConnector() override
Pure virtual function that gets the task manager that is consuming data from the graph's input...
Definition: TaskGraphConf.hpp:586
size_t pipelineId
The pipelineId for the task graph.
Definition: AnyTaskGraphConf.hpp:634
void addGraphProducerTask(ITask< W, U > *task)
Sets the task that is producing data for the output of the graph.
Definition: TaskGraphConf.hpp:733
AnyTaskGraphConf * copy() override
Creates an exact copy of this task graph.
Definition: TaskGraphConf.hpp:297
Implements the base class for the TaskGraphConf class, removing the template arguments and providing ...
Definition: AnyTaskGraphConf.hpp:66
void setInputConnector(std::shared_ptr< AnyConnector > connector) override
Sets the input BaseConnector.
Definition: TaskManager.hpp:318
void addCudaMemoryManagerEdge(std::string name, AnyITask *getMemoryTask, std::shared_ptr< IMemoryAllocatorType > allocator, size_t memoryPoolSize, MMType type, int *gpuIds)
Adds a CudaMemoryManager edge with the specified name to the TaskGraphConf.
Definition: TaskGraphConf.hpp:448
The edge descriptor is an interface used to describe how an edge is applied and copied to a task grap...
Definition: EdgeDescriptor.hpp:39
void setGraphConsumerTask(ITask< T, W > *task)
Sets the task that is consuming data from the input of the graph.
Definition: TaskGraphConf.hpp:700
Provides the implementation for a MemoryManager for Cuda MemoryData.
std::shared_ptr< IMemoryAllocator< V > > getMemoryAllocator(IMemoryAllocator< V > *allocator)
Gets the shared_ptr reference for a particular IMemoryAllocator.
Definition: AnyTaskGraphConf.hpp:222
std::pair< std::string, std::shared_ptr< AnyConnector > > TaskNameConnectorPair
Defines a pair to be added into a TaskNameConnectorMap.
Definition: AnyTaskGraphConf.hpp:56
size_t getPipelineId()
Gets the pipeline ID for the task graph configuration.
Definition: AnyTaskGraphConf.hpp:418
The ExecutionPipeline class is used to duplicate task graphs, such that each duplicate executes concu...
Definition: ExecutionPipeline.hpp:96
The task graph profiler that gathers profile data and communicates via graphviz.
Definition: TaskGraphProfiler.hpp:42
TGTask< T, U > * createTaskGraphTask(std::string name="TGTask", bool waitForInit=true)
Wraps this task graph into a TGTask.
Definition: TaskGraphConf.hpp:1079
Implements the producer consumer edge that connects two tasks where one task is producing data and th...
Definition: ProducerConsumerEdge.hpp:40
void applyEdge(AnyTaskGraphConf *graph) override
Applies an edge to a task graph.
Definition: MemoryEdge.hpp:56
Implements the task graph communicator task that communicates from each task to all other tasks in a ...
std::shared_ptr< MemoryData< V > > m_data_t
Defines a shared pointer to htgs::MemoryData.
Definition: Types.hpp:101
std::shared_ptr< U > pollData(size_t microTimeout)
Polls for data from the output of the TaskGraph.
Definition: TaskGraphConf.hpp:817
Definition: ProfileUtils.hpp:13
Definition: Bookkeeper.hpp:23
Definition: GraphRuleProducerEdge.hpp:15
size_t getNumPipelines()
Gets the number of pipelines that exist for this task graph.
Definition: AnyTaskGraphConf.hpp:424
ExecutionPipeline encapsulates a task graph and duplicates it, such that each duplicate task graph ex...
void applyEdge(AnyTaskGraphConf *graph) override
Applies an edge to a task graph.
Definition: GraphRuleProducerEdge.hpp:24
#define DOTGEN_FLAG_SHOW_CONNECTOR_VERBOSE
Shows verbose information within each connector in the graph.
Definition: TaskGraphDotGenFlags.hpp:92
Special task used to manage rules.
Definition: Bookkeeper.hpp:60
void addMemoryManagerEdge(std::string name, AnyITask *getMemoryTask, std::shared_ptr< IMemoryAllocatorType > allocator, size_t memoryPoolSize, MMType type)
Adds a MemoryManager edge with the specified name to the TaskGraphConf.
Definition: TaskGraphConf.hpp:537
#define DOTGEN_FLAG_SHOW_CURRENT_Q_SZ
Displays the current queue size within each connector.
Definition: TaskGraphDotGenFlags.hpp:86