diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -14,7 +14,7 @@
 };
 use crate::database::DBError;
 use crate::s3::{Error as S3Error, S3Client, S3Path};
-use crate::tools::{BoxedError, MemOps};
+use crate::tools::{BoxedError, ByteStream, MemOps};
 use crate::{constants::BLOB_DOWNLOAD_CHUNK_SIZE, database::DatabaseClient};
 
 #[derive(
@@ -112,6 +112,76 @@
     };
     Ok(session)
   }
+
+  /// Uploads a blob to S3 and records it in the database.
+  ///
+  /// Data read from `blob_data_stream` is buffered and sent to S3 in parts:
+  /// a part is added once the buffer grows past
+  /// `S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE`, and any remainder is sent as
+  /// the last part once the stream is drained.
+  ///
+  /// # Errors
+  ///
+  /// - `BlobServiceError::BlobAlreadyExists` if the database already holds
+  ///   an item for `blob_hash`.
+  /// - `BlobServiceError::InputError` if `blob_data_stream` yields an error.
+  /// - Database and S3 failures are propagated with `?`.
+  pub async fn put_blob(
+    &self,
+    blob_hash: impl Into<String>,
+    blob_data_stream: impl ByteStream,
+  ) -> Result<(), BlobServiceError> {
+    let blob_hash: String = blob_hash.into();
+    let blob_item = BlobItemInput::new(&blob_hash);
+
+    if self.db.get_blob_item(blob_hash).await?.is_some() {
+      debug!("Blob already exists");
+      return Err(BlobServiceError::BlobAlreadyExists);
+    }
+
+    // NOTE(review): nothing in this function aborts the upload session when
+    // a later step fails - confirm unfinished uploads get cleaned up.
+    let mut upload_session =
+      self.s3.start_upload_session(&blob_item.s3_path).await?;
+    trace!(?blob_item, "Started S3 upload session");
+
+    // `tokio::pin!` rebinds the stream as a pinned mutable reference, so the
+    // parameter itself does not have to be declared `mut`.
+    tokio::pin!(blob_data_stream);
+    let mut s3_chunk: Vec<u8> = Vec::new();
+    while let Some(mut chunk) =
+      blob_data_stream.try_next().await.map_err(|err| {
+        warn!("Failed to get data chunk: {:?}", err);
+        BlobServiceError::InputError(err)
+      })?
+    {
+      s3_chunk.append(&mut chunk);
+
+      // New parts should be added to AWS only if they exceed minimum part size,
+      // Otherwise AWS returns error
+      if s3_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE {
+        trace!(
+          chunk_size = s3_chunk.len(),
+          "Chunk size exceeded, adding new S3 part"
+        );
+        upload_session.add_part(s3_chunk.take_out()).await?;
+      }
+    }
+    trace!("Upload stream drained");
+    // add the remaining data as the last S3 part
+    if !s3_chunk.is_empty() {
+      trace!("Uploading remaining {} bytes", s3_chunk.len());
+      upload_session.add_part(s3_chunk).await?;
+    }
+    // NOTE(review): an empty stream reaches `finish_upload` with no parts
+    // added - confirm the S3 client accepts that.
+    // Complete the upload session
+    upload_session.finish_upload().await?;
+
+    trace!("S3 upload complete, putting item to db");
+    self.db.put_blob_item(blob_item).await?;
+    Ok(())
+  }
 }
 
 pub struct BlobDownloadObject {
diff --git a/services/blob/src/tools.rs b/services/blob/src/tools.rs
--- a/services/blob/src/tools.rs
+++ b/services/blob/src/tools.rs
@@ -1,4 +1,5 @@
 use std::{env, error::Error as StdError};
+use tonic::codegen::futures_core::Stream;
 
 use crate::constants;
 
@@ -13,6 +14,18 @@
 }
 
 pub type BoxedError = Box<dyn StdError>;
+/// A stream of byte chunks: each item is `Result<Vec<u8>, BoxedError>`.
+///
+/// Trait aliases aren't supported in stable Rust, but we can work around
+/// this by creating an empty trait that extends the traits we want to
+/// alias, plus a blanket impl for every type that satisfies them.
+#[rustfmt::skip]
+pub trait ByteStream:
+  Stream<Item = Result<Vec<u8>, BoxedError>> {}
+#[rustfmt::skip]
+impl<T> ByteStream for T where
+  T: Stream<Item = Result<Vec<u8>, BoxedError>> {}
+
 pub trait MemOps {
   fn take_out(&mut self) -> Self;
 }