Changeset View
Changeset View
Standalone View
Standalone View
services/blob/src/service.rs
Show First 20 Lines • Show All 138 Lines • ▼ Show 20 Lines | ) -> Result<Response<Self::GetStream>, Status> { | ||||
}); | }); | ||||
let out_stream = ReceiverStream::new(rx); | let out_stream = ReceiverStream::new(rx); | ||||
Ok(Response::new(Box::pin(out_stream) as Self::GetStream)) | Ok(Response::new(Box::pin(out_stream) as Self::GetStream)) | ||||
} | } | ||||
async fn remove( | async fn remove( | ||||
&self, | &self, | ||||
_request: Request<blob::RemoveRequest>, | request: Request<blob::RemoveRequest>, | ||||
) -> Result<Response<()>, Status> { | ) -> Result<Response<()>, Status> { | ||||
Err(Status::unimplemented("Not implemented yet")) | let message = request.into_inner(); | ||||
let holder = message.holder.as_str(); | |||||
let reverse_index_item = self | |||||
.db | |||||
.find_reverse_index_by_holder(holder) | |||||
.await | |||||
.map_err(|_| Status::aborted("Internal error"))? | |||||
.ok_or_else(|| Status::not_found("Blob not found"))?; | |||||
let blob_hash = &reverse_index_item.blob_hash; | |||||
if self.db.remove_reverse_index_item(holder).await.is_err() { | |||||
return Err(Status::aborted("Internal error")); | |||||
} | |||||
// TODO handle cleanup here properly | |||||
// for now the object's being removed right away | |||||
// after the last holder was removed | |||||
if self | |||||
.db | |||||
.find_reverse_index_by_hash(blob_hash) | |||||
.await | |||||
.map_err(|_| Status::aborted("Internal error"))? | |||||
.is_empty() | |||||
{ | |||||
let s3_path = self | |||||
.find_s3_path_by_reverse_index(&reverse_index_item) | |||||
.await?; | |||||
self | |||||
.s3 | |||||
.delete_object() | |||||
.bucket(&s3_path.bucket_name) | |||||
.key(&s3_path.object_name) | |||||
.send() | |||||
.await | |||||
.map_err(|_| Status::aborted("Internal error"))?; | |||||
if self.db.remove_blob_item(blob_hash).await.is_err() { | |||||
return Err(Status::aborted("Internal error")); | |||||
} | |||||
} | |||||
Ok(Response::new(())) | |||||
} | } | ||||
} | } |