diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.h b/services/tunnelbroker/src/Amqp/AmqpManager.h
--- a/services/tunnelbroker/src/Amqp/AmqpManager.h
+++ b/services/tunnelbroker/src/Amqp/AmqpManager.h
@@ -20,7 +20,7 @@
   std::once_flag initOnceFlag;
   std::unique_ptr amqpChannel;
   std::atomic amqpReady;
-  std::atomic lastConnectionTimestamp;
+  std::atomic reconnectAttempt;
   void connectInternal();
   void connect();
   void waitUntilReady();
diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/src/Amqp/AmqpManager.cpp
--- a/services/tunnelbroker/src/Amqp/AmqpManager.cpp
+++ b/services/tunnelbroker/src/Amqp/AmqpManager.cpp
@@ -42,10 +42,11 @@
   this->amqpChannel->onReady([this]() {
     LOG(INFO) << "AMQP: Channel is ready";
     this->amqpReady = true;
+    // The channel is usable again, so start counting attempts from scratch.
+    this->reconnectAttempt = 0;
   });
   this->amqpChannel->onError([this](const char *message) {
-    LOG(ERROR) << "AMQP: channel error: " << message
-               << ", will try to reconnect";
+    LOG(ERROR) << "AMQP: Channel error: " << message;
     this->amqpReady = false;
   });
 
@@ -87,28 +88,28 @@
             });
       })
       .onError([](const char *message) {
-        throw std::runtime_error(
-            "AMQP: Queue creation error: " + std::string(message));
+        LOG(ERROR) << "AMQP: Queue creation error: " << message;
       });
   uv_run(localUvLoop, UV_RUN_DEFAULT);
 };
 
+// Runs connectInternal() and, each time it returns, retries it after a fixed
+// delay. The attempt counter is reset to 0 by the channel onReady callback;
+// once it reaches AMQP_RECONNECT_MAX_ATTEMPTS a FATAL message is logged.
 void AmqpManager::connect() {
-  while (true) {
-    int64_t currentTimestamp = tools::getCurrentTimestamp();
-    if (this->lastConnectionTimestamp &&
-        currentTimestamp - this->lastConnectionTimestamp <
-            AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) {
-      throw std::runtime_error(
-          "AMQP reconnection attempt interval too short, tried to reconnect "
-          "after " +
-          std::to_string(currentTimestamp - this->lastConnectionTimestamp) +
-          "ms, the shortest allowed interval is " +
-          std::to_string(AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) + "ms");
-    }
-    this->lastConnectionTimestamp = currentTimestamp;
+  this->connectInternal();
+  while (this->reconnectAttempt < AMQP_RECONNECT_MAX_ATTEMPTS) {
+    // Read the counter once so the logged number is the value this
+    // increment produced.
+    const auto attempt = ++this->reconnectAttempt;
+    LOG(INFO) << "AMQP: Attempt " << attempt << " to reconnect in "
+              << AMQP_RECONNECT_ATTEMPT_INTERVAL_MS << " ms";
+    std::this_thread::sleep_for(
+        std::chrono::milliseconds(AMQP_RECONNECT_ATTEMPT_INTERVAL_MS));
     this->connectInternal();
   }
+  LOG(FATAL) << "Cannot connect to AMQP server after "
+             << AMQP_RECONNECT_MAX_ATTEMPTS << " attempts";
 }
 
 bool AmqpManager::send(const database::MessageItem *message) {
@@ -147,7 +148,7 @@
   while (!this->amqpReady) {
     LOG(INFO) << "AMQP: Connection is not ready, waiting";
     std::this_thread::sleep_for(
-        std::chrono::milliseconds(AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL));
+        std::chrono::milliseconds(AMQP_RECONNECT_ATTEMPT_INTERVAL_MS));
   }
 }
 
diff --git a/services/tunnelbroker/src/Constants.h b/services/tunnelbroker/src/Constants.h
--- a/services/tunnelbroker/src/Constants.h
+++ b/services/tunnelbroker/src/Constants.h
@@ -37,7 +37,8 @@
 const std::string AMQP_HEADER_TO_DEVICEID = "toDeviceID";
 const std::string AMQP_HEADER_MESSAGEID = "messageID";
 
-const int64_t AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL = 1000 * 60; // 1 min
+const size_t AMQP_RECONNECT_ATTEMPT_INTERVAL_MS = 3000; // 3 sec
+const size_t AMQP_RECONNECT_MAX_ATTEMPTS = 10;
 
 // DeviceID
 // DEVICEID_CHAR_LENGTH has to be kept in sync with deviceIDCharLength