diff --git a/services/tunnelbroker/src/Database/DatabaseManager.h b/services/tunnelbroker/src/Database/DatabaseManager.h --- a/services/tunnelbroker/src/Database/DatabaseManager.h +++ b/services/tunnelbroker/src/Database/DatabaseManager.h @@ -24,6 +24,10 @@ namespace database { class DatabaseManager : public DatabaseManagerBase { +private: + template + T populatePutRequestFromMessageItem(T &PutRequest, const MessageItem &item); + public: static DatabaseManager &getInstance(); bool isTableAvailable(const std::string &tableName); @@ -43,6 +47,7 @@ void removePublicKeyItem(const std::string &deviceID); void putMessageItem(const MessageItem &item); + void putMessageItemsByBatch(std::vector &messageItems); std::shared_ptr findMessageItem(const std::string &messageID); std::vector> findMessageItemsByReceiver(const std::string &toDeviceID); diff --git a/services/tunnelbroker/src/Database/DatabaseManager.cpp b/services/tunnelbroker/src/Database/DatabaseManager.cpp --- a/services/tunnelbroker/src/Database/DatabaseManager.cpp +++ b/services/tunnelbroker/src/Database/DatabaseManager.cpp @@ -135,35 +135,58 @@ this->innerRemoveItem(*item); } -void DatabaseManager::putMessageItem(const MessageItem &item) { - Aws::DynamoDB::Model::PutItemRequest request; - request.SetTableName(item.getTableName()); - request.AddItem( +template +T DatabaseManager::populatePutRequestFromMessageItem( + T &PutRequest, + const MessageItem &item) { + PutRequest.AddItem( MessageItem::FIELD_MESSAGE_ID, Aws::DynamoDB::Model::AttributeValue(item.getMessageID())); - request.AddItem( + PutRequest.AddItem( MessageItem::FIELD_FROM_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getFromDeviceID())); - request.AddItem( + PutRequest.AddItem( MessageItem::FIELD_TO_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getToDeviceID())); - request.AddItem( + PutRequest.AddItem( MessageItem::FIELD_PAYLOAD, Aws::DynamoDB::Model::AttributeValue(item.getPayload())); - request.AddItem( + PutRequest.AddItem( MessageItem::FIELD_BLOB_HASHES, Aws::DynamoDB::Model::AttributeValue(item.getBlobHashes())); - request.AddItem( + PutRequest.AddItem( MessageItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0) + MESSAGE_RECORD_TTL)))); - request.AddItem( + PutRequest.AddItem( MessageItem::FIELD_CREATED_AT, Aws::DynamoDB::Model::AttributeValue( std::to_string(tools::getCurrentTimestamp()))); + return PutRequest; +} + +void DatabaseManager::putMessageItem(const MessageItem &item) { + Aws::DynamoDB::Model::PutItemRequest request; + request = this->populatePutRequestFromMessageItem(request, item); + request.SetTableName(item.getTableName()); this->innerPutItem(std::make_shared(item), request); } +void DatabaseManager::putMessageItemsByBatch( + std::vector &messageItems) { + std::vector writeRequests; + for (MessageItem &messageItem : messageItems) { + Aws::DynamoDB::Model::PutRequest putRequest; + putRequest = + this->populatePutRequestFromMessageItem(putRequest, messageItem); + Aws::DynamoDB::Model::WriteRequest curWriteRequest; + curWriteRequest.SetPutRequest(putRequest); + writeRequests.push_back(curWriteRequest); + } + this->innerBatchWriteItem( + MESSAGES_TABLE_NAME, MAX_DYNAMODB_BATCH_ITEMS, writeRequests); +} + std::shared_ptr DatabaseManager::findMessageItem(const std::string &messageID) { Aws::DynamoDB::Model::GetItemRequest request;