D5703.id18815.diff
diff --git a/services/blob/src/constants.rs b/services/blob/src/constants.rs
--- a/services/blob/src/constants.rs
+++ b/services/blob/src/constants.rs
@@ -43,3 +43,4 @@
// S3 constants
pub const BLOB_S3_BUCKET_NAME: &str = "commapp-blob";
+pub const S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE: u64 = 5 * 1024 * 1024;
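
A note on this constant: 5 MiB is the documented S3 minimum size for every part of a multipart upload except the last one, which is why the Put handler in service.rs below buffers incoming gRPC chunks and only flushes them once they cross this threshold. A minimal sketch of that check (the helper name is illustrative, not part of this diff):

const S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE: u64 = 5 * 1024 * 1024;

/// Hypothetical helper: returns true once the buffered bytes are large
/// enough to be uploaded as a multipart part without S3 rejecting it.
fn chunk_ready_for_upload(buffered_bytes: usize) -> bool {
  buffered_bytes as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE
}
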
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -1,17 +1,18 @@
use anyhow::{Context, Result};
use blob::blob_service_server::BlobService;
+use chrono::Utc;
use std::{pin::Pin, sync::Arc};
use tokio::sync::mpsc;
-use tokio_stream::{wrappers::ReceiverStream, Stream};
+use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Request, Response, Status};
use crate::{
constants::{
- GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
- MPSC_CHANNEL_BUFFER_CAPACITY,
+ BLOB_S3_BUCKET_NAME, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
+ MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
},
- database::{BlobItem, DatabaseClient, ReverseIndexItem},
- s3::S3Path,
+ database::{self, BlobItem, DatabaseClient, ReverseIndexItem},
+ s3::{MultiPartUploadSession, S3Path},
};
pub mod blob {
@@ -64,9 +65,44 @@
Pin<Box<dyn Stream<Item = Result<blob::PutResponse, Status>> + Send>>;
async fn put(
&self,
- _request: Request<tonic::Streaming<blob::PutRequest>>,
+ request: Request<tonic::Streaming<blob::PutRequest>>,
) -> Result<Response<Self::PutStream>, Status> {
- Err(Status::unimplemented("Not implemented yet"))
+ let mut in_stream = request.into_inner();
+ let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
+ let db = self.db.clone();
+ let s3 = self.s3.clone();
+ tokio::spawn(async move {
+ let mut put_handler = PutHandler::default();
+
+ while let Some(message) = in_stream.next().await {
+ let response = match message {
+ Ok(blob::PutRequest {
+ data: Some(blob::put_request::Data::Holder(new_holder)),
+ }) => put_handler.handle_holder(&db, new_holder).await,
+ Ok(blob::PutRequest {
+ data: Some(blob::put_request::Data::BlobHash(new_hash)),
+ }) => put_handler.handle_blob_hash(&db, new_hash).await,
+ Ok(blob::PutRequest {
+ data: Some(blob::put_request::Data::DataChunk(new_data)),
+ }) => put_handler.handle_data_chunk(&s3, new_data).await,
+ _ => Err(Status::unknown("unknown error")),
+ };
+ if let Err(e) = tx.send(response).await {
+ println!("Response was dropped: {}", e);
+ break;
+ }
+ if put_handler.should_close_stream {
+ break;
+ }
+ }
+
+ if let Err(status) = put_handler.finish(&db).await {
+ let _ = tx.send(Err(status)).await;
+ }
+ });
+
+ let out_stream = ReceiverStream::new(rx);
+ Ok(Response::new(Box::pin(out_stream) as Self::PutStream))
}
type GetStream =
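
The put implementation above uses the standard tonic pattern for streaming responses: spawn a task that pushes results into an mpsc channel and return the receiving end wrapped in a ReceiverStream. Stripped of the blob-specific logic, the shape is roughly the following (a generic sketch, not project code):

use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

// The spawned producer task owns `tx`; the RPC returns the wrapped `rx`.
// A failed `tx.send` means the client went away, so production stops.
fn spawn_response_stream() -> ReceiverStream<Result<u32, tonic::Status>> {
  let (tx, rx) = mpsc::channel(10);
  tokio::spawn(async move {
    for i in 0..3 {
      if tx.send(Ok(i)).await.is_err() {
        break;
      }
    }
  });
  ReceiverStream::new(rx)
}
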
@@ -188,3 +224,197 @@
Ok(Response::new(()))
}
}
+
+type PutResult = Result<blob::PutResponse, Status>;
+
+enum PutAction {
+ None,
+ AssignHolder,
+ UploadNewBlob(BlobItem),
+}
+
+/// A helper for handling Put RPC requests
+struct PutHandler {
+ /// Should the stream be closed by the server
+ pub should_close_stream: bool,
+ action: PutAction,
+
+ holder: Option<String>,
+ blob_hash: Option<String>,
+ current_chunk: Vec<u8>,
+
+ uploader: Option<MultiPartUploadSession>,
+}
+
+impl Default for PutHandler {
+ fn default() -> Self {
+ PutHandler {
+ should_close_stream: false,
+ action: PutAction::None,
+ holder: None,
+ blob_hash: None,
+ current_chunk: Vec::new(),
+ uploader: None,
+ }
+ }
+}
+
+impl PutHandler {
+ pub async fn handle_holder(
+ &mut self,
+ db: &DatabaseClient,
+ new_holder: String,
+ ) -> PutResult {
+ if self.holder.is_some() {
+ return Err(Status::invalid_argument("Holder already provided"));
+ }
+ self.holder = Some(new_holder);
+ return self.determine_action(db).await;
+ }
+
+ pub async fn handle_blob_hash(
+ &mut self,
+ db: &DatabaseClient,
+ new_hash: String,
+ ) -> PutResult {
+ if self.blob_hash.is_some() {
+ return Err(Status::invalid_argument("Blob hash already provided"));
+ }
+ self.blob_hash = Some(new_hash);
+ return self.determine_action(db).await;
+ }
+
+ /// Private helper function to determine the purpose of this RPC call
+ async fn determine_action(&mut self, db: &DatabaseClient) -> PutResult {
+ // this should only be called while the action isn't determined yet;
+ // in practice this error should never happen
+ if !matches!(self.action, PutAction::None) {
+ return Err(Status::failed_precondition("Put action is already started"));
+ }
+
+ // both the holder and the hash need to be set in order to continue;
+ // otherwise we send a standard response
+ if self.holder.is_none() || self.blob_hash.is_none() {
+ return Ok(blob::PutResponse { data_exists: false });
+ }
+
+ let blob_hash = self.blob_hash.as_ref().unwrap();
+ return match db.find_blob_item(blob_hash).await {
+ // Hash already exists, so we're only assigning a new holder to it
+ Ok(Some(_)) => {
+ self.action = PutAction::AssignHolder;
+ self.should_close_stream = true;
+ Ok(blob::PutResponse { data_exists: true })
+ }
+ // Hash doesn't exist, so we're starting a new upload session
+ Ok(None) => {
+ self.action = PutAction::UploadNewBlob(BlobItem {
+ blob_hash: blob_hash.to_string(),
+ s3_path: S3Path {
+ bucket_name: BLOB_S3_BUCKET_NAME.to_string(),
+ object_name: blob_hash.to_string(),
+ },
+ created: Utc::now(),
+ });
+ Ok(blob::PutResponse { data_exists: false })
+ }
+ Err(_db_err) => {
+ self.should_close_stream = true;
+ Err(Status::aborted("Internal error"))
+ }
+ };
+ }
+
+ pub async fn handle_data_chunk(
+ &mut self,
+ s3: &Arc<aws_sdk_s3::Client>,
+ mut new_data: Vec<u8>,
+ ) -> PutResult {
+ let blob_item = match &self.action {
+ PutAction::UploadNewBlob(blob_item) => blob_item,
+ _ => {
+ self.should_close_stream = true;
+ return Err(Status::invalid_argument(
+ "Holder and hash should be provided before data",
+ ));
+ }
+ };
+
+ // create upload session if it doesn't already exist
+ if self.uploader.is_none() {
+ self.uploader =
+ match MultiPartUploadSession::start(s3, &blob_item.s3_path).await {
+ Ok(session) => Some(session),
+ Err(_) => {
+ self.should_close_stream = true;
+ return Err(Status::aborted("Internal error"));
+ }
+ }
+ }
+ let uploader = self.uploader.as_mut().unwrap();
+
+ // New parts should be uploaded to S3 only once they exceed the minimum
+ // part size; otherwise AWS returns an error
+ self.current_chunk.append(&mut new_data);
+ if self.current_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE
+ {
+ if uploader.add_part(self.current_chunk.clone()).await.is_err() {
+ self.should_close_stream = true;
+ return Err(Status::aborted("Internal error"));
+ }
+ self.current_chunk.clear();
+ }
+
+ Ok(blob::PutResponse { data_exists: false })
+ }
+
+ /// Consumes `self`, so this put handler instance cannot be used afterwards.
+ /// It should be called once the input stream is finished
+ pub async fn finish(self, db: &DatabaseClient) -> Result<(), Status> {
+ let blob_item = match self.action {
+ PutAction::None => return Ok(()),
+ PutAction::AssignHolder => {
+ return assign_holder_to_blob(
+ db,
+ self.holder.unwrap(),
+ self.blob_hash.unwrap(),
+ )
+ .await;
+ }
+ PutAction::UploadNewBlob(blob_item) => blob_item,
+ };
+
+ let mut uploader =
+ self.uploader.ok_or(Status::aborted("Internal error"))?;
+
+ if !self.current_chunk.is_empty() {
+ if uploader.add_part(self.current_chunk).await.is_err() {
+ return Err(Status::aborted("Internal error"));
+ }
+ }
+
+ if uploader.finish_upload().await.is_err() {
+ return Err(Status::aborted("Internal error"));
+ }
+
+ if db.put_blob_item(blob_item).await.is_err() {
+ return Err(Status::aborted("Internal error"));
+ }
+
+ assign_holder_to_blob(db, self.holder.unwrap(), self.blob_hash.unwrap())
+ .await
+ }
+}
+
+async fn assign_holder_to_blob(
+ db: &DatabaseClient,
+ holder: String,
+ blob_hash: String,
+) -> Result<(), Status> {
+ let reverse_index_item = database::ReverseIndexItem { holder, blob_hash };
+
+ if db.put_reverse_index_item(reverse_index_item).await.is_err() {
+ return Err(Status::aborted("Internal error"));
+ }
+ Ok(())
+}
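
This diff calls into a MultiPartUploadSession type from crate::s3 whose implementation is not part of the change. Below is a minimal sketch of what such a session could look like on top of aws-sdk-s3. The method names (start, add_part, finish_upload) come from the call sites above, but the struct fields, error handling, and SDK module paths (which moved between aws-sdk-s3 releases, e.g. model:: vs types::) are assumptions, not the actual services/blob/src/s3.rs:

use anyhow::{anyhow, Result};
use aws_sdk_s3::model::{CompletedMultipartUpload, CompletedPart};
use aws_sdk_s3::types::ByteStream;
use std::sync::Arc;

// Minimal stand-in for the S3Path struct used at the call sites above.
pub struct S3Path {
  pub bucket_name: String,
  pub object_name: String,
}

pub struct MultiPartUploadSession {
  client: Arc<aws_sdk_s3::Client>,
  bucket_name: String,
  object_name: String,
  upload_id: String,
  upload_parts: Vec<CompletedPart>,
}

impl MultiPartUploadSession {
  /// Starts a multipart upload session for the given bucket/object pair.
  pub async fn start(
    client: &Arc<aws_sdk_s3::Client>,
    s3_path: &S3Path,
  ) -> Result<Self> {
    let response = client
      .create_multipart_upload()
      .bucket(&s3_path.bucket_name)
      .key(&s3_path.object_name)
      .send()
      .await?;
    let upload_id = response
      .upload_id()
      .ok_or_else(|| anyhow!("Missing upload ID"))?
      .to_string();
    Ok(MultiPartUploadSession {
      client: client.clone(),
      bucket_name: s3_path.bucket_name.clone(),
      object_name: s3_path.object_name.clone(),
      upload_id,
      upload_parts: Vec::new(),
    })
  }

  /// Uploads one part; S3 part numbers are 1-based, hence len() + 1.
  pub async fn add_part(&mut self, part: Vec<u8>) -> Result<()> {
    let part_number = (self.upload_parts.len() + 1) as i32;
    let response = self
      .client
      .upload_part()
      .bucket(&self.bucket_name)
      .key(&self.object_name)
      .upload_id(&self.upload_id)
      .part_number(part_number)
      .body(ByteStream::from(part))
      .send()
      .await?;
    // The ETag of each part has to be echoed back on completion.
    let e_tag = response
      .e_tag()
      .ok_or_else(|| anyhow!("Missing ETag"))?
      .to_string();
    self.upload_parts.push(
      CompletedPart::builder()
        .e_tag(e_tag)
        .part_number(part_number)
        .build(),
    );
    Ok(())
  }

  /// Completes the upload by sending the collected part list to S3.
  pub async fn finish_upload(&self) -> Result<()> {
    let completed_upload = CompletedMultipartUpload::builder()
      .set_parts(Some(self.upload_parts.clone()))
      .build();
    self
      .client
      .complete_multipart_upload()
      .bucket(&self.bucket_name)
      .key(&self.object_name)
      .upload_id(&self.upload_id)
      .multipart_upload(completed_upload)
      .send()
      .await?;
    Ok(())
  }
}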

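For completeness, here is what a client of the new streaming Put RPC might look like. The BlobServiceClient module path follows standard tonic codegen naming, and the endpoint and chunk size are placeholders; only the message order (holder, then blob hash, then data chunks) is dictated by the PutHandler above:

use tokio_stream::StreamExt;

// Generated by tonic from blob.proto; this module path is an assumption
// based on standard tonic naming conventions.
use blob::blob_service_client::BlobServiceClient;
use blob::{put_request::Data, PutRequest};

async fn put_blob(
  holder: String,
  blob_hash: String,
  blob_data: Vec<u8>,
) -> Result<(), Box<dyn std::error::Error>> {
  // Placeholder endpoint for illustration only.
  let mut client = BlobServiceClient::connect("http://localhost:50051").await?;

  // The server expects the holder and blob hash before any data chunks;
  // chunks sent earlier are rejected with InvalidArgument.
  let mut requests = vec![
    PutRequest { data: Some(Data::Holder(holder)) },
    PutRequest { data: Some(Data::BlobHash(blob_hash)) },
  ];

  // Split the payload into gRPC-sized chunks; the server re-buffers them
  // into parts of at least 5 MiB before uploading to S3.
  const CHUNK_SIZE: usize = 64 * 1024; // arbitrary client-side chunk size
  for chunk in blob_data.chunks(CHUNK_SIZE) {
    requests.push(PutRequest { data: Some(Data::DataChunk(chunk.to_vec())) });
  }

  let mut responses =
    client.put(tokio_stream::iter(requests)).await?.into_inner();
  while let Some(response) = responses.next().await {
    // data_exists == true means the hash is already stored; the server
    // assigns the holder and closes the stream, so uploading can stop.
    if response?.data_exists {
      break;
    }
  }
  Ok(())
}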