diff --git a/services/comm-services-lib/src/blob/client.rs b/services/comm-services-lib/src/blob/client.rs --- a/services/comm-services-lib/src/blob/client.rs +++ b/services/comm-services-lib/src/blob/client.rs @@ -113,6 +113,70 @@ Err(error) } + /// Assigns a new holder to a blob represented by [`blob_hash`]. + /// Returns `BlobServiceError::AlreadyExists` if blob already has + /// a holder with given [`holder`] name. + pub async fn assign_holder( + &self, + blob_hash: &str, + holder: &str, + ) -> BlobResult { + debug!("Assign holder request"); + let url = self.get_blob_url(None)?; + + let payload = AssignHolderRequest { + holder: holder.to_string(), + blob_hash: blob_hash.to_string(), + }; + debug!("Request payload: {:?}", payload); + let response = self.http_client.post(url).json(&payload).send().await?; + + debug!("Response status: {}", response.status()); + if response.status().is_success() { + let AssignHolderResponse { data_exists } = response.json().await?; + trace!("Data exists: {}", data_exists); + return Ok(data_exists); + } + + let error = handle_http_error(response.status()); + if let Ok(message) = response.text().await { + trace!("Error response message: {}", message); + } + Err(error) + } + + /// Revokes given holder from a blob represented by [`blob_hash`]. + /// Returns `BlobServiceError::NotFound` if blob with given hash does not exist + /// or it does not have such holder + pub async fn revoke_holder( + &self, + blob_hash: &str, + holder: &str, + ) -> BlobResult<()> { + debug!("Revoke holder request"); + let url = self.get_blob_url(None)?; + + let payload = RevokeHolderRequest { + holder: holder.to_string(), + blob_hash: blob_hash.to_string(), + }; + debug!("Request payload: {:?}", payload); + + let response = self.http_client.delete(url).json(&payload).send().await?; + debug!("Response status: {}", response.status()); + + if response.status().is_success() { + trace!("Revoke holder request successful"); + return Ok(()); + } + + let error = handle_http_error(response.status()); + if let Ok(message) = response.text().await { + trace!("Error response message: {}", message); + } + Err(error) + } + /// Uploads a blob. Returns `BlobServiceError::AlreadyExists` if blob with given hash /// already exists. /// @@ -164,6 +228,32 @@ } Err(error) } + + /// A wrapper around [`BlobServiceClient::assign_holder`] and [`BlobServiceClient::upload_blob`]. + /// + /// Assigns a new holder to a blob represented by [`blob_hash`]. If the blob does not exist, + /// uploads the data from [`data_stream`]. + pub async fn simple_put( + &self, + blob_hash: &str, + holder: &str, + data_stream: S, + ) -> BlobResult + where + S: futures_core::stream::TryStream + Send + Sync + 'static, + S::Error: Into>, + Vec: From, + { + trace!("Begin simple put. Assigning holder..."); + let data_exists = self.assign_holder(blob_hash, holder).await?; + if data_exists { + trace!("Blob data already exists. Skipping upload."); + return Ok(false); + } + trace!("Uploading blob data..."); + self.upload_blob(blob_hash, data_stream).await?; + Ok(true) + } } // private helper methods @@ -196,3 +286,15 @@ } type BlobResult = Result; + +#[derive(serde::Deserialize)] +struct AssignHolderResponse { + data_exists: bool, +} +#[derive(Debug, serde::Serialize)] +struct AssignHolderRequest { + blob_hash: String, + holder: String, +} +// they have the same layout so we can simply alias +type RevokeHolderRequest = AssignHolderRequest;