Hedgehog  0.0.0
A library to generate hybrid pipeline workflow systems
core_queue_receiver.h
1 // NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the
2 // software in any medium, provided that you keep intact this entire notice. You may improve, modify and create
3 // derivative works of the software or any portion of the software, and you may copy and distribute such modifications
4 // or works. Modified works should carry a notice stating that you changed the software and should note the date and
5 // nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the
6 // source of the software. NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND,
7 // EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
8 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR
9 // WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE
10 // CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS
11 // THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE. You
12 // are solely responsible for determining the appropriateness of using and distributing the software and you assume
13 // all risks associated with its use, including but not limited to the risks and costs of program errors, compliance
14 // with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of
15 // operation. This software is not intended to be used in any situation where a failure could cause risk of injury or
16 // damage to property. The software developed by NIST employees is not subject to copyright protection within the
17 // United States.
18 
19 
20 #ifndef HEDGEHOG_CORE_QUEUE_RECEIVER_H
21 #define HEDGEHOG_CORE_QUEUE_RECEIVER_H
22 
23 #include "../../base/receiver/core_receiver.h"
24 #include "core_queue_slot.h"
25 
27 namespace hh::core {
28 
31 template<class NodeInput>
32 class CoreQueueReceiver : public virtual CoreReceiver<NodeInput> {
33  private:
34  std::shared_ptr<std::queue<std::shared_ptr<NodeInput>>> queue_ = nullptr;
35  std::shared_ptr<std::set<CoreSender < NodeInput> *>> senders_ = nullptr;
36  size_t maxQueueSize_ = 0;
37 
38  public:
43  CoreQueueReceiver(std::string_view const &name, NodeType const type, size_t const numberThreads) : CoreReceiver<
44  NodeInput>(name, type, numberThreads) {
45  HLOG_SELF(0, "Creating CoreQueueReceiver with type: " << (int) type << " and name: " << name)
46  queue_ = std::make_shared<std::queue<std::shared_ptr<NodeInput>>>();
47  senders_ = std::make_shared<std::set<CoreSender<NodeInput> *>>();
48  }
49 
51  ~CoreQueueReceiver() override {HLOG_SELF(0, "Destructing CoreQueueReceiver")}
52 
53  //Virtual
56  virtual CoreQueueSlot *queueSlot() = 0;
57 
60  size_t queueSize() override { return this->queue_->size(); }
61 
64  size_t maxQueueSize() { return this->maxQueueSize_; }
65 
68  void addSender(CoreSender <NodeInput> *sender) final {
69  HLOG_SELF(0, "Adding sender " << sender->name() << "(" << sender->id() << ")")
70  this->senders_->insert(sender);
71  }
72 
75  void removeSender(CoreSender <NodeInput> *sender) final {
76  HLOG_SELF(0, "Remove sender " << sender->name() << "(" << sender->id() << ")")
77  this->senders_->erase(sender);
78  }
79 
83  void receive(std::shared_ptr<NodeInput> data) final {
84  this->queueSlot()->lockUniqueMutex();
85  this->queue_->push(data);
86  HLOG_SELF(2, "Receives data new queue Size " << this->queueSize())
87  if (this->queueSize() > this->maxQueueSize_) { this->maxQueueSize_ = this->queueSize(); }
88  this->queueSlot()->unlockUniqueMutex();
89  }
90 
94  bool receiverEmpty() final {
95  HLOG_SELF(2, "Test queue emptiness")
96  return this->queue_->empty();
97  }
98 
101  std::set<CoreReceiver < NodeInput> *>
102  receivers() override { return {this}; }
103 
107  std::shared_ptr<NodeInput> popFront() {
108  HLOG_SELF(2, "Pop & front from queue")
109  auto element = queue_->front();
110  assert(element != nullptr);
111  queue_->pop();
112  return element;
113  }
114 
115  // Suppress wrong static analysis Used but syntax analysis didn't find it out
119  HLOG_SELF(0, "Copy Cluster CoreQueueReceiver information from " << rhs->name() << "(" << rhs->id() << ")")
120  this->queue_ = rhs->queue_;
121  this->senders_ = rhs->senders_;
122  }
123 };
124 
125 }
126 #endif //HEDGEHOG_CORE_QUEUE_RECEIVER_H
NodeType type() const
Node type accessor.
Definition: core_node.h:132
std::shared_ptr< std::set< CoreSender< NodeInput > * > > senders_
Senders connected to this receiver.
Receiver Interface, receive one data type from CoreSender.
Definition: core_receiver.h:44
void copyInnerStructure(CoreQueueReceiver< NodeInput > *rhs)
Copy the CoreQueueReceiver inner structure (queue and senders list) from rhs to this.
size_t queueSize() override
Return the current waiting data queue size.
Slot of CoreQueueMultiReceiver, receiving from CoreQueueNotifier.
std::shared_ptr< std::queue< std::shared_ptr< NodeInput > > > queue_
Waiting list of data to be processed.
void removeSender(CoreSender< NodeInput > *sender) final
Remove a CoreSender from this.
std::shared_ptr< NodeInput > popFront()
Return the front element of the queue and return it.
size_t maxQueueSize_
Maximum queue size registered.
size_t maxQueueSize()
Return the maximum current waiting data queue size registered.
Receiver for nodes possessing a queue of data.
void addSender(CoreSender< NodeInput > *sender) final
Add a CoreSender to this.
NodeType
Hedgehog node&#39;s type.
Definition: core_node.h:40
Sender interface, send data to CoreReceiver.
Definition: core_sender.h:35
bool receiverEmpty() final
Test emptiness on the queue.
void unlockUniqueMutex()
Unlock the mutex.
~CoreQueueReceiver() override
CoreQueueReceiver default destructor.
Hedgehog core namespace.
Definition: core_execute.h:25
std::string_view const & name() const
Node name accessor.
Definition: core_node.h:128
CoreQueueReceiver(std::string_view const &name, NodeType const type, size_t const numberThreads)
CoreQueueReceiver constructor.
void lockUniqueMutex()
Lock the mutex.
virtual CoreQueueSlot * queueSlot()=0
Queue slot accessor possessed by a CoreQueueMultiReceiver.
size_t numberThreads() const
Number of threads associated accessor.
Definition: core_node.h:152
std::set< CoreReceiver< NodeInput > * > receivers() override
Receivers accessor.
void receive(std::shared_ptr< NodeInput > data) final
Receive a data from a CoreQueueSender, and store it into the waiting queue.
virtual std::string id() const
Unique Id accessor.
Definition: core_node.h:114