Page MenuHomePhabricator

D4741.id15326.diff
No OneTemporary

D4741.id15326.diff

diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.h b/services/tunnelbroker/src/Amqp/AmqpManager.h
--- a/services/tunnelbroker/src/Amqp/AmqpManager.h
+++ b/services/tunnelbroker/src/Amqp/AmqpManager.h
@@ -20,6 +20,7 @@
std::atomic<int64_t> lastConnectionTimestamp;
void connectInternal();
void connect();
+ void waitUntilReady();
public:
static AmqpManager &getInstance();
diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/src/Amqp/AmqpManager.cpp
--- a/services/tunnelbroker/src/Amqp/AmqpManager.cpp
+++ b/services/tunnelbroker/src/Amqp/AmqpManager.cpp
@@ -8,6 +8,8 @@
#include <glog/logging.h>
#include <uv.h>
+#include <chrono>
+#include <thread>
namespace comm {
namespace network {
@@ -116,10 +118,7 @@
}
bool AmqpManager::send(const database::MessageItem *message) {
- if (!this->amqpReady) {
- LOG(ERROR) << "AMQP: Message send error: channel not ready";
- return false;
- }
+ waitUntilReady();
try {
const std::string messagePayload = message->getPayload();
AMQP::Envelope env(messagePayload.c_str(), messagePayload.size());
@@ -143,11 +142,22 @@
};
void AmqpManager::ack(uint64_t deliveryTag) {
- if (!this->amqpReady) {
- LOG(ERROR) << "AMQP: Message ACK error: channel not ready";
+ waitUntilReady();
+ this->amqpChannel->ack(deliveryTag);
+}
+
+void AmqpManager::waitUntilReady() {
+ if (this->amqpReady) {
return;
}
- this->amqpChannel->ack(deliveryTag);
+ while (true) {
+ LOG(INFO) << "AMQP: Connection is not ready, waiting";
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL));
+ if (this->amqpReady) {
+ return;
+ }
+ }
}
} // namespace network

File Metadata

Mime Type
text/plain
Expires
Sat, Nov 23, 11:24 PM (20 h, 7 s)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2572775
Default Alt Text
D4741.id15326.diff (1 KB)

Event Timeline