diff --git a/services/blob/src/constants.rs b/services/blob/src/constants.rs
--- a/services/blob/src/constants.rs
+++ b/services/blob/src/constants.rs
@@ -33,6 +33,7 @@
   pub const ATTR_S3_PATH: &str = "s3_path";
   pub const ATTR_UNCHECKED: &str = "unchecked";
   pub const ATTR_BLOB_SIZE: &str = "blob_size";
+  pub const ATTR_MEDIA_INFO: &str = "media_info";
 }
 
 // Environment variables
diff --git a/services/blob/src/database/client.rs b/services/blob/src/database/client.rs
--- a/services/blob/src/database/client.rs
+++ b/services/blob/src/database/client.rs
@@ -58,7 +58,7 @@
     blob_item: BlobItemInput,
     blob_size: u64,
   ) -> DBResult<()> {
-    let item = HashMap::from([
+    let mut item = HashMap::from([
       (
         ATTR_BLOB_HASH.to_string(),
         AttributeValue::S(blob_item.blob_hash),
@@ -78,6 +78,10 @@
       ),
     ]);
 
+    if let Some(media_info) = blob_item.media_info {
+      item.insert(ATTR_MEDIA_INFO.to_string(), media_info.into());
+    }
+
     self.insert_item(item).await?;
     Ok(())
   }
diff --git a/services/blob/src/database/types.rs b/services/blob/src/database/types.rs
--- a/services/blob/src/database/types.rs
+++ b/services/blob/src/database/types.rs
@@ -1,7 +1,8 @@
 use aws_sdk_dynamodb::types::AttributeValue;
 use chrono::{DateTime, Utc};
 use comm_lib::database::{
-  parse_timestamp_attribute, AttributeTryInto, DBItemError, Value,
+  parse_timestamp_attribute, AttributeExtractor, AttributeMap,
+  AttributeTryInto, DBItemError, TryFromAttribute, Value,
 };
 use derive_more::Constructor;
 use std::collections::HashMap;
@@ -41,16 +42,71 @@
   }
 }
 
+#[derive(Debug)]
+pub struct MediaInfo {
+  pub content_type: Option<String>,
+  pub custom_metadata: Option<String>,
+}
+
+const MEDIA_INFO_CONTENT_TYPE: &str = "contentType";
+const MEDIA_INFO_CUSTOM_METADATA: &str = "customMetadata";
+
+impl From<MediaInfo> for AttributeValue {
+  fn from(value: MediaInfo) -> Self {
+    let mut attrs = HashMap::new();
+    if let Some(content_type) = value.content_type {
+      attrs.insert(
+        MEDIA_INFO_CONTENT_TYPE.to_string(),
+        AttributeValue::S(content_type),
+      );
+    }
+    if let Some(custom_metadata) = value.custom_metadata {
+      attrs.insert(
+        MEDIA_INFO_CUSTOM_METADATA.to_string(),
+        AttributeValue::S(custom_metadata),
+      );
+    }
+    AttributeValue::M(attrs)
+  }
+}
+
+impl TryFrom<AttributeMap> for MediaInfo {
+  type Error = comm_lib::database::DBItemError;
+
+  fn try_from(mut attrs: AttributeMap) -> Result<Self, Self::Error> {
+    let content_type = attrs.take_attr(MEDIA_INFO_CONTENT_TYPE)?;
+    let custom_metadata = attrs.take_attr(MEDIA_INFO_CUSTOM_METADATA)?;
+    Ok(Self {
+      content_type,
+      custom_metadata,
+    })
+  }
+}
+
+impl TryFromAttribute for MediaInfo {
+  fn try_from_attr(
+    attribute_name: impl Into<String>,
+    attribute: Option<AttributeValue>,
+  ) -> Result<Self, DBItemError> {
+    AttributeMap::try_from_attr(attribute_name, attribute)
+      .and_then(MediaInfo::try_from)
+  }
+}
+
 /// Represents an input payload for inserting a blob item into the database.
 /// This contains only the business logic related attributes
 #[derive(Debug)]
 pub struct BlobItemInput {
   pub blob_hash: String,
   pub s3_path: S3Path,
+  pub media_info: Option<MediaInfo>,
 }
 
 impl BlobItemInput {
-  pub fn new(blob_hash: impl Into<String>) -> Self {
+  pub fn new(
+    blob_hash: impl Into<String>,
+    media_info: Option<MediaInfo>,
+  ) -> Self {
     let blob_hash: String = blob_hash.into();
     BlobItemInput {
       blob_hash: blob_hash.clone(),
@@ -58,6 +114,7 @@
         bucket_name: CONFIG.s3_bucket_name.clone(),
         object_name: blob_hash,
       },
+      media_info,
     }
   }
 }
@@ -73,18 +130,15 @@
   pub unchecked: bool,
   pub created_at: DateTime<Utc>,
   pub last_modified: DateTime<Utc>,
+  pub media_info: Option<MediaInfo>,
 }
 
 impl TryFrom<RawAttributes> for BlobItemRow {
   type Error = DBError;
 
   fn try_from(mut attributes: RawAttributes) -> Result<Self, Self::Error> {
-    let blob_hash = attributes
-      .remove(ATTR_BLOB_HASH)
-      .attr_try_into(ATTR_BLOB_HASH)?;
-    let s3_path: String = attributes
-      .remove(ATTR_S3_PATH)
-      .attr_try_into(ATTR_S3_PATH)?;
+    let blob_hash = attributes.take_attr(ATTR_BLOB_HASH)?;
+    let s3_path: String = attributes.take_attr(ATTR_S3_PATH)?;
     let created_at = parse_timestamp_attribute(
       ATTR_CREATED_AT,
       attributes.remove(ATTR_CREATED_AT),
@@ -96,12 +150,15 @@
     let unchecked = is_raw_row_unchecked(&attributes, UncheckedKind::Blob)?;
     let s3_path = S3Path::from_full_path(&s3_path).map_err(DBError::from)?;
 
+    let media_info = attributes.take_attr(ATTR_MEDIA_INFO)?;
+
     Ok(BlobItemRow {
       blob_hash,
       s3_path,
       unchecked,
       created_at,
       last_modified,
+      media_info,
     })
   }
 }
diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs
--- a/services/blob/src/http/handlers/blob.rs
+++ b/services/blob/src/http/handlers/blob.rs
@@ -199,7 +199,7 @@
     trace!("Stream done");
   };
 
-  service.put_blob(blob_hash, stream).await?;
+  service.put_blob(blob_hash, None, stream).await?;
 
   Ok(HttpResponse::NoContent().finish())
 }
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -20,7 +20,7 @@
   INVITE_LINK_BLOB_HASH_PREFIX, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
 };
 use crate::database::types::{
-  BlobItemInput, BlobItemRow, PrimaryKey, UncheckedKind,
+  BlobItemInput, BlobItemRow, MediaInfo, PrimaryKey, UncheckedKind,
 };
 use crate::database::DBError;
 use crate::s3::{Error as S3Error, S3Client, S3Path};
@@ -162,10 +162,11 @@
   pub async fn put_blob(
     &self,
     blob_hash: impl Into<String>,
+    media_info: Option<MediaInfo>,
     mut blob_data_stream: impl ByteStream,
   ) -> Result<(), BlobServiceError> {
     let blob_hash: String = blob_hash.into();
-    let blob_item = BlobItemInput::new(&blob_hash);
+    let blob_item = BlobItemInput::new(&blob_hash, media_info);
 
     if self.db.get_blob_item(&blob_hash).await?.is_some() {
       debug!("Blob already exists");
@@ -296,7 +297,7 @@
     let blob_size = match ddb_results.get(&blob_hash) {
       Some(ddb_size) => *ddb_size,
       None => {
-        let s3_path = BlobItemInput::new(&blob_hash).s3_path;
+        let s3_path = BlobItemInput::new(&blob_hash, None).s3_path;
         match self.s3.get_object_size(&s3_path).await {
           Ok(s3_size) => {
             updated_values.push((blob_hash.clone(), s3_size));