Page MenuHomePhorge

D8778.1767435818.diff
No OneTemporary

Size
2 KB
Referenced Files
None
Subscribers
None

D8778.1767435818.diff

diff --git a/services/comm-services-lib/src/blob/client.rs b/services/comm-services-lib/src/blob/client.rs
--- a/services/comm-services-lib/src/blob/client.rs
+++ b/services/comm-services-lib/src/blob/client.rs
@@ -1,7 +1,8 @@
-#![allow(unused)]
-
use derive_more::{Display, Error, From};
-use tracing::debug;
+use futures_core::Stream;
+use futures_util::StreamExt;
+use reqwest::Url;
+use tracing::{debug, trace, warn};
// publicly re-export some reqwest types
pub use reqwest::Error as ReqwestError;
@@ -57,4 +58,86 @@
blob_service_url,
}
}
+
+ /// Downloads blob with given [`blob_hash`].
+ ///
+ /// @returns a stream of blob bytes
+ ///
+ /// # Errors thrown
+ /// - [BlobServiceError::NotFound] if blob with given hash does not exist
+ /// - [BlobServiceError::InvalidArguments] if blob hash has incorrect format
+ ///
+ /// # Example
+ /// ```rust
+ /// let client =
+ /// BlobServiceClient::new("http://localhost:50053".parse()?);
+ ///
+ /// let mut stream = client.get("hello").await?;
+ /// while let Some(data) = stream.try_next().await? {
+ /// println!("Got data: {:?}", data);
+ /// }
+ /// ```
+ pub async fn get(
+ &self,
+ blob_hash: &str,
+ ) -> BlobResult<impl Stream<Item = BlobResult<Vec<u8>>>> {
+ debug!(?blob_hash, "Get blob request");
+ let url = self.get_blob_url(Some(blob_hash))?;
+
+ let response = self
+ .http_client
+ .get(url)
+ .send()
+ .await
+ .map_err(BlobServiceError::ClientError)?;
+
+ debug!("Response status: {}", response.status());
+ if response.status().is_success() {
+ let stream = response.bytes_stream().map(|result| match result {
+ Ok(bytes) => Ok(bytes.into()),
+ Err(error) => {
+ warn!("Error while streaming response: {}", error);
+ Err(BlobServiceError::ClientError(error))
+ }
+ });
+ return Ok(stream);
+ }
+
+ let error = handle_http_error(response.status());
+ if let Ok(message) = response.text().await {
+ trace!("Error response message: {}", message);
+ }
+ Err(error)
+ }
}
+
+// private helper methods
+impl BlobServiceClient {
+ fn get_blob_url(
+ &self,
+ blob_hash: Option<&str>,
+ ) -> Result<Url, BlobServiceError> {
+ let path = match blob_hash {
+ Some(hash) => format!("/blob/{}", hash),
+ None => "/blob".to_string(),
+ };
+ let url = self
+ .blob_service_url
+ .join(&path)
+ .map_err(|err| BlobServiceError::URLError(err.to_string()))?;
+ trace!("Constructed request URL: {}", url);
+ Ok(url)
+ }
+}
+
+fn handle_http_error(status_code: StatusCode) -> BlobServiceError {
+ match status_code {
+ StatusCode::BAD_REQUEST => BlobServiceError::InvalidArguments,
+ StatusCode::NOT_FOUND => BlobServiceError::NotFound,
+ StatusCode::CONFLICT => BlobServiceError::AlreadyExists,
+ code if code.is_server_error() => BlobServiceError::ServerError,
+ code => BlobServiceError::UnexpectedHttpStatus(code),
+ }
+}
+
+type BlobResult<T> = Result<T, BlobServiceError>;

File Metadata

Mime Type
text/plain
Expires
Sat, Jan 3, 10:23 AM (6 h, 31 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5887786
Default Alt Text
D8778.1767435818.diff (2 KB)

Event Timeline