Hedgehog  0.0.0
A library to generate hybrid pipeline workflow systems
core_default_execution_pipeline.h
1 // NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the
2 // software in any medium, provided that you keep intact this entire notice. You may improve, modify and create
3 // derivative works of the software or any portion of the software, and you may copy and distribute such modifications
4 // or works. Modified works should carry a notice stating that you changed the software and should note the date and
5 // nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the
6 // source of the software. NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND,
7 // EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
8 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR
9 // WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE
10 // CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS
11 // THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE. You
12 // are solely responsible for determining the appropriateness of using and distributing the software and you assume
13 // all risks associated with its use, including but not limited to the risks and costs of program errors, compliance
14 // with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of
15 // operation. This software is not intended to be used in any situation where a failure could cause risk of injury or
16 // damage to property. The software developed by NIST employees is not subject to copyright protection within the
17 // United States.
18 
19 
20 #ifndef HEDGEHOG_CORE_DEFAULT_EXECUTION_PIPELINE_H
21 #define HEDGEHOG_CORE_DEFAULT_EXECUTION_PIPELINE_H
22 
23 #include "../node/execution_pipeline/core_execution_pipeline.h"
24 #include "../../behavior/switch_rule.h"
25 
27 namespace hh::core {
28 
29 #if defined (__clang__)
30 #pragma clang diagnostic push
31 #pragma clang diagnostic ignored "-Woverloaded-virtual"
32 #endif //__clang//
33 template<class GraphInput, class GraphOutput, class ...GraphInputs>
38 class CoreDefaultExecutionPipelineExecute : public virtual CoreExecutionPipeline<GraphOutput, GraphInputs...> {
39  public:
47  CoreDefaultExecutionPipelineExecute(std::string_view const &name,
49  std::shared_ptr<CoreGraph<GraphOutput, GraphInputs...>> coreBaseGraph,
50  size_t numberGraphs,
51  std::vector<int> const &deviceIds,
52  bool automaticStart = false
53  ) : CoreExecutionPipeline<GraphOutput, GraphInputs...>(name,
54  executionPipeline,
55  coreBaseGraph,
56  numberGraphs,
57  deviceIds,
58  automaticStart) {}
59 
63  void callExecute([[maybe_unused]]std::shared_ptr<GraphInput> data) override {
64  for (auto graph : this->epGraphs_) {
65  if (this->callSendToGraph<GraphInput>(data, graph->graphId())) {
66  std::static_pointer_cast<CoreGraphReceiver<GraphInput>>(graph)->receive(data);
67  graph->wakeUp();
68  }
69  }
70  }
71 
72  private:
78  template<class Input>
79  bool callSendToGraph(std::shared_ptr<Input> &data, size_t const &graphId) {
80  return static_cast<behavior::SwitchRule<Input> *>(this->executionPipeline())->sendToGraph(data, graphId);
81  }
82 
83 };
84 #if defined (__clang__)
85 #pragma clang diagnostic pop
86 #endif //__clang//
87 
91 template<class GraphOutput, class ...GraphInputs>
93  GraphOutput,
94  GraphInputs...> ... {
95  public:
96  using CoreDefaultExecutionPipelineExecute<GraphInputs, GraphOutput, GraphInputs...>::callExecute...;
97 
106  std::string_view const &name,
108  std::shared_ptr<CoreGraph<GraphOutput, GraphInputs...>> coreBaseGraph,
109  size_t numberGraphs,
110  std::vector<int> const &deviceIds,
111  bool automaticStart) :
112  CoreNode(name, NodeType::ExecutionPipeline, 1),
113  CoreNotifier(name, NodeType::ExecutionPipeline, 1),
114  CoreQueueNotifier(name, NodeType::ExecutionPipeline, 1),
115  CoreQueueSender<GraphOutput>(name, NodeType::ExecutionPipeline, 1),
116  CoreSlot(name, NodeType::ExecutionPipeline, 1),
117  CoreReceiver<GraphInputs>(name, NodeType::ExecutionPipeline, 1)...,
118  CoreExecutionPipeline<GraphOutput, GraphInputs...>(
119  name, executionPipeline, coreBaseGraph, numberGraphs, deviceIds, automaticStart),
120  CoreDefaultExecutionPipelineExecute<GraphInputs, GraphOutput, GraphInputs...>(
121  name, executionPipeline, coreBaseGraph, numberGraphs, deviceIds, automaticStart)
122  ... {}
123 
128  std::shared_ptr<CoreGraph<GraphOutput, GraphInputs...>> baseGraph) :
129  CoreNode(rhs.name(), NodeType::ExecutionPipeline, 1),
130  CoreNotifier(rhs.name(), NodeType::ExecutionPipeline, 1),
131  CoreQueueNotifier(rhs.name(), NodeType::ExecutionPipeline, 1),
132  CoreQueueSender<GraphOutput>(rhs.name(), NodeType::ExecutionPipeline, 1),
133  CoreSlot(rhs.name(), NodeType::ExecutionPipeline, 1),
134  CoreReceiver<GraphInputs>(rhs.name(), NodeType::ExecutionPipeline, 1)...,
135  CoreExecutionPipeline<GraphOutput, GraphInputs...>(
136  rhs.name(),
137  rhs.executionPipeline(),
138  baseGraph,
139  rhs.numberGraphs(),
140  rhs.deviceIds(),
141  rhs.automaticStart()),
142  CoreDefaultExecutionPipelineExecute<GraphInputs, GraphOutput, GraphInputs...>(
143  rhs.name(),
144  rhs.executionPipeline(),
145  baseGraph,
146  rhs.numberGraphs(),
147  rhs.deviceIds(),
148  rhs.automaticStart())...
149  { }
150 
152  virtual ~CoreDefaultExecutionPipeline() = default;
153 
156  std::shared_ptr<CoreNode> clone() override {
157  return std::make_shared<CoreDefaultExecutionPipeline<GraphOutput, GraphInputs...>>(
158  *this,
159  std::dynamic_pointer_cast<CoreGraph<GraphOutput, GraphInputs...>>(
160  this->baseCoreGraph()->clone()
161  )
162  );
163  }
164 
167  bool waitForNotification() override {
168  //Lock the slot
169  std::unique_lock<std::mutex> lock(*(this->slotMutex()));
170  HLOG_SELF(2, "Wait for notification")
171  // Wait on data or termination
172  this->notifyConditionVariable()->wait(lock,
173  [this]() {
174  bool receiversEmpty = this->receiversEmpty();
175  bool callCanTerminate = this->callCanTerminate(false);
176  HLOG_SELF(2,
177  "Check for notification: " << std::boolalpha
178  << (bool) (!receiversEmpty) << "||"
179  << std::boolalpha
180  << (bool) callCanTerminate)
181  return !receiversEmpty || callCanTerminate;
182  });
183  HLOG_SELF(2, "Notification received")
184  return this->callCanTerminate(false);
185  }
186 
188  void postRun() override {
189  this->isActive(false);
190  this->nvtxProfiler()->startRangeShuttingDown();
191  // Disconnect the CoreDefaultExecutionPipeline of all of its inside graph
192  for (std::shared_ptr<CoreGraph<GraphOutput, GraphInputs...>> graph : this->epGraphs_) {
193  (removeSwitchReceiver<GraphInputs>(graph.get()), ...);
194  }
195  // Notify all inside graphs
196  this->coreSwitch_->notifyAllTerminated();
197 
198  // Wait for them to terminate before own termination
199  for (std::shared_ptr<CoreGraph<GraphOutput, GraphInputs...>> graph : this->epGraphs_) {
200  graph->waitForTermination();
201  }
202  this->nvtxProfiler()->endRangeShuttingDown();
203  }
204 
205  private:
209  template<class GraphInput>
211  for (auto r : coreGraphReceiver->receivers()) {
212  static_cast<CoreQueueSender<GraphInput> *>(this->coreSwitch_.get())->removeReceiver(r);
213  }
214  coreGraphReceiver->removeSender(static_cast<CoreQueueSender<GraphInput> *>(this->coreSwitch_.get()));
215  }
216 };
217 }
218 #endif //HEDGEHOG_CORE_DEFAULT_EXECUTION_PIPELINE_H
Receiver Interface, receive one data type from CoreSender.
Definition: core_receiver.h:44
CoreDefaultExecutionPipeline(CoreDefaultExecutionPipeline< GraphOutput, GraphInputs... > const &rhs, std::shared_ptr< CoreGraph< GraphOutput, GraphInputs... >> baseGraph)
Constructor used to clone a CoreDefaultExecutionPipeline.
void callExecute([[maybe_unused]]std::shared_ptr< GraphInput > data) override
Definition of CoreExecute::callExecute for CoreDefaultExecutionPipeline.
std::vector< std::shared_ptr< CoreGraph< GraphOutput, GraphInputs... > > > epGraphs_
Core Copies of the graphs (actual memory is stored here)
bool callSendToGraph(std::shared_ptr< Input > &data, size_t const &graphId)
Wrapper to the user-defined SwitchRule::sendToGraph.
Graph Receiver for a type GraphInput.
std::shared_ptr< CoreSwitch< GraphInputs... > > coreSwitch_
Switch use to divert the data to the graphs.
Core Notifier interface, emit notification to CoreSlot.
Definition: core_notifier.h:34
std::shared_ptr< std::mutex > const & slotMutex() const
Mutex accessor.
bool waitForNotification() override
Define how the CoreDefaultExecutionPipeline wait for data.
bool isActive() const
Is active property accessor.
Definition: core_node.h:191
AbstractExecutionPipeline< GraphOutput, GraphInputs... > * executionPipeline() const
User execution pipeline accessor.
Sender for nodes possessing a queue of data.
Node core for the default execution pipeline.
Slot interface, receive notification from CoreNotifier.
Definition: core_slot.h:34
virtual int graphId()
Graph id accessor.
Definition: core_node.h:199
CoreDefaultExecutionPipeline(std::string_view const &name, AbstractExecutionPipeline< GraphOutput, GraphInputs... > *executionPipeline, std::shared_ptr< CoreGraph< GraphOutput, GraphInputs... >> coreBaseGraph, size_t numberGraphs, std::vector< int > const &deviceIds, bool automaticStart)
Node core for the default execution pipeline constructor.
NodeType
Hedgehog node&#39;s type.
Definition: core_node.h:40
size_t numberGraphs() const
Number of execution pipeline&#39;s graphs accessor.
Main Hedgehog core abstraction.
Definition: core_node.h:48
void postRun() override
Post Execute loop for CoreDefaultExecutionPipeline.
Notifier of CoreQueueSlot.
std::shared_ptr< std::condition_variable > const & notifyConditionVariable() const
Condition variable accessor.
std::shared_ptr< CoreNode > clone() override
CoreDefaultExecutionPipeline clone used if it is embedded in another CoreDefaultExecutionPipeline.
Hedgehog core namespace.
Definition: core_execute.h:25
std::vector< int > const & deviceIds() const
Device ids accessor.
std::string_view const & name() const
Node name accessor.
Definition: core_node.h:128
void removeReceiver(CoreReceiver< GraphOutput > *receiver) override
Remove a receiver from the set of receivers.
bool receiversEmpty() final
Test emptiness of all receivers.
CoreDefaultExecutionPipelineExecute(std::string_view const &name, AbstractExecutionPipeline< GraphOutput, GraphInputs... > *executionPipeline, std::shared_ptr< CoreGraph< GraphOutput, GraphInputs... >> coreBaseGraph, size_t numberGraphs, std::vector< int > const &deviceIds, bool automaticStart=false)
CoreDefaultExecutionPipelineExecute constructor.
Core associated to the Graph.
Definition: core_graph.h:63
bool callCanTerminate(bool lock) override
Can terminate for the ep, specialised to not call user&#39;s defined one.
void removeSender(CoreSender< GraphInput > *sender) final
Remove a CoreSender from the graph.
bool automaticStart() const
Automatic start property accessor.
Definition: core_task.h:98
void receive(std::shared_ptr< NodeInputs > data) final
Receive a data from a CoreQueueSender, and store it into the waiting queue.
std::shared_ptr< NvtxProfiler > & nvtxProfiler()
NVTX profiler accessor.
Definition: core_task.h:102
std::set< CoreReceiver< GraphInput > * > receivers() override
Get a set of CoreReceiver built from the input nodes.
Behavior definition for dispatching data to a Graph managed by an AbstractExecutionPipeline.
Definition: switch_rule.h:30
std::shared_ptr< CoreGraph< GraphOutput, GraphInputs... > > baseCoreGraph()
Return the core of the base graph.
void removeSwitchReceiver(CoreGraphReceiver< GraphInput > *coreGraphReceiver)
Remove a specific Receiver of all input inside graph.
Middle class used to propose a default definition of CoreExecute::callExecute for DefaultExecutionPip...