Page MenuHomePhabricator

D3279.id11550.diff
No OneTemporary

D3279.id11550.diff

diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.h b/services/tunnelbroker/src/Amqp/AmqpManager.h
--- a/services/tunnelbroker/src/Amqp/AmqpManager.h
+++ b/services/tunnelbroker/src/Amqp/AmqpManager.h
@@ -21,8 +21,11 @@
public:
static AmqpManager &getInstance();
void connect();
- bool
- send(std::string toDeviceID, std::string fromDeviceID, std::string payload);
+ bool send(
+ std::string messageID,
+ std::string toDeviceID,
+ std::string fromDeviceID,
+ std::string payload);
void ack(uint64_t deliveryTag);
AmqpManager(AmqpManager const &) = delete;
diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/src/Amqp/AmqpManager.cpp
--- a/services/tunnelbroker/src/Amqp/AmqpManager.cpp
+++ b/services/tunnelbroker/src/Amqp/AmqpManager.cpp
@@ -61,13 +61,14 @@
try {
AMQP::Table headers = message.headers();
const std::string payload(message.body());
+ const std::string messageID(headers[AMQP_HEADER_MESSAGEID]);
const std::string toDeviceID(headers[AMQP_HEADER_TO_DEVICEID]);
const std::string fromDeviceID(
headers[AMQP_HEADER_FROM_DEVICEID]);
std::cout << "AMQP: Message consumed for deviceID: "
<< toDeviceID << std::endl;
DeliveryBroker::getInstance().push(
- deliveryTag, toDeviceID, fromDeviceID, payload);
+ messageID, deliveryTag, toDeviceID, fromDeviceID, payload);
} catch (const std::exception &e) {
std::cout << "AMQP: Message parsing exception: " << e.what()
<< std::endl;
@@ -104,6 +105,7 @@
}
bool AmqpManager::send(
+ std::string messageID,
std::string toDeviceID,
std::string fromDeviceID,
std::string payload) {
@@ -114,6 +116,7 @@
try {
AMQP::Envelope env(payload.c_str(), payload.size());
AMQP::Table headers;
+ headers[AMQP_HEADER_MESSAGEID] = messageID;
headers[AMQP_HEADER_FROM_DEVICEID] = fromDeviceID;
headers[AMQP_HEADER_TO_DEVICEID] = toDeviceID;
// Set delivery mode to: Durable (2)
diff --git a/services/tunnelbroker/src/Constants.h b/services/tunnelbroker/src/Constants.h
--- a/services/tunnelbroker/src/Constants.h
+++ b/services/tunnelbroker/src/Constants.h
@@ -32,8 +32,9 @@
// queue TTL in case of no consumers (tunnelbroker is down)
const size_t AMQP_QUEUE_TTL = 24 * 3600 * 1000; // 24 hours
// routing message headers name
-const std::string AMQP_HEADER_FROM_DEVICEID = "fromDeviceid";
-const std::string AMQP_HEADER_TO_DEVICEID = "toDeviceid";
+const std::string AMQP_HEADER_FROM_DEVICEID = "fromDeviceID";
+const std::string AMQP_HEADER_TO_DEVICEID = "toDeviceID";
+const std::string AMQP_HEADER_MESSAGEID = "messageID";
const int64_t AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL =
1000 * 60; // 1 min
@@ -48,5 +49,8 @@
const std::string CONFIG_FILE_PATH =
std::string(std::getenv("HOME")) + "/tunnelbroker/tunnelbroker.ini";
+// DeliveryBroker
+const size_t DELIVERY_BROKER_MAX_QUEUE_SIZE = 100;
+
} // namespace network
} // namespace comm
diff --git a/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.h b/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.h
--- a/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.h
+++ b/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.h
@@ -5,32 +5,28 @@
#include <folly/concurrency/ConcurrentHashMap.h>
-#include <condition_variable>
#include <iostream>
#include <string>
-#include <vector>
namespace comm {
namespace network {
class DeliveryBroker {
- folly::ConcurrentHashMap<std::string, std::vector<DeliveryBrokerMessage>>
+ folly::ConcurrentHashMap<std::string, std::unique_ptr<DeliveryBrokerQueue>>
messagesMap;
- std::mutex localMutex;
- std::condition_variable localCv;
public:
static DeliveryBroker &getInstance();
void push(
+ const std::string messageID,
const uint64_t deliveryTag,
const std::string toDeviceID,
const std::string fromDeviceID,
const std::string payload);
- std::vector<DeliveryBrokerMessage> get(const std::string deviceID);
- bool isEmpty(const std::string key);
- void remove(const std::string key);
- void wait(const std::string key);
+ bool isEmpty(const std::string deviceID);
+ DeliveryBrokerMessage pop(const std::string deviceID);
+ void erase(const std::string deviceID);
};
} // namespace network
diff --git a/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.cpp b/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.cpp
--- a/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.cpp
+++ b/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.cpp
@@ -9,58 +9,56 @@
};
void DeliveryBroker::push(
+ const std::string messageID,
const uint64_t deliveryTag,
const std::string toDeviceID,
const std::string fromDeviceID,
const std::string payload) {
try {
- std::unique_lock<std::mutex> localLock(this->localMutex);
- std::vector<DeliveryBrokerMessage> messagesList;
- const DeliveryBrokerMessage newMessage = {
- .deliveryTag = deliveryTag,
- .fromDeviceID = fromDeviceID,
- .payload = payload};
-
if (this->messagesMap.find(toDeviceID) == this->messagesMap.end()) {
- messagesList.push_back(newMessage);
- this->messagesMap.insert({toDeviceID, messagesList});
- this->localCv.notify_all();
- return;
+ this->messagesMap.insert(
+ toDeviceID, std::make_unique<DeliveryBrokerQueue>());
}
-
- messagesList = this->messagesMap[toDeviceID];
- messagesList.push_back(newMessage);
- this->messagesMap.assign(toDeviceID, messagesList);
- this->localCv.notify_all();
+ if (this->messagesMap.find(toDeviceID)->second->size() >=
+ DELIVERY_BROKER_MAX_QUEUE_SIZE) { // per-device queue is at its cap
+ throw std::runtime_error(
+ "DeliveryBroker messages queue size for deviceID " + toDeviceID +
+ " is greater than " + std::to_string(DELIVERY_BROKER_MAX_QUEUE_SIZE));
+ }
+ this->messagesMap.find(toDeviceID)
+ ->second->enqueue(DeliveryBrokerMessage{
+ .messageID = messageID,
+ .deliveryTag = deliveryTag,
+ .fromDeviceID = fromDeviceID,
+ .payload = payload});
} catch (const std::exception &e) {
- std::cout << "DeliveryBroker: "
+ std::cout << "DeliveryBroker push: "
<< "Got an exception " << e.what() << std::endl;
- this->localCv.notify_all();
}
};
-std::vector<DeliveryBrokerMessage>
-DeliveryBroker::get(const std::string deviceID) {
+bool DeliveryBroker::isEmpty(const std::string deviceID) {
if (this->messagesMap.find(deviceID) == this->messagesMap.end()) {
- return {};
- }
- return this->messagesMap[deviceID];
-};
-
-bool DeliveryBroker::isEmpty(const std::string key) {
- if (this->messagesMap.empty()) {
return true;
- }
- return (this->messagesMap.find(key) == this->messagesMap.end());
+ };
+ return this->messagesMap.find(deviceID)->second->empty();
};
-void DeliveryBroker::remove(const std::string key) {
- this->messagesMap.erase(key);
-};
+DeliveryBrokerMessage DeliveryBroker::pop(const std::string deviceID) {
+ try {
+ if (this->messagesMap.find(deviceID) == this->messagesMap.end()) {
+ this->messagesMap.insert(
+ deviceID, std::make_unique<DeliveryBrokerQueue>());
+ }
+ return this->messagesMap.find(deviceID)->second->dequeue();
+ } catch (const std::exception &e) {
+ std::cout << "DeliveryBroker pop: Got an exception " << e.what() << std::endl;
+ throw; // nothing valid to return; falling off the end of a non-void function is UB
+ }
+}
-void DeliveryBroker::wait(const std::string key) {
- std::unique_lock<std::mutex> localLock(this->localMutex);
- this->localCv.wait(localLock, [this, &key] { return !this->isEmpty(key); });
+void DeliveryBroker::erase(const std::string deviceID) {
+ this->messagesMap.erase(deviceID);
};
} // namespace network
diff --git a/services/tunnelbroker/src/DeliveryBroker/DeliveryBrokerEntites.h b/services/tunnelbroker/src/DeliveryBroker/DeliveryBrokerEntites.h
--- a/services/tunnelbroker/src/DeliveryBroker/DeliveryBrokerEntites.h
+++ b/services/tunnelbroker/src/DeliveryBroker/DeliveryBrokerEntites.h
@@ -1,5 +1,7 @@
#pragma once
+#include <folly/concurrency/UnboundedQueue.h>
+
#include <string>
#include <vector>
@@ -7,11 +9,14 @@
namespace network {
struct DeliveryBrokerMessage {
+ std::string messageID;
uint64_t deliveryTag;
std::string fromDeviceID;
std::string payload;
std::vector<std::string> blobHashes;
};
+using DeliveryBrokerQueue = folly::UMPMCQueue<DeliveryBrokerMessage, true>;
+
} // namespace network
} // namespace comm
diff --git a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
--- a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
+++ b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
@@ -7,6 +7,7 @@
#include "DatabaseManager.h"
#include "DeliveryBroker.h"
#include "Tools.h"
+
namespace comm {
namespace network {
@@ -153,7 +154,9 @@
"No such session found. SessionID: " + sessionID);
}
const std::string clientDeviceID = sessionItem->getDeviceID();
+ const std::string messageID = tools::generateUUID();
if (!AmqpManager::getInstance().send(
+ messageID,
request->todeviceid(),
clientDeviceID,
std::string(request->payload()))) {
@@ -195,23 +198,21 @@
"No such session found. SessionID: " + sessionID);
}
const std::string clientDeviceID = sessionItem->getDeviceID();
- std::vector<DeliveryBrokerMessage> messagesToDeliver;
+ DeliveryBrokerMessage messageToDeliver;
while (1) {
- messagesToDeliver = DeliveryBroker::getInstance().get(clientDeviceID);
- for (auto const &message : messagesToDeliver) {
- tunnelbroker::GetResponse response;
- response.set_fromdeviceid(message.fromDeviceID);
- response.set_payload(message.payload);
- if (!writer->Write(response)) {
- throw std::runtime_error(
- "gRPC: 'Get' writer error on sending data to the client");
- }
- comm::network::AmqpManager::getInstance().ack(message.deliveryTag);
+ messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID);
+ tunnelbroker::GetResponse response;
+ response.set_fromdeviceid(messageToDeliver.fromDeviceID);
+ response.set_payload(messageToDeliver.payload);
+ if (!writer->Write(response)) {
+ throw std::runtime_error(
+ "gRPC: 'Get' writer error on sending data to the client");
}
- if (!DeliveryBroker::getInstance().isEmpty(clientDeviceID)) {
- DeliveryBroker::getInstance().remove(clientDeviceID);
+ comm::network::AmqpManager::getInstance().ack(
+ messageToDeliver.deliveryTag);
+ if (DeliveryBroker::getInstance().isEmpty(clientDeviceID)) {
+ DeliveryBroker::getInstance().erase(clientDeviceID);
}
- DeliveryBroker::getInstance().wait(clientDeviceID);
}
} catch (std::runtime_error &e) {
std::cout << "gRPC: "

File Metadata

Mime Type
text/plain
Expires
Sat, Nov 9, 11:00 AM (17 h, 30 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2451782
Default Alt Text
D3279.id11550.diff (11 KB)

Event Timeline