Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3504838
D5728.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
3 KB
Referenced Files
None
Subscribers
None
D5728.diff
View Options
diff --git a/services/blob/src/constants.rs b/services/blob/src/constants.rs
--- a/services/blob/src/constants.rs
+++ b/services/blob/src/constants.rs
@@ -43,3 +43,4 @@
// S3 constants
pub const BLOB_S3_BUCKET_NAME: &str = "commapp-blob";
+pub const S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE: u64 = 5 * 1024 * 1024;
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -9,7 +9,7 @@
use crate::{
constants::{
BLOB_S3_BUCKET_NAME, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
- MPSC_CHANNEL_BUFFER_CAPACITY,
+ MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
},
database::{BlobItem, DatabaseClient, ReverseIndexItem},
s3::{MultiPartUploadSession, S3Path},
@@ -328,13 +328,97 @@
&mut self,
mut new_data: Vec<u8>,
) -> PutResult {
- unimplemented!()
+ let blob_item = match &self.action {
+ Some(PutAction::UploadNewBlob(blob_item)) => blob_item,
+ _ => {
+ self.should_close_stream = true;
+ return Err(Status::invalid_argument(
+ "Holder and hash should be provided before data",
+ ));
+ }
+ };
+
+ // Create the upload session if it doesn't already exist
+ if self.uploader.is_none() {
+ self.uploader =
+ match MultiPartUploadSession::start(&self.s3, &blob_item.s3_path).await
+ {
+ Ok(session) => Some(session),
+ Err(_) => {
+ self.should_close_stream = true;
+ return Err(Status::aborted("Internal error"));
+ }
+ }
+ }
+ let uploader = self.uploader.as_mut().unwrap();
+
+ // Buffer incoming data and add a new part to AWS only once the buffer
+ // exceeds the minimum part size; otherwise AWS returns an error.
+ self.current_chunk.append(&mut new_data);
+ if self.current_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE
+ {
+ if uploader.add_part(self.current_chunk.clone()).await.is_err() {
+ self.should_close_stream = true;
+ return Err(Status::aborted("Internal error"));
+ }
+ self.current_chunk.clear();
+ }
+
+ Ok(blob::PutResponse { data_exists: false })
}
/// This function should be called after the input stream is finished.
/// This consumes `self` so this put handler instance cannot be used
/// after this is called.
pub async fn finish(self) -> Result<(), Status> {
- unimplemented!()
+ if self.action.is_none() {
+ return Ok(());
+ }
+ let holder = self
+ .holder
+ .ok_or_else(|| Status::aborted("Internal error"))?;
+ let blob_hash = self
+ .blob_hash
+ .ok_or_else(|| Status::aborted("Internal error"))?;
+ let blob_item = match self.action {
+ None => return Ok(()),
+ Some(PutAction::AssignHolder) => {
+ return assign_holder_to_blob(&self.db, holder, blob_hash).await;
+ }
+ Some(PutAction::UploadNewBlob(blob_item)) => blob_item,
+ };
+
+ let mut uploader = self
+ .uploader
+ .ok_or_else(|| Status::aborted("Internal error"))?;
+
+ if !self.current_chunk.is_empty()
+ && uploader.add_part(self.current_chunk).await.is_err()
+ {
+ return Err(Status::aborted("Internal error"));
+ }
+
+ if uploader.finish_upload().await.is_err() {
+ return Err(Status::aborted("Internal error"));
+ }
+
+ if self.db.put_blob_item(blob_item).await.is_err() {
+ return Err(Status::aborted("Internal error"));
+ }
+
+ assign_holder_to_blob(&self.db, holder, blob_hash).await
+ }
+}
+
+async fn assign_holder_to_blob(
+ db: &DatabaseClient,
+ holder: String,
+ blob_hash: String,
+) -> Result<(), Status> {
+ let reverse_index_item = ReverseIndexItem { holder, blob_hash };
+
+ if db.put_reverse_index_item(reverse_index_item).await.is_err() {
+ return Err(Status::aborted("Internal error"));
}
+ Ok(())
}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Dec 21, 10:45 AM (16 h, 33 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2687451
Default Alt Text
D5728.diff (3 KB)
Attached To
Mode
D5728: [services][blob] Upload 2/2 - Handle data chunks
Attached
Detach File
Event Timeline
Log In to Comment