Page MenuHomePhorge

D15340.1765072077.diff
No OneTemporary

Size
5 KB
Referenced Files
None
Subscribers
None

D15340.1765072077.diff

diff --git a/Cargo.lock b/Cargo.lock
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1453,6 +1453,7 @@
"once_cell",
"prost",
"regex",
+ "reqwest 0.11.27",
"serde",
"serde_json",
"tokio",
diff --git a/services/blob/Cargo.toml b/services/blob/Cargo.toml
--- a/services/blob/Cargo.toml
+++ b/services/blob/Cargo.toml
@@ -27,6 +27,7 @@
once_cell = { workspace = true }
prost = { workspace = true }
regex = { workspace = true }
+reqwest = { workspace = true, features = ["stream", "rustls-tls"] }
serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-stream = { workspace = true }
diff --git a/services/blob/src/http/handlers/media.rs b/services/blob/src/http/handlers/media.rs
--- a/services/blob/src/http/handlers/media.rs
+++ b/services/blob/src/http/handlers/media.rs
@@ -3,12 +3,15 @@
use crate::http::utils::parse_range_header;
use crate::service::BlobService;
-use actix_web::error::ErrorBadRequest;
+use actix_web::error::{ErrorBadRequest, ErrorServiceUnavailable};
use actix_web::{http::header::Range, web, HttpResponse};
use async_stream::try_stream;
-use comm_lib::blob::types::http::BlobUploadMultimediaResponse;
+use comm_lib::blob::types::http::{
+ BlobUploadMultimediaResponse, MirrorMultimediaRequest, MirroredMediaInfo,
+};
use comm_lib::blob::types::BlobInfo;
use comm_lib::http::multipart;
+use http::header::CONTENT_TYPE;
use http::uri::Scheme;
use tokio_stream::StreamExt;
use tracing::{info, instrument, trace, warn};
@@ -152,6 +155,90 @@
Ok(HttpResponse::Created().json(response))
}
+/// Mirrors each requested media URL whose blob is not stored yet: downloads
+/// it, saves it as a blob (blob info is built from the URL bytes) and assigns
+/// a holder tagged `media`/`mirrored`. Responds 503 `media_unavailable` when
+/// a download fails or the remote returns an error status.
+#[instrument(skip_all, name = "mirror_media")]
+pub async fn mirror_media(
+  service: web::Data<BlobService>,
+  payload: web::Json<MirrorMultimediaRequest>,
+) -> actix_web::Result<HttpResponse> {
+  let MirrorMultimediaRequest { medias } = payload.into_inner();
+  info!("Mirror multimedia request for {} medias", medias.len());
+
+  let media_blob_infos: Vec<(MirroredMediaInfo, BlobInfo)> = medias
+    .into_iter()
+    .map(|media| {
+      let blob_info = BlobInfo::from_bytes(media.url.as_bytes());
+      (media, blob_info)
+    })
+    .collect();
+
+  let blob_hashes = media_blob_infos
+    .iter()
+    .map(|(_, info)| &info.blob_hash)
+    .collect();
+  let already_existing = service.find_existing_blobs(blob_hashes).await?;
+  tracing::debug!("Found {} already mirrored media.", already_existing.len());
+
+  let new_media = media_blob_infos
+    .into_iter()
+    .filter(|(_, blob_info)| !already_existing.contains(&blob_info.blob_hash));
+
+  let mut cnt = 0;
+  for (media, blob_info) in new_media {
+    trace!("Mirroring '{}'", &media.url);
+
+    // Reject 4xx/5xx: `reqwest::get` alone would let an error page be stored.
+    let response = reqwest::get(&media.url)
+      .await
+      .and_then(reqwest::Response::error_for_status)
+      .map_err(|err| {
+        tracing::error!("Fetching media failed: {err:?}");
+        ErrorServiceUnavailable("media_unavailable")
+      })?;
+
+    let content_type = response
+      .headers()
+      .get(CONTENT_TYPE)
+      .and_then(|value| value.to_str().ok())
+      .map(String::from);
+    trace!("Found content type: {:?}", content_type);
+
+    let custom_metadata = serde_json::json!({
+      "originalUrl": media.url,
+      "originalMediaMetadata": media.original_metadata,
+    });
+    let media_info = MediaInfo {
+      content_type: content_type.clone(),
+      // `Value::to_string` yields compact JSON without a fallible call.
+      custom_metadata: Some(custom_metadata.to_string()),
+    };
+    trace!(?media_info, ?blob_info, "Creating blob and holder.");
+    let mut stream = response.bytes_stream();
+    let stream = try_stream! {
+      while let Some(chunk) = stream.try_next().await? {
+        yield chunk;
+      }
+    };
+    service
+      .put_blob(&blob_info.blob_hash, Some(media_info), stream)
+      .await?;
+    let tags = ["media".to_string(), "mirrored".to_string()];
+    service
+      .assign_holder_with_tags(&blob_info.blob_hash, &blob_info.holder, &tags)
+      .await?;
+    tracing::debug!(
+      ?blob_info, ?content_type, "Mirror success for '{}'.", &media.url
+    );
+    cnt += 1;
+  }
+
+  info!("Successfully mirrored {} multimedia.", cnt);
+  Ok(HttpResponse::Ok().finish())
+}
+
fn validate_media_id(media_id: &str) -> Result<(), actix_web::Error> {
if comm_lib::tools::is_valid_identifier(media_id) {
return Ok(());
@@ -159,7 +246,6 @@
if let Ok(decoded_url) = urlencoding::decode(media_id) {
if decoded_url.parse::<http::Uri>().is_ok_and(|uri| {
- // TODO: maybe additional validation here
uri.scheme().is_some_and(|scheme| {
*scheme == Scheme::HTTP || *scheme == Scheme::HTTPS
})
diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs
--- a/services/blob/src/http/mod.rs
+++ b/services/blob/src/http/mod.rs
@@ -58,13 +58,18 @@
.wrap(auth_middleware.clone())
.route(web::post().to(handlers::metadata::get_blob_sizes)),
)
+ .service(
+ web::resource("/media/mirror")
+ .wrap(auth_middleware.clone())
+ .route(web::post().to(handlers::media::mirror_media)),
+ )
.service(
web::resource("/media/{media_id}")
.route(web::get().to(handlers::media::get_media_handler)),
)
.service(
web::resource("/media")
- .wrap(auth_middleware)
+ .wrap(auth_middleware.clone())
.route(web::post().to(handlers::media::upload_media_handler)),
)
})
diff --git a/shared/comm-lib/src/blob/types.rs b/shared/comm-lib/src/blob/types.rs
--- a/shared/comm-lib/src/blob/types.rs
+++ b/shared/comm-lib/src/blob/types.rs
@@ -98,6 +98,17 @@
pub metadata: Option<String>,
}
+ #[derive(Debug, serde::Serialize, serde::Deserialize)]
+ pub struct MirroredMediaInfo { // One media item of a mirror request.
+ pub url: String, // Source URL; the mirror handler fetches the media from it.
+ pub original_metadata: String, // Saved as "originalMediaMetadata" in the blob's custom metadata.
+ }
+
+ #[derive(Debug, serde::Serialize, serde::Deserialize)]
+ pub struct MirrorMultimediaRequest { // JSON body accepted by the `mirror_media` handler.
+ pub medias: Vec<MirroredMediaInfo>, // Media items to mirror.
+ }
+
// impls
impl From<Vec<BlobInfo>> for RemoveHoldersRequest {
fn from(requests: Vec<BlobInfo>) -> Self {

File Metadata

Mime Type
text/plain
Expires
Sun, Dec 7, 1:47 AM (12 h, 42 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5842026
Default Alt Text
D15340.1765072077.diff (5 KB)

Event Timeline