diff --git a/services/backup/src/blob/put_client.rs b/services/backup/src/blob/put_client.rs index 32bd49127..15b2f2c19 100644 --- a/services/backup/src/blob/put_client.rs +++ b/services/backup/src/blob/put_client.rs @@ -1,101 +1,175 @@ use anyhow::{anyhow, bail, Result}; use tokio::{ sync::mpsc::{self, Receiver, Sender}, task::JoinHandle, }; use tokio_stream::wrappers::ReceiverStream; -use tracing::{instrument, Instrument}; +use tonic::Status; +use tracing::{error, instrument, Instrument}; use super::proto; use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY; pub use proto::put_request::Data as PutRequestData; pub use proto::{PutRequest, PutResponse}; pub struct PutClient { req_tx: Sender, res_rx: Receiver, handle: JoinHandle>, } /// The PutClient instance is a handle holder of a Tokio task running the /// actual blob client instance. The communication is done via two MPSC /// channels - one sending requests to the client task, and another for sending /// responses back to the caller. These messages should go in pairs /// - one request for one response. /// The client task can be stopped and awaited for result via the `terminate()` /// method. impl PutClient { /// Connects to the Blob service and keeps the client connection open /// in a separate Tokio task. #[instrument(name = "put_client")] pub async fn start() -> Result { let service_url = &crate::CONFIG.blob_service_url; let mut blob_client = proto::blob_service_client::BlobServiceClient::connect( service_url.to_string(), ) .await?; let (blob_req_tx, blob_req_rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let (blob_res_tx, blob_res_rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let client_thread = async move { match blob_client .put(tonic::Request::new(ReceiverStream::new(blob_req_rx))) .await { Ok(response) => { let mut response_stream = response.into_inner(); loop { match response_stream.message().await? { Some(response_message) => { // warning: this will produce an error if there's more unread // responses than MPSC_CHANNEL_BUFFER_CAPACITY // so you should always read the response MPSC channel // right after sending a request to dequeue the responses // and make room for more. // The PutClient::put() function should take care of that if let Err(err) = blob_res_tx.try_send(response_message) { bail!(err); } } // Response stream was closed None => break, } } } Err(err) => { bail!(err.to_string()); } }; Ok(()) }; let handle = tokio::spawn(client_thread.in_current_span()); Ok(PutClient { req_tx: blob_req_tx, res_rx: blob_res_rx, handle, }) } /// Sends a [`PutRequest`] to the stream and waits for blob service /// to send a response. After all data is sent, the [`PutClient::terminate`] /// should be called to end the transmission and handle possible errors. pub async fn put(&mut self, req: PutRequest) -> Result { self.req_tx.try_send(req)?; self .res_rx .recv() .await .ok_or_else(|| anyhow!("Blob client channel closed")) } + /// Convenience wrapper for + /// ``` + /// BlobClient::put(PutRequest { + /// data: Some(PutRequestData::DataChunk(data)) + /// }) + /// ``` + pub async fn put_data(&mut self, data: Vec) -> Result { + self + .put(PutRequest { + data: Some(PutRequestData::DataChunk(data)), + }) + .await + } + /// Closes the connection and awaits the blob client task to finish. pub async fn terminate(self) -> Result<()> { drop(self.req_tx); let thread_result = self.handle.await?; thread_result } } + +/// Starts a put client instance. Fulfills request with blob hash and holder. +/// +/// `None` is returned if given `holder` already exists. +/// +/// ## Example +/// ``` +/// if let Some(mut client) = +/// start_simple_put_client("my_holder", "my_hash").await? { +/// let my_data = vec![1,2,3,4]; +/// let _ = client.put_data(my_data).await; +/// +/// let status = client.terminate().await; +/// } +/// ``` +pub async fn start_simple_put_client( + holder: &str, + blob_hash: &str, +) -> Result, Status> { + // start client + let mut put_client = PutClient::start().await.map_err(|err| { + error!("Failed to instantiate blob client: {:?}", err); + Status::aborted("Internal error") + })?; + + // send holder + put_client + .put(PutRequest { + data: Some(PutRequestData::Holder(holder.to_string())), + }) + .await + .map_err(|err| { + error!("Failed to set blob holder: {:?}", err); + Status::aborted("Internal error") + })?; + + // send hash + let PutResponse { data_exists } = put_client + .put(PutRequest { + data: Some(PutRequestData::BlobHash(blob_hash.to_string())), + }) + .await + .map_err(|err| { + error!("Failed to set blob hash: {:?}", err); + Status::aborted("Internal error") + })?; + + // Blob with given holder already exists, nothing to do + if data_exists { + // the connection is already terminated by server, + // but it's good to await it anyway + put_client.terminate().await.map_err(|err| { + error!("Put client task closed with error: {:?}", err); + Status::aborted("Internal error") + })?; + return Ok(None); + } + Ok(Some(put_client)) +}