19#ifndef HEDGEHOG_CORE_EXECUTION_PIPELINE_H_
20#define HEDGEHOG_CORE_EXECUTION_PIPELINE_H_
24#include "../../tools/traits.h"
25#include "../../api/graph/graph.h"
27#include "../abstractions/base/node/execution_pipeline_node_abstraction.h"
28#include "../abstractions/node/execution_pipeline_inputs_management_abstraction.h"
29#include "../abstractions/node/execution_pipeline_outputs_management_abstraction.h"
34#ifndef DOXYGEN_SHOULD_SKIP_THIS
/// @brief Forward declaration of AbstractExecutionPipeline, the user-defined execution pipeline node that
/// CoreExecutionPipeline keeps a pointer to.
/// @tparam Separator Separator used to split AllTypes into input types and output types
/// (presumably the count of input types - confirm against tool::Inputs / tool::Outputs)
/// @tparam AllTypes List of input and output types
template<size_t Separator, class ...AllTypes>
class AbstractExecutionPipeline;
46template<
size_t Separator,
class ...AllTypes>
48 tool::ExecutionPipelineInputsManagementAbstractionTypeDeducer_t<
tool::Inputs<Separator, AllTypes...>>;
51template<
size_t Separator,
class ...AllTypes>
53 tool::ExecutionPipelineOutputsManagementAbstractionTypeDeducer_t<
tool::Outputs<Separator, AllTypes...>>;
58template<
size_t Separator,
class ...AllTypes>
62 public EPIM<Separator, AllTypes...>,
63 public EPOM<Separator, AllTypes...> {
67 std::vector<std::shared_ptr<
CoreGraph<Separator, AllTypes...>>>
83 std::vector<int> deviceIds,
84 std::string
const name =
"Execution pipeline")
86 EPIM<Separator, AllTypes...>(executionPipeline),
87 EPOM<Separator, AllTypes...>(),
104 std::string
const name =
"Execution pipeline")
106 EPIM<Separator, AllTypes...>(executionPipeline),
107 EPOM<Separator, AllTypes...>(),
131 std::chrono::time_point<std::chrono::system_clock>
135 using Inputs_t =
typename EPIM<Separator, AllTypes...>::inputs_t;
136 using Indices = std::make_index_sequence<std::tuple_size_v<Inputs_t>>;
141 while (!this->canTerminate(
true)) {
143 start = std::chrono::system_clock::now();
144 volatile bool canTerminate = this->wait();
145 finish = std::chrono::system_clock::now();
149 if (canTerminate) {
break; }
152 start = std::chrono::system_clock::now();
153 this->operateReceivers<Inputs_t>(Indices{});
154 finish = std::chrono::system_clock::now();
166 this->disconnectSwitch();
167 for (
auto &g : this->
coreGraphs_) { g->waitForTermination(); }
182 for (
auto coreGraph :
coreGraphs_) { coreGraph->visit(printer); }
197 std::pair<std::chrono::nanoseconds, std::chrono::nanoseconds> minMaxExecDuration = {
198 std::chrono::nanoseconds::max(), std::chrono::nanoseconds::min()
202 auto const minMaxGraph = graph->minMaxExecutionDuration();
203 minMaxExecDuration.first = std::min(minMaxExecDuration.first, minMaxGraph.first);
204 minMaxExecDuration.second = std::max(minMaxExecDuration.second, minMaxGraph.second);
206 return minMaxExecDuration;
211 [[nodiscard]] std::pair<std::chrono::nanoseconds, std::chrono::nanoseconds>
minMaxWaitDuration()
const override {
213 std::chrono::nanoseconds::max(), std::chrono::nanoseconds::min()
217 auto const minMaxGraph = graph->minMaxWaitDuration();
230 if (coreGraph ==
nullptr) {
231 throw std::runtime_error(
"An execution pipeline should be created with a valid graph (!= nullptr).");
234 throw std::runtime_error(
"An execution pipeline should be created with a graph that is not already part of "
235 "another graph or being executed.");
238 throw std::runtime_error(
239 "An execution pipeline should be created with a valid number of graph clones (!= 0) and with valid graph ids.");
253 coreGraph->graphId(0);
254 coreGraph->setInside();
257 std::map<NodeAbstraction *, std::shared_ptr<NodeAbstraction>> correspondenceMap;
258 auto clone = std::dynamic_pointer_cast<
CoreGraph<Separator, AllTypes...>>(
259 coreGraph->clone(correspondenceMap)
280 template<
class InputTypes,
size_t ...Indices>
282 (this->
template operateReceiver<std::tuple_element_t<Indices, InputTypes>>(), ...);
295 template<
class Input>
297 this->lockSlotMutex();
299 if (!typedReceiver->empty()) {
300 std::shared_ptr<Input> data = typedReceiver->
getInputData();
301 this->unlockSlotMutex();
304 std::static_pointer_cast<abstraction::ReceiverAbstraction<Input>>(coreGraph)->receive(data);
305 std::static_pointer_cast<abstraction::SlotAbstraction>(coreGraph)->wakeUp();
309 this->unlockSlotMutex();
317 coreGraph->createInnerGroupsAndLaunchThreads(waitForInitialization);
324 [[nodiscard]] std::string
id()
const override {
return this->coreSwitch()->id(); }
329 std::shared_ptr<abstraction::NodeAbstraction>
clone(
330 std::map<
NodeAbstraction *, std::shared_ptr<NodeAbstraction>> &correspondenceMap)
override {
331 return std::make_shared<CoreExecutionPipeline>(
340 this->duplicateOutputEdges(mapping);
typename internals::Splitter< delta, Types... >::Inputs Inputs
Helper getting the input types from a list of template types (variadic)
typename internals::Splitter< delta, Types... >::Outputs Outputs
Helper getting the output types from a list of template types (variadic)
tool::ExecutionPipelineInputsManagementAbstractionTypeDeducer_t< tool::Inputs< Separator, AllTypes... > > EPIM
Type alias for an ExecutionPipelineInputsManagementAbstraction from the list of template parameters.
tool::ExecutionPipelineOutputsManagementAbstractionTypeDeducer_t< tool::Outputs< Separator, AllTypes... > > EPOM
Type alias for an ExecutionPipelineOutputsManagementAbstraction from the list of template parameters.
Execution pipeline abstraction.
Printer abstraction to get a snapshot of the metrics of the Hedgehog graph.
bool registerNode(core::abstraction::NodeAbstraction const *nodeAbstraction)
Register a visited node.
virtual void printExecutionPipelineFooter()=0
Print execution pipeline footer.
virtual void printExecutionPipelineHeader(core::abstraction::ExecutionPipelineNodeAbstraction const *ep, core::abstraction::NodeAbstraction const *switchNode)=0
Print execution pipeline header.
Core abstraction for clonable nodes, nodes duplicated by execution pipeline.
Core's abstraction to receive a piece of data.
std::shared_ptr< Input > getInputData()
Get an input data from the concrete receiver implementation.
Abstraction specialized for the execution pipeline.
ExecutionPipelineNodeAbstraction(std::string const &name, behavior::Node *node)
Constructor using the node name.
Base graph node abstraction.
NodeAbstraction(std::string name)
Core node constructor using the core's name.
virtual size_t graphId() const
Get the graph identifier (obtained from the belonging graph)
void incrementExecutionDuration(std::chrono::nanoseconds const &exec)
Increment execution duration.
GraphNodeAbstraction * belongingGraph() const
Belonging graph accessor.
std::string const & name() const
Accessor to the core's name.
void incrementWaitDuration(std::chrono::nanoseconds const &wait)
Increment the wait duration.
void setInitialized()
Set the task as initialized.
bool isActive() const
Accessor to task status.
void launchGraphThreads(bool waitForInitialization) override
Create graph's inner groups and launch graph's threads.
CoreExecutionPipeline(AbstractExecutionPipeline< Separator, AllTypes... > *const &executionPipeline, std::shared_ptr< CoreGraph< Separator, AllTypes... > > const coreGraph, size_t numberGraphs, std::string const name="Execution pipeline")
Constructor using a user-defined execution pipeline, the base CoreGraph, and the number of graphs in the execution pipeline.
void operateReceiver()
Operate the receiver for a specific input type.
std::pair< std::chrono::nanoseconds, std::chrono::nanoseconds > minMaxExecutionDuration() const override
Getter to the min max execution duration from the nodes inside the graphs in the execution pipeline.
AbstractExecutionPipeline< Separator, AllTypes... > *const executionPipeline_
Pointer to the execution pipeline node.
void postRun() override
Post run logic, disconnects the switch and waits for each graph to terminate.
std::pair< std::chrono::nanoseconds, std::chrono::nanoseconds > minMaxWaitDuration() const override
Getter to the min max wait duration from the nodes inside the graphs in the execution pipeline.
std::vector< int > deviceIds_
Device ids matching the core graphs.
void initializeCoreExecutionPipeline(std::shared_ptr< CoreGraph< Separator, AllTypes... > > const coreGraph)
Initialize the execution pipeline.
std::string extraPrintingInformation() const override
Extra printing information for the execution pipeline.
void visit(Printer *printer) override
Visit an execution pipeline.
void operateReceivers(std::index_sequence< Indices... >)
Operate the receivers for all input types.
void run() override
Main core execution pipeline logic.
std::vector< std::shared_ptr< CoreGraph< Separator, AllTypes... > > > coreGraphs_
Vector of CoreGraph handled by the execution pipeline.
std::shared_ptr< abstraction::NodeAbstraction > clone(std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &correspondenceMap) override
Clone method, to duplicate an execution pipeline when it is part of another graph in an execution pipeline.
void registerNode(abstraction::GraphNodeAbstraction *belongingGraph) override
Register the execution pipeline into the belongingGraph.
~CoreExecutionPipeline() override=default
Default destructor.
void duplicateEdge(std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &mapping) override
Duplicate the execution pipeline edge.
std::string id() const override
Get the exec pipeline id.
void connectCoreGraphs()
Connect all of the core graphs into the execution pipeline (connection to the switch and output of the graphs).
void preRun() override
Do nothing as pre-run step.
void registerAndDuplicateGraph(std::shared_ptr< CoreGraph< Separator, AllTypes... > > const coreGraph)
Register and duplicate the coreGraph in the execution pipeline.
CoreExecutionPipeline(AbstractExecutionPipeline< Separator, AllTypes... > *const &executionPipeline, std::shared_ptr< CoreGraph< Separator, AllTypes... > > const coreGraph, std::vector< int > deviceIds, std::string const name="Execution pipeline")
Constructor using a user-defined execution pipeline, the base CoreGraph and the deviceIds to determine the number of graphs in the execution pipeline.