diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs --- a/services/backup/blob_client/src/get_client.rs +++ b/services/backup/blob_client/src/get_client.rs @@ -6,6 +6,7 @@ use proto::GetRequest; use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; +use crate::tools::report_error; use lazy_static::lazy_static; use libc; use libc::c_char; @@ -14,7 +15,6 @@ use tokio::runtime::Runtime; use tokio::sync::mpsc; use tokio::task::JoinHandle; -use tracing::error; struct ReadClient { rx: mpsc::Receiver>, @@ -35,19 +35,11 @@ return true; } } else { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("get")); } false } -fn report_error(message: String) { - println!("[RUST] [get] Error: {}", message); - if let Ok(mut error_messages) = ERROR_MESSAGES.lock() { - error_messages.push(message); - } - error!("could not access error messages"); -} - fn check_error() -> Result<(), String> { if let Ok(errors) = ERROR_MESSAGES.lock() { return match errors.is_empty() { @@ -89,7 +81,11 @@ result = match response_thread_tx.send(data).await { Ok(_) => true, Err(err) => { - report_error(err.to_string()); + report_error( + &ERROR_MESSAGES, + &err.to_string(), + Some("get"), + ); false } } @@ -99,13 +95,17 @@ } } Err(err) => { - report_error(err.to_string()); + report_error(&ERROR_MESSAGES, &err.to_string(), Some("get")); break; } }; } } else { - report_error("couldn't perform grpc get operation".to_string()); + report_error( + &ERROR_MESSAGES, + "couldn't perform grpc get operation", + Some("get"), + ); } }); @@ -131,10 +131,10 @@ *maybe_client = Some(client); return response; } else { - report_error("no client present".to_string()); + report_error(&ERROR_MESSAGES, "no client present", Some("get")); } } else { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("get")); } None }); @@ -152,7 
+152,11 @@ if let Some(client) = (*maybe_client).take() { RUNTIME.block_on(async { if client.rx_handle.await.is_err() { - report_error("wait for receiver handle failed".to_string()); + report_error( + &ERROR_MESSAGES, + "wait for receiver handle failed", + Some("get"), + ); } }); } else { diff --git a/services/backup/blob_client/src/lib.rs b/services/backup/blob_client/src/lib.rs --- a/services/backup/blob_client/src/lib.rs +++ b/services/backup/blob_client/src/lib.rs @@ -1,6 +1,7 @@ mod constants; mod get_client; mod put_client; +mod tools; use put_client::{ put_client_blocking_read_cxx, put_client_initialize_cxx, diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs --- a/services/backup/blob_client/src/put_client.rs +++ b/services/backup/blob_client/src/put_client.rs @@ -8,6 +8,7 @@ use proto::PutRequest; use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; +use crate::tools::report_error; use lazy_static::lazy_static; use libc; use libc::c_char; @@ -16,7 +17,6 @@ use tokio::runtime::Runtime; use tokio::sync::mpsc; use tokio::task::JoinHandle; -use tracing::error; #[derive(Debug)] struct PutRequestData { @@ -34,6 +34,7 @@ lazy_static! 
{ static ref CLIENT: Arc>> = Arc::new(Mutex::new(None)); + // todo we should probably create separate clients for different IDs static ref RUNTIME: Runtime = Runtime::new().unwrap(); static ref ERROR_MESSAGES: Arc>> = Arc::new(Mutex::new(Vec::new())); @@ -43,21 +44,12 @@ match CLIENT.lock() { Ok(client) => client.is_some(), _ => { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("put")); false } } } -fn report_error(message: String) { - println!("[RUST] [put] Error: {}", message); - if let Ok(mut error_messages) = ERROR_MESSAGES.lock() { - error_messages.push(message); - return; - } - error!("could not access error messages"); -} - fn check_error() -> Result<(), String> { if let Ok(errors) = ERROR_MESSAGES.lock() { return match errors.is_empty() { @@ -87,7 +79,7 @@ match String::from_utf8(data.data) { Ok(utf8_data) => Some(Holder(utf8_data)), _ => { - report_error("invalid utf-8".to_string()); + report_error(&ERROR_MESSAGES, "invalid utf-8", Some("put")); None }, } @@ -96,7 +88,7 @@ match String::from_utf8(data.data).ok() { Some(utf8_data) => Some(BlobHash(utf8_data)), None => { - report_error("invalid utf-8".to_string()); + report_error(&ERROR_MESSAGES, "invalid utf-8", Some("put")); None }, } @@ -105,7 +97,11 @@ Some(DataChunk(data.data)) } _ => { - report_error(format!("invalid field index value {}", data.field_index)); + report_error( + &ERROR_MESSAGES, + &format!("invalid field index value {}", data.field_index), + Some("put") + ); None } }; @@ -115,7 +111,11 @@ }; yield request; } else { - report_error("an error occured, aborting connection".to_string()); + report_error( + &ERROR_MESSAGES, + "an error occured, aborting connection", + Some("put") + ); break; } } @@ -142,7 +142,11 @@ { result = true; } else { - report_error("response queue full".to_string()); + report_error( + &ERROR_MESSAGES, + "response queue full", + Some("put"), + ); } } if !result { @@ -150,14 +154,14 @@ } } Err(err) => { 
- report_error(err.to_string()); + report_error(&ERROR_MESSAGES, &err.to_string(), Some("put")); break; } }; } } Err(err) => { - report_error(err.to_string()); + report_error(&ERROR_MESSAGES, &err.to_string(), Some("put")); } }; }); @@ -184,15 +188,17 @@ return Some(data); } else { report_error( - "couldn't receive data via client's receiver".to_string(), + &ERROR_MESSAGES, + "couldn't receive data via client's receiver", + Some("put"), ); } *maybe_client = Some(client); } else { - report_error("no client detected".to_string()); + report_error(&ERROR_MESSAGES, "no client detected", Some("put")); } } else { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("put")); } None }); @@ -226,16 +232,18 @@ .await { Ok(_) => (), - Err(err) => { - report_error(format!("send data to receiver failed: {}", err)) - } + Err(err) => report_error( + &ERROR_MESSAGES, + &format!("send data to receiver failed: {}", err), + Some("put"), + ), } *maybe_client = Some(client); } else { - report_error("no client detected".to_string()); + report_error(&ERROR_MESSAGES, "no client detected", Some("put")); } } else { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("put")); } }); check_error()?; @@ -254,7 +262,11 @@ drop(client.tx); RUNTIME.block_on(async { if client.rx_handle.await.is_err() { - report_error("wait for receiver handle failed".to_string()); + report_error( + &ERROR_MESSAGES, + "wait for receiver handle failed", + Some("put"), + ); } }); } else {
diff --git a/services/backup/blob_client/src/tools.rs b/services/backup/blob_client/src/tools.rs
new file mode 100644
--- /dev/null
+++ b/services/backup/blob_client/src/tools.rs
@@ -0,0 +1,31 @@
+use std::sync::{Arc, Mutex};
+use tracing::error;
+
+/// Reports an error: prints it to stdout and stores it in `error_messages`.
+///
+/// * `error_messages` - shared list that `message` is appended to.
+/// * `message` - text of the error.
+/// * `label_provided` - optional tag (e.g. `"get"` or `"put"`) printed in
+///   square brackets before the message; an empty label is used for `None`.
+///
+/// If the mutex cannot be locked the message is not stored, and that failure
+/// is logged through `tracing::error!` instead.
+pub fn report_error(
+    error_messages: &Arc<Mutex<Vec<String>>>,
+    message: &str,
+    label_provided: Option<&str>,
+) {
+    let label = match label_provided {
+        Some(value) => format!("[{}]", value),
+        None => String::new(),
+    };
+    println!("[RUST] {} Error: {}", label, message);
+    match error_messages.lock() {
+        Ok(mut error_messages_unpacked) => {
+            error_messages_unpacked.push(message.to_string());
+        }
+        // Only a failed lock is worth logging; a successful push must not
+        // also emit "could not access error messages".
+        Err(_) => error!("could not access error messages"),
+    }
+}