diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs index 967306422..6eed244ca 100644 --- a/services/backup/blob_client/src/get_client.rs +++ b/services/backup/blob_client/src/get_client.rs @@ -1,147 +1,171 @@ mod proto { tonic::include_proto!("blob"); } use proto::blob_service_client::BlobServiceClient; use proto::GetRequest; use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; use lazy_static::lazy_static; use libc; use libc::c_char; use std::ffi::CStr; use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tracing::error; struct ReadClient { rx: mpsc::Receiver>, rx_handle: JoinHandle<()>, } lazy_static! { static ref CLIENT: Arc>> = Arc::new(Mutex::new(None)); static ref RUNTIME: Runtime = Runtime::new().unwrap(); static ref ERROR_MESSAGES: Arc>> = Arc::new(Mutex::new(Vec::new())); } fn is_initialized() -> bool { if let Ok(client) = CLIENT.lock() { if client.is_some() { return true; } } else { report_error("couldn't access client".to_string()); } false } fn report_error(message: String) { println!("[RUST] [get] Error: {}", message); if let Ok(mut error_messages) = ERROR_MESSAGES.lock() { error_messages.push(message); } error!("could not access error messages"); } fn check_error() -> Result<(), String> { if let Ok(errors) = ERROR_MESSAGES.lock() { return match errors.is_empty() { true => Ok(()), false => Err(errors.join("\n")), }; } Err("could not access error messages".to_string()) } pub fn get_client_initialize_cxx( holder_char: *const c_char, ) -> Result<(), String> { if is_initialized() { get_client_terminate_cxx()?; } assert!(!is_initialized(), "client cannot be initialized twice"); let holder_cstr: &CStr = unsafe { CStr::from_ptr(holder_char) }; let holder: String = holder_cstr.to_str().unwrap().to_owned(); // grpc if let Ok(mut grpc_client) = RUNTIME.block_on(async { BlobServiceClient::connect(BLOB_ADDRESS).await }) { // spawn receiver thread let (response_thread_tx, response_thread_rx) = mpsc::channel::>(MPSC_CHANNEL_BUFFER_CAPACITY); let rx_handle = RUNTIME.spawn(async move { if let Ok(response) = grpc_client.get(GetRequest { holder }).await { let mut inner_response = response.into_inner(); loop { match inner_response.message().await { Ok(maybe_data) => { let mut result = false; if let Some(data) = maybe_data { let data: Vec = data.data_chunk; result = match response_thread_tx.send(data).await { Ok(_) => true, Err(err) => { report_error(err.to_string()); false } } } if !result { break; } } Err(err) => { report_error(err.to_string()); break; } }; } } else { report_error("couldn't perform grpc get operation".to_string()); } }); if let Ok(mut client) = CLIENT.lock() { *client = Some(ReadClient { rx_handle, rx: response_thread_rx, }); return Ok(()); } return Err("could not access client".to_string()); } Err("could not successfully connect to the blob server".to_string()) } pub fn get_client_blocking_read_cxx() -> Result, String> { check_error()?; let response: Option> = RUNTIME.block_on(async { if let Ok(mut maybe_client) = CLIENT.lock() { if let Some(mut client) = (*maybe_client).take() { let maybe_data = client.rx.recv().await; let response = Some(maybe_data.unwrap_or_else(|| vec![])); *maybe_client = Some(client); return response; } else { report_error("no client present".to_string()); } } else { report_error("couldn't access client".to_string()); } None }); check_error()?; response.ok_or("response could not be obtained".to_string()) } pub fn get_client_terminate_cxx() -> Result<(), String> { - unimplemented!(); + check_error()?; + if !is_initialized() { + check_error()?; + return Ok(()); + } + if let Ok(mut maybe_client) = CLIENT.lock() { + if let Some(client) = (*maybe_client).take() { + RUNTIME.block_on(async { + if client.rx_handle.await.is_err() { + report_error("wait for receiver handle failed".to_string()); + } + }); + } else { + return Err("no client detected".to_string()); + } + } else { + return Err("couldn't access client".to_string()); + } + + assert!( + !is_initialized(), + "client transmitter handler released properly" + ); + check_error()?; + Ok(()) }