Page Menu · Home · Phabricator

D9414.id31922.diff
No One · Temporary

D9414.id31922.diff

diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -9,7 +9,7 @@
use comm_services_lib::tools::BoxedError;
use tokio_stream::StreamExt;
use tonic::codegen::futures_core::Stream;
-use tracing::{debug, error, trace, warn};
+use tracing::{debug, error, info, trace, warn};
use crate::config::CONFIG;
use crate::constants::S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE;
@@ -227,7 +227,9 @@
}
pub async fn perform_cleanup(&self) -> anyhow::Result<()> {
+ info!("Starting cleanup...");
// 1. Fetch blobs and holders marked as "unchecked"
+ debug!("Querying for unchecked blobs and holders...");
let protection_periond = self.config.orphan_protection_period;
let (unchecked_blobs, unchecked_holders) = tokio::try_join!(
self
@@ -237,11 +239,18 @@
.db
.find_unchecked_items(UncheckedKind::Holder, protection_periond)
)?;
+ debug!(
+ "Found {} unchecked blobs and {} unchecked holders",
+ unchecked_blobs.len(),
+ unchecked_holders.len()
+ );
let mut unchecked_items = UncheckedCollection::new();
// 2. construct structures of possibly orphaned blobs
+ debug!("Creating structures of possibly orphaned items...");
for PrimaryKey { blob_hash, .. } in unchecked_blobs {
+ trace!("Creating unchecked item for blob hash '{}'", &blob_hash);
unchecked_items.insert(
blob_hash.clone(),
UncheckedItem {
@@ -254,8 +263,18 @@
// 3. iterate over possibly orphaned holders and fill the structs
for PrimaryKey { blob_hash, holder } in unchecked_holders {
if let Some(item) = unchecked_items.get_mut(&blob_hash) {
+ trace!(
+ "Inserting holder '{}' for blob hash '{}'",
+ &holder,
+ &blob_hash
+ );
item.holders.push(holder);
} else {
+ trace!(
+ "Creating empty item for holder '{}' (blob hash: '{}')",
+ &holder,
+ &blob_hash
+ );
unchecked_items.insert(
blob_hash.clone(),
UncheckedItem {
@@ -270,12 +289,15 @@
let mut checked = HashSet::new();
// 4. Filter out items that are for sure not orphaned
- checked.extend(unchecked_items.filter_out_checked());
+ let checked_items = unchecked_items.filter_out_checked();
+ debug!("Filtered out {} checked items", checked_items.len());
+ checked.extend(checked_items);
// 5. Query DDB for additional blobs and holders to check if they exist
let mut fetch_results = Vec::new();
// 5a. Query holders - Find if possibly orphan blobs have at least one holder
+ debug!("Querying holders for possibly orphaned blobs...");
for blob_hash in unchecked_items.blobs_to_find_holders() {
let holders = self
.db
@@ -283,18 +305,35 @@
.await?
.into_iter()
.map(|holder| PrimaryKey::new(blob_hash.to_string(), holder));
+
+ let len_before = fetch_results.len();
fetch_results.extend(holders);
+ trace!(
+ "Found {} holders for blob hash '{}'",
+ fetch_results.len() - len_before,
+ blob_hash
+ );
}
// 5b. Query blobs - Find if possibly orphaned holders have blobs
+ debug!("Querying blobs for possibly orphaned holders...");
let blobs_to_get = unchecked_items.blobs_to_check_existence();
+ let queried_blobs_len = blobs_to_get.len();
let existing_blobs = self.db.list_existing_keys(blobs_to_get).await?;
+ debug!(
+ "Found {} existing blobs out of {} queried",
+ existing_blobs.len(),
+ queried_blobs_len
+ );
fetch_results.extend(existing_blobs);
// 6. Update the struct with query results
// Then do 2nd pass of filtering out checked items (repeat step 4)
+ debug!("Feeding data structure with query results and filtering again...");
unchecked_items.feed_with_query_results(fetch_results);
- checked.extend(unchecked_items.filter_out_checked());
+ let checked_items = unchecked_items.filter_out_checked();
+ debug!("Filtered out {} checked items", checked_items.len());
+ checked.extend(checked_items);
// 7. Perform actual cleanup
orphans.extend(unchecked_items.into_primary_keys());
@@ -307,15 +346,25 @@
})
.collect();
+ let num_orphans = orphans.len();
+ let num_checked = checked.len();
+ let num_s3_blobs = s3_paths.len();
+
// 7a. Make changes to database
+ debug!("Cleaning up database... Marking {} items as checked and deleting {} orphans", num_checked, num_orphans);
tokio::try_join!(
self.db.batch_delete_rows(orphans),
self.db.batch_mark_checked(checked)
)?;
// 7b. Delete orphaned blobs from S3
+ debug!("Cleaning up S3... Deleting {} blobs", num_s3_blobs);
self.s3.batch_delete_objects(s3_paths).await?;
+ info!(
+ "Cleanup complete. Deleted orphaned {} DB items and marked {} items as checked. {} blobs were deleted from S3",
+ num_orphans, num_checked, num_s3_blobs
+ );
Ok(())
}
}

File Metadata

Mime Type
text/plain
Expires
Tue, Dec 24, 7:03 PM (8 h, 1 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2700015
Default Alt Text
D9414.id31922.diff (4 KB)

Event Timeline