Hedgehog  0.0.0
A library to generate hybrid pipeline workflow systems
core_task.h
1 // NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the
2 // software in any medium, provided that you keep intact this entire notice. You may improve, modify and create
3 // derivative works of the software or any portion of the software, and you may copy and distribute such modifications
4 // or works. Modified works should carry a notice stating that you changed the software and should note the date and
5 // nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the
6 // source of the software. NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND,
7 // EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
8 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR
9 // WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE
10 // CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS
11 // THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE. You
12 // are solely responsible for determining the appropriateness of using and distributing the software and you assume
13 // all risks associated with its use, including but not limited to the risks and costs of program errors, compliance
14 // with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of
15 // operation. This software is not intended to be used in any situation where a failure could cause risk of injury or
16 // damage to property. The software developed by NIST employees is not subject to copyright protection within the
17 // United States.
18 
19 
20 #ifndef HEDGEHOG_CORE_TASK_H
21 #define HEDGEHOG_CORE_TASK_H
22 
23 #include <variant>
24 #include <string_view>
25 
26 #include "../../tools/traits.h"
27 #include "../io/queue/receiver/core_queue_multi_receivers.h"
28 #include "../io/queue/sender/core_queue_sender.h"
29 
30 #include "../../behavior/execute.h"
31 
32 #include "../../tools/logger.h"
33 #include "../../tools/nvtx_profiler.h"
34 #include "../behavior/core_execute.h"
35 
36 #include "../../api/memory_manager/abstract_memory_manager.h"
37 
39 namespace hh {
40 
41 #ifndef DOXYGEN_SHOULD_SKIP_THIS
42 template<class TaskOutput, class ...TaskInputs>
46 class AbstractTask;
47 #endif //DOXYGEN_SHOULD_SKIP_THIS
48 
50 namespace core {
51 
55 template<class TaskOutput, class ...TaskInputs>
56 class CoreTask
57  : public virtual CoreQueueSender<TaskOutput>,
58  public CoreQueueMultiReceivers<TaskInputs...>,
59  public virtual CoreExecute<TaskInputs> ... {
60  private:
61  AbstractTask<TaskOutput, TaskInputs...> *task_ = nullptr;
62  bool automaticStart_ = false;
63  std::vector<std::shared_ptr<AbstractTask<TaskOutput, TaskInputs...>>>
65 
66  std::shared_ptr<NvtxProfiler> nvtxProfiler_ = nullptr;
67 
68  public:
75  CoreTask(std::string_view const &name,
76  size_t const numberThreads,
77  NodeType const type,
79  bool automaticStart) : CoreNode(name, type, numberThreads),
80  CoreNotifier(name, type, numberThreads),
81  CoreQueueSender<TaskOutput>(name, type, numberThreads),
82  CoreSlot(name, type, numberThreads),
83  CoreReceiver<TaskInputs>(name, type, numberThreads)...,
84  CoreQueueMultiReceivers<TaskInputs...>(name, type, numberThreads), task_(task),
85  automaticStart_(automaticStart) {
86  HLOG_SELF(0, "Creating CoreTask with task: " << task << " type: " << (int) type << " and name: " << name)
87  nvtxProfiler_ = std::make_shared<NvtxProfiler>(this->name());
88  }
89 
91  ~CoreTask() override {
92  HLOG_SELF(0, "Destructing CoreTask")
93  task_ = nullptr;
94  }
95 
98  [[nodiscard]] bool automaticStart() const { return automaticStart_; }
99 
102  std::shared_ptr<NvtxProfiler> &nvtxProfiler() { return nvtxProfiler_; }
103 
106  behavior::Node *node() override { return task_; }
107 
110  AbstractTask<TaskOutput, TaskInputs...> *task() const { return task_; }
111 
114  void automaticStart(bool automaticStart) { automaticStart_ = automaticStart; }
115 
118  void visit(AbstractPrinter *printer) override {
119  HLOG_SELF(1, "Visit")
120  if (printer->hasNotBeenVisited(this)) {
121  printer->printNodeInformation(this);
122  // Print the CoreQueueSender informations, i.e. the edges
124  }
125  }
126 
130  HLOG_SELF(0, "Copy cluster CoreTask information from " << rhs->name() << "(" << rhs->id() << ")")
134  }
135 
138  void createCluster(std::shared_ptr<std::multimap<CoreNode *, std::shared_ptr<CoreNode>>> &insideNodesGraph) override {
139  auto mm = this->task()->memoryManager();
140  // Set cluster property
141  if (this->numberThreads() > 1) { this->setInCluster(); }
142 
143  // For each copy
144  for (size_t threadId = 1; threadId < this->numberThreads(); threadId++) {
145  // Copy the task
146  auto taskCopy = createCopyFromThis();
147  // Duplicate the memory manager, each of them will have a separate instance
148  connectMemoryManager(mm, taskCopy);
149  // Set property
150  auto coreCopy = std::dynamic_pointer_cast<CoreTask<TaskOutput, TaskInputs...>>(taskCopy->core());
151  coreCopy->threadId(threadId);
152  coreCopy->coreClusterNode(this);
153  coreCopy->copyInnerStructure(this);
154  coreCopy->setInCluster();
155  insideNodesGraph->insert({this, coreCopy});
156  }
157  }
158 
160  void run() override {
161  HLOG_SELF(2, "Run")
162  std::chrono::time_point<std::chrono::high_resolution_clock>
163  start,
164  finish;
165 
166  this->isActive(true);
167  this->nvtxProfiler()->initialize(this->threadId());
168  // Do the initialization phase
169  this->preRun();
170 
171  // If automatic start is enable send nullptr to all input nodes and wake them up
172  if (this->automaticStart()) {
173  start = std::chrono::high_resolution_clock::now();
174  (static_cast<CoreExecute<TaskInputs> *>(this)->callExecute(nullptr), ...);
175  finish = std::chrono::high_resolution_clock::now();
176  this->incrementExecutionDuration(std::chrono::duration_cast<std::chrono::microseconds>(finish - start));
177  }
178 
179  // Actual computation loop
180  while (!this->callCanTerminate(true)) {
181  start = std::chrono::high_resolution_clock::now();
182  // Wait for a data to arrive or termination
183  volatile bool canTerminate = this->waitForNotification();
184  finish = std::chrono::high_resolution_clock::now();
185  this->incrementWaitDuration(std::chrono::duration_cast<std::chrono::microseconds>(finish - start));
186  // If can terminate break the loop early
187  if (canTerminate) { break; }
188  start = std::chrono::high_resolution_clock::now();
189  // Operate the receivers to get a data and send it to execute
190  (this->operateReceiver<TaskInputs>(), ...);
191  finish = std::chrono::high_resolution_clock::now();
192  this->incrementExecutionDuration(std::chrono::duration_cast<std::chrono::microseconds>(finish - start));
193  }
194 
195  // Do the shutdown phase
196  this->postRun();
197  // Wake up node that this is linked to
198  this->wakeUp();
199  }
200 
204  virtual bool callCanTerminate(bool lock) {
205  bool result;
206  if (lock) { this->lockUniqueMutex(); }
207  result = this->node()->canTerminate();
208  HLOG_SELF(2, "callCanTerminate: " << std::boolalpha << result)
209  if (lock) { this->unlockUniqueMutex(); }
210  return result;
211  };
212 
215  template<class Input>
217  HLOG_SELF(2, "Operate receivers")
218  // Lock the mutex
219  this->lockUniqueMutex();
220  // Get the receiver with the right type
221  auto receiver = static_cast<CoreQueueReceiver<Input> *>(this);
222  // If receiver's queue not empty
223  if (!receiver->receiverEmpty()) {
224  // Get the data
225  std::shared_ptr<Input> data = receiver->popFront();
226  this->unlockUniqueMutex();
227  // Call execute
228  static_cast<CoreExecute<Input> *>(this)->callExecute(data);
229  } else {
230  // Unlock the mutex
231  this->unlockUniqueMutex();
232  }
233  }
234 
238  bool waitForNotification() override {
239  this->nvtxProfiler()->startRangeWaiting(this->totalQueueSize());
240  std::unique_lock<std::mutex> lock(*(this->slotMutex()));
241  HLOG_SELF(2, "Wait for notification")
242  this->notifyConditionVariable()->wait(lock,
243  [this]() {
244  bool receiversEmpty = this->receiversEmpty();
245  bool callCanTerminate = this->callCanTerminate(false);
246  HLOG_SELF(2,
247  "Check for notification: " << std::boolalpha
248  << (bool) (!receiversEmpty) << "||"
249  << std::boolalpha
250  << (bool) callCanTerminate)
251  return !receiversEmpty || callCanTerminate;
252  });
253  HLOG_SELF(2, "Notification received")
254  this->nvtxProfiler()->endRangeWaiting();
255  return callCanTerminate(false);
256  }
257 
260  std::shared_ptr<AbstractTask<TaskOutput, TaskInputs...>> createCopyFromThis() {
261  HLOG_SELF(0, "Copy Whole Task")
262 
263  std::shared_ptr<AbstractTask<TaskOutput, TaskInputs...>> sharedAbstractTaskCopy = this->task()->copy();
264  // The copy method has not been redefined and return nullptr by default
265  if (!sharedAbstractTaskCopy) {
266  std::ostringstream oss;
267  oss << "A copy for the task \"" << this->name()
268  << "\" has been invoked but return nullptr. To fix this error, overload the AbstractTask::copy function and "
269  "return a valid object.";
270  HLOG_SELF(0, oss.str())
271  throw (std::runtime_error(oss.str()));
272  }
273 
274  // Add the copy into the cluster
275  this->clusterAbstractTask_.push_back(sharedAbstractTaskCopy);
276 
277  // Get the core from the copy
278  auto coreCopy = std::dynamic_pointer_cast<CoreTask<TaskOutput, TaskInputs...>>(sharedAbstractTaskCopy->core());
279 
280  if (coreCopy == nullptr) {
281  std::ostringstream oss;
282  oss << "A copy for the task " << this->name()
283  << " has been invoked but the AbstractTask constructor has not been called. To fix this error, call the "
284  "AbstractTask constructor in your specialized task constructor.";
285  HLOG_SELF(0, oss.str())
286  throw (std::runtime_error(oss.str()));
287  }
288 
289  // Copy the property
290  coreCopy->automaticStart_ = this->automaticStart();
291  coreCopy->threadId(this->threadId());
292  coreCopy->isInside(true);
293  if (this->isInCluster()) { coreCopy->setInCluster(); }
294  coreCopy->numberThreads(this->numberThreads());
295 
296  // Copy the memory manager
297  if (this->task()->memoryManager()) {
298  auto copyMemoryManager = this->task()->memoryManager()->copy();
299  if (!copyMemoryManager) {
300  std::ostringstream oss;
301  oss << "An execution pipeline has been created with a graph that hold a task named \""
302  << this->name()
303  << "\" connected to a memory manager. Or the memory manager does not have a compliant copy method. "
304  "Please implement it.";
305  HLOG_SELF(0, oss.str())
306  throw (std::runtime_error(oss.str()));
307  }
308  HLOG_SELF(0, "Copy the MM " << this->task()->memoryManager() << " to: " << copyMemoryManager
309  << " and set it to the task: " << coreCopy->name() << " / " << coreCopy->id())
310  connectMemoryManager(copyMemoryManager, sharedAbstractTaskCopy);
311  }
312  return sharedAbstractTaskCopy;
313  }
314 
315  private:
321  std::shared_ptr<AbstractTask<TaskOutput, TaskInputs...>> taskCopy) {
322  if constexpr (traits::is_managed_memory_v<TaskOutput>) {
323  if (mm) {
324  HLOG_SELF(1, "Copy the MM pointer " << mm
325  << " from: " << this->name() << " / " << this->id()
326  << " to: " << taskCopy->core()->name() << " / " << taskCopy->core()->id())
327  taskCopy->connectMemoryManager(mm);
328  }
329  }
330  }
331 
332 };
333 
334 }
335 }
336 #endif //HEDGEHOG_CORE_TASK_H
NodeType type() const
Node type accessor.
Definition: core_node.h:132
void incrementExecutionDuration(std::chrono::duration< uint64_t, std::micro > const &exec)
Increment execution duration.
Definition: core_node.h:565
void connectMemoryManager(std::shared_ptr< AbstractMemoryManager< TaskOutput >> const &mm, std::shared_ptr< AbstractTask< TaskOutput, TaskInputs... >> taskCopy)
Connect a memory manager to a task if the type is valid (HedgehogTraits::is_managed_memory_v<TaskOutp...
Definition: core_task.h:320
Receiver Interface, receive one data type from CoreSender.
Definition: core_receiver.h:44
Base node for computation.
Definition: abstract_task.h:76
behavior::Node * node() override
Node accessor.
Definition: core_task.h:106
bool automaticStart_
Automatic start property.
Definition: core_task.h:62
void copyInnerStructure(CoreQueueSender< NodeOutput > *rhs)
Copy the inner structure of a CoreQueueSender (destinations, and notifier)
Core Notifier interface, emit notification to CoreSlot.
Definition: core_notifier.h:34
std::shared_ptr< std::mutex > const & slotMutex() const
Mutex accessor.
std::vector< std::shared_ptr< AbstractTask< TaskOutput, TaskInputs... > > > clusterAbstractTask_
Store all AbstractTask clones (hold memory)
Definition: core_task.h:64
virtual bool callCanTerminate(bool lock)
Call user-defined or default canTerminate and lock/unlock the mutex if lock is true.
Definition: core_task.h:204
void automaticStart(bool automaticStart)
Automatic start property accessor.
Definition: core_task.h:114
Hedgehog main namespace.
void setInCluster()
Set the task as part of a cluster.
Definition: core_node.h:436
bool isActive() const
Is active property accessor.
Definition: core_node.h:191
void visit(AbstractPrinter *printer) override
Special visit method for a CoreQueueSender, printing an edge.
Core of the task node.
Definition: core_task.h:56
std::shared_ptr< AbstractTask< TaskOutput, TaskInputs... > > createCopyFromThis()
Create a copy from this instance.
Definition: core_task.h:260
Abstract interface for Hedgehog's Memory manager.
Sender for nodes possessing a queue of data.
virtual bool canTerminate()
Determine if the node can terminate.
Definition: node.h:55
~CoreTask() override
Core Task destructor.
Definition: core_task.h:91
Printer interface.
Node Behavior definition.
Definition: node.h:39
Slot interface, receive notification from CoreNotifier.
Definition: core_slot.h:34
bool hasNotBeenVisited(core::CoreNode const *node)
Accessor to check if a node has been visited by the printer.
std::shared_ptr< NodeInput > popFront()
Pop the front element of the queue and return it.
int threadId() const
Thread id accessor.
Definition: core_node.h:148
Receiver for nodes possessing a queue of data.
void copyInnerStructure(CoreTask< TaskOutput, TaskInputs... > *rhs)
Copy the inner structure from rhs to this CoreTask.
Definition: core_task.h:129
void copyInnerStructure(CoreNode *rhs)
Copy inner structure from rhs nodes to this.
Definition: core_node.h:537
virtual std::shared_ptr< AbstractTask< TaskOutput, TaskInputs... > > copy()
Default copy overload, fail if cluster need to be copied.
NodeType
Hedgehog node's type.
Definition: core_node.h:40
AbstractTask< TaskOutput, TaskInputs... > * task_
Task node pointer (just for reference)
Definition: core_task.h:61
virtual void preRun()
Method defining what to do before the run.
Definition: core_node.h:494
void unlockUniqueMutex()
Unlock the mutex.
Main Hedgehog core abstraction.
Definition: core_node.h:48
virtual void postRun()
Method defining what to do after the run.
Definition: core_node.h:500
Multi receivers for nodes possessing a queue of data.
std::shared_ptr< std::condition_variable > const & notifyConditionVariable() const
Condition variable accessor.
void createCluster(std::shared_ptr< std::multimap< CoreNode *, std::shared_ptr< CoreNode >>> &insideNodesGraph) override
Create a cluster for a CoreTask.
Definition: core_task.h:138
bool isInCluster() const
In cluster property accessor.
Definition: core_node.h:187
void visit(AbstractPrinter *printer) override
Special visit method for a CoreTask.
Definition: core_task.h:118
Interface to call execute on the different nodes.
Definition: core_execute.h:30
std::shared_ptr< AbstractMemoryManager< TaskOutput > > const & memoryManager() const
Task's memory manager accessor.
std::string_view const & name() const
Node name accessor.
Definition: core_node.h:128
void wakeUp() final
Wake up and notify a node connected to the condition variable CoreQueueSlot::notifyConditionVariable_...
void lockUniqueMutex()
Lock the mutex.
bool receiversEmpty() final
Test emptiness of all receivers.
bool automaticStart() const
Automatic start property accessor.
Definition: core_task.h:98
AbstractTask< TaskOutput, TaskInputs... > * task() const
Node accessor.
Definition: core_task.h:110
size_t numberThreads() const
Number of threads associated accessor.
Definition: core_node.h:152
void operateReceiver()
Operate a CoreTask's receiver for a specific type, thread safe.
Definition: core_task.h:216
void incrementWaitDuration(std::chrono::duration< uint64_t, std::micro > const &wait)
Increment wait duration.
Definition: core_node.h:561
virtual void printNodeInformation(core::CoreNode *node)=0
Print node information.
std::shared_ptr< NvtxProfiler > & nvtxProfiler()
NVTX profiler accessor.
Definition: core_task.h:102
virtual void callExecute(std::shared_ptr< TaskInputs > data)=0
Wrapper to call the user-defined Execute::execute.
CoreTask(std::string_view const &name, size_t const numberThreads, NodeType const type, AbstractTask< TaskOutput, TaskInputs... > *task, bool automaticStart)
CoreTask constructor (Used for AbstractTask and StateManager)
Definition: core_task.h:75
std::string_view name()
Task's name accessor.
virtual std::string id() const
Unique Id accessor.
Definition: core_node.h:114
bool waitForNotification() override
Wait method for notification.
Definition: core_task.h:238
void run() override
Main loop for the CoreTask.
Definition: core_task.h:160
std::shared_ptr< NvtxProfiler > nvtxProfiler_
Store a nvtx profiler for the task.
Definition: core_task.h:66
size_t totalQueueSize() final
Sums the queue sizes for all receivers.