diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs --- a/services/blob/src/http/handlers/blob.rs +++ b/services/blob/src/http/handlers/blob.rs @@ -9,8 +9,12 @@ use super::{handle_db_error, AppContext}; use actix_web::error::{ ErrorBadRequest, ErrorConflict, ErrorInternalServerError, ErrorNotFound, + ErrorRangeNotSatisfiable, +}; +use actix_web::{ + http::header::{ByteRangeSpec, Range}, + web, Error as HttpError, HttpResponse, }; -use actix_web::{web, Error as HttpError, HttpResponse}; use anyhow::Result; use async_stream::{try_stream, AsyncStream}; use serde::{Deserialize, Serialize}; @@ -18,6 +22,47 @@ use tracing::{debug, error, info, instrument, trace, warn}; use tracing_futures::Instrument; +/// Returns a tuple of first and last byte number (inclusive) represented by given range header. +fn parse_range_header( + range_header: &Option>, + file_size: u64, +) -> actix_web::Result<(u64, u64)> { + let (range_start, range_end): (u64, u64) = match range_header { + Some(web::Header(Range::Bytes(ranges))) => { + if ranges.len() > 1 { + return Err(ErrorBadRequest("Multiple ranges not supported")); + } + + match ranges[0] { + ByteRangeSpec::FromTo(start, end) => { + if end >= file_size || start > end { + return Err(ErrorRangeNotSatisfiable("Range not satisfiable")); + } + (start, end) + } + ByteRangeSpec::From(start) => { + if start >= file_size { + return Err(ErrorRangeNotSatisfiable("Range not satisfiable")); + } + (start, file_size - 1) + } + ByteRangeSpec::Last(length) => { + if length >= file_size { + return Err(ErrorRangeNotSatisfiable("Range not satisfiable")); + } + (file_size - length, file_size - 1) + } + } + } + Some(web::Header(Range::Unregistered(..))) => { + return Err(ErrorBadRequest("Use ranges registered at IANA")); + } + None => (0, file_size - 1), + }; + + Ok((range_start, range_end)) +} + #[instrument( name = "get_blob", skip_all, @@ -26,6 +71,7 @@ pub async fn get_blob_handler( ctx: web::Data, params: web::Path, + range_header: Option>, ) -> actix_web::Result { info!("Get blob request"); let blob_hash = params.into_inner(); @@ -57,24 +103,44 @@ let s3 = ctx.s3.clone(); + let (range_start, range_end): (u64, u64) = + parse_range_header(&range_header, file_size)?; + let stream: AsyncStream, _> = try_stream! { - let mut offset: u64 = 0; - while offset < file_size { - let next_size = std::cmp::min(chunk_size, file_size - offset); - let range = offset..(offset + next_size); - trace!(?range, "Getting {} bytes of data", next_size); + debug!(?range_start, ?range_end, "Getting range of data"); + let mut offset: u64 = range_start; - let data = s3.get_object_bytes(&s3_path, range).await.map_err(handle_s3_error)?; - yield web::Bytes::from(data); + while offset < range_end { + let next_size = std::cmp::min(chunk_size, range_end - offset + 1); + let range = offset..(offset + next_size); + trace!(?range, "Getting {} bytes of data", next_size); - offset += chunk_size; - } + let data = s3.get_object_bytes(&s3_path, range).await.map_err(handle_s3_error)?; + yield web::Bytes::from(data); + + offset += chunk_size; + } }; + let content_length = (range_end - range_start + 1).to_string(); + + if range_header.is_some() { + return Ok( + HttpResponse::PartialContent() + .content_type("application/octet-stream") + .append_header(("Content-Length", content_length)) + .append_header(( + "Content-Range", + format!("bytes {}-{}/{}", range_start, range_end, file_size), + )) + .streaming(Box::pin(stream.in_current_span())), + ); + } + Ok( HttpResponse::Ok() .content_type("application/octet-stream") - .append_header(("Content-Length", file_size.to_string())) + .append_header(("Content-Length", content_length)) .streaming(Box::pin(stream.in_current_span())), ) }