
D7483.diff

diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs
--- a/services/blob/src/http/handlers/blob.rs
+++ b/services/blob/src/http/handlers/blob.rs
@@ -1,14 +1,18 @@
-use crate::constants::BLOB_DOWNLOAD_CHUNK_SIZE;
-use crate::database::ReverseIndexItem;
+use crate::constants::{
+ BLOB_DOWNLOAD_CHUNK_SIZE, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
+};
+use crate::database::{BlobItem, ReverseIndexItem};
+use crate::tools::MemOps;
use super::{handle_db_error, AppContext};
use actix_web::error::{
- ErrorConflict, ErrorInternalServerError, ErrorNotFound,
+ ErrorBadRequest, ErrorConflict, ErrorInternalServerError, ErrorNotFound,
};
use actix_web::{web, Error as HttpError, HttpResponse};
use anyhow::Result;
use async_stream::{try_stream, AsyncStream};
use serde::{Deserialize, Serialize};
+use tokio_stream::StreamExt;
use tracing::{debug, error, info, instrument, trace, warn};
use tracing_futures::Instrument;
@@ -114,6 +118,138 @@
Ok(HttpResponse::Ok().json(web::Json(AssignHolderResponnse { data_exists })))
}
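+
+/// Reads the first multipart field, which must be named `blob_hash`,
+/// and returns its contents as a UTF-8 string.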
+async fn get_blob_hash_field(
+ multipart_payload: &mut actix_multipart::Multipart,
+) -> Result<String, HttpError> {
+ let Some(mut field) = multipart_payload.try_next().await? else {
+ debug!("Malfolmed multipart request");
+ return Err(ErrorBadRequest("Bad request"));
+ };
+
+ if field.name() != "blob_hash" {
+ warn!("Blob hash is required as a first form field");
+ return Err(ErrorBadRequest("Bad request"));
+ }
+
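+ // The field may arrive in multiple chunks; collect them into one buffer.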
+ let mut buf = Vec::new();
+ while let Some(chunk) = field.try_next().await? {
+ buf.extend_from_slice(&chunk);
+ }
+
+ let blob_hash = String::from_utf8(buf)
+ .map_err(|_| ErrorBadRequest("Bad request"))?;
+ Ok(blob_hash)
+}
+
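+/// Streams consecutive `blob_data` multipart fields into the given S3
+/// multipart upload session, buffering until each part reaches the minimum size.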
+async fn process_blob_data(
+ multipart_payload: &mut actix_multipart::Multipart,
+ upload_session: &mut crate::s3::MultiPartUploadSession,
+) -> Result<(), HttpError> {
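+ // Accumulates incoming bytes until they form a large enough S3 part.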
+ let mut s3_chunk: Vec<u8> = Vec::new();
+ while let Some(mut field) = multipart_payload.try_next().await? {
+ let field_name = field.name();
+ if field_name != "blob_data" {
+ warn!(
+ field_name,
+ "Malfolmed request: 'blob_data' multipart field expected."
+ );
+ return Err(ErrorBadRequest("Bad request"));
+ }
+
+ while let Some(chunk) = field.try_next().await? {
+ let mut chunk = chunk.to_vec();
+ s3_chunk.append(&mut chunk);
+
+ // A new part should be uploaded to AWS only once it exceeds the minimum
+ // part size; otherwise AWS rejects it with an error.
+ if s3_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE {
+ trace!(
+ chunk_size = s3_chunk.len(),
+ "Chunk size exceeded, adding new S3 part"
+ );
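+ // `take_out()` (from crate::tools::MemOps) is assumed to move the
+ // accumulated bytes out of the buffer and leave it empty for the next
+ // part, much like `std::mem::take`.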
+ if let Err(err) = upload_session.add_part(s3_chunk.take_out()).await {
+ error!("Failed to upload S3 part: {:?}", err);
+ return Err(ErrorInternalServerError("Internal error"));
+ }
+ }
+ }
+ }
+
+ // add the remaining data as the last S3 part
+ if !s3_chunk.is_empty() {
+ if let Err(err) = upload_session.add_part(s3_chunk).await {
+ error!("Failed to upload final part: {:?}", err);
+ return Err(ErrorInternalServerError("Internal error"));
+ }
+ }
+
+ Ok(())
+}
+
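+/// Handler for `PUT /blob`. Expects a multipart payload with a `blob_hash`
+/// field followed by one or more `blob_data` fields, streamed to S3.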
+#[instrument(skip_all, name = "upload_blob", fields(blob_hash))]
+pub async fn upload_blob_handler(
+ ctx: web::Data<AppContext>,
+ mut payload: actix_multipart::Multipart,
+) -> actix_web::Result<HttpResponse> {
+ info!("Upload blob request");
+
+ let blob_hash = get_blob_hash_field(&mut payload).await?;
+ debug!("Received blob_hash: {}", &blob_hash);
+ tracing::Span::current().record("blob_hash", &blob_hash);
+
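+ // Deduplicate: reject the upload if a blob with this hash already exists.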
+ if ctx
+ .db
+ .find_blob_item(&blob_hash)
+ .await
+ .map_err(handle_db_error)?
+ .is_some()
+ {
+ warn!("Blob with given hash already exists");
+ return Err(ErrorConflict("Conflict"));
+ }
+
+ let blob_item = BlobItem::new(blob_hash);
+ let mut upload_session = ctx
+ .s3
+ .start_upload_session(&blob_item.s3_path)
+ .await
+ .map_err(|err| {
+ error!("Failed to start S3 upload session: {:?}", err);
+ ErrorInternalServerError("Internal error")
+ })?;
+
+ trace!("Receiving blob data");
+ process_blob_data(&mut payload, &mut upload_session).await?;
+
+ // TODO: Handle "No parts to upload" as HTTP 400
+ if let Err(err) = upload_session.finish_upload().await {
+ error!("Failed to finish upload session: {:?}", err);
+ return Err(ErrorInternalServerError("Internal error"));
+ }
+
+ trace!("Upload finished, saving blob item to DB: {:?}", &blob_item);
+ ctx
+ .db
+ .put_blob_item(blob_item)
+ .await
+ .map_err(handle_db_error)?;
+
+ Ok(HttpResponse::NoContent().finish())
+}
+
#[instrument(
name = "delete_blob",
skip_all,
diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs
--- a/services/blob/src/http/mod.rs
+++ b/services/blob/src/http/mod.rs
@@ -40,6 +40,7 @@
)
.service(
web::resource("/blob")
+ .route(web::put().to(handlers::blob::upload_blob_handler))
.route(web::post().to(handlers::blob::assign_holder_handler)),
)
})
diff --git a/services/blob/src/s3.rs b/services/blob/src/s3.rs
--- a/services/blob/src/s3.rs
+++ b/services/blob/src/s3.rs
@@ -1,4 +1,4 @@
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, ensure, Result};
use aws_sdk_s3::{
model::{CompletedMultipartUpload, CompletedPart},
output::CreateMultipartUploadOutput,
@@ -197,6 +197,13 @@
/// finishes the upload
pub async fn finish_upload(&self) -> Result<()> {
+ // TODO: handle this as S3-specific error
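+ // S3 rejects CompleteMultipartUpload requests that specify no parts.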
+ ensure!(
+ !self.upload_parts.is_empty(),
+ "There are no parts to upload"
+ );
+
let completed_multipart_upload = CompletedMultipartUpload::builder()
.set_parts(Some(self.upload_parts.clone()))
.build();

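For reference, a minimal client sketch exercising the new `PUT /blob` route. This is hypothetical and not part of the diff: it assumes reqwest with the `multipart` feature, a tokio runtime, and a blob service listening on localhost:8080; the hash value is a placeholder.

use reqwest::multipart::{Form, Part};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
  // Field order matters: `blob_hash` must be the first form field,
  // followed by one or more `blob_data` fields.
  let form = Form::new()
    .text("blob_hash", "placeholder-blob-hash")
    .part("blob_data", Part::bytes(vec![0u8; 4096]));

  let response = reqwest::Client::new()
    .put("http://localhost:8080/blob")
    .multipart(form)
    .send()
    .await?;

  // The handler responds with 204 No Content on success.
  assert_eq!(response.status(), reqwest::StatusCode::NO_CONTENT);
  Ok(())
}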