diff --git a/services/blob/src/http/handlers/holders.rs b/services/blob/src/http/handlers/holders.rs --- a/services/blob/src/http/handlers/holders.rs +++ b/services/blob/src/http/handlers/holders.rs @@ -12,6 +12,79 @@ holder: String, } +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct AssignHoldersPayload { + requests: Vec, +} + +#[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] +struct HolderAssignmentResult { + #[serde(flatten)] + request: BlobHashAndHolder, + success: bool, + data_exists: bool, + holder_already_exists: bool, +} +#[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] +struct AssignHoldersResponse { + results: Vec, +} + +#[instrument(name = "assign_multiple_holders", skip_all)] +pub async fn assign_holders_handler( + service: web::Data, + payload: web::Json, +) -> actix_web::Result { + use crate::database::DBError; + use crate::service::BlobServiceError; + + let AssignHoldersPayload { requests } = payload.into_inner(); + info!("Assign holder request for {} holders", requests.len()); + validate_request(&requests)?; + + let mut results = Vec::with_capacity(requests.len()); + + for item in requests { + let BlobHashAndHolder { blob_hash, holder } = &item; + let result = match service.assign_holder(blob_hash, holder).await { + Ok(data_exists) => HolderAssignmentResult { + request: item, + success: true, + data_exists, + holder_already_exists: false, + }, + Err(BlobServiceError::DB(DBError::ItemAlreadyExists)) => { + let data_exists = + service.blob_hash_exists(blob_hash).await.unwrap_or(false); + HolderAssignmentResult { + request: item, + success: true, + data_exists, + holder_already_exists: true, + } + } + Err(err) => { + warn!("Holder assignment error: {:?}", err); + let data_exists = + service.blob_hash_exists(blob_hash).await.unwrap_or(false); + HolderAssignmentResult { + request: item, + success: false, + data_exists, + holder_already_exists: false, + } + } + }; + results.push(result); + } + + let response = AssignHoldersResponse { results }; + Ok(HttpResponse::Ok().json(web::Json(response))) +} + #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct RemoveHoldersPayload { diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs --- a/services/blob/src/http/mod.rs +++ b/services/blob/src/http/mod.rs @@ -48,6 +48,7 @@ .service( web::resource("/holders") .wrap(auth_middleware) + .route(web::post().to(handlers::holders::assign_holders_handler)) .route(web::delete().to(handlers::holders::remove_holders_handler)), ) }) diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs --- a/services/blob/src/service.rs +++ b/services/blob/src/service.rs @@ -282,6 +282,19 @@ Ok(()) } + pub async fn blob_hash_exists( + &self, + blob_hash: impl Into, + ) -> BlobServiceResult { + match self.db.get_blob_item(blob_hash).await { + Ok(item) => Ok(item.is_some()), + Err(err) => { + warn!("Failed to check if blob exists: {err:?}"); + Err(err.into()) + } + } + } + pub async fn perform_cleanup(&self) -> anyhow::Result<()> { info!("Starting cleanup..."); // 1. Fetch blobs and holders marked as "unchecked"