HTGS  v2.0
The Hybrid Task Graph Scheduler
ExecutionPipeline.hpp
Go to the documentation of this file.
1 
2 // NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the software in any medium, provided that you keep intact this entire notice. You may improve, modify and create derivative works of the software or any portion of the software, and you may copy and distribute such modifications or works. Modified works should carry a notice stating that you changed the software and should note the date and nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the source of the software.
3 // NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE.
4 // You are solely responsible for determining the appropriateness of using and distributing the software and you assume all risks associated with its use, including but not limited to the risks and costs of program errors, compliance with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of operation. This software is not intended to be used in any situation where a failure could cause risk of injury or damage to property. The software developed by NIST employees is not subject to copyright protection within the United States.
5 
17 #ifndef HTGS_EXECUTIONPIPELINE_H
18 #define HTGS_EXECUTIONPIPELINE_H
19 
20 #include <cstring>
21 
23 #include <htgs/api/ITask.hpp>
24 #include <htgs/api/Bookkeeper.hpp>
27 #include <htgs/api/IData.hpp>
28 
29 namespace htgs {
30 
31  class TaskGraphRuntime;
32 
33  template<class V, class W>
34  class IRule;
35 
36  template<class V, class W>
38 
39 
40 
95 template<class T, class U>
96 class ExecutionPipeline : public ITask<T, U> {
97  static_assert(std::is_base_of<IData, T>::value, "T must derive from IData");
98  static_assert(std::is_base_of<IData, U>::value, "U must derive from IData");
99 
100  public:
108  ExecutionPipeline(size_t numPipelines, TaskGraphConf<T, U> *graph, std::string name = "Execution Pipeline", bool waitForInit = true) {
110  this->graph = graph;
111  this->inputBk = new Bookkeeper<T>();
112  this->runtimes = new std::vector<TaskGraphRuntime *>();
113  this->inputRules = std::shared_ptr<IRuleList<T, T>>(new IRuleList<T, T>());
114  this->graphs = new std::vector<TaskGraphConf<T, U> *>();
115  this->waitForInit = waitForInit;
116  this->name = name;
117  }
118 
127  ExecutionPipeline(size_t numPipelines, TaskGraphConf<T, U> *graph, std::shared_ptr<IRuleList<T, T>> rules, std::string name = "Execution Pipeline", bool waitForInit = true) {
129  this->graph = graph;
130  this->inputBk = new Bookkeeper<T>();
131  this->runtimes = new std::vector<TaskGraphRuntime *>();
132  this->inputRules = rules;
133  this->graphs = new std::vector<TaskGraphConf<T, U> *>();
134  this->waitForInit = waitForInit;
135  this->name = name;
136  }
137 
142  for (TaskGraphRuntime *runtime : *runtimes) {
143  HTGS_DEBUG_VERBOSE("Execution pipeline: " << this << " Freeing memory for runtime: " << runtime);
144  delete runtime;
145  runtime = nullptr;
146  }
147 
148 
149  HTGS_DEBUG_VERBOSE("Execution pipeline: " << this << " Freeing memory for graph: " << graph);
150  delete graph;
151  graph = nullptr;
152 
153  delete runtimes;
154  runtimes = nullptr;
155 
156  delete inputBk;
157  inputBk = nullptr;
158 
159  delete graphs;
160  graphs = nullptr;
161 
162 // delete inputRules;
163 // inputRules = nullptr;
164  }
165 
166  size_t getNumGraphsSpawned() override {
167  return this->numPipelinesExec * graph->getNumberOfSubGraphs() + this->numPipelinesExec;
168 // (graph->getNumberOfSubGraphs() == 0 ? 1 : graph->getNumberOfSubGraphs());
169  }
170 
177  void addInputRule(IRule<T, T> *rule) {
178  this->inputRules->push_back(std::shared_ptr<IRule<T, T>>(rule));
179  }
180 
188  void addInputRule(std::shared_ptr<IRule<T, T>> rule) {
189  this->inputRules->push_back(rule);
190  }
191 
199  void initialize() {
200  HTGS_DEBUG("Initializing Execution pipeline with " << this->numPipelinesExec << " pipelines");
201 
202  // Add a default broadcast rule if the pipeline has no rules
203  if (this->inputRules->size() == 0) {
204  std::cerr
205  << "ERROR: Your execution pipeline does not have any decomposition rules. You must add at least one: addInputRule(IRule)"
206  << std::endl;
207  exit(-1);
208 // this->addInputRule(new ExecutionPipelineBroadcastRule<T>());
209  }
210 
211  // Create output connector using the execution pipelines output
212  std::shared_ptr<Connector<U>>
213  outputConnector = std::static_pointer_cast<Connector<U>>(this->getOwnerTaskManager()->getOutputConnector());
214 
215  for (size_t i = 0; i < numPipelinesExec; i++) {
216  HTGS_DEBUG("Adding pipeline " << i);
218  *graphCopy = this->graph->copy(i, this->numPipelinesExec, nullptr, outputConnector, this->getAddress());
219  // TODO: Remove or Add #ifdef this->getTaskGraphCommunicator());
220 
221 
222 #ifdef WS_PROFILE
223  // TODO: Update parent for graphCopy to point to the execution pipeline
224 #endif
225  HTGS_DEBUG("Setting up input and output of pipeline " << i);
226 
227  for (std::shared_ptr<IRule<T, T>> rule : *this->inputRules) {
228 
229  RuleManager<T, T> *ruleManager = new RuleManager<T, T>(rule /* TODO: Remove or Add #ifdef , this->getTaskGraphCommunicator()*/);
230  ruleManager->setOutputConnector(graphCopy->getInputConnector());
231  ruleManager->initialize(i, this->numPipelinesExec, this->getAddress());
232 
233  this->inputBk->addRuleManager(ruleManager);
234  }
235 
236  graphs->push_back(graphCopy);
237 
238  }
239 
240  for (TaskGraphConf<T, U> *g : *graphs) {
241  TaskGraphRuntime *runtime = new TaskGraphRuntime(g);
242  runtime->executeRuntime();
243  this->runtimes->push_back(runtime);
244  }
245 
246  if (waitForInit) {
247  for (TaskGraphConf <T, U> *g : *graphs) {
248  g->waitForInitialization();
249  }
250  }
251  }
252 
257  void shutdown() {
258  HTGS_DEBUG("Shutting down " << this->getName());
259  this->inputBk->shutdown();
260 
261  // Spawn thread for each runtime to properly wait without blocking.
262  std::vector<std::thread *> shutdownThreads;
263 
264  for (size_t i = 0; i < runtimes->size(); i++)
265  {
266  std::thread *t = new std::thread(&ExecutionPipeline<T, U>::shutdownParallel, this, i);
267  shutdownThreads.push_back(t);
268  }
269 
270  for (std::thread *t : shutdownThreads)
271  {
272  if (t->joinable())
273  t->join();
274 
275  delete t;
276  }
277 
278 //
279 // for (std::vector<TaskGraphRuntime *>::reverse_iterator r = runtimes->rbegin();
280 // r != runtimes->rend(); ++r ) {
281 // (*r)->waitForRuntime();
282 // }
283 
284 // for (TaskGraphRuntime *r : *this->runtimes) {
285 // r->waitForRuntime();
286 // }
287  }
288 
293  void shutdownParallel(int id) {
294  (*runtimes)[id]->waitForRuntime();
295  }
296 
303  void executeTask(std::shared_ptr<T> data) {
304  if (data != nullptr) {
305  this->inputBk->executeTask(data);
306  }
307  }
308 
313  std::string getName() {
314  return name;
315  }
316 
324  return new ExecutionPipeline<T, U>(this->numPipelinesExec,
325  this->graph->copy(this->getPipelineId(), this->getNumPipelines()),
326  this->inputRules, this->name, this->waitForInit);
327  }
328 
333  void debug() {
334  HTGS_DEBUG(this->getName() << " " << numPipelinesExec << " pipelines; details:");
335  inputBk->debug();
336  }
337 
338  virtual void gatherProfileData(std::map<AnyTaskManager *, TaskManagerProfile *> *taskManagerProfiles) override {
339  // Gather profile data for each graph
340  if (graphs->size() > 0) {
341  for (auto g : *graphs) {
342  g->gatherProfilingData(taskManagerProfiles);
343  }
344  } else {
345  graph->gatherProfilingData(taskManagerProfiles);
346  }
347 
348  }
349 
350  void printProfile() override {
351  for (auto g : *graphs) {
352  g->printProfile();
353  }
354  }
355 
356 //#ifdef PROFILE
357 // std::string getDotProfile(int flags,
358 // std::unordered_map<std::string, double> *mmap, double val,
359 // std::string desc, std::unordered_map<std::string, std::string> *colorMap)
360 // {
361 // std::ostringstream oss;
362 //
363 // if (graphs->size() > 0)
364 // {
365 // for (auto g : *graphs)
366 // {
367 // oss << g->genProfileGraph(flags, mmap, desc, colorMap);
368 // }
369 // }
370 // else
371 // {
372 // oss << graph->genProfileGraph(flags, mmap, desc, colorMap);
373 // }
374 //
375 // return oss.str();
376 // }
377 //
378 //#endif
379 
380  std::string genCustomDot(ProfileUtils *profileUtils, int colorFlag) override {
381  if (profileUtils == nullptr)
382  return "";
383 
384  std::ostringstream oss;
385  int pipeline = 0;
386  for (auto g : *graphs) {
387  double time = 0.0;
388  if (colorFlag == DOTGEN_COLOR_COMP_TIME)
389  time = (double) g->getGraphComputeTime();
390  else if (colorFlag == DOTGEN_COLOR_WAIT_TIME)
391  time = (double) this->getOwnerTaskManager()->getWaitTime();
392  else if (colorFlag == DOTGEN_COLOR_MAX_Q_SZ)
393  time = (double) this->getOwnerTaskManager()->getMaxQueueSize();
394 
395  oss << "subgraph cluster_" << this->getDotId() << std::to_string(pipeline) << " {" << std::endl;
396 
397  std::string color = profileUtils->getColorForTime(time);
398 
399  oss << (colorFlag != 0 ? "penwidth=5\ncolor=\"" + color + "\"" : "color=orange");
400  oss << std::endl;
401  pipeline++;
402 
403  oss << g->genCustomDotForTasks(profileUtils, colorFlag);
404 
405  oss << "}" << std::endl;
406  }
407 
408  return oss.str();
409  }
410 
411  virtual std::string genDotProducerEdgeToTask(std::map<std::shared_ptr<AnyConnector>, AnyITask *> &inputConnectorDotMap, int dotFlags) override
412  {
413  std::ostringstream oss;
414 
415 // oss << this->getOwnerTaskManager()->getInputConnector()->genDot(dotFlags);
416 
417  std::string inputRuleNames;
418  int count = 0;
419  for (std::shared_ptr<IRule<T, T>> rule : *inputRules) {
420  if (count == 0)
421  inputRuleNames = inputRuleNames + rule->getName();
422  else
423  inputRuleNames = inputRuleNames + ", " + rule->getName();
424  count++;
425  }
426 
427  if (graphs->size() > 0)
428  {
429  for (auto g : *graphs)
430  {
431  auto graphConsumer = g->getGraphConsumerEdge();
432 
433  oss << inputBk->getDotId()
434  << " -> " << graphConsumer->getTaskManager(g)->getTaskFunction()->getConsumerDotIds() << "[label=\"" << inputRuleNames
435  << "\"];" << std::endl;
436  }
437  } else {
438  auto graphConsumer = graph->getGraphConsumerEdge();
439 
440 
441  oss << inputBk->getDotId() << " -> "
442  << graphConsumer->getTaskManager(graph)->getTaskFunction()->getConsumerDotIds() << "[label=\"" << inputRuleNames
443  << "\"];"<< std::endl;
444 
445 
446  }
447 
448  return oss.str();
449 
450  }
451 
452  virtual std::string genDotProducerEdgeFromConnector(std::shared_ptr<AnyConnector> connector, int flags)
453  {
454  return "";
455  }
456 
457  virtual std::string genDotConsumerEdgeFromConnector(std::shared_ptr<AnyConnector> connector, int flags) override
458  {
459 
460  if (this->getOwnerTaskManager()->getInputConnector() != nullptr && connector != nullptr &&
461  this->getOwnerTaskManager()->getInputConnector() == connector) {
462  return connector->getDotId() + " -> " + inputBk->getDotId() + ";\n";
463  }
464  return "";
465  }
466 
467  std::string getConsumerDotIds() override {
468 
469  std::ostringstream oss;
470 
471  // Get inputRule edge name
472  std::string inputRuleNames;
473  int count = 0;
474  for (std::shared_ptr<IRule<T, T>> rule : *inputRules) {
475  if (count == 0)
476  inputRuleNames = inputRuleNames + rule->getName();
477  else
478  inputRuleNames = inputRuleNames + ", " + rule->getName();
479  count++;
480  }
481 
482  oss << inputBk->getDotId();
483 
484 // if (this->graphs->size() > 0)
485 // {
486 // oss << "{";
487 // for (auto g : *this->graphs) {
488 // oss << g->getGraphConsumerTaskManager()->getTaskFunction()->getConsumerDotIds() << ";";
489 // }
490 // oss << "} ";
491 // } else {
492 // oss << graph->getGraphConsumerTaskManager()->getTaskFunction()->getConsumerDotIds();
493 // }
494 
495  oss << " [label=\"" << inputRuleNames
496  << "\"]" << std::endl;
497 
498  return oss.str();
499  }
500 
501 // std::string genDotConsumerEdgeFromConnector(std::shared_ptr<AnyConnector> connector, int flags) override
502 // {
503 // return "";
504 // }
505 //
506 //
507 // std::string genDotProducerEdgeToTask(std::map<std::shared_ptr<AnyConnector>, AnyITask *> &inputConnectorDotMap, int dotFlags) override
508 // {
509 // std::ostringstream oss;
510 //
511 // oss << this->getOwnerTaskManager()->getInputConnector()->genDot(dotFlags);
512 //
513 // std::string inputRuleNames;
514 // int count = 0;
515 // for (std::shared_ptr<IRule<T, T>> rule : *inputRules) {
516 // if (count == 0)
517 // inputRuleNames = inputRuleNames + rule->getName();
518 // else
519 // inputRuleNames = inputRuleNames + ", " + rule->getName();
520 // count++;
521 // }
522 //
523 // if (graphs->size() > 0)
524 // {
525 // for (auto g : *graphs)
526 // {
527 // oss << this->getOwnerTaskManager()->getInputConnector()->getDotId()
528 // << " -> " << g->getGraphConsumerTaskManager()->getTaskFunction()->getDotId() << "[label=\"" << inputRuleNames
529 // << "\"];" << std::endl;
530 // }
531 // } else {
532 // oss << this->getOwnerTaskManager()->getInputConnector()->getDotId() << " -> "
533 // << graph->getGraphConsumerTaskManager()->getTaskFunction()->getDotId() << "[label=\"" << inputRuleNames
534 // << "\"];"<< std::endl;
535 // }
536 //
546 // return oss.str();
547 // }
548 
555  std::string genDot(int flags,
556  std::string dotId,
557  std::shared_ptr<AnyConnector> input,
558  std::shared_ptr<AnyConnector> output) override {
559  std::ostringstream oss;
560 
561  std::string aliveLabel =
562  ((flags & DOTGEN_FLAG_SHOW_TASK_LIVING_STATUS) != 0) ? ("\\nLiving threads: " + std::to_string(this->getOwnerTaskManager()->getThreadsRemaining())) : "";
563  std::string inOutLabel =
564  (((flags & DOTGEN_FLAG_SHOW_IN_OUT_TYPES) != 0) ? ("\\nin: " + this->inTypeName() + "\\nout: "
565  + this->outTypeName()) : "");
566  oss << inputBk->getDotId() + "[label=\"" + "Execution Pipeline Bookkeeper";
567  oss << inOutLabel + "\\n";
568  oss << aliveLabel;
569  oss << "\",shape=" + this->getDotShape();
570  oss << ",style=filled";
571  oss << ",fillcolor=lightgrey";
572  oss << ",color=orange";
573  oss << ",width=.2,height=.2];\n";
574 
575 
576  if ((flags & DOTGEN_FLAG_SHOW_CONNECTORS) != 0 || (flags & DOTGEN_FLAG_SHOW_CONNECTOR_VERBOSE) != 0) {
577  oss << input->genDot(flags);
578  }
579  // Get inputRule edge name
580  std::string inputRuleNames;
581  int count = 0;
582  for (std::shared_ptr<IRule<T, T>> rule : *inputRules) {
583  if (count == 0)
584  inputRuleNames = inputRuleNames + rule->getName();
585  else
586  inputRuleNames = inputRuleNames + ", " + rule->getName();
587  count++;
588  }
589 
590  if ((flags & DOTGEN_FLAG_SHOW_CONNECTORS) != 0 || (flags & DOTGEN_FLAG_SHOW_CONNECTOR_VERBOSE) != 0) {
591  // Draw decomposition rule edge to each graph
592  oss << input->getDotId() << " -> " << inputBk->getDotId() << ";" << std::endl;
593  if (graphs->size() > 0) {
594  for (auto g : *graphs) {
595 
596  oss << inputBk->getDotId() << " -> " << g->getInputConnector()->getDotId() << "[label=\"" << inputRuleNames
597  << "\"];" << std::endl;
598  }
599  } else {
600  oss << inputBk->getDotId() << " -> " << graph->getInputConnector()->getDotId() << "[label=\"" << inputRuleNames
601  << "\"];" << std::endl;
602  }
603  }
604 
605  // Setup subgraph to draw execPipeline graphs
606  if (this->graphs->size() > 0) {
607  int pipeline = 0;
608  for (auto g : *graphs) {
609  std::string computeTimeStr = g->getGraphComputeTime() == 0 ? "" : "Compute time: " + std::to_string((double)g->getGraphComputeTime() / 1000000.0) + " s\\n";
610  std::string createTimeStr = g->getGraphCreationTime() == 0 ? "" : "Creation time: " + std::to_string((double)g->getGraphCreationTime() / 1000000.0) + " s\\n";
611 
612  oss << "subgraph cluster_" << dotId << std::to_string(pipeline) << " {" << std::endl;
613  oss << "label=\"" << getName() << std::to_string(pipeline) << "\\n" << computeTimeStr << createTimeStr << "\";" << std::endl;
614  oss << "style=\"dashed\";" << std::endl;
615  oss << "style =\"filled\";" << std::endl;
616  oss << "fillcolor=lightgrey;" << std::endl;
617  oss << "color=orange;" << std::endl;
618  oss << g->genDotGraphContent(flags);
619  oss << "}" << std::endl;
620  pipeline++;
621 
622  std::string clean = cleanupVisualization(g, oss.str());
623 
624  oss.str("");
625  oss.clear();
626  oss << clean;
627  }
628  } else {
629  oss << "subgraph cluster_" << dotId << " {" << std::endl;
630  oss << "label=\"" << getName() << " x" << this->numPipelinesExec << "\";" << std::endl;
631  oss << "style=\"dashed\";" << std::endl;
632  oss << "style =\"filled\";" << std::endl;
633  oss << "fillcolor=lightgrey;" << std::endl;
634  oss << "color=orange;" << std::endl;
635 
636  graph->setOutputConnector(output);
637  oss << graph->genDotGraphContent(flags);
638  oss << "}" << std::endl;
639 
640  std::string clean = cleanupVisualization(graph, oss.str());
641 
642  oss.str("");
643  oss.clear();
644  oss << clean;
645  }
646 
647  return oss.str();
648 
649  }
650 
651  private:
652 
659  std::string cleanupVisualization(TaskGraphConf<T, U> *graph, std::string str)
660  {
661  std::istringstream iss(str);
662 
663 
664  std::ostringstream ossFinal;
665 
666  auto outputConnectorName = graph->getOutputConnector()->getDotId();
667 
668  std::string line;
669  std::vector<std::string> savedLines;
670  while(getline(iss, line))
671  {
672  if (line.find(outputConnectorName) == std::string::npos)
673  {
674  ossFinal << line << std::endl;
675  }
676  else
677  {
678  savedLines.push_back(line);
679  }
680  }
681 
682  for(std::string line2 : savedLines)
683  {
684  ossFinal << line2 << std::endl;
685  }
686 
687  return ossFinal.str();
688  }
689 
693  std::shared_ptr<IRuleList<T, T>> inputRules;
694  std::vector<TaskGraphRuntime *>
696  std::vector<TaskGraphConf<T, U> *> *graphs;
697  bool waitForInit;
698  std::string name;
699 };
700 }
701 
702 #endif //HTGS_EXECUTIONPIPELINE_H
Implements the task graph configuration class responsible for managing ITask connections.
std::string name
The name given to the execution pipeline task.
Definition: ExecutionPipeline.hpp:698
Implements the parent ITask, which removes the template arguments of an ITask.
Definition: AnyITask.hpp:48
std::string getDotId()
Gets the id used for dot nodes.
Definition: AnyITask.hpp:497
void shutdownParallel(int id)
Waits for all runtimes in parallel to update overall runtime of each graph.
Definition: ExecutionPipeline.hpp:293
std::string genDot(int flags, std::string dotId, std::shared_ptr< AnyConnector > input, std::shared_ptr< AnyConnector > output) override
Virtual function that generates the input/output and per-task dot notation.
Definition: ExecutionPipeline.hpp:555
#define DOTGEN_COLOR_WAIT_TIME
Creates color map using wait time.
Definition: TaskGraphDotGenFlags.hpp:68
virtual std::string getDotShape() override
Gets the shape for graphviz dot.
Definition: ITask.hpp:264
void debug()
Provides debugging output for the execution pipeline.
Definition: ExecutionPipeline.hpp:333
size_t numPipelinesExec
The number of pipelines that will spawn from the ExecutionPipeline.
Definition: ExecutionPipeline.hpp:690
void initialize(size_t pipelineId, size_t numPipelines, std::string address) override
Initializes the RuleManager.
Definition: RuleManager.hpp:126
#define DOTGEN_COLOR_COMP_TIME
Creates color map using compute time.
Definition: TaskGraphDotGenFlags.hpp:56
TaskGraphConf< T, U > * graph
The graph that the ExecutionPipeline manages, duplicates, and executes.
Definition: ExecutionPipeline.hpp:692
#define DOTGEN_FLAG_SHOW_TASK_LIVING_STATUS
Shows the number of threads that are alive running the task.
Definition: TaskGraphDotGenFlags.hpp:98
Spawns threads and binds them to the appropriate ITask within a TaskGraph.
#define HTGS_DEBUG_VERBOSE(msg)
Prints a debug message to std:cerr with VERBOSE level.
Definition: debug_message.hpp:75
void executeTask(std::shared_ptr< T > data)
Executes the execution pipeline task on data and forwards that data to the input rules.
Definition: ExecutionPipeline.hpp:303
Spawns threads and binds them to the appropriate ITask within a TaskGraph.
Definition: TaskGraphRuntime.hpp:84
void executeRuntime()
Executes the Runtime.
Definition: TaskGraphRuntime.hpp:188
std::shared_ptr< IRuleList< T, T > > inputRules
The rules associated with the input Bookkeeper for decomposition.
Definition: ExecutionPipeline.hpp:693
Implements the default execution pipeline rule that broadcasts data to all pipelines.
size_t getNumPipelines() const
Sets the task graph communicator.
Definition: AnyITask.hpp:404
Manages the input/output of IData between Tasks.
Definition: Connector.hpp:62
Provides an interface to send data along RuleManager edges for processing state and dependencies...
Definition: ExecutionPipeline.hpp:34
size_t getPipelineId()
Gets the pipeline ID.
Definition: AnyITask.hpp:367
std::string getName()
Gets the name for the execution pipeline.
Definition: ExecutionPipeline.hpp:313
Implements the Bookkeeper class.
void addInputRule(std::shared_ptr< IRule< T, T >> rule)
Adds an input rule, which can be used for domain decomposition.
Definition: ExecutionPipeline.hpp:188
void initialize()
Initializes the execution pipeline and duplicates the task graph based on the number of pipelines...
Definition: ExecutionPipeline.hpp:199
void setOutputConnector(std::shared_ptr< AnyConnector > connector) override
Sets the output connector that the RuleManager is attached to.
Definition: RuleManager.hpp:157
std::vector< TaskGraphRuntime * > * runtimes
The list of Runtimes that will execute the TaskGraphs (one for each duplicate TaskGraph) ...
Definition: ExecutionPipeline.hpp:695
ExecutionPipeline(size_t numPipelines, TaskGraphConf< T, U > *graph, std::shared_ptr< IRuleList< T, T >> rules, std::string name="Execution Pipeline", bool waitForInit=true)
Creates an execution pipeline, which encapsulates a graph and duplicates it numPipelines times...
Definition: ExecutionPipeline.hpp:127
std::string outTypeName() override final
Gets the demangled output type name of the connector.
Definition: ITask.hpp:493
size_t numPipelines
The number of pipelines that exist for this task.
Definition: AnyITask.hpp:605
Implements the IData class, which is used for all data types entering/leaving a task graph...
std::string getAddress() override final
Gets the address from the owner task, which is the address of the task graph.
Definition: ITask.hpp:511
std::shared_ptr< AnyConnector > getOutputConnector() override
Virtual function that gets the connector used for graph output.
Definition: TaskGraphConf.hpp:590
#define HTGS_DEBUG(msg)
Prints a debug message to std::cerr with standard level If DEBUG_FLAG is not defined, this equates to a no op Each message includes the file and line number for where the debug is called.
Definition: debug_message.hpp:65
std::vector< TaskGraphConf< T, U > * > * graphs
The list of duplicate TaskGraphs.
Definition: ExecutionPipeline.hpp:696
Bookkeeper< T > * inputBk
The input Bookkeeper for the ExecutionPipeline.
Definition: ExecutionPipeline.hpp:691
std::string cleanupVisualization(TaskGraphConf< T, U > *graph, std::string str)
Moves the output connector outside of the execution pipeline graphs to cleanup how the graph looks du...
Definition: ExecutionPipeline.hpp:659
TaskManager< T, U > * getOwnerTaskManager()
Gets the owner task manager for this ITask.
Definition: ITask.hpp:527
An interface to process input data and forward results within a TaskGraph.
Definition: ITask.hpp:165
Connects a Bookkeeper to another ITask using one or more IRule(s).
Definition: RuleManager.hpp:57
Manages a group of connected ITasks and their connections.
Definition: ExecutionPipeline.hpp:37
ITask< T, U > * copy()
Makes a copy of the execution pipeline.
Definition: ExecutionPipeline.hpp:323
#define DOTGEN_FLAG_SHOW_IN_OUT_TYPES
Shows input and output types for all tasks.
Definition: TaskGraphDotGenFlags.hpp:32
bool waitForInit
Flag whether to wait for initialization of sub-graphs to complete or not.
Definition: ExecutionPipeline.hpp:697
std::list< std::shared_ptr< IRule< T, U > >> IRuleList
Stores a list of rules with the specified types.
Definition: Types.hpp:36
#define DOTGEN_COLOR_MAX_Q_SZ
Creates color map using maximum queue size.
Definition: TaskGraphDotGenFlags.hpp:62
std::shared_ptr< AnyConnector > getInputConnector() override
Pure virtual function that gets the task manager that is consuming data from the graph&#39;s input...
Definition: TaskGraphConf.hpp:586
void addInputRule(IRule< T, T > *rule)
Adds an input rule, which can be used for domain decomposition.
Definition: ExecutionPipeline.hpp:177
ExecutionPipeline(size_t numPipelines, TaskGraphConf< T, U > *graph, std::string name="Execution Pipeline", bool waitForInit=true)
Creates an execution pipeline, which encapsulates a graph and duplicates it numPipelines times...
Definition: ExecutionPipeline.hpp:108
void printProfile() override
Prints the profile data to std::out.
Definition: ExecutionPipeline.hpp:350
std::string getColorForTime(double time)
Gets the color for a given time relative to the entire graph&#39;s execution time.
Definition: ProfileUtils.hpp:23
size_t getNumGraphsSpawned() override
Gets the number of graphs spawned by this ITask.
Definition: ExecutionPipeline.hpp:166
virtual void gatherProfileData(std::map< AnyTaskManager *, TaskManagerProfile *> *taskManagerProfiles) override
Gathers profile data.
Definition: ExecutionPipeline.hpp:338
The ExecutionPipeline class is used to duplicate task graphs, such that each duplicate executes concu...
Definition: ExecutionPipeline.hpp:96
void shutdown()
Shuts down the execution pipeline.
Definition: ExecutionPipeline.hpp:257
std::string genCustomDot(ProfileUtils *profileUtils, int colorFlag) override
Virtual function to generate customized dot file.
Definition: ExecutionPipeline.hpp:380
~ExecutionPipeline()
Destructor that frees all memory allocated by the execution pipeline.
Definition: ExecutionPipeline.hpp:141
std::string inTypeName() override final
Gets the demangled input type name of the connector.
Definition: ITask.hpp:475
An interface to process input data and forward results within a TaskGraph.
Definition: ProfileUtils.hpp:13
Definition: Bookkeeper.hpp:23
#define DOTGEN_FLAG_SHOW_CONNECTOR_VERBOSE
Shows verbose information within each connector in the graph.
Definition: TaskGraphDotGenFlags.hpp:92
Special task used to manage rules.
Definition: Bookkeeper.hpp:60