diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs --- a/services/backup/blob_client/src/put_client.rs +++ b/services/backup/blob_client/src/put_client.rs @@ -10,13 +10,14 @@ use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; use lazy_static::lazy_static; +use libc; +use libc::c_char; +use std::ffi::CStr; use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; use tokio::sync::mpsc; use tokio::task::JoinHandle; -use libc; -use libc::c_char; -use std::ffi::CStr; +use tracing::error; #[derive(Debug)] struct PutRequestData { @@ -62,6 +63,7 @@ } fn report_error(message: String) { + error!("[RUST] Error: {}", message); ERROR_MESSAGES .lock() .expect("access error messages") @@ -123,7 +125,7 @@ ) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let receiver_handle = RUNTIME.spawn(async move { println!("[RUST] [receiver_thread] begin"); - let response: Option< + let maybe_response: Option< tonic::Response>, > = match grpc_client .expect("access grpc client") @@ -133,35 +135,49 @@ Ok(res) => Some(res), Err(err) => { report_error(err.to_string()); - println!("ERROR!! {}", err.to_string()); None } }; - if response.is_none() { + if maybe_response.is_none() { return; } - let mut inbound = response.unwrap().into_inner(); - loop { - let response: Option = match inbound.message().await { - Ok(res) => res, - Err(err) => { - report_error(err.to_string()); - println!("ERROR!! {}", err.to_string()); - None + match maybe_response { + Some(response) => { + let mut inner_response = response.into_inner(); + let mut response_present = true; + while response_present { + response_present = match inner_response.message().await { + Ok(maybe_response_message) => { + let mut result = false; + if let Some(response_message) = maybe_response_message { + println!( + "[RUST] got response: {}", + response_message.data_exists + ); + // warning: this will hang if there's more unread responses than + // MPSC_CHANNEL_BUFFER_CAPACITY + // you should then use put_client_blocking_read_cxx in order to dequeue + // the responses in c++ and make room for more + if let Ok(_) = response_thread_tx + .send((response_message.data_exists as i32).to_string()) + .await + { + result = true; + } + } + result + } + Err(err) => { + report_error(err.to_string()); + false + } + }; } - }; - if response.is_none() { - break; } - let response: PutResponse = response.unwrap(); - println!("[RUST] got response: {}", response.data_exists); - // warning: this will hang if there's more unread responses than MPSC_CHANNEL_BUFFER_CAPACITY - // you should then use put_client_blocking_read_cxx in order to dequeue the responses in c++ and make room for more - response_thread_tx - .send((response.data_exists as i32).to_string()) - .await - .unwrap(); - } + unexpected => { + report_error(format!("unexpected result received: {:?}", unexpected)); + } + }; println!("[RUST] [receiver_thread] done"); }); @@ -188,10 +204,7 @@ } CLIENT.lock().expect("access client").rx = Some(rx); }); - if response.is_none() { - return Err("response not received properly".to_string()); - } - Ok(response.unwrap()) + response.ok_or("response not received properly".to_string()) } pub fn put_client_write_cxx( @@ -212,7 +225,10 @@ .tx .as_ref() .expect("access client's transmitter") - .send(PutRequestData{field_index, data: data_bytes}) + .send(PutRequestData { + field_index, + data: data_bytes, + }) .await .expect("send data to receiver"); }); @@ -225,16 +241,18 @@ pub fn put_client_terminate_cxx() -> Result<(), String> { println!("[RUST] put_client_terminating"); check_error()?; - let receiver_handle = CLIENT - .lock() - .expect("access client") - .receiver_handle - .take() - .unwrap(); - drop(CLIENT.lock().expect("access client").tx.take().unwrap()); - RUNTIME.block_on(async { - receiver_handle.await.unwrap(); - }); + if let Some(receiver_handle) = + CLIENT.lock().expect("access client").receiver_handle.take() + { + if let Some(tx) = CLIENT.lock().expect("access client").tx.take() { + drop(tx); + } + RUNTIME.block_on(async { + if receiver_handle.await.is_err() { + report_error("wait for receiver handle failed".to_string()); + } + }); + } assert!( !is_initialized(),