Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3249867
D5703.id18815.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
8 KB
Referenced Files
None
Subscribers
None
D5703.id18815.diff
View Options
diff --git a/services/blob/src/constants.rs b/services/blob/src/constants.rs
--- a/services/blob/src/constants.rs
+++ b/services/blob/src/constants.rs
@@ -43,3 +43,4 @@
// S3 constants
pub const BLOB_S3_BUCKET_NAME: &str = "commapp-blob";
+pub const S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE: u64 = 5 * 1024 * 1024;
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -1,17 +1,18 @@
use anyhow::{Context, Result};
use blob::blob_service_server::BlobService;
+use chrono::Utc;
use std::{pin::Pin, sync::Arc};
use tokio::sync::mpsc;
-use tokio_stream::{wrappers::ReceiverStream, Stream};
+use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Request, Response, Status};
use crate::{
constants::{
- GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
- MPSC_CHANNEL_BUFFER_CAPACITY,
+ BLOB_S3_BUCKET_NAME, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
+ MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
},
- database::{BlobItem, DatabaseClient, ReverseIndexItem},
- s3::S3Path,
+ database::{self, BlobItem, DatabaseClient, ReverseIndexItem},
+ s3::{MultiPartUploadSession, S3Path},
};
pub mod blob {
@@ -64,9 +65,44 @@
Pin<Box<dyn Stream<Item = Result<blob::PutResponse, Status>> + Send>>;
async fn put(
&self,
- _request: Request<tonic::Streaming<blob::PutRequest>>,
+ request: Request<tonic::Streaming<blob::PutRequest>>,
) -> Result<Response<Self::PutStream>, Status> {
- Err(Status::unimplemented("Not implemented yet"))
+ let mut in_stream = request.into_inner();
+ let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
+ let db = self.db.clone();
+ let s3 = self.s3.clone();
+ tokio::spawn(async move {
+ let mut put_handler = PutHandler::default();
+
+ while let Some(message) = in_stream.next().await {
+ let response = match message {
+ Ok(blob::PutRequest {
+ data: Some(blob::put_request::Data::Holder(new_holder)),
+ }) => put_handler.handle_holder(&db, new_holder).await,
+ Ok(blob::PutRequest {
+ data: Some(blob::put_request::Data::BlobHash(new_hash)),
+ }) => put_handler.handle_blob_hash(&db, new_hash).await,
+ Ok(blob::PutRequest {
+ data: Some(blob::put_request::Data::DataChunk(new_data)),
+ }) => put_handler.handle_data_chunk(&s3, new_data).await,
+ _ => Err(Status::unknown("unknown error")),
+ };
+ if let Err(e) = tx.send(response).await {
+ println!("Response was dropped: {}", e);
+ break;
+ }
+ if put_handler.should_close_stream {
+ break;
+ }
+ }
+
+ if let Err(status) = put_handler.finish(&db).await {
+ let _ = tx.send(Err(status)).await;
+ }
+ });
+
+ let out_stream = ReceiverStream::new(rx);
+ Ok(Response::new(Box::pin(out_stream) as Self::PutStream))
}
type GetStream =
@@ -188,3 +224,197 @@
Ok(Response::new(()))
}
}
+
+type PutResult = Result<blob::PutResponse, Status>;
+
+enum PutAction {
+ None,
+ AssignHolder,
+ UploadNewBlob(BlobItem),
+}
+
+/// A helper for handling Put RPC requests
+struct PutHandler {
+ /// Should the stream be closed by server
+ pub should_close_stream: bool,
+ action: PutAction,
+
+ holder: Option<String>,
+ blob_hash: Option<String>,
+ current_chunk: Vec<u8>,
+
+ uploader: Option<MultiPartUploadSession>,
+}
+
+impl Default for PutHandler {
+ fn default() -> Self {
+ PutHandler {
+ should_close_stream: false,
+ action: PutAction::None,
+ holder: None,
+ blob_hash: None,
+ current_chunk: Vec::new(),
+ uploader: None,
+ }
+ }
+}
+
+impl PutHandler {
+ pub async fn handle_holder(
+ &mut self,
+ db: &DatabaseClient,
+ new_holder: String,
+ ) -> PutResult {
+ if self.holder.is_some() {
+ return Err(Status::invalid_argument("Holder already provided"));
+ }
+ self.holder = Some(new_holder);
+ return self.determine_action(db).await;
+ }
+
+ pub async fn handle_blob_hash(
+ &mut self,
+ db: &DatabaseClient,
+ new_hash: String,
+ ) -> PutResult {
+ if self.blob_hash.is_some() {
+ return Err(Status::invalid_argument("Blob hash already provided"));
+ }
+ self.blob_hash = Some(new_hash);
+ return self.determine_action(db).await;
+ }
+
+ /// private helper function to determine purpose of this RPC call
+ async fn determine_action(&mut self, db: &DatabaseClient) -> PutResult {
+ // this should be called only if action isn't determined yet
+ // this error should actually never happen
+ if !matches!(self.action, PutAction::None) {
+ return Err(Status::failed_precondition("Put action is already started"));
+ }
+
+ // holder and hash need both to be set in order to continue
+ // otherwise we send a standard response
+ if self.holder.is_none() || self.blob_hash.is_none() {
+ return Ok(blob::PutResponse { data_exists: false });
+ }
+
+ let blob_hash = self.blob_hash.as_ref().unwrap();
+ return match db.find_blob_item(blob_hash).await {
+ // Hash already exists, so we're only assigning a new holder to it
+ Ok(Some(_)) => {
+ self.action = PutAction::AssignHolder;
+ self.should_close_stream = true;
+ Ok(blob::PutResponse { data_exists: true })
+ }
+ // Hash doesn't exist, so we're starting a new upload session
+ Ok(None) => {
+ self.action = PutAction::UploadNewBlob(BlobItem {
+ blob_hash: blob_hash.to_string(),
+ s3_path: S3Path {
+ bucket_name: BLOB_S3_BUCKET_NAME.to_string(),
+ object_name: blob_hash.to_string(),
+ },
+ created: Utc::now(),
+ });
+ Ok(blob::PutResponse { data_exists: false })
+ }
+ Err(_db_err) => {
+ self.should_close_stream = true;
+ Err(Status::aborted("Internal error"))
+ }
+ };
+ }
+
+ pub async fn handle_data_chunk(
+ &mut self,
+ s3: &Arc<aws_sdk_s3::Client>,
+ mut new_data: Vec<u8>,
+ ) -> PutResult {
+ let blob_item = match &self.action {
+ PutAction::UploadNewBlob(blob_item) => blob_item,
+ _ => {
+ self.should_close_stream = true;
+ return Err(Status::invalid_argument(
+ "Holder and hash should be provided before data",
+ ));
+ }
+ };
+
+ // create upload session if it doesn't already exist
+ if self.uploader.is_none() {
+ self.uploader =
+ match MultiPartUploadSession::start(s3, &blob_item.s3_path).await {
+ Ok(session) => Some(session),
+ Err(_) => {
+ self.should_close_stream = true;
+ return Err(Status::aborted("Internal error"));
+ }
+ }
+ }
+ let uploader = self.uploader.as_mut().unwrap();
+
+ // New parts should be added to AWS only if they exceed minimum part size,
+ // Otherwise AWS returns error
+ self.current_chunk.append(&mut new_data);
+ if self.current_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE
+ {
+ if uploader.add_part(self.current_chunk.clone()).await.is_err() {
+ self.should_close_stream = true;
+ return Err(Status::aborted("Internal error"));
+ }
+ self.current_chunk.clear();
+ }
+
+ Ok(blob::PutResponse { data_exists: false })
+ }
+
+ /// This consumes `self` so this put handler instance cannot be used after this is called.
+ /// It should be called after the input stream is done
+ pub async fn finish(self, db: &DatabaseClient) -> Result<(), Status> {
+ let blob_item = match self.action {
+ PutAction::None => return Ok(()),
+ PutAction::AssignHolder => {
+ return assign_holder_to_blob(
+ db,
+ self.holder.unwrap(),
+ self.blob_hash.unwrap(),
+ )
+ .await;
+ }
+ PutAction::UploadNewBlob(blob_item) => blob_item,
+ };
+
+ let mut uploader =
+ self.uploader.ok_or(Status::aborted("Internal error"))?;
+
+ if !self.current_chunk.is_empty() {
+ if uploader.add_part(self.current_chunk).await.is_err() {
+ return Err(Status::aborted("Internal error"));
+ }
+ }
+
+ if uploader.finish_upload().await.is_err() {
+ return Err(Status::aborted("Internal error"));
+ }
+
+ if db.put_blob_item(blob_item).await.is_err() {
+ return Err(Status::aborted("Internal error"));
+ }
+
+ assign_holder_to_blob(db, self.holder.unwrap(), self.blob_hash.unwrap())
+ .await
+ }
+}
+
+async fn assign_holder_to_blob(
+ db: &DatabaseClient,
+ holder: String,
+ blob_hash: String,
+) -> Result<(), Status> {
+ let reverse_index_item = database::ReverseIndexItem { holder, blob_hash };
+
+ if db.put_reverse_index_item(reverse_index_item).await.is_err() {
+ return Err(Status::aborted("Internal error"));
+ }
+ Ok(())
+}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Nov 16, 4:38 PM (22 h, 1 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2498391
Default Alt Text
D5703.id18815.diff (8 KB)
Attached To
Mode
D5703: [services][blob] Upload 1/2 - Introduce Put handlers
Attached
Detach File
Event Timeline
Log In to Comment