21#ifndef HEDGEHOG_CORE_GRAPH_H
22#define HEDGEHOG_CORE_GRAPH_H
27#include "../abstractions/base/clonable_abstraction.h"
28#include "../abstractions/base/cleanable_abstraction.h"
29#include "../abstractions/base/node/graph_node_abstraction.h"
31#include "../abstractions/node/graph_inputs_management_abstraction.h"
32#include "../abstractions/node/graph_outputs_management_abstraction.h"
34#include "../abstractions/base/node/execution_pipeline_node_abstraction.h"
36#include "../../tools/traits.h"
37#include "../../tools/meta_functions.h"
38#include "../../api/graph/scheduler.h"
39#include "../../api/printer/printer.h"
40#include "../../api/graph/default_scheduler.h"
45#ifndef DOXYGEN_SHOULD_SKIP_THIS
49template<
size_t Separator,
class ...AllTypes>
56#ifndef DOXYGEN_SHOULD_SKIP_THIS
/// @brief Forward declaration of the CoreExecutionPipeline class template (declared here, defined elsewhere)
/// @tparam Separator Compile-time size_t parameter; presumably the split point between input and output
/// types in AllTypes, as it is used for the GIM / GOM aliases of this file - TODO confirm
/// @tparam AllTypes Variadic list of types
60template<
size_t Separator,
class ...AllTypes>
61class CoreExecutionPipeline;
/// @brief Type alias for a GraphInputsManagementAbstraction deduced from the list of template parameters
/// @details The input types are obtained with tool::Inputs<Separator, AllTypes...> and handed to
/// tool::GraphInputsManagementAbstractionTypeDeducer_t, which yields the aliased type.
/// @tparam Separator Value forwarded to tool::Inputs to extract the input types from AllTypes
/// @tparam AllTypes Variadic list of types from which the input types are extracted
65template<
size_t Separator,
class...AllTypes>
66using GIM = tool::GraphInputsManagementAbstractionTypeDeducer_t<
tool::Inputs<Separator, AllTypes...>>;
/// @brief Type alias for a GraphOutputsManagementAbstraction deduced from the list of template parameters
/// @details The output types are obtained with tool::Outputs<Separator, AllTypes...> and handed to
/// tool::GraphOutputsManagementAbstractionTypeDeducer_t, which yields the aliased type.
/// @tparam Separator Value forwarded to tool::Outputs to extract the output types from AllTypes
/// @tparam AllTypes Variadic list of types from which the output types are extracted
69template<
size_t Separator,
class...AllTypes>
70using GOM = tool::GraphOutputsManagementAbstractionTypeDeducer_t<
tool::Outputs<Separator, AllTypes...>>;
75template<
size_t Separator,
class ...AllTypes>
80 public GIM<Separator, AllTypes...>,
81 public GOM<Separator, AllTypes...> {
85 std::unique_ptr<Scheduler>
const
95 std::unique_ptr<Scheduler> scheduler,
98 this->source()->registerNode(
this);
99 this->sink()->registerNode(
this);
109 this->source()->registerNode(
this);
110 this->sink()->registerNode(
this);
123 class CoreInputTypes,
128 using Input_t =
tool::Intersect_t<CoreInputTypes,
typename GIM<Separator, AllTypes...>::inputs_t>;
129 using Indices = std::make_index_sequence<std::tuple_size_v<Input_t>>;
132 std::ostringstream oss;
133 oss <<
"The receiver node " << core->name()
134 <<
"has a malformed core (missing inheritance to SlotAbstraction)";
135 throw (std::runtime_error(oss.str()));
138 callAddInputNodeToGraph<Input_t>(core, Indices{});
149 class CoreInputTypes,
153 typename GIM<Separator, AllTypes...>::inputs_t> InputCore>
156 using input_t =
tool::Intersect_t<CoreInputTypes,
typename GIM<Separator, AllTypes...>::inputs_t>;
157 using Indices = std::make_index_sequence<std::tuple_size_v<input_t>>;
159 testAbstractReceivers<input_t>(core, Indices{});
162 this->
template addInputNodeToGraph<InputType>(core);
170 class CoreOutputTypes,
174 using CommonTypes =
tool::Intersect_t<CoreOutputTypes,
typename GOM<Separator, AllTypes...>::outputs_t>;
175 using Indices = std::make_index_sequence<std::tuple_size_v<CommonTypes>>;
177 testAbstractSenders<CoreOutputTypes>(core, indices);
179 callAddOutputNodeToGraph<CommonTypes>(core, indices);
187 template<
class OutputType,
188 class CoreOutputTypes,
190 OutputType, CoreOutputTypes,
typename GOM<Separator, AllTypes...
191 >::outputs_t> OutputCore>
195 using OutputType_t = std::tuple<OutputType>;
196 using IndicesCommonTypes = std::make_index_sequence<std::tuple_size_v<OutputType_t>>;
197 IndicesCommonTypes indices{};
198 testAbstractSenders<OutputType_t>(core, indices);
201 this->
template addOutputNodeToGraph<OutputType>(core);
209 template<
class OutputTypesSenderTuple,
class InputTypeReceiverTuple>
215 using IndicesCommonTypes = std::make_index_sequence<std::tuple_size_v<CommonTypes>>;
216 IndicesCommonTypes indices{};
219 std::tuple_size_v<CommonTypes> != 0,
220 "When adding an edge between two nodes, they should share at least one type."
223 testAbstractReceivers<CommonTypes>(receiverCore, indices);
224 testAbstractSenders<CommonTypes>(senderCore, indices);
230 drawEdges<CommonTypes>(senderCore, receiverCore, indices);
239 template<
class CommonType,
240 class OutputTypesSenderTuple,
241 class InputTypeReceiverTuple>
244 using CommonType_t = std::tuple<CommonType>;
245 using Indices = std::make_index_sequence<std::tuple_size_v<CommonType_t>>;
250 std::tuple_size_v<CommonTypes> != 0,
251 "When adding an edge between two nodes, they should share at least one type.");
253 tool::isContainedInTuple_v<CommonType, CommonTypes>,
254 "The type should be part of the types shared between the two cores that you want to connect."
257 testAbstractReceivers<CommonType_t>(receiverCore, indices);
258 testAbstractSenders<CommonType_t>(senderCore, indices);
264 drawEdge<CommonType>(senderCore, receiverCore);
274 auto finishCreationTime = std::chrono::system_clock::now();
283 this->terminateSource();
290 std::chrono::time_point<std::chrono::system_clock>
291 endExecutionTimeStamp = std::chrono::system_clock::now();
302 this->sendInputDataToSource(data);
310 this->printSource(printer);
311 this->printSink(printer);
314 if (insideNodeGroups.second.size() == 0) {
316 printableNode->visit(printer);
318 printer->
printGroup(insideNodeGroups.first, insideNodeGroups.second);
330 std::unordered_set<behavior::Cleanable *> cleanableSet;
332 if (
auto cleanableReresentative =
334 cleanableReresentative->gatherCleanable(cleanableSet);
335 for (
auto copy : insideNodeGroups.second) {
340 for (
auto &cleanableNode : cleanableSet) {
341 cleanableNode->clean();
344 throw std::runtime_error(
"It is not possible to clean a graph while it is terminating.");
351 [[nodiscard]] std::vector<std::pair<std::string const, std::string const>>
ids()
const override {
352 std::vector<std::pair<std::string const, std::string const>> idInputNodesAndGroups;
354 for (
auto inputNotifier : inputNodes->notifiers()) {
356 for (
auto const &
id : inputNode->ids()) { idInputNodesAndGroups.push_back(
id); }
358 throw std::runtime_error(
"An input node should derive from NodeAbstraction.");
362 return idInputNodesAndGroups;
376 this->disconnectSource();
377 this->disconnectSink();
385 if (core->isRegistered()) {
386 if (core->belongingGraph() !=
this) {
387 std::ostringstream oss;
388 oss <<
"You can not modify a graph that is connected inside another graph.";
389 throw (std::runtime_error(oss.str()));
393 core->registerNode(
this);
396 std::ostringstream oss;
397 oss <<
"You can not add a graph to itself.";
398 throw (std::runtime_error(oss.str()));
407 template<
class InputTypes,
size_t ...Indices>
409 (this->
template addInputNodeToGraph<std::tuple_element_t<Indices, InputTypes>>(core), ...);
416 template<
class OutputTypes,
size_t ...Indices>
418 (this->
template addOutputNodeToGraph<std::tuple_element_t<Indices, OutputTypes>>(core), ...);
429 if (notifierAbstraction ==
nullptr) {
430 std::ostringstream oss;
431 oss <<
"The sender node " << senderCore->name()
432 <<
"has a malformed core (missing inheritance to NotifierAbstraction)";
433 throw (std::runtime_error(oss.str()));
436 if (slotAbstraction ==
nullptr) {
437 std::ostringstream oss;
438 oss <<
"The receiver node " << receiverCore->name()
439 <<
"has a malformed core (missing inheritance to SlotAbstraction)";
440 throw (std::runtime_error(oss.str()));
443 for (
auto notifier : notifierAbstraction->notifiers()) {
444 for (
auto slot : slotAbstraction->slots()) {
445 notifier->addSlot(slot);
455 template<
class CommonType>
460 assert(senderAbstraction !=
nullptr && receiverAbstraction !=
nullptr);
462 for (
auto sender : senderAbstraction->senders()) {
463 for (
auto receiver : receiverAbstraction->receivers()) {
464 sender->addReceiver(receiver);
465 receiver->addSender(sender);
475 template<
class CommonTypes,
size_t ...Indexes>
479 std::index_sequence<Indexes...>) {
480 (drawEdge<std::tuple_element_t<Indexes, CommonTypes>>(senderCore, receiverCore), ...);
488 template<
class TupleInputs,
size_t... Indices>
492 ==
nullptr) || ... )) {
493 std::ostringstream oss;
494 oss <<
"The node " << core->name()
495 <<
" does not have a well defined core (missing inheritance from ReceiverAbstraction).";
496 throw std::runtime_error(oss.str());
505 template<
class TupleOutputs,
size_t... Indices>
509 ==
nullptr) || ... )) {
510 std::ostringstream oss;
511 oss <<
"The node " << core->name()
512 <<
" does not have a well defined core (missing inheritance from SenderAbstraction).";
513 throw std::runtime_error(oss.str());
521 std::ostringstream oss;
522 oss <<
"You can not modify a graph that is connected inside another graph: " << funcName;
523 throw (std::runtime_error(oss.str()));
532 copyableNode->createGroup(*this->insideNodesAndGroups_);
533 for (
auto &copy : insideNode.second) {
534 copy->registerNode(
this);
537 graph->createInnerGroupsAndLaunchThreads(waitForInitialization);
539 ep->launchGraphThreads(waitForInitialization);
548 std::set<NodeAbstraction *> nodeAbstractions;
550 nodeAbstractions.insert(insideNode.first);
552 insideNode.second.cbegin(), insideNode.second.cend(),
553 [&nodeAbstractions](
auto &nodeAbstraction) { nodeAbstractions.insert(nodeAbstraction); });
555 scheduler_->spawnThreads(nodeAbstractions, waitForInitialization);
560 void gatherCleanable(std::unordered_set<hh::behavior::Cleanable *> &cleanableSet)
override {
562 if (
auto cleanableReresentative =
564 cleanableReresentative->gatherCleanable(cleanableSet);
565 for (
auto copy : insideNodeGroups.second) {
567 cleanableCopy->gatherCleanable(cleanableSet);
568 }
else {
throw std::runtime_error(
"A copy of a cleanable node should be a cleanable node."); }
578 std::shared_ptr<NodeAbstraction>> &correspondenceMap)
override {
579 return std::make_shared<
CoreGraph<Separator, AllTypes...>>(*
this, correspondenceMap);
586 std::map<
NodeAbstraction *, std::shared_ptr<NodeAbstraction>> &correspondenceMap) {
588 std::shared_ptr<NodeAbstraction> nodeClone;
590 for (
auto &originalNode : originalInsideNodes) {
592 if (!correspondenceMap.contains(originalNode.first)) {
593 nodeClone = originalAsClonable->clone(correspondenceMap);
596 correspondenceMap.insert({originalNode.first, nodeClone});
601 this->duplicateSourceEdges(rhs, correspondenceMap);
602 this->duplicateSinkEdges(rhs, correspondenceMap);
604 for (
auto &originalNode : originalInsideNodes) {
606 originalAsClonable->duplicateEdge(correspondenceMap);
614 this->duplicateOutputEdges(mapping);
typename internals::Intersect< Tuple1, Tuple2 >::type Intersect_t
Helper getting the intersection of types between two type tuples.
typename internals::Splitter< delta, Types... >::Inputs Inputs
Helper getting the input types from a list of template types (variadic)
typename internals::Splitter< delta, Types... >::Outputs Outputs
Helper getting the output types from a list of template types (variadic)
tool::GraphOutputsManagementAbstractionTypeDeducer_t< tool::Outputs< Separator, AllTypes... > > GOM
Type alias for a GraphOutputsManagementAbstraction from the list of template parameters.
tool::GraphInputsManagementAbstractionTypeDeducer_t< tool::Inputs< Separator, AllTypes... > > GIM
Type alias for a GraphInputsManagementAbstraction from the list of template parameters.
Hedgehog graph abstraction.
Printer abstraction to get a snapshot of the metrics of the Hedgehog graph.
virtual void printGraphFooter(core::abstraction::GraphNodeAbstraction const *graph)=0
Print graph footer.
bool registerNode(core::abstraction::NodeAbstraction const *nodeAbstraction)
Register a visited node.
virtual void printGroup(core::abstraction::NodeAbstraction *representative, std::vector< core::abstraction::NodeAbstraction * > const &group)=0
Print group of nodes.
virtual void printGraphHeader(core::abstraction::GraphNodeAbstraction const *graph)=0
Print graph header.
Behavior abstraction for the base node.
Abstraction for cores/nodes that can form groups.
Abstraction for cleanable core.
CleanableAbstraction()=default
Constructor used by the CoreGraph to have the handles to clean inner cleanable nodes.
Core abstraction for clonable nodes, nodes duplicated by execution pipeline.
void storeClone(std::shared_ptr< abstraction::NodeAbstraction > const &clone)
Store a core clone.
Core abstraction to notify slots.
Core's abstraction to receive a piece of data.
Core abstraction to send data.
Core's abstraction to receive a signal.
std::set< NotifierAbstraction * > const & connectedNotifiers() const
Accessor to the NotifierAbstraction attached to this slot, protected with mutex.
void addNotifier(NotifierAbstraction *const notifier)
Add a NotifierAbstraction to this slot.
Abstraction specialized for the execution pipeline.
Base graph node abstraction.
std::chrono::nanoseconds const & graphConstructionDuration() const
Graph construction duration accessor.
GraphNodeAbstraction(std::string const &name)
Base graph abstraction.
Status graphStatus_
Group status.
std::chrono::time_point< std::chrono::system_clock > const & graphStartCreation() const
Graph start creation timestamp accessor.
std::unique_ptr< std::map< NodeAbstraction *, std::vector< NodeAbstraction * > > > const insideNodesAndGroups_
All nodes of the graph, mapped to their group nodes.
std::chrono::time_point< std::chrono::system_clock > const & startExecutionTimeStamp() const
Accessor to the starting execution timestamp.
NodeAbstraction(std::string name)
Core node constructor using the core's name.
bool isRegistered() const
Accessor to registration flag.
void incrementExecutionDuration(std::chrono::nanoseconds const &exec)
Increment execution duration.
std::string const & name() const
Accessor to the core's name.
PrintAbstraction, used to determine if the node should be visited by the printer and colored.
void cleanGraph()
Clean the graph.
std::unique_ptr< Scheduler > const scheduler_
Scheduler used by the graph.
void callAddInputNodeToGraph(NodeAbstraction *const core, std::index_sequence< Indices... >)
Call addInputNodeToGraph for all type-elements of a tuple.
void visit(Printer *printer) override
Visit the graph.
void joinThreads() override
Wait for the threads to join.
void drawEdges(NodeAbstraction *const senderCore, NodeAbstraction *const receiverCore, std::index_sequence< Indexes... >)
Do the actual typed connections between a sender and receiver.
void callAddOutputNodeToGraph(NodeAbstraction *const core, std::index_sequence< Indices... >)
Call addOutputNodeToGraph for all type-elements of a tuple.
std::shared_ptr< abstraction::NodeAbstraction > clone(std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &correspondenceMap) override
Clone the current graph.
void setInputForAllCommonTypes(InputCore *const core)
Connect a core's node as input of the graph for all compatible types.
void createInnerGroupsAndLaunchThreads(bool waitForInitialization) override
Create the inner groups and launch the threads inside of a graph.
void executeGraph(bool waitForInitialization)
Execute the graph.
void testAbstractSenders(NodeAbstraction *const core, std::index_sequence< Indices... >)
Test a core if it can send data of specific types.
~CoreGraph() override=default
Default destructor.
void testAbstractReceivers(NodeAbstraction *const core, std::index_sequence< Indices... >)
Test a core if it can receive data of specific types.
void testRegistered(auto const &funcName)
Test if the graph has been registered.
void addEdgeForACommonType(NodeAbstraction *const senderCore, NodeAbstraction *const receiverCore)
Connect two nodes together for a common type.
void setOutputForAllCommonTypes(OutputCore *const core)
Connect a core's node as output of the graph for all compatible types.
std::vector< std::pair< std::string const, std::string const > > ids() const override
Node ids [nodeId, nodeGroupId] accessor.
CoreGraph(CoreGraph const &rhs, std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &correspondenceMap)
Copy constructor using an added correspondence map.
void broadcastAndNotifyAllInputNodes(std::shared_ptr< CompatibleInputType_t > &data)
Broadcast a piece of input data to all input nodes.
behavior::Node * node() const override
Node accessor.
void connectNotifierToSlot(NodeAbstraction *senderCore, NodeAbstraction *receiverCore)
Connect a notifier to a slot, used when connecting two nodes.
void duplicateInsideNodes(CoreGraph const &rhs, std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &correspondenceMap)
Clone all of the inside nodes.
void finishPushingData()
Indicate to the graph that no more input will be sent, trigger the termination of the graph.
CoreGraph(std::string const &name, std::unique_ptr< Scheduler > scheduler, hh::Graph< Separator, AllTypes... > *graph)
Core Graph constructor using the name and the scheduler.
hh::Graph< Separator, AllTypes... > *const graph_
Graph attached to this core.
void waitForTermination()
Wait for the graph to terminate.
void launchThreads(bool waitForInitialization)
Launch the threads with the scheduler.
void duplicateEdge(std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &mapping) override
Duplicate the graph output's edges.
void gatherCleanable(std::unordered_set< hh::behavior::Cleanable * > &cleanableSet) override
Gather cleanable nodes.
void registerNodeInsideGraph(NodeAbstraction *const core) override
Register a node inside of a graph.
void addEdgeForAllCommonTypes(NodeAbstraction *const senderCore, NodeAbstraction *const receiverCore)
Connect two nodes together for all common types.
void drawEdge(NodeAbstraction *const senderCore, NodeAbstraction *const receiverCore)
Do the actual typed connection between a sender and receiver.
void setInputForACommonType(InputCore *const core)
Connect a core's node as input of the graph for a compatible input type.
void setOutputForACommonType(OutputCore *const core)
Connect a core's node as output of the graph for a compatible type.
void setInside() override
Set the graph as inside of another graph.
Test if an input type is in the list of input types (tuple)
Test if a core can be input of a graph (derives from core::abstraction::NodeAbstraction and shares at...
Test if a core can be input of a graph for a type (derives from core::abstraction::NodeAbstraction an...
Test if a core can be output of a graph (derives from core::abstraction::NodeAbstraction and shares a...
Test if a core can be output of a graph for a type (derives from core::abstraction::NodeAbstraction a...