D9414.id31922.diff (F3525980, 4 KB)
Attached to D9414: [blob] Add logs to cleanup task
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -9,7 +9,7 @@
use comm_services_lib::tools::BoxedError;
use tokio_stream::StreamExt;
use tonic::codegen::futures_core::Stream;
-use tracing::{debug, error, trace, warn};
+use tracing::{debug, error, info, trace, warn};
use crate::config::CONFIG;
use crate::constants::S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE;
@@ -227,7 +227,9 @@
}
pub async fn perform_cleanup(&self) -> anyhow::Result<()> {
+ info!("Starting cleanup...");
// 1. Fetch blobs and holders marked as "unchecked"
+ debug!("Querying for unchecked blobs and holders...");
let protection_periond = self.config.orphan_protection_period;
let (unchecked_blobs, unchecked_holders) = tokio::try_join!(
self
@@ -237,11 +239,18 @@
.db
.find_unchecked_items(UncheckedKind::Holder, protection_periond)
)?;
+ debug!(
+ "Found {} unchecked blobs and {} unchecked holders",
+ unchecked_blobs.len(),
+ unchecked_holders.len()
+ );
let mut unchecked_items = UncheckedCollection::new();
// 2. construct structures of possibly orphaned blobs
+ debug!("Creating structures of possibly orphaned items...");
for PrimaryKey { blob_hash, .. } in unchecked_blobs {
+ trace!("Creating unchecked item for blob hash '{}'", &blob_hash);
unchecked_items.insert(
blob_hash.clone(),
UncheckedItem {
@@ -254,8 +263,18 @@
// 3. iterate over possibly orphaned holders and fill the structs
for PrimaryKey { blob_hash, holder } in unchecked_holders {
if let Some(item) = unchecked_items.get_mut(&blob_hash) {
+ trace!(
+ "Inserting holder '{}' for blob hash '{}'",
+ &holder,
+ &blob_hash
+ );
item.holders.push(holder);
} else {
+ trace!(
+ "Creating empty item for holder '{}' (blob hash: '{}')",
+ &holder,
+ &blob_hash
+ );
unchecked_items.insert(
blob_hash.clone(),
UncheckedItem {
@@ -270,12 +289,15 @@
let mut checked = HashSet::new();
// 4. Filter out items that are for sure not orphaned
- checked.extend(unchecked_items.filter_out_checked());
+ let checked_items = unchecked_items.filter_out_checked();
+ debug!("Filtered out {} checked items", checked_items.len());
+ checked.extend(checked_items);
// 5. Query DDB for additional blobs and holders to check if they exist
let mut fetch_results = Vec::new();
// 5a. Query holders - Find if possibly orphan blobs have at least one holder
+ debug!("Querying holders for possibly orphaned blobs...");
for blob_hash in unchecked_items.blobs_to_find_holders() {
let holders = self
.db
@@ -283,18 +305,35 @@
.await?
.into_iter()
.map(|holder| PrimaryKey::new(blob_hash.to_string(), holder));
+
+ let len_before = fetch_results.len();
fetch_results.extend(holders);
+ trace!(
+ "Found {} holders for blob hash '{}'",
+ fetch_results.len() - len_before,
+ blob_hash
+ );
}
// 5b. Query blobs - Find if possibly orphaned holders have blobs
+ debug!("Querying blobs for possibly orphaned holders...");
let blobs_to_get = unchecked_items.blobs_to_check_existence();
+ let queried_blobs_len = blobs_to_get.len();
let existing_blobs = self.db.list_existing_keys(blobs_to_get).await?;
+ debug!(
+ "Found {} existing blobs out of {} queried",
+ existing_blobs.len(),
+ queried_blobs_len
+ );
fetch_results.extend(existing_blobs);
// 6. Update the struct with query results
// Then do 2nd pass of filtering out checked items (repeat step 4)
+ debug!("Feeding data structure with query results and filtering again...");
unchecked_items.feed_with_query_results(fetch_results);
- checked.extend(unchecked_items.filter_out_checked());
+ let checked_items = unchecked_items.filter_out_checked();
+ debug!("Filtered out {} checked items", checked_items.len());
+ checked.extend(checked_items);
// 7. Perform actual cleanup
orphans.extend(unchecked_items.into_primary_keys());
@@ -307,15 +346,25 @@
})
.collect();
+ let num_orphans = orphans.len();
+ let num_checked = checked.len();
+ let num_s3_blobs = s3_paths.len();
+
// 7a. Make changes to database
+ debug!("Cleaning up database... Marking {} items as checked and deleting {} orphans", num_checked, num_orphans);
tokio::try_join!(
self.db.batch_delete_rows(orphans),
self.db.batch_mark_checked(checked)
)?;
// 7b. Delete orphaned blobs from S3
+ debug!("Cleaning up S3... Deleting {} blobs", num_s3_blobs);
self.s3.batch_delete_objects(s3_paths).await?;
+ info!(
+ "Cleanup complete. Deleted orphaned {} DB items and marked {} items as checked. {} blobs were deleted from S3",
+ num_orphans, num_checked, num_s3_blobs
+ );
Ok(())
}
}
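
The diff only emits info!, debug!, and trace! events; whether they are visible depends on how the blob service configures its tracing subscriber. Below is a minimal sketch (not part of this diff) of initializing tracing-subscriber with an EnvFilter so the verbosity can be chosen per run. The use of the "env-filter" feature, the init_logging name, and the "info" default level are assumptions for illustration, not taken from the repository.

use tracing_subscriber::EnvFilter;

fn init_logging() {
  // Honor RUST_LOG if set (e.g. RUST_LOG=trace to see the per-item trace!
  // messages added above); otherwise fall back to "info" so only the
  // "Starting cleanup..." / "Cleanup complete..." summary lines are emitted.
  let filter =
    EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
  tracing_subscriber::fmt().with_env_filter(filter).init();
}

With a setup like this, the three levels added in the diff map to verbosity tiers: info for start and finish summaries, debug for per-step counts, and trace for per-item details.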