Page MenuHomePhabricator

D3279.id10998.diff
No OneTemporary

D3279.id10998.diff

diff --git a/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.h b/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.h
--- a/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.h
+++ b/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.h
@@ -21,8 +21,11 @@
public:
static AmqpManager &getInstance();
void connect();
- bool
- send(std::string toDeviceID, std::string fromDeviceID, std::string payload);
+ bool send(
+ std::string messageID,
+ std::string toDeviceID,
+ std::string fromDeviceID,
+ std::string payload);
void ack(uint64_t deliveryTag);
AmqpManager(AmqpManager const &) = delete;
diff --git a/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.cpp
--- a/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.cpp
+++ b/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.cpp
@@ -61,13 +61,14 @@
try {
AMQP::Table headers = message.headers();
const std::string payload(message.body());
+ const std::string messageID(headers[AMQP_HEADER_MESSAGEID]);
const std::string toDeviceID(headers[AMQP_HEADER_TO_DEVICEID]);
const std::string fromDeviceID(
headers[AMQP_HEADER_FROM_DEVICEID]);
std::cout << "AMQP: Message consumed for deviceID: "
<< toDeviceID << std::endl;
DeliveryBroker::getInstance().push(
- deliveryTag, toDeviceID, fromDeviceID, payload);
+ messageID, deliveryTag, toDeviceID, fromDeviceID, payload);
} catch (const std::exception &e) {
std::cout << "AMQP: Message parsing exception: " << e.what()
<< std::endl;
@@ -104,6 +105,7 @@
}
bool AmqpManager::send(
+ std::string messageID,
std::string toDeviceID,
std::string fromDeviceID,
std::string payload) {
@@ -114,6 +116,7 @@
try {
AMQP::Envelope env(payload.c_str(), payload.size());
AMQP::Table headers;
+ headers[AMQP_HEADER_MESSAGEID] = messageID;
headers[AMQP_HEADER_FROM_DEVICEID] = fromDeviceID;
headers[AMQP_HEADER_TO_DEVICEID] = toDeviceID;
// Set delivery mode to: Durable (2)
diff --git a/services/tunnelbroker/docker-server/contents/server/src/Constants.h b/services/tunnelbroker/docker-server/contents/server/src/Constants.h
--- a/services/tunnelbroker/docker-server/contents/server/src/Constants.h
+++ b/services/tunnelbroker/docker-server/contents/server/src/Constants.h
@@ -32,8 +32,9 @@
// queue TTL in case of no consumers (tunnelbroker is down)
const size_t AMQP_QUEUE_TTL = 24 * 3600 * 1000; // 24 hours
// routing message headers name
-const std::string AMQP_HEADER_FROM_DEVICEID = "fromDeviceid";
-const std::string AMQP_HEADER_TO_DEVICEID = "toDeviceid";
+const std::string AMQP_HEADER_FROM_DEVICEID = "fromDeviceID";
+const std::string AMQP_HEADER_TO_DEVICEID = "toDeviceID";
+const std::string AMQP_HEADER_MESSAGEID = "messageID";
const long long AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL =
1000 * 60; // 1 min
diff --git a/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.h b/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.h
--- a/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.h
+++ b/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.h
@@ -3,34 +3,33 @@
#include "Constants.h"
#include "DeliveryBrokerEntites.h"
+#include <folly/MPMCQueue.h>
#include <folly/concurrency/ConcurrentHashMap.h>
-#include <condition_variable>
#include <iostream>
#include <string>
-#include <vector>
namespace comm {
namespace network {
class DeliveryBroker {
- folly::ConcurrentHashMap<std::string, std::vector<DeliveryBrokerMessage>>
+ folly::ConcurrentHashMap<
+ std::string,
+ std::unique_ptr<folly::MPMCQueue<DeliveryBrokerMessage>>>
messagesMap;
- std::mutex localMutex;
- std::condition_variable localCv;
public:
static DeliveryBroker &getInstance();
void push(
+ const std::string messageID,
const uint64_t deliveryTag,
const std::string toDeviceID,
const std::string fromDeviceID,
const std::string payload);
- std::vector<DeliveryBrokerMessage> get(const std::string deviceID);
- bool isEmpty(const std::string key);
- void remove(const std::string key);
- void wait(const std::string key);
+ bool isEmpty(const std::string deviceID);
+ DeliveryBrokerMessage pop(const std::string deviceID);
+ void erase(const std::string deviceID);
};
} // namespace network
diff --git a/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.cpp b/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.cpp
--- a/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.cpp
+++ b/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.cpp
@@ -9,58 +9,54 @@
};
void DeliveryBroker::push(
+ const std::string messageID,
const uint64_t deliveryTag,
const std::string toDeviceID,
const std::string fromDeviceID,
const std::string payload) {
try {
- std::unique_lock<std::mutex> localLock(this->localMutex);
- std::vector<DeliveryBrokerMessage> messagesList;
+
const DeliveryBrokerMessage newMessage = {
+ .messageID = messageID,
.deliveryTag = deliveryTag,
.fromDeviceID = fromDeviceID,
.payload = payload};
if (this->messagesMap.find(toDeviceID) == this->messagesMap.end()) {
- messagesList.push_back(newMessage);
- this->messagesMap.insert({toDeviceID, messagesList});
- this->localCv.notify_all();
- return;
+ this->messagesMap.insert(
+ toDeviceID,
+ std::make_unique<folly::MPMCQueue<DeliveryBrokerMessage>>(100));
}
-
- messagesList = this->messagesMap[toDeviceID];
- messagesList.push_back(newMessage);
- this->messagesMap.assign(toDeviceID, messagesList);
- this->localCv.notify_all();
+ *(this->messagesMap).find(toDeviceID)->second->blockingWrite(newMessage);
} catch (const std::exception &e) {
- std::cout << "DeliveryBroker: "
+ std::cout << "DeliveryBroker push: "
<< "Got an exception " << e.what() << std::endl;
- this->localCv.notify_all();
- }
-};
-
-std::vector<DeliveryBrokerMessage>
-DeliveryBroker::get(const std::string deviceID) {
- if (this->messagesMap.find(deviceID) == this->messagesMap.end()) {
- return {};
}
- return this->messagesMap[deviceID];
};
-bool DeliveryBroker::isEmpty(const std::string key) {
+bool DeliveryBroker::isEmpty(const std::string deviceID) {
if (this->messagesMap.empty()) {
return true;
}
- return (this->messagesMap.find(key) == this->messagesMap.end());
+ return (this->messagesMap.find(deviceID) == this->messagesMap.end());
};
-void DeliveryBroker::remove(const std::string key) {
- this->messagesMap.erase(key);
-};
+DeliveryBrokerMessage DeliveryBroker::pop(const std::string deviceID) {
+ try {
+ DeliveryBrokerMessage message;
+ if (this->messagesMap.find(deviceID) == this->messagesMap.end()) {
+ return message;
+ }
+ *(this->messagesMap).find(deviceID)->second->blockingRead(message);
+ return message;
+ } catch (const std::exception &e) {
+ std::cout << "DeliveryBroker remove: "
+ << "Got an exception " << e.what() << std::endl;
+ }
+}
-void DeliveryBroker::wait(const std::string key) {
- std::unique_lock<std::mutex> localLock(this->localMutex);
- this->localCv.wait(localLock, [this, &key] { return !this->isEmpty(key); });
+void DeliveryBroker::erase(const std::string deviceID) {
+ this->messagesMap.erase(deviceID);
};
} // namespace network
diff --git a/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBrokerEntites.h b/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBrokerEntites.h
--- a/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBrokerEntites.h
+++ b/services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBrokerEntites.h
@@ -7,6 +7,7 @@
namespace network {
struct DeliveryBrokerMessage {
+ std::string messageID;
uint64_t deliveryTag;
std::string fromDeviceID;
std::string payload;
diff --git a/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp
--- a/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp
+++ b/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp
@@ -7,6 +7,9 @@
#include "DatabaseManager.h"
#include "DeliveryBroker.h"
#include "Tools.h"
+
+#include <unordered_map>
+
namespace comm {
namespace network {
@@ -152,7 +155,9 @@
"No such session found. SessionID: " + sessionID);
}
const std::string clientDeviceID = sessionItem->getDeviceID();
+ const std::string messageID = generateUUID();
if (!AmqpManager::getInstance().send(
+ messageID,
request->todeviceid(),
clientDeviceID,
std::string(request->payload()))) {
@@ -194,23 +199,21 @@
"No such session found. SessionID: " + sessionID);
}
const std::string clientDeviceID = sessionItem->getDeviceID();
- std::vector<DeliveryBrokerMessage> messagesToDeliver;
+ DeliveryBrokerMessage messageToDeliver;
while (1) {
- messagesToDeliver = DeliveryBroker::getInstance().get(clientDeviceID);
- for (auto const &message : messagesToDeliver) {
- tunnelbroker::GetResponse response;
- response.set_fromdeviceid(message.fromDeviceID);
- response.set_payload(message.payload);
- if (!writer->Write(response)) {
- throw std::runtime_error(
- "gRPC: 'Get' writer error on sending data to the client");
- }
- comm::network::AmqpManager::getInstance().ack(message.deliveryTag);
+ messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID);
+ tunnelbroker::GetResponse response;
+ response.set_fromdeviceid(messageToDeliver.fromDeviceID);
+ response.set_payload(messageToDeliver.payload);
+ if (!writer->Write(response)) {
+ throw std::runtime_error(
+ "gRPC: 'Get' writer error on sending data to the client");
}
+ comm::network::AmqpManager::getInstance().ack(
+ messageToDeliver.deliveryTag);
if (!DeliveryBroker::getInstance().isEmpty(clientDeviceID)) {
- DeliveryBroker::getInstance().remove(clientDeviceID);
+ DeliveryBroker::getInstance().erase(clientDeviceID);
}
- DeliveryBroker::getInstance().wait(clientDeviceID);
}
} catch (std::runtime_error &e) {
std::cout << "gRPC: "

File Metadata

Mime Type
text/plain
Expires
Sun, Dec 1, 9:59 PM (17 h, 20 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2606191
Default Alt Text
D3279.id10998.diff (11 KB)

Event Timeline