diff --git a/Cargo.lock b/Cargo.lock --- a/Cargo.lock +++ b/Cargo.lock @@ -1462,6 +1462,8 @@ "tracing-actix-web", "tracing-futures", "tracing-subscriber", + "urlencoding", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml --- a/Cargo.toml +++ b/Cargo.toml @@ -103,6 +103,7 @@ tracing-log = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } url = "2.5" +urlencoding = "2.1" uuid = "1.3" wasm-bindgen = "0.2" tower = "0.4" diff --git a/services/blob/Cargo.toml b/services/blob/Cargo.toml --- a/services/blob/Cargo.toml +++ b/services/blob/Cargo.toml @@ -35,4 +35,6 @@ tracing-actix-web = { workspace = true } tracing-futures = { workspace = true, features = ["futures-03"] } tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } +urlencoding = { workspace = true } +uuid = { workspace = true } serde_json = { workspace = true } diff --git a/services/blob/src/http/handlers/media.rs b/services/blob/src/http/handlers/media.rs new file mode 100644 --- /dev/null +++ b/services/blob/src/http/handlers/media.rs @@ -0,0 +1,90 @@ +use crate::database::types::MediaInfo; +use crate::http::errors::handle_blob_service_error; +use crate::service::BlobService; + +use actix_web::error::ErrorBadRequest; +use actix_web::{http::header::Range, web, HttpResponse}; +use async_stream::try_stream; +use comm_lib::blob::types::http::BlobUploadMultimediaResponse; +use comm_lib::blob::types::BlobInfo; +use comm_lib::http::multipart; +use tokio_stream::StreamExt; +use tracing::{info, instrument, trace, warn}; +use tracing_futures::Instrument; + +#[instrument(skip_all, name = "upload_media", fields(blob_hash))] +pub async fn upload_media_handler( + service: web::Data, + mut payload: actix_multipart::Multipart, +) -> actix_web::Result { + info!("Upload media request"); + + let mut mime_type = None; + let mut metadata = None; + + let mut file_field = loop { + let Some(field) = payload.try_next().await? else { + return Err(ErrorBadRequest("Bad request")); + }; + + match field.name() { + "mime_type" => { + let mime = multipart::read_field_to_string(field).await?; + mime_type = Some(mime); + } + "metadata" => { + let metadata_value = multipart::read_field_to_string(field).await?; + metadata = Some(metadata_value); + } + "file" => break field, + unknown => { + warn!("Unknown field: {}. Ignoring", unknown); + continue; + } + }; + }; + + let media_id = uuid::Uuid::new_v4().to_string(); + + let content_type = mime_type.or_else(|| { + file_field + .content_type() + .cloned() + .map(|ct| ct.essence_str().to_string()) + }); + + let stream = try_stream! { + trace!("Got media contents. Streaming..."); + while let Some(chunk) = file_field.try_next().await? { + yield chunk; + } + trace!("Stream done"); + }; + + let media_info = MediaInfo { + content_type: content_type.clone(), + custom_metadata: metadata.clone(), + }; + + // create a blob hash based off media ID + let blob_info = BlobInfo::from_bytes(media_id.as_bytes()); + service + .put_blob(&blob_info.blob_hash, Some(media_info), stream) + .await?; + service + .assign_holder_with_tags( + &blob_info.blob_hash, + &blob_info.holder, + &["media".to_string()], + ) + .await?; + tracing::debug!(media_id, "Stored blob: {:?}.", blob_info); + + let response = BlobUploadMultimediaResponse { + media_id, + blob_hash: blob_info.blob_hash, + content_type, + metadata, + }; + Ok(HttpResponse::Created().json(response)) +} diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs --- a/services/blob/src/http/mod.rs +++ b/services/blob/src/http/mod.rs @@ -13,6 +13,7 @@ mod handlers { pub(super) mod blob; pub(super) mod holders; + pub(super) mod media; pub(super) mod metadata; } @@ -54,9 +55,14 @@ ) .service( web::resource("/metadata/get_blob_sizes") - .wrap(auth_middleware) + .wrap(auth_middleware.clone()) .route(web::post().to(handlers::metadata::get_blob_sizes)), ) + .service( + web::resource("/media") + .wrap(auth_middleware) + .route(web::post().to(handlers::media::upload_media_handler)), + ) }) .bind(("0.0.0.0", CONFIG.http_port))? .run() diff --git a/shared/comm-lib/src/blob/types.rs b/shared/comm-lib/src/blob/types.rs --- a/shared/comm-lib/src/blob/types.rs +++ b/shared/comm-lib/src/blob/types.rs @@ -86,6 +86,18 @@ pub instant_delete: bool, } + // Multimedia endpoint types + + #[derive(Serialize, Deserialize, Debug)] + #[serde(rename_all = "camelCase")] + pub struct BlobUploadMultimediaResponse { + #[serde(rename = "mediaID")] + pub media_id: String, + pub blob_hash: String, + pub content_type: Option, + pub metadata: Option, + } + // impls impl From> for RemoveHoldersRequest { fn from(requests: Vec) -> Self { diff --git a/shared/comm-lib/src/http/multipart.rs b/shared/comm-lib/src/http/multipart.rs --- a/shared/comm-lib/src/http/multipart.rs +++ b/shared/comm-lib/src/http/multipart.rs @@ -27,18 +27,12 @@ pub async fn get_text_field( multipart: &mut actix_multipart::Multipart, ) -> anyhow::Result, MultipartError> { - let Some(mut field): Option = multipart.try_next().await? else { + let Some(field): Option = multipart.try_next().await? else { return Ok(None); }; let name = field.name().to_string(); - - let mut buf = Vec::new(); - while let Some(chunk) = field.try_next().await? { - buf.extend_from_slice(&chunk); - } - - let text = parse_bytes_to_string(buf)?; + let text = read_field_to_string(field).await?; Ok(Some((name, text))) } @@ -77,6 +71,18 @@ Ok(found_fields) } +pub async fn read_field_to_string( + mut field: actix_multipart::Field, +) -> Result { + let mut buf = Vec::new(); + while let Some(chunk) = field.try_next().await? { + buf.extend_from_slice(&chunk); + } + + let text = parse_bytes_to_string(buf)?; + Ok(text) +} + pub fn parse_bytes_to_string( utf8_buffer: Vec, ) -> Result { diff --git a/shared/comm-lib/src/tools.rs b/shared/comm-lib/src/tools.rs --- a/shared/comm-lib/src/tools.rs +++ b/shared/comm-lib/src/tools.rs @@ -198,6 +198,13 @@ assert!(!is_valid_identifier("https://example.com")); } + #[test] + fn urlencoded_url_is_invalid() { + assert!(!is_valid_identifier( + "https%3A%2F%2Fexample.com%2Fpath%2Fto%2Fimage.jpg%3Ffoo%3Dbar%23baz" + )); + } + #[test] fn empty_is_invalid() { assert!(!is_valid_identifier(""));