Page MenuHomePhorge

D5029.1767464781.diff
No OneTemporary

Size
9 KB
Referenced Files
None
Subscribers
None

D5029.1767464781.diff

diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs
--- a/services/backup/blob_client/src/get_client.rs
+++ b/services/backup/blob_client/src/get_client.rs
@@ -6,6 +6,7 @@
use proto::GetRequest;
use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
+use crate::tools::{report_error, check_error};
use lazy_static::lazy_static;
use libc;
use libc::c_char;
@@ -14,7 +15,6 @@
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
-use tracing::error;
struct ReadClient {
rx: mpsc::Receiver<Vec<u8>>,
@@ -35,29 +35,11 @@
return true;
}
} else {
- report_error("couldn't access client".to_string());
+ report_error(&ERROR_MESSAGES, "couldn't access client", Some("get"));
}
false
}
-fn report_error(message: String) {
- println!("[RUST] [get] Error: {}", message);
- if let Ok(mut error_messages) = ERROR_MESSAGES.lock() {
- error_messages.push(message);
- }
- error!("could not access error messages");
-}
-
-fn check_error() -> Result<(), String> {
- if let Ok(errors) = ERROR_MESSAGES.lock() {
- return match errors.is_empty() {
- true => Ok(()),
- false => Err(errors.join("\n")),
- };
- }
- Err("could not access error messages".to_string())
-}
-
pub fn get_client_initialize_cxx(
holder_char: *const c_char,
) -> Result<(), String> {
@@ -93,7 +75,11 @@
result = match response_thread_tx.send(data).await {
Ok(_) => true,
Err(err) => {
- report_error(err.to_string());
+ report_error(
+ &ERROR_MESSAGES,
+ &err.to_string(),
+ Some("get"),
+ );
false
}
}
@@ -101,13 +87,17 @@
result
}
Err(err) => {
- report_error(err.to_string());
+ report_error(&ERROR_MESSAGES, &err.to_string(), Some("get"));
false
}
};
}
} else {
- report_error("couldn't perform grpc get operation".to_string());
+ report_error(
+ &ERROR_MESSAGES,
+ "couldn't perform grpc get operation",
+ Some("get"),
+ );
}
});
@@ -125,7 +115,7 @@
pub fn get_client_blocking_read_cxx() -> Result<Vec<u8>, String> {
let mut response: Option<Vec<u8>> = None;
- check_error()?;
+ check_error(&ERROR_MESSAGES)?;
RUNTIME.block_on(async {
if let Ok(mut maybe_client) = CLIENT.lock() {
if let Some(mut client) = (*maybe_client).take() {
@@ -136,19 +126,19 @@
}
*maybe_client = Some(client);
} else {
- report_error("no client present".to_string());
+ report_error(&ERROR_MESSAGES, "no client present", Some("get"));
}
} else {
- report_error("couldn't access client".to_string());
+ report_error(&ERROR_MESSAGES, "couldn't access client", Some("get"));
}
});
- check_error()?;
+ check_error(&ERROR_MESSAGES)?;
let response: Vec<u8> = response.unwrap();
Ok(response)
}
pub fn get_client_terminate_cxx() -> Result<(), String> {
- check_error()?;
+ check_error(&ERROR_MESSAGES)?;
if !is_initialized() {
return Ok(());
}
@@ -156,20 +146,24 @@
if let Some(client) = (*maybe_client).take() {
RUNTIME.block_on(async {
if client.rx_handle.await.is_err() {
- report_error("wait for receiver handle failed".to_string());
+ report_error(
+ &ERROR_MESSAGES,
+ "wait for receiver handle failed",
+ Some("get"),
+ );
}
});
} else {
- report_error("no client detected".to_string());
+ report_error(&ERROR_MESSAGES, "no client detected", Some("get"));
}
} else {
- report_error("couldn't access client".to_string());
+ report_error(&ERROR_MESSAGES, "couldn't access client", Some("get"));
}
assert!(
!is_initialized(),
"client transmitter handler released properly"
);
- check_error()?;
+ check_error(&ERROR_MESSAGES)?;
Ok(())
}
diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
@@ -67,36 +67,30 @@
response->set_backupid(this->backupID);
this->holder = tools::generateHolder(this->dataHash, this->backupID);
bool dataExists = false;
- try {
- put_client_initialize_cxx();
- put_client_write_cxx(
- tools::getBlobPutField(tools::BlobPutField::HOLDER),
- this->holder.c_str());
- put_client_blocking_read_cxx(); // todo this should be avoided
- // (blocking); we should be able to
- // ignore responses; we probably want to
- // delegate performing ops to separate
- // threads in the base reactors
- put_client_write_cxx(
- tools::getBlobPutField(tools::BlobPutField::HASH),
- this->dataHash.c_str());
+ put_client_initialize_cxx();
+ put_client_write_cxx(
+ tools::getBlobPutField(tools::BlobPutField::HOLDER),
+ this->holder.c_str());
+ put_client_blocking_read_cxx(); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably want to
+ // delegate performing ops to separate
+ // threads in the base reactors
+ put_client_write_cxx(
+ tools::getBlobPutField(tools::BlobPutField::HASH),
+ this->dataHash.c_str());
- rust::String responseStr =
- put_client_blocking_read_cxx(); // todo this should be avoided
- // (blocking); we should be able to
- // ignore responses; we probably
- // want to delegate performing ops
- // to separate threads in the base
- // reactors
- // data exists?
- if ((bool)tools::charPtrToInt(responseStr.c_str())) {
- return std::make_unique<ServerBidiReactorStatus>(
- grpc::Status::OK, true);
- }
- } catch (std::exception &e) {
- throw std::runtime_error(
- e.what()); // todo in base reactors we can just handle std exception
- // instead of keep rethrowing here
+ rust::String responseStr =
+ put_client_blocking_read_cxx(); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably
+ // want to delegate performing ops
+ // to separate threads in the base
+ // reactors
+ // data exists?
+ if ((bool)tools::charPtrToInt(responseStr.c_str())) {
+ return std::make_unique<ServerBidiReactorStatus>(
+ grpc::Status::OK, true);
}
return nullptr;
}
@@ -104,21 +98,16 @@
if (request.mutable_newcompactionchunk()->empty()) {
return std::make_unique<ServerBidiReactorStatus>(grpc::Status::OK);
}
- try {
- put_client_write_cxx(
- tools::getBlobPutField(tools::BlobPutField::DATA_CHUNK),
- std::string(std::move(*request.mutable_newcompactionchunk()))
- .c_str());
- put_client_blocking_read_cxx(); // todo this should be avoided
- // (blocking); we should be able to
- // ignore responses; we probably want to
- // delegate performing ops to separate
- // threads in the base reactors
- } catch (std::exception &e) {
- throw std::runtime_error(
- e.what()); // todo in base reactors we can just handle std exception
- // instead of keep rethrowing here
- }
+ put_client_write_cxx(
+ tools::getBlobPutField(tools::BlobPutField::DATA_CHUNK),
+ std::string(std::move(*request.mutable_newcompactionchunk()))
+ .c_str());
+ put_client_blocking_read_cxx(); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably want to
+ // delegate performing ops to separate
+ // threads in the base reactors
+
return nullptr;
}
}
@@ -127,13 +116,7 @@
void CreateNewBackupReactor::terminateCallback() {
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- try {
- put_client_terminate_cxx();
- } catch (std::exception &e) {
- throw std::runtime_error(
- e.what()); // todo in base reactors we can just handle std exception
- // instead of keep rethrowing here
- }
+ put_client_terminate_cxx();
// TODO add recovery data
// TODO handle attachments holders

File Metadata

Mime Type
text/plain
Expires
Sat, Jan 3, 6:26 PM (21 h, 6 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5890150
Default Alt Text
D5029.1767464781.diff (9 KB)

Event Timeline