HTGS  v2.0
The Hybrid Task Graph Scheduler
TaskGraphRuntime.hpp
Go to the documentation of this file.
1 
2 // NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the software in any medium, provided that you keep intact this entire notice. You may improve, modify and create derivative works of the software or any portion of the software, and you may copy and distribute such modifications or works. Modified works should carry a notice stating that you changed the software and should note the date and nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the source of the software.
3 // NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE.
4 // You are solely responsible for determining the appropriateness of using and distributing the software and you assume all risks associated with its use, including but not limited to the risks and costs of program errors, compliance with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of operation. This software is not intended to be used in any situation where a failure could cause risk of injury or damage to property. The software developed by NIST employees is not subject to copyright protection within the United States.
5 
14 #ifndef HTGS_TASKGRAPHRUNTIME_HPP
15 #define HTGS_TASKGRAPHRUNTIME_HPP
16 
17 #include <thread>
20 
21 namespace htgs {
85 
86  public:
87 
93  this->graph = graph;
94  this->executed = false;
95 #if defined (USE_NVTX) && defined (USE_MINIMAL_NVTX)
96  domainInitialize = nvtxDomainCreateA("Initialize");
97  domainExecute = nvtxDomainCreateA("Execute");
98  domainWait = nvtxDomainCreateA("Wait");
99  domainWaitForMem = nvtxDomainCreateA("Wait for Memory");
100  domainReleaseMem = nvtxDomainCreateA("Release Memory");
101  domainShutdown = nvtxDomainCreateA("Shutdown");
102 #elif defined (USE_NVTX)
103  domainInitialize = nullptr;
104  domainExecute = nullptr;
105  domainWait = nullptr;
106  domainWaitForMem = nullptr;
107  domainReleaseMem = nullptr;
108  domainShutdown = nullptr;
109 #endif
110  }
111 
116  for (TaskManagerThread *t : runtimeThreads) {
117  if (t) {
118  HTGS_DEBUG_VERBOSE("TaskGraphRuntime: " << this << " Freeing memory for TaskManagerThread: " << t);
119 
120  delete t;
121  t = nullptr;
122  }
123  }
124 
125  for (std::thread *t : threads) {
126  if (t) {
127 
128 
129  HTGS_ASSERT(!t->joinable(), "Trying to delete thread that has not been joined. You must call 'waitForRuntime' prior to deleting the TaskGraphRuntime. (see https://pages.nist.gov/HTGS/doxygen/classhtgs_1_1_task_graph_runtime.html#a8f2eaf040695178b6f61db7b0ee16c89)");
130 
131 
132  delete t;
133  t = nullptr;
134  }
135  }
136 
137  if (graph) {
138  HTGS_DEBUG_VERBOSE("TaskGraphRuntime: " << this << " Freeing memory for graph: " << graph);
139 
140  delete graph;
141  graph = nullptr;
142  }
143 #if defined (USE_NVTX) && (USE_MINIMAL_NVTX)
144  nvtxDomainDestroy(domainInitialize);
145  nvtxDomainDestroy(domainExecute);
146  nvtxDomainDestroy(domainWait);
147  nvtxDomainDestroy(domainWaitForMem);
148  nvtxDomainDestroy(domainReleaseMem);
149  nvtxDomainDestroy(domainShutdown);
150 #endif
151  }
152 
157  void waitForRuntime() {
158  for (std::thread *t : threads) {
159  if (t->joinable())
160  t->join();
161  }
162 
163  this->graph->shutdown();
164  }
165 
170  executeRuntime();
171  waitForRuntime();
172  }
173 
179  void terminateAll() {
180  for (TaskManagerThread *t : runtimeThreads) {
181  t->terminate();
182  }
183  }
184 
188  void executeRuntime() {
189  if (executed)
190  return;
191 
192  // Initialize graph and setup task graph taskGraphCommunicator
193  this->graph->initialize();
194 
195  std::list<AnyTaskManager *> *vertices = this->graph->getTaskManagers();
196  std::list<AnyTaskManager *> newVertices;
197  HTGS_DEBUG_VERBOSE("Launching runtime for " << vertices->size() << " vertices");
198 
199 
200  for (AnyTaskManager *task : *vertices) {
201 
202  size_t numThreads = task->getNumThreads();
203 
204  HTGS_DEBUG_VERBOSE("Spawning " << numThreads << " threads for task " << task->getName());
205 
206  if (numThreads > 0) {
207  std::list<AnyTaskManager *> taskList;
208  std::shared_ptr<std::atomic_size_t>
209  atomicNumThreads = std::shared_ptr<std::atomic_size_t>(new std::atomic_size_t(numThreads));
210  taskList.push_back(task);
211 
212  for (size_t i = 1; i < numThreads; i++) {
213  AnyTaskManager *taskCopy = task->copy(true);
214 
215 #ifdef WS_PROFILE
216  // Generate . . . and send data . . .
217  std::shared_ptr<ProfileData> producerData(new CreateNodeProfile(taskCopy->getTaskFunction(), graph, taskCopy->getName()));
218  graph->sendProfileData(producerData);
219 
220  std::shared_ptr<ProfileData> connectorConsumerData(new CreateEdgeProfile(taskCopy->getInputConnector().get(), taskCopy->getTaskFunction(), "", nullptr));
221  std::shared_ptr<ProfileData> producerConnectorData(new CreateEdgeProfile(taskCopy->getTaskFunction(), taskCopy->getOutputConnector().get(), "", nullptr));
222 
223  graph->sendProfileData(connectorConsumerData);
224  graph->sendProfileData(producerConnectorData);
225 #endif
226 
227  // Add communicator to task copy to enable communication
228  // TODO: Delete or Add #ifdef
229 // taskCopy->setTaskGraphCommunicator(graph->getTaskGraphCommunicator());
230  taskList.push_back(taskCopy);
231  newVertices.push_back(taskCopy);
232  }
233  size_t threadId = 0;
234 
235 #if defined (USE_NVTX) && defined (USE_MINIMAL_NVTX)
236  nvtxDomainHandle_t taskDomain = nullptr;
237 #elif defined (USE_NVTX)
238  nvtxDomainHandle_t taskDomain = nvtxDomainCreateA(task->getName().c_str());
239 #endif
240 
241  for (AnyTaskManager *taskItem : taskList) {
242 #if defined (USE_NVTX) && defined (USE_MINIMAL_NVTX)
243  NVTXProfiler *profiler = new NVTXProfiler(std::to_string(threadId) + ":" + taskItem->getName(), taskDomain, domainInitialize, domainExecute, domainWait, domainWaitForMem, domainReleaseMem, domainShutdown);
244  taskItem->setProfiler(profiler);
245 #elif defined (USE_NVTX)
246  NVTXProfiler *profiler = new NVTXProfiler(std::to_string(threadId), taskDomain, domainInitialize, domainExecute, domainWait, domainWaitForMem, domainReleaseMem, domainShutdown);
247  taskItem->setProfiler(profiler);
248 #endif
249 
250 
251 
252  TaskManagerThread *runtimeThread = new TaskManagerThread(threadId, taskItem, atomicNumThreads, graph->getInitializationCondition(), graph->getInitializationMutex());
253  std::thread *thread = new std::thread(&TaskManagerThread::run, runtimeThread);
254  this->threads.push_back(thread);
255  runtimeThreads.push_back(runtimeThread);
256  threadId++;
257  }
258 
259  } else {
260  std::cerr << task->getName() << " has no threads specified." << std::endl;
261  }
262  }
263 
264 
265 #ifdef WS_PROFILE
266  std::shared_ptr<ProfileData> graphCreationComplete(new GraphCompleteProfile(graph));
267  graph->sendProfileData(graphCreationComplete);
268 #endif
269 
270  for (AnyTaskManager *newVertex : newVertices) {
271  graph->addTaskManager(newVertex);
272  }
273 
274 
275  this->executed = true;
276 
277  graph->finishedSetup();
278  }
279 
280  private:
281  std::list<std::thread *> threads;
283  std::list<TaskManagerThread *> runtimeThreads;
284  bool executed;
285 
286 #ifdef USE_NVTX
287  nvtxDomainHandle_t domainInitialize;
288  nvtxDomainHandle_t domainExecute;
289  nvtxDomainHandle_t domainWait;
290  nvtxDomainHandle_t domainWaitForMem;
291  nvtxDomainHandle_t domainReleaseMem;
292  nvtxDomainHandle_t domainShutdown;
293 #endif
294 
295 };
296 }
297 
298 #endif //HTGS_TASKGRAPHRUNTIME_HPP
bool executed
Whether the Runtime has been executed.
Definition: TaskGraphRuntime.hpp:284
Implements the parent class for a Task to remove the template arguments and the TaskManagerThread to ...
std::list< std::thread * > threads
A list of all threads spawned for the Runtime.
Definition: TaskGraphRuntime.hpp:281
virtual AnyTaskManager * copy(bool deep)=0
Copies the TaskManager.
Implements the task graph configuration class responsible for managing ITask connections.
void addTaskManager(AnyTaskManager *taskManager)
Adds a task manager to the task graph.
Definition: AnyTaskGraphConf.hpp:396
void terminateAll()
Terminates the Runtime.
Definition: TaskGraphRuntime.hpp:179
void executeAndWaitForRuntime()
Executes the Runtime and then waits for it to finish processing.
Definition: TaskGraphRuntime.hpp:169
void shutdown()
Called when all the threads in this graph have finished executing.
Definition: AnyTaskGraphConf.hpp:262
#define HTGS_DEBUG_VERBOSE(msg)
Prints a debug message to std::cerr with VERBOSE level.
Definition: debug_message.hpp:75
int run()
Executes the task until the underlying Task has been terminated.
Definition: AnyTaskManager.hpp:607
Spawns threads and binds them to the appropriate ITask within a TaskGraph.
Definition: TaskGraphRuntime.hpp:84
void executeRuntime()
Executes the Runtime.
Definition: TaskGraphRuntime.hpp:188
virtual AnyITask * getTaskFunction()=0
Gets the ITask function associated with the TaskManager.
std::string getName()
Gets the name of the ITask.
Definition: AnyTaskManager.hpp:390
std::list< TaskManagerThread * > runtimeThreads
The list of TaskManagers bound to each thread.
Definition: TaskGraphRuntime.hpp:283
void initialize()
Initializes the task graph just prior to spawning threads.
Definition: AnyTaskGraphConf.hpp:238
std::list< AnyTaskManager * > * getTaskManagers()
Virtual function that initiates updating the task graph communicator.
Definition: AnyTaskGraphConf.hpp:180
virtual std::shared_ptr< AnyConnector > getInputConnector()=0
Gets the input Connector.
TaskGraphRuntime(AnyTaskGraphConf *graph)
Constructs a Runtime for a TaskGraph.
Definition: TaskGraphRuntime.hpp:92
virtual void finishedSetup()
Called when the task graph has finished setting up its tasks and launched all threads for the graph...
Definition: AnyTaskGraphConf.hpp:257
The parent class for a Task that removes the template arguments.
Definition: AnyTaskManager.hpp:45
Manages a TaskManager that is bound to a thread for execution.
Definition: AnyTaskManager.hpp:573
std::mutex * getInitializationMutex()
Gets the initialization mutex, used for signaling when initialization is done.
Definition: AnyTaskGraphConf.hpp:305
AnyTaskGraphConf * graph
The TaskGraph associated with the Runtime.
Definition: TaskGraphRuntime.hpp:282
~TaskGraphRuntime()
Destructor.
Definition: TaskGraphRuntime.hpp:115
Implements the base class for the TaskGraphConf class, removing the template arguments and providing ...
Definition: AnyTaskGraphConf.hpp:66
#define HTGS_ASSERT(condition, message)
Prints a more meaningful assertion message and terminates if the condition fails. ...
Definition: debug_message.hpp:25
virtual std::shared_ptr< AnyConnector > getOutputConnector()=0
Gets the output Connector.
void waitForRuntime()
Waits for the Runtime to finish executing.
Definition: TaskGraphRuntime.hpp:157
Definition: Bookkeeper.hpp:23
std::condition_variable * getInitializationCondition()
Notifies the task graph to check if all task managers have been initialized or not.
Definition: AnyTaskGraphConf.hpp:297