Page MenuHomePhabricator

D5030.diff
No OneTemporary

D5030.diff

diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs
--- a/services/backup/blob_client/src/get_client.rs
+++ b/services/backup/blob_client/src/get_client.rs
@@ -6,6 +6,7 @@
use proto::GetRequest;
use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
+use crate::tools::report_error;
use lazy_static::lazy_static;
use libc;
use libc::c_char;
@@ -14,7 +15,6 @@
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
-use tracing::error;
struct ReadClient {
rx: mpsc::Receiver<Vec<u8>>,
@@ -35,19 +35,11 @@
return true;
}
} else {
- report_error("couldn't access client".to_string());
+ report_error(&ERROR_MESSAGES, "couldn't access client", Some("get"));
}
false
}
-fn report_error(message: String) {
- println!("[RUST] [get] Error: {}", message);
- if let Ok(mut error_messages) = ERROR_MESSAGES.lock() {
- error_messages.push(message);
- }
- error!("could not access error messages");
-}
-
fn check_error() -> Result<(), String> {
if let Ok(errors) = ERROR_MESSAGES.lock() {
return match errors.is_empty() {
@@ -89,7 +81,11 @@
result = match response_thread_tx.send(data).await {
Ok(_) => true,
Err(err) => {
- report_error(err.to_string());
+ report_error(
+ &ERROR_MESSAGES,
+ &err.to_string(),
+ Some("get"),
+ );
false
}
}
@@ -99,13 +95,17 @@
}
}
Err(err) => {
- report_error(err.to_string());
+ report_error(&ERROR_MESSAGES, &err.to_string(), Some("get"));
break;
}
};
}
} else {
- report_error("couldn't perform grpc get operation".to_string());
+ report_error(
+ &ERROR_MESSAGES,
+ "couldn't perform grpc get operation",
+ Some("get"),
+ );
}
});
@@ -131,10 +131,10 @@
*maybe_client = Some(client);
return response;
} else {
- report_error("no client present".to_string());
+ report_error(&ERROR_MESSAGES, "no client present", Some("get"));
}
} else {
- report_error("couldn't access client".to_string());
+ report_error(&ERROR_MESSAGES, "couldn't access client", Some("get"));
}
None
});
@@ -152,7 +152,11 @@
if let Some(client) = (*maybe_client).take() {
RUNTIME.block_on(async {
if client.rx_handle.await.is_err() {
- report_error("wait for receiver handle failed".to_string());
+ report_error(
+ &ERROR_MESSAGES,
+ "wait for receiver handle failed",
+ Some("get"),
+ );
}
});
} else {
diff --git a/services/backup/blob_client/src/lib.rs b/services/backup/blob_client/src/lib.rs
--- a/services/backup/blob_client/src/lib.rs
+++ b/services/backup/blob_client/src/lib.rs
@@ -1,6 +1,7 @@
mod constants;
mod get_client;
mod put_client;
+mod tools;
use put_client::{
put_client_blocking_read_cxx, put_client_initialize_cxx,
diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs
--- a/services/backup/blob_client/src/put_client.rs
+++ b/services/backup/blob_client/src/put_client.rs
@@ -8,6 +8,7 @@
use proto::PutRequest;
use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
+use crate::tools::report_error;
use lazy_static::lazy_static;
use libc;
use libc::c_char;
@@ -16,7 +17,6 @@
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
-use tracing::error;
#[derive(Debug)]
struct PutRequestData {
@@ -34,6 +34,7 @@
lazy_static! {
static ref CLIENT: Arc<Mutex<Option<BidiClient>>> =
Arc::new(Mutex::new(None));
+ // todo we should probably create separate clients for different IDs
static ref RUNTIME: Runtime = Runtime::new().unwrap();
static ref ERROR_MESSAGES: Arc<Mutex<Vec<String>>> =
Arc::new(Mutex::new(Vec::new()));
@@ -43,21 +44,12 @@
match CLIENT.lock() {
Ok(client) => client.is_some(),
_ => {
- report_error("couldn't access client".to_string());
+ report_error(&ERROR_MESSAGES, "couldn't access client", Some("put"));
false
}
}
}
-fn report_error(message: String) {
- println!("[RUST] [put] Error: {}", message);
- if let Ok(mut error_messages) = ERROR_MESSAGES.lock() {
- error_messages.push(message);
- return;
- }
- error!("could not access error messages");
-}
-
fn check_error() -> Result<(), String> {
if let Ok(errors) = ERROR_MESSAGES.lock() {
return match errors.is_empty() {
@@ -87,7 +79,7 @@
match String::from_utf8(data.data) {
Ok(utf8_data) => Some(Holder(utf8_data)),
_ => {
- report_error("invalid utf-8".to_string());
+ report_error(&ERROR_MESSAGES, "invalid utf-8", Some("put"));
None
},
}
@@ -96,7 +88,7 @@
match String::from_utf8(data.data).ok() {
Some(utf8_data) => Some(BlobHash(utf8_data)),
None => {
- report_error("invalid utf-8".to_string());
+ report_error(&ERROR_MESSAGES, "invalid utf-8", Some("put"));
None
},
}
@@ -105,7 +97,11 @@
Some(DataChunk(data.data))
}
_ => {
- report_error(format!("invalid field index value {}", data.field_index));
+ report_error(
+ &ERROR_MESSAGES,
+ &format!("invalid field index value {}", data.field_index),
+ Some("put")
+ );
None
}
};
@@ -115,7 +111,11 @@
};
yield request;
} else {
- report_error("an error occured, aborting connection".to_string());
+ report_error(
+ &ERROR_MESSAGES,
+ "an error occured, aborting connection",
+ Some("put")
+ );
break;
}
}
@@ -142,7 +142,11 @@
{
result = true;
} else {
- report_error("response queue full".to_string());
+ report_error(
+ &ERROR_MESSAGES,
+ "response queue full",
+ Some("put"),
+ );
}
}
if !result {
@@ -150,14 +154,14 @@
}
}
Err(err) => {
- report_error(err.to_string());
+ report_error(&ERROR_MESSAGES, &err.to_string(), Some("put"));
break;
}
};
}
}
Err(err) => {
- report_error(err.to_string());
+ report_error(&ERROR_MESSAGES, &err.to_string(), Some("put"));
}
};
});
@@ -184,15 +188,17 @@
return Some(data);
} else {
report_error(
- "couldn't receive data via client's receiver".to_string(),
+ &ERROR_MESSAGES,
+ "couldn't receive data via client's receiver",
+ Some("put"),
);
}
*maybe_client = Some(client);
} else {
- report_error("no client detected".to_string());
+ report_error(&ERROR_MESSAGES, "no client detected", Some("put"));
}
} else {
- report_error("couldn't access client".to_string());
+ report_error(&ERROR_MESSAGES, "couldn't access client", Some("put"));
}
None
});
@@ -226,16 +232,18 @@
.await
{
Ok(_) => (),
- Err(err) => {
- report_error(format!("send data to receiver failed: {}", err))
- }
+ Err(err) => report_error(
+ &ERROR_MESSAGES,
+ &format!("send data to receiver failed: {}", err),
+ Some("put"),
+ ),
}
*maybe_client = Some(client);
} else {
- report_error("no client detected".to_string());
+ report_error(&ERROR_MESSAGES, "no client detected", Some("put"));
}
} else {
- report_error("couldn't access client".to_string());
+ report_error(&ERROR_MESSAGES, "couldn't access client", Some("put"));
}
});
check_error()?;
@@ -254,7 +262,11 @@
drop(client.tx);
RUNTIME.block_on(async {
if client.rx_handle.await.is_err() {
- report_error("wait for receiver handle failed".to_string());
+ report_error(
+ &ERROR_MESSAGES,
+ "wait for receiver handle failed",
+ Some("put"),
+ );
}
});
} else {
diff --git a/services/backup/blob_client/src/tools.rs b/services/backup/blob_client/src/tools.rs
new file mode 100644
--- /dev/null
+++ b/services/backup/blob_client/src/tools.rs
@@ -0,0 +1,18 @@
+use std::sync::{Arc, Mutex};
+use tracing::error;
+
+pub fn report_error(
+ error_messages: &Arc<Mutex<Vec<String>>>,
+ message: &str,
+ label_provided: Option<&str>,
+) {
+ let label = match label_provided {
+ Some(value) => format!("[{}]", value),
+ None => "".to_string(),
+ };
+ println!("[RUST] {} Error: {}", label, message);
+ if let Ok(mut error_messages_unpacked) = error_messages.lock() {
+ error_messages_unpacked.push(message.to_string());
+ }
+ error!("could not access error messages");
+}

File Metadata

Mime Type
text/plain
Expires
Fri, Nov 22, 2:10 AM (4 h, 54 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2559364
Default Alt Text
D5030.diff (9 KB)

Event Timeline