diff --git a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp --- a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp +++ b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp @@ -154,6 +154,14 @@ } const std::string clientDeviceID = sessionItem->getDeviceID(); const std::string messageID = tools::generateUUID(); + + const database::MessageItem message( + messageID, + clientDeviceID, + request->todeviceid(), + request->payload(), + ""); + database::DatabaseManager::getInstance().putMessageItem(message); if (!AmqpManager::getInstance().send( messageID, clientDeviceID, @@ -197,6 +205,28 @@ } const std::string clientDeviceID = sessionItem->getDeviceID(); DeliveryBrokerMessage messageToDeliver; + + std::vector> messagesFromDatabase = + database::DatabaseManager::getInstance().findMessageItemsByReceiver( + clientDeviceID); + if (messagesFromDatabase.size() > 0) { + // When a client connects and requests GET for the messages first we check + // if there are undelivered messages in the database. If so, we are + // erasing the messages to deliver from rabbitMQ which are handled by + // DeliveryBroker. + DeliveryBroker::getInstance().erase(clientDeviceID); + } + for (auto &messageFromDatabase : messagesFromDatabase) { + tunnelbroker::GetResponse response; + response.set_fromdeviceid(messageFromDatabase->getFromDeviceID()); + response.set_payload(messageFromDatabase->getPayload()); + if (!writer->Write(response)) { + throw std::runtime_error( + "gRPC: 'Get' writer error on sending data to the client"); + } + database::DatabaseManager::getInstance().removeMessageItem( + messageFromDatabase->getMessageID()); + } while (1) { messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID); tunnelbroker::GetResponse response; @@ -208,9 +238,9 @@ } comm::network::AmqpManager::getInstance().ack( messageToDeliver.deliveryTag); - if (DeliveryBroker::getInstance().isEmpty(clientDeviceID)) { - DeliveryBroker::getInstance().erase(clientDeviceID); - } + database::DatabaseManager::getInstance().removeMessageItem( + messageToDeliver.messageID); + DeliveryBroker::getInstance().deleteQueueIfEmpty(clientDeviceID); } } catch (std::runtime_error &e) { LOG(ERROR) << "gRPC: "