diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/src/Amqp/AmqpManager.cpp index 8a3aeb6f7..e4013ab62 100644 --- a/services/tunnelbroker/src/Amqp/AmqpManager.cpp +++ b/services/tunnelbroker/src/Amqp/AmqpManager.cpp @@ -1,147 +1,146 @@ #include "AmqpManager.h" #include "ConfigManager.h" #include "Constants.h" #include "DeliveryBroker.h" #include "Tools.h" +#include + #include namespace comm { namespace network { AmqpManager &AmqpManager::getInstance() { static AmqpManager instance; return instance; } void AmqpManager::connectInternal() { const std::string amqpUri = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_URI); const std::string tunnelbrokerID = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_TUNNELBROKER_ID); const std::string fanoutExchangeName = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_FANOUT_EXCHANGE); - std::cout << "AMQP: Connecting to " << amqpUri << std::endl; + LOG(INFO) << "AMQP: Connecting to " << amqpUri; auto *loop = uv_default_loop(); AMQP::LibUvHandler handler(loop); AMQP::TcpConnection connection(&handler, AMQP::Address(amqpUri)); this->amqpChannel = std::make_unique(&connection); this->amqpChannel->onError([this](const char *message) { - std::cout << "AMQP: channel error: " << message << ", will try to reconnect" - << std::endl; + LOG(ERROR) << "AMQP: channel error: " << message + << ", will try to reconnect"; this->amqpReady = false; }); AMQP::Table arguments; arguments["x-message-ttl"] = std::to_string(AMQP_MESSAGE_TTL); arguments["x-expires"] = std::to_string(AMQP_QUEUE_TTL); this->amqpChannel->declareExchange(fanoutExchangeName, AMQP::fanout); this->amqpChannel->declareQueue(tunnelbrokerID, AMQP::durable, arguments) .onSuccess([this, tunnelbrokerID, fanoutExchangeName]( const std::string &name, uint32_t messagecount, uint32_t consumercount) { - std::cout << "AMQP: Queue " << name << " created" << std::endl; + LOG(INFO) << "AMQP: Queue " << name << " created"; this->amqpChannel->bindQueue(fanoutExchangeName, tunnelbrokerID, "") .onError([this, tunnelbrokerID, fanoutExchangeName]( const char *message) { - std::cout << "AMQP: Failed to bind queue: " << tunnelbrokerID - << " to exchange: " << fanoutExchangeName << std::endl; + LOG(ERROR) << "AMQP: Failed to bind queue: " << tunnelbrokerID + << " to exchange: " << fanoutExchangeName; this->amqpReady = false; }); this->amqpReady = true; this->amqpChannel->consume(tunnelbrokerID) .onReceived([](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { try { AMQP::Table headers = message.headers(); const std::string payload(message.body()); const std::string messageID(headers[AMQP_HEADER_MESSAGEID]); const std::string toDeviceID(headers[AMQP_HEADER_TO_DEVICEID]); const std::string fromDeviceID( headers[AMQP_HEADER_FROM_DEVICEID]); - std::cout << "AMQP: Message consumed for deviceID: " - << toDeviceID << std::endl; + LOG(INFO) << "AMQP: Message consumed for deviceID: " + << toDeviceID; DeliveryBroker::getInstance().push( messageID, deliveryTag, toDeviceID, fromDeviceID, payload); } catch (const std::exception &e) { - std::cout << "AMQP: Message parsing exception: " << e.what() - << std::endl; + LOG(ERROR) << "AMQP: Message parsing exception: " << e.what(); } }) .onError([](const char *message) { - std::cout << "AMQP: Error on message consume: " << message - << std::endl; + LOG(ERROR) << "AMQP: Error on message consume: " << message; }); }) .onError([](const char *message) { throw std::runtime_error( "AMQP: Queue creation error: " + std::string(message)); }); uv_run(loop, UV_RUN_DEFAULT); }; void AmqpManager::connect() { while (true) { int64_t currentTimestamp = tools::getCurrentTimestamp(); if (this->lastConnectionTimestamp && currentTimestamp - this->lastConnectionTimestamp < AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) { throw std::runtime_error( "AMQP reconnection attempt interval too short, tried to reconnect " "after " + std::to_string(currentTimestamp - this->lastConnectionTimestamp) + "ms, the shortest allowed interval is " + std::to_string(AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) + "ms"); } this->lastConnectionTimestamp = currentTimestamp; this->connectInternal(); } } bool AmqpManager::send( std::string messageID, std::string fromDeviceID, std::string toDeviceID, std::string payload) { if (!this->amqpReady) { - std::cout << "AMQP: Message send error: channel not ready" << std::endl; + LOG(ERROR) << "AMQP: Message send error: channel not ready"; return false; } try { AMQP::Envelope env(payload.c_str(), payload.size()); AMQP::Table headers; headers[AMQP_HEADER_MESSAGEID] = messageID; headers[AMQP_HEADER_FROM_DEVICEID] = fromDeviceID; headers[AMQP_HEADER_TO_DEVICEID] = toDeviceID; // Set delivery mode to: Durable (2) env.setDeliveryMode(2); env.setHeaders(std::move(headers)); this->amqpChannel->publish( config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_FANOUT_EXCHANGE), "", env); } catch (std::runtime_error &e) { - std::cout << "AMQP: Error while publishing message: " << e.what() - << std::endl; + LOG(ERROR) << "AMQP: Error while publishing message: " << e.what(); return false; } return true; }; void AmqpManager::ack(uint64_t deliveryTag) { if (!this->amqpReady) { - std::cout << "AMQP: Message ACK error: channel not ready" << std::endl; + LOG(ERROR) << "AMQP: Message ACK error: channel not ready"; return; } this->amqpChannel->ack(deliveryTag); } } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.cpp b/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.cpp index 46e217db4..f450cdfaf 100644 --- a/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.cpp +++ b/services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.cpp @@ -1,68 +1,70 @@ #include "DeliveryBroker.h" +#include + namespace comm { namespace network { DeliveryBroker &DeliveryBroker::getInstance() { static DeliveryBroker instance; return instance; }; void DeliveryBroker::push( const std::string messageID, const uint64_t deliveryTag, const std::string toDeviceID, const std::string fromDeviceID, const std::string payload) { try { if (this->messagesMap.find(toDeviceID) == this->messagesMap.end()) { this->messagesMap.insert( toDeviceID, std::make_unique( DELIVERY_BROKER_MAX_QUEUE_SIZE)); } this->messagesMap.find(toDeviceID) ->second->blockingWrite(DeliveryBrokerMessage{ .messageID = messageID, .deliveryTag = deliveryTag, .fromDeviceID = fromDeviceID, .payload = payload}); } catch (const std::exception &e) { - std::cout << "DeliveryBroker push: " - << "Got an exception " << e.what() << std::endl; + LOG(ERROR) << "DeliveryBroker push: " + << "Got an exception " << e.what(); } }; bool DeliveryBroker::isEmpty(const std::string deviceID) { if (this->messagesMap.find(deviceID) == this->messagesMap.end()) { return true; }; return this->messagesMap.find(deviceID)->second->isEmpty(); }; DeliveryBrokerMessage DeliveryBroker::pop(const std::string deviceID) { try { // If we don't already have a queue, insert it for the blocking read purpose // in case we listen first before the insert happens. if (this->messagesMap.find(deviceID) == this->messagesMap.end()) { this->messagesMap.insert( deviceID, std::make_unique( DELIVERY_BROKER_MAX_QUEUE_SIZE)); } DeliveryBrokerMessage receievedMessage; this->messagesMap.find(deviceID)->second->blockingRead(receievedMessage); return receievedMessage; } catch (const std::exception &e) { - std::cout << "DeliveryBroker pop: " - << "Got an exception " << e.what() << std::endl; + LOG(ERROR) << "DeliveryBroker pop: " + << "Got an exception " << e.what(); } return {}; }; void DeliveryBroker::erase(const std::string deviceID) { this->messagesMap.erase(deviceID); }; } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp index 57ec7273a..dfc1a73fe 100644 --- a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp +++ b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp @@ -1,227 +1,223 @@ #include "TunnelbrokerServiceImpl.h" - #include "AmqpManager.h" #include "AwsTools.h" #include "ConfigManager.h" #include "CryptoTools.h" #include "DatabaseManager.h" #include "DeliveryBroker.h" #include "Tools.h" +#include + namespace comm { namespace network { TunnelBrokerServiceImpl::TunnelBrokerServiceImpl() { Aws::InitAPI({}); // List of AWS DynamoDB tables to check if they are created and can be // accessed before any AWS API methods const std::list tablesList = { config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_TABLE), config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_VERIFICATION_TABLE), config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_PUBLIC_KEY_TABLE), config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_MESSAGES_TABLE)}; for (const std::string &table : tablesList) { if (!database::DatabaseManager::getInstance().isTableAvailable(table)) { throw std::runtime_error( "Error: AWS DynamoDB table '" + table + "' is not available"); } }; }; TunnelBrokerServiceImpl::~TunnelBrokerServiceImpl() { Aws::ShutdownAPI({}); }; grpc::Status TunnelBrokerServiceImpl::SessionSignature( grpc::ServerContext *context, const tunnelbroker::SessionSignatureRequest *request, tunnelbroker::SessionSignatureResponse *reply) { const std::string deviceID = request->deviceid(); if (!tools::validateDeviceID(deviceID)) { - std::cout << "gRPC: " - << "Format validation failed for " << deviceID << std::endl; + LOG(INFO) << "gRPC: " + << "Format validation failed for " << deviceID; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for deviceID"); } const std::string toSign = tools::generateRandomString(SIGNATURE_REQUEST_LENGTH); std::shared_ptr SessionSignItem = std::make_shared(toSign, deviceID); database::DatabaseManager::getInstance().putSessionSignItem(*SessionSignItem); reply->set_tosign(toSign); return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::NewSession( grpc::ServerContext *context, const tunnelbroker::NewSessionRequest *request, tunnelbroker::NewSessionResponse *reply) { std::shared_ptr deviceSessionItem; std::shared_ptr sessionSignItem; std::shared_ptr publicKeyItem; const std::string deviceID = request->deviceid(); if (!tools::validateDeviceID(deviceID)) { - std::cout << "gRPC: " - << "Format validation failed for " << deviceID << std::endl; + LOG(INFO) << "gRPC: " + << "Format validation failed for " << deviceID; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for deviceID"); } const std::string signature = request->signature(); const std::string publicKey = request->publickey(); const std::string newSessionID = tools::generateUUID(); try { sessionSignItem = database::DatabaseManager::getInstance().findSessionSignItem(deviceID); if (sessionSignItem == nullptr) { - std::cout << "gRPC: " - << "Session sign request not found for deviceID: " << deviceID - << std::endl; + LOG(INFO) << "gRPC: " + << "Session sign request not found for deviceID: " << deviceID; return grpc::Status( grpc::StatusCode::NOT_FOUND, "Session sign request not found"); } publicKeyItem = database::DatabaseManager::getInstance().findPublicKeyItem(deviceID); if (publicKeyItem == nullptr) { std::shared_ptr newPublicKeyItem = std::make_shared(deviceID, publicKey); database::DatabaseManager::getInstance().putPublicKeyItem( *newPublicKeyItem); } else if (publicKey != publicKeyItem->getPublicKey()) { - std::cout << "gRPC: " - << "The public key doesn't match for deviceID" << std::endl; + LOG(INFO) << "gRPC: " + << "The public key doesn't match for deviceID"; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "The public key doesn't match for deviceID"); } const std::string verificationMessage = sessionSignItem->getSign(); if (!comm::network::crypto::rsaVerifyString( publicKey, verificationMessage, signature)) { - std::cout << "gRPC: " - << "Signature for the verification message is not valid" - << std::endl; + LOG(INFO) << "gRPC: " + << "Signature for the verification message is not valid"; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "Signature for the verification message is not valid"); } database::DatabaseManager::getInstance().removeSessionSignItem(deviceID); deviceSessionItem = std::make_shared( newSessionID, deviceID, request->publickey(), request->notifytoken(), tunnelbroker::NewSessionRequest_DeviceTypes_Name(request->devicetype()), request->deviceappversion(), request->deviceos()); database::DatabaseManager::getInstance().putSessionItem(*deviceSessionItem); } catch (std::runtime_error &e) { - std::cout << "gRPC: " - << "Error while processing 'NewSession' request: " << e.what() - << std::endl; + LOG(ERROR) << "gRPC: " + << "Error while processing 'NewSession' request: " << e.what(); return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } reply->set_sessionid(newSessionID); return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::Send( grpc::ServerContext *context, const tunnelbroker::SendRequest *request, google::protobuf::Empty *reply) { try { const std::string sessionID = request->sessionid(); if (!tools::validateSessionID(sessionID)) { - std::cout << "gRPC: " - << "Format validation failed for " << sessionID << std::endl; + LOG(INFO) << "gRPC: " + << "Format validation failed for " << sessionID; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for sessionID"); } std::shared_ptr sessionItem = database::DatabaseManager::getInstance().findSessionItem(sessionID); if (sessionItem == nullptr) { - std::cout << "gRPC: " - << "Session " << sessionID << " not found" << std::endl; + LOG(INFO) << "gRPC: " + << "Session " << sessionID << " not found"; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "No such session found. SessionID: " + sessionID); } const std::string clientDeviceID = sessionItem->getDeviceID(); const std::string messageID = tools::generateUUID(); if (!AmqpManager::getInstance().send( messageID, clientDeviceID, request->todeviceid(), std::string(request->payload()))) { - std::cout << "gRPC: " - << "Error while publish the message to AMQP" << std::endl; + LOG(ERROR) << "gRPC: " + << "Error while publish the message to AMQP"; return grpc::Status( grpc::StatusCode::INTERNAL, "Error while publish the message to AMQP"); } } catch (std::runtime_error &e) { - std::cout << "gRPC: " - << "Error while processing 'Send' request: " << e.what() - << std::endl; + LOG(ERROR) << "gRPC: " + << "Error while processing 'Send' request: " << e.what(); return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::Get( grpc::ServerContext *context, const tunnelbroker::GetRequest *request, grpc::ServerWriter *writer) { try { const std::string sessionID = request->sessionid(); if (!tools::validateSessionID(sessionID)) { - std::cout << "gRPC: " - << "Format validation failed for " << sessionID << std::endl; + LOG(INFO) << "gRPC: " + << "Format validation failed for " << sessionID; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for sessionID"); } std::shared_ptr sessionItem = database::DatabaseManager::getInstance().findSessionItem(sessionID); if (sessionItem == nullptr) { - std::cout << "gRPC: " - << "Session " << sessionID << " not found" << std::endl; + LOG(INFO) << "gRPC: " + << "Session " << sessionID << " not found"; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "No such session found. SessionID: " + sessionID); } const std::string clientDeviceID = sessionItem->getDeviceID(); DeliveryBrokerMessage messageToDeliver; while (1) { messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID); tunnelbroker::GetResponse response; response.set_fromdeviceid(messageToDeliver.fromDeviceID); response.set_payload(messageToDeliver.payload); if (!writer->Write(response)) { throw std::runtime_error( "gRPC: 'Get' writer error on sending data to the client"); } comm::network::AmqpManager::getInstance().ack( messageToDeliver.deliveryTag); if (DeliveryBroker::getInstance().isEmpty(clientDeviceID)) { DeliveryBroker::getInstance().erase(clientDeviceID); } } } catch (std::runtime_error &e) { - std::cout << "gRPC: " - << "Error while processing 'Get' request: " << e.what() - << std::endl; + LOG(ERROR) << "gRPC: " + << "Error while processing 'Get' request: " << e.what(); return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return grpc::Status::OK; }; } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Tools/CryptoTools.cpp b/services/tunnelbroker/src/Tools/CryptoTools.cpp index 0fa344ea4..db395b8b7 100644 --- a/services/tunnelbroker/src/Tools/CryptoTools.cpp +++ b/services/tunnelbroker/src/Tools/CryptoTools.cpp @@ -1,46 +1,47 @@ #include "CryptoTools.h" #include #include #include #include #include #include +#include #include namespace comm { namespace network { namespace crypto { bool rsaVerifyString( const std::string &publicKeyBase64, const std::string &message, const std::string &signatureBase64) { CryptoPP::RSA::PublicKey publicKey; std::string decodedSignature; try { publicKey.Load(CryptoPP::StringSource( publicKeyBase64, true, new CryptoPP::Base64Decoder()) .Ref()); CryptoPP::StringSource stringSource( signatureBase64, true, new CryptoPP::Base64Decoder( new CryptoPP::StringSink(decodedSignature))); CryptoPP::RSASSA_PKCS1v15_SHA_Verifier verifierSha256(publicKey); return verifierSha256.VerifyMessage( reinterpret_cast(message.c_str()), message.length(), reinterpret_cast(decodedSignature.c_str()), decodedSignature.length()); } catch (const std::exception &e) { - std::cout << "CryptoTools: " - << "Got an exception " << e.what() << std::endl; + LOG(ERROR) << "CryptoTools: " + << "Got an exception " << e.what(); return false; } } } // namespace crypto } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Tools/Tools.cpp b/services/tunnelbroker/src/Tools/Tools.cpp index 1138a76f9..6650597f5 100644 --- a/services/tunnelbroker/src/Tools/Tools.cpp +++ b/services/tunnelbroker/src/Tools/Tools.cpp @@ -1,87 +1,86 @@ #include "Tools.h" #include "ConfigManager.h" #include "Constants.h" +#include #include #include #include #include #include #include #include #include namespace comm { namespace network { namespace tools { std::string generateRandomString(std::size_t length) { const std::string CHARACTERS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; thread_local std::random_device generator; std::uniform_int_distribution<> distribution(0, CHARACTERS.size() - 1); std::string random_string; for (std::size_t i = 0; i < length; ++i) { random_string += CHARACTERS[distribution(generator)]; } return random_string; } int64_t getCurrentTimestamp() { using namespace std::chrono; return duration_cast(system_clock::now().time_since_epoch()) .count(); } bool validateDeviceID(std::string deviceID) { try { static const std::regex deviceIDKeyserverRegexp("^ks:.*"); if (std::regex_match(deviceID, deviceIDKeyserverRegexp)) { return ( deviceID == config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DEFAULT_KEYSERVER_ID)); } return std::regex_match(deviceID, DEVICEID_FORMAT_REGEX); } catch (const std::exception &e) { - std::cout << "Tools: " - << "Got an exception at `validateDeviceID`: " << e.what() - << std::endl; + LOG(ERROR) << "Tools: " + << "Got an exception at `validateDeviceID`: " << e.what(); return false; } } std::string generateUUID() { thread_local boost::uuids::random_generator random_generator; return boost::uuids::to_string(random_generator()); } bool validateSessionID(std::string sessionID) { try { return std::regex_match(sessionID, SESSION_ID_FORMAT_REGEX); } catch (const std::exception &e) { - std::cout << "Tools: " - << "Got an exception at `validateSessionId`: " << e.what() - << std::endl; + LOG(ERROR) << "Tools: " + << "Got an exception at `validateSessionId`: " << e.what(); return false; } } void checkIfNotEmpty(std::string fieldName, std::string stringToCheck) { if (stringToCheck.empty()) { throw std::runtime_error( "Error: Required text field " + fieldName + " is empty."); } } void checkIfNotZero(std::string fieldName, uint64_t numberToCheck) { if (numberToCheck == 0) { throw std::runtime_error( "Error: Required number " + fieldName + " is zero."); } } } // namespace tools } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/server.cpp b/services/tunnelbroker/src/server.cpp index d55de84f8..7862865c9 100644 --- a/services/tunnelbroker/src/server.cpp +++ b/services/tunnelbroker/src/server.cpp @@ -1,48 +1,48 @@ #include "AmqpManager.h" #include "ConfigManager.h" #include "Constants.h" #include "TunnelbrokerServiceImpl.h" +#include #include #include #include #include namespace comm { namespace network { void RunServer() { TunnelBrokerServiceImpl service; grpc::EnableDefaultHealthCheckService(true); grpc::ServerBuilder builder; // Listen on the given address without any authentication mechanism. builder.AddListeningPort( SERVER_LISTEN_ADDRESS, grpc::InsecureServerCredentials()); // Register "service" as the instance through which we'll communicate with // clients. In this case it corresponds to an *synchronous* service. builder.RegisterService(&service); std::unique_ptr server(builder.BuildAndStart()); - std::cout << "gRPC Server listening at :" << SERVER_LISTEN_ADDRESS - << std::endl; + LOG(INFO) << "gRPC Server listening at :" << SERVER_LISTEN_ADDRESS; // Wait for the server to shutdown. Note that some other thread must be // responsible for shutting down the server for this call to ever return. server->Wait(); } void RunAmqpClient() { AmqpManager::getInstance().connect(); } } // namespace network } // namespace comm int main(int argc, char **argv) { + google::InitGoogleLogging(argv[0]); comm::network::config::ConfigManager::getInstance().load(); - std::thread amqpThread(comm::network::RunAmqpClient); std::thread grpcThread(comm::network::RunServer); amqpThread.join(); grpcThread.join(); return 0; }