diff --git a/services/blob/src/http/context.rs b/services/blob/src/http/context.rs --- a/services/blob/src/http/context.rs +++ b/services/blob/src/http/context.rs @@ -1,14 +1,16 @@ -use crate::database::errors::Error as DBError; +use crate::database::errors::{BlobDBError, Error as DBError}; use crate::database::old::{BlobItem, DatabaseClient, ReverseIndexItem}; use crate::s3::{Error as S3Error, S3Client, S3Path}; +use crate::service::BlobServiceError; use actix_web::error::{ - ErrorBadRequest, ErrorInternalServerError, ErrorNotFound, + ErrorBadRequest, ErrorConflict, ErrorInternalServerError, ErrorNotFound, ErrorServiceUnavailable, }; -use actix_web::Error as HttpError; +use actix_web::{Error as HttpError, HttpResponse, ResponseError}; use anyhow::Result; use aws_sdk_dynamodb::Error as DynamoDBError; -use tracing::{debug, error, warn}; +use http::StatusCode; +use tracing::{debug, error, trace, warn}; /// This structure is passed to every HTTP request handler /// It should be cloneable because each HTTP worker thread receives a copy @@ -68,3 +70,64 @@ } } } + +pub(super) fn handle_blob_service_error(err: &BlobServiceError) -> HttpError { + trace!("Handling blob service error: {:?}", err); + match err { + BlobServiceError::BlobNotFound => ErrorNotFound("not found"), + BlobServiceError::BlobAlreadyExists + | BlobServiceError::DB(DBError::ItemAlreadyExists) => { + ErrorConflict("blob already exists") + } + BlobServiceError::DB(db_err) => match db_err { + DBError::AwsSdk(DynamoDBError::InternalServerError(_)) + | DBError::AwsSdk( + DynamoDBError::ProvisionedThroughputExceededException(_), + ) + | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => { + warn!("AWS transient error occurred"); + ErrorServiceUnavailable("please retry") + } + DBError::Blob(BlobDBError::InvalidInput(_)) => { + ErrorBadRequest("bad request") + } + unexpected => { + error!("Received an unexpected DB error: {0:?} - {0}", unexpected); + ErrorInternalServerError("server error") + } + }, + 
BlobServiceError::S3(s3_err) => match s3_err { + S3Error::AwsSdk(aws_sdk_s3::Error::NotFound(_)) + | S3Error::AwsSdk(aws_sdk_s3::Error::NoSuchKey(_)) => { + error!("Data inconsistency! Blob is present in database but not present in S3!"); + ErrorInternalServerError("server error") + } + S3Error::EmptyUpload => ErrorBadRequest("empty upload"), + unexpected => { + error!("Received an unexpected S3 error: {0:?} - {0}", unexpected); + ErrorInternalServerError("server error") + } + }, + BlobServiceError::InputError(err) => { + debug!("Received request input error: {0:?} - {0}", err); + ErrorBadRequest("bad request") + } + err => { + error!("Received an unexpected error: {0:?} - {0}", err); + ErrorInternalServerError("server error") + } + } +} + +/// This allows us to `await?` blob service calls in HTTP handlers +impl ResponseError for BlobServiceError { + fn error_response(&self) -> HttpResponse { + handle_blob_service_error(self).error_response() + } + + fn status_code(&self) -> StatusCode { + handle_blob_service_error(self) + .as_response_error() + .status_code() + } +} diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs --- a/services/blob/src/http/mod.rs +++ b/services/blob/src/http/mod.rs @@ -1,6 +1,6 @@ -use crate::config::CONFIG; use crate::database::old::DatabaseClient; use crate::s3::S3Client; +use crate::{config::CONFIG, service::BlobService}; use actix_cors::Cors; use actix_web::{web, App, HttpServer}; @@ -37,6 +37,7 @@ pub async fn run_http_server( db_client: DatabaseClient, s3_client: S3Client, + blob_service: BlobService, ) -> Result<()> { info!( "Starting HTTP server listening at port {}", @@ -52,6 +53,7 @@ .wrap(tracing_actix_web::TracingLogger::default()) .wrap(cors_config()) .app_data(web::Data::new(ctx)) + .app_data(web::Data::new(blob_service.to_owned())) .service( web::resource("/blob/{holder}") .route(web::get().to(handlers::blob::get_blob_handler)), diff --git a/services/blob/src/main.rs b/services/blob/src/main.rs --- 
a/services/blob/src/main.rs +++ b/services/blob/src/main.rs @@ -10,6 +10,8 @@ use anyhow::Result; use tracing_subscriber::filter::{EnvFilter, LevelFilter}; +use crate::service::BlobServiceConfig; + fn configure_logging() -> Result<()> { let filter = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) @@ -30,8 +32,18 @@ let db = database::old::DatabaseClient::new(&aws_config); let s3 = s3::S3Client::new(&aws_config); + let new_db = database::DatabaseClient::new(&aws_config); + let service = service::BlobService::new( + new_db, + s3.clone(), + BlobServiceConfig { + instant_delete_orphaned_blobs: true, + ..Default::default() + }, + ); + tokio::select! { - http_result = crate::http::run_http_server(db.clone(), s3.clone()) => http_result, + http_result = crate::http::run_http_server(db.clone(), s3.clone(), service) => http_result, grpc_result = crate::grpc::run_grpc_server(db, s3) => grpc_result, } }