Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3381654
D3279.id11848.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
11 KB
Referenced Files
None
Subscribers
None
D3279.id11848.diff
View Options
diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.h b/services/tunnelbroker/src/Amqp/AmqpManager.h
--- a/services/tunnelbroker/src/Amqp/AmqpManager.h
+++ b/services/tunnelbroker/src/Amqp/AmqpManager.h
@@ -21,8 +21,11 @@
public:
static AmqpManager &getInstance();
void connect();
- bool
- send(std::string toDeviceID, std::string fromDeviceID, std::string payload);
+ bool send(
+ std::string messageID,
+ std::string fromDeviceID,
+ std::string toDeviceID,
+ std::string payload);
void ack(uint64_t deliveryTag);
AmqpManager(AmqpManager const &) = delete;
diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/src/Amqp/AmqpManager.cpp
--- a/services/tunnelbroker/src/Amqp/AmqpManager.cpp
+++ b/services/tunnelbroker/src/Amqp/AmqpManager.cpp
@@ -61,13 +61,14 @@
try {
AMQP::Table headers = message.headers();
const std::string payload(message.body());
+ const std::string messageID(headers[AMQP_HEADER_MESSAGEID]);
const std::string toDeviceID(headers[AMQP_HEADER_TO_DEVICEID]);
const std::string fromDeviceID(
headers[AMQP_HEADER_FROM_DEVICEID]);
std::cout << "AMQP: Message consumed for deviceID: "
<< toDeviceID << std::endl;
DeliveryBroker::getInstance().push(
- deliveryTag, toDeviceID, fromDeviceID, payload);
+ messageID, deliveryTag, toDeviceID, fromDeviceID, payload);
} catch (const std::exception &e) {
std::cout << "AMQP: Message parsing exception: " << e.what()
<< std::endl;
@@ -104,8 +105,9 @@
}
bool AmqpManager::send(
- std::string toDeviceID,
+ std::string messageID,
std::string fromDeviceID,
+ std::string toDeviceID,
std::string payload) {
if (!this->amqpReady) {
std::cout << "AMQP: Message send error: channel not ready" << std::endl;
@@ -114,6 +116,7 @@
try {
AMQP::Envelope env(payload.c_str(), payload.size());
AMQP::Table headers;
+ headers[AMQP_HEADER_MESSAGEID] = messageID;
headers[AMQP_HEADER_FROM_DEVICEID] = fromDeviceID;
headers[AMQP_HEADER_TO_DEVICEID] = toDeviceID;
// Set delivery mode to: Durable (2)
diff --git a/services/tunnelbroker/src/Constants.h b/services/tunnelbroker/src/Constants.h
--- a/services/tunnelbroker/src/Constants.h
+++ b/services/tunnelbroker/src/Constants.h
@@ -32,8 +32,9 @@
// queue TTL in case of no consumers (tunnelbroker is down)
const size_t AMQP_QUEUE_TTL = 24 * 3600 * 1000; // 24 hours
// routing message headers name
-const std::string AMQP_HEADER_FROM_DEVICEID = "fromDeviceid";
-const std::string AMQP_HEADER_TO_DEVICEID = "toDeviceid";
+const std::string AMQP_HEADER_FROM_DEVICEID = "fromDeviceID";
+const std::string AMQP_HEADER_TO_DEVICEID = "toDeviceID";
+const std::string AMQP_HEADER_MESSAGEID = "messageID";
const int64_t AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL =
1000 * 60; // 1 min
@@ -48,5 +49,8 @@
const std::string CONFIG_FILE_PATH =
std::string(std::getenv("HOME")) + "/tunnelbroker/tunnelbroker.ini";
+// DeliveryBroker
+const size_t DELIVERY_BROKER_MAX_QUEUE_SIZE = 100;
+
} // namespace network
} // namespace comm
diff --git a/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.h b/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.h
--- a/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.h
+++ b/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.h
@@ -5,32 +5,28 @@
#include <folly/concurrency/ConcurrentHashMap.h>
-#include <condition_variable>
#include <iostream>
#include <string>
-#include <vector>
namespace comm {
namespace network {
class DeliveryBroker {
- folly::ConcurrentHashMap<std::string, std::vector<DeliveryBrokerMessage>>
+ folly::ConcurrentHashMap<std::string, std::unique_ptr<DeliveryBrokerQueue>>
messagesMap;
- std::mutex localMutex;
- std::condition_variable localCv;
public:
static DeliveryBroker &getInstance();
void push(
+ const std::string messageID,
const uint64_t deliveryTag,
const std::string toDeviceID,
const std::string fromDeviceID,
const std::string payload);
- std::vector<DeliveryBrokerMessage> get(const std::string deviceID);
- bool isEmpty(const std::string key);
- void remove(const std::string key);
- void wait(const std::string key);
+ bool isEmpty(const std::string deviceID);
+ DeliveryBrokerMessage pop(const std::string deviceID);
+ void erase(const std::string deviceID);
};
} // namespace network
diff --git a/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.cpp b/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.cpp
--- a/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.cpp
+++ b/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.cpp
@@ -9,58 +9,59 @@
};
void DeliveryBroker::push(
+ const std::string messageID,
const uint64_t deliveryTag,
const std::string toDeviceID,
const std::string fromDeviceID,
const std::string payload) {
try {
- std::unique_lock<std::mutex> localLock(this->localMutex);
- std::vector<DeliveryBrokerMessage> messagesList;
- const DeliveryBrokerMessage newMessage = {
- .deliveryTag = deliveryTag,
- .fromDeviceID = fromDeviceID,
- .payload = payload};
-
if (this->messagesMap.find(toDeviceID) == this->messagesMap.end()) {
- messagesList.push_back(newMessage);
- this->messagesMap.insert({toDeviceID, messagesList});
- this->localCv.notify_all();
- return;
+ this->messagesMap.insert(
+ toDeviceID,
+ std::make_unique<DeliveryBrokerQueue>(
+ DELIVERY_BROKER_MAX_QUEUE_SIZE));
}
-
- messagesList = this->messagesMap[toDeviceID];
- messagesList.push_back(newMessage);
- this->messagesMap.assign(toDeviceID, messagesList);
- this->localCv.notify_all();
+ this->messagesMap.find(toDeviceID)
+ ->second->blockingWrite(DeliveryBrokerMessage{
+ .messageID = messageID,
+ .deliveryTag = deliveryTag,
+ .fromDeviceID = fromDeviceID,
+ .payload = payload});
} catch (const std::exception &e) {
- std::cout << "DeliveryBroker: "
+ std::cout << "DeliveryBroker push: "
<< "Got an exception " << e.what() << std::endl;
- this->localCv.notify_all();
}
};
-std::vector<DeliveryBrokerMessage>
-DeliveryBroker::get(const std::string deviceID) {
+bool DeliveryBroker::isEmpty(const std::string deviceID) {
if (this->messagesMap.find(deviceID) == this->messagesMap.end()) {
- return {};
- }
- return this->messagesMap[deviceID];
-};
-
-bool DeliveryBroker::isEmpty(const std::string key) {
- if (this->messagesMap.empty()) {
return true;
- }
- return (this->messagesMap.find(key) == this->messagesMap.end());
+ };
+ return this->messagesMap.find(deviceID)->second->isEmpty();
};
-void DeliveryBroker::remove(const std::string key) {
- this->messagesMap.erase(key);
+DeliveryBrokerMessage DeliveryBroker::pop(const std::string deviceID) {
+ try {
+ // If no queue exists for this device yet, create one first, so that the
+ // blocking read below can wait on it when pop() runs before the first push().
+ if (this->messagesMap.find(deviceID) == this->messagesMap.end()) {
+ this->messagesMap.insert(
+ deviceID,
+ std::make_unique<DeliveryBrokerQueue>(
+ DELIVERY_BROKER_MAX_QUEUE_SIZE));
+ }
+ DeliveryBrokerMessage receievedMessage;
+ this->messagesMap.find(deviceID)->second->blockingRead(receievedMessage);
+ return receievedMessage;
+ } catch (const std::exception &e) {
+ std::cout << "DeliveryBroker pop: "
+ << "Got an exception " << e.what() << std::endl;
+ }
+ return {};
};
-void DeliveryBroker::wait(const std::string key) {
- std::unique_lock<std::mutex> localLock(this->localMutex);
- this->localCv.wait(localLock, [this, &key] { return !this->isEmpty(key); });
+void DeliveryBroker::erase(const std::string deviceID) {
+ this->messagesMap.erase(deviceID);
};
} // namespace network
diff --git a/services/tunnelbroker/src/DeliveryBroker/DeliveryBrokerEntites.h b/services/tunnelbroker/src/DeliveryBroker/DeliveryBrokerEntites.h
--- a/services/tunnelbroker/src/DeliveryBroker/DeliveryBrokerEntites.h
+++ b/services/tunnelbroker/src/DeliveryBroker/DeliveryBrokerEntites.h
@@ -1,5 +1,7 @@
#pragma once
+#include <folly/MPMCQueue.h>
+
#include <string>
#include <vector>
@@ -7,11 +9,14 @@
namespace network {
struct DeliveryBrokerMessage {
+ std::string messageID;
uint64_t deliveryTag;
std::string fromDeviceID;
std::string payload;
std::vector<std::string> blobHashes;
};
+typedef folly::MPMCQueue<DeliveryBrokerMessage> DeliveryBrokerQueue;
+
} // namespace network
} // namespace comm
diff --git a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
--- a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
+++ b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
@@ -7,6 +7,7 @@
#include "DatabaseManager.h"
#include "DeliveryBroker.h"
#include "Tools.h"
+
namespace comm {
namespace network {
@@ -153,9 +154,11 @@
"No such session found. SessionID: " + sessionID);
}
const std::string clientDeviceID = sessionItem->getDeviceID();
+ const std::string messageID = tools::generateUUID();
if (!AmqpManager::getInstance().send(
- request->todeviceid(),
+ messageID,
clientDeviceID,
+ request->todeviceid(),
std::string(request->payload()))) {
std::cout << "gRPC: "
<< "Error while publish the message to AMQP" << std::endl;
@@ -195,23 +198,21 @@
"No such session found. SessionID: " + sessionID);
}
const std::string clientDeviceID = sessionItem->getDeviceID();
- std::vector<DeliveryBrokerMessage> messagesToDeliver;
+ DeliveryBrokerMessage messageToDeliver;
while (1) {
- messagesToDeliver = DeliveryBroker::getInstance().get(clientDeviceID);
- for (auto const &message : messagesToDeliver) {
- tunnelbroker::GetResponse response;
- response.set_fromdeviceid(message.fromDeviceID);
- response.set_payload(message.payload);
- if (!writer->Write(response)) {
- throw std::runtime_error(
- "gRPC: 'Get' writer error on sending data to the client");
- }
- comm::network::AmqpManager::getInstance().ack(message.deliveryTag);
+ messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID);
+ tunnelbroker::GetResponse response;
+ response.set_fromdeviceid(messageToDeliver.fromDeviceID);
+ response.set_payload(messageToDeliver.payload);
+ if (!writer->Write(response)) {
+ throw std::runtime_error(
+ "gRPC: 'Get' writer error on sending data to the client");
}
- if (!DeliveryBroker::getInstance().isEmpty(clientDeviceID)) {
- DeliveryBroker::getInstance().remove(clientDeviceID);
+ comm::network::AmqpManager::getInstance().ack(
+ messageToDeliver.deliveryTag);
+ if (DeliveryBroker::getInstance().isEmpty(clientDeviceID)) {
+ DeliveryBroker::getInstance().erase(clientDeviceID);
}
- DeliveryBroker::getInstance().wait(clientDeviceID);
}
} catch (std::runtime_error &e) {
std::cout << "gRPC: "
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Fri, Nov 29, 5:43 AM (21 h, 50 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2596076
Default Alt Text
D3279.id11848.diff (11 KB)
Attached To
Mode
D3279: [services] Tunnelbroker - Fix delivering messages one by one and remove mutex.
Attached
Detach File
Event Timeline
Log In to Comment