diff --git a/services/comm-services-lib/src/database.rs b/services/comm-services-lib/src/database.rs
--- a/services/comm-services-lib/src/database.rs
+++ b/services/comm-services-lib/src/database.rs
@@ -406,15 +406,20 @@ pub mod batch_operations {
   use aws_sdk_dynamodb::{
-    error::SdkError, operation::batch_write_item::BatchWriteItemError,
-    types::WriteRequest,
+    error::SdkError,
+    operation::batch_write_item::BatchWriteItemError,
+    types::{KeysAndAttributes, WriteRequest},
+    Error as DynamoDBError,
   };
   use rand::Rng;
   use std::time::Duration;
   use tracing::{debug, trace};
 
+  use super::AttributeMap;
+
   /// DynamoDB hard limit for single BatchWriteItem request
-  const SINGLE_BATCH_ITEM_LIMIT: usize = 25;
+  const SINGLE_BATCH_WRITE_ITEM_LIMIT: usize = 25;
+  const SINGLE_BATCH_GET_ITEM_LIMIT: usize = 100;
 
   /// Exponential backoff configuration for batch write operation
   #[derive(derive_more::Constructor, Debug)]
@@ -461,6 +466,117 @@
     }
   }
 
+  #[tracing::instrument(name = "batch_get", skip(ddb, primary_keys, config))]
+  pub async fn batch_get<K>(
+    ddb: &aws_sdk_dynamodb::Client,
+    table_name: &str,
+    primary_keys: K,
+    projection_expression: Option<String>,
+    config: ExponentialBackoffConfig,
+  ) -> Result<Vec<AttributeMap>, super::Error>
+  where
+    K: IntoIterator,
+    K::Item: Into<AttributeMap>,
+  {
+    let mut primary_keys: Vec<_> =
+      primary_keys.into_iter().map(Into::into).collect();
+    let mut results = Vec::with_capacity(primary_keys.len());
+    tracing::debug!(
+      ?config,
+      "Starting batch read operation of {} items...",
+      primary_keys.len()
+    );
+
+    let mut exponential_backoff = config.new_counter();
+    let mut backup = Vec::with_capacity(SINGLE_BATCH_GET_ITEM_LIMIT);
+
+    loop {
+      let items_to_drain =
+        std::cmp::min(primary_keys.len(), SINGLE_BATCH_GET_ITEM_LIMIT);
+      let chunk = primary_keys.drain(..items_to_drain).collect::<Vec<_>>();
+      if chunk.is_empty() {
+        // No more items
+        tracing::trace!("No more items to process. Exiting");
+        break;
+      }
+
+      // we don't need the backup when we don't retry
+      if config.should_retry_on_capacity_exceeded() {
+        chunk.clone_into(&mut backup);
+      }
+
+      tracing::trace!("Attempting to get chunk of {} items...", chunk.len());
+      let result = ddb
+        .batch_get_item()
+        .request_items(
+          table_name,
+          KeysAndAttributes::builder()
+            .set_keys(Some(chunk))
+            .consistent_read(true)
+            .set_projection_expression(projection_expression.clone())
+            .build(),
+        )
+        .send()
+        .await;
+
+      match result {
+        Ok(output) => {
+          if let Some(mut responses) = output.responses {
+            if let Some(items) = responses.remove(table_name) {
+              tracing::trace!("Successfully read {} items", items.len());
+              results.extend(items);
+            }
+          } else {
+            tracing::warn!("Responses was None");
+          }
+
+          if let Some(mut unprocessed) = output.unprocessed_keys {
+            let keys_to_retry = match unprocessed.remove(table_name) {
+              Some(KeysAndAttributes {
+                keys: Some(keys), ..
+              }) if !keys.is_empty() => keys,
+              _ => {
+                tracing::trace!("Chunk read successfully. Continuing.");
+                exponential_backoff.reset();
+                continue;
+              }
+            };
+
+            exponential_backoff.sleep_and_retry().await?;
+            tracing::debug!(
+              "Some items failed. Retrying {} requests",
+              keys_to_retry.len()
+            );
+            primary_keys.extend(keys_to_retry);
+          } else {
+            tracing::trace!("Unprocessed items was None");
+          }
+        }
+        Err(error) => {
+          let error: DynamoDBError = error.into();
+          if !matches!(
+            error,
+            DynamoDBError::ProvisionedThroughputExceededException(_)
+          ) {
+            tracing::error!("BatchGetItem failed: {0:?} - {0}", error);
+            return Err(error.into());
+          }
+
+          tracing::warn!("Provisioned capacity exceeded!");
+          if !config.retry_on_provisioned_capacity_exceeded {
+            return Err(error.into());
+          }
+          exponential_backoff.sleep_and_retry().await?;
+          primary_keys.append(&mut backup);
+          trace!("Retrying now...");
+        }
+      };
+    }
+
+    debug!("Batch read completed.");
+    Ok(results)
+  }
+
   /// Performs a single DynamoDB table batch write operation. If the batch
   /// contains more than 25 items, it is split into chunks.
   ///
@@ -480,11 +596,11 @@
     );
 
     let mut exponential_backoff = config.new_counter();
-    let mut backup = Vec::with_capacity(SINGLE_BATCH_ITEM_LIMIT);
+    let mut backup = Vec::with_capacity(SINGLE_BATCH_WRITE_ITEM_LIMIT);
 
     loop {
       let items_to_drain =
-        std::cmp::min(requests.len(), SINGLE_BATCH_ITEM_LIMIT);
+        std::cmp::min(requests.len(), SINGLE_BATCH_WRITE_ITEM_LIMIT);
       let chunk = requests.drain(..items_to_drain).collect::<Vec<_>>();
       if chunk.is_empty() {
         // No more items
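
Usage note (not part of the diff): the sketch below shows how a caller might drive the new batch_get helper. It is a minimal illustration under stated assumptions: the crate is imported as comm_services_lib, the fetch_users wrapper, the userID key attribute, and the example-table name are hypothetical, and ExponentialBackoffConfig::default() presumes a Default impl that this diff does not show (only the derive_more::Constructor derive is visible above). The helper already chunks keys to the 100-item BatchGetItem limit and retries unprocessed keys with exponential backoff, so the caller can pass arbitrarily many keys.

use std::collections::HashMap;

use aws_sdk_dynamodb::types::AttributeValue;
use comm_services_lib::database::{
  batch_operations::{batch_get, ExponentialBackoffConfig},
  AttributeMap, Error,
};

async fn fetch_users(
  ddb: &aws_sdk_dynamodb::Client,
  user_ids: Vec<String>,
) -> Result<Vec<AttributeMap>, Error> {
  // One primary-key map per requested item; batch_get accepts any
  // IntoIterator whose items convert Into<AttributeMap>.
  let keys = user_ids
    .into_iter()
    .map(|id| HashMap::from([("userID".to_string(), AttributeValue::S(id))]));

  batch_get(
    ddb,
    "example-table", // hypothetical table name
    keys,
    None, // no projection expression: fetch all attributes
    ExponentialBackoffConfig::default(), // assumes a Default impl exists
  )
  .await
}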