Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F32602484
D8778.1767435818.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
2 KB
Referenced Files
None
Subscribers
None
D8778.1767435818.diff
View Options
diff --git a/services/comm-services-lib/src/blob/client.rs b/services/comm-services-lib/src/blob/client.rs
--- a/services/comm-services-lib/src/blob/client.rs
+++ b/services/comm-services-lib/src/blob/client.rs
@@ -1,7 +1,8 @@
-#![allow(unused)]
-
use derive_more::{Display, Error, From};
-use tracing::debug;
+use futures_core::Stream;
+use futures_util::StreamExt;
+use reqwest::Url;
+use tracing::{debug, trace, warn};
// publicly re-export some reqwest types
pub use reqwest::Error as ReqwestError;
@@ -57,4 +58,86 @@
blob_service_url,
}
}
+
+  /// Downloads the blob identified by `blob_hash`.
+  ///
+  /// On a successful (2xx) response this returns a stream that yields the
+  /// response body chunk by chunk, each chunk converted into a `Vec<u8>`.
+  ///
+  /// # Errors
+  /// - [`BlobServiceError::URLError`] if the request URL cannot be constructed
+  /// - [`BlobServiceError::ClientError`] if sending the request fails; the
+  ///   same variant is yielded as a stream item if reading a chunk fails
+  /// - [`BlobServiceError::NotFound`] if the service responds with
+  ///   `404 Not Found` (blob with given hash does not exist)
+  /// - [`BlobServiceError::InvalidArguments`] if the service responds with
+  ///   `400 Bad Request` (blob hash has incorrect format)
+  /// - whatever else `handle_http_error` maps a non-success status to
+  ///
+  /// # Example
+  /// The snippet is marked `ignore` because it needs an async context, a
+  /// `Result`-returning caller and a running blob service.
+  /// ```rust,ignore
+  /// use futures_util::TryStreamExt; // brings `try_next` into scope
+  ///
+  /// let client =
+  ///   BlobServiceClient::new("http://localhost:50053".parse()?);
+  ///
+  /// let mut stream = client.get("hello").await?;
+  /// while let Some(data) = stream.try_next().await? {
+  ///   println!("Got data: {:?}", data);
+  /// }
+  /// ```
+  pub async fn get(
+    &self,
+    blob_hash: &str,
+  ) -> BlobResult<impl Stream<Item = BlobResult<Vec<u8>>>> {
+    debug!(?blob_hash, "Get blob request");
+    let url = self.get_blob_url(Some(blob_hash))?;
+
+    let response = self
+      .http_client
+      .get(url)
+      .send()
+      .await
+      .map_err(BlobServiceError::ClientError)?;
+
+    // Read the status once, up front: both `text()` and `bytes_stream()`
+    // below take the response by value.
+    let status = response.status();
+    debug!("Response status: {}", status);
+
+    if !status.is_success() {
+      let error = handle_http_error(status);
+      // Best effort only: the body is used purely for logging, so a failure
+      // to read it is deliberately ignored.
+      if let Ok(message) = response.text().await {
+        trace!("Error response message: {}", message);
+      }
+      return Err(error);
+    }
+
+    // Convert every chunk to `Vec<u8>` and wrap transport errors so callers
+    // only ever see `BlobServiceError`.
+    let stream = response.bytes_stream().map(|result| match result {
+      Ok(bytes) => Ok(bytes.into()),
+      Err(error) => {
+        warn!("Error while streaming response: {}", error);
+        Err(BlobServiceError::ClientError(error))
+      }
+    });
+    Ok(stream)
+  }
}
+
+// private helper methods
+impl BlobServiceClient {
+  /// Builds the request URL for the blob endpoint.
+  ///
+  /// With `Some(hash)` the path is `/blob/<hash>`; with `None` it is `/blob`.
+  /// The path is resolved against `self.blob_service_url` via `Url::join`.
+  ///
+  /// Returns [`BlobServiceError::URLError`] (carrying the join error's
+  /// message) if the resulting URL cannot be constructed.
+  ///
+  /// NOTE(review): the hash is interpolated into the path as-is, without
+  /// percent-encoding - confirm callers only pass URL-safe hashes.
+  fn get_blob_url(
+    &self,
+    blob_hash: Option<&str>,
+  ) -> Result<Url, BlobServiceError> {
+    let endpoint = blob_hash
+      .map_or_else(|| "/blob".to_string(), |hash| format!("/blob/{}", hash));
+
+    match self.blob_service_url.join(&endpoint) {
+      Ok(request_url) => {
+        trace!("Constructed request URL: {}", request_url);
+        Ok(request_url)
+      }
+      Err(join_error) => {
+        Err(BlobServiceError::URLError(join_error.to_string()))
+      }
+    }
+  }
+}
+
+/// Translates an HTTP status code into the matching [`BlobServiceError`].
+///
+/// - `400 Bad Request` -> `InvalidArguments`
+/// - `404 Not Found` -> `NotFound`
+/// - `409 Conflict` -> `AlreadyExists`
+/// - any status for which `is_server_error()` holds -> `ServerError`
+/// - everything else -> `UnexpectedHttpStatus` carrying the status code
+fn handle_http_error(status_code: StatusCode) -> BlobServiceError {
+  if status_code == StatusCode::BAD_REQUEST {
+    BlobServiceError::InvalidArguments
+  } else if status_code == StatusCode::NOT_FOUND {
+    BlobServiceError::NotFound
+  } else if status_code == StatusCode::CONFLICT {
+    BlobServiceError::AlreadyExists
+  } else if status_code.is_server_error() {
+    BlobServiceError::ServerError
+  } else {
+    BlobServiceError::UnexpectedHttpStatus(status_code)
+  }
+}
+
+type BlobResult<T> = Result<T, BlobServiceError>; // shorthand: a `Result` whose error type is always `BlobServiceError`
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Jan 3, 10:23 AM (6 h, 31 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5887786
Default Alt Text
D8778.1767435818.diff (2 KB)
Attached To
Mode
D8778: [services-lib] Add method to download blobs
Attached
Detach File
Event Timeline
Log In to Comment