diff --git a/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.h b/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.h --- a/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.h +++ b/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.h @@ -21,8 +21,11 @@ public: static AmqpManager &getInstance(); void connect(); - bool - send(std::string toDeviceID, std::string fromDeviceID, std::string payload); + bool send( + std::string messageID, + std::string toDeviceID, + std::string fromDeviceID, + std::string payload); void ack(uint64_t deliveryTag); AmqpManager(AmqpManager const &) = delete; diff --git a/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.cpp --- a/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.cpp +++ b/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.cpp @@ -61,13 +61,14 @@ try { AMQP::Table headers = message.headers(); const std::string payload(message.body()); + const std::string messageID(headers[AMQP_HEADER_MESSAGEID]); const std::string toDeviceID(headers[AMQP_HEADER_TO_DEVICEID]); const std::string fromDeviceID( headers[AMQP_HEADER_FROM_DEVICEID]); std::cout << "AMQP: Message consumed for deviceID: " << toDeviceID << std::endl; DeliveryBroker::getInstance().push( - deliveryTag, toDeviceID, fromDeviceID, payload); + messageID, deliveryTag, toDeviceID, fromDeviceID, payload); } catch (const std::exception &e) { std::cout << "AMQP: Message parsing exception: " << e.what() << std::endl; @@ -104,6 +105,7 @@ } bool AmqpManager::send( + std::string messageID, std::string toDeviceID, std::string fromDeviceID, std::string payload) { @@ -114,6 +116,7 @@ try { AMQP::Envelope env(payload.c_str(), payload.size()); AMQP::Table headers; + headers[AMQP_HEADER_MESSAGEID] = messageID; headers[AMQP_HEADER_FROM_DEVICEID] = fromDeviceID; headers[AMQP_HEADER_TO_DEVICEID] = toDeviceID; // Set delivery mode to: Durable (2) diff --git a/services/tunnelbroker/docker-server/contents/server/src/Constants.h b/services/tunnelbroker/docker-server/contents/server/src/Constants.h --- a/services/tunnelbroker/docker-server/contents/server/src/Constants.h +++ b/services/tunnelbroker/docker-server/contents/server/src/Constants.h @@ -32,8 +32,9 @@ // queue TTL in case of no consumers (tunnelbroker is down) const size_t AMQP_QUEUE_TTL = 24 * 3600 * 1000; // 24 hours // routing message headers name -const std::string AMQP_HEADER_FROM_DEVICEID = "fromDeviceid"; -const std::string AMQP_HEADER_TO_DEVICEID = "toDeviceid"; +const std::string AMQP_HEADER_FROM_DEVICEID = "fromDeviceID"; +const std::string AMQP_HEADER_TO_DEVICEID = "toDeviceID"; +const std::string AMQP_HEADER_MESSAGEID = "messageID"; const long long AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL = 1000 * 60; // 1 min diff --git a/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.h b/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.h --- a/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.h +++ b/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.h @@ -3,34 +3,33 @@ #include "Constants.h" #include "DeliveryBrokerEntites.h" +#include #include -#include #include #include -#include namespace comm { namespace network { class DeliveryBroker { - folly::ConcurrentHashMap> + folly::ConcurrentHashMap< + std::string, + std::unique_ptr>> messagesMap; - std::mutex localMutex; - std::condition_variable localCv; public: static DeliveryBroker &getInstance(); void push( + const std::string messageID, const uint64_t deliveryTag, const std::string toDeviceID, const std::string fromDeviceID, const std::string payload); - std::vector get(const std::string deviceID); - bool isEmpty(const std::string key); - void remove(const std::string key); - void wait(const std::string key); + bool isEmpty(const std::string deviceID); + DeliveryBrokerMessage pop(const std::string deviceID); + void erase(const std::string deviceID); }; } // namespace network diff --git a/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.cpp b/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.cpp --- a/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.cpp +++ b/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.cpp @@ -9,58 +9,54 @@ }; void DeliveryBroker::push( + const std::string messageID, const uint64_t deliveryTag, const std::string toDeviceID, const std::string fromDeviceID, const std::string payload) { try { - std::unique_lock localLock(this->localMutex); - std::vector messagesList; + const DeliveryBrokerMessage newMessage = { + .messageID = messageID, .deliveryTag = deliveryTag, .fromDeviceID = fromDeviceID, .payload = payload}; if (this->messagesMap.find(toDeviceID) == this->messagesMap.end()) { - messagesList.push_back(newMessage); - this->messagesMap.insert({toDeviceID, messagesList}); - this->localCv.notify_all(); - return; + this->messagesMap.insert( + toDeviceID, + std::make_unique>(100)); } - - messagesList = this->messagesMap[toDeviceID]; - messagesList.push_back(newMessage); - this->messagesMap.assign(toDeviceID, messagesList); - this->localCv.notify_all(); + *(this->messagesMap).find(toDeviceID)->second->blockingWrite(newMessage); } catch (const std::exception &e) { - std::cout << "DeliveryBroker: " + std::cout << "DeliveryBroker push: " << "Got an exception " << e.what() << std::endl; - this->localCv.notify_all(); - } -}; - -std::vector -DeliveryBroker::get(const std::string deviceID) { - if (this->messagesMap.find(deviceID) == this->messagesMap.end()) { - return {}; } - return this->messagesMap[deviceID]; }; -bool DeliveryBroker::isEmpty(const std::string key) { +bool DeliveryBroker::isEmpty(const std::string deviceID) { if (this->messagesMap.empty()) { return true; } - return (this->messagesMap.find(key) == this->messagesMap.end()); + return (this->messagesMap.find(deviceID) == this->messagesMap.end()); }; -void DeliveryBroker::remove(const std::string key) { - this->messagesMap.erase(key); -}; +DeliveryBrokerMessage DeliveryBroker::pop(const std::string deviceID) { + try { + DeliveryBrokerMessage message; + if (this->messagesMap.find(deviceID) == this->messagesMap.end()) { + return message; + } + *(this->messagesMap).find(deviceID)->second->blockingRead(message); + return message; + } catch (const std::exception &e) { + std::cout << "DeliveryBroker remove: " + << "Got an exception " << e.what() << std::endl; + } +} -void DeliveryBroker::wait(const std::string key) { - std::unique_lock localLock(this->localMutex); - this->localCv.wait(localLock, [this, &key] { return !this->isEmpty(key); }); +void DeliveryBroker::erase(const std::string deviceID) { + this->messagesMap.erase(deviceID); }; } // namespace network diff --git a/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBrokerEntites.h b/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBrokerEntites.h --- a/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBrokerEntites.h +++ b/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBrokerEntites.h @@ -7,6 +7,7 @@ namespace network { struct DeliveryBrokerMessage { + std::string messageID; uint64_t deliveryTag; std::string fromDeviceID; std::string payload; diff --git a/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp --- a/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp +++ b/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp @@ -7,6 +7,9 @@ #include "DatabaseManager.h" #include "DeliveryBroker.h" #include "Tools.h" + +#include + namespace comm { namespace network { @@ -152,7 +155,9 @@ "No such session found. SessionID: " + sessionID); } const std::string clientDeviceID = sessionItem->getDeviceID(); + const std::string messageID = generateUUID(); if (!AmqpManager::getInstance().send( + messageID, request->todeviceid(), clientDeviceID, std::string(request->payload()))) { @@ -194,23 +199,21 @@ "No such session found. SessionID: " + sessionID); } const std::string clientDeviceID = sessionItem->getDeviceID(); - std::vector messagesToDeliver; + DeliveryBrokerMessage messageToDeliver; while (1) { - messagesToDeliver = DeliveryBroker::getInstance().get(clientDeviceID); - for (auto const &message : messagesToDeliver) { - tunnelbroker::GetResponse response; - response.set_fromdeviceid(message.fromDeviceID); - response.set_payload(message.payload); - if (!writer->Write(response)) { - throw std::runtime_error( - "gRPC: 'Get' writer error on sending data to the client"); - } - comm::network::AmqpManager::getInstance().ack(message.deliveryTag); + messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID); + tunnelbroker::GetResponse response; + response.set_fromdeviceid(messageToDeliver.fromDeviceID); + response.set_payload(messageToDeliver.payload); + if (!writer->Write(response)) { + throw std::runtime_error( + "gRPC: 'Get' writer error on sending data to the client"); } + comm::network::AmqpManager::getInstance().ack( + messageToDeliver.deliveryTag); if (!DeliveryBroker::getInstance().isEmpty(clientDeviceID)) { - DeliveryBroker::getInstance().remove(clientDeviceID); + DeliveryBroker::getInstance().erase(clientDeviceID); } - DeliveryBroker::getInstance().wait(clientDeviceID); } } catch (std::runtime_error &e) { std::cout << "gRPC: "