diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs index b188bd4a5..95f4a7a36 100644 --- a/services/blob/src/http/handlers/blob.rs +++ b/services/blob/src/http/handlers/blob.rs @@ -1,123 +1,175 @@ use crate::constants::BLOB_DOWNLOAD_CHUNK_SIZE; +use crate::database::ReverseIndexItem; use super::{handle_db_error, AppContext}; -use actix_web::error::{ErrorInternalServerError, ErrorNotFound}; +use actix_web::error::{ + ErrorConflict, ErrorInternalServerError, ErrorNotFound, +}; use actix_web::{web, Error as HttpError, HttpResponse}; use anyhow::Result; use async_stream::{try_stream, AsyncStream}; -use tracing::{debug, error, info, instrument, trace}; +use serde::{Deserialize, Serialize}; +use tracing::{debug, error, info, instrument, trace, warn}; use tracing_futures::Instrument; #[instrument( name = "get_blob", skip_all, fields(holder = %params.as_ref().as_str(), s3_path)) ] pub async fn get_blob_handler( ctx: web::Data<AppContext>, params: web::Path<String>, ) -> actix_web::Result<HttpResponse> { info!("Get blob request"); let holder = params.into_inner(); let s3_path = ctx.find_s3_path_by_holder(&holder).await?; tracing::Span::current().record("s3_path", s3_path.to_full_path()); let object_metadata = ctx.s3.get_object_metadata(&s3_path).await.map_err(|err| { error!("Failed to get S3 object metadata: {:?}", err); ErrorInternalServerError("server error") })?; let file_size: u64 = object_metadata.content_length().try_into().map_err(|err| { error!("Failed to get S3 object content length: {:?}", err); ErrorInternalServerError("server error") })?; // Stream the data in chunks to avoid loading the whole file into memory. let chunk_size: u64 = BLOB_DOWNLOAD_CHUNK_SIZE; let s3 = ctx.s3.clone(); let stream: AsyncStream<Result<web::Bytes, HttpError>, _> = try_stream! 
{ let mut offset: u64 = 0; while offset < file_size { let next_size = std::cmp::min(chunk_size, file_size - offset); let range = offset..(offset + next_size); trace!(?range, "Getting {} bytes of data", next_size); let data = s3.get_object_bytes(&s3_path, range).await.map_err(|err| { error!("Failed to download data chunk: {:?}", err); ErrorInternalServerError("download failed") })?; yield web::Bytes::from(data); offset += chunk_size; } }; Ok( HttpResponse::Ok() .content_type("application/octet-stream") .streaming(Box::pin(stream.in_current_span())), ) } +#[derive(Deserialize, Debug)] +pub struct AssignHolderPayload { + holder: String, + blob_hash: String, +} + +#[derive(Serialize)] +struct AssignHolderResponse { + data_exists: bool, +} + +#[instrument(name = "assign_holder", skip(ctx))] +pub async fn assign_holder_handler( + ctx: web::Data<AppContext>, + payload: web::Json<AssignHolderPayload>, +) -> actix_web::Result<HttpResponse> { + info!("Assign holder request"); + let AssignHolderPayload { holder, blob_hash } = payload.into_inner(); + + if ctx + .db + .find_reverse_index_by_holder(&holder) + .await + .map_err(handle_db_error)? + .is_some() + { + warn!("holder already assigned"); + return Err(ErrorConflict("holder already assigned")); + } + + let data_exists = ctx + .db + .find_blob_item(&blob_hash) + .await + .map_err(handle_db_error)? + .is_some(); + debug!(data_exists, "Checked blob item existence"); + + let reverse_index_item = ReverseIndexItem { holder, blob_hash }; + ctx + .db + .put_reverse_index_item(reverse_index_item) + .await + .map_err(handle_db_error)?; + + Ok(HttpResponse::Ok().json(AssignHolderResponse { data_exists })) +} + #[instrument( name = "delete_blob", skip_all, fields(holder = %params.as_ref().as_str())) ] pub async fn delete_blob_handler( ctx: web::Data<AppContext>, params: web::Path<String>, ) -> actix_web::Result<HttpResponse> { info!("Delete blob request"); let holder = params.into_inner(); let reverse_index_item = ctx .db .find_reverse_index_by_holder(&holder) .await .map_err(handle_db_error)? 
.ok_or_else(|| { debug!("Blob not found"); ErrorNotFound("Blob not found") })?; let blob_hash = &reverse_index_item.blob_hash; ctx .db .remove_reverse_index_item(&holder) .await .map_err(handle_db_error)?; // TODO: handle cleanup here properly // for now the object's being removed right away // after the last holder was removed if ctx .db .find_reverse_index_by_hash(blob_hash) .await .map_err(handle_db_error)? .is_empty() { let s3_path = ctx .find_s3_path_by_reverse_index(&reverse_index_item) .await?; debug!("Last holder removed. Deleting S3 object: {:?}", &s3_path); ctx.s3.delete_object(&s3_path).await.map_err(|e| { error!("Failed to delete S3 object: {:?}", e); ErrorInternalServerError("server error") })?; ctx .db .remove_blob_item(blob_hash) .await .map_err(handle_db_error)?; } else { debug!("Blob still has holders, S3 object not deleted"); } Ok(HttpResponse::NoContent().finish()) } diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs index b19814c94..19cd45ec8 100644 --- a/services/blob/src/http/mod.rs +++ b/services/blob/src/http/mod.rs @@ -1,47 +1,51 @@ use crate::config::CONFIG; use crate::database::DatabaseClient; use crate::s3::S3Client; use actix_web::{web, App, HttpServer}; use anyhow::Result; use tracing::info; mod context; use context::AppContext; mod handlers { pub(super) mod blob; // convenience exports to be used in handlers use super::context::{handle_db_error, AppContext}; } pub async fn run_http_server( db_client: DatabaseClient, s3_client: S3Client, ) -> Result<()> { info!( "Starting HTTP server listening at port {}", CONFIG.http_port ); HttpServer::new(move || { // context that is passed to every handler let ctx = AppContext { db: db_client.to_owned(), s3: s3_client.to_owned(), }; App::new() .wrap(tracing_actix_web::TracingLogger::default()) .app_data(web::Data::new(ctx)) .service( web::resource("/blob/{holder}") .route(web::get().to(handlers::blob::get_blob_handler)) 
.route(web::delete().to(handlers::blob::delete_blob_handler)), ) + .service( + web::resource("/blob") + .route(web::post().to(handlers::blob::assign_holder_handler)), + ) }) .bind(("0.0.0.0", CONFIG.http_port))? .run() .await?; Ok(()) }