diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs index 87701560f..8c85a4740 100644 --- a/services/backup/blob_client/src/get_client.rs +++ b/services/backup/blob_client/src/get_client.rs @@ -1,182 +1,176 @@ mod proto { tonic::include_proto!("blob"); } use proto::blob_service_client::BlobServiceClient; use proto::GetRequest; use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; use crate::tools::{ c_char_pointer_to_string, c_char_pointer_to_string_new, check_error, report_error, string_to_c_char_pointer, string_to_c_char_pointer_new, }; use anyhow::bail; use crate::RUNTIME; use lazy_static::lazy_static; use libc; use libc::c_char; use std::collections::HashMap; use std::sync::Mutex; use tokio::sync::mpsc; use tokio::task::JoinHandle; struct ReadClient { rx: mpsc::Receiver>, rx_handle: JoinHandle<()>, } lazy_static! { // todo: we should consider limiting the clients size, // if every client is able to allocate up to 4MB data at a time static ref CLIENTS: Mutex> = Mutex::new(HashMap::new()); static ref ERROR_MESSAGES: Mutex> = Mutex::new(Vec::new()); } fn is_initialized(holder: &str) -> bool { if let Ok(clients) = CLIENTS.lock() { return clients.contains_key(holder); } else { report_error(&ERROR_MESSAGES, "couldn't access client", Some("get")); } false } fn is_initialized_new(holder: &String) -> anyhow::Result { if let Ok(clients) = CLIENTS.lock() { return Ok(clients.contains_key(holder)); } bail!("couldn't access client"); } pub fn get_client_initialize_cxx( holder_char: *const c_char, ) -> Result<(), String> { let holder = c_char_pointer_to_string(holder_char)?; assert!( !is_initialized(&holder), "client cannot be initialized twice" ); // grpc if let Ok(mut grpc_client) = RUNTIME.block_on(async { BlobServiceClient::connect(BLOB_ADDRESS).await }) { // spawn receiver thread let (response_thread_tx, response_thread_rx) = mpsc::channel::>(MPSC_CHANNEL_BUFFER_CAPACITY); let cloned_holder = holder.clone(); let rx_handle = RUNTIME.spawn(async move { if let Ok(response) = grpc_client .get(GetRequest { holder: cloned_holder, }) .await { let mut inner_response = response.into_inner(); loop { match inner_response.message().await { Ok(maybe_data) => { let mut result = false; if let Some(data) = maybe_data { let data: Vec = data.data_chunk; result = match response_thread_tx.send(data).await { Ok(_) => true, Err(err) => { report_error( &ERROR_MESSAGES, &err.to_string(), Some("get"), ); false } } } if !result { break; } } Err(err) => { report_error(&ERROR_MESSAGES, &err.to_string(), Some("get")); break; } }; } } else { report_error( &ERROR_MESSAGES, "couldn't perform grpc get operation", Some("get"), ); } }); if let Ok(mut clients) = CLIENTS.lock() { let client = ReadClient { rx_handle, rx: response_thread_rx, }; (*clients).insert(holder, client); return Ok(()); } return Err("could not access client".to_string()); } Err("could not successfully connect to the blob server".to_string()) } pub fn get_client_blocking_read_cxx( holder_char: *const c_char, -) -> Result, String> { - let holder = c_char_pointer_to_string(holder_char)?; - check_error(&ERROR_MESSAGES)?; - let response: Option> = RUNTIME.block_on(async { +) -> anyhow::Result, anyhow::Error> { + let holder = c_char_pointer_to_string_new(holder_char)?; + Ok(RUNTIME.block_on(async { if let Ok(mut clients) = CLIENTS.lock() { - let maybe_client = clients.get_mut(&holder); - if let Some(client) = maybe_client { + if let Some(client) = clients.get_mut(&holder) { let maybe_data = client.rx.recv().await; - let response = Some(maybe_data.unwrap_or_else(|| vec![])); - return response; + return Ok(maybe_data.unwrap_or_else(|| vec![])); } else { - report_error(&ERROR_MESSAGES, "no client present", Some("get")); + bail!(format!("no client present for {}", holder)); } } else { - report_error(&ERROR_MESSAGES, "couldn't access client", Some("get")); + bail!("couldn't access client"); } - None - }); - check_error(&ERROR_MESSAGES)?; - response.ok_or("response could not be obtained".to_string()) + })?) } pub fn get_client_terminate_cxx( holder_char: *const c_char, ) -> anyhow::Result<(), anyhow::Error> { let holder = c_char_pointer_to_string_new(holder_char)?; if !is_initialized_new(&holder)? { return Ok(()); } if let Ok(mut clients) = CLIENTS.lock() { match clients.remove(&holder) { Some(client) => { RUNTIME.block_on(async { if client.rx_handle.await.is_err() { bail!(format!("awaiting for the client {} failed", holder)); } Ok(()) })?; } None => { bail!(format!("no client foudn for {}", holder)); } } } else { bail!("couldn't access client"); } if is_initialized_new(&holder)? { bail!("client transmitter handler released properly"); } Ok(()) }