Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3509368
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/services/comm-services-lib/src/blob/client.rs b/services/comm-services-lib/src/blob/client.rs
index 15a7cb66c..91bac5978 100644
--- a/services/comm-services-lib/src/blob/client.rs
+++ b/services/comm-services-lib/src/blob/client.rs
@@ -1,143 +1,198 @@
use derive_more::{Display, Error, From};
use futures_core::Stream;
-use futures_util::StreamExt;
-use reqwest::Url;
+use futures_util::{StreamExt, TryStreamExt};
+use reqwest::{
+ multipart::{Form, Part},
+ Body, Url,
+};
use tracing::{debug, trace, warn};
// publicly re-export some reqwest types
pub use reqwest::Error as ReqwestError;
pub use reqwest::StatusCode;
/// Errors returned by `BlobServiceClient` operations.
///
/// The HTTP-status variants are produced by `handle_http_error`; each
/// variant's doc comment names the status it stands for.
#[derive(From, Error, Debug, Display)]
pub enum BlobServiceError {
/// HTTP Client errors, this includes:
/// - connection failures
/// - request errors (e.g. upload input stream)
#[display(...)]
ClientError(ReqwestError),
/// Invalid Blob service URL provided
#[display(...)]
URLError(#[error(ignore)] String),
/// Blob service returned HTTP 404
/// - blob or holder not found
#[display(...)]
NotFound,
/// Blob service returned HTTP 409
/// - blob or holder already exists
#[display(...)]
AlreadyExists,
/// Blob service returned HTTP 400
/// - invalid holder or blob_hash format
#[display(...)]
InvalidArguments,
/// Blob service returned HTTP 50x
#[display(...)]
ServerError,
/// Blob service returned a non-success HTTP status that has no dedicated
/// variant above; the status code is carried along
#[display(...)]
UnexpectedHttpStatus(#[error(ignore)] reqwest::StatusCode),
}
/// A client interface to Blob service.
///
/// The `BlobServiceClient` holds a connection pool internally, so it is advised that
/// you create one and **reuse** it, by **cloning**.
///
/// You should **not** wrap the `BlobServiceClient` in an `Rc` or `Arc` to **reuse** it,
/// because it already uses an `Arc` internally.
#[derive(Clone)]
pub struct BlobServiceClient {
/// HTTP client used to send every request
http_client: reqwest::Client,
/// Base URL of the Blob service; request paths are joined onto it
blob_service_url: reqwest::Url,
}
impl BlobServiceClient {
/// Creates a client that sends its requests to the Blob service at
/// `blob_service_url`.
///
/// Every call constructs a new `reqwest::Client`.
pub fn new(blob_service_url: reqwest::Url) -> Self {
  debug!("Creating BlobServiceClient. URL: {}", blob_service_url);
  let http_client = reqwest::Client::new();
  Self {
    http_client,
    blob_service_url,
  }
}
/// Downloads blob with given [`blob_hash`].
///
/// @returns a stream of blob bytes
///
/// # Errors thrown
/// - [BlobServiceError::NotFound] if blob with given hash does not exist
/// - [BlobServiceError::InvalidArguments] if blob hash has incorrect format
///
/// # Example
/// ```rust
/// let client =
/// BlobServiceClient::new("http://localhost:50053".parse()?);
///
/// let mut stream = client.get("hello").await?;
/// while let Some(data) = stream.try_next().await? {
/// println!("Got data: {:?}", data);
/// }
/// ```
pub async fn get(
&self,
blob_hash: &str,
) -> BlobResult<impl Stream<Item = BlobResult<Vec<u8>>>> {
debug!(?blob_hash, "Get blob request");
let url = self.get_blob_url(Some(blob_hash))?;
let response = self
.http_client
.get(url)
.send()
.await
.map_err(BlobServiceError::ClientError)?;
debug!("Response status: {}", response.status());
if response.status().is_success() {
let stream = response.bytes_stream().map(|result| match result {
Ok(bytes) => Ok(bytes.into()),
Err(error) => {
warn!("Error while streaming response: {}", error);
Err(BlobServiceError::ClientError(error))
}
});
return Ok(stream);
}
let error = handle_http_error(response.status());
if let Ok(message) = response.text().await {
trace!("Error response message: {}", message);
}
Err(error)
}
+
+ /// Uploads a blob with the given hash to the Blob service.
+ ///
+ /// The hash and the data are sent together as a multipart form in a single
+ /// `PUT` request: the hash as the `blob_hash` text field and the data as the
+ /// streamed `blob_data` part.
+ ///
+ /// # Errors
+ /// - [`BlobServiceError::AlreadyExists`] if blob with given hash already exists
+ /// - [`BlobServiceError::URLError`] if the request URL cannot be constructed
+ /// - [`BlobServiceError::ClientError`] if sending the request fails
+ /// - any other variant that `handle_http_error` maps the HTTP status to
+ ///
+ /// # Example
+ /// ```rust
+ /// use std::io::{Error, ErrorKind};
+ ///
+ /// let client =
+ /// BlobServiceClient::new("http://localhost:50053".parse()?);
+ ///
+ /// let blob_hash = "hello";
+ /// let stream = async_stream::stream! {
+ /// yield Ok(vec![1, 2, 3]);
+ /// yield Ok(vec![4, 5, 6]);
+ /// yield Err(Error::new(ErrorKind::Other, "Oops"));
+ /// };
+ /// client.upload_blob(blob_hash, stream).await?;
+ /// ```
+ pub async fn upload_blob<H, S>(
+ &self,
+ blob_hash: H,
+ data_stream: S,
+ ) -> BlobResult<()>
+ where
+ H: Into<String>,
+ S: futures_core::stream::TryStream + Send + Sync + 'static,
+ S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ Vec<u8>: From<S::Ok>,
+ {
+ debug!("Upload blob request");
+ // No hash in the URL path: the hash travels in the form body instead
+ let url = self.get_blob_url(None)?;
+
+ // Convert every successfully yielded chunk into a `Vec<u8>`
+ let stream = data_stream.map_ok(Vec::from);
+ let streaming_body = Body::wrap_stream(stream);
+ let form = Form::new()
+ .text("blob_hash", blob_hash.into())
+ .part("blob_data", Part::stream(streaming_body));
+
+ let response = self.http_client.put(url).multipart(form).send().await?;
+ debug!("Response status: {}", response.status());
+
+ if response.status().is_success() {
+ trace!("Blob upload successful");
+ return Ok(());
+ }
+
+ // Map the status to an error first; the body is read afterwards only so
+ // that it can be traced
+ let error = handle_http_error(response.status());
+ if let Ok(message) = response.text().await {
+ trace!("Error response message: {}", message);
+ }
+ Err(error)
+ }
}
// private helper methods
impl BlobServiceClient {
  /// Builds the request URL for the blob endpoint.
  ///
  /// The path is `/blob/{hash}` when `blob_hash` is given and `/blob`
  /// otherwise; it is joined onto the configured Blob service URL.
  ///
  /// Returns [`BlobServiceError::URLError`] carrying the join error's
  /// message when the URL cannot be built.
  fn get_blob_url(
    &self,
    blob_hash: Option<&str>,
  ) -> Result<Url, BlobServiceError> {
    // NOTE(review): the hash is interpolated into the path verbatim —
    // confirm callers only pass values that are safe as a path segment.
    let path = blob_hash
      .map_or_else(|| "/blob".to_string(), |hash| format!("/blob/{}", hash));

    match self.blob_service_url.join(&path) {
      Ok(url) => {
        trace!("Constructed request URL: {}", url);
        Ok(url)
      }
      Err(err) => Err(BlobServiceError::URLError(err.to_string())),
    }
  }
}
/// Translates an HTTP status code into the matching [`BlobServiceError`].
///
/// 400, 404 and 409 have dedicated variants, any server-error status maps
/// to `ServerError`, and every other status is wrapped in
/// `UnexpectedHttpStatus`.
fn handle_http_error(status_code: StatusCode) -> BlobServiceError {
  if status_code == StatusCode::BAD_REQUEST {
    return BlobServiceError::InvalidArguments;
  }
  if status_code == StatusCode::NOT_FOUND {
    return BlobServiceError::NotFound;
  }
  if status_code == StatusCode::CONFLICT {
    return BlobServiceError::AlreadyExists;
  }

  if status_code.is_server_error() {
    BlobServiceError::ServerError
  } else {
    BlobServiceError::UnexpectedHttpStatus(status_code)
  }
}
/// Shorthand for a `Result` whose error type is [`BlobServiceError`].
type BlobResult<T> = Result<T, BlobServiceError>;
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Mon, Dec 23, 3:54 AM (15 h, 50 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2690321
Default Alt Text
(6 KB)
Attached To
Mode
rCOMM Comm
Attached
Detach File
Event Timeline
Log In to Comment