diff --git a/services/blob/src/DatabaseManager.cpp b/services/blob/src/DatabaseManager.cpp index a0c420bcf..aadb44a9c 100644 --- a/services/blob/src/DatabaseManager.cpp +++ b/services/blob/src/DatabaseManager.cpp @@ -1,120 +1,120 @@ #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" #include #include #include #include namespace comm { namespace network { namespace database { DatabaseManager &DatabaseManager::getInstance() { static DatabaseManager instance; return instance; } void DatabaseManager::putBlobItem(const BlobItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(BlobItem::TABLE_NAME); request.AddItem( BlobItem::FIELD_BLOB_HASH, Aws::DynamoDB::Model::AttributeValue(item.getBlobHash())); request.AddItem( BlobItem::FIELD_S3_PATH, Aws::DynamoDB::Model::AttributeValue(item.getS3Path().getFullPath())); request.AddItem( BlobItem::FIELD_CREATED, Aws::DynamoDB::Model::AttributeValue( std::to_string(tools::getCurrentTimestamp()))); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findBlobItem(const std::string &blobHash) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( BlobItem::FIELD_BLOB_HASH, Aws::DynamoDB::Model::AttributeValue(blobHash)); - return std::move(this->innerFindItem(request)); + return this->innerFindItem(request); } void DatabaseManager::removeBlobItem(const std::string &blobHash) { std::shared_ptr item = this->findBlobItem(blobHash); if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::putReverseIndexItem(const ReverseIndexItem &item) { if (this->findReverseIndexItemByHolder(item.getHolder()) != nullptr) { throw std::runtime_error( "An item for the given holder [" + item.getHolder() + "] already exists"); } Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(ReverseIndexItem::TABLE_NAME); request.AddItem( ReverseIndexItem::FIELD_HOLDER, Aws::DynamoDB::Model::AttributeValue(item.getHolder())); request.AddItem( ReverseIndexItem::FIELD_BLOB_HASH, Aws::DynamoDB::Model::AttributeValue(item.getBlobHash())); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findReverseIndexItemByHolder(const std::string &holder) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( ReverseIndexItem::FIELD_HOLDER, Aws::DynamoDB::Model::AttributeValue(holder)); - return std::move(this->innerFindItem(request)); + return this->innerFindItem(request); } std::vector> DatabaseManager::findReverseIndexItemsByHash(const std::string &blobHash) { std::vector> result; Aws::DynamoDB::Model::QueryRequest req; req.SetTableName(ReverseIndexItem::TABLE_NAME); req.SetKeyConditionExpression("blobHash = :valueToMatch"); AttributeValues attributeValues; attributeValues.emplace(":valueToMatch", blobHash); req.SetExpressionAttributeValues(attributeValues); req.SetIndexName("blobHash-index"); const Aws::DynamoDB::Model::QueryOutcome &outcome = getDynamoDBClient()->Query(req); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } const Aws::Vector &items = outcome.GetResult().GetItems(); for (auto &item : items) { result.push_back(std::make_shared(item)); } return result; } void DatabaseManager::removeReverseIndexItem(const std::string &holder) { std::shared_ptr item = findReverseIndexItemByHolder(holder); if (item == nullptr) { return; } this->innerRemoveItem(*item); } } // namespace database } // namespace network } // namespace comm diff --git a/services/lib/src/DatabaseManagerBase.h b/services/lib/src/DatabaseManagerBase.h index f7fd35105..b27a5c94c 100644 --- a/services/lib/src/DatabaseManagerBase.h +++ b/services/lib/src/DatabaseManagerBase.h @@ -1,56 +1,56 @@ #pragma once #include "DatabaseEntitiesTools.h" #include "DynamoDBTools.h" #include #include #include #include namespace comm { namespace network { namespace database { // this class should be thread-safe in case any shared resources appear class DatabaseManagerBase { protected: void innerPutItem( std::shared_ptr item, const Aws::DynamoDB::Model::PutItemRequest &request); template std::shared_ptr innerFindItem(Aws::DynamoDB::Model::GetItemRequest &request); void innerRemoveItem(const Item &item); void innerBatchWriteItem( const std::string &tableName, const size_t &chunkSize, const size_t &backoffFirstRetryDelay, const size_t &maxBackoffTime, std::vector &writeRequests); }; template std::shared_ptr DatabaseManagerBase::innerFindItem( Aws::DynamoDB::Model::GetItemRequest &request) { std::shared_ptr item = createItemByType(); request.SetTableName(item->getTableName()); const Aws::DynamoDB::Model::GetItemOutcome &outcome = getDynamoDBClient()->GetItem(request); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } const AttributeValues &outcomeItem = outcome.GetResult().GetItem(); if (!outcomeItem.size()) { return nullptr; } item->assignItemFromDatabase(outcomeItem); - return std::move(item); + return item; } } // namespace database } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/libcpp/src/Database/DatabaseManager.cpp b/services/tunnelbroker/src/libcpp/src/Database/DatabaseManager.cpp index 6817ddb19..44b4b1f4f 100644 --- a/services/tunnelbroker/src/libcpp/src/Database/DatabaseManager.cpp +++ b/services/tunnelbroker/src/libcpp/src/Database/DatabaseManager.cpp @@ -1,351 +1,351 @@ #include "DatabaseManager.h" #include "DynamoDBTools.h" #include "GlobalTools.h" #include namespace comm { namespace network { namespace database { DatabaseManager &DatabaseManager::getInstance() { static DatabaseManager instance; return instance; } bool DatabaseManager::isTableAvailable(const std::string &tableName) { Aws::DynamoDB::Model::DescribeTableRequest request; request.SetTableName(tableName); // Check table availability by invoking DescribeTable const Aws::DynamoDB::Model::DescribeTableOutcome &result = getDynamoDBClient()->DescribeTable(request); return result.IsSuccess(); } void DatabaseManager::putSessionItem(const DeviceSessionItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( DeviceSessionItem::FIELD_SESSION_ID, Aws::DynamoDB::Model::AttributeValue(item.getSessionID())); request.AddItem( DeviceSessionItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getDeviceID())); request.AddItem( DeviceSessionItem::FIELD_PUBKEY, Aws::DynamoDB::Model::AttributeValue(item.getPubKey())); request.AddItem( DeviceSessionItem::FIELD_NOTIFY_TOKEN, Aws::DynamoDB::Model::AttributeValue(item.getNotifyToken())); request.AddItem( DeviceSessionItem::FIELD_DEVICE_TYPE, Aws::DynamoDB::Model::AttributeValue(item.getDeviceType())); request.AddItem( DeviceSessionItem::FIELD_APP_VERSION, Aws::DynamoDB::Model::AttributeValue(item.getAppVersion())); request.AddItem( DeviceSessionItem::FIELD_DEVICE_OS, Aws::DynamoDB::Model::AttributeValue(item.getDeviceOs())); request.AddItem( DeviceSessionItem::FIELD_CHECKPOINT_TIME, Aws::DynamoDB::Model::AttributeValue( std::to_string(item.getCheckpointTime()))); request.AddItem( DeviceSessionItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0)) + SESSION_RECORD_TTL))); request.AddItem( DeviceSessionItem::FIELD_IS_ONLINE, Aws::DynamoDB::Model::AttributeValue().SetBool(false)); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findSessionItem(const std::string &sessionID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( DeviceSessionItem::FIELD_SESSION_ID, Aws::DynamoDB::Model::AttributeValue(sessionID)); - return std::move(this->innerFindItem(request)); + return this->innerFindItem(request); } void DatabaseManager::removeSessionItem(const std::string &sessionID) { std::shared_ptr item = this->findSessionItem(sessionID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::updateSessionItemIsOnline( const std::string &sessionID, bool isOnline) { std::shared_ptr item = this->findSessionItem(sessionID); if (item == nullptr) { LOG(ERROR) << "Can't find for update sessionItem for sessionID: " << sessionID; return; } Aws::DynamoDB::Model::UpdateItemRequest request; request.SetTableName(item->getTableName()); Aws::DynamoDB::Model::AttributeValue attributeKeyValue; attributeKeyValue.SetS(sessionID); request.AddKey(DeviceSessionItem::FIELD_SESSION_ID, attributeKeyValue); Aws::String update_expression("SET #a = :valueA"); request.SetUpdateExpression(update_expression); Aws::Map expressionAttributeNames; expressionAttributeNames["#a"] = DeviceSessionItem::FIELD_IS_ONLINE; request.SetExpressionAttributeNames(expressionAttributeNames); Aws::DynamoDB::Model::AttributeValue attributeUpdatedValue; attributeUpdatedValue.SetBool(isOnline); Aws::Map expressionAttributeValue; expressionAttributeValue[":valueA"] = attributeUpdatedValue; request.SetExpressionAttributeValues(expressionAttributeValue); const Aws::DynamoDB::Model::UpdateItemOutcome &result = getDynamoDBClient()->UpdateItem(request); if (!result.IsSuccess()) { LOG(ERROR) << "Error updating device online status at " "`updateSessionItemIsOnline`: " << result.GetError().GetMessage(); } } bool DatabaseManager::updateSessionItemDeviceToken( const std::string &sessionID, const std::string &newDeviceToken) { std::shared_ptr item = this->findSessionItem(sessionID); if (item == nullptr) { LOG(ERROR) << "Can't find for update sessionItem for sessionID: " << sessionID; return false; } Aws::DynamoDB::Model::UpdateItemRequest request; request.SetTableName(item->getTableName()); Aws::DynamoDB::Model::AttributeValue attributeKeyValue; attributeKeyValue.SetS(sessionID); request.AddKey(DeviceSessionItem::FIELD_SESSION_ID, attributeKeyValue); Aws::String update_expression("SET #a = :valueA"); request.SetUpdateExpression(update_expression); Aws::Map expressionAttributeNames; expressionAttributeNames["#a"] = DeviceSessionItem::FIELD_NOTIFY_TOKEN; request.SetExpressionAttributeNames(expressionAttributeNames); Aws::DynamoDB::Model::AttributeValue attributeUpdatedValue; attributeUpdatedValue.SetS(newDeviceToken); Aws::Map expressionAttributeValue; expressionAttributeValue[":valueA"] = attributeUpdatedValue; request.SetExpressionAttributeValues(expressionAttributeValue); const Aws::DynamoDB::Model::UpdateItemOutcome &result = getDynamoDBClient()->UpdateItem(request); if (!result.IsSuccess()) { LOG(ERROR) << "Error updating device token at updateSessionItemDeviceToken: " << result.GetError().GetMessage(); return false; } return true; } void DatabaseManager::putSessionSignItem(const SessionSignItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( SessionSignItem::FIELD_SESSION_VERIFICATION, Aws::DynamoDB::Model::AttributeValue(item.getSign())); request.AddItem( SessionSignItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getDeviceID())); request.AddItem( SessionSignItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0)) + SESSION_SIGN_RECORD_TTL))); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findSessionSignItem(const std::string &deviceID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( SessionSignItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(deviceID)); - return std::move(this->innerFindItem(request)); + return this->innerFindItem(request); } void DatabaseManager::removeSessionSignItem(const std::string &deviceID) { std::shared_ptr item = this->findSessionSignItem(deviceID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::putPublicKeyItem(const PublicKeyItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( PublicKeyItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getDeviceID())); request.AddItem( PublicKeyItem::FIELD_PUBLIC_KEY, Aws::DynamoDB::Model::AttributeValue(item.getPublicKey())); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findPublicKeyItem(const std::string &deviceID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( PublicKeyItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(deviceID)); - return std::move(this->innerFindItem(request)); + return this->innerFindItem(request); } void DatabaseManager::removePublicKeyItem(const std::string &deviceID) { std::shared_ptr item = this->findPublicKeyItem(deviceID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } template T DatabaseManager::populatePutRequestFromMessageItem( T &putRequest, const MessageItem &item) { putRequest.AddItem( MessageItem::FIELD_MESSAGE_ID, Aws::DynamoDB::Model::AttributeValue(item.getMessageID())); putRequest.AddItem( MessageItem::FIELD_FROM_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getFromDeviceID())); putRequest.AddItem( MessageItem::FIELD_TO_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getToDeviceID())); putRequest.AddItem( MessageItem::FIELD_PAYLOAD, Aws::DynamoDB::Model::AttributeValue(item.getPayload())); putRequest.AddItem( MessageItem::FIELD_BLOB_HASHES, Aws::DynamoDB::Model::AttributeValue(item.getBlobHashes())); putRequest.AddItem( MessageItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0) + MESSAGE_RECORD_TTL)))); putRequest.AddItem( MessageItem::FIELD_CREATED_AT, Aws::DynamoDB::Model::AttributeValue( std::to_string(tools::getCurrentTimestamp()))); return putRequest; } void DatabaseManager::putMessageItem(const MessageItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request = this->populatePutRequestFromMessageItem(request, item); request.SetTableName(item.getTableName()); this->innerPutItem(std::make_shared(item), request); } void DatabaseManager::putMessageItemsByBatch( std::vector &messageItems) { std::vector writeRequests; for (MessageItem &messageItem : messageItems) { Aws::DynamoDB::Model::PutRequest putRequest; putRequest = this->populatePutRequestFromMessageItem(putRequest, messageItem); Aws::DynamoDB::Model::WriteRequest writeRequest; writeRequest.SetPutRequest(putRequest); writeRequests.push_back(writeRequest); } this->innerBatchWriteItem( messageItems[0].getTableName(), DYNAMODB_MAX_BATCH_ITEMS, DYNAMODB_BACKOFF_FIRST_RETRY_DELAY, DYNAMODB_MAX_BACKOFF_TIME, writeRequests); } std::shared_ptr DatabaseManager::findMessageItem( const std::string &toDeviceID, const std::string &messageID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( MessageItem::FIELD_TO_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(toDeviceID)); request.AddKey( MessageItem::FIELD_MESSAGE_ID, Aws::DynamoDB::Model::AttributeValue(messageID)); - return std::move(this->innerFindItem(request)); + return this->innerFindItem(request); } std::vector> DatabaseManager::findMessageItemsByReceiver(const std::string &toDeviceID) { std::vector> result; Aws::DynamoDB::Model::QueryRequest req; req.SetTableName(MessageItem().getTableName()); req.SetKeyConditionExpression( MessageItem::FIELD_TO_DEVICE_ID + " = :valueToMatch"); AttributeValues attributeValues; attributeValues.emplace(":valueToMatch", toDeviceID); req.SetExpressionAttributeValues(attributeValues); const Aws::DynamoDB::Model::QueryOutcome &outcome = getDynamoDBClient()->Query(req); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } const Aws::Vector &items = outcome.GetResult().GetItems(); for (auto &item : items) { result.push_back(std::make_shared(item)); } return result; } void DatabaseManager::removeMessageItem( const std::string &toDeviceID, const std::string &messageID) { std::shared_ptr item = this->findMessageItem(toDeviceID, messageID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::removeMessageItemsByIDsForDeviceID( std::vector &messageIDs, const std::string &toDeviceID) { std::vector writeRequests; for (std::string &messageID : messageIDs) { Aws::DynamoDB::Model::DeleteRequest deleteRequest; deleteRequest.AddKey( MessageItem::FIELD_TO_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(toDeviceID)); deleteRequest.AddKey( MessageItem::FIELD_MESSAGE_ID, Aws::DynamoDB::Model::AttributeValue(messageID)); Aws::DynamoDB::Model::WriteRequest currentWriteRequest; currentWriteRequest.SetDeleteRequest(deleteRequest); writeRequests.push_back(currentWriteRequest); } this->innerBatchWriteItem( MessageItem().getTableName(), DYNAMODB_MAX_BATCH_ITEMS, DYNAMODB_BACKOFF_FIRST_RETRY_DELAY, DYNAMODB_MAX_BACKOFF_TIME, writeRequests); } } // namespace database } // namespace network } // namespace comm