Page Menu | Home | Phorge

D4744.1767339198.diff
No One | Temporary

Size
3 KB
Referenced Files
None
Subscribers
None

D4744.1767339198.diff

diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.h b/services/tunnelbroker/src/Amqp/AmqpManager.h
--- a/services/tunnelbroker/src/Amqp/AmqpManager.h
+++ b/services/tunnelbroker/src/Amqp/AmqpManager.h
@@ -19,7 +19,7 @@
std::mutex channelMutex;
std::unique_ptr<AMQP::TcpChannel> amqpChannel;
std::atomic<bool> amqpReady;
- std::atomic<int64_t> lastConnectionTimestamp;
+ std::atomic<std::size_t> reconnectAttempt;
void connectInternal();
void connect();
void waitUntilReady();
diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/src/Amqp/AmqpManager.cpp
--- a/services/tunnelbroker/src/Amqp/AmqpManager.cpp
+++ b/services/tunnelbroker/src/Amqp/AmqpManager.cpp
@@ -46,10 +46,10 @@
this->amqpChannel->onReady([this]() {
LOG(INFO) << "AMQP: Channel is ready";
this->amqpReady = true;
+ this->reconnectAttempt = 0;
});
this->amqpChannel->onError([this](const char *message) {
- LOG(ERROR) << "AMQP: channel error: " << message
- << ", will try to reconnect";
+ LOG(ERROR) << "AMQP: Channel error: " << message;
this->amqpReady = false;
});
@@ -93,28 +93,23 @@
});
})
.onError([](const char *message) {
- throw std::runtime_error(
- "AMQP: Queue creation error: " + std::string(message));
+ LOG(ERROR) << "AMQP: Queue creation error: " + std::string(message);
});
uv_run(localUvLoop, UV_RUN_DEFAULT);
};
void AmqpManager::connect() {
- while (true) {
- int64_t currentTimestamp = tools::getCurrentTimestamp();
- if (this->lastConnectionTimestamp &&
- currentTimestamp - this->lastConnectionTimestamp <
- AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) {
- throw std::runtime_error(
- "AMQP reconnection attempt interval too short, tried to reconnect "
- "after " +
- std::to_string(currentTimestamp - this->lastConnectionTimestamp) +
- "ms, the shortest allowed interval is " +
- std::to_string(AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) + "ms");
- }
- this->lastConnectionTimestamp = currentTimestamp;
+ while (this->reconnectAttempt < AMQP_RECONNECT_MAX_ATTEMPTS) {
this->connectInternal();
+ this->reconnectAttempt++;
+ LOG(INFO) << "AMQP: Attempting " << this->reconnectAttempt
+ << " to reconnect in " << AMQP_RECONNECT_ATTEMPT_INTERVAL
+ << " ms";
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(AMQP_RECONNECT_ATTEMPT_INTERVAL));
}
+ LOG(FATAL) << "Cannot connect to AMQP server after "
+ << AMQP_RECONNECT_MAX_ATTEMPTS << " attempts";
}
bool AmqpManager::send(const database::MessageItem *message) {
@@ -156,7 +151,7 @@
while (true) {
LOG(INFO) << "AMQP: Connection is not ready, waiting";
std::this_thread::sleep_for(
- std::chrono::milliseconds(AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL));
+ std::chrono::milliseconds(AMQP_RECONNECT_ATTEMPT_INTERVAL));
if (this->amqpReady) {
return;
}
diff --git a/services/tunnelbroker/src/Constants.h b/services/tunnelbroker/src/Constants.h
--- a/services/tunnelbroker/src/Constants.h
+++ b/services/tunnelbroker/src/Constants.h
@@ -37,7 +37,8 @@
const std::string AMQP_HEADER_TO_DEVICEID = "toDeviceID";
const std::string AMQP_HEADER_MESSAGEID = "messageID";
-const int64_t AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL = 1000 * 60; // 1 min
+const size_t AMQP_RECONNECT_ATTEMPT_INTERVAL = 3000; // 3 seconds
+const size_t AMQP_RECONNECT_MAX_ATTEMPTS = 10;
// DeviceID
const size_t DEVICEID_CHAR_LENGTH = 64;

File Metadata

Mime Type
text/plain
Expires
Fri, Jan 2, 7:33 AM (10 h, 10 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5878689
Default Alt Text
D4744.1767339198.diff (3 KB)

Event Timeline