diff --git a/services/tunnelbroker/src/Database/DatabaseManager.h b/services/tunnelbroker/src/Database/DatabaseManager.h
--- a/services/tunnelbroker/src/Database/DatabaseManager.h
+++ b/services/tunnelbroker/src/Database/DatabaseManager.h
@@ -43,6 +43,7 @@
   void removePublicKeyItem(const std::string &deviceID);
 
   void putMessageItem(const MessageItem &item);
+  void putMessageItemsByBatch(std::vector<MessageItem> &messageItems);
   std::shared_ptr<MessageItem> findMessageItem(const std::string &messageID);
   std::vector<std::shared_ptr<MessageItem>>
   findMessageItemsByReceiver(const std::string &toDeviceID);
diff --git a/services/tunnelbroker/src/Database/DatabaseManager.cpp b/services/tunnelbroker/src/Database/DatabaseManager.cpp
--- a/services/tunnelbroker/src/Database/DatabaseManager.cpp
+++ b/services/tunnelbroker/src/Database/DatabaseManager.cpp
@@ -164,6 +164,55 @@
   this->innerPutItem(std::make_shared<MessageItem>(item), request);
 }
 
+// Converts every item of `messageItems` into a DynamoDB put request and
+// passes the resulting write requests to `innerBatchWriteItem`, together
+// with `MESSAGES_TABLE_NAME` and `MAX_DYNAMODB_BATCH_ITEMS`.
+// An empty `messageItems` is a no-op.
+void DatabaseManager::putMessageItemsByBatch(
+    std::vector<MessageItem> &messageItems) {
+  // Nothing to persist, so skip the batch write entirely.
+  if (messageItems.empty()) {
+    return;
+  }
+  std::vector<Aws::DynamoDB::Model::WriteRequest> writeRequests;
+  // Exactly one write request is produced per message item.
+  writeRequests.reserve(messageItems.size());
+  for (MessageItem &messageItem : messageItems) {
+    Aws::DynamoDB::Model::PutRequest putRequest;
+    putRequest.AddItem(
+        MessageItem::FIELD_MESSAGE_ID,
+        Aws::DynamoDB::Model::AttributeValue(messageItem.getMessageID()));
+    putRequest.AddItem(
+        MessageItem::FIELD_FROM_DEVICE_ID,
+        Aws::DynamoDB::Model::AttributeValue(messageItem.getFromDeviceID()));
+    putRequest.AddItem(
+        MessageItem::FIELD_TO_DEVICE_ID,
+        Aws::DynamoDB::Model::AttributeValue(messageItem.getToDeviceID()));
+    putRequest.AddItem(
+        MessageItem::FIELD_PAYLOAD,
+        Aws::DynamoDB::Model::AttributeValue(messageItem.getPayload()));
+    putRequest.AddItem(
+        MessageItem::FIELD_BLOB_HASHES,
+        Aws::DynamoDB::Model::AttributeValue(messageItem.getBlobHashes()));
+    // The expire attribute is the current time plus MESSAGE_RECORD_TTL.
+    // NOTE(review): cast target reconstructed as size_t; confirm it matches
+    // the cast used elsewhere in this file for FIELD_EXPIRE.
+    putRequest.AddItem(
+        MessageItem::FIELD_EXPIRE,
+        Aws::DynamoDB::Model::AttributeValue(std::to_string(
+            static_cast<size_t>(std::time(nullptr) + MESSAGE_RECORD_TTL))));
+    putRequest.AddItem(
+        MessageItem::FIELD_CREATED_AT,
+        Aws::DynamoDB::Model::AttributeValue(
+            std::to_string(tools::getCurrentTimestamp())));
+    // Build the request in place to avoid copying a local WriteRequest.
+    writeRequests.emplace_back();
+    writeRequests.back().SetPutRequest(putRequest);
+  }
+  this->innerBatchWriteItem(
+      MESSAGES_TABLE_NAME, MAX_DYNAMODB_BATCH_ITEMS, writeRequests);
+}
+
 std::shared_ptr<MessageItem>
 DatabaseManager::findMessageItem(const std::string &messageID) {
   Aws::DynamoDB::Model::GetItemRequest request;