diff --git a/services/blob/src/constants.rs b/services/blob/src/constants.rs
index 53f44f47a..364bcbaf8 100644
--- a/services/blob/src/constants.rs
+++ b/services/blob/src/constants.rs
@@ -1,49 +1,53 @@
 // Assorted constants
 
 pub const DEFAULT_GRPC_PORT: u16 = 50051;
 pub const DEFAULT_HTTP_PORT: u16 = 51001;
 pub const AWS_REGION: &str = "us-east-2";
 pub const LOCALSTACK_URL: &str = "http://localstack:4566";
 pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1;
 
 /// 4MB limit
 ///
 /// WARNING: use keeping in mind that grpc adds its own headers to messages
 /// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
 /// so the message that actually is being sent over the network looks like this
 /// ```
 /// [Compressed-Flag] [Message-Length] [Message]
 /// [Compressed-Flag] 1 byte - added by grpc
 /// [Message-Length] 4 bytes - added by grpc
 /// [Message] N bytes - actual data
 /// ```
 /// so for every message we get 5 additional bytes of data
 /// as [mentioned here](https://github.com/grpc/grpc/issues/15734#issuecomment-396962671),
 /// gRPC stream may contain more than one message
 pub const GRPC_CHUNK_SIZE_LIMIT: u64 = 4 * 1024 * 1024;
 
 /// See [`GRPC_CHUNK_SIZE_LIMIT`] description for details
 pub const GRPC_METADATA_SIZE_PER_MESSAGE: u64 = 5;
 
+// HTTP constants
+
+pub const BLOB_DOWNLOAD_CHUNK_SIZE: u64 = 5 * 1024 * 1024;
+
 // DynamoDB constants
 
 pub const BLOB_TABLE_NAME: &str = "blob-service-blob";
 pub const BLOB_TABLE_BLOB_HASH_FIELD: &str = "blobHash";
 pub const BLOB_TABLE_S3_PATH_FIELD: &str = "s3Path";
 pub const BLOB_TABLE_CREATED_FIELD: &str = "created";
 
 pub const BLOB_REVERSE_INDEX_TABLE_NAME: &str = "blob-service-reverse-index";
 pub const BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD: &str = "holder";
 pub const BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD: &str = "blobHash";
 pub const BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME: &str = "blobHash-index";
 
 // Environment variables
 
 pub const SANDBOX_ENV_VAR: &str = "COMM_SERVICES_SANDBOX";
 pub const LOG_LEVEL_ENV_VAR: &str =
   tracing_subscriber::filter::EnvFilter::DEFAULT_ENV;
 
 // S3 constants
 
 pub const BLOB_S3_BUCKET_NAME: &str = "commapp-blob";
 pub const S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE: u64 = 5 * 1024 * 1024;
diff --git a/services/blob/src/http/context.rs b/services/blob/src/http/context.rs
index 190015dc6..a6534fdcb 100644
--- a/services/blob/src/http/context.rs
+++ b/services/blob/src/http/context.rs
@@ -1,74 +1,72 @@
 use crate::database::ReverseIndexItem;
 use crate::database::{BlobItem, DatabaseClient, Error as DBError};
 use crate::s3::{S3Client, S3Path};
 use actix_web::error::{
   ErrorInternalServerError, ErrorNotFound, ErrorServiceUnavailable,
 };
 use actix_web::Error as HttpError;
 use anyhow::Result;
 use aws_sdk_dynamodb::Error as DynamoDBError;
 use tracing::{debug, error, warn};
 
 /// This structure is passed to every HTTP request handler
 /// It should be cloneable because each HTTP worker thread receives a copy
 #[derive(Clone)]
 pub struct AppContext {
   pub db: DatabaseClient,
   pub s3: S3Client,
 }
 
 impl AppContext {
-  #[allow(dead_code)]
   pub async fn find_s3_path_by_reverse_index(
     &self,
     reverse_index_item: &ReverseIndexItem,
   ) -> Result<S3Path, HttpError> {
     let blob_hash = &reverse_index_item.blob_hash;
     match self.db.find_blob_item(&blob_hash).await {
       Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path),
       Ok(None) => {
         debug!("No blob found for {:?}", reverse_index_item);
         Err(ErrorNotFound("blob not found"))
       }
       Err(err) => Err(handle_db_error(err)),
     }
   }
 
-  #[allow(dead_code)]
   pub async fn find_s3_path_by_holder(
     &self,
     holder: &str,
   ) -> Result<S3Path, HttpError> {
     match self.db.find_reverse_index_by_holder(holder).await {
       Ok(Some(reverse_index)) => {
         self.find_s3_path_by_reverse_index(&reverse_index).await
       }
       Ok(None) => {
         debug!("No db entry found for holder {:?}", holder);
         Err(ErrorNotFound("blob not found"))
       }
       Err(err) => Err(handle_db_error(err)),
     }
   }
 }
 
 pub fn handle_db_error(db_error: DBError) -> HttpError {
   match db_error {
     DBError::AwsSdk(DynamoDBError::InternalServerError(_))
     | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException(
       _,
     ))
     | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => {
       warn!("AWS transient error occurred");
       ErrorServiceUnavailable("please retry")
     }
     DBError::Blob(blob_err) => {
       error!("Encountered Blob database error: {}", blob_err);
       ErrorInternalServerError("Internal error")
     }
     err => {
       error!("Encountered an unexpected error: {}", err);
       ErrorInternalServerError("unexpected error")
     }
   }
 }
diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs
new file mode 100644
index 000000000..58a2f46b4
--- /dev/null
+++ b/services/blob/src/http/handlers/blob.rs
@@ -0,0 +1,63 @@
+use crate::constants::BLOB_DOWNLOAD_CHUNK_SIZE;
+
+use super::AppContext;
+use actix_web::error::ErrorInternalServerError;
+use actix_web::{web, Error as HttpError, HttpResponse};
+use anyhow::Result;
+use async_stream::{try_stream, AsyncStream};
+use tracing::{error, info, instrument, trace};
+use tracing_futures::Instrument;
+
+#[instrument(
+  name = "get_blob",
+  skip_all,
+  fields(holder = %params.as_ref().as_str(), s3_path))
+]
+pub async fn get_blob_handler(
+  ctx: web::Data<AppContext>,
+  params: web::Path<String>,
+) -> actix_web::Result<HttpResponse> {
+  info!("Get blob request");
+  let holder = params.into_inner();
+  let s3_path = ctx.find_s3_path_by_holder(&holder).await?;
+  tracing::Span::current().record("s3_path", s3_path.to_full_path());
+
+  let object_metadata =
+    ctx.s3.get_object_metadata(&s3_path).await.map_err(|err| {
+      error!("Failed to get S3 object metadata: {:?}", err);
+      ErrorInternalServerError("server error")
+    })?;
+  let file_size: u64 =
+    object_metadata.content_length().try_into().map_err(|err| {
+      error!("Failed to get S3 object content length: {:?}", err);
+      ErrorInternalServerError("server error")
+    })?;
+
+  // Stream the data in chunks to avoid loading the whole file into memory.
+  let chunk_size: u64 = BLOB_DOWNLOAD_CHUNK_SIZE;
+
+  let s3 = ctx.s3.clone();
+
+  let stream: AsyncStream<Result<web::Bytes, HttpError>, _> = try_stream! {
+    let mut offset: u64 = 0;
+    while offset < file_size {
+      let next_size = std::cmp::min(chunk_size, file_size - offset);
+      let range = offset..(offset + next_size);
+      trace!(?range, "Getting {} bytes of data", next_size);
+
+      let data = s3.get_object_bytes(&s3_path, range).await.map_err(|err| {
+        error!("Failed to download data chunk: {:?}", err);
+        ErrorInternalServerError("download failed")
+      })?;
+      yield web::Bytes::from(data);
+
+      offset += chunk_size;
+    }
+  };
+
+  Ok(
+    HttpResponse::Ok()
+      .content_type("application/octet-stream")
+      .streaming(Box::pin(stream.in_current_span())),
+  )
+}
diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs
index 7c8f1d4d5..f135936bb 100644
--- a/services/blob/src/http/mod.rs
+++ b/services/blob/src/http/mod.rs
@@ -1,40 +1,47 @@
 use crate::config::CONFIG;
 use crate::database::DatabaseClient;
 use crate::s3::S3Client;
 use actix_web::{web, App, HttpServer};
 use anyhow::Result;
 use tracing::info;
 
 mod context;
 use context::AppContext;
 
-async fn hello_handler() -> impl actix_web::Responder {
-  "Hello, world!"
+mod handlers {
+  pub(super) mod blob;
+
+  // convenience exports to be used in handlers
+  #[allow(unused)]
+  use super::context::{handle_db_error, AppContext};
 }
 
 pub async fn run_http_server(
   db_client: DatabaseClient,
   s3_client: S3Client,
 ) -> Result<()> {
   info!(
     "Starting HTTP server listening at port {}",
     CONFIG.http_port
   );
 
   HttpServer::new(move || {
     // context that is passed to every handler
     let ctx = AppContext {
       db: db_client.to_owned(),
       s3: s3_client.to_owned(),
     };
     App::new()
       .wrap(tracing_actix_web::TracingLogger::default())
       .app_data(web::Data::new(ctx))
-      .service(web::resource("/hello").route(web::get().to(hello_handler)))
+      .service(
+        web::resource("/blob/{holder}")
+          .route(web::get().to(handlers::blob::get_blob_handler)),
+      )
   })
   .bind(("0.0.0.0", CONFIG.http_port))?
   .run()
   .await?;
 
   Ok(())
 }
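
Not part of the diff above, just for orientation: the sketch below shows how the new `GET /blob/{holder}` route could be consumed. It is a minimal, hypothetical client assuming the `reqwest` and `tokio` crates, a service running locally on the default HTTP port (51001), and a placeholder holder value; the streamed `application/octet-stream` body is written to disk chunk by chunk rather than buffered whole.

```rust
// Hypothetical client-side sketch (not part of the diff): stream-download a
// blob from the new `GET /blob/{holder}` route into a local file.
use std::io::Write;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
  // Placeholder values; adjust to the actual deployment.
  let holder = "example-holder";
  let url = format!("http://localhost:51001/blob/{}", holder);

  // `error_for_status` turns 404 ("blob not found") and 5xx responses into
  // errors instead of silently writing an error body to disk.
  let mut response = reqwest::get(&url).await?.error_for_status()?;
  let mut file = std::fs::File::create("blob.bin")?;

  // `chunk()` yields successive pieces of the response body until it ends,
  // so the whole blob is never held in memory at once.
  while let Some(bytes) = response.chunk().await? {
    file.write_all(&bytes)?;
  }
  Ok(())
}
```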