diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs index 8c85a4740..bb681346c 100644 --- a/services/backup/blob_client/src/get_client.rs +++ b/services/backup/blob_client/src/get_client.rs @@ -1,176 +1,145 @@ mod proto { tonic::include_proto!("blob"); } use proto::blob_service_client::BlobServiceClient; use proto::GetRequest; use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; use crate::tools::{ - c_char_pointer_to_string, c_char_pointer_to_string_new, check_error, - report_error, string_to_c_char_pointer, string_to_c_char_pointer_new, + c_char_pointer_to_string_new, string_to_c_char_pointer_new, }; use anyhow::bail; use crate::RUNTIME; use lazy_static::lazy_static; use libc; use libc::c_char; use std::collections::HashMap; use std::sync::Mutex; use tokio::sync::mpsc; use tokio::task::JoinHandle; struct ReadClient { rx: mpsc::Receiver>, - rx_handle: JoinHandle<()>, + rx_handle: JoinHandle>, } lazy_static! { // todo: we should consider limiting the clients size, // if every client is able to allocate up to 4MB data at a time static ref CLIENTS: Mutex> = Mutex::new(HashMap::new()); static ref ERROR_MESSAGES: Mutex> = Mutex::new(Vec::new()); } -fn is_initialized(holder: &str) -> bool { - if let Ok(clients) = CLIENTS.lock() { - return clients.contains_key(holder); - } else { - report_error(&ERROR_MESSAGES, "couldn't access client", Some("get")); - } - false -} - -fn is_initialized_new(holder: &String) -> anyhow::Result { +fn is_initialized(holder: &str) -> anyhow::Result { if let Ok(clients) = CLIENTS.lock() { return Ok(clients.contains_key(holder)); } bail!("couldn't access client"); } pub fn get_client_initialize_cxx( holder_char: *const c_char, -) -> Result<(), String> { - let holder = c_char_pointer_to_string(holder_char)?; - - assert!( - !is_initialized(&holder), - "client cannot be initialized twice" - ); +) -> anyhow::Result<(), anyhow::Error> { + let holder = c_char_pointer_to_string_new(holder_char)?; + if is_initialized(&holder)? { + get_client_terminate_cxx(string_to_c_char_pointer_new(&holder)?)?; + } // grpc if let Ok(mut grpc_client) = RUNTIME.block_on(async { BlobServiceClient::connect(BLOB_ADDRESS).await }) { // spawn receiver thread let (response_thread_tx, response_thread_rx) = mpsc::channel::>(MPSC_CHANNEL_BUFFER_CAPACITY); let cloned_holder = holder.clone(); let rx_handle = RUNTIME.spawn(async move { - if let Ok(response) = grpc_client + let response = grpc_client .get(GetRequest { holder: cloned_holder, }) - .await - { - let mut inner_response = response.into_inner(); - loop { - match inner_response.message().await { - Ok(maybe_data) => { - let mut result = false; - if let Some(data) = maybe_data { - let data: Vec = data.data_chunk; - result = match response_thread_tx.send(data).await { - Ok(_) => true, - Err(err) => { - report_error( - &ERROR_MESSAGES, - &err.to_string(), - Some("get"), - ); - false - } - } - } - if !result { - break; - } - } + .await?; + let mut inner_response = response.into_inner(); + loop { + let maybe_data = inner_response.message().await?; + let mut result = false; + if let Some(data) = maybe_data { + let data: Vec = data.data_chunk; + result = match response_thread_tx.send(data).await { + Ok(_) => true, Err(err) => { - report_error(&ERROR_MESSAGES, &err.to_string(), Some("get")); - break; + bail!(err); } - }; + } + } + if !result { + break; } - } else { - report_error( - &ERROR_MESSAGES, - "couldn't perform grpc get operation", - Some("get"), - ); } + Ok(()) }); if let Ok(mut clients) = CLIENTS.lock() { let client = ReadClient { rx_handle, rx: response_thread_rx, }; (*clients).insert(holder, client); return Ok(()); } - return Err("could not access client".to_string()); + bail!("could not access client"); } - Err("could not successfully connect to the blob server".to_string()) + bail!("could not successfully connect to the blob server") } pub fn get_client_blocking_read_cxx( holder_char: *const c_char, ) -> anyhow::Result, anyhow::Error> { let holder = c_char_pointer_to_string_new(holder_char)?; Ok(RUNTIME.block_on(async { if let Ok(mut clients) = CLIENTS.lock() { if let Some(client) = clients.get_mut(&holder) { let maybe_data = client.rx.recv().await; return Ok(maybe_data.unwrap_or_else(|| vec![])); } else { bail!(format!("no client present for {}", holder)); } } else { bail!("couldn't access client"); } })?) } pub fn get_client_terminate_cxx( holder_char: *const c_char, ) -> anyhow::Result<(), anyhow::Error> { let holder = c_char_pointer_to_string_new(holder_char)?; - if !is_initialized_new(&holder)? { + if !is_initialized(&holder)? { return Ok(()); } if let Ok(mut clients) = CLIENTS.lock() { match clients.remove(&holder) { Some(client) => { RUNTIME.block_on(async { if client.rx_handle.await.is_err() { bail!(format!("awaiting for the client {} failed", holder)); } Ok(()) })?; } None => { bail!(format!("no client foudn for {}", holder)); } } } else { bail!("couldn't access client"); } - if is_initialized_new(&holder)? { + if is_initialized(&holder)? { bail!("client transmitter handler released properly"); } Ok(()) }