Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3184529
D3279.id11550.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
11 KB
Referenced Files
None
Subscribers
None
D3279.id11550.diff
View Options
diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.h b/services/tunnelbroker/src/Amqp/AmqpManager.h
--- a/services/tunnelbroker/src/Amqp/AmqpManager.h
+++ b/services/tunnelbroker/src/Amqp/AmqpManager.h
@@ -21,8 +21,11 @@
public:
static AmqpManager &getInstance();
void connect();
- bool
- send(std::string toDeviceID, std::string fromDeviceID, std::string payload);
+ bool send(
+ std::string messageID,
+ std::string toDeviceID,
+ std::string fromDeviceID,
+ std::string payload);
void ack(uint64_t deliveryTag);
AmqpManager(AmqpManager const &) = delete;
diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/src/Amqp/AmqpManager.cpp
--- a/services/tunnelbroker/src/Amqp/AmqpManager.cpp
+++ b/services/tunnelbroker/src/Amqp/AmqpManager.cpp
@@ -61,13 +61,14 @@
try {
AMQP::Table headers = message.headers();
const std::string payload(message.body());
+ const std::string messageID(headers[AMQP_HEADER_MESSAGEID]);
const std::string toDeviceID(headers[AMQP_HEADER_TO_DEVICEID]);
const std::string fromDeviceID(
headers[AMQP_HEADER_FROM_DEVICEID]);
std::cout << "AMQP: Message consumed for deviceID: "
<< toDeviceID << std::endl;
DeliveryBroker::getInstance().push(
- deliveryTag, toDeviceID, fromDeviceID, payload);
+ messageID, deliveryTag, toDeviceID, fromDeviceID, payload);
} catch (const std::exception &e) {
std::cout << "AMQP: Message parsing exception: " << e.what()
<< std::endl;
@@ -104,6 +105,7 @@
}
bool AmqpManager::send(
+ std::string messageID,
std::string toDeviceID,
std::string fromDeviceID,
std::string payload) {
@@ -114,6 +116,7 @@
try {
AMQP::Envelope env(payload.c_str(), payload.size());
AMQP::Table headers;
+ headers[AMQP_HEADER_MESSAGEID] = messageID;
headers[AMQP_HEADER_FROM_DEVICEID] = fromDeviceID;
headers[AMQP_HEADER_TO_DEVICEID] = toDeviceID;
// Set delivery mode to: Durable (2)
diff --git a/services/tunnelbroker/src/Constants.h b/services/tunnelbroker/src/Constants.h
--- a/services/tunnelbroker/src/Constants.h
+++ b/services/tunnelbroker/src/Constants.h
@@ -32,8 +32,9 @@
// queue TTL in case of no consumers (tunnelbroker is down)
const size_t AMQP_QUEUE_TTL = 24 * 3600 * 1000; // 24 hours
// routing message headers name
-const std::string AMQP_HEADER_FROM_DEVICEID = "fromDeviceid";
-const std::string AMQP_HEADER_TO_DEVICEID = "toDeviceid";
+const std::string AMQP_HEADER_FROM_DEVICEID = "fromDeviceID";
+const std::string AMQP_HEADER_TO_DEVICEID = "toDeviceID";
+const std::string AMQP_HEADER_MESSAGEID = "messageID";
const int64_t AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL =
1000 * 60; // 1 min
@@ -48,5 +49,8 @@
const std::string CONFIG_FILE_PATH =
std::string(std::getenv("HOME")) + "/tunnelbroker/tunnelbroker.ini";
+// DeliveryBroker
+const size_t DELIVERY_BROKER_MAX_QUEUE_SIZE = 100;
+
} // namespace network
} // namespace comm
diff --git a/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.h b/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.h
--- a/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.h
+++ b/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.h
@@ -5,32 +5,28 @@
#include <folly/concurrency/ConcurrentHashMap.h>
-#include <condition_variable>
#include <iostream>
#include <string>
-#include <vector>
namespace comm {
namespace network {
class DeliveryBroker {
- folly::ConcurrentHashMap<std::string, std::vector<DeliveryBrokerMessage>>
+ folly::ConcurrentHashMap<std::string, std::unique_ptr<DeliveryBrokerQueue>>
messagesMap;
- std::mutex localMutex;
- std::condition_variable localCv;
public:
static DeliveryBroker &getInstance();
void push(
+ const std::string messageID,
const uint64_t deliveryTag,
const std::string toDeviceID,
const std::string fromDeviceID,
const std::string payload);
- std::vector<DeliveryBrokerMessage> get(const std::string deviceID);
- bool isEmpty(const std::string key);
- void remove(const std::string key);
- void wait(const std::string key);
+ bool isEmpty(const std::string deviceID);
+ DeliveryBrokerMessage pop(const std::string deviceID);
+ void erase(const std::string deviceID);
};
} // namespace network
diff --git a/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.cpp b/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.cpp
--- a/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.cpp
+++ b/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.cpp
@@ -9,58 +9,56 @@
};
+// Queues a message for delivery to `toDeviceID`, creating that device's
+// queue on first use. If the queue already holds
+// DELIVERY_BROKER_MAX_QUEUE_SIZE messages the message is not enqueued: the
+// std::runtime_error thrown below is caught and logged inside this function,
+// so nothing propagates to the caller.
void DeliveryBroker::push(
+ const std::string messageID,
const uint64_t deliveryTag,
const std::string toDeviceID,
const std::string fromDeviceID,
const std::string payload) {
try {
- std::unique_lock<std::mutex> localLock(this->localMutex);
- std::vector<DeliveryBrokerMessage> messagesList;
- const DeliveryBrokerMessage newMessage = {
- .deliveryTag = deliveryTag,
- .fromDeviceID = fromDeviceID,
- .payload = payload};
-
if (this->messagesMap.find(toDeviceID) == this->messagesMap.end()) {
- messagesList.push_back(newMessage);
- this->messagesMap.insert({toDeviceID, messagesList});
- this->localCv.notify_all();
- return;
+ this->messagesMap.insert(
+ toDeviceID, std::make_unique<DeliveryBrokerQueue>());
}
-
- messagesList = this->messagesMap[toDeviceID];
- messagesList.push_back(newMessage);
- this->messagesMap.assign(toDeviceID, messagesList);
- this->localCv.notify_all();
+ if (this->messagesMap.find(toDeviceID)->second->size() >=
+ DELIVERY_BROKER_MAX_QUEUE_SIZE) {
+ // std::to_string is required here: std::string has no operator+ that
+ // accepts a size_t, so concatenating the constant directly does not
+ // compile.
+ throw std::runtime_error(
+ "DeliveryBroker messages queue size for deviceID " + toDeviceID +
+ " is greater than " +
+ std::to_string(DELIVERY_BROKER_MAX_QUEUE_SIZE));
+ }
+ this->messagesMap.find(toDeviceID)
+ ->second->enqueue(DeliveryBrokerMessage{
+ .messageID = messageID,
+ .deliveryTag = deliveryTag,
+ .fromDeviceID = fromDeviceID,
+ .payload = payload});
} catch (const std::exception &e) {
- std::cout << "DeliveryBroker: "
+ std::cout << "DeliveryBroker push: "
<< "Got an exception " << e.what() << std::endl;
- this->localCv.notify_all();
}
};
-std::vector<DeliveryBrokerMessage>
-DeliveryBroker::get(const std::string deviceID) {
+bool DeliveryBroker::isEmpty(const std::string deviceID) {
if (this->messagesMap.find(deviceID) == this->messagesMap.end()) {
- return {};
- }
- return this->messagesMap[deviceID];
-};
-
-bool DeliveryBroker::isEmpty(const std::string key) {
- if (this->messagesMap.empty()) {
return true;
- }
- return (this->messagesMap.find(key) == this->messagesMap.end());
+ };
+ return this->messagesMap.find(deviceID)->second->empty();
};
-void DeliveryBroker::remove(const std::string key) {
- this->messagesMap.erase(key);
-};
+// Returns the next message queued for `deviceID`, creating an empty queue for
+// the device first if none exists yet.
+// NOTE(review): dequeue() on the folly UMPMCQueue is presumably a blocking
+// call that waits for a message — confirm against the folly documentation.
+// Throws std::runtime_error if any std::exception is raised while popping;
+// the failure is logged before it is rethrown.
+DeliveryBrokerMessage DeliveryBroker::pop(const std::string deviceID) {
+ try {
+ if (this->messagesMap.find(deviceID) == this->messagesMap.end()) {
+ this->messagesMap.insert(
+ deviceID, std::make_unique<DeliveryBrokerQueue>());
+ }
+ return this->messagesMap.find(deviceID)->second->dequeue();
+ } catch (const std::exception &e) {
+ std::cout << "DeliveryBroker pop: "
+ << "Got an exception " << e.what() << std::endl;
+ // There is no message to return on this path, and falling off the end of
+ // a value-returning function is undefined behaviour. Report the failure
+ // to the caller as a std::runtime_error instead.
+ throw std::runtime_error(
+ std::string("DeliveryBroker pop: ") + e.what());
+ }
+}
-void DeliveryBroker::wait(const std::string key) {
- std::unique_lock<std::mutex> localLock(this->localMutex);
- this->localCv.wait(localLock, [this, &key] { return !this->isEmpty(key); });
+void DeliveryBroker::erase(const std::string deviceID) {
+ this->messagesMap.erase(deviceID);
};
} // namespace network
diff --git a/services/tunnelbroker/src/DeliveryBroker/DeliveryBrokerEntites.h b/services/tunnelbroker/src/DeliveryBroker/DeliveryBrokerEntites.h
--- a/services/tunnelbroker/src/DeliveryBroker/DeliveryBrokerEntites.h
+++ b/services/tunnelbroker/src/DeliveryBroker/DeliveryBrokerEntites.h
@@ -1,5 +1,7 @@
#pragma once
+#include <folly/concurrency/UnboundedQueue.h>
+
#include <string>
#include <vector>
@@ -7,11 +9,14 @@
namespace network {
struct DeliveryBrokerMessage {
+ std::string messageID;
uint64_t deliveryTag;
std::string fromDeviceID;
std::string payload;
std::vector<std::string> blobHashes;
};
+typedef folly::UMPMCQueue<DeliveryBrokerMessage, true> DeliveryBrokerQueue;
+
} // namespace network
} // namespace comm
diff --git a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
--- a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
+++ b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
@@ -7,6 +7,7 @@
#include "DatabaseManager.h"
#include "DeliveryBroker.h"
#include "Tools.h"
+
namespace comm {
namespace network {
@@ -153,7 +154,9 @@
"No such session found. SessionID: " + sessionID);
}
const std::string clientDeviceID = sessionItem->getDeviceID();
+ const std::string messageID = tools::generateUUID();
if (!AmqpManager::getInstance().send(
+ messageID,
request->todeviceid(),
clientDeviceID,
std::string(request->payload()))) {
@@ -195,23 +198,21 @@
"No such session found. SessionID: " + sessionID);
}
const std::string clientDeviceID = sessionItem->getDeviceID();
- std::vector<DeliveryBrokerMessage> messagesToDeliver;
+ DeliveryBrokerMessage messageToDeliver;
while (1) {
- messagesToDeliver = DeliveryBroker::getInstance().get(clientDeviceID);
- for (auto const &message : messagesToDeliver) {
- tunnelbroker::GetResponse response;
- response.set_fromdeviceid(message.fromDeviceID);
- response.set_payload(message.payload);
- if (!writer->Write(response)) {
- throw std::runtime_error(
- "gRPC: 'Get' writer error on sending data to the client");
- }
- comm::network::AmqpManager::getInstance().ack(message.deliveryTag);
+ messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID);
+ tunnelbroker::GetResponse response;
+ response.set_fromdeviceid(messageToDeliver.fromDeviceID);
+ response.set_payload(messageToDeliver.payload);
+ if (!writer->Write(response)) {
+ throw std::runtime_error(
+ "gRPC: 'Get' writer error on sending data to the client");
}
- if (!DeliveryBroker::getInstance().isEmpty(clientDeviceID)) {
- DeliveryBroker::getInstance().remove(clientDeviceID);
+ comm::network::AmqpManager::getInstance().ack(
+ messageToDeliver.deliveryTag);
+ if (DeliveryBroker::getInstance().isEmpty(clientDeviceID)) {
+ DeliveryBroker::getInstance().erase(clientDeviceID);
}
- DeliveryBroker::getInstance().wait(clientDeviceID);
}
} catch (std::runtime_error &e) {
std::cout << "gRPC: "
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Nov 9, 11:00 AM (17 h, 30 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2451782
Default Alt Text
D3279.id11550.diff (11 KB)
Attached To
Mode
D3279: [services] Tunnelbroker - Fix delivering messages one by one and remove mutex.
Attached
Detach File
Event Timeline
Log In to Comment