diff --git a/services/backup/src/blob/mod.rs b/services/backup/src/blob/mod.rs
--- a/services/backup/src/blob/mod.rs
+++ b/services/backup/src/blob/mod.rs
@@ -5,6 +5,6 @@
 pub use proto::{PutRequest, PutResponse};
 
 mod downloader;
-mod put_client;
+mod uploader;
 pub use downloader::*;
-pub use put_client::*;
+pub use uploader::*;
diff --git a/services/backup/src/blob/put_client.rs b/services/backup/src/blob/uploader.rs
rename from services/backup/src/blob/put_client.rs
rename to services/backup/src/blob/uploader.rs
--- a/services/backup/src/blob/put_client.rs
+++ b/services/backup/src/blob/uploader.rs
@@ -13,23 +13,23 @@
 pub use proto::put_request::Data as PutRequestData;
 pub use proto::{PutRequest, PutResponse};
 
-pub struct PutClient {
+pub struct BlobUploader {
   req_tx: Sender<PutRequest>,
   res_rx: Receiver<PutResponse>,
   handle: JoinHandle<Result<()>>,
 }
 
-/// The PutClient instance is a handle holder of a Tokio task running the
+/// The BlobUploader instance is a handle holder of a Tokio task running the
 /// actual blob client instance. The communication is done via two MPSC
 /// channels - one sending requests to the client task, and another for sending
 /// responses back to the caller. These messages should go in pairs
 /// - one request for one response.
 /// The client task can be stopped and awaited for result via the `terminate()`
 /// method.
-impl PutClient {
+impl BlobUploader {
   /// Connects to the Blob service and keeps the client connection open
   /// in a separate Tokio task.
-  #[instrument(name = "put_client")]
+  #[instrument(name = "blob_uploader")]
   pub async fn start() -> Result<Self> {
     let service_url = &crate::CONFIG.blob_service_url;
     let mut blob_client =
@@ -74,7 +74,7 @@
     };
     let handle = tokio::spawn(client_thread.in_current_span());
 
-    Ok(PutClient {
+    Ok(BlobUploader {
       req_tx: blob_req_tx,
       res_rx: blob_res_rx,
       handle,
@@ -122,25 +122,25 @@
 /// ## Example
 /// ```
 /// if let Some(mut client) =
-///   start_simple_put_client("my_holder", "my_hash").await? {
+///   start_simple_uploader("my_holder", "my_hash").await? {
 ///   let my_data = vec![1,2,3,4];
 ///   let _ = client.put_data(my_data).await;
 ///
 ///   let status = client.terminate().await;
 /// }
 /// ```
-pub async fn start_simple_put_client(
+pub async fn start_simple_uploader(
   holder: &str,
   blob_hash: &str,
-) -> Result<Option<PutClient>, Status> {
+) -> Result<Option<BlobUploader>, Status> {
   // start client
-  let mut put_client = PutClient::start().await.map_err(|err| {
-    error!("Failed to instantiate blob client: {:?}", err);
+  let mut uploader = BlobUploader::start().await.map_err(|err| {
+    error!("Failed to instantiate uploader: {:?}", err);
     Status::aborted("Internal error")
   })?;
 
   // send holder
-  put_client
+  uploader
     .put(PutRequest {
       data: Some(PutRequestData::Holder(holder.to_string())),
     })
@@ -151,7 +151,7 @@
     })?;
 
   // send hash
-  let PutResponse { data_exists } = put_client
+  let PutResponse { data_exists } = uploader
     .put(PutRequest {
       data: Some(PutRequestData::BlobHash(blob_hash.to_string())),
     })
@@ -165,11 +165,11 @@
   if data_exists {
     // the connection is already terminated by server,
     // but it's good to await it anyway
-    put_client.terminate().await.map_err(|err| {
+    uploader.terminate().await.map_err(|err| {
       error!("Put client task closed with error: {:?}", err);
       Status::aborted("Internal error")
     })?;
     return Ok(None);
   }
-  Ok(Some(put_client))
+  Ok(Some(uploader))
 }
diff --git a/services/backup/src/service/handlers/add_attachments.rs b/services/backup/src/service/handlers/add_attachments.rs
--- a/services/backup/src/service/handlers/add_attachments.rs
+++ b/services/backup/src/service/handlers/add_attachments.rs
@@ -84,16 +84,16 @@
       Some(&log_item.log_id),
     );
 
-    if let Some(mut blob_client) =
-      crate::blob::start_simple_put_client(&holder, &log_item.data_hash).await?
+    if let Some(mut uploader) =
+      crate::blob::start_simple_uploader(&holder, &log_item.data_hash).await?
     {
       let blob_chunk = log_item.value.into_bytes();
-      blob_client.put_data(blob_chunk).await.map_err(|err| {
+      uploader.put_data(blob_chunk).await.map_err(|err| {
         error!("Failed to upload data chunk: {:?}", err);
         Status::aborted("Internal error")
       })?;
 
-      blob_client.terminate().await.map_err(|err| {
+      uploader.terminate().await.map_err(|err| {
         error!("Put client task closed with error: {:?}", err);
         Status::aborted("Internal error")
       })?;
diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs
--- a/services/backup/src/service/handlers/create_backup.rs
+++ b/services/backup/src/service/handlers/create_backup.rs
@@ -2,7 +2,7 @@
 use tracing::{debug, error, trace, warn};
 
 use crate::{
-  blob::{start_simple_put_client, PutClient},
+  blob::{start_simple_uploader, BlobUploader},
   database::{BackupItem, DatabaseClient},
   service::proto,
 };
@@ -15,7 +15,7 @@
   /// Initial state. Handler is receiving non-data inputs
   ReceivingParams,
   /// Handler is receiving data chunks
-  ReceivingData { blob_client: PutClient },
+  ReceivingData { uploader: BlobUploader },
   /// A special case when Blob service claims that a blob with given
   /// [`CreateBackupHandler::data_hash`] already exists
   DataAlreadyExists,
@@ -110,7 +110,7 @@
     &mut self,
     data_chunk: Vec<u8>,
   ) -> CreateBackupResult {
-    let HandlerState::ReceivingData { ref mut blob_client } = self.state else {
+    let HandlerState::ReceivingData { ref mut uploader } = self.state else {
       self.should_close_stream = true;
       error!("Data chunk sent before other inputs");
       return Err(Status::invalid_argument(
@@ -127,7 +127,7 @@
     }
 
     trace!("Received {} bytes of data", data_chunk.len());
-    blob_client.put_data(data_chunk).await.map_err(|err| {
+    uploader.put_data(data_chunk).await.map_err(|err| {
       error!("Failed to upload data chunk: {:?}", err);
       Status::aborted("Internal error")
     })?;
@@ -147,9 +147,9 @@
         trace!("Nothing to store in database. Finishing early");
         return Ok(());
       }
-      HandlerState::ReceivingData { blob_client } => {
-        blob_client.terminate().await.map_err(|err| {
-          error!("Put client task closed with error: {:?}", err);
+      HandlerState::ReceivingData { uploader } => {
+        uploader.terminate().await.map_err(|err| {
+          error!("Uploader task closed with error: {:?}", err);
           Status::aborted("Internal error")
         })?;
       }
@@ -203,9 +203,9 @@
     tracing::Span::current().record("backup_id", &backup_id);
     tracing::Span::current().record("blob_holder", &holder);
 
-    match start_simple_put_client(&holder, data_hash).await? {
-      Some(blob_client) => {
-        self.state = HandlerState::ReceivingData { blob_client };
+    match start_simple_uploader(&holder, data_hash).await? {
+      Some(uploader) => {
+        self.state = HandlerState::ReceivingData { uploader };
         trace!("Everything prepared, waiting for data...");
       }
       None => {
diff --git a/services/backup/src/service/handlers/send_log.rs b/services/backup/src/service/handlers/send_log.rs
--- a/services/backup/src/service/handlers/send_log.rs
+++ b/services/backup/src/service/handlers/send_log.rs
@@ -4,7 +4,7 @@
 
 use super::handle_db_error;
 use crate::{
-  blob::PutClient,
+  blob::BlobUploader,
   constants::{ID_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT},
   database::{DatabaseClient, LogItem},
   service::proto::SendLogResponse,
@@ -34,14 +34,14 @@
 
   // client instances
   db: DatabaseClient,
-  blob_client: Option<PutClient>,
+  uploader: Option<BlobUploader>,
 }
 
 impl SendLogHandler {
   pub fn new(db: &DatabaseClient) -> Self {
     SendLogHandler {
       db: db.clone(),
-      blob_client: None,
+      uploader: None,
       user_id: None,
       backup_id: None,
       log_hash: None,
@@ -116,7 +116,7 @@
           self.ensure_size_constraints().await?;
         }
         LogPersistence::BLOB { .. } => {
-          let Some(client) = self.blob_client.as_mut() else {
+          let Some(client) = self.uploader.as_mut() else {
             self.should_close_stream = true;
             error!("Put client uninitialized. This should never happen!");
             return Err(Status::failed_precondition("Internal error"));
@@ -131,13 +131,13 @@
   }
 
   pub async fn finish(self) -> Result<SendLogResponse, Status> {
-    if let Some(client) = self.blob_client {
+    if let Some(client) = self.uploader {
       client.terminate().await.map_err(|err| {
         error!("Put client task closed with error: {:?}", err);
         Status::aborted("Internal error")
       })?;
     } else {
-      trace!("No blob client initialized. Skipping termination");
+      trace!("No uploader initialized. Skipping termination");
     }
 
     if !self.should_receive_data {
@@ -234,14 +234,14 @@
       debug!("Log too large, switching persistence to Blob");
       let holder =
         crate::utils::generate_blob_holder(log_hash, backup_id, Some(log_id));
-      match crate::blob::start_simple_put_client(&holder, &log_hash).await? {
-        Some(mut put_client) => {
+      match crate::blob::start_simple_uploader(&holder, &log_hash).await? {
+        Some(mut uploader) => {
           let blob_chunk = std::mem::take(&mut self.log_buffer);
-          put_client.put_data(blob_chunk).await.map_err(|err| {
+          uploader.put_data(blob_chunk).await.map_err(|err| {
             error!("Failed to upload data chunk: {:?}", err);
             Status::aborted("Internal error")
           })?;
-          self.blob_client = Some(put_client);
+          self.uploader = Some(uploader);
         }
         None => {
           debug!("Log hash already exists");