diff --git a/services/blob/src/http/handlers/holders.rs b/services/blob/src/http/handlers/holders.rs new file mode 100644 index 000000000..2af5404dc --- /dev/null +++ b/services/blob/src/http/handlers/holders.rs @@ -0,0 +1,82 @@ +use actix_web::error::ErrorBadRequest; +use actix_web::{web, HttpResponse}; +use serde::{Deserialize, Serialize}; +use tracing::{info, instrument, trace, warn}; + +use crate::service::BlobService; + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct BlobHashAndHolder { + blob_hash: String, + holder: String, +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct RemoveHoldersPayload { + requests: Vec, + #[serde(default)] + instant_delete: bool, +} + +#[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct RemoveHoldersResponse { + failed_requests: Vec, +} + +#[instrument(name = "remove_multiple_holders", skip(service))] +pub async fn remove_holders_handler( + service: web::Data, + payload: web::Json, +) -> actix_web::Result { + let RemoveHoldersPayload { + requests, + instant_delete, + } = payload.into_inner(); + info!( + instant_delete, + "Remove request for {} holders.", + requests.len() + ); + validate_request(&requests)?; + + let mut failed_requests = Vec::new(); + // This has to be done sequentially because `service.revoke_holder()` + // performs a DDB transaction and these transactions could conflict + // with each other, e.g. if two holders were removed for the same blob hash. + for item in requests { + trace!("Removing item: {:?}", &item); + + let BlobHashAndHolder { holder, blob_hash } = &item; + if let Err(err) = service + .revoke_holder(blob_hash, holder, instant_delete) + .await + { + warn!("Holder removal failed: {:?}", err); + failed_requests.push(item); + } + } + let response = RemoveHoldersResponse { failed_requests }; + Ok(HttpResponse::Ok().json(web::Json(response))) +} + +/** + * Returns `HTTP 400 Bad Request` if one or more blob hashes or holders + * have invalid format. See [`comm_lib::tools::is_valid_identifier`] for + * valid format conditions + */ +fn validate_request(items: &[BlobHashAndHolder]) -> actix_web::Result<()> { + use comm_lib::tools::is_valid_identifier; + let all_valid = + items.iter().all(|BlobHashAndHolder { holder, blob_hash }| { + is_valid_identifier(holder) && is_valid_identifier(blob_hash) + }); + + if !all_valid { + return Err(ErrorBadRequest("One or more requests have invalid format")); + } + + Ok(()) +} diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs index 130dadd9c..55e57fd49 100644 --- a/services/blob/src/http/mod.rs +++ b/services/blob/src/http/mod.rs @@ -1,53 +1,59 @@ use crate::{config::CONFIG, service::BlobService}; use actix_web::{web, App, HttpResponse, HttpServer}; use anyhow::Result; use comm_lib::{ auth::AuthService, http::auth::get_comm_authentication_middleware, }; use tracing::info; mod errors; mod utils; mod handlers { pub(super) mod blob; + pub(super) mod holders; } pub async fn run_http_server( blob_service: BlobService, auth_service: AuthService, ) -> Result<()> { info!( "Starting HTTP server listening at port {}", CONFIG.http_port ); HttpServer::new(move || { let auth_middleware = get_comm_authentication_middleware(); App::new() .wrap(tracing_actix_web::TracingLogger::default()) .wrap(comm_lib::http::cors_config( CONFIG.localstack_endpoint.is_some(), )) .app_data(auth_service.to_owned()) .app_data(web::Data::new(blob_service.to_owned())) .route("/health", web::get().to(HttpResponse::Ok)) .service( web::resource("/blob/{holder}") .wrap(auth_middleware.clone()) .route(web::get().to(handlers::blob::get_blob_handler)), ) .service( web::resource("/blob") - .wrap(auth_middleware) + .wrap(auth_middleware.clone()) .route(web::put().to(handlers::blob::upload_blob_handler)) .route(web::post().to(handlers::blob::assign_holder_handler)) .route(web::delete().to(handlers::blob::remove_holder_handler)), ) + .service( + web::resource("/holders") + .wrap(auth_middleware) + .route(web::delete().to(handlers::holders::remove_holders_handler)), + ) }) .bind(("0.0.0.0", CONFIG.http_port))? .run() .await?; Ok(()) }