22 #ifndef HEDGEHOG_CORE_EXECUTION_PIPELINE_H 23 #define HEDGEHOG_CORE_EXECUTION_PIPELINE_H 25 #include "core_switch.h" 26 #include "../core_task.h" 27 #include "../../../api/graph.h" 32 #ifndef DOXYGEN_SHOULD_SKIP_THIS 33 template<
class GraphOutput,
class ...GraphInputs>
37 class AbstractExecutionPipeline;
38 #endif //DOXYGEN_SHOULD_SKIP_THIS 49 template<
class GraphOutput,
class ...GraphInputs>
78 :
CoreTask<GraphOutput, GraphInputs...>(name, 1,
NodeType::ExecutionPipeline, nullptr, automaticStart),
80 numberGraphs_(numberGraphs),
81 deviceIds_(deviceIds),
83 if (this->numberGraphs_ == 0) { this->numberGraphs_ = 1; }
85 if (coreBaseGraph->isInside() || this->
isInside()) {
86 std::ostringstream oss;
87 oss <<
"You can not modify a graph that is connected inside another graph: " << __FUNCTION__;
88 HLOG_SELF(0, oss.str())
89 throw (std::runtime_error(oss.str()));
91 if (numberGraphs_ != deviceIds.size()) {
92 std::ostringstream oss;
94 <<
"The number of device Ids do not correspond to the number of coreGraph duplications you sent to the " 95 "execution pipeline \"" 97 <<
"\". Even if you do not associate the graphs duplicates to a specific device, please set the deviceIds " 98 "to the map to the number of duplicates specified." 101 HLOG_SELF(0, oss.str())
102 throw (std::runtime_error(oss.str()));
106 coreBaseGraph->graphId(0);
107 coreBaseGraph->deviceId(this->deviceIds_[0]);
130 [[nodiscard]] std::string
id()
const override {
return this->
coreSwitch_->id(); }
142 std::ostringstream oss;
143 oss <<
"Internal error, an execution pipeline has not device id: " << __FUNCTION__;
144 HLOG_SELF(0, oss.str())
145 throw (std::runtime_error(oss.str()));
151 std::set<CoreSender<GraphOutput> *>
153 std::set<CoreSender<GraphOutput> *>
158 senders = epGraph->getSenders();
177 std::ostringstream oss;
178 oss <<
"Internal error, a receiver added to an execution pipeline is not a coreQueueReceiver : " 180 HLOG_SELF(0, oss.str())
181 throw (std::runtime_error(oss.str()));
192 for (
auto graph : this->
epGraphs_) { graph->addSlot(slot); }
201 (this->printEdgeSwitchGraphs<GraphInputs>(printer, graph.get()), ...);
202 graph->visit(printer);
210 std::shared_ptr<CoreNode>>> &)
override {
212 epGraph->createInnerClustersAndLaunchThreads();
218 [[nodiscard]] std::chrono::duration<uint64_t, std::micro>
maxExecutionTime()
const override {
219 std::chrono::duration<uint64_t, std::micro> ret = std::chrono::duration<uint64_t, std::micro>::min();
221 std::chrono::duration<uint64_t, std::micro> temp = graph->maxExecutionTime();
222 if (temp > ret) ret = temp;
229 [[nodiscard]] std::chrono::duration<uint64_t, std::micro>
minExecutionTime()
const override {
230 std::chrono::duration<uint64_t, std::micro> ret = std::chrono::duration<uint64_t, std::micro>::max();
232 std::chrono::duration<uint64_t, std::micro> temp = graph->minExecutionTime();
233 if (temp < ret) ret = temp;
240 [[nodiscard]] std::chrono::duration<uint64_t, std::micro>
maxWaitTime()
const override {
241 std::chrono::duration<uint64_t, std::micro> ret = std::chrono::duration<uint64_t, std::micro>::min();
243 std::chrono::duration<uint64_t, std::micro> temp = graph->maxWaitTime();
244 if (temp > ret) ret = temp;
251 [[nodiscard]] std::chrono::duration<uint64_t, std::micro>
minWaitTime()
const override {
252 std::chrono::duration<uint64_t, std::micro> ret = std::chrono::duration<uint64_t, std::micro>::max();
254 std::chrono::duration<uint64_t, std::micro> temp = graph->minWaitTime();
255 if (temp < ret) ret = temp;
268 HLOG_SELF(2,
"callCanTerminate: " << std::boolalpha << result)
276 for (
size_t numberGraph = 1; numberGraph < this->
numberGraphs(); ++numberGraph) {
277 auto graphDuplicate =
279 graphDuplicate->
graphId(numberGraph);
280 graphDuplicate->deviceId(this->deviceIds_[numberGraph]);
288 template<
class GraphInput>
291 auto coreNotifier = std::static_pointer_cast<
CoreNotifier>(coreSender);
292 auto coreSlot = std::static_pointer_cast<
CoreSlot>(graph);
295 for (
auto r : coreReceiver->receivers()) { coreSender->addReceiver(r); }
296 for (
auto s : coreSender->getSenders()) {
298 coreSlot->addNotifier(s);
300 for (
CoreSlot *slot : coreSlot->
getSlots()) { coreNotifier->addSlot(slot); }
306 coreGraph->setInside();
307 coreGraph->belongingNode(
this);
308 coreGraph->hasBeenRegistered(
true);
309 (addEdgeSwitchGraph<GraphInputs>(coreGraph), ...);
333 template<
class GraphInput>
339 traits::type_name<GraphInput>(),
340 coreQueueReceiver->queueSize(),
341 coreQueueReceiver->maxQueueSize(),
342 traits::is_managed_memory_v<GraphInput>);
350 #endif //HEDGEHOG_CORE_EXECUTION_PIPELINE_H behavior::Node * node() override
Return the user's node.
Receiver Interface, receive one data type from CoreSender.
std::chrono::duration< uint64_t, std::micro > maxWaitTime() const override
Return the maximum wait time of all inside graphs.
virtual void printExecutionPipelineHeader(core::CoreNode *epNode, core::CoreNode *switchNode)=0
Print execution pipeline header.
std::vector< std::shared_ptr< CoreGraph< GraphOutput, GraphInputs... > > > epGraphs_
Core Copies of the graphs (actual memory is stored here)
CoreExecutionPipeline()=delete
Deleted Default constructor.
std::chrono::duration< uint64_t, std::micro > minWaitTime() const override
Return the minimum wait time of all inside graphs.
bool hasNotifierConnected() final
Test if CoreNotifiers are linked to this CoreQueueSlot.
std::chrono::duration< uint64_t, std::micro > minExecutionTime() const override
Return the minimum execution time of all inside graphs.
virtual std::set< CoreReceiver< Input > * > receivers()=0
Accessor to all receivers connected to this receiver.
std::shared_ptr< CoreSwitch< GraphInputs... > > coreSwitch_
Switch used to divert the data to the graphs.
std::chrono::duration< uint64_t, std::micro > maxExecutionTime() const override
Return the maximum execution time of all inside graphs.
Core Notifier interface, emit notification to CoreSlot.
void connectGraphsOutputToReceiver(CoreGraph< GraphOutput, GraphInputs... > *graph, CoreReceiver< GraphOutput > *coreReceiver)
Connect a CoreReceiver to all outputs of a graph.
void addSlot(CoreSlot *slot) override
Add a slot to an execution pipeline, i.e. to all inside graphs.
void printEdgeSwitchGraphs(AbstractPrinter *printer, CoreGraph< GraphOutput, GraphInputs... > *graph)
Print an edge for the GraphInput input from the switch to all of the graph's input nodes.
std::unique_ptr< std::set< CoreNode * > > const & inputsCoreNodes() const
Input node's cores accessor.
void addEdgeSwitchGraph(std::shared_ptr< CoreGraph< GraphOutput, GraphInputs... >> &graph)
Add data and notification link between switch and one inside graph.
AbstractExecutionPipeline< GraphOutput, GraphInputs... > * executionPipeline() const
User execution pipeline accessor.
void addReceiver(CoreReceiver< GraphOutput > *receiver) override
Add a receiver to the execution pipeline, to all inside graphs.
virtual ~CoreExecutionPipeline()=default
Default destructor.
Node interface made for duplicating a graph, for example in the case of multi-GPU computation...
CoreExecutionPipeline(std::string_view const &name, AbstractExecutionPipeline< GraphOutput, GraphInputs... > *executionPipeline, std::shared_ptr< CoreGraph< GraphOutput, GraphInputs... >> coreBaseGraph, size_t numberGraphs, std::vector< int > const &deviceIds, bool automaticStart)
The core execution pipeline constructor.
void addReceiver(CoreReceiver< GraphOutput > *receiver) override
Add a receiver to the graph, i.e., add a receiver to all output nodes.
Core switch, determine where to divert data to the graphs.
Node Behavior definition.
AbstractExecutionPipeline< GraphOutput, GraphInputs... > * executionPipeline_
User's execution pipeline.
Slot interface, receive notification from CoreNotifier.
bool hasNotBeenVisited(core::CoreNode const *node)
Accessor to check if a node has been visited by the printer.
size_t numberGraphs_
Total number of graphs in the execution pipeline.
Receiver for nodes possessing a queue of data.
int deviceId() override
Get a device id; this is not possible for an execution pipeline, so it throws an error in every case.
NodeType
Hedgehog node's type.
Sender interface, send data to CoreReceiver.
void mergeSenders(std::set< CoreSender< GraphOutput > * > &superSet, std::set< CoreSender< GraphOutput > *> &graphSenders)
Add the graph's senders to the super set.
size_t numberGraphs() const
Number of execution pipeline's graphs accessor.
void unlockUniqueMutex()
Unlock the mutex.
std::set< CoreSender< GraphOutput > * > getSenders() override
Execution pipeline's senders accessor.
Main Hedgehog core abstraction.
void createCluster([[maybe_unused]]std::shared_ptr< std::multimap< CoreNode *, std::shared_ptr< CoreNode >>> &) override
Create inner graphs clusters and launch the threads.
virtual void printExecutionPipelineFooter()=0
Print execution pipeline footer.
virtual std::set< CoreSlot * > getSlots()=0
Slots accessor for the node.
void visit(AbstractPrinter *printer) override
Special visit method for an execution pipeline, visit also all inside graphs.
std::vector< int > const & deviceIds() const
Device ids accessor.
std::string_view const & name() const
Node name accessor.
void lockUniqueMutex()
Lock the mutex.
bool receiversEmpty() final
Test emptiness of all receivers.
Core associated to the Graph.
bool callCanTerminate(bool lock) override
Can-terminate test for the execution pipeline, specialised to not call the user-defined one.
bool automaticStart() const
Automatic start property accessor.
virtual void printEdgeSwitchGraphs(core::CoreNode *to, std::string const &idSwitch, std::string_view const &edgeType, size_t const &queueSize, size_t const &maxQueueSize, bool isMemoryManaged)=0
Print the edges from the switch representation to a node.
int graphId() override
Graph id accessor.
void duplicateGraphs()
Duplicate the graphs and link them to the switch.
virtual std::shared_ptr< std::set< CoreQueueReceiver< GraphOutput > *> > const & destinations() const
Destination accessor.
std::shared_ptr< CoreGraph< GraphOutput, GraphInputs... > > baseCoreGraph()
Return the core of the base graph.
bool isInside() const
Node inside property accessor.
std::string id() const override
Execution pipeline id, i.e. switch id accessor.
virtual void addSender(CoreSender< Input > *sender)=0
Interface to add a CoreSender to the receiver.
void connectGraphToEP(std::shared_ptr< CoreGraph< GraphOutput, GraphInputs... >> &coreGraph)
Connect a graph to the switch and register it.
std::vector< int > deviceIds_
Device id values to set on the different graphs in the execution pipeline.