diff --git a/services/backup/src/constants.rs b/services/backup/src/constants.rs
index 0fbb3bf60..6d192725c 100644
--- a/services/backup/src/constants.rs
+++ b/services/backup/src/constants.rs
@@ -1,32 +1,36 @@
 // Assorted constants
 
 pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1;
 pub const ID_SEPARATOR: &str = ":";
 pub const ATTACHMENT_HOLDER_SEPARATOR: &str = ";";
 
 // Configuration defaults
 
 pub const DEFAULT_HTTP_PORT: u16 = 50052;
 pub const DEFAULT_BLOB_SERVICE_URL: &str = "http://localhost:50053";
 
 // Environment variable names
 
 pub const LOG_LEVEL_ENV_VAR: &str =
   tracing_subscriber::filter::EnvFilter::DEFAULT_ENV;
 
 // DynamoDB constants
 
+pub mod backup_table {
+  pub const TABLE_NAME: &str = "backup-service-backup";
+  pub const CREATED_INDEX: &str = "userID-created-index";
-pub const BACKUP_TABLE_NAME: &str = "backup-service-backup";
-pub const BACKUP_TABLE_FIELD_USER_ID: &str = "userID";
-pub const BACKUP_TABLE_FIELD_BACKUP_ID: &str = "backupID";
-pub const BACKUP_TABLE_FIELD_CREATED: &str = "created";
-pub const BACKUP_TABLE_FIELD_USER_DATA: &str = "userData";
-pub const BACKUP_TABLE_FIELD_USER_KEYS: &str = "userKeys";
-pub const BACKUP_TABLE_FIELD_ATTACHMENTS: &str = "attachments";
-pub const BACKUP_TABLE_INDEX_USERID_CREATED: &str = "userID-created-index";
+
+  pub mod attr {
+    pub const USER_ID: &str = "userID";
+    pub const BACKUP_ID: &str = "backupID";
+    pub const CREATED: &str = "created";
+    pub const USER_DATA: &str = "userData";
+    pub const USER_KEYS: &str = "userKeys";
+    pub const ATTACHMENTS: &str = "attachments";
+  }
+}
 
 pub const LOG_TABLE_NAME: &str = "backup-service-log";
 pub const LOG_TABLE_FIELD_BACKUP_ID: &str = "backupID";
 pub const LOG_TABLE_FIELD_LOG_ID: &str = "logID";
 pub const LOG_TABLE_FIELD_PERSISTED_IN_BLOB: &str = "persistedInBlob";
 pub const LOG_TABLE_FIELD_VALUE: &str = "value";
 pub const LOG_TABLE_FIELD_ATTACHMENT_HOLDERS: &str = "attachmentHolders";
 pub const LOG_TABLE_FIELD_DATA_HASH: &str = "dataHash";
diff --git a/services/backup/src/database/backup_item.rs b/services/backup/src/database/backup_item.rs
index d4ea6da7a..feb853558 100644
--- a/services/backup/src/database/backup_item.rs
+++ b/services/backup/src/database/backup_item.rs
@@ -1,188 +1,184 @@
 use aws_sdk_dynamodb::types::AttributeValue;
 use chrono::{DateTime, Utc};
 use comm_services_lib::{
   blob::{client::BlobServiceClient, types::BlobInfo},
   database::{AttributeTryInto, DBItemError, TryFromAttribute},
 };
 use std::collections::HashMap;
 
-use crate::constants::{
-  BACKUP_TABLE_FIELD_ATTACHMENTS, BACKUP_TABLE_FIELD_BACKUP_ID,
-  BACKUP_TABLE_FIELD_CREATED, BACKUP_TABLE_FIELD_USER_DATA,
-  BACKUP_TABLE_FIELD_USER_ID, BACKUP_TABLE_FIELD_USER_KEYS,
-};
+use crate::constants::backup_table;
 
 #[derive(Clone, Debug)]
 pub struct BackupItem {
   pub user_id: String,
   pub backup_id: String,
   pub created: DateTime<Utc>,
   pub user_keys: BlobInfo,
   pub user_data: BlobInfo,
   pub attachments: Vec<BlobInfo>,
 }
 
 impl BackupItem {
   pub fn new(
     user_id: String,
     backup_id: String,
     user_keys: BlobInfo,
     user_data: BlobInfo,
     attachments: Vec<BlobInfo>,
   ) -> Self {
     BackupItem {
       user_id,
       backup_id,
       created: chrono::Utc::now(),
       user_keys,
       user_data,
       attachments,
     }
   }
 
   pub async fn revoke_holders(self, blob_client: &BlobServiceClient) {
     blob_client
       .schedule_revoke_holder(self.user_keys.blob_hash, self.user_keys.holder);
     blob_client
       .schedule_revoke_holder(self.user_data.blob_hash, self.user_data.holder);
 
     for attachment_info in self.attachments {
       blob_client.schedule_revoke_holder(
         attachment_info.blob_hash,
         attachment_info.holder,
       );
     }
   }
 }
 
 impl From<BackupItem> for HashMap<String, AttributeValue> {
   fn from(value: BackupItem) -> Self {
     let mut attrs = HashMap::from([
       (
-        BACKUP_TABLE_FIELD_USER_ID.to_string(),
+        backup_table::attr::USER_ID.to_string(),
         AttributeValue::S(value.user_id),
       ),
       (
-        BACKUP_TABLE_FIELD_BACKUP_ID.to_string(),
+        backup_table::attr::BACKUP_ID.to_string(),
         AttributeValue::S(value.backup_id),
       ),
       (
-        BACKUP_TABLE_FIELD_CREATED.to_string(),
+        backup_table::attr::CREATED.to_string(),
         AttributeValue::S(value.created.to_rfc3339()),
       ),
       (
-        BACKUP_TABLE_FIELD_USER_KEYS.to_string(),
+        backup_table::attr::USER_KEYS.to_string(),
         value.user_keys.into(),
       ),
       (
-        BACKUP_TABLE_FIELD_USER_DATA.to_string(),
+        backup_table::attr::USER_DATA.to_string(),
         value.user_data.into(),
       ),
     ]);
 
     if !value.attachments.is_empty() {
       attrs.insert(
-        BACKUP_TABLE_FIELD_ATTACHMENTS.to_string(),
+        backup_table::attr::ATTACHMENTS.to_string(),
         AttributeValue::L(
           value
             .attachments
             .into_iter()
             .map(AttributeValue::from)
             .collect(),
         ),
       );
     }
 
     attrs
   }
 }
 
 impl TryFrom<HashMap<String, AttributeValue>> for BackupItem {
   type Error = DBItemError;
 
   fn try_from(
     mut value: HashMap<String, AttributeValue>,
   ) -> Result<Self, Self::Error> {
     let user_id = String::try_from_attr(
-      BACKUP_TABLE_FIELD_USER_ID,
-      value.remove(BACKUP_TABLE_FIELD_USER_ID),
+      backup_table::attr::USER_ID,
+      value.remove(backup_table::attr::USER_ID),
     )?;
     let backup_id = String::try_from_attr(
-      BACKUP_TABLE_FIELD_BACKUP_ID,
-      value.remove(BACKUP_TABLE_FIELD_BACKUP_ID),
+      backup_table::attr::BACKUP_ID,
+      value.remove(backup_table::attr::BACKUP_ID),
     )?;
     let created = DateTime::<Utc>::try_from_attr(
-      BACKUP_TABLE_FIELD_CREATED,
-      value.remove(BACKUP_TABLE_FIELD_CREATED),
+      backup_table::attr::CREATED,
+      value.remove(backup_table::attr::CREATED),
     )?;
     let user_keys = BlobInfo::try_from_attr(
-      BACKUP_TABLE_FIELD_USER_KEYS,
-      value.remove(BACKUP_TABLE_FIELD_USER_KEYS),
+      backup_table::attr::USER_KEYS,
+      value.remove(backup_table::attr::USER_KEYS),
     )?;
     let user_data = BlobInfo::try_from_attr(
-      BACKUP_TABLE_FIELD_USER_DATA,
-      value.remove(BACKUP_TABLE_FIELD_USER_DATA),
+      backup_table::attr::USER_DATA,
+      value.remove(backup_table::attr::USER_DATA),
     )?;
 
-    let attachments = value.remove(BACKUP_TABLE_FIELD_ATTACHMENTS);
+    let attachments = value.remove(backup_table::attr::ATTACHMENTS);
     let attachments = if attachments.is_some() {
-      attachments.attr_try_into(BACKUP_TABLE_FIELD_ATTACHMENTS)?
+      attachments.attr_try_into(backup_table::attr::ATTACHMENTS)?
     } else {
       Vec::new()
     };
 
     Ok(BackupItem {
       user_id,
       backup_id,
       created,
       user_keys,
       user_data,
       attachments,
     })
   }
 }
 
 /// Corresponds to the items in the [`crate::constants::BACKUP_TABLE_INDEX_USERID_CREATED`]
 /// global index
 #[derive(Clone, Debug)]
 pub struct OrderedBackupItem {
   pub user_id: String,
   pub created: DateTime<Utc>,
   pub backup_id: String,
   pub user_keys: BlobInfo,
 }
 
 impl TryFrom<HashMap<String, AttributeValue>> for OrderedBackupItem {
   type Error = DBItemError;
 
   fn try_from(
     mut value: HashMap<String, AttributeValue>,
   ) -> Result<Self, Self::Error> {
     let user_id = String::try_from_attr(
-      BACKUP_TABLE_FIELD_USER_ID,
-      value.remove(BACKUP_TABLE_FIELD_USER_ID),
+      backup_table::attr::USER_ID,
+      value.remove(backup_table::attr::USER_ID),
     )?;
     let created = DateTime::<Utc>::try_from_attr(
-      BACKUP_TABLE_FIELD_CREATED,
-      value.remove(BACKUP_TABLE_FIELD_CREATED),
+      backup_table::attr::CREATED,
+      value.remove(backup_table::attr::CREATED),
     )?;
     let backup_id = String::try_from_attr(
-      BACKUP_TABLE_FIELD_BACKUP_ID,
-      value.remove(BACKUP_TABLE_FIELD_BACKUP_ID),
+      backup_table::attr::BACKUP_ID,
+      value.remove(backup_table::attr::BACKUP_ID),
    )?;
     let user_keys = BlobInfo::try_from_attr(
-      BACKUP_TABLE_FIELD_USER_KEYS,
-      value.remove(BACKUP_TABLE_FIELD_USER_KEYS),
+      backup_table::attr::USER_KEYS,
+      value.remove(backup_table::attr::USER_KEYS),
     )?;
 
     Ok(OrderedBackupItem {
       user_id,
       created,
       backup_id,
       user_keys,
     })
   }
 }
diff --git a/services/backup/src/database/mod.rs b/services/backup/src/database/mod.rs
index 0a6408cef..bf5abeecf 100644
--- a/services/backup/src/database/mod.rs
+++ b/services/backup/src/database/mod.rs
@@ -1,366 +1,364 @@
 pub mod backup_item;
 pub mod log_item;
 
 use std::collections::HashMap;
 
 use aws_sdk_dynamodb::{
   operation::get_item::GetItemOutput,
   types::{AttributeValue, ReturnValue},
 };
 use comm_services_lib::database::Error;
 use tracing::{error, trace, warn};
 
 use crate::constants::{
-  BACKUP_TABLE_FIELD_BACKUP_ID, BACKUP_TABLE_FIELD_USER_ID,
-  BACKUP_TABLE_INDEX_USERID_CREATED, BACKUP_TABLE_NAME,
-  LOG_TABLE_FIELD_ATTACHMENT_HOLDERS, LOG_TABLE_FIELD_BACKUP_ID,
+  backup_table, LOG_TABLE_FIELD_ATTACHMENT_HOLDERS, LOG_TABLE_FIELD_BACKUP_ID,
   LOG_TABLE_FIELD_DATA_HASH, LOG_TABLE_FIELD_LOG_ID,
   LOG_TABLE_FIELD_PERSISTED_IN_BLOB, LOG_TABLE_FIELD_VALUE, LOG_TABLE_NAME,
 };
 
 use self::{
   backup_item::{BackupItem, OrderedBackupItem},
   log_item::{parse_log_item, LogItem},
 };
 
 #[derive(Clone)]
 pub struct DatabaseClient {
   client: aws_sdk_dynamodb::Client,
 }
 
 impl DatabaseClient {
   pub fn new(aws_config: &aws_types::SdkConfig) -> Self {
     DatabaseClient {
       client: aws_sdk_dynamodb::Client::new(aws_config),
     }
   }
 
   // backup item
 
   pub async fn put_backup_item(
     &self,
     backup_item: BackupItem,
   ) -> Result<(), Error> {
     let item = backup_item.into();
 
     self
       .client
       .put_item()
-      .table_name(BACKUP_TABLE_NAME)
+      .table_name(backup_table::TABLE_NAME)
       .set_item(Some(item))
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to put backup item");
         Error::AwsSdk(e.into())
       })?;
 
     Ok(())
   }
 
   pub async fn find_backup_item(
     &self,
     user_id: &str,
     backup_id: &str,
   ) -> Result<Option<BackupItem>, Error> {
     let item_key = Self::get_item_key(user_id, backup_id);
 
     let output = self
       .client
       .get_item()
-      .table_name(BACKUP_TABLE_NAME)
+      .table_name(backup_table::TABLE_NAME)
       .set_key(Some(item_key))
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to find backup item");
         Error::AwsSdk(e.into())
       })?;
 
     let GetItemOutput {
       item: Some(item), ..
     } = output else {
       return Ok(None)
     };
 
     let backup_item = item.try_into()?;
     Ok(Some(backup_item))
   }
 
   pub async fn find_last_backup_item(
     &self,
     user_id: &str,
   ) -> Result<Option<OrderedBackupItem>, Error> {
     let response = self
       .client
       .query()
-      .table_name(BACKUP_TABLE_NAME)
-      .index_name(BACKUP_TABLE_INDEX_USERID_CREATED)
+      .table_name(backup_table::TABLE_NAME)
+      .index_name(backup_table::CREATED_INDEX)
       .key_condition_expression("#userID = :valueToMatch")
-      .expression_attribute_names("#userID", BACKUP_TABLE_FIELD_USER_ID)
+      .expression_attribute_names("#userID", backup_table::attr::USER_ID)
       .expression_attribute_values(
         ":valueToMatch",
         AttributeValue::S(user_id.to_string()),
       )
       .limit(1)
       .scan_index_forward(false)
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to find last backup");
         Error::AwsSdk(e.into())
       })?;
 
     match response.items.unwrap_or_default().pop() {
       Some(item) => {
         let backup_item = item.try_into()?;
         Ok(Some(backup_item))
       }
       None => Ok(None),
     }
   }
 
   pub async fn remove_backup_item(
     &self,
     user_id: &str,
     backup_id: &str,
   ) -> Result<Option<BackupItem>, Error> {
     let item_key = Self::get_item_key(user_id, backup_id);
 
     let response = self
       .client
       .delete_item()
-      .table_name(BACKUP_TABLE_NAME)
+      .table_name(backup_table::TABLE_NAME)
       .set_key(Some(item_key))
       .return_values(ReturnValue::AllOld)
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to remove backup item");
         Error::AwsSdk(e.into())
       })?;
 
     response
       .attributes
       .map(BackupItem::try_from)
       .transpose()
       .map_err(Error::from)
   }
 
   /// For the purposes of the initial backup version this function
   /// removes all backups except for the latest one
   pub async fn remove_old_backups(
     &self,
     user_id: &str,
   ) -> Result<Vec<BackupItem>, Error> {
     let response = self
       .client
       .query()
-      .table_name(BACKUP_TABLE_NAME)
-      .index_name(BACKUP_TABLE_INDEX_USERID_CREATED)
+      .table_name(backup_table::TABLE_NAME)
+      .index_name(backup_table::CREATED_INDEX)
       .key_condition_expression("#userID = :valueToMatch")
-      .expression_attribute_names("#userID", BACKUP_TABLE_FIELD_USER_ID)
+      .expression_attribute_names("#userID", backup_table::attr::USER_ID)
       .expression_attribute_values(
         ":valueToMatch",
         AttributeValue::S(user_id.to_string()),
       )
       .scan_index_forward(false)
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to fetch backups");
         Error::AwsSdk(e.into())
       })?;
 
     if response.last_evaluated_key().is_some() {
       // In the initial version of the backup service this function will be run
       // for every new backup (each user only has one backup), so this shouldn't
       // happen
       warn!("Not all old backups have been cleaned up");
     }
 
     let items = response
       .items
       .unwrap_or_default()
       .into_iter()
       .map(OrderedBackupItem::try_from)
       .collect::<Result<Vec<_>, _>>()?;
 
     let mut removed_backups = vec![];
 
     let Some(latest) = items.iter().map(|item| item.created).max() else {
       return Ok(removed_backups);
     };
 
     for item in items {
       if item.created == latest {
         trace!(
           "Skipping removal of the latest backup item: {}",
           item.backup_id
         );
         continue;
       }
 
       trace!("Removing backup item: {item:?}");
 
       if let Some(backup) =
         self.remove_backup_item(user_id, &item.backup_id).await?
       {
         removed_backups.push(backup);
       } else {
         warn!("Backup was found during query, but wasn't found when deleting")
       };
     }
 
     Ok(removed_backups)
   }
 
   fn get_item_key(
     user_id: &str,
     backup_id: &str,
   ) -> HashMap<String, AttributeValue> {
     HashMap::from([
       (
-        BACKUP_TABLE_FIELD_USER_ID.to_string(),
+        backup_table::attr::USER_ID.to_string(),
         AttributeValue::S(user_id.to_string()),
       ),
       (
-        BACKUP_TABLE_FIELD_BACKUP_ID.to_string(),
+        backup_table::attr::BACKUP_ID.to_string(),
         AttributeValue::S(backup_id.to_string()),
       ),
     ])
   }
 
   // log item
 
   pub async fn put_log_item(&self, log_item: LogItem) -> Result<(), Error> {
     let item = HashMap::from([
       (
         LOG_TABLE_FIELD_BACKUP_ID.to_string(),
         AttributeValue::S(log_item.backup_id),
       ),
       (
         LOG_TABLE_FIELD_LOG_ID.to_string(),
         AttributeValue::S(log_item.log_id),
       ),
       (
         LOG_TABLE_FIELD_PERSISTED_IN_BLOB.to_string(),
         AttributeValue::Bool(log_item.persisted_in_blob),
       ),
       (
         LOG_TABLE_FIELD_VALUE.to_string(),
         AttributeValue::S(log_item.value),
       ),
       (
         LOG_TABLE_FIELD_DATA_HASH.to_string(),
         AttributeValue::S(log_item.data_hash),
       ),
       (
         LOG_TABLE_FIELD_ATTACHMENT_HOLDERS.to_string(),
         AttributeValue::S(log_item.attachment_holders),
       ),
     ]);
 
     self
       .client
       .put_item()
       .table_name(LOG_TABLE_NAME)
       .set_item(Some(item))
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to put log item");
         Error::AwsSdk(e.into())
       })?;
 
     Ok(())
   }
 
   pub async fn find_log_item(
     &self,
     backup_id: &str,
     log_id: &str,
   ) -> Result<Option<LogItem>, Error> {
     let item_key = HashMap::from([
       (
         LOG_TABLE_FIELD_BACKUP_ID.to_string(),
         AttributeValue::S(backup_id.to_string()),
       ),
       (
         LOG_TABLE_FIELD_LOG_ID.to_string(),
         AttributeValue::S(log_id.to_string()),
       ),
     ]);
 
     match self
       .client
       .get_item()
       .table_name(LOG_TABLE_NAME)
       .set_key(Some(item_key))
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to find log item");
         Error::AwsSdk(e.into())
       })? {
       GetItemOutput {
         item: Some(item), ..
       } => {
         let log_item = parse_log_item(item)?;
         Ok(Some(log_item))
       }
       _ => Ok(None),
     }
   }
 
   pub async fn find_log_items_for_backup(
     &self,
     backup_id: &str,
   ) -> Result<Vec<LogItem>, Error> {
     let response = self
       .client
       .query()
       .table_name(LOG_TABLE_NAME)
       .key_condition_expression("#backupID = :valueToMatch")
       .expression_attribute_names("#backupID", LOG_TABLE_FIELD_BACKUP_ID)
       .expression_attribute_values(
         ":valueToMatch",
         AttributeValue::S(backup_id.to_string()),
       )
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to find log items for backup");
         Error::AwsSdk(e.into())
       })?;
 
     if response.count == 0 {
       return Ok(Vec::new());
     }
 
     let mut results: Vec<LogItem> =
       Vec::with_capacity(response.count() as usize);
     for item in response.items.unwrap_or_default() {
       let log_item = parse_log_item(item)?;
       results.push(log_item);
     }
 
     Ok(results)
   }
 
   pub async fn remove_log_item(&self, log_id: &str) -> Result<(), Error> {
     self
       .client
       .delete_item()
       .table_name(LOG_TABLE_NAME)
       .key(
         LOG_TABLE_FIELD_LOG_ID,
         AttributeValue::S(log_id.to_string()),
       )
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to remove log item");
         Error::AwsSdk(e.into())
       })?;
 
     Ok(())
   }
 }