diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs --- a/services/blob/src/service.rs +++ b/services/blob/src/service.rs @@ -5,6 +5,7 @@ use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status}; +use tracing::{debug, error, info, instrument, trace, warn, Instrument}; use crate::{ constants::{ @@ -39,8 +40,14 @@ let blob_hash = &reverse_index_item.blob_hash; match self.db.find_blob_item(&blob_hash).await { Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path), - Ok(None) => Err(Status::not_found("blob not found")), - Err(_) => Err(Status::aborted("internal error")), + Ok(None) => { + debug!("No blob found for {:?}", reverse_index_item); + Err(Status::not_found("blob not found")) + } + Err(err) => { + error!("Failed to find blob item: {:?}", err); + Err(Status::aborted("internal error")) + } } } @@ -52,8 +59,14 @@ Ok(Some(reverse_index)) => { self.find_s3_path_by_reverse_index(&reverse_index).await } - Ok(None) => Err(Status::not_found("blob not found")), - Err(_) => Err(Status::aborted("internal error")), + Ok(None) => { + debug!("No db entry found for holder {:?}", holder); + Err(Status::not_found("blob not found")) + } + Err(err) => { + error!("Failed to find reverse index: {:?}", err); + Err(Status::aborted("internal error")) + } } } } @@ -63,15 +76,18 @@ impl BlobService for MyBlobService { type PutStream = Pin<Box<dyn Stream<Item = Result<blob::PutResponse, Status>> + Send>>; + + #[instrument(skip_all, fields(holder, blob_hash))] async fn put( &self, request: Request<tonic::Streaming<blob::PutRequest>>, ) -> Result<Response<Self::PutStream>, Status> { + info!("Put blob request: {:?}", request); let mut in_stream = request.into_inner(); let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let db = self.db.clone(); let s3 = self.s3.clone(); - tokio::spawn(async move { + let worker = async move { let mut put_handler = PutHandler::new(&db, &s3); while let Some(message) = in_stream.next().await { @@ -85,21 +101,28 @@ Ok(blob::PutRequest { data: Some(blob::put_request::Data::DataChunk(new_data)), }) => put_handler.handle_data_chunk(new_data).await, - _ => Err(Status::unknown("unknown error")), + unexpected => { + error!("Received an unexpected Result: {:?}", unexpected); + Err(Status::unknown("unknown error")) + } }; + trace!("Sending response: {:?}", response); if let Err(e) = tx.send(response).await { - println!("Response was dropped: {}", e); + error!("Response was dropped: {}", e); break; } if put_handler.should_close_stream { + trace!("Put handler requested to close stream"); break; } } if let Err(status) = put_handler.finish().await { + trace!("Sending error response: {:?}", status); let _ = tx.send(Err(status)).await; } - }); + }; + tokio::spawn(worker.in_current_span()); let out_stream = ReceiverStream::new(rx); Ok(Response::new(Box::pin(out_stream) as Self::PutStream)) @@ -107,12 +130,16 @@ type GetStream = Pin<Box<dyn Stream<Item = Result<blob::GetResponse, Status>> + Send>>; + + #[instrument(skip_all, fields(holder = %request.get_ref().holder, s3_path))] async fn get( &self, request: Request<blob::GetRequest>, ) -> Result<Response<Self::GetStream>, Status> { + info!("Get blob request: {:?}", request); let message: blob::GetRequest = request.into_inner(); let s3_path = self.find_s3_path_by_holder(&message.holder).await?; + tracing::Span::current().record("s3_path", s3_path.to_full_path()); let object_metadata = self .s3 @@ -121,23 +148,28 @@ .key(s3_path.object_name.clone()) .send() .await - .map_err(|_| Status::aborted("server error"))?; - - let file_size: u64 = object_metadata - .content_length() - .try_into() - .map_err(|_| Status::aborted("server error"))?; + .map_err(|err| { + error!("Failed to get S3 object metadata: {:?}", err); + Status::aborted("server error") + })?; + + let file_size: u64 = + object_metadata.content_length().try_into().map_err(|err| { + error!("Failed to get S3 object content length: {:?}", err); + Status::aborted("server error") + })?; let chunk_size: u64 = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let s3 = self.s3.clone(); - tokio::spawn(async move { + let worker = async move { let mut offset: u64 = 0; while offset < file_size { let next_size = std::cmp::min(chunk_size, file_size - offset); let range = format!("bytes={}-{}", offset, offset + next_size - 1); + trace!(range, "Getting {} bytes of data", next_size); let data = match s3 .get_object() @@ -158,38 +190,51 @@ Ok(data) => Ok(blob::GetResponse { data_chunk: data.into_bytes().to_vec(), }), - Err(_) => Err(Status::aborted("download failed")), + Err(err) => { + error!("Failed to download data chunk: {:?}", err); + Err(Status::aborted("download failed")) + } }; let should_abort = response.is_err(); if let Err(e) = tx.send(response).await { - println!("Response was dropped: {}", e); + error!("Response was dropped: {}", e); break; } if should_abort { + trace!("Error response, aborting"); break; } offset += chunk_size; } - }); + }; + tokio::spawn(worker.in_current_span()); let out_stream = ReceiverStream::new(rx); Ok(Response::new(Box::pin(out_stream) as Self::GetStream)) } + #[instrument(skip_all, fields(holder = %request.get_ref().holder))] async fn remove( &self, request: Request<blob::RemoveRequest>, ) -> Result<Response<()>, Status> { + info!("Remove blob request: {:?}", request); let message = request.into_inner(); let holder = message.holder.as_str(); let reverse_index_item = self .db .find_reverse_index_by_holder(holder) .await - .map_err(|_| Status::aborted("Internal error"))? - .ok_or_else(|| Status::not_found("Blob not found"))?; + .map_err(|err| { + error!("Failed to find reverse index: {:?}", err); + Status::aborted("Internal error") + })? + .ok_or_else(|| { + debug!("Blob not found"); + Status::not_found("Blob not found") + })?; let blob_hash = &reverse_index_item.blob_hash; if self.db.remove_reverse_index_item(holder).await.is_err() { @@ -203,7 +248,10 @@ .db .find_reverse_index_by_hash(blob_hash) .await - .map_err(|_| Status::aborted("Internal error"))? + .map_err(|err| { + error!("Failed to find reverse index: {:?}", err); + Status::aborted("Internal error") + })? .is_empty() { let s3_path = self @@ -217,9 +265,13 @@ .key(&s3_path.object_name) .send() .await - .map_err(|_| Status::aborted("Internal error"))?; + .map_err(|err| { + error!("Failed to delete S3 object: {:?}", err); + Status::aborted("Internal error") + })?; - if self.db.remove_blob_item(blob_hash).await.is_err() { + if let Err(err) = self.db.remove_blob_item(blob_hash).await { + error!("Failed to remove blob item from database: {:?}", err); return Err(Status::aborted("Internal error")); } } @@ -266,16 +318,20 @@ pub async fn handle_holder(&mut self, new_holder: String) -> PutResult { if self.holder.is_some() { + warn!("Holder already provided"); return Err(Status::invalid_argument("Holder already provided")); } + tracing::Span::current().record("holder", &new_holder); self.holder = Some(new_holder); self.determine_action().await } pub async fn handle_blob_hash(&mut self, new_hash: String) -> PutResult { if self.blob_hash.is_some() { + warn!("Blob hash already provided"); return Err(Status::invalid_argument("Blob hash already provided")); } + tracing::Span::current().record("blob_hash", &new_hash); self.blob_hash = Some(new_hash); self.determine_action().await } @@ -285,6 +341,7 @@ // this should be called only if action isn't determined yet // this error should actually never happen if self.action.is_some() { + error!("Put action is already started"); return Err(Status::failed_precondition("Put action is already started")); } @@ -301,12 +358,14 @@ match self.db.find_blob_item(blob_hash).await { // Hash already exists, so we're only assigning a new holder to it Ok(Some(_)) => { + debug!("Blob found, assigning holder"); self.action = Some(PutAction::AssignHolder); self.should_close_stream = true; Ok(blob::PutResponse { data_exists: true }) } // Hash doesn't exist, so we're starting a new upload session Ok(None) => { + debug!("Blob not found, starting upload action"); self.action = Some(PutAction::UploadNewBlob(BlobItem { blob_hash: blob_hash.to_string(), s3_path: S3Path { @@ -317,8 +376,12 @@ })); Ok(blob::PutResponse { data_exists: false }) } - Err(_db_err) => { + Err(db_err) => { self.should_close_stream = true; + error!( + "Error when finding BlobItem by hash {}: {:?}", + blob_hash, db_err + ); Err(Status::aborted("Internal error")) } } @@ -332,20 +395,24 @@ Some(PutAction::UploadNewBlob(blob_item)) => blob_item, _ => { self.should_close_stream = true; + error!("Data chunk sent before upload action is started"); return Err(Status::invalid_argument( "Holder and hash should be provided before data", )); } }; + trace!("Received {} bytes of data", new_data.len()); // create upload session if it doesn't already exist if self.uploader.is_none() { + debug!("Uploader doesn't exist, starting new session"); self.uploader = match MultiPartUploadSession::start(&self.s3, &blob_item.s3_path).await { Ok(session) => Some(session), - Err(_) => { + Err(err) => { self.should_close_stream = true; + error!("Failed to create upload session: {:?}", err); return Err(Status::aborted("Internal error")); } } @@ -357,8 +424,10 @@ self.current_chunk.append(&mut new_data); if self.current_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE { - if uploader.add_part(self.current_chunk.clone()).await.is_err() { + trace!("Chunk size exceeded, adding new S3 part"); + if let Err(err) = uploader.add_part(self.current_chunk.clone()).await { self.should_close_stream = true; + error!("Failed to upload S3 part: {:?}", err); return Err(Status::aborted("Internal error")); } self.current_chunk.clear(); @@ -372,14 +441,17 @@ /// after this is called. pub async fn finish(self) -> Result<(), Status> { if self.action.is_none() { + debug!("No action to perform, finishing now"); return Ok(()); } - let holder = self - .holder - .ok_or_else(|| Status::aborted("Internal error"))?; - let blob_hash = self - .blob_hash - .ok_or_else(|| Status::aborted("Internal error"))?; + let holder = self.holder.ok_or_else(|| { + error!("Cannot finish action. No holder provided!"); + Status::aborted("Internal error") + })?; + let blob_hash = self.blob_hash.ok_or_else(|| { + error!("Cannot finish action. No blob hash provided!"); + Status::aborted("Internal error") + })?; let blob_item = match self.action { None => return Ok(()), Some(PutAction::AssignHolder) => { @@ -388,25 +460,33 @@ Some(PutAction::UploadNewBlob(blob_item)) => blob_item, }; - let mut uploader = self - .uploader - .ok_or_else(|| Status::aborted("Internal error"))?; + let mut uploader = self.uploader.ok_or_else(|| { + // This also happens when client cancels before sending any data chunk + warn!("No uploader was created, finishing now"); + Status::aborted("Internal error") + })?; - if !self.current_chunk.is_empty() - && uploader.add_part(self.current_chunk).await.is_err() - { - return Err(Status::aborted("Internal error")); + if !self.current_chunk.is_empty() { + if let Err(err) = uploader.add_part(self.current_chunk).await { + error!("Failed to upload final part: {:?}", err); + return Err(Status::aborted("Internal error")); + } } - if uploader.finish_upload().await.is_err() { + if let Err(err) = uploader.finish_upload().await { + error!("Failed to finish upload session: {:?}", err); return Err(Status::aborted("Internal error")); } - if self.db.put_blob_item(blob_item).await.is_err() { + if let Err(err) = self.db.put_blob_item(blob_item).await { + error!("Failed to save BlobItem: {:?}", err); return Err(Status::aborted("Internal error")); } - assign_holder_to_blob(&self.db, holder, blob_hash).await + assign_holder_to_blob(&self.db, holder, blob_hash).await?; + + debug!("Upload finished successfully"); + Ok(()) } } @@ -417,7 +497,8 @@ ) -> Result<(), Status> { let reverse_index_item = ReverseIndexItem { holder, blob_hash }; - if db.put_reverse_index_item(reverse_index_item).await.is_err() { + if let Err(err) = db.put_reverse_index_item(reverse_index_item).await { + error!("Failed to put reverse index: {:?}", err); return Err(Status::aborted("Internal error")); } Ok(())