Page MenuHomePhabricator

D9352.diff
No OneTemporary

D9352.diff

diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -1,5 +1,5 @@
#![allow(unused)]
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashSet};
use std::ops::{Bound, Range, RangeBounds, RangeInclusive};
use std::sync::Arc;
@@ -224,6 +224,89 @@
}
Ok(())
}
+
+ /// Finds orphaned blob and holder rows among the items the database
+ /// reports as "unchecked", deletes them, and marks the remaining ones
+ /// as checked.
+ ///
+ /// Only database rows are modified here; removing the orphaned blob
+ /// objects from S3 is not implemented yet (see the TODO at the end).
+ ///
+ /// # Errors
+ ///
+ /// Propagates any error returned by the underlying database calls.
+ pub async fn perform_cleanup(&self) -> anyhow::Result<()> {
+   // 1. Fetch blobs and holders marked as "unchecked".
+   //    The two queries do not depend on each other, so they are awaited
+   //    together.
+   // NOTE(review): the protection period is presumably used by the query to
+   // skip recently created items - confirm in `find_unchecked_items`.
+   let protection_period = self.config.orphan_protection_period;
+   let (unchecked_blobs, unchecked_holders) = tokio::try_join!(
+     self
+       .db
+       .find_unchecked_items(UncheckedKind::Blob, protection_period),
+     self
+       .db
+       .find_unchecked_items(UncheckedKind::Holder, protection_period)
+   )?;
+
+   let mut unchecked_items = UncheckedCollection::new();
+
+   // 2. Construct structures of possibly orphaned blobs.
+   //    The hash is needed twice (as key and inside the item), hence the clone.
+   for PrimaryKey { blob_hash, .. } in unchecked_blobs {
+     unchecked_items.insert(
+       blob_hash.clone(),
+       UncheckedItem {
+         blob_hash: Some(blob_hash),
+         holders: Vec::new(),
+       },
+     );
+   }
+
+   // 3. Iterate over possibly orphaned holders and fill the structs.
+   for PrimaryKey { blob_hash, holder } in unchecked_holders {
+     if let Some(item) = unchecked_items.get_mut(&blob_hash) {
+       item.holders.push(holder);
+     } else {
+       // The blob itself was not reported as unchecked, so the item carries
+       // no blob hash; the owned hash is only needed as the key and can be
+       // moved instead of cloned.
+       unchecked_items.insert(
+         blob_hash,
+         UncheckedItem {
+           blob_hash: None,
+           holders: vec![holder],
+         },
+       );
+     }
+   }
+
+   // 4. Filter out items that are for sure not orphaned.
+   let mut checked = HashSet::new();
+   checked.extend(unchecked_items.filter_out_checked());
+
+   // 5. Query DDB for additional blobs and holders to check if they exist.
+   let mut fetch_results = Vec::new();
+
+   // 5a. Query holders - find if possibly orphaned blobs have at least one
+   //     holder.
+   // NOTE(review): `Some(1)` is presumably a result limit, since one holder
+   // is enough to prove the blob is in use - confirm in `list_blob_holders`.
+   for blob_hash in unchecked_items.blobs_to_find_holders() {
+     let holders = self
+       .db
+       .list_blob_holders(blob_hash, Some(1))
+       .await?
+       .into_iter()
+       .map(|holder| PrimaryKey::new(blob_hash.to_string(), holder));
+     fetch_results.extend(holders);
+   }
+
+   // 5b. Query blobs - find if possibly orphaned holders have blobs.
+   let blobs_to_get = unchecked_items.blobs_to_check_existence();
+   let existing_blobs = self.db.list_existing_keys(blobs_to_get).await?;
+   fetch_results.extend(existing_blobs);
+
+   // 6. Update the struct with query results,
+   //    then do a 2nd pass of filtering out checked items (repeat step 4).
+   unchecked_items.feed_with_query_results(fetch_results);
+   checked.extend(unchecked_items.filter_out_checked());
+
+   // 7. Make changes to database.
+   //    Whatever is still left in the collection after both filtering passes
+   //    is treated as orphaned.
+   let mut orphans = HashSet::new();
+   orphans.extend(unchecked_items.into_primary_keys());
+
+   tokio::try_join!(
+     self.db.batch_delete_rows(orphans),
+     self.db.batch_mark_checked(checked)
+   )?;
+
+   // TODO: Delete orphaned blobs from S3
+
+   Ok(())
+ }
}
// A B-tree map performs well for both random and sequential access.

File Metadata

Mime Type
text/plain
Expires
Sat, Nov 30, 12:59 AM (21 h, 2 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2598876
Default Alt Text
D9352.diff (3 KB)

Event Timeline