Page MenuHomePhabricator

No OneTemporary

diff --git a/services/comm-services-lib/src/blob/client.rs b/services/comm-services-lib/src/blob/client.rs
index 15a7cb66c..91bac5978 100644
--- a/services/comm-services-lib/src/blob/client.rs
+++ b/services/comm-services-lib/src/blob/client.rs
@@ -1,143 +1,198 @@
use derive_more::{Display, Error, From};
use futures_core::Stream;
-use futures_util::StreamExt;
-use reqwest::Url;
+use futures_util::{StreamExt, TryStreamExt};
+use reqwest::{
+ multipart::{Form, Part},
+ Body, Url,
+};
use tracing::{debug, trace, warn};
// publicly re-export some reqwest types
pub use reqwest::Error as ReqwestError;
pub use reqwest::StatusCode;
#[derive(From, Error, Debug, Display)]
pub enum BlobServiceError {
/// HTTP Client errors, this includes:
/// - connection failures
/// - request errors (e.g. upload input stream)
#[display(...)]
ClientError(ReqwestError),
/// Invalid Blob service URL provided
#[display(...)]
URLError(#[error(ignore)] String),
/// Blob service returned HTTP 404
/// - blob or holder not found
#[display(...)]
NotFound,
/// Blob service returned HTTP 409
/// - blob or holder already exists
#[display(...)]
AlreadyExists,
/// Blob service returned HTTP 400
/// - invalid holder or blob_hash format
#[display(...)]
InvalidArguments,
/// Blob service returned HTTP 50x
#[display(...)]
ServerError,
#[display(...)]
UnexpectedHttpStatus(#[error(ignore)] reqwest::StatusCode),
}
/// A client interface to Blob service.
//
/// The `BlobServiceClient` holds a connection pool internally, so it is advised that
/// you create one and **reuse** it, by **cloning**.
///
/// You should **not** wrap the `BlobServiceClient` in an `Rc` or `Arc` to **reuse** it,
/// because it already uses an `Arc` internally.
#[derive(Clone)]
pub struct BlobServiceClient {
http_client: reqwest::Client,
blob_service_url: reqwest::Url,
}
impl BlobServiceClient {
pub fn new(blob_service_url: reqwest::Url) -> Self {
debug!("Creating BlobServiceClient. URL: {}", blob_service_url);
Self {
http_client: reqwest::Client::new(),
blob_service_url,
}
}
/// Downloads blob with given [`blob_hash`].
///
/// @returns a stream of blob bytes
///
/// # Errors thrown
/// - [BlobServiceError::NotFound] if blob with given hash does not exist
/// - [BlobServiceError::InvalidArguments] if blob hash has incorrect format
///
/// # Example
/// ```rust
/// let client =
/// BlobServiceClient::new("http://localhost:50053".parse()?);
///
/// let mut stream = client.get("hello").await?;
/// while let Some(data) = stream.try_next().await? {
/// println!("Got data: {:?}", data);
/// }
/// ```
pub async fn get(
&self,
blob_hash: &str,
) -> BlobResult<impl Stream<Item = BlobResult<Vec<u8>>>> {
debug!(?blob_hash, "Get blob request");
let url = self.get_blob_url(Some(blob_hash))?;
let response = self
.http_client
.get(url)
.send()
.await
.map_err(BlobServiceError::ClientError)?;
debug!("Response status: {}", response.status());
if response.status().is_success() {
let stream = response.bytes_stream().map(|result| match result {
Ok(bytes) => Ok(bytes.into()),
Err(error) => {
warn!("Error while streaming response: {}", error);
Err(BlobServiceError::ClientError(error))
}
});
return Ok(stream);
}
let error = handle_http_error(response.status());
if let Ok(message) = response.text().await {
trace!("Error response message: {}", message);
}
Err(error)
}
+
+ /// Uploads a blob. Returns `BlobServiceError::AlreadyExists` if blob with given hash
+ /// already exists.
+ ///
+ /// # Example
+ /// ```rust
+ /// use std::io::{Error, ErrorKind};
+ ///
+ /// let client =
+ /// BlobServiceClient::new("http://localhost:50053".parse()?);
+ ///
+ /// let stream = async_stream::stream! {
+ /// yield Ok(vec![1, 2, 3]);
+ /// yield Ok(vec![4, 5, 6]);
+ /// yield Err(Error::new(ErrorKind::Other, "Oops"));
+ /// };
+ /// client.upload_blob(&blob_hash, stream).await?;
+ /// ```
+ pub async fn upload_blob<H, S>(
+ &self,
+ blob_hash: H,
+ data_stream: S,
+ ) -> BlobResult<()>
+ where
+ H: Into<String>,
+ S: futures_core::stream::TryStream + Send + Sync + 'static,
+ S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ Vec<u8>: From<S::Ok>,
+ {
+ debug!("Upload blob request");
+ let url = self.get_blob_url(None)?;
+
+ let stream = data_stream.map_ok(Vec::from);
+ let streaming_body = Body::wrap_stream(stream);
+ let form = Form::new()
+ .text("blob_hash", blob_hash.into())
+ .part("blob_data", Part::stream(streaming_body));
+
+ let response = self.http_client.put(url).multipart(form).send().await?;
+ debug!("Response status: {}", response.status());
+
+ if response.status().is_success() {
+ trace!("Blob upload successful");
+ return Ok(());
+ }
+
+ let error = handle_http_error(response.status());
+ if let Ok(message) = response.text().await {
+ trace!("Error response message: {}", message);
+ }
+ Err(error)
+ }
}
// private helper methods
impl BlobServiceClient {
fn get_blob_url(
&self,
blob_hash: Option<&str>,
) -> Result<Url, BlobServiceError> {
let path = match blob_hash {
Some(hash) => format!("/blob/{}", hash),
None => "/blob".to_string(),
};
let url = self
.blob_service_url
.join(&path)
.map_err(|err| BlobServiceError::URLError(err.to_string()))?;
trace!("Constructed request URL: {}", url);
Ok(url)
}
}
fn handle_http_error(status_code: StatusCode) -> BlobServiceError {
match status_code {
StatusCode::BAD_REQUEST => BlobServiceError::InvalidArguments,
StatusCode::NOT_FOUND => BlobServiceError::NotFound,
StatusCode::CONFLICT => BlobServiceError::AlreadyExists,
code if code.is_server_error() => BlobServiceError::ServerError,
code => BlobServiceError::UnexpectedHttpStatus(code),
}
}
type BlobResult<T> = Result<T, BlobServiceError>;

File Metadata

Mime Type
text/x-diff
Expires
Mon, Dec 23, 3:54 AM (15 h, 50 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2690321
Default Alt Text
(6 KB)

Event Timeline