Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3389055
D4744.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
3 KB
Referenced Files
None
Subscribers
None
D4744.diff
View Options
diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.h b/services/tunnelbroker/src/Amqp/AmqpManager.h
--- a/services/tunnelbroker/src/Amqp/AmqpManager.h
+++ b/services/tunnelbroker/src/Amqp/AmqpManager.h
@@ -20,7 +20,7 @@
std::once_flag initOnceFlag;
std::unique_ptr<AMQP::TcpChannel> amqpChannel;
std::atomic<bool> amqpReady;
- std::atomic<int64_t> lastConnectionTimestamp;
+ std::atomic<std::size_t> reconnectAttempt;
void connectInternal();
void connect();
void waitUntilReady();
diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/src/Amqp/AmqpManager.cpp
--- a/services/tunnelbroker/src/Amqp/AmqpManager.cpp
+++ b/services/tunnelbroker/src/Amqp/AmqpManager.cpp
@@ -42,10 +42,10 @@
this->amqpChannel->onReady([this]() {
LOG(INFO) << "AMQP: Channel is ready";
this->amqpReady = true;
+ this->reconnectAttempt = 0;
});
this->amqpChannel->onError([this](const char *message) {
- LOG(ERROR) << "AMQP: channel error: " << message
- << ", will try to reconnect";
+ LOG(ERROR) << "AMQP: Channel error: " << message;
this->amqpReady = false;
});
@@ -87,28 +87,24 @@
});
})
.onError([](const char *message) {
- throw std::runtime_error(
- "AMQP: Queue creation error: " + std::string(message));
+ LOG(ERROR) << "AMQP: Queue creation error: " + std::string(message);
});
uv_run(localUvLoop, UV_RUN_DEFAULT);
};
void AmqpManager::connect() {
- while (true) {
- int64_t currentTimestamp = tools::getCurrentTimestamp();
- if (this->lastConnectionTimestamp &&
- currentTimestamp - this->lastConnectionTimestamp <
- AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) {
- throw std::runtime_error(
- "AMQP reconnection attempt interval too short, tried to reconnect "
- "after " +
- std::to_string(currentTimestamp - this->lastConnectionTimestamp) +
- "ms, the shortest allowed interval is " +
- std::to_string(AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) + "ms");
- }
- this->lastConnectionTimestamp = currentTimestamp;
+ this->connectInternal();
+ while (this->reconnectAttempt < AMQP_RECONNECT_MAX_ATTEMPTS) {
+ this->reconnectAttempt++;
+ LOG(INFO) << "AMQP: Attempt " << this->reconnectAttempt
+ << " to reconnect in " << AMQP_RECONNECT_ATTEMPT_INTERVAL_MS
+ << " ms";
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(AMQP_RECONNECT_ATTEMPT_INTERVAL_MS));
this->connectInternal();
}
+ LOG(FATAL) << "Cannot connect to AMQP server after "
+ << AMQP_RECONNECT_MAX_ATTEMPTS << " attempts";
}
bool AmqpManager::send(const database::MessageItem *message) {
@@ -147,7 +143,7 @@
while (!this->amqpReady) {
LOG(INFO) << "AMQP: Connection is not ready, waiting";
std::this_thread::sleep_for(
- std::chrono::milliseconds(AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL));
+ std::chrono::milliseconds(AMQP_RECONNECT_ATTEMPT_INTERVAL_MS));
}
}
diff --git a/services/tunnelbroker/src/Constants.h b/services/tunnelbroker/src/Constants.h
--- a/services/tunnelbroker/src/Constants.h
+++ b/services/tunnelbroker/src/Constants.h
@@ -37,7 +37,8 @@
const std::string AMQP_HEADER_TO_DEVICEID = "toDeviceID";
const std::string AMQP_HEADER_MESSAGEID = "messageID";
-const int64_t AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL = 1000 * 60; // 1 min
+const size_t AMQP_RECONNECT_ATTEMPT_INTERVAL_MS = 3000;
+const size_t AMQP_RECONNECT_MAX_ATTEMPTS = 10;
// DeviceID
// DEVICEID_CHAR_LENGTH has to be kept in sync with deviceIDCharLength
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Nov 30, 5:19 PM (21 h, 48 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2601756
Default Alt Text
D4744.diff (3 KB)
Attached To
Mode
D4744: [services] Tunnelbroker - Fix AMQP client reconnection algorithm
Attached
Detach File
Event Timeline
Log In to Comment