// Patch summary (text ahead of the first `diff --git` header is ignored by patch tools).
//
// Files touched: services/tunnelbroker/src/Amqp/AmqpManager.h and AmqpManager.cpp.
//
// AmqpManager.h
//   * Adds the declaration `void waitUntilReady();` after `connect()` and above the
//     `public:` label.
//
// AmqpManager.cpp
//   * Adds two `#include` lines. Their targets are not visible in this copy of the patch;
//     presumably <chrono> and <thread>, since the new code uses std::chrono::milliseconds
//     and std::this_thread::sleep_for -- TODO confirm.
//   * AmqpManager::send: removes the guard that logged
//     "AMQP: Message send error: channel not ready" and returned false while `amqpReady`
//     was false; it now calls waitUntilReady() before entering the `try` block.
//   * AmqpManager::ack: removes the guard that logged
//     "AMQP: Message ACK error: channel not ready" and returned early; it now calls
//     waitUntilReady() and then `amqpChannel->ack(deliveryTag)`.
//   * AmqpManager::waitUntilReady (new): returns at once when `amqpReady` is true;
//     otherwise loops while `amqpReady` is false, logging
//     "AMQP: Connection is not ready, waiting" at INFO level and sleeping
//     AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL milliseconds on every iteration.
//
// NOTE(review): the wait loop has no timeout or other exit condition, so send() and ack()
//   now block their caller for as long as `amqpReady` stays false, where they previously
//   returned immediately. Confirm every caller can tolerate an unbounded stall.
// NOTE(review): the leading `if (this->amqpReady) { return; }` duplicates the `while`
//   condition that follows it; the loop alone gives the same result.
// NOTE(review): the INFO message is written once per polling interval for the whole
//   duration of the wait -- check that this log volume is acceptable.
// NOTE(review): nothing inside the loop changes `amqpReady`, so it has to be set from
//   another thread of execution. Its declaration is not part of this patch; presumably it
//   is a std::atomic like `lastConnectionTimestamp` -- verify.
// NOTE(review): this copy of the patch has lost its line breaks and the text inside angle
//   brackets (`#include` targets, the `std::atomic` template argument); regenerate it from
//   the repository before trying to apply it.
diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.h b/services/tunnelbroker/src/Amqp/AmqpManager.h --- a/services/tunnelbroker/src/Amqp/AmqpManager.h +++ b/services/tunnelbroker/src/Amqp/AmqpManager.h @@ -21,6 +21,7 @@ std::atomic lastConnectionTimestamp; void connectInternal(); void connect(); + void waitUntilReady(); public: static AmqpManager &getInstance(); diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/src/Amqp/AmqpManager.cpp --- a/services/tunnelbroker/src/Amqp/AmqpManager.cpp +++ b/services/tunnelbroker/src/Amqp/AmqpManager.cpp @@ -7,6 +7,8 @@ #include #include +#include +#include namespace comm { namespace network { @@ -110,10 +112,7 @@ } bool AmqpManager::send(const database::MessageItem *message) { - if (!this->amqpReady) { - LOG(ERROR) << "AMQP: Message send error: channel not ready"; - return false; - } + waitUntilReady(); try { const std::string messagePayload = message->getPayload(); AMQP::Envelope env(messagePayload.c_str(), messagePayload.size()); @@ -137,11 +136,19 @@ }; void AmqpManager::ack(uint64_t deliveryTag) { - if (!this->amqpReady) { - LOG(ERROR) << "AMQP: Message ACK error: channel not ready"; + waitUntilReady(); + this->amqpChannel->ack(deliveryTag); +} + +void AmqpManager::waitUntilReady() { + if (this->amqpReady) { return; } - this->amqpChannel->ack(deliveryTag); + while (!this->amqpReady) { + LOG(INFO) << "AMQP: Connection is not ready, waiting"; + std::this_thread::sleep_for( + std::chrono::milliseconds(AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL)); + } } } // namespace network