20 #ifndef HEDGEHOG_CORE_QUEUE_SENDER_H 21 #define HEDGEHOG_CORE_QUEUE_SENDER_H 23 #include "core_queue_notifier.h" 24 #include "../receiver/core_queue_receiver.h" 25 #include "../../base/sender/core_sender.h" 26 #include "../../../../tools/traits.h" 33 template<
class NodeOutput>
/// Set of receivers linked to this sender. Declared null here; the constructor replaces it
/// with a freshly allocated, empty set.
35 std::shared_ptr<std::set<CoreQueueReceiver<NodeOutput> *>>
destinations_ =
nullptr;
/// CoreQueueSender constructor (its signature line is missing from this listing).
/// Forwards name, type and numberThreads to the CoreSender<NodeOutput> base, logs the creation,
/// and allocates the destination set.
44 CoreSender<NodeOutput>(name, type, numberThreads) {
45 HLOG_SELF(0,
"Creating CoreQueueSender with type: " << (
int) type <<
" and name: " << name)
// Replace the null default of destinations_ with an empty set.
46 destinations_ = std::make_shared<std::set<CoreQueueReceiver<NodeOutput> *>>();
/// Destination accessor. Declared virtual and const; the return type is a const reference to the
/// shared pointer that holds the set of CoreQueueReceiver pointers.
/// NOTE(review): the function body is missing from this listing.
54 virtual std::shared_ptr<std::set<CoreQueueReceiver<NodeOutput> *>>
const &
destinations()
const {
/// Add a receiver to the set of receivers (the signature line is missing from this listing).
/// Logs the receiver's name and id, walks receiver->receivers(), and inserts into destinations_.
/// Throws std::runtime_error when the "not a CoreQueueReceiver" internal-error path is reached.
61 HLOG_SELF(0,
"Add receiver " << receiver->
name() <<
"(" << receiver->
id() <<
")")
62 for (
auto queueReceiver: receiver->
receivers()) {
// NOTE(review): `r` is defined on a line missing from this listing, presumably the result of
// casting queueReceiver to CoreQueueReceiver<NodeOutput> *; confirm against the full source.
64 this->destinations_->insert(r);
// Failure path: build the message, log it, then raise it as a std::runtime_error.
66 std::ostringstream oss;
68 <<
"Internal error, CoreQueueSender connected to a node which is not a CoreQueueReceiver: " 70 HLOG_SELF(0, oss.str())
71 throw (std::runtime_error(oss.str()));
/// Remove a receiver from the set of receivers (the signature line is missing from this listing).
/// Logs the receiver's name and id, walks receiver->receivers(), and erases from destinations_.
/// Throws std::runtime_error when the "not a CoreQueueReceiver" internal-error path is reached.
79 HLOG_SELF(0,
"Remove receiver " << receiver->
name() <<
"(" << receiver->
id() <<
")")
80 for (
auto queueReceiver: receiver->
receivers()) {
// NOTE(review): `r` is defined on a line missing from this listing, presumably the result of
// casting queueReceiver to CoreQueueReceiver<NodeOutput> *; confirm against the full source.
82 this->destinations_->erase(r);
// Failure path: build the message, log it, then raise it as a std::runtime_error.
84 std::ostringstream oss;
86 <<
"Internal error, CoreQueueSender connected to a node which is not a CoreQueueReceiver: " 88 HLOG_SELF(0, oss.str())
89 throw (std::runtime_error(oss.str()));
/// Send data to the destinations and notify them (the signature and the loop header are missing
/// from this listing). Per receiver: log, deliver ptr through receive(), log again, then wake up
/// the receiver's queue slot.
98 HLOG_SELF(2,
"Send data to " << receiver->name() <<
"(" << receiver->id() <<
")")
// Deliver the data before waking the receiver.
99 receiver->receive(ptr);
100 HLOG_SELF(2,
"Wake up " << receiver->name() <<
"(" << receiver->id() <<
")")
// Notify the receiver through its queue slot.
101 receiver->queueSlot()->wakeUp();
/// Special visit method for a CoreQueueSender, printing an edge (the signature, the loop header
/// and the start of the print call are missing from this listing).
108 HLOG_SELF(1,
"Visit")
// NOTE(review): this condition is always true. A single type() value cannot equal both
// NodeType::Switch and NodeType::ExecutionPipeline, so at least one `!=` always holds.
// If the intent was to skip those two node types, `&&` is needed; confirm the intended
// behavior before changing it, because doing so alters which edges get printed.
110 if (receiver->type() != NodeType::Switch || receiver->type() != NodeType::ExecutionPipeline) {
// Trailing arguments of a call whose start is missing from this listing; they line up with
// printEdge's edgeType, queueSize, maxQueueSize and isMemoryManaged parameters.
113 traits::type_name<NodeOutput>(),
114 receiver->queueSize(),
115 receiver->maxQueueSize(),
116 traits::is_managed_memory_v<NodeOutput>);
123 std::set<CoreSender<NodeOutput> *>
getSenders()
override {
return {
this}; }
/// Copy the inner structure of a CoreQueueSender. Only the opening log statement, which names
/// the source sender rhs by name and id, is present in this listing; the signature and the
/// copying statements are missing.
128 HLOG_SELF(0,
"Copy Cluster CoreQueueSender information from " << rhs->
name() <<
"(" << rhs->
id() <<
")")
/// Duplicate the edges from this node to its copy (the first part of the signature and several
/// body lines are missing from this listing).
/// correspondenceMap maps CoreNode pointers to shared_ptr-held CoreNodes, presumably original
/// node -> duplicate; confirm against callers.
/// Throws std::runtime_error on the two internal-error paths below.
138 std::map<
CoreNode *, std::shared_ptr<CoreNode>> &correspondenceMap)
override {
// Only a receiver that can be viewed as a CoreNode can be looked up in the map.
140 if (
auto coreNode = dynamic_cast<CoreNode *>(originalReceiver)) {
141 auto nodeReceiverFound = correspondenceMap.find(coreNode);
142 if (nodeReceiverFound != correspondenceMap.end()) {
// Guard: the mapped node's id is compared with this->id(); equality is treated as a failed copy.
143 if (nodeReceiverFound->second->id() == this->
id()) {
144 std::ostringstream oss;
146 <<
"Internal error, Receiver found is the same as the original Receiver, copy failed " 148 HLOG_SELF(0, oss.str())
149 throw (std::runtime_error(oss.str()));
// Per the error text, this path handles a receiver that is not a CoreNode.
154 std::ostringstream oss;
156 <<
"Internal error, The Receiver is not a CoreNode, copy failed " 158 HLOG_SELF(0, oss.str())
159 throw (std::runtime_error(oss.str()));
/// Connect coreNodeSender to coreNodeReceiver for data transfer and signal handling (the signature
/// and several body lines are missing from this listing).
/// Throws std::runtime_error on the internal-error paths below.
// View the receiver node as a slot and the sender node as a notifier; both results are
// null-checked before the signal wiring further down.
170 auto coreSlot =
dynamic_cast<CoreSlot *
>(coreNodeReceiver);
171 auto coreNotifier =
dynamic_cast<CoreNotifier *
>(coreNodeSender);
175 for (
auto r : coreReceiver->receivers()) {
// NOTE(review): the message has no separator before __FUNCTION__, so the function name is
// appended directly after "duplication".
179 std::ostringstream oss;
180 oss <<
"Internal error, during edge duplication" << __FUNCTION__;
181 HLOG_SELF(0, oss.str())
182 throw (std::runtime_error(oss.str()));
// Signal wiring: when both casts succeeded, every slot of the receiver is added to the notifier.
186 if (coreSlot && coreNotifier) {
187 for (
CoreSlot *slot : coreSlot->
getSlots()) { coreNotifier->addSlot(slot); }
// NOTE(review): `s` is defined on a line missing from this listing.
189 coreReceiver->addSender(s);
190 coreSlot->addNotifier(s);
// NOTE(review): same missing separator before __FUNCTION__ as in the message above.
193 std::ostringstream oss;
194 oss <<
"Internal error, during edge duplication" << __FUNCTION__;
195 HLOG_SELF(0, oss.str())
196 throw (std::runtime_error(oss.str()));
202 #endif //HEDGEHOG_CORE_QUEUE_SENDER_H
NodeType type() const
Node type accessor.
Receiver Interface, receive one data type from CoreSender.
void copyInnerStructure(CoreQueueSender< NodeOutput > *rhs)
Copy the inner structure of a CoreQueueSender (destinations, and notifier)
virtual std::set< CoreReceiver< Input > * > receivers()=0
Accessor to all receivers connected to this receiver.
Core Notifier interface, emit notification to CoreSlot.
void copyInnerStructure(CoreQueueNotifier *rhs)
Copy the inner structure of the notifier (set of slots and connections)
std::shared_ptr< std::set< CoreQueueReceiver< NodeOutput > * > > destinations_
Set of receivers linked.
void visit(AbstractPrinter *printer) override
Special visit method for a CoreQueueSender, printing an edge.
std::set< CoreSender< NodeOutput > * > getSenders() override
Get the inner CoreSender, i.e. this.
Sender for nodes possessing a queue of data.
void connectSenderToReceiverDuplication(CoreNode *coreNodeSender, CoreNode *coreNodeReceiver)
Connect coreNodeSender to coreNodeReceiver, for both data transfer and signal handling.
void duplicateEdge(CoreNode *duplicateNode, std::map< CoreNode *, std::shared_ptr< CoreNode >> &correspondenceMap) override
Duplicate all the edges from this node to its copy, duplicateNode.
Slot interface, receive notification from CoreNotifier.
Receiver for nodes possessing a queue of data.
~CoreQueueSender() override
CoreQueueSender destructor.
NodeType
Hedgehog node's type.
virtual void printEdge(core::CoreNode const *from, core::CoreNode const *to, std::string_view const &edgeType, size_t const &queueSize, size_t const &maxQueueSize, bool isMemoryManaged)=0
Print edge information.
Sender interface, send data to CoreReceiver.
void addReceiver(CoreReceiver< NodeOutput > *receiver) override
Add a receiver to the set of receivers.
Main Hedgehog core abstraction.
Notifier of CoreQueueSlot.
CoreQueueSender(std::string_view const &name, NodeType const type, size_t const numberThreads)
CoreQueueSender constructor.
virtual std::set< CoreSlot * > getSlots()=0
Slots accessor for the node.
std::string_view const & name() const
Node name accessor.
void removeReceiver(CoreReceiver< NodeOutput > *receiver) override
Remove a receiver from the set of receivers.
size_t numberThreads() const
Accessor to the associated number of threads.
virtual std::shared_ptr< std::set< CoreQueueReceiver< NodeOutput > * > > const & destinations() const
Destination accessor.
virtual std::string id() const
Unique Id accessor.
void sendAndNotify(std::shared_ptr< NodeOutput > ptr) final
Send a piece of data to the list of destinations, and notify them.