diff --git a/services/backup/src/blob/downloader.rs b/services/backup/src/blob/downloader.rs --- a/services/backup/src/blob/downloader.rs +++ b/services/backup/src/blob/downloader.rs @@ -3,7 +3,7 @@ sync::mpsc::{self, Receiver}, task::JoinHandle, }; -use tracing::{instrument, Instrument}; +use tracing::{error, instrument, Instrument}; use super::{proto, BlobClient}; use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY; @@ -28,14 +28,17 @@ /// Connects to the Blob service and keeps the client connection open /// in a separate Tokio task. #[instrument(name = "blob_downloader")] - pub async fn start( - holder: String, - mut blob_client: BlobClient, - ) -> Result { + pub fn start(holder: String, mut blob_client: BlobClient) -> Self { let (blob_res_tx, blob_res_rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let client_thread = async move { - let response = blob_client.get(proto::GetRequest { holder }).await?; + let response = blob_client + .get(proto::GetRequest { holder }) + .await + .map_err(|err| { + error!("Get request failed: {:?}", err); + err + })?; let mut inner_response = response.into_inner(); loop { match inner_response.message().await? { @@ -53,10 +56,10 @@ }; let handle = tokio::spawn(client_thread.in_current_span()); - Ok(BlobDownloader { + BlobDownloader { rx: blob_res_rx, handle, - }) + } } /// Receives the next chunk of blob data if ready or sleeps diff --git a/services/backup/src/blob/uploader.rs b/services/backup/src/blob/uploader.rs --- a/services/backup/src/blob/uploader.rs +++ b/services/backup/src/blob/uploader.rs @@ -30,7 +30,7 @@ /// Connects to the Blob service and keeps the client connection open /// in a separate Tokio task. #[instrument(name = "blob_uploader")] - pub async fn start(mut blob_client: BlobClient) -> Result { + pub fn start(mut blob_client: BlobClient) -> Self { let (blob_req_tx, blob_req_rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let (blob_res_tx, blob_res_rx) = @@ -61,6 +61,7 @@ } } Err(err) => { + error!("Put request failed: {:?}", err); bail!(err.to_string()); } }; @@ -68,11 +69,11 @@ }; let handle = tokio::spawn(client_thread.in_current_span()); - Ok(BlobUploader { + BlobUploader { req_tx: blob_req_tx, res_rx: blob_res_rx, handle, - }) + } } /// Sends a [`PutRequest`] to the stream and waits for blob service @@ -128,11 +129,8 @@ blob_hash: &str, blob_client: BlobClient, ) -> Result, Status> { - // start client - let mut uploader = BlobUploader::start(blob_client).await.map_err(|err| { - error!("Failed to instantiate uploader: {:?}", err); - Status::aborted("Internal error") - })?; + // start upload request + let mut uploader = BlobUploader::start(blob_client); // send holder uploader diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs --- a/services/backup/src/service/handlers/pull_backup.rs +++ b/services/backup/src/service/handlers/pull_backup.rs @@ -116,12 +116,7 @@ try_stream! { let mut buffer = ResponseBuffer::default(); let mut downloader = - BlobDownloader::start(item.get_holder().to_string(), blob_client).await.map_err(|err| { - error!( - "Failed to start blob downloader: {:?}", err - ); - Status::aborted("Internal error") - })?; + BlobDownloader::start(item.get_holder().to_string(), blob_client); let mut is_first_chunk = true; loop {