Page MenuHomePhabricator

D4373.id14570.diff
No OneTemporary

D4373.id14570.diff

diff --git a/services/lib/src/DatabaseManagerBase.h b/services/lib/src/DatabaseManagerBase.h
--- a/services/lib/src/DatabaseManagerBase.h
+++ b/services/lib/src/DatabaseManagerBase.h
@@ -25,6 +25,12 @@
innerFindItem(Aws::DynamoDB::Model::GetItemRequest &request);
void innerRemoveItem(const Item &item);
+ void innerBatchWriteItem(
+ const std::string &tableName,
+ const size_t &chunkSize,
+ const size_t &backoffFirstRetryDelay,
+ const size_t &maxBackoffTime,
+ std::vector<Aws::DynamoDB::Model::WriteRequest> &writeRequests);
};
template <typename T>
diff --git a/services/lib/src/DatabaseManagerBase.cpp b/services/lib/src/DatabaseManagerBase.cpp
--- a/services/lib/src/DatabaseManagerBase.cpp
+++ b/services/lib/src/DatabaseManagerBase.cpp
@@ -3,9 +3,15 @@
#include "Item.h"
#include <aws/core/utils/Outcome.h>
+#include <aws/dynamodb/model/BatchWriteItemRequest.h>
+#include <aws/dynamodb/model/BatchWriteItemResult.h>
#include <aws/dynamodb/model/DeleteItemRequest.h>
+#include <glog/logging.h>
+#include <chrono>
+#include <cmath>
#include <iostream>
+#include <thread>
namespace comm {
namespace network {
@@ -42,6 +48,58 @@
}
}
+void DatabaseManagerBase::innerBatchWriteItem(
+ const std::string &tableName,
+ const size_t &chunkSize,
+ const size_t &backoffFirstRetryDelay,
+ const size_t &maxBackoffTime,
+ std::vector<Aws::DynamoDB::Model::WriteRequest> &writeRequests) {
+ // Split write requests to chunks by chunkSize size and write
+ // them by batch
+ Aws::DynamoDB::Model::BatchWriteItemOutcome outcome;
+ std::vector<Aws::DynamoDB::Model::WriteRequest> writeRequestsChunk;
+ std::vector<Aws::DynamoDB::Model::WriteRequest>::iterator chunkPositionStart,
+ chunkPositionEnd;
+ for (size_t i = 0; i < writeRequests.size(); i += chunkSize) {
+ chunkPositionStart = writeRequests.begin() + i;
+ chunkPositionEnd =
+ writeRequests.begin() + std::min(writeRequests.size(), i + chunkSize);
+ writeRequestsChunk = std::vector<Aws::DynamoDB::Model::WriteRequest>(
+ chunkPositionStart, chunkPositionEnd);
+
+ Aws::DynamoDB::Model::BatchWriteItemRequest writeBatchRequest;
+ writeBatchRequest.AddRequestItems(tableName, writeRequestsChunk);
+ outcome = getDynamoDBClient()->BatchWriteItem(writeBatchRequest);
+ if (!outcome.IsSuccess()) {
+ throw std::runtime_error(outcome.GetError().GetMessage());
+ }
+
+ size_t delayRetry, delayMs, jitterMs;
+ while (!outcome.GetResult().GetUnprocessedItems().empty()) {
+ if (delayMs == maxBackoffTime) {
+ throw std::runtime_error(
+ "InnerBatchWriteItem error: maximum wait time to put unprocessed "
+ "items to DynamoDB is exceeded.");
+ }
+ jitterMs = std::rand() % 99 + 1;
+ delayRetry++;
+ delayMs = std::min(
+ size_t(backoffFirstRetryDelay * std::pow(2, delayRetry) + jitterMs),
+ maxBackoffTime);
+ LOG(INFO) << "Waiting for a backoff " << delayMs
+ << "ms delay before putting unprocessed items from batch write "
+ "to DynamoDB";
+ std::this_thread::sleep_for(std::chrono::milliseconds(delayMs));
+ writeBatchRequest.SetRequestItems(
+ outcome.GetResult().GetUnprocessedItems());
+ outcome = getDynamoDBClient()->BatchWriteItem(writeBatchRequest);
+ if (!outcome.IsSuccess()) {
+ throw std::runtime_error(outcome.GetError().GetMessage());
+ }
+ }
+ }
+}
+
} // namespace database
} // namespace network
} // namespace comm

File Metadata

Mime Type
text/plain
Expires
Mon, Dec 2, 10:32 AM (18 h, 3 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2607184
Default Alt Text
D4373.id14570.diff (3 KB)

Event Timeline