Implement GetClient for the backup service's blob client.

start() connects to the blob service, spawns a Tokio task that forwards every
chunk of the Get response stream into a bounded MPSC channel, and returns the
receiving half together with the task handle. get() yields the next chunk from
the channel; terminate() closes the receiver and returns the task's result.

NOTE(review): this patch was reconstructed from a copy whose line breaks,
indentation and generic type arguments had been stripped. Blank context lines
were placed so that every hunk matches its header counts; the type arguments
(`Vec<u8>`, `Result<()>`, `Result<Self>`) are inferred from how the values are
used - confirm against the repository before applying.

diff --git a/services/backup/src/blob/get_client.rs b/services/backup/src/blob/get_client.rs
--- a/services/backup/src/blob/get_client.rs
+++ b/services/backup/src/blob/get_client.rs
@@ -1,11 +1,24 @@
-use anyhow::Result;
-use tokio::{sync::mpsc::Receiver, task::JoinHandle};
-use tracing::instrument;
+use anyhow::{bail, Result};
+use tokio::{
+  sync::mpsc::{self, Receiver},
+  task::JoinHandle,
+};
+use tracing::{instrument, Instrument};
 
 use super::proto;
+use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY;
+
 pub use proto::put_request::Data as PutRequestData;
 pub use proto::{PutRequest, PutResponse};
 
+/// The GetClient instance is a handle holder of a Tokio task running the
+/// actual blob client instance. The communication is done via an MPSC channel
+/// and is one-sided - the data is transmitted from the client task to the
+/// caller. Blob chunks received in response stream are waiting
+/// for the channel to have capacity, so it is recommended to read them quickly
+/// to make room for more.
+/// The client task can be stopped and awaited for result via the `terminate()`
+/// method.
 pub struct GetClient {
   rx: Receiver<Vec<u8>>,
   handle: JoinHandle<Result<()>>,
@@ -16,7 +29,37 @@
   /// in a separate Tokio task.
   #[instrument(name = "get_client")]
   pub async fn start(holder: String) -> Result<Self> {
-    todo!()
+    let service_url = &crate::CONFIG.blob_service_url;
+    let mut blob_client =
+      proto::blob_service_client::BlobServiceClient::connect(
+        service_url.to_string(),
+      )
+      .await?;
+    let (blob_res_tx, blob_res_rx) =
+      mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
+    let client_thread = async move {
+      let response = blob_client.get(proto::GetRequest { holder }).await?;
+      let mut inner_response = response.into_inner();
+      loop {
+        match inner_response.message().await? {
+          Some(data) => {
+            let data: Vec<u8> = data.data_chunk;
+            if let Err(err) = blob_res_tx.send(data).await {
+              bail!(err);
+            }
+          }
+          // Response stream was closed
+          None => break,
+        }
+      }
+      Ok(())
+    };
+    let handle = tokio::spawn(client_thread.in_current_span());
+
+    Ok(GetClient {
+      rx: blob_res_rx,
+      handle,
+    })
   }
 
   /// Receives the next chunk of blob data if ready or sleeps
@@ -27,12 +70,13 @@
   /// should be consumed by calling [`GetClient::terminate`] to handle
   /// possible errors.
   pub async fn get(&mut self) -> Option<Vec<u8>> {
-    todo!()
+    self.rx.recv().await
   }
 
   /// Stops receiving messages and awaits the client thread to exit
   /// and returns its status.
   pub async fn terminate(mut self) -> Result<()> {
-    todo!()
+    self.rx.close();
+    self.handle.await?
   }
 }
diff --git a/services/backup/src/constants.rs b/services/backup/src/constants.rs
--- a/services/backup/src/constants.rs
+++ b/services/backup/src/constants.rs
@@ -1,3 +1,7 @@
+// Assorted constants
+
+pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1;
+
 // Configuration defaults
 
 pub const DEFAULT_GRPC_SERVER_PORT: u64 = 50051;