D8455: [blob-service] Implement upload logic with new db
D8455.diff
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -14,7 +14,7 @@
 };
 use crate::database::DBError;
 use crate::s3::{Error as S3Error, S3Client, S3Path};
-use crate::tools::{BoxedError, MemOps};
+use crate::tools::{BoxedError, ByteStream, MemOps};
 use crate::{constants::BLOB_DOWNLOAD_CHUNK_SIZE, database::DatabaseClient};
 
 #[derive(
@@ -112,6 +112,60 @@
     };
     Ok(session)
   }
+
+  pub async fn put_blob(
+    &self,
+    blob_hash: impl Into<String>,
+    mut blob_data_stream: impl ByteStream,
+  ) -> Result<(), BlobServiceError> {
+    let blob_hash: String = blob_hash.into();
+    let blob_item = BlobItemInput::new(&blob_hash);
+
+    if self.db.get_blob_item(blob_hash).await?.is_some() {
+      debug!("Blob already exists");
+      return Err(BlobServiceError::BlobAlreadyExists);
+    }
+
+    let mut upload_session =
+      self.s3.start_upload_session(&blob_item.s3_path).await?;
+    trace!(?blob_item, "Started S3 upload session");
+
+    tokio::pin!(blob_data_stream);
+    let mut s3_chunk: Vec<u8> = Vec::new();
+    while let Some(mut chunk) =
+      blob_data_stream.try_next().await.map_err(|err| {
+        warn!("Failed to get data chunk: {:?}", err);
+        BlobServiceError::InputError(err)
+      })?
+    {
+      s3_chunk.append(&mut chunk);
+
+      // New parts should be added to AWS only if they exceed the minimum
+      // part size; otherwise AWS returns an error.
+      if s3_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE {
+        trace!(
+          chunk_size = s3_chunk.len(),
+          "Chunk size exceeded, adding new S3 part"
+        );
+        upload_session
+          .add_part(s3_chunk.take_out())
+          .await
+          .map_err(BlobServiceError::from)?;
+      }
+    }
+    trace!("Upload stream drained");
+    // add the remaining data as the last S3 part
+    if !s3_chunk.is_empty() {
+      trace!("Uploading remaining {} bytes", s3_chunk.len());
+      upload_session.add_part(s3_chunk).await?;
+    }
+    // Complete the upload session
+    upload_session.finish_upload().await?;
+
+    trace!("S3 upload complete, putting item to db");
+    self.db.put_blob_item(blob_item).await?;
+    Ok(())
+  }
 }
 
 pub struct BlobDownloadObject {
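The loop in put_blob coalesces incoming stream chunks into a single buffer and flushes it as an S3 part only once it exceeds the minimum part size, since AWS rejects undersized parts in every position except the last; whatever remains after the stream drains is uploaded as the final part. Below is a self-contained sketch of that buffering pattern, written against the futures crate, with std::mem::take standing in for MemOps::take_out; MIN_PART_SIZE, upload_in_parts, and the flush_part callback are illustrative names, not the service's real API:

use futures::{Stream, TryStreamExt};

// Illustrative stand-in for S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE; AWS S3
// requires every multipart part except the last to be at least 5 MiB.
const MIN_PART_SIZE: usize = 5 * 1024 * 1024;

async fn upload_in_parts<S, E>(
  mut input: S,
  mut flush_part: impl FnMut(Vec<u8>),
) -> Result<(), E>
where
  S: Stream<Item = Result<Vec<u8>, E>> + Unpin,
{
  let mut buffer: Vec<u8> = Vec::new();
  while let Some(mut chunk) = input.try_next().await? {
    buffer.append(&mut chunk);
    // Flush only after crossing the threshold; mem::take moves the bytes
    // out and leaves an empty Vec behind, as `s3_chunk.take_out()` does.
    if buffer.len() > MIN_PART_SIZE {
      flush_part(std::mem::take(&mut buffer));
    }
  }
  // The final part is exempt from the minimum size, so any remainder can
  // be flushed as-is.
  if !buffer.is_empty() {
    flush_part(buffer);
  }
  Ok(())
}

The diff pins the stream with tokio::pin! and can therefore accept !Unpin streams; the Unpin bound here just keeps the sketch short.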
diff --git a/services/blob/src/tools.rs b/services/blob/src/tools.rs
--- a/services/blob/src/tools.rs
+++ b/services/blob/src/tools.rs
@@ -1,4 +1,5 @@
 use std::{env, error::Error as StdError};
+use tonic::codegen::futures_core::Stream;
 
 use crate::constants;
 
@@ -13,6 +14,16 @@
 }
 
 pub type BoxedError = Box<dyn StdError>;
+// Trait type aliases aren't supported in Rust, but
+// we can work around this by creating an empty trait
+// that extends the traits we want to alias.
+#[rustfmt::skip]
+pub trait ByteStream:
+  Stream<Item = Result<Vec<u8>, BoxedError>> {}
+#[rustfmt::skip]
+impl<T> ByteStream for T where
+  T: Stream<Item = Result<Vec<u8>, BoxedError>> {}
+
 pub trait MemOps {
   fn take_out(&mut self) -> Self;
 }
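The empty trait plus blanket impl above gives ByteStream the ergonomics of a trait alias: every stream of Result<Vec<u8>, BoxedError> implements it automatically, so call sites can write the short bound impl ByteStream. Here is a minimal standalone demonstration of the same idiom, using the futures crate's Stream re-export rather than tonic's; count_bytes and its Unpin bound are illustrative additions, not part of the diff:

use futures::{executor, stream, Stream, TryStreamExt};

type BoxedError = Box<dyn std::error::Error>;

// The alias pattern from tools.rs: an empty trait with a blanket impl
// stands in for the unsupported `trait ByteStream = Stream<...>;`.
trait ByteStream: Stream<Item = Result<Vec<u8>, BoxedError>> {}
impl<T> ByteStream for T where
  T: Stream<Item = Result<Vec<u8>, BoxedError>> {}

// Functions can now bound on the short alias instead of repeating the
// full `Stream<Item = ...>` signature.
async fn count_bytes(
  mut s: impl ByteStream + Unpin,
) -> Result<usize, BoxedError> {
  let mut total = 0;
  while let Some(chunk) = s.try_next().await? {
    total += chunk.len();
  }
  Ok(total)
}

fn main() -> Result<(), BoxedError> {
  executor::block_on(async {
    // Any matching stream qualifies without an explicit impl.
    let chunks: Vec<Result<Vec<u8>, BoxedError>> =
      vec![Ok(vec![1, 2, 3]), Ok(vec![4])];
    assert_eq!(count_bytes(stream::iter(chunks)).await?, 4);
    Ok(())
  })
}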