diff --git a/services/backup/src/blob/put_client.rs b/services/backup/src/blob/put_client.rs --- a/services/backup/src/blob/put_client.rs +++ b/services/backup/src/blob/put_client.rs @@ -4,7 +4,8 @@ task::JoinHandle, }; use tokio_stream::wrappers::ReceiverStream; -use tracing::{instrument, Instrument}; +use tonic::Status; +use tracing::{error, instrument, Instrument}; use super::proto; use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY; @@ -92,6 +93,20 @@ .ok_or_else(|| anyhow::anyhow!("Blob client channel closed")) } + /// Convenience wrapper for + /// ``` + /// BlobClient::put(PutRequest { + /// data: Some(PutRequestData::DataChunk(data)) + /// }) + /// ``` + pub async fn put_data(&mut self, data: Vec) -> Result { + self + .put(PutRequest { + data: Some(PutRequestData::DataChunk(data)), + }) + .await + } + /// Closes the connection and awaits the blob client task to finish. pub async fn terminate(self) -> Result<()> { drop(self.req_tx); @@ -99,3 +114,62 @@ thread_result } } + +/// Starts a put client instance. Fulfills request with blob hash and holder. +/// +/// `None` is returned if given `holder` already exists. +/// +/// ## Example +/// ``` +/// if let Some(mut client) = +/// start_simple_put_client("my_holder", "my_hash").await? { +/// let my_data = vec![1,2,3,4]; +/// let _ = client.put_data(my_data).await; +/// +/// let status = client.terminate().await; +/// } +/// ``` +pub async fn start_simple_put_client( + holder: &str, + blob_hash: &str, +) -> Result, Status> { + // start client + let mut put_client = PutClient::start().await.map_err(|err| { + error!("Failed to instantiate blob client: {:?}", err); + Status::aborted("Internal error") + })?; + + // send holder + put_client + .put(PutRequest { + data: Some(PutRequestData::Holder(holder.to_string())), + }) + .await + .map_err(|err| { + error!("Failed to set blob holder: {:?}", err); + Status::aborted("Internal error") + })?; + + // send hash + let PutResponse { data_exists } = put_client + .put(PutRequest { + data: Some(PutRequestData::BlobHash(blob_hash.to_string())), + }) + .await + .map_err(|err| { + error!("Failed to set blob hash: {:?}", err); + Status::aborted("Internal error") + })?; + + // Blob with given holder already exists, nothing to do + if data_exists { + // the connection is already terminated by server, + // but it's good to await it anyway + put_client.terminate().await.map_err(|err| { + error!("Put client task closed with error: {:?}", err); + Status::aborted("Internal error") + })?; + return Ok(None); + } + Ok(Some(put_client)) +}