diff --git a/services/blob/src/database.rs b/services/blob/src/database.rs --- a/services/blob/src/database.rs +++ b/services/blob/src/database.rs @@ -18,7 +18,7 @@ BLOB_S3_BUCKET_NAME, BLOB_TABLE_BLOB_HASH_FIELD, BLOB_TABLE_CREATED_FIELD, BLOB_TABLE_NAME, BLOB_TABLE_S3_PATH_FIELD, }, - s3::S3Path, + s3::{S3Path, S3PathError}, }; #[derive(Clone, Debug)] @@ -321,7 +321,7 @@ #[derive(Debug)] pub enum BlobDBError { HolderAlreadyExists(String), - InvalidS3Path(anyhow::Error), + InvalidS3Path(S3PathError), } impl Display for BlobDBError { diff --git a/services/blob/src/s3.rs b/services/blob/src/s3.rs --- a/services/blob/src/s3.rs +++ b/services/blob/src/s3.rs @@ -1,4 +1,3 @@ -use anyhow::{anyhow, ensure, Result}; use aws_sdk_s3::{ model::{CompletedMultipartUpload, CompletedPart}, output::CreateMultipartUploadOutput, @@ -9,6 +8,7 @@ ops::{Bound, RangeBounds}, sync::Arc, }; +use tracing::error; #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, @@ -49,7 +49,6 @@ } } -#[allow(unused)] type S3Result = Result; /// A helper structure representing an S3 object path @@ -62,23 +61,20 @@ impl S3Path { /// Constructs an [`S3Path`] from given string /// The path should be in the following format: `[bucket_name]/[object_name]` - pub fn from_full_path(full_path: &str) -> Result { + pub fn from_full_path(full_path: &str) -> Result { if !full_path.contains('/') { - return Err(anyhow!( - "S3 path [{}] should contain the '/' separator", - full_path - )); + return Err(S3PathError::MissingSeparator(full_path.to_string())); } let mut split = full_path.split('/'); Ok(S3Path { bucket_name: split .next() - .ok_or(anyhow!("Expected bucket name in path [{}]", full_path))? + .ok_or_else(|| S3PathError::MissingBucketName(full_path.to_string()))? .to_string(), object_name: split .next() - .ok_or(anyhow!("Expected object name in path [{}]", full_path))? + .ok_or_else(|| S3PathError::MissingObjectName(full_path.to_string()))? .to_string(), }) } @@ -109,7 +105,7 @@ pub async fn start_upload_session( &self, s3_path: &S3Path, - ) -> Result { + ) -> S3Result { MultiPartUploadSession::start(&self.client, s3_path).await } @@ -117,14 +113,18 @@ pub async fn get_object_metadata( &self, s3_path: &S3Path, - ) -> Result { + ) -> S3Result { let response = self .client .head_object() .bucket(s3_path.bucket_name.clone()) .key(s3_path.object_name.clone()) .send() - .await?; + .await + .map_err(|e| { + error!("S3 failed to get object metadata"); + Error::AwsSdk(e.into()) + })?; Ok(response) } @@ -136,7 +136,7 @@ &self, s3_path: &S3Path, range: impl RangeBounds, - ) -> Result> { + ) -> S3Result> { let mut request = self .client .get_object() @@ -160,20 +160,30 @@ request = request.range(range); } - let response = request.send().await?; - let data = response.body.collect().await?; + let response = request.send().await.map_err(|e| { + error!("S3 failed to get object"); + Error::AwsSdk(e.into()) + })?; + let data = response.body.collect().await.map_err(|e| { + error!("S3 failed to stream object bytes"); + Error::ByteStream(e.into()) + })?; Ok(data.to_vec()) } /// Deletes object at provided path - pub async fn delete_object(&self, s3_path: &S3Path) -> Result<()> { + pub async fn delete_object(&self, s3_path: &S3Path) -> S3Result<()> { self .client .delete_object() .bucket(&s3_path.bucket_name) .key(&s3_path.object_name) .send() - .await?; + .await + .map_err(|e| { + error!("S3 failed to delete object"); + Error::AwsSdk(e.into()) + })?; Ok(()) } @@ -194,17 +204,22 @@ async fn start( client: &Arc, s3_path: &S3Path, - ) -> Result { + ) -> S3Result { let multipart_upload_res: CreateMultipartUploadOutput = client .create_multipart_upload() .bucket(&s3_path.bucket_name) .key(&s3_path.object_name) .send() - .await?; + .await + .map_err(|e| { + error!("S3 failed to start upload session"); + Error::AwsSdk(e.into()) + })?; - let upload_id = multipart_upload_res - .upload_id() - .ok_or(anyhow!("Upload ID expected to be present"))?; + let upload_id = multipart_upload_res.upload_id().ok_or_else(|| { + error!("Upload ID expected to be present"); + Error::MissingUploadID + })?; Ok(MultiPartUploadSession { client: client.clone(), @@ -216,7 +231,7 @@ } /// adds data part to the multipart upload - pub async fn add_part(&mut self, part: Vec) -> Result<()> { + pub async fn add_part(&mut self, part: Vec) -> S3Result<()> { let stream = ByteStream::from(part); let part_number: i32 = self.upload_parts.len() as i32 + 1; let upload_result = self @@ -228,7 +243,11 @@ .part_number(part_number) .body(stream) .send() - .await?; + .await + .map_err(|e| { + error!("Failed to add upload part"); + Error::AwsSdk(e.into()) + })?; let completed_part = CompletedPart::builder() .e_tag(upload_result.e_tag.unwrap_or_default()) @@ -239,12 +258,10 @@ } /// finishes the upload - pub async fn finish_upload(&self) -> Result<()> { - // TODO: handle this as S3-specific error - ensure!( - !self.upload_parts.is_empty(), - "There are no parts to upload" - ); + pub async fn finish_upload(&self) -> S3Result<()> { + if self.upload_parts.is_empty() { + return Err(Error::EmptyUpload); + } let completed_multipart_upload = CompletedMultipartUpload::builder() .set_parts(Some(self.upload_parts.clone())) @@ -258,7 +275,11 @@ .multipart_upload(completed_multipart_upload) .upload_id(&self.upload_id) .send() - .await?; + .await + .map_err(|e| { + error!("Failed to finish upload session"); + Error::AwsSdk(e.into()) + })?; Ok(()) }