19#ifndef HEDGEHOG_CORE_EXECUTION_PIPELINE_H_ 
   20#define HEDGEHOG_CORE_EXECUTION_PIPELINE_H_ 
   24#include "../../tools/traits.h" 
   25#include "../../api/graph/graph.h" 
   27#include "../abstractions/base/node/execution_pipeline_node_abstraction.h" 
   28#include "../abstractions/node/execution_pipeline_inputs_management_abstraction.h" 
   29#include "../abstractions/node/execution_pipeline_outputs_management_abstraction.h" 
   34#ifndef DOXYGEN_SHOULD_SKIP_THIS 
   38template<
size_t Separator, 
class ...AllTypes>
 
   39class AbstractExecutionPipeline;
 
   46template<
size_t Separator, 
class ...AllTypes>
 
   48    tool::ExecutionPipelineInputsManagementAbstractionTypeDeducer_t<
tool::Inputs<Separator, AllTypes...>>;
 
   51template<
size_t Separator, 
class ...AllTypes>
 
   53    tool::ExecutionPipelineOutputsManagementAbstractionTypeDeducer_t<
tool::Outputs<Separator, AllTypes...>>;
 
   58template<
size_t Separator, 
class ...AllTypes>
 
   62      public EPIM<Separator, AllTypes...>,
 
   63      public EPOM<Separator, AllTypes...> {
 
   67  std::vector<std::shared_ptr<
CoreGraph<Separator, AllTypes...>>>
 
   83      std::vector<int> deviceIds,
 
   84      std::string 
const name = 
"Execution pipeline")
 
   86        EPIM<Separator, AllTypes...>(executionPipeline),
 
   87        EPOM<Separator, AllTypes...>(),
 
  104      std::string 
const name = 
"Execution pipeline")
 
  106        EPIM<Separator, AllTypes...>(executionPipeline),
 
  107        EPOM<Separator, AllTypes...>(),
 
  131    std::chrono::time_point<std::chrono::system_clock>
 
  135    using Inputs_t = 
typename EPIM<Separator, AllTypes...>::inputs_t;
 
  136    using Indices = std::make_index_sequence<std::tuple_size_v<Inputs_t>>;
 
  141    while (!this->canTerminate(
true)) {
 
  143      start = std::chrono::system_clock::now();
 
  144      volatile bool canTerminate = this->wait();
 
  145      finish = std::chrono::system_clock::now();
 
  149      if (canTerminate) { 
break; }
 
  152      start = std::chrono::system_clock::now();
 
  153      this->operateReceivers<Inputs_t>(Indices{});
 
  154      finish = std::chrono::system_clock::now();
 
  166    this->disconnectSwitch();
 
  167    for (
auto &g : this->
coreGraphs_) { g->waitForTermination(); }
 
  182      for (
auto coreGraph : 
coreGraphs_) { coreGraph->visit(printer); }
 
  197    std::pair<std::chrono::nanoseconds, std::chrono::nanoseconds> minMaxExecDuration = {
 
  198        std::chrono::nanoseconds::max(), std::chrono::nanoseconds::min()
 
  202      auto const minMaxGraph = graph->minMaxExecutionDuration();
 
  203      minMaxExecDuration.first = std::min(minMaxExecDuration.first, minMaxGraph.first);
 
  204      minMaxExecDuration.second = std::max(minMaxExecDuration.second, minMaxGraph.second);
 
  206    return minMaxExecDuration;
 
  211  [[nodiscard]] std::pair<std::chrono::nanoseconds, std::chrono::nanoseconds> 
minMaxWaitDuration()
 const override {
 
  213        std::chrono::nanoseconds::max(), std::chrono::nanoseconds::min()
 
  217      auto const minMaxGraph = graph->minMaxWaitDuration();
 
  230    if (coreGraph == 
nullptr) {
 
  231      throw std::runtime_error(
"An execution pipeline should be created with a valid graph (!= nullptr).");
 
  234      throw std::runtime_error(
"An execution pipeline should be created with a graph that is not already part of " 
  235                               "another graph or being executed.");
 
  238      throw std::runtime_error(
 
  239          "An execution pipeline should be created with a valid number of graph clones (!= 0) and with valid graph ids.");
 
  253    coreGraph->graphId(0);
 
  254    coreGraph->setInside();
 
  257      std::map<NodeAbstraction *, std::shared_ptr<NodeAbstraction>> correspondenceMap;
 
  258      auto clone = std::dynamic_pointer_cast<
CoreGraph<Separator, AllTypes...>>(
 
  259          coreGraph->clone(correspondenceMap)
 
  280  template<
class InputTypes, 
size_t ...Indices>
 
  282    (this->
template operateReceiver<std::tuple_element_t<Indices, InputTypes>>(), ...);
 
  295  template<
class Input>
 
  297    this->lockSlotMutex();
 
  299    if (!typedReceiver->empty()) {
 
  300      std::shared_ptr<Input> data = typedReceiver->
getInputData();
 
  301      this->unlockSlotMutex();
 
  304          std::static_pointer_cast<abstraction::ReceiverAbstraction<Input>>(coreGraph)->receive(data);
 
  305          std::static_pointer_cast<abstraction::SlotAbstraction>(coreGraph)->wakeUp();
 
  309      this->unlockSlotMutex();
 
  317      coreGraph->createInnerGroupsAndLaunchThreads(waitForInitialization);
 
  324  [[nodiscard]] std::string 
id()
 const override { 
return this->coreSwitch()->id(); }
 
  329  std::shared_ptr<abstraction::NodeAbstraction> 
clone(
 
  330      std::map<
NodeAbstraction *, std::shared_ptr<NodeAbstraction>> &correspondenceMap)
 override {
 
  331    return std::make_shared<CoreExecutionPipeline>(
 
  340    this->duplicateOutputEdges(mapping);
 
typename internals::Splitter< delta, Types... >::Inputs Inputs
Helper getting the input types from a list of template types (variadic)
typename internals::Splitter< delta, Types... >::Outputs Outputs
Helper getting the output types from a list of template types (variadic)
tool::ExecutionPipelineInputsManagementAbstractionTypeDeducer_t< tool::Inputs< Separator, AllTypes... > > EPIM
Type alias for an ExecutionPipelineInputsManagementAbstraction from the list of template parameters.
tool::ExecutionPipelineOutputsManagementAbstractionTypeDeducer_t< tool::Outputs< Separator, AllTypes... > > EPOM
Type alias for an ExecutionPipelineOutputsManagementAbstraction from the list of template parameters.
Execution pipeline abstraction.
Printer abstraction to get a snapshot of the metrics of the Hedgehog graph.
bool registerNode(core::abstraction::NodeAbstraction const *nodeAbstraction)
Register a visited node.
virtual void printExecutionPipelineFooter()=0
Print execution pipeline footer.
virtual void printExecutionPipelineHeader(core::abstraction::ExecutionPipelineNodeAbstraction const *ep, core::abstraction::NodeAbstraction const *switchNode)=0
Print execution pipeline header.
Core abstraction for clonable nodes, nodes duplicated by execution pipeline.
Core's abstraction to receive a piece of data.
std::shared_ptr< Input > getInputData()
Get an input data from the concrete receiver implementation.
Abstraction specialized for the execution pipeline.
ExecutionPipelineNodeAbstraction(std::string const &name, behavior::Node *node)
Constructor using the node name.
Base graph node abstraction.
NodeAbstraction(std::string name)
Core node constructor using the core's name.
virtual size_t graphId() const
Get the graph identifier (got from belonging graph)
void incrementExecutionDuration(std::chrono::nanoseconds const &exec)
Increment execution duration.
GraphNodeAbstraction * belongingGraph() const
Belonging graph accessor.
std::string const & name() const
Accessor to the core's name.
void incrementWaitDuration(std::chrono::nanoseconds const &wait)
Increment the wait duration.
void setInitialized()
Set the task as initialized.
bool isActive() const
Accessor to task status.
void launchGraphThreads(bool waitForInitialization) override
Create graph's inner groups and launch graph's threads.
CoreExecutionPipeline(AbstractExecutionPipeline< Separator, AllTypes... > *const &executionPipeline, std::shared_ptr< CoreGraph< Separator, AllTypes... > > const coreGraph, size_t numberGraphs, std::string const name="Execution pipeline")
Constructor using a user-defined execution pipeline, the base CoreGraph, and the number of graphs in ...
void operateReceiver()
Operate the receiver for a specific input type.
std::pair< std::chrono::nanoseconds, std::chrono::nanoseconds > minMaxExecutionDuration() const override
Getter to the min max execution duration from the nodes inside the graphs in the execution pipeline.
AbstractExecutionPipeline< Separator, AllTypes... > *const executionPipeline_
Pointer to the execution pipeline node.
void postRun() override
Post run logic, disconnects the switch and waits for each graph to terminate.
std::pair< std::chrono::nanoseconds, std::chrono::nanoseconds > minMaxWaitDuration() const override
Getter to the min max wait duration from the nodes inside the graphs in the execution pipeline.
std::vector< int > deviceIds_
Device ids matching the core graphs.
void initializeCoreExecutionPipeline(std::shared_ptr< CoreGraph< Separator, AllTypes... > > const coreGraph)
Initialize the execution pipeline.
std::string extraPrintingInformation() const override
Extra printing information for the execution pipeline.
void visit(Printer *printer) override
Visit an execution pipeline.
void operateReceivers(std::index_sequence< Indices... >)
Operate the receivers for all input types.
void run() override
Main core execution pipeline logic.
std::vector< std::shared_ptr< CoreGraph< Separator, AllTypes... > > > coreGraphs_
Vector of CoreGraph handled by the execution pipeline.
std::shared_ptr< abstraction::NodeAbstraction > clone(std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &correspondenceMap) override
Clone method, to duplicate an execution pipeline when it is part of another graph in an execution pip...
void registerNode(abstraction::GraphNodeAbstraction *belongingGraph) override
Register the execution pipeline into the belongingGraph.
~CoreExecutionPipeline() override=default
Default destructor.
void duplicateEdge(std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &mapping) override
Duplicate the execution pipeline edge.
std::string id() const override
Get the exec pipeline id.
void connectCoreGraphs()
Connect all of the core graphs into the execution pipeline (connection to the switch and output of th...
void preRun() override
Do nothing as pre-run step.
void registerAndDuplicateGraph(std::shared_ptr< CoreGraph< Separator, AllTypes... > > const coreGraph)
Register and duplicate the coreGraph in the execution pipeline.
CoreExecutionPipeline(AbstractExecutionPipeline< Separator, AllTypes... > *const &executionPipeline, std::shared_ptr< CoreGraph< Separator, AllTypes... > > const coreGraph, std::vector< int > deviceIds, std::string const name="Execution pipeline")
Constructor using a user-defined execution pipeline, the base CoreGraph and the deviceIds to determin...