diff --git a/services/blob/src/database.rs b/services/blob/src/database.rs --- a/services/blob/src/database.rs +++ b/services/blob/src/database.rs @@ -7,6 +7,7 @@ fmt::{Display, Formatter}, sync::Arc, }; +use tracing::error; use crate::{ constants::{ @@ -69,7 +70,10 @@ .set_item(Some(item)) .send() .await - .map_err(|e| Error::AwsSdk(e.into()))?; + .map_err(|e| { + error!("DynamoDB client failed to put blob item"); + Error::AwsSdk(e.into()) + })?; Ok(()) } @@ -89,8 +93,10 @@ .set_key(Some(item_key)) .send() .await - .map_err(|e| Error::AwsSdk(e.into()))? - { + .map_err(|e| { + error!("DynamoDB client failed to find blob item"); + Error::AwsSdk(e.into()) + })? { GetItemOutput { item: Some(mut item), .. @@ -129,7 +135,10 @@ ) .send() .await - .map_err(|e| Error::AwsSdk(e.into()))?; + .map_err(|e| { + error!("DynamoDB client failed to remove blob item"); + Error::AwsSdk(e.into()) + })?; Ok(()) } @@ -142,6 +151,7 @@ ) -> Result<(), Error> { let holder = &reverse_index_item.holder; if self.find_reverse_index_by_holder(holder).await?.is_some() { + error!("Failed to put reverse index. Holder already exists."); return Err(Error::Blob(BlobDBError::HolderAlreadyExists( holder.to_string(), ))); @@ -164,7 +174,10 @@ .set_item(Some(item)) .send() .await - .map_err(|e| Error::AwsSdk(e.into()))?; + .map_err(|e| { + error!("DynamoDB client failed to put reverse index"); + Error::AwsSdk(e.into()) + })?; Ok(()) } @@ -185,8 +198,10 @@ .consistent_read(true) .send() .await - .map_err(|e| Error::AwsSdk(e.into()))? - { + .map_err(|e| { + error!("DynamoDB client failed to find reverse index by holder"); + Error::AwsSdk(e.into()) + })? { GetItemOutput { item: Some(mut item), .. @@ -226,7 +241,10 @@ ) .send() .await - .map_err(|e| Error::AwsSdk(e.into()))?; + .map_err(|e| { + error!("DynamoDB client failed to find reverse indexes by hash"); + Error::AwsSdk(e.into()) + })?; if response.count == 0 { return Ok(vec![]); @@ -264,7 +282,10 @@ ) .send() .await - .map_err(|e| Error::AwsSdk(e.into()))?; + .map_err(|e| { + error!("DynamoDB client failed to remove reverse index"); + Error::AwsSdk(e.into()) + })?; Ok(()) } diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs --- a/services/blob/src/service.rs +++ b/services/blob/src/service.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use aws_sdk_dynamodb::Error as DynamoDBError; use blob::blob_service_server::BlobService; use chrono::Utc; use std::pin::Pin; @@ -12,7 +13,7 @@ BLOB_S3_BUCKET_NAME, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE, MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE, }, - database::{BlobItem, DatabaseClient, ReverseIndexItem}, + database::{BlobItem, DatabaseClient, Error as DBError, ReverseIndexItem}, s3::{MultiPartUploadSession, S3Client, S3Path}, tools::MemOps, }; @@ -45,10 +46,7 @@ debug!("No blob found for {:?}", reverse_index_item); Err(Status::not_found("blob not found")) } - Err(err) => { - error!("Failed to find blob item: {:?}", err); - Err(Status::aborted("internal error")) - } + Err(err) => Err(handle_db_error(err)), } } @@ -64,10 +62,7 @@ debug!("No db entry found for holder {:?}", holder); Err(Status::not_found("blob not found")) } - Err(err) => { - error!("Failed to find reverse index: {:?}", err); - Err(Status::aborted("internal error")) - } + Err(err) => Err(handle_db_error(err)), } } } @@ -205,19 +200,18 @@ .db .find_reverse_index_by_holder(holder) .await - .map_err(|err| { - error!("Failed to find reverse index: {:?}", err); - Status::aborted("Internal error") - })? + .map_err(handle_db_error)? .ok_or_else(|| { debug!("Blob not found"); Status::not_found("Blob not found") })?; let blob_hash = &reverse_index_item.blob_hash; - if self.db.remove_reverse_index_item(holder).await.is_err() { - return Err(Status::aborted("Internal error")); - } + self + .db + .remove_reverse_index_item(holder) + .await + .map_err(handle_db_error)?; // TODO handle cleanup here properly // for now the object's being removed right away @@ -226,10 +220,7 @@ .db .find_reverse_index_by_hash(blob_hash) .await - .map_err(|err| { - error!("Failed to find reverse index: {:?}", err); - Status::aborted("Internal error") - })? + .map_err(handle_db_error)? .is_empty() { let s3_path = self @@ -241,10 +232,11 @@ Status::aborted("Internal error") })?; - if let Err(err) = self.db.remove_blob_item(blob_hash).await { - error!("Failed to remove blob item from database: {:?}", err); - return Err(Status::aborted("Internal error")); - } + self + .db + .remove_blob_item(blob_hash) + .await + .map_err(handle_db_error)?; } Ok(Response::new(())) @@ -349,11 +341,7 @@ } Err(db_err) => { self.should_close_stream = true; - error!( - "Error when finding BlobItem by hash {}: {:?}", - blob_hash, db_err - ); - Err(Status::aborted("Internal error")) + Err(handle_db_error(db_err)) } } } @@ -447,10 +435,11 @@ return Err(Status::aborted("Internal error")); } - if let Err(err) = self.db.put_blob_item(blob_item).await { - error!("Failed to save BlobItem: {:?}", err); - return Err(Status::aborted("Internal error")); - } + self + .db + .put_blob_item(blob_item) + .await + .map_err(handle_db_error)?; assign_holder_to_blob(&self.db, holder, blob_hash).await?; @@ -466,9 +455,28 @@ ) -> Result<(), Status> { let reverse_index_item = ReverseIndexItem { holder, blob_hash }; - if let Err(err) = db.put_reverse_index_item(reverse_index_item).await { - error!("Failed to put reverse index: {:?}", err); - return Err(Status::aborted("Internal error")); + db.put_reverse_index_item(reverse_index_item) + .await + .map_err(handle_db_error) +} + +fn handle_db_error(db_error: DBError) -> Status { + match db_error { + DBError::AwsSdk(DynamoDBError::InternalServerError(_)) + | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException( + _, + )) + | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => { + warn!("AWS transient error occurred"); + Status::unavailable("please retry") + } + DBError::Blob(e) => { + error!("Encountered Blob database error: {}", e); + Status::failed_precondition("Internal error") + } + e => { + error!("Encountered an unexpected error: {}", e); + Status::failed_precondition("unexpected error") + } } - Ok(()) }