Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3342923
D5096.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
10 KB
Referenced Files
None
Subscribers
None
D5096.diff
View Options
diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs
--- a/services/backup/blob_client/src/get_client.rs
+++ b/services/backup/blob_client/src/get_client.rs
@@ -7,7 +7,7 @@
use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
use crate::tools::{
- c_char_pointer_to_string_new, string_to_c_char_pointer_new,
+ c_char_pointer_to_string, string_to_c_char_pointer,
};
use anyhow::bail;
use crate::RUNTIME;
@@ -43,9 +43,9 @@
pub fn get_client_initialize_cxx(
holder_char: *const c_char,
) -> anyhow::Result<(), anyhow::Error> {
- let holder = c_char_pointer_to_string_new(holder_char)?;
+ let holder = c_char_pointer_to_string(holder_char)?;
if is_initialized(&holder)? {
- get_client_terminate_cxx(string_to_c_char_pointer_new(&holder)?)?;
+ get_client_terminate_cxx(string_to_c_char_pointer(&holder)?)?;
}
// grpc
@@ -98,7 +98,7 @@
pub fn get_client_blocking_read_cxx(
holder_char: *const c_char,
) -> anyhow::Result<Vec<u8>, anyhow::Error> {
- let holder = c_char_pointer_to_string_new(holder_char)?;
+ let holder = c_char_pointer_to_string(holder_char)?;
Ok(RUNTIME.block_on(async {
if let Ok(mut clients) = CLIENTS.lock() {
if let Some(client) = clients.get_mut(&holder) {
@@ -116,7 +116,7 @@
pub fn get_client_terminate_cxx(
holder_char: *const c_char,
) -> anyhow::Result<(), anyhow::Error> {
- let holder = c_char_pointer_to_string_new(holder_char)?;
+ let holder = c_char_pointer_to_string(holder_char)?;
if !is_initialized(&holder)? {
return Ok(());
}
diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs
--- a/services/backup/blob_client/src/put_client.rs
+++ b/services/backup/blob_client/src/put_client.rs
@@ -9,7 +9,7 @@
use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
use crate::tools::{
- c_char_pointer_to_string, c_char_pointer_to_string_new, report_error, string_to_c_char_pointer_new,
+ c_char_pointer_to_string, string_to_c_char_pointer,
};
use anyhow::bail;
use crate::RUNTIME;
@@ -32,7 +32,7 @@
tx: mpsc::Sender<PutRequestData>,
rx: mpsc::Receiver<String>,
- rx_handle: JoinHandle<()>,
+ rx_handle: JoinHandle<anyhow::Result<(), anyhow::Error>>,
}
lazy_static! {
@@ -44,33 +44,23 @@
Mutex::new(Vec::new());
}
-fn is_initialized(holder: &str) -> bool {
+fn is_initialized(holder: &str) -> anyhow::Result<bool, anyhow::Error> {
match CLIENTS.lock() {
- Ok(clients) => clients.contains_key(holder),
- _ => {
- report_error(&ERROR_MESSAGES, "couldn't access client", Some("put"));
- false
- }
+ Ok(clients) => Ok(clients.contains_key(holder)),
+ _ => bail!("couldn't access client")
}
}
-fn is_initialized_new(holder: &str) -> anyhow::Result<bool, anyhow::Error> {
- return Ok(match CLIENTS.lock() {
- Ok(clients) => clients.contains_key(holder),
- _ => {
- bail!("couldn't access client");
- }
- });
-}
-
pub fn put_client_initialize_cxx(
holder_char: *const c_char,
-) -> Result<(), String> {
+) -> anyhow::Result<(), anyhow::Error> {
let holder = c_char_pointer_to_string(holder_char)?;
- assert!(
- !is_initialized(&holder),
- "client cannot be initialized twice"
- );
+ if is_initialized(&holder)? {
+ put_client_terminate_cxx(string_to_c_char_pointer(&holder)?)?;
+ }
+ if is_initialized(&holder)? {
+ bail!("client cannot be initialized twice");
+ }
// grpc
if let Ok(mut grpc_client) =
RUNTIME.block_on(async { BlobServiceClient::connect(BLOB_ADDRESS).await })
@@ -79,14 +69,15 @@
mpsc::channel::<PutRequestData>(MPSC_CHANNEL_BUFFER_CAPACITY);
let outbound = async_stream::stream! {
+ let mut maybe_error: Option<String> = None;
while let Some(data) = request_thread_rx.recv().await {
let request_data: Option<put_request::Data> = match data.field_index {
1 => {
match String::from_utf8(data.data) {
Ok(utf8_data) => Some(Holder(utf8_data)),
_ => {
- report_error(&ERROR_MESSAGES, "invalid utf-8", Some("put"));
- None
+ maybe_error = Some("invalid utf-8".to_string());
+ break;
},
}
}
@@ -94,8 +85,8 @@
match String::from_utf8(data.data).ok() {
Some(utf8_data) => Some(BlobHash(utf8_data)),
None => {
- report_error(&ERROR_MESSAGES, "invalid utf-8", Some("put"));
- None
+ maybe_error = Some("invalid utf-8".to_string());
+ break;
},
}
}
@@ -103,12 +94,8 @@
Some(DataChunk(data.data))
}
_ => {
- report_error(
- &ERROR_MESSAGES,
- &format!("invalid field index value {}", data.field_index),
- Some("put")
- );
- None
+ maybe_error = Some(format!("invalid field index value {}", data.field_index));
+ break;
}
};
if let Some (unpacked_data) = request_data {
@@ -117,14 +104,14 @@
};
yield request;
} else {
- report_error(
- &ERROR_MESSAGES,
- "an error occured, aborting connection",
- Some("put")
- );
+ maybe_error = Some("an error occured, aborting connection".to_string());
break;
}
}
+ if let Some(error) = maybe_error {
+ // todo consider handling this differently
+ println!("an error occured in the stream: {}", error);
+ }
};
// spawn receiver thread
@@ -148,11 +135,7 @@
{
result = true;
} else {
- report_error(
- &ERROR_MESSAGES,
- "response queue full",
- Some("put"),
- );
+ bail!("response queue full");
}
}
if !result {
@@ -160,20 +143,20 @@
}
}
Err(err) => {
- report_error(&ERROR_MESSAGES, &err.to_string(), Some("put"));
- break;
+ bail!(err.to_string());
}
};
}
}
Err(err) => {
- report_error(&ERROR_MESSAGES, &err.to_string(), Some("put"));
+ bail!(err.to_string());
}
};
+ Ok(())
});
- if is_initialized(&holder) {
- return Err(format!(
+ if is_initialized(&holder)? {
+ bail!(format!(
"client initialization overlapped for holder {}",
holder
));
@@ -187,15 +170,15 @@
(*clients).insert(holder, client);
return Ok(());
}
- return Err(format!("could not access client for holder {}", holder));
+ bail!(format!("could not access client for holder {}", holder));
}
- Err("could not successfully connect to the blob server".to_string())
+ bail!("could not successfully connect to the blob server");
}
pub fn put_client_blocking_read_cxx(
holder_char: *const c_char,
) -> anyhow::Result<String, anyhow::Error> {
- let holder = c_char_pointer_to_string_new(holder_char)?;
+ let holder = c_char_pointer_to_string(holder_char)?;
Ok(RUNTIME.block_on(async {
if let Ok(mut clients) = CLIENTS.lock() {
let maybe_client = clients.get_mut(&holder);
@@ -225,7 +208,7 @@
field_index: usize,
data: *const c_char,
) -> anyhow::Result<(), anyhow::Error> {
- let holder = c_char_pointer_to_string_new(holder_char)?;
+ let holder = c_char_pointer_to_string(holder_char)?;
let data_c_str: &CStr = unsafe { CStr::from_ptr(data) };
let data_bytes: Vec<u8> = data_c_str.to_bytes().to_vec();
@@ -253,8 +236,8 @@
pub fn put_client_terminate_cxx(
holder_char: *const c_char,
) -> anyhow::Result<(), anyhow::Error> {
- let holder = c_char_pointer_to_string_new(holder_char)?;
- if !is_initialized_new(&holder)? {
+ let holder = c_char_pointer_to_string(holder_char)?;
+ if !is_initialized(&holder)? {
return Ok(());
}
@@ -263,10 +246,7 @@
if let Some(client) = maybe_client {
drop(client.tx);
RUNTIME.block_on(async {
- if client.rx_handle.await.is_err() {
- bail!(format!("awaiting for the client {} failed", holder));
- }
- Ok(())
+ client.rx_handle.await?
})?;
} else {
bail!("no client detected in terminate");
@@ -275,7 +255,7 @@
bail!("couldn't access client");
}
- if is_initialized_new(&holder)? {
+ if is_initialized(&holder)? {
bail!("client transmitter handler released properly");
}
Ok(())
diff --git a/services/backup/blob_client/src/tools.rs b/services/backup/blob_client/src/tools.rs
--- a/services/backup/blob_client/src/tools.rs
+++ b/services/backup/blob_client/src/tools.rs
@@ -1,42 +1,14 @@
use libc::c_char;
use std::ffi::{CStr, CString};
-use std::sync::Mutex;
-use tracing::error;
-
-pub fn report_error(
- error_messages: &Mutex<Vec<String>>,
- message: &str,
- label_provided: Option<&str>,
-) {
- let label = match label_provided {
- Some(value) => format!("[{}]", value),
- None => "".to_string(),
- };
- println!("[RUST] {} Error: {}", label, message);
- if let Ok(mut error_messages_unpacked) = error_messages.lock() {
- error_messages_unpacked.push(message.to_string());
- }
- error!("could not access error messages");
-}
pub fn c_char_pointer_to_string(
c_char_pointer: *const c_char,
-) -> Result<String, String> {
- let holder_cstr: &CStr = unsafe { CStr::from_ptr(c_char_pointer) };
- match holder_cstr.to_str() {
- Ok(result) => Ok(result.to_owned()),
- Err(err) => Err(err.to_string()),
- }
-}
-
-pub fn c_char_pointer_to_string_new(
- c_char_pointer: *const c_char,
) -> anyhow::Result<String, anyhow::Error> {
let holder_cstr: &CStr = unsafe { CStr::from_ptr(c_char_pointer) };
Ok(holder_cstr.to_str()?.to_owned())
}
-pub fn string_to_c_char_pointer_new(
+pub fn string_to_c_char_pointer(
signs: &String,
) -> anyhow::Result<*const c_char, anyhow::Error> {
Ok(CString::new((&signs).as_bytes())?.as_ptr())
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Nov 23, 2:19 AM (18 h, 11 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2567415
Default Alt Text
D5096.diff (10 KB)
Attached To
Mode
D5096: [services] Backup - Blob Get Client - Refactor put initialize
Attached
Detach File
Event Timeline
Log In to Comment