diff --git a/services/blob/src/database.rs b/services/blob/src/database.rs
index 9466fcb85..a8750d863 100644
--- a/services/blob/src/database.rs
+++ b/services/blob/src/database.rs
@@ -1,368 +1,380 @@
-use anyhow::{anyhow, Context, Result};
 use aws_sdk_dynamodb::{
   model::AttributeValue, output::GetItemOutput, Error as DynamoDBError,
 };
 use chrono::{DateTime, Utc};
 use std::{
   collections::HashMap,
   fmt::{Display, Formatter},
   sync::Arc,
 };

 use crate::{
   constants::{
     BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
     BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME,
     BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, BLOB_REVERSE_INDEX_TABLE_NAME,
     BLOB_TABLE_BLOB_HASH_FIELD, BLOB_TABLE_CREATED_FIELD, BLOB_TABLE_NAME,
     BLOB_TABLE_S3_PATH_FIELD,
   },
   s3::S3Path,
 };

 #[derive(Clone, Debug)]
 pub struct BlobItem {
   pub blob_hash: String,
   pub s3_path: S3Path,
   pub created: DateTime<Utc>,
 }

 #[derive(Clone, Debug)]
 pub struct ReverseIndexItem {
   pub holder: String,
   pub blob_hash: String,
 }

 #[derive(Clone)]
 pub struct DatabaseClient {
   client: Arc<aws_sdk_dynamodb::Client>,
 }

 impl DatabaseClient {
   pub fn new(aws_config: &aws_types::SdkConfig) -> Self {
     DatabaseClient {
       client: Arc::new(aws_sdk_dynamodb::Client::new(aws_config)),
     }
   }

   // Blob item

-  pub async fn put_blob_item(&self, blob_item: BlobItem) -> Result<()> {
+  pub async fn put_blob_item(&self, blob_item: BlobItem) -> Result<(), Error> {
     let item = HashMap::from([
       (
         BLOB_TABLE_BLOB_HASH_FIELD.to_string(),
         AttributeValue::S(blob_item.blob_hash),
       ),
       (
         BLOB_TABLE_S3_PATH_FIELD.to_string(),
         AttributeValue::S(blob_item.s3_path.to_full_path()),
       ),
       (
         BLOB_TABLE_CREATED_FIELD.to_string(),
         AttributeValue::S(blob_item.created.to_rfc3339()),
       ),
     ]);

     self
       .client
       .put_item()
       .table_name(BLOB_TABLE_NAME)
       .set_item(Some(item))
       .send()
       .await
-      .context("Failed to put blob item")?;
+      .map_err(|e| Error::AwsSdk(e.into()))?;

     Ok(())
   }

   pub async fn find_blob_item(
     &self,
     blob_hash: &str,
-  ) -> Result<Option<BlobItem>> {
+  ) -> Result<Option<BlobItem>, Error> {
     let item_key = HashMap::from([(
       BLOB_TABLE_BLOB_HASH_FIELD.to_string(),
       AttributeValue::S(blob_hash.to_string()),
     )]);
     match self
       .client
       .get_item()
       .table_name(BLOB_TABLE_NAME)
       .set_key(Some(item_key))
       .send()
       .await
-      .with_context(|| {
-        format!("Failed to find blob item with hash: [{}]", blob_hash)
-      })? {
+      .map_err(|e| Error::AwsSdk(e.into()))?
+    {
       GetItemOutput {
         item: Some(mut item),
         ..
       } => {
         let blob_hash = parse_string_attribute(
           BLOB_TABLE_BLOB_HASH_FIELD,
           item.remove(BLOB_TABLE_BLOB_HASH_FIELD),
         )?;
         let s3_path = parse_string_attribute(
           BLOB_TABLE_S3_PATH_FIELD,
           item.remove(BLOB_TABLE_S3_PATH_FIELD),
         )?;
         let created = parse_datetime_attribute(
           BLOB_TABLE_CREATED_FIELD,
           item.remove(BLOB_TABLE_CREATED_FIELD),
         )?;
         Ok(Some(BlobItem {
           blob_hash,
-          s3_path: S3Path::from_full_path(&s3_path)?,
+          s3_path: S3Path::from_full_path(&s3_path)
+            .map_err(|e| Error::Blob(BlobDBError::InvalidS3Path(e)))?,
           created,
         }))
       }
       _ => Ok(None),
     }
   }

-  pub async fn remove_blob_item(&self, blob_hash: &str) -> Result<()> {
+  pub async fn remove_blob_item(&self, blob_hash: &str) -> Result<(), Error> {
     self
       .client
       .delete_item()
       .table_name(BLOB_TABLE_NAME)
       .key(
         BLOB_TABLE_BLOB_HASH_FIELD,
         AttributeValue::S(blob_hash.to_string()),
       )
       .send()
       .await
-      .with_context(|| {
-        format!("Failed to remove blob item with hash: [{}]", blob_hash)
-      })?;
+      .map_err(|e| Error::AwsSdk(e.into()))?;

     Ok(())
   }

   // Reverse index item

   pub async fn put_reverse_index_item(
     &self,
     reverse_index_item: ReverseIndexItem,
-  ) -> Result<()> {
-    if self
-      .find_reverse_index_by_holder(&reverse_index_item.holder)
-      .await?
-      .is_some()
-    {
-      return Err(anyhow!(
-        "An item for the given holder [{}] already exists",
-        reverse_index_item.holder
-      ));
+  ) -> Result<(), Error> {
+    let holder = &reverse_index_item.holder;
+    if self.find_reverse_index_by_holder(holder).await?.is_some() {
+      return Err(Error::Blob(BlobDBError::HolderAlreadyExists(
+        holder.to_string(),
+      )));
     }

     let item = HashMap::from([
       (
         BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD.to_string(),
         AttributeValue::S(reverse_index_item.holder),
       ),
       (
         BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD.to_string(),
         AttributeValue::S(reverse_index_item.blob_hash),
       ),
     ]);
     self
       .client
       .put_item()
       .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
       .set_item(Some(item))
       .send()
       .await
-      .context("Failed to put reverse index item")?;
+      .map_err(|e| Error::AwsSdk(e.into()))?;

     Ok(())
   }

   pub async fn find_reverse_index_by_holder(
     &self,
     holder: &str,
-  ) -> Result<Option<ReverseIndexItem>> {
+  ) -> Result<Option<ReverseIndexItem>, Error> {
     let item_key = HashMap::from([(
       BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD.to_string(),
       AttributeValue::S(holder.to_string()),
     )]);
     match self
       .client
       .get_item()
       .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
       .set_key(Some(item_key))
       .consistent_read(true)
       .send()
       .await
-      .with_context(|| {
-        format!("Failed to find reverse index for holder: [{}]", holder)
-      })? {
+      .map_err(|e| Error::AwsSdk(e.into()))?
+    {
       GetItemOutput {
         item: Some(mut item),
         ..
       } => {
         let holder = parse_string_attribute(
           BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD,
           item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD),
         )?;
         let blob_hash = parse_string_attribute(
           BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
           item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD),
         )?;
         Ok(Some(ReverseIndexItem { holder, blob_hash }))
       }
       _ => Ok(None),
     }
   }

   pub async fn find_reverse_index_by_hash(
     &self,
     blob_hash: &str,
-  ) -> Result<Vec<ReverseIndexItem>> {
+  ) -> Result<Vec<ReverseIndexItem>, Error> {
     let response = self
       .client
       .query()
       .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
       .index_name(BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME)
       .key_condition_expression("#blobHash = :valueToMatch")
       .expression_attribute_names(
         "#blobHash",
         BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
       )
       .expression_attribute_values(
         ":valueToMatch",
         AttributeValue::S(blob_hash.to_string()),
       )
       .send()
       .await
-      .with_context(|| {
-        format!("Failed to find reverse index for hash: [{}]", blob_hash)
-      })?;
+      .map_err(|e| Error::AwsSdk(e.into()))?;

     if response.count == 0 {
       return Ok(vec![]);
     }

     let mut results: Vec<ReverseIndexItem> =
       Vec::with_capacity(response.count() as usize);
     for mut item in response.items.unwrap_or_default() {
       let holder = parse_string_attribute(
         BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD,
         item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD),
       )?;
       let blob_hash = parse_string_attribute(
         BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
         item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD),
       )?;
       results.push(ReverseIndexItem { holder, blob_hash });
     }

     return Ok(results);
   }

-  pub async fn remove_reverse_index_item(&self, holder: &str) -> Result<()> {
+  pub async fn remove_reverse_index_item(
+    &self,
+    holder: &str,
+  ) -> Result<(), Error> {
     self
       .client
       .delete_item()
       .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
       .key(
         BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD,
         AttributeValue::S(holder.to_string()),
       )
       .send()
       .await
-      .with_context(|| {
-        format!("Failed to remove reverse index for holder: [{}]", holder)
-      })?;
+      .map_err(|e| Error::AwsSdk(e.into()))?;

     Ok(())
   }
 }

 #[derive(
   Debug, derive_more::Display, derive_more::From, derive_more::Error,
 )]
 pub enum Error {
   #[display(...)]
   AwsSdk(DynamoDBError),
   #[display(...)]
   Attribute(DBItemError),
+  #[display(...)]
+  Blob(BlobDBError),
+}
+
+#[derive(Debug)]
+pub enum BlobDBError {
+  HolderAlreadyExists(String),
+  InvalidS3Path(anyhow::Error),
 }

+impl Display for BlobDBError {
+  fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+    match self {
+      BlobDBError::HolderAlreadyExists(holder) => {
+        write!(f, "Item for given holder [{}] already exists", holder)
+      }
+      BlobDBError::InvalidS3Path(err) => err.fmt(f),
+    }
+  }
+}
+
+impl std::error::Error for BlobDBError {}
+
 #[derive(Debug, derive_more::Error, derive_more::Constructor)]
 pub struct DBItemError {
   attribute_name: &'static str,
   attribute_value: Option<AttributeValue>,
   attribute_error: DBItemAttributeError,
 }

 impl Display for DBItemError {
   fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
     match &self.attribute_error {
       DBItemAttributeError::Missing => {
         write!(f, "Attribute {} is missing", self.attribute_name)
       }
       DBItemAttributeError::IncorrectType => write!(
         f,
         "Value for attribute {} has incorrect type: {:?}",
         self.attribute_name, self.attribute_value
       ),
       error => write!(
         f,
         "Error regarding attribute {} with value {:?}: {}",
         self.attribute_name, self.attribute_value, error
       ),
     }
   }
 }

 #[derive(Debug, derive_more::Display, derive_more::Error)]
 pub enum DBItemAttributeError {
   #[display(...)]
   Missing,
   #[display(...)]
   IncorrectType,
   #[display(...)]
   InvalidTimestamp(chrono::ParseError),
 }

 fn parse_string_attribute(
   attribute_name: &'static str,
   attribute_value: Option<AttributeValue>,
 ) -> Result<String, DBItemError> {
   match attribute_value {
     Some(AttributeValue::S(value)) => Ok(value),
     Some(_) => Err(DBItemError::new(
       attribute_name,
       attribute_value,
       DBItemAttributeError::IncorrectType,
     )),
     None => Err(DBItemError::new(
       attribute_name,
       attribute_value,
       DBItemAttributeError::Missing,
     )),
   }
 }

 fn parse_datetime_attribute(
   attribute_name: &'static str,
   attribute_value: Option<AttributeValue>,
 ) -> Result<DateTime<Utc>, DBItemError> {
   if let Some(AttributeValue::S(datetime)) = &attribute_value {
     // parse() accepts a relaxed RFC3339 string
     datetime.parse().map_err(|e| {
       DBItemError::new(
         attribute_name,
         attribute_value,
         DBItemAttributeError::InvalidTimestamp(e),
       )
     })
   } else {
     Err(DBItemError::new(
       attribute_name,
       attribute_value,
       DBItemAttributeError::Missing,
     ))
   }
 }
diff --git a/services/blob/src/s3.rs b/services/blob/src/s3.rs
index fedccb30c..f862d55ea 100644
--- a/services/blob/src/s3.rs
+++ b/services/blob/src/s3.rs
@@ -1,240 +1,243 @@
 use anyhow::{anyhow, Result};
 use aws_sdk_s3::{
   model::{CompletedMultipartUpload, CompletedPart},
   output::CreateMultipartUploadOutput,
   types::ByteStream,
 };
 use std::{
   ops::{Bound, RangeBounds},
   sync::Arc,
 };

 /// A helper structure representing an S3 object path
 #[derive(Clone, Debug)]
 pub struct S3Path {
   pub bucket_name: String,
   pub object_name: String,
 }

 impl S3Path {
   /// Constructs an [`S3Path`] from given string
   /// The path should be in the following format: `[bucket_name]/[object_name]`
   pub fn from_full_path(full_path: &str) -> Result<Self> {
     if !full_path.contains('/') {
-      return Err(anyhow!("S3 path should contain the '/' separator"));
+      return Err(anyhow!(
+        "S3 path [{}] should contain the '/' separator",
+        full_path
+      ));
     }

     let mut split = full_path.split('/');
     Ok(S3Path {
       bucket_name: split
         .next()
         .ok_or(anyhow!("Expected bucket name in path [{}]", full_path))?
         .to_string(),
       object_name: split
         .next()
         .ok_or(anyhow!("Expected object name in path [{}]", full_path))?
         .to_string(),
     })
   }

   /// Retrieves full S3 path string in the following format: `[bucket_name]/[object_name]`
   pub fn to_full_path(&self) -> String {
     format!("{}/{}", self.bucket_name, self.object_name)
   }
 }

 #[derive(Clone)]
 pub struct S3Client {
   client: Arc<aws_sdk_s3::Client>,
 }

 impl S3Client {
   pub fn new(aws_config: &aws_types::SdkConfig) -> Self {
     S3Client {
       client: Arc::new(aws_sdk_s3::Client::new(aws_config)),
     }
   }

   /// Creates a new [`MultiPartUploadSession`]
   pub async fn start_upload_session(
     &self,
     s3_path: &S3Path,
   ) -> Result<MultiPartUploadSession> {
     MultiPartUploadSession::start(&self.client, s3_path).await
   }

   /// Returns object metadata (e.g. file size) without downloading the object itself
   pub async fn get_object_metadata(
     &self,
     s3_path: &S3Path,
   ) -> Result<aws_sdk_s3::output::HeadObjectOutput> {
     let response = self
       .client
       .head_object()
       .bucket(s3_path.bucket_name.clone())
       .key(s3_path.object_name.clone())
       .send()
       .await?;

     Ok(response)
   }

   /// Downloads object and retrieves data bytes within provided range
   ///
   /// * `range` - Range of object bytes to download.
   pub async fn get_object_bytes(
     &self,
     s3_path: &S3Path,
     range: impl RangeBounds<u64>,
   ) -> Result<Vec<u8>> {
     let mut request = self
       .client
       .get_object()
       .bucket(&s3_path.bucket_name)
       .key(&s3_path.object_name);

     if range.start_bound() != Bound::Unbounded
       || range.end_bound() != Bound::Unbounded
     {
       // Create a valid HTTP Range header
       let from = match range.start_bound() {
         Bound::Included(start) => start.to_string(),
         _ => "0".to_string(),
       };
       let to = match range.end_bound() {
         Bound::Included(end) => end.to_string(),
         Bound::Excluded(end) => (end - 1).to_string(),
         _ => "".to_string(),
       };
       let range = format!("bytes={}-{}", from, to);
       request = request.range(range);
     }

     let response = request.send().await?;
     let data = response.body.collect().await?;
     Ok(data.into_bytes().to_vec())
   }

   /// Deletes object at provided path
   pub async fn delete_object(&self, s3_path: &S3Path) -> Result<()> {
     self
       .client
       .delete_object()
       .bucket(&s3_path.bucket_name)
       .key(&s3_path.object_name)
       .send()
       .await?;

     Ok(())
   }
 }

 /// Represents a multipart upload session to the AWS S3
 pub struct MultiPartUploadSession {
   client: Arc<aws_sdk_s3::Client>,
   bucket_name: String,
   object_name: String,
   upload_id: String,
   upload_parts: Vec<CompletedPart>,
 }

 impl MultiPartUploadSession {
   /// Starts a new upload session and returns its instance
   /// Don't call this directly, use [`S3Client::start_upload_session()`] instead
   async fn start(
     client: &Arc<aws_sdk_s3::Client>,
     s3_path: &S3Path,
   ) -> Result<Self> {
     let multipart_upload_res: CreateMultipartUploadOutput = client
       .create_multipart_upload()
       .bucket(&s3_path.bucket_name)
       .key(&s3_path.object_name)
       .send()
       .await?;

     let upload_id = multipart_upload_res
       .upload_id()
       .ok_or(anyhow!("Upload ID expected to be present"))?;

     Ok(MultiPartUploadSession {
       client: client.clone(),
       bucket_name: String::from(&s3_path.bucket_name),
       object_name: String::from(&s3_path.object_name),
       upload_id: String::from(upload_id),
       upload_parts: Vec::new(),
     })
   }

   /// adds data part to the multipart upload
   pub async fn add_part(&mut self, part: Vec<u8>) -> Result<()> {
     let stream = ByteStream::from(part);
     let part_number: i32 = self.upload_parts.len() as i32 + 1;
     let upload_result = self
       .client
       .upload_part()
       .key(&self.object_name)
       .bucket(&self.bucket_name)
       .upload_id(&self.upload_id)
       .part_number(part_number)
       .body(stream)
       .send()
       .await?;

     let completed_part = CompletedPart::builder()
       .e_tag(upload_result.e_tag.unwrap_or_default())
       .part_number(part_number)
       .build();
     self.upload_parts.push(completed_part);
     Ok(())
   }

   /// finishes the upload
   pub async fn finish_upload(&self) -> Result<()> {
     let completed_multipart_upload = CompletedMultipartUpload::builder()
       .set_parts(Some(self.upload_parts.clone()))
       .build();

     self
       .client
       .complete_multipart_upload()
       .bucket(&self.bucket_name)
       .key(&self.object_name)
       .multipart_upload(completed_multipart_upload)
       .upload_id(&self.upload_id)
       .send()
       .await?;

     Ok(())
   }
 }

 #[cfg(test)]
 mod tests {
   use super::*;

   #[test]
   fn test_s3path_from_full_path() {
     let full_path = "my_bucket/some_object";
     let s3_path = S3Path::from_full_path(full_path);
     assert!(s3_path.is_ok());
     let s3_path = s3_path.unwrap();
     assert_eq!(&s3_path.bucket_name, "my_bucket");
     assert_eq!(&s3_path.object_name, "some_object");
   }

   #[test]
   fn test_s3path_from_invalid_path() {
     let result = S3Path::from_full_path("invalid_path");
     assert!(result.is_err())
   }

   #[test]
   fn test_s3path_to_full_path() {
     let s3_path = S3Path {
       bucket_name: "my_bucket".to_string(),
       object_name: "some_object".to_string(),
     };
     let full_path = s3_path.to_full_path();
     assert_eq!(full_path, "my_bucket/some_object");
   }
 }
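
Reviewer note (not part of the patch): the database.rs change replaces anyhow's stringly-typed errors with the typed `Error` enum, so callers can branch on the failure mode instead of inspecting message strings. A minimal sketch of what this enables, assuming a hypothetical `handle_db_error` helper that is not part of this diff:

    use crate::database::{BlobDBError, Error};

    // Hypothetical helper: translate a typed database error into an
    // HTTP-style status code. The mapping below is illustrative only.
    fn handle_db_error(err: Error) -> u16 {
      match err {
        // A duplicate holder is a client mistake, not a server fault.
        Error::Blob(BlobDBError::HolderAlreadyExists(_)) => 409,
        // A malformed S3 path stored in the table indicates corrupted data.
        Error::Blob(BlobDBError::InvalidS3Path(_)) => 500,
        // SDK failures and attribute-parsing errors are server-side issues.
        Error::AwsSdk(_) | Error::Attribute(_) => 500,
      }
    }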
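Reviewer note on s3.rs: only the `from_full_path` error message changes (it now includes the offending path); the multipart-upload API is untouched. For context, the intended call sequence looks roughly like the sketch below, assuming an already-configured `S3Client` and a hypothetical bucket/object pair:

    use crate::s3::{S3Client, S3Path};

    // Hypothetical flow: `chunks` yields the blob's bytes in order.
    async fn upload_blob(
      client: &S3Client,
      chunks: Vec<Vec<u8>>,
    ) -> anyhow::Result<()> {
      let path = S3Path::from_full_path("my_bucket/some_object")?;
      let mut session = client.start_upload_session(&path).await?;
      for chunk in chunks {
        // S3 requires every part except the last to be at least 5 MiB;
        // add_part() does not enforce this itself.
        session.add_part(chunk).await?;
      }
      session.finish_upload().await
    }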