Hedgehog  0.0.0
A library to generate hybrid pipeline workflow systems
core_graph.h
1 // NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the
2 // software in any medium, provided that you keep intact this entire notice. You may improve, modify and create
3 // derivative works of the software or any portion of the software, and you may copy and distribute such modifications
4 // or works. Modified works should carry a notice stating that you changed the software and should note the date and
5 // nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the
6 // source of the software. NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND,
7 // EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
8 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR
9 // WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE
10 // CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS
11 // THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE. You
12 // are solely responsible for determining the appropriateness of using and distributing the software and you assume
13 // all risks associated with its use, including but not limited to the risks and costs of program errors, compliance
14 // with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of
15 // operation. This software is not intended to be used in any situation where a failure could cause risk of injury or
16 // damage to property. The software developed by NIST employees is not subject to copyright protection within the
17 // United States.
18 
19 
20 #ifndef HEDGEHOG_CORE_GRAPH_H
21 #define HEDGEHOG_CORE_GRAPH_H
22 
23 #include <ostream>
24 #include <vector>
25 #include <filesystem>
26 
27 #include "../node/core_node.h"
28 #include "../io/base/sender/core_notifier.h"
29 #include "../io/base/receiver/core_multi_receivers.h"
30 #include "../io/graph/receiver/core_graph_multi_receivers.h"
31 #include "../io/graph/receiver/core_graph_sink.h"
32 #include "../io/graph/sender/core_graph_source.h"
33 #include "../../tools/traits.h"
34 #include "../../tools/helper.h"
35 #include "../../api/scheduler/default_scheduler.h"
36 #include "../../api/scheduler/abstract_scheduler.h"
37 #include "../../api/printer/dot_printer.h"
38 
40 namespace hh {
41 
42 #ifndef DOXYGEN_SHOULD_SKIP_THIS
// Forward declaration of the user-facing Graph template; CoreGraph below only stores a pointer to it.
// NOTE(review): the embedded listing numbers jump from 43 to 47 here; the absent lines are presumably
// documentation comments stripped by the listing generator — confirm against the upstream header.
43 template<class GraphOutput, class ...GraphInputs>
47 class Graph;
48 #endif // DOXYGEN_SHOULD_SKIP_THIS
49 
51 namespace core {
52 
62 template<class GraphOutput, class ...GraphInputs>
63 class CoreGraph : public CoreSender<GraphOutput>, public CoreGraphMultiReceivers<GraphInputs...> {
64  private:
// User graph attached to this core (returned by node()).
65  Graph<GraphOutput, GraphInputs...> *graph_ = nullptr;
// Cores of the nodes declared as graph inputs (filled by input()).
66  std::unique_ptr<std::set<CoreNode *>> inputsCoreNodes_ = nullptr;
// Senders of the nodes declared as graph outputs (filled by output()).
67  std::unique_ptr<std::set<CoreSender<GraphOutput> *>> outputCoreNodes_ = nullptr;
// Scheduler used to spawn and join the threads of the inside nodes.
68  std::unique_ptr<AbstractScheduler> scheduler_ = nullptr;
// Outer graph's source, connected to the input nodes; reset to nullptr by setInside().
69  std::shared_ptr<CoreGraphSource<GraphInputs...>> source_ = nullptr;
// Outer graph's sink, connected to the output nodes; reset to nullptr by setInside().
70  std::shared_ptr<CoreGraphSink<GraphOutput>> sink_ = nullptr;
// Graph id, read/written through graphId().
71  int graphId_ = 0;
// Device id, read/written through deviceId().
72  int deviceId_ = 0;
73 
74  public:
80  CoreGraph(Graph<GraphOutput, GraphInputs...> *graph, NodeType const type, std::string_view const &name,
81  std::unique_ptr<AbstractScheduler> scheduler = std::make_unique<DefaultScheduler>()) :
82  CoreNode(name, type, 1),
83  CoreNotifier(name, type, 1),
84  CoreSlot(name, type, 1),
85  CoreReceiver<GraphInputs>(name, type, 1)...,
86  CoreSender<GraphOutput>(name, type,
87  1),
88  CoreGraphMultiReceivers<GraphInputs...>(name, type,
89  1){
90  HLOG_SELF(0, "Creating CoreGraph with coreGraph: " << graph << " type: " << (int) type << " and name: " << name)
91  if (!scheduler) {
92  std::ostringstream oss;
93  oss << "Internal error, the graph's scheduler is null, please instantiate an AbstractScheduler.";
94  HLOG_SELF(0, oss.str())
95  throw (std::runtime_error(oss.str()));
96  }
97  this->graph_ = graph;
98  this->inputsCoreNodes_ = std::make_unique<std::set<CoreNode *>>();
99  this->outputCoreNodes_ = std::make_unique<std::set<CoreSender < GraphOutput> *>>
100  ();
101  this->source_ = std::make_shared<CoreGraphSource<GraphInputs...>>();
102  this->sink_ = std::make_shared<CoreGraphSink<GraphOutput>>();
103  this->scheduler_ = std::move(scheduler);
104  this->source_->belongingNode(this);
105  this->sink_->belongingNode(this);
106  }
107 
111  CoreNode(rhs.name(), rhs.type(), 1),
112  CoreNotifier(rhs.name(), rhs.type(), 1),
113  CoreSlot(rhs.name(), rhs.type(), 1),
114  CoreReceiver<GraphInputs>(rhs.name(), rhs.type(), 1)...,
115  CoreSender<GraphOutput>(rhs.name(), rhs.type(),1),
116  CoreGraphMultiReceivers<GraphInputs...>(rhs.name(), rhs.type(),1){
117  this->inputsCoreNodes_ = std::make_unique<std::set<CoreNode *>>();
118  this->outputCoreNodes_ = std::make_unique<std::set<CoreSender < GraphOutput> *>>
119  ();
120  this->scheduler_ = rhs.scheduler_->create();
121  this->source_ = std::make_shared<CoreGraphSource<GraphInputs...>>();
122  this->sink_ = std::make_shared<CoreGraphSink<GraphOutput>>();
123 
124  this->source_->belongingNode(this);
125  this->sink_->belongingNode(this);
126 
128 
129  this->hasBeenRegistered(true);
130  this->belongingNode(rhs.belongingNode());
131 
132  this->graph_ = rhs.graph_;
133  }
134 
137  std::shared_ptr<CoreNode> clone() override { return std::make_shared<CoreGraph<GraphOutput, GraphInputs...>>(*this); }
138 
140  ~CoreGraph() override {HLOG_SELF(0, "Destructing CoreGraph")}
141 
144  behavior::Node *node() override { return graph_; }
145 
148  int deviceId() override { return this->deviceId_; }
149 
152  int graphId() override { return this->graphId_; }
153 
156  [[nodiscard]] std::unique_ptr<std::set<CoreNode *>> const &inputsCoreNodes() const { return inputsCoreNodes_; }
157 
160  [[nodiscard]] std::unique_ptr<std::set<CoreSender < GraphOutput> *>> const &
161  outputCoreNodes() const {
162  return outputCoreNodes_;
163  }
164 
167  std::shared_ptr<CoreGraphSource<GraphInputs...>> const &source() const { return source_; }
168 
171  void graphId(size_t graphId) { graphId_ = graphId; }
172 
175  void deviceId(int deviceId) override { this->deviceId_ = deviceId; }
176 
179  [[nodiscard]] std::chrono::duration<uint64_t, std::micro> maxExecutionTime() const override {
180  std::chrono::duration<uint64_t, std::micro>
181  ret = std::chrono::duration<uint64_t, std::micro>::min(),
182  temp{};
183  std::shared_ptr<CoreNode> core;
184  for (auto const &it : *(this->insideNodes())) {
185  core = it.second;
186  switch (core->type()) {
187  case NodeType::Task:
188  case NodeType::Graph:
189  case NodeType::StateManager:
190  case NodeType::ExecutionPipeline:
191  temp = core->maxExecutionTime();
192  if (temp > ret) ret = temp;
193  break;
194  default:break;
195  }
196  }
197  return ret;
198  }
199 
202  [[nodiscard]] std::chrono::duration<uint64_t, std::micro> minExecutionTime() const override {
203  std::chrono::duration<uint64_t, std::micro> ret = std::chrono::duration<uint64_t, std::micro>::max(), temp{};
204  std::shared_ptr<CoreNode> core;
205  for (auto const &it : *(this->insideNodes())) {
206  core = it.second;
207  switch (core->type()) {
208  case NodeType::Task:
209  case NodeType::Graph:
210  case NodeType::StateManager:
211  case NodeType::ExecutionPipeline:
212  temp = core->minExecutionTime();
213  if (temp < ret) ret = temp;
214  break;
215  default:break;
216  }
217  }
218  return ret;
219  }
220 
223  [[nodiscard]] std::chrono::duration<uint64_t, std::micro> maxWaitTime() const override {
224  std::chrono::duration<uint64_t, std::micro> ret = std::chrono::duration<uint64_t, std::micro>::min(), temp{};
225  std::shared_ptr<CoreNode> core;
226  for (auto const &it : *(this->insideNodes())) {
227  core = it.second;
228  switch (core->type()) {
229  case NodeType::Task:
230  case NodeType::Graph:
231  case NodeType::StateManager:
232  case NodeType::ExecutionPipeline:
233  temp = core->maxWaitTime();
234  if (temp > ret) ret = temp;
235  break;
236  default:break;
237  }
238  }
239  return ret;
240  }
241 
244  [[nodiscard]] std::chrono::duration<uint64_t, std::micro> minWaitTime() const override {
245  std::chrono::duration<uint64_t, std::micro> ret = std::chrono::duration<uint64_t, std::micro>::max(), temp{};
246  std::shared_ptr<CoreNode> core;
247  for (auto const &it : *(this->insideNodes())) {
248  core = it.second;
249  switch (core->type()) {
250  case NodeType::Task:
251  case NodeType::Graph:
252  case NodeType::StateManager:
253  case NodeType::ExecutionPipeline:
254  temp = core->minWaitTime();
255  if (temp < ret) ret = temp;
256  break;
257  default:break;
258  }
259  }
260  return ret;
261  }
262 
272  template<
273  class UserDefinedSender, class UserDefinedMultiReceiver,
274  class Output = typename UserDefinedSender::output_t,
275  class Inputs = typename UserDefinedMultiReceiver::inputs_t,
276  class IsSender = typename std::enable_if_t<
277  std::is_base_of_v<
278  behavior::Sender<Output>, UserDefinedSender
279  >
280  >,
281  class IsMultiReceiver = typename std::enable_if_t<
282  std::is_base_of_v<
283  typename helper::HelperMultiReceiversType<Inputs>::type, UserDefinedMultiReceiver
284  >
285  >
286  >
287  void addEdge(std::shared_ptr<UserDefinedSender> from, std::shared_ptr<UserDefinedMultiReceiver> to) {
288  assert(from != nullptr && to != nullptr);
289  static_assert(traits::Contains_v<Output, Inputs>, "The given Receiver cannot be linked to this Sender");
290  if (this->isInside()) {
291  std::ostringstream oss;
292  oss << "You can not modify a graph that is connected inside another graph: " << __FUNCTION__;
293  HLOG_SELF(0, oss.str())
294  throw (std::runtime_error(oss.str()));
295  }
296 
297  //Get the associated cores
298  auto coreSender = dynamic_cast<CoreSender <Output> *>(std::static_pointer_cast<behavior::Node>(from)->core().get());
299  auto coreNotifier = dynamic_cast<CoreNotifier *>(coreSender);
300  auto coreSlot = dynamic_cast<CoreSlot *>(std::static_pointer_cast<behavior::Node>(to)->core().get());
301  auto
302  coreReceiver = dynamic_cast<CoreReceiver<Output> *>(std::static_pointer_cast<behavior::Node>(to)->core().get());
303 
304  if (from->core().get() == this || to->core().get() == this) {
305  std::ostringstream oss;
306  oss << "You can not connect a graph to itself: " << __FUNCTION__;
307  HLOG_SELF(0, oss.str())
308  throw (std::runtime_error(oss.str()));
309  }
310 
311  if (coreSender->hasBeenRegistered()) {
312  if (coreSender->belongingNode() != this) {
313  std::ostringstream oss;
314  oss << "The Sender node should belong to the graph: " << __FUNCTION__;
315  HLOG_SELF(0, oss.str())
316  throw (std::runtime_error(oss.str()));
317  }
318  }
319 
320  if (coreReceiver->hasBeenRegistered()) {
321  if (coreReceiver->belongingNode() != this) {
322  std::ostringstream oss;
323  oss << "The Receiver node should belong to the graph: " << __FUNCTION__;
324  HLOG_SELF(0, oss.str())
325  throw (std::runtime_error(oss.str()));
326  }
327  }
328 
329  HLOG_SELF(0,
330  "Add edge from " << coreSender->name() << "(" << coreSender->id() << ") to " << coreReceiver->name()
331  << "(" << coreReceiver->id()
332  << ")")
333 
334  for (auto r : coreReceiver->receivers()) { coreSender->addReceiver(r); }
335  for (CoreSlot *slot : coreSlot->getSlots()) { coreNotifier->addSlot(slot); }
336  for (auto s : coreSender->getSenders()) {
337  coreReceiver->addSender(s);
338  coreSlot->addNotifier(s);
339  }
340 
341  this->registerNode(std::dynamic_pointer_cast<CoreNode>(from->core()));
342  this->registerNode(std::dynamic_pointer_cast<CoreNode>(to->core()));
343  }
344 
352  template<
353  class UserDefinedMultiReceiver,
354  class InputsMR = typename UserDefinedMultiReceiver::inputs_t,
355  class InputsG = typename behavior::MultiReceivers<GraphInputs...>::inputs_t,
356  class isMultiReceiver = typename std::enable_if_t<
357  std::is_base_of_v<typename helper::HelperMultiReceiversType<InputsMR>::type, UserDefinedMultiReceiver>
358  >,
359  class isInputCompatible = typename std::enable_if_t<traits::is_included_v<InputsMR, InputsG>>>
360  void input(std::shared_ptr<UserDefinedMultiReceiver> inputNode) {
361  if (this->isInside()) {
362  std::ostringstream oss;
363  oss << "You can not modify a graph that is connected inside another graph: " << __FUNCTION__;
364  HLOG_SELF(0, oss.str())
365  throw (std::runtime_error(oss.str()));
366  }
367 
368  if (auto inputCoreNode =
369  dynamic_cast<typename helper::HelperCoreMultiReceiversType<InputsMR>::type *>(inputNode->core().get())) {
370  HLOG_SELF(0, "Set " << inputCoreNode->name() << "(" << inputCoreNode->id() << ") as input")
371 
372  if (inputCoreNode->hasBeenRegistered()) {
373  if (inputCoreNode->belongingNode() != this) {
374  std::ostringstream oss;
375  oss << "The node " << inputCoreNode->name() << " belong already to another coreGraph: "
376  << __FUNCTION__;
377  HLOG_SELF(0, oss.str())
378  throw (std::runtime_error(oss.str()));
379  }
380  }
381 
382  //Add it as input of the coreGraph
383  this->inputsCoreNodes_->insert(inputCoreNode);
384  this->addReceiversToSource(inputCoreNode);
385  } else {
386  std::ostringstream oss;
387  oss << "The node " << inputCoreNode->name() << " is not a multi receiver: " << __FUNCTION__;
388  HLOG_SELF(0, oss.str())
389  throw (std::runtime_error(oss.str()));
390  }
391  this->registerNode(std::static_pointer_cast<CoreNode>(inputNode->core()));
392  }
393 
398  void output(std::shared_ptr<behavior::Sender<GraphOutput>> outputNode) {
399  if (this->isInside()) {
400  std::ostringstream oss;
401  oss << "You can not modify a graph that is connected inside another graph: " << __FUNCTION__;
402  HLOG_SELF(0, oss.str())
403  throw (std::runtime_error(oss.str()));
404  }
405 
406  if (auto outputCoreNode = dynamic_cast<CoreSender <GraphOutput> *>(outputNode->core().get())) {
407  HLOG_SELF(0, "Set " << outputCoreNode->name() << "(" << outputCoreNode->id() << ") as outputNode")
408  if (outputCoreNode->hasBeenRegistered()) {
409  if (outputCoreNode->belongingNode() != this) {
410  std::ostringstream oss;
411  oss
412  << "The node " << outputCoreNode->name() << " belong already to another coreGraph: "
413  << __FUNCTION__;
414  HLOG_SELF(0, oss.str())
415  throw (std::runtime_error(oss.str()));
416  }
417  }
418  this->outputCoreNodes_->insert(outputCoreNode);
419  for (CoreSender <GraphOutput> *sender : outputCoreNode->getSenders()) {
420  this->sink_->addNotifier(sender);
421  this->sink_->addSender(sender);
422  }
423  outputCoreNode->addSlot(this->sink_.get());
424  outputCoreNode->addReceiver(this->sink_.get());
425  } else {
426  std::ostringstream oss;
427  oss << "Internal error, the output node is not a valid CoreSender: " << __FUNCTION__;
428  HLOG_SELF(0, oss.str())
429  throw (std::runtime_error(oss.str()));
430  }
431 
432  this->registerNode(std::static_pointer_cast<CoreNode>(outputNode->core()));
433  }
434 
438  template<
439  class Input,
440  class = typename std::enable_if_t<traits::Contains<Input, GraphInputs...>::value>
441  >
442  void broadcastAndNotifyToAllInputs(std::shared_ptr<Input> &data) {
443  HLOG_SELF(2, "Broadcast data and notify all coreGraph's inputs")
444  if (this->isInside()) {
445  std::ostringstream oss;
446  oss << "You can not modify a graph that is connected inside another graph: " << __FUNCTION__;
447  HLOG_SELF(0, oss.str())
448  throw (std::runtime_error(oss.str()));
449  }
450  std::static_pointer_cast<CoreQueueSender<Input>>(this->source_)->sendAndNotify(data);
451  }
452 
// Special setInside for the graph: detach the outer source and sink from the input and output nodes,
// remove both from the inside nodes, then release them.
// Throws std::runtime_error if one of the input nodes is not a CoreSlot.
454  void setInside() override {
455  assert(!this->isInside());
456  HLOG_SELF(0, "Set the coreGraph inside")
// NOTE(review): the embedded listing numbers jump from 456 to 458; a source line seems to be missing
// from this extract at this point — confirm against the upstream header before relying on this body.
458  // Remove the connection between the source and the input nodes
459  for (CoreNode *inputNode: *(this->inputsCoreNodes_)) {
460  if (auto coreSlot = dynamic_cast<CoreSlot *>(inputNode)) {
// Fold expression: the source is removed as notifier once per graph input type
461  ( coreSlot->removeNotifier(
462  static_cast<CoreNotifier *>(
463  static_cast<CoreQueueSender<GraphInputs> *>(
464  this->source_.get()
465  )
466  )
467  ), ...);
468 
469  // Remove the sender connection
470  this->removeForAllSenders(inputNode);
471  } else {
472  std::ostringstream oss;
473  oss << "Internal error, the input node is not a slot, when graph is set inside : " << __FUNCTION__;
474  HLOG_SELF(0, oss.str())
475  throw (std::runtime_error(oss.str()));
476  }
477  }
478 
479  // Disconnect the sink and the output nodes
480  std::for_each(this->outputCoreNodes_->begin(), this->outputCoreNodes_->end(),
481  [this](CoreSender <GraphOutput> *s) {
482  s->removeSlot(this->sink_.get());
483  s->removeReceiver(this->sink_.get());
484  });
485 
// Source and sink are no longer part of the graph: drop them from the inside nodes and release them
486  this->removeInsideNode(this->source_.get());
487  this->removeInsideNode(this->sink_.get());
488  this->source_ = nullptr;
489  this->sink_ = nullptr;
490  }
491 
494  [[nodiscard]] std::vector<std::pair<std::string, std::string>> ids() const final {
495  std::vector<std::pair<std::string, std::string>> v{};
496  for (auto input : *(this->inputsCoreNodes_)) {
497  for (std::pair<std::string, std::string> const &innerInput : input->ids()) { v.push_back(innerInput); }
498  }
499  return v;
500  }
501 
// Record the execution start time stamp, then store the creation duration
// (time elapsed since creationTimeStamp(), in microseconds).
// Throws std::runtime_error if the graph is already inside another graph.
504  void executeGraph() {
505  HLOG_SELF(2, "Execute the coreGraph")
506  if (this->isInside()) {
507  std::ostringstream oss;
508  oss << "You can not modify a graph that is connected inside another graph: " << __FUNCTION__;
509  HLOG_SELF(0, oss.str())
510  throw (std::runtime_error(oss.str()));
511  }
512  this->startExecutionTimeStamp(std::chrono::high_resolution_clock::now());
// NOTE(review): the embedded listing numbers jump from 512 to 514; a source line seems to be missing
// from this extract at this point — confirm against the upstream header before relying on this body.
514  auto finishCreationTime = std::chrono::high_resolution_clock::now();
515  this->creationDuration(std::chrono::duration_cast<std::chrono::microseconds>(
516  finishCreationTime - this->creationTimeStamp()));
517  }
518 
521  HLOG_SELF(2, "Wait for the coreGraph to terminate")
522  this->scheduler_->joinAll();
523  std::chrono::time_point<std::chrono::high_resolution_clock>
524  endExecutionTimeStamp = std::chrono::high_resolution_clock::now();
525  this->executionDuration(std::chrono::duration_cast<std::chrono::microseconds>
526  (endExecutionTimeStamp - this->startExecutionTimeStamp()));
527  }
528 
531  HLOG_SELF(2, "Indicate finish pushing data")
532  if (this->isInside()) {
533  std::ostringstream oss;
534  oss << "You can not modify a graph that is connected inside another graph: " << __FUNCTION__;
535  HLOG_SELF(0, oss.str())
536  throw (std::runtime_error(oss.str()));
537  }
538  this->source_->notifyAllTerminated();
539  }
540 
549  std::shared_ptr<GraphOutput> getBlockingResult() {
550  HLOG_SELF(2, "Get blocking data")
551  if (this->isInside()) {
552  std::ostringstream oss;
553  oss << "You can not modify a graph that is connected inside another graph: " << __FUNCTION__;
554  HLOG_SELF(0, oss.str())
555  throw (std::runtime_error(oss.str()));
556  }
557  std::shared_ptr<GraphOutput> result = nullptr;
558  this->sink_->waitForNotification();
559  this->sink_->lockUniqueMutex();
560  if (!this->sink_->receiverEmpty()) { result = this->sink_->popFront(); }
561  this->sink_->unlockUniqueMutex();
562  return result;
563  }
564 
// createCluster override for the graph; the parameter is marked [[maybe_unused]].
// NOTE(review): the embedded listing numbers jump from 568 to 570; a source line seems to be missing
// from this extract inside the body — confirm against the upstream header.
567  void createCluster([[maybe_unused]]std::shared_ptr<std::multimap<CoreNode *,
568  std::shared_ptr<CoreNode>>> &insideNodesGraph) override {
570  }
571 
574  void visit(AbstractPrinter *printer) override {
575  HLOG_SELF(1, "Visit")
576  // Test if the coreGraph has already been visited
577  if (printer->hasNotBeenVisited(this)) {
578 
579  // Print the header of the coreGraph
580  printer->printGraphHeader(this);
581 
582  // Print the coreGraph information
583  printer->printNodeInformation(this);
584 
585  if (this->source_ && this->sink_) {
586  this->source_->visit(printer);
587  this->sink_->visit(printer);
588  }
589 
590  // Visit all the insides node of the coreGraph
591  for (auto it = this->insideNodes()->begin(), end = this->insideNodes()->end(); it != end;
592  it = this->insideNodes()->upper_bound(it->first)) {
593  if (this->insideNodes()->count(it->first) == 1) {
594  it->second->visit(printer);
595  } else {
596  this->printCluster(printer, it->second);
597  }
598  }
599  // Print coreGraph footer
600  printer->printGraphFooter(this);
601  }
602  }
603 
604  //Virtual functions
605  //Sender
608  void addReceiver(CoreReceiver<GraphOutput> *receiver) override {
609  HLOG_SELF(0, "Add receiver " << receiver->name() << "(" << receiver->id() << ")")
610  for (CoreSender <GraphOutput> *outputNode: *(this->outputCoreNodes_)) {
611  outputNode->addReceiver(receiver);
612  }
613  }
614 
617  void removeReceiver(CoreReceiver<GraphOutput> *receiver) override {
618  HLOG_SELF(0, "Remove receiver " << receiver->name() << "(" << receiver->id() << ")")
619  for (CoreSender <GraphOutput> *outputNode: *(this->outputCoreNodes_)) {
620  outputNode->removeReceiver(receiver);
621  }
622  }
623 
627  void sendAndNotify([[maybe_unused]]std::shared_ptr<GraphOutput> ptr) override {
628  std::ostringstream oss;
629  oss << "Internal error, a graph do not send data: " << __FUNCTION__;
630  HLOG_SELF(0, oss.str())
631  throw (std::runtime_error(oss.str()));
632  }
633 
634  //Notifier
637  void addSlot(CoreSlot *slot) override {
638  HLOG_SELF(0, "Add Slot " << slot->name() << "(" << slot->id() << ")")
639  for (CoreSender <GraphOutput> *outputNode: *(this->outputCoreNodes_)) {
640  outputNode->addSlot(slot);
641  }
642  }
643 
646  void removeSlot(CoreSlot *slot) override {
647  HLOG_SELF(0, "Remove Slot " << slot->name() << "(" << slot->id() << ")")
648  for (CoreSender <GraphOutput> *outputNode: *(this->outputCoreNodes_)) {
649  outputNode->removeSlot(slot);
650  }
651  }
652 
655  void notifyAllTerminated() override {
656  std::ostringstream oss;
657  oss << "Internal error, a graph do not notify nodes: " << __FUNCTION__;
658  HLOG_SELF(0, oss.str())
659  throw (std::runtime_error(oss.str()));
660  }
661 
664  void addNotifier(CoreNotifier *notifier) override {
665  HLOG_SELF(0, "Add Notifier " << notifier->name() << "(" << notifier->id() << ")")
666  for (CoreNode *inputNode: *(this->inputsCoreNodes_)) {
667  if (auto coreSlot = dynamic_cast<CoreSlot *>(inputNode)) {
668  coreSlot->addNotifier(notifier);
669  } else {
670  std::ostringstream oss;
671  oss << "Internal error, A graph's input node is not a slot: " << __FUNCTION__;
672  HLOG_SELF(0, oss.str())
673  throw (std::runtime_error(oss.str()));
674  }
675  }
676  }
677 
680  void removeNotifier(CoreNotifier *notifier) override {
681  HLOG_SELF(0, "Remove Notifier " << notifier->name() << "(" << notifier->id() << ")")
682  for (CoreNode *inputNode: *(this->inputsCoreNodes_)) {
683  if (auto coreSlot = dynamic_cast<CoreSlot *>(inputNode)) {
684  coreSlot->removeNotifier(notifier);
685  } else {
686  std::ostringstream oss;
687  oss << "Internal error, A graph's input node is not a slot: " << __FUNCTION__;
688  HLOG_SELF(0, oss.str())
689  throw (std::runtime_error(oss.str()));
690  }
691  }
692  }
693 
696  bool hasNotifierConnected() override {
697  std::ostringstream oss;
698  oss << "Internal error, A graph has no notifier connected: " << __FUNCTION__;
699  HLOG_SELF(0, oss.str())
700  throw (std::runtime_error(oss.str()));
701  }
702 
705  [[nodiscard]] size_t numberInputNodes() const override {
706  std::ostringstream oss;
707  oss << "Internal error, A graph's is not directly connected to the input nodes: " << __FUNCTION__;
708  HLOG_SELF(0, oss.str())
709  throw (std::runtime_error(oss.str()));
710  }
711 
713  void wakeUp() override {
714  HLOG_SELF(2, "Wake up all inputs")
715  for (CoreNode *inputNode: *(this->inputsCoreNodes_)) {
716  if (auto coreSlot = dynamic_cast<CoreSlot *>(inputNode)) {
717  coreSlot->wakeUp();
718  } else {
719  std::ostringstream oss;
720  oss << "Internal error, A graph's input is not a core slot: " << __FUNCTION__;
721  HLOG_SELF(0, oss.str())
722  throw (std::runtime_error(oss.str()));
723  }
724  }
725  }
726 
730  bool waitForNotification() override {
731  std::ostringstream oss;
732  oss << "Internal error, a graph is not connected to input nodes, so do not wait for notification: "
733  << __FUNCTION__;
734  HLOG_SELF(0, oss.str())
735  throw (std::runtime_error(oss.str()));
736  }
737 
740  [[nodiscard]] std::set<CoreSender < GraphOutput>*>
741  getSenders() override {
742  std::set<CoreSender < GraphOutput>*> coreSenders;
743  std::set<CoreSender < GraphOutput>*> tempCoreSenders;
744  for (CoreSender <GraphOutput> *outputNode : *(this->outputCoreNodes_)) {
745  tempCoreSenders = outputNode->getSenders();
746  coreSenders.insert(tempCoreSenders.begin(), tempCoreSenders.end());
747  }
748  return coreSenders;
749  }
750 
753  [[nodiscard]] std::set<CoreSlot *> getSlots() override {
754  std::set<CoreSlot *> coreSlots;
755  std::set<CoreSlot *> tempCoreSlots;
756 
757  for (CoreNode *mr : *(this->inputsCoreNodes_)) {
758  tempCoreSlots = mr->getSlots();
759  coreSlots.insert(tempCoreSlots.begin(), tempCoreSlots.end());
760  }
761  return coreSlots;
762  }
763 
765  void joinThreads() override {
766  HLOG_SELF(2, "Join coreGraph threads")
767  this->scheduler_->joinAll();
768  }
769 
772  HLOG_SELF(0, "Cluster creation")
773  std::vector<std::shared_ptr<CoreNode>> insideCoreNodes;
774  insideCoreNodes.reserve(this->insideNodes()->size());
775  for (auto coreNode : *(this->insideNodes())) { insideCoreNodes.push_back(coreNode.second); }
776  for (auto const &insideCoreNode : insideCoreNodes) { insideCoreNode->createCluster(this->insideNodes()); }
777  launchThreads();
778  }
779 
781  void launchThreads() {
782  HLOG_SELF(0, "Launching threads")
783  std::vector<std::shared_ptr<CoreNode>> insideCoreNodes;
784  insideCoreNodes.reserve(this->insideNodes()->size());
785  for (auto coreNode : *(this->insideNodes())) { insideCoreNodes.push_back(coreNode.second); }
786  this->scheduler_->spawnThreads(insideCoreNodes);
787  }
788 
789  private:
792  void registerNode(const std::shared_ptr<CoreNode> &coreNode) {
793  HLOG_SELF(0, "Register coreNode " << coreNode->name() << "(" << coreNode->id() << ")")
794  if (!coreNode->hasBeenRegistered()) {
795  coreNode->setInside();
796  this->addUniqueInsideNode(coreNode);
797  }
798  }
799 
803  // Inside nodes to copy, MainClusterNode -> ClusterNodes
804  std::multimap<CoreNode *, std::shared_ptr<CoreNode>> &originalInsideNodes = *(rhs.insideNodes());
805  // Correspondence map, original node -> copy node
806  std::map<CoreNode *, std::shared_ptr<CoreNode>> correspondenceMap;
807  //Duplicate node
808  std::shared_ptr<CoreNode> duplicate;
809 
810  // Create all the duplicates and link them to their original node
811  for (std::pair<CoreNode *const, std::shared_ptr<CoreNode>> const &originalNode : originalInsideNodes) {
812  duplicate = originalNode.second->clone();
813  duplicate->belongingNode(this);
814  correspondenceMap.insert({originalNode.second.get(), duplicate});
815  }
816 
817  // Add the duplicate node into the insideNode structure
818  for (std::pair<CoreNode *const, std::shared_ptr<CoreNode>> const &originalNode : originalInsideNodes) {
819  // Original node
820  CoreNode *originalInsideNode = originalNode.second.get();
821  // Copy node
822  std::shared_ptr<CoreNode> duplicateInsideNode = correspondenceMap.find(originalInsideNode)->second;
823  duplicateInsideNode->belongingNode(this);
824  this->insideNodes()->insert({duplicateInsideNode.get(), duplicateInsideNode});
825  }
826 
827  //Do the linkage
828  for (std::pair<CoreNode *const, std::shared_ptr<CoreNode>> const &originalNode : originalInsideNodes) {
829  CoreNode *originalInsideNode = originalNode.second.get();
830  std::shared_ptr<CoreNode> duplicateInsideNode = correspondenceMap.find(originalInsideNode)->second;
831  originalInsideNode->duplicateEdge(duplicateInsideNode.get(), correspondenceMap);
832  }
833 
834  // Duplicate input nodes
835  for (CoreNode *originalInputNode : *(rhs.inputsCoreNodes())) {
836  auto shInputCoreNode = correspondenceMap.find(originalInputNode)->second;
837  auto inputCoreNode = shInputCoreNode.get();
838  this->inputsCoreNodes_->insert(inputCoreNode);
839  (this->duplicateInputNodes<GraphInputs>(dynamic_cast<CoreReceiver<GraphInputs> *>(inputCoreNode)), ...);
840  this->registerNode(shInputCoreNode);
841  }
842 
843  // Duplicate output nodes
844  for (CoreSender <GraphOutput> *originalOutputNode : *(rhs.outputCoreNodes())) {
845  auto shOutputCoreNode = correspondenceMap.find(originalOutputNode)->second;
846  if (auto outputCoreNode = dynamic_cast<CoreSender <GraphOutput> *>(shOutputCoreNode.get())) {
847  this->outputCoreNodes_->insert(outputCoreNode);
848 
849  for (CoreSender <GraphOutput> *sender : outputCoreNode->getSenders()) {
850  this->sink_->addNotifier(sender);
851  this->sink_->addSender(sender);
852  }
853 
854  outputCoreNode->addSlot(this->sink_.get());
855  outputCoreNode->addReceiver(this->sink_.get());
856 
857  this->registerNode(std::static_pointer_cast<CoreNode>(shOutputCoreNode));
858  } else {
859  std::ostringstream oss;
860  oss << "Internal error, the output node is not a sender: " << __FUNCTION__;
861  HLOG_SELF(0, oss.str())
862  throw (std::runtime_error(oss.str()));
863  }
864  }
865  }
866 
870  template<class ...InputNodeTypes>
872  //Set Slot/Notifiers
873  this->source_->addSlot(inputCoreNode);
874  (this->addSourceNotifierInputCoreNode<InputNodeTypes, InputNodeTypes...>(inputCoreNode), ...);
875  // If casting not correct, send nullptr that is test inside CoreGraph::addReceiverToSource
876  (this->addReceiverToSource<InputNodeTypes>(dynamic_cast<CoreReceiver<InputNodeTypes> *>(inputCoreNode)), ...);
877  }
878 
883  template<class InputNodeType, class ...InputNodeTypes>
885  if (auto compatibleSourceType = std::dynamic_pointer_cast<CoreQueueSender<InputNodeType>>(this->source_)) {
886  inputCoreNode->addNotifier(compatibleSourceType.get());
887  compatibleSourceType->addReceiver(inputCoreNode);
888  }
889  }
890 
894  template<class InputNodeType>
896  if (inputCoreNode) {
897  if (auto compatibleSource = dynamic_cast<CoreSender <InputNodeType> *>(this->source_.get())) {
898  inputCoreNode->addSender(compatibleSource);
899  dynamic_cast<CoreGraphReceiver<InputNodeType> *>(this)->addGraphReceiverInput(inputCoreNode);
900  }
901  }
902  }
903 
907  template<class InputNodeType>
909  if (inputCoreNode) {
910  static_cast<CoreGraphReceiver<InputNodeType> *>(this)->addGraphReceiverInput(dynamic_cast<CoreReceiver<
911  InputNodeType> *>(inputCoreNode));
912  if (auto coreSlot = dynamic_cast<CoreSlot *>(inputCoreNode)) {
913  this->source_->addSlot(coreSlot);
914  coreSlot->addNotifier(std::static_pointer_cast<CoreQueueSender<InputNodeType>>(this->source_).get());
915  } else {
916  std::ostringstream oss;
917  oss << "Internal error, the inputCoreNode is not a CoreSlot: " << __FUNCTION__;
918  HLOG_SELF(0, oss.str())
919  throw (std::runtime_error(oss.str()));
920  }
921  std::static_pointer_cast<CoreQueueSender<InputNodeType>>(this->source_)->addReceiver(dynamic_cast<CoreReceiver<
922  InputNodeType> *>(inputCoreNode));
923  dynamic_cast<CoreReceiver<InputNodeType> *>(inputCoreNode)->addSender(static_cast<CoreSender <InputNodeType> *>(this->source_.get()));
924  }
925  }
926 
930  void printCluster(AbstractPrinter *printer, std::shared_ptr<CoreNode> const &node) {
931  printer->printClusterHeader(node->coreClusterNode());
932  for (auto it = this->insideNodes()->equal_range(node.get()).first;
933  it != this->insideNodes()->equal_range(node.get()).second; ++it) {
934  printer->printClusterEdge(it->second.get());
935  it->second->visit(printer);
936  }
937  printer->printClusterFooter();
938  }
939 };
940 }
941 
942 }
943 #endif //HEDGEHOG_CORE_GRAPH_H
NodeType type() const
Node type accessor.
Definition: core_node.h:132
int deviceId() override
Device id accessor.
Definition: core_graph.h:148
std::shared_ptr< GraphOutput > getBlockingResult()
Get data out of the graph.
Definition: core_graph.h:549
std::vector< std::pair< std::string, std::string > > ids() const final
Get ids of input nodes (vector<pair<nodeId, nodeIdCluster>>)
Definition: core_graph.h:494
Receiver Interface, receive one data type from CoreSender.
Definition: core_receiver.h:44
void removeForAllSenders(CoreNode *coreNode)
Remove all of coreNode's senders from this.
virtual void printClusterFooter()=0
Print cluster footer.
bool waitForNotification() override
A graph can't wait for notification; throws an error in all cases.
Definition: core_graph.h:730
std::shared_ptr< std::multimap< CoreNode *, std::shared_ptr< CoreNode > > > const & insideNodes() const
Inside node accessor.
Definition: core_node.h:161
std::chrono::duration< uint64_t, std::micro > maxWaitTime() const override
Compute the maximum wait time for the graph's inside nodes.
Definition: core_graph.h:223
std::set< CoreSender< GraphOutput > * > getSenders() override
Get the senders from the graphs, gather them from the output nodes.
Definition: core_graph.h:741
Graph< GraphOutput, GraphInputs... > * graph_
User graph.
Definition: core_graph.h:65
void addReceiversToSource(CoreMultiReceivers< InputNodeTypes... > *inputCoreNode)
Add receivers to source and do the connection.
Definition: core_graph.h:871
Graph Receiver for a type GraphInput.
std::unique_ptr< std::set< CoreSender< GraphOutput > * > > const & outputCoreNodes() const
Output node's CoreSender accessor.
Definition: core_graph.h:161
std::shared_ptr< CoreGraphSource< GraphInputs... > > const & source() const
Source accessor.
Definition: core_graph.h:167
Core Notifier interface, emit notification to CoreSlot.
Definition: core_notifier.h:34
std::unique_ptr< AbstractScheduler > scheduler_
Scheduler.
Definition: core_graph.h:68
std::shared_ptr< CoreGraphSource< GraphInputs... > > source_
Outer graph's source.
Definition: core_graph.h:69
std::shared_ptr< CoreNode > clone() override
Clone a core graph calling the graph copy constructor.
Definition: core_graph.h:137
std::shared_ptr< CoreGraphSink< GraphOutput > > sink_
Inner graph's sink.
Definition: core_graph.h:70
void createCluster([[maybe_unused]]std::shared_ptr< std::multimap< CoreNode *, std::shared_ptr< CoreNode >>> &insideNodesGraph) override
Create all clusters for inside nodes and launch the threads, not gathered into insideNodesGraph.
Definition: core_graph.h:567
void setInside() override
Set the graph as inside, in case of connection to another node.
Definition: core_graph.h:454
std::unique_ptr< std::set< CoreSender< GraphOutput > * > > outputCoreNodes_
Output node's core.
Definition: core_graph.h:67
Hedgehog main namespace.
Part of the outer graph that sends data from the outside to the input nodes.
virtual void setInside()
Set the node as inside, (inside a graph)
Definition: core_node.h:433
int graphId_
Graph Id.
Definition: core_graph.h:71
std::unique_ptr< std::set< CoreNode * > > const & inputsCoreNodes() const
Input node's cores accessor.
Definition: core_graph.h:156
void addReceiverToSource(CoreReceiver< InputNodeType > *inputCoreNode)
If the input core node is compatible, connect it to the source.
Definition: core_graph.h:895
CoreGraph(Graph< GraphOutput, GraphInputs... > *graph, NodeType const type, std::string_view const &name, std::unique_ptr< AbstractScheduler > scheduler=std::make_unique< DefaultScheduler >())
CoreGraph constructor.
Definition: core_graph.h:80
bool hasNotifierConnected() override
Test notifier for the graph, should not be used, connection is made to the input nodes.
Definition: core_graph.h:696
void removeInsideNode(CoreNode *coreNode)
Remove a node from the registered inside nodes.
Definition: core_node.h:529
void addGraphReceiverInput(CoreReceiver< GraphInputs > *receiver)
Register a CoreReceiver from an input node.
std::unique_ptr< std::set< CoreNode * > > inputsCoreNodes_
Input node's core.
Definition: core_graph.h:66
void output(std::shared_ptr< behavior::Sender< GraphOutput >> outputNode)
Set a node as output for the graph.
Definition: core_graph.h:398
int deviceId_
Device Id used for computation on devices.
Definition: core_graph.h:72
void addSlot(CoreSlot *slot) override
Add a slot to a graph, i.e, to all output nodes.
Definition: core_graph.h:637
Sender Behavior definition, node has an output type.
Definition: sender.h:32
Multi receiver interface, gather multiple CoreReceiver.
Check if a template T is in Template pack Ts.
Definition: traits.h:77
Sender for nodes possessing a queue of data.
void finishPushingData()
Notify the graph no more input data will be pushed.
Definition: core_graph.h:530
std::chrono::time_point< std::chrono::high_resolution_clock > const & creationTimeStamp() const
Creation timestamp accessor.
Definition: core_node.h:227
void sendAndNotify([[maybe_unused]]std::shared_ptr< GraphOutput > ptr) override
Send data and notify receivers; not possible for a graph, throws an error in every case.
Definition: core_graph.h:627
void addReceiver(CoreReceiver< GraphOutput > *receiver) override
Add a receiver to the graph, i.e, add a receiver to all output nodes.
Definition: core_graph.h:608
void createInnerClustersAndLaunchThreads()
Create inside nodes' cluster and launch the threads.
Definition: core_graph.h:771
Printer interface.
CoreGraph(CoreGraph< GraphOutput, GraphInputs... > const &rhs)
Core graph copy constructor.
Definition: core_graph.h:110
void graphId(size_t graphId)
Graph id setter.
Definition: core_graph.h:171
Node Behavior definition.
Definition: node.h:39
Slot interface, receive notification from CoreNotifier.
Definition: core_slot.h:34
void input(std::shared_ptr< UserDefinedMultiReceiver > inputNode)
Set a node as input for the graph.
Definition: core_graph.h:360
void registerNode(const std::shared_ptr< CoreNode > &coreNode)
Register a node inside the graph.
Definition: core_graph.h:792
bool hasNotBeenVisited(core::CoreNode const *node)
Accessor to check if a node has been visited by the printer.
Main Hedgehog object that does computation.
Definition: graph.h:85
size_t numberInputNodes() const override
Return the number of input nodes connected, a graph should not have such a connection, throws in every case.
Definition: core_graph.h:705
void addNotifier(CoreNotifier *notifier) override
Add a notifier to the graph, i.e., to all input nodes.
Definition: core_graph.h:664
void waitForTermination()
Wait for all inside threads to join.
Definition: core_graph.h:520
void executeGraph()
Execute the graph.
Definition: core_graph.h:504
void addEdge(std::shared_ptr< UserDefinedSender > from, std::shared_ptr< UserDefinedMultiReceiver > to)
Add a directed edge from a compatible "from" node to "to" node.
Definition: core_graph.h:287
NodeType
Hedgehog node's type.
Definition: core_node.h:40
Sender interface, send data to CoreReceiver.
Definition: core_sender.h:35
void deviceId(int deviceId) override
Device id setter.
Definition: core_graph.h:175
void addUniqueInsideNode(const std::shared_ptr< CoreNode > &coreNode)
Add a node to the inside nodes.
Definition: core_node.h:550
void joinThreads() override
Join the threads managed by the graph.
Definition: core_graph.h:765
void duplicateInsideNodes(CoreGraph< GraphOutput, GraphInputs... > const &rhs)
Duplicate inside nodes, called by CoreExecutionPipeline.
Definition: core_graph.h:802
void visit(AbstractPrinter *printer) override
Special visit method for a CoreGraph.
Definition: core_graph.h:574
~CoreGraph() override
Graph's core default destructor.
Definition: core_graph.h:140
void printCluster(AbstractPrinter *printer, std::shared_ptr< CoreNode > const &node)
Specialized method if the input node is in a cluster.
Definition: core_graph.h:930
Main Hedgehog core abstraction.
Definition: core_node.h:48
MultiReceivers Behavior definition, node has a list of input types.
behavior::Node * node() override
User graph accessor.
Definition: core_graph.h:144
void notifyAllTerminated() override
Notify termination to all connected nodes; not possible for a graph, throws an error in every case.
Definition: core_graph.h:655
void removeReceiver(CoreReceiver< GraphOutput > *receiver) override
Remove a receiver from the graph, i.e, remove a receiver from all output nodes.
Definition: core_graph.h:617
void wakeUp() override
Wake up a graph, wake up all input nodes.
Definition: core_graph.h:713
std::chrono::time_point< std::chrono::high_resolution_clock > const & startExecutionTimeStamp() const
Execution start timestamp accessor.
Definition: core_node.h:233
virtual void printClusterHeader(core::CoreNode const *clusterNode)=0
Print cluster header.
virtual std::set< CoreSlot * > getSlots()=0
Slots accessor for the node.
std::chrono::duration< uint64_t, std::micro > const & executionDuration() const
Execution duration accessor.
Definition: core_node.h:244
virtual void printGraphFooter(core::CoreNode const *node)=0
Print graph footer.
virtual std::set< CoreSender< Output > * > getSenders()=0
Get inner CoreSender represented by this one in the case of outer graph for example.
virtual void printGraphHeader(core::CoreNode const *node)=0
Print graph header.
virtual void addSlot(CoreSlot *slot)=0
Interface to add a CoreSlot to this notifier.
std::string_view const & name() const
Node name accessor.
Definition: core_node.h:128
virtual void addNotifier(CoreNotifier *notifier)=0
Interface to add a CoreNotifier to this slot.
virtual void printClusterEdge(core::CoreNode const *clusterNode)=0
Print cluster edge.
void addSender(CoreSender< GraphInputs > *sender) final
Add a CoreSender to the graph.
Core associated to the Graph.
Definition: core_graph.h:63
void addSourceNotifierInputCoreNode(CoreMultiReceivers< InputNodeTypes... > *inputCoreNode)
Add an input node to the source.
Definition: core_graph.h:884
void launchThreads()
Launch the threads using the graph's scheduler.
Definition: core_graph.h:781
CoreNode * belongingNode() const
Belonging node accessor.
Definition: core_node.h:156
bool hasBeenRegistered() const
Node registration property accessor.
Definition: core_node.h:140
virtual void printNodeInformation(core::CoreNode *node)=0
Print node information.
virtual void duplicateEdge(CoreNode *duplicateNode, std::map< CoreNode *, std::shared_ptr< CoreNode >> &correspondenceMap)=0
Duplicate all of the edges from this to its copy duplicateNode.
int graphId() override
Graph id accessor.
Definition: core_graph.h:152
std::chrono::duration< uint64_t, std::micro > minWaitTime() const override
Compute the minimum wait time for the graph's inside nodes.
Definition: core_graph.h:244
void removeNotifier(CoreNotifier *notifier) override
Remove a notifier from the graph, i.e., from all input nodes.
Definition: core_graph.h:680
Base definition of HelperMultiReceiversType.
Definition: helper.h:50
std::chrono::duration< uint64_t, std::micro > maxExecutionTime() const override
Compute the maximum execution time for the graph's inside nodes.
Definition: core_graph.h:179
std::chrono::duration< uint64_t, std::micro > minExecutionTime() const override
Compute the minimum execution time for the graph's inside nodes.
Definition: core_graph.h:202
std::chrono::duration< uint64_t, std::micro > const & creationDuration() const
Creation duration accessor.
Definition: core_node.h:240
bool isInside() const
Node inside property accessor.
Definition: core_node.h:136
virtual std::string id() const
Unique Id accessor.
Definition: core_node.h:114
std::set< CoreSlot * > getSlots() override
Get the slots from the graphs, gather them from the input nodes.
Definition: core_graph.h:753
void removeSlot(CoreSlot *slot) override
Remove a slot from a graph, i.e, from all output nodes.
Definition: core_graph.h:646
Base definition of HelperCoreMultiReceiversType.
Definition: helper.h:62
void broadcastAndNotifyToAllInputs(std::shared_ptr< Input > &data)
Broadcast data and notify all input nodes.
Definition: core_graph.h:442
virtual void addSender(CoreSender< Input > *sender)=0
Interface to add a CoreSender to the receiver.
void duplicateInputNodes(CoreReceiver< InputNodeType > *inputCoreNode)
Duplicate input nodes, and do the connections for the source compatible type.
Definition: core_graph.h:908