Changeset View
Changeset View
Standalone View
Standalone View
services/blob/src/service.rs
Show First 20 Lines • Show All 135 Lines • ▼ Show 20 Lines | ) -> Result<Response<Self::GetStream>, Status> { | ||||
}); | }); | ||||
let out_stream = ReceiverStream::new(rx); | let out_stream = ReceiverStream::new(rx); | ||||
Ok(Response::new(Box::pin(out_stream) as Self::GetStream)) | Ok(Response::new(Box::pin(out_stream) as Self::GetStream)) | ||||
} | } | ||||
async fn remove( | async fn remove( | ||||
&self, | &self, | ||||
_request: Request<blob::RemoveRequest>, | request: Request<blob::RemoveRequest>, | ||||
) -> Result<Response<()>, Status> { | ) -> Result<Response<()>, Status> { | ||||
Err(Status::unimplemented("Not implemented yet")) | let message = request.into_inner(); | ||||
let holder = message.holder.as_str(); | |||||
let reverse_index_item = self | |||||
.db | |||||
.find_reverse_index_by_holder(holder) | |||||
.await | |||||
.map_err(|_| Status::aborted("Internal error"))? | |||||
.ok_or_else(|| Status::not_found("Blob not found"))?; | |||||
let blob_hash = &reverse_index_item.blob_hash; | |||||
if self.db.remove_reverse_index_item(holder).await.is_err() { | |||||
return Err(Status::aborted("Internal error")); | |||||
} | |||||
// TODO handle cleanup here properly | |||||
// for now the object's being removed right away | |||||
// after the last holder was removed | |||||
if self | |||||
.db | |||||
.find_reverse_index_by_hash(blob_hash) | |||||
.await | |||||
.map_err(|_| Status::aborted("Internal error"))? | |||||
.is_empty() | |||||
{ | |||||
let s3_path = self | |||||
.find_s3_path_by_reverse_index(&reverse_index_item) | |||||
.await?; | |||||
self | |||||
.s3 | |||||
.delete_object() | |||||
.bucket(&s3_path.bucket_name) | |||||
.key(&s3_path.object_name) | |||||
.send() | |||||
.await | |||||
.map_err(|_| Status::aborted("Internal error"))?; | |||||
if self.db.remove_blob_item(blob_hash).await.is_err() { | |||||
return Err(Status::aborted("Internal error")); | |||||
} | |||||
} | |||||
tomek: If deleting reverse index is successful but then deleting the item fails, we would end up in a… | |||||
varunUnsubmitted Not Done Inline ActionsI think we can introduce a “nanny task” later that checks periodically for “orphaned” items and removes them varun: I think we can introduce a “nanny task” later that checks periodically for “orphaned” items and… | |||||
bartekAuthorUnsubmitted Done Inline ActionsI admit that this code is problematic as failure anywhere results in an invalid state - tables will go out of sync. IMO removing reverse-index first is the best solution for now, because it makes it inaccessible anymore, even when further deletion (s3 or blob table) goes wrong.
+1 on this, I think it's more related to ENG-414 bartek: I admit that this code is problematic as failure anywhere results in an invalid state - tables… | |||||
Ok(Response::new(())) | |||||
} | } | ||||
} | } |
If deleting reverse index is successful but then deleting the item fails, we would end up in a state where retrying will always fail. This isn't too serious, as we're planning to introduce proper cleanup procedure later, but maybe we should consider making it safer now.
The current solution matches what we already have, so maybe it is a good idea to create a followup task for handling deleting in a safer manner.