diff --git a/services/blob/src/constants.rs b/services/blob/src/constants.rs --- a/services/blob/src/constants.rs +++ b/services/blob/src/constants.rs @@ -43,3 +43,4 @@ // S3 constants pub const BLOB_S3_BUCKET_NAME: &str = "commapp-blob"; +pub const S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE: u64 = 5 * 1024 * 1024; diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs --- a/services/blob/src/service.rs +++ b/services/blob/src/service.rs @@ -1,17 +1,18 @@ use anyhow::{Context, Result}; use blob::blob_service_server::BlobService; +use chrono::Utc; use std::{pin::Pin, sync::Arc}; use tokio::sync::mpsc; -use tokio_stream::{wrappers::ReceiverStream, Stream}; +use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status}; use crate::{ constants::{ - GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE, - MPSC_CHANNEL_BUFFER_CAPACITY, + BLOB_S3_BUCKET_NAME, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE, + MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE, }, - database::{BlobItem, DatabaseClient, ReverseIndexItem}, - s3::S3Path, + database::{self, BlobItem, DatabaseClient, ReverseIndexItem}, + s3::{MultiPartUploadSession, S3Path}, }; pub mod blob { @@ -64,9 +65,44 @@ Pin> + Send>>; async fn put( &self, - _request: Request>, + request: Request>, ) -> Result, Status> { - Err(Status::unimplemented("Not implemented yet")) + let mut in_stream = request.into_inner(); + let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); + let db = self.db.clone(); + let s3 = self.s3.clone(); + tokio::spawn(async move { + let mut put_handler = PutHandler::default(); + + while let Some(message) = in_stream.next().await { + let response = match message { + Ok(blob::PutRequest { + data: Some(blob::put_request::Data::Holder(new_holder)), + }) => put_handler.handle_holder(&db, new_holder).await, + Ok(blob::PutRequest { + data: Some(blob::put_request::Data::BlobHash(new_hash)), + }) => put_handler.handle_blob_hash(&db, new_hash).await, + Ok(blob::PutRequest { + data: Some(blob::put_request::Data::DataChunk(new_data)), + }) => put_handler.handle_data_chunk(&s3, new_data).await, + _ => Err(Status::unknown("unknown error")), + }; + if let Err(e) = tx.send(response).await { + println!("Response was dropped: {}", e); + break; + } + if put_handler.should_close_stream { + break; + } + } + + if let Err(status) = put_handler.finish(&db).await { + let _ = tx.send(Err(status)).await; + } + }); + + let out_stream = ReceiverStream::new(rx); + Ok(Response::new(Box::pin(out_stream) as Self::PutStream)) } type GetStream = @@ -188,3 +224,197 @@ Ok(Response::new(())) } } + +type PutResult = Result; + +enum PutAction { + None, + AssignHolder, + UploadNewBlob(BlobItem), +} + +/// A helper for handling Put RPC requests +struct PutHandler { + /// Should the stream be closed by server + pub should_close_stream: bool, + action: PutAction, + + holder: Option, + blob_hash: Option, + current_chunk: Vec, + + uploader: Option, +} + +impl Default for PutHandler { + fn default() -> Self { + PutHandler { + should_close_stream: false, + action: PutAction::None, + holder: None, + blob_hash: None, + current_chunk: Vec::new(), + uploader: None, + } + } +} + +impl PutHandler { + pub async fn handle_holder( + &mut self, + db: &DatabaseClient, + new_holder: String, + ) -> PutResult { + if self.holder.is_some() { + return Err(Status::invalid_argument("Holder already provided")); + } + self.holder = Some(new_holder); + return self.determine_action(db).await; + } + + pub async fn handle_blob_hash( + &mut self, + db: &DatabaseClient, + new_hash: String, + ) -> PutResult { + if self.blob_hash.is_some() { + return Err(Status::invalid_argument("Blob hash already provided")); + } + self.blob_hash = Some(new_hash); + return self.determine_action(db).await; + } + + /// private helper function to determine purpose of this RPC call + async fn determine_action(&mut self, db: &DatabaseClient) -> PutResult { + // this should be called only if action isn't determined yet + // this error should actually never happen + if !matches!(self.action, PutAction::None) { + return Err(Status::failed_precondition("Put action is already started")); + } + + // holder and hash need both to be set in order to continue + // otherwise we send a standard response + if self.holder.is_none() || self.blob_hash.is_none() { + return Ok(blob::PutResponse { data_exists: false }); + } + + let blob_hash = self.blob_hash.as_ref().unwrap(); + return match db.find_blob_item(blob_hash).await { + // Hash already exists, so we're only assigning a new holder to it + Ok(Some(_)) => { + self.action = PutAction::AssignHolder; + self.should_close_stream = true; + Ok(blob::PutResponse { data_exists: true }) + } + // Hash doesn't exist, so we're starting a new upload session + Ok(None) => { + self.action = PutAction::UploadNewBlob(BlobItem { + blob_hash: blob_hash.to_string(), + s3_path: S3Path { + bucket_name: BLOB_S3_BUCKET_NAME.to_string(), + object_name: blob_hash.to_string(), + }, + created: Utc::now(), + }); + Ok(blob::PutResponse { data_exists: false }) + } + Err(_db_err) => { + self.should_close_stream = true; + Err(Status::aborted("Internal error")) + } + }; + } + + pub async fn handle_data_chunk( + &mut self, + s3: &Arc, + mut new_data: Vec, + ) -> PutResult { + let blob_item = match &self.action { + PutAction::UploadNewBlob(blob_item) => blob_item, + _ => { + self.should_close_stream = true; + return Err(Status::invalid_argument( + "Holder and hash should be provided before data", + )); + } + }; + + // create upload session if it doesn't already exist + if self.uploader.is_none() { + self.uploader = + match MultiPartUploadSession::start(s3, &blob_item.s3_path).await { + Ok(session) => Some(session), + Err(_) => { + self.should_close_stream = true; + return Err(Status::aborted("Internal error")); + } + } + } + let uploader = self.uploader.as_mut().unwrap(); + + // New parts should be added to AWS only if they exceed minimum part size, + // Otherwise AWS returns error + self.current_chunk.append(&mut new_data); + if self.current_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE + { + if uploader.add_part(self.current_chunk.clone()).await.is_err() { + self.should_close_stream = true; + return Err(Status::aborted("Internal error")); + } + self.current_chunk.clear(); + } + + Ok(blob::PutResponse { data_exists: false }) + } + + /// This consumes `self` so this put handler instance cannot be used after this is called. + /// It should be called after the input stream is done + pub async fn finish(self, db: &DatabaseClient) -> Result<(), Status> { + let blob_item = match self.action { + PutAction::None => return Ok(()), + PutAction::AssignHolder => { + return assign_holder_to_blob( + db, + self.holder.unwrap(), + self.blob_hash.unwrap(), + ) + .await; + } + PutAction::UploadNewBlob(blob_item) => blob_item, + }; + + let mut uploader = + self.uploader.ok_or(Status::aborted("Internal error"))?; + + if !self.current_chunk.is_empty() { + if uploader.add_part(self.current_chunk).await.is_err() { + return Err(Status::aborted("Internal error")); + } + } + + if uploader.finish_upload().await.is_err() { + return Err(Status::aborted("Internal error")); + } + + if db.put_blob_item(blob_item).await.is_err() { + return Err(Status::aborted("Internal error")); + } + + assign_holder_to_blob(db, self.holder.unwrap(), self.blob_hash.unwrap()) + .await + } +} + +async fn assign_holder_to_blob( + db: &DatabaseClient, + holder: String, + blob_hash: String, +) -> Result<(), Status> { + let reverse_index_item = database::ReverseIndexItem { holder, blob_hash }; + + if db.put_reverse_index_item(reverse_index_item).await.is_err() { + return Err(Status::aborted("Internal error")); + } + Ok(()) +}