diff --git a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp --- a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp +++ b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp @@ -155,6 +155,14 @@ } const std::string clientDeviceID = sessionItem->getDeviceID(); const std::string messageID = tools::generateUUID(); + + const database::MessageItem message( + messageID, + clientDeviceID, + request->todeviceid(), + request->payload(), + ""); + database::DatabaseManager::getInstance().putMessageItem(message); if (!AmqpManager::getInstance().send( messageID, clientDeviceID, @@ -199,6 +207,21 @@ } const std::string clientDeviceID = sessionItem->getDeviceID(); DeliveryBrokerMessage messageToDeliver; + + std::vector> messagesFromDatabase = + database::DatabaseManager::getInstance().findMessageItemsByReceiver( + clientDeviceID); + for (auto &messageFromDatabase : messagesFromDatabase) { + tunnelbroker::GetResponse response; + response.set_fromdeviceid(messageFromDatabase->getFromDeviceID()); + response.set_payload(messageFromDatabase->getPayload()); + if (!writer->Write(response)) { + throw std::runtime_error( + "gRPC: 'Get' writer error on sending data to the client"); + } + database::DatabaseManager::getInstance().removeMessageItem( + messageFromDatabase->getMessageID()); + } while (1) { messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID); tunnelbroker::GetResponse response; @@ -210,6 +233,8 @@ } comm::network::AmqpManager::getInstance().ack( messageToDeliver.deliveryTag); + database::DatabaseManager::getInstance().removeMessageItem( + messageToDeliver.messageID); if (DeliveryBroker::getInstance().isEmpty(clientDeviceID)) { DeliveryBroker::getInstance().erase(clientDeviceID); }