diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.h b/services/tunnelbroker/src/Amqp/AmqpManager.h --- a/services/tunnelbroker/src/Amqp/AmqpManager.h +++ b/services/tunnelbroker/src/Amqp/AmqpManager.h @@ -1,5 +1,7 @@ #pragma once +#include "DatabaseManager.h" + #include #include @@ -21,11 +23,7 @@ public: static AmqpManager &getInstance(); void connect(); - bool send( - std::string messageID, - std::string fromDeviceID, - std::string toDeviceID, - std::string payload); + bool send(const database::MessageItem &message); void ack(uint64_t deliveryTag); AmqpManager(AmqpManager const &) = delete; diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/src/Amqp/AmqpManager.cpp --- a/services/tunnelbroker/src/Amqp/AmqpManager.cpp +++ b/services/tunnelbroker/src/Amqp/AmqpManager.cpp @@ -105,21 +105,18 @@ } } -bool AmqpManager::send( - std::string messageID, - std::string fromDeviceID, - std::string toDeviceID, - std::string payload) { +bool AmqpManager::send(const database::MessageItem &message) { if (!this->amqpReady) { LOG(ERROR) << "AMQP: Message send error: channel not ready"; return false; } try { - AMQP::Envelope env(payload.c_str(), payload.size()); + AMQP::Envelope env( + message->getPayload().c_str(), message->getPayload().size()); AMQP::Table headers; - headers[AMQP_HEADER_MESSAGEID] = messageID; - headers[AMQP_HEADER_FROM_DEVICEID] = fromDeviceID; - headers[AMQP_HEADER_TO_DEVICEID] = toDeviceID; + headers[AMQP_HEADER_MESSAGEID] = message->getMessageID(); + headers[AMQP_HEADER_FROM_DEVICEID] = message->getFromDeviceID(); + headers[AMQP_HEADER_TO_DEVICEID] = message->getToDeviceID(); // Set delivery mode to: Durable (2) env.setDeliveryMode(2); env.setHeaders(std::move(headers)); diff --git a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp --- a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp +++ b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp @@ -162,11 +162,7 @@ request->payload(), ""); database::DatabaseManager::getInstance().putMessageItem(message); - if (!AmqpManager::getInstance().send( - messageID, - clientDeviceID, - request->todeviceid(), - std::string(request->payload()))) { + if (!AmqpManager::getInstance().send(message)) { LOG(ERROR) << "gRPC: " << "Error while publish the message to AMQP"; return grpc::Status(