17 #ifndef HTGS_EXECUTIONPIPELINE_H 18 #define HTGS_EXECUTIONPIPELINE_H 31 class TaskGraphRuntime;
33 template<
class V,
class W>
36 template<
class V,
class W>
95 template<
class T,
class U>
97 static_assert(std::is_base_of<IData, T>::value,
"T must derive from IData");
98 static_assert(std::is_base_of<IData, U>::value,
"U must derive from IData");
112 this->
runtimes =
new std::vector<TaskGraphRuntime *>();
114 this->
graphs =
new std::vector<TaskGraphConf<T, U> *>();
131 this->
runtimes =
new std::vector<TaskGraphRuntime *>();
133 this->
graphs =
new std::vector<TaskGraphConf<T, U> *>();
143 HTGS_DEBUG_VERBOSE(
"Execution pipeline: " <<
this <<
" Freeing memory for runtime: " << runtime);
205 <<
"ERROR: Your execution pipeline does not have any decomposition rules. You must add at least one: addInputRule(IRule)" 212 std::shared_ptr<Connector<U>>
218 *graphCopy = this->
graph->copy(i, this->numPipelinesExec,
nullptr, outputConnector, this->
getAddress());
225 HTGS_DEBUG(
"Setting up input and output of pipeline " << i);
233 this->
inputBk->addRuleManager(ruleManager);
236 graphs->push_back(graphCopy);
248 g->waitForInitialization();
262 std::vector<std::thread *> shutdownThreads;
264 for (
size_t i = 0; i <
runtimes->size(); i++)
267 shutdownThreads.push_back(t);
270 for (std::thread *t : shutdownThreads)
294 (*runtimes)[id]->waitForRuntime();
304 if (data !=
nullptr) {
305 this->
inputBk->executeTask(data);
338 virtual void gatherProfileData(std::map<AnyTaskManager *, TaskManagerProfile *> *taskManagerProfiles)
override {
342 g->gatherProfilingData(taskManagerProfiles);
345 graph->gatherProfilingData(taskManagerProfiles);
381 if (profileUtils ==
nullptr)
384 std::ostringstream oss;
389 time = (double) g->getGraphComputeTime();
395 oss <<
"subgraph cluster_" << this->
getDotId() << std::to_string(pipeline) <<
" {" << std::endl;
399 oss << (colorFlag != 0 ?
"penwidth=5\ncolor=\"" + color +
"\"" :
"color=orange");
403 oss << g->genCustomDotForTasks(profileUtils, colorFlag);
405 oss <<
"}" << std::endl;
411 virtual std::string genDotProducerEdgeToTask(std::map<std::shared_ptr<AnyConnector>,
AnyITask *> &inputConnectorDotMap,
int dotFlags)
override 413 std::ostringstream oss;
417 std::string inputRuleNames;
421 inputRuleNames = inputRuleNames + rule->getName();
423 inputRuleNames = inputRuleNames +
", " + rule->getName();
431 auto graphConsumer = g->getGraphConsumerEdge();
434 <<
" -> " << graphConsumer->getTaskManager(g)->getTaskFunction()->getConsumerDotIds() <<
"[label=\"" << inputRuleNames
435 <<
"\"];" << std::endl;
438 auto graphConsumer =
graph->getGraphConsumerEdge();
441 oss <<
inputBk->getDotId() <<
" -> " 442 << graphConsumer->getTaskManager(
graph)->getTaskFunction()->getConsumerDotIds() <<
"[label=\"" << inputRuleNames
443 <<
"\"];"<< std::endl;
452 virtual std::string genDotProducerEdgeFromConnector(std::shared_ptr<AnyConnector> connector,
int flags)
457 virtual std::string genDotConsumerEdgeFromConnector(std::shared_ptr<AnyConnector> connector,
int flags)
override 462 return connector->getDotId() +
" -> " +
inputBk->getDotId() +
";\n";
467 std::string getConsumerDotIds()
override {
469 std::ostringstream oss;
472 std::string inputRuleNames;
476 inputRuleNames = inputRuleNames + rule->getName();
478 inputRuleNames = inputRuleNames +
", " + rule->getName();
495 oss <<
" [label=\"" << inputRuleNames
496 <<
"\"]" << std::endl;
557 std::shared_ptr<AnyConnector> input,
558 std::shared_ptr<AnyConnector> output)
override {
559 std::ostringstream oss;
561 std::string aliveLabel =
563 std::string inOutLabel =
566 oss <<
inputBk->getDotId() +
"[label=\"" +
"Execution Pipeline Bookkeeper";
567 oss << inOutLabel +
"\\n";
570 oss <<
",style=filled";
571 oss <<
",fillcolor=lightgrey";
572 oss <<
",color=orange";
573 oss <<
",width=.2,height=.2];\n";
577 oss << input->genDot(flags);
580 std::string inputRuleNames;
584 inputRuleNames = inputRuleNames + rule->getName();
586 inputRuleNames = inputRuleNames +
", " + rule->getName();
590 if ((flags & DOTGEN_FLAG_SHOW_CONNECTORS) != 0 || (flags & DOTGEN_FLAG_SHOW_CONNECTOR_VERBOSE) != 0) {
592 oss << input->getDotId() <<
" -> " <<
inputBk->getDotId() <<
";" << std::endl;
596 oss <<
inputBk->getDotId() <<
" -> " << g->getInputConnector()->getDotId() <<
"[label=\"" << inputRuleNames
597 <<
"\"];" << std::endl;
600 oss <<
inputBk->getDotId() <<
" -> " <<
graph->getInputConnector()->getDotId() <<
"[label=\"" << inputRuleNames
601 <<
"\"];" << std::endl;
606 if (this->
graphs->size() > 0) {
609 std::string computeTimeStr = g->getGraphComputeTime() == 0 ?
"" :
"Compute time: " + std::to_string((
double)g->getGraphComputeTime() / 1000000.0) +
" s\\n";
610 std::string createTimeStr = g->getGraphCreationTime() == 0 ?
"" :
"Creation time: " + std::to_string((
double)g->getGraphCreationTime() / 1000000.0) +
" s\\n";
612 oss <<
"subgraph cluster_" << dotId << std::to_string(pipeline) <<
" {" << std::endl;
613 oss <<
"label=\"" <<
getName() << std::to_string(pipeline) <<
"\\n" << computeTimeStr << createTimeStr <<
"\";" << std::endl;
614 oss <<
"style=\"dashed\";" << std::endl;
615 oss <<
"style =\"filled\";" << std::endl;
616 oss <<
"fillcolor=lightgrey;" << std::endl;
617 oss <<
"color=orange;" << std::endl;
618 oss << g->genDotGraphContent(flags);
619 oss <<
"}" << std::endl;
629 oss <<
"subgraph cluster_" << dotId <<
" {" << std::endl;
631 oss <<
"style=\"dashed\";" << std::endl;
632 oss <<
"style =\"filled\";" << std::endl;
633 oss <<
"fillcolor=lightgrey;" << std::endl;
634 oss <<
"color=orange;" << std::endl;
636 graph->setOutputConnector(output);
637 oss <<
graph->genDotGraphContent(flags);
638 oss <<
"}" << std::endl;
661 std::istringstream iss(str);
664 std::ostringstream ossFinal;
669 std::vector<std::string> savedLines;
670 while(getline(iss, line))
672 if (line.find(outputConnectorName) == std::string::npos)
674 ossFinal << line << std::endl;
678 savedLines.push_back(line);
682 for(std::string line2 : savedLines)
684 ossFinal << line2 << std::endl;
687 return ossFinal.str();
694 std::vector<TaskGraphRuntime *>
696 std::vector<TaskGraphConf<T, U> *> *
graphs;
702 #endif //HTGS_EXECUTIONPIPELINE_H Implements the task graph configuration class responsible for managing ITask connections.
std::string name
The name given to the execution pipeline task.
Definition: ExecutionPipeline.hpp:698
Implements the parent ITask, which removes the template arguments of an ITask.
Definition: AnyITask.hpp:48
std::string getDotId()
Gets the id used for dot nodes.
Definition: AnyITask.hpp:497
void shutdownParallel(int id)
Waits for all runtimes in parallel to update overall runtime of each graph.
Definition: ExecutionPipeline.hpp:293
std::string genDot(int flags, std::string dotId, std::shared_ptr< AnyConnector > input, std::shared_ptr< AnyConnector > output) override
Virtual function that generates the input/output and per-task dot notation.
Definition: ExecutionPipeline.hpp:555
#define DOTGEN_COLOR_WAIT_TIME
Creates color map using wait time.
Definition: TaskGraphDotGenFlags.hpp:68
virtual std::string getDotShape() override
Gets the shape for graphviz dot.
Definition: ITask.hpp:264
void debug()
Provides debugging output for the execution pipeline.
Definition: ExecutionPipeline.hpp:333
size_t numPipelinesExec
The number of pipelines that will spawn from the ExecutionPipeline.
Definition: ExecutionPipeline.hpp:690
void initialize(size_t pipelineId, size_t numPipelines, std::string address) override
Initializes the RuleManager.
Definition: RuleManager.hpp:126
#define DOTGEN_COLOR_COMP_TIME
Creates color map using compute time.
Definition: TaskGraphDotGenFlags.hpp:56
TaskGraphConf< T, U > * graph
The graph that the ExecutionPipeline manages, duplicates, and executes.
Definition: ExecutionPipeline.hpp:692
#define DOTGEN_FLAG_SHOW_TASK_LIVING_STATUS
Shows the number of threads that are alive running the task.
Definition: TaskGraphDotGenFlags.hpp:98
Spawns threads and binds them to the appropriate ITask within a TaskGraph.
#define HTGS_DEBUG_VERBOSE(msg)
Prints a debug message to std:cerr with VERBOSE level.
Definition: debug_message.hpp:75
void executeTask(std::shared_ptr< T > data)
Executes the execution pipeline task on data and forwards that data to the input rules.
Definition: ExecutionPipeline.hpp:303
Spawns threads and binds them to the appropriate ITask within a TaskGraph.
Definition: TaskGraphRuntime.hpp:84
void executeRuntime()
Executes the Runtime.
Definition: TaskGraphRuntime.hpp:188
std::shared_ptr< IRuleList< T, T > > inputRules
The rules associated with the input Bookkeeper for decomposition.
Definition: ExecutionPipeline.hpp:693
Implements the default execution pipeline rule that broadcasts data to all pipelines.
size_t getNumPipelines() const
Sets the task graph communicator.
Definition: AnyITask.hpp:404
Manages the input/output of IData between Tasks.
Definition: Connector.hpp:62
Provides an interface to send data along RuleManager edges for processing state and dependencies...
Definition: ExecutionPipeline.hpp:34
size_t getPipelineId()
Gets the pipeline ID.
Definition: AnyITask.hpp:367
std::string getName()
Gets the name for the execution pipeline.
Definition: ExecutionPipeline.hpp:313
Implements the Bookkeeper class.
void addInputRule(std::shared_ptr< IRule< T, T >> rule)
Adds an input rule, which can be used for domain decomposition.
Definition: ExecutionPipeline.hpp:188
void initialize()
Initializes the execution pipeline and duplicates the task graph based on the number of pipelines...
Definition: ExecutionPipeline.hpp:199
void setOutputConnector(std::shared_ptr< AnyConnector > connector) override
Sets the output connector that the RuleManager is attached to.
Definition: RuleManager.hpp:157
std::vector< TaskGraphRuntime * > * runtimes
The list of Runtimes that will execute the TaskGraphs (one for each duplicate TaskGraph) ...
Definition: ExecutionPipeline.hpp:695
ExecutionPipeline(size_t numPipelines, TaskGraphConf< T, U > *graph, std::shared_ptr< IRuleList< T, T >> rules, std::string name="Execution Pipeline", bool waitForInit=true)
Creates an execution pipeline, which encapsulates a graph and duplicates it numPipelines times...
Definition: ExecutionPipeline.hpp:127
std::string outTypeName() override final
Gets the demangled output type name of the connector.
Definition: ITask.hpp:493
size_t numPipelines
The number of pipelines that exist for this task.
Definition: AnyITask.hpp:605
Implements the IData class, which is used for all data types entering/leaving a task graph...
std::string getAddress() override final
Gets the address from the owner task, which is the address of the task graph.
Definition: ITask.hpp:511
std::shared_ptr< AnyConnector > getOutputConnector() override
Virtual function that gets the connector used for graph output.
Definition: TaskGraphConf.hpp:590
#define HTGS_DEBUG(msg)
Prints a debug message to std::cerr with standard level If DEBUG_FLAG is not defined, this equates to a no op Each message includes the file and line number for where the debug is called.
Definition: debug_message.hpp:65
std::vector< TaskGraphConf< T, U > * > * graphs
The list of duplicate TaskGraphs.
Definition: ExecutionPipeline.hpp:696
Bookkeeper< T > * inputBk
The input Bookkeeper for the ExecutionPipeline.
Definition: ExecutionPipeline.hpp:691
std::string cleanupVisualization(TaskGraphConf< T, U > *graph, std::string str)
Moves the output connector outside of the execution pipeline graphs to cleanup how the graph looks du...
Definition: ExecutionPipeline.hpp:659
TaskManager< T, U > * getOwnerTaskManager()
Gets the owner task manager for this ITask.
Definition: ITask.hpp:527
An interface to process input data and forward results within a TaskGraph.
Definition: ITask.hpp:165
Connects a Bookkeeper to another ITask using one or more IRule(s).
Definition: RuleManager.hpp:57
Manages a group of connected ITasks and their connections.
Definition: ExecutionPipeline.hpp:37
ITask< T, U > * copy()
Makes a copy of the execution pipeline.
Definition: ExecutionPipeline.hpp:323
#define DOTGEN_FLAG_SHOW_IN_OUT_TYPES
Shows input and output types for all tasks.
Definition: TaskGraphDotGenFlags.hpp:32
bool waitForInit
Flag whether to wait for initialization of sub-graphs to complete or not.
Definition: ExecutionPipeline.hpp:697
std::list< std::shared_ptr< IRule< T, U > >> IRuleList
Stores a list of rules with the specified types.
Definition: Types.hpp:36
#define DOTGEN_COLOR_MAX_Q_SZ
Creates color map using maximum queue size.
Definition: TaskGraphDotGenFlags.hpp:62
std::shared_ptr< AnyConnector > getInputConnector() override
Pure virtual function that gets the task manager that is consuming data from the graph's input...
Definition: TaskGraphConf.hpp:586
void addInputRule(IRule< T, T > *rule)
Adds an input rule, which can be used for domain decomposition.
Definition: ExecutionPipeline.hpp:177
ExecutionPipeline(size_t numPipelines, TaskGraphConf< T, U > *graph, std::string name="Execution Pipeline", bool waitForInit=true)
Creates an execution pipeline, which encapsulates a graph and duplicates it numPipelines times...
Definition: ExecutionPipeline.hpp:108
void printProfile() override
Prints the profile data to std::out.
Definition: ExecutionPipeline.hpp:350
std::string getColorForTime(double time)
Gets the color for a given time relative to the entire graph's execution time.
Definition: ProfileUtils.hpp:23
size_t getNumGraphsSpawned() override
Gets the number of graphs spawned by this ITask.
Definition: ExecutionPipeline.hpp:166
virtual void gatherProfileData(std::map< AnyTaskManager *, TaskManagerProfile *> *taskManagerProfiles) override
Gathers profile data.
Definition: ExecutionPipeline.hpp:338
The ExecutionPipeline class is used to duplicate task graphs, such that each duplicate executes concu...
Definition: ExecutionPipeline.hpp:96
void shutdown()
Shuts down the execution pipeline.
Definition: ExecutionPipeline.hpp:257
std::string genCustomDot(ProfileUtils *profileUtils, int colorFlag) override
Virtual function to generate customized dot file.
Definition: ExecutionPipeline.hpp:380
~ExecutionPipeline()
Destructor that frees all memory allocated by the execution pipeline.
Definition: ExecutionPipeline.hpp:141
std::string inTypeName() override final
Gets the demangled input type name of the connector.
Definition: ITask.hpp:475
An interface to process input data and forward results within a TaskGraph.
Definition: ProfileUtils.hpp:13
Definition: Bookkeeper.hpp:23
#define DOTGEN_FLAG_SHOW_CONNECTOR_VERBOSE
Shows verbose information within each connector in the graph.
Definition: TaskGraphDotGenFlags.hpp:92
Special task used to manage rules.
Definition: Bookkeeper.hpp:60