diff --git a/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.cpp index 4ecde355a..f571ef01a 100644 --- a/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.cpp +++ b/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.cpp @@ -1,143 +1,144 @@ #include "AmqpManager.h" #include "ConfigManager.h" #include "Constants.h" #include "DeliveryBroker.h" #include "Tools.h" -#include #include namespace comm { namespace network { -static std::unique_ptr amqpChannel; -static std::atomic amqpReady; -static long long lastConnectionTimestamp; +AmqpManager &AmqpManager::getInstance() { + static AmqpManager instance; + return instance; +} -void AMQPConnectInternal() { +void AmqpManager::connectInternal() { const std::string amqpUri = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_URI); const std::string tunnelbrokerID = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_TUNNELBROKER_ID); const std::string fanoutExchangeName = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_FANOUT_EXCHANGE); std::cout << "AMQP: Connecting to " << amqpUri << std::endl; auto *loop = uv_default_loop(); AMQP::LibUvHandler handler(loop); AMQP::TcpConnection connection(&handler, AMQP::Address(amqpUri)); - amqpChannel = std::make_unique(&connection); - amqpChannel->onError([](const char *message) { + this->amqpChannel = std::make_unique(&connection); + this->amqpChannel->onError([this](const char *message) { std::cout << "AMQP: channel error: " << message << ", will try to reconnect" << std::endl; - amqpReady = false; + this->amqpReady = false; }); AMQP::Table arguments; arguments["x-message-ttl"] = AMQP_MESSAGE_TTL; arguments["x-expires"] = AMQP_QUEUE_TTL; - amqpChannel->declareExchange(fanoutExchangeName, AMQP::fanout); - amqpChannel->declareQueue(tunnelbrokerID, AMQP::durable, arguments) - .onSuccess([tunnelbrokerID, fanoutExchangeName]( + this->amqpChannel->declareExchange(fanoutExchangeName, AMQP::fanout); + this->amqpChannel->declareQueue(tunnelbrokerID, AMQP::durable, arguments) + .onSuccess([this, tunnelbrokerID, fanoutExchangeName]( const std::string &name, uint32_t messagecount, uint32_t consumercount) { std::cout << "AMQP: Queue " << name << " created" << std::endl; - amqpChannel->bindQueue(fanoutExchangeName, tunnelbrokerID, "") - .onError([tunnelbrokerID, fanoutExchangeName](const char *message) { + this->amqpChannel->bindQueue(fanoutExchangeName, tunnelbrokerID, "") + .onError([this, tunnelbrokerID, fanoutExchangeName]( + const char *message) { std::cout << "AMQP: Failed to bind queue: " << tunnelbrokerID << " to exchange: " << fanoutExchangeName << std::endl; - amqpReady = false; + this->amqpReady = false; }); - amqpReady = true; - amqpChannel->consume(tunnelbrokerID) - .onReceived([&](const AMQP::Message &message, - uint64_t deliveryTag, - bool redelivered) { + this->amqpReady = true; + this->amqpChannel->consume(tunnelbrokerID) + .onReceived([](const AMQP::Message &message, + uint64_t deliveryTag, + bool redelivered) { try { AMQP::Table headers = message.headers(); const std::string payload(message.body()); const std::string toDeviceID(headers[AMQP_HEADER_TO_DEVICEID]); const std::string fromDeviceID( headers[AMQP_HEADER_FROM_DEVICEID]); std::cout << "AMQP: Message consumed for deviceID: " << toDeviceID << std::endl; DeliveryBroker::getInstance().push( deliveryTag, toDeviceID, fromDeviceID, payload); } catch (const std::exception &e) { std::cout << "AMQP: Message parsing exception: " << e.what() << std::endl; } }) .onError([](const char *message) { std::cout << "AMQP: Error on message consume: " << message << std::endl; }); }) .onError([](const char *message) { throw std::runtime_error( "AMQP: Queue creation error: " + std::string(message)); }); uv_run(loop, UV_RUN_DEFAULT); }; -void AMQPConnect() { +void AmqpManager::connect() { while (true) { long long currentTimestamp = getCurrentTimestamp(); - if (lastConnectionTimestamp && - currentTimestamp - lastConnectionTimestamp < + if (this->lastConnectionTimestamp && + currentTimestamp - this->lastConnectionTimestamp < AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) { throw std::runtime_error( "AMQP reconnection attempt interval too short, tried to reconnect " "after " + - std::to_string(currentTimestamp - lastConnectionTimestamp) + + std::to_string(currentTimestamp - this->lastConnectionTimestamp) + "ms, the shortest allowed interval is " + std::to_string(AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) + "ms"); } - lastConnectionTimestamp = currentTimestamp; - AMQPConnectInternal(); + this->lastConnectionTimestamp = currentTimestamp; + this->connectInternal(); } } -bool AMQPSend( +bool AmqpManager::send( std::string toDeviceID, std::string fromDeviceID, std::string payload) { - if (!amqpReady) { + if (!this->amqpReady) { std::cout << "AMQP: Message send error: channel not ready" << std::endl; return false; } try { AMQP::Envelope env(payload.c_str(), payload.size()); AMQP::Table headers; headers[AMQP_HEADER_FROM_DEVICEID] = fromDeviceID; headers[AMQP_HEADER_TO_DEVICEID] = toDeviceID; // Set delivery mode to: Durable (2) env.setDeliveryMode(2); env.setHeaders(std::move(headers)); - amqpChannel->publish( + this->amqpChannel->publish( config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_FANOUT_EXCHANGE), "", env); } catch (std::runtime_error &e) { std::cout << "AMQP: Error while publishing message: " << e.what() << std::endl; return false; } return true; }; -void AMQPAck(uint64_t deliveryTag) { - if (!amqpReady) { +void AmqpManager::ack(uint64_t deliveryTag) { + if (!this->amqpReady) { std::cout << "AMQP: Message ACK error: channel not ready" << std::endl; return; } - amqpChannel->ack(deliveryTag); + this->amqpChannel->ack(deliveryTag); } } // namespace network } // namespace comm diff --git a/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.h b/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.h index 0015bd9f3..7743218f3 100644 --- a/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.h +++ b/services/tunnelbroker/docker-server/contents/server/src/Amqp/AmqpManager.h @@ -1,21 +1,33 @@ #pragma once #include +#include #include #include #include namespace comm { namespace network { -void AMQPConnect(); -void AMQPConnectInternal(); -bool AMQPSend( - std::string toDeviceID, - std::string fromDeviceID, - std::string payload); -void AMQPAck(uint64_t deliveryTag); +class AmqpManager { + AmqpManager(){}; + + std::unique_ptr amqpChannel; + std::atomic amqpReady; + std::atomic lastConnectionTimestamp; + void connectInternal(); + +public: + static AmqpManager &getInstance(); + void connect(); + bool + send(std::string toDeviceID, std::string fromDeviceID, std::string payload); + void ack(uint64_t deliveryTag); + + AmqpManager(AmqpManager const &) = delete; + void operator=(AmqpManager const &) = delete; +}; } // namespace network } // namespace comm diff --git a/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp index 50e7d808a..bd1a9e093 100644 --- a/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp +++ b/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp @@ -1,223 +1,223 @@ #include "TunnelbrokerServiceImpl.h" #include "AmqpManager.h" #include "AwsTools.h" #include "ConfigManager.h" #include "CryptoTools.h" #include "DatabaseManager.h" #include "DeliveryBroker.h" #include "Tools.h" namespace comm { namespace network { TunnelBrokerServiceImpl::TunnelBrokerServiceImpl() { Aws::InitAPI({}); // List of AWS DynamoDB tables to check if they are created and can be // accessed before any AWS API methods const std::list tablesList = { config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_TABLE), config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_VERIFICATION_TABLE), config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_PUBLIC_KEY_TABLE)}; for (const std::string &table : tablesList) { if (!database::DatabaseManager::getInstance().isTableAvailable(table)) { throw std::runtime_error( "Error: AWS DynamoDB table '" + table + "' is not available"); } }; }; TunnelBrokerServiceImpl::~TunnelBrokerServiceImpl() { Aws::ShutdownAPI({}); }; grpc::Status TunnelBrokerServiceImpl::SessionSignature( grpc::ServerContext *context, const tunnelbroker::SessionSignatureRequest *request, tunnelbroker::SessionSignatureResponse *reply) { const std::string deviceID = request->deviceid(); if (!validateDeviceID(deviceID)) { std::cout << "gRPC: " << "Format validation failed for " << deviceID << std::endl; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for deviceID"); } const std::string toSign = generateRandomString(SIGNATURE_REQUEST_LENGTH); std::shared_ptr SessionSignItem = std::make_shared(toSign, deviceID); database::DatabaseManager::getInstance().putSessionSignItem(*SessionSignItem); reply->set_tosign(toSign); return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::NewSession( grpc::ServerContext *context, const tunnelbroker::NewSessionRequest *request, tunnelbroker::NewSessionResponse *reply) { std::shared_ptr deviceSessionItem; std::shared_ptr sessionSignItem; std::shared_ptr publicKeyItem; const std::string deviceID = request->deviceid(); if (!validateDeviceID(deviceID)) { std::cout << "gRPC: " << "Format validation failed for " << deviceID << std::endl; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for deviceID"); } const std::string signature = request->signature(); const std::string publicKey = request->publickey(); const std::string newSessionID = generateUUID(); try { sessionSignItem = database::DatabaseManager::getInstance().findSessionSignItem(deviceID); if (sessionSignItem == nullptr) { std::cout << "gRPC: " << "Session sign request not found for deviceID: " << deviceID << std::endl; return grpc::Status( grpc::StatusCode::NOT_FOUND, "Session sign request not found"); } publicKeyItem = database::DatabaseManager::getInstance().findPublicKeyItem(deviceID); if (publicKeyItem == nullptr) { std::shared_ptr newPublicKeyItem = std::make_shared(deviceID, publicKey); database::DatabaseManager::getInstance().putPublicKeyItem( *newPublicKeyItem); } else if (publicKey != publicKeyItem->getPublicKey()) { std::cout << "gRPC: " << "The public key doesn't match for deviceID" << std::endl; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "The public key doesn't match for deviceID"); } const std::string verificationMessage = sessionSignItem->getSign(); if (!comm::network::crypto::rsaVerifyString( publicKey, verificationMessage, signature)) { std::cout << "gRPC: " << "Signature for the verification message is not valid" << std::endl; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "Signature for the verification message is not valid"); } database::DatabaseManager::getInstance().removeSessionSignItem(deviceID); deviceSessionItem = std::make_shared( newSessionID, deviceID, request->publickey(), request->notifytoken(), tunnelbroker::NewSessionRequest_DeviceTypes_Name(request->devicetype()), request->deviceappversion(), request->deviceos()); database::DatabaseManager::getInstance().putSessionItem(*deviceSessionItem); } catch (std::runtime_error &e) { std::cout << "gRPC: " << "Error while processing 'NewSession' request: " << e.what() << std::endl; return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } reply->set_sessionid(newSessionID); return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::Send( grpc::ServerContext *context, const tunnelbroker::SendRequest *request, google::protobuf::Empty *reply) { try { const std::string sessionID = request->sessionid(); if (!validateSessionID(sessionID)) { std::cout << "gRPC: " << "Format validation failed for " << sessionID << std::endl; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for sessionID"); } std::shared_ptr sessionItem = database::DatabaseManager::getInstance().findSessionItem(sessionID); if (sessionItem == nullptr) { std::cout << "gRPC: " << "Session " << sessionID << " not found" << std::endl; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "No such session found. SessionID: " + sessionID); } const std::string clientDeviceID = sessionItem->getDeviceID(); - if (!AMQPSend( + if (!AmqpManager::getInstance().send( request->todeviceid(), clientDeviceID, std::string(request->payload()))) { std::cout << "gRPC: " << "Error while publish the message to AMQP" << std::endl; return grpc::Status( grpc::StatusCode::INTERNAL, "Error while publish the message to AMQP"); } } catch (std::runtime_error &e) { std::cout << "gRPC: " << "Error while processing 'Send' request: " << e.what() << std::endl; return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::Get( grpc::ServerContext *context, const tunnelbroker::GetRequest *request, grpc::ServerWriter *writer) { try { const std::string sessionID = request->sessionid(); if (!validateSessionID(sessionID)) { std::cout << "gRPC: " << "Format validation failed for " << sessionID << std::endl; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for sessionID"); } std::shared_ptr sessionItem = database::DatabaseManager::getInstance().findSessionItem(sessionID); if (sessionItem == nullptr) { std::cout << "gRPC: " << "Session " << sessionID << " not found" << std::endl; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "No such session found. SessionID: " + sessionID); } const std::string clientDeviceID = sessionItem->getDeviceID(); std::vector messagesToDeliver; while (1) { messagesToDeliver = DeliveryBroker::getInstance().get(clientDeviceID); for (auto const &message : messagesToDeliver) { tunnelbroker::GetResponse response; response.set_fromdeviceid(message.fromDeviceID); response.set_payload(message.payload); if (!writer->Write(response)) { throw std::runtime_error( "gRPC: 'Get' writer error on sending data to the client"); } - AMQPAck(message.deliveryTag); + comm::network::AmqpManager::getInstance().ack(message.deliveryTag); } if (!DeliveryBroker::getInstance().isEmpty(clientDeviceID)) { DeliveryBroker::getInstance().remove(clientDeviceID); } DeliveryBroker::getInstance().wait(clientDeviceID); } } catch (std::runtime_error &e) { std::cout << "gRPC: " << "Error while processing 'Get' request: " << e.what() << std::endl; return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return grpc::Status::OK; }; } // namespace network } // namespace comm diff --git a/services/tunnelbroker/docker-server/contents/server/src/server.cpp b/services/tunnelbroker/docker-server/contents/server/src/server.cpp index f315595a1..d55de84f8 100644 --- a/services/tunnelbroker/docker-server/contents/server/src/server.cpp +++ b/services/tunnelbroker/docker-server/contents/server/src/server.cpp @@ -1,44 +1,48 @@ #include "AmqpManager.h" #include "ConfigManager.h" #include "Constants.h" #include "TunnelbrokerServiceImpl.h" #include #include #include #include namespace comm { namespace network { void RunServer() { TunnelBrokerServiceImpl service; grpc::EnableDefaultHealthCheckService(true); grpc::ServerBuilder builder; // Listen on the given address without any authentication mechanism. builder.AddListeningPort( SERVER_LISTEN_ADDRESS, grpc::InsecureServerCredentials()); // Register "service" as the instance through which we'll communicate with // clients. In this case it corresponds to an *synchronous* service. builder.RegisterService(&service); std::unique_ptr server(builder.BuildAndStart()); std::cout << "gRPC Server listening at :" << SERVER_LISTEN_ADDRESS << std::endl; // Wait for the server to shutdown. Note that some other thread must be // responsible for shutting down the server for this call to ever return. server->Wait(); } +void RunAmqpClient() { + AmqpManager::getInstance().connect(); +} + } // namespace network } // namespace comm int main(int argc, char **argv) { comm::network::config::ConfigManager::getInstance().load(); - std::thread amqpThread(comm::network::AMQPConnect); + std::thread amqpThread(comm::network::RunAmqpClient); std::thread grpcThread(comm::network::RunServer); amqpThread.join(); grpcThread.join(); return 0; }