21#ifndef HEDGEHOG_CORE_GRAPH_H 
   22#define HEDGEHOG_CORE_GRAPH_H 
   27#include "../abstractions/base/clonable_abstraction.h" 
   28#include "../abstractions/base/cleanable_abstraction.h" 
   29#include "../abstractions/base/node/graph_node_abstraction.h" 
   31#include "../abstractions/node/graph_inputs_management_abstraction.h" 
   32#include "../abstractions/node/graph_outputs_management_abstraction.h" 
   34#include "../abstractions/base/node/execution_pipeline_node_abstraction.h" 
   36#include "../../tools/traits.h" 
   37#include "../../tools/meta_functions.h" 
   38#include "../../api/graph/scheduler.h" 
   39#include "../../api/printer/printer.h" 
   40#include "../../api/graph/default_scheduler.h" 
   45#ifndef DOXYGEN_SHOULD_SKIP_THIS 
   49template<
size_t Separator, 
class ...AllTypes>
 
   56#ifndef DOXYGEN_SHOULD_SKIP_THIS 
   /// Forward declaration of the CoreExecutionPipeline class template.
   /// Only the name and its template parameters (a size_t separator followed by a
   /// variadic list of types) are introduced here; no definition is given at this point.
   60template<
size_t Separator, 
class ...AllTypes>
 
   61class CoreExecutionPipeline;
 
   /// Alias for the graph inputs management abstraction type: the result of
   /// tool::GraphInputsManagementAbstractionTypeDeducer_t applied to
   /// tool::Inputs<Separator, AllTypes...>, i.e. to the input types taken from the
   /// template parameter list.
   65template<
size_t Separator, 
class...AllTypes>
 
   66using GIM = tool::GraphInputsManagementAbstractionTypeDeducer_t<
tool::Inputs<Separator, AllTypes...>>;
 
   /// Alias for the graph outputs management abstraction type: the result of
   /// tool::GraphOutputsManagementAbstractionTypeDeducer_t applied to
   /// tool::Outputs<Separator, AllTypes...>, i.e. to the output types taken from the
   /// template parameter list.
   69template<
size_t Separator, 
class...AllTypes>
 
   70using GOM = tool::GraphOutputsManagementAbstractionTypeDeducer_t<
tool::Outputs<Separator, AllTypes...>>;
 
   75template<
size_t Separator, 
class ...AllTypes>
 
   80    public GIM<Separator, AllTypes...>,
 
   81    public GOM<Separator, AllTypes...> {
 
   85  std::unique_ptr<Scheduler> 
const 
   95                     std::unique_ptr<Scheduler> scheduler,
 
   98    this->source()->registerNode(
this);
 
   99    this->sink()->registerNode(
this);
 
  109    this->source()->registerNode(
this);
 
  110    this->sink()->registerNode(
this);
 
  123      class CoreInputTypes,
 
  128    using Input_t = 
tool::Intersect_t<CoreInputTypes, 
typename GIM<Separator, AllTypes...>::inputs_t>;
 
  129    using Indices = std::make_index_sequence<std::tuple_size_v<Input_t>>;
 
  132      std::ostringstream oss;
 
  133      oss << 
"The receiver node " << core->name()
 
  134          << 
"has a malformed core (missing inheritance to SlotAbstraction)";
 
  135      throw (std::runtime_error(oss.str()));
 
  138    callAddInputNodeToGraph<Input_t>(core, Indices{});
 
  149      class CoreInputTypes,
 
  153          typename GIM<Separator, AllTypes...>::inputs_t> InputCore>
 
  156    using input_t = 
tool::Intersect_t<CoreInputTypes, 
typename GIM<Separator, AllTypes...>::inputs_t>;
 
  157    using Indices = std::make_index_sequence<std::tuple_size_v<input_t>>;
 
  159    testAbstractReceivers<input_t>(core, Indices{});
 
  162    this->
template addInputNodeToGraph<InputType>(core);
 
  170      class CoreOutputTypes,
 
  174    using CommonTypes = 
tool::Intersect_t<CoreOutputTypes, 
typename GOM<Separator, AllTypes...>::outputs_t>;
 
  175    using Indices = std::make_index_sequence<std::tuple_size_v<CommonTypes>>;
 
  177    testAbstractSenders<CoreOutputTypes>(core, indices);
 
  179    callAddOutputNodeToGraph<CommonTypes>(core, indices);
 
  187  template<
class OutputType,
 
  188      class CoreOutputTypes,
 
  190          OutputType, CoreOutputTypes, 
typename GOM<Separator, AllTypes...
 
  191          >::outputs_t> OutputCore>
 
  195    using OutputType_t = std::tuple<OutputType>;
 
  196    using IndicesCommonTypes = std::make_index_sequence<std::tuple_size_v<OutputType_t>>;
 
  197    IndicesCommonTypes indices{};
 
  198    testAbstractSenders<OutputType_t>(core, indices);
 
  201    this->
template addOutputNodeToGraph<OutputType>(core);
 
  209  template<
class OutputTypesSenderTuple, 
class InputTypeReceiverTuple>
 
  215    using IndicesCommonTypes = std::make_index_sequence<std::tuple_size_v<CommonTypes>>;
 
  216    IndicesCommonTypes indices{};
 
  219        std::tuple_size_v<CommonTypes> != 0,
 
  220        "When adding an edge between two nodes, they should share at least one type." 
  223    testAbstractReceivers<CommonTypes>(receiverCore, indices);
 
  224    testAbstractSenders<CommonTypes>(senderCore, indices);
 
  230    drawEdges<CommonTypes>(senderCore, receiverCore, indices);
 
  239  template<
class CommonType,
 
  240      class OutputTypesSenderTuple,
 
  241      class InputTypeReceiverTuple>
 
  244    using CommonType_t = std::tuple<CommonType>;
 
  245    using Indices = std::make_index_sequence<std::tuple_size_v<CommonType_t>>;
 
  250        std::tuple_size_v<CommonTypes> != 0,
 
  251        "When adding an edge between two nodes, they should share at least one type.");
 
  253        tool::isContainedInTuple_v<CommonType, CommonTypes>,
 
  254        "The type should be part of the types shared between the two cores that you want to connect." 
  257    testAbstractReceivers<CommonType_t>(receiverCore, indices);
 
  258    testAbstractSenders<CommonType_t>(senderCore, indices);
 
  264    drawEdge<CommonType>(senderCore, receiverCore);
 
  274    auto finishCreationTime = std::chrono::system_clock::now();
 
  283    this->terminateSource();
 
  290    std::chrono::time_point<std::chrono::system_clock>
 
  291        endExecutionTimeStamp = std::chrono::system_clock::now();
 
  302    this->sendInputDataToSource(data);
 
  310      this->printSource(printer);
 
  311      this->printSink(printer);
 
  314        if (insideNodeGroups.second.size() == 0) {
 
  316            printableNode->visit(printer);
 
  318          printer->
printGroup(insideNodeGroups.first, insideNodeGroups.second);
 
  330      std::unordered_set<behavior::Cleanable *> cleanableSet;
 
  332        if (
auto cleanableReresentative =
 
  334          cleanableReresentative->gatherCleanable(cleanableSet);
 
  335          for (
auto copy : insideNodeGroups.second) {
 
  340      for (
auto &cleanableNode : cleanableSet) {
 
  341        cleanableNode->clean();
 
  344      throw std::runtime_error(
"It is not possible to clean a graph while it is terminating.");
 
  351  [[nodiscard]] std::vector<std::pair<std::string const, std::string const>> 
ids()
 const override {
 
  352    std::vector<std::pair<std::string const, std::string const>> idInputNodesAndGroups;
 
  354      for (
auto inputNotifier : inputNodes->notifiers()) {
 
  356          for (
auto const &
id : inputNode->ids()) { idInputNodesAndGroups.push_back(
id); }
 
  358          throw std::runtime_error(
"An input node should derive from NodeAbstraction.");
 
  362    return idInputNodesAndGroups;
 
  376    this->disconnectSource();
 
  377    this->disconnectSink();
 
  385    if (core->isRegistered()) {
 
  386      if (core->belongingGraph() != 
this) {
 
  387        std::ostringstream oss;
 
  388        oss << 
"You can not modify a graph that is connected inside another graph.";
 
  389        throw (std::runtime_error(oss.str()));
 
  393        core->registerNode(
this);
 
  396        std::ostringstream oss;
 
  397        oss << 
"You can not add a graph to itself.";
 
  398        throw (std::runtime_error(oss.str()));
 
  407  template<
class InputTypes, 
size_t ...Indices>
 
  409    (this->
template addInputNodeToGraph<std::tuple_element_t<Indices, InputTypes>>(core), ...);
 
  416  template<
class OutputTypes, 
size_t ...Indices>
 
  418    (this->
template addOutputNodeToGraph<std::tuple_element_t<Indices, OutputTypes>>(core), ...);
 
  429    if (notifierAbstraction == 
nullptr) {
 
  430      std::ostringstream oss;
 
  431      oss << 
"The sender node " << senderCore->name()
 
  432          << 
"has a malformed core (missing inheritance to NotifierAbstraction)";
 
  433      throw (std::runtime_error(oss.str()));
 
  436    if (slotAbstraction == 
nullptr) {
 
  437      std::ostringstream oss;
 
  438      oss << 
"The receiver node " << receiverCore->name()
 
  439          << 
"has a malformed core (missing inheritance to SlotAbstraction)";
 
  440      throw (std::runtime_error(oss.str()));
 
  443    for (
auto notifier : notifierAbstraction->notifiers()) {
 
  444      for (
auto slot : slotAbstraction->slots()) {
 
  445        notifier->addSlot(slot);
 
  455  template<
class CommonType>
 
  460    assert(senderAbstraction != 
nullptr && receiverAbstraction != 
nullptr);
 
  462    for (
auto sender : senderAbstraction->senders()) {
 
  463      for (
auto receiver : receiverAbstraction->receivers()) {
 
  464        sender->addReceiver(receiver);
 
  465        receiver->addSender(sender);
 
  475  template<
class CommonTypes, 
size_t ...Indexes>
 
  479      std::index_sequence<Indexes...>) {
 
  480    (drawEdge<std::tuple_element_t<Indexes, CommonTypes>>(senderCore, receiverCore), ...);
 
  488  template<
class TupleInputs, 
size_t... Indices>
 
  492        == 
nullptr) || ... )) {
 
  493      std::ostringstream oss;
 
  494      oss << 
"The node " << core->name()
 
  495          << 
" does not have a well defined core (missing inheritance from ReceiverAbstraction).";
 
  496      throw std::runtime_error(oss.str());
 
  505  template<
class TupleOutputs, 
size_t... Indices>
 
  509        == 
nullptr) || ... )) {
 
  510      std::ostringstream oss;
 
  511      oss << 
"The node " << core->name()
 
  512          << 
" does not have a well defined core (missing inheritance from SenderAbstraction).";
 
  513      throw std::runtime_error(oss.str());
 
  521      std::ostringstream oss;
 
  522      oss << 
"You can not modify a graph that is connected inside another graph: " << funcName;
 
  523      throw (std::runtime_error(oss.str()));
 
  532        copyableNode->createGroup(*this->insideNodesAndGroups_);
 
  533        for (
auto © : insideNode.second) {
 
  534          copy->registerNode(
this);
 
  537        graph->createInnerGroupsAndLaunchThreads(waitForInitialization);
 
  539        ep->launchGraphThreads(waitForInitialization);
 
  548    std::set<NodeAbstraction *> nodeAbstractions;
 
  550      nodeAbstractions.insert(insideNode.first);
 
  552          insideNode.second.cbegin(), insideNode.second.cend(),
 
  553          [&nodeAbstractions](
auto &nodeAbstraction) { nodeAbstractions.insert(nodeAbstraction); });
 
  555    scheduler_->spawnThreads(nodeAbstractions, waitForInitialization);
 
  560  void gatherCleanable(std::unordered_set<hh::behavior::Cleanable *> &cleanableSet)
 override {
 
  562      if (
auto cleanableReresentative =
 
  564        cleanableReresentative->gatherCleanable(cleanableSet);
 
  565        for (
auto copy : insideNodeGroups.second) {
 
  567            cleanableCopy->gatherCleanable(cleanableSet);
 
  568          } 
else { 
throw std::runtime_error(
"A copy of a cleanable node should be a cleanable node."); }
 
  578                                                               std::shared_ptr<NodeAbstraction>> &correspondenceMap)
 override {
 
  579    return std::make_shared<
CoreGraph<Separator, AllTypes...>>(*
this, correspondenceMap);
 
  586                            std::map<
NodeAbstraction *, std::shared_ptr<NodeAbstraction>> &correspondenceMap) {
 
  588    std::shared_ptr<NodeAbstraction> nodeClone;
 
  590    for (
auto &originalNode : originalInsideNodes) {
 
  592        if (!correspondenceMap.contains(originalNode.first)) {
 
  593          nodeClone = originalAsClonable->clone(correspondenceMap);
 
  596          correspondenceMap.insert({originalNode.first, nodeClone});
 
  601    this->duplicateSourceEdges(rhs, correspondenceMap);
 
  602    this->duplicateSinkEdges(rhs, correspondenceMap);
 
  604    for (
auto &originalNode : originalInsideNodes) {
 
  606        originalAsClonable->duplicateEdge(correspondenceMap);
 
  614    this->duplicateOutputEdges(mapping);
 
typename internals::Intersect< Tuple1, Tuple2 >::type Intersect_t
Helper getting the intersection of types between two type tuples.
typename internals::Splitter< delta, Types... >::Inputs Inputs
Helper getting the input types from a list of template types (variadic)
typename internals::Splitter< delta, Types... >::Outputs Outputs
Helper getting the output types from a list of template types (variadic)
tool::GraphOutputsManagementAbstractionTypeDeducer_t< tool::Outputs< Separator, AllTypes... > > GOM
Type alias for a GraphOutputsManagementAbstraction from the list of template parameters.
tool::GraphInputsManagementAbstractionTypeDeducer_t< tool::Inputs< Separator, AllTypes... > > GIM
Type alias for a GraphInputsManagementAbstraction from the list of template parameters.
Hedgehog graph abstraction.
Printer abstraction to get a snapshot of the metrics of the Hedgehog graph.
virtual void printGraphFooter(core::abstraction::GraphNodeAbstraction const *graph)=0
Print graph footer.
bool registerNode(core::abstraction::NodeAbstraction const *nodeAbstraction)
Register a visited node.
virtual void printGroup(core::abstraction::NodeAbstraction *representative, std::vector< core::abstraction::NodeAbstraction * > const &group)=0
Print group of nodes.
virtual void printGraphHeader(core::abstraction::GraphNodeAbstraction const *graph)=0
Print graph header.
Behavior abstraction for the base node.
Abstraction for cores/nodes that can form groups.
Abstraction for cleanable core.
CleanableAbstraction()=default
Constructor used by the CoreGraph to have the handles to clean inner cleanable nodes.
Core abstraction for clonable nodes, nodes duplicated by execution pipeline.
void storeClone(std::shared_ptr< abstraction::NodeAbstraction > const &clone)
Store a core clone.
Core abstraction to notify slots.
Core's abstraction to receive a piece of data.
Core abstraction to send data.
Core's abstraction to receive a signal.
std::set< NotifierAbstraction * > const & connectedNotifiers() const
Accessor to the NotifierAbstraction attached to this slot, protected with mutex.
void addNotifier(NotifierAbstraction *const notifier)
Add a NotifierAbstraction to this slot.
Abstraction specialized for the execution pipeline.
Base graph node abstraction.
std::chrono::nanoseconds const & graphConstructionDuration() const
Graph construction duration accessor.
GraphNodeAbstraction(std::string const &name)
Base graph abstraction.
Status graphStatus_
Group status.
std::chrono::time_point< std::chrono::system_clock > const & graphStartCreation() const
Graph start creation timestamp accessor.
std::unique_ptr< std::map< NodeAbstraction *, std::vector< NodeAbstraction * > > > const insideNodesAndGroups_
All nodes of the graph, mapped to their group nodes.
std::chrono::time_point< std::chrono::system_clock > const & startExecutionTimeStamp() const
Accessor to the starting execution timestamp.
NodeAbstraction(std::string name)
Core node constructor using the core's name.
bool isRegistered() const
Accessor to registration flag.
void incrementExecutionDuration(std::chrono::nanoseconds const &exec)
Increment execution duration.
std::string const & name() const
Accessor to the core's name.
PrintAbstraction, used to determine if the node should be visited by the printer and colored.
void cleanGraph()
Clean the graph.
std::unique_ptr< Scheduler > const scheduler_
Scheduler used by the graph.
void callAddInputNodeToGraph(NodeAbstraction *const core, std::index_sequence< Indices... >)
Call addInputNodeToGraph for all type-elements of a tuple.
void visit(Printer *printer) override
Visit the graph.
void joinThreads() override
Wait for the threads to join.
void drawEdges(NodeAbstraction *const senderCore, NodeAbstraction *const receiverCore, std::index_sequence< Indexes... >)
Do the actual typed connections between a sender and receiver.
void callAddOutputNodeToGraph(NodeAbstraction *const core, std::index_sequence< Indices... >)
Call addOutputNodeToGraph for all type-elements of a tuple.
std::shared_ptr< abstraction::NodeAbstraction > clone(std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &correspondenceMap) override
Clone the current graph.
void setInputForAllCommonTypes(InputCore *const core)
Connect a core's node as input of the graph for all compatible types.
void createInnerGroupsAndLaunchThreads(bool waitForInitialization) override
Create the inner groups and launch the threads inside of a graph.
void executeGraph(bool waitForInitialization)
Execute the graph.
void testAbstractSenders(NodeAbstraction *const core, std::index_sequence< Indices... >)
Test a core if it can send data of specific types.
~CoreGraph() override=default
Default destructor.
void testAbstractReceivers(NodeAbstraction *const core, std::index_sequence< Indices... >)
Test a core if it can receive data of specific types.
void testRegistered(auto const &funcName)
Test if the graph has been registered.
void addEdgeForACommonType(NodeAbstraction *const senderCore, NodeAbstraction *const receiverCore)
Connect two nodes together for a common type.
void setOutputForAllCommonTypes(OutputCore *const core)
Connect a core's node as output of the graph for all compatible types.
std::vector< std::pair< std::string const, std::string const > > ids() const override
Node ids [nodeId, nodeGroupId] accessor.
CoreGraph(CoreGraph const &rhs, std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &correspondenceMap)
Copy constructor using an added correspondence map.
void broadcastAndNotifyAllInputNodes(std::shared_ptr< CompatibleInputType_t > &data)
Broadcast a piece of input data to all input nodes.
behavior::Node * node() const override
Node accessor.
void connectNotifierToSlot(NodeAbstraction *senderCore, NodeAbstraction *receiverCore)
Connect a notifier to a slot, used when connecting two nodes.
void duplicateInsideNodes(CoreGraph const &rhs, std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &correspondenceMap)
Clone all of the inside nodes.
void finishPushingData()
Indicate to the graph that no more input will be sent, trigger the termination of the graph.
CoreGraph(std::string const &name, std::unique_ptr< Scheduler > scheduler, hh::Graph< Separator, AllTypes... > *graph)
Core Graph constructor using the name and the scheduler.
hh::Graph< Separator, AllTypes... > *const graph_
Graph attached to this core.
void waitForTermination()
Wait for the graph to terminate.
void launchThreads(bool waitForInitialization)
Launch the threads with the scheduler.
void duplicateEdge(std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &mapping) override
Duplicate the graph output's edges.
void gatherCleanable(std::unordered_set< hh::behavior::Cleanable * > &cleanableSet) override
Gather cleanable nodes.
void registerNodeInsideGraph(NodeAbstraction *const core) override
Register a node inside of a graph.
void addEdgeForAllCommonTypes(NodeAbstraction *const senderCore, NodeAbstraction *const receiverCore)
Connect two nodes together for all common types.
void drawEdge(NodeAbstraction *const senderCore, NodeAbstraction *const receiverCore)
Do the actual typed connection between a sender and receiver.
void setInputForACommonType(InputCore *const core)
Connect a core's node as input of the graph for a compatible input type.
void setOutputForACommonType(OutputCore *const core)
Connect a core's node as output of the graph for a compatible type.
void setInside() override
Set the graph as inside of another graph.
Test if an input type is in the list of input types (tuple)
Test if a core can be input of a graph (derives from core::abstraction::NodeAbstraction and shares at...
Test if a core can be input of a graph for a type (derives from core::abstraction::NodeAbstraction an...
Test if a core can be output of a graph (derives from core::abstraction::NodeAbstraction and shares a...
Test if a core can be output of a graph for a type (derives from core::abstraction::NodeAbstraction a...