Page MenuHomePhabricator

D4741.id15814.diff
No OneTemporary

D4741.id15814.diff

diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.h b/services/tunnelbroker/src/Amqp/AmqpManager.h
--- a/services/tunnelbroker/src/Amqp/AmqpManager.h
+++ b/services/tunnelbroker/src/Amqp/AmqpManager.h
@@ -21,6 +21,7 @@
std::atomic<int64_t> lastConnectionTimestamp;
void connectInternal();
void connect();
+ void waitUntilReady();
public:
static AmqpManager &getInstance();
diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/src/Amqp/AmqpManager.cpp
--- a/services/tunnelbroker/src/Amqp/AmqpManager.cpp
+++ b/services/tunnelbroker/src/Amqp/AmqpManager.cpp
@@ -7,6 +7,8 @@
#include <glog/logging.h>
#include <uv.h>
+#include <chrono>
+#include <thread>
namespace comm {
namespace network {
@@ -110,10 +112,7 @@
}
bool AmqpManager::send(const database::MessageItem *message) {
- if (!this->amqpReady) {
- LOG(ERROR) << "AMQP: Message send error: channel not ready";
- return false;
- }
+ waitUntilReady();
try {
const std::string messagePayload = message->getPayload();
AMQP::Envelope env(messagePayload.c_str(), messagePayload.size());
@@ -137,11 +136,19 @@
};
void AmqpManager::ack(uint64_t deliveryTag) {
- if (!this->amqpReady) {
- LOG(ERROR) << "AMQP: Message ACK error: channel not ready";
+ waitUntilReady();
+ this->amqpChannel->ack(deliveryTag);
+}
+
+void AmqpManager::waitUntilReady() {
+ if (this->amqpReady) {
return;
}
- this->amqpChannel->ack(deliveryTag);
+ while (!this->amqpReady) {
+ LOG(INFO) << "AMQP: Connection is not ready, waiting";
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL));
+ }
}
} // namespace network

File Metadata

Mime Type
text/plain
Expires
Sat, Nov 23, 11:17 PM (20 h, 40 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2572375
Default Alt Text
D4741.id15814.diff (1 KB)

Event Timeline