diff --git a/services/backup/src/constants.rs b/services/backup/src/constants.rs index bb7778faf..55904dfb2 100644 --- a/services/backup/src/constants.rs +++ b/services/backup/src/constants.rs @@ -1,36 +1,41 @@ // Assorted constants pub const AWS_REGION: &str = "us-east-2"; pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; pub const ID_SEPARATOR: &str = ":"; +// 400KiB limit (in docs there is KB but they mean KiB) - +// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ServiceQuotas.html +// This includes both attribute names' and values' lengths +pub const LOG_DATA_SIZE_DATABASE_LIMIT: usize = 1024 * 400; + // Configuration defaults pub const DEFAULT_GRPC_SERVER_PORT: u64 = 50051; pub const DEFAULT_LOCALSTACK_URL: &str = "http://localhost:4566"; pub const DEFAULT_BLOB_SERVICE_URL: &str = "http://localhost:50053"; // Environment variable names pub const SANDBOX_ENV_VAR: &str = "COMM_SERVICES_SANDBOX"; pub const LOG_LEVEL_ENV_VAR: &str = tracing_subscriber::filter::EnvFilter::DEFAULT_ENV; // DynamoDB constants pub const BACKUP_TABLE_NAME: &str = "backup-service-backup"; pub const BACKUP_TABLE_FIELD_USER_ID: &str = "userID"; pub const BACKUP_TABLE_FIELD_BACKUP_ID: &str = "backupID"; pub const BACKUP_TABLE_FIELD_CREATED: &str = "created"; pub const BACKUP_TABLE_FIELD_RECOVERY_DATA: &str = "recoveryData"; pub const BACKUP_TABLE_FIELD_COMPACTION_HOLDER: &str = "compactionHolder"; pub const BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS: &str = "attachmentHolders"; pub const BACKUP_TABLE_INDEX_USERID_CREATED: &str = "userID-created-index"; pub const LOG_TABLE_NAME: &str = "backup-service-log"; pub const LOG_TABLE_FIELD_BACKUP_ID: &str = "backupID"; pub const LOG_TABLE_FIELD_LOG_ID: &str = "logID"; pub const LOG_TABLE_FIELD_PERSISTED_IN_BLOB: &str = "persistedInBlob"; pub const LOG_TABLE_FIELD_VALUE: &str = "value"; pub const LOG_TABLE_FIELD_ATTACHMENT_HOLDERS: &str = "attachmentHolders"; pub const LOG_TABLE_FIELD_DATA_HASH: &str = "dataHash"; diff --git a/services/backup/src/database.rs b/services/backup/src/database.rs index 1ed54d500..c0bf7f7c4 100644 --- a/services/backup/src/database.rs +++ b/services/backup/src/database.rs @@ -1,525 +1,573 @@ use aws_sdk_dynamodb::{ model::AttributeValue, output::GetItemOutput, Error as DynamoDBError, }; use chrono::{DateTime, Utc}; use std::{ collections::HashMap, fmt::{Display, Formatter}, sync::Arc, }; use tracing::error; use crate::constants::{ BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS, BACKUP_TABLE_FIELD_BACKUP_ID, BACKUP_TABLE_FIELD_COMPACTION_HOLDER, BACKUP_TABLE_FIELD_CREATED, BACKUP_TABLE_FIELD_RECOVERY_DATA, BACKUP_TABLE_FIELD_USER_ID, BACKUP_TABLE_INDEX_USERID_CREATED, BACKUP_TABLE_NAME, LOG_TABLE_FIELD_ATTACHMENT_HOLDERS, LOG_TABLE_FIELD_BACKUP_ID, LOG_TABLE_FIELD_DATA_HASH, LOG_TABLE_FIELD_LOG_ID, LOG_TABLE_FIELD_PERSISTED_IN_BLOB, LOG_TABLE_FIELD_VALUE, LOG_TABLE_NAME, }; #[derive(Clone, Debug)] pub struct BackupItem { pub user_id: String, pub backup_id: String, pub created: DateTime, pub recovery_data: String, pub compaction_holder: String, pub attachment_holders: String, } impl BackupItem { pub fn new( user_id: String, backup_id: String, compaction_holder: String, ) -> Self { BackupItem { user_id, backup_id, compaction_holder, created: chrono::Utc::now(), // TODO: Recovery data is mocked with random string recovery_data: crate::utils::generate_random_string( 20, &mut rand::thread_rng(), ), attachment_holders: String::new(), } } } +static LOG_ITEM_HEADERS_SIZE: usize = { + let mut size: usize = 0; + size += LOG_TABLE_FIELD_BACKUP_ID.as_bytes().len(); + size += LOG_TABLE_FIELD_LOG_ID.as_bytes().len(); + size += LOG_TABLE_FIELD_PERSISTED_IN_BLOB.as_bytes().len(); + size += LOG_TABLE_FIELD_VALUE.as_bytes().len(); + size += LOG_TABLE_FIELD_ATTACHMENT_HOLDERS.as_bytes().len(); + size += LOG_TABLE_FIELD_DATA_HASH.as_bytes().len(); + size +}; + #[derive(Clone, Debug)] pub struct LogItem { pub backup_id: String, pub log_id: String, pub persisted_in_blob: bool, pub value: String, pub attachment_holders: String, pub data_hash: String, } +impl LogItem { + /// Calculates size based on raw log item components, + /// without allocating a new item + pub fn size_from_components( + backup_id: &str, + log_id: &str, + log_hash: &str, + data: &[u8], + ) -> usize { + let mut size: usize = LOG_ITEM_HEADERS_SIZE; + size += backup_id.as_bytes().len(); + size += log_id.as_bytes().len(); + size += data.len(); + size += log_hash.as_bytes().len(); + + // persistent in blob, attachment holders, use defaults here + size += false.to_string().as_bytes().len(); + size += "".as_bytes().len(); + + size + } + + /// Total size of this item in the DynamoDB table. This value must be + /// smaller than [`crate::constants::LOG_DATA_SIZE_DATABASE_LIMIT`] + /// in order to successfully put this item into a DynamoDB database. + pub fn total_size(&self) -> usize { + let mut size: usize = LOG_ITEM_HEADERS_SIZE; + size += self.backup_id.as_bytes().len(); + size += self.log_id.as_bytes().len(); + size += self.persisted_in_blob.to_string().as_bytes().len(); + size += self.value.as_bytes().len(); + size += self.attachment_holders.as_bytes().len(); + size += self.data_hash.as_bytes().len(); + size + } +} + #[derive(Clone)] pub struct DatabaseClient { client: Arc, } impl DatabaseClient { pub fn new(aws_config: &aws_types::SdkConfig) -> Self { DatabaseClient { client: Arc::new(aws_sdk_dynamodb::Client::new(aws_config)), } } // backup item pub async fn put_backup_item( &self, backup_item: BackupItem, ) -> Result<(), Error> { let item = HashMap::from([ ( BACKUP_TABLE_FIELD_USER_ID.to_string(), AttributeValue::S(backup_item.user_id), ), ( BACKUP_TABLE_FIELD_CREATED.to_string(), AttributeValue::S(backup_item.created.to_rfc3339()), ), ( BACKUP_TABLE_FIELD_BACKUP_ID.to_string(), AttributeValue::S(backup_item.backup_id), ), ( BACKUP_TABLE_FIELD_RECOVERY_DATA.to_string(), AttributeValue::S(backup_item.recovery_data), ), ( BACKUP_TABLE_FIELD_COMPACTION_HOLDER.to_string(), AttributeValue::S(backup_item.compaction_holder), ), ( BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS.to_string(), AttributeValue::S(backup_item.attachment_holders), ), ]); self .client .put_item() .table_name(BACKUP_TABLE_NAME) .set_item(Some(item)) .send() .await .map_err(|e| { error!("DynamoDB client failed to put backup item"); Error::AwsSdk(e.into()) })?; Ok(()) } pub async fn find_backup_item( &self, user_id: &str, backup_id: &str, ) -> Result, Error> { let item_key = HashMap::from([ ( BACKUP_TABLE_FIELD_USER_ID.to_string(), AttributeValue::S(user_id.to_string()), ), ( BACKUP_TABLE_FIELD_BACKUP_ID.to_string(), AttributeValue::S(backup_id.to_string()), ), ]); match self .client .get_item() .table_name(BACKUP_TABLE_NAME) .set_key(Some(item_key)) .send() .await .map_err(|e| { error!("DynamoDB client failed to find backup item"); Error::AwsSdk(e.into()) })? { GetItemOutput { item: Some(item), .. } => { let backup_item = parse_backup_item(item)?; Ok(Some(backup_item)) } _ => Ok(None), } } pub async fn find_last_backup_item( &self, user_id: &str, ) -> Result, Error> { let response = self .client .query() .table_name(BACKUP_TABLE_NAME) .index_name(BACKUP_TABLE_INDEX_USERID_CREATED) .key_condition_expression("#userID = :valueToMatch") .expression_attribute_names("#userID", BACKUP_TABLE_FIELD_USER_ID) .expression_attribute_values( ":valueToMatch", AttributeValue::S(user_id.to_string()), ) .limit(1) .scan_index_forward(false) .send() .await .map_err(|e| { error!("DynamoDB client failed to find last backup"); Error::AwsSdk(e.into()) })?; match response.items.unwrap_or_default().pop() { Some(item) => { let backup_item = parse_backup_item(item)?; Ok(Some(backup_item)) } None => Ok(None), } } pub async fn remove_backup_item(&self, backup_id: &str) -> Result<(), Error> { self .client .delete_item() .table_name(BACKUP_TABLE_NAME) .key( BACKUP_TABLE_FIELD_BACKUP_ID, AttributeValue::S(backup_id.to_string()), ) .send() .await .map_err(|e| { error!("DynamoDB client failed to remove backup item"); Error::AwsSdk(e.into()) })?; Ok(()) } // log item pub async fn put_log_item(&self, log_item: LogItem) -> Result<(), Error> { let item = HashMap::from([ ( LOG_TABLE_FIELD_BACKUP_ID.to_string(), AttributeValue::S(log_item.backup_id), ), ( LOG_TABLE_FIELD_LOG_ID.to_string(), AttributeValue::S(log_item.log_id), ), ( LOG_TABLE_FIELD_PERSISTED_IN_BLOB.to_string(), AttributeValue::Bool(log_item.persisted_in_blob), ), ( LOG_TABLE_FIELD_VALUE.to_string(), AttributeValue::S(log_item.value), ), ( LOG_TABLE_FIELD_DATA_HASH.to_string(), AttributeValue::S(log_item.data_hash), ), ( LOG_TABLE_FIELD_ATTACHMENT_HOLDERS.to_string(), AttributeValue::S(log_item.attachment_holders), ), ]); self .client .put_item() .table_name(LOG_TABLE_NAME) .set_item(Some(item)) .send() .await .map_err(|e| { error!("DynamoDB client failed to put log item"); Error::AwsSdk(e.into()) })?; Ok(()) } pub async fn find_log_item( &self, backup_id: &str, log_id: &str, ) -> Result, Error> { let item_key = HashMap::from([ ( LOG_TABLE_FIELD_BACKUP_ID.to_string(), AttributeValue::S(backup_id.to_string()), ), ( LOG_TABLE_FIELD_LOG_ID.to_string(), AttributeValue::S(log_id.to_string()), ), ]); match self .client .get_item() .table_name(LOG_TABLE_NAME) .set_key(Some(item_key)) .send() .await .map_err(|e| { error!("DynamoDB client failed to find log item"); Error::AwsSdk(e.into()) })? { GetItemOutput { item: Some(item), .. } => { let log_item = parse_log_item(item)?; Ok(Some(log_item)) } _ => Ok(None), } } pub async fn find_log_items_for_backup( &self, backup_id: &str, ) -> Result, Error> { let response = self .client .query() .table_name(LOG_TABLE_NAME) .key_condition_expression("#backupID = :valueToMatch") .expression_attribute_names("#backupID", LOG_TABLE_FIELD_BACKUP_ID) .expression_attribute_values( ":valueToMatch", AttributeValue::S(backup_id.to_string()), ) .send() .await .map_err(|e| { error!("DynamoDB client failed to find log items for backup"); Error::AwsSdk(e.into()) })?; if response.count == 0 { return Ok(Vec::new()); } let mut results: Vec = Vec::with_capacity(response.count() as usize); for item in response.items.unwrap_or_default() { let log_item = parse_log_item(item)?; results.push(log_item); } Ok(results) } pub async fn remove_log_item(&self, log_id: &str) -> Result<(), Error> { self .client .delete_item() .table_name(LOG_TABLE_NAME) .key( LOG_TABLE_FIELD_LOG_ID, AttributeValue::S(log_id.to_string()), ) .send() .await .map_err(|e| { error!("DynamoDB client failed to remove log item"); Error::AwsSdk(e.into()) })?; Ok(()) } } #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum Error { #[display(...)] AwsSdk(DynamoDBError), #[display(...)] Attribute(DBItemError), } #[derive(Debug, derive_more::Error, derive_more::Constructor)] pub struct DBItemError { attribute_name: &'static str, attribute_value: Option, attribute_error: DBItemAttributeError, } impl Display for DBItemError { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { match &self.attribute_error { DBItemAttributeError::Missing => { write!(f, "Attribute {} is missing", self.attribute_name) } DBItemAttributeError::IncorrectType => write!( f, "Value for attribute {} has incorrect type: {:?}", self.attribute_name, self.attribute_value ), error => write!( f, "Error regarding attribute {} with value {:?}: {}", self.attribute_name, self.attribute_value, error ), } } } #[derive(Debug, derive_more::Display, derive_more::Error)] pub enum DBItemAttributeError { #[display(...)] Missing, #[display(...)] IncorrectType, #[display(...)] InvalidTimestamp(chrono::ParseError), } fn parse_string_attribute( attribute_name: &'static str, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::S(value)) => Ok(value), Some(_) => Err(DBItemError::new( attribute_name, attribute_value, DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name, attribute_value, DBItemAttributeError::Missing, )), } } fn parse_bool_attribute( attribute_name: &'static str, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::Bool(value)) => Ok(value), Some(_) => Err(DBItemError::new( attribute_name, attribute_value, DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name, attribute_value, DBItemAttributeError::Missing, )), } } fn parse_datetime_attribute( attribute_name: &'static str, attribute_value: Option, ) -> Result, DBItemError> { if let Some(AttributeValue::S(datetime)) = &attribute_value { // parse() accepts a relaxed RFC3339 string datetime.parse().map_err(|e| { DBItemError::new( attribute_name, attribute_value, DBItemAttributeError::InvalidTimestamp(e), ) }) } else { Err(DBItemError::new( attribute_name, attribute_value, DBItemAttributeError::Missing, )) } } fn parse_backup_item( mut item: HashMap, ) -> Result { let user_id = parse_string_attribute( BACKUP_TABLE_FIELD_USER_ID, item.remove(BACKUP_TABLE_FIELD_USER_ID), )?; let backup_id = parse_string_attribute( BACKUP_TABLE_FIELD_BACKUP_ID, item.remove(BACKUP_TABLE_FIELD_BACKUP_ID), )?; let created = parse_datetime_attribute( BACKUP_TABLE_FIELD_CREATED, item.remove(BACKUP_TABLE_FIELD_CREATED), )?; let recovery_data = parse_string_attribute( BACKUP_TABLE_FIELD_RECOVERY_DATA, item.remove(BACKUP_TABLE_FIELD_RECOVERY_DATA), )?; let compaction_holder = parse_string_attribute( BACKUP_TABLE_FIELD_COMPACTION_HOLDER, item.remove(BACKUP_TABLE_FIELD_COMPACTION_HOLDER), )?; let attachment_holders = parse_string_attribute( BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS, item.remove(BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS), )?; Ok(BackupItem { user_id, backup_id, created, recovery_data, compaction_holder, attachment_holders, }) } fn parse_log_item( mut item: HashMap, ) -> Result { let backup_id = parse_string_attribute( LOG_TABLE_FIELD_BACKUP_ID, item.remove(LOG_TABLE_FIELD_BACKUP_ID), )?; let log_id = parse_string_attribute( LOG_TABLE_FIELD_LOG_ID, item.remove(LOG_TABLE_FIELD_LOG_ID), )?; let persisted_in_blob = parse_bool_attribute( LOG_TABLE_FIELD_PERSISTED_IN_BLOB, item.remove(LOG_TABLE_FIELD_PERSISTED_IN_BLOB), )?; let value = parse_string_attribute( LOG_TABLE_FIELD_VALUE, item.remove(LOG_TABLE_FIELD_VALUE), )?; let data_hash = parse_string_attribute( LOG_TABLE_FIELD_DATA_HASH, item.remove(LOG_TABLE_FIELD_DATA_HASH), )?; let attachment_holders = parse_string_attribute( LOG_TABLE_FIELD_ATTACHMENT_HOLDERS, item.remove(LOG_TABLE_FIELD_ATTACHMENT_HOLDERS), )?; Ok(LogItem { log_id, backup_id, persisted_in_blob, value, data_hash, attachment_holders, }) }