D5728.diff

diff --git a/services/blob/src/constants.rs b/services/blob/src/constants.rs
--- a/services/blob/src/constants.rs
+++ b/services/blob/src/constants.rs
@@ -43,3 +43,4 @@
 // S3 constants
 pub const BLOB_S3_BUCKET_NAME: &str = "commapp-blob";
+pub const S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE: u64 = 5 * 1024 * 1024;
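
Context for the new constant: AWS S3 rejects multipart-upload parts smaller than 5 MiB (only the final part may be smaller), so the service has to buffer incoming data until it has at least that much. A quick sanity check of the arithmetic, with the constant copied from the diff and a hypothetical `main` added purely for illustration:

const S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE: u64 = 5 * 1024 * 1024;

fn main() {
  // 5 MiB expressed in bytes; matches AWS's documented minimum part size
  // for every part of a multipart upload except the last one.
  assert_eq!(S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE, 5_242_880);
}
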
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -9,7 +9,7 @@
 use crate::{
   constants::{
     BLOB_S3_BUCKET_NAME, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
-    MPSC_CHANNEL_BUFFER_CAPACITY,
+    MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
   },
   database::{BlobItem, DatabaseClient, ReverseIndexItem},
   s3::{MultiPartUploadSession, S3Path},
@@ -328,13 +328,97 @@
     &mut self,
     mut new_data: Vec<u8>,
   ) -> PutResult {
-    unimplemented!()
+    let blob_item = match &self.action {
+      Some(PutAction::UploadNewBlob(blob_item)) => blob_item,
+      _ => {
+        self.should_close_stream = true;
+        return Err(Status::invalid_argument(
+          "Holder and hash should be provided before data",
+        ));
+      }
+    };
+
+    // create upload session if it doesn't already exist
+    if self.uploader.is_none() {
+      self.uploader =
+        match MultiPartUploadSession::start(&self.s3, &blob_item.s3_path).await
+        {
+          Ok(session) => Some(session),
+          Err(_) => {
+            self.should_close_stream = true;
+            return Err(Status::aborted("Internal error"));
+          }
+        }
+    }
+    let uploader = self.uploader.as_mut().unwrap();
+
+    // New parts should be added to AWS only if they exceed the minimum
+    // part size; otherwise AWS returns an error
+    self.current_chunk.append(&mut new_data);
+    if self.current_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE
+    {
+      if uploader.add_part(self.current_chunk.clone()).await.is_err() {
+        self.should_close_stream = true;
+        return Err(Status::aborted("Internal error"));
+      }
+      self.current_chunk.clear();
+    }
+
+    Ok(blob::PutResponse { data_exists: false })
   }
 
   /// This function should be called after the input stream is finished.
   /// This consumes `self` so this put handler instance cannot be used
   /// after this is called.
   pub async fn finish(self) -> Result<(), Status> {
-    unimplemented!()
+    if self.action.is_none() {
+      return Ok(());
+    }
+    let holder = self
+      .holder
+      .ok_or_else(|| Status::aborted("Internal error"))?;
+    let blob_hash = self
+      .blob_hash
+      .ok_or_else(|| Status::aborted("Internal error"))?;
+    let blob_item = match self.action {
+      None => return Ok(()),
+      Some(PutAction::AssignHolder) => {
+        return assign_holder_to_blob(&self.db, holder, blob_hash).await;
+      }
+      Some(PutAction::UploadNewBlob(blob_item)) => blob_item,
+    };
+
+    let mut uploader = self
+      .uploader
+      .ok_or_else(|| Status::aborted("Internal error"))?;
+
+    if !self.current_chunk.is_empty()
+      && uploader.add_part(self.current_chunk).await.is_err()
+    {
+      return Err(Status::aborted("Internal error"));
+    }
+
+    if uploader.finish_upload().await.is_err() {
+      return Err(Status::aborted("Internal error"));
+    }
+
+    if self.db.put_blob_item(blob_item).await.is_err() {
+      return Err(Status::aborted("Internal error"));
+    }
+
+    assign_holder_to_blob(&self.db, holder, blob_hash).await
+  }
+}
+
+async fn assign_holder_to_blob(
+  db: &DatabaseClient,
+  holder: String,
+  blob_hash: String,
+) -> Result<(), Status> {
+  let reverse_index_item = ReverseIndexItem { holder, blob_hash };
+
+  if db.put_reverse_index_item(reverse_index_item).await.is_err() {
+    return Err(Status::aborted("Internal error"));
   }
+  Ok(())
 }
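
Taken together, the diff implements a simple buffering scheme: `put_data` appends each incoming gRPC chunk to `current_chunk` and uploads it as an S3 part only once it exceeds the 5 MiB minimum, while `finish` flushes whatever remains (the final part is allowed to be smaller), completes the multipart upload, stores the blob item, and writes the reverse-index entry via `assign_holder_to_blob`. Below is a self-contained sketch of that buffering strategy; the names are hypothetical, with `BufferedUploader` and a `Vec` of parts standing in for the real `MultiPartUploadSession` and S3:

const MIN_PART_SIZE: u64 = 5 * 1024 * 1024;

struct BufferedUploader {
  current_chunk: Vec<u8>,
  uploaded_parts: Vec<Vec<u8>>,
}

impl BufferedUploader {
  fn new() -> Self {
    Self {
      current_chunk: Vec::new(),
      uploaded_parts: Vec::new(),
    }
  }

  // Mirrors put_data: buffer incoming bytes and emit a part only once the
  // buffer exceeds the S3 minimum part size.
  fn put_data(&mut self, mut new_data: Vec<u8>) {
    self.current_chunk.append(&mut new_data);
    if self.current_chunk.len() as u64 > MIN_PART_SIZE {
      self.uploaded_parts.push(std::mem::take(&mut self.current_chunk));
    }
  }

  // Mirrors finish: the final part of a multipart upload may be smaller
  // than the minimum, so flush whatever is left before completing.
  fn finish(mut self) -> Vec<Vec<u8>> {
    if !self.current_chunk.is_empty() {
      let remainder = std::mem::take(&mut self.current_chunk);
      self.uploaded_parts.push(remainder);
    }
    self.uploaded_parts
  }
}

fn main() {
  let mut uploader = BufferedUploader::new();
  // Three 4 MiB chunks: the buffer first crosses 5 MiB on the second call,
  // producing one 8 MiB part; finish() flushes the remaining 4 MiB.
  for _ in 0..3 {
    uploader.put_data(vec![0u8; 4 * 1024 * 1024]);
  }
  let parts = uploader.finish();
  assert_eq!(parts.len(), 2);
  assert_eq!(parts[0].len(), 8 * 1024 * 1024);
  assert_eq!(parts[1].len(), 4 * 1024 * 1024);
}

One difference worth noting: the sketch uses `std::mem::take` to hand off the buffer, while the diff clones `current_chunk` before `add_part` and clears it only after the part is accepted, so the buffered data is still intact if uploading a part fails.
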
