diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs --- a/services/blob/src/http/handlers/blob.rs +++ b/services/blob/src/http/handlers/blob.rs @@ -1,11 +1,15 @@ use crate::constants::BLOB_DOWNLOAD_CHUNK_SIZE; +use crate::database::ReverseIndexItem; use super::{handle_db_error, AppContext}; -use actix_web::error::{ErrorInternalServerError, ErrorNotFound}; +use actix_web::error::{ + ErrorConflict, ErrorInternalServerError, ErrorNotFound, +}; use actix_web::{web, Error as HttpError, HttpResponse}; use anyhow::Result; use async_stream::{try_stream, AsyncStream}; -use tracing::{debug, error, info, instrument, trace}; +use serde::{Deserialize, Serialize}; +use tracing::{debug, error, info, instrument, trace, warn}; use tracing_futures::Instrument; #[instrument( @@ -62,6 +66,54 @@ ) } +#[derive(Deserialize, Debug)] +pub struct AssignHolderPayload { + holder: String, + blob_hash: String, +} + +#[derive(Serialize)] +struct AssignHolderResponnse { + data_exists: bool, +} + +#[instrument(name = "assign_holder", skip(ctx))] +pub async fn assign_holder_handler( + ctx: web::Data, + payload: web::Json, +) -> actix_web::Result { + info!("Assign holder request"); + let AssignHolderPayload { holder, blob_hash } = payload.into_inner(); + + if ctx + .db + .find_reverse_index_by_holder(&holder) + .await + .map_err(handle_db_error)? + .is_some() + { + warn!("holder already assigned"); + return Err(ErrorConflict("holder already assigned")); + } + + let data_exists = ctx + .db + .find_blob_item(&blob_hash) + .await + .map_err(handle_db_error)? + .is_some(); + debug!(data_exists, "Checked blob item existence"); + + let reverse_index_item = ReverseIndexItem { holder, blob_hash }; + ctx + .db + .put_reverse_index_item(reverse_index_item) + .await + .map_err(handle_db_error)?; + + Ok(HttpResponse::Ok().json(web::Json(AssignHolderResponnse { data_exists }))) +} + #[instrument( name = "delete_blob", skip_all, diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs --- a/services/blob/src/http/mod.rs +++ b/services/blob/src/http/mod.rs @@ -38,6 +38,10 @@ .route(web::get().to(handlers::blob::get_blob_handler)) .route(web::delete().to(handlers::blob::delete_blob_handler)), ) + .service( + web::resource("/blob") + .route(web::post().to(handlers::blob::assign_holder_handler)), + ) }) .bind(("0.0.0.0", CONFIG.http_port))? .run()