20 #ifndef HEDGEHOG_CORE_TASK_H 21 #define HEDGEHOG_CORE_TASK_H 24 #include <string_view> 26 #include "../../tools/traits.h" 27 #include "../io/queue/receiver/core_queue_multi_receivers.h" 28 #include "../io/queue/sender/core_queue_sender.h" 30 #include "../../behavior/execute.h" 32 #include "../../tools/logger.h" 33 #include "../../tools/nvtx_profiler.h" 34 #include "../behavior/core_execute.h" 36 #include "../../api/memory_manager/abstract_memory_manager.h" 41 #ifndef DOXYGEN_SHOULD_SKIP_THIS 42 template<
class TaskOutput,
class ...TaskInputs>
47 #endif //DOXYGEN_SHOULD_SKIP_THIS 55 template<
class TaskOutput,
class ...TaskInputs>
63 std::vector<std::shared_ptr<
AbstractTask<TaskOutput, TaskInputs...>>>
85 automaticStart_(automaticStart) {
86 HLOG_SELF(0,
"Creating CoreTask with task: " << task <<
" type: " << (
int) type <<
" and name: " << name)
87 nvtxProfiler_ = std::make_shared<NvtxProfiler>(this->
name());
92 HLOG_SELF(0,
"Destructing CoreTask")
119 HLOG_SELF(1,
"Visit")
130 HLOG_SELF(0,
"Copy cluster CoreTask information from " << rhs->
name() <<
"(" << rhs->
id() <<
")")
138 void createCluster(std::shared_ptr<std::multimap<
CoreNode *, std::shared_ptr<CoreNode>>> &insideNodesGraph)
override {
150 auto coreCopy = std::dynamic_pointer_cast<
CoreTask<TaskOutput, TaskInputs...>>(taskCopy->core());
152 coreCopy->coreClusterNode(
this);
153 coreCopy->copyInnerStructure(
this);
154 coreCopy->setInCluster();
155 insideNodesGraph->insert({
this, coreCopy});
162 std::chrono::time_point<std::chrono::high_resolution_clock>
173 start = std::chrono::high_resolution_clock::now();
175 finish = std::chrono::high_resolution_clock::now();
181 start = std::chrono::high_resolution_clock::now();
184 finish = std::chrono::high_resolution_clock::now();
187 if (canTerminate) {
break; }
188 start = std::chrono::high_resolution_clock::now();
190 (this->operateReceiver<TaskInputs>(), ...);
191 finish = std::chrono::high_resolution_clock::now();
208 HLOG_SELF(2,
"callCanTerminate: " << std::boolalpha << result)
215 template<
class Input>
217 HLOG_SELF(2,
"Operate receivers")
223 if (!receiver->receiverEmpty()) {
225 std::shared_ptr<Input> data = receiver->
popFront();
240 std::unique_lock<std::mutex> lock(*(this->
slotMutex()));
241 HLOG_SELF(2,
"Wait for notification")
247 "Check for notification: " << std::boolalpha
248 << (
bool) (!receiversEmpty) <<
"||" 250 << (
bool) callCanTerminate)
253 HLOG_SELF(2,
"Notification received")
261 HLOG_SELF(0,
"Copy Whole Task")
263 std::shared_ptr<
AbstractTask<TaskOutput, TaskInputs...>> sharedAbstractTaskCopy = this->
task()->
copy();
265 if (!sharedAbstractTaskCopy) {
266 std::ostringstream oss;
267 oss <<
"A copy for the task \"" << this->
name()
268 <<
"\" has been invoked but return nullptr. To fix this error, overload the AbstractTask::copy function and " 269 "return a valid object.";
270 HLOG_SELF(0, oss.str())
271 throw (std::runtime_error(oss.str()));
278 auto coreCopy = std::dynamic_pointer_cast<
CoreTask<TaskOutput, TaskInputs...>>(sharedAbstractTaskCopy->core());
280 if (coreCopy ==
nullptr) {
281 std::ostringstream oss;
282 oss <<
"A copy for the task " << this->
name()
283 <<
" has been invoked but the AbstractTask constructor has not been called. To fix this error, call the " 284 "AbstractTask constructor in your specialized task constructor.";
285 HLOG_SELF(0, oss.str())
286 throw (std::runtime_error(oss.str()));
291 coreCopy->threadId(this->
threadId());
292 coreCopy->isInside(
true);
293 if (this->
isInCluster()) { coreCopy->setInCluster(); }
297 if (this->
task()->memoryManager()) {
299 if (!copyMemoryManager) {
300 std::ostringstream oss;
301 oss <<
"An execution pipeline has been created with a graph that hold a task named \"" 303 <<
"\" connected to a memory manager. Or the memory manager does not have a compliant copy method. " 304 "Please implement it.";
305 HLOG_SELF(0, oss.str())
306 throw (std::runtime_error(oss.str()));
308 HLOG_SELF(0,
"Copy the MM " << this->
task()->memoryManager() <<
" to: " << copyMemoryManager
309 <<
" and set it to the task: " << coreCopy->
name() <<
" / " << coreCopy->id())
312 return sharedAbstractTaskCopy;
322 if constexpr (traits::is_managed_memory_v<TaskOutput>) {
324 HLOG_SELF(1,
"Copy the MM pointer " << mm
325 <<
" from: " << this->
name() <<
" / " << this->
id()
326 <<
" to: " << taskCopy->core()->name() <<
" / " << taskCopy->core()->id())
327 taskCopy->connectMemoryManager(mm);
336 #endif //HEDGEHOG_CORE_TASK_H NodeType type() const
Node type accessor.
void incrementExecutionDuration(std::chrono::duration< uint64_t, std::micro > const &exec)
Increment execution duration.
void connectMemoryManager(std::shared_ptr< AbstractMemoryManager< TaskOutput >> const &mm, std::shared_ptr< AbstractTask< TaskOutput, TaskInputs... >> taskCopy)
Connect a memory manager to a task if the type is valid (HedgehogTraits::is_managed_memory_v<TaskOutp...
Receiver Interface, receive one data type from CoreSender.
Base node for computation.
behavior::Node * node() override
Node accessor.
bool automaticStart_
Automatic start property.
void copyInnerStructure(CoreQueueSender< NodeOutput > *rhs)
Copy the inner structure of a CoreQueueSender (destinations, and notifier)
Core Notifier interface, emit notification to CoreSlot.
std::shared_ptr< std::mutex > const & slotMutex() const
Mutex accessor.
std::vector< std::shared_ptr< AbstractTask< TaskOutput, TaskInputs... > > > clusterAbstractTask_
Store all AbstractTask clones (hold memory)
virtual bool callCanTerminate(bool lock)
Call user-defined or default canTerminate and lock/unlock the mutex if lock is true.
void automaticStart(bool automaticStart)
Automatic start property accessor.
void setInCluster()
Set the task as part of a cluster.
bool isActive() const
Is active property accessor.
void visit(AbstractPrinter *printer) override
Special visit method for a CoreQueueSender, printing an edge.
std::shared_ptr< AbstractTask< TaskOutput, TaskInputs... > > createCopyFromThis()
Create a copy from this instance.
Abstract interface for Hedgehog's Memory manager.
Sender for nodes possessing a queue of data.
virtual bool canTerminate()
Determine if the node can terminate.
~CoreTask() override
Core Task destructor.
Node Behavior definition.
Slot interface, receive notification from CoreNotifier.
bool hasNotBeenVisited(core::CoreNode const *node)
Accessor to check if a node has been visited by the printer.
std::shared_ptr< NodeInput > popFront()
Return the front element of the queue and return it.
int threadId() const
Thread id accessor.
Receiver for nodes possessing a queue of data.
void copyInnerStructure(CoreTask< TaskOutput, TaskInputs... > *rhs)
Copy the inner structure from rhs to this CoreTask.
void copyInnerStructure(CoreNode *rhs)
Copy inner structure from rhs nodes to this.
virtual std::shared_ptr< AbstractTask< TaskOutput, TaskInputs... > > copy()
Default copy overload, fail if cluster need to be copied.
NodeType
Hedgehog node's type.
AbstractTask< TaskOutput, TaskInputs... > * task_
Task node pointer (just for reference)
virtual void preRun()
Method defining what to do before the run.
void unlockUniqueMutex()
Unlock the mutex.
Main Hedgehog core abstraction.
virtual void postRun()
Method defining what to do after the run.
Multi receivers for nodes possessing a queue of data.
std::shared_ptr< std::condition_variable > const & notifyConditionVariable() const
Condition variable accessor.
void createCluster(std::shared_ptr< std::multimap< CoreNode *, std::shared_ptr< CoreNode >>> &insideNodesGraph) override
Create a cluster for a CoreTask.
bool isInCluster() const
In cluster property accessor.
void visit(AbstractPrinter *printer) override
Special visit method for a CoreTask.
Interface to call execute on the different nodes.
std::shared_ptr< AbstractMemoryManager< TaskOutput > > const & memoryManager() const
Task's memory manager accessor.
std::string_view const & name() const
Node name accessor.
void wakeUp() final
Wake up and notify a node connected to the condition variable CoreQueueSlot::notifyConditionVariable_...
void lockUniqueMutex()
Lock the mutex.
bool receiversEmpty() final
Test emptiness of all receivers.
bool automaticStart() const
Automatic start property accessor.
AbstractTask< TaskOutput, TaskInputs... > * task() const
Node accessor.
size_t numberThreads() const
Number of threads associated accessor.
void operateReceiver()
Operate a CoreTasks's receiver for a specific type, thread safe.
void incrementWaitDuration(std::chrono::duration< uint64_t, std::micro > const &wait)
Increment wait duration.
virtual void printNodeInformation(core::CoreNode *node)=0
Print node information.
std::shared_ptr< NvtxProfiler > & nvtxProfiler()
NVTX profiler accessor.
virtual void callExecute(std::shared_ptr< TaskInputs > data)=0
Wrapper to call the user-defined Execute::execute.
CoreTask(std::string_view const &name, size_t const numberThreads, NodeType const type, AbstractTask< TaskOutput, TaskInputs... > *task, bool automaticStart)
CoreTask constructor (Used for AbstractTask and StateManager)
std::string_view name()
Task's name accessor.
virtual std::string id() const
Unique Id accessor.
bool waitForNotification() override
Wait method for notification.
void run() override
Main loop for the CoreTask.
std::shared_ptr< NvtxProfiler > nvtxProfiler_
Store a nvtx profiler for the task.
size_t totalQueueSize() final
Sums the queue sizes for all receivers.