diff --git a/services/backup/src/blob/mod.rs b/services/backup/src/blob/mod.rs
index a0e3e4d3d..041ff9a9d 100644
--- a/services/backup/src/blob/mod.rs
+++ b/services/backup/src/blob/mod.rs
@@ -1,10 +1,10 @@
 mod proto {
   tonic::include_proto!("blob");
 }

 pub use proto::put_request::Data as PutRequestData;
 pub use proto::{PutRequest, PutResponse};

 mod downloader;
-mod put_client;
+mod uploader;

 pub use downloader::*;
-pub use put_client::*;
+pub use uploader::*;
diff --git a/services/backup/src/blob/put_client.rs b/services/backup/src/blob/uploader.rs
similarity index 89%
rename from services/backup/src/blob/put_client.rs
rename to services/backup/src/blob/uploader.rs
index 15b2f2c19..b112aca3c 100644
--- a/services/backup/src/blob/put_client.rs
+++ b/services/backup/src/blob/uploader.rs
@@ -1,175 +1,175 @@
 use anyhow::{anyhow, bail, Result};
 use tokio::{
   sync::mpsc::{self, Receiver, Sender},
   task::JoinHandle,
 };
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::Status;
 use tracing::{error, instrument, Instrument};

 use super::proto;
 use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY;

 pub use proto::put_request::Data as PutRequestData;
 pub use proto::{PutRequest, PutResponse};

-pub struct PutClient {
+pub struct BlobUploader {
   req_tx: Sender<PutRequest>,
   res_rx: Receiver<PutResponse>,
   handle: JoinHandle<Result<()>>,
 }

-/// The PutClient instance is a handle holder of a Tokio task running the
+/// The BlobUploader instance holds a handle to a Tokio task running the
 /// actual blob client instance. The communication is done via two MPSC
 /// channels - one sending requests to the client task, and another for sending
 /// responses back to the caller. These messages should go in pairs
 /// - one request for one response.
 /// The client task can be stopped and awaited for result via the `terminate()`
 /// method.
-impl PutClient {
+impl BlobUploader {
   /// Connects to the Blob service and keeps the client connection open
   /// in a separate Tokio task.
-  #[instrument(name = "put_client")]
+  #[instrument(name = "blob_uploader")]
   pub async fn start() -> Result<Self> {
     let service_url = &crate::CONFIG.blob_service_url;
     let mut blob_client =
       proto::blob_service_client::BlobServiceClient::connect(
         service_url.to_string(),
       )
       .await?;
     let (blob_req_tx, blob_req_rx) =
       mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
     let (blob_res_tx, blob_res_rx) =
       mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);

     let client_thread = async move {
       match blob_client
         .put(tonic::Request::new(ReceiverStream::new(blob_req_rx)))
         .await
       {
         Ok(response) => {
           let mut response_stream = response.into_inner();
           loop {
             match response_stream.message().await? {
               Some(response_message) => {
                 // warning: this will produce an error if there are more unread
                 // responses than MPSC_CHANNEL_BUFFER_CAPACITY
                 // so you should always read the response MPSC channel
                 // right after sending a request to dequeue the responses
                 // and make room for more.
                 // The BlobUploader::put() function should take care of that
                 if let Err(err) = blob_res_tx.try_send(response_message) {
                   bail!(err);
                 }
               }
               // Response stream was closed
               None => break,
             }
           }
         }
         Err(err) => {
           bail!(err.to_string());
         }
       };
       Ok(())
     };
     let handle = tokio::spawn(client_thread.in_current_span());

-    Ok(PutClient {
+    Ok(BlobUploader {
       req_tx: blob_req_tx,
       res_rx: blob_res_rx,
       handle,
     })
   }

   /// Sends a [`PutRequest`] to the stream and waits for blob service
   /// to send a response. After all data is sent, the [`BlobUploader::terminate`]
   /// should be called to end the transmission and handle possible errors.
   pub async fn put(&mut self, req: PutRequest) -> Result<PutResponse> {
     self.req_tx.try_send(req)?;
     self
       .res_rx
       .recv()
       .await
       .ok_or_else(|| anyhow!("Blob client channel closed"))
   }

   /// Convenience wrapper for
   /// ```
   /// BlobUploader::put(PutRequest {
   ///   data: Some(PutRequestData::DataChunk(data))
   /// })
   /// ```
   pub async fn put_data(&mut self, data: Vec<u8>) -> Result<PutResponse> {
     self
       .put(PutRequest {
         data: Some(PutRequestData::DataChunk(data)),
       })
       .await
   }

   /// Closes the connection and awaits the blob client task to finish.
   pub async fn terminate(self) -> Result<()> {
     drop(self.req_tx);
     let thread_result = self.handle.await?;
     thread_result
   }
 }

 /// Starts an uploader instance. Fulfills request with blob hash and holder.
 ///
 /// `None` is returned if given `holder` already exists.
 ///
 /// ## Example
 /// ```
 /// if let Some(mut client) =
-///   start_simple_put_client("my_holder", "my_hash").await? {
+///   start_simple_uploader("my_holder", "my_hash").await? {
 ///   let my_data = vec![1,2,3,4];
 ///   let _ = client.put_data(my_data).await;
 ///
 ///   let status = client.terminate().await;
 /// }
 /// ```
-pub async fn start_simple_put_client(
+pub async fn start_simple_uploader(
   holder: &str,
   blob_hash: &str,
-) -> Result<Option<PutClient>, Status> {
+) -> Result<Option<BlobUploader>, Status> {
   // start client
-  let mut put_client = PutClient::start().await.map_err(|err| {
-    error!("Failed to instantiate blob client: {:?}", err);
+  let mut uploader = BlobUploader::start().await.map_err(|err| {
+    error!("Failed to instantiate uploader: {:?}", err);
     Status::aborted("Internal error")
   })?;

   // send holder
-  put_client
+  uploader
     .put(PutRequest {
       data: Some(PutRequestData::Holder(holder.to_string())),
     })
     .await
     .map_err(|err| {
       error!("Failed to set blob holder: {:?}", err);
       Status::aborted("Internal error")
     })?;

   // send hash
-  let PutResponse { data_exists } = put_client
+  let PutResponse { data_exists } = uploader
     .put(PutRequest {
       data: Some(PutRequestData::BlobHash(blob_hash.to_string())),
     })
     .await
     .map_err(|err| {
       error!("Failed to set blob hash: {:?}", err);
       Status::aborted("Internal error")
     })?;

   // Blob with given holder already exists, nothing to do
   if data_exists {
     // the connection is already terminated by server,
     // but it's good to await it anyway
-    put_client.terminate().await.map_err(|err| {
+    uploader.terminate().await.map_err(|err| {
       error!("Put client task closed with error: {:?}", err);
       Status::aborted("Internal error")
     })?;
     return Ok(None);
   }

-  Ok(Some(put_client))
+  Ok(Some(uploader))
 }
diff --git a/services/backup/src/service/handlers/add_attachments.rs b/services/backup/src/service/handlers/add_attachments.rs
index f13194fd2..fa038cff5 100644
--- a/services/backup/src/service/handlers/add_attachments.rs
+++ b/services/backup/src/service/handlers/add_attachments.rs
@@ -1,199 +1,199 @@
 use tonic::Status;
 use tracing::debug;
 use tracing::error;

 use super::handle_db_error;
 use super::proto;
 use crate::{
   constants::{ATTACHMENT_HOLDER_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT},
   database::{DatabaseClient, LogItem},
 };

 pub async fn handle_add_attachments(
   db: &DatabaseClient,
   request: proto::AddAttachmentsRequest,
 ) -> Result<(), Status> {
   let proto::AddAttachmentsRequest {
     user_id,
     backup_id,
     log_id,
     holders,
   } = request;

   if user_id.is_empty() {
     return Err(Status::invalid_argument(
       "user id required but not provided",
     ));
   }
   if backup_id.is_empty() {
     return Err(Status::invalid_argument(
       "backup id required but not provided",
     ));
   }
   if holders.is_empty() {
     return Err(Status::invalid_argument(
       "holders required but not provided",
     ));
   }

   if log_id.is_empty() {
     let backup_item_result = db
       .find_backup_item(&user_id, &backup_id)
       .await
       .map_err(handle_db_error)?;
     let mut backup_item = backup_item_result.ok_or_else(|| {
       debug!("Backup item not found");
       Status::not_found("Backup item not found")
     })?;

     add_new_attachments(&mut backup_item.attachment_holders, &holders);

     db.put_backup_item(backup_item)
       .await
       .map_err(handle_db_error)?;
   } else {
     let log_item_result = db
       .find_log_item(&backup_id, &log_id)
       .await
       .map_err(handle_db_error)?;
     let mut log_item = log_item_result.ok_or_else(|| {
       debug!("Log item not found");
       Status::not_found("Log item not found")
     })?;

     add_new_attachments(&mut log_item.attachment_holders, &holders);

     // log item too large for database, move it to blob-service storage
     if !log_item.persisted_in_blob
       && log_item.total_size() > LOG_DATA_SIZE_DATABASE_LIMIT
     {
       debug!("Log item too large. Persisting in blob service...");
       log_item = move_to_blob(log_item).await?;
     }

     db.put_log_item(log_item).await.map_err(handle_db_error)?;
   }

   Ok(())
 }

 async fn move_to_blob(log_item: LogItem) -> Result<LogItem, Status> {
   let holder = crate::utils::generate_blob_holder(
     &log_item.data_hash,
     &log_item.backup_id,
     Some(&log_item.log_id),
   );
-  if let Some(mut blob_client) =
-    crate::blob::start_simple_put_client(&holder, &log_item.data_hash).await?
+  if let Some(mut uploader) =
+    crate::blob::start_simple_uploader(&holder, &log_item.data_hash).await?
   {
     let blob_chunk = log_item.value.into_bytes();
-    blob_client.put_data(blob_chunk).await.map_err(|err| {
+    uploader.put_data(blob_chunk).await.map_err(|err| {
       error!("Failed to upload data chunk: {:?}", err);
       Status::aborted("Internal error")
     })?;
-    blob_client.terminate().await.map_err(|err| {
+    uploader.terminate().await.map_err(|err| {
       error!("Put client task closed with error: {:?}", err);
       Status::aborted("Internal error")
     })?;
   } else {
     debug!("Blob holder for log ID={} already exists", &log_item.log_id);
   }

   Ok(LogItem {
     persisted_in_blob: true,
     value: holder,
     ..log_item
   })
 }

 /// Modifies the [`current_holders_str`] by appending attachment holders
 /// contained in [`new_holders`]. Removes duplicates. Both arguments
 /// are expected to be [`ATTACHMENT_HOLDER_SEPARATOR`] separated strings.
 fn add_new_attachments(current_holders_str: &mut String, new_holders: &str) {
   let mut current_holders = parse_attachment_holders(current_holders_str);
   let new_holders = parse_attachment_holders(new_holders);
   current_holders.extend(new_holders);

   *current_holders_str = current_holders
     .into_iter()
     .collect::<Vec<_>>()
     .join(ATTACHMENT_HOLDER_SEPARATOR);
 }

 /// Parses an [`ATTACHMENT_HOLDER_SEPARATOR`] separated string into a `HashSet`
 /// of attachment holder string slices.
 fn parse_attachment_holders(
   holders_str: &str,
 ) -> std::collections::HashSet<&str> {
   holders_str
     .split(ATTACHMENT_HOLDER_SEPARATOR)
     .filter(|holder| !holder.is_empty())
     .collect()
 }

 #[cfg(test)]
 mod tests {
   use super::*;
   use std::collections::HashSet;

   #[test]
   fn test_parse_attachments() {
     let holders = "h1;h2;h3";
     let expected = HashSet::from(["h1", "h2", "h3"]);
     assert_eq!(parse_attachment_holders(holders), expected);
   }

   #[test]
   fn test_empty_attachments() {
     let actual = parse_attachment_holders("");
     let expected = HashSet::new();
     assert_eq!(actual, expected);
   }

   #[test]
   fn test_add_attachments() {
     let mut current_holders = "holder1;holder2".to_string();
     let new_holders = "holder3;holder4";
     add_new_attachments(&mut current_holders, new_holders);

     assert_eq!(
       parse_attachment_holders(&current_holders),
       HashSet::from(["holder1", "holder2", "holder3", "holder4"])
     );
   }

   #[test]
   fn test_add_to_empty() {
     let mut current_holders = String::new();
     let new_holders = "holder3;holder4";
     add_new_attachments(&mut current_holders, new_holders);

     assert_eq!(
       parse_attachment_holders(&current_holders),
       HashSet::from(["holder3", "holder4"])
     );
   }

   #[test]
   fn test_add_none() {
     let mut current_holders = "holder1;holder2".to_string();
     let new_holders = "";
     add_new_attachments(&mut current_holders, new_holders);

     assert_eq!(
       parse_attachment_holders(&current_holders),
       HashSet::from(["holder1", "holder2"])
     );
   }

   #[test]
   fn test_remove_duplicates() {
     let mut current_holders = "holder1;holder2".to_string();
     let new_holders = "holder2;holder3";
     add_new_attachments(&mut current_holders, new_holders);

     assert_eq!(
       parse_attachment_holders(&current_holders),
       HashSet::from(["holder1", "holder2", "holder3"])
     );
   }
 }
diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs
index 73c0cad6a..fb154bff5 100644
--- a/services/backup/src/service/handlers/create_backup.rs
+++ b/services/backup/src/service/handlers/create_backup.rs
@@ -1,230 +1,230 @@
 use tonic::Status;
 use tracing::{debug, error, trace, warn};

 use crate::{
-  blob::{start_simple_put_client, PutClient},
+  blob::{start_simple_uploader, BlobUploader},
   database::{BackupItem, DatabaseClient},
   service::proto,
 };

 use super::handle_db_error;

 type CreateBackupResult = Result<proto::CreateNewBackupResponse, Status>;

 enum HandlerState {
   /// Initial state. Handler is receiving non-data inputs
   ReceivingParams,
   /// Handler is receiving data chunks
-  ReceivingData { blob_client: PutClient },
+  ReceivingData { uploader: BlobUploader },
   /// A special case when Blob service claims that a blob with given
   /// [`CreateBackupHandler::data_hash`] already exists
   DataAlreadyExists,
 }

 pub struct CreateBackupHandler {
   // flow control
   pub should_close_stream: bool,

   // inputs
   user_id: Option<String>,
   device_id: Option<String>,
   key_entropy: Option<Vec<u8>>,
   data_hash: Option<String>,

   // client instances
   db: DatabaseClient,

   // internal state
   state: HandlerState,
   backup_id: String,
   holder: Option<String>,
 }

 impl CreateBackupHandler {
   pub fn new(db: &DatabaseClient) -> Self {
     CreateBackupHandler {
       should_close_stream: false,
       user_id: None,
       device_id: None,
       key_entropy: None,
       data_hash: None,
       db: db.clone(),
       state: HandlerState::ReceivingParams,
       backup_id: String::new(),
       holder: None,
     }
   }

   pub async fn handle_user_id(
     &mut self,
     user_id: String,
   ) -> CreateBackupResult {
     if self.user_id.is_some() {
       warn!("user ID already provided");
       return Err(Status::invalid_argument("User ID already provided"));
     }
     self.user_id = Some(user_id);
     self.handle_internal().await
   }

   pub async fn handle_device_id(
     &mut self,
     device_id: String,
   ) -> CreateBackupResult {
     if self.device_id.is_some() {
       warn!("Device ID already provided");
       return Err(Status::invalid_argument("Device ID already provided"));
     }
     tracing::Span::current().record("device_id", &device_id);
     self.device_id = Some(device_id);
     self.handle_internal().await
   }

   pub async fn handle_key_entropy(
     &mut self,
     key_entropy: Vec<u8>,
   ) -> CreateBackupResult {
     if self.key_entropy.is_some() {
       warn!("Key entropy already provided");
       return Err(Status::invalid_argument("Key entropy already provided"));
     }
     self.key_entropy = Some(key_entropy);
     self.handle_internal().await
   }

   pub async fn handle_data_hash(
     &mut self,
     data_hash: Vec<u8>,
   ) -> CreateBackupResult {
     if self.data_hash.is_some() {
       warn!("Data hash already provided");
       return Err(Status::invalid_argument("Data hash already provided"));
     }
     let hash_str = String::from_utf8(data_hash).map_err(|err| {
       error!("Failed to convert data_hash into string: {:?}", err);
       Status::aborted("Unexpected error")
     })?;
     tracing::Span::current().record("data_hash", &hash_str);
     self.data_hash = Some(hash_str);
     self.handle_internal().await
   }

   pub async fn handle_data_chunk(
     &mut self,
     data_chunk: Vec<u8>,
   ) -> CreateBackupResult {
-    let HandlerState::ReceivingData { ref mut blob_client } = self.state else {
+    let HandlerState::ReceivingData { ref mut uploader } = self.state else {
       self.should_close_stream = true;
       error!("Data chunk sent before other inputs");
       return Err(Status::invalid_argument(
         "Data chunk sent before other inputs",
       ));
     };

     // empty chunk ends transmission
     if data_chunk.is_empty() {
       self.should_close_stream = true;
       return Ok(proto::CreateNewBackupResponse {
         backup_id: self.backup_id.clone(),
       });
     }

     trace!("Received {} bytes of data", data_chunk.len());
-    blob_client.put_data(data_chunk).await.map_err(|err| {
+    uploader.put_data(data_chunk).await.map_err(|err| {
       error!("Failed to upload data chunk: {:?}", err);
       Status::aborted("Internal error")
     })?;

     Ok(proto::CreateNewBackupResponse {
       // actual Backup ID should be sent only once, the time it is generated
       // see handle_internal()
       backup_id: String::new(),
     })
   }

   /// This function should be called after the input stream is finished.
   pub async fn finish(self) -> Result<(), Status> {
     match self.state {
       HandlerState::ReceivingParams => {
         // client probably aborted early
         trace!("Nothing to store in database. Finishing early");
         return Ok(());
       }
-      HandlerState::ReceivingData { blob_client } => {
-        blob_client.terminate().await.map_err(|err| {
-          error!("Put client task closed with error: {:?}", err);
+      HandlerState::ReceivingData { uploader } => {
+        uploader.terminate().await.map_err(|err| {
+          error!("Uploader task closed with error: {:?}", err);
           Status::aborted("Internal error")
         })?;
       }
       HandlerState::DataAlreadyExists => (),
     }

     let (Some(user_id), Some(holder)) = (self.user_id, self.holder) else {
       error!("Holder / UserID absent in data mode. This should never happen!");
       return Err(Status::failed_precondition("Internal error"));
     };
     if self.backup_id.is_empty() {
       error!("Backup ID was not generated. This should never happen!");
       return Err(Status::failed_precondition("Internal error"));
     }
     let backup_item = BackupItem::new(user_id, self.backup_id, holder);

     self
       .db
       .put_backup_item(backup_item)
       .await
       .map_err(handle_db_error)?;

     Ok(())
   }

   // internal param handler helper
   async fn handle_internal(&mut self) -> CreateBackupResult {
     if !matches!(self.state, HandlerState::ReceivingParams) {
       error!("CreateBackupHandler already received all non-data params.");
       return Err(Status::failed_precondition("Backup data chunk expected"));
     }

     // all non-data inputs must be set before receiving backup data chunks
     let (Some(data_hash), Some(device_id), Some(_), Some(_)) = (
       self.data_hash.as_ref(),
       self.device_id.as_ref(),
       self.user_id.as_ref(),
       self.key_entropy.as_ref(),
     ) else {
       // return empty backup ID when inputs are incomplete
       return Ok(proto::CreateNewBackupResponse {
         backup_id: "".to_string(),
       });
     };

     let backup_id = generate_backup_id(device_id);
     let holder =
       crate::utils::generate_blob_holder(data_hash, &backup_id, None);
     self.backup_id = backup_id.clone();
     self.holder = Some(holder.clone());
     tracing::Span::current().record("backup_id", &backup_id);
     tracing::Span::current().record("blob_holder", &holder);

-    match start_simple_put_client(&holder, data_hash).await? {
-      Some(blob_client) => {
-        self.state = HandlerState::ReceivingData { blob_client };
+    match start_simple_uploader(&holder, data_hash).await? {
+      Some(uploader) => {
+        self.state = HandlerState::ReceivingData { uploader };
         trace!("Everything prepared, waiting for data...");
       }
       None => {
         // Blob with given data_hash already exists
         debug!("Blob already exists, finishing");
         self.should_close_stream = true;
         self.state = HandlerState::DataAlreadyExists;
       }
     };

     Ok(proto::CreateNewBackupResponse { backup_id })
   }
 }

 /// Generates ID for a new backup
 fn generate_backup_id(device_id: &str) -> String {
   format!(
     "{device_id}_{timestamp}",
     device_id = device_id,
     timestamp = chrono::Utc::now().timestamp_millis()
   )
 }
diff --git a/services/backup/src/service/handlers/send_log.rs b/services/backup/src/service/handlers/send_log.rs
index 4633a1259..b6c16537d 100644
--- a/services/backup/src/service/handlers/send_log.rs
+++ b/services/backup/src/service/handlers/send_log.rs
@@ -1,264 +1,264 @@
 use tonic::Status;
 use tracing::{debug, error, trace, warn};
 use uuid::Uuid;

 use super::handle_db_error;
 use crate::{
-  blob::PutClient,
+  blob::BlobUploader,
   constants::{ID_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT},
   database::{DatabaseClient, LogItem},
   service::proto::SendLogResponse,
 };

 enum LogPersistence {
   /// Log entirely stored in DynamoDB database
   DB,
   /// Log contents stored with Blob service
   BLOB { holder: String },
 }

 pub struct SendLogHandler {
   // flow control
   pub should_close_stream: bool,

   // inputs
   user_id: Option<String>,
   backup_id: Option<String>,
   log_hash: Option<String>,

   // internal state
   log_id: Option<String>,
   log_buffer: Vec<u8>,
   persistence_method: LogPersistence,
   should_receive_data: bool,

   // client instances
   db: DatabaseClient,
-  blob_client: Option<PutClient>,
+  uploader: Option<BlobUploader>,
 }

 impl SendLogHandler {
   pub fn new(db: &DatabaseClient) -> Self {
     SendLogHandler {
       db: db.clone(),
-      blob_client: None,
+      uploader: None,
       user_id: None,
       backup_id: None,
       log_hash: None,
       log_id: None,
       log_buffer: Vec::new(),
       persistence_method: LogPersistence::DB,
       should_receive_data: false,
       should_close_stream: false,
     }
   }

   pub async fn handle_user_id(
     &mut self,
     user_id: String,
   ) -> Result<(), Status> {
     if self.user_id.is_some() {
       warn!("user ID already provided");
       return Err(Status::invalid_argument("User ID already provided"));
     }
     self.user_id = Some(user_id);
     self.handle_internal().await
   }

   pub async fn handle_backup_id(
     &mut self,
     backup_id: String,
   ) -> Result<(), Status> {
     if self.backup_id.is_some() {
       warn!("backup ID already provided");
       return Err(Status::invalid_argument("Backup ID already provided"));
     }
     tracing::Span::current().record("backup_id", &backup_id);
     self.backup_id = Some(backup_id);
     self.handle_internal().await
   }

   pub async fn handle_log_hash(
     &mut self,
     log_hash: Vec<u8>,
   ) -> Result<(), Status> {
     if self.log_hash.is_some() {
       warn!("Log hash already provided");
       return Err(Status::invalid_argument("Log hash already provided"));
     }
     let hash_str = String::from_utf8(log_hash).map_err(|err| {
       error!("Failed to convert data_hash into string: {:?}", err);
       Status::aborted("Unexpected error")
     })?;
     tracing::Span::current().record("log_hash", &hash_str);
     self.log_hash = Some(hash_str);
     self.handle_internal().await
   }

   pub async fn handle_log_data(
     &mut self,
     data_chunk: Vec<u8>,
   ) -> Result<(), Status> {
     if !self.should_receive_data || self.log_id.is_none() {
       self.should_close_stream = true;
       error!("Data chunk sent before other inputs");
       return Err(Status::invalid_argument(
         "Data chunk sent before other inputs",
       ));
     }

     // empty chunk ends transmission
     if data_chunk.is_empty() {
       self.should_close_stream = true;
       return Ok(());
     }

     match self.persistence_method {
       LogPersistence::DB => {
         self.log_buffer.extend(data_chunk);
         self.ensure_size_constraints().await?;
       }
       LogPersistence::BLOB { .. } => {
-        let Some(client) = self.blob_client.as_mut() else {
+        let Some(client) = self.uploader.as_mut() else {
           self.should_close_stream = true;
           error!("Put client uninitialized. This should never happen!");
           return Err(Status::failed_precondition("Internal error"));
         };
         client.put_data(data_chunk).await.map_err(|err| {
           error!("Failed to upload data chunk: {:?}", err);
           Status::aborted("Internal error")
         })?;
       }
     }
     Ok(())
   }

   pub async fn finish(self) -> Result<SendLogResponse, Status> {
-    if let Some(client) = self.blob_client {
+    if let Some(client) = self.uploader {
       client.terminate().await.map_err(|err| {
         error!("Put client task closed with error: {:?}", err);
         Status::aborted("Internal error")
       })?;
     } else {
-      trace!("No blob client initialized. Skipping termination");
+      trace!("No uploader initialized. Skipping termination");
     }

     if !self.should_receive_data {
       // client probably aborted early
       trace!("Nothing to store in database. Finishing early");
       return Ok(SendLogResponse {
         log_checkpoint: "".to_string(),
       });
     }

     let (Some(backup_id), Some(log_id), Some(data_hash)) = (
       self.backup_id,
       self.log_id,
       self.log_hash
     ) else {
       error!("Log info absent in data mode. This should never happen!");
       return Err(Status::failed_precondition("Internal error"));
     };

     let (log_value, persisted_in_blob) = match self.persistence_method {
       LogPersistence::BLOB { holder } => (holder, true),
       LogPersistence::DB => {
         let contents = String::from_utf8(self.log_buffer).map_err(|err| {
           error!("Failed to convert log contents data into string: {:?}", err);
           Status::aborted("Unexpected error")
         })?;
         (contents, false)
       }
     };

     let log_item = LogItem {
       backup_id,
       log_id: log_id.clone(),
       persisted_in_blob,
       value: log_value,
       attachment_holders: String::new(),
       data_hash,
     };

     self
       .db
       .put_log_item(log_item)
       .await
       .map_err(handle_db_error)?;

     Ok(SendLogResponse {
       log_checkpoint: log_id,
     })
   }

   // internal param handler helper
   async fn handle_internal(&mut self) -> Result<(), Status> {
     if self.should_receive_data {
       error!("SendLogHandler is already expecting data chunks");
       return Err(Status::failed_precondition("Log data chunk expected"));
     }

     // all non-data inputs must be set before receiving log contents
     let (Some(backup_id), Some(_), Some(_)) = (
       self.backup_id.as_ref(),
       self.user_id.as_ref(),
       self.log_hash.as_ref()
     ) else {
       return Ok(());
     };

     let log_id = generate_log_id(backup_id);
     tracing::Span::current().record("log_id", &log_id);
     self.log_id = Some(log_id);

     trace!("Everything prepared, waiting for data...");
     self.should_receive_data = true;
     Ok(())
   }

   /// Ensures log fits db size constraints. If not, it is moved to blob
   /// persistence
   async fn ensure_size_constraints(&mut self) -> Result<(), Status> {
     let (Some(backup_id), Some(log_id), Some(log_hash)) = (
       self.backup_id.as_ref(),
       self.log_id.as_ref(),
       self.log_hash.as_ref()
     ) else {
       self.should_close_stream = true;
       error!("Log info absent in data mode. This should never happen!");
       return Err(Status::failed_precondition("Internal error"));
     };

     let log_size = LogItem::size_from_components(
       backup_id,
       log_id,
       log_hash,
       &self.log_buffer,
     );
     if log_size > LOG_DATA_SIZE_DATABASE_LIMIT {
       debug!("Log too large, switching persistence to Blob");
       let holder =
         crate::utils::generate_blob_holder(log_hash, backup_id, Some(log_id));
-      match crate::blob::start_simple_put_client(&holder, &log_hash).await? {
-        Some(mut put_client) => {
+      match crate::blob::start_simple_uploader(&holder, &log_hash).await? {
+        Some(mut uploader) => {
           let blob_chunk = std::mem::take(&mut self.log_buffer);
-          put_client.put_data(blob_chunk).await.map_err(|err| {
+          uploader.put_data(blob_chunk).await.map_err(|err| {
             error!("Failed to upload data chunk: {:?}", err);
             Status::aborted("Internal error")
           })?;
-          self.blob_client = Some(put_client);
+          self.uploader = Some(uploader);
         }
         None => {
           debug!("Log hash already exists");
           self.should_close_stream = true;
         }
       }
       self.persistence_method = LogPersistence::BLOB { holder };
     }
     Ok(())
   }
 }

 fn generate_log_id(backup_id: &str) -> String {
   format!(
     "{backup_id}{sep}{uuid}",
     backup_id = backup_id,
     sep = ID_SEPARATOR,
     uuid = Uuid::new_v4()
   )
 }
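
For reference, a minimal sketch of how the renamed API is driven end to end, mirroring the `## Example` doc comment in uploader.rs above. The holder/hash literals and the wrapper function name are illustrative placeholders, not part of the change.

async fn upload_one_chunk_example() -> Result<(), tonic::Status> {
  // Illustrative only: upload a single chunk under a fresh holder, then close
  // the stream. start_simple_uploader returns Ok(None) when a blob with this
  // hash already exists, in which case there is nothing left to send.
  if let Some(mut uploader) =
    crate::blob::start_simple_uploader("my_holder", "my_hash").await?
  {
    let my_data = vec![1, 2, 3, 4];
    let _ = uploader.put_data(my_data).await;
    let _ = uploader.terminate().await;
  }
  Ok(())
}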