diff --git a/services/backup/src/blob/get_client.rs b/services/backup/src/blob/downloader.rs rename from services/backup/src/blob/get_client.rs rename to services/backup/src/blob/downloader.rs --- a/services/backup/src/blob/get_client.rs +++ b/services/backup/src/blob/downloader.rs @@ -11,7 +11,7 @@ pub use proto::put_request::Data as PutRequestData; pub use proto::{PutRequest, PutResponse}; -/// The GetClient instance is a handle holder of a Tokio task running the +/// The BlobDownloader instance is a handle holder of a Tokio task running the /// actual blob client instance. The communication is done via a MPSC channel /// and is one-sided - the data is transmitted from the client task to the /// caller. Blob chunks received in response stream are waiting @@ -19,15 +19,15 @@ /// to make room for more. /// The client task can be stopped and awaited for result via the `terminate()` /// method. -pub struct GetClient { +pub struct BlobDownloader { rx: Receiver>, handle: JoinHandle>, } -impl GetClient { +impl BlobDownloader { /// Connects to the Blob service and keeps the client connection open /// in a separate Tokio task. - #[instrument(name = "get_client")] + #[instrument(name = "blob_downloader")] pub async fn start(holder: String) -> Result { let service_url = &crate::CONFIG.blob_service_url; let mut blob_client = @@ -56,7 +56,7 @@ }; let handle = tokio::spawn(client_thread.in_current_span()); - Ok(GetClient { + Ok(BlobDownloader { rx: blob_res_rx, handle, }) @@ -69,7 +69,7 @@ /// determine if it was successful. After receiving `None`, the client /// should be consumed by calling [`GetClient::terminate`] to handle /// possible errors. - pub async fn get(&mut self) -> Option> { + pub async fn next_chunk(&mut self) -> Option> { self.rx.recv().await } diff --git a/services/backup/src/blob/mod.rs b/services/backup/src/blob/mod.rs --- a/services/backup/src/blob/mod.rs +++ b/services/backup/src/blob/mod.rs @@ -4,7 +4,7 @@ pub use proto::put_request::Data as PutRequestData; pub use proto::{PutRequest, PutResponse}; -mod get_client; +mod downloader; mod put_client; -pub use get_client::*; +pub use downloader::*; pub use put_client::*; diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs --- a/services/backup/src/service/handlers/pull_backup.rs +++ b/services/backup/src/service/handlers/pull_backup.rs @@ -7,7 +7,7 @@ use super::handle_db_error; use super::proto::{self, PullBackupResponse}; use crate::{ - blob::GetClient, + blob::BlobDownloader, constants::{ BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS, BACKUP_TABLE_FIELD_BACKUP_ID, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE, @@ -108,10 +108,10 @@ { try_stream! { let mut buffer = ResponseBuffer::default(); - let mut client = - GetClient::start(item.get_holder().to_string()).await.map_err(|err| { + let mut downloader = + BlobDownloader::start(item.get_holder().to_string()).await.map_err(|err| { error!( - "Failed to start blob client: {:?}", err + "Failed to start blob downloader: {:?}", err ); Status::aborted("Internal error") })?; @@ -119,7 +119,7 @@ let mut is_first_chunk = true; loop { if !buffer.is_saturated() { - if let Some(data) = client.get().await { + if let Some(data) = downloader.next_chunk().await { buffer.put(data); } } @@ -140,8 +140,8 @@ is_first_chunk = false; } - client.terminate().await.map_err(|err| { - error!("Blob client failed: {:?}", err); + downloader.terminate().await.map_err(|err| { + error!("Blob downloader failed: {:?}", err); Status::aborted("Internal error") })?; }