HTGS  v2.0
The Hybrid Task Graph Scheduler
TaskGraphCommunicator.hpp
Go to the documentation of this file.
1 //
6 // * @file TaskGraphCommunicator.hpp
7 // * @author Timothy Blattner
8 // * @date March 3, 2017
9 // *
10 // * @brief Implements the task graph communicator task that communicates from each task to
11 // * all other tasks in a graph.
12 // *
13 // */
14 //#ifndef HTGS_TASKGRAPHCOMMUNICATOR
15 //#define HTGS_TASKGRAPHCOMMUNICATOR
16 //
17 //#include <unordered_map>
18 //#include <htgs/core/graph/AnyConnector.hpp>
19 //#include <htgs/core/queue/BlockingQueue.hpp>
20 //#include <thread>
21 //#include <htgs/core/comm/DataPacket.hpp>
22 //
23 //namespace htgs {
24 //
25 //class TaskGraphCommunicator;
26 //
28 // * @typedef TaskCommMap
29 // * A mapping between the name of a task and its task graph communicator
30 // */
31 //typedef std::unordered_map<std::string, TaskGraphCommunicator *> TaskCommMap;
32 //
34 // * @typedef TaskCommPair
35 // * A pair used for the TaskCommMap
36 // */
37 //typedef std::pair<std::string, TaskGraphCommunicator *> TaskCommPair;
38 //
40 // * @class TaskGraphCommunicator TaskGraphCommunicator.hpp <htgs/core/comm/TaskGraphCommunicator.hpp>
41 // * @brief Implements the task graph communicator where a task's address and name are mapped to
42 // * their input connectors.
43 // *
44 // * This class's run function is bound to a thread once all task graphs (including sub-graphs), have
45 // * been built and spawned. Once all threads are active, then a thread is bound to the task graph
46 // * communicator. All graphs within execution pipelines and the main root graph have a separate
47 // * task graph communicator, but share the task address mapping.
48 // *
49 // * Initially the main graph represents the root of a tree of graphs with branches being defined
50 // * by execution pipelines. Once all threads and execution pipelines have been created, then the
51 // * root task graph communicator gathers all addresses and creates the mapping. Once the mapping
52 // * is complete, then all task graph communicators are bound to separate threads. The mapping is
53 // * read-only at this point.
54 // *
55 // * Each task can submit a data packet into the task graph communicator, which will then send the
56 // * data directly into the input connector for that data packet's destination.
57 // *
58 // * A DataPacket is inserted into the task graph communicator, which provides meta data for looking up the end point
59 // * location for the data packet. The data packet holds IData, which is then inserted into the end point's input connector.
60 // *
61 // * @note The IData type must match the end point input connector's data type.
62 // *
63 // *
64 // */
65 //class TaskGraphCommunicator {
66 // public:
67 //
68 // /**
69 // * Constructs the task graph communicator.
70 // * If the parent specified is nullptr, then this instance is the root communicator within a tree of communicators.
71 // * @param parent the parent communicator or nullptr if this communicator is the root
72 // * @param address the address that this task graph communicator represents
73 // */
74 // TaskGraphCommunicator(TaskGraphCommunicator *parent, std::string address)
75 // : parentComm(parent), address(address) {
76 //
77 // taskNameConnectorMap = new std::unordered_multimap<std::string, std::shared_ptr<AnyConnector>>();
78 // flattenedTaskNameConnectorMap = nullptr;
79 // children = new TaskCommMap();
80 // if (parentComm != nullptr) {
81 // parentComm->addChild(this);
82 // }
83 //
84 // numGraphsSpawned = 0;
85 // numGraphsReceived = 0;
86 //
87 // terminated = false;
88 //
89 // thread = nullptr;
90 // }
91 //
92 // ~TaskGraphCommunicator() {
93 // delete taskNameConnectorMap;
94 // taskNameConnectorMap = nullptr;
95 //
96 // delete children;
97 // children = nullptr;
98 //
99 // delete thread;
100 // thread = nullptr;
101 // }
102 //
103 // /**
104 // * Spawns threads only if the task graph communicator calling this function is the root communicator.
105 // * @note If the parent communicator is nullptr, then that instance is the root.
106 // */
107 // void rootSpawnThreads() {
108 // // Validate this is root
109 // if (this->parentComm == nullptr) {
110 // // Flatten lookup table for parent and children
111 // this->flattenedTaskNameConnectorMap = std::shared_ptr<std::unordered_multimap<std::string,
112 // std::shared_ptr<AnyConnector>>>(new std::unordered_multimap<
113 // std::string,
114 // std::shared_ptr<AnyConnector>>());
115 //
116 // this->processFlattenTaskNameConnectorMap(this->flattenedTaskNameConnectorMap);
117 //
118 // this->spawnChildrenThreads();
119 // }
120 // }
121 //
122 // /**
123 // * Spawns the threads for all children communicator
124 // * @note this function is only called by the parent communicator.
125 // */
126 // void spawnChildrenThreads() {
127 // this->spawnThread();
128 //
129 // if (this->children->size() > 0) {
130 // for (auto commChild : *this->children) {
131 // commChild.second->spawnChildrenThreads();
132 // }
133 // }
134 // }
135 //
136 // /**
137 // * Spawns the thread for this communicator
138 // */
139 // void spawnThread() {
140 // this->thread = new std::thread(&TaskGraphCommunicator::run, this);
141 // }
142 //
143 // /**
144 // * Sets the number of graphs spawned
145 // * @param numGraphsSpawned the number of graphs spawned
146 // */
147 // void setNumGraphsSpawned(size_t numGraphsSpawned) { this->numGraphsSpawned = numGraphsSpawned; }
148 //
149 // /**
150 // * Prints the address of the parent communicator recursively to std::cout.
151 // * @param prefix the prefix for printing
152 // */
153 // void printParents(std::string prefix) {
154 // std::cout << prefix << "Address = " << this->address << std::endl;
155 //
156 // if (this->getParentComm() == nullptr) {
157 // std::cout << std::endl << "=====DONE=====" << std::endl;
158 // return;
159 // }
160 //
161 // this->getParentComm()->printParents(prefix + "\t\t");
162 //
163 // }
164 //
165 // /**
166 // * Prints the task graph communicator tree recursively to std::cout.
167 // * @param prefix the prefix for printing
168 // */
169 // void printTree(std::string prefix) {
171 // if (this->getParentComm() == nullptr) {
172 // std::cout << "PARENT addr: " << this->getAddress() << std::endl;
173 // } else {
174 // std::cout << "Parent address = " << this->getParentComm()->getAddress() << std::endl;
175 // }
176 //
177 // std::cout << prefix << "Num children: " << this->getChildren()->size() << " Num connectors = "
178 // << taskNameConnectorMap->size() << std::endl;
179 // for (auto conn : *taskNameConnectorMap) {
180 // std::cout << prefix << "\t\t" << conn.first << std::endl;
181 // }
182 //
183 // for (auto child : *this->getChildren()) {
184 // std::cout << prefix << " CHILD addr: " << child.first << std::endl;
185 // child.second->printTree(prefix + "\t");
186 // }
187 //
188 // }
189 //
190 // /**
191 // * Gets the parent communicator
192 // * @return the parent communicator
193 // * @note If the parent communicator is nullptr, then that communicator is the root.
194 // */
195 // TaskGraphCommunicator *getParentComm() const {
196 // return parentComm;
197 // }
198 //
199 // /**
200 // * Gets the children for the task graph communicator
201 // * @return the children
202 // */
203 // TaskCommMap *getChildren() const {
204 // return children;
205 // }
206 //
207 // /**
208 // * Checks if the root can spawn threads yet or not.
209 // * This function will recursively be called until it reaches the root communicator. The root
210 // * communicator will then verify if the number of graphs received is equal to the number of graphs spawned.
211 // * If they are equal, then all of the threads will be initiated. Doing so ensures all tasks and sub-graphs have
212 // * completed spawning and the mapping between all tasks in the graph has been completed.
213 // */
214 // void checkRootSpawnThreads() {
215 // if (this->parentComm == nullptr) {
216 // if (numGraphsReceived == numGraphsSpawned) {
217 // this->rootSpawnThreads();
218 // }
219 // } else {
220 // this->parentComm->checkRootSpawnThreads();
221 // }
222 // }
223 //
224 // /**
225 // * Increments the number of graphs received by the root communicator.
226 // *
227 // * This function is called recursively, and only the root communicator is incremented.
228 // */
229 // void incrementRootCommunicatorGraphs() {
230 // // Check if this is the root
231 // if (this->parentComm == nullptr) {
232 // numGraphsReceived++;
233 //
234 // // If all the graphs have produced their updates, then begin the communication threads
235 // if (numGraphsReceived == numGraphsSpawned) {
236 // this->rootSpawnThreads();
237 // // TODO: Send graph construction complete
238 // }
239 //
240 // } else {
241 // this->parentComm->incrementRootCommunicatorGraphs();
242 // }
243 // }
244 //
245 // /**
246 // * Gets the number of graphs received by the root communicator.
247 // * This function is called recursively until the root is reached.
248 // *
249 // * @return the number of graphs received by the root communicator
250 // */
251 // size_t getRootNumGraphsReceived() {
252 // if (this->parentComm == nullptr) {
253 // return numGraphsReceived;
254 // } else {
255 // return this->parentComm->getRootNumGraphsReceived();
256 // }
257 // }
258 //
259 // /**
260 // * Gets the number of graphs spawned by the root communicator.
261 // * This function is called recursively until the root is reached
262 // *
263 // * @return the number of graphs spawned by the root communicator
264 // */
265 // size_t getRootTotalSubGraphsSpawned() {
266 // if (this->parentComm == nullptr) {
267 // return numGraphsSpawned;
268 // } else {
269 // return this->parentComm->getRootTotalSubGraphsSpawned();
270 // }
271 // }
272 //
273 // /**
274 // * Flattens the mapping between the addresses and task manager names and their connnectors.
275 // * Doing so allows for constant time look-up for the address to task name connectors
276 // * @param flattenedTaskNameConnectorMap the mapping between the address and manager names to their connects, which is shared among all communicators.
277 // * @note this function is called prior to spawning threads for the task graph communicators
278 // */
279 // void processFlattenTaskNameConnectorMap(std::shared_ptr<std::unordered_multimap<std::string,
280 // std::shared_ptr<AnyConnector>>> flattenedTaskNameConnectorMap) {
281 // this->flattenedTaskNameConnectorMap = flattenedTaskNameConnectorMap;
282 //
283 // for (auto nameConnectorPair : *this->taskNameConnectorMap) {
284 // this->flattenedTaskNameConnectorMap->insert(nameConnectorPair);
285 // }
286 //
287 // // Send to children
288 // if (this->children->size() > 0) {
289 // for (auto child : *this->children) {
290 // child.second->processFlattenTaskNameConnectorMap(this->flattenedTaskNameConnectorMap);
291 // }
292 // }
293 //
294 // }
295 //
296 // /**
297 // * Adds a single task name connector pair to the communicator.
298 // * @param pair the task name and connector pair
299 // */
300 // void addTaskNameConnectorPair(std::pair<std::string, std::shared_ptr<AnyConnector>> pair)
301 // {
302 // taskNameConnectorMap->insert(pair);
303 // }
304 //
305 // /**
306 // * Adds the mapping between a task's address and its name to the input connector for that task.
307 // *
308 // * This will add all of these mappings to this task graph communicator. After which the parent
309 // * communicator increments the number of communicator graphs. If this is called by the parent
310 // * communicator, then it will check if it is ready to spawn threads
311 // *
312 // * @param o
313 // */
314 // void addTaskNameConnectorMap(std::unordered_multimap<std::string, std::shared_ptr<AnyConnector>> *o) {
315 // for (auto nameConnectorPair : *o) {
316 // taskNameConnectorMap->insert(nameConnectorPair);
317 // }
318 //
319 // // Ignore the root, as we only care about sub graphs.
320 // if (this->parentComm != nullptr) {
321 // incrementRootCommunicatorGraphs();
322 // } else {
323 // checkRootSpawnThreads();
324 // }
325 //
328 //
329 // }
330 //
331 // /**
332 // * Adds a child communicator for this task graph communicator
333 // * @param comm the child communicator
334 // */
335 // void addChild(TaskGraphCommunicator *comm) {
337 // this->mutex.lock();
338 // children->insert(TaskCommPair(comm->getAddress(), comm));
339 // this->mutex.unlock();
340 // }
341 //
342 // /**
343 // * Gets the address of the task graph communicator.
344 // *
345 // * This matches the address of the task graph that owns the communicator.
346 // * @return the address
347 // */
348 // std::string getAddress() const {
349 // return address;
350 // }
351 //
352 // /**
353 // * Gracefully terminates the task graph communicator thread.
354 // */
355 // void terminateGracefully() {
356 // if (this->thread != nullptr) {
357 // this->dataQueue.Enqueue(nullptr);
358 // this->thread->join();
359 // }
360 // }
361 //
362 // /**
363 // * Main run function for the thread, which processes data packets until it is terminated.
364 // */
365 // void run() {
366 // while (!terminated) {
367 // this->processDataPacket();
368 // }
369 // }
370 //
371 // /**
372 // * Produces data packet to be processed for the task graph communicator
373 // * @param data the data
374 // * @note this function is thread safe.
375 // */
376 // void produceDataPacket(std::shared_ptr<DataPacket> data) {
377 // this->dataQueue.Enqueue(data);
378 // }
379 //
380 // /**
381 // * Processes one data packet.
382 // * If the data packet is nullptr, then the thread will be terminated.
383 // *
384 // * If there are multiple entries that share the same address and task name, then an error is produced.
385 // * Every task must have a unique name if the communicator is to be used.
386 // */
387 // void processDataPacket() {
388 // auto packet = dataQueue.Dequeue();
389 //
391 // if (packet == nullptr) {
392 // terminated = true;
393 // return;
394 // }
395 //
396 // std::string endPoint = packet->getDestAddr() + ":" + packet->getDestName();
397 //
398 // // Get connector
399 // size_t numItems = flattenedTaskNameConnectorMap->count(endPoint);
400 //
401 // if (numItems == 1) {
402 // auto connIterator = flattenedTaskNameConnectorMap->find(endPoint);
403 //
404 // // Gets the connector for the end point
405 // auto endPointConnector = connIterator->second;
406 //
407 // // Add data
408 // endPointConnector->produceAnyData(packet->getData());
409 //
410 // } else {
411 // if (numItems == 0)
412 // std::cerr << "Graph is unable to find destination task name: '" << endPoint
413 // << "'. Make sure the task's name exists within the graph. Origin: " << packet->getOriginAddr() << ":"
414 // << packet->getOriginName() << std::endl;
415 // else
416 // std::cerr << "Graph has tasks with duplicate name: '" << endPoint
417 // << "' to send data between tasks, each task should have a unique name! Origin: "
418 // << packet->getOriginAddr() << ":" << packet->getOriginName() << std::endl;
419 //
420 // }
421 // }
422 //
423 // /**
424 // * Gets whether the task communicator is terminated or not
425 // * @return true if the communicator is terminated, otherwise false
426 // * @retval TRUE if the communicator is terminated
427 // * @retval FALSE if the communicator is not terminated
428 // */
429 // bool isTerminated() { return this->terminated; }
430 //
431 // private:
432 // std::unordered_multimap<std::string, std::shared_ptr<AnyConnector>>
433 // *taskNameConnectorMap; //!< The local mapping between the task graph communicator and its task graph.
434 //
435 // std::shared_ptr<std::unordered_multimap<std::string, std::shared_ptr<AnyConnector>>>
436 // flattenedTaskNameConnectorMap; //!< The flattened mapping shared between all task graph communicators.
437 //
438 // TaskGraphCommunicator *parentComm; //!< The parent communicator (or nullptr if this is the root communicator).
439 // std::string address; //!< The address of the communicator.
440 //
441 // size_t numGraphsSpawned; //!< The number of graphs spawned.
442 // std::atomic_size_t numGraphsReceived; //!< The number of graphs received.
443 //
444 // TaskCommMap *children; //!< The children communicator.
445 //
446 // std::mutex mutex; //!< A mutex used to ensure thread safety.
447 // BlockingQueue<std::shared_ptr<DataPacket>> dataQueue; //!< The data queue to hold data packets.
448 //
449 // volatile bool terminated; //!< Flag used to indicate if the communicator is terminated or not.
450 // std::thread *thread; //!< The communicator thread.
451 //
452 //};
453 //
454 //}
455 //#endif //HTGS_TASKGRAPHCOMMUNICATOR