19#ifndef HEDGEHOG_CORE_TASK_H
20#define HEDGEHOG_CORE_TASK_H
25#include "../../tools/concepts.h"
26#include "../../tools/traits.h"
28#include "../abstractions/base/clonable_abstraction.h"
29#include "../abstractions/base/groupable_abstraction.h"
30#include "../abstractions/base/cleanable_abstraction.h"
31#include "../abstractions/base/can_terminate_abstraction.h"
33#include "../abstractions/base/node/task_node_abstraction.h"
35#include "../abstractions/node/task_inputs_management_abstraction.h"
36#include "../abstractions/node/task_outputs_management_abstraction.h"
41#ifndef DOXYGEN_SHOULD_SKIP_THIS
45template<
size_t Separator,
class ...AllTypes>
53template<
size_t Separator,
class ...AllTypes>
54using TIM = tool::TaskInputsManagementAbstractionTypeDeducer_t<
tool::Inputs<Separator, AllTypes...>>;
57template<
size_t Separator,
class ...AllTypes>
58using TOM = tool::TaskOutputsManagementAbstractionTypeDeducer_t<
tool::Outputs<Separator, AllTypes...>>;
63template<
size_t Separator,
class ...AllTypes>
70 public TIM<Separator, AllTypes...>,
71 public TOM<Separator, AllTypes...> {
89 TIM<Separator, AllTypes...>(task, this),
90 TOM<Separator, AllTypes...>(),
101 std::string
const &
name,
size_t const numberThreads,
bool const automaticStart) :
107 TIM<Separator, AllTypes...>(task, this),
108 TOM<Separator, AllTypes...>(),
111 if (this->
numberThreads() == 0) {
throw std::runtime_error(
"A task needs at least one thread."); }
128 template<
class ConcreteMultiReceivers,
class ConcreteMultiExecutes,
class ConcreteMultiSenders>
131 std::shared_ptr<implementor::ImplementorSlot> concreteSlot,
132 std::shared_ptr<ConcreteMultiReceivers> concreteMultiReceivers,
133 std::shared_ptr<ConcreteMultiExecutes> concreteMultiExecutes,
134 std::shared_ptr<implementor::ImplementorNotifier> concreteNotifier,
135 std::shared_ptr<ConcreteMultiSenders> concreteMultiSenders) :
141 TIM<Separator, AllTypes...>(this, concreteSlot, concreteMultiReceivers, concreteMultiExecutes),
142 TOM<Separator, AllTypes...>(concreteNotifier, concreteMultiSenders),
145 if (this->
numberThreads() == 0) {
throw std::runtime_error(
"A task needs at least one thread."); }
153 [[nodiscard]] std::shared_ptr<AbstractMemoryManager>
memoryManager()
const override {
162 if (lock) { this->lockSlotMutex(); }
164 if (lock) { this->unlockSlotMutex(); }
194 std::chrono::time_point<std::chrono::system_clock>
203 start = std::chrono::system_clock::now();
204 this->callAllExecuteWithNullptr();
205 finish = std::chrono::system_clock::now();
212 start = std::chrono::system_clock::now();
213 volatile bool canTerminate = this->
wait();
214 finish = std::chrono::system_clock::now();
218 if (canTerminate) {
break; }
221 start = std::chrono::system_clock::now();
222 this->operateReceivers();
223 finish = std::chrono::system_clock::now();
239 this->notifyAllTerminated();
246 this->
nvtxProfiler()->startRangeWaiting(this->totalNumberElementsReceived());
247 std::unique_lock<std::mutex> lock(*(this->mutex()));
248 this->slotConditionVariable()->wait(
250 [
this]() {
return !this->receiversEmpty() || this->
callCanTerminate(
false); }
266 if (taskCopy ==
nullptr) {
267 std::ostringstream oss;
268 oss <<
"A copy for the task \"" << this->
name()
269 <<
"\" has been invoked but return nullptr. To fix this error, overload the AbstractTask::copy function and "
270 "return a valid object.";
271 throw (std::runtime_error(oss.str()));
277 auto taskCoreCopy =
dynamic_cast<CoreTask<Separator, AllTypes...
> *>(taskCopy->core().get());
279 if (taskCoreCopy ==
nullptr) {
280 std::ostringstream oss;
281 oss <<
"A copy for the task \"" << this->
name()
282 <<
"\" does not have the same type of cores than the original task.";
283 throw (std::runtime_error(oss.str()));
290 taskCoreCopy->copyInnerStructure(
this);
297 for (
auto notifier : predecessorNotifier->notifiers()) {
298 for (
auto slot : coreCopyAsSlot->
slots()) {
299 slot->addNotifier(notifier);
306 for (
auto slot : successorSlot->slots()) {
307 for (
auto notifier : coreCopyAsNotifier->
notifiers()) {
308 slot->addNotifier(notifier);
309 notifier->addSlot(slot);
335 [[nodiscard]] std::vector<std::pair<std::string const, std::string const>>
ids()
const override {
350 std::shared_ptr<abstraction::NodeAbstraction>
clone(
351 [[maybe_unused]] std::map<
NodeAbstraction *, std::shared_ptr<NodeAbstraction>> &correspondenceMap)
override {
354 return clone->core();
360 this->duplicateOutputEdges(mapping);
typename internals::Splitter< delta, Types... >::Inputs Inputs
Helper getting the input types from a list of template types (variadic)
typename internals::Splitter< delta, Types... >::Outputs Outputs
Helper getting the output types from a list of template types (variadic)
tool::TaskOutputsManagementAbstractionTypeDeducer_t< tool::Outputs< Separator, AllTypes... > > TOM
Type alias for a TaskOutputsManagementAbstraction deduced from the list of template parameters.
tool::TaskInputsManagementAbstractionTypeDeducer_t< tool::Inputs< Separator, AllTypes... > > TIM
Type alias for a TaskInputsManagementAbstraction deduced from the list of template parameters.
Printer abstraction to get a snapshot of the metrics of the Hedgehog graph.
bool registerNode(core::abstraction::NodeAbstraction const *nodeAbstraction)
Register a visited node.
virtual void printNodeInformation(core::abstraction::NodeAbstraction const *node)=0
Print node information.
Base node for computation.
virtual std::string extraPrintingInformation() const
Print extra information for the task.
virtual void initialize()
Initialize step for the task.
virtual void shutdown()
Shutdown step for the task.
std::shared_ptr< AbstractMemoryManager > const & memoryManager() const
Memory manager accessor.
std::string groupRepresentativeId() const
Group id representative accessor.
size_t numberThreads() const
Accessor to the number of threads.
Abstraction for cores that present a termination condition.
bool callNodeCanTerminate() const
Call user-definable termination.
CanTerminateAbstraction(behavior::CanTerminate *const canTerminateNode)
Constructor using behavior::CanTerminate node abstraction.
Abstraction for cleanable core.
CleanableAbstraction()=default
Constructor used by the CoreGraph to have the handles to clean inner cleanable nodes.
Core abstraction for clonable nodes, nodes duplicated by execution pipeline.
std::shared_ptr< CopyableNode > callCopy()
Interface to call user-defined copy method.
Typed abstraction for groupable node.
GroupableAbstraction(CopyableNode *const copyableNode, size_t const &numberThreads)
Constructor using the node abstraction to call the user-defined copy and the number of threads.
std::shared_ptr< CopyableNode > callCopyAndRegisterInGroup()
Call the user-defined copy and register the copy in the group.
int threadId() const
Accessor to thread id.
Core abstraction to notify slots.
void addSlot(SlotAbstraction *const slot)
Add a SlotAbstraction to this notifier.
std::set< NotifierAbstraction * > const & notifiers() const
Const accessor to notifiers.
std::set< SlotAbstraction * > const & connectedSlots() const
Accessor to the slots attached to this notifier.
Core's abstraction to receive a signal.
std::set< NotifierAbstraction * > const & connectedNotifiers() const
Accessor to the NotifierAbstraction attached to this slot, protected with mutex.
std::set< SlotAbstraction * > const & slots() const
Const accessor to slots.
virtual std::string id() const
Core's id ('x' + address of abstraction) as string.
NodeAbstraction(std::string name)
Core node constructor using the core's name.
void incrementExecutionDuration(std::chrono::nanoseconds const &exec)
Increment execution duration.
std::string const & name() const
Accessor to the core's name.
virtual int deviceId() const
Get the device identifier (obtained from the graph the node belongs to).
Task core abstraction used to define some method for task-like behaving cores like CoreExecutionPipel...
void incrementWaitDuration(std::chrono::nanoseconds const &wait)
Increment the wait duration.
void setInitialized()
Set the task as initialized.
TaskNodeAbstraction(std::string const &name, behavior::Node *node)
Create the abstraction with the node's name.
std::shared_ptr< NvtxProfiler > const & nvtxProfiler() const
Accessor to the NVTX profiler attached to the node.
bool isActive() const
Accessor to task status.
void createGroup(std::map< NodeAbstraction *, std::vector< NodeAbstraction * > > &map) override
Create a group for this task, and connect each copy to the predecessor and successor nodes.
void postRun() override
When a task terminates, set the task to non active, call user-defined shutdown, and disconnect the ta...
void run() override
Main core task logic.
void visit(Printer *printer) override
Visit the task.
~CoreTask() override=default
Default destructor.
void preRun() override
Initialize the task.
std::vector< std::pair< std::string const, std::string const > > ids() const override
Node ids [nodeId, nodeGroupId] accessor.
std::shared_ptr< abstraction::NodeAbstraction > clone(std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &correspondenceMap) override
Clone method, to duplicate a task when it is part of another graph in an execution pipeline.
bool wait()
Wait statement when the node is alive and no input data are available.
void duplicateEdge(std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &mapping) override
Duplicate the task edge.
std::string extraPrintingInformation() const override
Accessor to user-defined extra information for the task.
std::shared_ptr< AbstractMemoryManager > memoryManager() const override
Accessor to the memory manager.
AbstractTask< Separator, AllTypes... > *const task_
User defined task.
CoreTask(AbstractTask< Separator, AllTypes... > *const task)
Create a CoreTask from a user-defined AbstractTask with one thread.
CoreTask(AbstractTask< Separator, AllTypes... > *const task, std::string const &name, size_t const numberThreads, bool const automaticStart)
Create a CoreTask from a user-defined AbstractTask, its name, the number of threads and the automatic...
void copyInnerStructure(CoreTask< Separator, AllTypes... > *copyableCore) override
Copy task's inner structure.
CoreTask(AbstractTask< Separator, AllTypes... > *const task, std::string const &name, size_t const numberThreads, bool const automaticStart, std::shared_ptr< implementor::ImplementorSlot > concreteSlot, std::shared_ptr< ConcreteMultiReceivers > concreteMultiReceivers, std::shared_ptr< ConcreteMultiExecutes > concreteMultiExecutes, std::shared_ptr< implementor::ImplementorNotifier > concreteNotifier, std::shared_ptr< ConcreteMultiSenders > concreteMultiSenders)
Construct a task from the user-defined task and its concrete abstraction's implementations.
bool callCanTerminate(bool lock) override
Call user-definable termination.
bool hasMemoryManagerAttached() const override
Test if a memory manager is attached.
bool const automaticStart_
Flag for automatic start.