20 #ifndef HEDGEHOG_CORE_GRAPH_H 21 #define HEDGEHOG_CORE_GRAPH_H 27 #include "../node/core_node.h" 28 #include "../io/base/sender/core_notifier.h" 29 #include "../io/base/receiver/core_multi_receivers.h" 30 #include "../io/graph/receiver/core_graph_multi_receivers.h" 31 #include "../io/graph/receiver/core_graph_sink.h" 32 #include "../io/graph/sender/core_graph_source.h" 33 #include "../../tools/traits.h" 34 #include "../../tools/helper.h" 35 #include "../../api/scheduler/default_scheduler.h" 36 #include "../../api/scheduler/abstract_scheduler.h" 37 #include "../../api/printer/dot_printer.h" 42 #ifndef DOXYGEN_SHOULD_SKIP_THIS 43 template<
class GraphOutput,
class ...GraphInputs>
48 #endif // DOXYGEN_SHOULD_SKIP_THIS 62 template<
class GraphOutput,
class ...GraphInputs>
70 std::shared_ptr<CoreGraphSink<GraphOutput>>
sink_ =
nullptr;
81 std::unique_ptr<AbstractScheduler> scheduler = std::make_unique<DefaultScheduler>()) :
90 HLOG_SELF(0,
"Creating CoreGraph with coreGraph: " << graph <<
" type: " << (
int) type <<
" and name: " << name)
92 std::ostringstream oss;
93 oss <<
"Internal error, the graph's scheduler is null, please instantiate an AbstractScheduler.";
94 HLOG_SELF(0, oss.str())
95 throw (std::runtime_error(oss.str()));
98 this->inputsCoreNodes_ = std::make_unique<std::set<CoreNode *>>();
99 this->outputCoreNodes_ = std::make_unique<std::set<CoreSender < GraphOutput> *>>
102 this->sink_ = std::make_shared<CoreGraphSink<GraphOutput>>();
103 this->scheduler_ = std::move(scheduler);
104 this->source_->belongingNode(
this);
105 this->sink_->belongingNode(
this);
111 CoreNode(rhs.name(), rhs.type(), 1),
113 CoreSlot(rhs.name(), rhs.type(), 1),
114 CoreReceiver<GraphInputs>(rhs.name(), rhs.type(), 1)...,
115 CoreSender<GraphOutput>(rhs.name(), rhs.type(),1),
117 this->inputsCoreNodes_ = std::make_unique<std::set<CoreNode *>>();
118 this->outputCoreNodes_ = std::make_unique<std::set<CoreSender < GraphOutput> *>>
122 this->sink_ = std::make_shared<CoreGraphSink<GraphOutput>>();
124 this->
source_->belongingNode(
this);
125 this->sink_->belongingNode(
this);
137 std::shared_ptr<CoreNode>
clone()
override {
return std::make_shared<
CoreGraph<GraphOutput, GraphInputs...>>(*this); }
140 ~CoreGraph()
override {HLOG_SELF(0,
"Destructing CoreGraph")}
160 [[nodiscard]] std::unique_ptr<std::set<CoreSender < GraphOutput> *>>
const &
179 [[nodiscard]] std::chrono::duration<uint64_t, std::micro>
maxExecutionTime()
const override {
180 std::chrono::duration<uint64_t, std::micro>
181 ret = std::chrono::duration<uint64_t, std::micro>::min(),
183 std::shared_ptr<CoreNode> core;
186 switch (core->type()) {
188 case NodeType::Graph:
189 case NodeType::StateManager:
190 case NodeType::ExecutionPipeline:
191 temp = core->maxExecutionTime();
192 if (temp > ret) ret = temp;
202 [[nodiscard]] std::chrono::duration<uint64_t, std::micro>
minExecutionTime()
const override {
203 std::chrono::duration<uint64_t, std::micro> ret = std::chrono::duration<uint64_t, std::micro>::max(), temp{};
204 std::shared_ptr<CoreNode> core;
207 switch (core->type()) {
209 case NodeType::Graph:
210 case NodeType::StateManager:
211 case NodeType::ExecutionPipeline:
212 temp = core->minExecutionTime();
213 if (temp < ret) ret = temp;
223 [[nodiscard]] std::chrono::duration<uint64_t, std::micro>
maxWaitTime()
const override {
224 std::chrono::duration<uint64_t, std::micro> ret = std::chrono::duration<uint64_t, std::micro>::min(), temp{};
225 std::shared_ptr<CoreNode> core;
228 switch (core->type()) {
230 case NodeType::Graph:
231 case NodeType::StateManager:
232 case NodeType::ExecutionPipeline:
233 temp = core->maxWaitTime();
234 if (temp > ret) ret = temp;
244 [[nodiscard]] std::chrono::duration<uint64_t, std::micro>
minWaitTime()
const override {
245 std::chrono::duration<uint64_t, std::micro> ret = std::chrono::duration<uint64_t, std::micro>::max(), temp{};
246 std::shared_ptr<CoreNode> core;
249 switch (core->type()) {
251 case NodeType::Graph:
252 case NodeType::StateManager:
253 case NodeType::ExecutionPipeline:
254 temp = core->minWaitTime();
255 if (temp < ret) ret = temp;
273 class UserDefinedSender,
class UserDefinedMultiReceiver,
274 class Output =
typename UserDefinedSender::output_t,
275 class Inputs =
typename UserDefinedMultiReceiver::inputs_t,
276 class IsSender =
typename std::enable_if_t<
281 class IsMultiReceiver =
typename std::enable_if_t<
287 void addEdge(std::shared_ptr<UserDefinedSender> from, std::shared_ptr<UserDefinedMultiReceiver> to) {
288 assert(from !=
nullptr && to !=
nullptr);
289 static_assert(traits::Contains_v<Output, Inputs>,
"The given Receiver cannot be linked to this Sender");
291 std::ostringstream oss;
292 oss <<
"You can not modify a graph that is connected inside another graph: " << __FUNCTION__;
293 HLOG_SELF(0, oss.str())
294 throw (std::runtime_error(oss.str()));
299 auto coreNotifier =
dynamic_cast<CoreNotifier *
>(coreSender);
304 if (from->core().get() ==
this || to->core().get() ==
this) {
305 std::ostringstream oss;
306 oss <<
"You can not connect a graph to itself: " << __FUNCTION__;
307 HLOG_SELF(0, oss.str())
308 throw (std::runtime_error(oss.str()));
311 if (coreSender->hasBeenRegistered()) {
312 if (coreSender->belongingNode() !=
this) {
313 std::ostringstream oss;
314 oss <<
"The Sender node should belong to the graph: " << __FUNCTION__;
315 HLOG_SELF(0, oss.str())
316 throw (std::runtime_error(oss.str()));
320 if (coreReceiver->hasBeenRegistered()) {
321 if (coreReceiver->belongingNode() !=
this) {
322 std::ostringstream oss;
323 oss <<
"The Receiver node should belong to the graph: " << __FUNCTION__;
324 HLOG_SELF(0, oss.str())
325 throw (std::runtime_error(oss.str()));
330 "Add edge from " << coreSender->name() <<
"(" << coreSender->id() <<
") to " << coreReceiver->name()
331 <<
"(" << coreReceiver->id()
334 for (
auto r : coreReceiver->receivers()) { coreSender->addReceiver(r); }
336 for (
auto s : coreSender->getSenders()) {
337 coreReceiver->addSender(s);
338 coreSlot->addNotifier(s);
341 this->
registerNode(std::dynamic_pointer_cast<CoreNode>(from->core()));
342 this->
registerNode(std::dynamic_pointer_cast<CoreNode>(to->core()));
353 class UserDefinedMultiReceiver,
354 class InputsMR =
typename UserDefinedMultiReceiver::inputs_t,
356 class isMultiReceiver =
typename std::enable_if_t<
357 std::is_base_of_v<typename helper::HelperMultiReceiversType<InputsMR>::type, UserDefinedMultiReceiver>
359 class isInputCompatible =
typename std::enable_if_t<traits::is_included_v<InputsMR, InputsG>>>
360 void input(std::shared_ptr<UserDefinedMultiReceiver> inputNode) {
362 std::ostringstream oss;
363 oss <<
"You can not modify a graph that is connected inside another graph: " << __FUNCTION__;
364 HLOG_SELF(0, oss.str())
365 throw (std::runtime_error(oss.str()));
368 if (
auto inputCoreNode =
370 HLOG_SELF(0,
"Set " << inputCoreNode->name() <<
"(" << inputCoreNode->id() <<
") as input")
372 if (inputCoreNode->hasBeenRegistered()) {
373 if (inputCoreNode->belongingNode() !=
this) {
374 std::ostringstream oss;
375 oss <<
"The node " << inputCoreNode->name() <<
" belong already to another coreGraph: " 377 HLOG_SELF(0, oss.str())
378 throw (std::runtime_error(oss.str()));
383 this->inputsCoreNodes_->insert(inputCoreNode);
386 std::ostringstream oss;
387 oss <<
"The node " << inputCoreNode->name() <<
" is not a multi receiver: " << __FUNCTION__;
388 HLOG_SELF(0, oss.str())
389 throw (std::runtime_error(oss.str()));
391 this->
registerNode(std::static_pointer_cast<CoreNode>(inputNode->core()));
400 std::ostringstream oss;
401 oss <<
"You can not modify a graph that is connected inside another graph: " << __FUNCTION__;
402 HLOG_SELF(0, oss.str())
403 throw (std::runtime_error(oss.str()));
407 HLOG_SELF(0,
"Set " << outputCoreNode->name() <<
"(" << outputCoreNode->id() <<
") as outputNode")
408 if (outputCoreNode->hasBeenRegistered()) {
409 if (outputCoreNode->belongingNode() !=
this) {
410 std::ostringstream oss;
412 <<
"The node " << outputCoreNode->name() <<
" belong already to another coreGraph: " 414 HLOG_SELF(0, oss.str())
415 throw (std::runtime_error(oss.str()));
418 this->outputCoreNodes_->insert(outputCoreNode);
420 this->sink_->addNotifier(sender);
421 this->sink_->addSender(sender);
423 outputCoreNode->addSlot(this->sink_.get());
424 outputCoreNode->addReceiver(this->sink_.get());
426 std::ostringstream oss;
427 oss <<
"Internal error, the output node is not a valid CoreSender: " << __FUNCTION__;
428 HLOG_SELF(0, oss.str())
429 throw (std::runtime_error(oss.str()));
432 this->
registerNode(std::static_pointer_cast<CoreNode>(outputNode->core()));
440 class =
typename std::enable_if_t<
traits::Contains<Input, GraphInputs...>::value>
443 HLOG_SELF(2,
"Broadcast data and notify all coreGraph's inputs")
445 std::ostringstream oss;
446 oss <<
"You can not modify a graph that is connected inside another graph: " << __FUNCTION__;
447 HLOG_SELF(0, oss.str())
448 throw (std::runtime_error(oss.str()));
456 HLOG_SELF(0,
"Set the coreGraph inside")
459 for (
CoreNode *inputNode: *(this->inputsCoreNodes_)) {
460 if (
auto coreSlot = dynamic_cast<CoreSlot *>(inputNode)) {
461 ( coreSlot->removeNotifier(
462 static_cast<CoreNotifier *>(
472 std::ostringstream oss;
473 oss <<
"Internal error, the input node is not a slot, when graph is set inside : " << __FUNCTION__;
474 HLOG_SELF(0, oss.str())
475 throw (std::runtime_error(oss.str()));
480 std::for_each(this->outputCoreNodes_->begin(), this->outputCoreNodes_->end(),
482 s->removeSlot(this->sink_.get());
483 s->removeReceiver(this->sink_.get());
489 this->sink_ =
nullptr;
494 [[nodiscard]] std::vector<std::pair<std::string, std::string>>
ids() const final {
495 std::vector<std::pair<std::string, std::string>> v{};
496 for (
auto input : *(this->inputsCoreNodes_)) {
497 for (std::pair<std::string, std::string>
const &innerInput :
input->ids()) { v.push_back(innerInput); }
505 HLOG_SELF(2,
"Execute the coreGraph")
507 std::ostringstream oss;
508 oss <<
"You can not modify a graph that is connected inside another graph: " << __FUNCTION__;
509 HLOG_SELF(0, oss.str())
510 throw (std::runtime_error(oss.str()));
514 auto finishCreationTime = std::chrono::high_resolution_clock::now();
515 this->
creationDuration(std::chrono::duration_cast<std::chrono::microseconds>(
521 HLOG_SELF(2,
"Wait for the coreGraph to terminate")
522 this->scheduler_->joinAll();
523 std::chrono::time_point<std::chrono::high_resolution_clock>
524 endExecutionTimeStamp = std::chrono::high_resolution_clock::now();
531 HLOG_SELF(2,
"Indicate finish pushing data")
533 std::ostringstream oss;
534 oss <<
"You can not modify a graph that is connected inside another graph: " << __FUNCTION__;
535 HLOG_SELF(0, oss.str())
536 throw (std::runtime_error(oss.str()));
538 this->
source_->notifyAllTerminated();
550 HLOG_SELF(2,
"Get blocking data")
552 std::ostringstream oss;
553 oss <<
"You can not modify a graph that is connected inside another graph: " << __FUNCTION__;
554 HLOG_SELF(0, oss.str())
555 throw (std::runtime_error(oss.str()));
557 std::shared_ptr<GraphOutput> result =
nullptr;
558 this->sink_->waitForNotification();
559 this->sink_->lockUniqueMutex();
560 if (!this->sink_->receiverEmpty()) { result = this->sink_->popFront(); }
561 this->sink_->unlockUniqueMutex();
568 std::shared_ptr<CoreNode>>> &insideNodesGraph)
override {
575 HLOG_SELF(1,
"Visit")
585 if (this->
source_ && this->sink_) {
587 this->sink_->visit(printer);
592 it = this->
insideNodes()->upper_bound(it->first)) {
594 it->second->visit(printer);
609 HLOG_SELF(0,
"Add receiver " << receiver->
name() <<
"(" << receiver->
id() <<
")")
611 outputNode->addReceiver(receiver);
618 HLOG_SELF(0,
"Remove receiver " << receiver->
name() <<
"(" << receiver->
id() <<
")")
620 outputNode->removeReceiver(receiver);
627 void sendAndNotify([[maybe_unused]]std::shared_ptr<GraphOutput> ptr)
override {
628 std::ostringstream oss;
629 oss <<
"Internal error, a graph do not send data: " << __FUNCTION__;
630 HLOG_SELF(0, oss.str())
631 throw (std::runtime_error(oss.str()));
638 HLOG_SELF(0,
"Add Slot " << slot->
name() <<
"(" << slot->
id() <<
")")
640 outputNode->addSlot(slot);
647 HLOG_SELF(0,
"Remove Slot " << slot->
name() <<
"(" << slot->
id() <<
")")
649 outputNode->removeSlot(slot);
656 std::ostringstream oss;
657 oss <<
"Internal error, a graph do not notify nodes: " << __FUNCTION__;
658 HLOG_SELF(0, oss.str())
659 throw (std::runtime_error(oss.str()));
665 HLOG_SELF(0,
"Add Notifier " << notifier->
name() <<
"(" << notifier->
id() <<
")")
666 for (
CoreNode *inputNode: *(this->inputsCoreNodes_)) {
667 if (
auto coreSlot = dynamic_cast<CoreSlot *>(inputNode)) {
668 coreSlot->addNotifier(notifier);
670 std::ostringstream oss;
671 oss <<
"Internal error, A graph's input node is not a slot: " << __FUNCTION__;
672 HLOG_SELF(0, oss.str())
673 throw (std::runtime_error(oss.str()));
681 HLOG_SELF(0,
"Remove Notifier " << notifier->
name() <<
"(" << notifier->
id() <<
")")
682 for (
CoreNode *inputNode: *(this->inputsCoreNodes_)) {
683 if (
auto coreSlot = dynamic_cast<CoreSlot *>(inputNode)) {
684 coreSlot->removeNotifier(notifier);
686 std::ostringstream oss;
687 oss <<
"Internal error, A graph's input node is not a slot: " << __FUNCTION__;
688 HLOG_SELF(0, oss.str())
689 throw (std::runtime_error(oss.str()));
697 std::ostringstream oss;
698 oss <<
"Internal error, A graph has no notifier connected: " << __FUNCTION__;
699 HLOG_SELF(0, oss.str())
700 throw (std::runtime_error(oss.str()));
706 std::ostringstream oss;
707 oss <<
"Internal error, A graph's is not directly connected to the input nodes: " << __FUNCTION__;
708 HLOG_SELF(0, oss.str())
709 throw (std::runtime_error(oss.str()));
714 HLOG_SELF(2,
"Wake up all inputs")
715 for (
CoreNode *inputNode: *(this->inputsCoreNodes_)) {
716 if (
auto coreSlot = dynamic_cast<CoreSlot *>(inputNode)) {
719 std::ostringstream oss;
720 oss <<
"Internal error, A graph's input is not a core slot: " << __FUNCTION__;
721 HLOG_SELF(0, oss.str())
722 throw (std::runtime_error(oss.str()));
731 std::ostringstream oss;
732 oss <<
"Internal error, a graph is not connected to input nodes, so do not wait for notification: " 734 HLOG_SELF(0, oss.str())
735 throw (std::runtime_error(oss.str()));
740 [[nodiscard]] std::set<CoreSender < GraphOutput>*>
742 std::set<CoreSender < GraphOutput>*> coreSenders;
743 std::set<CoreSender < GraphOutput>*> tempCoreSenders;
745 tempCoreSenders = outputNode->getSenders();
746 coreSenders.insert(tempCoreSenders.begin(), tempCoreSenders.end());
753 [[nodiscard]] std::set<CoreSlot *>
getSlots()
override {
754 std::set<CoreSlot *> coreSlots;
755 std::set<CoreSlot *> tempCoreSlots;
757 for (
CoreNode *mr : *(this->inputsCoreNodes_)) {
758 tempCoreSlots = mr->getSlots();
759 coreSlots.insert(tempCoreSlots.begin(), tempCoreSlots.end());
766 HLOG_SELF(2,
"Join coreGraph threads")
767 this->scheduler_->joinAll();
772 HLOG_SELF(0,
"Cluster creation")
773 std::vector<std::shared_ptr<CoreNode>> insideCoreNodes;
774 insideCoreNodes.reserve(this->
insideNodes()->size());
775 for (
auto coreNode : *(this->
insideNodes())) { insideCoreNodes.push_back(coreNode.second); }
776 for (
auto const &insideCoreNode : insideCoreNodes) { insideCoreNode->createCluster(this->
insideNodes()); }
782 HLOG_SELF(0,
"Launching threads")
783 std::vector<std::shared_ptr<CoreNode>> insideCoreNodes;
784 insideCoreNodes.reserve(this->
insideNodes()->size());
785 for (
auto coreNode : *(this->
insideNodes())) { insideCoreNodes.push_back(coreNode.second); }
786 this->scheduler_->spawnThreads(insideCoreNodes);
793 HLOG_SELF(0,
"Register coreNode " << coreNode->name() <<
"(" << coreNode->id() <<
")")
794 if (!coreNode->hasBeenRegistered()) {
795 coreNode->setInside();
804 std::multimap<CoreNode *, std::shared_ptr<CoreNode>> &originalInsideNodes = *(rhs.
insideNodes());
806 std::map<CoreNode *, std::shared_ptr<CoreNode>> correspondenceMap;
808 std::shared_ptr<CoreNode> duplicate;
811 for (std::pair<
CoreNode *
const, std::shared_ptr<CoreNode>>
const &originalNode : originalInsideNodes) {
812 duplicate = originalNode.second->clone();
813 duplicate->belongingNode(
this);
814 correspondenceMap.insert({originalNode.second.get(), duplicate});
818 for (std::pair<
CoreNode *
const, std::shared_ptr<CoreNode>>
const &originalNode : originalInsideNodes) {
820 CoreNode *originalInsideNode = originalNode.second.get();
822 std::shared_ptr<CoreNode> duplicateInsideNode = correspondenceMap.find(originalInsideNode)->second;
823 duplicateInsideNode->belongingNode(
this);
824 this->
insideNodes()->insert({duplicateInsideNode.get(), duplicateInsideNode});
828 for (std::pair<
CoreNode *
const, std::shared_ptr<CoreNode>>
const &originalNode : originalInsideNodes) {
829 CoreNode *originalInsideNode = originalNode.second.get();
830 std::shared_ptr<CoreNode> duplicateInsideNode = correspondenceMap.find(originalInsideNode)->second;
831 originalInsideNode->
duplicateEdge(duplicateInsideNode.get(), correspondenceMap);
836 auto shInputCoreNode = correspondenceMap.find(originalInputNode)->second;
837 auto inputCoreNode = shInputCoreNode.get();
838 this->inputsCoreNodes_->insert(inputCoreNode);
845 auto shOutputCoreNode = correspondenceMap.find(originalOutputNode)->second;
847 this->outputCoreNodes_->insert(outputCoreNode);
850 this->sink_->addNotifier(sender);
851 this->sink_->addSender(sender);
854 outputCoreNode->addSlot(this->sink_.get());
855 outputCoreNode->addReceiver(this->sink_.get());
857 this->
registerNode(std::static_pointer_cast<CoreNode>(shOutputCoreNode));
859 std::ostringstream oss;
860 oss <<
"Internal error, the output node is not a sender: " << __FUNCTION__;
861 HLOG_SELF(0, oss.str())
862 throw (std::runtime_error(oss.str()));
870 template<
class ...InputNodeTypes>
873 this->
source_->addSlot(inputCoreNode);
883 template<
class InputNodeType,
class ...InputNodeTypes>
886 inputCoreNode->
addNotifier(compatibleSourceType.get());
887 compatibleSourceType->addReceiver(inputCoreNode);
894 template<
class InputNodeType>
898 inputCoreNode->
addSender(compatibleSource);
907 template<
class InputNodeType>
911 InputNodeType
> *>(inputCoreNode));
912 if (
auto coreSlot = dynamic_cast<CoreSlot *>(inputCoreNode)) {
913 this->
source_->addSlot(coreSlot);
916 std::ostringstream oss;
917 oss <<
"Internal error, the inputCoreNode is not a CoreSlot: " << __FUNCTION__;
918 HLOG_SELF(0, oss.str())
919 throw (std::runtime_error(oss.str()));
922 InputNodeType
> *>(inputCoreNode));
932 for (
auto it = this->
insideNodes()->equal_range(node.get()).first;
933 it != this->
insideNodes()->equal_range(node.get()).second; ++it) {
935 it->second->visit(printer);
943 #endif //HEDGEHOG_CORE_GRAPH_H NodeType type() const
Node type accessor.
int deviceId() override
Device id accessor.
std::shared_ptr< GraphOutput > getBlockingResult()
Get data out of the graph.
std::vector< std::pair< std::string, std::string > > ids() const final
Get ids of input nodes (vector<pair<nodeId, nodeIdCluster>>)
Receiver Interface, receive one data type from CoreSender.
void removeForAllSenders(CoreNode *coreNode)
Remove all coreNode's senders from this.
virtual void printClusterFooter()=0
Print cluster footer.
bool waitForNotification() override
A graph can't wait for notification, throws an error in all cases.
std::shared_ptr< std::multimap< CoreNode *, std::shared_ptr< CoreNode > > > const & insideNodes() const
Inside node accessor.
std::chrono::duration< uint64_t, std::micro > maxWaitTime() const override
Compute the maximum wait time for the graph's inside nodes.
std::set< CoreSender< GraphOutput > * > getSenders() override
Get the senders from the graphs, gather them from the output nodes.
Graph< GraphOutput, GraphInputs... > * graph_
User graph.
void addReceiversToSource(CoreMultiReceivers< InputNodeTypes... > *inputCoreNode)
Add receivers to source and do the connection.
Graph Receiver for a type GraphInput.
std::unique_ptr< std::set< CoreSender< GraphOutput > * > > const & outputCoreNodes() const
Output node's CoreSender accessor.
std::shared_ptr< CoreGraphSource< GraphInputs... > > const & source() const
Source accessor.
Core Notifier interface, emit notification to CoreSlot.
std::unique_ptr< AbstractScheduler > scheduler_
Scheduler.
std::shared_ptr< CoreGraphSource< GraphInputs... > > source_
Outer graph's source.
std::shared_ptr< CoreNode > clone() override
Clone a core graph calling the graph copy constructor.
std::shared_ptr< CoreGraphSink< GraphOutput > > sink_
Inner graph's sink.
void createCluster([[maybe_unused]]std::shared_ptr< std::multimap< CoreNode *, std::shared_ptr< CoreNode >>> &insideNodesGraph) override
Create all clusters for inside nodes and launch the threads, not gathered into insideNodesGraph.
void setInside() override
Set the graph as inside, in case of connection to another node.
std::unique_ptr< std::set< CoreSender< GraphOutput > * > > outputCoreNodes_
Output node's core.
Part of the outer graph that sends data from the outside to the input nodes.
virtual void setInside()
Set the node as inside, (inside a graph)
std::unique_ptr< std::set< CoreNode * > > const & inputsCoreNodes() const
Input node's cores accessor.
void addReceiverToSource(CoreReceiver< InputNodeType > *inputCoreNode)
If the input core node is compatible, connect it to the source.
CoreGraph(Graph< GraphOutput, GraphInputs... > *graph, NodeType const type, std::string_view const &name, std::unique_ptr< AbstractScheduler > scheduler=std::make_unique< DefaultScheduler >())
CoreGraph constructor.
bool hasNotifierConnected() override
Test notifier for the graph, should not be used, connection is made to the input nodes.
void removeInsideNode(CoreNode *coreNode)
Remove a node from the registered inside nodes.
void addGraphReceiverInput(CoreReceiver< GraphInputs > *receiver)
Register a CoreReceiver from an input node.
std::unique_ptr< std::set< CoreNode * > > inputsCoreNodes_
Input node's core.
void output(std::shared_ptr< behavior::Sender< GraphOutput >> outputNode)
Set a node as output for the graph.
int deviceId_
Device Id used for computation on devices.
void addSlot(CoreSlot *slot) override
Add a slot to a graph, i.e, to all output nodes.
Sender Behavior definition, node has an output type.
Multi receiver interface, gather multiple CoreReceiver.
Check if a template T is in Template pack Ts.
Sender for nodes possessing a queue of data.
void finishPushingData()
Notify the graph no more input data will be pushed.
std::chrono::time_point< std::chrono::high_resolution_clock > const & creationTimeStamp() const
Creation timestamp accessor.
void sendAndNotify([[maybe_unused]]std::shared_ptr< GraphOutput > ptr) override
Send data and notify receivers, not possible for a graph, throws an error in every case.
void addReceiver(CoreReceiver< GraphOutput > *receiver) override
Add a receiver to the graph, i.e, add a receiver to all output nodes.
void createInnerClustersAndLaunchThreads()
Create inside nodes' cluster and launch the threads.
CoreGraph(CoreGraph< GraphOutput, GraphInputs... > const &rhs)
Core graph copy constructor.
void graphId(size_t graphId)
Graph id setter.
Node Behavior definition.
Slot interface, receive notification from CoreNotifier.
void input(std::shared_ptr< UserDefinedMultiReceiver > inputNode)
Set a node as input for the graph.
void registerNode(const std::shared_ptr< CoreNode > &coreNode)
Register a node inside the graph.
bool hasNotBeenVisited(core::CoreNode const *node)
Accessor to check if a node has been visited by the printer.
Main Hedgehog object that does computation.
size_t numberInputNodes() const override
Return the number of input nodes connected, a graph should not have such a connection, throws in every case.
void addNotifier(CoreNotifier *notifier) override
Add a notifier to the graph, ie, to all input nodes.
void waitForTermination()
Wait for all inside threads to join.
void executeGraph()
Execute the graph.
void addEdge(std::shared_ptr< UserDefinedSender > from, std::shared_ptr< UserDefinedMultiReceiver > to)
Add a directed edge from a compatible "from" node to "to" node.
NodeType
Hedgehog node's type.
Sender interface, send data to CoreReceiver.
void deviceId(int deviceId) override
Device id setter.
void addUniqueInsideNode(const std::shared_ptr< CoreNode > &coreNode)
Add a node to the inside nodes.
void joinThreads() override
Join the threads managed by the graph.
void duplicateInsideNodes(CoreGraph< GraphOutput, GraphInputs... > const &rhs)
Duplicate inside nodes, called by CoreExecutionPipeline.
void visit(AbstractPrinter *printer) override
Special visit method for a CoreGraph.
~CoreGraph() override
Graph's core default destructor.
void printCluster(AbstractPrinter *printer, std::shared_ptr< CoreNode > const &node)
Specialized method if the input node is in a cluster.
Main Hedgehog core abstraction.
MultiReceivers Behavior definition, node has a list of input types.
behavior::Node * node() override
User graph accessor.
void notifyAllTerminated() override
Notify termination to all connected nodes, not possible for a graph, throws an error in every case.
void removeReceiver(CoreReceiver< GraphOutput > *receiver) override
Remove a receiver from the graph, i.e, remove a receiver from all output nodes.
void wakeUp() override
Wake up a graph, wake up all input nodes.
std::chrono::time_point< std::chrono::high_resolution_clock > const & startExecutionTimeStamp() const
Execution start timestamp accessor.
virtual void printClusterHeader(core::CoreNode const *clusterNode)=0
Print cluster header.
virtual std::set< CoreSlot * > getSlots()=0
Slots accessor for the node.
std::chrono::duration< uint64_t, std::micro > const & executionDuration() const
Execution duration accessor.
virtual void printGraphFooter(core::CoreNode const *node)=0
Print graph footer.
virtual std::set< CoreSender< Output > * > getSenders()=0
Get inner CoreSender represented by this one in the case of outer graph for example.
virtual void printGraphHeader(core::CoreNode const *node)=0
Print graph header.
virtual void addSlot(CoreSlot *slot)=0
Interface to add a CoreSlot to this notifier.
std::string_view const & name() const
Node name accessor.
virtual void addNotifier(CoreNotifier *notifier)=0
Interface to add a CoreNotifier to this slot.
virtual void printClusterEdge(core::CoreNode const *clusterNode)=0
Print cluster edge.
void addSender(CoreSender< GraphInputs > *sender) final
Add a CoreSender to the graph.
Core associated to the Graph.
void addSourceNotifierInputCoreNode(CoreMultiReceivers< InputNodeTypes... > *inputCoreNode)
Add an input node to the source.
void launchThreads()
Launch the threads using the graph's scheduler.
CoreNode * belongingNode() const
Belonging node accessor.
bool hasBeenRegistered() const
Node registration property accessor.
virtual void printNodeInformation(core::CoreNode *node)=0
Print node information.
virtual void duplicateEdge(CoreNode *duplicateNode, std::map< CoreNode *, std::shared_ptr< CoreNode >> &correspondenceMap)=0
Duplicate all of the edges from this to its copy duplicateNode.
int graphId() override
Graph id accessor.
std::chrono::duration< uint64_t, std::micro > minWaitTime() const override
Compute the minimum wait time for the graph's inside nodes.
void removeNotifier(CoreNotifier *notifier) override
Remove a notifier from the graph, ie, from all input nodes.
Base definition of HelperMultiReceiversType.
std::chrono::duration< uint64_t, std::micro > maxExecutionTime() const override
Compute the maximum execution time for the graph's inside nodes.
std::chrono::duration< uint64_t, std::micro > minExecutionTime() const override
Compute the minimum execution time for the graph's inside nodes.
std::chrono::duration< uint64_t, std::micro > const & creationDuration() const
Creation duration accessor.
bool isInside() const
Node inside property accessor.
virtual std::string id() const
Unique Id accessor.
std::set< CoreSlot * > getSlots() override
Get the slots from the graphs, gather them from the input nodes.
void removeSlot(CoreSlot *slot) override
Remove a slot from a graph, i.e, from all output nodes.
Base definition of HelperCoreMultiReceiversType.
void broadcastAndNotifyToAllInputs(std::shared_ptr< Input > &data)
Broadcast data and notify all input nodes.
virtual void addSender(CoreSender< Input > *sender)=0
Interface to add a CoreSender to the receiver.
void duplicateInputNodes(CoreReceiver< InputNodeType > *inputCoreNode)
Duplicate input nodes, and do the connections for the source compatible type.