HTGS  v2.0
The Hybrid Task Graph Scheduler
PriorityBlockingQueue.hpp
Go to the documentation of this file.
1 
2 // NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the software in any medium, provided that you keep intact this entire notice. You may improve, modify and create derivative works of the software or any portion of the software, and you may copy and distribute such modifications or works. Modified works should carry a notice stating that you changed the software and should note the date and nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the source of the software.
3 // NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE.
4 // You are solely responsible for determining the appropriateness of using and distributing the software and you assume all risks associated with its use, including but not limited to the risks and costs of program errors, compliance with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of operation. This software is not intended to be used in any situation where a failure could cause risk of injury or damage to property. The software developed by NIST employees is not subject to copyright protection within the United States.
5 
14 #ifndef HTGS_PRIORITYBLOCKINGQUEUE_HPP
15 #define HTGS_PRIORITYBLOCKINGQUEUE_HPP
16 
17 #include <condition_variable>
18 #include <mutex>
19 #include <ostream>
20 #include <iostream>
21 #include <queue>
22 #include <htgs/api/IData.hpp>
23 
24 namespace htgs {
32 template<class T>
34  public:
39  this->queueSize = 0;
40 #ifdef PROFILE
41  this->queueActiveMaxSize = 0;
42 #endif
43 #ifdef PROFILE_QUEUE
44  this->dequeueLockTime = 0;
45  this->dequeueWaitTime = 0;
46  this->enqueueLockTime = 0;
47  this->enqueueWaitTime = 0;
48 #endif
49  }
50 
55  PriorityBlockingQueue(size_t qSize) {
56  this->queueSize = qSize;
57 #ifdef PROFILE
58  this->queueActiveMaxSize = 0;
59 #endif
60 #ifdef PROFILE_QUEUE
61  this->dequeueLockTime = 0;
62  this->dequeueWaitTime = 0;
63  this->enqueueLockTime = 0;
64  this->enqueueWaitTime = 0;
65 #endif
66  }
67 
72 
73  }
74 
80  size_t remainingCapacity() {
81  if (queueSize == 0) {
82  std::cerr << __FILE__ << ":" << __LINE__
83  << "ERROR: Requesting remaining capacity on BlockingQueue that does not have a max size" << std::endl;
84  }
85  return queueSize - queue.size();
86  }
87 
94  bool isEmpty() {
95  return queue.empty();
96  }
97 
102  size_t size() {
103  std::unique_lock<std::mutex> lock(this->mutex);
104  return queue.size();
105  }
106 
114  T remove() {
115  T res = this->queue.top();
116  this->queue.pop();
117  return res;
118  }
119 
126  void Enqueue(T const &value) {
127 #ifdef PROFILE_QUEUE
128  auto start = std::chrono::high_resolution_clock::now();
129 #endif
130  std::unique_lock<std::mutex> lock(this->mutex);
131 #ifdef PROFILE_QUEUE
132  auto end = std::chrono::high_resolution_clock::now();
133  this->enqueueLockTime += std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
134 #endif
135  if (this->queueSize > 0) {
136 #ifdef PROFILE_QUEUE
137  start = std::chrono::high_resolution_clock::now();
138 #endif
139  this->condition.wait(lock, [=] { return this->queue.size() != queueSize; });
140 #ifdef PROFILE_QUEUE
141  end = std::chrono::high_resolution_clock::now();
142  this->enqueueWaitTime += std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
143 #endif
144  }
145  queue.push(value);
146 
147 #ifdef PROFILE
148  if (queue.size() > queueActiveMaxSize)
149  queueActiveMaxSize = queue.size();
150 #endif
151 
152  this->condition.notify_one();
153  }
154 
161  T Dequeue() {
162 #ifdef PROFILE_QUEUE
163  auto start = std::chrono::high_resolution_clock::now();
164 #endif
165  std::unique_lock<std::mutex> lock(this->mutex);
166 #ifdef PROFILE_QUEUE
167  auto end = std::chrono::high_resolution_clock::now();
168  this->dequeueLockTime += std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
169 
170  start = std::chrono::high_resolution_clock::now();
171 #endif
172  this->condition.wait(lock, [=] { return !this->queue.empty(); });
173 
174 #ifdef PROFILE_QUEUE
175  end = std::chrono::high_resolution_clock::now();
176  this->dequeueWaitTime += std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
177 #endif
178  T res = this->queue.top();
179  this->queue.pop();
180  return res;
181  }
182 
190  T poll(size_t timeout) {
191  std::unique_lock<std::mutex> lock(this->mutex);
192  if (this->condition.wait_for(lock, std::chrono::microseconds(timeout),
193  [=] { return !this->queue.empty(); })) {
194  T res = this->queue.top();
195  this->queue.pop();
196  return res;
197  }
198  return nullptr;
199  }
200 
201 #ifdef PROFILE_QUEUE
202  unsigned long long int getEnqueueLockTime() const {
203  return enqueueLockTime;
204  }
205  unsigned long long int getDequeueLockTime() const {
206  return dequeueLockTime;
207  }
208  unsigned long long int getEnqueueWaitTime() const {
209  return enqueueWaitTime;
210  }
211  unsigned long long int getDequeueWaitTime() const {
212  return dequeueWaitTime;
213  }
214 #endif
215 
216 #ifdef PROFILE
217  size_t getQueueActiveMaxSize() const {
218  return queueActiveMaxSize;
219  }
220 
221  void resetMaxQueueSize() {
222  queueActiveMaxSize = 0;
223  }
224 #endif
225 
226  private:
227 #ifdef PROFILE_QUEUE
228  unsigned long long int enqueueLockTime;
229  unsigned long long int dequeueLockTime;
230  unsigned long long int enqueueWaitTime;
231  unsigned long long int dequeueWaitTime;
232 #endif
233 #ifdef PROFILE
234  size_t queueActiveMaxSize;
235 #endif
236  size_t queueSize;
237  std::priority_queue<T, std::vector<T>, IData> queue;
238  std::mutex mutex;
239  std::condition_variable condition;
240 };
241 }
242 
243 #endif //HTGS_PRIORITYBLOCKINGQUEUE_HPP
244 
size_t size()
Gets the number of elements in the priority queue.
Definition: PriorityBlockingQueue.hpp:102
std::priority_queue< T, std::vector< T >, IData > queue
The priority queue.
Definition: PriorityBlockingQueue.hpp:237
T poll(size_t timeout)
Polls for data given the specified timeout time in microseconds.
Definition: PriorityBlockingQueue.hpp:190
PriorityBlockingQueue(size_t qSize)
Creates a priority blocking queue that will block a data requester when the queue is empty or full...
Definition: PriorityBlockingQueue.hpp:55
size_t remainingCapacity()
Gets the remaining capacity of the priority queue based on the queueSize.
Definition: PriorityBlockingQueue.hpp:80
PriorityBlockingQueue()
Creates a priority blocking queue that will only block a data requester when the queue is empty...
Definition: PriorityBlockingQueue.hpp:38
std::condition_variable condition
The condition variable used for waking up waiting threads.
Definition: PriorityBlockingQueue.hpp:239
void Enqueue(T const &value)
Adds an element into the priority queue.
Definition: PriorityBlockingQueue.hpp:126
Class to hold any type of data.
Definition: IData.hpp:60
bool isEmpty()
Gets whether the priority queue is empty or not.
Definition: PriorityBlockingQueue.hpp:94
Implements the IData class, which is used for all data types entering/leaving a task graph...
Creates a thread-safe priority queue that will wait when no data is available and can block if the qu...
Definition: PriorityBlockingQueue.hpp:33
std::mutex mutex
The mutex to ensure thread safety.
Definition: PriorityBlockingQueue.hpp:238
size_t queueSize
The maximum size of the queue, set to 0 for infinite size.
Definition: PriorityBlockingQueue.hpp:236
~PriorityBlockingQueue()
Destructor.
Definition: PriorityBlockingQueue.hpp:71
T Dequeue()
Removes an element from the priority queue.
Definition: PriorityBlockingQueue.hpp:161
Definition: Bookkeeper.hpp:23