Hedgehog  3.1.0
A library to generate hybrid pipeline workflow systems
Loading...
Searching...
No Matches
core_task.h
Go to the documentation of this file.
1// NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the
2// software in any medium, provided that you keep intact this entire notice. You may improve, modify and create
3// derivative works of the software or any portion of the software, and you may copy and distribute such modifications
4// or works. Modified works should carry a notice stating that you changed the software and should note the date and
5// nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the
6// source of the software. NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND,
7// EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
8// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR
9// WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE
10// CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS
11// THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE. You
12// are solely responsible for determining the appropriateness of using and distributing the software and you assume
13// all risks associated with its use, including but not limited to the risks and costs of program errors, compliance
14// with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of
15// operation. This software is not intended to be used in any situation where a failure could cause risk of injury or
16// damage to property. The software developed by NIST employees is not subject to copyright protection within the
17// United States.
18
19#ifndef HEDGEHOG_CORE_TASK_H
20#define HEDGEHOG_CORE_TASK_H
21
22#include <ostream>
23#include <sstream>
24
25#include "../../tools/concepts.h"
26#include "../../tools/traits.h"
27
28#include "../abstractions/base/clonable_abstraction.h"
29#include "../abstractions/base/groupable_abstraction.h"
30#include "../abstractions/base/cleanable_abstraction.h"
31#include "../abstractions/base/can_terminate_abstraction.h"
32
33#include "../abstractions/base/node/task_node_abstraction.h"
34
35#include "../abstractions/node/task_inputs_management_abstraction.h"
36#include "../abstractions/node/task_outputs_management_abstraction.h"
37
39namespace hh {
40
41#ifndef DOXYGEN_SHOULD_SKIP_THIS
45template<size_t Separator, class ...AllTypes>
46class AbstractTask;
47#endif //DOXYGEN_SHOULD_SKIP_THIS
48
50namespace core {
51
53template<size_t Separator, class ...AllTypes>
54using TIM = tool::TaskInputsManagementAbstractionTypeDeducer_t<tool::Inputs<Separator, AllTypes...>>;
55
57template<size_t Separator, class ...AllTypes>
58using TOM = tool::TaskOutputsManagementAbstractionTypeDeducer_t<tool::Outputs<Separator, AllTypes...>>;
59
63template<size_t Separator, class ...AllTypes>
69 public abstraction::GroupableAbstraction<AbstractTask<Separator, AllTypes...>, CoreTask<Separator, AllTypes...>>,
70 public TIM<Separator, AllTypes...>,
71 public TOM<Separator, AllTypes...> {
72
73 private:
74 AbstractTask<Separator, AllTypes...> *const
75 task_ = nullptr;
76
77 bool const
79
80 public:
84 TaskNodeAbstraction("Task", task),
85 CleanableAbstraction(static_cast<behavior::Cleanable *>(task)),
86 CanTerminateAbstraction(static_cast<behavior::CanTerminate *>(task)),
87 abstraction::GroupableAbstraction<AbstractTask<Separator, AllTypes...>, CoreTask<Separator, AllTypes...>>(task,
88 1),
89 TIM<Separator, AllTypes...>(task, this),
90 TOM<Separator, AllTypes...>(),
91 task_(task),
92 automaticStart_(false) {}
93
101 std::string const &name, size_t const numberThreads, bool const automaticStart) :
103 CleanableAbstraction(static_cast<behavior::Cleanable *>(task)),
104 CanTerminateAbstraction(static_cast<behavior::CanTerminate *>(task)),
105 abstraction::GroupableAbstraction<AbstractTask<Separator, AllTypes...>, CoreTask<Separator, AllTypes...>>(task,
107 TIM<Separator, AllTypes...>(task, this),
108 TOM<Separator, AllTypes...>(),
109 task_(task),
110 automaticStart_(automaticStart) {
111 if (this->numberThreads() == 0) { throw std::runtime_error("A task needs at least one thread."); }
112 }
113
128 template<class ConcreteMultiReceivers, class ConcreteMultiExecutes, class ConcreteMultiSenders>
130 std::string const &name, size_t const numberThreads, bool const automaticStart,
131 std::shared_ptr<implementor::ImplementorSlot> concreteSlot,
132 std::shared_ptr<ConcreteMultiReceivers> concreteMultiReceivers,
133 std::shared_ptr<ConcreteMultiExecutes> concreteMultiExecutes,
134 std::shared_ptr<implementor::ImplementorNotifier> concreteNotifier,
135 std::shared_ptr<ConcreteMultiSenders> concreteMultiSenders) :
137 CleanableAbstraction(static_cast<behavior::Cleanable *>(task)),
138 CanTerminateAbstraction(static_cast<behavior::CanTerminate *>(task)),
139 abstraction::GroupableAbstraction<AbstractTask<Separator, AllTypes...>, CoreTask<Separator, AllTypes...>>
140 (task, numberThreads),
141 TIM<Separator, AllTypes...>(this, concreteSlot, concreteMultiReceivers, concreteMultiExecutes),
142 TOM<Separator, AllTypes...>(concreteNotifier, concreteMultiSenders),
143 task_(task),
144 automaticStart_(automaticStart) {
145 if (this->numberThreads() == 0) { throw std::runtime_error("A task needs at least one thread."); }
146 }
147
/// @brief Default destructor
149 ~CoreTask() override = default;
150
153 [[nodiscard]] std::shared_ptr<AbstractMemoryManager> memoryManager() const override {
154 return this->task_->memoryManager();
155 }
156
160 bool callCanTerminate(bool lock) override {
161 bool result;
162 if (lock) { this->lockSlotMutex(); }
163 result = this->callNodeCanTerminate();
164 if (lock) { this->unlockSlotMutex(); }
165 return result;
166 };
167
170 void preRun() override {
171 this->nvtxProfiler()->startRangeInitializing();
172 this->task_->initialize();
173 if (this->task_->memoryManager() != nullptr) {
174 this->task_->memoryManager()->profiler(this->nvtxProfiler());
175 this->task_->memoryManager()->deviceId(this->deviceId());
176 this->task_->memoryManager()->initialize();
177 }
178 this->nvtxProfiler()->endRangeInitializing();
179 this->setInitialized();
180 }
181
193 void run() override {
194 std::chrono::time_point<std::chrono::system_clock>
195 start,
196 finish;
197
198 this->isActive(true);
199 this->nvtxProfiler()->initialize(this->threadId());
200 this->preRun();
201
202 if (this->automaticStart_) {
203 start = std::chrono::system_clock::now();
204 this->callAllExecuteWithNullptr();
205 finish = std::chrono::system_clock::now();
206 this->incrementExecutionDuration(std::chrono::duration_cast<std::chrono::nanoseconds>(finish - start));
207 }
208
209 // Actual computation loop
210 while (!this->callCanTerminate(true)) {
211 // Wait for a data to arrive or termination
212 start = std::chrono::system_clock::now();
213 volatile bool canTerminate = this->wait();
214 finish = std::chrono::system_clock::now();
215 this->incrementWaitDuration(std::chrono::duration_cast<std::chrono::nanoseconds>(finish - start));
216
217 // If loop can terminate break the loop early
218 if (canTerminate) { break; }
219
220 // Operate the connectedReceivers to get a data and send it to execute
221 start = std::chrono::system_clock::now();
222 this->operateReceivers();
223 finish = std::chrono::system_clock::now();
224 this->incrementExecutionDuration(std::chrono::duration_cast<std::chrono::nanoseconds>(finish - start));
225 }
226
227 // Do the shutdown phase
228 this->postRun();
229 // Wake up node that this is linked to
230 this->wakeUp();
231 }
232
235 void postRun() override {
236 this->nvtxProfiler()->startRangeShuttingDown();
237 this->isActive(false);
238 this->task_->shutdown();
239 this->notifyAllTerminated();
240 this->nvtxProfiler()->endRangeShuttingDown();
241 }
242
245 bool wait() {
246 this->nvtxProfiler()->startRangeWaiting(this->totalNumberElementsReceived());
247 std::unique_lock<std::mutex> lock(*(this->mutex()));
248 this->slotConditionVariable()->wait(
249 lock,
250 [this]() { return !this->receiversEmpty() || this->callCanTerminate(false); }
251 );
252 this->nvtxProfiler()->endRangeWaiting();
253 return callCanTerminate(false);
254 }
255
259 void createGroup(std::map<NodeAbstraction *, std::vector<NodeAbstraction *>> &map) override {
260 abstraction::SlotAbstraction *coreCopyAsSlot;
261 abstraction::NotifierAbstraction *coreCopyAsNotifier;
262
263 for (size_t threadId = 1; threadId < this->numberThreads(); ++threadId) {
264 auto taskCopy = this->callCopyAndRegisterInGroup();
265
266 if (taskCopy == nullptr) {
267 std::ostringstream oss;
268 oss << "A copy for the task \"" << this->name()
269 << "\" has been invoked but return nullptr. To fix this error, overload the AbstractTask::copy function and "
270 "return a valid object.";
271 throw (std::runtime_error(oss.str()));
272 }
273
274 // Copy the memory manager
275 taskCopy->connectMemoryManager(this->task_->memoryManager());
276
277 auto taskCoreCopy = dynamic_cast<CoreTask<Separator, AllTypes...> *>(taskCopy->core().get());
278
279 if (taskCoreCopy == nullptr) {
280 std::ostringstream oss;
281 oss << "A copy for the task \"" << this->name()
282 << "\" does not have the same type of cores than the original task.";
283 throw (std::runtime_error(oss.str()));
284 }
285
286 // Deal with the group registration in the graph
287 map.at(static_cast<NodeAbstraction *>(this)).push_back(taskCoreCopy);
288
289 // Copy inner structures
290 taskCoreCopy->copyInnerStructure(this);
291
292 // Make necessary connections
293 coreCopyAsSlot = static_cast<abstraction::SlotAbstraction *>(taskCoreCopy);
294 coreCopyAsNotifier = static_cast<abstraction::NotifierAbstraction *>(taskCoreCopy);
295
296 for (auto predecessorNotifier : static_cast<abstraction::SlotAbstraction *>(this)->connectedNotifiers()) {
297 for (auto notifier : predecessorNotifier->notifiers()) {
298 for (auto slot : coreCopyAsSlot->slots()) {
299 slot->addNotifier(notifier);
300 notifier->addSlot(slot);
301 }
302 }
303 }
304
305 for (auto successorSlot : static_cast<abstraction::NotifierAbstraction *>(this)->connectedSlots()) {
306 for (auto slot : successorSlot->slots()) {
307 for (auto notifier : coreCopyAsNotifier->notifiers()) {
308 slot->addNotifier(notifier);
309 notifier->addSlot(slot);
310 }
311 }
312 }
313 }
314 }
315
318 [[nodiscard]] bool hasMemoryManagerAttached() const override { return this->memoryManager() != nullptr; }
319
322 [[nodiscard]] std::string extraPrintingInformation() const override {
323 return this->task_->extraPrintingInformation();
324 }
325
331 }
332
335 [[nodiscard]] std::vector<std::pair<std::string const, std::string const>> ids() const override {
336 return {{this->id(), this->groupRepresentativeId()}};
337 }
340 void visit(Printer *printer) override {
341 if (printer->registerNode(this)) {
342 printer->printNodeInformation(this);
344 }
345 }
346
350 std::shared_ptr<abstraction::NodeAbstraction> clone(
351 [[maybe_unused]] std::map<NodeAbstraction *, std::shared_ptr<NodeAbstraction>> &correspondenceMap) override {
352 auto clone = std::dynamic_pointer_cast<AbstractTask<Separator, AllTypes...>>(this->callCopy());
353 if (this->hasMemoryManagerAttached()) { clone->connectMemoryManager(this->memoryManager()->copy()); }
354 return clone->core();
355 }
356
359 void duplicateEdge(std::map<NodeAbstraction *, std::shared_ptr<NodeAbstraction>> &mapping) override {
360 this->duplicateOutputEdges(mapping);
361 }
362
363};
364}
365}
366#endif //HEDGEHOG_CORE_TASK_H
Hedgehog main namespace.
typename internals::Splitter< delta, Types... >::Inputs Inputs
Helper getting the input types from a list of template types (variadic)
typename internals::Splitter< delta, Types... >::Outputs Outputs
Helper getting the output types from a list of template types (variadic)
tool::TaskOutputsManagementAbstractionTypeDeducer_t< tool::Outputs< Separator, AllTypes... > > TOM
Type alias for a TaskOutputsManagementAbstraction from the list of template parameters.
tool::TaskInputsManagementAbstractionTypeDeducer_t< tool::Inputs< Separator, AllTypes... > > TIM
Type alias for a TaskInputsManagementAbstraction from the list of template parameters.
Printer abstraction to get a snapshot of the metrics of the Hedgehog graph.
Definition: printer.h:52
bool registerNode(core::abstraction::NodeAbstraction const *nodeAbstraction)
Register a visited node.
Definition: printer.h:113
virtual void printNodeInformation(core::abstraction::NodeAbstraction const *node)=0
Print node information.
Base node for computation.
virtual std::string extraPrintingInformation() const
Print extra information for the task.
Definition: task_node.h:89
virtual void initialize()
initialize step for the task
Definition: task_node.h:82
virtual void shutdown()
shutdown step for the task
Definition: task_node.h:85
std::shared_ptr< AbstractMemoryManager > const & memoryManager() const
Memory manager accessor.
Definition: task_node.h:53
std::string groupRepresentativeId() const
Group id representative accessor.
size_t numberThreads() const
Accessor to the number of threads.
Abstraction for core that present termination condition.
bool callNodeCanTerminate() const
Call user-definable termination.
CanTerminateAbstraction(behavior::CanTerminate *const canTerminateNode)
Constructor using behavior::CanTerminate node abstraction.
CleanableAbstraction()=default
Constructor used by the CoreGraph to have the handles to clean inner cleanable nodes.
Core abstraction for clonable nodes, nodes duplicated by execution pipeline.
std::shared_ptr< CopyableNode > callCopy()
Interface to call user-defined copy method.
Typed abstraction for groupable node.
GroupableAbstraction(CopyableNode *const copyableNode, size_t const &numberThreads)
Constructor using the node abstraction to call the user-defined copy and the number of threads.
std::shared_ptr< CopyableNode > callCopyAndRegisterInGroup()
Call the user-defined copy and register the copy in the group.
Core abstraction to notify slots.
void addSlot(SlotAbstraction *const slot)
Add a SlotAbstraction to this notifier.
std::set< NotifierAbstraction * > const & notifiers() const
Const accessor to notifiers.
std::set< SlotAbstraction * > const & connectedSlots() const
Accessor to the slots attached to this notifier.
Core's abstraction to receive a signal.
std::set< NotifierAbstraction * > const & connectedNotifiers() const
Accessor to the NotifierAbstraction attached to this slot, protected with mutex.
std::set< SlotAbstraction * > const & slots() const
Const accessor to slots.
virtual std::string id() const
Core's id ('x' + address of abstraction) as string.
NodeAbstraction(std::string name)
Core node constructor using the core's name.
void incrementExecutionDuration(std::chrono::nanoseconds const &exec)
Increment execution duration.
std::string const & name() const
Accessor to the core's name.
virtual int deviceId() const
Get the device identifier (got from belonging graph)
Task core abstraction used to define some method for task-like behaving cores like CoreExecutionPipel...
void incrementWaitDuration(std::chrono::nanoseconds const &wait)
Increment the wait duration.
void setInitialized()
Set the task as initialized.
TaskNodeAbstraction(std::string const &name, behavior::Node *node)
Create the abstraction with the node's name.
std::shared_ptr< NvtxProfiler > const & nvtxProfiler() const
Accessor to the NVTX profiler attached to the node.
bool isActive() const
Accessor to task status.
Task core.
Definition: core_task.h:71
void createGroup(std::map< NodeAbstraction *, std::vector< NodeAbstraction * > > &map) override
Create a group for this task, and connect each copies to the predecessor and successor nodes.
Definition: core_task.h:259
void postRun() override
When a task terminates, set the task to non active, call user-defined shutdown, and disconnect the ta...
Definition: core_task.h:235
void run() override
Main core task logic.
Definition: core_task.h:193
void visit(Printer *printer) override
Visit the task.
Definition: core_task.h:340
~CoreTask() override=default
Default destructor.
void preRun() override
Initialize the task.
Definition: core_task.h:170
std::vector< std::pair< std::string const, std::string const > > ids() const override
Node ids [nodeId, nodeGroupId] accessor.
Definition: core_task.h:335
std::shared_ptr< abstraction::NodeAbstraction > clone(std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &correspondenceMap) override
Clone method, to duplicate a task when it is part of another graph in an execution pipeline.
Definition: core_task.h:350
bool wait()
Wait statement when the node is alive and no input data are available.
Definition: core_task.h:245
void duplicateEdge(std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &mapping) override
Duplicate the task edge.
Definition: core_task.h:359
std::string extraPrintingInformation() const override
Accessor to user-defined extra information for the task.
Definition: core_task.h:322
std::shared_ptr< AbstractMemoryManager > memoryManager() const override
Accessor to the memory manager.
Definition: core_task.h:153
AbstractTask< Separator, AllTypes... > *const task_
User defined task.
Definition: core_task.h:75
CoreTask(AbstractTask< Separator, AllTypes... > *const task)
Create a CoreTask from a user-defined AbstractTask with one thread.
Definition: core_task.h:83
CoreTask(AbstractTask< Separator, AllTypes... > *const task, std::string const &name, size_t const numberThreads, bool const automaticStart)
Create a CoreTask from a user-defined AbstractTask, its name, the number of threads and the automatic...
Definition: core_task.h:100
void copyInnerStructure(CoreTask< Separator, AllTypes... > *copyableCore) override
Copy task's inner structure.
Definition: core_task.h:328
CoreTask(AbstractTask< Separator, AllTypes... > *const task, std::string const &name, size_t const numberThreads, bool const automaticStart, std::shared_ptr< implementor::ImplementorSlot > concreteSlot, std::shared_ptr< ConcreteMultiReceivers > concreteMultiReceivers, std::shared_ptr< ConcreteMultiExecutes > concreteMultiExecutes, std::shared_ptr< implementor::ImplementorNotifier > concreteNotifier, std::shared_ptr< ConcreteMultiSenders > concreteMultiSenders)
Construct a task from the user-defined task and its concrete abstraction's implementations.
Definition: core_task.h:129
bool callCanTerminate(bool lock) override
Call user-definable termination.
Definition: core_task.h:160
bool hasMemoryManagerAttached() const override
Test if a memory manager is attached.
Definition: core_task.h:318
bool const automaticStart_
Flag for automatic start.
Definition: core_task.h:78