diff --git a/services/tunnelbroker/src/Database/DatabaseManager.h b/services/tunnelbroker/src/Database/DatabaseManager.h --- a/services/tunnelbroker/src/Database/DatabaseManager.h +++ b/services/tunnelbroker/src/Database/DatabaseManager.h @@ -24,6 +24,10 @@ namespace database { class DatabaseManager : public DatabaseManagerBase { +private: + template + T populatePutRequestFromMessageItem(T &putRequest, const MessageItem &item); + public: static DatabaseManager &getInstance(); bool isTableAvailable(const std::string &tableName); @@ -43,6 +47,7 @@ void removePublicKeyItem(const std::string &deviceID); void putMessageItem(const MessageItem &item); + void putMessageItemsByBatch(std::vector &messageItems); std::shared_ptr findMessageItem(const std::string &messageID); std::vector> findMessageItemsByReceiver(const std::string &toDeviceID); diff --git a/services/tunnelbroker/src/Database/DatabaseManager.cpp b/services/tunnelbroker/src/Database/DatabaseManager.cpp --- a/services/tunnelbroker/src/Database/DatabaseManager.cpp +++ b/services/tunnelbroker/src/Database/DatabaseManager.cpp @@ -135,35 +135,62 @@ this->innerRemoveItem(*item); } -void DatabaseManager::putMessageItem(const MessageItem &item) { - Aws::DynamoDB::Model::PutItemRequest request; - request.SetTableName(item.getTableName()); - request.AddItem( +template +T DatabaseManager::populatePutRequestFromMessageItem( + T &putRequest, + const MessageItem &item) { + putRequest.AddItem( MessageItem::FIELD_MESSAGE_ID, Aws::DynamoDB::Model::AttributeValue(item.getMessageID())); - request.AddItem( + putRequest.AddItem( MessageItem::FIELD_FROM_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getFromDeviceID())); - request.AddItem( + putRequest.AddItem( MessageItem::FIELD_TO_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getToDeviceID())); - request.AddItem( + putRequest.AddItem( MessageItem::FIELD_PAYLOAD, Aws::DynamoDB::Model::AttributeValue(item.getPayload())); - request.AddItem( + putRequest.AddItem( MessageItem::FIELD_BLOB_HASHES, Aws::DynamoDB::Model::AttributeValue(item.getBlobHashes())); - request.AddItem( + putRequest.AddItem( MessageItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0) + MESSAGE_RECORD_TTL)))); - request.AddItem( + putRequest.AddItem( MessageItem::FIELD_CREATED_AT, Aws::DynamoDB::Model::AttributeValue( std::to_string(tools::getCurrentTimestamp()))); + return putRequest; +} + +void DatabaseManager::putMessageItem(const MessageItem &item) { + Aws::DynamoDB::Model::PutItemRequest request; + request = this->populatePutRequestFromMessageItem(request, item); + request.SetTableName(item.getTableName()); this->innerPutItem(std::make_shared(item), request); } +void DatabaseManager::putMessageItemsByBatch( + std::vector &messageItems) { + std::vector writeRequests; + for (MessageItem &messageItem : messageItems) { + Aws::DynamoDB::Model::PutRequest putRequest; + putRequest = + this->populatePutRequestFromMessageItem(putRequest, messageItem); + Aws::DynamoDB::Model::WriteRequest writeRequest; + writeRequest.SetPutRequest(putRequest); + writeRequests.push_back(writeRequest); + } + this->innerBatchWriteItem( + MESSAGES_TABLE_NAME, + DYNAMODB_MAX_BATCH_ITEMS, + DYNAMODB_BACKOFF_FIRST_RETRY_DELAY, + DYNAMODB_MAX_BACKOFF_TIME, + writeRequests); +} + std::shared_ptr DatabaseManager::findMessageItem(const std::string &messageID) { Aws::DynamoDB::Model::GetItemRequest request;