Hedgehog  3.1.0
A library to generate hybrid pipeline workflow systems
Loading...
Searching...
No Matches
core_state_manager.h
Go to the documentation of this file.
1// NIST-developed software is provided by NIST as a public service. You may use, copy and distribute copies of the
2// software in any medium, provided that you keep intact this entire notice. You may improve, modify and create
3// derivative works of the software or any portion of the software, and you may copy and distribute such modifications
4// or works. Modified works should carry a notice stating that you changed the software and should note the date and
5// nature of any such change. Please explicitly acknowledge the National Institute of Standards and Technology as the
6// source of the software. NIST-developed software is expressly provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND,
7// EXPRESS, IMPLIED, IN FACT OR ARISING BY OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
8// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA ACCURACY. NIST NEITHER REPRESENTS NOR
9// WARRANTS THAT THE OPERATION OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE
10// CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF THE SOFTWARE OR THE RESULTS
11// THEREOF, INCLUDING BUT NOT LIMITED TO THE CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE. You
12// are solely responsible for determining the appropriateness of using and distributing the software and you assume
13// all risks associated with its use, including but not limited to the risks and costs of program errors, compliance
14// with applicable laws, damage to or loss of data, programs or equipment, and the unavailability or interruption of
15// operation. This software is not intended to be used in any situation where a failure could cause risk of injury or
16// damage to property. The software developed by NIST employees is not subject to copyright protection within the
17// United States.
18
19#ifndef HEDGEHOG_CORE_STATE_MANAGER_H_
20#define HEDGEHOG_CORE_STATE_MANAGER_H_
21
22#include <ostream>
23#include <sstream>
24
25#include "../../tools/traits.h"
26#include "../../tools/concepts.h"
27#include "../../tools/meta_functions.h"
28
29#include "../abstractions/base/cleanable_abstraction.h"
30#include "../abstractions/base/copyable_abstraction.h"
31#include "../abstractions/base/can_terminate_abstraction.h"
32#include "../abstractions/node/task_inputs_management_abstraction.h"
33#include "../abstractions/node/task_outputs_management_abstraction.h"
34#include "../implementors/concrete_implementor/multi_queue_receivers.h"
35#include "../implementors/concrete_implementor/default_multi_executes.h"
36#include "../../behavior/input_output/state_sender.h"
37
39namespace hh {
40
41#ifndef DOXYGEN_SHOULD_SKIP_THIS
/// Forward declaration of the user-facing state manager node driven by hh::core::CoreStateManager.
/// @tparam Separator Index separating the input types from the output types in AllTypes
/// @tparam AllTypes Input types followed by output types
template<size_t Separator, class ...AllTypes> class StateManager;
47
/// Forward declaration of the state object managed by a state manager.
/// @tparam Separator Index separating the input types from the output types in AllTypes
/// @tparam AllTypes Input types followed by output types
template<size_t Separator, class ...AllTypes> class AbstractState;
53#endif //DOXYGEN_SHOULD_SKIP_THIS
54
56namespace core {
57
59template<size_t Separator, class ...AllTypes>
60using TIM = tool::TaskInputsManagementAbstractionTypeDeducer_t
61 <tool::Inputs<Separator, AllTypes...>>;
62
64template<size_t Separator, class ...AllTypes>
65using TOM = tool::TaskOutputsManagementAbstractionTypeDeducer_t
66 <tool::Outputs<Separator, AllTypes...>>;
67
71template<size_t Separator, class ...AllTypes>
77 public abstraction::CopyableAbstraction<StateManager<Separator, AllTypes...>>,
78 public TIM<Separator, AllTypes...>,
79 public TOM<Separator, AllTypes...> {
80 private:
81 StateManager<Separator, AllTypes...> *const
82 stateManager_ = nullptr;
83
84 std::shared_ptr<AbstractState<Separator, AllTypes...>> const
85 state_ = nullptr;
86
87 bool const
89
90 public:
95 std::shared_ptr<AbstractState<Separator, AllTypes...>> const &state)
96 : CoreStateManager<Separator, AllTypes...>(stateManager, state, "AbstractState Manager", false) {}
97
104 StateManager<Separator, AllTypes...> *const stateManager,
105 std::shared_ptr<AbstractState<Separator, AllTypes...>> const &state,
106 std::string const &name, bool const automaticStart)
107 : TaskNodeAbstraction(name, stateManager),
108 CleanableAbstraction(static_cast<behavior::Cleanable *>(stateManager)),
109 CanTerminateAbstraction(static_cast<behavior::CanTerminate *>(stateManager)),
110 abstraction::CopyableAbstraction<StateManager<Separator, AllTypes...>>(stateManager),
111 TIM<Separator, AllTypes...>(
112 this,
113 std::make_shared<implementor::DefaultSlot>(),
114 std::make_shared<tool::MultiQueueReceiversTypeDeducer_t<tool::Inputs<Separator, AllTypes...>>>(),
115 std::make_shared<hh::tool::DefaultMultiExecutesTypeDeducer_t<tool::Inputs<Separator, AllTypes...>>>(
116 state.get()
117 )
118 ),
119 TOM<Separator, AllTypes...>(),
120 stateManager_(stateManager),
121 state_(state),
123
137 template<class ConcreteMultiReceivers, class ConcreteMultiExecutes, class ConcreteMultiSenders>
139 std::shared_ptr<AbstractState<Separator, AllTypes...>> const &state,
140 std::string const &name, bool const automaticStart,
141 std::shared_ptr<implementor::ImplementorSlot> const &concreteSlot,
142 std::shared_ptr<ConcreteMultiReceivers> concreteMultiReceivers,
143 std::shared_ptr<ConcreteMultiExecutes> concreteMultiExecutes,
144 std::shared_ptr<implementor::ImplementorNotifier> const &concreteNotifier,
145 std::shared_ptr<ConcreteMultiSenders> concreteMultiSenders) :
146 TaskNodeAbstraction(name, stateManager),
147 CleanableAbstraction(static_cast<behavior::Cleanable *>(stateManager)),
148 CanTerminateAbstraction(static_cast<behavior::CanTerminate *>(stateManager)),
149 abstraction::CopyableAbstraction<StateManager<Separator, AllTypes...>>(stateManager),
150 TIM<Separator, AllTypes...>(this, concreteSlot, concreteMultiReceivers, concreteMultiExecutes),
151 TOM<Separator, AllTypes...>(concreteNotifier, concreteMultiSenders),
152 stateManager_(stateManager),
153 state_(state),
155
/// Default destructor; compiler-generated, overriding the virtual destructor inherited from a base.
157 ~CoreStateManager() override = default;
158
161 [[nodiscard]] bool automaticStart() const { return automaticStart_; }
162
166 bool callCanTerminate(bool lock) override {
167 bool result;
168 if (lock) { this->lockSlotMutex(); }
169 result = this->callNodeCanTerminate();
170 if (lock) { this->unlockSlotMutex(); }
171 return result;
172 }
173
/// Visit the state manager.
/// Registers this core with the printer and, only when registerNode() returns true,
/// asks the printer to print this node's information.
/// @param printer Printer gathering the node information
176 void visit(Printer *printer) override {
177 if (printer->registerNode(this)) {
178 printer->printNodeInformation(this);
// NOTE(review): the embedded listing numbers jump from 178 to 180 here, so a statement may be
// missing from this copy of the file - confirm against the upstream header.
180 }
181 }
182
185 [[nodiscard]] std::vector<std::pair<std::string const, std::string const>> ids() const override {
186 return {{this->id(), this->id()}};
187 }
188
191 void preRun() override {
192 this->nvtxProfiler()->startRangeInitializing();
193 this->stateManager_->initialize();
194 if (this->stateManager_->memoryManager() != nullptr) {
195 this->stateManager_->memoryManager()->profiler(this->nvtxProfiler());
196 this->stateManager_->memoryManager()->deviceId(this->deviceId());
197 this->stateManager_->memoryManager()->initialize();
198 }
199 this->nvtxProfiler()->endRangeInitializing();
200 this->setInitialized();
201 }
202
224 void run() override {
225 using Outputs_t = tool::Outputs<Separator, AllTypes...>;
226 using Indices = std::make_index_sequence<std::tuple_size_v<Outputs_t>>;
227 Indices indices{};
228 std::chrono::time_point<std::chrono::system_clock>
229 start,
230 finish;
231
232 this->isActive(true);
233 this->nvtxProfiler()->initialize(0);
234 this->preRun();
235
236 if (this->automaticStart_) {
237 start = std::chrono::system_clock::now();
238 state_->lock();
239 state_->stateManager(this->stateManager_);
240 this->callAllExecuteWithNullptr();
241 emptyReadyLists<Outputs_t>(indices);
242 state_->stateManager(nullptr);
243 state_->unlock();
244 finish = std::chrono::system_clock::now();
245 this->incrementExecutionDuration(std::chrono::duration_cast<std::chrono::nanoseconds>(finish - start));
246 }
247
248 // Actual computation loop
249 while (!this->callCanTerminate(true)) {
250 // Wait for a data to arrive or termination
251 start = std::chrono::system_clock::now();
252 volatile bool canTerminate = this->wait();
253 finish = std::chrono::system_clock::now();
254 this->incrementWaitDuration(std::chrono::duration_cast<std::chrono::nanoseconds>(finish - start));
255
256 // If it can terminate break the loop early
257 if (canTerminate) { break; }
258
259 // Operate the connectedReceivers to get a data and send it to execute
260 start = std::chrono::system_clock::now();
261 state_->lock();
262 state_->stateManager(this->stateManager_);
263 this->operateReceivers();
264 emptyReadyLists<Outputs_t>(indices);
265 state_->stateManager(nullptr);
266 state_->unlock();
267 finish = std::chrono::system_clock::now();
268 this->incrementExecutionDuration(std::chrono::duration_cast<std::chrono::nanoseconds>(finish - start));
269 }
270
271 // Do the shutdown phase
272 this->postRun();
273 // Wake up node that this is linked to
274 this->wakeUp();
275 }
276
278 void postRun() override {
279 this->nvtxProfiler()->startRangeShuttingDown();
280 this->isActive(false);
281 this->stateManager_->shutdown();
282 this->notifyAllTerminated();
283 this->nvtxProfiler()->endRangeShuttingDown();
284 }
285
288 [[nodiscard]] bool hasMemoryManagerAttached() const override { return this->memoryManager() != nullptr; }
289
292 [[nodiscard]] std::string extraPrintingInformation() const override {
294 }
295
298 [[nodiscard]] std::shared_ptr<AbstractMemoryManager> memoryManager() const override {
299 return this->stateManager_->memoryManager();
300 }
301
302 protected:
305 bool wait() {
306 this->nvtxProfiler()->startRangeWaiting(this->totalNumberElementsReceived());
307 std::unique_lock<std::mutex> lock(*(this->mutex()));
308 this->slotConditionVariable()->wait(
309 lock,
310 [this]() { return !this->receiversEmpty() || this->callCanTerminate(false); }
311 );
312 this->nvtxProfiler()->endRangeWaiting();
313 return callCanTerminate(false);
314 }
315
316 private:
319 void gatherCleanable(std::unordered_set<hh::behavior::Cleanable *> &cleanableSet) override {
320 CleanableAbstraction::gatherCleanable(cleanableSet);
321 cleanableSet.insert(static_cast<behavior::Cleanable *>(this->state_.get()));
322 }
323
327 template<class Outputs_t, size_t ...Indexes>
328 void emptyReadyLists(std::index_sequence<Indexes...>) {
329 (emptyReadyLists<std::tuple_element_t<Indexes, Outputs_t>>(), ...);
330 }
331
334 template<class Output>
336 auto &rdyList = std::static_pointer_cast<behavior::StateSender<Output>>(state_)->readyList();
337 std::shared_ptr<Output> data = nullptr;
338 while (!rdyList->empty()) {
339 data = rdyList->front();
340 rdyList->pop();
341 this->sendAndNotify(data);
342 }
343 }
344
348 std::shared_ptr<abstraction::NodeAbstraction> clone(
349 [[maybe_unused]] std::map<NodeAbstraction *, std::shared_ptr<NodeAbstraction>> &correspondenceMap) override {
350 auto clone = std::dynamic_pointer_cast<StateManager<Separator, AllTypes...>>(this->callCopy());
351 if (this->hasMemoryManagerAttached()) { clone->connectMemoryManager(this->memoryManager()->copy()); }
352 return clone->core();
353 }
354
357 void duplicateEdge(std::map<NodeAbstraction *, std::shared_ptr<NodeAbstraction>> &mapping) override {
358 this->duplicateOutputEdges(mapping);
359 }
360
361};
362
363}
364}
365#endif //HEDGEHOG_CORE_STATE_MANAGER_H_
Hedgehog main namespace.
typename internals::Splitter< delta, Types... >::Inputs Inputs
Helper getting the input types from a list of template types (variadic)
typename internals::Splitter< delta, Types... >::Outputs Outputs
Helper getting the output types from a list of template types (variadic)
tool::TaskOutputsManagementAbstractionTypeDeducer_t< tool::Outputs< Separator, AllTypes... > > TOM
Type alias for a TaskOutputsManagementAbstraction deduced from the list of template parameters.
tool::TaskInputsManagementAbstractionTypeDeducer_t< tool::Inputs< Separator, AllTypes... > > TIM
Type alias for a TaskInputsManagementAbstraction deduced from the list of template parameters.
Printer abstraction to get a snapshot of the metrics of the Hedgehog graph.
Definition: printer.h:52
bool registerNode(core::abstraction::NodeAbstraction const *nodeAbstraction)
Register a visited node.
Definition: printer.h:113
virtual void printNodeInformation(core::abstraction::NodeAbstraction const *node)=0
Print node information.
Hedgehog AbstractState.
AbstractState manager.
Definition: state_manager.h:66
Cleanable interface.
Definition: cleanable.h:29
virtual std::string extraPrintingInformation() const
Print extra information for the task.
Definition: task_node.h:89
virtual void initialize()
initialize step for the task
Definition: task_node.h:82
virtual void shutdown()
shutdown step for the task
Definition: task_node.h:85
std::shared_ptr< AbstractMemoryManager > const & memoryManager() const
Memory manager accessor.
Definition: task_node.h:53
Abstraction for cores that present a termination condition.
bool callNodeCanTerminate() const
Call user-definable termination.
CanTerminateAbstraction(behavior::CanTerminate *const canTerminateNode)
Constructor using behavior::CanTerminate node abstraction.
CleanableAbstraction()=default
Constructor used by the CoreGraph to have the handles to clean inner cleanable nodes.
Core abstraction for clonable nodes, nodes duplicated by execution pipeline.
Core abstraction for copyable nodes.
CopyableAbstraction(CopyableNode *const copyableNode)
Constructor using a node abstraction.
std::shared_ptr< CopyableNode > callCopy()
Interface to call user-defined copy method.
virtual std::string id() const
Core's id ('x' + address of abstraction) as string.
NodeAbstraction(std::string name)
Core node constructor using the core's name.
void incrementExecutionDuration(std::chrono::nanoseconds const &exec)
Increment execution duration.
std::string const & name() const
Accessor to the core's name.
virtual int deviceId() const
Get the device identifier (obtained from the graph the node belongs to).
Task core abstraction used to define some method for task-like behaving cores like CoreExecutionPipel...
void incrementWaitDuration(std::chrono::nanoseconds const &wait)
Increment the wait duration.
void setInitialized()
Set the task as initialized.
TaskNodeAbstraction(std::string const &name, behavior::Node *node)
Create the abstraction with the node's name.
std::shared_ptr< NvtxProfiler > const & nvtxProfiler() const
Accessor to the NVTX profiler attached to the node.
bool isActive() const
Accessor to task status.
AbstractState manager core.
CoreStateManager(StateManager< Separator, AllTypes... > *const stateManager, std::shared_ptr< AbstractState< Separator, AllTypes... > > const &state, std::string const &name, bool const automaticStart, std::shared_ptr< implementor::ImplementorSlot > const &concreteSlot, std::shared_ptr< ConcreteMultiReceivers > concreteMultiReceivers, std::shared_ptr< ConcreteMultiExecutes > concreteMultiExecutes, std::shared_ptr< implementor::ImplementorNotifier > const &concreteNotifier, std::shared_ptr< ConcreteMultiSenders > concreteMultiSenders)
Construct a state manager from the state manager and its concrete abstraction's implementations.
void preRun() override
Initialize the state manager.
void duplicateEdge(std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &mapping) override
Duplicate the state manager edge.
bool wait()
Wait statement when the node is alive and no input data are available.
bool hasMemoryManagerAttached() const override
Test if a memory manager is attached.
std::shared_ptr< AbstractMemoryManager > memoryManager() const override
Accessor to the memory manager.
bool automaticStart() const
Accessor to the automatic start flag.
bool const automaticStart_
Flag for automatic start.
void emptyReadyLists(std::index_sequence< Indexes... >)
Interface method to call emptyReadyLists for a type when decomposing types from a tuple.
void run() override
Main core state manager logic.
std::string extraPrintingInformation() const override
Accessor to user-defined extra information for the state-manager.
void visit(Printer *printer) override
Visit the state manager.
void postRun() override
Post run logic, shutdown the state manager, notify successor nodes of state manager termination.
std::shared_ptr< abstraction::NodeAbstraction > clone(std::map< NodeAbstraction *, std::shared_ptr< NodeAbstraction > > &correspondenceMap) override
Clone method, to duplicate a state manager when it is part of another graph in an execution pipeline.
~CoreStateManager() override=default
Default destructor.
StateManager< Separator, AllTypes... > *const stateManager_
User defined state manager.
bool callCanTerminate(bool lock) override
Call user-definable termination.
void gatherCleanable(std::unordered_set< hh::behavior::Cleanable * > &cleanableSet) override
Gather the state manager and the state into the cleanableSet.
void emptyReadyLists()
Empty the ready list for a type.
CoreStateManager(StateManager< Separator, AllTypes... > *const stateManager, std::shared_ptr< AbstractState< Separator, AllTypes... > > const &state)
Construct a state manager from the user state manager and its state.
std::shared_ptr< AbstractState< Separator, AllTypes... > > const state_
Managed state.
std::vector< std::pair< std::string const, std::string const > > ids() const override
Node ids [nodeId, nodeGroupId] accessor.
CoreStateManager(StateManager< Separator, AllTypes... > *const stateManager, std::shared_ptr< AbstractState< Separator, AllTypes... > > const &state, std::string const &name, bool const automaticStart)
Construct a state manager from the user state manager, its state, a name and the automatic start flag...