20 #ifndef HEDGEHOG_CORE_QUEUE_RECEIVER_H 21 #define HEDGEHOG_CORE_QUEUE_RECEIVER_H 23 #include "../../base/receiver/core_receiver.h" 24 #include "core_queue_slot.h" 31 template<
class NodeInput>
34 std::shared_ptr<std::queue<std::shared_ptr<NodeInput>>>
queue_ =
nullptr;
35 std::shared_ptr<std::set<CoreSender < NodeInput> *>>
senders_ =
nullptr;
44 NodeInput>(name, type, numberThreads) {
45 HLOG_SELF(0,
"Creating CoreQueueReceiver with type: " << (
int) type <<
" and name: " << name)
46 queue_ = std::make_shared<std::queue<std::shared_ptr<NodeInput>>>();
47 senders_ = std::make_shared<std::set<CoreSender<NodeInput> *>>();
60 size_t queueSize()
override {
return this->queue_->size(); }
69 HLOG_SELF(0,
"Adding sender " << sender->name() <<
"(" << sender->id() <<
")")
70 this->senders_->insert(sender);
76 HLOG_SELF(0,
"Remove sender " << sender->name() <<
"(" << sender->id() <<
")")
77 this->senders_->erase(sender);
83 void receive(std::shared_ptr<NodeInput> data)
final {
85 this->queue_->push(data);
86 HLOG_SELF(2,
"Receives data new queue Size " << this->
queueSize())
87 if (this->
queueSize() > this->maxQueueSize_) { this->maxQueueSize_ = this->
queueSize(); }
95 HLOG_SELF(2,
"Test queue emptiness")
96 return this->queue_->empty();
101 std::set<CoreReceiver < NodeInput> *>
108 HLOG_SELF(2,
"Pop & front from queue")
109 auto element = queue_->front();
110 assert(element !=
nullptr);
119 HLOG_SELF(0,
"Copy Cluster CoreQueueReceiver information from " << rhs->
name() <<
"(" << rhs->
id() <<
")")
120 this->queue_ = rhs->
queue_;
126 #endif //HEDGEHOG_CORE_QUEUE_RECEIVER_H NodeType type() const
Node type accessor.
std::shared_ptr< std::set< CoreSender< NodeInput > * > > senders_
Senders connected to this receiver.
Receiver Interface, receive one data type from CoreSender.
void copyInnerStructure(CoreQueueReceiver< NodeInput > *rhs)
Copy the CoreQueueReceiver inner structure (queue and senders list) from rhs to this.
size_t queueSize() override
Return the current waiting data queue size.
Slot of CoreQueueMultiReceiver, receiving from CoreQueueNotifier.
std::shared_ptr< std::queue< std::shared_ptr< NodeInput > > > queue_
Waiting list of data to be processed.
void removeSender(CoreSender< NodeInput > *sender) final
Remove a CoreSender from this.
std::shared_ptr< NodeInput > popFront()
Pop the front element of the queue and return it.
size_t maxQueueSize_
Maximum queue size registered.
size_t maxQueueSize()
Return the maximum waiting data queue size registered so far.
Receiver for nodes possessing a queue of data.
void addSender(CoreSender< NodeInput > *sender) final
Add a CoreSender to this.
NodeType
Hedgehog node's type.
Sender interface, send data to CoreReceiver.
bool receiverEmpty() final
Test whether the queue is empty.
void unlockUniqueMutex()
Unlock the mutex.
~CoreQueueReceiver() override
CoreQueueReceiver default destructor.
std::string_view const & name() const
Node name accessor.
CoreQueueReceiver(std::string_view const &name, NodeType const type, size_t const numberThreads)
CoreQueueReceiver constructor.
void lockUniqueMutex()
Lock the mutex.
virtual CoreQueueSlot * queueSlot()=0
Queue slot accessor possessed by a CoreQueueMultiReceiver.
size_t numberThreads() const
Number of threads associated accessor.
std::set< CoreReceiver< NodeInput > * > receivers() override
Receivers accessor.
void receive(std::shared_ptr< NodeInput > data) final
Receive a piece of data from a CoreQueueSender and store it in the waiting queue.
virtual std::string id() const
Unique Id accessor.