Page Menu | Home | Phabricator

D5859.id19308.diff
No One | Temporary

D5859.id19308.diff

diff --git a/services/blob/src/database.rs b/services/blob/src/database.rs
--- a/services/blob/src/database.rs
+++ b/services/blob/src/database.rs
@@ -7,6 +7,7 @@
fmt::{Display, Formatter},
sync::Arc,
};
+use tracing::error;
use crate::{
constants::{
@@ -69,7 +70,10 @@
.set_item(Some(item))
.send()
.await
- .map_err(|e| Error::AwsSdk(e.into()))?;
+ .map_err(|e| {
+ error!("DynamoDB client failed to put blob item");
+ Error::AwsSdk(e.into())
+ })?;
Ok(())
}
@@ -89,8 +93,10 @@
.set_key(Some(item_key))
.send()
.await
- .map_err(|e| Error::AwsSdk(e.into()))?
- {
+ .map_err(|e| {
+ error!("DynamoDB client failed to find blob item");
+ Error::AwsSdk(e.into())
+ })? {
GetItemOutput {
item: Some(mut item),
..
@@ -129,7 +135,10 @@
)
.send()
.await
- .map_err(|e| Error::AwsSdk(e.into()))?;
+ .map_err(|e| {
+ error!("DynamoDB client failed to remove blob item");
+ Error::AwsSdk(e.into())
+ })?;
Ok(())
}
@@ -142,6 +151,7 @@
) -> Result<(), Error> {
let holder = &reverse_index_item.holder;
if self.find_reverse_index_by_holder(holder).await?.is_some() {
+ error!("Failed to put reverse index. Holder already exists.");
return Err(Error::Blob(BlobDBError::HolderAlreadyExists(
holder.to_string(),
)));
@@ -164,7 +174,10 @@
.set_item(Some(item))
.send()
.await
- .map_err(|e| Error::AwsSdk(e.into()))?;
+ .map_err(|e| {
+ error!("DynamoDB client failed to put reverse index");
+ Error::AwsSdk(e.into())
+ })?;
Ok(())
}
@@ -185,8 +198,10 @@
.consistent_read(true)
.send()
.await
- .map_err(|e| Error::AwsSdk(e.into()))?
- {
+ .map_err(|e| {
+ error!("DynamoDB client failed to find reverse index by holder");
+ Error::AwsSdk(e.into())
+ })? {
GetItemOutput {
item: Some(mut item),
..
@@ -226,7 +241,10 @@
)
.send()
.await
- .map_err(|e| Error::AwsSdk(e.into()))?;
+ .map_err(|e| {
+ error!("DynamoDB client failed to find reverse indexes by hash");
+ Error::AwsSdk(e.into())
+ })?;
if response.count == 0 {
return Ok(vec![]);
@@ -264,7 +282,10 @@
)
.send()
.await
- .map_err(|e| Error::AwsSdk(e.into()))?;
+ .map_err(|e| {
+ error!("DynamoDB client failed to remove reverse index");
+ Error::AwsSdk(e.into())
+ })?;
Ok(())
}
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -1,4 +1,5 @@
use anyhow::Result;
+use aws_sdk_dynamodb::Error as DynamoDBError;
use blob::blob_service_server::BlobService;
use chrono::Utc;
use std::pin::Pin;
@@ -12,7 +13,7 @@
BLOB_S3_BUCKET_NAME, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
},
- database::{BlobItem, DatabaseClient, ReverseIndexItem},
+ database::{BlobItem, DatabaseClient, Error as DBError, ReverseIndexItem},
s3::{MultiPartUploadSession, S3Client, S3Path},
tools::MemOps,
};
@@ -45,10 +46,7 @@
debug!("No blob found for {:?}", reverse_index_item);
Err(Status::not_found("blob not found"))
}
- Err(err) => {
- error!("Failed to find blob item: {:?}", err);
- Err(Status::aborted("internal error"))
- }
+ Err(err) => Err(handle_db_error(err)),
}
}
@@ -64,10 +62,7 @@
debug!("No db entry found for holder {:?}", holder);
Err(Status::not_found("blob not found"))
}
- Err(err) => {
- error!("Failed to find reverse index: {:?}", err);
- Err(Status::aborted("internal error"))
- }
+ Err(err) => Err(handle_db_error(err)),
}
}
}
@@ -205,19 +200,18 @@
.db
.find_reverse_index_by_holder(holder)
.await
- .map_err(|err| {
- error!("Failed to find reverse index: {:?}", err);
- Status::aborted("Internal error")
- })?
+ .map_err(handle_db_error)?
.ok_or_else(|| {
debug!("Blob not found");
Status::not_found("Blob not found")
})?;
let blob_hash = &reverse_index_item.blob_hash;
- if self.db.remove_reverse_index_item(holder).await.is_err() {
- return Err(Status::aborted("Internal error"));
- }
+ self
+ .db
+ .remove_reverse_index_item(holder)
+ .await
+ .map_err(handle_db_error)?;
// TODO handle cleanup here properly
// for now the object's being removed right away
@@ -226,10 +220,7 @@
.db
.find_reverse_index_by_hash(blob_hash)
.await
- .map_err(|err| {
- error!("Failed to find reverse index: {:?}", err);
- Status::aborted("Internal error")
- })?
+ .map_err(handle_db_error)?
.is_empty()
{
let s3_path = self
@@ -241,10 +232,11 @@
Status::aborted("Internal error")
})?;
- if let Err(err) = self.db.remove_blob_item(blob_hash).await {
- error!("Failed to remove blob item from database: {:?}", err);
- return Err(Status::aborted("Internal error"));
- }
+ self
+ .db
+ .remove_blob_item(blob_hash)
+ .await
+ .map_err(handle_db_error)?;
}
Ok(Response::new(()))
@@ -349,11 +341,7 @@
}
Err(db_err) => {
self.should_close_stream = true;
- error!(
- "Error when finding BlobItem by hash {}: {:?}",
- blob_hash, db_err
- );
- Err(Status::aborted("Internal error"))
+ Err(handle_db_error(db_err))
}
}
}
@@ -447,10 +435,11 @@
return Err(Status::aborted("Internal error"));
}
- if let Err(err) = self.db.put_blob_item(blob_item).await {
- error!("Failed to save BlobItem: {:?}", err);
- return Err(Status::aborted("Internal error"));
- }
+ self
+ .db
+ .put_blob_item(blob_item)
+ .await
+ .map_err(handle_db_error)?;
assign_holder_to_blob(&self.db, holder, blob_hash).await?;
@@ -466,9 +455,28 @@
) -> Result<(), Status> {
let reverse_index_item = ReverseIndexItem { holder, blob_hash };
- if let Err(err) = db.put_reverse_index_item(reverse_index_item).await {
- error!("Failed to put reverse index: {:?}", err);
- return Err(Status::aborted("Internal error"));
+ db.put_reverse_index_item(reverse_index_item)
+ .await
+ .map_err(handle_db_error)
+}
+
+fn handle_db_error(db_error: DBError) -> Status { // Converts a database-layer error into the gRPC `Status` handed back to the client, logging it on the way.
+ match db_error {
+ DBError::AwsSdk(DynamoDBError::InternalServerError(_)) // The three DynamoDB variants in this arm are treated as transient failures.
+ | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException(
+ _,
+ ))
+ | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => {
+ warn!("AWS transient error occurred"); // Logged at warn level; the underlying error value is not included in the message.
+ Status::unavailable("please retry") // `unavailable` plus this message asks the client to try the call again.
+ }
+ DBError::Blob(e) => { // Errors raised by the blob database layer itself (e.g. the holder-already-exists case in database.rs).
+ error!("Encountered Blob database error: {}", e);
+ Status::failed_precondition("Internal error") // NOTE(review): message says "Internal error" but the code is failed_precondition - confirm this is intended.
+ }
+ e => { // Catch-all: every other `DBError`, including the remaining `AwsSdk` variants.
+ error!("Encountered an unexpected error: {}", e);
+ Status::failed_precondition("unexpected error") // NOTE(review): same status code as the Blob arm; confirm whether `internal` was meant for unexpected failures.
+ }
}
- Ok(())
}

File Metadata

Mime Type
text/plain
Expires
Thu, Jan 9, 9:34 AM (5 h, 5 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2831651
Default Alt Text
D5859.id19308.diff (7 KB)

Event Timeline