D8455.diff

diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -14,7 +14,7 @@
};
use crate::database::DBError;
use crate::s3::{Error as S3Error, S3Client, S3Path};
-use crate::tools::{BoxedError, MemOps};
+use crate::tools::{BoxedError, ByteStream, MemOps};
use crate::{constants::BLOB_DOWNLOAD_CHUNK_SIZE, database::DatabaseClient};
#[derive(
@@ -112,6 +112,60 @@
};
Ok(session)
}
+
+  pub async fn put_blob(
+    &self,
+    blob_hash: impl Into<String>,
+    mut blob_data_stream: impl ByteStream,
+  ) -> Result<(), BlobServiceError> {
+    let blob_hash: String = blob_hash.into();
+    let blob_item = BlobItemInput::new(&blob_hash);
+
+    if self.db.get_blob_item(blob_hash).await?.is_some() {
+      debug!("Blob already exists");
+      return Err(BlobServiceError::BlobAlreadyExists);
+    }
+
+    let mut upload_session =
+      self.s3.start_upload_session(&blob_item.s3_path).await?;
+    trace!(?blob_item, "Started S3 upload session");
+
+    tokio::pin!(blob_data_stream);
+    let mut s3_chunk: Vec<u8> = Vec::new();
+    while let Some(mut chunk) =
+      blob_data_stream.try_next().await.map_err(|err| {
+        warn!("Failed to get data chunk: {:?}", err);
+        BlobServiceError::InputError(err)
+      })?
+    {
+      s3_chunk.append(&mut chunk);
+
+      // New parts should be added to AWS only once they exceed the minimum
+      // part size; otherwise AWS returns an error.
+      if s3_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE {
+        trace!(
+          chunk_size = s3_chunk.len(),
+          "Chunk size exceeded, adding new S3 part"
+        );
+        upload_session
+          .add_part(s3_chunk.take_out())
+          .await
+          .map_err(BlobServiceError::from)?;
+      }
+    }
+    trace!("Upload stream drained");
+    // Add the remaining data as the last S3 part
+    if !s3_chunk.is_empty() {
+      trace!("Uploading remaining {} bytes", s3_chunk.len());
+      upload_session.add_part(s3_chunk).await?;
+    }
+    // Complete the upload session
+    upload_session.finish_upload().await?;
+
+    trace!("S3 upload complete, putting item to db");
+    self.db.put_blob_item(blob_item).await?;
+    Ok(())
+  }
}
pub struct BlobDownloadObject {
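
For context, a minimal caller sketch for the new put_blob method (not part of this diff; blob_service, the hash literal, and the futures_util dependency are assumptions). Any value implementing Stream<Item = Result<Vec<u8>, BoxedError>> satisfies the ByteStream alias introduced below, so an in-memory payload can be wrapped with futures_util::stream::iter:

  // Hypothetical usage; names and payload are placeholders.
  use futures_util::stream;

  let chunks: Vec<Result<Vec<u8>, BoxedError>> =
    vec![Ok(b"hello ".to_vec()), Ok(b"world".to_vec())];
  // put_blob drains the stream, buffering chunks and flushing them to S3
  // as multipart parts once they exceed the minimum part size.
  blob_service.put_blob("<blob-hash>", stream::iter(chunks)).await?;
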
diff --git a/services/blob/src/tools.rs b/services/blob/src/tools.rs
--- a/services/blob/src/tools.rs
+++ b/services/blob/src/tools.rs
@@ -1,4 +1,5 @@
use std::{env, error::Error as StdError};
+use tonic::codegen::futures_core::Stream;
use crate::constants;
@@ -13,6 +14,16 @@
}
pub type BoxedError = Box<dyn StdError>;
+// Trait type aliases aren't supported in Rust, but
+// we can work around this by creating an empty trait
+// that extends the traits we want to alias.
+#[rustfmt::skip]
+pub trait ByteStream:
+  Stream<Item = Result<Vec<u8>, BoxedError>> {}
+#[rustfmt::skip]
+impl<T> ByteStream for T where
+  T: Stream<Item = Result<Vec<u8>, BoxedError>> {}
+
pub trait MemOps {
  fn take_out(&mut self) -> Self;
}
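
The upload loop in put_blob relies on MemOps::take_out to hand the buffered bytes to S3 while leaving the buffer empty for reuse. Its implementation is outside the hunks shown here; a plausible sketch for Vec<u8>, assuming it simply wraps std::mem::take:

  // Sketch only: one way take_out could look for Vec<u8>. std::mem::take
  // replaces the vector with an empty one and returns the previous contents,
  // so the upload loop keeps reusing the same buffer binding.
  impl MemOps for Vec<u8> {
    fn take_out(&mut self) -> Self {
      std::mem::take(self)
    }
  }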
