diff --git a/services/comm-services-lib/src/blob/client.rs b/services/comm-services-lib/src/blob/client.rs index 91bac5978..536c58119 100644 --- a/services/comm-services-lib/src/blob/client.rs +++ b/services/comm-services-lib/src/blob/client.rs @@ -1,198 +1,300 @@ use derive_more::{Display, Error, From}; use futures_core::Stream; use futures_util::{StreamExt, TryStreamExt}; use reqwest::{ multipart::{Form, Part}, Body, Url, }; use tracing::{debug, trace, warn}; // publicly re-export some reqwest types pub use reqwest::Error as ReqwestError; pub use reqwest::StatusCode; #[derive(From, Error, Debug, Display)] pub enum BlobServiceError { /// HTTP Client errors, this includes: /// - connection failures /// - request errors (e.g. upload input stream) #[display(...)] ClientError(ReqwestError), /// Invalid Blob service URL provided #[display(...)] URLError(#[error(ignore)] String), /// Blob service returned HTTP 404 /// - blob or holder not found #[display(...)] NotFound, /// Blob service returned HTTP 409 /// - blob or holder already exists #[display(...)] AlreadyExists, /// Blob service returned HTTP 400 /// - invalid holder or blob_hash format #[display(...)] InvalidArguments, /// Blob service returned HTTP 50x #[display(...)] ServerError, #[display(...)] UnexpectedHttpStatus(#[error(ignore)] reqwest::StatusCode), } /// A client interface to Blob service. // /// The `BlobServiceClient` holds a connection pool internally, so it is advised that /// you create one and **reuse** it, by **cloning**. /// /// You should **not** wrap the `BlobServiceClient` in an `Rc` or `Arc` to **reuse** it, /// because it already uses an `Arc` internally. #[derive(Clone)] pub struct BlobServiceClient { http_client: reqwest::Client, blob_service_url: reqwest::Url, } impl BlobServiceClient { pub fn new(blob_service_url: reqwest::Url) -> Self { debug!("Creating BlobServiceClient. URL: {}", blob_service_url); Self { http_client: reqwest::Client::new(), blob_service_url, } } /// Downloads blob with given [`blob_hash`]. /// /// @returns a stream of blob bytes /// /// # Errors thrown /// - [BlobServiceError::NotFound] if blob with given hash does not exist /// - [BlobServiceError::InvalidArguments] if blob hash has incorrect format /// /// # Example /// ```rust /// let client = /// BlobServiceClient::new("http://localhost:50053".parse()?); /// /// let mut stream = client.get("hello").await?; /// while let Some(data) = stream.try_next().await? { /// println!("Got data: {:?}", data); /// } /// ``` pub async fn get( &self, blob_hash: &str, ) -> BlobResult>>> { debug!(?blob_hash, "Get blob request"); let url = self.get_blob_url(Some(blob_hash))?; let response = self .http_client .get(url) .send() .await .map_err(BlobServiceError::ClientError)?; debug!("Response status: {}", response.status()); if response.status().is_success() { let stream = response.bytes_stream().map(|result| match result { Ok(bytes) => Ok(bytes.into()), Err(error) => { warn!("Error while streaming response: {}", error); Err(BlobServiceError::ClientError(error)) } }); return Ok(stream); } let error = handle_http_error(response.status()); if let Ok(message) = response.text().await { trace!("Error response message: {}", message); } Err(error) } + /// Assigns a new holder to a blob represented by [`blob_hash`]. + /// Returns `BlobServiceError::AlreadyExists` if blob already has + /// a holder with given [`holder`] name. + pub async fn assign_holder( + &self, + blob_hash: &str, + holder: &str, + ) -> BlobResult { + debug!("Assign holder request"); + let url = self.get_blob_url(None)?; + + let payload = AssignHolderRequest { + holder: holder.to_string(), + blob_hash: blob_hash.to_string(), + }; + debug!("Request payload: {:?}", payload); + let response = self.http_client.post(url).json(&payload).send().await?; + + debug!("Response status: {}", response.status()); + if response.status().is_success() { + let AssignHolderResponse { data_exists } = response.json().await?; + trace!("Data exists: {}", data_exists); + return Ok(data_exists); + } + + let error = handle_http_error(response.status()); + if let Ok(message) = response.text().await { + trace!("Error response message: {}", message); + } + Err(error) + } + + /// Revokes given holder from a blob represented by [`blob_hash`]. + /// Returns `BlobServiceError::NotFound` if blob with given hash does not exist + /// or it does not have such holder + pub async fn revoke_holder( + &self, + blob_hash: &str, + holder: &str, + ) -> BlobResult<()> { + debug!("Revoke holder request"); + let url = self.get_blob_url(None)?; + + let payload = RevokeHolderRequest { + holder: holder.to_string(), + blob_hash: blob_hash.to_string(), + }; + debug!("Request payload: {:?}", payload); + + let response = self.http_client.delete(url).json(&payload).send().await?; + debug!("Response status: {}", response.status()); + + if response.status().is_success() { + trace!("Revoke holder request successful"); + return Ok(()); + } + + let error = handle_http_error(response.status()); + if let Ok(message) = response.text().await { + trace!("Error response message: {}", message); + } + Err(error) + } + /// Uploads a blob. Returns `BlobServiceError::AlreadyExists` if blob with given hash /// already exists. /// /// # Example /// ```rust /// use std::io::{Error, ErrorKind}; /// /// let client = /// BlobServiceClient::new("http://localhost:50053".parse()?); /// /// let stream = async_stream::stream! { /// yield Ok(vec![1, 2, 3]); /// yield Ok(vec![4, 5, 6]); /// yield Err(Error::new(ErrorKind::Other, "Oops")); /// }; /// client.upload_blob(&blob_hash, stream).await?; /// ``` pub async fn upload_blob( &self, blob_hash: H, data_stream: S, ) -> BlobResult<()> where H: Into, S: futures_core::stream::TryStream + Send + Sync + 'static, S::Error: Into>, Vec: From, { debug!("Upload blob request"); let url = self.get_blob_url(None)?; let stream = data_stream.map_ok(Vec::from); let streaming_body = Body::wrap_stream(stream); let form = Form::new() .text("blob_hash", blob_hash.into()) .part("blob_data", Part::stream(streaming_body)); let response = self.http_client.put(url).multipart(form).send().await?; debug!("Response status: {}", response.status()); if response.status().is_success() { trace!("Blob upload successful"); return Ok(()); } let error = handle_http_error(response.status()); if let Ok(message) = response.text().await { trace!("Error response message: {}", message); } Err(error) } + + /// A wrapper around [`BlobServiceClient::assign_holder`] and [`BlobServiceClient::upload_blob`]. + /// + /// Assigns a new holder to a blob represented by [`blob_hash`]. If the blob does not exist, + /// uploads the data from [`data_stream`]. + pub async fn simple_put( + &self, + blob_hash: &str, + holder: &str, + data_stream: S, + ) -> BlobResult + where + S: futures_core::stream::TryStream + Send + Sync + 'static, + S::Error: Into>, + Vec: From, + { + trace!("Begin simple put. Assigning holder..."); + let data_exists = self.assign_holder(blob_hash, holder).await?; + if data_exists { + trace!("Blob data already exists. Skipping upload."); + return Ok(false); + } + trace!("Uploading blob data..."); + self.upload_blob(blob_hash, data_stream).await?; + Ok(true) + } } // private helper methods impl BlobServiceClient { fn get_blob_url( &self, blob_hash: Option<&str>, ) -> Result { let path = match blob_hash { Some(hash) => format!("/blob/{}", hash), None => "/blob".to_string(), }; let url = self .blob_service_url .join(&path) .map_err(|err| BlobServiceError::URLError(err.to_string()))?; trace!("Constructed request URL: {}", url); Ok(url) } } fn handle_http_error(status_code: StatusCode) -> BlobServiceError { match status_code { StatusCode::BAD_REQUEST => BlobServiceError::InvalidArguments, StatusCode::NOT_FOUND => BlobServiceError::NotFound, StatusCode::CONFLICT => BlobServiceError::AlreadyExists, code if code.is_server_error() => BlobServiceError::ServerError, code => BlobServiceError::UnexpectedHttpStatus(code), } } type BlobResult = Result; + +#[derive(serde::Deserialize)] +struct AssignHolderResponse { + data_exists: bool, +} +#[derive(Debug, serde::Serialize)] +struct AssignHolderRequest { + blob_hash: String, + holder: String, +} +// they have the same layout so we can simply alias +type RevokeHolderRequest = AssignHolderRequest;