// Hedgehog 0.0.0
// A library to generate hybrid pipeline workflow systems
// core_execution_pipeline.h
// NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the
// software in any medium, provided that you keep intact this entire notice. You may improve, modify and create
// derivative works of the software or any portion of the software, and you may copy and distribute such modifications
// or works. Modified works should carry a notice stating that you changed the software and should note the date and
// nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the
// source of the software. NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND,
// EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR
// WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE
// CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS
// THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE. You
// are solely responsible for determining the appropriateness of using and distributing the software and you assume
// all risks associated with its use, including but not limited to the risks and costs of program errors, compliance
// with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of
// operation. This software is not intended to be used in any situation where a failure could cause risk of injury or
// damage to property. The software developed by NIST employees is not subject to copyright protection within the
// United States.
18 #include <utility>
19 
20 
21 
22 #ifndef HEDGEHOG_CORE_EXECUTION_PIPELINE_H
23 #define HEDGEHOG_CORE_EXECUTION_PIPELINE_H
24 
25 #include "core_switch.h"
26 #include "../core_task.h"
27 #include "../../../api/graph.h"
28 
30 namespace hh {
31 
32 #ifndef DOXYGEN_SHOULD_SKIP_THIS
33 template<class GraphOutput, class ...GraphInputs>
37 class AbstractExecutionPipeline;
38 #endif //DOXYGEN_SHOULD_SKIP_THIS
39 
41 namespace core {
42 
49 template<class GraphOutput, class ...GraphInputs>
50 class CoreExecutionPipeline : public CoreTask<GraphOutput, GraphInputs...> {
51  private:
52  AbstractExecutionPipeline<GraphOutput, GraphInputs...> *executionPipeline_ = nullptr;
53  size_t numberGraphs_ = 0;
54  std::vector<int> deviceIds_ = {};
55 
56  protected:
57  std::shared_ptr<CoreSwitch<GraphInputs...>> coreSwitch_;
58  std::vector<std::shared_ptr<CoreGraph<GraphOutput, GraphInputs...>>> epGraphs_ = {};
59 
61  public:
63  CoreExecutionPipeline() = delete;
64 
72  CoreExecutionPipeline(std::string_view const &name,
74  std::shared_ptr<CoreGraph<GraphOutput, GraphInputs...>> coreBaseGraph,
75  size_t numberGraphs,
76  std::vector<int> const &deviceIds,
77  bool automaticStart)
78  : CoreTask<GraphOutput, GraphInputs...>(name, 1, NodeType::ExecutionPipeline, nullptr, automaticStart),
79  executionPipeline_(executionPipeline),
80  numberGraphs_(numberGraphs),
81  deviceIds_(deviceIds),
82  coreSwitch_(std::make_shared<CoreSwitch<GraphInputs...>>()) {
83  if (this->numberGraphs_ == 0) { this->numberGraphs_ = 1; }
84 
85  if (coreBaseGraph->isInside() || this->isInside()) {
86  std::ostringstream oss;
87  oss << "You can not modify a graph that is connected inside another graph: " << __FUNCTION__;
88  HLOG_SELF(0, oss.str())
89  throw (std::runtime_error(oss.str()));
90  }
91  if (numberGraphs_ != deviceIds.size()) {
92  std::ostringstream oss;
93  oss
94  << "The number of device Ids do not correspond to the number of coreGraph duplications you sent to the "
95  "execution pipeline \""
96  << name
97  << "\". Even if you do not associate the graphs duplicates to a specific device, please set the deviceIds "
98  "to the map to the number of duplicates specified."
99  << std::endl;
100 
101  HLOG_SELF(0, oss.str())
102  throw (std::runtime_error(oss.str()));
103  }
104  epGraphs_.reserve(this->numberGraphs_);
105 
106  coreBaseGraph->graphId(0);
107  coreBaseGraph->deviceId(this->deviceIds_[0]);
108  connectGraphToEP(coreBaseGraph);
109 
110  this->duplicateGraphs();
111  }
112 
114  virtual ~CoreExecutionPipeline() = default;
115 
118  [[nodiscard]] behavior::Node *node() override { return this->executionPipeline_; }
119 
122  [[nodiscard]] std::vector<int> const &deviceIds() const { return deviceIds_; }
123 
126  [[nodiscard]] size_t numberGraphs() const { return numberGraphs_; }
127 
130  [[nodiscard]] std::string id() const override { return this->coreSwitch_->id(); }
131 
134  AbstractExecutionPipeline<GraphOutput, GraphInputs...> *executionPipeline() const {
135  return executionPipeline_;
136  }
137 
141  int deviceId() override {
142  std::ostringstream oss;
143  oss << "Internal error, an execution pipeline has not device id: " << __FUNCTION__;
144  HLOG_SELF(0, oss.str())
145  throw (std::runtime_error(oss.str()));
146  }
147 
151  std::set<CoreSender<GraphOutput> *>
152  getSenders() override {
153  std::set<CoreSender<GraphOutput> *>
154  res = {},
155  senders = {};
156  for (auto epGraph : this->epGraphs_) {
157  senders.clear();
158  senders = epGraph->getSenders();
159  mergeSenders(res, senders);
160  }
161  return res;
162  }
163 
166  std::shared_ptr<CoreGraph<GraphOutput, GraphInputs...>> baseCoreGraph() {
167  return this->epGraphs_.empty() ? nullptr : this->epGraphs_[0];
168  }
169 
172  void addReceiver(CoreReceiver<GraphOutput> *receiver) override {
173  for (CoreReceiver<GraphOutput> *r : receiver->receivers()) {
174  if (auto coreQueueReceiver = dynamic_cast<CoreQueueReceiver<GraphOutput> *>(r)) {
175  this->destinations()->insert(coreQueueReceiver);
176  } else {
177  std::ostringstream oss;
178  oss << "Internal error, a receiver added to an execution pipeline is not a coreQueueReceiver : "
179  << __FUNCTION__;
180  HLOG_SELF(0, oss.str())
181  throw (std::runtime_error(oss.str()));
182  }
183  }
184  for (auto epGraph : this->epGraphs_) {
185  connectGraphsOutputToReceiver(epGraph.get(), receiver);
186  }
187  }
188 
191  void addSlot(CoreSlot *slot) override {
192  for (auto graph : this->epGraphs_) { graph->addSlot(slot); }
193  }
194 
197  void visit(AbstractPrinter *printer) override {
198  if (printer->hasNotBeenVisited(this)) {
199  printer->printExecutionPipelineHeader(this, coreSwitch_.get());
200  for (auto graph: epGraphs_) {
201  (this->printEdgeSwitchGraphs<GraphInputs>(printer, graph.get()), ...);
202  graph->visit(printer);
203  }
204  printer->printExecutionPipelineFooter();
205  }
206  }
207 
209  void createCluster([[maybe_unused]]std::shared_ptr<std::multimap<CoreNode *,
210  std::shared_ptr<CoreNode>>> &) override {
211  for (std::shared_ptr<CoreGraph<GraphOutput, GraphInputs...>> epGraph : this->epGraphs_) {
212  epGraph->createInnerClustersAndLaunchThreads();
213  }
214  }
215 
218  [[nodiscard]] std::chrono::duration<uint64_t, std::micro> maxExecutionTime() const override {
219  std::chrono::duration<uint64_t, std::micro> ret = std::chrono::duration<uint64_t, std::micro>::min();
220  for (auto graph: epGraphs_) {
221  std::chrono::duration<uint64_t, std::micro> temp = graph->maxExecutionTime();
222  if (temp > ret) ret = temp;
223  }
224  return ret;
225  }
226 
229  [[nodiscard]] std::chrono::duration<uint64_t, std::micro> minExecutionTime() const override {
230  std::chrono::duration<uint64_t, std::micro> ret = std::chrono::duration<uint64_t, std::micro>::max();
231  for (auto graph: epGraphs_) {
232  std::chrono::duration<uint64_t, std::micro> temp = graph->minExecutionTime();
233  if (temp < ret) ret = temp;
234  }
235  return ret;
236  }
237 
240  [[nodiscard]] std::chrono::duration<uint64_t, std::micro> maxWaitTime() const override {
241  std::chrono::duration<uint64_t, std::micro> ret = std::chrono::duration<uint64_t, std::micro>::min();
242  for (auto graph: epGraphs_) {
243  std::chrono::duration<uint64_t, std::micro> temp = graph->maxWaitTime();
244  if (temp > ret) ret = temp;
245  }
246  return ret;
247  }
248 
251  [[nodiscard]] std::chrono::duration<uint64_t, std::micro> minWaitTime() const override {
252  std::chrono::duration<uint64_t, std::micro> ret = std::chrono::duration<uint64_t, std::micro>::max();
253  for (auto graph: epGraphs_) {
254  std::chrono::duration<uint64_t, std::micro> temp = graph->minWaitTime();
255  if (temp < ret) ret = temp;
256  }
257  return ret;
258  }
259 
260  protected:
264  bool callCanTerminate(bool lock) override {
265  bool result;
266  if (lock) { this->lockUniqueMutex(); }
267  result = !this->hasNotifierConnected() && this->receiversEmpty();
268  HLOG_SELF(2, "callCanTerminate: " << std::boolalpha << result)
269  if (lock) { this->unlockUniqueMutex(); }
270  return result;
271  };
272 
273  private:
276  for (size_t numberGraph = 1; numberGraph < this->numberGraphs(); ++numberGraph) {
277  auto graphDuplicate =
278  std::dynamic_pointer_cast<CoreGraph<GraphOutput, GraphInputs...>>(this->baseCoreGraph()->clone());
279  graphDuplicate->graphId(numberGraph);
280  graphDuplicate->deviceId(this->deviceIds_[numberGraph]);
281  connectGraphToEP(graphDuplicate);
282  }
283  }
284 
288  template<class GraphInput>
290  auto coreSender = std::static_pointer_cast<CoreSender<GraphInput>>(this->coreSwitch_);
291  auto coreNotifier = std::static_pointer_cast<CoreNotifier>(coreSender);
292  auto coreSlot = std::static_pointer_cast<CoreSlot>(graph);
293  auto coreReceiver = std::static_pointer_cast<CoreReceiver<GraphInput>>(graph);
294 
295  for (auto r : coreReceiver->receivers()) { coreSender->addReceiver(r); }
296  for (auto s : coreSender->getSenders()) {
297  coreReceiver->addSender(s);
298  coreSlot->addNotifier(s);
299  }
300  for (CoreSlot *slot : coreSlot->getSlots()) { coreNotifier->addSlot(slot); }
301  }
302 
305  void connectGraphToEP(std::shared_ptr<CoreGraph<GraphOutput, GraphInputs...>> &coreGraph) {
306  coreGraph->setInside();
307  coreGraph->belongingNode(this);
308  coreGraph->hasBeenRegistered(true);
309  (addEdgeSwitchGraph<GraphInputs>(coreGraph), ...);
310  this->epGraphs_.push_back(coreGraph);
311  }
312 
317  CoreReceiver<GraphOutput> *coreReceiver) {
318  for (CoreReceiver<GraphOutput> *receiver : coreReceiver->receivers()) { graph->addReceiver(receiver); }
319  }
320 
325  > &superSet, std::set<CoreSender<GraphOutput> *> &graphSenders) {
326  for (CoreSender<GraphOutput> *sender : graphSenders) { superSet.insert(sender); }
327  }
328 
333  template<class GraphInput>
335  for (CoreNode *graphInputNode : *(graph->inputsCoreNodes())) {
336  if (auto *coreQueueReceiver = dynamic_cast<CoreQueueReceiver<GraphInput> *>(graphInputNode)) {
337  printer->printEdgeSwitchGraphs(coreQueueReceiver,
338  this->id(),
339  traits::type_name<GraphInput>(),
340  coreQueueReceiver->queueSize(),
341  coreQueueReceiver->maxQueueSize(),
342  traits::is_managed_memory_v<GraphInput>);
343  }
344  }
345  }
346 };
347 
348 }
349 }
350 #endif //HEDGEHOG_CORE_EXECUTION_PIPELINE_H
behavior::Node * node() override
Return the user&#39;s node.
Receiver Interface, receive one data type from CoreSender.
Definition: core_receiver.h:44
std::chrono::duration< uint64_t, std::micro > maxWaitTime() const override
Return the maximum wait time of all inside graphs.
virtual void printExecutionPipelineHeader(core::CoreNode *epNode, core::CoreNode *switchNode)=0
Print execution pipeline header.
std::vector< std::shared_ptr< CoreGraph< GraphOutput, GraphInputs... > > > epGraphs_
Core Copies of the graphs (actual memory is stored here)
CoreExecutionPipeline()=delete
Deleted Default constructor.
std::chrono::duration< uint64_t, std::micro > minWaitTime() const override
Return the minimum wait time of all inside graphs.
bool hasNotifierConnected() final
Test if CoreNotifier are linked to this CoreQueueSlot.
std::chrono::duration< uint64_t, std::micro > minExecutionTime() const override
Return the minimum execution time of all inside graphs.
virtual std::set< CoreReceiver< Input > * > receivers()=0
Accessor to all receivers connected to this receiver.
std::shared_ptr< CoreSwitch< GraphInputs... > > coreSwitch_
Switch use to divert the data to the graphs.
std::chrono::duration< uint64_t, std::micro > maxExecutionTime() const override
Return the maximum execution time of all inside graphs.
Core Notifier interface, emit notification to CoreSlot.
Definition: core_notifier.h:34
void connectGraphsOutputToReceiver(CoreGraph< GraphOutput, GraphInputs... > *graph, CoreReceiver< GraphOutput > *coreReceiver)
Connect a CoreReceiver to all output of a graph.
void addSlot(CoreSlot *slot) override
Add a slot to a execution pipeline, i.e. to all inside graphs.
void printEdgeSwitchGraphs(AbstractPrinter *printer, CoreGraph< GraphOutput, GraphInputs... > *graph)
Print an edge for GraphInput input from the switch to all graph&#39;s input node.
Hedgehog main namespace.
std::unique_ptr< std::set< CoreNode * > > const & inputsCoreNodes() const
Input node&#39;s cores accessor.
Definition: core_graph.h:156
void addEdgeSwitchGraph(std::shared_ptr< CoreGraph< GraphOutput, GraphInputs... >> &graph)
Add data and notification link between switch and one inside graph.
AbstractExecutionPipeline< GraphOutput, GraphInputs... > * executionPipeline() const
User execution pipeline accessor.
Core of the task node.
Definition: core_task.h:56
void addReceiver(CoreReceiver< GraphOutput > *receiver) override
Add a receiver to the execution pipeline, to all inside graphs.
virtual ~CoreExecutionPipeline()=default
Default destructor.
Node interface made for duplicating a graph, for example in the case of multi-GPU computation...
CoreExecutionPipeline(std::string_view const &name, AbstractExecutionPipeline< GraphOutput, GraphInputs... > *executionPipeline, std::shared_ptr< CoreGraph< GraphOutput, GraphInputs... >> coreBaseGraph, size_t numberGraphs, std::vector< int > const &deviceIds, bool automaticStart)
The core execution pipeline constructor.
void addReceiver(CoreReceiver< GraphOutput > *receiver) override
Add a receiver to the graph, i.e, add a receiver to all output nodes.
Definition: core_graph.h:608
Printer interface.
Core switch, determine where to divert data to the graphs.
Definition: core_switch.h:32
Node Behavior definition.
Definition: node.h:39
AbstractExecutionPipeline< GraphOutput, GraphInputs... > * executionPipeline_
User&#39;s execution pipeline.
Slot interface, receive notification from CoreNotifier.
Definition: core_slot.h:34
bool hasNotBeenVisited(core::CoreNode const *node)
Accessor to check if a node has been visited by the printer.
size_t numberGraphs_
Total number of graphs in the execution pipeline.
Receiver for nodes possessing a queue of data.
int deviceId() override
Get a device id, not possible for an execution pipeline, throw an error in every case.
NodeType
Hedgehog node&#39;s type.
Definition: core_node.h:40
Sender interface, send data to CoreReceiver.
Definition: core_sender.h:35
void mergeSenders(std::set< CoreSender< GraphOutput > * > &superSet, std::set< CoreSender< GraphOutput > *> &graphSenders)
Add the graph&#39;s senders to the super set.
size_t numberGraphs() const
Number of execution pipeline&#39;s graphs accessor.
void unlockUniqueMutex()
Unlock the mutex.
std::set< CoreSender< GraphOutput > * > getSenders() override
Execution pipeline&#39;s senders accessor.
Main Hedgehog core abstraction.
Definition: core_node.h:48
STL namespace.
void createCluster([[maybe_unused]]std::shared_ptr< std::multimap< CoreNode *, std::shared_ptr< CoreNode >>> &) override
Create inner graphs clusters and launch the threads.
virtual void printExecutionPipelineFooter()=0
Print execution pipeline footer.
virtual std::set< CoreSlot * > getSlots()=0
Slots accessor for the node.
void visit(AbstractPrinter *printer) override
Special visit method for an execution pipeline, visit also all inside graphs.
std::vector< int > const & deviceIds() const
Device ids accessor.
std::string_view const & name() const
Node name accessor.
Definition: core_node.h:128
void lockUniqueMutex()
Lock the mutex.
bool receiversEmpty() final
Test emptiness of all receivers.
Core associated to the Graph.
Definition: core_graph.h:63
bool callCanTerminate(bool lock) override
Can terminate for the ep, specialised to not call user&#39;s defined one.
bool automaticStart() const
Automatic start property accessor.
Definition: core_task.h:98
virtual void printEdgeSwitchGraphs(core::CoreNode *to, std::string const &idSwitch, std::string_view const &edgeType, size_t const &queueSize, size_t const &maxQueueSize, bool isMemoryManaged)=0
Print the edges from the switch representation to a node.
int graphId() override
Graph id accessor.
Definition: core_graph.h:152
void duplicateGraphs()
Duplicate the graphs and link it to the switch.
virtual std::shared_ptr< std::set< CoreQueueReceiver< GraphOutput > *> > const & destinations() const
Destination accessor.
std::shared_ptr< CoreGraph< GraphOutput, GraphInputs... > > baseCoreGraph()
Return the core of the base graph.
bool isInside() const
Node inside property accessor.
Definition: core_node.h:136
std::string id() const override
Execution pipeline id, i.e switch id accessor.
virtual void addSender(CoreSender< Input > *sender)=0
Interface to add a CoreSender to the receiver.
void connectGraphToEP(std::shared_ptr< CoreGraph< GraphOutput, GraphInputs... >> &coreGraph)
Connect a graph to the switch and register it.
std::vector< int > deviceIds_
Device id&#39;s value to set to the different graphs into the execution pipeline.