diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs --- a/services/backup/blob_client/src/get_client.rs +++ b/services/backup/blob_client/src/get_client.rs @@ -7,8 +7,7 @@ use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; use crate::tools::{ - c_char_pointer_to_string, c_char_pointer_to_string_new, check_error, - report_error, string_to_c_char_pointer, string_to_c_char_pointer_new, + c_char_pointer_to_string_new, string_to_c_char_pointer_new, }; use anyhow::bail; use crate::RUNTIME; @@ -22,7 +21,7 @@ struct ReadClient { rx: mpsc::Receiver>, - rx_handle: JoinHandle<()>, + rx_handle: JoinHandle>, } lazy_static! { @@ -34,16 +33,7 @@ Mutex::new(Vec::new()); } -fn is_initialized(holder: &str) -> bool { - if let Ok(clients) = CLIENTS.lock() { - return clients.contains_key(holder); - } else { - report_error(&ERROR_MESSAGES, "couldn't access client", Some("get")); - } - false -} - -fn is_initialized_new(holder: &String) -> anyhow::Result { +fn is_initialized(holder: &str) -> anyhow::Result { if let Ok(clients) = CLIENTS.lock() { return Ok(clients.contains_key(holder)); } @@ -52,13 +42,11 @@ pub fn get_client_initialize_cxx( holder_char: *const c_char, -) -> Result<(), String> { - let holder = c_char_pointer_to_string(holder_char)?; - - assert!( - !is_initialized(&holder), - "client cannot be initialized twice" - ); +) -> anyhow::Result<(), anyhow::Error> { + let holder = c_char_pointer_to_string_new(holder_char)?; + if is_initialized(&holder)? { + get_client_terminate_cxx(string_to_c_char_pointer_new(&holder)?)?; + } // grpc if let Ok(mut grpc_client) = @@ -69,48 +57,29 @@ mpsc::channel::>(MPSC_CHANNEL_BUFFER_CAPACITY); let cloned_holder = holder.clone(); let rx_handle = RUNTIME.spawn(async move { - if let Ok(response) = grpc_client + let response = grpc_client .get(GetRequest { holder: cloned_holder, }) - .await - { - let mut inner_response = response.into_inner(); - loop { - match inner_response.message().await { - Ok(maybe_data) => { - let mut result = false; - if let Some(data) = maybe_data { - let data: Vec = data.data_chunk; - result = match response_thread_tx.send(data).await { - Ok(_) => true, - Err(err) => { - report_error( - &ERROR_MESSAGES, - &err.to_string(), - Some("get"), - ); - false - } - } - } - if !result { - break; - } - } + .await?; + let mut inner_response = response.into_inner(); + loop { + let maybe_data = inner_response.message().await?; + let mut result = false; + if let Some(data) = maybe_data { + let data: Vec = data.data_chunk; + result = match response_thread_tx.send(data).await { + Ok(_) => true, Err(err) => { - report_error(&ERROR_MESSAGES, &err.to_string(), Some("get")); - break; + bail!(err); } - }; + } + } + if !result { + break; } - } else { - report_error( - &ERROR_MESSAGES, - "couldn't perform grpc get operation", - Some("get"), - ); } + Ok(()) }); if let Ok(mut clients) = CLIENTS.lock() { @@ -121,9 +90,9 @@ (*clients).insert(holder, client); return Ok(()); } - return Err("could not access client".to_string()); + bail!("could not access client"); } - Err("could not successfully connect to the blob server".to_string()) + bail!("could not successfully connect to the blob server") } pub fn get_client_blocking_read_cxx( @@ -148,7 +117,7 @@ holder_char: *const c_char, ) -> anyhow::Result<(), anyhow::Error> { let holder = c_char_pointer_to_string_new(holder_char)?; - if !is_initialized_new(&holder)? { + if !is_initialized(&holder)? { return Ok(()); } if let Ok(mut clients) = CLIENTS.lock() { @@ -169,7 +138,7 @@ bail!("couldn't access client"); } - if is_initialized_new(&holder)? { + if is_initialized(&holder)? { bail!("client transmitter handler released properly"); } Ok(())