diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs --- a/services/blob/src/service.rs +++ b/services/blob/src/service.rs @@ -166,6 +166,61 @@ self.db.put_blob_item(blob_item).await?; Ok(()) } + + pub async fn assign_holder( + &self, + blob_hash: impl Into, + holder: impl Into, + ) -> BlobServiceResult { + let blob_hash: String = blob_hash.into(); + trace!(blob_hash, "Attempting to assign holder"); + self + .db + .put_holder_assignment(blob_hash.clone(), holder.into()) + .await?; + + trace!("Holder assigned. Checking if data exists"); + let data_exists = self.db.get_blob_item(blob_hash).await?.is_some(); + Ok(data_exists) + } + + pub async fn revoke_holder( + &self, + blob_hash: impl Into, + holder: impl Into, + ) -> BlobServiceResult<()> { + let blob_hash: String = blob_hash.into(); + let holder: String = holder.into(); + + trace!(blob_hash, holder, "Attempting to revoke holder"); + self.db.delete_holder_assignment(&blob_hash, holder).await?; + + if self.config.instant_delete_orphaned_blobs { + trace!("Instant orphan deletion enabled. Looking for holders"); + let is_orphan = self + .db + .list_blob_holders(&blob_hash, Some(1)) + .await? + .is_empty(); + if !is_orphan { + trace!("Found holders, nothing to do"); + return Ok(()); + } + + debug!("No holders left, deleting blob if exists"); + trace!("Getting blob item"); + let Some(blob_item) = self.db.get_blob_item(&blob_hash).await? else { + trace!("Blob item not found, nothing to do"); + return Ok(()) + }; + + trace!("Deleting S3 object"); + self.s3.delete_object(&blob_item.s3_path).await?; + trace!("Deleting blob item entry from DB"); + self.db.delete_blob_item(blob_hash).await?; + } + Ok(()) + } } pub struct BlobDownloadObject {