Page MenuHomePhabricator

D3833.id13121.diff
No OneTemporary

D3833.id13121.diff

diff --git a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
--- a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
+++ b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
@@ -154,6 +154,14 @@
}
const std::string clientDeviceID = sessionItem->getDeviceID();
const std::string messageID = tools::generateUUID();
+
+ const database::MessageItem message(
+ messageID,
+ clientDeviceID,
+ request->todeviceid(),
+ request->payload(),
+ "");
+ database::DatabaseManager::getInstance().putMessageItem(message);
if (!AmqpManager::getInstance().send(
messageID,
clientDeviceID,
@@ -197,20 +205,47 @@
}
const std::string clientDeviceID = sessionItem->getDeviceID();
DeliveryBrokerMessage messageToDeliver;
+
+ std::vector<std::shared_ptr<database::MessageItem>> messagesFromDatabase =
+ database::DatabaseManager::getInstance().findMessageItemsByReceiver(
+ clientDeviceID);
+ if (messagesFromDatabase.size() > 0) {
+ // When a client connects and requests GET for the messages first we check
+ // if there are undelivered messages in the database. If so, we are
+ // erasing the messages to deliver from rabbitMQ which are handled by
+ // DeliveryBroker.
+ DeliveryBroker::getInstance().erase(clientDeviceID);
+ }
+ tunnelbroker::GetResponse response;
+ for (auto &messageFromDatabase : messagesFromDatabase) {
+ response.set_fromdeviceid(messageFromDatabase->getFromDeviceID());
+ response.set_payload(messageFromDatabase->getPayload());
+ if (!writer->Write(response)) {
+ throw std::runtime_error(
+ "gRPC: 'Get' writer error on sending data to the client");
+ }
+ response.Clear();
+ database::DatabaseManager::getInstance().removeMessageItem(
+ messageFromDatabase->getMessageID());
+ }
while (1) {
messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID);
- tunnelbroker::GetResponse response;
response.set_fromdeviceid(messageToDeliver.fromDeviceID);
response.set_payload(messageToDeliver.payload);
if (!writer->Write(response)) {
throw std::runtime_error(
"gRPC: 'Get' writer error on sending data to the client");
}
+ response.Clear();
comm::network::AmqpManager::getInstance().ack(
messageToDeliver.deliveryTag);
- if (DeliveryBroker::getInstance().isEmpty(clientDeviceID)) {
- DeliveryBroker::getInstance().erase(clientDeviceID);
- }
+ database::DatabaseManager::getInstance().removeMessageItem(
+ messageToDeliver.messageID);
+ // If messages queue for `clientDeviceID` is empty we don't need to store
+ // `folly::MPMCQueue` for it and need to free memory to fix possible
+ // 'ghost' queues in DeliveryBroker.
+ // We call `deleteQueueIfEmpty()` for this purpose here.
+ DeliveryBroker::getInstance().deleteQueueIfEmpty(clientDeviceID);
}
} catch (std::runtime_error &e) {
LOG(ERROR) << "gRPC: "

File Metadata

Mime Type
text/plain
Expires
Sun, Sep 22, 12:41 AM (21 h, 50 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2166153
Default Alt Text
D3833.id13121.diff (3 KB)

Event Timeline