diff --git a/services/blob/src/http/context.rs b/services/blob/src/http/context.rs
index 03efc9620..741dc49d5 100644
--- a/services/blob/src/http/context.rs
+++ b/services/blob/src/http/context.rs
@@ -1,70 +1,133 @@
-use crate::database::errors::Error as DBError;
+use crate::database::errors::{BlobDBError, Error as DBError};
 use crate::database::old::{BlobItem, DatabaseClient, ReverseIndexItem};
 use crate::s3::{Error as S3Error, S3Client, S3Path};
+use crate::service::BlobServiceError;
 use actix_web::error::{
-  ErrorBadRequest, ErrorInternalServerError, ErrorNotFound,
+  ErrorBadRequest, ErrorConflict, ErrorInternalServerError, ErrorNotFound,
   ErrorServiceUnavailable,
 };
-use actix_web::Error as HttpError;
+use actix_web::{Error as HttpError, HttpResponse, ResponseError};
 use anyhow::Result;
 use aws_sdk_dynamodb::Error as DynamoDBError;
-use tracing::{debug, error, warn};
+use http::StatusCode;
+use tracing::{debug, error, trace, warn};
 
 /// This structure is passed to every HTTP request handler
 /// It should be cloneable because each HTTP worker thread receives a copy
 #[derive(Clone)]
 pub struct AppContext {
   pub db: DatabaseClient,
   pub s3: S3Client,
 }
 
 impl AppContext {
   pub async fn find_s3_path_by_reverse_index(
     &self,
     reverse_index_item: &ReverseIndexItem,
   ) -> Result<S3Path, HttpError> {
     let blob_hash = &reverse_index_item.blob_hash;
     match self.db.find_blob_item(&blob_hash).await {
       Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path),
       Ok(None) => {
         debug!("No blob found for {:?}", reverse_index_item);
         Err(ErrorNotFound("blob not found"))
       }
       Err(err) => Err(handle_db_error(err)),
     }
   }
 }
 
 pub fn handle_db_error(db_error: DBError) -> HttpError {
   match db_error {
     DBError::AwsSdk(DynamoDBError::InternalServerError(_))
     | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException(
       _,
     ))
     | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => {
       warn!("AWS transient error occurred");
       ErrorServiceUnavailable("please retry")
     }
     DBError::Blob(blob_err) => {
       error!("Encountered Blob database error: {}", blob_err);
       ErrorInternalServerError("Internal error")
     }
     err => {
       error!("Encountered an unexpected error: {}", err);
       ErrorInternalServerError("unexpected error")
     }
   }
 }
 
 pub fn handle_s3_error(s3_error: S3Error) -> HttpError {
   match s3_error {
     S3Error::EmptyUpload => {
       warn!("Empty upload. Aborting");
       ErrorBadRequest("Empty upload")
     }
     err => {
       error!("Encountered S3 error: {:?}", err);
       ErrorInternalServerError("Internal error")
     }
   }
 }
+
+pub(super) fn handle_blob_service_error(err: &BlobServiceError) -> HttpError {
+  trace!("Handling blob service error: {:?}", err);
+  match err {
+    BlobServiceError::BlobNotFound => ErrorNotFound("not found"),
+    BlobServiceError::BlobAlreadyExists
+    | BlobServiceError::DB(DBError::ItemAlreadyExists) => {
+      ErrorConflict("blob already exists")
+    }
+    BlobServiceError::DB(db_err) => match db_err {
+      DBError::AwsSdk(DynamoDBError::InternalServerError(_))
+      | DBError::AwsSdk(
+        DynamoDBError::ProvisionedThroughputExceededException(_),
+      )
+      | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => {
+        warn!("AWS transient error occurred");
+        ErrorServiceUnavailable("please retry")
+      }
+      DBError::Blob(BlobDBError::InvalidInput(_)) => {
+        ErrorBadRequest("bad request")
+      }
+      unexpected => {
+        error!("Received an unexpected DB error: {0:?} - {0}", unexpected);
+        ErrorInternalServerError("server error")
+      }
+    },
+    BlobServiceError::S3(s3_err) => match s3_err {
+      S3Error::AwsSdk(aws_sdk_s3::Error::NotFound(_))
+      | S3Error::AwsSdk(aws_sdk_s3::Error::NoSuchKey(_)) => {
+        error!("Data inconsistency! Blob is present in database but not present in S3!");
+        ErrorInternalServerError("server error")
+      }
+      S3Error::EmptyUpload => ErrorBadRequest("empty upload"),
+      unexpected => {
+        error!("Received an unexpected S3 error: {0:?} - {0}", unexpected);
+        ErrorInternalServerError("server error")
+      }
+    },
+    BlobServiceError::InputError(err) => {
+      debug!("Received request input error: {0:?} - {0}", err);
+      ErrorBadRequest("bad request")
+    }
+    err => {
+      error!("Received an unexpected error: {0:?} - {0}", err);
+      ErrorInternalServerError("server error")
+    }
+  }
+}
+
+/// This allows us to `await?` blob service calls in HTTP handlers
+impl ResponseError for BlobServiceError {
+  fn error_response(&self) -> HttpResponse {
+    handle_blob_service_error(self).error_response()
+  }
+
+  fn status_code(&self) -> StatusCode {
+    handle_blob_service_error(self)
+      .as_response_error()
+      .status_code()
+  }
+}
diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs
index 663a9ffc5..5688dc66f 100644
--- a/services/blob/src/http/mod.rs
+++ b/services/blob/src/http/mod.rs
@@ -1,71 +1,73 @@
-use crate::config::CONFIG;
 use crate::database::old::DatabaseClient;
 use crate::s3::S3Client;
+use crate::{config::CONFIG, service::BlobService};
 use actix_cors::Cors;
 use actix_web::{web, App, HttpServer};
 use anyhow::Result;
 use tracing::info;
 
 mod context;
 use context::AppContext;
 mod utils;
 
 mod handlers {
   pub(super) mod blob;
 
   // convenience exports to be used in handlers
   use super::context::{handle_db_error, AppContext};
 }
 
 fn cors_config() -> Cors {
   if CONFIG.is_sandbox {
     // All origins, methods, request headers and exposed headers allowed.
     // Credentials supported. Max age 1 hour. Does not send wildcard.
     return Cors::permissive();
   }
 
   Cors::default()
     .allowed_origin("https://web.comm.app")
     // for local development using prod service
     .allowed_origin("http://localhost:3000")
     .allowed_methods(vec!["GET", "POST", "PUT", "DELETE", "OPTIONS"])
     .allow_any_header()
     .expose_any_header()
 }
 
 pub async fn run_http_server(
   db_client: DatabaseClient,
   s3_client: S3Client,
+  blob_service: BlobService,
 ) -> Result<()> {
   info!(
     "Starting HTTP server listening at port {}",
     CONFIG.http_port
   );
   HttpServer::new(move || {
     // context that is passed to every handler
     let ctx = AppContext {
       db: db_client.to_owned(),
       s3: s3_client.to_owned(),
     };
     App::new()
       .wrap(tracing_actix_web::TracingLogger::default())
       .wrap(cors_config())
       .app_data(web::Data::new(ctx))
+      .app_data(web::Data::new(blob_service.to_owned()))
       .service(
         web::resource("/blob/{holder}")
           .route(web::get().to(handlers::blob::get_blob_handler)),
       )
       .service(
         web::resource("/blob")
           .route(web::put().to(handlers::blob::upload_blob_handler))
           .route(web::post().to(handlers::blob::assign_holder_handler))
           .route(web::delete().to(handlers::blob::remove_holder_handler)),
       )
   })
   .bind(("0.0.0.0", CONFIG.http_port))?
   .run()
   .await?;
 
   Ok(())
 }
diff --git a/services/blob/src/main.rs b/services/blob/src/main.rs
index a05d65769..fde55bece 100644
--- a/services/blob/src/main.rs
+++ b/services/blob/src/main.rs
@@ -1,37 +1,49 @@
 pub mod config;
 pub mod constants;
 pub mod database;
 pub mod grpc;
 pub mod http;
 pub mod s3;
 pub mod service;
 pub mod tools;
 
 use anyhow::Result;
 use tracing_subscriber::filter::{EnvFilter, LevelFilter};
 
+use crate::service::BlobServiceConfig;
+
 fn configure_logging() -> Result<()> {
   let filter = EnvFilter::builder()
     .with_default_directive(LevelFilter::INFO.into())
     .with_env_var(constants::LOG_LEVEL_ENV_VAR)
     .from_env_lossy();
 
   let subscriber = tracing_subscriber::fmt().with_env_filter(filter).finish();
   tracing::subscriber::set_global_default(subscriber)?;
   Ok(())
 }
 
 #[tokio::main]
 async fn main() -> Result<()> {
   configure_logging()?;
   config::parse_cmdline_args()?;
 
   let aws_config = config::load_aws_config().await;
   let db = database::old::DatabaseClient::new(&aws_config);
   let s3 = s3::S3Client::new(&aws_config);
 
+  let new_db = database::DatabaseClient::new(&aws_config);
+  let service = service::BlobService::new(
+    new_db,
+    s3.clone(),
+    BlobServiceConfig {
+      instant_delete_orphaned_blobs: true,
+      ..Default::default()
+    },
+  );
+
   tokio::select! {
-    http_result = crate::http::run_http_server(db.clone(), s3.clone()) => http_result,
+    http_result = crate::http::run_http_server(db.clone(), s3.clone(), service) => http_result,
     grpc_result = crate::grpc::run_grpc_server(db, s3) => grpc_result,
   }
 }
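
Reviewer sketch (not part of the patch): the `ResponseError` impl above lets
actix-web handlers return `Result<_, BlobServiceError>` and propagate service
errors with `?`; the status code and body come from handle_blob_service_error().
A minimal hypothetical handler, where the `find_blob` method and its return
type are assumptions for illustration, not APIs introduced by this patch:

    use actix_web::{web, HttpResponse};
    use crate::service::{BlobService, BlobServiceError};

    async fn example_get_handler(
      service: web::Data<BlobService>,
      holder: web::Path<String>,
    ) -> Result<HttpResponse, BlobServiceError> {
      // `?` compiles because BlobServiceError implements ResponseError;
      // on error, actix-web builds the response via error_response()
      let data = service.find_blob(holder.as_str()).await?;
      Ok(HttpResponse::Ok().body(data))
    }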