diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.h b/services/tunnelbroker/src/Amqp/AmqpManager.h
--- a/services/tunnelbroker/src/Amqp/AmqpManager.h
+++ b/services/tunnelbroker/src/Amqp/AmqpManager.h
@@ -19,10 +19,11 @@
   std::atomic amqpReady;
   std::atomic lastConnectionTimestamp;
   void connectInternal();
+  void connect();
 
 public:
   static AmqpManager &getInstance();
-  void connect();
+  void init();
   bool send(const database::MessageItem *message);
   void ack(uint64_t deliveryTag);
 
diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/src/Amqp/AmqpManager.cpp
--- a/services/tunnelbroker/src/Amqp/AmqpManager.cpp
+++ b/services/tunnelbroker/src/Amqp/AmqpManager.cpp
@@ -17,6 +17,23 @@
   return instance;
 }
 
+// Starts the AMQP client on a detached background thread and returns
+// immediately. When `amqpChannel` is already set, the call only logs a
+// message and does nothing else.
+// NOTE(review): `amqpChannel` is checked here without visible
+// synchronization; if the connection thread assigns it, this may race.
+void AmqpManager::init() {
+  if (this->amqpChannel != nullptr) {
+    LOG(INFO) << "Skipping AMQP initialization because the channel is already "
+                 "initialized";
+    return;
+  }
+  // The thread is detached and outlives this call, so capture exactly what
+  // it needs (`this`) instead of everything by reference.
+  std::thread amqpClientThread([this]() { this->connect(); });
+  amqpClientThread.detach();
+}
+
 void AmqpManager::connectInternal() {
   const std::string amqpUri = config::ConfigManager::getInstance().getParameter(
       config::ConfigManager::OPTION_AMQP_URI);
diff --git a/services/tunnelbroker/src/server.cpp b/services/tunnelbroker/src/server.cpp
--- a/services/tunnelbroker/src/server.cpp
+++ b/services/tunnelbroker/src/server.cpp
@@ -32,19 +32,14 @@
   server->Wait();
 }
 
-void RunAmqpClient() {
-  AmqpManager::getInstance().connect();
-}
-
 } // namespace network
 } // namespace comm
 
 int main(int argc, char **argv) {
   comm::network::tools::InitLogging("tunnelbroker");
   comm::network::config::ConfigManager::getInstance().load();
-  std::thread amqpThread(comm::network::RunAmqpClient);
+  comm::network::AmqpManager::getInstance().init();
   std::thread grpcThread(comm::network::RunServer);
-  amqpThread.join();
   grpcThread.join();
   return 0;
 }
diff --git a/services/tunnelbroker/test/AmqpManagerTest.cpp b/services/tunnelbroker/test/AmqpManagerTest.cpp
--- a/services/tunnelbroker/test/AmqpManagerTest.cpp
+++ b/services/tunnelbroker/test/AmqpManagerTest.cpp
@@ -16,7 +16,7 @@
 protected:
   virtual void SetUp() {
     config::ConfigManager::getInstance().load();
-    std::thread amqpThread([]() { AmqpManager::getInstance().connect(); });
+    AmqpManager::getInstance().init();
   }
 };
 