diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs --- a/services/backup/blob_client/src/get_client.rs +++ b/services/backup/blob_client/src/get_client.rs @@ -6,9 +6,7 @@ use proto::GetRequest; use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; -use crate::tools::{ - c_char_pointer_to_string, string_to_c_char_pointer, -}; +use crate::tools::{c_char_pointer_to_string, string_to_c_char_pointer}; use anyhow::bail; use lazy_static::lazy_static; use libc; @@ -30,8 +28,6 @@ static ref CLIENTS: Arc>> = Arc::new(Mutex::new(HashMap::new())); static ref RUNTIME: Runtime = Runtime::new().unwrap(); - static ref ERROR_MESSAGES: Arc>> = - Arc::new(Mutex::new(Vec::new())); } fn is_initialized(holder: &String) -> anyhow::Result { diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs --- a/services/backup/blob_client/src/put_client.rs +++ b/services/backup/blob_client/src/put_client.rs @@ -8,9 +8,7 @@ use proto::PutRequest; use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; -use crate::tools::{ - c_char_pointer_to_string, string_to_c_char_pointer, -}; +use crate::tools::{c_char_pointer_to_string, string_to_c_char_pointer}; use anyhow::bail; use lazy_static::lazy_static; use libc; @@ -41,8 +39,6 @@ static ref CLIENTS: Arc>> = Arc::new(Mutex::new(HashMap::new())); static ref RUNTIME: Runtime = Runtime::new().unwrap(); - static ref ERROR_MESSAGES: Arc>> = - Arc::new(Mutex::new(Vec::new())); } fn is_initialized(holder: &String) -> anyhow::Result { @@ -72,13 +68,15 @@ mpsc::channel::(MPSC_CHANNEL_BUFFER_CAPACITY); let outbound = async_stream::stream! { + let mut maybe_error: Option = None; while let Some(data) = request_thread_rx.recv().await { let request_data: Option = match data.field_index { 1 => { match String::from_utf8(data.data) { Ok(utf8_data) => Some(Holder(utf8_data)), _ => { - panic!("invalid utf-8"); + maybe_error = Some("invalid utf-8".to_string()); + break; }, } } @@ -86,7 +84,8 @@ match String::from_utf8(data.data).ok() { Some(utf8_data) => Some(BlobHash(utf8_data)), None => { - panic!("invalid utf-8"); + maybe_error = Some("invalid utf-8".to_string()); + break; }, } } @@ -94,7 +93,8 @@ Some(DataChunk(data.data)) } _ => { - panic!("invalid field index value {}", data.field_index); + maybe_error = Some(format!("invalid field index value {}", data.field_index)); + break; } }; if let Some (unpacked_data) = request_data { @@ -103,9 +103,14 @@ }; yield request; } else { - panic!("an error occured, aborting connection"); + maybe_error = Some("an error occured, aborting connection".to_string()); + break; } } + if let Some(error) = maybe_error { + // todo consider handling this differently + println!("an error occured in the stream: {}", error); + } }; // spawn receiver thread @@ -116,30 +121,24 @@ Ok(response) => { let mut inner_response = response.into_inner(); loop { - match inner_response.message().await { - Ok(maybe_response_message) => { - let mut result = false; - if let Some(response_message) = maybe_response_message { - // warning: this will produce an error if there's more unread - // responses than MPSC_CHANNEL_BUFFER_CAPACITY - // you should then use put_client_blocking_read_cxx in order - // to dequeue the responses in c++ and make room for more - if let Ok(_) = response_thread_tx - .try_send((response_message.data_exists as i32).to_string()) - { - result = true; - } else { - bail!("response queue full"); - } - } - if !result { - break; - } - } - Err(err) => { - bail!(err.to_string()); + let maybe_response_message = inner_response.message().await?; + let mut result = false; + if let Some(response_message) = maybe_response_message { + // warning: this will produce an error if there's more unread + // responses than MPSC_CHANNEL_BUFFER_CAPACITY + // you should then use put_client_blocking_read_cxx in order + // to dequeue the responses in c++ and make room for more + if let Ok(_) = response_thread_tx + .try_send((response_message.data_exists as i32).to_string()) + { + result = true; + } else { + bail!("response queue full"); } - }; + } + if !result { + break; + } } } Err(err) => { @@ -183,7 +182,10 @@ bail!("couldn't receive data via client's receiver"); } } else { - bail!(format!("no client detected for {} in blocking read", holder)); + bail!(format!( + "no client detected for {} in blocking read", + holder + )); } } else { bail!("couldn't access clients"); @@ -239,9 +241,7 @@ let maybe_client = clients.remove(&holder); if let Some(client) = maybe_client { drop(client.tx); - RUNTIME.block_on(async { - client.rx_handle.await? - })?; + RUNTIME.block_on(async { client.rx_handle.await? })?; } else { bail!("no client detected in terminate"); }