Page MenuHomePhabricator

D8895.id30355.diff
No OneTemporary

D8895.id30355.diff

diff --git a/services/comm-services-lib/src/database.rs b/services/comm-services-lib/src/database.rs
--- a/services/comm-services-lib/src/database.rs
+++ b/services/comm-services-lib/src/database.rs
@@ -359,6 +359,103 @@
}
}
+ impl ExponentialBackoffConfig {
+   /// Builds a fresh [`ExponentialBackoffHelper`] that borrows this config.
+   fn new_counter(&self) -> ExponentialBackoffHelper {
+     ExponentialBackoffHelper::new(self)
+   }
+
+   /// Backoff counts as enabled when at least one attempt is allowed,
+   /// i.e. `max_attempts` is greater than zero.
+   fn backoff_enabled(&self) -> bool {
+     0 < self.max_attempts
+   }
+
+   /// `true` only when retrying on "provisioned capacity exceeded" was
+   /// requested *and* backoff is enabled (`max_attempts` greater than zero).
+   fn should_retry_on_capacity_exceeded(&self) -> bool {
+     self.retry_on_provisioned_capacity_exceeded && 0 < self.max_attempts
+   }
+ }
+
+ /// Writes `requests` to the DynamoDB table `table_name` with BatchWriteItem.
+ ///
+ /// Requests are sent in chunks of at most `SINGLE_BATCH_ITEM_LIMIT` items
+ /// (a single batch is limited to 25 items), so a larger input is split
+ /// across several calls.
+ ///
+ /// Exponential backoff, driven by `config`, is applied in two situations:
+ /// - the call succeeds but reports unprocessed items for `table_name`:
+ ///   only those items are re-queued at the end of `requests`;
+ /// - the call fails with a "provisioned capacity exceeded" error and
+ ///   `config` allows retrying it: the whole chunk is re-queued.
+ ///
+ /// The backoff counter is reset after every fully written chunk.
+ ///
+ /// # Errors
+ ///
+ /// - `Error::AwsSdk` when the call fails with any other error, or with a
+ ///   capacity error that `config` does not allow retrying.
+ /// - Whatever `sleep_and_retry` returns when it refuses another attempt
+ ///   (presumably once the configured attempt limit is reached).
+ #[tracing::instrument(name = "batch_write", skip(ddb, requests, config))]
+ pub async fn batch_write(
+   ddb: &aws_sdk_dynamodb::Client,
+   table_name: &str,
+   mut requests: Vec<WriteRequest>,
+   config: ExponentialBackoffConfig,
+ ) -> Result<(), super::Error> {
+   tracing::debug!(
+     ?config,
+     "Starting batch write operation of {} items...",
+     requests.len()
+   );
+
+   // `config` is immutable for the whole call, so decide this once. The same
+   // flag gates both taking the backup and using it, which guarantees the
+   // backup is populated whenever a failed chunk is re-queued.
+   let retry_on_capacity_exceeded = config.should_retry_on_capacity_exceeded();
+   let mut exponential_backoff = config.new_counter();
+   // Copy of the in-flight chunk; the SDK call consumes the chunk itself.
+   let mut backup = Vec::with_capacity(SINGLE_BATCH_ITEM_LIMIT);
+
+   loop {
+     let items_to_drain =
+       std::cmp::min(requests.len(), SINGLE_BATCH_ITEM_LIMIT);
+     let chunk = requests.drain(..items_to_drain).collect::<Vec<_>>();
+     if chunk.is_empty() {
+       // No more items
+       tracing::trace!("No more items to process. Exiting");
+       break;
+     }
+
+     // we don't need the backup when we don't retry
+     if retry_on_capacity_exceeded {
+       chunk.clone_into(&mut backup);
+     }
+
+     tracing::trace!("Attempting to write chunk of {} items...", chunk.len());
+     let result = ddb
+       .batch_write_item()
+       .request_items(table_name, chunk)
+       .send()
+       .await;
+
+     match result {
+       Ok(output) => {
+         // A missing map and a map without entries for this table both mean
+         // that nothing is left to retry.
+         let requests_to_retry = match output.unprocessed_items {
+           Some(mut items) => items.remove(table_name).unwrap_or_default(),
+           None => {
+             tracing::trace!("Unprocessed items was None");
+             Vec::new()
+           }
+         };
+
+         if requests_to_retry.is_empty() {
+           tracing::trace!("Chunk written successfully. Continuing.");
+           exponential_backoff.reset();
+           continue;
+         }
+
+         exponential_backoff.sleep_and_retry().await?;
+         tracing::debug!(
+           "Some items failed. Retrying {} requests",
+           requests_to_retry.len()
+         );
+         requests.extend(requests_to_retry);
+       }
+       Err(error) => {
+         if !is_provisioned_capacity_exceeded(&error) {
+           tracing::error!("BatchWriteItem failed: {0:?} - {0}", error);
+           return Err(super::Error::AwsSdk(error.into()));
+         }
+
+         tracing::warn!("Provisioned capacity exceeded!");
+         // Without a backup the chunk is gone, so retrying would silently
+         // drop it. Surface the AWS error instead.
+         if !retry_on_capacity_exceeded {
+           return Err(super::Error::AwsSdk(error.into()));
+         }
+         exponential_backoff.sleep_and_retry().await?;
+         // Moves the saved chunk back into the queue and empties `backup`.
+         requests.append(&mut backup);
+         tracing::trace!("Retrying now...");
+       }
+     };
+   }
+
+   tracing::debug!("Batch write completed.");
+   Ok(())
+ }
+
/// internal helper struct
struct ExponentialBackoffHelper<'cfg> {
config: &'cfg ExponentialBackoffConfig,

File Metadata

Mime Type
text/plain
Expires
Fri, Nov 22, 8:41 PM (17 h, 14 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2564854
Default Alt Text
D8895.id30355.diff (3 KB)

Event Timeline