Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3332841
D5030.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
9 KB
Referenced Files
None
Subscribers
None
D5030.diff
View Options
diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs
--- a/services/backup/blob_client/src/get_client.rs
+++ b/services/backup/blob_client/src/get_client.rs
@@ -6,6 +6,7 @@
use proto::GetRequest;
use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
+use crate::tools::report_error;
use lazy_static::lazy_static;
use libc;
use libc::c_char;
@@ -14,7 +15,6 @@
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
-use tracing::error;
struct ReadClient {
rx: mpsc::Receiver<Vec<u8>>,
@@ -35,19 +35,11 @@
return true;
}
} else {
- report_error("couldn't access client".to_string());
+ report_error(&ERROR_MESSAGES, "couldn't access client", Some("get"));
}
false
}
-fn report_error(message: String) {
- println!("[RUST] [get] Error: {}", message);
- if let Ok(mut error_messages) = ERROR_MESSAGES.lock() {
- error_messages.push(message);
- }
- error!("could not access error messages");
-}
-
fn check_error() -> Result<(), String> {
if let Ok(errors) = ERROR_MESSAGES.lock() {
return match errors.is_empty() {
@@ -89,7 +81,11 @@
result = match response_thread_tx.send(data).await {
Ok(_) => true,
Err(err) => {
- report_error(err.to_string());
+ report_error(
+ &ERROR_MESSAGES,
+ &err.to_string(),
+ Some("get"),
+ );
false
}
}
@@ -99,13 +95,17 @@
}
}
Err(err) => {
- report_error(err.to_string());
+ report_error(&ERROR_MESSAGES, &err.to_string(), Some("get"));
break;
}
};
}
} else {
- report_error("couldn't perform grpc get operation".to_string());
+ report_error(
+ &ERROR_MESSAGES,
+ "couldn't perform grpc get operation",
+ Some("get"),
+ );
}
});
@@ -131,10 +131,10 @@
*maybe_client = Some(client);
return response;
} else {
- report_error("no client present".to_string());
+ report_error(&ERROR_MESSAGES, "no client present", Some("get"));
}
} else {
- report_error("couldn't access client".to_string());
+ report_error(&ERROR_MESSAGES, "couldn't access client", Some("get"));
}
None
});
@@ -152,7 +152,11 @@
if let Some(client) = (*maybe_client).take() {
RUNTIME.block_on(async {
if client.rx_handle.await.is_err() {
- report_error("wait for receiver handle failed".to_string());
+ report_error(
+ &ERROR_MESSAGES,
+ "wait for receiver handle failed",
+ Some("get"),
+ );
}
});
} else {
diff --git a/services/backup/blob_client/src/lib.rs b/services/backup/blob_client/src/lib.rs
--- a/services/backup/blob_client/src/lib.rs
+++ b/services/backup/blob_client/src/lib.rs
@@ -1,6 +1,7 @@
mod constants;
mod get_client;
mod put_client;
+mod tools;
use put_client::{
put_client_blocking_read_cxx, put_client_initialize_cxx,
diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs
--- a/services/backup/blob_client/src/put_client.rs
+++ b/services/backup/blob_client/src/put_client.rs
@@ -8,6 +8,7 @@
use proto::PutRequest;
use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
+use crate::tools::report_error;
use lazy_static::lazy_static;
use libc;
use libc::c_char;
@@ -16,7 +17,6 @@
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
-use tracing::error;
#[derive(Debug)]
struct PutRequestData {
@@ -34,6 +34,7 @@
lazy_static! {
static ref CLIENT: Arc<Mutex<Option<BidiClient>>> =
Arc::new(Mutex::new(None));
+ // todo we should probably create separate clients for different IDs
static ref RUNTIME: Runtime = Runtime::new().unwrap();
static ref ERROR_MESSAGES: Arc<Mutex<Vec<String>>> =
Arc::new(Mutex::new(Vec::new()));
@@ -43,21 +44,12 @@
match CLIENT.lock() {
Ok(client) => client.is_some(),
_ => {
- report_error("couldn't access client".to_string());
+ report_error(&ERROR_MESSAGES, "couldn't access client", Some("put"));
false
}
}
}
-fn report_error(message: String) {
- println!("[RUST] [put] Error: {}", message);
- if let Ok(mut error_messages) = ERROR_MESSAGES.lock() {
- error_messages.push(message);
- return;
- }
- error!("could not access error messages");
-}
-
fn check_error() -> Result<(), String> {
if let Ok(errors) = ERROR_MESSAGES.lock() {
return match errors.is_empty() {
@@ -87,7 +79,7 @@
match String::from_utf8(data.data) {
Ok(utf8_data) => Some(Holder(utf8_data)),
_ => {
- report_error("invalid utf-8".to_string());
+ report_error(&ERROR_MESSAGES, "invalid utf-8", Some("put"));
None
},
}
@@ -96,7 +88,7 @@
match String::from_utf8(data.data).ok() {
Some(utf8_data) => Some(BlobHash(utf8_data)),
None => {
- report_error("invalid utf-8".to_string());
+ report_error(&ERROR_MESSAGES, "invalid utf-8", Some("put"));
None
},
}
@@ -105,7 +97,11 @@
Some(DataChunk(data.data))
}
_ => {
- report_error(format!("invalid field index value {}", data.field_index));
+ report_error(
+ &ERROR_MESSAGES,
+ &format!("invalid field index value {}", data.field_index),
+ Some("put")
+ );
None
}
};
@@ -115,7 +111,11 @@
};
yield request;
} else {
- report_error("an error occured, aborting connection".to_string());
+ report_error(
+ &ERROR_MESSAGES,
+ "an error occured, aborting connection",
+ Some("put")
+ );
break;
}
}
@@ -142,7 +142,11 @@
{
result = true;
} else {
- report_error("response queue full".to_string());
+ report_error(
+ &ERROR_MESSAGES,
+ "response queue full",
+ Some("put"),
+ );
}
}
if !result {
@@ -150,14 +154,14 @@
}
}
Err(err) => {
- report_error(err.to_string());
+ report_error(&ERROR_MESSAGES, &err.to_string(), Some("put"));
break;
}
};
}
}
Err(err) => {
- report_error(err.to_string());
+ report_error(&ERROR_MESSAGES, &err.to_string(), Some("put"));
}
};
});
@@ -184,15 +188,17 @@
return Some(data);
} else {
report_error(
- "couldn't receive data via client's receiver".to_string(),
+ &ERROR_MESSAGES,
+ "couldn't receive data via client's receiver",
+ Some("put"),
);
}
*maybe_client = Some(client);
} else {
- report_error("no client detected".to_string());
+ report_error(&ERROR_MESSAGES, "no client detected", Some("put"));
}
} else {
- report_error("couldn't access client".to_string());
+ report_error(&ERROR_MESSAGES, "couldn't access client", Some("put"));
}
None
});
@@ -226,16 +232,18 @@
.await
{
Ok(_) => (),
- Err(err) => {
- report_error(format!("send data to receiver failed: {}", err))
- }
+ Err(err) => report_error(
+ &ERROR_MESSAGES,
+ &format!("send data to receiver failed: {}", err),
+ Some("put"),
+ ),
}
*maybe_client = Some(client);
} else {
- report_error("no client detected".to_string());
+ report_error(&ERROR_MESSAGES, "no client detected", Some("put"));
}
} else {
- report_error("couldn't access client".to_string());
+ report_error(&ERROR_MESSAGES, "couldn't access client", Some("put"));
}
});
check_error()?;
@@ -254,7 +262,11 @@
drop(client.tx);
RUNTIME.block_on(async {
if client.rx_handle.await.is_err() {
- report_error("wait for receiver handle failed".to_string());
+ report_error(
+ &ERROR_MESSAGES,
+ "wait for receiver handle failed",
+ Some("put"),
+ );
}
});
} else {
diff --git a/services/backup/blob_client/src/tools.rs b/services/backup/blob_client/src/tools.rs
new file mode 100644
--- /dev/null
+++ b/services/backup/blob_client/src/tools.rs
@@ -0,0 +1,18 @@
+use std::sync::{Arc, Mutex};
+use tracing::error;
+
+pub fn report_error(
+ error_messages: &Arc<Mutex<Vec<String>>>,
+ message: &str,
+ label_provided: Option<&str>,
+) {
+ let label = match label_provided {
+ Some(value) => format!("[{}]", value),
+ None => "".to_string(),
+ };
+ println!("[RUST] {} Error: {}", label, message);
+ if let Ok(mut error_messages_unpacked) = error_messages.lock() {
+ error_messages_unpacked.push(message.to_string());
+ }
+ error!("could not access error messages");
+}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Fri, Nov 22, 2:10 AM (4 h, 54 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2559364
Default Alt Text
D5030.diff (9 KB)
Attached To
Mode
D5030: [services] Rust Integration - Backup - Make report_error shared
Attached
Detach File
Event Timeline
Log In to Comment