D8101.1768425181.diff

diff --git a/services/blob/src/database.rs b/services/blob/src/database.rs
--- a/services/blob/src/database.rs
+++ b/services/blob/src/database.rs
@@ -18,7 +18,7 @@
BLOB_S3_BUCKET_NAME, BLOB_TABLE_BLOB_HASH_FIELD, BLOB_TABLE_CREATED_FIELD,
BLOB_TABLE_NAME, BLOB_TABLE_S3_PATH_FIELD,
},
- s3::S3Path,
+ s3::{S3Path, S3PathError},
};
#[derive(Clone, Debug)]
@@ -321,7 +321,7 @@
#[derive(Debug)]
pub enum BlobDBError {
HolderAlreadyExists(String),
- InvalidS3Path(anyhow::Error),
+ InvalidS3Path(S3PathError),
}
impl Display for BlobDBError {
diff --git a/services/blob/src/s3.rs b/services/blob/src/s3.rs
--- a/services/blob/src/s3.rs
+++ b/services/blob/src/s3.rs
@@ -1,4 +1,3 @@
-use anyhow::{anyhow, ensure, Result};
use aws_sdk_s3::{
model::{CompletedMultipartUpload, CompletedPart},
output::CreateMultipartUploadOutput,
@@ -9,6 +8,7 @@
ops::{Bound, RangeBounds},
sync::Arc,
};
+use tracing::error;
#[derive(
Debug, derive_more::Display, derive_more::From, derive_more::Error,
@@ -49,7 +49,6 @@
}
}
-#[allow(unused)]
type S3Result<T> = Result<T, Error>;
/// A helper structure representing an S3 object path
@@ -62,23 +61,20 @@
impl S3Path {
/// Constructs an [`S3Path`] from given string
/// The path should be in the following format: `[bucket_name]/[object_name]`
- pub fn from_full_path(full_path: &str) -> Result<Self> {
+ pub fn from_full_path(full_path: &str) -> Result<Self, S3PathError> {
if !full_path.contains('/') {
- return Err(anyhow!(
- "S3 path [{}] should contain the '/' separator",
- full_path
- ));
+ return Err(S3PathError::MissingSeparator(full_path.to_string()));
}
let mut split = full_path.split('/');
Ok(S3Path {
bucket_name: split
.next()
- .ok_or(anyhow!("Expected bucket name in path [{}]", full_path))?
+ .ok_or_else(|| S3PathError::MissingBucketName(full_path.to_string()))?
.to_string(),
object_name: split
.next()
- .ok_or(anyhow!("Expected object name in path [{}]", full_path))?
+ .ok_or_else(|| S3PathError::MissingObjectName(full_path.to_string()))?
.to_string(),
})
}
@@ -109,7 +105,7 @@
pub async fn start_upload_session(
&self,
s3_path: &S3Path,
- ) -> Result<MultiPartUploadSession> {
+ ) -> S3Result<MultiPartUploadSession> {
MultiPartUploadSession::start(&self.client, s3_path).await
}
@@ -117,14 +113,18 @@
pub async fn get_object_metadata(
&self,
s3_path: &S3Path,
- ) -> Result<aws_sdk_s3::output::HeadObjectOutput> {
+ ) -> S3Result<aws_sdk_s3::output::HeadObjectOutput> {
let response = self
.client
.head_object()
.bucket(s3_path.bucket_name.clone())
.key(s3_path.object_name.clone())
.send()
- .await?;
+ .await
+ .map_err(|e| {
+ error!("S3 failed to get object metadata");
+ Error::AwsSdk(e.into())
+ })?;
Ok(response)
}
@@ -136,7 +136,7 @@
&self,
s3_path: &S3Path,
range: impl RangeBounds<u64>,
- ) -> Result<Vec<u8>> {
+ ) -> S3Result<Vec<u8>> {
let mut request = self
.client
.get_object()
@@ -160,20 +160,30 @@
request = request.range(range);
}
- let response = request.send().await?;
- let data = response.body.collect().await?;
+ let response = request.send().await.map_err(|e| {
+ error!("S3 failed to get object");
+ Error::AwsSdk(e.into())
+ })?;
+ let data = response.body.collect().await.map_err(|e| {
+ error!("S3 failed to stream object bytes");
+ Error::ByteStream(e.into())
+ })?;
Ok(data.to_vec())
}
/// Deletes object at provided path
- pub async fn delete_object(&self, s3_path: &S3Path) -> Result<()> {
+ pub async fn delete_object(&self, s3_path: &S3Path) -> S3Result<()> {
self
.client
.delete_object()
.bucket(&s3_path.bucket_name)
.key(&s3_path.object_name)
.send()
- .await?;
+ .await
+ .map_err(|e| {
+ error!("S3 failed to delete object");
+ Error::AwsSdk(e.into())
+ })?;
Ok(())
}
@@ -194,17 +204,22 @@
async fn start(
client: &Arc<aws_sdk_s3::Client>,
s3_path: &S3Path,
- ) -> Result<Self> {
+ ) -> S3Result<Self> {
let multipart_upload_res: CreateMultipartUploadOutput = client
.create_multipart_upload()
.bucket(&s3_path.bucket_name)
.key(&s3_path.object_name)
.send()
- .await?;
+ .await
+ .map_err(|e| {
+ error!("S3 failed to start upload session");
+ Error::AwsSdk(e.into())
+ })?;
- let upload_id = multipart_upload_res
- .upload_id()
- .ok_or(anyhow!("Upload ID expected to be present"))?;
+ let upload_id = multipart_upload_res.upload_id().ok_or_else(|| {
+ error!("Upload ID expected to be present");
+ Error::MissingUploadID
+ })?;
Ok(MultiPartUploadSession {
client: client.clone(),
@@ -216,7 +231,7 @@
}
/// adds data part to the multipart upload
- pub async fn add_part(&mut self, part: Vec<u8>) -> Result<()> {
+ pub async fn add_part(&mut self, part: Vec<u8>) -> S3Result<()> {
let stream = ByteStream::from(part);
let part_number: i32 = self.upload_parts.len() as i32 + 1;
let upload_result = self
@@ -228,7 +243,11 @@
.part_number(part_number)
.body(stream)
.send()
- .await?;
+ .await
+ .map_err(|e| {
+ error!("Failed to add upload part");
+ Error::AwsSdk(e.into())
+ })?;
let completed_part = CompletedPart::builder()
.e_tag(upload_result.e_tag.unwrap_or_default())
@@ -239,12 +258,10 @@
}
/// finishes the upload
- pub async fn finish_upload(&self) -> Result<()> {
- // TODO: handle this as S3-specific error
- ensure!(
- !self.upload_parts.is_empty(),
- "There are no parts to upload"
- );
+ pub async fn finish_upload(&self) -> S3Result<()> {
+ if self.upload_parts.is_empty() {
+ return Err(Error::EmptyUpload);
+ }
let completed_multipart_upload = CompletedMultipartUpload::builder()
.set_parts(Some(self.upload_parts.clone()))
@@ -258,7 +275,11 @@
.multipart_upload(completed_multipart_upload)
.upload_id(&self.upload_id)
.send()
- .await?;
+ .await
+ .map_err(|e| {
+ error!("Failed to finish upload session");
+ Error::AwsSdk(e.into())
+ })?;
Ok(())
}
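
Note: `S3PathError`, which `from_full_path` now returns and which `BlobDBError::InvalidS3Path` wraps, is not defined in the hunks captured above. A minimal sketch of what such a type could look like, using plain std trait impls; the variant names and messages are taken from the hunks, everything else is an assumption:

use std::fmt::{self, Display};

// Hypothetical sketch; the real definition lives outside this diff and
// likely reuses the derive_more derives seen on the Error enum in s3.rs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum S3PathError {
  MissingSeparator(String),
  MissingBucketName(String),
  MissingObjectName(String),
}

impl Display for S3PathError {
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    match self {
      Self::MissingSeparator(path) => {
        write!(f, "S3 path [{}] should contain the '/' separator", path)
      }
      Self::MissingBucketName(path) => {
        write!(f, "Expected bucket name in path [{}]", path)
      }
      Self::MissingObjectName(path) => {
        write!(f, "Expected object name in path [{}]", path)
      }
    }
  }
}

impl std::error::Error for S3PathError {}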

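The `Error` enum in s3.rs is likewise only visible here through its derive line; the hunks construct `Error::AwsSdk`, `Error::ByteStream`, `Error::MissingUploadID`, and `Error::EmptyUpload`, so it must carry at least those variants. A sketch under that assumption (the inner types are guesses based on the `e.into()` conversions; the real enum uses the derive_more derives shown in the context above):

// Hypothetical reconstruction of the variants the hunks above rely on.
#[derive(Debug)]
pub enum Error {
  // Failures returned by AWS SDK calls; built via `Error::AwsSdk(e.into())`.
  AwsSdk(aws_sdk_s3::Error),
  // Failure while collecting the response body; the inner type is assumed.
  ByteStream(std::io::Error),
  // `create_multipart_upload` responded without an upload ID.
  MissingUploadID,
  // `finish_upload` was called before any part was added.
  EmptyUpload,
}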
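With `from_full_path` returning a concrete error, the database layer can forward it into `BlobDBError::InvalidS3Path` without going through `anyhow`. A hypothetical call site (the actual caller is not part of this diff):

// Hypothetical helper: `raw_path` would come from the item's
// BLOB_TABLE_S3_PATH_FIELD attribute read out of DynamoDB.
fn parse_s3_path(raw_path: &str) -> Result<S3Path, BlobDBError> {
  S3Path::from_full_path(raw_path).map_err(BlobDBError::InvalidS3Path)
}

Callers can then match on `BlobDBError::InvalidS3Path(S3PathError::MissingSeparator(_))` and similar patterns, which the previous `anyhow::Error` payload did not allow.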