diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -144,8 +144,75 @@
 
+  /// Removes the holder named in the request and, once no other holder
+  /// references the same blob, deletes the blob as well.
+  ///
+  /// The reverse-index item for the holder is looked up and removed first.
+  /// If the follow-up lookup by blob hash then comes back empty, the S3
+  /// object is deleted, followed by the blob item in the database.
+  ///
+  /// # Errors
+  ///
+  /// * `NotFound` ("Blob not found") if the holder has no reverse-index item.
+  /// * `Aborted` ("Internal error") if any database or S3 call fails.
+  /// * Any status returned by `find_s3_path_by_reverse_index` is passed on
+  ///   unchanged.
   async fn remove(
     &self,
-    _request: Request,
+    request: Request,
   ) -> Result, Status> {
-    Err(Status::unimplemented("Not implemented yet"))
+    // Every database and S3 failure is reported to the client identically,
+    // whatever the underlying error type was.
+    fn internal_error<E>(_: E) -> Status {
+      Status::aborted("Internal error")
+    }
+
+    let payload = request.into_inner();
+    let holder = payload.holder.as_str();
+
+    let lookup = self
+      .db
+      .find_reverse_index_by_holder(holder)
+      .await
+      .map_err(internal_error)?;
+    let index_entry = match lookup {
+      Some(entry) => entry,
+      None => return Err(Status::not_found("Blob not found")),
+    };
+    let blob_hash = &index_entry.blob_hash;
+
+    self
+      .db
+      .remove_reverse_index_item(holder)
+      .await
+      .map_err(internal_error)?;
+
+    // TODO handle cleanup here properly
+    // for now the object's being removed right away
+    // after the last holder was removed
+    let was_last_holder = self
+      .db
+      .find_reverse_index_by_hash(blob_hash)
+      .await
+      .map_err(internal_error)?
+      .is_empty();
+    if !was_last_holder {
+      return Ok(Response::new(()));
+    }
+
+    let s3_path = self.find_s3_path_by_reverse_index(&index_entry).await?;
+    self
+      .s3
+      .delete_object()
+      .bucket(&s3_path.bucket_name)
+      .key(&s3_path.object_name)
+      .send()
+      .await
+      .map_err(internal_error)?;
+    self
+      .db
+      .remove_blob_item(blob_hash)
+      .await
+      .map_err(internal_error)?;
+
+    Ok(Response::new(()))
   }
 }