diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs --- a/services/blob/src/service.rs +++ b/services/blob/src/service.rs @@ -9,7 +9,7 @@ use comm_services_lib::tools::BoxedError; use tokio_stream::StreamExt; use tonic::codegen::futures_core::Stream; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, error, info, trace, warn}; use crate::config::CONFIG; use crate::constants::S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE; @@ -227,7 +227,9 @@ } pub async fn perform_cleanup(&self) -> anyhow::Result<()> { + info!("Starting cleanup..."); // 1. Fetch blobs and holders marked as "unchecked" + debug!("Querying for unchecked blobs and holders..."); let protection_periond = self.config.orphan_protection_period; let (unchecked_blobs, unchecked_holders) = tokio::try_join!( self @@ -237,11 +239,18 @@ .db .find_unchecked_items(UncheckedKind::Holder, protection_periond) )?; + debug!( + "Found {} unchecked blobs and {} unchecked holders", + unchecked_blobs.len(), + unchecked_holders.len() + ); let mut unchecked_items = UncheckedCollection::new(); // 2. construct structures of possibly orphaned blobs + debug!("Creating structures of possibly orphaned items..."); for PrimaryKey { blob_hash, .. } in unchecked_blobs { + trace!("Creating unchecked item for blob hash '{}'", &blob_hash); unchecked_items.insert( blob_hash.clone(), UncheckedItem { @@ -254,8 +263,18 @@ // 3. iterate over possibly orphaned holders and fill the structs for PrimaryKey { blob_hash, holder } in unchecked_holders { if let Some(item) = unchecked_items.get_mut(&blob_hash) { + trace!( + "Inserting holder '{}' for blob hash '{}'", + &holder, + &blob_hash + ); item.holders.push(holder); } else { + trace!( + "Creating empty item for holder '{}' (blob hash: '{}')", + &holder, + &blob_hash + ); unchecked_items.insert( blob_hash.clone(), UncheckedItem { @@ -270,12 +289,15 @@ let mut checked = HashSet::new(); // 4. Filter out items that are for sure not orphaned - checked.extend(unchecked_items.filter_out_checked()); + let checked_items = unchecked_items.filter_out_checked(); + debug!("Filtered out {} checked items", checked_items.len()); + checked.extend(checked_items); // 5. Query DDB for additional blobs and holders to check if they exist let mut fetch_results = Vec::new(); // 5a. Query holders - Find if possibly orphan blobs have at least one holder + debug!("Querying holders for possibly orphaned blobs..."); for blob_hash in unchecked_items.blobs_to_find_holders() { let holders = self .db @@ -283,18 +305,35 @@ .await? .into_iter() .map(|holder| PrimaryKey::new(blob_hash.clone(), holder)); + + let len_before = fetch_results.len(); fetch_results.extend(holders); + trace!( + "Found {} holders for blob hash '{}'", + fetch_results.len() - len_before, + blob_hash + ); } // 5b. Query blobs - Find if possibly orphaned holders have blobs + debug!("Querying blobs for possibly orphaned holders..."); let blobs_to_get = unchecked_items.blobs_to_check_existence(); + let queried_blobs_len = blobs_to_get.len(); let existing_blobs = self.db.list_existing_keys(blobs_to_get).await?; + debug!( + "Found {} existing blobs out of {} queried", + existing_blobs.len(), + queried_blobs_len + ); fetch_results.extend(existing_blobs); // 6. Update the struct with query results // Then do 2nd pass of filtering out checked items (repeat step 4) + debug!("Feeding data structure with query results and filtering again..."); unchecked_items.feed_with_query_results(fetch_results); - checked.extend(unchecked_items.filter_out_checked()); + let checked_items = unchecked_items.filter_out_checked(); + debug!("Filtered out {} checked items", checked_items.len()); + checked.extend(checked_items); // 7. Perform actual cleanup orphans.extend(unchecked_items.into_primary_keys()); @@ -310,15 +349,25 @@ }) .collect(); + let num_orphans = orphans.len(); + let num_checked = checked.len(); + let num_s3_blobs = s3_paths.len(); + /// 7a. Make changes to database + debug!("Cleaning up database... Marking {} items as checked and deleting {} orphans", num_checked, num_orphans); tokio::try_join!( self.db.batch_delete_rows(orphans), self.db.batch_mark_checked(checked) )?; // 7b. Delete orphaned blobs from S3 + debug!("Cleaning up S3... Deleting {} blobs", num_s3_blobs); self.s3.batch_delete_objects(s3_paths).await?; + info!( + "Cleanup complete. Deleted orphaned {} DB items and marked {} items as checked. {} blobs were deleted from S3", + num_orphans, num_checked, num_s3_blobs + ); Ok(()) } }