diff --git a/services/backup/src/constants.rs b/services/backup/src/constants.rs
index 29a16c522..d4d58aca0 100644
--- a/services/backup/src/constants.rs
+++ b/services/backup/src/constants.rs
@@ -1,32 +1,32 @@
 // Assorted constants
 
 pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1;
 pub const ID_SEPARATOR: &str = ":";
 pub const ATTACHMENT_HOLDER_SEPARATOR: &str = ";";
 
 // Configuration defaults
 
 pub const DEFAULT_HTTP_PORT: u16 = 50052;
 pub const DEFAULT_BLOB_SERVICE_URL: &str = "http://localhost:50053";
 
 // Environment variable names
 
 pub const LOG_LEVEL_ENV_VAR: &str =
   tracing_subscriber::filter::EnvFilter::DEFAULT_ENV;
 
 // DynamoDB constants
 
 pub const BACKUP_TABLE_NAME: &str = "backup-service-backup";
 pub const BACKUP_TABLE_FIELD_USER_ID: &str = "userID";
 pub const BACKUP_TABLE_FIELD_BACKUP_ID: &str = "backupID";
 pub const BACKUP_TABLE_FIELD_CREATED: &str = "created";
-pub const BACKUP_TABLE_FIELD_RECOVERY_DATA: &str = "recoveryData";
-pub const BACKUP_TABLE_FIELD_COMPACTION_HOLDER: &str = "compactionHolder";
+pub const BACKUP_TABLE_FIELD_USER_DATA: &str = "userData";
+pub const BACKUP_TABLE_FIELD_USER_KEYS: &str = "userKeys";
 pub const BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS: &str = "attachmentHolders";
 pub const BACKUP_TABLE_INDEX_USERID_CREATED: &str = "userID-created-index";
 
 pub const LOG_TABLE_NAME: &str = "backup-service-log";
 pub const LOG_TABLE_FIELD_BACKUP_ID: &str = "backupID";
 pub const LOG_TABLE_FIELD_LOG_ID: &str = "logID";
 pub const LOG_TABLE_FIELD_PERSISTED_IN_BLOB: &str = "persistedInBlob";
 pub const LOG_TABLE_FIELD_VALUE: &str = "value";
 pub const LOG_TABLE_FIELD_ATTACHMENT_HOLDERS: &str = "attachmentHolders";
 pub const LOG_TABLE_FIELD_DATA_HASH: &str = "dataHash";
diff --git a/services/backup/src/database/backup_item.rs b/services/backup/src/database/backup_item.rs
index f9186fce8..f2ec37270 100644
--- a/services/backup/src/database/backup_item.rs
+++ b/services/backup/src/database/backup_item.rs
@@ -1,78 +1,127 @@
 use aws_sdk_dynamodb::types::AttributeValue;
 use chrono::{DateTime, Utc};
-use comm_services_lib::database::{DBItemError, TryFromAttribute};
-use std::collections::HashMap;
+use comm_services_lib::{
+  blob::types::BlobInfo,
+  database::{DBItemError, TryFromAttribute},
+};
+use std::collections::{HashMap, HashSet};
 
 use crate::constants::{
   BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS, BACKUP_TABLE_FIELD_BACKUP_ID,
-  BACKUP_TABLE_FIELD_COMPACTION_HOLDER, BACKUP_TABLE_FIELD_CREATED,
-  BACKUP_TABLE_FIELD_RECOVERY_DATA, BACKUP_TABLE_FIELD_USER_ID,
+  BACKUP_TABLE_FIELD_CREATED, BACKUP_TABLE_FIELD_USER_DATA,
+  BACKUP_TABLE_FIELD_USER_ID, BACKUP_TABLE_FIELD_USER_KEYS,
 };
 
 #[derive(Clone, Debug)]
 pub struct BackupItem {
   pub user_id: String,
   pub backup_id: String,
   pub created: DateTime<Utc>,
-  pub recovery_data: String,
-  pub compaction_holder: String,
-  pub attachment_holders: String,
+  pub user_keys: BlobInfo,
+  pub user_data: BlobInfo,
+  pub attachment_holders: HashSet<String>,
 }
 
 impl BackupItem {
   pub fn new(
     user_id: String,
     backup_id: String,
-    compaction_holder: String,
+    user_keys: BlobInfo,
+    user_data: BlobInfo,
+    attachment_holders: HashSet<String>,
   ) -> Self {
     BackupItem {
       user_id,
       backup_id,
-      compaction_holder,
       created: chrono::Utc::now(),
-      // TODO: Recovery data is mocked with random string
-      recovery_data: crate::utils::generate_random_string(
-        20,
-        &mut rand::thread_rng(),
+      user_keys,
+      user_data,
+      attachment_holders,
+    }
+  }
+}
+
+impl From<BackupItem> for HashMap<String, AttributeValue> {
+  fn from(value: BackupItem) -> Self {
+    let mut attrs = HashMap::from([
+      (
+        BACKUP_TABLE_FIELD_USER_ID.to_string(),
+        AttributeValue::S(value.user_id),
+      ),
+      (
+        BACKUP_TABLE_FIELD_BACKUP_ID.to_string(),
+        AttributeValue::S(value.backup_id),
+      ),
+      (
+        BACKUP_TABLE_FIELD_CREATED.to_string(),
+        AttributeValue::S(value.created.to_rfc3339()),
+      ),
+      (
+        BACKUP_TABLE_FIELD_USER_KEYS.to_string(),
+        value.user_keys.into(),
+      ),
+      (
+        BACKUP_TABLE_FIELD_USER_DATA.to_string(),
+        value.user_data.into(),
       ),
-      attachment_holders: String::new(),
+    ]);
+
+    if !value.attachment_holders.is_empty() {
+      attrs.insert(
+        BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS.to_string(),
+        AttributeValue::Ss(value.attachment_holders.into_iter().collect()),
+      );
     }
+
+    attrs
   }
 }
 
-pub fn parse_backup_item(
-  mut item: HashMap<String, AttributeValue>,
-) -> Result<BackupItem, DBItemError> {
-  let user_id = String::try_from_attr(
-    BACKUP_TABLE_FIELD_USER_ID,
-    item.remove(BACKUP_TABLE_FIELD_USER_ID),
-  )?;
-  let backup_id = String::try_from_attr(
-    BACKUP_TABLE_FIELD_BACKUP_ID,
-    item.remove(BACKUP_TABLE_FIELD_BACKUP_ID),
-  )?;
-  let created = DateTime::<Utc>::try_from_attr(
-    BACKUP_TABLE_FIELD_CREATED,
-    item.remove(BACKUP_TABLE_FIELD_CREATED),
-  )?;
-  let recovery_data = String::try_from_attr(
-    BACKUP_TABLE_FIELD_RECOVERY_DATA,
-    item.remove(BACKUP_TABLE_FIELD_RECOVERY_DATA),
-  )?;
-  let compaction_holder = String::try_from_attr(
-    BACKUP_TABLE_FIELD_COMPACTION_HOLDER,
-    item.remove(BACKUP_TABLE_FIELD_COMPACTION_HOLDER),
-  )?;
-  let attachment_holders = String::try_from_attr(
-    BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS,
-    item.remove(BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS),
-  )?;
-  Ok(BackupItem {
-    user_id,
-    backup_id,
-    created,
-    recovery_data,
-    compaction_holder,
-    attachment_holders,
-  })
+impl TryFrom<HashMap<String, AttributeValue>> for BackupItem {
+  type Error = DBItemError;
+
+  fn try_from(
+    mut value: HashMap<String, AttributeValue>,
+  ) -> Result<Self, Self::Error> {
+    let user_id = String::try_from_attr(
+      BACKUP_TABLE_FIELD_USER_ID,
+      value.remove(BACKUP_TABLE_FIELD_USER_ID),
+    )?;
+    let backup_id = String::try_from_attr(
+      BACKUP_TABLE_FIELD_BACKUP_ID,
+      value.remove(BACKUP_TABLE_FIELD_BACKUP_ID),
+    )?;
+    let created = DateTime::<Utc>::try_from_attr(
+      BACKUP_TABLE_FIELD_CREATED,
+      value.remove(BACKUP_TABLE_FIELD_CREATED),
+    )?;
+
+    let user_keys = BlobInfo::try_from_attr(
+      BACKUP_TABLE_FIELD_USER_KEYS,
+      value.remove(BACKUP_TABLE_FIELD_USER_KEYS),
+    )?;
+    let user_data = BlobInfo::try_from_attr(
+      BACKUP_TABLE_FIELD_USER_DATA,
+      value.remove(BACKUP_TABLE_FIELD_USER_DATA),
+    )?;
+
+    let attachments = value.remove(BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS);
+    let attachment_holders = if attachments.is_some() {
+      HashSet::<String>::try_from_attr(
+        BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS,
+        attachments,
+      )?
    } else {
      HashSet::new()
    };

    Ok(BackupItem {
      user_id,
      backup_id,
      created,
      user_keys,
      user_data,
      attachment_holders,
    })
  }
}
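A minimal round-trip sketch of the new conversion pair (not part of the diff; the literal IDs are placeholders, and `BlobInfo` values are taken as parameters to avoid assuming its constructor):

```rust
use std::collections::{HashMap, HashSet};

use aws_sdk_dynamodb::types::AttributeValue;
use comm_services_lib::{blob::types::BlobInfo, database::DBItemError};

use crate::database::backup_item::BackupItem;

fn roundtrip(
  user_keys: BlobInfo,
  user_data: BlobInfo,
) -> Result<BackupItem, DBItemError> {
  let item = BackupItem::new(
    "some-user".to_string(),   // placeholder user ID
    "some-backup".to_string(), // placeholder backup ID
    user_keys,
    user_data,
    HashSet::new(), // an empty holder set is omitted from the stored item
  );

  // `From<BackupItem>` - produces the map that `put_backup_item` writes
  let attrs: HashMap<String, AttributeValue> = item.into();

  // `TryFrom<HashMap<...>>` - the parse that `find_backup_item` performs
  BackupItem::try_from(attrs)
}
```
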
diff --git a/services/backup/src/database/mod.rs b/services/backup/src/database/mod.rs
index d0cb48fe9..51be41683 100644
--- a/services/backup/src/database/mod.rs
+++ b/services/backup/src/database/mod.rs
@@ -1,308 +1,281 @@
 pub mod backup_item;
 pub mod log_item;
 
 use std::collections::HashMap;
 
 use aws_sdk_dynamodb::{
   operation::get_item::GetItemOutput, types::AttributeValue,
 };
 use comm_services_lib::database::Error;
 use tracing::error;
 
 use crate::constants::{
-  BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS, BACKUP_TABLE_FIELD_BACKUP_ID,
-  BACKUP_TABLE_FIELD_COMPACTION_HOLDER, BACKUP_TABLE_FIELD_CREATED,
-  BACKUP_TABLE_FIELD_RECOVERY_DATA, BACKUP_TABLE_FIELD_USER_ID,
+  BACKUP_TABLE_FIELD_BACKUP_ID, BACKUP_TABLE_FIELD_USER_ID,
   BACKUP_TABLE_INDEX_USERID_CREATED, BACKUP_TABLE_NAME,
   LOG_TABLE_FIELD_ATTACHMENT_HOLDERS, LOG_TABLE_FIELD_BACKUP_ID,
   LOG_TABLE_FIELD_DATA_HASH, LOG_TABLE_FIELD_LOG_ID,
   LOG_TABLE_FIELD_PERSISTED_IN_BLOB, LOG_TABLE_FIELD_VALUE, LOG_TABLE_NAME,
 };
 
 use self::{
-  backup_item::{parse_backup_item, BackupItem},
+  backup_item::BackupItem,
   log_item::{parse_log_item, LogItem},
 };
 
 #[derive(Clone)]
 pub struct DatabaseClient {
   client: aws_sdk_dynamodb::Client,
 }
 
 impl DatabaseClient {
   pub fn new(aws_config: &aws_types::SdkConfig) -> Self {
     DatabaseClient {
       client: aws_sdk_dynamodb::Client::new(aws_config),
     }
   }
 
   // backup item
 
   pub async fn put_backup_item(
     &self,
     backup_item: BackupItem,
   ) -> Result<(), Error> {
-    let item = HashMap::from([
-      (
-        BACKUP_TABLE_FIELD_USER_ID.to_string(),
-        AttributeValue::S(backup_item.user_id),
-      ),
-      (
-        BACKUP_TABLE_FIELD_CREATED.to_string(),
-        AttributeValue::S(backup_item.created.to_rfc3339()),
-      ),
-      (
-        BACKUP_TABLE_FIELD_BACKUP_ID.to_string(),
-        AttributeValue::S(backup_item.backup_id),
-      ),
-      (
-        BACKUP_TABLE_FIELD_RECOVERY_DATA.to_string(),
-        AttributeValue::S(backup_item.recovery_data),
-      ),
-      (
-        BACKUP_TABLE_FIELD_COMPACTION_HOLDER.to_string(),
-        AttributeValue::S(backup_item.compaction_holder),
-      ),
-      (
-        BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS.to_string(),
-        AttributeValue::S(backup_item.attachment_holders),
-      ),
-    ]);
+    let item = backup_item.into();
 
     self
       .client
       .put_item()
       .table_name(BACKUP_TABLE_NAME)
       .set_item(Some(item))
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to put backup item");
         Error::AwsSdk(e.into())
       })?;
 
     Ok(())
   }
 
   pub async fn find_backup_item(
     &self,
     user_id: &str,
     backup_id: &str,
   ) -> Result<Option<BackupItem>, Error> {
     let item_key = HashMap::from([
       (
         BACKUP_TABLE_FIELD_USER_ID.to_string(),
         AttributeValue::S(user_id.to_string()),
       ),
       (
         BACKUP_TABLE_FIELD_BACKUP_ID.to_string(),
         AttributeValue::S(backup_id.to_string()),
       ),
     ]);
 
     match self
       .client
       .get_item()
       .table_name(BACKUP_TABLE_NAME)
       .set_key(Some(item_key))
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to find backup item");
         Error::AwsSdk(e.into())
       })? {
       GetItemOutput {
         item: Some(item), ..
       } => {
-        let backup_item = parse_backup_item(item)?;
+        let backup_item = item.try_into()?;
         Ok(Some(backup_item))
       }
       _ => Ok(None),
     }
   }
 
   pub async fn find_last_backup_item(
     &self,
     user_id: &str,
   ) -> Result<Option<BackupItem>, Error> {
     let response = self
       .client
       .query()
       .table_name(BACKUP_TABLE_NAME)
       .index_name(BACKUP_TABLE_INDEX_USERID_CREATED)
       .key_condition_expression("#userID = :valueToMatch")
       .expression_attribute_names("#userID", BACKUP_TABLE_FIELD_USER_ID)
       .expression_attribute_values(
         ":valueToMatch",
         AttributeValue::S(user_id.to_string()),
       )
       .limit(1)
       .scan_index_forward(false)
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to find last backup");
         Error::AwsSdk(e.into())
       })?;
 
     match response.items.unwrap_or_default().pop() {
       Some(item) => {
-        let backup_item = parse_backup_item(item)?;
+        let backup_item = item.try_into()?;
         Ok(Some(backup_item))
       }
       None => Ok(None),
     }
   }
 
   pub async fn remove_backup_item(&self, backup_id: &str) -> Result<(), Error> {
     self
       .client
       .delete_item()
       .table_name(BACKUP_TABLE_NAME)
       .key(
         BACKUP_TABLE_FIELD_BACKUP_ID,
         AttributeValue::S(backup_id.to_string()),
       )
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to remove backup item");
         Error::AwsSdk(e.into())
       })?;
 
     Ok(())
   }
 
   // log item
 
   pub async fn put_log_item(&self, log_item: LogItem) -> Result<(), Error> {
     let item = HashMap::from([
       (
         LOG_TABLE_FIELD_BACKUP_ID.to_string(),
         AttributeValue::S(log_item.backup_id),
       ),
       (
         LOG_TABLE_FIELD_LOG_ID.to_string(),
         AttributeValue::S(log_item.log_id),
       ),
       (
         LOG_TABLE_FIELD_PERSISTED_IN_BLOB.to_string(),
         AttributeValue::Bool(log_item.persisted_in_blob),
       ),
       (
         LOG_TABLE_FIELD_VALUE.to_string(),
         AttributeValue::S(log_item.value),
       ),
       (
         LOG_TABLE_FIELD_DATA_HASH.to_string(),
         AttributeValue::S(log_item.data_hash),
       ),
       (
         LOG_TABLE_FIELD_ATTACHMENT_HOLDERS.to_string(),
         AttributeValue::S(log_item.attachment_holders),
       ),
     ]);
 
     self
       .client
       .put_item()
       .table_name(LOG_TABLE_NAME)
       .set_item(Some(item))
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to put log item");
         Error::AwsSdk(e.into())
       })?;
 
     Ok(())
   }
 
   pub async fn find_log_item(
     &self,
     backup_id: &str,
     log_id: &str,
   ) -> Result<Option<LogItem>, Error> {
     let item_key = HashMap::from([
       (
         LOG_TABLE_FIELD_BACKUP_ID.to_string(),
         AttributeValue::S(backup_id.to_string()),
       ),
       (
         LOG_TABLE_FIELD_LOG_ID.to_string(),
         AttributeValue::S(log_id.to_string()),
       ),
     ]);
 
     match self
       .client
       .get_item()
       .table_name(LOG_TABLE_NAME)
       .set_key(Some(item_key))
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to find log item");
         Error::AwsSdk(e.into())
       })? {
       GetItemOutput {
         item: Some(item), ..
       } => {
         let log_item = parse_log_item(item)?;
         Ok(Some(log_item))
       }
       _ => Ok(None),
     }
   }
 
   pub async fn find_log_items_for_backup(
     &self,
     backup_id: &str,
   ) -> Result<Vec<LogItem>, Error> {
     let response = self
       .client
       .query()
       .table_name(LOG_TABLE_NAME)
       .key_condition_expression("#backupID = :valueToMatch")
       .expression_attribute_names("#backupID", LOG_TABLE_FIELD_BACKUP_ID)
       .expression_attribute_values(
         ":valueToMatch",
         AttributeValue::S(backup_id.to_string()),
       )
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to find log items for backup");
         Error::AwsSdk(e.into())
       })?;
 
     if response.count == 0 {
       return Ok(Vec::new());
     }
 
     let mut results: Vec<LogItem> =
       Vec::with_capacity(response.count() as usize);
     for item in response.items.unwrap_or_default() {
       let log_item = parse_log_item(item)?;
       results.push(log_item);
     }
     Ok(results)
   }
 
   pub async fn remove_log_item(&self, log_id: &str) -> Result<(), Error> {
     self
       .client
       .delete_item()
       .table_name(LOG_TABLE_NAME)
       .key(
         LOG_TABLE_FIELD_LOG_ID,
         AttributeValue::S(log_id.to_string()),
       )
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to remove log item");
         Error::AwsSdk(e.into())
       })?;
 
     Ok(())
   }
 }
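A usage sketch of how the simplified client methods compose after this change (not part of the diff; it assumes the caller already has a configured `DatabaseClient` and a `BackupItem`):

```rust
use comm_services_lib::database::Error;

use crate::database::{backup_item::BackupItem, DatabaseClient};

async fn store_then_fetch(
  client: &DatabaseClient,
  backup: BackupItem,
) -> Result<Option<BackupItem>, Error> {
  let user_id = backup.user_id.clone();
  let backup_id = backup.backup_id.clone();

  // Serializes through `From<BackupItem>` internally (`backup_item.into()`)
  client.put_backup_item(backup).await?;

  // Parses through `TryFrom<...>` internally (`item.try_into()?`);
  // returns None when no item exists for this key
  client.find_backup_item(&user_id, &backup_id).await
}
```
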
diff --git a/services/comm-services-lib/src/database.rs b/services/comm-services-lib/src/database.rs
index 8b0409ded..405df941a 100644
--- a/services/comm-services-lib/src/database.rs
+++ b/services/comm-services-lib/src/database.rs
@@ -1,597 +1,619 @@
 use aws_sdk_dynamodb::types::AttributeValue;
 use aws_sdk_dynamodb::Error as DynamoDBError;
 use chrono::{DateTime, Utc};
+use std::collections::HashSet;
 use std::fmt::{Display, Formatter};
 use std::num::ParseIntError;
 use std::str::FromStr;
 
 // # Useful type aliases
 
 // Rust exports `pub type` only into the so-called "type namespace", but in
 // order to use them e.g. with the `TryFromAttribute` trait, they also need
 // to be exported into the "value namespace" which is what `pub use` does.
 //
 // To overcome that, a dummy module is created and aliases are re-exported
 // with `pub use` construct
 mod aliases {
   use aws_sdk_dynamodb::types::AttributeValue;
   use std::collections::HashMap;
   pub type AttributeMap = HashMap<String, AttributeValue>;
 }
 pub use self::aliases::AttributeMap;
 
 // # Error handling
 
 #[derive(
   Debug, derive_more::Display, derive_more::From, derive_more::Error,
 )]
 pub enum Error {
   #[display(...)]
   AwsSdk(DynamoDBError),
   #[display(...)]
   Attribute(DBItemError),
   #[display(fmt = "Maximum retries exceeded")]
   MaxRetriesExceeded,
 }
 
 #[derive(Debug)]
 pub enum Value {
   AttributeValue(Option<AttributeValue>),
   String(String),
 }
 
 #[derive(Debug, derive_more::Error, derive_more::Constructor)]
 pub struct DBItemError {
   attribute_name: String,
   attribute_value: Value,
   attribute_error: DBItemAttributeError,
 }
 
 impl Display for DBItemError {
   fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
     match &self.attribute_error {
       DBItemAttributeError::Missing => {
         write!(f, "Attribute {} is missing", self.attribute_name)
       }
       DBItemAttributeError::IncorrectType => write!(
         f,
         "Value for attribute {} has incorrect type: {:?}",
         self.attribute_name, self.attribute_value
       ),
       error => write!(
         f,
         "Error regarding attribute {} with value {:?}: {}",
         self.attribute_name, self.attribute_value, error
       ),
     }
   }
 }
 
 #[derive(Debug, derive_more::Display, derive_more::Error)]
 pub enum DBItemAttributeError {
   #[display(...)]
   Missing,
   #[display(...)]
   IncorrectType,
   #[display(...)]
   TimestampOutOfRange,
   #[display(...)]
   InvalidTimestamp(chrono::ParseError),
   #[display(...)]
   InvalidNumberFormat(ParseIntError),
 }
 
 /// Conversion trait for [`AttributeValue`]
 ///
 /// Types implementing this trait are able to do the following:
 /// ```ignore
 /// use comm_services_lib::database::{TryFromAttribute, AttributeTryInto};
 ///
 /// let foo = SomeType::try_from_attr("MyAttribute", Some(attribute))?;
 ///
 /// // if `AttributeTryInto` is imported, also:
 /// let bar = Some(attribute).attr_try_into("MyAttribute")?;
 /// ```
 pub trait TryFromAttribute: Sized {
   fn try_from_attr(
     attribute_name: impl Into<String>,
     attribute: Option<AttributeValue>,
   ) -> Result<Self, DBItemError>;
 }
 
 /// Do NOT implement this trait directly. Implement [`TryFromAttribute`] instead
 pub trait AttributeTryInto<T> {
   fn attr_try_into(
     self,
     attribute_name: impl Into<String>,
   ) -> Result<T, DBItemError>;
 }
 // Automatic attr_try_into() for all attribute values
 // that have TryFromAttribute implemented
 impl<T: TryFromAttribute> AttributeTryInto<T> for Option<AttributeValue> {
   fn attr_try_into(
     self,
     attribute_name: impl Into<String>,
   ) -> Result<T, DBItemError> {
     T::try_from_attr(attribute_name, self)
   }
 }
 
 /// Helper trait for extracting attributes from a collection
 pub trait AttributeExtractor {
   /// Gets an attribute from the map and tries to convert it to the given type
   /// This method does not consume the raw attribute - it gets cloned
   /// See [`AttributeExtractor::take_attr`] for a non-cloning method
   fn get_attr<T: TryFromAttribute>(
     &self,
     attribute_name: &str,
   ) -> Result<T, DBItemError>;
   /// Takes an attribute from the map and tries to convert it to the given type
   /// This method consumes the raw attribute - it gets removed from the map
   /// See [`AttributeExtractor::get_attr`] for a non-mutating method
   fn take_attr<T: TryFromAttribute>(
     &mut self,
     attribute_name: &str,
   ) -> Result<T, DBItemError>;
 }
 impl AttributeExtractor for AttributeMap {
   fn get_attr<T: TryFromAttribute>(
     &self,
     attribute_name: &str,
   ) -> Result<T, DBItemError> {
     T::try_from_attr(attribute_name, self.get(attribute_name).cloned())
   }
   fn take_attr<T: TryFromAttribute>(
     &mut self,
     attribute_name: &str,
   ) -> Result<T, DBItemError> {
     T::try_from_attr(attribute_name, self.remove(attribute_name))
   }
 }
 
 impl TryFromAttribute for String {
   fn try_from_attr(
     attribute_name: impl Into<String>,
     attribute_value: Option<AttributeValue>,
   ) -> Result<Self, DBItemError> {
     match attribute_value {
       Some(AttributeValue::S(value)) => Ok(value),
       Some(_) => Err(DBItemError::new(
         attribute_name.into(),
         Value::AttributeValue(attribute_value),
         DBItemAttributeError::IncorrectType,
       )),
       None => Err(DBItemError::new(
         attribute_name.into(),
         Value::AttributeValue(attribute_value),
         DBItemAttributeError::Missing,
       )),
     }
   }
 }
 
 impl TryFromAttribute for bool {
   fn try_from_attr(
     attribute_name: impl Into<String>,
     attribute_value: Option<AttributeValue>,
   ) -> Result<Self, DBItemError> {
     match attribute_value {
       Some(AttributeValue::Bool(value)) => Ok(value),
       Some(_) => Err(DBItemError::new(
         attribute_name.into(),
         Value::AttributeValue(attribute_value),
         DBItemAttributeError::IncorrectType,
       )),
       None => Err(DBItemError::new(
         attribute_name.into(),
         Value::AttributeValue(attribute_value),
         DBItemAttributeError::Missing,
       )),
     }
   }
 }
 
 impl TryFromAttribute for DateTime<Utc> {
   fn try_from_attr(
     attribute_name: impl Into<String>,
     attribute: Option<AttributeValue>,
   ) -> Result<Self, DBItemError> {
     match &attribute {
       Some(AttributeValue::S(datetime)) => datetime.parse().map_err(|e| {
         DBItemError::new(
           attribute_name.into(),
           Value::AttributeValue(attribute),
           DBItemAttributeError::InvalidTimestamp(e),
         )
       }),
       Some(_) => Err(DBItemError::new(
         attribute_name.into(),
         Value::AttributeValue(attribute),
         DBItemAttributeError::IncorrectType,
       )),
       None => Err(DBItemError::new(
         attribute_name.into(),
         Value::AttributeValue(attribute),
         DBItemAttributeError::Missing,
       )),
     }
   }
 }
 
 impl TryFromAttribute for AttributeMap {
   fn try_from_attr(
     attribute_name: impl Into<String>,
     attribute_value: Option<AttributeValue>,
   ) -> Result<Self, DBItemError> {
     match attribute_value {
       Some(AttributeValue::M(map)) => Ok(map),
       Some(_) => Err(DBItemError::new(
         attribute_name.into(),
         Value::AttributeValue(attribute_value),
         DBItemAttributeError::IncorrectType,
       )),
       None => Err(DBItemError::new(
         attribute_name.into(),
         Value::AttributeValue(attribute_value),
         DBItemAttributeError::Missing,
       )),
     }
   }
 }
 
 impl TryFromAttribute for Vec<u8> {
   fn try_from_attr(
     attribute_name: impl Into<String>,
     attribute_value: Option<AttributeValue>,
   ) -> Result<Self, DBItemError> {
     match attribute_value {
       Some(AttributeValue::B(data)) => Ok(data.into_inner()),
       Some(_) => Err(DBItemError::new(
         attribute_name.into(),
         Value::AttributeValue(attribute_value),
         DBItemAttributeError::IncorrectType,
       )),
       None => Err(DBItemError::new(
         attribute_name.into(),
         Value::AttributeValue(attribute_value),
         DBItemAttributeError::Missing,
       )),
     }
   }
 }
 
+impl TryFromAttribute for HashSet<String> {
+  fn try_from_attr(
+    attribute_name: impl Into<String>,
+    attribute_value: Option<AttributeValue>,
+  ) -> Result<Self, DBItemError> {
+    match attribute_value {
+      Some(AttributeValue::Ss(set)) => Ok(set.into_iter().collect()),
+      Some(_) => Err(DBItemError::new(
+        attribute_name.into(),
+        Value::AttributeValue(attribute_value),
+        DBItemAttributeError::IncorrectType,
+      )),
+      None => Err(DBItemError::new(
+        attribute_name.into(),
+        Value::AttributeValue(attribute_value),
+        DBItemAttributeError::Missing,
+      )),
+    }
+  }
+}
+
 #[deprecated = "Use `String::try_from_attr()` instead"]
 pub fn parse_string_attribute(
   attribute_name: impl Into<String>,
   attribute_value: Option<AttributeValue>,
 ) -> Result<String, DBItemError> {
   String::try_from_attr(attribute_name, attribute_value)
 }
 
 #[deprecated = "Use `bool::try_from_attr()` instead"]
 pub fn parse_bool_attribute(
   attribute_name: impl Into<String>,
   attribute_value: Option<AttributeValue>,
 ) -> Result<bool, DBItemError> {
   bool::try_from_attr(attribute_name, attribute_value)
 }
 
 #[deprecated = "Use `DateTime::<Utc>::try_from_attr()` instead"]
 pub fn parse_datetime_attribute(
   attribute_name: impl Into<String>,
   attribute_value: Option<AttributeValue>,
 ) -> Result<DateTime<Utc>, DBItemError> {
   DateTime::<Utc>::try_from_attr(attribute_name, attribute_value)
 }
 
 #[deprecated = "Use `AttributeMap::try_from_attr()` instead"]
 pub fn parse_map_attribute(
   attribute_name: impl Into<String>,
   attribute_value: Option<AttributeValue>,
 ) -> Result<AttributeMap, DBItemError> {
   attribute_value.attr_try_into(attribute_name)
 }
 
 pub fn parse_int_attribute<T>(
   attribute_name: impl Into<String>,
   attribute_value: Option<AttributeValue>,
 ) -> Result<T, DBItemError>
 where
   T: FromStr<Err = ParseIntError>,
 {
   match &attribute_value {
     Some(AttributeValue::N(numeric_str)) => {
       parse_integer(attribute_name, numeric_str)
     }
     Some(_) => Err(DBItemError::new(
       attribute_name.into(),
       Value::AttributeValue(attribute_value),
       DBItemAttributeError::IncorrectType,
     )),
     None => Err(DBItemError::new(
       attribute_name.into(),
       Value::AttributeValue(attribute_value),
       DBItemAttributeError::Missing,
     )),
   }
 }
 
 /// Parses the UTC timestamp in milliseconds from a DynamoDB numeric attribute
 pub fn parse_timestamp_attribute(
   attribute_name: impl Into<String>,
   attribute_value: Option<AttributeValue>,
 ) -> Result<DateTime<Utc>, DBItemError> {
   let attribute_name: String = attribute_name.into();
   let timestamp = parse_int_attribute::<i64>(
     attribute_name.clone(),
     attribute_value.clone(),
   )?;
   let naive_datetime = chrono::NaiveDateTime::from_timestamp_millis(timestamp)
     .ok_or_else(|| {
       DBItemError::new(
         attribute_name,
         Value::AttributeValue(attribute_value),
         DBItemAttributeError::TimestampOutOfRange,
       )
     })?;
   Ok(DateTime::from_utc(naive_datetime, Utc))
 }
 
 pub fn parse_integer<T>(
   attribute_name: impl Into<String>,
   attribute_value: &str,
 ) -> Result<T, DBItemError>
 where
   T: FromStr<Err = ParseIntError>,
 {
   attribute_value.parse::<T>().map_err(|e| {
     DBItemError::new(
       attribute_name.into(),
       Value::String(attribute_value.into()),
       DBItemAttributeError::InvalidNumberFormat(e),
     )
   })
 }
 
 pub mod batch_operations {
   use aws_sdk_dynamodb::{
     error::SdkError, operation::batch_write_item::BatchWriteItemError,
     types::WriteRequest,
   };
   use rand::Rng;
   use std::time::Duration;
   use tracing::{debug, trace};
 
   /// DynamoDB hard limit for single BatchWriteItem request
   const SINGLE_BATCH_ITEM_LIMIT: usize = 25;
 
   /// Exponential backoff configuration for batch write operation
   #[derive(derive_more::Constructor, Debug)]
   pub struct ExponentialBackoffConfig {
     /// Maximum retry attempts before the function fails.
     /// Set this to 0 to disable exponential backoff.
     /// Defaults to **8**.
     pub max_attempts: u32,
     /// Base wait duration before retry. Defaults to **25ms**.
     /// It is doubled with each attempt: 25ms, 50, 100, 200...
     pub base_duration: Duration,
     /// Jitter factor for retry delay. Factor 0.5 for 100ms delay
     /// means that wait time will be between 50ms and 150ms.
     /// The value must be in range 0.0 - 1.0. It will be clamped
     /// if out of these bounds. Defaults to **0.3**
     pub jitter_factor: f32,
     /// Retry on [`ProvisionedThroughputExceededException`].
     /// Defaults to **true**.
     ///
     /// [`ProvisionedThroughputExceededException`]: aws_sdk_dynamodb::Error::ProvisionedThroughputExceededException
     pub retry_on_provisioned_capacity_exceeded: bool,
   }
 
   impl Default for ExponentialBackoffConfig {
     fn default() -> Self {
       ExponentialBackoffConfig {
         max_attempts: 8,
         base_duration: Duration::from_millis(25),
         jitter_factor: 0.3,
         retry_on_provisioned_capacity_exceeded: true,
       }
     }
   }
 
   impl ExponentialBackoffConfig {
     fn new_counter(&self) -> ExponentialBackoffHelper {
       ExponentialBackoffHelper::new(self)
     }
     fn backoff_enabled(&self) -> bool {
       self.max_attempts > 0
     }
     fn should_retry_on_capacity_exceeded(&self) -> bool {
       self.backoff_enabled() && self.retry_on_provisioned_capacity_exceeded
     }
   }
 
   /// Performs a single DynamoDB table batch write operation. If the batch
   /// contains more than 25 items, it is split into chunks.
   ///
   /// The function uses exponential backoff retries when AWS throttles
   /// the request or maximum provisioned capacity is exceeded
   #[tracing::instrument(name = "batch_write", skip(ddb, requests, config))]
   pub async fn batch_write(
     ddb: &aws_sdk_dynamodb::Client,
     table_name: &str,
     mut requests: Vec<WriteRequest>,
     config: ExponentialBackoffConfig,
   ) -> Result<(), super::Error> {
     tracing::debug!(
       ?config,
       "Starting batch write operation of {} items...",
       requests.len()
     );
 
     let mut exponential_backoff = config.new_counter();
     let mut backup = Vec::with_capacity(SINGLE_BATCH_ITEM_LIMIT);
 
     loop {
       let items_to_drain =
         std::cmp::min(requests.len(), SINGLE_BATCH_ITEM_LIMIT);
       let chunk = requests.drain(..items_to_drain).collect::<Vec<_>>();
       if chunk.is_empty() {
         // No more items
         tracing::trace!("No more items to process. Exiting");
         break;
       }
 
       // we don't need the backup when we don't retry
       if config.should_retry_on_capacity_exceeded() {
         chunk.clone_into(&mut backup);
       }
 
       tracing::trace!("Attempting to write chunk of {} items...", chunk.len());
       let result = ddb
         .batch_write_item()
         .request_items(table_name, chunk)
         .send()
         .await;
 
       match result {
         Ok(output) => {
           if let Some(mut items) = output.unprocessed_items {
             let requests_to_retry =
               items.remove(table_name).unwrap_or_default();
             if requests_to_retry.is_empty() {
               tracing::trace!("Chunk written successfully. Continuing.");
               exponential_backoff.reset();
               continue;
             }
 
             exponential_backoff.sleep_and_retry().await?;
             tracing::debug!(
               "Some items failed. Retrying {} requests",
               requests_to_retry.len()
             );
             requests.extend(requests_to_retry);
           } else {
             tracing::trace!("Unprocessed items was None");
           }
         }
         Err(error) => {
           if !is_provisioned_capacity_exceeded(&error) {
             tracing::error!("BatchWriteItem failed: {0:?} - {0}", error);
             return Err(super::Error::AwsSdk(error.into()));
           }
 
           tracing::warn!("Provisioned capacity exceeded!");
           if !config.retry_on_provisioned_capacity_exceeded {
             return Err(super::Error::AwsSdk(error.into()));
           }
           exponential_backoff.sleep_and_retry().await?;
           requests.append(&mut backup);
           trace!("Retrying now...");
         }
       };
     }
 
     debug!("Batch write completed.");
     Ok(())
   }
 
   /// internal helper struct
   struct ExponentialBackoffHelper<'cfg> {
     config: &'cfg ExponentialBackoffConfig,
     attempt: u32,
   }
 
   impl<'cfg> ExponentialBackoffHelper<'cfg> {
     fn new(config: &'cfg ExponentialBackoffConfig) -> Self {
       ExponentialBackoffHelper { config, attempt: 0 }
     }
 
     /// reset counter after successful operation
     fn reset(&mut self) {
       self.attempt = 0;
     }
 
     /// increase counter and sleep in case of failure
     async fn sleep_and_retry(&mut self) -> Result<(), super::Error> {
       let jitter_factor = 1f32.min(0f32.max(self.config.jitter_factor));
       let random_multiplier =
         1.0 + rand::thread_rng().gen_range(-jitter_factor..=jitter_factor);
       let backoff_multiplier = 2u32.pow(self.attempt);
       let base_duration = self.config.base_duration * backoff_multiplier;
       let sleep_duration = base_duration.mul_f32(random_multiplier);
 
       self.attempt += 1;
       if self.attempt > self.config.max_attempts {
         tracing::warn!("Retry limit exceeded!");
         return Err(super::Error::MaxRetriesExceeded);
       }
       tracing::debug!(
         attempt = self.attempt,
         "Batch failed. Sleeping for {}ms before retrying...",
         sleep_duration.as_millis()
       );
       tokio::time::sleep(sleep_duration).await;
       Ok(())
     }
   }
 
   /// Check if transaction failed due to
   /// `ProvisionedThroughputExceededException` exception
   fn is_provisioned_capacity_exceeded(
     err: &SdkError<BatchWriteItemError>,
   ) -> bool {
     let SdkError::ServiceError(service_error) = err else {
       return false;
     };
     matches!(
       service_error.err(),
       BatchWriteItemError::ProvisionedThroughputExceededException(_)
     )
   }
 }
 
 #[cfg(test)]
 mod tests {
   use super::*;
 
   #[test]
   fn test_parse_integer() {
     assert!(parse_integer::<i32>("some_attr", "123").is_ok());
     assert!(parse_integer::<i32>("negative", "-123").is_ok());
     assert!(parse_integer::<i32>("float", "3.14").is_err());
     assert!(parse_integer::<i32>("NaN", "foo").is_err());
     assert!(parse_integer::<u32>("negative_uint", "-123").is_err());
     assert!(parse_integer::<u16>("too_large", "65536").is_err());
   }
 
   #[test]
   fn test_parse_timestamp() {
     let timestamp = Utc::now().timestamp_millis();
     let attr = AttributeValue::N(timestamp.to_string());
 
     let parsed_timestamp = parse_timestamp_attribute("some_attr", Some(attr));
 
     assert!(parsed_timestamp.is_ok());
     assert_eq!(parsed_timestamp.unwrap().timestamp_millis(), timestamp);
   }
 
   #[test]
   fn test_parse_invalid_timestamp() {
     let attr = AttributeValue::N("foo".to_string());
 
     let parsed_timestamp = parse_timestamp_attribute("some_attr", Some(attr));
 
     assert!(parsed_timestamp.is_err());
   }
 
   #[test]
   fn test_parse_timestamp_out_of_range() {
     let attr = AttributeValue::N(i64::MAX.to_string());
 
     let parsed_timestamp = parse_timestamp_attribute("some_attr", Some(attr));
 
     assert!(parsed_timestamp.is_err());
     assert!(matches!(
       parsed_timestamp.unwrap_err().attribute_error,
       DBItemAttributeError::TimestampOutOfRange
     ));
   }
 }