diff --git a/services/blob/src/constants.rs b/services/blob/src/constants.rs
--- a/services/blob/src/constants.rs
+++ b/services/blob/src/constants.rs
@@ -25,6 +25,10 @@
 /// See [`GRPC_CHUNK_SIZE_LIMIT`] description for details
 pub const GRPC_METADATA_SIZE_PER_MESSAGE: u64 = 5;
 
+// HTTP constants
+
+pub const BLOB_DOWNLOAD_CHUNK_SIZE: u64 = 5 * 1024 * 1024;
+
 // DynamoDB constants
 
 pub const BLOB_TABLE_NAME: &str = "blob-service-blob";
diff --git a/services/blob/src/http/context.rs b/services/blob/src/http/context.rs
--- a/services/blob/src/http/context.rs
+++ b/services/blob/src/http/context.rs
@@ -18,7 +18,6 @@
 }
 
 impl AppContext {
-  #[allow(dead_code)]
   pub async fn find_s3_path_by_reverse_index(
     &self,
     reverse_index_item: &ReverseIndexItem,
@@ -34,7 +33,6 @@
     }
   }
 
-  #[allow(dead_code)]
   pub async fn find_s3_path_by_holder(
     &self,
     holder: &str,
diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs
new file mode 100644
--- /dev/null
+++ b/services/blob/src/http/handlers/blob.rs
@@ -0,0 +1,63 @@
+use crate::constants::BLOB_DOWNLOAD_CHUNK_SIZE;
+
+use super::AppContext;
+use actix_web::error::ErrorInternalServerError;
+use actix_web::{web, Error as HttpError, HttpResponse};
+use anyhow::Result;
+use async_stream::{try_stream, AsyncStream};
+use tracing::{error, info, instrument, trace};
+use tracing_futures::Instrument;
+
+#[instrument(
+  name = "get_blob",
+  skip_all,
+  fields(holder = %params.as_ref().as_str(), s3_path))
+]
+pub async fn get_blob_handler(
+  ctx: web::Data<AppContext>,
+  params: web::Path<String>,
+) -> actix_web::Result<HttpResponse> {
+  info!("Get blob request");
+  let holder = params.into_inner();
+  let s3_path = ctx.find_s3_path_by_holder(&holder).await?;
+  tracing::Span::current().record("s3_path", s3_path.to_full_path());
+
+  let object_metadata =
+    ctx.s3.get_object_metadata(&s3_path).await.map_err(|err| {
+      error!("Failed to get S3 object metadata: {:?}", err);
+      ErrorInternalServerError("server error")
+    })?;
+  let file_size: u64 =
+    object_metadata.content_length().try_into().map_err(|err| {
+      error!("Failed to get S3 object content length: {:?}", err);
+      ErrorInternalServerError("server error")
+    })?;
+
+  // Stream the data in chunks to avoid loading the whole file into memory.
+  let chunk_size: u64 = BLOB_DOWNLOAD_CHUNK_SIZE;
+
+  let s3 = ctx.s3.clone();
+
+  let stream: AsyncStream<Result<web::Bytes, HttpError>, _> = try_stream! {
+    let mut offset: u64 = 0;
+    while offset < file_size {
+      let next_size = std::cmp::min(chunk_size, file_size - offset);
+      let range = offset..(offset + next_size);
+      trace!(?range, "Getting {} bytes of data", next_size);
+
+      let data = s3.get_object_bytes(&s3_path, range).await.map_err(|err| {
+        error!("Failed to download data chunk: {:?}", err);
+        ErrorInternalServerError("download failed")
+      })?;
+      yield web::Bytes::from(data);
+
+      offset += chunk_size;
+    }
+  };
+
+  Ok(
+    HttpResponse::Ok()
+      .content_type("application/octet-stream")
+      .streaming(Box::pin(stream.in_current_span())),
+  )
+}
diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs
--- a/services/blob/src/http/mod.rs
+++ b/services/blob/src/http/mod.rs
@@ -9,8 +9,12 @@
 mod context;
 use context::AppContext;
 
-async fn hello_handler() -> impl actix_web::Responder {
-  "Hello, world!"
+mod handlers {
+  pub(super) mod blob;
+
+  // convenience exports to be used in handlers
+  #[allow(unused)]
+  use super::context::{handle_db_error, AppContext};
 }
 
 pub async fn run_http_server(
@@ -30,7 +34,10 @@
     App::new()
       .wrap(tracing_actix_web::TracingLogger::default())
       .app_data(web::Data::new(ctx))
-      .service(web::resource("/hello").route(web::get().to(hello_handler)))
+      .service(
+        web::resource("/blob/{holder}")
+          .route(web::get().to(handlers::blob::get_blob_handler)),
+      )
   })
   .bind(("0.0.0.0", CONFIG.http_port))?
   .run()