// Review preamble (descriptive text placed above the `diff --git` header; not part of any hunk).
//
// What this text is:
//   A unified diff against services/blob/src/service.rs whose line breaks have been
//   collapsed onto three physical lines. Several generic argument lists look truncated
//   (`Request>`, `Result, Status>`, `type PutResult = Result;`, `Option,`, `Vec,`, `Arc,`).
//   NOTE(review): presumably `<...>` contents were stripped during extraction; recover the
//   real types from the repository before applying or compiling anything from this text.
//
// What the patch changes, as far as the visible hunks show:
//   * Imports: adds `chrono::Utc`, `tokio_stream::StreamExt`, `BLOB_S3_BUCKET_NAME`
//     and `s3::MultiPartUploadSession`.
//   * `put`: the stub that returned `Status::unimplemented("Not implemented yet")` is replaced.
//     The new body takes the inbound stream via `request.into_inner()`, creates an mpsc
//     channel with capacity `MPSC_CHANNEL_BUFFER_CAPACITY`, clones `self.db` / `self.s3`,
//     and spawns a tokio task. That task builds a `PutHandler` and, for each inbound item:
//       - dispatches `Data::Holder` -> `handle_holder`, `Data::BlobHash` -> `handle_blob_hash`,
//         `Data::DataChunk` -> `handle_data_chunk`;
//       - maps every other item (the `_` arm, which also covers `Err(..)` items and
//         `data: None`) to `Status::unknown("unknown error")`;
//       - sends the result on `tx`; if the send fails it prints "Response was dropped" and
//         leaves the loop;
//       - leaves the loop when `put_handler.should_close_stream` is set.
//     After the loop it awaits `put_handler.finish()` and forwards an `Err(status)` on `tx`.
//     `put` itself returns `ReceiverStream::new(rx)`, boxed and pinned as `Self::PutStream`.
//   * New items: `PutResult` alias, `PutAction { AssignHolder, UploadNewBlob(BlobItem) }`,
//     and `PutHandler` with `new`, `handle_holder`, `handle_blob_hash`, `determine_action`,
//     `handle_data_chunk`, `finish`.
//       - `handle_holder` / `handle_blob_hash` reject a second value with
//         `Status::invalid_argument(..)`, otherwise store it and call `determine_action`.
//       - `determine_action` returns `failed_precondition` if an action is already set, and
//         replies `data_exists: false` until both holder and hash are present. It then calls
//         `db.find_blob_item(blob_hash)`:
//           `Ok(Some(_))` -> action `AssignHolder`, `should_close_stream = true`,
//                            reply `data_exists: true`;
//           `Ok(None)`    -> action `UploadNewBlob(BlobItem { .. })` with the hash as both
//                            `blob_hash` and S3 `object_name`, bucket `BLOB_S3_BUCKET_NAME`,
//                            `created: Utc::now()`, reply `data_exists: false`;
//           `Err(_)`      -> `should_close_stream = true`, `Status::aborted("Internal error")`.
//
// NOTE(review): `handle_data_chunk` and `finish` are `unimplemented!()`. `finish()` is awaited
//   whenever the loop exits, so as written the spawned task panics on every `put` call
//   (or earlier, on the first data chunk). Presumably the client then just sees the response
//   stream end, since `tx` is dropped - confirm, or return `Status::unimplemented` from the
//   stubs until the follow-up patch lands.
// NOTE(review): the `_` arm hides the `Status` carried by an `Err(..)` item from the inbound
//   stream, and the loop keeps reading after sending an error unless `should_close_stream`
//   was set - confirm both are intended.
// NOTE(review): `Err(_db_err)` discards the database error without logging it; `println!` is
//   used for the dropped-response message - TODO confirm the project's logging convention.
// NOTE(review): `db` and `s3` are cloned in `put` and then cloned again inside
//   `PutHandler::new(&db, &s3)`.
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs --- a/services/blob/src/service.rs +++ b/services/blob/src/service.rs @@ -1,17 +1,18 @@ use anyhow::{Context, Result}; use blob::blob_service_server::BlobService; +use chrono::Utc; use std::{pin::Pin, sync::Arc}; use tokio::sync::mpsc; -use tokio_stream::{wrappers::ReceiverStream, Stream}; +use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status}; use crate::{ constants::{ - GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE, + BLOB_S3_BUCKET_NAME, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE, MPSC_CHANNEL_BUFFER_CAPACITY, }, database::{BlobItem, DatabaseClient, ReverseIndexItem}, - s3::S3Path, + s3::{MultiPartUploadSession, S3Path}, }; pub mod blob { @@ -64,9 +65,44 @@ Pin> + Send>>; async fn put( &self, - _request: Request>, + request: Request>, ) -> Result, Status> { - Err(Status::unimplemented("Not implemented yet")) + let mut in_stream = request.into_inner(); + let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); + let db = self.db.clone(); + let s3 = self.s3.clone(); + tokio::spawn(async move { + let mut put_handler = PutHandler::new(&db, &s3); + + while let Some(message) = in_stream.next().await { + let response = match message { + Ok(blob::PutRequest { + data: Some(blob::put_request::Data::Holder(new_holder)), + }) => put_handler.handle_holder(new_holder).await, + Ok(blob::PutRequest { + data: Some(blob::put_request::Data::BlobHash(new_hash)), + }) => put_handler.handle_blob_hash(new_hash).await, + Ok(blob::PutRequest { + data: Some(blob::put_request::Data::DataChunk(new_data)), + }) => put_handler.handle_data_chunk(new_data).await, + _ => Err(Status::unknown("unknown error")), + }; + if let Err(e) = tx.send(response).await { + println!("Response was dropped: {}", e); + break; + } + if put_handler.should_close_stream { + break; + } + } + + if let Err(status) = put_handler.finish().await { + let _ = 
tx.send(Err(status)).await; + } + }); + + let out_stream = ReceiverStream::new(rx); + Ok(Response::new(Box::pin(out_stream) as Self::PutStream)) } type GetStream = @@ -191,3 +227,114 @@ Ok(Response::new(())) } } + +type PutResult = Result; + +enum PutAction { + AssignHolder, + UploadNewBlob(BlobItem), +} + +/// A helper for handling Put RPC requests +struct PutHandler { + /// Should the stream be closed by server + pub should_close_stream: bool, + action: Option, + + holder: Option, + blob_hash: Option, + current_chunk: Vec, + + uploader: Option, + db: DatabaseClient, + s3: Arc, +} + +impl PutHandler { + fn new(db: &DatabaseClient, s3: &Arc) -> Self { + PutHandler { + should_close_stream: false, + action: None, + holder: None, + blob_hash: None, + current_chunk: Vec::new(), + uploader: None, + db: db.clone(), + s3: s3.clone(), + } + } + + pub async fn handle_holder(&mut self, new_holder: String) -> PutResult { + if self.holder.is_some() { + return Err(Status::invalid_argument("Holder already provided")); + } + self.holder = Some(new_holder); + self.determine_action().await + } + + pub async fn handle_blob_hash(&mut self, new_hash: String) -> PutResult { + if self.blob_hash.is_some() { + return Err(Status::invalid_argument("Blob hash already provided")); + } + self.blob_hash = Some(new_hash); + self.determine_action().await + } + + /// private helper function to determine purpose of this RPC call + async fn determine_action(&mut self) -> PutResult { + // this should be called only if action isn't determined yet + // this error should actually never happen + if self.action.is_some() { + return Err(Status::failed_precondition("Put action is already started")); + } + + // holder and hash need both to be set in order to continue + // otherwise we send a standard response + if self.holder.is_none() || self.blob_hash.is_none() { + return Ok(blob::PutResponse { data_exists: false }); + } + let blob_hash = self + .blob_hash + .as_ref() + .ok_or_else(|| 
Status::failed_precondition("Internal error"))?; + + match self.db.find_blob_item(blob_hash).await { + // Hash already exists, so we're only assigning a new holder to it + Ok(Some(_)) => { + self.action = Some(PutAction::AssignHolder); + self.should_close_stream = true; + Ok(blob::PutResponse { data_exists: true }) + } + // Hash doesn't exist, so we're starting a new upload session + Ok(None) => { + self.action = Some(PutAction::UploadNewBlob(BlobItem { + blob_hash: blob_hash.to_string(), + s3_path: S3Path { + bucket_name: BLOB_S3_BUCKET_NAME.to_string(), + object_name: blob_hash.to_string(), + }, + created: Utc::now(), + })); + Ok(blob::PutResponse { data_exists: false }) + } + Err(_db_err) => { + self.should_close_stream = true; + Err(Status::aborted("Internal error")) + } + } + } + + pub async fn handle_data_chunk( + &mut self, + mut new_data: Vec, + ) -> PutResult { + unimplemented!() + } + + /// This function should be called after the input stream is finished. + /// This consumes `self` so this put handler instance cannot be used + /// after this is called. + pub async fn finish(self) -> Result<(), Status> { + unimplemented!() + } +}