diff --git a/services/blob/src/s3.rs b/services/blob/src/s3.rs
--- a/services/blob/src/s3.rs
+++ b/services/blob/src/s3.rs
@@ -1,7 +1,7 @@
 use aws_sdk_s3::{
   operation::create_multipart_upload::CreateMultipartUploadOutput,
   primitives::ByteStream,
-  types::{CompletedMultipartUpload, CompletedPart},
+  types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier},
   Error as S3Error,
 };
 use std::ops::{Bound, RangeBounds};
@@ -197,6 +197,33 @@
 
     Ok(())
   }
+
+  pub async fn batch_delete_objects(&self, paths: Vec<S3Path>) -> S3Result<()> {
+    let Some(first_path) = paths.first() else {
+      debug!("No S3 objects to delete");
+      return Ok(());
+    };
+
+    let bucket_name = &first_path.bucket_name;
+    let objects = paths
+      .iter()
+      .map(|path| ObjectIdentifier::builder().key(&path.object_name).build())
+      .collect();
+
+    self
+      .client
+      .delete_objects()
+      .bucket(bucket_name)
+      .delete(Delete::builder().set_objects(Some(objects)).build())
+      .send()
+      .await
+      .map_err(|e| {
+        error!("S3 failed to batch delete objects");
+        Error::AwsSdk(e.into())
+      })?;
+
+    Ok(())
+  }
 }
 
 /// Represents a multipart upload session to the AWS S3
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -1,5 +1,5 @@
 #![allow(unused)]
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashSet};
 use std::ops::{Bound, Range, RangeBounds, RangeInclusive};
 use std::sync::Arc;
 
@@ -11,6 +11,7 @@
 use tonic::codegen::futures_core::Stream;
 use tracing::{debug, error, trace, warn};
 
+use crate::config::CONFIG;
 use crate::constants::S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE;
 use crate::database::types::{
   BlobItemInput, BlobItemRow, PrimaryKey, UncheckedKind,
@@ -295,15 +296,28 @@
     unchecked_items.feed_with_query_results(fetch_results);
     checked.extend(unchecked_items.filter_out_checked());
 
-    /// 7. Make changes to database
+    // 7. Perform actual cleanup
     orphans.extend(unchecked_items.into_primary_keys());
+    let s3_paths = orphans
+      .iter()
+      .map(|item| &item.blob_hash)
+      // remove duplicates (they come from holders having the same blob hash)
+      .collect::<HashSet<_>>()
+      .into_iter()
+      .map(|blob_hash| S3Path {
+        bucket_name: CONFIG.s3_bucket_name.clone(),
+        object_name: blob_hash.to_string(),
+      })
+      .collect();
+    // 7a. Make changes to database
     tokio::try_join!(
       self.db.batch_delete_rows(orphans),
      self.db.batch_mark_checked(checked)
     )?;
 
-    // TODO: Delete orphaned blobs from S3
+    // 7b. Delete orphaned blobs from S3
+    self.s3.batch_delete_objects(s3_paths).await?;
 
     Ok(())
   }
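For reference, a minimal sketch of how a caller might drive the new batch_delete_objects helper. This is illustrative only: it assumes the wrapper type in s3.rs is named S3Client (the name is not visible in these hunks), that S3Path exposes public bucket_name/object_name fields as the service.rs hunk suggests, and the bucket name below is a hypothetical placeholder standing in for CONFIG.s3_bucket_name.

// Illustrative usage sketch, not part of the patch.
async fn delete_blobs_by_hash(
  s3: &S3Client,
  blob_hashes: Vec<String>,
) -> S3Result<()> {
  let paths: Vec<S3Path> = blob_hashes
    .into_iter()
    .map(|blob_hash| S3Path {
      // hypothetical bucket; the real code reads CONFIG.s3_bucket_name
      bucket_name: "example-blob-bucket".to_string(),
      object_name: blob_hash,
    })
    .collect();

  // An empty `paths` vector is handled inside batch_delete_objects
  // (it returns Ok(()) early), so callers need no special case.
  s3.batch_delete_objects(paths).await
}

One design note: a single S3 DeleteObjects request accepts at most 1000 keys, so the patch as written relies on orphan sets staying below that limit; larger sets would need to be chunked into multiple requests.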