diff --git a/services/backup/src/blob/downloader.rs b/services/backup/src/blob/downloader.rs --- a/services/backup/src/blob/downloader.rs +++ b/services/backup/src/blob/downloader.rs @@ -5,7 +5,7 @@ }; use tracing::{instrument, Instrument}; -use super::proto; +use super::{proto, BlobClient}; use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY; pub use proto::put_request::Data as PutRequestData; @@ -28,13 +28,10 @@ /// Connects to the Blob service and keeps the client connection open /// in a separate Tokio task. #[instrument(name = "blob_downloader")] - pub async fn start(holder: String) -> Result { - let service_url = &crate::CONFIG.blob_service_url; - let mut blob_client = - proto::blob_service_client::BlobServiceClient::connect( - service_url.to_string(), - ) - .await?; + pub async fn start( + holder: String, + mut blob_client: BlobClient, + ) -> Result { let (blob_res_tx, blob_res_rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let client_thread = async move { diff --git a/services/backup/src/blob/uploader.rs b/services/backup/src/blob/uploader.rs --- a/services/backup/src/blob/uploader.rs +++ b/services/backup/src/blob/uploader.rs @@ -7,7 +7,7 @@ use tonic::Status; use tracing::{error, instrument, Instrument}; -use super::proto; +use super::{proto, BlobClient}; use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY; pub use proto::put_request::Data as PutRequestData; @@ -30,13 +30,7 @@ /// Connects to the Blob service and keeps the client connection open /// in a separate Tokio task. #[instrument(name = "blob_uploader")] - pub async fn start() -> Result { - let service_url = &crate::CONFIG.blob_service_url; - let mut blob_client = - proto::blob_service_client::BlobServiceClient::connect( - service_url.to_string(), - ) - .await?; + pub async fn start(mut blob_client: BlobClient) -> Result { let (blob_req_tx, blob_req_rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let (blob_res_tx, blob_res_rx) = @@ -132,9 +126,10 @@ pub async fn start_simple_uploader( holder: &str, blob_hash: &str, + blob_client: BlobClient, ) -> Result, Status> { // start client - let mut uploader = BlobUploader::start().await.map_err(|err| { + let mut uploader = BlobUploader::start(blob_client).await.map_err(|err| { error!("Failed to instantiate uploader: {:?}", err); Status::aborted("Internal error") })?; diff --git a/services/backup/src/service/handlers/add_attachments.rs b/services/backup/src/service/handlers/add_attachments.rs --- a/services/backup/src/service/handlers/add_attachments.rs +++ b/services/backup/src/service/handlers/add_attachments.rs @@ -89,8 +89,12 @@ Some(&log_item.log_id), ); - if let Some(mut uploader) = - crate::blob::start_simple_uploader(&holder, &log_item.data_hash).await? + if let Some(mut uploader) = crate::blob::start_simple_uploader( + &holder, + &log_item.data_hash, + blob_client.clone(), + ) + .await? { let blob_chunk = log_item.value.into_bytes(); uploader.put_data(blob_chunk).await.map_err(|err| { diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs --- a/services/backup/src/service/handlers/create_backup.rs +++ b/services/backup/src/service/handlers/create_backup.rs @@ -205,7 +205,9 @@ tracing::Span::current().record("backup_id", &backup_id); tracing::Span::current().record("blob_holder", &holder); - match start_simple_uploader(&holder, data_hash).await? { + match start_simple_uploader(&holder, data_hash, self.blob_client.clone()) + .await? + { Some(uploader) => { self.state = HandlerState::ReceivingData { uploader }; trace!("Everything prepared, waiting for data..."); diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs --- a/services/backup/src/service/handlers/pull_backup.rs +++ b/services/backup/src/service/handlers/pull_backup.rs @@ -116,7 +116,7 @@ try_stream! { let mut buffer = ResponseBuffer::default(); let mut downloader = - BlobDownloader::start(item.get_holder().to_string()).await.map_err(|err| { + BlobDownloader::start(item.get_holder().to_string(), blob_client).await.map_err(|err| { error!( "Failed to start blob downloader: {:?}", err ); diff --git a/services/backup/src/service/handlers/send_log.rs b/services/backup/src/service/handlers/send_log.rs --- a/services/backup/src/service/handlers/send_log.rs +++ b/services/backup/src/service/handlers/send_log.rs @@ -236,7 +236,13 @@ debug!("Log too large, switching persistence to Blob"); let holder = crate::utils::generate_blob_holder(log_hash, backup_id, Some(log_id)); - match crate::blob::start_simple_uploader(&holder, &log_hash).await? { + match crate::blob::start_simple_uploader( + &holder, + &log_hash, + self.blob_client.clone(), + ) + .await? + { Some(mut uploader) => { let blob_chunk = std::mem::take(&mut self.log_buffer); uploader.put_data(blob_chunk).await.map_err(|err| {