diff --git a/services/blob/src/database/client.rs b/services/blob/src/database/client.rs
index 294695c9b..aa074abd0 100644
--- a/services/blob/src/database/client.rs
+++ b/services/blob/src/database/client.rs
@@ -1,417 +1,417 @@
 use aws_sdk_dynamodb::{
   operation::put_item::PutItemOutput,
   types::{
     AttributeValue, Delete, DeleteRequest, PutRequest, TransactWriteItem,
     Update, WriteRequest,
   },
   Error as DynamoDBError,
 };
 use chrono::Utc;
 use comm_lib::database::{
   self, batch_operations::ExponentialBackoffConfig, TryFromAttribute,
 };
 use std::collections::HashMap;
 use tracing::{debug, error, trace};
 
 use crate::constants::db::*;
 
 use super::errors::{BlobDBError, Error as DBError};
 use super::types::*;
 
 #[derive(Clone)]
 pub struct DatabaseClient {
   ddb: aws_sdk_dynamodb::Client,
 }
 
 /// public interface implementation
 impl DatabaseClient {
   pub fn new(aws_config: &aws_config::SdkConfig) -> Self {
     DatabaseClient {
       ddb: aws_sdk_dynamodb::Client::new(aws_config),
     }
   }
 
   /// Gets a blob item row from the database by its blob hash.
   /// Returns None if the blob item is not found.
   pub async fn get_blob_item(
     &self,
     blob_hash: impl Into<String>,
   ) -> DBResult<Option<BlobItemRow>> {
     let key = PrimaryKey::for_blob_item(blob_hash);
     self
       .get_raw_item(key.clone())
       .await?
       .map(BlobItemRow::try_from)
       .transpose()
   }
 
   /// Inserts a new blob item row into the database. Returns Error
   /// if the item already exists.
   pub async fn put_blob_item(&self, blob_item: BlobItemInput) -> DBResult<()> {
     let item = HashMap::from([
       (
         ATTR_BLOB_HASH.to_string(),
         AttributeValue::S(blob_item.blob_hash),
       ),
       (
         ATTR_HOLDER.to_string(),
         AttributeValue::S(BLOB_ITEM_ROW_HOLDER_VALUE.into()),
       ),
       (
         ATTR_S3_PATH.to_string(),
         AttributeValue::S(blob_item.s3_path.to_full_path()),
       ),
       (ATTR_UNCHECKED.to_string(), UncheckedKind::Blob.into()),
     ]);
 
     self.insert_item(item).await?;
     Ok(())
   }
 
   /// Deletes blob item row. Doesn't delete its holders.
   pub async fn delete_blob_item(
     &self,
     blob_hash: impl Into<String>,
   ) -> DBResult<()> {
     let key = PrimaryKey::for_blob_item(blob_hash);
     self
       .ddb
       .delete_item()
       .table_name(BLOB_TABLE_NAME)
       .set_key(Some(key.into()))
       .send()
       .await
       .map_err(|err| {
         debug!("DynamoDB client failed to delete blob item: {:?}", err);
-        DBError::AwsSdk(err.into())
+        DBError::AwsSdk(Box::new(err.into()))
       })?;
     Ok(())
   }
 
   // Inserts a new holder assignment row into the database. Returns Error
   // if the item already exists or the holder format is invalid.
   pub async fn put_holder_assignment(
     &self,
     blob_hash: impl Into<String>,
     holder: impl Into<String>,
   ) -> DBResult<()> {
     let blob_hash: String = blob_hash.into();
     let holder: String = holder.into();
     validate_holder(&holder)?;
 
     let item = HashMap::from([
       (ATTR_BLOB_HASH.to_string(), AttributeValue::S(blob_hash)),
       (ATTR_HOLDER.to_string(), AttributeValue::S(holder)),
       (ATTR_UNCHECKED.to_string(), UncheckedKind::Holder.into()),
     ]);
 
     self.insert_item(item).await?;
     Ok(())
   }
   /// Deletes a holder assignment row from the table.
   /// If the blob item for the given holder assignment exists, it will be
   /// marked as unchecked.
   ///
   /// Returns Error if the holder format is invalid or a race condition
   /// occurred. Doesn't fail if the holder assignment didn't exist before.
   pub async fn delete_holder_assignment(
     &self,
     blob_hash: impl Into<String>,
     holder: impl Into<String>,
   ) -> DBResult<()> {
     let blob_hash: String = blob_hash.into();
     let holder: String = holder.into();
     validate_holder(&holder)?;
     let mut transaction = Vec::new();
 
     // delete the holder row
     let assignment_key = PrimaryKey {
       blob_hash: blob_hash.clone(),
       holder,
     };
     let delete_request = Delete::builder()
       .table_name(BLOB_TABLE_NAME)
       .set_key(Some(assignment_key.into()))
       .build();
     transaction
       .push(TransactWriteItem::builder().delete(delete_request).build());
 
     // mark the blob item as unchecked if it exists
     let blob_primary_key = PrimaryKey::for_blob_item(blob_hash);
     if self.get_raw_item(blob_primary_key.clone()).await?.is_some() {
       let update_request = Update::builder()
         .table_name(BLOB_TABLE_NAME)
         .set_key(Some(blob_primary_key.into()))
         // even though we checked that the blob item exists, we still need
         // to check it again using DDB built-in conditions in case it was
         // deleted in the meantime
         .condition_expression(
           "attribute_exists(#blob_hash) AND attribute_exists(#holder)",
         )
         .update_expression("SET #unchecked = :unchecked, #last_modified = :now")
         .expression_attribute_names("#blob_hash", ATTR_BLOB_HASH)
         .expression_attribute_names("#holder", ATTR_HOLDER)
         .expression_attribute_names("#unchecked", ATTR_UNCHECKED)
         .expression_attribute_names("#last_modified", ATTR_LAST_MODIFIED)
         .expression_attribute_values(":unchecked", UncheckedKind::Blob.into())
         .expression_attribute_values(
           ":now",
           AttributeValue::N(Utc::now().timestamp_millis().to_string()),
         )
         .build();
       transaction
         .push(TransactWriteItem::builder().update(update_request).build());
     }
 
     self
       .ddb
       .transact_write_items()
       .set_transact_items(Some(transaction))
       .send()
       .await
       .map_err(|err| {
         debug!("DynamoDB client failed to run transaction: {:?}", err);
-        DBError::AwsSdk(err.into())
+        DBError::AwsSdk(Box::new(err.into()))
       })?;
     Ok(())
   }
   /// Queries the table for a list of holders for the given blob hash.
   /// Optionally limits the number of results.
   pub async fn list_blob_holders(
     &self,
     blob_hash: impl Into<String>,
     limit: Option<i32>,
   ) -> DBResult<Vec<String>> {
     let response = self
       .ddb
       .query()
       .table_name(BLOB_TABLE_NAME)
       .projection_expression("#holder")
       .key_condition_expression("#blob_hash = :blob_hash")
       .expression_attribute_names("#blob_hash", ATTR_BLOB_HASH)
       .expression_attribute_names("#holder", ATTR_HOLDER)
       .expression_attribute_values(
         ":blob_hash",
         AttributeValue::S(blob_hash.into()),
       )
       .consistent_read(true)
       // we need to increase the limit by 1 because the blob item itself
       // can be fetched too; it is filtered out later
       .set_limit(limit.map(|it| it + 1))
       .send()
       .await
       .map_err(|err| {
         error!("DynamoDB client failed to query holders: {:?}", err);
-        DBError::AwsSdk(err.into())
+        DBError::AwsSdk(Box::new(err.into()))
       })?;
 
     let Some(items) = response.items else {
       return Ok(vec![]);
     };
     items
       .into_iter()
       .filter_map(|mut row| {
         // filter out rows that are blob items
         // we cannot do it in the key condition expression - it doesn't
         // support the <> operator. A filter expression doesn't work
         // either - it doesn't support filtering by sort key
         match String::try_from_attr(ATTR_HOLDER, row.remove(ATTR_HOLDER)) {
           Ok(value) if value.as_str() == BLOB_ITEM_ROW_HOLDER_VALUE => None,
           holder => Some(holder),
         }
       })
       .collect::<Result<Vec<_>, _>>()
       .map_err(DBError::Attribute)
   }
 
   /// Returns a list of primary keys for rows that already exist in the table
   pub async fn list_existing_keys(
     &self,
     keys: impl IntoIterator<Item = PrimaryKey>,
   ) -> DBResult<Vec<PrimaryKey>> {
     database::batch_operations::batch_get(
       &self.ddb,
       BLOB_TABLE_NAME,
       keys,
       Some(format!("{}, {}", ATTR_BLOB_HASH, ATTR_HOLDER)),
       ExponentialBackoffConfig::default(),
     )
     .await?
     .into_iter()
     .map(PrimaryKey::try_from)
     .collect::<Result<Vec<_>, _>>()
   }
 
   /// Returns a list of primary keys for "unchecked" items (blob / holder)
   /// that were last modified at least `min_age` ago.
   /// We need to specify if we want to get blob or holder items.
   pub async fn find_unchecked_items(
     &self,
     kind: UncheckedKind,
     min_age: chrono::Duration,
   ) -> DBResult<Vec<PrimaryKey>> {
     let created_until = Utc::now() - min_age;
     let timestamp = created_until.timestamp_millis();
 
     let response = self
       .ddb
       .query()
       .table_name(BLOB_TABLE_NAME)
       .index_name(UNCHECKED_INDEX_NAME)
       .key_condition_expression(
         "#unchecked = :kind AND #last_modified < :timestamp",
       )
       .expression_attribute_names("#unchecked", ATTR_UNCHECKED)
       .expression_attribute_names("#last_modified", ATTR_LAST_MODIFIED)
       .expression_attribute_values(":kind", kind.into())
       .expression_attribute_values(
         ":timestamp",
         AttributeValue::N(timestamp.to_string()),
       )
       .send()
       .await
       .map_err(|err| {
         error!("DynamoDB client failed to query unchecked items: {:?}", err);
-        DBError::AwsSdk(err.into())
+        DBError::AwsSdk(Box::new(err.into()))
       })?;
 
     let Some(items) = response.items else {
       return Ok(vec![]);
     };
     items
       .into_iter()
       .map(PrimaryKey::try_from)
       .collect::<Result<Vec<_>, _>>()
   }
   /// For all rows in the specified set of primary keys, removes
   /// the "unchecked" attribute using a PutItem operation in batch.
   pub async fn batch_mark_checked(
     &self,
     keys: impl IntoIterator<Item = PrimaryKey>,
   ) -> DBResult<()> {
     let items_to_mark = database::batch_operations::batch_get(
       &self.ddb,
       BLOB_TABLE_NAME,
       keys,
       None,
       ExponentialBackoffConfig::default(),
     )
     .await?;
 
     let write_requests = items_to_mark
       .into_iter()
       .filter_map(|mut row| {
         // filter out rows that are already checked
         // to save some write capacity
         row.remove(ATTR_UNCHECKED)?;
         let put_request = PutRequest::builder().set_item(Some(row)).build();
         let request = WriteRequest::builder().put_request(put_request).build();
         Some(request)
       })
       .collect();
 
     database::batch_operations::batch_write(
       &self.ddb,
       BLOB_TABLE_NAME,
       write_requests,
       ExponentialBackoffConfig::default(),
     )
     .await?;
     Ok(())
   }
 
   /// Performs multiple DeleteItem operations in batch
   pub async fn batch_delete_rows(
     &self,
     keys: impl IntoIterator<Item = PrimaryKey>,
   ) -> DBResult<()> {
     let write_requests = keys
       .into_iter()
       .map(|key| DeleteRequest::builder().set_key(Some(key.into())).build())
       .map(|request| WriteRequest::builder().delete_request(request).build())
       .collect::<Vec<_>>();
 
     database::batch_operations::batch_write(
       &self.ddb,
       BLOB_TABLE_NAME,
       write_requests,
       ExponentialBackoffConfig::default(),
     )
     .await?;
     Ok(())
   }
 }
 
 // private helpers
 impl DatabaseClient {
   /// inserts a new item into the table using PutItem. Returns
   /// error if the item already exists
   async fn insert_item(
     &self,
     mut item: RawAttributes,
   ) -> DBResult<PutItemOutput> {
     // add metadata attributes common for all types of rows
     let now = Utc::now().timestamp_millis();
     item.insert(
       ATTR_CREATED_AT.to_string(),
       AttributeValue::N(now.to_string()),
     );
     item.insert(
       ATTR_LAST_MODIFIED.to_string(),
       AttributeValue::N(now.to_string()),
     );
 
     self
       .ddb
       .put_item()
       .table_name(BLOB_TABLE_NAME)
       .set_item(Some(item))
       // make sure we don't accidentally overwrite an existing row
       .condition_expression(
         "attribute_not_exists(#blob_hash) AND attribute_not_exists(#holder)",
       )
       .expression_attribute_names("#blob_hash", ATTR_BLOB_HASH)
       .expression_attribute_names("#holder", ATTR_HOLDER)
       .send()
       .await
       .map_err(|err| match DynamoDBError::from(err) {
         DynamoDBError::ConditionalCheckFailedException(e) => {
           debug!("DynamoDB client failed to insert: item already exists");
           trace!("Conditional check failed with error: {}", e);
           DBError::ItemAlreadyExists
         }
         err => {
           debug!("DynamoDB client failed to insert: {:?}", err);
-          DBError::AwsSdk(err)
+          DBError::AwsSdk(Box::new(err))
         }
       })
   }
 
   /// Gets a single row from the table using GetItem, without parsing it
   async fn get_raw_item(
     &self,
     key: PrimaryKey,
   ) -> DBResult<Option<RawAttributes>> {
     self
       .ddb
       .get_item()
       .table_name(BLOB_TABLE_NAME)
       .set_key(Some(key.into()))
       .send()
       .await
       .map_err(|err| {
         debug!("DynamoDB client failed to get item: {:?}", err);
-        DBError::AwsSdk(err.into())
+        DBError::AwsSdk(Box::new(err.into()))
       })
       .map(|response| response.item)
   }
 }
 
 fn validate_holder(holder: &str) -> DBResult<()> {
   if holder == BLOB_ITEM_ROW_HOLDER_VALUE {
     debug!("Invalid holder: {}", holder);
     return Err(DBError::Blob(BlobDBError::InvalidInput(holder.to_string())));
   }
   Ok(())
 }
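The changes in `client.rs` are mechanical: every `DBError::AwsSdk(err.into())` becomes `DBError::AwsSdk(Box::new(err.into()))`. A self-contained sketch of the motivation, using an illustrative payload type as a stand-in for the real `aws_sdk_dynamodb::Error` (the sizes are assumptions, not measurements of the SDK type):

```rust
// Stand-in for a large SDK error type; 128 bytes is illustrative only.
enum Unboxed {
  AwsSdk([u8; 128]),
  ItemAlreadyExists,
}

enum Boxed {
  AwsSdk(Box<[u8; 128]>), // the large payload now lives on the heap
  ItemAlreadyExists,
}

fn main() {
  // Every Result<T, Unboxed> must reserve space for the largest variant,
  // even on the happy path; boxing shrinks that to pointer size.
  assert!(std::mem::size_of::<Unboxed>() > std::mem::size_of::<Boxed>());
  println!(
    "unboxed: {} bytes, boxed: {} bytes",
    std::mem::size_of::<Unboxed>(),
    std::mem::size_of::<Boxed>()
  );
}
```

Thanks to the non-null niche of `Box`, the boxed variant typically collapses the whole enum to a single word, which is what clippy's `result_large_err` lint nudges toward.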
diff --git a/services/blob/src/database/errors.rs b/services/blob/src/database/errors.rs
index 0ba2b6bbd..1c9fe9f90 100644
--- a/services/blob/src/database/errors.rs
+++ b/services/blob/src/database/errors.rs
@@ -1,62 +1,62 @@
 use std::fmt::{Display, Formatter};
 
 use aws_sdk_dynamodb::Error as DynamoDBError;
 use comm_lib::database::DBItemError;
 
 use crate::s3::S3PathError;
 
 #[derive(
   Debug, derive_more::Display, derive_more::From, derive_more::Error,
 )]
 pub enum Error {
   #[display(...)]
-  AwsSdk(DynamoDBError),
+  AwsSdk(Box<DynamoDBError>),
   #[display(...)]
   Attribute(DBItemError),
   #[display(...)]
   #[from(ignore)]
   Blob(BlobDBError),
   #[display(...)]
   ItemAlreadyExists,
   MaxRetriesExceeded,
 }
 
 impl From<comm_lib::database::Error> for Error {
   fn from(value: comm_lib::database::Error) -> Self {
     use comm_lib::database::Error as E;
     match value {
-      E::AwsSdk(err) => Self::AwsSdk(err),
+      E::AwsSdk(err) => Self::AwsSdk(Box::new(err)),
       E::Attribute(err) => Self::Attribute(err),
       E::MaxRetriesExceeded => Self::MaxRetriesExceeded,
     }
   }
 }
 
 #[derive(Debug)]
 pub enum BlobDBError {
   HolderAlreadyExists(String),
   InvalidS3Path(S3PathError),
   InvalidInput(String),
 }
 
 impl Display for BlobDBError {
   fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
     match self {
       BlobDBError::HolderAlreadyExists(holder) => {
         write!(f, "Item for given holder [{}] already exists", holder)
       }
       BlobDBError::InvalidS3Path(err) => err.fmt(f),
       BlobDBError::InvalidInput(value) => {
         write!(f, "Invalid input value [{}]", value)
       }
     }
   }
 }
 
 impl std::error::Error for BlobDBError {}
 
 impl From<S3PathError> for Error {
   fn from(err: S3PathError) -> Self {
     Error::Blob(BlobDBError::InvalidS3Path(err))
   }
 }
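Because the boxing happens inside the `From` conversion, code that bubbles `comm_lib` errors with `?` needs no changes. A self-contained sketch with stand-in types (`LibError` and `BigSdkError` are illustrative, not the real `comm_lib` or SDK types):

```rust
// Stand-ins: `BigSdkError` plays DynamoDBError, `LibError` plays
// comm_lib::database::Error, `Error` plays the service's own error enum.
struct BigSdkError(#[allow(dead_code)] [u8; 128]);

enum LibError {
  AwsSdk(BigSdkError),
}

enum Error {
  AwsSdk(#[allow(dead_code)] Box<BigSdkError>), // boxed, as in this diff
}

impl From<LibError> for Error {
  fn from(value: LibError) -> Self {
    match value {
      // the only place that changes: box at the conversion boundary
      LibError::AwsSdk(err) => Self::AwsSdk(Box::new(err)),
    }
  }
}

fn fetch() -> Result<(), LibError> {
  Ok(())
}

fn caller() -> Result<(), Error> {
  fetch()?; // `?` applies the From impl; the call site stays unchanged
  Ok(())
}

fn main() {
  assert!(caller().is_ok());
}
```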
diff --git a/services/blob/src/http/errors.rs b/services/blob/src/http/errors.rs
index ae174f9db..d00164896 100644
--- a/services/blob/src/http/errors.rs
+++ b/services/blob/src/http/errors.rs
@@ -1,77 +1,86 @@
 use actix_web::error::{
   ErrorBadRequest, ErrorConflict, ErrorInternalServerError, ErrorNotFound,
   ErrorServiceUnavailable,
 };
 use actix_web::{Error as HttpError, HttpResponse, ResponseError};
 use aws_sdk_dynamodb::Error as DynamoDBError;
 use http::StatusCode;
 use tracing::{debug, error, trace, warn};
 
 use crate::database::errors::{BlobDBError, Error as DBError};
 use crate::s3::Error as S3Error;
 use crate::service::BlobServiceError;
 
 pub(super) fn handle_blob_service_error(err: &BlobServiceError) -> HttpError {
   trace!("Handling blob service error: {:?}", err);
   match err {
     BlobServiceError::BlobNotFound => ErrorNotFound("not found"),
     BlobServiceError::BlobAlreadyExists
     | BlobServiceError::DB(DBError::ItemAlreadyExists) => {
       ErrorConflict("blob already exists")
     }
     BlobServiceError::DB(db_err) => match db_err {
-      DBError::AwsSdk(DynamoDBError::InternalServerError(_))
-      | DBError::AwsSdk(
-        DynamoDBError::ProvisionedThroughputExceededException(_),
-      )
-      | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => {
-        warn!("AWS transient error occurred");
-        ErrorServiceUnavailable("please retry")
-      }
+      DBError::AwsSdk(aws_err) => match aws_err.as_ref() {
+        DynamoDBError::InternalServerError(_)
+        | DynamoDBError::ProvisionedThroughputExceededException(_)
+        | DynamoDBError::RequestLimitExceeded(_) => {
+          warn!("AWS transient error occurred");
+          ErrorServiceUnavailable("please retry")
+        }
+        unexpected => {
+          error!("Received an unexpected AWS error: {0:?} - {0}", unexpected);
+          ErrorInternalServerError("server error")
+        }
+      },
       DBError::Blob(BlobDBError::InvalidInput(_)) => {
         ErrorBadRequest("bad request")
       }
       unexpected => {
         error!("Received an unexpected DB error: {0:?} - {0}", unexpected);
         ErrorInternalServerError("server error")
       }
     },
     BlobServiceError::S3(s3_err) => match s3_err {
-      S3Error::AwsSdk(aws_sdk_s3::Error::NotFound(_))
-      | S3Error::AwsSdk(aws_sdk_s3::Error::NoSuchKey(_)) => {
-        error!("Data inconsistency! Blob is present in database but not present in S3!");
-        ErrorInternalServerError("server error")
-      }
+      S3Error::AwsSdk(aws_err) => match aws_err.as_ref() {
+        aws_sdk_s3::Error::NotFound(_) | aws_sdk_s3::Error::NoSuchKey(_) => {
+          error!("Data inconsistency! Blob is present in database but not present in S3!");
+          ErrorInternalServerError("server error")
+        }
+        err => {
+          error!("Received an unexpected AWS S3 error: {0:?} - {0}", err);
+          ErrorInternalServerError("server error")
+        }
+      },
       S3Error::EmptyUpload => ErrorBadRequest("empty upload"),
       unexpected => {
         error!("Received an unexpected S3 error: {0:?} - {0}", unexpected);
         ErrorInternalServerError("server error")
       }
     },
     BlobServiceError::InputError(err) => {
       debug!("Received request input error: {0:?} - {0}", err);
       ErrorBadRequest("bad request")
     }
     BlobServiceError::InviteLinkError(invite_link_error) => {
       debug!("Received invite link error: {0}", invite_link_error);
       ErrorBadRequest("bad request")
     }
     err => {
       error!("Received an unexpected error: {0:?} - {0}", err);
       ErrorInternalServerError("server error")
     }
   }
 }
 
 /// This allows us to `await?` blob service calls in HTTP handlers
 impl ResponseError for BlobServiceError {
   fn error_response(&self) -> HttpResponse {
     handle_blob_service_error(self).error_response()
   }
 
   fn status_code(&self) -> StatusCode {
     handle_blob_service_error(self)
       .as_response_error()
       .status_code()
   }
 }
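Stable Rust has no box patterns, which is why the handler above can no longer match `DBError::AwsSdk(DynamoDBError::…)` directly and instead dereferences the box with `as_ref()` in a nested match. A minimal sketch of that pattern with stand-in enums:

```rust
// Stand-ins: `SdkError` plays DynamoDBError, `DbError` plays DBError.
enum SdkError {
  InternalServerError,
  RequestLimitExceeded,
  Other,
}

enum DbError {
  AwsSdk(Box<SdkError>),
}

fn is_transient(err: &DbError) -> bool {
  match err {
    // `aws_err` is a &Box<SdkError>; `as_ref()` yields &SdkError,
    // so the inner match sees the plain enum again
    DbError::AwsSdk(aws_err) => match aws_err.as_ref() {
      SdkError::InternalServerError | SdkError::RequestLimitExceeded => true,
      SdkError::Other => false,
    },
  }
}

fn main() {
  let err = DbError::AwsSdk(Box::new(SdkError::InternalServerError));
  assert!(is_transient(&err));
}
```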
diff --git a/services/blob/src/s3.rs b/services/blob/src/s3.rs
index d822f0d0a..48bb4d518 100644
--- a/services/blob/src/s3.rs
+++ b/services/blob/src/s3.rs
@@ -1,362 +1,362 @@
 use aws_sdk_s3::{
   operation::create_multipart_upload::CreateMultipartUploadOutput,
   primitives::ByteStream,
   types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier},
   Error as S3Error,
 };
 use std::ops::{Bound, RangeBounds};
 use tracing::{debug, error, trace};
 
 #[derive(
   Debug, derive_more::Display, derive_more::From, derive_more::Error,
 )]
 pub enum Error {
   #[display(...)]
-  AwsSdk(S3Error),
+  AwsSdk(Box<S3Error>),
   #[display(...)]
   ByteStream(std::io::Error),
   #[display(...)]
   InvalidPath(S3PathError),
   #[display(fmt = "There are no parts to upload")]
   EmptyUpload,
   #[display(fmt = "Missing upload ID")]
   MissingUploadID,
 }
 
 #[derive(Debug, derive_more::Error)]
 pub enum S3PathError {
   MissingSeparator(#[error(ignore)] String),
   MissingBucketName(#[error(ignore)] String),
   MissingObjectName(#[error(ignore)] String),
 }
 
 impl std::fmt::Display for S3PathError {
   fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     match self {
       S3PathError::MissingSeparator(path) => {
         write!(f, "S3 path: [{}] should contain the '/' separator", path)
       }
       S3PathError::MissingBucketName(path) => {
         write!(f, "Expected bucket name in S3 path: [{}]", path)
       }
       S3PathError::MissingObjectName(path) => {
         write!(f, "Expected object name in S3 path: [{}]", path)
       }
     }
   }
 }
 
 type S3Result<T> = Result<T, Error>;
 
 /// A helper structure representing an S3 object path
 #[derive(Clone, Debug)]
 pub struct S3Path {
   pub bucket_name: String,
   pub object_name: String,
 }
 
 impl S3Path {
   /// Constructs an [`S3Path`] from the given string.
   /// The path should be in the following format: `[bucket_name]/[object_name]`
   pub fn from_full_path(full_path: &str) -> Result<Self, S3PathError> {
     if !full_path.contains('/') {
       return Err(S3PathError::MissingSeparator(full_path.to_string()));
     }
 
     let mut split = full_path.split('/');
     Ok(S3Path {
       bucket_name: split
         .next()
         .ok_or_else(|| S3PathError::MissingBucketName(full_path.to_string()))?
         .to_string(),
       object_name: split
         .next()
         .ok_or_else(|| S3PathError::MissingObjectName(full_path.to_string()))?
         .to_string(),
     })
   }
 
   /// Retrieves the full S3 path string in the following format:
   /// `[bucket_name]/[object_name]`
   pub fn to_full_path(&self) -> String {
     format!("{}/{}", self.bucket_name, self.object_name)
   }
 }
 
 impl From<&S3Path> for String {
   fn from(s3_path: &S3Path) -> Self {
     s3_path.to_full_path()
   }
 }
 
 impl TryFrom<&str> for S3Path {
   type Error = S3PathError;
   fn try_from(full_path: &str) -> Result<Self, Self::Error> {
     Self::from_full_path(full_path)
   }
 }
 
 #[derive(Clone)]
 pub struct S3Client {
   client: aws_sdk_s3::Client,
 }
 
 impl S3Client {
   pub fn new(aws_config: &aws_config::SdkConfig) -> Self {
     let s3_config = aws_sdk_s3::config::Builder::from(aws_config)
       // localstack doesn't support virtual addressing
       .force_path_style(crate::config::CONFIG.localstack_endpoint.is_some())
       .build();
     S3Client {
       client: aws_sdk_s3::Client::from_conf(s3_config),
     }
   }
 
   /// Creates a new [`MultiPartUploadSession`]
   pub async fn start_upload_session(
     &self,
     s3_path: &S3Path,
   ) -> S3Result<MultiPartUploadSession> {
     MultiPartUploadSession::start(&self.client, s3_path).await
   }
 
   /// Returns object metadata (e.g. file size) without downloading
   /// the object itself
   pub async fn get_object_metadata(
     &self,
     s3_path: &S3Path,
   ) -> S3Result<aws_sdk_s3::operation::head_object::HeadObjectOutput> {
     let response = self
       .client
       .head_object()
       .bucket(s3_path.bucket_name.clone())
       .key(s3_path.object_name.clone())
       .send()
       .await
       .map_err(|e| {
         error!("S3 failed to get object metadata");
-        Error::AwsSdk(e.into())
+        Error::AwsSdk(Box::new(e.into()))
       })?;
 
     Ok(response)
   }
 
   /// Downloads object and retrieves data bytes within the provided range
   ///
   /// * `range` - Range of object bytes to download.
   pub async fn get_object_bytes(
     &self,
     s3_path: &S3Path,
     range: impl RangeBounds<u64>,
   ) -> S3Result<Vec<u8>> {
     let mut request = self
       .client
       .get_object()
       .bucket(&s3_path.bucket_name)
       .key(&s3_path.object_name);
 
     if range.start_bound() != Bound::Unbounded
       || range.end_bound() != Bound::Unbounded
     {
       // Create a valid HTTP Range header
       let from = match range.start_bound() {
         Bound::Included(start) => start.to_string(),
         _ => "0".to_string(),
       };
       let to = match range.end_bound() {
         Bound::Included(end) => end.to_string(),
         Bound::Excluded(end) => (end - 1).to_string(),
         _ => "".to_string(),
       };
       let range = format!("bytes={}-{}", from, to);
       request = request.range(range);
     }
 
     let response = request.send().await.map_err(|e| {
       error!("S3 failed to get object");
-      Error::AwsSdk(e.into())
+      Error::AwsSdk(Box::new(e.into()))
     })?;
     let data = response.body.collect().await.map_err(|e| {
       error!("S3 failed to stream object bytes");
       Error::ByteStream(e.into())
     })?;
     Ok(data.to_vec())
   }
 
   /// Deletes object at the provided path
   pub async fn delete_object(&self, s3_path: &S3Path) -> S3Result<()> {
     self
       .client
       .delete_object()
       .bucket(&s3_path.bucket_name)
       .key(&s3_path.object_name)
       .send()
       .await
       .map_err(|e| {
         error!("S3 failed to delete object");
-        Error::AwsSdk(e.into())
+        Error::AwsSdk(Box::new(e.into()))
       })?;
     Ok(())
   }
 
   pub async fn batch_delete_objects(&self, paths: Vec<S3Path>) -> S3Result<()> {
     let Some(first_path) = paths.first() else {
       debug!("No S3 objects to delete");
       return Ok(());
     };
 
     let bucket_name = &first_path.bucket_name;
     let objects = paths
       .iter()
       .map(|path| ObjectIdentifier::builder().key(&path.object_name).build())
       .collect();
     self
       .client
       .delete_objects()
       .bucket(bucket_name)
       .delete(Delete::builder().set_objects(Some(objects)).build())
       .send()
       .await
       .map_err(|e| {
         error!("S3 failed to batch delete objects");
-        Error::AwsSdk(e.into())
+        Error::AwsSdk(Box::new(e.into()))
       })?;
 
     Ok(())
   }
 }
 /// Represents a multipart upload session to the AWS S3
 pub struct MultiPartUploadSession {
   client: aws_sdk_s3::Client,
   bucket_name: String,
   object_name: String,
   upload_id: String,
   upload_parts: Vec<CompletedPart>,
 }
 
 impl MultiPartUploadSession {
   /// Starts a new upload session and returns its instance.
   /// Don't call this directly, use [`S3Client::start_upload_session()`] instead
   async fn start(
     client: &aws_sdk_s3::Client,
     s3_path: &S3Path,
   ) -> S3Result<Self> {
     let multipart_upload_res: CreateMultipartUploadOutput = client
       .create_multipart_upload()
       .bucket(&s3_path.bucket_name)
       .key(&s3_path.object_name)
       .send()
       .await
       .map_err(|e| {
         error!("S3 failed to start upload session");
-        Error::AwsSdk(e.into())
+        Error::AwsSdk(Box::new(e.into()))
       })?;
 
     let upload_id = multipart_upload_res.upload_id().ok_or_else(|| {
       error!("Upload ID expected to be present");
       Error::MissingUploadID
     })?;
     debug!("Started multipart upload session with ID: {}", upload_id);
 
     Ok(MultiPartUploadSession {
       client: client.clone(),
       bucket_name: String::from(&s3_path.bucket_name),
       object_name: String::from(&s3_path.object_name),
       upload_id: String::from(upload_id),
       upload_parts: Vec::new(),
     })
   }
 
   /// adds a data part to the multipart upload
   pub async fn add_part(&mut self, part: Vec<u8>) -> S3Result<()> {
     let stream = ByteStream::from(part);
     let part_number: i32 = self.upload_parts.len() as i32 + 1;
     let upload_result = self
       .client
       .upload_part()
       .key(&self.object_name)
       .bucket(&self.bucket_name)
       .upload_id(&self.upload_id)
       .part_number(part_number)
       .body(stream)
       .send()
       .await
       .map_err(|e| {
         error!("Failed to add upload part");
-        Error::AwsSdk(e.into())
+        Error::AwsSdk(Box::new(e.into()))
       })?;
 
     let completed_part = CompletedPart::builder()
       .e_tag(upload_result.e_tag.unwrap_or_default())
       .part_number(part_number)
       .build();
     trace!(
       upload_id = self.upload_id,
       e_tag = completed_part.e_tag.as_deref().unwrap_or("N/A"),
       "Uploaded part {}.",
       part_number
     );
     self.upload_parts.push(completed_part);
     Ok(())
   }
 
   /// finishes the upload
   pub async fn finish_upload(&self) -> S3Result<()> {
     if self.upload_parts.is_empty() {
       return Err(Error::EmptyUpload);
     }
 
     let completed_multipart_upload = CompletedMultipartUpload::builder()
       .set_parts(Some(self.upload_parts.clone()))
       .build();
 
     self
       .client
       .complete_multipart_upload()
       .bucket(&self.bucket_name)
       .key(&self.object_name)
       .multipart_upload(completed_multipart_upload)
       .upload_id(&self.upload_id)
       .send()
       .await
       .map_err(|e| {
         error!("Failed to finish upload session");
-        Error::AwsSdk(e.into())
+        Error::AwsSdk(Box::new(e.into()))
       })?;
     debug!(upload_id = self.upload_id, "Multipart upload complete");
     Ok(())
   }
 }
 
 #[cfg(test)]
 mod tests {
   use super::*;
 
   #[test]
   fn test_s3path_from_full_path() {
     let full_path = "my_bucket/some_object";
     let s3_path = S3Path::from_full_path(full_path);
     assert!(s3_path.is_ok());
     let s3_path = s3_path.unwrap();
     assert_eq!(&s3_path.bucket_name, "my_bucket");
     assert_eq!(&s3_path.object_name, "some_object");
   }
 
   #[test]
   fn test_s3path_from_invalid_path() {
     let result = S3Path::from_full_path("invalid_path");
     assert!(result.is_err())
   }
 
   #[test]
   fn test_s3path_to_full_path() {
     let s3_path = S3Path {
       bucket_name: "my_bucket".to_string(),
       object_name: "some_object".to_string(),
     };
     let full_path = s3_path.to_full_path();
     assert_eq!(full_path, "my_bucket/some_object");
   }
 }
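The bound handling in `get_object_bytes` is easy to get wrong (HTTP ranges are inclusive while Rust's `..end` is exclusive), so here is the same logic extracted into a standalone, runnable sketch. The `range_header` helper is hypothetical and exists only for illustration; the real logic lives inline in the method above:

```rust
use std::ops::{Bound, RangeBounds};

// Mirrors the Range-header construction in `get_object_bytes`:
// returns None when no header is needed (full download).
fn range_header(range: impl RangeBounds<u64>) -> Option<String> {
  if range.start_bound() == Bound::Unbounded
    && range.end_bound() == Bound::Unbounded
  {
    return None;
  }
  let from = match range.start_bound() {
    Bound::Included(start) => start.to_string(),
    _ => "0".to_string(),
  };
  let to = match range.end_bound() {
    Bound::Included(end) => end.to_string(),
    // HTTP ranges are inclusive, so an exclusive end shifts down by one
    Bound::Excluded(end) => (end - 1).to_string(),
    _ => "".to_string(),
  };
  Some(format!("bytes={}-{}", from, to))
}

fn main() {
  assert_eq!(range_header(0..1024), Some("bytes=0-1023".to_string()));
  assert_eq!(range_header(512..=1023), Some("bytes=512-1023".to_string()));
  assert_eq!(range_header(256..), Some("bytes=256-".to_string()));
  assert_eq!(range_header(..), None);
}
```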