D8458.id28997.diff
Attached to D8458: [blob-service] Call service methods from handlers
diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs
--- a/services/blob/src/http/handlers/blob.rs
+++ b/services/blob/src/http/handlers/blob.rs
@@ -1,9 +1,10 @@
-use crate::constants::{
- BLOB_DOWNLOAD_CHUNK_SIZE, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
-};
+#![allow(unused)]
+
+use crate::constants::S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE;
use crate::database::old::{BlobItem, ReverseIndexItem};
-use crate::http::context::handle_s3_error;
-use crate::tools::MemOps;
+use crate::http::context::{handle_blob_service_error, handle_s3_error};
+use crate::service::BlobService;
+use crate::tools::BoxedError;
use crate::validate_identifier;
use super::{handle_db_error, AppContext};
@@ -19,7 +20,7 @@
use async_stream::{try_stream, AsyncStream};
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;
-use tracing::{debug, error, info, instrument, trace, warn};
+use tracing::{debug, info, instrument, trace, warn};
use tracing_futures::Instrument;
/// Returns a tuple of first and last byte number (inclusive) represented by given range header.
@@ -69,7 +70,7 @@
fields(blob_hash = %params.as_ref().as_str(), s3_path))
]
pub async fn get_blob_handler(
- ctx: web::Data<AppContext>,
+ service: web::Data<BlobService>,
params: web::Path<String>,
range_header: Option<web::Header<Range>>,
) -> actix_web::Result<HttpResponse> {
@@ -77,52 +78,25 @@
let blob_hash = params.into_inner();
validate_identifier!(blob_hash);
- let s3_path = match ctx.db.find_blob_item(&blob_hash).await {
- Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path),
- Ok(None) => {
- debug!("Blob with given hash not found in database");
- Err(ErrorNotFound("blob not found"))
- }
- Err(err) => Err(handle_db_error(err)),
- }?;
- tracing::Span::current().record("s3_path", s3_path.to_full_path());
-
- let object_metadata = ctx
- .s3
- .get_object_metadata(&s3_path)
- .await
- .map_err(handle_s3_error)?;
- let file_size: u64 =
- object_metadata.content_length().try_into().map_err(|err| {
- error!("Failed to get S3 object content length: {:?}", err);
- ErrorInternalServerError("server error")
- })?;
-
- // Stream the data in chunks to avoid loading the whole file into memory.
- let chunk_size: u64 = BLOB_DOWNLOAD_CHUNK_SIZE;
-
- let s3 = ctx.s3.clone();
+ trace!("Initializing download session");
+ let mut download = service.create_download(blob_hash).await?;
+ let total_size = download.blob_size;
let (range_start, range_end): (u64, u64) =
- parse_range_header(&range_header, file_size)?;
-
- let stream: AsyncStream<Result<web::Bytes, HttpError>, _> = try_stream! {
- debug!(?range_start, ?range_end, "Getting range of data");
- let mut offset: u64 = range_start;
-
- while offset < range_end {
- let next_size = std::cmp::min(chunk_size, range_end - offset + 1);
- let range = offset..(offset + next_size);
- trace!(?range, "Getting {} bytes of data", next_size);
-
- let data = s3.get_object_bytes(&s3_path, range).await.map_err(handle_s3_error)?;
- yield web::Bytes::from(data);
-
- offset += chunk_size;
+ parse_range_header(&range_header, total_size)?;
+ download.set_byte_range(range_start..=range_end);
+ let content_length = download.download_size();
+
+ let stream = download
+ .into_stream()
+ .map(|data| match data {
+ Ok(bytes) => Ok(web::Bytes::from(bytes)),
+ Err(err) => {
+ warn!("Error during download stream: {:?}", err);
+ Err(handle_blob_service_error(&err))
}
- };
-
- let content_length = (range_end - range_start + 1).to_string();
+ })
+ .in_current_span();
if range_header.is_some() {
return Ok(
@@ -131,9 +105,9 @@
.append_header(("Content-Length", content_length))
.append_header((
"Content-Range",
- format!("bytes {}-{}/{}", range_start, range_end, file_size),
+ format!("bytes {}-{}/{}", range_start, range_end, total_size),
))
- .streaming(Box::pin(stream.in_current_span())),
+ .streaming(Box::pin(stream)),
);
}
@@ -141,7 +115,7 @@
HttpResponse::Ok()
.content_type("application/octet-stream")
.append_header(("Content-Length", content_length))
- .streaming(Box::pin(stream.in_current_span())),
+ .streaming(Box::pin(stream)),
)
}
@@ -156,9 +130,9 @@
data_exists: bool,
}
-#[instrument(name = "assign_holder", skip(ctx))]
+#[instrument(name = "assign_holder", skip(service))]
pub async fn assign_holder_handler(
- ctx: web::Data<AppContext>,
+ service: web::Data<BlobService>,
payload: web::Json<AssignHolderPayload>,
) -> actix_web::Result<HttpResponse> {
info!("Assign holder request");
@@ -166,32 +140,7 @@
validate_identifier!(holder);
validate_identifier!(blob_hash);
- if ctx
- .db
- .find_reverse_index_by_holder(&holder)
- .await
- .map_err(handle_db_error)?
- .is_some()
- {
- warn!("holder already assigned");
- return Err(ErrorConflict("holder already assigned"));
- }
-
- let data_exists = ctx
- .db
- .find_blob_item(&blob_hash)
- .await
- .map_err(handle_db_error)?
- .is_some();
- debug!(data_exists, "Checked blob item existence");
-
- let reverse_index_item = ReverseIndexItem { holder, blob_hash };
- ctx
- .db
- .put_reverse_index_item(reverse_index_item)
- .await
- .map_err(handle_db_error)?;
-
+ let data_exists = service.assign_holder(blob_hash, holder).await?;
Ok(HttpResponse::Ok().json(web::Json(AssignHolderResponnse { data_exists })))
}
@@ -220,54 +169,9 @@
return Ok(blob_hash);
}
-async fn process_blob_data(
- multipart_payload: &mut actix_multipart::Multipart,
- upload_session: &mut crate::s3::MultiPartUploadSession,
-) -> Result<(), HttpError> {
- let mut s3_chunk: Vec<u8> = Vec::new();
- while let Some(mut field) = multipart_payload.try_next().await? {
- let field_name = field.name();
- if field_name != "blob_data" {
- warn!(
- field_name,
- "Malfolmed request: 'blob_data' multipart field expected."
- );
- return Err(ErrorBadRequest("Bad request"));
- }
-
- while let Some(chunk) = field.try_next().await? {
- let mut chunk = chunk.to_vec();
- s3_chunk.append(&mut chunk);
-
- // New parts should be added to AWS only if they exceed minimum part size,
- // Otherwise AWS returns error
- if s3_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE {
- trace!(
- chunk_size = s3_chunk.len(),
- "Chunk size exceeded, adding new S3 part"
- );
- upload_session
- .add_part(s3_chunk.take_out())
- .await
- .map_err(handle_s3_error)?;
- }
- }
- }
-
- // add the remaining data as the last S3 part
- if !s3_chunk.is_empty() {
- upload_session
- .add_part(s3_chunk)
- .await
- .map_err(handle_s3_error)?;
- }
-
- Ok(())
-}
-
#[instrument(skip_all, name = "upload_blob", fields(blob_hash))]
pub async fn upload_blob_handler(
- ctx: web::Data<AppContext>,
+ service: web::Data<BlobService>,
mut payload: actix_multipart::Multipart,
) -> actix_web::Result<HttpResponse> {
info!("Upload blob request");
@@ -276,39 +180,26 @@
debug!("Received blob_hash: {}", &blob_hash);
tracing::Span::current().record("blob_hash", &blob_hash);
- if ctx
- .db
- .find_blob_item(&blob_hash)
- .await
- .map_err(handle_db_error)?
- .is_some()
- {
- warn!("Blob with given hash already exists");
- return Err(ErrorConflict("Conflict"));
- }
-
- let blob_item = BlobItem::new(blob_hash);
- let mut upload_session = ctx
- .s3
- .start_upload_session(&blob_item.s3_path)
- .await
- .map_err(handle_s3_error)?;
-
trace!("Receiving blob data");
- process_blob_data(&mut payload, &mut upload_session).await?;
-
- upload_session
- .finish_upload()
- .await
- .map_err(handle_s3_error)?;
+ let stream: AsyncStream<Result<Vec<u8>, BoxedError>, _> = try_stream! {
+ while let Some(mut field) = payload.try_next().await.map_err(Box::new)? {
+ let field_name = field.name();
+ if field_name != "blob_data" {
+ warn!(
+ field_name,
+ "Malfolmed request: 'blob_data' multipart field expected."
+ );
+ Err(ErrorBadRequest("Bad request"))?;
+ }
- trace!("Upload finished, saving blob item to DB: {:?}", &blob_item);
- ctx
- .db
- .put_blob_item(blob_item)
- .await
- .map_err(handle_db_error)?;
+ while let Some(chunk) = field.try_next().await.map_err(Box::new)? {
+ yield chunk.to_vec();
+ }
+ }
+ trace!("Stream done");
+ };
+ service.put_blob(blob_hash, stream).await?;
Ok(HttpResponse::NoContent().finish())
}
@@ -318,62 +209,16 @@
blob_hash: String,
}
-#[instrument(name = "remove_holder", skip(ctx))]
+#[instrument(name = "remove_holder", skip(service))]
pub async fn remove_holder_handler(
- ctx: web::Data<AppContext>,
+ service: web::Data<BlobService>,
payload: web::Json<RemoveHolderPayload>,
) -> actix_web::Result<HttpResponse> {
- info!("Remove holder request");
+ info!("Revoke holder request");
let RemoveHolderPayload { holder, blob_hash } = payload.into_inner();
validate_identifier!(holder);
validate_identifier!(blob_hash);
- let reverse_index_item = ctx
- .db
- .find_reverse_index_by_holder(&holder)
- .await
- .map_err(handle_db_error)?
- .ok_or_else(|| {
- debug!("Blob not found");
- ErrorNotFound("Blob not found")
- })?;
- let blob_hash = &reverse_index_item.blob_hash;
-
- ctx
- .db
- .remove_reverse_index_item(&holder)
- .await
- .map_err(handle_db_error)?;
-
- // TODO: handle cleanup here properly
- // for now the object's being removed right away
- // after the last holder was removed
- if ctx
- .db
- .find_reverse_index_by_hash(blob_hash)
- .await
- .map_err(handle_db_error)?
- .is_empty()
- {
- let s3_path = ctx
- .find_s3_path_by_reverse_index(&reverse_index_item)
- .await?;
- debug!("Last holder removed. Deleting S3 object: {:?}", &s3_path);
-
- ctx
- .s3
- .delete_object(&s3_path)
- .await
- .map_err(handle_s3_error)?;
-
- ctx
- .db
- .remove_blob_item(blob_hash)
- .await
- .map_err(handle_db_error)?;
- } else {
- debug!("Blob still has holders, S3 object not deleted");
- }
-
+ service.revoke_holder(blob_hash, holder).await?;
Ok(HttpResponse::NoContent().finish())
}