diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -1,5 +1,5 @@
 #![allow(unused)]
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashSet};
 use std::ops::{Bound, Range, RangeBounds, RangeInclusive};
 use std::sync::Arc;
 
@@ -224,6 +224,103 @@
     }
     Ok(())
   }
+
+  /// Runs one cleanup pass over blob and holder rows marked as "unchecked".
+  ///
+  /// Unchecked rows are fetched using the configured
+  /// `orphan_protection_period` and grouped by blob hash. Entries reported by
+  /// `filter_out_checked` are marked as checked; for the rest, the database
+  /// is queried for a holder of each blob and for the existence of each
+  /// holder's blob, and the filter is applied again. Entries still remaining
+  /// after the second pass are deleted as orphans.
+  ///
+  /// # Errors
+  ///
+  /// Returns the first error from any database call. `tokio::try_join!`
+  /// returns as soon as one branch fails, so the final delete and
+  /// mark-checked batches may be left partially applied.
+  pub async fn perform_cleanup(&self) -> anyhow::Result<()> {
+    // 1. Fetch blobs and holders marked as "unchecked"
+    let protection_period = self.config.orphan_protection_period;
+    let (unchecked_blobs, unchecked_holders) = tokio::try_join!(
+      self
+        .db
+        .find_unchecked_items(UncheckedKind::Blob, protection_period),
+      self
+        .db
+        .find_unchecked_items(UncheckedKind::Holder, protection_period)
+    )?;
+
+    let mut unchecked_items = UncheckedCollection::new();
+
+    // 2. Construct structures of possibly orphaned blobs
+    for PrimaryKey { blob_hash, .. } in unchecked_blobs {
+      unchecked_items.insert(
+        blob_hash.clone(),
+        UncheckedItem {
+          blob_hash: Some(blob_hash),
+          holders: Vec::new(),
+        },
+      );
+    }
+
+    // 3. Iterate over possibly orphaned holders and fill the structs
+    for PrimaryKey { blob_hash, holder } in unchecked_holders {
+      if let Some(item) = unchecked_items.get_mut(&blob_hash) {
+        item.holders.push(holder);
+      } else {
+        unchecked_items.insert(
+          blob_hash.clone(),
+          UncheckedItem {
+            blob_hash: None,
+            holders: vec![holder],
+          },
+        );
+      }
+    }
+
+    let mut orphans = HashSet::new();
+    let mut checked = HashSet::new();
+
+    // 4. Filter out items that are for sure not orphaned
+    checked.extend(unchecked_items.filter_out_checked());
+
+    // 5. Query DDB for additional blobs and holders to check if they exist
+    let mut fetch_results = Vec::new();
+
+    // 5a. Query holders - Find if possibly orphan blobs have at least one holder
+    for blob_hash in unchecked_items.blobs_to_find_holders() {
+      let holders = self
+        .db
+        .list_blob_holders(&blob_hash, Some(1))
+        .await?
+        .into_iter()
+        .map(|holder| PrimaryKey::new(blob_hash.clone(), holder));
+      fetch_results.extend(holders);
+    }
+
+    // 5b. Query blobs - Find if possibly orphaned holders have blobs
+    let blobs_to_get = unchecked_items.blobs_to_check_existence();
+    let existing_blobs = self.db.list_existing_keys(blobs_to_get).await?;
+    fetch_results.extend(existing_blobs);
+
+    // 6. Update the struct with query results
+    // Then do 2nd pass of filtering out checked items (repeat step 4)
+    unchecked_items.feed_with_query_results(fetch_results);
+    checked.extend(unchecked_items.filter_out_checked());
+
+    // 7. Make changes to database
+    orphans.extend(unchecked_items.into_primary_keys());
+
+    tokio::try_join!(
+      self.db.batch_delete_rows(orphans),
+      self.db.batch_mark_checked(checked)
+    )?;
+
+    // TODO: Delete orphaned blobs from S3
+
+    Ok(())
+  }
 }
 
 /// Represents an "unchecked" blob entity. It might miss either