diff --git a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp --- a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp +++ b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp @@ -154,6 +154,14 @@ } const std::string clientDeviceID = sessionItem->getDeviceID(); const std::string messageID = tools::generateUUID(); + + const database::MessageItem message( + messageID, + clientDeviceID, + request->todeviceid(), + request->payload(), + ""); + database::DatabaseManager::getInstance().putMessageItem(message); if (!AmqpManager::getInstance().send( messageID, clientDeviceID, @@ -197,20 +205,47 @@ } const std::string clientDeviceID = sessionItem->getDeviceID(); DeliveryBrokerMessage messageToDeliver; + + std::vector> messagesFromDatabase = + database::DatabaseManager::getInstance().findMessageItemsByReceiver( + clientDeviceID); + if (messagesFromDatabase.size() > 0) { + // When a client connects and requests GET for the messages first we check + // if there are undelivered messages in the database. If so, we are + // erasing the messages to deliver from rabbitMQ which are handled by + // DeliveryBroker. + DeliveryBroker::getInstance().erase(clientDeviceID); + } + tunnelbroker::GetResponse response; + for (auto &messageFromDatabase : messagesFromDatabase) { + response.set_fromdeviceid(messageFromDatabase->getFromDeviceID()); + response.set_payload(messageFromDatabase->getPayload()); + if (!writer->Write(response)) { + throw std::runtime_error( + "gRPC: 'Get' writer error on sending data to the client"); + } + response.Clear(); + database::DatabaseManager::getInstance().removeMessageItem( + messageFromDatabase->getMessageID()); + } while (1) { messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID); - tunnelbroker::GetResponse response; response.set_fromdeviceid(messageToDeliver.fromDeviceID); response.set_payload(messageToDeliver.payload); if (!writer->Write(response)) { throw std::runtime_error( "gRPC: 'Get' writer error on sending data to the client"); } + response.Clear(); comm::network::AmqpManager::getInstance().ack( messageToDeliver.deliveryTag); - if (DeliveryBroker::getInstance().isEmpty(clientDeviceID)) { - DeliveryBroker::getInstance().erase(clientDeviceID); - } + database::DatabaseManager::getInstance().removeMessageItem( + messageToDeliver.messageID); + // If messages queue for `clientDeviceID` is empty we don't need to store + // `folly::MPMCQueue` for it and need to free memory to fix possible + // 'ghost' queues in DeliveryBroker. + // We call `deleteQueueIfEmpty()` for this purpose here. + DeliveryBroker::getInstance().deleteQueueIfEmpty(clientDeviceID); } } catch (std::runtime_error &e) { LOG(ERROR) << "gRPC: "