HTGS  v2.0
The Hybrid Task Graph Scheduler
Connector.hpp
Go to the documentation of this file.
1 
2 // NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the software in any medium, provided that you keep intact this entire notice. You may improve, modify and create derivative works of the software or any portion of the software, and you may copy and distribute such modifications or works. Modified works should carry a notice stating that you changed the software and should note the date and nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the source of the software.
3 // NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE.
4 // You are solely responsible for determining the appropriateness of using and distributing the software and you assume all risks associated with its use, including but not limited to the risks and costs of program errors, compliance with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of operation. This software is not intended to be used in any situation where a failure could cause risk of injury or damage to property. The software developed by NIST employees is not subject to copyright protection within the United States.
5 
13 #ifndef HTGS_CONNECTOR_HPP
14 #define HTGS_CONNECTOR_HPP
15 
16 #include <atomic>
17 
18 #include <list>
19 
20 #if defined( __GLIBCXX__ ) || defined( __GLIBCPP__ )
21 #include <cxxabi.h>
22 #endif
23 
25 #include <htgs/api/IData.hpp>
26 
27 #ifdef USE_PRIORITY_QUEUE
29 #else
31 #endif
32 
34 
35 namespace htgs {
36 
61 template<class T>
62 class Connector : public AnyConnector {
63  static_assert(std::is_base_of<IData, T>::value, "T must derive from IData");
64 
65  public:
69  Connector() {}
70 
75 
76  bool isInputTerminated() override { return super::getProducerCount() == 0 && this->queue.isEmpty(); }
77 
78  Connector<T> *copy() override { return new Connector<T>(); }
79 
80  void wakeupConsumer() override { this->queue.Enqueue(nullptr); }
81 
82  void profileProduce(size_t numThreads) override {}
83 
84  void profileConsume(size_t numThreads, bool showQueueSize) override {
85 #ifdef PROFILE
86  std::cout << "consume largest queue size: " << queue.getQueueActiveMaxSize() << std::endl;
87 #endif
88  }
89 
90  size_t getQueueSize() override {
91  return this->queue.size();
92  }
93 
94  size_t getMaxQueueSize() override {
95 #ifdef PROFILE
96  return queue.getQueueActiveMaxSize();
97 #else
98  return 0;
99 #endif
100  }
101 
102  void resetMaxQueueSize() override {
103 #ifdef PROFILE
104  this->queue.resetMaxQueueSize();
105 #endif
106  }
107 
108  void produceAnyData(std::shared_ptr<IData> data) override {
109  HTGS_DEBUG_VERBOSE("Connector " << this << " producing any data: " << data);
110  std::shared_ptr<T> dataCast = std::dynamic_pointer_cast<T>(data);
111  this->queue.Enqueue(dataCast);
112 
113  }
114 
125  std::shared_ptr<T> pollConsumeData(size_t timeout) {
126  std::shared_ptr<T> data = this->queue.poll(timeout);
127  return data;
128  }
129 
137  std::shared_ptr<T> consumeData() {
138  std::shared_ptr<T> data = this->queue.Dequeue();
139  return data;
140  }
141 
146  void produceData(std::shared_ptr<T> data) {
147  HTGS_DEBUG_VERBOSE("Connector " << this << " producing data: " << data);
148  this->queue.Enqueue(data);
149  }
150 
155  void produceData(std::list<std::shared_ptr<T>> *data) {
156  for (std::shared_ptr<T> v : *data) {
157  HTGS_DEBUG_VERBOSE("Connector " << this << " producing list data: " << v);
158 
159  this->queue.Enqueue(v);
160  }
161  }
162 
163 #ifdef PROFILE_QUEUE
164  std::string getQueueTiming() override {
165  return std::to_string(queue.getEnqueueLockTime()) + ", "
166  + std::to_string(queue.getDequeueLockTime()) + ", "
167  + std::to_string(queue.getEnqueueWaitTime()) + ", "
168  + std::to_string(queue.getDequeueWaitTime());
169  }
170 #endif
171 
176  std::string typeName() override {
177 #if defined( __GLIBCXX__ ) || defined( __GLIBCPP__ )
178  int status;
179  char *realName = abi::__cxa_demangle(typeid(T).name(), 0, 0, &status);
180  std::string ret(realName);
181 
182  free(realName);
183 
184  return ret;
185 #else
186  return typeid(T).name();
187 #endif
188 
189  }
190 
191  private:
193  typedef AnyConnector super;
195 
196 #ifdef USE_PRIORITY_QUEUE
198 #else
200 #endif
202 };
203 }
204 
205 #endif //HTGS_CONNECTOR_HPP
Connector()
Initializes the Connector with no producer tasks.
Definition: Connector.hpp:69
void wakeupConsumer() override
Awakens all Tasks that are consuming data from this connector.
Definition: Connector.hpp:80
std::string typeName() override
Gets the demangled type name of the connector.
Definition: Connector.hpp:176
bool isEmpty()
Gets whether the queue is empty or not.
Definition: BlockingQueue.hpp:93
Parent class for Connector, which removes the template type of the Connector.
Definition: AnyConnector.hpp:39
#define HTGS_DEBUG_VERBOSE(msg)
Prints a debug message to std:cerr with VERBOSE level.
Definition: debug_message.hpp:75
void produceData(std::shared_ptr< T > data)
Produces data into the queue.
Definition: Connector.hpp:146
size_t size()
Gets the number of elements in the queue.
Definition: BlockingQueue.hpp:101
Creates a thread-safe queue that will wait when no data is available and can block if the queue is fu...
Definition: BlockingQueue.hpp:32
Manages the input/output of IData between Tasks.
Definition: Connector.hpp:62
Implements a thread-safe BlockingQueue.
void profileConsume(size_t numThreads, bool showQueueSize) override
Provides profile output for the consume operation.
Definition: Connector.hpp:84
void Enqueue(T const &value)
Adds an element into the queue.
Definition: BlockingQueue.hpp:126
void resetMaxQueueSize() override
Resets the max queue size profile.
Definition: Connector.hpp:102
~Connector()
Destructor.
Definition: Connector.hpp:74
Implements the IData class, which is used for all data types entering/leaving a task graph...
Implements a thread-safe PriorityBlockingQueue.
size_t getQueueSize() override
Gets the size of the queue that this connector has in its data queue.
Definition: Connector.hpp:90
Holds parent class for Connector, removes template type of Connector.
BlockingQueue< std::shared_ptr< T > > queue
The blocking queue associated with the connector (thread safe) (can be switched to a priority queue u...
Definition: Connector.hpp:201
Creates a thread-safe priority queue that will wait when no data is available and can block if the qu...
Definition: PriorityBlockingQueue.hpp:33
T Dequeue()
Removes an element from the queue.
Definition: BlockingQueue.hpp:162
bool isInputTerminated() override
Checks whether the producer for this Connector has finished pushing data onto its queue...
Definition: Connector.hpp:76
T poll(size_t timeout)
Polls for data given the specified timeout time in microseconds.
Definition: BlockingQueue.hpp:189
std::shared_ptr< T > pollConsumeData(size_t timeout)
Polls for data for a consumer given a timeout.
Definition: Connector.hpp:125
std::shared_ptr< T > consumeData()
Consumes data from the queue.
Definition: Connector.hpp:137
size_t getMaxQueueSize() override
Gets the maximum queue size that this connector has in its data queue.
Definition: Connector.hpp:94
Connector< T > * copy() override
Creates a copy of the BaseConnector.
Definition: Connector.hpp:78
void produceAnyData(std::shared_ptr< IData > data) override
Produces any data into the queue.
Definition: Connector.hpp:108
void produceData(std::list< std::shared_ptr< T >> *data)
Produces a list of data adding each element into the queue.
Definition: Connector.hpp:155
void profileProduce(size_t numThreads) override
Provide profile output for the produce operation.
Definition: Connector.hpp:82
Definition: Bookkeeper.hpp:23
Provides functionality for debug messaging.