diff --git a/services/blob/src/constants.rs b/services/blob/src/constants.rs
--- a/services/blob/src/constants.rs
+++ b/services/blob/src/constants.rs
@@ -43,3 +43,6 @@
 
 // S3 constants
 pub const BLOB_S3_BUCKET_NAME: &str = "commapp-blob";
+/// Number of buffered bytes (5 MiB) that must be exceeded before a chunk is
+/// sent to S3 as a multipart upload part.
+pub const S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE: u64 = 5 * 1024 * 1024;
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -9,7 +9,7 @@
 use crate::{
   constants::{
     BLOB_S3_BUCKET_NAME, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
-    MPSC_CHANNEL_BUFFER_CAPACITY,
+    MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
   },
   database::{BlobItem, DatabaseClient, ReverseIndexItem},
   s3::{MultiPartUploadSession, S3Path},
@@ -328,13 +328,117 @@
     &mut self,
     mut new_data: Vec<u8>,
   ) -> PutResult {
-    unimplemented!()
+    // Data chunks are only accepted while the pending action is
+    // `UploadNewBlob`; otherwise the stream is flagged for closing.
+    let blob_item = match &self.action {
+      Some(PutAction::UploadNewBlob(blob_item)) => blob_item,
+      _ => {
+        self.should_close_stream = true;
+        return Err(Status::invalid_argument(
+          "Holder and hash should be provided before data",
+        ));
+      }
+    };
+
+    // create upload session if it doesn't already exist
+    if self.uploader.is_none() {
+      self.uploader =
+        match MultiPartUploadSession::start(&self.s3, &blob_item.s3_path).await
+        {
+          Ok(session) => Some(session),
+          Err(_) => {
+            self.should_close_stream = true;
+            return Err(Status::aborted("Internal error"));
+          }
+        }
+    }
+    let uploader = self
+      .uploader
+      .as_mut()
+      .expect("upload session is created above when missing");
+
+    // New parts should be added to AWS only if they exceed minimum part size,
+    // Otherwise AWS returns error
+    self.current_chunk.append(&mut new_data);
+    if self.current_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE
+    {
+      // Move the buffered bytes into the part instead of cloning them;
+      // `take` leaves an empty buffer behind for the next part.
+      let part = std::mem::take(&mut self.current_chunk);
+      if uploader.add_part(part).await.is_err() {
+        // The part's bytes are gone, so drop the session to make sure
+        // `finish` cannot complete an upload with missing data.
+        self.uploader = None;
+        self.should_close_stream = true;
+        return Err(Status::aborted("Internal error"));
+      }
+    }
+
+    Ok(blob::PutResponse { data_exists: false })
   }
 
   /// This function should be called after the input stream is finished.
   /// This consumes `self` so this put handler instance cannot be used
   /// after this is called.
+  ///
+  /// Does nothing when no action was selected. Otherwise fails with an
+  /// aborted "Internal error" status if the holder or hash is missing, if a
+  /// new blob has no upload session, or if an upload or database call fails.
   pub async fn finish(self) -> Result<(), Status> {
-    unimplemented!()
+    // No action was selected on this stream, so there is nothing to commit.
+    let action = match self.action {
+      Some(action) => action,
+      None => return Ok(()),
+    };
+    let holder = self
+      .holder
+      .ok_or_else(|| Status::aborted("Internal error"))?;
+    let blob_hash = self
+      .blob_hash
+      .ok_or_else(|| Status::aborted("Internal error"))?;
+    let blob_item = match action {
+      PutAction::AssignHolder => {
+        return assign_holder_to_blob(&self.db, holder, blob_hash).await;
+      }
+      PutAction::UploadNewBlob(blob_item) => blob_item,
+    };
+
+    let mut uploader = self
+      .uploader
+      .ok_or_else(|| Status::aborted("Internal error"))?;
+
+    // Upload whatever is still buffered as the final part.
+    if !self.current_chunk.is_empty()
+      && uploader.add_part(self.current_chunk).await.is_err()
+    {
+      return Err(Status::aborted("Internal error"));
+    }
+
+    if uploader.finish_upload().await.is_err() {
+      return Err(Status::aborted("Internal error"));
+    }
+
+    // Record the blob only once the upload finished without an error.
+    if self.db.put_blob_item(blob_item).await.is_err() {
+      return Err(Status::aborted("Internal error"));
+    }
+
+    assign_holder_to_blob(&self.db, holder, blob_hash).await
+  }
+}
+
+/// Stores a reverse index item that links `holder` to `blob_hash`.
+///
+/// Returns an aborted "Internal error" status if the database write fails.
+async fn assign_holder_to_blob(
+  db: &DatabaseClient,
+  holder: String,
+  blob_hash: String,
+) -> Result<(), Status> {
+  let reverse_index_item = ReverseIndexItem { holder, blob_hash };
+
+  if db.put_reverse_index_item(reverse_index_item).await.is_err() {
+    return Err(Status::aborted("Internal error"));
   }
+  Ok(())
 }