Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F32513496
D9352.1767117601.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
3 KB
Referenced Files
None
Subscribers
None
D9352.1767117601.diff
View Options
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -1,5 +1,5 @@
#![allow(unused)]
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashSet};
use std::ops::{Bound, Range, RangeBounds, RangeInclusive};
use std::sync::Arc;
@@ -224,6 +224,89 @@
}
Ok(())
}
+
+ pub async fn perform_cleanup(&self) -> anyhow::Result<()> {
+ // 1. Fetch blobs and holders marked as "unchecked"
+ let protection_periond = self.config.orphan_protection_period;
+ let (unchecked_blobs, unchecked_holders) = tokio::try_join!(
+ self
+ .db
+ .find_unchecked_items(UncheckedKind::Blob, protection_periond),
+ self
+ .db
+ .find_unchecked_items(UncheckedKind::Holder, protection_periond)
+ )?;
+
+ let mut unchecked_items = UncheckedCollection::new();
+
+ // 2. construct structures of possibly orphaned blobs
+ for PrimaryKey { blob_hash, .. } in unchecked_blobs {
+ unchecked_items.insert(
+ blob_hash.clone(),
+ UncheckedItem {
+ blob_hash: Some(blob_hash),
+ holders: Vec::new(),
+ },
+ );
+ }
+
+ // 3. iterate over possibly orphaned holders and fill the structs
+ for PrimaryKey { blob_hash, holder } in unchecked_holders {
+ if let Some(item) = unchecked_items.get_mut(&blob_hash) {
+ item.holders.push(holder);
+ } else {
+ unchecked_items.insert(
+ blob_hash.clone(),
+ UncheckedItem {
+ blob_hash: None,
+ holders: vec![holder],
+ },
+ );
+ }
+ }
+
+ let mut orphans = HashSet::new();
+ let mut checked = HashSet::new();
+
+ // 4. Filter out items that are for sure not orphaned
+ checked.extend(unchecked_items.filter_out_checked());
+
+ // 5. Query DDB for additional blobs and holders to check if they exist
+ let mut fetch_results = Vec::new();
+
+ // 5a. Query holders - Find if possibly orphan blobs have at least one holder
+ for blob_hash in unchecked_items.blobs_to_find_holders() {
+ let holders = self
+ .db
+ .list_blob_holders(&blob_hash, Some(1))
+ .await?
+ .into_iter()
+ .map(|holder| PrimaryKey::new(blob_hash.clone(), holder));
+ fetch_results.extend(holders);
+ }
+
+ // 5b. Query blobs - Find if possibly orphaned holders have blobs
+ let blobs_to_get = unchecked_items.blobs_to_check_existence();
+ let existing_blobs = self.db.list_existing_keys(blobs_to_get).await?;
+ fetch_results.extend(existing_blobs);
+
+ // 6. Update the struct with query results
+ // Then do 2nd pass of filtering out checked items (repeat step 4)
+ unchecked_items.feed_with_query_results(fetch_results);
+ checked.extend(unchecked_items.filter_out_checked());
+
+ /// 7. Make changes to database
+ orphans.extend(unchecked_items.into_primary_keys());
+
+ tokio::try_join!(
+ self.db.batch_delete_rows(orphans),
+ self.db.batch_mark_checked(checked)
+ )?;
+
+ // TODO: Delete orphaned blobs from S3
+
+ Ok(())
+ }
}
/// Represents an "unchecked" blob entity. It might miss either
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Dec 30, 6:00 PM (13 h, 21 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5868716
Default Alt Text
D9352.1767117601.diff (3 KB)
Attached To
Mode
D9352: [blob] Add function to perform cleanup operation
Attached
Detach File
Event Timeline
Log In to Comment