diff --git a/services/blob/src/http/errors.rs b/services/blob/src/http/errors.rs
--- a/services/blob/src/http/errors.rs
+++ b/services/blob/src/http/errors.rs
@@ -76,6 +76,10 @@
       debug!("Received request input error: {0:?} - {0}", err);
       ErrorBadRequest("bad request")
     }
+    BlobServiceError::BlobIsNotMedia => {
+      debug!("Tried to directly access a blob that was not a media");
+      ErrorBadRequest("bad request")
+    }
     BlobServiceError::InviteLinkError(invite_link_error) => {
       match invite_link_error {
         InviteLinkError::Offensive => {
diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs
--- a/services/blob/src/http/handlers/blob.rs
+++ b/services/blob/src/http/handlers/blob.rs
@@ -1,15 +1,13 @@
 use std::collections::HashSet;
 
 use crate::http::errors::handle_blob_service_error;
+use crate::http::utils::parse_range_header;
 use crate::service::BlobService;
 use crate::validate_identifier;
 
-use actix_web::error::{ErrorBadRequest, ErrorRangeNotSatisfiable};
+use actix_web::error::ErrorBadRequest;
 use actix_web::web::Bytes;
-use actix_web::{
-  http::header::{ByteRangeSpec, Range},
-  web, HttpResponse,
-};
+use actix_web::{http::header::Range, web, HttpResponse};
 use async_stream::try_stream;
 use base64::Engine;
 use comm_lib::blob::types::http::{
@@ -20,47 +18,6 @@
 use tracing::{debug, info, instrument, trace, warn};
 use tracing_futures::Instrument;
 
-/// Returns a tuple of first and last byte number (inclusive) represented by given range header.
-fn parse_range_header(
-  range_header: &Option<web::Header<Range>>,
-  file_size: u64,
-) -> actix_web::Result<(u64, u64)> {
-  let (range_start, range_end): (u64, u64) = match range_header {
-    Some(web::Header(Range::Bytes(ranges))) => {
-      if ranges.len() > 1 {
-        return Err(ErrorBadRequest("Multiple ranges not supported"));
-      }
-
-      match ranges[0] {
-        ByteRangeSpec::FromTo(start, end) => {
-          if end >= file_size || start > end {
-            return Err(ErrorRangeNotSatisfiable("Range not satisfiable"));
-          }
-          (start, end)
-        }
-        ByteRangeSpec::From(start) => {
-          if start >= file_size {
-            return Err(ErrorRangeNotSatisfiable("Range not satisfiable"));
-          }
-          (start, file_size - 1)
-        }
-        ByteRangeSpec::Last(length) => {
-          if length >= file_size {
-            return Err(ErrorRangeNotSatisfiable("Range not satisfiable"));
-          }
-          (file_size - length, file_size - 1)
-        }
-      }
-    }
-    Some(web::Header(Range::Unregistered(..))) => {
-      return Err(ErrorBadRequest("Use ranges registered at IANA"));
-    }
-    None => (0, file_size - 1),
-  };
-
-  Ok((range_start, range_end))
-}
-
 #[instrument(
   name = "get_blob",
   skip_all,
diff --git a/services/blob/src/http/handlers/media.rs b/services/blob/src/http/handlers/media.rs
--- a/services/blob/src/http/handlers/media.rs
+++ b/services/blob/src/http/handlers/media.rs
@@ -1,5 +1,6 @@
 use crate::database::types::MediaInfo;
 use crate::http::errors::handle_blob_service_error;
+use crate::http::utils::parse_range_header;
 use crate::service::BlobService;
 
 use actix_web::error::ErrorBadRequest;
@@ -8,10 +9,72 @@
 use comm_lib::blob::types::http::BlobUploadMultimediaResponse;
 use comm_lib::blob::types::BlobInfo;
 use comm_lib::http::multipart;
+use http::uri::Scheme;
 use tokio_stream::StreamExt;
 use tracing::{info, instrument, trace, warn};
 use tracing_futures::Instrument;
 
+#[instrument(
+  name = "get_media",
+  skip_all,
+  fields(media_id = %params.as_ref().as_str(), s3_path))
+]
+pub async fn get_media_handler(
+  service: web::Data<BlobService>,
+  params: web::Path<String>,
+  range_header: Option<web::Header<Range>>,
+) -> actix_web::Result<HttpResponse> {
+  info!("Get media request");
+  let media_id = params.into_inner();
+  validate_media_id(&media_id)?;
+
+  trace!("Initializing download session");
+  let (mut download, media_info) =
+    service.create_media_download(&media_id).await?;
+
+  let total_size = download.blob_size;
+  let (range_start, range_end): (u64, u64) =
+    parse_range_header(&range_header, total_size)?;
+  download.set_byte_range(range_start..=range_end);
+  let content_length = download.download_size();
+
+  let stream = download
+    .into_stream()
+    .map(|data| match data {
+      Ok(bytes) => Ok(web::Bytes::from(bytes)),
+      Err(err) => {
+        warn!("Error during download stream: {:?}", err);
+        Err(handle_blob_service_error(&err))
+      }
+    })
+    .in_current_span();
+
+  let content_type = media_info
+    .content_type
+    .as_deref()
+    .unwrap_or("application/octet-stream");
+
+  if range_header.is_some() {
+    return Ok(
+      HttpResponse::PartialContent()
+        .content_type(content_type)
+        .append_header(("Content-Length", content_length))
+        .append_header((
+          "Content-Range",
+          format!("bytes {}-{}/{}", range_start, range_end, total_size),
+        ))
+        .streaming(Box::pin(stream)),
+    );
+  }
+
+  Ok(
+    HttpResponse::Ok()
+      .content_type(content_type)
+      .append_header(("Content-Length", content_length))
+      .streaming(Box::pin(stream)),
+  )
+}
+
 #[instrument(skip_all, name = "upload_media", fields(blob_hash))]
 pub async fn upload_media_handler(
   service: web::Data<BlobService>,
@@ -88,3 +151,22 @@
   };
   Ok(HttpResponse::Created().json(response))
 }
+
+fn validate_media_id(media_id: &str) -> Result<(), actix_web::Error> {
+  if comm_lib::tools::is_valid_identifier(media_id) {
+    return Ok(());
+  }
+
+  if let Ok(decoded_url) = urlencoding::decode(media_id) {
+    if decoded_url.parse::<http::Uri>().is_ok_and(|uri| {
+      // TODO: maybe additional validation here
+      uri.scheme().is_some_and(|scheme| {
+        *scheme == Scheme::HTTP || *scheme == Scheme::HTTPS
+      })
+    }) {
+      return Ok(());
+    }
+  }
+
+  Err(ErrorBadRequest("bad request"))
+}
diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs
--- a/services/blob/src/http/mod.rs
+++ b/services/blob/src/http/mod.rs
@@ -58,6 +58,10 @@
         .wrap(auth_middleware.clone())
         .route(web::post().to(handlers::metadata::get_blob_sizes)),
     )
+    .service(
+      web::resource("/media/{media_id}")
+        .route(web::get().to(handlers::media::get_media_handler)),
+    )
     .service(
       web::resource("/media")
         .wrap(auth_middleware)
diff --git a/services/blob/src/http/utils.rs b/services/blob/src/http/utils.rs
--- a/services/blob/src/http/utils.rs
+++ b/services/blob/src/http/utils.rs
@@ -1,4 +1,9 @@
 use actix_web::error::ErrorForbidden;
+use actix_web::error::{ErrorBadRequest, ErrorRangeNotSatisfiable};
+use actix_web::{
+  http::header::{ByteRangeSpec, Range},
+  web,
+};
 use comm_lib::auth::AuthorizationCredential;
 
 /// Validates given identifier variable and returns HTTP 400
@@ -29,3 +34,44 @@
     )),
   }
 }
+
+/// Returns a tuple of first and last byte number (inclusive) represented by given range header.
+pub fn parse_range_header(
+  range_header: &Option<web::Header<Range>>,
+  file_size: u64,
+) -> actix_web::Result<(u64, u64)> {
+  let (range_start, range_end): (u64, u64) = match range_header {
+    Some(web::Header(Range::Bytes(ranges))) => {
+      if ranges.len() > 1 {
+        return Err(ErrorBadRequest("Multiple ranges not supported"));
+      }
+
+      match ranges[0] {
+        ByteRangeSpec::FromTo(start, end) => {
+          if end >= file_size || start > end {
+            return Err(ErrorRangeNotSatisfiable("Range not satisfiable"));
+          }
+          (start, end)
+        }
+        ByteRangeSpec::From(start) => {
+          if start >= file_size {
+            return Err(ErrorRangeNotSatisfiable("Range not satisfiable"));
+          }
+          (start, file_size - 1)
+        }
+        ByteRangeSpec::Last(length) => {
+          if length >= file_size {
+            return Err(ErrorRangeNotSatisfiable("Range not satisfiable"));
+          }
+          (file_size - length, file_size - 1)
+        }
+      }
+    }
+    Some(web::Header(Range::Unregistered(..))) => {
+      return Err(ErrorBadRequest("Use ranges registered at IANA"));
+    }
+    None => (0, file_size - 1),
+  };
+
+  Ok((range_start, range_end))
+}
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -44,6 +44,7 @@
 pub enum BlobServiceError {
   BlobNotFound,
   BlobAlreadyExists,
+  BlobIsNotMedia,
   DB(DBError),
   S3(S3Error),
   #[from(ignore)]
@@ -115,7 +116,6 @@
     &self,
     blob_hash: impl Into<String>,
   ) -> BlobServiceResult<BlobDownloadObject> {
-    // 1. Get S3 path
     let s3_path = match self.db.get_blob_item(blob_hash.into()).await {
       Ok(Some(BlobItemRow { s3_path, .. })) => Ok(s3_path),
       Ok(None) => {
@@ -124,13 +124,43 @@
       }
       Err(err) => Err(BlobServiceError::DB(err)),
     }?;
-    debug!("S3 path: {:?}", s3_path);
 
-    // 2. Get S3 object size
+    self.create_download_session(s3_path).await
+  }
+
+  pub async fn create_media_download(
+    &self,
+    media_id: &str,
+  ) -> BlobServiceResult<(BlobDownloadObject, MediaInfo)> {
+    let blob_hash = BlobInfo::from_bytes(media_id.as_bytes()).blob_hash;
+
+    let blob_item = match self.db.get_blob_item(blob_hash).await {
+      Ok(Some(item)) => Ok(item),
+      Ok(None) => {
+        debug!("Blob not found");
+        Err(BlobServiceError::BlobNotFound)
+      }
+      Err(err) => Err(BlobServiceError::DB(err)),
+    }?;
+
+    let Some(media_info) = blob_item.media_info else {
+      debug!("Blob is not media");
+      return Err(BlobServiceError::BlobIsNotMedia);
+    };
+
+    let download_session =
+      self.create_download_session(blob_item.s3_path).await?;
+    Ok((download_session, media_info))
+  }
+
+  async fn create_download_session(
+    &self,
+    s3_path: S3Path,
+  ) -> BlobServiceResult<BlobDownloadObject> {
     let blob_size = self.s3.get_object_size(&s3_path).await?;
+    debug!("S3 path: {:?}", s3_path);
     debug!("S3 object size: {} bytes", blob_size);
 
-    // 3. Create download session
     let session = BlobDownloadObject {
       s3_path,
       blob_size,
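
Not part of the diff: a minimal client-side sketch of how the new unauthenticated `GET /media/{media_id}` route registered in `http/mod.rs` could be exercised, including the `Range` handling implemented by `get_media_handler`. The host, port, media ID, and the `reqwest`/`tokio` client dependencies are assumptions for illustration, not values taken from this change.

```rust
// Hypothetical client for the new GET /media/{media_id} endpoint (sketch only).
// The base URL and media ID below are placeholders; requires reqwest + tokio.
use reqwest::header::{CONTENT_RANGE, RANGE};

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
  let client = reqwest::Client::new();

  // Request the first 1024 bytes. With a Range header present, get_media_handler
  // replies with 206 Partial Content and a Content-Range header.
  let response = client
    .get("http://localhost:8080/media/some-media-id")
    .header(RANGE, "bytes=0-1023")
    .send()
    .await?;

  println!("status: {}", response.status());
  if let Some(content_range) = response.headers().get(CONTENT_RANGE) {
    println!("content-range: {:?}", content_range); // e.g. "bytes 0-1023/<total>"
  }

  // The server streams the body; collect it in memory here for simplicity.
  let body = response.bytes().await?;
  println!("received {} bytes", body.len());
  Ok(())
}
```

A request without a `Range` header takes the plain 200 path instead, returning the stored `Content-Type` (or `application/octet-stream`) and the full `Content-Length`.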