diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/src/Amqp/AmqpManager.cpp index 1b6c6d8ad..cb96a6ee4 100644 --- a/services/tunnelbroker/src/Amqp/AmqpManager.cpp +++ b/services/tunnelbroker/src/Amqp/AmqpManager.cpp @@ -1,141 +1,148 @@ #include "AmqpManager.h" #include "ConfigManager.h" #include "Constants.h" #include "DeliveryBroker.h" #include "GlobalTools.h" #include #include namespace comm { namespace network { AmqpManager &AmqpManager::getInstance() { static AmqpManager instance; return instance; } +void AmqpManager::init() { + std::call_once(initOnceFlag, [&]() { + std::thread amqpClientThread([&]() { this->connect(); }); + amqpClientThread.detach(); + }); +} + void AmqpManager::connectInternal() { const std::string amqpUri = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_URI); const std::string tunnelbrokerID = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_TUNNELBROKER_ID); const std::string fanoutExchangeName = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_FANOUT_EXCHANGE); LOG(INFO) << "AMQP: Connecting to " << amqpUri; auto *loop = uv_default_loop(); AMQP::LibUvHandler handler(loop); AMQP::TcpConnection connection(&handler, AMQP::Address(amqpUri)); this->amqpChannel = std::make_unique(&connection); this->amqpChannel->onError([this](const char *message) { LOG(ERROR) << "AMQP: channel error: " << message << ", will try to reconnect"; this->amqpReady = false; }); AMQP::Table arguments; arguments["x-message-ttl"] = (uint64_t)AMQP_MESSAGE_TTL; arguments["x-expires"] = (uint64_t)AMQP_QUEUE_TTL; this->amqpChannel->declareExchange(fanoutExchangeName, AMQP::fanout); this->amqpChannel->declareQueue(tunnelbrokerID, AMQP::durable, arguments) .onSuccess([this, tunnelbrokerID, fanoutExchangeName]( const std::string &name, uint32_t messagecount, uint32_t consumercount) { LOG(INFO) << "AMQP: Queue " << name << " created"; this->amqpChannel->bindQueue(fanoutExchangeName, tunnelbrokerID, "") .onError([this, tunnelbrokerID, fanoutExchangeName]( const char *message) { LOG(ERROR) << "AMQP: Failed to bind queue: " << tunnelbrokerID << " to exchange: " << fanoutExchangeName; this->amqpReady = false; }); this->amqpReady = true; this->amqpChannel->consume(tunnelbrokerID) .onReceived([](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { try { AMQP::Table headers = message.headers(); const std::string payload(message.body(), message.bodySize()); const std::string messageID(headers[AMQP_HEADER_MESSAGEID]); const std::string toDeviceID(headers[AMQP_HEADER_TO_DEVICEID]); const std::string fromDeviceID( headers[AMQP_HEADER_FROM_DEVICEID]); DeliveryBroker::getInstance().push( messageID, deliveryTag, toDeviceID, fromDeviceID, payload); } catch (const std::exception &e) { LOG(ERROR) << "AMQP: Message parsing exception: " << e.what(); } }) .onError([](const char *message) { LOG(ERROR) << "AMQP: Error on message consume: " << message; }); }) .onError([](const char *message) { throw std::runtime_error( "AMQP: Queue creation error: " + std::string(message)); }); uv_run(loop, UV_RUN_DEFAULT); }; void AmqpManager::connect() { while (true) { int64_t currentTimestamp = tools::getCurrentTimestamp(); if (this->lastConnectionTimestamp && currentTimestamp - this->lastConnectionTimestamp < AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) { throw std::runtime_error( "AMQP reconnection attempt interval too short, tried to reconnect " "after " + std::to_string(currentTimestamp - this->lastConnectionTimestamp) + "ms, the shortest allowed interval is " + std::to_string(AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) + "ms"); } this->lastConnectionTimestamp = currentTimestamp; this->connectInternal(); } } bool AmqpManager::send(const database::MessageItem *message) { if (!this->amqpReady) { LOG(ERROR) << "AMQP: Message send error: channel not ready"; return false; } try { const std::string messagePayload = message->getPayload(); AMQP::Envelope env(messagePayload.c_str(), messagePayload.size()); AMQP::Table headers; headers[AMQP_HEADER_MESSAGEID] = message->getMessageID(); headers[AMQP_HEADER_FROM_DEVICEID] = message->getFromDeviceID(); headers[AMQP_HEADER_TO_DEVICEID] = message->getToDeviceID(); // Set delivery mode to: Durable (2) env.setDeliveryMode(2); env.setHeaders(std::move(headers)); this->amqpChannel->publish( config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_FANOUT_EXCHANGE), "", env); } catch (std::runtime_error &e) { LOG(ERROR) << "AMQP: Error while publishing message: " << e.what(); return false; } return true; }; void AmqpManager::ack(uint64_t deliveryTag) { if (!this->amqpReady) { LOG(ERROR) << "AMQP: Message ACK error: channel not ready"; return; } this->amqpChannel->ack(deliveryTag); } } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.h b/services/tunnelbroker/src/Amqp/AmqpManager.h index 5109aa4e3..d622c64ab 100644 --- a/services/tunnelbroker/src/Amqp/AmqpManager.h +++ b/services/tunnelbroker/src/Amqp/AmqpManager.h @@ -1,34 +1,36 @@ #pragma once #include "DatabaseManager.h" #include #include #include #include #include namespace comm { namespace network { class AmqpManager { AmqpManager(){}; + std::once_flag initOnceFlag; std::unique_ptr amqpChannel; std::atomic amqpReady; std::atomic lastConnectionTimestamp; void connectInternal(); + void connect(); public: static AmqpManager &getInstance(); - void connect(); + void init(); bool send(const database::MessageItem *message); void ack(uint64_t deliveryTag); AmqpManager(AmqpManager const &) = delete; void operator=(AmqpManager const &) = delete; }; } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/server.cpp b/services/tunnelbroker/src/server.cpp index cde07b943..de5215443 100644 --- a/services/tunnelbroker/src/server.cpp +++ b/services/tunnelbroker/src/server.cpp @@ -1,49 +1,44 @@ #include "AmqpManager.h" #include "ConfigManager.h" #include "GlobalTools.h" #include "TunnelbrokerServiceImpl.h" #include "GlobalConstants.h" #include #include #include #include namespace comm { namespace network { void RunServer() { TunnelBrokerServiceImpl service; grpc::EnableDefaultHealthCheckService(true); grpc::ServerBuilder builder; // Listen on the given address without any authentication mechanism. builder.AddListeningPort( SERVER_LISTEN_ADDRESS, grpc::InsecureServerCredentials()); // Register "service" as the instance through which we'll communicate with // clients. In this case it corresponds to an *synchronous* service. builder.RegisterService(&service); std::unique_ptr server(builder.BuildAndStart()); LOG(INFO) << "server listening at :" << SERVER_LISTEN_ADDRESS; // Wait for the server to shutdown. Note that some other thread must be // responsible for shutting down the server for this call to ever return. server->Wait(); } -void RunAmqpClient() { - AmqpManager::getInstance().connect(); -} - } // namespace network } // namespace comm int main(int argc, char **argv) { comm::network::tools::InitLogging("tunnelbroker"); comm::network::config::ConfigManager::getInstance().load(); - std::thread amqpThread(comm::network::RunAmqpClient); + comm::network::AmqpManager::getInstance().init(); std::thread grpcThread(comm::network::RunServer); - amqpThread.join(); grpcThread.join(); return 0; } diff --git a/services/tunnelbroker/test/AmqpManagerTest.cpp b/services/tunnelbroker/test/AmqpManagerTest.cpp index b2f4b1adb..9b861264e 100644 --- a/services/tunnelbroker/test/AmqpManagerTest.cpp +++ b/services/tunnelbroker/test/AmqpManagerTest.cpp @@ -1,74 +1,74 @@ #include "AmqpManager.h" #include "ConfigManager.h" #include "Constants.h" #include "DeliveryBroker.h" #include "GlobalTools.h" #include "Tools.h" #include #include #include using namespace comm::network; class AmqpManagerTest : public testing::Test { protected: virtual void SetUp() { config::ConfigManager::getInstance().load(); - std::thread amqpThread([]() { AmqpManager::getInstance().connect(); }); + AmqpManager::getInstance().init(); } }; TEST_F(AmqpManagerTest, SentAndPopedMessagesAreSameOnStaticData) { const std::string messageID = "bc0c1aa2-bf09-11ec-9d64-0242ac120002"; const std::string fromDeviceID = "web:JouLWf84zqRIsjBdHLOcHS9M4eSCz7VF84wT1uOD83u1qxDAqmqI4swmxNINjuhd"; const std::string toDeviceID = "mobile:EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm"; const std::string payload = "lYlNcO6RR4i9UW3G1DGjdJTRRGbqtPya2aj94ZRjIGZWoHwT5MB9ciAgnQf2VafYb9Tl" "8SZkX37tg4yZ9pOb4lqslY4g4h58OmWjumghVRvrPUZDalUuK8OLs1Qoengpu9wccxAk" "Bti2leDTNeiJDy36NnwS9aCIUc0ozsMvXfX1gWdBdmKbiRG1LvpNd6S7BNGG7Zly5zYj" "xz7s6ZUSDoFfZe3eJWQ15ngYhgMw1TsfbECnMVQTYvY6OyqWPBQi5wiftFcluoxor8G5" "RJ1NEDQq2q2FRfWjNHLhky92C2C7Nnfe4oVzSinfC1319uUkNLpSzI4MvEMi6g5Ukbl7" "iGhpnX7Hp4xpBL3h2IkvGviDRQ98UvW0ugwUuPxm1NOQpjLG5dPoqQ0jrMst0Bl5rgPw" "ajjNGsUWmp9r0ST0wRQXrQcY30PoSoqKSlCEgFMLzHWLrPQ86QFyCICismGSe7iBIqdD" "6d37StvXBzfJoZVU79UeOF2bFvb3DNoArEOe"; const database::MessageItem messageItem{ messageID, fromDeviceID, toDeviceID, payload, ""}; EXPECT_EQ(AmqpManager::getInstance().send(&messageItem), true); DeliveryBrokerMessage receivedMessage = DeliveryBroker::getInstance().pop(toDeviceID); EXPECT_EQ(messageID, receivedMessage.messageID); EXPECT_EQ(fromDeviceID, receivedMessage.fromDeviceID); EXPECT_EQ(payload, receivedMessage.payload); AmqpManager::getInstance().ack(receivedMessage.deliveryTag); } TEST_F(AmqpManagerTest, SentAndPopedMessagesAreSameOnGeneratedData) { const std::string messageID = tools::generateUUID(); const std::string fromDeviceID = "web:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH); const std::string toDeviceID = "mobile:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH); const std::string payload = tools::generateRandomString(512); const database::MessageItem messageItem{ messageID, fromDeviceID, toDeviceID, payload, ""}; EXPECT_EQ(AmqpManager::getInstance().send(&messageItem), true); DeliveryBrokerMessage receivedMessage = DeliveryBroker::getInstance().pop(toDeviceID); EXPECT_EQ(messageID, receivedMessage.messageID) << "Generated messageID \"" << messageID << "\" differs from what was got from amqp message " << receivedMessage.messageID; EXPECT_EQ(fromDeviceID, receivedMessage.fromDeviceID) << "Generated FromDeviceID \"" << fromDeviceID << "\" differs from what was got from amqp message " << receivedMessage.fromDeviceID; EXPECT_EQ(payload, receivedMessage.payload) << "Generated Payload \"" << payload << "\" differs from what was got from amqp message " << receivedMessage.payload; AmqpManager::getInstance().ack(receivedMessage.deliveryTag); }