diff --git a/services/comm-services-lib/src/blob/client.rs b/services/comm-services-lib/src/blob/client.rs
--- a/services/comm-services-lib/src/blob/client.rs
+++ b/services/comm-services-lib/src/blob/client.rs
@@ -1,7 +1,10 @@
 use derive_more::{Display, Error, From};
 use futures_core::Stream;
-use futures_util::StreamExt;
-use reqwest::Url;
+use futures_util::{StreamExt, TryStreamExt};
+use reqwest::{
+  multipart::{Form, Part},
+  Body, Url,
+};
 use tracing::{debug, trace, warn};
 
 // publicly re-export some reqwest types
@@ -109,6 +112,63 @@
     }
     Err(error)
   }
+
+  /// Uploads a blob. Returns `BlobServiceError::AlreadyExists` if a blob with the given
+  /// hash already exists.
+  ///
+  /// The hash and the data stream are sent in one PUT request as the
+  /// `blob_hash` and `blob_data` parts of a multipart form.
+  ///
+  /// # Example
+  /// ```rust,ignore
+  /// use std::io::{Error, ErrorKind};
+  ///
+  /// let client =
+  ///   BlobServiceClient::new("http://localhost:50053".parse()?);
+  ///
+  /// let stream = async_stream::stream! {
+  ///   yield Ok(vec![1, 2, 3]);
+  ///   yield Ok(vec![4, 5, 6]);
+  ///   yield Err(Error::new(ErrorKind::Other, "Oops"));
+  /// };
+  /// client.upload_blob(&blob_hash, stream).await?;
+  /// ```
+  pub async fn upload_blob<H, S>(
+    &self,
+    blob_hash: H,
+    data_stream: S,
+  ) -> BlobResult<()>
+  where
+    H: Into<String>,
+    S: futures_core::stream::TryStream + Send + Sync + 'static,
+    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+    Vec<u8>: From<S::Ok>,
+  {
+    debug!("Upload blob request");
+    let url = self.get_blob_url(None)?;
+
+    // Convert each chunk to `Vec<u8>` so the stream can back a request body.
+    let stream = data_stream.map_ok(Vec::from);
+    let streaming_body = Body::wrap_stream(stream);
+    let form = Form::new()
+      .text("blob_hash", blob_hash.into())
+      .part("blob_data", Part::stream(streaming_body));
+
+    let response = self.http_client.put(url).multipart(form).send().await?;
+    debug!("Response status: {}", response.status());
+
+    if response.status().is_success() {
+      trace!("Blob upload successful");
+      return Ok(());
+    }
+
+    // Map the status first: `text()` consumes the response.
+    let error = handle_http_error(response.status());
+    if let Ok(message) = response.text().await {
+      trace!("Error response message: {}", message);
+    }
+    Err(error)
+  }
 }
 
 // private helper methods