Hedgehog  0.0.0
A library to generate hybrid pipeline workflow systems
core_queue_sender.h
1 // NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the
2 // software in any medium, provided that you keep intact this entire notice. You may improve, modify and create
3 // derivative works of the software or any portion of the software, and you may copy and distribute such modifications
4 // or works. Modified works should carry a notice stating that you changed the software and should note the date and
5 // nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the
6 // source of the software. NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND,
7 // EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
8 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR
9 // WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE
10 // CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS
11 // THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE. You
12 // are solely responsible for determining the appropriateness of using and distributing the software and you assume
13 // all risks associated with its use, including but not limited to the risks and costs of program errors, compliance
14 // with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of
15 // operation. This software is not intended to be used in any situation where a failure could cause risk of injury or
16 // damage to property. The software developed by NIST employees is not subject to copyright protection within the
17 // United States.
18 
19 
20 #ifndef HEDGEHOG_CORE_QUEUE_SENDER_H
21 #define HEDGEHOG_CORE_QUEUE_SENDER_H
22 
23 #include "core_queue_notifier.h"
24 #include "../receiver/core_queue_receiver.h"
25 #include "../../base/sender/core_sender.h"
26 #include "../../../../tools/traits.h"
27 
29 namespace hh::core {
30 
33 template<class NodeOutput>
34 class CoreQueueSender : public CoreSender<NodeOutput>, public virtual CoreQueueNotifier {
35  std::shared_ptr<std::set<CoreQueueReceiver<NodeOutput> *>> destinations_ = nullptr;
36 
37  public:
42  CoreQueueSender(std::string_view const &name, NodeType const type, size_t const numberThreads)
43  : CoreQueueNotifier(name, type, numberThreads),
44  CoreSender<NodeOutput>(name, type, numberThreads) {
45  HLOG_SELF(0, "Creating CoreQueueSender with type: " << (int) type << " and name: " << name)
46  destinations_ = std::make_shared<std::set<CoreQueueReceiver<NodeOutput> *>>();
47  }
48 
50  ~CoreQueueSender() override {HLOG_SELF(0, "Destructing CoreQueueSender")}
51 
54  virtual std::shared_ptr<std::set<CoreQueueReceiver<NodeOutput> *>> const &destinations() const {
55  return destinations_;
56  }
57 
60  void addReceiver(CoreReceiver<NodeOutput> *receiver) override {
61  HLOG_SELF(0, "Add receiver " << receiver->name() << "(" << receiver->id() << ")")
62  for (auto queueReceiver: receiver->receivers()) {
63  if (auto r = dynamic_cast<CoreQueueReceiver<NodeOutput> *>(queueReceiver)) {
64  this->destinations_->insert(r);
65  } else {
66  std::ostringstream oss;
67  oss
68  << "Internal error, CoreQueueSender connected to a node which is not a CoreQueueReceiver: "
69  << __FUNCTION__;
70  HLOG_SELF(0, oss.str())
71  throw (std::runtime_error(oss.str()));
72  }
73  }
74  }
75 
78  void removeReceiver(CoreReceiver<NodeOutput> *receiver) override {
79  HLOG_SELF(0, "Remove receiver " << receiver->name() << "(" << receiver->id() << ")")
80  for (auto queueReceiver: receiver->receivers()) {
81  if (auto r = dynamic_cast<CoreQueueReceiver<NodeOutput> *>(queueReceiver)) {
82  this->destinations_->erase(r);
83  } else {
84  std::ostringstream oss;
85  oss
86  << "Internal error, CoreQueueSender connected to a node which is not a CoreQueueReceiver: "
87  << __FUNCTION__;
88  HLOG_SELF(0, oss.str())
89  throw (std::runtime_error(oss.str()));
90  }
91  }
92  }
93 
96  void sendAndNotify(std::shared_ptr<NodeOutput> ptr) final {
97  for (CoreQueueReceiver<NodeOutput> *receiver : *(this->destinations_)) {
98  HLOG_SELF(2, "Send data to " << receiver->name() << "(" << receiver->id() << ")")
99  receiver->receive(ptr);
100  HLOG_SELF(2, "Wake up " << receiver->name() << "(" << receiver->id() << ")")
101  receiver->queueSlot()->wakeUp();
102  }
103  }
104 
107  void visit(AbstractPrinter *printer) override {
108  HLOG_SELF(1, "Visit")
109  for (CoreQueueReceiver<NodeOutput> *receiver : *(this->destinations())) {
110  if (receiver->type() != NodeType::Switch || receiver->type() != NodeType::ExecutionPipeline) {
111  printer->printEdge(this,
112  receiver,
113  traits::type_name<NodeOutput>(),
114  receiver->queueSize(),
115  receiver->maxQueueSize(),
116  traits::is_managed_memory_v<NodeOutput>);
117  }
118  }
119  }
120 
123  std::set<CoreSender<NodeOutput> *> getSenders() override { return {this}; }
124 
128  HLOG_SELF(0, "Copy Cluster CoreQueueSender information from " << rhs->name() << "(" << rhs->id() << ")")
129  this->destinations_ = rhs->destinations_;
131  }
132 
133  protected:
137  void duplicateEdge(CoreNode *duplicateNode,
138  std::map<CoreNode *, std::shared_ptr<CoreNode>> &correspondenceMap) override {
139  for (CoreQueueReceiver<NodeOutput> *originalReceiver : *(this->destinations())) {
140  if (auto coreNode = dynamic_cast<CoreNode *>(originalReceiver)) {
141  auto nodeReceiverFound = correspondenceMap.find(coreNode);
142  if (nodeReceiverFound != correspondenceMap.end()) {
143  if (nodeReceiverFound->second->id() == this->id()) {
144  std::ostringstream oss;
145  oss
146  << "Internal error, Receiver found is the same as the original Receiver, copy failed "
147  << __FUNCTION__;
148  HLOG_SELF(0, oss.str())
149  throw (std::runtime_error(oss.str()));
150  }
151  connectSenderToReceiverDuplication(duplicateNode, nodeReceiverFound->second.get());
152  }
153  } else {
154  std::ostringstream oss;
155  oss
156  << "Internal error, The Receiver is not a CoreNode, copy failed "
157  << __FUNCTION__;
158  HLOG_SELF(0, oss.str())
159  throw (std::runtime_error(oss.str()));
160  }
161  }
162  }
163 
164  private:
168  void connectSenderToReceiverDuplication(CoreNode *coreNodeSender, CoreNode *coreNodeReceiver) {
169  auto coreReceiver = dynamic_cast<CoreReceiver<NodeOutput> *>(coreNodeReceiver);
170  auto coreSlot = dynamic_cast<CoreSlot *>(coreNodeReceiver);
171  auto coreNotifier = dynamic_cast<CoreNotifier *>(coreNodeSender);
172 
173  // Do the data connection
174  if (coreReceiver) {
175  for (auto r : coreReceiver->receivers()) {
176  dynamic_cast<CoreQueueSender<NodeOutput> *>(coreNodeSender)->addReceiver(r);
177  }
178  } else {
179  std::ostringstream oss;
180  oss << "Internal error, during edge duplication" << __FUNCTION__;
181  HLOG_SELF(0, oss.str())
182  throw (std::runtime_error(oss.str()));
183  }
184 
185  //Do the signal connection
186  if (coreSlot && coreNotifier) {
187  for (CoreSlot *slot : coreSlot->getSlots()) { coreNotifier->addSlot(slot); }
188  for (auto s : dynamic_cast<CoreQueueSender<NodeOutput> *>(coreNodeSender)->getSenders()) {
189  coreReceiver->addSender(s);
190  coreSlot->addNotifier(s);
191  }
192  } else {
193  std::ostringstream oss;
194  oss << "Internal error, during edge duplication" << __FUNCTION__;
195  HLOG_SELF(0, oss.str())
196  throw (std::runtime_error(oss.str()));
197  }
198  }
199 };
200 
201 }
202 #endif //HEDGEHOG_CORE_QUEUE_SENDER_H
NodeType type() const
Node type accessor.
Definition: core_node.h:132
Receiver Interface, receive one data type from CoreSender.
Definition: core_receiver.h:44
void copyInnerStructure(CoreQueueSender< NodeOutput > *rhs)
Copy the inner structure of a CoreQueueSender (destinations, and notifier)
virtual std::set< CoreReceiver< Input > * > receivers()=0
Accessor to all receivers connected to this receiver.
Core Notifier interface, emit notification to CoreSlot.
Definition: core_notifier.h:34
void copyInnerStructure(CoreQueueNotifier *rhs)
Copy the inner structure of the notifier (set of slots and connections)
std::shared_ptr< std::set< CoreQueueReceiver< NodeOutput > * > > destinations_
Set of receivers linked.
void visit(AbstractPrinter *printer) override
Special visit method for a CoreQueueSender, printing an edge.
std::set< CoreSender< NodeOutput > * > getSenders() override
Get inner CoreSender i.e. this.
Sender for nodes possessing a queue of data.
void connectSenderToReceiverDuplication(CoreNode *coreNodeSender, CoreNode *coreNodeReceiver)
Connect coreNodeSender to a coreNodeReceiver, connect for data transfer and signal handling...
void duplicateEdge(CoreNode *duplicateNode, std::map< CoreNode *, std::shared_ptr< CoreNode >> &correspondenceMap) override
Duplicate all the edges from this to its copy duplicateNode.
Printer interface.
Slot interface, receive notification from CoreNotifier.
Definition: core_slot.h:34
Receiver for nodes possessing a queue of data.
~CoreQueueSender() override
CoreQueueSender destructor.
NodeType
Hedgehog node's type.
Definition: core_node.h:40
virtual void printEdge(core::CoreNode const *from, core::CoreNode const *to, std::string_view const &edgeType, size_t const &queueSize, size_t const &maxQueueSize, bool isMemoryManaged)=0
Print edge information.
Sender interface, send data to CoreReceiver.
Definition: core_sender.h:35
void addReceiver(CoreReceiver< NodeOutput > *receiver) override
Add a receiver to the set of receivers.
Main Hedgehog core abstraction.
Definition: core_node.h:48
Notifier of CoreQueueSlot.
CoreQueueSender(std::string_view const &name, NodeType const type, size_t const numberThreads)
CoreQueueSender constructor.
virtual std::set< CoreSlot * > getSlots()=0
Slots accessor for the node.
Hedgehog core namespace.
Definition: core_execute.h:25
std::string_view const & name() const
Node name accessor.
Definition: core_node.h:128
void removeReceiver(CoreReceiver< NodeOutput > *receiver) override
Remove a receiver from the set of receivers.
size_t numberThreads() const
Number of threads associated accessor.
Definition: core_node.h:152
virtual std::shared_ptr< std::set< CoreQueueReceiver< NodeOutput > * > > const & destinations() const
Destination accessor.
virtual std::string id() const
Unique Id accessor.
Definition: core_node.h:114
void sendAndNotify(std::shared_ptr< NodeOutput > ptr) final
Send a data to the list of destinations, and notify them.