diff --git a/services/lib/src/DatabaseManagerBase.h b/services/lib/src/DatabaseManagerBase.h
--- a/services/lib/src/DatabaseManagerBase.h
+++ b/services/lib/src/DatabaseManagerBase.h
@@ -25,6 +25,10 @@
   innerFindItem(Aws::DynamoDB::Model::GetItemRequest &request);
 
   void innerRemoveItem(const Item &item);
+  void innerBatchWriteItem(
+      const std::string &tableName,
+      const size_t &chunkSize,
+      std::vector<Aws::DynamoDB::Model::WriteRequest> &writeRequests);
 };
 
 template
diff --git a/services/lib/src/DatabaseManagerBase.cpp b/services/lib/src/DatabaseManagerBase.cpp
--- a/services/lib/src/DatabaseManagerBase.cpp
+++ b/services/lib/src/DatabaseManagerBase.cpp
@@ -3,6 +3,8 @@
 #include "Item.h"
 
 #include
+#include
+#include
 #include
 
 #include
@@ -42,6 +44,65 @@
   }
 }
 
+// Writes `writeRequests` to the DynamoDB table `tableName` through the
+// BatchWriteItem API, sending at most `chunkSize` requests per call.
+//
+// Every chunk goes out in its own request. Items that a successful call
+// reports back as unprocessed are resubmitted until none remain.
+// `writeRequests` is only read here; an empty vector is a no-op.
+//
+// Throws std::runtime_error when `chunkSize` is zero while there is work to
+// do, or when a BatchWriteItem call fails (carrying the client's message).
+//
+// NOTE(review): DynamoDB documents a cap of 25 write requests per
+// BatchWriteItem call, so callers presumably pass chunkSize <= 25 - confirm.
+void DatabaseManagerBase::innerBatchWriteItem(
+    const std::string &tableName,
+    const size_t &chunkSize,
+    std::vector<Aws::DynamoDB::Model::WriteRequest> &writeRequests) {
+  if (writeRequests.empty()) {
+    return;
+  }
+  if (chunkSize == 0) {
+    // A zero-sized chunk would never advance the loop below.
+    throw std::runtime_error(
+        "innerBatchWriteItem: chunkSize must be greater than zero");
+  }
+
+  const size_t totalRequests = writeRequests.size();
+  size_t offset = 0;
+  while (offset < totalRequests) {
+    // Clamp against what is left instead of computing offset + chunkSize,
+    // which could wrap around for a very large chunkSize.
+    const size_t chunkLength = std::min(chunkSize, totalRequests - offset);
+    const auto chunkBegin = writeRequests.begin() + offset;
+    std::vector<Aws::DynamoDB::Model::WriteRequest> chunk(
+        chunkBegin, chunkBegin + chunkLength);
+
+    Aws::DynamoDB::Model::BatchWriteItemRequest writeBatchRequest;
+    writeBatchRequest.AddRequestItems(tableName, chunk);
+    Aws::DynamoDB::Model::BatchWriteItemOutcome outcome =
+        getDynamoDBClient()->BatchWriteItem(writeBatchRequest);
+    if (!outcome.IsSuccess()) {
+      throw std::runtime_error(outcome.GetError().GetMessage());
+    }
+
+    // Resubmit only what the service handed back as unprocessed.
+    // NOTE(review): no backoff or retry cap - under sustained throttling this
+    // loop keeps issuing requests back to back.
+    while (!outcome.GetResult().GetUnprocessedItems().empty()) {
+      writeBatchRequest.SetRequestItems(
+          outcome.GetResult().GetUnprocessedItems());
+      outcome = getDynamoDBClient()->BatchWriteItem(writeBatchRequest);
+      if (!outcome.IsSuccess()) {
+        throw std::runtime_error(outcome.GetError().GetMessage());
+      }
+    }
+
+    offset += chunkLength;
+  }
+}
+
 } // namespace database
 } // namespace network
 } // namespace comm