diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs --- a/services/backup/blob_client/src/get_client.rs +++ b/services/backup/blob_client/src/get_client.rs @@ -6,6 +6,7 @@ use proto::GetRequest; use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; +use crate::tools::{report_error, check_error}; use lazy_static::lazy_static; use libc; use libc::c_char; @@ -14,7 +15,6 @@ use tokio::runtime::Runtime; use tokio::sync::mpsc; use tokio::task::JoinHandle; -use tracing::error; struct ReadClient { rx: mpsc::Receiver>, @@ -35,29 +35,11 @@ return true; } } else { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("get")); } false } -fn report_error(message: String) { - println!("[RUST] [get] Error: {}", message); - if let Ok(mut error_messages) = ERROR_MESSAGES.lock() { - error_messages.push(message); - } - error!("could not access error messages"); -} - -fn check_error() -> Result<(), String> { - if let Ok(errors) = ERROR_MESSAGES.lock() { - return match errors.is_empty() { - true => Ok(()), - false => Err(errors.join("\n")), - }; - } - Err("could not access error messages".to_string()) -} - pub fn get_client_initialize_cxx( holder_char: *const c_char, ) -> Result<(), String> { @@ -93,7 +75,11 @@ result = match response_thread_tx.send(data).await { Ok(_) => true, Err(err) => { - report_error(err.to_string()); + report_error( + &ERROR_MESSAGES, + &err.to_string(), + Some("get"), + ); false } } @@ -101,13 +87,17 @@ result } Err(err) => { - report_error(err.to_string()); + report_error(&ERROR_MESSAGES, &err.to_string(), Some("get")); false } }; } } else { - report_error("couldn't perform grpc get operation".to_string()); + report_error( + &ERROR_MESSAGES, + "couldn't perform grpc get operation", + Some("get"), + ); } }); @@ -125,7 +115,7 @@ pub fn get_client_blocking_read_cxx() -> Result, String> { let mut response: Option> = None; - check_error()?; + check_error(&ERROR_MESSAGES)?; RUNTIME.block_on(async { if let Ok(mut maybe_client) = CLIENT.lock() { if let Some(mut client) = (*maybe_client).take() { @@ -136,19 +126,19 @@ } *maybe_client = Some(client); } else { - report_error("no client present".to_string()); + report_error(&ERROR_MESSAGES, "no client present", Some("get")); } } else { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("get")); } }); - check_error()?; + check_error(&ERROR_MESSAGES)?; let response: Vec = response.unwrap(); Ok(response) } pub fn get_client_terminate_cxx() -> Result<(), String> { - check_error()?; + check_error(&ERROR_MESSAGES)?; if !is_initialized() { return Ok(()); } @@ -156,20 +146,24 @@ if let Some(client) = (*maybe_client).take() { RUNTIME.block_on(async { if client.rx_handle.await.is_err() { - report_error("wait for receiver handle failed".to_string()); + report_error( + &ERROR_MESSAGES, + "wait for receiver handle failed", + Some("get"), + ); } }); } else { - report_error("no client detected".to_string()); + report_error(&ERROR_MESSAGES, "no client detected", Some("get")); } } else { - report_error("couldn't access client".to_string()); + report_error(&ERROR_MESSAGES, "couldn't access client", Some("get")); } assert!( !is_initialized(), "client transmitter handler released properly" ); - check_error()?; + check_error(&ERROR_MESSAGES)?; Ok(()) } diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -67,36 +67,30 @@ response->set_backupid(this->backupID); this->holder = tools::generateHolder(this->dataHash, this->backupID); bool dataExists = false; - try { - put_client_initialize_cxx(); - put_client_write_cxx( - tools::getBlobPutField(tools::BlobPutField::HOLDER), - this->holder.c_str()); - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors - put_client_write_cxx( - tools::getBlobPutField(tools::BlobPutField::HASH), - this->dataHash.c_str()); + put_client_initialize_cxx(); + put_client_write_cxx( + tools::getBlobPutField(tools::BlobPutField::HOLDER), + this->holder.c_str()); + put_client_blocking_read_cxx(); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors + put_client_write_cxx( + tools::getBlobPutField(tools::BlobPutField::HASH), + this->dataHash.c_str()); - rust::String responseStr = - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably - // want to delegate performing ops - // to separate threads in the base - // reactors - // data exists? - if ((bool)tools::charPtrToInt(responseStr.c_str())) { - return std::make_unique( - grpc::Status::OK, true); - } - } catch (std::exception &e) { - throw std::runtime_error( - e.what()); // todo in base reactors we can just handle std exception - // instead of keep rethrowing here + rust::String responseStr = + put_client_blocking_read_cxx(); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably + // want to delegate performing ops + // to separate threads in the base + // reactors + // data exists? + if ((bool)tools::charPtrToInt(responseStr.c_str())) { + return std::make_unique( + grpc::Status::OK, true); } return nullptr; } @@ -104,21 +98,16 @@ if (request.mutable_newcompactionchunk()->empty()) { return std::make_unique(grpc::Status::OK); } - try { - put_client_write_cxx( - tools::getBlobPutField(tools::BlobPutField::DATA_CHUNK), - std::string(std::move(*request.mutable_newcompactionchunk())) - .c_str()); - put_client_blocking_read_cxx(); // todo this should be avoided - // (blocking); we should be able to - // ignore responses; we probably want to - // delegate performing ops to separate - // threads in the base reactors - } catch (std::exception &e) { - throw std::runtime_error( - e.what()); // todo in base reactors we can just handle std exception - // instead of keep rethrowing here - } + put_client_write_cxx( + tools::getBlobPutField(tools::BlobPutField::DATA_CHUNK), + std::string(std::move(*request.mutable_newcompactionchunk())) + .c_str()); + put_client_blocking_read_cxx(); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors + return nullptr; } } @@ -127,13 +116,7 @@ void CreateNewBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); - try { - put_client_terminate_cxx(); - } catch (std::exception &e) { - throw std::runtime_error( - e.what()); // todo in base reactors we can just handle std exception - // instead of keep rethrowing here - } + put_client_terminate_cxx(); // TODO add recovery data // TODO handle attachments holders