diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs index 6eed244ca..4d5cc4bd1 100644 --- a/services/backup/blob_client/src/get_client.rs +++ b/services/backup/blob_client/src/get_client.rs @@ -1,171 +1,175 @@ mod proto { tonic::include_proto!("blob"); } use proto::blob_service_client::BlobServiceClient; use proto::GetRequest; use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; +use crate::tools::report_error; use lazy_static::lazy_static; use libc; use libc::c_char; use std::ffi::CStr; use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; use tokio::sync::mpsc; use tokio::task::JoinHandle; -use tracing::error; struct ReadClient { rx: mpsc::Receiver>, rx_handle: JoinHandle<()>, } lazy_static! { static ref CLIENT: Arc>> = Arc::new(Mutex::new(None)); static ref RUNTIME: Runtime = Runtime::new().unwrap(); static ref ERROR_MESSAGES: Arc>> = Arc::new(Mutex::new(Vec::new())); } fn is_initialized() -> bool { if let Ok(client) = CLIENT.lock() { if client.is_some() { return true; } } else { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("get")); } false } -fn report_error(message: String) { - println!("[RUST] [get] Error: {}", message); - if let Ok(mut error_messages) = ERROR_MESSAGES.lock() { - error_messages.push(message); - } - error!("could not access error messages"); -} - fn check_error() -> Result<(), String> { if let Ok(errors) = ERROR_MESSAGES.lock() { return match errors.is_empty() { true => Ok(()), false => Err(errors.join("\n")), }; } Err("could not access error messages".to_string()) } pub fn get_client_initialize_cxx( holder_char: *const c_char, ) -> Result<(), String> { if is_initialized() { get_client_terminate_cxx()?; } assert!(!is_initialized(), "client cannot be initialized twice"); let holder_cstr: &CStr = unsafe { CStr::from_ptr(holder_char) }; let holder: String = holder_cstr.to_str().unwrap().to_owned(); // grpc if let Ok(mut grpc_client) = RUNTIME.block_on(async { BlobServiceClient::connect(BLOB_ADDRESS).await }) { // spawn receiver thread let (response_thread_tx, response_thread_rx) = mpsc::channel::>(MPSC_CHANNEL_BUFFER_CAPACITY); let rx_handle = RUNTIME.spawn(async move { if let Ok(response) = grpc_client.get(GetRequest { holder }).await { let mut inner_response = response.into_inner(); loop { match inner_response.message().await { Ok(maybe_data) => { let mut result = false; if let Some(data) = maybe_data { let data: Vec = data.data_chunk; result = match response_thread_tx.send(data).await { Ok(_) => true, Err(err) => { - report_error(err.to_string()); + report_error( + &ERROR_MESSAGES, + &err.to_string(), + Some("get"), + ); false } } } if !result { break; } } Err(err) => { - report_error(err.to_string()); + report_error(&ERROR_MESSAGES, &err.to_string(), Some("get")); break; } }; } } else { - report_error("couldn't perform grpc get operation".to_string()); + report_error( + &ERROR_MESSAGES, + "couldn't perform grpc get operation", + Some("get"), + ); } }); if let Ok(mut client) = CLIENT.lock() { *client = Some(ReadClient { rx_handle, rx: response_thread_rx, }); return Ok(()); } return Err("could not access client".to_string()); } Err("could not successfully connect to the blob server".to_string()) } pub fn get_client_blocking_read_cxx() -> Result, String> { check_error()?; let response: Option> = RUNTIME.block_on(async { if let Ok(mut maybe_client) = CLIENT.lock() { if let Some(mut client) = (*maybe_client).take() { let maybe_data = client.rx.recv().await; let response = Some(maybe_data.unwrap_or_else(|| vec![])); *maybe_client = Some(client); return response; } else { - report_error("no client present".to_string()); + report_error(&ERROR_MESSAGES, "no client present", Some("get")); } } else { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("get")); } None }); check_error()?; response.ok_or("response could not be obtained".to_string()) } pub fn get_client_terminate_cxx() -> Result<(), String> { check_error()?; if !is_initialized() { check_error()?; return Ok(()); } if let Ok(mut maybe_client) = CLIENT.lock() { if let Some(client) = (*maybe_client).take() { RUNTIME.block_on(async { if client.rx_handle.await.is_err() { - report_error("wait for receiver handle failed".to_string()); + report_error( + &ERROR_MESSAGES, + "wait for receiver handle failed", + Some("get"), + ); } }); } else { return Err("no client detected".to_string()); } } else { return Err("couldn't access client".to_string()); } assert!( !is_initialized(), "client transmitter handler released properly" ); check_error()?; Ok(()) } diff --git a/services/backup/blob_client/src/lib.rs b/services/backup/blob_client/src/lib.rs index 0bc932099..5a091686f 100644 --- a/services/backup/blob_client/src/lib.rs +++ b/services/backup/blob_client/src/lib.rs @@ -1,30 +1,31 @@ mod constants; mod get_client; mod put_client; +mod tools; use put_client::{ put_client_blocking_read_cxx, put_client_initialize_cxx, put_client_terminate_cxx, put_client_write_cxx, }; use get_client::{ get_client_blocking_read_cxx, get_client_initialize_cxx, get_client_terminate_cxx, }; #[cxx::bridge] mod ffi { extern "Rust" { fn put_client_initialize_cxx() -> Result<()>; unsafe fn put_client_write_cxx( field_index: usize, data: *const c_char, ) -> Result<()>; fn put_client_blocking_read_cxx() -> Result; fn put_client_terminate_cxx() -> Result<()>; unsafe fn get_client_initialize_cxx( holder_char: *const c_char, ) -> Result<()>; fn get_client_blocking_read_cxx() -> Result>; fn get_client_terminate_cxx() -> Result<()>; } } diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs index 8f6a0a7f1..519aedc9d 100644 --- a/services/backup/blob_client/src/put_client.rs +++ b/services/backup/blob_client/src/put_client.rs @@ -1,273 +1,285 @@ mod proto { tonic::include_proto!("blob"); } use proto::blob_service_client::BlobServiceClient; use proto::put_request; use proto::put_request::Data::*; use proto::PutRequest; use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; +use crate::tools::report_error; use lazy_static::lazy_static; use libc; use libc::c_char; use std::ffi::CStr; use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; use tokio::sync::mpsc; use tokio::task::JoinHandle; -use tracing::error; #[derive(Debug)] struct PutRequestData { field_index: usize, data: Vec, } struct BidiClient { tx: mpsc::Sender, rx: mpsc::Receiver, rx_handle: JoinHandle<()>, } lazy_static! { static ref CLIENT: Arc>> = Arc::new(Mutex::new(None)); + // todo we should probably create separate clients for different IDs static ref RUNTIME: Runtime = Runtime::new().unwrap(); static ref ERROR_MESSAGES: Arc>> = Arc::new(Mutex::new(Vec::new())); } fn is_initialized() -> bool { match CLIENT.lock() { Ok(client) => client.is_some(), _ => { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("put")); false } } } -fn report_error(message: String) { - println!("[RUST] [put] Error: {}", message); - if let Ok(mut error_messages) = ERROR_MESSAGES.lock() { - error_messages.push(message); - return; - } - error!("could not access error messages"); -} - fn check_error() -> Result<(), String> { if let Ok(errors) = ERROR_MESSAGES.lock() { return match errors.is_empty() { true => Ok(()), false => Err(errors.join("\n")), }; } Err("could not access error messages".to_string()) } pub fn put_client_initialize_cxx() -> Result<(), String> { if is_initialized() { put_client_terminate_cxx()?; } assert!(!is_initialized(), "client cannot be initialized twice"); // grpc if let Ok(mut grpc_client) = RUNTIME.block_on(async { BlobServiceClient::connect(BLOB_ADDRESS).await }) { let (request_thread_tx, mut request_thread_rx) = mpsc::channel::(MPSC_CHANNEL_BUFFER_CAPACITY); let outbound = async_stream::stream! { while let Some(data) = request_thread_rx.recv().await { let request_data: Option = match data.field_index { 1 => { match String::from_utf8(data.data) { Ok(utf8_data) => Some(Holder(utf8_data)), _ => { - report_error("invalid utf-8".to_string()); + report_error(&ERROR_MESSAGES, "invalid utf-8", Some("put")); None }, } } 2 => { match String::from_utf8(data.data).ok() { Some(utf8_data) => Some(BlobHash(utf8_data)), None => { - report_error("invalid utf-8".to_string()); + report_error(&ERROR_MESSAGES, "invalid utf-8", Some("put")); None }, } } 3 => { Some(DataChunk(data.data)) } _ => { - report_error(format!("invalid field index value {}", data.field_index)); + report_error( + &ERROR_MESSAGES, + &format!("invalid field index value {}", data.field_index), + Some("put") + ); None } }; if let Some (unpacked_data) = request_data { let request = PutRequest { data: Some(unpacked_data), }; yield request; } else { - report_error("an error occured, aborting connection".to_string()); + report_error( + &ERROR_MESSAGES, + "an error occured, aborting connection", + Some("put") + ); break; } } }; // spawn receiver thread let (response_thread_tx, response_thread_rx) = mpsc::channel::(MPSC_CHANNEL_BUFFER_CAPACITY); let rx_handle = RUNTIME.spawn(async move { match grpc_client.put(tonic::Request::new(outbound)).await { Ok(response) => { let mut inner_response = response.into_inner(); loop { match inner_response.message().await { Ok(maybe_response_message) => { let mut result = false; if let Some(response_message) = maybe_response_message { // warning: this will produce an error if there's more unread responses than // MPSC_CHANNEL_BUFFER_CAPACITY // you should then use put_client_blocking_read_cxx in order to dequeue // the responses in c++ and make room for more if let Ok(_) = response_thread_tx .try_send((response_message.data_exists as i32).to_string()) { result = true; } else { - report_error("response queue full".to_string()); + report_error( + &ERROR_MESSAGES, + "response queue full", + Some("put"), + ); } } if !result { break; } } Err(err) => { - report_error(err.to_string()); + report_error(&ERROR_MESSAGES, &err.to_string(), Some("put")); break; } }; } } Err(err) => { - report_error(err.to_string()); + report_error(&ERROR_MESSAGES, &err.to_string(), Some("put")); } }; }); if let Ok(mut client) = CLIENT.lock() { *client = Some(BidiClient { tx: request_thread_tx, rx: response_thread_rx, rx_handle, }); return Ok(()); } return Err("could not access client".to_string()); } Err("could not successfully connect to the blob server".to_string()) } pub fn put_client_blocking_read_cxx() -> Result { check_error()?; let response: Option = RUNTIME.block_on(async { if let Ok(mut maybe_client) = CLIENT.lock() { if let Some(mut client) = (*maybe_client).take() { if let Some(data) = client.rx.recv().await { return Some(data); } else { report_error( - "couldn't receive data via client's receiver".to_string(), + &ERROR_MESSAGES, + "couldn't receive data via client's receiver", + Some("put"), ); } *maybe_client = Some(client); } else { - report_error("no client detected".to_string()); + report_error(&ERROR_MESSAGES, "no client detected", Some("put")); } } else { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("put")); } None }); check_error()?; response.ok_or("response not received properly".to_string()) } /** * field index: * 1 - holder (utf8 string) * 2 - blob hash (utf8 string) * 3 - data chunk (bytes) */ pub fn put_client_write_cxx( field_index: usize, data: *const c_char, ) -> Result<(), String> { check_error()?; let data_c_str: &CStr = unsafe { CStr::from_ptr(data) }; let data_bytes: Vec = data_c_str.to_bytes().to_vec(); RUNTIME.block_on(async { if let Ok(mut maybe_client) = CLIENT.lock() { if let Some(client) = (*maybe_client).take() { match client .tx .send(PutRequestData { field_index, data: data_bytes, }) .await { Ok(_) => (), - Err(err) => { - report_error(format!("send data to receiver failed: {}", err)) - } + Err(err) => report_error( + &ERROR_MESSAGES, + &format!("send data to receiver failed: {}", err), + Some("put"), + ), } *maybe_client = Some(client); } else { - report_error("no client detected".to_string()); + report_error(&ERROR_MESSAGES, "no client detected", Some("put")); } } else { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("put")); } }); check_error()?; Ok(()) } pub fn put_client_terminate_cxx() -> Result<(), String> { check_error()?; if !is_initialized() { check_error()?; return Ok(()); } if let Ok(mut maybe_client) = CLIENT.lock() { if let Some(client) = (*maybe_client).take() { drop(client.tx); RUNTIME.block_on(async { if client.rx_handle.await.is_err() { - report_error("wait for receiver handle failed".to_string()); + report_error( + &ERROR_MESSAGES, + "wait for receiver handle failed", + Some("put"), + ); } }); } else { return Err("no client detected".to_string()); } } else { return Err("couldn't access client".to_string()); } assert!( !is_initialized(), "client transmitter handler released properly" ); check_error()?; Ok(()) } diff --git a/services/backup/blob_client/src/tools.rs b/services/backup/blob_client/src/tools.rs new file mode 100644 index 000000000..720f325b6 --- /dev/null +++ b/services/backup/blob_client/src/tools.rs @@ -0,0 +1,18 @@ +use std::sync::{Arc, Mutex}; +use tracing::error; + +pub fn report_error( + error_messages: &Arc>>, + message: &str, + label_provided: Option<&str>, +) { + let label = match label_provided { + Some(value) => format!("[{}]", value), + None => "".to_string(), + }; + println!("[RUST] {} Error: {}", label, message); + if let Ok(mut error_messages_unpacked) = error_messages.lock() { + error_messages_unpacked.push(message.to_string()); + } + error!("could not access error messages"); +}