diff --git a/services/comm-services-lib/src/blob/client.rs b/services/comm-services-lib/src/blob/client.rs
--- a/services/comm-services-lib/src/blob/client.rs
+++ b/services/comm-services-lib/src/blob/client.rs
@@ -1,7 +1,8 @@
-#![allow(unused)]
-
 use derive_more::{Display, Error, From};
-use tracing::debug;
+use futures_core::Stream;
+use futures_util::StreamExt;
+use reqwest::Url;
+use tracing::{debug, trace, warn};
 
 // publicly re-export some reqwest types
 pub use reqwest::Error as ReqwestError;
@@ -57,4 +58,109 @@
       blob_service_url,
     }
   }
+
+  /// Downloads blob with given `blob_hash`.
+  ///
+  /// # Returns
+  /// A stream of blob byte chunks. Errors hit while the response body is
+  /// being streamed are yielded as [`BlobServiceError::ClientError`] items.
+  ///
+  /// # Errors
+  /// - [BlobServiceError::NotFound] if blob with given hash does not exist
+  /// - [BlobServiceError::InvalidArguments] if blob hash has incorrect format
+  /// - [BlobServiceError::URLError] if the request URL cannot be constructed
+  /// - [BlobServiceError::ClientError] if sending the request fails
+  /// - for any other non-success status, the error chosen by
+  ///   `handle_http_error`
+  ///
+  /// # Example
+  /// ```ignore
+  /// use futures_util::TryStreamExt; // brings `try_next()` into scope
+  ///
+  /// let client =
+  ///   BlobServiceClient::new("http://localhost:50053".parse()?);
+  ///
+  /// let mut stream = client.get("hello").await?;
+  /// while let Some(data) = stream.try_next().await? {
+  ///   println!("Got data: {:?}", data);
+  /// }
+  /// ```
+  pub async fn get(
+    &self,
+    blob_hash: &str,
+  ) -> BlobResult<impl Stream<Item = BlobResult<Vec<u8>>>> {
+    debug!(?blob_hash, "Get blob request");
+    let url = self.get_blob_url(Some(blob_hash))?;
+
+    let response = self
+      .http_client
+      .get(url)
+      .send()
+      .await
+      .map_err(BlobServiceError::ClientError)?;
+
+    debug!("Response status: {}", response.status());
+    if response.status().is_success() {
+      let stream = response.bytes_stream().map(|result| match result {
+        Ok(bytes) => Ok(bytes.into()),
+        Err(error) => {
+          warn!("Error while streaming response: {}", error);
+          Err(BlobServiceError::ClientError(error))
+        }
+      });
+      return Ok(stream);
+    }
+
+    // Compute the error first: `response.text()` consumes the response.
+    let error = handle_http_error(response.status());
+    if let Ok(message) = response.text().await {
+      trace!("Error response message: {}", message);
+    }
+    Err(error)
+  }
 }
+
+// private helper methods
+impl BlobServiceClient {
+  /// Builds the request URL for the blob endpoint: `/blob/{hash}` when
+  /// `blob_hash` is given, `/blob` otherwise.
+  ///
+  /// The hash is inserted into the path verbatim (no percent-encoding).
+  ///
+  /// # Errors
+  /// [BlobServiceError::URLError] if joining the path onto the service URL
+  /// fails.
+  fn get_blob_url(
+    &self,
+    blob_hash: Option<&str>,
+  ) -> Result<Url, BlobServiceError> {
+    let path = match blob_hash {
+      Some(hash) => format!("/blob/{}", hash),
+      None => "/blob".to_string(),
+    };
+    let url = self
+      .blob_service_url
+      .join(&path)
+      .map_err(|err| BlobServiceError::URLError(err.to_string()))?;
+    trace!("Constructed request URL: {}", url);
+    Ok(url)
+  }
+}
+
+/// Maps an HTTP status code to the matching [`BlobServiceError`].
+///
+/// `400`, `404` and `409` have dedicated variants, any `5xx` code becomes
+/// [BlobServiceError::ServerError], and every other code is wrapped in
+/// [BlobServiceError::UnexpectedHttpStatus].
+fn handle_http_error(status_code: StatusCode) -> BlobServiceError {
+  match status_code {
+    StatusCode::BAD_REQUEST => BlobServiceError::InvalidArguments,
+    StatusCode::NOT_FOUND => BlobServiceError::NotFound,
+    StatusCode::CONFLICT => BlobServiceError::AlreadyExists,
+    code if code.is_server_error() => BlobServiceError::ServerError,
+    code => BlobServiceError::UnexpectedHttpStatus(code),
+  }
+}
+
+/// Result alias whose error type is always [`BlobServiceError`].
+type BlobResult<T> = Result<T, BlobServiceError>;