diff --git a/services/lib/src/DatabaseManagerBase.h b/services/lib/src/DatabaseManagerBase.h
--- a/services/lib/src/DatabaseManagerBase.h
+++ b/services/lib/src/DatabaseManagerBase.h
@@ -25,6 +25,12 @@
   innerFindItem(Aws::DynamoDB::Model::GetItemRequest &request);
   void innerRemoveItem(const Item &item);
+  void innerBatchWriteItem(
+      const std::string &tableName,
+      const size_t &chunkSize,
+      const size_t &backoffStep,
+      const size_t &maxBackoffTime,
+      std::vector<Aws::DynamoDB::Model::WriteRequest> &writeRequests);
 };
 template
diff --git a/services/lib/src/DatabaseManagerBase.cpp b/services/lib/src/DatabaseManagerBase.cpp
--- a/services/lib/src/DatabaseManagerBase.cpp
+++ b/services/lib/src/DatabaseManagerBase.cpp
@@ -1,11 +1,18 @@
+
 #include "DatabaseManagerBase.h"
 #include "Item.h"
 #include 
+#include 
+#include 
 #include 
+#include 
+#include 
+#include 
 #include 
+#include 
 namespace comm {
 namespace network {
@@ -42,6 +49,82 @@
   }
 }
+// Writes `writeRequests` to the `tableName` table through DynamoDB
+// BatchWriteItem calls of at most `chunkSize` requests each. Items that a call
+// reports back as unprocessed are re-sent after a delay of
+// `backoffStep`^attempt plus 1-99 ms of jitter, capped at `maxBackoffTime`
+// milliseconds.
+//
+// Throws std::runtime_error when `chunkSize` is zero, when a BatchWriteItem
+// call fails, or when items are still unprocessed once the retry delay has
+// reached `maxBackoffTime`.
+void DatabaseManagerBase::innerBatchWriteItem(
+    const std::string &tableName,
+    const size_t &chunkSize,
+    const size_t &backoffStep,
+    const size_t &maxBackoffTime,
+    std::vector<Aws::DynamoDB::Model::WriteRequest> &writeRequests) {
+  // A zero chunk size would never advance the chunking loop below.
+  if (chunkSize == 0) {
+    throw std::runtime_error(
+        "InnerBatchWriteItem error: chunkSize must be greater than zero.");
+  }
+
+  // Split write requests to chunks by chunkSize size and write
+  // them by batch
+  for (size_t i = 0; i < writeRequests.size(); i += chunkSize) {
+    const auto chunkPositionStart = writeRequests.begin() + i;
+    const auto chunkPositionEnd =
+        writeRequests.begin() + std::min(writeRequests.size(), i + chunkSize);
+    const std::vector<Aws::DynamoDB::Model::WriteRequest> writeRequestsChunk(
+        chunkPositionStart, chunkPositionEnd);
+
+    Aws::DynamoDB::Model::BatchWriteItemRequest writeBatchRequest;
+    writeBatchRequest.AddRequestItems(tableName, writeRequestsChunk);
+    Aws::DynamoDB::Model::BatchWriteItemOutcome outcome =
+        getDynamoDBClient()->BatchWriteItem(writeBatchRequest);
+    if (!outcome.IsSuccess()) {
+      throw std::runtime_error(outcome.GetError().GetMessage());
+    }
+
+    // Backoff state starts from zero for every chunk. Both counters must be
+    // initialized: they are read before the first retry assigns them.
+    size_t delayRetry = 0;
+    size_t delayMs = 0;
+    while (!outcome.GetResult().GetUnprocessedItems().empty()) {
+      // delayMs is clamped to maxBackoffTime below, so reaching the cap means
+      // the longest allowed wait has already been spent on this chunk.
+      if (delayMs >= maxBackoffTime) {
+        throw std::runtime_error(
+            "InnerBatchWriteItem error: maximum wait time to put unprocessed "
+            "items to DynamoDB is exceeded.");
+      }
+      const size_t jitterMs = std::rand() % 99 + 1;
+      ++delayRetry;
+      // Clamp while still in floating point: converting an out-of-range
+      // double to size_t is undefined behaviour.
+      const double uncappedDelayMs =
+          std::pow(
+              static_cast<double>(backoffStep),
+              static_cast<double>(delayRetry)) +
+          static_cast<double>(jitterMs);
+      delayMs = uncappedDelayMs < static_cast<double>(maxBackoffTime)
+          ? static_cast<size_t>(uncappedDelayMs)
+          : maxBackoffTime;
+      LOG(INFO) << "Waiting for a backoff " << delayMs
+                << "ms delay before putting unprocessed items from batch write "
+                   "to DynamoDB";
+      std::this_thread::sleep_for(std::chrono::milliseconds(delayMs));
+      writeBatchRequest.SetRequestItems(
+          outcome.GetResult().GetUnprocessedItems());
+      outcome = getDynamoDBClient()->BatchWriteItem(writeBatchRequest);
+      if (!outcome.IsSuccess()) {
+        throw std::runtime_error(outcome.GetError().GetMessage());
+      }
+    }
+  }
+}
+
 } // namespace database
 } // namespace network
 } // namespace comm