diff --git a/services/tunnelbroker/src/Database/DatabaseManager.cpp b/services/tunnelbroker/src/Database/DatabaseManager.cpp index 6a49ead00..38349e472 100644 --- a/services/tunnelbroker/src/Database/DatabaseManager.cpp +++ b/services/tunnelbroker/src/Database/DatabaseManager.cpp @@ -1,238 +1,265 @@ #include "DatabaseManager.h" #include "DynamoDBTools.h" #include "GlobalTools.h" namespace comm { namespace network { namespace database { DatabaseManager &DatabaseManager::getInstance() { static DatabaseManager instance; return instance; } bool DatabaseManager::isTableAvailable(const std::string &tableName) { Aws::DynamoDB::Model::DescribeTableRequest request; request.SetTableName(tableName); // Check table availability by invoking DescribeTable const Aws::DynamoDB::Model::DescribeTableOutcome &result = getDynamoDBClient()->DescribeTable(request); return result.IsSuccess(); } void DatabaseManager::putSessionItem(const DeviceSessionItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( DeviceSessionItem::FIELD_SESSION_ID, Aws::DynamoDB::Model::AttributeValue(item.getSessionID())); request.AddItem( DeviceSessionItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getDeviceID())); request.AddItem( DeviceSessionItem::FIELD_PUBKEY, Aws::DynamoDB::Model::AttributeValue(item.getPubKey())); request.AddItem( DeviceSessionItem::FIELD_NOTIFY_TOKEN, Aws::DynamoDB::Model::AttributeValue(item.getNotifyToken())); request.AddItem( DeviceSessionItem::FIELD_DEVICE_TYPE, Aws::DynamoDB::Model::AttributeValue(item.getDeviceType())); request.AddItem( DeviceSessionItem::FIELD_APP_VERSION, Aws::DynamoDB::Model::AttributeValue(item.getAppVersion())); request.AddItem( DeviceSessionItem::FIELD_DEVICE_OS, Aws::DynamoDB::Model::AttributeValue(item.getDeviceOs())); request.AddItem( DeviceSessionItem::FIELD_CHECKPOINT_TIME, Aws::DynamoDB::Model::AttributeValue( std::to_string(item.getCheckpointTime()))); request.AddItem( DeviceSessionItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0)) + SESSION_RECORD_TTL))); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findSessionItem(const std::string &sessionID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( DeviceSessionItem::FIELD_SESSION_ID, Aws::DynamoDB::Model::AttributeValue(sessionID)); return std::move(this->innerFindItem(request)); } void DatabaseManager::removeSessionItem(const std::string &sessionID) { std::shared_ptr item = this->findSessionItem(sessionID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::putSessionSignItem(const SessionSignItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( SessionSignItem::FIELD_SESSION_VERIFICATION, Aws::DynamoDB::Model::AttributeValue(item.getSign())); request.AddItem( SessionSignItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getDeviceID())); request.AddItem( SessionSignItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0)) + SESSION_SIGN_RECORD_TTL))); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findSessionSignItem(const std::string &deviceID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( SessionSignItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(deviceID)); return std::move(this->innerFindItem(request)); } void DatabaseManager::removeSessionSignItem(const std::string &deviceID) { std::shared_ptr item = this->findSessionSignItem(deviceID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::putPublicKeyItem(const PublicKeyItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( PublicKeyItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getDeviceID())); request.AddItem( PublicKeyItem::FIELD_PUBLIC_KEY, Aws::DynamoDB::Model::AttributeValue(item.getPublicKey())); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findPublicKeyItem(const std::string &deviceID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( PublicKeyItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(deviceID)); return std::move(this->innerFindItem(request)); } void DatabaseManager::removePublicKeyItem(const std::string &deviceID) { std::shared_ptr item = this->findPublicKeyItem(deviceID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } -void DatabaseManager::putMessageItem(const MessageItem &item) { - Aws::DynamoDB::Model::PutItemRequest request; - request.SetTableName(item.getTableName()); - request.AddItem( +template +T DatabaseManager::populatePutRequestFromMessageItem( + T &putRequest, + const MessageItem &item) { + putRequest.AddItem( MessageItem::FIELD_MESSAGE_ID, Aws::DynamoDB::Model::AttributeValue(item.getMessageID())); - request.AddItem( + putRequest.AddItem( MessageItem::FIELD_FROM_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getFromDeviceID())); - request.AddItem( + putRequest.AddItem( MessageItem::FIELD_TO_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getToDeviceID())); - request.AddItem( + putRequest.AddItem( MessageItem::FIELD_PAYLOAD, Aws::DynamoDB::Model::AttributeValue(item.getPayload())); - request.AddItem( + putRequest.AddItem( MessageItem::FIELD_BLOB_HASHES, Aws::DynamoDB::Model::AttributeValue(item.getBlobHashes())); - request.AddItem( + putRequest.AddItem( MessageItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0) + MESSAGE_RECORD_TTL)))); - request.AddItem( + putRequest.AddItem( MessageItem::FIELD_CREATED_AT, Aws::DynamoDB::Model::AttributeValue( std::to_string(tools::getCurrentTimestamp()))); + return putRequest; +} + +void DatabaseManager::putMessageItem(const MessageItem &item) { + Aws::DynamoDB::Model::PutItemRequest request; + request = this->populatePutRequestFromMessageItem(request, item); + request.SetTableName(item.getTableName()); this->innerPutItem(std::make_shared(item), request); } +void DatabaseManager::putMessageItemsByBatch( + std::vector &messageItems) { + std::vector writeRequests; + for (MessageItem &messageItem : messageItems) { + Aws::DynamoDB::Model::PutRequest putRequest; + putRequest = + this->populatePutRequestFromMessageItem(putRequest, messageItem); + Aws::DynamoDB::Model::WriteRequest writeRequest; + writeRequest.SetPutRequest(putRequest); + writeRequests.push_back(writeRequest); + } + this->innerBatchWriteItem( + MESSAGES_TABLE_NAME, + DYNAMODB_MAX_BATCH_ITEMS, + DYNAMODB_BACKOFF_FIRST_RETRY_DELAY, + DYNAMODB_MAX_BACKOFF_TIME, + writeRequests); +} + std::shared_ptr DatabaseManager::findMessageItem(const std::string &messageID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( MessageItem::FIELD_MESSAGE_ID, Aws::DynamoDB::Model::AttributeValue(messageID)); return std::move(this->innerFindItem(request)); } std::vector> DatabaseManager::findMessageItemsByReceiver(const std::string &toDeviceID) { std::vector> result; Aws::DynamoDB::Model::QueryRequest req; req.SetTableName(MessageItem().getTableName()); req.SetKeyConditionExpression( MessageItem::FIELD_TO_DEVICE_ID + " = :valueToMatch"); AttributeValues attributeValues; attributeValues.emplace(":valueToMatch", toDeviceID); req.SetExpressionAttributeValues(attributeValues); req.SetIndexName(MessageItem::INDEX_TO_DEVICE_ID); const Aws::DynamoDB::Model::QueryOutcome &outcome = getDynamoDBClient()->Query(req); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } const Aws::Vector &items = outcome.GetResult().GetItems(); for (auto &item : items) { result.push_back(std::make_shared(item)); } return result; } void DatabaseManager::removeMessageItem(const std::string &messageID) { std::shared_ptr item = this->findMessageItem(messageID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::removeMessageItemsByIDsForDeviceID( std::vector &messageIDs, const std::string &toDeviceID) { std::vector writeRequests; for (std::string &messageID : messageIDs) { Aws::DynamoDB::Model::DeleteRequest deleteRequest; deleteRequest.AddKey( MessageItem::FIELD_TO_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(toDeviceID)); deleteRequest.AddKey( MessageItem::FIELD_MESSAGE_ID, Aws::DynamoDB::Model::AttributeValue(messageID)); Aws::DynamoDB::Model::WriteRequest currentWriteRequest; currentWriteRequest.SetDeleteRequest(deleteRequest); writeRequests.push_back(currentWriteRequest); } this->innerBatchWriteItem( MESSAGES_TABLE_NAME, DYNAMODB_MAX_BATCH_ITEMS, DYNAMODB_BACKOFF_FIRST_RETRY_DELAY, DYNAMODB_MAX_BACKOFF_TIME, writeRequests); } } // namespace database } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Database/DatabaseManager.h b/services/tunnelbroker/src/Database/DatabaseManager.h index 32d29c4e6..354c60de9 100644 --- a/services/tunnelbroker/src/Database/DatabaseManager.h +++ b/services/tunnelbroker/src/Database/DatabaseManager.h @@ -1,57 +1,62 @@ #pragma once #include "AwsTools.h" #include "Constants.h" #include "DatabaseEntitiesTools.h" #include "DatabaseManagerBase.h" #include "Tools.h" #include #include #include #include #include #include #include #include #include #include #include namespace comm { namespace network { namespace database { class DatabaseManager : public DatabaseManagerBase { +private: + template + T populatePutRequestFromMessageItem(T &putRequest, const MessageItem &item); + public: static DatabaseManager &getInstance(); bool isTableAvailable(const std::string &tableName); void putSessionItem(const DeviceSessionItem &item); std::shared_ptr findSessionItem(const std::string &deviceID); void removeSessionItem(const std::string &sessionID); void putSessionSignItem(const SessionSignItem &item); std::shared_ptr findSessionSignItem(const std::string &deviceID); void removeSessionSignItem(const std::string &deviceID); void putPublicKeyItem(const PublicKeyItem &item); std::shared_ptr findPublicKeyItem(const std::string &deviceID); void removePublicKeyItem(const std::string &deviceID); void putMessageItem(const MessageItem &item); + void putMessageItemsByBatch(std::vector &messageItems); std::shared_ptr findMessageItem(const std::string &messageID); std::vector> findMessageItemsByReceiver(const std::string &toDeviceID); void removeMessageItem(const std::string &messageID); void removeMessageItemsByIDsForDeviceID( std::vector &messageIDs, const std::string &toDeviceID); }; } // namespace database } // namespace network } // namespace comm