diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs
--- a/services/blob/src/http/handlers/blob.rs
+++ b/services/blob/src/http/handlers/blob.rs
@@ -1,9 +1,10 @@
-use crate::constants::{
-  BLOB_DOWNLOAD_CHUNK_SIZE, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
-};
+#![allow(unused)]
+
+use crate::constants::S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE;
 use crate::database::old::{BlobItem, ReverseIndexItem};
-use crate::http::context::handle_s3_error;
-use crate::tools::MemOps;
+use crate::http::context::{handle_blob_service_error, handle_s3_error};
+use crate::service::BlobService;
+use crate::tools::BoxedError;
 use crate::validate_identifier;
 
 use super::{handle_db_error, AppContext};
@@ -19,7 +20,7 @@
 use async_stream::{try_stream, AsyncStream};
 use serde::{Deserialize, Serialize};
 use tokio_stream::StreamExt;
-use tracing::{debug, error, info, instrument, trace, warn};
+use tracing::{debug, info, instrument, trace, warn};
 use tracing_futures::Instrument;
 
 /// Returns a tuple of first and last byte number (inclusive) represented by given range header.
@@ -69,7 +70,7 @@
   fields(blob_hash = %params.as_ref().as_str(), s3_path))
 ]
 pub async fn get_blob_handler(
-  ctx: web::Data<AppContext>,
+  service: web::Data<BlobService>,
   params: web::Path<String>,
   range_header: Option<web::Header<Range>>,
 ) -> actix_web::Result<HttpResponse> {
@@ -77,52 +78,25 @@
 
   let blob_hash = params.into_inner();
   validate_identifier!(blob_hash);
 
-  let s3_path = match ctx.db.find_blob_item(&blob_hash).await {
-    Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path),
-    Ok(None) => {
-      debug!("Blob with given hash not found in database");
-      Err(ErrorNotFound("blob not found"))
-    }
-    Err(err) => Err(handle_db_error(err)),
-  }?;
-  tracing::Span::current().record("s3_path", s3_path.to_full_path());
-
-  let object_metadata = ctx
-    .s3
-    .get_object_metadata(&s3_path)
-    .await
-    .map_err(handle_s3_error)?;
-  let file_size: u64 =
-    object_metadata.content_length().try_into().map_err(|err| {
-      error!("Failed to get S3 object content length: {:?}", err);
-      ErrorInternalServerError("server error")
-    })?;
-
-  // Stream the data in chunks to avoid loading the whole file into memory.
-  let chunk_size: u64 = BLOB_DOWNLOAD_CHUNK_SIZE;
-
-  let s3 = ctx.s3.clone();
+  trace!("Initializing download session");
+  let mut download = service.create_download(blob_hash).await?;
+  let total_size = download.blob_size;
 
   let (range_start, range_end): (u64, u64) =
-    parse_range_header(&range_header, file_size)?;
-
-  let stream: AsyncStream<Result<web::Bytes, HttpError>, _> = try_stream! {
-    debug!(?range_start, ?range_end, "Getting range of data");
-    let mut offset: u64 = range_start;
-
-    while offset < range_end {
-      let next_size = std::cmp::min(chunk_size, range_end - offset + 1);
-      let range = offset..(offset + next_size);
-      trace!(?range, "Getting {} bytes of data", next_size);
-
-      let data = s3.get_object_bytes(&s3_path, range).await.map_err(handle_s3_error)?;
-      yield web::Bytes::from(data);
-
-      offset += chunk_size;
+    parse_range_header(&range_header, total_size)?;
+  download.set_byte_range(range_start..=range_end);
+  let content_length = download.download_size();
+
+  let stream = download
+    .into_stream()
+    .map(|data| match data {
+      Ok(bytes) => Ok(web::Bytes::from(bytes)),
+      Err(err) => {
+        warn!("Error during download stream: {:?}", err);
+        Err(handle_blob_service_error(&err))
       }
-  };
-
-  let content_length = (range_end - range_start + 1).to_string();
+    })
+    .in_current_span();
 
   if range_header.is_some() {
     return Ok(
@@ -131,9 +105,9 @@
         .append_header(("Content-Length", content_length))
         .append_header((
           "Content-Range",
-          format!("bytes {}-{}/{}", range_start, range_end, file_size),
+          format!("bytes {}-{}/{}", range_start, range_end, total_size),
        ))
-        .streaming(Box::pin(stream.in_current_span())),
+        .streaming(Box::pin(stream)),
     );
   }
 
@@ -141,7 +115,7 @@
     HttpResponse::Ok()
       .content_type("application/octet-stream")
       .append_header(("Content-Length", content_length))
-      .streaming(Box::pin(stream.in_current_span())),
+      .streaming(Box::pin(stream)),
   )
 }
 
@@ -156,9 +130,9 @@
   data_exists: bool,
 }
 
-#[instrument(name = "assign_holder", skip(ctx))]
+#[instrument(name = "assign_holder", skip(service))]
 pub async fn assign_holder_handler(
-  ctx: web::Data<AppContext>,
+  service: web::Data<BlobService>,
   payload: web::Json<AssignHolderPayload>,
 ) -> actix_web::Result<HttpResponse> {
   info!("Assign holder request");
@@ -166,32 +140,7 @@
   validate_identifier!(holder);
   validate_identifier!(blob_hash);
 
-  if ctx
-    .db
-    .find_reverse_index_by_holder(&holder)
-    .await
-    .map_err(handle_db_error)?
-    .is_some()
-  {
-    warn!("holder already assigned");
-    return Err(ErrorConflict("holder already assigned"));
-  }
-
-  let data_exists = ctx
-    .db
-    .find_blob_item(&blob_hash)
-    .await
-    .map_err(handle_db_error)?
-    .is_some();
-  debug!(data_exists, "Checked blob item existence");
-
-  let reverse_index_item = ReverseIndexItem { holder, blob_hash };
-  ctx
-    .db
-    .put_reverse_index_item(reverse_index_item)
-    .await
-    .map_err(handle_db_error)?;
-
+  let data_exists = service.assign_holder(blob_hash, holder).await?;
   Ok(HttpResponse::Ok().json(web::Json(AssignHolderResponnse { data_exists })))
 }
 
@@ -220,54 +169,9 @@
   return Ok(blob_hash);
 }
 
-async fn process_blob_data(
-  multipart_payload: &mut actix_multipart::Multipart,
-  upload_session: &mut crate::s3::MultiPartUploadSession,
-) -> Result<(), HttpError> {
-  let mut s3_chunk: Vec<u8> = Vec::new();
-  while let Some(mut field) = multipart_payload.try_next().await? {
-    let field_name = field.name();
-    if field_name != "blob_data" {
-      warn!(
-        field_name,
-        "Malfolmed request: 'blob_data' multipart field expected."
-      );
-      return Err(ErrorBadRequest("Bad request"));
-    }
-
-    while let Some(chunk) = field.try_next().await? {
-      let mut chunk = chunk.to_vec();
-      s3_chunk.append(&mut chunk);
-
-      // New parts should be added to AWS only if they exceed minimum part size,
-      // Otherwise AWS returns error
-      if s3_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE {
-        trace!(
-          chunk_size = s3_chunk.len(),
-          "Chunk size exceeded, adding new S3 part"
-        );
-        upload_session
-          .add_part(s3_chunk.take_out())
-          .await
-          .map_err(handle_s3_error)?;
-      }
-    }
-  }
-
-  // add the remaining data as the last S3 part
-  if !s3_chunk.is_empty() {
-    upload_session
-      .add_part(s3_chunk)
-      .await
-      .map_err(handle_s3_error)?;
-  }
-
-  Ok(())
-}
-
 #[instrument(skip_all, name = "upload_blob", fields(blob_hash))]
 pub async fn upload_blob_handler(
-  ctx: web::Data<AppContext>,
+  service: web::Data<BlobService>,
   mut payload: actix_multipart::Multipart,
 ) -> actix_web::Result<HttpResponse> {
   info!("Upload blob request");
@@ -276,39 +180,26 @@
   debug!("Received blob_hash: {}", &blob_hash);
   tracing::Span::current().record("blob_hash", &blob_hash);
 
-  if ctx
-    .db
-    .find_blob_item(&blob_hash)
-    .await
-    .map_err(handle_db_error)?
-    .is_some()
-  {
-    warn!("Blob with given hash already exists");
-    return Err(ErrorConflict("Conflict"));
-  }
-
-  let blob_item = BlobItem::new(blob_hash);
-  let mut upload_session = ctx
-    .s3
-    .start_upload_session(&blob_item.s3_path)
-    .await
-    .map_err(handle_s3_error)?;
-
-  trace!("Receiving blob data");
-  process_blob_data(&mut payload, &mut upload_session).await?;
-
-  upload_session
-    .finish_upload()
-    .await
-    .map_err(handle_s3_error)?;
+  let stream: AsyncStream<Result<Vec<u8>, BoxedError>, _> = try_stream! {
+    while let Some(mut field) = payload.try_next().await.map_err(Box::new)? {
+      let field_name = field.name();
+      if field_name != "blob_data" {
+        warn!(
+          field_name,
+          "Malformed request: 'blob_data' multipart field expected."
+        );
+        Err(ErrorBadRequest("Bad request"))?;
+      }
 
-  trace!("Upload finished, saving blob item to DB: {:?}", &blob_item);
-  ctx
-    .db
-    .put_blob_item(blob_item)
-    .await
-    .map_err(handle_db_error)?;
+      while let Some(chunk) = field.try_next().await.map_err(Box::new)? {
+        yield chunk.to_vec();
+      }
+    }
+    trace!("Stream done");
+  };
 
+  service.put_blob(blob_hash, stream).await?;
   Ok(HttpResponse::NoContent().finish())
 }
 
@@ -318,62 +209,16 @@
   blob_hash: String,
 }
 
-#[instrument(name = "remove_holder", skip(ctx))]
+#[instrument(name = "remove_holder", skip(service))]
 pub async fn remove_holder_handler(
-  ctx: web::Data<AppContext>,
+  service: web::Data<BlobService>,
   payload: web::Json<RemoveHolderPayload>,
 ) -> actix_web::Result<HttpResponse> {
-  info!("Remove holder request");
+  info!("Revoke holder request");
   let RemoveHolderPayload { holder, blob_hash } = payload.into_inner();
   validate_identifier!(holder);
   validate_identifier!(blob_hash);
 
-  let reverse_index_item = ctx
-    .db
-    .find_reverse_index_by_holder(&holder)
-    .await
-    .map_err(handle_db_error)?
-    .ok_or_else(|| {
-      debug!("Blob not found");
-      ErrorNotFound("Blob not found")
-    })?;
-  let blob_hash = &reverse_index_item.blob_hash;
-
-  ctx
-    .db
-    .remove_reverse_index_item(&holder)
-    .await
-    .map_err(handle_db_error)?;
-
-  // TODO: handle cleanup here properly
-  // for now the object's being removed right away
-  // after the last holder was removed
-  if ctx
-    .db
-    .find_reverse_index_by_hash(blob_hash)
-    .await
-    .map_err(handle_db_error)?
-    .is_empty()
-  {
-    let s3_path = ctx
-      .find_s3_path_by_reverse_index(&reverse_index_item)
-      .await?;
-    debug!("Last holder removed. Deleting S3 object: {:?}", &s3_path);
-
-    ctx
-      .s3
-      .delete_object(&s3_path)
-      .await
-      .map_err(handle_s3_error)?;
-
-    ctx
-      .db
-      .remove_blob_item(blob_hash)
-      .await
-      .map_err(handle_db_error)?;
-  } else {
-    debug!("Blob still has holders, S3 object not deleted");
-  }
-
+  service.revoke_holder(blob_hash, holder).await?;
  Ok(HttpResponse::NoContent().finish())
 }
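For reviewers, a standalone sketch (not part of the patch) of the inclusive byte-range arithmetic that both the old streaming loop and the new `download_size()` / `Content-Range` header rely on. The helper name `chunk_ranges` and the constants below are illustrative only and do not exist in the blob service code.

```rust
/// Split an inclusive byte range `start..=end` into half-open chunk ranges of
/// at most `chunk_size` bytes, mirroring how the handler pages through S3 data.
/// Illustrative helper, not part of the blob service.
fn chunk_ranges(start: u64, end: u64, chunk_size: u64) -> Vec<std::ops::Range<u64>> {
  let mut ranges = Vec::new();
  let mut offset = start;
  while offset <= end {
    // The end is inclusive, so the remaining length is `end - offset + 1`.
    let len = std::cmp::min(chunk_size, end - offset + 1);
    ranges.push(offset..offset + len);
    offset += len;
  }
  ranges
}

fn main() {
  // A request for `Range: bytes=0-1023` on a 10 KiB blob:
  let (start, end) = (0u64, 1023u64);
  let content_length = end - start + 1; // 1024 bytes, as in the Content-Length header
  assert_eq!(content_length, 1024);
  // With a 400-byte chunk size, the download is paged as 400 + 400 + 224 bytes.
  assert_eq!(chunk_ranges(start, end, 400), vec![0..400, 400..800, 800..1024]);
  println!("Content-Range: bytes {}-{}/{}", start, end, 10 * 1024);
}
```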