diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs --- a/services/backup/blob_client/src/get_client.rs +++ b/services/backup/blob_client/src/get_client.rs @@ -7,7 +7,7 @@ use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; use crate::tools::{ - c_char_pointer_to_string_new, string_to_c_char_pointer_new, + c_char_pointer_to_string, string_to_c_char_pointer, }; use anyhow::bail; use lazy_static::lazy_static; @@ -44,9 +44,9 @@ pub fn get_client_initialize_cxx( holder_char: *const c_char, ) -> anyhow::Result<(), anyhow::Error> { - let holder = c_char_pointer_to_string_new(holder_char)?; + let holder = c_char_pointer_to_string(holder_char)?; if is_initialized(&holder)? { - get_client_terminate_cxx(string_to_c_char_pointer_new(&holder)?)?; + get_client_terminate_cxx(string_to_c_char_pointer(&holder)?)?; } if is_initialized(&holder)? { @@ -103,7 +103,7 @@ pub fn get_client_blocking_read_cxx( holder_char: *const c_char, ) -> anyhow::Result, anyhow::Error> { - let holder = c_char_pointer_to_string_new(holder_char)?; + let holder = c_char_pointer_to_string(holder_char)?; Ok(RUNTIME.block_on(async { if let Ok(mut clients) = CLIENTS.lock() { if let Some(client) = clients.get_mut(&holder) { @@ -121,7 +121,7 @@ pub fn get_client_terminate_cxx( holder_char: *const c_char, ) -> anyhow::Result<(), anyhow::Error> { - let holder = c_char_pointer_to_string_new(holder_char)?; + let holder = c_char_pointer_to_string(holder_char)?; if !is_initialized(&holder)? { return Ok(()); } diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs --- a/services/backup/blob_client/src/put_client.rs +++ b/services/backup/blob_client/src/put_client.rs @@ -9,7 +9,7 @@ use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; use crate::tools::{ - c_char_pointer_to_string, c_char_pointer_to_string_new, report_error, string_to_c_char_pointer_new, + c_char_pointer_to_string, string_to_c_char_pointer, }; use anyhow::bail; use lazy_static::lazy_static; @@ -32,7 +32,7 @@ tx: mpsc::Sender, rx: mpsc::Receiver, - rx_handle: JoinHandle<()>, //>, + rx_handle: JoinHandle>, } lazy_static! { @@ -45,17 +45,7 @@ Mutex::new(Vec::new()); } -fn is_initialized(holder: &str) -> bool { - match CLIENTS.lock() { - Ok(clients) => clients.contains_key(holder), - _ => { - report_error(&ERROR_MESSAGES, "couldn't access client", Some("put")); - false - } - } -} - -fn is_initialized_new(holder: &str) -> anyhow::Result { +fn is_initialized(holder: &str) -> anyhow::Result { return Ok(match CLIENTS.lock() { Ok(clients) => clients.contains_key(holder), _ => { @@ -66,15 +56,14 @@ pub fn put_client_initialize_cxx( holder_char: *const c_char, -) -> Result<(), String> { +) -> anyhow::Result<(), anyhow::Error> { let holder = c_char_pointer_to_string(holder_char)?; - if is_initialized(&holder) { - // put_client_terminate_cxx(string_to_c_char_pointer(&holder)?)?; + if is_initialized(&holder)? { + put_client_terminate_cxx(string_to_c_char_pointer(&holder)?)?; + } + if is_initialized(&holder)? { + bail!("client cannot be initialized twice"); } - assert!( - !is_initialized(&holder), - "client cannot be initialized twice" - ); // grpc if let Ok(mut grpc_client) = RUNTIME.block_on(async { BlobServiceClient::connect(BLOB_ADDRESS).await }) @@ -83,14 +72,15 @@ mpsc::channel::(MPSC_CHANNEL_BUFFER_CAPACITY); let outbound = async_stream::stream! { + let mut maybe_error: Option = None; while let Some(data) = request_thread_rx.recv().await { let request_data: Option = match data.field_index { 1 => { match String::from_utf8(data.data) { Ok(utf8_data) => Some(Holder(utf8_data)), _ => { - report_error(&ERROR_MESSAGES, "invalid utf-8", Some("put")); - None + maybe_error = Some("invalid utf-8".to_string()); + break; }, } } @@ -98,8 +88,8 @@ match String::from_utf8(data.data).ok() { Some(utf8_data) => Some(BlobHash(utf8_data)), None => { - report_error(&ERROR_MESSAGES, "invalid utf-8", Some("put")); - None + maybe_error = Some("invalid utf-8".to_string()); + break; }, } } @@ -107,12 +97,8 @@ Some(DataChunk(data.data)) } _ => { - report_error( - &ERROR_MESSAGES, - &format!("invalid field index value {}", data.field_index), - Some("put") - ); - None + maybe_error = Some(format!("invalid field index value {}", data.field_index)); + break; } }; if let Some (unpacked_data) = request_data { @@ -121,14 +107,14 @@ }; yield request; } else { - report_error( - &ERROR_MESSAGES, - "an error occured, aborting connection", - Some("put") - ); + maybe_error = Some("an error occured, aborting connection".to_string()); break; } } + if let Some(error) = maybe_error { + // todo consider handling this differently + println!("an error occured in the stream: {}", error); + } }; // spawn receiver thread @@ -152,11 +138,7 @@ { result = true; } else { - report_error( - &ERROR_MESSAGES, - "response queue full", - Some("put"), - ); + bail!("response queue full"); } } if !result { @@ -164,20 +146,20 @@ } } Err(err) => { - report_error(&ERROR_MESSAGES, &err.to_string(), Some("put")); - break; + bail!(err.to_string()); } }; } } Err(err) => { - report_error(&ERROR_MESSAGES, &err.to_string(), Some("put")); + bail!(err.to_string()); } }; + Ok(()) }); - if is_initialized(&holder) { - return Err(format!( + if is_initialized(&holder)? { + bail!(format!( "client initialization overlapped for holder {}", holder )); @@ -191,15 +173,15 @@ (*clients).insert(holder, client); return Ok(()); } - return Err(format!("could not access client for holder {}", holder)); + bail!(format!("could not access client for holder {}", holder)); } - Err("could not successfully connect to the blob server".to_string()) + bail!("could not successfully connect to the blob server"); } pub fn put_client_blocking_read_cxx( holder_char: *const c_char, ) -> anyhow::Result { - let holder = c_char_pointer_to_string_new(holder_char)?; + let holder = c_char_pointer_to_string(holder_char)?; Ok(RUNTIME.block_on(async { if let Ok(mut clients) = CLIENTS.lock() { let maybe_client = clients.get_mut(&holder); @@ -229,7 +211,7 @@ field_index: usize, data: *const c_char, ) -> anyhow::Result<(), anyhow::Error> { - let holder = c_char_pointer_to_string_new(holder_char)?; + let holder = c_char_pointer_to_string(holder_char)?; let data_c_str: &CStr = unsafe { CStr::from_ptr(data) }; let data_bytes: Vec = data_c_str.to_bytes().to_vec(); @@ -257,8 +239,8 @@ pub fn put_client_terminate_cxx( holder_char: *const c_char, ) -> anyhow::Result<(), anyhow::Error> { - let holder = c_char_pointer_to_string_new(holder_char)?; - if !is_initialized_new(&holder)? { + let holder = c_char_pointer_to_string(holder_char)?; + if !is_initialized(&holder)? { return Ok(()); } @@ -267,10 +249,7 @@ if let Some(client) = maybe_client { drop(client.tx); RUNTIME.block_on(async { - if client.rx_handle.await.is_err() { - bail!(format!("awaiting for the client {} failed", holder)); - } - Ok(()) + client.rx_handle.await? })?; } else { bail!("no client detected in terminate"); @@ -279,7 +258,7 @@ bail!("couldn't access client"); } - if is_initialized_new(&holder)? { + if is_initialized(&holder)? { bail!("client transmitter handler released properly"); } Ok(()) diff --git a/services/backup/blob_client/src/tools.rs b/services/backup/blob_client/src/tools.rs --- a/services/backup/blob_client/src/tools.rs +++ b/services/backup/blob_client/src/tools.rs @@ -1,42 +1,14 @@ use libc::c_char; use std::ffi::{CStr, CString}; -use std::sync::Mutex; -use tracing::error; - -pub fn report_error( - error_messages: &Mutex>, - message: &str, - label_provided: Option<&str>, -) { - let label = match label_provided { - Some(value) => format!("[{}]", value), - None => "".to_string(), - }; - println!("[RUST] {} Error: {}", label, message); - if let Ok(mut error_messages_unpacked) = error_messages.lock() { - error_messages_unpacked.push(message.to_string()); - } - error!("could not access error messages"); -} pub fn c_char_pointer_to_string( c_char_pointer: *const c_char, -) -> Result { - let holder_cstr: &CStr = unsafe { CStr::from_ptr(c_char_pointer) }; - match holder_cstr.to_str() { - Ok(result) => Ok(result.to_owned()), - Err(err) => Err(err.to_string()), - } -} - -pub fn c_char_pointer_to_string_new( - c_char_pointer: *const c_char, ) -> anyhow::Result { let holder_cstr: &CStr = unsafe { CStr::from_ptr(c_char_pointer) }; Ok(holder_cstr.to_str()?.to_owned()) } -pub fn string_to_c_char_pointer_new( +pub fn string_to_c_char_pointer( signs: &String, ) -> anyhow::Result<*const c_char, anyhow::Error> { Ok(CString::new((&signs).as_bytes())?.as_ptr())