Page MenuHomePhabricator

D8894.id30172.diff
No OneTemporary

D8894.id30172.diff

diff --git a/services/comm-services-lib/Cargo.lock b/services/comm-services-lib/Cargo.lock
--- a/services/comm-services-lib/Cargo.lock
+++ b/services/comm-services-lib/Cargo.lock
@@ -449,6 +449,7 @@
"reqwest",
"serde",
"serde_json",
+ "tokio",
"tracing",
]
diff --git a/services/comm-services-lib/Cargo.toml b/services/comm-services-lib/Cargo.toml
--- a/services/comm-services-lib/Cargo.toml
+++ b/services/comm-services-lib/Cargo.toml
@@ -16,6 +16,7 @@
derive_more = "0.99"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
+tokio = "1.24"
tracing = "0.1"
# blob client dependencies
futures-core = { version = "0.3", optional = true }
diff --git a/services/comm-services-lib/src/database.rs b/services/comm-services-lib/src/database.rs
--- a/services/comm-services-lib/src/database.rs
+++ b/services/comm-services-lib/src/database.rs
@@ -30,6 +30,8 @@
AwsSdk(DynamoDBError),
#[display(...)]
Attribute(DBItemError),
+ #[display(fmt = "Maximum retries exceeded")]
+ MaxRetriesExceeded,
}
#[derive(Debug)]
@@ -318,8 +320,96 @@
})
}
-#[cfg(test)]
+pub mod batch_operations {
+ use aws_sdk_dynamodb::{
+ error::SdkError,
+ operation::batch_write_item::BatchWriteItemError,
+ types::{PutRequest, WriteRequest},
+ };
+ use std::time::Duration;
+ use tracing::{debug, trace};
+
+ /// DynamoDB hard limit for a single BatchWriteItem request
+ const SINGLE_BATCH_ITEM_LIMIT: usize = 25;
+
+ /// Exponential backoff configuration for batch write operation
+ pub struct ExponentialBackoffConfig {
+ /// Maximum retry attempts before the function fails.
+ /// Set this to 0 to disable retrying (the first failure returns an error).
+ /// Defaults to **8**.
+ max_attempts: u32,
+ /// Base wait duration before retry. Defaults to **25ms**.
+ /// It is doubled with each attempt: 25ms, 50, 100, 200...
+ base_duration: Duration,
+ /// Retry on [`ProvisionedThroughputExceededException`].
+ /// Defaults to **true**.
+ ///
+ /// [`ProvisionedThroughputExceededException`]: aws_sdk_dynamodb::Error::ProvisionedThroughputExceededException
+ retry_on_provisioned_capacity_exceeded: bool,
+ }
+
+ impl Default for ExponentialBackoffConfig {
+ fn default() -> Self {
+ ExponentialBackoffConfig {
+ max_attempts: 8,
+ base_duration: std::time::Duration::from_millis(25),
+ retry_on_provisioned_capacity_exceeded: true,
+ }
+ }
+ }
+
+ /// internal helper struct
+ struct ExponentialBackoffHelper<'cfg> {
+ config: &'cfg ExponentialBackoffConfig,
+ attempt: u32,
+ }
+
+ impl<'cfg> ExponentialBackoffHelper<'cfg> {
+ fn new(config: &'cfg ExponentialBackoffConfig) -> Self {
+ ExponentialBackoffHelper { config, attempt: 0 }
+ }
+
+ /// reset counter after successful operation
+ fn reset(&mut self) {
+ self.attempt = 0;
+ }
+
+ /// increase counter and sleep in case of failure
+ async fn sleep_and_retry(&mut self) -> Result<(), super::Error> {
+ let backoff_multiplier = 2u32.pow(self.attempt);
+ let sleep_duration = self.config.base_duration * backoff_multiplier;
+
+ self.attempt += 1;
+ if self.attempt > self.config.max_attempts {
+ tracing::warn!("Retry limit exceeded!");
+ return Err(super::Error::MaxRetriesExceeded);
+ }
+ tracing::debug!(
+ attempt = self.attempt,
+ "Batch failed. Sleeping for {}ms before retrying...",
+ sleep_duration.as_millis()
+ );
+ tokio::time::sleep(sleep_duration).await;
+ Ok(())
+ }
+ }
+ /// Check if the batch write request failed due to
+ /// `ProvisionedThroughputExceededException`
+ fn is_provisioned_capacity_exceeded(
+ err: &SdkError<BatchWriteItemError>,
+ ) -> bool {
+ let SdkError::ServiceError(service_error) = err else {
+ return false;
+ };
+ matches!(
+ service_error.err(),
+ BatchWriteItemError::ProvisionedThroughputExceededException(_)
+ )
+ }
+}
+
+#[cfg(test)]
mod tests {
use super::*;
diff --git a/services/feature-flags/Cargo.lock b/services/feature-flags/Cargo.lock
--- a/services/feature-flags/Cargo.lock
+++ b/services/feature-flags/Cargo.lock
@@ -735,6 +735,7 @@
"derive_more",
"serde",
"serde_json",
+ "tokio",
"tracing",
]

File Metadata

Mime Type
text/plain
Expires
Tue, Nov 26, 7:03 PM (21 h, 21 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2585838
Default Alt Text
D8894.id30172.diff (4 KB)

Event Timeline