diff --git a/services/backup/src/database/mod.rs b/services/backup/src/database/mod.rs --- a/services/backup/src/database/mod.rs +++ b/services/backup/src/database/mod.rs @@ -16,7 +16,7 @@ blob::client::BlobServiceClient, database::{ self, batch_operations::ExponentialBackoffConfig, parse_int_attribute, - Error, + AttributeMap, Error, }, }; use tracing::{error, trace, warn}; @@ -223,24 +223,29 @@ Ok(()) } - pub async fn fetch_log_items( + /// Utility internal function to fetch a page of log DDB items + /// without further processing them. + async fn fetch_raw_log_items( &self, user_id: &str, backup_id: &str, + projection_expression: Option<&String>, from_id: Option, - ) -> Result<(Vec, Option), Error> { + limit: Option, + ) -> Result<(Vec, Option), Error> { let id = LogItem::partition_key(user_id, backup_id); let mut query = self .client .query() .table_name(log_table::TABLE_NAME) + .set_projection_expression(projection_expression.cloned()) .key_condition_expression("#backupID = :valueToMatch") .expression_attribute_names("#backupID", log_table::attr::BACKUP_ID) .expression_attribute_values( ":valueToMatch", AttributeValue::S(id.clone()), ) - .limit(LOG_DEFAULT_PAGE_SIZE); + .set_limit(limit); if let Some(from_id) = from_id { query = query @@ -269,9 +274,31 @@ }) .transpose()?; - let items = response - .items - .unwrap_or_default() + let items = response.items.unwrap_or_default(); + Ok((items, last_id)) + } + + /// Fetches single page of log items, starting from [`from_id`]. + /// Page size is [`LOG_DEFAULT_PAGE_SIZE`]. + /// Returns log items and the ID of the last processed item, which can be passed + /// to the [`from_id`] argument of a subsequent call. 
+ pub async fn fetch_log_items( + &self, + user_id: &str, + backup_id: &str, + from_id: Option, + ) -> Result<(Vec, Option), Error> { + let (raw_items, last_id) = self + .fetch_raw_log_items( + user_id, + backup_id, + None, + from_id, + Some(LOG_DEFAULT_PAGE_SIZE), + ) + .await?; + + let items = raw_items .into_iter() .map(LogItem::try_from) .collect::, _>>()?; @@ -279,21 +306,39 @@ Ok((items, last_id)) } + /// Fetches all log items for given backup. + pub async fn fetch_all_log_items_for_backup( + &self, + user_id: &str, + backup_id: &str, + ) -> Result, Error> { + let (mut raw_items, mut last_id) = (Vec::new(), None); + while { + let (new_items, new_last_id) = self + .fetch_raw_log_items(user_id, backup_id, None, last_id, None) + .await?; + + raw_items.extend(new_items); + last_id = new_last_id; + last_id.is_some() + } {} + + let items = raw_items + .into_iter() + .map(LogItem::try_from) + .collect::, _>>()?; + Ok(items) + } + pub async fn remove_log_items_for_backup( &self, user_id: &str, backup_id: &str, blob_client: &BlobServiceClient, ) -> Result<(), Error> { - let (mut items, mut last_id) = - self.fetch_log_items(user_id, backup_id, None).await?; - while last_id.is_some() { - let (mut new_items, new_last_id) = - self.fetch_log_items(user_id, backup_id, last_id).await?; - - items.append(&mut new_items); - last_id = new_last_id; - } + let items = self + .fetch_all_log_items_for_backup(user_id, backup_id) + .await?; for log_item in &items { log_item.revoke_holders(blob_client);