D7483: [services][blob] Add upload blob HTTP handler
D7483.diff
diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs
--- a/services/blob/src/http/handlers/blob.rs
+++ b/services/blob/src/http/handlers/blob.rs
@@ -1,14 +1,18 @@
-use crate::constants::BLOB_DOWNLOAD_CHUNK_SIZE;
-use crate::database::ReverseIndexItem;
+use crate::constants::{
+ BLOB_DOWNLOAD_CHUNK_SIZE, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
+};
+use crate::database::{BlobItem, ReverseIndexItem};
+use crate::tools::MemOps;
use super::{handle_db_error, AppContext};
use actix_web::error::{
- ErrorConflict, ErrorInternalServerError, ErrorNotFound,
+ ErrorBadRequest, ErrorConflict, ErrorInternalServerError, ErrorNotFound,
};
use actix_web::{web, Error as HttpError, HttpResponse};
use anyhow::Result;
use async_stream::{try_stream, AsyncStream};
use serde::{Deserialize, Serialize};
+use tokio_stream::StreamExt;
use tracing::{debug, error, info, instrument, trace, warn};
use tracing_futures::Instrument;
@@ -114,6 +118,125 @@
 Ok(HttpResponse::Ok().json(web::Json(AssignHolderResponse { data_exists })))
}
+async fn get_blob_hash_field(
+ multipart_payload: &mut actix_multipart::Multipart,
+) -> Result<String, HttpError> {
+ let Some(mut field) = multipart_payload.try_next().await? else {
+ debug!("Malformed multipart request");
+ return Err(ErrorBadRequest("Bad request"));
+ };
+
+ if field.name() != "blob_hash" {
+ warn!("Blob hash is required as the first form field");
+ return Err(ErrorBadRequest("Bad request"));
+ }
+
+ let mut buf = Vec::new();
+ while let Some(chunk) = field.try_next().await? {
+ buf.extend_from_slice(&chunk);
+ }
+
+ let blob_hash = String::from_utf8(buf)
+ .map_err(|_| ErrorInternalServerError("Internal error"))?;
+ Ok(blob_hash)
+}
+
+async fn process_blob_data(
+ multipart_payload: &mut actix_multipart::Multipart,
+ upload_session: &mut crate::s3::MultiPartUploadSession,
+) -> Result<(), HttpError> {
+ let mut s3_chunk: Vec<u8> = Vec::new();
+ while let Some(mut field) = multipart_payload.try_next().await? {
+ let field_name = field.name();
+ if field_name != "blob_data" {
+ warn!(
+ field_name,
+ "Malfolmed request: 'blob_data' multipart field expected."
+ );
+ return Err(ErrorBadRequest("Bad request"));
+ }
+
+ while let Some(chunk) = field.try_next().await? {
+ let mut chunk = chunk.to_vec();
+ s3_chunk.append(&mut chunk);
+
+ // New parts should be added to AWS only once they exceed the minimum part size;
+ // otherwise AWS returns an error
+ if s3_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE {
+ trace!(
+ chunk_size = s3_chunk.len(),
+ "Chunk size exceeded, adding new S3 part"
+ );
+ if let Err(err) = upload_session.add_part(s3_chunk.take_out()).await {
+ error!("Failed to upload S3 part: {:?}", err);
+ return Err(ErrorInternalServerError("Internal error"));
+ }
+ }
+ }
+ }
+
+ // add the remaining data as the last S3 part
+ if !s3_chunk.is_empty() {
+ if let Err(err) = upload_session.add_part(s3_chunk).await {
+ error!("Failed to upload final part: {:?}", err);
+ return Err(ErrorInternalServerError("Internal error"));
+ }
+ }
+
+ Ok(())
+}
+
+#[instrument(skip_all, name = "upload_blob", fields(blob_hash))]
+pub async fn upload_blob_handler(
+ ctx: web::Data<AppContext>,
+ mut payload: actix_multipart::Multipart,
+) -> actix_web::Result<HttpResponse> {
+ info!("Upload blob request");
+
+ let blob_hash = get_blob_hash_field(&mut payload).await?;
+ debug!("Received blob_hash: {}", &blob_hash);
+ tracing::Span::current().record("blob_hash", &blob_hash);
+
+ if ctx
+ .db
+ .find_blob_item(&blob_hash)
+ .await
+ .map_err(handle_db_error)?
+ .is_some()
+ {
+ warn!("Blob with given hash already exists");
+ return Err(ErrorConflict("Conflict"));
+ }
+
+ let blob_item = BlobItem::new(blob_hash);
+ let mut upload_session = ctx
+ .s3
+ .start_upload_session(&blob_item.s3_path)
+ .await
+ .map_err(|err| {
+ error!("Failed to start S3 upload session: {:?}", err);
+ ErrorInternalServerError("Internal error")
+ })?;
+
+ trace!("Receiving blob data");
+ process_blob_data(&mut payload, &mut upload_session).await?;
+
+ // TODO: Handle "No parts to upload" as HTTP 400
+ if let Err(err) = upload_session.finish_upload().await {
+ error!("Failed to finish upload session: {:?}", err);
+ return Err(ErrorInternalServerError("Internal error"));
+ }
+
+ trace!("Upload finished, saving blob item to DB: {:?}", &blob_item);
+ ctx
+ .db
+ .put_blob_item(blob_item)
+ .await
+ .map_err(handle_db_error)?;
+
+ Ok(HttpResponse::NoContent().finish())
+}
+
#[instrument(
name = "delete_blob",
skip_all,
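
Note: the handler above buffers incoming bytes and periodically calls `s3_chunk.take_out()` through the newly imported `crate::tools::MemOps` trait, whose definition is not part of this diff. A minimal sketch of what that helper plausibly looks like, assuming `std::mem::take` semantics (move the buffer out, leave an empty one behind) — this is a guess at the `tools` module, not its actual contents:

use std::mem;

/// Hypothetical sketch of the `MemOps` trait used by `process_blob_data`;
/// the real `crate::tools` implementation is not shown in this diff.
pub trait MemOps {
    fn take_out(&mut self) -> Self;
}

impl<T> MemOps for Vec<T> {
    fn take_out(&mut self) -> Self {
        // Replaces `self` with an empty Vec and returns the old contents,
        // so the accumulated chunk can be handed to S3 without cloning.
        mem::take(self)
    }
}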
diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs
--- a/services/blob/src/http/mod.rs
+++ b/services/blob/src/http/mod.rs
@@ -40,6 +40,7 @@
)
.service(
web::resource("/blob")
+ .route(web::put().to(handlers::blob::upload_blob_handler))
.route(web::post().to(handlers::blob::assign_holder_handler)),
)
})
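
Note: with the route above in place, the service accepts multipart uploads over `PUT /blob`. An illustrative client call, assuming a local instance (the base URL and port are placeholders) and `reqwest` with its `multipart` feature as the HTTP client:

use reqwest::multipart::{Form, Part};

// Sketch only: field order matters, because `upload_blob_handler` reads
// `blob_hash` as the first field and then consumes `blob_data` fields.
async fn upload_blob(
    blob_hash: String,
    data: Vec<u8>,
) -> Result<(), reqwest::Error> {
    let form = Form::new()
        .text("blob_hash", blob_hash)
        .part("blob_data", Part::bytes(data));

    reqwest::Client::new()
        .put("http://localhost:50053/blob") // placeholder address
        .multipart(form)
        .send()
        .await?
        // 204 No Content on success; 409 Conflict if the hash already exists.
        .error_for_status()?;
    Ok(())
}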
diff --git a/services/blob/src/s3.rs b/services/blob/src/s3.rs
--- a/services/blob/src/s3.rs
+++ b/services/blob/src/s3.rs
@@ -1,4 +1,4 @@
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, ensure, Result};
use aws_sdk_s3::{
model::{CompletedMultipartUpload, CompletedPart},
output::CreateMultipartUploadOutput,
@@ -197,6 +197,12 @@
/// finishes the upload
pub async fn finish_upload(&self) -> Result<()> {
+ // TODO: handle this as S3-specific error
+ ensure!(
+ !self.upload_parts.is_empty(),
+ "There are no parts to upload"
+ );
+
let completed_multipart_upload = CompletedMultipartUpload::builder()
.set_parts(Some(self.upload_parts.clone()))
.build();
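
Note: the buffering in `process_blob_data` and the `ensure!` guard above both stem from S3's multipart rules: every part except the last must be at least 5 MiB, and completing an upload with zero parts fails. The `S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE` constant is imported but not defined in this diff; a plausible definition, assuming it mirrors the AWS limit:

// Hypothetical definition of the constant imported in blob.rs.
// AWS rejects multipart parts smaller than 5 MiB (except the final part).
pub const S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE: u64 = 5 * 1024 * 1024;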