Page Menu · Home · Phabricator

D8458.diff
No One · Temporary

D8458.diff

diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs
--- a/services/blob/src/http/handlers/blob.rs
+++ b/services/blob/src/http/handlers/blob.rs
@@ -1,9 +1,10 @@
-use crate::constants::{
- BLOB_DOWNLOAD_CHUNK_SIZE, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
-};
+#![allow(unused)]
+
+use crate::constants::S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE;
use crate::database::old::{BlobItem, ReverseIndexItem};
-use crate::http::context::handle_s3_error;
-use crate::tools::MemOps;
+use crate::http::context::{handle_blob_service_error, handle_s3_error};
+use crate::service::BlobService;
+use crate::tools::BoxedError;
use crate::validate_identifier;
use super::{handle_db_error, AppContext};
@@ -19,7 +20,7 @@
use async_stream::{try_stream, AsyncStream};
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;
-use tracing::{debug, error, info, instrument, trace, warn};
+use tracing::{debug, info, instrument, trace, warn};
use tracing_futures::Instrument;
/// Returns a tuple of first and last byte number (inclusive) represented by given range header.
@@ -69,7 +70,7 @@
fields(blob_hash = %params.as_ref().as_str(), s3_path))
]
pub async fn get_blob_handler(
- ctx: web::Data<AppContext>,
+ service: web::Data<BlobService>,
params: web::Path<String>,
range_header: Option<web::Header<Range>>,
) -> actix_web::Result<HttpResponse> {
@@ -77,52 +78,25 @@
let blob_hash = params.into_inner();
validate_identifier!(blob_hash);
- let s3_path = match ctx.db.find_blob_item(&blob_hash).await {
- Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path),
- Ok(None) => {
- debug!("Blob with given hash not found in database");
- Err(ErrorNotFound("blob not found"))
- }
- Err(err) => Err(handle_db_error(err)),
- }?;
- tracing::Span::current().record("s3_path", s3_path.to_full_path());
-
- let object_metadata = ctx
- .s3
- .get_object_metadata(&s3_path)
- .await
- .map_err(handle_s3_error)?;
- let file_size: u64 =
- object_metadata.content_length().try_into().map_err(|err| {
- error!("Failed to get S3 object content length: {:?}", err);
- ErrorInternalServerError("server error")
- })?;
-
- // Stream the data in chunks to avoid loading the whole file into memory.
- let chunk_size: u64 = BLOB_DOWNLOAD_CHUNK_SIZE;
-
- let s3 = ctx.s3.clone();
+ trace!("Initializing download session");
+ let mut download = service.create_download(blob_hash).await?;
+ let total_size = download.blob_size;
let (range_start, range_end): (u64, u64) =
- parse_range_header(&range_header, file_size)?;
-
- let stream: AsyncStream<Result<web::Bytes, HttpError>, _> = try_stream! {
- debug!(?range_start, ?range_end, "Getting range of data");
- let mut offset: u64 = range_start;
-
- while offset < range_end {
- let next_size = std::cmp::min(chunk_size, range_end - offset + 1);
- let range = offset..(offset + next_size);
- trace!(?range, "Getting {} bytes of data", next_size);
-
- let data = s3.get_object_bytes(&s3_path, range).await.map_err(handle_s3_error)?;
- yield web::Bytes::from(data);
-
- offset += chunk_size;
+ parse_range_header(&range_header, total_size)?;
+ download.set_byte_range(range_start..=range_end);
+ let content_length = download.download_size();
+
+ let stream = download
+ .into_stream()
+ .map(|data| match data {
+ Ok(bytes) => Ok(web::Bytes::from(bytes)),
+ Err(err) => {
+ warn!("Error during download stream: {:?}", err);
+ Err(handle_blob_service_error(&err))
}
- };
-
- let content_length = (range_end - range_start + 1).to_string();
+ })
+ .in_current_span();
if range_header.is_some() {
return Ok(
@@ -131,9 +105,9 @@
.append_header(("Content-Length", content_length))
.append_header((
"Content-Range",
- format!("bytes {}-{}/{}", range_start, range_end, file_size),
+ format!("bytes {}-{}/{}", range_start, range_end, total_size),
))
- .streaming(Box::pin(stream.in_current_span())),
+ .streaming(Box::pin(stream)),
);
}
@@ -141,7 +115,7 @@
HttpResponse::Ok()
.content_type("application/octet-stream")
.append_header(("Content-Length", content_length))
- .streaming(Box::pin(stream.in_current_span())),
+ .streaming(Box::pin(stream)),
)
}
@@ -156,9 +130,9 @@
data_exists: bool,
}
-#[instrument(name = "assign_holder", skip(ctx))]
+#[instrument(name = "assign_holder", skip(service))]
pub async fn assign_holder_handler(
- ctx: web::Data<AppContext>,
+ service: web::Data<BlobService>,
payload: web::Json<AssignHolderPayload>,
) -> actix_web::Result<HttpResponse> {
info!("Assign holder request");
@@ -166,32 +140,7 @@
validate_identifier!(holder);
validate_identifier!(blob_hash);
- if ctx
- .db
- .find_reverse_index_by_holder(&holder)
- .await
- .map_err(handle_db_error)?
- .is_some()
- {
- warn!("holder already assigned");
- return Err(ErrorConflict("holder already assigned"));
- }
-
- let data_exists = ctx
- .db
- .find_blob_item(&blob_hash)
- .await
- .map_err(handle_db_error)?
- .is_some();
- debug!(data_exists, "Checked blob item existence");
-
- let reverse_index_item = ReverseIndexItem { holder, blob_hash };
- ctx
- .db
- .put_reverse_index_item(reverse_index_item)
- .await
- .map_err(handle_db_error)?;
-
+ let data_exists = service.assign_holder(blob_hash, holder).await?;
Ok(HttpResponse::Ok().json(web::Json(AssignHolderResponnse { data_exists })))
}
@@ -220,54 +169,9 @@
return Ok(blob_hash);
}
-async fn process_blob_data(
- multipart_payload: &mut actix_multipart::Multipart,
- upload_session: &mut crate::s3::MultiPartUploadSession,
-) -> Result<(), HttpError> {
- let mut s3_chunk: Vec<u8> = Vec::new();
- while let Some(mut field) = multipart_payload.try_next().await? {
- let field_name = field.name();
- if field_name != "blob_data" {
- warn!(
- field_name,
- "Malfolmed request: 'blob_data' multipart field expected."
- );
- return Err(ErrorBadRequest("Bad request"));
- }
-
- while let Some(chunk) = field.try_next().await? {
- let mut chunk = chunk.to_vec();
- s3_chunk.append(&mut chunk);
-
- // New parts should be added to AWS only if they exceed minimum part size,
- // Otherwise AWS returns error
- if s3_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE {
- trace!(
- chunk_size = s3_chunk.len(),
- "Chunk size exceeded, adding new S3 part"
- );
- upload_session
- .add_part(s3_chunk.take_out())
- .await
- .map_err(handle_s3_error)?;
- }
- }
- }
-
- // add the remaining data as the last S3 part
- if !s3_chunk.is_empty() {
- upload_session
- .add_part(s3_chunk)
- .await
- .map_err(handle_s3_error)?;
- }
-
- Ok(())
-}
-
#[instrument(skip_all, name = "upload_blob", fields(blob_hash))]
pub async fn upload_blob_handler(
- ctx: web::Data<AppContext>,
+ service: web::Data<BlobService>,
mut payload: actix_multipart::Multipart,
) -> actix_web::Result<HttpResponse> {
info!("Upload blob request");
@@ -276,39 +180,26 @@
debug!("Received blob_hash: {}", &blob_hash);
tracing::Span::current().record("blob_hash", &blob_hash);
- if ctx
- .db
- .find_blob_item(&blob_hash)
- .await
- .map_err(handle_db_error)?
- .is_some()
- {
- warn!("Blob with given hash already exists");
- return Err(ErrorConflict("Conflict"));
- }
-
- let blob_item = BlobItem::new(blob_hash);
- let mut upload_session = ctx
- .s3
- .start_upload_session(&blob_item.s3_path)
- .await
- .map_err(handle_s3_error)?;
-
trace!("Receiving blob data");
- process_blob_data(&mut payload, &mut upload_session).await?;
-
- upload_session
- .finish_upload()
- .await
- .map_err(handle_s3_error)?;
+ let stream: AsyncStream<Result<Vec<u8>, BoxedError>, _> = try_stream! {
+ while let Some(mut field) = payload.try_next().await.map_err(Box::new)? {
+ let field_name = field.name();
+ if field_name != "blob_data" {
+ warn!(
+ field_name,
+ "Malfolmed request: 'blob_data' multipart field expected."
+ );
+ Err(ErrorBadRequest("Bad request"))?;
+ }
- trace!("Upload finished, saving blob item to DB: {:?}", &blob_item);
- ctx
- .db
- .put_blob_item(blob_item)
- .await
- .map_err(handle_db_error)?;
+ while let Some(chunk) = field.try_next().await.map_err(Box::new)? {
+ yield chunk.to_vec();
+ }
+ }
+ trace!("Stream done");
+ };
+ service.put_blob(blob_hash, stream).await?;
Ok(HttpResponse::NoContent().finish())
}
@@ -318,62 +209,16 @@
blob_hash: String,
}
-#[instrument(name = "remove_holder", skip(ctx))]
+#[instrument(name = "remove_holder", skip(service))]
pub async fn remove_holder_handler(
- ctx: web::Data<AppContext>,
+ service: web::Data<BlobService>,
payload: web::Json<RemoveHolderPayload>,
) -> actix_web::Result<HttpResponse> {
- info!("Remove holder request");
+ info!("Revoke holder request");
let RemoveHolderPayload { holder, blob_hash } = payload.into_inner();
validate_identifier!(holder);
validate_identifier!(blob_hash);
- let reverse_index_item = ctx
- .db
- .find_reverse_index_by_holder(&holder)
- .await
- .map_err(handle_db_error)?
- .ok_or_else(|| {
- debug!("Blob not found");
- ErrorNotFound("Blob not found")
- })?;
- let blob_hash = &reverse_index_item.blob_hash;
-
- ctx
- .db
- .remove_reverse_index_item(&holder)
- .await
- .map_err(handle_db_error)?;
-
- // TODO: handle cleanup here properly
- // for now the object's being removed right away
- // after the last holder was removed
- if ctx
- .db
- .find_reverse_index_by_hash(blob_hash)
- .await
- .map_err(handle_db_error)?
- .is_empty()
- {
- let s3_path = ctx
- .find_s3_path_by_reverse_index(&reverse_index_item)
- .await?;
- debug!("Last holder removed. Deleting S3 object: {:?}", &s3_path);
-
- ctx
- .s3
- .delete_object(&s3_path)
- .await
- .map_err(handle_s3_error)?;
-
- ctx
- .db
- .remove_blob_item(blob_hash)
- .await
- .map_err(handle_db_error)?;
- } else {
- debug!("Blob still has holders, S3 object not deleted");
- }
-
+ service.revoke_holder(blob_hash, holder).await?;
Ok(HttpResponse::NoContent().finish())
}

File Metadata

Mime Type
text/plain
Expires
Fri, Jan 10, 4:29 AM (5 h, 35 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2845090
Default Alt Text
D8458.diff (10 KB)

Event Timeline