HTGS  v2.0
The Hybrid Task Graph Scheduler
AnyITask.hpp
Go to the documentation of this file.
1 // NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the software in any medium, provided that you keep intact this entire notice. You may improve, modify and create derivative works of the software or any portion of the software, and you may copy and distribute such modifications or works. Modified works should carry a notice stating that you changed the software and should note the date and nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the source of the software.
2 // NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE.
3 // You are solely responsible for determining the appropriateness of using and distributing the software and you assume all risks associated with its use, including but not limited to the risks and costs of program errors, compliance with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of operation. This software is not intended to be used in any situation where a failure could cause risk of injury or damage to property. The software developed by NIST employees is not subject to copyright protection within the United States.
4 
13 #ifndef HTGS_ANYITASK_HPP
14 #define HTGS_ANYITASK_HPP
15 
16 #include <memory>
17 #include <cassert>
18 #include <sstream>
19 
21 #include <htgs/api/MemoryData.hpp>
24 #include <htgs/types/Types.hpp>
25 #include <htgs/types/MMType.hpp>
28 #include <htgs/utils/ProfileUtils.hpp>
29 
30 
31 #ifdef WS_PROFILE
32 #include <htgs/core/graph/profile/ProfileData.hpp>
33 #include <htgs/core/graph/profile/CustomProfile.hpp>
34 #endif
35 
36 
37 namespace htgs {
38 
39  class AnyTaskManager;
40 
48 class AnyITask {
49  public:
50 
55  this->numThreads = 1;
56  this->startTask = false;
57  this->poll = false;
58  this->microTimeoutTime = 0;
59  this->memoryWaitTime = 0;
60 
61  memoryEdges = std::shared_ptr<ConnectorMap>(new ConnectorMap());
62  releaseMemoryEdges = std::shared_ptr<ConnectorMap>(new ConnectorMap());
63 
64  this->pipelineId = 0;
65  this->numPipelines = 1;
66  }
67 
72  AnyITask(size_t numThreads) {
73  this->numThreads = numThreads;
74  this->startTask = false;
75  this->poll = false;
76  this->microTimeoutTime = 0L;
77  this->memoryWaitTime = 0;
78 
79  memoryEdges = std::shared_ptr<ConnectorMap>(new ConnectorMap());
80  releaseMemoryEdges = std::shared_ptr<ConnectorMap>(new ConnectorMap());
81 
82  this->pipelineId = 0;
83  this->numPipelines = 1;
84  }
85 
94  AnyITask(size_t numThreads, bool isStartTask, bool poll, size_t microTimeoutTime) {
95  this->numThreads = numThreads;
96  this->startTask = isStartTask;
97  this->poll = poll;
98  this->microTimeoutTime = microTimeoutTime;
99  this->memoryWaitTime = 0;
100 
101  memoryEdges = std::shared_ptr<ConnectorMap>(new ConnectorMap());
102  releaseMemoryEdges = std::shared_ptr<ConnectorMap>(new ConnectorMap());
103 
104  this->pipelineId = 0;
105  this->numPipelines = 1;
106  }
107 
108 
112 
116  virtual ~AnyITask() {}
117 
122  virtual AnyITask *copy() = 0;
123 
128  virtual std::string getName() = 0;
129 
134  virtual std::string getDotLabelName() = 0;
135 
140  virtual std::string getDotShapeColor() = 0;
141 
146  virtual std::string getDotFillColor() = 0;
147 
152  virtual std::string getDotShape() = 0;
153 
159  virtual std::string getDotCustomProfile() = 0;
160 
169  virtual void shutdown() = 0;
170 
174  virtual void initialize() = 0;
175 
182  virtual void executeTaskFinal() = 0;
183 
192  virtual bool canTerminate(std::shared_ptr<AnyConnector> inputConnector) = 0;
193 
202  virtual std::string genDot(int flags,
203  std::string dotId,
204  std::shared_ptr<htgs::AnyConnector> input,
205  std::shared_ptr<htgs::AnyConnector> output) {
206  std::ostringstream oss;
207 
208  if ((flags & DOTGEN_FLAG_SHOW_CONNECTORS) != 0 || (flags & DOTGEN_FLAG_SHOW_CONNECTOR_VERBOSE) != 0) {
209 
210  if (input != nullptr) {
211  oss << input->getDotId() << " -> " << dotId << ";" << std::endl;
212  oss << input->genDot(flags);
213  }
214 
215  if (output != nullptr) {
216  oss << dotId << " -> " << output->getDotId() << ";" << std::endl;
217  oss << output->genDot(flags);
218  }
219  }
220 
221  oss << genDot(flags, dotId);
222 
223  return oss.str();
224  }
225 
226 
227  virtual std::string getConsumerDotIds() {
228  return this->getDotId();
229  }
230 
231  virtual std::string getProducerDotIds() {
232  return this->getDotId();
233  }
234 
235 
236  virtual std::string genDotProducerEdgeToTask(std::map<std::shared_ptr<AnyConnector>, AnyITask *> &inputConnectorDotMap, int dotFlags) = 0;
237 
238  virtual std::string genDotConsumerEdgeFromConnector(std::shared_ptr<AnyConnector> connector, int flags) = 0;
239  virtual std::string genDotProducerEdgeFromConnector(std::shared_ptr<AnyConnector> connector, int flags) = 0;
240 // {
241 //
243 // }
244 
245 // virtual std::string genDotConsumerEdgeWithoutConnector(std::string producerDotId, int flags)
246 // {
247 // return producerDotId + " -> " + this->getDotId() + ";\n";
248 // }
249 
256  virtual std::string genDot(int flags, std::string dotId) {
257  return dotId + ";\n";
258 // std::string inOutLabel = (((flags & DOTGEN_FLAG_SHOW_IN_OUT_TYPES) != 0) ? ("\nin: "+ this->inTypeName() + "\nout: " + this->outTypeName()) : "");
259 // std::string threadLabel = (((flags & DOTGEN_FLAG_SHOW_ALL_THREADING) != 0) ? "" : (" x" + std::to_string(this->getNumThreads())));
260 // return dotId + "[label=\"" + this->getName() +
261 // (this->debugDotNode() != "" ? ("\n"+this->debugDotNode()+"\n") : "") +
262 // threadLabel + inOutLabel + "\",shape=box,color=black,width=.2,height=.2];\n";
263  }
264 
270  virtual std::string genCustomDot(ProfileUtils *profileUtils, int colorFlag)
271  { return ""; }
272 
276  virtual void debug() {}
277 
282  virtual std::string debugDotNode() { return ""; }
283 
284 
285 
290  virtual void profile() {}
291 
301  virtual std::string profileStr() { return ""; }
302 
307  virtual std::string inTypeName() = 0;
308 
313  virtual std::string outTypeName() = 0;
314 
319  virtual std::string getAddress() = 0;
320 
329  virtual AnyITask *copyITask(bool deep) = 0;
330 
334  virtual void printProfile() = 0;
335 
336 
340 
346  void initialize(size_t pipelineId, size_t numPipeline) {
347  this->pipelineId = pipelineId;
348  this->numPipelines = numPipeline;
349  this->initialize();
350  }
351 
359  void setPipelineId(size_t pipelineId) {
360  this->pipelineId = pipelineId;
361  }
362 
367  size_t getPipelineId() {
368  return this->pipelineId;
369  }
370 
379  this->numPipelines = numPipelines;
380  }
381 
382  // TODO: Delete or Add #ifdef
383 // /**
384 // * Sets the task graph communicator.
385 // * @param communicator
386 // */
387 // void setTaskGraphCommunicator(TaskGraphCommunicator *communicator) {
388 // this->taskGraphCommunicator = communicator;
389 // }
390 //
391 // /**
392 // * Gets the task graph communicator
393 // * @return the task graph communicator
394 // */
395 // TaskGraphCommunicator *getTaskGraphCommunicator() const {
396 // return taskGraphCommunicator;
397 // }
398 
399 
404  size_t getNumPipelines() const {
405  return this->numPipelines;
406  }
407 
412  size_t getNumThreads() const {
413  return this->numThreads;
414  }
415 
422  bool isStartTask() const {
423  return this->startTask;
424  }
425 
432  bool isPoll() const {
433  return this->poll;
434  }
435 
440  size_t getMicroTimeoutTime() const {
441  return this->microTimeoutTime;
442  }
443 
448  void copyMemoryEdges(AnyITask *iTaskCopy) {
449  iTaskCopy->setMemoryEdges(this->memoryEdges);
450  iTaskCopy->setReleaseMemoryEdges(this->releaseMemoryEdges);
451  }
452 
460  std::string genDot(int flags, std::shared_ptr<AnyConnector> input, std::shared_ptr<AnyConnector> output) {
461  std::string dotId = this->getDotId();
462  std::ostringstream oss;
463  oss << genDot(flags, dotId, input, output);
464 
465  if ((flags & DOTGEN_FLAG_HIDE_MEM_EDGES) == 0) {
466  if (memoryEdges->size() > 0) {
467  for (const auto &kv : *this->memoryEdges) {
468  oss << kv.second->getDotId() << " -> " << dotId << "[label=\"get\", color=sienna];" << std::endl;
469  }
470  }
471  }
472 
473  return oss.str();
474  }
475 
482  void profileITask() {
483  if (memoryEdges->size() > 0) {
484  for (const auto &kv : *this->memoryEdges) {
485  std::cout << "Mem getter: " << kv.first << " profile; ";
486  // consume
487  kv.second->profileConsume(this->numThreads, false);
488  }
489  }
490  this->profile();
491  }
492 
497  std::string getDotId() {
498  std::ostringstream id;
499  id << "x" << this;
500  std::string idStr = id.str();
501 
502  return idStr;
503  }
504 
509  std::string getNameWithPipelineId() { return this->getName() + std::to_string(this->pipelineId); }
510 
515  const std::shared_ptr<ConnectorMap> &getMemoryEdges() const {
516  return memoryEdges;
517  }
518 
523  const std::shared_ptr<ConnectorMap> &getReleaseMemoryEdges() const {
524  return releaseMemoryEdges;
525  }
526 
535  bool hasMemoryEdge(std::string name) {
536  return memoryEdges->find(name) != memoryEdges->end();
537  }
538 
549  void attachMemoryEdge(std::string name, std::shared_ptr<AnyConnector> getMemoryConnector,
550  std::shared_ptr<AnyConnector> releaseMemoryConnector, MMType type) {
551  if (hasMemoryEdge(name)) {
552  std::cerr << "ERROR: " << this->getName() << " already has a memory edge named " << name << std::endl;
553  } else {
554  memoryEdges->insert(ConnectorPair(name, getMemoryConnector));
555  releaseMemoryEdges->insert(ConnectorPair(name, releaseMemoryConnector));
556  }
557 
558  HTGS_DEBUG("Num memory getters " << memoryEdges->size());
559  }
560 
561 
566  unsigned long long int getMemoryWaitTime() const {
567  return memoryWaitTime;
568  }
569 
574  void incMemoryWaitTime(unsigned long long int val) { this->memoryWaitTime += val; }
575 
576 #ifdef WS_PROFILE
577  void sendWSProfileUpdate(StatusCode code)
578  {
579  if (this->getName() == "WebSocketProfiler")
580  return;
581  std::shared_ptr<ProfileData> updateStatus(new ChangeStatusProfile(this, code));
582  std::shared_ptr<DataPacket> dataPacket(new DataPacket(this->getName(), this->getAddress(), "WebSocketProfiler", "0", updateStatus));
583  this->taskGraphCommunicator->produceDataPacket(dataPacket);
584  }
585 #endif
586 
587  private:
589  void setMemoryEdges(std::shared_ptr<ConnectorMap> memGetter) { this->memoryEdges = memGetter; }
590 
591  void setReleaseMemoryEdges(const std::shared_ptr<ConnectorMap> &releaseMemoryEdges) {
593  }
595 
596 
597 
598 
599  size_t
601  bool startTask;
602  bool poll;
604  size_t pipelineId;
605  size_t numPipelines;
606 
607  std::shared_ptr<ConnectorMap>
609  std::shared_ptr<ConnectorMap>
611 
612  // TODO: Delete or Add #ifdef
613 // TaskGraphCommunicator *taskGraphCommunicator; //!< Task graph connector communicator
614 
615  unsigned long long int memoryWaitTime;
616 
617 };
618 }
619 
620 #endif //HTGS_ANYITASK_HPP
Implements the parent class for a Task to remove the template arguments and the TaskManagerThread to ...
virtual std::string debugDotNode()
Provides debug output for a node in the dot graph.
Definition: AnyITask.hpp:282
const std::shared_ptr< ConnectorMap > & getReleaseMemoryEdges() const
Gets the memory edges for releasing memory for the memory manager, used to shutdown the memory manage...
Definition: AnyITask.hpp:523
virtual std::string getName()=0
Virtual function to get the name of an ITask.
virtual void printProfile()=0
Prints the profile data to standard output.
virtual void debug()
Virtual function that is called to debug the ITask.
Definition: AnyITask.hpp:276
Implements the parent ITask, which removes the template arguments of an ITask.
Definition: AnyITask.hpp:48
std::string getNameWithPipelineId()
Gets the name of the ITask with its pipeline ID.
Definition: AnyITask.hpp:509
std::string getDotId()
Gets the id used for dot nodes.
Definition: AnyITask.hpp:497
virtual std::string profileStr()
Virtual function that is called after executionTask is called.
Definition: AnyITask.hpp:301
bool hasMemoryEdge(std::string name)
Checks whether this ITask contains a memory edge for a specified name.
Definition: AnyITask.hpp:535
MMType
The memory manager types.
Definition: MMType.hpp:38
virtual std::string inTypeName()=0
Gets the demangled input type name of the connector.
size_t getNumThreads() const
Gets the number of threads associated with this ITask.
Definition: AnyITask.hpp:412
Provides the Connector class for managing input/output of AbsData between Tasks.
void profileITask()
Provides profile output for the ITask.
Definition: AnyITask.hpp:482
bool startTask
Whether the ITask will be a start task used when creating a TaskManager.
Definition: AnyITask.hpp:601
void incMemoryWaitTime(unsigned long long int val)
Increments memory wait time.
Definition: AnyITask.hpp:574
unsigned long long int memoryWaitTime
The amount of time this task waited for memory.
Definition: AnyITask.hpp:615
virtual void profile()
Virtual function that is called to provide profile output for the ITask.
Definition: AnyITask.hpp:290
virtual std::string genDot(int flags, std::string dotId)
Virtual function that adds additional dot attributes to this node.
Definition: AnyITask.hpp:256
void initialize(size_t pipelineId, size_t numPipeline)
Called when an ITask is being initialized by its owner thread; stores the pipeline ID and the number of pipelines, then invokes the virtual initialize().
Definition: AnyITask.hpp:346
size_t getNumPipelines() const
Gets the number of pipelines that exist for this task.
Definition: AnyITask.hpp:404
size_t getPipelineId()
Gets the pipeline ID.
Definition: AnyITask.hpp:367
AnyITask(size_t numThreads, bool isStartTask, bool poll, size_t microTimeoutTime)
Constructs an ITask with a specified number of threads as well as additional scheduling options...
Definition: AnyITask.hpp:94
unsigned long long int getMemoryWaitTime() const
Gets the amount of time the task was waiting for memory.
Definition: AnyITask.hpp:566
virtual AnyITask * copyITask(bool deep)=0
Copies the ITask including its list of memGetters and memReleasers.
size_t numThreads
The number of threads to be used with this ITask (forms a thread pool) used when creating a TaskManag...
Definition: AnyITask.hpp:600
virtual std::string genDot(int flags, std::string dotId, std::shared_ptr< htgs::AnyConnector > input, std::shared_ptr< htgs::AnyConnector > output)
Virtual function that generates the input/output and per-task dot notation.
Definition: AnyITask.hpp:202
virtual void initialize()=0
Virtual function that is called when an ITask is being initialized by its owner thread.
virtual std::string getDotShape()=0
Gets the shape for graphviz dot.
void attachMemoryEdge(std::string name, std::shared_ptr< AnyConnector > getMemoryConnector, std::shared_ptr< AnyConnector > releaseMemoryConnector, MMType type)
Attaches a memory edge to this ITask to get memory.
Definition: AnyITask.hpp:549
size_t numPipelines
The number of pipelines that exist for this task.
Definition: AnyITask.hpp:605
virtual void shutdown()=0
Virtual function that is called when an ITask is being shutdown by its owner thread.
AnyITask()
Creates an ITask with number of threads equal to 1.
Definition: AnyITask.hpp:54
#define DOTGEN_FLAG_HIDE_MEM_EDGES
Hides memory edges during dot generation.
Definition: TaskGraphDotGenFlags.hpp:20
virtual std::string genCustomDot(ProfileUtils *profileUtils, int colorFlag)
Virtual function to generate customized dot file.
Definition: AnyITask.hpp:270
void setNumPipelines(size_t numPipelines)
Sets the number of pipelines that this ITask belongs to.
Definition: AnyITask.hpp:378
Defines the Memory Manager types MMType.
Holds parent class for Connector, removes template type of Connector.
#define HTGS_DEBUG(msg)
Prints a debug message to std::cerr with standard level. If DEBUG_FLAG is not defined, this equates to a no-op. Each message includes the file and line number for where the debug is called.
Definition: debug_message.hpp:65
virtual std::string outTypeName()=0
Gets the demangled output type name of the connector.
bool isStartTask() const
Gets whether this ITask is a starting task.
Definition: AnyITask.hpp:422
size_t getMicroTimeoutTime() const
Gets the timeout time for polling.
Definition: AnyITask.hpp:440
Defines common types used throughout the HTGS API and some of which that are used by users of HTGS su...
virtual ~AnyITask()
Destructor.
Definition: AnyITask.hpp:116
const std::shared_ptr< ConnectorMap > & getMemoryEdges() const
Gets the memory edges for the task.
Definition: AnyITask.hpp:515
virtual std::string getDotLabelName()=0
Virtual function to get the label name used for dot graph viz.
bool poll
Whether the ITask should poll for data used when creating a TaskManager.
Definition: AnyITask.hpp:602
size_t microTimeoutTime
The timeout time for polling in microseconds used when creating a TaskManager.
Definition: AnyITask.hpp:603
Implements a data packet that is transmitted to the TaskGraphCommunicator.
std::pair< std::string, std::shared_ptr< AnyConnector > > ConnectorPair
A pair used for the ConnectorMap.
Definition: Types.hpp:48
virtual std::string getAddress()=0
Gets the address from the owner task, which is the address of the task graph.
void copyMemoryEdges(AnyITask *iTaskCopy)
Copies the memory edges from this AnyITask to another AnyITask.
Definition: AnyITask.hpp:448
std::unordered_map< std::string, std::shared_ptr< AnyConnector > > ConnectorMap
A mapping between the name of a task and its connector.
Definition: Types.hpp:42
std::shared_ptr< ConnectorMap > memoryEdges
A mapping from memory edge name to memory manager connector for getting memory.
Definition: AnyITask.hpp:608
virtual void executeTaskFinal()=0
Virtual function that is called just before the task has shutdown.
std::string genDot(int flags, std::shared_ptr< AnyConnector > input, std::shared_ptr< AnyConnector > output)
Creates a dot notation representation for this task.
Definition: AnyITask.hpp:460
AnyITask(size_t numThreads)
Constructs an ITask with a specified number of threads.
Definition: AnyITask.hpp:72
virtual std::string getDotCustomProfile()=0
Virtual function that can be used to add custom output for dot visualizations.
bool isPoll() const
Gets whether this ITask is polling for data or not.
Definition: AnyITask.hpp:432
void setPipelineId(size_t pipelineId)
Sets the pipeline Id for this ITask.
Definition: AnyITask.hpp:359
virtual bool canTerminate(std::shared_ptr< AnyConnector > inputConnector)=0
Virtual function that is called when an ITask is checking if it can be terminated.
Implements MemoryData used by a MemoryManager, which can be shared among multiple ITask...
size_t pipelineId
The execution pipeline id for the ITask.
Definition: AnyITask.hpp:604
virtual std::string getDotFillColor()=0
Gets the color for filling the shape for graphviz dot.
virtual AnyITask * copy()=0
Pure virtual function to copy an ITask.
std::shared_ptr< ConnectorMap > releaseMemoryEdges
A mapping from the memory edge name to the memory manager's input connector to shutdown the memory ma...
Definition: AnyITask.hpp:610
Definition: ProfileUtils.hpp:13
Definition: Bookkeeper.hpp:23
virtual std::string getDotShapeColor()=0
Gets the color of the shape for graphviz dot.
Defines DOTGEN flags used for dot file generation.
Provides functionality for debug messaging.
#define DOTGEN_FLAG_SHOW_CONNECTOR_VERBOSE
Shows verbose information within each connector in the graph.
Definition: TaskGraphDotGenFlags.hpp:92