diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs --- a/services/backup/blob_client/src/get_client.rs +++ b/services/backup/blob_client/src/get_client.rs @@ -6,6 +6,7 @@ use proto::GetRequest; use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; +use crate::tools::{report_error}; use lazy_static::lazy_static; use libc; use libc::c_char; @@ -14,7 +15,6 @@ use tokio::runtime::Runtime; use tokio::sync::mpsc; use tokio::task::JoinHandle; -use tracing::error; struct ReadClient { rx: mpsc::Receiver>, @@ -35,19 +35,11 @@ return true; } } else { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("get")); } false } -fn report_error(message: String) { - println!("[RUST] [get] Error: {}", message); - if let Ok(mut error_messages) = ERROR_MESSAGES.lock() { - error_messages.push(message); - } - error!("could not access error messages"); -} - fn check_error() -> Result<(), String> { if let Ok(errors) = ERROR_MESSAGES.lock() { return match errors.is_empty() { @@ -93,7 +85,11 @@ result = match response_thread_tx.send(data).await { Ok(_) => true, Err(err) => { - report_error(err.to_string()); + report_error( + &ERROR_MESSAGES, + &err.to_string(), + Some("get"), + ); false } } @@ -101,13 +97,17 @@ result } Err(err) => { - report_error(err.to_string()); + report_error(&ERROR_MESSAGES, &err.to_string(), Some("get")); false } }; } } else { - report_error("couldn't perform grpc get operation".to_string()); + report_error( + &ERROR_MESSAGES, + "couldn't perform grpc get operation", + Some("get"), + ); } }); @@ -136,10 +136,10 @@ } *maybe_client = Some(client); } else { - report_error("no client present".to_string()); + report_error(&ERROR_MESSAGES, "no client present", Some("get")); } } else { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("get")); } }); check_error()?; @@ -156,14 +156,18 @@ if let Some(client) = (*maybe_client).take() { RUNTIME.block_on(async { if client.rx_handle.await.is_err() { - report_error("wait for receiver handle failed".to_string()); + report_error( + &ERROR_MESSAGES, + "wait for receiver handle failed", + Some("get"), + ); } }); } else { - report_error("no client detected".to_string()); + report_error(&ERROR_MESSAGES, "no client detected", Some("get")); } } else { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("get")); } assert!( diff --git a/services/backup/blob_client/src/lib.rs b/services/backup/blob_client/src/lib.rs --- a/services/backup/blob_client/src/lib.rs +++ b/services/backup/blob_client/src/lib.rs @@ -1,6 +1,7 @@ mod constants; mod get_client; mod put_client; +mod tools; use put_client::{ put_client_blocking_read_cxx, put_client_initialize_cxx, diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs --- a/services/backup/blob_client/src/put_client.rs +++ b/services/backup/blob_client/src/put_client.rs @@ -9,6 +9,7 @@ use proto::PutResponse; use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; +use crate::tools::report_error; use lazy_static::lazy_static; use libc; use libc::c_char; @@ -17,7 +18,6 @@ use tokio::runtime::Runtime; use tokio::sync::mpsc; use tokio::task::JoinHandle; -use tracing::error; #[derive(Debug)] struct PutRequestData { @@ -35,6 +35,7 @@ lazy_static! { static ref CLIENT: Arc>> = Arc::new(Mutex::new(None)); + // todo we should probably create separate clients for different IDs static ref RUNTIME: Runtime = Runtime::new().unwrap(); static ref ERROR_MESSAGES: Arc>> = Arc::new(Mutex::new(Vec::new())); @@ -46,20 +47,11 @@ return true; } } else { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("put")); } false } -fn report_error(message: String) { - println!("[RUST] [put] Error: {}", message); - if let Ok(mut error_messages) = ERROR_MESSAGES.lock() { - error_messages.push(message); - return; - } - error!("could not access error messages"); -} - fn check_error() -> Result<(), String> { if let Ok(errors) = ERROR_MESSAGES.lock() { return match errors.is_empty() { @@ -91,7 +83,7 @@ match String::from_utf8(data.data).ok() { Some(utf8_data) => Some(Holder(utf8_data)), None => { - report_error("invalid utf-8".to_string()); + report_error(&ERROR_MESSAGES, "invalid utf-8", Some("put")); None }, } @@ -100,7 +92,7 @@ match String::from_utf8(data.data).ok() { Some(utf8_data) => Some(BlobHash(utf8_data)), None => { - report_error("invalid utf-8".to_string()); + report_error(&ERROR_MESSAGES, "invalid utf-8", Some("put")); None }, } @@ -109,7 +101,7 @@ Some(DataChunk(data.data)) } _ => { - report_error(format!("invalid field index value {}", data.field_index)); + report_error(&ERROR_MESSAGES, &format!("invalid field index value {}", data.field_index), Some("put")); None } }; @@ -119,7 +111,7 @@ }; yield request; } else { - report_error("an error occured, aborting connection".to_string()); + report_error(&ERROR_MESSAGES, "an error occured, aborting connection", Some("put")); break; } } @@ -136,7 +128,7 @@ > = match grpc_client.put(tonic::Request::new(outbound)).await { Ok(res) => Some(res), Err(err) => { - report_error(err.to_string()); + report_error(&ERROR_MESSAGES, &err.to_string(), Some("put")); None } }; @@ -158,20 +150,28 @@ { result = true; } else { - report_error("response queue full".to_string()); + report_error( + &ERROR_MESSAGES, + "response queue full", + Some("put"), + ); } } result } Err(err) => { - report_error(err.to_string()); + report_error(&ERROR_MESSAGES, &err.to_string(), Some("put")); false } }; } } None => { - report_error(format!("unexpected result received")); + report_error( + &ERROR_MESSAGES, + &format!("unexpected result received"), + Some("put"), + ); return; } }; @@ -200,15 +200,17 @@ response = Some(data); } else { report_error( - "couldn't receive data via client's receiver".to_string(), + &ERROR_MESSAGES, + "couldn't receive data via client's receiver", + Some("put"), ); } *maybe_client = Some(client); } else { - report_error("no client detected".to_string()); + report_error(&ERROR_MESSAGES, "no client detected", Some("put")); } } else { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("put")); } }); check_error()?; @@ -241,16 +243,18 @@ .await { Ok(_) => (), - Err(err) => { - report_error(format!("send data to receiver failed: {}", err)) - } + Err(err) => report_error( + &ERROR_MESSAGES, + &format!("send data to receiver failed: {}", err), + Some("put"), + ), } *maybe_client = Some(client); } else { - report_error("no client detected".to_string()); + report_error(&ERROR_MESSAGES, "no client detected", Some("put")); } } else { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("put")); } }); Ok(()) @@ -267,14 +271,18 @@ drop(client.tx); RUNTIME.block_on(async { if client.rx_handle.await.is_err() { - report_error("wait for receiver handle failed".to_string()); + report_error( + &ERROR_MESSAGES, + "wait for receiver handle failed", + Some("put"), + ); } }); } else { return Err("no client detected".to_string()); } } else { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("put")); } assert!( diff --git a/services/backup/blob_client/src/tools.rs b/services/backup/blob_client/src/tools.rs new file mode 100644 --- /dev/null +++ b/services/backup/blob_client/src/tools.rs @@ -0,0 +1,18 @@ +use std::sync::{Arc, Mutex}; +use tracing::error; + +pub fn report_error( + error_messages: &Arc>>, + message: &str, + label_provided: Option<&str>, +) { + let label = match label_provided { + Some(value) => format!("[{}]", value), + None => "".to_string(), + }; + println!("[RUST] {} Error: {}", label, message); + if let Ok(mut error_messages_unpacked) = error_messages.lock() { + error_messages_unpacked.push(message.to_string()); + } + error!("could not access error messages"); +}