Page MenuHomePhabricator

D3279.id9867.diff
No OneTemporary

D3279.id9867.diff

diff --git a/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.h b/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.h
--- a/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.h
+++ b/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.h
@@ -21,8 +21,11 @@
public:
static AmqpManager &getInstance();
void connect();
- bool
- send(std::string toDeviceID, std::string fromDeviceID, std::string payload);
+ bool send(
+ std::string messageID,
+ std::string toDeviceID,
+ std::string fromDeviceID,
+ std::string payload);
void ack(uint64_t deliveryTag);
AmqpManager(AmqpManager const &) = delete;
diff --git a/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.cpp
--- a/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.cpp
+++ b/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.cpp
@@ -61,13 +61,14 @@
try {
AMQP::Table headers = message.headers();
const std::string payload(message.body());
+ const std::string messageID(headers[AMQP_HEADER_MESSAGEID]);
const std::string toDeviceID(headers[AMQP_HEADER_TO_DEVICEID]);
const std::string fromDeviceID(
headers[AMQP_HEADER_FROM_DEVICEID]);
std::cout << "AMQP: Message consumed for deviceID: "
<< toDeviceID << std::endl;
DeliveryBroker::getInstance().push(
- deliveryTag, toDeviceID, fromDeviceID, payload);
+ messageID, deliveryTag, toDeviceID, fromDeviceID, payload);
} catch (const std::exception &e) {
std::cout << "AMQP: Message parsing exception: " << e.what()
<< std::endl;
@@ -104,6 +105,7 @@
}
bool AmqpManager::send(
+ std::string messageID,
std::string toDeviceID,
std::string fromDeviceID,
std::string payload) {
@@ -114,6 +116,7 @@
try {
AMQP::Envelope env(payload.c_str(), payload.size());
AMQP::Table headers;
+ headers[AMQP_HEADER_MESSAGEID] = messageID;
headers[AMQP_HEADER_FROM_DEVICEID] = fromDeviceID;
headers[AMQP_HEADER_TO_DEVICEID] = toDeviceID;
// Set delivery mode to: Durable (2)
diff --git a/services/tunnelbroker/docker-server/contents/server/src/Constants.h b/services/tunnelbroker/docker-server/contents/server/src/Constants.h
--- a/services/tunnelbroker/docker-server/contents/server/src/Constants.h
+++ b/services/tunnelbroker/docker-server/contents/server/src/Constants.h
@@ -32,8 +32,9 @@
// queue TTL in case of no consumers (tunnelbroker is down)
const size_t AMQP_QUEUE_TTL = 24 * 3600 * 1000; // 24 hours
// routing message headers name
-const std::string AMQP_HEADER_FROM_DEVICEID = "fromDeviceid";
-const std::string AMQP_HEADER_TO_DEVICEID = "toDeviceid";
+const std::string AMQP_HEADER_FROM_DEVICEID = "fromDeviceID";
+const std::string AMQP_HEADER_TO_DEVICEID = "toDeviceID";
+const std::string AMQP_HEADER_MESSAGEID = "messageID";
const long long AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL =
1000 * 60; // 1 min
diff --git a/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.h b/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.h
--- a/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.h
+++ b/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.h
@@ -8,14 +8,16 @@
#include <condition_variable>
#include <iostream>
#include <string>
-#include <vector>
+#include <unordered_map>
namespace comm {
namespace network {
class DeliveryBroker {
- folly::ConcurrentHashMap<std::string, std::vector<DeliveryBrokerMessage>>
+ folly::ConcurrentHashMap<
+ std::string,
+ std::unordered_map<std::string, DeliveryBrokerMessage>>
messagesMap;
std::mutex localMutex;
std::condition_variable localCv;
@@ -23,14 +25,17 @@
public:
static DeliveryBroker &getInstance();
void push(
+ const std::string messageID,
const uint64_t deliveryTag,
const std::string toDeviceID,
const std::string fromDeviceID,
const std::string payload);
- std::vector<DeliveryBrokerMessage> get(const std::string deviceID);
- bool isEmpty(const std::string key);
- void remove(const std::string key);
- void wait(const std::string key);
+ std::unordered_map<std::string, DeliveryBrokerMessage>
+ get(const std::string deviceID);
+ bool isEmpty(const std::string deviceID);
+ void remove(const std::string deviceID, const std::string messageID);
+ void erase(const std::string deviceID);
+ void wait(const std::string deviceID);
};
} // namespace network
diff --git a/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.cpp b/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.cpp
--- a/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.cpp
+++ b/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.cpp
@@ -9,37 +9,38 @@
};
void DeliveryBroker::push(
+ const std::string messageID,
const uint64_t deliveryTag,
const std::string toDeviceID,
const std::string fromDeviceID,
const std::string payload) {
try {
std::unique_lock<std::mutex> localLock(this->localMutex);
- std::vector<DeliveryBrokerMessage> messagesList;
+ std::unordered_map<std::string, DeliveryBrokerMessage> messagesList;
const DeliveryBrokerMessage newMessage = {
.deliveryTag = deliveryTag,
.fromDeviceID = fromDeviceID,
.payload = payload};
if (this->messagesMap.find(toDeviceID) == this->messagesMap.end()) {
- messagesList.push_back(newMessage);
+ messagesList[messageID] = newMessage;
this->messagesMap.insert({toDeviceID, messagesList});
this->localCv.notify_all();
return;
}
messagesList = this->messagesMap[toDeviceID];
- messagesList.push_back(newMessage);
+ messagesList[messageID] = newMessage;
this->messagesMap.assign(toDeviceID, messagesList);
this->localCv.notify_all();
} catch (const std::exception &e) {
- std::cout << "DeliveryBroker: "
+ std::cout << "DeliveryBroker push: "
<< "Got an exception " << e.what() << std::endl;
this->localCv.notify_all();
}
};
-std::vector<DeliveryBrokerMessage>
+std::unordered_map<std::string, DeliveryBrokerMessage>
DeliveryBroker::get(const std::string deviceID) {
if (this->messagesMap.find(deviceID) == this->messagesMap.end()) {
return {};
@@ -47,20 +48,41 @@
return this->messagesMap[deviceID];
};
-bool DeliveryBroker::isEmpty(const std::string key) {
+bool DeliveryBroker::isEmpty(const std::string deviceID) {
if (this->messagesMap.empty()) {
return true;
}
- return (this->messagesMap.find(key) == this->messagesMap.end());
+ return (this->messagesMap.find(deviceID) == this->messagesMap.end());
};
-void DeliveryBroker::remove(const std::string key) {
- this->messagesMap.erase(key);
+void DeliveryBroker::remove(
+ const std::string deviceID,
+ const std::string messageID) {
+ try {
+ std::unique_lock<std::mutex> localLock(this->localMutex);
+ std::unordered_map<std::string, DeliveryBrokerMessage> messagesList;
+ if (this->messagesMap.find(deviceID) == this->messagesMap.end()) {
+ return;
+ }
+ messagesList = this->messagesMap[deviceID];
+ messagesList.erase(messageID);
+ this->messagesMap.assign(deviceID, messagesList);
+ this->localCv.notify_all();
+ } catch (const std::exception &e) {
+ std::cout << "DeliveryBroker remove: "
+ << "Got an exception " << e.what() << std::endl;
+ this->localCv.notify_all();
+ }
+}
+
+void DeliveryBroker::erase(const std::string deviceID) {
+ this->messagesMap.erase(deviceID);
};
-void DeliveryBroker::wait(const std::string key) {
+void DeliveryBroker::wait(const std::string deviceID) {
std::unique_lock<std::mutex> localLock(this->localMutex);
- this->localCv.wait(localLock, [this, &key] { return !this->isEmpty(key); });
+ this->localCv.wait(
+ localLock, [this, &deviceID] { return !this->isEmpty(deviceID); });
};
} // namespace network
diff --git a/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp
--- a/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp
+++ b/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp
@@ -7,6 +7,9 @@
#include "DatabaseManager.h"
#include "DeliveryBroker.h"
#include "Tools.h"
+
+#include <unordered_map>
+
namespace comm {
namespace network {
@@ -152,7 +155,9 @@
"No such session found. SessionID: " + sessionID);
}
const std::string clientDeviceID = sessionItem->getDeviceID();
+ const std::string messageID = generateUUID();
if (!AmqpManager::getInstance().send(
+ messageID,
request->todeviceid(),
clientDeviceID,
std::string(request->payload()))) {
@@ -194,10 +199,10 @@
"No such session found. SessionID: " + sessionID);
}
const std::string clientDeviceID = sessionItem->getDeviceID();
- std::vector<DeliveryBrokerMessage> messagesToDeliver;
+ std::unordered_map<std::string, DeliveryBrokerMessage> messagesToDeliver;
while (1) {
messagesToDeliver = DeliveryBroker::getInstance().get(clientDeviceID);
- for (auto const &message : messagesToDeliver) {
+ for (const auto &[messageID, message] : messagesToDeliver) {
tunnelbroker::GetResponse response;
response.set_fromdeviceid(message.fromDeviceID);
response.set_payload(message.payload);
@@ -206,9 +211,10 @@
"gRPC: 'Get' writer error on sending data to the client");
}
comm::network::AmqpManager::getInstance().ack(message.deliveryTag);
+ DeliveryBroker::getInstance().remove(clientDeviceID, messageID);
}
if (!DeliveryBroker::getInstance().isEmpty(clientDeviceID)) {
- DeliveryBroker::getInstance().remove(clientDeviceID);
+ DeliveryBroker::getInstance().erase(clientDeviceID);
}
DeliveryBroker::getInstance().wait(clientDeviceID);
}

File Metadata

Mime Type
text/plain
Expires
Sun, Dec 1, 10:40 PM (18 h, 55 s)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2606235
Default Alt Text
D3279.id9867.diff (10 KB)

Event Timeline