diff --git a/services/backup/src/database/log_item.rs b/services/backup/src/database/log_item.rs --- a/services/backup/src/database/log_item.rs +++ b/services/backup/src/database/log_item.rs @@ -39,20 +39,27 @@ ); self.content.move_to_blob(blob_client).await } -} -impl From for HashMap { - fn from(value: LogItem) -> Self { - let mut attrs = HashMap::from([ + pub fn item_key( + backup_id: impl Into, + log_id: usize, + ) -> HashMap { + HashMap::from([ ( attr::BACKUP_ID.to_string(), - AttributeValue::S(value.backup_id), + AttributeValue::S(backup_id.into()), ), ( attr::LOG_ID.to_string(), - AttributeValue::N(value.log_id.to_string()), + AttributeValue::N(log_id.to_string()), ), - ]); + ]) + } +} + +impl From for HashMap { + fn from(value: LogItem) -> Self { + let mut attrs = LogItem::item_key(value.backup_id, value.log_id); let (content_attr_name, content_attr) = value .content diff --git a/services/backup/src/database/mod.rs b/services/backup/src/database/mod.rs --- a/services/backup/src/database/mod.rs +++ b/services/backup/src/database/mod.rs @@ -8,9 +8,11 @@ use crate::constants::{backup_table, log_table, LOG_DEFAULT_PAGE_SIZE}; use aws_sdk_dynamodb::{ operation::get_item::GetItemOutput, - types::{AttributeValue, ReturnValue}, + types::{AttributeValue, DeleteRequest, ReturnValue, WriteRequest}, +}; +use comm_lib::database::{ + self, batch_operations::ExponentialBackoffConfig, parse_int_attribute, Error, }; -use comm_lib::database::{parse_int_attribute, Error}; use tracing::{error, trace, warn}; #[derive(Clone)] @@ -131,11 +133,15 @@ Error::AwsSdk(e.into()) })?; - response + let result = response .attributes .map(BackupItem::try_from) .transpose() - .map_err(Error::from) + .map_err(Error::from)?; + + self.remove_log_items_for_backup(backup_id).await?; + + Ok(result) } /// For the purposes of the initial backup version this function @@ -280,4 +286,39 @@ Ok((items, last_id)) } + + pub async fn remove_log_items_for_backup( + &self, + backup_id: &str, + ) -> Result<(), Error> { + let (mut items, mut last_id) = + self.fetch_log_items(backup_id, None).await?; + while last_id.is_some() { + let (mut new_items, new_last_id) = + self.fetch_log_items(backup_id, last_id).await?; + + items.append(&mut new_items); + last_id = new_last_id; + } + + let write_requests = items + .into_iter() + .map(|key| { + DeleteRequest::builder() + .set_key(Some(LogItem::item_key(key.backup_id, key.log_id))) + .build() + }) + .map(|request| WriteRequest::builder().delete_request(request).build()) + .collect::>(); + + database::batch_operations::batch_write( + &self.client, + log_table::TABLE_NAME, + write_requests, + ExponentialBackoffConfig::default(), + ) + .await?; + + Ok(()) + } } diff --git a/services/commtest/tests/backup_integration_test.rs b/services/commtest/tests/backup_integration_test.rs --- a/services/commtest/tests/backup_integration_test.rs +++ b/services/commtest/tests/backup_integration_test.rs @@ -137,15 +137,15 @@ .collect(); assert_eq!(downloaded_logs, expected_logs); - // Test cleanup - let (cleaned_up_backup, _) = &backup_datas[0]; - let first_backup_descriptor = BackupDescriptor::BackupID { - backup_id: cleaned_up_backup.backup_id.clone(), + // Test backup cleanup + let (removed_backup, _) = &backup_datas[0]; + let removed_backup_descriptor = BackupDescriptor::BackupID { + backup_id: removed_backup.backup_id.clone(), user_identity: user_identity.clone(), }; let response = backup_client - .download_backup_data(&first_backup_descriptor, RequestedData::UserKeys) + .download_backup_data(&removed_backup_descriptor, RequestedData::UserKeys) .await; let Err(BackupClientError::ReqwestError(error)) = response else { @@ -158,6 +158,29 @@ "Expected status 'not found'" ); + // Test log cleanup + let (tx, rx) = backup_client + .download_logs(&user_identity, &removed_backup.backup_id) + .await + .unwrap(); + + tokio::pin!(tx); + tokio::pin!(rx); + + tx.send(DownloadLogsRequest { from_id: None }) + .await + .unwrap(); + + match rx.next().await.unwrap().unwrap() { + LogWSResponse::LogDownloadFinished { last_log_id: None } => (), + msg => { + panic!( + "Logs for first backup should have been removed, \ + instead got response: {msg:?}" + ) + } + }; + Ok(()) }