diff --git a/services/blob/src/database/errors.rs b/services/blob/src/database/errors.rs
--- a/services/blob/src/database/errors.rs
+++ b/services/blob/src/database/errors.rs
@@ -1,7 +1,9 @@
 use std::fmt::{Display, Formatter};
 
 use aws_sdk_dynamodb::Error as DynamoDBError;
-use comm_lib::database::DBItemError;
+use comm_lib::{
+  database::DBItemError, tools::exponential_backoff::MaxRetriesExceededError,
+};
 
 use crate::s3::S3PathError;
 
@@ -32,6 +34,12 @@
   }
 }
 
+impl From<MaxRetriesExceededError> for Error {
+  fn from(_: MaxRetriesExceededError) -> Self {
+    Self::MaxRetriesExceeded
+  }
+}
+
 #[derive(Debug)]
 pub enum BlobDBError {
   HolderAlreadyExists(String),
diff --git a/services/identity/src/error.rs b/services/identity/src/error.rs
--- a/services/identity/src/error.rs
+++ b/services/identity/src/error.rs
@@ -1,5 +1,6 @@
 use comm_lib::aws::DynamoDBError;
 use comm_lib::database::DBItemError;
+use comm_lib::tools::exponential_backoff::MaxRetriesExceededError;
 use tracing::error;
 
 #[derive(
@@ -66,3 +67,9 @@
     }
   }
 }
+
+impl From<MaxRetriesExceededError> for Error {
+  fn from(_: MaxRetriesExceededError) -> Self {
+    Self::MaxRetriesExceeded
+  }
+}
diff --git a/shared/comm-lib/Cargo.toml b/shared/comm-lib/Cargo.toml
--- a/shared/comm-lib/Cargo.toml
+++ b/shared/comm-lib/Cargo.toml
@@ -37,7 +37,7 @@
 derive_more = { workspace = true }
 grpc_clients = { path = "../grpc_clients", optional = true }
 rand = "0.8"
-tokio = { workspace = true }
+tokio = { workspace = true, features = ["time"] }
 tracing = { workspace = true }
 anyhow = { workspace = true }
 hex = { workspace = true }
diff --git a/shared/comm-lib/src/database.rs b/shared/comm-lib/src/database.rs
--- a/shared/comm-lib/src/database.rs
+++ b/shared/comm-lib/src/database.rs
@@ -44,6 +44,13 @@
   MaxRetriesExceeded,
 }
 
+use crate::tools::exponential_backoff::MaxRetriesExceededError;
+impl From<MaxRetriesExceededError> for Error {
+  fn from(_: MaxRetriesExceededError) -> Self {
+    Self::MaxRetriesExceeded
+  }
+}
+
 #[derive(Debug, derive_more::From)]
 pub enum Value {
   AttributeValue(Option<AttributeValue>),
@@ -446,61 +453,15 @@
     types::{KeysAndAttributes, WriteRequest},
     Error as DynamoDBError,
   };
-  use rand::Rng;
-  use std::time::Duration;
   use tracing::{debug, trace};
 
   use super::AttributeMap;
+  pub use crate::tools::exponential_backoff::ExponentialBackoffConfig;
 
   /// DynamoDB hard limit for single BatchWriteItem request
   const SINGLE_BATCH_WRITE_ITEM_LIMIT: usize = 25;
   const SINGLE_BATCH_GET_ITEM_LIMIT: usize = 100;
 
-  /// Exponential backoff configuration for batch write operation
-  #[derive(derive_more::Constructor, Debug)]
-  pub struct ExponentialBackoffConfig {
-    /// Maximum retry attempts before the function fails.
-    /// Set this to 0 to disable exponential backoff.
-    /// Defaults to **8**.
-    pub max_attempts: u32,
-    /// Base wait duration before retry. Defaults to **25ms**.
-    /// It is doubled with each attempt: 25ms, 50, 100, 200...
-    pub base_duration: Duration,
-    /// Jitter factor for retry delay. Factor 0.5 for 100ms delay
-    /// means that wait time will be between 50ms and 150ms.
-    /// The value must be in range 0.0 - 1.0. It will be clamped
-    /// if out of these bounds. Defaults to **0.3**
-    pub jitter_factor: f32,
-    /// Retry on [`ProvisionedThroughputExceededException`].
-    /// Defaults to **true**.
-    ///
-    /// [`ProvisionedThroughputExceededException`]: aws_sdk_dynamodb::Error::ProvisionedThroughputExceededException
-    pub retry_on_provisioned_capacity_exceeded: bool,
-  }
-
-  impl Default for ExponentialBackoffConfig {
-    fn default() -> Self {
-      ExponentialBackoffConfig {
-        max_attempts: 8,
-        base_duration: Duration::from_millis(25),
-        jitter_factor: 0.3,
-        retry_on_provisioned_capacity_exceeded: true,
-      }
-    }
-  }
-
-  impl ExponentialBackoffConfig {
-    pub fn new_counter(&self) -> ExponentialBackoffHelper {
-      ExponentialBackoffHelper::new(self)
-    }
-    fn backoff_enabled(&self) -> bool {
-      self.max_attempts > 0
-    }
-    fn should_retry_on_capacity_exceeded(&self) -> bool {
-      self.backoff_enabled() && self.retry_on_provisioned_capacity_exceeded
-    }
-  }
-
   #[tracing::instrument(name = "batch_get", skip(ddb, primary_keys, config))]
   pub async fn batch_get(
     ddb: &aws_sdk_dynamodb::Client,
@@ -696,51 +657,6 @@
     Ok(())
   }
 
-  /// Utility for managing retries with exponential backoff
-  pub struct ExponentialBackoffHelper<'cfg> {
-    config: &'cfg ExponentialBackoffConfig,
-    attempt: u32,
-  }
-
-  impl<'cfg> ExponentialBackoffHelper<'cfg> {
-    fn new(config: &'cfg ExponentialBackoffConfig) -> Self {
-      ExponentialBackoffHelper { config, attempt: 0 }
-    }
-
-    /// reset counter after successfull operation
-    pub fn reset(&mut self) {
-      self.attempt = 0;
-    }
-
-    /// Returns 1 before the first retry, then 2,3,... after subsequent retries
-    pub fn attempt(&self) -> u32 {
-      self.attempt + 1
-    }
-
-    /// increase counter and sleep in case of failure
-    pub async fn sleep_and_retry(&mut self) -> Result<(), super::Error> {
-      let jitter_factor = 1f32.min(0f32.max(self.config.jitter_factor));
-      let random_multiplier =
-        1.0 + rand::thread_rng().gen_range(-jitter_factor..=jitter_factor);
-      let backoff_multiplier = 2u32.pow(self.attempt);
-      let base_duration = self.config.base_duration * backoff_multiplier;
-      let sleep_duration = base_duration.mul_f32(random_multiplier);
-
-      self.attempt += 1;
-      if self.attempt > self.config.max_attempts {
-        tracing::warn!("Retry limit exceeded!");
-        return Err(super::Error::MaxRetriesExceeded);
-      }
-      tracing::debug!(
-        attempt = self.attempt,
-        "Batch failed. Sleeping for {}ms before retrying...",
-        sleep_duration.as_millis()
-      );
-      tokio::time::sleep(sleep_duration).await;
-      Ok(())
-    }
-  }
-
   /// Check if transaction failed due to
   /// `ProvisionedThroughputExceededException` exception
   fn is_provisioned_capacity_exceeded(
diff --git a/shared/comm-lib/src/tools.rs b/shared/comm-lib/src/tools.rs
--- a/shared/comm-lib/src/tools.rs
+++ b/shared/comm-lib/src/tools.rs
@@ -1,5 +1,7 @@
 use rand::{distributions::DistString, CryptoRng, Rng};
 
+pub mod exponential_backoff;
+
 // colon is valid because it is used as a separator
 // in some backup service identifiers
 const VALID_IDENTIFIER_CHARS: &[char] = &['_', '-', '=', ':'];
diff --git a/shared/comm-lib/src/tools/exponential_backoff.rs b/shared/comm-lib/src/tools/exponential_backoff.rs
new file mode 100644
--- /dev/null
+++ b/shared/comm-lib/src/tools/exponential_backoff.rs
@@ -0,0 +1,102 @@
+use rand::Rng;
+use std::time::Duration;
+
+#[derive(Debug, derive_more::Display, derive_more::Error)]
+#[display(fmt = "[Exponential backoff] Maximum retries exceeded")]
+pub struct MaxRetriesExceededError;
+
+/// Exponential backoff configuration for batch write operation
+#[derive(derive_more::Constructor, Debug)]
+pub struct ExponentialBackoffConfig {
+  /// Maximum retry attempts before the function fails.
+  /// Set this to 0 to disable exponential backoff.
+  /// Defaults to **8**.
+  pub max_attempts: u32,
+  /// Base wait duration before retry. Defaults to **25ms**.
+  /// It is doubled with each attempt: 25ms, 50, 100, 200...
+  pub base_duration: Duration,
+  /// Jitter factor for retry delay. Factor 0.5 for 100ms delay
+  /// means that wait time will be between 50ms and 150ms.
+  /// The value must be in range 0.0 - 1.0. It will be clamped
+  /// if out of these bounds. Defaults to **0.3**
+  pub jitter_factor: f32,
+  /// Retry on [`ProvisionedThroughputExceededException`].
+  /// Defaults to **true**.
+  ///
+  /// [`ProvisionedThroughputExceededException`]: aws_sdk_dynamodb::Error::ProvisionedThroughputExceededException
+  #[cfg(feature = "aws")]
+  pub retry_on_provisioned_capacity_exceeded: bool,
+}
+
+impl Default for ExponentialBackoffConfig {
+  fn default() -> Self {
+    ExponentialBackoffConfig {
+      max_attempts: 8,
+      base_duration: Duration::from_millis(25),
+      jitter_factor: 0.3,
+      #[cfg(feature = "aws")]
+      retry_on_provisioned_capacity_exceeded: true,
+    }
+  }
+}
+
+impl ExponentialBackoffConfig {
+  pub fn new_counter(&self) -> ExponentialBackoffHelper {
+    ExponentialBackoffHelper::new(self)
+  }
+  fn backoff_enabled(&self) -> bool {
+    self.max_attempts > 0
+  }
+
+  #[cfg(feature = "aws")]
+  pub(crate) fn should_retry_on_capacity_exceeded(&self) -> bool {
+    self.backoff_enabled() && self.retry_on_provisioned_capacity_exceeded
+  }
+}
+
+/// Utility for managing retries with exponential backoff
+pub struct ExponentialBackoffHelper<'cfg> {
+  config: &'cfg ExponentialBackoffConfig,
+  attempt: u32,
+}
+
+impl<'cfg> ExponentialBackoffHelper<'cfg> {
+  fn new(config: &'cfg ExponentialBackoffConfig) -> Self {
+    ExponentialBackoffHelper { config, attempt: 0 }
+  }
+
+  /// Reset counter after successful operation
+  pub fn reset(&mut self) {
+    self.attempt = 0;
+  }
+
+  /// Returns 1 before the first retry, then 2,3,... after subsequent retries
+  pub fn attempt(&self) -> u32 {
+    self.attempt + 1
+  }
+
+  /// Increase counter and sleep in case of failure
+  pub async fn sleep_and_retry(
+    &mut self,
+  ) -> Result<(), MaxRetriesExceededError> {
+    let jitter_factor = 1f32.min(0f32.max(self.config.jitter_factor));
+    let random_multiplier =
+      1.0 + rand::thread_rng().gen_range(-jitter_factor..=jitter_factor);
+    let backoff_multiplier = 2u32.pow(self.attempt);
+    let base_duration = self.config.base_duration * backoff_multiplier;
+    let sleep_duration = base_duration.mul_f32(random_multiplier);
+
+    self.attempt += 1;
+    if self.attempt > self.config.max_attempts {
+      tracing::warn!("Retry limit exceeded!");
+      return Err(MaxRetriesExceededError);
+    }
+    tracing::debug!(
+      attempt = self.attempt,
+      "Batch failed. Sleeping for {}ms before retrying...",
+      sleep_duration.as_millis()
+    );
+    tokio::time::sleep(sleep_duration).await;
+    Ok(())
+  }
+}
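
Usage sketch (not part of the patch): a minimal retry loop driving the relocated helper. `flaky_operation` and `with_retries` are hypothetical placeholders; only the `ExponentialBackoffConfig`, `ExponentialBackoffHelper`, and `MaxRetriesExceededError` calls come from the new `comm_lib::tools::exponential_backoff` module above. For chunked batch work, `reset()` would additionally be called after each successfully processed chunk.

```rust
use comm_lib::tools::exponential_backoff::{
  ExponentialBackoffConfig, MaxRetriesExceededError,
};

// Hypothetical fallible operation: fails transiently until the third attempt.
async fn flaky_operation(attempt: u32) -> Result<String, &'static str> {
  if attempt < 3 {
    Err("transient failure")
  } else {
    Ok("done".to_string())
  }
}

// Retry loop: sleep with exponentially growing, jittered delays between
// attempts; give up with `MaxRetriesExceededError` after `max_attempts`.
async fn with_retries() -> Result<String, MaxRetriesExceededError> {
  let config = ExponentialBackoffConfig::default();
  let mut backoff = config.new_counter();
  loop {
    match flaky_operation(backoff.attempt()).await {
      Ok(value) => return Ok(value),
      Err(_) => backoff.sleep_and_retry().await?,
    }
  }
}
```

With the new `From<MaxRetriesExceededError>` impls in this patch, the error can be propagated with `?` straight into the blob, identity, and database `Error::MaxRetriesExceeded` variants.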