diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/src/Amqp/AmqpManager.cpp index 4670fb343..67b4126ba 100644 --- a/services/tunnelbroker/src/Amqp/AmqpManager.cpp +++ b/services/tunnelbroker/src/Amqp/AmqpManager.cpp @@ -1,147 +1,144 @@ #include "AmqpManager.h" #include "ConfigManager.h" #include "Constants.h" #include "DeliveryBroker.h" #include "GlobalTools.h" #include "Tools.h" #include #include namespace comm { namespace network { AmqpManager &AmqpManager::getInstance() { static AmqpManager instance; return instance; } void AmqpManager::connectInternal() { const std::string amqpUri = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_URI); const std::string tunnelbrokerID = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_TUNNELBROKER_ID); const std::string fanoutExchangeName = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_FANOUT_EXCHANGE); LOG(INFO) << "AMQP: Connecting to " << amqpUri; auto *loop = uv_default_loop(); AMQP::LibUvHandler handler(loop); AMQP::TcpConnection connection(&handler, AMQP::Address(amqpUri)); this->amqpChannel = std::make_unique(&connection); this->amqpChannel->onError([this](const char *message) { LOG(ERROR) << "AMQP: channel error: " << message << ", will try to reconnect"; this->amqpReady = false; }); AMQP::Table arguments; arguments["x-message-ttl"] = AMQP_MESSAGE_TTL; arguments["x-expires"] = AMQP_QUEUE_TTL; this->amqpChannel->declareExchange(fanoutExchangeName, AMQP::fanout); this->amqpChannel->declareQueue(tunnelbrokerID, AMQP::durable, arguments) .onSuccess([this, tunnelbrokerID, fanoutExchangeName]( const std::string &name, uint32_t messagecount, uint32_t consumercount) { LOG(INFO) << "AMQP: Queue " << name << " created"; this->amqpChannel->bindQueue(fanoutExchangeName, tunnelbrokerID, "") .onError([this, tunnelbrokerID, fanoutExchangeName]( const char *message) { LOG(ERROR) << "AMQP: Failed to bind queue: " << tunnelbrokerID << " to exchange: " << fanoutExchangeName; this->amqpReady = false; }); this->amqpReady = true; this->amqpChannel->consume(tunnelbrokerID) .onReceived([](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { try { AMQP::Table headers = message.headers(); const std::string payload(message.body()); const std::string messageID(headers[AMQP_HEADER_MESSAGEID]); const std::string toDeviceID(headers[AMQP_HEADER_TO_DEVICEID]); const std::string fromDeviceID( headers[AMQP_HEADER_FROM_DEVICEID]); LOG(INFO) << "AMQP: Message consumed for deviceID: " << toDeviceID; DeliveryBroker::getInstance().push( messageID, deliveryTag, toDeviceID, fromDeviceID, payload); } catch (const std::exception &e) { LOG(ERROR) << "AMQP: Message parsing exception: " << e.what(); } }) .onError([](const char *message) { LOG(ERROR) << "AMQP: Error on message consume: " << message; }); }) .onError([](const char *message) { throw std::runtime_error( "AMQP: Queue creation error: " + std::string(message)); }); uv_run(loop, UV_RUN_DEFAULT); }; void AmqpManager::connect() { while (true) { int64_t currentTimestamp = tools::getCurrentTimestamp(); if (this->lastConnectionTimestamp && currentTimestamp - this->lastConnectionTimestamp < AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) { throw std::runtime_error( "AMQP reconnection attempt interval too short, tried to reconnect " "after " + std::to_string(currentTimestamp - this->lastConnectionTimestamp) + "ms, the shortest allowed interval is " + std::to_string(AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) + "ms"); } this->lastConnectionTimestamp = currentTimestamp; this->connectInternal(); } } -bool AmqpManager::send( - std::string messageID, - std::string fromDeviceID, - std::string toDeviceID, - std::string payload) { +bool AmqpManager::send(const database::MessageItem *message) { if (!this->amqpReady) { LOG(ERROR) << "AMQP: Message send error: channel not ready"; return false; } try { - AMQP::Envelope env(payload.c_str(), payload.size()); + AMQP::Envelope env( + message->getPayload().c_str(), message->getPayload().size()); AMQP::Table headers; - headers[AMQP_HEADER_MESSAGEID] = messageID; - headers[AMQP_HEADER_FROM_DEVICEID] = fromDeviceID; - headers[AMQP_HEADER_TO_DEVICEID] = toDeviceID; + headers[AMQP_HEADER_MESSAGEID] = message->getMessageID(); + headers[AMQP_HEADER_FROM_DEVICEID] = message->getFromDeviceID(); + headers[AMQP_HEADER_TO_DEVICEID] = message->getToDeviceID(); // Set delivery mode to: Durable (2) env.setDeliveryMode(2); env.setHeaders(std::move(headers)); this->amqpChannel->publish( config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_FANOUT_EXCHANGE), "", env); } catch (std::runtime_error &e) { LOG(ERROR) << "AMQP: Error while publishing message: " << e.what(); return false; } return true; }; void AmqpManager::ack(uint64_t deliveryTag) { if (!this->amqpReady) { LOG(ERROR) << "AMQP: Message ACK error: channel not ready"; return; } this->amqpChannel->ack(deliveryTag); } } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.h b/services/tunnelbroker/src/Amqp/AmqpManager.h index 8885ea3f3..5109aa4e3 100644 --- a/services/tunnelbroker/src/Amqp/AmqpManager.h +++ b/services/tunnelbroker/src/Amqp/AmqpManager.h @@ -1,36 +1,34 @@ #pragma once +#include "DatabaseManager.h" + #include #include #include #include #include namespace comm { namespace network { class AmqpManager { AmqpManager(){}; std::unique_ptr amqpChannel; std::atomic amqpReady; std::atomic lastConnectionTimestamp; void connectInternal(); public: static AmqpManager &getInstance(); void connect(); - bool send( - std::string messageID, - std::string fromDeviceID, - std::string toDeviceID, - std::string payload); + bool send(const database::MessageItem *message); void ack(uint64_t deliveryTag); AmqpManager(AmqpManager const &) = delete; void operator=(AmqpManager const &) = delete; }; } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp index 5ba30c2c8..bc6a3989e 100644 --- a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp +++ b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp @@ -1,259 +1,255 @@ #include "TunnelbrokerServiceImpl.h" #include "AmqpManager.h" #include "AwsTools.h" #include "ConfigManager.h" #include "CryptoTools.h" #include "DatabaseManager.h" #include "DeliveryBroker.h" #include "GlobalTools.h" #include "Tools.h" #include namespace comm { namespace network { TunnelBrokerServiceImpl::TunnelBrokerServiceImpl() { Aws::InitAPI({}); // List of AWS DynamoDB tables to check if they are created and can be // accessed before any AWS API methods const std::list tablesList = { config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_TABLE), config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_VERIFICATION_TABLE), config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_PUBLIC_KEY_TABLE), config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_MESSAGES_TABLE)}; for (const std::string &table : tablesList) { if (!database::DatabaseManager::getInstance().isTableAvailable(table)) { throw std::runtime_error( "Error: AWS DynamoDB table '" + table + "' is not available"); } }; }; TunnelBrokerServiceImpl::~TunnelBrokerServiceImpl() { Aws::ShutdownAPI({}); }; grpc::Status TunnelBrokerServiceImpl::SessionSignature( grpc::ServerContext *context, const tunnelbroker::SessionSignatureRequest *request, tunnelbroker::SessionSignatureResponse *reply) { const std::string deviceID = request->deviceid(); if (!tools::validateDeviceID(deviceID)) { LOG(INFO) << "gRPC: " << "Format validation failed for " << deviceID; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for deviceID"); } const std::string toSign = tools::generateRandomString(SIGNATURE_REQUEST_LENGTH); std::shared_ptr SessionSignItem = std::make_shared(toSign, deviceID); database::DatabaseManager::getInstance().putSessionSignItem(*SessionSignItem); reply->set_tosign(toSign); return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::NewSession( grpc::ServerContext *context, const tunnelbroker::NewSessionRequest *request, tunnelbroker::NewSessionResponse *reply) { std::shared_ptr deviceSessionItem; std::shared_ptr sessionSignItem; std::shared_ptr publicKeyItem; const std::string deviceID = request->deviceid(); if (!tools::validateDeviceID(deviceID)) { LOG(INFO) << "gRPC: " << "Format validation failed for " << deviceID; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for deviceID"); } const std::string signature = request->signature(); const std::string publicKey = request->publickey(); const std::string newSessionID = tools::generateUUID(); try { sessionSignItem = database::DatabaseManager::getInstance().findSessionSignItem(deviceID); if (sessionSignItem == nullptr) { LOG(INFO) << "gRPC: " << "Session sign request not found for deviceID: " << deviceID; return grpc::Status( grpc::StatusCode::NOT_FOUND, "Session sign request not found"); } publicKeyItem = database::DatabaseManager::getInstance().findPublicKeyItem(deviceID); if (publicKeyItem == nullptr) { std::shared_ptr newPublicKeyItem = std::make_shared(deviceID, publicKey); database::DatabaseManager::getInstance().putPublicKeyItem( *newPublicKeyItem); } else if (publicKey != publicKeyItem->getPublicKey()) { LOG(INFO) << "gRPC: " << "The public key doesn't match for deviceID"; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "The public key doesn't match for deviceID"); } const std::string verificationMessage = sessionSignItem->getSign(); if (!comm::network::crypto::rsaVerifyString( publicKey, verificationMessage, signature)) { LOG(INFO) << "gRPC: " << "Signature for the verification message is not valid"; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "Signature for the verification message is not valid"); } database::DatabaseManager::getInstance().removeSessionSignItem(deviceID); deviceSessionItem = std::make_shared( newSessionID, deviceID, request->publickey(), request->notifytoken(), tunnelbroker::NewSessionRequest_DeviceTypes_Name(request->devicetype()), request->deviceappversion(), request->deviceos()); database::DatabaseManager::getInstance().putSessionItem(*deviceSessionItem); } catch (std::runtime_error &e) { LOG(ERROR) << "gRPC: " << "Error while processing 'NewSession' request: " << e.what(); return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } reply->set_sessionid(newSessionID); return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::Send( grpc::ServerContext *context, const tunnelbroker::SendRequest *request, google::protobuf::Empty *reply) { try { const std::string sessionID = request->sessionid(); if (!tools::validateSessionID(sessionID)) { LOG(INFO) << "gRPC: " << "Format validation failed for " << sessionID; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for sessionID"); } std::shared_ptr sessionItem = database::DatabaseManager::getInstance().findSessionItem(sessionID); if (sessionItem == nullptr) { LOG(INFO) << "gRPC: " << "Session " << sessionID << " not found"; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "No such session found. SessionID: " + sessionID); } const std::string clientDeviceID = sessionItem->getDeviceID(); const std::string messageID = tools::generateUUID(); const database::MessageItem message( messageID, clientDeviceID, request->todeviceid(), request->payload(), ""); database::DatabaseManager::getInstance().putMessageItem(message); - if (!AmqpManager::getInstance().send( - messageID, - clientDeviceID, - request->todeviceid(), - std::string(request->payload()))) { + if (!AmqpManager::getInstance().send(&message)) { LOG(ERROR) << "gRPC: " << "Error while publish the message to AMQP"; return grpc::Status( grpc::StatusCode::INTERNAL, "Error while publish the message to AMQP"); } } catch (std::runtime_error &e) { LOG(ERROR) << "gRPC: " << "Error while processing 'Send' request: " << e.what(); return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::Get( grpc::ServerContext *context, const tunnelbroker::GetRequest *request, grpc::ServerWriter *writer) { try { const std::string sessionID = request->sessionid(); if (!tools::validateSessionID(sessionID)) { LOG(INFO) << "gRPC: " << "Format validation failed for " << sessionID; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for sessionID"); } std::shared_ptr sessionItem = database::DatabaseManager::getInstance().findSessionItem(sessionID); if (sessionItem == nullptr) { LOG(INFO) << "gRPC: " << "Session " << sessionID << " not found"; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "No such session found. SessionID: " + sessionID); } const std::string clientDeviceID = sessionItem->getDeviceID(); DeliveryBrokerMessage messageToDeliver; std::vector> messagesFromDatabase = database::DatabaseManager::getInstance().findMessageItemsByReceiver( clientDeviceID); if (messagesFromDatabase.size() > 0) { // When a client connects and requests GET for the messages first we check // if there are undelivered messages in the database. If so, we are // erasing the messages to deliver from rabbitMQ which are handled by // DeliveryBroker. DeliveryBroker::getInstance().erase(clientDeviceID); } tunnelbroker::GetResponse response; auto respondToWriter = [&writer, &response](std::string fromDeviceID, std::string payload) { response.set_fromdeviceid(fromDeviceID); response.set_payload(payload); if (!writer->Write(response)) { throw std::runtime_error( "gRPC: 'Get' writer error on sending data to the client"); } response.Clear(); }; for (auto &messageFromDatabase : messagesFromDatabase) { respondToWriter( messageFromDatabase->getFromDeviceID(), messageFromDatabase->getPayload()); database::DatabaseManager::getInstance().removeMessageItem( messageFromDatabase->getMessageID()); } while (1) { messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID); respondToWriter(messageToDeliver.fromDeviceID, messageToDeliver.payload); comm::network::AmqpManager::getInstance().ack( messageToDeliver.deliveryTag); database::DatabaseManager::getInstance().removeMessageItem( messageToDeliver.messageID); // If messages queue for `clientDeviceID` is empty we don't need to store // `folly::MPMCQueue` for it and need to free memory to fix possible // 'ghost' queues in DeliveryBroker. // We call `deleteQueueIfEmpty()` for this purpose here. DeliveryBroker::getInstance().deleteQueueIfEmpty(clientDeviceID); } } catch (std::runtime_error &e) { LOG(ERROR) << "gRPC: " << "Error while processing 'Get' request: " << e.what(); return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return grpc::Status::OK; }; } // namespace network } // namespace comm