Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3386100
D9352.id31920.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
3 KB
Referenced Files
None
Subscribers
None
D9352.id31920.diff
View Options
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -1,5 +1,5 @@
#![allow(unused)]
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashSet};
use std::ops::{Bound, Range, RangeBounds, RangeInclusive};
use std::sync::Arc;
@@ -224,6 +224,110 @@
}
Ok(())
}
+
+  /// Runs one pass of orphan cleanup over blob and holder rows marked as
+  /// "unchecked".
+  ///
+  /// Rows returned by `find_unchecked_items` (which is given
+  /// `config.orphan_protection_period`; presumably rows younger than that
+  /// are skipped - confirm) are classified: rows shown to still be in use
+  /// are marked as checked, and the remaining ones are deleted as orphans.
+  ///
+  /// # Errors
+  ///
+  /// Returns the first error from any of the database queries or writes.
+  pub async fn perform_cleanup(&self) -> anyhow::Result<()> {
+    // 1. Fetch blobs and holders marked as "unchecked"
+    let protection_period = self.config.orphan_protection_period;
+    let (unchecked_blobs, unchecked_holders) = tokio::try_join!(
+      self
+        .db
+        .find_unchecked_items(UncheckedKind::Blob, protection_period),
+      self
+        .db
+        .find_unchecked_items(UncheckedKind::Holder, protection_period)
+    )?;
+
+    let mut unchecked_items = UncheckedCollection::new();
+
+    // 2. Construct structures of possibly orphaned blobs
+    for PrimaryKey { blob_hash, .. } in unchecked_blobs {
+      unchecked_items.insert(
+        blob_hash.clone(),
+        UncheckedItem {
+          blob_hash: Some(blob_hash),
+          holders: Vec::new(),
+        },
+      );
+    }
+
+    // 3. Iterate over possibly orphaned holders and fill the structs.
+    //    A holder whose blob was not among the unchecked blobs from step 1
+    //    gets a new entry with `blob_hash: None`.
+    for PrimaryKey { blob_hash, holder } in unchecked_holders {
+      if let Some(item) = unchecked_items.get_mut(&blob_hash) {
+        item.holders.push(holder);
+      } else {
+        unchecked_items.insert(
+          blob_hash.clone(),
+          UncheckedItem {
+            blob_hash: None,
+            holders: vec![holder],
+          },
+        );
+      }
+    }
+
+    // 4. Filter out items that are for sure not orphaned
+    let mut checked = HashSet::new();
+    checked.extend(unchecked_items.filter_out_checked());
+
+    // 5. Query DDB for additional blobs and holders to check if they exist
+    let mut fetch_results = Vec::new();
+
+    // 5a. Query holders - find if possibly orphaned blobs have at least one
+    //     holder. `Some(1)` presumably caps the listing at one holder, which
+    //     is enough to prove the blob is still referenced. NOTE(review):
+    //     this issues one query per blob, sequentially.
+    for blob_hash in unchecked_items.blobs_to_find_holders() {
+      let holders = self
+        .db
+        .list_blob_holders(blob_hash, Some(1))
+        .await?
+        .into_iter()
+        .map(|holder| PrimaryKey::new(blob_hash.to_string(), holder));
+      fetch_results.extend(holders);
+    }
+
+    // 5b. Query blobs - find if possibly orphaned holders have blobs
+    let blobs_to_get = unchecked_items.blobs_to_check_existence();
+    let existing_blobs = self.db.list_existing_keys(blobs_to_get).await?;
+    fetch_results.extend(existing_blobs);
+
+    // 6. Update the struct with query results, then do a 2nd pass of
+    //    filtering out checked items (repeat step 4)
+    unchecked_items.feed_with_query_results(fetch_results);
+    checked.extend(unchecked_items.filter_out_checked());
+
+    // 7. Everything still unchecked is treated as orphaned; delete those
+    //    rows and mark the rest as checked. NOTE(review): if one of the two
+    //    calls fails, `try_join!` returns early and drops the other future,
+    //    which may already have been partially applied.
+    let mut orphans = HashSet::new();
+    orphans.extend(unchecked_items.into_primary_keys());
+
+    tokio::try_join!(
+      self.db.batch_delete_rows(orphans),
+      self.db.batch_mark_checked(checked)
+    )?;
+
+    // TODO: Delete orphaned blobs from S3
+    // NOTE(review): orphaned blob rows are deleted above before their S3
+    // objects; until this TODO is done those objects are presumably left
+    // without any referencing row - confirm they will not leak.
+
+    Ok(())
+  }
}
// A B-tree map performs well for both random and sequential access.
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Nov 30, 3:18 AM (21 h, 50 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2598876
Default Alt Text
D9352.id31920.diff (3 KB)
Attached To
Mode
D9352: [blob] Add function to perform cleanup operation
Attached
Detach File
Event Timeline
Log In to Comment