diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs
--- a/services/blob/src/http/handlers/blob.rs
+++ b/services/blob/src/http/handlers/blob.rs
@@ -1,14 +1,18 @@
-use crate::constants::BLOB_DOWNLOAD_CHUNK_SIZE;
-use crate::database::ReverseIndexItem;
+use crate::constants::{
+  BLOB_DOWNLOAD_CHUNK_SIZE, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
+};
+use crate::database::{BlobItem, ReverseIndexItem};
+use crate::tools::MemOps;
 use super::{handle_db_error, AppContext};
 use actix_web::error::{
-  ErrorConflict, ErrorInternalServerError, ErrorNotFound,
+  ErrorBadRequest, ErrorConflict, ErrorInternalServerError, ErrorNotFound,
 };
 use actix_web::{web, Error as HttpError, HttpResponse};
 use anyhow::Result;
 use async_stream::{try_stream, AsyncStream};
 use serde::{Deserialize, Serialize};
+use tokio_stream::StreamExt;
 use tracing::{debug, error, info, instrument, trace, warn};
 use tracing_futures::Instrument;
@@ -114,6 +118,125 @@
   Ok(HttpResponse::Ok().json(web::Json(AssignHolderResponnse { data_exists })))
 }
 
+async fn get_blob_hash_field(
+  multipart_payload: &mut actix_multipart::Multipart,
+) -> Result<String, HttpError> {
+  let Some(mut field) = multipart_payload.try_next().await? else {
+    debug!("Malformed multipart request");
+    return Err(ErrorBadRequest("Bad request"));
+  };
+
+  if field.name() != "blob_hash" {
+    warn!("Blob hash is required as the first form field");
+    return Err(ErrorBadRequest("Bad request"));
+  }
+
+  let mut buf = Vec::new();
+  while let Some(chunk) = field.try_next().await? {
+    buf.extend_from_slice(&chunk);
+  }
+
+  let blob_hash = String::from_utf8(buf)
+    .map_err(|_| ErrorInternalServerError("Internal error"))?;
+  Ok(blob_hash)
+}
+
+async fn process_blob_data(
+  multipart_payload: &mut actix_multipart::Multipart,
+  upload_session: &mut crate::s3::MultiPartUploadSession,
+) -> Result<(), HttpError> {
+  let mut s3_chunk: Vec<u8> = Vec::new();
+  while let Some(mut field) = multipart_payload.try_next().await? {
+    let field_name = field.name();
+    if field_name != "blob_data" {
+      warn!(
+        field_name,
+        "Malformed request: 'blob_data' multipart field expected."
+      );
+      return Err(ErrorBadRequest("Bad request"));
+    }
+
+    while let Some(chunk) = field.try_next().await? {
+      let mut chunk = chunk.to_vec();
+      s3_chunk.append(&mut chunk);
+
+      // New parts should be uploaded to S3 only once they exceed the
+      // minimum part size; otherwise AWS rejects them.
+      if s3_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE {
+        trace!(
+          chunk_size = s3_chunk.len(),
+          "Chunk size exceeded, adding new S3 part"
+        );
+        if let Err(err) = upload_session.add_part(s3_chunk.take_out()).await {
+          error!("Failed to upload S3 part: {:?}", err);
+          return Err(ErrorInternalServerError("Internal error"));
+        }
+      }
+    }
+  }
+
+  // add the remaining data as the last S3 part
+  if !s3_chunk.is_empty() {
+    if let Err(err) = upload_session.add_part(s3_chunk).await {
+      error!("Failed to upload final part: {:?}", err);
+      return Err(ErrorInternalServerError("Internal error"));
+    }
+  }
+
+  Ok(())
+}
+
+#[instrument(skip_all, name = "upload_blob", fields(blob_hash))]
+pub async fn upload_blob_handler(
+  ctx: web::Data<AppContext>,
+  mut payload: actix_multipart::Multipart,
+) -> actix_web::Result<HttpResponse> {
+  info!("Upload blob request");
+
+  let blob_hash = get_blob_hash_field(&mut payload).await?;
+  debug!("Received blob_hash: {}", &blob_hash);
+  tracing::Span::current().record("blob_hash", &blob_hash);
+
+  if ctx
+    .db
+    .find_blob_item(&blob_hash)
+    .await
+    .map_err(handle_db_error)?
+    .is_some()
+  {
+    warn!("Blob with given hash already exists");
+    return Err(ErrorConflict("Conflict"));
+  }
+
+  let blob_item = BlobItem::new(blob_hash);
+  let mut upload_session = ctx
+    .s3
+    .start_upload_session(&blob_item.s3_path)
+    .await
+    .map_err(|err| {
+      error!("Failed to start S3 upload session: {:?}", err);
+      ErrorInternalServerError("Internal error")
+    })?;
+
+  trace!("Receiving blob data");
+  process_blob_data(&mut payload, &mut upload_session).await?;
+
+  // TODO: Handle "No parts to upload" as HTTP 400
+  if let Err(err) = upload_session.finish_upload().await {
+    error!("Failed to finish upload session: {:?}", err);
+    return Err(ErrorInternalServerError("Internal error"));
+  }
+
+  trace!("Upload finished, saving blob item to DB: {:?}", &blob_item);
+  ctx
+    .db
+    .put_blob_item(blob_item)
+    .await
+    .map_err(handle_db_error)?;
+
+  Ok(HttpResponse::NoContent().finish())
+}
+
 #[instrument(
   name = "delete_blob",
   skip_all,
diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs
--- a/services/blob/src/http/mod.rs
+++ b/services/blob/src/http/mod.rs
@@ -40,6 +40,7 @@
       )
       .service(
         web::resource("/blob")
+          .route(web::put().to(handlers::blob::upload_blob_handler))
          .route(web::post().to(handlers::blob::assign_holder_handler)),
      )
   })
diff --git a/services/blob/src/s3.rs b/services/blob/src/s3.rs
--- a/services/blob/src/s3.rs
+++ b/services/blob/src/s3.rs
@@ -1,4 +1,4 @@
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, ensure, Result};
 use aws_sdk_s3::{
   model::{CompletedMultipartUpload, CompletedPart},
   output::CreateMultipartUploadOutput,
@@ -197,6 +197,12 @@
 
   /// finishes the upload
   pub async fn finish_upload(&self) -> Result<()> {
+    // TODO: handle this as an S3-specific error
+    ensure!(
+      !self.upload_parts.is_empty(),
+      "There are no parts to upload"
+    );
+
     let completed_multipart_upload = CompletedMultipartUpload::builder()
       .set_parts(Some(self.upload_parts.clone()))
       .build();
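
Test plan sketch (not part of the patch): the new PUT /blob route expects a multipart form whose first field is `blob_hash`, followed by one or more `blob_data` fields carrying the payload bytes. A minimal client-side smoke test is sketched below, assuming the reqwest crate with its `multipart` feature; the service URL, port, and hash value are hypothetical placeholders.

use reqwest::multipart::{Form, Part};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
  let blob_data = vec![0u8; 1024];
  let form = Form::new()
    // `blob_hash` must come first, per `get_blob_hash_field`
    .text("blob_hash", "deadbeef")
    // one or more `blob_data` fields carry the blob bytes
    .part("blob_data", Part::bytes(blob_data));

  let response = reqwest::Client::new()
    .put("http://localhost:50053/blob") // hypothetical address
    .multipart(form)
    .send()
    .await?;

  // expect 204 No Content on success; 409 Conflict means the
  // blob hash already exists in the database
  assert_eq!(response.status(), reqwest::StatusCode::NO_CONTENT);
  Ok(())
}

Sending a second request with the same `blob_hash` should exercise the ErrorConflict path, and a form whose first field is not `blob_hash` should yield HTTP 400 from `get_blob_hash_field`.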