diff --git a/services/tunnelbroker/docker-server/contents/server/src/Database/DatabaseManager.cpp b/services/tunnelbroker/docker-server/contents/server/src/Database/DatabaseManager.cpp index f15ffb922..42d2228be 100644 --- a/services/tunnelbroker/docker-server/contents/server/src/Database/DatabaseManager.cpp +++ b/services/tunnelbroker/docker-server/contents/server/src/Database/DatabaseManager.cpp @@ -1,165 +1,202 @@ #include "DatabaseManager.h" namespace comm { namespace network { namespace database { DatabaseManager &DatabaseManager::getInstance() { static DatabaseManager instance; return instance; } bool DatabaseManager::isTableAvailable(const std::string &tableName) { Aws::DynamoDB::Model::DescribeTableRequest request; request.SetTableName(tableName); // Check table availability by invoking DescribeTable const Aws::DynamoDB::Model::DescribeTableOutcome &result = getDynamoDBClient()->DescribeTable(request); return result.IsSuccess(); } void DatabaseManager::innerPutItem( std::shared_ptr item, const Aws::DynamoDB::Model::PutItemRequest &request) { const Aws::DynamoDB::Model::PutItemOutcome outcome = getDynamoDBClient()->PutItem(request); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } } template std::shared_ptr DatabaseManager::innerFindItem(Aws::DynamoDB::Model::GetItemRequest &request) { std::shared_ptr item = createItemByType(); request.SetTableName(item->getTableName()); const Aws::DynamoDB::Model::GetItemOutcome &outcome = getDynamoDBClient()->GetItem(request); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } const AttributeValues &outcomeItem = outcome.GetResult().GetItem(); if (!outcomeItem.size()) { return nullptr; } item->assignItemFromDatabase(outcomeItem); return std::move(item); } void DatabaseManager::innerRemoveItem( const Item &item, const std::string &key) { Aws::DynamoDB::Model::DeleteItemRequest request; request.SetTableName(item.getTableName()); request.AddKey( item.getPrimaryKey(), Aws::DynamoDB::Model::AttributeValue(key)); const Aws::DynamoDB::Model::DeleteItemOutcome &outcome = getDynamoDBClient()->DeleteItem(request); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } } void DatabaseManager::putSessionItem(const DeviceSessionItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( DeviceSessionItem::FIELD_SESSION_ID, Aws::DynamoDB::Model::AttributeValue(item.getSessionID())); request.AddItem( DeviceSessionItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getDeviceID())); request.AddItem( DeviceSessionItem::FIELD_PUBKEY, Aws::DynamoDB::Model::AttributeValue(item.getPubKey())); request.AddItem( DeviceSessionItem::FIELD_NOTIFY_TOKEN, Aws::DynamoDB::Model::AttributeValue(item.getNotifyToken())); request.AddItem( DeviceSessionItem::FIELD_DEVICE_TYPE, Aws::DynamoDB::Model::AttributeValue(item.getDeviceType())); request.AddItem( DeviceSessionItem::FIELD_APP_VERSION, Aws::DynamoDB::Model::AttributeValue(item.getAppVersion())); request.AddItem( DeviceSessionItem::FIELD_DEVICE_OS, Aws::DynamoDB::Model::AttributeValue(item.getDeviceOs())); request.AddItem( DeviceSessionItem::FIELD_CHECKPOINT_TIME, Aws::DynamoDB::Model::AttributeValue( std::to_string(item.getCheckpointTime()))); request.AddItem( DeviceSessionItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0)) + SESSION_RECORD_TTL))); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findSessionItem(const std::string &sessionID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( DeviceSessionItem::FIELD_SESSION_ID, Aws::DynamoDB::Model::AttributeValue(sessionID)); return std::move(this->innerFindItem(request)); } void DatabaseManager::putSessionSignItem(const SessionSignItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( SessionSignItem::FIELD_SESSION_VERIFICATION, Aws::DynamoDB::Model::AttributeValue(item.getSign())); request.AddItem( SessionSignItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getDeviceID())); request.AddItem( SessionSignItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0)) + SESSION_SIGN_RECORD_TTL))); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findSessionSignItem(const std::string &deviceID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( SessionSignItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(deviceID)); return std::move(this->innerFindItem(request)); } void DatabaseManager::removeSessionSignItem(const std::string &deviceID) { this->innerRemoveItem(*(createItemByType()), deviceID); } void DatabaseManager::putPublicKeyItem(const PublicKeyItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( PublicKeyItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getDeviceID())); request.AddItem( PublicKeyItem::FIELD_PUBLIC_KEY, Aws::DynamoDB::Model::AttributeValue(item.getPublicKey())); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findPublicKeyItem(const std::string &deviceID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( PublicKeyItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(deviceID)); return std::move(this->innerFindItem(request)); } void DatabaseManager::removePublicKeyItem(const std::string &deviceID) { this->innerRemoveItem(*(createItemByType()), deviceID); } +void DatabaseManager::putMessageItem(const MessageItem &item) { + Aws::DynamoDB::Model::PutItemRequest request; + request.SetTableName(item.getTableName()); + request.AddItem( + MessageItem::FIELD_MESSAGE_ID, + Aws::DynamoDB::Model::AttributeValue(item.getMessageID())); + request.AddItem( + MessageItem::FIELD_FROM_DEVICE_ID, + Aws::DynamoDB::Model::AttributeValue(item.getFromDeviceID())); + request.AddItem( + MessageItem::FIELD_TO_DEVICE_ID, + Aws::DynamoDB::Model::AttributeValue(item.getToDeviceID())); + request.AddItem( + MessageItem::FIELD_PAYLOAD, + Aws::DynamoDB::Model::AttributeValue(item.getPayload())); + request.AddItem( + MessageItem::FIELD_BLOB_HASHES, + Aws::DynamoDB::Model::AttributeValue(item.getBlobHashes())); + request.AddItem( + MessageItem::FIELD_EXPIRE, + Aws::DynamoDB::Model::AttributeValue(std::to_string(item.getExpire()))); + this->innerPutItem(std::make_shared(item), request); +} + +std::shared_ptr +DatabaseManager::findMessageItem(const std::string &messageID) { + Aws::DynamoDB::Model::GetItemRequest request; + request.AddKey( + MessageItem::FIELD_MESSAGE_ID, + Aws::DynamoDB::Model::AttributeValue(messageID)); + return std::move(this->innerFindItem(request)); +} + +void DatabaseManager::removeMessageItem(const std::string &messageID) { + this->innerRemoveItem(*(createItemByType()), messageID); +} + } // namespace database } // namespace network } // namespace comm diff --git a/services/tunnelbroker/docker-server/contents/server/src/Database/DatabaseManager.h b/services/tunnelbroker/docker-server/contents/server/src/Database/DatabaseManager.h index b4a76c28b..a68a377eb 100644 --- a/services/tunnelbroker/docker-server/contents/server/src/Database/DatabaseManager.h +++ b/services/tunnelbroker/docker-server/contents/server/src/Database/DatabaseManager.h @@ -1,55 +1,58 @@ #pragma once #include "AwsTools.h" #include "Constants.h" #include "DatabaseEntitiesTools.h" -#include "DeviceSessionItem.h" #include "Tools.h" #include #include #include #include #include #include #include #include #include #include namespace comm { namespace network { namespace database { class DatabaseManager { void innerPutItem( std::shared_ptr item, const Aws::DynamoDB::Model::PutItemRequest &request); template std::shared_ptr innerFindItem(Aws::DynamoDB::Model::GetItemRequest &request); void innerRemoveItem(const Item &item, const std::string &key); public: static DatabaseManager &getInstance(); bool isTableAvailable(const std::string &tableName); void putSessionItem(const DeviceSessionItem &item); std::shared_ptr findSessionItem(const std::string &deviceID); void putSessionSignItem(const SessionSignItem &item); std::shared_ptr findSessionSignItem(const std::string &deviceID); void removeSessionSignItem(const std::string &deviceID); void putPublicKeyItem(const PublicKeyItem &item); std::shared_ptr findPublicKeyItem(const std::string &deviceID); void removePublicKeyItem(const std::string &deviceID); + + void putMessageItem(const MessageItem &item); + std::shared_ptr findMessageItem(const std::string &messageID); + void removeMessageItem(const std::string &messageID); }; } // namespace database } // namespace network } // namespace comm diff --git a/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp index bd1a9e093..66cb70bc7 100644 --- a/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp +++ b/services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp @@ -1,223 +1,225 @@ #include "TunnelbrokerServiceImpl.h" #include "AmqpManager.h" #include "AwsTools.h" #include "ConfigManager.h" #include "CryptoTools.h" #include "DatabaseManager.h" #include "DeliveryBroker.h" #include "Tools.h" namespace comm { namespace network { TunnelBrokerServiceImpl::TunnelBrokerServiceImpl() { Aws::InitAPI({}); // List of AWS DynamoDB tables to check if they are created and can be // accessed before any AWS API methods const std::list tablesList = { config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_TABLE), config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_VERIFICATION_TABLE), config::ConfigManager::getInstance().getParameter( - config::ConfigManager::OPTION_DYNAMODB_SESSIONS_PUBLIC_KEY_TABLE)}; + config::ConfigManager::OPTION_DYNAMODB_SESSIONS_PUBLIC_KEY_TABLE), + config::ConfigManager::getInstance().getParameter( + config::ConfigManager::OPTION_DYNAMODB_MESSAGES_TABLE)}; for (const std::string &table : tablesList) { if (!database::DatabaseManager::getInstance().isTableAvailable(table)) { throw std::runtime_error( "Error: AWS DynamoDB table '" + table + "' is not available"); } }; }; TunnelBrokerServiceImpl::~TunnelBrokerServiceImpl() { Aws::ShutdownAPI({}); }; grpc::Status TunnelBrokerServiceImpl::SessionSignature( grpc::ServerContext *context, const tunnelbroker::SessionSignatureRequest *request, tunnelbroker::SessionSignatureResponse *reply) { const std::string deviceID = request->deviceid(); if (!validateDeviceID(deviceID)) { std::cout << "gRPC: " << "Format validation failed for " << deviceID << std::endl; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for deviceID"); } const std::string toSign = generateRandomString(SIGNATURE_REQUEST_LENGTH); std::shared_ptr SessionSignItem = std::make_shared(toSign, deviceID); database::DatabaseManager::getInstance().putSessionSignItem(*SessionSignItem); reply->set_tosign(toSign); return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::NewSession( grpc::ServerContext *context, const tunnelbroker::NewSessionRequest *request, tunnelbroker::NewSessionResponse *reply) { std::shared_ptr deviceSessionItem; std::shared_ptr sessionSignItem; std::shared_ptr publicKeyItem; const std::string deviceID = request->deviceid(); if (!validateDeviceID(deviceID)) { std::cout << "gRPC: " << "Format validation failed for " << deviceID << std::endl; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for deviceID"); } const std::string signature = request->signature(); const std::string publicKey = request->publickey(); const std::string newSessionID = generateUUID(); try { sessionSignItem = database::DatabaseManager::getInstance().findSessionSignItem(deviceID); if (sessionSignItem == nullptr) { std::cout << "gRPC: " << "Session sign request not found for deviceID: " << deviceID << std::endl; return grpc::Status( grpc::StatusCode::NOT_FOUND, "Session sign request not found"); } publicKeyItem = database::DatabaseManager::getInstance().findPublicKeyItem(deviceID); if (publicKeyItem == nullptr) { std::shared_ptr newPublicKeyItem = std::make_shared(deviceID, publicKey); database::DatabaseManager::getInstance().putPublicKeyItem( *newPublicKeyItem); } else if (publicKey != publicKeyItem->getPublicKey()) { std::cout << "gRPC: " << "The public key doesn't match for deviceID" << std::endl; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "The public key doesn't match for deviceID"); } const std::string verificationMessage = sessionSignItem->getSign(); if (!comm::network::crypto::rsaVerifyString( publicKey, verificationMessage, signature)) { std::cout << "gRPC: " << "Signature for the verification message is not valid" << std::endl; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "Signature for the verification message is not valid"); } database::DatabaseManager::getInstance().removeSessionSignItem(deviceID); deviceSessionItem = std::make_shared( newSessionID, deviceID, request->publickey(), request->notifytoken(), tunnelbroker::NewSessionRequest_DeviceTypes_Name(request->devicetype()), request->deviceappversion(), request->deviceos()); database::DatabaseManager::getInstance().putSessionItem(*deviceSessionItem); } catch (std::runtime_error &e) { std::cout << "gRPC: " << "Error while processing 'NewSession' request: " << e.what() << std::endl; return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } reply->set_sessionid(newSessionID); return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::Send( grpc::ServerContext *context, const tunnelbroker::SendRequest *request, google::protobuf::Empty *reply) { try { const std::string sessionID = request->sessionid(); if (!validateSessionID(sessionID)) { std::cout << "gRPC: " << "Format validation failed for " << sessionID << std::endl; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for sessionID"); } std::shared_ptr sessionItem = database::DatabaseManager::getInstance().findSessionItem(sessionID); if (sessionItem == nullptr) { std::cout << "gRPC: " << "Session " << sessionID << " not found" << std::endl; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "No such session found. SessionID: " + sessionID); } const std::string clientDeviceID = sessionItem->getDeviceID(); if (!AmqpManager::getInstance().send( request->todeviceid(), clientDeviceID, std::string(request->payload()))) { std::cout << "gRPC: " << "Error while publish the message to AMQP" << std::endl; return grpc::Status( grpc::StatusCode::INTERNAL, "Error while publish the message to AMQP"); } } catch (std::runtime_error &e) { std::cout << "gRPC: " << "Error while processing 'Send' request: " << e.what() << std::endl; return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::Get( grpc::ServerContext *context, const tunnelbroker::GetRequest *request, grpc::ServerWriter *writer) { try { const std::string sessionID = request->sessionid(); if (!validateSessionID(sessionID)) { std::cout << "gRPC: " << "Format validation failed for " << sessionID << std::endl; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for sessionID"); } std::shared_ptr sessionItem = database::DatabaseManager::getInstance().findSessionItem(sessionID); if (sessionItem == nullptr) { std::cout << "gRPC: " << "Session " << sessionID << " not found" << std::endl; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "No such session found. SessionID: " + sessionID); } const std::string clientDeviceID = sessionItem->getDeviceID(); std::vector messagesToDeliver; while (1) { messagesToDeliver = DeliveryBroker::getInstance().get(clientDeviceID); for (auto const &message : messagesToDeliver) { tunnelbroker::GetResponse response; response.set_fromdeviceid(message.fromDeviceID); response.set_payload(message.payload); if (!writer->Write(response)) { throw std::runtime_error( "gRPC: 'Get' writer error on sending data to the client"); } comm::network::AmqpManager::getInstance().ack(message.deliveryTag); } if (!DeliveryBroker::getInstance().isEmpty(clientDeviceID)) { DeliveryBroker::getInstance().remove(clientDeviceID); } DeliveryBroker::getInstance().wait(clientDeviceID); } } catch (std::runtime_error &e) { std::cout << "gRPC: " << "Error while processing 'Get' request: " << e.what() << std::endl; return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return grpc::Status::OK; }; } // namespace network } // namespace comm