diff --git a/services/backup/src/blob/get_client.rs b/services/backup/src/blob/get_client.rs index 6e25f166f..a82d1b9d4 100644 --- a/services/backup/src/blob/get_client.rs +++ b/services/backup/src/blob/get_client.rs @@ -1,38 +1,82 @@ -use anyhow::Result; -use tokio::{sync::mpsc::Receiver, task::JoinHandle}; -use tracing::instrument; +use anyhow::{bail, Result}; +use tokio::{ + sync::mpsc::{self, Receiver}, + task::JoinHandle, +}; +use tracing::{instrument, Instrument}; use super::proto; +use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY; + pub use proto::put_request::Data as PutRequestData; pub use proto::{PutRequest, PutResponse}; +/// The GetClient instance is a handle holder of a Tokio task running the +/// actual blob client instance. The communication is done via a MPSC channel +/// and is one-sided - the data is transmitted from the client task to the +/// caller. Blob chunks received in response stream are waiting +/// for the channel to have capacity, so it is recommended to read them quickly +/// to make room for more. +/// The client task can be stopped and awaited for result via the `terminate()` +/// method. pub struct GetClient { rx: Receiver>, handle: JoinHandle>, } impl GetClient { /// Connects to the Blob service and keeps the client connection open /// in a separate Tokio task. #[instrument(name = "get_client")] pub async fn start(holder: String) -> Result { - todo!() + let service_url = &crate::CONFIG.blob_service_url; + let mut blob_client = + proto::blob_service_client::BlobServiceClient::connect( + service_url.to_string(), + ) + .await?; + let (blob_res_tx, blob_res_rx) = + mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); + let client_thread = async move { + let response = blob_client.get(proto::GetRequest { holder }).await?; + let mut inner_response = response.into_inner(); + loop { + match inner_response.message().await? { + Some(data) => { + let data: Vec = data.data_chunk; + if let Err(err) = blob_res_tx.send(data).await { + bail!(err); + } + } + // Response stream was closed + None => break, + } + } + Ok(()) + }; + let handle = tokio::spawn(client_thread.in_current_span()); + + Ok(GetClient { + rx: blob_res_rx, + handle, + }) } /// Receives the next chunk of blob data if ready or sleeps /// until the data is available. /// /// Returns `None` when the transmission is finished, but this doesn't /// determine if it was successful. After receiving `None`, the client /// should be consumed by calling [`GetClient::terminate`] to handle /// possible errors. pub async fn get(&mut self) -> Option> { - todo!() + self.rx.recv().await } /// Stops receiving messages and awaits the client thread to exit /// and returns its status. pub async fn terminate(mut self) -> Result<()> { - todo!() + self.rx.close(); + self.handle.await? } } diff --git a/services/backup/src/constants.rs b/services/backup/src/constants.rs index 6ed661e80..c55e2e1b1 100644 --- a/services/backup/src/constants.rs +++ b/services/backup/src/constants.rs @@ -1,11 +1,15 @@ +// Assorted constants + +pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; + // Configuration defaults pub const DEFAULT_GRPC_SERVER_PORT: u64 = 50051; pub const DEFAULT_LOCALSTACK_URL: &str = "http://localhost:4566"; pub const DEFAULT_BLOB_SERVICE_URL: &str = "http://localhost:50053"; // Environment variable names pub const SANDBOX_ENV_VAR: &str = "COMM_SERVICES_SANDBOX"; pub const LOG_LEVEL_ENV_VAR: &str = tracing_subscriber::filter::EnvFilter::DEFAULT_ENV;