Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F33307445
D9355.1768797797.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
2 KB
Referenced Files
None
Subscribers
None
D9355.1768797797.diff
View Options
diff --git a/services/blob/src/s3.rs b/services/blob/src/s3.rs
--- a/services/blob/src/s3.rs
+++ b/services/blob/src/s3.rs
@@ -1,7 +1,7 @@
use aws_sdk_s3::{
operation::create_multipart_upload::CreateMultipartUploadOutput,
primitives::ByteStream,
- types::{CompletedMultipartUpload, CompletedPart},
+ types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier},
Error as S3Error,
};
use std::ops::{Bound, RangeBounds};
@@ -197,6 +197,33 @@
Ok(())
}
+
+ pub async fn batch_delete_objects(&self, paths: Vec<S3Path>) -> S3Result<()> {
+ let Some(first_path) = paths.first() else {
+ debug!("No S3 objects to delete");
+ return Ok(());
+ };
+
+ let bucket_name = &first_path.bucket_name;
+ let objects = paths
+ .iter()
+ .map(|path| ObjectIdentifier::builder().key(&path.object_name).build())
+ .collect();
+
+ self
+ .client
+ .delete_objects()
+ .bucket(bucket_name)
+ .delete(Delete::builder().set_objects(Some(objects)).build())
+ .send()
+ .await
+ .map_err(|e| {
+ error!("S3 failed to batch delete objects");
+ Error::AwsSdk(e.into())
+ })?;
+
+ Ok(())
+ }
}
/// Represents a multipart upload session to the AWS S3
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -11,6 +11,7 @@
use tonic::codegen::futures_core::Stream;
use tracing::{debug, error, trace, warn};
+use crate::config::CONFIG;
use crate::constants::S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE;
use crate::database::types::{
BlobItemInput, BlobItemRow, PrimaryKey, UncheckedKind,
@@ -295,15 +296,28 @@
unchecked_items.feed_with_query_results(fetch_results);
checked.extend(unchecked_items.filter_out_checked());
- /// 7. Make changes to database
+ // 7. Perform actual cleanup
orphans.extend(unchecked_items.into_primary_keys());
+ let s3_paths: Vec<S3Path> = orphans
+ .iter()
+ .map(|item| &item.blob_hash)
+ // remove duplicates (they come from holders having the same blob hash)
+ .collect::<HashSet<_>>()
+ .into_iter()
+ .map(|blob_hash| S3Path {
+ bucket_name: CONFIG.s3_bucket_name.clone(),
+ object_name: blob_hash.to_string(),
+ })
+ .collect();
+ // 7a. Make changes to database
tokio::try_join!(
self.db.batch_delete_rows(orphans),
self.db.batch_mark_checked(checked)
)?;
- // TODO: Delete orphaned blobs from S3
+ // 7b. Delete orphaned blobs from S3
+ self.s3.batch_delete_objects(s3_paths).await?;
Ok(())
}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Mon, Jan 19, 4:43 AM (10 h, 42 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5954409
Default Alt Text
D9355.1768797797.diff (2 KB)
Attached To
Mode
D9355: [blob] Delete S3 objects during cleanup
Attached
Detach File
Event Timeline
Log In to Comment