diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs
--- a/services/blob/src/http/handlers/blob.rs
+++ b/services/blob/src/http/handlers/blob.rs
@@ -1,14 +1,18 @@
-use crate::constants::BLOB_DOWNLOAD_CHUNK_SIZE;
-use crate::database::ReverseIndexItem;
+use crate::constants::{
+  BLOB_DOWNLOAD_CHUNK_SIZE, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
+};
+use crate::database::{BlobItem, ReverseIndexItem};
+use crate::tools::MemOps;
 
 use super::{handle_db_error, AppContext};
 use actix_web::error::{
-  ErrorConflict, ErrorInternalServerError, ErrorNotFound,
+  ErrorBadRequest, ErrorConflict, ErrorInternalServerError, ErrorNotFound,
 };
 use actix_web::{web, Error as HttpError, HttpResponse};
 use anyhow::Result;
 use async_stream::{try_stream, AsyncStream};
 use serde::{Deserialize, Serialize};
+use tokio_stream::StreamExt;
 use tracing::{debug, error, info, instrument, trace, warn};
 use tracing_futures::Instrument;
 
@@ -114,6 +118,162 @@
   Ok(HttpResponse::Ok().json(web::Json(AssignHolderResponnse { data_exists })))
 }
 
+/// Reads the first multipart form field, which must be named `blob_hash`,
+/// and returns its content as a UTF-8 string.
+///
+/// # Errors
+/// Returns `400 Bad Request` when the form is empty, the first field is not
+/// `blob_hash`, the hash is not valid UTF-8, or the hash is empty.
+async fn get_blob_hash_field(
+  multipart_payload: &mut actix_multipart::Multipart,
+) -> Result<String, HttpError> {
+  let Some(mut field) = multipart_payload.try_next().await? else {
+    debug!("Malformed multipart request");
+    return Err(ErrorBadRequest("Bad request"));
+  };
+
+  if field.name() != "blob_hash" {
+    warn!("Blob hash is required as a first form field");
+    return Err(ErrorBadRequest("Bad request"));
+  }
+
+  // NOTE(review): the field is buffered without a size limit; consider
+  // capping it, since its content is fully client-controlled.
+  let mut buf = Vec::new();
+  while let Some(chunk) = field.try_next().await? {
+    buf.extend_from_slice(&chunk);
+  }
+
+  // Invalid UTF-8 is a malformed client request, not a server failure.
+  let blob_hash = String::from_utf8(buf).map_err(|_| {
+    warn!("Blob hash is not valid UTF-8");
+    ErrorBadRequest("Bad request")
+  })?;
+
+  if blob_hash.is_empty() {
+    warn!("Blob hash must not be empty");
+    return Err(ErrorBadRequest("Bad request"));
+  }
+
+  Ok(blob_hash)
+}
+
+/// Handles `PUT /blob`: stores a new blob in S3 and records it in the DB.
+///
+/// Expects a multipart form whose first field is `blob_hash`, followed by
+/// one or more `blob_data` fields carrying the blob content. The data is
+/// sent to S3 as a multipart upload, buffering locally until the buffer is
+/// larger than `S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE`.
+///
+/// Responds with `204 No Content` on success, `409 Conflict` when a blob with
+/// the given hash already exists, `400 Bad Request` for malformed forms or
+/// missing/empty `blob_data`, and `500` when an S3 operation fails.
+#[instrument(skip_all, name = "upload_blob", fields(blob_hash))]
+pub async fn upload_blob_handler(
+  ctx: web::Data<AppContext>,
+  mut payload: actix_multipart::Multipart,
+) -> actix_web::Result<HttpResponse> {
+  info!("Upload blob request");
+
+  let blob_hash = get_blob_hash_field(&mut payload).await?;
+  debug!("Received blob_hash: {}", &blob_hash);
+  tracing::Span::current().record("blob_hash", &blob_hash);
+
+  // NOTE(review): this existence check and the final `put_blob_item` are
+  // not atomic, so two concurrent uploads of the same hash can both pass it.
+  if ctx
+    .db
+    .find_blob_item(&blob_hash)
+    .await
+    .map_err(handle_db_error)?
+    .is_some()
+  {
+    warn!("Blob with given hash already exists");
+    return Err(ErrorConflict("Conflict"));
+  }
+
+  trace!("Receiving blob data");
+  let blob_item = BlobItem::new(blob_hash);
+  let mut current_chunk: Vec<u8> = Vec::new();
+  let mut upload_session: Option<crate::s3::MultiPartUploadSession> = None;
+  // NOTE(review): on any early return below, a started S3 multipart upload is
+  // neither finished nor aborted; consider aborting it to avoid orphaned parts.
+  while let Some(mut blob_field) = payload.try_next().await? {
+    let field_name = blob_field.name();
+    if field_name != "blob_data" {
+      warn!(
+        field_name,
+        "Malformed request: 'blob_data' multipart field expected."
+      );
+      return Err(ErrorBadRequest("Bad request"));
+    }
+    while let Some(chunk) = blob_field.try_next().await? {
+      // The S3 session is started lazily on the first data chunk, so requests
+      // that carry no blob data never open one.
+      if upload_session.is_none() {
+        upload_session = Some(
+          ctx
+            .s3
+            .start_upload_session(&blob_item.s3_path)
+            .await
+            .map_err(|err| {
+              error!("Failed to start S3 upload session: {:?}", err);
+              ErrorInternalServerError("Internal error")
+            })?,
+        );
+      }
+      let uploader = upload_session
+        .as_mut()
+        .expect("Upload session should exist at this point");
+
+      // S3 rejects non-final parts below the minimum part size, so buffer
+      // incoming data and only upload a part once the buffer exceeds it.
+      // Appending in place avoids copying each chunk into a temporary Vec.
+      current_chunk.extend_from_slice(&chunk);
+      if current_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE {
+        trace!(
+          chunk_size = current_chunk.len(),
+          "Chunk size exceeded, adding new S3 part"
+        );
+        if let Err(err) = uploader.add_part(current_chunk.take_out()).await {
+          error!("Failed to upload S3 part: {:?}", err);
+          return Err(ErrorInternalServerError("Internal error"));
+        }
+      }
+    }
+  }
+
+  let mut uploader = upload_session.ok_or_else(|| {
+    // This also happens when client sends form without 'blob_data' field,
+    // or when every 'blob_data' field is empty
+    warn!("No blob data was provided");
+    ErrorBadRequest("Bad request")
+  })?;
+
+  // Upload whatever is still buffered as the final part; S3 allows the last
+  // part of a multipart upload to be smaller than the minimum part size.
+  if !current_chunk.is_empty() {
+    if let Err(err) = uploader.add_part(current_chunk).await {
+      error!("Failed to upload final part: {:?}", err);
+      return Err(ErrorInternalServerError("Internal error"));
+    }
+  }
+
+  if let Err(err) = uploader.finish_upload().await {
+    error!("Failed to finish upload session: {:?}", err);
+    return Err(ErrorInternalServerError("Internal error"));
+  }
+
+  trace!("Upload finished, saving blob item to DB: {:?}", &blob_item);
+  ctx
+    .db
+    .put_blob_item(blob_item)
+    .await
+    .map_err(handle_db_error)?;
+
+  Ok(HttpResponse::NoContent().finish())
+}
+
 #[instrument(
   name = "delete_blob",
   skip_all,
diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs
--- a/services/blob/src/http/mod.rs
+++ b/services/blob/src/http/mod.rs
@@ -40,6 +40,7 @@
       )
       .service(
         web::resource("/blob")
+          .route(web::put().to(handlers::blob::upload_blob_handler))
           .route(web::post().to(handlers::blob::assign_holder_handler)),
       )
   })