HTGS  v2.0
The Hybrid Task Graph Scheduler
BlockingQueue.hpp
Go to the documentation of this file.
1 
2 // NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the software in any medium, provided that you keep intact this entire notice. You may improve, modify and create derivative works of the software or any portion of the software, and you may copy and distribute such modifications or works. Modified works should carry a notice stating that you changed the software and should note the date and nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the source of the software.
3 // NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE.
4 // You are solely responsible for determining the appropriateness of using and distributing the software and you assume all risks associated with its use, including but not limited to the risks and costs of program errors, compliance with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of operation. This software is not intended to be used in any situation where a failure could cause risk of injury or damage to property. The software developed by NIST employees is not subject to copyright protection within the United States.
5 
13 #ifndef HTGS_BLOCKINGQUEUE_HPP
14 #define HTGS_BLOCKINGQUEUE_HPP
15 
16 #include <iostream>
17 #include <mutex>
18 #include <condition_variable>
19 #include <deque>
20 #include <queue>
21 
22 namespace htgs {
31 template<class T>
33  public:
38  this->queueSize = 0;
39 #ifdef PROFILE_QUEUE
40  enqueueLockTime = 0;
41  dequeueLockTime = 0;
42  enqueueWaitTime = 0;
43  dequeueWaitTime = 0;
44 #endif
45 #ifdef PROFILE
46 
47  queueActiveMaxSize = 0;
48 #endif
49  }
50 
55  BlockingQueue(size_t qSize) {
56  this->queueSize = qSize;
57 #ifdef PROFILE_QUEUE
58  enqueueLockTime = 0;
59  dequeueLockTime = 0;
60  enqueueWaitTime = 0;
61  dequeueWaitTime = 0;
62 #endif
63 #ifdef PROFILE
64  queueActiveMaxSize = 0;
65 #endif
66  }
67 
72  }
73 
79  size_t remainingCapacity() {
80  if (queueSize == 0) {
81  std::cerr << __FILE__ << ":" << __LINE__
82  << "ERROR: Requesting remaining capacity on BlockingQueue that does not have a max size" << std::endl;
83  }
84  return queueSize - queue.size();
85  }
86 
93  bool isEmpty() {
94  return queue.empty();
95  }
96 
101  size_t size() {
102  std::unique_lock<std::mutex> lock(this->mutex);
103  return queue.size();
104  }
105 
114  T remove() {
115  T res = this->queue.front();
116  this->queue.pop();
117  return res;
118  }
119 
126  void Enqueue(T const &value) {
127 
128 #ifdef PROFILE_QUEUE
129  auto start = std::chrono::high_resolution_clock::now();
130 #endif
131  std::unique_lock<std::mutex> lock(this->mutex);
132 #ifdef PROFILE_QUEUE
133  auto end = std::chrono::high_resolution_clock::now();
134  this->enqueueLockTime += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
135 #endif
136  if (this->queueSize > 0) {
137 #ifdef PROFILE_QUEUE
138  start = std::chrono::high_resolution_clock::now();
139 #endif
140  this->condition.wait(lock, [=] { return this->queue.size() != queueSize; });
141 #ifdef PROFILE_QUEUE
142  end = std::chrono::high_resolution_clock::now();
143  this->enqueueWaitTime += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
144 #endif
145  }
146  queue.push(value);
147 
148 #ifdef PROFILE
149  if (queue.size() > queueActiveMaxSize)
150  queueActiveMaxSize = queue.size();
151 #endif
152 
153  this->condition.notify_one();
154  }
155 
162  T Dequeue() {
163 #ifdef PROFILE_QUEUE
164  auto start = std::chrono::high_resolution_clock::now();
165 #endif
166  std::unique_lock<std::mutex> lock(this->mutex);
167 #ifdef PROFILE_QUEUE
168  auto end = std::chrono::high_resolution_clock::now();
169  this->dequeueLockTime += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
170  start = std::chrono::high_resolution_clock::now();
171 #endif
172  this->condition.wait(lock, [=] { return !this->queue.empty(); });
173 #ifdef PROFILE_QUEUE
174  end = std::chrono::high_resolution_clock::now();
175  this->dequeueWaitTime += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
176 #endif
177  T res = this->queue.front();
178  this->queue.pop();
179  return res;
180  }
181 
189  T poll(size_t timeout) {
190  std::unique_lock<std::mutex> lock(this->mutex);
191  if (this->condition.wait_for(lock, std::chrono::microseconds(timeout),
192  [=] { return !this->queue.empty(); })) {
193  T res = this->queue.front();
194  this->queue.pop();
195  return res;
196  }
197  return nullptr;
198  }
199 
200 
201 
202 #ifdef PROFILE_QUEUE
203  unsigned long long int getEnqueueLockTime() const {
204  return enqueueLockTime;
205  }
206  unsigned long long int getDequeueLockTime() const {
207  return dequeueLockTime;
208  }
209  unsigned long long int getEnqueueWaitTime() const {
210  return enqueueWaitTime;
211  }
212  unsigned long long int getDequeueWaitTime() const {
213  return dequeueWaitTime;
214  }
215 #endif
216 
217 #ifdef PROFILE
218  size_t getQueueActiveMaxSize() const {
219  return queueActiveMaxSize;
220  }
221 
222  void resetMaxQueueSize() {
223  queueActiveMaxSize = 0;
224  }
225 #endif
226 
227  private:
228 #ifdef PROFILE_QUEUE
229  unsigned long long int enqueueLockTime;
230  unsigned long long int dequeueLockTime;
231  unsigned long long int enqueueWaitTime;
232  unsigned long long int dequeueWaitTime;
233 #endif
234 
235 #ifdef PROFILE
236  size_t queueActiveMaxSize;
237 #endif
238  size_t queueSize;
239  std::queue<T> queue;
240  std::mutex mutex;
241  std::condition_variable condition;
242 };
243 }
244 
245 #endif //HTGS_BLOCKINGQUEUE_HPP
~BlockingQueue()
Destructor.
Definition: BlockingQueue.hpp:71
bool isEmpty()
Gets whether the queue is empty or not.
Definition: BlockingQueue.hpp:93
size_t size()
Gets the number of elements in the queue.
Definition: BlockingQueue.hpp:101
Creates a thread-safe queue that will wait when no data is available and can block if the queue is full.
Definition: BlockingQueue.hpp:32
std::mutex mutex
The mutex to ensure thread safety.
Definition: BlockingQueue.hpp:240
void Enqueue(T const &value)
Adds an element into the queue.
Definition: BlockingQueue.hpp:126
std::condition_variable condition
The condition variable used for waking up waiting threads.
Definition: BlockingQueue.hpp:241
T Dequeue()
Removes an element from the queue.
Definition: BlockingQueue.hpp:162
T poll(size_t timeout)
Polls for data given the specified timeout time in microseconds.
Definition: BlockingQueue.hpp:189
size_t remainingCapacity()
Gets the remaining capacity of the queue based on the queueSize.
Definition: BlockingQueue.hpp:79
BlockingQueue(size_t qSize)
Creates a blocking queue that will block a data requester when the queue is empty or full...
Definition: BlockingQueue.hpp:55
size_t queueSize
The maximum size of the queue; 0 means the queue has no maximum size (it never blocks on Enqueue).
Definition: BlockingQueue.hpp:238
Definition: Bookkeeper.hpp:23
BlockingQueue()
Creates a blocking queue that will only block a data requester when the queue is empty.
Definition: BlockingQueue.hpp:37
std::queue< T > queue
The FIFO queue.
Definition: BlockingQueue.hpp:239