Hedgehog  3.1.0
A library to generate hybrid pipeline workflow systems
Loading...
Searching...
No Matches
queue_receiver.h
Go to the documentation of this file.
1// NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the
2// software in any medium, provided that you keep intact this entire notice. You may improve, modify and create
3// derivative works of the software or any portion of the software, and you may copy and distribute such modifications
4// or works. Modified works should carry a notice stating that you changed the software and should note the date and
5// nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the
6// source of the software. NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND,
7// EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
8// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR
9// WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE
10// CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS
11// THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE. You
12// are solely responsible for determining the appropriateness of using and distributing the software and you assume
13// all risks associated with its use, including but not limited to the risks and costs of program errors, compliance
14// with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of
15// operation. This software is not intended to be used in any situation where a failure could cause risk of injury or
16// damage to property. The software developed by NIST employees is not subject to copyright protection within the
17// United States.
18
19
20
21#ifndef HEDGEHOG_QUEUE_RECEIVER_H
22#define HEDGEHOG_QUEUE_RECEIVER_H
23
24#include <iostream>
25#include <cassert>
26#include <queue>
27#include <mutex>
28#include <set>
29#include <utility>
30
31#include "../../abstractions/base/input_output/receiver_abstraction.h"
32#include "../implementor/implementor_receiver.h"
33
35namespace hh {
37namespace core {
38
40namespace abstraction {
41template<class Output>
42class SenderAbstraction;
43}
44
46namespace implementor {
47
50template<class Input>
51class QueueReceiver : public ImplementorReceiver<Input> {
52 private:
53 std::unique_ptr<std::queue<std::shared_ptr<Input>>> const
54 queue_ = nullptr;
55
56 std::unique_ptr<std::set<abstraction::SenderAbstraction<Input> *>> const
57 senders_ = nullptr;
58
59 size_t
61
62 public:
64 explicit QueueReceiver()
65 : queue_(std::make_unique<std::queue<std::shared_ptr<Input>>>()),
66 senders_(std::make_unique<std::set<abstraction::SenderAbstraction<Input> *>>()) {}
67
69 virtual ~QueueReceiver() = default;
70
73 void receive(std::shared_ptr<Input> const data) final {
74 queue_->push(data);
75 maxSize_ = std::max(queue_->size(), maxSize_);
76 }
77
81 [[nodiscard]] std::shared_ptr<Input> getInputData() override {
82 assert(!queue_->empty());
83 auto front = queue_->front();
84 queue_->pop();
85 return front;
86 }
87
90 [[nodiscard]] size_t numberElementsReceived() const override { return queue_->size(); }
91
94 [[nodiscard]] size_t maxNumberElementsReceived() const override { return maxSize_; }
95
98 [[nodiscard]] bool empty() const override { return queue_->empty(); }
99
102 [[nodiscard]] std::set<abstraction::SenderAbstraction<Input> *> const &connectedSenders() const override {
103 return *senders_;
104 }
105
108 void addSender(abstraction::SenderAbstraction<Input> *const sender) override { senders_->insert(sender); }
109
112 void removeSender(abstraction::SenderAbstraction<Input> *const sender) override { senders_->erase(sender); }
113
114};
115
116}
117}
118}
119#endif //HEDGEHOG_QUEUE_RECEIVER_H
Hedgehog main namespace.
Core abstraction to send data.
Concrete implementation of the receiver core abstraction for a type using a std::queue.
void removeSender(abstraction::SenderAbstraction< Input > *const sender) override
Remove a sender to the set of connected senders.
bool empty() const override
Test if the queue is empty.
std::shared_ptr< Input > getInputData() override
Get a data from the queue.
size_t maxSize_
Maximum size attained by the queue.
void addSender(abstraction::SenderAbstraction< Input > *const sender) override
Add a sender to the set of connected senders.
std::unique_ptr< std::set< abstraction::SenderAbstraction< Input > * > > const senders_
List of senders attached to this receiver.
QueueReceiver()
Default constructor.
std::unique_ptr< std::queue< std::shared_ptr< Input > > > const queue_
Queue storing to be processed data.
size_t numberElementsReceived() const override
Accessor to the current size of the queue.
void receive(std::shared_ptr< Input > const data) final
Receive a data and store it in the queue.
std::set< abstraction::SenderAbstraction< Input > * > const & connectedSenders() const override
Accessor to the set of connected senders.
size_t maxNumberElementsReceived() const override
Accessor to the maximum queue size.
virtual ~QueueReceiver()=default
Default destructor.
Implementor for the ReceiverAbstraction.