Page MenuHomePhabricator

D3833.id13005.diff
No OneTemporary

D3833.id13005.diff

diff --git a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
--- a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
+++ b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
@@ -153,7 +153,15 @@
"No such session found. SessionID: " + sessionID);
}
const std::string clientDeviceID = sessionItem->getDeviceID();
- const std::string messageID = generateUUID();
+ const std::string messageID = tools::generateUUID();
+
+ const database::MessageItem message(
+ messageID,
+ clientDeviceID,
+ request->todeviceid(),
+ request->payload(),
+ "");
+ database::DatabaseManager::getInstance().putMessageItem(message);
if (!AmqpManager::getInstance().send(
messageID,
clientDeviceID,
@@ -197,6 +205,28 @@
}
const std::string clientDeviceID = sessionItem->getDeviceID();
DeliveryBrokerMessage messageToDeliver;
+
+ std::vector<std::shared_ptr<database::MessageItem>> messagesFromDatabase =
+ database::DatabaseManager::getInstance().findMessageItemsByReceiver(
+ clientDeviceID);
+ if (messagesFromDatabase.size() > 0) {
+ // When a client connects and requests its messages via GET, we first check
+ // whether there are undelivered messages in the database. If so, we erase
+ // the messages queued for delivery from RabbitMQ, which are handled by
+ // DeliveryBroker.
+ DeliveryBroker::getInstance().erase(clientDeviceID);
+ }
+ for (auto &messageFromDatabase : messagesFromDatabase) {
+ tunnelbroker::GetResponse response;
+ response.set_fromdeviceid(messageFromDatabase->getFromDeviceID());
+ response.set_payload(messageFromDatabase->getPayload());
+ if (!writer->Write(response)) {
+ throw std::runtime_error(
+ "gRPC: 'Get' writer error on sending data to the client");
+ }
+ database::DatabaseManager::getInstance().removeMessageItem(
+ messageFromDatabase->getMessageID());
+ }
while (1) {
messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID);
tunnelbroker::GetResponse response;
@@ -208,9 +238,9 @@
}
comm::network::AmqpManager::getInstance().ack(
messageToDeliver.deliveryTag);
- if (DeliveryBroker::getInstance().isEmpty(clientDeviceID)) {
- DeliveryBroker::getInstance().erase(clientDeviceID);
- }
+ database::DatabaseManager::getInstance().removeMessageItem(
+ messageToDeliver.messageID);
+ DeliveryBroker::getInstance().deleteQueueIfEmpty(clientDeviceID);
}
} catch (std::runtime_error &e) {
LOG(ERROR) << "gRPC: "

File Metadata

Mime Type
text/plain
Expires
Sun, Dec 1, 1:16 PM (19 h, 24 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2605089
Default Alt Text
D3833.id13005.diff (2 KB)

Event Timeline