diff --git a/services/comm-services-lib/src/database.rs b/services/comm-services-lib/src/database.rs
--- a/services/comm-services-lib/src/database.rs
+++ b/services/comm-services-lib/src/database.rs
@@ -398,6 +398,103 @@
   }
 }
 
+impl ExponentialBackoffConfig {
+  fn new_counter(&self) -> ExponentialBackoffHelper {
+    ExponentialBackoffHelper::new(self)
+  }
+  fn backoff_enabled(&self) -> bool {
+    self.max_attempts > 0
+  }
+  fn should_retry_on_capacity_exceeded(&self) -> bool {
+    self.backoff_enabled() && self.retry_on_provisioned_capacity_exceeded
+  }
+}
+
+/// Performs a single DynamoDB table batch write operation. If the batch
+/// contains more than 25 items, it is split into chunks.
+///
+/// The function uses exponential backoff retries when AWS throttles
+/// the request or maximum provisioned capacity is exceeded.
+#[tracing::instrument(name = "batch_write", skip(ddb, requests, config))]
+pub async fn batch_write(
+  ddb: &aws_sdk_dynamodb::Client,
+  table_name: &str,
+  mut requests: Vec<WriteRequest>,
+  config: ExponentialBackoffConfig,
+) -> Result<(), super::Error> {
+  tracing::debug!(
+    ?config,
+    "Starting batch write operation of {} items...",
+    requests.len()
+  );
+
+  let mut exponential_backoff = config.new_counter();
+  let mut backup = Vec::with_capacity(SINGLE_BATCH_ITEM_LIMIT);
+
+  loop {
+    let items_to_drain =
+      std::cmp::min(requests.len(), SINGLE_BATCH_ITEM_LIMIT);
+    let chunk = requests.drain(..items_to_drain).collect::<Vec<_>>();
+    if chunk.is_empty() {
+      // No more items
+      tracing::trace!("No more items to process. Exiting");
+      break;
+    }
+
+    // we don't need the backup when we don't retry
+    if config.should_retry_on_capacity_exceeded() {
+      chunk.clone_into(&mut backup);
+    }
+
+    tracing::trace!("Attempting to write chunk of {} items...", chunk.len());
+    let result = ddb
+      .batch_write_item()
+      .request_items(table_name, chunk)
+      .send()
+      .await;
+
+    match result {
+      Ok(output) => {
+        if let Some(mut items) = output.unprocessed_items {
+          let requests_to_retry =
+            items.remove(table_name).unwrap_or_default();
+          if requests_to_retry.is_empty() {
+            tracing::trace!("Chunk written successfully. Continuing.");
+            exponential_backoff.reset();
+            continue;
+          }
+
+          exponential_backoff.sleep_and_retry().await?;
+          tracing::debug!(
+            "Some items failed. Retrying {} requests",
+            requests_to_retry.len()
+          );
+          requests.extend(requests_to_retry);
+        } else {
+          tracing::trace!("Unprocessed items was None");
+        }
+      }
+      Err(error) => {
+        if !is_provisioned_capacity_exceeded(&error) {
+          tracing::error!("BatchWriteItem failed: {0:?} - {0}", error);
+          return Err(super::Error::AwsSdk(error.into()));
+        }
+
+        tracing::warn!("Provisioned capacity exceeded!");
+        if !config.retry_on_provisioned_capacity_exceeded {
+          return Err(super::Error::AwsSdk(error.into()));
+        }
+        exponential_backoff.sleep_and_retry().await?;
+        requests.append(&mut backup);
+        tracing::trace!("Retrying now...");
+      }
+    };
+  }
+
+  tracing::debug!("Batch write completed.");
+  Ok(())
+}
+
 /// internal helper struct
 struct ExponentialBackoffHelper<'cfg> {
   config: &'cfg ExponentialBackoffConfig,
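
For context, here is a minimal caller sketch (not part of the patch) showing how a service might invoke the new `batch_write` helper with an `ExponentialBackoffConfig`. Only the `batch_write` signature, the `max_attempts` field, and the `retry_on_provisioned_capacity_exceeded` field appear in the diff above; the `persist_items` wrapper, the table name, the import path for `WriteRequest`, and the assumption that `ExponentialBackoffConfig` implements `Default` are all hypothetical.

```rust
// Hypothetical caller, assumed to live in the same module as batch_write so
// the helper and ExponentialBackoffConfig are in scope.
use aws_sdk_dynamodb::types::WriteRequest; // path may differ by SDK version

async fn persist_items(
  ddb: &aws_sdk_dynamodb::Client,
  requests: Vec<WriteRequest>,
) -> Result<(), super::Error> {
  // Fields other than these two are not shown in the diff;
  // `..Default::default()` is an assumption.
  let config = ExponentialBackoffConfig {
    max_attempts: 8,
    retry_on_provisioned_capacity_exceeded: true,
    ..Default::default()
  };

  // batch_write() splits `requests` into chunks of at most 25 items
  // (SINGLE_BATCH_ITEM_LIMIT) and retries throttled or unprocessed
  // items with exponential backoff.
  batch_write(ddb, "example-table", requests, config).await
}
```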