diff --git a/services/blob/src/s3.rs b/services/blob/src/s3.rs
--- a/services/blob/src/s3.rs
+++ b/services/blob/src/s3.rs
@@ -1,7 +1,7 @@
 use aws_sdk_s3::{
   operation::create_multipart_upload::CreateMultipartUploadOutput,
   primitives::ByteStream,
-  types::{CompletedMultipartUpload, CompletedPart},
+  types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier},
   Error as S3Error,
 };
 use std::ops::{Bound, RangeBounds};
@@ -197,6 +197,47 @@
 
     Ok(())
   }
+
+  /// Deletes the given objects from S3 with batched `DeleteObjects` requests.
+  ///
+  /// Every path is assumed to live in the bucket of the first path: only that
+  /// bucket name is sent. An empty `paths` is a no-op.
+  ///
+  /// # Errors
+  ///
+  /// Returns `Error::AwsSdk` as soon as one request fails; requests that were
+  /// already sent are not rolled back.
+  pub async fn batch_delete_objects(&self, paths: Vec<S3Path>) -> S3Result<()> {
+    // S3 rejects `DeleteObjects` requests that list more than 1000 keys.
+    const MAX_KEYS_PER_REQUEST: usize = 1000;
+
+    let Some(first_path) = paths.first() else {
+      debug!("No S3 objects to delete");
+      return Ok(());
+    };
+
+    let bucket_name = &first_path.bucket_name;
+    for chunk in paths.chunks(MAX_KEYS_PER_REQUEST) {
+      let objects = chunk
+        .iter()
+        .map(|path| ObjectIdentifier::builder().key(&path.object_name).build())
+        .collect();
+
+      self
+        .client
+        .delete_objects()
+        .bucket(bucket_name)
+        .delete(Delete::builder().set_objects(Some(objects)).build())
+        .send()
+        .await
+        .map_err(|e| {
+          error!("S3 failed to batch delete objects");
+          Error::AwsSdk(e.into())
+        })?;
+    }
+
+    Ok(())
+  }
 }
 
 /// Represents a multipart upload session to the AWS S3
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -11,6 +11,7 @@
 use tonic::codegen::futures_core::Stream;
 use tracing::{debug, error, trace, warn};
 
+use crate::config::CONFIG;
 use crate::constants::S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE;
 use crate::database::types::{
   BlobItemInput, BlobItemRow, PrimaryKey, UncheckedKind,
@@ -295,15 +296,25 @@
     unchecked_items.feed_with_query_results(fetch_results);
     checked.extend(unchecked_items.filter_out_checked());
 
-    // 7. Make changes to database
+    // 7. Perform actual cleanup
     orphans.extend(unchecked_items.into_primary_keys());
+    let s3_paths: Vec<S3Path> = orphans
+      .iter()
+      .filter(|pk| pk.is_blob_item())
+      .map(|PrimaryKey { blob_hash, .. }| S3Path {
+        bucket_name: CONFIG.s3_bucket_name.clone(),
+        object_name: blob_hash.to_string(),
+      })
+      .collect();
 
+    // 7a. Make changes to database
     tokio::try_join!(
       self.db.batch_delete_rows(orphans),
       self.db.batch_mark_checked(checked)
     )?;
 
-    // TODO: Delete orphaned blobs from S3
+    // 7b. Delete orphaned blobs from S3
+    self.s3.batch_delete_objects(s3_paths).await?;
 
     Ok(())
   }