diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/src/Amqp/AmqpManager.cpp index e6a03cbe3..0bd6420b1 100644 --- a/services/tunnelbroker/src/Amqp/AmqpManager.cpp +++ b/services/tunnelbroker/src/Amqp/AmqpManager.cpp @@ -1,155 +1,151 @@ #include "AmqpManager.h" #include "ConfigManager.h" #include "Constants.h" #include "DeliveryBroker.h" #include "GlobalTools.h" #include #include #include #include namespace comm { namespace network { AmqpManager &AmqpManager::getInstance() { static AmqpManager instance; return instance; } void AmqpManager::init() { std::call_once(initOnceFlag, [&]() { std::thread amqpClientThread([&]() { this->connect(); }); amqpClientThread.detach(); }); } void AmqpManager::connectInternal() { const std::string amqpUri = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_URI); const std::string tunnelbrokerID = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_TUNNELBROKER_ID); const std::string fanoutExchangeName = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_FANOUT_EXCHANGE); LOG(INFO) << "AMQP: Connecting to " << amqpUri; uv_loop_t *localUvLoop = uv_default_loop(); AMQP::LibUvHandler uvHandler(localUvLoop); AMQP::TcpConnection tcpConnection(&uvHandler, AMQP::Address(amqpUri)); this->amqpChannel = std::make_unique(&tcpConnection); this->amqpChannel->onReady([this]() { LOG(INFO) << "AMQP: Channel is ready"; this->amqpReady = true; + this->reconnectAttempt = 0; }); this->amqpChannel->onError([this](const char *message) { - LOG(ERROR) << "AMQP: channel error: " << message - << ", will try to reconnect"; + LOG(ERROR) << "AMQP: Channel error: " << message; this->amqpReady = false; }); AMQP::Table arguments; arguments["x-message-ttl"] = (uint64_t)AMQP_MESSAGE_TTL; arguments["x-expires"] = (uint64_t)AMQP_QUEUE_TTL; this->amqpChannel->declareExchange(fanoutExchangeName, AMQP::fanout); this->amqpChannel->declareQueue(tunnelbrokerID, AMQP::durable, arguments) .onSuccess([this, tunnelbrokerID, fanoutExchangeName]( const std::string &name, uint32_t messagecount, uint32_t consumercount) { LOG(INFO) << "AMQP: Queue " << name << " created"; this->amqpChannel->bindQueue(fanoutExchangeName, tunnelbrokerID, "") .onError([this, tunnelbrokerID, fanoutExchangeName]( const char *message) { LOG(ERROR) << "AMQP: Failed to bind queue: " << tunnelbrokerID << " to exchange: " << fanoutExchangeName; }); this->amqpChannel->consume(tunnelbrokerID) .onReceived([](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { try { AMQP::Table headers = message.headers(); const std::string payload(message.body(), message.bodySize()); const std::string messageID(headers[AMQP_HEADER_MESSAGEID]); const std::string toDeviceID(headers[AMQP_HEADER_TO_DEVICEID]); const std::string fromDeviceID( headers[AMQP_HEADER_FROM_DEVICEID]); DeliveryBroker::getInstance().push( messageID, deliveryTag, toDeviceID, fromDeviceID, payload); } catch (const std::exception &e) { LOG(ERROR) << "AMQP: Message parsing exception: " << e.what(); } }) .onError([](const char *message) { LOG(ERROR) << "AMQP: Error on message consume: " << message; }); }) .onError([](const char *message) { - throw std::runtime_error( - "AMQP: Queue creation error: " + std::string(message)); + LOG(ERROR) << "AMQP: Queue creation error: " + std::string(message); }); uv_run(localUvLoop, UV_RUN_DEFAULT); }; void AmqpManager::connect() { - while (true) { - int64_t currentTimestamp = tools::getCurrentTimestamp(); - if (this->lastConnectionTimestamp && - currentTimestamp - this->lastConnectionTimestamp < - AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) { - throw std::runtime_error( - "AMQP reconnection attempt interval too short, tried to reconnect " - "after " + - std::to_string(currentTimestamp - this->lastConnectionTimestamp) + - "ms, the shortest allowed interval is " + - std::to_string(AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) + "ms"); - } - this->lastConnectionTimestamp = currentTimestamp; + this->connectInternal(); + while (this->reconnectAttempt < AMQP_RECONNECT_MAX_ATTEMPTS) { + this->reconnectAttempt++; + LOG(INFO) << "AMQP: Attempt " << this->reconnectAttempt + << " to reconnect in " << AMQP_RECONNECT_ATTEMPT_INTERVAL_MS + << " ms"; + std::this_thread::sleep_for( + std::chrono::milliseconds(AMQP_RECONNECT_ATTEMPT_INTERVAL_MS)); this->connectInternal(); } + LOG(FATAL) << "Cannot connect to AMQP server after " + << AMQP_RECONNECT_MAX_ATTEMPTS << " attempts"; } bool AmqpManager::send(const database::MessageItem *message) { waitUntilReady(); try { const std::string messagePayload = message->getPayload(); AMQP::Envelope env(messagePayload.c_str(), messagePayload.size()); AMQP::Table headers; headers[AMQP_HEADER_MESSAGEID] = message->getMessageID(); headers[AMQP_HEADER_FROM_DEVICEID] = message->getFromDeviceID(); headers[AMQP_HEADER_TO_DEVICEID] = message->getToDeviceID(); // Set delivery mode to: Durable (2) env.setDeliveryMode(2); env.setHeaders(std::move(headers)); std::scoped_lock lock{this->channelMutex}; this->amqpChannel->publish( config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_FANOUT_EXCHANGE), "", env); } catch (std::runtime_error &e) { LOG(ERROR) << "AMQP: Error while publishing message: " << e.what(); return false; } return true; }; void AmqpManager::ack(uint64_t deliveryTag) { waitUntilReady(); std::scoped_lock lock{this->channelMutex}; this->amqpChannel->ack(deliveryTag); } void AmqpManager::waitUntilReady() { while (!this->amqpReady) { LOG(INFO) << "AMQP: Connection is not ready, waiting"; std::this_thread::sleep_for( - std::chrono::milliseconds(AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL)); + std::chrono::milliseconds(AMQP_RECONNECT_ATTEMPT_INTERVAL_MS)); } } } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.h b/services/tunnelbroker/src/Amqp/AmqpManager.h index 91ee69cab..90adef73a 100644 --- a/services/tunnelbroker/src/Amqp/AmqpManager.h +++ b/services/tunnelbroker/src/Amqp/AmqpManager.h @@ -1,39 +1,39 @@ #pragma once #include "DatabaseManager.h" #include #include #include #include #include #include namespace comm { namespace network { class AmqpManager { AmqpManager(){}; std::mutex channelMutex; std::once_flag initOnceFlag; std::unique_ptr amqpChannel; std::atomic amqpReady; - std::atomic lastConnectionTimestamp; + std::atomic reconnectAttempt; void connectInternal(); void connect(); void waitUntilReady(); public: static AmqpManager &getInstance(); void init(); bool send(const database::MessageItem *message); void ack(uint64_t deliveryTag); AmqpManager(AmqpManager const &) = delete; void operator=(AmqpManager const &) = delete; }; } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Constants.h b/services/tunnelbroker/src/Constants.h index 634f4d8b9..35461a417 100644 --- a/services/tunnelbroker/src/Constants.h +++ b/services/tunnelbroker/src/Constants.h @@ -1,64 +1,65 @@ #pragma once #include #include #include namespace comm { namespace network { // AWS DynamoDB const size_t DYNAMODB_MAX_BATCH_ITEMS = 25; const size_t DYNAMODB_BACKOFF_FIRST_RETRY_DELAY = 50; const size_t DYNAMODB_MAX_BACKOFF_TIME = 10000; // 10 seconds const std::string DEVICE_SESSIONS_TABLE_NAME = "tunnelbroker-device-sessions"; const std::string DEVICE_SESSIONS_VERIFICATION_MESSAGES_TABLE_NAME = "tunnelbroker-verification-messages"; const std::string DEVICE_PUBLIC_KEY_TABLE_NAME = "tunnelbroker-public-keys"; const std::string MESSAGES_TABLE_NAME = "tunnelbroker-messages"; // Sessions const size_t SIGNATURE_REQUEST_LENGTH = 64; const size_t SESSION_ID_LENGTH = 64; const size_t SESSION_RECORD_TTL = 30 * 24 * 3600; // 30 days const size_t SESSION_SIGN_RECORD_TTL = 24 * 3600; // 24 hours const std::regex SESSION_ID_FORMAT_REGEX( "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"); // AMQP (RabbitMQ) const std::string AMQP_FANOUT_EXCHANGE_NAME = "allBrokers"; // Message broker queue message TTL const size_t AMQP_MESSAGE_TTL = 300 * 1000; // 5 min // queue TTL in case of no consumers (tunnelbroker is down) const size_t AMQP_QUEUE_TTL = 24 * 3600 * 1000; // 24 hours // routing message headers name const std::string AMQP_HEADER_FROM_DEVICEID = "fromDeviceID"; const std::string AMQP_HEADER_TO_DEVICEID = "toDeviceID"; const std::string AMQP_HEADER_MESSAGEID = "messageID"; -const int64_t AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL = 1000 * 60; // 1 min +const size_t AMQP_RECONNECT_ATTEMPT_INTERVAL_MS = 3000; +const size_t AMQP_RECONNECT_MAX_ATTEMPTS = 10; // DeviceID // DEVICEID_CHAR_LENGTH has to be kept in sync with deviceIDCharLength // which is defined in web/utils/device-id.js const size_t DEVICEID_CHAR_LENGTH = 64; const std::regex DEVICEID_FORMAT_REGEX( "^(ks|mobile|web):[a-zA-Z0-9]{" + std::to_string(DEVICEID_CHAR_LENGTH) + "}$"); // Config const std::string CONFIG_FILE_DIRECTORY_ENV_VARIABLE = "TUNNELBROKER_CONFIG_FILE_DIRECTORY"; const std::string DEFAULT_CONFIG_FILE_DIRECTORY = std::string(std::getenv("HOME")) + "/tunnelbroker"; const std::string CONFIG_FILE_NAME = "tunnelbroker.ini"; const std::string SANDBOX_CONFIG_FILE_NAME = "tunnelbroker-sandbox.ini"; // DeliveryBroker const size_t DELIVERY_BROKER_MAX_QUEUE_SIZE = 100; // Database messages TTL const size_t MESSAGE_RECORD_TTL = 300 * 24 * 60 * 60; // 300 days } // namespace network } // namespace comm