diff --git a/services/comm-services-lib/Cargo.lock b/services/comm-services-lib/Cargo.lock --- a/services/comm-services-lib/Cargo.lock +++ b/services/comm-services-lib/Cargo.lock @@ -449,6 +449,7 @@ "reqwest", "serde", "serde_json", + "tokio", "tracing", ] diff --git a/services/comm-services-lib/Cargo.toml b/services/comm-services-lib/Cargo.toml --- a/services/comm-services-lib/Cargo.toml +++ b/services/comm-services-lib/Cargo.toml @@ -16,6 +16,7 @@ derive_more = "0.99" serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0" } +tokio = "1.24" tracing = "0.1" # blob client dependencies futures-core = { version = "0.3", optional = true } diff --git a/services/comm-services-lib/src/database.rs b/services/comm-services-lib/src/database.rs --- a/services/comm-services-lib/src/database.rs +++ b/services/comm-services-lib/src/database.rs @@ -30,6 +30,8 @@ AwsSdk(DynamoDBError), #[display(...)] Attribute(DBItemError), + #[display(fmt = "Maximum retries exceeded")] + MaxRetriesExceeded, } #[derive(Debug)] @@ -318,8 +320,96 @@ }) } -#[cfg(test)] +pub mod batch_operations { + use aws_sdk_dynamodb::{ + error::SdkError, + operation::batch_write_item::BatchWriteItemError, + types::{PutRequest, WriteRequest}, + }; + use std::time::Duration; + use tracing::{debug, trace}; + + /// DynamoDB hard limit for single BatchWriteItem request + const SINGLE_BATCH_ITEM_LIMIT: usize = 25; + + /// Exponential backoff configuration for batch write operation + pub struct ExponentialBackoffConfig { + /// Maximum retry attempts before the function fails. + /// Set this to 0 to disable exponential backoff. + /// Defaults to **8**. + max_attempts: u32, + /// Base wait duration before retry. Defaults to **25ms**. + /// It is doubled with each attempt: 25ms, 50, 100, 200... + base_duration: Duration, + /// Retry on [`ProvisionedThroughputExceededException`]. + /// Defaults to **true**. + /// + /// [`ProvisionedThroughputExceededException`]: aws_sdk_dynamodb::Error::ProvisionedThroughputExceededException + retry_on_provisioned_capacity_exceeded: bool, + } + + impl Default for ExponentialBackoffConfig { + fn default() -> Self { + ExponentialBackoffConfig { + max_attempts: 8, + base_duration: std::time::Duration::from_millis(25), + retry_on_provisioned_capacity_exceeded: true, + } + } + } + + /// internal helper struct + struct ExponentialBackoffHelper<'cfg> { + config: &'cfg ExponentialBackoffConfig, + attempt: u32, + } + + impl<'cfg> ExponentialBackoffHelper<'cfg> { + fn new(config: &'cfg ExponentialBackoffConfig) -> Self { + ExponentialBackoffHelper { config, attempt: 0 } + } + + /// reset counter after successfull operation + fn reset(&mut self) { + self.attempt = 0; + } + + /// increase counter and sleep in case of failure + async fn sleep_and_retry(&mut self) -> Result<(), super::Error> { + let backoff_multiplier = 2u32.pow(self.attempt); + let sleep_duration = self.config.base_duration * backoff_multiplier; + + self.attempt += 1; + if self.attempt > self.config.max_attempts { + tracing::warn!("Retry limit exceeded!"); + return Err(super::Error::MaxRetriesExceeded); + } + tracing::debug!( + attempt = self.attempt, + "Batch failed. Sleeping for {}ms before retrying...", + sleep_duration.as_millis() + ); + tokio::time::sleep(sleep_duration).await; + Ok(()) + } + } + /// Check if transaction failed due to + /// `ProvisionedThroughputExceededException` exception + fn is_provisioned_capacity_exceeded( + err: &SdkError, + ) -> bool { + let SdkError::ServiceError(service_error) = err else { + return false; + }; + matches!( + service_error.err(), + BatchWriteItemError::ProvisionedThroughputExceededException(_) + ) + } +} + +#[cfg(test)] mod tests { use super::*; diff --git a/services/feature-flags/Cargo.lock b/services/feature-flags/Cargo.lock --- a/services/feature-flags/Cargo.lock +++ b/services/feature-flags/Cargo.lock @@ -735,6 +735,7 @@ "derive_more", "serde", "serde_json", + "tokio", "tracing", ]