Page Menu | Home | Phabricator

D5096.diff
No One | Temporary

D5096.diff

diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs
--- a/services/backup/blob_client/src/get_client.rs
+++ b/services/backup/blob_client/src/get_client.rs
@@ -7,7 +7,7 @@
use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
use crate::tools::{
- c_char_pointer_to_string_new, string_to_c_char_pointer_new,
+ c_char_pointer_to_string, string_to_c_char_pointer,
};
use anyhow::bail;
use crate::RUNTIME;
@@ -43,9 +43,9 @@
pub fn get_client_initialize_cxx(
holder_char: *const c_char,
) -> anyhow::Result<(), anyhow::Error> {
- let holder = c_char_pointer_to_string_new(holder_char)?;
+ let holder = c_char_pointer_to_string(holder_char)?;
if is_initialized(&holder)? {
- get_client_terminate_cxx(string_to_c_char_pointer_new(&holder)?)?;
+ get_client_terminate_cxx(string_to_c_char_pointer(&holder)?)?;
}
// grpc
@@ -98,7 +98,7 @@
pub fn get_client_blocking_read_cxx(
holder_char: *const c_char,
) -> anyhow::Result<Vec<u8>, anyhow::Error> {
- let holder = c_char_pointer_to_string_new(holder_char)?;
+ let holder = c_char_pointer_to_string(holder_char)?;
Ok(RUNTIME.block_on(async {
if let Ok(mut clients) = CLIENTS.lock() {
if let Some(client) = clients.get_mut(&holder) {
@@ -116,7 +116,7 @@
pub fn get_client_terminate_cxx(
holder_char: *const c_char,
) -> anyhow::Result<(), anyhow::Error> {
- let holder = c_char_pointer_to_string_new(holder_char)?;
+ let holder = c_char_pointer_to_string(holder_char)?;
if !is_initialized(&holder)? {
return Ok(());
}
diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs
--- a/services/backup/blob_client/src/put_client.rs
+++ b/services/backup/blob_client/src/put_client.rs
@@ -9,7 +9,7 @@
use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
use crate::tools::{
- c_char_pointer_to_string, c_char_pointer_to_string_new, report_error, string_to_c_char_pointer_new,
+ c_char_pointer_to_string, string_to_c_char_pointer,
};
use anyhow::bail;
use crate::RUNTIME;
@@ -32,7 +32,7 @@
tx: mpsc::Sender<PutRequestData>,
rx: mpsc::Receiver<String>,
- rx_handle: JoinHandle<()>,
+ rx_handle: JoinHandle<anyhow::Result<(), anyhow::Error>>,
}
lazy_static! {
@@ -44,33 +44,23 @@
Mutex::new(Vec::new());
}
-fn is_initialized(holder: &str) -> bool {
+fn is_initialized(holder: &str) -> anyhow::Result<bool, anyhow::Error> {
match CLIENTS.lock() {
- Ok(clients) => clients.contains_key(holder),
- _ => {
- report_error(&ERROR_MESSAGES, "couldn't access client", Some("put"));
- false
- }
+ Ok(clients) => Ok(clients.contains_key(holder)),
+ _ => bail!("couldn't access client")
}
}
-fn is_initialized_new(holder: &str) -> anyhow::Result<bool, anyhow::Error> {
- return Ok(match CLIENTS.lock() {
- Ok(clients) => clients.contains_key(holder),
- _ => {
- bail!("couldn't access client");
- }
- });
-}
-
pub fn put_client_initialize_cxx(
holder_char: *const c_char,
-) -> Result<(), String> {
+) -> anyhow::Result<(), anyhow::Error> {
let holder = c_char_pointer_to_string(holder_char)?;
- assert!(
- !is_initialized(&holder),
- "client cannot be initialized twice"
- );
+ if is_initialized(&holder)? {
+ put_client_terminate_cxx(string_to_c_char_pointer(&holder)?)?;
+ }
+ if is_initialized(&holder)? {
+ bail!("client cannot be initialized twice");
+ }
// grpc
if let Ok(mut grpc_client) =
RUNTIME.block_on(async { BlobServiceClient::connect(BLOB_ADDRESS).await })
@@ -79,14 +69,15 @@
mpsc::channel::<PutRequestData>(MPSC_CHANNEL_BUFFER_CAPACITY);
let outbound = async_stream::stream! {
+ let mut maybe_error: Option<String> = None;
while let Some(data) = request_thread_rx.recv().await {
let request_data: Option<put_request::Data> = match data.field_index {
1 => {
match String::from_utf8(data.data) {
Ok(utf8_data) => Some(Holder(utf8_data)),
_ => {
- report_error(&ERROR_MESSAGES, "invalid utf-8", Some("put"));
- None
+ maybe_error = Some("invalid utf-8".to_string());
+ break;
},
}
}
@@ -94,8 +85,8 @@
match String::from_utf8(data.data).ok() {
Some(utf8_data) => Some(BlobHash(utf8_data)),
None => {
- report_error(&ERROR_MESSAGES, "invalid utf-8", Some("put"));
- None
+ maybe_error = Some("invalid utf-8".to_string());
+ break;
},
}
}
@@ -103,12 +94,8 @@
Some(DataChunk(data.data))
}
_ => {
- report_error(
- &ERROR_MESSAGES,
- &format!("invalid field index value {}", data.field_index),
- Some("put")
- );
- None
+ maybe_error = Some(format!("invalid field index value {}", data.field_index));
+ break;
}
};
if let Some (unpacked_data) = request_data {
@@ -117,14 +104,14 @@
};
yield request;
} else {
- report_error(
- &ERROR_MESSAGES,
- "an error occured, aborting connection",
- Some("put")
- );
+ maybe_error = Some("an error occured, aborting connection".to_string());
break;
}
}
+ if let Some(error) = maybe_error {
+ // todo consider handling this differently
+ println!("an error occured in the stream: {}", error);
+ }
};
// spawn receiver thread
@@ -148,11 +135,7 @@
{
result = true;
} else {
- report_error(
- &ERROR_MESSAGES,
- "response queue full",
- Some("put"),
- );
+ bail!("response queue full");
}
}
if !result {
@@ -160,20 +143,20 @@
}
}
Err(err) => {
- report_error(&ERROR_MESSAGES, &err.to_string(), Some("put"));
- break;
+ bail!(err.to_string());
}
};
}
}
Err(err) => {
- report_error(&ERROR_MESSAGES, &err.to_string(), Some("put"));
+ bail!(err.to_string());
}
};
+ Ok(())
});
- if is_initialized(&holder) {
- return Err(format!(
+ if is_initialized(&holder)? {
+ bail!(format!(
"client initialization overlapped for holder {}",
holder
));
@@ -187,15 +170,15 @@
(*clients).insert(holder, client);
return Ok(());
}
- return Err(format!("could not access client for holder {}", holder));
+ bail!(format!("could not access client for holder {}", holder));
}
- Err("could not successfully connect to the blob server".to_string())
+ bail!("could not successfully connect to the blob server");
}
pub fn put_client_blocking_read_cxx(
holder_char: *const c_char,
) -> anyhow::Result<String, anyhow::Error> {
- let holder = c_char_pointer_to_string_new(holder_char)?;
+ let holder = c_char_pointer_to_string(holder_char)?;
Ok(RUNTIME.block_on(async {
if let Ok(mut clients) = CLIENTS.lock() {
let maybe_client = clients.get_mut(&holder);
@@ -225,7 +208,7 @@
field_index: usize,
data: *const c_char,
) -> anyhow::Result<(), anyhow::Error> {
- let holder = c_char_pointer_to_string_new(holder_char)?;
+ let holder = c_char_pointer_to_string(holder_char)?;
let data_c_str: &CStr = unsafe { CStr::from_ptr(data) };
let data_bytes: Vec<u8> = data_c_str.to_bytes().to_vec();
@@ -253,8 +236,8 @@
pub fn put_client_terminate_cxx(
holder_char: *const c_char,
) -> anyhow::Result<(), anyhow::Error> {
- let holder = c_char_pointer_to_string_new(holder_char)?;
- if !is_initialized_new(&holder)? {
+ let holder = c_char_pointer_to_string(holder_char)?;
+ if !is_initialized(&holder)? {
return Ok(());
}
@@ -263,10 +246,7 @@
if let Some(client) = maybe_client {
drop(client.tx);
RUNTIME.block_on(async {
- if client.rx_handle.await.is_err() {
- bail!(format!("awaiting for the client {} failed", holder));
- }
- Ok(())
+ client.rx_handle.await?
})?;
} else {
bail!("no client detected in terminate");
@@ -275,7 +255,7 @@
bail!("couldn't access client");
}
- if is_initialized_new(&holder)? {
+ if is_initialized(&holder)? {
bail!("client transmitter handler released properly");
}
Ok(())
diff --git a/services/backup/blob_client/src/tools.rs b/services/backup/blob_client/src/tools.rs
--- a/services/backup/blob_client/src/tools.rs
+++ b/services/backup/blob_client/src/tools.rs
@@ -1,42 +1,14 @@
use libc::c_char;
use std::ffi::{CStr, CString};
-use std::sync::Mutex;
-use tracing::error;
-
-pub fn report_error(
- error_messages: &Mutex<Vec<String>>,
- message: &str,
- label_provided: Option<&str>,
-) {
- let label = match label_provided {
- Some(value) => format!("[{}]", value),
- None => "".to_string(),
- };
- println!("[RUST] {} Error: {}", label, message);
- if let Ok(mut error_messages_unpacked) = error_messages.lock() {
- error_messages_unpacked.push(message.to_string());
- }
- error!("could not access error messages");
-}
pub fn c_char_pointer_to_string(
c_char_pointer: *const c_char,
-) -> Result<String, String> {
- let holder_cstr: &CStr = unsafe { CStr::from_ptr(c_char_pointer) };
- match holder_cstr.to_str() {
- Ok(result) => Ok(result.to_owned()),
- Err(err) => Err(err.to_string()),
- }
-}
-
-pub fn c_char_pointer_to_string_new(
- c_char_pointer: *const c_char,
) -> anyhow::Result<String, anyhow::Error> {
let holder_cstr: &CStr = unsafe { CStr::from_ptr(c_char_pointer) };
Ok(holder_cstr.to_str()?.to_owned())
}
-pub fn string_to_c_char_pointer_new(
+pub fn string_to_c_char_pointer(
signs: &String,
) -> anyhow::Result<*const c_char, anyhow::Error> {
Ok(CString::new((&signs).as_bytes())?.as_ptr())

File Metadata

Mime Type
text/plain
Expires
Sat, Nov 23, 2:19 AM (18 h, 11 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2567415
Default Alt Text
D5096.diff (10 KB)

Event Timeline