diff --git a/services/blob/src/http/context.rs b/services/blob/src/http/context.rs new file mode 100644 index 000000000..190015dc6 --- /dev/null +++ b/services/blob/src/http/context.rs @@ -0,0 +1,74 @@ +use crate::database::ReverseIndexItem; +use crate::database::{BlobItem, DatabaseClient, Error as DBError}; +use crate::s3::{S3Client, S3Path}; +use actix_web::error::{ + ErrorInternalServerError, ErrorNotFound, ErrorServiceUnavailable, +}; +use actix_web::Error as HttpError; +use anyhow::Result; +use aws_sdk_dynamodb::Error as DynamoDBError; +use tracing::{debug, error, warn}; + +/// This structure is passed to every HTTP request handler +/// It should be cloneable because each HTTP worker thread receives a copy +#[derive(Clone)] +pub struct AppContext { + pub db: DatabaseClient, + pub s3: S3Client, +} + +impl AppContext { + #[allow(dead_code)] + pub async fn find_s3_path_by_reverse_index( + &self, + reverse_index_item: &ReverseIndexItem, + ) -> Result { + let blob_hash = &reverse_index_item.blob_hash; + match self.db.find_blob_item(&blob_hash).await { + Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path), + Ok(None) => { + debug!("No blob found for {:?}", reverse_index_item); + Err(ErrorNotFound("blob not found")) + } + Err(err) => Err(handle_db_error(err)), + } + } + + #[allow(dead_code)] + pub async fn find_s3_path_by_holder( + &self, + holder: &str, + ) -> Result { + match self.db.find_reverse_index_by_holder(holder).await { + Ok(Some(reverse_index)) => { + self.find_s3_path_by_reverse_index(&reverse_index).await + } + Ok(None) => { + debug!("No db entry found for holder {:?}", holder); + Err(ErrorNotFound("blob not found")) + } + Err(err) => Err(handle_db_error(err)), + } + } +} + +pub fn handle_db_error(db_error: DBError) -> HttpError { + match db_error { + DBError::AwsSdk(DynamoDBError::InternalServerError(_)) + | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException( + _, + )) + | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => { + warn!("AWS transient error occurred"); + ErrorServiceUnavailable("please retry") + } + DBError::Blob(blob_err) => { + error!("Encountered Blob database error: {}", blob_err); + ErrorInternalServerError("Internal error") + } + err => { + error!("Encountered an unexpected error: {}", err); + ErrorInternalServerError("unexpected error") + } + } +} diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs index 29667cb5c..7c8f1d4d5 100644 --- a/services/blob/src/http/mod.rs +++ b/services/blob/src/http/mod.rs @@ -1,33 +1,40 @@ use crate::config::CONFIG; use crate::database::DatabaseClient; use crate::s3::S3Client; use actix_web::{web, App, HttpServer}; use anyhow::Result; use tracing::info; +mod context; +use context::AppContext; + async fn hello_handler() -> impl actix_web::Responder { "Hello, world!" } -// disable unused warning for now -#[allow(unused)] pub async fn run_http_server( db_client: DatabaseClient, s3_client: S3Client, ) -> Result<()> { info!( "Starting HTTP server listening at port {}", CONFIG.http_port ); HttpServer::new(move || { + // context that is passed to every handler + let ctx = AppContext { + db: db_client.to_owned(), + s3: s3_client.to_owned(), + }; App::new() .wrap(tracing_actix_web::TracingLogger::default()) + .app_data(web::Data::new(ctx)) .service(web::resource("/hello").route(web::get().to(hello_handler))) }) .bind(("0.0.0.0", CONFIG.http_port))? .run() .await?; Ok(()) }