diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs
index 95f4a7a36..de2dda3f1 100644
--- a/services/blob/src/http/handlers/blob.rs
+++ b/services/blob/src/http/handlers/blob.rs
@@ -1,175 +1,298 @@
-use crate::constants::BLOB_DOWNLOAD_CHUNK_SIZE;
-use crate::database::ReverseIndexItem;
+use crate::constants::{
+  BLOB_DOWNLOAD_CHUNK_SIZE, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
+};
+use crate::database::{BlobItem, ReverseIndexItem};
+use crate::tools::MemOps;
 
 use super::{handle_db_error, AppContext};
 use actix_web::error::{
-  ErrorConflict, ErrorInternalServerError, ErrorNotFound,
+  ErrorBadRequest, ErrorConflict, ErrorInternalServerError, ErrorNotFound,
 };
 use actix_web::{web, Error as HttpError, HttpResponse};
 use anyhow::Result;
 use async_stream::{try_stream, AsyncStream};
 use serde::{Deserialize, Serialize};
+use tokio_stream::StreamExt;
 use tracing::{debug, error, info, instrument, trace, warn};
 use tracing_futures::Instrument;
 
 #[instrument(
   name = "get_blob",
   skip_all,
   fields(holder = %params.as_ref().as_str(), s3_path))
 ]
 pub async fn get_blob_handler(
   ctx: web::Data<AppContext>,
   params: web::Path<String>,
 ) -> actix_web::Result<HttpResponse> {
   info!("Get blob request");
   let holder = params.into_inner();
   let s3_path = ctx.find_s3_path_by_holder(&holder).await?;
   tracing::Span::current().record("s3_path", s3_path.to_full_path());
 
   let object_metadata =
     ctx.s3.get_object_metadata(&s3_path).await.map_err(|err| {
       error!("Failed to get S3 object metadata: {:?}", err);
       ErrorInternalServerError("server error")
     })?;
 
   let file_size: u64 =
     object_metadata.content_length().try_into().map_err(|err| {
       error!("Failed to get S3 object content length: {:?}", err);
       ErrorInternalServerError("server error")
     })?;
 
   // Stream the data in chunks to avoid loading the whole file into memory.
   let chunk_size: u64 = BLOB_DOWNLOAD_CHUNK_SIZE;
 
   let s3 = ctx.s3.clone();
 
   let stream: AsyncStream<Result<web::Bytes, HttpError>, _> = try_stream! {
     let mut offset: u64 = 0;
     while offset < file_size {
       let next_size = std::cmp::min(chunk_size, file_size - offset);
       let range = offset..(offset + next_size);
       trace!(?range, "Getting {} bytes of data", next_size);
 
       let data = s3.get_object_bytes(&s3_path, range).await.map_err(|err| {
         error!("Failed to download data chunk: {:?}", err);
         ErrorInternalServerError("download failed")
       })?;
       yield web::Bytes::from(data);
 
       offset += chunk_size;
     }
   };
 
   Ok(
     HttpResponse::Ok()
       .content_type("application/octet-stream")
       .streaming(Box::pin(stream.in_current_span())),
   )
 }
 
 #[derive(Deserialize, Debug)]
 pub struct AssignHolderPayload {
   holder: String,
   blob_hash: String,
 }
 
 #[derive(Serialize)]
 struct AssignHolderResponnse {
   data_exists: bool,
 }
 
 #[instrument(name = "assign_holder", skip(ctx))]
 pub async fn assign_holder_handler(
   ctx: web::Data<AppContext>,
   payload: web::Json<AssignHolderPayload>,
 ) -> actix_web::Result<HttpResponse> {
   info!("Assign holder request");
   let AssignHolderPayload { holder, blob_hash } = payload.into_inner();
 
   if ctx
     .db
     .find_reverse_index_by_holder(&holder)
     .await
     .map_err(handle_db_error)?
     .is_some()
   {
     warn!("holder already assigned");
     return Err(ErrorConflict("holder already assigned"));
   }
 
   let data_exists = ctx
     .db
     .find_blob_item(&blob_hash)
     .await
     .map_err(handle_db_error)?
     .is_some();
   debug!(data_exists, "Checked blob item existence");
 
   let reverse_index_item = ReverseIndexItem { holder, blob_hash };
   ctx
     .db
     .put_reverse_index_item(reverse_index_item)
     .await
     .map_err(handle_db_error)?;
 
   Ok(HttpResponse::Ok().json(web::Json(AssignHolderResponnse { data_exists })))
 }
 
+async fn get_blob_hash_field(
+  multipart_payload: &mut actix_multipart::Multipart,
+) -> Result<String, HttpError> {
+  let Some(mut field) = multipart_payload.try_next().await? else {
+    debug!("Malformed multipart request");
+    return Err(ErrorBadRequest("Bad request"));
+  };
+
+  if field.name() != "blob_hash" {
+    warn!("Blob hash is required as the first form field");
+    return Err(ErrorBadRequest("Bad request"));
+  }
+
+  let mut buf = Vec::new();
+  while let Some(chunk) = field.try_next().await? {
+    buf.extend_from_slice(&chunk);
+  }
+
+  let blob_hash = String::from_utf8(buf)
+    .map_err(|_| ErrorInternalServerError("Internal error"))?;
+  Ok(blob_hash)
+}
+
+async fn process_blob_data(
+  multipart_payload: &mut actix_multipart::Multipart,
+  upload_session: &mut crate::s3::MultiPartUploadSession,
+) -> Result<(), HttpError> {
+  let mut s3_chunk: Vec<u8> = Vec::new();
+  while let Some(mut field) = multipart_payload.try_next().await? {
+    let field_name = field.name();
+    if field_name != "blob_data" {
+      warn!(
+        field_name,
+        "Malformed request: 'blob_data' multipart field expected."
+      );
+      return Err(ErrorBadRequest("Bad request"));
+    }
+
+    while let Some(chunk) = field.try_next().await? {
+      let mut chunk = chunk.to_vec();
+      s3_chunk.append(&mut chunk);
+
+      // New parts should be added to S3 only once they exceed the minimum
+      // part size, otherwise AWS returns an error.
+      if s3_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE {
+        trace!(
+          chunk_size = s3_chunk.len(),
+          "Chunk size exceeded, adding new S3 part"
+        );
+        if let Err(err) = upload_session.add_part(s3_chunk.take_out()).await {
+          error!("Failed to upload S3 part: {:?}", err);
+          return Err(ErrorInternalServerError("Internal error"));
+        }
+      }
+    }
+  }
+
+  // add the remaining data as the last S3 part
+  if !s3_chunk.is_empty() {
+    if let Err(err) = upload_session.add_part(s3_chunk).await {
+      error!("Failed to upload final part: {:?}", err);
+      return Err(ErrorInternalServerError("Internal error"));
+    }
+  }
+
+  Ok(())
+}
+
+#[instrument(skip_all, name = "upload_blob", fields(blob_hash))]
+pub async fn upload_blob_handler(
+  ctx: web::Data<AppContext>,
+  mut payload: actix_multipart::Multipart,
+) -> actix_web::Result<HttpResponse> {
+  info!("Upload blob request");
+
+  let blob_hash = get_blob_hash_field(&mut payload).await?;
+  debug!("Received blob_hash: {}", &blob_hash);
+  tracing::Span::current().record("blob_hash", &blob_hash);
+
+  if ctx
+    .db
+    .find_blob_item(&blob_hash)
+    .await
+    .map_err(handle_db_error)?
+    .is_some()
+  {
+    warn!("Blob with given hash already exists");
+    return Err(ErrorConflict("Conflict"));
+  }
+
+  let blob_item = BlobItem::new(blob_hash);
+  let mut upload_session = ctx
+    .s3
+    .start_upload_session(&blob_item.s3_path)
+    .await
+    .map_err(|err| {
+      error!("Failed to start S3 upload session: {:?}", err);
+      ErrorInternalServerError("Internal error")
+    })?;
+
+  trace!("Receiving blob data");
+  process_blob_data(&mut payload, &mut upload_session).await?;
+
+  // TODO: Handle "No parts to upload" as HTTP 400
+  if let Err(err) = upload_session.finish_upload().await {
+    error!("Failed to finish upload session: {:?}", err);
+    return Err(ErrorInternalServerError("Internal error"));
+  }
+
+  trace!("Upload finished, saving blob item to DB: {:?}", &blob_item);
+  ctx
+    .db
+    .put_blob_item(blob_item)
+    .await
+    .map_err(handle_db_error)?;
+
+  Ok(HttpResponse::NoContent().finish())
+}
+
 #[instrument(
   name = "delete_blob",
   skip_all,
   fields(holder = %params.as_ref().as_str()))
 ]
 pub async fn delete_blob_handler(
   ctx: web::Data<AppContext>,
   params: web::Path<String>,
 ) -> actix_web::Result<HttpResponse> {
   info!("Delete blob request");
   let holder = params.into_inner();
   let reverse_index_item = ctx
     .db
     .find_reverse_index_by_holder(&holder)
     .await
     .map_err(handle_db_error)?
     .ok_or_else(|| {
       debug!("Blob not found");
       ErrorNotFound("Blob not found")
     })?;
   let blob_hash = &reverse_index_item.blob_hash;
 
   ctx
     .db
     .remove_reverse_index_item(&holder)
     .await
     .map_err(handle_db_error)?;
 
   // TODO: handle cleanup here properly
   // for now the object's being removed right away
   // after the last holder was removed
   if ctx
     .db
     .find_reverse_index_by_hash(blob_hash)
     .await
     .map_err(handle_db_error)?
     .is_empty()
   {
     let s3_path = ctx
       .find_s3_path_by_reverse_index(&reverse_index_item)
       .await?;
     debug!("Last holder removed. Deleting S3 object: {:?}", &s3_path);
 
     ctx.s3.delete_object(&s3_path).await.map_err(|e| {
       error!("Failed to delete S3 object: {:?}", e);
       ErrorInternalServerError("server error")
     })?;
 
     ctx
       .db
       .remove_blob_item(blob_hash)
       .await
       .map_err(handle_db_error)?;
   } else {
     debug!("Blob still has holders, S3 object not deleted");
   }
 
   Ok(HttpResponse::NoContent().finish())
 }
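Note on process_blob_data above: s3_chunk.take_out() comes from the MemOps trait imported from crate::tools, whose definition is not part of this diff. A minimal sketch of what such a helper could look like, assuming it simply moves the accumulated buffer out and leaves an empty one behind (hypothetical code, not the actual tools module):

  // Hypothetical sketch of the MemOps helper assumed by process_blob_data.
  // take_out() is assumed to move the Vec's contents out, leaving it empty,
  // so the same buffer can keep accumulating the next S3 part.
  pub trait MemOps {
    fn take_out(&mut self) -> Self;
  }

  impl<T> MemOps for Vec<T> {
    fn take_out(&mut self) -> Self {
      std::mem::take(self)
    }
  }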
diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs
index 19cd45ec8..701384d01 100644
--- a/services/blob/src/http/mod.rs
+++ b/services/blob/src/http/mod.rs
@@ -1,51 +1,52 @@
 use crate::config::CONFIG;
 use crate::database::DatabaseClient;
 use crate::s3::S3Client;
 
 use actix_web::{web, App, HttpServer};
 use anyhow::Result;
 use tracing::info;
 
 mod context;
 use context::AppContext;
 
 mod handlers {
   pub(super) mod blob;
 
   // convenience exports to be used in handlers
   use super::context::{handle_db_error, AppContext};
 }
 
 pub async fn run_http_server(
   db_client: DatabaseClient,
   s3_client: S3Client,
 ) -> Result<()> {
   info!(
     "Starting HTTP server listening at port {}",
     CONFIG.http_port
   );
   HttpServer::new(move || {
     // context that is passed to every handler
     let ctx = AppContext {
       db: db_client.to_owned(),
       s3: s3_client.to_owned(),
     };
     App::new()
       .wrap(tracing_actix_web::TracingLogger::default())
       .app_data(web::Data::new(ctx))
       .service(
         web::resource("/blob/{holder}")
           .route(web::get().to(handlers::blob::get_blob_handler))
           .route(web::delete().to(handlers::blob::delete_blob_handler)),
       )
       .service(
         web::resource("/blob")
+          .route(web::put().to(handlers::blob::upload_blob_handler))
           .route(web::post().to(handlers::blob::assign_holder_handler)),
       )
   })
   .bind(("0.0.0.0", CONFIG.http_port))?
   .run()
   .await?;
 
   Ok(())
 }
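With the new route registered, PUT /blob expects a multipart form whose first field is blob_hash, followed by one or more blob_data fields carrying the raw bytes. A rough client-side sketch using the reqwest crate with its "multipart" feature; reqwest, the port, and the hash value are assumptions for illustration only and are not part of this diff:

  // Hypothetical client call against the new upload endpoint.
  async fn upload_example() -> Result<(), reqwest::Error> {
    let form = reqwest::multipart::Form::new()
      // blob_hash must be the first field, as enforced by get_blob_hash_field()
      .text("blob_hash", "deadbeef")
      // the blob bytes follow in one or more "blob_data" fields
      .part(
        "blob_data",
        reqwest::multipart::Part::bytes(vec![0u8; 1024]),
      );

    let response = reqwest::Client::new()
      .put("http://localhost:50053/blob") // placeholder host and port
      .multipart(form)
      .send()
      .await?;
    // 204 No Content on success, 409 Conflict if the hash already exists
    assert!(response.status().is_success());
    Ok(())
  }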
diff --git a/services/blob/src/s3.rs b/services/blob/src/s3.rs
index 04d576de1..60b5803ea 100644
--- a/services/blob/src/s3.rs
+++ b/services/blob/src/s3.rs
@@ -1,247 +1,253 @@
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, ensure, Result};
 use aws_sdk_s3::{
   model::{CompletedMultipartUpload, CompletedPart},
   output::CreateMultipartUploadOutput,
   types::ByteStream,
 };
 use std::{
   ops::{Bound, RangeBounds},
   sync::Arc,
 };
 
 /// A helper structure representing an S3 object path
 #[derive(Clone, Debug)]
 pub struct S3Path {
   pub bucket_name: String,
   pub object_name: String,
 }
 
 impl S3Path {
   /// Constructs an [`S3Path`] from given string
   /// The path should be in the following format: `[bucket_name]/[object_name]`
   pub fn from_full_path(full_path: &str) -> Result<Self> {
     if !full_path.contains('/') {
       return Err(anyhow!(
         "S3 path [{}] should contain the '/' separator",
         full_path
       ));
     }
 
     let mut split = full_path.split('/');
     Ok(S3Path {
       bucket_name: split
         .next()
         .ok_or(anyhow!("Expected bucket name in path [{}]", full_path))?
         .to_string(),
       object_name: split
         .next()
         .ok_or(anyhow!("Expected object name in path [{}]", full_path))?
         .to_string(),
     })
   }
 
   /// Retrieves full S3 path string in the following format: `[bucket_name]/[object_name]`
   pub fn to_full_path(&self) -> String {
     format!("{}/{}", self.bucket_name, self.object_name)
   }
 }
 
 #[derive(Clone)]
 pub struct S3Client {
   client: Arc<aws_sdk_s3::Client>,
 }
 
 impl S3Client {
   pub fn new(aws_config: &aws_types::SdkConfig) -> Self {
     let s3_config = aws_sdk_s3::config::Builder::from(aws_config)
       // localstack doesn't support virtual addressing
       .force_path_style(crate::config::CONFIG.is_sandbox)
       .build();
     S3Client {
       client: Arc::new(aws_sdk_s3::Client::from_conf(s3_config)),
     }
   }
 
   /// Creates a new [`MultiPartUploadSession`]
   pub async fn start_upload_session(
     &self,
     s3_path: &S3Path,
   ) -> Result<MultiPartUploadSession> {
     MultiPartUploadSession::start(&self.client, s3_path).await
   }
 
   /// Returns object metadata (e.g. file size) without downloading the object itself
   pub async fn get_object_metadata(
     &self,
     s3_path: &S3Path,
   ) -> Result<aws_sdk_s3::output::HeadObjectOutput> {
     let response = self
       .client
       .head_object()
       .bucket(s3_path.bucket_name.clone())
       .key(s3_path.object_name.clone())
       .send()
       .await?;
 
     Ok(response)
   }
 
   /// Downloads object and retrieves data bytes within provided range
   ///
   /// * `range` - Range of object bytes to download.
   pub async fn get_object_bytes(
     &self,
     s3_path: &S3Path,
     range: impl RangeBounds<u64>,
   ) -> Result<Vec<u8>> {
     let mut request = self
       .client
       .get_object()
       .bucket(&s3_path.bucket_name)
       .key(&s3_path.object_name);
 
     if range.start_bound() != Bound::Unbounded
       || range.end_bound() != Bound::Unbounded
     {
       // Create a valid HTTP Range header
       let from = match range.start_bound() {
         Bound::Included(start) => start.to_string(),
         _ => "0".to_string(),
       };
       let to = match range.end_bound() {
         Bound::Included(end) => end.to_string(),
         Bound::Excluded(end) => (end - 1).to_string(),
         _ => "".to_string(),
       };
       let range = format!("bytes={}-{}", from, to);
       request = request.range(range);
     }
 
     let response = request.send().await?;
     let data = response.body.collect().await?;
     Ok(data.to_vec())
   }
 
   /// Deletes object at provided path
   pub async fn delete_object(&self, s3_path: &S3Path) -> Result<()> {
     self
       .client
       .delete_object()
       .bucket(&s3_path.bucket_name)
       .key(&s3_path.object_name)
       .send()
       .await?;
     Ok(())
   }
 }
 
 /// Represents a multipart upload session to the AWS S3
 pub struct MultiPartUploadSession {
   client: Arc<aws_sdk_s3::Client>,
   bucket_name: String,
   object_name: String,
   upload_id: String,
   upload_parts: Vec<CompletedPart>,
 }
 
 impl MultiPartUploadSession {
   /// Starts a new upload session and returns its instance
   /// Don't call this directly, use [`S3Client::start_upload_session()`] instead
   async fn start(
     client: &Arc<aws_sdk_s3::Client>,
     s3_path: &S3Path,
   ) -> Result<Self> {
     let multipart_upload_res: CreateMultipartUploadOutput = client
       .create_multipart_upload()
       .bucket(&s3_path.bucket_name)
       .key(&s3_path.object_name)
       .send()
       .await?;
 
     let upload_id = multipart_upload_res
       .upload_id()
       .ok_or(anyhow!("Upload ID expected to be present"))?;
 
     Ok(MultiPartUploadSession {
       client: client.clone(),
       bucket_name: String::from(&s3_path.bucket_name),
       object_name: String::from(&s3_path.object_name),
       upload_id: String::from(upload_id),
       upload_parts: Vec::new(),
     })
   }
 
   /// adds data part to the multipart upload
   pub async fn add_part(&mut self, part: Vec<u8>) -> Result<()> {
     let stream = ByteStream::from(part);
     let part_number: i32 = self.upload_parts.len() as i32 + 1;
     let upload_result = self
       .client
       .upload_part()
       .key(&self.object_name)
       .bucket(&self.bucket_name)
       .upload_id(&self.upload_id)
       .part_number(part_number)
       .body(stream)
       .send()
       .await?;
 
     let completed_part = CompletedPart::builder()
       .e_tag(upload_result.e_tag.unwrap_or_default())
       .part_number(part_number)
       .build();
     self.upload_parts.push(completed_part);
     Ok(())
   }
 
   /// finishes the upload
   pub async fn finish_upload(&self) -> Result<()> {
+    // TODO: handle this as S3-specific error
+    ensure!(
+      !self.upload_parts.is_empty(),
+      "There are no parts to upload"
+    );
+
     let completed_multipart_upload = CompletedMultipartUpload::builder()
       .set_parts(Some(self.upload_parts.clone()))
       .build();
 
     self
       .client
       .complete_multipart_upload()
       .bucket(&self.bucket_name)
       .key(&self.object_name)
       .multipart_upload(completed_multipart_upload)
       .upload_id(&self.upload_id)
       .send()
       .await?;
 
     Ok(())
   }
 }
 
 #[cfg(test)]
 mod tests {
   use super::*;
 
   #[test]
   fn test_s3path_from_full_path() {
     let full_path = "my_bucket/some_object";
     let s3_path = S3Path::from_full_path(full_path);
     assert!(s3_path.is_ok());
 
     let s3_path = s3_path.unwrap();
     assert_eq!(&s3_path.bucket_name, "my_bucket");
     assert_eq!(&s3_path.object_name, "some_object");
   }
 
   #[test]
   fn test_s3path_from_invalid_path() {
     let result = S3Path::from_full_path("invalid_path");
     assert!(result.is_err())
   }
 
   #[test]
   fn test_s3path_to_full_path() {
     let s3_path = S3Path {
       bucket_name: "my_bucket".to_string(),
       object_name: "some_object".to_string(),
     };
     let full_path = s3_path.to_full_path();
     assert_eq!(full_path, "my_bucket/some_object");
   }
 }
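A final note on the size threshold used in process_blob_data: S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE is imported from crate::constants, but its value is not shown in this diff. S3 rejects multipart parts smaller than 5 MiB (except the last one), so the constant presumably sits at or above that limit; a plausible definition, offered only as an assumption:

  // Hypothetical definition; the real value lives in crate::constants.
  // AWS S3 requires every part except the last to be at least 5 MiB.
  pub const S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE: u64 = 5 * 1024 * 1024;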