HTGS  v2.0
The Hybrid Task Graph Scheduler
RuleManager.hpp
Go to the documentation of this file.
1 //
2 // Created by tjb3 on 2/22/17.
3 //
4 
13 #ifndef HTGS_RULEMANAGER_HPP
14 #define HTGS_RULEMANAGER_HPP
15 
16 #include <htgs/api/Bookkeeper.hpp>
17 #include <htgs/api/IRule.hpp>
20 
21 namespace htgs {
22 
23 template<class V, class W>
24 class IRule;
25 
56 template<class T, class U>
57 class RuleManager : public AnyRuleManagerInOnly<T> {
58  static_assert(std::is_base_of<IData, T>::value, "T must derive from IData");
59  static_assert(std::is_base_of<IData, U>::value, "U must derive from IData");
60 
61  public:
62 
70  RuleManager(std::shared_ptr<htgs::IRule<T, U>> rule)// TODO: Delete or Add #ifdef, TaskGraphCommunicator *communicator)
71  : rule(rule), /*TODO: Delete or Add #ifdef communicator(communicator),*/ pipelineId(0), numPipelines(1), terminated(false) {}
72 
76  virtual ~RuleManager() override {}
77 
78  void executeTask(std::shared_ptr<T> data) override {
79 
80  if (this->rule->canUseLocks()) {
81  this->rule->getMutex().lock();
82  }
83 
84  // Check if the rule is expecting data or not
86 
87  HTGS_DEBUG_VERBOSE("Rule: " << rule->getName() << " consuming data: " << data);
88  auto result = rule->applyRuleFunction(data, pipelineId);
89 
90  if (result != nullptr && result->size() > 0) {
91  if (this->connector != nullptr) {
92 #ifdef WS_PROFILE
93  sendWSProfileUpdate(this, StatusCode::ACTIVATE_EDGE);
94 #endif
95  this->connector->produceData(result);
96  }
97  }
98 
99 
100  // Check if the rule is ready to be terminated after processing data (in case no more data
102 
103  if (this->rule->canUseLocks()) {
104  this->rule->getMutex().unlock();
105  }
106  }
107 
108  RuleManager<T, U> *copy() override {
109  return new RuleManager<T, U>(this->rule); // TODO: Delete or Add #ifdef , this->communicator);
110  }
111 
112  std::string getName(int flags = 0) override {
113  std::string inOutLabel = (((flags & DOTGEN_FLAG_SHOW_IN_OUT_TYPES) != 0) ? ("\\nout: " + this->outTypeName()) : "");
114 
115  return this->rule->getName() + inOutLabel;
116  }
117 
118  void debug() override {
119  HTGS_DEBUG(this->getName() << " output connector: " << this->connector);
120  }
121 
122  std::shared_ptr<AnyConnector> getConnector() override {
123  return this->connector;
124  }
125 
126  void initialize(size_t pipelineId, size_t numPipelines, std::string address) override {
127  HTGS_DEBUG_VERBOSE("Initialized " << this->getName() << " pipeline id: " << pipelineId);
128  this->pipelineId = pipelineId;
129  this->numPipelines = numPipelines;
130  this->address = address;
131  }
132 
133  void shutdown() override {
134  HTGS_DEBUG("Shutting down " << this->getName() << " pipeline id: " << pipelineId);
135 
136  // Check if the rule manager was terminated by it's rule
137  if (!this->terminated) {
138 
139  // Close any active connections
140  HTGS_DEBUG("Waking up connector");
141  this->connector->producerFinished();
142  this->connector->wakeupConsumer();
143 
144 #ifdef WS_PROFILE
145  sendWSProfileUpdate(this->connector.get(), StatusCode::DECREMENT);
146 #endif
147  }
148 
149  // Shutdown the rule's pipeline ID
150  rule->shutdownRule(this->pipelineId);
151  }
152 
153  bool isTerminated() override {
154  return terminated;
155  }
156 
157  void setOutputConnector(std::shared_ptr<AnyConnector> connector) override {
158  this->connector = std::static_pointer_cast<Connector<U>>(connector);
159  HTGS_DEBUG_VERBOSE("Connector " << this->connector << " adding producer: " << this->getName() << " " << this <<
160  " to connector " << this->connector);
161  }
162 
166  void checkRuleTermination() override {
167  if (!terminated) {
168  // Check if the rule is ready to be terminated before and after processing data
169  if (rule->canTerminateRule(pipelineId)) {
170  terminated = true;
171  this->connector->producerFinished();
172  if (this->connector->isInputTerminated()) {
173  this->connector->wakeupConsumer();
174  }
175 #ifdef WS_PROFILE
176  sendWSProfileUpdate(this->connector.get(), StatusCode::DECREMENT);
177 #endif
178  }
179  }
180  }
181 
182  private:
183 
185 #ifdef WS_PROFILE
186  void sendWSProfileUpdate(void *addr, StatusCode code)
187  {
188  if (this->getName() == "WebSocketProfiler")
189  return;
190  std::shared_ptr<ProfileData> updateStatus(new ChangeStatusProfile(addr, code));
191  std::shared_ptr<DataPacket> dataPacket(new DataPacket(this->getName(), "", "WebSocketProfiler", "0", updateStatus));
192  this->communicator->produceDataPacket(dataPacket);
193  }
194 #endif
195 
196 
200  std::string inTypeName() {
201 #if defined( __GLIBCXX__ ) || defined( __GLIBCPP__ )
202  int status;
203  char *realName = abi::__cxa_demangle(typeid(T).name(), 0, 0, &status);
204  std::string ret(realName);
205 
206  free(realName);
207 
208  return ret;
209 #else
210  return typeid(T).name();
211 #endif
212 
213  }
214 
218  std::string outTypeName() {
219 #if defined( __GLIBCXX__ ) || defined( __GLIBCPP__ )
220  int status;
221  char *realName = abi::__cxa_demangle(typeid(U).name(), 0, 0, &status);
222  std::string ret(realName);
223 
224  free(realName);
225 
226  return ret;
227 #else
228  return typeid(U).name();
229 #endif
230 
231  }
232 
234 
235  std::shared_ptr<IRule<T, U>> rule;
236  // TODO: Delete or Add #ifdef
237  // TaskGraphCommunicator *communicator; //!< The task graph communicator
238  size_t pipelineId;
239  size_t numPipelines;
240  std::string address;
241  std::shared_ptr<htgs::Connector<U>> connector;
242  volatile bool terminated;
243 
244 };
245 
246 }
247 
248 #endif //HTGS_RULEMANAGER_HPP
void executeTask(std::shared_ptr< T > data) override
Processes the input data, which is forwarded to the IRule synchronously.
Definition: RuleManager.hpp:78
Implements the base class for the rule manager, but only providing the input type.
void initialize(size_t pipelineId, size_t numPipelines, std::string address) override
Initializes the RuleManager.
Definition: RuleManager.hpp:126
std::string address
The address for the rule manager.
Definition: RuleManager.hpp:240
#define HTGS_DEBUG_VERBOSE(msg)
Prints a debug message to std:cerr with VERBOSE level.
Definition: debug_message.hpp:75
Provides the Connector class for managing input/output of AbsData between Tasks.
virtual ~RuleManager() override
Destructor.
Definition: RuleManager.hpp:76
volatile bool terminated
Whether this RuleManager is terminated or not.
Definition: RuleManager.hpp:242
Manages the input/output of IData between Tasks.
Definition: Connector.hpp:62
Provides an interface to send data along RuleManager edges for processing state and dependencies...
Definition: ExecutionPipeline.hpp:34
Implements the Bookkeeper class.
void setOutputConnector(std::shared_ptr< AnyConnector > connector) override
Sets the output connector that the RuleManager is attached to.
Definition: RuleManager.hpp:157
std::shared_ptr< AnyConnector > getConnector() override
Gets the output connector associated with the RuleManager.
Definition: RuleManager.hpp:122
Provides an interface to send data along RuleManager edges for processing state and dependencies...
std::shared_ptr< htgs::Connector< U > > connector
The connector for producing data from the rule.
Definition: RuleManager.hpp:241
size_t numPipelines
The number of execution pipelines.
Definition: RuleManager.hpp:239
Implements the base class for the rule manager, but only provides the input type. ...
Definition: AnyRuleManagerInOnly.hpp:27
#define HTGS_DEBUG(msg)
Prints a debug message to std::cerr with standard level If DEBUG_FLAG is not defined, this equates to a no op Each message includes the file and line number for where the debug is called.
Definition: debug_message.hpp:65
std::string getName(int flags=0) override
Gets the name of the RuleManager and the names of all IRules that it manages.
Definition: RuleManager.hpp:112
RuleManager(std::shared_ptr< htgs::IRule< T, U >> rule)
Creates a rule manager with a rule.
Definition: RuleManager.hpp:70
Connects a Bookkeeper to another ITask using one or more IRule(s).
Definition: RuleManager.hpp:57
#define DOTGEN_FLAG_SHOW_IN_OUT_TYPES
Shows input and output types for all tasks.
Definition: TaskGraphDotGenFlags.hpp:32
size_t pipelineId
The execution pipeline id.
Definition: RuleManager.hpp:238
Implements a data packet that is transmitted to the TaskGraphCommunicator.
void shutdown() override
Shuts down the RuleManager.
Definition: RuleManager.hpp:133
std::shared_ptr< IRule< T, U > > rule
The rule associated with the RuleManager.
Definition: RuleManager.hpp:235
bool isTerminated() override
Checks whether the RuleManager is terminated or not.
Definition: RuleManager.hpp:153
void checkRuleTermination() override
Checks if the rule can be terminated or not.
Definition: RuleManager.hpp:166
void debug() override
Provides debug output.
Definition: RuleManager.hpp:118
Definition: Bookkeeper.hpp:23
RuleManager< T, U > * copy() override
Creates a copy of the RuleManager.
Definition: RuleManager.hpp:108