Hedgehog  3.1.0
A library to generate hybrid pipeline workflow systems
Loading...
Searching...
No Matches
core_execution_pipeline.h
Go to the documentation of this file.
1// NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the
2// software in any medium, provided that you keep intact this entire notice. You may improve, modify and create
3// derivative works of the software or any portion of the software, and you may copy and distribute such modifications
4// or works. Modified works should carry a notice stating that you changed the software and should note the date and
5// nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the
6// source of the software. NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND,
7// EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
8// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR
9// WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE
10// CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS
11// THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE. You
12// are solely responsible for determining the appropriateness of using and distributing the software and you assume
13// all risks associated with its use, including but not limited to the risks and costs of program errors, compliance
14// with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of
15// operation. This software is not intended to be used in any situation where a failure could cause risk of injury or
16// damage to property. The software developed by NIST employees is not subject to copyright protection within the
17// United States.
18
19#ifndef HEDGEHOG_CORE_EXECUTION_PIPELINE_H_
20#define HEDGEHOG_CORE_EXECUTION_PIPELINE_H_
21
22#include <utility>
23
24#include "../../tools/traits.h"
25#include "../../api/graph/graph.h"
26
27#include "../abstractions/base/node/execution_pipeline_node_abstraction.h"
28#include "../abstractions/node/execution_pipeline_inputs_management_abstraction.h"
29#include "../abstractions/node/execution_pipeline_outputs_management_abstraction.h"
30
32namespace hh {
33
34#ifndef DOXYGEN_SHOULD_SKIP_THIS
38template<size_t Separator, class ...AllTypes>
39class AbstractExecutionPipeline;
40#endif //DOXYGEN_SHOULD_SKIP_THIS
41
43namespace core {
44
46template<size_t Separator, class ...AllTypes>
47using EPIM =
48 tool::ExecutionPipelineInputsManagementAbstractionTypeDeducer_t<tool::Inputs<Separator, AllTypes...>>;
49
51template<size_t Separator, class ...AllTypes>
52using EPOM =
53 tool::ExecutionPipelineOutputsManagementAbstractionTypeDeducer_t<tool::Outputs<Separator, AllTypes...>>;
54
58template<size_t Separator, class ...AllTypes>
62 public EPIM<Separator, AllTypes...>,
63 public EPOM<Separator, AllTypes...> {
64 private:
65 AbstractExecutionPipeline<Separator, AllTypes...> *const
67 std::vector<std::shared_ptr<CoreGraph<Separator, AllTypes...>>>
69 std::vector<int>
71
72 public:
82 std::shared_ptr<CoreGraph<Separator, AllTypes...>> const coreGraph,
83 std::vector<int> deviceIds,
84 std::string const name = "Execution pipeline")
85 : abstraction::ExecutionPipelineNodeAbstraction(name, executionPipeline),
86 EPIM<Separator, AllTypes...>(executionPipeline),
87 EPOM<Separator, AllTypes...>(),
88 executionPipeline_(executionPipeline),
89 deviceIds_(std::move(deviceIds)) {
91 }
92
102 std::shared_ptr<CoreGraph<Separator, AllTypes...>> const coreGraph,
103 size_t numberGraphs,
104 std::string const name = "Execution pipeline")
105 : abstraction::ExecutionPipelineNodeAbstraction(name, executionPipeline),
106 EPIM<Separator, AllTypes...>(executionPipeline),
107 EPOM<Separator, AllTypes...>(),
108 executionPipeline_(executionPipeline),
109 deviceIds_(std::vector<int>(numberGraphs)) {
110 std::iota(deviceIds_.begin(), deviceIds_.end(), 0);
112 }
113
115 ~CoreExecutionPipeline() override = default;
116
118 void preRun() override {}
119
130 void run() override {
131 std::chrono::time_point<std::chrono::system_clock>
132 start,
133 finish;
134
135 using Inputs_t = typename EPIM<Separator, AllTypes...>::inputs_t;
136 using Indices = std::make_index_sequence<std::tuple_size_v<Inputs_t>>;
137
138 this->isActive(true);
139
140 // Actual computation loop
141 while (!this->canTerminate(true)) {
142 // Wait for a data to arrive or termination
143 start = std::chrono::system_clock::now();
144 volatile bool canTerminate = this->wait();
145 finish = std::chrono::system_clock::now();
146 this->incrementWaitDuration(std::chrono::duration_cast<std::chrono::nanoseconds>(finish - start));
147
148 // If te can terminate break the loop early
149 if (canTerminate) { break; }
150
151 // Operate the connectedReceivers to get a data and send it to execute
152 start = std::chrono::system_clock::now();
153 this->operateReceivers<Inputs_t>(Indices{});
154 finish = std::chrono::system_clock::now();
155 this->incrementExecutionDuration(std::chrono::duration_cast<std::chrono::nanoseconds>(finish - start));
156 }
157
158 // Do the shutdown phase
159 this->postRun();
160 // Wake up node that this is linked to
161 this->wakeUp();
162 }
163
165 void postRun() override {
166 this->disconnectSwitch();
167 for (auto &g : this->coreGraphs_) { g->waitForTermination(); }
168 this->isActive(false);
169 }
170
173 [[nodiscard]] std::string extraPrintingInformation() const override { return {}; }
174
177 void visit(Printer *printer) override {
178 // Register the node
179 if (printer->registerNode(this)) {
180 printer->printExecutionPipelineHeader(this, this->coreSwitch());
182 for (auto coreGraph : coreGraphs_) { coreGraph->visit(printer); }
184 }
185 }
186
190 NodeAbstraction::registerNode(belongingGraph);
191 for (auto coreGraph : coreGraphs_) { coreGraph->registerNode(belongingGraph); }
192 }
193
196 [[nodiscard]] std::pair<std::chrono::nanoseconds, std::chrono::nanoseconds> minMaxExecutionDuration() const override {
197 std::pair<std::chrono::nanoseconds, std::chrono::nanoseconds> minMaxExecDuration = {
198 std::chrono::nanoseconds::max(), std::chrono::nanoseconds::min()
199 };
200
201 for(auto const & graph : coreGraphs_){
202 auto const minMaxGraph = graph->minMaxExecutionDuration();
203 minMaxExecDuration.first = std::min(minMaxExecDuration.first, minMaxGraph.first);
204 minMaxExecDuration.second = std::max(minMaxExecDuration.second, minMaxGraph.second);
205 }
206 return minMaxExecDuration;
207 }
208
211 [[nodiscard]] std::pair<std::chrono::nanoseconds, std::chrono::nanoseconds> minMaxWaitDuration() const override {
212 std::pair<std::chrono::nanoseconds, std::chrono::nanoseconds> minMaxWaitDuration = {
213 std::chrono::nanoseconds::max(), std::chrono::nanoseconds::min()
214 };
215
216 for(auto const & graph : coreGraphs_){
217 auto const minMaxGraph = graph->minMaxWaitDuration();
218 minMaxWaitDuration.first = std::min(minMaxWaitDuration.first, minMaxGraph.first);
219 minMaxWaitDuration.second = std::max(minMaxWaitDuration.second, minMaxGraph.second);
220 }
221 return minMaxWaitDuration;
222 }
223
224 private:
230 if (coreGraph == nullptr) {
231 throw std::runtime_error("An execution pipeline should be created with a valid graph (!= nullptr).");
232 }
233 if (coreGraph->graphStatus() != abstraction::GraphNodeAbstraction::INIT) {
234 throw std::runtime_error("An execution pipeline should be created with a graph that is not already part of "
235 "another graph or being executed.");
236 }
237 if (deviceIds_.empty()) {
238 throw std::runtime_error(
239 "An execution pipeline should be created with a valid number of graph clones (!= 0) and with valid graph ids.");
240 }
241 coreGraphs_.reserve(deviceIds_.size());
242 registerAndDuplicateGraph(coreGraph);
245 }
246
251 void registerAndDuplicateGraph(std::shared_ptr<CoreGraph<Separator, AllTypes...>> const coreGraph) {
252 coreGraph->deviceId(deviceIds_.at(0));
253 coreGraph->graphId(0);
254 coreGraph->setInside();
255 coreGraphs_.push_back(coreGraph);
256 for (size_t graphId = 1; graphId < this->deviceIds_.size(); ++graphId) {
257 std::map<NodeAbstraction *, std::shared_ptr<NodeAbstraction>> correspondenceMap;
258 auto clone = std::dynamic_pointer_cast<CoreGraph<Separator, AllTypes...>>(
259 coreGraph->clone(correspondenceMap)
260 );
261 clone->deviceId(deviceIds_.at(graphId));
262 clone->graphId(graphId);
263 clone->setInside();
264 coreGraphs_.push_back(clone);
265 }
266 }
267
271 for (auto &coreGraph : coreGraphs_) {
274 }
275 }
276
280 template<class InputTypes, size_t ...Indices>
281 void operateReceivers(std::index_sequence<Indices...>) {
282 (this->template operateReceiver<std::tuple_element_t<Indices, InputTypes>>(), ...);
283 }
284
295 template<class Input>
297 this->lockSlotMutex();
298 auto typedReceiver = static_cast<abstraction::ReceiverAbstraction<Input> *>(this);
299 if (!typedReceiver->empty()) {
300 std::shared_ptr<Input> data = typedReceiver->getInputData();
301 this->unlockSlotMutex();
302 for (auto coreGraph : this->coreGraphs_) {
303 if (EPIM<Separator, AllTypes...>::callSendToGraph(data, coreGraph->graphId())) {
304 std::static_pointer_cast<abstraction::ReceiverAbstraction<Input>>(coreGraph)->receive(data);
305 std::static_pointer_cast<abstraction::SlotAbstraction>(coreGraph)->wakeUp();
306 }
307 }
308 } else {
309 this->unlockSlotMutex();
310 }
311 }
312
315 void launchGraphThreads(bool waitForInitialization) override {
316 for (auto coreGraph : this->coreGraphs_) {
317 coreGraph->createInnerGroupsAndLaunchThreads(waitForInitialization);
318 }
319 }
320
324 [[nodiscard]] std::string id() const override { return this->coreSwitch()->id(); }
325
329 std::shared_ptr<abstraction::NodeAbstraction> clone(
330 std::map<NodeAbstraction *, std::shared_ptr<NodeAbstraction>> &correspondenceMap) override {
331 return std::make_shared<CoreExecutionPipeline>(
332 this->executionPipeline_,
333 std::dynamic_pointer_cast<CoreGraph<Separator, AllTypes...>>(this->coreGraphs_[0]->clone(correspondenceMap)),
334 this->deviceIds_);
335 }
336
339 void duplicateEdge(std::map<NodeAbstraction *, std::shared_ptr<NodeAbstraction>> &mapping) override {
340 this->duplicateOutputEdges(mapping);
341 }
342
343};
344
345}
346}
347#endif //HEDGEHOG_CORE_EXECUTION_PIPELINE_H_
Hedgehog main namespace.
typename internals::Splitter< delta, Types... >::Inputs Inputs
Helper getting the input types from a list of template types (variadic)
typename internals::Splitter< delta, Types... >::Outputs Outputs
Helper getting the output types from a list of template types (variadic)
tool::ExecutionPipelineInputsManagementAbstractionTypeDeducer_t< tool::Inputs< Separator, AllTypes... > > EPIM
Type alias for an ExecutionPipelineInputsManagementAbstraction from the list of template parameters.
tool::ExecutionPipelineOutputsManagementAbstractionTypeDeducer_t< tool::Outputs< Separator, AllTypes... > > EPOM
Type alias for an ExecutionPipelineOutputsManagementAbstraction from the list of template parameters.
Execution pipeline abstraction.
Printer abstraction to get a snapshot of the metrics of the Hedgehog graph.
Definition: printer.h:52
bool registerNode(core::abstraction::NodeAbstraction const *nodeAbstraction)
Register a visited node.
Definition: printer.h:113
virtual void printExecutionPipelineFooter()=0
Print execution pipeline footer.
virtual void printExecutionPipelineHeader(core::abstraction::ExecutionPipelineNodeAbstraction const *ep, core::abstraction::NodeAbstraction const *switchNode)=0
Print execution pipeline header.
Core abstraction for clonable nodes, nodes duplicated by execution pipeline.
Core's abstraction to receive a piece of data.
std::shared_ptr< Input > getInputData()
Get an input data from the concrete receiver implementation.
ExecutionPipelineNodeAbstraction(std::string const &name, behavior::Node *node)
Constructor using the node name.
NodeAbstraction(std::string name)
Core node constructor using the core's name.
virtual size_t graphId() const
Get the graph identifier (got from belonging graph)
void incrementExecutionDuration(std::chrono::nanoseconds const &exec)
Increment execution duration.
GraphNodeAbstraction * belongingGraph() const
Belonging graph accessor.
std::string const & name() const
Accessor to the core's name.
void incrementWaitDuration(std::chrono::nanoseconds const &wait)
Increment the wait duration.
void setInitialized()
Set the task as initialized.
bool isActive() const
Accessor to task status.
void launchGraphThreads(bool waitForInitialization) override
Create graph's inner groups and launch graph's threads.
CoreExecutionPipeline(AbstractExecutionPipeline< Separator, AllTypes... > *const &executionPipeline, std::shared_ptr< CoreGraph< Separator, AllTypes... > > const coreGraph, size_t numberGraphs, std::string const name="Execution pipeline")
Constructor using a user-defined execution pipeline, the base CoreGraph, and the number of graphs in ...
void operateReceiver()
Operate the receiver for a specific input type.
std::pair< std::chrono::nanoseconds, std::chrono::nanoseconds > minMaxExecutionDuration() const override
Getter to the min max execution duration from the nodes inside the graphs in the execution pipeline.
AbstractExecutionPipeline< Separator, AllTypes... > *const executionPipeline_
Pointer to the execution pipeline node.
void postRun() override
Post run logic, disconnects the switch and waits for each graph to terminate.
std::pair< std::chrono::nanoseconds, std::chrono::nanoseconds > minMaxWaitDuration() const override
Getter to the min max wait duration from the nodes inside the graphs in the execution pipeline.
std::vector< int > deviceIds_
Device ids matching the core graphs.
void initializeCoreExecutionPipeline(std::shared_ptr< CoreGraph< Separator, AllTypes... > > const coreGraph)
Initialize the execution pipeline.
std::string extraPrintingInformation() const override
Extra printing information for the execution pipeline.
void visit(Printer *printer) override
Visit an execution pipeline.
void operateReceivers(std::index_sequence< Indices... >)
Operate the receivers for all input types.
void run() override
Main core execution pipeline logic.
std::vector< std::shared_ptr< CoreGraph< Separator, AllTypes... > > > coreGraphs_
Vector of CoreGraph handled by the execution pipeline.
std::shared_ptr< abstraction::NodeAbstraction > clone(std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &correspondenceMap) override
Clone method, to duplicate an execution pipeline when it is part of another graph in an execution pip...
void registerNode(abstraction::GraphNodeAbstraction *belongingGraph) override
Register the execution pipeline into the belongingGraph.
~CoreExecutionPipeline() override=default
Default destructor.
void duplicateEdge(std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &mapping) override
Duplicate the execution pipeline edge.
std::string id() const override
Get the exec pipeline id.
void connectCoreGraphs()
Connect all of the core graphs into the execution pipeline (connection to the switch and output of th...
void preRun() override
Do nothing as pre-run step.
void registerAndDuplicateGraph(std::shared_ptr< CoreGraph< Separator, AllTypes... > > const coreGraph)
Register and duplicate the coreGraph in the execution pipeline.
CoreExecutionPipeline(AbstractExecutionPipeline< Separator, AllTypes... > *const &executionPipeline, std::shared_ptr< CoreGraph< Separator, AllTypes... > > const coreGraph, std::vector< int > deviceIds, std::string const name="Execution pipeline")
Constructor using a user-defined execution pipeline, the base CoreGraph and the deviceIds to determin...
Graph core.
Definition: core_graph.h:81