diff --git a/services/backup/src/database/log_item.rs b/services/backup/src/database/log_item.rs
index 1cab83768..f5ba3a4d4 100644
--- a/services/backup/src/database/log_item.rs
+++ b/services/backup/src/database/log_item.rs
@@ -1,107 +1,114 @@
 use crate::constants::log_table::attr;
 use aws_sdk_dynamodb::types::AttributeValue;
 use comm_lib::{
   blob::{
     client::{BlobServiceClient, BlobServiceError},
     types::BlobInfo,
   },
   constants::DDB_ITEM_SIZE_LIMIT,
   database::{
     blob::BlobOrDBContent, calculate_size_in_db, parse_int_attribute,
     AttributeExtractor, AttributeTryInto, DBItemError,
   },
 };
 use std::collections::HashMap;
 use tracing::debug;

 #[derive(Clone, Debug)]
 pub struct LogItem {
   pub backup_id: String,
   pub log_id: usize,
   pub content: BlobOrDBContent,
   pub attachments: Vec<BlobInfo>,
 }

 impl LogItem {
   pub async fn ensure_size_constraints(
     &mut self,
     blob_client: &BlobServiceClient,
   ) -> Result<(), BlobServiceError> {
     if let Ok(size) = calculate_size_in_db(&self.clone().into()) {
       if size < DDB_ITEM_SIZE_LIMIT {
         return Ok(());
       };
     }

     debug!(
       log_id = ?self.log_id,
       "Log content exceeds DDB item size limit, moving to blob storage"
     );
     self.content.move_to_blob(blob_client).await
   }
-}

-impl From<LogItem> for HashMap<String, AttributeValue> {
-  fn from(value: LogItem) -> Self {
-    let mut attrs = HashMap::from([
+  pub fn item_key(
+    backup_id: impl Into<String>,
+    log_id: usize,
+  ) -> HashMap<String, AttributeValue> {
+    HashMap::from([
       (
         attr::BACKUP_ID.to_string(),
-        AttributeValue::S(value.backup_id),
+        AttributeValue::S(backup_id.into()),
       ),
       (
         attr::LOG_ID.to_string(),
-        AttributeValue::N(value.log_id.to_string()),
+        AttributeValue::N(log_id.to_string()),
       ),
-    ]);
+    ])
+  }
+}
+
+impl From<LogItem> for HashMap<String, AttributeValue> {
+  fn from(value: LogItem) -> Self {
+    let mut attrs = LogItem::item_key(value.backup_id, value.log_id);

     let (content_attr_name, content_attr) = value
       .content
       .into_attr_pair(attr::CONTENT_BLOB_INFO, attr::CONTENT_DB);
     attrs.insert(content_attr_name, content_attr);

     if !value.attachments.is_empty() {
       attrs.insert(
         attr::ATTACHMENTS.to_string(),
         AttributeValue::L(
           value
             .attachments
             .into_iter()
             .map(AttributeValue::from)
             .collect(),
         ),
       );
     }

     attrs
   }
 }

 impl TryFrom<HashMap<String, AttributeValue>> for LogItem {
   type Error = DBItemError;

   fn try_from(
     mut value: HashMap<String, AttributeValue>,
   ) -> Result<Self, Self::Error> {
     let backup_id = value.take_attr(attr::BACKUP_ID)?;
     let log_id =
       parse_int_attribute(attr::LOG_ID, value.remove(attr::LOG_ID))?;
     let content = BlobOrDBContent::parse_from_attrs(
       &mut value,
       attr::CONTENT_BLOB_INFO,
       attr::CONTENT_DB,
     )?;

     let attachments = value.remove(attr::ATTACHMENTS);
     let attachments = if attachments.is_some() {
       attachments.attr_try_into(attr::ATTACHMENTS)?
     } else {
       Vec::new()
     };

     Ok(LogItem {
       backup_id,
       log_id,
       content,
       attachments,
     })
   }
 }
diff --git a/services/backup/src/database/mod.rs b/services/backup/src/database/mod.rs
index bffbe78e1..3a25b19dc 100644
--- a/services/backup/src/database/mod.rs
+++ b/services/backup/src/database/mod.rs
@@ -1,284 +1,325 @@
 pub mod backup_item;
 pub mod log_item;

 use self::{
   backup_item::{BackupItem, OrderedBackupItem},
   log_item::LogItem,
 };
 use crate::constants::{backup_table, log_table, LOG_DEFAULT_PAGE_SIZE};
 use aws_sdk_dynamodb::{
   operation::get_item::GetItemOutput,
-  types::{AttributeValue, ReturnValue},
+  types::{AttributeValue, DeleteRequest, ReturnValue, WriteRequest},
+};
+use comm_lib::database::{
+  self, batch_operations::ExponentialBackoffConfig, parse_int_attribute, Error,
 };
-use comm_lib::database::{parse_int_attribute, Error};
 use tracing::{error, trace, warn};

 #[derive(Clone)]
 pub struct DatabaseClient {
   client: aws_sdk_dynamodb::Client,
 }

 impl DatabaseClient {
   pub fn new(aws_config: &aws_types::SdkConfig) -> Self {
     DatabaseClient {
       client: aws_sdk_dynamodb::Client::new(aws_config),
     }
   }
 }

 /// Backup functions
 impl DatabaseClient {
   pub async fn put_backup_item(
     &self,
     backup_item: BackupItem,
   ) -> Result<(), Error> {
     let item = backup_item.into();

     self
       .client
       .put_item()
       .table_name(backup_table::TABLE_NAME)
       .set_item(Some(item))
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to put backup item");
         Error::AwsSdk(e.into())
       })?;

     Ok(())
   }

   pub async fn find_backup_item(
     &self,
     user_id: &str,
     backup_id: &str,
   ) -> Result<Option<BackupItem>, Error> {
     let item_key = BackupItem::item_key(user_id, backup_id);

     let output = self
       .client
       .get_item()
       .table_name(backup_table::TABLE_NAME)
       .set_key(Some(item_key))
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to find backup item");
         Error::AwsSdk(e.into())
       })?;

     let GetItemOutput {
       item: Some(item), ..
     } = output
     else {
       return Ok(None);
     };

     let backup_item = item.try_into()?;

     Ok(Some(backup_item))
   }

   pub async fn find_last_backup_item(
     &self,
     user_id: &str,
   ) -> Result<Option<BackupItem>, Error> {
     let response = self
       .client
       .query()
       .table_name(backup_table::TABLE_NAME)
       .index_name(backup_table::CREATED_INDEX)
       .key_condition_expression("#userID = :valueToMatch")
       .expression_attribute_names("#userID", backup_table::attr::USER_ID)
       .expression_attribute_values(
         ":valueToMatch",
         AttributeValue::S(user_id.to_string()),
       )
       .limit(1)
       .scan_index_forward(false)
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to find last backup");
         Error::AwsSdk(e.into())
       })?;

     match response.items.unwrap_or_default().pop() {
       Some(item) => {
         let backup_item = item.try_into()?;
         Ok(Some(backup_item))
       }
       None => Ok(None),
     }
   }

   pub async fn remove_backup_item(
     &self,
     user_id: &str,
     backup_id: &str,
   ) -> Result<Option<BackupItem>, Error> {
     let item_key = BackupItem::item_key(user_id, backup_id);

     let response = self
       .client
       .delete_item()
       .table_name(backup_table::TABLE_NAME)
       .set_key(Some(item_key))
       .return_values(ReturnValue::AllOld)
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to remove backup item");
         Error::AwsSdk(e.into())
       })?;

-    response
+    let result = response
       .attributes
       .map(BackupItem::try_from)
       .transpose()
-      .map_err(Error::from)
+      .map_err(Error::from)?;
+
+    self.remove_log_items_for_backup(backup_id).await?;
+
+    Ok(result)
   }

   /// For the purposes of the initial backup version this function
   /// removes all backups except for the latest one
   pub async fn remove_old_backups(
     &self,
     user_id: &str,
   ) -> Result<Vec<BackupItem>, Error> {
     let response = self
       .client
       .query()
       .table_name(backup_table::TABLE_NAME)
       .index_name(backup_table::CREATED_INDEX)
       .key_condition_expression("#userID = :valueToMatch")
       .expression_attribute_names("#userID", backup_table::attr::USER_ID)
       .expression_attribute_values(
         ":valueToMatch",
         AttributeValue::S(user_id.to_string()),
       )
       .scan_index_forward(false)
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to fetch backups");
         Error::AwsSdk(e.into())
       })?;

     if response.last_evaluated_key().is_some() {
       // In the initial version of the backup service this function will be run
       // for every new backup (each user only has one backup), so this shouldn't
       // happen
       warn!("Not all old backups have been cleaned up");
     }

     let items = response
       .items
       .unwrap_or_default()
       .into_iter()
       .map(OrderedBackupItem::try_from)
       .collect::<Result<Vec<_>, _>>()?;

     let mut removed_backups = vec![];

     let Some(latest) = items.iter().map(|item| item.created).max() else {
       return Ok(removed_backups);
     };

     for item in items {
       if item.created == latest {
         trace!(
           "Skipping removal of the latest backup item: {}",
           item.backup_id
         );
         continue;
       }

       trace!("Removing backup item: {item:?}");

       if let Some(backup) =
         self.remove_backup_item(user_id, &item.backup_id).await?
       {
         removed_backups.push(backup);
       } else {
         warn!("Backup was found during query, but wasn't found when deleting")
       };
     }

     Ok(removed_backups)
   }
 }

 /// Backup log functions
 impl DatabaseClient {
   pub async fn put_log_item(&self, log_item: LogItem) -> Result<(), Error> {
     let item = log_item.into();

     self
       .client
       .put_item()
       .table_name(log_table::TABLE_NAME)
       .set_item(Some(item))
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to put log item");
         Error::AwsSdk(e.into())
       })?;

     Ok(())
   }

   pub async fn fetch_log_items(
     &self,
     backup_id: &str,
     from_id: Option<usize>,
   ) -> Result<(Vec<LogItem>, Option<usize>), Error> {
     let mut query = self
       .client
       .query()
       .table_name(log_table::TABLE_NAME)
       .key_condition_expression("#backupID = :valueToMatch")
       .expression_attribute_names("#backupID", log_table::attr::BACKUP_ID)
       .expression_attribute_values(
         ":valueToMatch",
         AttributeValue::S(backup_id.to_string()),
       )
       .limit(LOG_DEFAULT_PAGE_SIZE);

     if let Some(from_id) = from_id {
       query = query
         .exclusive_start_key(
           log_table::attr::BACKUP_ID,
           AttributeValue::S(backup_id.to_string()),
         )
         .exclusive_start_key(
           log_table::attr::LOG_ID,
           AttributeValue::N(from_id.to_string()),
         );
     }

     let response = query.send().await.map_err(|e| {
       error!("DynamoDB client failed to fetch logs");
       Error::AwsSdk(e.into())
     })?;

     let last_id = response
       .last_evaluated_key()
       .map(|key| {
         parse_int_attribute(
           log_table::attr::LOG_ID,
           key.get(log_table::attr::LOG_ID).cloned(),
         )
       })
       .transpose()?;

     let items = response
       .items
       .unwrap_or_default()
       .into_iter()
       .map(LogItem::try_from)
       .collect::<Result<Vec<_>, _>>()?;

     Ok((items, last_id))
   }
+
+  pub async fn remove_log_items_for_backup(
+    &self,
+    backup_id: &str,
+  ) -> Result<(), Error> {
+    let (mut items, mut last_id) =
+      self.fetch_log_items(backup_id, None).await?;
+    while last_id.is_some() {
+      let (mut new_items, new_last_id) =
+        self.fetch_log_items(backup_id, last_id).await?;
+
+      items.append(&mut new_items);
+      last_id = new_last_id;
+    }
+
+    let write_requests = items
+      .into_iter()
+      .map(|key| {
+        DeleteRequest::builder()
+          .set_key(Some(LogItem::item_key(key.backup_id, key.log_id)))
+          .build()
+      })
+      .map(|request| WriteRequest::builder().delete_request(request).build())
+      .collect::<Vec<_>>();
+
+    database::batch_operations::batch_write(
+      &self.client,
+      log_table::TABLE_NAME,
+      write_requests,
+      ExponentialBackoffConfig::default(),
+    )
+    .await?;
+
+    Ok(())
+  }
 }
diff --git a/services/commtest/tests/backup_integration_test.rs b/services/commtest/tests/backup_integration_test.rs
index 638c73795..c12ba9441 100644
--- a/services/commtest/tests/backup_integration_test.rs
+++ b/services/commtest/tests/backup_integration_test.rs
@@ -1,226 +1,249 @@
 use std::collections::{HashMap, HashSet};

 use backup_client::{
   BackupClient, BackupData, BackupDescriptor, Error as BackupClientError,
   RequestedData, SinkExt, StreamExt, TryStreamExt,
 };
 use bytesize::ByteSize;
 use comm_lib::{
   auth::UserIdentity,
   backup::{
     DownloadLogsRequest, LatestBackupIDResponse, LogWSResponse,
     UploadLogRequest,
   },
 };
 use commtest::{
   service_addr,
   tools::{generate_stable_nbytes, Error},
 };
 use reqwest::StatusCode;
 use uuid::Uuid;

 #[tokio::test]
 async fn backup_integration_test() -> Result<(), Error> {
   let backup_client = BackupClient::new(service_addr::BACKUP_SERVICE_HTTP)?;

   let user_identity = UserIdentity {
     user_id: "1".to_string(),
     access_token: "dummy access token".to_string(),
     device_id: "dummy device_id".to_string(),
   };

   let backup_datas = generate_backup_data();

   // Upload backups
   for (backup_data, log_datas) in &backup_datas {
     backup_client
       .upload_backup(&user_identity, backup_data.clone())
       .await?;

     let (tx, rx) = backup_client
       .upload_logs(&user_identity, &backup_data.backup_id)
       .await
       .unwrap();

     tokio::pin!(tx);
     tokio::pin!(rx);

     for log_data in log_datas {
       tx.send(log_data.clone()).await.unwrap();
     }

     let result: HashSet<usize> =
       rx.take(log_datas.len()).try_collect().await.unwrap();
     let expected = log_datas.iter().map(|data| data.log_id).collect();
     assert_eq!(result, expected);
   }

   // Test direct lookup
   let (backup_data, log_datas) = &backup_datas[1];

   let second_backup_descriptor = BackupDescriptor::BackupID {
     backup_id: backup_data.backup_id.clone(),
     user_identity: user_identity.clone(),
   };

   let user_keys = backup_client
     .download_backup_data(&second_backup_descriptor, RequestedData::UserKeys)
     .await?;
   assert_eq!(user_keys, backup_data.user_keys);

   let user_data = backup_client
     .download_backup_data(&second_backup_descriptor, RequestedData::UserData)
     .await?;
   assert_eq!(user_data, backup_data.user_data);

   // Test latest backup lookup
   let latest_backup_descriptor = BackupDescriptor::Latest {
     // Initial version of the backup service uses `user_id` in place of a username
     username: "1".to_string(),
   };

   let backup_id_response = backup_client
     .download_backup_data(&latest_backup_descriptor, RequestedData::BackupID)
     .await?;
   let response: LatestBackupIDResponse =
     serde_json::from_slice(&backup_id_response)?;
   assert_eq!(response.backup_id, backup_data.backup_id);

   let user_keys = backup_client
     .download_backup_data(&latest_backup_descriptor, RequestedData::UserKeys)
     .await?;
   assert_eq!(user_keys, backup_data.user_keys);

   // Test log download
   let (tx, rx) = backup_client
     .download_logs(&user_identity, &backup_data.backup_id)
     .await
     .unwrap();

   tokio::pin!(tx);
   tokio::pin!(rx);

   tx.send(DownloadLogsRequest { from_id: None })
     .await
     .unwrap();

   let mut downloaded_logs = HashMap::new();

   'download: loop {
     loop {
       match rx.next().await.unwrap().unwrap() {
         LogWSResponse::LogDownload {
           log_id,
           content,
           attachments,
         } => {
           downloaded_logs.insert(log_id, (content, attachments));
         }
         LogWSResponse::LogDownloadFinished { last_log_id } => {
           if let Some(last_log_id) = last_log_id {
             tx.send(DownloadLogsRequest {
               from_id: Some(last_log_id),
             })
             .await
             .unwrap();
           } else {
             break 'download;
           }
         }
         msg => panic!("Got response: {msg:?}"),
       };
     }
   }

   let expected_logs = log_datas
     .iter()
     .cloned()
     .map(|data| (data.log_id, (data.content, data.attachments)))
     .collect();
   assert_eq!(downloaded_logs, expected_logs);

-  // Test cleanup
-  let (cleaned_up_backup, _) = &backup_datas[0];
-  let first_backup_descriptor = BackupDescriptor::BackupID {
-    backup_id: cleaned_up_backup.backup_id.clone(),
+  // Test backup cleanup
+  let (removed_backup, _) = &backup_datas[0];
+  let removed_backup_descriptor = BackupDescriptor::BackupID {
+    backup_id: removed_backup.backup_id.clone(),
     user_identity: user_identity.clone(),
   };

   let response = backup_client
-    .download_backup_data(&first_backup_descriptor, RequestedData::UserKeys)
+    .download_backup_data(&removed_backup_descriptor, RequestedData::UserKeys)
     .await;

   let Err(BackupClientError::ReqwestError(error)) = response else {
     panic!("First backup should have been removed, instead got response: {response:?}");
   };

   assert_eq!(
     error.status(),
     Some(StatusCode::NOT_FOUND),
     "Expected status 'not found'"
   );

+  // Test log cleanup
+  let (tx, rx) = backup_client
+    .download_logs(&user_identity, &removed_backup.backup_id)
+    .await
+    .unwrap();
+
+  tokio::pin!(tx);
+  tokio::pin!(rx);
+
+  tx.send(DownloadLogsRequest { from_id: None })
+    .await
+    .unwrap();
+
+  match rx.next().await.unwrap().unwrap() {
+    LogWSResponse::LogDownloadFinished { last_log_id: None } => (),
+    msg => {
+      panic!(
+        "Logs for first backup should have been removed, \
+        instead got response: {msg:?}"
+      )
+    }
+  };
+
   Ok(())
 }

 fn generate_backup_data() -> [(BackupData, Vec<UploadLogRequest>); 2] {
   [
     (
       BackupData {
         backup_id: "b1".to_string(),
         user_keys: generate_stable_nbytes(
           ByteSize::kib(4).as_u64() as usize,
           Some(b'a'),
         ),
         user_data: generate_stable_nbytes(
           ByteSize::mib(4).as_u64() as usize,
           Some(b'A'),
         ),
         attachments: vec![],
       },
       generate_log_data(b'a'),
     ),
     (
       BackupData {
         backup_id: "b2".to_string(),
         user_keys: generate_stable_nbytes(
           ByteSize::kib(4).as_u64() as usize,
           Some(b'b'),
         ),
         user_data: generate_stable_nbytes(
           ByteSize::mib(4).as_u64() as usize,
           Some(b'B'),
         ),
         attachments: vec![],
       },
       generate_log_data(b'b'),
     ),
   ]
 }

 fn generate_log_data(value: u8) -> Vec<UploadLogRequest> {
   const IN_DB_SIZE: usize = ByteSize::kib(4).as_u64() as usize;
   const IN_BLOB_SIZE: usize = ByteSize::kib(400).as_u64() as usize;

   (1..30)
     .map(|log_id| {
       let size = if log_id % 2 == 0 {
         IN_DB_SIZE
       } else {
         IN_BLOB_SIZE
       };
       let attachments = if log_id % 10 == 0 {
         Some(vec![Uuid::new_v4().to_string()])
       } else {
         None
       };

       let mut content = generate_stable_nbytes(size, Some(value));
       let unique_suffix = log_id.to_string();
       content.extend(unique_suffix.as_bytes());

       UploadLogRequest {
         log_id,
         content,
         attachments,
       }
     })
     .collect()
 }