diff --git a/Cargo.lock b/Cargo.lock
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1453,6 +1453,7 @@
  "once_cell",
  "prost",
  "regex",
+ "reqwest 0.11.27",
  "serde",
  "serde_json",
  "tokio",
diff --git a/services/blob/Cargo.toml b/services/blob/Cargo.toml
--- a/services/blob/Cargo.toml
+++ b/services/blob/Cargo.toml
@@ -27,6 +27,7 @@
 once_cell = { workspace = true }
 prost = { workspace = true }
 regex = { workspace = true }
+reqwest = { workspace = true, features = ["stream", "rustls-tls"] }
 serde = { workspace = true, features = ["derive"] }
 tokio = { workspace = true, features = ["rt-multi-thread"] }
 tokio-stream = { workspace = true }
diff --git a/services/blob/src/http/handlers/media.rs b/services/blob/src/http/handlers/media.rs
--- a/services/blob/src/http/handlers/media.rs
+++ b/services/blob/src/http/handlers/media.rs
@@ -3,12 +3,15 @@
 use crate::http::utils::parse_range_header;
 use crate::service::BlobService;
 
-use actix_web::error::ErrorBadRequest;
+use actix_web::error::{ErrorBadRequest, ErrorServiceUnavailable};
 use actix_web::{http::header::Range, web, HttpResponse};
 use async_stream::try_stream;
-use comm_lib::blob::types::http::BlobUploadMultimediaResponse;
+use comm_lib::blob::types::http::{
+  BlobUploadMultimediaResponse, MirrorMultimediaRequest, MirroredMediaInfo,
+};
 use comm_lib::blob::types::BlobInfo;
 use comm_lib::http::multipart;
+use http::header::CONTENT_TYPE;
 use http::uri::Scheme;
 use tokio_stream::StreamExt;
 use tracing::{info, instrument, trace, warn};
@@ -152,6 +155,128 @@
   Ok(HttpResponse::Created().json(response))
 }
 
+/// Mirrors externally hosted media into the blob service.
+///
+/// Every URL in the request is turned into a [`BlobInfo`] via
+/// `BlobInfo::from_bytes` on the URL bytes. Media whose blob already exists
+/// are skipped; the rest are downloaded and stored as blobs (content type
+/// taken from the remote `Content-Type` header, original URL and metadata
+/// kept as custom metadata) with a holder tagged `media` and `mirrored`.
+///
+/// Media are processed one at a time and the handler returns on the first
+/// failure, so media mirrored before that point stay mirrored.
+///
+/// # Errors
+/// Responds with `503 media_unavailable` when a URL cannot be fetched or the
+/// remote server answers with a non-success status; blob-service errors are
+/// propagated as-is.
+#[instrument(skip_all, name = "mirror_media")]
+pub async fn mirror_media(
+  service: web::Data<BlobService>,
+  payload: web::Json<MirrorMultimediaRequest>,
+) -> actix_web::Result<HttpResponse> {
+  let MirrorMultimediaRequest { medias } = payload.into_inner();
+  info!("Mirror multimedia request for {} medias", medias.len());
+
+  // A URL listed more than once would presumably map to the same blob hash
+  // and be uploaded twice, so only its first occurrence is kept.
+  let mut seen_urls = std::collections::HashSet::new();
+  let media_blob_infos: Vec<(MirroredMediaInfo, BlobInfo)> = medias
+    .into_iter()
+    .filter(|media| seen_urls.insert(media.url.clone()))
+    .map(|media| {
+      let blob_info = BlobInfo::from_bytes(media.url.as_bytes());
+      (media, blob_info)
+    })
+    .collect();
+
+  let blob_hashes = media_blob_infos
+    .iter()
+    .map(|(_, info)| &info.blob_hash)
+    .collect();
+  let already_existing = service.find_existing_blobs(blob_hashes).await?;
+  tracing::debug!("Found {} already mirrored media.", already_existing.len());
+
+  let new_media = media_blob_infos
+    .into_iter()
+    .filter(|(_, blob_info)| !already_existing.contains(&blob_info.blob_hash));
+
+  // Share one client across all downloads: `reqwest::get` builds a new
+  // client (and connection pool) on every call.
+  // NOTE(review): URLs come from the caller and are fetched server-side;
+  // consider restricting which hosts may be mirrored (SSRF).
+  let client = reqwest::Client::builder().build().map_err(|err| {
+    tracing::error!("Creating HTTP client failed: {err:?}");
+    ErrorServiceUnavailable("media_unavailable")
+  })?;
+
+  let mut mirrored_count = 0;
+  for (media, blob_info) in new_media {
+    trace!("Mirroring '{}'", &media.url);
+
+    // Treat non-2xx responses as failures; otherwise an error page would be
+    // stored as the media and, since its blob now exists, never re-fetched.
+    let response = client
+      .get(&media.url)
+      .send()
+      .await
+      .and_then(reqwest::Response::error_for_status)
+      .map_err(|err| {
+        tracing::error!("Fetching media '{}' failed: {:?}", &media.url, err);
+        ErrorServiceUnavailable("media_unavailable")
+      })?;
+
+    let content_type = response
+      .headers()
+      .get(CONTENT_TYPE)
+      .and_then(|value| value.to_str().ok())
+      .map(String::from);
+    trace!("Found content type: {:?}", content_type);
+
+    let custom_metadata = serde_json::json!({
+      "originalUrl": media.url,
+      "originalMediaMetadata": media.original_metadata,
+    });
+    let media_info = MediaInfo {
+      content_type: content_type.clone(),
+      // `Value`'s `Display` writes compact JSON and cannot fail.
+      custom_metadata: Some(custom_metadata.to_string()),
+    };
+
+    trace!(?media_info, ?blob_info, "Creating blob and holder.");
+
+    // Re-yield the body chunks through `try_stream!` so read errors go
+    // through `?` (presumably to match the stream type `put_blob` expects).
+    let mut body_chunks = response.bytes_stream();
+    let stream = try_stream! {
+      while let Some(chunk) = body_chunks.try_next().await? {
+        yield chunk;
+      }
+    };
+    service
+      .put_blob(&blob_info.blob_hash, Some(media_info), stream)
+      .await?;
+    service
+      .assign_holder_with_tags(
+        &blob_info.blob_hash,
+        &blob_info.holder,
+        &["media".to_string(), "mirrored".to_string()],
+      )
+      .await?;
+
+    tracing::debug!(
+      ?blob_info,
+      ?content_type,
+      "Mirror success for '{}'.",
+      &media.url
+    );
+    mirrored_count += 1;
+  }
+
+  info!("Successfully mirrored {} multimedia.", mirrored_count);
+  Ok(HttpResponse::Ok().finish())
+}
+
 fn validate_media_id(media_id: &str) -> Result<(), actix_web::Error> {
   if comm_lib::tools::is_valid_identifier(media_id) {
     return Ok(());
@@ -159,7 +284,6 @@
 
   if let Ok(decoded_url) = urlencoding::decode(media_id) {
     if decoded_url.parse::().is_ok_and(|uri| {
-      // TODO: maybe additional validation here
       uri.scheme().is_some_and(|scheme| {
         *scheme == Scheme::HTTP || *scheme == Scheme::HTTPS
       })
diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs
--- a/services/blob/src/http/mod.rs
+++ b/services/blob/src/http/mod.rs
@@ -58,13 +58,18 @@
           .wrap(auth_middleware.clone())
           .route(web::post().to(handlers::metadata::get_blob_sizes)),
       )
+      .service(
+        web::resource("/media/mirror")
+          .wrap(auth_middleware.clone())
+          .route(web::post().to(handlers::media::mirror_media)),
+      )
       .service(
         web::resource("/media/{media_id}")
           .route(web::get().to(handlers::media::get_media_handler)),
       )
       .service(
         web::resource("/media")
-          .wrap(auth_middleware)
+          .wrap(auth_middleware.clone())
           .route(web::post().to(handlers::media::upload_media_handler)),
       )
   })
diff --git a/shared/comm-lib/src/blob/types.rs b/shared/comm-lib/src/blob/types.rs
--- a/shared/comm-lib/src/blob/types.rs
+++ b/shared/comm-lib/src/blob/types.rs
@@ -98,6 +98,23 @@
     pub metadata: Option,
   }
 
+  /// A single externally hosted media item to mirror.
+  #[derive(Debug, serde::Serialize, serde::Deserialize)]
+  pub struct MirroredMediaInfo {
+    /// URL the media is downloaded from; the blob service also derives the
+    /// mirrored blob's info from it.
+    pub url: String,
+    /// Client-provided metadata, kept in the mirrored blob's custom metadata.
+    pub original_metadata: String,
+  }
+
+  /// Request body of the `POST /media/mirror` endpoint.
+  #[derive(Debug, serde::Serialize, serde::Deserialize)]
+  pub struct MirrorMultimediaRequest {
+    /// Media items to mirror.
+    pub medias: Vec<MirroredMediaInfo>,
+  }
+
   // impls
   impl From> for RemoveHoldersRequest {
     fn from(requests: Vec) -> Self {